diff --git a/app.py b/app.py index e453ffa..c51a411 100644 --- a/app.py +++ b/app.py @@ -188,7 +188,7 @@ def init_database(): ) ''') - # 创建查询历史表 + # 创建查询历史表,包含分表配置字段 cursor.execute(''' CREATE TABLE IF NOT EXISTS query_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -203,6 +203,8 @@ def init_database(): total_keys INTEGER NOT NULL, differences_count INTEGER NOT NULL, identical_count INTEGER NOT NULL, + sharding_config TEXT, + query_type TEXT DEFAULT 'single', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') @@ -262,6 +264,23 @@ def ensure_database(): conn.commit() logger.info("sharding_config字段添加成功") + # 检查query_history表是否有分表相关字段 + cursor.execute("PRAGMA table_info(query_history)") + history_columns = cursor.fetchall() + history_column_names = [column[1] for column in history_columns] + + if 'sharding_config' not in history_column_names: + logger.info("添加sharding_config字段到query_history表...") + cursor.execute("ALTER TABLE query_history ADD COLUMN sharding_config TEXT") + conn.commit() + logger.info("query_history表sharding_config字段添加成功") + + if 'query_type' not in history_column_names: + logger.info("添加query_type字段到query_history表...") + cursor.execute("ALTER TABLE query_history ADD COLUMN query_type TEXT DEFAULT 'single'") + conn.commit() + logger.info("query_history表query_type字段添加成功") + conn.close() return True except Exception as e: @@ -614,8 +633,9 @@ def delete_config_group(group_id): conn.close() def save_query_history(name, description, pro_config, test_config, query_config, query_keys, - results_summary, execution_time, total_keys, differences_count, identical_count): - """保存查询历史记录""" + results_summary, execution_time, total_keys, differences_count, identical_count, + sharding_config=None, query_type='single'): + """保存查询历史记录,支持分表查询""" if not ensure_database(): logger.error("数据库初始化失败") return False @@ -627,8 +647,9 @@ def save_query_history(name, description, pro_config, test_config, query_config, cursor.execute(''' INSERT INTO query_history (name, description, pro_config, test_config, query_config, query_keys, - results_summary, execution_time, total_keys, differences_count, identical_count) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + results_summary, execution_time, total_keys, differences_count, identical_count, + sharding_config, query_type) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( name, description, json.dumps(pro_config), @@ -639,10 +660,12 @@ def save_query_history(name, description, pro_config, test_config, query_config, execution_time, total_keys, differences_count, - identical_count + identical_count, + json.dumps(sharding_config) if sharding_config else None, + query_type )) conn.commit() - logger.info(f"查询历史记录 '{name}' 保存成功") + logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}") return True except Exception as e: logger.error(f"保存查询历史记录失败: {e}") @@ -662,7 +685,7 @@ def get_query_history(): try: cursor.execute(''' SELECT id, name, description, execution_time, total_keys, - differences_count, identical_count, created_at + differences_count, identical_count, created_at, query_type FROM query_history ORDER BY created_at DESC ''') @@ -678,7 +701,8 @@ def get_query_history(): 'total_keys': row['total_keys'], 'differences_count': row['differences_count'], 'identical_count': row['identical_count'], - 'created_at': row['created_at'] + 'created_at': row['created_at'], + 'query_type': row.get('query_type', 'single') }) return history_list @@ -717,7 +741,10 @@ def get_query_history_by_id(history_id): 'total_keys': row['total_keys'], 'differences_count': row['differences_count'], 'identical_count': row['identical_count'], - 'created_at': row['created_at'] + 'created_at': row['created_at'], + # 处理新字段,保持向后兼容 + 'sharding_config': json.loads(row['sharding_config']) if row.get('sharding_config') else None, + 'query_type': row.get('query_type', 'single') } return None except Exception as e: @@ -749,7 +776,7 @@ def delete_query_history(history_id): conn.close() def create_connection(config): - """创建Cassandra连接,带有增强的错误诊断""" + """创建Cassandra连接,带有增强的错误诊断和容错机制""" start_time = time.time() logger.info(f"=== 开始创建Cassandra连接 ===") @@ -763,13 +790,49 @@ def create_connection(config): auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password']) logger.info("正在创建集群连接...") - cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider) + # 设置连接池配置,提高容错性 + from cassandra.policies import DCAwareRoundRobinPolicy + + # 设置负载均衡策略,避免单点故障 + load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1')) + + # 创建连接配置,增加容错参数 + cluster = Cluster( + config['hosts'], + port=config['port'], + auth_provider=auth_provider, + load_balancing_policy=load_balancing_policy, + # 增加容错配置 + protocol_version=4, # 使用稳定的协议版本 + connect_timeout=15, # 连接超时 + control_connection_timeout=15, # 控制连接超时 + max_schema_agreement_wait=30 # schema同步等待时间 + ) logger.info("正在连接到Keyspace...") session = cluster.connect(config['keyspace']) + # 设置session级别的容错参数 + session.default_timeout = 30 # 查询超时时间 + connection_time = time.time() - start_time - logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}") + logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒") + + # 记录集群状态 + try: + cluster_name = cluster.metadata.cluster_name or "Unknown" + logger.info(f" 集群名称: {cluster_name}") + + # 记录可用主机状态 + live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up] + down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up] + + logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)") + if down_hosts: + logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)") + + except Exception as meta_error: + logger.warning(f"无法获取集群元数据: {meta_error}") return cluster, session @@ -1523,7 +1586,7 @@ def api_get_query_history(): @app.route('/api/query-history', methods=['POST']) def api_save_query_history(): - """保存查询历史记录""" + """保存查询历史记录,支持分表查询""" try: data = request.json name = data.get('name', '').strip() @@ -1537,6 +1600,9 @@ def api_save_query_history(): total_keys = data.get('total_keys', 0) differences_count = data.get('differences_count', 0) identical_count = data.get('identical_count', 0) + # 新增分表相关字段 + sharding_config = data.get('sharding_config') + query_type = data.get('query_type', 'single') if not name: return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400 @@ -1544,11 +1610,12 @@ def api_save_query_history(): success = save_query_history( name, description, pro_config, test_config, query_config, query_keys, results_summary, execution_time, total_keys, - differences_count, identical_count + differences_count, identical_count, sharding_config, query_type ) if success: - return jsonify({'success': True, 'message': '查询历史记录保存成功'}) + query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询' + return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'}) else: return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500 diff --git a/static/js/app.js b/static/js/app.js index 3d10ba7..b837edf 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -2417,7 +2417,12 @@ async function showQueryHistoryDialog() {
${history.description || '无描述'}