Tighten bridge and teleport debug redaction

This commit is contained in:
2026-04-04 11:09:07 +08:00
parent 5149320afd
commit 86e7dbd1ab
8 changed files with 447 additions and 111 deletions

View File

@@ -55,6 +55,36 @@ export class BridgeFatalError extends Error {
}
}
function summarizeBridgeApiPayloadForDebug(data: unknown): string {
if (data === null) return 'null'
if (data === undefined) return 'undefined'
if (Array.isArray(data)) {
return debugBody({
type: 'array',
length: data.length,
})
}
if (typeof data !== 'object') {
return String(data)
}
const value = data as Record<string, unknown>
const workData =
value.data && typeof value.data === 'object'
? (value.data as Record<string, unknown>)
: undefined
return debugBody({
type: 'object',
keys: Object.keys(value)
.sort()
.slice(0, 10),
hasEnvironmentId: typeof value.environment_id === 'string',
hasEnvironmentSecret: typeof value.environment_secret === 'string',
hasWorkId: typeof value.id === 'string',
workType: typeof workData?.type === 'string' ? workData.type : undefined,
hasSessionId: typeof workData?.id === 'string',
})
}
export function createBridgeApiClient(deps: BridgeApiDeps): BridgeApiClient {
function debug(msg: string): void {
deps.onDebug?.(msg)
@@ -168,12 +198,14 @@ export function createBridgeApiClient(deps: BridgeApiDeps): BridgeApiClient {
handleErrorStatus(response.status, response.data, 'Registration')
debug(
`[bridge:api] POST /v1/environments/bridge -> ${response.status} environment_id=${response.data.environment_id}`,
`[bridge:api] POST /v1/environments/bridge -> ${response.status}`,
)
debug(
`[bridge:api] >>> ${debugBody({ max_sessions: config.maxSessions, metadata: { worker_type: config.workerType } })}`,
)
debug(`[bridge:api] <<< ${debugBody(response.data)}`)
debug(
`[bridge:api] <<< ${summarizeBridgeApiPayloadForDebug(response.data)}`,
)
return response.data
},
@@ -221,9 +253,11 @@ export function createBridgeApiClient(deps: BridgeApiDeps): BridgeApiClient {
}
debug(
`[bridge:api] GET .../work/poll -> ${response.status} workId=${response.data.id} type=${response.data.data?.type}${response.data.data?.id ? ` sessionId=${response.data.data.id}` : ''}`,
`[bridge:api] GET .../work/poll -> ${response.status} type=${response.data.data?.type ?? 'unknown'}`,
)
debug(
`[bridge:api] <<< ${summarizeBridgeApiPayloadForDebug(response.data)}`,
)
debug(`[bridge:api] <<< ${debugBody(response.data)}`)
return response.data
},
@@ -427,7 +461,9 @@ export function createBridgeApiClient(deps: BridgeApiDeps): BridgeApiClient {
`[bridge:api] POST /v1/sessions/${sessionId}/events -> ${response.status}`,
)
debug(`[bridge:api] >>> ${debugBody({ events: [event] })}`)
debug(`[bridge:api] <<< ${debugBody(response.data)}`)
debug(
`[bridge:api] <<< ${summarizeBridgeApiPayloadForDebug(response.data)}`,
)
},
}
}

View File

@@ -9,7 +9,7 @@
import axios from 'axios'
import { logForDebugging } from '../utils/debug.js'
import { errorMessage } from '../utils/errors.js'
import { toError } from '../utils/errors.js'
import { jsonStringify } from '../utils/slowOperations.js'
import { extractErrorDetail } from './debugUtils.js'
@@ -23,6 +23,62 @@ function oauthHeaders(accessToken: string): Record<string, string> {
}
}
function summarizeCodeSessionResponseForDebug(data: unknown): string {
if (data === null) return 'null'
if (data === undefined) return 'undefined'
if (Array.isArray(data)) {
return jsonStringify({
payloadType: 'array',
length: data.length,
})
}
if (typeof data === 'object') {
const value = data as Record<string, unknown>
const session =
value.session && typeof value.session === 'object'
? (value.session as Record<string, unknown>)
: undefined
return jsonStringify({
payloadType: 'object',
keys: Object.keys(value)
.sort()
.slice(0, 10),
hasSession: Boolean(session),
hasSessionId: typeof session?.id === 'string',
hasWorkerJwt: typeof value.worker_jwt === 'string',
hasApiBaseUrl: typeof value.api_base_url === 'string',
hasExpiresIn: typeof value.expires_in === 'number',
hasWorkerEpoch:
typeof value.worker_epoch === 'number' ||
typeof value.worker_epoch === 'string',
})
}
return typeof data
}
function summarizeCodeSessionErrorForDebug(err: unknown): string {
const error = toError(err)
const summary: Record<string, unknown> = {
errorType: error.constructor.name,
errorName: error.name,
hasMessage: error.message.length > 0,
hasStack: Boolean(error.stack),
}
if (err && typeof err === 'object') {
const errorObj = err as Record<string, unknown>
if (typeof errorObj.code === 'string' || typeof errorObj.code === 'number') {
summary.code = errorObj.code
}
if (errorObj.response && typeof errorObj.response === 'object') {
const response = errorObj.response as Record<string, unknown>
if (typeof response.status === 'number') {
summary.httpStatus = response.status
}
}
}
return jsonStringify(summary)
}
export async function createCodeSession(
baseUrl: string,
accessToken: string,
@@ -47,7 +103,9 @@ export async function createCodeSession(
)
} catch (err: unknown) {
logForDebugging(
`[code-session] Session create request failed: ${errorMessage(err)}`,
`[code-session] Session create request failed: ${summarizeCodeSessionErrorForDebug(
err,
)}`,
)
return null
}
@@ -72,7 +130,9 @@ export async function createCodeSession(
!data.session.id.startsWith('cse_')
) {
logForDebugging(
`[code-session] No session.id (cse_*) in response: ${jsonStringify(data).slice(0, 200)}`,
`[code-session] No session.id (cse_*) in response: ${summarizeCodeSessionResponseForDebug(
data,
)}`,
)
return null
}
@@ -110,7 +170,9 @@ export async function fetchRemoteCredentials(
)
} catch (err: unknown) {
logForDebugging(
`[code-session] /bridge request failed: ${errorMessage(err)}`,
`[code-session] /bridge request failed: ${summarizeCodeSessionErrorForDebug(
err,
)}`,
)
return null
}
@@ -136,7 +198,9 @@ export async function fetchRemoteCredentials(
!('worker_epoch' in data)
) {
logForDebugging(
`[code-session] /bridge response malformed (need worker_jwt, expires_in, api_base_url, worker_epoch): ${jsonStringify(data).slice(0, 200)}`,
`[code-session] /bridge response malformed (need worker_jwt, expires_in, api_base_url, worker_epoch): ${summarizeCodeSessionResponseForDebug(
data,
)}`,
)
return null
}

View File

@@ -21,15 +21,10 @@ const SECRET_PATTERN = new RegExp(
'g',
)
const REDACT_MIN_LENGTH = 16
export function redactSecrets(s: string): string {
return s.replace(SECRET_PATTERN, (_match, field: string, value: string) => {
if (value.length < REDACT_MIN_LENGTH) {
return `"${field}":"[REDACTED]"`
}
const redacted = `${value.slice(0, 8)}...${value.slice(-4)}`
return `"${field}":"${redacted}"`
void value
return `"${field}":"[REDACTED]"`
})
}
@@ -52,6 +47,73 @@ export function debugBody(data: unknown): string {
return s.slice(0, DEBUG_MSG_LIMIT) + `... (${s.length} chars)`
}
function summarizeValueShapeForDebug(value: unknown): unknown {
if (value === null) return 'null'
if (value === undefined) return 'undefined'
if (Array.isArray(value)) {
return {
type: 'array',
length: value.length,
}
}
if (typeof value === 'object') {
return {
type: 'object',
keys: Object.keys(value as Record<string, unknown>)
.sort()
.slice(0, 10),
}
}
return typeof value
}
export function summarizeBridgeErrorForDebug(err: unknown): string {
const summary: Record<string, unknown> = {}
if (err instanceof Error) {
summary.errorType = err.constructor.name
summary.errorName = err.name
summary.hasMessage = err.message.length > 0
summary.hasStack = Boolean(err.stack)
} else {
summary.errorType = typeof err
summary.hasValue = err !== undefined && err !== null
}
if (err && typeof err === 'object') {
const errorObj = err as Record<string, unknown>
if (
typeof errorObj.code === 'string' ||
typeof errorObj.code === 'number'
) {
summary.code = errorObj.code
}
if (
typeof errorObj.errno === 'string' ||
typeof errorObj.errno === 'number'
) {
summary.errno = errorObj.errno
}
if (typeof errorObj.status === 'number') {
summary.status = errorObj.status
}
if (typeof errorObj.syscall === 'string') {
summary.syscall = errorObj.syscall
}
if (errorObj.response && typeof errorObj.response === 'object') {
const response = errorObj.response as Record<string, unknown>
if (typeof response.status === 'number') {
summary.httpStatus = response.status
}
if ('data' in response) {
summary.responseData = summarizeValueShapeForDebug(response.data)
}
}
}
return jsonStringify(summary)
}
/**
* Extract a descriptive error message from an axios error (or any error).
* For HTTP errors, appends the server's response body message if available,

View File

@@ -107,7 +107,7 @@ export function createTokenRefreshScheduler({
// (such as the follow-up refresh set by doRefresh) so the refresh
// chain is not broken.
logForDebugging(
`[${label}:token] Could not decode JWT expiry for sessionId=${sessionId}, token prefix=${token.slice(0, 15)}…, keeping existing timer`,
`[${label}:token] Could not decode JWT expiry for sessionId=${sessionId}, keeping existing timer`,
)
return
}
@@ -209,7 +209,7 @@ export function createTokenRefreshScheduler({
failureCounts.delete(sessionId)
logForDebugging(
`[${label}:token] Refreshing token for sessionId=${sessionId}: new token prefix=${oauthToken.slice(0, 15)}`,
`[${label}:token] Refreshing token for sessionId=${sessionId}`,
)
logEvent('tengu_bridge_token_refreshed', {})
onRefresh(sessionId, oauthToken)

View File

@@ -50,7 +50,10 @@ import {
extractTitleText,
BoundedUUIDSet,
} from './bridgeMessaging.js'
import { logBridgeSkip } from './debugUtils.js'
import {
logBridgeSkip,
summarizeBridgeErrorForDebug,
} from './debugUtils.js'
import { logForDebugging } from '../utils/debug.js'
import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
import { isInProtectedNamespace } from '../utils/envUtils.js'
@@ -235,10 +238,12 @@ export async function initEnvLessBridgeCore(
})
} catch (err) {
logForDebugging(
`[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
`[remote-bridge] v2 transport setup failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
{ level: 'error' },
)
onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
onStateChange?.('failed', 'Transport setup failed')
logBridgeSkip('v2_transport_setup_failed', undefined, true)
void archiveSession(
sessionId,
@@ -356,7 +361,9 @@ export async function initEnvLessBridgeCore(
)
} catch (err) {
logForDebugging(
`[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
`[remote-bridge] Proactive refresh rebuild failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
{ level: 'error' },
)
logForDiagnosticsNoPII(
@@ -364,7 +371,7 @@ export async function initEnvLessBridgeCore(
'bridge_repl_v2_proactive_refresh_failed',
)
if (!tornDown) {
onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
onStateChange?.('failed', 'Refresh failed')
}
} finally {
authRecoveryInFlight = false
@@ -394,9 +401,13 @@ export async function initEnvLessBridgeCore(
// (Same guard pattern as replBridge.ts:1119.)
const flushTransport = transport
void flushHistory(initialMessages)
.catch(e =>
logForDebugging(`[remote-bridge] flushHistory failed: ${e}`),
)
.catch(e => {
logForDebugging(
`[remote-bridge] flushHistory failed: ${summarizeBridgeErrorForDebug(
e,
)}`,
)
})
.finally(() => {
// authRecoveryInFlight catches the v1-vs-v2 asymmetry: v1 nulls
// transport synchronously in setOnClose (replBridge.ts:1175), so
@@ -576,12 +587,14 @@ export async function initEnvLessBridgeCore(
logForDebugging('[remote-bridge] Transport rebuilt after 401')
} catch (err) {
logForDebugging(
`[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
`[remote-bridge] 401 recovery failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
if (!tornDown) {
onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
onStateChange?.('failed', 'JWT refresh failed')
}
} finally {
authRecoveryInFlight = false
@@ -706,7 +719,9 @@ export async function initEnvLessBridgeCore(
)
} catch (err) {
logForDebugging(
`[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
`[remote-bridge] Teardown 401 retry threw: ${summarizeBridgeErrorForDebug(
err,
)}`,
{ level: 'error' },
)
}
@@ -823,7 +838,7 @@ export async function initEnvLessBridgeCore(
sendControlRequest(request: SDKControlRequest) {
if (authRecoveryInFlight) {
logForDebugging(
`[remote-bridge] Dropping control_request during 401 recovery: ${request.request_id}`,
'[remote-bridge] Dropping control_request during 401 recovery',
)
return
}
@@ -832,9 +847,7 @@ export async function initEnvLessBridgeCore(
transport.reportState('requires_action')
}
void transport.write(event)
logForDebugging(
`[remote-bridge] Sent control_request request_id=${request.request_id}`,
)
logForDebugging('[remote-bridge] Sent control_request')
},
sendControlResponse(response: SDKControlResponse) {
if (authRecoveryInFlight) {
@@ -851,7 +864,7 @@ export async function initEnvLessBridgeCore(
sendControlCancelRequest(requestId: string) {
if (authRecoveryInFlight) {
logForDebugging(
`[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
'[remote-bridge] Dropping control_cancel_request during 401 recovery',
)
return
}
@@ -865,9 +878,7 @@ export async function initEnvLessBridgeCore(
// those paths, so without this the server stays on requires_action.
transport.reportState('running')
void transport.write(event)
logForDebugging(
`[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
)
logForDebugging('[remote-bridge] Sent control_cancel_request')
},
sendResult() {
if (authRecoveryInFlight) {
@@ -876,7 +887,7 @@ export async function initEnvLessBridgeCore(
}
transport.reportState('idle')
void transport.write(makeResultMessage(sessionId))
logForDebugging(`[remote-bridge] Sent result`)
logForDebugging('[remote-bridge] Sent result')
},
async teardown() {
unregister()
@@ -992,12 +1003,13 @@ async function archiveSession(
},
)
logForDebugging(
`[remote-bridge] Archive ${compatId} status=${response.status}`,
`[remote-bridge] Archive status=${response.status}`,
)
return response.status
} catch (err) {
const msg = errorMessage(err)
logForDebugging(`[remote-bridge] Archive failed: ${msg}`)
logForDebugging(
`[remote-bridge] Archive failed: ${summarizeBridgeErrorForDebug(err)}`,
)
return axios.isAxiosError(err) && err.code === 'ECONNABORTED'
? 'timeout'
: 'error'

View File

@@ -43,6 +43,7 @@ import {
describeAxiosError,
extractHttpStatus,
logBridgeSkip,
summarizeBridgeErrorForDebug,
} from './debugUtils.js'
import type { Message } from '../types/message.js'
import type { SDKMessage } from '../entrypoints/agentSdkTypes.ts'
@@ -303,7 +304,7 @@ export async function initBridgeCore(
const prior = rawPrior?.source === 'repl' ? rawPrior : null
logForDebugging(
`[bridge:repl] initBridgeCore #${seq} starting (initialMessages=${initialMessages?.length ?? 0}${prior ? ` perpetual prior=env:${prior.environmentId}` : ''})`,
`[bridge:repl] initBridgeCore #${seq} starting (initialMessages=${initialMessages?.length ?? 0}${prior ? ' perpetual prior pointer present' : ''})`,
)
// 5. Register bridge environment
@@ -342,7 +343,9 @@ export async function initBridgeCore(
} catch (err) {
logBridgeSkip(
'registration_failed',
`[bridge:repl] Environment registration failed: ${errorMessage(err)}`,
`[bridge:repl] Environment registration failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
// Stale pointer may be the cause (expired/deleted env) — clear it so
// the next start doesn't retry the same dead ID.
@@ -353,7 +356,7 @@ export async function initBridgeCore(
return null
}
logForDebugging(`[bridge:repl] Environment registered: ${environmentId}`)
logForDebugging('[bridge:repl] Environment registered')
logForDiagnosticsNoPII('info', 'bridge_repl_env_registered')
logEvent('tengu_bridge_repl_env_registered', {})
@@ -371,7 +374,7 @@ export async function initBridgeCore(
): Promise<boolean> {
if (environmentId !== requestedEnvId) {
logForDebugging(
`[bridge:repl] Env mismatch (requested ${requestedEnvId}, got ${environmentId}) — cannot reconnect in place`,
'[bridge:repl] Env mismatch — cannot reconnect in place',
)
return false
}
@@ -389,13 +392,13 @@ export async function initBridgeCore(
for (const id of candidates) {
try {
await api.reconnectSession(environmentId, id)
logForDebugging(
`[bridge:repl] Reconnected session ${id} in place on env ${environmentId}`,
)
logForDebugging('[bridge:repl] Reconnected existing session in place')
return true
} catch (err) {
logForDebugging(
`[bridge:repl] reconnectSession(${id}) failed: ${errorMessage(err)}`,
`[bridge:repl] reconnectSession failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
}
}
@@ -679,7 +682,9 @@ export async function initBridgeCore(
} catch (err) {
bridgeConfig.reuseEnvironmentId = undefined
logForDebugging(
`[bridge:repl] Environment re-registration failed: ${errorMessage(err)}`,
`[bridge:repl] Environment re-registration failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
return false
}
@@ -688,7 +693,7 @@ export async function initBridgeCore(
bridgeConfig.reuseEnvironmentId = undefined
logForDebugging(
`[bridge:repl] Re-registered: requested=${requestedEnvId} got=${environmentId}`,
'[bridge:repl] Re-registered environment',
)
// Bail out if teardown started while we were registering
@@ -984,7 +989,7 @@ export async function initBridgeCore(
injectFault: injectBridgeFault,
wakePollLoop,
describe: () =>
`env=${environmentId} session=${currentSessionId} transport=${transport?.getStateLabel() ?? 'null'} workId=${currentWorkId ?? 'null'}`,
`transport=${transport?.getStateLabel() ?? 'null'} hasSession=${Boolean(currentSessionId)} hasWork=${Boolean(currentWorkId)}`,
})
}
@@ -1038,7 +1043,9 @@ export async function initBridgeCore(
.stopWork(environmentId, currentWorkId, false)
.catch((e: unknown) => {
logForDebugging(
`[bridge:repl] stopWork after heartbeat fatal: ${errorMessage(e)}`,
`[bridge:repl] stopWork after heartbeat fatal: ${summarizeBridgeErrorForDebug(
e,
)}`,
)
})
}
@@ -1365,7 +1372,7 @@ export async function initBridgeCore(
const sessionUrl = buildCCRv2SdkUrl(baseUrl, workSessionId)
const thisGen = v2Generation
logForDebugging(
`[bridge:repl] CCR v2: creating transport for session=${workSessionId} gen=${thisGen}`,
`[bridge:repl] CCR v2: creating transport gen=${thisGen}`,
)
void createV2ReplTransport({
sessionUrl,
@@ -1399,7 +1406,9 @@ export async function initBridgeCore(
},
(err: unknown) => {
logForDebugging(
`[bridge:repl] CCR v2: createV2ReplTransport failed: ${errorMessage(err)}`,
`[bridge:repl] CCR v2: createV2ReplTransport failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
{ level: 'error' },
)
logEvent('tengu_bridge_repl_ccr_v2_init_failed', {})
@@ -1414,7 +1423,9 @@ export async function initBridgeCore(
.stopWork(environmentId, currentWorkId, false)
.catch((e: unknown) => {
logForDebugging(
`[bridge:repl] stopWork after v2 init failure: ${errorMessage(e)}`,
`[bridge:repl] stopWork after v2 init failure: ${summarizeBridgeErrorForDebug(
e,
)}`,
)
})
currentWorkId = null
@@ -1436,9 +1447,7 @@ export async function initBridgeCore(
// WS reconnect attempt.
const wsUrl = buildSdkUrl(sessionIngressUrl, workSessionId)
logForDebugging('[bridge:repl] Using session ingress WebSocket endpoint')
logForDebugging(
`[bridge:repl] Creating HybridTransport: session=${workSessionId}`,
)
logForDebugging('[bridge:repl] Creating HybridTransport')
// v1OauthToken was validated non-null above (we'd have returned early).
const oauthToken = v1OauthToken ?? ''
wireTransport(
@@ -1523,7 +1532,9 @@ export async function initBridgeCore(
logForDebugging('[bridge:repl] keep_alive sent')
void transport.write({ type: 'keep_alive' }).catch((err: unknown) => {
logForDebugging(
`[bridge:repl] keep_alive write failed: ${errorMessage(err)}`,
`[bridge:repl] keep_alive write failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
})
}, keepAliveIntervalMs)
@@ -1536,15 +1547,13 @@ export async function initBridgeCore(
doTeardownImpl = async (): Promise<void> => {
if (teardownStarted) {
logForDebugging(
`[bridge:repl] Teardown already in progress, skipping duplicate call env=${environmentId} session=${currentSessionId}`,
'[bridge:repl] Teardown already in progress, skipping duplicate call',
)
return
}
teardownStarted = true
const teardownStart = Date.now()
logForDebugging(
`[bridge:repl] Teardown starting: env=${environmentId} session=${currentSessionId} workId=${currentWorkId ?? 'none'} transportState=${transport?.getStateLabel() ?? 'null'}`,
)
logForDebugging('[bridge:repl] Teardown starting')
if (pointerRefreshTimer !== null) {
clearInterval(pointerRefreshTimer)
@@ -1593,7 +1602,7 @@ export async function initBridgeCore(
source: 'repl',
})
logForDebugging(
`[bridge:repl] Teardown (perpetual): leaving env=${environmentId} session=${currentSessionId} alive on server, duration=${Date.now() - teardownStart}ms`,
`[bridge:repl] Teardown (perpetual): leaving bridge session alive on server, duration=${Date.now() - teardownStart}ms`,
)
return
}
@@ -1619,7 +1628,9 @@ export async function initBridgeCore(
})
.catch((err: unknown) => {
logForDebugging(
`[bridge:repl] Teardown stopWork failed: ${errorMessage(err)}`,
`[bridge:repl] Teardown stopWork failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
})
: Promise.resolve()
@@ -1636,7 +1647,9 @@ export async function initBridgeCore(
await api.deregisterEnvironment(environmentId).catch((err: unknown) => {
logForDebugging(
`[bridge:repl] Teardown deregister failed: ${errorMessage(err)}`,
`[bridge:repl] Teardown deregister failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
})
@@ -1646,16 +1659,14 @@ export async function initBridgeCore(
await clearBridgePointer(dir)
logForDebugging(
`[bridge:repl] Teardown complete: env=${environmentId} duration=${Date.now() - teardownStart}ms`,
`[bridge:repl] Teardown complete: duration=${Date.now() - teardownStart}ms`,
)
}
// 8. Register cleanup for graceful shutdown
const unregister = registerCleanup(() => doTeardownImpl?.())
logForDebugging(
`[bridge:repl] Ready: env=${environmentId} session=${currentSessionId}`,
)
logForDebugging('[bridge:repl] Ready')
onStateChange?.('ready')
return {
@@ -1713,7 +1724,7 @@ export async function initBridgeCore(
if (!transport) {
const types = filtered.map(m => m.type).join(',')
logForDebugging(
`[bridge:repl] Transport not configured, dropping ${filtered.length} message(s) [${types}] for session=${currentSessionId}`,
`[bridge:repl] Transport not configured, dropping ${filtered.length} message(s) [${types}]`,
{ level: 'warn' },
)
return
@@ -1748,7 +1759,7 @@ export async function initBridgeCore(
if (filtered.length === 0) return
if (!transport) {
logForDebugging(
`[bridge:repl] Transport not configured, dropping ${filtered.length} SDK message(s) for session=${currentSessionId}`,
`[bridge:repl] Transport not configured, dropping ${filtered.length} SDK message(s)`,
{ level: 'warn' },
)
return
@@ -1768,9 +1779,7 @@ export async function initBridgeCore(
}
const event = { ...request, session_id: currentSessionId }
void transport.write(event)
logForDebugging(
`[bridge:repl] Sent control_request request_id=${request.request_id}`,
)
logForDebugging('[bridge:repl] Sent control_request')
},
sendControlResponse(response: SDKControlResponse) {
if (!transport) {
@@ -1796,21 +1805,17 @@ export async function initBridgeCore(
session_id: currentSessionId,
}
void transport.write(event)
logForDebugging(
`[bridge:repl] Sent control_cancel_request request_id=${requestId}`,
)
logForDebugging('[bridge:repl] Sent control_cancel_request')
},
sendResult() {
if (!transport) {
logForDebugging(
`[bridge:repl] sendResult: skipping, transport not configured session=${currentSessionId}`,
'[bridge:repl] sendResult: skipping, transport not configured',
)
return
}
void transport.write(makeResultMessage(currentSessionId))
logForDebugging(
`[bridge:repl] Sent result for session=${currentSessionId}`,
)
logForDebugging('[bridge:repl] Sent result')
},
async teardown() {
unregister()
@@ -1903,7 +1908,7 @@ async function startWorkPollLoop({
const MAX_ENVIRONMENT_RECREATIONS = 3
logForDebugging(
`[bridge:repl] Starting work poll loop for env=${getCredentials().environmentId}`,
'[bridge:repl] Starting work poll loop',
)
let consecutiveErrors = 0
@@ -2006,7 +2011,9 @@ async function startWorkPollLoop({
)
} catch (err) {
logForDebugging(
`[bridge:repl:heartbeat] Failed: ${errorMessage(err)}`,
`[bridge:repl:heartbeat] Failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
if (err instanceof BridgeFatalError) {
cap.cleanup()
@@ -2124,7 +2131,9 @@ async function startWorkPollLoop({
secret = decodeWorkSecret(work.secret)
} catch (err) {
logForDebugging(
`[bridge:repl] Failed to decode work secret: ${errorMessage(err)}`,
`[bridge:repl] Failed to decode work secret: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
logEvent('tengu_bridge_repl_work_secret_failed', {})
// Can't ack (needs the JWT we failed to decode). stopWork uses OAuth.
@@ -2135,12 +2144,14 @@ async function startWorkPollLoop({
// Explicitly acknowledge to prevent redelivery. Non-fatal on failure:
// server re-delivers, and the onWorkReceived callback handles dedup.
logForDebugging(`[bridge:repl] Acknowledging workId=${work.id}`)
logForDebugging('[bridge:repl] Acknowledging work item')
try {
await api.acknowledgeWork(envId, work.id, secret.session_ingress_token)
} catch (err) {
logForDebugging(
`[bridge:repl] Acknowledge failed workId=${work.id}: ${errorMessage(err)}`,
`[bridge:repl] Acknowledge failed: ${summarizeBridgeErrorForDebug(
err,
)}`,
)
}
@@ -2192,7 +2203,7 @@ async function startWorkPollLoop({
const currentEnvId = getCredentials().environmentId
if (envId !== currentEnvId) {
logForDebugging(
`[bridge:repl] Stale poll error for old env=${envId}, current env=${currentEnvId} — skipping onEnvironmentLost`,
'[bridge:repl] Stale poll error for superseded environment — skipping onEnvironmentLost',
)
consecutiveErrors = 0
firstErrorTime = null
@@ -2238,9 +2249,7 @@ async function startWorkPollLoop({
consecutiveErrors = 0
firstErrorTime = null
onStateChange?.('ready')
logForDebugging(
`[bridge:repl] Re-registered environment: ${newCreds.environmentId}`,
)
logForDebugging('[bridge:repl] Re-registered environment')
continue
}
@@ -2376,7 +2385,7 @@ async function startWorkPollLoop({
}
logForDebugging(
`[bridge:repl] Work poll loop ended (aborted=${signal.aborted}) env=${getCredentials().environmentId}`,
`[bridge:repl] Work poll loop ended (aborted=${signal.aborted})`,
)
}

View File

@@ -2,6 +2,33 @@ import axios from 'axios'
import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
import type { WorkSecret } from './types.js'
function summarizeRegisterWorkerResponseForDebug(data: unknown): string {
if (data === null) return 'null'
if (data === undefined) return 'undefined'
if (Array.isArray(data)) {
return jsonStringify({
payloadType: 'array',
length: data.length,
})
}
if (typeof data === 'object') {
const value = data as Record<string, unknown>
return jsonStringify({
payloadType: 'object',
keys: Object.keys(value)
.sort()
.slice(0, 10),
hasWorkerEpoch:
typeof value.worker_epoch === 'number' ||
typeof value.worker_epoch === 'string',
hasSessionIngressToken:
typeof value.session_ingress_token === 'string',
hasApiBaseUrl: typeof value.api_base_url === 'string',
})
}
return typeof data
}
/** Decode a base64url-encoded work secret and validate its version. */
export function decodeWorkSecret(secret: string): WorkSecret {
const json = Buffer.from(secret, 'base64url').toString('utf-8')
@@ -120,7 +147,9 @@ export async function registerWorker(
!Number.isSafeInteger(epoch)
) {
throw new Error(
`registerWorker: invalid worker_epoch in response: ${jsonStringify(response.data)}`,
`registerWorker: invalid worker_epoch in response: ${summarizeRegisterWorkerResponseForDebug(
response.data,
)}`,
)
}
return epoch

File diff suppressed because one or more lines are too long