460 lines
18 KiB
Python
460 lines
18 KiB
Python
"""
|
||
Redis数据类型支持增强模块
|
||
================================
|
||
|
||
本模块提供对Redis所有主要数据类型的完整支持,包括:
|
||
- String类型(包括JSON字符串的智能检测和格式化)
|
||
- Hash类型(键值对映射)
|
||
- List类型(有序列表)
|
||
- Set类型(无序集合)
|
||
- ZSet类型(有序集合,带分数)
|
||
- Stream类型(消息流,完整支持消息解析和比较)
|
||
|
||
主要功能:
|
||
1. get_redis_value_with_type() - 获取任意类型的Redis键值
|
||
2. compare_redis_values() - 智能比较不同数据类型的值
|
||
3. batch_get_redis_values_with_type() - 批量获取键值信息
|
||
|
||
设计特点:
|
||
- 类型安全:自动检测并处理每种Redis数据类型
|
||
- 编码处理:完善的UTF-8解码和二进制数据处理
|
||
- JSON支持:智能识别和格式化JSON字符串
|
||
- Stream支持:完整的Stream消息结构解析和比较
|
||
- 错误处理:优雅处理连接错误和数据异常
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
from redis.exceptions import RedisError
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def _decode_text(raw):
    """Return *raw* as a ``str``.

    ``bytes``/``bytearray`` are decoded as UTF-8 (raising ``UnicodeDecodeError``
    for non-UTF-8 data); any other object -- e.g. an already-decoded ``str``
    from a client created with ``decode_responses=True`` -- goes through ``str()``.
    """
    if isinstance(raw, (bytes, bytearray)):
        return bytes(raw).decode('utf-8')
    return str(raw)


def _decode_or_placeholder(raw):
    """Decode *raw* to text, or return ``<binary: N bytes>`` if it is not UTF-8."""
    try:
        return _decode_text(raw)
    except UnicodeDecodeError:
        return f"<binary: {len(raw)} bytes>"


def _store_json_view(result, value):
    """Store *value* in *result* together with its pretty-printed JSON rendering."""
    result['value'] = value
    result['display_value'] = json.dumps(value, indent=2, ensure_ascii=False)


def _read_string(redis_client, key, result):
    """Fill *result* for a STRING key, flagging JSON objects/arrays as 'json_string'."""
    raw = redis_client.get(key)
    if not raw:
        # Empty string (or the key disappeared after the EXISTS check).
        result['value'] = ""
        result['display_value'] = ""
        return

    try:
        text = _decode_text(raw)
    except UnicodeDecodeError:
        # Not UTF-8: keep the raw bytes and only show their size.
        result['value'] = raw
        result['display_value'] = f"<binary data: {len(raw)} bytes>"
        return

    result['value'] = text
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None

    # Only structured documents count as JSON: scalars such as "123", "true"
    # or "null" also parse as JSON but are ordinary strings.
    if isinstance(parsed, (dict, list)):
        result['display_value'] = json.dumps(parsed, indent=2, ensure_ascii=False)
        result['type'] = 'json_string'
    else:
        result['display_value'] = text


def _read_hash(redis_client, key, result):
    """Fill *result* for a HASH key as a ``{field: value}`` dict of strings."""
    decoded_hash = {}
    for field, value in redis_client.hgetall(key).items():
        try:
            name = _decode_text(field)
        except UnicodeDecodeError:
            # Binary field name: fall back to its repr so it stays a unique key.
            name = str(field)
        decoded_hash[name] = _decode_or_placeholder(value)
    _store_json_view(result, decoded_hash)


def _read_list(redis_client, key, result):
    """Fill *result* for a LIST key, keeping the list order."""
    items = redis_client.lrange(key, 0, -1)
    _store_json_view(result, [_decode_or_placeholder(item) for item in items])


def _read_set(redis_client, key, result):
    """Fill *result* for a SET key; members are sorted so sets compare deterministically."""
    members = redis_client.smembers(key)
    _store_json_view(result, sorted(_decode_or_placeholder(item) for item in members))


def _read_zset(redis_client, key, result):
    """Fill *result* for a ZSET key as ``[member, score]`` pairs in ZRANGE order."""
    pairs = redis_client.zrange(key, 0, -1, withscores=True)
    _store_json_view(
        result,
        [[_decode_or_placeholder(member), score] for member, score in pairs],
    )


def _decode_stream_fields(fields):
    """Decode a stream entry's field/value pairs into a ``{str: str}`` dict.

    redis-py returns entry fields as a dict; a raw reply is a flat
    ``[field1, value1, field2, value2, ...]`` list. Both shapes are accepted.
    Incomplete or non-UTF-8 pairs are skipped.
    """
    if isinstance(fields, dict):
        pairs = fields.items()
    else:
        flat = list(fields or [])
        pairs = zip(flat[0::2], flat[1::2])  # zip drops a dangling field name
    decoded = {}
    for name, value in pairs:
        try:
            decoded[_decode_text(name)] = _decode_text(value)
        except UnicodeDecodeError:
            continue
    return decoded


def _decode_stream_entry(entry):
    """Decode an ``(id, fields)`` stream entry into ``{'id': str, 'fields': dict}``."""
    return {'id': _decode_text(entry[0]), 'fields': _decode_stream_fields(entry[1])}


def _read_stream(redis_client, key, result):
    """Fill *result* for a STREAM key with XINFO metadata and up to 100 entries.

    If the detailed read fails, only the XLEN length is reported.
    """
    try:
        stream_info = redis_client.xinfo_stream(key)
        # XRANGE without bounds starts at '-', so these are the first (oldest)
        # 100 entries of the stream.
        stream_messages = redis_client.xrange(key, count=100)

        last_id = stream_info.get('last-generated-id')
        decoded_stream = {
            'info': {
                'length': stream_info.get('length', 0),
                'radix_tree_keys': stream_info.get('radix-tree-keys', 0),
                'radix_tree_nodes': stream_info.get('radix-tree-nodes', 0),
                'last_generated_id': _decode_text(last_id) if last_id else '',
                'first_entry': None,
                'last_entry': None,
            },
            'messages': [],
        }

        for info_key, out_key in (('first-entry', 'first_entry'), ('last-entry', 'last_entry')):
            entry = stream_info.get(info_key)
            # Empty streams report no entry (None, or an (None, None) pair).
            if entry and entry[0] is not None:
                decoded_stream['info'][out_key] = _decode_stream_entry(entry)

        decoded_stream['messages'] = [
            _decode_stream_entry(message)
            for message in stream_messages
            if message and message[0] is not None
        ]

        _store_json_view(result, decoded_stream)

    except Exception as stream_error:
        logger.warning(f"获取Stream详细信息失败 {key}: {stream_error}")
        # Detailed read failed: report at least the stream length.
        try:
            stream_length = redis_client.xlen(key)
            result['value'] = {'length': stream_length, 'messages': []}
            result['display_value'] = f"Stream (length: {stream_length} messages)"
        except Exception:
            result['value'] = "Stream data (unable to read details)"
            result['display_value'] = "Stream data (unable to read details)"


# Maps the name returned by Redis TYPE to the reader that fills 'value'/'display_value'.
_TYPE_READERS = {
    'string': _read_string,
    'hash': _read_hash,
    'list': _read_list,
    'set': _read_set,
    'zset': _read_zset,
    'stream': _read_stream,
}


def get_redis_value_with_type(redis_client, key):
    """Fetch a Redis key together with its data type and a display rendering.

    The key's type is read with TYPE and the value is fetched with the
    matching command (GET, HGETALL, LRANGE, SMEMBERS, ZRANGE ... WITHSCORES,
    XINFO STREAM + XRANGE), then decoded from UTF-8. Replies that are already
    ``str`` (client created with ``decode_responses=True``) are accepted too.

    Args:
        redis_client: Redis client connection object.
        key (str): Name of the key to query.

    Returns:
        dict with the keys:
            - 'type' (str | None): the Redis type ('string', 'hash', 'list',
              'set', 'zset', 'stream'), 'json_string' for a STRING holding a
              JSON object/array, 'error' on failure, None if the key is missing.
            - 'value': decoded Python value (str, dict, list, or raw bytes for
              non-UTF-8 strings).
            - 'display_value' (str | None): text rendering for display.
            - 'exists' (bool): whether the key exists. This is also False when
              an error occurred.

    Per-type handling:
        - string: JSON objects/arrays are pretty-printed; non-UTF-8 data is
          kept as bytes and shown as ``<binary data: N bytes>``.
        - hash: ``{field: value}`` dict.
        - list: items in list order.
        - set: members sorted so two sets compare deterministically.
        - zset: ``[member, score]`` pairs in ZRANGE order.
        - stream: XINFO metadata plus the first (oldest) 100 entries; if that
          fails, only the XLEN length is reported.
        - anything else: an ``<unsupported type: ...>`` placeholder.

    Non-UTF-8 hash values, list items and set/zset members are replaced by
    ``<binary: N bytes>``. Any exception is logged and reported as an
    'error' result instead of being raised.

    Example::

        result = get_redis_value_with_type(client, "user:example")
        result['type']    # 'string'
        result['exists']  # True
    """
    try:
        if not redis_client.exists(key):
            return {
                'type': None,
                'value': None,
                'display_value': None,
                'exists': False
            }

        key_type = _decode_text(redis_client.type(key))
        result = {
            'type': key_type,
            'exists': True
        }

        reader = _TYPE_READERS.get(key_type)
        if reader is None:
            result['value'] = f"<unsupported type: {key_type}>"
            result['display_value'] = f"<unsupported type: {key_type}>"
        else:
            reader(redis_client, key, result)

        return result

    except Exception as e:
        logger.error(f"获取Redis键值失败 {key}: {e}")
        return {
            'type': 'error',
            'value': None,
            'display_value': f"<error: {str(e)}>",
            'exists': False
        }
|
||
|
||
def _compare_streams(value1, value2):
    """Compare two decoded stream values (length, last ID, then entry by entry)."""
    if value1 == value2:
        return {'status': 'identical', 'message': 'Stream完全相同'}

    if not (isinstance(value1, dict) and isinstance(value2, dict)):
        return {'status': 'different', 'message': 'Stream数据格式不同'}

    info1 = value1.get('info', {})
    info2 = value2.get('info', {})

    # A stream read via the XLEN fallback has no 'info' section and keeps its
    # length at the top level; without this, two such streams of different
    # lengths would be reported as identical.
    length1 = info1.get('length', value1.get('length', 0))
    length2 = info2.get('length', value2.get('length', 0))
    if length1 != length2:
        return {
            'status': 'different',
            'message': f'Stream长度不同: {length1} vs {length2}'
        }

    if info1.get('last_generated_id') != info2.get('last_generated_id'):
        return {
            'status': 'different',
            'message': f'Stream最后ID不同: {info1.get("last_generated_id")} vs {info2.get("last_generated_id")}'
        }

    messages1 = value1.get('messages', [])
    messages2 = value2.get('messages', [])
    if len(messages1) != len(messages2):
        return {
            'status': 'different',
            'message': f'Stream消息数量不同: {len(messages1)} vs {len(messages2)}'
        }

    for i, (msg1, msg2) in enumerate(zip(messages1, messages2)):
        if msg1.get('id') != msg2.get('id'):
            return {
                'status': 'different',
                'message': f'Stream消息ID不同 (第{i+1}条): {msg1.get("id")} vs {msg2.get("id")}'
            }
        if msg1.get('fields') != msg2.get('fields'):
            return {
                'status': 'different',
                'message': f'Stream消息内容不同 (第{i+1}条消息)'
            }

    return {'status': 'identical', 'message': 'Stream数据相同'}


def compare_redis_values(value1_info, value2_info):
    """Compare two Redis value-info dicts (as built by get_redis_value_with_type).

    Args:
        value1_info: value info for the key in cluster 1 (needs the
            'exists', 'type' and 'value' entries).
        value2_info: value info for the key in cluster 2.

    Returns:
        dict: ``{'status': ..., 'message': ...}`` where status is one of
        'both_missing', 'missing_in_cluster1', 'missing_in_cluster2',
        'identical' or 'different'.
    """
    exists1 = value1_info['exists']
    exists2 = value2_info['exists']
    if not exists1 and not exists2:
        return {'status': 'both_missing', 'message': '两个集群都不存在此键'}
    if not exists1:
        return {'status': 'missing_in_cluster1', 'message': '集群1中不存在此键'}
    if not exists2:
        return {'status': 'missing_in_cluster2', 'message': '集群2中不存在此键'}

    type1 = value1_info['type']
    type2 = value2_info['type']
    value1 = value1_info['value']
    value2 = value2_info['value']

    # 'json_string' only refines how a Redis STRING is displayed; a JSON
    # string vs a plain string is a value difference, not a type difference.
    string_types = ('string', 'json_string')
    if type1 in string_types and type2 in string_types:
        if value1 == value2:
            return {'status': 'identical', 'message': '值相同'}
        return {'status': 'different', 'message': '值不同'}

    if type1 != type2:
        return {'status': 'different', 'message': f'数据类型不同: {type1} vs {type2}'}

    if type1 == 'hash':
        if value1 == value2:
            return {'status': 'identical', 'message': '哈希值相同'}
        keys1 = set(value1)
        keys2 = set(value2)
        if keys1 != keys2:
            return {'status': 'different', 'message': f'哈希字段不同: {keys1 - keys2} vs {keys2 - keys1}'}
        # Sorted so the reported field list is stable across runs.
        diff_fields = sorted(field for field in keys1 if value1[field] != value2[field])
        if diff_fields:
            return {'status': 'different', 'message': f'哈希字段值不同: {diff_fields}'}
        return {'status': 'identical', 'message': '哈希值相同'}

    if type1 == 'list':
        # Order-sensitive comparison.
        if value1 == value2:
            return {'status': 'identical', 'message': '列表相同'}
        return {'status': 'different', 'message': f'列表不同,长度: {len(value1)} vs {len(value2)}'}

    if type1 == 'set':
        # Order-insensitive comparison.
        if set(value1) == set(value2):
            return {'status': 'identical', 'message': '集合相同'}
        return {'status': 'different', 'message': f'集合不同,大小: {len(value1)} vs {len(value2)}'}

    if type1 == 'zset':
        if value1 == value2:
            return {'status': 'identical', 'message': '有序集合相同'}
        return {'status': 'different', 'message': f'有序集合不同,大小: {len(value1)} vs {len(value2)}'}

    if type1 == 'stream':
        return _compare_streams(value1, value2)

    # Any other type: plain equality.
    if value1 == value2:
        return {'status': 'identical', 'message': '值相同'}
    return {'status': 'different', 'message': '值不同'}
|
||
|
||
def batch_get_redis_values_with_type(redis_client, keys, cluster_name="Redis集群", performance_tracker=None):
    """Fetch value and type information for several Redis keys.

    Each key is read with get_redis_value_with_type, in the order given.

    Args:
        redis_client: Redis client.
        keys: iterable of key names (a list, tuple, generator, ...).
        cluster_name: cluster name used in log messages and the tracker label.
        performance_tracker: optional object exposing
            ``record_query(name, duration)``; failures while recording are
            logged and do not affect the returned results.

    Returns:
        list: one value-info dict per key, in input order. If the fetch loop
        itself raises, every key gets an 'error' placeholder instead.
    """
    import time

    # Materialize once so generators work with len() and with the error
    # fallback below, which iterates the keys a second time.
    keys = list(keys)
    start_time = time.perf_counter()  # monotonic, unlike time.time()
    results = []

    logger.info(f"开始从{cluster_name}批量获取 {len(keys)} 个键的详细信息")

    try:
        for key in keys:
            results.append(get_redis_value_with_type(redis_client, key))
    except Exception as e:
        logger.error(f"从{cluster_name}批量详细查询失败: {e}")
        # Return error placeholders, one per requested key.
        return [{'type': 'error', 'value': None, 'display_value': f'<error: {e}>', 'exists': False} for _ in keys]

    duration = time.perf_counter() - start_time

    if performance_tracker:
        try:
            performance_tracker.record_query(f"{cluster_name}_detailed_query", duration)
        except Exception as tracker_error:
            # Metrics are best-effort: never discard successfully fetched
            # results because recording the timing failed.
            logger.warning(f"记录{cluster_name}查询耗时失败: {tracker_error}")

    successful_count = sum(1 for r in results if r['exists'])
    logger.info(f"从{cluster_name}详细查询完成,成功获取 {successful_count}/{len(keys)} 个值,耗时 {duration:.3f} 秒")

    return results