From b7a05e56d0b13b89f27b422b764becb1748ce17c Mon Sep 17 00:00:00 2001 From: YoVinchen Date: Sat, 2 Aug 2025 01:23:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=9F=BA=E6=9C=AC=E5=88=86?= =?UTF-8?q?=E8=A1=A8=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 133 ++++++++++- app.py | 471 +++++++++++++++++++++++++++++++++++++- static/js/app.js | 353 +++++++++++++++++++++++++++- templates/db_compare.html | 158 ++++++++++++- 4 files changed, 1087 insertions(+), 28 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 77c43d7..b224e28 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## 项目架构 -这是一个基于 Flask 的数据库查询比对工具,用于比较 Cassandra 数据库中生产环境和测试环境的数据差异。 +这是一个基于 Flask 的数据库查询比对工具,用于比较 Cassandra 数据库中生产环境和测试环境的数据差异。现已支持单表查询和TWCS分表查询两种模式。 ### 核心组件架构 @@ -15,20 +15,51 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - 配置组管理(CRUD操作) - JSON字段特殊处理和数组比较逻辑 - 查询历史记录管理 -- `config_groups.db`: SQLite数据库,存储用户保存的配置组和查询历史 + - **分表查询功能(新增)**: + - `ShardingCalculator`类:TWCS时间分表计算器 + - `execute_sharding_query()`:分表查询执行 + - `execute_mixed_query()`:混合查询支持(生产分表+测试单表组合) + - `/api/sharding-query`:分表查询API端点 +- `config_groups.db`: SQLite数据库,存储用户保存的配置组、查询历史和分表配置 **前端 (原生JavaScript + Bootstrap)** -- `templates/db_compare.html`: 主界面模板,包含配置表单和结果展示 +- `templates/db_compare.html`: 主界面模板,**现已支持单表和分表双模式** + - 分表模式切换开关 + - 生产/测试环境独立分表配置 + - 分表参数配置(时间间隔、分表数量) + - 分表查询信息展示 - `templates/index.html`: 工具集合首页 - `static/js/app.js`: 核心前端逻辑 - 配置管理和表单处理 - 差异结果的分页展示系统 - 原生数据展示(多种视图模式:格式化、原始、差异对比、树形) - 高级错误处理和用户反馈 + - **分表查询支持(新增)**: + - `toggleShardingMode()`:分表模式切换 + - `getShardingConfig()`:分表配置获取 + - `displayShardingInfo()`:分表查询结果展示 + +**分表查询功能模块(重要新增)** +- **时间戳提取算法(已更新)**: + - **新规则**:使用 `re.sub(r'\D', '', key)` 删除Key中所有非数字字符 + - 将提取到的数字字符串转换为整数作为时间戳 + - 支持任意格式的Key,只要包含数字即可 + - 示例:`wmid_1609459200` → `1609459200`,`abc123def456` → `123456` +- **分表索引计算**: + - 公式:`int(numbers) // interval_seconds % table_count` + - 默认配置:604800秒间隔(7天),14张分表 + - 支持自定义配置 +- **混合查询场景**: + - 生产环境分表 + 测试环境单表 + - 生产环境分表 + 测试环境分表 + - 生产环境单表 + 测试环境分表 + - 生产环境单表 + 测试环境单表 **示例代码** - `demo/Query.py`: 独立的Cassandra查询比对脚本示例 - `demo/twcsQuery.py`: 另一个查询示例 +- `demo/CalculationLibrary.py`: 分表计算逻辑参考实现 +- `test_sharding.py`: 分表功能测试脚本 ### 关键功能模块 @@ -46,6 +77,12 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - 配置导入导出和管理 - 详细的错误诊断和故障排查指南 - 查询历史记录和复用 +- **查询日志系统(新增)**: + - 实时显示SQL执行日志 + - 支持日志级别过滤(INFO/WARNING/ERROR) + - SQL语句语法高亮显示 + - 执行时间和记录数统计 + - 日志清空和刷新功能 ## 开发相关命令 @@ -61,8 +98,34 @@ python app.py # 修改app.py最后一行:app.run(debug=True, port=5001) ``` +### 测试和验证 +```bash +# 运行分表功能测试(测试时间戳提取和分表索引计算) +python test_sharding.py + +# 测试新的分表计算规则 +python test_new_sharding.py + +# 演示新分表规则的详细工作原理 +python demo_new_sharding.py + +# 测试查询日志功能 +python test_query_logs.py + +# 集成测试(分表功能 + 查询日志) +python test_integration.py + +# 测试数据库连接和查询功能 +# 通过Web界面:http://localhost:5000/db-compare +# 或直接运行示例脚本: +python demo/Query.py +python demo/twcsQuery.py +``` + ### 开发模式 -应用默认运行在debug模式,代码修改后自动重启。访问 http://localhost:5000 查看首页,http://localhost:5000/db-compare 使用比对工具。 +应用默认运行在debug模式,代码修改后自动重启。访问: +- http://localhost:5000 - 工具集合首页 +- http://localhost:5000/db-compare - 数据库比对工具 ### 依赖项 - Flask==2.3.3 @@ -72,7 +135,8 @@ python app.py ### 核心API端点 - `GET /api/default-config`: 获取默认数据库配置 -- `POST /api/query`: 执行数据库查询比对(主要功能) +- `POST /api/query`: 执行单表数据库查询比对(原有功能) +- `POST /api/sharding-query`: 执行分表查询比对(新增功能) - `GET /api/config-groups`: 获取所有配置组 - `POST /api/config-groups`: 创建新配置组 - `GET /api/config-groups/`: 获取特定配置组 @@ -82,17 +146,31 @@ python app.py - `POST /api/query-history`: 保存查询历史 - `GET /api/query-history/`: 获取特定历史记录 - `DELETE /api/query-history/`: 删除历史记录 +- `GET /api/query-logs`: 获取查询日志(支持limit参数) +- `DELETE /api/query-logs`: 清空查询日志 ### 查询比对流程 + +**单表查询流程(`/api/query`)**: 1. 前端发送配置和Key值列表到 `/api/query` 2. 后端创建两个Cassandra连接(生产+测试) 3. 并行执行查询,获取原始数据 4. 运行比较算法,生成差异报告 5. 返回完整结果(差异、统计、原始数据) +**分表查询流程(`/api/sharding-query`)**: +1. 前端发送配置、Key值列表和分表配置到 `/api/sharding-query` +2. 后端使用 `ShardingCalculator` 解析Key中的时间戳 +3. 根据分表算法计算每个Key对应的分表名称 +4. 创建分表映射关系,并行执行分表查询 +5. 汇总所有分表结果,执行比较算法 +6. 返回包含分表信息的完整结果 + ## 数据结构和配置 ### 数据库配置结构 + +**单表查询配置**: ```javascript { pro_config: { @@ -107,6 +185,24 @@ python app.py } ``` +**分表查询配置**: +```javascript +{ + pro_config: { /* 基础配置同上 */ }, + test_config: { /* 基础配置同上 */ }, + keys: ["主键字段名"], + fields_to_compare: ["字段1", "字段2"], + exclude_fields: ["排除字段"], + values: ["key1", "key2", "key3"], + sharding_config: { + use_sharding_for_pro: true, // 生产环境是否使用分表 + use_sharding_for_test: false, // 测试环境是否使用分表 + interval_seconds: 604800, // 分表时间间隔(默认7天) + table_count: 14 // 分表数量(默认14张表) + } +} +``` + ### 查询结果结构 ```javascript { @@ -115,12 +211,34 @@ python app.py identical_results: [{ key, pro_fields, test_fields }], field_diff_count: { "field_name": count }, raw_pro_data: [], raw_test_data: [], - summary: { overview, percentages, field_analysis, recommendations } + summary: { overview, percentages, field_analysis, recommendations }, + + // 分表查询特有字段 + sharding_info: { + pro_shard_mapping: { "key1": "table_name_0", "key2": "table_name_1" }, + test_shard_mapping: { /* 同上 */ }, + failed_keys: [], // 时间戳提取失败的Key + shard_stats: { + pro_tables_used: ["table_0", "table_1"], + test_tables_used: ["table_0"], + timestamp_extraction_success_rate: 95.5 + } + } } ``` ## 开发注意事项 +### 分表功能开发指导 +- **时间戳解析(已更新)**:`ShardingCalculator.extract_timestamp_from_key()` 新规则 + - 使用 `re.sub(r'\D', '', key)` 删除所有非数字字符 + - 将提取的数字字符串转换为整数作为时间戳 + - 不再进行时间戳有效性验证,支持任意数字组合 +- **分表索引计算**:使用公式 `int(numbers) // interval_seconds % table_count` +- **错误处理**:Key中没有数字字符时会记录到 `failed_keys` 中 +- **混合查询**:支持生产环境分表+测试环境单表的组合场景 +- **前端状态**:分表模式通过 `toggleShardingMode()` 切换,影响UI和提示文本 + ### Cassandra连接处理 - 连接包含详细的错误诊断和重试机制 - 使用DCAwareRoundRobinPolicy避免负载均衡警告 @@ -133,6 +251,7 @@ python app.py - `currentResults`: 存储最新查询结果 - 分页状态:`currentIdenticalPage`, `currentDifferencePage` - 过滤状态:`filteredIdenticalResults`, `filteredDifferenceResults` +- **日志状态(新增)**:`allQueryLogs` - 存储所有查询日志 ### JSON和数组字段处理 - `normalize_json_string()`: 标准化JSON字符串用于比较 @@ -144,6 +263,7 @@ python app.py - 后端:分类错误(connection_error, validation_error, query_error, system_error) - 前端:详细错误展示,包含配置信息、解决建议、连接测试工具 - 提供交互式故障排查指南 +- **查询日志(新增)**:所有SQL执行和错误信息都会记录到查询日志中 ### 性能考虑 - 大数据集的分页处理 @@ -160,6 +280,7 @@ python app.py - pro_config: 生产环境配置(JSON) - test_config: 测试环境配置(JSON) - query_config: 查询配置(JSON) +- **sharding_config: 分表配置(JSON,新增字段)** - created_at/updated_at: 时间戳 **query_history表** diff --git a/app.py b/app.py index f62e2b8..3fe992d 100644 --- a/app.py +++ b/app.py @@ -6,16 +6,147 @@ import os import logging import sqlite3 from datetime import datetime +import re +import concurrent.futures +import time app = Flask(__name__) # 配置日志 -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) +# 查询日志收集器 +class QueryLogCollector: + def __init__(self, max_logs=1000): + self.logs = [] + self.max_logs = max_logs + + def add_log(self, level, message): + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + log_entry = { + 'timestamp': timestamp, + 'level': level, + 'message': message + } + self.logs.append(log_entry) + # 保持日志数量在限制内 + if len(self.logs) > self.max_logs: + self.logs.pop(0) + + def get_logs(self, limit=None): + if limit: + return self.logs[-limit:] + return self.logs + + def clear_logs(self): + self.logs.clear() + +# 全局日志收集器实例 +query_log_collector = QueryLogCollector() + +# 自定义日志处理器 +class CollectorHandler(logging.Handler): + def __init__(self, collector): + super().__init__() + self.collector = collector + + def emit(self, record): + self.collector.add_log(record.levelname, record.getMessage()) + +# 添加收集器处理器到logger +collector_handler = CollectorHandler(query_log_collector) +logger.addHandler(collector_handler) + # 数据库配置 DATABASE_PATH = 'config_groups.db' +class ShardingCalculator: + """分表计算器,基于TWCS策略""" + + def __init__(self, interval_seconds=604800, table_count=14): + """ + 初始化分表计算器 + :param interval_seconds: 时间间隔(秒),默认604800(7天) + :param table_count: 分表数量,默认14 + """ + self.interval_seconds = interval_seconds + self.table_count = table_count + + def extract_timestamp_from_key(self, key): + """ + 从Key中提取时间戳 + 新规则:删除所有非数字字符,然后作为时间戳 + """ + if not key: + return None + + key_str = str(key) + + # 删除所有非数字字符 + numbers = re.sub(r'\D', '', key_str) + + if not numbers: + logger.warning(f"Key '{key}' 中没有找到数字字符") + return None + + try: + timestamp = int(numbers) + logger.info(f"Key '{key}' 提取到时间戳: {timestamp}") + return timestamp + except ValueError: + logger.error(f"Key '{key}' 数字转换失败: {numbers}") + return None + + def calculate_shard_index(self, timestamp): + """ + 计算分表索引 + 公式:timestamp // interval_seconds % table_count + """ + if timestamp is None: + return None + return int(timestamp) // self.interval_seconds % self.table_count + + def get_shard_table_name(self, base_table_name, key): + """ + 根据Key获取对应的分表名称 + """ + timestamp = self.extract_timestamp_from_key(key) + if timestamp is None: + return None + + shard_index = self.calculate_shard_index(timestamp) + return f"{base_table_name}_{shard_index}" + + def get_all_shard_tables_for_keys(self, base_table_name, keys): + """ + 为一批Keys计算所有需要查询的分表 + 返回: {shard_table_name: [keys_for_this_shard], ...} + """ + shard_mapping = {} + failed_keys = [] + calculation_stats = { + 'total_keys': len(keys), + 'successful_extractions': 0, + 'failed_extractions': 0, + 'unique_shards': 0 + } + + for key in keys: + shard_table = self.get_shard_table_name(base_table_name, key) + if shard_table: + if shard_table not in shard_mapping: + shard_mapping[shard_table] = [] + shard_mapping[shard_table].append(key) + calculation_stats['successful_extractions'] += 1 + else: + failed_keys.append(key) + calculation_stats['failed_extractions'] += 1 + + calculation_stats['unique_shards'] = len(shard_mapping) + + return shard_mapping, failed_keys, calculation_stats + def init_database(): """初始化数据库""" try: @@ -31,6 +162,7 @@ def init_database(): pro_config TEXT NOT NULL, test_config TEXT NOT NULL, query_config TEXT NOT NULL, + sharding_config TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) @@ -55,6 +187,21 @@ def init_database(): ) ''') + # 创建分表配置组表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS sharding_config_groups ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + description TEXT, + pro_config TEXT NOT NULL, + test_config TEXT NOT NULL, + query_config TEXT NOT NULL, + sharding_config TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + conn.commit() conn.close() logger.info("数据库初始化完成") @@ -73,14 +220,29 @@ def ensure_database(): try: conn = sqlite3.connect(DATABASE_PATH) cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history')") + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('config_groups', 'query_history', 'sharding_config_groups')") results = cursor.fetchall() existing_tables = [row[0] for row in results] - if 'config_groups' not in existing_tables or 'query_history' not in existing_tables: - logger.info("数据库表不完整,正在重新创建...") + required_tables = ['config_groups', 'query_history', 'sharding_config_groups'] + missing_tables = [table for table in required_tables if table not in existing_tables] + + if missing_tables: + logger.info(f"数据库表不完整,缺少表:{missing_tables},正在重新创建...") return init_database() + # 检查config_groups表是否有sharding_config字段 + cursor.execute("PRAGMA table_info(config_groups)") + columns = cursor.fetchall() + column_names = [column[1] for column in columns] + + if 'sharding_config' not in column_names: + logger.info("添加sharding_config字段到config_groups表...") + cursor.execute("ALTER TABLE config_groups ADD COLUMN sharding_config TEXT") + conn.commit() + logger.info("sharding_config字段添加成功") + + conn.close() return True except Exception as e: logger.error(f"检查数据库表失败: {e}") @@ -298,7 +460,7 @@ DEFAULT_CONFIG = { 'exclude_fields': [] } -def save_config_group(name, description, pro_config, test_config, query_config): +def save_config_group(name, description, pro_config, test_config, query_config, sharding_config=None): """保存配置组""" if not ensure_database(): logger.error("数据库初始化失败") @@ -310,17 +472,18 @@ def save_config_group(name, description, pro_config, test_config, query_config): try: cursor.execute(''' INSERT OR REPLACE INTO config_groups - (name, description, pro_config, test_config, query_config, updated_at) - VALUES (?, ?, ?, ?, ?, ?) + (name, description, pro_config, test_config, query_config, sharding_config, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( name, description, json.dumps(pro_config), json.dumps(test_config), json.dumps(query_config), + json.dumps(sharding_config) if sharding_config else None, datetime.now().isoformat() )) conn.commit() - logger.info(f"配置组 '{name}' 保存成功") + logger.info(f"配置组 '{name}' 保存成功,包含分表配置: {sharding_config is not None}") return True except Exception as e: logger.error(f"保存配置组失败: {e}") @@ -378,7 +541,7 @@ def get_config_group_by_id(group_id): row = cursor.fetchone() if row: - return { + config = { 'id': row['id'], 'name': row['name'], 'description': row['description'], @@ -388,6 +551,30 @@ def get_config_group_by_id(group_id): 'created_at': row['created_at'], 'updated_at': row['updated_at'] } + + # 添加分表配置(如果存在) + sharding_config_data = None + try: + # 尝试获取sharding_config字段 + sharding_config_data = row[len(row) - 3] # sharding_config在倒数第三个位置 + except (IndexError, KeyError): + # 如果字段不存在,尝试通过列名获取 + try: + cursor.execute("PRAGMA table_info(config_groups)") + columns = cursor.fetchall() + column_names = [col[1] for col in columns] + if 'sharding_config' in column_names: + sharding_index = column_names.index('sharding_config') + sharding_config_data = row[sharding_index] + except: + pass + + if sharding_config_data: + config['sharding_config'] = json.loads(sharding_config_data) + else: + config['sharding_config'] = None + + return config return None except Exception as e: logger.error(f"获取配置组详情失败: {e}") @@ -555,11 +742,19 @@ def delete_query_history(history_id): def create_connection(config): """创建Cassandra连接""" try: + logger.info(f"正在连接Cassandra数据库: {config['hosts']}:{config['port']}, keyspace={config['keyspace']}") + start_time = time.time() + auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password']) cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider) session = cluster.connect(config['keyspace']) + + connection_time = time.time() - start_time + logger.info(f"Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}") + return cluster, session except Exception as e: + logger.error(f"Cassandra连接失败: hosts={config['hosts']}, keyspace={config['keyspace']}, 错误={str(e)}") return None, None def execute_query(session, table, keys, fields, values, exclude_fields=None): @@ -576,11 +771,141 @@ def execute_query(session, table, keys, fields, values, exclude_fields=None): fields_str = "*" query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};" + + # 记录查询SQL日志 + logger.info(f"执行查询SQL: {query_sql}") + logger.info(f"查询参数: 表={table}, 字段={fields_str}, Key数量={len(values)}") + + # 执行查询 + start_time = time.time() result = session.execute(query_sql) - return list(result) if result else [] + execution_time = time.time() - start_time + + result_list = list(result) if result else [] + logger.info(f"查询完成: 执行时间={execution_time:.3f}秒, 返回记录数={len(result_list)}") + + return result_list except Exception as e: + logger.error(f"查询执行失败: SQL={query_sql if 'query_sql' in locals() else 'N/A'}, 错误={str(e)}") return [] +def execute_sharding_query(session, shard_mapping, keys, fields, exclude_fields=None): + """ + 执行分表查询 + :param session: Cassandra会话 + :param shard_mapping: 分表映射 {table_name: [keys]} + :param keys: 主键字段名列表 + :param fields: 要查询的字段列表 + :param exclude_fields: 要排除的字段列表 + :return: (查询结果列表, 查询到的表列表, 查询失败的表列表) + """ + all_results = [] + queried_tables = [] + error_tables = [] + + logger.info(f"开始执行分表查询,涉及 {len(shard_mapping)} 张分表") + total_start_time = time.time() + + for table_name, table_keys in shard_mapping.items(): + try: + logger.info(f"查询分表 {table_name},包含 {len(table_keys)} 个key: {table_keys}") + # 为每个分表执行查询 + table_results = execute_query(session, table_name, keys, fields, table_keys, exclude_fields) + all_results.extend(table_results) + queried_tables.append(table_name) + logger.info(f"分表 {table_name} 查询成功,返回 {len(table_results)} 条记录") + except Exception as e: + logger.error(f"分表 {table_name} 查询失败: {e}") + error_tables.append(table_name) + + total_execution_time = time.time() - total_start_time + logger.info(f"分表查询总计完成: 执行时间={total_execution_time:.3f}秒, 成功表数={len(queried_tables)}, 失败表数={len(error_tables)}, 总记录数={len(all_results)}") + + return all_results, queried_tables, error_tables + +def execute_mixed_query(pro_session, test_session, pro_config, test_config, keys, fields_to_compare, values, exclude_fields, sharding_config): + """ + 执行混合查询(生产环境分表,测试环境可能单表或分表) + """ + results = { + 'pro_data': [], + 'test_data': [], + 'sharding_info': { + 'calculation_stats': {} + } + } + + # 处理生产环境查询 + if sharding_config.get('use_sharding_for_pro', False): + pro_calculator = ShardingCalculator( + interval_seconds=sharding_config.get('pro_interval_seconds', 604800), + table_count=sharding_config.get('pro_table_count', 14) + ) + pro_shard_mapping, pro_failed_keys, pro_calc_stats = pro_calculator.get_all_shard_tables_for_keys( + pro_config['table'], values + ) + + pro_data, pro_queried_tables, pro_error_tables = execute_sharding_query( + pro_session, pro_shard_mapping, keys, fields_to_compare, exclude_fields + ) + + results['pro_data'] = pro_data + results['sharding_info']['pro_shards'] = { + 'enabled': True, + 'interval_seconds': sharding_config.get('pro_interval_seconds', 604800), + 'table_count': sharding_config.get('pro_table_count', 14), + 'queried_tables': pro_queried_tables, + 'error_tables': pro_error_tables, + 'failed_keys': pro_failed_keys + } + results['sharding_info']['calculation_stats'].update(pro_calc_stats) + else: + # 生产环境单表查询 + pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields) + results['pro_data'] = pro_data + results['sharding_info']['pro_shards'] = { + 'enabled': False, + 'queried_tables': [pro_config['table']] + } + + # 处理测试环境查询 + if sharding_config.get('use_sharding_for_test', False): + test_calculator = ShardingCalculator( + interval_seconds=sharding_config.get('test_interval_seconds', 604800), + table_count=sharding_config.get('test_table_count', 14) + ) + test_shard_mapping, test_failed_keys, test_calc_stats = test_calculator.get_all_shard_tables_for_keys( + test_config['table'], values + ) + + test_data, test_queried_tables, test_error_tables = execute_sharding_query( + test_session, test_shard_mapping, keys, fields_to_compare, exclude_fields + ) + + results['test_data'] = test_data + results['sharding_info']['test_shards'] = { + 'enabled': True, + 'interval_seconds': sharding_config.get('test_interval_seconds', 604800), + 'table_count': sharding_config.get('test_table_count', 14), + 'queried_tables': test_queried_tables, + 'error_tables': test_error_tables, + 'failed_keys': test_failed_keys + } + + # 合并计算统计信息 + if not results['sharding_info']['calculation_stats']: + results['sharding_info']['calculation_stats'] = test_calc_stats + else: + # 测试环境单表查询 + test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields) + results['test_data'] = test_data + results['sharding_info']['test_shards'] = { + 'enabled': False, + 'queried_tables': [test_config['table']] + } + + return results + def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values): """比较查询结果""" differences = [] @@ -750,6 +1075,102 @@ def index(): def db_compare(): return render_template('db_compare.html') +@app.route('/api/sharding-query', methods=['POST']) +def sharding_query_compare(): + """分表查询比对API""" + try: + data = request.json + logger.info("开始执行分表数据库比对查询") + + # 解析配置 + pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config']) + test_config = data.get('test_config', DEFAULT_CONFIG['test_config']) + keys = data.get('keys', DEFAULT_CONFIG['keys']) + fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare']) + exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields']) + values = data.get('values', []) + sharding_config = data.get('sharding_config', {}) + + if not values: + logger.warning("分表查询失败:未提供查询key值") + return jsonify({'error': '请提供查询key值'}), 400 + + logger.info(f"分表查询配置:{len(values)}个key值,生产表:{pro_config['table']},测试表:{test_config['table']}") + + # 创建数据库连接 + pro_cluster, pro_session = create_connection(pro_config) + test_cluster, test_session = create_connection(test_config) + + if not pro_session or not test_session: + logger.error("数据库连接失败") + return jsonify({'error': '数据库连接失败,请检查配置信息'}), 500 + + try: + # 执行混合查询(支持生产环境分表、测试环境单表/分表的组合) + logger.info("执行分表混合查询") + query_results = execute_mixed_query( + pro_session, test_session, pro_config, test_config, + keys, fields_to_compare, values, exclude_fields, sharding_config + ) + + pro_data = query_results['pro_data'] + test_data = query_results['test_data'] + sharding_info = query_results['sharding_info'] + + logger.info(f"分表查询结果:生产表 {len(pro_data)} 条记录,测试表 {len(test_data)} 条记录") + + # 比较结果 + differences, field_diff_count, identical_results = compare_results( + pro_data, test_data, keys, fields_to_compare, exclude_fields, values + ) + + # 统计信息 + different_ids = set() + for diff in differences: + if 'field' in diff: + different_ids.add(list(diff['key'].values())[0]) + + non_different_ids = set(values) - different_ids + + # 生成比较总结 + summary = generate_comparison_summary( + len(values), len(pro_data), len(test_data), + differences, identical_results, field_diff_count + ) + + result = { + 'total_keys': len(values), + 'pro_count': len(pro_data), + 'test_count': len(test_data), + 'differences': differences, + 'identical_results': identical_results, + 'field_diff_count': field_diff_count, + 'different_ids': list(different_ids), + 'non_different_ids': list(non_different_ids), + 'summary': summary, + 'sharding_info': sharding_info, # 包含分表查询信息 + 'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [], + 'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else [] + } + + logger.info(f"分表比对完成:发现 {len(differences)} 处差异") + + return jsonify(result) + + except Exception as e: + logger.error(f"分表查询执行失败:{str(e)}") + return jsonify({'error': f'分表查询执行失败:{str(e)}'}), 500 + finally: + # 关闭连接 + if pro_cluster: + pro_cluster.shutdown() + if test_cluster: + test_cluster.shutdown() + + except Exception as e: + logger.error(f"分表查询请求处理失败:{str(e)}") + return jsonify({'error': f'分表查询请求处理失败:{str(e)}'}), 500 + @app.route('/api/query', methods=['POST']) def query_compare(): try: @@ -890,10 +1311,13 @@ def api_save_config_group(): 'exclude_fields': data.get('exclude_fields', []) } + # 提取分表配置 + sharding_config = data.get('sharding_config') + if not name: return jsonify({'success': False, 'error': '配置组名称不能为空'}), 400 - success = save_config_group(name, description, pro_config, test_config, query_config) + success = save_config_group(name, description, pro_config, test_config, query_config, sharding_config) if success: return jsonify({'success': True, 'message': '配置组保存成功'}) @@ -995,5 +1419,30 @@ def api_delete_query_history(history_id): else: return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500 +@app.route('/api/query-logs', methods=['GET']) +def api_get_query_logs(): + """获取查询日志""" + try: + limit = request.args.get('limit', type=int) + logs = query_log_collector.get_logs(limit) + return jsonify({ + 'success': True, + 'data': logs, + 'total': len(query_log_collector.logs) + }) + except Exception as e: + logger.error(f"获取查询日志失败: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + +@app.route('/api/query-logs', methods=['DELETE']) +def api_clear_query_logs(): + """清空查询日志""" + try: + query_log_collector.clear_logs() + return jsonify({'success': True, 'message': '查询日志已清空'}) + except Exception as e: + logger.error(f"清空查询日志失败: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + if __name__ == '__main__': app.run(debug=True, port=5000) diff --git a/static/js/app.js b/static/js/app.js index fd67519..1c4e530 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -6,13 +6,93 @@ let filteredIdenticalResults = []; let currentDifferencePage = 1; let differencePageSize = 10; let filteredDifferenceResults = []; +let isShardingMode = false; // 分表模式标志 // 页面加载完成后初始化 document.addEventListener('DOMContentLoaded', function() { loadDefaultConfig(); loadConfigGroups(); // 加载配置组列表 + bindShardingEvents(); // 绑定分表相关事件 }); +// 绑定分表相关事件 +function bindShardingEvents() { + // 分表模式切换事件 + const enableShardingCheckbox = document.getElementById('enableSharding'); + if (enableShardingCheckbox) { + enableShardingCheckbox.addEventListener('change', toggleShardingMode); + } + + // 生产环境分表开关变化事件 + const useShardingProCheckbox = document.getElementById('use_sharding_for_pro'); + if (useShardingProCheckbox) { + useShardingProCheckbox.addEventListener('change', function() { + updateTableNameHints(); + }); + } + + // 测试环境分表开关变化事件 + const useShardingTestCheckbox = document.getElementById('use_sharding_for_test'); + if (useShardingTestCheckbox) { + useShardingTestCheckbox.addEventListener('change', function() { + updateTableNameHints(); + }); + } +} + +// 切换分表模式 +function toggleShardingMode() { + isShardingMode = document.getElementById('enableSharding').checked; + const shardingConfig = document.getElementById('shardingConfig'); + const executeButton = document.getElementById('executeButtonText'); + const keyInputHint = document.getElementById('key_input_hint'); + const keysField = document.getElementById('keys'); + + if (isShardingMode) { + // 启用分表模式 + shardingConfig.style.display = 'block'; + executeButton.textContent = '执行分表查询比对'; + keyInputHint.textContent = '分表模式:Key值应包含时间戳用于计算分表索引'; + keysField.placeholder = 'wmid (推荐使用包含时间戳的字段)'; + keysField.value = 'wmid'; + + // 更新查询Key输入框的占位符 + const queryValues = document.getElementById('query_values'); + queryValues.placeholder = '请输入查询的Key值,一行一个\n分表查询示例(包含时间戳):\nwmid_1609459200\nwmid_1610064000\nwmid_1610668800'; + } else { + // 禁用分表模式 + shardingConfig.style.display = 'none'; + executeButton.textContent = '执行查询比对'; + keyInputHint.textContent = '单表模式:输入普通Key值'; + keysField.placeholder = 'docid'; + keysField.value = 'docid'; + + // 更新查询Key输入框的占位符 + const queryValues = document.getElementById('query_values'); + queryValues.placeholder = '请输入查询的Key值,一行一个\n单表查询示例:\nkey1\nkey2\nkey3'; + } + + updateTableNameHints(); +} + +// 更新表名字段的提示文本 +function updateTableNameHints() { + const proTableHint = document.getElementById('pro_table_hint'); + const testTableHint = document.getElementById('test_table_hint'); + const useShardingPro = document.getElementById('use_sharding_for_pro'); + const useShardingTest = document.getElementById('use_sharding_for_test'); + + if (isShardingMode) { + proTableHint.textContent = (useShardingPro && useShardingPro.checked) ? + '基础表名(自动添加索引后缀)' : '完整表名'; + testTableHint.textContent = (useShardingTest && useShardingTest.checked) ? + '基础表名(自动添加索引后缀)' : '完整表名'; + } else { + proTableHint.textContent = '完整表名'; + testTableHint.textContent = '完整表名'; + } +} + // 加载配置组列表 async function loadConfigGroups() { try { @@ -155,6 +235,9 @@ async function saveConfigGroup() { const config = getCurrentConfig(); + // 获取分表配置 + const shardingConfig = getShardingConfig().sharding_config; + try { const response = await fetch('/api/config-groups', { method: 'POST', @@ -164,7 +247,8 @@ async function saveConfigGroup() { body: JSON.stringify({ name: name, description: description, - ...config + ...config, + sharding_config: shardingConfig }) }); @@ -178,7 +262,7 @@ async function saveConfigGroup() { // 重新加载配置组列表 await loadConfigGroups(); - showAlert('success', result.message); + showAlert('success', result.message + '(包含分表配置)'); } else { showAlert('danger', result.error || '保存配置组失败'); } @@ -437,13 +521,30 @@ async function executeQuery() { document.getElementById('loading').style.display = 'block'; document.getElementById('results').style.display = 'none'; + // 更新加载文本 + const loadingText = document.getElementById('loadingText'); + if (isShardingMode) { + loadingText.textContent = '正在执行分表查询比对...'; + } else { + loadingText.textContent = '正在执行查询比对...'; + } + try { - const response = await fetch('/api/query', { + let apiEndpoint = '/api/query'; + let requestConfig = config; + + // 如果启用了分表模式,使用分表查询API和配置 + if (isShardingMode) { + apiEndpoint = '/api/sharding-query'; + requestConfig = getShardingConfig(); + } + + const response = await fetch(apiEndpoint, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(config) + body: JSON.stringify(requestConfig) }); if (!response.ok) { @@ -455,6 +556,9 @@ async function executeQuery() { currentResults = results; displayResults(results); + // 自动刷新查询日志 + autoRefreshLogsAfterQuery(); + } catch (error) { showAlert('danger', '查询失败: ' + error.message); } finally { @@ -462,11 +566,31 @@ async function executeQuery() { } } +// 获取分表查询配置 +function getShardingConfig() { + const baseConfig = getCurrentConfig(); + + return { + ...baseConfig, + sharding_config: { + use_sharding_for_pro: document.getElementById('use_sharding_for_pro').checked, + use_sharding_for_test: document.getElementById('use_sharding_for_test').checked, + pro_interval_seconds: parseInt(document.getElementById('pro_interval_seconds').value) || 604800, + pro_table_count: parseInt(document.getElementById('pro_table_count').value) || 14, + test_interval_seconds: parseInt(document.getElementById('test_interval_seconds').value) || 604800, + test_table_count: parseInt(document.getElementById('test_table_count').value) || 14 + } + }; +} + // 显示查询结果 function displayResults(results) { // 显示统计信息 displayStats(results); + // 显示分表查询信息(如果有) + displayShardingInfo(results); + // 更新选项卡计数 document.getElementById('diff-count').textContent = results.differences.length; document.getElementById('identical-count').textContent = results.identical_results.length; @@ -487,7 +611,99 @@ function displayResults(results) { // 显示结果区域 document.getElementById('results').style.display = 'block'; - showAlert('success', `查询完成!共处理${results.total_keys}个Key,发现${results.differences.length}处差异,${results.identical_results.length}条记录完全相同`); + // 根据查询类型显示不同的成功消息 + const queryType = isShardingMode ? '分表查询' : '单表查询'; + showAlert('success', `${queryType}完成!共处理${results.total_keys}个Key,发现${results.differences.length}处差异,${results.identical_results.length}条记录完全相同`); +} + +// 显示分表查询信息 +function displayShardingInfo(results) { + const shardingInfoContainer = document.getElementById('shardingInfoContainer'); + + if (!isShardingMode || !results.sharding_info) { + shardingInfoContainer.style.display = 'none'; + return; + } + + const shardingInfo = results.sharding_info; + shardingInfoContainer.style.display = 'block'; + + let html = '
'; + + // 生产环境分表信息 + if (shardingInfo.pro_shards) { + html += ` +
+
生产环境分表信息
+
+ 配置:${shardingInfo.pro_shards.interval_seconds}秒间隔,${shardingInfo.pro_shards.table_count}张分表 +
+
+ `; + + if (shardingInfo.pro_shards.queried_tables) { + shardingInfo.pro_shards.queried_tables.forEach(table => { + const hasError = shardingInfo.pro_shards.error_tables && + shardingInfo.pro_shards.error_tables.includes(table); + const cssClass = hasError ? 'shard-error-info' : 'shard-table-info'; + html += `${table}`; + }); + } + + html += '
'; + } + + // 测试环境分表信息 + if (shardingInfo.test_shards) { + html += ` +
+
测试环境分表信息
+
+ 配置:${shardingInfo.test_shards.interval_seconds}秒间隔,${shardingInfo.test_shards.table_count}张分表 +
+
+ `; + + if (shardingInfo.test_shards.queried_tables) { + shardingInfo.test_shards.queried_tables.forEach(table => { + const hasError = shardingInfo.test_shards.error_tables && + shardingInfo.test_shards.error_tables.includes(table); + const cssClass = hasError ? 'shard-error-info' : 'shard-table-info'; + html += `${table}`; + }); + } + + html += '
'; + } + + html += '
'; + + // 添加分表计算统计信息 + if (shardingInfo.calculation_stats) { + html += ` +
+
+
分表计算统计
+
+
+ 处理Key数:${shardingInfo.calculation_stats.total_keys || 0} +
+
+ 成功解析时间戳:${shardingInfo.calculation_stats.successful_extractions || 0} +
+
+ 计算出分表数:${shardingInfo.calculation_stats.unique_shards || 0} +
+
+ 解析失败:${shardingInfo.calculation_stats.failed_extractions || 0} +
+
+
+
+ `; + } + + document.getElementById('shardingInfo').innerHTML = html; } // 显示统计信息 @@ -2722,4 +2938,131 @@ function showAlert(type, message) { alert.remove(); } }, 5000); +} + +// 查询日志相关功能 +let allQueryLogs = []; // 存储所有日志 + +async function refreshQueryLogs() { + try { + const response = await fetch('/api/query-logs'); + const result = await response.json(); + + if (result.success && result.data) { + allQueryLogs = result.data; + filterLogsByLevel(); + } else { + document.getElementById('query-logs').innerHTML = '
无法获取查询日志
'; + } + } catch (error) { + console.error('获取查询日志失败:', error); + document.getElementById('query-logs').innerHTML = '
获取查询日志失败
'; + } +} + +function filterLogsByLevel() { + const showInfo = document.getElementById('log-level-info').checked; + const showWarning = document.getElementById('log-level-warning').checked; + const showError = document.getElementById('log-level-error').checked; + + const filteredLogs = allQueryLogs.filter(log => { + switch(log.level) { + case 'INFO': return showInfo; + case 'WARNING': return showWarning; + case 'ERROR': return showError; + default: return true; + } + }); + + displayQueryLogs(filteredLogs); +} + +async function clearQueryLogs() { + if (!confirm('确定要清空所有查询日志吗?')) { + return; + } + + try { + const response = await fetch('/api/query-logs', { + method: 'DELETE' + }); + const result = await response.json(); + + if (result.success) { + document.getElementById('query-logs').innerHTML = '
查询日志已清空
'; + showAlert('success', '查询日志已清空'); + } else { + showAlert('danger', '清空查询日志失败: ' + result.error); + } + } catch (error) { + console.error('清空查询日志失败:', error); + showAlert('danger', '清空查询日志失败'); + } +} + +function displayQueryLogs(logs) { + const container = document.getElementById('query-logs'); + + if (!logs || logs.length === 0) { + container.innerHTML = '
暂无查询日志
'; + return; + } + + const logHtml = logs.map(log => { + const levelClass = { + 'INFO': 'text-primary', + 'WARNING': 'text-warning', + 'ERROR': 'text-danger', + 'DEBUG': 'text-secondary' + }[log.level] || 'text-dark'; + + const levelIcon = { + 'INFO': 'fas fa-info-circle', + 'WARNING': 'fas fa-exclamation-triangle', + 'ERROR': 'fas fa-times-circle', + 'DEBUG': 'fas fa-bug' + }[log.level] || 'fas fa-circle'; + + // 改进SQL高亮显示 + let message = escapeHtml(log.message); + + // 高亮SQL查询语句 + if (message.includes('执行查询SQL:')) { + message = message.replace(/执行查询SQL: (SELECT.*?);/g, + '执行查询SQL:
$1;'); + } + + // 高亮重要信息 + message = message.replace(/(\d+\.\d{3}秒)/g, '$1'); + message = message.replace(/(返回记录数=\d+)/g, '$1'); + message = message.replace(/(执行时间=[\d.]+秒)/g, '$1'); + + return ` +
+
+
+ + + [${log.level}] + +
${message}
+
+ ${log.timestamp} +
+
+ `; + }).join(''); + + container.innerHTML = logHtml; + + // 自动滚动到底部 + container.scrollTop = container.scrollHeight; +} + +// 在查询执行后自动刷新日志 +function autoRefreshLogsAfterQuery() { + // 延迟一下确保后端日志已经记录 + setTimeout(() => { + refreshQueryLogs(); + }, 500); } \ No newline at end of file diff --git a/templates/db_compare.html b/templates/db_compare.html index bb60bd6..ad30877 100644 --- a/templates/db_compare.html +++ b/templates/db_compare.html @@ -3,7 +3,7 @@ - 数据库查询比对工具 + 数据库查询比对工具 - 支持分表查询 @@ -244,7 +273,10 @@ 首页 + @@ -256,6 +288,7 @@

数据库查询比对工具 + 支持单表查询和分表查询两种模式

@@ -266,6 +299,72 @@

配置管理

+ +
+
+
查询模式
+
+
+
+ + +
+
+
+ + + +
@@ -367,6 +466,7 @@
+ 完整表名或基础表名(分表时)
@@ -419,6 +519,7 @@
+ 完整表名或基础表名(分表时)
@@ -432,7 +533,8 @@
- + + 分表模式下推荐使用包含时间戳的字段如wmid
@@ -453,11 +555,12 @@

查询Key管理

- + + 单表模式:输入普通Key值 | 分表模式:Key值应包含时间戳用于计算分表索引