""" Redis查询引擎模块 ================= 本模块是Redis数据比对的核心引擎,提供高级的Redis数据查询和比较功能。 核心功能: 1. 多模式查询:随机采样和指定Key两种查询模式 2. 全类型支持:支持所有Redis数据类型的查询和比较 3. 智能比较:针对不同数据类型的专门比较算法 4. 性能监控:详细的查询时间和性能统计 5. 错误容错:单个Key查询失败不影响整体结果 查询模式: - 随机采样:从源集群随机获取指定数量的Key进行比对 - 指定Key:对用户提供的Key列表进行精确比对 - 模式匹配:支持通配符模式的Key筛选 支持的数据类型: - String:字符串类型,自动检测JSON格式 - Hash:哈希表,字段级别的深度比较 - List:列表,保持元素顺序的精确比较 - Set:集合,自动排序后的内容比较 - ZSet:有序集合,包含分数的完整比较 - Stream:消息流,消息级别的详细比较 比较算法: - JSON智能比较:自动检测和比较JSON格式数据 - 类型一致性检查:确保两个集群中数据类型一致 - 内容深度比较:递归比较复杂数据结构 - 性能优化:大数据集的高效比较算法 统计分析: - 一致性统计:相同、不同、缺失Key的详细统计 - 类型分布:各种数据类型的分布统计 - 性能指标:查询时间、连接时间等性能数据 - 错误分析:查询失败的详细错误统计 作者:BigDataTool项目组 更新时间:2024年8月 """ import time import logging import random from redis.cluster import key_slot from redis.exceptions import RedisError from .redis_client import RedisPerformanceTracker logger = logging.getLogger(__name__) # 导入查询日志收集器 try: from app import query_log_collector except ImportError: # 如果导入失败,创建一个空的日志收集器 class DummyQueryLogCollector: def start_new_batch(self, query_type): return None def end_current_batch(self): pass def set_history_id(self, history_id): pass def add_log(self, level, message): pass query_log_collector = DummyQueryLogCollector() def _get_redis_command_by_type(redis_type): """根据Redis数据类型返回对应的查询命令""" command_map = { 'string': 'GET', 'hash': 'HGETALL', 'list': 'LRANGE', 'set': 'SMEMBERS', 'zset': 'ZRANGE', 'stream': 'XRANGE' } return command_map.get(redis_type, 'TYPE') def _get_data_summary(key_info): """获取数据内容的概要信息""" if not key_info['exists']: return "不存在" key_type = key_info['type'] value = key_info['value'] try: if key_type == 'string': if isinstance(value, str): if len(value) > 50: return f"字符串({len(value)}字符): {value[:47]}..." else: return f"字符串: {value}" else: return f"字符串: {str(value)[:50]}..." elif key_type == 'hash': if isinstance(value, dict): field_count = len(value) sample_fields = list(value.keys())[:3] fields_str = ", ".join(sample_fields) if field_count > 3: fields_str += "..." return f"哈希({field_count}个字段): {fields_str}" else: return f"哈希: {str(value)[:50]}..." elif key_type == 'list': if isinstance(value, list): list_len = len(value) if list_len > 0: first_item = str(value[0])[:20] if value[0] else "空" return f"列表({list_len}个元素): [{first_item}...]" else: return "列表(空)" else: return f"列表: {str(value)[:50]}..." elif key_type == 'set': if isinstance(value, (set, list)): set_len = len(value) if set_len > 0: first_item = str(list(value)[0])[:20] if value else "空" return f"集合({set_len}个元素): {{{first_item}...}}" else: return "集合(空)" else: return f"集合: {str(value)[:50]}..." elif key_type == 'zset': if isinstance(value, list): zset_len = len(value) if zset_len > 0: first_item = f"{value[0][0]}:{value[0][1]}" if value[0] else "空" return f"有序集合({zset_len}个元素): {{{first_item}...}}" else: return "有序集合(空)" else: return f"有序集合: {str(value)[:50]}..." elif key_type == 'stream': if isinstance(value, list): stream_len = len(value) return f"流({stream_len}条消息)" else: return f"流: {str(value)[:50]}..." else: return f"未知类型: {str(value)[:50]}..." except Exception as e: return f"解析错误: {str(e)[:30]}..." def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance_tracker=None): """ 从Redis集群中获取随机keys Args: redis_client: Redis客户端 count: 要获取的key数量 pattern: key匹配模式,默认为 "*" performance_tracker: 性能追踪器 Returns: list: 随机key列表 """ start_time = time.time() keys = set() logger.info(f"开始扫描获取随机keys,目标数量: {count},模式: {pattern}") query_log_collector.add_log('INFO', f"🔍 开始扫描Key,目标数量: {count},匹配模式: '{pattern}'") try: # 使用scan_iter获取keys scan_count = max(count * 2, 1000) # 扫描更多key以确保随机性 query_log_collector.add_log('INFO', f"📡 执行SCAN命令,扫描批次大小: {scan_count}") scan_iterations = 0 for key in redis_client.scan_iter(match=pattern, count=scan_count): keys.add(key) scan_iterations += 1 # 每扫描1000个key记录一次进度 if scan_iterations % 1000 == 0: query_log_collector.add_log('INFO', f"📊 扫描进度: 已发现 {len(keys)} 个匹配的Key") if len(keys) >= count * 3: # 获取更多key以便随机选择 break total_found = len(keys) query_log_collector.add_log('INFO', f"🎯 扫描完成,共发现 {total_found} 个匹配的Key") # 如果获取的key数量超过需要的数量,随机选择 if len(keys) > count: keys = random.sample(list(keys), count) query_log_collector.add_log('INFO', f"🎲 从 {total_found} 个Key中随机选择 {count} 个") else: keys = list(keys) if total_found < count: query_log_collector.add_log('WARNING', f"⚠️ 实际找到的Key数量({total_found})少于目标数量({count})") # 记录选中的Key样本(前10个) key_sample = keys[:10] if len(keys) > 10 else keys key_list_str = ", ".join([f"'{k}'" for k in key_sample]) if len(keys) > 10: key_list_str += f" ... (共{len(keys)}个)" query_log_collector.add_log('INFO', f"📋 选中的Key样本: [{key_list_str}]") end_time = time.time() scan_duration = end_time - start_time if performance_tracker: performance_tracker.record_scan_time(scan_duration) logger.info(f"扫描获取 {len(keys)} 个随机keys,耗时 {scan_duration:.3f} 秒") query_log_collector.add_log('INFO', f"✅ Key扫描完成,最终获取 {len(keys)} 个keys,总耗时 {scan_duration:.3f} 秒") return keys except RedisError as e: end_time = time.time() scan_duration = end_time - start_time if performance_tracker: performance_tracker.record_scan_time(scan_duration) logger.error(f"获取随机keys失败: {e},耗时 {scan_duration:.3f} 秒") query_log_collector.add_log('ERROR', f"获取随机keys失败: {e},耗时 {scan_duration:.3f} 秒") return [] def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis集群", performance_tracker=None): """ 批量查询Redis中指定keys的值,支持所有Redis数据类型(String、Hash、List、Set、ZSet等) Args: redis_client: Redis客户端 keys: 要查询的key列表 cluster_name: 集群名称用于日志 performance_tracker: 性能追踪器 Returns: list: 对应keys的值信息字典列表,包含类型、值和显示格式 """ from .redis_types import get_redis_value_with_type start_time = time.time() result = [] logger.info(f"开始从{cluster_name}批量查询 {len(keys)} 个keys(支持所有数据类型)") query_log_collector.add_log('INFO', f"📊 开始从{cluster_name}批量查询 {len(keys)} 个keys(支持所有数据类型)") # 记录要查询的Key列表(前10个,避免日志过长) key_sample = keys[:10] if len(keys) > 10 else keys key_list_str = ", ".join([f"'{k}'" for k in key_sample]) if len(keys) > 10: key_list_str += f" ... (共{len(keys)}个)" query_log_collector.add_log('INFO', f"🔍 查询Key列表: [{key_list_str}]") try: # 逐个查询每个key,支持所有Redis数据类型 redis_commands_used = {} # 记录使用的Redis命令 for i, key in enumerate(keys): key_start_time = time.time() key_info = get_redis_value_with_type(redis_client, key) key_duration = time.time() - key_start_time result.append(key_info) # 记录每个key的查询详情 if key_info['exists']: key_type = key_info['type'] # 根据类型确定使用的Redis命令 redis_cmd = _get_redis_command_by_type(key_type) redis_commands_used[redis_cmd] = redis_commands_used.get(redis_cmd, 0) + 1 # 获取数据内容概要 data_summary = _get_data_summary(key_info) query_log_collector.add_log('INFO', f"✅ Key '{key}' | 类型: {key_type} | 命令: {redis_cmd} | 数据: {data_summary} | 耗时: {key_duration:.3f}s") else: query_log_collector.add_log('WARNING', f"❌ Key '{key}' | 状态: 不存在 | 耗时: {key_duration:.3f}s") end_time = time.time() query_duration = end_time - start_time if performance_tracker: performance_tracker.record_query(f"{cluster_name}_typed_batch_query", query_duration) # 统计成功获取的key数量和类型分布 successful_count = sum(1 for r in result if r['exists']) type_stats = {} for r in result: if r['exists']: key_type = r['type'] type_stats[key_type] = type_stats.get(key_type, 0) + 1 # 记录Redis命令使用统计 cmd_stats = ", ".join([f"{cmd}: {count}" for cmd, count in redis_commands_used.items()]) if redis_commands_used else "无" type_info = ", ".join([f"{t}: {c}" for t, c in type_stats.items()]) if type_stats else "无" query_log_collector.add_log('INFO', f"🎯 Redis命令统计: [{cmd_stats}]") query_log_collector.add_log('INFO', f"📈 从{cluster_name}查询完成,成功获取 {successful_count}/{len(keys)} 个值,数据类型分布: [{type_info}],总耗时 {query_duration:.3f} 秒") return result except Exception as e: end_time = time.time() query_duration = end_time - start_time if performance_tracker: performance_tracker.record_query(f"{cluster_name}_typed_batch_query_error", query_duration) logger.error(f"从{cluster_name}批量查询失败: {e},耗时 {query_duration:.3f} 秒") query_log_collector.add_log('ERROR', f"从{cluster_name}批量查询失败: {e},耗时 {query_duration:.3f} 秒") # 返回错误占位符 return [{'type': 'error', 'value': None, 'display_value': f'', 'exists': False} for _ in keys] def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", cluster2_name="测试集群", performance_tracker=None): """ 比较两个Redis集群中指定keys的数据,支持所有Redis数据类型 Args: client1: 第一个Redis客户端(生产) client2: 第二个Redis客户端(测试) keys: 要比较的key列表 cluster1_name: 第一个集群名称 cluster2_name: 第二个集群名称 performance_tracker: 性能追踪器 Returns: dict: 比较结果,包含统计信息和差异详情 """ from .redis_types import compare_redis_values comparison_start_time = time.time() logger.info(f"开始比较 {cluster1_name} 和 {cluster2_name} 的数据(支持所有Redis数据类型)") query_log_collector.add_log('INFO', f"🔄 开始比较 {cluster1_name} 和 {cluster2_name} 的数据(支持所有Redis数据类型)") query_log_collector.add_log('INFO', f"📊 比较范围: {len(keys)} 个Key") # 获取两个集群的数据 query_log_collector.add_log('INFO', f"📥 第一步: 从{cluster1_name}获取数据") values1 = get_redis_values_by_keys(client1, keys, cluster1_name, performance_tracker) if not values1: error_msg = f'从{cluster1_name}获取数据失败' query_log_collector.add_log('ERROR', f"❌ {error_msg}") return {'error': error_msg} query_log_collector.add_log('INFO', f"📥 第二步: 从{cluster2_name}获取数据") values2 = get_redis_values_by_keys(client2, keys, cluster2_name, performance_tracker) if not values2: error_msg = f'从{cluster2_name}获取数据失败' query_log_collector.add_log('ERROR', f"❌ {error_msg}") return {'error': error_msg} # 开始数据比对 compare_start = time.time() query_log_collector.add_log('INFO', f"🔍 第三步: 开始逐个比较Key的数据内容") # 初始化统计数据 stats = { 'total_keys': len(keys), 'identical_count': 0, 'different_count': 0, 'missing_in_cluster1': 0, 'missing_in_cluster2': 0, 'both_missing': 0 } # 详细结果列表 identical_results = [] different_results = [] missing_results = [] # 逐个比较 comparison_details = [] # 记录比较详情 for i, key in enumerate(keys): key_str = key.decode('utf-8') if isinstance(key, bytes) else key value1_info = values1[i] value2_info = values2[i] # 使用redis_types模块的比较函数 comparison_result = compare_redis_values(value1_info, value2_info) # 记录比较详情 comparison_detail = { 'key': key_str, 'cluster1_exists': value1_info['exists'], 'cluster2_exists': value2_info['exists'], 'cluster1_type': value1_info.get('type'), 'cluster2_type': value2_info.get('type'), 'status': comparison_result['status'] } comparison_details.append(comparison_detail) if comparison_result['status'] == 'both_missing': stats['both_missing'] += 1 missing_results.append({ 'key': key_str, 'status': 'both_missing', 'message': comparison_result['message'] }) query_log_collector.add_log('WARNING', f"⚠️ Key '{key_str}': 两个集群都不存在") elif comparison_result['status'] == 'missing_in_cluster1': stats['missing_in_cluster1'] += 1 missing_results.append({ 'key': key_str, 'status': 'missing_in_cluster1', 'cluster1_value': None, 'cluster2_value': value2_info['display_value'], 'cluster2_type': value2_info['type'], 'message': comparison_result['message'] }) query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': 仅在{cluster2_name}存在 (类型: {value2_info['type']})") elif comparison_result['status'] == 'missing_in_cluster2': stats['missing_in_cluster2'] += 1 missing_results.append({ 'key': key_str, 'status': 'missing_in_cluster2', 'cluster1_value': value1_info['display_value'], 'cluster1_type': value1_info['type'], 'cluster2_value': None, 'message': comparison_result['message'] }) query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': 仅在{cluster1_name}存在 (类型: {value1_info['type']})") elif comparison_result['status'] == 'identical': stats['identical_count'] += 1 identical_results.append({ 'key': key_str, 'value': value1_info['display_value'], 'type': value1_info['type'] }) query_log_collector.add_log('INFO', f"✅ Key '{key_str}': 数据一致 (类型: {value1_info['type']})") else: # different stats['different_count'] += 1 different_results.append({ 'key': key_str, 'cluster1_value': value1_info['display_value'], 'cluster1_type': value1_info['type'], 'cluster2_value': value2_info['display_value'], 'cluster2_type': value2_info['type'], 'message': comparison_result['message'] }) # 记录差异详情 type_info = f"{value1_info['type']} vs {value2_info['type']}" if value1_info['type'] != value2_info['type'] else value1_info['type'] query_log_collector.add_log('WARNING', f"🔄 Key '{key_str}': 数据不一致 (类型: {type_info}) - {comparison_result['message']}") # 每处理100个key记录一次进度 if (i + 1) % 100 == 0: progress = f"{i + 1}/{len(keys)}" query_log_collector.add_log('INFO', f"📊 比较进度: {progress} ({((i + 1) / len(keys) * 100):.1f}%)") compare_end = time.time() comparison_duration = compare_end - compare_start total_duration = compare_end - comparison_start_time if performance_tracker: performance_tracker.record_comparison_time(comparison_duration) # 计算百分比 def safe_percentage(part, total): return round((part / total * 100), 2) if total > 0 else 0 stats['identical_percentage'] = safe_percentage(stats['identical_count'], stats['total_keys']) stats['different_percentage'] = safe_percentage(stats['different_count'], stats['total_keys']) stats['missing_percentage'] = safe_percentage( stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing'], stats['total_keys'] ) result = { 'success': True, 'stats': stats, 'identical_results': identical_results, 'different_results': different_results, 'missing_results': missing_results, 'performance': { 'comparison_time': comparison_duration, 'total_time': total_duration }, 'clusters': { 'cluster1_name': cluster1_name, 'cluster2_name': cluster2_name } } # 记录详细的比较总结 query_log_collector.add_log('INFO', f"🎯 数据比对完成,纯比较耗时 {comparison_duration:.3f} 秒,总耗时 {total_duration:.3f} 秒") # 记录统计信息 query_log_collector.add_log('INFO', f"📊 比对统计总览:") query_log_collector.add_log('INFO', f" • 总Key数量: {stats['total_keys']}") query_log_collector.add_log('INFO', f" • ✅ 数据一致: {stats['identical_count']} ({stats['identical_percentage']}%)") query_log_collector.add_log('INFO', f" • 🔄 数据不同: {stats['different_count']} ({stats['different_percentage']}%)") query_log_collector.add_log('INFO', f" • ❌ 仅{cluster1_name}存在: {stats['missing_in_cluster2']}") query_log_collector.add_log('INFO', f" • ❌ 仅{cluster2_name}存在: {stats['missing_in_cluster1']}") query_log_collector.add_log('INFO', f" • ⚠️ 两集群都不存在: {stats['both_missing']}") # 记录性能信息 if performance_tracker: query_log_collector.add_log('INFO', f"⚡ 性能统计: 平均每Key比较耗时 {(comparison_duration / len(keys) * 1000):.2f}ms") # 记录所有Key的详细信息 query_log_collector.add_log('INFO', f"📋 全部Key详细信息:") # 统计类型分布 type_distribution = {} for detail in comparison_details: key_str = detail['key'] cluster1_type = detail.get('cluster1_type', 'N/A') cluster2_type = detail.get('cluster2_type', 'N/A') status = detail.get('status', 'unknown') # 统计类型分布 if cluster1_type != 'N/A': type_distribution[cluster1_type] = type_distribution.get(cluster1_type, 0) + 1 elif cluster2_type != 'N/A': type_distribution[cluster2_type] = type_distribution.get(cluster2_type, 0) + 1 # 记录每个Key的详细信息 if status == 'identical': query_log_collector.add_log('INFO', f" ✅ {key_str} → 类型: {cluster1_type}, 状态: 数据一致") elif status == 'different': type_info = cluster1_type if cluster1_type == cluster2_type else f"{cluster1_name}:{cluster1_type} vs {cluster2_name}:{cluster2_type}" query_log_collector.add_log('INFO', f" 🔄 {key_str} → 类型: {type_info}, 状态: 数据不同") elif status == 'missing_in_cluster1': query_log_collector.add_log('INFO', f" ❌ {key_str} → 类型: {cluster2_type}, 状态: 仅在{cluster2_name}存在") elif status == 'missing_in_cluster2': query_log_collector.add_log('INFO', f" ❌ {key_str} → 类型: {cluster1_type}, 状态: 仅在{cluster1_name}存在") elif status == 'both_missing': query_log_collector.add_log('INFO', f" ⚠️ {key_str} → 类型: N/A, 状态: 两集群都不存在") # 记录类型分布统计 if type_distribution: query_log_collector.add_log('INFO', f"📊 数据类型分布统计:") for data_type, count in sorted(type_distribution.items()): percentage = (count / len(keys)) * 100 query_log_collector.add_log('INFO', f" • {data_type}: {count} 个 ({percentage:.1f}%)") # 记录Key列表摘要 key_summary = [detail['key'] for detail in comparison_details[:10]] # 显示前10个key key_list_str = ', '.join(key_summary) if len(comparison_details) > 10: key_list_str += f" ... (共{len(comparison_details)}个Key)" query_log_collector.add_log('INFO', f"📝 Key列表摘要: [{key_list_str}]") logger.info(f"数据比对完成,耗时 {comparison_duration:.3f} 秒") logger.info(f"比对统计: 总计{stats['total_keys']}个key,相同{stats['identical_count']}个,不同{stats['different_count']}个,缺失{stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']}个") return result def execute_redis_comparison(config1, config2, query_options): """ 执行Redis数据比较的主要函数 Args: config1: 第一个Redis集群配置 config2: 第二个Redis集群配置 query_options: 查询选项,包含查询模式和参数 Returns: dict: 完整的比较结果 """ from .redis_client import create_redis_client # 创建性能追踪器 performance_tracker = RedisPerformanceTracker() cluster1_name = config1.get('name', '生产集群') cluster2_name = config2.get('name', '测试集群') logger.info(f"开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}") # 开始新的查询批次,使用redis查询类型 batch_id = query_log_collector.start_new_batch('redis') query_log_collector.add_log('INFO', f"🚀 开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}") query_log_collector.add_log('INFO', f"📋 查询批次ID: {batch_id}") # 创建连接 client1 = create_redis_client(config1, cluster1_name, performance_tracker) client2 = create_redis_client(config2, cluster2_name, performance_tracker) if not client1: error_msg = f'{cluster1_name}连接失败' query_log_collector.add_log('ERROR', error_msg) return {'error': error_msg} if not client2: error_msg = f'{cluster2_name}连接失败' query_log_collector.add_log('ERROR', error_msg) return {'error': error_msg} try: # 获取要比较的keys keys = [] query_mode = query_options.get('mode', 'random') if query_mode == 'random': # 随机获取keys count = query_options.get('count', 100) pattern = query_options.get('pattern', '*') source_cluster = query_options.get('source_cluster', 'cluster2') # 默认从第二个集群获取 source_client = client2 if source_cluster == 'cluster2' else client1 source_name = cluster2_name if source_cluster == 'cluster2' else cluster1_name logger.info(f"从{source_name}随机获取 {count} 个keys") query_log_collector.add_log('INFO', f"从{source_name}随机获取 {count} 个keys") keys = get_random_keys_from_redis(source_client, count, pattern, performance_tracker) elif query_mode == 'specified': # 指定keys keys = query_options.get('keys', []) # 如果keys是字符串,需要转换为bytes(Redis通常使用bytes) keys = [k.encode('utf-8') if isinstance(k, str) else k for k in keys] query_log_collector.add_log('INFO', f"使用指定的 {len(keys)} 个keys进行比较") if not keys: error_msg = '未获取到任何keys进行比较' query_log_collector.add_log('ERROR', error_msg) return {'error': error_msg} logger.info(f"准备比较 {len(keys)} 个keys") query_log_collector.add_log('INFO', f"准备比较 {len(keys)} 个keys") # 执行比较 comparison_result = compare_redis_data( client1, client2, keys, cluster1_name, cluster2_name, performance_tracker ) # 添加性能报告 comparison_result['performance_report'] = performance_tracker.generate_report() comparison_result['query_options'] = query_options comparison_result['batch_id'] = batch_id # 添加批次ID到结果中 # 记录最终结果 if comparison_result.get('success'): query_log_collector.add_log('INFO', f"🎉 Redis数据比较执行成功完成") # 结束当前批次 query_log_collector.end_current_batch() return comparison_result except Exception as e: logger.error(f"Redis数据比较执行失败: {e}") query_log_collector.add_log('ERROR', f"💥 Redis数据比较执行失败: {e}") # 结束当前批次 query_log_collector.end_current_batch() return {'error': f'执行失败: {str(e)}', 'batch_id': batch_id} finally: # 关闭连接 try: if client1: client1.close() if client2: client2.close() except: pass