Disable analytics, GrowthBook, and telemetry egress
This commit is contained in:
@@ -45,10 +45,11 @@ Removed in this repository:
|
||||
- Remote Control / Bridge registration fields that sent machine name, git branch, and git repository URL, plus git source/outcome data in bridge session creation.
|
||||
- Trusted-device enrollment and trusted-device token header emission for bridge requests.
|
||||
- `/insights` automatic S3 upload; reports now stay local via `file://` paths only.
|
||||
- Datadog analytics and Anthropic 1P event-logging egress.
|
||||
- GrowthBook remote evaluation/network fetches; local env/config overrides and cached values remain available for compatibility.
|
||||
- OpenTelemetry initialization and event export paths.
|
||||
|
||||
Still present:
|
||||
|
||||
- Normal Claude API requests are still part of product functionality; this fork only removes extra local metadata injection, not core model/network access.
|
||||
- Datadog and Anthropic 1P analytics codepaths still exist and can emit environment/process metadata unless disabled by runtime privacy settings.
|
||||
- GrowthBook remote evaluation still exists and still prepares remote-eval user attributes.
|
||||
- Optional OpenTelemetry export still exists behind telemetry configuration flags.
|
||||
- Compatibility scaffolding for analytics, GrowthBook, and telemetry still exists in the tree as local no-op or cache-only code.
|
||||
|
||||
@@ -87,22 +87,8 @@ export const init = memoize(async (): Promise<void> => {
|
||||
setupGracefulShutdown()
|
||||
profileCheckpoint('init_after_graceful_shutdown')
|
||||
|
||||
// Initialize 1P event logging (no security concerns, but deferred to avoid
|
||||
// loading OpenTelemetry sdk-logs at startup). growthbook.js is already in
|
||||
// the module cache by this point (firstPartyEventLogger imports it), so the
|
||||
// second dynamic import adds no load cost.
|
||||
void Promise.all([
|
||||
import('../services/analytics/firstPartyEventLogger.js'),
|
||||
import('../services/analytics/growthbook.js'),
|
||||
]).then(([fp, gb]) => {
|
||||
fp.initialize1PEventLogging()
|
||||
// Rebuild the logger provider if tengu_1p_event_batch_config changes
|
||||
// mid-session. Change detection (isEqual) is inside the handler so
|
||||
// unchanged refreshes are no-ops.
|
||||
gb.onGrowthBookRefresh(() => {
|
||||
void fp.reinitialize1PEventLoggingIfConfigChanged()
|
||||
})
|
||||
})
|
||||
// Telemetry/log export is disabled in this build. Keep the startup
|
||||
// checkpoint so callers depending on the init timeline still see it.
|
||||
profileCheckpoint('init_after_1p_event_logging')
|
||||
|
||||
// Populate OAuth account info if it is not already cached in config. This is needed since the
|
||||
@@ -245,96 +231,14 @@ export const init = memoize(async (): Promise<void> => {
|
||||
* This should only be called once, after the trust dialog has been accepted.
|
||||
*/
|
||||
export function initializeTelemetryAfterTrust(): void {
|
||||
if (isEligibleForRemoteManagedSettings()) {
|
||||
// For SDK/headless mode with beta tracing, initialize eagerly first
|
||||
// to ensure the tracer is ready before the first query runs.
|
||||
// The async path below will still run but doInitializeTelemetry() guards against double init.
|
||||
if (getIsNonInteractiveSession() && isBetaTracingEnabled()) {
|
||||
void doInitializeTelemetry().catch(error => {
|
||||
logForDebugging(
|
||||
`[3P telemetry] Eager telemetry init failed (beta tracing): ${errorMessage(error)}`,
|
||||
{ level: 'error' },
|
||||
)
|
||||
})
|
||||
}
|
||||
logForDebugging(
|
||||
'[3P telemetry] Waiting for remote managed settings before telemetry init',
|
||||
)
|
||||
void waitForRemoteManagedSettingsToLoad()
|
||||
.then(async () => {
|
||||
logForDebugging(
|
||||
'[3P telemetry] Remote managed settings loaded, initializing telemetry',
|
||||
)
|
||||
// Re-apply env vars to pick up remote settings before initializing telemetry.
|
||||
applyConfigEnvironmentVariables()
|
||||
await doInitializeTelemetry()
|
||||
})
|
||||
.catch(error => {
|
||||
logForDebugging(
|
||||
`[3P telemetry] Telemetry init failed (remote settings path): ${errorMessage(error)}`,
|
||||
{ level: 'error' },
|
||||
)
|
||||
})
|
||||
} else {
|
||||
void doInitializeTelemetry().catch(error => {
|
||||
logForDebugging(
|
||||
`[3P telemetry] Telemetry init failed: ${errorMessage(error)}`,
|
||||
{ level: 'error' },
|
||||
)
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
async function doInitializeTelemetry(): Promise<void> {
|
||||
if (telemetryInitialized) {
|
||||
// Already initialized, nothing to do
|
||||
void telemetryInitialized
|
||||
return
|
||||
}
|
||||
|
||||
// Set flag before init to prevent double initialization
|
||||
telemetryInitialized = true
|
||||
try {
|
||||
await setMeterState()
|
||||
} catch (error) {
|
||||
// Reset flag on failure so subsequent calls can retry
|
||||
telemetryInitialized = false
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Load the OpenTelemetry instrumentation module on demand and, if it yields
 * a meter, install it globally together with a counter factory.
 *
 * NOTE(review): assumes initializeTelemetry() returns undefined/null when
 * customer OTLP telemetry is not configured — confirm in instrumentation.js.
 */
async function setMeterState(): Promise<void> {
  // Lazy-load instrumentation to defer ~400KB of OpenTelemetry + protobuf
  const { initializeTelemetry } = await import(
    '../utils/telemetry/instrumentation.js'
  )
  // Initialize customer OTLP telemetry (metrics, logs, traces)
  const meter = await initializeTelemetry()
  if (meter) {
    // Create factory function for attributed counters
    const createAttributedCounter = (
      name: string,
      options: MetricOptions,
    ): AttributedCounter => {
      const counter = meter?.createCounter(name, options)

      return {
        add(value: number, additionalAttributes: Attributes = {}) {
          // Always fetch fresh telemetry attributes to ensure they're up to date
          const currentAttributes = getTelemetryAttributes()
          // additionalAttributes win over the shared telemetry attributes.
          const mergedAttributes = {
            ...currentAttributes,
            ...additionalAttributes,
          }
          counter?.add(value, mergedAttributes)
        },
      }
    }

    setMeter(meter, createAttributedCounter)

    // Increment session counter here because the startup telemetry path
    // runs before this async initialization completes, so the counter
    // would be null there.
    getSessionCounter()?.add(1)
  }
  return
}
|
||||
|
||||
@@ -1,307 +1,20 @@
|
||||
import axios from 'axios'
|
||||
import { createHash } from 'crypto'
|
||||
import memoize from 'lodash-es/memoize.js'
|
||||
import { getOrCreateUserID } from '../../utils/config.js'
|
||||
import { logError } from '../../utils/log.js'
|
||||
import { getCanonicalName } from '../../utils/model/model.js'
|
||||
import { getAPIProvider } from '../../utils/model/providers.js'
|
||||
import { MODEL_COSTS } from '../../utils/modelCost.js'
|
||||
import { isAnalyticsDisabled } from './config.js'
|
||||
import { getEventMetadata } from './metadata.js'
|
||||
|
||||
// Datadog Logs intake endpoint (US5 site); flushLogs() POSTs batches here.
const DATADOG_LOGS_ENDPOINT =
  'https://http-intake.logs.us5.datadoghq.com/api/v2/logs'
// Token sent in the DD-API-KEY header of each intake request.
const DATADOG_CLIENT_TOKEN = 'pubbbf48e6d78dae54bceaa4acf463299bf'
// How long a partially filled batch waits before being flushed.
const DEFAULT_FLUSH_INTERVAL_MS = 15000
// A batch is flushed immediately once it reaches this many logs.
const MAX_BATCH_SIZE = 100
// Per-request timeout for the intake POST.
const NETWORK_TIMEOUT_MS = 5000
||||
|
||||
// Allowlist of event names: trackDatadogEvent() silently drops any event
// whose name is not in this set.
const DATADOG_ALLOWED_EVENTS = new Set([
  'chrome_bridge_connection_succeeded',
  'chrome_bridge_connection_failed',
  'chrome_bridge_disconnected',
  'chrome_bridge_tool_call_completed',
  'chrome_bridge_tool_call_error',
  'chrome_bridge_tool_call_started',
  'chrome_bridge_tool_call_timeout',
  'tengu_api_error',
  'tengu_api_success',
  'tengu_brief_mode_enabled',
  'tengu_brief_mode_toggled',
  'tengu_brief_send',
  'tengu_cancel',
  'tengu_compact_failed',
  'tengu_exit',
  'tengu_flicker',
  'tengu_init',
  'tengu_model_fallback_triggered',
  'tengu_oauth_error',
  'tengu_oauth_success',
  'tengu_oauth_token_refresh_failure',
  'tengu_oauth_token_refresh_success',
  'tengu_oauth_token_refresh_lock_acquiring',
  'tengu_oauth_token_refresh_lock_acquired',
  'tengu_oauth_token_refresh_starting',
  'tengu_oauth_token_refresh_completed',
  'tengu_oauth_token_refresh_lock_releasing',
  'tengu_oauth_token_refresh_lock_released',
  'tengu_query_error',
  'tengu_session_file_read',
  'tengu_started',
  'tengu_tool_use_error',
  'tengu_tool_use_granted_in_prompt_permanent',
  'tengu_tool_use_granted_in_prompt_temporary',
  'tengu_tool_use_rejected_in_prompt',
  'tengu_tool_use_success',
  'tengu_uncaught_exception',
  'tengu_unhandled_rejection',
  'tengu_voice_recording_started',
  'tengu_voice_toggled',
  'tengu_team_mem_sync_pull',
  'tengu_team_mem_sync_push',
  'tengu_team_mem_sync_started',
  'tengu_team_mem_entries_capped',
])
|
||||
|
||||
// Fields promoted into ddtags (as snake_case key:value pairs) by
// trackDatadogEvent() so they are filterable in Datadog dashboards.
const TAG_FIELDS = [
  'arch',
  'clientType',
  'errorType',
  'http_status_range',
  'http_status',
  'kairosActive',
  'model',
  'platform',
  'provider',
  'skillMode',
  'subscriptionType',
  'toolName',
  'userBucket',
  'userType',
  'version',
  'versionBase',
]
|
||||
|
||||
function camelToSnakeCase(str: string): string {
|
||||
return str.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`)
|
||||
}
|
||||
|
||||
// Shape of one Datadog log record. ddsource/ddtags/message/service/hostname
// are fixed intake fields; all other event data rides along through the
// index signature as searchable attributes.
type DatadogLog = {
  ddsource: string
  ddtags: string
  message: string
  service: string
  hostname: string
  [key: string]: unknown
}

// Pending logs awaiting the next flush (see flushLogs/scheduleFlush).
let logBatch: DatadogLog[] = []
// Timer handle for the next scheduled flush; null when none is pending.
let flushTimer: NodeJS.Timeout | null = null
// Tri-state cache of initializeDatadog()'s result: null = not yet decided,
// true/false = cached outcome, used as trackDatadogEvent's fast path.
let datadogInitialized: boolean | null = null
|
||||
|
||||
async function flushLogs(): Promise<void> {
|
||||
if (logBatch.length === 0) return
|
||||
|
||||
const logsToSend = logBatch
|
||||
logBatch = []
|
||||
|
||||
try {
|
||||
await axios.post(DATADOG_LOGS_ENDPOINT, logsToSend, {
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'DD-API-KEY': DATADOG_CLIENT_TOKEN,
|
||||
},
|
||||
timeout: NETWORK_TIMEOUT_MS,
|
||||
})
|
||||
} catch (error) {
|
||||
logError(error)
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleFlush(): void {
|
||||
if (flushTimer) return
|
||||
|
||||
flushTimer = setTimeout(() => {
|
||||
flushTimer = null
|
||||
void flushLogs()
|
||||
}, getFlushIntervalMs()).unref()
|
||||
}
|
||||
|
||||
export const initializeDatadog = memoize(async (): Promise<boolean> => {
|
||||
if (isAnalyticsDisabled()) {
|
||||
datadogInitialized = false
|
||||
return false
|
||||
}
|
||||
|
||||
try {
|
||||
datadogInitialized = true
|
||||
return true
|
||||
} catch (error) {
|
||||
logError(error)
|
||||
datadogInitialized = false
|
||||
return false
|
||||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Flush remaining Datadog logs and shut down.
|
||||
* Called from gracefulShutdown() before process.exit() since
|
||||
* forceExit() prevents the beforeExit handler from firing.
|
||||
* Datadog analytics egress is disabled in this build.
|
||||
*
|
||||
* The exported functions remain so existing call sites do not need to branch.
|
||||
*/
|
||||
|
||||
// Datadog analytics egress is disabled in this build: always report the
// sink as uninitialized so callers skip batching work.
// NOTE(review): this duplicates the memoized `initializeDatadog` defined
// above — looks like a diff-merge artifact; confirm which definition is
// meant to survive in this file.
export async function initializeDatadog(): Promise<boolean> {
  return false
}
|
||||
|
||||
export async function shutdownDatadog(): Promise<void> {
|
||||
if (flushTimer) {
|
||||
clearTimeout(flushTimer)
|
||||
flushTimer = null
|
||||
}
|
||||
await flushLogs()
|
||||
return
|
||||
}
|
||||
|
||||
// NOTE: use via src/services/analytics/index.ts > logEvent
|
||||
export async function trackDatadogEvent(
|
||||
eventName: string,
|
||||
properties: { [key: string]: boolean | number | undefined },
|
||||
_eventName: string,
|
||||
_properties: { [key: string]: boolean | number | undefined },
|
||||
): Promise<void> {
|
||||
if (process.env.NODE_ENV !== 'production') {
|
||||
return
|
||||
}
|
||||
|
||||
// Don't send events for 3P providers (Bedrock, Vertex, Foundry)
|
||||
if (getAPIProvider() !== 'firstParty') {
|
||||
return
|
||||
}
|
||||
|
||||
// Fast path: use cached result if available to avoid await overhead
|
||||
let initialized = datadogInitialized
|
||||
if (initialized === null) {
|
||||
initialized = await initializeDatadog()
|
||||
}
|
||||
if (!initialized || !DATADOG_ALLOWED_EVENTS.has(eventName)) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const metadata = await getEventMetadata({
|
||||
model: properties.model,
|
||||
betas: properties.betas,
|
||||
})
|
||||
// Destructure to avoid duplicate envContext (once nested, once flattened)
|
||||
const { envContext, ...restMetadata } = metadata
|
||||
const allData: Record<string, unknown> = {
|
||||
...restMetadata,
|
||||
...envContext,
|
||||
...properties,
|
||||
userBucket: getUserBucket(),
|
||||
}
|
||||
|
||||
// Normalize MCP tool names to "mcp" for cardinality reduction
|
||||
if (
|
||||
typeof allData.toolName === 'string' &&
|
||||
allData.toolName.startsWith('mcp__')
|
||||
) {
|
||||
allData.toolName = 'mcp'
|
||||
}
|
||||
|
||||
// Normalize model names for cardinality reduction (external users only)
|
||||
if (process.env.USER_TYPE !== 'ant' && typeof allData.model === 'string') {
|
||||
const shortName = getCanonicalName(allData.model.replace(/\[1m]$/i, ''))
|
||||
allData.model = shortName in MODEL_COSTS ? shortName : 'other'
|
||||
}
|
||||
|
||||
// Truncate dev version to base + date (remove timestamp and sha for cardinality reduction)
|
||||
// e.g. "2.0.53-dev.20251124.t173302.sha526cc6a" -> "2.0.53-dev.20251124"
|
||||
if (typeof allData.version === 'string') {
|
||||
allData.version = allData.version.replace(
|
||||
/^(\d+\.\d+\.\d+-dev\.\d{8})\.t\d+\.sha[a-f0-9]+$/,
|
||||
'$1',
|
||||
)
|
||||
}
|
||||
|
||||
// Transform status to http_status and http_status_range to avoid Datadog reserved field
|
||||
if (allData.status !== undefined && allData.status !== null) {
|
||||
const statusCode = String(allData.status)
|
||||
allData.http_status = statusCode
|
||||
|
||||
// Determine status range (1xx, 2xx, 3xx, 4xx, 5xx)
|
||||
const firstDigit = statusCode.charAt(0)
|
||||
if (firstDigit >= '1' && firstDigit <= '5') {
|
||||
allData.http_status_range = `${firstDigit}xx`
|
||||
}
|
||||
|
||||
// Remove original status field to avoid conflict with Datadog's reserved field
|
||||
delete allData.status
|
||||
}
|
||||
|
||||
// Build ddtags with high-cardinality fields for filtering.
|
||||
// event:<name> is prepended so the event name is searchable via the
|
||||
// log search API — the `message` field (where eventName also lives)
|
||||
// is a DD reserved field and is NOT queryable from dashboard widget
|
||||
// queries or the aggregation API. See scripts/release/MONITORING.md.
|
||||
const allDataRecord = allData
|
||||
const tags = [
|
||||
`event:${eventName}`,
|
||||
...TAG_FIELDS.filter(
|
||||
field =>
|
||||
allDataRecord[field] !== undefined && allDataRecord[field] !== null,
|
||||
).map(field => `${camelToSnakeCase(field)}:${allDataRecord[field]}`),
|
||||
]
|
||||
|
||||
const log: DatadogLog = {
|
||||
ddsource: 'nodejs',
|
||||
ddtags: tags.join(','),
|
||||
message: eventName,
|
||||
service: 'claude-code',
|
||||
hostname: 'claude-code',
|
||||
env: process.env.USER_TYPE,
|
||||
}
|
||||
|
||||
// Add all fields as searchable attributes (not duplicated in tags)
|
||||
for (const [key, value] of Object.entries(allData)) {
|
||||
if (value !== undefined && value !== null) {
|
||||
log[camelToSnakeCase(key)] = value
|
||||
}
|
||||
}
|
||||
|
||||
logBatch.push(log)
|
||||
|
||||
// Flush immediately if batch is full, otherwise schedule
|
||||
if (logBatch.length >= MAX_BATCH_SIZE) {
|
||||
if (flushTimer) {
|
||||
clearTimeout(flushTimer)
|
||||
flushTimer = null
|
||||
}
|
||||
void flushLogs()
|
||||
} else {
|
||||
scheduleFlush()
|
||||
}
|
||||
} catch (error) {
|
||||
logError(error)
|
||||
}
|
||||
}
|
||||
|
||||
const NUM_USER_BUCKETS = 30
|
||||
|
||||
/**
|
||||
* Gets a 'bucket' that the user ID falls into.
|
||||
*
|
||||
* For alerting purposes, we want to alert on the number of users impacted
|
||||
* by an issue, rather than the number of events- often a small number of users
|
||||
* can generate a large number of events (e.g. due to retries). To approximate
|
||||
* this without ruining cardinality by counting user IDs directly, we hash the user ID
|
||||
* and assign it to one of a fixed number of buckets.
|
||||
*
|
||||
* This allows us to estimate the number of unique users by counting unique buckets,
|
||||
* while preserving user privacy and reducing cardinality.
|
||||
*/
|
||||
const getUserBucket = memoize((): number => {
|
||||
const userId = getOrCreateUserID()
|
||||
const hash = createHash('sha256').update(userId).digest('hex')
|
||||
return parseInt(hash.slice(0, 8), 16) % NUM_USER_BUCKETS
|
||||
})
|
||||
|
||||
function getFlushIntervalMs(): number {
|
||||
// Allow tests to override to not block on the default flush interval.
|
||||
return (
|
||||
parseInt(process.env.CLAUDE_CODE_DATADOG_FLUSH_INTERVAL_MS || '', 10) ||
|
||||
DEFAULT_FLUSH_INTERVAL_MS
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,237 +1,41 @@
|
||||
import type { AnyValueMap, Logger, logs } from '@opentelemetry/api-logs'
|
||||
import { resourceFromAttributes } from '@opentelemetry/resources'
|
||||
import {
|
||||
BatchLogRecordProcessor,
|
||||
LoggerProvider,
|
||||
} from '@opentelemetry/sdk-logs'
|
||||
import {
|
||||
ATTR_SERVICE_NAME,
|
||||
ATTR_SERVICE_VERSION,
|
||||
} from '@opentelemetry/semantic-conventions'
|
||||
import { randomUUID } from 'crypto'
|
||||
import { isEqual } from 'lodash-es'
|
||||
import { getOrCreateUserID } from '../../utils/config.js'
|
||||
import { logForDebugging } from '../../utils/debug.js'
|
||||
import { logError } from '../../utils/log.js'
|
||||
import { getPlatform, getWslVersion } from '../../utils/platform.js'
|
||||
import { jsonStringify } from '../../utils/slowOperations.js'
|
||||
import { profileCheckpoint } from '../../utils/startupProfiler.js'
|
||||
import { getCoreUserData } from '../../utils/user.js'
|
||||
import { isAnalyticsDisabled } from './config.js'
|
||||
import { FirstPartyEventLoggingExporter } from './firstPartyEventLoggingExporter.js'
|
||||
import type { GrowthBookUserAttributes } from './growthbook.js'
|
||||
import { getDynamicConfig_CACHED_MAY_BE_STALE } from './growthbook.js'
|
||||
import { getEventMetadata } from './metadata.js'
|
||||
import { isSinkKilled } from './sinkKillswitch.js'
|
||||
|
||||
/**
|
||||
* Configuration for sampling individual event types.
|
||||
* Each event name maps to an object containing sample_rate (0-1).
|
||||
* Events not in the config are logged at 100% rate.
|
||||
* Anthropic 1P event logging egress is disabled in this build.
|
||||
*
|
||||
* The module keeps its public API so the rest of the app can call into it
|
||||
* without conditional imports.
|
||||
*/
|
||||
|
||||
import type { GrowthBookUserAttributes } from './growthbook.js'
|
||||
|
||||
/**
 * Per-event sampling table: each event name maps to an object with a
 * sample_rate in [0, 1]. Events absent from the table are not sampled
 * (logged at 100%) — see shouldSampleEvent.
 */
export type EventSamplingConfig = {
  [eventName: string]: {
    sample_rate: number
  }
}

// GrowthBook dynamic-config key holding the sampling table above.
const EVENT_SAMPLING_CONFIG_NAME = 'tengu_event_sampling_config'
|
||||
/**
|
||||
* Get the event sampling configuration from GrowthBook.
|
||||
* Uses cached value if available, updates cache in background.
|
||||
*/
|
||||
export function getEventSamplingConfig(): EventSamplingConfig {
|
||||
return getDynamicConfig_CACHED_MAY_BE_STALE<EventSamplingConfig>(
|
||||
EVENT_SAMPLING_CONFIG_NAME,
|
||||
{},
|
||||
)
|
||||
return {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if an event should be sampled based on its sample rate.
|
||||
* Returns the sample rate if sampled, null if not sampled.
|
||||
*
|
||||
* @param eventName - Name of the event to check
|
||||
* @returns The sample_rate if event should be logged, null if it should be dropped
|
||||
*/
|
||||
export function shouldSampleEvent(eventName: string): number | null {
|
||||
const config = getEventSamplingConfig()
|
||||
const eventConfig = config[eventName]
|
||||
|
||||
// If no config for this event, log at 100% rate (no sampling)
|
||||
if (!eventConfig) {
|
||||
export function shouldSampleEvent(_eventName: string): number | null {
|
||||
return null
|
||||
}
|
||||
|
||||
const sampleRate = eventConfig.sample_rate
|
||||
|
||||
// Validate sample rate is in valid range
|
||||
if (typeof sampleRate !== 'number' || sampleRate < 0 || sampleRate > 1) {
|
||||
return null
|
||||
}
|
||||
|
||||
// Sample rate of 1 means log everything (no need to add metadata)
|
||||
if (sampleRate >= 1) {
|
||||
return null
|
||||
}
|
||||
|
||||
// Sample rate of 0 means drop everything
|
||||
if (sampleRate <= 0) {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Randomly decide whether to sample this event
|
||||
return Math.random() < sampleRate ? sampleRate : 0
|
||||
}
|
||||
|
||||
// GrowthBook dynamic-config key for the 1P batch-processor tuning knobs.
const BATCH_CONFIG_NAME = 'tengu_1p_event_batch_config'
// Tuning knobs for the 1P event pipeline. The first three feed the
// BatchLogRecordProcessor; the rest configure FirstPartyEventLoggingExporter.
type BatchConfig = {
  scheduledDelayMillis?: number
  maxExportBatchSize?: number
  maxQueueSize?: number
  skipAuth?: boolean
  maxAttempts?: number
  path?: string
  baseUrl?: string
}
|
||||
function getBatchConfig(): BatchConfig {
|
||||
return getDynamicConfig_CACHED_MAY_BE_STALE<BatchConfig>(
|
||||
BATCH_CONFIG_NAME,
|
||||
{},
|
||||
)
|
||||
}
|
||||
|
||||
// Module-local state for event logging (not exposed globally)
// Logger handle events are emitted through; null until initialized, and
// nulled again during a reinitialization swap.
let firstPartyEventLogger: ReturnType<typeof logs.getLogger> | null = null
// Dedicated provider for 1P events, separate from customer telemetry.
let firstPartyEventLoggerProvider: LoggerProvider | null = null
// Last batch config used to construct the provider — used by
// reinitialize1PEventLoggingIfConfigChanged to decide whether a rebuild is
// needed when GrowthBook refreshes.
let lastBatchConfig: BatchConfig | null = null
|
||||
/**
|
||||
* Flush and shutdown the 1P event logger.
|
||||
* This should be called as the final step before process exit to ensure
|
||||
* all events (including late ones from API responses) are exported.
|
||||
*/
|
||||
export async function shutdown1PEventLogging(): Promise<void> {
|
||||
if (!firstPartyEventLoggerProvider) {
|
||||
return
|
||||
}
|
||||
try {
|
||||
await firstPartyEventLoggerProvider.shutdown()
|
||||
if (process.env.USER_TYPE === 'ant') {
|
||||
logForDebugging('1P event logging: final shutdown complete')
|
||||
}
|
||||
} catch {
|
||||
// Ignore shutdown errors
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if 1P event logging is enabled.
|
||||
* Respects the same opt-outs as other analytics sinks:
|
||||
* - Test environment
|
||||
* - Third-party cloud providers (Bedrock/Vertex)
|
||||
* - Global telemetry opt-outs
|
||||
* - Non-essential traffic disabled
|
||||
*
|
||||
* Note: Unlike BigQuery metrics, event logging does NOT check organization-level
|
||||
* metrics opt-out via API. It follows the same pattern as Statsig event logging.
|
||||
*/
|
||||
export function is1PEventLoggingEnabled(): boolean {
|
||||
// Respect standard analytics opt-outs
|
||||
return !isAnalyticsDisabled()
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a 1st-party event for internal analytics (async version).
|
||||
* Events are batched and exported to /api/event_logging/batch
|
||||
*
|
||||
* This enriches the event with core metadata (model, session, env context, etc.)
|
||||
* at log time, similar to logEventToStatsig.
|
||||
*
|
||||
* @param eventName - Name of the event (e.g., 'tengu_api_query')
|
||||
* @param metadata - Additional metadata for the event (intentionally no strings, to avoid accidentally logging code/filepaths)
|
||||
*/
|
||||
async function logEventTo1PAsync(
|
||||
firstPartyEventLogger: Logger,
|
||||
eventName: string,
|
||||
metadata: Record<string, number | boolean | undefined> = {},
|
||||
): Promise<void> {
|
||||
try {
|
||||
// Enrich with core metadata at log time (similar to Statsig pattern)
|
||||
const coreMetadata = await getEventMetadata({
|
||||
model: metadata.model,
|
||||
betas: metadata.betas,
|
||||
})
|
||||
|
||||
// Build attributes - OTel supports nested objects natively via AnyValueMap
|
||||
// Cast through unknown since our nested objects are structurally compatible
|
||||
// with AnyValue but TS doesn't recognize it due to missing index signatures
|
||||
const attributes = {
|
||||
event_name: eventName,
|
||||
event_id: randomUUID(),
|
||||
// Pass objects directly - no JSON serialization needed
|
||||
core_metadata: coreMetadata,
|
||||
user_metadata: getCoreUserData(true),
|
||||
event_metadata: metadata,
|
||||
} as unknown as AnyValueMap
|
||||
|
||||
// Add user_id if available
|
||||
const userId = getOrCreateUserID()
|
||||
if (userId) {
|
||||
attributes.user_id = userId
|
||||
}
|
||||
|
||||
// Debug logging when debug mode is enabled
|
||||
if (process.env.USER_TYPE === 'ant') {
|
||||
logForDebugging(
|
||||
`[ANT-ONLY] 1P event: ${eventName} ${jsonStringify(metadata, null, 0)}`,
|
||||
)
|
||||
}
|
||||
|
||||
// Emit log record
|
||||
firstPartyEventLogger.emit({
|
||||
body: eventName,
|
||||
attributes,
|
||||
})
|
||||
} catch (e) {
|
||||
if (process.env.NODE_ENV === 'development') {
|
||||
throw e
|
||||
}
|
||||
if (process.env.USER_TYPE === 'ant') {
|
||||
logError(e as Error)
|
||||
}
|
||||
// swallow
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a 1st-party event for internal analytics.
|
||||
* Events are batched and exported to /api/event_logging/batch
|
||||
*
|
||||
* @param eventName - Name of the event (e.g., 'tengu_api_query')
|
||||
* @param metadata - Additional metadata for the event (intentionally no strings, to avoid accidentally logging code/filepaths)
|
||||
*/
|
||||
export function logEventTo1P(
|
||||
eventName: string,
|
||||
metadata: Record<string, number | boolean | undefined> = {},
|
||||
_eventName: string,
|
||||
_metadata: Record<string, number | boolean | undefined> = {},
|
||||
): void {
|
||||
if (!is1PEventLoggingEnabled()) {
|
||||
return
|
||||
}
|
||||
|
||||
if (!firstPartyEventLogger || isSinkKilled('firstParty')) {
|
||||
return
|
||||
}
|
||||
|
||||
// Fire and forget - don't block on metadata enrichment
|
||||
void logEventTo1PAsync(firstPartyEventLogger, eventName, metadata)
|
||||
}
|
||||
|
||||
/**
|
||||
* GrowthBook experiment event data for logging
|
||||
*/
|
||||
export type GrowthBookExperimentData = {
|
||||
experimentId: string
|
||||
variationId: number
|
||||
@@ -239,211 +43,16 @@ export type GrowthBookExperimentData = {
|
||||
experimentMetadata?: Record<string, unknown>
|
||||
}
|
||||
|
||||
// api.anthropic.com only serves the "production" GrowthBook environment
|
||||
// (see starling/starling/cli/cli.py DEFAULT_ENVIRONMENTS). Staging and
|
||||
// development environments are not exported to the prod API.
|
||||
function getEnvironmentForGrowthBook(): string {
|
||||
return 'production'
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a GrowthBook experiment assignment event to 1P.
|
||||
* Events are batched and exported to /api/event_logging/batch
|
||||
*
|
||||
* @param data - GrowthBook experiment assignment data
|
||||
*/
|
||||
export function logGrowthBookExperimentTo1P(
|
||||
data: GrowthBookExperimentData,
|
||||
_data: GrowthBookExperimentData,
|
||||
): void {
|
||||
if (!is1PEventLoggingEnabled()) {
|
||||
return
|
||||
}
|
||||
|
||||
if (!firstPartyEventLogger || isSinkKilled('firstParty')) {
|
||||
return
|
||||
}
|
||||
|
||||
const userId = getOrCreateUserID()
|
||||
const { accountUuid, organizationUuid } = getCoreUserData(true)
|
||||
|
||||
// Build attributes for GrowthbookExperimentEvent
|
||||
const attributes = {
|
||||
event_type: 'GrowthbookExperimentEvent',
|
||||
event_id: randomUUID(),
|
||||
experiment_id: data.experimentId,
|
||||
variation_id: data.variationId,
|
||||
...(userId && { device_id: userId }),
|
||||
...(accountUuid && { account_uuid: accountUuid }),
|
||||
...(organizationUuid && { organization_uuid: organizationUuid }),
|
||||
...(data.userAttributes && {
|
||||
session_id: data.userAttributes.sessionId,
|
||||
user_attributes: jsonStringify(data.userAttributes),
|
||||
}),
|
||||
...(data.experimentMetadata && {
|
||||
experiment_metadata: jsonStringify(data.experimentMetadata),
|
||||
}),
|
||||
environment: getEnvironmentForGrowthBook(),
|
||||
}
|
||||
|
||||
if (process.env.USER_TYPE === 'ant') {
|
||||
logForDebugging(
|
||||
`[ANT-ONLY] 1P GrowthBook experiment: ${data.experimentId} variation=${data.variationId}`,
|
||||
)
|
||||
}
|
||||
|
||||
firstPartyEventLogger.emit({
|
||||
body: 'growthbook_experiment',
|
||||
attributes,
|
||||
})
|
||||
}
|
||||
|
||||
// Defaults for the BatchLogRecordProcessor when neither GrowthBook config
// nor env vars supply an override.
const DEFAULT_LOGS_EXPORT_INTERVAL_MS = 10000
const DEFAULT_MAX_EXPORT_BATCH_SIZE = 200
const DEFAULT_MAX_QUEUE_SIZE = 8192

/**
 * Initialize 1P event logging infrastructure.
 * This creates a separate LoggerProvider for internal event logging,
 * independent of customer OTLP telemetry.
 *
 * This uses its own minimal resource configuration with just the attributes
 * we need for internal analytics (service name, version, platform info).
 *
 * No-op when is1PEventLoggingEnabled() reports analytics are disabled.
 */
export function initialize1PEventLogging(): void {
  profileCheckpoint('1p_event_logging_start')
  const enabled = is1PEventLoggingEnabled()

  if (!enabled) {
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging('1P event logging not enabled')
    }
    return
  }

  // Fetch batch processor configuration from GrowthBook dynamic config
  // Uses cached value if available, refreshes in background
  const batchConfig = getBatchConfig()
  // Remember the config so reinitialize1PEventLoggingIfConfigChanged can
  // detect changes on GrowthBook refresh.
  lastBatchConfig = batchConfig
  profileCheckpoint('1p_event_after_growthbook_config')

  // Precedence: GrowthBook config, then OTEL_LOGS_EXPORT_INTERVAL env, then default.
  const scheduledDelayMillis =
    batchConfig.scheduledDelayMillis ||
    parseInt(
      process.env.OTEL_LOGS_EXPORT_INTERVAL ||
        DEFAULT_LOGS_EXPORT_INTERVAL_MS.toString(),
    )

  const maxExportBatchSize =
    batchConfig.maxExportBatchSize || DEFAULT_MAX_EXPORT_BATCH_SIZE

  const maxQueueSize = batchConfig.maxQueueSize || DEFAULT_MAX_QUEUE_SIZE

  // Build our own resource for 1P event logging with minimal attributes
  const platform = getPlatform()
  const attributes: Record<string, string> = {
    [ATTR_SERVICE_NAME]: 'claude-code',
    [ATTR_SERVICE_VERSION]: MACRO.VERSION,
  }

  // Add WSL-specific attributes if running on WSL
  if (platform === 'wsl') {
    const wslVersion = getWslVersion()
    if (wslVersion) {
      attributes['wsl.version'] = wslVersion
    }
  }

  const resource = resourceFromAttributes(attributes)

  // Create a new LoggerProvider with the EventLoggingExporter
  // NOTE: This is kept separate from customer telemetry logs to ensure
  // internal events don't leak to customer endpoints and vice versa.
  // We don't register this globally - it's only used for internal event logging.
  const eventLoggingExporter = new FirstPartyEventLoggingExporter({
    maxBatchSize: maxExportBatchSize,
    skipAuth: batchConfig.skipAuth,
    maxAttempts: batchConfig.maxAttempts,
    path: batchConfig.path,
    baseUrl: batchConfig.baseUrl,
    isKilled: () => isSinkKilled('firstParty'),
  })
  firstPartyEventLoggerProvider = new LoggerProvider({
    resource,
    processors: [
      new BatchLogRecordProcessor(eventLoggingExporter, {
        scheduledDelayMillis,
        maxExportBatchSize,
        maxQueueSize,
      }),
    ],
  })

  // Initialize event logger from our internal provider (NOT from global API)
  // IMPORTANT: We must get the logger from our local provider, not logs.getLogger()
  // because logs.getLogger() returns a logger from the global provider, which is
  // separate and used for customer telemetry.
  firstPartyEventLogger = firstPartyEventLoggerProvider.getLogger(
    'com.anthropic.claude_code.events',
    MACRO.VERSION,
  )
}
|
||||
|
||||
/**
|
||||
* Rebuild the 1P event logging pipeline if the batch config changed.
|
||||
* Register this with onGrowthBookRefresh so long-running sessions pick up
|
||||
* changes to batch size, delay, endpoint, etc.
|
||||
*
|
||||
* Event-loss safety:
|
||||
* 1. Null the logger first — concurrent logEventTo1P() calls hit the
|
||||
* !firstPartyEventLogger guard and bail during the swap window. This drops
|
||||
* a handful of events but prevents emitting to a draining provider.
|
||||
* 2. forceFlush() drains the old BatchLogRecordProcessor buffer to the
|
||||
* exporter. Export failures go to disk at getCurrentBatchFilePath() which
|
||||
* is keyed by module-level BATCH_UUID + sessionId — unchanged across
|
||||
* reinit — so the NEW exporter's disk-backed retry picks them up.
|
||||
* 3. Swap to new provider/logger; old provider shutdown runs in background
|
||||
* (buffer already drained, just cleanup).
|
||||
*/
|
||||
export async function reinitialize1PEventLoggingIfConfigChanged(): Promise<void> {
|
||||
if (!is1PEventLoggingEnabled() || !firstPartyEventLoggerProvider) {
|
||||
return
|
||||
}
|
||||
|
||||
const newConfig = getBatchConfig()
|
||||
|
||||
if (isEqual(newConfig, lastBatchConfig)) {
|
||||
return
|
||||
}
|
||||
|
||||
if (process.env.USER_TYPE === 'ant') {
|
||||
logForDebugging(
|
||||
`1P event logging: ${BATCH_CONFIG_NAME} changed, reinitializing`,
|
||||
)
|
||||
}
|
||||
|
||||
const oldProvider = firstPartyEventLoggerProvider
|
||||
const oldLogger = firstPartyEventLogger
|
||||
firstPartyEventLogger = null
|
||||
|
||||
try {
|
||||
await oldProvider.forceFlush()
|
||||
} catch {
|
||||
// Export failures are already on disk; new exporter will retry them.
|
||||
}
|
||||
|
||||
firstPartyEventLoggerProvider = null
|
||||
try {
|
||||
initialize1PEventLogging()
|
||||
} catch (e) {
|
||||
// Restore so the next GrowthBook refresh can retry. oldProvider was
|
||||
// only forceFlush()'d, not shut down — it's still functional. Without
|
||||
// this, both stay null and the !firstPartyEventLoggerProvider gate at
|
||||
// the top makes recovery impossible.
|
||||
firstPartyEventLoggerProvider = oldProvider
|
||||
firstPartyEventLogger = oldLogger
|
||||
logError(e)
|
||||
return
|
||||
}
|
||||
|
||||
void oldProvider.shutdown().catch(() => {})
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ import {
|
||||
getUserForGrowthBook,
|
||||
} from '../../utils/user.js'
|
||||
import {
|
||||
is1PEventLoggingEnabled,
|
||||
logGrowthBookExperimentTo1P,
|
||||
} from './firstPartyEventLogger.js'
|
||||
|
||||
@@ -219,6 +218,19 @@ function getConfigOverrides(): Record<string, unknown> | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
function getCachedGrowthBookFeature<T>(feature: string): T | undefined {
|
||||
if (remoteEvalFeatureValues.has(feature)) {
|
||||
return remoteEvalFeatureValues.get(feature) as T
|
||||
}
|
||||
|
||||
try {
|
||||
const cached = getGlobalConfig().cachedGrowthBookFeatures?.[feature]
|
||||
return cached !== undefined ? (cached as T) : undefined
|
||||
} catch {
|
||||
return undefined
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enumerate all known GrowthBook features and their current resolved values
|
||||
* (not including overrides). In-memory payload first, disk cache fallback —
|
||||
@@ -420,8 +432,9 @@ function syncRemoteEvalToDisk(): void {
|
||||
* Check if GrowthBook operations should be enabled
|
||||
*/
|
||||
function isGrowthBookEnabled(): boolean {
|
||||
// GrowthBook depends on 1P event logging.
|
||||
return is1PEventLoggingEnabled()
|
||||
// Network-backed GrowthBook egress is disabled in this build. Callers still
|
||||
// read local cache and explicit overrides through the helpers below.
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -682,6 +695,11 @@ async function getFeatureValueInternal<T>(
|
||||
return configOverrides[feature] as T
|
||||
}
|
||||
|
||||
const cached = getCachedGrowthBookFeature<T>(feature)
|
||||
if (cached !== undefined) {
|
||||
return cached
|
||||
}
|
||||
|
||||
if (!isGrowthBookEnabled()) {
|
||||
return defaultValue
|
||||
}
|
||||
@@ -745,6 +763,11 @@ export function getFeatureValue_CACHED_MAY_BE_STALE<T>(
|
||||
return configOverrides[feature] as T
|
||||
}
|
||||
|
||||
const cached = getCachedGrowthBookFeature<T>(feature)
|
||||
if (cached !== undefined) {
|
||||
return cached
|
||||
}
|
||||
|
||||
if (!isGrowthBookEnabled()) {
|
||||
return defaultValue
|
||||
}
|
||||
@@ -814,6 +837,16 @@ export function checkStatsigFeatureGate_CACHED_MAY_BE_STALE(
|
||||
return Boolean(configOverrides[gate])
|
||||
}
|
||||
|
||||
const cached = getCachedGrowthBookFeature<boolean>(gate)
|
||||
if (cached !== undefined) {
|
||||
return Boolean(cached)
|
||||
}
|
||||
|
||||
const statsigCached = getGlobalConfig().cachedStatsigGates?.[gate]
|
||||
if (statsigCached !== undefined) {
|
||||
return Boolean(statsigCached)
|
||||
}
|
||||
|
||||
if (!isGrowthBookEnabled()) {
|
||||
return false
|
||||
}
|
||||
@@ -861,6 +894,16 @@ export async function checkSecurityRestrictionGate(
|
||||
return Boolean(configOverrides[gate])
|
||||
}
|
||||
|
||||
const cached = getCachedGrowthBookFeature<boolean>(gate)
|
||||
if (cached !== undefined) {
|
||||
return Boolean(cached)
|
||||
}
|
||||
|
||||
const statsigCached = getGlobalConfig().cachedStatsigGates?.[gate]
|
||||
if (statsigCached !== undefined) {
|
||||
return Boolean(statsigCached)
|
||||
}
|
||||
|
||||
if (!isGrowthBookEnabled()) {
|
||||
return false
|
||||
}
|
||||
@@ -871,19 +914,6 @@ export async function checkSecurityRestrictionGate(
|
||||
await reinitializingPromise
|
||||
}
|
||||
|
||||
// Check Statsig cache first - it may have correct value from previous logged-in session
|
||||
const config = getGlobalConfig()
|
||||
const statsigCached = config.cachedStatsigGates?.[gate]
|
||||
if (statsigCached !== undefined) {
|
||||
return Boolean(statsigCached)
|
||||
}
|
||||
|
||||
// Then check GrowthBook cache
|
||||
const gbCached = config.cachedGrowthBookFeatures?.[gate]
|
||||
if (gbCached !== undefined) {
|
||||
return Boolean(gbCached)
|
||||
}
|
||||
|
||||
// No cache - return false (don't block on init for uncached gates)
|
||||
return false
|
||||
}
|
||||
@@ -914,13 +944,23 @@ export async function checkGate_CACHED_OR_BLOCKING(
|
||||
return Boolean(configOverrides[gate])
|
||||
}
|
||||
|
||||
const cached = getCachedGrowthBookFeature<boolean>(gate)
|
||||
if (cached !== undefined) {
|
||||
return Boolean(cached)
|
||||
}
|
||||
|
||||
const statsigCached = getGlobalConfig().cachedStatsigGates?.[gate]
|
||||
if (statsigCached !== undefined) {
|
||||
return Boolean(statsigCached)
|
||||
}
|
||||
|
||||
if (!isGrowthBookEnabled()) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Fast path: disk cache already says true — trust it
|
||||
const cached = getGlobalConfig().cachedGrowthBookFeatures?.[gate]
|
||||
if (cached === true) {
|
||||
const diskCached = getGlobalConfig().cachedGrowthBookFeatures?.[gate]
|
||||
if (diskCached === true) {
|
||||
// Log experiment exposure if data is available, otherwise defer
|
||||
if (experimentDataByFeature.has(gate)) {
|
||||
logExposureForFeature(gate)
|
||||
|
||||
@@ -1,111 +1,32 @@
|
||||
/**
|
||||
* Analytics sink implementation
|
||||
*
|
||||
* This module contains the actual analytics routing logic and should be
|
||||
* initialized during app startup. It routes events to Datadog and 1P event
|
||||
* logging.
|
||||
*
|
||||
* Usage: Call initializeAnalyticsSink() during app startup to attach the sink.
|
||||
* This open build keeps the analytics sink boundary for compatibility, but
|
||||
* drops all queued analytics events locally instead of routing them onward.
|
||||
*/
|
||||
|
||||
import { trackDatadogEvent } from './datadog.js'
|
||||
import { logEventTo1P, shouldSampleEvent } from './firstPartyEventLogger.js'
|
||||
import { checkStatsigFeatureGate_CACHED_MAY_BE_STALE } from './growthbook.js'
|
||||
import { attachAnalyticsSink, stripProtoFields } from './index.js'
|
||||
import { isSinkKilled } from './sinkKillswitch.js'
|
||||
import { attachAnalyticsSink } from './index.js'
|
||||
|
||||
// Local type matching the logEvent metadata signature
|
||||
type LogEventMetadata = { [key: string]: boolean | number | undefined }
|
||||
|
||||
const DATADOG_GATE_NAME = 'tengu_log_datadog_events'
|
||||
|
||||
// Module-level gate state - starts undefined, initialized during startup
|
||||
let isDatadogGateEnabled: boolean | undefined = undefined
|
||||
|
||||
/**
|
||||
* Check if Datadog tracking is enabled.
|
||||
* Falls back to cached value from previous session if not yet initialized.
|
||||
*/
|
||||
function shouldTrackDatadog(): boolean {
|
||||
if (isSinkKilled('datadog')) {
|
||||
return false
|
||||
}
|
||||
if (isDatadogGateEnabled !== undefined) {
|
||||
return isDatadogGateEnabled
|
||||
}
|
||||
|
||||
// Fallback to cached value from previous session
|
||||
try {
|
||||
return checkStatsigFeatureGate_CACHED_MAY_BE_STALE(DATADOG_GATE_NAME)
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an event (synchronous implementation)
|
||||
*/
|
||||
function logEventImpl(eventName: string, metadata: LogEventMetadata): void {
|
||||
// Check if this event should be sampled
|
||||
const sampleResult = shouldSampleEvent(eventName)
|
||||
|
||||
// If sample result is 0, the event was not selected for logging
|
||||
if (sampleResult === 0) {
|
||||
function logEventImpl(
|
||||
_eventName: string,
|
||||
_metadata: LogEventMetadata,
|
||||
): void {
|
||||
return
|
||||
}
|
||||
|
||||
// If sample result is a positive number, add it to metadata
|
||||
const metadataWithSampleRate =
|
||||
sampleResult !== null
|
||||
? { ...metadata, sample_rate: sampleResult }
|
||||
: metadata
|
||||
|
||||
if (shouldTrackDatadog()) {
|
||||
// Datadog is a general-access backend — strip _PROTO_* keys
|
||||
// (unredacted PII-tagged values meant only for the 1P privileged column).
|
||||
void trackDatadogEvent(eventName, stripProtoFields(metadataWithSampleRate))
|
||||
}
|
||||
|
||||
// 1P receives the full payload including _PROTO_* — the exporter
|
||||
// destructures and routes those keys to proto fields itself.
|
||||
logEventTo1P(eventName, metadataWithSampleRate)
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an event (asynchronous implementation)
|
||||
*
|
||||
* With Segment removed the two remaining sinks are fire-and-forget, so this
|
||||
* just wraps the sync impl — kept to preserve the sink interface contract.
|
||||
*/
|
||||
function logEventAsyncImpl(
|
||||
eventName: string,
|
||||
metadata: LogEventMetadata,
|
||||
_eventName: string,
|
||||
_metadata: LogEventMetadata,
|
||||
): Promise<void> {
|
||||
logEventImpl(eventName, metadata)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize analytics gates during startup.
|
||||
*
|
||||
* Updates gate values from server. Early events use cached values from previous
|
||||
* session to avoid data loss during initialization.
|
||||
*
|
||||
* Called from main.tsx during setupBackend().
|
||||
*/
|
||||
export function initializeAnalyticsGates(): void {
|
||||
isDatadogGateEnabled =
|
||||
checkStatsigFeatureGate_CACHED_MAY_BE_STALE(DATADOG_GATE_NAME)
|
||||
return
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the analytics sink.
|
||||
*
|
||||
* Call this during app startup to attach the analytics backend.
|
||||
* Any events logged before this is called will be queued and drained.
|
||||
*
|
||||
* Idempotent: safe to call multiple times (subsequent calls are no-ops).
|
||||
*/
|
||||
export function initializeAnalyticsSink(): void {
|
||||
attachAnalyticsSink({
|
||||
logEvent: logEventImpl,
|
||||
|
||||
@@ -1,75 +1,14 @@
|
||||
import type { Attributes } from '@opentelemetry/api'
|
||||
import { getEventLogger, getPromptId } from 'src/bootstrap/state.js'
|
||||
import { logForDebugging } from '../debug.js'
|
||||
import { isEnvTruthy } from '../envUtils.js'
|
||||
import { getTelemetryAttributes } from '../telemetryAttributes.js'
|
||||
/**
|
||||
* OpenTelemetry event egress is disabled in this build.
|
||||
*/
|
||||
|
||||
// Monotonically increasing counter for ordering events within a session
|
||||
let eventSequence = 0
|
||||
|
||||
// Track whether we've already warned about a null event logger to avoid spamming
|
||||
let hasWarnedNoEventLogger = false
|
||||
|
||||
function isUserPromptLoggingEnabled() {
|
||||
return isEnvTruthy(process.env.OTEL_LOG_USER_PROMPTS)
|
||||
}
|
||||
|
||||
export function redactIfDisabled(content: string): string {
|
||||
return isUserPromptLoggingEnabled() ? content : '<REDACTED>'
|
||||
export function redactIfDisabled(_content: string): string {
|
||||
return '<REDACTED>'
|
||||
}
|
||||
|
||||
export async function logOTelEvent(
|
||||
eventName: string,
|
||||
metadata: { [key: string]: string | undefined } = {},
|
||||
_eventName: string,
|
||||
_metadata: { [key: string]: string | undefined } = {},
|
||||
): Promise<void> {
|
||||
const eventLogger = getEventLogger()
|
||||
if (!eventLogger) {
|
||||
if (!hasWarnedNoEventLogger) {
|
||||
hasWarnedNoEventLogger = true
|
||||
logForDebugging(
|
||||
`[3P telemetry] Event dropped (no event logger initialized): ${eventName}`,
|
||||
{ level: 'warn' },
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Skip logging in test environment
|
||||
if (process.env.NODE_ENV === 'test') {
|
||||
return
|
||||
}
|
||||
|
||||
const attributes: Attributes = {
|
||||
...getTelemetryAttributes(),
|
||||
'event.name': eventName,
|
||||
'event.timestamp': new Date().toISOString(),
|
||||
'event.sequence': eventSequence++,
|
||||
}
|
||||
|
||||
// Add prompt ID to events (but not metrics, where it would cause unbounded cardinality)
|
||||
const promptId = getPromptId()
|
||||
if (promptId) {
|
||||
attributes['prompt.id'] = promptId
|
||||
}
|
||||
|
||||
// Workspace directory from the desktop app (host path). Events only —
|
||||
// filesystem paths are too high-cardinality for metric dimensions, and
|
||||
// the BQ metrics pipeline must never see them.
|
||||
const workspaceDir = process.env.CLAUDE_CODE_WORKSPACE_HOST_PATHS
|
||||
if (workspaceDir) {
|
||||
attributes['workspace.host_paths'] = workspaceDir.split('|')
|
||||
}
|
||||
|
||||
// Add metadata as attributes - all values are already strings
|
||||
for (const [key, value] of Object.entries(metadata)) {
|
||||
if (value !== undefined) {
|
||||
attributes[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
// Emit log record as an event
|
||||
eventLogger.emit({
|
||||
body: `claude_code.${eventName}`,
|
||||
attributes,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,123 +1,5 @@
|
||||
import { DiagLogLevel, diag, trace } from '@opentelemetry/api'
|
||||
import { logs } from '@opentelemetry/api-logs'
|
||||
// OTLP/Prometheus exporters are dynamically imported inside the protocol
|
||||
// switch statements below. A process uses at most one protocol variant per
|
||||
// signal, but static imports would load all 6 (~1.2MB) on every startup.
|
||||
import {
|
||||
envDetector,
|
||||
hostDetector,
|
||||
osDetector,
|
||||
resourceFromAttributes,
|
||||
} from '@opentelemetry/resources'
|
||||
import {
|
||||
BatchLogRecordProcessor,
|
||||
ConsoleLogRecordExporter,
|
||||
LoggerProvider,
|
||||
} from '@opentelemetry/sdk-logs'
|
||||
import {
|
||||
ConsoleMetricExporter,
|
||||
MeterProvider,
|
||||
PeriodicExportingMetricReader,
|
||||
} from '@opentelemetry/sdk-metrics'
|
||||
import {
|
||||
BasicTracerProvider,
|
||||
BatchSpanProcessor,
|
||||
ConsoleSpanExporter,
|
||||
} from '@opentelemetry/sdk-trace-base'
|
||||
import {
|
||||
ATTR_SERVICE_NAME,
|
||||
ATTR_SERVICE_VERSION,
|
||||
SEMRESATTRS_HOST_ARCH,
|
||||
} from '@opentelemetry/semantic-conventions'
|
||||
import { HttpsProxyAgent } from 'https-proxy-agent'
|
||||
import {
|
||||
getLoggerProvider,
|
||||
getMeterProvider,
|
||||
getTracerProvider,
|
||||
setEventLogger,
|
||||
setLoggerProvider,
|
||||
setMeterProvider,
|
||||
setTracerProvider,
|
||||
} from 'src/bootstrap/state.js'
|
||||
import {
|
||||
getOtelHeadersFromHelper,
|
||||
getSubscriptionType,
|
||||
is1PApiCustomer,
|
||||
isClaudeAISubscriber,
|
||||
} from 'src/utils/auth.js'
|
||||
import { getPlatform, getWslVersion } from 'src/utils/platform.js'
|
||||
export function bootstrapTelemetry(): void {}
|
||||
|
||||
import { getCACertificates } from '../caCerts.js'
|
||||
import { registerCleanup } from '../cleanupRegistry.js'
|
||||
import { getHasFormattedOutput, logForDebugging } from '../debug.js'
|
||||
import { isEnvTruthy } from '../envUtils.js'
|
||||
import { errorMessage } from '../errors.js'
|
||||
import { getMTLSConfig } from '../mtls.js'
|
||||
import { getProxyUrl, shouldBypassProxy } from '../proxy.js'
|
||||
import { getSettings_DEPRECATED } from '../settings/settings.js'
|
||||
import { jsonStringify } from '../slowOperations.js'
|
||||
import { profileCheckpoint } from '../startupProfiler.js'
|
||||
import { isBetaTracingEnabled } from './betaSessionTracing.js'
|
||||
import { BigQueryMetricsExporter } from './bigqueryExporter.js'
|
||||
import { ClaudeCodeDiagLogger } from './logger.js'
|
||||
import { initializePerfettoTracing } from './perfettoTracing.js'
|
||||
import {
|
||||
endInteractionSpan,
|
||||
isEnhancedTelemetryEnabled,
|
||||
} from './sessionTracing.js'
|
||||
|
||||
const DEFAULT_METRICS_EXPORT_INTERVAL_MS = 60000
|
||||
const DEFAULT_LOGS_EXPORT_INTERVAL_MS = 5000
|
||||
const DEFAULT_TRACES_EXPORT_INTERVAL_MS = 5000
|
||||
|
||||
class TelemetryTimeoutError extends Error {}
|
||||
|
||||
function telemetryTimeout(ms: number, message: string): Promise<never> {
|
||||
return new Promise((_, reject) => {
|
||||
setTimeout(
|
||||
(rej: (e: Error) => void, msg: string) =>
|
||||
rej(new TelemetryTimeoutError(msg)),
|
||||
ms,
|
||||
reject,
|
||||
message,
|
||||
).unref()
|
||||
})
|
||||
}
|
||||
|
||||
export function bootstrapTelemetry() {
|
||||
if (process.env.USER_TYPE === 'ant') {
|
||||
// Read from ANT_ prefixed variables that are defined at build time
|
||||
if (process.env.ANT_OTEL_METRICS_EXPORTER) {
|
||||
process.env.OTEL_METRICS_EXPORTER = process.env.ANT_OTEL_METRICS_EXPORTER
|
||||
}
|
||||
if (process.env.ANT_OTEL_LOGS_EXPORTER) {
|
||||
process.env.OTEL_LOGS_EXPORTER = process.env.ANT_OTEL_LOGS_EXPORTER
|
||||
}
|
||||
if (process.env.ANT_OTEL_TRACES_EXPORTER) {
|
||||
process.env.OTEL_TRACES_EXPORTER = process.env.ANT_OTEL_TRACES_EXPORTER
|
||||
}
|
||||
if (process.env.ANT_OTEL_EXPORTER_OTLP_PROTOCOL) {
|
||||
process.env.OTEL_EXPORTER_OTLP_PROTOCOL =
|
||||
process.env.ANT_OTEL_EXPORTER_OTLP_PROTOCOL
|
||||
}
|
||||
if (process.env.ANT_OTEL_EXPORTER_OTLP_ENDPOINT) {
|
||||
process.env.OTEL_EXPORTER_OTLP_ENDPOINT =
|
||||
process.env.ANT_OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
}
|
||||
if (process.env.ANT_OTEL_EXPORTER_OTLP_HEADERS) {
|
||||
process.env.OTEL_EXPORTER_OTLP_HEADERS =
|
||||
process.env.ANT_OTEL_EXPORTER_OTLP_HEADERS
|
||||
}
|
||||
}
|
||||
|
||||
// Set default tempoality to 'delta' because it's the more sane default
|
||||
if (!process.env.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE) {
|
||||
process.env.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = 'delta'
|
||||
}
|
||||
}
|
||||
|
||||
// Per OTEL spec, "none" means "no automatically configured exporter for this signal".
|
||||
// https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#exporter-selection
|
||||
export function parseExporterTypes(value: string | undefined): string[] {
|
||||
return (value || '')
|
||||
.trim()
|
||||
@@ -127,699 +9,14 @@ export function parseExporterTypes(value: string | undefined): string[] {
|
||||
.filter(t => t !== 'none')
|
||||
}
|
||||
|
||||
async function getOtlpReaders() {
|
||||
const exporterTypes = parseExporterTypes(process.env.OTEL_METRICS_EXPORTER)
|
||||
const exportInterval = parseInt(
|
||||
process.env.OTEL_METRIC_EXPORT_INTERVAL ||
|
||||
DEFAULT_METRICS_EXPORT_INTERVAL_MS.toString(),
|
||||
)
|
||||
|
||||
const exporters = []
|
||||
for (const exporterType of exporterTypes) {
|
||||
if (exporterType === 'console') {
|
||||
// Custom console exporter that shows resource attributes
|
||||
const consoleExporter = new ConsoleMetricExporter()
|
||||
const originalExport = consoleExporter.export.bind(consoleExporter)
|
||||
|
||||
consoleExporter.export = (metrics, callback) => {
|
||||
// Log resource attributes once at the start
|
||||
if (metrics.resource && metrics.resource.attributes) {
|
||||
// The console exporter is for debugging, so console output is intentional here
|
||||
|
||||
logForDebugging('\n=== Resource Attributes ===')
|
||||
logForDebugging(jsonStringify(metrics.resource.attributes))
|
||||
logForDebugging('===========================\n')
|
||||
}
|
||||
|
||||
return originalExport(metrics, callback)
|
||||
}
|
||||
|
||||
exporters.push(consoleExporter)
|
||||
} else if (exporterType === 'otlp') {
|
||||
const protocol =
|
||||
process.env.OTEL_EXPORTER_OTLP_METRICS_PROTOCOL?.trim() ||
|
||||
process.env.OTEL_EXPORTER_OTLP_PROTOCOL?.trim()
|
||||
|
||||
const httpConfig = getOTLPExporterConfig()
|
||||
|
||||
switch (protocol) {
|
||||
case 'grpc': {
|
||||
// Lazy-import to keep @grpc/grpc-js (~700KB) out of the telemetry chunk
|
||||
// when the protocol is http/protobuf (ant default) or http/json.
|
||||
const { OTLPMetricExporter } = await import(
|
||||
'@opentelemetry/exporter-metrics-otlp-grpc'
|
||||
)
|
||||
exporters.push(new OTLPMetricExporter())
|
||||
break
|
||||
}
|
||||
case 'http/json': {
|
||||
const { OTLPMetricExporter } = await import(
|
||||
'@opentelemetry/exporter-metrics-otlp-http'
|
||||
)
|
||||
exporters.push(new OTLPMetricExporter(httpConfig))
|
||||
break
|
||||
}
|
||||
case 'http/protobuf': {
|
||||
const { OTLPMetricExporter } = await import(
|
||||
'@opentelemetry/exporter-metrics-otlp-proto'
|
||||
)
|
||||
exporters.push(new OTLPMetricExporter(httpConfig))
|
||||
break
|
||||
}
|
||||
default:
|
||||
throw new Error(
|
||||
`Unknown protocol set in OTEL_EXPORTER_OTLP_METRICS_PROTOCOL or OTEL_EXPORTER_OTLP_PROTOCOL env var: ${protocol}`,
|
||||
)
|
||||
}
|
||||
} else if (exporterType === 'prometheus') {
|
||||
const { PrometheusExporter } = await import(
|
||||
'@opentelemetry/exporter-prometheus'
|
||||
)
|
||||
exporters.push(new PrometheusExporter())
|
||||
} else {
|
||||
throw new Error(
|
||||
`Unknown exporter type set in OTEL_EXPORTER_OTLP_METRICS_PROTOCOL or OTEL_EXPORTER_OTLP_PROTOCOL env var: ${exporterType}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return exporters.map(exporter => {
|
||||
if ('export' in exporter) {
|
||||
return new PeriodicExportingMetricReader({
|
||||
exporter,
|
||||
exportIntervalMillis: exportInterval,
|
||||
})
|
||||
}
|
||||
return exporter
|
||||
})
|
||||
export function isTelemetryEnabled(): boolean {
|
||||
return false
|
||||
}
|
||||
|
||||
async function getOtlpLogExporters() {
|
||||
const exporterTypes = parseExporterTypes(process.env.OTEL_LOGS_EXPORTER)
|
||||
|
||||
const protocol =
|
||||
process.env.OTEL_EXPORTER_OTLP_LOGS_PROTOCOL?.trim() ||
|
||||
process.env.OTEL_EXPORTER_OTLP_PROTOCOL?.trim()
|
||||
const endpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
|
||||
logForDebugging(
|
||||
`[3P telemetry] getOtlpLogExporters: types=${jsonStringify(exporterTypes)}, protocol=${protocol}, endpoint=${endpoint}`,
|
||||
)
|
||||
|
||||
const exporters = []
|
||||
for (const exporterType of exporterTypes) {
|
||||
if (exporterType === 'console') {
|
||||
exporters.push(new ConsoleLogRecordExporter())
|
||||
} else if (exporterType === 'otlp') {
|
||||
const httpConfig = getOTLPExporterConfig()
|
||||
|
||||
switch (protocol) {
|
||||
case 'grpc': {
|
||||
const { OTLPLogExporter } = await import(
|
||||
'@opentelemetry/exporter-logs-otlp-grpc'
|
||||
)
|
||||
exporters.push(new OTLPLogExporter())
|
||||
break
|
||||
}
|
||||
case 'http/json': {
|
||||
const { OTLPLogExporter } = await import(
|
||||
'@opentelemetry/exporter-logs-otlp-http'
|
||||
)
|
||||
exporters.push(new OTLPLogExporter(httpConfig))
|
||||
break
|
||||
}
|
||||
case 'http/protobuf': {
|
||||
const { OTLPLogExporter } = await import(
|
||||
'@opentelemetry/exporter-logs-otlp-proto'
|
||||
)
|
||||
exporters.push(new OTLPLogExporter(httpConfig))
|
||||
break
|
||||
}
|
||||
default:
|
||||
throw new Error(
|
||||
`Unknown protocol set in OTEL_EXPORTER_OTLP_LOGS_PROTOCOL or OTEL_EXPORTER_OTLP_PROTOCOL env var: ${protocol}`,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
throw new Error(
|
||||
`Unknown exporter type set in OTEL_LOGS_EXPORTER env var: ${exporterType}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return exporters
|
||||
export async function initializeTelemetry(): Promise<null> {
|
||||
return null
|
||||
}
|
||||
|
||||
async function getOtlpTraceExporters() {
|
||||
const exporterTypes = parseExporterTypes(process.env.OTEL_TRACES_EXPORTER)
|
||||
|
||||
const exporters = []
|
||||
for (const exporterType of exporterTypes) {
|
||||
if (exporterType === 'console') {
|
||||
exporters.push(new ConsoleSpanExporter())
|
||||
} else if (exporterType === 'otlp') {
|
||||
const protocol =
|
||||
process.env.OTEL_EXPORTER_OTLP_TRACES_PROTOCOL?.trim() ||
|
||||
process.env.OTEL_EXPORTER_OTLP_PROTOCOL?.trim()
|
||||
|
||||
const httpConfig = getOTLPExporterConfig()
|
||||
|
||||
switch (protocol) {
|
||||
case 'grpc': {
|
||||
const { OTLPTraceExporter } = await import(
|
||||
'@opentelemetry/exporter-trace-otlp-grpc'
|
||||
)
|
||||
exporters.push(new OTLPTraceExporter())
|
||||
break
|
||||
}
|
||||
case 'http/json': {
|
||||
const { OTLPTraceExporter } = await import(
|
||||
'@opentelemetry/exporter-trace-otlp-http'
|
||||
)
|
||||
exporters.push(new OTLPTraceExporter(httpConfig))
|
||||
break
|
||||
}
|
||||
case 'http/protobuf': {
|
||||
const { OTLPTraceExporter } = await import(
|
||||
'@opentelemetry/exporter-trace-otlp-proto'
|
||||
)
|
||||
exporters.push(new OTLPTraceExporter(httpConfig))
|
||||
break
|
||||
}
|
||||
default:
|
||||
throw new Error(
|
||||
`Unknown protocol set in OTEL_EXPORTER_OTLP_TRACES_PROTOCOL or OTEL_EXPORTER_OTLP_PROTOCOL env var: ${protocol}`,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
throw new Error(
|
||||
`Unknown exporter type set in OTEL_TRACES_EXPORTER env var: ${exporterType}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return exporters
|
||||
}
|
||||
|
||||
export function isTelemetryEnabled() {
|
||||
return isEnvTruthy(process.env.CLAUDE_CODE_ENABLE_TELEMETRY)
|
||||
}
|
||||
|
||||
function getBigQueryExportingReader() {
|
||||
const bigqueryExporter = new BigQueryMetricsExporter()
|
||||
return new PeriodicExportingMetricReader({
|
||||
exporter: bigqueryExporter,
|
||||
exportIntervalMillis: 5 * 60 * 1000, // 5mins for BigQuery metrics exporter to reduce load
|
||||
})
|
||||
}
|
||||
|
||||
function isBigQueryMetricsEnabled() {
|
||||
// BigQuery metrics are enabled for:
|
||||
// 1. API customers (excluding Claude.ai subscribers and Bedrock/Vertex)
|
||||
// 2. Claude for Enterprise (C4E) users
|
||||
// 3. Claude for Teams users
|
||||
const subscriptionType = getSubscriptionType()
|
||||
const isC4EOrTeamUser =
|
||||
isClaudeAISubscriber() &&
|
||||
(subscriptionType === 'enterprise' || subscriptionType === 'team')
|
||||
|
||||
return is1PApiCustomer() || isC4EOrTeamUser
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize beta tracing - a separate code path for detailed debugging.
|
||||
* Uses BETA_TRACING_ENDPOINT instead of OTEL_EXPORTER_OTLP_ENDPOINT.
|
||||
*/
|
||||
async function initializeBetaTracing(
|
||||
resource: ReturnType<typeof resourceFromAttributes>,
|
||||
): Promise<void> {
|
||||
const endpoint = process.env.BETA_TRACING_ENDPOINT
|
||||
if (!endpoint) {
|
||||
return
|
||||
}
|
||||
|
||||
const [{ OTLPTraceExporter }, { OTLPLogExporter }] = await Promise.all([
|
||||
import('@opentelemetry/exporter-trace-otlp-http'),
|
||||
import('@opentelemetry/exporter-logs-otlp-http'),
|
||||
])
|
||||
|
||||
const httpConfig = {
|
||||
url: `${endpoint}/v1/traces`,
|
||||
}
|
||||
|
||||
const logHttpConfig = {
|
||||
url: `${endpoint}/v1/logs`,
|
||||
}
|
||||
|
||||
// Initialize trace exporter
|
||||
const traceExporter = new OTLPTraceExporter(httpConfig)
|
||||
const spanProcessor = new BatchSpanProcessor(traceExporter, {
|
||||
scheduledDelayMillis: DEFAULT_TRACES_EXPORT_INTERVAL_MS,
|
||||
})
|
||||
|
||||
const tracerProvider = new BasicTracerProvider({
|
||||
resource,
|
||||
spanProcessors: [spanProcessor],
|
||||
})
|
||||
|
||||
trace.setGlobalTracerProvider(tracerProvider)
|
||||
setTracerProvider(tracerProvider)
|
||||
|
||||
// Initialize log exporter
|
||||
const logExporter = new OTLPLogExporter(logHttpConfig)
|
||||
const loggerProvider = new LoggerProvider({
|
||||
resource,
|
||||
processors: [
|
||||
new BatchLogRecordProcessor(logExporter, {
|
||||
scheduledDelayMillis: DEFAULT_LOGS_EXPORT_INTERVAL_MS,
|
||||
}),
|
||||
],
|
||||
})
|
||||
|
||||
logs.setGlobalLoggerProvider(loggerProvider)
|
||||
setLoggerProvider(loggerProvider)
|
||||
|
||||
// Initialize event logger
|
||||
const eventLogger = logs.getLogger(
|
||||
'com.anthropic.claude_code.events',
|
||||
MACRO.VERSION,
|
||||
)
|
||||
setEventLogger(eventLogger)
|
||||
|
||||
// Setup flush handlers - flush both logs AND traces
|
||||
process.on('beforeExit', async () => {
|
||||
await loggerProvider?.forceFlush()
|
||||
await tracerProvider?.forceFlush()
|
||||
})
|
||||
|
||||
process.on('exit', () => {
|
||||
void loggerProvider?.forceFlush()
|
||||
void tracerProvider?.forceFlush()
|
||||
})
|
||||
}
|
||||
|
||||
/**
 * Initialize the OpenTelemetry pipeline for this process.
 *
 * Always builds a MeterProvider (internal metrics). When 3P telemetry is
 * enabled it additionally wires OTLP metric readers, a LoggerProvider with
 * batch processors, and — behind the enhanced-telemetry flag — a
 * BasicTracerProvider. A separate beta-tracing path returns early with its
 * own meter/shutdown setup.
 *
 * @returns a Meter ('com.anthropic.claude_code') from the new MeterProvider.
 *
 * NOTE(review): the beta-tracing branch and the regular path register
 * near-duplicate shutdown handlers with different error handling — the beta
 * one swallows all errors, the regular one logs timeouts and rethrows.
 * Presumably intentional, but worth confirming.
 */
export async function initializeTelemetry() {
  profileCheckpoint('telemetry_init_start')
  bootstrapTelemetry()

  // Console exporters call console.dir on a timer (5s logs/traces, 60s
  // metrics), writing pretty-printed objects to stdout. In stream-json
  // mode stdout is the SDK message channel; the first line (`{`) breaks
  // the SDK's line reader. Stripped here (not main.tsx) because init.ts
  // re-runs applyConfigEnvironmentVariables() inside initializeTelemetry-
  // AfterTrust for remote-managed-settings users, and bootstrapTelemetry
  // above copies ANT_OTEL_* for ant users — both would undo an earlier strip.
  if (getHasFormattedOutput()) {
    for (const key of [
      'OTEL_METRICS_EXPORTER',
      'OTEL_LOGS_EXPORTER',
      'OTEL_TRACES_EXPORTER',
    ] as const) {
      const v = process.env[key]
      if (v?.includes('console')) {
        // Remove only the 'console' entry; keep any other exporters listed.
        process.env[key] = v
          .split(',')
          .map(s => s.trim())
          .filter(s => s !== 'console')
          .join(',')
      }
    }
  }

  // Route OTEL's internal diagnostics through our logger, errors only.
  diag.setLogger(new ClaudeCodeDiagLogger(), DiagLogLevel.ERROR)

  // Initialize Perfetto tracing (independent of OTEL)
  // Enable via CLAUDE_CODE_PERFETTO_TRACE=1 or CLAUDE_CODE_PERFETTO_TRACE=<path>
  initializePerfettoTracing()

  // Metric readers collected below feed the MeterProvider built later.
  const readers = []

  // Add customer exporters (if enabled)
  const telemetryEnabled = isTelemetryEnabled()
  logForDebugging(
    `[3P telemetry] isTelemetryEnabled=${telemetryEnabled} (CLAUDE_CODE_ENABLE_TELEMETRY=${process.env.CLAUDE_CODE_ENABLE_TELEMETRY})`,
  )
  if (telemetryEnabled) {
    readers.push(...(await getOtlpReaders()))
  }

  // Add BigQuery exporter (for API customers, C4E users, and internal users)
  if (isBigQueryMetricsEnabled()) {
    readers.push(getBigQueryExportingReader())
  }

  // Create base resource with service attributes
  const platform = getPlatform()
  const baseAttributes: Record<string, string> = {
    [ATTR_SERVICE_NAME]: 'claude-code',
    [ATTR_SERVICE_VERSION]: MACRO.VERSION,
  }

  // Add WSL-specific attributes if running on WSL
  if (platform === 'wsl') {
    const wslVersion = getWslVersion()
    if (wslVersion) {
      baseAttributes['wsl.version'] = wslVersion
    }
  }

  const baseResource = resourceFromAttributes(baseAttributes)

  // Use OpenTelemetry detectors
  const osResource = resourceFromAttributes(
    osDetector.detect().attributes || {},
  )

  // Extract only host.arch from hostDetector
  const hostDetected = hostDetector.detect()
  const hostArchAttributes = hostDetected.attributes?.[SEMRESATTRS_HOST_ARCH]
    ? {
        [SEMRESATTRS_HOST_ARCH]: hostDetected.attributes[SEMRESATTRS_HOST_ARCH],
      }
    : {}
  const hostArchResource = resourceFromAttributes(hostArchAttributes)

  const envResource = resourceFromAttributes(
    envDetector.detect().attributes || {},
  )

  // Merge resources - later resources take precedence
  const resource = baseResource
    .merge(osResource)
    .merge(hostArchResource)
    .merge(envResource)

  // Check if beta tracing is enabled - this is a separate code path
  // Available to all users who set ENABLE_BETA_TRACING_DETAILED=1 and BETA_TRACING_ENDPOINT
  if (isBetaTracingEnabled()) {
    // Fire-and-forget: beta tracing failures must not block startup.
    void initializeBetaTracing(resource).catch(e =>
      logForDebugging(`Beta tracing init failed: ${e}`, { level: 'error' }),
    )
    // Still set up meter provider for metrics (but skip regular logs/traces setup)
    const meterProvider = new MeterProvider({
      resource,
      views: [],
      readers,
    })
    setMeterProvider(meterProvider)

    // Register shutdown for beta tracing
    const shutdownTelemetry = async () => {
      // NOTE(review): parseInt without an explicit radix; a value like
      // '0x10' would parse as hex — confirm env values are always decimal.
      const timeoutMs = parseInt(
        process.env.CLAUDE_CODE_OTEL_SHUTDOWN_TIMEOUT_MS || '2000',
      )
      try {
        endInteractionSpan()

        // Force flush + shutdown together inside the timeout. Previously forceFlush
        // was awaited unbounded BEFORE the race, blocking exit on slow OTLP endpoints.
        // Each provider's flush→shutdown is chained independently so a slow logger
        // flush doesn't delay meterProvider/tracerProvider shutdown (no waterfall).
        const loggerProvider = getLoggerProvider()
        const tracerProvider = getTracerProvider()

        const chains: Promise<void>[] = [meterProvider.shutdown()]
        if (loggerProvider) {
          chains.push(
            loggerProvider.forceFlush().then(() => loggerProvider.shutdown()),
          )
        }
        if (tracerProvider) {
          chains.push(
            tracerProvider.forceFlush().then(() => tracerProvider.shutdown()),
          )
        }

        await Promise.race([
          Promise.all(chains),
          telemetryTimeout(timeoutMs, 'OpenTelemetry shutdown timeout'),
        ])
      } catch {
        // Ignore shutdown errors
      }
    }
    registerCleanup(shutdownTelemetry)

    return meterProvider.getMeter('com.anthropic.claude_code', MACRO.VERSION)
  }

  const meterProvider = new MeterProvider({
    resource,
    views: [],
    readers,
  })

  // Store reference in state for flushing
  setMeterProvider(meterProvider)

  // Initialize logs if telemetry is enabled
  if (telemetryEnabled) {
    const logExporters = await getOtlpLogExporters()
    logForDebugging(
      `[3P telemetry] Created ${logExporters.length} log exporter(s)`,
    )

    if (logExporters.length > 0) {
      const loggerProvider = new LoggerProvider({
        resource,
        // Add batch processors for each exporter
        processors: logExporters.map(
          exporter =>
            new BatchLogRecordProcessor(exporter, {
              scheduledDelayMillis: parseInt(
                process.env.OTEL_LOGS_EXPORT_INTERVAL ||
                  DEFAULT_LOGS_EXPORT_INTERVAL_MS.toString(),
              ),
            }),
        ),
      })

      // Register the logger provider globally
      logs.setGlobalLoggerProvider(loggerProvider)
      setLoggerProvider(loggerProvider)

      // Initialize event logger
      const eventLogger = logs.getLogger(
        'com.anthropic.claude_code.events',
        MACRO.VERSION,
      )
      setEventLogger(eventLogger)
      logForDebugging('[3P telemetry] Event logger set successfully')

      // 'beforeExit' is emitted when Node.js empties its event loop and has no additional work to schedule.
      // Unlike 'exit', it allows us to perform async operations, so it works well for letting
      // network requests complete before the process exits naturally.
      process.on('beforeExit', async () => {
        await loggerProvider?.forceFlush()
        // Also flush traces - they use BatchSpanProcessor which needs explicit flush
        const tracerProvider = getTracerProvider()
        await tracerProvider?.forceFlush()
      })

      process.on('exit', () => {
        // Final attempt to flush logs and traces
        // (best-effort only: 'exit' cannot await async work)
        void loggerProvider?.forceFlush()
        void getTracerProvider()?.forceFlush()
      })
    }
  }

  // Initialize tracing if enhanced telemetry is enabled (BETA)
  if (telemetryEnabled && isEnhancedTelemetryEnabled()) {
    const traceExporters = await getOtlpTraceExporters()
    if (traceExporters.length > 0) {
      // Create span processors for each exporter
      const spanProcessors = traceExporters.map(
        exporter =>
          new BatchSpanProcessor(exporter, {
            scheduledDelayMillis: parseInt(
              process.env.OTEL_TRACES_EXPORT_INTERVAL ||
                DEFAULT_TRACES_EXPORT_INTERVAL_MS.toString(),
            ),
          }),
      )

      const tracerProvider = new BasicTracerProvider({
        resource,
        spanProcessors,
      })

      // Register the tracer provider globally
      trace.setGlobalTracerProvider(tracerProvider)
      setTracerProvider(tracerProvider)
    }
  }

  // Shutdown metrics and logs on exit (flushes and closes exporters)
  const shutdownTelemetry = async () => {
    const timeoutMs = parseInt(
      process.env.CLAUDE_CODE_OTEL_SHUTDOWN_TIMEOUT_MS || '2000',
    )

    try {
      // End any active interaction span before shutdown
      endInteractionSpan()

      const shutdownPromises = [meterProvider.shutdown()]
      const loggerProvider = getLoggerProvider()
      if (loggerProvider) {
        shutdownPromises.push(loggerProvider.shutdown())
      }
      const tracerProvider = getTracerProvider()
      if (tracerProvider) {
        shutdownPromises.push(tracerProvider.shutdown())
      }

      await Promise.race([
        Promise.all(shutdownPromises),
        telemetryTimeout(timeoutMs, 'OpenTelemetry shutdown timeout'),
      ])
    } catch (error) {
      if (error instanceof Error && error.message.includes('timeout')) {
        logForDebugging(
          `
OpenTelemetry telemetry flush timed out after ${timeoutMs}ms

To resolve this issue, you can:
1. Increase the timeout by setting CLAUDE_CODE_OTEL_SHUTDOWN_TIMEOUT_MS env var (e.g., 5000 for 5 seconds)
2. Check if your OpenTelemetry backend is experiencing scalability issues
3. Disable OpenTelemetry by unsetting CLAUDE_CODE_ENABLE_TELEMETRY env var

Current timeout: ${timeoutMs}ms
`,
          { level: 'error' },
        )
      }
      // Unlike the beta branch above, errors here propagate to the caller.
      throw error
    }
  }

  // Always register shutdown (internal metrics are always enabled)
  registerCleanup(shutdownTelemetry)

  return meterProvider.getMeter('com.anthropic.claude_code', MACRO.VERSION)
}
|
||||
|
||||
/**
|
||||
* Flush all pending telemetry data immediately.
|
||||
* This should be called before logout or org switching to prevent data leakage.
|
||||
*/
|
||||
export async function flushTelemetry(): Promise<void> {
|
||||
const meterProvider = getMeterProvider()
|
||||
if (!meterProvider) {
|
||||
return
|
||||
}
|
||||
|
||||
const timeoutMs = parseInt(
|
||||
process.env.CLAUDE_CODE_OTEL_FLUSH_TIMEOUT_MS || '5000',
|
||||
)
|
||||
|
||||
try {
|
||||
const flushPromises = [meterProvider.forceFlush()]
|
||||
const loggerProvider = getLoggerProvider()
|
||||
if (loggerProvider) {
|
||||
flushPromises.push(loggerProvider.forceFlush())
|
||||
}
|
||||
const tracerProvider = getTracerProvider()
|
||||
if (tracerProvider) {
|
||||
flushPromises.push(tracerProvider.forceFlush())
|
||||
}
|
||||
|
||||
await Promise.race([
|
||||
Promise.all(flushPromises),
|
||||
telemetryTimeout(timeoutMs, 'OpenTelemetry flush timeout'),
|
||||
])
|
||||
|
||||
logForDebugging('Telemetry flushed successfully')
|
||||
} catch (error) {
|
||||
if (error instanceof TelemetryTimeoutError) {
|
||||
logForDebugging(
|
||||
`Telemetry flush timed out after ${timeoutMs}ms. Some metrics may not be exported.`,
|
||||
{ level: 'warn' },
|
||||
)
|
||||
} else {
|
||||
logForDebugging(`Telemetry flush failed: ${errorMessage(error)}`, {
|
||||
level: 'error',
|
||||
})
|
||||
}
|
||||
// Don't throw - allow logout to continue even if flush fails
|
||||
}
|
||||
}
|
||||
|
||||
function parseOtelHeadersEnvVar(): Record<string, string> {
|
||||
const headers: Record<string, string> = {}
|
||||
const envHeaders = process.env.OTEL_EXPORTER_OTLP_HEADERS
|
||||
if (envHeaders) {
|
||||
for (const pair of envHeaders.split(',')) {
|
||||
const [key, ...valueParts] = pair.split('=')
|
||||
if (key && valueParts.length > 0) {
|
||||
headers[key.trim()] = valueParts.join('=').trim()
|
||||
}
|
||||
}
|
||||
}
|
||||
return headers
|
||||
}
|
||||
|
||||
/**
|
||||
* Get configuration for OTLP exporters including:
|
||||
* - HTTP agent options (proxy, mTLS)
|
||||
* - Dynamic headers via otelHeadersHelper or static headers from env var
|
||||
*/
|
||||
function getOTLPExporterConfig() {
|
||||
const proxyUrl = getProxyUrl()
|
||||
const mtlsConfig = getMTLSConfig()
|
||||
const settings = getSettings_DEPRECATED()
|
||||
|
||||
// Build base config
|
||||
const config: Record<string, unknown> = {}
|
||||
|
||||
// Parse static headers from env var once (doesn't change at runtime)
|
||||
const staticHeaders = parseOtelHeadersEnvVar()
|
||||
|
||||
// If otelHeadersHelper is configured, use async headers function for dynamic refresh
|
||||
// Otherwise just return static headers if any exist
|
||||
if (settings?.otelHeadersHelper) {
|
||||
config.headers = async (): Promise<Record<string, string>> => {
|
||||
const dynamicHeaders = getOtelHeadersFromHelper()
|
||||
return { ...staticHeaders, ...dynamicHeaders }
|
||||
}
|
||||
} else if (Object.keys(staticHeaders).length > 0) {
|
||||
config.headers = async (): Promise<Record<string, string>> => staticHeaders
|
||||
}
|
||||
|
||||
// Check if we should bypass proxy for OTEL endpoint
|
||||
const otelEndpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
if (!proxyUrl || (otelEndpoint && shouldBypassProxy(otelEndpoint))) {
|
||||
// No proxy configured or OTEL endpoint should bypass proxy
|
||||
const caCerts = getCACertificates()
|
||||
if (mtlsConfig || caCerts) {
|
||||
config.httpAgentOptions = {
|
||||
...mtlsConfig,
|
||||
...(caCerts && { ca: caCerts }),
|
||||
}
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// Return an HttpAgentFactory function that creates our proxy agent
|
||||
const caCerts = getCACertificates()
|
||||
const agentFactory = (_protocol: string) => {
|
||||
// Create and return the proxy agent with mTLS and CA cert config
|
||||
const proxyAgent =
|
||||
mtlsConfig || caCerts
|
||||
? new HttpsProxyAgent(proxyUrl, {
|
||||
...(mtlsConfig && {
|
||||
cert: mtlsConfig.cert,
|
||||
key: mtlsConfig.key,
|
||||
passphrase: mtlsConfig.passphrase,
|
||||
}),
|
||||
...(caCerts && { ca: caCerts }),
|
||||
})
|
||||
: new HttpsProxyAgent(proxyUrl)
|
||||
|
||||
return proxyAgent
|
||||
}
|
||||
|
||||
config.httpAgentOptions = agentFactory
|
||||
return config
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user