From 313319e2bb8231b3e94c2c74d015fb4aac6780c6 Mon Sep 17 00:00:00 2001 From: YoVinchen Date: Sun, 3 Aug 2025 10:28:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=9F=A5=E8=AF=A2=E5=8E=86?= =?UTF-8?q?=E5=8F=B2=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 158 ++++++++++++++++++++++++++++++++++++++++++----- static/js/app.js | 156 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 298 insertions(+), 16 deletions(-) diff --git a/app.py b/app.py index 1532002..6960aa4 100644 --- a/app.py +++ b/app.py @@ -5,7 +5,7 @@ import json import os import logging import sqlite3 -from datetime import datetime +from datetime import datetime, timedelta import re import concurrent.futures import time @@ -27,6 +27,7 @@ class QueryLogCollector: self.current_batch_id = None self.batch_counter = 0 self.current_query_type = 'single' + self.current_history_id = None # 当前关联的历史记录ID self.db_path = db_path or DATABASE_PATH def start_new_batch(self, query_type='single'): @@ -34,29 +35,59 @@ class QueryLogCollector: self.batch_counter += 1 self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}" self.current_query_type = query_type + self.current_history_id = None # 重置历史记录ID # 添加批次开始标记 self.add_log('INFO', f"=== 开始{query_type}查询批次 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) return self.current_batch_id + def set_history_id(self, history_id): + """设置当前批次关联的历史记录ID""" + self.current_history_id = history_id + if self.current_batch_id and history_id: + self.add_log('INFO', f"关联历史记录ID: {history_id}", force_batch_id=self.current_batch_id) + # 更新当前批次的所有日志记录的history_id + self._update_batch_history_id(self.current_batch_id, history_id) + + def _update_batch_history_id(self, batch_id, history_id): + """更新批次中所有日志的history_id""" + try: + conn = sqlite3.connect(self.db_path, timeout=30) + cursor = conn.cursor() + + cursor.execute(''' + UPDATE query_logs + SET history_id = ? + WHERE batch_id = ? + ''', (history_id, batch_id)) + + conn.commit() + conn.close() + logger.info(f"已更新批次 {batch_id} 的历史记录关联到 {history_id}") + except Exception as e: + print(f"Warning: Failed to update batch history_id: {e}") + def end_current_batch(self): """结束当前查询批次""" if self.current_batch_id: self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) self.current_batch_id = None + self.current_history_id = None - def add_log(self, level, message, force_batch_id=None, force_query_type=None): + def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None): """添加日志到内存和数据库""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] batch_id = force_batch_id or self.current_batch_id query_type = force_query_type or self.current_query_type + history_id = force_history_id or self.current_history_id log_entry = { 'timestamp': timestamp, 'level': level, 'message': message, 'batch_id': batch_id, - 'query_type': query_type + 'query_type': query_type, + 'history_id': history_id } # 添加到内存缓存 @@ -74,10 +105,11 @@ class QueryLogCollector: cursor = conn.cursor() cursor.execute(''' - INSERT INTO query_logs (batch_id, timestamp, level, message, query_type) - VALUES (?, ?, ?, ?, ?) + INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type) + VALUES (?, ?, ?, ?, ?, ?) ''', ( log_entry['batch_id'], + log_entry['history_id'], log_entry['timestamp'], log_entry['level'], log_entry['message'], @@ -108,7 +140,7 @@ class QueryLogCollector: cursor = conn.cursor() query = ''' - SELECT batch_id, timestamp, level, message, query_type + SELECT batch_id, history_id, timestamp, level, message, query_type FROM query_logs ORDER BY id DESC ''' @@ -124,6 +156,7 @@ class QueryLogCollector: for row in reversed(rows): logs.append({ 'batch_id': row['batch_id'], + 'history_id': row['history_id'], 'timestamp': row['timestamp'], 'level': row['level'], 'message': row['message'], @@ -150,6 +183,38 @@ class QueryLogCollector: print(f"Warning: Failed to get logs count from database: {e}") return len(self.logs) + def get_logs_by_history_id(self, history_id): + """根据历史记录ID获取相关日志""" + try: + conn = sqlite3.connect(self.db_path, timeout=30) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute(''' + SELECT batch_id, history_id, timestamp, level, message, query_type + FROM query_logs + WHERE history_id = ? + ORDER BY id ASC + ''', (history_id,)) + + rows = cursor.fetchall() + logs = [] + for row in rows: + logs.append({ + 'batch_id': row['batch_id'], + 'history_id': row['history_id'], + 'timestamp': row['timestamp'], + 'level': row['level'], + 'message': row['message'], + 'query_type': row['query_type'] + }) + + conn.close() + return logs + except Exception as e: + print(f"Warning: Failed to get logs by history_id: {e}") + return [] + def get_logs_grouped_by_batch(self, limit=None, from_db=True): """按批次分组获取日志""" logs = self.get_logs(limit, from_db) @@ -391,16 +456,19 @@ def init_database(): CREATE TABLE IF NOT EXISTS query_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, batch_id TEXT NOT NULL, + history_id INTEGER, timestamp TEXT NOT NULL, level TEXT NOT NULL, message TEXT NOT NULL, query_type TEXT DEFAULT 'single', - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)') @@ -480,6 +548,19 @@ def ensure_database(): conn.commit() logger.info("query_history表identical_data字段添加成功") + # 检查query_logs表是否存在history_id字段 + cursor.execute("PRAGMA table_info(query_logs)") + logs_columns = cursor.fetchall() + logs_column_names = [column[1] for column in logs_columns] + + if 'history_id' not in logs_column_names: + logger.info("添加history_id字段到query_logs表...") + cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER") + # 创建外键索引 + cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)') + conn.commit() + logger.info("query_logs表history_id字段添加成功") + conn.close() return True except Exception as e: @@ -834,10 +915,10 @@ def delete_config_group(group_id): def save_query_history(name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, differences_count, identical_count, sharding_config=None, query_type='single', raw_results=None, differences_data=None, identical_data=None): - """保存查询历史记录,支持分表查询和查询结果数据""" + """保存查询历史记录,支持分表查询和查询结果数据,返回历史记录ID""" if not ensure_database(): logger.error("数据库初始化失败") - return False + return None conn = get_db_connection() cursor = conn.cursor() @@ -866,12 +947,15 @@ def save_query_history(name, description, pro_config, test_config, query_config, json.dumps(differences_data) if differences_data else None, json.dumps(identical_data) if identical_data else None )) + + # 获取插入记录的ID + history_id = cursor.lastrowid conn.commit() - logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}") - return True + logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type},ID:{history_id}") + return history_id except Exception as e: logger.error(f"保存查询历史记录失败: {e}") - return False + return None finally: conn.close() @@ -1574,7 +1658,7 @@ def sharding_query_compare(): history_description = f"自动保存 - 分表查询{len(values)}个Key,发现{len(differences)}处差异" # 保存历史记录 - save_query_history( + history_id = save_query_history( name=history_name, description=history_description, pro_config=pro_config, @@ -1602,7 +1686,13 @@ def sharding_query_compare(): differences_data=differences, identical_data=identical_results ) - logger.info(f"分表查询历史记录保存成功: {history_name}") + + # 关联查询日志与历史记录 + if history_id: + query_log_collector.set_history_id(history_id) + logger.info(f"分表查询历史记录保存成功: {history_name}, ID: {history_id}") + else: + logger.warning("分表查询历史记录保存失败,无法获取history_id") except Exception as e: logger.warning(f"保存分表查询历史记录失败: {e}") @@ -1726,7 +1816,7 @@ def query_compare(): history_description = f"自动保存 - 查询{len(values)}个Key,发现{len(differences)}处差异" # 保存历史记录 - save_query_history( + history_id = save_query_history( name=history_name, description=history_description, pro_config=pro_config, @@ -1750,6 +1840,13 @@ def query_compare(): differences_data=differences, identical_data=identical_results ) + + # 关联查询日志与历史记录 + if history_id: + query_log_collector.set_history_id(history_id) + logger.info(f"查询历史记录保存成功: {history_name}, ID: {history_id}") + else: + logger.warning("查询历史记录保存失败,无法获取history_id") except Exception as e: logger.warning(f"保存查询历史记录失败: {e}") @@ -2051,5 +2148,36 @@ def api_cleanup_old_logs(): logger.error(f"清理旧日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 +@app.route('/api/query-logs/history/', methods=['GET']) +def api_get_query_logs_by_history(history_id): + """根据历史记录ID获取相关查询日志""" + try: + logs = query_log_collector.get_logs_by_history_id(history_id) + + # 按批次分组显示 + grouped_logs = {} + batch_order = [] + + for log in logs: + batch_id = log.get('batch_id', 'unknown') + if batch_id not in grouped_logs: + grouped_logs[batch_id] = [] + batch_order.append(batch_id) + grouped_logs[batch_id].append(log) + + # 返回按时间顺序排列的批次 + grouped_result = [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order] + + return jsonify({ + 'success': True, + 'data': grouped_result, + 'total': len(logs), + 'history_id': history_id, + 'grouped': True + }) + except Exception as e: + logger.error(f"获取历史记录相关查询日志失败: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + if __name__ == '__main__': app.run(debug=True, port=5000) diff --git a/static/js/app.js b/static/js/app.js index 3814c2b..a87eb1d 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -2758,6 +2758,21 @@ async function loadHistoryResults(historyId) { // 显示结果 displayResults(result.data); + // 加载相关的查询日志 + try { + await loadQueryLogsByHistory(historyId); + // 自动切换到查询日志选项卡以显示关联信息 + const logsTab = document.getElementById('logs-tab'); + if (logsTab) { + // 添加一个小延迟确保结果已经显示 + setTimeout(() => { + logsTab.click(); + }, 500); + } + } catch (logError) { + console.warn('加载历史记录相关查询日志失败:', logError); + } + // 关闭历史记录modal const modal = bootstrap.Modal.getInstance(document.getElementById('queryHistoryModal')); if (modal) { @@ -2766,7 +2781,7 @@ async function loadHistoryResults(historyId) { const queryTypeDesc = (result.data.history_info && result.data.history_info.query_type === 'sharding') ? '分表查询' : '单表查询'; const historyName = (result.data.history_info && result.data.history_info.name) || '未知'; - showAlert('success', `${queryTypeDesc}历史记录结果 "${historyName}" 加载成功`); + showAlert('success', `${queryTypeDesc}历史记录结果 "${historyName}" 加载成功,查询日志已关联显示`); } else { showAlert('danger', result.error || '加载历史记录结果失败'); } @@ -3441,6 +3456,145 @@ async function cleanupOldLogs() { } } +// 根据历史记录ID获取相关查询日志 +async function loadQueryLogsByHistory(historyId) { + try { + const response = await fetch(`/api/query-logs/history/${historyId}`); + const result = await response.json(); + + if (result.success && result.data) { + // 显示关联的查询日志 + displayHistoryRelatedLogs(result.data, historyId); + return result.data; + } else { + console.warn('获取历史记录相关日志失败:', result.error); + return []; + } + } catch (error) { + console.error('获取历史记录相关查询日志失败:', error); + return []; + } +} + +// 显示历史记录相关的查询日志 +function displayHistoryRelatedLogs(groupedLogs, historyId) { + const container = document.getElementById('query-logs'); + + if (!groupedLogs || groupedLogs.length === 0) { + container.innerHTML = ` +
+ + 历史记录 ID ${historyId} 没有关联的查询日志 +
+ `; + return; + } + + let html = ` +
+ + 显示历史记录 ID ${historyId} 的相关查询日志 (共 ${groupedLogs.length} 个批次) +
+ `; + + // 使用与refreshQueryLogs相同的显示逻辑 + groupedLogs.forEach((batchData, index) => { + const [batchId, logs] = batchData; + const isExpanded = true; // 历史记录日志默认展开所有批次 + const collapseId = `history-batch-${batchId}`; + + // 统计批次信息 + const logCounts = { + INFO: logs.filter(log => log.level === 'INFO').length, + WARNING: logs.filter(log => log.level === 'WARNING').length, + ERROR: logs.filter(log => log.level === 'ERROR').length + }; + + const totalLogs = logs.length; + const firstLog = logs[0]; + const lastLog = logs[logs.length - 1]; + const duration = firstLog && lastLog ? calculateDuration(firstLog.timestamp, lastLog.timestamp) : '0秒'; + + // 确定批次类型和状态 + const hasErrors = logCounts.ERROR > 0; + const hasWarnings = logCounts.WARNING > 0; + const batchStatus = hasErrors ? 'danger' : hasWarnings ? 'warning' : 'success'; + const batchIcon = hasErrors ? 'fas fa-times-circle' : hasWarnings ? 'fas fa-exclamation-triangle' : 'fas fa-check-circle'; + + // 提取查询类型 + const batchTypeMatch = firstLog?.message.match(/开始(\w+)查询批次/); + const batchType = batchTypeMatch ? batchTypeMatch[1] : '未知'; + + html += ` +
+
+
+
+ + ${batchType}查询批次 ${batchId} + ${totalLogs}条日志 + 历史记录 +
+
+ + ${duration} + +
+ ${logCounts.INFO > 0 ? `${logCounts.INFO} INFO` : ''} + ${logCounts.WARNING > 0 ? `${logCounts.WARNING} WARN` : ''} + ${logCounts.ERROR > 0 ? `${logCounts.ERROR} ERROR` : ''} +
+ +
+
+
+
+
+
+ `; + + // 显示该批次的日志条目 + logs.forEach(log => { + const shouldShow = true; // 历史记录日志显示所有级别 + + if (shouldShow) { + const levelClass = log.level === 'ERROR' ? 'danger' : + log.level === 'WARNING' ? 'warning' : 'info'; + const levelIcon = log.level === 'ERROR' ? 'fas fa-times-circle' : + log.level === 'WARNING' ? 'fas fa-exclamation-triangle' : 'fas fa-info-circle'; + + // 简化时间戳显示 + const timeOnly = log.timestamp.split(' ')[1] || log.timestamp; + + html += ` +
+
+ + ${log.level} + + + ${timeOnly} + +
+ ${formatLogMessage(log.message)} +
+
+
+ `; + } + }); + + html += ` +
+
+
+
+ `; + }); + + container.innerHTML = html; +} + // 在查询执行后自动刷新日志 function autoRefreshLogsAfterQuery() { // 延迟一下确保后端日志已经记录