From 4a0800a77632b8f564e2eb04fcc8b72a7b151c59 Mon Sep 17 00:00:00 2001 From: YoVinchen Date: Tue, 5 Aug 2025 19:56:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=A1=B9=E7=9B=AE=E6=95=B4?= =?UTF-8?q?=E5=90=88=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 446 +++++------ modules/api_routes.py | 143 +++- modules/config_manager.py | 88 +++ modules/redis_query.py | 325 +++++++- static/js/app.js | 112 ++- static/js/redis_compare.js | 1420 +++++++++++++++++++++++++++++++++- templates/redis_compare.html | 438 +++++++++-- 7 files changed, 2608 insertions(+), 364 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 0bd4e41..1bd9f56 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,94 +4,89 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## 项目架构 -这是一个基于 Flask 的数据库查询比对工具,用于比较 Cassandra 数据库中生产环境和测试环境的数据差异。现已支持单表查询、TWCS分表查询和**多主键查询**三种核心功能。 +这是一个基于Flask的现代化数据库查询比对工具,支持Cassandra和Redis两大数据源的数据一致性验证。采用模块化架构设计,支持单表查询、TWCS分表查询、多主键查询和Redis集群比对等多种复杂场景。 ### 核心组件架构 -**后端 (Flask)** -- `app.py`: 主应用文件,包含所有API端点和数据处理逻辑 - - 数据库连接管理(Cassandra + SQLite) - - 查询执行和结果比对算法 - - 配置组管理(CRUD操作) - - JSON字段特殊处理和数组比较逻辑 - - 查询历史记录管理 - - **分表查询功能**: - - `ShardingCalculator`类:TWCS时间分表计算器 - - `execute_sharding_query()`:分表查询执行 - - `execute_mixed_query()`:混合查询支持(生产分表+测试单表组合) - - `/api/sharding-query`:分表查询API端点 - - **多主键查询功能(新增)**: - - `execute_query()`函数支持复合主键SQL构建 - - `compare_results()`函数支持复合主键匹配 - - `match_composite_key()`辅助函数处理复合主键比较 -- `config_groups.db`: SQLite数据库,存储用户保存的配置组、查询历史和分表配置 +**主应用 (app.py)** +- 应用入口和全局配置管理 +- 模块导入和路由设置 +- 日志系统初始化 + +**模块化后端 (modules/)** +- `api_routes.py`: 所有Flask路由和请求处理逻辑 + - Cassandra查询API:`/api/query`, `/api/sharding-query` + - Redis比对API:`/api/redis/compare` + - 配置管理API:配置组CRUD操作 + - 查询历史API:历史记录的保存和回放 +- `database.py`: SQLite数据库管理 + - 数据库初始化和表结构创建 + - 版本控制和字段动态添加 + - 事务处理和连接管理 +- `query_engine.py`: Cassandra查询引擎 + - 单表查询和分表查询执行 + - 多主键查询支持(复合主键SQL构建) + - 并行查询和性能优化 +- `redis_query.py`: Redis查询引擎 + - 全Redis数据类型支持(String/Hash/List/Set/ZSet/Stream) + - 随机采样和指定Key两种查询模式 + - 集群模式的自动检测和连接 +- `data_comparison.py`: 数据比对引擎 + - JSON和数组的智能深度比较 + - 复合主键的精确匹配算法 + - 数据质量评估和建议生成 +- `cassandra_client.py` / `redis_client.py`: 数据库客户端 + - 连接管理和错误处理 + - 性能监控和连接池优化 +- `config_groups.db`: SQLite数据库,存储配置组、查询历史和日志 **前端 (原生JavaScript + Bootstrap)** -- `templates/db_compare.html`: 主界面模板,**现已支持单表、分表和多主键三种模式** - - 分表模式切换开关 - - 生产/测试环境独立分表配置 - - 分表参数配置(时间间隔、分表数量) - - 分表查询信息展示 - - **多主键查询支持**:UI提示和占位符文本更新 -- `templates/index.html`: 工具集合首页 -- `static/js/app.js`: 核心前端逻辑 +- `templates/index.html`: 工具集合首页,提供功能导航 +- `templates/db_compare.html`: Cassandra比对界面 + - 支持单表、分表和多主键三种查询模式 + - 分表模式切换和参数配置 + - 实时查询日志和性能监控 +- `templates/redis_compare.html`: Redis比对界面 + - 集群配置和连接管理 + - 随机采样和指定Key两种查询模式 + - 全数据类型支持的结果展示 +- `static/js/app.js`: Cassandra查询的前端逻辑 - 配置管理和表单处理 - - 差异结果的分页展示系统 - - 原生数据展示(多种视图模式:格式化、原始、差异对比、树形) - - 高级错误处理和用户反馈 - - **分表查询支持**: - - `toggleShardingMode()`:分表模式切换 - - `getShardingConfig()`:分表配置获取 - - `displayShardingInfo()`:分表查询结果展示 - - **多主键查询支持(新增)**: - - `getCurrentConfig()`函数解析复合主键配置 - - `formatCompositeKey()`:复合主键显示格式化 - - UI占位符和提示文本支持复合主键格式 - -**分表查询功能模块(重要新增)** -- **时间戳提取算法(已更新)**: - - **新规则**:使用 `re.sub(r'\D', '', key)` 删除Key中所有非数字字符 - - 将提取到的数字字符串转换为整数作为时间戳 - - 支持任意格式的Key,只要包含数字即可 - - 示例:`wmid_1609459200` → `1609459200`,`abc123def456` → `123456` -- **分表索引计算**: - - 公式:`int(numbers) // interval_seconds % table_count` - - 默认配置:604800秒间隔(7天),14张分表 - - 支持自定义配置 -- **混合查询场景**: - - 生产环境分表 + 测试环境单表 - - 生产环境分表 + 测试环境分表 - - 生产环境单表 + 测试环境分表 - - 生产环境单表 + 测试环境单表 - -**多主键查询功能模块(最新功能)** -- **复合主键格式**: - - 主键字段:逗号分隔,如 `docid,id` - - 查询值:逗号分隔,如 `8825C293B3609175B2224236E984FEDB,8825C293B3609175B2224236E984FED` - - 一行一组复合主键值 -- **SQL构建逻辑**: - - 单主键:`key IN (val1, val2, val3)` - - 复合主键:`(key1='val1' AND key2='val2') OR (key1='val3' AND key2='val4')` -- **数据匹配算法**: - - `match_composite_key()`函数处理单主键和复合主键的统一匹配 - - 支持字段数量验证和类型转换 -- **向后兼容**: - - 完全兼容现有单主键查询 - - 自动识别主键类型并采用相应处理逻辑 - -**核心文件** -- `app.py`: 唯一的主应用文件,包含所有功能实现 -- `config_groups.db`: SQLite数据库文件 + - 分页展示和数据可视化 + - 多主键查询的UI适配 +- `static/js/redis_compare.js`: Redis比对的前端逻辑 + - Redis集群配置管理 + - 查询模式切换和参数设置 + - 多类型数据的格式化展示 ### 关键功能模块 -**数据比对引擎** +**Cassandra数据比对引擎** - 支持复杂JSON字段的深度比较 - 数组字段的顺序无关比较 - 字段级别的差异统计和分析 - 数据质量评估和建议生成 - 支持包含和排除特定字段的比较 -- **多主键数据比对**:支持复合主键的精确匹配和差异检测 +- 多主键数据比对:支持复合主键的精确匹配和差异检测 + +**Redis数据比对引擎** +- 全数据类型支持:String、Hash、List、Set、ZSet、Stream +- 智能JSON检测和深度比较 +- 集群和单节点模式的自动适配 +- 随机采样和指定Key两种查询模式 +- 性能监控和连接时间统计 + +**分表查询功能模块** +- **时间戳提取算法**:使用 `re.sub(r'\D', '', key)` 删除Key中所有非数字字符 +- **分表索引计算**:公式 `int(numbers) // interval_seconds % table_count` +- **混合查询场景**:支持生产分表+测试单表等组合 +- **并行查询**:多分表同时查询以提高性能 + +**多主键查询功能模块** +- **复合主键格式**:主键字段逗号分隔,查询值逗号分隔 +- **SQL构建逻辑**:自动选择IN查询或OR条件组合 +- **数据匹配算法**:统一处理单主键和复合主键匹配 +- **向后兼容**:完全兼容现有单主键查询 **用户界面特性** - 分页系统(差异记录和相同记录) @@ -100,12 +95,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - 配置导入导出和管理 - 详细的错误诊断和故障排查指南 - 查询历史记录和复用 -- **查询日志系统(新增)**: - - 实时显示SQL执行日志 - - 支持日志级别过滤(INFO/WARNING/ERROR) - - SQL语句语法高亮显示 - - 执行时间和记录数统计 - - 日志清空和刷新功能 +- 查询日志系统:实时显示SQL执行日志,支持日志级别过滤 ## 开发相关命令 @@ -117,95 +107,113 @@ pip install -r requirements.txt # 运行应用(默认端口5000) python app.py -# 自定义端口运行 -# 修改app.py最后一行:app.run(debug=True, port=XXXX) +# 开发模式启动(支持热重载) +# app.py中默认开启debug=True # 生产环境运行 -# python app.py 或使用WSGI服务器如gunicorn -``` - -### 调试和日志 -```bash -# 应用启动后,查询日志可通过以下API获取: -# GET /api/query-logs - 获取查询日志 -# DELETE /api/query-logs - 清空查询日志 - -# 日志级别:INFO, WARNING, ERROR -# 所有Cassandra查询和SQL操作都会记录到查询日志中 +# 使用WSGI服务器如gunicorn +gunicorn -w 4 -b 0.0.0.0:5000 app:app ``` ### 测试和验证 ```bash # 主要通过Web界面进行功能测试 -# 单表查询测试:http://localhost:5000/db-compare -# 分表查询测试:在Web界面中开启分表模式 +# Cassandra单表查询:http://localhost:5000/db-compare +# Redis数据比对:http://localhost:5000/redis-compare +# 工具集合首页:http://localhost:5000 -# 多主键查询测试示例: +# Cassandra多主键查询测试示例: # 1. 在主键字段中输入:docid,id # 2. 在查询Key值中输入(每行一组): # 8825C293B3609175B2224236E984FEDB,8825C293B3609175B2224236E984FED # 9925C293B3609175B2224236E984FEDB,9925C293B3609175B2224236E984FED +# Redis查询测试示例: +# 1. 配置两个Redis集群连接 +# 2. 选择随机采样模式,设置采样数量 +# 3. 或选择指定Key模式,输入要比对的Key列表 + # 数据库初始化(如果config_groups.db不存在) # 通过访问Web界面会自动创建数据库表结构 ``` -### 开发模式 -应用默认运行在debug模式,代码修改后自动重启。访问: -- http://localhost:5000 - 工具集合首页 -- http://localhost:5000/db-compare - 数据库比对工具 - ### 依赖项 - Flask==2.3.3 - cassandra-driver==3.29.1 +- redis==5.0.1 ### 项目特点 -- **单文件架构**:所有后端逻辑都在 `app.py` 中实现(2230+行代码) -- **内存+数据库日志系统**:使用 `QueryLogCollector` 类在内存和SQLite中收集查询日志 -- **SQLite本地存储**:配置组、查询历史和日志存储在本地 `config_groups.db` 文件中 -- **前端原生实现**:使用原生JavaScript + Bootstrap,无现代前端框架 -- **多模式支持**:单表查询、分表查询、多主键查询的统一架构 +- **模块化架构**:清晰的代码组织和职责分离 +- **双数据源支持**:同时支持Cassandra和Redis数据比对 +- **智能查询引擎**:针对不同数据源的优化查询策略 +- **SQLite本地存储**:配置组、查询历史和日志的本地持久化 +- **前端原生实现**:使用原生JavaScript + Bootstrap,无现代前端框架依赖 +- **多模式支持**:单表查询、分表查询、多主键查询、Redis比对的统一架构 ## API架构说明 -### 核心API端点 -- `GET /api/default-config`: 获取默认数据库配置 -- `POST /api/query`: 执行单表数据库查询比对(**支持多主键查询**) -- `POST /api/sharding-query`: 执行分表查询比对(**支持多主键查询**) -- `GET /api/config-groups`: 获取所有配置组 -- `POST /api/config-groups`: 创建新配置组 -- `GET /api/config-groups/`: 获取特定配置组 -- `DELETE /api/config-groups/`: 删除配置组 +### Cassandra相关API端点 +- `GET /api/default-config`: 获取默认Cassandra配置 +- `POST /api/query`: 执行单表数据库查询比对(支持多主键查询) +- `POST /api/sharding-query`: 执行分表查询比对(支持多主键查询) +- `GET /api/config-groups`: 获取所有Cassandra配置组 +- `POST /api/config-groups`: 创建新Cassandra配置组 +- `GET /api/config-groups/`: 获取特定Cassandra配置组 +- `DELETE /api/config-groups/`: 删除Cassandra配置组 + +### Redis相关API端点 +- `POST /api/redis/compare`: 执行Redis数据比对 +- `POST /api/redis/test-connection`: 测试Redis连接 +- `GET /api/redis/config-groups`: 获取所有Redis配置组 +- `POST /api/redis/config-groups`: 创建新Redis配置组 +- `GET /api/redis/config-groups/`: 获取特定Redis配置组 +- `DELETE /api/redis/config-groups/`: 删除Redis配置组 +- `GET /api/redis/query-history`: 获取Redis查询历史 +- `POST /api/redis/query-history`: 保存Redis查询历史 +- `GET /api/redis/query-history/`: 获取特定Redis历史记录 +- `DELETE /api/redis/query-history/`: 删除Redis历史记录 + +### 通用API端点 - `POST /api/init-db`: 初始化SQLite数据库 -- `GET /api/query-history`: 获取查询历史 -- `POST /api/query-history`: 保存查询历史 -- `GET /api/query-history/`: 获取特定历史记录 +- `GET /api/query-history`: 获取Cassandra查询历史 +- `POST /api/query-history`: 保存Cassandra查询历史 +- `GET /api/query-history/`: 获取特定Cassandra历史记录 - `GET /api/query-history//results`: 获取历史记录的完整结果数据 -- `DELETE /api/query-history/`: 删除历史记录 +- `DELETE /api/query-history/`: 删除Cassandra历史记录 - `GET /api/query-logs`: 获取查询日志(支持limit参数) - `GET /api/query-logs/history/`: 获取特定历史记录的相关日志 - `DELETE /api/query-logs`: 清空查询日志 ### 查询比对流程 -**单表查询流程(`/api/query`)**: +**Cassandra单表查询流程(`/api/query`)**: 1. 前端发送配置和Key值列表到 `/api/query` -2. 后端创建两个Cassandra连接(生产+测试) -3. 并行执行查询,获取原始数据 -4. 运行比较算法,生成差异报告 +2. 后端通过 `cassandra_client.py` 创建两个Cassandra连接(生产+测试) +3. `query_engine.py` 并行执行查询,获取原始数据 +4. `data_comparison.py` 运行比较算法,生成差异报告 5. 返回完整结果(差异、统计、原始数据) -**分表查询流程(`/api/sharding-query`)**: +**Cassandra分表查询流程(`/api/sharding-query`)**: 1. 前端发送配置、Key值列表和分表配置到 `/api/sharding-query` -2. 后端使用 `ShardingCalculator` 解析Key中的时间戳 +2. 后端使用 `sharding.py` 中的 `ShardingCalculator` 解析Key中的时间戳 3. 根据分表算法计算每个Key对应的分表名称 -4. 创建分表映射关系,并行执行分表查询 +4. `query_engine.py` 创建分表映射关系,并行执行分表查询 5. 汇总所有分表结果,执行比较算法 6. 返回包含分表信息的完整结果 +**Redis数据比对流程(`/api/redis/compare`)**: +1. 前端发送Redis集群配置和查询参数到 `/api/redis/compare` +2. 后端通过 `redis_client.py` 创建两个Redis连接 +3. `redis_query.py` 根据查询模式执行数据获取: + - 随机采样模式:从源集群随机获取指定数量的Key + - 指定Key模式:查询用户提供的Key列表 +4. 针对每个Key,查询其在两个集群中的值和数据类型 +5. 执行智能数据比较(根据数据类型选择比较策略) +6. 返回比对结果和统计信息 + ## 数据结构和配置 -### 数据库配置结构 +### Cassandra配置结构 **单表查询配置**: ```javascript @@ -240,6 +248,32 @@ python app.py } ``` +### Redis配置结构 + +**Redis集群配置**: +```javascript +{ + source_config: { + name: "源集群名称", + nodes: [ + {host: "192.168.1.200", port: 7000}, + {host: "192.168.1.201", port: 7001} + ], + password: "redis_password", + socket_timeout: 3, + socket_connect_timeout: 3, + max_connections_per_node: 16 + }, + target_config: { /* 同上 */ }, + query_config: { + mode: "random_sample", // 或 "specific_keys" + sample_size: 1000, // 随机采样数量(random_sample模式) + keys: ["key1", "key2"], // 指定Key列表(specific_keys模式) + key_pattern: "*" // Key匹配模式(可选) + } +} +``` + **多主键查询格式示例**: ```javascript // 复合主键配置 @@ -253,6 +287,8 @@ values: [ ``` ### 查询结果结构 + +**Cassandra查询结果**: ```javascript { total_keys, pro_count, test_count, @@ -282,103 +318,77 @@ values: [ } ``` +**Redis查询结果**: +```javascript +{ + total_keys, source_count, target_count, + identical_count, different_count, source_only_count, target_only_count, + + // 详细比对结果 + identical_keys: ["key1", "key2"], + different_keys: [ + { + key: "key3", + source_type: "string", target_type: "string", + source_value: "value1", target_value: "value2", + message: "Value mismatch" + } + ], + source_only_keys: ["key4"], // 仅源集群存在 + target_only_keys: ["key5"], // 仅目标集群存在 + + // 统计信息 + type_distribution: { + "string": 500, "hash": 200, "list": 100, + "set": 50, "zset": 30, "stream": 20 + }, + consistency_percentage: 85.5, + + // 性能统计 + query_time: 2.5, + source_connection_time: 0.1, + target_connection_time: 0.15 +} +``` + ## 开发注意事项 ### 代码修改指导 -- **单文件开发**:所有后端功能都在 `app.py` 中,修改时要注意代码结构清晰 +- **模块化开发**:功能按模块组织,修改时注意模块间的依赖关系 - **数据库模式变更**:修改SQLite表结构需要考虑向后兼容性 -- **前端JavaScript**:位于 `static/js/app.js`,使用原生JS,注意浏览器兼容性 +- **前端JavaScript**:分别位于 `static/js/app.js`(Cassandra)和 `static/js/redis_compare.js`(Redis) - **HTML模板**:使用Jinja2模板引擎,主要文件在 `templates/` 目录 -### 核心类和函数位置(app.py) -- `QueryLogCollector`类:日志收集系统(第23-276行) -- `ShardingCalculator`类:分表计算器(第291行开始) -- 数据库连接:`create_connection()` (第1072行) -- 查询比对:`execute_query()` (第1177行) 和 `execute_sharding_query()` (第1250行) -- **多主键支持**:`match_composite_key()` (第1407行) -- API路由:使用Flask装饰器定义 +### 关键模块和类位置 +- **主应用**:`app.py` - 应用入口和模块集成 +- **路由管理**:`modules/api_routes.py` - 所有API端点的实现 +- **数据库管理**:`modules/database.py` - SQLite数据库操作 +- **Cassandra客户端**:`modules/cassandra_client.py` - 连接管理和查询执行 +- **Redis客户端**:`modules/redis_client.py` - Redis连接和性能监控 +- **查询引擎**:`modules/query_engine.py` - Cassandra查询逻辑 +- **Redis查询**:`modules/redis_query.py` - Redis数据比对逻辑 +- **数据比较**:`modules/data_comparison.py` - 智能数据比较算法 +- **分表计算**:`modules/sharding.py` - TWCS分表逻辑 +- **配置管理**:`modules/config_manager.py` - 配置组管理 +- **日志收集**:`modules/query_logger.py` - 查询日志系统 -### 分表功能开发指导 -- **时间戳解析(已更新)**:`ShardingCalculator.extract_timestamp_from_key()` 新规则 - - 使用 `re.sub(r'\D', '', key)` 删除所有非数字字符 - - 将提取的数字字符串转换为整数作为时间戳 - - 不再进行时间戳有效性验证,支持任意数字组合 -- **分表索引计算**:使用公式 `int(numbers) // interval_seconds % table_count` -- **错误处理**:Key中没有数字字符时会记录到 `failed_keys` 中 -- **混合查询**:支持生产环境分表+测试环境单表的组合场景 -- **前端状态**:分表模式通过 `toggleShardingMode()` 切换,影响UI和提示文本 +### 模块依赖关系 +``` +app.py +├── modules/api_routes.py (路由层) + ├── modules/config_manager.py (配置管理) + ├── modules/cassandra_client.py (Cassandra连接) + ├── modules/redis_client.py (Redis连接) + ├── modules/query_engine.py (Cassandra查询) + │ ├── modules/sharding.py (分表计算) + │ └── modules/data_comparison.py (数据比较) + ├── modules/redis_query.py (Redis查询) + └── modules/database.py (SQLite数据库) +``` -### 多主键功能开发指导 -- **主键解析**:前端通过逗号分隔解析主键字段和值 -- **SQL构建**:后端 `execute_query()` 根据主键数量选择不同的WHERE条件构建策略 -- **数据匹配**:`match_composite_key()` 函数统一处理单主键和复合主键匹配逻辑 -- **UI适配**:占位符和提示文本根据模式动态更新 -- **结果展示**:支持复合主键对象格式的显示和格式化 - -### Cassandra连接处理 -- 连接包含详细的错误诊断和重试机制 -- 使用DCAwareRoundRobinPolicy避免负载均衡警告 -- 连接超时设置为10秒 -- 失败时提供网络连通性测试 -- 支持认证(PlainTextAuthProvider) -- 支持集群配置(cluster_name, datacenter) - -### 前端状态管理 -- `currentResults`: 存储最新查询结果 -- 分页状态:`currentIdenticalPage`, `currentDifferencePage` -- 过滤状态:`filteredIdenticalResults`, `filteredDifferenceResults` -- **日志状态(新增)**:`allQueryLogs` - 存储所有查询日志 - -### JSON和数组字段处理 -- `normalize_json_string()`: 标准化JSON字符串用于比较 -- `compare_array_values()`: 数组的顺序无关比较 -- `is_json_field()`: 智能检测JSON字段 -- 前端提供专门的JSON语法高亮和树形展示 - -### 错误处理策略 -- 后端:分类错误(connection_error, validation_error, query_error, system_error) -- 前端:详细错误展示,包含配置信息、解决建议、连接测试工具 -- 提供交互式故障排查指南 -- **查询日志(新增)**:所有SQL执行和错误信息都会记录到查询日志中 - -### 性能考虑 -- 大数据集的分页处理 -- 原生数据的延迟加载 -- JSON格式化的客户端缓存 -- 搜索和过滤的防抖处理 - -### SQLite数据库表结构 - -**config_groups表** -- id: 主键 -- name: 配置组名称(唯一) -- description: 描述 -- pro_config: 生产环境配置(JSON) -- test_config: 测试环境配置(JSON) -- query_config: 查询配置(JSON) -- **sharding_config: 分表配置(JSON,新增字段)** -- created_at/updated_at: 时间戳 - -**query_history表** -- id: 主键 -- name: 查询名称 -- description: 描述 -- pro_config/test_config/query_config: 配置(JSON) -- query_keys: 查询的键值(JSON) -- results_summary: 结果摘要(JSON) -- execution_time: 执行时间 -- total_keys/differences_count/identical_count: 统计数据 -- **sharding_config: 分表配置(JSON,新增字段)** -- **query_type: 查询类型('single'/'sharding',新增字段)** -- **raw_results/differences_data/identical_data: 查询结果数据(新增字段)** -- created_at: 时间戳 - -**query_logs表(新增表)** -- id: 主键 -- batch_id: 批次ID -- **history_id: 关联历史记录ID(外键)** -- timestamp: 时间戳 -- level: 日志级别(INFO/WARNING/ERROR) -- message: 日志消息 -- query_type: 查询类型 -- created_at: 创建时间 \ No newline at end of file +### 开发最佳实践 +- **错误处理**:每个模块都有详细的错误分类和处理机制 +- **日志记录**:使用统一的日志系统,支持不同级别的日志输出 +- **性能监控**:查询时间和连接时间的详细统计 +- **配置管理**:支持配置的导入导出和版本管理 +- **数据安全**:敏感信息(密码)的安全处理 \ No newline at end of file diff --git a/modules/api_routes.py b/modules/api_routes.py index b02e158..d5bd17f 100644 --- a/modules/api_routes.py +++ b/modules/api_routes.py @@ -13,11 +13,13 @@ from .config_manager import ( get_config_group_by_id, delete_config_group, save_query_history, get_query_history, get_query_history_by_id, delete_query_history, + batch_delete_query_history, # Redis配置管理 REDIS_DEFAULT_CONFIG, save_redis_config_group, get_redis_config_groups, get_redis_config_group_by_id, delete_redis_config_group, save_redis_query_history, get_redis_query_history, get_redis_query_history_by_id, delete_redis_query_history, + batch_delete_redis_query_history, parse_redis_config_from_yaml ) from .cassandra_client import create_connection @@ -38,15 +40,26 @@ def setup_routes(app, query_log_collector): def index(): return render_template('index.html') + @app.route('/test-config-load') + def test_config_load(): + """配置加载测试页面""" + return send_from_directory('.', 'test_config_load.html') + @app.route('/db-compare') def db_compare(): - """Cassandra数据库比对工具页面""" return render_template('db_compare.html') @app.route('/redis-compare') def redis_compare(): - """Redis数据比对工具页面""" return render_template('redis_compare.html') + + @app.route('/redis-js-test') + def redis_js_test(): + return render_template('redis_js_test.html') + + @app.route('/redis-test') + def redis_test(): + return render_template('redis_test.html') # 基础API @app.route('/api/default-config') @@ -593,6 +606,41 @@ def setup_routes(app, query_log_collector): else: return jsonify({'success': False, 'error': '查询历史记录删除失败'}), 500 + @app.route('/api/query-history/batch-delete', methods=['POST']) + def api_batch_delete_query_history(): + """批量删除Cassandra查询历史记录""" + try: + data = request.get_json() + if not data: + return jsonify({'success': False, 'error': '请求数据格式错误'}), 400 + + history_ids = data.get('history_ids', []) + + # 验证参数 + if not history_ids: + return jsonify({'success': False, 'error': '请提供要删除的历史记录ID列表'}), 400 + + if not isinstance(history_ids, list): + return jsonify({'success': False, 'error': 'history_ids必须是数组'}), 400 + + # 验证所有ID都是整数 + try: + history_ids = [int(id) for id in history_ids] + except (ValueError, TypeError): + return jsonify({'success': False, 'error': '历史记录ID必须是整数'}), 400 + + # 调用批量删除函数 + result = batch_delete_query_history(history_ids) + + if result['success']: + return jsonify(result) + else: + return jsonify(result), 500 + + except Exception as e: + logger.error(f"批量删除Cassandra查询历史记录异常: {e}") + return jsonify({'success': False, 'error': f'服务器内部错误: {str(e)}'}), 500 + # 查询日志管理API @app.route('/api/query-logs', methods=['GET']) def api_get_query_logs(): @@ -1087,22 +1135,89 @@ def setup_routes(app, query_log_collector): else: return jsonify({'success': False, 'error': 'Redis查询历史记录删除失败'}), 500 + @app.route('/api/redis/query-history/batch-delete', methods=['POST']) + def api_batch_delete_redis_query_history(): + """批量删除Redis查询历史记录""" + try: + data = request.get_json() + if not data or 'history_ids' not in data: + return jsonify({'success': False, 'error': '请提供要删除的历史记录ID列表'}), 400 + + history_ids = data['history_ids'] + if not isinstance(history_ids, list): + return jsonify({'success': False, 'error': '历史记录ID列表格式错误'}), 400 + + if not history_ids: + return jsonify({'success': False, 'error': '没有要删除的记录'}), 400 + + # 验证ID都是整数 + try: + history_ids = [int(id) for id in history_ids] + except (ValueError, TypeError): + return jsonify({'success': False, 'error': '历史记录ID格式错误'}), 400 + + result = batch_delete_redis_query_history(history_ids) + + if result['success']: + return jsonify(result) + else: + return jsonify(result), 500 + + except Exception as e: + logger.error(f"批量删除Redis历史记录异常: {e}") + return jsonify({'success': False, 'error': f'批量删除失败: {str(e)}'}), 500 + # Redis查询日志API @app.route('/api/redis/query-logs', methods=['GET']) def api_get_redis_query_logs(): - """获取Redis查询日志""" + """获取Redis查询日志,支持分组显示""" try: - limit = request.args.get('limit', 100, type=int) - # 获取最新的查询日志 - logs = query_log_collector.get_logs(limit=limit) - - # 过滤Redis相关的日志 - redis_logs = [] - for log in logs: - if (log.get('message') and 'redis' in log.get('message', '').lower()) or log.get('query_type') == 'redis': - redis_logs.append(log) - - return jsonify({'success': True, 'data': redis_logs}) + limit = request.args.get('limit', 1000, type=int) + grouped = request.args.get('grouped', 'true').lower() == 'true' # 默认分组显示 + from_db = request.args.get('from_db', 'true').lower() == 'true' # 默认从数据库获取 + + if grouped: + # 返回分组日志 + grouped_logs = query_log_collector.get_logs_grouped_by_batch(limit, from_db) + # 过滤出Redis相关的日志组 + redis_grouped_logs = [] + for batch_id, logs in grouped_logs: + # 过滤每个批次中的日志,只保留Redis相关的 + redis_logs = [ + log for log in logs + if log.get('query_type') == 'redis' or + (log.get('message') and 'redis' in log.get('message', '').lower()) + ] + if redis_logs: # 只有当批次中有Redis日志时才添加 + redis_grouped_logs.append([batch_id, redis_logs]) + + # 获取总数(用于统计) + total_logs = sum(len(logs) for _, logs in redis_grouped_logs) + + return jsonify({ + 'success': True, + 'data': redis_grouped_logs, + 'grouped': True, + 'total_logs': total_logs, + 'from_db': from_db + }) + else: + # 返回平铺日志 + logs = query_log_collector.get_logs(limit, from_db) + # 过滤Redis相关的日志 + redis_logs = [ + log for log in logs + if log.get('query_type') == 'redis' or + (log.get('message') and 'redis' in log.get('message', '').lower()) + ] + + return jsonify({ + 'success': True, + 'data': redis_logs, + 'grouped': False, + 'total_logs': len(redis_logs), + 'from_db': from_db + }) except Exception as e: logger.error(f"获取Redis查询日志失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 diff --git a/modules/config_manager.py b/modules/config_manager.py index 310e51a..fd9cdfe 100644 --- a/modules/config_manager.py +++ b/modules/config_manager.py @@ -366,6 +366,50 @@ def delete_redis_query_history(history_id): finally: conn.close() +def batch_delete_redis_query_history(history_ids): + """批量删除Redis查询历史记录""" + if not history_ids: + return {'success': True, 'message': '没有要删除的记录', 'deleted_count': 0} + + if not ensure_database(): + logger.error("数据库初始化失败") + return {'success': False, 'error': '数据库初始化失败', 'deleted_count': 0} + + conn = get_db_connection() + cursor = conn.cursor() + + try: + # 构建IN子句的占位符 + placeholders = ','.join(['?' for _ in history_ids]) + sql = f'DELETE FROM redis_query_history WHERE id IN ({placeholders})' + + cursor.execute(sql, history_ids) + conn.commit() + deleted_count = cursor.rowcount + + if deleted_count > 0: + logger.info(f"成功批量删除 {deleted_count} 条Redis查询历史记录: {history_ids}") + return { + 'success': True, + 'message': f'成功删除 {deleted_count} 条记录', + 'deleted_count': deleted_count + } + else: + return { + 'success': False, + 'error': '没有找到要删除的记录', + 'deleted_count': 0 + } + except Exception as e: + logger.error(f"批量删除Redis查询历史记录失败: {e}") + return { + 'success': False, + 'error': f'删除失败: {str(e)}', + 'deleted_count': 0 + } + finally: + conn.close() + def parse_redis_config_from_yaml(yaml_text): """从YAML格式文本解析Redis配置""" try: @@ -698,5 +742,49 @@ def delete_query_history(history_id): except Exception as e: logger.error(f"删除查询历史记录失败: {e}") return False + finally: + conn.close() + +def batch_delete_query_history(history_ids): + """批量删除Cassandra查询历史记录""" + if not history_ids: + return {'success': True, 'message': '没有要删除的记录', 'deleted_count': 0} + + if not ensure_database(): + logger.error("数据库初始化失败") + return {'success': False, 'error': '数据库初始化失败', 'deleted_count': 0} + + conn = get_db_connection() + cursor = conn.cursor() + + try: + # 构建IN子句的占位符 + placeholders = ','.join(['?' for _ in history_ids]) + sql = f'DELETE FROM query_history WHERE id IN ({placeholders})' + + cursor.execute(sql, history_ids) + conn.commit() + deleted_count = cursor.rowcount + + if deleted_count > 0: + logger.info(f"成功批量删除 {deleted_count} 条Cassandra查询历史记录: {history_ids}") + return { + 'success': True, + 'message': f'成功删除 {deleted_count} 条记录', + 'deleted_count': deleted_count + } + else: + return { + 'success': False, + 'error': '没有找到要删除的记录', + 'deleted_count': 0 + } + except Exception as e: + logger.error(f"批量删除Cassandra查询历史记录失败: {e}") + return { + 'success': False, + 'error': f'删除失败: {str(e)}', + 'deleted_count': 0 + } finally: conn.close() \ No newline at end of file diff --git a/modules/redis_query.py b/modules/redis_query.py index 3246950..8494143 100644 --- a/modules/redis_query.py +++ b/modules/redis_query.py @@ -66,6 +66,93 @@ except ImportError: query_log_collector = DummyQueryLogCollector() +def _get_redis_command_by_type(redis_type): + """根据Redis数据类型返回对应的查询命令""" + command_map = { + 'string': 'GET', + 'hash': 'HGETALL', + 'list': 'LRANGE', + 'set': 'SMEMBERS', + 'zset': 'ZRANGE', + 'stream': 'XRANGE' + } + return command_map.get(redis_type, 'TYPE') + +def _get_data_summary(key_info): + """获取数据内容的概要信息""" + if not key_info['exists']: + return "不存在" + + key_type = key_info['type'] + value = key_info['value'] + + try: + if key_type == 'string': + if isinstance(value, str): + if len(value) > 50: + return f"字符串({len(value)}字符): {value[:47]}..." + else: + return f"字符串: {value}" + else: + return f"字符串: {str(value)[:50]}..." + + elif key_type == 'hash': + if isinstance(value, dict): + field_count = len(value) + sample_fields = list(value.keys())[:3] + fields_str = ", ".join(sample_fields) + if field_count > 3: + fields_str += "..." + return f"哈希({field_count}个字段): {fields_str}" + else: + return f"哈希: {str(value)[:50]}..." + + elif key_type == 'list': + if isinstance(value, list): + list_len = len(value) + if list_len > 0: + first_item = str(value[0])[:20] if value[0] else "空" + return f"列表({list_len}个元素): [{first_item}...]" + else: + return "列表(空)" + else: + return f"列表: {str(value)[:50]}..." + + elif key_type == 'set': + if isinstance(value, (set, list)): + set_len = len(value) + if set_len > 0: + first_item = str(list(value)[0])[:20] if value else "空" + return f"集合({set_len}个元素): {{{first_item}...}}" + else: + return "集合(空)" + else: + return f"集合: {str(value)[:50]}..." + + elif key_type == 'zset': + if isinstance(value, list): + zset_len = len(value) + if zset_len > 0: + first_item = f"{value[0][0]}:{value[0][1]}" if value[0] else "空" + return f"有序集合({zset_len}个元素): {{{first_item}...}}" + else: + return "有序集合(空)" + else: + return f"有序集合: {str(value)[:50]}..." + + elif key_type == 'stream': + if isinstance(value, list): + stream_len = len(value) + return f"流({stream_len}条消息)" + else: + return f"流: {str(value)[:50]}..." + + else: + return f"未知类型: {str(value)[:50]}..." + + except Exception as e: + return f"解析错误: {str(e)[:30]}..." + def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance_tracker=None): """ 从Redis集群中获取随机keys @@ -83,31 +170,52 @@ def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance keys = set() logger.info(f"开始扫描获取随机keys,目标数量: {count},模式: {pattern}") - query_log_collector.add_log('INFO', f"开始扫描获取随机keys,目标数量: {count},模式: {pattern}") - + query_log_collector.add_log('INFO', f"🔍 开始扫描Key,目标数量: {count},匹配模式: '{pattern}'") + try: # 使用scan_iter获取keys scan_count = max(count * 2, 1000) # 扫描更多key以确保随机性 - + query_log_collector.add_log('INFO', f"📡 执行SCAN命令,扫描批次大小: {scan_count}") + + scan_iterations = 0 for key in redis_client.scan_iter(match=pattern, count=scan_count): keys.add(key) + scan_iterations += 1 + + # 每扫描1000个key记录一次进度 + if scan_iterations % 1000 == 0: + query_log_collector.add_log('INFO', f"📊 扫描进度: 已发现 {len(keys)} 个匹配的Key") + if len(keys) >= count * 3: # 获取更多key以便随机选择 break - + + total_found = len(keys) + query_log_collector.add_log('INFO', f"🎯 扫描完成,共发现 {total_found} 个匹配的Key") + # 如果获取的key数量超过需要的数量,随机选择 if len(keys) > count: keys = random.sample(list(keys), count) + query_log_collector.add_log('INFO', f"🎲 从 {total_found} 个Key中随机选择 {count} 个") else: keys = list(keys) - + if total_found < count: + query_log_collector.add_log('WARNING', f"⚠️ 实际找到的Key数量({total_found})少于目标数量({count})") + + # 记录选中的Key样本(前10个) + key_sample = keys[:10] if len(keys) > 10 else keys + key_list_str = ", ".join([f"'{k}'" for k in key_sample]) + if len(keys) > 10: + key_list_str += f" ... (共{len(keys)}个)" + query_log_collector.add_log('INFO', f"📋 选中的Key样本: [{key_list_str}]") + end_time = time.time() scan_duration = end_time - start_time - + if performance_tracker: performance_tracker.record_scan_time(scan_duration) - + logger.info(f"扫描获取 {len(keys)} 个随机keys,耗时 {scan_duration:.3f} 秒") - query_log_collector.add_log('INFO', f"扫描获取 {len(keys)} 个随机keys,耗时 {scan_duration:.3f} 秒") + query_log_collector.add_log('INFO', f"✅ Key扫描完成,最终获取 {len(keys)} 个keys,总耗时 {scan_duration:.3f} 秒") return keys except RedisError as e: @@ -124,36 +232,64 @@ def get_random_keys_from_redis(redis_client, count=100, pattern="*", performance def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis集群", performance_tracker=None): """ 批量查询Redis中指定keys的值,支持所有Redis数据类型(String、Hash、List、Set、ZSet等) - + Args: redis_client: Redis客户端 keys: 要查询的key列表 cluster_name: 集群名称用于日志 performance_tracker: 性能追踪器 - + Returns: list: 对应keys的值信息字典列表,包含类型、值和显示格式 """ from .redis_types import get_redis_value_with_type - + start_time = time.time() result = [] - + logger.info(f"开始从{cluster_name}批量查询 {len(keys)} 个keys(支持所有数据类型)") - query_log_collector.add_log('INFO', f"开始从{cluster_name}批量查询 {len(keys)} 个keys(支持所有数据类型)") - + query_log_collector.add_log('INFO', f"📊 开始从{cluster_name}批量查询 {len(keys)} 个keys(支持所有数据类型)") + + # 记录要查询的Key列表(前10个,避免日志过长) + key_sample = keys[:10] if len(keys) > 10 else keys + key_list_str = ", ".join([f"'{k}'" for k in key_sample]) + if len(keys) > 10: + key_list_str += f" ... (共{len(keys)}个)" + query_log_collector.add_log('INFO', f"🔍 查询Key列表: [{key_list_str}]") + try: # 逐个查询每个key,支持所有Redis数据类型 - for key in keys: + redis_commands_used = {} # 记录使用的Redis命令 + + for i, key in enumerate(keys): + key_start_time = time.time() key_info = get_redis_value_with_type(redis_client, key) + key_duration = time.time() - key_start_time + result.append(key_info) - + + # 记录每个key的查询详情 + if key_info['exists']: + key_type = key_info['type'] + # 根据类型确定使用的Redis命令 + redis_cmd = _get_redis_command_by_type(key_type) + redis_commands_used[redis_cmd] = redis_commands_used.get(redis_cmd, 0) + 1 + + # 获取数据内容概要 + data_summary = _get_data_summary(key_info) + + query_log_collector.add_log('INFO', + f"✅ Key '{key}' | 类型: {key_type} | 命令: {redis_cmd} | 数据: {data_summary} | 耗时: {key_duration:.3f}s") + else: + query_log_collector.add_log('WARNING', + f"❌ Key '{key}' | 状态: 不存在 | 耗时: {key_duration:.3f}s") + end_time = time.time() query_duration = end_time - start_time - + if performance_tracker: performance_tracker.record_query(f"{cluster_name}_typed_batch_query", query_duration) - + # 统计成功获取的key数量和类型分布 successful_count = sum(1 for r in result if r['exists']) type_stats = {} @@ -161,11 +297,14 @@ def get_redis_values_by_keys(redis_client, keys, cluster_name="Redis集群", per if r['exists']: key_type = r['type'] type_stats[key_type] = type_stats.get(key_type, 0) + 1 - + + # 记录Redis命令使用统计 + cmd_stats = ", ".join([f"{cmd}: {count}" for cmd, count in redis_commands_used.items()]) if redis_commands_used else "无" type_info = ", ".join([f"{t}: {c}" for t, c in type_stats.items()]) if type_stats else "无" - logger.info(f"从{cluster_name}查询完成,成功获取 {successful_count}/{len(keys)} 个值,数据类型分布: [{type_info}],耗时 {query_duration:.3f} 秒") - query_log_collector.add_log('INFO', f"从{cluster_name}查询完成,成功获取 {successful_count}/{len(keys)} 个值,数据类型分布: [{type_info}],耗时 {query_duration:.3f} 秒") - + + query_log_collector.add_log('INFO', f"🎯 Redis命令统计: [{cmd_stats}]") + query_log_collector.add_log('INFO', f"📈 从{cluster_name}查询完成,成功获取 {successful_count}/{len(keys)} 个值,数据类型分布: [{type_info}],总耗时 {query_duration:.3f} 秒") + return result except Exception as e: @@ -200,19 +339,27 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu comparison_start_time = time.time() logger.info(f"开始比较 {cluster1_name} 和 {cluster2_name} 的数据(支持所有Redis数据类型)") - query_log_collector.add_log('INFO', f"开始比较 {cluster1_name} 和 {cluster2_name} 的数据(支持所有Redis数据类型)") - + query_log_collector.add_log('INFO', f"🔄 开始比较 {cluster1_name} 和 {cluster2_name} 的数据(支持所有Redis数据类型)") + query_log_collector.add_log('INFO', f"📊 比较范围: {len(keys)} 个Key") + # 获取两个集群的数据 + query_log_collector.add_log('INFO', f"📥 第一步: 从{cluster1_name}获取数据") values1 = get_redis_values_by_keys(client1, keys, cluster1_name, performance_tracker) if not values1: - return {'error': f'从{cluster1_name}获取数据失败'} - + error_msg = f'从{cluster1_name}获取数据失败' + query_log_collector.add_log('ERROR', f"❌ {error_msg}") + return {'error': error_msg} + + query_log_collector.add_log('INFO', f"📥 第二步: 从{cluster2_name}获取数据") values2 = get_redis_values_by_keys(client2, keys, cluster2_name, performance_tracker) if not values2: - return {'error': f'从{cluster2_name}获取数据失败'} - + error_msg = f'从{cluster2_name}获取数据失败' + query_log_collector.add_log('ERROR', f"❌ {error_msg}") + return {'error': error_msg} + # 开始数据比对 compare_start = time.time() + query_log_collector.add_log('INFO', f"🔍 第三步: 开始逐个比较Key的数据内容") # 初始化统计数据 stats = { @@ -230,14 +377,27 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu missing_results = [] # 逐个比较 + comparison_details = [] # 记录比较详情 + for i, key in enumerate(keys): key_str = key.decode('utf-8') if isinstance(key, bytes) else key value1_info = values1[i] value2_info = values2[i] - + # 使用redis_types模块的比较函数 comparison_result = compare_redis_values(value1_info, value2_info) - + + # 记录比较详情 + comparison_detail = { + 'key': key_str, + 'cluster1_exists': value1_info['exists'], + 'cluster2_exists': value2_info['exists'], + 'cluster1_type': value1_info.get('type'), + 'cluster2_type': value2_info.get('type'), + 'status': comparison_result['status'] + } + comparison_details.append(comparison_detail) + if comparison_result['status'] == 'both_missing': stats['both_missing'] += 1 missing_results.append({ @@ -245,6 +405,8 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu 'status': 'both_missing', 'message': comparison_result['message'] }) + query_log_collector.add_log('WARNING', f"⚠️ Key '{key_str}': 两个集群都不存在") + elif comparison_result['status'] == 'missing_in_cluster1': stats['missing_in_cluster1'] += 1 missing_results.append({ @@ -255,6 +417,8 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu 'cluster2_type': value2_info['type'], 'message': comparison_result['message'] }) + query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': 仅在{cluster2_name}存在 (类型: {value2_info['type']})") + elif comparison_result['status'] == 'missing_in_cluster2': stats['missing_in_cluster2'] += 1 missing_results.append({ @@ -265,6 +429,7 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu 'cluster2_value': None, 'message': comparison_result['message'] }) + query_log_collector.add_log('WARNING', f"❌ Key '{key_str}': 仅在{cluster1_name}存在 (类型: {value1_info['type']})") elif comparison_result['status'] == 'identical': stats['identical_count'] += 1 identical_results.append({ @@ -272,6 +437,8 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu 'value': value1_info['display_value'], 'type': value1_info['type'] }) + query_log_collector.add_log('INFO', f"✅ Key '{key_str}': 数据一致 (类型: {value1_info['type']})") + else: # different stats['different_count'] += 1 different_results.append({ @@ -282,6 +449,14 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu 'cluster2_type': value2_info['type'], 'message': comparison_result['message'] }) + # 记录差异详情 + type_info = f"{value1_info['type']} vs {value2_info['type']}" if value1_info['type'] != value2_info['type'] else value1_info['type'] + query_log_collector.add_log('WARNING', f"🔄 Key '{key_str}': 数据不一致 (类型: {type_info}) - {comparison_result['message']}") + + # 每处理100个key记录一次进度 + if (i + 1) % 100 == 0: + progress = f"{i + 1}/{len(keys)}" + query_log_collector.add_log('INFO', f"📊 比较进度: {progress} ({((i + 1) / len(keys) * 100):.1f}%)") compare_end = time.time() comparison_duration = compare_end - compare_start @@ -317,10 +492,68 @@ def compare_redis_data(client1, client2, keys, cluster1_name="生产集群", clu } } + # 记录详细的比较总结 + query_log_collector.add_log('INFO', f"🎯 数据比对完成,纯比较耗时 {comparison_duration:.3f} 秒,总耗时 {total_duration:.3f} 秒") + + # 记录统计信息 + query_log_collector.add_log('INFO', f"📊 比对统计总览:") + query_log_collector.add_log('INFO', f" • 总Key数量: {stats['total_keys']}") + query_log_collector.add_log('INFO', f" • ✅ 数据一致: {stats['identical_count']} ({stats['identical_percentage']}%)") + query_log_collector.add_log('INFO', f" • 🔄 数据不同: {stats['different_count']} ({stats['different_percentage']}%)") + query_log_collector.add_log('INFO', f" • ❌ 仅{cluster1_name}存在: {stats['missing_in_cluster2']}") + query_log_collector.add_log('INFO', f" • ❌ 仅{cluster2_name}存在: {stats['missing_in_cluster1']}") + query_log_collector.add_log('INFO', f" • ⚠️ 两集群都不存在: {stats['both_missing']}") + + # 记录性能信息 + if performance_tracker: + query_log_collector.add_log('INFO', f"⚡ 性能统计: 平均每Key比较耗时 {(comparison_duration / len(keys) * 1000):.2f}ms") + + # 记录所有Key的详细信息 + query_log_collector.add_log('INFO', f"📋 全部Key详细信息:") + + # 统计类型分布 + type_distribution = {} + for detail in comparison_details: + key_str = detail['key'] + cluster1_type = detail.get('cluster1_type', 'N/A') + cluster2_type = detail.get('cluster2_type', 'N/A') + status = detail.get('status', 'unknown') + + # 统计类型分布 + if cluster1_type != 'N/A': + type_distribution[cluster1_type] = type_distribution.get(cluster1_type, 0) + 1 + elif cluster2_type != 'N/A': + type_distribution[cluster2_type] = type_distribution.get(cluster2_type, 0) + 1 + + # 记录每个Key的详细信息 + if status == 'identical': + query_log_collector.add_log('INFO', f" ✅ {key_str} → 类型: {cluster1_type}, 状态: 数据一致") + elif status == 'different': + type_info = cluster1_type if cluster1_type == cluster2_type else f"{cluster1_name}:{cluster1_type} vs {cluster2_name}:{cluster2_type}" + query_log_collector.add_log('INFO', f" 🔄 {key_str} → 类型: {type_info}, 状态: 数据不同") + elif status == 'missing_in_cluster1': + query_log_collector.add_log('INFO', f" ❌ {key_str} → 类型: {cluster2_type}, 状态: 仅在{cluster2_name}存在") + elif status == 'missing_in_cluster2': + query_log_collector.add_log('INFO', f" ❌ {key_str} → 类型: {cluster1_type}, 状态: 仅在{cluster1_name}存在") + elif status == 'both_missing': + query_log_collector.add_log('INFO', f" ⚠️ {key_str} → 类型: N/A, 状态: 两集群都不存在") + + # 记录类型分布统计 + if type_distribution: + query_log_collector.add_log('INFO', f"📊 数据类型分布统计:") + for data_type, count in sorted(type_distribution.items()): + percentage = (count / len(keys)) * 100 + query_log_collector.add_log('INFO', f" • {data_type}: {count} 个 ({percentage:.1f}%)") + + # 记录Key列表摘要 + key_summary = [detail['key'] for detail in comparison_details[:10]] # 显示前10个key + key_list_str = ', '.join(key_summary) + if len(comparison_details) > 10: + key_list_str += f" ... (共{len(comparison_details)}个Key)" + query_log_collector.add_log('INFO', f"📝 Key列表摘要: [{key_list_str}]") + logger.info(f"数据比对完成,耗时 {comparison_duration:.3f} 秒") logger.info(f"比对统计: 总计{stats['total_keys']}个key,相同{stats['identical_count']}个,不同{stats['different_count']}个,缺失{stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']}个") - query_log_collector.add_log('INFO', f"数据比对完成,耗时 {comparison_duration:.3f} 秒") - query_log_collector.add_log('INFO', f"比对统计: 总计{stats['total_keys']}个key,相同{stats['identical_count']}个,不同{stats['different_count']}个,缺失{stats['missing_in_cluster1'] + stats['missing_in_cluster2'] + stats['both_missing']}个") return result @@ -345,7 +578,11 @@ def execute_redis_comparison(config1, config2, query_options): cluster2_name = config2.get('name', '测试集群') logger.info(f"开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}") - query_log_collector.add_log('INFO', f"开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}") + + # 开始新的查询批次,使用redis查询类型 + batch_id = query_log_collector.start_new_batch('redis') + query_log_collector.add_log('INFO', f"🚀 开始执行Redis数据比较: {cluster1_name} vs {cluster2_name}") + query_log_collector.add_log('INFO', f"📋 查询批次ID: {batch_id}") # 创建连接 client1 = create_redis_client(config1, cluster1_name, performance_tracker) @@ -404,14 +641,26 @@ def execute_redis_comparison(config1, config2, query_options): # 添加性能报告 comparison_result['performance_report'] = performance_tracker.generate_report() comparison_result['query_options'] = query_options - + comparison_result['batch_id'] = batch_id # 添加批次ID到结果中 + + # 记录最终结果 + if comparison_result.get('success'): + query_log_collector.add_log('INFO', f"🎉 Redis数据比较执行成功完成") + + # 结束当前批次 + query_log_collector.end_current_batch() + return comparison_result - + except Exception as e: logger.error(f"Redis数据比较执行失败: {e}") - query_log_collector.add_log('ERROR', f"Redis数据比较执行失败: {e}") - return {'error': f'执行失败: {str(e)}'} - + query_log_collector.add_log('ERROR', f"💥 Redis数据比较执行失败: {e}") + + # 结束当前批次 + query_log_collector.end_current_batch() + + return {'error': f'执行失败: {str(e)}', 'batch_id': batch_id} + finally: # 关闭连接 try: diff --git a/static/js/app.js b/static/js/app.js index d873b7d..bfa6eaa 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -2425,15 +2425,36 @@ async function showQueryHistoryDialog() { if (result.data.length === 0) { historyList = '

暂无查询历史记录

'; } else { + // 添加批量操作控制栏 + historyList += ` +
+
+ + +
+
+ +
+
+ `; + result.data.forEach(history => { const createdDate = new Date(history.created_at).toLocaleString(); const consistencyRate = history.total_keys > 0 ? Math.round((history.identical_count / history.total_keys) * 100) : 0; + const safeName = (history.name || '').replace(/'/g, "\\'"); historyList += `
+
+ +
${history.name} @@ -2467,7 +2488,7 @@ async function showQueryHistoryDialog() { -
@@ -2484,7 +2505,7 @@ async function showQueryHistoryDialog() {