Optimize project integration

2025-08-05 19:56:38 +08:00
parent 3f78ce7365
commit 4a0800a776
7 changed files with 2608 additions and 364 deletions

View File

@@ -13,11 +13,13 @@ from .config_manager import (
    get_config_group_by_id, delete_config_group,
    save_query_history, get_query_history,
    get_query_history_by_id, delete_query_history,
    batch_delete_query_history,
    # Redis configuration management
    REDIS_DEFAULT_CONFIG, save_redis_config_group, get_redis_config_groups,
    get_redis_config_group_by_id, delete_redis_config_group,
    save_redis_query_history, get_redis_query_history,
    get_redis_query_history_by_id, delete_redis_query_history,
    batch_delete_redis_query_history,
    parse_redis_config_from_yaml
)
from .cassandra_client import create_connection
@@ -38,15 +40,26 @@ def setup_routes(app, query_log_collector):
    def index():
        return render_template('index.html')

    @app.route('/test-config-load')
    def test_config_load():
        """Configuration loading test page"""
        return send_from_directory('.', 'test_config_load.html')

    @app.route('/db-compare')
    def db_compare():
        """Cassandra database comparison tool page"""
        return render_template('db_compare.html')

    @app.route('/redis-compare')
    def redis_compare():
        """Redis data comparison tool page"""
        return render_template('redis_compare.html')

    @app.route('/redis-js-test')
    def redis_js_test():
        return render_template('redis_js_test.html')

    @app.route('/redis-test')
    def redis_test():
        return render_template('redis_test.html')

    # Basic APIs
    @app.route('/api/default-config')
@@ -593,6 +606,41 @@ def setup_routes(app, query_log_collector):
        else:
            return jsonify({'success': False, 'error': 'Failed to delete query history record'}), 500

    @app.route('/api/query-history/batch-delete', methods=['POST'])
    def api_batch_delete_query_history():
        """Batch-delete Cassandra query history records"""
        try:
            data = request.get_json()
            if not data:
                return jsonify({'success': False, 'error': 'Malformed request payload'}), 400

            history_ids = data.get('history_ids', [])

            # Validate the parameters
            if not history_ids:
                return jsonify({'success': False, 'error': 'Please provide a list of history record IDs to delete'}), 400

            if not isinstance(history_ids, list):
                return jsonify({'success': False, 'error': 'history_ids must be an array'}), 400

            # Make sure every ID is an integer
            try:
                history_ids = [int(id) for id in history_ids]
            except (ValueError, TypeError):
                return jsonify({'success': False, 'error': 'History record IDs must be integers'}), 400

            # Call the batch-delete helper
            result = batch_delete_query_history(history_ids)

            if result['success']:
                return jsonify(result)
            else:
                return jsonify(result), 500

        except Exception as e:
            logger.error(f"Exception while batch-deleting Cassandra query history: {e}")
            return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
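
    # A minimal client-side sketch of exercising this endpoint (illustrative only,
    # not part of the commit; assumes the dev server listens on localhost:5000).
    # The Redis variant at /api/redis/query-history/batch-delete behaves the same way.
    #
    #   import requests
    #   resp = requests.post(
    #       'http://localhost:5000/api/query-history/batch-delete',
    #       json={'history_ids': [1, 2, 3]},
    #   )
    #   print(resp.status_code, resp.json())
    #   # expected on success: 200 {'success': True, 'message': 'Deleted 3 records', 'deleted_count': 3}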
    # Query log management APIs
    @app.route('/api/query-logs', methods=['GET'])
    def api_get_query_logs():
@@ -1087,22 +1135,89 @@ def setup_routes(app, query_log_collector):
        else:
            return jsonify({'success': False, 'error': 'Failed to delete Redis query history record'}), 500

    @app.route('/api/redis/query-history/batch-delete', methods=['POST'])
    def api_batch_delete_redis_query_history():
        """Batch-delete Redis query history records"""
        try:
            data = request.get_json()
            if not data or 'history_ids' not in data:
                return jsonify({'success': False, 'error': 'Please provide a list of history record IDs to delete'}), 400

            history_ids = data['history_ids']
            if not isinstance(history_ids, list):
                return jsonify({'success': False, 'error': 'Malformed history record ID list'}), 400

            if not history_ids:
                return jsonify({'success': False, 'error': 'No records to delete'}), 400

            # Make sure every ID is an integer
            try:
                history_ids = [int(id) for id in history_ids]
            except (ValueError, TypeError):
                return jsonify({'success': False, 'error': 'Malformed history record IDs'}), 400

            result = batch_delete_redis_query_history(history_ids)

            if result['success']:
                return jsonify(result)
            else:
                return jsonify(result), 500

        except Exception as e:
            logger.error(f"Exception while batch-deleting Redis query history: {e}")
            return jsonify({'success': False, 'error': f'Batch deletion failed: {str(e)}'}), 500
    # Redis query log APIs
    @app.route('/api/redis/query-logs', methods=['GET'])
    def api_get_redis_query_logs():
        """Fetch Redis query logs"""
        """Fetch Redis query logs, with optional grouping by batch"""
        try:
            limit = request.args.get('limit', 100, type=int)
            # Fetch the most recent query logs
            logs = query_log_collector.get_logs(limit=limit)
            # Keep only Redis-related entries
            redis_logs = []
            for log in logs:
                if (log.get('message') and 'redis' in log.get('message', '').lower()) or log.get('query_type') == 'redis':
                    redis_logs.append(log)
            return jsonify({'success': True, 'data': redis_logs})
            limit = request.args.get('limit', 1000, type=int)
            grouped = request.args.get('grouped', 'true').lower() == 'true'  # group by batch by default
            from_db = request.args.get('from_db', 'true').lower() == 'true'  # read from the database by default

            if grouped:
                # Return grouped logs
                grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db)
                # Keep only the batches that contain Redis-related logs
                redis_grouped_logs = []
                for batch_id, logs in grouped_logs:
                    # Within each batch, keep only the Redis-related entries
                    redis_logs = [
                        log for log in logs
                        if log.get('query_type') == 'redis' or
                           (log.get('message') and 'redis' in log.get('message', '').lower())
                    ]
                    if redis_logs:  # add the batch only if it contains Redis logs
                        redis_grouped_logs.append([batch_id, redis_logs])

                # Total entry count (for statistics)
                total_logs = sum(len(logs) for _, logs in redis_grouped_logs)

                return jsonify({
                    'success': True,
                    'data': redis_grouped_logs,
                    'grouped': True,
                    'total_logs': total_logs,
                    'from_db': from_db
                })
            else:
                # Return a flat log list
                logs = query_log_collector.get_logs(limit, from_db)
                # Keep only Redis-related entries
                redis_logs = [
                    log for log in logs
                    if log.get('query_type') == 'redis' or
                       (log.get('message') and 'redis' in log.get('message', '').lower())
                ]
                return jsonify({
                    'success': True,
                    'data': redis_logs,
                    'grouped': False,
                    'total_logs': len(redis_logs),
                    'from_db': from_db
                })
        except Exception as e:
            logger.error(f"Failed to fetch Redis query logs: {e}")
            return jsonify({'success': False, 'error': str(e)}), 500
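
    # Quick manual test sketch for the grouped endpoint (illustrative only, not part
    # of the commit; assumes the dev server listens on localhost:5000):
    #
    #   import requests
    #   resp = requests.get('http://localhost:5000/api/redis/query-logs',
    #                       params={'limit': 200, 'grouped': 'true', 'from_db': 'true'})
    #   payload = resp.json()
    #   for batch_id, logs in payload['data']:  # grouped mode returns [batch_id, logs] pairs
    #       print(f"batch {batch_id}: {len(logs)} Redis log entries")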

View File

@@ -366,6 +366,50 @@ def delete_redis_query_history(history_id):
    finally:
        conn.close()

def batch_delete_redis_query_history(history_ids):
    """Batch-delete Redis query history records"""
    if not history_ids:
        return {'success': True, 'message': 'No records to delete', 'deleted_count': 0}

    if not ensure_database():
        logger.error("Database initialization failed")
        return {'success': False, 'error': 'Database initialization failed', 'deleted_count': 0}

    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Build the placeholders for the IN clause
        placeholders = ','.join(['?' for _ in history_ids])
        sql = f'DELETE FROM redis_query_history WHERE id IN ({placeholders})'
        cursor.execute(sql, history_ids)
        conn.commit()

        deleted_count = cursor.rowcount
        if deleted_count > 0:
            logger.info(f"Batch-deleted {deleted_count} Redis query history records: {history_ids}")
            return {
                'success': True,
                'message': f'Deleted {deleted_count} records',
                'deleted_count': deleted_count
            }
        else:
            return {
                'success': False,
                'error': 'No matching records found to delete',
                'deleted_count': 0
            }
    except Exception as e:
        logger.error(f"Batch deletion of Redis query history failed: {e}")
        return {
            'success': False,
            'error': f'Deletion failed: {str(e)}',
            'deleted_count': 0
        }
    finally:
        conn.close()
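
# Note (reviewer assumption, not part of the commit): SQLite caps the number of
# bound parameters per statement (SQLITE_MAX_VARIABLE_NUMBER, 999 in many builds),
# so a very large history_ids list would make the single IN (...) statement fail.
# A chunked variant could look like:
#
#   CHUNK = 500
#   deleted_count = 0
#   for i in range(0, len(history_ids), CHUNK):
#       chunk = history_ids[i:i + CHUNK]
#       placeholders = ','.join('?' for _ in chunk)
#       cursor.execute(f'DELETE FROM redis_query_history WHERE id IN ({placeholders})', chunk)
#       deleted_count += cursor.rowcount
#   conn.commit()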

def parse_redis_config_from_yaml(yaml_text):
    """Parse a Redis configuration from YAML-formatted text"""
    try:
@@ -698,5 +742,49 @@ def delete_query_history(history_id):
    except Exception as e:
        logger.error(f"Failed to delete query history record: {e}")
        return False
    finally:
        conn.close()

def batch_delete_query_history(history_ids):
    """Batch-delete Cassandra query history records"""
    if not history_ids:
        return {'success': True, 'message': 'No records to delete', 'deleted_count': 0}

    if not ensure_database():
        logger.error("Database initialization failed")
        return {'success': False, 'error': 'Database initialization failed', 'deleted_count': 0}

    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Build the placeholders for the IN clause
        placeholders = ','.join(['?' for _ in history_ids])
        sql = f'DELETE FROM query_history WHERE id IN ({placeholders})'
        cursor.execute(sql, history_ids)
        conn.commit()

        deleted_count = cursor.rowcount
        if deleted_count > 0:
            logger.info(f"Batch-deleted {deleted_count} Cassandra query history records: {history_ids}")
            return {
                'success': True,
                'message': f'Deleted {deleted_count} records',
                'deleted_count': deleted_count
            }
        else:
            return {
                'success': False,
                'error': 'No matching records found to delete',
                'deleted_count': 0
            }
    except Exception as e:
        logger.error(f"Batch deletion of Cassandra query history failed: {e}")
        return {
            'success': False,
            'error': f'Deletion failed: {str(e)}',
            'deleted_count': 0
        }
    finally:
        conn.close()
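
# Reference usage sketch (hypothetical IDs; the route layer above passes
# validated integers the same way):
#
#   result = batch_delete_query_history([12, 15, 20])
#   if result['success']:
#       print(result['message'], result['deleted_count'])
#   else:
#       print(result['error'])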

View File

@@ -66,6 +66,93 @@ except ImportError:
    query_log_collector = DummyQueryLogCollector()

def _get_redis_command_by_type(redis_type):
    """Return the Redis query command corresponding to a data type"""
    command_map = {
        'string': 'GET',
        'hash': 'HGETALL',
        'list': 'LRANGE',
        'set': 'SMEMBERS',
        'zset': 'ZRANGE',
        'stream': 'XRANGE'
    }
    return command_map.get(redis_type, 'TYPE')
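
# Illustrative lookups (not in the commit) showing the fallback behavior:
#   _get_redis_command_by_type('hash')    -> 'HGETALL'
#   _get_redis_command_by_type('zset')    -> 'ZRANGE'
#   _get_redis_command_by_type('bitmap')  -> 'TYPE'  (unknown types fall back to TYPE)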

def _get_data_summary(key_info):
    """Produce a short summary of a key's data content"""
    if not key_info['exists']:
        return "does not exist"

    key_type = key_info['type']
    value = key_info['value']

    try:
        if key_type == 'string':
            if isinstance(value, str):
                if len(value) > 50:
                    return f"string ({len(value)} chars): {value[:47]}..."
                else:
                    return f"string: {value}"
            else:
                return f"string: {str(value)[:50]}..."
        elif key_type == 'hash':
            if isinstance(value, dict):
                field_count = len(value)
                sample_fields = list(value.keys())[:3]
                fields_str = ", ".join(sample_fields)
                if field_count > 3:
                    fields_str += "..."
                return f"hash ({field_count} fields): {fields_str}"
            else:
                return f"hash: {str(value)[:50]}..."
        elif key_type == 'list':
            if isinstance(value, list):
                list_len = len(value)
                if list_len > 0:
                    first_item = str(value[0])[:20] if value[0] else ""
                    return f"list ({list_len} elements): [{first_item}...]"
                else:
                    return "list (empty)"
            else:
                return f"list: {str(value)[:50]}..."
        elif key_type == 'set':
            if isinstance(value, (set, list)):
                set_len = len(value)
                if set_len > 0:
                    first_item = str(list(value)[0])[:20] if value else ""
                    return f"set ({set_len} members): {{{first_item}...}}"
                else:
                    return "set (empty)"
            else:
                return f"set: {str(value)[:50]}..."
        elif key_type == 'zset':
            if isinstance(value, list):
                zset_len = len(value)
                if zset_len > 0:
                    first_item = f"{value[0][0]}:{value[0][1]}" if value[0] else ""
                    return f"sorted set ({zset_len} members): {{{first_item}...}}"
                else:
                    return "sorted set (empty)"
            else:
                return f"sorted set: {str(value)[:50]}..."
        elif key_type == 'stream':
            if isinstance(value, list):
                stream_len = len(value)
                return f"stream ({stream_len} messages)"
            else:
                return f"stream: {str(value)[:50]}..."
        else:
            return f"unknown type: {str(value)[:50]}..."
    except Exception as e:
        return f"parse error: {str(e)[:30]}..."

def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance_tracker=None):
    """
    Fetch random keys from a Redis cluster
@@ -83,31 +170,52 @@ def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance
    keys = set()

    logger.info(f"Starting key scan, target count: {count}, pattern: {pattern}")
    query_log_collector.add_log('INFO', f"Starting key scan, target count: {count}, pattern: {pattern}")
    query_log_collector.add_log('INFO', f"🔍 Starting key scan, target count: {count}, match pattern: '{pattern}'")

    try:
        # Use scan_iter to collect keys
        scan_count = max(count * 2, 1000)  # scan more keys than needed to improve randomness
        query_log_collector.add_log('INFO', f"📡 Running SCAN, batch size: {scan_count}")

        scan_iterations = 0
        for key in redis_client.scan_iter(match=pattern, count=scan_count):
            keys.add(key)
            scan_iterations += 1

            # Log progress every 1000 scanned keys
            if scan_iterations % 1000 == 0:
                query_log_collector.add_log('INFO', f"📊 Scan progress: {len(keys)} matching keys found so far")

            if len(keys) >= count * 3:  # over-collect so the random sample has room
                break

        total_found = len(keys)
        query_log_collector.add_log('INFO', f"🎯 Scan finished, {total_found} matching keys found")

        # If more keys were collected than requested, take a random sample
        if len(keys) > count:
            keys = random.sample(list(keys), count)
            query_log_collector.add_log('INFO', f"🎲 Randomly selected {count} of {total_found} keys")
        else:
            keys = list(keys)
            if total_found < count:
                query_log_collector.add_log('WARNING', f"⚠️ Found fewer keys ({total_found}) than the target count ({count})")

        # Log a sample of the selected keys (first 10)
        key_sample = keys[:10] if len(keys) > 10 else keys
        key_list_str = ", ".join([f"'{k}'" for k in key_sample])
        if len(keys) > 10:
            key_list_str += f" ... ({len(keys)} in total)"
        query_log_collector.add_log('INFO', f"📋 Selected key sample: [{key_list_str}]")

        end_time = time.time()
        scan_duration = end_time - start_time
        if performance_tracker:
            performance_tracker.record_scan_time(scan_duration)

        logger.info(f"Scan collected {len(keys)} random keys in {scan_duration:.3f}s")
        query_log_collector.add_log('INFO', f"Scan collected {len(keys)} random keys in {scan_duration:.3f}s")
        query_log_collector.add_log('INFO', f"✅ Key scan complete, collected {len(keys)} keys in {scan_duration:.3f}s")

        return keys
    except RedisError as e:
@@ -124,36 +232,64 @@ def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance
def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis cluster", performance_tracker=None):
    """
    Batch-query the values of the given keys, supporting all Redis data types (String, Hash, List, Set, ZSet, etc.)

    Args:
        redis_client: Redis client
        keys: list of keys to query
        cluster_name: cluster name, used in logs
        performance_tracker: performance tracker

    Returns:
        list: one value-info dict per key, including type, value, and display format
    """
    from .redis_types import get_redis_value_with_type

    start_time = time.time()
    result = []

    logger.info(f"Starting batch query of {len(keys)} keys from {cluster_name} (all data types supported)")
    query_log_collector.add_log('INFO', f"Starting batch query of {len(keys)} keys from {cluster_name} (all data types supported)")
    query_log_collector.add_log('INFO', f"📊 Starting batch query of {len(keys)} keys from {cluster_name} (all data types supported)")

    # Log the keys to be queried (first 10, to keep the log short)
    key_sample = keys[:10] if len(keys) > 10 else keys
    key_list_str = ", ".join([f"'{k}'" for k in key_sample])
    if len(keys) > 10:
        key_list_str += f" ... ({len(keys)} in total)"
    query_log_collector.add_log('INFO', f"🔍 Keys to query: [{key_list_str}]")
    try:
        # Query each key one by one, supporting all Redis data types
        for key in keys:
        redis_commands_used = {}  # track which Redis commands are used

        for i, key in enumerate(keys):
            key_start_time = time.time()
            key_info = get_redis_value_with_type(redis_client, key)
            key_duration = time.time() - key_start_time
            result.append(key_info)

            # Log each key's query details
            if key_info['exists']:
                key_type = key_info['type']
                # Determine the Redis command from the data type
                redis_cmd = _get_redis_command_by_type(key_type)
                redis_commands_used[redis_cmd] = redis_commands_used.get(redis_cmd, 0) + 1

                # Summarize the data content
                data_summary = _get_data_summary(key_info)
                query_log_collector.add_log('INFO',
                    f"✅ Key '{key}' | type: {key_type} | command: {redis_cmd} | data: {data_summary} | took: {key_duration:.3f}s")
            else:
                query_log_collector.add_log('WARNING',
                    f"❌ Key '{key}' | status: missing | took: {key_duration:.3f}s")

        end_time = time.time()
        query_duration = end_time - start_time

        if performance_tracker:
            performance_tracker.record_query(f"{cluster_name}_typed_batch_query", query_duration)

        # Count successfully fetched keys and the data type distribution
        successful_count = sum(1 for r in result if r['exists'])
        type_stats = {}
@@ -161,11 +297,14 @@ def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis cluster", per
            if r['exists']:
                key_type = r['type']
                type_stats[key_type] = type_stats.get(key_type, 0) + 1

        # Log Redis command usage statistics
        cmd_stats = ", ".join([f"{cmd}: {count}" for cmd, count in redis_commands_used.items()]) if redis_commands_used else ""

        type_info = ", ".join([f"{t}: {c}" for t, c in type_stats.items()]) if type_stats else ""
        logger.info(f"{cluster_name} query finished: fetched {successful_count}/{len(keys)} values, type distribution: [{type_info}], took {query_duration:.3f}s")
        query_log_collector.add_log('INFO', f"{cluster_name} query finished: fetched {successful_count}/{len(keys)} values, type distribution: [{type_info}], took {query_duration:.3f}s")
        query_log_collector.add_log('INFO', f"🎯 Redis command statistics: [{cmd_stats}]")
        query_log_collector.add_log('INFO', f"📈 Query from {cluster_name} finished: fetched {successful_count}/{len(keys)} values, type distribution: [{type_info}], total time {query_duration:.3f}s")

        return result

    except Exception as e:
@@ -200,19 +339,27 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
    comparison_start_time = time.time()

    logger.info(f"Starting data comparison between {cluster1_name} and {cluster2_name} (all Redis data types supported)")
    query_log_collector.add_log('INFO', f"Starting data comparison between {cluster1_name} and {cluster2_name} (all Redis data types supported)")
    query_log_collector.add_log('INFO', f"🔄 Starting data comparison between {cluster1_name} and {cluster2_name} (all Redis data types supported)")
    query_log_collector.add_log('INFO', f"📊 Comparison scope: {len(keys)} keys")

    # Fetch the data from both clusters
    query_log_collector.add_log('INFO', f"📥 Step 1: fetch data from {cluster1_name}")
    values1 = get_redis_values_by_keys(client1, keys, cluster1_name, performance_tracker)
    if not values1:
        return {'error': f'Failed to fetch data from {cluster1_name}'}
        error_msg = f'Failed to fetch data from {cluster1_name}'
        query_log_collector.add_log('ERROR', f"{error_msg}")
        return {'error': error_msg}

    query_log_collector.add_log('INFO', f"📥 Step 2: fetch data from {cluster2_name}")
    values2 = get_redis_values_by_keys(client2, keys, cluster2_name, performance_tracker)
    if not values2:
        return {'error': f'Failed to fetch data from {cluster2_name}'}
        error_msg = f'Failed to fetch data from {cluster2_name}'
        query_log_collector.add_log('ERROR', f"{error_msg}")
        return {'error': error_msg}

    # Start the data comparison
    compare_start = time.time()
    query_log_collector.add_log('INFO', f"🔍 Step 3: compare each key's data content")

    # Initialize the statistics
    stats = {
@@ -230,14 +377,27 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
    missing_results = []

    # Compare key by key
    comparison_details = []  # per-key comparison details
    for i, key in enumerate(keys):
        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
        value1_info = values1[i]
        value2_info = values2[i]

        # Use the comparison helper from the redis_types module
        comparison_result = compare_redis_values(value1_info, value2_info)

        # Record the comparison details
        comparison_detail = {
            'key': key_str,
            'cluster1_exists': value1_info['exists'],
            'cluster2_exists': value2_info['exists'],
            'cluster1_type': value1_info.get('type'),
            'cluster2_type': value2_info.get('type'),
            'status': comparison_result['status']
        }
        comparison_details.append(comparison_detail)

        if comparison_result['status'] == 'both_missing':
            stats['both_missing'] += 1
            missing_results.append({
@@ -245,6 +405,8 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
                'status': 'both_missing',
                'message': comparison_result['message']
            })
            query_log_collector.add_log('WARNING', f"⚠️ Key '{key_str}': missing in both clusters")

        elif comparison_result['status'] == 'missing_in_cluster1':
            stats['missing_in_cluster1'] += 1
            missing_results.append({
@@ -255,6 +417,8 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
                'cluster2_type': value2_info['type'],
                'message': comparison_result['message']
            })
            query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': exists only in {cluster2_name} (type: {value2_info['type']})")

        elif comparison_result['status'] == 'missing_in_cluster2':
            stats['missing_in_cluster2'] += 1
            missing_results.append({
@@ -265,6 +429,7 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
                'cluster2_value': None,
                'message': comparison_result['message']
            })
            query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': exists only in {cluster1_name} (type: {value1_info['type']})")

        elif comparison_result['status'] == 'identical':
            stats['identical_count'] += 1
            identical_results.append({
@@ -272,6 +437,8 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
                'value': value1_info['display_value'],
                'type': value1_info['type']
            })
            query_log_collector.add_log('INFO', f"✅ Key '{key_str}': identical (type: {value1_info['type']})")

        else:  # different
            stats['different_count'] += 1
            different_results.append({
@@ -282,6 +449,14 @@ def compare_redis_data(client1, client2, keys, cluster1_name="production cluster", clu
                'cluster2_type': value2_info['type'],
                'message': comparison_result['message']
            })
            # Log the difference details
            type_info = f"{value1_info['type']} vs {value2_info['type']}" if value1_info['type'] != value2_info['type'] else value1_info['type']
            query_log_collector.add_log('WARNING', f"🔄 Key '{key_str}': data differs (type: {type_info}) - {comparison_result['message']}")

        # Log progress every 100 processed keys
        if (i + 1) % 100 == 0:
            progress = f"{i + 1}/{len(keys)}"
            query_log_collector.add_log('INFO', f"📊 Comparison progress: {progress} ({((i + 1) / len(keys) * 100):.1f}%)")

    compare_end = time.time()
    comparison_duration = compare_end - compare_start
@@ -317,10 +492,68 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu
        }
    }
    # Log a detailed comparison summary
    query_log_collector.add_log('INFO', f"🎯 Data comparison finished: pure compare time {comparison_duration:.3f}s, total time {total_duration:.3f}s")

    # Log the statistics
    query_log_collector.add_log('INFO', f"📊 Comparison statistics overview:")
    query_log_collector.add_log('INFO', f"  • Total keys: {stats['total_keys']}")
    query_log_collector.add_log('INFO', f"  • ✅ Identical: {stats['identical_count']} ({stats['identical_percentage']}%)")
    query_log_collector.add_log('INFO', f"  • 🔄 Different: {stats['different_count']} ({stats['different_percentage']}%)")
    query_log_collector.add_log('INFO', f"  • ❌ Only in {cluster1_name}: {stats['missing_in_cluster2']}")
    query_log_collector.add_log('INFO', f"  • ❌ Only in {cluster2_name}: {stats['missing_in_cluster1']}")
    query_log_collector.add_log('INFO', f"  • ⚠️ Missing in both clusters: {stats['both_missing']}")

    # Log performance info
    if performance_tracker:
        query_log_collector.add_log('INFO', f"⚡ Performance: average per-key compare time {(comparison_duration / len(keys) * 1000):.2f}ms")

    # Log the details of every key
    query_log_collector.add_log('INFO', f"📋 Full per-key details:")

    # Tally the data type distribution
    type_distribution = {}
    for detail in comparison_details:
        key_str = detail['key']
        cluster1_type = detail.get('cluster1_type', 'N/A')
        cluster2_type = detail.get('cluster2_type', 'N/A')
        status = detail.get('status', 'unknown')

        # Tally the type distribution
        if cluster1_type != 'N/A':
            type_distribution[cluster1_type] = type_distribution.get(cluster1_type, 0) + 1
        elif cluster2_type != 'N/A':
            type_distribution[cluster2_type] = type_distribution.get(cluster2_type, 0) + 1

        # Log each key's details
        if status == 'identical':
            query_log_collector.add_log('INFO', f"  ✅ {key_str} → type: {cluster1_type}, status: identical")
        elif status == 'different':
            type_info = cluster1_type if cluster1_type == cluster2_type else f"{cluster1_name}:{cluster1_type} vs {cluster2_name}:{cluster2_type}"
            query_log_collector.add_log('INFO', f"  🔄 {key_str} → type: {type_info}, status: different")
        elif status == 'missing_in_cluster1':
            query_log_collector.add_log('INFO', f"  ❌ {key_str} → type: {cluster2_type}, status: only in {cluster2_name}")
        elif status == 'missing_in_cluster2':
            query_log_collector.add_log('INFO', f"  ❌ {key_str} → type: {cluster1_type}, status: only in {cluster1_name}")
        elif status == 'both_missing':
            query_log_collector.add_log('INFO', f"  ⚠️ {key_str} → type: N/A, status: missing in both clusters")

    # Log the data type distribution statistics
    if type_distribution:
        query_log_collector.add_log('INFO', f"📊 Data type distribution:")
        for data_type, count in sorted(type_distribution.items()):
            percentage = (count / len(keys)) * 100
            query_log_collector.add_log('INFO', f"  • {data_type}: {count} ({percentage:.1f}%)")

    # Log a key-list digest
    key_summary = [detail['key'] for detail in comparison_details[:10]]  # show the first 10 keys
    key_list_str = ', '.join(key_summary)
    if len(comparison_details) > 10:
        key_list_str += f" ... ({len(comparison_details)} keys in total)"
    query_log_collector.add_log('INFO', f"📝 Key list digest: [{key_list_str}]")

    logger.info(f"Data comparison finished in {comparison_duration:.3f}s")
    logger.info(f"Comparison stats: {stats['total_keys']} keys total, {stats['identical_count']} identical, {stats['different_count']} different, {stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']} missing")
    query_log_collector.add_log('INFO', f"Data comparison finished in {comparison_duration:.3f}s")
    query_log_collector.add_log('INFO', f"Comparison stats: {stats['total_keys']} keys total, {stats['identical_count']} identical, {stats['different_count']} different, {stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']} missing")

    return result
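
    # Illustrative call of this helper (the client objects and the result fields
    # are assumptions based on the stats dict above, not a documented API):
    #
    #   result = compare_redis_data(prod_client, test_client, keys,
    #                               cluster1_name='production cluster',
    #                               cluster2_name='test cluster')
    #   stats = result['stats']
    #   print(stats['identical_count'], stats['different_count'], stats['both_missing'])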
@@ -345,7 +578,11 @@ def execute_redis_comparison(config1, config2, query_options):
    cluster2_name = config2.get('name', 'test cluster')

    logger.info(f"Starting Redis data comparison: {cluster1_name} vs {cluster2_name}")
    query_log_collector.add_log('INFO', f"Starting Redis data comparison: {cluster1_name} vs {cluster2_name}")

    # Start a new query batch, using the 'redis' query type
    batch_id = query_log_collector.start_new_batch('redis')
    query_log_collector.add_log('INFO', f"🚀 Starting Redis data comparison: {cluster1_name} vs {cluster2_name}")
    query_log_collector.add_log('INFO', f"📋 Query batch ID: {batch_id}")

    # Create the connections
    client1 = create_redis_client(config1, cluster1_name, performance_tracker)
@@ -404,14 +641,26 @@ def execute_redis_comparison(config1, config2, query_options):
        # Attach the performance report
        comparison_result['performance_report'] = performance_tracker.generate_report()
        comparison_result['query_options'] = query_options
        comparison_result['batch_id'] = batch_id  # include the batch ID in the result

        # Log the final outcome
        if comparison_result.get('success'):
            query_log_collector.add_log('INFO', f"🎉 Redis data comparison completed successfully")

        # End the current batch
        query_log_collector.end_current_batch()

        return comparison_result

    except Exception as e:
        logger.error(f"Redis data comparison failed: {e}")
        query_log_collector.add_log('ERROR', f"Redis data comparison failed: {e}")
        return {'error': f'Execution failed: {str(e)}'}
        query_log_collector.add_log('ERROR', f"💥 Redis data comparison failed: {e}")
        # End the current batch
        query_log_collector.end_current_batch()
        return {'error': f'Execution failed: {str(e)}', 'batch_id': batch_id}
    finally:
        # Close the connections
        try: