TableSynthesis/seat_allocation_system.py
yovinchen 02341150a9 增加按照手机号排序
增加自动选择文以及识别文件
修改座位信息表位置
2025-07-02 16:26:35 +08:00

1520 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完整的座位分配系统
包含文件校验、智能分配算法和日志输出功能
支持1-10人连坐需求能够处理不连续座位
"""
import sys
import os
from pathlib import Path
import datetime
import pandas as pd
import numpy as np
def check_dependencies():
"""检查并安装必要的依赖包"""
required_packages = {
'pandas': 'pandas>=1.3.0',
'numpy': 'numpy>=1.20.0',
'openpyxl': 'openpyxl>=3.0.0'
}
missing_packages = []
print("检查依赖包...")
for package_name, package_spec in required_packages.items():
try:
__import__(package_name)
print(f"{package_name} 已安装")
except ImportError:
missing_packages.append(package_spec)
print(f"{package_name} 未安装")
if missing_packages:
print(f"\n发现缺失依赖: {', '.join(missing_packages)}")
print("正在尝试自动安装...")
try:
import subprocess
for package in missing_packages:
print(f"安装 {package}...")
result = subprocess.run([sys.executable, '-m', 'pip', 'install', package],
capture_output=True, text=True)
if result.returncode == 0:
print(f"{package} 安装成功")
else:
print(f"{package} 安装失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 自动安装失败: {e}")
print("\n请手动安装以下依赖包:")
for package in missing_packages:
print(f" pip install {package}")
input("安装完成后按Enter键继续...")
return False
return True
def auto_detect_files():
"""自动检测和识别Excel文件"""
print("\n=== 自动文件识别系统 ===")
# 扫描当前目录下的所有xlsx文件
current_dir = Path('.')
xlsx_files = list(current_dir.glob('*.xlsx'))
# 过滤掉输出文件和示例文件
exclude_patterns = ['座位信息_最终分配', '最终座位分配日志', '_示例', '_temp', '_backup']
xlsx_files = [f for f in xlsx_files if not any(pattern in f.name for pattern in exclude_patterns)]
print(f"发现 {len(xlsx_files)} 个Excel文件:")
for i, file_path in enumerate(xlsx_files, 1):
file_size = file_path.stat().st_size / 1024 # KB
print(f" {i}. {file_path.name} ({file_size:.1f} KB)")
if len(xlsx_files) == 0:
print("\n❌ 未找到Excel文件")
print("请确保当前目录下有Excel数据文件")
return None, None
if len(xlsx_files) > 2:
print(f"\n⚠️ 发现 {len(xlsx_files)} 个Excel文件超过2个")
print("为避免混淆请确保目录下只有2个数据文件")
print("(程序会自动忽略输出文件和示例文件)")
print("\n请手动移除多余文件,或选择要使用的文件:")
for i, file_path in enumerate(xlsx_files, 1):
print(f" {i}. {file_path.name}")
return None, None
# 分析文件结构
print(f"\n正在分析文件结构...")
file_info = []
for file_path in xlsx_files:
try:
# 读取文件获取基本信息
df = pd.read_excel(file_path, nrows=5) # 只读前5行进行分析
info = {
'path': file_path,
'name': file_path.name,
'columns': len(df.columns),
'rows': len(df) + 1, # 加上标题行
'column_names': list(df.columns),
'sample_data': df.head(2) # 获取前2行样本数据
}
file_info.append(info)
print(f"\n📄 {file_path.name}:")
print(f" 列数: {info['columns']}")
print(f" 行数: {info['rows']}+ (仅检查前5行)")
print(f" 列名: {', '.join(str(col) for col in info['column_names'][:6])}{'...' if len(info['column_names']) > 6 else ''}")
except Exception as e:
print(f"\n❌ 读取文件 {file_path.name} 失败: {e}")
return None, None
# 根据列数自动识别文件类型
personnel_file = None
seat_file = None
print(f"\n=== 文件类型识别 ===")
for info in file_info:
if info['columns'] == 5 or info['columns'] == 6: # 人员信息通常是5-6列
# 进一步检查列名是否符合人员信息格式
col_names = [str(col).lower() for col in info['column_names']]
personnel_keywords = ['姓名', 'name', '证件', 'id', '手机', 'phone', '备注', 'remark']
if any(keyword in ' '.join(col_names) for keyword in personnel_keywords):
personnel_file = info
print(f"{info['name']} -> 识别为人员信息文件 ({info['columns']}列)")
else:
print(f"⚠️ {info['name']}{info['columns']}列,但列名不符合人员信息格式")
elif info['columns'] >= 10: # 座位信息通常是10-12列
# 进一步检查列名是否符合座位信息格式
col_names = [str(col).lower() for col in info['column_names']]
seat_keywords = ['区域', 'area', '楼层', 'floor', '排号', 'row', '座位', 'seat']
if any(keyword in ' '.join(col_names) for keyword in seat_keywords):
seat_file = info
print(f"{info['name']} -> 识别为座位信息文件 ({info['columns']}列)")
else:
print(f"⚠️ {info['name']}{info['columns']}列,但列名不符合座位信息格式")
else:
print(f"{info['name']}{info['columns']}列,无法自动识别类型")
# 验证识别结果
if personnel_file is None:
print(f"\n❌ 未找到人员信息文件")
print("人员信息文件应包含: 姓名、证件类型、证件号、手机号、备注等列")
return None, None
if seat_file is None:
print(f"\n❌ 未找到座位信息文件")
print("座位信息文件应包含: 区域、楼层、排号、座位号等列")
return None, None
print(f"\n🎉 文件识别成功!")
print(f"人员信息: {personnel_file['name']}")
print(f"座位信息: {seat_file['name']}")
# 显示文件内容预览
print(f"\n=== 文件内容预览 ===")
print(f"\n📋 人员信息文件预览 ({personnel_file['name']}):")
print(personnel_file['sample_data'].to_string(index=False, max_cols=10))
print(f"\n🪑 座位信息文件预览 ({seat_file['name']}):")
print(seat_file['sample_data'].to_string(index=False, max_cols=10))
return personnel_file['path'], seat_file['path']
def check_data_files():
"""检查并自动识别数据文件"""
print("\n=== 数据文件检查 ===")
# 首先尝试自动识别
personnel_file, seat_file = auto_detect_files()
if personnel_file is None or seat_file is None:
print(f"\n❌ 自动识别失败")
print("\n请检查以下要求:")
print("1. 确保目录下有且仅有2个Excel文件")
print("2. 人员信息文件应有5-6列姓名、证件类型、证件号、手机号、备注等")
print("3. 座位信息文件应有10+列(区域、楼层、排号、座位号等)")
print("4. 列名应包含相关关键词")
print("\n提示: 您可以参考示例文件来准备数据")
return False, None, None
return True, str(personnel_file), str(seat_file)
# 检查依赖
if not check_dependencies():
print("❌ 依赖检查失败")
input("按Enter键退出...")
sys.exit(1)
class Logger:
"""日志记录器"""
def __init__(self, log_file='seat_allocation_log.txt'):
# 创建log文件夹
self.log_dir = Path('log')
self.log_dir.mkdir(exist_ok=True)
# 设置日志文件路径
self.log_file = self.log_dir / log_file
self.logs = []
def log(self, message, print_to_console=True):
"""记录日志"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
self.logs.append(log_entry)
if print_to_console:
print(message)
def save_logs(self):
"""保存日志到文件"""
try:
with open(self.log_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(self.logs))
print(f"日志已保存到: {self.log_file}")
return True
except Exception as e:
print(f"保存日志失败: {e}")
return False
def get_log_path(self, filename):
"""获取log文件夹中的文件路径"""
return self.log_dir / filename
class SeatAllocationSystem:
"""座位分配系统"""
def __init__(self):
self.logger = Logger()
self.personnel_df = None
self.seat_df = None
self.seating_groups = []
self.row_analysis = {}
self.personnel_file = None
self.seat_file = None
def choose_file_mode(self):
"""选择文件识别模式"""
self.logger.log("\n=== 选择文件识别模式 ===")
self.logger.log("1. 默认文件名 - 读取 '人员信息.xlsx''座位信息.xlsx'")
self.logger.log("2. 智能识别 - 自动识别目录中的Excel文件")
while True:
try:
choice = input("\n请选择文件识别模式 (1/2直接回车选择默认): ").strip()
if choice == '' or choice == '1':
self.logger.log("✅ 选择默认文件名模式")
return 'default'
elif choice == '2':
self.logger.log("✅ 选择智能识别模式")
return 'smart'
else:
print("请输入 1 或 2")
except KeyboardInterrupt:
self.logger.log("\n用户取消操作")
return 'default'
def smart_identify_files(self):
"""智能识别Excel文件"""
self.logger.log("\n=== 智能识别Excel文件 ===")
# 获取当前目录下的所有Excel文件
excel_files = []
for ext in ['*.xlsx', '*.xls']:
excel_files.extend(Path('.').glob(ext))
if len(excel_files) < 2:
self.logger.log(f"❌ 当前目录只找到 {len(excel_files)} 个Excel文件需要至少2个")
return None, None
self.logger.log(f"📁 找到 {len(excel_files)} 个Excel文件:")
# 分析每个文件的内容
file_analysis = []
for file_path in excel_files:
try:
# 先读取完整文件获取真实行数
df_full = pd.read_excel(file_path)
total_rows = len(df_full)
# 再读取前5行进行列分析
df_sample = pd.read_excel(file_path, nrows=5)
analysis = {
'path': file_path,
'name': file_path.name,
'rows': total_rows, # 使用真实行数
'cols': len(df_sample.columns),
'columns': list(df_sample.columns),
'score': 0,
'type': 'unknown'
}
# 判断文件类型 - 改进的识别算法
columns_str = ' '.join(str(col).lower() for col in df_sample.columns)
columns_list = [str(col).lower() for col in df_sample.columns]
# 座位信息文件的强特征(优先判断)
seat_strong_keywords = ['座位id', '区域', '楼层', '排号', '座位号']
seat_strong_score = sum(1 for keyword in seat_strong_keywords if keyword in columns_str)
# 人员信息文件的强特征
personnel_strong_keywords = ['备注'] # 备注列是人员信息文件的强特征
personnel_strong_score = sum(1 for keyword in personnel_strong_keywords if keyword in columns_str)
# 如果有座位强特征,优先识别为座位文件
if seat_strong_score >= 3: # 至少包含3个座位强特征
analysis['type'] = 'seat'
analysis['score'] = seat_strong_score * 10 # 给予高权重
# 如果有人员强特征且列数较少,识别为人员文件
elif personnel_strong_score > 0 and len(df_sample.columns) <= 8:
analysis['type'] = 'personnel'
analysis['score'] = personnel_strong_score * 10
else:
# 使用原有的弱特征判断
personnel_keywords = ['姓名', '证件', '手机', 'name', 'id', 'phone']
personnel_score = sum(1 for keyword in personnel_keywords if keyword in columns_str)
seat_keywords = ['区域', '楼层', '排号', '座位', 'area', 'floor', 'row', 'seat']
seat_score = sum(1 for keyword in seat_keywords if keyword in columns_str)
# 结合列数进行判断
if len(df_sample.columns) <= 8 and personnel_score > 0:
analysis['type'] = 'personnel'
analysis['score'] = personnel_score
elif len(df_sample.columns) > 8 and seat_score > 0:
analysis['type'] = 'seat'
analysis['score'] = seat_score
elif personnel_score > seat_score:
analysis['type'] = 'personnel'
analysis['score'] = personnel_score
elif seat_score > personnel_score:
analysis['type'] = 'seat'
analysis['score'] = seat_score
file_analysis.append(analysis)
self.logger.log(f" 📄 {file_path.name}: {total_rows}× {len(df_sample.columns)}")
self.logger.log(f" 列名: {', '.join(str(col) for col in df_sample.columns[:5])}{'...' if len(df_sample.columns) > 5 else ''}")
self.logger.log(f" 推测类型: {analysis['type']} (得分: {analysis['score']})")
except Exception as e:
self.logger.log(f"{file_path.name}: 读取失败 - {e}")
# 自动匹配最佳文件
personnel_files = [f for f in file_analysis if f['type'] == 'personnel']
seat_files = [f for f in file_analysis if f['type'] == 'seat']
personnel_file = None
seat_file = None
if personnel_files:
personnel_file = max(personnel_files, key=lambda x: x['score'])['path']
self.logger.log(f"\n🎯 自动识别人员信息文件: {personnel_file.name}")
if seat_files:
seat_file = max(seat_files, key=lambda x: x['score'])['path']
self.logger.log(f"🎯 自动识别座位信息文件: {seat_file.name}")
# 如果自动识别不完整,提供手动选择
if not personnel_file or not seat_file:
self.logger.log("\n⚠️ 自动识别不完整,请手动选择:")
personnel_file, seat_file = self.manual_select_files(file_analysis)
return personnel_file, seat_file
def manual_select_files(self, file_analysis):
"""手动选择文件"""
self.logger.log("\n=== 手动选择文件 ===")
print("\n可用的Excel文件:")
for i, analysis in enumerate(file_analysis):
print(f"{i+1}. {analysis['name']} ({analysis['rows']}× {analysis['cols']}列)")
print(f" 列名: {', '.join(str(col) for col in analysis['columns'][:3])}...")
personnel_file = None
seat_file = None
# 选择人员信息文件
while not personnel_file:
try:
choice = input(f"\n请选择人员信息文件 (1-{len(file_analysis)}): ").strip()
idx = int(choice) - 1
if 0 <= idx < len(file_analysis):
personnel_file = file_analysis[idx]['path']
self.logger.log(f"✅ 选择人员信息文件: {personnel_file.name}")
else:
print("请输入有效的数字")
except (ValueError, KeyboardInterrupt):
print("请输入有效的数字")
# 选择座位信息文件
while not seat_file:
try:
choice = input(f"\n请选择座位信息文件 (1-{len(file_analysis)}): ").strip()
idx = int(choice) - 1
if 0 <= idx < len(file_analysis):
if file_analysis[idx]['path'] == personnel_file:
print("不能选择相同的文件,请重新选择")
continue
seat_file = file_analysis[idx]['path']
self.logger.log(f"✅ 选择座位信息文件: {seat_file.name}")
else:
print("请输入有效的数字")
except (ValueError, KeyboardInterrupt):
print("请输入有效的数字")
return personnel_file, seat_file
def set_data_files(self, personnel_file, seat_file):
"""设置数据文件路径"""
self.personnel_file = personnel_file
self.seat_file = seat_file
def identify_and_set_files(self):
"""识别并设置数据文件"""
file_mode = self.choose_file_mode()
if file_mode == 'default':
# 默认文件名模式
personnel_file = Path('人员信息.xlsx')
seat_file = Path('座位信息.xlsx')
if not personnel_file.exists():
self.logger.log(f"❌ 错误: {personnel_file} 文件不存在")
return False
if not seat_file.exists():
self.logger.log(f"❌ 错误: {seat_file} 文件不存在")
return False
else:
# 智能识别模式
personnel_file, seat_file = self.smart_identify_files()
if not personnel_file or not seat_file:
self.logger.log("❌ 文件识别失败")
return False
self.set_data_files(personnel_file, seat_file)
return True
def load_data(self):
"""加载人员信息和座位信息数据"""
self.logger.log("=== 开始加载数据 ===")
# 使用自动识别的文件路径
if not self.personnel_file or not self.seat_file:
self.logger.log("❌ 错误: 数据文件路径未设置")
return False
if not Path(self.personnel_file).exists():
self.logger.log(f"❌ 错误: {self.personnel_file} 文件不存在")
return False
if not Path(self.seat_file).exists():
self.logger.log(f"❌ 错误: {self.seat_file} 文件不存在")
return False
try:
self.logger.log(f"正在读取人员信息文件: {self.personnel_file}")
# 读取人员信息,指定数据类型
self.personnel_df = pd.read_excel(self.personnel_file, dtype={
'姓名': 'str',
'证件类型': 'str',
'证件号': 'str', # 证件号作为字符串读取避免X被转换
'手机号': 'str'
})
self.logger.log(f"正在读取座位信息文件: {self.seat_file}")
# 读取座位信息,指定数据类型
seat_df_raw = pd.read_excel(self.seat_file, dtype={
'区域': 'str',
'楼层': 'str',
'排号': 'str',
'座位号': 'str',
'姓名': 'str',
'证件类型': 'str',
'证件号': 'str',
'手机号': 'str',
'签发地/国籍': 'str'
})
# 过滤和清洗座位数据
self.seat_df = self.filter_seat_data(seat_df_raw)
# 清理文字信息中的空格
self.clean_text_data()
self.logger.log(f"✅ 文件加载成功")
self.logger.log(f" 人员信息: {self.personnel_df.shape[0]}× {self.personnel_df.shape[1]}")
self.logger.log(f" 座位信息: {self.seat_df.shape[0]}× {self.seat_df.shape[1]}")
return True
except Exception as e:
self.logger.log(f"❌ 文件加载失败: {e}")
return False
def clean_text_data(self):
"""清理文字数据中的空格"""
self.logger.log("清理文字数据中的空格...")
# 清理人员信息中的空格
text_columns_personnel = ['姓名', '证件类型', '证件号']
for col in text_columns_personnel:
if col in self.personnel_df.columns:
self.personnel_df[col] = self.personnel_df[col].astype(str).str.strip()
# 清理座位信息中的空格
text_columns_seat = ['区域', '楼层', '排号', '座位号', '姓名', '证件类型', '证件号', '签发地/国籍']
for col in text_columns_seat:
if col in self.seat_df.columns:
# 只对非空值进行清理
mask = self.seat_df[col].notna()
self.seat_df.loc[mask, col] = self.seat_df.loc[mask, col].astype(str).str.strip()
self.logger.log("✅ 文字数据清理完成")
def filter_seat_data(self, seat_df_raw):
"""过滤和清洗座位数据,移除空行和无效数据"""
self.logger.log("开始过滤座位数据...")
original_count = len(seat_df_raw)
self.logger.log(f"原始数据行数: {original_count}")
# 关键列,这些列都不能为空
key_columns = ['区域', '楼层', '排号', '座位号']
# 创建过滤条件:关键列都不能为空且不能为'nan'字符串
filter_condition = True
for col in key_columns:
if col in seat_df_raw.columns:
# 过滤空值、NaN、'nan'字符串
col_condition = (
seat_df_raw[col].notna() &
(seat_df_raw[col].astype(str).str.strip() != '') &
(seat_df_raw[col].astype(str).str.lower() != 'nan')
)
filter_condition = filter_condition & col_condition
# 应用过滤条件
seat_df_filtered = seat_df_raw[filter_condition].copy()
# 重置索引
seat_df_filtered.reset_index(drop=True, inplace=True)
filtered_count = len(seat_df_filtered)
removed_count = original_count - filtered_count
self.logger.log(f"过滤后数据行数: {filtered_count}")
self.logger.log(f"移除无效行数: {removed_count}")
if removed_count > 0:
self.logger.log(f"✅ 已过滤掉 {removed_count} 行无效数据(空行、示例数据等)")
# 显示过滤后的数据概览
if filtered_count > 0:
areas = seat_df_filtered['区域'].unique()
floors = seat_df_filtered['楼层'].unique()
self.logger.log(f"有效座位区域: {', '.join(areas)}")
self.logger.log(f"有效座位楼层: {', '.join(floors)}")
return seat_df_filtered
def validate_personnel_structure(self):
"""校验人员信息文件结构"""
self.logger.log("\n=== 人员信息结构校验 ===")
required_columns = ['姓名', '证件类型', '证件号', '手机号', '备注']
validation_results = []
# 检查必需列
missing_columns = []
for col in required_columns:
if col not in self.personnel_df.columns:
missing_columns.append(col)
if missing_columns:
self.logger.log(f"❌ 缺少必需列: {missing_columns}")
validation_results.append(False)
else:
self.logger.log("✅ 所有必需列都存在")
validation_results.append(True)
# 检查数据完整性
self.logger.log("\n数据完整性检查:")
for col in ['姓名', '证件类型', '证件号', '手机号']:
if col in self.personnel_df.columns:
null_count = self.personnel_df[col].isnull().sum()
if null_count > 0:
self.logger.log(f"⚠️ {col} 列有 {null_count} 个空值")
validation_results.append(False)
else:
self.logger.log(f"{col} 列数据完整")
validation_results.append(True)
# 检查重复人员(姓名+身份证号组合)
self.logger.log("\n重复人员检查:")
if '姓名' in self.personnel_df.columns and '证件号' in self.personnel_df.columns:
# 创建姓名+证件号的组合标识
self.personnel_df['人员标识'] = self.personnel_df['姓名'].astype(str) + '_' + self.personnel_df['证件号'].astype(str)
duplicate_persons = self.personnel_df[self.personnel_df['人员标识'].duplicated()]
if not duplicate_persons.empty:
self.logger.log(f"⚠️ 发现重复人员:")
for _, person in duplicate_persons.iterrows():
self.logger.log(f" {person['姓名']} (证件号: {person['证件号']})")
validation_results.append(False)
else:
self.logger.log("✅ 无重复人员(姓名+证件号组合检查)")
validation_results.append(True)
# 单独检查重名情况(仅提示,不影响校验结果)
duplicate_names = self.personnel_df[self.personnel_df['姓名'].duplicated(keep=False)]
if not duplicate_names.empty:
self.logger.log(f"📝 发现重名人员(但证件号不同,允许通过):")
name_groups = duplicate_names.groupby('姓名')
for name, group in name_groups:
self.logger.log(f" 姓名: {name}")
for _, person in group.iterrows():
self.logger.log(f" 证件号: {person['证件号']}")
else:
self.logger.log("❌ 缺少姓名或证件号列,无法进行重复检查")
validation_results.append(False)
return all(validation_results)
def validate_seating_groups(self):
"""校验连坐组的完整性和正确性"""
self.logger.log("\n=== 连坐组校验 ===")
validation_results = []
issues = []
i = 0
group_num = 1
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
# 无备注,单独坐
self.logger.log(f"✅ 第 {group_num} 组: {person['姓名']} (单独)")
i += 1
else:
# 有备注,检查连坐组
try:
group_size = int(remark)
if group_size < 1 or group_size > 10:
issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 {group_size} 超出范围 (1-10)"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
continue
# 检查后续人员是否足够
if i + group_size > len(self.personnel_df):
issue = f"❌ 第 {group_num} 组: {person['姓名']} 需要 {group_size} 人连坐,但后续人员不足"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
continue
# 检查后续人员的备注是否为空
group_members = []
valid_group = True
for j in range(group_size):
member = self.personnel_df.iloc[i + j]
group_members.append(member['姓名'])
if j == 0:
# 第一个人应该有备注
if pd.isna(member['备注']) or int(member['备注']) != group_size:
issue = f"❌ 第 {group_num} 组: 组长 {member['姓名']} 的备注应该是 {group_size}"
self.logger.log(issue)
issues.append(issue)
valid_group = False
else:
# 后续人员应该没有备注
if not pd.isna(member['备注']):
issue = f"❌ 第 {group_num} 组: {member['姓名']} 应该没有备注(当前备注: {member['备注']}"
self.logger.log(issue)
issues.append(issue)
valid_group = False
if valid_group:
self.logger.log(f"✅ 第 {group_num} 组: {', '.join(group_members)} ({group_size}人连坐)")
validation_results.append(True)
else:
validation_results.append(False)
i += group_size
except ValueError:
issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 '{remark}' 不是有效数字"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
group_num += 1
return all(validation_results), issues
def validate_seat_structure(self):
"""校验座位信息结构和连续性"""
self.logger.log("\n=== 座位信息结构校验 ===")
required_columns = ['区域', '楼层', '排号', '座位号']
validation_results = []
# 检查必需列
missing_columns = []
for col in required_columns:
if col not in self.seat_df.columns:
missing_columns.append(col)
if missing_columns:
self.logger.log(f"❌ 缺少必需列: {missing_columns}")
return False, {}
self.logger.log("✅ 所有必需列都存在")
# 检查数据完整性
for col in required_columns:
null_count = self.seat_df[col].isnull().sum()
if null_count > 0:
self.logger.log(f"{col} 列有 {null_count} 个空值")
validation_results.append(False)
else:
self.logger.log(f"{col} 列数据完整")
validation_results.append(True)
# 分析座位结构
self.logger.log("\n座位结构分析:")
seat_groups = {}
for _, row in self.seat_df.iterrows():
key = (row['区域'], row['楼层'], row['排号'])
if key not in seat_groups:
seat_groups[key] = []
seat_groups[key].append(row['座位号'])
# 检查每排的座位结构和可用连续段
self.logger.log("\n座位结构分析:")
for (area, floor, row_num), seats in seat_groups.items():
# 提取座位号数字
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(str(seat).replace('', '')))
except:
self.logger.log(f"{area}-{floor}-{row_num}: 座位号格式异常 '{seat}'")
validation_results.append(False)
continue
seat_numbers.sort()
# 分析连续段
consecutive_segments = []
if seat_numbers:
current_segment = [seat_numbers[0]]
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] == 1:
# 连续
current_segment.append(seat_numbers[i])
else:
# 不连续,保存当前段,开始新段
consecutive_segments.append(current_segment)
current_segment = [seat_numbers[i]]
# 添加最后一段
consecutive_segments.append(current_segment)
# 显示分析结果
if len(consecutive_segments) == 1 and len(consecutive_segments[0]) == len(seat_numbers):
self.logger.log(f"{area}-{floor}-{row_num}: {len(seats)} 个座位完全连续 ({min(seat_numbers)}-{max(seat_numbers)})")
else:
segments_info = []
max_segment_size = 0
for segment in consecutive_segments:
if len(segment) == 1:
segments_info.append(f"{segment[0]}")
else:
segments_info.append(f"{segment[0]}-{segment[-1]}")
max_segment_size = max(max_segment_size, len(segment))
self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 个连续段: {', '.join(segments_info)}")
self.logger.log(f" 最大连续段: {max_segment_size} 个座位")
validation_results.append(True) # 座位不连续不算错误,只是需要特殊处理
return all(validation_results), seat_groups
def validate_capacity_feasibility(self, seat_groups):
"""校验座位容量和分配可行性"""
self.logger.log("\n=== 容量和可行性校验 ===")
# 统计人员总数
total_people = len(self.personnel_df)
total_seats = sum(len(seats) for seats in seat_groups.values())
self.logger.log(f"总人数: {total_people}")
self.logger.log(f"总座位数: {total_seats}")
if total_people > total_seats:
self.logger.log(f"❌ 座位不足: 需要 {total_people} 个座位,只有 {total_seats}")
return False
else:
self.logger.log(f"✅ 座位充足: 剩余 {total_seats - total_people} 个座位")
# 分析连坐组需求
self.logger.log("\n连坐组需求分析:")
group_sizes = []
i = 0
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
group_sizes.append(1)
i += 1
else:
try:
group_size = int(remark)
group_sizes.append(group_size)
i += group_size
except:
group_sizes.append(1)
i += 1
max_group_size = max(group_sizes)
self.logger.log(f"最大连坐组: {max_group_size}")
# 检查是否有足够的连续座位来容纳最大连坐组
self.logger.log(f"\n连续座位可行性分析:")
max_consecutive_available = 0
feasible_rows = []
for (area, floor, row_num), seats in seat_groups.items():
# 分析每排的连续座位段
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(str(seat).replace('', '')))
except:
continue
seat_numbers.sort()
# 找出最大连续段
max_consecutive_in_row = 0
if seat_numbers:
current_consecutive = 1
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] == 1:
current_consecutive += 1
else:
max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive)
current_consecutive = 1
max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive)
if max_consecutive_in_row >= max_group_size:
feasible_rows.append((area, floor, row_num, max_consecutive_in_row))
max_consecutive_available = max(max_consecutive_available, max_consecutive_in_row)
self.logger.log(f" {area}-{floor}-{row_num}: 最大连续 {max_consecutive_in_row} 个座位")
self.logger.log(f"\n全场最大连续座位: {max_consecutive_available}")
if max_group_size > max_consecutive_available:
self.logger.log(f"❌ 无法容纳最大连坐组: 需要 {max_group_size} 个连续座位,最大连续段只有 {max_consecutive_available}")
return False
else:
self.logger.log(f"✅ 可以容纳最大连坐组")
self.logger.log(f"可容纳最大连坐组的排数: {len(feasible_rows)}")
# 统计各种大小的连坐组
size_counts = {}
for size in group_sizes:
size_counts[size] = size_counts.get(size, 0) + 1
self.logger.log("\n连坐组分布:")
for size in sorted(size_counts.keys()):
count = size_counts[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seating_requirements(self, use_phone_grouping=False):
"""分析人员连坐需求"""
self.logger.log("\n=== 人员连坐需求分析 ===")
if use_phone_grouping:
self.logger.log("🔍 使用手机号分组模式")
return self.analyze_seating_by_phone()
else:
self.logger.log("🔍 使用备注分组模式")
return self.analyze_seating_by_remark()
def analyze_seating_by_phone(self):
"""基于手机号分析连坐需求"""
self.logger.log("根据相同手机号识别连坐组...")
# 按手机号分组
phone_groups = self.personnel_df.groupby('手机号')
self.seating_groups = []
for phone, group in phone_groups:
group_members = [row for _, row in group.iterrows()]
group_size = len(group_members)
if group_size == 1:
# 单人组
self.seating_groups.append({
'type': 'single',
'size': 1,
'members': group_members,
'leader': group_members[0]['姓名'],
'grouping_method': 'phone'
})
self.logger.log(f"📱 单人: {group_members[0]['姓名']} (手机号: {phone})")
else:
# 连坐组
leader_name = group_members[0]['姓名']
member_names = [member['姓名'] for member in group_members]
self.seating_groups.append({
'type': 'group',
'size': group_size,
'members': group_members,
'leader': leader_name,
'grouping_method': 'phone'
})
self.logger.log(f"📱 连坐组: {', '.join(member_names)} (手机号: {phone}, {group_size}人)")
# 统计
size_stats = {}
for group in self.seating_groups:
size = group['size']
size_stats[size] = size_stats.get(size, 0) + 1
self.logger.log(f"\n基于手机号识别出 {len(self.seating_groups)} 个座位组:")
for size in sorted(size_stats.keys()):
count = size_stats[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seating_by_remark(self):
"""基于备注分析连坐需求"""
self.logger.log("根据备注数字识别连坐组...")
self.seating_groups = []
i = 0
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
# 无备注,单独坐
self.seating_groups.append({
'type': 'single',
'size': 1,
'members': [person],
'leader': person['姓名'],
'grouping_method': 'remark'
})
i += 1
else:
# 有备注,按备注数量连坐
group_size = int(remark)
group_members = []
# 收集连坐组成员
for j in range(group_size):
if i + j < len(self.personnel_df):
group_members.append(self.personnel_df.iloc[i + j])
self.seating_groups.append({
'type': 'group' if group_size > 1 else 'single',
'size': len(group_members),
'members': group_members,
'leader': person['姓名'],
'grouping_method': 'remark'
})
i += group_size
# 统计
size_stats = {}
for group in self.seating_groups:
size = group['size']
size_stats[size] = size_stats.get(size, 0) + 1
self.logger.log(f"\n基于备注识别出 {len(self.seating_groups)} 个座位组:")
for size in sorted(size_stats.keys()):
count = size_stats[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seat_structure_advanced(self):
"""高级座位结构分析,识别连续段"""
self.logger.log("\n=== 高级座位结构分析 ===")
seat_groups = {}
for _, row in self.seat_df.iterrows():
key = (row['区域'], row['楼层'], row['排号'])
if key not in seat_groups:
seat_groups[key] = []
seat_groups[key].append({
'index': row.name,
'座位号': row['座位号'],
'row_data': row
})
# 分析每排的连续段
self.row_analysis = {}
for key, seats in seat_groups.items():
area, floor, row_num = key
# 按座位号排序
def get_seat_number(seat_info):
try:
return int(seat_info['座位号'].replace('', ''))
except:
return 0
seats.sort(key=get_seat_number)
seat_numbers = [get_seat_number(seat) for seat in seats]
# 找出所有连续段
consecutive_segments = []
if seat_numbers:
current_segment_start = 0
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] != 1:
# 发现间隙,结束当前段
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': i - 1,
'size': i - current_segment_start,
'seat_numbers': seat_numbers[current_segment_start:i],
'seats': seats[current_segment_start:i]
})
current_segment_start = i
# 添加最后一段
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': len(seat_numbers) - 1,
'size': len(seat_numbers) - current_segment_start,
'seat_numbers': seat_numbers[current_segment_start:],
'seats': seats[current_segment_start:]
})
self.row_analysis[key] = {
'total_seats': len(seats),
'all_seats': seats,
'consecutive_segments': consecutive_segments,
'max_consecutive': max(seg['size'] for seg in consecutive_segments) if consecutive_segments else 0
}
# 显示分析结果
if len(consecutive_segments) == 1:
self.logger.log(f"{area}-{floor}-{row_num}: {len(seats)} 个座位完全连续")
else:
segments_info = []
for seg in consecutive_segments:
if seg['size'] == 1:
segments_info.append(f"{seg['seat_numbers'][0]}")
else:
segments_info.append(f"{seg['seat_numbers'][0]}-{seg['seat_numbers'][-1]}")
self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 段: {', '.join(segments_info)}")
self.logger.log(f" 最大连续段: {self.row_analysis[key]['max_consecutive']} 个座位")
return True
def smart_seat_assignment(self):
"""智能座位分配算法"""
self.logger.log("\n=== 开始智能座位分配 ===")
# 按组大小排序,大组优先分配
sorted_groups = sorted(self.seating_groups, key=lambda x: x['size'], reverse=True)
# 创建座位分配结果
seat_df_copy = self.seat_df.copy()
# 记录已使用的座位
used_seats = set()
assignment_log = []
unassigned_groups = []
self.logger.log(f"需要分配 {len(sorted_groups)} 个组")
for group_idx, seating_group in enumerate(sorted_groups):
group_size = seating_group['size']
group_type = seating_group['type']
leader = seating_group['leader']
self.logger.log(f"\n处理第 {group_idx + 1} 组: {leader} ({group_type}, {group_size} 人)")
if group_size == 1:
# 单人组,找任意可用座位
assigned = False
for (area, floor, row_num), analysis in self.row_analysis.items():
for seat_info in analysis['all_seats']:
if seat_info['index'] not in used_seats:
# 分配座位
person_info = seating_group['members'][0]
seat_index = seat_info['index']
# 覆盖座位表中的人员信息列
seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip()
seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip()
seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip()
seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86)
seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip()
# 只有分配备注放在第13列
seat_df_copy.loc[seat_index, '分配备注'] = str(person_info.get('备注', '')).strip()
assignment_log.append({
'组号': group_idx + 1,
'组类型': group_type,
'组大小': group_size,
'组长': leader,
'姓名': person_info['姓名'],
'区域': area,
'楼层': floor,
'排号': row_num,
'座位号': seat_info['座位号']
})
used_seats.add(seat_index)
self.logger.log(f" 分配到 {area}-{floor}-{row_num}: {person_info['姓名']} -> {seat_info['座位号']}")
assigned = True
break
if assigned:
break
else:
# 多人组,需要连续座位
# 更新可用座位分析(排除已使用的座位)
current_row_analysis = {}
for key, analysis in self.row_analysis.items():
available_seats = [seat for seat in analysis['all_seats'] if seat['index'] not in used_seats]
if available_seats:
# 重新分析连续段
available_seats.sort(key=lambda x: int(x['座位号'].replace('', '')))
seat_numbers = [int(seat['座位号'].replace('', '')) for seat in available_seats]
# 找出连续段
consecutive_segments = []
if seat_numbers:
current_segment_start = 0
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] != 1:
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': i - 1,
'size': i - current_segment_start,
'seats': available_seats[current_segment_start:i]
})
current_segment_start = i
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': len(seat_numbers) - 1,
'size': len(seat_numbers) - current_segment_start,
'seats': available_seats[current_segment_start:]
})
current_row_analysis[key] = {
'consecutive_segments': consecutive_segments
}
# 寻找合适的连续座位
assigned = False
for (area, floor, row_num), analysis in current_row_analysis.items():
for segment in analysis['consecutive_segments']:
if segment['size'] >= group_size:
# 找到合适的连续座位
seats_to_use = segment['seats'][:group_size]
seat_numbers = [int(seat['座位号'].replace('', '')) for seat in seats_to_use]
self.logger.log(f" 分配到 {area}-{floor}-{row_num} (连续座位: {min(seat_numbers)}-{max(seat_numbers)})")
# 分配座位
for i, seat_info in enumerate(seats_to_use):
person_info = seating_group['members'][i]
seat_index = seat_info['index']
seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip()
seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip()
seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip()
seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86)
seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip()
# 只有分配备注放在第13列
seat_df_copy.loc[seat_index, '分配备注'] = str(person_info.get('备注', '')).strip()
assignment_log.append({
'组号': group_idx + 1,
'组类型': group_type,
'组大小': group_size,
'组长': leader,
'姓名': person_info['姓名'],
'区域': area,
'楼层': floor,
'排号': row_num,
'座位号': seat_info['座位号']
})
used_seats.add(seat_index)
self.logger.log(f" {person_info['姓名']} -> {seat_info['座位号']}")
assigned = True
break
if assigned:
break
if not assigned:
self.logger.log(f" ❌ 无法为第 {group_idx + 1} 组分配座位")
unassigned_groups.append(seating_group)
# 显示未分配的组
if unassigned_groups:
self.logger.log(f"\n⚠️ 有 {len(unassigned_groups)} 个组未能分配座位")
for group in unassigned_groups:
if group['type'] == 'single':
self.logger.log(f" 未分配: {group['leader']}")
else:
member_names = [member['姓名'] for member in group['members']]
self.logger.log(f" 未分配: {', '.join(member_names)} (连坐 {group['size']} 人)")
return seat_df_copy, assignment_log
def save_results(self, seat_df_result, assignment_log):
"""保存分配结果"""
try:
# 保存更新后的座位信息
output_file = self.logger.get_log_path('座位信息_最终分配.xlsx')
seat_df_result.to_excel(output_file, index=False)
self.logger.log(f"\n座位分配结果已保存到: {output_file}")
self.logger.log(f"📋 人员信息已覆盖到座位表对应列:")
self.logger.log(f" 第6列: 姓名")
self.logger.log(f" 第7列: 证件类型")
self.logger.log(f" 第8列: 证件号")
self.logger.log(f" 第9列: 手机国家号")
self.logger.log(f" 第10列: 手机号")
self.logger.log(f"📝 分配备注信息:")
self.logger.log(f" 第13列: 分配备注(新增列,不覆盖原数据)")
self.logger.log(f"💡 座位基本信息第1-5列保持不变")
# 保存分配日志
if assignment_log:
log_df = pd.DataFrame(assignment_log)
log_file = self.logger.get_log_path('最终座位分配日志.xlsx')
log_df.to_excel(log_file, index=False)
self.logger.log(f"分配日志已保存到: {log_file}")
# 显示分配统计
self.logger.log(f"\n=== 分配统计 ===")
self.logger.log(f"总共分配了 {len(assignment_log)} 个座位")
# 按组大小统计
size_stats = log_df.groupby('组大小').agg({
'姓名': 'count',
'组号': 'nunique'
}).rename(columns={'姓名': '总人数', '组号': '组数'})
self.logger.log("\n按组大小统计:")
for size, row in size_stats.iterrows():
if size == 1:
self.logger.log(f" 单人组: {row['组数']} 个组, {row['总人数']}")
else:
self.logger.log(f" {size}人连坐组: {row['组数']} 个组, {row['总人数']}")
# 验证连续性
self.logger.log("\n=== 连续性验证 ===")
consecutive_check = []
for group_num in sorted(log_df['组号'].unique()):
group_data = log_df[log_df['组号'] == group_num]
group_size = group_data.iloc[0]['组大小']
group_leader = group_data.iloc[0]['组长']
if group_size > 1: # 多人组
# 检查是否在同一排
areas = group_data['区域'].unique()
floors = group_data['楼层'].unique()
rows = group_data['排号'].unique()
if len(areas) == 1 and len(floors) == 1 and len(rows) == 1:
# 在同一排,检查座位号是否连续
seats = group_data['座位号'].tolist()
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(seat.replace('', '')))
except:
seat_numbers.append(0)
seat_numbers.sort()
is_consecutive = all(seat_numbers[i] + 1 == seat_numbers[i+1] for i in range(len(seat_numbers)-1))
if is_consecutive:
consecutive_check.append(True)
self.logger.log(f"✅ 组 {group_num} ({group_leader}): {group_size}人连坐,座位连续 {seat_numbers}")
else:
consecutive_check.append(False)
self.logger.log(f"❌ 组 {group_num} ({group_leader}): {group_size}人连坐,座位不连续 {seat_numbers}")
else:
consecutive_check.append(False)
self.logger.log(f"❌ 组 {group_num} ({group_leader}): 不在同一排")
success_rate = sum(consecutive_check) / len(consecutive_check) * 100 if consecutive_check else 100
self.logger.log(f"\n连续性检查结果: {sum(consecutive_check)}/{len(consecutive_check)} 个多人组座位连续 ({success_rate:.1f}%)")
return True
except Exception as e:
self.logger.log(f"保存结果时出错: {e}")
return False
def run_validation(self):
"""运行完整的文件校验"""
self.logger.log("=" * 60)
self.logger.log("座位分配系统 - 文件校验")
self.logger.log("=" * 60)
# 识别并设置数据文件
if not self.identify_and_set_files():
return False
# 加载数据
if not self.load_data():
return False
# 人员信息结构校验
personnel_structure_valid = self.validate_personnel_structure()
# 连坐组校验
seating_groups_valid, group_issues = self.validate_seating_groups()
# 座位信息校验
seat_structure_valid, seat_groups = self.validate_seat_structure()
# 容量可行性校验
capacity_valid = self.validate_capacity_feasibility(seat_groups)
# 总结
self.logger.log("\n" + "=" * 60)
self.logger.log("校验结果总结")
self.logger.log("=" * 60)
all_valid = all([
personnel_structure_valid,
seating_groups_valid,
seat_structure_valid,
capacity_valid
])
self.logger.log(f"人员信息结构: {'✅ 通过' if personnel_structure_valid else '❌ 失败'}")
self.logger.log(f"连坐组完整性: {'✅ 通过' if seating_groups_valid else '❌ 失败'}")
self.logger.log(f"座位信息结构: {'✅ 通过' if seat_structure_valid else '❌ 失败'}")
self.logger.log(f"容量可行性: {'✅ 通过' if capacity_valid else '❌ 失败'}")
self.logger.log(f"\n总体校验结果: {'✅ 全部通过' if all_valid else '❌ 存在问题'}")
if group_issues:
self.logger.log(f"\n发现的问题:")
for issue in group_issues:
self.logger.log(f" {issue}")
if all_valid:
self.logger.log("\n🎉 文件校验通过,可以进行座位分配!")
else:
self.logger.log("\n⚠️ 请修复上述问题后再进行座位分配。")
return all_valid
def run_allocation(self):
"""运行完整的座位分配"""
self.logger.log("\n" + "=" * 60)
self.logger.log("开始座位分配")
self.logger.log("=" * 60)
# 选择分组方式
grouping_method = self.choose_grouping_method()
# 分析人员连坐需求
if not self.analyze_seating_requirements(use_phone_grouping=(grouping_method == 'phone')):
return False
# 高级座位结构分析
if not self.analyze_seat_structure_advanced():
return False
# 执行智能座位分配
seat_df_result, assignment_log = self.smart_seat_assignment()
# 保存结果
if self.save_results(seat_df_result, assignment_log):
self.logger.log("\n🎉 座位分配完成!")
return True
else:
self.logger.log("\n❌ 座位分配失败!")
return False
def choose_grouping_method(self):
"""选择分组方式"""
self.logger.log("\n=== 选择连坐分组方式 ===")
self.logger.log("1. 备注分组 - 根据备注数字识别连坐组(默认)")
self.logger.log("2. 手机号分组 - 根据相同手机号识别连坐组")
while True:
try:
choice = input("\n请选择分组方式 (1/2直接回车选择默认): ").strip()
if choice == '' or choice == '1':
self.logger.log("✅ 选择备注分组模式")
return 'remark'
elif choice == '2':
self.logger.log("✅ 选择手机号分组模式")
return 'phone'
else:
print("请输入 1 或 2")
except KeyboardInterrupt:
self.logger.log("\n用户取消操作")
return 'remark'
def main():
"""主函数"""
print("=" * 60)
print("座位分配系统 v2.0 (智能识别版)")
print("=" * 60)
try:
# 自动检查和识别数据文件
success, personnel_file, seat_file = check_data_files()
if not success:
input("\n按Enter键退出...")
return
print("\n开始运行座位分配系统...")
system = SeatAllocationSystem()
# 设置识别出的文件路径
system.set_data_files(personnel_file, seat_file)
# 运行校验
if system.run_validation():
# 校验通过,询问是否继续分配
response = input("\n文件校验通过,是否开始座位分配? (Y/n): ")
if response.lower() in ['', 'y', 'yes']:
# 运行分配
if system.run_allocation():
print("\n🎉 座位分配完成!")
print("请查看log文件夹中的输出文件:")
print("- log/座位信息_最终分配.xlsx (分配结果)")
print("- log/最终座位分配日志.xlsx (详细日志)")
print("- log/seat_allocation_log.txt (运行日志)")
else:
print("\n❌ 座位分配失败!")
else:
print("用户取消座位分配")
else:
print("\n❌ 文件校验失败,请修复问题后重试")
# 保存日志
system.logger.save_logs()
except FileNotFoundError as e:
print(f"\n❌ 文件未找到: {e}")
print("请确保所有必要文件都在程序目录下")
except PermissionError as e:
print(f"\n❌ 文件权限错误: {e}")
print("请确保程序有读写文件的权限")
except Exception as e:
print(f"\n❌ 程序运行出错: {e}")
print("请检查数据文件格式是否正确")
finally:
# 等待用户确认后退出
input("\n程序结束按Enter键退出...")
if __name__ == "__main__":
main()