增加分表查询

This commit is contained in:
2025-08-03 18:50:24 +08:00
parent 7aee03b7b9
commit e1a566012d
3 changed files with 110 additions and 28 deletions

64
app.py
View File

@@ -1175,7 +1175,7 @@ def create_connection(config):
return None, None
def execute_query(session, table, keys, fields, values, exclude_fields=None):
"""执行查询"""
"""执行查询,支持单主键和复合主键"""
try:
# 参数验证
if not keys or len(keys) == 0:
@@ -1187,8 +1187,37 @@ def execute_query(session, table, keys, fields, values, exclude_fields=None):
return []
# 构建查询条件
quoted_values = [f"'{value}'" for value in values]
query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
if len(keys) == 1:
# 单主键查询(保持原有逻辑)
quoted_values = [f"'{value}'" for value in values]
query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
else:
# 复合主键查询
conditions = []
for value in values:
# 检查value是否包含复合主键分隔符
if isinstance(value, str) and ',' in value:
# 解析复合主键值
key_values = [v.strip() for v in value.split(',')]
if len(key_values) == len(keys):
# 构建单个复合主键条件: (key1='val1' AND key2='val2')
key_conditions = []
for i, (key, val) in enumerate(zip(keys, key_values)):
key_conditions.append(f"{key} = '{val}'")
conditions.append(f"({' AND '.join(key_conditions)})")
else:
logger.warning(f"复合主键值 '{value}' 的字段数量({len(key_values)})与主键字段数量({len(keys)})不匹配")
# 将其作为第一个主键的值处理
conditions.append(f"{keys[0]} = '{value}'")
else:
# 单值,作为第一个主键的值处理
conditions.append(f"{keys[0]} = '{value}'")
if conditions:
query_conditions = ' OR '.join(conditions)
else:
logger.error("无法构建有效的查询条件")
return []
# 确定要查询的字段
if fields:
@@ -1200,7 +1229,10 @@ def execute_query(session, table, keys, fields, values, exclude_fields=None):
# 记录查询SQL日志
logger.info(f"执行查询SQL: {query_sql}")
logger.info(f"查询参数: 表={table}, 字段={fields_str}, Key数量={len(values)}")
if len(keys) > 1:
logger.info(f"复合主键查询参数: 表={table}, 主键字段={keys}, 字段={fields_str}, Key数量={len(values)}")
else:
logger.info(f"单主键查询参数: 表={table}, 主键字段={keys[0]}, 字段={fields_str}, Key数量={len(values)}")
# 执行查询
start_time = time.time()
@@ -1367,15 +1399,29 @@ def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys
return results
def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values):
"""比较查询结果"""
"""比较查询结果,支持复合主键"""
differences = []
field_diff_count = {}
identical_results = [] # 存储相同的结果
def match_composite_key(row, composite_value, keys):
"""检查数据行是否匹配复合主键值"""
if len(keys) == 1:
# 单主键匹配
return getattr(row, keys[0]) == composite_value
else:
# 复合主键匹配
if isinstance(composite_value, str) and ',' in composite_value:
key_values = [v.strip() for v in composite_value.split(',')]
if len(key_values) == len(keys):
return all(str(getattr(row, key)) == key_val for key, key_val in zip(keys, key_values))
# 如果不是复合值,只匹配第一个主键
return getattr(row, keys[0]) == composite_value
for value in values:
# 查找表和测试表中该ID的相关数据
rows_pro = [row for row in pro_data if getattr(row, keys[0]) == value]
rows_test = [row for row in test_data if getattr(row, keys[0]) == value]
# 查找生产表和测试表中该主键值的相关数据
rows_pro = [row for row in pro_data if match_composite_key(row, value, keys)]
rows_test = [row for row in test_data if match_composite_key(row, value, keys)]
for row_pro in rows_pro:
# 在测试表中查找相同主键的行
@@ -2180,4 +2226,4 @@ def api_get_query_logs_by_history(history_id):
return jsonify({'success': False, 'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, port=5000)
app.run(debug=True, port=5001)