Files
BigDataTool/modules/cassandra_client.py
2025-08-12 16:27:00 +08:00

196 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Cassandra连接管理模块
====================
本模块负责Cassandra数据库的连接管理和高级错误诊断功能。
核心功能:
1. 智能连接管理:自动处理集群连接和故障转移
2. 错误诊断系统:详细的连接失败分析和解决建议
3. 性能监控:连接时间和集群状态的实时监控
4. 容错机制:连接超时、重试和优雅降级
5. 安全认证支持用户名密码认证和SSL连接
连接特性:
- 负载均衡使用DCAwareRoundRobinPolicy避免单点故障
- 连接池管理:优化的连接复用和资源管理
- 超时控制:可配置的连接和查询超时时间
- 协议版本使用稳定的CQL协议版本4
- Schema同步自动等待集群Schema一致性
错误诊断系统:
- 连接拒绝:检查服务状态和网络连通性
- 认证失败:验证用户名密码和权限设置
- 超时错误:分析网络延迟和服务器负载
- Keyspace错误验证Keyspace存在性和访问权限
- 未知错误:提供通用的故障排查指南
监控功能:
- 集群状态:实时显示可用和故障节点
- 连接时间:精确的连接建立时间测量
- 元数据获取:集群名称和节点信息展示
- 性能指标:连接成功率和响应时间统计
作者BigDataTool项目组
更新时间2024年8月
"""
import time
import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy
logger = logging.getLogger(__name__)
def create_connection(config):
"""
创建Cassandra数据库连接具备增强的错误诊断和容错机制
本函数提供企业级的Cassandra连接管理包括
- 智能连接建立:自动选择最优连接参数
- 详细错误诊断:针对不同错误类型提供具体解决方案
- 性能监控:记录连接时间和集群状态
- 容错处理:连接失败时的优雅降级
Args:
config (dict): Cassandra连接配置包含以下字段
- hosts (list): Cassandra节点地址列表
- port (int): 连接端口默认9042
- username (str): 认证用户名
- password (str): 认证密码
- keyspace (str): 目标keyspace名称
- datacenter (str): 数据中心名称,默认'dc1'
Returns:
tuple: (cluster, session) 连接对象元组
- cluster: Cassandra集群对象用于管理连接
- session: 数据库会话对象,用于执行查询
- 连接失败时返回 (None, None)
连接配置优化:
- 协议版本使用稳定的协议版本4
- 连接超时15秒连接超时避免长时间等待
- 负载均衡DCAwareRoundRobinPolicy避免跨DC查询
- Schema同步30秒Schema一致性等待时间
- 查询超时30秒默认查询超时时间
错误诊断:
- 连接拒绝:提供服务状态检查建议
- 认证失败:提供用户权限验证指南
- 超时错误:提供网络和性能优化建议
- Keyspace错误提供Keyspace创建和权限指南
使用示例:
config = {
'hosts': ['127.0.0.1'],
'port': 9042,
'username': 'cassandra',
'password': 'password',
'keyspace': 'example_keyspace',
'datacenter': 'dc1'
}
cluster, session = create_connection(config)
if session:
result = session.execute("SELECT * FROM example_table LIMIT 10")
cluster.shutdown()
"""
start_time = time.time()
logger.info(f"=== 开始创建Cassandra连接 ===")
logger.info(f"主机列表: {config.get('hosts', [])}")
logger.info(f"端口: {config.get('port', 9042)}")
logger.info(f"用户名: {config.get('username', 'N/A')}")
logger.info(f"Keyspace: {config.get('keyspace', 'N/A')}")
try:
logger.info("正在创建认证提供者...")
auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
logger.info("正在创建集群连接...")
# 设置负载均衡策略,避免单点故障
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1'))
# 创建连接配置,增加容错参数
cluster = Cluster(
config['hosts'],
port=config['port'],
auth_provider=auth_provider,
load_balancing_policy=load_balancing_policy,
# 增加容错配置
protocol_version=4, # 使用稳定的协议版本
connect_timeout=15, # 连接超时
control_connection_timeout=15, # 控制连接超时
max_schema_agreement_wait=30 # schema同步等待时间
)
logger.info("正在连接到Keyspace...")
session = cluster.connect(config['keyspace'])
# 设置session级别的容错参数
session.default_timeout = 30 # 查询超时时间
connection_time = time.time() - start_time
logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}")
# 记录集群状态
try:
cluster_name = cluster.metadata.cluster_name or "Unknown"
logger.info(f" 集群名称: {cluster_name}")
# 记录可用主机状态
live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up]
down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up]
logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)")
if down_hosts:
logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)")
except Exception as meta_error:
logger.warning(f"无法获取集群元数据: {meta_error}")
return cluster, session
except Exception as e:
connection_time = time.time() - start_time
error_msg = str(e)
logger.error(f"❌ Cassandra连接失败: 连接时间={connection_time:.3f}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error(f"错误详情: {error_msg}")
# 提供详细的诊断信息
if "connection refused" in error_msg.lower() or "unable to connect" in error_msg.lower():
logger.error("❌ 诊断无法连接到Cassandra服务器")
logger.error("🔧 建议检查:")
logger.error(" 1. Cassandra服务是否启动")
logger.error(" 2. 主机地址和端口是否正确")
logger.error(" 3. 网络防火墙是否阻挡连接")
elif "timeout" in error_msg.lower():
logger.error("❌ 诊断:连接超时")
logger.error("🔧 建议检查:")
logger.error(" 1. 网络延迟是否过高")
logger.error(" 2. Cassandra服务器负载是否过高")
logger.error(" 3. 增加连接超时时间")
elif "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower():
logger.error("❌ 诊断:认证失败")
logger.error("🔧 建议检查:")
logger.error(" 1. 用户名和密码是否正确")
logger.error(" 2. 用户是否有访问该keyspace的权限")
elif "keyspace" in error_msg.lower():
logger.error("❌ 诊断Keyspace不存在")
logger.error("🔧 建议检查:")
logger.error(" 1. Keyspace名称是否正确")
logger.error(" 2. Keyspace是否已创建")
else:
logger.error("❌ 诊断:未知连接错误")
logger.error("🔧 建议:")
logger.error(" 1. 检查所有连接参数")
logger.error(" 2. 查看Cassandra服务器日志")
logger.error(" 3. 测试网络连通性")
return None, None