项目基本完成

This commit is contained in:
2025-06-30 10:36:41 +08:00
commit 672760bb25
12 changed files with 1725 additions and 0 deletions

898
seat_allocation_system.py Normal file
View File

@@ -0,0 +1,898 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完整的座位分配系统
包含文件校验、智能分配算法和日志输出功能
支持1-10人连坐需求能够处理不连续座位
"""
import pandas as pd
import numpy as np
from pathlib import Path
import datetime
import sys
class Logger:
"""日志记录器"""
def __init__(self, log_file='seat_allocation_log.txt'):
self.log_file = log_file
self.logs = []
def log(self, message, print_to_console=True):
"""记录日志"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
self.logs.append(log_entry)
if print_to_console:
print(message)
def save_logs(self):
"""保存日志到文件"""
try:
with open(self.log_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(self.logs))
self.log(f"日志已保存到: {self.log_file}")
return True
except Exception as e:
print(f"保存日志失败: {e}")
return False
class SeatAllocationSystem:
"""座位分配系统"""
def __init__(self):
self.logger = Logger()
self.personnel_df = None
self.seat_df = None
self.seating_groups = []
self.row_analysis = {}
def load_data(self):
"""加载人员信息和座位信息数据"""
self.logger.log("=== 开始加载数据 ===")
# 检查文件是否存在
personnel_file = '人员信息.xlsx'
seat_file = '座位信息.xlsx'
if not Path(personnel_file).exists():
self.logger.log(f"❌ 错误: {personnel_file} 文件不存在")
return False
if not Path(seat_file).exists():
self.logger.log(f"❌ 错误: {seat_file} 文件不存在")
return False
try:
# 读取人员信息,指定数据类型
self.personnel_df = pd.read_excel(personnel_file, dtype={
'姓名': 'str',
'证件类型': 'str',
'证件号': 'str', # 证件号作为字符串读取避免X被转换
'手机号': 'str'
})
# 读取座位信息,指定数据类型
self.seat_df = pd.read_excel(seat_file, dtype={
'区域': 'str',
'楼层': 'str',
'排号': 'str',
'座位号': 'str',
'姓名': 'str',
'证件类型': 'str',
'证件号': 'str',
'手机号': 'str',
'签发地/国籍': 'str'
})
# 清理文字信息中的空格
self.clean_text_data()
self.logger.log(f"✅ 文件加载成功")
self.logger.log(f" 人员信息: {self.personnel_df.shape[0]}× {self.personnel_df.shape[1]}")
self.logger.log(f" 座位信息: {self.seat_df.shape[0]}× {self.seat_df.shape[1]}")
return True
except Exception as e:
self.logger.log(f"❌ 文件加载失败: {e}")
return False
def clean_text_data(self):
"""清理文字数据中的空格"""
self.logger.log("清理文字数据中的空格...")
# 清理人员信息中的空格
text_columns_personnel = ['姓名', '证件类型', '证件号']
for col in text_columns_personnel:
if col in self.personnel_df.columns:
self.personnel_df[col] = self.personnel_df[col].astype(str).str.strip()
# 清理座位信息中的空格
text_columns_seat = ['区域', '楼层', '排号', '座位号', '姓名', '证件类型', '证件号', '签发地/国籍']
for col in text_columns_seat:
if col in self.seat_df.columns:
# 只对非空值进行清理
mask = self.seat_df[col].notna()
self.seat_df.loc[mask, col] = self.seat_df.loc[mask, col].astype(str).str.strip()
self.logger.log("✅ 文字数据清理完成")
def validate_personnel_structure(self):
"""校验人员信息文件结构"""
self.logger.log("\n=== 人员信息结构校验 ===")
required_columns = ['姓名', '证件类型', '证件号', '手机号', '备注']
validation_results = []
# 检查必需列
missing_columns = []
for col in required_columns:
if col not in self.personnel_df.columns:
missing_columns.append(col)
if missing_columns:
self.logger.log(f"❌ 缺少必需列: {missing_columns}")
validation_results.append(False)
else:
self.logger.log("✅ 所有必需列都存在")
validation_results.append(True)
# 检查数据完整性
self.logger.log("\n数据完整性检查:")
for col in ['姓名', '证件类型', '证件号', '手机号']:
if col in self.personnel_df.columns:
null_count = self.personnel_df[col].isnull().sum()
if null_count > 0:
self.logger.log(f"⚠️ {col} 列有 {null_count} 个空值")
validation_results.append(False)
else:
self.logger.log(f"{col} 列数据完整")
validation_results.append(True)
# 检查重复姓名
duplicate_names = self.personnel_df[self.personnel_df['姓名'].duplicated()]
if not duplicate_names.empty:
self.logger.log(f"⚠️ 发现重复姓名: {duplicate_names['姓名'].tolist()}")
validation_results.append(False)
else:
self.logger.log("✅ 姓名无重复")
validation_results.append(True)
return all(validation_results)
def validate_seating_groups(self):
"""校验连坐组的完整性和正确性"""
self.logger.log("\n=== 连坐组校验 ===")
validation_results = []
issues = []
i = 0
group_num = 1
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
# 无备注,单独坐
self.logger.log(f"✅ 第 {group_num} 组: {person['姓名']} (单独)")
i += 1
else:
# 有备注,检查连坐组
try:
group_size = int(remark)
if group_size < 1 or group_size > 10:
issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 {group_size} 超出范围 (1-10)"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
continue
# 检查后续人员是否足够
if i + group_size > len(self.personnel_df):
issue = f"❌ 第 {group_num} 组: {person['姓名']} 需要 {group_size} 人连坐,但后续人员不足"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
continue
# 检查后续人员的备注是否为空
group_members = []
valid_group = True
for j in range(group_size):
member = self.personnel_df.iloc[i + j]
group_members.append(member['姓名'])
if j == 0:
# 第一个人应该有备注
if pd.isna(member['备注']) or int(member['备注']) != group_size:
issue = f"❌ 第 {group_num} 组: 组长 {member['姓名']} 的备注应该是 {group_size}"
self.logger.log(issue)
issues.append(issue)
valid_group = False
else:
# 后续人员应该没有备注
if not pd.isna(member['备注']):
issue = f"❌ 第 {group_num} 组: {member['姓名']} 应该没有备注(当前备注: {member['备注']}"
self.logger.log(issue)
issues.append(issue)
valid_group = False
if valid_group:
self.logger.log(f"✅ 第 {group_num} 组: {', '.join(group_members)} ({group_size}人连坐)")
validation_results.append(True)
else:
validation_results.append(False)
i += group_size
except ValueError:
issue = f"❌ 第 {group_num} 组: {person['姓名']} 的备注 '{remark}' 不是有效数字"
self.logger.log(issue)
issues.append(issue)
validation_results.append(False)
i += 1
group_num += 1
return all(validation_results), issues
def validate_seat_structure(self):
"""校验座位信息结构和连续性"""
self.logger.log("\n=== 座位信息结构校验 ===")
required_columns = ['区域', '楼层', '排号', '座位号']
validation_results = []
# 检查必需列
missing_columns = []
for col in required_columns:
if col not in self.seat_df.columns:
missing_columns.append(col)
if missing_columns:
self.logger.log(f"❌ 缺少必需列: {missing_columns}")
return False, {}
self.logger.log("✅ 所有必需列都存在")
# 检查数据完整性
for col in required_columns:
null_count = self.seat_df[col].isnull().sum()
if null_count > 0:
self.logger.log(f"{col} 列有 {null_count} 个空值")
validation_results.append(False)
else:
self.logger.log(f"{col} 列数据完整")
validation_results.append(True)
# 分析座位结构
self.logger.log("\n座位结构分析:")
seat_groups = {}
for _, row in self.seat_df.iterrows():
key = (row['区域'], row['楼层'], row['排号'])
if key not in seat_groups:
seat_groups[key] = []
seat_groups[key].append(row['座位号'])
# 检查每排的座位结构和可用连续段
self.logger.log("\n座位结构分析:")
for (area, floor, row_num), seats in seat_groups.items():
# 提取座位号数字
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(str(seat).replace('', '')))
except:
self.logger.log(f"{area}-{floor}-{row_num}: 座位号格式异常 '{seat}'")
validation_results.append(False)
continue
seat_numbers.sort()
# 分析连续段
consecutive_segments = []
if seat_numbers:
current_segment = [seat_numbers[0]]
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] == 1:
# 连续
current_segment.append(seat_numbers[i])
else:
# 不连续,保存当前段,开始新段
consecutive_segments.append(current_segment)
current_segment = [seat_numbers[i]]
# 添加最后一段
consecutive_segments.append(current_segment)
# 显示分析结果
if len(consecutive_segments) == 1 and len(consecutive_segments[0]) == len(seat_numbers):
self.logger.log(f"{area}-{floor}-{row_num}: {len(seats)} 个座位完全连续 ({min(seat_numbers)}-{max(seat_numbers)})")
else:
segments_info = []
max_segment_size = 0
for segment in consecutive_segments:
if len(segment) == 1:
segments_info.append(f"{segment[0]}")
else:
segments_info.append(f"{segment[0]}-{segment[-1]}")
max_segment_size = max(max_segment_size, len(segment))
self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 个连续段: {', '.join(segments_info)}")
self.logger.log(f" 最大连续段: {max_segment_size} 个座位")
validation_results.append(True) # 座位不连续不算错误,只是需要特殊处理
return all(validation_results), seat_groups
def validate_capacity_feasibility(self, seat_groups):
"""校验座位容量和分配可行性"""
self.logger.log("\n=== 容量和可行性校验 ===")
# 统计人员总数
total_people = len(self.personnel_df)
total_seats = sum(len(seats) for seats in seat_groups.values())
self.logger.log(f"总人数: {total_people}")
self.logger.log(f"总座位数: {total_seats}")
if total_people > total_seats:
self.logger.log(f"❌ 座位不足: 需要 {total_people} 个座位,只有 {total_seats}")
return False
else:
self.logger.log(f"✅ 座位充足: 剩余 {total_seats - total_people} 个座位")
# 分析连坐组需求
self.logger.log("\n连坐组需求分析:")
group_sizes = []
i = 0
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
group_sizes.append(1)
i += 1
else:
try:
group_size = int(remark)
group_sizes.append(group_size)
i += group_size
except:
group_sizes.append(1)
i += 1
max_group_size = max(group_sizes)
self.logger.log(f"最大连坐组: {max_group_size}")
# 检查是否有足够的连续座位来容纳最大连坐组
self.logger.log(f"\n连续座位可行性分析:")
max_consecutive_available = 0
feasible_rows = []
for (area, floor, row_num), seats in seat_groups.items():
# 分析每排的连续座位段
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(str(seat).replace('', '')))
except:
continue
seat_numbers.sort()
# 找出最大连续段
max_consecutive_in_row = 0
if seat_numbers:
current_consecutive = 1
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] == 1:
current_consecutive += 1
else:
max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive)
current_consecutive = 1
max_consecutive_in_row = max(max_consecutive_in_row, current_consecutive)
if max_consecutive_in_row >= max_group_size:
feasible_rows.append((area, floor, row_num, max_consecutive_in_row))
max_consecutive_available = max(max_consecutive_available, max_consecutive_in_row)
self.logger.log(f" {area}-{floor}-{row_num}: 最大连续 {max_consecutive_in_row} 个座位")
self.logger.log(f"\n全场最大连续座位: {max_consecutive_available}")
if max_group_size > max_consecutive_available:
self.logger.log(f"❌ 无法容纳最大连坐组: 需要 {max_group_size} 个连续座位,最大连续段只有 {max_consecutive_available}")
return False
else:
self.logger.log(f"✅ 可以容纳最大连坐组")
self.logger.log(f"可容纳最大连坐组的排数: {len(feasible_rows)}")
# 统计各种大小的连坐组
size_counts = {}
for size in group_sizes:
size_counts[size] = size_counts.get(size, 0) + 1
self.logger.log("\n连坐组分布:")
for size in sorted(size_counts.keys()):
count = size_counts[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seating_requirements(self):
"""分析人员连坐需求"""
self.logger.log("\n=== 人员连坐需求分析 ===")
self.seating_groups = []
i = 0
while i < len(self.personnel_df):
person = self.personnel_df.iloc[i]
remark = person['备注']
if pd.isna(remark):
# 无备注,单独坐
self.seating_groups.append({
'type': 'single',
'size': 1,
'members': [person],
'leader': person['姓名']
})
i += 1
else:
# 有备注,按备注数量连坐
group_size = int(remark)
group_members = []
# 收集连坐组成员
for j in range(group_size):
if i + j < len(self.personnel_df):
group_members.append(self.personnel_df.iloc[i + j])
self.seating_groups.append({
'type': 'group' if group_size > 1 else 'single',
'size': len(group_members),
'members': group_members,
'leader': person['姓名']
})
i += group_size
# 统计
size_stats = {}
for group in self.seating_groups:
size = group['size']
size_stats[size] = size_stats.get(size, 0) + 1
self.logger.log(f"总共识别出 {len(self.seating_groups)} 个座位组:")
for size in sorted(size_stats.keys()):
count = size_stats[size]
if size == 1:
self.logger.log(f" 单人组: {count}")
else:
self.logger.log(f" {size}人连坐组: {count}")
return True
def analyze_seat_structure_advanced(self):
"""高级座位结构分析,识别连续段"""
self.logger.log("\n=== 高级座位结构分析 ===")
seat_groups = {}
for _, row in self.seat_df.iterrows():
key = (row['区域'], row['楼层'], row['排号'])
if key not in seat_groups:
seat_groups[key] = []
seat_groups[key].append({
'index': row.name,
'座位号': row['座位号'],
'row_data': row
})
# 分析每排的连续段
self.row_analysis = {}
for key, seats in seat_groups.items():
area, floor, row_num = key
# 按座位号排序
def get_seat_number(seat_info):
try:
return int(seat_info['座位号'].replace('', ''))
except:
return 0
seats.sort(key=get_seat_number)
seat_numbers = [get_seat_number(seat) for seat in seats]
# 找出所有连续段
consecutive_segments = []
if seat_numbers:
current_segment_start = 0
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] != 1:
# 发现间隙,结束当前段
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': i - 1,
'size': i - current_segment_start,
'seat_numbers': seat_numbers[current_segment_start:i],
'seats': seats[current_segment_start:i]
})
current_segment_start = i
# 添加最后一段
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': len(seat_numbers) - 1,
'size': len(seat_numbers) - current_segment_start,
'seat_numbers': seat_numbers[current_segment_start:],
'seats': seats[current_segment_start:]
})
self.row_analysis[key] = {
'total_seats': len(seats),
'all_seats': seats,
'consecutive_segments': consecutive_segments,
'max_consecutive': max(seg['size'] for seg in consecutive_segments) if consecutive_segments else 0
}
# 显示分析结果
if len(consecutive_segments) == 1:
self.logger.log(f"{area}-{floor}-{row_num}: {len(seats)} 个座位完全连续")
else:
segments_info = []
for seg in consecutive_segments:
if seg['size'] == 1:
segments_info.append(f"{seg['seat_numbers'][0]}")
else:
segments_info.append(f"{seg['seat_numbers'][0]}-{seg['seat_numbers'][-1]}")
self.logger.log(f"📊 {area}-{floor}-{row_num}: {len(seats)} 个座位,{len(consecutive_segments)} 段: {', '.join(segments_info)}")
self.logger.log(f" 最大连续段: {self.row_analysis[key]['max_consecutive']} 个座位")
return True
def smart_seat_assignment(self):
"""智能座位分配算法"""
self.logger.log("\n=== 开始智能座位分配 ===")
# 按组大小排序,大组优先分配
sorted_groups = sorted(self.seating_groups, key=lambda x: x['size'], reverse=True)
# 创建座位分配结果
seat_df_copy = self.seat_df.copy()
# 记录已使用的座位
used_seats = set()
assignment_log = []
unassigned_groups = []
self.logger.log(f"需要分配 {len(sorted_groups)} 个组")
for group_idx, seating_group in enumerate(sorted_groups):
group_size = seating_group['size']
group_type = seating_group['type']
leader = seating_group['leader']
self.logger.log(f"\n处理第 {group_idx + 1} 组: {leader} ({group_type}, {group_size} 人)")
if group_size == 1:
# 单人组,找任意可用座位
assigned = False
for (area, floor, row_num), analysis in self.row_analysis.items():
for seat_info in analysis['all_seats']:
if seat_info['index'] not in used_seats:
# 分配座位
person_info = seating_group['members'][0]
seat_index = seat_info['index']
# 更新座位信息,确保数据类型正确
seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip()
seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip()
seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip()
seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86)
seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip()
seat_df_copy.loc[seat_index, '签发地/国籍'] = str(person_info.get('备注', '')).strip()
assignment_log.append({
'组号': group_idx + 1,
'组类型': group_type,
'组大小': group_size,
'组长': leader,
'姓名': person_info['姓名'],
'区域': area,
'楼层': floor,
'排号': row_num,
'座位号': seat_info['座位号']
})
used_seats.add(seat_index)
self.logger.log(f" 分配到 {area}-{floor}-{row_num}: {person_info['姓名']} -> {seat_info['座位号']}")
assigned = True
break
if assigned:
break
else:
# 多人组,需要连续座位
# 更新可用座位分析(排除已使用的座位)
current_row_analysis = {}
for key, analysis in self.row_analysis.items():
available_seats = [seat for seat in analysis['all_seats'] if seat['index'] not in used_seats]
if available_seats:
# 重新分析连续段
available_seats.sort(key=lambda x: int(x['座位号'].replace('', '')))
seat_numbers = [int(seat['座位号'].replace('', '')) for seat in available_seats]
# 找出连续段
consecutive_segments = []
if seat_numbers:
current_segment_start = 0
for i in range(1, len(seat_numbers)):
if seat_numbers[i] - seat_numbers[i-1] != 1:
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': i - 1,
'size': i - current_segment_start,
'seats': available_seats[current_segment_start:i]
})
current_segment_start = i
consecutive_segments.append({
'start_idx': current_segment_start,
'end_idx': len(seat_numbers) - 1,
'size': len(seat_numbers) - current_segment_start,
'seats': available_seats[current_segment_start:]
})
current_row_analysis[key] = {
'consecutive_segments': consecutive_segments
}
# 寻找合适的连续座位
assigned = False
for (area, floor, row_num), analysis in current_row_analysis.items():
for segment in analysis['consecutive_segments']:
if segment['size'] >= group_size:
# 找到合适的连续座位
seats_to_use = segment['seats'][:group_size]
seat_numbers = [int(seat['座位号'].replace('', '')) for seat in seats_to_use]
self.logger.log(f" 分配到 {area}-{floor}-{row_num} (连续座位: {min(seat_numbers)}-{max(seat_numbers)})")
# 分配座位
for i, seat_info in enumerate(seats_to_use):
person_info = seating_group['members'][i]
seat_index = seat_info['index']
seat_df_copy.loc[seat_index, '姓名'] = str(person_info['姓名']).strip()
seat_df_copy.loc[seat_index, '证件类型'] = str(person_info['证件类型']).strip()
seat_df_copy.loc[seat_index, '证件号'] = str(person_info['证件号']).strip()
seat_df_copy.loc[seat_index, '手机国家号'] = person_info.get('Unnamed: 3', 86)
seat_df_copy.loc[seat_index, '手机号'] = str(person_info['手机号']).strip()
seat_df_copy.loc[seat_index, '签发地/国籍'] = str(person_info.get('备注', '')).strip()
assignment_log.append({
'组号': group_idx + 1,
'组类型': group_type,
'组大小': group_size,
'组长': leader,
'姓名': person_info['姓名'],
'区域': area,
'楼层': floor,
'排号': row_num,
'座位号': seat_info['座位号']
})
used_seats.add(seat_index)
self.logger.log(f" {person_info['姓名']} -> {seat_info['座位号']}")
assigned = True
break
if assigned:
break
if not assigned:
self.logger.log(f" ❌ 无法为第 {group_idx + 1} 组分配座位")
unassigned_groups.append(seating_group)
# 显示未分配的组
if unassigned_groups:
self.logger.log(f"\n⚠️ 有 {len(unassigned_groups)} 个组未能分配座位")
for group in unassigned_groups:
if group['type'] == 'single':
self.logger.log(f" 未分配: {group['leader']}")
else:
member_names = [member['姓名'] for member in group['members']]
self.logger.log(f" 未分配: {', '.join(member_names)} (连坐 {group['size']} 人)")
return seat_df_copy, assignment_log
def save_results(self, seat_df_result, assignment_log):
"""保存分配结果"""
try:
# 保存更新后的座位信息
output_file = '座位信息_最终分配.xlsx'
seat_df_result.to_excel(output_file, index=False)
self.logger.log(f"\n座位分配结果已保存到: {output_file}")
# 保存分配日志
if assignment_log:
log_df = pd.DataFrame(assignment_log)
log_file = '最终座位分配日志.xlsx'
log_df.to_excel(log_file, index=False)
self.logger.log(f"分配日志已保存到: {log_file}")
# 显示分配统计
self.logger.log(f"\n=== 分配统计 ===")
self.logger.log(f"总共分配了 {len(assignment_log)} 个座位")
# 按组大小统计
size_stats = log_df.groupby('组大小').agg({
'姓名': 'count',
'组号': 'nunique'
}).rename(columns={'姓名': '总人数', '组号': '组数'})
self.logger.log("\n按组大小统计:")
for size, row in size_stats.iterrows():
if size == 1:
self.logger.log(f" 单人组: {row['组数']} 个组, {row['总人数']}")
else:
self.logger.log(f" {size}人连坐组: {row['组数']} 个组, {row['总人数']}")
# 验证连续性
self.logger.log("\n=== 连续性验证 ===")
consecutive_check = []
for group_num in sorted(log_df['组号'].unique()):
group_data = log_df[log_df['组号'] == group_num]
group_size = group_data.iloc[0]['组大小']
group_leader = group_data.iloc[0]['组长']
if group_size > 1: # 多人组
# 检查是否在同一排
areas = group_data['区域'].unique()
floors = group_data['楼层'].unique()
rows = group_data['排号'].unique()
if len(areas) == 1 and len(floors) == 1 and len(rows) == 1:
# 在同一排,检查座位号是否连续
seats = group_data['座位号'].tolist()
seat_numbers = []
for seat in seats:
try:
seat_numbers.append(int(seat.replace('', '')))
except:
seat_numbers.append(0)
seat_numbers.sort()
is_consecutive = all(seat_numbers[i] + 1 == seat_numbers[i+1] for i in range(len(seat_numbers)-1))
if is_consecutive:
consecutive_check.append(True)
self.logger.log(f"✅ 组 {group_num} ({group_leader}): {group_size}人连坐,座位连续 {seat_numbers}")
else:
consecutive_check.append(False)
self.logger.log(f"❌ 组 {group_num} ({group_leader}): {group_size}人连坐,座位不连续 {seat_numbers}")
else:
consecutive_check.append(False)
self.logger.log(f"❌ 组 {group_num} ({group_leader}): 不在同一排")
success_rate = sum(consecutive_check) / len(consecutive_check) * 100 if consecutive_check else 100
self.logger.log(f"\n连续性检查结果: {sum(consecutive_check)}/{len(consecutive_check)} 个多人组座位连续 ({success_rate:.1f}%)")
return True
except Exception as e:
self.logger.log(f"保存结果时出错: {e}")
return False
def run_validation(self):
"""运行完整的文件校验"""
self.logger.log("=" * 60)
self.logger.log("座位分配系统 - 文件校验")
self.logger.log("=" * 60)
# 加载数据
if not self.load_data():
return False
# 人员信息结构校验
personnel_structure_valid = self.validate_personnel_structure()
# 连坐组校验
seating_groups_valid, group_issues = self.validate_seating_groups()
# 座位信息校验
seat_structure_valid, seat_groups = self.validate_seat_structure()
# 容量可行性校验
capacity_valid = self.validate_capacity_feasibility(seat_groups)
# 总结
self.logger.log("\n" + "=" * 60)
self.logger.log("校验结果总结")
self.logger.log("=" * 60)
all_valid = all([
personnel_structure_valid,
seating_groups_valid,
seat_structure_valid,
capacity_valid
])
self.logger.log(f"人员信息结构: {'✅ 通过' if personnel_structure_valid else '❌ 失败'}")
self.logger.log(f"连坐组完整性: {'✅ 通过' if seating_groups_valid else '❌ 失败'}")
self.logger.log(f"座位信息结构: {'✅ 通过' if seat_structure_valid else '❌ 失败'}")
self.logger.log(f"容量可行性: {'✅ 通过' if capacity_valid else '❌ 失败'}")
self.logger.log(f"\n总体校验结果: {'✅ 全部通过' if all_valid else '❌ 存在问题'}")
if group_issues:
self.logger.log(f"\n发现的问题:")
for issue in group_issues:
self.logger.log(f" {issue}")
if all_valid:
self.logger.log("\n🎉 文件校验通过,可以进行座位分配!")
else:
self.logger.log("\n⚠️ 请修复上述问题后再进行座位分配。")
return all_valid
def run_allocation(self):
"""运行完整的座位分配"""
self.logger.log("\n" + "=" * 60)
self.logger.log("开始座位分配")
self.logger.log("=" * 60)
# 分析人员连坐需求
if not self.analyze_seating_requirements():
return False
# 高级座位结构分析
if not self.analyze_seat_structure_advanced():
return False
# 执行智能座位分配
seat_df_result, assignment_log = self.smart_seat_assignment()
# 保存结果
if self.save_results(seat_df_result, assignment_log):
self.logger.log("\n🎉 座位分配完成!")
return True
else:
self.logger.log("\n❌ 座位分配失败!")
return False
def main():
"""主函数"""
system = SeatAllocationSystem()
try:
# 运行校验
if system.run_validation():
# 校验通过,运行分配
system.run_allocation()
# 保存日志
system.logger.save_logs()
except Exception as e:
system.logger.log(f"系统运行出错: {e}")
system.logger.save_logs()
if __name__ == "__main__":
main()