完善查询
This commit is contained in:
99
app.py
99
app.py
@@ -188,7 +188,7 @@ def init_database():
|
||||
)
|
||||
''')
|
||||
|
||||
# 创建查询历史表
|
||||
# 创建查询历史表,包含分表配置字段
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS query_history (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@@ -203,6 +203,8 @@ def init_database():
|
||||
total_keys INTEGER NOT NULL,
|
||||
differences_count INTEGER NOT NULL,
|
||||
identical_count INTEGER NOT NULL,
|
||||
sharding_config TEXT,
|
||||
query_type TEXT DEFAULT 'single',
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
''')
|
||||
@@ -262,6 +264,23 @@ def ensure_database():
|
||||
conn.commit()
|
||||
logger.info("sharding_config字段添加成功")
|
||||
|
||||
# 检查query_history表是否有分表相关字段
|
||||
cursor.execute("PRAGMA table_info(query_history)")
|
||||
history_columns = cursor.fetchall()
|
||||
history_column_names = [column[1] for column in history_columns]
|
||||
|
||||
if 'sharding_config' not in history_column_names:
|
||||
logger.info("添加sharding_config字段到query_history表...")
|
||||
cursor.execute("ALTER TABLE query_history ADD COLUMN sharding_config TEXT")
|
||||
conn.commit()
|
||||
logger.info("query_history表sharding_config字段添加成功")
|
||||
|
||||
if 'query_type' not in history_column_names:
|
||||
logger.info("添加query_type字段到query_history表...")
|
||||
cursor.execute("ALTER TABLE query_history ADD COLUMN query_type TEXT DEFAULT 'single'")
|
||||
conn.commit()
|
||||
logger.info("query_history表query_type字段添加成功")
|
||||
|
||||
conn.close()
|
||||
return True
|
||||
except Exception as e:
|
||||
@@ -614,8 +633,9 @@ def delete_config_group(group_id):
|
||||
conn.close()
|
||||
|
||||
def save_query_history(name, description, pro_config, test_config, query_config, query_keys,
|
||||
results_summary, execution_time, total_keys, differences_count, identical_count):
|
||||
"""保存查询历史记录"""
|
||||
results_summary, execution_time, total_keys, differences_count, identical_count,
|
||||
sharding_config=None, query_type='single'):
|
||||
"""保存查询历史记录,支持分表查询"""
|
||||
if not ensure_database():
|
||||
logger.error("数据库初始化失败")
|
||||
return False
|
||||
@@ -627,8 +647,9 @@ def save_query_history(name, description, pro_config, test_config, query_config,
|
||||
cursor.execute('''
|
||||
INSERT INTO query_history
|
||||
(name, description, pro_config, test_config, query_config, query_keys,
|
||||
results_summary, execution_time, total_keys, differences_count, identical_count)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
results_summary, execution_time, total_keys, differences_count, identical_count,
|
||||
sharding_config, query_type)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
''', (
|
||||
name, description,
|
||||
json.dumps(pro_config),
|
||||
@@ -639,10 +660,12 @@ def save_query_history(name, description, pro_config, test_config, query_config,
|
||||
execution_time,
|
||||
total_keys,
|
||||
differences_count,
|
||||
identical_count
|
||||
identical_count,
|
||||
json.dumps(sharding_config) if sharding_config else None,
|
||||
query_type
|
||||
))
|
||||
conn.commit()
|
||||
logger.info(f"查询历史记录 '{name}' 保存成功")
|
||||
logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"保存查询历史记录失败: {e}")
|
||||
@@ -662,7 +685,7 @@ def get_query_history():
|
||||
try:
|
||||
cursor.execute('''
|
||||
SELECT id, name, description, execution_time, total_keys,
|
||||
differences_count, identical_count, created_at
|
||||
differences_count, identical_count, created_at, query_type
|
||||
FROM query_history
|
||||
ORDER BY created_at DESC
|
||||
''')
|
||||
@@ -678,7 +701,8 @@ def get_query_history():
|
||||
'total_keys': row['total_keys'],
|
||||
'differences_count': row['differences_count'],
|
||||
'identical_count': row['identical_count'],
|
||||
'created_at': row['created_at']
|
||||
'created_at': row['created_at'],
|
||||
'query_type': row.get('query_type', 'single')
|
||||
})
|
||||
|
||||
return history_list
|
||||
@@ -717,7 +741,10 @@ def get_query_history_by_id(history_id):
|
||||
'total_keys': row['total_keys'],
|
||||
'differences_count': row['differences_count'],
|
||||
'identical_count': row['identical_count'],
|
||||
'created_at': row['created_at']
|
||||
'created_at': row['created_at'],
|
||||
# 处理新字段,保持向后兼容
|
||||
'sharding_config': json.loads(row['sharding_config']) if row.get('sharding_config') else None,
|
||||
'query_type': row.get('query_type', 'single')
|
||||
}
|
||||
return None
|
||||
except Exception as e:
|
||||
@@ -749,7 +776,7 @@ def delete_query_history(history_id):
|
||||
conn.close()
|
||||
|
||||
def create_connection(config):
|
||||
"""创建Cassandra连接,带有增强的错误诊断"""
|
||||
"""创建Cassandra连接,带有增强的错误诊断和容错机制"""
|
||||
start_time = time.time()
|
||||
|
||||
logger.info(f"=== 开始创建Cassandra连接 ===")
|
||||
@@ -763,13 +790,49 @@ def create_connection(config):
|
||||
auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
|
||||
|
||||
logger.info("正在创建集群连接...")
|
||||
cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider)
|
||||
# 设置连接池配置,提高容错性
|
||||
from cassandra.policies import DCAwareRoundRobinPolicy
|
||||
|
||||
# 设置负载均衡策略,避免单点故障
|
||||
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=config.get('datacenter', 'dc1'))
|
||||
|
||||
# 创建连接配置,增加容错参数
|
||||
cluster = Cluster(
|
||||
config['hosts'],
|
||||
port=config['port'],
|
||||
auth_provider=auth_provider,
|
||||
load_balancing_policy=load_balancing_policy,
|
||||
# 增加容错配置
|
||||
protocol_version=4, # 使用稳定的协议版本
|
||||
connect_timeout=15, # 连接超时
|
||||
control_connection_timeout=15, # 控制连接超时
|
||||
max_schema_agreement_wait=30 # schema同步等待时间
|
||||
)
|
||||
|
||||
logger.info("正在连接到Keyspace...")
|
||||
session = cluster.connect(config['keyspace'])
|
||||
|
||||
# 设置session级别的容错参数
|
||||
session.default_timeout = 30 # 查询超时时间
|
||||
|
||||
connection_time = time.time() - start_time
|
||||
logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒, 集群={cluster.metadata.cluster_name}")
|
||||
logger.info(f"✅ Cassandra连接成功: 连接时间={connection_time:.3f}秒")
|
||||
|
||||
# 记录集群状态
|
||||
try:
|
||||
cluster_name = cluster.metadata.cluster_name or "Unknown"
|
||||
logger.info(f" 集群名称: {cluster_name}")
|
||||
|
||||
# 记录可用主机状态
|
||||
live_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if host.is_up]
|
||||
down_hosts = [str(host.address) for host in cluster.metadata.all_hosts() if not host.is_up]
|
||||
|
||||
logger.info(f" 可用节点: {live_hosts} ({len(live_hosts)}个)")
|
||||
if down_hosts:
|
||||
logger.warning(f" 故障节点: {down_hosts} ({len(down_hosts)}个)")
|
||||
|
||||
except Exception as meta_error:
|
||||
logger.warning(f"无法获取集群元数据: {meta_error}")
|
||||
|
||||
return cluster, session
|
||||
|
||||
@@ -1523,7 +1586,7 @@ def api_get_query_history():
|
||||
|
||||
@app.route('/api/query-history', methods=['POST'])
|
||||
def api_save_query_history():
|
||||
"""保存查询历史记录"""
|
||||
"""保存查询历史记录,支持分表查询"""
|
||||
try:
|
||||
data = request.json
|
||||
name = data.get('name', '').strip()
|
||||
@@ -1537,6 +1600,9 @@ def api_save_query_history():
|
||||
total_keys = data.get('total_keys', 0)
|
||||
differences_count = data.get('differences_count', 0)
|
||||
identical_count = data.get('identical_count', 0)
|
||||
# 新增分表相关字段
|
||||
sharding_config = data.get('sharding_config')
|
||||
query_type = data.get('query_type', 'single')
|
||||
|
||||
if not name:
|
||||
return jsonify({'success': False, 'error': '历史记录名称不能为空'}), 400
|
||||
@@ -1544,11 +1610,12 @@ def api_save_query_history():
|
||||
success = save_query_history(
|
||||
name, description, pro_config, test_config, query_config,
|
||||
query_keys, results_summary, execution_time, total_keys,
|
||||
differences_count, identical_count
|
||||
differences_count, identical_count, sharding_config, query_type
|
||||
)
|
||||
|
||||
if success:
|
||||
return jsonify({'success': True, 'message': '查询历史记录保存成功'})
|
||||
query_type_desc = '分表查询' if query_type == 'sharding' else '单表查询'
|
||||
return jsonify({'success': True, 'message': f'{query_type_desc}历史记录保存成功'})
|
||||
else:
|
||||
return jsonify({'success': False, 'error': '查询历史记录保存失败'}), 500
|
||||
|
||||
|
Reference in New Issue
Block a user