Files
BigDataTool/modules/redis_types.py
2025-08-12 16:27:00 +08:00

460 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Redis数据类型支持增强模块
================================
本模块提供对Redis所有主要数据类型的完整支持包括
- String类型包括JSON字符串的智能检测和格式化
- Hash类型键值对映射
- List类型有序列表
- Set类型无序集合
- ZSet类型有序集合带分数
- Stream类型消息流完整支持消息解析和比较
主要功能:
1. get_redis_value_with_type() - 获取任意类型的Redis键值
2. compare_redis_values() - 智能比较不同数据类型的值
3. batch_get_redis_values_with_type() - 批量获取键值信息
设计特点:
- 类型安全自动检测并处理每种Redis数据类型
- 编码处理完善的UTF-8解码和二进制数据处理
- JSON支持智能识别和格式化JSON字符串
- Stream支持完整的Stream消息结构解析和比较
- 错误处理:优雅处理连接错误和数据异常
作者BigDataTool项目组
更新时间2024年8月
"""
import json
import logging
from redis.exceptions import RedisError
logger = logging.getLogger(__name__)
def _to_text(raw):
    """Return ``raw`` as ``str``.

    ``bytes`` are decoded as UTF-8 (``UnicodeDecodeError`` propagates on
    invalid input), ``str`` is returned unchanged so that clients which
    already decode replies keep working, and anything else goes through
    ``str()``.
    """
    if isinstance(raw, bytes):
        return raw.decode('utf-8')
    return raw if isinstance(raw, str) else str(raw)


def _text_or_placeholder(raw):
    """Decode ``raw`` like ``_to_text``; use a ``<binary: N bytes>`` marker on invalid UTF-8."""
    try:
        return _to_text(raw)
    except UnicodeDecodeError:
        return f"<binary: {len(raw)} bytes>"


def _decode_stream_fields(fields):
    """Normalize the fields of one stream entry into a ``{str: str}`` dict.

    Accepts either a mapping (the shape produced by redis-py's reply
    parsing) or a flat ``[field1, value1, field2, value2, ...]`` sequence.
    A pair whose name or value is not valid UTF-8 is skipped, and a trailing
    unpaired element of a flat sequence is ignored.
    """
    if isinstance(fields, dict):
        pairs = fields.items()
    else:
        pairs = zip(fields[0::2], fields[1::2])
    decoded = {}
    for name, value in pairs:
        try:
            decoded[_to_text(name)] = _to_text(value)
        except UnicodeDecodeError:
            continue
    return decoded


def _decode_stream_entry(entry):
    """Turn an ``(entry_id, fields)`` pair into ``{'id': str, 'fields': dict}``."""
    return {
        'id': _to_text(entry[0]),
        'fields': _decode_stream_fields(entry[1]),
    }


def _read_stream(redis_client, key):
    """Collect stream metadata and messages for ``key`` into a JSON-friendly dict.

    Any exception from the client or from decoding propagates to the caller,
    which is expected to fall back to a cheaper summary.
    """
    stream_info = redis_client.xinfo_stream(key)
    # XRANGE walks the stream in ascending ID order, so COUNT=100 yields the
    # first (oldest) 100 entries, not the most recent ones.
    stream_messages = redis_client.xrange(key, count=100)

    last_id = stream_info.get('last-generated-id')
    info = {
        'length': stream_info.get('length', 0),
        'radix_tree_keys': stream_info.get('radix-tree-keys', 0),
        'radix_tree_nodes': stream_info.get('radix-tree-nodes', 0),
        'last_generated_id': _to_text(last_id) if last_id else '',
        'first_entry': None,
        'last_entry': None,
    }
    # first-entry / last-entry are falsy for an empty stream; leave them None.
    for source_name, target_name in (('first-entry', 'first_entry'),
                                     ('last-entry', 'last_entry')):
        entry = stream_info.get(source_name)
        if entry:
            info[target_name] = _decode_stream_entry(entry)

    return {
        'info': info,
        'messages': [_decode_stream_entry(message) for message in stream_messages],
    }


def get_redis_value_with_type(redis_client, key):
    """Fetch a Redis key's value together with its data type.

    The key's type is detected first, then the matching Redis command is
    used to read the data, which is decoded and formatted for display.
    Replies may be ``bytes`` or already-decoded ``str``.

    Args:
        redis_client: Redis client connection object.
        key (str): Name of the Redis key to look up.

    Returns:
        dict: A dictionary with these fields:
            - 'type' (str | None): 'string', 'json_string' (a string whose
              content parses as JSON), 'hash', 'list', 'set', 'zset',
              'stream', any other type name reported by the server,
              'error' on failure, or None when the key does not exist.
            - 'value': The parsed value as a Python object.
            - 'display_value' (str | None): Formatted string for display.
            - 'exists' (bool): Whether the key exists (False on error too).

    Per-type handling:
        - String: UTF-8 decoded; JSON content is pretty-printed; data that
          is not valid UTF-8 is kept as raw bytes with a placeholder display.
        - Hash: field names and values decoded independently; a binary value
          becomes a ``<binary: N bytes>`` marker under its real field name.
        - List: order preserved.
        - Set: members sorted so results are comparable.
        - ZSet: ``[member, score]`` pairs in rank order.
        - Stream: metadata plus up to the first 100 messages; if that fails,
          only the stream length is reported.

    No exception is raised for ordinary failures: they are logged and
    reported through a result with ``'type': 'error'``.

    Example:
        >>> result = get_redis_value_with_type(client, "user:example")
        >>> print(result['type'])    # 'string'
        >>> print(result['value'])   # 'John Doe'
        >>> print(result['exists'])  # True
    """
    try:
        if not redis_client.exists(key):
            return {
                'type': None,
                'value': None,
                'display_value': None,
                'exists': False
            }

        key_type = _to_text(redis_client.type(key))
        result = {
            'type': key_type,
            'exists': True
        }

        if key_type == 'string':
            raw = redis_client.get(key)
            if raw:
                try:
                    text = _to_text(raw)
                except UnicodeDecodeError:
                    # Not valid UTF-8: keep the raw bytes and show a summary.
                    result['value'] = raw
                    result['display_value'] = f"<binary data: {len(raw)} bytes>"
                else:
                    result['value'] = text
                    try:
                        parsed = json.loads(text)
                        result['display_value'] = json.dumps(parsed, indent=2, ensure_ascii=False)
                        result['type'] = 'json_string'  # flag JSON content
                    except (ValueError, RecursionError):
                        # Not JSON (or too deeply nested): show it verbatim.
                        result['display_value'] = text
            else:
                # Empty string.
                result['value'] = ""
                result['display_value'] = ""

        elif key_type == 'hash':
            decoded_hash = {}
            for field, value in redis_client.hgetall(key).items():
                try:
                    field_name = _to_text(field)
                except UnicodeDecodeError:
                    # repr of the bytes keeps distinct binary fields distinct.
                    field_name = str(field)
                decoded_hash[field_name] = _text_or_placeholder(value)
            result['value'] = decoded_hash
            result['display_value'] = json.dumps(decoded_hash, indent=2, ensure_ascii=False)

        elif key_type == 'list':
            decoded_list = [_text_or_placeholder(item)
                            for item in redis_client.lrange(key, 0, -1)]
            result['value'] = decoded_list
            result['display_value'] = json.dumps(decoded_list, indent=2, ensure_ascii=False)

        elif key_type == 'set':
            # Sorted so that two reads of the same set compare equal.
            decoded_set = sorted(_text_or_placeholder(item)
                                 for item in redis_client.smembers(key))
            result['value'] = decoded_set
            result['display_value'] = json.dumps(decoded_set, indent=2, ensure_ascii=False)

        elif key_type == 'zset':
            decoded_zset = [
                [_text_or_placeholder(member), score]
                for member, score in redis_client.zrange(key, 0, -1, withscores=True)
            ]
            result['value'] = decoded_zset
            result['display_value'] = json.dumps(decoded_zset, indent=2, ensure_ascii=False)

        elif key_type == 'stream':
            try:
                decoded_stream = _read_stream(redis_client, key)
                result['value'] = decoded_stream
                result['display_value'] = json.dumps(decoded_stream, indent=2, ensure_ascii=False)
            except Exception as stream_error:
                logger.warning(f"获取Stream详细信息失败 {key}: {stream_error}")
                # Detailed read failed; report at least the stream length.
                try:
                    stream_length = redis_client.xlen(key)
                    result['value'] = {'length': stream_length, 'messages': []}
                    result['display_value'] = f"Stream (length: {stream_length} messages)"
                except Exception:
                    result['value'] = "Stream data (unable to read details)"
                    result['display_value'] = "Stream data (unable to read details)"

        else:
            # Type this module does not know how to read.
            result['value'] = f"<unsupported type: {key_type}>"
            result['display_value'] = f"<unsupported type: {key_type}>"

        return result
    except Exception as e:
        logger.error(f"获取Redis键值失败 {key}: {e}")
        return {
            'type': 'error',
            'value': None,
            'display_value': f"<error: {str(e)}>",
            'exists': False
        }
def _stream_length(stream_value):
    """Return the length recorded in a stream value dict.

    A fully parsed stream keeps it under ``info['length']``; the reduced
    fallback shape keeps it at the top level as ``'length'``. Defaults to 0.
    """
    info = stream_value.get('info', {})
    return info.get('length', stream_value.get('length', 0))


def _compare_streams(stream1, stream2):
    """Compare two unequal stream values and describe the first difference found."""
    if not (isinstance(stream1, dict) and isinstance(stream2, dict)):
        return {'status': 'different', 'message': 'Stream数据格式不同'}

    # Stream length (checked in both the full and the fallback shape).
    length1 = _stream_length(stream1)
    length2 = _stream_length(stream2)
    if length1 != length2:
        return {
            'status': 'different',
            'message': f'Stream长度不同: {length1} vs {length2}'
        }

    # Last generated ID.
    info1 = stream1.get('info', {})
    info2 = stream2.get('info', {})
    if info1.get('last_generated_id') != info2.get('last_generated_id'):
        return {
            'status': 'different',
            'message': f'Stream最后ID不同: {info1.get("last_generated_id")} vs {info2.get("last_generated_id")}'
        }

    # Captured messages.
    messages1 = stream1.get('messages', [])
    messages2 = stream2.get('messages', [])
    if len(messages1) != len(messages2):
        return {
            'status': 'different',
            'message': f'Stream消息数量不同: {len(messages1)} vs {len(messages2)}'
        }
    for i, (msg1, msg2) in enumerate(zip(messages1, messages2)):
        if msg1.get('id') != msg2.get('id'):
            return {
                'status': 'different',
                'message': f'Stream消息ID不同 (第{i+1}条): {msg1.get("id")} vs {msg2.get("id")}'
            }
        if msg1.get('fields') != msg2.get('fields'):
            return {
                'status': 'different',
                'message': f'Stream消息内容不同 (第{i+1}条消息)'
            }

    # Only fields that are not compared here differ (e.g. radix tree stats
    # or the first/last entry snapshots), so treat the data as the same.
    return {'status': 'identical', 'message': 'Stream数据相同'}


def compare_redis_values(value1_info, value2_info):
    """Compare two Redis value-info dictionaries.

    Args:
        value1_info: Info dict for the first value; must contain 'exists',
            and 'type' and 'value' when 'exists' is true.
        value2_info: Info dict for the second value, same shape.

    Returns:
        dict: ``{'status': ..., 'message': ...}`` where status is one of
        'both_missing', 'missing_in_cluster1', 'missing_in_cluster2',
        'identical' or 'different'. Existence is checked first, then the
        type, then the value using type-specific rules (lists and sorted
        sets are order-sensitive, sets are not).
    """
    # Existence checks.
    exists1 = value1_info['exists']
    exists2 = value2_info['exists']
    if not exists1 and not exists2:
        return {
            'status': 'both_missing',
            'message': '两个集群都不存在此键'
        }
    if not exists1:
        return {
            'status': 'missing_in_cluster1',
            'message': '集群1中不存在此键'
        }
    if not exists2:
        return {
            'status': 'missing_in_cluster2',
            'message': '集群2中不存在此键'
        }

    # Type check.
    type1 = value1_info['type']
    type2 = value2_info['type']
    if type1 != type2:
        return {
            'status': 'different',
            'message': f'数据类型不同: {type1} vs {type2}'
        }

    value1 = value1_info['value']
    value2 = value2_info['value']

    if type1 == 'hash':
        if value1 == value2:
            return {'status': 'identical', 'message': '哈希值相同'}
        keys1 = set(value1.keys())
        keys2 = set(value2.keys())
        if keys1 != keys2:
            return {'status': 'different', 'message': f'哈希字段不同: {keys1 - keys2} vs {keys2 - keys1}'}
        # Same field names but unequal dicts: at least one value differs.
        # Sorted so the message is deterministic.
        diff_fields = sorted(name for name in keys1 if value1[name] != value2[name])
        return {'status': 'different', 'message': f'哈希字段值不同: {diff_fields}'}

    if type1 == 'list':
        # Order-sensitive.
        if value1 == value2:
            return {'status': 'identical', 'message': '列表相同'}
        return {'status': 'different', 'message': f'列表不同,长度: {len(value1)} vs {len(value2)}'}

    if type1 == 'set':
        # Order-insensitive.
        if set(value1) == set(value2):
            return {'status': 'identical', 'message': '集合相同'}
        return {'status': 'different', 'message': f'集合不同,大小: {len(value1)} vs {len(value2)}'}

    if type1 == 'zset':
        if value1 == value2:
            return {'status': 'identical', 'message': '有序集合相同'}
        return {'status': 'different', 'message': f'有序集合不同,大小: {len(value1)} vs {len(value2)}'}

    if type1 == 'stream':
        if value1 == value2:
            return {'status': 'identical', 'message': 'Stream完全相同'}
        return _compare_streams(value1, value2)

    # 'string', 'json_string' and any other type: plain equality.
    if value1 == value2:
        return {'status': 'identical', 'message': '值相同'}
    return {'status': 'different', 'message': '值不同'}
def batch_get_redis_values_with_type(redis_client, keys, cluster_name="Redis集群", performance_tracker=None):
    """Fetch value and type information for a batch of Redis keys.

    Each key is looked up with ``get_redis_value_with_type``, one at a time
    and in the order given.

    Args:
        redis_client: Redis client.
        keys: Sized collection of key names (``len(keys)`` is used).
        cluster_name: Label for the cluster, used in log messages and in the
            name passed to the performance tracker.
        performance_tracker: Optional object with a
            ``record_query(name, duration)`` method. A failure while
            recording is logged and does not affect the returned results.

    Returns:
        list: One value-info dict per key, in input order. If the fetch loop
        itself raises, one ``'type': 'error'`` placeholder per key is
        returned instead.
    """
    import time

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments.
    start_time = time.perf_counter()
    logger.info(f"开始从{cluster_name}批量获取 {len(keys)} 个键的详细信息")

    try:
        results = [get_redis_value_with_type(redis_client, key) for key in keys]
    except Exception as e:
        logger.error(f"{cluster_name}批量详细查询失败: {e}")
        # Error placeholder for every requested key.
        return [{'type': 'error', 'value': None, 'display_value': f'<error: {e}>', 'exists': False} for _ in keys]

    duration = time.perf_counter() - start_time

    if performance_tracker:
        # Metrics are best-effort: never discard fetched data because the
        # tracker failed.
        try:
            performance_tracker.record_query(f"{cluster_name}_detailed_query", duration)
        except Exception as tracker_error:
            logger.warning(f"{cluster_name}性能记录失败: {tracker_error}")

    successful_count = sum(1 for r in results if r['exists'])
    logger.info(f"{cluster_name}详细查询完成,成功获取 {successful_count}/{len(keys)} 个值,耗时 {duration:.3f}")
    return results