Files
BigDataTool/modules/query_engine.py
2025-08-05 11:23:49 +08:00

295 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据查询引擎模块
================
本模块是BigDataTool的核心查询引擎负责Cassandra数据库的高级查询功能。
核心功能:
1. 单表查询标准的Cassandra CQL查询执行
2. 分表查询基于TWCS策略的时间分表查询
3. 多主键查询:支持复合主键的复杂查询条件
4. 混合查询:生产环境分表+测试环境单表的组合查询
查询类型支持:
- 单主键查询WHERE key IN (val1, val2, val3)
- 复合主键查询WHERE (key1='val1' AND key2='val2') OR (key1='val3' AND key2='val4')
- 分表查询:自动计算分表名称并并行查询多张表
- 字段过滤:支持指定查询字段和排除字段
分表查询特性:
- 时间戳提取从Key中智能提取时间戳信息
- 分表计算基于TWCS策略计算目标分表
- 并行查询:同时查询多张分表以提高性能
- 错误容错:单个分表查询失败不影响整体结果
性能优化:
- 查询时间监控:记录每个查询的执行时间
- 批量处理支持大批量Key的高效查询
- 连接复用:优化数据库连接的使用
- 内存管理:大结果集的内存友好处理
作者BigDataTool项目组
更新时间2024年8月
"""
import time
import logging
from .sharding import ShardingCalculator
logger = logging.getLogger(__name__)
def execute_query(session, table, keys, fields, values, exclude_fields=None):
"""
执行Cassandra数据库查询支持单主键和复合主键查询
本函数是查询引擎的核心,能够智能处理不同类型的主键查询:
- 单主键:生成 WHERE key IN (val1, val2, val3) 查询
- 复合主键:生成 WHERE (key1='val1' AND key2='val2') OR ... 查询
Args:
session: Cassandra数据库会话对象
table (str): 目标表名
keys (list): 主键字段名列表,如 ['id'] 或 ['docid', 'id']
fields (list): 要查询的字段列表,空列表表示查询所有字段
values (list): 查询值列表,复合主键值用逗号分隔
exclude_fields (list, optional): 要排除的字段列表
Returns:
list: 查询结果列表每个元素是一个Row对象
查询示例:
# 单主键查询
execute_query(session, 'users', ['id'], ['name', 'email'], ['1', '2', '3'])
# 生成SQL: SELECT name, email FROM users WHERE id IN ('1', '2', '3')
# 复合主键查询
execute_query(session, 'orders', ['user_id', 'order_id'], ['*'], ['1,100', '2,200'])
# 生成SQL: SELECT * FROM orders WHERE (user_id='1' AND order_id='100') OR (user_id='2' AND order_id='200')
错误处理:
- 参数验证检查keys和values是否为空
- SQL注入防护对查询值进行适当转义
- 异常捕获:数据库错误时返回空列表
- 日志记录记录查询SQL和执行统计
"""
try:
# 参数验证
if not keys or len(keys) == 0:
logger.error("Keys参数为空无法构建查询")
return []
if not values or len(values) == 0:
logger.error("Values参数为空无法构建查询")
return []
# 构建查询条件
if len(keys) == 1:
# 单主键查询(保持原有逻辑)
quoted_values = [f"'{value}'" for value in values]
query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
else:
# 复合主键查询
conditions = []
for value in values:
# 检查value是否包含复合主键分隔符
if isinstance(value, str) and ',' in value:
# 解析复合主键值
key_values = [v.strip() for v in value.split(',')]
if len(key_values) == len(keys):
# 构建单个复合主键条件: (key1='val1' AND key2='val2')
key_conditions = []
for i, (key, val) in enumerate(zip(keys, key_values)):
key_conditions.append(f"{key} = '{val}'")
conditions.append(f"({' AND '.join(key_conditions)})")
else:
logger.warning(f"复合主键值 '{value}' 的字段数量({len(key_values)})与主键字段数量({len(keys)})不匹配")
# 将其作为第一个主键的值处理
conditions.append(f"{keys[0]} = '{value}'")
else:
# 单值,作为第一个主键的值处理
conditions.append(f"{keys[0]} = '{value}'")
if conditions:
query_conditions = ' OR '.join(conditions)
else:
logger.error("无法构建有效的查询条件")
return []
# 确定要查询的字段
if fields:
fields_str = ", ".join(fields)
else:
fields_str = "*"
query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};"
# 记录查询SQL日志
logger.info(f"执行查询SQL: {query_sql}")
if len(keys) > 1:
logger.info(f"复合主键查询参数: 表={table}, 主键字段={keys}, 字段={fields_str}, Key数量={len(values)}")
else:
logger.info(f"单主键查询参数: 表={table}, 主键字段={keys[0]}, 字段={fields_str}, Key数量={len(values)}")
# 执行查询
start_time = time.time()
result = session.execute(query_sql)
execution_time = time.time() - start_time
result_list = list(result) if result else []
logger.info(f"查询完成: 执行时间={execution_time:.3f}秒, 返回记录数={len(result_list)}")
return result_list
except Exception as e:
logger.error(f"查询执行失败: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, 错误={str(e)}")
return []
def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None):
"""
执行分表查询
:param session: Cassandra会话
:param shard_mapping: 分表映射 {table_name: [keys]}
:param keys: 主键字段名列表
:param fields: 要查询的字段列表
:param exclude_fields: 要排除的字段列表
:return: (查询结果列表, 查询到的表列表, 查询失败的表列表)
"""
all_results = []
queried_tables = []
error_tables = []
logger.info(f"开始执行分表查询,涉及 {len(shard_mapping)} 张分表")
total_start_time = time.time()
for table_name, table_keys in shard_mapping.items():
try:
logger.info(f"查询分表 {table_name},包含 {len(table_keys)} 个key: {table_keys}")
# 为每个分表执行查询
table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields)
all_results.extend(table_results)
queried_tables.append(table_name)
logger.info(f"分表 {table_name} 查询成功,返回 {len(table_results)} 条记录")
except Exception as e:
logger.error(f"分表 {table_name} 查询失败: {e}")
error_tables.append(table_name)
total_execution_time = time.time() - total_start_time
logger.info(f"分表查询总计完成: 执行时间={total_execution_time:.3f}秒, 成功表数={len(queried_tables)}, 失败表数={len(error_tables)}, 总记录数={len(all_results)}")
return all_results, queried_tables, error_tables
def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config):
"""
执行混合查询(生产环境分表,测试环境可能单表或分表)
"""
results = {
'pro_data': [],
'test_data': [],
'sharding_info': {
'calculation_stats': {}
}
}
# 处理生产环境查询
if sharding_config.get('use_sharding_for_pro', False):
# 获取生产环境分表配置参数,优先使用专用参数,否则使用通用参数
pro_interval = sharding_config.get('pro_interval_seconds') or sharding_config.get('interval_seconds', 604800)
pro_table_count = sharding_config.get('pro_table_count') or sharding_config.get('table_count', 14)
# 记录生产环境分表配置信息
logger.info(f"=== 生产环境分表配置 ===")
logger.info(f"启用分表查询: True")
logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)")
logger.info(f"分表数量: {pro_table_count}")
logger.info(f"基础表名: {pro_config['table']}")
pro_calculator = ShardingCalculator(
interval_seconds=pro_interval,
table_count=pro_table_count
)
pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys(
pro_config['table'], values
)
logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}")
pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query(
pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields
)
results['pro_data'] = pro_data
results['sharding_info']['pro_shards'] = {
'enabled': True,
'interval_seconds': sharding_config.get('pro_interval_seconds', 604800),
'table_count': sharding_config.get('pro_table_count', 14),
'queried_tables': pro_queried_tables,
'error_tables': pro_error_tables,
'failed_keys': pro_failed_keys
}
results['sharding_info']['calculation_stats'].update(pro_calc_stats)
else:
# 生产环境单表查询
logger.info(f"=== 生产环境单表配置 ===")
logger.info(f"启用分表查询: False")
logger.info(f"表名: {pro_config['table']}")
pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
results['pro_data'] = pro_data
results['sharding_info']['pro_shards'] = {
'enabled': False,
'queried_tables': [pro_config['table']]
}
# 处理测试环境查询
if sharding_config.get('use_sharding_for_test', False):
# 获取测试环境分表配置参数,优先使用专用参数,否则使用通用参数
test_interval = sharding_config.get('test_interval_seconds') or sharding_config.get('interval_seconds', 604800)
test_table_count = sharding_config.get('test_table_count') or sharding_config.get('table_count', 14)
# 记录测试环境分表配置信息
logger.info(f"=== 测试环境分表配置 ===")
logger.info(f"启用分表查询: True")
logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)")
logger.info(f"分表数量: {test_table_count}")
logger.info(f"基础表名: {test_config['table']}")
test_calculator = ShardingCalculator(
interval_seconds=test_interval,
table_count=test_table_count
)
test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys(
test_config['table'], values
)
logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}")
test_data, test_queried_tables, test_error_tables = execute_sharding_query(
test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields
)
results['test_data'] = test_data
results['sharding_info']['test_shards'] = {
'enabled': True,
'interval_seconds': test_interval,
'table_count': test_table_count,
'queried_tables': test_queried_tables,
'error_tables': test_error_tables,
'failed_keys': test_failed_keys
}
# 合并计算统计信息
if not results['sharding_info']['calculation_stats']:
results['sharding_info']['calculation_stats'] = test_calc_stats
else:
# 测试环境单表查询
logger.info(f"=== 测试环境单表配置 ===")
logger.info(f"启用分表查询: False")
logger.info(f"表名: {test_config['table']}")
test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
results['test_data'] = test_data
results['sharding_info']['test_shards'] = {
'enabled': False,
'queried_tables': [test_config['table']]
}
return results