"""
查询日志管理模块
================
本模块提供BigDataTool的完整查询日志管理功能支持实时日志收集和历史日志分析。
核心功能:
1. 实时日志收集:自动收集所有查询操作的详细日志
2. 批次管理:按查询批次组织日志,便于追踪完整的查询流程
3. 双重存储:内存缓存 + SQLite持久化存储
4. 历史关联:将日志与查询历史记录关联,支持完整的操作回溯
5. 性能监控:记录查询时间、记录数等性能指标
日志收集特性:
- 多级日志支持INFO、WARNING、ERROR等日志级别
- 批次追踪每个查询批次分配唯一ID便于日志分组
- 时间戳:精确到毫秒的时间戳记录
- 查询类型区分单表、分表、Redis等不同查询类型
- 历史关联:支持日志与查询历史记录的双向关联
存储策略:
- 内存缓存:最近的日志保存在内存中,支持快速访问
- 数据库持久化所有日志自动保存到SQLite数据库
- 容量控制:内存缓存有容量限制,自动清理旧日志
- 事务安全:数据库写入失败不影响程序运行
查询和分析:
- 按批次查询:支持按查询批次获取相关日志
- 按历史记录查询支持按历史记录ID获取相关日志
- 分页支持:大量日志的分页显示
- 时间范围:支持按时间范围筛选日志
- 日志清理:支持按时间清理旧日志
作者BigDataTool项目组
更新时间2024年8月
"""
import sqlite3
import logging
from datetime import datetime, timedelta

from .database import DATABASE_PATH

logger = logging.getLogger(__name__)


class QueryLogCollector:
    """Collects query logs in memory and persists them to SQLite.

    Logs are grouped into query batches so that a complete query flow can be
    traced, and each batch can optionally be linked to a query history record.
    """

    def __init__(self, max_logs=1000, db_path=None):
        self.logs = []  # In-memory log cache
        self.max_logs = max_logs
        self.current_batch_id = None
        self.batch_counter = 0
        self.current_query_type = 'single'
        self.current_history_id = None  # History record ID linked to the current batch
        self.db_path = db_path or DATABASE_PATH

    def start_new_batch(self, query_type='single'):
        """Start a new query batch and return its batch ID."""
        self.batch_counter += 1
        self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}"
        self.current_query_type = query_type
        self.current_history_id = None  # Reset the linked history record ID
        # Mark the start of the batch
        self.add_log('INFO', f"=== Started {query_type} query batch (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
        return self.current_batch_id

    def set_history_id(self, history_id):
        """Link the current batch to a query history record ID."""
        self.current_history_id = history_id
        if self.current_batch_id and history_id:
            self.add_log('INFO', f"Linked history record ID: {history_id}", force_batch_id=self.current_batch_id)
            # Back-fill history_id on all logs already written for this batch
            self._update_batch_history_id(self.current_batch_id, history_id)

    def _update_batch_history_id(self, batch_id, history_id):
        """Set history_id on every log row belonging to a batch."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE query_logs
                SET history_id = ?
                WHERE batch_id = ?
            ''', (history_id, batch_id))
            conn.commit()
            conn.close()
            logger.info(f"Linked logs of batch {batch_id} to history record {history_id}")
        except Exception as e:
            print(f"Warning: Failed to update batch history_id: {e}")

    def end_current_batch(self):
        """End the current query batch."""
        if self.current_batch_id:
            self.add_log('INFO', f"=== Query batch finished (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
            self.current_batch_id = None
            self.current_history_id = None

    def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None):
        """Append a log entry to the in-memory cache and persist it to the database."""
        # Millisecond-precision timestamp
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        batch_id = force_batch_id or self.current_batch_id
        query_type = force_query_type or self.current_query_type
        history_id = force_history_id or self.current_history_id
        log_entry = {
            'timestamp': timestamp,
            'level': level,
            'message': message,
            'batch_id': batch_id,
            'query_type': query_type,
            'history_id': history_id
        }
        # Add to the in-memory cache, evicting the oldest entry when full
        self.logs.append(log_entry)
        if len(self.logs) > self.max_logs:
            self.logs.pop(0)
        # Persist to the database
        self._save_log_to_db(log_entry)

    def _save_log_to_db(self, log_entry):
        """Persist a single log entry to the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                log_entry['batch_id'],
                log_entry['history_id'],
                log_entry['timestamp'],
                log_entry['level'],
                log_entry['message'],
                log_entry['query_type']
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            # A failed database write is reported on the console but never
            # interrupts the program
            print(f"Warning: Failed to save log to database: {e}")

    def get_logs(self, limit=None, from_db=True):
        """Fetch logs, either from the database or from the in-memory cache."""
        if from_db:
            return self._get_logs_from_db(limit)
        # Fall back to the in-memory cache
        if limit:
            return self.logs[-limit:]
        return self.logs

    def _get_logs_from_db(self, limit=None):
        """Fetch logs from the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            query = '''
                SELECT batch_id, history_id, timestamp, level, message, query_type
                FROM query_logs
                ORDER BY id DESC
            '''
            if limit:
                # Cast to int so the interpolated LIMIT cannot inject SQL
                query += f' LIMIT {int(limit)}'
            cursor.execute(query)
            rows = cursor.fetchall()
            # Rows arrive newest-first; reverse them so the result reads in
            # chronological order
            logs = []
            for row in reversed(rows):
                logs.append({
                    'batch_id': row['batch_id'],
                    'history_id': row['history_id'],
                    'timestamp': row['timestamp'],
                    'level': row['level'],
                    'message': row['message'],
                    'query_type': row['query_type']
                })
            conn.close()
            return logs
        except Exception as e:
            print(f"Warning: Failed to get logs from database: {e}")
            # If the database read fails, fall back to the in-memory logs
            return self.get_logs(limit, from_db=False)

    def _get_total_logs_count(self):
        """Return the total number of logs stored in the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM query_logs')
            count = cursor.fetchone()[0]
            conn.close()
            return count
        except Exception as e:
            print(f"Warning: Failed to get logs count from database: {e}")
            return len(self.logs)

    def get_logs_by_history_id(self, history_id):
        """Fetch all logs linked to a given query history record ID."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT batch_id, history_id, timestamp, level, message, query_type
                FROM query_logs
                WHERE history_id = ?
                ORDER BY id ASC
            ''', (history_id,))
            rows = cursor.fetchall()
            logs = []
            for row in rows:
                logs.append({
                    'batch_id': row['batch_id'],
                    'history_id': row['history_id'],
                    'timestamp': row['timestamp'],
                    'level': row['level'],
                    'message': row['message'],
                    'query_type': row['query_type']
                })
            conn.close()
            return logs
        except Exception as e:
            print(f"Warning: Failed to get logs by history_id: {e}")
            return []

    def get_logs_grouped_by_batch(self, limit=None, from_db=True):
        """Fetch logs grouped by query batch."""
        logs = self.get_logs(limit, from_db)
        grouped_logs = {}
        batch_order = []
        for log in logs:
            batch_id = log.get('batch_id', 'unknown')
            if batch_id not in grouped_logs:
                grouped_logs[batch_id] = []
                batch_order.append(batch_id)
            grouped_logs[batch_id].append(log)
        # Return the batches in the order they first appear (chronological)
        return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]

    def clear_logs(self, clear_db=True):
        """Clear all logs."""
        # Clear the in-memory cache and reset batch state
        self.logs.clear()
        self.current_batch_id = None
        self.batch_counter = 0
        # Clear the database
        if clear_db:
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cursor = conn.cursor()
                cursor.execute('DELETE FROM query_logs')
                conn.commit()
                conn.close()
            except Exception as e:
                print(f"Warning: Failed to clear logs from database: {e}")

    def cleanup_old_logs(self, days_to_keep=30):
        """Delete logs older than the given number of days; return the number removed."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            # Delete logs older than the cutoff date
            cutoff_date = datetime.now() - timedelta(days=days_to_keep)
            cursor.execute('''
                DELETE FROM query_logs
                WHERE created_at < ?
            ''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))
            deleted_count = cursor.rowcount
            conn.commit()
            conn.close()
            logger.info(f"Removed {deleted_count} logs older than {days_to_keep} days")
            return deleted_count
        except Exception as e:
            logger.error(f"Failed to clean up old logs: {e}")
            return 0


# Custom logging handler that forwards standard logging records to the collector
class CollectorHandler(logging.Handler):
    def __init__(self, collector):
        super().__init__()
        self.collector = collector

    def emit(self, record):
        self.collector.add_log(record.levelname, record.getMessage())
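

# Minimal self-test sketch (an addition for illustration, not part of the
# original module): attach a CollectorHandler to a standard logger so ordinary
# logging calls are captured by the collector alongside direct add_log()
# entries. Database writes fail soft (see _save_log_to_db), so this runs even
# without the query_logs table.
if __name__ == '__main__':
    collector = QueryLogCollector(max_logs=100)
    demo_logger = logging.getLogger('demo')
    demo_logger.setLevel(logging.INFO)
    demo_logger.addHandler(CollectorHandler(collector))

    collector.start_new_batch(query_type='single')
    demo_logger.info('This record is captured by the collector')
    collector.end_current_batch()

    # Read back from the in-memory cache to avoid any database dependency
    for batch_id, batch_logs in collector.get_logs_grouped_by_batch(from_db=False):
        print(batch_id, len(batch_logs))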