196 lines
8.0 KiB
Python
196 lines
8.0 KiB
Python
"""
|
||
Cassandra连接管理模块
|
||
====================
|
||
|
||
本模块负责Cassandra数据库的连接管理和高级错误诊断功能。
|
||
|
||
核心功能:
|
||
1. 智能连接管理:自动处理集群连接和故障转移
|
||
2. 错误诊断系统:详细的连接失败分析和解决建议
|
||
3. 性能监控:连接时间和集群状态的实时监控
|
||
4. 容错机制:连接超时、重试和优雅降级
|
||
5. 安全认证:支持用户名密码认证和SSL连接
|
||
|
||
连接特性:
|
||
- 负载均衡:使用DCAwareRoundRobinPolicy避免单点故障
|
||
- 连接池管理:优化的连接复用和资源管理
|
||
- 超时控制:可配置的连接和查询超时时间
|
||
- 协议版本:使用稳定的CQL协议版本4
|
||
- Schema同步:自动等待集群Schema一致性
|
||
|
||
错误诊断系统:
|
||
- 连接拒绝:检查服务状态和网络连通性
|
||
- 认证失败:验证用户名密码和权限设置
|
||
- 超时错误:分析网络延迟和服务器负载
|
||
- Keyspace错误:验证Keyspace存在性和访问权限
|
||
- 未知错误:提供通用的故障排查指南
|
||
|
||
监控功能:
|
||
- 集群状态:实时显示可用和故障节点
|
||
- 连接时间:精确的连接建立时间测量
|
||
- 元数据获取:集群名称和节点信息展示
|
||
- 性能指标:连接成功率和响应时间统计
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
|
||
|
||
import time
|
||
import logging
|
||
from cassandra.cluster import Cluster
|
||
from cassandra.auth import PlainTextAuthProvider
|
||
from cassandra.policies import DCAwareRoundRobinPolicy
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def create_connection(config):
|
||
"""
|
||
创建Cassandra数据库连接,具备增强的错误诊断和容错机制
|
||
|
||
本函数提供企业级的Cassandra连接管理,包括:
|
||
- 智能连接建立:自动选择最优连接参数
|
||
- 详细错误诊断:针对不同错误类型提供具体解决方案
|
||
- 性能监控:记录连接时间和集群状态
|
||
- 容错处理:连接失败时的优雅降级
|
||
|
||
Args:
|
||
config (dict): Cassandra连接配置,包含以下字段:
|
||
- hosts (list): Cassandra节点地址列表
|
||
- port (int): 连接端口,默认9042
|
||
- username (str): 认证用户名
|
||
- password (str): 认证密码
|
||
- keyspace (str): 目标keyspace名称
|
||
- datacenter (str): 数据中心名称,默认'dc1'
|
||
|
||
Returns:
|
||
tuple: (cluster, session) 连接对象元组
|
||
- cluster: Cassandra集群对象,用于管理连接
|
||
- session: 数据库会话对象,用于执行查询
|
||
- 连接失败时返回 (None, None)
|
||
|
||
连接配置优化:
|
||
- 协议版本:使用稳定的协议版本4
|
||
- 连接超时:15秒连接超时,避免长时间等待
|
||
- 负载均衡:DCAwareRoundRobinPolicy避免跨DC查询
|
||
- Schema同步:30秒Schema一致性等待时间
|
||
- 查询超时:30秒默认查询超时时间
|
||
|
||
错误诊断:
|
||
- 连接拒绝:提供服务状态检查建议
|
||
- 认证失败:提供用户权限验证指南
|
||
- 超时错误:提供网络和性能优化建议
|
||
- Keyspace错误:提供Keyspace创建和权限指南
|
||
|
||
使用示例:
|
||
config = {
|
||
'hosts': ['127.0.0.1'],
|
||
'port': 9042,
|
||
'username': 'cassandra',
|
||
'password': 'password',
|
||
'keyspace': 'example_keyspace',
|
||
'datacenter': 'dc1'
|
||
}
|
||
cluster, session = create_connection(config)
|
||
if session:
|
||
result = session.execute("SELECT * FROM example_table LIMIT 10")
|
||
cluster.shutdown()
|
||
"""
|
||
start_time = time.time()
|
||
|
||
logger.info(f"=== 开始创建Cassandra连接 ===")
|
||
logger.info(f"主机列表: {config.get('hosts', [])}")
|
||
logger.info(f"端口: {config.get('port', 9042)}")
|
||
logger.info(f"用户名: {config.get('username', 'N/A')}")
|
||
logger.info(f"Keyspace: {config.get('keyspace', 'N/A')}")
|
||
|
||
try:
|
||
logger.info("正在创建认证提供者...")
|
||
auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
|
||
|
||
logger.info("正在创建集群连接...")
|
||
# 设置负载均衡策略,避免单点故障
|
||
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1'))
|
||
|
||
# 创建连接配置,增加容错参数
|
||
cluster = Cluster(
|
||
config['hosts'],
|
||
port=config['port'],
|
||
auth_provider=auth_provider,
|
||
load_balancing_policy=load_balancing_policy,
|
||
# 增加容错配置
|
||
protocol_version=4, # 使用稳定的协议版本
|
||
connect_timeout=15, # 连接超时
|
||
control_connection_timeout=15, # 控制连接超时
|
||
max_schema_agreement_wait=30 # schema同步等待时间
|
||
)
|
||
|
||
logger.info("正在连接到Keyspace...")
|
||
session = cluster.connect(config['keyspace'])
|
||
|
||
# 设置session级别的容错参数
|
||
session.default_timeout = 30 # 查询超时时间
|
||
|
||
connection_time = time.time() - start_time
|
||
logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒")
|
||
|
||
# 记录集群状态
|
||
try:
|
||
cluster_name = cluster.metadata.cluster_name or "Unknown"
|
||
logger.info(f" 集群名称: {cluster_name}")
|
||
|
||
# 记录可用主机状态
|
||
live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up]
|
||
down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up]
|
||
|
||
logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)")
|
||
if down_hosts:
|
||
logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)")
|
||
|
||
except Exception as meta_error:
|
||
logger.warning(f"无法获取集群元数据: {meta_error}")
|
||
|
||
return cluster, session
|
||
|
||
except Exception as e:
|
||
connection_time = time.time() - start_time
|
||
error_msg = str(e)
|
||
|
||
logger.error(f"❌ Cassandra连接失败: 连接时间={connection_time:.3f}秒")
|
||
logger.error(f"错误类型: {type(e).__name__}")
|
||
logger.error(f"错误详情: {error_msg}")
|
||
|
||
# 提供详细的诊断信息
|
||
if "connection refused" in error_msg.lower() or "unable to connect" in error_msg.lower():
|
||
logger.error("❌ 诊断:无法连接到Cassandra服务器")
|
||
logger.error("🔧 建议检查:")
|
||
logger.error(" 1. Cassandra服务是否启动")
|
||
logger.error(" 2. 主机地址和端口是否正确")
|
||
logger.error(" 3. 网络防火墙是否阻挡连接")
|
||
|
||
elif "timeout" in error_msg.lower():
|
||
logger.error("❌ 诊断:连接超时")
|
||
logger.error("🔧 建议检查:")
|
||
logger.error(" 1. 网络延迟是否过高")
|
||
logger.error(" 2. Cassandra服务器负载是否过高")
|
||
logger.error(" 3. 增加连接超时时间")
|
||
|
||
elif "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower():
|
||
logger.error("❌ 诊断:认证失败")
|
||
logger.error("🔧 建议检查:")
|
||
logger.error(" 1. 用户名和密码是否正确")
|
||
logger.error(" 2. 用户是否有访问该keyspace的权限")
|
||
|
||
elif "keyspace" in error_msg.lower():
|
||
logger.error("❌ 诊断:Keyspace不存在")
|
||
logger.error("🔧 建议检查:")
|
||
logger.error(" 1. Keyspace名称是否正确")
|
||
logger.error(" 2. Keyspace是否已创建")
|
||
|
||
else:
|
||
logger.error("❌ 诊断:未知连接错误")
|
||
logger.error("🔧 建议:")
|
||
logger.error(" 1. 检查所有连接参数")
|
||
logger.error(" 2. 查看Cassandra服务器日志")
|
||
logger.error(" 3. 测试网络连通性")
|
||
|
||
return None, None |