diff --git a/app.py b/app.py index 444adbd..1532002 100644 --- a/app.py +++ b/app.py @@ -16,18 +16,24 @@ app = Flask(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) +# 数据库配置 +DATABASE_PATH = 'config_groups.db' + # 查询日志收集器 class QueryLogCollector: - def __init__(self, max_logs=1000): - self.logs = [] + def __init__(self, max_logs=1000, db_path=None): + self.logs = [] # 内存中的日志缓存 self.max_logs = max_logs self.current_batch_id = None self.batch_counter = 0 + self.current_query_type = 'single' + self.db_path = db_path or DATABASE_PATH def start_new_batch(self, query_type='single'): """开始新的查询批次""" self.batch_counter += 1 self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}" + self.current_query_type = query_type # 添加批次开始标记 self.add_log('INFO', f"=== 开始{query_type}查询批次 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) @@ -39,29 +45,114 @@ class QueryLogCollector: self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) self.current_batch_id = None - def add_log(self, level, message, force_batch_id=None): + def add_log(self, level, message, force_batch_id=None, force_query_type=None): + """添加日志到内存和数据库""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] batch_id = force_batch_id or self.current_batch_id + query_type = force_query_type or self.current_query_type log_entry = { 'timestamp': timestamp, 'level': level, 'message': message, - 'batch_id': batch_id + 'batch_id': batch_id, + 'query_type': query_type } + + # 添加到内存缓存 self.logs.append(log_entry) - # 保持日志数量在限制内 if len(self.logs) > self.max_logs: self.logs.pop(0) + + # 保存到数据库 + self._save_log_to_db(log_entry) - def get_logs(self, limit=None): - if limit: - return self.logs[-limit:] - return self.logs + def _save_log_to_db(self, log_entry): + """将日志保存到数据库""" + try: + conn = sqlite3.connect(self.db_path, timeout=30) + cursor = conn.cursor() + + cursor.execute(''' + INSERT INTO query_logs (batch_id, timestamp, level, message, query_type) + VALUES (?, ?, ?, ?, ?) + ''', ( + log_entry['batch_id'], + log_entry['timestamp'], + log_entry['level'], + log_entry['message'], + log_entry['query_type'] + )) + + conn.commit() + conn.close() + except Exception as e: + # 数据库写入失败时记录到控制台,但不影响程序运行 + print(f"Warning: Failed to save log to database: {e}") - def get_logs_grouped_by_batch(self, limit=None): + def get_logs(self, limit=None, from_db=True): + """获取日志,支持从数据库或内存获取""" + if from_db: + return self._get_logs_from_db(limit) + else: + # 从内存获取 + if limit: + return self.logs[-limit:] + return self.logs + + def _get_logs_from_db(self, limit=None): + """从数据库获取日志""" + try: + conn = sqlite3.connect(self.db_path, timeout=30) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + query = ''' + SELECT batch_id, timestamp, level, message, query_type + FROM query_logs + ORDER BY id DESC + ''' + + if limit: + query += f' LIMIT {limit}' + + cursor.execute(query) + rows = cursor.fetchall() + + # 转换为字典格式并反转顺序(最新的在前) + logs = [] + for row in reversed(rows): + logs.append({ + 'batch_id': row['batch_id'], + 'timestamp': row['timestamp'], + 'level': row['level'], + 'message': row['message'], + 'query_type': row['query_type'] + }) + + conn.close() + return logs + except Exception as e: + print(f"Warning: Failed to get logs from database: {e}") + # 如果数据库读取失败,返回内存中的日志 + return self.get_logs(limit, from_db=False) + + def _get_total_logs_count(self): + """获取数据库中的日志总数""" + try: + conn = sqlite3.connect(self.db_path, timeout=30) + cursor = conn.cursor() + cursor.execute('SELECT COUNT(*) FROM query_logs') + count = cursor.fetchone()[0] + conn.close() + return count + except Exception as e: + print(f"Warning: Failed to get logs count from database: {e}") + return len(self.logs) + + def get_logs_grouped_by_batch(self, limit=None, from_db=True): """按批次分组获取日志""" - logs = self.get_logs(limit) + logs = self.get_logs(limit, from_db) grouped_logs = {} batch_order = [] @@ -75,10 +166,46 @@ class QueryLogCollector: # 返回按时间顺序排列的批次 return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order] - def clear_logs(self): + def clear_logs(self, clear_db=True): + """清空日志""" + # 清空内存 self.logs.clear() self.current_batch_id = None self.batch_counter = 0 + + # 清空数据库 + if clear_db: + try: + conn = sqlite3.connect(self.db_path, timeout=30) + cursor = conn.cursor() + cursor.execute('DELETE FROM query_logs') + conn.commit() + conn.close() + except Exception as e: + print(f"Warning: Failed to clear logs from database: {e}") + + def cleanup_old_logs(self, days_to_keep=30): + """清理旧日志,保留指定天数的日志""" + try: + conn = sqlite3.connect(self.db_path, timeout=30) + cursor = conn.cursor() + + # 删除超过指定天数的日志 + cutoff_date = datetime.now() - timedelta(days=days_to_keep) + cursor.execute(''' + DELETE FROM query_logs + WHERE created_at < ? + ''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),)) + + deleted_count = cursor.rowcount + conn.commit() + conn.close() + + logger.info(f"清理了 {deleted_count} 条超过 {days_to_keep} 天的旧日志") + return deleted_count + except Exception as e: + logger.error(f"清理旧日志失败: {e}") + return 0 # 全局日志收集器实例 query_log_collector = QueryLogCollector() @@ -96,9 +223,6 @@ class CollectorHandler(logging.Handler): collector_handler = CollectorHandler(query_log_collector) logger.addHandler(collector_handler) -# 数据库配置 -DATABASE_PATH = 'config_groups.db' - class ShardingCalculator: """分表计算器,基于TWCS策略""" @@ -262,6 +386,24 @@ def init_database(): ) ''') + # 创建查询日志表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS query_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + batch_id TEXT NOT NULL, + timestamp TEXT NOT NULL, + level TEXT NOT NULL, + message TEXT NOT NULL, + query_type TEXT DEFAULT 'single', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + + # 创建索引 + cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)') + conn.commit() conn.close() logger.info("数据库初始化完成") @@ -280,11 +422,11 @@ def ensure_database(): try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups')") + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups', 'query_logs')") results = cursor.fetchall() existing_tables = [row[0] for row in results] - required_tables = ['config_groups', 'query_history', 'sharding_config_groups'] + required_tables = ['config_groups', 'query_history', 'sharding_config_groups', 'query_logs'] missing_tables = [table for table in required_tables if table not in existing_tables] if missing_tables: @@ -1776,19 +1918,34 @@ def api_get_query_history_results(history_id): if not history_record: return jsonify({'success': False, 'error': '历史记录不存在'}), 404 + # 安全获取raw_results数据 + raw_results = history_record.get('raw_results') + if raw_results and isinstance(raw_results, dict): + raw_pro_data = raw_results.get('raw_pro_data', []) or [] + raw_test_data = raw_results.get('raw_test_data', []) or [] + sharding_info = raw_results.get('sharding_info') if history_record.get('query_type') == 'sharding' else None + else: + raw_pro_data = [] + raw_test_data = [] + sharding_info = None + + # 安全获取差异和相同结果数据 + differences_data = history_record.get('differences_data') or [] + identical_data = history_record.get('identical_data') or [] + # 构建完整的查询结果格式,与API查询结果保持一致 result = { 'total_keys': history_record['total_keys'], - 'pro_count': len(history_record.get('raw_results', {}).get('raw_pro_data', [])) if history_record.get('raw_results') else 0, - 'test_count': len(history_record.get('raw_results', {}).get('raw_test_data', [])) if history_record.get('raw_results') else 0, - 'differences': history_record.get('differences_data', []), - 'identical_results': history_record.get('identical_data', []), + 'pro_count': len(raw_pro_data), + 'test_count': len(raw_test_data), + 'differences': differences_data, + 'identical_results': identical_data, 'field_diff_count': {}, # 可以从differences_data中重新计算 'summary': history_record.get('results_summary', {}), - 'raw_pro_data': history_record.get('raw_results', {}).get('raw_pro_data', []) if history_record.get('raw_results') else [], - 'raw_test_data': history_record.get('raw_results', {}).get('raw_test_data', []) if history_record.get('raw_results') else [], + 'raw_pro_data': raw_pro_data, + 'raw_test_data': raw_test_data, # 如果是分表查询,添加分表信息 - 'sharding_info': history_record.get('raw_results', {}).get('sharding_info') if history_record.get('raw_results') and history_record.get('query_type') == 'sharding' else None, + 'sharding_info': sharding_info, # 添加历史记录元信息 'history_info': { 'id': history_record['id'], @@ -1800,10 +1957,10 @@ def api_get_query_history_results(history_id): } # 重新计算field_diff_count - if history_record.get('differences_data'): + if differences_data: field_diff_count = {} - for diff in history_record['differences_data']: - if 'field' in diff: + for diff in differences_data: + if isinstance(diff, dict) and 'field' in diff: field_name = diff['field'] field_diff_count[field_name] = field_diff_count.get(field_name, 0) + 1 result['field_diff_count'] = field_diff_count @@ -1830,28 +1987,36 @@ def api_delete_query_history(history_id): @app.route('/api/query-logs', methods=['GET']) def api_get_query_logs(): - """获取查询日志,支持分组显示""" + """获取查询日志,支持分组显示和数据库存储""" try: limit = request.args.get('limit', type=int) grouped = request.args.get('grouped', 'true').lower() == 'true' # 默认分组显示 + from_db = request.args.get('from_db', 'true').lower() == 'true' # 默认从数据库获取 if grouped: # 返回分组日志 - grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit) + grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db) + # 获取总数(用于统计) + total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs) + return jsonify({ 'success': True, 'data': grouped_logs, - 'total': len(query_log_collector.logs), - 'grouped': True + 'total': total_logs, + 'grouped': True, + 'from_db': from_db }) else: # 返回原始日志列表 - logs = query_log_collector.get_logs(limit) + logs = query_log_collector.get_logs(limit, from_db) + total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs) + return jsonify({ 'success': True, 'data': logs, - 'total': len(query_log_collector.logs), - 'grouped': False + 'total': total_logs, + 'grouped': False, + 'from_db': from_db }) except Exception as e: logger.error(f"获取查询日志失败: {e}") @@ -1859,13 +2024,32 @@ def api_get_query_logs(): @app.route('/api/query-logs', methods=['DELETE']) def api_clear_query_logs(): - """清空查询日志""" + """清空查询日志,支持清空数据库日志""" try: - query_log_collector.clear_logs() - return jsonify({'success': True, 'message': '查询日志已清空'}) + clear_db = request.args.get('clear_db', 'true').lower() == 'true' # 默认清空数据库 + query_log_collector.clear_logs(clear_db) + + message = '查询日志已清空(包括数据库)' if clear_db else '查询日志已清空(仅内存)' + return jsonify({'success': True, 'message': message}) except Exception as e: logger.error(f"清空查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 +@app.route('/api/query-logs/cleanup', methods=['POST']) +def api_cleanup_old_logs(): + """清理旧的查询日志""" + try: + days_to_keep = request.json.get('days_to_keep', 30) if request.json else 30 + deleted_count = query_log_collector.cleanup_old_logs(days_to_keep) + + return jsonify({ + 'success': True, + 'message': f'成功清理 {deleted_count} 条超过 {days_to_keep} 天的旧日志', + 'deleted_count': deleted_count + }) + except Exception as e: + logger.error(f"清理旧日志失败: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + if __name__ == '__main__': app.run(debug=True, port=5000) diff --git a/static/js/app.js b/static/js/app.js index 6bf4d3a..3814c2b 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -2736,8 +2736,16 @@ async function loadHistoryResults(historyId) { // 设置当前结果数据 currentResults = result.data; + // 确保必要的数组字段存在 + if (!currentResults.differences) currentResults.differences = []; + if (!currentResults.identical_results) currentResults.identical_results = []; + if (!currentResults.raw_pro_data) currentResults.raw_pro_data = []; + if (!currentResults.raw_test_data) currentResults.raw_test_data = []; + if (!currentResults.field_diff_count) currentResults.field_diff_count = {}; + if (!currentResults.summary) currentResults.summary = {}; + // 根据查询类型设置分表模式 - if (result.data.history_info.query_type === 'sharding') { + if (result.data.history_info && result.data.history_info.query_type === 'sharding') { isShardingMode = true; document.getElementById('enableSharding').checked = true; toggleShardingMode(); @@ -2752,14 +2760,18 @@ async function loadHistoryResults(historyId) { // 关闭历史记录modal const modal = bootstrap.Modal.getInstance(document.getElementById('queryHistoryModal')); - modal.hide(); + if (modal) { + modal.hide(); + } - const queryTypeDesc = result.data.history_info.query_type === 'sharding' ? '分表查询' : '单表查询'; - showAlert('success', `${queryTypeDesc}历史记录结果 "${result.data.history_info.name}" 加载成功`); + const queryTypeDesc = (result.data.history_info && result.data.history_info.query_type === 'sharding') ? '分表查询' : '单表查询'; + const historyName = (result.data.history_info && result.data.history_info.name) || '未知'; + showAlert('success', `${queryTypeDesc}历史记录结果 "${historyName}" 加载成功`); } else { showAlert('danger', result.error || '加载历史记录结果失败'); } } catch (error) { + console.error('加载历史记录结果失败:', error); showAlert('danger', '加载历史记录结果失败: ' + error.message); } } @@ -3173,7 +3185,7 @@ let allQueryLogs = []; // 存储所有日志 async function refreshQueryLogs() { try { - const response = await fetch('/api/query-logs?grouped=true'); + const response = await fetch('/api/query-logs?grouped=true&from_db=true'); const result = await response.json(); if (result.success && result.data) { @@ -3366,19 +3378,19 @@ function filterLogsByLevel() { } async function clearQueryLogs() { - if (!confirm('确定要清空所有查询日志吗?')) { + if (!confirm('确定要清空所有查询日志吗?这将删除内存和数据库中的所有日志记录。')) { return; } try { - const response = await fetch('/api/query-logs', { + const response = await fetch('/api/query-logs?clear_db=true', { method: 'DELETE' }); const result = await response.json(); if (result.success) { document.getElementById('query-logs').innerHTML = '