import { feature } from 'bun:bundle' import type { Base64ImageSource, ContentBlockParam, MessageParam, } from '@anthropic-ai/sdk/resources/index.mjs' import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { SSEClientTransport, type SSEClientTransportOptions, } from '@modelcontextprotocol/sdk/client/sse.js' import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js' import { StreamableHTTPClientTransport, type StreamableHTTPClientTransportOptions, } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import { createFetchWithInit, type FetchLike, type Transport, } from '@modelcontextprotocol/sdk/shared/transport.js' import { CallToolResultSchema, ElicitRequestSchema, type ElicitRequestURLParams, type ElicitResult, ErrorCode, type JSONRPCMessage, type ListPromptsResult, ListPromptsResultSchema, ListResourcesResultSchema, ListRootsRequestSchema, type ListToolsResult, ListToolsResultSchema, McpError, type PromptMessage, type ResourceLink, } from '@modelcontextprotocol/sdk/types.js' import mapValues from 'lodash-es/mapValues.js' import memoize from 'lodash-es/memoize.js' import zipObject from 'lodash-es/zipObject.js' import pMap from 'p-map' import { getOriginalCwd, getSessionId } from '../../bootstrap/state.js' import type { Command } from '../../commands.js' import { getOauthConfig } from '../../constants/oauth.js' import { PRODUCT_URL } from '../../constants/product.js' import type { AppState } from '../../state/AppState.js' import { type Tool, type ToolCallProgress, toolMatchesName, } from '../../Tool.js' import { ListMcpResourcesTool } from '../../tools/ListMcpResourcesTool/ListMcpResourcesTool.js' import { type MCPProgress, MCPTool } from '../../tools/MCPTool/MCPTool.js' import { createMcpAuthTool } from '../../tools/McpAuthTool/McpAuthTool.js' import { ReadMcpResourceTool } from '../../tools/ReadMcpResourceTool/ReadMcpResourceTool.js' import { createAbortController } from '../../utils/abortController.js' import { count } from '../../utils/array.js' import { checkAndRefreshOAuthTokenIfNeeded, getClaudeAIOAuthTokens, handleOAuth401Error, } from '../../utils/auth.js' import { registerCleanup } from '../../utils/cleanupRegistry.js' import { detectCodeIndexingFromMcpServerName } from '../../utils/codeIndexing.js' import { logForDebugging } from '../../utils/debug.js' import { isEnvDefinedFalsy, isEnvTruthy } from '../../utils/envUtils.js' import { errorMessage, TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, } from '../../utils/errors.js' import { getMCPUserAgent } from '../../utils/http.js' import { maybeNotifyIDEConnected } from '../../utils/ide.js' import { maybeResizeAndDownsampleImageBuffer } from '../../utils/imageResizer.js' import { logMCPDebug, logMCPError } from '../../utils/log.js' import { getBinaryBlobSavedMessage, getFormatDescription, getLargeOutputInstructions, persistBinaryContent, } from '../../utils/mcpOutputStorage.js' import { getContentSizeEstimate, type MCPToolResult, mcpContentNeedsTruncation, truncateMcpContentIfNeeded, } from '../../utils/mcpValidation.js' import { WebSocketTransport } from '../../utils/mcpWebSocketTransport.js' import { memoizeWithLRU } from '../../utils/memoize.js' import { getWebSocketTLSOptions } from '../../utils/mtls.js' import { getProxyFetchOptions, getWebSocketProxyAgent, getWebSocketProxyUrl, } from '../../utils/proxy.js' import { recursivelySanitizeUnicode } from '../../utils/sanitization.js' import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js' import { subprocessEnv } from '../../utils/subprocessEnv.js' import { isPersistError, persistToolResult, } from '../../utils/toolResultStorage.js' import { type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, logEvent, } from '../analytics/index.js' import { type ElicitationWaitingState, runElicitationHooks, runElicitationResultHooks, } from './elicitationHandler.js' import { buildMcpToolName } from './mcpStringUtils.js' import { normalizeNameForMCP } from './normalization.js' import { getLoggingSafeMcpBaseUrl } from './utils.js' /* eslint-disable @typescript-eslint/no-require-imports */ const fetchMcpSkillsForClient = feature('MCP_SKILLS') ? ( require('../../skills/mcpSkills.js') as typeof import('../../skills/mcpSkills.js') ).fetchMcpSkillsForClient : null import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js' import type { AssistantMessage } from 'src/types/message.js' /* eslint-enable @typescript-eslint/no-require-imports */ import { classifyMcpToolForCollapse } from '../../tools/MCPTool/classifyForCollapse.js' import { clearKeychainCache } from '../../utils/secureStorage/macOsKeychainHelpers.js' import { sleep } from '../../utils/sleep.js' import { ClaudeAuthProvider, hasMcpDiscoveryButNoToken, wrapFetchWithStepUpDetection, } from './auth.js' import { markClaudeAiMcpConnected } from './claudeai.js' import { getAllMcpConfigs, isMcpServerDisabled } from './config.js' import { getMcpServerHeaders } from './headersHelper.js' import { SdkControlClientTransport } from './SdkControlTransport.js' import type { ConnectedMCPServer, MCPServerConnection, McpSdkServerConfig, ScopedMcpServerConfig, ServerResource, } from './types.js' /** * Custom error class to indicate that an MCP tool call failed due to * authentication issues (e.g., expired OAuth token returning 401). * This error should be caught at the tool execution layer to update * the client's status to 'needs-auth'. */ export class McpAuthError extends Error { serverName: string constructor(serverName: string, message: string) { super(message) this.name = 'McpAuthError' this.serverName = serverName } } /** * Thrown when an MCP session has expired and the connection cache has been cleared. * The caller should get a fresh client via ensureConnectedClient and retry. */ class McpSessionExpiredError extends Error { constructor(serverName: string) { super(`MCP server "${serverName}" session expired`) this.name = 'McpSessionExpiredError' } } /** * Thrown when an MCP tool returns `isError: true`. Carries the result's `_meta` * so SDK consumers can still receive it — per the MCP spec, `_meta` is on the * base Result type and is valid on error results. */ export class McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS extends TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS { constructor( message: string, telemetryMessage: string, readonly mcpMeta?: { _meta?: Record }, ) { super(message, telemetryMessage) this.name = 'McpToolCallError' } } /** * Detects whether an error is an MCP "Session not found" error (HTTP 404 + JSON-RPC code -32001). * Per the MCP spec, servers return 404 when a session ID is no longer valid. * We check both signals to avoid false positives from generic 404s (wrong URL, server gone, etc.). */ export function isMcpSessionExpiredError(error: Error): boolean { const httpStatus = 'code' in error ? (error as Error & { code?: number }).code : undefined if (httpStatus !== 404) { return false } // The SDK embeds the response body text in the error message. // MCP servers return: {"error":{"code":-32001,"message":"Session not found"},...} // Check for the JSON-RPC error code to distinguish from generic web server 404s. return ( error.message.includes('"code":-32001') || error.message.includes('"code": -32001') ) } /** * Default timeout for MCP tool calls (effectively infinite - ~27.8 hours). */ const DEFAULT_MCP_TOOL_TIMEOUT_MS = 100_000_000 /** * Cap on MCP tool descriptions and server instructions sent to the model. * OpenAPI-generated MCP servers have been observed dumping 15-60KB of endpoint * docs into tool.description; this caps the p95 tail without losing the intent. */ const MAX_MCP_DESCRIPTION_LENGTH = 2048 /** * Gets the timeout for MCP tool calls in milliseconds. * Uses MCP_TOOL_TIMEOUT environment variable if set, otherwise defaults to ~27.8 hours. */ function getMcpToolTimeoutMs(): number { return ( parseInt(process.env.MCP_TOOL_TIMEOUT || '', 10) || DEFAULT_MCP_TOOL_TIMEOUT_MS ) } import { isClaudeInChromeMCPServer } from '../../utils/claudeInChrome/common.js' // Lazy: toolRendering.tsx pulls React/ink; only needed when Claude-in-Chrome MCP server is connected /* eslint-disable @typescript-eslint/no-require-imports */ const claudeInChromeToolRendering = (): typeof import('../../utils/claudeInChrome/toolRendering.js') => require('../../utils/claudeInChrome/toolRendering.js') // Lazy: wrapper.tsx → hostAdapter.ts → executor.ts pulls both native modules // (@ant/computer-use-input + @ant/computer-use-swift). Runtime-gated by // GrowthBook tengu_malort_pedway (see gates.ts). const computerUseWrapper = feature('CHICAGO_MCP') ? (): typeof import('../../utils/computerUse/wrapper.js') => require('../../utils/computerUse/wrapper.js') : undefined const isComputerUseMCPServer = feature('CHICAGO_MCP') ? ( require('../../utils/computerUse/common.js') as typeof import('../../utils/computerUse/common.js') ).isComputerUseMCPServer : undefined import { mkdir, readFile, unlink, writeFile } from 'fs/promises' import { dirname, join } from 'path' import { getClaudeConfigHomeDir } from '../../utils/envUtils.js' /* eslint-enable @typescript-eslint/no-require-imports */ import { jsonParse, jsonStringify } from '../../utils/slowOperations.js' const MCP_AUTH_CACHE_TTL_MS = 15 * 60 * 1000 // 15 min type McpAuthCacheData = Record function getMcpAuthCachePath(): string { return join(getClaudeConfigHomeDir(), 'mcp-needs-auth-cache.json') } // Memoized so N concurrent isMcpAuthCached() calls during batched connection // share a single file read instead of N reads of the same file. Invalidated // on write (setMcpAuthCacheEntry) and clear (clearMcpAuthCache). Not using // lodash memoize because we need to null out the cache, not delete by key. let authCachePromise: Promise | null = null function getMcpAuthCache(): Promise { if (!authCachePromise) { authCachePromise = readFile(getMcpAuthCachePath(), 'utf-8') .then(data => jsonParse(data) as McpAuthCacheData) .catch(() => ({})) } return authCachePromise } async function isMcpAuthCached(serverId: string): Promise { const cache = await getMcpAuthCache() const entry = cache[serverId] if (!entry) { return false } return Date.now() - entry.timestamp < MCP_AUTH_CACHE_TTL_MS } // Serialize cache writes through a promise chain to prevent concurrent // read-modify-write races when multiple servers return 401 in the same batch let writeChain = Promise.resolve() function setMcpAuthCacheEntry(serverId: string): void { writeChain = writeChain .then(async () => { const cache = await getMcpAuthCache() cache[serverId] = { timestamp: Date.now() } const cachePath = getMcpAuthCachePath() await mkdir(dirname(cachePath), { recursive: true }) await writeFile(cachePath, jsonStringify(cache)) // Invalidate the read cache so subsequent reads see the new entry. // Safe because writeChain serializes writes: the next write's // getMcpAuthCache() call will re-read the file with this entry present. authCachePromise = null }) .catch(() => { // Best-effort cache write }) } export function clearMcpAuthCache(): void { authCachePromise = null void unlink(getMcpAuthCachePath()).catch(() => { // Cache file may not exist }) } /** * Spread-ready analytics field for the server's base URL. Calls * getLoggingSafeMcpBaseUrl once (not twice like the inline ternary it replaces). * Typed as AnalyticsMetadata since the URL is query-stripped and safe to log. */ function mcpBaseUrlAnalytics(serverRef: ScopedMcpServerConfig): { mcpServerBaseUrl?: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS } { const url = getLoggingSafeMcpBaseUrl(serverRef) return url ? { mcpServerBaseUrl: url as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, } : {} } function mcpBaseUrlForDebug(serverRef: ScopedMcpServerConfig): string { return getLoggingSafeMcpBaseUrl(serverRef) || '[unavailable]' } function summarizeHeadersForDebug( headers: Record | undefined, ): { headerCount: number headerNames: string[] hasAuthorization: boolean } { if (!headers) { return { headerCount: 0, headerNames: [], hasAuthorization: false, } } const headerNames = Object.keys(headers) return { headerCount: headerNames.length, headerNames: headerNames.sort(), hasAuthorization: headerNames.some( headerName => headerName.toLowerCase() === 'authorization', ), } } function summarizeProxyEnvForDebug(): Record { return { hasNodeOptions: Boolean(process.env.NODE_OPTIONS), uvThreadpoolSizeConfigured: Boolean(process.env.UV_THREADPOOL_SIZE), hasHttpProxy: Boolean(process.env.HTTP_PROXY), hasHttpsProxy: Boolean(process.env.HTTPS_PROXY), hasNoProxy: Boolean(process.env.NO_PROXY), } } function summarizeStderrForDebug(stderrOutput: string): string { const trimmed = stderrOutput.trim() const lineCount = trimmed === '' ? 0 : trimmed.split('\n').length return `Server stderr captured (${trimmed.length} chars, ${lineCount} lines)` } function summarizeMcpErrorForDebug(error: unknown): string { const summary: Record = {} if (error instanceof Error) { summary.errorType = error.constructor.name summary.errorName = error.name summary.hasMessage = error.message.length > 0 summary.hasStack = Boolean(error.stack) const errorObj = error as Error & { code?: unknown errno?: unknown syscall?: unknown status?: unknown cause?: unknown } if (typeof errorObj.code === 'string' || typeof errorObj.code === 'number') { summary.code = errorObj.code } if ( typeof errorObj.errno === 'string' || typeof errorObj.errno === 'number' ) { summary.errno = errorObj.errno } if (typeof errorObj.syscall === 'string') { summary.syscall = errorObj.syscall } if (typeof errorObj.status === 'number') { summary.status = errorObj.status } if (errorObj.cause !== undefined) { summary.hasCause = true } } else { summary.errorType = typeof error summary.hasValue = error !== undefined && error !== null } return jsonStringify(summary) } /** * Shared handler for sse/http/claudeai-proxy auth failures during connect: * emits tengu_mcp_server_needs_auth, caches the needs-auth entry, and returns * the needs-auth connection result. */ function handleRemoteAuthFailure( name: string, serverRef: ScopedMcpServerConfig, transportType: 'sse' | 'http' | 'claudeai-proxy', ): MCPServerConnection { logEvent('tengu_mcp_server_needs_auth', { transportType: transportType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, ...mcpBaseUrlAnalytics(serverRef), }) const label: Record = { sse: 'SSE', http: 'HTTP', 'claudeai-proxy': 'claude.ai proxy', } logMCPDebug( name, `Authentication required for ${label[transportType]} server`, ) setMcpAuthCacheEntry(name) return { name, type: 'needs-auth', config: serverRef } } /** * Fetch wrapper for claude.ai proxy connections. Attaches the OAuth bearer * token and retries once on 401 via handleOAuth401Error (force-refresh). * * The Anthropic API path has this retry (withRetry.ts, grove.ts) to handle * memoize-cache staleness and clock drift. Without the same here, a single * stale token mass-401s every claude.ai connector and sticks them all in the * 15-min needs-auth cache. */ export function createClaudeAiProxyFetch(innerFetch: FetchLike): FetchLike { return async (url, init) => { const doRequest = async () => { await checkAndRefreshOAuthTokenIfNeeded() const currentTokens = getClaudeAIOAuthTokens() if (!currentTokens) { throw new Error('No claude.ai OAuth token available') } // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins const headers = new Headers(init?.headers) headers.set('Authorization', `Bearer ${currentTokens.accessToken}`) const response = await innerFetch(url, { ...init, headers }) // Return the exact token that was sent. Reading getClaudeAIOAuthTokens() // again after the request is wrong under concurrent 401s: another // connector's handleOAuth401Error clears the memoize cache, so we'd read // the NEW token from keychain, pass it to handleOAuth401Error, which // finds same-as-keychain → returns false → skips retry. Same pattern as // bridgeApi.ts withOAuthRetry (token passed as fn param). return { response, sentToken: currentTokens.accessToken } } const { response, sentToken } = await doRequest() if (response.status !== 401) { return response } // handleOAuth401Error returns true only if the token actually changed // (keychain had a newer one, or force-refresh succeeded). Gate retry on // that — otherwise we double round-trip time for every connector whose // downstream service genuinely needs auth (the common case: 30+ servers // with "MCP server requires authentication but no OAuth token configured"). const tokenChanged = await handleOAuth401Error(sentToken).catch(() => false) logEvent('tengu_mcp_claudeai_proxy_401', { tokenChanged: tokenChanged as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, }) if (!tokenChanged) { // ELOCKED contention: another connector may have won the lockfile and refreshed — check if token changed underneath us const now = getClaudeAIOAuthTokens()?.accessToken if (!now || now === sentToken) { return response } } try { return (await doRequest()).response } catch { // Retry itself failed (network error). Return the original 401 so the // outer handler can classify it. return response } } } // Minimal interface for WebSocket instances passed to mcpWebSocketTransport type WsClientLike = { readonly readyState: number close(): void send(data: string): void } /** * Create a ws.WebSocket client with the MCP protocol. * Bun's ws shim types lack the 3-arg constructor (url, protocols, options) * that the real ws package supports, so we cast the constructor here. */ async function createNodeWsClient( url: string, options: Record, ): Promise { const wsModule = await import('ws') const WS = wsModule.default as unknown as new ( url: string, protocols: string[], options: Record, ) => WsClientLike return new WS(url, ['mcp'], options) } const IMAGE_MIME_TYPES = new Set([ 'image/jpeg', 'image/png', 'image/gif', 'image/webp', ]) function getConnectionTimeoutMs(): number { return parseInt(process.env.MCP_TIMEOUT || '', 10) || 30000 } /** * Default timeout for individual MCP requests (auth, tool calls, etc.) */ const MCP_REQUEST_TIMEOUT_MS = 60000 /** * MCP Streamable HTTP spec requires clients to advertise acceptance of both * JSON and SSE on every POST. Servers that enforce this strictly reject * requests without it (HTTP 406). * https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sending-messages-to-the-server */ const MCP_STREAMABLE_HTTP_ACCEPT = 'application/json, text/event-stream' /** * Wraps a fetch function to apply a fresh timeout signal to each request. * This avoids the bug where a single AbortSignal.timeout() created at connection * time becomes stale after 60 seconds, causing all subsequent requests to fail * immediately with "The operation timed out." Uses a 60-second timeout. * * Also ensures the Accept header required by the MCP Streamable HTTP spec is * present on POSTs. The MCP SDK sets this inside StreamableHTTPClientTransport.send(), * but it is attached to a Headers instance that passes through an object spread here, * and some runtimes/agents have been observed dropping it before it reaches the wire. * See https://github.com/anthropics/claude-agent-sdk-typescript/issues/202. * Normalizing here (the last wrapper before fetch()) guarantees it is sent. * * GET requests are excluded from the timeout since, for MCP transports, they are * long-lived SSE streams meant to stay open indefinitely. (Auth-related GETs use * a separate fetch wrapper with its own timeout in auth.ts.) * * @param baseFetch - The fetch function to wrap */ export function wrapFetchWithTimeout(baseFetch: FetchLike): FetchLike { return async (url: string | URL, init?: RequestInit) => { const method = (init?.method ?? 'GET').toUpperCase() // Skip timeout for GET requests - in MCP transports, these are long-lived SSE streams. // (OAuth discovery GETs in auth.ts use a separate createAuthFetch() with its own timeout.) if (method === 'GET') { return baseFetch(url, init) } // Normalize headers and guarantee the Streamable-HTTP Accept value. new Headers() // accepts HeadersInit | undefined and copies from plain objects, tuple arrays, // and existing Headers instances — so whatever shape the SDK handed us, the // Accept value survives the spread below as an own property of a concrete object. // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins const headers = new Headers(init?.headers) if (!headers.has('accept')) { headers.set('accept', MCP_STREAMABLE_HTTP_ACCEPT) } // Use setTimeout instead of AbortSignal.timeout() so we can clearTimeout on // completion. AbortSignal.timeout's internal timer is only released when the // signal is GC'd, which in Bun is lazy — ~2.4KB of native memory per request // lingers for the full 60s even when the request completes in milliseconds. const controller = new AbortController() const timer = setTimeout( c => c.abort(new DOMException('The operation timed out.', 'TimeoutError')), MCP_REQUEST_TIMEOUT_MS, controller, ) timer.unref?.() const parentSignal = init?.signal const abort = () => controller.abort(parentSignal?.reason) parentSignal?.addEventListener('abort', abort) if (parentSignal?.aborted) { controller.abort(parentSignal.reason) } const cleanup = () => { clearTimeout(timer) parentSignal?.removeEventListener('abort', abort) } try { const response = await baseFetch(url, { ...init, headers, signal: controller.signal, }) cleanup() return response } catch (error) { cleanup() throw error } } } export function getMcpServerConnectionBatchSize(): number { return parseInt(process.env.MCP_SERVER_CONNECTION_BATCH_SIZE || '', 10) || 3 } function getRemoteMcpServerConnectionBatchSize(): number { return ( parseInt(process.env.MCP_REMOTE_SERVER_CONNECTION_BATCH_SIZE || '', 10) || 20 ) } function isLocalMcpServer(config: ScopedMcpServerConfig): boolean { return !config.type || config.type === 'stdio' || config.type === 'sdk' } // For the IDE MCP servers, we only include specific tools const ALLOWED_IDE_TOOLS = ['mcp__ide__executeCode', 'mcp__ide__getDiagnostics'] function isIncludedMcpTool(tool: Tool): boolean { return ( !tool.name.startsWith('mcp__ide__') || ALLOWED_IDE_TOOLS.includes(tool.name) ) } /** * Generates the cache key for a server connection * @param name Server name * @param serverRef Server configuration * @returns Cache key string */ export function getServerCacheKey( name: string, serverRef: ScopedMcpServerConfig, ): string { return `${name}-${jsonStringify(serverRef)}` } /** * TODO (ollie): The memoization here increases complexity by a lot, and im not sure it really improves performance * Attempts to connect to a single MCP server * @param name Server name * @param serverRef Scoped server configuration * @returns A wrapped client (either connected or failed) */ export const connectToServer = memoize( async ( name: string, serverRef: ScopedMcpServerConfig, serverStats?: { totalServers: number stdioCount: number sseCount: number httpCount: number sseIdeCount: number wsIdeCount: number }, ): Promise => { const connectStartTime = Date.now() let inProcessServer: | { connect(t: Transport): Promise; close(): Promise } | undefined try { let transport // If we have the session ingress JWT, we will connect via the session ingress rather than // to remote MCP's directly. const sessionIngressToken = getSessionIngressAuthToken() if (serverRef.type === 'sse') { // Create an auth provider for this server const authProvider = new ClaudeAuthProvider(name, serverRef) // Get combined headers (static + dynamic) const combinedHeaders = await getMcpServerHeaders(name, serverRef) // Use the auth provider with SSEClientTransport const transportOptions: SSEClientTransportOptions = { authProvider, // Use fresh timeout per request to avoid stale AbortSignal bug. // Step-up detection wraps innermost so the 403 is seen before the // SDK's handler calls auth() → tokens(). fetch: wrapFetchWithTimeout( wrapFetchWithStepUpDetection(createFetchWithInit(), authProvider), ), requestInit: { headers: { 'User-Agent': getMCPUserAgent(), ...combinedHeaders, }, }, } // IMPORTANT: Always set eventSourceInit with a fetch that does NOT use the // timeout wrapper. The EventSource connection is long-lived (stays open indefinitely // to receive server-sent events), so applying a 60-second timeout would kill it. // The timeout is only meant for individual API requests (POST, auth refresh), not // the persistent SSE stream. transportOptions.eventSourceInit = { fetch: async (url: string | URL, init?: RequestInit) => { // Get auth headers from the auth provider const authHeaders: Record = {} const tokens = await authProvider.tokens() if (tokens) { authHeaders.Authorization = `Bearer ${tokens.access_token}` } const proxyOptions = getProxyFetchOptions() // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins return fetch(url, { ...init, ...proxyOptions, headers: { 'User-Agent': getMCPUserAgent(), ...authHeaders, ...init?.headers, ...combinedHeaders, Accept: 'text/event-stream', }, }) }, } transport = new SSEClientTransport( new URL(serverRef.url), transportOptions, ) logMCPDebug(name, `SSE transport initialized, awaiting connection`) } else if (serverRef.type === 'sse-ide') { logMCPDebug( name, `Setting up SSE-IDE transport to ${mcpBaseUrlForDebug(serverRef)}`, ) // IDE servers don't need authentication // TODO: Use the auth token provided in the lockfile const proxyOptions = getProxyFetchOptions() const transportOptions: SSEClientTransportOptions = proxyOptions.dispatcher ? { eventSourceInit: { fetch: async (url: string | URL, init?: RequestInit) => { // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins return fetch(url, { ...init, ...proxyOptions, headers: { 'User-Agent': getMCPUserAgent(), ...init?.headers, }, }) }, }, } : {} transport = new SSEClientTransport( new URL(serverRef.url), Object.keys(transportOptions).length > 0 ? transportOptions : undefined, ) } else if (serverRef.type === 'ws-ide') { const tlsOptions = getWebSocketTLSOptions() const wsHeaders = { 'User-Agent': getMCPUserAgent(), ...(serverRef.authToken && { 'X-Claude-Code-Ide-Authorization': serverRef.authToken, }), } let wsClient: WsClientLike if (typeof Bun !== 'undefined') { // Bun's WebSocket supports headers/proxy/tls options but the DOM typings don't // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins wsClient = new globalThis.WebSocket(serverRef.url, { protocols: ['mcp'], headers: wsHeaders, proxy: getWebSocketProxyUrl(serverRef.url), tls: tlsOptions || undefined, } as unknown as string[]) } else { wsClient = await createNodeWsClient(serverRef.url, { headers: wsHeaders, agent: getWebSocketProxyAgent(serverRef.url), ...(tlsOptions || {}), }) } transport = new WebSocketTransport(wsClient) } else if (serverRef.type === 'ws') { logMCPDebug( name, `Initializing WebSocket transport to ${mcpBaseUrlForDebug(serverRef)}`, ) const combinedHeaders = await getMcpServerHeaders(name, serverRef) const tlsOptions = getWebSocketTLSOptions() const wsHeaders = { 'User-Agent': getMCPUserAgent(), ...(sessionIngressToken && { Authorization: `Bearer ${sessionIngressToken}`, }), ...combinedHeaders, } const wsHeadersForLogging = summarizeHeadersForDebug( mapValues(wsHeaders, (_value, key) => key.toLowerCase() === 'authorization' ? '[REDACTED]' : '[set]', ), ) logMCPDebug( name, `WebSocket transport options: ${jsonStringify({ url: mcpBaseUrlForDebug(serverRef), ...wsHeadersForLogging, hasSessionAuth: !!sessionIngressToken, })}`, ) let wsClient: WsClientLike if (typeof Bun !== 'undefined') { // Bun's WebSocket supports headers/proxy/tls options but the DOM typings don't // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins wsClient = new globalThis.WebSocket(serverRef.url, { protocols: ['mcp'], headers: wsHeaders, proxy: getWebSocketProxyUrl(serverRef.url), tls: tlsOptions || undefined, } as unknown as string[]) } else { wsClient = await createNodeWsClient(serverRef.url, { headers: wsHeaders, agent: getWebSocketProxyAgent(serverRef.url), ...(tlsOptions || {}), }) } transport = new WebSocketTransport(wsClient) } else if (serverRef.type === 'http') { logMCPDebug( name, `Initializing HTTP transport to ${mcpBaseUrlForDebug(serverRef)}`, ) logMCPDebug( name, `Node version: ${process.version}, Platform: ${process.platform}`, ) logMCPDebug( name, `Environment: ${jsonStringify(summarizeProxyEnvForDebug())}`, ) // Create an auth provider for this server const authProvider = new ClaudeAuthProvider(name, serverRef) // Get combined headers (static + dynamic) const combinedHeaders = await getMcpServerHeaders(name, serverRef) // Check if this server has stored OAuth tokens. If so, the SDK's // authProvider will set Authorization — don't override with the // session ingress token (SDK merges requestInit AFTER authProvider). // CCR proxy URLs (ccr_shttp_mcp) have no stored OAuth, so they still // get the ingress token. See PR #24454 discussion. const hasOAuthTokens = !!(await authProvider.tokens()) // Use the auth provider with StreamableHTTPClientTransport const proxyOptions = getProxyFetchOptions() logMCPDebug( name, `Proxy options: ${proxyOptions.dispatcher ? 'custom dispatcher' : 'default'}`, ) const transportOptions: StreamableHTTPClientTransportOptions = { authProvider, // Use fresh timeout per request to avoid stale AbortSignal bug. // Step-up detection wraps innermost so the 403 is seen before the // SDK's handler calls auth() → tokens(). fetch: wrapFetchWithTimeout( wrapFetchWithStepUpDetection(createFetchWithInit(), authProvider), ), requestInit: { ...proxyOptions, headers: { 'User-Agent': getMCPUserAgent(), ...(sessionIngressToken && !hasOAuthTokens && { Authorization: `Bearer ${sessionIngressToken}`, }), ...combinedHeaders, }, }, } // Redact sensitive headers before logging const headersForLogging = transportOptions.requestInit?.headers ? mapValues( transportOptions.requestInit.headers as Record, (_value, key) => key.toLowerCase() === 'authorization' ? '[REDACTED]' : '[set]', ) : undefined logMCPDebug( name, `HTTP transport options: ${jsonStringify({ url: mcpBaseUrlForDebug(serverRef), ...summarizeHeadersForDebug(headersForLogging), hasAuthProvider: !!authProvider, timeoutMs: MCP_REQUEST_TIMEOUT_MS, })}`, ) transport = new StreamableHTTPClientTransport( new URL(serverRef.url), transportOptions, ) logMCPDebug(name, `HTTP transport created successfully`) } else if (serverRef.type === 'sdk') { throw new Error('SDK servers should be handled in print.ts') } else if (serverRef.type === 'claudeai-proxy') { logMCPDebug( name, `Initializing claude.ai proxy transport for server ${serverRef.id}`, ) const tokens = getClaudeAIOAuthTokens() if (!tokens) { throw new Error('No claude.ai OAuth token found') } const oauthConfig = getOauthConfig() const proxyUrl = `${oauthConfig.MCP_PROXY_URL}${oauthConfig.MCP_PROXY_PATH.replace('{server_id}', serverRef.id)}` logMCPDebug(name, `Using claude.ai proxy transport`) // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins const fetchWithAuth = createClaudeAiProxyFetch(globalThis.fetch) const proxyOptions = getProxyFetchOptions() const transportOptions: StreamableHTTPClientTransportOptions = { // Wrap fetchWithAuth with fresh timeout per request fetch: wrapFetchWithTimeout(fetchWithAuth), requestInit: { ...proxyOptions, headers: { 'User-Agent': getMCPUserAgent(), 'X-Mcp-Client-Session-Id': getSessionId(), }, }, } transport = new StreamableHTTPClientTransport( new URL(proxyUrl), transportOptions, ) logMCPDebug(name, `claude.ai proxy transport created successfully`) } else if ( (serverRef.type === 'stdio' || !serverRef.type) && isClaudeInChromeMCPServer(name) ) { // Run the Chrome MCP server in-process to avoid spawning a ~325 MB subprocess const { createChromeContext } = await import( '../../utils/claudeInChrome/mcpServer.js' ) const { createClaudeForChromeMcpServer } = await import( '@ant/claude-for-chrome-mcp' ) const { createLinkedTransportPair } = await import( './InProcessTransport.js' ) const context = createChromeContext(serverRef.env) inProcessServer = createClaudeForChromeMcpServer(context) const [clientTransport, serverTransport] = createLinkedTransportPair() await inProcessServer.connect(serverTransport) transport = clientTransport logMCPDebug(name, `In-process Chrome MCP server started`) } else if ( feature('CHICAGO_MCP') && (serverRef.type === 'stdio' || !serverRef.type) && isComputerUseMCPServer!(name) ) { // Run the Computer Use MCP server in-process — same rationale as // Chrome above. The package's CallTool handler is a stub; real // dispatch goes through wrapper.tsx's .call() override. const { createComputerUseMcpServerForCli } = await import( '../../utils/computerUse/mcpServer.js' ) const { createLinkedTransportPair } = await import( './InProcessTransport.js' ) inProcessServer = await createComputerUseMcpServerForCli() const [clientTransport, serverTransport] = createLinkedTransportPair() await inProcessServer.connect(serverTransport) transport = clientTransport logMCPDebug(name, `In-process Computer Use MCP server started`) } else if (serverRef.type === 'stdio' || !serverRef.type) { const finalCommand = process.env.CLAUDE_CODE_SHELL_PREFIX || serverRef.command const finalArgs = process.env.CLAUDE_CODE_SHELL_PREFIX ? [[serverRef.command, ...serverRef.args].join(' ')] : serverRef.args transport = new StdioClientTransport({ command: finalCommand, args: finalArgs, env: { ...subprocessEnv(), ...serverRef.env, } as Record, stderr: 'pipe', // prevents error output from the MCP server from printing to the UI }) } else { throw new Error(`Unsupported server type: ${serverRef.type}`) } // Set up stderr logging for stdio transport before connecting in case there are any stderr // outputs emitted during the connection start (this can be useful for debugging failed connections). // Store handler reference for cleanup to prevent memory leaks let stderrHandler: ((data: Buffer) => void) | undefined let stderrOutput = '' if (serverRef.type === 'stdio' || !serverRef.type) { const stdioTransport = transport as StdioClientTransport if (stdioTransport.stderr) { stderrHandler = (data: Buffer) => { // Cap stderr accumulation to prevent unbounded memory growth if (stderrOutput.length < 64 * 1024 * 1024) { try { stderrOutput += data.toString() } catch { // Ignore errors from exceeding max string length } } } stdioTransport.stderr.on('data', stderrHandler) } } const client = new Client( { name: 'claude-code', title: 'Claude Code', version: MACRO.VERSION ?? 'unknown', description: "Anthropic's agentic coding tool", websiteUrl: PRODUCT_URL, }, { capabilities: { roots: {}, // Empty object declares the capability. Sending {form:{},url:{}} // breaks Java MCP SDK servers (Spring AI) whose Elicitation class // has zero fields and fails on unknown properties. elicitation: {}, }, }, ) // Add debug logging for client events if available if (serverRef.type === 'http') { logMCPDebug(name, `Client created, setting up request handler`) } client.setRequestHandler(ListRootsRequestSchema, async () => { logMCPDebug(name, `Received ListRoots request from server`) return { roots: [ { uri: `file://${getOriginalCwd()}`, }, ], } }) // Add a timeout to connection attempts to prevent tests from hanging indefinitely logMCPDebug( name, `Starting connection with timeout of ${getConnectionTimeoutMs()}ms`, ) // For HTTP transport, try a basic connectivity test first if (serverRef.type === 'http') { logMCPDebug( name, `Testing basic HTTP connectivity to ${mcpBaseUrlForDebug(serverRef)}`, ) try { const testUrl = new URL(serverRef.url) logMCPDebug(name, 'Parsed HTTP endpoint for preflight checks') // Log DNS resolution attempt if ( testUrl.hostname === '127.0.0.1' || testUrl.hostname === 'localhost' ) { logMCPDebug(name, 'Using loopback HTTP endpoint') } } catch (urlError) { logMCPDebug( name, `Failed to parse HTTP endpoint for preflight: ${summarizeMcpErrorForDebug( urlError, )}`, ) } } const connectPromise = client.connect(transport) const timeoutPromise = new Promise((_, reject) => { const timeoutId = setTimeout(() => { const elapsed = Date.now() - connectStartTime logMCPDebug( name, `Connection timeout triggered after ${elapsed}ms (limit: ${getConnectionTimeoutMs()}ms)`, ) if (inProcessServer) { inProcessServer.close().catch(() => {}) } transport.close().catch(() => {}) reject( new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( `MCP server "${name}" connection timed out after ${getConnectionTimeoutMs()}ms`, 'MCP connection timeout', ), ) }, getConnectionTimeoutMs()) // Clean up timeout if connect resolves or rejects connectPromise.then( () => { clearTimeout(timeoutId) }, _error => { clearTimeout(timeoutId) }, ) }) try { await Promise.race([connectPromise, timeoutPromise]) if (stderrOutput) { logMCPError(name, summarizeStderrForDebug(stderrOutput)) stderrOutput = '' // Release accumulated string to prevent memory growth } const elapsed = Date.now() - connectStartTime logMCPDebug( name, `Successfully connected (transport: ${serverRef.type || 'stdio'}) in ${elapsed}ms`, ) } catch (error) { const elapsed = Date.now() - connectStartTime // SSE-specific error logging if (serverRef.type === 'sse' && error instanceof Error) { logMCPDebug( name, `SSE connection failed after ${elapsed}ms: ${summarizeMcpErrorForDebug( error, )}`, ) logMCPError( name, `SSE connection failed: ${summarizeMcpErrorForDebug(error)}`, ) if (error instanceof UnauthorizedError) { return handleRemoteAuthFailure(name, serverRef, 'sse') } } else if (serverRef.type === 'http' && error instanceof Error) { logMCPDebug( name, `HTTP connection failed after ${elapsed}ms: ${summarizeMcpErrorForDebug( error, )}`, ) logMCPError( name, `HTTP connection failed: ${summarizeMcpErrorForDebug(error)}`, ) if (error instanceof UnauthorizedError) { return handleRemoteAuthFailure(name, serverRef, 'http') } } else if ( serverRef.type === 'claudeai-proxy' && error instanceof Error ) { logMCPDebug( name, `claude.ai proxy connection failed after ${elapsed}ms: ${summarizeMcpErrorForDebug( error, )}`, ) logMCPError( name, `claude.ai proxy connection failed: ${summarizeMcpErrorForDebug( error, )}`, ) // StreamableHTTPError has a `code` property with the HTTP status const errorCode = (error as Error & { code?: number }).code if (errorCode === 401) { return handleRemoteAuthFailure(name, serverRef, 'claudeai-proxy') } } else if ( serverRef.type === 'sse-ide' || serverRef.type === 'ws-ide' ) { logEvent('tengu_mcp_ide_server_connection_failed', { connectionDurationMs: elapsed, }) } if (inProcessServer) { inProcessServer.close().catch(() => {}) } transport.close().catch(() => {}) if (stderrOutput) { logMCPError(name, summarizeStderrForDebug(stderrOutput)) } throw error } const capabilities = client.getServerCapabilities() const serverVersion = client.getServerVersion() const rawInstructions = client.getInstructions() let instructions = rawInstructions if ( rawInstructions && rawInstructions.length > MAX_MCP_DESCRIPTION_LENGTH ) { instructions = rawInstructions.slice(0, MAX_MCP_DESCRIPTION_LENGTH) + '… [truncated]' logMCPDebug( name, `Server instructions truncated from ${rawInstructions.length} to ${MAX_MCP_DESCRIPTION_LENGTH} chars`, ) } // Log successful connection details logMCPDebug( name, `Connection established with capabilities: ${jsonStringify({ hasTools: !!capabilities?.tools, hasPrompts: !!capabilities?.prompts, hasResources: !!capabilities?.resources, hasResourceSubscribe: !!capabilities?.resources?.subscribe, serverVersion: serverVersion || 'unknown', })}`, ) logForDebugging( `[MCP] Server "${name}" connected with subscribe=${!!capabilities?.resources?.subscribe}`, ) // Register default elicitation handler that returns cancel during the // window before registerElicitationHandler overwrites it in // onConnectionAttempt (useManageMCPConnections). client.setRequestHandler(ElicitRequestSchema, async request => { logMCPDebug( name, `Elicitation request received during initialization: ${jsonStringify(request)}`, ) return { action: 'cancel' as const } }) if (serverRef.type === 'sse-ide' || serverRef.type === 'ws-ide') { const ideConnectionDurationMs = Date.now() - connectStartTime logEvent('tengu_mcp_ide_server_connection_succeeded', { connectionDurationMs: ideConnectionDurationMs, serverVersion: serverVersion as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, }) try { void maybeNotifyIDEConnected(client) } catch (error) { logMCPError( name, `Failed to send ide_connected notification: ${summarizeMcpErrorForDebug( error, )}`, ) } } // Enhanced connection drop detection and logging for all transport types const connectionStartTime = Date.now() let hasErrorOccurred = false // Store original handlers const originalOnerror = client.onerror const originalOnclose = client.onclose // The SDK's transport calls onerror on connection failures but doesn't call onclose, // which CC uses to trigger reconnection. We bridge this gap by tracking consecutive // terminal errors and manually closing after MAX_ERRORS_BEFORE_RECONNECT failures. let consecutiveConnectionErrors = 0 const MAX_ERRORS_BEFORE_RECONNECT = 3 // Guard against re-entry: close() aborts in-flight streams which may fire // onerror again before the close chain completes. let hasTriggeredClose = false // client.close() → transport.close() → transport.onclose → SDK's _onclose(): // rejects all pending request handlers (so hung callTool() promises fail with // McpError -32000 "Connection closed") and then invokes our client.onclose // handler below (which clears the memo cache so the next call reconnects). // Calling client.onclose?.() directly would only clear the cache — pending // tool calls would stay hung. const closeTransportAndRejectPending = (reason: string) => { if (hasTriggeredClose) return hasTriggeredClose = true logMCPDebug(name, `Closing transport (${reason})`) void client.close().catch(e => { logMCPDebug( name, `Error during close: ${summarizeMcpErrorForDebug(e)}`, ) }) } const isTerminalConnectionError = (msg: string): boolean => { return ( msg.includes('ECONNRESET') || msg.includes('ETIMEDOUT') || msg.includes('EPIPE') || msg.includes('EHOSTUNREACH') || msg.includes('ECONNREFUSED') || msg.includes('Body Timeout Error') || msg.includes('terminated') || // SDK SSE reconnection intermediate errors — may be wrapped around the // actual network error, so the substrings above won't match msg.includes('SSE stream disconnected') || msg.includes('Failed to reconnect SSE stream') ) } // Enhanced error handler with detailed logging client.onerror = (error: Error) => { const uptime = Date.now() - connectionStartTime hasErrorOccurred = true const transportType = serverRef.type || 'stdio' // Log the connection drop with context logMCPDebug( name, `${transportType.toUpperCase()} connection dropped after ${Math.floor(uptime / 1000)}s uptime`, ) // Log specific error details for debugging if (error.message) { if (error.message.includes('ECONNRESET')) { logMCPDebug( name, `Connection reset - server may have crashed or restarted`, ) } else if (error.message.includes('ETIMEDOUT')) { logMCPDebug( name, `Connection timeout - network issue or server unresponsive`, ) } else if (error.message.includes('ECONNREFUSED')) { logMCPDebug(name, `Connection refused - server may be down`) } else if (error.message.includes('EPIPE')) { logMCPDebug( name, `Broken pipe - server closed connection unexpectedly`, ) } else if (error.message.includes('EHOSTUNREACH')) { logMCPDebug(name, `Host unreachable - network connectivity issue`) } else if (error.message.includes('ESRCH')) { logMCPDebug( name, `Process not found - stdio server process terminated`, ) } else if (error.message.includes('spawn')) { logMCPDebug( name, `Failed to spawn process - check command and permissions`, ) } else { logMCPDebug( name, `Connection error: ${summarizeMcpErrorForDebug(error)}`, ) } } // For HTTP transports, detect session expiry (404 + JSON-RPC -32001) // and close the transport so pending tool calls reject and the next // call reconnects with a fresh session ID. if ( (transportType === 'http' || transportType === 'claudeai-proxy') && isMcpSessionExpiredError(error) ) { logMCPDebug( name, `MCP session expired (server returned 404 with session-not-found), triggering reconnection`, ) closeTransportAndRejectPending('session expired') if (originalOnerror) { originalOnerror(error) } return } // For remote transports (SSE/HTTP), track terminal connection errors // and trigger reconnection via close if we see repeated failures. if ( transportType === 'sse' || transportType === 'http' || transportType === 'claudeai-proxy' ) { // The SDK's StreamableHTTP transport fires this after exhausting its // own SSE reconnect attempts (default maxRetries: 2) — but it never // calls onclose, so pending callTool() promises hang indefinitely. // This is the definitive "transport gave up" signal. if (error.message.includes('Maximum reconnection attempts')) { closeTransportAndRejectPending('SSE reconnection exhausted') if (originalOnerror) { originalOnerror(error) } return } if (isTerminalConnectionError(error.message)) { consecutiveConnectionErrors++ logMCPDebug( name, `Terminal connection error ${consecutiveConnectionErrors}/${MAX_ERRORS_BEFORE_RECONNECT}`, ) if (consecutiveConnectionErrors >= MAX_ERRORS_BEFORE_RECONNECT) { consecutiveConnectionErrors = 0 closeTransportAndRejectPending('max consecutive terminal errors') } } else { // Non-terminal error (e.g., transient issue), reset counter consecutiveConnectionErrors = 0 } } // Call original handler if (originalOnerror) { originalOnerror(error) } } // Enhanced close handler with connection drop context client.onclose = () => { const uptime = Date.now() - connectionStartTime const transportType = serverRef.type ?? 'unknown' logMCPDebug( name, `${transportType.toUpperCase()} connection closed after ${Math.floor(uptime / 1000)}s (${hasErrorOccurred ? 'with errors' : 'cleanly'})`, ) // Clear the memoization cache so next operation reconnects const key = getServerCacheKey(name, serverRef) // Also clear fetch caches (keyed by server name). Reconnection // creates a new connection object; without clearing, the next // fetch would return stale tools/resources from the old connection. fetchToolsForClient.cache.delete(name) fetchResourcesForClient.cache.delete(name) fetchCommandsForClient.cache.delete(name) if (feature('MCP_SKILLS')) { fetchMcpSkillsForClient!.cache.delete(name) } connectToServer.cache.delete(key) logMCPDebug(name, `Cleared connection cache for reconnection`) if (originalOnclose) { originalOnclose() } } const cleanup = async () => { // In-process servers (e.g. Chrome MCP) don't have child processes or stderr if (inProcessServer) { try { await inProcessServer.close() } catch (error) { logMCPDebug( name, `Error closing in-process server: ${summarizeMcpErrorForDebug( error, )}`, ) } try { await client.close() } catch (error) { logMCPDebug( name, `Error closing client: ${summarizeMcpErrorForDebug(error)}`, ) } return } // Remove stderr event listener to prevent memory leaks if (stderrHandler && (serverRef.type === 'stdio' || !serverRef.type)) { const stdioTransport = transport as StdioClientTransport stdioTransport.stderr?.off('data', stderrHandler) } // For stdio transports, explicitly terminate the child process with proper signals // NOTE: StdioClientTransport.close() only sends an abort signal, but many MCP servers // (especially Docker containers) need explicit SIGINT/SIGTERM signals to trigger graceful shutdown if (serverRef.type === 'stdio') { try { const stdioTransport = transport as StdioClientTransport const childPid = stdioTransport.pid if (childPid) { logMCPDebug(name, 'Sending SIGINT to MCP server process') // First try SIGINT (like Ctrl+C) try { process.kill(childPid, 'SIGINT') } catch (error) { logMCPDebug( name, `Error sending SIGINT: ${summarizeMcpErrorForDebug(error)}`, ) return } // Wait for graceful shutdown with rapid escalation (total 500ms to keep CLI responsive) await new Promise(async resolve => { let resolved = false // Set up a timer to check if process still exists const checkInterval = setInterval(() => { try { // process.kill(pid, 0) checks if process exists without killing it process.kill(childPid, 0) } catch { // Process no longer exists if (!resolved) { resolved = true clearInterval(checkInterval) clearTimeout(failsafeTimeout) logMCPDebug(name, 'MCP server process exited cleanly') resolve() } } }, 50) // Absolute failsafe: clear interval after 600ms no matter what const failsafeTimeout = setTimeout(() => { if (!resolved) { resolved = true clearInterval(checkInterval) logMCPDebug( name, 'Cleanup timeout reached, stopping process monitoring', ) resolve() } }, 600) try { // Wait 100ms for SIGINT to work (usually much faster) await sleep(100) if (!resolved) { // Check if process still exists try { process.kill(childPid, 0) // Process still exists, SIGINT failed, try SIGTERM logMCPDebug( name, 'SIGINT failed, sending SIGTERM to MCP server process', ) try { process.kill(childPid, 'SIGTERM') } catch (termError) { logMCPDebug( name, `Error sending SIGTERM: ${summarizeMcpErrorForDebug( termError, )}`, ) resolved = true clearInterval(checkInterval) clearTimeout(failsafeTimeout) resolve() return } } catch { // Process already exited resolved = true clearInterval(checkInterval) clearTimeout(failsafeTimeout) resolve() return } // Wait 400ms for SIGTERM to work (slower than SIGINT, often used for cleanup) await sleep(400) if (!resolved) { // Check if process still exists try { process.kill(childPid, 0) // Process still exists, SIGTERM failed, force kill with SIGKILL logMCPDebug( name, 'SIGTERM failed, sending SIGKILL to MCP server process', ) try { process.kill(childPid, 'SIGKILL') } catch (killError) { logMCPDebug( name, `Error sending SIGKILL: ${summarizeMcpErrorForDebug( killError, )}`, ) } } catch { // Process already exited resolved = true clearInterval(checkInterval) clearTimeout(failsafeTimeout) resolve() } } } // Final timeout - always resolve after 500ms max (total cleanup time) if (!resolved) { resolved = true clearInterval(checkInterval) clearTimeout(failsafeTimeout) resolve() } } catch { // Handle any errors in the escalation sequence if (!resolved) { resolved = true clearInterval(checkInterval) clearTimeout(failsafeTimeout) resolve() } } }) } } catch (processError) { logMCPDebug( name, `Error terminating process: ${summarizeMcpErrorForDebug( processError, )}`, ) } } // Close the client connection (which also closes the transport) try { await client.close() } catch (error) { logMCPDebug( name, `Error closing client: ${summarizeMcpErrorForDebug(error)}`, ) } } // Register cleanup for all transport types - even network transports might need cleanup // This ensures all MCP servers get properly terminated, not just stdio ones const cleanupUnregister = registerCleanup(cleanup) // Create the wrapped cleanup that includes unregistering const wrappedCleanup = async () => { cleanupUnregister?.() await cleanup() } const connectionDurationMs = Date.now() - connectStartTime logEvent('tengu_mcp_server_connection_succeeded', { connectionDurationMs, transportType: (serverRef.type ?? 'stdio') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, totalServers: serverStats?.totalServers, stdioCount: serverStats?.stdioCount, sseCount: serverStats?.sseCount, httpCount: serverStats?.httpCount, sseIdeCount: serverStats?.sseIdeCount, wsIdeCount: serverStats?.wsIdeCount, ...mcpBaseUrlAnalytics(serverRef), }) return { name, client, type: 'connected' as const, capabilities: capabilities ?? {}, serverInfo: serverVersion, instructions, config: serverRef, cleanup: wrappedCleanup, } } catch (error) { const connectionDurationMs = Date.now() - connectStartTime logEvent('tengu_mcp_server_connection_failed', { connectionDurationMs, totalServers: serverStats?.totalServers || 1, stdioCount: serverStats?.stdioCount || (serverRef.type === 'stdio' ? 1 : 0), sseCount: serverStats?.sseCount || (serverRef.type === 'sse' ? 1 : 0), httpCount: serverStats?.httpCount || (serverRef.type === 'http' ? 1 : 0), sseIdeCount: serverStats?.sseIdeCount || (serverRef.type === 'sse-ide' ? 1 : 0), wsIdeCount: serverStats?.wsIdeCount || (serverRef.type === 'ws-ide' ? 1 : 0), transportType: (serverRef.type ?? 'stdio') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, ...mcpBaseUrlAnalytics(serverRef), }) logMCPDebug( name, `Connection failed after ${connectionDurationMs}ms: ${summarizeMcpErrorForDebug( error, )}`, ) logMCPError( name, `Connection failed: ${summarizeMcpErrorForDebug(error)}`, ) if (inProcessServer) { inProcessServer.close().catch(() => {}) } return { name, type: 'failed' as const, config: serverRef, error: errorMessage(error), } } }, getServerCacheKey, ) /** * Clears the memoize cache for a specific server * @param name Server name * @param serverRef Server configuration */ export async function clearServerCache( name: string, serverRef: ScopedMcpServerConfig, ): Promise { const key = getServerCacheKey(name, serverRef) try { const wrappedClient = await connectToServer(name, serverRef) if (wrappedClient.type === 'connected') { await wrappedClient.cleanup() } } catch { // Ignore errors - server might have failed to connect } // Clear from cache (both connection and fetch caches so reconnect // fetches fresh tools/resources/commands instead of stale ones) connectToServer.cache.delete(key) fetchToolsForClient.cache.delete(name) fetchResourcesForClient.cache.delete(name) fetchCommandsForClient.cache.delete(name) if (feature('MCP_SKILLS')) { fetchMcpSkillsForClient!.cache.delete(name) } } /** * Ensures a valid connected client for an MCP server. * For most server types, uses the memoization cache if available, or reconnects * if the cache was cleared (e.g., after onclose). This ensures tool/resource * calls always use a valid connection. * * SDK MCP servers run in-process and are handled separately via setupSdkMcpClients, * so they are returned as-is without going through connectToServer. * * @param client The connected MCP server client * @returns Connected MCP server client (same or reconnected) * @throws Error if server cannot be connected */ export async function ensureConnectedClient( client: ConnectedMCPServer, ): Promise { // SDK MCP servers run in-process and are handled separately via setupSdkMcpClients if (client.config.type === 'sdk') { return client } const connectedClient = await connectToServer(client.name, client.config) if (connectedClient.type !== 'connected') { throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( `MCP server "${client.name}" is not connected`, 'MCP server not connected', ) } return connectedClient } /** * Compares two MCP server configurations to determine if they are equivalent. * Used to detect when a server needs to be reconnected due to config changes. */ export function areMcpConfigsEqual( a: ScopedMcpServerConfig, b: ScopedMcpServerConfig, ): boolean { // Quick type check first if (a.type !== b.type) return false // Compare by serializing - this handles all config variations // We exclude 'scope' from comparison since it's metadata, not connection config const { scope: _scopeA, ...configA } = a const { scope: _scopeB, ...configB } = b return jsonStringify(configA) === jsonStringify(configB) } // Max cache size for fetch* caches. Keyed by server name (stable across // reconnects), bounded to prevent unbounded growth with many MCP servers. const MCP_FETCH_CACHE_SIZE = 20 /** * Encode MCP tool input for the auto-mode security classifier. * Exported so the auto-mode eval scripts can mirror production encoding * for `mcp__*` tool stubs without duplicating this logic. */ export function mcpToolInputToAutoClassifierInput( input: Record, toolName: string, ): string { const keys = Object.keys(input) return keys.length > 0 ? keys.map(k => `${k}=${String(input[k])}`).join(' ') : toolName } export const fetchToolsForClient = memoizeWithLRU( async (client: MCPServerConnection): Promise => { if (client.type !== 'connected') return [] try { if (!client.capabilities?.tools) { return [] } const result = (await client.client.request( { method: 'tools/list' }, ListToolsResultSchema, )) as ListToolsResult // Sanitize tool data from MCP server const toolsToProcess = recursivelySanitizeUnicode(result.tools) // Check if we should skip the mcp__ prefix for SDK MCP servers const skipPrefix = client.config.type === 'sdk' && isEnvTruthy(process.env.CLAUDE_AGENT_SDK_MCP_NO_PREFIX) // Convert MCP tools to our Tool format return toolsToProcess .map((tool): Tool => { const fullyQualifiedName = buildMcpToolName(client.name, tool.name) return { ...MCPTool, // In skip-prefix mode, use the original name for model invocation so MCP tools // can override builtins by name. mcpInfo is used for permission checking. name: skipPrefix ? tool.name : fullyQualifiedName, mcpInfo: { serverName: client.name, toolName: tool.name }, isMcp: true, // Collapse whitespace: _meta is open to external MCP servers, and // a newline here would inject orphan lines into the deferred-tool // list (formatDeferredToolLine joins on '\n'). searchHint: typeof tool._meta?.['anthropic/searchHint'] === 'string' ? tool._meta['anthropic/searchHint'] .replace(/\s+/g, ' ') .trim() || undefined : undefined, alwaysLoad: tool._meta?.['anthropic/alwaysLoad'] === true, async description() { return tool.description ?? '' }, async prompt() { const desc = tool.description ?? '' return desc.length > MAX_MCP_DESCRIPTION_LENGTH ? desc.slice(0, MAX_MCP_DESCRIPTION_LENGTH) + '… [truncated]' : desc }, isConcurrencySafe() { return tool.annotations?.readOnlyHint ?? false }, isReadOnly() { return tool.annotations?.readOnlyHint ?? false }, toAutoClassifierInput(input) { return mcpToolInputToAutoClassifierInput(input, tool.name) }, isDestructive() { return tool.annotations?.destructiveHint ?? false }, isOpenWorld() { return tool.annotations?.openWorldHint ?? false }, isSearchOrReadCommand() { return classifyMcpToolForCollapse(client.name, tool.name) }, inputJSONSchema: tool.inputSchema as Tool['inputJSONSchema'], async checkPermissions() { return { behavior: 'passthrough' as const, message: 'MCPTool requires permission.', suggestions: [ { type: 'addRules' as const, rules: [ { toolName: fullyQualifiedName, ruleContent: undefined, }, ], behavior: 'allow' as const, destination: 'localSettings' as const, }, ], } }, async call( args: Record, context, _canUseTool, parentMessage, onProgress?: ToolCallProgress, ) { const toolUseId = extractToolUseId(parentMessage) const meta = toolUseId ? { 'claudecode/toolUseId': toolUseId } : {} // Emit progress when tool starts if (onProgress && toolUseId) { onProgress({ toolUseID: toolUseId, data: { type: 'mcp_progress', status: 'started', serverName: client.name, toolName: tool.name, }, }) } const startTime = Date.now() const MAX_SESSION_RETRIES = 1 for (let attempt = 0; ; attempt++) { try { const connectedClient = await ensureConnectedClient(client) const mcpResult = await callMCPToolWithUrlElicitationRetry({ client: connectedClient, clientConnection: client, tool: tool.name, args, meta, signal: context.abortController.signal, setAppState: context.setAppState, onProgress: onProgress && toolUseId ? progressData => { onProgress({ toolUseID: toolUseId, data: progressData, }) } : undefined, handleElicitation: context.handleElicitation, }) // Emit progress when tool completes successfully if (onProgress && toolUseId) { onProgress({ toolUseID: toolUseId, data: { type: 'mcp_progress', status: 'completed', serverName: client.name, toolName: tool.name, elapsedTimeMs: Date.now() - startTime, }, }) } return { data: mcpResult.content, ...((mcpResult._meta || mcpResult.structuredContent) && { mcpMeta: { ...(mcpResult._meta && { _meta: mcpResult._meta, }), ...(mcpResult.structuredContent && { structuredContent: mcpResult.structuredContent, }), }, }), } } catch (error) { // Session expired — the connection cache has been // cleared, so retry with a fresh client. if ( error instanceof McpSessionExpiredError && attempt < MAX_SESSION_RETRIES ) { logMCPDebug( client.name, `Retrying tool '${tool.name}' after session recovery`, ) continue } // Emit progress when tool fails if (onProgress && toolUseId) { onProgress({ toolUseID: toolUseId, data: { type: 'mcp_progress', status: 'failed', serverName: client.name, toolName: tool.name, elapsedTimeMs: Date.now() - startTime, }, }) } // Wrap MCP SDK errors so telemetry gets useful context // instead of just "Error" or "McpError" (the constructor // name). MCP SDK errors are protocol-level messages and // don't contain user file paths or code. if ( error instanceof Error && !( error instanceof TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS ) ) { const name = error.constructor.name if (name === 'Error') { throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( error.message, error.message.slice(0, 200), ) } // McpError has a numeric `code` with the JSON-RPC error // code (e.g. -32000 ConnectionClosed, -32001 RequestTimeout) if ( name === 'McpError' && 'code' in error && typeof error.code === 'number' ) { throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( error.message, `McpError ${error.code}`, ) } } throw error } } }, userFacingName() { // Prefer title annotation if available, otherwise use tool name const displayName = tool.annotations?.title || tool.name return `${client.name} - ${displayName} (MCP)` }, ...(isClaudeInChromeMCPServer(client.name) && (client.config.type === 'stdio' || !client.config.type) ? claudeInChromeToolRendering().getClaudeInChromeMCPToolOverrides( tool.name, ) : {}), ...(feature('CHICAGO_MCP') && (client.config.type === 'stdio' || !client.config.type) && isComputerUseMCPServer!(client.name) ? computerUseWrapper!().getComputerUseMCPToolOverrides(tool.name) : {}), } }) .filter(isIncludedMcpTool) } catch (error) { logMCPError( client.name, `Failed to fetch tools: ${summarizeMcpErrorForDebug(error)}`, ) return [] } }, (client: MCPServerConnection) => client.name, MCP_FETCH_CACHE_SIZE, ) export const fetchResourcesForClient = memoizeWithLRU( async (client: MCPServerConnection): Promise => { if (client.type !== 'connected') return [] try { if (!client.capabilities?.resources) { return [] } const result = await client.client.request( { method: 'resources/list' }, ListResourcesResultSchema, ) if (!result.resources) return [] // Add server name to each resource return result.resources.map(resource => ({ ...resource, server: client.name, })) } catch (error) { logMCPError( client.name, `Failed to fetch resources: ${summarizeMcpErrorForDebug(error)}`, ) return [] } }, (client: MCPServerConnection) => client.name, MCP_FETCH_CACHE_SIZE, ) export const fetchCommandsForClient = memoizeWithLRU( async (client: MCPServerConnection): Promise => { if (client.type !== 'connected') return [] try { if (!client.capabilities?.prompts) { return [] } // Request prompts list from client const result = (await client.client.request( { method: 'prompts/list' }, ListPromptsResultSchema, )) as ListPromptsResult if (!result.prompts) return [] // Sanitize prompt data from MCP server const promptsToProcess = recursivelySanitizeUnicode(result.prompts) // Convert MCP prompts to our Command format return promptsToProcess.map(prompt => { const argNames = Object.values(prompt.arguments ?? {}).map(k => k.name) return { type: 'prompt' as const, name: 'mcp__' + normalizeNameForMCP(client.name) + '__' + prompt.name, description: prompt.description ?? '', hasUserSpecifiedDescription: !!prompt.description, contentLength: 0, // Dynamic MCP content isEnabled: () => true, isHidden: false, isMcp: true, progressMessage: 'running', userFacingName() { // Use prompt.name (programmatic identifier) not prompt.title (display name) // to avoid spaces breaking slash command parsing return `${client.name}:${prompt.name} (MCP)` }, argNames, source: 'mcp', async getPromptForCommand(args: string) { const argsArray = args.split(' ') try { const connectedClient = await ensureConnectedClient(client) const result = await connectedClient.client.getPrompt({ name: prompt.name, arguments: zipObject(argNames, argsArray), }) const transformed = await Promise.all( result.messages.map(message => transformResultContent(message.content, connectedClient.name), ), ) return transformed.flat() } catch (error) { logMCPError( client.name, `Error running command '${prompt.name}': ${summarizeMcpErrorForDebug( error, )}`, ) throw error } }, } }) } catch (error) { logMCPError( client.name, `Failed to fetch commands: ${summarizeMcpErrorForDebug(error)}`, ) return [] } }, (client: MCPServerConnection) => client.name, MCP_FETCH_CACHE_SIZE, ) /** * Call an IDE tool directly as an RPC * @param toolName The name of the tool to call * @param args The arguments to pass to the tool * @param client The IDE client to use for the RPC call * @returns The result of the tool call */ export async function callIdeRpc( toolName: string, args: Record, client: ConnectedMCPServer, ): Promise { const result = await callMCPTool({ client, tool: toolName, args, signal: createAbortController().signal, }) return result.content } /** * Note: This should not be called by UI components directly, they should use the reconnectMcpServer * function from useManageMcpConnections. * @param name Server name * @param config Server configuration * @returns Object containing the client connection and its resources */ export async function reconnectMcpServerImpl( name: string, config: ScopedMcpServerConfig, ): Promise<{ client: MCPServerConnection tools: Tool[] commands: Command[] resources?: ServerResource[] }> { try { // Invalidate the keychain cache so we read fresh credentials from disk. // This is necessary when another process (e.g. the VS Code extension host) // has modified stored tokens (cleared auth, saved new OAuth tokens) and then // asks the CLI subprocess to reconnect. Without this, the subprocess would // use stale cached data and never notice the tokens were removed. clearKeychainCache() await clearServerCache(name, config) const client = await connectToServer(name, config) if (client.type !== 'connected') { return { client, tools: [], commands: [], } } if (config.type === 'claudeai-proxy') { markClaudeAiMcpConnected(name) } const supportsResources = !!client.capabilities?.resources const [tools, mcpCommands, mcpSkills, resources] = await Promise.all([ fetchToolsForClient(client), fetchCommandsForClient(client), feature('MCP_SKILLS') && supportsResources ? fetchMcpSkillsForClient!(client) : Promise.resolve([]), supportsResources ? fetchResourcesForClient(client) : Promise.resolve([]), ]) const commands = [...mcpCommands, ...mcpSkills] // Check if we need to add resource tools const resourceTools: Tool[] = [] if (supportsResources) { // Only add resource tools if no other server has them const hasResourceTools = [ListMcpResourcesTool, ReadMcpResourceTool].some( tool => tools.some(t => toolMatchesName(t, tool.name)), ) if (!hasResourceTools) { resourceTools.push(ListMcpResourcesTool, ReadMcpResourceTool) } } return { client, tools: [...tools, ...resourceTools], commands, resources: resources.length > 0 ? resources : undefined, } } catch (error) { // Handle errors gracefully - connection might have closed during fetch logMCPError( name, `Error during reconnection: ${summarizeMcpErrorForDebug(error)}`, ) // Return with failed status return { client: { name, type: 'failed' as const, config }, tools: [], commands: [], } } } // Replaced 2026-03: previous implementation ran fixed-size sequential batches // (await batch 1 fully, then start batch 2). That meant one slow server in // batch N held up ALL servers in batch N+1, even if the other 19 slots were // idle. pMap frees each slot as soon as its server completes, so a single // slow server only occupies one slot instead of blocking an entire batch // boundary. Same concurrency ceiling, same results, better scheduling. async function processBatched( items: T[], concurrency: number, processor: (item: T) => Promise, ): Promise { await pMap(items, processor, { concurrency }) } export async function getMcpToolsCommandsAndResources( onConnectionAttempt: (params: { client: MCPServerConnection tools: Tool[] commands: Command[] resources?: ServerResource[] }) => void, mcpConfigs?: Record, ): Promise { let resourceToolsAdded = false const allConfigEntries = Object.entries( mcpConfigs ?? (await getAllMcpConfigs()).servers, ) // Partition into disabled and active entries — disabled servers should // never generate HTTP connections or flow through batch processing const configEntries: typeof allConfigEntries = [] for (const entry of allConfigEntries) { if (isMcpServerDisabled(entry[0])) { onConnectionAttempt({ client: { name: entry[0], type: 'disabled', config: entry[1] }, tools: [], commands: [], }) } else { configEntries.push(entry) } } // Calculate transport counts for logging const totalServers = configEntries.length const stdioCount = count(configEntries, ([_, c]) => c.type === 'stdio') const sseCount = count(configEntries, ([_, c]) => c.type === 'sse') const httpCount = count(configEntries, ([_, c]) => c.type === 'http') const sseIdeCount = count(configEntries, ([_, c]) => c.type === 'sse-ide') const wsIdeCount = count(configEntries, ([_, c]) => c.type === 'ws-ide') // Split servers by type: local (stdio/sdk) need lower concurrency due to // process spawning, remote servers can connect with higher concurrency const localServers = configEntries.filter(([_, config]) => isLocalMcpServer(config), ) const remoteServers = configEntries.filter( ([_, config]) => !isLocalMcpServer(config), ) const serverStats = { totalServers, stdioCount, sseCount, httpCount, sseIdeCount, wsIdeCount, } const processServer = async ([name, config]: [ string, ScopedMcpServerConfig, ]): Promise => { try { // Check if server is disabled - if so, just add it to state without connecting if (isMcpServerDisabled(name)) { onConnectionAttempt({ client: { name, type: 'disabled', config, }, tools: [], commands: [], }) return } // Skip connection for servers that recently returned 401 (15min TTL), // or that we have probed before but hold no token for. The second // check closes the gap the TTL leaves open: without it, every 15min // we re-probe servers that cannot succeed until the user runs /mcp. // Each probe is a network round-trip for connect-401 plus OAuth // discovery, and print mode awaits the whole batch (main.tsx:3503). if ( (config.type === 'claudeai-proxy' || config.type === 'http' || config.type === 'sse') && ((await isMcpAuthCached(name)) || ((config.type === 'http' || config.type === 'sse') && hasMcpDiscoveryButNoToken(name, config))) ) { logMCPDebug(name, `Skipping connection (cached needs-auth)`) onConnectionAttempt({ client: { name, type: 'needs-auth' as const, config }, tools: [createMcpAuthTool(name, config)], commands: [], }) return } const client = await connectToServer(name, config, serverStats) if (client.type !== 'connected') { onConnectionAttempt({ client, tools: client.type === 'needs-auth' ? [createMcpAuthTool(name, config)] : [], commands: [], }) return } if (config.type === 'claudeai-proxy') { markClaudeAiMcpConnected(name) } const supportsResources = !!client.capabilities?.resources const [tools, mcpCommands, mcpSkills, resources] = await Promise.all([ fetchToolsForClient(client), fetchCommandsForClient(client), // Discover skills from skill:// resources feature('MCP_SKILLS') && supportsResources ? fetchMcpSkillsForClient!(client) : Promise.resolve([]), // Fetch resources if supported supportsResources ? fetchResourcesForClient(client) : Promise.resolve([]), ]) const commands = [...mcpCommands, ...mcpSkills] // If this server resources and we haven't added resource tools yet, // include our resource tools with this client's tools const resourceTools: Tool[] = [] if (supportsResources && !resourceToolsAdded) { resourceToolsAdded = true resourceTools.push(ListMcpResourcesTool, ReadMcpResourceTool) } onConnectionAttempt({ client, tools: [...tools, ...resourceTools], commands, resources: resources.length > 0 ? resources : undefined, }) } catch (error) { // Handle errors gracefully - connection might have closed during fetch logMCPError( name, `Error fetching tools/commands/resources: ${summarizeMcpErrorForDebug( error, )}`, ) // Still update with the client but no tools/commands onConnectionAttempt({ client: { name, type: 'failed' as const, config }, tools: [], commands: [], }) } } // Process both groups concurrently, each with their own concurrency limits: // - Local servers (stdio/sdk): lower concurrency to avoid process spawning resource contention // - Remote servers: higher concurrency since they're just network connections await Promise.all([ processBatched( localServers, getMcpServerConnectionBatchSize(), processServer, ), processBatched( remoteServers, getRemoteMcpServerConnectionBatchSize(), processServer, ), ]) } // Not memoized: called only 2-3 times at startup/reconfig. The inner work // (connectToServer, fetch*ForClient) is already cached. Memoizing here by // mcpConfigs object ref leaked — main.tsx creates fresh config objects each call. export function prefetchAllMcpResources( mcpConfigs: Record, ): Promise<{ clients: MCPServerConnection[] tools: Tool[] commands: Command[] }> { return new Promise(resolve => { let pendingCount = 0 let completedCount = 0 pendingCount = Object.keys(mcpConfigs).length if (pendingCount === 0) { void resolve({ clients: [], tools: [], commands: [], }) return } const clients: MCPServerConnection[] = [] const tools: Tool[] = [] const commands: Command[] = [] getMcpToolsCommandsAndResources(result => { clients.push(result.client) tools.push(...result.tools) commands.push(...result.commands) completedCount++ if (completedCount >= pendingCount) { const commandsMetadataLength = commands.reduce((sum, command) => { const commandMetadataLength = command.name.length + (command.description ?? '').length + (command.argumentHint ?? '').length return sum + commandMetadataLength }, 0) logEvent('tengu_mcp_tools_commands_loaded', { tools_count: tools.length, commands_count: commands.length, commands_metadata_length: commandsMetadataLength, }) void resolve({ clients, tools, commands, }) } }, mcpConfigs).catch(error => { logMCPError( 'prefetchAllMcpResources', `Failed to get MCP resources: ${summarizeMcpErrorForDebug(error)}`, ) // Still resolve with empty results void resolve({ clients: [], tools: [], commands: [], }) }) }) } /** * Transform result content from an MCP tool or MCP prompt into message blocks */ export async function transformResultContent( resultContent: PromptMessage['content'], serverName: string, ): Promise> { switch (resultContent.type) { case 'text': return [ { type: 'text', text: resultContent.text, }, ] case 'audio': { const audioData = resultContent as { type: 'audio' data: string mimeType?: string } return await persistBlobToTextBlock( Buffer.from(audioData.data, 'base64'), audioData.mimeType, serverName, `[Audio from ${serverName}] `, ) } case 'image': { // Resize and compress image data, enforcing API dimension limits const imageBuffer = Buffer.from(String(resultContent.data), 'base64') const ext = resultContent.mimeType?.split('/')[1] || 'png' const resized = await maybeResizeAndDownsampleImageBuffer( imageBuffer, imageBuffer.length, ext, ) return [ { type: 'image', source: { data: resized.buffer.toString('base64'), media_type: `image/${resized.mediaType}` as Base64ImageSource['media_type'], type: 'base64', }, }, ] } case 'resource': { const resource = resultContent.resource const prefix = `[Resource from ${serverName} at ${resource.uri}] ` if ('text' in resource) { return [ { type: 'text', text: `${prefix}${resource.text}`, }, ] } else if ('blob' in resource) { const isImage = IMAGE_MIME_TYPES.has(resource.mimeType ?? '') if (isImage) { // Resize and compress image blob, enforcing API dimension limits const imageBuffer = Buffer.from(resource.blob, 'base64') const ext = resource.mimeType?.split('/')[1] || 'png' const resized = await maybeResizeAndDownsampleImageBuffer( imageBuffer, imageBuffer.length, ext, ) const content: MessageParam['content'] = [] if (prefix) { content.push({ type: 'text', text: prefix, }) } content.push({ type: 'image', source: { data: resized.buffer.toString('base64'), media_type: `image/${resized.mediaType}` as Base64ImageSource['media_type'], type: 'base64', }, }) return content } else { return await persistBlobToTextBlock( Buffer.from(resource.blob, 'base64'), resource.mimeType, serverName, prefix, ) } } return [] } case 'resource_link': { const resourceLink = resultContent as ResourceLink let text = `[Resource link: ${resourceLink.name}] ${resourceLink.uri}` if (resourceLink.description) { text += ` (${resourceLink.description})` } return [ { type: 'text', text, }, ] } default: return [] } } /** * Decode base64 binary content, write it to disk with the proper extension, * and return a small text block with the file path. Replaces the old behavior * of dumping raw base64 into the context. */ async function persistBlobToTextBlock( bytes: Buffer, mimeType: string | undefined, serverName: string, sourceDescription: string, ): Promise> { const persistId = `mcp-${normalizeNameForMCP(serverName)}-blob-${Date.now()}-${Math.random().toString(36).slice(2, 8)}` const result = await persistBinaryContent(bytes, mimeType, persistId) if ('error' in result) { return [ { type: 'text', text: `${sourceDescription}Binary content (${mimeType || 'unknown type'}, ${bytes.length} bytes) could not be saved to disk: ${result.error}`, }, ] } return [ { type: 'text', text: getBinaryBlobSavedMessage( result.filepath, mimeType, result.size, sourceDescription, ), }, ] } /** * Processes MCP tool result into a normalized format. */ export type MCPResultType = 'toolResult' | 'structuredContent' | 'contentArray' export type TransformedMCPResult = { content: MCPToolResult type: MCPResultType schema?: string } /** * Generates a compact, jq-friendly type signature for a value. * e.g. "{title: string, items: [{id: number, name: string}]}" */ export function inferCompactSchema(value: unknown, depth = 2): string { if (value === null) return 'null' if (Array.isArray(value)) { if (value.length === 0) return '[]' return `[${inferCompactSchema(value[0], depth - 1)}]` } if (typeof value === 'object') { if (depth <= 0) return '{...}' const entries = Object.entries(value).slice(0, 10) const props = entries.map( ([k, v]) => `${k}: ${inferCompactSchema(v, depth - 1)}`, ) const suffix = Object.keys(value).length > 10 ? ', ...' : '' return `{${props.join(', ')}${suffix}}` } return typeof value } export async function transformMCPResult( result: unknown, tool: string, // Tool name for validation (e.g., "search") name: string, // Server name for transformation (e.g., "slack") ): Promise { if (result && typeof result === 'object') { if ('toolResult' in result) { return { content: String(result.toolResult), type: 'toolResult', } } if ( 'structuredContent' in result && result.structuredContent !== undefined ) { return { content: jsonStringify(result.structuredContent), type: 'structuredContent', schema: inferCompactSchema(result.structuredContent), } } if ('content' in result && Array.isArray(result.content)) { const transformedContent = ( await Promise.all( result.content.map(item => transformResultContent(item, name)), ) ).flat() return { content: transformedContent, type: 'contentArray', schema: inferCompactSchema(transformedContent), } } } const errorMessage = `MCP server "${name}" tool "${tool}": unexpected response format` logMCPError(name, errorMessage) throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( errorMessage, 'MCP tool unexpected response format', ) } /** * Check if MCP content contains any image blocks. * Used to decide whether to persist to file (images should use truncation instead * to preserve image compression and viewability). */ function contentContainsImages(content: MCPToolResult): boolean { if (!content || typeof content === 'string') { return false } return content.some(block => block.type === 'image') } export async function processMCPResult( result: unknown, tool: string, // Tool name for validation (e.g., "search") name: string, // Server name for IDE check and transformation (e.g., "slack") ): Promise { const { content, type, schema } = await transformMCPResult(result, tool, name) // IDE tools are not going to the model directly, so we don't need to // handle large output. if (name === 'ide') { return content } // Check if content needs truncation (i.e., is too large) if (!(await mcpContentNeedsTruncation(content))) { return content } const sizeEstimateTokens = getContentSizeEstimate(content) // If large output files feature is disabled, fall back to old truncation behavior if (isEnvDefinedFalsy(process.env.ENABLE_MCP_LARGE_OUTPUT_FILES)) { logEvent('tengu_mcp_large_result_handled', { outcome: 'truncated', reason: 'env_disabled', sizeEstimateTokens, } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) return await truncateMcpContentIfNeeded(content) } // Save large output to file and return instructions for reading it // Content is guaranteed to exist at this point (we checked mcpContentNeedsTruncation) if (!content) { return content } // If content contains images, fall back to truncation - persisting images as JSON // defeats the image compression logic and makes them non-viewable if (contentContainsImages(content)) { logEvent('tengu_mcp_large_result_handled', { outcome: 'truncated', reason: 'contains_images', sizeEstimateTokens, } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) return await truncateMcpContentIfNeeded(content) } // Generate a unique ID for the persisted file (server__tool-timestamp) const timestamp = Date.now() const persistId = `mcp-${normalizeNameForMCP(name)}-${normalizeNameForMCP(tool)}-${timestamp}` // Convert to string for persistence (persistToolResult expects string or specific block types) const contentStr = typeof content === 'string' ? content : jsonStringify(content, null, 2) const persistResult = await persistToolResult(contentStr, persistId) if (isPersistError(persistResult)) { // If file save failed, fall back to returning truncated content info const contentLength = contentStr.length logEvent('tengu_mcp_large_result_handled', { outcome: 'truncated', reason: 'persist_failed', sizeEstimateTokens, } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) return `Error: result (${contentLength.toLocaleString()} characters) exceeds maximum allowed tokens. Failed to save output to file: ${persistResult.error}. If this MCP server provides pagination or filtering tools, use them to retrieve specific portions of the data.` } logEvent('tengu_mcp_large_result_handled', { outcome: 'persisted', reason: 'file_saved', sizeEstimateTokens, persistedSizeChars: persistResult.originalSize, } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) const formatDescription = getFormatDescription(type, schema) return getLargeOutputInstructions( persistResult.filepath, persistResult.originalSize, formatDescription, ) } /** * Call an MCP tool, handling UrlElicitationRequiredError (-32042) by * displaying the URL elicitation to the user, waiting for the completion * notification, and retrying the tool call. */ type MCPToolCallResult = { content: MCPToolResult _meta?: Record structuredContent?: Record } /** @internal Exported for testing. */ export async function callMCPToolWithUrlElicitationRetry({ client: connectedClient, clientConnection, tool, args, meta, signal, setAppState, onProgress, callToolFn = callMCPTool, handleElicitation, }: { client: ConnectedMCPServer clientConnection: MCPServerConnection tool: string args: Record meta?: Record signal: AbortSignal setAppState: (f: (prev: AppState) => AppState) => void onProgress?: (data: MCPProgress) => void /** Injectable for testing. Defaults to callMCPTool. */ callToolFn?: (opts: { client: ConnectedMCPServer tool: string args: Record meta?: Record signal: AbortSignal onProgress?: (data: MCPProgress) => void }) => Promise /** Handler for URL elicitations when no hook handles them. * In print/SDK mode, delegates to structuredIO. In REPL, falls back to queue. */ handleElicitation?: ( serverName: string, params: ElicitRequestURLParams, signal: AbortSignal, ) => Promise }): Promise { const MAX_URL_ELICITATION_RETRIES = 3 for (let attempt = 0; ; attempt++) { try { return await callToolFn({ client: connectedClient, tool, args, meta, signal, onProgress, }) } catch (error) { // The MCP SDK's Protocol creates plain McpError (not UrlElicitationRequiredError) // for error responses, so we check the error code instead of instanceof. if ( !(error instanceof McpError) || error.code !== ErrorCode.UrlElicitationRequired ) { throw error } // Limit the number of URL elicitation retries if (attempt >= MAX_URL_ELICITATION_RETRIES) { throw error } const errorData = error.data const rawElicitations = errorData != null && typeof errorData === 'object' && 'elicitations' in errorData && Array.isArray(errorData.elicitations) ? (errorData.elicitations as unknown[]) : [] // Validate each element has the required fields for ElicitRequestURLParams const elicitations = rawElicitations.filter( (e): e is ElicitRequestURLParams => { if (e == null || typeof e !== 'object') return false const obj = e as Record return ( obj.mode === 'url' && typeof obj.url === 'string' && typeof obj.elicitationId === 'string' && typeof obj.message === 'string' ) }, ) const serverName = clientConnection.type === 'connected' ? clientConnection.name : 'unknown' if (elicitations.length === 0) { logMCPDebug( serverName, `Tool '${tool}' returned -32042 but no valid elicitations in error data`, ) throw error } logMCPDebug( serverName, `Tool '${tool}' requires URL elicitation (error -32042, attempt ${attempt + 1}), processing ${elicitations.length} elicitation(s)`, ) // Process each URL elicitation from the error. // The completion notification handler (in registerElicitationHandler) sets // `completed: true` on the matching queue event; the dialog reacts to this flag. for (const elicitation of elicitations) { const { elicitationId } = elicitation // Run elicitation hooks — they can resolve URL elicitations programmatically const hookResponse = await runElicitationHooks( serverName, elicitation, signal, ) if (hookResponse) { logMCPDebug( serverName, `URL elicitation ${elicitationId} resolved by hook: ${jsonStringify(hookResponse)}`, ) if (hookResponse.action !== 'accept') { return { content: `URL elicitation was ${hookResponse.action === 'decline' ? 'declined' : hookResponse.action + 'ed'} by a hook. The tool "${tool}" could not complete because it requires the user to open a URL.`, } } // Hook accepted — skip the UI and proceed to retry continue } // Resolve the URL elicitation via callback (print/SDK mode) or queue (REPL mode). let userResult: ElicitResult if (handleElicitation) { // Print/SDK mode: delegate to structuredIO which sends a control request userResult = await handleElicitation(serverName, elicitation, signal) } else { // REPL mode: queue for ElicitationDialog with two-phase consent/waiting flow const waitingState: ElicitationWaitingState = { actionLabel: 'Retry now', showCancel: true, } userResult = await new Promise(resolve => { const onAbort = () => { void resolve({ action: 'cancel' }) } if (signal.aborted) { onAbort() return } signal.addEventListener('abort', onAbort, { once: true }) setAppState(prev => ({ ...prev, elicitation: { queue: [ ...prev.elicitation.queue, { serverName, requestId: `error-elicit-${elicitationId}`, params: elicitation, signal, waitingState, respond: result => { // Phase 1 consent: accept is a no-op (doesn't resolve retry Promise) if (result.action === 'accept') { return } // Decline or cancel: resolve the retry Promise signal.removeEventListener('abort', onAbort) void resolve(result) }, onWaitingDismiss: action => { signal.removeEventListener('abort', onAbort) if (action === 'retry') { void resolve({ action: 'accept' }) } else { void resolve({ action: 'cancel' }) } }, }, ], }, })) }) } // Run ElicitationResult hooks — they can modify or block the response const finalResult = await runElicitationResultHooks( serverName, userResult, signal, 'url', elicitationId, ) if (finalResult.action !== 'accept') { logMCPDebug( serverName, `User ${finalResult.action === 'decline' ? 'declined' : finalResult.action + 'ed'} URL elicitation ${elicitationId}`, ) return { content: `URL elicitation was ${finalResult.action === 'decline' ? 'declined' : finalResult.action + 'ed'} by the user. The tool "${tool}" could not complete because it requires the user to open a URL.`, } } logMCPDebug( serverName, `Elicitation ${elicitationId} completed, retrying tool call`, ) } // Loop back to retry the tool call } } } async function callMCPTool({ client: { client, name, config }, tool, args, meta, signal, onProgress, }: { client: ConnectedMCPServer tool: string args: Record meta?: Record signal: AbortSignal onProgress?: (data: MCPProgress) => void }): Promise<{ content: MCPToolResult _meta?: Record structuredContent?: Record }> { const toolStartTime = Date.now() let progressInterval: NodeJS.Timeout | undefined try { logMCPDebug(name, `Calling MCP tool: ${tool}`) // Set up progress logging for long-running tools (every 30 seconds) progressInterval = setInterval( (startTime, name, tool) => { const elapsed = Date.now() - startTime const elapsedSeconds = Math.floor(elapsed / 1000) const duration = `${elapsedSeconds}s` logMCPDebug(name, `Tool '${tool}' still running (${duration} elapsed)`) }, 30000, // Log every 30 seconds toolStartTime, name, tool, ) // Use Promise.race with our own timeout to handle cases where SDK's // internal timeout doesn't work (e.g., SSE stream breaks mid-request) const timeoutMs = getMcpToolTimeoutMs() let timeoutId: NodeJS.Timeout | undefined const timeoutPromise = new Promise((_, reject) => { timeoutId = setTimeout( (reject, name, tool, timeoutMs) => { reject( new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( `MCP server "${name}" tool "${tool}" timed out after ${Math.floor(timeoutMs / 1000)}s`, 'MCP tool timeout', ), ) }, timeoutMs, reject, name, tool, timeoutMs, ) }) const result = await Promise.race([ client.callTool( { name: tool, arguments: args, _meta: meta, }, CallToolResultSchema, { signal, timeout: timeoutMs, onprogress: onProgress ? sdkProgress => { onProgress({ type: 'mcp_progress', status: 'progress', serverName: name, toolName: tool, progress: sdkProgress.progress, total: sdkProgress.total, progressMessage: sdkProgress.message, }) } : undefined, }, ), timeoutPromise, ]).finally(() => { if (timeoutId) { clearTimeout(timeoutId) } }) if ('isError' in result && result.isError) { let errorDetails = 'Unknown error' if ( 'content' in result && Array.isArray(result.content) && result.content.length > 0 ) { const firstContent = result.content[0] if ( firstContent && typeof firstContent === 'object' && 'text' in firstContent ) { errorDetails = firstContent.text } } else if ('error' in result) { // Fallback for legacy error format errorDetails = String(result.error) } logMCPError(name, errorDetails) throw new McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS( errorDetails, 'MCP tool returned error', '_meta' in result && result._meta ? { _meta: result._meta } : undefined, ) } const elapsed = Date.now() - toolStartTime const duration = elapsed < 1000 ? `${elapsed}ms` : elapsed < 60000 ? `${Math.floor(elapsed / 1000)}s` : `${Math.floor(elapsed / 60000)}m ${Math.floor((elapsed % 60000) / 1000)}s` logMCPDebug(name, `Tool '${tool}' completed successfully in ${duration}`) // Log code indexing tool usage const codeIndexingTool = detectCodeIndexingFromMcpServerName(name) if (codeIndexingTool) { logEvent('tengu_code_indexing_tool_used', { tool: codeIndexingTool as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, source: 'mcp' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, success: true, }) } const content = await processMCPResult(result, tool, name) return { content, _meta: result._meta as Record | undefined, structuredContent: result.structuredContent as | Record | undefined, } } catch (e) { // Clear intervals on error if (progressInterval !== undefined) { clearInterval(progressInterval) } const elapsed = Date.now() - toolStartTime if (e instanceof Error && e.name !== 'AbortError') { logMCPDebug( name, `Tool '${tool}' failed after ${Math.floor(elapsed / 1000)}s: ${e.message}`, ) } // Check for 401 errors indicating expired/invalid OAuth tokens // The MCP SDK's StreamableHTTPError has a `code` property with the HTTP status if (e instanceof Error) { const errorCode = 'code' in e ? (e.code as number | undefined) : undefined if (errorCode === 401 || e instanceof UnauthorizedError) { logMCPDebug( name, `Tool call returned 401 Unauthorized - token may have expired`, ) logEvent('tengu_mcp_tool_call_auth_error', {}) throw new McpAuthError( name, `MCP server "${name}" requires re-authorization (token expired)`, ) } // Check for session expiry — two error shapes can surface here: // 1. Direct 404 + JSON-RPC -32001 from the server (StreamableHTTPError) // 2. -32000 "Connection closed" (McpError) — the SDK closes the transport // after the onerror handler fires, so the pending callTool() rejects // with this derived error instead of the original 404. // In both cases, clear the connection cache so the next tool call // creates a fresh session. const isSessionExpired = isMcpSessionExpiredError(e) const isConnectionClosedOnHttp = 'code' in e && (e as Error & { code?: number }).code === -32000 && e.message.includes('Connection closed') && (config.type === 'http' || config.type === 'claudeai-proxy') if (isSessionExpired || isConnectionClosedOnHttp) { logMCPDebug( name, `MCP session expired during tool call (${isSessionExpired ? '404/-32001' : 'connection closed'}), clearing connection cache for re-initialization`, ) logEvent('tengu_mcp_session_expired', {}) await clearServerCache(name, config) throw new McpSessionExpiredError(name) } } // When the users hits esc, avoid logspew if (!(e instanceof Error) || e.name !== 'AbortError') { throw e } return { content: undefined } } finally { // Always clear intervals if (progressInterval !== undefined) { clearInterval(progressInterval) } } } function extractToolUseId(message: AssistantMessage): string | undefined { if (message.message.content[0]?.type !== 'tool_use') { return undefined } return message.message.content[0].id } /** * Sets up SDK MCP clients by creating transports and connecting them. * This is used for SDK MCP servers that run in the same process as the SDK. * * @param sdkMcpConfigs - The SDK MCP server configurations * @param sendMcpMessage - Callback to send MCP messages through the control channel * @returns Connected clients, their tools, and transport map for message routing */ export async function setupSdkMcpClients( sdkMcpConfigs: Record, sendMcpMessage: ( serverName: string, message: JSONRPCMessage, ) => Promise, ): Promise<{ clients: MCPServerConnection[] tools: Tool[] }> { const clients: MCPServerConnection[] = [] const tools: Tool[] = [] // Connect to all servers in parallel const results = await Promise.allSettled( Object.entries(sdkMcpConfigs).map(async ([name, config]) => { const transport = new SdkControlClientTransport(name, sendMcpMessage) const client = new Client( { name: 'claude-code', title: 'Claude Code', version: MACRO.VERSION ?? 'unknown', description: "Anthropic's agentic coding tool", websiteUrl: PRODUCT_URL, }, { capabilities: {}, }, ) try { // Connect the client await client.connect(transport) // Get capabilities from the server const capabilities = client.getServerCapabilities() // Create the connected client object const connectedClient: MCPServerConnection = { type: 'connected', name, capabilities: capabilities || {}, client, config: { ...config, scope: 'dynamic' as const }, cleanup: async () => { await client.close() }, } // Fetch tools if the server has them const serverTools: Tool[] = [] if (capabilities?.tools) { const sdkTools = await fetchToolsForClient(connectedClient) serverTools.push(...sdkTools) } return { client: connectedClient, tools: serverTools, } } catch (error) { // If connection fails, return failed server logMCPError( name, `Failed to connect SDK MCP server: ${summarizeMcpErrorForDebug( error, )}`, ) return { client: { type: 'failed' as const, name, config: { ...config, scope: 'user' as const }, }, tools: [], } } }), ) // Process results and collect clients and tools for (const result of results) { if (result.status === 'fulfilled') { clients.push(result.value.client) tools.push(...result.value.tools) } // If rejected (unexpected), the error was already logged inside the promise } return { clients, tools } }