Improve query history records

2025-08-02 23:09:47 +08:00
parent 36915c45ea
commit f674373401
3 changed files with 285 additions and 45 deletions

app.py

@@ -16,18 +16,24 @@ app = Flask(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Database configuration
DATABASE_PATH = 'config_groups.db'
# Query log collector
class QueryLogCollector:
def __init__(self, max_logs=1000):
self.logs = []
def __init__(self, max_logs=1000, db_path=None):
self.logs = []  # In-memory log cache
self.max_logs = max_logs
self.current_batch_id = None
self.batch_counter = 0
self.current_query_type = 'single'
self.db_path = db_path or DATABASE_PATH
def start_new_batch(self, query_type='single'):
"""开始新的查询批次"""
self.batch_counter += 1
self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}"
self.current_query_type = query_type
# Add a batch start marker
self.add_log('INFO', f"=== Starting {query_type} query batch (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
@@ -39,29 +45,114 @@ class QueryLogCollector:
self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
self.current_batch_id = None
def add_log(self, level, message, force_batch_id=None):
def add_log(self, level, message, force_batch_id=None, force_query_type=None):
"""添加日志到内存和数据库"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
batch_id = force_batch_id or self.current_batch_id
query_type = force_query_type or self.current_query_type
log_entry = {
'timestamp': timestamp,
'level': level,
'message': message,
'batch_id': batch_id
'batch_id': batch_id,
'query_type': query_type
}
# Append to the in-memory cache
self.logs.append(log_entry)
# Keep the log count within the limit
if len(self.logs) > self.max_logs:
self.logs.pop(0)
# Persist to the database
self._save_log_to_db(log_entry)
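list.pop(0) is O(n), which is negligible at the 1000-entry cap used here; if the cap ever grows, collections.deque(maxlen=...) gives O(1) eviction. A self-contained sketch of that alternative (not part of this commit):

from collections import deque

logs = deque(maxlen=1000)   # appends beyond 1000 silently evict the oldest entry
for i in range(1500):
    logs.append({'id': i})
assert len(logs) == 1000 and logs[0]['id'] == 500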
def get_logs(self, limit=None):
if limit:
return self.logs[-limit:]
return self.logs
def _save_log_to_db(self, log_entry):
"""将日志保存到数据库"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO query_logs (batch_id, timestamp, level, message, query_type)
VALUES (?, ?, ?, ?, ?)
''', (
log_entry['batch_id'],
log_entry['timestamp'],
log_entry['level'],
log_entry['message'],
log_entry['query_type']
))
conn.commit()
conn.close()
except Exception as e:
# If the database write fails, warn on the console but keep the program running
print(f"Warning: Failed to save log to database: {e}")
def get_logs_grouped_by_batch(self, limit=None):
def get_logs(self, limit=None, from_db=True):
"""获取日志,支持从数据库或内存获取"""
if from_db:
return self._get_logs_from_db(limit)
else:
# Fetch from memory
if limit:
return self.logs[-limit:]
return self.logs
def _get_logs_from_db(self, limit=None):
"""从数据库获取日志"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = '''
SELECT batch_id, timestamp, level, message, query_type
FROM query_logs
ORDER BY id DESC
'''
if limit:
query += ' LIMIT ?'
cursor.execute(query, (limit,))
else:
cursor.execute(query)
rows = cursor.fetchall()
# Convert rows to dicts and reverse back to chronological order (the query returned newest first)
logs = []
for row in reversed(rows):
logs.append({
'batch_id': row['batch_id'],
'timestamp': row['timestamp'],
'level': row['level'],
'message': row['message'],
'query_type': row['query_type']
})
conn.close()
return logs
except Exception as e:
print(f"Warning: Failed to get logs from database: {e}")
# Fall back to the in-memory logs if the database read fails
return self.get_logs(limit, from_db=False)
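Note the limit semantics here: ORDER BY id DESC with LIMIT selects the newest n rows, and reversed() then restores chronological order, so callers always receive oldest-to-newest. A toy illustration:

rows = [5, 4, 3, 2, 1]                           # ids as returned by ORDER BY id DESC LIMIT 5
assert list(reversed(rows)) == [1, 2, 3, 4, 5]   # chronological again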
def _get_total_logs_count(self):
"""获取数据库中的日志总数"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM query_logs')
count = cursor.fetchone()[0]
conn.close()
return count
except Exception as e:
print(f"Warning: Failed to get logs count from database: {e}")
return len(self.logs)
def get_logs_grouped_by_batch(self, limit=None, from_db=True):
"""按批次分组获取日志"""
logs = self.get_logs(limit)
logs = self.get_logs(limit, from_db)
grouped_logs = {}
batch_order = []
@@ -75,10 +166,46 @@ class QueryLogCollector:
# Return the batches in chronological order
return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]
def clear_logs(self):
def clear_logs(self, clear_db=True):
"""清空日志"""
# 清空内存
self.logs.clear()
self.current_batch_id = None
self.batch_counter = 0
# Clear the database
if clear_db:
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('DELETE FROM query_logs')
conn.commit()
conn.close()
except Exception as e:
print(f"Warning: Failed to clear logs from database: {e}")
def cleanup_old_logs(self, days_to_keep=30):
"""清理旧日志,保留指定天数的日志"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
# Delete logs older than the cutoff
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
cursor.execute('''
DELETE FROM query_logs
WHERE created_at < ?
''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
logger.info(f"清理了 {deleted_count} 条超过 {days_to_keep} 天的旧日志")
return deleted_count
except Exception as e:
logger.error(f"清理旧日志失败: {e}")
return 0
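cleanup_old_logs is only invoked on demand via the API added below; if periodic cleanup is wanted, one lightweight option (a sketch, assuming a daily interval is acceptable) is a self-re-arming daemon timer:

import threading

def schedule_log_cleanup(collector, days_to_keep=30, interval_seconds=24 * 3600):
    """Run collector.cleanup_old_logs on a fixed interval in a daemon thread."""
    def _run():
        collector.cleanup_old_logs(days_to_keep)
        schedule_log_cleanup(collector, days_to_keep, interval_seconds)  # re-arm
    timer = threading.Timer(interval_seconds, _run)
    timer.daemon = True   # don't block interpreter shutdown
    timer.start()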
# Global log collector instance
query_log_collector = QueryLogCollector()
@@ -96,9 +223,6 @@ class CollectorHandler(logging.Handler):
collector_handler = CollectorHandler(query_log_collector)
logger.addHandler(collector_handler)
# Database configuration
DATABASE_PATH = 'config_groups.db'
class ShardingCalculator:
"""分表计算器基于TWCS策略"""
@@ -262,6 +386,24 @@ def init_database():
)
''')
# Create the query log table
cursor.execute('''
CREATE TABLE IF NOT EXISTS query_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
query_type TEXT DEFAULT 'single',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create indexes
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)')
conn.commit()
conn.close()
logger.info("数据库初始化完成")
@@ -280,11 +422,11 @@ def ensure_database():
try:
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups')")
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups', 'query_logs')")
results = cursor.fetchall()
existing_tables = [row[0] for row in results]
required_tables = ['config_groups', 'query_history', 'sharding_config_groups']
required_tables = ['config_groups', 'query_history', 'sharding_config_groups', 'query_logs']
missing_tables = [table for table in required_tables if table not in existing_tables]
if missing_tables:
@@ -1776,19 +1918,34 @@ def api_get_query_history_results(history_id):
if not history_record:
return jsonify({'success': False, 'error': 'History record not found'}), 404
# Safely read the raw_results data
raw_results = history_record.get('raw_results')
if raw_results and isinstance(raw_results, dict):
raw_pro_data = raw_results.get('raw_pro_data', []) or []
raw_test_data = raw_results.get('raw_test_data', []) or []
sharding_info = raw_results.get('sharding_info') if history_record.get('query_type') == 'sharding' else None
else:
raw_pro_data = []
raw_test_data = []
sharding_info = None
# Safely read the differences and identical-results data
differences_data = history_record.get('differences_data') or []
identical_data = history_record.get('identical_data') or []
# Build the complete query result structure, consistent with the API's query result format
result = {
'total_keys': history_record['total_keys'],
'pro_count': len(history_record.get('raw_results', {}).get('raw_pro_data', [])) if history_record.get('raw_results') else 0,
'test_count': len(history_record.get('raw_results', {}).get('raw_test_data', [])) if history_record.get('raw_results') else 0,
'differences': history_record.get('differences_data', []),
'identical_results': history_record.get('identical_data', []),
'pro_count': len(raw_pro_data),
'test_count': len(raw_test_data),
'differences': differences_data,
'identical_results': identical_data,
'field_diff_count': {},  # Recomputed from differences_data below
'summary': history_record.get('results_summary', {}),
'raw_pro_data': history_record.get('raw_results', {}).get('raw_pro_data', []) if history_record.get('raw_results') else [],
'raw_test_data': history_record.get('raw_results', {}).get('raw_test_data', []) if history_record.get('raw_results') else [],
'raw_pro_data': raw_pro_data,
'raw_test_data': raw_test_data,
# For sharding queries, include the sharding info
'sharding_info': history_record.get('raw_results', {}).get('sharding_info') if history_record.get('raw_results') and history_record.get('query_type') == 'sharding' else None,
'sharding_info': sharding_info,
# Include history record metadata
'history_info': {
'id': history_record['id'],
@@ -1800,10 +1957,10 @@ def api_get_query_history_results(history_id):
}
# Recompute field_diff_count
if history_record.get('differences_data'):
if differences_data:
field_diff_count = {}
for diff in history_record['differences_data']:
if 'field' in diff:
for diff in differences_data:
if isinstance(diff, dict) and 'field' in diff:
field_name = diff['field']
field_diff_count[field_name] = field_diff_count.get(field_name, 0) + 1
result['field_diff_count'] = field_diff_count
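A worked example of the recomputation above, with illustrative data:

differences_data = [
    {'field': 'price', 'key': 'a'},
    {'field': 'price', 'key': 'b'},
    {'field': 'stock', 'key': 'a'},
]
field_diff_count = {}
for diff in differences_data:
    if isinstance(diff, dict) and 'field' in diff:
        field_diff_count[diff['field']] = field_diff_count.get(diff['field'], 0) + 1
assert field_diff_count == {'price': 2, 'stock': 1}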
@@ -1830,28 +1987,36 @@ def api_delete_query_history(history_id):
@app.route('/api/query-logs', methods=['GET'])
def api_get_query_logs():
"""获取查询日志,支持分组显示"""
"""获取查询日志,支持分组显示和数据库存储"""
try:
limit = request.args.get('limit', type=int)
grouped = request.args.get('grouped', 'true').lower() == 'true'  # Grouped display by default
from_db = request.args.get('from_db', 'true').lower() == 'true'  # Fetch from the database by default
if grouped:
# Return grouped logs
grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit)
grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db)
# Total count (for statistics)
total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs)
return jsonify({
'success': True,
'data': grouped_logs,
'total': len(query_log_collector.logs),
'grouped': True
'total': total_logs,
'grouped': True,
'from_db': from_db
})
else:
# Return the raw log list
logs = query_log_collector.get_logs(limit)
logs = query_log_collector.get_logs(limit, from_db)
total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs)
return jsonify({
'success': True,
'data': logs,
'total': len(query_log_collector.logs),
'grouped': False
'total': total_logs,
'grouped': False,
'from_db': from_db
})
except Exception as e:
logger.error(f"获取查询日志失败: {e}")
@@ -1859,13 +2024,32 @@ def api_get_query_logs():
@app.route('/api/query-logs', methods=['DELETE'])
def api_clear_query_logs():
"""清空查询日志"""
"""清空查询日志,支持清空数据库日志"""
try:
query_log_collector.clear_logs()
return jsonify({'success': True, 'message': 'Query logs cleared'})
clear_db = request.args.get('clear_db', 'true').lower() == 'true'  # Clear the database by default
query_log_collector.clear_logs(clear_db)
message = 'Query logs cleared (including database)' if clear_db else 'Query logs cleared (memory only)'
return jsonify({'success': True, 'message': message})
except Exception as e:
logger.error(f"清空查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs/cleanup', methods=['POST'])
def api_cleanup_old_logs():
"""清理旧的查询日志"""
try:
days_to_keep = (request.get_json(silent=True) or {}).get('days_to_keep', 30)  # get_json(silent=True) tolerates missing or non-JSON bodies
deleted_count = query_log_collector.cleanup_old_logs(days_to_keep)
return jsonify({
'success': True,
'message': f'Deleted {deleted_count} log entries older than {days_to_keep} days',
'deleted_count': deleted_count
})
except Exception as e:
logger.error(f"清理旧日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
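And the matching client call for the new cleanup endpoint (same assumptions as the sketch above):

import requests

resp = requests.post('http://localhost:5000/api/query-logs/cleanup',
                     json={'days_to_keep': 30})
print(resp.json().get('message'))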
if __name__ == '__main__':
app.run(debug=True, port=5000)


@@ -2736,8 +2736,16 @@ async function loadHistoryResults(historyId) {
// Set the current result data
currentResults = result.data;
// Ensure the required array fields exist
if (!currentResults.differences) currentResults.differences = [];
if (!currentResults.identical_results) currentResults.identical_results = [];
if (!currentResults.raw_pro_data) currentResults.raw_pro_data = [];
if (!currentResults.raw_test_data) currentResults.raw_test_data = [];
if (!currentResults.field_diff_count) currentResults.field_diff_count = {};
if (!currentResults.summary) currentResults.summary = {};
// Set sharding mode based on the query type
if (result.data.history_info.query_type === 'sharding') {
if (result.data.history_info && result.data.history_info.query_type === 'sharding') {
isShardingMode = true;
document.getElementById('enableSharding').checked = true;
toggleShardingMode();
@@ -2752,14 +2760,18 @@ async function loadHistoryResults(historyId) {
// Close the query history modal
const modal = bootstrap.Modal.getInstance(document.getElementById('queryHistoryModal'));
modal.hide();
if (modal) {
modal.hide();
}
const queryTypeDesc = result.data.history_info.query_type === 'sharding' ? 'Sharding query' : 'Single-table query';
showAlert('success', `${queryTypeDesc} history results "${result.data.history_info.name}" loaded successfully`);
const queryTypeDesc = (result.data.history_info && result.data.history_info.query_type === 'sharding') ? 'Sharding query' : 'Single-table query';
const historyName = (result.data.history_info && result.data.history_info.name) || 'unknown';
showAlert('success', `${queryTypeDesc} history results "${historyName}" loaded successfully`);
} else {
showAlert('danger', result.error || 'Failed to load history results');
}
} catch (error) {
console.error('Failed to load history results:', error);
showAlert('danger', 'Failed to load history results: ' + error.message);
}
}
@@ -3173,7 +3185,7 @@ let allQueryLogs = []; // Store all logs
async function refreshQueryLogs() {
try {
const response = await fetch('/api/query-logs?grouped=true');
const response = await fetch('/api/query-logs?grouped=true&from_db=true');
const result = await response.json();
if (result.success && result.data) {
@@ -3366,19 +3378,19 @@ function filterLogsByLevel() {
}
async function clearQueryLogs() {
if (!confirm('Clear all query logs?')) {
if (!confirm('Clear all query logs? This will delete every log record from both memory and the database.')) {
return;
}
try {
const response = await fetch('/api/query-logs', {
const response = await fetch('/api/query-logs?clear_db=true', {
method: 'DELETE'
});
const result = await response.json();
if (result.success) {
document.getElementById('query-logs').innerHTML = '<div class="alert alert-info">Query logs cleared</div>';
showAlert('success', 'Query logs cleared');
showAlert('success', result.message);
} else {
showAlert('danger', 'Failed to clear query logs: ' + result.error);
}
@@ -3388,6 +3400,47 @@ async function clearQueryLogs() {
}
}
// Clean up old logs
async function cleanupOldLogs() {
const days = prompt('Enter the number of days of logs to keep (default 30):', '30');
if (days === null) return; // User cancelled
const daysToKeep = parseInt(days) || 30;
if (daysToKeep <= 0) {
showAlert('warning', 'Days to keep must be greater than 0');
return;
}
if (!confirm(`Delete logs older than ${daysToKeep} days?`)) {
return;
}
try {
const response = await fetch('/api/query-logs/cleanup', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
days_to_keep: daysToKeep
})
});
const result = await response.json();
if (result.success) {
showAlert('success', result.message);
// Refresh the log display
refreshQueryLogs();
} else {
showAlert('danger', 'Failed to clean up old logs: ' + result.error);
}
} catch (error) {
console.error('Failed to clean up old logs:', error);
showAlert('danger', 'Failed to clean up old logs');
}
}
// Automatically refresh logs after a query runs
function autoRefreshLogsAfterQuery() {
// Brief delay so the backend has time to record the logs


@@ -707,6 +707,9 @@
<button class="btn btn-sm btn-outline-primary me-2" onclick="refreshQueryLogs()">
<i class="fas fa-sync-alt"></i> 刷新
</button>
<button class="btn btn-sm btn-outline-warning me-2" onclick="cleanupOldLogs()">
<i class="fas fa-broom"></i> 清理旧日志
</button>
<button class="btn btn-sm btn-outline-danger" onclick="clearQueryLogs()">
<i class="fas fa-trash"></i> 清空
</button>