"""
数据库管理模块
==============

本模块负责BigDataTool项目的SQLite数据库管理。

核心功能:
1. 数据库初始化和表结构创建
2. 数据库连接管理和事务处理
3. 表结构版本控制和字段动态添加
4. 数据库完整性检查和自动修复

数据表结构:
- config_groups: 配置组管理(Cassandra连接配置)
- query_history: 查询历史记录(单表/分表/Redis查询)
- sharding_config_groups: 分表配置组(TWCS分表参数)
- query_logs: 查询日志(实时操作日志和性能监控)
- redis_config_groups: Redis配置组(集群连接配置)
- redis_query_history: Redis查询历史(Redis数据比对记录)

设计特点:
- 自动化表结构管理:支持字段动态添加和版本升级
- 向后兼容性:确保旧版本数据的正常访问
- 错误恢复:检查失败或缺少表时重新执行建表(CREATE TABLE IF NOT EXISTS),补齐缺失的表
- 索引优化:为查询性能优化的索引设计

使用方式:
- ensure_database(): 确保数据库和表结构存在
- get_db_connection(): 获取标准的数据库连接
- init_database(): 手动初始化数据库(通常自动调用)

作者:BigDataTool项目组
更新时间:2024年8月
"""
|
||
|
||
import sqlite3
|
||
import json
|
||
import os
|
||
import logging
|
||
from datetime import datetime
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)

# SQLite database file. The path is relative, so sqlite3 resolves it against
# the process's current working directory, not this module's directory.
DATABASE_PATH = "config_groups.db"
|
||
|
||
def init_database():
    """Create the SQLite database and every table/index this module relies on.

    Tables (all created with ``CREATE TABLE IF NOT EXISTS``, so calling this
    again on an existing database is safe and keeps existing rows):

    1. config_groups          - Cassandra config groups
    2. query_history          - query history records
    3. sharding_config_groups - sharding config groups
    4. query_logs             - query log lines, grouped by ``batch_id``
    5. redis_config_groups    - Redis config groups
    6. redis_query_history    - Redis comparison history

    ``query_history`` is created with ``raw_results``, ``differences_data``
    and ``identical_data`` already present (the same TEXT columns that
    ``ensure_database`` adds to older databases). A brand-new database
    therefore does not depend on a later migration pass to have them.

    If ``query_logs`` already exists from an older schema without
    ``history_id``, that column is added before the indexes are built.
    Otherwise ``CREATE INDEX ... ON query_logs(history_id)`` would fail.

    Foreign keys: ``query_logs.history_id`` is declared as
    ``REFERENCES query_history (id) ON DELETE CASCADE``. SQLite only enforces
    foreign keys on connections that execute ``PRAGMA foreign_keys = ON``,
    and this function does not do that.

    Returns:
        bool: True on success. False if any statement fails; the error is
        logged and the connection is still closed.
    """
    conn = None
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()

        # Cassandra config groups
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS config_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                sharding_config TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Query history. The last three columns match the ones
        # ensure_database() adds to older databases, in the same order.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS query_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                query_keys TEXT NOT NULL,
                results_summary TEXT NOT NULL,
                execution_time REAL NOT NULL,
                total_keys INTEGER NOT NULL,
                differences_count INTEGER NOT NULL,
                identical_count INTEGER NOT NULL,
                sharding_config TEXT,
                query_type TEXT DEFAULT 'single',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                raw_results TEXT,
                differences_data TEXT,
                identical_data TEXT
            )
        ''')

        # Sharding config groups
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sharding_config_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                sharding_config TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Query logs
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS query_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                batch_id TEXT NOT NULL,
                history_id INTEGER,
                timestamp TEXT NOT NULL,
                level TEXT NOT NULL,
                message TEXT NOT NULL,
                query_type TEXT DEFAULT 'single',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE
            )
        ''')

        # Redis config groups
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS redis_config_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                cluster1_config TEXT NOT NULL,
                cluster2_config TEXT NOT NULL,
                query_options TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Redis query history
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS redis_query_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                cluster1_config TEXT NOT NULL,
                cluster2_config TEXT NOT NULL,
                query_options TEXT NOT NULL,
                query_keys TEXT NOT NULL,
                results_summary TEXT NOT NULL,
                execution_time REAL NOT NULL,
                total_keys INTEGER NOT NULL,
                different_count INTEGER NOT NULL,
                identical_count INTEGER NOT NULL,
                missing_count INTEGER NOT NULL,
                raw_results TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # CREATE TABLE IF NOT EXISTS leaves an older query_logs table as-is.
        # If it predates history_id, add the column now; the index below
        # would otherwise fail with "no such column: history_id".
        cursor.execute("PRAGMA table_info(query_logs)")
        if 'history_id' not in {row[1] for row in cursor.fetchall()}:
            cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER")

        for index_sql in (
            'CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)',
            'CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)',
            'CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)',
            'CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)',
        ):
            cursor.execute(index_sql)

        conn.commit()
        logger.info("数据库初始化完成")
        return True
    except Exception as e:
        logger.error(f"数据库初始化失败: {e}")
        return False
    finally:
        # Always release the connection, including on failure.
        if conn is not None:
            conn.close()
|
||
|
||
# Tables that must exist; if any is missing, init_database() is run.
_REQUIRED_TABLES = (
    'config_groups',
    'query_history',
    'sharding_config_groups',
    'query_logs',
    'redis_config_groups',
    'redis_query_history',
)

# Columns added after a table's first schema version, applied in this order.
# Each entry: (table, column, column type/default DDL,
#              extra statements run right after the ALTER,
#              log message emitted once the column was added).
_COLUMN_MIGRATIONS = (
    ('config_groups', 'sharding_config', 'TEXT', (),
     "sharding_config字段添加成功"),
    ('query_history', 'sharding_config', 'TEXT', (),
     "query_history表sharding_config字段添加成功"),
    ('query_history', 'query_type', "TEXT DEFAULT 'single'", (),
     "query_history表query_type字段添加成功"),
    ('query_history', 'raw_results', 'TEXT', (),
     "query_history表raw_results字段添加成功"),
    ('query_history', 'differences_data', 'TEXT', (),
     "query_history表differences_data字段添加成功"),
    ('query_history', 'identical_data', 'TEXT', (),
     "query_history表identical_data字段添加成功"),
    ('query_logs', 'history_id', 'INTEGER',
     ('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)',),
     "query_logs表history_id字段添加成功"),
)


def _find_missing_tables():
    """Return the names from _REQUIRED_TABLES absent in DATABASE_PATH, in declaration order."""
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        placeholders = ', '.join('?' for _ in _REQUIRED_TABLES)
        rows = conn.execute(
            "SELECT name FROM sqlite_master "
            f"WHERE type='table' AND name IN ({placeholders})",
            _REQUIRED_TABLES,
        ).fetchall()
    finally:
        conn.close()
    existing = {row[0] for row in rows}
    return [table for table in _REQUIRED_TABLES if table not in existing]


def _apply_column_migrations(conn):
    """Add every column listed in _COLUMN_MIGRATIONS that its table still lacks."""
    cursor = conn.cursor()
    known_columns = {}
    for table, column, ddl, extra_sql, done_message in _COLUMN_MIGRATIONS:
        if table not in known_columns:
            # Table/column names come from the fixed tuple above, never from
            # user input, so interpolating them into PRAGMA/ALTER is safe.
            cursor.execute(f"PRAGMA table_info({table})")
            known_columns[table] = {row[1] for row in cursor.fetchall()}
        if column in known_columns[table]:
            continue
        logger.info(f"添加{column}字段到{table}表...")
        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")
        for statement in extra_sql:
            cursor.execute(statement)
        conn.commit()
        known_columns[table].add(column)
        logger.info(done_message)


def ensure_database():
    """Make sure the database file, all tables and all later-added columns exist.

    Steps:
    1. If the database file does not exist, create it with init_database().
    2. If any required table is missing, run init_database(). Its
       CREATE TABLE IF NOT EXISTS statements leave existing tables and rows
       untouched.
    3. Add later-added columns that are still missing:
       - config_groups.sharding_config
       - query_history.sharding_config, query_type, raw_results,
         differences_data, identical_data
       - query_logs.history_id, plus its index

    Step 3 also runs after step 1 or 2. A database that was just created, or
    that had tables recreated, ends up with the full column set in one call.
    Every connection opened here is closed, including on error paths.

    Returns:
        bool: True when the database is ready; False if init_database() fails.
        If the table check or a migration raises, the error is logged and the
        result of a fallback init_database() call is returned.
    """
    if not os.path.exists(DATABASE_PATH):
        logger.info("数据库文件不存在,正在创建...")
        if not init_database():
            return False

    try:
        missing_tables = _find_missing_tables()
        if missing_tables:
            logger.info(f"数据库表不完整,缺少表:{missing_tables},正在重新创建...")
            if not init_database():
                return False

        conn = sqlite3.connect(DATABASE_PATH)
        try:
            _apply_column_migrations(conn)
        finally:
            conn.close()
        return True
    except Exception as e:
        logger.error(f"检查数据库表失败: {e}")
        return init_database()
|
||
|
||
def get_db_connection():
    """Open a new connection to DATABASE_PATH that yields ``sqlite3.Row`` rows.

    ``sqlite3.Row`` lets a row be read by column name (``row['name']``) as
    well as by position. Each call returns a fresh connection that the caller
    owns and is responsible for closing.

    No PRAGMAs are set here. In particular ``foreign_keys`` is left at
    SQLite's default, so the ``ON DELETE CASCADE`` declared on
    ``query_logs.history_id`` is not enforced through this connection unless
    the caller enables it.

    Returns:
        sqlite3.Connection: the open connection.

    Example:
        conn = get_db_connection()
        try:
            for row in conn.execute("SELECT * FROM config_groups"):
                print(row['name'])
        finally:
            conn.close()
    """
    connection = sqlite3.connect(DATABASE_PATH)
    connection.row_factory = sqlite3.Row  # name-based column access
    return connection