chore: initialize recovered claude workspace

commit a10efa3b4b
2026-04-02 15:29:01 +08:00
1940 changed files with 506426 additions and 0 deletions


@@ -0,0 +1,38 @@
/**
* Shared analytics configuration
*
* Common logic for determining when analytics should be disabled
* across all analytics systems (Datadog, 1P)
*/
import { isEnvTruthy } from '../../utils/envUtils.js'
import { isTelemetryDisabled } from '../../utils/privacyLevel.js'
/**
* Check if analytics operations should be disabled
*
* Analytics is disabled in the following cases:
* - Test environment (NODE_ENV === 'test')
* - Third-party cloud providers (Bedrock/Vertex/Foundry)
* - Privacy level is no-telemetry or essential-traffic
*/
export function isAnalyticsDisabled(): boolean {
return (
process.env.NODE_ENV === 'test' ||
isEnvTruthy(process.env.CLAUDE_CODE_USE_BEDROCK) ||
isEnvTruthy(process.env.CLAUDE_CODE_USE_VERTEX) ||
isEnvTruthy(process.env.CLAUDE_CODE_USE_FOUNDRY) ||
isTelemetryDisabled()
)
}
/**
* Check if the feedback survey should be suppressed.
*
* Unlike isAnalyticsDisabled(), this does NOT block on 3P providers
* (Bedrock/Vertex/Foundry). The survey is a local UI prompt with no
* transcript data — enterprise customers capture responses via OTEL.
*/
export function isFeedbackSurveyDisabled(): boolean {
return process.env.NODE_ENV === 'test' || isTelemetryDisabled()
}
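// Illustrative contrast (hypothetical environment): with CLAUDE_CODE_USE_BEDROCK=1
// and no telemetry opt-out, isAnalyticsDisabled() returns true while
// isFeedbackSurveyDisabled() returns false, so the survey prompt still shows.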


@@ -0,0 +1,307 @@
import axios from 'axios'
import { createHash } from 'crypto'
import memoize from 'lodash-es/memoize.js'
import { getOrCreateUserID } from '../../utils/config.js'
import { logError } from '../../utils/log.js'
import { getCanonicalName } from '../../utils/model/model.js'
import { getAPIProvider } from '../../utils/model/providers.js'
import { MODEL_COSTS } from '../../utils/modelCost.js'
import { isAnalyticsDisabled } from './config.js'
import { getEventMetadata } from './metadata.js'
const DATADOG_LOGS_ENDPOINT =
'https://http-intake.logs.us5.datadoghq.com/api/v2/logs'
const DATADOG_CLIENT_TOKEN = 'pubbbf48e6d78dae54bceaa4acf463299bf'
const DEFAULT_FLUSH_INTERVAL_MS = 15000
const MAX_BATCH_SIZE = 100
const NETWORK_TIMEOUT_MS = 5000
const DATADOG_ALLOWED_EVENTS = new Set([
'chrome_bridge_connection_succeeded',
'chrome_bridge_connection_failed',
'chrome_bridge_disconnected',
'chrome_bridge_tool_call_completed',
'chrome_bridge_tool_call_error',
'chrome_bridge_tool_call_started',
'chrome_bridge_tool_call_timeout',
'tengu_api_error',
'tengu_api_success',
'tengu_brief_mode_enabled',
'tengu_brief_mode_toggled',
'tengu_brief_send',
'tengu_cancel',
'tengu_compact_failed',
'tengu_exit',
'tengu_flicker',
'tengu_init',
'tengu_model_fallback_triggered',
'tengu_oauth_error',
'tengu_oauth_success',
'tengu_oauth_token_refresh_failure',
'tengu_oauth_token_refresh_success',
'tengu_oauth_token_refresh_lock_acquiring',
'tengu_oauth_token_refresh_lock_acquired',
'tengu_oauth_token_refresh_starting',
'tengu_oauth_token_refresh_completed',
'tengu_oauth_token_refresh_lock_releasing',
'tengu_oauth_token_refresh_lock_released',
'tengu_query_error',
'tengu_session_file_read',
'tengu_started',
'tengu_tool_use_error',
'tengu_tool_use_granted_in_prompt_permanent',
'tengu_tool_use_granted_in_prompt_temporary',
'tengu_tool_use_rejected_in_prompt',
'tengu_tool_use_success',
'tengu_uncaught_exception',
'tengu_unhandled_rejection',
'tengu_voice_recording_started',
'tengu_voice_toggled',
'tengu_team_mem_sync_pull',
'tengu_team_mem_sync_push',
'tengu_team_mem_sync_started',
'tengu_team_mem_entries_capped',
])
const TAG_FIELDS = [
'arch',
'clientType',
'errorType',
'http_status_range',
'http_status',
'kairosActive',
'model',
'platform',
'provider',
'skillMode',
'subscriptionType',
'toolName',
'userBucket',
'userType',
'version',
'versionBase',
]
function camelToSnakeCase(str: string): string {
return str.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`)
}
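// e.g. camelToSnakeCase('subscriptionType') === 'subscription_type'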
type DatadogLog = {
ddsource: string
ddtags: string
message: string
service: string
hostname: string
[key: string]: unknown
}
let logBatch: DatadogLog[] = []
let flushTimer: NodeJS.Timeout | null = null
let datadogInitialized: boolean | null = null
async function flushLogs(): Promise<void> {
if (logBatch.length === 0) return
const logsToSend = logBatch
logBatch = []
try {
await axios.post(DATADOG_LOGS_ENDPOINT, logsToSend, {
headers: {
'Content-Type': 'application/json',
'DD-API-KEY': DATADOG_CLIENT_TOKEN,
},
timeout: NETWORK_TIMEOUT_MS,
})
} catch (error) {
logError(error)
}
}
function scheduleFlush(): void {
if (flushTimer) return
flushTimer = setTimeout(() => {
flushTimer = null
void flushLogs()
}, getFlushIntervalMs()).unref()
}
export const initializeDatadog = memoize(async (): Promise<boolean> => {
if (isAnalyticsDisabled()) {
datadogInitialized = false
return false
}
try {
datadogInitialized = true
return true
} catch (error) {
logError(error)
datadogInitialized = false
return false
}
})
/**
* Flush remaining Datadog logs and shut down.
* Called from gracefulShutdown() before process.exit() since
* forceExit() prevents the beforeExit handler from firing.
*/
export async function shutdownDatadog(): Promise<void> {
if (flushTimer) {
clearTimeout(flushTimer)
flushTimer = null
}
await flushLogs()
}
// NOTE: use via src/services/analytics/index.ts > logEvent
export async function trackDatadogEvent(
eventName: string,
properties: { [key: string]: boolean | number | undefined },
): Promise<void> {
if (process.env.NODE_ENV !== 'production') {
return
}
// Don't send events for 3P providers (Bedrock, Vertex, Foundry)
if (getAPIProvider() !== 'firstParty') {
return
}
// Fast path: use cached result if available to avoid await overhead
let initialized = datadogInitialized
if (initialized === null) {
initialized = await initializeDatadog()
}
if (!initialized || !DATADOG_ALLOWED_EVENTS.has(eventName)) {
return
}
try {
const metadata = await getEventMetadata({
model: properties.model,
betas: properties.betas,
})
// Destructure to avoid duplicate envContext (once nested, once flattened)
const { envContext, ...restMetadata } = metadata
const allData: Record<string, unknown> = {
...restMetadata,
...envContext,
...properties,
userBucket: getUserBucket(),
}
// Normalize MCP tool names to "mcp" for cardinality reduction
if (
typeof allData.toolName === 'string' &&
allData.toolName.startsWith('mcp__')
) {
allData.toolName = 'mcp'
}
// Normalize model names for cardinality reduction (external users only)
if (process.env.USER_TYPE !== 'ant' && typeof allData.model === 'string') {
const shortName = getCanonicalName(allData.model.replace(/\[1m]$/i, ''))
allData.model = shortName in MODEL_COSTS ? shortName : 'other'
}
// Truncate dev version to base + date (remove timestamp and sha for cardinality reduction)
// e.g. "2.0.53-dev.20251124.t173302.sha526cc6a" -> "2.0.53-dev.20251124"
if (typeof allData.version === 'string') {
allData.version = allData.version.replace(
/^(\d+\.\d+\.\d+-dev\.\d{8})\.t\d+\.sha[a-f0-9]+$/,
'$1',
)
}
// Transform status to http_status and http_status_range to avoid Datadog reserved field
if (allData.status !== undefined && allData.status !== null) {
const statusCode = String(allData.status)
allData.http_status = statusCode
// Determine status range (1xx, 2xx, 3xx, 4xx, 5xx)
const firstDigit = statusCode.charAt(0)
if (firstDigit >= '1' && firstDigit <= '5') {
allData.http_status_range = `${firstDigit}xx`
}
// Remove original status field to avoid conflict with Datadog's reserved field
delete allData.status
}
// Build ddtags with high-cardinality fields for filtering.
// event:<name> is prepended so the event name is searchable via the
// log search API — the `message` field (where eventName also lives)
// is a DD reserved field and is NOT queryable from dashboard widget
// queries or the aggregation API. See scripts/release/MONITORING.md.
const tags = [
`event:${eventName}`,
...TAG_FIELDS.filter(
field => allData[field] !== undefined && allData[field] !== null,
).map(field => `${camelToSnakeCase(field)}:${allData[field]}`),
]
const log: DatadogLog = {
ddsource: 'nodejs',
ddtags: tags.join(','),
message: eventName,
service: 'claude-code',
hostname: 'claude-code',
env: process.env.USER_TYPE,
}
// Add all fields as searchable attributes (not duplicated in tags)
for (const [key, value] of Object.entries(allData)) {
if (value !== undefined && value !== null) {
log[camelToSnakeCase(key)] = value
}
}
logBatch.push(log)
// Flush immediately if batch is full, otherwise schedule
if (logBatch.length >= MAX_BATCH_SIZE) {
if (flushTimer) {
clearTimeout(flushTimer)
flushTimer = null
}
void flushLogs()
} else {
scheduleFlush()
}
} catch (error) {
logError(error)
}
}
const NUM_USER_BUCKETS = 30
/**
* Gets a 'bucket' that the user ID falls into.
*
* For alerting purposes, we want to alert on the number of users impacted
* by an issue, rather than the number of events; often a small number of users
* can generate a large number of events (e.g. due to retries). To approximate
* this without ruining cardinality by counting user IDs directly, we hash the user ID
* and assign it to one of a fixed number of buckets.
*
* This allows us to estimate the number of unique users by counting unique buckets,
* while preserving user privacy and reducing cardinality.
*/
const getUserBucket = memoize((): number => {
const userId = getOrCreateUserID()
const hash = createHash('sha256').update(userId).digest('hex')
return parseInt(hash.slice(0, 8), 16) % NUM_USER_BUCKETS
})
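// Worked example (digest prefix hypothetical): if the sha256 hex digest starts
// with 'a1b2c3d4', then parseInt('a1b2c3d4', 16) === 2712847316 and
// 2712847316 % 30 === 26, so the user lands in bucket 26.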
function getFlushIntervalMs(): number {
// Allow tests to override to not block on the default flush interval.
return (
parseInt(process.env.CLAUDE_CODE_DATADOG_FLUSH_INTERVAL_MS || '', 10) ||
DEFAULT_FLUSH_INTERVAL_MS
)
}


@@ -0,0 +1,449 @@
import type { AnyValueMap, Logger, logs } from '@opentelemetry/api-logs'
import { resourceFromAttributes } from '@opentelemetry/resources'
import {
BatchLogRecordProcessor,
LoggerProvider,
} from '@opentelemetry/sdk-logs'
import {
ATTR_SERVICE_NAME,
ATTR_SERVICE_VERSION,
} from '@opentelemetry/semantic-conventions'
import { randomUUID } from 'crypto'
import { isEqual } from 'lodash-es'
import { getOrCreateUserID } from '../../utils/config.js'
import { logForDebugging } from '../../utils/debug.js'
import { logError } from '../../utils/log.js'
import { getPlatform, getWslVersion } from '../../utils/platform.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import { profileCheckpoint } from '../../utils/startupProfiler.js'
import { getCoreUserData } from '../../utils/user.js'
import { isAnalyticsDisabled } from './config.js'
import { FirstPartyEventLoggingExporter } from './firstPartyEventLoggingExporter.js'
import type { GrowthBookUserAttributes } from './growthbook.js'
import { getDynamicConfig_CACHED_MAY_BE_STALE } from './growthbook.js'
import { getEventMetadata } from './metadata.js'
import { isSinkKilled } from './sinkKillswitch.js'
/**
* Configuration for sampling individual event types.
* Each event name maps to an object containing sample_rate (0-1).
* Events not in the config are logged at 100% rate.
*/
export type EventSamplingConfig = {
[eventName: string]: {
sample_rate: number
}
}
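// Example config value (entries hypothetical):
//   { tengu_api_success: { sample_rate: 0.1 }, tengu_flicker: { sample_rate: 0 } }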
const EVENT_SAMPLING_CONFIG_NAME = 'tengu_event_sampling_config'
/**
* Get the event sampling configuration from GrowthBook.
* Uses cached value if available, updates cache in background.
*/
export function getEventSamplingConfig(): EventSamplingConfig {
return getDynamicConfig_CACHED_MAY_BE_STALE<EventSamplingConfig>(
EVENT_SAMPLING_CONFIG_NAME,
{},
)
}
/**
* Determine if an event should be sampled based on its sample rate.
*
* @param eventName - Name of the event to check
* @returns null if the event should be logged unsampled (no config, invalid
*   rate, or rate >= 1); 0 if it should be dropped; otherwise the sample_rate
*   to attach to the sampled event
*/
export function shouldSampleEvent(eventName: string): number | null {
const config = getEventSamplingConfig()
const eventConfig = config[eventName]
// If no config for this event, log at 100% rate (no sampling)
if (!eventConfig) {
return null
}
const sampleRate = eventConfig.sample_rate
// Validate sample rate is in valid range
if (typeof sampleRate !== 'number' || sampleRate < 0 || sampleRate > 1) {
return null
}
// Sample rate of 1 means log everything (no need to add metadata)
if (sampleRate >= 1) {
return null
}
// Sample rate of 0 means drop everything
if (sampleRate <= 0) {
return 0
}
// Randomly decide whether to sample this event
return Math.random() < sampleRate ? sampleRate : 0
}
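// Sketch of the caller-side contract (call site hypothetical, consistent with
// the return values above):
//   const rate = shouldSampleEvent(eventName)
//   if (rate === 0) return                         // dropped by sampling
//   if (rate !== null) metadata.sample_rate = rate // annotate sampled event
//   // rate === null: log unsampled, no annotation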
const BATCH_CONFIG_NAME = 'tengu_1p_event_batch_config'
type BatchConfig = {
scheduledDelayMillis?: number
maxExportBatchSize?: number
maxQueueSize?: number
skipAuth?: boolean
maxAttempts?: number
path?: string
baseUrl?: string
}
function getBatchConfig(): BatchConfig {
return getDynamicConfig_CACHED_MAY_BE_STALE<BatchConfig>(
BATCH_CONFIG_NAME,
{},
)
}
// Module-local state for event logging (not exposed globally)
let firstPartyEventLogger: ReturnType<typeof logs.getLogger> | null = null
let firstPartyEventLoggerProvider: LoggerProvider | null = null
// Last batch config used to construct the provider — used by
// reinitialize1PEventLoggingIfConfigChanged to decide whether a rebuild is
// needed when GrowthBook refreshes.
let lastBatchConfig: BatchConfig | null = null
/**
* Flush and shutdown the 1P event logger.
* This should be called as the final step before process exit to ensure
* all events (including late ones from API responses) are exported.
*/
export async function shutdown1PEventLogging(): Promise<void> {
if (!firstPartyEventLoggerProvider) {
return
}
try {
await firstPartyEventLoggerProvider.shutdown()
if (process.env.USER_TYPE === 'ant') {
logForDebugging('1P event logging: final shutdown complete')
}
} catch {
// Ignore shutdown errors
}
}
/**
* Check if 1P event logging is enabled.
* Respects the same opt-outs as other analytics sinks:
* - Test environment
* - Third-party cloud providers (Bedrock/Vertex/Foundry)
* - Global telemetry opt-outs
* - Non-essential traffic disabled
*
* Note: Unlike BigQuery metrics, event logging does NOT check organization-level
* metrics opt-out via API. It follows the same pattern as Statsig event logging.
*/
export function is1PEventLoggingEnabled(): boolean {
// Respect standard analytics opt-outs
return !isAnalyticsDisabled()
}
/**
* Log a 1st-party event for internal analytics (async version).
* Events are batched and exported to /api/event_logging/batch
*
* This enriches the event with core metadata (model, session, env context, etc.)
* at log time, similar to logEventToStatsig.
*
* @param eventName - Name of the event (e.g., 'tengu_api_query')
* @param metadata - Additional metadata for the event (intentionally no strings, to avoid accidentally logging code/filepaths)
*/
async function logEventTo1PAsync(
firstPartyEventLogger: Logger,
eventName: string,
metadata: Record<string, number | boolean | undefined> = {},
): Promise<void> {
try {
// Enrich with core metadata at log time (similar to Statsig pattern)
const coreMetadata = await getEventMetadata({
model: metadata.model,
betas: metadata.betas,
})
// Build attributes - OTel supports nested objects natively via AnyValueMap
// Cast through unknown since our nested objects are structurally compatible
// with AnyValue but TS doesn't recognize it due to missing index signatures
const attributes = {
event_name: eventName,
event_id: randomUUID(),
// Pass objects directly - no JSON serialization needed
core_metadata: coreMetadata,
user_metadata: getCoreUserData(true),
event_metadata: metadata,
} as unknown as AnyValueMap
// Add user_id if available
const userId = getOrCreateUserID()
if (userId) {
attributes.user_id = userId
}
// Ant-only debug logging of the raw event
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`[ANT-ONLY] 1P event: ${eventName} ${jsonStringify(metadata, null, 0)}`,
)
}
// Emit log record
firstPartyEventLogger.emit({
body: eventName,
attributes,
})
} catch (e) {
if (process.env.NODE_ENV === 'development') {
throw e
}
if (process.env.USER_TYPE === 'ant') {
logError(e as Error)
}
// swallow
}
}
/**
* Log a 1st-party event for internal analytics.
* Events are batched and exported to /api/event_logging/batch
*
* @param eventName - Name of the event (e.g., 'tengu_api_query')
* @param metadata - Additional metadata for the event (intentionally no strings, to avoid accidentally logging code/filepaths)
*/
export function logEventTo1P(
eventName: string,
metadata: Record<string, number | boolean | undefined> = {},
): void {
if (!is1PEventLoggingEnabled()) {
return
}
if (!firstPartyEventLogger || isSinkKilled('firstParty')) {
return
}
// Fire and forget - don't block on metadata enrichment
void logEventTo1PAsync(firstPartyEventLogger, eventName, metadata)
}
/**
* GrowthBook experiment event data for logging
*/
export type GrowthBookExperimentData = {
experimentId: string
variationId: number
userAttributes?: GrowthBookUserAttributes
experimentMetadata?: Record<string, unknown>
}
// api.anthropic.com only serves the "production" GrowthBook environment
// (see starling/starling/cli/cli.py DEFAULT_ENVIRONMENTS). Staging and
// development environments are not exported to the prod API.
function getEnvironmentForGrowthBook(): string {
return 'production'
}
/**
* Log a GrowthBook experiment assignment event to 1P.
* Events are batched and exported to /api/event_logging/batch
*
* @param data - GrowthBook experiment assignment data
*/
export function logGrowthBookExperimentTo1P(
data: GrowthBookExperimentData,
): void {
if (!is1PEventLoggingEnabled()) {
return
}
if (!firstPartyEventLogger || isSinkKilled('firstParty')) {
return
}
const userId = getOrCreateUserID()
const { accountUuid, organizationUuid } = getCoreUserData(true)
// Build attributes for GrowthbookExperimentEvent
const attributes = {
event_type: 'GrowthbookExperimentEvent',
event_id: randomUUID(),
experiment_id: data.experimentId,
variation_id: data.variationId,
...(userId && { device_id: userId }),
...(accountUuid && { account_uuid: accountUuid }),
...(organizationUuid && { organization_uuid: organizationUuid }),
...(data.userAttributes && {
session_id: data.userAttributes.sessionId,
user_attributes: jsonStringify(data.userAttributes),
}),
...(data.experimentMetadata && {
experiment_metadata: jsonStringify(data.experimentMetadata),
}),
environment: getEnvironmentForGrowthBook(),
}
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`[ANT-ONLY] 1P GrowthBook experiment: ${data.experimentId} variation=${data.variationId}`,
)
}
firstPartyEventLogger.emit({
body: 'growthbook_experiment',
attributes,
})
}
const DEFAULT_LOGS_EXPORT_INTERVAL_MS = 10000
const DEFAULT_MAX_EXPORT_BATCH_SIZE = 200
const DEFAULT_MAX_QUEUE_SIZE = 8192
/**
* Initialize 1P event logging infrastructure.
* This creates a separate LoggerProvider for internal event logging,
* independent of customer OTLP telemetry.
*
* This uses its own minimal resource configuration with just the attributes
* we need for internal analytics (service name, version, platform info).
*/
export function initialize1PEventLogging(): void {
profileCheckpoint('1p_event_logging_start')
const enabled = is1PEventLoggingEnabled()
if (!enabled) {
if (process.env.USER_TYPE === 'ant') {
logForDebugging('1P event logging not enabled')
}
return
}
// Fetch batch processor configuration from GrowthBook dynamic config
// Uses cached value if available, refreshes in background
const batchConfig = getBatchConfig()
lastBatchConfig = batchConfig
profileCheckpoint('1p_event_after_growthbook_config')
const scheduledDelayMillis =
batchConfig.scheduledDelayMillis ||
parseInt(
process.env.OTEL_LOGS_EXPORT_INTERVAL ||
DEFAULT_LOGS_EXPORT_INTERVAL_MS.toString(),
10,
)
const maxExportBatchSize =
batchConfig.maxExportBatchSize || DEFAULT_MAX_EXPORT_BATCH_SIZE
const maxQueueSize = batchConfig.maxQueueSize || DEFAULT_MAX_QUEUE_SIZE
// Build our own resource for 1P event logging with minimal attributes
const platform = getPlatform()
const attributes: Record<string, string> = {
[ATTR_SERVICE_NAME]: 'claude-code',
[ATTR_SERVICE_VERSION]: MACRO.VERSION,
}
// Add WSL-specific attributes if running on WSL
if (platform === 'wsl') {
const wslVersion = getWslVersion()
if (wslVersion) {
attributes['wsl.version'] = wslVersion
}
}
const resource = resourceFromAttributes(attributes)
// Create a new LoggerProvider with the EventLoggingExporter
// NOTE: This is kept separate from customer telemetry logs to ensure
// internal events don't leak to customer endpoints and vice versa.
// We don't register this globally - it's only used for internal event logging.
const eventLoggingExporter = new FirstPartyEventLoggingExporter({
maxBatchSize: maxExportBatchSize,
skipAuth: batchConfig.skipAuth,
maxAttempts: batchConfig.maxAttempts,
path: batchConfig.path,
baseUrl: batchConfig.baseUrl,
isKilled: () => isSinkKilled('firstParty'),
})
firstPartyEventLoggerProvider = new LoggerProvider({
resource,
processors: [
new BatchLogRecordProcessor(eventLoggingExporter, {
scheduledDelayMillis,
maxExportBatchSize,
maxQueueSize,
}),
],
})
// Initialize event logger from our internal provider (NOT from global API)
// IMPORTANT: We must get the logger from our local provider, not logs.getLogger()
// because logs.getLogger() returns a logger from the global provider, which is
// separate and used for customer telemetry.
firstPartyEventLogger = firstPartyEventLoggerProvider.getLogger(
'com.anthropic.claude_code.events',
MACRO.VERSION,
)
}
/**
* Rebuild the 1P event logging pipeline if the batch config changed.
* Register this with onGrowthBookRefresh so long-running sessions pick up
* changes to batch size, delay, endpoint, etc.
*
* Event-loss safety:
* 1. Null the logger first — concurrent logEventTo1P() calls hit the
* !firstPartyEventLogger guard and bail during the swap window. This drops
* a handful of events but prevents emitting to a draining provider.
* 2. forceFlush() drains the old BatchLogRecordProcessor buffer to the
* exporter. Export failures go to disk at getCurrentBatchFilePath() which
* is keyed by module-level BATCH_UUID + sessionId — unchanged across
* reinit — so the NEW exporter's disk-backed retry picks them up.
* 3. Swap to new provider/logger; old provider shutdown runs in background
* (buffer already drained, just cleanup).
*/
export async function reinitialize1PEventLoggingIfConfigChanged(): Promise<void> {
if (!is1PEventLoggingEnabled() || !firstPartyEventLoggerProvider) {
return
}
const newConfig = getBatchConfig()
if (isEqual(newConfig, lastBatchConfig)) {
return
}
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: ${BATCH_CONFIG_NAME} changed, reinitializing`,
)
}
const oldProvider = firstPartyEventLoggerProvider
const oldLogger = firstPartyEventLogger
firstPartyEventLogger = null
try {
await oldProvider.forceFlush()
} catch {
// Export failures are already on disk; new exporter will retry them.
}
firstPartyEventLoggerProvider = null
try {
initialize1PEventLogging()
} catch (e) {
// Restore so the next GrowthBook refresh can retry. oldProvider was
// only forceFlush()'d, not shut down — it's still functional. Without
// this, both stay null and the !firstPartyEventLoggerProvider gate at
// the top makes recovery impossible.
firstPartyEventLoggerProvider = oldProvider
firstPartyEventLogger = oldLogger
logError(e)
return
}
void oldProvider.shutdown().catch(() => {})
}


@@ -0,0 +1,806 @@
import type { HrTime } from '@opentelemetry/api'
import { type ExportResult, ExportResultCode } from '@opentelemetry/core'
import type {
LogRecordExporter,
ReadableLogRecord,
} from '@opentelemetry/sdk-logs'
import axios from 'axios'
import { randomUUID } from 'crypto'
import { appendFile, mkdir, readdir, unlink, writeFile } from 'fs/promises'
import * as path from 'path'
import type { CoreUserData } from 'src/utils/user.js'
import {
getIsNonInteractiveSession,
getSessionId,
} from '../../bootstrap/state.js'
import { ClaudeCodeInternalEvent } from '../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js'
import { GrowthbookExperimentEvent } from '../../types/generated/events_mono/growthbook/v1/growthbook_experiment_event.js'
import {
getClaudeAIOAuthTokens,
hasProfileScope,
isClaudeAISubscriber,
} from '../../utils/auth.js'
import { checkHasTrustDialogAccepted } from '../../utils/config.js'
import { logForDebugging } from '../../utils/debug.js'
import { getClaudeConfigHomeDir } from '../../utils/envUtils.js'
import { errorMessage, isFsInaccessible, toError } from '../../utils/errors.js'
import { getAuthHeaders } from '../../utils/http.js'
import { readJSONLFile } from '../../utils/json.js'
import { logError } from '../../utils/log.js'
import { sleep } from '../../utils/sleep.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
import { isOAuthTokenExpired } from '../oauth/client.js'
import { stripProtoFields } from './index.js'
import { type EventMetadata, to1PEventFormat } from './metadata.js'
// Unique ID for this process run - used to isolate failed event files between runs
const BATCH_UUID = randomUUID()
// File prefix for failed event storage
const FILE_PREFIX = '1p_failed_events.'
// Storage directory for failed events - evaluated at runtime to respect CLAUDE_CONFIG_DIR in tests
function getStorageDir(): string {
return path.join(getClaudeConfigHomeDir(), 'telemetry')
}
// API envelope - event_data is the JSON output from proto toJSON()
type FirstPartyEventLoggingEvent = {
event_type: 'ClaudeCodeInternalEvent' | 'GrowthbookExperimentEvent'
event_data: unknown
}
type FirstPartyEventLoggingPayload = {
events: FirstPartyEventLoggingEvent[]
}
/**
* Exporter for 1st-party event logging to /api/event_logging/batch.
*
* Export cycles are controlled by OpenTelemetry's BatchLogRecordProcessor, which
* triggers export() when either:
* - Time interval elapses (default: 10 seconds via scheduledDelayMillis)
* - Batch size is reached (default: 200 events via maxExportBatchSize)
*
* This exporter adds resilience on top:
* - Append-only log for failed events (concurrency-safe)
* - Quadratic backoff retry for failed events, dropped after maxAttempts
* - Immediate retry of queued events when any export succeeds (endpoint is healthy)
* - Chunking large event sets into smaller batches
* - Auth fallback: retries without auth on 401 errors
*/
export class FirstPartyEventLoggingExporter implements LogRecordExporter {
private readonly endpoint: string
private readonly timeout: number
private readonly maxBatchSize: number
private readonly skipAuth: boolean
private readonly batchDelayMs: number
private readonly baseBackoffDelayMs: number
private readonly maxBackoffDelayMs: number
private readonly maxAttempts: number
private readonly isKilled: () => boolean
private pendingExports: Promise<void>[] = []
private isShutdown = false
private readonly schedule: (
fn: () => Promise<void>,
delayMs: number,
) => () => void
private cancelBackoff: (() => void) | null = null
private attempts = 0
private isRetrying = false
private lastExportErrorContext: string | undefined
constructor(
options: {
timeout?: number
maxBatchSize?: number
skipAuth?: boolean
batchDelayMs?: number
baseBackoffDelayMs?: number
maxBackoffDelayMs?: number
maxAttempts?: number
path?: string
baseUrl?: string
// Injected killswitch probe. Checked per-POST so that disabling the
// firstParty sink also stops backoff retries (not just new emits).
// Passed in rather than imported to avoid a cycle with firstPartyEventLogger.ts.
isKilled?: () => boolean
schedule?: (fn: () => Promise<void>, delayMs: number) => () => void
} = {},
) {
// Default: prod, except when ANTHROPIC_BASE_URL is explicitly staging.
// Overridable via tengu_1p_event_batch_config.baseUrl.
const baseUrl =
options.baseUrl ||
(process.env.ANTHROPIC_BASE_URL === 'https://api-staging.anthropic.com'
? 'https://api-staging.anthropic.com'
: 'https://api.anthropic.com')
this.endpoint = `${baseUrl}${options.path || '/api/event_logging/batch'}`
this.timeout = options.timeout || 10000
this.maxBatchSize = options.maxBatchSize || 200
this.skipAuth = options.skipAuth ?? false
this.batchDelayMs = options.batchDelayMs || 100
this.baseBackoffDelayMs = options.baseBackoffDelayMs || 500
this.maxBackoffDelayMs = options.maxBackoffDelayMs || 30000
this.maxAttempts = options.maxAttempts ?? 8
this.isKilled = options.isKilled ?? (() => false)
this.schedule =
options.schedule ??
((fn, ms) => {
const t = setTimeout(fn, ms)
return () => clearTimeout(t)
})
// Retry any failed events from previous runs of this session (in background)
void this.retryPreviousBatches()
}
// Expose for testing
async getQueuedEventCount(): Promise<number> {
return (await this.loadEventsFromCurrentBatch()).length
}
// --- Storage helpers ---
private getCurrentBatchFilePath(): string {
return path.join(
getStorageDir(),
`${FILE_PREFIX}${getSessionId()}.${BATCH_UUID}.json`,
)
}
private async loadEventsFromFile(
filePath: string,
): Promise<FirstPartyEventLoggingEvent[]> {
try {
return await readJSONLFile<FirstPartyEventLoggingEvent>(filePath)
} catch {
return []
}
}
private async loadEventsFromCurrentBatch(): Promise<
FirstPartyEventLoggingEvent[]
> {
return this.loadEventsFromFile(this.getCurrentBatchFilePath())
}
private async saveEventsToFile(
filePath: string,
events: FirstPartyEventLoggingEvent[],
): Promise<void> {
try {
if (events.length === 0) {
try {
await unlink(filePath)
} catch {
// File doesn't exist, nothing to delete
}
} else {
// Ensure storage directory exists
await mkdir(getStorageDir(), { recursive: true })
// Write as JSON lines (one event per line)
const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
await writeFile(filePath, content, 'utf8')
}
} catch (error) {
logError(error)
}
}
private async appendEventsToFile(
filePath: string,
events: FirstPartyEventLoggingEvent[],
): Promise<void> {
if (events.length === 0) return
try {
// Ensure storage directory exists
await mkdir(getStorageDir(), { recursive: true })
// Append as JSON lines (one event per line) - atomic on most filesystems
const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
await appendFile(filePath, content, 'utf8')
} catch (error) {
logError(error)
}
}
private async deleteFile(filePath: string): Promise<void> {
try {
await unlink(filePath)
} catch {
// File doesn't exist or can't be deleted, ignore
}
}
// --- Previous batch retry (startup) ---
private async retryPreviousBatches(): Promise<void> {
try {
const prefix = `${FILE_PREFIX}${getSessionId()}.`
let files: string[]
try {
files = (await readdir(getStorageDir()))
.filter((f: string) => f.startsWith(prefix) && f.endsWith('.json'))
.filter((f: string) => !f.includes(BATCH_UUID)) // Exclude current batch
} catch (e) {
if (isFsInaccessible(e)) return
throw e
}
for (const file of files) {
const filePath = path.join(getStorageDir(), file)
void this.retryFileInBackground(filePath)
}
} catch (error) {
logError(error)
}
}
private async retryFileInBackground(filePath: string): Promise<void> {
if (this.attempts >= this.maxAttempts) {
await this.deleteFile(filePath)
return
}
const events = await this.loadEventsFromFile(filePath)
if (events.length === 0) {
await this.deleteFile(filePath)
return
}
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: retrying ${events.length} events from previous batch`,
)
}
const failedEvents = await this.sendEventsInBatches(events)
if (failedEvents.length === 0) {
await this.deleteFile(filePath)
if (process.env.USER_TYPE === 'ant') {
logForDebugging('1P event logging: previous batch retry succeeded')
}
} else {
// Save only the failed events back (not all original events)
await this.saveEventsToFile(filePath, failedEvents)
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: previous batch retry failed, ${failedEvents.length} events remain`,
)
}
}
}
async export(
logs: ReadableLogRecord[],
resultCallback: (result: ExportResult) => void,
): Promise<void> {
if (this.isShutdown) {
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
'1P event logging export failed: Exporter has been shutdown',
)
}
resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Exporter has been shutdown'),
})
return
}
const exportPromise = this.doExport(logs, resultCallback)
this.pendingExports.push(exportPromise)
// Clean up completed exports
void exportPromise.finally(() => {
const index = this.pendingExports.indexOf(exportPromise)
if (index > -1) {
this.pendingExports.splice(index, 1)
}
})
}
private async doExport(
logs: ReadableLogRecord[],
resultCallback: (result: ExportResult) => void,
): Promise<void> {
try {
// Filter for event logs only (by scope name)
const eventLogs = logs.filter(
log =>
log.instrumentationScope?.name === 'com.anthropic.claude_code.events',
)
if (eventLogs.length === 0) {
resultCallback({ code: ExportResultCode.SUCCESS })
return
}
// Transform new logs (failed events are retried independently via backoff)
const events = this.transformLogsToEvents(eventLogs).events
if (events.length === 0) {
resultCallback({ code: ExportResultCode.SUCCESS })
return
}
if (this.attempts >= this.maxAttempts) {
resultCallback({
code: ExportResultCode.FAILED,
error: new Error(
`Dropped ${events.length} events: max attempts (${this.maxAttempts}) reached`,
),
})
return
}
// Send events
const failedEvents = await this.sendEventsInBatches(events)
this.attempts++
if (failedEvents.length > 0) {
await this.queueFailedEvents(failedEvents)
this.scheduleBackoffRetry()
const context = this.lastExportErrorContext
? ` (${this.lastExportErrorContext})`
: ''
resultCallback({
code: ExportResultCode.FAILED,
error: new Error(
`Failed to export ${failedEvents.length} events${context}`,
),
})
return
}
// Success - reset backoff and immediately retry any queued events
this.resetBackoff()
if ((await this.getQueuedEventCount()) > 0 && !this.isRetrying) {
void this.retryFailedEvents()
}
resultCallback({ code: ExportResultCode.SUCCESS })
} catch (error) {
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging export failed: ${errorMessage(error)}`,
)
}
logError(error)
resultCallback({
code: ExportResultCode.FAILED,
error: toError(error),
})
}
}
private async sendEventsInBatches(
events: FirstPartyEventLoggingEvent[],
): Promise<FirstPartyEventLoggingEvent[]> {
// Chunk events into batches
const batches: FirstPartyEventLoggingEvent[][] = []
for (let i = 0; i < events.length; i += this.maxBatchSize) {
batches.push(events.slice(i, i + this.maxBatchSize))
}
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: exporting ${events.length} events in ${batches.length} batch(es)`,
)
}
// Send each batch with delay between them. On first failure, assume the
// endpoint is down and short-circuit: queue the failed batch plus all
// remaining unsent batches without POSTing them. The backoff retry will
// probe again with a single batch next tick.
const failedBatchEvents: FirstPartyEventLoggingEvent[] = []
let lastErrorContext: string | undefined
for (let i = 0; i < batches.length; i++) {
const batch = batches[i]!
try {
await this.sendBatchWithRetry({ events: batch })
} catch (error) {
lastErrorContext = getAxiosErrorContext(error)
for (let j = i; j < batches.length; j++) {
failedBatchEvents.push(...batches[j]!)
}
if (process.env.USER_TYPE === 'ant') {
const skipped = batches.length - 1 - i
logForDebugging(
`1P event logging: batch ${i + 1}/${batches.length} failed (${lastErrorContext}); short-circuiting ${skipped} remaining batch(es)`,
)
}
break
}
if (i < batches.length - 1 && this.batchDelayMs > 0) {
await sleep(this.batchDelayMs)
}
}
if (failedBatchEvents.length > 0 && lastErrorContext) {
this.lastExportErrorContext = lastErrorContext
}
return failedBatchEvents
}
private async queueFailedEvents(
events: FirstPartyEventLoggingEvent[],
): Promise<void> {
const filePath = this.getCurrentBatchFilePath()
// Append-only: just add new events to file (atomic on most filesystems)
await this.appendEventsToFile(filePath, events)
const context = this.lastExportErrorContext
? ` (${this.lastExportErrorContext})`
: ''
const message = `1P event logging: ${events.length} events failed to export${context}`
logError(new Error(message))
}
private scheduleBackoffRetry(): void {
// Don't schedule if already retrying or shutdown
if (this.cancelBackoff || this.isRetrying || this.isShutdown) {
return
}
// Quadratic backoff (matching Statsig SDK): base * attempts²
const delay = Math.min(
this.baseBackoffDelayMs * this.attempts * this.attempts,
this.maxBackoffDelayMs,
)
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: scheduling backoff retry in ${delay}ms (attempt ${this.attempts})`,
)
}
this.cancelBackoff = this.schedule(async () => {
this.cancelBackoff = null
await this.retryFailedEvents()
}, delay)
}
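// With the defaults (base 500ms, cap 30s) the delays run: attempt 1 → 500ms,
// 2 → 2s, 3 → 4.5s, 7 → 24.5s; from attempt 8 (500 * 64 = 32s) the cap
// holds the delay at 30s.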
private async retryFailedEvents(): Promise<void> {
const filePath = this.getCurrentBatchFilePath()
// Keep retrying while there are events and endpoint is healthy
while (!this.isShutdown) {
const events = await this.loadEventsFromFile(filePath)
if (events.length === 0) break
if (this.attempts >= this.maxAttempts) {
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: max attempts (${this.maxAttempts}) reached, dropping ${events.length} events`,
)
}
await this.deleteFile(filePath)
this.resetBackoff()
return
}
this.isRetrying = true
// Clear file before retry (we have events in memory now)
await this.deleteFile(filePath)
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: retrying ${events.length} failed events (attempt ${this.attempts + 1})`,
)
}
const failedEvents = await this.sendEventsInBatches(events)
this.attempts++
this.isRetrying = false
if (failedEvents.length > 0) {
// Write failures back to disk
await this.saveEventsToFile(filePath, failedEvents)
this.scheduleBackoffRetry()
return // Failed - wait for backoff
}
// Success - reset backoff and continue loop to drain any newly queued events
this.resetBackoff()
if (process.env.USER_TYPE === 'ant') {
logForDebugging('1P event logging: backoff retry succeeded')
}
}
}
private resetBackoff(): void {
this.attempts = 0
if (this.cancelBackoff) {
this.cancelBackoff()
this.cancelBackoff = null
}
}
private async sendBatchWithRetry(
payload: FirstPartyEventLoggingPayload,
): Promise<void> {
if (this.isKilled()) {
// Throw so the caller short-circuits remaining batches and queues
// everything to disk. Zero network traffic while killed; the backoff
// timer keeps ticking and will resume POSTs as soon as the GrowthBook
// cache picks up the cleared flag.
throw new Error('firstParty sink killswitch active')
}
const baseHeaders: Record<string, string> = {
'Content-Type': 'application/json',
'User-Agent': getClaudeCodeUserAgent(),
'x-service-name': 'claude-code',
}
// Skip auth if trust hasn't been established yet
// This prevents executing apiKeyHelper commands before the trust dialog
// Non-interactive sessions implicitly have workspace trust
const hasTrust =
checkHasTrustDialogAccepted() || getIsNonInteractiveSession()
if (process.env.USER_TYPE === 'ant' && !hasTrust) {
logForDebugging('1P event logging: Trust not accepted')
}
// Skip auth when the OAuth token is expired or lacks user:profile
// scope (service key sessions). Falls through to unauthenticated send.
let shouldSkipAuth = this.skipAuth || !hasTrust
if (!shouldSkipAuth && isClaudeAISubscriber()) {
const tokens = getClaudeAIOAuthTokens()
if (!hasProfileScope()) {
shouldSkipAuth = true
} else if (tokens && isOAuthTokenExpired(tokens.expiresAt)) {
shouldSkipAuth = true
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
'1P event logging: OAuth token expired, skipping auth to avoid 401',
)
}
}
}
// Try with auth headers first (unless trust not established or token is known to be expired)
const authResult = shouldSkipAuth
? { headers: {}, error: 'trust not established or OAuth token expired' }
: getAuthHeaders()
const useAuth = !authResult.error
if (!useAuth && process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: auth not available, sending without auth`,
)
}
const headers = useAuth
? { ...baseHeaders, ...authResult.headers }
: baseHeaders
try {
const response = await axios.post(this.endpoint, payload, {
timeout: this.timeout,
headers,
})
this.logSuccess(payload.events.length, useAuth, response.data)
return
} catch (error) {
// Handle 401 by retrying without auth
if (
useAuth &&
axios.isAxiosError(error) &&
error.response?.status === 401
) {
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
'1P event logging: 401 auth error, retrying without auth',
)
}
const response = await axios.post(this.endpoint, payload, {
timeout: this.timeout,
headers: baseHeaders,
})
this.logSuccess(payload.events.length, false, response.data)
return
}
throw error
}
}
private logSuccess(
eventCount: number,
withAuth: boolean,
responseData: unknown,
): void {
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: ${eventCount} events exported successfully${withAuth ? ' (with auth)' : ' (without auth)'}`,
)
logForDebugging(`API Response: ${jsonStringify(responseData, null, 2)}`)
}
}
private hrTimeToDate(hrTime: HrTime): Date {
const [seconds, nanoseconds] = hrTime
return new Date(seconds * 1000 + nanoseconds / 1000000)
}
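// e.g. hrTime [1700000000, 250000000] maps to epoch ms 1700000000250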
private transformLogsToEvents(
logs: ReadableLogRecord[],
): FirstPartyEventLoggingPayload {
const events: FirstPartyEventLoggingEvent[] = []
for (const log of logs) {
const attributes = log.attributes || {}
// Check if this is a GrowthBook experiment event
if (attributes.event_type === 'GrowthbookExperimentEvent') {
const timestamp = this.hrTimeToDate(log.hrTime)
const account_uuid = attributes.account_uuid as string | undefined
const organization_uuid = attributes.organization_uuid as
| string
| undefined
events.push({
event_type: 'GrowthbookExperimentEvent',
event_data: GrowthbookExperimentEvent.toJSON({
event_id: attributes.event_id as string,
timestamp,
experiment_id: attributes.experiment_id as string,
variation_id: attributes.variation_id as number,
environment: attributes.environment as string,
user_attributes: attributes.user_attributes as string,
experiment_metadata: attributes.experiment_metadata as string,
device_id: attributes.device_id as string,
session_id: attributes.session_id as string,
auth:
account_uuid || organization_uuid
? { account_uuid, organization_uuid }
: undefined,
}),
})
continue
}
// Extract event name
const eventName =
(attributes.event_name as string) || (log.body as string) || 'unknown'
// Extract metadata objects directly (no JSON parsing needed)
const coreMetadata = attributes.core_metadata as EventMetadata | undefined
const userMetadata = attributes.user_metadata as CoreUserData
const eventMetadata = (attributes.event_metadata || {}) as Record<
string,
unknown
>
if (!coreMetadata) {
// Emit partial event if core metadata is missing
if (process.env.USER_TYPE === 'ant') {
logForDebugging(
`1P event logging: core_metadata missing for event ${eventName}`,
)
}
events.push({
event_type: 'ClaudeCodeInternalEvent',
event_data: ClaudeCodeInternalEvent.toJSON({
event_id: attributes.event_id as string | undefined,
event_name: eventName,
client_timestamp: this.hrTimeToDate(log.hrTime),
session_id: getSessionId(),
additional_metadata: Buffer.from(
jsonStringify({
transform_error: 'core_metadata attribute is missing',
}),
).toString('base64'),
}),
})
continue
}
// Transform to 1P format
const formatted = to1PEventFormat(
coreMetadata,
userMetadata,
eventMetadata,
)
// _PROTO_* keys are PII-tagged values meant only for privileged BQ
// columns. Hoist known keys to proto fields, then defensively strip any
// remaining _PROTO_* so an unrecognized future key can't silently land
// in the general-access additional_metadata blob. sink.ts applies the
// same strip before Datadog; this closes the 1P side.
const {
_PROTO_skill_name,
_PROTO_plugin_name,
_PROTO_marketplace_name,
...rest
} = formatted.additional
const additionalMetadata = stripProtoFields(rest)
events.push({
event_type: 'ClaudeCodeInternalEvent',
event_data: ClaudeCodeInternalEvent.toJSON({
event_id: attributes.event_id as string | undefined,
event_name: eventName,
client_timestamp: this.hrTimeToDate(log.hrTime),
device_id: attributes.user_id as string | undefined,
email: userMetadata?.email,
auth: formatted.auth,
...formatted.core,
env: formatted.env,
process: formatted.process,
skill_name:
typeof _PROTO_skill_name === 'string'
? _PROTO_skill_name
: undefined,
plugin_name:
typeof _PROTO_plugin_name === 'string'
? _PROTO_plugin_name
: undefined,
marketplace_name:
typeof _PROTO_marketplace_name === 'string'
? _PROTO_marketplace_name
: undefined,
additional_metadata:
Object.keys(additionalMetadata).length > 0
? Buffer.from(jsonStringify(additionalMetadata)).toString(
'base64',
)
: undefined,
}),
})
}
return { events }
}
async shutdown(): Promise<void> {
this.isShutdown = true
this.resetBackoff()
await this.forceFlush()
if (process.env.USER_TYPE === 'ant') {
logForDebugging('1P event logging exporter shutdown complete')
}
}
async forceFlush(): Promise<void> {
await Promise.all(this.pendingExports)
if (process.env.USER_TYPE === 'ant') {
logForDebugging('1P event logging exporter flush complete')
}
}
}
function getAxiosErrorContext(error: unknown): string {
if (!axios.isAxiosError(error)) {
return errorMessage(error)
}
const parts: string[] = []
const requestId = error.response?.headers?.['request-id']
if (requestId) {
parts.push(`request-id=${requestId}`)
}
if (error.response?.status) {
parts.push(`status=${error.response.status}`)
}
if (error.code) {
parts.push(`code=${error.code}`)
}
if (error.message) {
parts.push(error.message)
}
return parts.join(', ')
}
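// Illustrative outputs (values hypothetical):
//   HTTP failure: 'request-id=req_abc123, status=503, Request failed with status code 503'
//   timeout:      'code=ECONNABORTED, timeout of 10000ms exceeded'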

File diff suppressed because it is too large.


@@ -0,0 +1,173 @@
/**
* Analytics service - public API for event logging
*
* This module serves as the main entry point for analytics events in Claude CLI.
*
* DESIGN: This module has NO dependencies to avoid import cycles.
* Events are queued until attachAnalyticsSink() is called during app initialization.
* The sink handles routing to Datadog and 1P event logging.
*/
/**
* Marker type for verifying analytics metadata doesn't contain sensitive data
*
* This type forces explicit verification that string values being logged
* don't contain code snippets, file paths, or other sensitive information.
*
* Usage: `myString as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS`
*/
export type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS = never
/**
* Marker type for values routed to PII-tagged proto columns via `_PROTO_*`
* payload keys. The destination BQ column has privileged access controls,
* so unredacted values are acceptable — unlike general-access backends.
*
* sink.ts strips `_PROTO_*` keys before Datadog fanout; only the 1P
* exporter (firstPartyEventLoggingExporter) sees them and hoists them to the
* top-level proto field. A single stripProtoFields call guards all non-1P
* sinks — no per-sink filtering to forget.
*
* Usage: `rawName as AnalyticsMetadata_I_VERIFIED_THIS_IS_PII_TAGGED`
*/
export type AnalyticsMetadata_I_VERIFIED_THIS_IS_PII_TAGGED = never
/**
* Strip `_PROTO_*` keys from a payload destined for general-access storage.
* Used by:
* - sink.ts: before Datadog fanout (never sees PII-tagged values)
* - firstPartyEventLoggingExporter: defensive strip of additional_metadata
* after hoisting known _PROTO_* keys to proto fields — prevents a future
* unrecognized _PROTO_foo from silently landing in the BQ JSON blob.
*
* Returns the input unchanged (same reference) when no _PROTO_ keys present.
*/
export function stripProtoFields<V>(
metadata: Record<string, V>,
): Record<string, V> {
let result: Record<string, V> | undefined
for (const key in metadata) {
if (key.startsWith('_PROTO_')) {
if (result === undefined) {
result = { ...metadata }
}
delete result[key]
}
}
return result ?? metadata
}
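// e.g. stripProtoFields({ a: 1, _PROTO_skill_name: 'x' }) → { a: 1 }, while
// stripProtoFields({ a: 1 }) returns the same object reference untouched.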
// Internal type for logEvent metadata - different from the enriched EventMetadata in metadata.ts
type LogEventMetadata = { [key: string]: boolean | number | undefined }
type QueuedEvent = {
eventName: string
metadata: LogEventMetadata
async: boolean
}
/**
* Sink interface for the analytics backend
*/
export type AnalyticsSink = {
logEvent: (eventName: string, metadata: LogEventMetadata) => void
logEventAsync: (
eventName: string,
metadata: LogEventMetadata,
) => Promise<void>
}
// Event queue for events logged before sink is attached
const eventQueue: QueuedEvent[] = []
// Sink - initialized during app startup
let sink: AnalyticsSink | null = null
/**
* Attach the analytics sink that will receive all events.
* Queued events are drained asynchronously via queueMicrotask to avoid
* adding latency to the startup path.
*
* Idempotent: if a sink is already attached, this is a no-op. This allows
* calling from both the preAction hook (for subcommands) and setup() (for
* the default command) without coordination.
*/
export function attachAnalyticsSink(newSink: AnalyticsSink): void {
if (sink !== null) {
return
}
sink = newSink
// Drain the queue asynchronously to avoid blocking startup
if (eventQueue.length > 0) {
const queuedEvents = [...eventQueue]
eventQueue.length = 0
// Log queue size for ants to help debug analytics initialization timing
if (process.env.USER_TYPE === 'ant') {
sink.logEvent('analytics_sink_attached', {
queued_event_count: queuedEvents.length,
})
}
queueMicrotask(() => {
for (const event of queuedEvents) {
if (event.async) {
void sink!.logEventAsync(event.eventName, event.metadata)
} else {
sink!.logEvent(event.eventName, event.metadata)
}
}
})
}
}
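// Minimal usage sketch (ordering illustrative; event names from this codebase):
//   logEvent('tengu_init', {})      // no sink yet: queued
//   attachAnalyticsSink(realSink)   // drains the queue via queueMicrotask
//   logEvent('tengu_started', {})   // sink attached: logged directly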
/**
* Log an event to analytics backends (synchronous)
*
* Events may be sampled based on the 'tengu_event_sampling_config' dynamic config.
* When sampled, the sample_rate is added to the event metadata.
*
* If no sink is attached, events are queued and drained when the sink attaches.
*/
export function logEvent(
eventName: string,
// intentionally no strings unless AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
// to avoid accidentally logging code/filepaths
metadata: LogEventMetadata,
): void {
if (sink === null) {
eventQueue.push({ eventName, metadata, async: false })
return
}
sink.logEvent(eventName, metadata)
}
/**
* Log an event to analytics backends (asynchronous)
*
* Events may be sampled based on the 'tengu_event_sampling_config' dynamic config.
* When sampled, the sample_rate is added to the event metadata.
*
* If no sink is attached, events are queued and drained when the sink attaches.
*/
export async function logEventAsync(
eventName: string,
// intentionally no strings, to avoid accidentally logging code/filepaths
metadata: LogEventMetadata,
): Promise<void> {
if (sink === null) {
eventQueue.push({ eventName, metadata, async: true })
return
}
await sink.logEventAsync(eventName, metadata)
}
/**
* Reset analytics state for testing purposes only.
* @internal
*/
export function _resetForTesting(): void {
sink = null
eventQueue.length = 0
}


@@ -0,0 +1,973 @@
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
/**
* Shared event metadata enrichment for analytics systems
*
* This module provides a single source of truth for collecting and formatting
* event metadata across all analytics systems (Datadog, 1P).
*/
import { extname } from 'path'
import memoize from 'lodash-es/memoize.js'
import { env, getHostPlatformForAnalytics } from '../../utils/env.js'
import { envDynamic } from '../../utils/envDynamic.js'
import { getModelBetas } from '../../utils/betas.js'
import { getMainLoopModel } from '../../utils/model/model.js'
import {
getSessionId,
getIsInteractive,
getKairosActive,
getClientType,
getParentSessionId as getParentSessionIdFromState,
} from '../../bootstrap/state.js'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { isOfficialMcpUrl } from '../mcp/officialRegistry.js'
import { isClaudeAISubscriber, getSubscriptionType } from '../../utils/auth.js'
import { getRepoRemoteHash } from '../../utils/git.js'
import {
getWslVersion,
getLinuxDistroInfo,
detectVcs,
} from '../../utils/platform.js'
import type { CoreUserData } from 'src/utils/user.js'
import { getAgentContext } from '../../utils/agentContext.js'
import type { EnvironmentMetadata } from '../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js'
import type { PublicApiAuth } from '../../types/generated/events_mono/common/v1/auth.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import {
getAgentId,
getParentSessionId as getTeammateParentSessionId,
getTeamName,
isTeammate,
} from '../../utils/teammate.js'
import { feature } from 'bun:bundle'
/**
* Marker type for verifying analytics metadata doesn't contain sensitive data
*
* This type forces explicit verification that string values being logged
* don't contain code snippets, file paths, or other sensitive information.
*
* The metadata is expected to be JSON-serializable.
*
* Usage: `myString as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS`
*
* The type is `never` which means it can never actually hold a value - this is
* intentional as it's only used for type-casting to document developer intent.
*/
export type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS = never
/**
* Sanitizes tool names for analytics logging to avoid PII exposure.
*
* MCP tool names follow the format `mcp__<server>__<tool>` and can reveal
* user-specific server configurations, which is considered PII-medium.
* This function redacts MCP tool names while preserving built-in tool names
* (Bash, Read, Write, etc.) which are safe to log.
*
* @param toolName - The tool name to sanitize
* @returns The original name for built-in tools, or 'mcp_tool' for MCP tools
*/
export function sanitizeToolNameForAnalytics(
toolName: string,
): AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS {
if (toolName.startsWith('mcp__')) {
return 'mcp_tool' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
return toolName as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
/**
* Check if detailed tool name logging is enabled for OTLP events.
* When enabled, MCP server/tool names and Skill names are logged.
* Disabled by default to protect PII (user-specific server configurations).
*
* Enable with OTEL_LOG_TOOL_DETAILS=1
*/
export function isToolDetailsLoggingEnabled(): boolean {
return isEnvTruthy(process.env.OTEL_LOG_TOOL_DETAILS)
}
/**
* Check if detailed tool name logging (MCP server/tool names) is enabled
* for analytics events.
*
* Per go/taxonomy, MCP names are medium PII. We log them for:
* - Cowork (entrypoint=local-agent) — no ZDR concept, log all MCPs
* - claude.ai-proxied connectors — always official (from claude.ai's list)
* - Servers whose URL matches the official MCP registry — directory
* connectors added via `claude mcp add`, not customer-specific config
*
* Custom/user-configured MCPs stay sanitized (toolName='mcp_tool').
*/
export function isAnalyticsToolDetailsLoggingEnabled(
mcpServerType: string | undefined,
mcpServerBaseUrl: string | undefined,
): boolean {
if (process.env.CLAUDE_CODE_ENTRYPOINT === 'local-agent') {
return true
}
if (mcpServerType === 'claudeai-proxy') {
return true
}
if (mcpServerBaseUrl && isOfficialMcpUrl(mcpServerBaseUrl)) {
return true
}
return false
}
/**
* Built-in first-party MCP servers whose names are fixed reserved strings,
* not user-configured — so logging them is not PII. Checked in addition to
* isAnalyticsToolDetailsLoggingEnabled's transport/URL gates, which a stdio
* built-in would otherwise fail.
*
* Feature-gated so the set is empty when the feature is off: the name
* reservation (main.tsx, config.ts addMcpServer) is itself feature-gated, so
* a user-configured 'computer-use' is possible in builds without the feature.
*/
/* eslint-disable @typescript-eslint/no-require-imports */
const BUILTIN_MCP_SERVER_NAMES: ReadonlySet<string> = new Set(
feature('CHICAGO_MCP')
? [
(
require('../../utils/computerUse/common.js') as typeof import('../../utils/computerUse/common.js')
).COMPUTER_USE_MCP_SERVER_NAME,
]
: [],
)
/* eslint-enable @typescript-eslint/no-require-imports */
/**
* Spreadable helper for logEvent payloads — returns {mcpServerName, mcpToolName}
* if the gate passes, empty object otherwise. Consolidates the identical IIFE
* pattern at each tengu_tool_use_* call site.
*/
export function mcpToolDetailsForAnalytics(
toolName: string,
mcpServerType: string | undefined,
mcpServerBaseUrl: string | undefined,
): {
mcpServerName?: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
mcpToolName?: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
} {
const details = extractMcpToolDetails(toolName)
if (!details) {
return {}
}
if (
!BUILTIN_MCP_SERVER_NAMES.has(details.serverName) &&
!isAnalyticsToolDetailsLoggingEnabled(mcpServerType, mcpServerBaseUrl)
) {
return {}
}
return {
mcpServerName: details.serverName,
mcpToolName: details.mcpToolName,
}
}
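// Illustrative call-site shape (editorial sketch; the logEvent signature and
// payload fields shown here are assumptions based on the doc comment above):
//
//   logEvent('tengu_tool_use_success', {
//     toolName: sanitizeToolNameForAnalytics(toolName),
//     ...mcpToolDetailsForAnalytics(toolName, mcpServerType, mcpServerBaseUrl),
//   })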
/**
* Extract MCP server and tool names from a full MCP tool name.
* MCP tool names follow the format: mcp__<server>__<tool>
*
* @param toolName - The full tool name (e.g., 'mcp__slack__read_channel')
 * @returns Object with serverName and mcpToolName, or undefined if not an MCP tool
*/
export function extractMcpToolDetails(toolName: string):
| {
serverName: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
mcpToolName: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
| undefined {
if (!toolName.startsWith('mcp__')) {
return undefined
}
// Format: mcp__<server>__<tool>
const parts = toolName.split('__')
if (parts.length < 3) {
return undefined
}
const serverName = parts[1]
// Tool name may contain __ so rejoin remaining parts
const mcpToolName = parts.slice(2).join('__')
if (!serverName || !mcpToolName) {
return undefined
}
return {
serverName:
serverName as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
mcpToolName:
mcpToolName as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
}
}
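// Parse behavior, traceable from the implementation above (server names are
// illustrative):
//
//   extractMcpToolDetails('mcp__slack__read_channel')
//   // => { serverName: 'slack', mcpToolName: 'read_channel' }
//   extractMcpToolDetails('mcp__linear__issues__list')
//   // => { serverName: 'linear', mcpToolName: 'issues__list' } (tool keeps its __)
//   extractMcpToolDetails('mcp__broken')  // => undefined (no tool segment)
//   extractMcpToolDetails('Read')         // => undefined (not an MCP tool)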
/**
* Extract skill name from Skill tool input.
*
* @param toolName - The tool name (should be 'Skill')
* @param input - The tool input containing the skill name
* @returns The skill name if this is a Skill tool call, undefined otherwise
*/
export function extractSkillName(
toolName: string,
input: unknown,
): AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS | undefined {
if (toolName !== 'Skill') {
return undefined
}
if (
typeof input === 'object' &&
input !== null &&
'skill' in input &&
typeof (input as { skill: unknown }).skill === 'string'
) {
return (input as { skill: string })
.skill as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
return undefined
}
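// Illustrative usage (editorial sketch; the skill name is hypothetical):
//
//   extractSkillName('Skill', { skill: 'commit-helper' })  // => 'commit-helper'
//   extractSkillName('Skill', { skill: 42 })               // => undefined (not a string)
//   extractSkillName('Bash', { skill: 'x' })               // => undefined (wrong tool)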
const TOOL_INPUT_STRING_TRUNCATE_AT = 512
const TOOL_INPUT_STRING_TRUNCATE_TO = 128
const TOOL_INPUT_MAX_JSON_CHARS = 4 * 1024
const TOOL_INPUT_MAX_COLLECTION_ITEMS = 20
const TOOL_INPUT_MAX_DEPTH = 2
function truncateToolInputValue(value: unknown, depth = 0): unknown {
if (typeof value === 'string') {
if (value.length > TOOL_INPUT_STRING_TRUNCATE_AT) {
return `${value.slice(0, TOOL_INPUT_STRING_TRUNCATE_TO)}…[${value.length} chars]`
}
return value
}
if (
typeof value === 'number' ||
typeof value === 'boolean' ||
value === null ||
value === undefined
) {
return value
}
if (depth >= TOOL_INPUT_MAX_DEPTH) {
return '<nested>'
}
if (Array.isArray(value)) {
const mapped = value
.slice(0, TOOL_INPUT_MAX_COLLECTION_ITEMS)
.map(v => truncateToolInputValue(v, depth + 1))
if (value.length > TOOL_INPUT_MAX_COLLECTION_ITEMS) {
mapped.push(`…[${value.length} items]`)
}
return mapped
}
if (typeof value === 'object') {
const entries = Object.entries(value as Record<string, unknown>)
// Skip internal marker keys (e.g. _simulatedSedEdit re-introduced by
// SedEditPermissionRequest) so they don't leak into telemetry.
.filter(([k]) => !k.startsWith('_'))
const mapped = entries
.slice(0, TOOL_INPUT_MAX_COLLECTION_ITEMS)
.map(([k, v]) => [k, truncateToolInputValue(v, depth + 1)])
if (entries.length > TOOL_INPUT_MAX_COLLECTION_ITEMS) {
mapped.push(['…', `${entries.length} keys`])
}
return Object.fromEntries(mapped)
}
return String(value)
}
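// Truncation behavior, traceable from the constants above (editorial sketch):
//
//   truncateToolInputValue('x'.repeat(600))
//   // => first 128 chars + '…[600 chars]'
//   truncateToolInputValue({ a: { b: { c: 1 } } })
//   // => { a: { b: '<nested>' } } (objects/arrays collapse past depth 2)
//   truncateToolInputValue({ _simulatedSedEdit: true, file: 'a.ts' })
//   // => { file: 'a.ts' } (underscore-prefixed marker keys are dropped)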
/**
* Serialize a tool's input arguments for the OTel tool_result event.
* Truncates long strings and deep nesting to keep the output bounded while
* preserving forensically useful fields like file paths, URLs, and MCP args.
* Returns undefined when OTEL_LOG_TOOL_DETAILS is not enabled.
*/
export function extractToolInputForTelemetry(
input: unknown,
): string | undefined {
if (!isToolDetailsLoggingEnabled()) {
return undefined
}
const truncated = truncateToolInputValue(input)
let json = jsonStringify(truncated)
if (json.length > TOOL_INPUT_MAX_JSON_CHARS) {
json = json.slice(0, TOOL_INPUT_MAX_JSON_CHARS) + '…[truncated]'
}
return json
}
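// Illustrative output (editorial sketch; assumes jsonStringify() mirrors
// JSON.stringify for plain objects):
//
//   // With OTEL_LOG_TOOL_DETAILS=1:
//   extractToolInputForTelemetry({ file_path: '/tmp/a.ts', _marker: 1 })
//   // => '{"file_path":"/tmp/a.ts"}' (bounded at 4 KiB, then '…[truncated]')
//   // With the env var unset: => undefined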
/**
* Maximum length for file extensions to be logged.
* Extensions longer than this are considered potentially sensitive
* (e.g., hash-based filenames like "key-hash-abcd-123-456") and
* will be replaced with 'other'.
*/
const MAX_FILE_EXTENSION_LENGTH = 10
/**
* Extracts and sanitizes a file extension for analytics logging.
*
* Uses Node's path.extname for reliable cross-platform extension extraction.
* Returns 'other' for extensions exceeding MAX_FILE_EXTENSION_LENGTH to avoid
* logging potentially sensitive data (like hash-based filenames).
*
* @param filePath - The file path to extract the extension from
* @returns The sanitized extension, 'other' for long extensions, or undefined if no extension
*/
export function getFileExtensionForAnalytics(
filePath: string,
): AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS | undefined {
const ext = extname(filePath).toLowerCase()
if (!ext || ext === '.') {
return undefined
}
const extension = ext.slice(1) // remove leading dot
if (extension.length > MAX_FILE_EXTENSION_LENGTH) {
return 'other' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
return extension as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
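// Sanitization behavior (editorial sketch; paths are hypothetical):
//
//   getFileExtensionForAnalytics('/src/app/Main.TS')  // => 'ts' (lowercased)
//   getFileExtensionForAnalytics('Makefile')          // => undefined (no extension)
//   getFileExtensionForAnalytics('key.abcdef0123456789')
//   // => 'other' (extension longer than MAX_FILE_EXTENSION_LENGTH)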
/** Allow list of commands we extract file extensions from. */
const FILE_COMMANDS = new Set([
'rm',
'mv',
'cp',
'touch',
'mkdir',
'chmod',
'chown',
'cat',
'head',
'tail',
'sort',
'stat',
'diff',
'wc',
'grep',
'rg',
'sed',
])
/** Regex to split bash commands on compound operators (&&, ||, ;, |). */
const COMPOUND_OPERATOR_REGEX = /\s*(?:&&|\|\||[;|])\s*/
/** Regex to split on whitespace. */
const WHITESPACE_REGEX = /\s+/
/**
* Extracts file extensions from a bash command for analytics.
* Best-effort: splits on operators and whitespace, extracts extensions
* from non-flag args of allowed commands. No heavy shell parsing needed
* because grep patterns and sed scripts rarely resemble file extensions.
*/
export function getFileExtensionsFromBashCommand(
command: string,
simulatedSedEditFilePath?: string,
): AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS | undefined {
if (!command.includes('.') && !simulatedSedEditFilePath) return undefined
let result: string | undefined
const seen = new Set<string>()
if (simulatedSedEditFilePath) {
const ext = getFileExtensionForAnalytics(simulatedSedEditFilePath)
if (ext) {
seen.add(ext)
result = ext
}
}
for (const subcmd of command.split(COMPOUND_OPERATOR_REGEX)) {
if (!subcmd) continue
const tokens = subcmd.split(WHITESPACE_REGEX)
if (tokens.length < 2) continue
const firstToken = tokens[0]!
const slashIdx = firstToken.lastIndexOf('/')
const baseCmd = slashIdx >= 0 ? firstToken.slice(slashIdx + 1) : firstToken
if (!FILE_COMMANDS.has(baseCmd)) continue
for (let i = 1; i < tokens.length; i++) {
const arg = tokens[i]!
if (arg.charCodeAt(0) === 45 /* - */) continue
const ext = getFileExtensionForAnalytics(arg)
if (ext && !seen.has(ext)) {
seen.add(ext)
result = result ? result + ',' + ext : ext
}
}
}
if (!result) return undefined
return result as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
}
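// Illustrative extraction (editorial sketch; the commands are hypothetical):
//
//   getFileExtensionsFromBashCommand('cat src/a.ts && rm -f b.md | grep foo')
//   // => 'ts,md' ('-f' is skipped as a flag; grep's pattern has no extension)
//   getFileExtensionsFromBashCommand('echo hi.txt')
//   // => undefined (echo is not in the FILE_COMMANDS allow list)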
/**
* Environment context metadata
*/
export type EnvContext = {
platform: string
platformRaw: string
arch: string
nodeVersion: string
terminal: string | null
packageManagers: string
runtimes: string
isRunningWithBun: boolean
isCi: boolean
isClaubbit: boolean
isClaudeCodeRemote: boolean
isLocalAgentMode: boolean
isConductor: boolean
remoteEnvironmentType?: string
coworkerType?: string
claudeCodeContainerId?: string
claudeCodeRemoteSessionId?: string
tags?: string
isGithubAction: boolean
isClaudeCodeAction: boolean
isClaudeAiAuth: boolean
version: string
versionBase?: string
buildTime: string
deploymentEnvironment: string
githubEventName?: string
githubActionsRunnerEnvironment?: string
githubActionsRunnerOs?: string
githubActionRef?: string
wslVersion?: string
linuxDistroId?: string
linuxDistroVersion?: string
linuxKernel?: string
vcs?: string
}
/**
* Process metrics included with all analytics events.
*/
export type ProcessMetrics = {
uptime: number
rss: number
heapTotal: number
heapUsed: number
external: number
arrayBuffers: number
constrainedMemory: number | undefined
cpuUsage: NodeJS.CpuUsage
cpuPercent: number | undefined
}
/**
* Core event metadata shared across all analytics systems
*/
export type EventMetadata = {
model: string
sessionId: string
userType: string
betas?: string
envContext: EnvContext
entrypoint?: string
agentSdkVersion?: string
isInteractive: string
clientType: string
processMetrics?: ProcessMetrics
sweBenchRunId: string
sweBenchInstanceId: string
sweBenchTaskId: string
// Swarm/team agent identification for analytics attribution
agentId?: string // CLAUDE_CODE_AGENT_ID (format: agentName@teamName) or subagent UUID
parentSessionId?: string // CLAUDE_CODE_PARENT_SESSION_ID (team lead's session)
agentType?: 'teammate' | 'subagent' | 'standalone' // Distinguishes swarm teammates, Agent tool subagents, and standalone agents
teamName?: string // Team name for swarm agents (from env var or AsyncLocalStorage)
subscriptionType?: string // OAuth subscription tier (max, pro, enterprise, team)
rh?: string // Hashed repo remote URL (first 16 chars of SHA256), for joining with server-side data
kairosActive?: true // KAIROS assistant mode active (ant-only; set in main.tsx after gate check)
skillMode?: 'discovery' | 'coach' | 'discovery_and_coach' // Which skill surfacing mechanism(s) are gated on (ant-only; for BQ session segmentation)
observerMode?: 'backseat' | 'skillcoach' | 'both' // Which observer classifiers are gated on (ant-only; for BQ cohort splits on tengu_backseat_* events)
}
/**
* Options for enriching event metadata
*/
export type EnrichMetadataOptions = {
// Model to use, falls back to getMainLoopModel() if not provided
model?: unknown
// Explicit betas string (already joined)
betas?: unknown
// Additional metadata to include (optional)
additionalMetadata?: Record<string, unknown>
}
/**
* Get agent identification for analytics.
* Priority: AsyncLocalStorage context (subagents) > env vars (swarm teammates)
*/
function getAgentIdentification(): {
agentId?: string
parentSessionId?: string
agentType?: 'teammate' | 'subagent' | 'standalone'
teamName?: string
} {
// Check AsyncLocalStorage first (for subagents running in same process)
const agentContext = getAgentContext()
if (agentContext) {
const result: ReturnType<typeof getAgentIdentification> = {
agentId: agentContext.agentId,
parentSessionId: agentContext.parentSessionId,
agentType: agentContext.agentType,
}
if (agentContext.agentType === 'teammate') {
result.teamName = agentContext.teamName
}
return result
}
// Fall back to swarm helpers (for swarm agents)
const agentId = getAgentId()
const parentSessionId = getTeammateParentSessionId()
const teamName = getTeamName()
const isSwarmAgent = isTeammate()
  // Standalone agents (an agent ID is present but isTeammate() is false) get agentType 'standalone'
const agentType = isSwarmAgent
? ('teammate' as const)
: agentId
? ('standalone' as const)
: undefined
if (agentId || agentType || parentSessionId || teamName) {
return {
...(agentId ? { agentId } : {}),
...(agentType ? { agentType } : {}),
...(parentSessionId ? { parentSessionId } : {}),
...(teamName ? { teamName } : {}),
}
}
// Check bootstrap state for parent session ID (e.g., plan mode -> implementation)
const stateParentSessionId = getParentSessionIdFromState()
if (stateParentSessionId) {
return { parentSessionId: stateParentSessionId }
}
return {}
}
/**
* Extract base version from full version string. "2.0.36-dev.20251107.t174150.sha2709699" → "2.0.36-dev"
*/
const getVersionBase = memoize((): string | undefined => {
const match = MACRO.VERSION.match(/^\d+\.\d+\.\d+(?:-[a-z]+)?/)
return match ? match[0] : undefined
})
/**
* Builds the environment context object
*/
const buildEnvContext = memoize(async (): Promise<EnvContext> => {
const [packageManagers, runtimes, linuxDistroInfo, vcs] = await Promise.all([
env.getPackageManagers(),
env.getRuntimes(),
getLinuxDistroInfo(),
detectVcs(),
])
return {
platform: getHostPlatformForAnalytics(),
// Raw process.platform so freebsd/openbsd/aix/sunos are visible in BQ.
// getHostPlatformForAnalytics() buckets those into 'linux'; here we want
// the truth. CLAUDE_CODE_HOST_PLATFORM still overrides for container/remote.
platformRaw: process.env.CLAUDE_CODE_HOST_PLATFORM || process.platform,
arch: env.arch,
nodeVersion: env.nodeVersion,
terminal: envDynamic.terminal,
packageManagers: packageManagers.join(','),
runtimes: runtimes.join(','),
isRunningWithBun: env.isRunningWithBun(),
isCi: isEnvTruthy(process.env.CI),
isClaubbit: isEnvTruthy(process.env.CLAUBBIT),
isClaudeCodeRemote: isEnvTruthy(process.env.CLAUDE_CODE_REMOTE),
isLocalAgentMode: process.env.CLAUDE_CODE_ENTRYPOINT === 'local-agent',
isConductor: env.isConductor(),
...(process.env.CLAUDE_CODE_REMOTE_ENVIRONMENT_TYPE && {
remoteEnvironmentType: process.env.CLAUDE_CODE_REMOTE_ENVIRONMENT_TYPE,
}),
// Gated by feature flag to prevent leaking "coworkerType" string in external builds
...(feature('COWORKER_TYPE_TELEMETRY')
? process.env.CLAUDE_CODE_COWORKER_TYPE
? { coworkerType: process.env.CLAUDE_CODE_COWORKER_TYPE }
: {}
: {}),
...(process.env.CLAUDE_CODE_CONTAINER_ID && {
claudeCodeContainerId: process.env.CLAUDE_CODE_CONTAINER_ID,
}),
...(process.env.CLAUDE_CODE_REMOTE_SESSION_ID && {
claudeCodeRemoteSessionId: process.env.CLAUDE_CODE_REMOTE_SESSION_ID,
}),
...(process.env.CLAUDE_CODE_TAGS && {
tags: process.env.CLAUDE_CODE_TAGS,
}),
isGithubAction: isEnvTruthy(process.env.GITHUB_ACTIONS),
isClaudeCodeAction: isEnvTruthy(process.env.CLAUDE_CODE_ACTION),
isClaudeAiAuth: isClaudeAISubscriber(),
version: MACRO.VERSION,
versionBase: getVersionBase(),
buildTime: MACRO.BUILD_TIME,
deploymentEnvironment: env.detectDeploymentEnvironment(),
...(isEnvTruthy(process.env.GITHUB_ACTIONS) && {
githubEventName: process.env.GITHUB_EVENT_NAME,
githubActionsRunnerEnvironment: process.env.RUNNER_ENVIRONMENT,
githubActionsRunnerOs: process.env.RUNNER_OS,
githubActionRef: process.env.GITHUB_ACTION_PATH?.includes(
'claude-code-action/',
)
? process.env.GITHUB_ACTION_PATH.split('claude-code-action/')[1]
: undefined,
}),
...(getWslVersion() && { wslVersion: getWslVersion() }),
...(linuxDistroInfo ?? {}),
...(vcs.length > 0 ? { vcs: vcs.join(',') } : {}),
}
})
// --
// CPU% delta tracking — inherently process-global, same pattern as logBatch/flushTimer in datadog.ts
let prevCpuUsage: NodeJS.CpuUsage | null = null
let prevWallTimeMs: number | null = null
/**
* Builds process metrics object for all users.
*/
function buildProcessMetrics(): ProcessMetrics | undefined {
try {
const mem = process.memoryUsage()
const cpu = process.cpuUsage()
const now = Date.now()
let cpuPercent: number | undefined
    if (prevCpuUsage !== null && prevWallTimeMs !== null) {
const wallDeltaMs = now - prevWallTimeMs
if (wallDeltaMs > 0) {
const userDeltaUs = cpu.user - prevCpuUsage.user
const systemDeltaUs = cpu.system - prevCpuUsage.system
cpuPercent =
((userDeltaUs + systemDeltaUs) / (wallDeltaMs * 1000)) * 100
}
}
prevCpuUsage = cpu
prevWallTimeMs = now
return {
uptime: process.uptime(),
rss: mem.rss,
heapTotal: mem.heapTotal,
heapUsed: mem.heapUsed,
external: mem.external,
arrayBuffers: mem.arrayBuffers,
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
constrainedMemory: process.constrainedMemory(),
cpuUsage: cpu,
cpuPercent,
}
} catch {
return undefined
}
}
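// Worked example of the CPU% delta (hypothetical numbers): if 10,000 ms of
// wall time elapsed since the previous event and the process consumed
// 2,500,000 µs of user+system CPU in that window, then
// cpuPercent = (2_500_000 / (10_000 * 1000)) * 100 = 25.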
/**
* Get core event metadata shared across all analytics systems.
*
* This function collects environment, runtime, and context information
* that should be included with all analytics events.
*
* @param options - Configuration options
* @returns Promise resolving to enriched metadata object
*/
export async function getEventMetadata(
options: EnrichMetadataOptions = {},
): Promise<EventMetadata> {
const model = options.model ? String(options.model) : getMainLoopModel()
const betas =
typeof options.betas === 'string'
? options.betas
: getModelBetas(model).join(',')
const [envContext, repoRemoteHash] = await Promise.all([
buildEnvContext(),
getRepoRemoteHash(),
])
const processMetrics = buildProcessMetrics()
const metadata: EventMetadata = {
model,
sessionId: getSessionId(),
userType: process.env.USER_TYPE || '',
...(betas.length > 0 ? { betas: betas } : {}),
envContext,
...(process.env.CLAUDE_CODE_ENTRYPOINT && {
entrypoint: process.env.CLAUDE_CODE_ENTRYPOINT,
}),
...(process.env.CLAUDE_AGENT_SDK_VERSION && {
agentSdkVersion: process.env.CLAUDE_AGENT_SDK_VERSION,
}),
isInteractive: String(getIsInteractive()),
clientType: getClientType(),
...(processMetrics && { processMetrics }),
sweBenchRunId: process.env.SWE_BENCH_RUN_ID || '',
sweBenchInstanceId: process.env.SWE_BENCH_INSTANCE_ID || '',
sweBenchTaskId: process.env.SWE_BENCH_TASK_ID || '',
// Swarm/team agent identification
// Priority: AsyncLocalStorage context (subagents) > env vars (swarm teammates)
...getAgentIdentification(),
// Subscription tier for DAU-by-tier analytics
...(getSubscriptionType() && {
subscriptionType: getSubscriptionType()!,
}),
    // Assistant mode tag — lives outside memoized buildEnvContext() because
    // setKairosActive() runs at main.tsx:~1648, by which point the first event
    // may already have fired and memoized the env. Read fresh per-event instead.
...(feature('KAIROS') && getKairosActive()
? { kairosActive: true as const }
: {}),
// Repo remote hash for joining with server-side repo bundle data
...(repoRemoteHash && { rh: repoRemoteHash }),
}
return metadata
}
/**
* Core event metadata for 1P event logging (snake_case format).
*/
export type FirstPartyEventLoggingCoreMetadata = {
session_id: string
model: string
user_type: string
betas?: string
entrypoint?: string
agent_sdk_version?: string
is_interactive: boolean
client_type: string
swe_bench_run_id?: string
swe_bench_instance_id?: string
swe_bench_task_id?: string
// Swarm/team agent identification
agent_id?: string
parent_session_id?: string
agent_type?: 'teammate' | 'subagent' | 'standalone'
team_name?: string
}
/**
* Complete event logging metadata format for 1P events.
*/
export type FirstPartyEventLoggingMetadata = {
env: EnvironmentMetadata
process?: string
// auth is a top-level field on ClaudeCodeInternalEvent (proto PublicApiAuth).
// account_id is intentionally omitted — only UUID fields are populated client-side.
auth?: PublicApiAuth
// core fields correspond to the top level of ClaudeCodeInternalEvent.
// They get directly exported to their individual columns in the BigQuery tables
core: FirstPartyEventLoggingCoreMetadata
// additional fields are populated in the additional_metadata field of the
// ClaudeCodeInternalEvent proto. Includes but is not limited to information
// that differs by event type.
additional: Record<string, unknown>
}
/**
* Convert metadata to 1P event logging format (snake_case fields).
*
* The /api/event_logging/batch endpoint expects snake_case field names
* for environment and core metadata.
*
* @param metadata - Core event metadata
* @param additionalMetadata - Additional metadata to include
* @returns Metadata formatted for 1P event logging
*/
export function to1PEventFormat(
metadata: EventMetadata,
userMetadata: CoreUserData,
additionalMetadata: Record<string, unknown> = {},
): FirstPartyEventLoggingMetadata {
const {
envContext,
processMetrics,
rh,
kairosActive,
skillMode,
observerMode,
...coreFields
} = metadata
// Convert envContext to snake_case.
// IMPORTANT: env is typed as the proto-generated EnvironmentMetadata so that
// adding a field here that the proto doesn't define is a compile error. The
// generated toJSON() serializer silently drops unknown keys — a hand-written
// parallel type previously let #11318, #13924, #19448, and coworker_type all
// ship fields that never reached BQ.
// Adding a field? Update the monorepo proto first (go/cc-logging):
// event_schemas/.../claude_code/v1/claude_code_internal_event.proto
// then run `bun run generate:proto` here.
const env: EnvironmentMetadata = {
platform: envContext.platform,
platform_raw: envContext.platformRaw,
arch: envContext.arch,
node_version: envContext.nodeVersion,
terminal: envContext.terminal || 'unknown',
package_managers: envContext.packageManagers,
runtimes: envContext.runtimes,
is_running_with_bun: envContext.isRunningWithBun,
is_ci: envContext.isCi,
is_claubbit: envContext.isClaubbit,
is_claude_code_remote: envContext.isClaudeCodeRemote,
is_local_agent_mode: envContext.isLocalAgentMode,
is_conductor: envContext.isConductor,
is_github_action: envContext.isGithubAction,
is_claude_code_action: envContext.isClaudeCodeAction,
is_claude_ai_auth: envContext.isClaudeAiAuth,
version: envContext.version,
build_time: envContext.buildTime,
deployment_environment: envContext.deploymentEnvironment,
}
// Add optional env fields
if (envContext.remoteEnvironmentType) {
env.remote_environment_type = envContext.remoteEnvironmentType
}
if (feature('COWORKER_TYPE_TELEMETRY') && envContext.coworkerType) {
env.coworker_type = envContext.coworkerType
}
if (envContext.claudeCodeContainerId) {
env.claude_code_container_id = envContext.claudeCodeContainerId
}
if (envContext.claudeCodeRemoteSessionId) {
env.claude_code_remote_session_id = envContext.claudeCodeRemoteSessionId
}
if (envContext.tags) {
env.tags = envContext.tags
.split(',')
.map(t => t.trim())
.filter(Boolean)
}
if (envContext.githubEventName) {
env.github_event_name = envContext.githubEventName
}
if (envContext.githubActionsRunnerEnvironment) {
env.github_actions_runner_environment =
envContext.githubActionsRunnerEnvironment
}
if (envContext.githubActionsRunnerOs) {
env.github_actions_runner_os = envContext.githubActionsRunnerOs
}
if (envContext.githubActionRef) {
env.github_action_ref = envContext.githubActionRef
}
if (envContext.wslVersion) {
env.wsl_version = envContext.wslVersion
}
if (envContext.linuxDistroId) {
env.linux_distro_id = envContext.linuxDistroId
}
if (envContext.linuxDistroVersion) {
env.linux_distro_version = envContext.linuxDistroVersion
}
if (envContext.linuxKernel) {
env.linux_kernel = envContext.linuxKernel
}
if (envContext.vcs) {
env.vcs = envContext.vcs
}
if (envContext.versionBase) {
env.version_base = envContext.versionBase
}
// Convert core fields to snake_case
const core: FirstPartyEventLoggingCoreMetadata = {
session_id: coreFields.sessionId,
model: coreFields.model,
user_type: coreFields.userType,
is_interactive: coreFields.isInteractive === 'true',
client_type: coreFields.clientType,
}
// Add other core fields
if (coreFields.betas) {
core.betas = coreFields.betas
}
if (coreFields.entrypoint) {
core.entrypoint = coreFields.entrypoint
}
if (coreFields.agentSdkVersion) {
core.agent_sdk_version = coreFields.agentSdkVersion
}
if (coreFields.sweBenchRunId) {
core.swe_bench_run_id = coreFields.sweBenchRunId
}
if (coreFields.sweBenchInstanceId) {
core.swe_bench_instance_id = coreFields.sweBenchInstanceId
}
if (coreFields.sweBenchTaskId) {
core.swe_bench_task_id = coreFields.sweBenchTaskId
}
// Swarm/team agent identification
if (coreFields.agentId) {
core.agent_id = coreFields.agentId
}
if (coreFields.parentSessionId) {
core.parent_session_id = coreFields.parentSessionId
}
if (coreFields.agentType) {
core.agent_type = coreFields.agentType
}
if (coreFields.teamName) {
core.team_name = coreFields.teamName
}
// Map userMetadata to output fields.
// Based on src/utils/user.ts getUser(), but with fields present in other
// parts of ClaudeCodeInternalEvent deduplicated.
// Convert camelCase GitHubActionsMetadata to snake_case for 1P API
// Note: github_actions_metadata is placed inside env (EnvironmentMetadata)
// rather than at the top level of ClaudeCodeInternalEvent
if (userMetadata.githubActionsMetadata) {
const ghMeta = userMetadata.githubActionsMetadata
env.github_actions_metadata = {
actor_id: ghMeta.actorId,
repository_id: ghMeta.repositoryId,
repository_owner_id: ghMeta.repositoryOwnerId,
}
}
let auth: PublicApiAuth | undefined
if (userMetadata.accountUuid || userMetadata.organizationUuid) {
auth = {
account_uuid: userMetadata.accountUuid,
organization_uuid: userMetadata.organizationUuid,
}
}
return {
env,
...(processMetrics && {
process: Buffer.from(jsonStringify(processMetrics)).toString('base64'),
}),
...(auth && { auth }),
core,
additional: {
...(rh && { rh }),
...(kairosActive && { is_assistant_mode: true }),
...(skillMode && { skill_mode: skillMode }),
...(observerMode && { observer_mode: observerMode }),
...additionalMetadata,
},
}
}
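// Shape sketch of the result (editorial; the mappings restate the code above):
//
//   const out = to1PEventFormat(metadata, user, extra)
//   // out.env.node_version    <- metadata.envContext.nodeVersion (snake_cased)
//   // out.core.is_interactive <- metadata.isInteractive === 'true' (boolean)
//   // out.process             <- base64(jsonStringify(processMetrics)), when present
//   // out.additional          <- { rh?, is_assistant_mode?, skill_mode?, observer_mode?, ...extra }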

View File

@@ -0,0 +1,114 @@
/**
* Analytics sink implementation
*
* This module contains the actual analytics routing logic and should be
* initialized during app startup. It routes events to Datadog and 1P event
* logging.
*
* Usage: Call initializeAnalyticsSink() during app startup to attach the sink.
*/
import { trackDatadogEvent } from './datadog.js'
import { logEventTo1P, shouldSampleEvent } from './firstPartyEventLogger.js'
import { checkStatsigFeatureGate_CACHED_MAY_BE_STALE } from './growthbook.js'
import { attachAnalyticsSink, stripProtoFields } from './index.js'
import { isSinkKilled } from './sinkKillswitch.js'
// Local type matching the logEvent metadata signature
type LogEventMetadata = { [key: string]: boolean | number | undefined }
const DATADOG_GATE_NAME = 'tengu_log_datadog_events'
// Module-level gate state - starts undefined, initialized during startup
let isDatadogGateEnabled: boolean | undefined = undefined
/**
* Check if Datadog tracking is enabled.
* Falls back to cached value from previous session if not yet initialized.
*/
function shouldTrackDatadog(): boolean {
if (isSinkKilled('datadog')) {
return false
}
if (isDatadogGateEnabled !== undefined) {
return isDatadogGateEnabled
}
// Fallback to cached value from previous session
try {
return checkStatsigFeatureGate_CACHED_MAY_BE_STALE(DATADOG_GATE_NAME)
} catch {
return false
}
}
/**
* Log an event (synchronous implementation)
*/
function logEventImpl(eventName: string, metadata: LogEventMetadata): void {
// Check if this event should be sampled
const sampleResult = shouldSampleEvent(eventName)
// If sample result is 0, the event was not selected for logging
if (sampleResult === 0) {
return
}
// If sample result is a positive number, add it to metadata
const metadataWithSampleRate =
sampleResult !== null
? { ...metadata, sample_rate: sampleResult }
: metadata
if (shouldTrackDatadog()) {
// Datadog is a general-access backend — strip _PROTO_* keys
// (unredacted PII-tagged values meant only for the 1P privileged column).
void trackDatadogEvent(eventName, stripProtoFields(metadataWithSampleRate))
}
// 1P receives the full payload including _PROTO_* — the exporter
// destructures and routes those keys to proto fields itself.
logEventTo1P(eventName, metadataWithSampleRate)
}
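// Sampling flow (editorial sketch; the numeric results are hypothetical):
//
//   shouldSampleEvent(e) === 0     // event dropped entirely
//   shouldSampleEvent(e) === 10    // logged with { ...metadata, sample_rate: 10 }
//   shouldSampleEvent(e) === null  // logged with metadata unchanged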
/**
* Log an event (asynchronous implementation)
*
 * With Segment removed, the two remaining sinks are fire-and-forget, so this
 * just wraps the sync impl — kept to preserve the sink interface contract.
*/
function logEventAsyncImpl(
eventName: string,
metadata: LogEventMetadata,
): Promise<void> {
logEventImpl(eventName, metadata)
return Promise.resolve()
}
/**
* Initialize analytics gates during startup.
*
* Updates gate values from server. Early events use cached values from previous
* session to avoid data loss during initialization.
*
* Called from main.tsx during setupBackend().
*/
export function initializeAnalyticsGates(): void {
isDatadogGateEnabled =
checkStatsigFeatureGate_CACHED_MAY_BE_STALE(DATADOG_GATE_NAME)
}
/**
* Initialize the analytics sink.
*
* Call this during app startup to attach the analytics backend.
* Any events logged before this is called will be queued and drained.
*
* Idempotent: safe to call multiple times (subsequent calls are no-ops).
*/
export function initializeAnalyticsSink(): void {
attachAnalyticsSink({
logEvent: logEventImpl,
logEventAsync: logEventAsyncImpl,
})
}

View File

@@ -0,0 +1,25 @@
import { getDynamicConfig_CACHED_MAY_BE_STALE } from './growthbook.js'
// Mangled name: per-sink analytics killswitch
const SINK_KILLSWITCH_CONFIG_NAME = 'tengu_frond_boric'
export type SinkName = 'datadog' | 'firstParty'
/**
* GrowthBook JSON config that disables individual analytics sinks.
* Shape: { datadog?: boolean, firstParty?: boolean }
* A value of true for a key stops all dispatch to that sink.
* Default {} (nothing killed). Fail-open: missing/malformed config = sink stays on.
*
 * NOTE: Must NOT be called from inside is1PEventLoggingEnabled():
 * growthbook.ts:isGrowthBookEnabled() calls that function, so a config
 * lookup here would recurse. Call at per-event dispatch sites instead.
*/
export function isSinkKilled(sink: SinkName): boolean {
const config = getDynamicConfig_CACHED_MAY_BE_STALE<
Partial<Record<SinkName, boolean>>
>(SINK_KILLSWITCH_CONFIG_NAME, {})
// getFeatureValue_CACHED_MAY_BE_STALE guards on `!== undefined`, so a
// cached JSON null leaks through instead of falling back to {}.
return config?.[sink] === true
}
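// Illustrative behavior (editorial sketch):
//
//   // Remote config { "datadog": true }:
//   isSinkKilled('datadog')     // => true  (all Datadog dispatch stops)
//   isSinkKilled('firstParty')  // => false (fail-open default)
//   // A cached JSON null (see note above) yields false via optional chaining.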