Files
BigDataTool/modules/data_comparison.py
YoVinchen fe2803f3da 修复 部分 json 数据不能识别
修复 标签字段比对不完全
2025-08-14 16:32:44 +08:00

447 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据比较引擎模块
================
本模块是BigDataTool的智能数据比较引擎提供高级的数据差异分析功能。
核心功能:
1. 数据集比较:生产环境与测试环境数据的精确比对
2. JSON智能比较支持复杂JSON结构的深度比较
3. 数组顺序无关比较:数组元素的智能匹配算法
4. 复合主键支持:多字段主键的精确匹配
5. 差异分析:详细的字段级差异统计和分析
6. 数据质量评估:自动生成数据一致性报告
比较算法特性:
- JSON标准化自动处理JSON格式差异空格、顺序等
- 数组智能比较:忽略数组元素顺序的深度比较
- 类型容错:自动处理字符串与数字的类型差异
- 编码处理完善的UTF-8和二进制数据处理
- 性能优化:大数据集的高效比较算法
支持的数据类型:
- 基础类型字符串、数字、布尔值、null
- JSON对象嵌套对象的递归比较
- JSON数组元素级别的智能匹配
- 二进制数据:字节级别的精确比较
- 复合主键:多字段组合的精确匹配
输出格式:
- 差异记录:详细的字段级差异信息
- 统计报告:数据一致性的量化分析
- 质量评估:数据质量等级和改进建议
- 性能指标:比较过程的性能统计
作者BigDataTool项目组
更新时间2024年8月
"""
import json
import logging
logger = logging.getLogger(__name__)
def compare_results(pro_data, test_data, keys, fields_to_compare, exclude_fields, values):
"""比较查询结果,支持复合主键"""
differences = []
field_diff_count = {}
identical_results = [] # 存储相同的结果
def match_composite_key(row, composite_value, keys):
"""检查数据行是否匹配复合主键值"""
if len(keys) == 1:
# 单主键匹配
return getattr(row, keys[0]) == composite_value
else:
# 复合主键匹配
if isinstance(composite_value, str) and ',' in composite_value:
key_values = [v.strip() for v in composite_value.split(',')]
if len(key_values) == len(keys):
return all(str(getattr(row, key)) == key_val for key, key_val in zip(keys, key_values))
# 如果不是复合值,只匹配第一个主键
return getattr(row, keys[0]) == composite_value
for value in values:
# 查找生产表和测试表中该主键值的相关数据
rows_pro = [row for row in pro_data if match_composite_key(row, value, keys)]
rows_test = [row for row in test_data if match_composite_key(row, value, keys)]
for row_pro in rows_pro:
# 在测试表中查找相同主键的行
row_test = next(
(row for row in rows_test if all(getattr(row, key) == getattr(row_pro, key) for key in keys)),
None
)
if row_test:
# 确定要比较的列
columns = fields_to_compare if fields_to_compare else row_pro._fields
columns = [col for col in columns if col not in exclude_fields]
has_difference = False
row_differences = []
identical_fields = {}
for column in columns:
value_pro = getattr(row_pro, column)
value_test = getattr(row_test, column)
# 使用智能比较函数,传递字段名用于标签字段判断
if not compare_values(value_pro, value_test, column):
has_difference = True
# 格式化显示值
formatted_pro_value = format_json_for_display(value_pro)
formatted_test_value = format_json_for_display(value_test)
row_differences.append({
'key': {key: getattr(row_pro, key) for key in keys},
'field': column,
'pro_value': formatted_pro_value,
'test_value': formatted_test_value,
'is_json': is_json_field(value_pro) or is_json_field(value_test),
'is_array': is_json_array_field(value_pro) or is_json_array_field(value_test),
'is_tag': is_tag_field(column, value_pro) or is_tag_field(column, value_test)
})
# 统计字段差异次数
field_diff_count[column] = field_diff_count.get(column, 0) + 1
else:
# 存储相同的字段值
identical_fields[column] = format_json_for_display(value_pro)
if has_difference:
differences.extend(row_differences)
else:
# 如果没有差异,存储到相同结果中
identical_results.append({
'key': {key: getattr(row_pro, key) for key in keys},
'pro_fields': identical_fields,
'test_fields': {col: format_json_for_display(getattr(row_test, col)) for col in columns}
})
else:
# 在测试表中未找到对应行
differences.append({
'key': {key: getattr(row_pro, key) for key in keys},
'message': '在测试表中未找到该行'
})
# 检查测试表中是否有生产表中不存在的行
for row_test in rows_test:
row_pro = next(
(row for row in rows_pro if all(getattr(row, key) == getattr(row_test, key) for key in keys)),
None
)
if not row_pro:
differences.append({
'key': {key: getattr(row_test, key) for key in keys},
'message': '在生产表中未找到该行'
})
return differences, field_diff_count, identical_results
def normalize_json_string(value):
"""标准化JSON字符串用于比较"""
if not isinstance(value, str):
return value
try:
# 尝试解析JSON
json_obj = json.loads(value)
# 如果是数组,需要进行特殊处理
if isinstance(json_obj, list):
# 尝试对数组元素进行标准化排序
normalized_array = normalize_json_array(json_obj)
return json.dumps(normalized_array, sort_keys=True, separators=(',', ':'))
else:
# 普通对象,直接序列化
return json.dumps(json_obj, sort_keys=True, separators=(',', ':'))
except (json.JSONDecodeError, TypeError):
# 如果不是JSON返回原值
return value
def normalize_json_array(json_array):
"""标准化JSON数组处理元素顺序问题"""
try:
normalized_elements = []
for element in json_array:
if isinstance(element, dict):
# 对字典元素进行标准化
normalized_elements.append(json.dumps(element, sort_keys=True, separators=(',', ':')))
elif isinstance(element, str):
# 如果是字符串尝试解析为JSON
try:
parsed_element = json.loads(element)
normalized_elements.append(json.dumps(parsed_element, sort_keys=True, separators=(',', ':')))
except:
normalized_elements.append(element)
else:
normalized_elements.append(element)
# 对标准化后的元素进行排序,确保顺序一致
normalized_elements.sort()
# 重新解析为对象数组
result_array = []
for element in normalized_elements:
if isinstance(element, str):
try:
result_array.append(json.loads(element))
except:
result_array.append(element)
else:
result_array.append(element)
return result_array
except Exception as e:
logger.warning(f"数组标准化失败: {e}")
return json_array
def is_json_array_field(value):
"""检查字段是否为JSON数组格式"""
if not isinstance(value, (str, list)):
return False
try:
if isinstance(value, str):
parsed = json.loads(value)
return isinstance(parsed, list)
elif isinstance(value, list):
# 检查是否为JSON字符串数组
if len(value) > 0 and isinstance(value[0], str):
try:
json.loads(value[0])
return True
except:
return False
return True
except:
return False
def compare_array_values(value1, value2):
"""专门用于比较数组类型的值"""
try:
# 处理字符串表示的数组
if isinstance(value1, str) and isinstance(value2, str):
try:
array1 = json.loads(value1)
array2 = json.loads(value2)
if isinstance(array1, list) and isinstance(array2, list):
return compare_json_arrays(array1, array2)
except:
pass
# 处理Python列表类型
elif isinstance(value1, list) and isinstance(value2, list):
return compare_json_arrays(value1, value2)
# 处理混合情况:一个是字符串数组,一个是列表
elif isinstance(value1, list) and isinstance(value2, str):
try:
array2 = json.loads(value2)
if isinstance(array2, list):
return compare_json_arrays(value1, array2)
except:
pass
elif isinstance(value1, str) and isinstance(value2, list):
try:
array1 = json.loads(value1)
if isinstance(array1, list):
return compare_json_arrays(array1, value2)
except:
pass
return False
except Exception as e:
logger.warning(f"数组比较失败: {e}")
return False
def compare_json_arrays(array1, array2):
"""比较两个JSON数组忽略元素顺序"""
try:
if len(array1) != len(array2):
return False
# 标准化两个数组
normalized_array1 = normalize_json_array(array1.copy())
normalized_array2 = normalize_json_array(array2.copy())
# 将标准化后的数组转换为可比较的格式
comparable1 = json.dumps(normalized_array1, sort_keys=True)
comparable2 = json.dumps(normalized_array2, sort_keys=True)
return comparable1 == comparable2
except Exception as e:
logger.warning(f"JSON数组比较失败: {e}")
return False
def format_json_for_display(value):
"""格式化JSON用于显示"""
# 处理None值
if value is None:
return "null"
# 处理非字符串类型
if not isinstance(value, str):
return str(value)
try:
# 尝试解析JSON
json_obj = json.loads(value)
# 格式化显示(带缩进)
return json.dumps(json_obj, sort_keys=True, indent=2, ensure_ascii=False)
except (json.JSONDecodeError, TypeError):
# 如果不是JSON返回原值
return str(value)
def is_json_field(value):
"""检查字段是否为JSON格式"""
if not isinstance(value, str):
return False
try:
json.loads(value)
return True
except (json.JSONDecodeError, TypeError):
return False
def is_tag_field(field_name, value):
"""判断是否为标签类字段(空格分隔的标签列表)
标签字段特征:
1. 字段名包含 'tag' 关键字
2. 值是字符串类型
3. 包含空格分隔的多个元素
"""
if not isinstance(value, str):
return False
# 检查字段名是否包含tag
if field_name and 'tag' in field_name.lower():
# 检查是否包含空格分隔的多个元素
elements = value.strip().split()
if len(elements) > 1:
return True
return False
def compare_tag_values(value1, value2):
"""比较标签类字段的值(忽略顺序)
将空格分隔的标签字符串拆分成集合进行比较
"""
if not isinstance(value1, str) or not isinstance(value2, str):
return value1 == value2
# 将标签字符串拆分成集合
tags1 = set(value1.strip().split())
tags2 = set(value2.strip().split())
# 比较集合是否相等(忽略顺序)
return tags1 == tags2
def compare_values(value1, value2, field_name=None):
"""智能比较两个值支持JSON标准化、数组比较和标签比较
Args:
value1: 第一个值
value2: 第二个值
field_name: 字段名(可选,用于判断是否为标签字段)
"""
# 检查是否为标签字段
if field_name and (is_tag_field(field_name, value1) or is_tag_field(field_name, value2)):
return compare_tag_values(value1, value2)
# 检查是否为数组类型
if is_json_array_field(value1) or is_json_array_field(value2):
return compare_array_values(value1, value2)
# 如果两个值都是字符串尝试JSON标准化比较
if isinstance(value1, str) and isinstance(value2, str):
normalized_value1 = normalize_json_string(value1)
normalized_value2 = normalize_json_string(value2)
return normalized_value1 == normalized_value2
# 其他情况直接比较
return value1 == value2
def generate_comparison_summary(total_keys, pro_count, test_count, differences, identical_results, field_diff_count):
"""生成比较总结报告"""
# 计算基本统计
different_records = len(set([list(diff['key'].values())[0] for diff in differences if 'field' in diff]))
identical_records = len(identical_results)
missing_in_test = len([diff for diff in differences if diff.get('message') == '在测试表中未找到该行'])
missing_in_pro = len([diff for diff in differences if diff.get('message') == '在生产表中未找到该行'])
# 计算百分比
def safe_percentage(part, total):
return round((part / total * 100), 2) if total > 0 else 0
identical_percentage = safe_percentage(identical_records, total_keys)
different_percentage = safe_percentage(different_records, total_keys)
# 生成总结
summary = {
'overview': {
'total_keys_queried': total_keys,
'pro_records_found': pro_count,
'test_records_found': test_count,
'identical_records': identical_records,
'different_records': different_records,
'missing_in_test': missing_in_test,
'missing_in_pro': missing_in_pro
},
'percentages': {
'data_consistency': identical_percentage,
'data_differences': different_percentage,
'missing_rate': safe_percentage(missing_in_test + missing_in_pro, total_keys)
},
'field_analysis': {
'total_fields_compared': len(field_diff_count) if field_diff_count else 0,
'most_different_fields': sorted(field_diff_count.items(), key=lambda x: x[1], reverse=True)[:5] if field_diff_count else []
},
'data_quality': {
'completeness': safe_percentage(pro_count + test_count, total_keys * 2),
'consistency_score': identical_percentage,
'quality_level': get_quality_level(identical_percentage)
},
'recommendations': generate_recommendations(identical_percentage, missing_in_test, missing_in_pro, field_diff_count)
}
return summary
def get_quality_level(consistency_percentage):
"""根据一致性百分比获取数据质量等级"""
if consistency_percentage >= 95:
return {'level': '优秀', 'color': 'success', 'description': '数据一致性非常高'}
elif consistency_percentage >= 90:
return {'level': '良好', 'color': 'info', 'description': '数据一致性较高'}
elif consistency_percentage >= 80:
return {'level': '一般', 'color': 'warning', 'description': '数据一致性中等,需要关注'}
else:
return {'level': '较差', 'color': 'danger', 'description': '数据一致性较低,需要重点处理'}
def generate_recommendations(consistency_percentage, missing_in_test, missing_in_pro, field_diff_count):
"""生成改进建议"""
recommendations = []
if consistency_percentage < 90:
recommendations.append('建议重点关注数据一致性问题,检查数据同步机制')
if missing_in_test > 0:
recommendations.append(f'测试环境缺失 {missing_in_test} 条记录,建议检查数据迁移过程')
if missing_in_pro > 0:
recommendations.append(f'生产环境缺失 {missing_in_pro} 条记录,建议检查数据完整性')
if field_diff_count:
top_diff_field = max(field_diff_count.items(), key=lambda x: x[1])
recommendations.append(f'字段 "{top_diff_field[0]}" 差异最多({top_diff_field[1]}次),建议优先处理')
if not recommendations:
recommendations.append('数据质量良好,建议继续保持当前的数据管理流程')
return recommendations