295 lines
13 KiB
Python
295 lines
13 KiB
Python
"""
|
||
数据查询引擎模块
|
||
================
|
||
|
||
本模块是BigDataTool的核心查询引擎,负责Cassandra数据库的高级查询功能。
|
||
|
||
核心功能:
|
||
1. 单表查询:标准的Cassandra CQL查询执行
|
||
2. 分表查询:基于TWCS策略的时间分表查询
|
||
3. 多主键查询:支持复合主键的复杂查询条件
|
||
4. 混合查询:生产环境分表+测试环境单表的组合查询
|
||
|
||
查询类型支持:
|
||
- 单主键查询:WHERE key IN (val1, val2, val3)
|
||
- 复合主键查询:WHERE (key1='val1' AND key2='val2') OR (key1='val3' AND key2='val4')
|
||
- 分表查询:自动计算分表名称并并行查询多张表
|
||
- 字段过滤:支持指定查询字段和排除字段
|
||
|
||
分表查询特性:
|
||
- 时间戳提取:从Key中智能提取时间戳信息
|
||
- 分表计算:基于TWCS策略计算目标分表
|
||
- 并行查询:同时查询多张分表以提高性能
|
||
- 错误容错:单个分表查询失败不影响整体结果
|
||
|
||
性能优化:
|
||
- 查询时间监控:记录每个查询的执行时间
|
||
- 批量处理:支持大批量Key的高效查询
|
||
- 连接复用:优化数据库连接的使用
|
||
- 内存管理:大结果集的内存友好处理
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
|
||
|
||
import time
|
||
import logging
|
||
from .sharding import ShardingCalculator
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def execute_query(session, table, keys, fields, values, exclude_fields=None):
|
||
"""
|
||
执行Cassandra数据库查询,支持单主键和复合主键查询
|
||
|
||
本函数是查询引擎的核心,能够智能处理不同类型的主键查询:
|
||
- 单主键:生成 WHERE key IN (val1, val2, val3) 查询
|
||
- 复合主键:生成 WHERE (key1='val1' AND key2='val2') OR ... 查询
|
||
|
||
Args:
|
||
session: Cassandra数据库会话对象
|
||
table (str): 目标表名
|
||
keys (list): 主键字段名列表,如 ['id'] 或 ['docid', 'id']
|
||
fields (list): 要查询的字段列表,空列表表示查询所有字段
|
||
values (list): 查询值列表,复合主键值用逗号分隔
|
||
exclude_fields (list, optional): 要排除的字段列表
|
||
|
||
Returns:
|
||
list: 查询结果列表,每个元素是一个Row对象
|
||
|
||
查询示例:
|
||
# 单主键查询
|
||
execute_query(session, 'users', ['id'], ['name', 'email'], ['1', '2', '3'])
|
||
# 生成SQL: SELECT name, email FROM users WHERE id IN ('1', '2', '3')
|
||
|
||
# 复合主键查询
|
||
execute_query(session, 'orders', ['user_id', 'order_id'], ['*'], ['1,100', '2,200'])
|
||
# 生成SQL: SELECT * FROM orders WHERE (user_id='1' AND order_id='100') OR (user_id='2' AND order_id='200')
|
||
|
||
错误处理:
|
||
- 参数验证:检查keys和values是否为空
|
||
- SQL注入防护:对查询值进行适当转义
|
||
- 异常捕获:数据库错误时返回空列表
|
||
- 日志记录:记录查询SQL和执行统计
|
||
"""
|
||
try:
|
||
# 参数验证
|
||
if not keys or len(keys) == 0:
|
||
logger.error("Keys参数为空,无法构建查询")
|
||
return []
|
||
|
||
if not values or len(values) == 0:
|
||
logger.error("Values参数为空,无法构建查询")
|
||
return []
|
||
|
||
# 构建查询条件
|
||
if len(keys) == 1:
|
||
# 单主键查询(保持原有逻辑)
|
||
quoted_values = [f"'{value}'" for value in values]
|
||
query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
|
||
else:
|
||
# 复合主键查询
|
||
conditions = []
|
||
for value in values:
|
||
# 检查value是否包含复合主键分隔符
|
||
if isinstance(value, str) and ',' in value:
|
||
# 解析复合主键值
|
||
key_values = [v.strip() for v in value.split(',')]
|
||
if len(key_values) == len(keys):
|
||
# 构建单个复合主键条件: (key1='val1' AND key2='val2')
|
||
key_conditions = []
|
||
for i, (key, val) in enumerate(zip(keys, key_values)):
|
||
key_conditions.append(f"{key} = '{val}'")
|
||
conditions.append(f"({' AND '.join(key_conditions)})")
|
||
else:
|
||
logger.warning(f"复合主键值 '{value}' 的字段数量({len(key_values)})与主键字段数量({len(keys)})不匹配")
|
||
# 将其作为第一个主键的值处理
|
||
conditions.append(f"{keys[0]} = '{value}'")
|
||
else:
|
||
# 单值,作为第一个主键的值处理
|
||
conditions.append(f"{keys[0]} = '{value}'")
|
||
|
||
if conditions:
|
||
query_conditions = ' OR '.join(conditions)
|
||
else:
|
||
logger.error("无法构建有效的查询条件")
|
||
return []
|
||
|
||
# 确定要查询的字段
|
||
if fields:
|
||
fields_str = ", ".join(fields)
|
||
else:
|
||
fields_str = "*"
|
||
|
||
query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};"
|
||
|
||
# 记录查询SQL日志
|
||
logger.info(f"执行查询SQL: {query_sql}")
|
||
if len(keys) > 1:
|
||
logger.info(f"复合主键查询参数: 表={table}, 主键字段={keys}, 字段={fields_str}, Key数量={len(values)}")
|
||
else:
|
||
logger.info(f"单主键查询参数: 表={table}, 主键字段={keys[0]}, 字段={fields_str}, Key数量={len(values)}")
|
||
|
||
# 执行查询
|
||
start_time = time.time()
|
||
result = session.execute(query_sql)
|
||
execution_time = time.time() - start_time
|
||
|
||
result_list = list(result) if result else []
|
||
logger.info(f"查询完成: 执行时间={execution_time:.3f}秒, 返回记录数={len(result_list)}")
|
||
|
||
return result_list
|
||
except Exception as e:
|
||
logger.error(f"查询执行失败: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, 错误={str(e)}")
|
||
return []
|
||
|
||
def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None):
|
||
"""
|
||
执行分表查询
|
||
:param session: Cassandra会话
|
||
:param shard_mapping: 分表映射 {table_name: [keys]}
|
||
:param keys: 主键字段名列表
|
||
:param fields: 要查询的字段列表
|
||
:param exclude_fields: 要排除的字段列表
|
||
:return: (查询结果列表, 查询到的表列表, 查询失败的表列表)
|
||
"""
|
||
all_results = []
|
||
queried_tables = []
|
||
error_tables = []
|
||
|
||
logger.info(f"开始执行分表查询,涉及 {len(shard_mapping)} 张分表")
|
||
total_start_time = time.time()
|
||
|
||
for table_name, table_keys in shard_mapping.items():
|
||
try:
|
||
logger.info(f"查询分表 {table_name},包含 {len(table_keys)} 个key: {table_keys}")
|
||
# 为每个分表执行查询
|
||
table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields)
|
||
all_results.extend(table_results)
|
||
queried_tables.append(table_name)
|
||
logger.info(f"分表 {table_name} 查询成功,返回 {len(table_results)} 条记录")
|
||
except Exception as e:
|
||
logger.error(f"分表 {table_name} 查询失败: {e}")
|
||
error_tables.append(table_name)
|
||
|
||
total_execution_time = time.time() - total_start_time
|
||
logger.info(f"分表查询总计完成: 执行时间={total_execution_time:.3f}秒, 成功表数={len(queried_tables)}, 失败表数={len(error_tables)}, 总记录数={len(all_results)}")
|
||
|
||
return all_results, queried_tables, error_tables
|
||
|
||
def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config):
|
||
"""
|
||
执行混合查询(生产环境分表,测试环境可能单表或分表)
|
||
"""
|
||
results = {
|
||
'pro_data': [],
|
||
'test_data': [],
|
||
'sharding_info': {
|
||
'calculation_stats': {}
|
||
}
|
||
}
|
||
|
||
# 处理生产环境查询
|
||
if sharding_config.get('use_sharding_for_pro', False):
|
||
# 获取生产环境分表配置参数,优先使用专用参数,否则使用通用参数
|
||
pro_interval = sharding_config.get('pro_interval_seconds') or sharding_config.get('interval_seconds', 604800)
|
||
pro_table_count = sharding_config.get('pro_table_count') or sharding_config.get('table_count', 14)
|
||
|
||
# 记录生产环境分表配置信息
|
||
logger.info(f"=== 生产环境分表配置 ===")
|
||
logger.info(f"启用分表查询: True")
|
||
logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)")
|
||
logger.info(f"分表数量: {pro_table_count}张")
|
||
logger.info(f"基础表名: {pro_config['table']}")
|
||
|
||
pro_calculator = ShardingCalculator(
|
||
interval_seconds=pro_interval,
|
||
table_count=pro_table_count
|
||
)
|
||
pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys(
|
||
pro_config['table'], values
|
||
)
|
||
|
||
logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}")
|
||
|
||
pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query(
|
||
pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields
|
||
)
|
||
|
||
results['pro_data'] = pro_data
|
||
results['sharding_info']['pro_shards'] = {
|
||
'enabled': True,
|
||
'interval_seconds': sharding_config.get('pro_interval_seconds', 604800),
|
||
'table_count': sharding_config.get('pro_table_count', 14),
|
||
'queried_tables': pro_queried_tables,
|
||
'error_tables': pro_error_tables,
|
||
'failed_keys': pro_failed_keys
|
||
}
|
||
results['sharding_info']['calculation_stats'].update(pro_calc_stats)
|
||
else:
|
||
# 生产环境单表查询
|
||
logger.info(f"=== 生产环境单表配置 ===")
|
||
logger.info(f"启用分表查询: False")
|
||
logger.info(f"表名: {pro_config['table']}")
|
||
|
||
pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
|
||
results['pro_data'] = pro_data
|
||
results['sharding_info']['pro_shards'] = {
|
||
'enabled': False,
|
||
'queried_tables': [pro_config['table']]
|
||
}
|
||
|
||
# 处理测试环境查询
|
||
if sharding_config.get('use_sharding_for_test', False):
|
||
# 获取测试环境分表配置参数,优先使用专用参数,否则使用通用参数
|
||
test_interval = sharding_config.get('test_interval_seconds') or sharding_config.get('interval_seconds', 604800)
|
||
test_table_count = sharding_config.get('test_table_count') or sharding_config.get('table_count', 14)
|
||
|
||
# 记录测试环境分表配置信息
|
||
logger.info(f"=== 测试环境分表配置 ===")
|
||
logger.info(f"启用分表查询: True")
|
||
logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)")
|
||
logger.info(f"分表数量: {test_table_count}张")
|
||
logger.info(f"基础表名: {test_config['table']}")
|
||
|
||
test_calculator = ShardingCalculator(
|
||
interval_seconds=test_interval,
|
||
table_count=test_table_count
|
||
)
|
||
test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys(
|
||
test_config['table'], values
|
||
)
|
||
|
||
logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}")
|
||
|
||
test_data, test_queried_tables, test_error_tables = execute_sharding_query(
|
||
test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields
|
||
)
|
||
|
||
results['test_data'] = test_data
|
||
results['sharding_info']['test_shards'] = {
|
||
'enabled': True,
|
||
'interval_seconds': test_interval,
|
||
'table_count': test_table_count,
|
||
'queried_tables': test_queried_tables,
|
||
'error_tables': test_error_tables,
|
||
'failed_keys': test_failed_keys
|
||
}
|
||
|
||
# 合并计算统计信息
|
||
if not results['sharding_info']['calculation_stats']:
|
||
results['sharding_info']['calculation_stats'] = test_calc_stats
|
||
else:
|
||
# 测试环境单表查询
|
||
logger.info(f"=== 测试环境单表配置 ===")
|
||
logger.info(f"启用分表查询: False")
|
||
logger.info(f"表名: {test_config['table']}")
|
||
|
||
test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
|
||
results['test_data'] = test_data
|
||
results['sharding_info']['test_shards'] = {
|
||
'enabled': False,
|
||
'queried_tables': [test_config['table']]
|
||
}
|
||
|
||
return results |