Files
BigDataTool/docs/database-design.md
2025-08-05 23:27:25 +08:00

19 KiB
Raw Blame History

DataTools Pro 数据库设计文档

1. 数据库概述

1.1 数据库架构

DataTools Pro采用多数据库架构针对不同的数据存储需求选择最适合的数据库技术

数据库架构图:
┌─────────────────────────────────────────────────────────┐
│                DataTools Pro 数据库架构                 │
├─────────────────────────────────────────────────────────┤
│  应用层 (Application Layer)                            │
│  ┌─────────────────────────────────────────────────────┐ │
│  │            Flask应用程序                            │ │
│  └─────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────┤
│  数据访问层 (Data Access Layer)                        │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│  │ Cassandra   │ │ Redis       │ │ SQLite              │ │
│  │ Driver      │ │ Client      │ │ Driver              │ │
│  └─────────────┘ └─────────────┘ └─────────────────────┘ │
├─────────────────────────────────────────────────────────┤
│  数据存储层 (Data Storage Layer)                       │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│  │ Apache      │ │ Redis       │ │ SQLite              │ │
│  │ Cassandra   │ │ Cluster     │ │ 本地文件            │ │
│  │ 集群        │ │             │ │ config_groups.db    │ │
│  │ (外部数据源) │ │ (外部数据源) │ │ (配置和历史数据)    │ │
│  └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────┘

1.2 数据库分类

1.2.1 外部数据源

  • Apache Cassandra: 主要的数据比对源,存储业务数据
  • Redis Cluster: 缓存和高性能数据存储,支持多种数据类型

1.2.2 内部存储

  • SQLite: 轻量级关系数据库,存储系统配置、查询历史和日志

1.3 设计原则

  • 数据隔离: 不同类型数据使用不同的存储方案
  • 性能优化: 根据访问模式选择合适的数据库
  • 易维护: 简化数据库管理和备份恢复
  • 扩展性: 支持数据量增长和功能扩展

2. SQLite数据库设计

2.1 数据库文件

  • 文件名: config_groups.db
  • 位置: 项目根目录
  • 编码: UTF-8
  • 版本: SQLite 3.x

2.2 表结构设计

2.2.1 config_groups表 - 配置组管理

CREATE TABLE config_groups (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,                    -- 配置组名称
    description TEXT,                             -- 描述信息
    pro_config TEXT NOT NULL,                     -- 生产环境配置(JSON)
    test_config TEXT NOT NULL,                    -- 测试环境配置(JSON)
    query_config TEXT NOT NULL,                   -- 查询配置(JSON)
    sharding_config TEXT,                         -- 分表配置(JSON)
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 索引
CREATE INDEX idx_config_groups_name ON config_groups(name);
CREATE INDEX idx_config_groups_created_at ON config_groups(created_at);

字段详解:

  • id: 主键,自增整数
  • name: 配置组名称,唯一索引
  • description: 配置组描述,可为空
  • pro_config: 生产环境Cassandra配置JSON格式
  • test_config: 测试环境Cassandra配置JSON格式
  • query_config: 查询参数配置JSON格式
  • sharding_config: 分表查询配置JSON格式
  • created_at: 创建时间
  • updated_at: 更新时间

JSON配置示例:

{
    "pro_config": {
        "cluster_name": "production-cluster",
        "datacenter": "datacenter1",
        "hosts": ["10.0.1.100", "10.0.1.101"],
        "port": 9042,
        "username": "cassandra",
        "password": "encrypted_password",
        "keyspace": "production_ks",
        "table": "user_data"
    },
    "query_config": {
        "keys": ["user_id"],
        "fields_to_compare": ["name", "email", "status"],
        "exclude_fields": ["created_at", "updated_at"]
    },
    "sharding_config": {
        "use_sharding_for_pro": true,
        "use_sharding_for_test": false,
        "interval_seconds": 604800,
        "table_count": 14
    }
}

2.2.2 query_history表 - 查询历史记录

CREATE TABLE query_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,                           -- 历史记录名称
    description TEXT,                             -- 描述信息
    pro_config TEXT NOT NULL,                     -- 生产环境配置(JSON)
    test_config TEXT NOT NULL,                    -- 测试环境配置(JSON)
    query_config TEXT NOT NULL,                   -- 查询配置(JSON)
    query_keys TEXT NOT NULL,                     -- 查询的键值(JSON Array)
    results_summary TEXT NOT NULL,                -- 结果摘要(JSON)
    execution_time REAL NOT NULL,                 -- 执行时间(秒)
    total_keys INTEGER NOT NULL,                  -- 总Key数量
    differences_count INTEGER NOT NULL,           -- 差异数量
    identical_count INTEGER NOT NULL,             -- 相同数量
    query_type TEXT NOT NULL DEFAULT 'single',    -- 查询类型(single/sharding)
    sharding_config TEXT,                         -- 分表配置(JSON)
    raw_results TEXT,                             -- 完整查询结果(JSON)
    differences_data TEXT,                        -- 差异详细数据(JSON)
    identical_data TEXT,                          -- 相同数据详情(JSON)
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 索引
CREATE INDEX idx_query_history_name ON query_history(name);
CREATE INDEX idx_query_history_query_type ON query_history(query_type);
CREATE INDEX idx_query_history_created_at ON query_history(created_at);
CREATE INDEX idx_query_history_execution_time ON query_history(execution_time);

字段详解:

  • id: 主键,自增整数
  • name: 历史记录名称
  • description: 历史记录描述
  • pro_config: 生产环境配置快照
  • test_config: 测试环境配置快照
  • query_config: 查询配置快照
  • query_keys: 查询的Key值列表
  • results_summary: 查询结果摘要统计
  • execution_time: 查询执行时间
  • total_keys: 查询的总Key数量
  • differences_count: 发现的差异数量
  • identical_count: 相同记录数量
  • query_type: 查询类型(single/sharding)
  • sharding_config: 分表配置(仅分表查询)
  • raw_results: 完整的查询结果数据
  • differences_data: 差异数据详情
  • identical_data: 相同数据详情
  • created_at: 创建时间

2.2.3 query_logs表 - 查询日志记录

CREATE TABLE query_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    batch_id TEXT NOT NULL,                       -- 批次ID(UUID)
    history_id INTEGER,                           -- 关联历史记录ID
    timestamp DATETIME NOT NULL,                  -- 日志时间戳
    level TEXT NOT NULL,                          -- 日志级别(INFO/WARNING/ERROR)
    message TEXT NOT NULL,                        -- 日志消息
    query_type TEXT NOT NULL,                     -- 查询类型标识
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    FOREIGN KEY (history_id) REFERENCES query_history(id) ON DELETE SET NULL
);

-- 索引
CREATE INDEX idx_query_logs_batch_id ON query_logs(batch_id);
CREATE INDEX idx_query_logs_history_id ON query_logs(history_id);
CREATE INDEX idx_query_logs_level ON query_logs(level);
CREATE INDEX idx_query_logs_timestamp ON query_logs(timestamp);
CREATE INDEX idx_query_logs_query_type ON query_logs(query_type);

字段详解:

  • id: 主键,自增整数
  • batch_id: 批次标识同一次查询操作的日志共享同一个batch_id
  • history_id: 关联的历史记录ID外键关联
  • timestamp: 日志产生的精确时间戳
  • level: 日志级别(INFO/WARNING/ERROR)
  • message: 日志消息内容
  • query_type: 查询类型标识(cassandra_single/cassandra_sharding/redis_compare)
  • created_at: 记录创建时间

2.3 数据库触发器

2.3.1 自动更新updated_at触发器

-- 配置组更新时间触发器
CREATE TRIGGER update_config_groups_updated_at 
    AFTER UPDATE ON config_groups
    FOR EACH ROW
    BEGIN
        UPDATE config_groups 
        SET updated_at = CURRENT_TIMESTAMP 
        WHERE id = NEW.id;
    END;

2.4 数据库视图

2.4.1 查询历史统计视图

CREATE VIEW query_history_stats AS
SELECT 
    query_type,
    COUNT(*) as total_queries,
    AVG(execution_time) as avg_execution_time,
    AVG(total_keys) as avg_keys_per_query,
    AVG(differences_count) as avg_differences,
    AVG(CAST(differences_count AS REAL) / total_keys * 100) as avg_diff_rate,
    MIN(created_at) as first_query,
    MAX(created_at) as last_query
FROM query_history 
GROUP BY query_type;

2.4.2 日志统计视图

CREATE VIEW query_logs_stats AS
SELECT 
    DATE(created_at) as log_date,
    level,
    query_type,
    COUNT(*) as log_count
FROM query_logs 
GROUP BY DATE(created_at), level, query_type
ORDER BY log_date DESC, level;

3. Cassandra数据库设计

3.1 数据模型理解

DataTools Pro作为数据比对工具不直接管理Cassandra的数据模型而是连接到现有的Cassandra集群进行数据查询和比对。

3.2 支持的Cassandra特性

3.2.1 表结构支持

  • 单表查询: 支持任意Cassandra表结构
  • 分表查询: 支持TWCS(Time Window Compaction Strategy)分表模式
  • 复合主键: 支持多字段组合主键
  • 所有数据类型: text、int、timestamp、uuid、blob等

3.2.2 查询模式

-- 单主键查询示例
SELECT * FROM keyspace.table_name 
WHERE primary_key IN (?, ?, ?);

-- 复合主键查询示例  
SELECT * FROM keyspace.table_name 
WHERE (key1='value1' AND key2='value2') 
   OR (key1='value3' AND key2='value4');

-- 分表查询示例
SELECT * FROM keyspace.table_name_0 
WHERE primary_key IN (?, ?);
SELECT * FROM keyspace.table_name_1 
WHERE primary_key IN (?);

3.2.3 分表命名规范

基础表名: user_data
分表命名: user_data_0, user_data_1, user_data_2, ..., user_data_13
分表计算: shard_index = timestamp // interval_seconds % table_count

3.3 连接管理

# Cassandra连接配置
cassandra_config = {
    "cluster_name": "production-cluster",
    "datacenter": "datacenter1", 
    "hosts": ["10.0.1.100", "10.0.1.101", "10.0.1.102"],
    "port": 9042,
    "username": "app_user",
    "password": "secure_password",
    "keyspace": "application_data",
    "table": "user_profiles"
}

# 连接池配置
connection_settings = {
    "connect_timeout": 10,
    "request_timeout": 30,
    "load_balancing_policy": "DCAwareRoundRobinPolicy",
    "retry_policy": "RetryPolicy",
    "compression": "lz4"
}

4. Redis数据库设计

4.1 数据模型理解

DataTools Pro支持Redis集群的数据比对不直接管理Redis的数据结构而是连接到现有的Redis集群进行数据查询和比对。

4.2 支持的Redis数据类型

4.2.1 基本数据类型

  • String: 字符串类型,最常用的数据类型
  • Hash: 哈希表,存储字段-值对
  • List: 列表,有序的字符串集合
  • Set: 集合,无序的字符串集合
  • Sorted Set: 有序集合,带分数的有序字符串集合

4.2.2 查询操作

# 不同数据类型的查询操作
redis_operations = {
    "string": "GET key",
    "hash": "HGETALL key", 
    "list": "LRANGE key 0 -1",
    "set": "SMEMBERS key",
    "zset": "ZRANGE key 0 -1 WITHSCORES"
}

4.3 Redis集群配置

# Redis集群连接配置
redis_config = {
    "cluster_name": "production-redis",
    "nodes": [
        {"host": "10.0.1.100", "port": 6379},
        {"host": "10.0.1.101", "port": 6379},
        {"host": "10.0.1.102", "port": 6379}
    ],
    "password": "redis_password",
    "socket_timeout": 3,
    "socket_connect_timeout": 3,
    "max_connections_per_node": 16,
    "skip_full_coverage_check": True,
    "decode_responses": True
}

5. 数据迁移和版本管理

5.1 数据库版本控制

5.1.1 版本信息表

CREATE TABLE database_version (
    version TEXT PRIMARY KEY,
    applied_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    description TEXT
);

INSERT INTO database_version (version, description) 
VALUES ('1.0.0', '初始数据库结构');

5.1.2 迁移脚本示例

-- 版本1.0.0 -> 1.1.0 迁移脚本
-- 添加Redis配置支持
ALTER TABLE config_groups 
ADD COLUMN redis_config TEXT;

INSERT INTO database_version (version, description) 
VALUES ('1.1.0', '添加Redis配置支持');

5.2 数据备份策略

5.2.1 SQLite备份

# 完整备份
cp config_groups.db config_groups_backup_$(date +%Y%m%d_%H%M%S).db

# 增量备份(基于时间戳)
sqlite3 config_groups.db "
SELECT * FROM config_groups 
WHERE updated_at > '2024-08-05 00:00:00';" > incremental_backup.sql

5.2.2 备份恢复

# 恢复完整备份
cp config_groups_backup_20240805_100000.db config_groups.db

# 恢复增量数据
sqlite3 config_groups.db < incremental_backup.sql

6. 性能优化

6.1 SQLite优化

6.1.1 配置优化

-- SQLite性能优化设置
PRAGMA journal_mode = WAL;          -- 写前日志模式
PRAGMA synchronous = NORMAL;        -- 平衡性能和安全
PRAGMA cache_size = 10000;          -- 缓存页面数量
PRAGMA temp_store = memory;         -- 临时表存储在内存
PRAGMA mmap_size = 268435456;       -- 内存映射大小(256MB)

6.1.2 查询优化

-- 分页查询优化
SELECT * FROM query_history 
WHERE created_at >= ? 
ORDER BY created_at DESC 
LIMIT ? OFFSET ?;

-- 统计查询优化  
SELECT query_type, COUNT(*), AVG(execution_time)
FROM query_history 
WHERE created_at >= DATE('now', '-30 days')
GROUP BY query_type;

6.2 连接池优化

6.2.1 Cassandra连接池

# 连接池配置
cluster = Cluster(
    hosts=['10.0.1.100', '10.0.1.101'],
    port=9042,
    load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='datacenter1'),
    default_retry_policy=RetryPolicy(),
    compression=True,
    protocol_version=4
)

# 会话池管理
session_pool = cluster.connect()
session_pool.default_timeout = 30

6.2.2 Redis连接池

# Redis集群连接池
from rediscluster import RedisCluster

redis_cluster = RedisCluster(
    startup_nodes=[
        {"host": "10.0.1.100", "port": 6379},
        {"host": "10.0.1.101", "port": 6379}
    ],
    password="redis_password",
    socket_timeout=3,
    socket_connect_timeout=3,
    max_connections_per_node=16,
    skip_full_coverage_check=True
)

7. 数据安全

7.1 敏感数据处理

7.1.1 密码加密

# 配置中的密码加密存储
import base64
from cryptography.fernet import Fernet

def encrypt_password(password, key):
    """加密密码"""
    f = Fernet(key)
    encrypted_password = f.encrypt(password.encode())
    return base64.b64encode(encrypted_password).decode()

def decrypt_password(encrypted_password, key):
    """解密密码"""
    f = Fernet(key)
    decoded_password = base64.b64decode(encrypted_password.encode())
    return f.decrypt(decoded_password).decode()

7.1.2 数据脱敏

def mask_sensitive_data(config):
    """配置数据脱敏显示"""
    masked_config = config.copy()
    if 'password' in masked_config:
        masked_config['password'] = '***masked***'
    return masked_config

7.2 访问控制

7.2.1 数据库权限

-- SQLite访问控制(文件系统级别)
chmod 600 config_groups.db  -- 仅所有者可读写
chown app:app config_groups.db  -- 设置正确的所有者

7.2.2 网络安全

# Cassandra SSL连接
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLS

ssl_context = SSLContext(PROTOCOL_TLS)
auth_provider = PlainTextAuthProvider(username='app_user', password='password')

cluster = Cluster(
    ['10.0.1.100'],
    ssl_context=ssl_context,
    auth_provider=auth_provider
)

8. 监控和维护

8.1 数据库监控

8.1.1 SQLite监控

-- 数据库大小监控
SELECT 
    (page_count * page_size) / 1024 / 1024 as db_size_mb
FROM pragma_page_count(), pragma_page_size();

-- 表大小统计
SELECT 
    name,
    COUNT(*) as row_count
FROM sqlite_master sm
JOIN (
    SELECT 'config_groups' as name, COUNT(*) as cnt FROM config_groups
    UNION ALL
    SELECT 'query_history' as name, COUNT(*) as cnt FROM query_history  
    UNION ALL
    SELECT 'query_logs' as name, COUNT(*) as cnt FROM query_logs
) t ON sm.name = t.name;

8.1.2 连接监控

# 连接健康检查
def check_database_health():
    """检查数据库连接健康状态"""
    health_status = {
        "sqlite": check_sqlite_health(),
        "cassandra": check_cassandra_health(),
        "redis": check_redis_health()
    }
    return health_status

def check_sqlite_health():
    """检查SQLite健康状态"""
    try:
        conn = sqlite3.connect('config_groups.db')
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        conn.close()
        return {"status": "healthy", "timestamp": datetime.now()}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

8.2 数据维护

8.2.1 数据清理

-- 清理30天前的查询日志
DELETE FROM query_logs 
WHERE created_at < DATE('now', '-30 days');

-- 清理大型历史记录的详细数据
UPDATE query_history 
SET raw_results = NULL, 
    differences_data = NULL, 
    identical_data = NULL
WHERE created_at < DATE('now', '-90 days');

8.2.2 数据库维护

-- SQLite数据库优化
VACUUM;                    -- 重建数据库文件,回收空间
ANALYZE;                   -- 更新查询计划统计信息
PRAGMA optimize;           -- 优化数据库性能
REINDEX;                   -- 重建所有索引

版本: v1.0
更新日期: 2024-08-05
维护者: DataTools Pro Team