From 19cf623d6449b62f8f83e4c53069aef64b65d5d7 Mon Sep 17 00:00:00 2001 From: Mufeed VH Date: Fri, 4 Jul 2025 20:27:12 +0530 Subject: [PATCH] feat(agents): enhance execution control with stop functionality and improved database handling - Refactor database path handling in agents.rs to initialize paths earlier and make them available to all async tasks - Add immediate session ID updates to the database when session IDs are extracted - Implement stop/cancel functionality for running agent sessions in both AgentRunOutputViewer and AgentRunView - Improve loading states and event listener management in AgentRunOutputViewer - Add stop button UI controls with proper styling and positioning - Enhance error handling and logging throughout the agent execution flow --- src-tauri/src/commands/agents.rs | 68 +++++++++--- src/components/AgentRunOutputViewer.tsx | 120 +++++++++++++++++++-- src/components/AgentRunView.tsx | 133 +++++++++++++++++------- 3 files changed, 259 insertions(+), 62 deletions(-) diff --git a/src-tauri/src/commands/agents.rs b/src-tauri/src/commands/agents.rs index 6c52ea7..5e61549 100644 --- a/src-tauri/src/commands/agents.rs +++ b/src-tauri/src/commands/agents.rs @@ -814,6 +814,14 @@ async fn spawn_agent_sidecar( // We'll extract the session ID from Claude's init message let session_id_holder: Arc>> = Arc::new(Mutex::new(None)); + // Create variables we need for the spawned task + let app_dir = app + .path() + .app_data_dir() + .expect("Failed to get app data dir"); + let db_path = app_dir.join("agents.db"); + let db_path_for_stream = db_path.clone(); // Clone for the streaming task + // Spawn task to read events from sidecar let app_handle = app.clone(); let session_id_holder_clone = session_id_holder.clone(); @@ -865,6 +873,23 @@ async fn spawn_agent_sidecar( if current_session_id.is_none() { *current_session_id = Some(sid.to_string()); info!("🔑 Extracted session ID: {}", sid); + + // Update database immediately with session ID + if let Ok(conn) = Connection::open(&db_path_for_stream) { + match conn.execute( + "UPDATE agent_runs SET session_id = ?1 WHERE id = ?2", + params![sid, run_id], + ) { + Ok(rows) => { + if rows > 0 { + info!("✅ Updated agent run {} with session ID immediately", run_id); + } + } + Err(e) => { + error!("❌ Failed to update session ID immediately: {}", e); + } + } + } } } } @@ -905,13 +930,6 @@ async fn spawn_agent_sidecar( info!("📖 Finished reading Claude sidecar events. Total lines: {}", line_count); }); - // Create variables we need for the spawned task - let app_dir = app - .path() - .app_data_dir() - .expect("Failed to get app data dir"); - let db_path = app_dir.join("agents.db"); - // Monitor process status and wait for completion tokio::spawn(async move { info!("🕐 Starting sidecar process monitoring..."); @@ -1040,6 +1058,13 @@ async fn spawn_agent_system( let stdout_reader = TokioBufReader::new(stdout); let stderr_reader = TokioBufReader::new(stderr); + // Create variables we need for the spawned tasks + let app_dir = app + .path() + .app_data_dir() + .expect("Failed to get app data dir"); + let db_path = app_dir.join("agents.db"); + // Shared state for collecting session ID and live output let session_id = std::sync::Arc::new(Mutex::new(String::new())); let live_output = std::sync::Arc::new(Mutex::new(String::new())); @@ -1052,6 +1077,7 @@ async fn spawn_agent_system( let registry_clone = registry.0.clone(); let first_output = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let first_output_clone = first_output.clone(); + let db_path_for_stdout = db_path.clone(); // Clone the db_path for the stdout task let stdout_task = tokio::spawn(async move { info!("📖 Starting to read Claude stdout..."); @@ -1095,6 +1121,23 @@ async fn spawn_agent_system( if current_session_id.is_empty() { *current_session_id = sid.to_string(); info!("🔑 Extracted session ID: {}", sid); + + // Update database immediately with session ID + if let Ok(conn) = Connection::open(&db_path_for_stdout) { + match conn.execute( + "UPDATE agent_runs SET session_id = ?1 WHERE id = ?2", + params![sid, run_id], + ) { + Ok(rows) => { + if rows > 0 { + info!("✅ Updated agent run {} with session ID immediately", run_id); + } + } + Err(e) => { + error!("❌ Failed to update session ID immediately: {}", e); + } + } + } } } } @@ -1164,12 +1207,7 @@ async fn spawn_agent_system( .map_err(|e| format!("Failed to register process: {}", e))?; info!("📋 Registered process in registry"); - // Create variables we need for the spawned task - let app_dir = app - .path() - .app_data_dir() - .expect("Failed to get app data dir"); - let db_path = app_dir.join("agents.db"); + let db_path_for_monitor = db_path.clone(); // Clone for the monitor task // Monitor process status and wait for completion tokio::spawn(async move { @@ -1221,7 +1259,7 @@ async fn spawn_agent_system( } // Update database - if let Ok(conn) = Connection::open(&db_path) { + if let Ok(conn) = Connection::open(&db_path_for_monitor) { let _ = conn.execute( "UPDATE agent_runs SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE id = ?1", params![run_id], @@ -1255,7 +1293,7 @@ async fn spawn_agent_system( info!("✅ Claude process execution monitoring complete"); // Update the run record with session ID and mark as completed - open a new connection - if let Ok(conn) = Connection::open(&db_path) { + if let Ok(conn) = Connection::open(&db_path_for_monitor) { info!("🔄 Updating database with extracted session ID: {}", extracted_session_id); match conn.execute( "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", diff --git a/src/components/AgentRunOutputViewer.tsx b/src/components/AgentRunOutputViewer.tsx index fd0d4f3..928c265 100644 --- a/src/components/AgentRunOutputViewer.tsx +++ b/src/components/AgentRunOutputViewer.tsx @@ -12,7 +12,8 @@ import { Clock, Hash, DollarSign, - ExternalLink + ExternalLink, + StopCircle } from 'lucide-react'; import { Button } from '@/components/ui/button'; import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; @@ -64,13 +65,17 @@ export function AgentRunOutputViewer({ }: AgentRunOutputViewerProps) { const [messages, setMessages] = useState([]); const [rawJsonlOutput, setRawJsonlOutput] = useState([]); - const [loading, setLoading] = useState(false); + const [loading, setLoading] = useState(true); const [isFullscreen, setIsFullscreen] = useState(false); const [refreshing, setRefreshing] = useState(false); const [toast, setToast] = useState<{ message: string; type: "success" | "error" } | null>(null); const [copyPopoverOpen, setCopyPopoverOpen] = useState(false); const [hasUserScrolled, setHasUserScrolled] = useState(false); + // Track whether we're in the initial load phase + const isInitialLoadRef = useRef(true); + const hasSetupListenersRef = useRef(false); + const scrollAreaRef = useRef(null); const outputEndRef = useRef(null); const fullscreenScrollRef = useRef(null); @@ -98,10 +103,12 @@ export function AgentRunOutputViewer({ } }; - // Clean up listeners on unmount + // Cleanup on unmount useEffect(() => { return () => { unlistenRefs.current.forEach(unlisten => unlisten()); + unlistenRefs.current = []; + hasSetupListenersRef.current = false; }; }, []); @@ -235,17 +242,33 @@ export function AgentRunOutputViewer({ } }; + // Set up live event listeners for running sessions const setupLiveEventListeners = async () => { - if (!run.id) return; + if (!run.id || hasSetupListenersRef.current) return; try { // Clean up existing listeners unlistenRefs.current.forEach(unlisten => unlisten()); unlistenRefs.current = []; + // Mark that we've set up listeners + hasSetupListenersRef.current = true; + + // After setup, we're no longer in initial load + // Small delay to ensure any pending messages are processed + setTimeout(() => { + isInitialLoadRef.current = false; + }, 100); + // Set up live event listeners with run ID isolation const outputUnlisten = await listen(`agent-output:${run.id}`, (event) => { try { + // Skip messages during initial load phase + if (isInitialLoadRef.current) { + console.log('[AgentRunOutputViewer] Skipping message during initial load'); + return; + } + // Store raw JSONL setRawJsonlOutput(prev => [...prev, event.payload]); @@ -253,17 +276,18 @@ export function AgentRunOutputViewer({ const message = JSON.parse(event.payload) as ClaudeStreamMessage; setMessages(prev => [...prev, message]); } catch (err) { - console.error("Failed to parse message:", err, event.payload); + console.error("[AgentRunOutputViewer] Failed to parse message:", err, event.payload); } }); const errorUnlisten = await listen(`agent-error:${run.id}`, (event) => { - console.error("Agent error:", event.payload); + console.error("[AgentRunOutputViewer] Agent error:", event.payload); setToast({ message: event.payload, type: 'error' }); }); const completeUnlisten = await listen(`agent-complete:${run.id}`, () => { setToast({ message: 'Agent execution completed', type: 'success' }); + // Don't set status here as the parent component should handle it }); const cancelUnlisten = await listen(`agent-cancelled:${run.id}`, () => { @@ -272,7 +296,7 @@ export function AgentRunOutputViewer({ unlistenRefs.current = [outputUnlisten, errorUnlisten, completeUnlisten, cancelUnlisten]; } catch (error) { - console.error('Failed to set up live event listeners:', error); + console.error('[AgentRunOutputViewer] Failed to set up live event listeners:', error); } }; @@ -341,12 +365,63 @@ export function AgentRunOutputViewer({ setToast({ message: 'Output copied as Markdown', type: 'success' }); }; - const refreshOutput = async () => { + const handleRefresh = async () => { setRefreshing(true); - await loadOutput(true); // Skip cache + await loadOutput(); setRefreshing(false); }; + const handleStop = async () => { + if (!run.id) { + console.error('[AgentRunOutputViewer] No run ID available to stop'); + return; + } + + try { + // Call the API to kill the agent session + const success = await api.killAgentSession(run.id); + + if (success) { + console.log(`[AgentRunOutputViewer] Successfully stopped agent session ${run.id}`); + setToast({ message: 'Agent execution stopped', type: 'success' }); + + // Clean up listeners + unlistenRefs.current.forEach(unlisten => unlisten()); + unlistenRefs.current = []; + hasSetupListenersRef.current = false; + + // Add a message indicating execution was stopped + const stopMessage: ClaudeStreamMessage = { + type: "result", + subtype: "error", + is_error: true, + result: "Execution stopped by user", + duration_ms: 0, + usage: { + input_tokens: 0, + output_tokens: 0 + } + }; + setMessages(prev => [...prev, stopMessage]); + + // Update the run status locally + // Optionally refresh the parent component + setTimeout(() => { + window.location.reload(); // Simple refresh to update the status + }, 1000); + } else { + console.warn(`[AgentRunOutputViewer] Failed to stop agent session ${run.id} - it may have already finished`); + setToast({ message: 'Failed to stop agent - it may have already finished', type: 'error' }); + } + } catch (err) { + console.error('[AgentRunOutputViewer] Failed to stop agent:', err); + setToast({ + message: `Failed to stop execution: ${err instanceof Error ? err.message : 'Unknown error'}`, + type: 'error' + }); + } + }; + const handleScroll = (e: React.UIEvent) => { const target = e.currentTarget; const { scrollTop, scrollHeight, clientHeight } = target; @@ -562,13 +637,25 @@ export function AgentRunOutputViewer({ + {run.status === 'running' && ( + + )} + {run.status === 'running' && ( + + )} - } - content={ -
+ )} + + - Copy as JSONL + + Copy Output + - -
- } - open={copyPopoverOpen} - onOpenChange={setCopyPopoverOpen} - align="end" - /> + } + content={ +
+ + +
+ } + open={copyPopoverOpen} + onOpenChange={setCopyPopoverOpen} + align="end" + /> + {/* Run Details */}