BigDataTool/modules/api_routes.py

"""
API路由模块
定义所有Flask路由和请求处理逻辑
"""
import logging
from datetime import datetime
from flask import jsonify, request, render_template, send_from_directory
# Project-internal modules
from .config_manager import (
DEFAULT_CONFIG, save_config_group, get_config_groups,
get_config_group_by_id, delete_config_group,
save_query_history, get_query_history,
get_query_history_by_id, delete_query_history,
batch_delete_query_history,
    # Redis configuration management
REDIS_DEFAULT_CONFIG, save_redis_config_group, get_redis_config_groups,
get_redis_config_group_by_id, delete_redis_config_group,
save_redis_query_history, get_redis_query_history,
get_redis_query_history_by_id, delete_redis_query_history,
batch_delete_redis_query_history,
parse_redis_config_from_yaml,
    convert_bytes_to_str  # bytes-to-str conversion helper
)
from .cassandra_client import create_connection
from .query_engine import execute_query, execute_mixed_query
from .data_comparison import compare_results, generate_comparison_summary
from .database import init_database
# Redis-related modules
from .redis_client import create_redis_client, test_redis_connection
from .redis_query import execute_redis_comparison
logger = logging.getLogger(__name__)
def setup_routes(app, query_log_collector):
"""设置所有路由需要传入app实例和query_log_collector"""
    # Page routes
@app.route('/')
def index():
return render_template('index.html')
@app.route('/test-config-load')
def test_config_load():
"""配置加载测试页面"""
return send_from_directory('.', 'test_config_load.html')
@app.route('/db-compare')
def db_compare():
return render_template('db_compare.html')
    # New: a more semantic route alias
@app.route('/cassandra-compare')
def cassandra_compare():
"""Cassandra数据比对工具 - 语义化路由"""
return render_template('db_compare.html')
@app.route('/redis-compare')
def redis_compare():
return render_template('redis_compare.html')
@app.route('/redis-js-test')
def redis_js_test():
return render_template('redis_js_test.html')
@app.route('/redis-test')
def redis_test():
return render_template('redis_test.html')
    # Core APIs
@app.route('/api/health')
def health_check():
"""健康检查端点用于Docker健康检查和服务监控"""
try:
            # Basic application status
            current_time = datetime.now().isoformat()
            # Lightweight database connectivity check (optional)
            from .database import ensure_database
            db_status = ensure_database()
return jsonify({
'status': 'healthy',
'timestamp': current_time,
'service': 'BigDataTool',
'version': '2.0',
'database': 'ok' if db_status else 'warning'
})
except Exception as e:
logger.error(f"健康检查失败: {str(e)}")
return jsonify({
'status': 'unhealthy',
'error': str(e),
'timestamp': datetime.now().isoformat()
}), 503
@app.route('/api/default-config')
def get_default_config():
return jsonify(DEFAULT_CONFIG)
@app.route('/api/init-db', methods=['POST'])
def api_init_database():
"""手动初始化数据库(用于测试)"""
success = init_database()
if success:
return jsonify({'success': True, 'message': '数据库初始化成功'})
else:
return jsonify({'success': False, 'error': '数据库初始化失败'}), 500
    # Sharded-table query API
@app.route('/api/sharding-query', methods=['POST'])
def sharding_query_compare():
"""分表查询比对API"""
try:
data = request.json
            # Start a new query batch
            batch_id = query_log_collector.start_new_batch('分表')
            logger.info("开始执行分表数据库比对查询")
            # Parse configs
pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
            # Pull keys and related parameters out of query_config
query_config = data.get('query_config', {})
keys = query_config.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
values = data.get('values', [])
sharding_config = data.get('sharding_config', {})
            # Parameter validation
            if not values:
                logger.warning("分表查询失败:未提供查询key值")
                return jsonify({'error': '请提供查询key值'}), 400
            if not keys:
                logger.warning("分表查询失败:未提供主键字段")
                return jsonify({'error': '请提供主键字段'}), 400
            # Verbose parameter logging
            logger.info("分表查询参数解析结果:")
logger.info(f" keys: {keys}")
logger.info(f" values数量: {len(values)}")
logger.info(f" fields_to_compare: {fields_to_compare}")
logger.info(f" exclude_fields: {exclude_fields}")
logger.info(f" sharding_config原始数据: {sharding_config}")
logger.info(f" sharding_config具体参数:")
logger.info(f" use_sharding_for_pro: {sharding_config.get('use_sharding_for_pro')}")
logger.info(f" use_sharding_for_test: {sharding_config.get('use_sharding_for_test')}")
logger.info(f" pro_interval_seconds: {sharding_config.get('pro_interval_seconds')}")
logger.info(f" pro_table_count: {sharding_config.get('pro_table_count')}")
logger.info(f" test_interval_seconds: {sharding_config.get('test_interval_seconds')}")
logger.info(f" test_table_count: {sharding_config.get('test_table_count')}")
logger.info(f" interval_seconds: {sharding_config.get('interval_seconds')}")
logger.info(f" table_count: {sharding_config.get('table_count')}")
logger.info(f"分表查询配置:{len(values)}个key值生产表{pro_config['table']},测试表:{test_config['table']}")
# 创建数据库连接
pro_cluster, pro_session = create_connection(pro_config)
test_cluster, test_session = create_connection(test_config)
if not pro_session or not test_session:
logger.error("数据库连接失败")
return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500
try:
                # Run the mixed query (supports combinations of sharded production and single/sharded test tables)
logger.info("执行分表混合查询")
query_results = execute_mixed_query(
pro_session, test_session, pro_config, test_config,
keys, fields_to_compare, values, exclude_fields, sharding_config
)
pro_data = query_results['pro_data']
test_data = query_results['test_data']
sharding_info = query_results['sharding_info']
logger.info(f"分表查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录")
                # Compare results
differences, field_diff_count, identical_results = compare_results(
pro_data, test_data, keys, fields_to_compare, exclude_fields, values
)
                # Statistics
different_ids = set()
for diff in differences:
if 'field' in diff:
different_ids.add(list(diff['key'].values())[0])
non_different_ids = set(values) - different_ids
                # Build the comparison summary
summary = generate_comparison_summary(
len(values), len(pro_data), len(test_data),
differences, identical_results, field_diff_count
)
result = {
'total_keys': len(values),
'pro_count': len(pro_data),
'test_count': len(test_data),
'differences': differences,
'identical_results': identical_results,
'field_diff_count': field_diff_count,
'different_ids': list(different_ids),
'non_different_ids': list(non_different_ids),
'summary': summary,
                    'sharding_info': sharding_info,  # sharding query metadata
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
}
logger.info(f"分表比对完成:发现 {len(differences)} 处差异")
                # Auto-save the sharded-query history record
                try:
                    # Build the history record name
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    history_name = f"分表查询_{timestamp}"
                    history_description = f"自动保存 - 分表查询{len(values)}个Key,发现{len(differences)}处差异"
                    # Save the history record
history_id = save_query_history(
name=history_name,
description=history_description,
pro_config=pro_config,
test_config=test_config,
query_config={
'keys': keys,
'fields_to_compare': fields_to_compare,
'exclude_fields': exclude_fields
},
query_keys=values,
results_summary=summary,
                        execution_time=0.0,  # TODO: measure and pass the real execution time
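                        # (A minimal sketch for that TODO, using only the standard
                        # library: record t0 = time.perf_counter() before the query
                        # block and pass time.perf_counter() - t0 here instead of 0.0.)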
total_keys=len(values),
differences_count=len(differences),
identical_count=len(identical_results),
                        # Sharding-specific parameters
sharding_config=sharding_config,
query_type='sharding',
                        # Attach the query result data
raw_results={
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [],
                            'sharding_info': sharding_info  # sharding metadata
},
differences_data=differences,
identical_data=identical_results
)
                    # Link the query logs to the history record
                    if history_id:
                        query_log_collector.set_history_id(history_id)
                        logger.info(f"分表查询历史记录保存成功: {history_name}, ID: {history_id}")
                    else:
                        logger.warning("分表查询历史记录保存失败,无法获取history_id")
except Exception as e:
logger.warning(f"保存分表查询历史记录失败: {e}")
                # End the query batch
                query_log_collector.end_current_batch()
                # Convert any bytes values contained in result
                result = convert_bytes_to_str(result)
return jsonify(result)
except Exception as e:
logger.error(f"分表查询执行失败:{str(e)}")
                # End the query batch (error path)
query_log_collector.end_current_batch()
return jsonify({'error': f'分表查询执行失败:{str(e)}'}), 500
finally:
                # Close connections
if pro_cluster:
pro_cluster.shutdown()
if test_cluster:
test_cluster.shutdown()
except Exception as e:
logger.error(f"分表查询请求处理失败:{str(e)}")
            # End the query batch (request-handling error)
query_log_collector.end_current_batch()
return jsonify({'error': f'分表查询请求处理失败:{str(e)}'}), 500
    # Single-table query API
@app.route('/api/query', methods=['POST'])
def query_compare():
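        """Single-table query and comparison API."""
        # Illustrative request payload (same shape as /api/sharding-query minus
        # sharding_config; values are hypothetical):
        # {
        #     "pro_config": {...},
        #     "test_config": {...},
        #     "query_config": {"keys": ["id"], "fields_to_compare": [], "exclude_fields": []},
        #     "values": ["key1", "key2"]
        # }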
try:
data = request.json
            # Start a new query batch
            batch_id = query_log_collector.start_new_batch('单表')
            logger.info("开始执行数据库比对查询")
            # Parse configs
pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
            # Pull keys and related parameters out of query_config
query_config = data.get('query_config', {})
keys = query_config.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
values = data.get('values', [])
            # Parameter validation
            if not values:
                logger.warning("查询失败:未提供查询key值")
                return jsonify({'error': '请提供查询key值'}), 400
            if not keys:
                logger.warning("查询失败:未提供主键字段")
                return jsonify({'error': '请提供主键字段'}), 400
            # Verbose parameter logging
            logger.info("单表查询参数解析结果:")
logger.info(f" keys: {keys}")
logger.info(f" values数量: {len(values)}")
logger.info(f" fields_to_compare: {fields_to_compare}")
logger.info(f" exclude_fields: {exclude_fields}")
logger.info(f"查询配置:{len(values)}个key值生产表{pro_config['table']},测试表:{test_config['table']}")
# 创建数据库连接
pro_cluster, pro_session = create_connection(pro_config)
test_cluster, test_session = create_connection(test_config)
if not pro_session or not test_session:
logger.error("数据库连接失败")
return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500
try:
                # Run the queries
logger.info("执行生产环境查询")
pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
logger.info("执行测试环境查询")
test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
logger.info(f"查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录")
                # Compare results
differences, field_diff_count, identical_results = compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values)
                # Statistics
different_ids = set()
for diff in differences:
if 'field' in diff:
different_ids.add(list(diff['key'].values())[0])
non_different_ids = set(values) - different_ids
                # Build the comparison summary
summary = generate_comparison_summary(
len(values), len(pro_data), len(test_data),
differences, identical_results, field_diff_count
)
result = {
'total_keys': len(values),
'pro_count': len(pro_data),
'test_count': len(test_data),
'differences': differences,
'identical_results': identical_results,
'field_diff_count': field_diff_count,
'different_ids': list(different_ids),
'non_different_ids': list(non_different_ids),
'summary': summary,
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
}
logger.info(f"比对完成:发现 {len(differences)} 处差异")
                # Auto-save the query history record (optional, based on the execution results)
                try:
                    # Build the history record name
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    history_name = f"查询_{timestamp}"
                    history_description = f"自动保存 - 查询{len(values)}个Key,发现{len(differences)}处差异"
                    # Save the history record
history_id = save_query_history(
name=history_name,
description=history_description,
pro_config=pro_config,
test_config=test_config,
query_config={
'keys': keys,
'fields_to_compare': fields_to_compare,
'exclude_fields': exclude_fields
},
query_keys=values,
results_summary=summary,
                        execution_time=0.0,  # TODO: measure and pass the real execution time
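                        # (Same sketch as in the sharding route: time.perf_counter()
                        # around the query block, elapsed seconds passed here.)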
total_keys=len(values),
differences_count=len(differences),
identical_count=len(identical_results),
                        # Attach the query result data
raw_results={
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
},
differences_data=differences,
identical_data=identical_results
)
                    # Link the query logs to the history record
                    if history_id:
                        query_log_collector.set_history_id(history_id)
                        logger.info(f"查询历史记录保存成功: {history_name}, ID: {history_id}")
                    else:
                        logger.warning("查询历史记录保存失败,无法获取history_id")
except Exception as e:
logger.warning(f"保存查询历史记录失败: {e}")
                # End the query batch
                query_log_collector.end_current_batch()
                # Convert any bytes values contained in result
                result = convert_bytes_to_str(result)
return jsonify(result)
except Exception as e:
logger.error(f"查询执行失败:{str(e)}")
                # End the query batch (error path)
query_log_collector.end_current_batch()
return jsonify({'error': f'查询执行失败:{str(e)}'}), 500
finally:
                # Close connections
if pro_cluster:
pro_cluster.shutdown()
if test_cluster:
test_cluster.shutdown()
except Exception as e:
logger.error(f"请求处理失败:{str(e)}")
            # End the query batch (request-handling error)
query_log_collector.end_current_batch()
return jsonify({'error': f'请求处理失败:{str(e)}'}), 500
    # Config-group management API
@app.route('/api/config-groups', methods=['GET'])
def api_get_config_groups():
"""获取所有配置组"""
config_groups = get_config_groups()
return jsonify({'success': True, 'data': config_groups})
@app.route('/api/config-groups', methods=['POST'])
def api_save_config_group():
"""保存配置组"""
try:
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
pro_config = data.get('pro_config', {})
test_config = data.get('test_config', {})
            # Accept the query config in either of two formats
            if 'query_config' in data:
                # Nested format
                query_config = data.get('query_config', {})
            else:
                # Flat format
                query_config = {
                    'keys': data.get('keys', []),
                    'fields_to_compare': data.get('fields_to_compare', []),
                    'exclude_fields': data.get('exclude_fields', [])
                }
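            # Both payload shapes are accepted above (illustrative values):
            #   nested: {"name": "...", "query_config": {"keys": ["id"], ...}}
            #   flat:   {"name": "...", "keys": ["id"], "fields_to_compare": [], "exclude_fields": []}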
            # Extract the sharding config
sharding_config = data.get('sharding_config')
if not name:
return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400
success = save_config_group(name, description, pro_config, test_config, query_config, sharding_config)
if success:
return jsonify({'success': True, 'message': '配置组保存成功'})
else:
return jsonify({'success': False, 'error': '配置组保存失败'}), 500
except Exception as e:
logger.error(f"保存配置组API失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/config-groups/<int:group_id>', methods=['GET'])
def api_get_config_group(group_id):
"""获取指定配置组详情"""
config_group = get_config_group_by_id(group_id)
if config_group:
return jsonify({'success': True, 'data': config_group})
else:
return jsonify({'success': False, 'error': '配置组不存在'}), 404
@app.route('/api/config-groups/<int:group_id>', methods=['DELETE'])
def api_delete_config_group(group_id):
"""删除配置组"""
success = delete_config_group(group_id)
if success:
return jsonify({'success': True, 'message': '配置组删除成功'})
else:
return jsonify({'success': False, 'error': '配置组删除失败'}), 500
    # Query-history management API
@app.route('/api/query-history', methods=['GET'])
def api_get_query_history():
"""获取所有查询历史记录"""
history_list = get_query_history()
return jsonify({'success': True, 'data': history_list})
@app.route('/api/query-history', methods=['POST'])
def api_save_query_history():
"""保存查询历史记录,支持分表查询"""
try:
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
pro_config = data.get('pro_config', {})
test_config = data.get('test_config', {})
query_config = data.get('query_config', {})
query_keys = data.get('query_keys', [])
results_summary = data.get('results_summary', {})
execution_time = data.get('execution_time', 0.0)
total_keys = data.get('total_keys', 0)
differences_count = data.get('differences_count', 0)
identical_count = data.get('identical_count', 0)
            # Sharding-related fields
sharding_config = data.get('sharding_config')
query_type = data.get('query_type', 'single')
if not name:
return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400
success = save_query_history(
name, description, pro_config, test_config, query_config,
query_keys, results_summary, execution_time, total_keys,
differences_count, identical_count, sharding_config, query_type
)
if success:
query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询'
return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'})
else:
return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500
except Exception as e:
logger.error(f"保存查询历史记录API失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-history/<int:history_id>', methods=['GET'])
def api_get_query_history_detail(history_id):
"""获取指定查询历史记录详情"""
history_record = get_query_history_by_id(history_id)
if history_record:
return jsonify({'success': True, 'data': history_record})
else:
return jsonify({'success': False, 'error': '查询历史记录不存在'}), 404
@app.route('/api/query-history/<int:history_id>/results', methods=['GET'])
def api_get_query_history_results(history_id):
"""获取查询历史记录的完整结果数据"""
try:
history_record = get_query_history_by_id(history_id)
if not history_record:
return jsonify({'success': False, 'error': '历史记录不存在'}), 404
            # Safely extract the raw_results data
raw_results = history_record.get('raw_results')
if raw_results and isinstance(raw_results, dict):
raw_pro_data = raw_results.get('raw_pro_data', []) or []
raw_test_data = raw_results.get('raw_test_data', []) or []
sharding_info = raw_results.get('sharding_info') if history_record.get('query_type') == 'sharding' else None
else:
raw_pro_data = []
raw_test_data = []
sharding_info = None
            # Safely extract the differences and identical-results data
differences_data = history_record.get('differences_data') or []
identical_data = history_record.get('identical_data') or []
            # Build the full query-result structure, consistent with the live API format
result = {
'total_keys': history_record['total_keys'],
'pro_count': len(raw_pro_data),
'test_count': len(raw_test_data),
'differences': differences_data,
'identical_results': identical_data,
                'field_diff_count': {},  # recomputed from differences_data below
'summary': history_record.get('results_summary', {}),
'raw_pro_data': raw_pro_data,
'raw_test_data': raw_test_data,
                # For sharded queries, include the sharding info
'sharding_info': sharding_info,
                # History-record metadata
'history_info': {
'id': history_record['id'],
'name': history_record['name'],
'description': history_record['description'],
'created_at': history_record['created_at'],
'query_type': history_record.get('query_type', 'single')
}
}
            # Recompute field_diff_count
if differences_data:
field_diff_count = {}
for diff in differences_data:
if isinstance(diff, dict) and 'field' in diff:
field_name = diff['field']
field_diff_count[field_name] = field_diff_count.get(field_name, 0) + 1
result['field_diff_count'] = field_diff_count
return jsonify({
'success': True,
'data': result,
'message': f'历史记录 "{history_record["name"]}" 结果加载成功'
})
except Exception as e:
logger.error(f"获取查询历史记录结果失败: {e}")
return jsonify({'success': False, 'error': f'获取历史记录结果失败: {str(e)}'}), 500
@app.route('/api/query-history/<int:history_id>', methods=['DELETE'])
def api_delete_query_history(history_id):
"""删除查询历史记录"""
success = delete_query_history(history_id)
if success:
return jsonify({'success': True, 'message': '查询历史记录删除成功'})
else:
return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500
@app.route('/api/query-history/batch-delete', methods=['POST'])
def api_batch_delete_query_history():
"""批量删除Cassandra查询历史记录"""
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '请求数据格式错误'}), 400
history_ids = data.get('history_ids', [])
            # Validate parameters
if not history_ids:
return jsonify({'success': False, 'error': '请提供要删除的历史记录ID列表'}), 400
if not isinstance(history_ids, list):
return jsonify({'success': False, 'error': 'history_ids必须是数组'}), 400
            # Ensure every ID is an integer
try:
history_ids = [int(id) for id in history_ids]
except (ValueError, TypeError):
return jsonify({'success': False, 'error': '历史记录ID必须是整数'}), 400
            # Delegate to the batch-delete helper
result = batch_delete_query_history(history_ids)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 500
except Exception as e:
logger.error(f"批量删除Cassandra查询历史记录异常: {e}")
return jsonify({'success': False, 'error': f'服务器内部错误: {str(e)}'}), 500
    # Query-log management API
@app.route('/api/query-logs', methods=['GET'])
def api_get_query_logs():
"""获取查询日志,支持分组显示和数据库存储"""
try:
limit = request.args.get('limit', type=int)
            grouped = request.args.get('grouped', 'true').lower() == 'true'  # grouped view by default
            from_db = request.args.get('from_db', 'true').lower() == 'true'  # read from the database by default
if grouped:
                # Return logs grouped by batch
grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db)
                # Total count (for statistics)
total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs)
return jsonify({
'success': True,
'data': grouped_logs,
'total': total_logs,
'grouped': True,
'from_db': from_db
})
else:
                # Return the flat log list
logs = query_log_collector.get_logs(limit, from_db)
total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs)
return jsonify({
'success': True,
'data': logs,
'total': total_logs,
'grouped': False,
'from_db': from_db
})
except Exception as e:
logger.error(f"获取查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs', methods=['DELETE'])
def api_clear_query_logs():
"""清空查询日志,支持清空数据库日志"""
try:
            clear_db = request.args.get('clear_db', 'true').lower() == 'true'  # clear the database by default
query_log_collector.clear_logs(clear_db)
message = '查询日志已清空(包括数据库)' if clear_db else '查询日志已清空(仅内存)'
return jsonify({'success': True, 'message': message})
except Exception as e:
logger.error(f"清空查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs/cleanup', methods=['POST'])
def api_cleanup_old_logs():
"""清理旧的查询日志"""
try:
days_to_keep = request.json.get('days_to_keep', 30) if request.json else 30
deleted_count = query_log_collector.cleanup_old_logs(days_to_keep)
return jsonify({
'success': True,
'message': f'成功清理 {deleted_count} 条超过 {days_to_keep} 天的旧日志',
'deleted_count': deleted_count
})
except Exception as e:
logger.error(f"清理旧日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs/history/<int:history_id>', methods=['GET'])
def api_get_query_logs_by_history(history_id):
"""根据历史记录ID获取相关查询日志"""
try:
logs = query_log_collector.get_logs_by_history_id(history_id)
            # Group by batch
grouped_logs = {}
batch_order = []
for log in logs:
batch_id = log.get('batch_id', 'unknown')
if batch_id not in grouped_logs:
grouped_logs[batch_id] = []
batch_order.append(batch_id)
grouped_logs[batch_id].append(log)
            # Return batches in chronological order
grouped_result = [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]
return jsonify({
'success': True,
'data': grouped_result,
'total': len(logs),
'history_id': history_id,
'grouped': True
})
except Exception as e:
logger.error(f"获取历史记录相关查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
    # Redis APIs
@app.route('/api/redis/test-connection', methods=['POST'])
def api_test_redis_connection():
"""测试Redis连接"""
try:
data = request.json
cluster_config = data.get('cluster_config', {})
cluster_name = data.get('cluster_name', 'Redis集群')
            # Validate the config
if not cluster_config.get('nodes'):
return jsonify({'success': False, 'error': '未配置Redis节点'}), 400
            # Run the connection test
result = test_redis_connection(cluster_config, cluster_name)
if result['success']:
return jsonify({
'success': True,
'message': f'{cluster_name}连接成功',
'data': {
'connection_time': result['connection_time'],
'cluster_info': result['cluster_info']
}
})
else:
return jsonify({
'success': False,
'error': result['error'],
'connection_time': result['connection_time']
}), 500
except Exception as e:
logger.error(f"Redis连接测试失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/redis/compare', methods=['POST'])
def api_redis_compare():
"""Redis数据比较API"""
try:
data = request.json
            # Start a new query batch
            batch_id = query_log_collector.start_new_batch('Redis')
            logger.info("开始执行Redis数据比较")
            # Parse configs
cluster1_config = data.get('cluster1_config', {})
cluster2_config = data.get('cluster2_config', {})
query_options = data.get('query_options', {})
            # Parameter validation
            if not cluster1_config.get('nodes'):
                logger.warning("Redis比较失败:未配置第一个Redis集群")
                return jsonify({'error': '请配置第一个Redis集群'}), 400
            if not cluster2_config.get('nodes'):
                logger.warning("Redis比较失败:未配置第二个Redis集群")
                return jsonify({'error': '请配置第二个Redis集群'}), 400
            # Verbose parameter logging
            logger.info("Redis比较参数解析结果:")
logger.info(f" 集群1: {cluster1_config.get('name', '集群1')}")
logger.info(f" 集群2: {cluster2_config.get('name', '集群2')}")
logger.info(f" 查询模式: {query_options.get('mode', 'random')}")
if query_options.get('mode') == 'random':
logger.info(f" 随机查询数量: {query_options.get('count', 100)}")
logger.info(f" Key模式: {query_options.get('pattern', '*')}")
else:
logger.info(f" 指定Key数量: {len(query_options.get('keys', []))}")
            # Run the Redis comparison
logger.info("执行Redis数据比较")
result = execute_redis_comparison(cluster1_config, cluster2_config, query_options)
if 'error' in result:
logger.error(f"Redis比较失败: {result['error']}")
query_log_collector.end_current_batch()
return jsonify({'error': result['error']}), 500
logger.info(f"Redis比较完成")
logger.info(f"比较统计: 总计{result['stats']['total_keys']}个key相同{result['stats']['identical_count']}个,不同{result['stats']['different_count']}")
# 增强结果,添加原生数据信息
enhanced_result = result.copy()
enhanced_result['raw_data'] = {
'cluster1_data': [],
'cluster2_data': []
}
            # Extract raw data entries from the comparison result
for item in result.get('identical_results', []):
if 'key' in item and 'value' in item:
enhanced_result['raw_data']['cluster1_data'].append({
'key': item['key'],
'value': item['value'],
'type': 'identical'
})
enhanced_result['raw_data']['cluster2_data'].append({
'key': item['key'],
'value': item['value'],
'type': 'identical'
})
for item in result.get('different_results', []):
if 'key' in item:
if 'cluster1_value' in item:
enhanced_result['raw_data']['cluster1_data'].append({
'key': item['key'],
'value': item['cluster1_value'],
'type': 'different'
})
if 'cluster2_value' in item:
enhanced_result['raw_data']['cluster2_data'].append({
'key': item['key'],
'value': item['cluster2_value'],
'type': 'different'
})
for item in result.get('missing_results', []):
if 'key' in item:
if 'cluster1_value' in item and item['cluster1_value'] is not None:
enhanced_result['raw_data']['cluster1_data'].append({
'key': item['key'],
'value': item['cluster1_value'],
'type': 'missing'
})
if 'cluster2_value' in item and item['cluster2_value'] is not None:
enhanced_result['raw_data']['cluster2_data'].append({
'key': item['key'],
'value': item['cluster2_value'],
'type': 'missing'
})
logger.info(f"原生数据统计: 集群1={len(enhanced_result['raw_data']['cluster1_data'])}条, 集群2={len(enhanced_result['raw_data']['cluster2_data'])}")
# 使用增强结果进行后续处理
result = enhanced_result
            # Auto-save the Redis query-history record
            try:
                # Build the history record name
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                history_name = f"Redis比较_{timestamp}"
                history_description = f"自动保存 - Redis比较{result['stats']['total_keys']}个Key,发现{result['stats']['different_count']}处差异"
                # Work out the list of queried keys
query_keys = []
if query_options.get('mode') == 'specified':
query_keys = query_options.get('keys', [])
elif query_options.get('mode') == 'random':
                    # Random mode: collect the keys actually queried from the results
for item in result.get('identical_results', []):
if 'key' in item:
query_keys.append(item['key'])
for item in result.get('different_results', []):
if 'key' in item:
query_keys.append(item['key'])
for item in result.get('missing_results', []):
if 'key' in item:
query_keys.append(item['key'])
                # Save the Redis query-history record
history_id = save_redis_query_history(
name=history_name,
description=history_description,
cluster1_config=cluster1_config,
cluster2_config=cluster2_config,
query_options=query_options,
query_keys=query_keys,
results_summary=result['stats'],
execution_time=result['performance_report']['total_time'],
total_keys=result['stats']['total_keys'],
different_count=result['stats']['different_count'],
identical_count=result['stats']['identical_count'],
missing_count=result['stats']['missing_in_cluster1'] + result['stats']['missing_in_cluster2'] + result['stats']['both_missing'],
raw_results={
'identical_results': result['identical_results'],
'different_results': result['different_results'],
'missing_results': result['missing_results'],
'performance_report': result['performance_report']
}
)
                # Link the query logs to the history record
                if history_id:
                    query_log_collector.set_history_id(history_id)
                    logger.info(f"Redis查询历史记录保存成功: {history_name}, ID: {history_id}")
                else:
                    logger.warning("Redis查询历史记录保存失败,无法获取history_id")
except Exception as e:
logger.warning(f"保存Redis查询历史记录失败: {e}")
import traceback
logger.error(f"详细错误信息: {traceback.format_exc()}")
            # End the query batch
query_log_collector.end_current_batch()
return jsonify(result)
        except Exception as e:
            logger.error(f"Redis比较请求处理失败:{str(e)}")
            # End the query batch (request-handling error)
            query_log_collector.end_current_batch()
            return jsonify({'error': f'Redis比较请求处理失败:{str(e)}'}), 500
@app.route('/api/redis/default-config')
def get_redis_default_config():
"""获取Redis默认配置"""
default_redis_config = {
'cluster1_config': {
'name': '生产集群',
'nodes': [
{'host': '127.0.0.1', 'port': 7000}
],
'password': '',
'socket_timeout': 3,
'socket_connect_timeout': 3,
'max_connections_per_node': 16
},
'cluster2_config': {
'name': '测试集群',
'nodes': [
{'host': '127.0.0.1', 'port': 7001}
],
'password': '',
'socket_timeout': 3,
'socket_connect_timeout': 3,
'max_connections_per_node': 16
},
'query_options': {
'mode': 'random',
'count': 100,
'pattern': '*',
'source_cluster': 'cluster2',
'keys': []
}
}
return jsonify(default_redis_config)
    # Redis config management API
@app.route('/api/redis/config-groups', methods=['GET'])
def api_get_redis_config_groups():
"""获取所有Redis配置组"""
config_groups = get_redis_config_groups()
return jsonify({'success': True, 'data': config_groups})
@app.route('/api/redis/config-groups', methods=['POST'])
def api_save_redis_config_group():
"""保存Redis配置组"""
try:
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
cluster1_config = data.get('cluster1_config', {})
cluster2_config = data.get('cluster2_config', {})
query_options = data.get('query_options', {})
            # Parameter validation
if not name:
return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400
if not cluster1_config or not cluster1_config.get('nodes'):
return jsonify({'success': False, 'error': '请配置集群1信息'}), 400
if not cluster2_config or not cluster2_config.get('nodes'):
return jsonify({'success': False, 'error': '请配置集群2信息'}), 400
success = save_redis_config_group(name, description, cluster1_config, cluster2_config, query_options)
if success:
return jsonify({'success': True, 'message': f'Redis配置组 "{name}" 保存成功'})
else:
return jsonify({'success': False, 'error': 'Redis配置组保存失败'}), 500
except Exception as e:
logger.error(f"保存Redis配置组API失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/redis/config-groups/<int:group_id>', methods=['GET'])
def api_get_redis_config_group_detail(group_id):
"""获取Redis配置组详情"""
config_group = get_redis_config_group_by_id(group_id)
if config_group:
return jsonify({'success': True, 'data': config_group})
else:
return jsonify({'success': False, 'error': 'Redis配置组不存在'}), 404
@app.route('/api/redis/config-groups/<int:group_id>', methods=['DELETE'])
def api_delete_redis_config_group(group_id):
"""删除Redis配置组"""
success = delete_redis_config_group(group_id)
if success:
return jsonify({'success': True, 'message': 'Redis配置组删除成功'})
else:
return jsonify({'success': False, 'error': 'Redis配置组删除失败'}), 500
@app.route('/api/redis/import-config', methods=['POST'])
def api_import_redis_config():
"""一键导入Redis配置"""
try:
data = request.json
config_text = data.get('config_text', '').strip()
if not config_text:
return jsonify({'success': False, 'error': '配置内容不能为空'}), 400
            # Parse the config
redis_config = parse_redis_config_from_yaml(config_text)
if not redis_config:
return jsonify({'success': False, 'error': '配置格式解析失败,请检查配置内容'}), 400
            # Validate required fields
if not redis_config.get('nodes'):
return jsonify({'success': False, 'error': '未找到有效的集群地址配置'}), 400
return jsonify({
'success': True,
'data': redis_config,
'message': '配置导入成功'
})
except Exception as e:
logger.error(f"导入Redis配置失败: {e}")
return jsonify({'success': False, 'error': f'导入配置失败: {str(e)}'}), 500
    # Redis query-history API
@app.route('/api/redis/query-history', methods=['GET'])
def api_get_redis_query_history():
"""获取Redis查询历史记录"""
history_list = get_redis_query_history()
return jsonify({'success': True, 'data': history_list})
@app.route('/api/redis/query-history', methods=['POST'])
def api_save_redis_query_history():
"""保存Redis查询历史记录"""
try:
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
cluster1_config = data.get('cluster1_config', {})
cluster2_config = data.get('cluster2_config', {})
query_options = data.get('query_options', {})
query_keys = data.get('query_keys', [])
results_summary = data.get('results_summary', {})
execution_time = data.get('execution_time', 0)
total_keys = data.get('total_keys', 0)
different_count = data.get('different_count', 0)
identical_count = data.get('identical_count', 0)
missing_count = data.get('missing_count', 0)
raw_results = data.get('raw_results')
            # Parameter validation
if not name:
return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400
history_id = save_redis_query_history(
name, description, cluster1_config, cluster2_config, query_options,
query_keys, results_summary, execution_time, total_keys,
different_count, identical_count, missing_count, raw_results
)
if history_id:
                return jsonify({'success': True, 'message': 'Redis查询历史记录保存成功', 'history_id': history_id})
else:
return jsonify({'success': False, 'error': 'Redis查询历史记录保存失败'}), 500
except Exception as e:
logger.error(f"保存Redis查询历史记录API失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/redis/query-history/<int:history_id>', methods=['GET'])
def api_get_redis_query_history_detail(history_id):
"""获取Redis查询历史记录详情"""
history_record = get_redis_query_history_by_id(history_id)
if history_record:
return jsonify({'success': True, 'data': history_record})
else:
return jsonify({'success': False, 'error': 'Redis查询历史记录不存在'}), 404
@app.route('/api/redis/query-history/<int:history_id>', methods=['DELETE'])
def api_delete_redis_query_history(history_id):
"""删除Redis查询历史记录"""
success = delete_redis_query_history(history_id)
if success:
return jsonify({'success': True, 'message': 'Redis查询历史记录删除成功'})
else:
return jsonify({'success': False, 'error': 'Redis查询历史记录删除失败'}), 500
@app.route('/api/redis/query-history/batch-delete', methods=['POST'])
def api_batch_delete_redis_query_history():
"""批量删除Redis查询历史记录"""
try:
data = request.get_json()
if not data or 'history_ids' not in data:
return jsonify({'success': False, 'error': '请提供要删除的历史记录ID列表'}), 400
history_ids = data['history_ids']
if not isinstance(history_ids, list):
return jsonify({'success': False, 'error': '历史记录ID列表格式错误'}), 400
if not history_ids:
return jsonify({'success': False, 'error': '没有要删除的记录'}), 400
            # Ensure every ID is an integer
try:
history_ids = [int(id) for id in history_ids]
except (ValueError, TypeError):
return jsonify({'success': False, 'error': '历史记录ID格式错误'}), 400
result = batch_delete_redis_query_history(history_ids)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 500
except Exception as e:
logger.error(f"批量删除Redis历史记录异常: {e}")
return jsonify({'success': False, 'error': f'批量删除失败: {str(e)}'}), 500
    # Redis query-log API
@app.route('/api/redis/query-logs', methods=['GET'])
def api_get_redis_query_logs():
"""获取Redis查询日志支持分组显示"""
try:
limit = request.args.get('limit', 1000, type=int)
            grouped = request.args.get('grouped', 'true').lower() == 'true'  # grouped view by default
            from_db = request.args.get('from_db', 'true').lower() == 'true'  # read from the database by default
if grouped:
                # Return logs grouped by batch
grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db)
                # Keep only Redis-related log groups
redis_grouped_logs = []
for batch_id, logs in grouped_logs:
                    # Within each batch, keep only Redis-related entries
redis_logs = [
log for log in logs
if log.get('query_type', '').lower() == 'redis' or
(log.get('message') and 'redis' in log.get('message', '').lower())
]
                    if redis_logs:  # keep the batch only if it contains Redis logs
redis_grouped_logs.append([batch_id, redis_logs])
                # Total count (for statistics)
total_logs = sum(len(logs) for _, logs in redis_grouped_logs)
return jsonify({
'success': True,
'data': redis_grouped_logs,
'grouped': True,
'total_logs': total_logs,
'from_db': from_db
})
else:
                # Return the flat log list
logs = query_log_collector.get_logs(limit, from_db)
                # Keep only Redis-related entries
redis_logs = [
log for log in logs
if log.get('query_type', '').lower() == 'redis' or
(log.get('message') and 'redis' in log.get('message', '').lower())
]
return jsonify({
'success': True,
'data': redis_logs,
'grouped': False,
'total_logs': len(redis_logs),
'from_db': from_db
})
except Exception as e:
logger.error(f"获取Redis查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/redis/query-logs/history/<int:history_id>', methods=['GET'])
def api_get_redis_query_logs_by_history(history_id):
"""获取特定历史记录的Redis查询日志"""
try:
logs = query_log_collector.get_logs_by_history_id(history_id)
return jsonify({'success': True, 'data': logs})
except Exception as e:
logger.error(f"获取历史记录 {history_id} 的Redis查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/redis/query-logs', methods=['DELETE'])
def api_clear_redis_query_logs():
"""清空Redis查询日志"""
try:
query_log_collector.clear_logs()
return jsonify({'success': True, 'message': 'Redis查询日志清空成功'})
except Exception as e:
logger.error(f"清空Redis查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500