Files
BigDataTool/modules/config_manager.py
YoVinchen fe2803f3da 修复 部分 json 数据不能识别
修复 标签字段比对不完全
2025-08-14 16:32:44 +08:00

823 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
配置管理模块
============
本模块负责BigDataTool项目的配置管理和查询历史管理提供完整的CRUD操作。
核心功能:
1. Cassandra配置组管理数据库连接配置的保存、加载、删除
2. Redis配置组管理Redis集群配置的完整生命周期管理
3. 查询历史管理:查询记录的持久化存储和检索
4. 配置解析和验证YAML格式配置的智能解析
支持的配置类型:
- Cassandra配置集群地址、认证信息、keyspace等
- Redis配置集群节点、连接参数、查询选项等
- 查询配置:主键字段、比较字段、排除字段等
- 分表配置TWCS分表参数、时间间隔、表数量等
数据存储格式:
- 所有配置以JSON格式存储在SQLite数据库中
- 支持复杂嵌套结构和数组类型
- 自动处理序列化和反序列化
- 保持数据类型完整性
设计特点:
- 类型安全:完整的参数验证和类型检查
- 事务安全:数据库操作的原子性保证
- 错误恢复:数据库异常时的优雅降级
- 向后兼容:支持旧版本配置格式的自动升级
作者BigDataTool项目组
更新时间2024年8月
"""
import json
import logging
from datetime import datetime
from .database import ensure_database, get_db_connection
logger = logging.getLogger(__name__)
def convert_bytes_to_str(obj):
    """Recursively replace every ``bytes`` value inside *obj* with a hex string.

    Used to make query results JSON-serializable before persisting them.

    Args:
        obj: Any value — dict, list, tuple, bytes, or a plain scalar.

    Returns:
        A structurally identical copy in which each ``bytes`` leaf has become
        its hexadecimal string; all other non-container values pass through
        unchanged.
    """
    if isinstance(obj, bytes):
        # bytes -> hexadecimal text representation
        return obj.hex()
    if isinstance(obj, dict):
        return {k: convert_bytes_to_str(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        converted = [convert_bytes_to_str(item) for item in obj]
        # Preserve the original container type.
        return converted if isinstance(obj, list) else tuple(converted)
    return obj
# Default Cassandra configuration template.
# NOTE: contains no sensitive information — it only seeds the UI form with
# an empty skeleton.
DEFAULT_CONFIG = {
    # Production-side cluster connection settings.
    'pro_config': {
        'cluster_name': '',
        'hosts': [],
        'port': 9042,  # Cassandra native-protocol default port
        'datacenter': '',
        'username': '',
        'password': '',
        'keyspace': '',
        'table': ''
    },
    # Test-side cluster connection settings (same shape as pro_config).
    'test_config': {
        'cluster_name': '',
        'hosts': [],
        'port': 9042,
        'datacenter': '',
        'username': '',
        'password': '',
        'keyspace': '',
        'table': ''
    },
    'keys': [],               # primary-key field names used to match rows
    'fields_to_compare': [],  # fields to compare (empty presumably means all — confirm against comparison logic)
    'exclude_fields': []      # fields ignored during comparison
}
# Default Redis cluster configuration template.
# Supports single-node and cluster mode; the connection type is detected
# automatically (per the original author's note).
REDIS_DEFAULT_CONFIG = {
    'cluster1_config': {
        'name': '生产集群',
        'nodes': [
            {'host': '127.0.0.1', 'port': 7000}
        ],
        'password': '',
        'socket_timeout': 3,          # seconds
        'socket_connect_timeout': 3,  # seconds
        'max_connections_per_node': 16
    },
    'cluster2_config': {
        'name': '测试集群',
        'nodes': [
            {'host': '127.0.0.1', 'port': 7001}
        ],
        'password': '',
        'socket_timeout': 3,
        'socket_connect_timeout': 3,
        'max_connections_per_node': 16
    },
    # Options controlling how comparison keys are selected.
    'query_options': {
        'mode': 'random',             # key selection mode — see query layer for accepted values
        'count': 100,                 # number of keys sampled in random mode
        'pattern': '*',               # key pattern filter
        'source_cluster': 'cluster2', # which cluster keys are sampled from
        'keys': []                    # explicit key list (used instead of sampling when set)
    }
}
def save_redis_config_group(name, description, cluster1_config, cluster2_config, query_options):
    """Insert or replace one Redis config group; return True on success.

    The three config payloads are stored as JSON text; ``updated_at`` is set
    to the current time in ISO format.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        params = (
            name,
            description,
            json.dumps(cluster1_config),
            json.dumps(cluster2_config),
            json.dumps(query_options),
            datetime.now().isoformat(),
        )
        cursor.execute('''
            INSERT OR REPLACE INTO redis_config_groups
            (name, description, cluster1_config, cluster2_config, query_options, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', params)
        conn.commit()
        logger.info(f"Redis配置组 '{name}' 保存成功")
        return True
    except Exception as e:
        logger.error(f"保存Redis配置组失败: {e}")
        return False
    finally:
        conn.close()
def get_redis_config_groups():
    """Return summaries of all Redis config groups, newest first.

    Only metadata columns are returned (no config payloads). Returns [] when
    the database is unavailable or the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, created_at, updated_at
            FROM redis_config_groups
            ORDER BY updated_at DESC
        ''')
        fields = ('id', 'name', 'description', 'created_at', 'updated_at')
        return [{f: row[f] for f in fields} for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"获取Redis配置组失败: {e}")
        return []
    finally:
        conn.close()
def get_redis_config_group_by_id(group_id):
    """Load one Redis config group by primary key.

    JSON payload columns are decoded back into Python objects.

    Returns:
        The config dict, or None when the row is missing or an error occurs.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, cluster1_config, cluster2_config, query_options,
                   created_at, updated_at
            FROM redis_config_groups WHERE id = ?
        ''', (group_id,))
        row = cursor.fetchone()
        if row is None:
            return None
        return {
            'id': row['id'],
            'name': row['name'],
            'description': row['description'],
            'cluster1_config': json.loads(row['cluster1_config']),
            'cluster2_config': json.loads(row['cluster2_config']),
            'query_options': json.loads(row['query_options']),
            'created_at': row['created_at'],
            'updated_at': row['updated_at'],
        }
    except Exception as e:
        logger.error(f"获取Redis配置组详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_redis_config_group(group_id):
    """Delete a Redis config group by id; True if a row was actually removed."""
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM redis_config_groups WHERE id = ?', (group_id,))
        conn.commit()
        removed = cur.rowcount > 0
        if removed:
            logger.info(f"Redis配置组ID {group_id} 删除成功")
        return removed
    except Exception as e:
        logger.error(f"删除Redis配置组失败: {e}")
        return False
    finally:
        conn.close()
def save_redis_query_history(name, description, cluster1_config, cluster2_config, query_options,
                             query_keys, results_summary, execution_time, total_keys,
                             different_count, identical_count, missing_count, raw_results=None):
    """Insert one Redis query-history row.

    Returns:
        The new row id, or None on failure.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Hex-encode any bytes so raw results can be JSON-serialized.
        raw_results = convert_bytes_to_str(raw_results) if raw_results else None
        params = (
            name,
            description,
            json.dumps(cluster1_config),
            json.dumps(cluster2_config),
            json.dumps(query_options),
            json.dumps(query_keys),
            json.dumps(results_summary),
            execution_time,
            total_keys,
            different_count,
            identical_count,
            missing_count,
            json.dumps(raw_results) if raw_results else None,
        )
        cursor.execute('''
            INSERT INTO redis_query_history
            (name, description, cluster1_config, cluster2_config, query_options, query_keys,
             results_summary, execution_time, total_keys, different_count, identical_count,
             missing_count, raw_results)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', params)
        history_id = cursor.lastrowid
        conn.commit()
        logger.info(f"Redis查询历史记录 '{name}' 保存成功ID{history_id}")
        return history_id
    except Exception as e:
        logger.error(f"保存Redis查询历史记录失败: {e}")
        return None
    finally:
        conn.close()
def get_redis_query_history():
    """Return summaries of all Redis query-history rows, newest first.

    Only counts and metadata are returned (no result payloads). Returns []
    when the database is unavailable or the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, execution_time, total_keys,
                   different_count, identical_count, missing_count, created_at
            FROM redis_query_history
            ORDER BY created_at DESC
        ''')
        fields = ('id', 'name', 'description', 'execution_time', 'total_keys',
                  'different_count', 'identical_count', 'missing_count', 'created_at')
        return [{f: row[f] for f in fields} for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"获取Redis查询历史记录失败: {e}")
        return []
    finally:
        conn.close()
def get_redis_query_history_by_id(history_id):
    """Load the full detail of one Redis query-history record.

    JSON payload columns are decoded; ``raw_results`` stays None when the
    column is empty.

    Returns:
        The record dict, or None when the row is missing or an error occurs.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT * FROM redis_query_history WHERE id = ?
        ''', (history_id,))
        row = cursor.fetchone()
        if row is None:
            return None
        raw = row['raw_results']
        return {
            'id': row['id'],
            'name': row['name'],
            'description': row['description'],
            'cluster1_config': json.loads(row['cluster1_config']),
            'cluster2_config': json.loads(row['cluster2_config']),
            'query_options': json.loads(row['query_options']),
            'query_keys': json.loads(row['query_keys']),
            'results_summary': json.loads(row['results_summary']),
            'execution_time': row['execution_time'],
            'total_keys': row['total_keys'],
            'different_count': row['different_count'],
            'identical_count': row['identical_count'],
            'missing_count': row['missing_count'],
            'created_at': row['created_at'],
            'raw_results': json.loads(raw) if raw else None,
        }
    except Exception as e:
        logger.error(f"获取Redis查询历史记录详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_redis_query_history(history_id):
    """Delete one Redis query-history row; True if a row was actually removed."""
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM redis_query_history WHERE id = ?', (history_id,))
        conn.commit()
        removed = cur.rowcount > 0
        if removed:
            logger.info(f"Redis查询历史记录ID {history_id} 删除成功")
        return removed
    except Exception as e:
        logger.error(f"删除Redis查询历史记录失败: {e}")
        return False
    finally:
        conn.close()
def batch_delete_redis_query_history(history_ids):
    """Delete several Redis query-history rows in one statement.

    Args:
        history_ids: Iterable of row ids; an empty input is a no-op success.

    Returns:
        dict with 'success', 'deleted_count' and either 'message' or 'error'.
    """
    if not history_ids:
        return {'success': True, 'message': '没有要删除的记录', 'deleted_count': 0}
    if not ensure_database():
        logger.error("数据库初始化失败")
        return {'success': False, 'error': '数据库初始化失败', 'deleted_count': 0}
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # One "?" placeholder per id for the IN clause.
        placeholders = ','.join('?' * len(history_ids))
        cursor.execute(
            f'DELETE FROM redis_query_history WHERE id IN ({placeholders})',
            history_ids,
        )
        conn.commit()
        deleted = cursor.rowcount
        if deleted > 0:
            logger.info(f"成功批量删除 {deleted} 条Redis查询历史记录: {history_ids}")
            return {
                'success': True,
                'message': f'成功删除 {deleted} 条记录',
                'deleted_count': deleted,
            }
        return {
            'success': False,
            'error': '没有找到要删除的记录',
            'deleted_count': 0,
        }
    except Exception as e:
        logger.error(f"批量删除Redis查询历史记录失败: {e}")
        return {
            'success': False,
            'error': f'删除失败: {str(e)}',
            'deleted_count': 0,
        }
    finally:
        conn.close()
def parse_redis_config_from_yaml(yaml_text):
    """Parse flat "key: value" YAML-style text into a Redis cluster config.

    Recognized keys: ``clusterName``, ``clusterPassword``, ``clusterAddress``.
    ``clusterAddress`` may contain one or more comma-separated
    ``host[:port]`` entries; a missing port defaults to 6379.
    Blank lines and ``#`` comment lines are ignored (previously a comment
    containing ':' was mis-parsed as a key/value pair).

    Args:
        yaml_text: Raw configuration text, one "key: value" pair per line.

    Returns:
        A cluster config dict (name/nodes/password/timeouts), or None when
        parsing fails (e.g. a non-numeric port).
    """
    try:
        config = {}
        for raw_line in yaml_text.strip().split('\n'):
            line = raw_line.strip()
            # Skip blanks and comments; only parse "key: value" pairs.
            if not line or line.startswith('#') or ':' not in line:
                continue
            key, value = line.split(':', 1)
            key = key.strip()
            value = value.strip()
            # Strip one pair of matching surrounding quotes.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]
            config[key] = value
        redis_config = {
            'name': config.get('clusterName', ''),
            'nodes': [],
            'password': config.get('clusterPassword', ''),
            'socket_timeout': 3,
            'socket_connect_timeout': 3,
            'max_connections_per_node': 16
        }
        # Accept one address or a comma-separated list of addresses.
        cluster_address = config.get('clusterAddress', '')
        for address in (part.strip() for part in cluster_address.split(',')):
            if not address:
                continue
            if ':' in address:
                host, port = address.split(':', 1)
                redis_config['nodes'].append({'host': host, 'port': int(port)})
            else:
                redis_config['nodes'].append({'host': address, 'port': 6379})
        return redis_config
    except Exception as e:
        logger.error(f"解析Redis配置失败: {e}")
        return None
def save_config_group(name, description, pro_config, test_config, query_config, sharding_config=None):
    """Insert or replace one Cassandra config group; True on success.

    ``sharding_config`` is optional — stored as NULL when falsy.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        params = (
            name,
            description,
            json.dumps(pro_config),
            json.dumps(test_config),
            json.dumps(query_config),
            json.dumps(sharding_config) if sharding_config else None,
            datetime.now().isoformat(),
        )
        cursor.execute('''
            INSERT OR REPLACE INTO config_groups
            (name, description, pro_config, test_config, query_config, sharding_config, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', params)
        conn.commit()
        logger.info(f"配置组 '{name}' 保存成功,包含分表配置: {sharding_config is not None}")
        return True
    except Exception as e:
        logger.error(f"保存配置组失败: {e}")
        return False
    finally:
        conn.close()
def get_config_groups():
    """Return summaries of all Cassandra config groups, newest first.

    Only metadata columns are returned (no config payloads). Returns [] when
    the database is unavailable or the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, created_at, updated_at
            FROM config_groups
            ORDER BY updated_at DESC
        ''')
        fields = ('id', 'name', 'description', 'created_at', 'updated_at')
        return [{f: row[f] for f in fields} for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"获取配置组失败: {e}")
        return []
    finally:
        conn.close()
def get_config_group_by_id(group_id):
    """Load one Cassandra config group by primary key.

    JSON payload columns are decoded. ``sharding_config`` degrades to None
    when absent or malformed instead of failing the whole load.

    Returns:
        The config dict, or None when the row is missing or an error occurs.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, pro_config, test_config, query_config,
                   sharding_config, created_at, updated_at
            FROM config_groups WHERE id = ?
        ''', (group_id,))
        row = cursor.fetchone()
        if row is None:
            return None
        config = {
            'id': row['id'],
            'name': row['name'],
            'description': row['description'],
            'pro_config': json.loads(row['pro_config']),
            'test_config': json.loads(row['test_config']),
            'query_config': json.loads(row['query_config']),
            'created_at': row['created_at'],
            'updated_at': row['updated_at'],
        }
        # Optional sharding config: tolerate rows with bad/missing JSON.
        config['sharding_config'] = None
        sharding_raw = row['sharding_config']
        if sharding_raw:
            try:
                config['sharding_config'] = json.loads(sharding_raw)
            except (json.JSONDecodeError, TypeError):
                pass
        return config
    except Exception as e:
        logger.error(f"获取配置组详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_config_group(group_id):
    """Delete a Cassandra config group by id; True if a row was actually removed."""
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM config_groups WHERE id = ?', (group_id,))
        conn.commit()
        removed = cur.rowcount > 0
        if removed:
            logger.info(f"配置组ID {group_id} 删除成功")
        return removed
    except Exception as e:
        logger.error(f"删除配置组失败: {e}")
        return False
    finally:
        conn.close()
def save_query_history(name, description, pro_config, test_config, query_config, query_keys,
                       results_summary, execution_time, total_keys, differences_count, identical_count,
                       sharding_config=None, query_type='single', raw_results=None, differences_data=None, identical_data=None):
    """Insert one Cassandra query-history row (supports sharded queries and
    optional result payloads).

    Returns:
        The new row id, or None on failure.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Hex-encode any bytes so the payloads are JSON-serializable.
        raw_results = convert_bytes_to_str(raw_results) if raw_results else None
        differences_data = convert_bytes_to_str(differences_data) if differences_data else None
        identical_data = convert_bytes_to_str(identical_data) if identical_data else None

        def dump_opt(value):
            # Optional payloads are stored as NULL when falsy.
            return json.dumps(value) if value else None

        params = (
            name,
            description,
            json.dumps(pro_config),
            json.dumps(test_config),
            json.dumps(query_config),
            json.dumps(query_keys),
            json.dumps(results_summary),
            execution_time,
            total_keys,
            differences_count,
            identical_count,
            dump_opt(sharding_config),
            query_type,
            dump_opt(raw_results),
            dump_opt(differences_data),
            dump_opt(identical_data),
        )
        cursor.execute('''
            INSERT INTO query_history
            (name, description, pro_config, test_config, query_config, query_keys,
             results_summary, execution_time, total_keys, differences_count, identical_count,
             sharding_config, query_type, raw_results, differences_data, identical_data)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', params)
        history_id = cursor.lastrowid
        conn.commit()
        logger.info(f"查询历史记录 '{name}' 保存成功,查询类型:{query_type}ID{history_id}")
        return history_id
    except Exception as e:
        logger.error(f"保存查询历史记录失败: {e}")
        return None
    finally:
        conn.close()
def get_query_history():
    """Return summaries of all Cassandra query-history rows, newest first.

    Each item carries counts and metadata only (no result payloads).
    ``query_type`` falls back to 'single' for databases whose schema predates
    that column.

    Returns:
        list[dict]: history summaries, or [] when the database is
        unavailable or the query fails.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return []
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, execution_time, total_keys,
                   differences_count, identical_count, created_at, query_type
            FROM query_history
            ORDER BY created_at DESC
        ''')
        rows = cursor.fetchall()
        # Column names are loop-invariant — compute once, not per row
        # (original recomputed this inside the loop).
        column_names = [desc[0] for desc in cursor.description]
        has_query_type = 'query_type' in column_names
        history_list = []
        for row in rows:
            history_list.append({
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
                'execution_time': row['execution_time'],
                'total_keys': row['total_keys'],
                'differences_count': row['differences_count'],
                'identical_count': row['identical_count'],
                'created_at': row['created_at'],
                'query_type': row['query_type'] if has_query_type else 'single'
            })
        return history_list
    except Exception as e:
        logger.error(f"获取查询历史记录失败: {e}")
        return []
    finally:
        conn.close()
def get_query_history_by_id(history_id):
    """Return the full detail of one Cassandra query-history record.

    JSON columns are decoded back into Python objects. Columns added by
    later schema versions (sharding_config, query_type, raw_results,
    differences_data, identical_data) are probed via cursor.description
    before access, so rows/databases from older versions still load.

    Args:
        history_id: Primary key of the query_history row.

    Returns:
        dict with the decoded record, or None when the row does not exist,
        the database is unavailable, or an error occurs.
    """
    if not ensure_database():
        logger.error("数据库初始化失败")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT * FROM query_history WHERE id = ?
        ''', (history_id,))
        row = cursor.fetchone()
        if row:
            # Collect the result-set column names so optional (newer)
            # columns can be checked for existence before access.
            column_names = [desc[0] for desc in cursor.description]
            return {
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
                'pro_config': json.loads(row['pro_config']),
                'test_config': json.loads(row['test_config']),
                'query_config': json.loads(row['query_config']),
                'query_keys': json.loads(row['query_keys']),
                'results_summary': json.loads(row['results_summary']),
                'execution_time': row['execution_time'],
                'total_keys': row['total_keys'],
                'differences_count': row['differences_count'],
                'identical_count': row['identical_count'],
                'created_at': row['created_at'],
                # Newer columns: keep backward compatibility with old schemas.
                'sharding_config': json.loads(row['sharding_config']) if 'sharding_config' in column_names and row['sharding_config'] else None,
                'query_type': row['query_type'] if 'query_type' in column_names else 'single',
                # Optional stored result payloads.
                'raw_results': json.loads(row['raw_results']) if 'raw_results' in column_names and row['raw_results'] else None,
                'differences_data': json.loads(row['differences_data']) if 'differences_data' in column_names and row['differences_data'] else None,
                'identical_data': json.loads(row['identical_data']) if 'identical_data' in column_names and row['identical_data'] else None
            }
        return None
    except Exception as e:
        logger.error(f"获取查询历史记录详情失败: {e}")
        return None
    finally:
        conn.close()
def delete_query_history(history_id):
    """Delete one Cassandra query-history row; True if a row was actually removed."""
    if not ensure_database():
        logger.error("数据库初始化失败")
        return False
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute('DELETE FROM query_history WHERE id = ?', (history_id,))
        conn.commit()
        removed = cur.rowcount > 0
        if removed:
            logger.info(f"查询历史记录ID {history_id} 删除成功")
        return removed
    except Exception as e:
        logger.error(f"删除查询历史记录失败: {e}")
        return False
    finally:
        conn.close()
def batch_delete_query_history(history_ids):
    """Delete several Cassandra query-history rows in one statement.

    Args:
        history_ids: Iterable of row ids; an empty input is a no-op success.

    Returns:
        dict with 'success', 'deleted_count' and either 'message' or 'error'.
    """
    if not history_ids:
        return {'success': True, 'message': '没有要删除的记录', 'deleted_count': 0}
    if not ensure_database():
        logger.error("数据库初始化失败")
        return {'success': False, 'error': '数据库初始化失败', 'deleted_count': 0}
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # One "?" placeholder per id for the IN clause.
        placeholders = ','.join('?' * len(history_ids))
        cursor.execute(
            f'DELETE FROM query_history WHERE id IN ({placeholders})',
            history_ids,
        )
        conn.commit()
        deleted = cursor.rowcount
        if deleted > 0:
            logger.info(f"成功批量删除 {deleted} 条Cassandra查询历史记录: {history_ids}")
            return {
                'success': True,
                'message': f'成功删除 {deleted} 条记录',
                'deleted_count': deleted,
            }
        return {
            'success': False,
            'error': '没有找到要删除的记录',
            'deleted_count': 0,
        }
    except Exception as e:
        logger.error(f"批量删除Cassandra查询历史记录失败: {e}")
        return {
            'success': False,
            'error': f'删除失败: {str(e)}',
            'deleted_count': 0,
        }
    finally:
        conn.close()