Reduce swarm local persistence

This commit is contained in:
2026-04-04 03:37:55 +08:00
parent eb96764770
commit f65baebb3c
7 changed files with 83 additions and 334 deletions

View File

@@ -59,7 +59,7 @@ import {
isShutdownApproved, isShutdownApproved,
isShutdownRequest, isShutdownRequest,
isTeamPermissionUpdate, isTeamPermissionUpdate,
markMessagesAsRead, markMessagesAsReadByPredicate,
readUnreadMessages, readUnreadMessages,
type TeammateMessage, type TeammateMessage,
writeToMailbox, writeToMailbox,
@@ -195,10 +195,20 @@ export function useInboxPoller({
} }
} }
// Helper to mark messages as read in the inbox file. // Helper to remove the unread batch we just processed from the inbox file.
// Called after messages are successfully delivered or reliably queued. // Called after messages are successfully delivered or reliably queued.
const deliveredMessageKeys = new Set(
unread.map(message => `${message.from}|${message.timestamp}|${message.text}`),
)
const markRead = () => { const markRead = () => {
void markMessagesAsRead(agentName, currentAppState.teamContext?.teamName) void markMessagesAsReadByPredicate(
agentName,
message =>
deliveredMessageKeys.has(
`${message.from}|${message.timestamp}|${message.text}`,
),
currentAppState.teamContext?.teamName,
)
} }
// Separate permission messages from regular teammate messages // Separate permission messages from regular teammate messages

View File

@@ -1,197 +0,0 @@
import { mkdir, readdir, readFile, unlink, writeFile } from 'fs/promises'
import { join } from 'path'
import { z } from 'zod/v4'
import { getCwd } from '../../utils/cwd.js'
import { logForDebugging } from '../../utils/debug.js'
import { lazySchema } from '../../utils/lazySchema.js'
import { jsonParse, jsonStringify } from '../../utils/slowOperations.js'
import { type AgentMemoryScope, getAgentMemoryDir } from './agentMemory.js'
const SNAPSHOT_BASE = 'agent-memory-snapshots'
const SNAPSHOT_JSON = 'snapshot.json'
const SYNCED_JSON = '.snapshot-synced.json'
const snapshotMetaSchema = lazySchema(() =>
z.object({
updatedAt: z.string().min(1),
}),
)
const syncedMetaSchema = lazySchema(() =>
z.object({
syncedFrom: z.string().min(1),
}),
)
type SyncedMeta = z.infer<ReturnType<typeof syncedMetaSchema>>
/**
* Returns the path to the snapshot directory for an agent in the current project.
* e.g., <cwd>/.claude/agent-memory-snapshots/<agentType>/
*/
export function getSnapshotDirForAgent(agentType: string): string {
return join(getCwd(), '.claude', SNAPSHOT_BASE, agentType)
}
function getSnapshotJsonPath(agentType: string): string {
return join(getSnapshotDirForAgent(agentType), SNAPSHOT_JSON)
}
function getSyncedJsonPath(agentType: string, scope: AgentMemoryScope): string {
return join(getAgentMemoryDir(agentType, scope), SYNCED_JSON)
}
async function readJsonFile<T>(
path: string,
schema: z.ZodType<T>,
): Promise<T | null> {
try {
const content = await readFile(path, { encoding: 'utf-8' })
const result = schema.safeParse(jsonParse(content))
return result.success ? result.data : null
} catch {
return null
}
}
async function copySnapshotToLocal(
agentType: string,
scope: AgentMemoryScope,
): Promise<void> {
const snapshotMemDir = getSnapshotDirForAgent(agentType)
const localMemDir = getAgentMemoryDir(agentType, scope)
await mkdir(localMemDir, { recursive: true })
try {
const files = await readdir(snapshotMemDir, { withFileTypes: true })
for (const dirent of files) {
if (!dirent.isFile() || dirent.name === SNAPSHOT_JSON) continue
const content = await readFile(join(snapshotMemDir, dirent.name), {
encoding: 'utf-8',
})
await writeFile(join(localMemDir, dirent.name), content)
}
} catch (e) {
logForDebugging(`Failed to copy snapshot to local agent memory: ${e}`)
}
}
async function saveSyncedMeta(
agentType: string,
scope: AgentMemoryScope,
snapshotTimestamp: string,
): Promise<void> {
const syncedPath = getSyncedJsonPath(agentType, scope)
const localMemDir = getAgentMemoryDir(agentType, scope)
await mkdir(localMemDir, { recursive: true })
const meta: SyncedMeta = { syncedFrom: snapshotTimestamp }
try {
await writeFile(syncedPath, jsonStringify(meta))
} catch (e) {
logForDebugging(`Failed to save snapshot sync metadata: ${e}`)
}
}
/**
* Check if a snapshot exists and whether it's newer than what we last synced.
*/
export async function checkAgentMemorySnapshot(
agentType: string,
scope: AgentMemoryScope,
): Promise<{
action: 'none' | 'initialize' | 'prompt-update'
snapshotTimestamp?: string
}> {
const snapshotMeta = await readJsonFile(
getSnapshotJsonPath(agentType),
snapshotMetaSchema(),
)
if (!snapshotMeta) {
return { action: 'none' }
}
const localMemDir = getAgentMemoryDir(agentType, scope)
let hasLocalMemory = false
try {
const dirents = await readdir(localMemDir, { withFileTypes: true })
hasLocalMemory = dirents.some(d => d.isFile() && d.name.endsWith('.md'))
} catch {
// Directory doesn't exist
}
if (!hasLocalMemory) {
return { action: 'initialize', snapshotTimestamp: snapshotMeta.updatedAt }
}
const syncedMeta = await readJsonFile(
getSyncedJsonPath(agentType, scope),
syncedMetaSchema(),
)
if (
!syncedMeta ||
new Date(snapshotMeta.updatedAt) > new Date(syncedMeta.syncedFrom)
) {
return {
action: 'prompt-update',
snapshotTimestamp: snapshotMeta.updatedAt,
}
}
return { action: 'none' }
}
/**
* Initialize local agent memory from a snapshot (first-time setup).
*/
export async function initializeFromSnapshot(
agentType: string,
scope: AgentMemoryScope,
snapshotTimestamp: string,
): Promise<void> {
logForDebugging(
`Initializing agent memory for ${agentType} from project snapshot`,
)
await copySnapshotToLocal(agentType, scope)
await saveSyncedMeta(agentType, scope, snapshotTimestamp)
}
/**
* Replace local agent memory with the snapshot.
*/
export async function replaceFromSnapshot(
agentType: string,
scope: AgentMemoryScope,
snapshotTimestamp: string,
): Promise<void> {
logForDebugging(
`Replacing agent memory for ${agentType} with project snapshot`,
)
// Remove existing .md files before copying to avoid orphans
const localMemDir = getAgentMemoryDir(agentType, scope)
try {
const existing = await readdir(localMemDir, { withFileTypes: true })
for (const dirent of existing) {
if (dirent.isFile() && dirent.name.endsWith('.md')) {
await unlink(join(localMemDir, dirent.name))
}
}
} catch {
// Directory may not exist yet
}
await copySnapshotToLocal(agentType, scope)
await saveSyncedMeta(agentType, scope, snapshotTimestamp)
}
/**
* Mark the current snapshot as synced without changing local memory.
*/
export async function markSnapshotSynced(
agentType: string,
scope: AgentMemoryScope,
snapshotTimestamp: string,
): Promise<void> {
await saveSyncedMeta(agentType, scope, snapshotTimestamp)
}

View File

@@ -47,10 +47,6 @@ import {
setAgentColor, setAgentColor,
} from './agentColorManager.js' } from './agentColorManager.js'
import { type AgentMemoryScope, loadAgentMemoryPrompt } from './agentMemory.js' import { type AgentMemoryScope, loadAgentMemoryPrompt } from './agentMemory.js'
import {
checkAgentMemorySnapshot,
initializeFromSnapshot,
} from './agentMemorySnapshot.js'
import { getBuiltInAgents } from './builtInAgents.js' import { getBuiltInAgents } from './builtInAgents.js'
// Type for MCP server specification in agent definitions // Type for MCP server specification in agent definitions
@@ -255,41 +251,14 @@ export function filterAgentsByMcpRequirements(
} }
/** /**
* Check for and initialize agent memory from project snapshots. * Agent memory snapshot sync is disabled in this fork to avoid copying
* For agents with memory enabled, copies snapshot to local if no local memory exists. * project-scoped memory into persistent user/local agent memory.
* For agents with newer snapshots, logs a debug message (user prompt TODO).
*/ */
async function initializeAgentMemorySnapshots( async function initializeAgentMemorySnapshots(
agents: CustomAgentDefinition[], _agents: CustomAgentDefinition[],
): Promise<void> { ): Promise<void> {
await Promise.all( logForDebugging(
agents.map(async agent => { '[loadAgentsDir] Agent memory snapshot sync is disabled in this build',
if (agent.memory !== 'user') return
const result = await checkAgentMemorySnapshot(
agent.agentType,
agent.memory,
)
switch (result.action) {
case 'initialize':
logForDebugging(
`Initializing ${agent.agentType} memory from project snapshot`,
)
await initializeFromSnapshot(
agent.agentType,
agent.memory,
result.snapshotTimestamp!,
)
break
case 'prompt-update':
agent.pendingSnapshotUpdate = {
snapshotTimestamp: result.snapshotTimestamp!,
}
logForDebugging(
`Newer snapshot available for ${agent.agentType} memory (snapshot: ${result.snapshotTimestamp})`,
)
break
}
}),
) )
} }

View File

@@ -1,5 +1,4 @@
import { z } from 'zod/v4' import { z } from 'zod/v4'
import { getSessionId } from '../../bootstrap/state.js'
import { logEvent } from '../../services/analytics/index.js' import { logEvent } from '../../services/analytics/index.js'
import type { AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS } from '../../services/analytics/metadata.js' import type { AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS } from '../../services/analytics/metadata.js'
import type { Tool } from '../../Tool.js' import type { Tool } from '../../Tool.js'
@@ -159,7 +158,6 @@ export const TeamCreateTool: Tool<InputSchema, Output> = buildTool({
description: _description, description: _description,
createdAt: Date.now(), createdAt: Date.now(),
leadAgentId, leadAgentId,
leadSessionId: getSessionId(), // Store actual session ID for team discovery
members: [ members: [
{ {
agentId: leadAgentId, agentId: leadAgentId,
@@ -169,7 +167,6 @@ export const TeamCreateTool: Tool<InputSchema, Output> = buildTool({
joinedAt: Date.now(), joinedAt: Date.now(),
tmuxPaneId: '', tmuxPaneId: '',
cwd: getCwd(), cwd: getCwd(),
subscriptions: [],
}, },
], ],
} }

View File

@@ -497,13 +497,11 @@ async function handleSpawnSplitPane(
name: sanitizedName, name: sanitizedName,
agentType: agent_type, agentType: agent_type,
model, model,
prompt,
color: teammateColor, color: teammateColor,
planModeRequired: plan_mode_required, planModeRequired: plan_mode_required,
joinedAt: Date.now(), joinedAt: Date.now(),
tmuxPaneId: paneId, tmuxPaneId: paneId,
cwd: workingDir, cwd: workingDir,
subscriptions: [],
backendType: detectionResult.backend.type, backendType: detectionResult.backend.type,
}) })
await writeTeamFileAsync(teamName, teamFile) await writeTeamFileAsync(teamName, teamFile)
@@ -711,13 +709,11 @@ async function handleSpawnSeparateWindow(
name: sanitizedName, name: sanitizedName,
agentType: agent_type, agentType: agent_type,
model, model,
prompt,
color: teammateColor, color: teammateColor,
planModeRequired: plan_mode_required, planModeRequired: plan_mode_required,
joinedAt: Date.now(), joinedAt: Date.now(),
tmuxPaneId: paneId, tmuxPaneId: paneId,
cwd: workingDir, cwd: workingDir,
subscriptions: [],
backendType: 'tmux', // This handler always uses tmux directly backendType: 'tmux', // This handler always uses tmux directly
}) })
await writeTeamFileAsync(teamName, teamFile) await writeTeamFileAsync(teamName, teamFile)
@@ -997,13 +993,11 @@ async function handleSpawnInProcess(
name: sanitizedName, name: sanitizedName,
agentType: agent_type, agentType: agent_type,
model, model,
prompt,
color: teammateColor, color: teammateColor,
planModeRequired: plan_mode_required, planModeRequired: plan_mode_required,
joinedAt: Date.now(), joinedAt: Date.now(),
tmuxPaneId: 'in-process', tmuxPaneId: 'in-process',
cwd: getCwd(), cwd: getCwd(),
subscriptions: [],
backendType: 'in-process', backendType: 'in-process',
}) })
await writeTeamFileAsync(teamName, teamFile) await writeTeamFileAsync(teamName, teamFile)

View File

@@ -66,7 +66,7 @@ export type TeamFile = {
description?: string description?: string
createdAt: number createdAt: number
leadAgentId: string leadAgentId: string
leadSessionId?: string // Actual session UUID of the leader (for discovery) leadSessionId?: string // Legacy field; stripped from persisted configs
hiddenPaneIds?: string[] // Pane IDs that are currently hidden from the UI hiddenPaneIds?: string[] // Pane IDs that are currently hidden from the UI
teamAllowedPaths?: TeamAllowedPath[] // Paths all teammates can edit without asking teamAllowedPaths?: TeamAllowedPath[] // Paths all teammates can edit without asking
members: Array<{ members: Array<{
@@ -74,15 +74,15 @@ export type TeamFile = {
name: string name: string
agentType?: string agentType?: string
model?: string model?: string
prompt?: string prompt?: string // Legacy field; stripped from persisted configs
color?: string color?: string
planModeRequired?: boolean planModeRequired?: boolean
joinedAt: number joinedAt: number
tmuxPaneId: string tmuxPaneId: string
cwd: string cwd: string
worktreePath?: string worktreePath?: string
sessionId?: string sessionId?: string // Legacy field; stripped from persisted configs
subscriptions: string[] subscriptions?: string[] // Legacy field; stripped from persisted configs
backendType?: BackendType backendType?: BackendType
isActive?: boolean // false when idle, undefined/true when active isActive?: boolean // false when idle, undefined/true when active
mode?: PermissionMode // Current permission mode for this teammate mode?: PermissionMode // Current permission mode for this teammate
@@ -123,6 +123,42 @@ export function getTeamFilePath(teamName: string): string {
return join(getTeamDir(teamName), 'config.json') return join(getTeamDir(teamName), 'config.json')
} }
function sanitizeTeamFileForPersistence(teamFile: TeamFile): TeamFile {
return {
name: teamFile.name,
...(teamFile.description ? { description: teamFile.description } : {}),
createdAt: teamFile.createdAt,
leadAgentId: teamFile.leadAgentId,
...(teamFile.hiddenPaneIds && teamFile.hiddenPaneIds.length > 0
? { hiddenPaneIds: [...teamFile.hiddenPaneIds] }
: {}),
...(teamFile.teamAllowedPaths && teamFile.teamAllowedPaths.length > 0
? {
teamAllowedPaths: teamFile.teamAllowedPaths.map(path => ({
...path,
})),
}
: {}),
members: teamFile.members.map(member => ({
agentId: member.agentId,
name: member.name,
...(member.agentType ? { agentType: member.agentType } : {}),
...(member.model ? { model: member.model } : {}),
...(member.color ? { color: member.color } : {}),
...(member.planModeRequired !== undefined
? { planModeRequired: member.planModeRequired }
: {}),
joinedAt: member.joinedAt,
tmuxPaneId: member.tmuxPaneId,
cwd: member.cwd,
...(member.worktreePath ? { worktreePath: member.worktreePath } : {}),
...(member.backendType ? { backendType: member.backendType } : {}),
...(member.isActive !== undefined ? { isActive: member.isActive } : {}),
...(member.mode ? { mode: member.mode } : {}),
})),
}
}
/** /**
* Reads a team file by name (sync — for sync contexts like React render paths) * Reads a team file by name (sync — for sync contexts like React render paths)
* @internal Exported for team discovery UI * @internal Exported for team discovery UI
@@ -131,7 +167,7 @@ export function getTeamFilePath(teamName: string): string {
export function readTeamFile(teamName: string): TeamFile | null { export function readTeamFile(teamName: string): TeamFile | null {
try { try {
const content = readFileSync(getTeamFilePath(teamName), 'utf-8') const content = readFileSync(getTeamFilePath(teamName), 'utf-8')
return jsonParse(content) as TeamFile return sanitizeTeamFileForPersistence(jsonParse(content) as TeamFile)
} catch (e) { } catch (e) {
if (getErrnoCode(e) === 'ENOENT') return null if (getErrnoCode(e) === 'ENOENT') return null
logForDebugging( logForDebugging(
@@ -149,7 +185,7 @@ export async function readTeamFileAsync(
): Promise<TeamFile | null> { ): Promise<TeamFile | null> {
try { try {
const content = await readFile(getTeamFilePath(teamName), 'utf-8') const content = await readFile(getTeamFilePath(teamName), 'utf-8')
return jsonParse(content) as TeamFile return sanitizeTeamFileForPersistence(jsonParse(content) as TeamFile)
} catch (e) { } catch (e) {
if (getErrnoCode(e) === 'ENOENT') return null if (getErrnoCode(e) === 'ENOENT') return null
logForDebugging( logForDebugging(
@@ -166,7 +202,10 @@ export async function readTeamFileAsync(
function writeTeamFile(teamName: string, teamFile: TeamFile): void { function writeTeamFile(teamName: string, teamFile: TeamFile): void {
const teamDir = getTeamDir(teamName) const teamDir = getTeamDir(teamName)
mkdirSync(teamDir, { recursive: true }) mkdirSync(teamDir, { recursive: true })
writeFileSync(getTeamFilePath(teamName), jsonStringify(teamFile, null, 2)) writeFileSync(
getTeamFilePath(teamName),
jsonStringify(sanitizeTeamFileForPersistence(teamFile), null, 2),
)
} }
/** /**
@@ -178,7 +217,10 @@ export async function writeTeamFileAsync(
): Promise<void> { ): Promise<void> {
const teamDir = getTeamDir(teamName) const teamDir = getTeamDir(teamName)
await mkdir(teamDir, { recursive: true }) await mkdir(teamDir, { recursive: true })
await writeFile(getTeamFilePath(teamName), jsonStringify(teamFile, null, 2)) await writeFile(
getTeamFilePath(teamName),
jsonStringify(sanitizeTeamFileForPersistence(teamFile), null, 2),
)
} }
/** /**

View File

@@ -15,7 +15,6 @@ import { PermissionModeSchema } from '../entrypoints/sdk/coreSchemas.js'
import { SEND_MESSAGE_TOOL_NAME } from '../tools/SendMessageTool/constants.js' import { SEND_MESSAGE_TOOL_NAME } from '../tools/SendMessageTool/constants.js'
import type { Message } from '../types/message.js' import type { Message } from '../types/message.js'
import { generateRequestId } from './agentId.js' import { generateRequestId } from './agentId.js'
import { count } from './array.js'
import { logForDebugging } from './debug.js' import { logForDebugging } from './debug.js'
import { getTeamsDir } from './envUtils.js' import { getTeamsDir } from './envUtils.js'
import { getErrnoCode } from './errors.js' import { getErrnoCode } from './errors.js'
@@ -192,8 +191,8 @@ export async function writeToMailbox(
} }
/** /**
* Mark a specific message in a teammate's inbox as read by index * Remove a specific processed message from a teammate's inbox by index.
* Uses file locking to prevent race conditions * Uses file locking to prevent race conditions.
* @param agentName - The agent name to mark message as read for * @param agentName - The agent name to mark message as read for
* @param teamName - Optional team name * @param teamName - Optional team name
* @param messageIndex - Index of the message to mark as read * @param messageIndex - Index of the message to mark as read
@@ -242,11 +241,17 @@ export async function markMessageAsReadByIndex(
return return
} }
messages[messageIndex] = { ...message, read: true } const updatedMessages = messages.filter(
(currentMessage, index) => index !== messageIndex && !currentMessage.read,
)
await writeFile(inboxPath, jsonStringify(messages, null, 2), 'utf-8') await writeFile(
inboxPath,
jsonStringify(updatedMessages, null, 2),
'utf-8',
)
logForDebugging( logForDebugging(
`[TeammateMailbox] markMessageAsReadByIndex: marked message at index ${messageIndex} as read`, `[TeammateMailbox] markMessageAsReadByIndex: removed message at index ${messageIndex} from inbox`,
) )
} catch (error) { } catch (error) {
const code = getErrnoCode(error) const code = getErrnoCode(error)
@@ -270,77 +275,6 @@ export async function markMessageAsReadByIndex(
} }
} }
/**
* Mark all messages in a teammate's inbox as read
* Uses file locking to prevent race conditions
* @param agentName - The agent name to mark messages as read for
* @param teamName - Optional team name
*/
export async function markMessagesAsRead(
agentName: string,
teamName?: string,
): Promise<void> {
const inboxPath = getInboxPath(agentName, teamName)
logForDebugging(
`[TeammateMailbox] markMessagesAsRead called: agentName=${agentName}, teamName=${teamName}, path=${inboxPath}`,
)
const lockFilePath = `${inboxPath}.lock`
let release: (() => Promise<void>) | undefined
try {
logForDebugging(`[TeammateMailbox] markMessagesAsRead: acquiring lock...`)
release = await lockfile.lock(inboxPath, {
lockfilePath: lockFilePath,
...LOCK_OPTIONS,
})
logForDebugging(`[TeammateMailbox] markMessagesAsRead: lock acquired`)
// Re-read messages after acquiring lock to get the latest state
const messages = await readMailbox(agentName, teamName)
logForDebugging(
`[TeammateMailbox] markMessagesAsRead: read ${messages.length} messages after lock`,
)
if (messages.length === 0) {
logForDebugging(
`[TeammateMailbox] markMessagesAsRead: no messages to mark`,
)
return
}
const unreadCount = count(messages, m => !m.read)
logForDebugging(
`[TeammateMailbox] markMessagesAsRead: ${unreadCount} unread of ${messages.length} total`,
)
// messages comes from jsonParse — fresh, unshared objects safe to mutate
for (const m of messages) m.read = true
await writeFile(inboxPath, jsonStringify(messages, null, 2), 'utf-8')
logForDebugging(
`[TeammateMailbox] markMessagesAsRead: WROTE ${unreadCount} message(s) as read to ${inboxPath}`,
)
} catch (error) {
const code = getErrnoCode(error)
if (code === 'ENOENT') {
logForDebugging(
`[TeammateMailbox] markMessagesAsRead: file does not exist at ${inboxPath}`,
)
return
}
logForDebugging(
`[TeammateMailbox] markMessagesAsRead FAILED for ${agentName}: ${error}`,
)
logError(error)
} finally {
if (release) {
await release()
logForDebugging(`[TeammateMailbox] markMessagesAsRead: lock released`)
}
}
}
/** /**
* Clear a teammate's inbox (delete all messages) * Clear a teammate's inbox (delete all messages)
* @param agentName - The agent name to clear inbox for * @param agentName - The agent name to clear inbox for
@@ -1095,8 +1029,8 @@ export function isStructuredProtocolMessage(messageText: string): boolean {
} }
/** /**
* Marks only messages matching a predicate as read, leaving others unread. * Removes only messages matching a predicate, leaving the rest unread.
* Uses the same file-locking mechanism as markMessagesAsRead. * Uses the same file-locking mechanism as the other mailbox update helpers.
*/ */
export async function markMessagesAsReadByPredicate( export async function markMessagesAsReadByPredicate(
agentName: string, agentName: string,
@@ -1119,8 +1053,8 @@ export async function markMessagesAsReadByPredicate(
return return
} }
const updatedMessages = messages.map(m => const updatedMessages = messages.filter(
!m.read && predicate(m) ? { ...m, read: true } : m, m => !m.read && !predicate(m),
) )
await writeFile(inboxPath, jsonStringify(updatedMessages, null, 2), 'utf-8') await writeFile(inboxPath, jsonStringify(updatedMessages, null, 2), 'utf-8')