feat(agents): Major refactor of agent execution system

- Refactored execute_agent function with comprehensive error handling
- Added new imports: Arc, Mutex for thread safety and tauri_plugin_shell
- Improved Claude binary detection and command building
- Enhanced argument handling for agent execution
- Added better process management and output streaming
- Implemented more robust error propagation and logging
- Expanded command construction with proper environment setup
- Added extensive documentation and error context
This commit is contained in:
Vivek R
2025-07-04 14:17:21 +05:30
parent f5cefb97ba
commit 11d9469e49

View File

@@ -6,8 +6,9 @@ use rusqlite::{params, Connection, Result as SqliteResult};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use std::process::Stdio; use std::process::Stdio;
use std::sync::Mutex; use std::sync::{Arc, Mutex};
use tauri::{AppHandle, Emitter, Manager, State}; use tauri::{AppHandle, Emitter, Manager, State};
use tauri_plugin_shell::ShellExt;
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command; use tokio::process::Command;
@@ -691,7 +692,7 @@ pub async fn execute_agent(
conn.last_insert_rowid() conn.last_insert_rowid()
}; };
// Build the command // Find Claude binary
info!("Running agent '{}'", agent.name); info!("Running agent '{}'", agent.name);
let claude_path = match find_claude_binary(&app) { let claude_path = match find_claude_binary(&app) {
Ok(path) => path, Ok(path) => path,
@@ -700,24 +701,293 @@ pub async fn execute_agent(
return Err(e); return Err(e);
} }
}; };
let mut cmd = create_command_with_env(&claude_path);
cmd.arg("-p") // Build arguments
.arg(&task) let args = vec![
.arg("--system-prompt") "-p".to_string(),
.arg(&agent.system_prompt) task.clone(),
.arg("--model") "--system-prompt".to_string(),
.arg(&execution_model) agent.system_prompt.clone(),
.arg("--output-format") "--model".to_string(),
.arg("stream-json") execution_model.clone(),
.arg("--verbose") "--output-format".to_string(),
.arg("--dangerously-skip-permissions") "stream-json".to_string(),
.current_dir(&project_path) "--verbose".to_string(),
.stdin(Stdio::null()) // Don't pipe stdin - we have no input to send "--dangerously-skip-permissions".to_string(),
];
// Execute based on whether we should use sidecar or system binary
if should_use_sidecar(&claude_path) {
spawn_agent_sidecar(app, run_id, agent_id, agent.name.clone(), args, project_path, task, execution_model, db, registry).await
} else {
spawn_agent_system(app, run_id, agent_id, agent.name.clone(), claude_path, args, project_path, task, execution_model, db, registry).await
}
}
/// Determines whether to use sidecar or system binary execution for agents
fn should_use_sidecar(claude_path: &str) -> bool {
claude_path == "claude-code"
}
/// Creates a sidecar command for agent execution
fn create_agent_sidecar_command(
app: &AppHandle,
args: Vec<String>,
project_path: &str,
) -> Result<tauri_plugin_shell::process::Command, String> {
let mut sidecar_cmd = app
.shell()
.sidecar("claude-code")
.map_err(|e| format!("Failed to create sidecar command: {}", e))?;
// Add all arguments
sidecar_cmd = sidecar_cmd.args(args);
// Set working directory
sidecar_cmd = sidecar_cmd.current_dir(project_path);
Ok(sidecar_cmd)
}
/// Creates a system binary command for agent execution
fn create_agent_system_command(
claude_path: &str,
args: Vec<String>,
project_path: &str,
) -> Command {
let mut cmd = create_command_with_env(claude_path);
// Add all arguments
for arg in args {
cmd.arg(arg);
}
cmd.current_dir(project_path)
.stdin(Stdio::null())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()); .stderr(Stdio::piped());
cmd
}
/// Spawn agent using sidecar command
async fn spawn_agent_sidecar(
app: AppHandle,
run_id: i64,
_agent_id: i64,
_agent_name: String,
args: Vec<String>,
project_path: String,
_task: String,
_execution_model: String,
db: State<'_, AgentDb>,
registry: State<'_, crate::process::ProcessRegistryState>,
) -> Result<i64, String> {
use std::sync::Mutex;
// Create the sidecar command
let sidecar_cmd = create_agent_sidecar_command(&app, args, &project_path)?;
// Spawn the sidecar process
let (mut rx, child) = sidecar_cmd
.spawn()
.map_err(|e| format!("Failed to spawn Claude sidecar: {}", e))?;
// Get the child PID for logging
let pid = child.pid();
info!("✅ Spawned Claude sidecar process with PID: {:?}", pid);
// Update the database with PID and status
let now = chrono::Utc::now().to_rfc3339();
{
let conn = db.0.lock().map_err(|e| e.to_string())?;
conn.execute(
"UPDATE agent_runs SET status = 'running', pid = ?1, process_started_at = ?2 WHERE id = ?3",
params![pid as i64, now, run_id],
).map_err(|e| e.to_string())?;
info!("📝 Updated database with running status and PID");
}
// We'll extract the session ID from Claude's init message
let session_id_holder: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
// Spawn task to read events from sidecar
let app_handle = app.clone();
let session_id_holder_clone = session_id_holder.clone();
let live_output = std::sync::Arc::new(Mutex::new(String::new()));
let live_output_clone = live_output.clone();
let registry_clone = registry.0.clone();
let first_output = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let first_output_clone = first_output.clone();
let sidecar_task = tokio::spawn(async move {
info!("📖 Starting to read Claude sidecar events...");
let mut line_count = 0;
while let Some(event) = rx.recv().await {
match event {
tauri_plugin_shell::process::CommandEvent::Stdout(data) => {
let line = String::from_utf8_lossy(&data).trim().to_string();
if !line.is_empty() {
line_count += 1;
// Log first output
if !first_output_clone.load(std::sync::atomic::Ordering::Relaxed) {
info!("🎉 First output received from Claude sidecar! Line: {}", line);
first_output_clone.store(true, std::sync::atomic::Ordering::Relaxed);
}
if line_count <= 5 {
info!("sidecar stdout[{}]: {}", line_count, line);
} else {
debug!("sidecar stdout[{}]: {}", line_count, line);
}
// Store live output
if let Ok(mut output) = live_output_clone.lock() {
output.push_str(&line);
output.push('\n');
}
// Also store in process registry for cross-session access
let _ = registry_clone.append_live_output(run_id, &line);
// Extract session ID from JSONL output
if let Ok(json) = serde_json::from_str::<JsonValue>(&line) {
if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) {
if let Ok(mut current_session_id) = session_id_holder_clone.lock() {
if current_session_id.is_none() {
*current_session_id = Some(sid.to_string());
info!("🔑 Extracted session ID: {}", sid);
}
}
}
}
// Emit the line to the frontend with run_id for isolation
let _ = app_handle.emit(&format!("agent-output:{}", run_id), &line);
// Also emit to the generic event for backward compatibility
let _ = app_handle.emit("agent-output", &line);
}
}
tauri_plugin_shell::process::CommandEvent::Stderr(data) => {
let line = String::from_utf8_lossy(&data).trim().to_string();
if !line.is_empty() {
error!("sidecar stderr: {}", line);
// Emit error lines to the frontend with run_id for isolation
let _ = app_handle.emit(&format!("agent-error:{}", run_id), &line);
// Also emit to the generic event for backward compatibility
let _ = app_handle.emit("agent-error", &line);
}
}
tauri_plugin_shell::process::CommandEvent::Terminated { .. } => {
info!("📖 Claude sidecar process terminated");
break;
}
tauri_plugin_shell::process::CommandEvent::Error(e) => {
error!("🔥 Claude sidecar error: {}", e);
break;
}
_ => {
// Handle any other event types we might not know about
debug!("Received unknown sidecar event type");
}
}
}
info!("📖 Finished reading Claude sidecar events. Total lines: {}", line_count);
});
// Create variables we need for the spawned task
let app_dir = app
.path()
.app_data_dir()
.expect("Failed to get app data dir");
let db_path = app_dir.join("agents.db");
// Monitor process status and wait for completion
tokio::spawn(async move {
info!("🕐 Starting sidecar process monitoring...");
// Wait for first output with timeout
for i in 0..300 {
// 30 seconds (300 * 100ms)
if first_output.load(std::sync::atomic::Ordering::Relaxed) {
info!("✅ Output detected after {}ms, continuing normal execution", i * 100);
break;
}
if i == 299 {
warn!("⏰ TIMEOUT: No output from Claude sidecar after 30 seconds");
warn!("💡 This usually means:");
warn!(" 1. Claude sidecar is waiting for user input");
warn!(" 2. Authentication issues (API key not found/invalid)");
warn!(" 3. Network connectivity issues");
warn!(" 4. Claude failed to initialize but didn't report an error");
// Update database with failed status
if let Ok(conn) = Connection::open(&db_path) {
let _ = conn.execute(
"UPDATE agent_runs SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE id = ?1",
params![run_id],
);
}
let _ = app.emit("agent-complete", false);
let _ = app.emit(&format!("agent-complete:{}", run_id), false);
return;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
// Wait for sidecar task to complete
info!("⏳ Waiting for sidecar reading to complete...");
let _ = sidecar_task.await;
// Get the session ID that was extracted
let extracted_session_id = if let Ok(Some(sid)) = session_id_holder.lock().map(|s| s.clone()) {
sid
} else {
String::new()
};
// Update the run record with session ID and mark as completed
if let Ok(conn) = Connection::open(&db_path) {
let _ = conn.execute(
"UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2",
params![extracted_session_id, run_id],
);
}
info!("✅ Claude sidecar execution monitoring complete");
let _ = app.emit("agent-complete", true);
let _ = app.emit(&format!("agent-complete:{}", run_id), true);
});
Ok(run_id)
}
/// Spawn agent using system binary command
async fn spawn_agent_system(
app: AppHandle,
run_id: i64,
agent_id: i64,
agent_name: String,
claude_path: String,
args: Vec<String>,
project_path: String,
task: String,
execution_model: String,
db: State<'_, AgentDb>,
registry: State<'_, crate::process::ProcessRegistryState>,
) -> Result<i64, String> {
// Build the command
let mut cmd = create_agent_system_command(&claude_path, args, &project_path);
// Spawn the process // Spawn the process
info!("🚀 Spawning Claude process..."); info!("🚀 Spawning Claude system process...");
let mut child = cmd.spawn().map_err(|e| { let mut child = cmd.spawn().map_err(|e| {
error!("❌ Failed to spawn Claude process: {}", e); error!("❌ Failed to spawn Claude process: {}", e);
format!("Failed to spawn Claude: {}", e) format!("Failed to spawn Claude: {}", e)
@@ -859,7 +1129,7 @@ pub async fn execute_agent(
.register_process( .register_process(
run_id, run_id,
agent_id, agent_id,
agent.name.clone(), agent_name,
pid, pid,
project_path.clone(), project_path.clone(),
task.clone(), task.clone(),
@@ -891,64 +1161,54 @@ pub async fn execute_agent(
break; break;
} }
tokio::time::sleep(std::time::Duration::from_millis(100)).await; if i == 299 {
warn!("⏰ TIMEOUT: No output from Claude process after 30 seconds");
warn!("💡 This usually means:");
warn!(" 1. Claude process is waiting for user input");
warn!(" 3. Claude failed to initialize but didn't report an error");
warn!(" 4. Network connectivity issues");
warn!(" 5. Authentication issues (API key not found/invalid)");
// Log progress every 5 seconds // Process timed out - kill it via PID
if i > 0 && i % 50 == 0 { warn!(
info!( "🔍 Process likely stuck waiting for input, attempting to kill PID: {}",
"⏳ Still waiting for Claude output... ({}s elapsed)", pid
i / 10
); );
} let kill_result = std::process::Command::new("kill")
} .arg("-TERM")
.arg(pid.to_string())
.output();
// Check if we timed out match kill_result {
if !first_output.load(std::sync::atomic::Ordering::Relaxed) { Ok(output) if output.status.success() => {
warn!("⏰ TIMEOUT: No output from Claude process after 30 seconds"); warn!("🔍 Successfully sent TERM signal to process");
warn!("💡 This usually means:"); }
warn!(" 1. Claude process is waiting for user input"); Ok(_) => {
warn!("🔍 Failed to kill process with TERM, trying KILL");
warn!(" 3. Claude failed to initialize but didn't report an error"); let _ = std::process::Command::new("kill")
warn!(" 4. Network connectivity issues"); .arg("-KILL")
warn!(" 5. Authentication issues (API key not found/invalid)"); .arg(pid.to_string())
.output();
// Process timed out - kill it via PID }
warn!( Err(e) => {
"🔍 Process likely stuck waiting for input, attempting to kill PID: {}", warn!("🔍 Error killing process: {}", e);
pid }
);
let kill_result = std::process::Command::new("kill")
.arg("-TERM")
.arg(pid.to_string())
.output();
match kill_result {
Ok(output) if output.status.success() => {
warn!("🔍 Successfully sent TERM signal to process");
} }
Ok(_) => {
warn!("🔍 Failed to kill process with TERM, trying KILL"); // Update database
let _ = std::process::Command::new("kill") if let Ok(conn) = Connection::open(&db_path) {
.arg("-KILL") let _ = conn.execute(
.arg(pid.to_string()) "UPDATE agent_runs SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE id = ?1",
.output(); params![run_id],
} );
Err(e) => {
warn!("🔍 Error killing process: {}", e);
} }
let _ = app.emit("agent-complete", false);
let _ = app.emit(&format!("agent-complete:{}", run_id), false);
return;
} }
// Update database tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if let Ok(conn) = Connection::open(&db_path) {
let _ = conn.execute(
"UPDATE agent_runs SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE id = ?1",
params![run_id],
);
}
let _ = app.emit("agent-complete", false);
let _ = app.emit(&format!("agent-complete:{}", run_id), false);
return;
} }
// Wait for reading tasks to complete // Wait for reading tasks to complete