Files
BigDataTool/modules/redis_query.py
2025-08-05 11:23:49 +08:00

423 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Redis查询引擎模块
=================
本模块是Redis数据比对的核心引擎提供高级的Redis数据查询和比较功能。
核心功能:
1. 多模式查询随机采样和指定Key两种查询模式
2. 全类型支持支持所有Redis数据类型的查询和比较
3. 智能比较:针对不同数据类型的专门比较算法
4. 性能监控:详细的查询时间和性能统计
5. 错误容错单个Key查询失败不影响整体结果
查询模式:
- 随机采样从源集群随机获取指定数量的Key进行比对
- 指定Key对用户提供的Key列表进行精确比对
- 模式匹配支持通配符模式的Key筛选
支持的数据类型:
- String字符串类型自动检测JSON格式
- Hash哈希表字段级别的深度比较
- List列表保持元素顺序的精确比较
- Set集合自动排序后的内容比较
- ZSet有序集合包含分数的完整比较
- Stream消息流消息级别的详细比较
比较算法:
- JSON智能比较自动检测和比较JSON格式数据
- 类型一致性检查:确保两个集群中数据类型一致
- 内容深度比较:递归比较复杂数据结构
- 性能优化:大数据集的高效比较算法
统计分析:
- 一致性统计相同、不同、缺失Key的详细统计
- 类型分布:各种数据类型的分布统计
- 性能指标:查询时间、连接时间等性能数据
- 错误分析:查询失败的详细错误统计
作者BigDataTool项目组
更新时间2024年8月
"""
import time
import logging
import random
from redis.cluster import key_slot
from redis.exceptions import RedisError
from .redis_client import RedisPerformanceTracker
logger = logging.getLogger(__name__)
# Import the query log collector from the app module; when it is not
# importable (e.g. the module is used standalone or under test), fall back
# to a no-op stub that satisfies the same interface.
try:
    from app import query_log_collector
except ImportError:
    class DummyQueryLogCollector:
        """No-op stand-in for the real query log collector."""

        def start_new_batch(self, query_type):
            return None

        def end_current_batch(self):
            pass

        def set_history_id(self, history_id):
            pass

        def add_log(self, level, message):
            pass

    query_log_collector = DummyQueryLogCollector()
def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance_tracker=None):
    """Sample random keys from a Redis cluster via SCAN.

    Args:
        redis_client: Redis client to scan.
        count: desired number of keys to return.
        pattern: key match pattern (default "*").
        performance_tracker: optional tracker; scan duration is recorded on it.

    Returns:
        list: up to ``count`` randomly chosen keys; empty list when the scan fails.
    """
    start_ts = time.time()
    found = set()
    logger.info(f"开始扫描获取随机keys目标数量: {count},模式: {pattern}")
    query_log_collector.add_log('INFO', f"开始扫描获取随机keys目标数量: {count},模式: {pattern}")
    try:
        # Hint a batch size larger than needed so the sample pool is wide.
        batch_hint = max(count * 2, 1000)
        for candidate in redis_client.scan_iter(match=pattern, count=batch_hint):
            found.add(candidate)
            # Stop once a 3x oversample is collected — enough for random choice.
            if len(found) >= count * 3:
                break
        # Down-sample only when more keys were collected than requested.
        if len(found) > count:
            keys = random.sample(list(found), count)
        else:
            keys = list(found)
        scan_duration = time.time() - start_ts
        if performance_tracker:
            performance_tracker.record_scan_time(scan_duration)
        logger.info(f"扫描获取 {len(keys)} 个随机keys耗时 {scan_duration:.3f}秒")
        query_log_collector.add_log('INFO', f"扫描获取 {len(keys)} 个随机keys耗时 {scan_duration:.3f}秒")
        return keys
    except RedisError as e:
        scan_duration = time.time() - start_ts
        if performance_tracker:
            performance_tracker.record_scan_time(scan_duration)
        logger.error(f"获取随机keys失败: {e},耗时 {scan_duration:.3f}秒")
        query_log_collector.add_log('ERROR', f"获取随机keys失败: {e},耗时 {scan_duration:.3f}秒")
        return []
def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis集群", performance_tracker=None):
    """Fetch the value (with type info) for each key from one Redis cluster.

    Supports every Redis data type (String, Hash, List, Set, ZSet, ...) by
    delegating per-key retrieval to ``redis_types.get_redis_value_with_type``.

    Args:
        redis_client: Redis client to query.
        keys: list of keys to fetch.
        cluster_name: cluster label used in log messages.
        performance_tracker: optional tracker; batch query duration is recorded.

    Returns:
        list: one info dict per key (type, value, display form, existence flag);
        on failure, a list of error placeholders of the same length as ``keys``.
    """
    from .redis_types import get_redis_value_with_type

    start_ts = time.time()
    fetched = []
    logger.info(f"开始从{cluster_name}批量查询 {len(keys)} 个keys支持所有数据类型")
    query_log_collector.add_log('INFO', f"开始从{cluster_name}批量查询 {len(keys)} 个keys支持所有数据类型")
    try:
        # Query one key at a time so every Redis data type is handled.
        for key in keys:
            fetched.append(get_redis_value_with_type(redis_client, key))
        query_duration = time.time() - start_ts
        if performance_tracker:
            performance_tracker.record_query(f"{cluster_name}_typed_batch_query", query_duration)
        # Summarize how many keys exist and how their types are distributed.
        existing = [info for info in fetched if info['exists']]
        successful_count = len(existing)
        type_stats = {}
        for info in existing:
            type_stats[info['type']] = type_stats.get(info['type'], 0) + 1
        type_info = ", ".join(f"{t}: {c}" for t, c in type_stats.items()) if type_stats else ""
        logger.info(f"{cluster_name}查询完成,成功获取 {successful_count}/{len(keys)} 个值,数据类型分布: [{type_info}],耗时 {query_duration:.3f}秒")
        query_log_collector.add_log('INFO', f"{cluster_name}查询完成,成功获取 {successful_count}/{len(keys)} 个值,数据类型分布: [{type_info}],耗时 {query_duration:.3f}秒")
        return fetched
    except Exception as e:
        query_duration = time.time() - start_ts
        if performance_tracker:
            performance_tracker.record_query(f"{cluster_name}_typed_batch_query_error", query_duration)
        logger.error(f"{cluster_name}批量查询失败: {e},耗时 {query_duration:.3f}秒")
        query_log_collector.add_log('ERROR', f"{cluster_name}批量查询失败: {e},耗时 {query_duration:.3f}秒")
        # Return error placeholders so the caller still gets one entry per key.
        return [{'type': 'error', 'value': None, 'display_value': f'<error: {e}>', 'exists': False} for _ in keys]
def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", cluster2_name="测试集群", performance_tracker=None):
    """
    Compare the data for the given keys across two Redis clusters
    (supports all Redis data types).

    Args:
        client1: first Redis client (production).
        client2: second Redis client (test).
        keys: list of keys to compare.
        cluster1_name: label of the first cluster (used in logs/results).
        cluster2_name: label of the second cluster (used in logs/results).
        performance_tracker: optional tracker; comparison duration is recorded.

    Returns:
        dict: comparison result with statistics and per-key difference details,
        or ``{'error': ...}`` when fetching from either cluster fails.
    """
    from .redis_types import compare_redis_values
    comparison_start_time = time.time()
    logger.info(f"开始比较 {cluster1_name}{cluster2_name} 的数据支持所有Redis数据类型")
    query_log_collector.add_log('INFO', f"开始比较 {cluster1_name}{cluster2_name} 的数据支持所有Redis数据类型")
    # Fetch the values from both clusters (one info dict per key, same order).
    values1 = get_redis_values_by_keys(client1, keys, cluster1_name, performance_tracker)
    if not values1:
        return {'error': f'{cluster1_name}获取数据失败'}
    values2 = get_redis_values_by_keys(client2, keys, cluster2_name, performance_tracker)
    if not values2:
        return {'error': f'{cluster2_name}获取数据失败'}
    # Start the actual comparison phase (timed separately from fetching).
    compare_start = time.time()
    # Counters for each comparison outcome.
    stats = {
        'total_keys': len(keys),
        'identical_count': 0,
        'different_count': 0,
        'missing_in_cluster1': 0,
        'missing_in_cluster2': 0,
        'both_missing': 0
    }
    # Detailed per-key result buckets.
    identical_results = []
    different_results = []
    missing_results = []
    # Compare key by key; values1/values2 are index-aligned with keys.
    for i, key in enumerate(keys):
        # Keys may come back as bytes from Redis — normalize for display.
        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
        value1_info = values1[i]
        value2_info = values2[i]
        # Delegate the type-aware comparison to the redis_types module.
        comparison_result = compare_redis_values(value1_info, value2_info)
        if comparison_result['status'] == 'both_missing':
            stats['both_missing'] += 1
            missing_results.append({
                'key': key_str,
                'status': 'both_missing',
                'message': comparison_result['message']
            })
        elif comparison_result['status'] == 'missing_in_cluster1':
            stats['missing_in_cluster1'] += 1
            missing_results.append({
                'key': key_str,
                'status': 'missing_in_cluster1',
                'cluster1_value': None,
                'cluster2_value': value2_info['display_value'],
                'cluster2_type': value2_info['type'],
                'message': comparison_result['message']
            })
        elif comparison_result['status'] == 'missing_in_cluster2':
            stats['missing_in_cluster2'] += 1
            missing_results.append({
                'key': key_str,
                'status': 'missing_in_cluster2',
                'cluster1_value': value1_info['display_value'],
                'cluster1_type': value1_info['type'],
                'cluster2_value': None,
                'message': comparison_result['message']
            })
        elif comparison_result['status'] == 'identical':
            stats['identical_count'] += 1
            identical_results.append({
                'key': key_str,
                'value': value1_info['display_value'],
                'type': value1_info['type']
            })
        else:  # different
            stats['different_count'] += 1
            different_results.append({
                'key': key_str,
                'cluster1_value': value1_info['display_value'],
                'cluster1_type': value1_info['type'],
                'cluster2_value': value2_info['display_value'],
                'cluster2_type': value2_info['type'],
                'message': comparison_result['message']
            })
    compare_end = time.time()
    comparison_duration = compare_end - compare_start
    total_duration = compare_end - comparison_start_time
    if performance_tracker:
        performance_tracker.record_comparison_time(comparison_duration)
    # Percentage helper that tolerates an empty key list (avoids ZeroDivisionError).
    def safe_percentage(part, total):
        return round((part / total * 100), 2) if total > 0 else 0
    stats['identical_percentage'] = safe_percentage(stats['identical_count'], stats['total_keys'])
    stats['different_percentage'] = safe_percentage(stats['different_count'], stats['total_keys'])
    stats['missing_percentage'] = safe_percentage(
        stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing'],
        stats['total_keys']
    )
    result = {
        'success': True,
        'stats': stats,
        'identical_results': identical_results,
        'different_results': different_results,
        'missing_results': missing_results,
        'performance': {
            'comparison_time': comparison_duration,
            'total_time': total_duration
        },
        'clusters': {
            'cluster1_name': cluster1_name,
            'cluster2_name': cluster2_name
        }
    }
    logger.info(f"数据比对完成,耗时 {comparison_duration:.3f}秒")
    logger.info(f"比对统计: 总计{stats['total_keys']}个key相同{stats['identical_count']}个,不同{stats['different_count']}个,缺失{stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']}个")
    query_log_collector.add_log('INFO', f"数据比对完成,耗时 {comparison_duration:.3f}秒")
    query_log_collector.add_log('INFO', f"比对统计: 总计{stats['total_keys']}个key相同{stats['identical_count']}个,不同{stats['different_count']}个,缺失{stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']}个")
    return result
def execute_redis_comparison(config1, config2, query_options):
    """
    Entry point for a full Redis data comparison between two clusters.

    Builds the two cluster connections, collects the keys to compare
    (random sampling or a user-specified list depending on
    ``query_options['mode']``), runs the comparison, and attaches a
    performance report.

    Args:
        config1: configuration dict of the first Redis cluster.
        config2: configuration dict of the second Redis cluster.
        query_options: query options; keys: ``mode`` ('random'/'specified'),
            ``count``, ``pattern``, ``source_cluster``, ``keys``.

    Returns:
        dict: full comparison result, or ``{'error': ...}`` on failure.
    """
    from .redis_client import create_redis_client

    performance_tracker = RedisPerformanceTracker()
    cluster1_name = config1.get('name', '生产集群')
    cluster2_name = config2.get('name', '测试集群')
    logger.info(f"开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}")
    query_log_collector.add_log('INFO', f"开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}")
    client1 = None
    client2 = None
    # Create the connections inside try/finally so that a partially
    # established pair (e.g. client1 up, client2 failed) is still closed —
    # the original early returns leaked client1 in that case.
    try:
        client1 = create_redis_client(config1, cluster1_name, performance_tracker)
        if not client1:
            error_msg = f'{cluster1_name}连接失败'
            query_log_collector.add_log('ERROR', error_msg)
            return {'error': error_msg}
        client2 = create_redis_client(config2, cluster2_name, performance_tracker)
        if not client2:
            error_msg = f'{cluster2_name}连接失败'
            query_log_collector.add_log('ERROR', error_msg)
            return {'error': error_msg}
        # Collect the keys to compare according to the selected mode.
        keys = []
        query_mode = query_options.get('mode', 'random')
        if query_mode == 'random':
            # Random sampling from one of the clusters.
            count = query_options.get('count', 100)
            pattern = query_options.get('pattern', '*')
            source_cluster = query_options.get('source_cluster', 'cluster2')  # default: sample from cluster2
            source_client = client2 if source_cluster == 'cluster2' else client1
            source_name = cluster2_name if source_cluster == 'cluster2' else cluster1_name
            logger.info(f"{source_name}随机获取 {count} 个keys")
            query_log_collector.add_log('INFO', f"{source_name}随机获取 {count} 个keys")
            keys = get_random_keys_from_redis(source_client, count, pattern, performance_tracker)
        elif query_mode == 'specified':
            # User-provided key list; Redis works with bytes, so encode strings.
            keys = query_options.get('keys', [])
            keys = [k.encode('utf-8') if isinstance(k, str) else k for k in keys]
            query_log_collector.add_log('INFO', f"使用指定的 {len(keys)} 个keys进行比较")
        if not keys:
            error_msg = '未获取到任何keys进行比较'
            query_log_collector.add_log('ERROR', error_msg)
            return {'error': error_msg}
        logger.info(f"准备比较 {len(keys)} 个keys")
        query_log_collector.add_log('INFO', f"准备比较 {len(keys)} 个keys")
        # Run the actual comparison.
        comparison_result = compare_redis_data(
            client1, client2, keys,
            cluster1_name, cluster2_name,
            performance_tracker
        )
        # Attach the performance report and echo back the query options.
        comparison_result['performance_report'] = performance_tracker.generate_report()
        comparison_result['query_options'] = query_options
        return comparison_result
    except Exception as e:
        logger.error(f"Redis数据比较执行失败: {e}")
        query_log_collector.add_log('ERROR', f"Redis数据比较执行失败: {e}")
        return {'error': f'执行失败: {str(e)}'}
    finally:
        # Close each connection independently so a failure closing one
        # never prevents closing the other; log (don't swallow) failures.
        for name, client in ((cluster1_name, client1), (cluster2_name, client2)):
            if client:
                try:
                    client.close()
                except Exception as close_err:
                    logger.warning(f"关闭{name}连接失败: {close_err}")