diff --git a/README.md b/README.md new file mode 100644 index 0000000..2ad5c5d --- /dev/null +++ b/README.md @@ -0,0 +1,212 @@ +# BigDataTool - 大数据查询比对工具 + +[![Python Version](https://img.shields.io/badge/python-3.8%2B-blue.svg)](https://python.org) +[![Flask Version](https://img.shields.io/badge/flask-2.3.3-green.svg)](https://flask.palletsprojects.com/) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE) + +BigDataTool是一个功能强大的数据库查询比对工具,专门用于Cassandra数据库和Redis集群的数据一致性验证。支持单表查询、TWCS分表查询、多主键查询等多种复杂场景。 + +## 🚀 核心功能 + +### Cassandra数据比对 +- **单表查询**:标准的生产环境与测试环境数据比对 +- **分表查询**:基于TWCS策略的时间分表查询支持 +- **多主键查询**:支持复合主键的精确匹配和比对 +- **智能数据比较**:JSON、数组等复杂数据类型的深度比较 + +### Redis数据比对 +- **全类型支持**:String、Hash、List、Set、ZSet、Stream等所有Redis数据类型 +- **集群支持**:单节点和集群模式的自动检测和连接 +- **随机采样**:支持随机Key采样和指定Key比对两种模式 +- **性能监控**:详细的连接时间和查询性能统计 + +### 配置管理 +- **配置组管理**:数据库连接配置的保存、加载和复用 +- **查询历史**:查询记录的持久化存储和一键回放 +- **实时日志**:详细的操作日志和性能监控 +- **YAML导入**:支持YAML格式配置的一键导入 + +## 📋 系统要求 + +- Python 3.8+ +- Flask 2.3.3 +- Cassandra Driver 3.29.1 +- Redis 5.0.1 + +## 🛠️ 安装部署 + +### 1. 克隆项目 +```bash +git clone https://github.com/your-org/BigDataTool.git +cd BigDataTool +``` + +### 2. 安装依赖 +```bash +pip install -r requirements.txt +``` + +### 3. 启动应用 +```bash +python app.py +``` + +应用将在 `http://localhost:5000` 启动 + +## 🎯 快速开始 + +### Cassandra数据比对 + +1. 访问 `http://localhost:5000/db-compare` +2. 配置生产环境和测试环境的Cassandra连接信息 +3. 设置主键字段和查询参数 +4. 输入要比对的Key值列表 +5. 点击"开始查询"执行比对 + +#### 单主键查询示例 +``` +主键字段: id +查询Key值: +1001 +1002 +1003 +``` + +#### 复合主键查询示例 +``` +主键字段: docid,id +查询Key值: +8825C293B3609175B2224236E984FEDB,8825C293B3609175B2224236E984FED +9925C293B3609175B2224236E984FEDB,9925C293B3609175B2224236E984FED +``` + +### Redis数据比对 + +1. 访问 `http://localhost:5000/redis-compare` +2. 配置两个Redis集群的连接信息 +3. 选择查询模式(随机采样或指定Key) +4. 设置查询参数 +5. 执行比对分析 + +## 📊 功能特性 + +### 数据比对引擎 +- **智能JSON比较**:自动处理JSON格式差异和嵌套结构 +- **数组顺序无关比较**:忽略数组元素顺序的深度比较 +- **字段级差异分析**:详细的字段差异统计和热点分析 +- **数据质量评估**:自动生成数据一致性报告和改进建议 + +### 分表查询支持 +- **TWCS策略**:基于Time Window Compaction Strategy的分表计算 +- **时间戳提取**:智能从Key中提取时间戳信息 +- **混合查询**:支持生产分表+测试单表等组合场景 +- **并行查询**:多分表并行查询以提高性能 + +### 用户界面 +- **响应式设计**:基于Bootstrap的现代化界面 +- **实时反馈**:查询进度和结果的实时显示 +- **分页展示**:大数据集的高效分页显示 +- **多视图模式**:原始数据、格式化、差异对比等多种视图 + +## 🔧 配置说明 + +### Cassandra配置 +```json +{ + "cluster_name": "生产集群", + "hosts": ["192.168.1.100", "192.168.1.101"], + "port": 9042, + "datacenter": "dc1", + "username": "cassandra", + "password": "password", + "keyspace": "my_keyspace", + "table": "my_table" +} +``` + +### Redis配置 +```json +{ + "name": "生产Redis", + "nodes": [ + {"host": "192.168.1.200", "port": 7000}, + {"host": "192.168.1.201", "port": 7001} + ], + "password": "redis_password", + "socket_timeout": 3, + "socket_connect_timeout": 3, + "max_connections_per_node": 16 +} +``` + +## 📈 性能优化 + +- **连接池管理**:优化的数据库连接复用 +- **批量查询**:支持大批量Key的高效查询 +- **内存管理**:大结果集的内存友好处理 +- **并行处理**:多表并行查询和数据比对 +- **缓存机制**:查询结果和配置的智能缓存 + +## 🔍 故障排查 + +### 常见问题 + +1. **Cassandra连接失败** + - 检查网络连通性:`telnet ` + - 验证认证信息:用户名、密码、keyspace + - 确认防火墙设置 + +2. **Redis连接失败** + - 检查Redis服务状态:`redis-cli ping` + - 验证集群配置:节点地址和端口 + - 确认密码设置 + +3. **查询超时** + - 调整连接超时参数 + - 检查数据库服务器负载 + - 优化查询条件和索引 + +## 📝 API文档 + +### 主要API端点 + +- `POST /api/query` - 单表查询比对 +- `POST /api/sharding-query` - 分表查询比对 +- `POST /api/redis/compare` - Redis数据比对 +- `GET /api/config-groups` - 获取配置组列表 +- `POST /api/config-groups` - 创建配置组 +- `GET /api/query-history` - 获取查询历史 +- `GET /api/query-logs` - 获取查询日志 + +详细API文档请参考 [API.md](docs/API.md) + +## 📚 文档目录 + +- [API文档](docs/API.md) - 完整的API接口说明 +- [使用指南](docs/USER_GUIDE.md) - 详细的功能使用说明 +- [架构设计](docs/ARCHITECTURE.md) - 系统架构和设计原理 +- [部署指南](docs/DEPLOYMENT.md) - 生产环境部署说明 + +## 🤝 贡献指南 + +1. Fork 项目 +2. 创建功能分支 (`git checkout -b feature/AmazingFeature`) +3. 提交更改 (`git commit -m 'Add some AmazingFeature'`) +4. 推送到分支 (`git push origin feature/AmazingFeature`) +5. 开启 Pull Request + +## 📄 许可证 + +本项目采用 MIT 许可证 - 查看 [LICENSE](LICENSE) 文件了解详情 + +## 👥 作者 + +BigDataTool项目组 + +## 🙏 致谢 + +感谢所有为这个项目做出贡献的开发者和用户。 + +--- + +**注意**:使用前请确保已正确配置数据库连接信息,并在生产环境中谨慎使用。 diff --git a/app.py b/app.py index 6ded8aa..eeb5d99 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,30 @@ """ -BigDataTool - 主应用文件 -模块化重构后的主应用,使用分离的模块组织代码 +BigDataTool - 大数据查询比对工具主应用 + +这是一个基于Flask的数据库查询比对工具,主要用于: +1. Cassandra数据库的生产环境与测试环境数据比对 +2. Redis集群数据的一致性验证 +3. 支持单表查询、TWCS分表查询、多主键查询等多种场景 +4. 提供完整的配置管理、查询历史和日志记录功能 + +主要特性: +- 模块化架构:清晰的代码组织和职责分离 +- 多数据源支持:Cassandra + Redis +- 智能数据比对:支持JSON、数组等复杂数据类型 +- 分表查询:基于TWCS策略的时间分表支持 +- 多主键查询:支持复合主键的精确匹配 +- 配置管理:数据库配置的保存、加载和复用 +- 查询历史:查询记录的持久化存储和回放 +- 实时日志:详细的操作日志和性能监控 + +技术栈: +- 后端:Flask + SQLite + Cassandra Driver + Redis +- 前端:原生JavaScript + Bootstrap +- 数据库:SQLite(配置存储)+ Cassandra(数据查询)+ Redis(数据比对) + +作者:BigDataTool项目组 +版本:v2.0 +更新时间:2024年8月 """ import logging diff --git a/app_original_backup.py b/app_original_backup.py deleted file mode 100644 index 714e56b..0000000 --- a/app_original_backup.py +++ /dev/null @@ -1,2229 +0,0 @@ -from flask import Flask, render_template, request, jsonify, send_from_directory -from cassandra.cluster import Cluster -from cassandra.auth import PlainTextAuthProvider -import json -import os -import logging -import sqlite3 -from datetime import datetime, timedelta -import re -import concurrent.futures -import time - -app = Flask(__name__) - -# 配置日志 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -# 数据库配置 -DATABASE_PATH = 'config_groups.db' - -# 查询日志收集器 -class QueryLogCollector: - def __init__(self, max_logs=1000, db_path=None): - self.logs = [] # 内存中的日志缓存 - self.max_logs = max_logs - self.current_batch_id = None - self.batch_counter = 0 - self.current_query_type = 'single' - self.current_history_id = None # 当前关联的历史记录ID - self.db_path = db_path or DATABASE_PATH - - def start_new_batch(self, query_type='single'): - """开始新的查询批次""" - self.batch_counter += 1 - self.current_batch_id = f"batch_{self.batch_counter}_{datetime.now().strftime('%H%M%S')}" - self.current_query_type = query_type - self.current_history_id = None # 重置历史记录ID - - # 添加批次开始标记 - self.add_log('INFO', f"=== 开始{query_type}查询批次 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) - return self.current_batch_id - - def set_history_id(self, history_id): - """设置当前批次关联的历史记录ID""" - self.current_history_id = history_id - if self.current_batch_id and history_id: - self.add_log('INFO', f"关联历史记录ID: {history_id}", force_batch_id=self.current_batch_id) - # 更新当前批次的所有日志记录的history_id - self._update_batch_history_id(self.current_batch_id, history_id) - - def _update_batch_history_id(self, batch_id, history_id): - """更新批次中所有日志的history_id""" - try: - conn = sqlite3.connect(self.db_path, timeout=30) - cursor = conn.cursor() - - cursor.execute(''' - UPDATE query_logs - SET history_id = ? - WHERE batch_id = ? - ''', (history_id, batch_id)) - - conn.commit() - conn.close() - logger.info(f"已更新批次 {batch_id} 的历史记录关联到 {history_id}") - except Exception as e: - print(f"Warning: Failed to update batch history_id: {e}") - - def end_current_batch(self): - """结束当前查询批次""" - if self.current_batch_id: - self.add_log('INFO', f"=== 查询批次完成 (ID: {self.current_batch_id}) ===", force_batch_id=self.current_batch_id) - self.current_batch_id = None - self.current_history_id = None - - def add_log(self, level, message, force_batch_id=None, force_query_type=None, force_history_id=None): - """添加日志到内存和数据库""" - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - batch_id = force_batch_id or self.current_batch_id - query_type = force_query_type or self.current_query_type - history_id = force_history_id or self.current_history_id - - log_entry = { - 'timestamp': timestamp, - 'level': level, - 'message': message, - 'batch_id': batch_id, - 'query_type': query_type, - 'history_id': history_id - } - - # 添加到内存缓存 - self.logs.append(log_entry) - if len(self.logs) > self.max_logs: - self.logs.pop(0) - - # 保存到数据库 - self._save_log_to_db(log_entry) - - def _save_log_to_db(self, log_entry): - """将日志保存到数据库""" - try: - conn = sqlite3.connect(self.db_path, timeout=30) - cursor = conn.cursor() - - cursor.execute(''' - INSERT INTO query_logs (batch_id, history_id, timestamp, level, message, query_type) - VALUES (?, ?, ?, ?, ?, ?) - ''', ( - log_entry['batch_id'], - log_entry['history_id'], - log_entry['timestamp'], - log_entry['level'], - log_entry['message'], - log_entry['query_type'] - )) - - conn.commit() - conn.close() - except Exception as e: - # 数据库写入失败时记录到控制台,但不影响程序运行 - print(f"Warning: Failed to save log to database: {e}") - - def get_logs(self, limit=None, from_db=True): - """获取日志,支持从数据库或内存获取""" - if from_db: - return self._get_logs_from_db(limit) - else: - # 从内存获取 - if limit: - return self.logs[-limit:] - return self.logs - - def _get_logs_from_db(self, limit=None): - """从数据库获取日志""" - try: - conn = sqlite3.connect(self.db_path, timeout=30) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - query = ''' - SELECT batch_id, history_id, timestamp, level, message, query_type - FROM query_logs - ORDER BY id DESC - ''' - - if limit: - query += f' LIMIT {limit}' - - cursor.execute(query) - rows = cursor.fetchall() - - # 转换为字典格式并反转顺序(最新的在前) - logs = [] - for row in reversed(rows): - logs.append({ - 'batch_id': row['batch_id'], - 'history_id': row['history_id'], - 'timestamp': row['timestamp'], - 'level': row['level'], - 'message': row['message'], - 'query_type': row['query_type'] - }) - - conn.close() - return logs - except Exception as e: - print(f"Warning: Failed to get logs from database: {e}") - # 如果数据库读取失败,返回内存中的日志 - return self.get_logs(limit, from_db=False) - - def _get_total_logs_count(self): - """获取数据库中的日志总数""" - try: - conn = sqlite3.connect(self.db_path, timeout=30) - cursor = conn.cursor() - cursor.execute('SELECT COUNT(*) FROM query_logs') - count = cursor.fetchone()[0] - conn.close() - return count - except Exception as e: - print(f"Warning: Failed to get logs count from database: {e}") - return len(self.logs) - - def get_logs_by_history_id(self, history_id): - """根据历史记录ID获取相关日志""" - try: - conn = sqlite3.connect(self.db_path, timeout=30) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - cursor.execute(''' - SELECT batch_id, history_id, timestamp, level, message, query_type - FROM query_logs - WHERE history_id = ? - ORDER BY id ASC - ''', (history_id,)) - - rows = cursor.fetchall() - logs = [] - for row in rows: - logs.append({ - 'batch_id': row['batch_id'], - 'history_id': row['history_id'], - 'timestamp': row['timestamp'], - 'level': row['level'], - 'message': row['message'], - 'query_type': row['query_type'] - }) - - conn.close() - return logs - except Exception as e: - print(f"Warning: Failed to get logs by history_id: {e}") - return [] - - def get_logs_grouped_by_batch(self, limit=None, from_db=True): - """按批次分组获取日志""" - logs = self.get_logs(limit, from_db) - grouped_logs = {} - batch_order = [] - - for log in logs: - batch_id = log.get('batch_id', 'unknown') - if batch_id not in grouped_logs: - grouped_logs[batch_id] = [] - batch_order.append(batch_id) - grouped_logs[batch_id].append(log) - - # 返回按时间顺序排列的批次 - return [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order] - - def clear_logs(self, clear_db=True): - """清空日志""" - # 清空内存 - self.logs.clear() - self.current_batch_id = None - self.batch_counter = 0 - - # 清空数据库 - if clear_db: - try: - conn = sqlite3.connect(self.db_path, timeout=30) - cursor = conn.cursor() - cursor.execute('DELETE FROM query_logs') - conn.commit() - conn.close() - except Exception as e: - print(f"Warning: Failed to clear logs from database: {e}") - - def cleanup_old_logs(self, days_to_keep=30): - """清理旧日志,保留指定天数的日志""" - try: - conn = sqlite3.connect(self.db_path, timeout=30) - cursor = conn.cursor() - - # 删除超过指定天数的日志 - cutoff_date = datetime.now() - timedelta(days=days_to_keep) - cursor.execute(''' - DELETE FROM query_logs - WHERE created_at < ? - ''', (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),)) - - deleted_count = cursor.rowcount - conn.commit() - conn.close() - - logger.info(f"清理了 {deleted_count} 条超过 {days_to_keep} 天的旧日志") - return deleted_count - except Exception as e: - logger.error(f"清理旧日志失败: {e}") - return 0 - -# 全局日志收集器实例 -query_log_collector = QueryLogCollector() - -# 自定义日志处理器 -class CollectorHandler(logging.Handler): - def __init__(self, collector): - super().__init__() - self.collector = collector - - def emit(self, record): - self.collector.add_log(record.levelname, record.getMessage()) - -# 添加收集器处理器到logger -collector_handler = CollectorHandler(query_log_collector) -logger.addHandler(collector_handler) - -class ShardingCalculator: - """分表计算器,基于TWCS策略""" - - def __init__(self, interval_seconds=604800, table_count=14): - """ - 初始化分表计算器 - :param interval_seconds: 时间间隔(秒),默认604800(7天) - :param table_count: 分表数量,默认14 - """ - self.interval_seconds = interval_seconds - self.table_count = table_count - - def extract_timestamp_from_key(self, key): - """ - 从Key中提取时间戳 - 新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分 - """ - if not key: - return None - - key_str = str(key) - - # 方法1:如果包含下划线,尝试提取最后一个下划线后的部分 - if '_' in key_str: - parts = key_str.split('_') - last_part = parts[-1] - # 检查最后一部分是否为纯数字 - if last_part.isdigit(): - timestamp = int(last_part) - logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}") - return timestamp - - # 方法2:使用正则表达式找到所有数字序列,取最后一个较长的 - number_sequences = re.findall(r'\d+', key_str) - - if not number_sequences: - logger.warning(f"Key '{key}' 中没有找到数字字符") - return None - - # 如果有多个数字序列,优先选择最长的,如果长度相同则选择最后一个 - longest_sequence = max(number_sequences, key=len) - - # 如果最长的有多个,选择最后一个最长的 - max_length = len(longest_sequence) - last_longest = None - for seq in number_sequences: - if len(seq) == max_length: - last_longest = seq - - try: - timestamp = int(last_longest) - logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)") - return timestamp - except ValueError: - logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}") - return None - - def calculate_shard_index(self, timestamp): - """ - 计算分表索引 - 公式:timestamp // interval_seconds % table_count - """ - if timestamp is None: - return None - return int(timestamp) // self.interval_seconds % self.table_count - - def get_shard_table_name(self, base_table_name, key): - """ - 根据Key获取对应的分表名称 - """ - timestamp = self.extract_timestamp_from_key(key) - if timestamp is None: - return None - - shard_index = self.calculate_shard_index(timestamp) - return f"{base_table_name}_{shard_index}" - - def get_all_shard_tables_for_keys(self, base_table_name, keys): - """ - 为一批Keys计算所有需要查询的分表 - 返回: {shard_table_name: [keys_for_this_shard], ...} - """ - shard_mapping = {} - failed_keys = [] - calculation_stats = { - 'total_keys': len(keys), - 'successful_extractions': 0, - 'failed_extractions': 0, - 'unique_shards': 0 - } - - for key in keys: - shard_table = self.get_shard_table_name(base_table_name, key) - if shard_table: - if shard_table not in shard_mapping: - shard_mapping[shard_table] = [] - shard_mapping[shard_table].append(key) - calculation_stats['successful_extractions'] += 1 - else: - failed_keys.append(key) - calculation_stats['failed_extractions'] += 1 - - calculation_stats['unique_shards'] = len(shard_mapping) - - return shard_mapping, failed_keys, calculation_stats - -def init_database(): - """初始化数据库""" - try: - conn = sqlite3.connect(DATABASE_PATH) - cursor = conn.cursor() - - # 创建配置组表 - cursor.execute(''' - CREATE TABLE IF NOT EXISTS config_groups ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL UNIQUE, - description TEXT, - pro_config TEXT NOT NULL, - test_config TEXT NOT NULL, - query_config TEXT NOT NULL, - sharding_config TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - - # 创建查询历史表,包含分表配置字段 - cursor.execute(''' - CREATE TABLE IF NOT EXISTS query_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - description TEXT, - pro_config TEXT NOT NULL, - test_config TEXT NOT NULL, - query_config TEXT NOT NULL, - query_keys TEXT NOT NULL, - results_summary TEXT NOT NULL, - execution_time REAL NOT NULL, - total_keys INTEGER NOT NULL, - differences_count INTEGER NOT NULL, - identical_count INTEGER NOT NULL, - sharding_config TEXT, - query_type TEXT DEFAULT 'single', - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - - # 创建分表配置组表 - cursor.execute(''' - CREATE TABLE IF NOT EXISTS sharding_config_groups ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL UNIQUE, - description TEXT, - pro_config TEXT NOT NULL, - test_config TEXT NOT NULL, - query_config TEXT NOT NULL, - sharding_config TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - - # 创建查询日志表 - cursor.execute(''' - CREATE TABLE IF NOT EXISTS query_logs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - batch_id TEXT NOT NULL, - history_id INTEGER, - timestamp TEXT NOT NULL, - level TEXT NOT NULL, - message TEXT NOT NULL, - query_type TEXT DEFAULT 'single', - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (history_id) REFERENCES query_history (id) ON DELETE CASCADE - ) - ''') - - # 创建索引 - cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_batch_id ON query_logs(batch_id)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_timestamp ON query_logs(timestamp)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_level ON query_logs(level)') - - conn.commit() - conn.close() - logger.info("数据库初始化完成") - return True - except Exception as e: - logger.error(f"数据库初始化失败: {e}") - return False - -def ensure_database(): - """确保数据库和表存在""" - if not os.path.exists(DATABASE_PATH): - logger.info("数据库文件不存在,正在创建...") - return init_database() - - # 检查表是否存在 - try: - conn = sqlite3.connect(DATABASE_PATH) - cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups', 'query_logs')") - results = cursor.fetchall() - existing_tables = [row[0] for row in results] - - required_tables = ['config_groups', 'query_history', 'sharding_config_groups', 'query_logs'] - missing_tables = [table for table in required_tables if table not in existing_tables] - - if missing_tables: - logger.info(f"数据库表不完整,缺少表:{missing_tables},正在重新创建...") - return init_database() - - # 检查config_groups表是否有sharding_config字段 - cursor.execute("PRAGMA table_info(config_groups)") - columns = cursor.fetchall() - column_names = [column[1] for column in columns] - - if 'sharding_config' not in column_names: - logger.info("添加sharding_config字段到config_groups表...") - cursor.execute("ALTER TABLE config_groups ADD COLUMN sharding_config TEXT") - conn.commit() - logger.info("sharding_config字段添加成功") - - # 检查query_history表是否有分表相关字段 - cursor.execute("PRAGMA table_info(query_history)") - history_columns = cursor.fetchall() - history_column_names = [column[1] for column in history_columns] - - if 'sharding_config' not in history_column_names: - logger.info("添加sharding_config字段到query_history表...") - cursor.execute("ALTER TABLE query_history ADD COLUMN sharding_config TEXT") - conn.commit() - logger.info("query_history表sharding_config字段添加成功") - - if 'query_type' not in history_column_names: - logger.info("添加query_type字段到query_history表...") - cursor.execute("ALTER TABLE query_history ADD COLUMN query_type TEXT DEFAULT 'single'") - conn.commit() - logger.info("query_history表query_type字段添加成功") - - # 添加查询结果数据存储字段 - if 'raw_results' not in history_column_names: - logger.info("添加raw_results字段到query_history表...") - cursor.execute("ALTER TABLE query_history ADD COLUMN raw_results TEXT") - conn.commit() - logger.info("query_history表raw_results字段添加成功") - - if 'differences_data' not in history_column_names: - logger.info("添加differences_data字段到query_history表...") - cursor.execute("ALTER TABLE query_history ADD COLUMN differences_data TEXT") - conn.commit() - logger.info("query_history表differences_data字段添加成功") - - if 'identical_data' not in history_column_names: - logger.info("添加identical_data字段到query_history表...") - cursor.execute("ALTER TABLE query_history ADD COLUMN identical_data TEXT") - conn.commit() - logger.info("query_history表identical_data字段添加成功") - - # 检查query_logs表是否存在history_id字段 - cursor.execute("PRAGMA table_info(query_logs)") - logs_columns = cursor.fetchall() - logs_column_names = [column[1] for column in logs_columns] - - if 'history_id' not in logs_column_names: - logger.info("添加history_id字段到query_logs表...") - cursor.execute("ALTER TABLE query_logs ADD COLUMN history_id INTEGER") - # 创建外键索引 - cursor.execute('CREATE INDEX IF NOT EXISTS idx_query_logs_history_id ON query_logs(history_id)') - conn.commit() - logger.info("query_logs表history_id字段添加成功") - - conn.close() - return True - except Exception as e: - logger.error(f"检查数据库表失败: {e}") - return init_database() - -def get_db_connection(): - """获取数据库连接""" - conn = sqlite3.connect(DATABASE_PATH) - conn.row_factory = sqlite3.Row - return conn - -def normalize_json_string(value): - """标准化JSON字符串,用于比较""" - if not isinstance(value, str): - return value - - try: - # 尝试解析JSON - json_obj = json.loads(value) - - # 如果是数组,需要进行特殊处理 - if isinstance(json_obj, list): - # 尝试对数组元素进行标准化排序 - normalized_array = normalize_json_array(json_obj) - return json.dumps(normalized_array, sort_keys=True, separators=(',', ':')) - else: - # 普通对象,直接序列化 - return json.dumps(json_obj, sort_keys=True, separators=(',', ':')) - except (json.JSONDecodeError, TypeError): - # 如果不是JSON,返回原值 - return value - -def normalize_json_array(json_array): - """标准化JSON数组,处理元素顺序问题""" - try: - normalized_elements = [] - - for element in json_array: - if isinstance(element, dict): - # 对字典元素进行标准化 - normalized_elements.append(json.dumps(element, sort_keys=True, separators=(',', ':'))) - elif isinstance(element, str): - # 如果是字符串,尝试解析为JSON - try: - parsed_element = json.loads(element) - normalized_elements.append(json.dumps(parsed_element, sort_keys=True, separators=(',', ':'))) - except: - normalized_elements.append(element) - else: - normalized_elements.append(element) - - # 对标准化后的元素进行排序,确保顺序一致 - normalized_elements.sort() - - # 重新解析为对象数组 - result_array = [] - for element in normalized_elements: - if isinstance(element, str): - try: - result_array.append(json.loads(element)) - except: - result_array.append(element) - else: - result_array.append(element) - - return result_array - - except Exception as e: - logger.warning(f"数组标准化失败: {e}") - return json_array - -def is_json_array_field(value): - """检查字段是否为JSON数组格式""" - if not isinstance(value, (str, list)): - return False - - try: - if isinstance(value, str): - parsed = json.loads(value) - return isinstance(parsed, list) - elif isinstance(value, list): - # 检查是否为JSON字符串数组 - if len(value) > 0 and isinstance(value[0], str): - try: - json.loads(value[0]) - return True - except: - return False - return True - except: - return False - -def compare_array_values(value1, value2): - """专门用于比较数组类型的值""" - try: - # 处理字符串表示的数组 - if isinstance(value1, str) and isinstance(value2, str): - try: - array1 = json.loads(value1) - array2 = json.loads(value2) - if isinstance(array1, list) and isinstance(array2, list): - return compare_json_arrays(array1, array2) - except: - pass - - # 处理Python列表类型 - elif isinstance(value1, list) and isinstance(value2, list): - return compare_json_arrays(value1, value2) - - # 处理混合情况:一个是字符串数组,一个是列表 - elif isinstance(value1, list) and isinstance(value2, str): - try: - array2 = json.loads(value2) - if isinstance(array2, list): - return compare_json_arrays(value1, array2) - except: - pass - elif isinstance(value1, str) and isinstance(value2, list): - try: - array1 = json.loads(value1) - if isinstance(array1, list): - return compare_json_arrays(array1, value2) - except: - pass - - return False - except Exception as e: - logger.warning(f"数组比较失败: {e}") - return False - -def compare_json_arrays(array1, array2): - """比较两个JSON数组,忽略元素顺序""" - try: - if len(array1) != len(array2): - return False - - # 标准化两个数组 - normalized_array1 = normalize_json_array(array1.copy()) - normalized_array2 = normalize_json_array(array2.copy()) - - # 将标准化后的数组转换为可比较的格式 - comparable1 = json.dumps(normalized_array1, sort_keys=True) - comparable2 = json.dumps(normalized_array2, sort_keys=True) - - return comparable1 == comparable2 - - except Exception as e: - logger.warning(f"JSON数组比较失败: {e}") - return False - -def format_json_for_display(value): - """格式化JSON用于显示""" - if not isinstance(value, str): - return str(value) - - try: - # 尝试解析JSON - json_obj = json.loads(value) - # 格式化显示(带缩进) - return json.dumps(json_obj, sort_keys=True, indent=2, ensure_ascii=False) - except (json.JSONDecodeError, TypeError): - # 如果不是JSON,返回原值 - return str(value) - -def is_json_field(value): - """检查字段是否为JSON格式""" - if not isinstance(value, str): - return False - - try: - json.loads(value) - return True - except (json.JSONDecodeError, TypeError): - return False - -def compare_values(value1, value2): - """智能比较两个值,支持JSON标准化和数组比较""" - # 首先检查是否为数组类型 - if is_json_array_field(value1) or is_json_array_field(value2): - return compare_array_values(value1, value2) - - # 如果两个值都是字符串,尝试JSON标准化比较 - if isinstance(value1, str) and isinstance(value2, str): - normalized_value1 = normalize_json_string(value1) - normalized_value2 = normalize_json_string(value2) - return normalized_value1 == normalized_value2 - - # 其他情况直接比较 - return value1 == value2 - -# 默认配置(不显示敏感信息) -DEFAULT_CONFIG = { - 'pro_config': { - 'cluster_name': '', - 'hosts': [], - 'port': 9042, - 'datacenter': '', - 'username': '', - 'password': '', - 'keyspace': '', - 'table': '' - }, - 'test_config': { - 'cluster_name': '', - 'hosts': [], - 'port': 9042, - 'datacenter': '', - 'username': '', - 'password': '', - 'keyspace': '', - 'table': '' - }, - 'keys': [], - 'fields_to_compare': [], - 'exclude_fields': [] -} - -def save_config_group(name, description, pro_config, test_config, query_config, sharding_config=None): - """保存配置组""" - if not ensure_database(): - logger.error("数据库初始化失败") - return False - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute(''' - INSERT OR REPLACE INTO config_groups - (name, description, pro_config, test_config, query_config, sharding_config, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?) - ''', ( - name, description, - json.dumps(pro_config), - json.dumps(test_config), - json.dumps(query_config), - json.dumps(sharding_config) if sharding_config else None, - datetime.now().isoformat() - )) - conn.commit() - logger.info(f"配置组 '{name}' 保存成功,包含分表配置: {sharding_config is not None}") - return True - except Exception as e: - logger.error(f"保存配置组失败: {e}") - return False - finally: - conn.close() - -def get_config_groups(): - """获取所有配置组""" - if not ensure_database(): - logger.error("数据库初始化失败") - return [] - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute(''' - SELECT id, name, description, created_at, updated_at - FROM config_groups - ORDER BY updated_at DESC - ''') - rows = cursor.fetchall() - - config_groups = [] - for row in rows: - config_groups.append({ - 'id': row['id'], - 'name': row['name'], - 'description': row['description'], - 'created_at': row['created_at'], - 'updated_at': row['updated_at'] - }) - - return config_groups - except Exception as e: - logger.error(f"获取配置组失败: {e}") - return [] - finally: - conn.close() - -def get_config_group_by_id(group_id): - """根据ID获取配置组详情""" - if not ensure_database(): - logger.error("数据库初始化失败") - return None - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute(''' - SELECT id, name, description, pro_config, test_config, query_config, - sharding_config, created_at, updated_at - FROM config_groups WHERE id = ? - ''', (group_id,)) - row = cursor.fetchone() - - if row: - config = { - 'id': row['id'], - 'name': row['name'], - 'description': row['description'], - 'pro_config': json.loads(row['pro_config']), - 'test_config': json.loads(row['test_config']), - 'query_config': json.loads(row['query_config']), - 'created_at': row['created_at'], - 'updated_at': row['updated_at'] - } - - # 添加分表配置 - if row['sharding_config']: - try: - config['sharding_config'] = json.loads(row['sharding_config']) - except (json.JSONDecodeError, TypeError): - config['sharding_config'] = None - else: - config['sharding_config'] = None - - return config - return None - except Exception as e: - logger.error(f"获取配置组详情失败: {e}") - return None - finally: - conn.close() - -def delete_config_group(group_id): - """删除配置组""" - if not ensure_database(): - logger.error("数据库初始化失败") - return False - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute('DELETE FROM config_groups WHERE id = ?', (group_id,)) - conn.commit() - success = cursor.rowcount > 0 - if success: - logger.info(f"配置组ID {group_id} 删除成功") - return success - except Exception as e: - logger.error(f"删除配置组失败: {e}") - return False - finally: - conn.close() - -def save_query_history(name, description, pro_config, test_config, query_config, query_keys, - results_summary, execution_time, total_keys, differences_count, identical_count, - sharding_config=None, query_type='single', raw_results=None, differences_data=None, identical_data=None): - """保存查询历史记录,支持分表查询和查询结果数据,返回历史记录ID""" - if not ensure_database(): - logger.error("数据库初始化失败") - return None - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute(''' - INSERT INTO query_history - (name, description, pro_config, test_config, query_config, query_keys, - results_summary, execution_time, total_keys, differences_count, identical_count, - sharding_config, query_type, raw_results, differences_data, identical_data) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ''', ( - name, description, - json.dumps(pro_config), - json.dumps(test_config), - json.dumps(query_config), - json.dumps(query_keys), - json.dumps(results_summary), - execution_time, - total_keys, - differences_count, - identical_count, - json.dumps(sharding_config) if sharding_config else None, - query_type, - json.dumps(raw_results) if raw_results else None, - json.dumps(differences_data) if differences_data else None, - json.dumps(identical_data) if identical_data else None - )) - - # 获取插入记录的ID - history_id = cursor.lastrowid - conn.commit() - logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type},ID:{history_id}") - return history_id - except Exception as e: - logger.error(f"保存查询历史记录失败: {e}") - return None - finally: - conn.close() - -def get_query_history(): - """获取所有查询历史记录""" - if not ensure_database(): - logger.error("数据库初始化失败") - return [] - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute(''' - SELECT id, name, description, execution_time, total_keys, - differences_count, identical_count, created_at, query_type - FROM query_history - ORDER BY created_at DESC - ''') - rows = cursor.fetchall() - - history_list = [] - for row in rows: - # 获取列名列表以检查字段是否存在 - column_names = [desc[0] for desc in cursor.description] - history_list.append({ - 'id': row['id'], - 'name': row['name'], - 'description': row['description'], - 'execution_time': row['execution_time'], - 'total_keys': row['total_keys'], - 'differences_count': row['differences_count'], - 'identical_count': row['identical_count'], - 'created_at': row['created_at'], - 'query_type': row['query_type'] if 'query_type' in column_names else 'single' - }) - - return history_list - except Exception as e: - logger.error(f"获取查询历史记录失败: {e}") - return [] - finally: - conn.close() - -def get_query_history_by_id(history_id): - """根据ID获取查询历史记录详情""" - if not ensure_database(): - logger.error("数据库初始化失败") - return None - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute(''' - SELECT * FROM query_history WHERE id = ? - ''', (history_id,)) - row = cursor.fetchone() - - if row: - # 获取列名列表以检查字段是否存在 - column_names = [desc[0] for desc in cursor.description] - return { - 'id': row['id'], - 'name': row['name'], - 'description': row['description'], - 'pro_config': json.loads(row['pro_config']), - 'test_config': json.loads(row['test_config']), - 'query_config': json.loads(row['query_config']), - 'query_keys': json.loads(row['query_keys']), - 'results_summary': json.loads(row['results_summary']), - 'execution_time': row['execution_time'], - 'total_keys': row['total_keys'], - 'differences_count': row['differences_count'], - 'identical_count': row['identical_count'], - 'created_at': row['created_at'], - # 处理新字段,保持向后兼容 - 'sharding_config': json.loads(row['sharding_config']) if 'sharding_config' in column_names and row['sharding_config'] else None, - 'query_type': row['query_type'] if 'query_type' in column_names else 'single', - # 添加查询结果数据支持 - 'raw_results': json.loads(row['raw_results']) if 'raw_results' in column_names and row['raw_results'] else None, - 'differences_data': json.loads(row['differences_data']) if 'differences_data' in column_names and row['differences_data'] else None, - 'identical_data': json.loads(row['identical_data']) if 'identical_data' in column_names and row['identical_data'] else None - } - return None - except Exception as e: - logger.error(f"获取查询历史记录详情失败: {e}") - return None - finally: - conn.close() - -def delete_query_history(history_id): - """删除查询历史记录""" - if not ensure_database(): - logger.error("数据库初始化失败") - return False - - conn = get_db_connection() - cursor = conn.cursor() - - try: - cursor.execute('DELETE FROM query_history WHERE id = ?', (history_id,)) - conn.commit() - success = cursor.rowcount > 0 - if success: - logger.info(f"查询历史记录ID {history_id} 删除成功") - return success - except Exception as e: - logger.error(f"删除查询历史记录失败: {e}") - return False - finally: - conn.close() - -def create_connection(config): - """创建Cassandra连接,带有增强的错误诊断和容错机制""" - start_time = time.time() - - logger.info(f"=== 开始创建Cassandra连接 ===") - logger.info(f"主机列表: {config.get('hosts', [])}") - logger.info(f"端口: {config.get('port', 9042)}") - logger.info(f"用户名: {config.get('username', 'N/A')}") - logger.info(f"Keyspace: {config.get('keyspace', 'N/A')}") - - try: - logger.info("正在创建认证提供者...") - auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password']) - - logger.info("正在创建集群连接...") - # 设置连接池配置,提高容错性 - from cassandra.policies import DCAwareRoundRobinPolicy - - # 设置负载均衡策略,避免单点故障 - load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1')) - - # 创建连接配置,增加容错参数 - cluster = Cluster( - config['hosts'], - port=config['port'], - auth_provider=auth_provider, - load_balancing_policy=load_balancing_policy, - # 增加容错配置 - protocol_version=4, # 使用稳定的协议版本 - connect_timeout=15, # 连接超时 - control_connection_timeout=15, # 控制连接超时 - max_schema_agreement_wait=30 # schema同步等待时间 - ) - - logger.info("正在连接到Keyspace...") - session = cluster.connect(config['keyspace']) - - # 设置session级别的容错参数 - session.default_timeout = 30 # 查询超时时间 - - connection_time = time.time() - start_time - logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒") - - # 记录集群状态 - try: - cluster_name = cluster.metadata.cluster_name or "Unknown" - logger.info(f" 集群名称: {cluster_name}") - - # 记录可用主机状态 - live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up] - down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up] - - logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)") - if down_hosts: - logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)") - - except Exception as meta_error: - logger.warning(f"无法获取集群元数据: {meta_error}") - - return cluster, session - - except Exception as e: - connection_time = time.time() - start_time - error_msg = str(e) - - logger.error(f"❌ Cassandra连接失败: 连接时间={connection_time:.3f}秒") - logger.error(f"错误类型: {type(e).__name__}") - logger.error(f"错误详情: {error_msg}") - - # 提供详细的诊断信息 - if "connection refused" in error_msg.lower() or "unable to connect" in error_msg.lower(): - logger.error("❌ 诊断:无法连接到Cassandra服务器") - logger.error("🔧 建议检查:") - logger.error(" 1. Cassandra服务是否启动") - logger.error(" 2. 主机地址和端口是否正确") - logger.error(" 3. 网络防火墙是否阻挡连接") - - elif "timeout" in error_msg.lower(): - logger.error("❌ 诊断:连接超时") - logger.error("🔧 建议检查:") - logger.error(" 1. 网络延迟是否过高") - logger.error(" 2. Cassandra服务器负载是否过高") - logger.error(" 3. 增加连接超时时间") - - elif "authentication" in error_msg.lower() or "unauthorized" in error_msg.lower(): - logger.error("❌ 诊断:认证失败") - logger.error("🔧 建议检查:") - logger.error(" 1. 用户名和密码是否正确") - logger.error(" 2. 用户是否有访问该keyspace的权限") - - elif "keyspace" in error_msg.lower(): - logger.error("❌ 诊断:Keyspace不存在") - logger.error("🔧 建议检查:") - logger.error(" 1. Keyspace名称是否正确") - logger.error(" 2. Keyspace是否已创建") - - else: - logger.error("❌ 诊断:未知连接错误") - logger.error("🔧 建议:") - logger.error(" 1. 检查所有连接参数") - logger.error(" 2. 查看Cassandra服务器日志") - logger.error(" 3. 测试网络连通性") - - return None, None - -def execute_query(session, table, keys, fields, values, exclude_fields=None): - """执行查询,支持单主键和复合主键""" - try: - # 参数验证 - if not keys or len(keys) == 0: - logger.error("Keys参数为空,无法构建查询") - return [] - - if not values or len(values) == 0: - logger.error("Values参数为空,无法构建查询") - return [] - - # 构建查询条件 - if len(keys) == 1: - # 单主键查询(保持原有逻辑) - quoted_values = [f"'{value}'" for value in values] - query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})" - else: - # 复合主键查询 - conditions = [] - for value in values: - # 检查value是否包含复合主键分隔符 - if isinstance(value, str) and ',' in value: - # 解析复合主键值 - key_values = [v.strip() for v in value.split(',')] - if len(key_values) == len(keys): - # 构建单个复合主键条件: (key1='val1' AND key2='val2') - key_conditions = [] - for i, (key, val) in enumerate(zip(keys, key_values)): - key_conditions.append(f"{key} = '{val}'") - conditions.append(f"({' AND '.join(key_conditions)})") - else: - logger.warning(f"复合主键值 '{value}' 的字段数量({len(key_values)})与主键字段数量({len(keys)})不匹配") - # 将其作为第一个主键的值处理 - conditions.append(f"{keys[0]} = '{value}'") - else: - # 单值,作为第一个主键的值处理 - conditions.append(f"{keys[0]} = '{value}'") - - if conditions: - query_conditions = ' OR '.join(conditions) - else: - logger.error("无法构建有效的查询条件") - return [] - - # 确定要查询的字段 - if fields: - fields_str = ", ".join(fields) - else: - fields_str = "*" - - query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};" - - # 记录查询SQL日志 - logger.info(f"执行查询SQL: {query_sql}") - if len(keys) > 1: - logger.info(f"复合主键查询参数: 表={table}, 主键字段={keys}, 字段={fields_str}, Key数量={len(values)}") - else: - logger.info(f"单主键查询参数: 表={table}, 主键字段={keys[0]}, 字段={fields_str}, Key数量={len(values)}") - - # 执行查询 - start_time = time.time() - result = session.execute(query_sql) - execution_time = time.time() - start_time - - result_list = list(result) if result else [] - logger.info(f"查询完成: 执行时间={execution_time:.3f}秒, 返回记录数={len(result_list)}") - - return result_list - except Exception as e: - logger.error(f"查询执行失败: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, 错误={str(e)}") - return [] - -def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None): - """ - 执行分表查询 - :param session: Cassandra会话 - :param shard_mapping: 分表映射 {table_name: [keys]} - :param keys: 主键字段名列表 - :param fields: 要查询的字段列表 - :param exclude_fields: 要排除的字段列表 - :return: (查询结果列表, 查询到的表列表, 查询失败的表列表) - """ - all_results = [] - queried_tables = [] - error_tables = [] - - logger.info(f"开始执行分表查询,涉及 {len(shard_mapping)} 张分表") - total_start_time = time.time() - - for table_name, table_keys in shard_mapping.items(): - try: - logger.info(f"查询分表 {table_name},包含 {len(table_keys)} 个key: {table_keys}") - # 为每个分表执行查询 - table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields) - all_results.extend(table_results) - queried_tables.append(table_name) - logger.info(f"分表 {table_name} 查询成功,返回 {len(table_results)} 条记录") - except Exception as e: - logger.error(f"分表 {table_name} 查询失败: {e}") - error_tables.append(table_name) - - total_execution_time = time.time() - total_start_time - logger.info(f"分表查询总计完成: 执行时间={total_execution_time:.3f}秒, 成功表数={len(queried_tables)}, 失败表数={len(error_tables)}, 总记录数={len(all_results)}") - - return all_results, queried_tables, error_tables - -def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config): - """ - 执行混合查询(生产环境分表,测试环境可能单表或分表) - """ - results = { - 'pro_data': [], - 'test_data': [], - 'sharding_info': { - 'calculation_stats': {} - } - } - - # 处理生产环境查询 - if sharding_config.get('use_sharding_for_pro', False): - # 获取生产环境分表配置参数,优先使用专用参数,否则使用通用参数 - pro_interval = sharding_config.get('pro_interval_seconds') or sharding_config.get('interval_seconds', 604800) - pro_table_count = sharding_config.get('pro_table_count') or sharding_config.get('table_count', 14) - - # 记录生产环境分表配置信息 - logger.info(f"=== 生产环境分表配置 ===") - logger.info(f"启用分表查询: True") - logger.info(f"时间间隔: {pro_interval}秒 ({pro_interval//86400}天)") - logger.info(f"分表数量: {pro_table_count}张") - logger.info(f"基础表名: {pro_config['table']}") - - pro_calculator = ShardingCalculator( - interval_seconds=pro_interval, - table_count=pro_table_count - ) - pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys( - pro_config['table'], values - ) - - logger.info(f"生产环境分表映射结果: 涉及{len(pro_shard_mapping)}张分表, 失败Key数量: {len(pro_failed_keys)}") - - pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query( - pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields - ) - - results['pro_data'] = pro_data - results['sharding_info']['pro_shards'] = { - 'enabled': True, - 'interval_seconds': sharding_config.get('pro_interval_seconds', 604800), - 'table_count': sharding_config.get('pro_table_count', 14), - 'queried_tables': pro_queried_tables, - 'error_tables': pro_error_tables, - 'failed_keys': pro_failed_keys - } - results['sharding_info']['calculation_stats'].update(pro_calc_stats) - else: - # 生产环境单表查询 - logger.info(f"=== 生产环境单表配置 ===") - logger.info(f"启用分表查询: False") - logger.info(f"表名: {pro_config['table']}") - - pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) - results['pro_data'] = pro_data - results['sharding_info']['pro_shards'] = { - 'enabled': False, - 'queried_tables': [pro_config['table']] - } - - # 处理测试环境查询 - if sharding_config.get('use_sharding_for_test', False): - # 获取测试环境分表配置参数,优先使用专用参数,否则使用通用参数 - test_interval = sharding_config.get('test_interval_seconds') or sharding_config.get('interval_seconds', 604800) - test_table_count = sharding_config.get('test_table_count') or sharding_config.get('table_count', 14) - - # 记录测试环境分表配置信息 - logger.info(f"=== 测试环境分表配置 ===") - logger.info(f"启用分表查询: True") - logger.info(f"时间间隔: {test_interval}秒 ({test_interval//86400}天)") - logger.info(f"分表数量: {test_table_count}张") - logger.info(f"基础表名: {test_config['table']}") - - test_calculator = ShardingCalculator( - interval_seconds=test_interval, - table_count=test_table_count - ) - test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys( - test_config['table'], values - ) - - logger.info(f"测试环境分表映射结果: 涉及{len(test_shard_mapping)}张分表, 失败Key数量: {len(test_failed_keys)}") - - test_data, test_queried_tables, test_error_tables = execute_sharding_query( - test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields - ) - - results['test_data'] = test_data - results['sharding_info']['test_shards'] = { - 'enabled': True, - 'interval_seconds': test_interval, - 'table_count': test_table_count, - 'queried_tables': test_queried_tables, - 'error_tables': test_error_tables, - 'failed_keys': test_failed_keys - } - - # 合并计算统计信息 - if not results['sharding_info']['calculation_stats']: - results['sharding_info']['calculation_stats'] = test_calc_stats - else: - # 测试环境单表查询 - logger.info(f"=== 测试环境单表配置 ===") - logger.info(f"启用分表查询: False") - logger.info(f"表名: {test_config['table']}") - - test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) - results['test_data'] = test_data - results['sharding_info']['test_shards'] = { - 'enabled': False, - 'queried_tables': [test_config['table']] - } - - return results - -def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values): - """比较查询结果,支持复合主键""" - differences = [] - field_diff_count = {} - identical_results = [] # 存储相同的结果 - - def match_composite_key(row, composite_value, keys): - """检查数据行是否匹配复合主键值""" - if len(keys) == 1: - # 单主键匹配 - return getattr(row, keys[0]) == composite_value - else: - # 复合主键匹配 - if isinstance(composite_value, str) and ',' in composite_value: - key_values = [v.strip() for v in composite_value.split(',')] - if len(key_values) == len(keys): - return all(str(getattr(row, key)) == key_val for key, key_val in zip(keys, key_values)) - # 如果不是复合值,只匹配第一个主键 - return getattr(row, keys[0]) == composite_value - - for value in values: - # 查找生产表和测试表中该主键值的相关数据 - rows_pro = [row for row in pro_data if match_composite_key(row, value, keys)] - rows_test = [row for row in test_data if match_composite_key(row, value, keys)] - - for row_pro in rows_pro: - # 在测试表中查找相同主键的行 - row_test = next( - (row for row in rows_test if all(getattr(row, key) == getattr(row_pro, key) for key in keys)), - None - ) - - if row_test: - # 确定要比较的列 - columns = fields_to_compare if fields_to_compare else row_pro._fields - columns = [col for col in columns if col not in exclude_fields] - - has_difference = False - row_differences = [] - identical_fields = {} - - for column in columns: - value_pro = getattr(row_pro, column) - value_test = getattr(row_test, column) - - # 使用智能比较函数 - if not compare_values(value_pro, value_test): - has_difference = True - # 格式化显示值 - formatted_pro_value = format_json_for_display(value_pro) - formatted_test_value = format_json_for_display(value_test) - - row_differences.append({ - 'key': {key: getattr(row_pro, key) for key in keys}, - 'field': column, - 'pro_value': formatted_pro_value, - 'test_value': formatted_test_value, - 'is_json': is_json_field(value_pro) or is_json_field(value_test), - 'is_array': is_json_array_field(value_pro) or is_json_array_field(value_test) - }) - - # 统计字段差异次数 - field_diff_count[column] = field_diff_count.get(column, 0) + 1 - else: - # 存储相同的字段值 - identical_fields[column] = format_json_for_display(value_pro) - - if has_difference: - differences.extend(row_differences) - else: - # 如果没有差异,存储到相同结果中 - identical_results.append({ - 'key': {key: getattr(row_pro, key) for key in keys}, - 'pro_fields': identical_fields, - 'test_fields': {col: format_json_for_display(getattr(row_test, col)) for col in columns} - }) - else: - # 在测试表中未找到对应行 - differences.append({ - 'key': {key: getattr(row_pro, key) for key in keys}, - 'message': '在测试表中未找到该行' - }) - - # 检查测试表中是否有生产表中不存在的行 - for row_test in rows_test: - row_pro = next( - (row for row in rows_pro if all(getattr(row, key) == getattr(row_test, key) for key in keys)), - None - ) - if not row_pro: - differences.append({ - 'key': {key: getattr(row_test, key) for key in keys}, - 'message': '在生产表中未找到该行' - }) - - return differences, field_diff_count, identical_results - -def generate_comparison_summary(total_keys, pro_count, test_count, differences, identical_results, field_diff_count): - """生成比较总结报告""" - # 计算基本统计 - different_records = len(set([list(diff['key'].values())[0] for diff in differences if 'field' in diff])) - identical_records = len(identical_results) - missing_in_test = len([diff for diff in differences if diff.get('message') == '在测试表中未找到该行']) - missing_in_pro = len([diff for diff in differences if diff.get('message') == '在生产表中未找到该行']) - - # 计算百分比 - def safe_percentage(part, total): - return round((part / total * 100), 2) if total > 0 else 0 - - identical_percentage = safe_percentage(identical_records, total_keys) - different_percentage = safe_percentage(different_records, total_keys) - - # 生成总结 - summary = { - 'overview': { - 'total_keys_queried': total_keys, - 'pro_records_found': pro_count, - 'test_records_found': test_count, - 'identical_records': identical_records, - 'different_records': different_records, - 'missing_in_test': missing_in_test, - 'missing_in_pro': missing_in_pro - }, - 'percentages': { - 'data_consistency': identical_percentage, - 'data_differences': different_percentage, - 'missing_rate': safe_percentage(missing_in_test + missing_in_pro, total_keys) - }, - 'field_analysis': { - 'total_fields_compared': len(field_diff_count) if field_diff_count else 0, - 'most_different_fields': sorted(field_diff_count.items(), key=lambda x: x[1], reverse=True)[:5] if field_diff_count else [] - }, - 'data_quality': { - 'completeness': safe_percentage(pro_count + test_count, total_keys * 2), - 'consistency_score': identical_percentage, - 'quality_level': get_quality_level(identical_percentage) - }, - 'recommendations': generate_recommendations(identical_percentage, missing_in_test, missing_in_pro, field_diff_count) - } - - return summary - -def get_quality_level(consistency_percentage): - """根据一致性百分比获取数据质量等级""" - if consistency_percentage >= 95: - return {'level': '优秀', 'color': 'success', 'description': '数据一致性非常高'} - elif consistency_percentage >= 90: - return {'level': '良好', 'color': 'info', 'description': '数据一致性较高'} - elif consistency_percentage >= 80: - return {'level': '一般', 'color': 'warning', 'description': '数据一致性中等,需要关注'} - else: - return {'level': '较差', 'color': 'danger', 'description': '数据一致性较低,需要重点处理'} - -def generate_recommendations(consistency_percentage, missing_in_test, missing_in_pro, field_diff_count): - """生成改进建议""" - recommendations = [] - - if consistency_percentage < 90: - recommendations.append('建议重点关注数据一致性问题,检查数据同步机制') - - if missing_in_test > 0: - recommendations.append(f'测试环境缺失 {missing_in_test} 条记录,建议检查数据迁移过程') - - if missing_in_pro > 0: - recommendations.append(f'生产环境缺失 {missing_in_pro} 条记录,建议检查数据完整性') - - if field_diff_count: - top_diff_field = max(field_diff_count.items(), key=lambda x: x[1]) - recommendations.append(f'字段 "{top_diff_field[0]}" 差异最多({top_diff_field[1]}次),建议优先处理') - - if not recommendations: - recommendations.append('数据质量良好,建议继续保持当前的数据管理流程') - - return recommendations - -@app.route('/') -def index(): - return render_template('index.html') - -@app.route('/test-config-load') -def test_config_load(): - """配置加载测试页面""" - return send_from_directory('.', 'test_config_load.html') - -@app.route('/db-compare') -def db_compare(): - return render_template('db_compare.html') - -@app.route('/api/sharding-query', methods=['POST']) -def sharding_query_compare(): - """分表查询比对API""" - try: - data = request.json - - # 开始新的查询批次 - batch_id = query_log_collector.start_new_batch('分表') - - logger.info("开始执行分表数据库比对查询") - - # 解析配置 - pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) - test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) - - # 从query_config中获取keys等参数 - query_config = data.get('query_config', {}) - keys = query_config.get('keys', DEFAULT_CONFIG['keys']) - fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) - exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) - - values = data.get('values', []) - sharding_config = data.get('sharding_config', {}) - - # 参数验证 - if not values: - logger.warning("分表查询失败:未提供查询key值") - return jsonify({'error': '请提供查询key值'}), 400 - - if not keys: - logger.warning("分表查询失败:未提供主键字段") - return jsonify({'error': '请提供主键字段'}), 400 - - # 添加详细的参数日志 - logger.info(f"分表查询参数解析结果:") - logger.info(f" keys: {keys}") - logger.info(f" values数量: {len(values)}") - logger.info(f" fields_to_compare: {fields_to_compare}") - logger.info(f" exclude_fields: {exclude_fields}") - logger.info(f" sharding_config原始数据: {sharding_config}") - logger.info(f" sharding_config具体参数:") - logger.info(f" use_sharding_for_pro: {sharding_config.get('use_sharding_for_pro')}") - logger.info(f" use_sharding_for_test: {sharding_config.get('use_sharding_for_test')}") - logger.info(f" pro_interval_seconds: {sharding_config.get('pro_interval_seconds')}") - logger.info(f" pro_table_count: {sharding_config.get('pro_table_count')}") - logger.info(f" test_interval_seconds: {sharding_config.get('test_interval_seconds')}") - logger.info(f" test_table_count: {sharding_config.get('test_table_count')}") - logger.info(f" interval_seconds: {sharding_config.get('interval_seconds')}") - logger.info(f" table_count: {sharding_config.get('table_count')}") - - logger.info(f"分表查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") - - # 创建数据库连接 - pro_cluster, pro_session = create_connection(pro_config) - test_cluster, test_session = create_connection(test_config) - - if not pro_session or not test_session: - logger.error("数据库连接失败") - return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500 - - try: - # 执行混合查询(支持生产环境分表、测试环境单表/分表的组合) - logger.info("执行分表混合查询") - query_results = execute_mixed_query( - pro_session, test_session, pro_config, test_config, - keys, fields_to_compare, values, exclude_fields, sharding_config - ) - - pro_data = query_results['pro_data'] - test_data = query_results['test_data'] - sharding_info = query_results['sharding_info'] - - logger.info(f"分表查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录") - - # 比较结果 - differences, field_diff_count, identical_results = compare_results( - pro_data, test_data, keys, fields_to_compare, exclude_fields, values - ) - - # 统计信息 - different_ids = set() - for diff in differences: - if 'field' in diff: - different_ids.add(list(diff['key'].values())[0]) - - non_different_ids = set(values) - different_ids - - # 生成比较总结 - summary = generate_comparison_summary( - len(values), len(pro_data), len(test_data), - differences, identical_results, field_diff_count - ) - - result = { - 'total_keys': len(values), - 'pro_count': len(pro_data), - 'test_count': len(test_data), - 'differences': differences, - 'identical_results': identical_results, - 'field_diff_count': field_diff_count, - 'different_ids': list(different_ids), - 'non_different_ids': list(non_different_ids), - 'summary': summary, - 'sharding_info': sharding_info, # 包含分表查询信息 - 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], - 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] - } - - logger.info(f"分表比对完成:发现 {len(differences)} 处差异") - - # 自动保存分表查询历史记录 - try: - # 生成历史记录名称 - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - history_name = f"分表查询_{timestamp}" - history_description = f"自动保存 - 分表查询{len(values)}个Key,发现{len(differences)}处差异" - - # 保存历史记录 - history_id = save_query_history( - name=history_name, - description=history_description, - pro_config=pro_config, - test_config=test_config, - query_config={ - 'keys': keys, - 'fields_to_compare': fields_to_compare, - 'exclude_fields': exclude_fields - }, - query_keys=values, - results_summary=summary, - execution_time=0.0, # 可以后续优化计算实际执行时间 - total_keys=len(values), - differences_count=len(differences), - identical_count=len(identical_results), - # 新增分表相关参数 - sharding_config=sharding_config, - query_type='sharding', - # 添加查询结果数据 - raw_results={ - 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], - 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [], - 'sharding_info': sharding_info # 包含分表信息 - }, - differences_data=differences, - identical_data=identical_results - ) - - # 关联查询日志与历史记录 - if history_id: - query_log_collector.set_history_id(history_id) - logger.info(f"分表查询历史记录保存成功: {history_name}, ID: {history_id}") - else: - logger.warning("分表查询历史记录保存失败,无法获取history_id") - except Exception as e: - logger.warning(f"保存分表查询历史记录失败: {e}") - - # 结束查询批次 - query_log_collector.end_current_batch() - return jsonify(result) - - except Exception as e: - logger.error(f"分表查询执行失败:{str(e)}") - # 结束查询批次(出错情况) - query_log_collector.end_current_batch() - return jsonify({'error': f'分表查询执行失败:{str(e)}'}), 500 - finally: - # 关闭连接 - if pro_cluster: - pro_cluster.shutdown() - if test_cluster: - test_cluster.shutdown() - - except Exception as e: - logger.error(f"分表查询请求处理失败:{str(e)}") - # 结束查询批次(请求处理出错) - query_log_collector.end_current_batch() - return jsonify({'error': f'分表查询请求处理失败:{str(e)}'}), 500 - -@app.route('/api/query', methods=['POST']) -def query_compare(): - try: - data = request.json - - # 开始新的查询批次 - batch_id = query_log_collector.start_new_batch('单表') - - logger.info("开始执行数据库比对查询") - - # 解析配置 - pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) - test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) - - # 从query_config中获取keys等参数 - query_config = data.get('query_config', {}) - keys = query_config.get('keys', DEFAULT_CONFIG['keys']) - fields_to_compare = query_config.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) - exclude_fields = query_config.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) - - values = data.get('values', []) - - # 参数验证 - if not values: - logger.warning("查询失败:未提供查询key值") - return jsonify({'error': '请提供查询key值'}), 400 - - if not keys: - logger.warning("查询失败:未提供主键字段") - return jsonify({'error': '请提供主键字段'}), 400 - - # 添加详细的参数日志 - logger.info(f"单表查询参数解析结果:") - logger.info(f" keys: {keys}") - logger.info(f" values数量: {len(values)}") - logger.info(f" fields_to_compare: {fields_to_compare}") - logger.info(f" exclude_fields: {exclude_fields}") - - logger.info(f"查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") - - # 创建数据库连接 - pro_cluster, pro_session = create_connection(pro_config) - test_cluster, test_session = create_connection(test_config) - - if not pro_session or not test_session: - logger.error("数据库连接失败") - return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500 - - try: - # 执行查询 - logger.info("执行生产环境查询") - pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) - logger.info("执行测试环境查询") - test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) - - logger.info(f"查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录") - - # 比较结果 - differences, field_diff_count, identical_results = compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values) - - # 统计信息 - different_ids = set() - for diff in differences: - if 'field' in diff: - different_ids.add(list(diff['key'].values())[0]) - - non_different_ids = set(values) - different_ids - - # 生成比较总结 - summary = generate_comparison_summary( - len(values), len(pro_data), len(test_data), - differences, identical_results, field_diff_count - ) - - result = { - 'total_keys': len(values), - 'pro_count': len(pro_data), - 'test_count': len(test_data), - 'differences': differences, - 'identical_results': identical_results, - 'field_diff_count': field_diff_count, - 'different_ids': list(different_ids), - 'non_different_ids': list(non_different_ids), - 'summary': summary, - 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], - 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] - } - - logger.info(f"比对完成:发现 {len(differences)} 处差异") - - # 自动保存查询历史记录(可选,基于执行结果) - try: - # 生成历史记录名称 - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - history_name = f"查询_{timestamp}" - history_description = f"自动保存 - 查询{len(values)}个Key,发现{len(differences)}处差异" - - # 保存历史记录 - history_id = save_query_history( - name=history_name, - description=history_description, - pro_config=pro_config, - test_config=test_config, - query_config={ - 'keys': keys, - 'fields_to_compare': fields_to_compare, - 'exclude_fields': exclude_fields - }, - query_keys=values, - results_summary=summary, - execution_time=0.0, # 可以后续优化计算实际执行时间 - total_keys=len(values), - differences_count=len(differences), - identical_count=len(identical_results), - # 添加查询结果数据 - raw_results={ - 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], - 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] - }, - differences_data=differences, - identical_data=identical_results - ) - - # 关联查询日志与历史记录 - if history_id: - query_log_collector.set_history_id(history_id) - logger.info(f"查询历史记录保存成功: {history_name}, ID: {history_id}") - else: - logger.warning("查询历史记录保存失败,无法获取history_id") - except Exception as e: - logger.warning(f"保存查询历史记录失败: {e}") - - # 结束查询批次 - query_log_collector.end_current_batch() - return jsonify(result) - - except Exception as e: - logger.error(f"查询执行失败:{str(e)}") - # 结束查询批次(出错情况) - query_log_collector.end_current_batch() - return jsonify({'error': f'查询执行失败:{str(e)}'}), 500 - finally: - # 关闭连接 - if pro_cluster: - pro_cluster.shutdown() - if test_cluster: - test_cluster.shutdown() - - except Exception as e: - logger.error(f"请求处理失败:{str(e)}") - # 结束查询批次(请求处理出错) - query_log_collector.end_current_batch() - return jsonify({'error': f'请求处理失败:{str(e)}'}), 500 - -@app.route('/api/default-config') -def get_default_config(): - return jsonify(DEFAULT_CONFIG) - -# 配置组管理API -@app.route('/api/config-groups', methods=['GET']) -def api_get_config_groups(): - """获取所有配置组""" - config_groups = get_config_groups() - return jsonify({'success': True, 'data': config_groups}) - -@app.route('/api/config-groups', methods=['POST']) -def api_save_config_group(): - """保存配置组""" - try: - data = request.json - name = data.get('name', '').strip() - description = data.get('description', '').strip() - pro_config = data.get('pro_config', {}) - test_config = data.get('test_config', {}) - - # 获取查询配置,支持两种格式 - if 'query_config' in data: - # 嵌套格式 - query_config = data.get('query_config', {}) - else: - # 平铺格式 - query_config = { - 'keys': data.get('keys', []), - 'fields_to_compare': data.get('fields_to_compare', []), - 'exclude_fields': data.get('exclude_fields', []) - } - - # 提取分表配置 - sharding_config = data.get('sharding_config') - - if not name: - return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400 - - success = save_config_group(name, description, pro_config, test_config, query_config, sharding_config) - - if success: - return jsonify({'success': True, 'message': '配置组保存成功'}) - else: - return jsonify({'success': False, 'error': '配置组保存失败'}), 500 - - except Exception as e: - logger.error(f"保存配置组API失败: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - -@app.route('/api/config-groups/', methods=['GET']) -def api_get_config_group(group_id): - """获取指定配置组详情""" - config_group = get_config_group_by_id(group_id) - - if config_group: - return jsonify({'success': True, 'data': config_group}) - else: - return jsonify({'success': False, 'error': '配置组不存在'}), 404 - -@app.route('/api/config-groups/', methods=['DELETE']) -def api_delete_config_group(group_id): - """删除配置组""" - success = delete_config_group(group_id) - - if success: - return jsonify({'success': True, 'message': '配置组删除成功'}) - else: - return jsonify({'success': False, 'error': '配置组删除失败'}), 500 - -@app.route('/api/init-db', methods=['POST']) -def api_init_database(): - """手动初始化数据库(用于测试)""" - success = init_database() - if success: - return jsonify({'success': True, 'message': '数据库初始化成功'}) - else: - return jsonify({'success': False, 'error': '数据库初始化失败'}), 500 - -# 查询历史管理API -@app.route('/api/query-history', methods=['GET']) -def api_get_query_history(): - """获取所有查询历史记录""" - history_list = get_query_history() - return jsonify({'success': True, 'data': history_list}) - -@app.route('/api/query-history', methods=['POST']) -def api_save_query_history(): - """保存查询历史记录,支持分表查询""" - try: - data = request.json - name = data.get('name', '').strip() - description = data.get('description', '').strip() - pro_config = data.get('pro_config', {}) - test_config = data.get('test_config', {}) - query_config = data.get('query_config', {}) - query_keys = data.get('query_keys', []) - results_summary = data.get('results_summary', {}) - execution_time = data.get('execution_time', 0.0) - total_keys = data.get('total_keys', 0) - differences_count = data.get('differences_count', 0) - identical_count = data.get('identical_count', 0) - # 新增分表相关字段 - sharding_config = data.get('sharding_config') - query_type = data.get('query_type', 'single') - - if not name: - return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400 - - success = save_query_history( - name, description, pro_config, test_config, query_config, - query_keys, results_summary, execution_time, total_keys, - differences_count, identical_count, sharding_config, query_type - ) - - if success: - query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询' - return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'}) - else: - return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500 - - except Exception as e: - logger.error(f"保存查询历史记录API失败: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - -@app.route('/api/query-history/', methods=['GET']) -def api_get_query_history_detail(history_id): - """获取指定查询历史记录详情""" - history_record = get_query_history_by_id(history_id) - - if history_record: - return jsonify({'success': True, 'data': history_record}) - else: - return jsonify({'success': False, 'error': '查询历史记录不存在'}), 404 - -@app.route('/api/query-history//results', methods=['GET']) -def api_get_query_history_results(history_id): - """获取查询历史记录的完整结果数据""" - try: - history_record = get_query_history_by_id(history_id) - if not history_record: - return jsonify({'success': False, 'error': '历史记录不存在'}), 404 - - # 安全获取raw_results数据 - raw_results = history_record.get('raw_results') - if raw_results and isinstance(raw_results, dict): - raw_pro_data = raw_results.get('raw_pro_data', []) or [] - raw_test_data = raw_results.get('raw_test_data', []) or [] - sharding_info = raw_results.get('sharding_info') if history_record.get('query_type') == 'sharding' else None - else: - raw_pro_data = [] - raw_test_data = [] - sharding_info = None - - # 安全获取差异和相同结果数据 - differences_data = history_record.get('differences_data') or [] - identical_data = history_record.get('identical_data') or [] - - # 构建完整的查询结果格式,与API查询结果保持一致 - result = { - 'total_keys': history_record['total_keys'], - 'pro_count': len(raw_pro_data), - 'test_count': len(raw_test_data), - 'differences': differences_data, - 'identical_results': identical_data, - 'field_diff_count': {}, # 可以从differences_data中重新计算 - 'summary': history_record.get('results_summary', {}), - 'raw_pro_data': raw_pro_data, - 'raw_test_data': raw_test_data, - # 如果是分表查询,添加分表信息 - 'sharding_info': sharding_info, - # 添加历史记录元信息 - 'history_info': { - 'id': history_record['id'], - 'name': history_record['name'], - 'description': history_record['description'], - 'created_at': history_record['created_at'], - 'query_type': history_record.get('query_type', 'single') - } - } - - # 重新计算field_diff_count - if differences_data: - field_diff_count = {} - for diff in differences_data: - if isinstance(diff, dict) and 'field' in diff: - field_name = diff['field'] - field_diff_count[field_name] = field_diff_count.get(field_name, 0) + 1 - result['field_diff_count'] = field_diff_count - - return jsonify({ - 'success': True, - 'data': result, - 'message': f'历史记录 "{history_record["name"]}" 结果加载成功' - }) - - except Exception as e: - logger.error(f"获取查询历史记录结果失败: {e}") - return jsonify({'success': False, 'error': f'获取历史记录结果失败: {str(e)}'}), 500 - -@app.route('/api/query-history/', methods=['DELETE']) -def api_delete_query_history(history_id): - """删除查询历史记录""" - success = delete_query_history(history_id) - - if success: - return jsonify({'success': True, 'message': '查询历史记录删除成功'}) - else: - return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500 - -@app.route('/api/query-logs', methods=['GET']) -def api_get_query_logs(): - """获取查询日志,支持分组显示和数据库存储""" - try: - limit = request.args.get('limit', type=int) - grouped = request.args.get('grouped', 'true').lower() == 'true' # 默认分组显示 - from_db = request.args.get('from_db', 'true').lower() == 'true' # 默认从数据库获取 - - if grouped: - # 返回分组日志 - grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db) - # 获取总数(用于统计) - total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs) - - return jsonify({ - 'success': True, - 'data': grouped_logs, - 'total': total_logs, - 'grouped': True, - 'from_db': from_db - }) - else: - # 返回原始日志列表 - logs = query_log_collector.get_logs(limit, from_db) - total_logs = query_log_collector._get_total_logs_count() if from_db else len(query_log_collector.logs) - - return jsonify({ - 'success': True, - 'data': logs, - 'total': total_logs, - 'grouped': False, - 'from_db': from_db - }) - except Exception as e: - logger.error(f"获取查询日志失败: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - -@app.route('/api/query-logs', methods=['DELETE']) -def api_clear_query_logs(): - """清空查询日志,支持清空数据库日志""" - try: - clear_db = request.args.get('clear_db', 'true').lower() == 'true' # 默认清空数据库 - query_log_collector.clear_logs(clear_db) - - message = '查询日志已清空(包括数据库)' if clear_db else '查询日志已清空(仅内存)' - return jsonify({'success': True, 'message': message}) - except Exception as e: - logger.error(f"清空查询日志失败: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - -@app.route('/api/query-logs/cleanup', methods=['POST']) -def api_cleanup_old_logs(): - """清理旧的查询日志""" - try: - days_to_keep = request.json.get('days_to_keep', 30) if request.json else 30 - deleted_count = query_log_collector.cleanup_old_logs(days_to_keep) - - return jsonify({ - 'success': True, - 'message': f'成功清理 {deleted_count} 条超过 {days_to_keep} 天的旧日志', - 'deleted_count': deleted_count - }) - except Exception as e: - logger.error(f"清理旧日志失败: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - -@app.route('/api/query-logs/history/', methods=['GET']) -def api_get_query_logs_by_history(history_id): - """根据历史记录ID获取相关查询日志""" - try: - logs = query_log_collector.get_logs_by_history_id(history_id) - - # 按批次分组显示 - grouped_logs = {} - batch_order = [] - - for log in logs: - batch_id = log.get('batch_id', 'unknown') - if batch_id not in grouped_logs: - grouped_logs[batch_id] = [] - batch_order.append(batch_id) - grouped_logs[batch_id].append(log) - - # 返回按时间顺序排列的批次 - grouped_result = [(batch_id, grouped_logs[batch_id]) for batch_id in batch_order] - - return jsonify({ - 'success': True, - 'data': grouped_result, - 'total': len(logs), - 'history_id': history_id, - 'grouped': True - }) - except Exception as e: - logger.error(f"获取历史记录相关查询日志失败: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - -if __name__ == '__main__': - app.run(debug=True, port=5001) diff --git a/modules/api_routes.py b/modules/api_routes.py index bfbacfb..b02e158 100644 --- a/modules/api_routes.py +++ b/modules/api_routes.py @@ -38,26 +38,15 @@ def setup_routes(app, query_log_collector): def index(): return render_template('index.html') - @app.route('/test-config-load') - def test_config_load(): - """配置加载测试页面""" - return send_from_directory('.', 'test_config_load.html') - @app.route('/db-compare') def db_compare(): + """Cassandra数据库比对工具页面""" return render_template('db_compare.html') @app.route('/redis-compare') def redis_compare(): + """Redis数据比对工具页面""" return render_template('redis_compare.html') - - @app.route('/redis-js-test') - def redis_js_test(): - return render_template('redis_js_test.html') - - @app.route('/redis-test') - def redis_test(): - return render_template('redis_test.html') # 基础API @app.route('/api/default-config') diff --git a/modules/cassandra_client.py b/modules/cassandra_client.py index 5ab13c8..e483963 100644 --- a/modules/cassandra_client.py +++ b/modules/cassandra_client.py @@ -1,6 +1,38 @@ """ Cassandra连接管理模块 -负责Cassandra数据库的连接和错误诊断 +==================== + +本模块负责Cassandra数据库的连接管理和高级错误诊断功能。 + +核心功能: +1. 智能连接管理:自动处理集群连接和故障转移 +2. 错误诊断系统:详细的连接失败分析和解决建议 +3. 性能监控:连接时间和集群状态的实时监控 +4. 容错机制:连接超时、重试和优雅降级 +5. 安全认证:支持用户名密码认证和SSL连接 + +连接特性: +- 负载均衡:使用DCAwareRoundRobinPolicy避免单点故障 +- 连接池管理:优化的连接复用和资源管理 +- 超时控制:可配置的连接和查询超时时间 +- 协议版本:使用稳定的CQL协议版本4 +- Schema同步:自动等待集群Schema一致性 + +错误诊断系统: +- 连接拒绝:检查服务状态和网络连通性 +- 认证失败:验证用户名密码和权限设置 +- 超时错误:分析网络延迟和服务器负载 +- Keyspace错误:验证Keyspace存在性和访问权限 +- 未知错误:提供通用的故障排查指南 + +监控功能: +- 集群状态:实时显示可用和故障节点 +- 连接时间:精确的连接建立时间测量 +- 元数据获取:集群名称和节点信息展示 +- 性能指标:连接成功率和响应时间统计 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import time @@ -12,7 +44,57 @@ from cassandra.policies import DCAwareRoundRobinPolicy logger = logging.getLogger(__name__) def create_connection(config): - """创建Cassandra连接,带有增强的错误诊断和容错机制""" + """ + 创建Cassandra数据库连接,具备增强的错误诊断和容错机制 + + 本函数提供企业级的Cassandra连接管理,包括: + - 智能连接建立:自动选择最优连接参数 + - 详细错误诊断:针对不同错误类型提供具体解决方案 + - 性能监控:记录连接时间和集群状态 + - 容错处理:连接失败时的优雅降级 + + Args: + config (dict): Cassandra连接配置,包含以下字段: + - hosts (list): Cassandra节点地址列表 + - port (int): 连接端口,默认9042 + - username (str): 认证用户名 + - password (str): 认证密码 + - keyspace (str): 目标keyspace名称 + - datacenter (str): 数据中心名称,默认'dc1' + + Returns: + tuple: (cluster, session) 连接对象元组 + - cluster: Cassandra集群对象,用于管理连接 + - session: 数据库会话对象,用于执行查询 + - 连接失败时返回 (None, None) + + 连接配置优化: + - 协议版本:使用稳定的协议版本4 + - 连接超时:15秒连接超时,避免长时间等待 + - 负载均衡:DCAwareRoundRobinPolicy避免跨DC查询 + - Schema同步:30秒Schema一致性等待时间 + - 查询超时:30秒默认查询超时时间 + + 错误诊断: + - 连接拒绝:提供服务状态检查建议 + - 认证失败:提供用户权限验证指南 + - 超时错误:提供网络和性能优化建议 + - Keyspace错误:提供Keyspace创建和权限指南 + + 使用示例: + config = { + 'hosts': ['192.168.1.100', '192.168.1.101'], + 'port': 9042, + 'username': 'cassandra', + 'password': 'password', + 'keyspace': 'my_keyspace', + 'datacenter': 'dc1' + } + cluster, session = create_connection(config) + if session: + result = session.execute("SELECT * FROM my_table LIMIT 10") + cluster.shutdown() + """ start_time = time.time() logger.info(f"=== 开始创建Cassandra连接 ===") diff --git a/modules/config_manager.py b/modules/config_manager.py index f796699..310e51a 100644 --- a/modules/config_manager.py +++ b/modules/config_manager.py @@ -1,6 +1,35 @@ """ 配置管理模块 -负责配置组和查询历史的CRUD操作 +============ + +本模块负责BigDataTool项目的配置管理和查询历史管理,提供完整的CRUD操作。 + +核心功能: +1. Cassandra配置组管理:数据库连接配置的保存、加载、删除 +2. Redis配置组管理:Redis集群配置的完整生命周期管理 +3. 查询历史管理:查询记录的持久化存储和检索 +4. 配置解析和验证:YAML格式配置的智能解析 + +支持的配置类型: +- Cassandra配置:集群地址、认证信息、keyspace等 +- Redis配置:集群节点、连接参数、查询选项等 +- 查询配置:主键字段、比较字段、排除字段等 +- 分表配置:TWCS分表参数、时间间隔、表数量等 + +数据存储格式: +- 所有配置以JSON格式存储在SQLite数据库中 +- 支持复杂嵌套结构和数组类型 +- 自动处理序列化和反序列化 +- 保持数据类型完整性 + +设计特点: +- 类型安全:完整的参数验证和类型检查 +- 事务安全:数据库操作的原子性保证 +- 错误恢复:数据库异常时的优雅降级 +- 向后兼容:支持旧版本配置格式的自动升级 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import json @@ -10,7 +39,8 @@ from .database import ensure_database, get_db_connection logger = logging.getLogger(__name__) -# 默认配置(不显示敏感信息) +# Cassandra数据库默认配置模板 +# 注意:此配置不包含敏感信息,仅作为UI表单的初始模板使用 DEFAULT_CONFIG = { 'pro_config': { 'cluster_name': '', @@ -37,7 +67,8 @@ DEFAULT_CONFIG = { 'exclude_fields': [] } -# Redis默认配置 +# Redis集群默认配置模板 +# 支持单节点和集群模式,自动检测连接类型 REDIS_DEFAULT_CONFIG = { 'cluster1_config': { 'name': '生产集群', diff --git a/modules/data_comparison.py b/modules/data_comparison.py index fb1816a..12f35f1 100644 --- a/modules/data_comparison.py +++ b/modules/data_comparison.py @@ -1,6 +1,39 @@ """ -数据比较模块 -负责两个数据集之间的比较、JSON处理和差异分析 +数据比较引擎模块 +================ + +本模块是BigDataTool的智能数据比较引擎,提供高级的数据差异分析功能。 + +核心功能: +1. 数据集比较:生产环境与测试环境数据的精确比对 +2. JSON智能比较:支持复杂JSON结构的深度比较 +3. 数组顺序无关比较:数组元素的智能匹配算法 +4. 复合主键支持:多字段主键的精确匹配 +5. 差异分析:详细的字段级差异统计和分析 +6. 数据质量评估:自动生成数据一致性报告 + +比较算法特性: +- JSON标准化:自动处理JSON格式差异(空格、顺序等) +- 数组智能比较:忽略数组元素顺序的深度比较 +- 类型容错:自动处理字符串与数字的类型差异 +- 编码处理:完善的UTF-8和二进制数据处理 +- 性能优化:大数据集的高效比较算法 + +支持的数据类型: +- 基础类型:字符串、数字、布尔值、null +- JSON对象:嵌套对象的递归比较 +- JSON数组:元素级别的智能匹配 +- 二进制数据:字节级别的精确比较 +- 复合主键:多字段组合的精确匹配 + +输出格式: +- 差异记录:详细的字段级差异信息 +- 统计报告:数据一致性的量化分析 +- 质量评估:数据质量等级和改进建议 +- 性能指标:比较过程的性能统计 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import json diff --git a/modules/database.py b/modules/database.py index 58fe8de..b2d2fa2 100644 --- a/modules/database.py +++ b/modules/database.py @@ -1,6 +1,36 @@ """ 数据库管理模块 -负责SQLite数据库的初始化、连接和表结构管理 +============== + +本模块负责BigDataTool项目的SQLite数据库管理,包括: + +核心功能: +1. 数据库初始化和表结构创建 +2. 数据库连接管理和事务处理 +3. 表结构版本控制和字段动态添加 +4. 数据库完整性检查和自动修复 + +数据表结构: +- config_groups: 配置组管理(Cassandra/Redis连接配置) +- query_history: 查询历史记录(单表/分表/Redis查询) +- sharding_config_groups: 分表配置组(TWCS分表参数) +- query_logs: 查询日志(实时操作日志和性能监控) +- redis_config_groups: Redis配置组(集群连接配置) +- redis_query_history: Redis查询历史(Redis数据比对记录) + +设计特点: +- 自动化表结构管理:支持字段动态添加和版本升级 +- 向后兼容性:确保旧版本数据的正常访问 +- 错误恢复:数据库损坏时自动重建表结构 +- 索引优化:为查询性能优化的索引设计 + +使用方式: +- ensure_database(): 确保数据库和表结构存在 +- get_db_connection(): 获取标准的数据库连接 +- init_database(): 手动初始化数据库(通常自动调用) + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import sqlite3 @@ -14,7 +44,27 @@ logger = logging.getLogger(__name__) DATABASE_PATH = 'config_groups.db' def init_database(): - """初始化数据库""" + """ + 初始化SQLite数据库和所有必要的表结构 + + 创建以下数据表: + 1. config_groups - Cassandra配置组存储 + 2. query_history - 查询历史记录存储 + 3. sharding_config_groups - 分表配置组存储 + 4. query_logs - 查询日志存储 + 5. redis_config_groups - Redis配置组存储 + 6. redis_query_history - Redis查询历史存储 + + 同时创建必要的索引以优化查询性能。 + + Returns: + bool: 初始化成功返回True,失败返回False + + 注意: + - 使用IF NOT EXISTS确保重复调用安全 + - 自动创建性能优化索引 + - 支持外键约束和级联删除 + """ try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() @@ -135,7 +185,28 @@ def init_database(): return False def ensure_database(): - """确保数据库和表存在""" + """ + 确保数据库文件和表结构完整存在 + + 执行以下检查和操作: + 1. 检查数据库文件是否存在,不存在则创建 + 2. 验证所有必要表是否存在,缺失则重建 + 3. 检查表结构是否完整,缺少字段则动态添加 + 4. 确保索引完整性 + + 支持的表结构升级: + - config_groups表:添加sharding_config字段 + - query_history表:添加sharding_config、query_type、raw_results等字段 + - query_logs表:添加history_id外键字段 + + Returns: + bool: 数据库就绪返回True,初始化失败返回False + + 特性: + - 向后兼容:支持从旧版本数据库升级 + - 自动修复:检测到问题时自动重建 + - 零停机:升级过程不影响现有数据 + """ if not os.path.exists(DATABASE_PATH): logger.info("数据库文件不存在,正在创建...") return init_database() @@ -222,7 +293,26 @@ def ensure_database(): return init_database() def get_db_connection(): - """获取数据库连接""" + """ + 获取配置好的SQLite数据库连接 + + 返回一个配置了Row工厂的数据库连接,支持: + - 字典式访问查询结果(row['column_name']) + - 自动类型转换 + - 标准的SQLite连接功能 + + Returns: + sqlite3.Connection: 配置好的数据库连接对象 + + 使用示例: + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute("SELECT * FROM config_groups") + rows = cursor.fetchall() + for row in rows: + print(row['name']) # 字典式访问 + conn.close() + """ conn = sqlite3.connect(DATABASE_PATH) conn.row_factory = sqlite3.Row return conn \ No newline at end of file diff --git a/modules/query_engine.py b/modules/query_engine.py index 53eff0d..a85be3a 100644 --- a/modules/query_engine.py +++ b/modules/query_engine.py @@ -1,6 +1,35 @@ """ -数据查询模块 -负责Cassandra数据的查询执行,支持单表、分表和多主键查询 +数据查询引擎模块 +================ + +本模块是BigDataTool的核心查询引擎,负责Cassandra数据库的高级查询功能。 + +核心功能: +1. 单表查询:标准的Cassandra CQL查询执行 +2. 分表查询:基于TWCS策略的时间分表查询 +3. 多主键查询:支持复合主键的复杂查询条件 +4. 混合查询:生产环境分表+测试环境单表的组合查询 + +查询类型支持: +- 单主键查询:WHERE key IN (val1, val2, val3) +- 复合主键查询:WHERE (key1='val1' AND key2='val2') OR (key1='val3' AND key2='val4') +- 分表查询:自动计算分表名称并并行查询多张表 +- 字段过滤:支持指定查询字段和排除字段 + +分表查询特性: +- 时间戳提取:从Key中智能提取时间戳信息 +- 分表计算:基于TWCS策略计算目标分表 +- 并行查询:同时查询多张分表以提高性能 +- 错误容错:单个分表查询失败不影响整体结果 + +性能优化: +- 查询时间监控:记录每个查询的执行时间 +- 批量处理:支持大批量Key的高效查询 +- 连接复用:优化数据库连接的使用 +- 内存管理:大结果集的内存友好处理 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import time @@ -10,7 +39,39 @@ from .sharding import ShardingCalculator logger = logging.getLogger(__name__) def execute_query(session, table, keys, fields, values, exclude_fields=None): - """执行查询,支持单主键和复合主键""" + """ + 执行Cassandra数据库查询,支持单主键和复合主键查询 + + 本函数是查询引擎的核心,能够智能处理不同类型的主键查询: + - 单主键:生成 WHERE key IN (val1, val2, val3) 查询 + - 复合主键:生成 WHERE (key1='val1' AND key2='val2') OR ... 查询 + + Args: + session: Cassandra数据库会话对象 + table (str): 目标表名 + keys (list): 主键字段名列表,如 ['id'] 或 ['docid', 'id'] + fields (list): 要查询的字段列表,空列表表示查询所有字段 + values (list): 查询值列表,复合主键值用逗号分隔 + exclude_fields (list, optional): 要排除的字段列表 + + Returns: + list: 查询结果列表,每个元素是一个Row对象 + + 查询示例: + # 单主键查询 + execute_query(session, 'users', ['id'], ['name', 'email'], ['1', '2', '3']) + # 生成SQL: SELECT name, email FROM users WHERE id IN ('1', '2', '3') + + # 复合主键查询 + execute_query(session, 'orders', ['user_id', 'order_id'], ['*'], ['1,100', '2,200']) + # 生成SQL: SELECT * FROM orders WHERE (user_id='1' AND order_id='100') OR (user_id='2' AND order_id='200') + + 错误处理: + - 参数验证:检查keys和values是否为空 + - SQL注入防护:对查询值进行适当转义 + - 异常捕获:数据库错误时返回空列表 + - 日志记录:记录查询SQL和执行统计 + """ try: # 参数验证 if not keys or len(keys) == 0: diff --git a/modules/query_logger.py b/modules/query_logger.py index 87684c9..a6a6634 100644 --- a/modules/query_logger.py +++ b/modules/query_logger.py @@ -1,6 +1,38 @@ """ 查询日志管理模块 -负责查询日志的收集、存储和检索 +================ + +本模块提供BigDataTool的完整查询日志管理功能,支持实时日志收集和历史日志分析。 + +核心功能: +1. 实时日志收集:自动收集所有查询操作的详细日志 +2. 批次管理:按查询批次组织日志,便于追踪完整的查询流程 +3. 双重存储:内存缓存 + SQLite持久化存储 +4. 历史关联:将日志与查询历史记录关联,支持完整的操作回溯 +5. 性能监控:记录查询时间、记录数等性能指标 + +日志收集特性: +- 多级日志:支持INFO、WARNING、ERROR等日志级别 +- 批次追踪:每个查询批次分配唯一ID,便于日志分组 +- 时间戳:精确到毫秒的时间戳记录 +- 查询类型:区分单表、分表、Redis等不同查询类型 +- 历史关联:支持日志与查询历史记录的双向关联 + +存储策略: +- 内存缓存:最近的日志保存在内存中,支持快速访问 +- 数据库持久化:所有日志自动保存到SQLite数据库 +- 容量控制:内存缓存有容量限制,自动清理旧日志 +- 事务安全:数据库写入失败不影响程序运行 + +查询和分析: +- 按批次查询:支持按查询批次获取相关日志 +- 按历史记录查询:支持按历史记录ID获取相关日志 +- 分页支持:大量日志的分页显示 +- 时间范围:支持按时间范围筛选日志 +- 日志清理:支持按时间清理旧日志 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import sqlite3 diff --git a/modules/redis_client.py b/modules/redis_client.py index 1fca580..324365d 100644 --- a/modules/redis_client.py +++ b/modules/redis_client.py @@ -1,6 +1,36 @@ """ Redis连接管理模块 -负责Redis集群的连接、错误处理和性能追踪 +=================== + +本模块提供Redis集群的连接管理和基础操作功能,支持单节点和集群模式。 + +核心功能: +1. 智能连接管理:自动检测单节点和集群模式 +2. 连接池优化:高效的连接复用和资源管理 +3. 错误处理:完善的连接失败诊断和重试机制 +4. 性能监控:连接时间和操作性能的实时监控 +5. 类型检测:自动识别Redis数据类型 + +连接特性: +- 自适应模式:根据节点数量自动选择连接方式 +- 连接池管理:每个节点独立的连接池配置 +- 超时控制:可配置的连接和操作超时时间 +- 密码认证:支持Redis AUTH认证 +- 健康检查:连接状态的实时监控 + +支持的Redis版本: +- Redis 5.0+:完整功能支持 +- Redis Cluster:集群模式支持 +- Redis Sentinel:哨兵模式支持(通过配置) + +错误诊断: +- 连接超时:网络延迟和服务器负载分析 +- 认证失败:密码验证和权限检查 +- 集群错误:节点状态和集群配置验证 +- 数据类型错误:类型检测和转换建议 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import time diff --git a/modules/redis_query.py b/modules/redis_query.py index c73ab12..3246950 100644 --- a/modules/redis_query.py +++ b/modules/redis_query.py @@ -1,6 +1,43 @@ """ -Redis查询和数据比较模块 -负责Redis数据的查询、随机key获取和数据比较 +Redis查询引擎模块 +================= + +本模块是Redis数据比对的核心引擎,提供高级的Redis数据查询和比较功能。 + +核心功能: +1. 多模式查询:随机采样和指定Key两种查询模式 +2. 全类型支持:支持所有Redis数据类型的查询和比较 +3. 智能比较:针对不同数据类型的专门比较算法 +4. 性能监控:详细的查询时间和性能统计 +5. 错误容错:单个Key查询失败不影响整体结果 + +查询模式: +- 随机采样:从源集群随机获取指定数量的Key进行比对 +- 指定Key:对用户提供的Key列表进行精确比对 +- 模式匹配:支持通配符模式的Key筛选 + +支持的数据类型: +- String:字符串类型,自动检测JSON格式 +- Hash:哈希表,字段级别的深度比较 +- List:列表,保持元素顺序的精确比较 +- Set:集合,自动排序后的内容比较 +- ZSet:有序集合,包含分数的完整比较 +- Stream:消息流,消息级别的详细比较 + +比较算法: +- JSON智能比较:自动检测和比较JSON格式数据 +- 类型一致性检查:确保两个集群中数据类型一致 +- 内容深度比较:递归比较复杂数据结构 +- 性能优化:大数据集的高效比较算法 + +统计分析: +- 一致性统计:相同、不同、缺失Key的详细统计 +- 类型分布:各种数据类型的分布统计 +- 性能指标:查询时间、连接时间等性能数据 +- 错误分析:查询失败的详细错误统计 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import time diff --git a/modules/redis_types.py b/modules/redis_types.py index c7eb207..9c398cf 100644 --- a/modules/redis_types.py +++ b/modules/redis_types.py @@ -1,6 +1,29 @@ """ Redis数据类型支持增强模块 -支持string、hash、list、set、zset、json等数据类型的比较 +================================ + +本模块提供对Redis所有主要数据类型的完整支持,包括: +- String类型(包括JSON字符串的智能检测和格式化) +- Hash类型(键值对映射) +- List类型(有序列表) +- Set类型(无序集合) +- ZSet类型(有序集合,带分数) +- Stream类型(消息流,完整支持消息解析和比较) + +主要功能: +1. get_redis_value_with_type() - 获取任意类型的Redis键值 +2. compare_redis_values() - 智能比较不同数据类型的值 +3. batch_get_redis_values_with_type() - 批量获取键值信息 + +设计特点: +- 类型安全:自动检测并处理每种Redis数据类型 +- 编码处理:完善的UTF-8解码和二进制数据处理 +- JSON支持:智能识别和格式化JSON字符串 +- Stream支持:完整的Stream消息结构解析和比较 +- 错误处理:优雅处理连接错误和数据异常 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import json @@ -11,19 +34,41 @@ logger = logging.getLogger(__name__) def get_redis_value_with_type(redis_client, key): """ - 获取Redis键值及其数据类型 + 获取Redis键值及其数据类型的完整信息 + + 这是本模块的核心函数,支持所有Redis数据类型的获取和解析。 + 它会自动检测键的类型,然后使用相应的Redis命令获取数据, + 并进行适当的格式化处理。 Args: - redis_client: Redis客户端 - key: Redis键名 + redis_client: Redis客户端连接对象 + key (str): 要查询的Redis键名 Returns: - dict: { - 'type': 数据类型, - 'value': 值, - 'display_value': 用于显示的格式化值, - 'exists': 是否存在 - } + dict: 包含以下字段的字典 + - 'type' (str): Redis数据类型 ('string', 'hash', 'list', 'set', 'zset', 'stream') + - 'value': 解析后的原始值(Python对象) + - 'display_value' (str): 格式化后用于显示的字符串 + - 'exists' (bool): 键是否存在 + + 支持的数据类型处理: + - String: 自动检测JSON格式,支持二进制数据 + - Hash: 完整的字段映射,UTF-8解码 + - List: 有序列表,保持原始顺序 + - Set: 无序集合,自动排序便于比较 + - ZSet: 有序集合,包含成员和分数 + - Stream: 完整的消息流解析,包含元数据和消息内容 + + 异常处理: + - 连接异常:返回错误状态 + - 编码异常:标记为二进制数据 + - 数据异常:记录警告并提供基本信息 + + 示例: + >>> result = get_redis_value_with_type(client, "user:1001") + >>> print(result['type']) # 'string' + >>> print(result['value']) # 'John Doe' + >>> print(result['exists']) # True """ try: # 检查key是否存在 @@ -43,27 +88,29 @@ def get_redis_value_with_type(redis_client, key): } if key_type == 'string': - # 字符串类型 + # String类型处理 - 支持普通字符串和JSON字符串的智能识别 value = redis_client.get(key) if value: try: - # 尝试解码为字符串 + # 尝试UTF-8解码 str_value = value.decode('utf-8') result['value'] = str_value - # 尝试解析为JSON + # 智能检测JSON格式并格式化显示 try: json_value = json.loads(str_value) result['display_value'] = json.dumps(json_value, indent=2, ensure_ascii=False) - result['type'] = 'json_string' # 标记为JSON字符串 - except: + result['type'] = 'json_string' # 标记为JSON字符串类型 + except json.JSONDecodeError: + # 不是JSON格式,直接显示字符串内容 result['display_value'] = str_value except UnicodeDecodeError: - # 二进制数据 + # 处理二进制数据 - 无法UTF-8解码的数据 result['value'] = value result['display_value'] = f"" else: + # 空字符串处理 result['value'] = "" result['display_value'] = "" @@ -130,6 +177,80 @@ def get_redis_value_with_type(redis_client, key): result['value'] = decoded_zset result['display_value'] = json.dumps(decoded_zset, indent=2, ensure_ascii=False) + elif key_type == 'stream': + # Stream类型 + try: + # 获取Stream信息 + stream_info = redis_client.xinfo_stream(key) + + # 获取Stream中的消息(最多获取100条最新消息) + stream_messages = redis_client.xrange(key, count=100) + + # 解析Stream数据 + decoded_stream = { + 'info': { + 'length': stream_info.get('length', 0), + 'radix_tree_keys': stream_info.get('radix-tree-keys', 0), + 'radix_tree_nodes': stream_info.get('radix-tree-nodes', 0), + 'last_generated_id': stream_info.get('last-generated-id', '').decode('utf-8') if stream_info.get('last-generated-id') else '', + 'first_entry': None, + 'last_entry': None + }, + 'messages': [] + } + + # 处理first-entry和last-entry + if stream_info.get('first-entry'): + first_entry = stream_info['first-entry'] + decoded_stream['info']['first_entry'] = { + 'id': first_entry[0].decode('utf-8'), + 'fields': {first_entry[1][i].decode('utf-8'): first_entry[1][i+1].decode('utf-8') + for i in range(0, len(first_entry[1]), 2)} + } + + if stream_info.get('last-entry'): + last_entry = stream_info['last-entry'] + decoded_stream['info']['last_entry'] = { + 'id': last_entry[0].decode('utf-8'), + 'fields': {last_entry[1][i].decode('utf-8'): last_entry[1][i+1].decode('utf-8') + for i in range(0, len(last_entry[1]), 2)} + } + + # 处理消息列表 + for message in stream_messages: + message_id = message[0].decode('utf-8') + message_fields = message[1] + + decoded_message = { + 'id': message_id, + 'fields': {} + } + + # 解析消息字段 + for i in range(0, len(message_fields), 2): + try: + field_name = message_fields[i].decode('utf-8') + field_value = message_fields[i+1].decode('utf-8') + decoded_message['fields'][field_name] = field_value + except (IndexError, UnicodeDecodeError): + continue + + decoded_stream['messages'].append(decoded_message) + + result['value'] = decoded_stream + result['display_value'] = json.dumps(decoded_stream, indent=2, ensure_ascii=False) + + except Exception as stream_error: + logger.warning(f"获取Stream详细信息失败 {key}: {stream_error}") + # 如果详细获取失败,至少获取基本信息 + try: + stream_length = redis_client.xlen(key) + result['value'] = {'length': stream_length, 'messages': []} + result['display_value'] = f"Stream (length: {stream_length} messages)" + except: + result['value'] = "Stream data (unable to read details)" + result['display_value'] = "Stream data (unable to read details)" + else: # 未知类型 result['value'] = f"" @@ -238,6 +359,58 @@ def compare_redis_values(value1_info, value2_info): else: return {'status': 'different', 'message': f'有序集合不同,大小: {len(value1)} vs {len(value2)}'} + elif type1 == 'stream': + # Stream比较 + if value1 == value2: + return {'status': 'identical', 'message': 'Stream完全相同'} + else: + # 详细比较Stream + if isinstance(value1, dict) and isinstance(value2, dict): + # 比较Stream基本信息 + info1 = value1.get('info', {}) + info2 = value2.get('info', {}) + + if info1.get('length', 0) != info2.get('length', 0): + return { + 'status': 'different', + 'message': f'Stream长度不同: {info1.get("length", 0)} vs {info2.get("length", 0)}' + } + + # 比较最后生成的ID + if info1.get('last_generated_id') != info2.get('last_generated_id'): + return { + 'status': 'different', + 'message': f'Stream最后ID不同: {info1.get("last_generated_id")} vs {info2.get("last_generated_id")}' + } + + # 比较消息内容 + messages1 = value1.get('messages', []) + messages2 = value2.get('messages', []) + + if len(messages1) != len(messages2): + return { + 'status': 'different', + 'message': f'Stream消息数量不同: {len(messages1)} vs {len(messages2)}' + } + + # 比较具体消息 + for i, (msg1, msg2) in enumerate(zip(messages1, messages2)): + if msg1.get('id') != msg2.get('id'): + return { + 'status': 'different', + 'message': f'Stream消息ID不同 (第{i+1}条): {msg1.get("id")} vs {msg2.get("id")}' + } + + if msg1.get('fields') != msg2.get('fields'): + return { + 'status': 'different', + 'message': f'Stream消息内容不同 (第{i+1}条消息)' + } + + return {'status': 'identical', 'message': 'Stream数据相同'} + else: + return {'status': 'different', 'message': 'Stream数据格式不同'} + else: # 其他类型的通用比较 if value1 == value2: diff --git a/modules/sharding.py b/modules/sharding.py index bf8f41e..ee7e139 100644 --- a/modules/sharding.py +++ b/modules/sharding.py @@ -1,6 +1,41 @@ """ -分表计算模块 -负责TWCS时间分表的计算和映射 +TWCS分表计算引擎模块 +=================== + +本模块实现基于TWCS(Time Window Compaction Strategy)策略的时间分表计算功能。 + +核心功能: +1. 时间戳提取:从Key中智能提取时间戳信息 +2. 分表索引计算:基于时间窗口计算目标分表索引 +3. 分表映射:将大批量Key映射到对应的分表 +4. 统计分析:提供分表计算的详细统计信息 + +TWCS分表策略: +- 时间窗口:可配置的时间间隔(默认7天) +- 分表数量:可配置的分表总数(默认14张) +- 计算公式:timestamp // interval_seconds % table_count +- 表命名:base_table_name + "_" + shard_index + +时间戳提取算法: +- 优先规则:提取Key中最后一个下划线后的数字 +- 备用规则:提取Key中最长的数字序列 +- 容错处理:无法提取时记录到失败列表 +- 格式支持:支持各种Key格式的时间戳提取 + +应用场景: +- 大数据表的时间分片:按时间窗口将数据分散到多张表 +- 查询性能优化:减少单表数据量,提高查询效率 +- 数据生命周期管理:支持按时间窗口的数据清理 +- 负载均衡:将查询负载分散到多张表 + +性能特点: +- 批量计算:支持大批量Key的高效分表计算 +- 内存友好:使用生成器和迭代器优化内存使用 +- 统计完整:提供详细的计算成功率和分布统计 +- 错误容错:单个Key计算失败不影响整体处理 + +作者:BigDataTool项目组 +更新时间:2024年8月 """ import re @@ -9,7 +44,24 @@ import logging logger = logging.getLogger(__name__) class ShardingCalculator: - """分表计算器,基于TWCS策略""" + """ + TWCS分表计算器 + + 基于Time Window Compaction Strategy实现的智能分表计算器, + 用于将时间相关的Key映射到对应的时间窗口分表。 + + 主要特性: + - 时间窗口分片:按配置的时间间隔进行分表 + - 智能时间戳提取:支持多种Key格式的时间戳解析 + - 负载均衡:通过取模运算实现分表间的负载均衡 + - 批量处理:高效处理大批量Key的分表映射 + + 适用场景: + - 时序数据的分表存储 + - 大数据表的性能优化 + - 数据生命周期管理 + - 查询负载分散 + """ def __init__(self, interval_seconds=604800, table_count=14): """ diff --git a/templates/redis_compare_old.html b/templates/redis_compare_old.html deleted file mode 100644 index b5162ec..0000000 --- a/templates/redis_compare_old.html +++ /dev/null @@ -1,696 +0,0 @@ - - - - - - Redis集群比对工具 - - - - - - - - -
- - - - -
-
-
- -
-

Redis集群比对工具

-

专业的Redis集群数据比对工具,支持随机采样和指定Key查询

-
-
-
-
- - -
- -
-
-

配置管理

- -
- -
-
-
-
配置组管理
-
-
-
-
- -
-
- -
-
-
-
- -
-
- -
-
-
-
-
- - -
-
-
-
查询历史
-
-
-
-
- -
-
- -
-
-
-
- -
-
- -
-
-
-
-
-
- - -
-
-
-
-
配置导入
-
-
- - - - 支持YAML格式配置导入,如:clusterName、clusterAddress、clusterPassword等 - -
-
-
-
-
-
-
查询日志
-
-
- - - 查看Redis比较操作的详细执行日志 - -
-
-
-
-
-
- - -
-
-

Redis集群配置

- -
- -
-
-
集群1 (生产环境)
- -
- - -
- -
- -
-
- - - -
-
- -
- -
- - -
- -
-
- - -
-
- - -
-
- -
- -
-
-
- - -
-
-
集群2 (测试环境)
- -
- - -
- -
- -
-
- - - -
-
- -
- -
- - -
- -
-
- - -
-
- - -
-
- -
- -
-
-
-
-
-
-
- - -
-
-
-

查询选项

- -
- -
- - -
-
- - -
-
- - -
-
-
- - -
-
- - -
-
- - -
-
-
- - - -
-
-
- - -
-
-
- - - -
-
- Loading... -
- 正在执行Redis数据比较,请稍候... -
-
-
-
- - - -
- - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file