Files
BigDataTool/modules/database.py
2025-08-05 11:23:49 +08:00

318 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据库管理模块
==============
本模块负责BigDataTool项目的SQLite数据库管理包括
核心功能:
1. 数据库初始化和表结构创建
2. 数据库连接管理和事务处理
3. 表结构版本控制和字段动态添加
4. 数据库完整性检查和自动修复
数据表结构:
- config_groups: 配置组管理Cassandra/Redis连接配置
- query_history: 查询历史记录(单表/分表/Redis查询
- sharding_config_groups: 分表配置组TWCS分表参数
- query_logs: 查询日志(实时操作日志和性能监控)
- redis_config_groups: Redis配置组集群连接配置
- redis_query_history: Redis查询历史Redis数据比对记录
设计特点:
- 自动化表结构管理:支持字段动态添加和版本升级
- 向后兼容性:确保旧版本数据的正常访问
- 错误恢复:数据库损坏时自动重建表结构
- 索引优化:为查询性能优化的索引设计
使用方式:
- ensure_database(): 确保数据库和表结构存在
- get_db_connection(): 获取标准的数据库连接
- init_database(): 手动初始化数据库(通常自动调用)
作者BigDataTool项目组
更新时间2024年8月
"""
import sqlite3
import json
import os
import logging
from datetime import datetime
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Path of the SQLite database file. The path is relative, so it is resolved
# against the process's current working directory when a connection is opened.
DATABASE_PATH = 'config_groups.db'
def init_database():
    """Create the SQLite database file plus every table and index the tool uses.

    Tables created (each with ``CREATE TABLE IF NOT EXISTS``, so calling this
    function repeatedly is safe and never drops existing data):

    1. ``config_groups`` - Cassandra configuration groups
    2. ``query_history`` - query history records
    3. ``sharding_config_groups`` - sharding configuration groups
    4. ``query_logs`` - query logs
    5. ``redis_config_groups`` - Redis configuration groups
    6. ``redis_query_history`` - Redis query history

    Four indexes on ``query_logs`` (batch_id, history_id, timestamp, level)
    are created as well.

    Returns:
        bool: True when every statement succeeded, False when any step failed
        (the error is logged, not raised).

    Notes:
        - ``query_history`` is created with the ``raw_results``,
          ``differences_data`` and ``identical_data`` columns already in
          place, positioned after ``created_at``. That is exactly where the
          ``ALTER TABLE`` migrations in ``ensure_database()`` append them, so
          a brand-new database has the same layout as an upgraded one.
        - ``query_logs.history_id`` is declared with ``ON DELETE CASCADE``.
          SQLite only enforces foreign keys on connections that run
          ``PRAGMA foreign_keys = ON``; this function does not set it.
        - The connection is always closed, including when a statement fails.
    """
    table_statements = (
        # Configuration groups.
        '''
        CREATE TABLE IF NOT EXISTS config_groups (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            pro_config TEXT NOT NULL,
            test_config TEXT NOT NULL,
            query_config TEXT NOT NULL,
            sharding_config TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Query history, including sharding fields and stored result payloads.
        '''
        CREATE TABLE IF NOT EXISTS query_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT,
            pro_config TEXT NOT NULL,
            test_config TEXT NOT NULL,
            query_config TEXT NOT NULL,
            query_keys TEXT NOT NULL,
            results_summary TEXT NOT NULL,
            execution_time REAL NOT NULL,
            total_keys INTEGER NOT NULL,
            differences_count INTEGER NOT NULL,
            identical_count INTEGER NOT NULL,
            sharding_config TEXT,
            query_type TEXT DEFAULT 'single',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            raw_results TEXT,
            differences_data TEXT,
            identical_data TEXT
        )
        ''',
        # Sharding configuration groups.
        '''
        CREATE TABLE IF NOT EXISTS sharding_config_groups (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            pro_config TEXT NOT NULL,
            test_config TEXT NOT NULL,
            query_config TEXT NOT NULL,
            sharding_config TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Query logs, optionally linked to a query_history row.
        '''
        CREATE TABLE IF NOT EXISTS query_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch_id TEXT NOT NULL,
            history_id INTEGER,
            timestamp TEXT NOT NULL,
            level TEXT NOT NULL,
            message TEXT NOT NULL,
            query_type TEXT DEFAULT 'single',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE
        )
        ''',
        # Redis configuration groups.
        '''
        CREATE TABLE IF NOT EXISTS redis_config_groups (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            cluster1_config TEXT NOT NULL,
            cluster2_config TEXT NOT NULL,
            query_options TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Redis query history.
        '''
        CREATE TABLE IF NOT EXISTS redis_query_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT,
            cluster1_config TEXT NOT NULL,
            cluster2_config TEXT NOT NULL,
            query_options TEXT NOT NULL,
            query_keys TEXT NOT NULL,
            results_summary TEXT NOT NULL,
            execution_time REAL NOT NULL,
            total_keys INTEGER NOT NULL,
            different_count INTEGER NOT NULL,
            identical_count INTEGER NOT NULL,
            missing_count INTEGER NOT NULL,
            raw_results TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
    )
    index_statements = (
        'CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)',
        'CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)',
        'CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)',
        'CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)',
    )

    try:
        conn = sqlite3.connect(DATABASE_PATH)
        try:
            cursor = conn.cursor()
            # Tables first: the indexes reference query_logs.
            for statement in table_statements + index_statements:
                cursor.execute(statement)
            conn.commit()
        finally:
            # Release the file handle even when a statement above failed.
            conn.close()
    except Exception as e:
        logger.error(f"数据库初始化失败: {e}")
        return False

    logger.info("数据库初始化完成")
    return True
def ensure_database():
    """Make sure the database file, its tables and its newer columns exist.

    Steps:

    1. If the database file is missing, delegate to ``init_database()``.
    2. If any required table is missing, delegate to ``init_database()``
       (which only creates what is absent).
    3. Otherwise add columns that older database files lack:

       - ``config_groups.sharding_config``
       - ``query_history``: ``sharding_config``, ``query_type``,
         ``raw_results``, ``differences_data``, ``identical_data``
       - ``query_logs.history_id`` (plus its index)

    Returns:
        bool: True when the existing database was checked/upgraded
        successfully; otherwise the result of ``init_database()``, which is
        also used as the fallback when the check itself raises.

    Notes:
        - Each added column is committed on its own, so an interruption keeps
          the columns that were already added.
        - The connection opened for the check is always closed before this
          function returns or falls back to ``init_database()``.
    """
    required_tables = [
        'config_groups',
        'query_history',
        'sharding_config_groups',
        'query_logs',
        'redis_config_groups',
        'redis_query_history',
    ]
    # (column name, column definition) pairs that old query_history tables may lack.
    history_column_upgrades = (
        ('sharding_config', 'sharding_config TEXT'),
        ('query_type', "query_type TEXT DEFAULT 'single'"),
        ('raw_results', 'raw_results TEXT'),
        ('differences_data', 'differences_data TEXT'),
        ('identical_data', 'identical_data TEXT'),
    )

    if not os.path.exists(DATABASE_PATH):
        logger.info("数据库文件不存在,正在创建...")
        return init_database()

    try:
        conn = sqlite3.connect(DATABASE_PATH)
        try:
            cursor = conn.cursor()

            # Single source of truth for the table list: build the IN (...) clause
            # from required_tables instead of repeating the names in SQL.
            placeholders = ', '.join('?' for _ in required_tables)
            cursor.execute(
                "SELECT name FROM sqlite_master "
                f"WHERE type='table' AND name IN ({placeholders})",
                required_tables,
            )
            existing_tables = {row[0] for row in cursor.fetchall()}
            missing_tables = [
                table for table in required_tables if table not in existing_tables
            ]

            if not missing_tables:
                # config_groups: sharding_config column.
                if 'sharding_config' not in _table_columns(cursor, 'config_groups'):
                    logger.info("添加sharding_config字段到config_groups表...")
                    cursor.execute("ALTER TABLE config_groups ADD COLUMN sharding_config TEXT")
                    conn.commit()
                    logger.info("sharding_config字段添加成功")

                # query_history: sharding fields and stored result payloads.
                history_columns = _table_columns(cursor, 'query_history')
                for column_name, column_definition in history_column_upgrades:
                    if column_name in history_columns:
                        continue
                    logger.info(f"添加{column_name}字段到query_history表...")
                    cursor.execute(
                        f"ALTER TABLE query_history ADD COLUMN {column_definition}"
                    )
                    conn.commit()
                    logger.info(f"query_history表{column_name}字段添加成功")

                # query_logs: history_id column and the index that goes with it.
                if 'history_id' not in _table_columns(cursor, 'query_logs'):
                    logger.info("添加history_id字段到query_logs表...")
                    cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER")
                    cursor.execute(
                        'CREATE INDEX IF NOT EXISTS idx_query_logs_history_id '
                        'ON query_logs(history_id)'
                    )
                    conn.commit()
                    logger.info("query_logs表history_id字段添加成功")
        finally:
            # Close before any fallback so init_database() does not run while
            # this connection is still open.
            conn.close()
    except Exception as e:
        logger.error(f"检查数据库表失败: {e}")
        return init_database()

    if missing_tables:
        logger.info(f"数据库表不完整,缺少表:{missing_tables},正在重新创建...")
        return init_database()
    return True


def _table_columns(cursor, table):
    """Return the set of column names currently defined on *table*.

    *table* is interpolated into the PRAGMA statement (PRAGMA arguments cannot
    be bound as parameters), so it must be a trusted, hard-coded table name.
    """
    cursor.execute(f"PRAGMA table_info({table})")
    # Each table_info row is (cid, name, type, notnull, dflt_value, pk).
    return {row[1] for row in cursor.fetchall()}
def get_db_connection():
    """Open a connection to the project database with name-based row access.

    The connection's ``row_factory`` is set to ``sqlite3.Row``, so fetched
    rows can be read by column name (``row['name']``) as well as by index.
    The caller owns the connection and is responsible for closing it.

    Returns:
        sqlite3.Connection: a new connection to ``DATABASE_PATH``.

    Example:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM config_groups")
        for row in cursor.fetchall():
            print(row['name'])  # access by column name
        conn.close()
    """
    connection = sqlite3.connect(DATABASE_PATH)
    # sqlite3.Row gives mapping-style access to each fetched row.
    connection.row_factory = sqlite3.Row
    return connection