"""
|
||
查询日志管理模块
|
||
================
|
||
|
||
本模块提供BigDataTool的完整查询日志管理功能,支持实时日志收集和历史日志分析。
|
||
|
||
核心功能:
|
||
1. 实时日志收集:自动收集所有查询操作的详细日志
|
||
2. 批次管理:按查询批次组织日志,便于追踪完整的查询流程
|
||
3. 双重存储:内存缓存 + SQLite持久化存储
|
||
4. 历史关联:将日志与查询历史记录关联,支持完整的操作回溯
|
||
5. 性能监控:记录查询时间、记录数等性能指标
|
||
|
||
日志收集特性:
|
||
- 多级日志:支持INFO、WARNING、ERROR等日志级别
|
||
- 批次追踪:每个查询批次分配唯一ID,便于日志分组
|
||
- 时间戳:精确到毫秒的时间戳记录
|
||
- 查询类型:区分单表、分表、Redis等不同查询类型
|
||
- 历史关联:支持日志与查询历史记录的双向关联
|
||
|
||
存储策略:
|
||
- 内存缓存:最近的日志保存在内存中,支持快速访问
|
||
- 数据库持久化:所有日志自动保存到SQLite数据库
|
||
- 容量控制:内存缓存有容量限制,自动清理旧日志
|
||
- 事务安全:数据库写入失败不影响程序运行
|
||
|
||
查询和分析:
|
||
- 按批次查询:支持按查询批次获取相关日志
|
||
- 按历史记录查询:支持按历史记录ID获取相关日志
|
||
- 分页支持:大量日志的分页显示
|
||
- 时间范围:支持按时间范围筛选日志
|
||
- 日志清理:支持按时间清理旧日志
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
import sqlite3
import logging
from datetime import datetime, timedelta

from .database import DATABASE_PATH

logger = logging.getLogger(__name__)

class QueryLogCollector:
    """Collects query logs in memory and persists them to SQLite."""

    def __init__(self, max_logs=1000, db_path=None):
        self.logs = []  # In-memory log cache
        self.max_logs = max_logs
        self.current_batch_id = None
        self.batch_counter = 0
        self.current_query_type = 'single'
        self.current_history_id = None  # History record ID linked to the current batch
        self.db_path = db_path or DATABASE_PATH

    def start_new_batch(self, query_type='single'):
        """Start a new query batch and return its ID."""
        self.batch_counter += 1
        self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}"
        self.current_query_type = query_type
        self.current_history_id = None  # Reset the linked history record ID

        # Mark the start of the batch
        self.add_log('INFO', f"=== Starting {query_type} query batch (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
        return self.current_batch_id

    def set_history_id(self, history_id):
        """Link the current batch to a query history record."""
        self.current_history_id = history_id
        if self.current_batch_id and history_id:
            self.add_log('INFO', f"Linked to history record ID: {history_id}", force_batch_id=self.current_batch_id)
            # Backfill history_id on all logs already written for this batch
            self._update_batch_history_id(self.current_batch_id, history_id)

    def _update_batch_history_id(self, batch_id, history_id):
        """Backfill history_id on every log row in the given batch."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()

            cursor.execute('''
                UPDATE query_logs
                SET history_id = ?
                WHERE batch_id = ?
            ''', (history_id, batch_id))

            conn.commit()
            conn.close()
            logger.info(f"Linked batch {batch_id} to history record {history_id}")
        except Exception as e:
            print(f"Warning: Failed to update batch history_id: {e}")

    def end_current_batch(self):
        """End the current query batch."""
        if self.current_batch_id:
            self.add_log('INFO', f"=== Query batch finished (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id)
            self.current_batch_id = None
            self.current_history_id = None

    def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None):
        """Append a log entry to the in-memory cache and persist it to the database."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]  # millisecond precision
        batch_id = force_batch_id or self.current_batch_id
        query_type = force_query_type or self.current_query_type
        history_id = force_history_id or self.current_history_id

        log_entry = {
            'timestamp': timestamp,
            'level': level,
            'message': message,
            'batch_id': batch_id,
            'query_type': query_type,
            'history_id': history_id
        }

        # Append to the in-memory cache, evicting the oldest entry when full
        self.logs.append(log_entry)
        if len(self.logs) > self.max_logs:
            self.logs.pop(0)

        # Persist to the database
        self._save_log_to_db(log_entry)

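    # The force_* arguments let a caller override the collector's current
    # batch context for a single entry (the values below are hypothetical):
    #
    #     collector.add_log('WARNING', 'Retrying shard 2',
    #                       force_batch_id='batch_7_093012',
    #                       force_query_type='sharding')
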
    def _save_log_to_db(self, log_entry):
        """Persist a single log entry to the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()

            cursor.execute('''
                INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                log_entry['batch_id'],
                log_entry['history_id'],
                log_entry['timestamp'],
                log_entry['level'],
                log_entry['message'],
                log_entry['query_type']
            ))

            conn.commit()
            conn.close()
        except Exception as e:
            # A failed database write is reported to the console but must not
            # interrupt the running program
            print(f"Warning: Failed to save log to database: {e}")

    def get_logs(self, limit=None, from_db=True):
        """Return logs, either from the database or from the in-memory cache."""
        if from_db:
            return self._get_logs_from_db(limit)
        else:
            # Serve from the in-memory cache
            if limit:
                return self.logs[-limit:]
            return self.logs

    def _get_logs_from_db(self, limit=None):
        """Fetch logs from the database in chronological order."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            query = '''
                SELECT batch_id, history_id, timestamp, level, message, query_type
                FROM query_logs
                ORDER BY id DESC
            '''

            if limit:
                query += ' LIMIT ?'
                cursor.execute(query, (limit,))
            else:
                cursor.execute(query)
            rows = cursor.fetchall()

            # DESC + LIMIT selects the newest rows; convert them to dicts and
            # reverse so the result reads oldest-first
            logs = []
            for row in reversed(rows):
                logs.append({
                    'batch_id': row['batch_id'],
                    'history_id': row['history_id'],
                    'timestamp': row['timestamp'],
                    'level': row['level'],
                    'message': row['message'],
                    'query_type': row['query_type']
                })

            conn.close()
            return logs
        except Exception as e:
            print(f"Warning: Failed to get logs from database: {e}")
            # Fall back to the in-memory cache if the database read fails
            return self.get_logs(limit, from_db=False)

    def _get_total_logs_count(self):
        """Return the total number of logs stored in the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM query_logs')
            count = cursor.fetchone()[0]
            conn.close()
            return count
        except Exception as e:
            print(f"Warning: Failed to get logs count from database: {e}")
            return len(self.logs)

    def get_logs_by_history_id(self, history_id):
        """Return all logs linked to the given query history record."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            cursor.execute('''
                SELECT batch_id, history_id, timestamp, level, message, query_type
                FROM query_logs
                WHERE history_id = ?
                ORDER BY id ASC
            ''', (history_id,))

            rows = cursor.fetchall()
            logs = []
            for row in rows:
                logs.append({
                    'batch_id': row['batch_id'],
                    'history_id': row['history_id'],
                    'timestamp': row['timestamp'],
                    'level': row['level'],
                    'message': row['message'],
                    'query_type': row['query_type']
                })

            conn.close()
            return logs
        except Exception as e:
            print(f"Warning: Failed to get logs by history_id: {e}")
            return []

    def get_logs_grouped_by_batch(self, limit=None, from_db=True):
        """Return logs grouped by batch, preserving batch order."""
        logs = self.get_logs(limit, from_db)
        grouped_logs = {}
        batch_order = []

        for log in logs:
            batch_id = log.get('batch_id', 'unknown')
            if batch_id not in grouped_logs:
                grouped_logs[batch_id] = []
                batch_order.append(batch_id)
            grouped_logs[batch_id].append(log)

        # Return the batches in chronological order
        return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]

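    # The grouped result is a list of (batch_id, logs) tuples in chronological
    # order. A minimal consumption sketch (variable names are illustrative):
    #
    #     for batch_id, batch_logs in collector.get_logs_grouped_by_batch(limit=200):
    #         print(f"--- {batch_id}: {len(batch_logs)} entries ---")
    #         for entry in batch_logs:
    #             print(entry['timestamp'], entry['level'], entry['message'])
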
    def clear_logs(self, clear_db=True):
        """Clear all logs from memory and, optionally, from the database."""
        # Reset the in-memory state
        self.logs.clear()
        self.current_batch_id = None
        self.current_history_id = None
        self.batch_counter = 0

        # Clear the database
        if clear_db:
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cursor = conn.cursor()
                cursor.execute('DELETE FROM query_logs')
                conn.commit()
                conn.close()
            except Exception as e:
                print(f"Warning: Failed to clear logs from database: {e}")

    def cleanup_old_logs(self, days_to_keep=30):
        """Delete logs older than the given number of days."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()

            # Delete logs whose creation time is past the cutoff
            cutoff_date = datetime.now() - timedelta(days=days_to_keep)
            cursor.execute('''
                DELETE FROM query_logs
                WHERE created_at < ?
            ''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))

            deleted_count = cursor.rowcount
            conn.commit()
            conn.close()

            logger.info(f"Deleted {deleted_count} logs older than {days_to_keep} days")
            return deleted_count
        except Exception as e:
            logger.error(f"Failed to clean up old logs: {e}")
            return 0

# Custom logging handler that feeds standard logging records into the collector
class CollectorHandler(logging.Handler):
    def __init__(self, collector):
        super().__init__()
        self.collector = collector

    def emit(self, record):
        # Handlers should never raise; route collector failures through the
        # standard logging error hook
        try:
            self.collector.add_log(record.levelname, record.getMessage())
        except Exception:
            self.handleError(record)
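
# Wiring sketch (assumed usage; the actual bootstrap lives elsewhere in
# BigDataTool): attach CollectorHandler to a logger so that standard logging
# calls are captured by the collector as well.
#
#     collector = QueryLogCollector()
#     root = logging.getLogger()
#     root.addHandler(CollectorHandler(collector))
#     root.setLevel(logging.INFO)
#     logging.info("this message now also lands in collector.logs")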