447 lines
18 KiB
Python
447 lines
18 KiB
Python
"""
|
||
数据比较引擎模块
|
||
================
|
||
|
||
本模块是BigDataTool的智能数据比较引擎,提供高级的数据差异分析功能。
|
||
|
||
核心功能:
|
||
1. 数据集比较:生产环境与测试环境数据的精确比对
|
||
2. JSON智能比较:支持复杂JSON结构的深度比较
|
||
3. 数组顺序无关比较:数组元素的智能匹配算法
|
||
4. 复合主键支持:多字段主键的精确匹配
|
||
5. 差异分析:详细的字段级差异统计和分析
|
||
6. 数据质量评估:自动生成数据一致性报告
|
||
|
||
比较算法特性:
|
||
- JSON标准化:自动处理JSON格式差异(空格、顺序等)
|
||
- 数组智能比较:忽略数组元素顺序的深度比较
|
||
- 类型容错:自动处理字符串与数字的类型差异
|
||
- 编码处理:完善的UTF-8和二进制数据处理
|
||
- 性能优化:大数据集的高效比较算法
|
||
|
||
支持的数据类型:
|
||
- 基础类型:字符串、数字、布尔值、null
|
||
- JSON对象:嵌套对象的递归比较
|
||
- JSON数组:元素级别的智能匹配
|
||
- 二进制数据:字节级别的精确比较
|
||
- 复合主键:多字段组合的精确匹配
|
||
|
||
输出格式:
|
||
- 差异记录:详细的字段级差异信息
|
||
- 统计报告:数据一致性的量化分析
|
||
- 质量评估:数据质量等级和改进建议
|
||
- 性能指标:比较过程的性能统计
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values):
    """Compare query results between production and test data, supporting composite primary keys.

    Args:
        pro_data: iterable of production rows (namedtuple-like; fields are read
            via getattr and enumerated via ._fields — TODO confirm row type).
        test_data: iterable of test rows with the same shape.
        keys: list of primary-key field names (one name = single key,
            several names = composite key).
        fields_to_compare: explicit list of fields to compare; falsy means
            "all fields of the production row".
        exclude_fields: field names skipped during comparison.
        values: primary-key values to look up; for a composite key a value may
            be a comma-separated string whose parts align with `keys`.

    Returns:
        Tuple of (differences, field_diff_count, identical_results):
        - differences: list of dicts — field-level diffs (with 'field',
          'pro_value', 'test_value', type flags) or missing-row entries
          (with a 'message').
        - field_diff_count: dict mapping field name -> number of rows where
          that field differed.
        - identical_results: list of dicts describing fully identical rows.
    """
    differences = []
    field_diff_count = {}
    identical_results = []  # rows whose compared fields were all equal

    def match_composite_key(row, composite_value, keys):
        """Return True if `row` matches the given primary-key value."""
        if len(keys) == 1:
            # Single primary key: direct comparison (note: no str coercion here,
            # unlike the composite branch below).
            return getattr(row, keys[0]) == composite_value
        else:
            # Composite key: expect a comma-separated string of key parts.
            if isinstance(composite_value, str) and ',' in composite_value:
                key_values = [v.strip() for v in composite_value.split(',')]
                if len(key_values) == len(keys):
                    # Parts come from a string, so compare str-coerced values.
                    return all(str(getattr(row, key)) == key_val for key, key_val in zip(keys, key_values))
            # Fallback: not a composite value — match on the first key only.
            return getattr(row, keys[0]) == composite_value

    for value in values:
        # Collect every row matching this key value on both sides.
        rows_pro = [row for row in pro_data if match_composite_key(row, value, keys)]
        rows_test = [row for row in test_data if match_composite_key(row, value, keys)]

        for row_pro in rows_pro:
            # Find the test row whose full key matches this production row.
            row_test = next(
                (row for row in rows_test if all(getattr(row, key) == getattr(row_pro, key) for key in keys)),
                None
            )

            if row_test:
                # Determine which columns to compare.
                columns = fields_to_compare if fields_to_compare else row_pro._fields
                columns = [col for col in columns if col not in exclude_fields]

                has_difference = False
                row_differences = []
                identical_fields = {}

                for column in columns:
                    value_pro = getattr(row_pro, column)
                    value_test = getattr(row_test, column)

                    # Smart comparison; the field name enables tag-field handling.
                    if not compare_values(value_pro, value_test, column):
                        has_difference = True
                        # Pretty-print values (JSON gets indented) for the report.
                        formatted_pro_value = format_json_for_display(value_pro)
                        formatted_test_value = format_json_for_display(value_test)

                        row_differences.append({
                            'key': {key: getattr(row_pro, key) for key in keys},
                            'field': column,
                            'pro_value': formatted_pro_value,
                            'test_value': formatted_test_value,
                            'is_json': is_json_field(value_pro) or is_json_field(value_test),
                            'is_array': is_json_array_field(value_pro) or is_json_array_field(value_test),
                            'is_tag': is_tag_field(column, value_pro) or is_tag_field(column, value_test)
                        })

                        # Count how many rows differ per field.
                        field_diff_count[column] = field_diff_count.get(column, 0) + 1
                    else:
                        # Remember equal values for the "identical rows" report.
                        identical_fields[column] = format_json_for_display(value_pro)

                if has_difference:
                    differences.extend(row_differences)
                else:
                    # Whole row matched: record it as identical.
                    identical_results.append({
                        'key': {key: getattr(row_pro, key) for key in keys},
                        'pro_fields': identical_fields,
                        'test_fields': {col: format_json_for_display(getattr(row_test, col)) for col in columns}
                    })
            else:
                # No matching row on the test side.
                differences.append({
                    'key': {key: getattr(row_pro, key) for key in keys},
                    'message': '在测试表中未找到该行'
                })

        # Rows present only on the test side.
        for row_test in rows_test:
            row_pro = next(
                (row for row in rows_pro if all(getattr(row, key) == getattr(row_test, key) for key in keys)),
                None
            )
            if not row_pro:
                differences.append({
                    'key': {key: getattr(row_test, key) for key in keys},
                    'message': '在生产表中未找到该行'
                })

    return differences, field_diff_count, identical_results
|
||
|
||
def normalize_json_string(value):
    """Return a canonical JSON representation of *value* for comparison.

    Non-string inputs and strings that do not parse as JSON are returned
    unchanged. Parsed arrays are order-normalized first; all parsed values
    are serialized with sorted keys and compact separators so formatting
    differences (spacing, key order) disappear.
    """
    if not isinstance(value, str):
        return value

    try:
        parsed = json.loads(value)
    except (json.JSONDecodeError, TypeError):
        # Not JSON — compare as a plain string.
        return value

    if isinstance(parsed, list):
        # Arrays are compared order-insensitively, so canonicalize element order.
        parsed = normalize_json_array(parsed)
    return json.dumps(parsed, sort_keys=True, separators=(',', ':'))
|
||
|
||
def normalize_json_array(json_array):
    """Normalize a JSON array so element order does not affect comparison.

    Each element is canonicalized (dicts serialized with sorted keys and
    compact separators; string elements containing JSON re-serialized
    canonically), the canonical forms are sorted, and the elements are then
    parsed back into objects.

    Args:
        json_array: list of JSON-compatible elements.

    Returns:
        A new list with elements in canonical order, or the input unchanged
        when normalization fails (e.g. unorderable mixed element types).
    """
    try:
        normalized_elements = []
        for element in json_array:
            if isinstance(element, dict):
                # Canonical string form makes dicts sortable and comparable.
                normalized_elements.append(json.dumps(element, sort_keys=True, separators=(',', ':')))
            elif isinstance(element, str):
                # A string element may itself hold JSON; canonicalize it if so.
                # (Narrowed from a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit.)
                try:
                    parsed_element = json.loads(element)
                    normalized_elements.append(json.dumps(parsed_element, sort_keys=True, separators=(',', ':')))
                except (json.JSONDecodeError, TypeError):
                    normalized_elements.append(element)
            else:
                normalized_elements.append(element)

        # Sort canonical forms so element order becomes deterministic.
        normalized_elements.sort()

        # Convert canonical strings back into objects.
        result_array = []
        for element in normalized_elements:
            if isinstance(element, str):
                try:
                    result_array.append(json.loads(element))
                except (json.JSONDecodeError, TypeError):
                    result_array.append(element)
            else:
                result_array.append(element)

        return result_array

    except Exception as e:
        # Mixed element types (e.g. int vs str) make sort() raise TypeError;
        # fall back to the original order rather than failing the comparison.
        logger.warning(f"数组标准化失败: {e}")
        return json_array
|
||
|
||
def is_json_array_field(value):
    """Return True when *value* looks like a JSON array.

    Accepts either a string that parses to a JSON list, or a Python list.
    For a non-empty Python list whose first element is a string, that first
    element must itself parse as JSON (the list is then treated as an array
    of JSON strings). Everything else returns False.
    """
    if not isinstance(value, (str, list)):
        return False

    if isinstance(value, str):
        # Narrowed from a bare `except:`/blanket outer try that also
        # swallowed KeyboardInterrupt/SystemExit.
        try:
            return isinstance(json.loads(value), list)
        except (json.JSONDecodeError, TypeError):
            return False

    # value is a Python list: if it holds strings, require the first one to
    # be valid JSON; only the first element is sampled.
    if value and isinstance(value[0], str):
        try:
            json.loads(value[0])
            return True
        except (json.JSONDecodeError, TypeError):
            return False
    return True
|
||
|
||
def compare_array_values(value1, value2):
    """Compare two array-typed values, ignoring element order.

    Each value may be a Python list or a string containing a JSON array;
    strings are parsed first. Returns False when either value cannot be
    interpreted as an array.

    Args:
        value1: first value (list or JSON-array string).
        value2: second value (list or JSON-array string).

    Returns:
        True when both values represent arrays with the same elements.
    """
    def _as_array(value):
        # Normalize either representation to a list, or None if impossible.
        # (Replaces four duplicated parse branches with bare `except:` clauses.)
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except (json.JSONDecodeError, TypeError):
                return None
            if isinstance(parsed, list):
                return parsed
        return None

    try:
        array1 = _as_array(value1)
        array2 = _as_array(value2)
        if array1 is None or array2 is None:
            return False
        return compare_json_arrays(array1, array2)
    except Exception as e:
        # Defensive: any unexpected failure counts as "not equal".
        logger.warning(f"数组比较失败: {e}")
        return False
|
||
|
||
def compare_json_arrays(array1, array2):
    """Return True when two JSON arrays hold the same elements, order ignored."""
    try:
        # Different lengths can never match — bail out early.
        if len(array1) != len(array2):
            return False

        # Canonicalize element order in both arrays (copies keep inputs intact)...
        left = normalize_json_array(array1.copy())
        right = normalize_json_array(array2.copy())

        # ...then compare their canonical JSON serializations.
        return json.dumps(left, sort_keys=True) == json.dumps(right, sort_keys=True)

    except Exception as e:
        logger.warning(f"JSON数组比较失败: {e}")
        return False
|
||
|
||
def format_json_for_display(value):
    """Render *value* as a human-readable string for diff reports.

    None becomes "null"; strings containing JSON are pretty-printed with
    sorted keys and 2-space indentation; everything else is stringified.
    """
    if value is None:
        return "null"

    if not isinstance(value, str):
        return str(value)

    try:
        parsed = json.loads(value)
    except (json.JSONDecodeError, TypeError):
        # Plain (non-JSON) string: show it unchanged.
        return str(value)

    # Pretty-print with stable key order; keep non-ASCII characters readable.
    return json.dumps(parsed, sort_keys=True, indent=2, ensure_ascii=False)
|
||
|
||
def is_json_field(value):
    """Return True when *value* is a string containing valid JSON."""
    if not isinstance(value, str):
        return False

    try:
        json.loads(value)
    except (json.JSONDecodeError, TypeError):
        return False
    return True
|
||
|
||
def is_tag_field(field_name, value):
    """Detect tag-style fields (whitespace-separated tag lists).

    A field counts as a tag field when all of the following hold:
    1. the field name contains 'tag' (case-insensitive);
    2. the value is a string;
    3. the value splits into more than one whitespace-separated token.
    """
    if not isinstance(value, str):
        return False

    # Short-circuit keeps None/empty field names from reaching .lower().
    name_says_tag = bool(field_name) and 'tag' in field_name.lower()

    # str.split() with no argument already ignores surrounding whitespace.
    return name_says_tag and len(value.split()) > 1
|
||
|
||
def compare_tag_values(value1, value2):
    """Compare tag strings as unordered sets of whitespace-separated tokens.

    Non-string inputs fall back to plain equality. Duplicate tags collapse,
    so 'a a b' equals 'a b'.
    """
    if isinstance(value1, str) and isinstance(value2, str):
        # Set comparison ignores both order and repetition.
        return set(value1.split()) == set(value2.split())

    return value1 == value2
|
||
|
||
def compare_values(value1, value2, field_name=None):
    """Smart equality check with tag, array, and JSON-string awareness.

    Args:
        value1: first value.
        value2: second value.
        field_name: optional field name; enables tag-field detection.

    Returns:
        True when the two values are considered equal.
    """
    # Tag fields compare as unordered token sets.
    if field_name and (is_tag_field(field_name, value1) or is_tag_field(field_name, value2)):
        return compare_tag_values(value1, value2)

    # Array-like values compare order-insensitively.
    if is_json_array_field(value1) or is_json_array_field(value2):
        return compare_array_values(value1, value2)

    # Two strings: canonicalize possible JSON before comparing.
    if isinstance(value1, str) and isinstance(value2, str):
        return normalize_json_string(value1) == normalize_json_string(value2)

    # Anything else: plain equality.
    return value1 == value2
|
||
|
||
def generate_comparison_summary(total_keys, pro_count, test_count, differences, identical_results, field_diff_count):
    """Generate an aggregate summary report for a comparison run.

    Args:
        total_keys: number of primary-key values queried.
        pro_count: number of records found in production.
        test_count: number of records found in test.
        differences: diff entries from compare_results(); field-level diffs
            carry a 'field' key, missing-row entries carry a 'message' key.
        identical_results: identical-row entries from compare_results().
        field_diff_count: mapping of field name -> number of differing rows.

    Returns:
        dict with 'overview', 'percentages', 'field_analysis',
        'data_quality' and 'recommendations' sections.
    """
    # Count distinct differing records by their FULL key tuple. (The previous
    # implementation used only the first key value, which under-counted
    # records that share the first component of a composite primary key.)
    different_records = len({tuple(diff['key'].values()) for diff in differences if 'field' in diff})
    identical_records = len(identical_results)
    # Missing-row entries are distinguished by their exact message text
    # emitted by compare_results().
    missing_in_test = len([diff for diff in differences if diff.get('message') == '在测试表中未找到该行'])
    missing_in_pro = len([diff for diff in differences if diff.get('message') == '在生产表中未找到该行'])

    def safe_percentage(part, total):
        # Avoid ZeroDivisionError when nothing was queried.
        return round((part / total * 100), 2) if total > 0 else 0

    identical_percentage = safe_percentage(identical_records, total_keys)
    different_percentage = safe_percentage(different_records, total_keys)

    summary = {
        'overview': {
            'total_keys_queried': total_keys,
            'pro_records_found': pro_count,
            'test_records_found': test_count,
            'identical_records': identical_records,
            'different_records': different_records,
            'missing_in_test': missing_in_test,
            'missing_in_pro': missing_in_pro
        },
        'percentages': {
            'data_consistency': identical_percentage,
            'data_differences': different_percentage,
            'missing_rate': safe_percentage(missing_in_test + missing_in_pro, total_keys)
        },
        'field_analysis': {
            'total_fields_compared': len(field_diff_count) if field_diff_count else 0,
            # Top five fields by diff count, most frequent first.
            'most_different_fields': sorted(field_diff_count.items(), key=lambda x: x[1], reverse=True)[:5] if field_diff_count else []
        },
        'data_quality': {
            # Both sides fully populated would yield total_keys * 2 records.
            'completeness': safe_percentage(pro_count + test_count, total_keys * 2),
            'consistency_score': identical_percentage,
            'quality_level': get_quality_level(identical_percentage)
        },
        'recommendations': generate_recommendations(identical_percentage, missing_in_test, missing_in_pro, field_diff_count)
    }

    return summary
|
||
|
||
def get_quality_level(consistency_percentage):
    """Map a consistency percentage onto a quality grade for the report."""
    # (threshold, level, UI color, description) — checked from best to worst.
    grades = [
        (95, '优秀', 'success', '数据一致性非常高'),
        (90, '良好', 'info', '数据一致性较高'),
        (80, '一般', 'warning', '数据一致性中等,需要关注'),
    ]
    for threshold, level, color, description in grades:
        if consistency_percentage >= threshold:
            return {'level': level, 'color': color, 'description': description}
    # Below every threshold: worst grade.
    return {'level': '较差', 'color': 'danger', 'description': '数据一致性较低,需要重点处理'}
|
||
|
||
def generate_recommendations(consistency_percentage, missing_in_test, missing_in_pro, field_diff_count):
    """Produce human-readable improvement suggestions from comparison stats."""
    recommendations = []

    if consistency_percentage < 90:
        recommendations.append('建议重点关注数据一致性问题,检查数据同步机制')

    if missing_in_test > 0:
        recommendations.append(f'测试环境缺失 {missing_in_test} 条记录,建议检查数据迁移过程')

    if missing_in_pro > 0:
        recommendations.append(f'生产环境缺失 {missing_in_pro} 条记录,建议检查数据完整性')

    if field_diff_count:
        # Call out the single field with the most differences.
        field, count = max(field_diff_count.items(), key=lambda item: item[1])
        recommendations.append(f'字段 "{field}" 差异最多({count}次),建议优先处理')

    # Always return at least one line so the report section is never empty.
    return recommendations or ['数据质量良好,建议继续保持当前的数据管理流程']