167 lines
6.0 KiB
Python
167 lines
6.0 KiB
Python
"""
|
||
TWCS分表计算引擎模块
|
||
===================
|
||
|
||
本模块实现基于TWCS(Time Window Compaction Strategy)策略的时间分表计算功能。
|
||
|
||
核心功能:
|
||
1. 时间戳提取:从Key中智能提取时间戳信息
|
||
2. 分表索引计算:基于时间窗口计算目标分表索引
|
||
3. 分表映射:将大批量Key映射到对应的分表
|
||
4. 统计分析:提供分表计算的详细统计信息
|
||
|
||
TWCS分表策略:
|
||
- 时间窗口:可配置的时间间隔(默认7天)
|
||
- 分表数量:可配置的分表总数(默认14张)
|
||
- 计算公式:timestamp // interval_seconds % table_count
|
||
- 表命名:base_table_name + "_" + shard_index
|
||
|
||
时间戳提取算法:
|
||
- 优先规则:提取Key中最后一个下划线后的数字
|
||
- 备用规则:提取Key中最长的数字序列
|
||
- 容错处理:无法提取时记录到失败列表
|
||
- 格式支持:支持各种Key格式的时间戳提取
|
||
|
||
应用场景:
|
||
- 大数据表的时间分片:按时间窗口将数据分散到多张表
|
||
- 查询性能优化:减少单表数据量,提高查询效率
|
||
- 数据生命周期管理:支持按时间窗口的数据清理
|
||
- 负载均衡:将查询负载分散到多张表
|
||
|
||
性能特点:
|
||
- 批量计算:支持大批量Key的高效分表计算
|
||
- 内存友好:使用生成器和迭代器优化内存使用
|
||
- 统计完整:提供详细的计算成功率和分布统计
|
||
- 错误容错:单个Key计算失败不影响整体处理
|
||
|
||
作者:BigDataTool项目组
|
||
更新时间:2024年8月
|
||
"""
|
||
|
||
import re
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class ShardingCalculator:
|
||
"""
|
||
TWCS分表计算器
|
||
|
||
基于Time Window Compaction Strategy实现的智能分表计算器,
|
||
用于将时间相关的Key映射到对应的时间窗口分表。
|
||
|
||
主要特性:
|
||
- 时间窗口分片:按配置的时间间隔进行分表
|
||
- 智能时间戳提取:支持多种Key格式的时间戳解析
|
||
- 负载均衡:通过取模运算实现分表间的负载均衡
|
||
- 批量处理:高效处理大批量Key的分表映射
|
||
|
||
适用场景:
|
||
- 时序数据的分表存储
|
||
- 大数据表的性能优化
|
||
- 数据生命周期管理
|
||
- 查询负载分散
|
||
"""
|
||
|
||
def __init__(self, interval_seconds=604800, table_count=14):
|
||
"""
|
||
初始化分表计算器
|
||
:param interval_seconds: 时间间隔(秒),默认604800(7天)
|
||
:param table_count: 分表数量,默认14
|
||
"""
|
||
self.interval_seconds = interval_seconds
|
||
self.table_count = table_count
|
||
|
||
def extract_timestamp_from_key(self, key):
|
||
"""
|
||
从Key中提取时间戳
|
||
新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分
|
||
"""
|
||
if not key:
|
||
return None
|
||
|
||
key_str = str(key)
|
||
|
||
# 方法1:如果包含下划线,尝试提取最后一个下划线后的部分
|
||
if '_' in key_str:
|
||
parts = key_str.split('_')
|
||
last_part = parts[-1]
|
||
# 检查最后一部分是否为纯数字
|
||
if last_part.isdigit():
|
||
timestamp = int(last_part)
|
||
logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}")
|
||
return timestamp
|
||
|
||
# 方法2:使用正则表达式找到所有数字序列,取最后一个较长的
|
||
number_sequences = re.findall(r'\d+', key_str)
|
||
|
||
if not number_sequences:
|
||
logger.warning(f"Key '{key}' 中没有找到数字字符")
|
||
return None
|
||
|
||
# 如果有多个数字序列,优先选择最长的,如果长度相同则选择最后一个
|
||
longest_sequence = max(number_sequences, key=len)
|
||
|
||
# 如果最长的有多个,选择最后一个最长的
|
||
max_length = len(longest_sequence)
|
||
last_longest = None
|
||
for seq in number_sequences:
|
||
if len(seq) == max_length:
|
||
last_longest = seq
|
||
|
||
try:
|
||
timestamp = int(last_longest)
|
||
logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)")
|
||
return timestamp
|
||
except ValueError:
|
||
logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}")
|
||
return None
|
||
|
||
def calculate_shard_index(self, timestamp):
|
||
"""
|
||
计算分表索引
|
||
公式:timestamp // interval_seconds % table_count
|
||
"""
|
||
if timestamp is None:
|
||
return None
|
||
return int(timestamp) // self.interval_seconds % self.table_count
|
||
|
||
def get_shard_table_name(self, base_table_name, key):
|
||
"""
|
||
根据Key获取对应的分表名称
|
||
"""
|
||
timestamp = self.extract_timestamp_from_key(key)
|
||
if timestamp is None:
|
||
return None
|
||
|
||
shard_index = self.calculate_shard_index(timestamp)
|
||
return f"{base_table_name}_{shard_index}"
|
||
|
||
def get_all_shard_tables_for_keys(self, base_table_name, keys):
|
||
"""
|
||
为一批Keys计算所有需要查询的分表
|
||
返回: {shard_table_name: [keys_for_this_shard], ...}
|
||
"""
|
||
shard_mapping = {}
|
||
failed_keys = []
|
||
calculation_stats = {
|
||
'total_keys': len(keys),
|
||
'successful_extractions': 0,
|
||
'failed_extractions': 0,
|
||
'unique_shards': 0
|
||
}
|
||
|
||
for key in keys:
|
||
shard_table = self.get_shard_table_name(base_table_name, key)
|
||
if shard_table:
|
||
if shard_table not in shard_mapping:
|
||
shard_mapping[shard_table] = []
|
||
shard_mapping[shard_table].append(key)
|
||
calculation_stats['successful_extractions'] += 1
|
||
else:
|
||
failed_keys.append(key)
|
||
calculation_stats['failed_extractions'] += 1
|
||
|
||
calculation_stats['unique_shards'] = len(shard_mapping)
|
||
|
||
return shard_mapping, failed_keys, calculation_stats |