// source dump of claude code
// at main 998 lines 34 kB view raw
1import { randomUUID } from 'crypto' 2import type { 3 SDKPartialAssistantMessage, 4 StdoutMessage, 5} from 'src/entrypoints/sdk/controlTypes.js' 6import { decodeJwtExpiry } from '../../bridge/jwtUtils.js' 7import { logForDebugging } from '../../utils/debug.js' 8import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js' 9import { errorMessage, getErrnoCode } from '../../utils/errors.js' 10import { createAxiosInstance } from '../../utils/proxy.js' 11import { 12 registerSessionActivityCallback, 13 unregisterSessionActivityCallback, 14} from '../../utils/sessionActivity.js' 15import { 16 getSessionIngressAuthHeaders, 17 getSessionIngressAuthToken, 18} from '../../utils/sessionIngressAuth.js' 19import type { 20 RequiresActionDetails, 21 SessionState, 22} from '../../utils/sessionState.js' 23import { sleep } from '../../utils/sleep.js' 24import { getClaudeCodeUserAgent } from '../../utils/userAgent.js' 25import { 26 RetryableError, 27 SerialBatchEventUploader, 28} from './SerialBatchEventUploader.js' 29import type { SSETransport, StreamClientEvent } from './SSETransport.js' 30import { WorkerStateUploader } from './WorkerStateUploader.js' 31 32/** Default interval between heartbeat events (20s; server TTL is 60s). */ 33const DEFAULT_HEARTBEAT_INTERVAL_MS = 20_000 34 35/** 36 * stream_event messages accumulate in a delay buffer for up to this many ms 37 * before enqueue. Mirrors HybridTransport's batching window. text_delta 38 * events for the same content block accumulate into a single full-so-far 39 * snapshot per flush — each emitted event is self-contained so a client 40 * connecting mid-stream sees complete text, not a fragment. 41 */ 42const STREAM_EVENT_FLUSH_INTERVAL_MS = 100 43 44/** Hoisted axios validateStatus callback to avoid per-request closure allocation. 
*/ 45function alwaysValidStatus(): boolean { 46 return true 47} 48 49export type CCRInitFailReason = 50 | 'no_auth_headers' 51 | 'missing_epoch' 52 | 'worker_register_failed' 53 54/** Thrown by initialize(); carries a typed reason for the diag classifier. */ 55export class CCRInitError extends Error { 56 constructor(readonly reason: CCRInitFailReason) { 57 super(`CCRClient init failed: ${reason}`) 58 } 59} 60 61/** 62 * Consecutive 401/403 with a VALID-LOOKING token before giving up. An 63 * expired JWT short-circuits this (exits immediately — deterministic, 64 * retry is futile). This threshold is for the uncertain case: token's 65 * exp is in the future but server says 401 (userauth down, KMS hiccup, 66 * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out. 67 */ 68const MAX_CONSECUTIVE_AUTH_FAILURES = 10 69 70type EventPayload = { 71 uuid: string 72 type: string 73 [key: string]: unknown 74} 75 76type ClientEvent = { 77 payload: EventPayload 78 ephemeral?: boolean 79} 80 81/** 82 * Structural subset of a stream_event carrying a text_delta. Not a narrowing 83 * of SDKPartialAssistantMessage — RawMessageStreamEvent's delta is a union and 84 * narrowing through two levels defeats the discriminant. 85 */ 86type CoalescedStreamEvent = { 87 type: 'stream_event' 88 uuid: string 89 session_id: string 90 parent_tool_use_id: string | null 91 event: { 92 type: 'content_block_delta' 93 index: number 94 delta: { type: 'text_delta'; text: string } 95 } 96} 97 98/** 99 * Accumulator state for text_delta coalescing. Keyed by API message ID so 100 * lifetime is tied to the assistant message — cleared when the complete 101 * SDKAssistantMessage arrives (writeEvent), which is reliable even when 102 * abort/error paths skip content_block_stop/message_stop delivery. 103 */ 104export type StreamAccumulatorState = { 105 /** API message ID (msg_...) → blocks[blockIndex] → chunk array. 
*/ 106 byMessage: Map<string, string[][]> 107 /** 108 * {session_id}:{parent_tool_use_id} → active message ID. 109 * content_block_delta events don't carry the message ID (only 110 * message_start does), so we track which message is currently streaming 111 * for each scope. At most one message streams per scope at a time. 112 */ 113 scopeToMessage: Map<string, string> 114} 115 116export function createStreamAccumulator(): StreamAccumulatorState { 117 return { byMessage: new Map(), scopeToMessage: new Map() } 118} 119 120function scopeKey(m: { 121 session_id: string 122 parent_tool_use_id: string | null 123}): string { 124 return `${m.session_id}:${m.parent_tool_use_id ?? ''}` 125} 126 127/** 128 * Accumulate text_delta stream_events into full-so-far snapshots per content 129 * block. Each flush emits ONE event per touched block containing the FULL 130 * accumulated text from the start of the block — a client connecting 131 * mid-stream receives a self-contained snapshot, not a fragment. 132 * 133 * Non-text-delta events pass through unchanged. message_start records the 134 * active message ID for the scope; content_block_delta appends chunks; 135 * the snapshot event reuses the first text_delta UUID seen for that block in 136 * this flush so server-side idempotency remains stable across retries. 137 * 138 * Cleanup happens in writeEvent when the complete assistant message arrives 139 * (reliable), not here on stop events (abort/error paths skip those). 140 */ 141export function accumulateStreamEvents( 142 buffer: SDKPartialAssistantMessage[], 143 state: StreamAccumulatorState, 144): EventPayload[] { 145 const out: EventPayload[] = [] 146 // chunks[] → snapshot already in `out` this flush. Keyed by the chunks 147 // array reference (stable per {messageId, index}) so subsequent deltas 148 // rewrite the same entry instead of emitting one event per delta. 
149 const touched = new Map<string[], CoalescedStreamEvent>() 150 for (const msg of buffer) { 151 switch (msg.event.type) { 152 case 'message_start': { 153 const id = msg.event.message.id 154 const prevId = state.scopeToMessage.get(scopeKey(msg)) 155 if (prevId) state.byMessage.delete(prevId) 156 state.scopeToMessage.set(scopeKey(msg), id) 157 state.byMessage.set(id, []) 158 out.push(msg) 159 break 160 } 161 case 'content_block_delta': { 162 if (msg.event.delta.type !== 'text_delta') { 163 out.push(msg) 164 break 165 } 166 const messageId = state.scopeToMessage.get(scopeKey(msg)) 167 const blocks = messageId ? state.byMessage.get(messageId) : undefined 168 if (!blocks) { 169 // Delta without a preceding message_start (reconnect mid-stream, 170 // or message_start was in a prior buffer that got dropped). Pass 171 // through raw — can't produce a full-so-far snapshot without the 172 // prior chunks anyway. 173 out.push(msg) 174 break 175 } 176 const chunks = (blocks[msg.event.index] ??= []) 177 chunks.push(msg.event.delta.text) 178 const existing = touched.get(chunks) 179 if (existing) { 180 existing.event.delta.text = chunks.join('') 181 break 182 } 183 const snapshot: CoalescedStreamEvent = { 184 type: 'stream_event', 185 uuid: msg.uuid, 186 session_id: msg.session_id, 187 parent_tool_use_id: msg.parent_tool_use_id, 188 event: { 189 type: 'content_block_delta', 190 index: msg.event.index, 191 delta: { type: 'text_delta', text: chunks.join('') }, 192 }, 193 } 194 touched.set(chunks, snapshot) 195 out.push(snapshot) 196 break 197 } 198 default: 199 out.push(msg) 200 } 201 } 202 return out 203} 204 205/** 206 * Clear accumulator entries for a completed assistant message. Called from 207 * writeEvent when the SDKAssistantMessage arrives — the reliable end-of-stream 208 * signal that fires even when abort/interrupt/error skip SSE stop events. 
209 */ 210export function clearStreamAccumulatorForMessage( 211 state: StreamAccumulatorState, 212 assistant: { 213 session_id: string 214 parent_tool_use_id: string | null 215 message: { id: string } 216 }, 217): void { 218 state.byMessage.delete(assistant.message.id) 219 const scope = scopeKey(assistant) 220 if (state.scopeToMessage.get(scope) === assistant.message.id) { 221 state.scopeToMessage.delete(scope) 222 } 223} 224 225type RequestResult = { ok: true } | { ok: false; retryAfterMs?: number } 226 227type WorkerEvent = { 228 payload: EventPayload 229 is_compaction?: boolean 230 agent_id?: string 231} 232 233export type InternalEvent = { 234 event_id: string 235 event_type: string 236 payload: Record<string, unknown> 237 event_metadata?: Record<string, unknown> | null 238 is_compaction: boolean 239 created_at: string 240 agent_id?: string 241} 242 243type ListInternalEventsResponse = { 244 data: InternalEvent[] 245 next_cursor?: string 246} 247 248type WorkerStateResponse = { 249 worker?: { 250 external_metadata?: Record<string, unknown> 251 } 252} 253 254/** 255 * Manages the worker lifecycle protocol with CCR v2: 256 * - Epoch management: reads worker_epoch from CLAUDE_CODE_WORKER_EPOCH env var 257 * - Runtime state reporting: PUT /sessions/{id}/worker 258 * - Heartbeat: POST /sessions/{id}/worker/heartbeat for liveness detection 259 * 260 * All writes go through this.request(). 
 */
export class CCRClient {
  private workerEpoch = 0
  private readonly heartbeatIntervalMs: number
  private readonly heartbeatJitterFraction: number
  private heartbeatTimer: NodeJS.Timeout | null = null
  // Guards against overlapping heartbeat POSTs when one stalls past the interval.
  private heartbeatInFlight = false
  // Set by close(); suppresses the post-init state_restored diagnostic.
  private closed = false
  private consecutiveAuthFailures = 0
  private currentState: SessionState | null = null
  private readonly sessionBaseUrl: string
  private readonly sessionId: string
  private readonly http = createAxiosInstance({ keepAlive: true })

  // stream_event delay buffer — accumulates content deltas for up to
  // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
  // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
  private streamEventBuffer: SDKPartialAssistantMessage[] = []
  private streamEventTimer: ReturnType<typeof setTimeout> | null = null
  // Full-so-far text accumulator. Persists across flushes so each emitted
  // text_delta event carries the complete text from the start of the block —
  // mid-stream reconnects see a self-contained snapshot. Keyed by API message
  // ID; cleared in writeEvent when the complete assistant message arrives.
  private streamTextAccumulator = createStreamAccumulator()

  private readonly workerState: WorkerStateUploader
  private readonly eventUploader: SerialBatchEventUploader<ClientEvent>
  private readonly internalEventUploader: SerialBatchEventUploader<WorkerEvent>
  private readonly deliveryUploader: SerialBatchEventUploader<{
    eventId: string
    status: 'received' | 'processing' | 'processed'
  }>

  /**
   * Called when the server returns 409 (a newer worker epoch superseded ours).
   * Default: process.exit(1) — correct for spawn-mode children where the
   * parent bridge re-spawns. In-process callers (replBridge) MUST override
   * this to close gracefully instead; exit would kill the user's REPL.
   */
  private readonly onEpochMismatch: () => never

  /**
   * Auth header source. Defaults to the process-wide session-ingress token
   * (CLAUDE_CODE_SESSION_ACCESS_TOKEN env var). Callers managing multiple
   * concurrent sessions with distinct JWTs MUST inject this — the env-var
   * path is a process global and would stomp across sessions.
   */
  private readonly getAuthHeaders: () => Record<string, string>

  constructor(
    transport: SSETransport,
    sessionUrl: URL,
    opts?: {
      onEpochMismatch?: () => never
      heartbeatIntervalMs?: number
      heartbeatJitterFraction?: number
      /**
       * Per-instance auth header source. Omit to read the process-wide
       * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers — REPL,
       * daemon). Required for concurrent multi-session callers.
       */
      getAuthHeaders?: () => Record<string, string>
    },
  ) {
    this.onEpochMismatch =
      opts?.onEpochMismatch ??
      (() => {
        // eslint-disable-next-line custom-rules/no-process-exit
        process.exit(1)
      })
    this.heartbeatIntervalMs =
      opts?.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS
    this.heartbeatJitterFraction = opts?.heartbeatJitterFraction ?? 0
    this.getAuthHeaders = opts?.getAuthHeaders ?? getSessionIngressAuthHeaders
    // Session URL: https://host/v1/code/sessions/{id}
    if (sessionUrl.protocol !== 'http:' && sessionUrl.protocol !== 'https:') {
      throw new Error(
        `CCRClient: Expected http(s) URL, got ${sessionUrl.protocol}`,
      )
    }
    const pathname = sessionUrl.pathname.replace(/\/$/, '')
    this.sessionBaseUrl = `${sessionUrl.protocol}//${sessionUrl.host}${pathname}`
    // Extract session ID from the URL path (last segment)
    this.sessionId = pathname.split('/').pop() || ''

    this.workerState = new WorkerStateUploader({
      send: body =>
        this.request(
          'put',
          '/worker',
          { worker_epoch: this.workerEpoch, ...body },
          'PUT worker',
        ).then(r => r.ok),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    this.eventUploader = new SerialBatchEventUploader<ClientEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      // flushStreamEventBuffer() enqueues a full 100ms window of accumulated
      // stream_events in one call. A burst of mixed delta types that don't
      // fold into a single snapshot could exceed the old cap (50) and deadlock
      // on the SerialBatchEventUploader backpressure check. Match
      // HybridTransport's bound — high enough to be memory-only.
      maxQueueSize: 100_000,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/events',
          { worker_epoch: this.workerEpoch, events: batch },
          'client events',
        )
        if (!result.ok) {
          throw new RetryableError(
            'client event POST failed',
            result.retryAfterMs,
          )
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    this.internalEventUploader = new SerialBatchEventUploader<WorkerEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      maxQueueSize: 200,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/internal-events',
          { worker_epoch: this.workerEpoch, events: batch },
          'internal events',
        )
        if (!result.ok) {
          throw new RetryableError(
            'internal event POST failed',
            result.retryAfterMs,
          )
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    this.deliveryUploader = new SerialBatchEventUploader<{
      eventId: string
      status: 'received' | 'processing' | 'processed'
    }>({
      maxBatchSize: 64,
      maxQueueSize: 64,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/events/delivery',
          {
            worker_epoch: this.workerEpoch,
            updates: batch.map(d => ({
              event_id: d.eventId,
              status: d.status,
            })),
          },
          'delivery batch',
        )
        if (!result.ok) {
          throw new RetryableError('delivery POST failed', result.retryAfterMs)
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    // Ack each received client_event so CCR can track delivery status.
    // Wired here (not in initialize()) so the callback is registered the
    // moment new CCRClient() returns — remoteIO must be free to call
    // transport.connect() immediately after without racing the first
    // SSE catch-up frame against an unwired onEventCallback.
    transport.setOnEvent((event: StreamClientEvent) => {
      this.reportDelivery(event.event_id, 'received')
    })
  }

  /**
   * Initialize the session worker:
   * 1. Take worker_epoch from the argument, or fall back to
   *    CLAUDE_CODE_WORKER_EPOCH (set by env-manager / bridge spawner)
   * 2. Report state as 'idle'
   * 3. Start heartbeat timer
   *
   * In-process callers (replBridge) pass the epoch directly — they
   * registered the worker themselves and there is no parent process
   * setting env vars.
   */
  async initialize(epoch?: number): Promise<Record<string, unknown> | null> {
    const startMs = Date.now()
    if (Object.keys(this.getAuthHeaders()).length === 0) {
      throw new CCRInitError('no_auth_headers')
    }
    if (epoch === undefined) {
      const rawEpoch = process.env.CLAUDE_CODE_WORKER_EPOCH
      epoch = rawEpoch ? parseInt(rawEpoch, 10) : NaN
    }
    if (isNaN(epoch)) {
      throw new CCRInitError('missing_epoch')
    }
    this.workerEpoch = epoch

    // Concurrent with the init PUT — neither depends on the other.
    const restoredPromise = this.getWorkerState()

    const result = await this.request(
      'put',
      '/worker',
      {
        worker_status: 'idle',
        worker_epoch: this.workerEpoch,
        // Clear stale pending_action/task_summary left by a prior
        // worker crash — the in-session clears don't survive process restart.
        external_metadata: {
          pending_action: null,
          task_summary: null,
        },
      },
      'PUT worker (init)',
    )
    if (!result.ok) {
      // 409 → onEpochMismatch may throw, but request() catches it and returns
      // false. Without this check we'd continue to startHeartbeat(), leaking a
      // 20s timer against a dead epoch. Throw so connect()'s rejection handler
      // fires instead of the success path.
      throw new CCRInitError('worker_register_failed')
    }
    this.currentState = 'idle'
    this.startHeartbeat()

    // sessionActivity's refcount-gated timer fires while an API call or tool
    // is in-flight; without a write the container lease can expire mid-wait.
    // v1 wires this in WebSocketTransport per-connection.
    registerSessionActivityCallback(() => {
      void this.writeEvent({ type: 'keep_alive' })
    })

    logForDebugging(`CCRClient: initialized, epoch=${this.workerEpoch}`)
    logForDiagnosticsNoPII('info', 'cli_worker_lifecycle_initialized', {
      epoch: this.workerEpoch,
      duration_ms: Date.now() - startMs,
    })

    // Await the concurrent GET and log state_restored here, after the PUT
    // has succeeded — logging inside getWorkerState() raced: if the GET
    // resolved before the PUT failed, diagnostics showed both init_failed
    // and state_restored for the same session.
    const { metadata, durationMs } = await restoredPromise
    if (!this.closed) {
      logForDiagnosticsNoPII('info', 'cli_worker_state_restored', {
        duration_ms: durationMs,
        had_state: metadata !== null,
      })
    }
    return metadata
  }

  // Control_requests are marked processed and not re-delivered on
  // restart, so read back what the prior worker wrote.
  private async getWorkerState(): Promise<{
    metadata: Record<string, unknown> | null
    durationMs: number
  }> {
    const startMs = Date.now()
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) {
      return { metadata: null, durationMs: 0 }
    }
    const data = await this.getWithRetry<WorkerStateResponse>(
      `${this.sessionBaseUrl}/worker`,
      authHeaders,
      'worker_state',
    )
    return {
      metadata: data?.worker?.external_metadata ?? null,
      durationMs: Date.now() - startMs,
    }
  }

  /**
   * Send an authenticated HTTP request to CCR. Handles auth headers,
   * 409 epoch mismatch, and error logging. Returns { ok: true } on 2xx.
   * On 429, reads Retry-After (integer seconds) so the uploader can honor
   * the server's backoff hint instead of blindly exponentiating.
   */
  private async request(
    method: 'post' | 'put',
    path: string,
    body: unknown,
    label: string,
    { timeout = 10_000 }: { timeout?: number } = {},
  ): Promise<RequestResult> {
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return { ok: false }

    try {
      const response = await this.http[method](
        `${this.sessionBaseUrl}${path}`,
        body,
        {
          headers: {
            ...authHeaders,
            'Content-Type': 'application/json',
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          validateStatus: alwaysValidStatus,
          timeout,
        },
      )

      if (response.status >= 200 && response.status < 300) {
        this.consecutiveAuthFailures = 0
        return { ok: true }
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      if (response.status === 401 || response.status === 403) {
        // A 401 with an expired JWT is deterministic — no retry will
        // ever succeed. Check the token's own exp before burning
        // wall-clock on the threshold loop.
        const tok = getSessionIngressAuthToken()
        const exp = tok ? decodeJwtExpiry(tok) : null
        if (exp !== null && exp * 1000 < Date.now()) {
          logForDebugging(
            `CCRClient: session_token expired (exp=${new Date(exp * 1000).toISOString()}) — no refresh was delivered, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_token_expired_no_refresh')
          this.onEpochMismatch()
        }
        // Token looks valid but server says 401 — possible server-side
        // blip (userauth down, KMS hiccup). Count toward threshold.
        this.consecutiveAuthFailures++
        if (this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES) {
          logForDebugging(
            `CCRClient: ${this.consecutiveAuthFailures} consecutive auth failures with a valid-looking token — server-side auth unrecoverable, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_auth_failures_exhausted')
          this.onEpochMismatch()
        }
      }
      logForDebugging(`CCRClient: ${label} returned ${response.status}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_failed', {
        method,
        path,
        status: response.status,
      })
      if (response.status === 429) {
        const raw = response.headers?.['retry-after']
        const seconds = typeof raw === 'string' ? parseInt(raw, 10) : NaN
        if (!isNaN(seconds) && seconds >= 0) {
          return { ok: false, retryAfterMs: seconds * 1000 }
        }
      }
      return { ok: false }
    } catch (error) {
      logForDebugging(`CCRClient: ${label} failed: ${errorMessage(error)}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_error', {
        method,
        path,
        error_code: getErrnoCode(error),
      })
      return { ok: false }
    }
  }

  /** Report worker state to CCR via PUT /sessions/{id}/worker. */
  reportState(state: SessionState, details?: RequiresActionDetails): void {
    // Dedupe: skip the PUT when neither the state nor the details changed.
    if (state === this.currentState && !details) return
    this.currentState = state
    this.workerState.enqueue({
      worker_status: state,
      requires_action_details: details
        ? {
            tool_name: details.tool_name,
            action_description: details.action_description,
            request_id: details.request_id,
          }
        : null,
    })
  }

  /** Report external metadata to CCR via PUT /worker. */
  reportMetadata(metadata: Record<string, unknown>): void {
    this.workerState.enqueue({ external_metadata: metadata })
  }

  /**
   * Handle epoch mismatch (409 Conflict). A newer CC instance has replaced
   * this one — exit immediately.
   */
  private handleEpochMismatch(): never {
    logForDebugging('CCRClient: Epoch mismatch (409), shutting down', {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_worker_epoch_mismatch')
    this.onEpochMismatch()
  }

  /** Start periodic heartbeat. */
  private startHeartbeat(): void {
    this.stopHeartbeat()
    const schedule = (): void => {
      // Jitter is symmetric: ± intervalMs * jitterFraction.
      const jitter =
        this.heartbeatIntervalMs *
        this.heartbeatJitterFraction *
        (2 * Math.random() - 1)
      this.heartbeatTimer = setTimeout(tick, this.heartbeatIntervalMs + jitter)
    }
    const tick = (): void => {
      void this.sendHeartbeat()
      // stopHeartbeat nulls the timer; check after the fire-and-forget send
      // but before rescheduling so close() during sendHeartbeat is honored.
      if (this.heartbeatTimer === null) return
      schedule()
    }
    schedule()
  }

  /** Stop heartbeat timer. */
  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearTimeout(this.heartbeatTimer)
      this.heartbeatTimer = null
    }
  }

  /** Send a heartbeat via POST /sessions/{id}/worker/heartbeat. */
  private async sendHeartbeat(): Promise<void> {
    // Skip if the previous heartbeat is still in flight (slow network).
    if (this.heartbeatInFlight) return
    this.heartbeatInFlight = true
    try {
      const result = await this.request(
        'post',
        '/worker/heartbeat',
        { session_id: this.sessionId, worker_epoch: this.workerEpoch },
        'Heartbeat',
        { timeout: 5_000 },
      )
      if (result.ok) {
        logForDebugging('CCRClient: Heartbeat sent')
      }
    } finally {
      this.heartbeatInFlight = false
    }
  }

  /**
   * Write a StdoutMessage as a client event via POST /sessions/{id}/worker/events.
   * These events are visible to frontend clients via the SSE stream.
   * Injects a UUID if missing to ensure server-side idempotency on retry.
   *
   * stream_event messages are held in a 100ms delay buffer and accumulated
   * (text_deltas for the same content block emit a full-so-far snapshot per
   * flush). A non-stream_event write flushes the buffer first so downstream
   * ordering is preserved.
   */
  async writeEvent(message: StdoutMessage): Promise<void> {
    if (message.type === 'stream_event') {
      this.streamEventBuffer.push(message)
      if (!this.streamEventTimer) {
        this.streamEventTimer = setTimeout(
          () => void this.flushStreamEventBuffer(),
          STREAM_EVENT_FLUSH_INTERVAL_MS,
        )
      }
      return
    }
    await this.flushStreamEventBuffer()
    if (message.type === 'assistant') {
      // Complete assistant message — the reliable end-of-stream signal;
      // release this message's accumulator state.
      clearStreamAccumulatorForMessage(this.streamTextAccumulator, message)
    }
    await this.eventUploader.enqueue(this.toClientEvent(message))
  }

  /** Wrap a StdoutMessage as a ClientEvent, injecting a UUID if missing. */
  private toClientEvent(message: StdoutMessage): ClientEvent {
    // StdoutMessage's union doesn't expose an optional uuid field; go through
    // Record to read/patch it.
    const msg = message as unknown as Record<string, unknown>
    return {
      payload: {
        ...msg,
        uuid: typeof msg.uuid === 'string' ? msg.uuid : randomUUID(),
      } as EventPayload,
    }
  }

  /**
   * Drain the stream_event delay buffer: accumulate text_deltas into
   * full-so-far snapshots, clear the timer, enqueue the resulting events.
   * Called from the timer, from writeEvent on a non-stream message, and from
   * flush(). close() drops the buffer — call flush() first if you need
   * delivery.
   */
  private async flushStreamEventBuffer(): Promise<void> {
    if (this.streamEventTimer) {
      clearTimeout(this.streamEventTimer)
      this.streamEventTimer = null
    }
    if (this.streamEventBuffer.length === 0) return
    // Swap the buffer before the async enqueue so new stream_events arriving
    // mid-flush land in a fresh buffer.
    const buffered = this.streamEventBuffer
    this.streamEventBuffer = []
    const payloads = accumulateStreamEvents(
      buffered,
      this.streamTextAccumulator,
    )
    await this.eventUploader.enqueue(
      payloads.map(payload => ({ payload, ephemeral: true })),
    )
  }

  /**
   * Write an internal worker event via POST /sessions/{id}/worker/internal-events.
   * These events are NOT visible to frontend clients — they store worker-internal
   * state (transcript messages, compaction markers) needed for session resume.
   */
  async writeInternalEvent(
    eventType: string,
    payload: Record<string, unknown>,
    {
      isCompaction = false,
      agentId,
    }: {
      isCompaction?: boolean
      agentId?: string
    } = {},
  ): Promise<void> {
    const event: WorkerEvent = {
      payload: {
        type: eventType,
        ...payload,
        uuid: typeof payload.uuid === 'string' ? payload.uuid : randomUUID(),
      } as EventPayload,
      ...(isCompaction && { is_compaction: true }),
      ...(agentId && { agent_id: agentId }),
    }
    await this.internalEventUploader.enqueue(event)
  }

  /**
   * Flush pending internal events. Call between turns and on shutdown
   * to ensure transcript entries are persisted.
   */
  flushInternalEvents(): Promise<void> {
    return this.internalEventUploader.flush()
  }

  /**
   * Flush pending client events (writeEvent queue). Call before close()
   * when the caller needs delivery confirmation — close() abandons the
   * queue. Resolves once the uploader drains or rejects; returns
   * regardless of whether individual POSTs succeeded (check server state
   * separately if that matters).
   */
  async flush(): Promise<void> {
    await this.flushStreamEventBuffer()
    return this.eventUploader.flush()
  }

  /**
   * Read foreground agent internal events from
   * GET /sessions/{id}/worker/internal-events.
   * Returns transcript entries from the last compaction boundary, or null on failure.
   * Used for session resume.
   */
  async readInternalEvents(): Promise<InternalEvent[] | null> {
    return this.paginatedGet('/worker/internal-events', {}, 'internal_events')
  }

  /**
   * Read all subagent internal events from
   * GET /sessions/{id}/worker/internal-events?subagents=true.
   * Returns a merged stream across all non-foreground agents, each from its
   * compaction point. Used for session resume.
   */
  async readSubagentInternalEvents(): Promise<InternalEvent[] | null> {
    return this.paginatedGet(
      '/worker/internal-events',
      { subagents: 'true' },
      'subagent_events',
    )
  }

  /**
   * Paginated GET with retry. Fetches all pages from a list endpoint,
   * retrying each page on failure with exponential backoff + jitter.
   */
  private async paginatedGet(
    path: string,
    params: Record<string, string>,
    context: string,
  ): Promise<InternalEvent[] | null> {
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return null

    const allEvents: InternalEvent[] = []
    let cursor: string | undefined

    do {
      const url = new URL(`${this.sessionBaseUrl}${path}`)
      for (const [k, v] of Object.entries(params)) {
        url.searchParams.set(k, v)
      }
      if (cursor) {
        url.searchParams.set('cursor', cursor)
      }

      const page = await this.getWithRetry<ListInternalEventsResponse>(
        url.toString(),
        authHeaders,
        context,
      )
      // A single failed page aborts the whole read — partial transcripts
      // are worse than none for resume.
      if (!page) return null

      allEvents.push(...(page.data ?? []))
      cursor = page.next_cursor
    } while (cursor)

    logForDebugging(
      `CCRClient: Read ${allEvents.length} internal events from ${path}${params.subagents ? ' (subagents)' : ''}`,
    )
    return allEvents
  }

  /**
   * Single GET request with retry. Returns the parsed response body
   * on success, null if all retries are exhausted.
   */
  private async getWithRetry<T>(
    url: string,
    authHeaders: Record<string, string>,
    context: string,
  ): Promise<T | null> {
    for (let attempt = 1; attempt <= 10; attempt++) {
      let response
      try {
        response = await this.http.get<T>(url, {
          headers: {
            ...authHeaders,
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          validateStatus: alwaysValidStatus,
          timeout: 30_000,
        })
      } catch (error) {
        logForDebugging(
          `CCRClient: GET ${url} failed (attempt ${attempt}/10): ${errorMessage(error)}`,
          { level: 'warn' },
        )
        if (attempt < 10) {
          // Exponential backoff capped at 30s, plus up to 500ms jitter.
          const delay =
            Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
          await sleep(delay)
        }
        continue
      }

      if (response.status >= 200 && response.status < 300) {
        return response.data
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      logForDebugging(
        `CCRClient: GET ${url} returned ${response.status} (attempt ${attempt}/10)`,
        { level: 'warn' },
      )

      if (attempt < 10) {
        const delay =
          Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
        await sleep(delay)
      }
    }

    logForDebugging('CCRClient: GET retries exhausted', { level: 'error' })
    logForDiagnosticsNoPII('error', 'cli_worker_get_retries_exhausted', {
      context,
    })
    return null
  }

  /**
   * Report delivery status for a client-to-worker event.
   * POST /v1/code/sessions/{id}/worker/events/delivery (batch endpoint)
   */
  reportDelivery(
    eventId: string,
    status: 'received' | 'processing' | 'processed',
  ): void {
    void this.deliveryUploader.enqueue({ eventId, status })
  }

  /** Get the current epoch (for external use). */
  getWorkerEpoch(): number {
    return this.workerEpoch
  }

  /** Internal-event queue depth — shutdown-snapshot backpressure signal. */
  get internalEventsPending(): number {
    return this.internalEventUploader.pendingCount
  }

  /**
   * Clean up uploaders and timers. Note: buffered stream_events are dropped,
   * not flushed — call flush() first if delivery matters.
   */
  close(): void {
    this.closed = true
    this.stopHeartbeat()
    unregisterSessionActivityCallback()
    if (this.streamEventTimer) {
      clearTimeout(this.streamEventTimer)
      this.streamEventTimer = null
    }
    this.streamEventBuffer = []
    this.streamTextAccumulator.byMessage.clear()
    this.streamTextAccumulator.scopeToMessage.clear()
    this.workerState.close()
    this.eventUploader.close()
    this.internalEventUploader.close()
    this.deliveryUploader.close()
  }
}