""" 数据查询引擎模块 ================ 本模块是BigDataTool的核心查询引擎,负责Cassandra数据库的高级查询功能。 核心功能: 1. 单表查询:标准的Cassandra CQL查询执行 2. 分表查询:基于TWCS策略的时间分表查询 3. 多主键查询:支持复合主键的复杂查询条件 4. 混合查询:生产环境分表+测试环境单表的组合查询 查询类型支持: - 单主键查询:WHERE key IN (val1, val2, val3) - 复合主键查询:WHERE (key1='val1' AND key2='val2') OR (key1='val3' AND key2='val4') - 分表查询:自动计算分表名称并并行查询多张表 - 字段过滤:支持指定查询字段和排除字段 分表查询特性: - 时间戳提取:从Key中智能提取时间戳信息 - 分表计算:基于TWCS策略计算目标分表 - 并行查询:同时查询多张分表以提高性能 - 错误容错:单个分表查询失败不影响整体结果 性能优化: - 查询时间监控:记录每个查询的执行时间 - 批量处理:支持大批量Key的高效查询 - 连接复用:优化数据库连接的使用 - 内存管理:大结果集的内存友好处理 作者:BigDataTool项目组 更新时间:2024年8月 """ import time import logging from .sharding import ShardingCalculator logger = logging.getLogger(__name__) def execute_query(session, table, keys, fields, values, exclude_fields=None): """ 执行Cassandra数据库查询,支持单主键和复合主键查询 本函数是查询引擎的核心,能够智能处理不同类型的主键查询: - 单主键:生成 WHERE key IN (val1, val2, val3) 查询 - 复合主键:生成 WHERE (key1='val1' AND key2='val2') OR ... 查询 Args: session: Cassandra数据库会话对象 table (str): 目标表名 keys (list): 主键字段名列表,如 ['id'] 或 ['docid', 'id'] fields (list): 要查询的字段列表,空列表表示查询所有字段 values (list): 查询值列表,复合主键值用逗号分隔 exclude_fields (list, optional): 要排除的字段列表 Returns: list: 查询结果列表,每个元素是一个Row对象 查询示例: # 单主键查询 execute_query(session, 'users', ['id'], ['name', 'email'], ['1', '2', '3']) # 生成SQL: SELECT name, email FROM users WHERE id IN ('1', '2', '3') # 复合主键查询 execute_query(session, 'orders', ['user_id', 'order_id'], ['*'], ['1,100', '2,200']) # 生成SQL: SELECT * FROM orders WHERE (user_id='1' AND order_id='100') OR (user_id='2' AND order_id='200') 错误处理: - 参数验证:检查keys和values是否为空 - SQL注入防护:对查询值进行适当转义 - 异常捕获:数据库错误时返回空列表 - 日志记录:记录查询SQL和执行统计 """ try: # 参数验证 if not keys or len(keys) == 0: logger.error("Keys参数为空,无法构建查询") return [] if not values or len(values) == 0: logger.error("Values参数为空,无法构建查询") return [] # 构建查询条件 if len(keys) == 1: # 单主键查询(保持原有逻辑) quoted_values = [f"'{value}'" for value in values] query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})" else: # 复合主键查询 conditions = [] for value in values: # 检查value是否包含复合主键分隔符 if isinstance(value, str) and ',' in value: # 解析复合主键值 key_values = [v.strip() for v in value.split(',')] if len(key_values) == len(keys): # 构建单个复合主键条件: (key1='val1' AND key2='val2') key_conditions = [] for i, (key, val) in enumerate(zip(keys, key_values)): key_conditions.append(f"{key} = '{val}'") conditions.append(f"({' AND '.join(key_conditions)})") else: logger.warning(f"复合主键值 '{value}' 的字段数量({len(key_values)})与主键字段数量({len(keys)})不匹配") # 将其作为第一个主键的值处理 conditions.append(f"{keys[0]} = '{value}'") else: # 单值,作为第一个主键的值处理 conditions.append(f"{keys[0]} = '{value}'") if conditions: query_conditions = ' OR '.join(conditions) else: logger.error("无法构建有效的查询条件") return [] # 确定要查询的字段 if fields: fields_str = ", ".join(fields) else: fields_str = "*" query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};" # 记录查询SQL日志 logger.info(f"执行查询SQL: {query_sql}") if len(keys) > 1: logger.info(f"复合主键查询参数: 表={table}, 主键字段={keys}, 字段={fields_str}, Key数量={len(values)}") else: logger.info(f"单主键查询参数: 表={table}, 主键字段={keys[0]}, 字段={fields_str}, Key数量={len(values)}") # 执行查询 start_time = time.time() result = session.execute(query_sql) execution_time = time.time() - start_time result_list = list(result) if result else [] logger.info(f"查询完成: 执行时间={execution_time:.3f}秒, 返回记录数={len(result_list)}") return result_list except Exception as e: logger.error(f"查询执行失败: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, 错误={str(e)}") return [] def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None): """ 执行分表查询 :param session: Cassandra会话 :param shard_mapping: 分表映射 {table_name: [keys]} :param keys: 主键字段名列表 :param fields: 要查询的字段列表 :param exclude_fields: 要排除的字段列表 :return: (查询结果列表, 查询到的表列表, 查询失败的表列表) """ all_results = [] queried_tables = [] error_tables = [] logger.info(f"开始执行分表查询,涉及 {len(shard_mapping)} 张分表") total_start_time = time.time() for table_name, table_keys in shard_mapping.items(): try: logger.info(f"查询分表 {table_name},包含 {len(table_keys)} 个key: {table_keys}") # 为每个分表执行查询 table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields) all_results.extend(table_results) queried_tables.append(table_name) logger.info(f"分表 {table_name} 查询成功,返回 {len(table_results)} 条记录") except Exception as e: logger.error(f"分表 {table_name} 查询失败: {e}") error_tables.append(table_name) total_execution_time = time.time() - total_start_time logger.info(f"分表查询总计完成: 执行时间={total_execution_time:.3f}秒, 成功表数={len(queried_tables)}, 失败表数={len(error_tables)}, 总记录数={len(all_results)}") return all_results, queried_tables, error_tables def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config): """ 执行混合查询(生产环境分表,测试环境可能单表或分表) """ results = { 'pro_data': [], 'test_data': [], 'sharding_info': { 'calculation_stats': {} } } # 处理生产环境查询 if sharding_config.get('use_sharding_for_pro', False): # 获取生产环境分表配置参数,优先使用专用参数,否则使用通用参数 pro_interval = sharding_config.get('pro_interval_seconds') or sharding_config.get('interval_seconds', 604800) pro_table_count = sharding_config.get('pro_table_count') or sharding_config.get('table_count', 14) # 记录生产环境分表配置信息 logger.info(f"=== 生产环境分表配置 ===") logger.info(f"启用分表查询: True") logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)") logger.info(f"分表数量: {pro_table_count}张") logger.info(f"基础表名: {pro_config['table']}") pro_calculator = ShardingCalculator( interval_seconds=pro_interval, table_count=pro_table_count ) pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys( pro_config['table'], values ) logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}") pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query( pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields ) results['pro_data'] = pro_data results['sharding_info']['pro_shards'] = { 'enabled': True, 'interval_seconds': sharding_config.get('pro_interval_seconds', 604800), 'table_count': sharding_config.get('pro_table_count', 14), 'queried_tables': pro_queried_tables, 'error_tables': pro_error_tables, 'failed_keys': pro_failed_keys } results['sharding_info']['calculation_stats'].update(pro_calc_stats) else: # 生产环境单表查询 logger.info(f"=== 生产环境单表配置 ===") logger.info(f"启用分表查询: False") logger.info(f"表名: {pro_config['table']}") pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) results['pro_data'] = pro_data results['sharding_info']['pro_shards'] = { 'enabled': False, 'queried_tables': [pro_config['table']] } # 处理测试环境查询 if sharding_config.get('use_sharding_for_test', False): # 获取测试环境分表配置参数,优先使用专用参数,否则使用通用参数 test_interval = sharding_config.get('test_interval_seconds') or sharding_config.get('interval_seconds', 604800) test_table_count = sharding_config.get('test_table_count') or sharding_config.get('table_count', 14) # 记录测试环境分表配置信息 logger.info(f"=== 测试环境分表配置 ===") logger.info(f"启用分表查询: True") logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)") logger.info(f"分表数量: {test_table_count}张") logger.info(f"基础表名: {test_config['table']}") test_calculator = ShardingCalculator( interval_seconds=test_interval, table_count=test_table_count ) test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys( test_config['table'], values ) logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}") test_data, test_queried_tables, test_error_tables = execute_sharding_query( test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields ) results['test_data'] = test_data results['sharding_info']['test_shards'] = { 'enabled': True, 'interval_seconds': test_interval, 'table_count': test_table_count, 'queried_tables': test_queried_tables, 'error_tables': test_error_tables, 'failed_keys': test_failed_keys } # 合并计算统计信息 if not results['sharding_info']['calculation_stats']: results['sharding_info']['calculation_stats'] = test_calc_stats else: # 测试环境单表查询 logger.info(f"=== 测试环境单表配置 ===") logger.info(f"启用分表查询: False") logger.info(f"表名: {test_config['table']}") test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) results['test_data'] = test_data results['sharding_info']['test_shards'] = { 'enabled': False, 'queried_tables': [test_config['table']] } return results