""" TWCS分表计算引擎模块 =================== 本模块实现基于TWCS(Time Window Compaction Strategy)策略的时间分表计算功能。 核心功能: 1. 时间戳提取:从Key中智能提取时间戳信息 2. 分表索引计算:基于时间窗口计算目标分表索引 3. 分表映射:将大批量Key映射到对应的分表 4. 统计分析:提供分表计算的详细统计信息 TWCS分表策略: - 时间窗口:可配置的时间间隔(默认7天) - 分表数量:可配置的分表总数(默认14张) - 计算公式:timestamp // interval_seconds % table_count - 表命名:base_table_name + "_" + shard_index 时间戳提取算法: - 优先规则:提取Key中最后一个下划线后的数字 - 备用规则:提取Key中最长的数字序列 - 容错处理:无法提取时记录到失败列表 - 格式支持:支持各种Key格式的时间戳提取 应用场景: - 大数据表的时间分片:按时间窗口将数据分散到多张表 - 查询性能优化:减少单表数据量,提高查询效率 - 数据生命周期管理:支持按时间窗口的数据清理 - 负载均衡:将查询负载分散到多张表 性能特点: - 批量计算:支持大批量Key的高效分表计算 - 内存友好:使用生成器和迭代器优化内存使用 - 统计完整:提供详细的计算成功率和分布统计 - 错误容错:单个Key计算失败不影响整体处理 作者:BigDataTool项目组 更新时间:2024年8月 """ import re import logging logger = logging.getLogger(__name__) class ShardingCalculator: """ TWCS分表计算器 基于Time Window Compaction Strategy实现的智能分表计算器, 用于将时间相关的Key映射到对应的时间窗口分表。 主要特性: - 时间窗口分片:按配置的时间间隔进行分表 - 智能时间戳提取:支持多种Key格式的时间戳解析 - 负载均衡:通过取模运算实现分表间的负载均衡 - 批量处理:高效处理大批量Key的分表映射 适用场景: - 时序数据的分表存储 - 大数据表的性能优化 - 数据生命周期管理 - 查询负载分散 """ def __init__(self, interval_seconds=604800, table_count=14): """ 初始化分表计算器 :param interval_seconds: 时间间隔(秒),默认604800(7天) :param table_count: 分表数量,默认14 """ self.interval_seconds = interval_seconds self.table_count = table_count def extract_timestamp_from_key(self, key): """ 从Key中提取时间戳 新规则:优先提取最后一个下划线后的数字,如果没有下划线则提取最后连续的数字部分 """ if not key: return None key_str = str(key) # 方法1:如果包含下划线,尝试提取最后一个下划线后的部分 if '_' in key_str: parts = key_str.split('_') last_part = parts[-1] # 检查最后一部分是否为纯数字 if last_part.isdigit(): timestamp = int(last_part) logger.info(f"Key '{key}' 通过下划线分割提取到时间戳: {timestamp}") return timestamp # 方法2:使用正则表达式找到所有数字序列,取最后一个较长的 number_sequences = re.findall(r'\d+', key_str) if not number_sequences: logger.warning(f"Key '{key}' 中没有找到数字字符") return None # 如果有多个数字序列,优先选择最长的,如果长度相同则选择最后一个 longest_sequence = max(number_sequences, key=len) # 如果最长的有多个,选择最后一个最长的 max_length = len(longest_sequence) last_longest = None for seq in number_sequences: if len(seq) == max_length: last_longest = seq try: timestamp = int(last_longest) logger.info(f"Key '{key}' 通过数字序列提取到时间戳: {timestamp} (从序列 {number_sequences} 中选择)") return timestamp except ValueError: logger.error(f"Key '{key}' 时间戳转换失败: {last_longest}") return None def calculate_shard_index(self, timestamp): """ 计算分表索引 公式:timestamp // interval_seconds % table_count """ if timestamp is None: return None return int(timestamp) // self.interval_seconds % self.table_count def get_shard_table_name(self, base_table_name, key): """ 根据Key获取对应的分表名称 """ timestamp = self.extract_timestamp_from_key(key) if timestamp is None: return None shard_index = self.calculate_shard_index(timestamp) return f"{base_table_name}_{shard_index}" def get_all_shard_tables_for_keys(self, base_table_name, keys): """ 为一批Keys计算所有需要查询的分表 返回: {shard_table_name: [keys_for_this_shard], ...} """ shard_mapping = {} failed_keys = [] calculation_stats = { 'total_keys': len(keys), 'successful_extractions': 0, 'failed_extractions': 0, 'unique_shards': 0 } for key in keys: shard_table = self.get_shard_table_name(base_table_name, key) if shard_table: if shard_table not in shard_mapping: shard_mapping[shard_table] = [] shard_mapping[shard_table].append(key) calculation_stats['successful_extractions'] += 1 else: failed_keys.append(key) calculation_stats['failed_extractions'] += 1 calculation_stats['unique_shards'] = len(shard_mapping) return shard_mapping, failed_keys, calculation_stats