BigDataTool/app.py
from flask import Flask, render_template, request, jsonify
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import os
import logging
import sqlite3
from datetime import datetime
app = Flask(__name__)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration
DATABASE_PATH = 'config_groups.db'
def init_database():
    """Initialize the SQLite database."""
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        # Create the config groups table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS config_groups (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Create the query history table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS query_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                pro_config TEXT NOT NULL,
                test_config TEXT NOT NULL,
                query_config TEXT NOT NULL,
                query_keys TEXT NOT NULL,
                results_summary TEXT NOT NULL,
                execution_time REAL NOT NULL,
                total_keys INTEGER NOT NULL,
                differences_count INTEGER NOT NULL,
                identical_count INTEGER NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
        conn.close()
        logger.info("Database initialized")
        return True
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        return False
def ensure_database():
    """Make sure the database file and tables exist."""
    if not os.path.exists(DATABASE_PATH):
        logger.info("Database file not found, creating it...")
        return init_database()
    # Check whether the required table exists
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='config_groups'")
        result = cursor.fetchone()
        conn.close()
        if not result:
            logger.info("config_groups table not found, creating it...")
            return init_database()
        return True
    except Exception as e:
        logger.error(f"Failed to check database tables: {e}")
        return init_database()
def get_db_connection():
    """Return a SQLite connection with row access by column name."""
    conn = sqlite3.connect(DATABASE_PATH)
    conn.row_factory = sqlite3.Row
    return conn
def normalize_json_string(value):
    """Normalize a JSON string for comparison."""
    if not isinstance(value, str):
        return value
    try:
        # Try to parse the value as JSON
        json_obj = json.loads(value)
        if isinstance(json_obj, list):
            # Arrays need special handling: normalize element order first
            normalized_array = normalize_json_array(json_obj)
            return json.dumps(normalized_array, sort_keys=True, separators=(',', ':'))
        else:
            # Plain object: serialize with sorted keys
            return json.dumps(json_obj, sort_keys=True, separators=(',', ':'))
    except (json.JSONDecodeError, TypeError):
        # Not JSON: return the original value
        return value
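# Example (illustrative literals, not from the original code): two JSON strings that
# differ only in key order normalize to the same canonical form, e.g.
#   normalize_json_string('{"b": 1, "a": 2}') == normalize_json_string('{"a": 2, "b": 1}')
# and both yield '{"a":2,"b":1}'.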
def normalize_json_array(json_array):
    """Normalize a JSON array so element order does not affect comparison."""
    try:
        normalized_elements = []
        for element in json_array:
            if isinstance(element, dict):
                # Normalize dict elements into canonical JSON strings
                normalized_elements.append(json.dumps(element, sort_keys=True, separators=(',', ':')))
            elif isinstance(element, str):
                # Strings may themselves be JSON; try to parse them
                try:
                    parsed_element = json.loads(element)
                    normalized_elements.append(json.dumps(parsed_element, sort_keys=True, separators=(',', ':')))
                except (json.JSONDecodeError, TypeError):
                    normalized_elements.append(element)
            else:
                normalized_elements.append(element)
        # Sort the normalized elements so ordering is deterministic
        normalized_elements.sort()
        # Parse the elements back into objects
        result_array = []
        for element in normalized_elements:
            if isinstance(element, str):
                try:
                    result_array.append(json.loads(element))
                except (json.JSONDecodeError, TypeError):
                    result_array.append(element)
            else:
                result_array.append(element)
        return result_array
    except Exception as e:
        logger.warning(f"Array normalization failed: {e}")
        return json_array
def is_json_array_field(value):
    """Check whether a field looks like a JSON array."""
    if not isinstance(value, (str, list)):
        return False
    try:
        if isinstance(value, str):
            parsed = json.loads(value)
            return isinstance(parsed, list)
        elif isinstance(value, list):
            # For lists, check whether the elements are JSON strings
            if len(value) > 0 and isinstance(value[0], str):
                try:
                    json.loads(value[0])
                    return True
                except (json.JSONDecodeError, TypeError):
                    return False
            return True
    except (json.JSONDecodeError, TypeError):
        return False
def compare_array_values(value1, value2):
    """Compare two values that are (or may contain) arrays."""
    try:
        # Both values are strings that may encode arrays
        if isinstance(value1, str) and isinstance(value2, str):
            try:
                array1 = json.loads(value1)
                array2 = json.loads(value2)
                if isinstance(array1, list) and isinstance(array2, list):
                    return compare_json_arrays(array1, array2)
            except (json.JSONDecodeError, TypeError):
                pass
        # Both values are Python lists
        elif isinstance(value1, list) and isinstance(value2, list):
            return compare_json_arrays(value1, value2)
        # Mixed case: one value is a list, the other a JSON string
        elif isinstance(value1, list) and isinstance(value2, str):
            try:
                array2 = json.loads(value2)
                if isinstance(array2, list):
                    return compare_json_arrays(value1, array2)
            except (json.JSONDecodeError, TypeError):
                pass
        elif isinstance(value1, str) and isinstance(value2, list):
            try:
                array1 = json.loads(value1)
                if isinstance(array1, list):
                    return compare_json_arrays(array1, value2)
            except (json.JSONDecodeError, TypeError):
                pass
        return False
    except Exception as e:
        logger.warning(f"Array comparison failed: {e}")
        return False
def compare_json_arrays(array1, array2):
    """Compare two JSON arrays, ignoring element order."""
    try:
        if len(array1) != len(array2):
            return False
        # Normalize both arrays
        normalized_array1 = normalize_json_array(array1.copy())
        normalized_array2 = normalize_json_array(array2.copy())
        # Serialize the normalized arrays into a comparable form
        comparable1 = json.dumps(normalized_array1, sort_keys=True)
        comparable2 = json.dumps(normalized_array2, sort_keys=True)
        return comparable1 == comparable2
    except Exception as e:
        logger.warning(f"JSON array comparison failed: {e}")
        return False
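# Example (illustrative literals, not from the original code): arrays holding the same
# elements in a different order compare as equal, e.g.
#   compare_json_arrays([{"a": 1}, {"b": 2}], [{"b": 2}, {"a": 1}])  # -> True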
def format_json_for_display(value):
    """Format a value as pretty-printed JSON for display."""
    if not isinstance(value, str):
        return str(value)
    try:
        # Try to parse the value as JSON
        json_obj = json.loads(value)
        # Pretty-print with indentation
        return json.dumps(json_obj, sort_keys=True, indent=2, ensure_ascii=False)
    except (json.JSONDecodeError, TypeError):
        # Not JSON: return the original value
        return str(value)
def is_json_field(value):
    """Check whether a field is a JSON string."""
    if not isinstance(value, str):
        return False
    try:
        json.loads(value)
        return True
    except (json.JSONDecodeError, TypeError):
        return False
def compare_values(value1, value2):
    """Smart comparison of two values, with JSON normalization and array support."""
    # Array-typed fields get dedicated handling
    if is_json_array_field(value1) or is_json_array_field(value2):
        return compare_array_values(value1, value2)
    # If both values are strings, compare their normalized JSON forms
    if isinstance(value1, str) and isinstance(value2, str):
        normalized_value1 = normalize_json_string(value1)
        normalized_value2 = normalize_json_string(value2)
        return normalized_value1 == normalized_value2
    # Otherwise compare directly
    return value1 == value2
# Default configuration (no sensitive information exposed)
DEFAULT_CONFIG = {
    'pro_config': {
        'cluster_name': '',
        'hosts': [],
        'port': 9042,
        'datacenter': '',
        'username': '',
        'password': '',
        'keyspace': '',
        'table': ''
    },
    'test_config': {
        'cluster_name': '',
        'hosts': [],
        'port': 9042,
        'datacenter': '',
        'username': '',
        'password': '',
        'keyspace': '',
        'table': ''
    },
    'keys': ['docid'],
    'fields_to_compare': [],
    'exclude_fields': []
}
def save_config_group(name, description, pro_config, test_config, query_config):
    """Save a configuration group."""
    if not ensure_database():
        logger.error("Database initialization failed")
        return False
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            INSERT OR REPLACE INTO config_groups
            (name, description, pro_config, test_config, query_config, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            name, description,
            json.dumps(pro_config),
            json.dumps(test_config),
            json.dumps(query_config),
            datetime.now().isoformat()
        ))
        conn.commit()
        logger.info(f"Config group '{name}' saved")
        return True
    except Exception as e:
        logger.error(f"Failed to save config group: {e}")
        return False
    finally:
        conn.close()
def get_config_groups():
    """Return all configuration groups."""
    if not ensure_database():
        logger.error("Database initialization failed")
        return []
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT id, name, description, created_at, updated_at
            FROM config_groups
            ORDER BY updated_at DESC
        ''')
        rows = cursor.fetchall()
        config_groups = []
        for row in rows:
            config_groups.append({
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
                'created_at': row['created_at'],
                'updated_at': row['updated_at']
            })
        return config_groups
    except Exception as e:
        logger.error(f"Failed to fetch config groups: {e}")
        return []
    finally:
        conn.close()
def get_config_group_by_id(group_id):
    """Return the details of a configuration group by ID."""
    if not ensure_database():
        logger.error("Database initialization failed")
        return None
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('''
            SELECT * FROM config_groups WHERE id = ?
        ''', (group_id,))
        row = cursor.fetchone()
        if row:
            return {
                'id': row['id'],
                'name': row['name'],
                'description': row['description'],
                'pro_config': json.loads(row['pro_config']),
                'test_config': json.loads(row['test_config']),
                'query_config': json.loads(row['query_config']),
                'created_at': row['created_at'],
                'updated_at': row['updated_at']
            }
        return None
    except Exception as e:
        logger.error(f"Failed to fetch config group details: {e}")
        return None
    finally:
        conn.close()
def delete_config_group(group_id):
    """Delete a configuration group."""
    if not ensure_database():
        logger.error("Database initialization failed")
        return False
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute('DELETE FROM config_groups WHERE id = ?', (group_id,))
        conn.commit()
        success = cursor.rowcount > 0
        if success:
            logger.info(f"Config group ID {group_id} deleted")
        return success
    except Exception as e:
        logger.error(f"Failed to delete config group: {e}")
        return False
    finally:
        conn.close()
def create_connection(config):
    """Create a Cassandra connection."""
    try:
        auth_provider = PlainTextAuthProvider(username=config['username'], password=config['password'])
        cluster = Cluster(config['hosts'], port=config['port'], auth_provider=auth_provider)
        session = cluster.connect(config['keyspace'])
        return cluster, session
    except Exception as e:
        logger.error(f"Cassandra connection failed: {e}")
        return None, None
def execute_query(session, table, keys, fields, values, exclude_fields=None):
    """Execute the comparison query against one table."""
    try:
        # Build the WHERE clause; key values are quoted and inlined into the CQL text
        quoted_values = [f"'{value}'" for value in values]
        query_conditions = f"{keys[0]} IN ({', '.join(quoted_values)})"
        # Decide which fields to select
        if fields:
            fields_str = ", ".join(fields)
        else:
            fields_str = "*"
        query_sql = f"SELECT {fields_str} FROM {table} WHERE {query_conditions};"
        result = session.execute(query_sql)
        return list(result) if result else []
    except Exception as e:
        logger.error(f"Query execution failed: {e}")
        return []
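# Example of the generated CQL (hypothetical table and key values): with keys=['docid'],
# fields=['docid', 'title'], values=['a1', 'b2'] and table='my_table', the statement is
#   SELECT docid, title FROM my_table WHERE docid IN ('a1', 'b2');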
def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values):
    """Compare the query results from both tables."""
    differences = []
    field_diff_count = {}
    identical_results = []  # Rows that are identical in both tables
    for value in values:
        # Find the rows with this key value in the production and test data
        rows_pro = [row for row in pro_data if getattr(row, keys[0]) == value]
        rows_test = [row for row in test_data if getattr(row, keys[0]) == value]
        for row_pro in rows_pro:
            # Look for a test row with the same primary key
            row_test = next(
                (row for row in rows_test if all(getattr(row, key) == getattr(row_pro, key) for key in keys)),
                None
            )
            if row_test:
                # Decide which columns to compare
                columns = fields_to_compare if fields_to_compare else row_pro._fields
                columns = [col for col in columns if col not in exclude_fields]
                has_difference = False
                row_differences = []
                identical_fields = {}
                for column in columns:
                    value_pro = getattr(row_pro, column)
                    value_test = getattr(row_test, column)
                    # Use the smart comparison function
                    if not compare_values(value_pro, value_test):
                        has_difference = True
                        # Format the values for display
                        formatted_pro_value = format_json_for_display(value_pro)
                        formatted_test_value = format_json_for_display(value_test)
                        row_differences.append({
                            'key': {key: getattr(row_pro, key) for key in keys},
                            'field': column,
                            'pro_value': formatted_pro_value,
                            'test_value': formatted_test_value,
                            'is_json': is_json_field(value_pro) or is_json_field(value_test),
                            'is_array': is_json_array_field(value_pro) or is_json_array_field(value_test)
                        })
                        # Count how often each field differs
                        field_diff_count[column] = field_diff_count.get(column, 0) + 1
                    else:
                        # Record identical field values
                        identical_fields[column] = format_json_for_display(value_pro)
                if has_difference:
                    differences.extend(row_differences)
                else:
                    # No differences: record the row as identical
                    identical_results.append({
                        'key': {key: getattr(row_pro, key) for key in keys},
                        'pro_fields': identical_fields,
                        'test_fields': {col: format_json_for_display(getattr(row_test, col)) for col in columns}
                    })
            else:
                # No matching row in the test table
                differences.append({
                    'key': {key: getattr(row_pro, key) for key in keys},
                    'message': 'Row not found in test table'
                })
        # Check for rows that exist in the test table but not in production
        for row_test in rows_test:
            row_pro = next(
                (row for row in rows_pro if all(getattr(row, key) == getattr(row_test, key) for key in keys)),
                None
            )
            if not row_pro:
                differences.append({
                    'key': {key: getattr(row_test, key) for key in keys},
                    'message': 'Row not found in production table'
                })
    return differences, field_diff_count, identical_results
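# Shape of the returned entries (hypothetical values): a field-level difference looks like
#   {'key': {'docid': 'a1'}, 'field': 'title', 'pro_value': 'Foo', 'test_value': 'Bar',
#    'is_json': False, 'is_array': False}
# while a missing row is reported as
#   {'key': {'docid': 'b2'}, 'message': 'Row not found in test table'}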
def generate_comparison_summary(total_keys, pro_count, test_count, differences, identical_results, field_diff_count):
    """Generate the comparison summary report."""
    # Basic statistics
    different_records = len(set([list(diff['key'].values())[0] for diff in differences if 'field' in diff]))
    identical_records = len(identical_results)
    missing_in_test = len([diff for diff in differences if diff.get('message') == 'Row not found in test table'])
    missing_in_pro = len([diff for diff in differences if diff.get('message') == 'Row not found in production table'])
    # Percentage helper
    def safe_percentage(part, total):
        return round((part / total * 100), 2) if total > 0 else 0
    identical_percentage = safe_percentage(identical_records, total_keys)
    different_percentage = safe_percentage(different_records, total_keys)
    # Build the summary
    summary = {
        'overview': {
            'total_keys_queried': total_keys,
            'pro_records_found': pro_count,
            'test_records_found': test_count,
            'identical_records': identical_records,
            'different_records': different_records,
            'missing_in_test': missing_in_test,
            'missing_in_pro': missing_in_pro
        },
        'percentages': {
            'data_consistency': identical_percentage,
            'data_differences': different_percentage,
            'missing_rate': safe_percentage(missing_in_test + missing_in_pro, total_keys)
        },
        'field_analysis': {
            'total_fields_compared': len(field_diff_count) if field_diff_count else 0,
            'most_different_fields': sorted(field_diff_count.items(), key=lambda x: x[1], reverse=True)[:5] if field_diff_count else []
        },
        'data_quality': {
            'completeness': safe_percentage(pro_count + test_count, total_keys * 2),
            'consistency_score': identical_percentage,
            'quality_level': get_quality_level(identical_percentage)
        },
        'recommendations': generate_recommendations(identical_percentage, missing_in_test, missing_in_pro, field_diff_count)
    }
    return summary
def get_quality_level(consistency_percentage):
    """Map a consistency percentage to a data quality level."""
    if consistency_percentage >= 95:
        return {'level': 'Excellent', 'color': 'success', 'description': 'Data consistency is very high'}
    elif consistency_percentage >= 90:
        return {'level': 'Good', 'color': 'info', 'description': 'Data consistency is high'}
    elif consistency_percentage >= 80:
        return {'level': 'Fair', 'color': 'warning', 'description': 'Data consistency is moderate and needs attention'}
    else:
        return {'level': 'Poor', 'color': 'danger', 'description': 'Data consistency is low and needs urgent attention'}
def generate_recommendations(consistency_percentage, missing_in_test, missing_in_pro, field_diff_count):
    """Generate improvement recommendations."""
    recommendations = []
    if consistency_percentage < 90:
        recommendations.append('Data consistency is a concern; review the data synchronization process')
    if missing_in_test > 0:
        recommendations.append(f'{missing_in_test} records are missing from the test environment; review the data migration process')
    if missing_in_pro > 0:
        recommendations.append(f'{missing_in_pro} records are missing from the production environment; review data completeness')
    if field_diff_count:
        top_diff_field = max(field_diff_count.items(), key=lambda x: x[1])
        recommendations.append(f'Field "{top_diff_field[0]}" differs most often ({top_diff_field[1]} times); prioritize investigating it')
    if not recommendations:
        recommendations.append('Data quality is good; keep the current data management process')
    return recommendations
@app.route('/')
def index():
    return render_template('index.html')
@app.route('/db-compare')
def db_compare():
    return render_template('db_compare.html')
@app.route('/api/query', methods=['POST'])
def query_compare():
    try:
        data = request.json
        logger.info("Starting database comparison query")
        # Parse the configuration
        pro_config = data.get('pro_config', DEFAULT_CONFIG['pro_config'])
        test_config = data.get('test_config', DEFAULT_CONFIG['test_config'])
        keys = data.get('keys', DEFAULT_CONFIG['keys'])
        fields_to_compare = data.get('fields_to_compare', DEFAULT_CONFIG['fields_to_compare'])
        exclude_fields = data.get('exclude_fields', DEFAULT_CONFIG['exclude_fields'])
        values = data.get('values', [])
        if not values:
            logger.warning("Query rejected: no key values provided")
            return jsonify({'error': 'Please provide key values to query'}), 400
        logger.info(f"Query config: {len(values)} key values, production table {pro_config['table']}, test table {test_config['table']}")
        # Create the database connections
        pro_cluster, pro_session = create_connection(pro_config)
        test_cluster, test_session = create_connection(test_config)
        if not pro_session or not test_session:
            logger.error("Database connection failed")
            # Shut down whichever cluster did connect before returning
            if pro_cluster:
                pro_cluster.shutdown()
            if test_cluster:
                test_cluster.shutdown()
            return jsonify({'error': 'Database connection failed, please check the configuration'}), 500
        try:
            # Run the queries
            logger.info("Querying the production environment")
            pro_data = execute_query(pro_session, pro_config['table'], keys, fields_to_compare, values, exclude_fields)
            logger.info("Querying the test environment")
            test_data = execute_query(test_session, test_config['table'], keys, fields_to_compare, values, exclude_fields)
            logger.info(f"Query results: {len(pro_data)} production records, {len(test_data)} test records")
            # Compare the results
            differences, field_diff_count, identical_results = compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values)
            # Statistics
            different_ids = set()
            for diff in differences:
                if 'field' in diff:
                    different_ids.add(list(diff['key'].values())[0])
            non_different_ids = set(values) - different_ids
            # Generate the comparison summary
            summary = generate_comparison_summary(
                len(values), len(pro_data), len(test_data),
                differences, identical_results, field_diff_count
            )
            result = {
                'total_keys': len(values),
                'pro_count': len(pro_data),
                'test_count': len(test_data),
                'differences': differences,
                'identical_results': identical_results,
                'field_diff_count': field_diff_count,
                'different_ids': list(different_ids),
                'non_different_ids': list(non_different_ids),
                'summary': summary,
                'raw_pro_data': [dict(row._asdict()) for row in pro_data] if pro_data else [],
                'raw_test_data': [dict(row._asdict()) for row in test_data] if test_data else []
            }
            logger.info(f"Comparison finished: {len(differences)} differences found")
            return jsonify(result)
        except Exception as e:
            logger.error(f"Query execution failed: {str(e)}")
            return jsonify({'error': f'Query execution failed: {str(e)}'}), 500
        finally:
            # Close the connections
            if pro_cluster:
                pro_cluster.shutdown()
            if test_cluster:
                test_cluster.shutdown()
    except Exception as e:
        logger.error(f"Request handling failed: {str(e)}")
        return jsonify({'error': f'Request handling failed: {str(e)}'}), 500
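# Example request body for POST /api/query (hosts, credentials, keyspace and table names
# are hypothetical placeholders, not values from the original configuration):
#   {
#     "pro_config": {"hosts": ["10.0.0.1"], "port": 9042, "username": "user",
#                    "password": "pass", "keyspace": "ks", "table": "docs"},
#     "test_config": {"hosts": ["10.0.0.2"], "port": 9042, "username": "user",
#                     "password": "pass", "keyspace": "ks", "table": "docs"},
#     "keys": ["docid"],
#     "fields_to_compare": [],
#     "exclude_fields": ["update_time"],
#     "values": ["a1", "b2"]
#   }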
@app.route('/api/default-config')
def get_default_config():
    return jsonify(DEFAULT_CONFIG)
# Configuration group management APIs
@app.route('/api/config-groups', methods=['GET'])
def api_get_config_groups():
    """Return all configuration groups."""
    config_groups = get_config_groups()
    return jsonify({'success': True, 'data': config_groups})
@app.route('/api/config-groups', methods=['POST'])
def api_save_config_group():
    """Save a configuration group."""
    try:
        data = request.json
        name = data.get('name', '').strip()
        description = data.get('description', '').strip()
        pro_config = data.get('pro_config', {})
        test_config = data.get('test_config', {})
        query_config = {
            'keys': data.get('keys', []),
            'fields_to_compare': data.get('fields_to_compare', []),
            'exclude_fields': data.get('exclude_fields', [])
        }
        if not name:
            return jsonify({'success': False, 'error': 'Config group name must not be empty'}), 400
        success = save_config_group(name, description, pro_config, test_config, query_config)
        if success:
            return jsonify({'success': True, 'message': 'Config group saved'})
        else:
            return jsonify({'success': False, 'error': 'Failed to save config group'}), 500
    except Exception as e:
        logger.error(f"Save config group API failed: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/config-groups/<int:group_id>', methods=['GET'])
def api_get_config_group(group_id):
    """Return the details of the given configuration group."""
    config_group = get_config_group_by_id(group_id)
    if config_group:
        return jsonify({'success': True, 'data': config_group})
    else:
        return jsonify({'success': False, 'error': 'Config group not found'}), 404
@app.route('/api/config-groups/<int:group_id>', methods=['DELETE'])
def api_delete_config_group(group_id):
    """Delete a configuration group."""
    success = delete_config_group(group_id)
    if success:
        return jsonify({'success': True, 'message': 'Config group deleted'})
    else:
        return jsonify({'success': False, 'error': 'Failed to delete config group'}), 500
@app.route('/api/init-db', methods=['POST'])
def api_init_database():
    """Manually initialize the database (for testing)."""
    success = init_database()
    if success:
        return jsonify({'success': True, 'message': 'Database initialized'})
    else:
        return jsonify({'success': False, 'error': 'Database initialization failed'}), 500
if __name__ == '__main__':
    app.run(debug=True, port=5001)