Files
BigDataTool/modules/sharding.py
2025-08-05 11:23:49 +08:00

167 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
TWCS分表计算引擎模块
===================
本模块实现基于TWCSTime Window Compaction Strategy策略的时间分表计算功能。
核心功能:
1. 时间戳提取从Key中智能提取时间戳信息
2. 分表索引计算:基于时间窗口计算目标分表索引
3. 分表映射将大批量Key映射到对应的分表
4. 统计分析:提供分表计算的详细统计信息
TWCS分表策略
- 时间窗口可配置的时间间隔默认7天
- 分表数量可配置的分表总数默认14张
- 计算公式timestamp // interval_seconds % table_count
- 表命名base_table_name + "_" + shard_index
时间戳提取算法:
- 优先规则提取Key中最后一个下划线后的数字
- 备用规则提取Key中最长的数字序列
- 容错处理:无法提取时记录到失败列表
- 格式支持支持各种Key格式的时间戳提取
应用场景:
- 大数据表的时间分片:按时间窗口将数据分散到多张表
- 查询性能优化:减少单表数据量,提高查询效率
- 数据生命周期管理:支持按时间窗口的数据清理
- 负载均衡:将查询负载分散到多张表
性能特点:
- 批量计算支持大批量Key的高效分表计算
- 内存友好:使用生成器和迭代器优化内存使用
- 统计完整:提供详细的计算成功率和分布统计
- 错误容错单个Key计算失败不影响整体处理
作者BigDataTool项目组
更新时间2024年8月
"""
import re
import logging
logger = logging.getLogger(__name__)
class ShardingCalculator:
"""
TWCS分表计算器
基于Time Window Compaction Strategy实现的智能分表计算器
用于将时间相关的Key映射到对应的时间窗口分表。
主要特性:
- 时间窗口分片:按配置的时间间隔进行分表
- 智能时间戳提取支持多种Key格式的时间戳解析
- 负载均衡:通过取模运算实现分表间的负载均衡
- 批量处理高效处理大批量Key的分表映射
适用场景:
- 时序数据的分表存储
- 大数据表的性能优化
- 数据生命周期管理
- 查询负载分散
"""
def __init__(self, interval_seconds=604800, table_count=14):
"""
初始化分表计算器
:param interval_seconds: 时间间隔(秒)默认604800(7天)
:param table_count: 分表数量默认14
"""
self.interval_seconds = interval_seconds
self.table_count = table_count
def extract_timestamp_from_key(self, key):
"""
从Key中提取时间戳
新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分
"""
if not key:
return None
key_str = str(key)
# 方法1如果包含下划线尝试提取最后一个下划线后的部分
if '_' in key_str:
parts = key_str.split('_')
last_part = parts[-1]
# 检查最后一部分是否为纯数字
if last_part.isdigit():
timestamp = int(last_part)
logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}")
return timestamp
# 方法2使用正则表达式找到所有数字序列取最后一个较长的
number_sequences = re.findall(r'\d+', key_str)
if not number_sequences:
logger.warning(f"Key '{key}' 中没有找到数字字符")
return None
# 如果有多个数字序列,优先选择最长的,如果长度相同则选择最后一个
longest_sequence = max(number_sequences, key=len)
# 如果最长的有多个,选择最后一个最长的
max_length = len(longest_sequence)
last_longest = None
for seq in number_sequences:
if len(seq) == max_length:
last_longest = seq
try:
timestamp = int(last_longest)
logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)")
return timestamp
except ValueError:
logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}")
return None
def calculate_shard_index(self, timestamp):
"""
计算分表索引
公式timestamp // interval_seconds % table_count
"""
if timestamp is None:
return None
return int(timestamp) // self.interval_seconds % self.table_count
def get_shard_table_name(self, base_table_name, key):
"""
根据Key获取对应的分表名称
"""
timestamp = self.extract_timestamp_from_key(key)
if timestamp is None:
return None
shard_index = self.calculate_shard_index(timestamp)
return f"{base_table_name}_{shard_index}"
def get_all_shard_tables_for_keys(self, base_table_name, keys):
"""
为一批Keys计算所有需要查询的分表
返回: {shard_table_name: [keys_for_this_shard], ...}
"""
shard_mapping = {}
failed_keys = []
calculation_stats = {
'total_keys': len(keys),
'successful_extractions': 0,
'failed_extractions': 0,
'unique_shards': 0
}
for key in keys:
shard_table = self.get_shard_table_name(base_table_name, key)
if shard_table:
if shard_table not in shard_mapping:
shard_mapping[shard_table] = []
shard_mapping[shard_table].append(key)
calculation_stats['successful_extractions'] += 1
else:
failed_keys.append(key)
calculation_stats['failed_extractions'] += 1
calculation_stats['unique_shards'] = len(shard_mapping)
return shard_mapping, failed_keys, calculation_stats