TableSynthesis/seat_allocation_system.py
2025-06-30 21:00:51 +08:00

1013 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完整的座位分配系统
包含文件校验、智能分配算法和日志输出功能
支持1-10人连坐需求能够处理不连续座位
"""
import sys
import os
from pathlib import Path
import datetime
def check_dependencies():
"""检查并安装必要的依赖包"""
required_packages = {
'pandas': 'pandas>=1.3.0',
'numpy': 'numpy>=1.20.0',
'openpyxl': 'openpyxl>=3.0.0'
}
missing_packages = []
print("检查依赖包...")
for package_name, package_spec in required_packages.items():
try:
__import__(package_name)
print(f"{package_name} 已安装")
except ImportError:
missing_packages.append(package_spec)
print(f"{package_name} 未安装")
if missing_packages:
print(f"\n发现缺失依赖: {', '.join(missing_packages)}")
print("正在尝试自动安装...")
try:
import subprocess
for package in missing_packages:
print(f"安装 {package}...")
result = subprocess.run([sys.executable, '-m', 'pip', 'install', package],
capture_output=True, text=True)
if result.returncode == 0:
print(f"{package} 安装成功")
else:
print(f"{package} 安装失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 自动安装失败: {e}")
print("\n请手动安装以下依赖包:")
for package in missing_packages:
print(f" pip install {package}")
input("安装完成后按Enter键继续...")
return False
return True
def check_data_files():
"""检查必要的数据文件"""
required_files = ['人员信息.xlsx', '座位信息.xlsx']
missing_files = []
print("\n检查数据文件...")
for file_name in required_files:
if Path(file_name).exists():
print(f"{file_name} 存在")
else:
missing_files.append(file_name)
print(f"{file_name} 不存在")
if missing_files:
print(f"\n❌ 缺少必要文件: {', '.join(missing_files)}")
print("\n请确保以下文件存在于程序同一目录下:")
print("1. 人员信息.xlsx - 包含姓名、证件类型、证件号、手机号、备注等列")
print("2. 座位信息.xlsx - 包含区域、楼层、排号、座位号等列")
print("\n提示: 您可以参考示例文件来准备数据")
return False
return True
# 只有在依赖检查通过后才导入这些包
if check_dependencies():
try:
import pandas as pd
import numpy as np
except ImportError as e:
print(f"❌ 导入依赖包失败: {e}")
input("按Enter键退出...")
sys.exit(1)
else:
print("❌ 依赖检查失败")
input("按Enter键退出...")
sys.exit(1)
class Logger:
"""日志记录器"""
def __init__(self, log_file='seat_allocation_log.txt'):
self.log_file = log_file
self.logs = []
def log(self, message, print_to_console=True):
"""记录日志"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
self.logs.append(log_entry)
if print_to_console:
print(message)
def save_logs(self):
"""保存日志到文件"""
try:
with open(self.log_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(self.logs))
print(f"日志已保存到: {self.log_file}")
return True
except Exception as e:
print(f"保存日志失败: {e}")
return False
class SeatAllocationSystem:
"""座位分配系统"""
def __init__(self):
self.logger = Logger()
self.personnel_df = None
self.seat_df = None
self.seating_groups = []
self.row_analysis = {}
def load_data(self):
"""加载人员信息和座位信息数据"""
self.logger.log("=== 开始加载数据 ===")
# 检查文件是否存在
personnel_file = '人员信息.xlsx'
seat_file = '座位信息.xlsx'
if not Path(personnel_file).exists():
self.logger.log(f"❌ 错误: {personnel_file} 文件不存在")
return False
if not Path(seat_file).exists():
self.logger.log(f"❌ 错误: {seat_file} 文件不存在")
return False
try:
# 读取人员信息,指定数据类型
self.personnel_df = pd.read_excel(personnel_file, dtype={
'姓名': 'str',
'证件类型': 'str',
'证件号': 'str', # 证件号作为字符串读取避免X被转换
'手机号': 'str'
})
# 读取座位信息,指定数据类型
self.seat_df = pd.read_excel(seat_file, dtype={
'区域': 'str',
'楼层': 'str',
'排号': 'str',
'座位号': 'str',
'姓名': 'str',
'证件类型': 'str',
'证件号': 'str',
'手机号': 'str',
'签发地/国籍': 'str'
})
# 清理文字信息中的空格
self.clean_text_data()
self.logger.log(f"✅ 文件加载成功")
self.logger.log(f" 人员信息: {self.personnel_df.shape[0]}× {self.personnel_df.shape[1]}")
self.logger.log(f" 座位信息: {self.seat_df.shape[0]}× {self.seat_df.shape[1]}")
return True
except Exception as e:
self.logger.log(f"❌ 文件加载失败: {e}")
return False
def clean_text_data(self):
"""清理文字数据中的空格"""
self.logger.log("清理文字数据中的空格...")
# 清理人员信息中的空格
text_columns_personnel = ['姓名', '证件类型', '证件号']
for col in text_columns_personnel:
if col in self.personnel_df.columns:
self.personnel_df[col] = self.personnel_df[col].astype(str).str.strip()
# 清理座位信息中的空格
text_columns_seat = ['区域', '楼层', '排号', '座位号', '姓名', '证件类型', '证件号', '签发地/国籍']
for col in text_columns_seat:
if col in self.seat_df.columns:
# 只对非空值进行清理
mask = self.seat_df[col].notna()
self.seat_df.loc[mask, col] = self.seat_df.loc[mask, col].astype(str).str.strip()
self.logger.log("✅ 文字数据清理完成")
def validate_personnel_structure(self):
"""校验人员信息文件结构"""
self.logger.log("\n=== 人员信息结构校验 ===")
required_columns = ['姓名', '证件类型', '证件号', '手机号', '备注']
validation_results = []
# 检查必需列
missing_columns = []
for col in required_columns:
if col not in self.personnel_df.columns:
missing_columns.append(col)
if missing_columns:
self.logger.log(f"❌ 缺少必需列: {missing_columns}")
validation_results.append(False)
else:
self.logger.log("✅ 所有必需列都存在")
validation_results.append(True)
# 检查数据完整性
self.logger.log("\n数据完整性检查:")
for col in ['姓名', '证件类型', '证件号', '手机号']:
if col in self.personnel_df.columns:
null_count = self.personnel_df[col].isnull().sum()
if null_count > 0:
self.logger.log(f"⚠️ {col} 列有 {null_count} 个空值")
validation_results.append(False)
else:
self.logger.log(f"{col} 列数据完整")
validation_results.append(True)
# 检查重复姓名
duplicate_names = self.personnel_df[self.personnel_df['姓名'].duplicated()]
if not duplicate_names.empty:
self.logger.log(f"⚠️ 发现重复姓名: {duplicate_names['姓名'].tolist()}")
validation_results.append(False)
else:
self.logger.log("✅ 姓名无重复")
validation_results.append(True)
return all(validation_results)
def validate_seating_groups(self):
"""校验连坐组的完整性和正确性"""
self.logger.log("\n=== 连坐组校验 ===")
validation_results = []
issues = []
i = 0
group_num = 1
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
# 无备注,单独坐
self.logger.log(f"✅ 第 {group_num} 组: {person['姓名']} (单独)")
i += 1
else:
# 有备注,检查连坐组
try:
group_size = int(remark)
if group_size < 1 or group_size > 10:
issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 {group_size} 超出范围 (1-10)"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
continue
# 检查后续人员是否足够
if i + group_size > len(self.personnel_df):
issue = f"❌ 第 {group_num} 组: {person['姓名']} 需要 {group_size} 人连坐,但后续人员不足"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
continue
# 检查后续人员的备注是否为空
group_members = []
valid_group = True
for j in range(group_size):
member = self.personnel_df.iloc[i + j]
group_members.append(member['姓名'])
if j == 0:
# 第一个人应该有备注
if pd.isna(member['备注']) or int(member['备注']) != group_size:
issue = f"❌ 第 {group_num} 组: 组长 {member['姓名']} 的备注应该是 {group_size}"
self.logger.log(issue)
issues.append(issue)
valid_group = False
else:
# 后续人员应该没有备注
if not pd.isna(member['备注']):
issue = f"❌ 第 {group_num} 组: {member['姓名']} 应该没有备注(当前备注: {member['备注']}"
self.logger.log(issue)
issues.append(issue)
valid_group = False
if valid_group:
self.logger.log(f"✅ 第 {group_num} 组: {', '.join(group_members)} ({group_size}人连坐)")
validation_results.append(True)
else:
validation_results.append(False)
i += group_size
except ValueError:
issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 '{remark}' 不是有效数字"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
group_num += 1
return all(validation_results), issues
def validate_seat_structure(self):
"""校验座位信息结构和连续性"""
self.logger.log("\n=== 座位信息结构校验 ===")
required_columns = ['区域', '楼层', '排号', '座位号']
validation_results = []
# 检查必需列
missing_columns = []
for col in required_columns:
if col not in self.seat_df.columns:
missing_columns.append(col)
if missing_columns:
self.logger.log(f"❌ 缺少必需列: {missing_columns}")
return False, {}
self.logger.log("✅ 所有必需列都存在")
# 检查数据完整性
for col in required_columns:
null_count = self.seat_df[col].isnull().sum()
if null_count > 0:
self.logger.log(f"{col} 列有 {null_count} 个空值")
validation_results.append(False)
else:
self.logger.log(f"{col} 列数据完整")
validation_results.append(True)
# 分析座位结构
self.logger.log("\n座位结构分析:")
seat_groups = {}
for _, row in self.seat_df.iterrows():
key = (row['区域'], row['楼层'], row['排号'])
if key not in seat_groups:
seat_groups[key] = []
seat_groups[key].append(row['座位号'])
# 检查每排的座位结构和可用连续段
self.logger.log("\n座位结构分析:")
for (area, floor, row_num), seats in seat_groups.items():
# 提取座位号数字
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(str(seat).replace('', '')))
except:
self.logger.log(f"{area}-{floor}-{row_num}: 座位号格式异常 '{seat}'")
validation_results.append(False)
continue
seat_numbers.sort()
# 分析连续段
consecutive_segments = []
if seat_numbers:
current_segment = [seat_numbers[0]]
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] == 1:
# 连续
current_segment.append(seat_numbers[i])
else:
# 不连续,保存当前段,开始新段
consecutive_segments.append(current_segment)
current_segment = [seat_numbers[i]]
# 添加最后一段
consecutive_segments.append(current_segment)
# 显示分析结果
if len(consecutive_segments) == 1 and len(consecutive_segments[0]) == len(seat_numbers):
self.logger.log(f"{area}-{floor}-{row_num}: {len(seats)} 个座位完全连续 ({min(seat_numbers)}-{max(seat_numbers)})")
else:
segments_info = []
max_segment_size = 0
for segment in consecutive_segments:
if len(segment) == 1:
segments_info.append(f"{segment[0]}")
else:
segments_info.append(f"{segment[0]}-{segment[-1]}")
max_segment_size = max(max_segment_size, len(segment))
self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 个连续段: {', '.join(segments_info)}")
self.logger.log(f" 最大连续段: {max_segment_size} 个座位")
validation_results.append(True) # 座位不连续不算错误,只是需要特殊处理
return all(validation_results), seat_groups
def validate_capacity_feasibility(self, seat_groups):
"""校验座位容量和分配可行性"""
self.logger.log("\n=== 容量和可行性校验 ===")
# 统计人员总数
total_people = len(self.personnel_df)
total_seats = sum(len(seats) for seats in seat_groups.values())
self.logger.log(f"总人数: {total_people}")
self.logger.log(f"总座位数: {total_seats}")
if total_people > total_seats:
self.logger.log(f"❌ 座位不足: 需要 {total_people} 个座位,只有 {total_seats}")
return False
else:
self.logger.log(f"✅ 座位充足: 剩余 {total_seats - total_people} 个座位")
# 分析连坐组需求
self.logger.log("\n连坐组需求分析:")
group_sizes = []
i = 0
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
group_sizes.append(1)
i += 1
else:
try:
group_size = int(remark)
group_sizes.append(group_size)
i += group_size
except:
group_sizes.append(1)
i += 1
max_group_size = max(group_sizes)
self.logger.log(f"最大连坐组: {max_group_size}")
# 检查是否有足够的连续座位来容纳最大连坐组
self.logger.log(f"\n连续座位可行性分析:")
max_consecutive_available = 0
feasible_rows = []
for (area, floor, row_num), seats in seat_groups.items():
# 分析每排的连续座位段
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(str(seat).replace('', '')))
except:
continue
seat_numbers.sort()
# 找出最大连续段
max_consecutive_in_row = 0
if seat_numbers:
current_consecutive = 1
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] == 1:
current_consecutive += 1
else:
max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive)
current_consecutive = 1
max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive)
if max_consecutive_in_row >= max_group_size:
feasible_rows.append((area, floor, row_num, max_consecutive_in_row))
max_consecutive_available = max(max_consecutive_available, max_consecutive_in_row)
self.logger.log(f" {area}-{floor}-{row_num}: 最大连续 {max_consecutive_in_row} 个座位")
self.logger.log(f"\n全场最大连续座位: {max_consecutive_available}")
if max_group_size > max_consecutive_available:
self.logger.log(f"❌ 无法容纳最大连坐组: 需要 {max_group_size} 个连续座位,最大连续段只有 {max_consecutive_available}")
return False
else:
self.logger.log(f"✅ 可以容纳最大连坐组")
self.logger.log(f"可容纳最大连坐组的排数: {len(feasible_rows)}")
# 统计各种大小的连坐组
size_counts = {}
for size in group_sizes:
size_counts[size] = size_counts.get(size, 0) + 1
self.logger.log("\n连坐组分布:")
for size in sorted(size_counts.keys()):
count = size_counts[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seating_requirements(self):
"""分析人员连坐需求"""
self.logger.log("\n=== 人员连坐需求分析 ===")
self.seating_groups = []
i = 0
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
# 无备注,单独坐
self.seating_groups.append({
'type': 'single',
'size': 1,
'members': [person],
'leader': person['姓名']
})
i += 1
else:
# 有备注,按备注数量连坐
group_size = int(remark)
group_members = []
# 收集连坐组成员
for j in range(group_size):
if i + j < len(self.personnel_df):
group_members.append(self.personnel_df.iloc[i + j])
self.seating_groups.append({
'type': 'group' if group_size > 1 else 'single',
'size': len(group_members),
'members': group_members,
'leader': person['姓名']
})
i += group_size
# 统计
size_stats = {}
for group in self.seating_groups:
size = group['size']
size_stats[size] = size_stats.get(size, 0) + 1
self.logger.log(f"总共识别出 {len(self.seating_groups)} 个座位组:")
for size in sorted(size_stats.keys()):
count = size_stats[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seat_structure_advanced(self):
"""高级座位结构分析,识别连续段"""
self.logger.log("\n=== 高级座位结构分析 ===")
seat_groups = {}
for _, row in self.seat_df.iterrows():
key = (row['区域'], row['楼层'], row['排号'])
if key not in seat_groups:
seat_groups[key] = []
seat_groups[key].append({
'index': row.name,
'座位号': row['座位号'],
'row_data': row
})
# 分析每排的连续段
self.row_analysis = {}
for key, seats in seat_groups.items():
area, floor, row_num = key
# 按座位号排序
def get_seat_number(seat_info):
try:
return int(seat_info['座位号'].replace('', ''))
except:
return 0
seats.sort(key=get_seat_number)
seat_numbers = [get_seat_number(seat) for seat in seats]
# 找出所有连续段
consecutive_segments = []
if seat_numbers:
current_segment_start = 0
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] != 1:
# 发现间隙,结束当前段
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': i - 1,
'size': i - current_segment_start,
'seat_numbers': seat_numbers[current_segment_start:i],
'seats': seats[current_segment_start:i]
})
current_segment_start = i
# 添加最后一段
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': len(seat_numbers) - 1,
'size': len(seat_numbers) - current_segment_start,
'seat_numbers': seat_numbers[current_segment_start:],
'seats': seats[current_segment_start:]
})
self.row_analysis[key] = {
'total_seats': len(seats),
'all_seats': seats,
'consecutive_segments': consecutive_segments,
'max_consecutive': max(seg['size'] for seg in consecutive_segments) if consecutive_segments else 0
}
# 显示分析结果
if len(consecutive_segments) == 1:
self.logger.log(f"{area}-{floor}-{row_num}: {len(seats)} 个座位完全连续")
else:
segments_info = []
for seg in consecutive_segments:
if seg['size'] == 1:
segments_info.append(f"{seg['seat_numbers'][0]}")
else:
segments_info.append(f"{seg['seat_numbers'][0]}-{seg['seat_numbers'][-1]}")
self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 段: {', '.join(segments_info)}")
self.logger.log(f" 最大连续段: {self.row_analysis[key]['max_consecutive']} 个座位")
return True
def smart_seat_assignment(self):
"""智能座位分配算法"""
self.logger.log("\n=== 开始智能座位分配 ===")
# 按组大小排序,大组优先分配
sorted_groups = sorted(self.seating_groups, key=lambda x: x['size'], reverse=True)
# 创建座位分配结果
seat_df_copy = self.seat_df.copy()
# 记录已使用的座位
used_seats = set()
assignment_log = []
unassigned_groups = []
self.logger.log(f"需要分配 {len(sorted_groups)} 个组")
for group_idx, seating_group in enumerate(sorted_groups):
group_size = seating_group['size']
group_type = seating_group['type']
leader = seating_group['leader']
self.logger.log(f"\n处理第 {group_idx + 1} 组: {leader} ({group_type}, {group_size} 人)")
if group_size == 1:
# 单人组,找任意可用座位
assigned = False
for (area, floor, row_num), analysis in self.row_analysis.items():
for seat_info in analysis['all_seats']:
if seat_info['index'] not in used_seats:
# 分配座位
person_info = seating_group['members'][0]
seat_index = seat_info['index']
# 更新座位信息,确保数据类型正确
seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip()
seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip()
seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip()
seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86)
seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip()
seat_df_copy.loc[seat_index, '签发地/国籍'] = str(person_info.get('备注', '')).strip()
assignment_log.append({
'组号': group_idx + 1,
'组类型': group_type,
'组大小': group_size,
'组长': leader,
'姓名': person_info['姓名'],
'区域': area,
'楼层': floor,
'排号': row_num,
'座位号': seat_info['座位号']
})
used_seats.add(seat_index)
self.logger.log(f" 分配到 {area}-{floor}-{row_num}: {person_info['姓名']} -> {seat_info['座位号']}")
assigned = True
break
if assigned:
break
else:
# 多人组,需要连续座位
# 更新可用座位分析(排除已使用的座位)
current_row_analysis = {}
for key, analysis in self.row_analysis.items():
available_seats = [seat for seat in analysis['all_seats'] if seat['index'] not in used_seats]
if available_seats:
# 重新分析连续段
available_seats.sort(key=lambda x: int(x['座位号'].replace('', '')))
seat_numbers = [int(seat['座位号'].replace('', '')) for seat in available_seats]
# 找出连续段
consecutive_segments = []
if seat_numbers:
current_segment_start = 0
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] != 1:
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': i - 1,
'size': i - current_segment_start,
'seats': available_seats[current_segment_start:i]
})
current_segment_start = i
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': len(seat_numbers) - 1,
'size': len(seat_numbers) - current_segment_start,
'seats': available_seats[current_segment_start:]
})
current_row_analysis[key] = {
'consecutive_segments': consecutive_segments
}
# 寻找合适的连续座位
assigned = False
for (area, floor, row_num), analysis in current_row_analysis.items():
for segment in analysis['consecutive_segments']:
if segment['size'] >= group_size:
# 找到合适的连续座位
seats_to_use = segment['seats'][:group_size]
seat_numbers = [int(seat['座位号'].replace('', '')) for seat in seats_to_use]
self.logger.log(f" 分配到 {area}-{floor}-{row_num} (连续座位: {min(seat_numbers)}-{max(seat_numbers)})")
# 分配座位
for i, seat_info in enumerate(seats_to_use):
person_info = seating_group['members'][i]
seat_index = seat_info['index']
seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip()
seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip()
seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip()
seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86)
seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip()
seat_df_copy.loc[seat_index, '签发地/国籍'] = str(person_info.get('备注', '')).strip()
assignment_log.append({
'组号': group_idx + 1,
'组类型': group_type,
'组大小': group_size,
'组长': leader,
'姓名': person_info['姓名'],
'区域': area,
'楼层': floor,
'排号': row_num,
'座位号': seat_info['座位号']
})
used_seats.add(seat_index)
self.logger.log(f" {person_info['姓名']} -> {seat_info['座位号']}")
assigned = True
break
if assigned:
break
if not assigned:
self.logger.log(f" ❌ 无法为第 {group_idx + 1} 组分配座位")
unassigned_groups.append(seating_group)
# 显示未分配的组
if unassigned_groups:
self.logger.log(f"\n⚠️ 有 {len(unassigned_groups)} 个组未能分配座位")
for group in unassigned_groups:
if group['type'] == 'single':
self.logger.log(f" 未分配: {group['leader']}")
else:
member_names = [member['姓名'] for member in group['members']]
self.logger.log(f" 未分配: {', '.join(member_names)} (连坐 {group['size']} 人)")
return seat_df_copy, assignment_log
def save_results(self, seat_df_result, assignment_log):
"""保存分配结果"""
try:
# 保存更新后的座位信息
output_file = '座位信息_最终分配.xlsx'
seat_df_result.to_excel(output_file, index=False)
self.logger.log(f"\n座位分配结果已保存到: {output_file}")
# 保存分配日志
if assignment_log:
log_df = pd.DataFrame(assignment_log)
log_file = '最终座位分配日志.xlsx'
log_df.to_excel(log_file, index=False)
self.logger.log(f"分配日志已保存到: {log_file}")
# 显示分配统计
self.logger.log(f"\n=== 分配统计 ===")
self.logger.log(f"总共分配了 {len(assignment_log)} 个座位")
# 按组大小统计
size_stats = log_df.groupby('组大小').agg({
'姓名': 'count',
'组号': 'nunique'
}).rename(columns={'姓名': '总人数', '组号': '组数'})
self.logger.log("\n按组大小统计:")
for size, row in size_stats.iterrows():
if size == 1:
self.logger.log(f" 单人组: {row['组数']} 个组, {row['总人数']}")
else:
self.logger.log(f" {size}人连坐组: {row['组数']} 个组, {row['总人数']}")
# 验证连续性
self.logger.log("\n=== 连续性验证 ===")
consecutive_check = []
for group_num in sorted(log_df['组号'].unique()):
group_data = log_df[log_df['组号'] == group_num]
group_size = group_data.iloc[0]['组大小']
group_leader = group_data.iloc[0]['组长']
if group_size > 1: # 多人组
# 检查是否在同一排
areas = group_data['区域'].unique()
floors = group_data['楼层'].unique()
rows = group_data['排号'].unique()
if len(areas) == 1 and len(floors) == 1 and len(rows) == 1:
# 在同一排,检查座位号是否连续
seats = group_data['座位号'].tolist()
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(seat.replace('', '')))
except:
seat_numbers.append(0)
seat_numbers.sort()
is_consecutive = all(seat_numbers[i] + 1 == seat_numbers[i+1] for i in range(len(seat_numbers)-1))
if is_consecutive:
consecutive_check.append(True)
self.logger.log(f"✅ 组 {group_num} ({group_leader}): {group_size}人连坐,座位连续 {seat_numbers}")
else:
consecutive_check.append(False)
self.logger.log(f"❌ 组 {group_num} ({group_leader}): {group_size}人连坐,座位不连续 {seat_numbers}")
else:
consecutive_check.append(False)
self.logger.log(f"❌ 组 {group_num} ({group_leader}): 不在同一排")
success_rate = sum(consecutive_check) / len(consecutive_check) * 100 if consecutive_check else 100
self.logger.log(f"\n连续性检查结果: {sum(consecutive_check)}/{len(consecutive_check)} 个多人组座位连续 ({success_rate:.1f}%)")
return True
except Exception as e:
self.logger.log(f"保存结果时出错: {e}")
return False
def run_validation(self):
"""运行完整的文件校验"""
self.logger.log("=" * 60)
self.logger.log("座位分配系统 - 文件校验")
self.logger.log("=" * 60)
# 加载数据
if not self.load_data():
return False
# 人员信息结构校验
personnel_structure_valid = self.validate_personnel_structure()
# 连坐组校验
seating_groups_valid, group_issues = self.validate_seating_groups()
# 座位信息校验
seat_structure_valid, seat_groups = self.validate_seat_structure()
# 容量可行性校验
capacity_valid = self.validate_capacity_feasibility(seat_groups)
# 总结
self.logger.log("\n" + "=" * 60)
self.logger.log("校验结果总结")
self.logger.log("=" * 60)
all_valid = all([
personnel_structure_valid,
seating_groups_valid,
seat_structure_valid,
capacity_valid
])
self.logger.log(f"人员信息结构: {'✅ 通过' if personnel_structure_valid else '❌ 失败'}")
self.logger.log(f"连坐组完整性: {'✅ 通过' if seating_groups_valid else '❌ 失败'}")
self.logger.log(f"座位信息结构: {'✅ 通过' if seat_structure_valid else '❌ 失败'}")
self.logger.log(f"容量可行性: {'✅ 通过' if capacity_valid else '❌ 失败'}")
self.logger.log(f"\n总体校验结果: {'✅ 全部通过' if all_valid else '❌ 存在问题'}")
if group_issues:
self.logger.log(f"\n发现的问题:")
for issue in group_issues:
self.logger.log(f" {issue}")
if all_valid:
self.logger.log("\n🎉 文件校验通过,可以进行座位分配!")
else:
self.logger.log("\n⚠️ 请修复上述问题后再进行座位分配。")
return all_valid
def run_allocation(self):
"""运行完整的座位分配"""
self.logger.log("\n" + "=" * 60)
self.logger.log("开始座位分配")
self.logger.log("=" * 60)
# 分析人员连坐需求
if not self.analyze_seating_requirements():
return False
# 高级座位结构分析
if not self.analyze_seat_structure_advanced():
return False
# 执行智能座位分配
seat_df_result, assignment_log = self.smart_seat_assignment()
# 保存结果
if self.save_results(seat_df_result, assignment_log):
self.logger.log("\n🎉 座位分配完成!")
return True
else:
self.logger.log("\n❌ 座位分配失败!")
return False
def main():
"""主函数"""
print("=" * 60)
print("座位分配系统 v1.0")
print("=" * 60)
try:
# 检查数据文件
if not check_data_files():
input("\n按Enter键退出...")
return
print("\n开始运行座位分配系统...")
system = SeatAllocationSystem()
# 运行校验
if system.run_validation():
# 校验通过,询问是否继续分配
response = input("\n文件校验通过,是否开始座位分配? (Y/n): ")
if response.lower() in ['', 'y', 'yes']:
# 运行分配
if system.run_allocation():
print("\n🎉 座位分配完成!")
print("请查看以下输出文件:")
print("- 座位信息_最终分配.xlsx (分配结果)")
print("- 最终座位分配日志.xlsx (详细日志)")
print("- seat_allocation_log.txt (运行日志)")
else:
print("\n❌ 座位分配失败!")
else:
print("用户取消座位分配")
else:
print("\n❌ 文件校验失败,请修复问题后重试")
# 保存日志
system.logger.save_logs()
except FileNotFoundError as e:
print(f"\n❌ 文件未找到: {e}")
print("请确保所有必要文件都在程序目录下")
except PermissionError as e:
print(f"\n❌ 文件权限错误: {e}")
print("请确保程序有读写文件的权限")
except Exception as e:
print(f"\n❌ 程序运行出错: {e}")
print("请检查数据文件格式是否正确")
finally:
# 等待用户确认后退出
input("\n程序结束按Enter键退出...")
if __name__ == "__main__":
main()