diff --git a/app.py b/app.py index b4fc317..3d644a6 100644 --- a/app.py +++ b/app.py @@ -76,26 +76,46 @@ class ShardingCalculator: def extract_timestamp_from_key(self, key): """ 从Key中提取时间戳 - 新规则:删除所有非数字字符,然后作为时间戳 + 新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分 """ if not key: return None key_str = str(key) - # 删除所有非数字字符 - numbers = re.sub(r'\D', '', key_str) + # 方法1:如果包含下划线,尝试提取最后一个下划线后的部分 + if '_' in key_str: + parts = key_str.split('_') + last_part = parts[-1] + # 检查最后一部分是否为纯数字 + if last_part.isdigit(): + timestamp = int(last_part) + logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}") + return timestamp - if not numbers: + # 方法2:使用正则表达式找到所有数字序列,取最后一个较长的 + number_sequences = re.findall(r'\d+', key_str) + + if not number_sequences: logger.warning(f"Key '{key}' 中没有找到数字字符") return None + # 如果有多个数字序列,优先选择最长的,如果长度相同则选择最后一个 + longest_sequence = max(number_sequences, key=len) + + # 如果最长的有多个,选择最后一个最长的 + max_length = len(longest_sequence) + last_longest = None + for seq in number_sequences: + if len(seq) == max_length: + last_longest = seq + try: - timestamp = int(numbers) - logger.info(f"Key '{key}' 提取到时间戳: {timestamp}") + timestamp = int(last_longest) + logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)") return timestamp except ValueError: - logger.error(f"Key '{key}' 数字转换失败: {numbers}") + logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}") return None def calculate_shard_index(self, timestamp): @@ -729,26 +749,86 @@ def delete_query_history(history_id): conn.close() def create_connection(config): - """创建Cassandra连接""" + """创建Cassandra连接,带有增强的错误诊断""" + start_time = time.time() + + logger.info(f"=== 开始创建Cassandra连接 ===") + logger.info(f"主机列表: {config.get('hosts', [])}") + logger.info(f"端口: {config.get('port', 9042)}") + logger.info(f"用户名: {config.get('username', 'N/A')}") + logger.info(f"Keyspace: {config.get('keyspace', 'N/A')}") + try: - logger.info(f"正在连接Cassandra数据库: {config['hosts']}:{config['port']}, keyspace={config['keyspace']}") - start_time = time.time() - + logger.info("正在创建认证提供者...") auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password']) + + logger.info("正在创建集群连接...") cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider) + + logger.info("正在连接到Keyspace...") session = cluster.connect(config['keyspace']) connection_time = time.time() - start_time - logger.info(f"Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}") + logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}") return cluster, session + except Exception as e: - logger.error(f"Cassandra连接失败: hosts={config['hosts']}, keyspace={config['keyspace']}, 错误={str(e)}") + connection_time = time.time() - start_time + error_msg = str(e) + + logger.error(f"❌ Cassandra连接失败: 连接时间={connection_time:.3f}秒") + logger.error(f"错误类型: {type(e).__name__}") + logger.error(f"错误详情: {error_msg}") + + # 提供详细的诊断信息 + if "connection refused" in error_msg.lower() or "unable to connect" in error_msg.lower(): + logger.error("❌ 诊断:无法连接到Cassandra服务器") + logger.error("🔧 建议检查:") + logger.error(" 1. Cassandra服务是否启动") + logger.error(" 2. 主机地址和端口是否正确") + logger.error(" 3. 网络防火墙是否阻挡连接") + + elif "timeout" in error_msg.lower(): + logger.error("❌ 诊断:连接超时") + logger.error("🔧 建议检查:") + logger.error(" 1. 网络延迟是否过高") + logger.error(" 2. Cassandra服务器负载是否过高") + logger.error(" 3. 增加连接超时时间") + + elif "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower(): + logger.error("❌ 诊断:认证失败") + logger.error("🔧 建议检查:") + logger.error(" 1. 用户名和密码是否正确") + logger.error(" 2. 用户是否有访问该keyspace的权限") + + elif "keyspace" in error_msg.lower(): + logger.error("❌ 诊断:Keyspace不存在") + logger.error("🔧 建议检查:") + logger.error(" 1. Keyspace名称是否正确") + logger.error(" 2. Keyspace是否已创建") + + else: + logger.error("❌ 诊断:未知连接错误") + logger.error("🔧 建议:") + logger.error(" 1. 检查所有连接参数") + logger.error(" 2. 查看Cassandra服务器日志") + logger.error(" 3. 测试网络连通性") + return None, None def execute_query(session, table, keys, fields, values, exclude_fields=None): """执行查询""" try: + # 参数验证 + if not keys or len(keys) == 0: + logger.error("Keys参数为空,无法构建查询") + return [] + + if not values or len(values) == 0: + logger.error("Values参数为空,无法构建查询") + return [] + # 构建查询条件 quoted_values = [f"'{value}'" for value in values] query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})" @@ -826,14 +906,25 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys # 处理生产环境查询 if sharding_config.get('use_sharding_for_pro', False): + # 记录生产环境分表配置信息 + pro_interval = sharding_config.get('pro_interval_seconds', 604800) + pro_table_count = sharding_config.get('pro_table_count', 14) + logger.info(f"=== 生产环境分表配置 ===") + logger.info(f"启用分表查询: True") + logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)") + logger.info(f"分表数量: {pro_table_count}张") + logger.info(f"基础表名: {pro_config['table']}") + pro_calculator = ShardingCalculator( - interval_seconds=sharding_config.get('pro_interval_seconds', 604800), - table_count=sharding_config.get('pro_table_count', 14) + interval_seconds=pro_interval, + table_count=pro_table_count ) pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys( pro_config['table'], values ) + logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}") + pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query( pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields ) @@ -850,6 +941,10 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys results['sharding_info']['calculation_stats'].update(pro_calc_stats) else: # 生产环境单表查询 + logger.info(f"=== 生产环境单表配置 ===") + logger.info(f"启用分表查询: False") + logger.info(f"表名: {pro_config['table']}") + pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) results['pro_data'] = pro_data results['sharding_info']['pro_shards'] = { @@ -859,14 +954,25 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys # 处理测试环境查询 if sharding_config.get('use_sharding_for_test', False): + # 记录测试环境分表配置信息 + test_interval = sharding_config.get('test_interval_seconds', 604800) + test_table_count = sharding_config.get('test_table_count', 14) + logger.info(f"=== 测试环境分表配置 ===") + logger.info(f"启用分表查询: True") + logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)") + logger.info(f"分表数量: {test_table_count}张") + logger.info(f"基础表名: {test_config['table']}") + test_calculator = ShardingCalculator( - interval_seconds=sharding_config.get('test_interval_seconds', 604800), - table_count=sharding_config.get('test_table_count', 14) + interval_seconds=test_interval, + table_count=test_table_count ) test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys( test_config['table'], values ) + logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}") + test_data, test_queried_tables, test_error_tables = execute_sharding_query( test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields ) @@ -886,6 +992,10 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys results['sharding_info']['calculation_stats'] = test_calc_stats else: # 测试环境单表查询 + logger.info(f"=== 测试环境单表配置 ===") + logger.info(f"启用分表查询: False") + logger.info(f"表名: {test_config['table']}") + test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) results['test_data'] = test_data results['sharding_info']['test_shards'] = { @@ -1079,15 +1189,32 @@ def sharding_query_compare(): # 解析配置 pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) - keys = data.get('keys', DEFAULT_CONFIG['keys']) - fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) - exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) + + # 从query_config中获取keys等参数 + query_config = data.get('query_config', {}) + keys = query_config.get('keys', DEFAULT_CONFIG['keys']) + fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) + exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) + values = data.get('values', []) sharding_config = data.get('sharding_config', {}) + # 参数验证 if not values: logger.warning("分表查询失败:未提供查询key值") return jsonify({'error': '请提供查询key值'}), 400 + + if not keys: + logger.warning("分表查询失败:未提供主键字段") + return jsonify({'error': '请提供主键字段'}), 400 + + # 添加详细的参数日志 + logger.info(f"分表查询参数解析结果:") + logger.info(f" keys: {keys}") + logger.info(f" values数量: {len(values)}") + logger.info(f" fields_to_compare: {fields_to_compare}") + logger.info(f" exclude_fields: {exclude_fields}") + logger.info(f" sharding_config: {sharding_config}") logger.info(f"分表查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") @@ -1174,14 +1301,30 @@ def query_compare(): # 解析配置 pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) - keys = data.get('keys', DEFAULT_CONFIG['keys']) - fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) - exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) + + # 从query_config中获取keys等参数 + query_config = data.get('query_config', {}) + keys = query_config.get('keys', DEFAULT_CONFIG['keys']) + fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) + exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) + values = data.get('values', []) + # 参数验证 if not values: logger.warning("查询失败:未提供查询key值") return jsonify({'error': '请提供查询key值'}), 400 + + if not keys: + logger.warning("查询失败:未提供主键字段") + return jsonify({'error': '请提供主键字段'}), 400 + + # 添加详细的参数日志 + logger.info(f"单表查询参数解析结果:") + logger.info(f" keys: {keys}") + logger.info(f" values数量: {len(values)}") + logger.info(f" fields_to_compare: {fields_to_compare}") + logger.info(f" exclude_fields: {exclude_fields}") logger.info(f"查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}")