"""
Redis查询引擎模块
=================
本模块是Redis数据比对的核心引擎提供高级的Redis数据查询和比较功能。
核心功能:
1. 多模式查询随机采样和指定Key两种查询模式
2. 全类型支持支持所有Redis数据类型的查询和比较
3. 智能比较:针对不同数据类型的专门比较算法
4. 性能监控:详细的查询时间和性能统计
5. 错误容错单个Key查询失败不影响整体结果
查询模式:
- 随机采样从源集群随机获取指定数量的Key进行比对
- 指定Key对用户提供的Key列表进行精确比对
- 模式匹配支持通配符模式的Key筛选
支持的数据类型:
- String字符串类型自动检测JSON格式
- Hash哈希表字段级别的深度比较
- List列表保持元素顺序的精确比较
- Set集合自动排序后的内容比较
- ZSet有序集合包含分数的完整比较
- Stream消息流消息级别的详细比较
比较算法:
- JSON智能比较自动检测和比较JSON格式数据
- 类型一致性检查:确保两个集群中数据类型一致
- 内容深度比较:递归比较复杂数据结构
- 性能优化:大数据集的高效比较算法
统计分析:
- 一致性统计相同、不同、缺失Key的详细统计
- 类型分布:各种数据类型的分布统计
- 性能指标:查询时间、连接时间等性能数据
- 错误分析:查询失败的详细错误统计
作者BigDataTool项目组
更新时间2024年8月
"""
import time
import logging
import random

from redis.exceptions import RedisError

from .redis_client import RedisPerformanceTracker

logger = logging.getLogger(__name__)

# Import the query log collector
try:
    from app import query_log_collector
except ImportError:
    # If the import fails, fall back to a no-op log collector
    class DummyQueryLogCollector:
        def start_new_batch(self, query_type):
            return None

        def end_current_batch(self):
            pass

        def set_history_id(self, history_id):
            pass

        def add_log(self, level, message):
            pass

    query_log_collector = DummyQueryLogCollector()

def _get_redis_command_by_type(redis_type):
    """Return the Redis query command used for the given data type."""
    command_map = {
        'string': 'GET',
        'hash': 'HGETALL',
        'list': 'LRANGE',
        'set': 'SMEMBERS',
        'zset': 'ZRANGE',
        'stream': 'XRANGE'
    }
    return command_map.get(redis_type, 'TYPE')

def _get_data_summary(key_info):
    """Return a short human-readable summary of a key's data content."""
    if not key_info['exists']:
        return "does not exist"
    key_type = key_info['type']
    value = key_info['value']
    try:
        if key_type == 'string':
            if isinstance(value, str):
                if len(value) > 50:
                    return f"string({len(value)} chars): {value[:47]}..."
                else:
                    return f"string: {value}"
            else:
                return f"string: {str(value)[:50]}..."
        elif key_type == 'hash':
            if isinstance(value, dict):
                field_count = len(value)
                sample_fields = list(value.keys())[:3]
                fields_str = ", ".join(sample_fields)
                if field_count > 3:
                    fields_str += "..."
                return f"hash({field_count} fields): {fields_str}"
            else:
                return f"hash: {str(value)[:50]}..."
        elif key_type == 'list':
            if isinstance(value, list):
                list_len = len(value)
                if list_len > 0:
                    first_item = str(value[0])[:20] if value[0] else ""
                    return f"list({list_len} elements): [{first_item}...]"
                else:
                    return "list(empty)"
            else:
                return f"list: {str(value)[:50]}..."
        elif key_type == 'set':
            if isinstance(value, (set, list)):
                set_len = len(value)
                if set_len > 0:
                    first_item = str(list(value)[0])[:20] if value else ""
                    return f"set({set_len} elements): {{{first_item}...}}"
                else:
                    return "set(empty)"
            else:
                return f"set: {str(value)[:50]}..."
        elif key_type == 'zset':
            if isinstance(value, list):
                zset_len = len(value)
                if zset_len > 0:
                    first_item = f"{value[0][0]}:{value[0][1]}" if value[0] else ""
                    return f"zset({zset_len} elements): {{{first_item}...}}"
                else:
                    return "zset(empty)"
            else:
                return f"zset: {str(value)[:50]}..."
        elif key_type == 'stream':
            if isinstance(value, list):
                stream_len = len(value)
                return f"stream({stream_len} messages)"
            else:
                return f"stream: {str(value)[:50]}..."
        else:
            return f"unknown type: {str(value)[:50]}..."
    except Exception as e:
        return f"parse error: {str(e)[:30]}..."

def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance_tracker=None):
    """
    Fetch random keys from a Redis cluster.

    Args:
        redis_client: Redis client
        count: number of keys to fetch
        pattern: key match pattern, defaults to "*"
        performance_tracker: performance tracker

    Returns:
        list: list of random keys
    """
    start_time = time.time()
    keys = set()
    logger.info(f"Starting key scan, target count: {count}, pattern: {pattern}")
    query_log_collector.add_log('INFO', f"🔍 Starting key scan, target count: {count}, match pattern: '{pattern}'")
    try:
        # Use scan_iter to fetch keys
        scan_count = max(count * 2, 1000)  # scan more keys to improve randomness
        query_log_collector.add_log('INFO', f"📡 Running SCAN, batch size: {scan_count}")
        scan_iterations = 0
        for key in redis_client.scan_iter(match=pattern, count=scan_count):
            keys.add(key)
            scan_iterations += 1
            # Log progress every 1000 scanned keys
            if scan_iterations % 1000 == 0:
                query_log_collector.add_log('INFO', f"📊 Scan progress: {len(keys)} matching keys found so far")
            if len(keys) >= count * 3:  # collect extra keys so we can sample randomly
                break
        total_found = len(keys)
        query_log_collector.add_log('INFO', f"🎯 Scan finished, {total_found} matching keys found")
        # If we collected more keys than needed, sample randomly
        if len(keys) > count:
            keys = random.sample(list(keys), count)
            query_log_collector.add_log('INFO', f"🎲 Randomly selected {count} of {total_found} keys")
        else:
            keys = list(keys)
            if total_found < count:
                query_log_collector.add_log('WARNING', f"⚠️ Found fewer keys ({total_found}) than the target count ({count})")
        # Log a sample of the selected keys (first 10)
        key_sample = keys[:10] if len(keys) > 10 else keys
        key_list_str = ", ".join([f"'{k}'" for k in key_sample])
        if len(keys) > 10:
            key_list_str += f" ... ({len(keys)} total)"
        query_log_collector.add_log('INFO', f"📋 Selected key sample: [{key_list_str}]")
        end_time = time.time()
        scan_duration = end_time - start_time
        if performance_tracker:
            performance_tracker.record_scan_time(scan_duration)
        logger.info(f"Scan fetched {len(keys)} random keys in {scan_duration:.3f}s")
        query_log_collector.add_log('INFO', f"✅ Key scan complete, fetched {len(keys)} keys, total time {scan_duration:.3f}s")
        return keys
    except RedisError as e:
        end_time = time.time()
        scan_duration = end_time - start_time
        if performance_tracker:
            performance_tracker.record_scan_time(scan_duration)
        logger.error(f"Failed to fetch random keys: {e}, took {scan_duration:.3f}s")
        query_log_collector.add_log('ERROR', f"Failed to fetch random keys: {e}, took {scan_duration:.3f}s")
        return []

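# Usage sketch (illustrative; mirrors how this module itself calls
# create_redis_client, but the config fields are hypothetical):
#
#   from modules.redis_client import create_redis_client, RedisPerformanceTracker
#
#   tracker = RedisPerformanceTracker()
#   client = create_redis_client({'name': 'test', 'host': '10.0.0.2', 'port': 6379},
#                                'test', tracker)
#   keys = get_random_keys_from_redis(client, count=50, pattern='session:*',
#                                     performance_tracker=tracker)
#   # keys is a list of (typically bytes) key names, at most 50 entries
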
def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis cluster", performance_tracker=None):
    """
    Batch-query the values of the given keys, supporting all Redis data types
    (String, Hash, List, Set, ZSet, etc.).

    Args:
        redis_client: Redis client
        keys: list of keys to query
        cluster_name: cluster name used in logs
        performance_tracker: performance tracker

    Returns:
        list: value-info dicts for the keys, including type, value, and display format
    """
    from .redis_types import get_redis_value_with_type

    start_time = time.time()
    result = []
    logger.info(f"Starting batch query of {len(keys)} keys from {cluster_name} (all data types supported)")
    query_log_collector.add_log('INFO', f"📊 Starting batch query of {len(keys)} keys from {cluster_name} (all data types supported)")
    # Log the keys to query (first 10, to keep the log short)
    key_sample = keys[:10] if len(keys) > 10 else keys
    key_list_str = ", ".join([f"'{k}'" for k in key_sample])
    if len(keys) > 10:
        key_list_str += f" ... ({len(keys)} total)"
    query_log_collector.add_log('INFO', f"🔍 Keys to query: [{key_list_str}]")
    try:
        # Query keys one by one, supporting all Redis data types
        redis_commands_used = {}  # track which Redis commands were used
        for i, key in enumerate(keys):
            key_start_time = time.time()
            key_info = get_redis_value_with_type(redis_client, key)
            key_duration = time.time() - key_start_time
            result.append(key_info)
            # Log details for each key
            if key_info['exists']:
                key_type = key_info['type']
                # Determine the Redis command from the type
                redis_cmd = _get_redis_command_by_type(key_type)
                redis_commands_used[redis_cmd] = redis_commands_used.get(redis_cmd, 0) + 1
                # Build a summary of the data content
                data_summary = _get_data_summary(key_info)
                query_log_collector.add_log('INFO',
                    f"✅ Key '{key}' | type: {key_type} | command: {redis_cmd} | data: {data_summary} | took: {key_duration:.3f}s")
            else:
                query_log_collector.add_log('WARNING',
                    f"❌ Key '{key}' | status: not found | took: {key_duration:.3f}s")
        end_time = time.time()
        query_duration = end_time - start_time
        if performance_tracker:
            performance_tracker.record_query(f"{cluster_name}_typed_batch_query", query_duration)
        # Count successfully fetched keys and the type distribution
        successful_count = sum(1 for r in result if r['exists'])
        type_stats = {}
        for r in result:
            if r['exists']:
                key_type = r['type']
                type_stats[key_type] = type_stats.get(key_type, 0) + 1
        # Log Redis command usage statistics
        cmd_stats = ", ".join([f"{cmd}: {count}" for cmd, count in redis_commands_used.items()]) if redis_commands_used else ""
        type_info = ", ".join([f"{t}: {c}" for t, c in type_stats.items()]) if type_stats else ""
        query_log_collector.add_log('INFO', f"🎯 Redis command stats: [{cmd_stats}]")
        query_log_collector.add_log('INFO', f"📈 Query from {cluster_name} complete, fetched {successful_count}/{len(keys)} values, type distribution: [{type_info}], total time {query_duration:.3f}s")
        return result
    except Exception as e:
        end_time = time.time()
        query_duration = end_time - start_time
        if performance_tracker:
            performance_tracker.record_query(f"{cluster_name}_typed_batch_query_error", query_duration)
        logger.error(f"Batch query from {cluster_name} failed: {e}, took {query_duration:.3f}s")
        query_log_collector.add_log('ERROR', f"Batch query from {cluster_name} failed: {e}, took {query_duration:.3f}s")
        # Return error placeholders
        return [{'type': 'error', 'value': None, 'display_value': f'<error: {e}>', 'exists': False} for _ in keys]

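# Shape of each dict in the returned list, as consumed throughout this module
# (the authoritative shape is defined by redis_types.get_redis_value_with_type;
# the concrete values here are made up for illustration):
#   {'type': 'hash', 'value': {'f': 'v'}, 'display_value': "{'f': 'v'}", 'exists': True}
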
def compare_redis_data(client1, client2, keys, cluster1_name="Production cluster", cluster2_name="Test cluster", performance_tracker=None):
    """
    Compare the data for the given keys across two Redis clusters, supporting
    all Redis data types.

    Args:
        client1: first Redis client (production)
        client2: second Redis client (test)
        keys: list of keys to compare
        cluster1_name: name of the first cluster
        cluster2_name: name of the second cluster
        performance_tracker: performance tracker

    Returns:
        dict: comparison result with statistics and difference details
    """
    from .redis_types import compare_redis_values

    comparison_start_time = time.time()
    logger.info(f"Starting comparison of {cluster1_name} and {cluster2_name} (all Redis data types supported)")
    query_log_collector.add_log('INFO', f"🔄 Starting comparison of {cluster1_name} and {cluster2_name} (all Redis data types supported)")
    query_log_collector.add_log('INFO', f"📊 Comparison scope: {len(keys)} keys")
    # Fetch data from both clusters
    query_log_collector.add_log('INFO', f"📥 Step 1: fetch data from {cluster1_name}")
    values1 = get_redis_values_by_keys(client1, keys, cluster1_name, performance_tracker)
    if not values1:
        error_msg = f'Failed to fetch data from {cluster1_name}'
        query_log_collector.add_log('ERROR', f"❌ {error_msg}")
        return {'error': error_msg}
    query_log_collector.add_log('INFO', f"📥 Step 2: fetch data from {cluster2_name}")
    values2 = get_redis_values_by_keys(client2, keys, cluster2_name, performance_tracker)
    if not values2:
        error_msg = f'Failed to fetch data from {cluster2_name}'
        query_log_collector.add_log('ERROR', f"❌ {error_msg}")
        return {'error': error_msg}
    # Start the data comparison
    compare_start = time.time()
    query_log_collector.add_log('INFO', "🔍 Step 3: compare each key's data content")
    # Initialize statistics
    stats = {
        'total_keys': len(keys),
        'identical_count': 0,
        'different_count': 0,
        'missing_in_cluster1': 0,
        'missing_in_cluster2': 0,
        'both_missing': 0
    }
    # Detailed result lists
    identical_results = []
    different_results = []
    missing_results = []
    # Compare key by key
    comparison_details = []  # per-key comparison details
    for i, key in enumerate(keys):
        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
        value1_info = values1[i]
        value2_info = values2[i]
        # Use the comparison function from the redis_types module
        comparison_result = compare_redis_values(value1_info, value2_info)
        # Record comparison details
        comparison_detail = {
            'key': key_str,
            'cluster1_exists': value1_info['exists'],
            'cluster2_exists': value2_info['exists'],
            'cluster1_type': value1_info.get('type'),
            'cluster2_type': value2_info.get('type'),
            'status': comparison_result['status']
        }
        comparison_details.append(comparison_detail)
        if comparison_result['status'] == 'both_missing':
            stats['both_missing'] += 1
            missing_results.append({
                'key': key_str,
                'status': 'both_missing',
                'message': comparison_result['message']
            })
            query_log_collector.add_log('WARNING', f"⚠️ Key '{key_str}': missing from both clusters")
        elif comparison_result['status'] == 'missing_in_cluster1':
            stats['missing_in_cluster1'] += 1
            missing_results.append({
                'key': key_str,
                'status': 'missing_in_cluster1',
                'cluster1_value': None,
                'cluster2_value': value2_info['display_value'],
                'cluster2_type': value2_info['type'],
                'message': comparison_result['message']
            })
            query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': exists only in {cluster2_name} (type: {value2_info['type']})")
        elif comparison_result['status'] == 'missing_in_cluster2':
            stats['missing_in_cluster2'] += 1
            missing_results.append({
                'key': key_str,
                'status': 'missing_in_cluster2',
                'cluster1_value': value1_info['display_value'],
                'cluster1_type': value1_info['type'],
                'cluster2_value': None,
                'message': comparison_result['message']
            })
            query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': exists only in {cluster1_name} (type: {value1_info['type']})")
        elif comparison_result['status'] == 'identical':
            stats['identical_count'] += 1
            identical_results.append({
                'key': key_str,
                'value': value1_info['display_value'],
                'type': value1_info['type']
            })
            query_log_collector.add_log('INFO', f"✅ Key '{key_str}': identical (type: {value1_info['type']})")
        else:  # different
            stats['different_count'] += 1
            different_results.append({
                'key': key_str,
                'cluster1_value': value1_info['display_value'],
                'cluster1_type': value1_info['type'],
                'cluster2_value': value2_info['display_value'],
                'cluster2_type': value2_info['type'],
                'message': comparison_result['message']
            })
            # Log the difference details
            type_info = f"{value1_info['type']} vs {value2_info['type']}" if value1_info['type'] != value2_info['type'] else value1_info['type']
            query_log_collector.add_log('WARNING', f"🔄 Key '{key_str}': data differs (type: {type_info}) - {comparison_result['message']}")
        # Log progress every 100 processed keys
        if (i + 1) % 100 == 0:
            progress = f"{i + 1}/{len(keys)}"
            query_log_collector.add_log('INFO', f"📊 Comparison progress: {progress} ({((i + 1) / len(keys) * 100):.1f}%)")
    compare_end = time.time()
    comparison_duration = compare_end - compare_start
    total_duration = compare_end - comparison_start_time
    if performance_tracker:
        performance_tracker.record_comparison_time(comparison_duration)

    # Percentage helper
    def safe_percentage(part, total):
        return round((part / total * 100), 2) if total > 0 else 0

    stats['identical_percentage'] = safe_percentage(stats['identical_count'], stats['total_keys'])
    stats['different_percentage'] = safe_percentage(stats['different_count'], stats['total_keys'])
    stats['missing_percentage'] = safe_percentage(
        stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing'],
        stats['total_keys']
    )
    result = {
        'success': True,
        'stats': stats,
        'identical_results': identical_results,
        'different_results': different_results,
        'missing_results': missing_results,
        'performance': {
            'comparison_time': comparison_duration,
            'total_time': total_duration
        },
        'clusters': {
            'cluster1_name': cluster1_name,
            'cluster2_name': cluster2_name
        }
    }
    # Log a detailed comparison summary
    query_log_collector.add_log('INFO', f"🎯 Comparison complete, pure comparison took {comparison_duration:.3f}s, total time {total_duration:.3f}s")
    # Log the statistics
    query_log_collector.add_log('INFO', "📊 Comparison statistics overview:")
    query_log_collector.add_log('INFO', f"  • Total keys: {stats['total_keys']}")
    query_log_collector.add_log('INFO', f"  • ✅ Identical: {stats['identical_count']} ({stats['identical_percentage']}%)")
    query_log_collector.add_log('INFO', f"  • 🔄 Different: {stats['different_count']} ({stats['different_percentage']}%)")
    query_log_collector.add_log('INFO', f"  • ❌ Only in {cluster1_name}: {stats['missing_in_cluster2']}")
    query_log_collector.add_log('INFO', f"  • ❌ Only in {cluster2_name}: {stats['missing_in_cluster1']}")
    query_log_collector.add_log('INFO', f"  • ⚠️ Missing from both clusters: {stats['both_missing']}")
    # Log performance information
    if performance_tracker:
        query_log_collector.add_log('INFO', f"⚡ Performance: average comparison time per key {(comparison_duration / len(keys) * 1000):.2f}ms")
    # Log details for every key
    query_log_collector.add_log('INFO', "📋 Per-key details:")
    # Collect the type distribution
    type_distribution = {}
    for detail in comparison_details:
        key_str = detail['key']
        cluster1_type = detail.get('cluster1_type', 'N/A')
        cluster2_type = detail.get('cluster2_type', 'N/A')
        status = detail.get('status', 'unknown')
        # Count the type distribution
        if cluster1_type != 'N/A':
            type_distribution[cluster1_type] = type_distribution.get(cluster1_type, 0) + 1
        elif cluster2_type != 'N/A':
            type_distribution[cluster2_type] = type_distribution.get(cluster2_type, 0) + 1
        # Log each key's detail line
        if status == 'identical':
            query_log_collector.add_log('INFO', f"  ✅ {key_str} → type: {cluster1_type}, status: identical")
        elif status == 'different':
            type_info = cluster1_type if cluster1_type == cluster2_type else f"{cluster1_name}:{cluster1_type} vs {cluster2_name}:{cluster2_type}"
            query_log_collector.add_log('INFO', f"  🔄 {key_str} → type: {type_info}, status: different")
        elif status == 'missing_in_cluster1':
            query_log_collector.add_log('INFO', f"  ❌ {key_str} → type: {cluster2_type}, status: only in {cluster2_name}")
        elif status == 'missing_in_cluster2':
            query_log_collector.add_log('INFO', f"  ❌ {key_str} → type: {cluster1_type}, status: only in {cluster1_name}")
        elif status == 'both_missing':
            query_log_collector.add_log('INFO', f"  ⚠️ {key_str} → type: N/A, status: missing from both clusters")
    # Log the type distribution statistics
    if type_distribution:
        query_log_collector.add_log('INFO', "📊 Data type distribution:")
        for data_type, count in sorted(type_distribution.items()):
            percentage = (count / len(keys)) * 100
            query_log_collector.add_log('INFO', f"  • {data_type}: {count} ({percentage:.1f}%)")
    # Log a key list summary (first 10 keys)
    key_summary = [detail['key'] for detail in comparison_details[:10]]
    key_list_str = ', '.join(key_summary)
    if len(comparison_details) > 10:
        key_list_str += f" ... ({len(comparison_details)} keys total)"
    query_log_collector.add_log('INFO', f"📝 Key list summary: [{key_list_str}]")
    logger.info(f"Data comparison complete in {comparison_duration:.3f}s")
    logger.info(f"Comparison stats: {stats['total_keys']} keys total, {stats['identical_count']} identical, {stats['different_count']} different, {stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']} missing")
    return result

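# Sketch of consuming the comparison result (illustrative; c1/c2 stand for
# already-connected clients and assume a successful run):
#
#   result = compare_redis_data(c1, c2, keys)
#   if result.get('success'):
#       for diff in result['different_results']:
#           print(diff['key'], diff['cluster1_value'], '!=', diff['cluster2_value'])
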
def execute_redis_comparison(config1, config2, query_options):
    """
    Main entry point for running a Redis data comparison.

    Args:
        config1: configuration of the first Redis cluster
        config2: configuration of the second Redis cluster
        query_options: query options, including the query mode and its parameters

    Returns:
        dict: complete comparison result
    """
    from .redis_client import create_redis_client

    # Create a performance tracker
    performance_tracker = RedisPerformanceTracker()
    cluster1_name = config1.get('name', 'Production cluster')
    cluster2_name = config2.get('name', 'Test cluster')
    logger.info(f"Starting Redis data comparison: {cluster1_name} vs {cluster2_name}")
    # Start a new query batch of type 'redis'
    batch_id = query_log_collector.start_new_batch('redis')
    query_log_collector.add_log('INFO', f"🚀 Starting Redis data comparison: {cluster1_name} vs {cluster2_name}")
    query_log_collector.add_log('INFO', f"📋 Query batch ID: {batch_id}")
    # Create the connections
    client1 = create_redis_client(config1, cluster1_name, performance_tracker)
    client2 = create_redis_client(config2, cluster2_name, performance_tracker)
    if not client1:
        error_msg = f'Failed to connect to {cluster1_name}'
        query_log_collector.add_log('ERROR', error_msg)
        return {'error': error_msg}
    if not client2:
        error_msg = f'Failed to connect to {cluster2_name}'
        query_log_collector.add_log('ERROR', error_msg)
        client1.close()  # avoid leaking the first connection
        return {'error': error_msg}
    try:
        # Collect the keys to compare
        keys = []
        query_mode = query_options.get('mode', 'random')
        if query_mode == 'random':
            # Fetch random keys
            count = query_options.get('count', 100)
            pattern = query_options.get('pattern', '*')
            source_cluster = query_options.get('source_cluster', 'cluster2')  # default to the second cluster
            source_client = client2 if source_cluster == 'cluster2' else client1
            source_name = cluster2_name if source_cluster == 'cluster2' else cluster1_name
            logger.info(f"Fetching {count} random keys from {source_name}")
            query_log_collector.add_log('INFO', f"Fetching {count} random keys from {source_name}")
            keys = get_random_keys_from_redis(source_client, count, pattern, performance_tracker)
        elif query_mode == 'specified':
            # Use the specified keys
            keys = query_options.get('keys', [])
            # Encode str keys to bytes (Redis normally works with bytes)
            keys = [k.encode('utf-8') if isinstance(k, str) else k for k in keys]
            query_log_collector.add_log('INFO', f"Comparing the {len(keys)} specified keys")
        if not keys:
            error_msg = 'No keys available for comparison'
            query_log_collector.add_log('ERROR', error_msg)
            return {'error': error_msg}
        logger.info(f"Preparing to compare {len(keys)} keys")
        query_log_collector.add_log('INFO', f"Preparing to compare {len(keys)} keys")
        # Run the comparison
        comparison_result = compare_redis_data(
            client1, client2, keys,
            cluster1_name, cluster2_name,
            performance_tracker
        )
        # Attach the performance report
        comparison_result['performance_report'] = performance_tracker.generate_report()
        comparison_result['query_options'] = query_options
        comparison_result['batch_id'] = batch_id  # include the batch ID in the result
        # Log the final outcome
        if comparison_result.get('success'):
            query_log_collector.add_log('INFO', "🎉 Redis data comparison finished successfully")
        # End the current batch
        query_log_collector.end_current_batch()
        return comparison_result
    except Exception as e:
        logger.error(f"Redis data comparison failed: {e}")
        query_log_collector.add_log('ERROR', f"💥 Redis data comparison failed: {e}")
        # End the current batch
        query_log_collector.end_current_batch()
        return {'error': f'Execution failed: {str(e)}', 'batch_id': batch_id}
    finally:
        # Close the connections
        try:
            if client1:
                client1.close()
            if client2:
                client2.close()
        except Exception:
            pass
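
# Minimal sketch of running this module directly. The config field names are
# hypothetical (they must match what .redis_client.create_redis_client expects),
# and because of the relative imports this file has to be run as a module from
# the project root, e.g. `python -m modules.redis_query`.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    demo_config1 = {'name': 'prod', 'host': '127.0.0.1', 'port': 6379}  # hypothetical fields
    demo_config2 = {'name': 'test', 'host': '127.0.0.1', 'port': 6380}  # hypothetical fields
    demo_options = {'mode': 'random', 'count': 10, 'pattern': '*'}
    demo_result = execute_redis_comparison(demo_config1, demo_config2, demo_options)
    if demo_result.get('success'):
        print(demo_result['stats'])
    else:
        print(demo_result.get('error'))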