feat(agents): improve session ID extraction and JSONL file handling

- Fix session ID extraction to use correct field name "session_id" instead of "sessionId"
- Add comprehensive database update logging with error handling
- Implement cross-project session file search in get_session_output
- Add new load_agent_session_history command for robust JSONL loading
- Update UI components to prioritize JSONL file loading over fallback methods
- Improve error handling and logging throughout the session management flow
- Fix BufReader imports and alias conflicts in Tauri backend

This enhances the reliability of agent session tracking and output retrieval
by properly handling Claude Code's actual JSON structure and implementing
better fallback mechanisms for session data access.
This commit is contained in:
Mufeed VH
2025-07-04 19:12:47 +05:30
parent 9eeb336a8b
commit 7a2372dcde
6 changed files with 320 additions and 29 deletions

View File

@@ -1,17 +1,19 @@
use anyhow::Result; use anyhow::Result;
use chrono; use chrono;
use dirs;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use regex;
use reqwest; use reqwest;
use rusqlite::{params, Connection, Result as SqliteResult}; use rusqlite::{params, Connection, Result as SqliteResult};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use std::io::{BufRead, BufReader};
use std::process::Stdio; use std::process::Stdio;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tauri::{AppHandle, Emitter, Manager, State}; use tauri::{AppHandle, Emitter, Manager, State};
use tauri_plugin_shell::ShellExt; use tauri_plugin_shell::ShellExt;
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
use tokio::process::Command; use tokio::process::Command;
use regex;
/// Finds the full path to the claude binary /// Finds the full path to the claude binary
/// This is necessary because macOS apps have a limited PATH environment /// This is necessary because macOS apps have a limited PATH environment
@@ -855,7 +857,10 @@ async fn spawn_agent_sidecar(
// Extract session ID from JSONL output // Extract session ID from JSONL output
if let Ok(json) = serde_json::from_str::<JsonValue>(&line) { if let Ok(json) = serde_json::from_str::<JsonValue>(&line) {
if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) { // Claude Code uses "session_id" (underscore), not "sessionId"
if json.get("type").and_then(|t| t.as_str()) == Some("system") &&
json.get("subtype").and_then(|s| s.as_str()) == Some("init") {
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
if let Ok(mut current_session_id) = session_id_holder_clone.lock() { if let Ok(mut current_session_id) = session_id_holder_clone.lock() {
if current_session_id.is_none() { if current_session_id.is_none() {
*current_session_id = Some(sid.to_string()); *current_session_id = Some(sid.to_string());
@@ -864,6 +869,7 @@ async fn spawn_agent_sidecar(
} }
} }
} }
}
// Emit the line to the frontend with run_id for isolation // Emit the line to the frontend with run_id for isolation
let _ = app_handle.emit(&format!("agent-output:{}", run_id), &line); let _ = app_handle.emit(&format!("agent-output:{}", run_id), &line);
@@ -955,10 +961,24 @@ async fn spawn_agent_sidecar(
// Update the run record with session ID and mark as completed // Update the run record with session ID and mark as completed
if let Ok(conn) = Connection::open(&db_path) { if let Ok(conn) = Connection::open(&db_path) {
let _ = conn.execute( info!("🔄 Updating database with extracted session ID: {}", extracted_session_id);
match conn.execute(
"UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2",
params![extracted_session_id, run_id], params![extracted_session_id, run_id],
); ) {
Ok(rows_affected) => {
if rows_affected > 0 {
info!("✅ Successfully updated agent run {} with session ID: {}", run_id, extracted_session_id);
} else {
warn!("⚠️ No rows affected when updating agent run {} with session ID", run_id);
}
}
Err(e) => {
error!("❌ Failed to update agent run {} with session ID: {}", run_id, e);
}
}
} else {
error!("❌ Failed to open database to update session ID for run {}", run_id);
} }
info!("✅ Claude sidecar execution monitoring complete"); info!("✅ Claude sidecar execution monitoring complete");
@@ -1017,8 +1037,8 @@ async fn spawn_agent_system(
info!("📡 Set up stdout/stderr readers"); info!("📡 Set up stdout/stderr readers");
// Create readers // Create readers
let stdout_reader = BufReader::new(stdout); let stdout_reader = TokioBufReader::new(stdout);
let stderr_reader = BufReader::new(stderr); let stderr_reader = TokioBufReader::new(stderr);
// Shared state for collecting session ID and live output // Shared state for collecting session ID and live output
let session_id = std::sync::Arc::new(Mutex::new(String::new())); let session_id = std::sync::Arc::new(Mutex::new(String::new()));
@@ -1067,7 +1087,10 @@ async fn spawn_agent_system(
// Extract session ID from JSONL output // Extract session ID from JSONL output
if let Ok(json) = serde_json::from_str::<JsonValue>(&line) { if let Ok(json) = serde_json::from_str::<JsonValue>(&line) {
if let Some(sid) = json.get("sessionId").and_then(|s| s.as_str()) { // Claude Code uses "session_id" (underscore), not "sessionId"
if json.get("type").and_then(|t| t.as_str()) == Some("system") &&
json.get("subtype").and_then(|s| s.as_str()) == Some("init") {
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
if let Ok(mut current_session_id) = session_id_clone.lock() { if let Ok(mut current_session_id) = session_id_clone.lock() {
if current_session_id.is_empty() { if current_session_id.is_empty() {
*current_session_id = sid.to_string(); *current_session_id = sid.to_string();
@@ -1076,6 +1099,7 @@ async fn spawn_agent_system(
} }
} }
} }
}
// Emit the line to the frontend with run_id for isolation // Emit the line to the frontend with run_id for isolation
let _ = app_handle.emit(&format!("agent-output:{}", run_id), &line); let _ = app_handle.emit(&format!("agent-output:{}", run_id), &line);
@@ -1232,10 +1256,24 @@ async fn spawn_agent_system(
// Update the run record with session ID and mark as completed - open a new connection // Update the run record with session ID and mark as completed - open a new connection
if let Ok(conn) = Connection::open(&db_path) { if let Ok(conn) = Connection::open(&db_path) {
let _ = conn.execute( info!("🔄 Updating database with extracted session ID: {}", extracted_session_id);
match conn.execute(
"UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2", "UPDATE agent_runs SET session_id = ?1, status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?2",
params![extracted_session_id, run_id], params![extracted_session_id, run_id],
); ) {
Ok(rows_affected) => {
if rows_affected > 0 {
info!("✅ Successfully updated agent run {} with session ID: {}", run_id, extracted_session_id);
} else {
warn!("⚠️ No rows affected when updating agent run {} with session ID", run_id);
}
}
Err(e) => {
error!("❌ Failed to update agent run {} with session ID: {}", run_id, e);
}
}
} else {
error!("❌ Failed to open database to update session ID for run {}", run_id);
} }
// Cleanup will be handled by the cleanup_finished_processes function // Cleanup will be handled by the cleanup_finished_processes function
@@ -1484,15 +1522,68 @@ pub async fn get_session_output(
return Ok(String::new()); return Ok(String::new());
} }
// Read the JSONL content // Get the Claude directory
let claude_dir = dirs::home_dir()
.ok_or("Failed to get home directory")?
.join(".claude");
// Find the correct project directory by searching for the session file
let projects_dir = claude_dir.join("projects");
// Check if projects directory exists
if !projects_dir.exists() {
log::error!("Projects directory not found at: {:?}", projects_dir);
return Err("Projects directory not found".to_string());
}
// Search for the session file in all project directories
let mut session_file_path = None;
log::info!("Searching for session file {} in all project directories", run.session_id);
if let Ok(entries) = std::fs::read_dir(&projects_dir) {
for entry in entries.filter_map(Result::ok) {
let path = entry.path();
if path.is_dir() {
let dir_name = path.file_name().unwrap_or_default().to_string_lossy();
log::debug!("Checking project directory: {}", dir_name);
let potential_session_file = path.join(format!("{}.jsonl", run.session_id));
if potential_session_file.exists() {
log::info!("Found session file at: {:?}", potential_session_file);
session_file_path = Some(potential_session_file);
break;
} else {
log::debug!("Session file not found in: {}", dir_name);
}
}
}
} else {
log::error!("Failed to read projects directory");
}
// If we found the session file, read it
if let Some(session_path) = session_file_path {
match tokio::fs::read_to_string(&session_path).await {
Ok(content) => Ok(content),
Err(e) => {
log::error!("Failed to read session file {}: {}", session_path.display(), e);
// Fallback to live output if file read fails
let live_output = registry.0.get_live_output(run_id)?;
Ok(live_output)
}
}
} else {
// If session file not found, try the old method as fallback
log::warn!("Session file not found for {}, trying legacy method", run.session_id);
match read_session_jsonl(&run.session_id, &run.project_path).await { match read_session_jsonl(&run.session_id, &run.project_path).await {
Ok(content) => Ok(content), Ok(content) => Ok(content),
Err(_) => { Err(_) => {
// Fallback to live output if JSONL file doesn't exist yet // Final fallback to live output
let live_output = registry.0.get_live_output(run_id)?; let live_output = registry.0.get_live_output(run_id)?;
Ok(live_output) Ok(live_output)
} }
} }
}
} }
/// Stream real-time session output by watching the JSONL file /// Stream real-time session output by watching the JSONL file
@@ -2076,3 +2167,68 @@ pub async fn import_agent_from_github(
// Import using existing function // Import using existing function
import_agent(db, json_data).await import_agent(db, json_data).await
} }
/// Load agent session history from JSONL file
/// Similar to Claude Code's load_session_history, but searches across all project directories
#[tauri::command]
pub async fn load_agent_session_history(
session_id: String,
) -> Result<Vec<serde_json::Value>, String> {
log::info!("Loading agent session history for session: {}", session_id);
let claude_dir = dirs::home_dir()
.ok_or("Failed to get home directory")?
.join(".claude");
let projects_dir = claude_dir.join("projects");
if !projects_dir.exists() {
log::error!("Projects directory not found at: {:?}", projects_dir);
return Err("Projects directory not found".to_string());
}
// Search for the session file in all project directories
let mut session_file_path = None;
log::info!("Searching for session file {} in all project directories", session_id);
if let Ok(entries) = std::fs::read_dir(&projects_dir) {
for entry in entries.filter_map(Result::ok) {
let path = entry.path();
if path.is_dir() {
let dir_name = path.file_name().unwrap_or_default().to_string_lossy();
log::debug!("Checking project directory: {}", dir_name);
let potential_session_file = path.join(format!("{}.jsonl", session_id));
if potential_session_file.exists() {
log::info!("Found session file at: {:?}", potential_session_file);
session_file_path = Some(potential_session_file);
break;
} else {
log::debug!("Session file not found in: {}", dir_name);
}
}
}
} else {
log::error!("Failed to read projects directory");
}
if let Some(session_path) = session_file_path {
let file = std::fs::File::open(&session_path)
.map_err(|e| format!("Failed to open session file: {}", e))?;
let reader = BufReader::new(file);
let mut messages = Vec::new();
for line in reader.lines() {
if let Ok(line) = line {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
messages.push(json);
}
}
}
Ok(messages)
} else {
Err(format!("Session file not found: {}", session_id))
}
}

View File

@@ -14,7 +14,7 @@ use commands::agents::{
get_live_session_output, get_session_output, get_session_status, import_agent, get_live_session_output, get_session_output, get_session_status, import_agent,
import_agent_from_file, import_agent_from_github, init_database, kill_agent_session, import_agent_from_file, import_agent_from_github, init_database, kill_agent_session,
list_agent_runs, list_agent_runs_with_metrics, list_agents, list_claude_installations, list_agent_runs, list_agent_runs_with_metrics, list_agents, list_claude_installations,
list_running_sessions, set_claude_binary_path, stream_session_output, update_agent, AgentDb, list_running_sessions, load_agent_session_history, set_claude_binary_path, stream_session_output, update_agent, AgentDb,
}; };
use commands::claude::{ use commands::claude::{
cancel_claude_execution, check_auto_checkpoint, check_claude_version, cleanup_old_checkpoints, cancel_claude_execution, check_auto_checkpoint, check_claude_version, cleanup_old_checkpoints,
@@ -136,6 +136,7 @@ fn main() {
get_session_output, get_session_output,
get_live_session_output, get_live_session_output,
stream_session_output, stream_session_output,
load_agent_session_history,
get_claude_binary_path, get_claude_binary_path,
set_claude_binary_path, set_claude_binary_path,
list_claude_installations, list_claude_installations,

View File

@@ -116,23 +116,81 @@ export function AgentRunOutputViewer({
const loadOutput = async (skipCache = false) => { const loadOutput = async (skipCache = false) => {
if (!run.id) return; if (!run.id) return;
console.log('[AgentRunOutputViewer] Loading output for run:', {
runId: run.id,
status: run.status,
sessionId: run.session_id,
skipCache
});
try { try {
// Check cache first if not skipping cache // Check cache first if not skipping cache
if (!skipCache) { if (!skipCache) {
const cached = getCachedOutput(run.id); const cached = getCachedOutput(run.id);
if (cached) { if (cached) {
console.log('[AgentRunOutputViewer] Found cached output');
const cachedJsonlLines = cached.output.split('\n').filter(line => line.trim()); const cachedJsonlLines = cached.output.split('\n').filter(line => line.trim());
setRawJsonlOutput(cachedJsonlLines); setRawJsonlOutput(cachedJsonlLines);
setMessages(cached.messages); setMessages(cached.messages);
// If cache is recent (less than 5 seconds old) and session isn't running, use cache only // If cache is recent (less than 5 seconds old) and session isn't running, use cache only
if (Date.now() - cached.lastUpdated < 5000 && run.status !== 'running') { if (Date.now() - cached.lastUpdated < 5000 && run.status !== 'running') {
console.log('[AgentRunOutputViewer] Using recent cache, skipping refresh');
return; return;
} }
} }
} }
setLoading(true); setLoading(true);
// If we have a session_id, try to load from JSONL file first
if (run.session_id && run.session_id !== '') {
console.log('[AgentRunOutputViewer] Attempting to load from JSONL with session_id:', run.session_id);
try {
const history = await api.loadAgentSessionHistory(run.session_id);
console.log('[AgentRunOutputViewer] Successfully loaded JSONL history:', history.length, 'messages');
// Convert history to messages format
const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({
...entry,
type: entry.type || "assistant"
}));
setMessages(loadedMessages);
setRawJsonlOutput(history.map(h => JSON.stringify(h)));
// Update cache
setCachedOutput(run.id, {
output: history.map(h => JSON.stringify(h)).join('\n'),
messages: loadedMessages,
lastUpdated: Date.now(),
status: run.status
});
// Set up live event listeners for running sessions
if (run.status === 'running') {
console.log('[AgentRunOutputViewer] Setting up live listeners for running session');
setupLiveEventListeners();
try {
await api.streamSessionOutput(run.id);
} catch (streamError) {
console.warn('[AgentRunOutputViewer] Failed to start streaming, will poll instead:', streamError);
}
}
return;
} catch (err) {
console.warn('[AgentRunOutputViewer] Failed to load from JSONL:', err);
console.warn('[AgentRunOutputViewer] Falling back to regular output method');
}
} else {
console.log('[AgentRunOutputViewer] No session_id available, using fallback method');
}
// Fallback to the original method if JSONL loading fails or no session_id
console.log('[AgentRunOutputViewer] Using getSessionOutput fallback');
const rawOutput = await api.getSessionOutput(run.id); const rawOutput = await api.getSessionOutput(run.id);
console.log('[AgentRunOutputViewer] Received raw output:', rawOutput.length, 'characters');
// Parse JSONL output into messages // Parse JSONL output into messages
const jsonlLines = rawOutput.split('\n').filter(line => line.trim()); const jsonlLines = rawOutput.split('\n').filter(line => line.trim());
@@ -144,9 +202,10 @@ export function AgentRunOutputViewer({
const message = JSON.parse(line) as ClaudeStreamMessage; const message = JSON.parse(line) as ClaudeStreamMessage;
parsedMessages.push(message); parsedMessages.push(message);
} catch (err) { } catch (err) {
console.error("Failed to parse message:", err, line); console.error("[AgentRunOutputViewer] Failed to parse message:", err, line);
} }
} }
console.log('[AgentRunOutputViewer] Parsed', parsedMessages.length, 'messages from output');
setMessages(parsedMessages); setMessages(parsedMessages);
// Update cache // Update cache
@@ -159,12 +218,13 @@ export function AgentRunOutputViewer({
// Set up live event listeners for running sessions // Set up live event listeners for running sessions
if (run.status === 'running') { if (run.status === 'running') {
console.log('[AgentRunOutputViewer] Setting up live listeners for running session (fallback)');
setupLiveEventListeners(); setupLiveEventListeners();
try { try {
await api.streamSessionOutput(run.id); await api.streamSessionOutput(run.id);
} catch (streamError) { } catch (streamError) {
console.warn('Failed to start streaming, will poll instead:', streamError); console.warn('[AgentRunOutputViewer] Failed to start streaming (fallback), will poll instead:', streamError);
} }
} }
} catch (error) { } catch (error) {

View File

@@ -64,7 +64,25 @@ export const AgentRunView: React.FC<AgentRunViewProps> = ({
const runData = await api.getAgentRunWithRealTimeMetrics(runId); const runData = await api.getAgentRunWithRealTimeMetrics(runId);
setRun(runData); setRun(runData);
// Parse JSONL output into messages // If we have a session_id, try to load from JSONL file first
if (runData.session_id && runData.session_id !== '') {
try {
const history = await api.loadAgentSessionHistory(runData.session_id);
// Convert history to messages format
const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({
...entry,
type: entry.type || "assistant"
}));
setMessages(loadedMessages);
return;
} catch (err) {
console.warn('Failed to load from JSONL, falling back to output field:', err);
}
}
// Fallback: Parse JSONL output from the output field
if (runData.output) { if (runData.output) {
const parsedMessages: ClaudeStreamMessage[] = []; const parsedMessages: ClaudeStreamMessage[] = [];
const lines = runData.output.split('\n').filter(line => line.trim()); const lines = runData.output.split('\n').filter(line => line.trim());

View File

@@ -109,6 +109,47 @@ export function SessionOutputViewer({ session, onClose, className }: SessionOutp
} }
setLoading(true); setLoading(true);
// If we have a session_id, try to load from JSONL file first
if (session.session_id && session.session_id !== '') {
try {
const history = await api.loadAgentSessionHistory(session.session_id);
// Convert history to messages format using AgentExecution style
const loadedMessages: ClaudeStreamMessage[] = history.map(entry => ({
...entry,
type: entry.type || "assistant"
}));
setMessages(loadedMessages);
setRawJsonlOutput(history.map(h => JSON.stringify(h)));
// Update cache
setCachedOutput(session.id, {
output: history.map(h => JSON.stringify(h)).join('\n'),
messages: loadedMessages,
lastUpdated: Date.now(),
status: session.status
});
// Set up live event listeners for running sessions
if (session.status === 'running') {
setupLiveEventListeners();
try {
await api.streamSessionOutput(session.id);
} catch (streamError) {
console.warn('Failed to start streaming, will poll instead:', streamError);
}
}
return;
} catch (err) {
console.warn('Failed to load from JSONL, falling back to regular output:', err);
}
}
// Fallback to the original method if JSONL loading fails or no session_id
const rawOutput = await api.getSessionOutput(session.id); const rawOutput = await api.getSessionOutput(session.id);
// Parse JSONL output into messages using AgentExecution style // Parse JSONL output into messages using AgentExecution style

View File

@@ -925,6 +925,21 @@ export const api = {
return invoke("load_session_history", { sessionId, projectId }); return invoke("load_session_history", { sessionId, projectId });
}, },
/**
* Loads the JSONL history for a specific agent session
* Similar to loadSessionHistory but searches across all project directories
* @param sessionId - The session ID (UUID)
* @returns Promise resolving to array of session messages
*/
async loadAgentSessionHistory(sessionId: string): Promise<any[]> {
try {
return await invoke<any[]>('load_agent_session_history', { sessionId });
} catch (error) {
console.error("Failed to load agent session history:", error);
throw error;
}
},
/** /**
* Executes a new interactive Claude Code session with streaming output * Executes a new interactive Claude Code session with streaming output
*/ */