feat(core): implement session isolation for agent and claude executions

- Add run_id/session_id based event isolation for concurrent executions
- Enhance process registry with graceful shutdown and fallback kill methods
- Implement session-specific event listeners in React components
- Add proper process cleanup with timeout handling
- Support both isolated and backward-compatible event emissions
- Improve error handling and logging for process management

This change prevents event crosstalk between multiple concurrent agent/claude sessions
running simultaneously, ensuring proper isolation and user experience.
This commit is contained in:
Mufeed VH
2025-06-25 02:14:18 +05:30
parent f73d21e09f
commit 97290e5665
8 changed files with 352 additions and 148 deletions

View File

@@ -1251,7 +1251,9 @@ pub async fn execute_agent(
}
}
// Emit the line to the frontend
// Emit the line to the frontend with run_id for isolation
let _ = app_handle.emit(&format!("agent-output:{}", run_id), &line);
// Also emit to the generic event for backward compatibility
let _ = app_handle.emit("agent-output", &line);
}
@@ -1277,7 +1279,9 @@ pub async fn execute_agent(
}
error!("stderr[{}]: {}", error_count, line);
// Emit error lines to the frontend
// Emit error lines to the frontend with run_id for isolation
let _ = app_handle_stderr.emit(&format!("agent-error:{}", run_id), &line);
// Also emit to the generic event for backward compatibility
let _ = app_handle_stderr.emit("agent-error", &line);
}
@@ -1366,6 +1370,7 @@ pub async fn execute_agent(
}
let _ = app.emit("agent-complete", false);
let _ = app.emit(&format!("agent-complete:{}", run_id), false);
return;
}
@@ -1398,6 +1403,7 @@ pub async fn execute_agent(
// Cleanup will be handled by the cleanup_finished_processes function
let _ = app.emit("agent-complete", true);
let _ = app.emit(&format!("agent-complete:{}", run_id), true);
});
Ok(run_id)
@@ -1442,43 +1448,45 @@ pub async fn list_running_sessions(
/// Kill a running agent session
#[tauri::command]
pub async fn kill_agent_session(
app: AppHandle,
db: State<'_, AgentDb>,
registry: State<'_, crate::process::ProcessRegistryState>,
run_id: i64,
) -> Result<bool, String> {
// First try to kill the process using system kill
let pid_result = {
let conn = db.0.lock().map_err(|e| e.to_string())?;
conn.query_row(
"SELECT pid FROM agent_runs WHERE id = ?1 AND status = 'running'",
params![run_id],
|row| row.get::<_, Option<i64>>(0)
)
.map_err(|e| e.to_string())?
info!("Attempting to kill agent session {}", run_id);
// First try to kill using the process registry
let killed_via_registry = match registry.0.kill_process(run_id).await {
Ok(success) => {
if success {
info!("Successfully killed process {} via registry", run_id);
true
} else {
warn!("Process {} not found in registry", run_id);
false
}
}
Err(e) => {
warn!("Failed to kill process {} via registry: {}", run_id, e);
false
}
};
if let Some(pid) = pid_result {
// Try to kill the process
let kill_result = if cfg!(target_os = "windows") {
std::process::Command::new("taskkill")
.args(["/F", "/PID", &pid.to_string()])
.output()
} else {
std::process::Command::new("kill")
.args(["-TERM", &pid.to_string()])
.output()
// If registry kill didn't work, try fallback with PID from database
if !killed_via_registry {
let pid_result = {
let conn = db.0.lock().map_err(|e| e.to_string())?;
conn.query_row(
"SELECT pid FROM agent_runs WHERE id = ?1 AND status = 'running'",
params![run_id],
|row| row.get::<_, Option<i64>>(0)
)
.map_err(|e| e.to_string())?
};
match kill_result {
Ok(output) => {
if output.status.success() {
info!("Successfully killed process {}", pid);
} else {
warn!("Kill command failed for PID {}: {}", pid, String::from_utf8_lossy(&output.stderr));
}
}
Err(e) => {
warn!("Failed to execute kill command for PID {}: {}", pid, e);
}
if let Some(pid) = pid_result {
info!("Attempting fallback kill for PID {} from database", pid);
let _ = registry.0.kill_process_by_pid(run_id, pid as u32)?;
}
}
@@ -1489,7 +1497,10 @@ pub async fn kill_agent_session(
params![run_id],
).map_err(|e| e.to_string())?;
Ok(updated > 0)
// Emit cancellation event with run_id for proper isolation
let _ = app.emit(&format!("agent-cancelled:{}", run_id), true);
Ok(updated > 0 || killed_via_registry)
}
/// Get the status of a specific agent session

View File

@@ -9,6 +9,7 @@ use tauri::{AppHandle, Emitter, Manager};
use tokio::process::{Command, Child};
use tokio::sync::Mutex;
use std::sync::Arc;
use uuid;
/// Global state to track current Claude process
pub struct ClaudeProcessState {
@@ -857,8 +858,8 @@ pub async fn resume_claude_code(
/// Cancel the currently running Claude Code execution
#[tauri::command]
pub async fn cancel_claude_execution(app: AppHandle) -> Result<(), String> {
log::info!("Cancelling Claude Code execution");
pub async fn cancel_claude_execution(app: AppHandle, session_id: Option<String>) -> Result<(), String> {
log::info!("Cancelling Claude Code execution for session: {:?}", session_id);
let claude_state = app.state::<ClaudeProcessState>();
let mut current_process = claude_state.current_process.lock().await;
@@ -872,9 +873,16 @@ pub async fn cancel_claude_execution(app: AppHandle) -> Result<(), String> {
match child.kill().await {
Ok(_) => {
log::info!("Successfully killed Claude process");
// Emit cancellation event
// If we have a session ID, emit session-specific events
if let Some(sid) = session_id {
let _ = app.emit(&format!("claude-cancelled:{}", sid), true);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let _ = app.emit(&format!("claude-complete:{}", sid), false);
}
// Also emit generic events for backward compatibility
let _ = app.emit("claude-cancelled", true);
// Also emit complete with false to indicate failure
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let _ = app.emit("claude-complete", false);
Ok(())
@@ -1055,6 +1063,15 @@ fn get_claude_settings_sync(_app: &AppHandle) -> Result<ClaudeSettings, String>
async fn spawn_claude_process(app: AppHandle, mut cmd: Command) -> Result<(), String> {
use tokio::io::{AsyncBufReadExt, BufReader};
// Generate a unique session ID for this Claude Code session
let session_id = format!("claude-{}-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),
uuid::Uuid::new_v4().to_string()
);
// Spawn the process
let mut child = cmd.spawn().map_err(|e| format!("Failed to spawn Claude: {}", e))?;
@@ -1064,36 +1081,47 @@ async fn spawn_claude_process(app: AppHandle, mut cmd: Command) -> Result<(), St
// Get the child PID for logging
let pid = child.id();
log::info!("Spawned Claude process with PID: {:?}", pid);
log::info!("Spawned Claude process with PID: {:?} and session ID: {}", pid, session_id);
// Create readers
let stdout_reader = BufReader::new(stdout);
let stderr_reader = BufReader::new(stderr);
// Store the child process in the global state
// Store the child process in the global state (for backward compatibility)
let claude_state = app.state::<ClaudeProcessState>();
{
let mut current_process = claude_state.current_process.lock().await;
// If there's already a process running, kill it first
if let Some(mut existing_child) = current_process.take() {
log::warn!("Killing existing Claude process before starting new one");
let _ = existing_child.kill().await;
}
*current_process = Some(child);
}
// Spawn tasks to read stdout and stderr
let app_handle = app.clone();
let session_id_clone = session_id.clone();
let stdout_task = tokio::spawn(async move {
let mut lines = stdout_reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
log::debug!("Claude stdout: {}", line);
// Emit the line to the frontend
// Emit the line to the frontend with session isolation
let _ = app_handle.emit(&format!("claude-output:{}", session_id_clone), &line);
// Also emit to the generic event for backward compatibility
let _ = app_handle.emit("claude-output", &line);
}
});
let app_handle_stderr = app.clone();
let session_id_clone2 = session_id.clone();
let stderr_task = tokio::spawn(async move {
let mut lines = stderr_reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
log::error!("Claude stderr: {}", line);
// Emit error lines to the frontend
// Emit error lines to the frontend with session isolation
let _ = app_handle_stderr.emit(&format!("claude-error:{}", session_id_clone2), &line);
// Also emit to the generic event for backward compatibility
let _ = app_handle_stderr.emit("claude-error", &line);
}
});
@@ -1101,6 +1129,7 @@ async fn spawn_claude_process(app: AppHandle, mut cmd: Command) -> Result<(), St
// Wait for the process to complete
let app_handle_wait = app.clone();
let claude_state_wait = claude_state.current_process.clone();
let session_id_clone3 = session_id.clone();
tokio::spawn(async move {
let _ = stdout_task.await;
let _ = stderr_task.await;
@@ -1113,12 +1142,16 @@ async fn spawn_claude_process(app: AppHandle, mut cmd: Command) -> Result<(), St
log::info!("Claude process exited with status: {}", status);
// Add a small delay to ensure all messages are processed
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let _ = app_handle_wait.emit(&format!("claude-complete:{}", session_id_clone3), status.success());
// Also emit to the generic event for backward compatibility
let _ = app_handle_wait.emit("claude-complete", status.success());
}
Err(e) => {
log::error!("Failed to wait for Claude process: {}", e);
// Add a small delay to ensure all messages are processed
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let _ = app_handle_wait.emit(&format!("claude-complete:{}", session_id_clone3), false);
// Also emit to the generic event for backward compatibility
let _ = app_handle_wait.emit("claude-complete", false);
}
}
@@ -1128,6 +1161,9 @@ async fn spawn_claude_process(app: AppHandle, mut cmd: Command) -> Result<(), St
*current_process = None;
});
// Return the session ID to the frontend
let _ = app.emit(&format!("claude-session-started:{}", session_id), session_id.clone());
Ok(())
}

View File

@@ -94,29 +94,176 @@ impl ProcessRegistry {
Ok(processes.get(&run_id).map(|handle| handle.info.clone()))
}
/// Kill a running process
#[allow(dead_code)]
/// Kill a running process with proper cleanup
pub async fn kill_process(&self, run_id: i64) -> Result<bool, String> {
let processes = self.processes.lock().map_err(|e| e.to_string())?;
use log::{info, warn, error};
if let Some(handle) = processes.get(&run_id) {
let child_arc = handle.child.clone();
drop(processes); // Release the lock before async operation
// First check if the process exists and get its PID
let (pid, child_arc) = {
let processes = self.processes.lock().map_err(|e| e.to_string())?;
if let Some(handle) = processes.get(&run_id) {
(handle.info.pid, handle.child.clone())
} else {
return Ok(false); // Process not found
}
};
info!("Attempting graceful shutdown of process {} (PID: {})", run_id, pid);
// Send kill signal to the process
let kill_sent = {
let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
if let Some(ref mut child) = child_guard.as_mut() {
match child.kill().await {
if let Some(child) = child_guard.as_mut() {
match child.start_kill() {
Ok(_) => {
*child_guard = None; // Clear the child handle
Ok(true)
info!("Successfully sent kill signal to process {}", run_id);
true
}
Err(e) => {
error!("Failed to send kill signal to process {}: {}", run_id, e);
return Err(format!("Failed to kill process: {}", e));
}
Err(e) => Err(format!("Failed to kill process: {}", e)),
}
} else {
Ok(false) // Process was already killed or completed
false // Process already killed
}
};
if !kill_sent {
return Ok(false);
}
// Wait for the process to exit (with timeout)
let wait_result = tokio::time::timeout(
tokio::time::Duration::from_secs(5),
async {
loop {
// Check if process has exited
let status = {
let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
if let Some(child) = child_guard.as_mut() {
match child.try_wait() {
Ok(Some(status)) => {
info!("Process {} exited with status: {:?}", run_id, status);
*child_guard = None; // Clear the child handle
Some(Ok::<(), String>(()))
}
Ok(None) => {
// Still running
None
}
Err(e) => {
error!("Error checking process status: {}", e);
Some(Err(e.to_string()))
}
}
} else {
// Process already gone
Some(Ok(()))
}
};
match status {
Some(result) => return result,
None => {
// Still running, wait a bit
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
}
}
).await;
match wait_result {
Ok(Ok(_)) => {
info!("Process {} exited gracefully", run_id);
}
Ok(Err(e)) => {
error!("Error waiting for process {}: {}", run_id, e);
}
Err(_) => {
warn!("Process {} didn't exit within 5 seconds after kill", run_id);
// Force clear the handle
if let Ok(mut child_guard) = child_arc.lock() {
*child_guard = None;
}
}
}
// Remove from registry after killing
self.unregister_process(run_id)?;
Ok(true)
}
/// Kill a process by PID using system commands (fallback method)
pub fn kill_process_by_pid(&self, run_id: i64, pid: u32) -> Result<bool, String> {
use log::{info, warn, error};
info!("Attempting to kill process {} by PID {}", run_id, pid);
let kill_result = if cfg!(target_os = "windows") {
std::process::Command::new("taskkill")
.args(["/F", "/PID", &pid.to_string()])
.output()
} else {
Ok(false) // Process not found
// First try SIGTERM
let term_result = std::process::Command::new("kill")
.args(["-TERM", &pid.to_string()])
.output();
match &term_result {
Ok(output) if output.status.success() => {
info!("Sent SIGTERM to PID {}", pid);
// Give it 2 seconds to exit gracefully
std::thread::sleep(std::time::Duration::from_secs(2));
// Check if still running
let check_result = std::process::Command::new("kill")
.args(["-0", &pid.to_string()])
.output();
if let Ok(output) = check_result {
if output.status.success() {
// Still running, send SIGKILL
warn!("Process {} still running after SIGTERM, sending SIGKILL", pid);
std::process::Command::new("kill")
.args(["-KILL", &pid.to_string()])
.output()
} else {
term_result
}
} else {
term_result
}
}
_ => {
// SIGTERM failed, try SIGKILL directly
warn!("SIGTERM failed for PID {}, trying SIGKILL", pid);
std::process::Command::new("kill")
.args(["-KILL", &pid.to_string()])
.output()
}
}
};
match kill_result {
Ok(output) => {
if output.status.success() {
info!("Successfully killed process with PID {}", pid);
// Remove from registry
self.unregister_process(run_id)?;
Ok(true)
} else {
let error_msg = String::from_utf8_lossy(&output.stderr);
warn!("Failed to kill PID {}: {}", pid, error_msg);
Ok(false)
}
}
Err(e) => {
error!("Failed to execute kill command for PID {}: {}", pid, e);
Err(format!("Failed to execute kill command: {}", e))
}
}
}