from flask import Flask, render_template, request, jsonify from cassandra.cluster import Cluster from cassandra.auth import PlainTextAuthProvider import json import os import logging import sqlite3 from datetime import datetime app = Flask(__name__) # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 数据库配置 DATABASE_PATH = 'config_groups.db' def init_database(): """初始化数据库""" try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() # 创建配置组表 cursor.execute(''' CREATE TABLE IF NOT EXISTS config_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, description TEXT, pro_config TEXT NOT NULL, test_config TEXT NOT NULL, query_config TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建查询历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS query_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, pro_config TEXT NOT NULL, test_config TEXT NOT NULL, query_config TEXT NOT NULL, query_keys TEXT NOT NULL, results_summary TEXT NOT NULL, execution_time REAL NOT NULL, total_keys INTEGER NOT NULL, differences_count INTEGER NOT NULL, identical_count INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() logger.info("数据库初始化完成") return True except Exception as e: logger.error(f"数据库初始化失败: {e}") return False def ensure_database(): """确保数据库和表存在""" if not os.path.exists(DATABASE_PATH): logger.info("数据库文件不存在,正在创建...") return init_database() # 检查表是否存在 try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history')") results = cursor.fetchall() existing_tables = [row[0] for row in results] if 'config_groups' not in existing_tables or 'query_history' not in existing_tables: logger.info("数据库表不完整,正在重新创建...") return init_database() return True except Exception as e: logger.error(f"检查数据库表失败: {e}") return init_database() def get_db_connection(): """获取数据库连接""" conn = sqlite3.connect(DATABASE_PATH) conn.row_factory = sqlite3.Row return conn def normalize_json_string(value): """标准化JSON字符串,用于比较""" if not isinstance(value, str): return value try: # 尝试解析JSON json_obj = json.loads(value) # 如果是数组,需要进行特殊处理 if isinstance(json_obj, list): # 尝试对数组元素进行标准化排序 normalized_array = normalize_json_array(json_obj) return json.dumps(normalized_array, sort_keys=True, separators=(',', ':')) else: # 普通对象,直接序列化 return json.dumps(json_obj, sort_keys=True, separators=(',', ':')) except (json.JSONDecodeError, TypeError): # 如果不是JSON,返回原值 return value def normalize_json_array(json_array): """标准化JSON数组,处理元素顺序问题""" try: normalized_elements = [] for element in json_array: if isinstance(element, dict): # 对字典元素进行标准化 normalized_elements.append(json.dumps(element, sort_keys=True, separators=(',', ':'))) elif isinstance(element, str): # 如果是字符串,尝试解析为JSON try: parsed_element = json.loads(element) normalized_elements.append(json.dumps(parsed_element, sort_keys=True, separators=(',', ':'))) except: normalized_elements.append(element) else: normalized_elements.append(element) # 对标准化后的元素进行排序,确保顺序一致 normalized_elements.sort() # 重新解析为对象数组 result_array = [] for element in normalized_elements: if isinstance(element, str): try: result_array.append(json.loads(element)) except: result_array.append(element) else: result_array.append(element) return result_array except Exception as e: logger.warning(f"数组标准化失败: {e}") return json_array def is_json_array_field(value): """检查字段是否为JSON数组格式""" if not isinstance(value, (str, list)): return False try: if isinstance(value, str): parsed = json.loads(value) return isinstance(parsed, list) elif isinstance(value, list): # 检查是否为JSON字符串数组 if len(value) > 0 and isinstance(value[0], str): try: json.loads(value[0]) return True except: return False return True except: return False def compare_array_values(value1, value2): """专门用于比较数组类型的值""" try: # 处理字符串表示的数组 if isinstance(value1, str) and isinstance(value2, str): try: array1 = json.loads(value1) array2 = json.loads(value2) if isinstance(array1, list) and isinstance(array2, list): return compare_json_arrays(array1, array2) except: pass # 处理Python列表类型 elif isinstance(value1, list) and isinstance(value2, list): return compare_json_arrays(value1, value2) # 处理混合情况:一个是字符串数组,一个是列表 elif isinstance(value1, list) and isinstance(value2, str): try: array2 = json.loads(value2) if isinstance(array2, list): return compare_json_arrays(value1, array2) except: pass elif isinstance(value1, str) and isinstance(value2, list): try: array1 = json.loads(value1) if isinstance(array1, list): return compare_json_arrays(array1, value2) except: pass return False except Exception as e: logger.warning(f"数组比较失败: {e}") return False def compare_json_arrays(array1, array2): """比较两个JSON数组,忽略元素顺序""" try: if len(array1) != len(array2): return False # 标准化两个数组 normalized_array1 = normalize_json_array(array1.copy()) normalized_array2 = normalize_json_array(array2.copy()) # 将标准化后的数组转换为可比较的格式 comparable1 = json.dumps(normalized_array1, sort_keys=True) comparable2 = json.dumps(normalized_array2, sort_keys=True) return comparable1 == comparable2 except Exception as e: logger.warning(f"JSON数组比较失败: {e}") return False def format_json_for_display(value): """格式化JSON用于显示""" if not isinstance(value, str): return str(value) try: # 尝试解析JSON json_obj = json.loads(value) # 格式化显示(带缩进) return json.dumps(json_obj, sort_keys=True, indent=2, ensure_ascii=False) except (json.JSONDecodeError, TypeError): # 如果不是JSON,返回原值 return str(value) def is_json_field(value): """检查字段是否为JSON格式""" if not isinstance(value, str): return False try: json.loads(value) return True except (json.JSONDecodeError, TypeError): return False def compare_values(value1, value2): """智能比较两个值,支持JSON标准化和数组比较""" # 首先检查是否为数组类型 if is_json_array_field(value1) or is_json_array_field(value2): return compare_array_values(value1, value2) # 如果两个值都是字符串,尝试JSON标准化比较 if isinstance(value1, str) and isinstance(value2, str): normalized_value1 = normalize_json_string(value1) normalized_value2 = normalize_json_string(value2) return normalized_value1 == normalized_value2 # 其他情况直接比较 return value1 == value2 # 默认配置(不显示敏感信息) DEFAULT_CONFIG = { 'pro_config': { 'cluster_name': '', 'hosts': [], 'port': 9042, 'datacenter': '', 'username': '', 'password': '', 'keyspace': '', 'table': '' }, 'test_config': { 'cluster_name': '', 'hosts': [], 'port': 9042, 'datacenter': '', 'username': '', 'password': '', 'keyspace': '', 'table': '' }, 'keys': ['docid'], 'fields_to_compare': [], 'exclude_fields': [] } def save_config_group(name, description, pro_config, test_config, query_config): """保存配置组""" if not ensure_database(): logger.error("数据库初始化失败") return False conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(''' INSERT OR REPLACE INTO config_groups (name, description, pro_config, test_config, query_config, updated_at) VALUES (?, ?, ?, ?, ?, ?) ''', ( name, description, json.dumps(pro_config), json.dumps(test_config), json.dumps(query_config), datetime.now().isoformat() )) conn.commit() logger.info(f"配置组 '{name}' 保存成功") return True except Exception as e: logger.error(f"保存配置组失败: {e}") return False finally: conn.close() def get_config_groups(): """获取所有配置组""" if not ensure_database(): logger.error("数据库初始化失败") return [] conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT id, name, description, created_at, updated_at FROM config_groups ORDER BY updated_at DESC ''') rows = cursor.fetchall() config_groups = [] for row in rows: config_groups.append({ 'id': row['id'], 'name': row['name'], 'description': row['description'], 'created_at': row['created_at'], 'updated_at': row['updated_at'] }) return config_groups except Exception as e: logger.error(f"获取配置组失败: {e}") return [] finally: conn.close() def get_config_group_by_id(group_id): """根据ID获取配置组详情""" if not ensure_database(): logger.error("数据库初始化失败") return None conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT * FROM config_groups WHERE id = ? ''', (group_id,)) row = cursor.fetchone() if row: return { 'id': row['id'], 'name': row['name'], 'description': row['description'], 'pro_config': json.loads(row['pro_config']), 'test_config': json.loads(row['test_config']), 'query_config': json.loads(row['query_config']), 'created_at': row['created_at'], 'updated_at': row['updated_at'] } return None except Exception as e: logger.error(f"获取配置组详情失败: {e}") return None finally: conn.close() def delete_config_group(group_id): """删除配置组""" if not ensure_database(): logger.error("数据库初始化失败") return False conn = get_db_connection() cursor = conn.cursor() try: cursor.execute('DELETE FROM config_groups WHERE id = ?', (group_id,)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"配置组ID {group_id} 删除成功") return success except Exception as e: logger.error(f"删除配置组失败: {e}") return False finally: conn.close() def save_query_history(name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, differences_count, identical_count): """保存查询历史记录""" if not ensure_database(): logger.error("数据库初始化失败") return False conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(''' INSERT INTO query_history (name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, differences_count, identical_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, description, json.dumps(pro_config), json.dumps(test_config), json.dumps(query_config), json.dumps(query_keys), json.dumps(results_summary), execution_time, total_keys, differences_count, identical_count )) conn.commit() logger.info(f"查询历史记录 '{name}' 保存成功") return True except Exception as e: logger.error(f"保存查询历史记录失败: {e}") return False finally: conn.close() def get_query_history(): """获取所有查询历史记录""" if not ensure_database(): logger.error("数据库初始化失败") return [] conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT id, name, description, execution_time, total_keys, differences_count, identical_count, created_at FROM query_history ORDER BY created_at DESC ''') rows = cursor.fetchall() history_list = [] for row in rows: history_list.append({ 'id': row['id'], 'name': row['name'], 'description': row['description'], 'execution_time': row['execution_time'], 'total_keys': row['total_keys'], 'differences_count': row['differences_count'], 'identical_count': row['identical_count'], 'created_at': row['created_at'] }) return history_list except Exception as e: logger.error(f"获取查询历史记录失败: {e}") return [] finally: conn.close() def get_query_history_by_id(history_id): """根据ID获取查询历史记录详情""" if not ensure_database(): logger.error("数据库初始化失败") return None conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT * FROM query_history WHERE id = ? ''', (history_id,)) row = cursor.fetchone() if row: return { 'id': row['id'], 'name': row['name'], 'description': row['description'], 'pro_config': json.loads(row['pro_config']), 'test_config': json.loads(row['test_config']), 'query_config': json.loads(row['query_config']), 'query_keys': json.loads(row['query_keys']), 'results_summary': json.loads(row['results_summary']), 'execution_time': row['execution_time'], 'total_keys': row['total_keys'], 'differences_count': row['differences_count'], 'identical_count': row['identical_count'], 'created_at': row['created_at'] } return None except Exception as e: logger.error(f"获取查询历史记录详情失败: {e}") return None finally: conn.close() def delete_query_history(history_id): """删除查询历史记录""" if not ensure_database(): logger.error("数据库初始化失败") return False conn = get_db_connection() cursor = conn.cursor() try: cursor.execute('DELETE FROM query_history WHERE id = ?', (history_id,)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"查询历史记录ID {history_id} 删除成功") return success except Exception as e: logger.error(f"删除查询历史记录失败: {e}") return False finally: conn.close() def create_connection(config): """创建Cassandra连接""" try: auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password']) cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider) session = cluster.connect(config['keyspace']) return cluster, session except Exception as e: return None, None def execute_query(session, table, keys, fields, values, exclude_fields=None): """执行查询""" try: # 构建查询条件 quoted_values = [f"'{value}'" for value in values] query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})" # 确定要查询的字段 if fields: fields_str = ", ".join(fields) else: fields_str = "*" query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};" result = session.execute(query_sql) return list(result) if result else [] except Exception as e: return [] def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values): """比较查询结果""" differences = [] field_diff_count = {} identical_results = [] # 存储相同的结果 for value in values: # 查找原表和测试表中该ID的相关数据 rows_pro = [row for row in pro_data if getattr(row, keys[0]) == value] rows_test = [row for row in test_data if getattr(row, keys[0]) == value] for row_pro in rows_pro: # 在测试表中查找相同主键的行 row_test = next( (row for row in rows_test if all(getattr(row, key) == getattr(row_pro, key) for key in keys)), None ) if row_test: # 确定要比较的列 columns = fields_to_compare if fields_to_compare else row_pro._fields columns = [col for col in columns if col not in exclude_fields] has_difference = False row_differences = [] identical_fields = {} for column in columns: value_pro = getattr(row_pro, column) value_test = getattr(row_test, column) # 使用智能比较函数 if not compare_values(value_pro, value_test): has_difference = True # 格式化显示值 formatted_pro_value = format_json_for_display(value_pro) formatted_test_value = format_json_for_display(value_test) row_differences.append({ 'key': {key: getattr(row_pro, key) for key in keys}, 'field': column, 'pro_value': formatted_pro_value, 'test_value': formatted_test_value, 'is_json': is_json_field(value_pro) or is_json_field(value_test), 'is_array': is_json_array_field(value_pro) or is_json_array_field(value_test) }) # 统计字段差异次数 field_diff_count[column] = field_diff_count.get(column, 0) + 1 else: # 存储相同的字段值 identical_fields[column] = format_json_for_display(value_pro) if has_difference: differences.extend(row_differences) else: # 如果没有差异,存储到相同结果中 identical_results.append({ 'key': {key: getattr(row_pro, key) for key in keys}, 'pro_fields': identical_fields, 'test_fields': {col: format_json_for_display(getattr(row_test, col)) for col in columns} }) else: # 在测试表中未找到对应行 differences.append({ 'key': {key: getattr(row_pro, key) for key in keys}, 'message': '在测试表中未找到该行' }) # 检查测试表中是否有生产表中不存在的行 for row_test in rows_test: row_pro = next( (row for row in rows_pro if all(getattr(row, key) == getattr(row_test, key) for key in keys)), None ) if not row_pro: differences.append({ 'key': {key: getattr(row_test, key) for key in keys}, 'message': '在生产表中未找到该行' }) return differences, field_diff_count, identical_results def generate_comparison_summary(total_keys, pro_count, test_count, differences, identical_results, field_diff_count): """生成比较总结报告""" # 计算基本统计 different_records = len(set([list(diff['key'].values())[0] for diff in differences if 'field' in diff])) identical_records = len(identical_results) missing_in_test = len([diff for diff in differences if diff.get('message') == '在测试表中未找到该行']) missing_in_pro = len([diff for diff in differences if diff.get('message') == '在生产表中未找到该行']) # 计算百分比 def safe_percentage(part, total): return round((part / total * 100), 2) if total > 0 else 0 identical_percentage = safe_percentage(identical_records, total_keys) different_percentage = safe_percentage(different_records, total_keys) # 生成总结 summary = { 'overview': { 'total_keys_queried': total_keys, 'pro_records_found': pro_count, 'test_records_found': test_count, 'identical_records': identical_records, 'different_records': different_records, 'missing_in_test': missing_in_test, 'missing_in_pro': missing_in_pro }, 'percentages': { 'data_consistency': identical_percentage, 'data_differences': different_percentage, 'missing_rate': safe_percentage(missing_in_test + missing_in_pro, total_keys) }, 'field_analysis': { 'total_fields_compared': len(field_diff_count) if field_diff_count else 0, 'most_different_fields': sorted(field_diff_count.items(), key=lambda x: x[1], reverse=True)[:5] if field_diff_count else [] }, 'data_quality': { 'completeness': safe_percentage(pro_count + test_count, total_keys * 2), 'consistency_score': identical_percentage, 'quality_level': get_quality_level(identical_percentage) }, 'recommendations': generate_recommendations(identical_percentage, missing_in_test, missing_in_pro, field_diff_count) } return summary def get_quality_level(consistency_percentage): """根据一致性百分比获取数据质量等级""" if consistency_percentage >= 95: return {'level': '优秀', 'color': 'success', 'description': '数据一致性非常高'} elif consistency_percentage >= 90: return {'level': '良好', 'color': 'info', 'description': '数据一致性较高'} elif consistency_percentage >= 80: return {'level': '一般', 'color': 'warning', 'description': '数据一致性中等,需要关注'} else: return {'level': '较差', 'color': 'danger', 'description': '数据一致性较低,需要重点处理'} def generate_recommendations(consistency_percentage, missing_in_test, missing_in_pro, field_diff_count): """生成改进建议""" recommendations = [] if consistency_percentage < 90: recommendations.append('建议重点关注数据一致性问题,检查数据同步机制') if missing_in_test > 0: recommendations.append(f'测试环境缺失 {missing_in_test} 条记录,建议检查数据迁移过程') if missing_in_pro > 0: recommendations.append(f'生产环境缺失 {missing_in_pro} 条记录,建议检查数据完整性') if field_diff_count: top_diff_field = max(field_diff_count.items(), key=lambda x: x[1]) recommendations.append(f'字段 "{top_diff_field[0]}" 差异最多({top_diff_field[1]}次),建议优先处理') if not recommendations: recommendations.append('数据质量良好,建议继续保持当前的数据管理流程') return recommendations @app.route('/') def index(): return render_template('index.html') @app.route('/db-compare') def db_compare(): return render_template('db_compare.html') @app.route('/api/query', methods=['POST']) def query_compare(): try: data = request.json logger.info("开始执行数据库比对查询") # 解析配置 pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) keys = data.get('keys', DEFAULT_CONFIG['keys']) fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) values = data.get('values', []) if not values: logger.warning("查询失败:未提供查询key值") return jsonify({'error': '请提供查询key值'}), 400 logger.info(f"查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") # 创建数据库连接 pro_cluster, pro_session = create_connection(pro_config) test_cluster, test_session = create_connection(test_config) if not pro_session or not test_session: logger.error("数据库连接失败") return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500 try: # 执行查询 logger.info("执行生产环境查询") pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) logger.info("执行测试环境查询") test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) logger.info(f"查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录") # 比较结果 differences, field_diff_count, identical_results = compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values) # 统计信息 different_ids = set() for diff in differences: if 'field' in diff: different_ids.add(list(diff['key'].values())[0]) non_different_ids = set(values) - different_ids # 生成比较总结 summary = generate_comparison_summary( len(values), len(pro_data), len(test_data), differences, identical_results, field_diff_count ) result = { 'total_keys': len(values), 'pro_count': len(pro_data), 'test_count': len(test_data), 'differences': differences, 'identical_results': identical_results, 'field_diff_count': field_diff_count, 'different_ids': list(different_ids), 'non_different_ids': list(non_different_ids), 'summary': summary, 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] } logger.info(f"比对完成:发现 {len(differences)} 处差异") # 自动保存查询历史记录(可选,基于执行结果) try: # 生成历史记录名称 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") history_name = f"查询_{timestamp}" history_description = f"自动保存 - 查询{len(values)}个Key,发现{len(differences)}处差异" # 保存历史记录 save_query_history( name=history_name, description=history_description, pro_config=pro_config, test_config=test_config, query_config={ 'keys': keys, 'fields_to_compare': fields_to_compare, 'exclude_fields': exclude_fields }, query_keys=values, results_summary=summary, execution_time=0.0, # 可以后续优化计算实际执行时间 total_keys=len(values), differences_count=len(differences), identical_count=len(identical_results) ) except Exception as e: logger.warning(f"保存查询历史记录失败: {e}") return jsonify(result) except Exception as e: logger.error(f"查询执行失败:{str(e)}") return jsonify({'error': f'查询执行失败:{str(e)}'}), 500 finally: # 关闭连接 if pro_cluster: pro_cluster.shutdown() if test_cluster: test_cluster.shutdown() except Exception as e: logger.error(f"请求处理失败:{str(e)}") return jsonify({'error': f'请求处理失败:{str(e)}'}), 500 @app.route('/api/default-config') def get_default_config(): return jsonify(DEFAULT_CONFIG) # 配置组管理API @app.route('/api/config-groups', methods=['GET']) def api_get_config_groups(): """获取所有配置组""" config_groups = get_config_groups() return jsonify({'success': True, 'data': config_groups}) @app.route('/api/config-groups', methods=['POST']) def api_save_config_group(): """保存配置组""" try: data = request.json name = data.get('name', '').strip() description = data.get('description', '').strip() pro_config = data.get('pro_config', {}) test_config = data.get('test_config', {}) query_config = { 'keys': data.get('keys', []), 'fields_to_compare': data.get('fields_to_compare', []), 'exclude_fields': data.get('exclude_fields', []) } if not name: return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400 success = save_config_group(name, description, pro_config, test_config, query_config) if success: return jsonify({'success': True, 'message': '配置组保存成功'}) else: return jsonify({'success': False, 'error': '配置组保存失败'}), 500 except Exception as e: logger.error(f"保存配置组API失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/config-groups/', methods=['GET']) def api_get_config_group(group_id): """获取指定配置组详情""" config_group = get_config_group_by_id(group_id) if config_group: return jsonify({'success': True, 'data': config_group}) else: return jsonify({'success': False, 'error': '配置组不存在'}), 404 @app.route('/api/config-groups/', methods=['DELETE']) def api_delete_config_group(group_id): """删除配置组""" success = delete_config_group(group_id) if success: return jsonify({'success': True, 'message': '配置组删除成功'}) else: return jsonify({'success': False, 'error': '配置组删除失败'}), 500 @app.route('/api/init-db', methods=['POST']) def api_init_database(): """手动初始化数据库(用于测试)""" success = init_database() if success: return jsonify({'success': True, 'message': '数据库初始化成功'}) else: return jsonify({'success': False, 'error': '数据库初始化失败'}), 500 # 查询历史管理API @app.route('/api/query-history', methods=['GET']) def api_get_query_history(): """获取所有查询历史记录""" history_list = get_query_history() return jsonify({'success': True, 'data': history_list}) @app.route('/api/query-history', methods=['POST']) def api_save_query_history(): """保存查询历史记录""" try: data = request.json name = data.get('name', '').strip() description = data.get('description', '').strip() pro_config = data.get('pro_config', {}) test_config = data.get('test_config', {}) query_config = data.get('query_config', {}) query_keys = data.get('query_keys', []) results_summary = data.get('results_summary', {}) execution_time = data.get('execution_time', 0.0) total_keys = data.get('total_keys', 0) differences_count = data.get('differences_count', 0) identical_count = data.get('identical_count', 0) if not name: return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400 success = save_query_history( name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, differences_count, identical_count ) if success: return jsonify({'success': True, 'message': '查询历史记录保存成功'}) else: return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500 except Exception as e: logger.error(f"保存查询历史记录API失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/query-history/', methods=['GET']) def api_get_query_history_detail(history_id): """获取指定查询历史记录详情""" history_record = get_query_history_by_id(history_id) if history_record: return jsonify({'success': True, 'data': history_record}) else: return jsonify({'success': False, 'error': '查询历史记录不存在'}), 404 @app.route('/api/query-history/', methods=['DELETE']) def api_delete_query_history(history_id): """删除查询历史记录""" success = delete_query_history(history_id) if success: return jsonify({'success': True, 'message': '查询历史记录删除成功'}) else: return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500 if __name__ == '__main__': app.run(debug=True, port=5000)