diff --git a/src-tauri/src/commands/agents.rs b/src-tauri/src/commands/agents.rs index 3ad4a2e..6c52ea7 100644 --- a/src-tauri/src/commands/agents.rs +++ b/src-tauri/src/commands/agents.rs @@ -1,17 +1,19 @@ use anyhow::Result; use chrono; +use dirs; use log::{debug, error, info, warn}; +use regex; use reqwest; use rusqlite::{params, Connection, Result as SqliteResult}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use std::io::{BufRead, BufReader}; use std::process::Stdio; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter, Manager, State}; use tauri_plugin_shell::ShellExt; -use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader}; use tokio::process::Command; -use regex; /// Finds the full path to the claude binary /// This is necessary because macOS apps have a limited PATH environment @@ -855,11 +857,15 @@ async fn spawn_agent_sidecar( // Extract session ID from JSONL output if let Ok(json) = serde_json::from_str::(&line) { - if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) { - if let Ok(mut current_session_id) = session_id_holder_clone.lock() { - if current_session_id.is_none() { - *current_session_id = Some(sid.to_string()); - info!("🔑 Extracted session ID: {}", sid); + // Claude Code uses "session_id" (underscore), not "sessionId" + if json.get("type").and_then(|t| t.as_str()) == Some("system") && + json.get("subtype").and_then(|s| s.as_str()) == Some("init") { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + if let Ok(mut current_session_id) = session_id_holder_clone.lock() { + if current_session_id.is_none() { + *current_session_id = Some(sid.to_string()); + info!("🔑 Extracted session ID: {}", sid); + } } } } @@ -955,10 +961,24 @@ async fn spawn_agent_sidecar( // Update the run record with session ID and mark as completed if let Ok(conn) = Connection::open(&db_path) { - let _ = conn.execute( + info!("🔄 Updating database with extracted session ID: {}", extracted_session_id); + match conn.execute( "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", params![extracted_session_id, run_id], - ); + ) { + Ok(rows_affected) => { + if rows_affected > 0 { + info!("✅ Successfully updated agent run {} with session ID: {}", run_id, extracted_session_id); + } else { + warn!("⚠️ No rows affected when updating agent run {} with session ID", run_id); + } + } + Err(e) => { + error!("❌ Failed to update agent run {} with session ID: {}", run_id, e); + } + } + } else { + error!("❌ Failed to open database to update session ID for run {}", run_id); } info!("✅ Claude sidecar execution monitoring complete"); @@ -1017,8 +1037,8 @@ async fn spawn_agent_system( info!("📡 Set up stdout/stderr readers"); // Create readers - let stdout_reader = BufReader::new(stdout); - let stderr_reader = BufReader::new(stderr); + let stdout_reader = TokioBufReader::new(stdout); + let stderr_reader = TokioBufReader::new(stderr); // Shared state for collecting session ID and live output let session_id = std::sync::Arc::new(Mutex::new(String::new())); @@ -1067,11 +1087,15 @@ async fn spawn_agent_system( // Extract session ID from JSONL output if let Ok(json) = serde_json::from_str::(&line) { - if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) { - if let Ok(mut current_session_id) = session_id_clone.lock() { - if current_session_id.is_empty() { - *current_session_id = sid.to_string(); - info!("🔑 Extracted session ID: {}", sid); + // Claude Code uses "session_id" (underscore), not "sessionId" + if json.get("type").and_then(|t| t.as_str()) == Some("system") && + json.get("subtype").and_then(|s| s.as_str()) == Some("init") { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + if let Ok(mut current_session_id) = session_id_clone.lock() { + if current_session_id.is_empty() { + *current_session_id = sid.to_string(); + info!("🔑 Extracted session ID: {}", sid); + } } } } @@ -1232,10 +1256,24 @@ async fn spawn_agent_system( // Update the run record with session ID and mark as completed - open a new connection if let Ok(conn) = Connection::open(&db_path) { - let _ = conn.execute( + info!("🔄 Updating database with extracted session ID: {}", extracted_session_id); + match conn.execute( "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", params![extracted_session_id, run_id], - ); + ) { + Ok(rows_affected) => { + if rows_affected > 0 { + info!("✅ Successfully updated agent run {} with session ID: {}", run_id, extracted_session_id); + } else { + warn!("⚠️ No rows affected when updating agent run {} with session ID", run_id); + } + } + Err(e) => { + error!("❌ Failed to update agent run {} with session ID: {}", run_id, e); + } + } + } else { + error!("❌ Failed to open database to update session ID for run {}", run_id); } // Cleanup will be handled by the cleanup_finished_processes function @@ -1484,13 +1522,66 @@ pub async fn get_session_output( return Ok(String::new()); } - // Read the JSONL content - match read_session_jsonl(&run.session_id, &run.project_path).await { - Ok(content) => Ok(content), - Err(_) => { - // Fallback to live output if JSONL file doesn't exist yet - let live_output = registry.0.get_live_output(run_id)?; - Ok(live_output) + // Get the Claude directory + let claude_dir = dirs::home_dir() + .ok_or("Failed to get home directory")? + .join(".claude"); + + // Find the correct project directory by searching for the session file + let projects_dir = claude_dir.join("projects"); + + // Check if projects directory exists + if !projects_dir.exists() { + log::error!("Projects directory not found at: {:?}", projects_dir); + return Err("Projects directory not found".to_string()); + } + + // Search for the session file in all project directories + let mut session_file_path = None; + log::info!("Searching for session file {} in all project directories", run.session_id); + + if let Ok(entries) = std::fs::read_dir(&projects_dir) { + for entry in entries.filter_map(Result::ok) { + let path = entry.path(); + if path.is_dir() { + let dir_name = path.file_name().unwrap_or_default().to_string_lossy(); + log::debug!("Checking project directory: {}", dir_name); + + let potential_session_file = path.join(format!("{}.jsonl", run.session_id)); + if potential_session_file.exists() { + log::info!("Found session file at: {:?}", potential_session_file); + session_file_path = Some(potential_session_file); + break; + } else { + log::debug!("Session file not found in: {}", dir_name); + } + } + } + } else { + log::error!("Failed to read projects directory"); + } + + // If we found the session file, read it + if let Some(session_path) = session_file_path { + match tokio::fs::read_to_string(&session_path).await { + Ok(content) => Ok(content), + Err(e) => { + log::error!("Failed to read session file {}: {}", session_path.display(), e); + // Fallback to live output if file read fails + let live_output = registry.0.get_live_output(run_id)?; + Ok(live_output) + } + } + } else { + // If session file not found, try the old method as fallback + log::warn!("Session file not found for {}, trying legacy method", run.session_id); + match read_session_jsonl(&run.session_id, &run.project_path).await { + Ok(content) => Ok(content), + Err(_) => { + // Final fallback to live output + let live_output = registry.0.get_live_output(run_id)?; + Ok(live_output) + } } } } @@ -2076,3 +2167,68 @@ pub async fn import_agent_from_github( // Import using existing function import_agent(db, json_data).await } + +/// Load agent session history from JSONL file +/// Similar to Claude Code's load_session_history, but searches across all project directories +#[tauri::command] +pub async fn load_agent_session_history( + session_id: String, +) -> Result, String> { + log::info!("Loading agent session history for session: {}", session_id); + + let claude_dir = dirs::home_dir() + .ok_or("Failed to get home directory")? + .join(".claude"); + + let projects_dir = claude_dir.join("projects"); + + if !projects_dir.exists() { + log::error!("Projects directory not found at: {:?}", projects_dir); + return Err("Projects directory not found".to_string()); + } + + // Search for the session file in all project directories + let mut session_file_path = None; + log::info!("Searching for session file {} in all project directories", session_id); + + if let Ok(entries) = std::fs::read_dir(&projects_dir) { + for entry in entries.filter_map(Result::ok) { + let path = entry.path(); + if path.is_dir() { + let dir_name = path.file_name().unwrap_or_default().to_string_lossy(); + log::debug!("Checking project directory: {}", dir_name); + + let potential_session_file = path.join(format!("{}.jsonl", session_id)); + if potential_session_file.exists() { + log::info!("Found session file at: {:?}", potential_session_file); + session_file_path = Some(potential_session_file); + break; + } else { + log::debug!("Session file not found in: {}", dir_name); + } + } + } + } else { + log::error!("Failed to read projects directory"); + } + + if let Some(session_path) = session_file_path { + let file = std::fs::File::open(&session_path) + .map_err(|e| format!("Failed to open session file: {}", e))?; + + let reader = BufReader::new(file); + let mut messages = Vec::new(); + + for line in reader.lines() { + if let Ok(line) = line { + if let Ok(json) = serde_json::from_str::(&line) { + messages.push(json); + } + } + } + + Ok(messages) + } else { + Err(format!("Session file not found: {}", session_id)) + } +} diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 24a44b5..ce6946c 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -14,7 +14,7 @@ use commands::agents::{ get_live_session_output, get_session_output, get_session_status, import_agent, import_agent_from_file, import_agent_from_github, init_database, kill_agent_session, list_agent_runs, list_agent_runs_with_metrics, list_agents, list_claude_installations, - list_running_sessions, set_claude_binary_path, stream_session_output, update_agent, AgentDb, + list_running_sessions, load_agent_session_history, set_claude_binary_path, stream_session_output, update_agent, AgentDb, }; use commands::claude::{ cancel_claude_execution, check_auto_checkpoint, check_claude_version, cleanup_old_checkpoints, @@ -136,6 +136,7 @@ fn main() { get_session_output, get_live_session_output, stream_session_output, + load_agent_session_history, get_claude_binary_path, set_claude_binary_path, list_claude_installations, diff --git a/src/components/AgentRunOutputViewer.tsx b/src/components/AgentRunOutputViewer.tsx index 2640ffe..fd0d4f3 100644 --- a/src/components/AgentRunOutputViewer.tsx +++ b/src/components/AgentRunOutputViewer.tsx @@ -116,23 +116,81 @@ export function AgentRunOutputViewer({ const loadOutput = async (skipCache = false) => { if (!run.id) return; + console.log('[AgentRunOutputViewer] Loading output for run:', { + runId: run.id, + status: run.status, + sessionId: run.session_id, + skipCache + }); + try { // Check cache first if not skipping cache if (!skipCache) { const cached = getCachedOutput(run.id); if (cached) { + console.log('[AgentRunOutputViewer] Found cached output'); const cachedJsonlLines = cached.output.split('\n').filter(line => line.trim()); setRawJsonlOutput(cachedJsonlLines); setMessages(cached.messages); // If cache is recent (less than 5 seconds old) and session isn't running, use cache only if (Date.now() - cached.lastUpdated < 5000 && run.status !== 'running') { + console.log('[AgentRunOutputViewer] Using recent cache, skipping refresh'); return; } } } setLoading(true); + + // If we have a session_id, try to load from JSONL file first + if (run.session_id && run.session_id !== '') { + console.log('[AgentRunOutputViewer] Attempting to load from JSONL with session_id:', run.session_id); + try { + const history = await api.loadAgentSessionHistory(run.session_id); + console.log('[AgentRunOutputViewer] Successfully loaded JSONL history:', history.length, 'messages'); + + // Convert history to messages format + const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({ + ...entry, + type: entry.type || "assistant" + })); + + setMessages(loadedMessages); + setRawJsonlOutput(history.map(h => JSON.stringify(h))); + + // Update cache + setCachedOutput(run.id, { + output: history.map(h => JSON.stringify(h)).join('\n'), + messages: loadedMessages, + lastUpdated: Date.now(), + status: run.status + }); + + // Set up live event listeners for running sessions + if (run.status === 'running') { + console.log('[AgentRunOutputViewer] Setting up live listeners for running session'); + setupLiveEventListeners(); + + try { + await api.streamSessionOutput(run.id); + } catch (streamError) { + console.warn('[AgentRunOutputViewer] Failed to start streaming, will poll instead:', streamError); + } + } + + return; + } catch (err) { + console.warn('[AgentRunOutputViewer] Failed to load from JSONL:', err); + console.warn('[AgentRunOutputViewer] Falling back to regular output method'); + } + } else { + console.log('[AgentRunOutputViewer] No session_id available, using fallback method'); + } + + // Fallback to the original method if JSONL loading fails or no session_id + console.log('[AgentRunOutputViewer] Using getSessionOutput fallback'); const rawOutput = await api.getSessionOutput(run.id); + console.log('[AgentRunOutputViewer] Received raw output:', rawOutput.length, 'characters'); // Parse JSONL output into messages const jsonlLines = rawOutput.split('\n').filter(line => line.trim()); @@ -144,9 +202,10 @@ export function AgentRunOutputViewer({ const message = JSON.parse(line) as ClaudeStreamMessage; parsedMessages.push(message); } catch (err) { - console.error("Failed to parse message:", err, line); + console.error("[AgentRunOutputViewer] Failed to parse message:", err, line); } } + console.log('[AgentRunOutputViewer] Parsed', parsedMessages.length, 'messages from output'); setMessages(parsedMessages); // Update cache @@ -159,12 +218,13 @@ export function AgentRunOutputViewer({ // Set up live event listeners for running sessions if (run.status === 'running') { + console.log('[AgentRunOutputViewer] Setting up live listeners for running session (fallback)'); setupLiveEventListeners(); try { await api.streamSessionOutput(run.id); } catch (streamError) { - console.warn('Failed to start streaming, will poll instead:', streamError); + console.warn('[AgentRunOutputViewer] Failed to start streaming (fallback), will poll instead:', streamError); } } } catch (error) { diff --git a/src/components/AgentRunView.tsx b/src/components/AgentRunView.tsx index efd3fae..a8afa85 100644 --- a/src/components/AgentRunView.tsx +++ b/src/components/AgentRunView.tsx @@ -64,7 +64,25 @@ export const AgentRunView: React.FC = ({ const runData = await api.getAgentRunWithRealTimeMetrics(runId); setRun(runData); - // Parse JSONL output into messages + // If we have a session_id, try to load from JSONL file first + if (runData.session_id && runData.session_id !== '') { + try { + const history = await api.loadAgentSessionHistory(runData.session_id); + + // Convert history to messages format + const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({ + ...entry, + type: entry.type || "assistant" + })); + + setMessages(loadedMessages); + return; + } catch (err) { + console.warn('Failed to load from JSONL, falling back to output field:', err); + } + } + + // Fallback: Parse JSONL output from the output field if (runData.output) { const parsedMessages: ClaudeStreamMessage[] = []; const lines = runData.output.split('\n').filter(line => line.trim()); diff --git a/src/components/SessionOutputViewer.tsx b/src/components/SessionOutputViewer.tsx index 560b093..eafcc01 100644 --- a/src/components/SessionOutputViewer.tsx +++ b/src/components/SessionOutputViewer.tsx @@ -109,6 +109,47 @@ export function SessionOutputViewer({ session, onClose, className }: SessionOutp } setLoading(true); + + // If we have a session_id, try to load from JSONL file first + if (session.session_id && session.session_id !== '') { + try { + const history = await api.loadAgentSessionHistory(session.session_id); + + // Convert history to messages format using AgentExecution style + const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({ + ...entry, + type: entry.type || "assistant" + })); + + setMessages(loadedMessages); + setRawJsonlOutput(history.map(h => JSON.stringify(h))); + + // Update cache + setCachedOutput(session.id, { + output: history.map(h => JSON.stringify(h)).join('\n'), + messages: loadedMessages, + lastUpdated: Date.now(), + status: session.status + }); + + // Set up live event listeners for running sessions + if (session.status === 'running') { + setupLiveEventListeners(); + + try { + await api.streamSessionOutput(session.id); + } catch (streamError) { + console.warn('Failed to start streaming, will poll instead:', streamError); + } + } + + return; + } catch (err) { + console.warn('Failed to load from JSONL, falling back to regular output:', err); + } + } + + // Fallback to the original method if JSONL loading fails or no session_id const rawOutput = await api.getSessionOutput(session.id); // Parse JSONL output into messages using AgentExecution style diff --git a/src/lib/api.ts b/src/lib/api.ts index 1d6f2d6..562d4c9 100644 --- a/src/lib/api.ts +++ b/src/lib/api.ts @@ -925,6 +925,21 @@ export const api = { return invoke("load_session_history", { sessionId, projectId }); }, + /** + * Loads the JSONL history for a specific agent session + * Similar to loadSessionHistory but searches across all project directories + * @param sessionId - The session ID (UUID) + * @returns Promise resolving to array of session messages + */ + async loadAgentSessionHistory(sessionId: string): Promise { + try { + return await invoke('load_agent_session_history', { sessionId }); + } catch (error) { + console.error("Failed to load agent session history:", error); + throw error; + } + }, + /** * Executes a new interactive Claude Code session with streaming output */