// Source dump of Claude Code (branch: main, 806 lines, 26 kB) — raw view.
// 1st-party event logging exporter for /api/event_logging/batch.
import type { HrTime } from '@opentelemetry/api'
import { type ExportResult, ExportResultCode } from '@opentelemetry/core'
import type {
  LogRecordExporter,
  ReadableLogRecord,
} from '@opentelemetry/sdk-logs'
import axios from 'axios'
import { randomUUID } from 'crypto'
import { appendFile, mkdir, readdir, unlink, writeFile } from 'fs/promises'
import * as path from 'path'
import type { CoreUserData } from 'src/utils/user.js'
import {
  getIsNonInteractiveSession,
  getSessionId,
} from '../../bootstrap/state.js'
import { ClaudeCodeInternalEvent } from '../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js'
import { GrowthbookExperimentEvent } from '../../types/generated/events_mono/growthbook/v1/growthbook_experiment_event.js'
import {
  getClaudeAIOAuthTokens,
  hasProfileScope,
  isClaudeAISubscriber,
} from '../../utils/auth.js'
import { checkHasTrustDialogAccepted } from '../../utils/config.js'
import { logForDebugging } from '../../utils/debug.js'
import { getClaudeConfigHomeDir } from '../../utils/envUtils.js'
import { errorMessage, isFsInaccessible, toError } from '../../utils/errors.js'
import { getAuthHeaders } from '../../utils/http.js'
import { readJSONLFile } from '../../utils/json.js'
import { logError } from '../../utils/log.js'
import { sleep } from '../../utils/sleep.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
import { isOAuthTokenExpired } from '../oauth/client.js'
import { stripProtoFields } from './index.js'
import { type EventMetadata, to1PEventFormat } from './metadata.js'

// Unique ID for this process run - used to isolate failed event files between runs.
// Fresh on every process start, so a restarted session never matches its own
// batch file name and retryPreviousBatches() can safely pick up older files.
const BATCH_UUID = randomUUID()

// File prefix for failed event storage (files live under getStorageDir())
const FILE_PREFIX = '1p_failed_events.'

// Storage directory for failed events - evaluated at runtime to respect CLAUDE_CONFIG_DIR in tests
function getStorageDir(): string {
  return path.join(getClaudeConfigHomeDir(), 'telemetry')
}

// API envelope - event_data is the JSON output from proto toJSON()
type FirstPartyEventLoggingEvent = {
  event_type: 'ClaudeCodeInternalEvent' | 'GrowthbookExperimentEvent'
  event_data: unknown
}

type FirstPartyEventLoggingPayload = {
  events: FirstPartyEventLoggingEvent[]
}

/**
 * Exporter for 1st-party event logging to /api/event_logging/batch.
 *
 * Export cycles are controlled by OpenTelemetry's BatchLogRecordProcessor, which
 * triggers export() when either:
 * - Time interval elapses (default: 5 seconds via scheduledDelayMillis)
 * - Batch size is reached (default: 200 events via maxExportBatchSize)
 *
 * This exporter adds resilience on top:
 * - Append-only log for failed events (concurrency-safe)
 * - Quadratic backoff retry for failed events, dropped after maxAttempts
 * - Immediate retry of queued events when any export succeeds (endpoint is healthy)
 * - Chunking large event sets into smaller batches
 * - Auth fallback: retries without auth on 401 errors
 */
export class FirstPartyEventLoggingExporter implements LogRecordExporter {
  // Full URL of the batch endpoint (baseUrl + path), fixed at construction.
  private readonly endpoint: string
  private readonly timeout: number
  private readonly maxBatchSize: number
  private readonly skipAuth: boolean
  // Pause between consecutive batch POSTs within one sendEventsInBatches() call.
  private readonly batchDelayMs: number
  private readonly baseBackoffDelayMs: number
  private readonly maxBackoffDelayMs: number
  private readonly maxAttempts: number
  private readonly isKilled: () => boolean
  // In-flight doExport() promises, awaited by forceFlush()/shutdown().
  private pendingExports: Promise<void>[] = []
  private isShutdown = false
  // Timer abstraction (injectable for tests); returns a cancel function.
  private readonly schedule: (
    fn: () => Promise<void>,
    delayMs: number,
  ) => () => void
  // Cancel function for a pending backoff timer, or null when none scheduled.
  private cancelBackoff: (() => void) | null = null
  // Consecutive send cycles since the last success; drives backoff delay and
  // the maxAttempts drop threshold. Reset to 0 by resetBackoff() on success.
  private attempts = 0
  // True while retryFailedEvents() has a send in flight (guards double-retry).
  private isRetrying = false
  // Context string from the most recent failed POST, echoed into error messages.
  private lastExportErrorContext: string | undefined

  constructor(
    options: {
      timeout?: number
      maxBatchSize?: number
      skipAuth?: boolean
      batchDelayMs?: number
      baseBackoffDelayMs?: number
      maxBackoffDelayMs?: number
      maxAttempts?: number
      path?: string
      baseUrl?: string
      // Injected killswitch probe. Checked per-POST so that disabling the
      // firstParty sink also stops backoff retries (not just new emits).
      // Passed in rather than imported to avoid a cycle with firstPartyEventLogger.ts.
      isKilled?: () => boolean
      schedule?: (fn: () => Promise<void>, delayMs: number) => () => void
    } = {},
  ) {
    // Default: prod, except when ANTHROPIC_BASE_URL is explicitly staging.
    // Overridable via tengu_1p_event_batch_config.baseUrl.
    const baseUrl =
      options.baseUrl ||
      (process.env.ANTHROPIC_BASE_URL === 'https://api-staging.anthropic.com'
        ? 'https://api-staging.anthropic.com'
        : 'https://api.anthropic.com')

    this.endpoint = `${baseUrl}${options.path || '/api/event_logging/batch'}`

    this.timeout = options.timeout || 10000
    this.maxBatchSize = options.maxBatchSize || 200
    this.skipAuth = options.skipAuth ?? false
    this.batchDelayMs = options.batchDelayMs || 100
    this.baseBackoffDelayMs = options.baseBackoffDelayMs || 500
    this.maxBackoffDelayMs = options.maxBackoffDelayMs || 30000
    this.maxAttempts = options.maxAttempts ?? 8
    this.isKilled = options.isKilled ?? (() => false)
    // Default scheduler: plain setTimeout wrapped so callers get a canceller.
    this.schedule =
      options.schedule ??
      ((fn, ms) => {
        const t = setTimeout(fn, ms)
        return () => clearTimeout(t)
      })

    // Retry any failed events from previous runs of this session (in background)
    void this.retryPreviousBatches()
  }

  // Expose for testing
  async getQueuedEventCount(): Promise<number> {
    return (await this.loadEventsFromCurrentBatch()).length
  }

  // --- Storage helpers ---

  // Path of this process run's failure file: prefix + session id + BATCH_UUID.
  private getCurrentBatchFilePath(): string {
    return path.join(
      getStorageDir(),
      `${FILE_PREFIX}${getSessionId()}.${BATCH_UUID}.json`,
    )
  }

  // Read a JSONL failure file; any read/parse error is treated as "no events".
  private async loadEventsFromFile(
    filePath: string,
  ): Promise<FirstPartyEventLoggingEvent[]> {
    try {
      return await readJSONLFile<FirstPartyEventLoggingEvent>(filePath)
    } catch {
      return []
    }
  }

  private async loadEventsFromCurrentBatch(): Promise<
    FirstPartyEventLoggingEvent[]
  > {
    return this.loadEventsFromFile(this.getCurrentBatchFilePath())
  }

  // Overwrite the failure file with exactly `events` (delete it when empty).
  // Best-effort: filesystem errors are logged, never thrown.
  private async saveEventsToFile(
    filePath: string,
    events: FirstPartyEventLoggingEvent[],
  ): Promise<void> {
    try {
      if (events.length === 0) {
        try {
          await unlink(filePath)
        } catch {
          // File doesn't exist, nothing to delete
        }
      } else {
        // Ensure storage directory exists
        await mkdir(getStorageDir(), { recursive: true })
        // Write as JSON lines (one event per line)
        const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
        await writeFile(filePath, content, 'utf8')
      }
    } catch (error) {
      logError(error)
    }
  }

  // Append events to the failure file without reading it first.
  // Best-effort: filesystem errors are logged, never thrown.
  private async appendEventsToFile(
    filePath: string,
    events: FirstPartyEventLoggingEvent[],
  ): Promise<void> {
    if (events.length === 0) return
    try {
      // Ensure storage directory exists
      await mkdir(getStorageDir(), { recursive: true })
      // Append as JSON lines (one event per line) - atomic on most filesystems
      const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
      await appendFile(filePath, content, 'utf8')
    } catch (error) {
      logError(error)
    }
  }

  private async deleteFile(filePath: string): Promise<void> {
    try {
      await unlink(filePath)
    } catch {
      // File doesn't exist or can't be deleted, ignore
    }
  }

  // --- Previous batch retry (startup) ---

  // Find failure files left by earlier runs of this session (same session id,
  // different BATCH_UUID) and retry each in the background, fire-and-forget.
  private async retryPreviousBatches(): Promise<void> {
    try {
      const prefix = `${FILE_PREFIX}${getSessionId()}.`
      let files: string[]
      try {
        files = (await readdir(getStorageDir()))
          .filter((f: string) => f.startsWith(prefix) && f.endsWith('.json'))
          .filter((f: string) => !f.includes(BATCH_UUID)) // Exclude current batch
      } catch (e) {
        // Missing/unreadable storage dir simply means nothing to retry.
        if (isFsInaccessible(e)) return
        throw e
      }

      for (const file of files) {
        const filePath = path.join(getStorageDir(), file)
        void this.retryFileInBackground(filePath)
      }
    } catch (error) {
      logError(error)
    }
  }

  // Resend one previous-run failure file. On success (or when the retry budget
  // is exhausted, or the file is empty) the file is deleted; on partial failure
  // only the still-failing events are written back.
  private async retryFileInBackground(filePath: string): Promise<void> {
    if (this.attempts >= this.maxAttempts) {
      await this.deleteFile(filePath)
      return
    }

    const events = await this.loadEventsFromFile(filePath)
    if (events.length === 0) {
      await this.deleteFile(filePath)
      return
    }

    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: retrying ${events.length} events from previous batch`,
      )
    }

    const failedEvents = await this.sendEventsInBatches(events)
    if (failedEvents.length === 0) {
      await this.deleteFile(filePath)
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging('1P event logging: previous batch retry succeeded')
      }
    } else {
      // Save only the failed events back (not all original events)
      await this.saveEventsToFile(filePath, failedEvents)
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          `1P event logging: previous batch retry failed, ${failedEvents.length} events remain`,
        )
      }
    }
  }

  /**
   * LogRecordExporter entry point, called by the BatchLogRecordProcessor.
   * Rejects immediately after shutdown; otherwise kicks off an async export
   * and tracks the promise so forceFlush()/shutdown() can await it.
   */
  async export(
    logs: ReadableLogRecord[],
    resultCallback: (result: ExportResult) => void,
  ): Promise<void> {
    if (this.isShutdown) {
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          '1P event logging export failed: Exporter has been shutdown',
        )
      }
      resultCallback({
        code: ExportResultCode.FAILED,
        error: new Error('Exporter has been shutdown'),
      })
      return
    }

    const exportPromise = this.doExport(logs, resultCallback)
    this.pendingExports.push(exportPromise)

    // Clean up completed exports
    void exportPromise.finally(() => {
      const index = this.pendingExports.indexOf(exportPromise)
      if (index > -1) {
        void this.pendingExports.splice(index, 1)
      }
    })
  }

  // One export cycle: filter to event-scope logs, transform to 1P envelopes,
  // send, and either queue failures + schedule backoff, or on success reset
  // backoff and opportunistically drain the on-disk queue.
  private async doExport(
    logs: ReadableLogRecord[],
    resultCallback: (result: ExportResult) => void,
  ): Promise<void> {
    try {
      // Filter for event logs only (by scope name)
      const eventLogs = logs.filter(
        log =>
          log.instrumentationScope?.name === 'com.anthropic.claude_code.events',
      )

      if (eventLogs.length === 0) {
        resultCallback({ code: ExportResultCode.SUCCESS })
        return
      }

      // Transform new logs (failed events are retried independently via backoff)
      const events = this.transformLogsToEvents(eventLogs).events

      if (events.length === 0) {
        resultCallback({ code: ExportResultCode.SUCCESS })
        return
      }

      // Retry budget exhausted: drop new events outright rather than queueing
      // against an endpoint that has failed maxAttempts cycles in a row.
      if (this.attempts >= this.maxAttempts) {
        resultCallback({
          code: ExportResultCode.FAILED,
          error: new Error(
            `Dropped ${events.length} events: max attempts (${this.maxAttempts}) reached`,
          ),
        })
        return
      }

      // Send events
      const failedEvents = await this.sendEventsInBatches(events)
      // Count this send cycle; a fully successful cycle resets it below.
      this.attempts++

      if (failedEvents.length > 0) {
        await this.queueFailedEvents(failedEvents)
        this.scheduleBackoffRetry()
        const context = this.lastExportErrorContext
          ? ` (${this.lastExportErrorContext})`
          : ''
        resultCallback({
          code: ExportResultCode.FAILED,
          error: new Error(
            `Failed to export ${failedEvents.length} events${context}`,
          ),
        })
        return
      }

      // Success - reset backoff and immediately retry any queued events
      this.resetBackoff()
      if ((await this.getQueuedEventCount()) > 0 && !this.isRetrying) {
        void this.retryFailedEvents()
      }
      resultCallback({ code: ExportResultCode.SUCCESS })
    } catch (error) {
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          `1P event logging export failed: ${errorMessage(error)}`,
        )
      }
      logError(error)
      resultCallback({
        code: ExportResultCode.FAILED,
        error: toError(error),
      })
    }
  }

  /**
   * POST events in maxBatchSize chunks with batchDelayMs between chunks.
   * Returns the events that were NOT sent (empty array on full success) and
   * records the last error context in lastExportErrorContext.
   */
  private async sendEventsInBatches(
    events: FirstPartyEventLoggingEvent[],
  ): Promise<FirstPartyEventLoggingEvent[]> {
    // Chunk events into batches
    const batches: FirstPartyEventLoggingEvent[][] = []
    for (let i = 0; i < events.length; i += this.maxBatchSize) {
      batches.push(events.slice(i, i + this.maxBatchSize))
    }

    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: exporting ${events.length} events in ${batches.length} batch(es)`,
      )
    }

    // Send each batch with delay between them. On first failure, assume the
    // endpoint is down and short-circuit: queue the failed batch plus all
    // remaining unsent batches without POSTing them. The backoff retry will
    // probe again with a single batch next tick.
    const failedBatchEvents: FirstPartyEventLoggingEvent[] = []
    let lastErrorContext: string | undefined
    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i]!
      try {
        await this.sendBatchWithRetry({ events: batch })
      } catch (error) {
        lastErrorContext = getAxiosErrorContext(error)
        // Collect this batch and every unsent batch after it.
        for (let j = i; j < batches.length; j++) {
          failedBatchEvents.push(...batches[j]!)
        }
        if (process.env.USER_TYPE === 'ant') {
          const skipped = batches.length - 1 - i
          logForDebugging(
            `1P event logging: batch ${i + 1}/${batches.length} failed (${lastErrorContext}); short-circuiting ${skipped} remaining batch(es)`,
          )
        }
        break
      }

      if (i < batches.length - 1 && this.batchDelayMs > 0) {
        await sleep(this.batchDelayMs)
      }
    }

    if (failedBatchEvents.length > 0 && lastErrorContext) {
      this.lastExportErrorContext = lastErrorContext
    }

    return failedBatchEvents
  }

  // Persist failed events to this run's failure file and log the failure.
  private async queueFailedEvents(
    events: FirstPartyEventLoggingEvent[],
  ): Promise<void> {
    const filePath = this.getCurrentBatchFilePath()

    // Append-only: just add new events to file (atomic on most filesystems)
    await this.appendEventsToFile(filePath, events)

    const context = this.lastExportErrorContext
      ? ` (${this.lastExportErrorContext})`
      : ''
    const message = `1P event logging: ${events.length} events failed to export${context}`
    logError(new Error(message))
  }

  // Schedule a retryFailedEvents() run after a quadratic-backoff delay.
  // No-op when a timer already exists, a retry is in flight, or shut down.
  private scheduleBackoffRetry(): void {
    // Don't schedule if already retrying or shutdown
    if (this.cancelBackoff || this.isRetrying || this.isShutdown) {
      return
    }

    // Quadratic backoff (matching Statsig SDK): base * attempts²
    const delay = Math.min(
      this.baseBackoffDelayMs * this.attempts * this.attempts,
      this.maxBackoffDelayMs,
    )

    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: scheduling backoff retry in ${delay}ms (attempt ${this.attempts})`,
      )
    }

    this.cancelBackoff = this.schedule(async () => {
      this.cancelBackoff = null
      await this.retryFailedEvents()
    }, delay)
  }

  // Drain the on-disk failure queue: load events, delete the file (events are
  // in memory), resend; on failure write survivors back and re-arm backoff,
  // on success loop to pick up anything queued meanwhile. Drops everything
  // once attempts reaches maxAttempts.
  private async retryFailedEvents(): Promise<void> {
    const filePath = this.getCurrentBatchFilePath()

    // Keep retrying while there are events and endpoint is healthy
    while (!this.isShutdown) {
      const events = await this.loadEventsFromFile(filePath)
      if (events.length === 0) break

      if (this.attempts >= this.maxAttempts) {
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            `1P event logging: max attempts (${this.maxAttempts}) reached, dropping ${events.length} events`,
          )
        }
        await this.deleteFile(filePath)
        this.resetBackoff()
        return
      }

      this.isRetrying = true

      // Clear file before retry (we have events in memory now)
      await this.deleteFile(filePath)

      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          `1P event logging: retrying ${events.length} failed events (attempt ${this.attempts + 1})`,
        )
      }

      const failedEvents = await this.sendEventsInBatches(events)
      this.attempts++

      this.isRetrying = false

      if (failedEvents.length > 0) {
        // Write failures back to disk
        await this.saveEventsToFile(filePath, failedEvents)
        this.scheduleBackoffRetry()
        return // Failed - wait for backoff
      }

      // Success - reset backoff and continue loop to drain any newly queued events
      this.resetBackoff()
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging('1P event logging: backoff retry succeeded')
      }
    }
  }

  // Zero the attempt counter and cancel any pending backoff timer.
  private resetBackoff(): void {
    this.attempts = 0
    if (this.cancelBackoff) {
      this.cancelBackoff()
      this.cancelBackoff = null
    }
  }

  /**
   * POST one batch to the endpoint. Sends with auth headers when available
   * and trusted; falls back to an unauthenticated retry on a 401 response.
   * Throws (for the caller to short-circuit) on killswitch or network error.
   */
  private async sendBatchWithRetry(
    payload: FirstPartyEventLoggingPayload,
  ): Promise<void> {
    if (this.isKilled()) {
      // Throw so the caller short-circuits remaining batches and queues
      // everything to disk. Zero network traffic while killed; the backoff
      // timer keeps ticking and will resume POSTs as soon as the GrowthBook
      // cache picks up the cleared flag.
      throw new Error('firstParty sink killswitch active')
    }

    const baseHeaders: Record<string, string> = {
      'Content-Type': 'application/json',
      'User-Agent': getClaudeCodeUserAgent(),
      'x-service-name': 'claude-code',
    }

    // Skip auth if trust hasn't been established yet
    // This prevents executing apiKeyHelper commands before the trust dialog
    // Non-interactive sessions implicitly have workspace trust
    const hasTrust =
      checkHasTrustDialogAccepted() || getIsNonInteractiveSession()
    if (process.env.USER_TYPE === 'ant' && !hasTrust) {
      logForDebugging('1P event logging: Trust not accepted')
    }

    // Skip auth when the OAuth token is expired or lacks user:profile
    // scope (service key sessions). Falls through to unauthenticated send.
    let shouldSkipAuth = this.skipAuth || !hasTrust
    if (!shouldSkipAuth && isClaudeAISubscriber()) {
      const tokens = getClaudeAIOAuthTokens()
      if (!hasProfileScope()) {
        shouldSkipAuth = true
      } else if (tokens && isOAuthTokenExpired(tokens.expiresAt)) {
        shouldSkipAuth = true
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            '1P event logging: OAuth token expired, skipping auth to avoid 401',
          )
        }
      }
    }

    // Try with auth headers first (unless trust not established or token is known to be expired)
    const authResult = shouldSkipAuth
      ? { headers: {}, error: 'trust not established or Oauth token expired' }
      : getAuthHeaders()
    const useAuth = !authResult.error

    if (!useAuth && process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: auth not available, sending without auth`,
      )
    }

    const headers = useAuth
      ? { ...baseHeaders, ...authResult.headers }
      : baseHeaders

    try {
      const response = await axios.post(this.endpoint, payload, {
        timeout: this.timeout,
        headers,
      })
      this.logSuccess(payload.events.length, useAuth, response.data)
      return
    } catch (error) {
      // Handle 401 by retrying without auth
      if (
        useAuth &&
        axios.isAxiosError(error) &&
        error.response?.status === 401
      ) {
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            '1P event logging: 401 auth error, retrying without auth',
          )
        }
        const response = await axios.post(this.endpoint, payload, {
          timeout: this.timeout,
          headers: baseHeaders,
        })
        this.logSuccess(payload.events.length, false, response.data)
        return
      }

      throw error
    }
  }

  // Debug-only success logging (USER_TYPE === 'ant' gate, like all logging here).
  private logSuccess(
    eventCount: number,
    withAuth: boolean,
    responseData: unknown,
  ): void {
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: ${eventCount} events exported successfully${withAuth ? ' (with auth)' : ' (without auth)'}`,
      )
      logForDebugging(`API Response: ${jsonStringify(responseData, null, 2)}`)
    }
  }

  // Convert an OTel HrTime ([seconds, nanoseconds]) to a JS Date.
  // Note: sub-millisecond precision is discarded by the Date constructor.
  private hrTimeToDate(hrTime: HrTime): Date {
    const [seconds, nanoseconds] = hrTime
    return new Date(seconds * 1000 + nanoseconds / 1000000)
  }

  /**
   * Map OTel log records onto the 1P API envelope. GrowthBook experiment
   * records (attributes.event_type === 'GrowthbookExperimentEvent') become
   * GrowthbookExperimentEvent protos; everything else becomes a
   * ClaudeCodeInternalEvent, with a degraded partial event emitted when
   * core_metadata is absent.
   */
  private transformLogsToEvents(
    logs: ReadableLogRecord[],
  ): FirstPartyEventLoggingPayload {
    const events: FirstPartyEventLoggingEvent[] = []

    for (const log of logs) {
      const attributes = log.attributes || {}

      // Check if this is a GrowthBook experiment event
      if (attributes.event_type === 'GrowthbookExperimentEvent') {
        const timestamp = this.hrTimeToDate(log.hrTime)
        const account_uuid = attributes.account_uuid as string | undefined
        const organization_uuid = attributes.organization_uuid as
          | string
          | undefined
        events.push({
          event_type: 'GrowthbookExperimentEvent',
          event_data: GrowthbookExperimentEvent.toJSON({
            event_id: attributes.event_id as string,
            timestamp,
            experiment_id: attributes.experiment_id as string,
            variation_id: attributes.variation_id as number,
            environment: attributes.environment as string,
            user_attributes: attributes.user_attributes as string,
            experiment_metadata: attributes.experiment_metadata as string,
            device_id: attributes.device_id as string,
            session_id: attributes.session_id as string,
            auth:
              account_uuid || organization_uuid
                ? { account_uuid, organization_uuid }
                : undefined,
          }),
        })
        continue
      }

      // Extract event name
      const eventName =
        (attributes.event_name as string) || (log.body as string) || 'unknown'

      // Extract metadata objects directly (no JSON parsing needed)
      const coreMetadata = attributes.core_metadata as EventMetadata | undefined
      const userMetadata = attributes.user_metadata as CoreUserData
      const eventMetadata = (attributes.event_metadata || {}) as Record<
        string,
        unknown
      >

      if (!coreMetadata) {
        // Emit partial event if core metadata is missing
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            `1P event logging: core_metadata missing for event ${eventName}`,
          )
        }
        events.push({
          event_type: 'ClaudeCodeInternalEvent',
          event_data: ClaudeCodeInternalEvent.toJSON({
            event_id: attributes.event_id as string | undefined,
            event_name: eventName,
            client_timestamp: this.hrTimeToDate(log.hrTime),
            session_id: getSessionId(),
            additional_metadata: Buffer.from(
              jsonStringify({
                transform_error: 'core_metadata attribute is missing',
              }),
            ).toString('base64'),
          }),
        })
        continue
      }

      // Transform to 1P format
      const formatted = to1PEventFormat(
        coreMetadata,
        userMetadata,
        eventMetadata,
      )

      // _PROTO_* keys are PII-tagged values meant only for privileged BQ
      // columns. Hoist known keys to proto fields, then defensively strip any
      // remaining _PROTO_* so an unrecognized future key can't silently land
      // in the general-access additional_metadata blob. sink.ts applies the
      // same strip before Datadog; this closes the 1P side.
      const {
        _PROTO_skill_name,
        _PROTO_plugin_name,
        _PROTO_marketplace_name,
        ...rest
      } = formatted.additional
      const additionalMetadata = stripProtoFields(rest)

      events.push({
        event_type: 'ClaudeCodeInternalEvent',
        event_data: ClaudeCodeInternalEvent.toJSON({
          event_id: attributes.event_id as string | undefined,
          event_name: eventName,
          client_timestamp: this.hrTimeToDate(log.hrTime),
          device_id: attributes.user_id as string | undefined,
          email: userMetadata?.email,
          auth: formatted.auth,
          ...formatted.core,
          env: formatted.env,
          process: formatted.process,
          skill_name:
            typeof _PROTO_skill_name === 'string'
              ? _PROTO_skill_name
              : undefined,
          plugin_name:
            typeof _PROTO_plugin_name === 'string'
              ? _PROTO_plugin_name
              : undefined,
          marketplace_name:
            typeof _PROTO_marketplace_name === 'string'
              ? _PROTO_marketplace_name
              : undefined,
          additional_metadata:
            Object.keys(additionalMetadata).length > 0
              ? Buffer.from(jsonStringify(additionalMetadata)).toString(
                  'base64',
                )
              : undefined,
        }),
      })
    }

    return { events }
  }

  // LogRecordExporter contract: stop accepting exports, cancel backoff,
  // then flush whatever is already in flight.
  async shutdown(): Promise<void> {
    this.isShutdown = true
    this.resetBackoff()
    await this.forceFlush()
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging('1P event logging exporter shutdown complete')
    }
  }

  // Wait for all in-flight doExport() promises to settle.
  async forceFlush(): Promise<void> {
    await Promise.all(this.pendingExports)
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging('1P event logging exporter flush complete')
    }
  }
}

// Build a compact human-readable context string from an axios (or other)
// error: request-id header, HTTP status, error code, and message.
function getAxiosErrorContext(error: unknown): string {
  if (!axios.isAxiosError(error)) {
    return errorMessage(error)
  }

  const parts: string[] = []

  const requestId = error.response?.headers?.['request-id']
  if (requestId) {
    parts.push(`request-id=${requestId}`)
  }

  if (error.response?.status) {
    parts.push(`status=${error.response.status}`)
  }

  if (error.code) {
    parts.push(`code=${error.code}`)
  }

  if (error.message) {
    parts.push(error.message)
  }

  return parts.join(', ')
}