完善查询历史记录

This commit is contained in:
2025-08-03 10:28:09 +08:00
parent f674373401
commit 313319e2bb
2 changed files with 298 additions and 16 deletions

158
app.py
View File

@@ -5,7 +5,7 @@ import json
import os
import logging
import sqlite3
from datetime import datetime
from datetime import datetime, timedelta
import re
import concurrent.futures
import time
@@ -27,6 +27,7 @@ class QueryLogCollector:
self.current_batch_id = None
self.batch_counter = 0
self.current_query_type = 'single'
self.current_history_id = None # 当前关联的历史记录ID
self.db_path = db_path or DATABASE_PATH
def start_new_batch(self, query_type='single'):
@@ -34,29 +35,59 @@ class QueryLogCollector:
self.batch_counter += 1
self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}"
self.current_query_type = query_type
self.current_history_id = None # 重置历史记录ID
# 添加批次开始标记
self.add_log('INFO', f"=== 开始{query_type}查询批次 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
return self.current_batch_id
def set_history_id(self, history_id):
"""设置当前批次关联的历史记录ID"""
self.current_history_id = history_id
if self.current_batch_id and history_id:
self.add_log('INFO', f"关联历史记录ID: {history_id}", force_batch_id=self.current_batch_id)
# 更新当前批次的所有日志记录的history_id
self._update_batch_history_id(self.current_batch_id, history_id)
def _update_batch_history_id(self, batch_id, history_id):
"""更新批次中所有日志的history_id"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
cursor = conn.cursor()
cursor.execute('''
UPDATE query_logs
SET history_id = ?
WHERE batch_id = ?
''', (history_id, batch_id))
conn.commit()
conn.close()
logger.info(f"已更新批次 {batch_id} 的历史记录关联到 {history_id}")
except Exception as e:
print(f"Warning: Failed to update batch history_id: {e}")
def end_current_batch(self):
"""结束当前查询批次"""
if self.current_batch_id:
self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
self.current_batch_id = None
self.current_history_id = None
def add_log(self, level, message, force_batch_id=None, force_query_type=None):
def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None):
"""添加日志到内存和数据库"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
batch_id = force_batch_id or self.current_batch_id
query_type = force_query_type or self.current_query_type
history_id = force_history_id or self.current_history_id
log_entry = {
'timestamp': timestamp,
'level': level,
'message': message,
'batch_id': batch_id,
'query_type': query_type
'query_type': query_type,
'history_id': history_id
}
# 添加到内存缓存
@@ -74,10 +105,11 @@ class QueryLogCollector:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO query_logs (batch_id, timestamp, level, message, query_type)
VALUES (?, ?, ?, ?, ?)
INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type)
VALUES (?, ?, ?, ?, ?, ?)
''', (
log_entry['batch_id'],
log_entry['history_id'],
log_entry['timestamp'],
log_entry['level'],
log_entry['message'],
@@ -108,7 +140,7 @@ class QueryLogCollector:
cursor = conn.cursor()
query = '''
SELECT batch_id, timestamp, level, message, query_type
SELECT batch_id, history_id, timestamp, level, message, query_type
FROM query_logs
ORDER BY id DESC
'''
@@ -124,6 +156,7 @@ class QueryLogCollector:
for row in reversed(rows):
logs.append({
'batch_id': row['batch_id'],
'history_id': row['history_id'],
'timestamp': row['timestamp'],
'level': row['level'],
'message': row['message'],
@@ -150,6 +183,38 @@ class QueryLogCollector:
print(f"Warning: Failed to get logs count from database: {e}")
return len(self.logs)
def get_logs_by_history_id(self, history_id):
"""根据历史记录ID获取相关日志"""
try:
conn = sqlite3.connect(self.db_path, timeout=30)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT batch_id, history_id, timestamp, level, message, query_type
FROM query_logs
WHERE history_id = ?
ORDER BY id ASC
''', (history_id,))
rows = cursor.fetchall()
logs = []
for row in rows:
logs.append({
'batch_id': row['batch_id'],
'history_id': row['history_id'],
'timestamp': row['timestamp'],
'level': row['level'],
'message': row['message'],
'query_type': row['query_type']
})
conn.close()
return logs
except Exception as e:
print(f"Warning: Failed to get logs by history_id: {e}")
return []
def get_logs_grouped_by_batch(self, limit=None, from_db=True):
"""按批次分组获取日志"""
logs = self.get_logs(limit, from_db)
@@ -391,16 +456,19 @@ def init_database():
CREATE TABLE IF NOT EXISTS query_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch_id TEXT NOT NULL,
history_id INTEGER,
timestamp TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
query_type TEXT DEFAULT 'single',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)')
@@ -480,6 +548,19 @@ def ensure_database():
conn.commit()
logger.info("query_history表identical_data字段添加成功")
# 检查query_logs表是否存在history_id字段
cursor.execute("PRAGMA table_info(query_logs)")
logs_columns = cursor.fetchall()
logs_column_names = [column[1] for column in logs_columns]
if 'history_id' not in logs_column_names:
logger.info("添加history_id字段到query_logs表...")
cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER")
# 创建外键索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)')
conn.commit()
logger.info("query_logs表history_id字段添加成功")
conn.close()
return True
except Exception as e:
@@ -834,10 +915,10 @@ def delete_config_group(group_id):
def save_query_history(name, description, pro_config, test_config, query_config, query_keys,
results_summary, execution_time, total_keys, differences_count, identical_count,
sharding_config=None, query_type='single', raw_results=None, differences_data=None, identical_data=None):
"""保存查询历史记录,支持分表查询和查询结果数据"""
"""保存查询历史记录,支持分表查询和查询结果数据返回历史记录ID"""
if not ensure_database():
logger.error("数据库初始化失败")
return False
return None
conn = get_db_connection()
cursor = conn.cursor()
@@ -866,12 +947,15 @@ def save_query_history(name, description, pro_config, test_config, query_config,
json.dumps(differences_data) if differences_data else None,
json.dumps(identical_data) if identical_data else None
))
# 获取插入记录的ID
history_id = cursor.lastrowid
conn.commit()
logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}")
return True
logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}ID{history_id}")
return history_id
except Exception as e:
logger.error(f"保存查询历史记录失败: {e}")
return False
return None
finally:
conn.close()
@@ -1574,7 +1658,7 @@ def sharding_query_compare():
history_description = f"自动保存 - 分表查询{len(values)}个Key发现{len(differences)}处差异"
# 保存历史记录
save_query_history(
history_id = save_query_history(
name=history_name,
description=history_description,
pro_config=pro_config,
@@ -1602,7 +1686,13 @@ def sharding_query_compare():
differences_data=differences,
identical_data=identical_results
)
logger.info(f"分表查询历史记录保存成功: {history_name}")
# 关联查询日志与历史记录
if history_id:
query_log_collector.set_history_id(history_id)
logger.info(f"分表查询历史记录保存成功: {history_name}, ID: {history_id}")
else:
logger.warning("分表查询历史记录保存失败无法获取history_id")
except Exception as e:
logger.warning(f"保存分表查询历史记录失败: {e}")
@@ -1726,7 +1816,7 @@ def query_compare():
history_description = f"自动保存 - 查询{len(values)}个Key发现{len(differences)}处差异"
# 保存历史记录
save_query_history(
history_id = save_query_history(
name=history_name,
description=history_description,
pro_config=pro_config,
@@ -1750,6 +1840,13 @@ def query_compare():
differences_data=differences,
identical_data=identical_results
)
# 关联查询日志与历史记录
if history_id:
query_log_collector.set_history_id(history_id)
logger.info(f"查询历史记录保存成功: {history_name}, ID: {history_id}")
else:
logger.warning("查询历史记录保存失败无法获取history_id")
except Exception as e:
logger.warning(f"保存查询历史记录失败: {e}")
@@ -2051,5 +2148,36 @@ def api_cleanup_old_logs():
logger.error(f"清理旧日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/query-logs/history/<int:history_id>', methods=['GET'])
def api_get_query_logs_by_history(history_id):
"""根据历史记录ID获取相关查询日志"""
try:
logs = query_log_collector.get_logs_by_history_id(history_id)
# 按批次分组显示
grouped_logs = {}
batch_order = []
for log in logs:
batch_id = log.get('batch_id', 'unknown')
if batch_id not in grouped_logs:
grouped_logs[batch_id] = []
batch_order.append(batch_id)
grouped_logs[batch_id].append(log)
# 返回按时间顺序排列的批次
grouped_result = [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]
return jsonify({
'success': True,
'data': grouped_result,
'total': len(logs),
'history_id': history_id,
'grouped': True
})
except Exception as e:
logger.error(f"获取历史记录相关查询日志失败: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, port=5000)