""" 数据库管理模块 ============== 本模块负责BigDataTool项目的SQLite数据库管理,包括: 核心功能: 1. 数据库初始化和表结构创建 2. 数据库连接管理和事务处理 3. 表结构版本控制和字段动态添加 4. 数据库完整性检查和自动修复 数据表结构: - config_groups: 配置组管理(Cassandra/Redis连接配置) - query_history: 查询历史记录(单表/分表/Redis查询) - sharding_config_groups: 分表配置组(TWCS分表参数) - query_logs: 查询日志(实时操作日志和性能监控) - redis_config_groups: Redis配置组(集群连接配置) - redis_query_history: Redis查询历史(Redis数据比对记录) 设计特点: - 自动化表结构管理:支持字段动态添加和版本升级 - 向后兼容性:确保旧版本数据的正常访问 - 错误恢复:数据库损坏时自动重建表结构 - 索引优化:为查询性能优化的索引设计 使用方式: - ensure_database(): 确保数据库和表结构存在 - get_db_connection(): 获取标准的数据库连接 - init_database(): 手动初始化数据库(通常自动调用) 作者:BigDataTool项目组 更新时间:2024年8月 """ import sqlite3 import json import os import logging from datetime import datetime logger = logging.getLogger(__name__) DATABASE_PATH = 'config_groups.db' def init_database(): """ 初始化SQLite数据库和所有必要的表结构 创建以下数据表: 1. config_groups - Cassandra配置组存储 2. query_history - 查询历史记录存储 3. sharding_config_groups - 分表配置组存储 4. query_logs - 查询日志存储 5. redis_config_groups - Redis配置组存储 6. redis_query_history - Redis查询历史存储 同时创建必要的索引以优化查询性能。 Returns: bool: 初始化成功返回True,失败返回False 注意: - 使用IF NOT EXISTS确保重复调用安全 - 自动创建性能优化索引 - 支持外键约束和级联删除 """ try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() # 创建配置组表 cursor.execute(''' CREATE TABLE IF NOT EXISTS config_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, description TEXT, pro_config TEXT NOT NULL, test_config TEXT NOT NULL, query_config TEXT NOT NULL, sharding_config TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建查询历史表,包含分表配置字段 cursor.execute(''' CREATE TABLE IF NOT EXISTS query_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, pro_config TEXT NOT NULL, test_config TEXT NOT NULL, query_config TEXT NOT NULL, query_keys TEXT NOT NULL, results_summary TEXT NOT NULL, execution_time REAL NOT NULL, total_keys INTEGER NOT NULL, differences_count INTEGER NOT NULL, identical_count INTEGER NOT NULL, sharding_config TEXT, query_type TEXT DEFAULT 'single', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建分表配置组表 cursor.execute(''' CREATE TABLE IF NOT EXISTS sharding_config_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, description TEXT, pro_config TEXT NOT NULL, test_config TEXT NOT NULL, query_config TEXT NOT NULL, sharding_config TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建查询日志表 cursor.execute(''' CREATE TABLE IF NOT EXISTS query_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, batch_id TEXT NOT NULL, history_id INTEGER, timestamp TEXT NOT NULL, level TEXT NOT NULL, message TEXT NOT NULL, query_type TEXT DEFAULT 'single', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE ) ''') # 创建Redis配置组表 cursor.execute(''' CREATE TABLE IF NOT EXISTS redis_config_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, description TEXT, cluster1_config TEXT NOT NULL, cluster2_config TEXT NOT NULL, query_options TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建Redis查询历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS redis_query_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT, cluster1_config TEXT NOT NULL, cluster2_config TEXT NOT NULL, query_options TEXT NOT NULL, query_keys TEXT NOT NULL, results_summary TEXT NOT NULL, execution_time REAL NOT NULL, total_keys INTEGER NOT NULL, different_count INTEGER NOT NULL, identical_count INTEGER NOT NULL, missing_count INTEGER NOT NULL, raw_results TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)') conn.commit() conn.close() logger.info("数据库初始化完成") return True except Exception as e: logger.error(f"数据库初始化失败: {e}") return False def ensure_database(): """ 确保数据库文件和表结构完整存在 执行以下检查和操作: 1. 检查数据库文件是否存在,不存在则创建 2. 验证所有必要表是否存在,缺失则重建 3. 检查表结构是否完整,缺少字段则动态添加 4. 确保索引完整性 支持的表结构升级: - config_groups表:添加sharding_config字段 - query_history表:添加sharding_config、query_type、raw_results等字段 - query_logs表:添加history_id外键字段 Returns: bool: 数据库就绪返回True,初始化失败返回False 特性: - 向后兼容:支持从旧版本数据库升级 - 自动修复:检测到问题时自动重建 - 零停机:升级过程不影响现有数据 """ if not os.path.exists(DATABASE_PATH): logger.info("数据库文件不存在,正在创建...") return init_database() # 检查表是否存在 try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups', 'query_logs', 'redis_config_groups', 'redis_query_history')") results = cursor.fetchall() existing_tables = [row[0] for row in results] required_tables = ['config_groups', 'query_history', 'sharding_config_groups', 'query_logs', 'redis_config_groups', 'redis_query_history'] missing_tables = [table for table in required_tables if table not in existing_tables] if missing_tables: logger.info(f"数据库表不完整,缺少表:{missing_tables},正在重新创建...") return init_database() # 检查config_groups表是否有sharding_config字段 cursor.execute("PRAGMA table_info(config_groups)") columns = cursor.fetchall() column_names = [column[1] for column in columns] if 'sharding_config' not in column_names: logger.info("添加sharding_config字段到config_groups表...") cursor.execute("ALTER TABLE config_groups ADD COLUMN sharding_config TEXT") conn.commit() logger.info("sharding_config字段添加成功") # 检查query_history表是否有分表相关字段 cursor.execute("PRAGMA table_info(query_history)") history_columns = cursor.fetchall() history_column_names = [column[1] for column in history_columns] if 'sharding_config' not in history_column_names: logger.info("添加sharding_config字段到query_history表...") cursor.execute("ALTER TABLE query_history ADD COLUMN sharding_config TEXT") conn.commit() logger.info("query_history表sharding_config字段添加成功") if 'query_type' not in history_column_names: logger.info("添加query_type字段到query_history表...") cursor.execute("ALTER TABLE query_history ADD COLUMN query_type TEXT DEFAULT 'single'") conn.commit() logger.info("query_history表query_type字段添加成功") # 添加查询结果数据存储字段 if 'raw_results' not in history_column_names: logger.info("添加raw_results字段到query_history表...") cursor.execute("ALTER TABLE query_history ADD COLUMN raw_results TEXT") conn.commit() logger.info("query_history表raw_results字段添加成功") if 'differences_data' not in history_column_names: logger.info("添加differences_data字段到query_history表...") cursor.execute("ALTER TABLE query_history ADD COLUMN differences_data TEXT") conn.commit() logger.info("query_history表differences_data字段添加成功") if 'identical_data' not in history_column_names: logger.info("添加identical_data字段到query_history表...") cursor.execute("ALTER TABLE query_history ADD COLUMN identical_data TEXT") conn.commit() logger.info("query_history表identical_data字段添加成功") # 检查query_logs表是否存在history_id字段 cursor.execute("PRAGMA table_info(query_logs)") logs_columns = cursor.fetchall() logs_column_names = [column[1] for column in logs_columns] if 'history_id' not in logs_column_names: logger.info("添加history_id字段到query_logs表...") cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER") # 创建外键索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)') conn.commit() logger.info("query_logs表history_id字段添加成功") conn.close() return True except Exception as e: logger.error(f"检查数据库表失败: {e}") return init_database() def get_db_connection(): """ 获取配置好的SQLite数据库连接 返回一个配置了Row工厂的数据库连接,支持: - 字典式访问查询结果(row['column_name']) - 自动类型转换 - 标准的SQLite连接功能 Returns: sqlite3.Connection: 配置好的数据库连接对象 使用示例: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM config_groups") rows = cursor.fetchall() for row in rows: print(row['name']) # 字典式访问 conn.close() """ conn = sqlite3.connect(DATABASE_PATH) conn.row_factory = sqlite3.Row return conn