From 7a2372dcde3e7c5d5c3d1b1f7619a01f13c5df80 Mon Sep 17 00:00:00 2001 From: Mufeed VH Date: Fri, 4 Jul 2025 19:12:47 +0530 Subject: [PATCH] feat(agents): improve session ID extraction and JSONL file handling - Fix session ID extraction to use correct field name "session_id" instead of "sessionId" - Add comprehensive database update logging with error handling - Implement cross-project session file search in get_session_output - Add new load_agent_session_history command for robust JSONL loading - Update UI components to prioritize JSONL file loading over fallback methods - Improve error handling and logging throughout the session management flow - Fix BufReader imports and alias conflicts in Tauri backend This enhances the reliability of agent session tracking and output retrieval by properly handling Claude Code's actual JSON structure and implementing better fallback mechanisms for session data access. --- src-tauri/src/commands/agents.rs | 206 +++++++++++++++++++++--- src-tauri/src/main.rs | 3 +- src/components/AgentRunOutputViewer.tsx | 64 +++++++- src/components/AgentRunView.tsx | 20 ++- src/components/SessionOutputViewer.tsx | 41 +++++ src/lib/api.ts | 15 ++ 6 files changed, 320 insertions(+), 29 deletions(-) diff --git a/src-tauri/src/commands/agents.rs b/src-tauri/src/commands/agents.rs index 3ad4a2e..6c52ea7 100644 --- a/src-tauri/src/commands/agents.rs +++ b/src-tauri/src/commands/agents.rs @@ -1,17 +1,19 @@ use anyhow::Result; use chrono; +use dirs; use log::{debug, error, info, warn}; +use regex; use reqwest; use rusqlite::{params, Connection, Result as SqliteResult}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use std::io::{BufRead, BufReader}; use std::process::Stdio; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter, Manager, State}; use tauri_plugin_shell::ShellExt; -use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader}; use tokio::process::Command; -use regex; /// Finds the full path to the claude binary /// This is necessary because macOS apps have a limited PATH environment @@ -855,11 +857,15 @@ async fn spawn_agent_sidecar( // Extract session ID from JSONL output if let Ok(json) = serde_json::from_str::(&line) { - if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) { - if let Ok(mut current_session_id) = session_id_holder_clone.lock() { - if current_session_id.is_none() { - *current_session_id = Some(sid.to_string()); - info!("🔑 Extracted session ID: {}", sid); + // Claude Code uses "session_id" (underscore), not "sessionId" + if json.get("type").and_then(|t| t.as_str()) == Some("system") && + json.get("subtype").and_then(|s| s.as_str()) == Some("init") { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + if let Ok(mut current_session_id) = session_id_holder_clone.lock() { + if current_session_id.is_none() { + *current_session_id = Some(sid.to_string()); + info!("🔑 Extracted session ID: {}", sid); + } } } } @@ -955,10 +961,24 @@ async fn spawn_agent_sidecar( // Update the run record with session ID and mark as completed if let Ok(conn) = Connection::open(&db_path) { - let _ = conn.execute( + info!("🔄 Updating database with extracted session ID: {}", extracted_session_id); + match conn.execute( "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", params![extracted_session_id, run_id], - ); + ) { + Ok(rows_affected) => { + if rows_affected > 0 { + info!("✅ Successfully updated agent run {} with session ID: {}", run_id, extracted_session_id); + } else { + warn!("⚠️ No rows affected when updating agent run {} with session ID", run_id); + } + } + Err(e) => { + error!("❌ Failed to update agent run {} with session ID: {}", run_id, e); + } + } + } else { + error!("❌ Failed to open database to update session ID for run {}", run_id); } info!("✅ Claude sidecar execution monitoring complete"); @@ -1017,8 +1037,8 @@ async fn spawn_agent_system( info!("📡 Set up stdout/stderr readers"); // Create readers - let stdout_reader = BufReader::new(stdout); - let stderr_reader = BufReader::new(stderr); + let stdout_reader = TokioBufReader::new(stdout); + let stderr_reader = TokioBufReader::new(stderr); // Shared state for collecting session ID and live output let session_id = std::sync::Arc::new(Mutex::new(String::new())); @@ -1067,11 +1087,15 @@ async fn spawn_agent_system( // Extract session ID from JSONL output if let Ok(json) = serde_json::from_str::(&line) { - if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) { - if let Ok(mut current_session_id) = session_id_clone.lock() { - if current_session_id.is_empty() { - *current_session_id = sid.to_string(); - info!("🔑 Extracted session ID: {}", sid); + // Claude Code uses "session_id" (underscore), not "sessionId" + if json.get("type").and_then(|t| t.as_str()) == Some("system") && + json.get("subtype").and_then(|s| s.as_str()) == Some("init") { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + if let Ok(mut current_session_id) = session_id_clone.lock() { + if current_session_id.is_empty() { + *current_session_id = sid.to_string(); + info!("🔑 Extracted session ID: {}", sid); + } } } } @@ -1232,10 +1256,24 @@ async fn spawn_agent_system( // Update the run record with session ID and mark as completed - open a new connection if let Ok(conn) = Connection::open(&db_path) { - let _ = conn.execute( + info!("🔄 Updating database with extracted session ID: {}", extracted_session_id); + match conn.execute( "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", params![extracted_session_id, run_id], - ); + ) { + Ok(rows_affected) => { + if rows_affected > 0 { + info!("✅ Successfully updated agent run {} with session ID: {}", run_id, extracted_session_id); + } else { + warn!("⚠️ No rows affected when updating agent run {} with session ID", run_id); + } + } + Err(e) => { + error!("❌ Failed to update agent run {} with session ID: {}", run_id, e); + } + } + } else { + error!("❌ Failed to open database to update session ID for run {}", run_id); } // Cleanup will be handled by the cleanup_finished_processes function @@ -1484,13 +1522,66 @@ pub async fn get_session_output( return Ok(String::new()); } - // Read the JSONL content - match read_session_jsonl(&run.session_id, &run.project_path).await { - Ok(content) => Ok(content), - Err(_) => { - // Fallback to live output if JSONL file doesn't exist yet - let live_output = registry.0.get_live_output(run_id)?; - Ok(live_output) + // Get the Claude directory + let claude_dir = dirs::home_dir() + .ok_or("Failed to get home directory")? + .join(".claude"); + + // Find the correct project directory by searching for the session file + let projects_dir = claude_dir.join("projects"); + + // Check if projects directory exists + if !projects_dir.exists() { + log::error!("Projects directory not found at: {:?}", projects_dir); + return Err("Projects directory not found".to_string()); + } + + // Search for the session file in all project directories + let mut session_file_path = None; + log::info!("Searching for session file {} in all project directories", run.session_id); + + if let Ok(entries) = std::fs::read_dir(&projects_dir) { + for entry in entries.filter_map(Result::ok) { + let path = entry.path(); + if path.is_dir() { + let dir_name = path.file_name().unwrap_or_default().to_string_lossy(); + log::debug!("Checking project directory: {}", dir_name); + + let potential_session_file = path.join(format!("{}.jsonl", run.session_id)); + if potential_session_file.exists() { + log::info!("Found session file at: {:?}", potential_session_file); + session_file_path = Some(potential_session_file); + break; + } else { + log::debug!("Session file not found in: {}", dir_name); + } + } + } + } else { + log::error!("Failed to read projects directory"); + } + + // If we found the session file, read it + if let Some(session_path) = session_file_path { + match tokio::fs::read_to_string(&session_path).await { + Ok(content) => Ok(content), + Err(e) => { + log::error!("Failed to read session file {}: {}", session_path.display(), e); + // Fallback to live output if file read fails + let live_output = registry.0.get_live_output(run_id)?; + Ok(live_output) + } + } + } else { + // If session file not found, try the old method as fallback + log::warn!("Session file not found for {}, trying legacy method", run.session_id); + match read_session_jsonl(&run.session_id, &run.project_path).await { + Ok(content) => Ok(content), + Err(_) => { + // Final fallback to live output + let live_output = registry.0.get_live_output(run_id)?; + Ok(live_output) + } } } } @@ -2076,3 +2167,68 @@ pub async fn import_agent_from_github( // Import using existing function import_agent(db, json_data).await } + +/// Load agent session history from JSONL file +/// Similar to Claude Code's load_session_history, but searches across all project directories +#[tauri::command] +pub async fn load_agent_session_history( + session_id: String, +) -> Result, String> { + log::info!("Loading agent session history for session: {}", session_id); + + let claude_dir = dirs::home_dir() + .ok_or("Failed to get home directory")? + .join(".claude"); + + let projects_dir = claude_dir.join("projects"); + + if !projects_dir.exists() { + log::error!("Projects directory not found at: {:?}", projects_dir); + return Err("Projects directory not found".to_string()); + } + + // Search for the session file in all project directories + let mut session_file_path = None; + log::info!("Searching for session file {} in all project directories", session_id); + + if let Ok(entries) = std::fs::read_dir(&projects_dir) { + for entry in entries.filter_map(Result::ok) { + let path = entry.path(); + if path.is_dir() { + let dir_name = path.file_name().unwrap_or_default().to_string_lossy(); + log::debug!("Checking project directory: {}", dir_name); + + let potential_session_file = path.join(format!("{}.jsonl", session_id)); + if potential_session_file.exists() { + log::info!("Found session file at: {:?}", potential_session_file); + session_file_path = Some(potential_session_file); + break; + } else { + log::debug!("Session file not found in: {}", dir_name); + } + } + } + } else { + log::error!("Failed to read projects directory"); + } + + if let Some(session_path) = session_file_path { + let file = std::fs::File::open(&session_path) + .map_err(|e| format!("Failed to open session file: {}", e))?; + + let reader = BufReader::new(file); + let mut messages = Vec::new(); + + for line in reader.lines() { + if let Ok(line) = line { + if let Ok(json) = serde_json::from_str::(&line) { + messages.push(json); + } + } + } + + Ok(messages) + } else { + Err(format!("Session file not found: {}", session_id)) + } +} diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 24a44b5..ce6946c 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -14,7 +14,7 @@ use commands::agents::{ get_live_session_output, get_session_output, get_session_status, import_agent, import_agent_from_file, import_agent_from_github, init_database, kill_agent_session, list_agent_runs, list_agent_runs_with_metrics, list_agents, list_claude_installations, - list_running_sessions, set_claude_binary_path, stream_session_output, update_agent, AgentDb, + list_running_sessions, load_agent_session_history, set_claude_binary_path, stream_session_output, update_agent, AgentDb, }; use commands::claude::{ cancel_claude_execution, check_auto_checkpoint, check_claude_version, cleanup_old_checkpoints, @@ -136,6 +136,7 @@ fn main() { get_session_output, get_live_session_output, stream_session_output, + load_agent_session_history, get_claude_binary_path, set_claude_binary_path, list_claude_installations, diff --git a/src/components/AgentRunOutputViewer.tsx b/src/components/AgentRunOutputViewer.tsx index 2640ffe..fd0d4f3 100644 --- a/src/components/AgentRunOutputViewer.tsx +++ b/src/components/AgentRunOutputViewer.tsx @@ -116,23 +116,81 @@ export function AgentRunOutputViewer({ const loadOutput = async (skipCache = false) => { if (!run.id) return; + console.log('[AgentRunOutputViewer] Loading output for run:', { + runId: run.id, + status: run.status, + sessionId: run.session_id, + skipCache + }); + try { // Check cache first if not skipping cache if (!skipCache) { const cached = getCachedOutput(run.id); if (cached) { + console.log('[AgentRunOutputViewer] Found cached output'); const cachedJsonlLines = cached.output.split('\n').filter(line => line.trim()); setRawJsonlOutput(cachedJsonlLines); setMessages(cached.messages); // If cache is recent (less than 5 seconds old) and session isn't running, use cache only if (Date.now() - cached.lastUpdated < 5000 && run.status !== 'running') { + console.log('[AgentRunOutputViewer] Using recent cache, skipping refresh'); return; } } } setLoading(true); + + // If we have a session_id, try to load from JSONL file first + if (run.session_id && run.session_id !== '') { + console.log('[AgentRunOutputViewer] Attempting to load from JSONL with session_id:', run.session_id); + try { + const history = await api.loadAgentSessionHistory(run.session_id); + console.log('[AgentRunOutputViewer] Successfully loaded JSONL history:', history.length, 'messages'); + + // Convert history to messages format + const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({ + ...entry, + type: entry.type || "assistant" + })); + + setMessages(loadedMessages); + setRawJsonlOutput(history.map(h => JSON.stringify(h))); + + // Update cache + setCachedOutput(run.id, { + output: history.map(h => JSON.stringify(h)).join('\n'), + messages: loadedMessages, + lastUpdated: Date.now(), + status: run.status + }); + + // Set up live event listeners for running sessions + if (run.status === 'running') { + console.log('[AgentRunOutputViewer] Setting up live listeners for running session'); + setupLiveEventListeners(); + + try { + await api.streamSessionOutput(run.id); + } catch (streamError) { + console.warn('[AgentRunOutputViewer] Failed to start streaming, will poll instead:', streamError); + } + } + + return; + } catch (err) { + console.warn('[AgentRunOutputViewer] Failed to load from JSONL:', err); + console.warn('[AgentRunOutputViewer] Falling back to regular output method'); + } + } else { + console.log('[AgentRunOutputViewer] No session_id available, using fallback method'); + } + + // Fallback to the original method if JSONL loading fails or no session_id + console.log('[AgentRunOutputViewer] Using getSessionOutput fallback'); const rawOutput = await api.getSessionOutput(run.id); + console.log('[AgentRunOutputViewer] Received raw output:', rawOutput.length, 'characters'); // Parse JSONL output into messages const jsonlLines = rawOutput.split('\n').filter(line => line.trim()); @@ -144,9 +202,10 @@ export function AgentRunOutputViewer({ const message = JSON.parse(line) as ClaudeStreamMessage; parsedMessages.push(message); } catch (err) { - console.error("Failed to parse message:", err, line); + console.error("[AgentRunOutputViewer] Failed to parse message:", err, line); } } + console.log('[AgentRunOutputViewer] Parsed', parsedMessages.length, 'messages from output'); setMessages(parsedMessages); // Update cache @@ -159,12 +218,13 @@ export function AgentRunOutputViewer({ // Set up live event listeners for running sessions if (run.status === 'running') { + console.log('[AgentRunOutputViewer] Setting up live listeners for running session (fallback)'); setupLiveEventListeners(); try { await api.streamSessionOutput(run.id); } catch (streamError) { - console.warn('Failed to start streaming, will poll instead:', streamError); + console.warn('[AgentRunOutputViewer] Failed to start streaming (fallback), will poll instead:', streamError); } } } catch (error) { diff --git a/src/components/AgentRunView.tsx b/src/components/AgentRunView.tsx index efd3fae..a8afa85 100644 --- a/src/components/AgentRunView.tsx +++ b/src/components/AgentRunView.tsx @@ -64,7 +64,25 @@ export const AgentRunView: React.FC = ({ const runData = await api.getAgentRunWithRealTimeMetrics(runId); setRun(runData); - // Parse JSONL output into messages + // If we have a session_id, try to load from JSONL file first + if (runData.session_id && runData.session_id !== '') { + try { + const history = await api.loadAgentSessionHistory(runData.session_id); + + // Convert history to messages format + const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({ + ...entry, + type: entry.type || "assistant" + })); + + setMessages(loadedMessages); + return; + } catch (err) { + console.warn('Failed to load from JSONL, falling back to output field:', err); + } + } + + // Fallback: Parse JSONL output from the output field if (runData.output) { const parsedMessages: ClaudeStreamMessage[] = []; const lines = runData.output.split('\n').filter(line => line.trim()); diff --git a/src/components/SessionOutputViewer.tsx b/src/components/SessionOutputViewer.tsx index 560b093..eafcc01 100644 --- a/src/components/SessionOutputViewer.tsx +++ b/src/components/SessionOutputViewer.tsx @@ -109,6 +109,47 @@ export function SessionOutputViewer({ session, onClose, className }: SessionOutp } setLoading(true); + + // If we have a session_id, try to load from JSONL file first + if (session.session_id && session.session_id !== '') { + try { + const history = await api.loadAgentSessionHistory(session.session_id); + + // Convert history to messages format using AgentExecution style + const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({ + ...entry, + type: entry.type || "assistant" + })); + + setMessages(loadedMessages); + setRawJsonlOutput(history.map(h => JSON.stringify(h))); + + // Update cache + setCachedOutput(session.id, { + output: history.map(h => JSON.stringify(h)).join('\n'), + messages: loadedMessages, + lastUpdated: Date.now(), + status: session.status + }); + + // Set up live event listeners for running sessions + if (session.status === 'running') { + setupLiveEventListeners(); + + try { + await api.streamSessionOutput(session.id); + } catch (streamError) { + console.warn('Failed to start streaming, will poll instead:', streamError); + } + } + + return; + } catch (err) { + console.warn('Failed to load from JSONL, falling back to regular output:', err); + } + } + + // Fallback to the original method if JSONL loading fails or no session_id const rawOutput = await api.getSessionOutput(session.id); // Parse JSONL output into messages using AgentExecution style diff --git a/src/lib/api.ts b/src/lib/api.ts index 1d6f2d6..562d4c9 100644 --- a/src/lib/api.ts +++ b/src/lib/api.ts @@ -925,6 +925,21 @@ export const api = { return invoke("load_session_history", { sessionId, projectId }); }, + /** + * Loads the JSONL history for a specific agent session + * Similar to loadSessionHistory but searches across all project directories + * @param sessionId - The session ID (UUID) + * @returns Promise resolving to array of session messages + */ + async loadAgentSessionHistory(sessionId: string): Promise { + try { + return await invoke('load_agent_session_history', { sessionId }); + } catch (error) { + console.error("Failed to load agent session history:", error); + throw error; + } + }, + /** * Executes a new interactive Claude Code session with streaming output */