""" API路由模块 定义所有Flask路由和请求处理逻辑 """ import logging from datetime import datetime from flask import jsonify, request, render_template, send_from_directory # 导入自定义模块 from .config_manager import ( DEFAULT_CONFIG, save_config_group, get_config_groups, get_config_group_by_id, delete_config_group, save_query_history, get_query_history, get_query_history_by_id, delete_query_history, batch_delete_query_history, # Redis配置管理 REDIS_DEFAULT_CONFIG, save_redis_config_group, get_redis_config_groups, get_redis_config_group_by_id, delete_redis_config_group, save_redis_query_history, get_redis_query_history, get_redis_query_history_by_id, delete_redis_query_history, batch_delete_redis_query_history, parse_redis_config_from_yaml, convert_bytes_to_str # 添加bytes转换函数 ) from .cassandra_client import create_connection from .query_engine import execute_query, execute_mixed_query from .data_comparison import compare_results, generate_comparison_summary from .database import init_database # Redis相关模块 from .redis_client import create_redis_client, test_redis_connection from .redis_query import execute_redis_comparison logger = logging.getLogger(__name__) def setup_routes(app, query_log_collector): """设置所有路由,需要传入app实例和query_log_collector""" # 页面路由 @app.route('/') def index(): return render_template('index.html') @app.route('/test-config-load') def test_config_load(): """配置加载测试页面""" return send_from_directory('.', 'test_config_load.html') @app.route('/db-compare') def db_compare(): return render_template('db_compare.html') # 新增:更语义化的路由别名 @app.route('/cassandra-compare') def cassandra_compare(): """Cassandra数据比对工具 - 语义化路由""" return render_template('db_compare.html') @app.route('/redis-compare') def redis_compare(): return render_template('redis_compare.html') @app.route('/redis-js-test') def redis_js_test(): return render_template('redis_js_test.html') @app.route('/redis-test') def redis_test(): return render_template('redis_test.html') # 基础API @app.route('/api/health') def health_check(): """健康检查端点,用于Docker健康检查和服务监控""" try: # 检查应用基本状态 current_time = datetime.now().isoformat() # 简单的数据库连接检查(可选) from .database import ensure_database db_status = ensure_database() return jsonify({ 'status': 'healthy', 'timestamp': current_time, 'service': 'BigDataTool', 'version': '2.0', 'database': 'ok' if db_status else 'warning' }) except Exception as e: logger.error(f"健康检查失败: {str(e)}") return jsonify({ 'status': 'unhealthy', 'error': str(e), 'timestamp': datetime.now().isoformat() }), 503 @app.route('/api/default-config') def get_default_config(): return jsonify(DEFAULT_CONFIG) @app.route('/api/init-db', methods=['POST']) def api_init_database(): """手动初始化数据库(用于测试)""" success = init_database() if success: return jsonify({'success': True, 'message': '数据库初始化成功'}) else: return jsonify({'success': False, 'error': '数据库初始化失败'}), 500 # 分表查询API @app.route('/api/sharding-query', methods=['POST']) def sharding_query_compare(): """分表查询比对API""" try: data = request.json # 开始新的查询批次 batch_id = query_log_collector.start_new_batch('分表') logger.info("开始执行分表数据库比对查询") # 解析配置 pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) # 从query_config中获取keys等参数 query_config = data.get('query_config', {}) keys = query_config.get('keys', DEFAULT_CONFIG['keys']) fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) values = data.get('values', []) sharding_config = data.get('sharding_config', {}) # 参数验证 if not values: logger.warning("分表查询失败:未提供查询key值") return jsonify({'error': '请提供查询key值'}), 400 if not keys: logger.warning("分表查询失败:未提供主键字段") return jsonify({'error': '请提供主键字段'}), 400 # 添加详细的参数日志 logger.info(f"分表查询参数解析结果:") logger.info(f" keys: {keys}") logger.info(f" values数量: {len(values)}") logger.info(f" fields_to_compare: {fields_to_compare}") logger.info(f" exclude_fields: {exclude_fields}") logger.info(f" sharding_config原始数据: {sharding_config}") logger.info(f" sharding_config具体参数:") logger.info(f" use_sharding_for_pro: {sharding_config.get('use_sharding_for_pro')}") logger.info(f" use_sharding_for_test: {sharding_config.get('use_sharding_for_test')}") logger.info(f" pro_interval_seconds: {sharding_config.get('pro_interval_seconds')}") logger.info(f" pro_table_count: {sharding_config.get('pro_table_count')}") logger.info(f" test_interval_seconds: {sharding_config.get('test_interval_seconds')}") logger.info(f" test_table_count: {sharding_config.get('test_table_count')}") logger.info(f" interval_seconds: {sharding_config.get('interval_seconds')}") logger.info(f" table_count: {sharding_config.get('table_count')}") logger.info(f"分表查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") # 创建数据库连接 pro_cluster, pro_session = create_connection(pro_config) test_cluster, test_session = create_connection(test_config) if not pro_session or not test_session: logger.error("数据库连接失败") return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500 try: # 执行混合查询(支持生产环境分表、测试环境单表/分表的组合) logger.info("执行分表混合查询") query_results = execute_mixed_query( pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config ) pro_data = query_results['pro_data'] test_data = query_results['test_data'] sharding_info = query_results['sharding_info'] logger.info(f"分表查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录") # 比较结果 differences, field_diff_count, identical_results = compare_results( pro_data, test_data, keys, fields_to_compare, exclude_fields, values ) # 统计信息 different_ids = set() for diff in differences: if 'field' in diff: different_ids.add(list(diff['key'].values())[0]) non_different_ids = set(values) - different_ids # 生成比较总结 summary = generate_comparison_summary( len(values), len(pro_data), len(test_data), differences, identical_results, field_diff_count ) result = { 'total_keys': len(values), 'pro_count': len(pro_data), 'test_count': len(test_data), 'differences': differences, 'identical_results': identical_results, 'field_diff_count': field_diff_count, 'different_ids': list(different_ids), 'non_different_ids': list(non_different_ids), 'summary': summary, 'sharding_info': sharding_info, # 包含分表查询信息 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] } logger.info(f"分表比对完成:发现 {len(differences)} 处差异") # 自动保存分表查询历史记录 try: # 生成历史记录名称 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") history_name = f"分表查询_{timestamp}" history_description = f"自动保存 - 分表查询{len(values)}个Key,发现{len(differences)}处差异" # 保存历史记录 history_id = save_query_history( name=history_name, description=history_description, pro_config=pro_config, test_config=test_config, query_config={ 'keys': keys, 'fields_to_compare': fields_to_compare, 'exclude_fields': exclude_fields }, query_keys=values, results_summary=summary, execution_time=0.0, # 可以后续优化计算实际执行时间 total_keys=len(values), differences_count=len(differences), identical_count=len(identical_results), # 新增分表相关参数 sharding_config=sharding_config, query_type='sharding', # 添加查询结果数据 raw_results={ 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [], 'sharding_info': sharding_info # 包含分表信息 }, differences_data=differences, identical_data=identical_results ) # 关联查询日志与历史记录 if history_id: query_log_collector.set_history_id(history_id) logger.info(f"分表查询历史记录保存成功: {history_name}, ID: {history_id}") else: logger.warning("分表查询历史记录保存失败,无法获取history_id") except Exception as e: logger.warning(f"保存分表查询历史记录失败: {e}") # 结束查询批次 query_log_collector.end_current_batch() # 转换result中可能包含的bytes类型数据 result = convert_bytes_to_str(result) return jsonify(result) except Exception as e: logger.error(f"分表查询执行失败:{str(e)}") # 结束查询批次(出错情况) query_log_collector.end_current_batch() return jsonify({'error': f'分表查询执行失败:{str(e)}'}), 500 finally: # 关闭连接 if pro_cluster: pro_cluster.shutdown() if test_cluster: test_cluster.shutdown() except Exception as e: logger.error(f"分表查询请求处理失败:{str(e)}") # 结束查询批次(请求处理出错) query_log_collector.end_current_batch() return jsonify({'error': f'分表查询请求处理失败:{str(e)}'}), 500 # 单表查询API @app.route('/api/query', methods=['POST']) def query_compare(): try: data = request.json # 开始新的查询批次 batch_id = query_log_collector.start_new_batch('单表') logger.info("开始执行数据库比对查询") # 解析配置 pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) # 从query_config中获取keys等参数 query_config = data.get('query_config', {}) keys = query_config.get('keys', DEFAULT_CONFIG['keys']) fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) values = data.get('values', []) # 参数验证 if not values: logger.warning("查询失败:未提供查询key值") return jsonify({'error': '请提供查询key值'}), 400 if not keys: logger.warning("查询失败:未提供主键字段") return jsonify({'error': '请提供主键字段'}), 400 # 添加详细的参数日志 logger.info(f"单表查询参数解析结果:") logger.info(f" keys: {keys}") logger.info(f" values数量: {len(values)}") logger.info(f" fields_to_compare: {fields_to_compare}") logger.info(f" exclude_fields: {exclude_fields}") logger.info(f"查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") # 创建数据库连接 pro_cluster, pro_session = create_connection(pro_config) test_cluster, test_session = create_connection(test_config) if not pro_session or not test_session: logger.error("数据库连接失败") return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500 try: # 执行查询 logger.info("执行生产环境查询") pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) logger.info("执行测试环境查询") test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) logger.info(f"查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录") # 比较结果 differences, field_diff_count, identical_results = compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values) # 统计信息 different_ids = set() for diff in differences: if 'field' in diff: different_ids.add(list(diff['key'].values())[0]) non_different_ids = set(values) - different_ids # 生成比较总结 summary = generate_comparison_summary( len(values), len(pro_data), len(test_data), differences, identical_results, field_diff_count ) result = { 'total_keys': len(values), 'pro_count': len(pro_data), 'test_count': len(test_data), 'differences': differences, 'identical_results': identical_results, 'field_diff_count': field_diff_count, 'different_ids': list(different_ids), 'non_different_ids': list(non_different_ids), 'summary': summary, 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] } logger.info(f"比对完成:发现 {len(differences)} 处差异") # 自动保存查询历史记录(可选,基于执行结果) try: # 生成历史记录名称 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") history_name = f"查询_{timestamp}" history_description = f"自动保存 - 查询{len(values)}个Key,发现{len(differences)}处差异" # 保存历史记录 history_id = save_query_history( name=history_name, description=history_description, pro_config=pro_config, test_config=test_config, query_config={ 'keys': keys, 'fields_to_compare': fields_to_compare, 'exclude_fields': exclude_fields }, query_keys=values, results_summary=summary, execution_time=0.0, # 可以后续优化计算实际执行时间 total_keys=len(values), differences_count=len(differences), identical_count=len(identical_results), # 添加查询结果数据 raw_results={ 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] }, differences_data=differences, identical_data=identical_results ) # 关联查询日志与历史记录 if history_id: query_log_collector.set_history_id(history_id) logger.info(f"查询历史记录保存成功: {history_name}, ID: {history_id}") else: logger.warning("查询历史记录保存失败,无法获取history_id") except Exception as e: logger.warning(f"保存查询历史记录失败: {e}") # 结束查询批次 query_log_collector.end_current_batch() # 转换result中可能包含的bytes类型数据 result = convert_bytes_to_str(result) return jsonify(result) except Exception as e: logger.error(f"查询执行失败:{str(e)}") # 结束查询批次(出错情况) query_log_collector.end_current_batch() return jsonify({'error': f'查询执行失败:{str(e)}'}), 500 finally: # 关闭连接 if pro_cluster: pro_cluster.shutdown() if test_cluster: test_cluster.shutdown() except Exception as e: logger.error(f"请求处理失败:{str(e)}") # 结束查询批次(请求处理出错) query_log_collector.end_current_batch() return jsonify({'error': f'请求处理失败:{str(e)}'}), 500 # 配置组管理API @app.route('/api/config-groups', methods=['GET']) def api_get_config_groups(): """获取所有配置组""" config_groups = get_config_groups() return jsonify({'success': True, 'data': config_groups}) @app.route('/api/config-groups', methods=['POST']) def api_save_config_group(): """保存配置组""" try: data = request.json name = data.get('name', '').strip() description = data.get('description', '').strip() pro_config = data.get('pro_config', {}) test_config = data.get('test_config', {}) # 获取查询配置,支持两种格式 if 'query_config' in data: # 嵌套格式 query_config = data.get('query_config', {}) else: # 平铺格式 query_config = { 'keys': data.get('keys', []), 'fields_to_compare': data.get('fields_to_compare', []), 'exclude_fields': data.get('exclude_fields', []) } # 提取分表配置 sharding_config = data.get('sharding_config') if not name: return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400 success = save_config_group(name, description, pro_config, test_config, query_config, sharding_config) if success: return jsonify({'success': True, 'message': '配置组保存成功'}) else: return jsonify({'success': False, 'error': '配置组保存失败'}), 500 except Exception as e: logger.error(f"保存配置组API失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/config-groups/', methods=['GET']) def api_get_config_group(group_id): """获取指定配置组详情""" config_group = get_config_group_by_id(group_id) if config_group: return jsonify({'success': True, 'data': config_group}) else: return jsonify({'success': False, 'error': '配置组不存在'}), 404 @app.route('/api/config-groups/', methods=['DELETE']) def api_delete_config_group(group_id): """删除配置组""" success = delete_config_group(group_id) if success: return jsonify({'success': True, 'message': '配置组删除成功'}) else: return jsonify({'success': False, 'error': '配置组删除失败'}), 500 # 查询历史管理API @app.route('/api/query-history', methods=['GET']) def api_get_query_history(): """获取所有查询历史记录""" history_list = get_query_history() return jsonify({'success': True, 'data': history_list}) @app.route('/api/query-history', methods=['POST']) def api_save_query_history(): """保存查询历史记录,支持分表查询""" try: data = request.json name = data.get('name', '').strip() description = data.get('description', '').strip() pro_config = data.get('pro_config', {}) test_config = data.get('test_config', {}) query_config = data.get('query_config', {}) query_keys = data.get('query_keys', []) results_summary = data.get('results_summary', {}) execution_time = data.get('execution_time', 0.0) total_keys = data.get('total_keys', 0) differences_count = data.get('differences_count', 0) identical_count = data.get('identical_count', 0) # 新增分表相关字段 sharding_config = data.get('sharding_config') query_type = data.get('query_type', 'single') if not name: return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400 success = save_query_history( name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, differences_count, identical_count, sharding_config, query_type ) if success: query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询' return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'}) else: return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500 except Exception as e: logger.error(f"保存查询历史记录API失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/query-history/', methods=['GET']) def api_get_query_history_detail(history_id): """获取指定查询历史记录详情""" history_record = get_query_history_by_id(history_id) if history_record: return jsonify({'success': True, 'data': history_record}) else: return jsonify({'success': False, 'error': '查询历史记录不存在'}), 404 @app.route('/api/query-history//results', methods=['GET']) def api_get_query_history_results(history_id): """获取查询历史记录的完整结果数据""" try: history_record = get_query_history_by_id(history_id) if not history_record: return jsonify({'success': False, 'error': '历史记录不存在'}), 404 # 安全获取raw_results数据 raw_results = history_record.get('raw_results') if raw_results and isinstance(raw_results, dict): raw_pro_data = raw_results.get('raw_pro_data', []) or [] raw_test_data = raw_results.get('raw_test_data', []) or [] sharding_info = raw_results.get('sharding_info') if history_record.get('query_type') == 'sharding' else None else: raw_pro_data = [] raw_test_data = [] sharding_info = None # 安全获取差异和相同结果数据 differences_data = history_record.get('differences_data') or [] identical_data = history_record.get('identical_data') or [] # 构建完整的查询结果格式,与API查询结果保持一致 result = { 'total_keys': history_record['total_keys'], 'pro_count': len(raw_pro_data), 'test_count': len(raw_test_data), 'differences': differences_data, 'identical_results': identical_data, 'field_diff_count': {}, # 可以从differences_data中重新计算 'summary': history_record.get('results_summary', {}), 'raw_pro_data': raw_pro_data, 'raw_test_data': raw_test_data, # 如果是分表查询,添加分表信息 'sharding_info': sharding_info, # 添加历史记录元信息 'history_info': { 'id': history_record['id'], 'name': history_record['name'], 'description': history_record['description'], 'created_at': history_record['created_at'], 'query_type': history_record.get('query_type', 'single') } } # 重新计算field_diff_count if differences_data: field_diff_count = {} for diff in differences_data: if isinstance(diff, dict) and 'field' in diff: field_name = diff['field'] field_diff_count[field_name] = field_diff_count.get(field_name, 0) + 1 result['field_diff_count'] = field_diff_count return jsonify({ 'success': True, 'data': result, 'message': f'历史记录 "{history_record["name"]}" 结果加载成功' }) except Exception as e: logger.error(f"获取查询历史记录结果失败: {e}") return jsonify({'success': False, 'error': f'获取历史记录结果失败: {str(e)}'}), 500 @app.route('/api/query-history/', methods=['DELETE']) def api_delete_query_history(history_id): """删除查询历史记录""" success = delete_query_history(history_id) if success: return jsonify({'success': True, 'message': '查询历史记录删除成功'}) else: return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500 @app.route('/api/query-history/batch-delete', methods=['POST']) def api_batch_delete_query_history(): """批量删除Cassandra查询历史记录""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': '请求数据格式错误'}), 400 history_ids = data.get('history_ids', []) # 验证参数 if not history_ids: return jsonify({'success': False, 'error': '请提供要删除的历史记录ID列表'}), 400 if not isinstance(history_ids, list): return jsonify({'success': False, 'error': 'history_ids必须是数组'}), 400 # 验证所有ID都是整数 try: history_ids = [int(id) for id in history_ids] except (ValueError, TypeError): return jsonify({'success': False, 'error': '历史记录ID必须是整数'}), 400 # 调用批量删除函数 result = batch_delete_query_history(history_ids) if result['success']: return jsonify(result) else: return jsonify(result), 500 except Exception as e: logger.error(f"批量删除Cassandra查询历史记录异常: {e}") return jsonify({'success': False, 'error': f'服务器内部错误: {str(e)}'}), 500 # 查询日志管理API @app.route('/api/query-logs', methods=['GET']) def api_get_query_logs(): """获取查询日志,支持分组显示和数据库存储""" try: limit = request.args.get('limit', type=int) grouped = request.args.get('grouped', 'true').lower() == 'true' # 默认分组显示 from_db = request.args.get('from_db', 'true').lower() == 'true' # 默认从数据库获取 if grouped: # 返回分组日志 grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db) # 获取总数(用于统计) total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs) return jsonify({ 'success': True, 'data': grouped_logs, 'total': total_logs, 'grouped': True, 'from_db': from_db }) else: # 返回原始日志列表 logs = query_log_collector.get_logs(limit, from_db) total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs) return jsonify({ 'success': True, 'data': logs, 'total': total_logs, 'grouped': False, 'from_db': from_db }) except Exception as e: logger.error(f"获取查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/query-logs', methods=['DELETE']) def api_clear_query_logs(): """清空查询日志,支持清空数据库日志""" try: clear_db = request.args.get('clear_db', 'true').lower() == 'true' # 默认清空数据库 query_log_collector.clear_logs(clear_db) message = '查询日志已清空(包括数据库)' if clear_db else '查询日志已清空(仅内存)' return jsonify({'success': True, 'message': message}) except Exception as e: logger.error(f"清空查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/query-logs/cleanup', methods=['POST']) def api_cleanup_old_logs(): """清理旧的查询日志""" try: days_to_keep = request.json.get('days_to_keep', 30) if request.json else 30 deleted_count = query_log_collector.cleanup_old_logs(days_to_keep) return jsonify({ 'success': True, 'message': f'成功清理 {deleted_count} 条超过 {days_to_keep} 天的旧日志', 'deleted_count': deleted_count }) except Exception as e: logger.error(f"清理旧日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/query-logs/history/', methods=['GET']) def api_get_query_logs_by_history(history_id): """根据历史记录ID获取相关查询日志""" try: logs = query_log_collector.get_logs_by_history_id(history_id) # 按批次分组显示 grouped_logs = {} batch_order = [] for log in logs: batch_id = log.get('batch_id', 'unknown') if batch_id not in grouped_logs: grouped_logs[batch_id] = [] batch_order.append(batch_id) grouped_logs[batch_id].append(log) # 返回按时间顺序排列的批次 grouped_result = [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order] return jsonify({ 'success': True, 'data': grouped_result, 'total': len(logs), 'history_id': history_id, 'grouped': True }) except Exception as e: logger.error(f"获取历史记录相关查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # Redis相关API @app.route('/api/redis/test-connection', methods=['POST']) def api_test_redis_connection(): """测试Redis连接""" try: data = request.json cluster_config = data.get('cluster_config', {}) cluster_name = data.get('cluster_name', 'Redis集群') # 验证配置 if not cluster_config.get('nodes'): return jsonify({'success': False, 'error': '未配置Redis节点'}), 400 # 测试连接 result = test_redis_connection(cluster_config, cluster_name) if result['success']: return jsonify({ 'success': True, 'message': f'{cluster_name}连接成功', 'data': { 'connection_time': result['connection_time'], 'cluster_info': result['cluster_info'] } }) else: return jsonify({ 'success': False, 'error': result['error'], 'connection_time': result['connection_time'] }), 500 except Exception as e: logger.error(f"Redis连接测试失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/redis/compare', methods=['POST']) def api_redis_compare(): """Redis数据比较API""" try: data = request.json # 开始新的查询批次 batch_id = query_log_collector.start_new_batch('Redis') logger.info("开始执行Redis数据比较") # 解析配置 cluster1_config = data.get('cluster1_config', {}) cluster2_config = data.get('cluster2_config', {}) query_options = data.get('query_options', {}) # 参数验证 if not cluster1_config.get('nodes'): logger.warning("Redis比较失败:未配置第一个Redis集群") return jsonify({'error': '请配置第一个Redis集群'}), 400 if not cluster2_config.get('nodes'): logger.warning("Redis比较失败:未配置第二个Redis集群") return jsonify({'error': '请配置第二个Redis集群'}), 400 # 添加详细的参数日志 logger.info(f"Redis比较参数解析结果:") logger.info(f" 集群1: {cluster1_config.get('name', '集群1')}") logger.info(f" 集群2: {cluster2_config.get('name', '集群2')}") logger.info(f" 查询模式: {query_options.get('mode', 'random')}") if query_options.get('mode') == 'random': logger.info(f" 随机查询数量: {query_options.get('count', 100)}") logger.info(f" Key模式: {query_options.get('pattern', '*')}") else: logger.info(f" 指定Key数量: {len(query_options.get('keys', []))}") # 执行Redis比较 logger.info("执行Redis数据比较") result = execute_redis_comparison(cluster1_config, cluster2_config, query_options) if 'error' in result: logger.error(f"Redis比较失败: {result['error']}") query_log_collector.end_current_batch() return jsonify({'error': result['error']}), 500 logger.info(f"Redis比较完成") logger.info(f"比较统计: 总计{result['stats']['total_keys']}个key,相同{result['stats']['identical_count']}个,不同{result['stats']['different_count']}个") # 增强结果,添加原生数据信息 enhanced_result = result.copy() enhanced_result['raw_data'] = { 'cluster1_data': [], 'cluster2_data': [] } # 从比较结果中提取原生数据信息 for item in result.get('identical_results', []): if 'key' in item and 'value' in item: enhanced_result['raw_data']['cluster1_data'].append({ 'key': item['key'], 'value': item['value'], 'type': 'identical' }) enhanced_result['raw_data']['cluster2_data'].append({ 'key': item['key'], 'value': item['value'], 'type': 'identical' }) for item in result.get('different_results', []): if 'key' in item: if 'cluster1_value' in item: enhanced_result['raw_data']['cluster1_data'].append({ 'key': item['key'], 'value': item['cluster1_value'], 'type': 'different' }) if 'cluster2_value' in item: enhanced_result['raw_data']['cluster2_data'].append({ 'key': item['key'], 'value': item['cluster2_value'], 'type': 'different' }) for item in result.get('missing_results', []): if 'key' in item: if 'cluster1_value' in item and item['cluster1_value'] is not None: enhanced_result['raw_data']['cluster1_data'].append({ 'key': item['key'], 'value': item['cluster1_value'], 'type': 'missing' }) if 'cluster2_value' in item and item['cluster2_value'] is not None: enhanced_result['raw_data']['cluster2_data'].append({ 'key': item['key'], 'value': item['cluster2_value'], 'type': 'missing' }) logger.info(f"原生数据统计: 集群1={len(enhanced_result['raw_data']['cluster1_data'])}条, 集群2={len(enhanced_result['raw_data']['cluster2_data'])}条") # 使用增强结果进行后续处理 result = enhanced_result # 自动保存Redis查询历史记录 try: # 生成历史记录名称 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") history_name = f"Redis比较_{timestamp}" history_description = f"自动保存 - Redis比较{result['stats']['total_keys']}个Key,发现{result['stats']['different_count']}处差异" # 计算查询键值列表 query_keys = [] if query_options.get('mode') == 'specified': query_keys = query_options.get('keys', []) elif query_options.get('mode') == 'random': # 对于随机模式,从结果中提取实际查询的键 for item in result.get('identical_results', []): if 'key' in item: query_keys.append(item['key']) for item in result.get('different_results', []): if 'key' in item: query_keys.append(item['key']) for item in result.get('missing_results', []): if 'key' in item: query_keys.append(item['key']) # 保存Redis查询历史记录 history_id = save_redis_query_history( name=history_name, description=history_description, cluster1_config=cluster1_config, cluster2_config=cluster2_config, query_options=query_options, query_keys=query_keys, results_summary=result['stats'], execution_time=result['performance_report']['total_time'], total_keys=result['stats']['total_keys'], different_count=result['stats']['different_count'], identical_count=result['stats']['identical_count'], missing_count=result['stats']['missing_in_cluster1'] + result['stats']['missing_in_cluster2'] + result['stats']['both_missing'], raw_results={ 'identical_results': result['identical_results'], 'different_results': result['different_results'], 'missing_results': result['missing_results'], 'performance_report': result['performance_report'] } ) # 关联查询日志与历史记录 if history_id: query_log_collector.set_history_id(history_id) logger.info(f"Redis查询历史记录保存成功: {history_name}, ID: {history_id}") else: logger.warning("Redis查询历史记录保存失败,无法获取history_id") except Exception as e: logger.warning(f"保存Redis查询历史记录失败: {e}") import traceback logger.error(f"详细错误信息: {traceback.format_exc()}") # 结束查询批次 query_log_collector.end_current_batch() return jsonify(result) except Exception as e: logger.error(f"Redis比较请求处理失败:{str(e)}") # 结束查询批次(请求处理出错) query_log_collector.end_current_batch() return jsonify({'error': f'Redis比较请求处理失败:{str(e)}'}), 500 @app.route('/api/redis/default-config') def get_redis_default_config(): """获取Redis默认配置""" default_redis_config = { 'cluster1_config': { 'name': '生产集群', 'nodes': [ {'host': '127.0.0.1', 'port': 7000} ], 'password': '', 'socket_timeout': 3, 'socket_connect_timeout': 3, 'max_connections_per_node': 16 }, 'cluster2_config': { 'name': '测试集群', 'nodes': [ {'host': '127.0.0.1', 'port': 7001} ], 'password': '', 'socket_timeout': 3, 'socket_connect_timeout': 3, 'max_connections_per_node': 16 }, 'query_options': { 'mode': 'random', 'count': 100, 'pattern': '*', 'source_cluster': 'cluster2', 'keys': [] } } return jsonify(default_redis_config) # Redis配置管理API @app.route('/api/redis/config-groups', methods=['GET']) def api_get_redis_config_groups(): """获取所有Redis配置组""" config_groups = get_redis_config_groups() return jsonify({'success': True, 'data': config_groups}) @app.route('/api/redis/config-groups', methods=['POST']) def api_save_redis_config_group(): """保存Redis配置组""" try: data = request.json name = data.get('name', '').strip() description = data.get('description', '').strip() cluster1_config = data.get('cluster1_config', {}) cluster2_config = data.get('cluster2_config', {}) query_options = data.get('query_options', {}) # 参数验证 if not name: return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400 if not cluster1_config or not cluster1_config.get('nodes'): return jsonify({'success': False, 'error': '请配置集群1信息'}), 400 if not cluster2_config or not cluster2_config.get('nodes'): return jsonify({'success': False, 'error': '请配置集群2信息'}), 400 success = save_redis_config_group(name, description, cluster1_config, cluster2_config, query_options) if success: return jsonify({'success': True, 'message': f'Redis配置组 "{name}" 保存成功'}) else: return jsonify({'success': False, 'error': 'Redis配置组保存失败'}), 500 except Exception as e: logger.error(f"保存Redis配置组API失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/redis/config-groups/', methods=['GET']) def api_get_redis_config_group_detail(group_id): """获取Redis配置组详情""" config_group = get_redis_config_group_by_id(group_id) if config_group: return jsonify({'success': True, 'data': config_group}) else: return jsonify({'success': False, 'error': 'Redis配置组不存在'}), 404 @app.route('/api/redis/config-groups/', methods=['DELETE']) def api_delete_redis_config_group(group_id): """删除Redis配置组""" success = delete_redis_config_group(group_id) if success: return jsonify({'success': True, 'message': 'Redis配置组删除成功'}) else: return jsonify({'success': False, 'error': 'Redis配置组删除失败'}), 500 @app.route('/api/redis/import-config', methods=['POST']) def api_import_redis_config(): """一键导入Redis配置""" try: data = request.json config_text = data.get('config_text', '').strip() if not config_text: return jsonify({'success': False, 'error': '配置内容不能为空'}), 400 # 解析配置 redis_config = parse_redis_config_from_yaml(config_text) if not redis_config: return jsonify({'success': False, 'error': '配置格式解析失败,请检查配置内容'}), 400 # 验证必要字段 if not redis_config.get('nodes'): return jsonify({'success': False, 'error': '未找到有效的集群地址配置'}), 400 return jsonify({ 'success': True, 'data': redis_config, 'message': '配置导入成功' }) except Exception as e: logger.error(f"导入Redis配置失败: {e}") return jsonify({'success': False, 'error': f'导入配置失败: {str(e)}'}), 500 # Redis查询历史API @app.route('/api/redis/query-history', methods=['GET']) def api_get_redis_query_history(): """获取Redis查询历史记录""" history_list = get_redis_query_history() return jsonify({'success': True, 'data': history_list}) @app.route('/api/redis/query-history', methods=['POST']) def api_save_redis_query_history(): """保存Redis查询历史记录""" try: data = request.json name = data.get('name', '').strip() description = data.get('description', '').strip() cluster1_config = data.get('cluster1_config', {}) cluster2_config = data.get('cluster2_config', {}) query_options = data.get('query_options', {}) query_keys = data.get('query_keys', []) results_summary = data.get('results_summary', {}) execution_time = data.get('execution_time', 0) total_keys = data.get('total_keys', 0) different_count = data.get('different_count', 0) identical_count = data.get('identical_count', 0) missing_count = data.get('missing_count', 0) raw_results = data.get('raw_results') # 参数验证 if not name: return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400 history_id = save_redis_query_history( name, description, cluster1_config, cluster2_config, query_options, query_keys, results_summary, execution_time, total_keys, different_count, identical_count, missing_count, raw_results ) if history_id: return jsonify({'success': True, 'message': f'Redis查询历史记录保存成功', 'history_id': history_id}) else: return jsonify({'success': False, 'error': 'Redis查询历史记录保存失败'}), 500 except Exception as e: logger.error(f"保存Redis查询历史记录API失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/redis/query-history/', methods=['GET']) def api_get_redis_query_history_detail(history_id): """获取Redis查询历史记录详情""" history_record = get_redis_query_history_by_id(history_id) if history_record: return jsonify({'success': True, 'data': history_record}) else: return jsonify({'success': False, 'error': 'Redis查询历史记录不存在'}), 404 @app.route('/api/redis/query-history/', methods=['DELETE']) def api_delete_redis_query_history(history_id): """删除Redis查询历史记录""" success = delete_redis_query_history(history_id) if success: return jsonify({'success': True, 'message': 'Redis查询历史记录删除成功'}) else: return jsonify({'success': False, 'error': 'Redis查询历史记录删除失败'}), 500 @app.route('/api/redis/query-history/batch-delete', methods=['POST']) def api_batch_delete_redis_query_history(): """批量删除Redis查询历史记录""" try: data = request.get_json() if not data or 'history_ids' not in data: return jsonify({'success': False, 'error': '请提供要删除的历史记录ID列表'}), 400 history_ids = data['history_ids'] if not isinstance(history_ids, list): return jsonify({'success': False, 'error': '历史记录ID列表格式错误'}), 400 if not history_ids: return jsonify({'success': False, 'error': '没有要删除的记录'}), 400 # 验证ID都是整数 try: history_ids = [int(id) for id in history_ids] except (ValueError, TypeError): return jsonify({'success': False, 'error': '历史记录ID格式错误'}), 400 result = batch_delete_redis_query_history(history_ids) if result['success']: return jsonify(result) else: return jsonify(result), 500 except Exception as e: logger.error(f"批量删除Redis历史记录异常: {e}") return jsonify({'success': False, 'error': f'批量删除失败: {str(e)}'}), 500 # Redis查询日志API @app.route('/api/redis/query-logs', methods=['GET']) def api_get_redis_query_logs(): """获取Redis查询日志,支持分组显示""" try: limit = request.args.get('limit', 1000, type=int) grouped = request.args.get('grouped', 'true').lower() == 'true' # 默认分组显示 from_db = request.args.get('from_db', 'true').lower() == 'true' # 默认从数据库获取 if grouped: # 返回分组日志 grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db) # 过滤出Redis相关的日志组 redis_grouped_logs = [] for batch_id, logs in grouped_logs: # 过滤每个批次中的日志,只保留Redis相关的 redis_logs = [ log for log in logs if log.get('query_type', '').lower() == 'redis' or (log.get('message') and 'redis' in log.get('message', '').lower()) ] if redis_logs: # 只有当批次中有Redis日志时才添加 redis_grouped_logs.append([batch_id, redis_logs]) # 获取总数(用于统计) total_logs = sum(len(logs) for _, logs in redis_grouped_logs) return jsonify({ 'success': True, 'data': redis_grouped_logs, 'grouped': True, 'total_logs': total_logs, 'from_db': from_db }) else: # 返回平铺日志 logs = query_log_collector.get_logs(limit, from_db) # 过滤Redis相关的日志 redis_logs = [ log for log in logs if log.get('query_type', '').lower() == 'redis' or (log.get('message') and 'redis' in log.get('message', '').lower()) ] return jsonify({ 'success': True, 'data': redis_logs, 'grouped': False, 'total_logs': len(redis_logs), 'from_db': from_db }) except Exception as e: logger.error(f"获取Redis查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/redis/query-logs/history/', methods=['GET']) def api_get_redis_query_logs_by_history(history_id): """获取特定历史记录的Redis查询日志""" try: logs = query_log_collector.get_logs_by_history_id(history_id) return jsonify({'success': True, 'data': logs}) except Exception as e: logger.error(f"获取历史记录 {history_id} 的Redis查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/redis/query-logs', methods=['DELETE']) def api_clear_redis_query_logs(): """清空Redis查询日志""" try: query_log_collector.clear_logs() return jsonify({'success': True, 'message': 'Redis查询日志清空成功'}) except Exception as e: logger.error(f"清空Redis查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500