"""
Configuration management module
===============================

Handles configuration and query-history management for the BigDataTool
project, providing full CRUD operations.

Core features:
1. Cassandra config groups: save / load / delete database connection configs
2. Redis config groups: full lifecycle management of Redis cluster configs
3. Query history: persistent storage and retrieval of query records
4. Config parsing and validation: lightweight YAML-style config parsing

Supported configuration types:
- Cassandra configs: cluster hosts, credentials, keyspace, etc.
- Redis configs: cluster nodes, connection parameters, query options
- Query configs: primary-key fields, compared fields, excluded fields
- Sharding configs: TWCS sharding parameters, time interval, table count

Storage format:
- All configs are stored as JSON in a SQLite database
- Complex nested structures and arrays are supported
- Serialization/deserialization is handled automatically
- Data types are preserved round-trip

Design notes:
- Type safety: parameter validation and type checks
- Transaction safety: atomic database operations
- Error recovery: graceful degradation on database failures
- Backward compatibility: older config formats are upgraded automatically

Author: BigDataTool project team
Updated: August 2024
"""

import json
import logging
from datetime import datetime
from .database import ensure_database, get_db_connection

logger = logging.getLogger(__name__)

def convert_bytes_to_str(obj):
    """Recursively replace every ``bytes`` value with its hex string.

    Used to make arbitrary nested structures JSON-serializable before
    they are written to the history tables.

    Args:
        obj: Any value; dicts, lists and tuples are walked recursively.

    Returns:
        The same structure with each bytes object replaced by its
        hexadecimal string representation (``obj.hex()``).
    """
    if isinstance(obj, bytes):
        # Hex encoding keeps the payload compact and reversible.
        return obj.hex()
    if isinstance(obj, dict):
        return {k: convert_bytes_to_str(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        converted = [convert_bytes_to_str(element) for element in obj]
        return converted if isinstance(obj, list) else tuple(converted)
    # Scalars and everything else pass through unchanged.
    return obj
# Default Cassandra configuration template.
# NOTE: contains no sensitive information; used only to seed the UI form.
DEFAULT_CONFIG = {
    # Production-side connection settings.
    'pro_config': {
        'cluster_name': '',
        'hosts': [],       # contact-point addresses
        'port': 9042,      # default Cassandra CQL port
        'datacenter': '',
        'username': '',
        'password': '',
        'keyspace': '',
        'table': ''
    },
    # Test-side connection settings (same shape as pro_config).
    'test_config': {
        'cluster_name': '',
        'hosts': [],
        'port': 9042,
        'datacenter': '',
        'username': '',
        'password': '',
        'keyspace': '',
        'table': ''
    },
    'keys': [],               # primary-key field names (per module docstring)
    'fields_to_compare': [],  # explicit fields to compare
    'exclude_fields': []      # fields ignored during comparison
}
# Default Redis cluster configuration template.
# Supports single-node and cluster mode; the connection type is detected
# automatically (per original note — not visible in this module).
REDIS_DEFAULT_CONFIG = {
    'cluster1_config': {
        'name': '生产集群',
        'nodes': [
            {'host': '127.0.0.1', 'port': 7000}
        ],
        'password': '',
        'socket_timeout': 3,            # presumably seconds — confirm against client
        'socket_connect_timeout': 3,    # presumably seconds
        'max_connections_per_node': 16
    },
    'cluster2_config': {
        'name': '测试集群',
        'nodes': [
            {'host': '127.0.0.1', 'port': 7001}
        ],
        'password': '',
        'socket_timeout': 3,
        'socket_connect_timeout': 3,
        'max_connections_per_node': 16
    },
    # Default key-selection options for comparisons.
    'query_options': {
        'mode': 'random',              # key selection mode
        'count': 100,                  # number of keys to sample
        'pattern': '*',                # key pattern filter
        'source_cluster': 'cluster2',  # cluster that supplies the key sample
        'keys': []                     # explicit key list (used by non-random modes, presumably)
    }
}
def save_redis_config_group(name, description, cluster1_config, cluster2_config, query_options):
    """Persist a Redis config group (insert, or overwrite an existing row).

    Args:
        name: Group name (upsert key via INSERT OR REPLACE).
        description: Human-readable description.
        cluster1_config: First cluster settings; stored as JSON text.
        cluster2_config: Second cluster settings; stored as JSON text.
        query_options: Query options; stored as JSON text.

    Returns:
        True on success, False when the database is unavailable or the
        write fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False

    conn = get_db_connection()
    try:
        params = (
            name,
            description,
            json.dumps(cluster1_config),
            json.dumps(cluster2_config),
            json.dumps(query_options),
            datetime.now().isoformat(),
        )
        conn.cursor().execute('''
            INSERT OR REPLACE INTO redis_config_groups
            (name, description, cluster1_config, cluster2_config, query_options, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', params)
        conn.commit()
        logger.info(f"Redis配置组 '{name}' 保存成功")
        return True
    except Exception as e:
        logger.error(f"保存Redis配置组失败: {e}")
        return False
    finally:
        conn.close()
def get_redis_config_groups():
    """Return all Redis config groups, newest first.

    Only summary columns are returned (no config bodies).

    Returns:
        A list of dicts with id/name/description/created_at/updated_at;
        [] when the database is unavailable or the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, name, description, created_at, updated_at
            FROM redis_config_groups
            ORDER BY updated_at DESC
        ''')
        summary_fields = ('id', 'name', 'description', 'created_at', 'updated_at')
        return [{field: entry[field] for field in summary_fields}
                for entry in cursor.fetchall()]
    except Exception as e:
        logger.error(f"获取Redis配置组失败: {e}")
        return []
    finally:
        conn.close()
def get_redis_config_group_by_id(group_id):
    """Load one Redis config group by primary key with JSON columns decoded.

    Args:
        group_id: Primary key of the redis_config_groups row.

    Returns:
        The full config dict, or None when the id is unknown, the database
        is unavailable, or decoding fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, name, description, cluster1_config, cluster2_config, query_options,
                   created_at, updated_at
            FROM redis_config_groups WHERE id = ?
        ''', (group_id,))
        record = cursor.fetchone()
        if record is None:
            return None

        result = {
            'id': record['id'],
            'name': record['name'],
            'description': record['description'],
        }
        # These columns were written with json.dumps at save time.
        for json_col in ('cluster1_config', 'cluster2_config', 'query_options'):
            result[json_col] = json.loads(record[json_col])
        result['created_at'] = record['created_at']
        result['updated_at'] = record['updated_at']
        return result
    except Exception as e:
        logger.error(f"获取Redis配置组详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_redis_config_group(group_id):
    """Delete one Redis config group by id.

    Returns:
        True when a row was actually removed, False otherwise (including
        database errors).
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM redis_config_groups WHERE id = ?', (group_id,))
        conn.commit()
        # rowcount tells us whether the id existed.
        deleted = cur.rowcount > 0
        if deleted:
            logger.info(f"Redis配置组ID {group_id} 删除成功")
        return deleted
    except Exception as e:
        logger.error(f"删除Redis配置组失败: {e}")
        return False
    finally:
        conn.close()
def save_redis_query_history(name, description, cluster1_config, cluster2_config, query_options,
                             query_keys, results_summary, execution_time, total_keys,
                             different_count, identical_count, missing_count, raw_results=None):
    """Insert one Redis comparison run into redis_query_history.

    Dict/list arguments are serialized to JSON text; ``bytes`` values in
    ``raw_results`` are hex-encoded first so serialization cannot fail.
    A falsy ``raw_results`` (None or empty) is stored as NULL.

    Returns:
        The new row id, or None on failure.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None

    conn = get_db_connection()
    try:
        # Hex-encode any embedded bytes so json.dumps cannot raise.
        raw_results = convert_bytes_to_str(raw_results) if raw_results else None

        params = (
            name,
            description,
            json.dumps(cluster1_config),
            json.dumps(cluster2_config),
            json.dumps(query_options),
            json.dumps(query_keys),
            json.dumps(results_summary),
            execution_time,
            total_keys,
            different_count,
            identical_count,
            missing_count,
            json.dumps(raw_results) if raw_results else None,
        )
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO redis_query_history
            (name, description, cluster1_config, cluster2_config, query_options, query_keys,
             results_summary, execution_time, total_keys, different_count, identical_count,
             missing_count, raw_results)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', params)
        history_id = cursor.lastrowid
        conn.commit()
        logger.info(f"Redis查询历史记录 '{name}' 保存成功,ID:{history_id}")
        return history_id
    except Exception as e:
        logger.error(f"保存Redis查询历史记录失败: {e}")
        return None
    finally:
        conn.close()
def get_redis_query_history():
    """List all Redis query history entries, newest first.

    Returns:
        A list of summary dicts (counts and timestamps only, no config or
        result payloads); [] on database failure.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, name, description, execution_time, total_keys,
                   different_count, identical_count, missing_count, created_at
            FROM redis_query_history
            ORDER BY created_at DESC
        ''')
        summary_fields = ('id', 'name', 'description', 'execution_time', 'total_keys',
                          'different_count', 'identical_count', 'missing_count', 'created_at')
        return [{field: entry[field] for field in summary_fields}
                for entry in cursor.fetchall()]
    except Exception as e:
        logger.error(f"获取Redis查询历史记录失败: {e}")
        return []
    finally:
        conn.close()
def get_redis_query_history_by_id(history_id):
    """Return the full detail record for one Redis history row.

    Every JSON column is decoded; ``raw_results`` may be NULL and is then
    returned as None.

    Args:
        history_id: Primary key of the redis_query_history row.

    Returns:
        The detail dict, or None when the id is unknown or loading fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM redis_query_history WHERE id = ?
        ''', (history_id,))
        record = cursor.fetchone()
        if record is None:
            return None

        def decoded(col):
            # These columns were written with json.dumps at save time.
            return json.loads(record[col])

        return {
            'id': record['id'],
            'name': record['name'],
            'description': record['description'],
            'cluster1_config': decoded('cluster1_config'),
            'cluster2_config': decoded('cluster2_config'),
            'query_options': decoded('query_options'),
            'query_keys': decoded('query_keys'),
            'results_summary': decoded('results_summary'),
            'execution_time': record['execution_time'],
            'total_keys': record['total_keys'],
            'different_count': record['different_count'],
            'identical_count': record['identical_count'],
            'missing_count': record['missing_count'],
            'created_at': record['created_at'],
            'raw_results': decoded('raw_results') if record['raw_results'] else None
        }
    except Exception as e:
        logger.error(f"获取Redis查询历史记录详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_redis_query_history(history_id):
    """Delete one Redis query history record by id.

    Returns:
        True when a row was actually removed, False otherwise (including
        database errors).
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM redis_query_history WHERE id = ?', (history_id,))
        conn.commit()
        deleted = cur.rowcount > 0
        if deleted:
            logger.info(f"Redis查询历史记录ID {history_id} 删除成功")
        return deleted
    except Exception as e:
        logger.error(f"删除Redis查询历史记录失败: {e}")
        return False
    finally:
        conn.close()
def batch_delete_redis_query_history(history_ids):
    """Delete several Redis query history rows in one statement.

    Args:
        history_ids: Sequence of row ids; an empty sequence is a no-op.

    Returns:
        A result dict: {'success': bool, 'message' or 'error': str,
        'deleted_count': int}.
    """
    if not history_ids:
        return {'success': True, 'message': '没有要删除的记录', 'deleted_count': 0}

    if not ensure_database():
        logger.error("数据库初始化失败")
        return {'success': False, 'error': '数据库初始化失败', 'deleted_count': 0}

    conn = get_db_connection()
    try:
        # One '?' placeholder per id keeps the statement parameterized.
        placeholders = ','.join('?' * len(history_ids))
        cursor = conn.cursor()
        cursor.execute(
            f'DELETE FROM redis_query_history WHERE id IN ({placeholders})',
            history_ids,
        )
        conn.commit()
        deleted_count = cursor.rowcount

        if not deleted_count:
            return {
                'success': False,
                'error': '没有找到要删除的记录',
                'deleted_count': 0
            }

        logger.info(f"成功批量删除 {deleted_count} 条Redis查询历史记录: {history_ids}")
        return {
            'success': True,
            'message': f'成功删除 {deleted_count} 条记录',
            'deleted_count': deleted_count
        }
    except Exception as e:
        logger.error(f"批量删除Redis查询历史记录失败: {e}")
        return {
            'success': False,
            'error': f'删除失败: {str(e)}',
            'deleted_count': 0
        }
    finally:
        conn.close()
def parse_redis_config_from_yaml(yaml_text):
    """Parse a flat ``key: value`` text blob into a Redis cluster config.

    This is a lightweight parser for simple one-level "key: value" lines,
    not a full YAML implementation.  Recognized keys:

    - ``clusterName``     -> config ``name``
    - ``clusterPassword`` -> ``password``
    - ``clusterAddress``  -> ``host[:port]``; port defaults to 6379

    Fixes over the previous version:
    - Comment lines (starting with ``#``) and blank lines are skipped;
      previously a comment containing ``:`` was misparsed as a key.
    - A degenerate single-quote-character value no longer collapses to an
      empty string (quotes are only stripped when they form a pair).

    Args:
        yaml_text: The raw configuration text.

    Returns:
        A config dict with nodes/password and default timeouts, or None
        when parsing fails (e.g. a non-numeric port).
    """
    try:
        raw = {}
        for line in yaml_text.strip().split('\n'):
            line = line.strip()
            # Skip blanks and comments so they cannot pollute the key map.
            if not line or line.startswith('#'):
                continue
            if ':' not in line:
                continue
            key, value = line.split(':', 1)
            key = key.strip()
            value = value.strip()
            # Strip one layer of matching surrounding quotes.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]
            raw[key] = value

        # Assemble the Redis cluster config with library-default timeouts.
        redis_config = {
            'name': raw.get('clusterName', ''),
            'nodes': [],
            'password': raw.get('clusterPassword', ''),
            'socket_timeout': 3,
            'socket_connect_timeout': 3,
            'max_connections_per_node': 16
        }

        # clusterAddress may be "host:port" or a bare host (port 6379).
        cluster_address = raw.get('clusterAddress', '')
        if cluster_address:
            if ':' in cluster_address:
                host, port = cluster_address.split(':', 1)
                redis_config['nodes'] = [{'host': host, 'port': int(port)}]
            else:
                redis_config['nodes'] = [{'host': cluster_address, 'port': 6379}]

        return redis_config
    except Exception as e:
        logger.error(f"解析Redis配置失败: {e}")
        return None
def save_config_group(name, description, pro_config, test_config, query_config, sharding_config=None):
    """Persist a Cassandra config group (insert, or overwrite by name).

    ``sharding_config`` is optional; when falsy it is stored as NULL.

    Returns:
        True on success, False when the database is unavailable or the
        write fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False

    conn = get_db_connection()
    try:
        params = (
            name,
            description,
            json.dumps(pro_config),
            json.dumps(test_config),
            json.dumps(query_config),
            json.dumps(sharding_config) if sharding_config else None,
            datetime.now().isoformat(),
        )
        conn.cursor().execute('''
            INSERT OR REPLACE INTO config_groups
            (name, description, pro_config, test_config, query_config, sharding_config, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', params)
        conn.commit()
        logger.info(f"配置组 '{name}' 保存成功,包含分表配置: {sharding_config is not None}")
        return True
    except Exception as e:
        logger.error(f"保存配置组失败: {e}")
        return False
    finally:
        conn.close()
def get_config_groups():
    """Return all Cassandra config groups, newest first (summaries only).

    Returns:
        A list of dicts with id/name/description/created_at/updated_at;
        [] when the database is unavailable or the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, name, description, created_at, updated_at
            FROM config_groups
            ORDER BY updated_at DESC
        ''')
        summary_fields = ('id', 'name', 'description', 'created_at', 'updated_at')
        return [{field: entry[field] for field in summary_fields}
                for entry in cursor.fetchall()]
    except Exception as e:
        logger.error(f"获取配置组失败: {e}")
        return []
    finally:
        conn.close()
def get_config_group_by_id(group_id):
    """Load one Cassandra config group by id with JSON columns decoded.

    ``sharding_config`` is optional and may hold invalid JSON written by
    older versions; in either case it degrades to None instead of failing
    the whole load.

    Args:
        group_id: Primary key of the config_groups row.

    Returns:
        The full config dict, or None when the id is unknown or an error
        occurs.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, name, description, pro_config, test_config, query_config,
                   sharding_config, created_at, updated_at
            FROM config_groups WHERE id = ?
        ''', (group_id,))
        record = cursor.fetchone()
        if record is None:
            return None

        config = {
            'id': record['id'],
            'name': record['name'],
            'description': record['description'],
            'pro_config': json.loads(record['pro_config']),
            'test_config': json.loads(record['test_config']),
            'query_config': json.loads(record['query_config']),
            'created_at': record['created_at'],
            'updated_at': record['updated_at'],
        }

        # Optional sharding config: NULL/empty or corrupt JSON -> None.
        config['sharding_config'] = None
        sharding_raw = record['sharding_config']
        if sharding_raw:
            try:
                config['sharding_config'] = json.loads(sharding_raw)
            except (json.JSONDecodeError, TypeError):
                pass

        return config
    except Exception as e:
        logger.error(f"获取配置组详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_config_group(group_id):
    """Delete one Cassandra config group by id.

    Returns:
        True when a row was actually removed, False otherwise (including
        database errors).
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM config_groups WHERE id = ?', (group_id,))
        conn.commit()
        deleted = cur.rowcount > 0
        if deleted:
            logger.info(f"配置组ID {group_id} 删除成功")
        return deleted
    except Exception as e:
        logger.error(f"删除配置组失败: {e}")
        return False
    finally:
        conn.close()
def save_query_history(name, description, pro_config, test_config, query_config, query_keys,
                       results_summary, execution_time, total_keys, differences_count, identical_count,
                       sharding_config=None, query_type='single', raw_results=None, differences_data=None, identical_data=None):
    """Insert one Cassandra comparison run into query_history.

    Supports sharded queries (``sharding_config`` / ``query_type``) and
    optional stored result payloads.  ``bytes`` values inside the payloads
    are hex-encoded before JSON serialization; falsy payloads are stored
    as NULL.

    Returns:
        The new row id, or None on failure.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None

    conn = get_db_connection()
    try:
        # Hex-encode any embedded bytes so json.dumps cannot raise.
        raw_results = convert_bytes_to_str(raw_results) if raw_results else None
        differences_data = convert_bytes_to_str(differences_data) if differences_data else None
        identical_data = convert_bytes_to_str(identical_data) if identical_data else None

        params = (
            name,
            description,
            json.dumps(pro_config),
            json.dumps(test_config),
            json.dumps(query_config),
            json.dumps(query_keys),
            json.dumps(results_summary),
            execution_time,
            total_keys,
            differences_count,
            identical_count,
            json.dumps(sharding_config) if sharding_config else None,
            query_type,
            json.dumps(raw_results) if raw_results else None,
            json.dumps(differences_data) if differences_data else None,
            json.dumps(identical_data) if identical_data else None,
        )
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO query_history
            (name, description, pro_config, test_config, query_config, query_keys,
             results_summary, execution_time, total_keys, differences_count, identical_count,
             sharding_config, query_type, raw_results, differences_data, identical_data)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', params)
        history_id = cursor.lastrowid
        conn.commit()
        logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type},ID:{history_id}")
        return history_id
    except Exception as e:
        logger.error(f"保存查询历史记录失败: {e}")
        return None
    finally:
        conn.close()
def get_query_history():
    """List all Cassandra query history entries, newest first.

    Returns summary rows only (no configs or result payloads).  Rows from
    databases created before the ``query_type`` column existed fall back
    to ``'single'``.

    Fix: the column-name list was previously rebuilt from
    ``cursor.description`` on every loop iteration although it is a
    property of the result set, not of a row; it is now computed once.

    Returns:
        A list of summary dicts; [] when the database is unavailable or
        the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute('''
            SELECT id, name, description, execution_time, total_keys,
                   differences_count, identical_count, created_at, query_type
            FROM query_history
            ORDER BY created_at DESC
        ''')
        rows = cursor.fetchall()

        # Loop-invariant: compute the result-set column names once.
        column_names = [desc[0] for desc in cursor.description]
        has_query_type = 'query_type' in column_names

        history_list = []
        for row in rows:
            history_list.append({
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
                'execution_time': row['execution_time'],
                'total_keys': row['total_keys'],
                'differences_count': row['differences_count'],
                'identical_count': row['identical_count'],
                'created_at': row['created_at'],
                # Backward compatibility with pre-query_type databases.
                'query_type': row['query_type'] if has_query_type else 'single'
            })

        return history_list
    except Exception as e:
        logger.error(f"获取查询历史记录失败: {e}")
        return []
    finally:
        conn.close()
def get_query_history_by_id(history_id):
    """Return the full detail record for one Cassandra query-history row.

    All JSON columns are decoded.  Columns added in later schema versions
    (sharding_config, query_type, raw_results, differences_data,
    identical_data) are read defensively via the result-set column list so
    rows from older database files still load.

    Args:
        history_id: Primary key of the query_history row.

    Returns:
        A dict with decoded configs and result payloads, or None when the
        id is unknown, the database is unavailable, or decoding fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute('''
            SELECT * FROM query_history WHERE id = ?
        ''', (history_id,))
        row = cursor.fetchone()

        if row:
            # Collect the result-set column names to check which optional
            # (newer) columns exist in this database file.
            column_names = [desc[0] for desc in cursor.description]
            return {
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
                'pro_config': json.loads(row['pro_config']),
                'test_config': json.loads(row['test_config']),
                'query_config': json.loads(row['query_config']),
                'query_keys': json.loads(row['query_keys']),
                'results_summary': json.loads(row['results_summary']),
                'execution_time': row['execution_time'],
                'total_keys': row['total_keys'],
                'differences_count': row['differences_count'],
                'identical_count': row['identical_count'],
                'created_at': row['created_at'],
                # Newer columns: keep backward compatibility with old rows.
                'sharding_config': json.loads(row['sharding_config']) if 'sharding_config' in column_names and row['sharding_config'] else None,
                'query_type': row['query_type'] if 'query_type' in column_names else 'single',
                # Stored query-result payloads (may be NULL).
                'raw_results': json.loads(row['raw_results']) if 'raw_results' in column_names and row['raw_results'] else None,
                'differences_data': json.loads(row['differences_data']) if 'differences_data' in column_names and row['differences_data'] else None,
                'identical_data': json.loads(row['identical_data']) if 'identical_data' in column_names and row['identical_data'] else None
            }
        return None
    except Exception as e:
        logger.error(f"获取查询历史记录详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_query_history(history_id):
    """Delete one Cassandra query history record by id.

    Returns:
        True when a row was actually removed, False otherwise (including
        database errors).
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM query_history WHERE id = ?', (history_id,))
        conn.commit()
        deleted = cur.rowcount > 0
        if deleted:
            logger.info(f"查询历史记录ID {history_id} 删除成功")
        return deleted
    except Exception as e:
        logger.error(f"删除查询历史记录失败: {e}")
        return False
    finally:
        conn.close()
def batch_delete_query_history(history_ids):
    """Delete several Cassandra query history rows in one statement.

    Args:
        history_ids: Sequence of row ids; an empty sequence is a no-op.

    Returns:
        A result dict: {'success': bool, 'message' or 'error': str,
        'deleted_count': int}.
    """
    if not history_ids:
        return {'success': True, 'message': '没有要删除的记录', 'deleted_count': 0}

    if not ensure_database():
        logger.error("数据库初始化失败")
        return {'success': False, 'error': '数据库初始化失败', 'deleted_count': 0}

    conn = get_db_connection()
    try:
        # One '?' placeholder per id keeps the statement parameterized.
        placeholders = ','.join('?' * len(history_ids))
        cursor = conn.cursor()
        cursor.execute(
            f'DELETE FROM query_history WHERE id IN ({placeholders})',
            history_ids,
        )
        conn.commit()
        deleted_count = cursor.rowcount

        if not deleted_count:
            return {
                'success': False,
                'error': '没有找到要删除的记录',
                'deleted_count': 0
            }

        logger.info(f"成功批量删除 {deleted_count} 条Cassandra查询历史记录: {history_ids}")
        return {
            'success': True,
            'message': f'成功删除 {deleted_count} 条记录',
            'deleted_count': deleted_count
        }
    except Exception as e:
        logger.error(f"批量删除Cassandra查询历史记录失败: {e}")
        return {
            'success': False,
            'error': f'删除失败: {str(e)}',
            'deleted_count': 0
        }
    finally:
        conn.close()