Files
BigDataTool/modules/redis_types.py
2025-08-04 21:55:48 +08:00

287 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Redis数据类型支持增强模块
支持string、hash、list、set、zset、json等数据类型的比较
"""
import json
import logging
from redis.exceptions import RedisError
logger = logging.getLogger(__name__)
def get_redis_value_with_type(redis_client, key):
"""
获取Redis键值及其数据类型
Args:
redis_client: Redis客户端
key: Redis键名
Returns:
dict: {
'type': 数据类型,
'value': 值,
'display_value': 用于显示的格式化值,
'exists': 是否存在
}
"""
try:
# 检查key是否存在
if not redis_client.exists(key):
return {
'type': None,
'value': None,
'display_value': None,
'exists': False
}
# 获取数据类型
key_type = redis_client.type(key).decode('utf-8')
result = {
'type': key_type,
'exists': True
}
if key_type == 'string':
# 字符串类型
value = redis_client.get(key)
if value:
try:
# 尝试解码为字符串
str_value = value.decode('utf-8')
result['value'] = str_value
# 尝试解析为JSON
try:
json_value = json.loads(str_value)
result['display_value'] = json.dumps(json_value, indent=2, ensure_ascii=False)
result['type'] = 'json_string' # 标记为JSON字符串
except:
result['display_value'] = str_value
except UnicodeDecodeError:
# 二进制数据
result['value'] = value
result['display_value'] = f"<binary data: {len(value)} bytes>"
else:
result['value'] = ""
result['display_value'] = ""
elif key_type == 'hash':
# Hash类型
hash_data = redis_client.hgetall(key)
decoded_hash = {}
for field, value in hash_data.items():
try:
decoded_field = field.decode('utf-8')
decoded_value = value.decode('utf-8')
decoded_hash[decoded_field] = decoded_value
except UnicodeDecodeError:
decoded_hash[str(field)] = f"<binary: {len(value)} bytes>"
result['value'] = decoded_hash
result['display_value'] = json.dumps(decoded_hash, indent=2, ensure_ascii=False)
elif key_type == 'list':
# List类型
list_data = redis_client.lrange(key, 0, -1)
decoded_list = []
for item in list_data:
try:
decoded_item = item.decode('utf-8')
decoded_list.append(decoded_item)
except UnicodeDecodeError:
decoded_list.append(f"<binary: {len(item)} bytes>")
result['value'] = decoded_list
result['display_value'] = json.dumps(decoded_list, indent=2, ensure_ascii=False)
elif key_type == 'set':
# Set类型
set_data = redis_client.smembers(key)
decoded_set = []
for item in set_data:
try:
decoded_item = item.decode('utf-8')
decoded_set.append(decoded_item)
except UnicodeDecodeError:
decoded_set.append(f"<binary: {len(item)} bytes>")
# 排序以便比较
decoded_set.sort()
result['value'] = decoded_set
result['display_value'] = json.dumps(decoded_set, indent=2, ensure_ascii=False)
elif key_type == 'zset':
# Sorted Set类型
zset_data = redis_client.zrange(key, 0, -1, withscores=True)
decoded_zset = []
for member, score in zset_data:
try:
decoded_member = member.decode('utf-8')
decoded_zset.append([decoded_member, score])
except UnicodeDecodeError:
decoded_zset.append([f"<binary: {len(member)} bytes>", score])
result['value'] = decoded_zset
result['display_value'] = json.dumps(decoded_zset, indent=2, ensure_ascii=False)
else:
# 未知类型
result['value'] = f"<unsupported type: {key_type}>"
result['display_value'] = f"<unsupported type: {key_type}>"
return result
except Exception as e:
logger.error(f"获取Redis键值失败 {key}: {e}")
return {
'type': 'error',
'value': None,
'display_value': f"<error: {str(e)}>",
'exists': False
}
def compare_redis_values(value1_info, value2_info):
"""
比较两个Redis值
Args:
value1_info: 第一个值的信息字典
value2_info: 第二个值的信息字典
Returns:
dict: 比较结果
"""
# 检查存在性
if not value1_info['exists'] and not value2_info['exists']:
return {
'status': 'both_missing',
'message': '两个集群都不存在此键'
}
elif not value1_info['exists']:
return {
'status': 'missing_in_cluster1',
'message': '集群1中不存在此键'
}
elif not value2_info['exists']:
return {
'status': 'missing_in_cluster2',
'message': '集群2中不存在此键'
}
# 检查类型
type1 = value1_info['type']
type2 = value2_info['type']
if type1 != type2:
return {
'status': 'different',
'message': f'数据类型不同: {type1} vs {type2}'
}
# 比较值
value1 = value1_info['value']
value2 = value2_info['value']
if type1 in ['string', 'json_string']:
# 字符串比较
if value1 == value2:
return {'status': 'identical', 'message': '值相同'}
else:
return {'status': 'different', 'message': '值不同'}
elif type1 == 'hash':
# Hash比较
if value1 == value2:
return {'status': 'identical', 'message': '哈希值相同'}
else:
# 详细比较哈希字段
keys1 = set(value1.keys())
keys2 = set(value2.keys())
if keys1 != keys2:
return {'status': 'different', 'message': f'哈希字段不同: {keys1 - keys2} vs {keys2 - keys1}'}
diff_fields = []
for key in keys1:
if value1[key] != value2[key]:
diff_fields.append(key)
if diff_fields:
return {'status': 'different', 'message': f'哈希字段值不同: {diff_fields}'}
else:
return {'status': 'identical', 'message': '哈希值相同'}
elif type1 == 'list':
# List比较顺序敏感
if value1 == value2:
return {'status': 'identical', 'message': '列表相同'}
else:
return {'status': 'different', 'message': f'列表不同,长度: {len(value1)} vs {len(value2)}'}
elif type1 == 'set':
# Set比较顺序无关
if set(value1) == set(value2):
return {'status': 'identical', 'message': '集合相同'}
else:
return {'status': 'different', 'message': f'集合不同,大小: {len(value1)} vs {len(value2)}'}
elif type1 == 'zset':
# Sorted Set比较
if value1 == value2:
return {'status': 'identical', 'message': '有序集合相同'}
else:
return {'status': 'different', 'message': f'有序集合不同,大小: {len(value1)} vs {len(value2)}'}
else:
# 其他类型的通用比较
if value1 == value2:
return {'status': 'identical', 'message': '值相同'}
else:
return {'status': 'different', 'message': '值不同'}
def batch_get_redis_values_with_type(redis_client, keys, cluster_name="Redis集群", performance_tracker=None):
"""
批量获取Redis键值及类型信息
Args:
redis_client: Redis客户端
keys: 键名列表
cluster_name: 集群名称
performance_tracker: 性能追踪器
Returns:
list: 每个键的值信息字典列表
"""
import time
start_time = time.time()
results = []
logger.info(f"开始从{cluster_name}批量获取 {len(keys)} 个键的详细信息")
try:
for key in keys:
key_info = get_redis_value_with_type(redis_client, key)
results.append(key_info)
end_time = time.time()
duration = end_time - start_time
if performance_tracker:
performance_tracker.record_query(f"{cluster_name}_detailed_query", duration)
successful_count = sum(1 for r in results if r['exists'])
logger.info(f"{cluster_name}详细查询完成,成功获取 {successful_count}/{len(keys)} 个值,耗时 {duration:.3f}")
return results
except Exception as e:
logger.error(f"{cluster_name}批量详细查询失败: {e}")
# 返回错误占位符
return [{'type': 'error', 'value': None, 'display_value': f'<error: {e}>', 'exists': False} for _ in keys]