完善查询

This commit is contained in:
2025-08-02 21:37:19 +08:00
parent 1a75dcd0fc
commit eae6a14272

189
app.py
View File

@@ -76,26 +76,46 @@ class ShardingCalculator:
def extract_timestamp_from_key(self, key):
"""
从Key中提取时间戳
新规则:删除所有非数字字符,然后作为时间戳
新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分
"""
if not key:
return None
key_str = str(key)
# 删除所有非数字字符
numbers = re.sub(r'\D', '', key_str)
# 方法1如果包含下划线尝试提取最后一个下划线后的部分
if '_' in key_str:
parts = key_str.split('_')
last_part = parts[-1]
# 检查最后一部分是否为纯数字
if last_part.isdigit():
timestamp = int(last_part)
logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}")
return timestamp
if not numbers:
# 方法2使用正则表达式找到所有数字序列取最后一个较长的
number_sequences = re.findall(r'\d+', key_str)
if not number_sequences:
logger.warning(f"Key '{key}' 中没有找到数字字符")
return None
# 如果有多个数字序列,优先选择最长的,如果长度相同则选择最后一个
longest_sequence = max(number_sequences, key=len)
# 如果最长的有多个,选择最后一个最长的
max_length = len(longest_sequence)
last_longest = None
for seq in number_sequences:
if len(seq) == max_length:
last_longest = seq
try:
timestamp = int(numbers)
logger.info(f"Key '{key}' 提取到时间戳: {timestamp}")
timestamp = int(last_longest)
logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)")
return timestamp
except ValueError:
logger.error(f"Key '{key}' 数字转换失败: {numbers}")
logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}")
return None
def calculate_shard_index(self, timestamp):
@@ -729,26 +749,86 @@ def delete_query_history(history_id):
conn.close()
def create_connection(config):
"""创建Cassandra连接"""
"""创建Cassandra连接,带有增强的错误诊断"""
start_time = time.time()
logger.info(f"=== 开始创建Cassandra连接 ===")
logger.info(f"主机列表: {config.get('hosts', [])}")
logger.info(f"端口: {config.get('port', 9042)}")
logger.info(f"用户名: {config.get('username', 'N/A')}")
logger.info(f"Keyspace: {config.get('keyspace', 'N/A')}")
try:
logger.info(f"正在连接Cassandra数据库: {config['hosts']}:{config['port']}, keyspace={config['keyspace']}")
start_time = time.time()
logger.info("正在创建认证提供者...")
auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
logger.info("正在创建集群连接...")
cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider)
logger.info("正在连接到Keyspace...")
session = cluster.connect(config['keyspace'])
connection_time = time.time() - start_time
logger.info(f"Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}")
logger.info(f"Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}")
return cluster, session
except Exception as e:
logger.error(f"Cassandra连接失败: hosts={config['hosts']}, keyspace={config['keyspace']}, 错误={str(e)}")
connection_time = time.time() - start_time
error_msg = str(e)
logger.error(f"❌ Cassandra连接失败: 连接时间={connection_time:.3f}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error(f"错误详情: {error_msg}")
# 提供详细的诊断信息
if "connection refused" in error_msg.lower() or "unable to connect" in error_msg.lower():
logger.error("❌ 诊断无法连接到Cassandra服务器")
logger.error("🔧 建议检查:")
logger.error(" 1. Cassandra服务是否启动")
logger.error(" 2. 主机地址和端口是否正确")
logger.error(" 3. 网络防火墙是否阻挡连接")
elif "timeout" in error_msg.lower():
logger.error("❌ 诊断:连接超时")
logger.error("🔧 建议检查:")
logger.error(" 1. 网络延迟是否过高")
logger.error(" 2. Cassandra服务器负载是否过高")
logger.error(" 3. 增加连接超时时间")
elif "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower():
logger.error("❌ 诊断:认证失败")
logger.error("🔧 建议检查:")
logger.error(" 1. 用户名和密码是否正确")
logger.error(" 2. 用户是否有访问该keyspace的权限")
elif "keyspace" in error_msg.lower():
logger.error("❌ 诊断Keyspace不存在")
logger.error("🔧 建议检查:")
logger.error(" 1. Keyspace名称是否正确")
logger.error(" 2. Keyspace是否已创建")
else:
logger.error("❌ 诊断:未知连接错误")
logger.error("🔧 建议:")
logger.error(" 1. 检查所有连接参数")
logger.error(" 2. 查看Cassandra服务器日志")
logger.error(" 3. 测试网络连通性")
return None, None
def execute_query(session, table, keys, fields, values, exclude_fields=None):
"""执行查询"""
try:
# 参数验证
if not keys or len(keys) == 0:
logger.error("Keys参数为空无法构建查询")
return []
if not values or len(values) == 0:
logger.error("Values参数为空无法构建查询")
return []
# 构建查询条件
quoted_values = [f"'{value}'" for value in values]
query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
@@ -826,14 +906,25 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys
# 处理生产环境查询
if sharding_config.get('use_sharding_for_pro', False):
# 记录生产环境分表配置信息
pro_interval = sharding_config.get('pro_interval_seconds', 604800)
pro_table_count = sharding_config.get('pro_table_count', 14)
logger.info(f"=== 生产环境分表配置 ===")
logger.info(f"启用分表查询: True")
logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)")
logger.info(f"分表数量: {pro_table_count}")
logger.info(f"基础表名: {pro_config['table']}")
pro_calculator = ShardingCalculator(
interval_seconds=sharding_config.get('pro_interval_seconds', 604800),
table_count=sharding_config.get('pro_table_count', 14)
interval_seconds=pro_interval,
table_count=pro_table_count
)
pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys(
pro_config['table'], values
)
logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}")
pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query(
pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields
)
@@ -850,6 +941,10 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys
results['sharding_info']['calculation_stats'].update(pro_calc_stats)
else:
# 生产环境单表查询
logger.info(f"=== 生产环境单表配置 ===")
logger.info(f"启用分表查询: False")
logger.info(f"表名: {pro_config['table']}")
pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
results['pro_data'] = pro_data
results['sharding_info']['pro_shards'] = {
@@ -859,14 +954,25 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys
# 处理测试环境查询
if sharding_config.get('use_sharding_for_test', False):
# 记录测试环境分表配置信息
test_interval = sharding_config.get('test_interval_seconds', 604800)
test_table_count = sharding_config.get('test_table_count', 14)
logger.info(f"=== 测试环境分表配置 ===")
logger.info(f"启用分表查询: True")
logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)")
logger.info(f"分表数量: {test_table_count}")
logger.info(f"基础表名: {test_config['table']}")
test_calculator = ShardingCalculator(
interval_seconds=sharding_config.get('test_interval_seconds', 604800),
table_count=sharding_config.get('test_table_count', 14)
interval_seconds=test_interval,
table_count=test_table_count
)
test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys(
test_config['table'], values
)
logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}")
test_data, test_queried_tables, test_error_tables = execute_sharding_query(
test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields
)
@@ -886,6 +992,10 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys
results['sharding_info']['calculation_stats'] = test_calc_stats
else:
# 测试环境单表查询
logger.info(f"=== 测试环境单表配置 ===")
logger.info(f"启用分表查询: False")
logger.info(f"表名: {test_config['table']}")
test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
results['test_data'] = test_data
results['sharding_info']['test_shards'] = {
@@ -1079,15 +1189,32 @@ def sharding_query_compare():
# 解析配置
pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
keys = data.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
# 从query_config中获取keys等参数
query_config = data.get('query_config', {})
keys = query_config.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
values = data.get('values', [])
sharding_config = data.get('sharding_config', {})
# 参数验证
if not values:
logger.warning("分表查询失败未提供查询key值")
return jsonify({'error': '请提供查询key值'}), 400
if not keys:
logger.warning("分表查询失败:未提供主键字段")
return jsonify({'error': '请提供主键字段'}), 400
# 添加详细的参数日志
logger.info(f"分表查询参数解析结果:")
logger.info(f" keys: {keys}")
logger.info(f" values数量: {len(values)}")
logger.info(f" fields_to_compare: {fields_to_compare}")
logger.info(f" exclude_fields: {exclude_fields}")
logger.info(f" sharding_config: {sharding_config}")
logger.info(f"分表查询配置:{len(values)}个key值生产表{pro_config['table']},测试表:{test_config['table']}")
@@ -1174,14 +1301,30 @@ def query_compare():
# 解析配置
pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
keys = data.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
# 从query_config中获取keys等参数
query_config = data.get('query_config', {})
keys = query_config.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
values = data.get('values', [])
# 参数验证
if not values:
logger.warning("查询失败未提供查询key值")
return jsonify({'error': '请提供查询key值'}), 400
if not keys:
logger.warning("查询失败:未提供主键字段")
return jsonify({'error': '请提供主键字段'}), 400
# 添加详细的参数日志
logger.info(f"单表查询参数解析结果:")
logger.info(f" keys: {keys}")
logger.info(f" values数量: {len(values)}")
logger.info(f" fields_to_compare: {fields_to_compare}")
logger.info(f" exclude_fields: {exclude_fields}")
logger.info(f"查询配置:{len(values)}个key值生产表{pro_config['table']},测试表:{test_config['table']}")