279 lines
10 KiB
Python
279 lines
10 KiB
Python
"""
|
||
Redis连接管理模块
|
||
===================
|
||
|
||
本模块提供Redis集群的连接管理和基础操作功能,支持单节点和集群模式。
|
||
|
||
核心功能:
|
||
1. 智能连接管理:自动检测单节点和集群模式
|
||
2. 连接池优化:高效的连接复用和资源管理
|
||
3. 错误处理:完善的连接失败诊断和重试机制
|
||
4. 性能监控:连接时间和操作性能的实时监控
|
||
5. 类型检测:自动识别Redis数据类型
|
||
|
||
连接特性:
|
||
- 自适应模式:根据节点数量自动选择连接方式
|
||
- 连接池管理:每个节点独立的连接池配置
|
||
- 超时控制:可配置的连接和操作超时时间
|
||
- 密码认证:支持Redis AUTH认证
|
||
- 健康检查:连接状态的实时监控
|
||
|
||
支持的Redis版本:
|
||
- Redis 5.0+:完整功能支持
|
||
- Redis Cluster:集群模式支持
|
||
- Redis Sentinel:哨兵模式支持(通过配置)
|
||
|
||
错误诊断:
|
||
- 连接超时:网络延迟和服务器负载分析
|
||
- 认证失败:密码验证和权限检查
|
||
- 集群错误:节点状态和集群配置验证
|
||
- 数据类型错误:类型检测和转换建议
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
|
||
|
||
import time
|
||
import logging
|
||
import redis
|
||
from redis.cluster import RedisCluster, ClusterNode, key_slot
|
||
from redis.exceptions import RedisError, ConnectionError
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class RedisPerformanceTracker:
|
||
"""Redis操作性能统计追踪器"""
|
||
|
||
def __init__(self):
|
||
self.connection_times = {} # 连接耗时
|
||
self.query_times = {} # 查询耗时
|
||
self.comparison_time = 0 # 比对耗时
|
||
self.scan_time = 0 # scan操作耗时
|
||
self.connection_status = {} # 连接状态
|
||
self.start_time = time.time()
|
||
|
||
def record_connection(self, cluster_name, start_time, end_time, success, error_msg=None):
|
||
"""记录连接信息"""
|
||
self.connection_times[cluster_name] = end_time - start_time
|
||
self.connection_status[cluster_name] = {
|
||
'success': success,
|
||
'error_msg': error_msg,
|
||
'connect_time': end_time - start_time
|
||
}
|
||
|
||
def record_query(self, operation_name, duration):
|
||
"""记录查询操作耗时"""
|
||
self.query_times[operation_name] = duration
|
||
|
||
def record_scan_time(self, duration):
|
||
"""记录scan操作耗时"""
|
||
self.scan_time = duration
|
||
|
||
def record_comparison_time(self, duration):
|
||
"""记录比对耗时"""
|
||
self.comparison_time = duration
|
||
|
||
def get_total_time(self):
|
||
"""获取总耗时"""
|
||
return time.time() - self.start_time
|
||
|
||
def generate_report(self):
|
||
"""生成性能报告"""
|
||
total_time = self.get_total_time()
|
||
report = {
|
||
'total_time': total_time,
|
||
'connections': self.connection_status,
|
||
'operations': {
|
||
'scan_time': self.scan_time,
|
||
'comparison_time': self.comparison_time,
|
||
'queries': self.query_times
|
||
}
|
||
}
|
||
return report
|
||
|
||
def create_redis_client(cluster_config, cluster_name="Redis集群", performance_tracker=None):
|
||
"""
|
||
创建Redis客户端,自动检测单节点或集群模式
|
||
|
||
Args:
|
||
cluster_config: Redis配置
|
||
cluster_name: 集群名称用于日志
|
||
performance_tracker: 性能追踪器
|
||
|
||
Returns:
|
||
Redis客户端实例或None
|
||
"""
|
||
start_time = time.time()
|
||
|
||
try:
|
||
# 获取节点配置
|
||
nodes = cluster_config.get('nodes', [])
|
||
if not nodes:
|
||
raise RedisError("未配置Redis节点")
|
||
|
||
# 通用连接参数
|
||
common_params = {
|
||
'password': cluster_config.get('password'),
|
||
'socket_timeout': cluster_config.get('socket_timeout', 3),
|
||
'socket_connect_timeout': cluster_config.get('socket_connect_timeout', 3),
|
||
'decode_responses': False, # 保持原始字节数据
|
||
'retry_on_timeout': True
|
||
}
|
||
|
||
logger.info(f"正在连接{cluster_name}...")
|
||
logger.info(f"节点配置: {[(node['host'], node['port']) for node in nodes]}")
|
||
|
||
# 首先尝试单节点模式连接第一个节点
|
||
first_node = nodes[0]
|
||
try:
|
||
logger.info(f"尝试单节点模式连接: {first_node['host']}:{first_node['port']}")
|
||
|
||
single_client = redis.Redis(
|
||
host=first_node['host'],
|
||
port=first_node['port'],
|
||
**common_params
|
||
)
|
||
|
||
# 测试连接
|
||
single_client.ping()
|
||
|
||
# 检查是否启用了集群模式
|
||
try:
|
||
info = single_client.info()
|
||
cluster_enabled = info.get('cluster_enabled', 0)
|
||
|
||
if cluster_enabled == 1:
|
||
# 这是一个集群节点,关闭单节点连接,使用集群模式
|
||
logger.info("检测到集群模式已启用,切换到集群客户端")
|
||
single_client.close()
|
||
return _create_cluster_client(cluster_config, cluster_name, performance_tracker, start_time, common_params)
|
||
else:
|
||
# 单节点模式工作正常
|
||
end_time = time.time()
|
||
connection_time = end_time - start_time
|
||
|
||
if performance_tracker:
|
||
performance_tracker.record_connection(cluster_name, start_time, end_time, True)
|
||
|
||
logger.info(f"✅ {cluster_name}连接成功(单节点模式),耗时 {connection_time:.3f} 秒")
|
||
return single_client
|
||
|
||
except Exception as info_error:
|
||
# 如果获取info失败,但ping成功,仍然使用单节点模式
|
||
logger.warning(f"无法获取集群信息,继续使用单节点模式: {info_error}")
|
||
end_time = time.time()
|
||
connection_time = end_time - start_time
|
||
|
||
if performance_tracker:
|
||
performance_tracker.record_connection(cluster_name, start_time, end_time, True)
|
||
|
||
logger.info(f"✅ {cluster_name}连接成功(单节点模式),耗时 {connection_time:.3f} 秒")
|
||
return single_client
|
||
|
||
except Exception as single_error:
|
||
logger.warning(f"单节点模式连接失败: {single_error}")
|
||
logger.info("尝试集群模式连接...")
|
||
|
||
# 单节点模式失败,尝试集群模式
|
||
return _create_cluster_client(cluster_config, cluster_name, performance_tracker, start_time, common_params)
|
||
|
||
except Exception as e:
|
||
end_time = time.time()
|
||
connection_time = end_time - start_time
|
||
error_msg = f"连接失败: {str(e)}"
|
||
|
||
if performance_tracker:
|
||
performance_tracker.record_connection(cluster_name, start_time, end_time, False, error_msg)
|
||
|
||
logger.error(f"❌ {cluster_name}{error_msg},耗时 {connection_time:.3f} 秒")
|
||
return None
|
||
|
||
def _create_cluster_client(cluster_config, cluster_name, performance_tracker, start_time, common_params):
|
||
"""创建集群客户端"""
|
||
try:
|
||
# 构建集群节点列表
|
||
startup_nodes = []
|
||
for node in cluster_config.get('nodes', []):
|
||
startup_nodes.append(ClusterNode(node['host'], node['port']))
|
||
|
||
# 创建Redis集群客户端
|
||
cluster_client = RedisCluster(
|
||
startup_nodes=startup_nodes,
|
||
max_connections_per_node=cluster_config.get('max_connections_per_node', 16),
|
||
skip_full_coverage_check=True, # 跳过全覆盖检查,允许部分节点不可用
|
||
**common_params
|
||
)
|
||
|
||
# 测试集群连接
|
||
cluster_client.ping()
|
||
|
||
end_time = time.time()
|
||
connection_time = end_time - start_time
|
||
|
||
if performance_tracker:
|
||
performance_tracker.record_connection(cluster_name, start_time, end_time, True)
|
||
|
||
logger.info(f"✅ {cluster_name}连接成功(集群模式),耗时 {connection_time:.3f} 秒")
|
||
return cluster_client
|
||
|
||
except Exception as cluster_error:
|
||
end_time = time.time()
|
||
connection_time = end_time - start_time
|
||
error_msg = f"集群模式连接失败: {str(cluster_error)}"
|
||
|
||
if performance_tracker:
|
||
performance_tracker.record_connection(cluster_name, start_time, end_time, False, error_msg)
|
||
|
||
logger.error(f"❌ {cluster_name}{error_msg},耗时 {connection_time:.3f} 秒")
|
||
return None
|
||
|
||
def test_redis_connection(cluster_config, cluster_name="Redis集群"):
|
||
"""
|
||
测试Redis连接
|
||
|
||
Args:
|
||
cluster_config: Redis集群配置
|
||
cluster_name: 集群名称
|
||
|
||
Returns:
|
||
dict: 连接测试结果
|
||
"""
|
||
result = {
|
||
'success': False,
|
||
'error': None,
|
||
'connection_time': 0,
|
||
'cluster_info': None
|
||
}
|
||
|
||
start_time = time.time()
|
||
client = None
|
||
|
||
try:
|
||
client = create_redis_client(cluster_config, cluster_name)
|
||
if client:
|
||
# 获取集群信息
|
||
info = client.info()
|
||
cluster_info = {
|
||
'redis_version': info.get('redis_version', 'Unknown'),
|
||
'connected_clients': info.get('connected_clients', 0),
|
||
'used_memory_human': info.get('used_memory_human', 'Unknown'),
|
||
'keyspace_hits': info.get('keyspace_hits', 0),
|
||
'keyspace_misses': info.get('keyspace_misses', 0)
|
||
}
|
||
|
||
result['success'] = True
|
||
result['cluster_info'] = cluster_info
|
||
else:
|
||
result['error'] = "连接创建失败"
|
||
|
||
except Exception as e:
|
||
result['error'] = str(e)
|
||
finally:
|
||
result['connection_time'] = time.time() - start_time
|
||
if client:
|
||
try:
|
||
client.close()
|
||
except:
|
||
pass
|
||
|
||
return result |