""" 查询日志管理模块 ================ 本模块提供BigDataTool的完整查询日志管理功能,支持实时日志收集和历史日志分析。 核心功能: 1. 实时日志收集:自动收集所有查询操作的详细日志 2. 批次管理:按查询批次组织日志,便于追踪完整的查询流程 3. 双重存储:内存缓存 + SQLite持久化存储 4. 历史关联:将日志与查询历史记录关联,支持完整的操作回溯 5. 性能监控:记录查询时间、记录数等性能指标 日志收集特性: - 多级日志:支持INFO、WARNING、ERROR等日志级别 - 批次追踪:每个查询批次分配唯一ID,便于日志分组 - 时间戳:精确到毫秒的时间戳记录 - 查询类型:区分单表、分表、Redis等不同查询类型 - 历史关联:支持日志与查询历史记录的双向关联 存储策略: - 内存缓存:最近的日志保存在内存中,支持快速访问 - 数据库持久化:所有日志自动保存到SQLite数据库 - 容量控制:内存缓存有容量限制,自动清理旧日志 - 事务安全:数据库写入失败不影响程序运行 查询和分析: - 按批次查询:支持按查询批次获取相关日志 - 按历史记录查询:支持按历史记录ID获取相关日志 - 分页支持:大量日志的分页显示 - 时间范围:支持按时间范围筛选日志 - 日志清理:支持按时间清理旧日志 作者:BigDataTool项目组 更新时间:2024年8月 """ import sqlite3 import logging from datetime import datetime, timedelta from .database import DATABASE_PATH logger = logging.getLogger(__name__) class QueryLogCollector: def __init__(self, max_logs=1000, db_path=None): self.logs = [] # 内存中的日志缓存 self.max_logs = max_logs self.current_batch_id = None self.batch_counter = 0 self.current_query_type = 'single' self.current_history_id = None # 当前关联的历史记录ID self.db_path = db_path or DATABASE_PATH def start_new_batch(self, query_type='single'): """开始新的查询批次""" self.batch_counter += 1 self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}" self.current_query_type = query_type self.current_history_id = None # 重置历史记录ID # 添加批次开始标记 self.add_log('INFO', f"=== 开始{query_type}查询批次 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) return self.current_batch_id def set_history_id(self, history_id): """设置当前批次关联的历史记录ID""" self.current_history_id = history_id if self.current_batch_id and history_id: self.add_log('INFO', f"关联历史记录ID: {history_id}", force_batch_id=self.current_batch_id) # 更新当前批次的所有日志记录的history_id self._update_batch_history_id(self.current_batch_id, history_id) def _update_batch_history_id(self, batch_id, history_id): """更新批次中所有日志的history_id""" try: conn = sqlite3.connect(self.db_path, timeout=30) cursor = conn.cursor() cursor.execute(''' UPDATE query_logs SET history_id = ? WHERE batch_id = ? ''', (history_id, batch_id)) conn.commit() conn.close() logger.info(f"已更新批次 {batch_id} 的历史记录关联到 {history_id}") except Exception as e: print(f"Warning: Failed to update batch history_id: {e}") def end_current_batch(self): """结束当前查询批次""" if self.current_batch_id: self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) self.current_batch_id = None self.current_history_id = None def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None): """添加日志到内存和数据库""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] batch_id = force_batch_id or self.current_batch_id query_type = force_query_type or self.current_query_type history_id = force_history_id or self.current_history_id log_entry = { 'timestamp': timestamp, 'level': level, 'message': message, 'batch_id': batch_id, 'query_type': query_type, 'history_id': history_id } # 添加到内存缓存 self.logs.append(log_entry) if len(self.logs) > self.max_logs: self.logs.pop(0) # 保存到数据库 self._save_log_to_db(log_entry) def _save_log_to_db(self, log_entry): """将日志保存到数据库""" try: conn = sqlite3.connect(self.db_path, timeout=30) cursor = conn.cursor() cursor.execute(''' INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type) VALUES (?, ?, ?, ?, ?, ?) 
    def _save_log_to_db(self, log_entry):
        """Persist a single log entry to the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                log_entry['batch_id'],
                log_entry['history_id'],
                log_entry['timestamp'],
                log_entry['level'],
                log_entry['message'],
                log_entry['query_type']
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            # On write failure, report to the console but keep the program running
            print(f"Warning: Failed to save log to database: {e}")

    def get_logs(self, limit=None, from_db=True):
        """Fetch logs from the database or from the in-memory cache."""
        if from_db:
            return self._get_logs_from_db(limit)
        # Fetch from memory
        if limit:
            return self.logs[-limit:]
        return self.logs

    def _get_logs_from_db(self, limit=None):
        """Fetch logs from the database, oldest first."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            query = '''
                SELECT batch_id, history_id, timestamp, level, message, query_type
                FROM query_logs
                ORDER BY id DESC
            '''
            params = ()
            if limit:
                query += ' LIMIT ?'
                params = (limit,)
            cursor.execute(query, params)
            rows = cursor.fetchall()
            # Convert rows to dicts and reverse back to chronological order
            logs = []
            for row in reversed(rows):
                logs.append({
                    'batch_id': row['batch_id'],
                    'history_id': row['history_id'],
                    'timestamp': row['timestamp'],
                    'level': row['level'],
                    'message': row['message'],
                    'query_type': row['query_type']
                })
            conn.close()
            return logs
        except Exception as e:
            print(f"Warning: Failed to get logs from database: {e}")
            # Fall back to the in-memory logs if the database read fails
            return self.get_logs(limit, from_db=False)

    def _get_total_logs_count(self):
        """Return the total number of logs stored in the database."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM query_logs')
            count = cursor.fetchone()[0]
            conn.close()
            return count
        except Exception as e:
            print(f"Warning: Failed to get logs count from database: {e}")
            return len(self.logs)

    def get_logs_by_history_id(self, history_id):
        """Fetch all logs associated with a query history record ID."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT batch_id, history_id, timestamp, level, message, query_type
                FROM query_logs
                WHERE history_id = ?
                ORDER BY id ASC
            ''', (history_id,))
            rows = cursor.fetchall()
            logs = []
            for row in rows:
                logs.append({
                    'batch_id': row['batch_id'],
                    'history_id': row['history_id'],
                    'timestamp': row['timestamp'],
                    'level': row['level'],
                    'message': row['message'],
                    'query_type': row['query_type']
                })
            conn.close()
            return logs
        except Exception as e:
            print(f"Warning: Failed to get logs by history_id: {e}")
            return []

    def get_logs_grouped_by_batch(self, limit=None, from_db=True):
        """Fetch logs grouped by batch, preserving batch order."""
        logs = self.get_logs(limit, from_db)
        grouped_logs = {}
        batch_order = []
        for log in logs:
            batch_id = log.get('batch_id', 'unknown')
            if batch_id not in grouped_logs:
                grouped_logs[batch_id] = []
                batch_order.append(batch_id)
            grouped_logs[batch_id].append(log)
        # Return batches in chronological order
        return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order]
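    # Illustrative shape of the value returned by get_logs_grouped_by_batch()
    # (batch IDs and entries below are hypothetical):
    #
    #   [('batch_1_093015', [{'timestamp': '2024-08-01 09:30:15.123',
    #                         'level': 'INFO', 'message': '...',
    #                         'batch_id': 'batch_1_093015',
    #                         'query_type': 'single', 'history_id': None}, ...]),
    #    ('batch_2_093120', [...])]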
    def clear_logs(self, clear_db=True):
        """Clear logs from memory and, optionally, from the database."""
        # Reset the in-memory state
        self.logs.clear()
        self.current_batch_id = None
        self.current_history_id = None
        self.batch_counter = 0
        # Clear the database
        if clear_db:
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cursor = conn.cursor()
                cursor.execute('DELETE FROM query_logs')
                conn.commit()
                conn.close()
            except Exception as e:
                print(f"Warning: Failed to clear logs from database: {e}")

    def cleanup_old_logs(self, days_to_keep=30):
        """Delete old logs, keeping only the most recent days_to_keep days."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cursor = conn.cursor()
            # Delete logs older than the cutoff date
            cutoff_date = datetime.now() - timedelta(days=days_to_keep)
            cursor.execute('''
                DELETE FROM query_logs WHERE created_at < ?
            ''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))
            deleted_count = cursor.rowcount
            conn.commit()
            conn.close()
            logger.info(f"Deleted {deleted_count} logs older than {days_to_keep} days")
            return deleted_count
        except Exception as e:
            logger.error(f"Failed to clean up old logs: {e}")
            return 0


class CollectorHandler(logging.Handler):
    """Custom logging handler that forwards standard logging records to a QueryLogCollector."""

    def __init__(self, collector):
        super().__init__()
        self.collector = collector

    def emit(self, record):
        try:
            self.collector.add_log(record.levelname, record.getMessage())
        except Exception:
            # Follow the logging.Handler convention: never raise from emit()
            self.handleError(record)
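# --- Usage sketch (hypothetical; not part of the application wiring) ---
# Shows the intended batch lifecycle and how CollectorHandler bridges the
# standard logging module into the collector. The demo db_path and history ID
# are made up; if the query_logs table does not exist yet, the collector only
# prints a warning and keeps running, so the sketch is safe to execute. Because
# of the relative import above, run it with `python -m <package>.<module>`.
if __name__ == '__main__':
    collector = QueryLogCollector(max_logs=500, db_path='demo_query_logs.db')

    # Forward standard logging records into the collector
    logging.getLogger().addHandler(CollectorHandler(collector))
    logging.getLogger().setLevel(logging.INFO)

    collector.start_new_batch(query_type='single')
    collector.add_log('INFO', 'Executing sample query...')
    collector.set_history_id(42)  # 42 is a hypothetical history record ID
    collector.end_current_batch()

    # Read back from the in-memory cache (from_db=False needs no SQLite state)
    for entry in collector.get_logs(limit=10, from_db=False):
        print(entry['timestamp'], entry['level'], entry['message'], entry['batch_id'])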