From eb48cf17e670742eeb28e8573c27f04513062b31 Mon Sep 17 00:00:00 2001 From: YoVinchen Date: Sat, 2 Aug 2025 22:21:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 99 ++++++++++++++++++++++++++++++++++++++++-------- static/js/app.js | 43 +++++++++++++++++++-- 2 files changed, 123 insertions(+), 19 deletions(-) diff --git a/app.py b/app.py index e453ffa..c51a411 100644 --- a/app.py +++ b/app.py @@ -188,7 +188,7 @@ def init_database(): ) ''') - # 创建查询历史表 + # 创建查询历史表,包含分表配置字段 cursor.execute(''' CREATE TABLE IF NOT EXISTS query_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -203,6 +203,8 @@ def init_database(): total_keys INTEGER NOT NULL, differences_count INTEGER NOT NULL, identical_count INTEGER NOT NULL, + sharding_config TEXT, + query_type TEXT DEFAULT 'single', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') @@ -262,6 +264,23 @@ def ensure_database(): conn.commit() logger.info("sharding_config字段添加成功") + # 检查query_history表是否有分表相关字段 + cursor.execute("PRAGMA table_info(query_history)") + history_columns = cursor.fetchall() + history_column_names = [column[1] for column in history_columns] + + if 'sharding_config' not in history_column_names: + logger.info("添加sharding_config字段到query_history表...") + cursor.execute("ALTER TABLE query_history ADD COLUMN sharding_config TEXT") + conn.commit() + logger.info("query_history表sharding_config字段添加成功") + + if 'query_type' not in history_column_names: + logger.info("添加query_type字段到query_history表...") + cursor.execute("ALTER TABLE query_history ADD COLUMN query_type TEXT DEFAULT 'single'") + conn.commit() + logger.info("query_history表query_type字段添加成功") + conn.close() return True except Exception as e: @@ -614,8 +633,9 @@ def delete_config_group(group_id): conn.close() def save_query_history(name, description, pro_config, test_config, query_config, query_keys, - results_summary, execution_time, total_keys, differences_count, identical_count): - """保存查询历史记录""" + results_summary, execution_time, total_keys, differences_count, identical_count, + sharding_config=None, query_type='single'): + """保存查询历史记录,支持分表查询""" if not ensure_database(): logger.error("数据库初始化失败") return False @@ -627,8 +647,9 @@ def save_query_history(name, description, pro_config, test_config, query_config, cursor.execute(''' INSERT INTO query_history (name, description, pro_config, test_config, query_config, query_keys, - results_summary, execution_time, total_keys, differences_count, identical_count) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + results_summary, execution_time, total_keys, differences_count, identical_count, + sharding_config, query_type) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, description, json.dumps(pro_config), @@ -639,10 +660,12 @@ def save_query_history(name, description, pro_config, test_config, query_config, execution_time, total_keys, differences_count, - identical_count + identical_count, + json.dumps(sharding_config) if sharding_config else None, + query_type )) conn.commit() - logger.info(f"查询历史记录 '{name}' 保存成功") + logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}") return True except Exception as e: logger.error(f"保存查询历史记录失败: {e}") @@ -662,7 +685,7 @@ def get_query_history(): try: cursor.execute(''' SELECT id, name, description, execution_time, total_keys, - differences_count, identical_count, created_at + differences_count, identical_count, created_at, query_type FROM query_history ORDER BY created_at DESC ''') @@ -678,7 +701,8 @@ def get_query_history(): 'total_keys': row['total_keys'], 'differences_count': row['differences_count'], 'identical_count': row['identical_count'], - 'created_at': row['created_at'] + 'created_at': row['created_at'], + 'query_type': row.get('query_type', 'single') }) return history_list @@ -717,7 +741,10 @@ def get_query_history_by_id(history_id): 'total_keys': row['total_keys'], 'differences_count': row['differences_count'], 'identical_count': row['identical_count'], - 'created_at': row['created_at'] + 'created_at': row['created_at'], + # 处理新字段,保持向后兼容 + 'sharding_config': json.loads(row['sharding_config']) if row.get('sharding_config') else None, + 'query_type': row.get('query_type', 'single') } return None except Exception as e: @@ -749,7 +776,7 @@ def delete_query_history(history_id): conn.close() def create_connection(config): - """创建Cassandra连接,带有增强的错误诊断""" + """创建Cassandra连接,带有增强的错误诊断和容错机制""" start_time = time.time() logger.info(f"=== 开始创建Cassandra连接 ===") @@ -763,13 +790,49 @@ def create_connection(config): auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password']) logger.info("正在创建集群连接...") - cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider) + # 设置连接池配置,提高容错性 + from cassandra.policies import DCAwareRoundRobinPolicy + + # 设置负载均衡策略,避免单点故障 + load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1')) + + # 创建连接配置,增加容错参数 + cluster = Cluster( + config['hosts'], + port=config['port'], + auth_provider=auth_provider, + load_balancing_policy=load_balancing_policy, + # 增加容错配置 + protocol_version=4, # 使用稳定的协议版本 + connect_timeout=15, # 连接超时 + control_connection_timeout=15, # 控制连接超时 + max_schema_agreement_wait=30 # schema同步等待时间 + ) logger.info("正在连接到Keyspace...") session = cluster.connect(config['keyspace']) + # 设置session级别的容错参数 + session.default_timeout = 30 # 查询超时时间 + connection_time = time.time() - start_time - logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}") + logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒") + + # 记录集群状态 + try: + cluster_name = cluster.metadata.cluster_name or "Unknown" + logger.info(f" 集群名称: {cluster_name}") + + # 记录可用主机状态 + live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up] + down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up] + + logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)") + if down_hosts: + logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)") + + except Exception as meta_error: + logger.warning(f"无法获取集群元数据: {meta_error}") return cluster, session @@ -1523,7 +1586,7 @@ def api_get_query_history(): @app.route('/api/query-history', methods=['POST']) def api_save_query_history(): - """保存查询历史记录""" + """保存查询历史记录,支持分表查询""" try: data = request.json name = data.get('name', '').strip() @@ -1537,6 +1600,9 @@ def api_save_query_history(): total_keys = data.get('total_keys', 0) differences_count = data.get('differences_count', 0) identical_count = data.get('identical_count', 0) + # 新增分表相关字段 + sharding_config = data.get('sharding_config') + query_type = data.get('query_type', 'single') if not name: return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400 @@ -1544,11 +1610,12 @@ def api_save_query_history(): success = save_query_history( name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, - differences_count, identical_count + differences_count, identical_count, sharding_config, query_type ) if success: - return jsonify({'success': True, 'message': '查询历史记录保存成功'}) + query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询' + return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'}) else: return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500 diff --git a/static/js/app.js b/static/js/app.js index 3d10ba7..b837edf 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -2417,7 +2417,12 @@ async function showQueryHistoryDialog() {
-
${history.name}
+
+ ${history.name} + + ${history.query_type === 'sharding' ? '分表查询' : '单表查询'} + +

${history.description || '无描述'}

@@ -2531,11 +2536,31 @@ async function loadHistoryRecord(historyId) { // 填充查询Key值 document.getElementById('query_values').value = (history.query_keys || []).join('\n'); + // 处理分表配置(如果存在) + if (history.query_type === 'sharding' && history.sharding_config) { + // 启用分表模式 + document.getElementById('enableSharding').checked = true; + toggleShardingMode(); + + // 填充分表配置 + document.getElementById('use_sharding_for_pro').checked = history.sharding_config.use_sharding_for_pro || false; + document.getElementById('use_sharding_for_test').checked = history.sharding_config.use_sharding_for_test || false; + document.getElementById('pro_interval_seconds').value = history.sharding_config.pro_interval_seconds || 604800; + document.getElementById('pro_table_count').value = history.sharding_config.pro_table_count || 14; + document.getElementById('test_interval_seconds').value = history.sharding_config.test_interval_seconds || 604800; + document.getElementById('test_table_count').value = history.sharding_config.test_table_count || 14; + } else { + // 禁用分表模式 + document.getElementById('enableSharding').checked = false; + toggleShardingMode(); + } + // 关闭历史记录modal const modal = bootstrap.Modal.getInstance(document.getElementById('queryHistoryModal')); modal.hide(); - showAlert('success', `历史记录 "${history.name}" 加载成功`); + const queryTypeDesc = history.query_type === 'sharding' ? '分表查询' : '单表查询'; + showAlert('success', `${queryTypeDesc}历史记录 "${history.name}" 加载成功`); } else { showAlert('danger', result.error || '加载历史记录失败'); } @@ -2803,6 +2828,15 @@ async function saveHistoryRecord() { const config = getCurrentConfig(); + // 确定查询类型和获取相应配置 + let queryType = 'single'; + let shardingConfig = null; + + if (isShardingMode) { + queryType = 'sharding'; + shardingConfig = getShardingConfig().sharding_config; + } + try { const response = await fetch('/api/query-history', { method: 'POST', @@ -2820,7 +2854,10 @@ async function saveHistoryRecord() { execution_time: 0.0, total_keys: currentResults.total_keys, differences_count: currentResults.differences.length, - identical_count: currentResults.identical_results.length + identical_count: currentResults.identical_results.length, + // 新增分表相关字段 + sharding_config: shardingConfig, + query_type: queryType }) });