Complete basic sharded-table query

2025-08-02 01:23:33 +08:00
parent 689328eeca
commit b7a05e56d0
4 changed files with 1087 additions and 28 deletions

app.py (471 changed lines)

@@ -6,16 +6,147 @@ import os
import logging
import sqlite3
from datetime import datetime
import re
import concurrent.futures
import time

app = Flask(__name__)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Query log collector
class QueryLogCollector:
    def __init__(self, max_logs=1000):
        self.logs = []
        self.max_logs = max_logs

    def add_log(self, level, message):
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        log_entry = {
            'timestamp': timestamp,
            'level': level,
            'message': message
        }
        self.logs.append(log_entry)
        # Keep the stored log count within the limit
        if len(self.logs) > self.max_logs:
            self.logs.pop(0)

    def get_logs(self, limit=None):
        if limit:
            return self.logs[-limit:]
        return self.logs

    def clear_logs(self):
        self.logs.clear()

# Global log collector instance
query_log_collector = QueryLogCollector()

# Custom logging handler that mirrors records into the collector
class CollectorHandler(logging.Handler):
    def __init__(self, collector):
        super().__init__()
        self.collector = collector

    def emit(self, record):
        self.collector.add_log(record.levelname, record.getMessage())

# Attach the collector handler to the logger
collector_handler = CollectorHandler(query_log_collector)
logger.addHandler(collector_handler)
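A quick way to see this wiring in action (a hypothetical shell session; importing app is safe because app.run() sits behind the __main__ guard below): every record emitted through logger is mirrored into query_log_collector.logs as a dict with timestamp, level, and message.

from app import logger, query_log_collector

logger.info("hello from the collector")
print(query_log_collector.logs[-1])
# e.g. {'timestamp': '2025-08-02 01:23:33.123', 'level': 'INFO', 'message': 'hello from the collector'}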
# Database configuration
DATABASE_PATH = 'config_groups.db'

class ShardingCalculator:
    """Shard-table calculator based on a TWCS-style time-window strategy"""

    def __init__(self, interval_seconds=604800, table_count=14):
        """
        Initialize the sharding calculator.
        :param interval_seconds: time window in seconds, default 604800 (7 days)
        :param table_count: number of shard tables, default 14
        """
        self.interval_seconds = interval_seconds
        self.table_count = table_count

    def extract_timestamp_from_key(self, key):
        """
        Extract a timestamp from a key.
        Rule: strip all non-digit characters and treat what remains as the timestamp.
        """
        if not key:
            return None
        key_str = str(key)
        # Remove all non-digit characters
        numbers = re.sub(r'\D', '', key_str)
        if not numbers:
            logger.warning(f"Key '{key}' contains no digit characters")
            return None
        try:
            timestamp = int(numbers)
            logger.info(f"Key '{key}' yielded timestamp: {timestamp}")
            return timestamp
        except ValueError:
            logger.error(f"Key '{key}' digit conversion failed: {numbers}")
            return None

    def calculate_shard_index(self, timestamp):
        """
        Compute the shard index.
        Formula: timestamp // interval_seconds % table_count
        """
        if timestamp is None:
            return None
        return int(timestamp) // self.interval_seconds % self.table_count

    def get_shard_table_name(self, base_table_name, key):
        """Resolve the shard table name for a given key."""
        timestamp = self.extract_timestamp_from_key(key)
        if timestamp is None:
            return None
        shard_index = self.calculate_shard_index(timestamp)
        return f"{base_table_name}_{shard_index}"

    def get_all_shard_tables_for_keys(self, base_table_name, keys):
        """
        Compute every shard table a batch of keys needs to hit.
        Returns: ({shard_table_name: [keys_for_this_shard], ...}, failed_keys, stats)
        """
        shard_mapping = {}
        failed_keys = []
        calculation_stats = {
            'total_keys': len(keys),
            'successful_extractions': 0,
            'failed_extractions': 0,
            'unique_shards': 0
        }
        for key in keys:
            shard_table = self.get_shard_table_name(base_table_name, key)
            if shard_table:
                if shard_table not in shard_mapping:
                    shard_mapping[shard_table] = []
                shard_mapping[shard_table].append(key)
                calculation_stats['successful_extractions'] += 1
            else:
                failed_keys.append(key)
                calculation_stats['failed_extractions'] += 1
        calculation_stats['unique_shards'] = len(shard_mapping)
        return shard_mapping, failed_keys, calculation_stats
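As a sanity check on the formula, a worked example with the defaults (the key values and the "orders" base table are made up for illustration): the digits of 'user_1722470400' parse to 1722470400, and 1722470400 // 604800 = 2848, 2848 % 14 = 6, so the key routes to orders_6. Note the extraction rule concatenates every digit in the key, so a name that embeds extra numbers (e.g. 'user2_...') would shift the result.

calc = ShardingCalculator(interval_seconds=604800, table_count=14)

assert calc.extract_timestamp_from_key("user_1722470400") == 1722470400
assert calc.calculate_shard_index(1722470400) == 6               # 2848 % 14
print(calc.get_shard_table_name("orders", "user_1722470400"))    # -> orders_6

# Batch form used by the query path: keys grouped per shard table.
mapping, failed, stats = calc.get_all_shard_tables_for_keys(
    "orders", ["user_1722470400", "user_1723075200", "no-digits"]
)
print(mapping)   # {'orders_6': ['user_1722470400'], 'orders_7': ['user_1723075200']}
print(failed)    # ['no-digits']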
def init_database():
    """Initialize the database"""
    try:
@@ -31,6 +162,7 @@ def init_database():
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                sharding_config TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
@@ -55,6 +187,21 @@ def init_database():
            )
        ''')

        # Create the sharding config-groups table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sharding_config_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                sharding_config TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        conn.commit()
        conn.close()
        logger.info("Database initialized")
@@ -73,14 +220,29 @@ def ensure_database():
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups')")
        results = cursor.fetchall()
        existing_tables = [row[0] for row in results]

        required_tables = ['config_groups', 'query_history', 'sharding_config_groups']
        missing_tables = [table for table in required_tables if table not in existing_tables]
        if missing_tables:
            logger.info(f"Database tables incomplete, missing {missing_tables}; recreating...")
            return init_database()

        # Check whether config_groups already has the sharding_config column
        cursor.execute("PRAGMA table_info(config_groups)")
        columns = cursor.fetchall()
        column_names = [column[1] for column in columns]
        if 'sharding_config' not in column_names:
            logger.info("Adding sharding_config column to config_groups...")
            cursor.execute("ALTER TABLE config_groups ADD COLUMN sharding_config TEXT")
            conn.commit()
            logger.info("sharding_config column added")

        conn.close()
        return True
    except Exception as e:
        logger.error(f"Failed to check database tables: {e}")
@@ -298,7 +460,7 @@ DEFAULT_CONFIG = {
    'exclude_fields': []
}

def save_config_group(name, description, pro_config, test_config, query_config, sharding_config=None):
    """Save a config group"""
    if not ensure_database():
        logger.error("Database initialization failed")
@@ -310,17 +472,18 @@ def save_config_group(name, description, pro_config, test_config, query_config):
    try:
        cursor.execute('''
            INSERT OR REPLACE INTO config_groups
            (name, description, pro_config, test_config, query_config, sharding_config, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            name, description,
            json.dumps(pro_config),
            json.dumps(test_config),
            json.dumps(query_config),
            json.dumps(sharding_config) if sharding_config else None,
            datetime.now().isoformat()
        ))
        conn.commit()
        logger.info(f"Config group '{name}' saved; includes sharding config: {sharding_config is not None}")
        return True
    except Exception as e:
        logger.error(f"Failed to save config group: {e}")
@@ -378,7 +541,7 @@ def get_config_group_by_id(group_id):
        row = cursor.fetchone()
        if row:
            config = {
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
@@ -388,6 +551,30 @@ def get_config_group_by_id(group_id):
                'created_at': row['created_at'],
                'updated_at': row['updated_at']
            }

            # Attach the sharding config if present
            sharding_config_data = None
            try:
                # Try to read sharding_config by position
                sharding_config_data = row[len(row) - 3]  # sharding_config sits third from last
            except (IndexError, KeyError):
                # If the positional read fails, look the column up by name
                try:
                    cursor.execute("PRAGMA table_info(config_groups)")
                    columns = cursor.fetchall()
                    column_names = [col[1] for col in columns]
                    if 'sharding_config' in column_names:
                        sharding_index = column_names.index('sharding_config')
                        sharding_config_data = row[sharding_index]
                except Exception:
                    pass

            if sharding_config_data:
                config['sharding_config'] = json.loads(sharding_config_data)
            else:
                config['sharding_config'] = None

            return config
        return None
    except Exception as e:
        logger.error(f"Failed to fetch config group details: {e}")
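Aside: the positional read row[len(row) - 3] is coupled to column order and will silently return the wrong column if the schema grows. Since the row already supports name access (row['id'] above appears to imply a sqlite3.Row row factory), a simpler variant under that assumption could be:

# Sketch: read sharding_config by name instead of by position.
try:
    sharding_config_data = row['sharding_config']
except (IndexError, KeyError):
    sharding_config_data = None   # column absent in an un-migrated database

config['sharding_config'] = json.loads(sharding_config_data) if sharding_config_data else None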
@@ -555,11 +742,19 @@ def delete_query_history(history_id):
def create_connection(config):
    """Create a Cassandra connection"""
    try:
        logger.info(f"Connecting to Cassandra: {config['hosts']}:{config['port']}, keyspace={config['keyspace']}")
        start_time = time.time()

        auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
        cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider)
        session = cluster.connect(config['keyspace'])

        connection_time = time.time() - start_time
        logger.info(f"Cassandra connected: took {connection_time:.3f}s, cluster={cluster.metadata.cluster_name}")
        return cluster, session
    except Exception as e:
        logger.error(f"Cassandra connection failed: hosts={config['hosts']}, keyspace={config['keyspace']}, error={str(e)}")
        return None, None

def execute_query(session, table, keys, fields, values, exclude_fields=None):
@@ -576,11 +771,141 @@ def execute_query(session, table, keys, fields, values, exclude_fields=None):
        fields_str = "*"
        query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};"

        # Log the generated CQL
        logger.info(f"Executing query: {query_sql}")
        logger.info(f"Query parameters: table={table}, fields={fields_str}, key count={len(values)}")

        # Run the query
        start_time = time.time()
        result = session.execute(query_sql)
        execution_time = time.time() - start_time

        result_list = list(result) if result else []
        logger.info(f"Query finished: took {execution_time:.3f}s, rows returned={len(result_list)}")
        return result_list
    except Exception as e:
        logger.error(f"Query execution failed: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, error={str(e)}")
        return []
def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None):
    """
    Execute a query across shard tables.
    :param session: Cassandra session
    :param shard_mapping: shard mapping {table_name: [keys]}
    :param keys: list of primary-key field names
    :param fields: list of fields to query
    :param exclude_fields: list of fields to exclude
    :return: (list of results, tables queried successfully, tables that failed)
    """
    all_results = []
    queried_tables = []
    error_tables = []

    logger.info(f"Starting sharded query across {len(shard_mapping)} shard tables")
    total_start_time = time.time()

    for table_name, table_keys in shard_mapping.items():
        try:
            logger.info(f"Querying shard {table_name} with {len(table_keys)} keys: {table_keys}")
            # Query this shard table
            table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields)
            all_results.extend(table_results)
            queried_tables.append(table_name)
            logger.info(f"Shard {table_name} queried successfully, {len(table_results)} rows returned")
        except Exception as e:
            logger.error(f"Shard {table_name} query failed: {e}")
            error_tables.append(table_name)

    total_execution_time = time.time() - total_start_time
    logger.info(f"Sharded query complete: took {total_execution_time:.3f}s, tables ok={len(queried_tables)}, tables failed={len(error_tables)}, total rows={len(all_results)}")
    return all_results, queried_tables, error_tables
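The loop above runs shard by shard, even though concurrent.futures is imported at the top of this commit. If the serial pass ever becomes a bottleneck, a possible parallel variant (a sketch, not part of this commit; the DataStax driver's Session is designed to be shared across threads) could fan the per-shard queries out over a thread pool:

import concurrent.futures

def execute_sharding_query_parallel(session, shard_mapping, keys, fields,
                                    exclude_fields=None, max_workers=4):
    all_results, queried_tables, error_tables = [], [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One future per shard table, mirroring the serial loop above
        futures = {
            pool.submit(execute_query, session, table, keys, fields, table_keys, exclude_fields): table
            for table, table_keys in shard_mapping.items()
        }
        for future in concurrent.futures.as_completed(futures):
            table = futures[future]
            try:
                all_results.extend(future.result())
                queried_tables.append(table)
            except Exception as e:
                logger.error(f"Shard {table} query failed: {e}")
                error_tables.append(table)
    return all_results, queried_tables, error_tables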
def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config):
    """
    Execute a mixed query (production and test sides can each be sharded or single-table).
    """
    results = {
        'pro_data': [],
        'test_data': [],
        'sharding_info': {
            'calculation_stats': {}
        }
    }

    # Production-side query
    if sharding_config.get('use_sharding_for_pro', False):
        pro_calculator = ShardingCalculator(
            interval_seconds=sharding_config.get('pro_interval_seconds', 604800),
            table_count=sharding_config.get('pro_table_count', 14)
        )
        pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys(
            pro_config['table'], values
        )
        pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query(
            pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields
        )
        results['pro_data'] = pro_data
        results['sharding_info']['pro_shards'] = {
            'enabled': True,
            'interval_seconds': sharding_config.get('pro_interval_seconds', 604800),
            'table_count': sharding_config.get('pro_table_count', 14),
            'queried_tables': pro_queried_tables,
            'error_tables': pro_error_tables,
            'failed_keys': pro_failed_keys
        }
        results['sharding_info']['calculation_stats'].update(pro_calc_stats)
    else:
        # Production single-table query
        pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
        results['pro_data'] = pro_data
        results['sharding_info']['pro_shards'] = {
            'enabled': False,
            'queried_tables': [pro_config['table']]
        }

    # Test-side query
    if sharding_config.get('use_sharding_for_test', False):
        test_calculator = ShardingCalculator(
            interval_seconds=sharding_config.get('test_interval_seconds', 604800),
            table_count=sharding_config.get('test_table_count', 14)
        )
        test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys(
            test_config['table'], values
        )
        test_data, test_queried_tables, test_error_tables = execute_sharding_query(
            test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields
        )
        results['test_data'] = test_data
        results['sharding_info']['test_shards'] = {
            'enabled': True,
            'interval_seconds': sharding_config.get('test_interval_seconds', 604800),
            'table_count': sharding_config.get('test_table_count', 14),
            'queried_tables': test_queried_tables,
            'error_tables': test_error_tables,
            'failed_keys': test_failed_keys
        }
        # Merge calculation stats
        if not results['sharding_info']['calculation_stats']:
            results['sharding_info']['calculation_stats'] = test_calc_stats
    else:
        # Test single-table query
        test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
        results['test_data'] = test_data
        results['sharding_info']['test_shards'] = {
            'enabled': False,
            'queried_tables': [test_config['table']]
        }

    return results
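Reading off the .get() calls above, the sharding_config dict that drives execute_mixed_query has roughly this shape (the values shown are the illustrative defaults):

sharding_config = {
    'use_sharding_for_pro': True,       # shard the production side
    'pro_interval_seconds': 604800,     # 7-day windows
    'pro_table_count': 14,              # production table suffixes _0 .. _13
    'use_sharding_for_test': False,     # test side stays single-table
    # 'test_interval_seconds': 604800,  # read only when use_sharding_for_test is True
    # 'test_table_count': 14,
}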
def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values):
    """Compare query results"""
    differences = []
@@ -750,6 +1075,102 @@ def index():
def db_compare():
    return render_template('db_compare.html')

@app.route('/api/sharding-query', methods=['POST'])
def sharding_query_compare():
    """Sharded query comparison API"""
    try:
        data = request.json
        logger.info("Starting sharded database comparison query")

        # Parse the configuration
        pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
        test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
        keys = data.get('keys', DEFAULT_CONFIG['keys'])
        fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
        exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
        values = data.get('values', [])
        sharding_config = data.get('sharding_config', {})

        if not values:
            logger.warning("Sharded query rejected: no key values provided")
            return jsonify({'error': 'Please provide key values to query'}), 400

        logger.info(f"Sharded query config: {len(values)} key values, production table {pro_config['table']}, test table {test_config['table']}")

        # Create database connections
        pro_cluster, pro_session = create_connection(pro_config)
        test_cluster, test_session = create_connection(test_config)

        if not pro_session or not test_session:
            logger.error("Database connection failed")
            return jsonify({'error': 'Database connection failed, please check the configuration'}), 500

        try:
            # Run the mixed query (supports sharded production combined with single-table or sharded test)
            logger.info("Executing sharded mixed query")
            query_results = execute_mixed_query(
                pro_session, test_session, pro_config, test_config,
                keys, fields_to_compare, values, exclude_fields, sharding_config
            )

            pro_data = query_results['pro_data']
            test_data = query_results['test_data']
            sharding_info = query_results['sharding_info']

            logger.info(f"Sharded query results: production {len(pro_data)} rows, test {len(test_data)} rows")

            # Compare the results
            differences, field_diff_count, identical_results = compare_results(
                pro_data, test_data, keys, fields_to_compare, exclude_fields, values
            )

            # Statistics
            different_ids = set()
            for diff in differences:
                if 'field' in diff:
                    different_ids.add(list(diff['key'].values())[0])
            non_different_ids = set(values) - different_ids

            # Build the comparison summary
            summary = generate_comparison_summary(
                len(values), len(pro_data), len(test_data),
                differences, identical_results, field_diff_count
            )

            result = {
                'total_keys': len(values),
                'pro_count': len(pro_data),
                'test_count': len(test_data),
                'differences': differences,
                'identical_results': identical_results,
                'field_diff_count': field_diff_count,
                'different_ids': list(different_ids),
                'non_different_ids': list(non_different_ids),
                'summary': summary,
                'sharding_info': sharding_info,  # shard-query details
                'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
                'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
            }

            logger.info(f"Sharded comparison finished: {len(differences)} differences found")
            return jsonify(result)

        except Exception as e:
            logger.error(f"Sharded query execution failed: {str(e)}")
            return jsonify({'error': f'Sharded query execution failed: {str(e)}'}), 500

        finally:
            # Close the connections
            if pro_cluster:
                pro_cluster.shutdown()
            if test_cluster:
                test_cluster.shutdown()

    except Exception as e:
        logger.error(f"Failed to handle sharded query request: {str(e)}")
        return jsonify({'error': f'Failed to handle sharded query request: {str(e)}'}), 500
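A request against the new endpoint might look like the following (host, credentials, table names, and key values are placeholders; assumes the app from this commit is running locally on port 5000):

import requests

payload = {
    'pro_config': {'hosts': ['10.0.0.1'], 'port': 9042, 'username': 'user',
                   'password': 'pass', 'keyspace': 'ks', 'table': 'orders'},
    'test_config': {'hosts': ['10.0.0.2'], 'port': 9042, 'username': 'user',
                    'password': 'pass', 'keyspace': 'ks', 'table': 'orders'},
    'keys': ['id'],
    'values': ['user_1722470400', 'user_1723075200'],
    'sharding_config': {'use_sharding_for_pro': True, 'pro_table_count': 14},
}
resp = requests.post('http://localhost:5000/api/sharding-query', json=payload)
print(resp.json()['sharding_info'])   # which shard tables were actually queried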
@app.route('/api/query', methods=['POST'])
def query_compare():
    try:
@@ -890,10 +1311,13 @@ def api_save_config_group():
        'exclude_fields': data.get('exclude_fields', [])
    }

    # Extract the sharding config
    sharding_config = data.get('sharding_config')

    if not name:
        return jsonify({'success': False, 'error': 'Config group name cannot be empty'}), 400

    success = save_config_group(name, description, pro_config, test_config, query_config, sharding_config)

    if success:
        return jsonify({'success': True, 'message': 'Config group saved'})
@@ -995,5 +1419,30 @@ def api_delete_query_history(history_id):
    else:
        return jsonify({'success': False, 'error': 'Failed to delete the query history record'}), 500

@app.route('/api/query-logs', methods=['GET'])
def api_get_query_logs():
    """Fetch the collected query logs"""
    try:
        limit = request.args.get('limit', type=int)
        logs = query_log_collector.get_logs(limit)
        return jsonify({
            'success': True,
            'data': logs,
            'total': len(query_log_collector.logs)
        })
    except Exception as e:
        logger.error(f"Failed to fetch query logs: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/api/query-logs', methods=['DELETE'])
def api_clear_query_logs():
    """Clear the collected query logs"""
    try:
        query_log_collector.clear_logs()
        return jsonify({'success': True, 'message': 'Query logs cleared'})
    except Exception as e:
        logger.error(f"Failed to clear query logs: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
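The two log endpoints can be exercised the same way (again assuming a local instance; the limit value is arbitrary):

import requests

print(requests.get('http://localhost:5000/api/query-logs', params={'limit': 50}).json())
requests.delete('http://localhost:5000/api/query-logs')   # clears the collector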
if __name__ == '__main__':
    app.run(debug=True, port=5000)