feat(agents): enhance execution control with stop functionality and improved database handling

- Refactor database path handling in agents.rs to initialize paths earlier and make them available to all async tasks
- Add immediate session ID updates to the database when session IDs are extracted
- Implement stop/cancel functionality for running agent sessions in both AgentRunOutputViewer and AgentRunView
- Improve loading states and event listener management in AgentRunOutputViewer
- Add stop button UI controls with proper styling and positioning
- Enhance error handling and logging throughout the agent execution flow
This commit is contained in:
Mufeed VH
2025-07-04 20:27:12 +05:30
parent 85dce56e04
commit 19cf623d64
3 changed files with 259 additions and 62 deletions

View File

@@ -814,6 +814,14 @@ async fn spawn_agent_sidecar(
// We'll extract the session ID from Claude's init message // We'll extract the session ID from Claude's init message
let session_id_holder: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); let session_id_holder: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
// Create variables we need for the spawned task
let app_dir = app
.path()
.app_data_dir()
.expect("Failed to get app data dir");
let db_path = app_dir.join("agents.db");
let db_path_for_stream = db_path.clone(); // Clone for the streaming task
// Spawn task to read events from sidecar // Spawn task to read events from sidecar
let app_handle = app.clone(); let app_handle = app.clone();
let session_id_holder_clone = session_id_holder.clone(); let session_id_holder_clone = session_id_holder.clone();
@@ -865,6 +873,23 @@ async fn spawn_agent_sidecar(
if current_session_id.is_none() { if current_session_id.is_none() {
*current_session_id = Some(sid.to_string()); *current_session_id = Some(sid.to_string());
info!("🔑 Extracted session ID: {}", sid); info!("🔑 Extracted session ID: {}", sid);
// Update database immediately with session ID
if let Ok(conn) = Connection::open(&db_path_for_stream) {
match conn.execute(
"UPDATE agent_runs SET session_id = ?1 WHERE id = ?2",
params![sid, run_id],
) {
Ok(rows) => {
if rows > 0 {
info!("✅ Updated agent run {} with session ID immediately", run_id);
}
}
Err(e) => {
error!("❌ Failed to update session ID immediately: {}", e);
}
}
}
} }
} }
} }
@@ -905,13 +930,6 @@ async fn spawn_agent_sidecar(
info!("📖 Finished reading Claude sidecar events. Total lines: {}", line_count); info!("📖 Finished reading Claude sidecar events. Total lines: {}", line_count);
}); });
// Create variables we need for the spawned task
let app_dir = app
.path()
.app_data_dir()
.expect("Failed to get app data dir");
let db_path = app_dir.join("agents.db");
// Monitor process status and wait for completion // Monitor process status and wait for completion
tokio::spawn(async move { tokio::spawn(async move {
info!("🕐 Starting sidecar process monitoring..."); info!("🕐 Starting sidecar process monitoring...");
@@ -1040,6 +1058,13 @@ async fn spawn_agent_system(
let stdout_reader = TokioBufReader::new(stdout); let stdout_reader = TokioBufReader::new(stdout);
let stderr_reader = TokioBufReader::new(stderr); let stderr_reader = TokioBufReader::new(stderr);
// Create variables we need for the spawned tasks
let app_dir = app
.path()
.app_data_dir()
.expect("Failed to get app data dir");
let db_path = app_dir.join("agents.db");
// Shared state for collecting session ID and live output // Shared state for collecting session ID and live output
let session_id = std::sync::Arc::new(Mutex::new(String::new())); let session_id = std::sync::Arc::new(Mutex::new(String::new()));
let live_output = std::sync::Arc::new(Mutex::new(String::new())); let live_output = std::sync::Arc::new(Mutex::new(String::new()));
@@ -1052,6 +1077,7 @@ async fn spawn_agent_system(
let registry_clone = registry.0.clone(); let registry_clone = registry.0.clone();
let first_output = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let first_output = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let first_output_clone = first_output.clone(); let first_output_clone = first_output.clone();
let db_path_for_stdout = db_path.clone(); // Clone the db_path for the stdout task
let stdout_task = tokio::spawn(async move { let stdout_task = tokio::spawn(async move {
info!("📖 Starting to read Claude stdout..."); info!("📖 Starting to read Claude stdout...");
@@ -1095,6 +1121,23 @@ async fn spawn_agent_system(
if current_session_id.is_empty() { if current_session_id.is_empty() {
*current_session_id = sid.to_string(); *current_session_id = sid.to_string();
info!("🔑 Extracted session ID: {}", sid); info!("🔑 Extracted session ID: {}", sid);
// Update database immediately with session ID
if let Ok(conn) = Connection::open(&db_path_for_stdout) {
match conn.execute(
"UPDATE agent_runs SET session_id = ?1 WHERE id = ?2",
params![sid, run_id],
) {
Ok(rows) => {
if rows > 0 {
info!("✅ Updated agent run {} with session ID immediately", run_id);
}
}
Err(e) => {
error!("❌ Failed to update session ID immediately: {}", e);
}
}
}
} }
} }
} }
@@ -1164,12 +1207,7 @@ async fn spawn_agent_system(
.map_err(|e| format!("Failed to register process: {}", e))?; .map_err(|e| format!("Failed to register process: {}", e))?;
info!("📋 Registered process in registry"); info!("📋 Registered process in registry");
// Create variables we need for the spawned task let db_path_for_monitor = db_path.clone(); // Clone for the monitor task
let app_dir = app
.path()
.app_data_dir()
.expect("Failed to get app data dir");
let db_path = app_dir.join("agents.db");
// Monitor process status and wait for completion // Monitor process status and wait for completion
tokio::spawn(async move { tokio::spawn(async move {
@@ -1221,7 +1259,7 @@ async fn spawn_agent_system(
} }
// Update database // Update database
if let Ok(conn) = Connection::open(&db_path) { if let Ok(conn) = Connection::open(&db_path_for_monitor) {
let _ = conn.execute( let _ = conn.execute(
"UPDATE agent_runs SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE id = ?1", "UPDATE agent_runs SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE id = ?1",
params![run_id], params![run_id],
@@ -1255,7 +1293,7 @@ async fn spawn_agent_system(
info!("✅ Claude process execution monitoring complete"); info!("✅ Claude process execution monitoring complete");
// Update the run record with session ID and mark as completed - open a new connection // Update the run record with session ID and mark as completed - open a new connection
if let Ok(conn) = Connection::open(&db_path) { if let Ok(conn) = Connection::open(&db_path_for_monitor) {
info!("🔄 Updating database with extracted session ID: {}", extracted_session_id); info!("🔄 Updating database with extracted session ID: {}", extracted_session_id);
match conn.execute( match conn.execute(
"UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2",

View File

@@ -12,7 +12,8 @@ import {
Clock, Clock,
Hash, Hash,
DollarSign, DollarSign,
ExternalLink ExternalLink,
StopCircle
} from 'lucide-react'; } from 'lucide-react';
import { Button } from '@/components/ui/button'; import { Button } from '@/components/ui/button';
import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card';
@@ -64,13 +65,17 @@ export function AgentRunOutputViewer({
}: AgentRunOutputViewerProps) { }: AgentRunOutputViewerProps) {
const [messages, setMessages] = useState<ClaudeStreamMessage[]>([]); const [messages, setMessages] = useState<ClaudeStreamMessage[]>([]);
const [rawJsonlOutput, setRawJsonlOutput] = useState<string[]>([]); const [rawJsonlOutput, setRawJsonlOutput] = useState<string[]>([]);
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(true);
const [isFullscreen, setIsFullscreen] = useState(false); const [isFullscreen, setIsFullscreen] = useState(false);
const [refreshing, setRefreshing] = useState(false); const [refreshing, setRefreshing] = useState(false);
const [toast, setToast] = useState<{ message: string; type: "success" | "error" } | null>(null); const [toast, setToast] = useState<{ message: string; type: "success" | "error" } | null>(null);
const [copyPopoverOpen, setCopyPopoverOpen] = useState(false); const [copyPopoverOpen, setCopyPopoverOpen] = useState(false);
const [hasUserScrolled, setHasUserScrolled] = useState(false); const [hasUserScrolled, setHasUserScrolled] = useState(false);
// Track whether we're in the initial load phase
const isInitialLoadRef = useRef(true);
const hasSetupListenersRef = useRef(false);
const scrollAreaRef = useRef<HTMLDivElement>(null); const scrollAreaRef = useRef<HTMLDivElement>(null);
const outputEndRef = useRef<HTMLDivElement>(null); const outputEndRef = useRef<HTMLDivElement>(null);
const fullscreenScrollRef = useRef<HTMLDivElement>(null); const fullscreenScrollRef = useRef<HTMLDivElement>(null);
@@ -98,10 +103,12 @@ export function AgentRunOutputViewer({
} }
}; };
// Clean up listeners on unmount // Cleanup on unmount
useEffect(() => { useEffect(() => {
return () => { return () => {
unlistenRefs.current.forEach(unlisten => unlisten()); unlistenRefs.current.forEach(unlisten => unlisten());
unlistenRefs.current = [];
hasSetupListenersRef.current = false;
}; };
}, []); }, []);
@@ -235,17 +242,33 @@ export function AgentRunOutputViewer({
} }
}; };
// Set up live event listeners for running sessions
const setupLiveEventListeners = async () => { const setupLiveEventListeners = async () => {
if (!run.id) return; if (!run.id || hasSetupListenersRef.current) return;
try { try {
// Clean up existing listeners // Clean up existing listeners
unlistenRefs.current.forEach(unlisten => unlisten()); unlistenRefs.current.forEach(unlisten => unlisten());
unlistenRefs.current = []; unlistenRefs.current = [];
// Mark that we've set up listeners
hasSetupListenersRef.current = true;
// After setup, we're no longer in initial load
// Small delay to ensure any pending messages are processed
setTimeout(() => {
isInitialLoadRef.current = false;
}, 100);
// Set up live event listeners with run ID isolation // Set up live event listeners with run ID isolation
const outputUnlisten = await listen<string>(`agent-output:${run.id}`, (event) => { const outputUnlisten = await listen<string>(`agent-output:${run.id}`, (event) => {
try { try {
// Skip messages during initial load phase
if (isInitialLoadRef.current) {
console.log('[AgentRunOutputViewer] Skipping message during initial load');
return;
}
// Store raw JSONL // Store raw JSONL
setRawJsonlOutput(prev => [...prev, event.payload]); setRawJsonlOutput(prev => [...prev, event.payload]);
@@ -253,17 +276,18 @@ export function AgentRunOutputViewer({
const message = JSON.parse(event.payload) as ClaudeStreamMessage; const message = JSON.parse(event.payload) as ClaudeStreamMessage;
setMessages(prev => [...prev, message]); setMessages(prev => [...prev, message]);
} catch (err) { } catch (err) {
console.error("Failed to parse message:", err, event.payload); console.error("[AgentRunOutputViewer] Failed to parse message:", err, event.payload);
} }
}); });
const errorUnlisten = await listen<string>(`agent-error:${run.id}`, (event) => { const errorUnlisten = await listen<string>(`agent-error:${run.id}`, (event) => {
console.error("Agent error:", event.payload); console.error("[AgentRunOutputViewer] Agent error:", event.payload);
setToast({ message: event.payload, type: 'error' }); setToast({ message: event.payload, type: 'error' });
}); });
const completeUnlisten = await listen<boolean>(`agent-complete:${run.id}`, () => { const completeUnlisten = await listen<boolean>(`agent-complete:${run.id}`, () => {
setToast({ message: 'Agent execution completed', type: 'success' }); setToast({ message: 'Agent execution completed', type: 'success' });
// Don't set status here as the parent component should handle it
}); });
const cancelUnlisten = await listen<boolean>(`agent-cancelled:${run.id}`, () => { const cancelUnlisten = await listen<boolean>(`agent-cancelled:${run.id}`, () => {
@@ -272,7 +296,7 @@ export function AgentRunOutputViewer({
unlistenRefs.current = [outputUnlisten, errorUnlisten, completeUnlisten, cancelUnlisten]; unlistenRefs.current = [outputUnlisten, errorUnlisten, completeUnlisten, cancelUnlisten];
} catch (error) { } catch (error) {
console.error('Failed to set up live event listeners:', error); console.error('[AgentRunOutputViewer] Failed to set up live event listeners:', error);
} }
}; };
@@ -341,12 +365,63 @@ export function AgentRunOutputViewer({
setToast({ message: 'Output copied as Markdown', type: 'success' }); setToast({ message: 'Output copied as Markdown', type: 'success' });
}; };
const refreshOutput = async () => { const handleRefresh = async () => {
setRefreshing(true); setRefreshing(true);
await loadOutput(true); // Skip cache await loadOutput();
setRefreshing(false); setRefreshing(false);
}; };
const handleStop = async () => {
if (!run.id) {
console.error('[AgentRunOutputViewer] No run ID available to stop');
return;
}
try {
// Call the API to kill the agent session
const success = await api.killAgentSession(run.id);
if (success) {
console.log(`[AgentRunOutputViewer] Successfully stopped agent session ${run.id}`);
setToast({ message: 'Agent execution stopped', type: 'success' });
// Clean up listeners
unlistenRefs.current.forEach(unlisten => unlisten());
unlistenRefs.current = [];
hasSetupListenersRef.current = false;
// Add a message indicating execution was stopped
const stopMessage: ClaudeStreamMessage = {
type: "result",
subtype: "error",
is_error: true,
result: "Execution stopped by user",
duration_ms: 0,
usage: {
input_tokens: 0,
output_tokens: 0
}
};
setMessages(prev => [...prev, stopMessage]);
// Update the run status locally
// Optionally refresh the parent component
setTimeout(() => {
window.location.reload(); // Simple refresh to update the status
}, 1000);
} else {
console.warn(`[AgentRunOutputViewer] Failed to stop agent session ${run.id} - it may have already finished`);
setToast({ message: 'Failed to stop agent - it may have already finished', type: 'error' });
}
} catch (err) {
console.error('[AgentRunOutputViewer] Failed to stop agent:', err);
setToast({
message: `Failed to stop execution: ${err instanceof Error ? err.message : 'Unknown error'}`,
type: 'error'
});
}
};
const handleScroll = (e: React.UIEvent<HTMLDivElement>) => { const handleScroll = (e: React.UIEvent<HTMLDivElement>) => {
const target = e.currentTarget; const target = e.currentTarget;
const { scrollTop, scrollHeight, clientHeight } = target; const { scrollTop, scrollHeight, clientHeight } = target;
@@ -562,13 +637,25 @@ export function AgentRunOutputViewer({
<Button <Button
variant="ghost" variant="ghost"
size="sm" size="sm"
onClick={refreshOutput} onClick={handleRefresh}
disabled={refreshing} disabled={refreshing}
title="Refresh output" title="Refresh output"
className="h-8 px-2" className="h-8 px-2"
> >
<RotateCcw className={`h-4 w-4 ${refreshing ? 'animate-spin' : ''}`} /> <RotateCcw className={`h-4 w-4 ${refreshing ? 'animate-spin' : ''}`} />
</Button> </Button>
{run.status === 'running' && (
<Button
variant="ghost"
size="sm"
onClick={handleStop}
disabled={refreshing}
title="Stop execution"
className="h-8 px-2 text-destructive hover:text-destructive"
>
<StopCircle className="h-4 w-4" />
</Button>
)}
<Button <Button
variant="ghost" variant="ghost"
size="sm" size="sm"
@@ -667,11 +754,22 @@ export function AgentRunOutputViewer({
<Button <Button
variant="outline" variant="outline"
size="sm" size="sm"
onClick={refreshOutput} onClick={handleRefresh}
disabled={refreshing} disabled={refreshing}
> >
<RotateCcw className={`h-4 w-4 ${refreshing ? 'animate-spin' : ''}`} /> <RotateCcw className={`h-4 w-4 ${refreshing ? 'animate-spin' : ''}`} />
</Button> </Button>
{run.status === 'running' && (
<Button
variant="outline"
size="sm"
onClick={handleStop}
disabled={refreshing}
>
<StopCircle className="h-4 w-4 mr-2" />
Stop
</Button>
)}
<Button <Button
variant="outline" variant="outline"
size="sm" size="sm"

View File

@@ -7,7 +7,8 @@ import {
Clock, Clock,
Hash, Hash,
DollarSign, DollarSign,
Bot Bot,
StopCircle
} from "lucide-react"; } from "lucide-react";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
@@ -115,14 +116,16 @@ export const AgentRunView: React.FC<AgentRunViewProps> = ({
const handleCopyAsMarkdown = async () => { const handleCopyAsMarkdown = async () => {
if (!run) return; if (!run) return;
let markdown = `# Agent Execution: ${run.agent_name}\n\n`; let markdown = `# Agent Run: ${run.agent_name}\n\n`;
markdown += `**Task:** ${run.task}\n`; markdown += `**Task:** ${run.task}\n`;
markdown += `**Model:** ${run.model === 'opus' ? 'Claude 4 Opus' : 'Claude 4 Sonnet'}\n`; markdown += `**Model:** ${run.model}\n`;
markdown += `**Date:** ${formatISOTimestamp(run.created_at)}\n`; markdown += `**Status:** ${run.status}\n`;
if (run.metrics?.duration_ms) markdown += `**Duration:** ${(run.metrics.duration_ms / 1000).toFixed(2)}s\n`; if (run.metrics) {
if (run.metrics?.total_tokens) markdown += `**Total Tokens:** ${run.metrics.total_tokens}\n`; markdown += `**Tokens:** ${run.metrics.total_tokens || 'N/A'}\n`;
if (run.metrics?.cost_usd) markdown += `**Cost:** $${run.metrics.cost_usd.toFixed(4)} USD\n`; markdown += `**Cost:** $${run.metrics.cost_usd?.toFixed(4) || 'N/A'}\n`;
markdown += `\n---\n\n`; }
markdown += `**Date:** ${new Date(run.created_at).toISOString()}\n\n`;
markdown += `---\n\n`;
for (const msg of messages) { for (const msg of messages) {
if (msg.type === "system" && msg.subtype === "init") { if (msg.type === "system" && msg.subtype === "init") {
@@ -170,6 +173,50 @@ export const AgentRunView: React.FC<AgentRunViewProps> = ({
setCopyPopoverOpen(false); setCopyPopoverOpen(false);
}; };
const handleStop = async () => {
if (!runId) {
console.error('[AgentRunView] No run ID available to stop');
return;
}
try {
// Call the API to kill the agent session
const success = await api.killAgentSession(runId);
if (success) {
console.log(`[AgentRunView] Successfully stopped agent session ${runId}`);
// Update the run status locally
if (run) {
setRun({ ...run, status: 'cancelled' });
}
// Add a message indicating execution was stopped
const stopMessage: ClaudeStreamMessage = {
type: "result",
subtype: "error",
is_error: true,
result: "Execution stopped by user",
duration_ms: 0,
usage: {
input_tokens: 0,
output_tokens: 0
}
};
setMessages(prev => [...prev, stopMessage]);
// Reload the run data after a short delay
setTimeout(() => {
loadRun();
}, 1000);
} else {
console.warn(`[AgentRunView] Failed to stop agent session ${runId} - it may have already finished`);
}
} catch (err) {
console.error('[AgentRunView] Failed to stop agent:', err);
}
};
const renderIcon = (iconName: string) => { const renderIcon = (iconName: string) => {
const Icon = AGENT_ICONS[iconName as keyof typeof AGENT_ICONS] || Bot; const Icon = AGENT_ICONS[iconName as keyof typeof AGENT_ICONS] || Bot;
return <Icon className="h-5 w-5" />; return <Icon className="h-5 w-5" />;
@@ -220,42 +267,56 @@ export const AgentRunView: React.FC<AgentRunViewProps> = ({
</div> </div>
</div> </div>
<Popover <div className="flex items-center gap-2">
trigger={ {run?.status === 'running' && (
<Button <Button
variant="ghost"
size="sm" size="sm"
className="flex items-center gap-2" variant="ghost"
onClick={handleStop}
className="text-destructive hover:text-destructive"
> >
<Copy className="h-4 w-4" /> <StopCircle className="h-4 w-4 mr-1" />
Copy Output Stop
<ChevronDown className="h-3 w-3" />
</Button> </Button>
} )}
content={
<div className="w-44 p-1"> <Popover
trigger={
<Button <Button
variant="ghost" variant="ghost"
size="sm" size="sm"
className="w-full justify-start" className="flex items-center gap-2"
onClick={handleCopyAsJsonl}
> >
Copy as JSONL <Copy className="h-4 w-4" />
Copy Output
<ChevronDown className="h-3 w-3" />
</Button> </Button>
<Button }
variant="ghost" content={
size="sm" <div className="w-44 p-1">
className="w-full justify-start" <Button
onClick={handleCopyAsMarkdown} variant="ghost"
> size="sm"
Copy as Markdown className="w-full justify-start"
</Button> onClick={handleCopyAsJsonl}
</div> >
} Copy as JSONL
open={copyPopoverOpen} </Button>
onOpenChange={setCopyPopoverOpen} <Button
align="end" variant="ghost"
/> size="sm"
className="w-full justify-start"
onClick={handleCopyAsMarkdown}
>
Copy as Markdown
</Button>
</div>
}
open={copyPopoverOpen}
onOpenChange={setCopyPopoverOpen}
align="end"
/>
</div>
</motion.div> </motion.div>
{/* Run Details */} {/* Run Details */}