# BigDataTool/app_original_backup.py
from flask import Flask, render_template, request, jsonify, send_from_directory
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import os
import logging
import sqlite3
from datetime import datetime, timedelta
import re
import concurrent.futures
import time
app = Flask(__name__)
# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Database configuration
DATABASE_PATH = 'config_groups.db'
# Query log collector
class QueryLogCollector:
def __init__(self, max_logs=1000, db_path=None):
        self.logs = []  # in-memory log cache
self.max_logs = max_logs
self.current_batch_id = None
self.batch_counter = 0
self.current_query_type = 'single'
        self.current_history_id = None  # ID of the history record linked to the current batch
self.db_path = db_path or DATABASE_PATH
def start_new_batch(self, query_type='single'):
"""开始新的查询批次"""
self.batch_counter += 1
self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}"
self.current_query_type = query_type
        self.current_history_id = None  # reset the linked history record ID
        # Mark the start of the batch
self.add_log('INFO', f"=== 开始{query_type}查询批次 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
return self.current_batch_id
def set_history_id(self, history_id):
"""设置当前批次关联的历史记录ID"""
self.current_history_id = history_id
if self.current_batch_id and history_id:
self.add_log('INFO', f"关联历史记录ID: {history_id}", force_batch_id=self.current_batch_id)
            # Propagate the history_id to every log row already in this batch
self._update_batch_history_id(self.current_batch_id, history_id)
def _update_batch_history_id(self, batch_id, history_id):
"""更新批次中所有日志的history_id"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('''
UPDATE query_logs
SET history_id = ?
WHERE batch_id = ?
''', (history_id, batch_id))
conn.commit()
conn.close()
logger.info(f"已更新批次 {batch_id} 的历史记录关联到 {history_id}")
except Exception as e:
print(f"Warning: Failed to update batch history_id: {e}")
def end_current_batch(self):
"""结束当前查询批次"""
if self.current_batch_id:
self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
self.current_batch_id = None
self.current_history_id = None
def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None):
"""添加日志到内存和数据库"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
batch_id = force_batch_id or self.current_batch_id
query_type = force_query_type or self.current_query_type
history_id = force_history_id or self.current_history_id
log_entry = {
'timestamp': timestamp,
'level': level,
'message': message,
'batch_id': batch_id,
'query_type': query_type,
'history_id': history_id
}
        # Append to the in-memory cache
self.logs.append(log_entry)
if len(self.logs) > self.max_logs:
self.logs.pop(0)
        # Persist to the database
self._save_log_to_db(log_entry)
def _save_log_to_db(self, log_entry):
"""将日志保存到数据库"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type)
VALUES (?, ?, ?, ?, ?, ?)
''', (
log_entry['batch_id'],
log_entry['history_id'],
log_entry['timestamp'],
log_entry['level'],
log_entry['message'],
log_entry['query_type']
))
conn.commit()
conn.close()
except Exception as e:
            # A failed DB write is reported on the console but must not break the app
print(f"Warning: Failed to save log to database: {e}")
def get_logs(self, limit=None, from_db=True):
"""获取日志,支持从数据库或内存获取"""
if from_db:
return self._get_logs_from_db(limit)
else:
            # Serve from memory
if limit:
return self.logs[-limit:]
return self.logs
def _get_logs_from_db(self, limit=None):
"""从数据库获取日志"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = '''
SELECT batch_id, history_id, timestamp, level, message, query_type
FROM query_logs
ORDER BY id DESC
'''
if limit:
query += f' LIMIT {limit}'
cursor.execute(query)
rows = cursor.fetchall()
            # Rows arrive newest-first (ORDER BY id DESC); convert to dicts and reverse back to chronological order
logs = []
for row in reversed(rows):
logs.append({
'batch_id': row['batch_id'],
'history_id': row['history_id'],
'timestamp': row['timestamp'],
'level': row['level'],
'message': row['message'],
'query_type': row['query_type']
})
conn.close()
return logs
except Exception as e:
print(f"Warning: Failed to get logs from database: {e}")
            # If the database read fails, fall back to the in-memory logs
return self.get_logs(limit, from_db=False)
def _get_total_logs_count(self):
"""获取数据库中的日志总数"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM query_logs')
count = cursor.fetchone()[0]
conn.close()
return count
except Exception as e:
print(f"Warning: Failed to get logs count from database: {e}")
return len(self.logs)
def get_logs_by_history_id(self, history_id):
"""根据历史记录ID获取相关日志"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT batch_id, history_id, timestamp, level, message, query_type
FROM query_logs
WHERE history_id = ?
ORDER BY id ASC
''', (history_id,))
rows = cursor.fetchall()
logs = []
for row in rows:
logs.append({
'batch_id': row['batch_id'],
'history_id': row['history_id'],
'timestamp': row['timestamp'],
'level': row['level'],
'message': row['message'],
'query_type': row['query_type']
})
conn.close()
return logs
except Exception as e:
print(f"Warning: Failed to get logs by history_id: {e}")
return []
def get_logs_grouped_by_batch(self, limit=None, from_db=True):
"""按批次分组获取日志"""
logs = self.get_logs(limit, from_db)
grouped_logs = {}
batch_order = []
for log in logs:
batch_id = log.get('batch_id', 'unknown')
if batch_id not in grouped_logs:
grouped_logs[batch_id] = []
batch_order.append(batch_id)
grouped_logs[batch_id].append(log)
        # Return the batches in chronological order
return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]
def clear_logs(self, clear_db=True):
"""清空日志"""
# 清空内存
self.logs.clear()
self.current_batch_id = None
self.batch_counter = 0
        # Clear the database
if clear_db:
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('DELETE FROM query_logs')
conn.commit()
conn.close()
except Exception as e:
print(f"Warning: Failed to clear logs from database: {e}")
def cleanup_old_logs(self, days_to_keep=30):
"""清理旧日志,保留指定天数的日志"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
            # Delete logs older than the cutoff
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
cursor.execute('''
DELETE FROM query_logs
WHERE created_at < ?
''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
logger.info(f"清理了 {deleted_count} 条超过 {days_to_keep} 天的旧日志")
return deleted_count
except Exception as e:
logger.error(f"清理旧日志失败: {e}")
return 0
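# Illustrative usage sketch (added for clarity, not part of the original file):
# how a caller is expected to drive the collector around one query. The
# history ID below is a made-up example.
#
#     collector = QueryLogCollector(db_path='config_groups.db')
#     batch_id = collector.start_new_batch('single')   # open a batch
#     collector.add_log('INFO', 'querying...')         # attaches to the batch
#     collector.set_history_id(42)                     # link a history record
#     collector.end_current_batch()                    # close the batch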
# Global log collector instance
query_log_collector = QueryLogCollector()
# Custom logging handler that feeds the collector
class CollectorHandler(logging.Handler):
def __init__(self, collector):
super().__init__()
self.collector = collector
def emit(self, record):
self.collector.add_log(record.levelname, record.getMessage())
# Attach the collector handler to the logger
collector_handler = CollectorHandler(query_log_collector)
logger.addHandler(collector_handler)
class ShardingCalculator:
"""分表计算器基于TWCS策略"""
def __init__(self, interval_seconds=604800, table_count=14):
"""
初始化分表计算器
:param interval_seconds: 时间间隔(秒)默认604800(7天)
:param table_count: 分表数量默认14
"""
self.interval_seconds = interval_seconds
self.table_count = table_count
def extract_timestamp_from_key(self, key):
"""
从Key中提取时间戳
新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分
"""
if not key:
return None
key_str = str(key)
        # Method 1: if the key contains underscores, try the part after the last one
if '_' in key_str:
parts = key_str.split('_')
last_part = parts[-1]
            # The last part must be all digits
if last_part.isdigit():
timestamp = int(last_part)
logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}")
return timestamp
        # Method 2: find every digit run with a regex and pick from those
number_sequences = re.findall(r'\d+', key_str)
if not number_sequences:
logger.warning(f"Key '{key}' 中没有找到数字字符")
return None
        # Prefer the longest run; when several share the maximal length, take the last one
        longest_sequence = max(number_sequences, key=len)
        max_length = len(longest_sequence)
        last_longest = None
for seq in number_sequences:
if len(seq) == max_length:
last_longest = seq
try:
timestamp = int(last_longest)
logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)")
return timestamp
except ValueError:
logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}")
return None
def calculate_shard_index(self, timestamp):
"""
计算分表索引
公式timestamp // interval_seconds % table_count
"""
if timestamp is None:
return None
return int(timestamp) // self.interval_seconds % self.table_count
def get_shard_table_name(self, base_table_name, key):
"""
根据Key获取对应的分表名称
"""
timestamp = self.extract_timestamp_from_key(key)
if timestamp is None:
return None
shard_index = self.calculate_shard_index(timestamp)
return f"{base_table_name}_{shard_index}"
def get_all_shard_tables_for_keys(self, base_table_name, keys):
"""
为一批Keys计算所有需要查询的分表
返回: {shard_table_name: [keys_for_this_shard], ...}
"""
shard_mapping = {}
failed_keys = []
calculation_stats = {
'total_keys': len(keys),
'successful_extractions': 0,
'failed_extractions': 0,
'unique_shards': 0
}
for key in keys:
shard_table = self.get_shard_table_name(base_table_name, key)
if shard_table:
if shard_table not in shard_mapping:
shard_mapping[shard_table] = []
shard_mapping[shard_table].append(key)
calculation_stats['successful_extractions'] += 1
else:
failed_keys.append(key)
calculation_stats['failed_extractions'] += 1
calculation_stats['unique_shards'] = len(shard_mapping)
return shard_mapping, failed_keys, calculation_stats
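# Illustrative sketch (added for clarity, not in the original file): mapping a
# few hypothetical keys onto TWCS shards with the default 7-day / 14-table
# layout. The key names and timestamps below are made up.
#
#     calc = ShardingCalculator()                           # 604800s, 14 tables
#     calc.extract_timestamp_from_key('order_1700000000')   # -> 1700000000
#     calc.calculate_shard_index(1700000000)                # 1700000000 // 604800 % 14 -> 10
#     calc.get_shard_table_name('orders', 'order_1700000000')  # -> 'orders_10'
#     mapping, failed, stats = calc.get_all_shard_tables_for_keys(
#         'orders', ['order_1700000000', 'order_1700604800', 'no-digits'])
#     # mapping -> {'orders_10': ['order_1700000000'], 'orders_11': ['order_1700604800']}
#     # failed  -> ['no-digits']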
def init_database():
"""初始化数据库"""
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
        # Config group table
cursor.execute('''
CREATE TABLE IF NOT EXISTS config_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT,
pro_config TEXT NOT NULL,
test_config TEXT NOT NULL,
query_config TEXT NOT NULL,
sharding_config TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        # Query history table, including sharding-config columns
cursor.execute('''
CREATE TABLE IF NOT EXISTS query_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
pro_config TEXT NOT NULL,
test_config TEXT NOT NULL,
query_config TEXT NOT NULL,
query_keys TEXT NOT NULL,
results_summary TEXT NOT NULL,
execution_time REAL NOT NULL,
total_keys INTEGER NOT NULL,
differences_count INTEGER NOT NULL,
identical_count INTEGER NOT NULL,
sharding_config TEXT,
query_type TEXT DEFAULT 'single',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        # Sharding config group table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sharding_config_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT,
pro_config TEXT NOT NULL,
test_config TEXT NOT NULL,
query_config TEXT NOT NULL,
sharding_config TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        # Query log table
cursor.execute('''
CREATE TABLE IF NOT EXISTS query_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch_id TEXT NOT NULL,
history_id INTEGER,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
query_type TEXT DEFAULT 'single',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE
)
''')
        # Indexes
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)')
conn.commit()
conn.close()
logger.info("数据库初始化完成")
return True
except Exception as e:
logger.error(f"数据库初始化失败: {e}")
return False
def ensure_database():
"""确保数据库和表存在"""
if not os.path.exists(DATABASE_PATH):
logger.info("数据库文件不存在,正在创建...")
return init_database()
    # Check that the required tables exist
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups', 'query_logs')")
results = cursor.fetchall()
existing_tables = [row[0] for row in results]
required_tables = ['config_groups', 'query_history', 'sharding_config_groups', 'query_logs']
missing_tables = [table for table in required_tables if table not in existing_tables]
if missing_tables:
logger.info(f"数据库表不完整,缺少表:{missing_tables},正在重新创建...")
return init_database()
        # Does config_groups already have the sharding_config column?
cursor.execute("PRAGMA table_info(config_groups)")
columns = cursor.fetchall()
column_names = [column[1] for column in columns]
if 'sharding_config' not in column_names:
logger.info("添加sharding_config字段到config_groups表...")
cursor.execute("ALTER TABLE config_groups ADD COLUMN sharding_config TEXT")
conn.commit()
logger.info("sharding_config字段添加成功")
        # Does query_history have the sharding-related columns?
cursor.execute("PRAGMA table_info(query_history)")
history_columns = cursor.fetchall()
history_column_names = [column[1] for column in history_columns]
if 'sharding_config' not in history_column_names:
logger.info("添加sharding_config字段到query_history表...")
cursor.execute("ALTER TABLE query_history ADD COLUMN sharding_config TEXT")
conn.commit()
logger.info("query_history表sharding_config字段添加成功")
if 'query_type' not in history_column_names:
logger.info("添加query_type字段到query_history表...")
cursor.execute("ALTER TABLE query_history ADD COLUMN query_type TEXT DEFAULT 'single'")
conn.commit()
logger.info("query_history表query_type字段添加成功")
        # Columns for storing query result data
if 'raw_results' not in history_column_names:
logger.info("添加raw_results字段到query_history表...")
cursor.execute("ALTER TABLE query_history ADD COLUMN raw_results TEXT")
conn.commit()
logger.info("query_history表raw_results字段添加成功")
if 'differences_data' not in history_column_names:
logger.info("添加differences_data字段到query_history表...")
cursor.execute("ALTER TABLE query_history ADD COLUMN differences_data TEXT")
conn.commit()
logger.info("query_history表differences_data字段添加成功")
if 'identical_data' not in history_column_names:
logger.info("添加identical_data字段到query_history表...")
cursor.execute("ALTER TABLE query_history ADD COLUMN identical_data TEXT")
conn.commit()
logger.info("query_history表identical_data字段添加成功")
        # Does query_logs have the history_id column?
cursor.execute("PRAGMA table_info(query_logs)")
logs_columns = cursor.fetchall()
logs_column_names = [column[1] for column in logs_columns]
if 'history_id' not in logs_column_names:
logger.info("添加history_id字段到query_logs表...")
cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER")
            # Index on the foreign key
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)')
conn.commit()
logger.info("query_logs表history_id字段添加成功")
conn.close()
return True
except Exception as e:
logger.error(f"检查数据库表失败: {e}")
return init_database()
def get_db_connection():
"""获取数据库连接"""
conn = sqlite3.connect(DATABASE_PATH)
conn.row_factory = sqlite3.Row
return conn
def normalize_json_string(value):
"""标准化JSON字符串用于比较"""
if not isinstance(value, str):
return value
try:
        # Try to parse as JSON
        json_obj = json.loads(value)
        # Arrays need special handling
        if isinstance(json_obj, list):
            # Normalize and sort the array elements
normalized_array = normalize_json_array(json_obj)
return json.dumps(normalized_array, sort_keys=True, separators=(',', ':'))
        else:
            # Plain object: serialize directly
return json.dumps(json_obj, sort_keys=True, separators=(',', ':'))
except (json.JSONDecodeError, TypeError):
        # Not JSON: return the value unchanged
return value
def normalize_json_array(json_array):
"""标准化JSON数组处理元素顺序问题"""
try:
normalized_elements = []
for element in json_array:
if isinstance(element, dict):
                # Normalize dict elements
normalized_elements.append(json.dumps(element, sort_keys=True, separators=(',', ':')))
elif isinstance(element, str):
                # Strings may themselves be JSON; try to parse them
try:
parsed_element = json.loads(element)
normalized_elements.append(json.dumps(parsed_element, sort_keys=True, separators=(',', ':')))
                except (json.JSONDecodeError, TypeError):
normalized_elements.append(element)
else:
normalized_elements.append(element)
        # Sort the normalized elements for a stable order
        normalized_elements.sort()
        # Parse the elements back into objects
result_array = []
for element in normalized_elements:
if isinstance(element, str):
try:
result_array.append(json.loads(element))
                except (json.JSONDecodeError, TypeError):
result_array.append(element)
else:
result_array.append(element)
return result_array
except Exception as e:
logger.warning(f"数组标准化失败: {e}")
return json_array
def is_json_array_field(value):
"""检查字段是否为JSON数组格式"""
if not isinstance(value, (str, list)):
return False
try:
if isinstance(value, str):
parsed = json.loads(value)
return isinstance(parsed, list)
elif isinstance(value, list):
            # Is it an array of JSON strings?
if len(value) > 0 and isinstance(value[0], str):
try:
json.loads(value[0])
return True
                except (json.JSONDecodeError, TypeError):
return False
return True
    except (json.JSONDecodeError, TypeError):
return False
def compare_array_values(value1, value2):
"""专门用于比较数组类型的值"""
try:
# 处理字符串表示的数组
if isinstance(value1, str) and isinstance(value2, str):
try:
array1 = json.loads(value1)
array2 = json.loads(value2)
if isinstance(array1, list) and isinstance(array2, list):
return compare_json_arrays(array1, array2)
            except (json.JSONDecodeError, TypeError):
pass
        # Native Python lists
elif isinstance(value1, list) and isinstance(value2, list):
return compare_json_arrays(value1, value2)
        # Mixed case: one string-encoded array, one list
elif isinstance(value1, list) and isinstance(value2, str):
try:
array2 = json.loads(value2)
if isinstance(array2, list):
return compare_json_arrays(value1, array2)
            except (json.JSONDecodeError, TypeError):
pass
elif isinstance(value1, str) and isinstance(value2, list):
try:
array1 = json.loads(value1)
if isinstance(array1, list):
return compare_json_arrays(array1, value2)
            except (json.JSONDecodeError, TypeError):
pass
return False
except Exception as e:
logger.warning(f"数组比较失败: {e}")
return False
def compare_json_arrays(array1, array2):
"""比较两个JSON数组忽略元素顺序"""
try:
if len(array1) != len(array2):
return False
        # Normalize both arrays
normalized_array1 = normalize_json_array(array1.copy())
normalized_array2 = normalize_json_array(array2.copy())
        # Serialize the normalized arrays into a directly comparable form
comparable1 = json.dumps(normalized_array1, sort_keys=True)
comparable2 = json.dumps(normalized_array2, sort_keys=True)
return comparable1 == comparable2
except Exception as e:
logger.warning(f"JSON数组比较失败: {e}")
return False
def format_json_for_display(value):
"""格式化JSON用于显示"""
if not isinstance(value, str):
return str(value)
try:
        # Try to parse as JSON
        json_obj = json.loads(value)
        # Pretty-print with indentation
return json.dumps(json_obj, sort_keys=True, indent=2, ensure_ascii=False)
except (json.JSONDecodeError, TypeError):
        # Not JSON: return as a plain string
return str(value)
def is_json_field(value):
"""检查字段是否为JSON格式"""
if not isinstance(value, str):
return False
try:
json.loads(value)
return True
except (json.JSONDecodeError, TypeError):
return False
def compare_values(value1, value2):
"""智能比较两个值支持JSON标准化和数组比较"""
# 首先检查是否为数组类型
if is_json_array_field(value1) or is_json_array_field(value2):
return compare_array_values(value1, value2)
    # Both strings: compare their normalized JSON forms
if isinstance(value1, str) and isinstance(value2, str):
normalized_value1 = normalize_json_string(value1)
normalized_value2 = normalize_json_string(value2)
return normalized_value1 == normalized_value2
    # Everything else: direct comparison
return value1 == value2
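# Illustrative sketch (added for clarity, not in the original file): what the
# smart comparison tolerates. The JSON snippets below are made up.
#
#     compare_values('{"a": 1, "b": 2}', '{"b": 2, "a": 1}')          # True: key order ignored
#     compare_values('[{"x": 1}, {"y": 2}]', '[{"y": 2}, {"x": 1}]')  # True: element order ignored
#     compare_values('{"a": 1}', '{"a": 2}')                          # False: values differ
#     compare_values(7, 7)                                            # True: non-strings compare directly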
# Default config (contains no sensitive information)
DEFAULT_CONFIG = {
'pro_config': {
'cluster_name': '',
'hosts': [],
'port': 9042,
'datacenter': '',
'username': '',
'password': '',
'keyspace': '',
'table': ''
},
'test_config': {
'cluster_name': '',
'hosts': [],
'port': 9042,
'datacenter': '',
'username': '',
'password': '',
'keyspace': '',
'table': ''
},
'keys': [],
'fields_to_compare': [],
'exclude_fields': []
}
def save_config_group(name, description, pro_config, test_config, query_config, sharding_config=None):
"""保存配置组"""
if not ensure_database():
logger.error("数据库初始化失败")
return False
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
INSERT OR REPLACE INTO config_groups
(name, description, pro_config, test_config, query_config, sharding_config, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
name, description,
json.dumps(pro_config),
json.dumps(test_config),
json.dumps(query_config),
json.dumps(sharding_config) if sharding_config else None,
datetime.now().isoformat()
))
conn.commit()
logger.info(f"配置组 '{name}' 保存成功,包含分表配置: {sharding_config is not None}")
return True
except Exception as e:
logger.error(f"保存配置组失败: {e}")
return False
finally:
conn.close()
def get_config_groups():
"""获取所有配置组"""
if not ensure_database():
logger.error("数据库初始化失败")
return []
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT id, name, description, created_at, updated_at
FROM config_groups
ORDER BY updated_at DESC
''')
rows = cursor.fetchall()
config_groups = []
for row in rows:
config_groups.append({
'id': row['id'],
'name': row['name'],
'description': row['description'],
'created_at': row['created_at'],
'updated_at': row['updated_at']
})
return config_groups
except Exception as e:
logger.error(f"获取配置组失败: {e}")
return []
finally:
conn.close()
def get_config_group_by_id(group_id):
"""根据ID获取配置组详情"""
if not ensure_database():
logger.error("数据库初始化失败")
return None
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT id, name, description, pro_config, test_config, query_config,
sharding_config, created_at, updated_at
FROM config_groups WHERE id = ?
''', (group_id,))
row = cursor.fetchone()
if row:
config = {
'id': row['id'],
'name': row['name'],
'description': row['description'],
'pro_config': json.loads(row['pro_config']),
'test_config': json.loads(row['test_config']),
'query_config': json.loads(row['query_config']),
'created_at': row['created_at'],
'updated_at': row['updated_at']
}
            # Attach the sharding config
if row['sharding_config']:
try:
config['sharding_config'] = json.loads(row['sharding_config'])
except (json.JSONDecodeError, TypeError):
config['sharding_config'] = None
else:
config['sharding_config'] = None
return config
return None
except Exception as e:
logger.error(f"获取配置组详情失败: {e}")
return None
finally:
conn.close()
def delete_config_group(group_id):
"""删除配置组"""
if not ensure_database():
logger.error("数据库初始化失败")
return False
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('DELETE FROM config_groups WHERE id = ?', (group_id,))
conn.commit()
success = cursor.rowcount > 0
if success:
logger.info(f"配置组ID {group_id} 删除成功")
return success
except Exception as e:
logger.error(f"删除配置组失败: {e}")
return False
finally:
conn.close()
def save_query_history(name, description, pro_config, test_config, query_config, query_keys,
results_summary, execution_time, total_keys, differences_count, identical_count,
sharding_config=None, query_type='single', raw_results=None, differences_data=None, identical_data=None):
"""保存查询历史记录支持分表查询和查询结果数据返回历史记录ID"""
if not ensure_database():
logger.error("数据库初始化失败")
return None
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO query_history
(name, description, pro_config, test_config, query_config, query_keys,
results_summary, execution_time, total_keys, differences_count, identical_count,
sharding_config, query_type, raw_results, differences_data, identical_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, description,
json.dumps(pro_config),
json.dumps(test_config),
json.dumps(query_config),
json.dumps(query_keys),
json.dumps(results_summary),
execution_time,
total_keys,
differences_count,
identical_count,
json.dumps(sharding_config) if sharding_config else None,
query_type,
json.dumps(raw_results) if raw_results else None,
json.dumps(differences_data) if differences_data else None,
json.dumps(identical_data) if identical_data else None
))
        # ID of the inserted row
history_id = cursor.lastrowid
conn.commit()
logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}ID{history_id}")
return history_id
except Exception as e:
logger.error(f"保存查询历史记录失败: {e}")
return None
finally:
conn.close()
def get_query_history():
"""获取所有查询历史记录"""
if not ensure_database():
logger.error("数据库初始化失败")
return []
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT id, name, description, execution_time, total_keys,
differences_count, identical_count, created_at, query_type
FROM query_history
ORDER BY created_at DESC
''')
rows = cursor.fetchall()
        history_list = []
        # Column names let us detect fields that may be missing in older databases
        column_names = [desc[0] for desc in cursor.description]
        for row in rows:
history_list.append({
'id': row['id'],
'name': row['name'],
'description': row['description'],
'execution_time': row['execution_time'],
'total_keys': row['total_keys'],
'differences_count': row['differences_count'],
'identical_count': row['identical_count'],
'created_at': row['created_at'],
'query_type': row['query_type'] if 'query_type' in column_names else 'single'
})
return history_list
except Exception as e:
logger.error(f"获取查询历史记录失败: {e}")
return []
finally:
conn.close()
def get_query_history_by_id(history_id):
"""根据ID获取查询历史记录详情"""
if not ensure_database():
logger.error("数据库初始化失败")
return None
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM query_history WHERE id = ?
''', (history_id,))
row = cursor.fetchone()
if row:
            # Column names let us detect fields that may be missing in older databases
            column_names = [desc[0] for desc in cursor.description]
return {
'id': row['id'],
'name': row['name'],
'description': row['description'],
'pro_config': json.loads(row['pro_config']),
'test_config': json.loads(row['test_config']),
'query_config': json.loads(row['query_config']),
'query_keys': json.loads(row['query_keys']),
'results_summary': json.loads(row['results_summary']),
'execution_time': row['execution_time'],
'total_keys': row['total_keys'],
'differences_count': row['differences_count'],
'identical_count': row['identical_count'],
'created_at': row['created_at'],
                # Newer fields, kept backward compatible
'sharding_config': json.loads(row['sharding_config']) if 'sharding_config' in column_names and row['sharding_config'] else None,
'query_type': row['query_type'] if 'query_type' in column_names else 'single',
                # Stored query result data
'raw_results': json.loads(row['raw_results']) if 'raw_results' in column_names and row['raw_results'] else None,
'differences_data': json.loads(row['differences_data']) if 'differences_data' in column_names and row['differences_data'] else None,
'identical_data': json.loads(row['identical_data']) if 'identical_data' in column_names and row['identical_data'] else None
}
return None
except Exception as e:
logger.error(f"获取查询历史记录详情失败: {e}")
return None
finally:
conn.close()
def delete_query_history(history_id):
"""删除查询历史记录"""
if not ensure_database():
logger.error("数据库初始化失败")
return False
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('DELETE FROM query_history WHERE id = ?', (history_id,))
conn.commit()
success = cursor.rowcount > 0
if success:
logger.info(f"查询历史记录ID {history_id} 删除成功")
return success
except Exception as e:
logger.error(f"删除查询历史记录失败: {e}")
return False
finally:
conn.close()
def create_connection(config):
"""创建Cassandra连接带有增强的错误诊断和容错机制"""
start_time = time.time()
logger.info(f"=== 开始创建Cassandra连接 ===")
logger.info(f"主机列表: {config.get('hosts', [])}")
logger.info(f"端口: {config.get('port', 9042)}")
logger.info(f"用户名: {config.get('username', 'N/A')}")
logger.info(f"Keyspace: {config.get('keyspace', 'N/A')}")
try:
logger.info("正在创建认证提供者...")
auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
logger.info("正在创建集群连接...")
        # Connection-pool settings for better fault tolerance
        from cassandra.policies import DCAwareRoundRobinPolicy
        # DC-aware load balancing avoids a single point of failure
        load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1'))
        # Build the cluster with extra fault-tolerance parameters
cluster = Cluster(
config['hosts'],
port=config['port'],
auth_provider=auth_provider,
load_balancing_policy=load_balancing_policy,
            # Fault-tolerance settings
            protocol_version=4,             # pin a stable protocol version
            connect_timeout=15,             # connection timeout (seconds)
            control_connection_timeout=15,  # control-connection timeout (seconds)
            max_schema_agreement_wait=30    # max wait for schema agreement (seconds)
)
logger.info("正在连接到Keyspace...")
session = cluster.connect(config['keyspace'])
        # Session-level fault-tolerance settings
        session.default_timeout = 30  # query timeout (seconds)
connection_time = time.time() - start_time
logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}")
        # Record the cluster state
try:
cluster_name = cluster.metadata.cluster_name or "Unknown"
logger.info(f" 集群名称: {cluster_name}")
            # Log which hosts are up and which are down
live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up]
down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up]
logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)")
if down_hosts:
logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)")
except Exception as meta_error:
logger.warning(f"无法获取集群元数据: {meta_error}")
return cluster, session
except Exception as e:
connection_time = time.time() - start_time
error_msg = str(e)
logger.error(f"❌ Cassandra连接失败: 连接时间={connection_time:.3f}")
logger.error(f"错误类型: {type(e).__name__}")
logger.error(f"错误详情: {error_msg}")
        # Detailed diagnostics, keyed on the error text
if "connection refused" in error_msg.lower() or "unable to connect" in error_msg.lower():
logger.error("❌ 诊断无法连接到Cassandra服务器")
logger.error("🔧 建议检查:")
logger.error(" 1. Cassandra服务是否启动")
logger.error(" 2. 主机地址和端口是否正确")
logger.error(" 3. 网络防火墙是否阻挡连接")
elif "timeout" in error_msg.lower():
logger.error("❌ 诊断:连接超时")
logger.error("🔧 建议检查:")
logger.error(" 1. 网络延迟是否过高")
logger.error(" 2. Cassandra服务器负载是否过高")
logger.error(" 3. 增加连接超时时间")
elif "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower():
logger.error("❌ 诊断:认证失败")
logger.error("🔧 建议检查:")
logger.error(" 1. 用户名和密码是否正确")
logger.error(" 2. 用户是否有访问该keyspace的权限")
elif "keyspace" in error_msg.lower():
logger.error("❌ 诊断Keyspace不存在")
logger.error("🔧 建议检查:")
logger.error(" 1. Keyspace名称是否正确")
logger.error(" 2. Keyspace是否已创建")
else:
logger.error("❌ 诊断:未知连接错误")
logger.error("🔧 建议:")
logger.error(" 1. 检查所有连接参数")
logger.error(" 2. 查看Cassandra服务器日志")
logger.error(" 3. 测试网络连通性")
return None, None
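# Illustrative sketch (added for clarity, not in the original file): the config
# dict shape create_connection() expects. Hosts and credentials are made up.
#
#     example_config = {
#         'cluster_name': 'demo',            # informational only
#         'hosts': ['10.0.0.1', '10.0.0.2'],
#         'port': 9042,
#         'datacenter': 'dc1',               # used for DC-aware load balancing
#         'username': 'cassandra',
#         'password': 'cassandra',
#         'keyspace': 'my_keyspace',
#         'table': 'my_table',               # consumed by the query helpers below
#     }
#     cluster, session = create_connection(example_config)
#     # returns (None, None) on failure; call cluster.shutdown() when done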
def execute_query(session, table, keys, fields, values, exclude_fields=None):
"""执行查询,支持单主键和复合主键"""
try:
        # Validate parameters
if not keys or len(keys) == 0:
logger.error("Keys参数为空无法构建查询")
return []
if not values or len(values) == 0:
logger.error("Values参数为空无法构建查询")
return []
        # Build the WHERE clause. Note: values are interpolated directly into the
        # CQL text, so values containing single quotes would break the statement.
        if len(keys) == 1:
            # Single primary key (original behaviour)
quoted_values = [f"'{value}'" for value in values]
query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
else:
            # Composite primary key
conditions = []
for value in values:
                # Does the value embed the composite-key separator?
if isinstance(value, str) and ',' in value:
                    # Split the composite key value
key_values = [v.strip() for v in value.split(',')]
if len(key_values) == len(keys):
                        # One composite condition: (key1='val1' AND key2='val2')
key_conditions = []
for i, (key, val) in enumerate(zip(keys, key_values)):
key_conditions.append(f"{key} = '{val}'")
conditions.append(f"({' AND '.join(key_conditions)})")
else:
logger.warning(f"复合主键值 '{value}' 的字段数量({len(key_values)})与主键字段数量({len(keys)})不匹配")
                        # Fall back to treating it as the first key's value
conditions.append(f"{keys[0]} = '{value}'")
else:
                    # Single value: treat it as the first key's value
conditions.append(f"{keys[0]} = '{value}'")
if conditions:
query_conditions = ' OR '.join(conditions)
else:
logger.error("无法构建有效的查询条件")
return []
        # Decide which fields to select
if fields:
fields_str = ", ".join(fields)
else:
fields_str = "*"
query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};"
        # Log the generated CQL
logger.info(f"执行查询SQL: {query_sql}")
if len(keys) > 1:
logger.info(f"复合主键查询参数: 表={table}, 主键字段={keys}, 字段={fields_str}, Key数量={len(values)}")
else:
logger.info(f"单主键查询参数: 表={table}, 主键字段={keys[0]}, 字段={fields_str}, Key数量={len(values)}")
        # Execute
start_time = time.time()
result = session.execute(query_sql)
execution_time = time.time() - start_time
result_list = list(result) if result else []
logger.info(f"查询完成: 执行时间={execution_time:.3f}秒, 返回记录数={len(result_list)}")
return result_list
except Exception as e:
logger.error(f"查询执行失败: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, 错误={str(e)}")
return []
def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None):
"""
执行分表查询
:param session: Cassandra会话
:param shard_mapping: 分表映射 {table_name: [keys]}
:param keys: 主键字段名列表
:param fields: 要查询的字段列表
:param exclude_fields: 要排除的字段列表
:return: (查询结果列表, 查询到的表列表, 查询失败的表列表)
"""
all_results = []
queried_tables = []
error_tables = []
logger.info(f"开始执行分表查询,涉及 {len(shard_mapping)} 张分表")
total_start_time = time.time()
for table_name, table_keys in shard_mapping.items():
try:
logger.info(f"查询分表 {table_name},包含 {len(table_keys)} 个key: {table_keys}")
            # Run the query against this shard table
table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields)
all_results.extend(table_results)
queried_tables.append(table_name)
logger.info(f"分表 {table_name} 查询成功,返回 {len(table_results)} 条记录")
except Exception as e:
logger.error(f"分表 {table_name} 查询失败: {e}")
error_tables.append(table_name)
total_execution_time = time.time() - total_start_time
logger.info(f"分表查询总计完成: 执行时间={total_execution_time:.3f}秒, 成功表数={len(queried_tables)}, 失败表数={len(error_tables)}, 总记录数={len(all_results)}")
return all_results, queried_tables, error_tables
def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config):
"""
执行混合查询(生产环境分表,测试环境可能单表或分表)
"""
results = {
'pro_data': [],
'test_data': [],
'sharding_info': {
'calculation_stats': {}
}
}
    # Production-side query
    if sharding_config.get('use_sharding_for_pro', False):
        # Resolve the sharding parameters: prefer the pro-specific values, fall back to the generic ones
pro_interval = sharding_config.get('pro_interval_seconds') or sharding_config.get('interval_seconds', 604800)
pro_table_count = sharding_config.get('pro_table_count') or sharding_config.get('table_count', 14)
        # Log the production sharding configuration
logger.info(f"=== 生产环境分表配置 ===")
logger.info(f"启用分表查询: True")
logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)")
logger.info(f"分表数量: {pro_table_count}")
logger.info(f"基础表名: {pro_config['table']}")
pro_calculator = ShardingCalculator(
interval_seconds=pro_interval,
table_count=pro_table_count
)
pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys(
pro_config['table'], values
)
logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}")
pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query(
pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields
)
results['pro_data'] = pro_data
        results['sharding_info']['pro_shards'] = {
            'enabled': True,
            'interval_seconds': pro_interval,   # report the resolved values, not the raw request fields
            'table_count': pro_table_count,
'queried_tables': pro_queried_tables,
'error_tables': pro_error_tables,
'failed_keys': pro_failed_keys
}
results['sharding_info']['calculation_stats'].update(pro_calc_stats)
else:
        # Production single-table query
logger.info(f"=== 生产环境单表配置 ===")
logger.info(f"启用分表查询: False")
logger.info(f"表名: {pro_config['table']}")
pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
results['pro_data'] = pro_data
results['sharding_info']['pro_shards'] = {
'enabled': False,
'queried_tables': [pro_config['table']]
}
    # Test-side query
    if sharding_config.get('use_sharding_for_test', False):
        # Resolve the sharding parameters: prefer the test-specific values, fall back to the generic ones
test_interval = sharding_config.get('test_interval_seconds') or sharding_config.get('interval_seconds', 604800)
test_table_count = sharding_config.get('test_table_count') or sharding_config.get('table_count', 14)
        # Log the test sharding configuration
logger.info(f"=== 测试环境分表配置 ===")
logger.info(f"启用分表查询: True")
logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)")
logger.info(f"分表数量: {test_table_count}")
logger.info(f"基础表名: {test_config['table']}")
test_calculator = ShardingCalculator(
interval_seconds=test_interval,
table_count=test_table_count
)
test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys(
test_config['table'], values
)
logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}")
test_data, test_queried_tables, test_error_tables = execute_sharding_query(
test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields
)
results['test_data'] = test_data
results['sharding_info']['test_shards'] = {
'enabled': True,
'interval_seconds': test_interval,
'table_count': test_table_count,
'queried_tables': test_queried_tables,
'error_tables': test_error_tables,
'failed_keys': test_failed_keys
}
        # Merge the calculation stats
if not results['sharding_info']['calculation_stats']:
results['sharding_info']['calculation_stats'] = test_calc_stats
else:
        # Test single-table query
logger.info(f"=== 测试环境单表配置 ===")
logger.info(f"启用分表查询: False")
logger.info(f"表名: {test_config['table']}")
test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
results['test_data'] = test_data
results['sharding_info']['test_shards'] = {
'enabled': False,
'queried_tables': [test_config['table']]
}
return results
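# Illustrative sketch (added for clarity, not in the original file): a
# sharding_config dict using the keys execute_mixed_query() actually reads.
# All values here are made-up examples.
#
#     example_sharding_config = {
#         'use_sharding_for_pro': True,     # production reads from shard tables
#         'use_sharding_for_test': False,   # test reads a single table
#         'pro_interval_seconds': 604800,   # pro-specific; falls back to interval_seconds
#         'pro_table_count': 14,            # pro-specific; falls back to table_count
#         'test_interval_seconds': 604800,  # test-specific equivalents
#         'test_table_count': 14,
#         'interval_seconds': 604800,       # generic fallbacks
#         'table_count': 14,
#     }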
def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values):
"""比较查询结果,支持复合主键"""
differences = []
field_diff_count = {}
identical_results = [] # 存储相同的结果
def match_composite_key(row, composite_value, keys):
"""检查数据行是否匹配复合主键值"""
if len(keys) == 1:
# 单主键匹配
return getattr(row, keys[0]) == composite_value
else:
# 复合主键匹配
if isinstance(composite_value, str) and ',' in composite_value:
key_values = [v.strip() for v in composite_value.split(',')]
if len(key_values) == len(keys):
return all(str(getattr(row, key)) == key_val for key, key_val in zip(keys, key_values))
# 如果不是复合值,只匹配第一个主键
return getattr(row, keys[0]) == composite_value
for value in values:
        # Collect the rows for this key value from both result sets
rows_pro = [row for row in pro_data if match_composite_key(row, value, keys)]
rows_test = [row for row in test_data if match_composite_key(row, value, keys)]
for row_pro in rows_pro:
            # Find the row with the same primary key in the test set
row_test = next(
(row for row in rows_test if all(getattr(row, key) == getattr(row_pro, key) for key in keys)),
None
)
if row_test:
                # Decide which columns to compare
                columns = fields_to_compare if fields_to_compare else row_pro._fields
                columns = [col for col in columns if col not in exclude_fields]
has_difference = False
row_differences = []
identical_fields = {}
for column in columns:
value_pro = getattr(row_pro, column)
value_test = getattr(row_test, column)
                    # Smart comparison (JSON- and array-aware)
if not compare_values(value_pro, value_test):
has_difference = True
                        # Format the values for display
formatted_pro_value = format_json_for_display(value_pro)
formatted_test_value = format_json_for_display(value_test)
row_differences.append({
'key': {key: getattr(row_pro, key) for key in keys},
'field': column,
'pro_value': formatted_pro_value,
'test_value': formatted_test_value,
'is_json': is_json_field(value_pro) or is_json_field(value_test),
'is_array': is_json_array_field(value_pro) or is_json_array_field(value_test)
})
                        # Count the differences per field
field_diff_count[column] = field_diff_count.get(column, 0) + 1
else:
                        # Remember the identical field values
identical_fields[column] = format_json_for_display(value_pro)
if has_difference:
differences.extend(row_differences)
else:
                    # No differences: record the row as identical
identical_results.append({
'key': {key: getattr(row_pro, key) for key in keys},
'pro_fields': identical_fields,
'test_fields': {col: format_json_for_display(getattr(row_test, col)) for col in columns}
})
else:
                # No matching row in the test table
differences.append({
'key': {key: getattr(row_pro, key) for key in keys},
'message': '在测试表中未找到该行'
})
        # Rows that exist in the test table but not in production
for row_test in rows_test:
row_pro = next(
(row for row in rows_pro if all(getattr(row, key) == getattr(row_test, key) for key in keys)),
None
)
if not row_pro:
differences.append({
'key': {key: getattr(row_test, key) for key in keys},
'message': '在生产表中未找到该行'
})
return differences, field_diff_count, identical_results
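# Illustrative sketch (added for clarity, not in the original file): the rows
# only need attribute access plus a _fields tuple, so namedtuples can stand in
# for Cassandra driver rows. All data below is made up.
#
#     from collections import namedtuple
#     Row = namedtuple('Row', ['id', 'name'])
#     pro = [Row('k1', 'alice'), Row('k2', 'bob')]
#     test = [Row('k1', 'alice'), Row('k2', 'bobby')]
#     diffs, field_counts, identical = compare_results(
#         pro, test, keys=['id'], fields_to_compare=[], exclude_fields=[], values=['k1', 'k2'])
#     # diffs -> one entry for k2's 'name'; identical -> the k1 row; field_counts -> {'name': 1}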
def generate_comparison_summary(total_keys, pro_count, test_count, differences, identical_results, field_diff_count):
"""生成比较总结报告"""
# 计算基本统计
different_records = len(set([list(diff['key'].values())[0] for diff in differences if 'field' in diff]))
identical_records = len(identical_results)
missing_in_test = len([diff for diff in differences if diff.get('message') == '在测试表中未找到该行'])
missing_in_pro = len([diff for diff in differences if diff.get('message') == '在生产表中未找到该行'])
    # Percentages
def safe_percentage(part, total):
return round((part / total * 100), 2) if total > 0 else 0
identical_percentage = safe_percentage(identical_records, total_keys)
different_percentage = safe_percentage(different_records, total_keys)
    # Assemble the summary
summary = {
'overview': {
'total_keys_queried': total_keys,
'pro_records_found': pro_count,
'test_records_found': test_count,
'identical_records': identical_records,
'different_records': different_records,
'missing_in_test': missing_in_test,
'missing_in_pro': missing_in_pro
},
'percentages': {
'data_consistency': identical_percentage,
'data_differences': different_percentage,
'missing_rate': safe_percentage(missing_in_test + missing_in_pro, total_keys)
},
'field_analysis': {
'total_fields_compared': len(field_diff_count) if field_diff_count else 0,
'most_different_fields': sorted(field_diff_count.items(), key=lambda x: x[1], reverse=True)[:5] if field_diff_count else []
},
'data_quality': {
'completeness': safe_percentage(pro_count + test_count, total_keys * 2),
'consistency_score': identical_percentage,
'quality_level': get_quality_level(identical_percentage)
},
'recommendations': generate_recommendations(identical_percentage, missing_in_test, missing_in_pro, field_diff_count)
}
return summary
def get_quality_level(consistency_percentage):
"""根据一致性百分比获取数据质量等级"""
if consistency_percentage >= 95:
return {'level': '优秀', 'color': 'success', 'description': '数据一致性非常高'}
elif consistency_percentage >= 90:
return {'level': '良好', 'color': 'info', 'description': '数据一致性较高'}
elif consistency_percentage >= 80:
return {'level': '一般', 'color': 'warning', 'description': '数据一致性中等,需要关注'}
else:
return {'level': '较差', 'color': 'danger', 'description': '数据一致性较低,需要重点处理'}
def generate_recommendations(consistency_percentage, missing_in_test, missing_in_pro, field_diff_count):
"""生成改进建议"""
recommendations = []
if consistency_percentage < 90:
recommendations.append('建议重点关注数据一致性问题,检查数据同步机制')
if missing_in_test > 0:
recommendations.append(f'测试环境缺失 {missing_in_test} 条记录,建议检查数据迁移过程')
if missing_in_pro > 0:
recommendations.append(f'生产环境缺失 {missing_in_pro} 条记录,建议检查数据完整性')
if field_diff_count:
top_diff_field = max(field_diff_count.items(), key=lambda x: x[1])
recommendations.append(f'字段 "{top_diff_field[0]}" 差异最多({top_diff_field[1]}次),建议优先处理')
if not recommendations:
recommendations.append('数据质量良好,建议继续保持当前的数据管理流程')
return recommendations
@app.route('/')
def index():
return render_template('index.html')
@app.route('/test-config-load')
def test_config_load():
"""配置加载测试页面"""
return send_from_directory('.', 'test_config_load.html')
@app.route('/db-compare')
def db_compare():
return render_template('db_compare.html')
@app.route('/api/sharding-query', methods=['POST'])
def sharding_query_compare():
"""分表查询比对API"""
try:
data = request.json
        # Start a new query batch
batch_id = query_log_collector.start_new_batch('分表')
logger.info("开始执行分表数据库比对查询")
        # Parse the configs
pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
        # keys and friends come from query_config
query_config = data.get('query_config', {})
keys = query_config.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
values = data.get('values', [])
sharding_config = data.get('sharding_config', {})
        # Validate parameters
if not values:
logger.warning("分表查询失败未提供查询key值")
return jsonify({'error': '请提供查询key值'}), 400
if not keys:
logger.warning("分表查询失败:未提供主键字段")
return jsonify({'error': '请提供主键字段'}), 400
        # Detailed parameter logging
logger.info(f"分表查询参数解析结果:")
logger.info(f" keys: {keys}")
logger.info(f" values数量: {len(values)}")
logger.info(f" fields_to_compare: {fields_to_compare}")
logger.info(f" exclude_fields: {exclude_fields}")
logger.info(f" sharding_config原始数据: {sharding_config}")
logger.info(f" sharding_config具体参数:")
logger.info(f" use_sharding_for_pro: {sharding_config.get('use_sharding_for_pro')}")
logger.info(f" use_sharding_for_test: {sharding_config.get('use_sharding_for_test')}")
logger.info(f" pro_interval_seconds: {sharding_config.get('pro_interval_seconds')}")
logger.info(f" pro_table_count: {sharding_config.get('pro_table_count')}")
logger.info(f" test_interval_seconds: {sharding_config.get('test_interval_seconds')}")
logger.info(f" test_table_count: {sharding_config.get('test_table_count')}")
logger.info(f" interval_seconds: {sharding_config.get('interval_seconds')}")
logger.info(f" table_count: {sharding_config.get('table_count')}")
logger.info(f"分表查询配置:{len(values)}个key值生产表{pro_config['table']},测试表:{test_config['table']}")
        # Open the database connections
pro_cluster, pro_session = create_connection(pro_config)
test_cluster, test_session = create_connection(test_config)
        if not pro_session or not test_session:
            logger.error("数据库连接失败")
            # Shut down whichever cluster did connect so we don't leak connections
            if pro_cluster:
                pro_cluster.shutdown()
            if test_cluster:
                test_cluster.shutdown()
            return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500
try:
            # Mixed query: production may be sharded; test may be single-table or sharded
logger.info("执行分表混合查询")
query_results = execute_mixed_query(
pro_session, test_session, pro_config, test_config,
keys, fields_to_compare, values, exclude_fields, sharding_config
)
pro_data = query_results['pro_data']
test_data = query_results['test_data']
sharding_info = query_results['sharding_info']
logger.info(f"分表查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录")
            # Compare the results
differences, field_diff_count, identical_results = compare_results(
pro_data, test_data, keys, fields_to_compare, exclude_fields, values
)
            # Statistics
different_ids = set()
for diff in differences:
if 'field' in diff:
different_ids.add(list(diff['key'].values())[0])
non_different_ids = set(values) - different_ids
            # Build the comparison summary
summary = generate_comparison_summary(
len(values), len(pro_data), len(test_data),
differences, identical_results, field_diff_count
)
result = {
'total_keys': len(values),
'pro_count': len(pro_data),
'test_count': len(test_data),
'differences': differences,
'identical_results': identical_results,
'field_diff_count': field_diff_count,
'different_ids': list(different_ids),
'non_different_ids': list(non_different_ids),
'summary': summary,
                'sharding_info': sharding_info,  # sharding details for this query
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
}
logger.info(f"分表比对完成:发现 {len(differences)} 处差异")
            # Auto-save the sharded query to history
            try:
                # Name for the history record
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
history_name = f"分表查询_{timestamp}"
history_description = f"自动保存 - 分表查询{len(values)}个Key发现{len(differences)}处差异"
                # Save the history record
history_id = save_query_history(
name=history_name,
description=history_description,
pro_config=pro_config,
test_config=test_config,
query_config={
'keys': keys,
'fields_to_compare': fields_to_compare,
'exclude_fields': exclude_fields
},
query_keys=values,
results_summary=summary,
                    execution_time=0.0,  # TODO: measure the real execution time
total_keys=len(values),
differences_count=len(differences),
identical_count=len(identical_results),
                    # Sharding-specific parameters
sharding_config=sharding_config,
query_type='sharding',
                    # Query result data
raw_results={
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [],
                        'sharding_info': sharding_info  # include the sharding details
},
differences_data=differences,
identical_data=identical_results
)
                # Link this batch's logs to the history record
if history_id:
query_log_collector.set_history_id(history_id)
logger.info(f"分表查询历史记录保存成功: {history_name}, ID: {history_id}")
else:
logger.warning("分表查询历史记录保存失败无法获取history_id")
except Exception as e:
logger.warning(f"保存分表查询历史记录失败: {e}")
            # End the query batch
query_log_collector.end_current_batch()
return jsonify(result)
except Exception as e:
logger.error(f"分表查询执行失败:{str(e)}")
            # End the query batch (error path)
query_log_collector.end_current_batch()
return jsonify({'error': f'分表查询执行失败:{str(e)}'}), 500
finally:
            # Close the connections
if pro_cluster:
pro_cluster.shutdown()
if test_cluster:
test_cluster.shutdown()
except Exception as e:
logger.error(f"分表查询请求处理失败:{str(e)}")
        # End the query batch (request-handling error)
query_log_collector.end_current_batch()
return jsonify({'error': f'分表查询请求处理失败:{str(e)}'}), 500
@app.route('/api/query', methods=['POST'])
def query_compare():
try:
data = request.json
        # Start a new query batch
batch_id = query_log_collector.start_new_batch('单表')
logger.info("开始执行数据库比对查询")
        # Parse the configs
pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
        # keys and friends come from query_config
query_config = data.get('query_config', {})
keys = query_config.get('keys', DEFAULT_CONFIG['keys'])
fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
values = data.get('values', [])
        # Validate parameters
if not values:
logger.warning("查询失败未提供查询key值")
return jsonify({'error': '请提供查询key值'}), 400
if not keys:
logger.warning("查询失败:未提供主键字段")
return jsonify({'error': '请提供主键字段'}), 400
        # Detailed parameter logging
logger.info(f"单表查询参数解析结果:")
logger.info(f" keys: {keys}")
logger.info(f" values数量: {len(values)}")
logger.info(f" fields_to_compare: {fields_to_compare}")
logger.info(f" exclude_fields: {exclude_fields}")
logger.info(f"查询配置:{len(values)}个key值生产表{pro_config['table']},测试表:{test_config['table']}")
        # Open the database connections
pro_cluster, pro_session = create_connection(pro_config)
test_cluster, test_session = create_connection(test_config)
        if not pro_session or not test_session:
            logger.error("数据库连接失败")
            # Shut down whichever cluster did connect so we don't leak connections
            if pro_cluster:
                pro_cluster.shutdown()
            if test_cluster:
                test_cluster.shutdown()
            return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500
try:
            # Run the queries
logger.info("执行生产环境查询")
pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
logger.info("执行测试环境查询")
test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
logger.info(f"查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录")
            # Compare the results
differences, field_diff_count, identical_results = compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values)
            # Statistics
different_ids = set()
for diff in differences:
if 'field' in diff:
different_ids.add(list(diff['key'].values())[0])
non_different_ids = set(values) - different_ids
            # Build the comparison summary
summary = generate_comparison_summary(
len(values), len(pro_data), len(test_data),
differences, identical_results, field_diff_count
)
result = {
'total_keys': len(values),
'pro_count': len(pro_data),
'test_count': len(test_data),
'differences': differences,
'identical_results': identical_results,
'field_diff_count': field_diff_count,
'different_ids': list(different_ids),
'non_different_ids': list(non_different_ids),
'summary': summary,
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
}
logger.info(f"比对完成:发现 {len(differences)} 处差异")
            # Auto-save the query to history (based on the execution result)
            try:
                # Name for the history record
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
history_name = f"查询_{timestamp}"
history_description = f"自动保存 - 查询{len(values)}个Key发现{len(differences)}处差异"
                # Save the history record
history_id = save_query_history(
name=history_name,
description=history_description,
pro_config=pro_config,
test_config=test_config,
query_config={
'keys': keys,
'fields_to_compare': fields_to_compare,
'exclude_fields': exclude_fields
},
query_keys=values,
results_summary=summary,
                    execution_time=0.0,  # TODO: measure the real execution time
total_keys=len(values),
differences_count=len(differences),
identical_count=len(identical_results),
                    # Query result data
raw_results={
'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
},
differences_data=differences,
identical_data=identical_results
)
                # Link this batch's logs to the history record
if history_id:
query_log_collector.set_history_id(history_id)
logger.info(f"查询历史记录保存成功: {history_name}, ID: {history_id}")
else:
logger.warning("查询历史记录保存失败无法获取history_id")
except Exception as e:
logger.warning(f"保存查询历史记录失败: {e}")
            # End the query batch
query_log_collector.end_current_batch()
return jsonify(result)
except Exception as e:
logger.error(f"查询执行失败:{str(e)}")
            # End the query batch (error path)
query_log_collector.end_current_batch()
return jsonify({'error': f'查询执行失败:{str(e)}'}), 500
finally:
            # Close the connections
if pro_cluster:
pro_cluster.shutdown()
if test_cluster:
test_cluster.shutdown()
except Exception as e:
logger.error(f"请求处理失败:{str(e)}")
        # End the query batch (request-handling error)
query_log_collector.end_current_batch()
return jsonify({'error': f'请求处理失败:{str(e)}'}), 500
@app.route('/api/default-config')
def get_default_config():
return jsonify(DEFAULT_CONFIG)
# Config group management API
@app.route('/api/config-groups', methods=['GET'])
def api_get_config_groups():
"""获取所有配置组"""
config_groups = get_config_groups()
return jsonify({'success': True, 'data': config_groups})
@app.route('/api/config-groups', methods=['POST'])
def api_save_config_group():
"""保存配置组"""
try:
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
pro_config = data.get('pro_config', {})
test_config = data.get('test_config', {})
        # The query config is accepted in two shapes
        if 'query_config' in data:
            # Nested shape
            query_config = data.get('query_config', {})
        else:
            # Flat shape
            query_config = {
'keys': data.get('keys', []),
'fields_to_compare': data.get('fields_to_compare', []),
'exclude_fields': data.get('exclude_fields', [])
}
        # Extract the sharding config
sharding_config = data.get('sharding_config')
if not name:
return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400
success = save_config_group(name, description, pro_config, test_config, query_config, sharding_config)
if success:
return jsonify({'success': True, 'message': '配置组保存成功'})
else:
return jsonify({'success': False, 'error': '配置组保存失败'}), 500
except Exception as e:
logger.error(f"保存配置组API失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/config-groups/<int:group_id>', methods=['GET'])
def api_get_config_group(group_id):
"""获取指定配置组详情"""
config_group = get_config_group_by_id(group_id)
if config_group:
return jsonify({'success': True, 'data': config_group})
else:
return jsonify({'success': False, 'error': '配置组不存在'}), 404
@app.route('/api/config-groups/<int:group_id>', methods=['DELETE'])
def api_delete_config_group(group_id):
"""删除配置组"""
success = delete_config_group(group_id)
if success:
return jsonify({'success': True, 'message': '配置组删除成功'})
else:
return jsonify({'success': False, 'error': '配置组删除失败'}), 500
@app.route('/api/init-db', methods=['POST'])
def api_init_database():
"""手动初始化数据库(用于测试)"""
success = init_database()
if success:
return jsonify({'success': True, 'message': '数据库初始化成功'})
else:
return jsonify({'success': False, 'error': '数据库初始化失败'}), 500
# Query history management API
@app.route('/api/query-history', methods=['GET'])
def api_get_query_history():
"""获取所有查询历史记录"""
history_list = get_query_history()
return jsonify({'success': True, 'data': history_list})
@app.route('/api/query-history', methods=['POST'])
def api_save_query_history():
"""保存查询历史记录,支持分表查询"""
try:
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
pro_config = data.get('pro_config', {})
test_config = data.get('test_config', {})
query_config = data.get('query_config', {})
query_keys = data.get('query_keys', [])
results_summary = data.get('results_summary', {})
execution_time = data.get('execution_time', 0.0)
total_keys = data.get('total_keys', 0)
differences_count = data.get('differences_count', 0)
identical_count = data.get('identical_count', 0)
        # Sharding-related fields
sharding_config = data.get('sharding_config')
query_type = data.get('query_type', 'single')
if not name:
return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400
success = save_query_history(
name, description, pro_config, test_config, query_config,
query_keys, results_summary, execution_time, total_keys,
differences_count, identical_count, sharding_config, query_type
)
if success:
query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询'
return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'})
else:
return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500
except Exception as e:
logger.error(f"保存查询历史记录API失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-history/<int:history_id>', methods=['GET'])
def api_get_query_history_detail(history_id):
"""获取指定查询历史记录详情"""
history_record = get_query_history_by_id(history_id)
if history_record:
return jsonify({'success': True, 'data': history_record})
else:
return jsonify({'success': False, 'error': '查询历史记录不存在'}), 404
@app.route('/api/query-history/<int:history_id>/results', methods=['GET'])
def api_get_query_history_results(history_id):
"""获取查询历史记录的完整结果数据"""
try:
history_record = get_query_history_by_id(history_id)
if not history_record:
return jsonify({'success': False, 'error': '历史记录不存在'}), 404
        # Safely read raw_results
raw_results = history_record.get('raw_results')
if raw_results and isinstance(raw_results, dict):
raw_pro_data = raw_results.get('raw_pro_data', []) or []
raw_test_data = raw_results.get('raw_test_data', []) or []
sharding_info = raw_results.get('sharding_info') if history_record.get('query_type') == 'sharding' else None
else:
raw_pro_data = []
raw_test_data = []
sharding_info = None
        # Safely read the differences / identical-rows data
differences_data = history_record.get('differences_data') or []
identical_data = history_record.get('identical_data') or []
        # Rebuild the full result payload in the same shape the live query API returns
result = {
'total_keys': history_record['total_keys'],
'pro_count': len(raw_pro_data),
'test_count': len(raw_test_data),
'differences': differences_data,
'identical_results': identical_data,
            'field_diff_count': {},  # recomputed from differences_data below
'summary': history_record.get('results_summary', {}),
'raw_pro_data': raw_pro_data,
'raw_test_data': raw_test_data,
            # Sharding info, if this was a sharded query
'sharding_info': sharding_info,
            # History record metadata
'history_info': {
'id': history_record['id'],
'name': history_record['name'],
'description': history_record['description'],
'created_at': history_record['created_at'],
'query_type': history_record.get('query_type', 'single')
}
}
        # Recompute field_diff_count
if differences_data:
field_diff_count = {}
for diff in differences_data:
if isinstance(diff, dict) and 'field' in diff:
field_name = diff['field']
field_diff_count[field_name] = field_diff_count.get(field_name, 0) + 1
result['field_diff_count'] = field_diff_count
return jsonify({
'success': True,
'data': result,
'message': f'历史记录 "{history_record["name"]}" 结果加载成功'
})
except Exception as e:
logger.error(f"获取查询历史记录结果失败: {e}")
return jsonify({'success': False, 'error': f'获取历史记录结果失败: {str(e)}'}), 500
@app.route('/api/query-history/<int:history_id>', methods=['DELETE'])
def api_delete_query_history(history_id):
"""删除查询历史记录"""
success = delete_query_history(history_id)
if success:
return jsonify({'success': True, 'message': '查询历史记录删除成功'})
else:
return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500
@app.route('/api/query-logs', methods=['GET'])
def api_get_query_logs():
"""获取查询日志,支持分组显示和数据库存储"""
try:
limit = request.args.get('limit', type=int)
        grouped = request.args.get('grouped', 'true').lower() == 'true'  # grouped display by default
        from_db = request.args.get('from_db', 'true').lower() == 'true'  # read from the database by default
if grouped:
            # Grouped logs
grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db)
            # Total count (for statistics)
total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs)
return jsonify({
'success': True,
'data': grouped_logs,
'total': total_logs,
'grouped': True,
'from_db': from_db
})
else:
            # Flat log list
logs = query_log_collector.get_logs(limit, from_db)
total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs)
return jsonify({
'success': True,
'data': logs,
'total': total_logs,
'grouped': False,
'from_db': from_db
})
except Exception as e:
logger.error(f"获取查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs', methods=['DELETE'])
def api_clear_query_logs():
"""清空查询日志,支持清空数据库日志"""
try:
        clear_db = request.args.get('clear_db', 'true').lower() == 'true'  # clear the database by default
query_log_collector.clear_logs(clear_db)
message = '查询日志已清空(包括数据库)' if clear_db else '查询日志已清空(仅内存)'
return jsonify({'success': True, 'message': message})
except Exception as e:
logger.error(f"清空查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs/cleanup', methods=['POST'])
def api_cleanup_old_logs():
"""清理旧的查询日志"""
try:
days_to_keep = request.json.get('days_to_keep', 30) if request.json else 30
deleted_count = query_log_collector.cleanup_old_logs(days_to_keep)
return jsonify({
'success': True,
'message': f'成功清理 {deleted_count} 条超过 {days_to_keep} 天的旧日志',
'deleted_count': deleted_count
})
except Exception as e:
logger.error(f"清理旧日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs/history/<int:history_id>', methods=['GET'])
def api_get_query_logs_by_history(history_id):
"""根据历史记录ID获取相关查询日志"""
try:
logs = query_log_collector.get_logs_by_history_id(history_id)
        # Group by batch
grouped_logs = {}
batch_order = []
for log in logs:
batch_id = log.get('batch_id', 'unknown')
if batch_id not in grouped_logs:
grouped_logs[batch_id] = []
batch_order.append(batch_id)
grouped_logs[batch_id].append(log)
        # Batches in chronological order
grouped_result = [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]
return jsonify({
'success': True,
'data': grouped_result,
'total': len(logs),
'history_id': history_id,
'grouped': True
})
except Exception as e:
logger.error(f"获取历史记录相关查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
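# Illustrative sketch (added for clarity, not in the original file): a minimal
# request body for POST /api/query, matching the fields the handler reads.
# Hosts, tables, and key values are made up.
#
#     payload = {
#         'pro_config':  {...},   # see DEFAULT_CONFIG for the expected shape
#         'test_config': {...},
#         'query_config': {
#             'keys': ['id'],
#             'fields_to_compare': [],      # empty means compare all columns
#             'exclude_fields': ['updated_at'],
#         },
#         'values': ['k1', 'k2'],
#     }
#     # e.g. requests.post('http://localhost:5001/api/query', json=payload)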
if __name__ == '__main__':
app.run(debug=True, port=5001)