#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 完整的座位分配系统 包含文件校验、智能分配算法和日志输出功能 支持1-10人连坐需求,能够处理不连续座位 """ import sys import os from pathlib import Path import datetime import pandas as pd import numpy as np def check_dependencies(): """检查并安装必要的依赖包""" required_packages = { 'pandas': 'pandas>=1.3.0', 'numpy': 'numpy>=1.20.0', 'openpyxl': 'openpyxl>=3.0.0' } missing_packages = [] print("检查依赖包...") for package_name, package_spec in required_packages.items(): try: __import__(package_name) print(f"✅ {package_name} 已安装") except ImportError: missing_packages.append(package_spec) print(f"❌ {package_name} 未安装") if missing_packages: print(f"\n发现缺失依赖: {', '.join(missing_packages)}") print("正在尝试自动安装...") try: import subprocess for package in missing_packages: print(f"安装 {package}...") result = subprocess.run([sys.executable, '-m', 'pip', 'install', package], capture_output=True, text=True) if result.returncode == 0: print(f"✅ {package} 安装成功") else: print(f"❌ {package} 安装失败: {result.stderr}") return False except Exception as e: print(f"❌ 自动安装失败: {e}") print("\n请手动安装以下依赖包:") for package in missing_packages: print(f" pip install {package}") input("安装完成后按Enter键继续...") return False return True def auto_detect_files(): """自动检测和识别Excel文件""" print("\n=== 自动文件识别系统 ===") # 扫描当前目录下的所有xlsx文件 current_dir = Path('.') xlsx_files = list(current_dir.glob('*.xlsx')) # 过滤掉输出文件和示例文件 exclude_patterns = ['座位信息_最终分配', '最终座位分配日志', '_示例', '_temp', '_backup'] xlsx_files = [f for f in xlsx_files if not any(pattern in f.name for pattern in exclude_patterns)] print(f"发现 {len(xlsx_files)} 个Excel文件:") for i, file_path in enumerate(xlsx_files, 1): file_size = file_path.stat().st_size / 1024 # KB print(f" {i}. {file_path.name} ({file_size:.1f} KB)") if len(xlsx_files) == 0: print("\n❌ 未找到Excel文件") print("请确保当前目录下有Excel数据文件") return None, None if len(xlsx_files) > 2: print(f"\n⚠️ 发现 {len(xlsx_files)} 个Excel文件,超过2个") print("为避免混淆,请确保目录下只有2个数据文件") print("(程序会自动忽略输出文件和示例文件)") print("\n请手动移除多余文件,或选择要使用的文件:") for i, file_path in enumerate(xlsx_files, 1): print(f" {i}. {file_path.name}") return None, None # 分析文件结构 print(f"\n正在分析文件结构...") file_info = [] for file_path in xlsx_files: try: # 读取文件获取基本信息 df = pd.read_excel(file_path, nrows=5) # 只读前5行进行分析 info = { 'path': file_path, 'name': file_path.name, 'columns': len(df.columns), 'rows': len(df) + 1, # 加上标题行 'column_names': list(df.columns), 'sample_data': df.head(2) # 获取前2行样本数据 } file_info.append(info) print(f"\n📄 {file_path.name}:") print(f" 列数: {info['columns']}") print(f" 行数: {info['rows']}+ (仅检查前5行)") print(f" 列名: {', '.join(str(col) for col in info['column_names'][:6])}{'...' if len(info['column_names']) > 6 else ''}") except Exception as e: print(f"\n❌ 读取文件 {file_path.name} 失败: {e}") return None, None # 根据列数自动识别文件类型 personnel_file = None seat_file = None print(f"\n=== 文件类型识别 ===") for info in file_info: if info['columns'] == 5 or info['columns'] == 6: # 人员信息通常是5-6列 # 进一步检查列名是否符合人员信息格式 col_names = [str(col).lower() for col in info['column_names']] personnel_keywords = ['姓名', 'name', '证件', 'id', '手机', 'phone', '备注', 'remark'] if any(keyword in ' '.join(col_names) for keyword in personnel_keywords): personnel_file = info print(f"✅ {info['name']} -> 识别为人员信息文件 ({info['columns']}列)") else: print(f"⚠️ {info['name']} 有{info['columns']}列,但列名不符合人员信息格式") elif info['columns'] >= 10: # 座位信息通常是10-12列 # 进一步检查列名是否符合座位信息格式 col_names = [str(col).lower() for col in info['column_names']] seat_keywords = ['区域', 'area', '楼层', 'floor', '排号', 'row', '座位', 'seat'] if any(keyword in ' '.join(col_names) for keyword in seat_keywords): seat_file = info print(f"✅ {info['name']} -> 识别为座位信息文件 ({info['columns']}列)") else: print(f"⚠️ {info['name']} 有{info['columns']}列,但列名不符合座位信息格式") else: print(f"❓ {info['name']} 有{info['columns']}列,无法自动识别类型") # 验证识别结果 if personnel_file is None: print(f"\n❌ 未找到人员信息文件") print("人员信息文件应包含: 姓名、证件类型、证件号、手机号、备注等列") return None, None if seat_file is None: print(f"\n❌ 未找到座位信息文件") print("座位信息文件应包含: 区域、楼层、排号、座位号等列") return None, None print(f"\n🎉 文件识别成功!") print(f"人员信息: {personnel_file['name']}") print(f"座位信息: {seat_file['name']}") # 显示文件内容预览 print(f"\n=== 文件内容预览 ===") print(f"\n📋 人员信息文件预览 ({personnel_file['name']}):") print(personnel_file['sample_data'].to_string(index=False, max_cols=10)) print(f"\n🪑 座位信息文件预览 ({seat_file['name']}):") print(seat_file['sample_data'].to_string(index=False, max_cols=10)) return personnel_file['path'], seat_file['path'] def check_data_files(): """检查并自动识别数据文件""" print("\n=== 数据文件检查 ===") # 首先尝试自动识别 personnel_file, seat_file = auto_detect_files() if personnel_file is None or seat_file is None: print(f"\n❌ 自动识别失败") print("\n请检查以下要求:") print("1. 确保目录下有且仅有2个Excel文件") print("2. 人员信息文件应有5-6列(姓名、证件类型、证件号、手机号、备注等)") print("3. 座位信息文件应有10+列(区域、楼层、排号、座位号等)") print("4. 列名应包含相关关键词") print("\n提示: 您可以参考示例文件来准备数据") return False, None, None return True, str(personnel_file), str(seat_file) # 检查依赖 if not check_dependencies(): print("❌ 依赖检查失败") input("按Enter键退出...") sys.exit(1) class Logger: """日志记录器""" def __init__(self, log_file='seat_allocation_log.txt'): # 创建log文件夹 self.log_dir = Path('log') self.log_dir.mkdir(exist_ok=True) # 设置日志文件路径 self.log_file = self.log_dir / log_file self.logs = [] def log(self, message, print_to_console=True): """记录日志""" timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" self.logs.append(log_entry) if print_to_console: print(message) def save_logs(self): """保存日志到文件""" try: with open(self.log_file, 'w', encoding='utf-8') as f: f.write('\n'.join(self.logs)) print(f"日志已保存到: {self.log_file}") return True except Exception as e: print(f"保存日志失败: {e}") return False def get_log_path(self, filename): """获取log文件夹中的文件路径""" return self.log_dir / filename class SeatAllocationSystem: """座位分配系统""" def __init__(self): self.logger = Logger() self.personnel_df = None self.seat_df = None self.seating_groups = [] self.row_analysis = {} self.personnel_file = None self.seat_file = None def choose_file_mode(self): """选择文件识别模式""" self.logger.log("\n=== 选择文件识别模式 ===") self.logger.log("1. 默认文件名 - 读取 '人员信息.xlsx' 和 '座位信息.xlsx'") self.logger.log("2. 智能识别 - 自动识别目录中的Excel文件") while True: try: choice = input("\n请选择文件识别模式 (1/2,直接回车选择默认): ").strip() if choice == '' or choice == '1': self.logger.log("✅ 选择默认文件名模式") return 'default' elif choice == '2': self.logger.log("✅ 选择智能识别模式") return 'smart' else: print("请输入 1 或 2") except KeyboardInterrupt: self.logger.log("\n用户取消操作") return 'default' def smart_identify_files(self): """智能识别Excel文件""" self.logger.log("\n=== 智能识别Excel文件 ===") # 获取当前目录下的所有Excel文件 excel_files = [] for ext in ['*.xlsx', '*.xls']: excel_files.extend(Path('.').glob(ext)) if len(excel_files) < 2: self.logger.log(f"❌ 当前目录只找到 {len(excel_files)} 个Excel文件,需要至少2个") return None, None self.logger.log(f"📁 找到 {len(excel_files)} 个Excel文件:") # 分析每个文件的内容 file_analysis = [] for file_path in excel_files: try: # 先读取完整文件获取真实行数 df_full = pd.read_excel(file_path) total_rows = len(df_full) # 再读取前5行进行列分析 df_sample = pd.read_excel(file_path, nrows=5) analysis = { 'path': file_path, 'name': file_path.name, 'rows': total_rows, # 使用真实行数 'cols': len(df_sample.columns), 'columns': list(df_sample.columns), 'score': 0, 'type': 'unknown' } # 判断文件类型 - 改进的识别算法 columns_str = ' '.join(str(col).lower() for col in df_sample.columns) columns_list = [str(col).lower() for col in df_sample.columns] # 座位信息文件的强特征(优先判断) seat_strong_keywords = ['座位id', '区域', '楼层', '排号', '座位号'] seat_strong_score = sum(1 for keyword in seat_strong_keywords if keyword in columns_str) # 人员信息文件的强特征 personnel_strong_keywords = ['备注'] # 备注列是人员信息文件的强特征 personnel_strong_score = sum(1 for keyword in personnel_strong_keywords if keyword in columns_str) # 如果有座位强特征,优先识别为座位文件 if seat_strong_score >= 3: # 至少包含3个座位强特征 analysis['type'] = 'seat' analysis['score'] = seat_strong_score * 10 # 给予高权重 # 如果有人员强特征且列数较少,识别为人员文件 elif personnel_strong_score > 0 and len(df_sample.columns) <= 8: analysis['type'] = 'personnel' analysis['score'] = personnel_strong_score * 10 else: # 使用原有的弱特征判断 personnel_keywords = ['姓名', '证件', '手机', 'name', 'id', 'phone'] personnel_score = sum(1 for keyword in personnel_keywords if keyword in columns_str) seat_keywords = ['区域', '楼层', '排号', '座位', 'area', 'floor', 'row', 'seat'] seat_score = sum(1 for keyword in seat_keywords if keyword in columns_str) # 结合列数进行判断 if len(df_sample.columns) <= 8 and personnel_score > 0: analysis['type'] = 'personnel' analysis['score'] = personnel_score elif len(df_sample.columns) > 8 and seat_score > 0: analysis['type'] = 'seat' analysis['score'] = seat_score elif personnel_score > seat_score: analysis['type'] = 'personnel' analysis['score'] = personnel_score elif seat_score > personnel_score: analysis['type'] = 'seat' analysis['score'] = seat_score file_analysis.append(analysis) self.logger.log(f" 📄 {file_path.name}: {total_rows}行 × {len(df_sample.columns)}列") self.logger.log(f" 列名: {', '.join(str(col) for col in df_sample.columns[:5])}{'...' if len(df_sample.columns) > 5 else ''}") self.logger.log(f" 推测类型: {analysis['type']} (得分: {analysis['score']})") except Exception as e: self.logger.log(f" ❌ {file_path.name}: 读取失败 - {e}") # 自动匹配最佳文件 personnel_files = [f for f in file_analysis if f['type'] == 'personnel'] seat_files = [f for f in file_analysis if f['type'] == 'seat'] personnel_file = None seat_file = None if personnel_files: personnel_file = max(personnel_files, key=lambda x: x['score'])['path'] self.logger.log(f"\n🎯 自动识别人员信息文件: {personnel_file.name}") if seat_files: seat_file = max(seat_files, key=lambda x: x['score'])['path'] self.logger.log(f"🎯 自动识别座位信息文件: {seat_file.name}") # 如果自动识别不完整,提供手动选择 if not personnel_file or not seat_file: self.logger.log("\n⚠️ 自动识别不完整,请手动选择:") personnel_file, seat_file = self.manual_select_files(file_analysis) return personnel_file, seat_file def manual_select_files(self, file_analysis): """手动选择文件""" self.logger.log("\n=== 手动选择文件 ===") print("\n可用的Excel文件:") for i, analysis in enumerate(file_analysis): print(f"{i+1}. {analysis['name']} ({analysis['rows']}行 × {analysis['cols']}列)") print(f" 列名: {', '.join(str(col) for col in analysis['columns'][:3])}...") personnel_file = None seat_file = None # 选择人员信息文件 while not personnel_file: try: choice = input(f"\n请选择人员信息文件 (1-{len(file_analysis)}): ").strip() idx = int(choice) - 1 if 0 <= idx < len(file_analysis): personnel_file = file_analysis[idx]['path'] self.logger.log(f"✅ 选择人员信息文件: {personnel_file.name}") else: print("请输入有效的数字") except (ValueError, KeyboardInterrupt): print("请输入有效的数字") # 选择座位信息文件 while not seat_file: try: choice = input(f"\n请选择座位信息文件 (1-{len(file_analysis)}): ").strip() idx = int(choice) - 1 if 0 <= idx < len(file_analysis): if file_analysis[idx]['path'] == personnel_file: print("不能选择相同的文件,请重新选择") continue seat_file = file_analysis[idx]['path'] self.logger.log(f"✅ 选择座位信息文件: {seat_file.name}") else: print("请输入有效的数字") except (ValueError, KeyboardInterrupt): print("请输入有效的数字") return personnel_file, seat_file def set_data_files(self, personnel_file, seat_file): """设置数据文件路径""" self.personnel_file = personnel_file self.seat_file = seat_file def identify_and_set_files(self): """识别并设置数据文件""" file_mode = self.choose_file_mode() if file_mode == 'default': # 默认文件名模式 personnel_file = Path('人员信息.xlsx') seat_file = Path('座位信息.xlsx') if not personnel_file.exists(): self.logger.log(f"❌ 错误: {personnel_file} 文件不存在") return False if not seat_file.exists(): self.logger.log(f"❌ 错误: {seat_file} 文件不存在") return False else: # 智能识别模式 personnel_file, seat_file = self.smart_identify_files() if not personnel_file or not seat_file: self.logger.log("❌ 文件识别失败") return False self.set_data_files(personnel_file, seat_file) return True def load_data(self): """加载人员信息和座位信息数据""" self.logger.log("=== 开始加载数据 ===") # 使用自动识别的文件路径 if not self.personnel_file or not self.seat_file: self.logger.log("❌ 错误: 数据文件路径未设置") return False if not Path(self.personnel_file).exists(): self.logger.log(f"❌ 错误: {self.personnel_file} 文件不存在") return False if not Path(self.seat_file).exists(): self.logger.log(f"❌ 错误: {self.seat_file} 文件不存在") return False try: self.logger.log(f"正在读取人员信息文件: {self.personnel_file}") # 读取人员信息,指定数据类型 self.personnel_df = pd.read_excel(self.personnel_file, dtype={ '姓名': 'str', '证件类型': 'str', '证件号': 'str', # 证件号作为字符串读取,避免X被转换 '手机号': 'str' }) self.logger.log(f"正在读取座位信息文件: {self.seat_file}") # 读取座位信息,指定数据类型 seat_df_raw = pd.read_excel(self.seat_file, dtype={ '区域': 'str', '楼层': 'str', '排号': 'str', '座位号': 'str', '姓名': 'str', '证件类型': 'str', '证件号': 'str', '手机号': 'str', '签发地/国籍': 'str' }) # 过滤和清洗座位数据 self.seat_df = self.filter_seat_data(seat_df_raw) # 清理文字信息中的空格 self.clean_text_data() self.logger.log(f"✅ 文件加载成功") self.logger.log(f" 人员信息: {self.personnel_df.shape[0]} 行 × {self.personnel_df.shape[1]} 列") self.logger.log(f" 座位信息: {self.seat_df.shape[0]} 行 × {self.seat_df.shape[1]} 列") return True except Exception as e: self.logger.log(f"❌ 文件加载失败: {e}") return False def clean_text_data(self): """清理文字数据中的空格""" self.logger.log("清理文字数据中的空格...") # 清理人员信息中的空格 text_columns_personnel = ['姓名', '证件类型', '证件号'] for col in text_columns_personnel: if col in self.personnel_df.columns: self.personnel_df[col] = self.personnel_df[col].astype(str).str.strip() # 清理座位信息中的空格 text_columns_seat = ['区域', '楼层', '排号', '座位号', '姓名', '证件类型', '证件号', '签发地/国籍'] for col in text_columns_seat: if col in self.seat_df.columns: # 只对非空值进行清理 mask = self.seat_df[col].notna() self.seat_df.loc[mask, col] = self.seat_df.loc[mask, col].astype(str).str.strip() self.logger.log("✅ 文字数据清理完成") def filter_seat_data(self, seat_df_raw): """过滤和清洗座位数据,移除空行和无效数据""" self.logger.log("开始过滤座位数据...") original_count = len(seat_df_raw) self.logger.log(f"原始数据行数: {original_count}") # 关键列,这些列都不能为空 key_columns = ['区域', '楼层', '排号', '座位号'] # 创建过滤条件:关键列都不能为空且不能为'nan'字符串 filter_condition = True for col in key_columns: if col in seat_df_raw.columns: # 过滤空值、NaN、'nan'字符串 col_condition = ( seat_df_raw[col].notna() & (seat_df_raw[col].astype(str).str.strip() != '') & (seat_df_raw[col].astype(str).str.lower() != 'nan') ) filter_condition = filter_condition & col_condition # 应用过滤条件 seat_df_filtered = seat_df_raw[filter_condition].copy() # 重置索引 seat_df_filtered.reset_index(drop=True, inplace=True) filtered_count = len(seat_df_filtered) removed_count = original_count - filtered_count self.logger.log(f"过滤后数据行数: {filtered_count}") self.logger.log(f"移除无效行数: {removed_count}") if removed_count > 0: self.logger.log(f"✅ 已过滤掉 {removed_count} 行无效数据(空行、示例数据等)") # 显示过滤后的数据概览 if filtered_count > 0: areas = seat_df_filtered['区域'].unique() floors = seat_df_filtered['楼层'].unique() self.logger.log(f"有效座位区域: {', '.join(areas)}") self.logger.log(f"有效座位楼层: {', '.join(floors)}") return seat_df_filtered def validate_personnel_structure(self): """校验人员信息文件结构""" self.logger.log("\n=== 人员信息结构校验 ===") required_columns = ['姓名', '证件类型', '证件号', '手机号', '备注'] validation_results = [] # 检查必需列 missing_columns = [] for col in required_columns: if col not in self.personnel_df.columns: missing_columns.append(col) if missing_columns: self.logger.log(f"❌ 缺少必需列: {missing_columns}") validation_results.append(False) else: self.logger.log("✅ 所有必需列都存在") validation_results.append(True) # 检查数据完整性 self.logger.log("\n数据完整性检查:") for col in ['姓名', '证件类型', '证件号', '手机号']: if col in self.personnel_df.columns: null_count = self.personnel_df[col].isnull().sum() if null_count > 0: self.logger.log(f"⚠️ {col} 列有 {null_count} 个空值") validation_results.append(False) else: self.logger.log(f"✅ {col} 列数据完整") validation_results.append(True) # 检查重复人员(姓名+身份证号组合) self.logger.log("\n重复人员检查:") if '姓名' in self.personnel_df.columns and '证件号' in self.personnel_df.columns: # 创建姓名+证件号的组合标识 self.personnel_df['人员标识'] = self.personnel_df['姓名'].astype(str) + '_' + self.personnel_df['证件号'].astype(str) duplicate_persons = self.personnel_df[self.personnel_df['人员标识'].duplicated()] if not duplicate_persons.empty: self.logger.log(f"⚠️ 发现重复人员:") for _, person in duplicate_persons.iterrows(): self.logger.log(f" {person['姓名']} (证件号: {person['证件号']})") validation_results.append(False) else: self.logger.log("✅ 无重复人员(姓名+证件号组合检查)") validation_results.append(True) # 单独检查重名情况(仅提示,不影响校验结果) duplicate_names = self.personnel_df[self.personnel_df['姓名'].duplicated(keep=False)] if not duplicate_names.empty: self.logger.log(f"📝 发现重名人员(但证件号不同,允许通过):") name_groups = duplicate_names.groupby('姓名') for name, group in name_groups: self.logger.log(f" 姓名: {name}") for _, person in group.iterrows(): self.logger.log(f" 证件号: {person['证件号']}") else: self.logger.log("❌ 缺少姓名或证件号列,无法进行重复检查") validation_results.append(False) return all(validation_results) def validate_seating_groups(self): """校验连坐组的完整性和正确性""" self.logger.log("\n=== 连坐组校验 ===") validation_results = [] issues = [] i = 0 group_num = 1 while i < len(self.personnel_df): person = self.personnel_df.iloc[i] remark = person['备注'] if pd.isna(remark): # 无备注,单独坐 self.logger.log(f"✅ 第 {group_num} 组: {person['姓名']} (单独)") i += 1 else: # 有备注,检查连坐组 try: group_size = int(remark) if group_size < 1 or group_size > 10: issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 {group_size} 超出范围 (1-10)" self.logger.log(issue) issues.append(issue) validation_results.append(False) i += 1 continue # 检查后续人员是否足够 if i + group_size > len(self.personnel_df): issue = f"❌ 第 {group_num} 组: {person['姓名']} 需要 {group_size} 人连坐,但后续人员不足" self.logger.log(issue) issues.append(issue) validation_results.append(False) i += 1 continue # 检查后续人员的备注是否为空 group_members = [] valid_group = True for j in range(group_size): member = self.personnel_df.iloc[i + j] group_members.append(member['姓名']) if j == 0: # 第一个人应该有备注 if pd.isna(member['备注']) or int(member['备注']) != group_size: issue = f"❌ 第 {group_num} 组: 组长 {member['姓名']} 的备注应该是 {group_size}" self.logger.log(issue) issues.append(issue) valid_group = False else: # 后续人员应该没有备注 if not pd.isna(member['备注']): issue = f"❌ 第 {group_num} 组: {member['姓名']} 应该没有备注(当前备注: {member['备注']})" self.logger.log(issue) issues.append(issue) valid_group = False if valid_group: self.logger.log(f"✅ 第 {group_num} 组: {', '.join(group_members)} ({group_size}人连坐)") validation_results.append(True) else: validation_results.append(False) i += group_size except ValueError: issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 '{remark}' 不是有效数字" self.logger.log(issue) issues.append(issue) validation_results.append(False) i += 1 group_num += 1 return all(validation_results), issues def validate_seat_structure(self): """校验座位信息结构和连续性""" self.logger.log("\n=== 座位信息结构校验 ===") required_columns = ['区域', '楼层', '排号', '座位号'] validation_results = [] # 检查必需列 missing_columns = [] for col in required_columns: if col not in self.seat_df.columns: missing_columns.append(col) if missing_columns: self.logger.log(f"❌ 缺少必需列: {missing_columns}") return False, {} self.logger.log("✅ 所有必需列都存在") # 检查数据完整性 for col in required_columns: null_count = self.seat_df[col].isnull().sum() if null_count > 0: self.logger.log(f"❌ {col} 列有 {null_count} 个空值") validation_results.append(False) else: self.logger.log(f"✅ {col} 列数据完整") validation_results.append(True) # 分析座位结构 self.logger.log("\n座位结构分析:") seat_groups = {} for _, row in self.seat_df.iterrows(): key = (row['区域'], row['楼层'], row['排号']) if key not in seat_groups: seat_groups[key] = [] seat_groups[key].append(row['座位号']) # 检查每排的座位结构和可用连续段 self.logger.log("\n座位结构分析:") for (area, floor, row_num), seats in seat_groups.items(): # 提取座位号数字 seat_numbers = [] for seat in seats: try: seat_numbers.append(int(str(seat).replace('号', ''))) except: self.logger.log(f"❌ {area}-{floor}-{row_num}: 座位号格式异常 '{seat}'") validation_results.append(False) continue seat_numbers.sort() # 分析连续段 consecutive_segments = [] if seat_numbers: current_segment = [seat_numbers[0]] for i in range(1, len(seat_numbers)): if seat_numbers[i] - seat_numbers[i-1] == 1: # 连续 current_segment.append(seat_numbers[i]) else: # 不连续,保存当前段,开始新段 consecutive_segments.append(current_segment) current_segment = [seat_numbers[i]] # 添加最后一段 consecutive_segments.append(current_segment) # 显示分析结果 if len(consecutive_segments) == 1 and len(consecutive_segments[0]) == len(seat_numbers): self.logger.log(f"✅ {area}-{floor}-{row_num}: {len(seats)} 个座位完全连续 ({min(seat_numbers)}-{max(seat_numbers)})") else: segments_info = [] max_segment_size = 0 for segment in consecutive_segments: if len(segment) == 1: segments_info.append(f"{segment[0]}") else: segments_info.append(f"{segment[0]}-{segment[-1]}") max_segment_size = max(max_segment_size, len(segment)) self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 个连续段: {', '.join(segments_info)}") self.logger.log(f" 最大连续段: {max_segment_size} 个座位") validation_results.append(True) # 座位不连续不算错误,只是需要特殊处理 return all(validation_results), seat_groups def validate_capacity_feasibility(self, seat_groups): """校验座位容量和分配可行性""" self.logger.log("\n=== 容量和可行性校验 ===") # 统计人员总数 total_people = len(self.personnel_df) total_seats = sum(len(seats) for seats in seat_groups.values()) self.logger.log(f"总人数: {total_people}") self.logger.log(f"总座位数: {total_seats}") if total_people > total_seats: self.logger.log(f"❌ 座位不足: 需要 {total_people} 个座位,只有 {total_seats} 个") return False else: self.logger.log(f"✅ 座位充足: 剩余 {total_seats - total_people} 个座位") # 分析连坐组需求 self.logger.log("\n连坐组需求分析:") group_sizes = [] i = 0 while i < len(self.personnel_df): person = self.personnel_df.iloc[i] remark = person['备注'] if pd.isna(remark): group_sizes.append(1) i += 1 else: try: group_size = int(remark) group_sizes.append(group_size) i += group_size except: group_sizes.append(1) i += 1 max_group_size = max(group_sizes) self.logger.log(f"最大连坐组: {max_group_size} 人") # 检查是否有足够的连续座位来容纳最大连坐组 self.logger.log(f"\n连续座位可行性分析:") max_consecutive_available = 0 feasible_rows = [] for (area, floor, row_num), seats in seat_groups.items(): # 分析每排的连续座位段 seat_numbers = [] for seat in seats: try: seat_numbers.append(int(str(seat).replace('号', ''))) except: continue seat_numbers.sort() # 找出最大连续段 max_consecutive_in_row = 0 if seat_numbers: current_consecutive = 1 for i in range(1, len(seat_numbers)): if seat_numbers[i] - seat_numbers[i-1] == 1: current_consecutive += 1 else: max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive) current_consecutive = 1 max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive) if max_consecutive_in_row >= max_group_size: feasible_rows.append((area, floor, row_num, max_consecutive_in_row)) max_consecutive_available = max(max_consecutive_available, max_consecutive_in_row) self.logger.log(f" {area}-{floor}-{row_num}: 最大连续 {max_consecutive_in_row} 个座位") self.logger.log(f"\n全场最大连续座位: {max_consecutive_available} 个") if max_group_size > max_consecutive_available: self.logger.log(f"❌ 无法容纳最大连坐组: 需要 {max_group_size} 个连续座位,最大连续段只有 {max_consecutive_available} 个") return False else: self.logger.log(f"✅ 可以容纳最大连坐组") self.logger.log(f"可容纳最大连坐组的排数: {len(feasible_rows)} 个") # 统计各种大小的连坐组 size_counts = {} for size in group_sizes: size_counts[size] = size_counts.get(size, 0) + 1 self.logger.log("\n连坐组分布:") for size in sorted(size_counts.keys()): count = size_counts[size] if size == 1: self.logger.log(f" 单人组: {count} 个") else: self.logger.log(f" {size}人连坐组: {count} 个") return True def analyze_seating_requirements(self, use_phone_grouping=False): """分析人员连坐需求""" self.logger.log("\n=== 人员连坐需求分析 ===") if use_phone_grouping: self.logger.log("🔍 使用手机号分组模式") return self.analyze_seating_by_phone() else: self.logger.log("🔍 使用备注分组模式") return self.analyze_seating_by_remark() def analyze_seating_by_phone(self): """基于手机号分析连坐需求""" self.logger.log("根据相同手机号识别连坐组...") # 按手机号分组 phone_groups = self.personnel_df.groupby('手机号') self.seating_groups = [] for phone, group in phone_groups: group_members = [row for _, row in group.iterrows()] group_size = len(group_members) if group_size == 1: # 单人组 self.seating_groups.append({ 'type': 'single', 'size': 1, 'members': group_members, 'leader': group_members[0]['姓名'], 'grouping_method': 'phone' }) self.logger.log(f"📱 单人: {group_members[0]['姓名']} (手机号: {phone})") else: # 连坐组 leader_name = group_members[0]['姓名'] member_names = [member['姓名'] for member in group_members] self.seating_groups.append({ 'type': 'group', 'size': group_size, 'members': group_members, 'leader': leader_name, 'grouping_method': 'phone' }) self.logger.log(f"📱 连坐组: {', '.join(member_names)} (手机号: {phone}, {group_size}人)") # 统计 size_stats = {} for group in self.seating_groups: size = group['size'] size_stats[size] = size_stats.get(size, 0) + 1 self.logger.log(f"\n基于手机号识别出 {len(self.seating_groups)} 个座位组:") for size in sorted(size_stats.keys()): count = size_stats[size] if size == 1: self.logger.log(f" 单人组: {count} 个") else: self.logger.log(f" {size}人连坐组: {count} 个") return True def analyze_seating_by_remark(self): """基于备注分析连坐需求""" self.logger.log("根据备注数字识别连坐组...") self.seating_groups = [] i = 0 while i < len(self.personnel_df): person = self.personnel_df.iloc[i] remark = person['备注'] if pd.isna(remark): # 无备注,单独坐 self.seating_groups.append({ 'type': 'single', 'size': 1, 'members': [person], 'leader': person['姓名'], 'grouping_method': 'remark' }) i += 1 else: # 有备注,按备注数量连坐 group_size = int(remark) group_members = [] # 收集连坐组成员 for j in range(group_size): if i + j < len(self.personnel_df): group_members.append(self.personnel_df.iloc[i + j]) self.seating_groups.append({ 'type': 'group' if group_size > 1 else 'single', 'size': len(group_members), 'members': group_members, 'leader': person['姓名'], 'grouping_method': 'remark' }) i += group_size # 统计 size_stats = {} for group in self.seating_groups: size = group['size'] size_stats[size] = size_stats.get(size, 0) + 1 self.logger.log(f"\n基于备注识别出 {len(self.seating_groups)} 个座位组:") for size in sorted(size_stats.keys()): count = size_stats[size] if size == 1: self.logger.log(f" 单人组: {count} 个") else: self.logger.log(f" {size}人连坐组: {count} 个") return True def analyze_seat_structure_advanced(self): """高级座位结构分析,识别连续段""" self.logger.log("\n=== 高级座位结构分析 ===") seat_groups = {} for _, row in self.seat_df.iterrows(): key = (row['区域'], row['楼层'], row['排号']) if key not in seat_groups: seat_groups[key] = [] seat_groups[key].append({ 'index': row.name, '座位号': row['座位号'], 'row_data': row }) # 分析每排的连续段 self.row_analysis = {} for key, seats in seat_groups.items(): area, floor, row_num = key # 按座位号排序 def get_seat_number(seat_info): try: return int(seat_info['座位号'].replace('号', '')) except: return 0 seats.sort(key=get_seat_number) seat_numbers = [get_seat_number(seat) for seat in seats] # 找出所有连续段 consecutive_segments = [] if seat_numbers: current_segment_start = 0 for i in range(1, len(seat_numbers)): if seat_numbers[i] - seat_numbers[i-1] != 1: # 发现间隙,结束当前段 consecutive_segments.append({ 'start_idx': current_segment_start, 'end_idx': i - 1, 'size': i - current_segment_start, 'seat_numbers': seat_numbers[current_segment_start:i], 'seats': seats[current_segment_start:i] }) current_segment_start = i # 添加最后一段 consecutive_segments.append({ 'start_idx': current_segment_start, 'end_idx': len(seat_numbers) - 1, 'size': len(seat_numbers) - current_segment_start, 'seat_numbers': seat_numbers[current_segment_start:], 'seats': seats[current_segment_start:] }) self.row_analysis[key] = { 'total_seats': len(seats), 'all_seats': seats, 'consecutive_segments': consecutive_segments, 'max_consecutive': max(seg['size'] for seg in consecutive_segments) if consecutive_segments else 0 } # 显示分析结果 if len(consecutive_segments) == 1: self.logger.log(f"✅ {area}-{floor}-{row_num}: {len(seats)} 个座位完全连续") else: segments_info = [] for seg in consecutive_segments: if seg['size'] == 1: segments_info.append(f"{seg['seat_numbers'][0]}") else: segments_info.append(f"{seg['seat_numbers'][0]}-{seg['seat_numbers'][-1]}") self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 段: {', '.join(segments_info)}") self.logger.log(f" 最大连续段: {self.row_analysis[key]['max_consecutive']} 个座位") return True def smart_seat_assignment(self): """智能座位分配算法""" self.logger.log("\n=== 开始智能座位分配 ===") # 按组大小排序,大组优先分配 sorted_groups = sorted(self.seating_groups, key=lambda x: x['size'], reverse=True) # 创建座位分配结果 seat_df_copy = self.seat_df.copy() # 记录已使用的座位 used_seats = set() assignment_log = [] unassigned_groups = [] self.logger.log(f"需要分配 {len(sorted_groups)} 个组") for group_idx, seating_group in enumerate(sorted_groups): group_size = seating_group['size'] group_type = seating_group['type'] leader = seating_group['leader'] self.logger.log(f"\n处理第 {group_idx + 1} 组: {leader} ({group_type}, {group_size} 人)") if group_size == 1: # 单人组,找任意可用座位 assigned = False for (area, floor, row_num), analysis in self.row_analysis.items(): for seat_info in analysis['all_seats']: if seat_info['index'] not in used_seats: # 分配座位 person_info = seating_group['members'][0] seat_index = seat_info['index'] # 覆盖座位表中的人员信息列 seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip() seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip() seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip() seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86) seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip() # 只有分配备注放在第13列 seat_df_copy.loc[seat_index, '分配备注'] = str(person_info.get('备注', '')).strip() assignment_log.append({ '组号': group_idx + 1, '组类型': group_type, '组大小': group_size, '组长': leader, '姓名': person_info['姓名'], '区域': area, '楼层': floor, '排号': row_num, '座位号': seat_info['座位号'] }) used_seats.add(seat_index) self.logger.log(f" 分配到 {area}-{floor}-{row_num}: {person_info['姓名']} -> {seat_info['座位号']}") assigned = True break if assigned: break else: # 多人组,需要连续座位 # 更新可用座位分析(排除已使用的座位) current_row_analysis = {} for key, analysis in self.row_analysis.items(): available_seats = [seat for seat in analysis['all_seats'] if seat['index'] not in used_seats] if available_seats: # 重新分析连续段 available_seats.sort(key=lambda x: int(x['座位号'].replace('号', ''))) seat_numbers = [int(seat['座位号'].replace('号', '')) for seat in available_seats] # 找出连续段 consecutive_segments = [] if seat_numbers: current_segment_start = 0 for i in range(1, len(seat_numbers)): if seat_numbers[i] - seat_numbers[i-1] != 1: consecutive_segments.append({ 'start_idx': current_segment_start, 'end_idx': i - 1, 'size': i - current_segment_start, 'seats': available_seats[current_segment_start:i] }) current_segment_start = i consecutive_segments.append({ 'start_idx': current_segment_start, 'end_idx': len(seat_numbers) - 1, 'size': len(seat_numbers) - current_segment_start, 'seats': available_seats[current_segment_start:] }) current_row_analysis[key] = { 'consecutive_segments': consecutive_segments } # 寻找合适的连续座位 assigned = False for (area, floor, row_num), analysis in current_row_analysis.items(): for segment in analysis['consecutive_segments']: if segment['size'] >= group_size: # 找到合适的连续座位 seats_to_use = segment['seats'][:group_size] seat_numbers = [int(seat['座位号'].replace('号', '')) for seat in seats_to_use] self.logger.log(f" 分配到 {area}-{floor}-{row_num} (连续座位: {min(seat_numbers)}-{max(seat_numbers)})") # 分配座位 for i, seat_info in enumerate(seats_to_use): person_info = seating_group['members'][i] seat_index = seat_info['index'] seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip() seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip() seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip() seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86) seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip() # 只有分配备注放在第13列 seat_df_copy.loc[seat_index, '分配备注'] = str(person_info.get('备注', '')).strip() assignment_log.append({ '组号': group_idx + 1, '组类型': group_type, '组大小': group_size, '组长': leader, '姓名': person_info['姓名'], '区域': area, '楼层': floor, '排号': row_num, '座位号': seat_info['座位号'] }) used_seats.add(seat_index) self.logger.log(f" {person_info['姓名']} -> {seat_info['座位号']}") assigned = True break if assigned: break if not assigned: self.logger.log(f" ❌ 无法为第 {group_idx + 1} 组分配座位") unassigned_groups.append(seating_group) # 显示未分配的组 if unassigned_groups: self.logger.log(f"\n⚠️ 有 {len(unassigned_groups)} 个组未能分配座位") for group in unassigned_groups: if group['type'] == 'single': self.logger.log(f" 未分配: {group['leader']}") else: member_names = [member['姓名'] for member in group['members']] self.logger.log(f" 未分配: {', '.join(member_names)} (连坐 {group['size']} 人)") return seat_df_copy, assignment_log def save_results(self, seat_df_result, assignment_log): """保存分配结果""" try: # 保存更新后的座位信息 output_file = self.logger.get_log_path('座位信息_最终分配.xlsx') seat_df_result.to_excel(output_file, index=False) self.logger.log(f"\n座位分配结果已保存到: {output_file}") self.logger.log(f"📋 人员信息已覆盖到座位表对应列:") self.logger.log(f" 第6列: 姓名") self.logger.log(f" 第7列: 证件类型") self.logger.log(f" 第8列: 证件号") self.logger.log(f" 第9列: 手机国家号") self.logger.log(f" 第10列: 手机号") self.logger.log(f"📝 分配备注信息:") self.logger.log(f" 第13列: 分配备注(新增列,不覆盖原数据)") self.logger.log(f"💡 座位基本信息(第1-5列)保持不变") # 保存分配日志 if assignment_log: log_df = pd.DataFrame(assignment_log) log_file = self.logger.get_log_path('最终座位分配日志.xlsx') log_df.to_excel(log_file, index=False) self.logger.log(f"分配日志已保存到: {log_file}") # 显示分配统计 self.logger.log(f"\n=== 分配统计 ===") self.logger.log(f"总共分配了 {len(assignment_log)} 个座位") # 按组大小统计 size_stats = log_df.groupby('组大小').agg({ '姓名': 'count', '组号': 'nunique' }).rename(columns={'姓名': '总人数', '组号': '组数'}) self.logger.log("\n按组大小统计:") for size, row in size_stats.iterrows(): if size == 1: self.logger.log(f" 单人组: {row['组数']} 个组, {row['总人数']} 人") else: self.logger.log(f" {size}人连坐组: {row['组数']} 个组, {row['总人数']} 人") # 验证连续性 self.logger.log("\n=== 连续性验证 ===") consecutive_check = [] for group_num in sorted(log_df['组号'].unique()): group_data = log_df[log_df['组号'] == group_num] group_size = group_data.iloc[0]['组大小'] group_leader = group_data.iloc[0]['组长'] if group_size > 1: # 多人组 # 检查是否在同一排 areas = group_data['区域'].unique() floors = group_data['楼层'].unique() rows = group_data['排号'].unique() if len(areas) == 1 and len(floors) == 1 and len(rows) == 1: # 在同一排,检查座位号是否连续 seats = group_data['座位号'].tolist() seat_numbers = [] for seat in seats: try: seat_numbers.append(int(seat.replace('号', ''))) except: seat_numbers.append(0) seat_numbers.sort() is_consecutive = all(seat_numbers[i] + 1 == seat_numbers[i+1] for i in range(len(seat_numbers)-1)) if is_consecutive: consecutive_check.append(True) self.logger.log(f"✅ 组 {group_num} ({group_leader}): {group_size}人连坐,座位连续 {seat_numbers}") else: consecutive_check.append(False) self.logger.log(f"❌ 组 {group_num} ({group_leader}): {group_size}人连坐,座位不连续 {seat_numbers}") else: consecutive_check.append(False) self.logger.log(f"❌ 组 {group_num} ({group_leader}): 不在同一排") success_rate = sum(consecutive_check) / len(consecutive_check) * 100 if consecutive_check else 100 self.logger.log(f"\n连续性检查结果: {sum(consecutive_check)}/{len(consecutive_check)} 个多人组座位连续 ({success_rate:.1f}%)") return True except Exception as e: self.logger.log(f"保存结果时出错: {e}") return False def run_validation(self): """运行完整的文件校验""" self.logger.log("=" * 60) self.logger.log("座位分配系统 - 文件校验") self.logger.log("=" * 60) # 识别并设置数据文件 if not self.identify_and_set_files(): return False # 加载数据 if not self.load_data(): return False # 人员信息结构校验 personnel_structure_valid = self.validate_personnel_structure() # 连坐组校验 seating_groups_valid, group_issues = self.validate_seating_groups() # 座位信息校验 seat_structure_valid, seat_groups = self.validate_seat_structure() # 容量可行性校验 capacity_valid = self.validate_capacity_feasibility(seat_groups) # 总结 self.logger.log("\n" + "=" * 60) self.logger.log("校验结果总结") self.logger.log("=" * 60) all_valid = all([ personnel_structure_valid, seating_groups_valid, seat_structure_valid, capacity_valid ]) self.logger.log(f"人员信息结构: {'✅ 通过' if personnel_structure_valid else '❌ 失败'}") self.logger.log(f"连坐组完整性: {'✅ 通过' if seating_groups_valid else '❌ 失败'}") self.logger.log(f"座位信息结构: {'✅ 通过' if seat_structure_valid else '❌ 失败'}") self.logger.log(f"容量可行性: {'✅ 通过' if capacity_valid else '❌ 失败'}") self.logger.log(f"\n总体校验结果: {'✅ 全部通过' if all_valid else '❌ 存在问题'}") if group_issues: self.logger.log(f"\n发现的问题:") for issue in group_issues: self.logger.log(f" {issue}") if all_valid: self.logger.log("\n🎉 文件校验通过,可以进行座位分配!") else: self.logger.log("\n⚠️ 请修复上述问题后再进行座位分配。") return all_valid def run_allocation(self): """运行完整的座位分配""" self.logger.log("\n" + "=" * 60) self.logger.log("开始座位分配") self.logger.log("=" * 60) # 选择分组方式 grouping_method = self.choose_grouping_method() # 分析人员连坐需求 if not self.analyze_seating_requirements(use_phone_grouping=(grouping_method == 'phone')): return False # 高级座位结构分析 if not self.analyze_seat_structure_advanced(): return False # 执行智能座位分配 seat_df_result, assignment_log = self.smart_seat_assignment() # 保存结果 if self.save_results(seat_df_result, assignment_log): self.logger.log("\n🎉 座位分配完成!") return True else: self.logger.log("\n❌ 座位分配失败!") return False def choose_grouping_method(self): """选择分组方式""" self.logger.log("\n=== 选择连坐分组方式 ===") self.logger.log("1. 备注分组 - 根据备注数字识别连坐组(默认)") self.logger.log("2. 手机号分组 - 根据相同手机号识别连坐组") while True: try: choice = input("\n请选择分组方式 (1/2,直接回车选择默认): ").strip() if choice == '' or choice == '1': self.logger.log("✅ 选择备注分组模式") return 'remark' elif choice == '2': self.logger.log("✅ 选择手机号分组模式") return 'phone' else: print("请输入 1 或 2") except KeyboardInterrupt: self.logger.log("\n用户取消操作") return 'remark' def main(): """主函数""" print("=" * 60) print("座位分配系统 v2.0 (智能识别版)") print("=" * 60) try: # 自动检查和识别数据文件 success, personnel_file, seat_file = check_data_files() if not success: input("\n按Enter键退出...") return print("\n开始运行座位分配系统...") system = SeatAllocationSystem() # 设置识别出的文件路径 system.set_data_files(personnel_file, seat_file) # 运行校验 if system.run_validation(): # 校验通过,询问是否继续分配 response = input("\n文件校验通过,是否开始座位分配? (Y/n): ") if response.lower() in ['', 'y', 'yes']: # 运行分配 if system.run_allocation(): print("\n🎉 座位分配完成!") print("请查看log文件夹中的输出文件:") print("- log/座位信息_最终分配.xlsx (分配结果)") print("- log/最终座位分配日志.xlsx (详细日志)") print("- log/seat_allocation_log.txt (运行日志)") else: print("\n❌ 座位分配失败!") else: print("用户取消座位分配") else: print("\n❌ 文件校验失败,请修复问题后重试") # 保存日志 system.logger.save_logs() except FileNotFoundError as e: print(f"\n❌ 文件未找到: {e}") print("请确保所有必要文件都在程序目录下") except PermissionError as e: print(f"\n❌ 文件权限错误: {e}") print("请确保程序有读写文件的权限") except Exception as e: print(f"\n❌ 程序运行出错: {e}") print("请检查数据文件格式是否正确") finally: # 等待用户确认后退出 input("\n程序结束,按Enter键退出...") if __name__ == "__main__": main()