修复reids
This commit is contained in:
287
modules/redis_types.py
Normal file
287
modules/redis_types.py
Normal file
@@ -0,0 +1,287 @@
|
||||
"""
|
||||
Redis数据类型支持增强模块
|
||||
支持string、hash、list、set、zset、json等数据类型的比较
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_redis_value_with_type(redis_client, key):
|
||||
"""
|
||||
获取Redis键值及其数据类型
|
||||
|
||||
Args:
|
||||
redis_client: Redis客户端
|
||||
key: Redis键名
|
||||
|
||||
Returns:
|
||||
dict: {
|
||||
'type': 数据类型,
|
||||
'value': 值,
|
||||
'display_value': 用于显示的格式化值,
|
||||
'exists': 是否存在
|
||||
}
|
||||
"""
|
||||
try:
|
||||
# 检查key是否存在
|
||||
if not redis_client.exists(key):
|
||||
return {
|
||||
'type': None,
|
||||
'value': None,
|
||||
'display_value': None,
|
||||
'exists': False
|
||||
}
|
||||
|
||||
# 获取数据类型
|
||||
key_type = redis_client.type(key).decode('utf-8')
|
||||
result = {
|
||||
'type': key_type,
|
||||
'exists': True
|
||||
}
|
||||
|
||||
if key_type == 'string':
|
||||
# 字符串类型
|
||||
value = redis_client.get(key)
|
||||
if value:
|
||||
try:
|
||||
# 尝试解码为字符串
|
||||
str_value = value.decode('utf-8')
|
||||
result['value'] = str_value
|
||||
|
||||
# 尝试解析为JSON
|
||||
try:
|
||||
json_value = json.loads(str_value)
|
||||
result['display_value'] = json.dumps(json_value, indent=2, ensure_ascii=False)
|
||||
result['type'] = 'json_string' # 标记为JSON字符串
|
||||
except:
|
||||
result['display_value'] = str_value
|
||||
|
||||
except UnicodeDecodeError:
|
||||
# 二进制数据
|
||||
result['value'] = value
|
||||
result['display_value'] = f"<binary data: {len(value)} bytes>"
|
||||
else:
|
||||
result['value'] = ""
|
||||
result['display_value'] = ""
|
||||
|
||||
elif key_type == 'hash':
|
||||
# Hash类型
|
||||
hash_data = redis_client.hgetall(key)
|
||||
decoded_hash = {}
|
||||
|
||||
for field, value in hash_data.items():
|
||||
try:
|
||||
decoded_field = field.decode('utf-8')
|
||||
decoded_value = value.decode('utf-8')
|
||||
decoded_hash[decoded_field] = decoded_value
|
||||
except UnicodeDecodeError:
|
||||
decoded_hash[str(field)] = f"<binary: {len(value)} bytes>"
|
||||
|
||||
result['value'] = decoded_hash
|
||||
result['display_value'] = json.dumps(decoded_hash, indent=2, ensure_ascii=False)
|
||||
|
||||
elif key_type == 'list':
|
||||
# List类型
|
||||
list_data = redis_client.lrange(key, 0, -1)
|
||||
decoded_list = []
|
||||
|
||||
for item in list_data:
|
||||
try:
|
||||
decoded_item = item.decode('utf-8')
|
||||
decoded_list.append(decoded_item)
|
||||
except UnicodeDecodeError:
|
||||
decoded_list.append(f"<binary: {len(item)} bytes>")
|
||||
|
||||
result['value'] = decoded_list
|
||||
result['display_value'] = json.dumps(decoded_list, indent=2, ensure_ascii=False)
|
||||
|
||||
elif key_type == 'set':
|
||||
# Set类型
|
||||
set_data = redis_client.smembers(key)
|
||||
decoded_set = []
|
||||
|
||||
for item in set_data:
|
||||
try:
|
||||
decoded_item = item.decode('utf-8')
|
||||
decoded_set.append(decoded_item)
|
||||
except UnicodeDecodeError:
|
||||
decoded_set.append(f"<binary: {len(item)} bytes>")
|
||||
|
||||
# 排序以便比较
|
||||
decoded_set.sort()
|
||||
result['value'] = decoded_set
|
||||
result['display_value'] = json.dumps(decoded_set, indent=2, ensure_ascii=False)
|
||||
|
||||
elif key_type == 'zset':
|
||||
# Sorted Set类型
|
||||
zset_data = redis_client.zrange(key, 0, -1, withscores=True)
|
||||
decoded_zset = []
|
||||
|
||||
for member, score in zset_data:
|
||||
try:
|
||||
decoded_member = member.decode('utf-8')
|
||||
decoded_zset.append([decoded_member, score])
|
||||
except UnicodeDecodeError:
|
||||
decoded_zset.append([f"<binary: {len(member)} bytes>", score])
|
||||
|
||||
result['value'] = decoded_zset
|
||||
result['display_value'] = json.dumps(decoded_zset, indent=2, ensure_ascii=False)
|
||||
|
||||
else:
|
||||
# 未知类型
|
||||
result['value'] = f"<unsupported type: {key_type}>"
|
||||
result['display_value'] = f"<unsupported type: {key_type}>"
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取Redis键值失败 {key}: {e}")
|
||||
return {
|
||||
'type': 'error',
|
||||
'value': None,
|
||||
'display_value': f"<error: {str(e)}>",
|
||||
'exists': False
|
||||
}
|
||||
|
||||
def compare_redis_values(value1_info, value2_info):
|
||||
"""
|
||||
比较两个Redis值
|
||||
|
||||
Args:
|
||||
value1_info: 第一个值的信息字典
|
||||
value2_info: 第二个值的信息字典
|
||||
|
||||
Returns:
|
||||
dict: 比较结果
|
||||
"""
|
||||
# 检查存在性
|
||||
if not value1_info['exists'] and not value2_info['exists']:
|
||||
return {
|
||||
'status': 'both_missing',
|
||||
'message': '两个集群都不存在此键'
|
||||
}
|
||||
elif not value1_info['exists']:
|
||||
return {
|
||||
'status': 'missing_in_cluster1',
|
||||
'message': '集群1中不存在此键'
|
||||
}
|
||||
elif not value2_info['exists']:
|
||||
return {
|
||||
'status': 'missing_in_cluster2',
|
||||
'message': '集群2中不存在此键'
|
||||
}
|
||||
|
||||
# 检查类型
|
||||
type1 = value1_info['type']
|
||||
type2 = value2_info['type']
|
||||
|
||||
if type1 != type2:
|
||||
return {
|
||||
'status': 'different',
|
||||
'message': f'数据类型不同: {type1} vs {type2}'
|
||||
}
|
||||
|
||||
# 比较值
|
||||
value1 = value1_info['value']
|
||||
value2 = value2_info['value']
|
||||
|
||||
if type1 in ['string', 'json_string']:
|
||||
# 字符串比较
|
||||
if value1 == value2:
|
||||
return {'status': 'identical', 'message': '值相同'}
|
||||
else:
|
||||
return {'status': 'different', 'message': '值不同'}
|
||||
|
||||
elif type1 == 'hash':
|
||||
# Hash比较
|
||||
if value1 == value2:
|
||||
return {'status': 'identical', 'message': '哈希值相同'}
|
||||
else:
|
||||
# 详细比较哈希字段
|
||||
keys1 = set(value1.keys())
|
||||
keys2 = set(value2.keys())
|
||||
|
||||
if keys1 != keys2:
|
||||
return {'status': 'different', 'message': f'哈希字段不同: {keys1 - keys2} vs {keys2 - keys1}'}
|
||||
|
||||
diff_fields = []
|
||||
for key in keys1:
|
||||
if value1[key] != value2[key]:
|
||||
diff_fields.append(key)
|
||||
|
||||
if diff_fields:
|
||||
return {'status': 'different', 'message': f'哈希字段值不同: {diff_fields}'}
|
||||
else:
|
||||
return {'status': 'identical', 'message': '哈希值相同'}
|
||||
|
||||
elif type1 == 'list':
|
||||
# List比较(顺序敏感)
|
||||
if value1 == value2:
|
||||
return {'status': 'identical', 'message': '列表相同'}
|
||||
else:
|
||||
return {'status': 'different', 'message': f'列表不同,长度: {len(value1)} vs {len(value2)}'}
|
||||
|
||||
elif type1 == 'set':
|
||||
# Set比较(顺序无关)
|
||||
if set(value1) == set(value2):
|
||||
return {'status': 'identical', 'message': '集合相同'}
|
||||
else:
|
||||
return {'status': 'different', 'message': f'集合不同,大小: {len(value1)} vs {len(value2)}'}
|
||||
|
||||
elif type1 == 'zset':
|
||||
# Sorted Set比较
|
||||
if value1 == value2:
|
||||
return {'status': 'identical', 'message': '有序集合相同'}
|
||||
else:
|
||||
return {'status': 'different', 'message': f'有序集合不同,大小: {len(value1)} vs {len(value2)}'}
|
||||
|
||||
else:
|
||||
# 其他类型的通用比较
|
||||
if value1 == value2:
|
||||
return {'status': 'identical', 'message': '值相同'}
|
||||
else:
|
||||
return {'status': 'different', 'message': '值不同'}
|
||||
|
||||
def batch_get_redis_values_with_type(redis_client, keys, cluster_name="Redis集群", performance_tracker=None):
|
||||
"""
|
||||
批量获取Redis键值及类型信息
|
||||
|
||||
Args:
|
||||
redis_client: Redis客户端
|
||||
keys: 键名列表
|
||||
cluster_name: 集群名称
|
||||
performance_tracker: 性能追踪器
|
||||
|
||||
Returns:
|
||||
list: 每个键的值信息字典列表
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.time()
|
||||
results = []
|
||||
|
||||
logger.info(f"开始从{cluster_name}批量获取 {len(keys)} 个键的详细信息")
|
||||
|
||||
try:
|
||||
for key in keys:
|
||||
key_info = get_redis_value_with_type(redis_client, key)
|
||||
results.append(key_info)
|
||||
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
|
||||
if performance_tracker:
|
||||
performance_tracker.record_query(f"{cluster_name}_detailed_query", duration)
|
||||
|
||||
successful_count = sum(1 for r in results if r['exists'])
|
||||
logger.info(f"从{cluster_name}详细查询完成,成功获取 {successful_count}/{len(keys)} 个值,耗时 {duration:.3f} 秒")
|
||||
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"从{cluster_name}批量详细查询失败: {e}")
|
||||
# 返回错误占位符
|
||||
return [{'type': 'error', 'value': None, 'display_value': f'<error: {e}>', 'exists': False} for _ in keys]
|
Reference in New Issue
Block a user