source dump of claude code
at main 1008 lines 39 kB view raw
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
/**
 * Env-less Remote Control bridge core.
 *
 * "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
 * /worker/* transport protocol) — the env-based path (replBridge.ts) can also
 * use CCR v2 transport via CLAUDE_CODE_USE_CCR_V2. This file is about removing
 * the poll/dispatch layer, not about which transport protocol is underneath.
 *
 * Unlike initBridgeCore (env-based, ~2400 lines), this connects directly
 * to the session-ingress layer without the Environments API work-dispatch
 * layer:
 *
 *   1. POST /v1/code/sessions (OAuth, no env_id) → session.id
 *   2. POST /v1/code/sessions/{id}/bridge (OAuth) → {worker_jwt, expires_in, api_base_url, worker_epoch}
 *      Each /bridge call bumps epoch — it IS the register. No separate /worker/register.
 *   3. createV2ReplTransport(worker_jwt, worker_epoch) → SSE + CCRClient
 *   4. createTokenRefreshScheduler → proactive /bridge re-call (new JWT + new epoch)
 *   5. 401 on SSE → rebuild transport with fresh /bridge credentials (same seq-num)
 *
 * No register/poll/ack/stop/heartbeat/deregister environment lifecycle.
 * The Environments API historically existed because CCR's /worker/*
 * endpoints required a session_id+role=worker JWT that only the work-dispatch
 * layer could mint. Server PR #292605 (renamed in #293280) adds the /bridge endpoint as a direct
 * OAuth→worker_jwt exchange, making the env layer optional for REPL sessions.
 *
 * Gated by `tengu_bridge_repl_v2` GrowthBook flag in initReplBridge.ts.
 * REPL-only — daemon/print stay on env-based.
 */

import { feature } from 'bun:bundle'
import axios from 'axios'
import {
  createV2ReplTransport,
  type ReplBridgeTransport,
} from './replBridgeTransport.js'
import { buildCCRv2SdkUrl } from './workSecret.js'
import { toCompatSessionId } from './sessionIdCompat.js'
import { FlushGate } from './flushGate.js'
import { createTokenRefreshScheduler } from './jwtUtils.js'
import { getTrustedDeviceToken } from './trustedDevice.js'
import {
  getEnvLessBridgeConfig,
  type EnvLessBridgeConfig,
} from './envLessBridgeConfig.js'
import {
  handleIngressMessage,
  handleServerControlRequest,
  makeResultMessage,
  isEligibleBridgeMessage,
  extractTitleText,
  BoundedUUIDSet,
} from './bridgeMessaging.js'
import { logBridgeSkip } from './debugUtils.js'
import { logForDebugging } from '../utils/debug.js'
import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
import { isInProtectedNamespace } from '../utils/envUtils.js'
import { errorMessage } from '../utils/errors.js'
import { sleep } from '../utils/sleep.js'
import { registerCleanup } from '../utils/cleanupRegistry.js'
import {
  type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  logEvent,
} from '../services/analytics/index.js'
import type { ReplBridgeHandle, BridgeState } from './replBridge.js'
import type { Message } from '../types/message.js'
import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
import type {
  SDKControlRequest,
  SDKControlResponse,
} from '../entrypoints/sdk/controlTypes.js'
import type { PermissionMode } from '../utils/permissions/PermissionMode.js'

// Value sent in the `anthropic-version` header of OAuth-authenticated requests.
const ANTHROPIC_VERSION = '2023-06-01'

// Telemetry discriminator for ws_connected. 'initial' is the default and
// never passed to rebuildTransport (which can only be called post-init);
// Exclude<> makes that constraint explicit at both signatures.
type ConnectCause = 'initial' | 'proactive_refresh' | 'auth_401_recovery'

/**
 * Build the request headers for an OAuth-authenticated JSON call.
 *
 * @param accessToken OAuth access token, sent as a Bearer credential.
 * @returns Authorization, Content-Type and anthropic-version headers.
 */
function oauthHeaders(accessToken: string): Record<string, string> {
  return {
    Authorization: `Bearer ${accessToken}`,
    'Content-Type': 'application/json',
    'anthropic-version': ANTHROPIC_VERSION,
  }
}

/** Inputs to {@link initEnvLessBridgeCore}. */
export type EnvLessBridgeParams = {
  baseUrl: string
  orgUUID: string
  title: string
  /** Returns the current OAuth token; may be expired, may be undefined. */
  getAccessToken: () => string | undefined
  /**
   * Invoked with the (possibly stale) token to force an OAuth refresh before
   * /bridge re-calls and on a 401 during teardown archive.
   */
  onAuth401?: (staleAccessToken: string) => Promise<boolean>
  /**
   * Converts internal Message[] → SDKMessage[] for writeMessages() and the
   * initial-flush/drain paths. Injected rather than imported — mappers.ts
   * transitively pulls in src/commands.ts (entire command registry + React
   * tree) which would bloat bundles that don't already have it.
   */
  toSDKMessages: (messages: Message[]) => SDKMessage[]
  /** Max eligible history messages flushed on first connect; <= 0 disables the cap. */
  initialHistoryCap: number
  initialMessages?: Message[]
  onInboundMessage?: (msg: SDKMessage) => void | Promise<void>
  /**
   * Fired on each title-worthy user message seen in writeMessages() until
   * the callback returns true (done). Mirrors replBridge.ts's onUserMessage —
   * caller derives a title and PATCHes /v1/sessions/{id} so auto-started
   * sessions don't stay at the generic fallback. The caller owns the
   * derive-at-count-1-and-3 policy; the transport just keeps calling until
   * told to stop. sessionId is the raw cse_* — updateBridgeSessionTitle
   * retags internally.
   */
  onUserMessage?: (text: string, sessionId: string) => boolean
  onPermissionResponse?: (response: SDKControlResponse) => void
  onInterrupt?: () => void
  onSetModel?: (model: string | undefined) => void
  onSetMaxThinkingTokens?: (maxTokens: number | null) => void
  onSetPermissionMode?: (
    mode: PermissionMode,
  ) => { ok: true } | { ok: false; error: string }
  onStateChange?: (state: BridgeState, detail?: string) => void
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Threaded to createV2ReplTransport and
   * handleServerControlRequest.
   */
  outboundOnly?: boolean
  /** Free-form tags for session categorization (e.g. ['ccr-mirror']). */
  tags?: string[]
}

/**
 * Create a session, fetch a worker JWT, connect the v2 transport.
 *
 * Returns null on any pre-flight failure (no OAuth token, session create
 * failed, /bridge failed, transport setup failed). Caller (initReplBridge)
 * surfaces this as a generic "initialization failed" state. When /bridge or
 * transport setup fails after the session was created, the session is
 * archived fire-and-forget before returning.
 *
 * @param params Connection settings and callbacks; see EnvLessBridgeParams.
 * @returns A handle for writing messages / control events and tearing the
 *   bridge down, or null if initialization failed.
 */
export async function initEnvLessBridgeCore(
  params: EnvLessBridgeParams,
): Promise<ReplBridgeHandle | null> {
  const {
    baseUrl,
    orgUUID,
    title,
    getAccessToken,
    onAuth401,
    toSDKMessages,
    initialHistoryCap,
    initialMessages,
    onInboundMessage,
    onUserMessage,
    onPermissionResponse,
    onInterrupt,
    onSetModel,
    onSetMaxThinkingTokens,
    onSetPermissionMode,
    onStateChange,
    outboundOnly,
    tags,
  } = params

  const cfg = await getEnvLessBridgeConfig()

  // ── 1. Create session (POST /v1/code/sessions, no env_id) ───────────────
  const accessToken = getAccessToken()
  if (!accessToken) {
    logForDebugging('[remote-bridge] No OAuth token')
    return null
  }

  const createdSessionId = await withRetry(
    () =>
      createCodeSession(baseUrl, accessToken, title, cfg.http_timeout_ms, tags),
    'createCodeSession',
    cfg,
  )
  if (!createdSessionId) {
    onStateChange?.('failed', 'Session creation failed — see debug log')
    logBridgeSkip('v2_session_create_failed', undefined, true)
    return null
  }
  const sessionId: string = createdSessionId
  logForDebugging(`[remote-bridge] Created session ${sessionId}`)
  logForDiagnosticsNoPII('info', 'bridge_repl_v2_session_created')

  // ── 2. Fetch bridge credentials (POST /bridge → worker_jwt, expires_in, api_base_url) ──
  const credentials = await withRetry(
    () =>
      fetchRemoteCredentials(
        sessionId,
        baseUrl,
        accessToken,
        cfg.http_timeout_ms,
      ),
    'fetchRemoteCredentials',
    cfg,
  )
  if (!credentials) {
    onStateChange?.('failed', 'Remote credentials fetch failed — see debug log')
    logBridgeSkip('v2_remote_creds_failed', undefined, true)
    void archiveSession(
      sessionId,
      baseUrl,
      accessToken,
      orgUUID,
      cfg.http_timeout_ms,
    )
    return null
  }
  logForDebugging(
    `[remote-bridge] Fetched bridge credentials (expires_in=${credentials.expires_in}s)`,
  )

  // ── 3. Build v2 transport (SSETransport + CCRClient) ────────────────────
  const sessionUrl = buildCCRv2SdkUrl(credentials.api_base_url, sessionId)
  logForDebugging(`[remote-bridge] v2 session URL: ${sessionUrl}`)

  let transport: ReplBridgeTransport
  try {
    transport = await createV2ReplTransport({
      sessionUrl,
      ingressToken: credentials.worker_jwt,
      sessionId,
      epoch: credentials.worker_epoch,
      heartbeatIntervalMs: cfg.heartbeat_interval_ms,
      heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
      // Per-instance closure — keeps the worker JWT out of
      // process.env.CLAUDE_CODE_SESSION_ACCESS_TOKEN, which mcp/client.ts
      // reads ungatedly and would otherwise send to user-configured ws/http
      // MCP servers. Frozen-at-construction is correct: transport is fully
      // rebuilt on refresh (rebuildTransport below).
      getAuthToken: () => credentials.worker_jwt,
      outboundOnly,
    })
  } catch (err) {
    logForDebugging(
      `[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
      { level: 'error' },
    )
    onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
    logBridgeSkip('v2_transport_setup_failed', undefined, true)
    void archiveSession(
      sessionId,
      baseUrl,
      accessToken,
      orgUUID,
      cfg.http_timeout_ms,
    )
    return null
  }
  logForDebugging(
    `[remote-bridge] v2 transport created (epoch=${credentials.worker_epoch})`,
  )
  onStateChange?.('ready')

  // ── 4. State ────────────────────────────────────────────────────────────

  // Echo dedup: messages we POST come back on the read stream. Seeded with
  // initial message UUIDs so server echoes of flushed history are recognized.
  // Both sets cover initial UUIDs — recentPostedUUIDs is bounded (capacity
  // cfg.uuid_dedup_buffer_size) and could evict them after enough live
  // writes; initialMessageUUIDs is the unbounded fallback. Defense-in-depth;
  // mirrors replBridge.ts.
  const recentPostedUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
  const initialMessageUUIDs = new Set<string>()
  if (initialMessages) {
    for (const msg of initialMessages) {
      initialMessageUUIDs.add(msg.uuid)
      recentPostedUUIDs.add(msg.uuid)
    }
  }

  // Defensive dedup for re-delivered inbound prompts (seq-num negotiation
  // edge cases, server history replay after transport swap).
  const recentInboundUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)

  // FlushGate: queue live writes while the history flush POST is in flight,
  // so the server receives [history..., live...] in order.
  const flushGate = new FlushGate<Message>()

  let initialFlushDone = false
  let tornDown = false
  let authRecoveryInFlight = false
  // Latch for onUserMessage — flips true when the callback returns true
  // (policy says "done deriving"). sessionId is const (no re-create path —
  // rebuildTransport swaps JWT/epoch, same session), so no reset needed.
  let userMessageCallbackDone = !onUserMessage

  // Telemetry: why did onConnect fire? Set by rebuildTransport before
  // wireTransportCallbacks; read asynchronously by onConnect. Race-safe
  // because authRecoveryInFlight serializes rebuild callers, and a fresh
  // initEnvLessBridgeCore() call gets a fresh closure defaulting to 'initial'.
  let connectCause: ConnectCause = 'initial'

  // Deadline for onConnect after transport.connect(). Cleared by onConnect
  // (connected) and onClose (got a close — not silent). If neither fires
  // before cfg.connect_timeout_ms, onConnectTimeout emits — the only
  // signal for the `started → (silence)` gap.
  let connectDeadline: ReturnType<typeof setTimeout> | undefined
  /** Emit the connect-timeout telemetry event unless the bridge is torn down. */
  function onConnectTimeout(cause: ConnectCause): void {
    if (tornDown) return
    logEvent('tengu_bridge_repl_connect_timeout', {
      v2: true,
      elapsed_ms: cfg.connect_timeout_ms,
      cause:
        cause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
    })
  }

  // ── 5. JWT refresh scheduler ────────────────────────────────────────────
  // Schedule a callback cfg.token_refresh_buffer_ms before expiry (per
  // response.expires_in). On fire, re-fetch /bridge with OAuth → rebuild
  // transport with fresh credentials. Each /bridge call bumps epoch
  // server-side, so a JWT-only swap would leave the old CCRClient
  // heartbeating with a stale epoch → 409 within 20s.
  // JWT is opaque — do not decode.
  const refresh = createTokenRefreshScheduler({
    refreshBufferMs: cfg.token_refresh_buffer_ms,
    getAccessToken: async () => {
      // Unconditionally refresh OAuth before calling /bridge — getAccessToken()
      // returns expired tokens as non-null strings (doesn't check expiresAt),
      // so truthiness doesn't mean valid. Pass the stale token to onAuth401
      // so handleOAuth401Error's keychain-comparison can detect parallel refresh.
      const stale = getAccessToken()
      if (onAuth401) await onAuth401(stale ?? '')
      return getAccessToken() ?? stale
    },
    onRefresh: (sid, oauthToken) => {
      void (async () => {
        // Laptop wake: overdue proactive timer + SSE 401 fire ~simultaneously.
        // Claim the flag BEFORE the /bridge fetch so the other path skips
        // entirely — prevents double epoch bump (each /bridge call bumps; if
        // both fetch, the first rebuild gets a stale epoch and 409s).
        if (authRecoveryInFlight || tornDown) {
          logForDebugging(
            '[remote-bridge] Recovery already in flight, skipping proactive refresh',
          )
          return
        }
        authRecoveryInFlight = true
        try {
          const fresh = await withRetry(
            () =>
              fetchRemoteCredentials(
                sid,
                baseUrl,
                oauthToken,
                cfg.http_timeout_ms,
              ),
            'fetchRemoteCredentials (proactive)',
            cfg,
          )
          // A failed fetch is silent here: no state change, no reschedule.
          if (!fresh || tornDown) return
          await rebuildTransport(fresh, 'proactive_refresh')
          logForDebugging(
            '[remote-bridge] Transport rebuilt (proactive refresh)',
          )
        } catch (err) {
          logForDebugging(
            `[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII(
            'error',
            'bridge_repl_v2_proactive_refresh_failed',
          )
          if (!tornDown) {
            onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
          }
        } finally {
          authRecoveryInFlight = false
        }
      })()
    },
    label: 'remote',
  })
  refresh.scheduleFromExpiresIn(sessionId, credentials.expires_in)

  // ── 6. Wire callbacks (extracted so transport-rebuild can re-wire) ──────
  /** Attach onConnect / onData / onClose handlers to the current `transport`. */
  function wireTransportCallbacks(): void {
    transport.setOnConnect(() => {
      clearTimeout(connectDeadline)
      logForDebugging('[remote-bridge] v2 transport connected')
      logForDiagnosticsNoPII('info', 'bridge_repl_v2_transport_connected')
      logEvent('tengu_bridge_repl_ws_connected', {
        v2: true,
        cause:
          connectCause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })

      if (!initialFlushDone && initialMessages && initialMessages.length > 0) {
        initialFlushDone = true
        // Capture current transport — if 401/teardown happens mid-flush,
        // the stale .finally() must not drain the gate or signal connected.
        // (Same guard pattern as replBridge.ts:1119.)
        const flushTransport = transport
        void flushHistory(initialMessages)
          .catch(e =>
            logForDebugging(`[remote-bridge] flushHistory failed: ${e}`),
          )
          .finally(() => {
            // authRecoveryInFlight catches the v1-vs-v2 asymmetry: v1 nulls
            // transport synchronously in setOnClose (replBridge.ts:1175), so
            // transport !== flushTransport trips immediately. v2 doesn't null —
            // transport is reassigned only inside rebuildTransport, after its
            // callers have already awaited. authRecoveryInFlight is claimed
            // synchronously by those callers (onRefresh /
            // recoverFromAuthFailure) before their first await.
            if (
              transport !== flushTransport ||
              tornDown ||
              authRecoveryInFlight
            ) {
              return
            }
            drainFlushGate()
            onStateChange?.('connected')
          })
      } else if (!flushGate.active) {
        onStateChange?.('connected')
      }
    })

    transport.setOnData((data: string) => {
      handleIngressMessage(
        data,
        recentPostedUUIDs,
        recentInboundUUIDs,
        onInboundMessage,
        // Remote client answered the permission prompt — the turn resumes.
        // Without this the server stays on requires_action until the next
        // user message or turn-end result.
        onPermissionResponse
          ? res => {
              transport.reportState('running')
              onPermissionResponse(res)
            }
          : undefined,
        req =>
          handleServerControlRequest(req, {
            transport,
            sessionId,
            onInterrupt,
            onSetModel,
            onSetMaxThinkingTokens,
            onSetPermissionMode,
            outboundOnly,
          }),
      )
    })

    transport.setOnClose((code?: number) => {
      clearTimeout(connectDeadline)
      if (tornDown) return
      logForDebugging(`[remote-bridge] v2 transport closed (code=${code})`)
      logEvent('tengu_bridge_repl_ws_closed', { code, v2: true })
      // onClose fires only for TERMINAL failures: 401 (JWT invalid),
      // 4090 (CCR epoch mismatch), 4091 (CCR init failed), or SSE 10-min
      // reconnect budget exhausted. Transient disconnects are handled
      // transparently inside SSETransport. 401 we can recover from (fetch
      // fresh JWT, rebuild transport); all other codes are dead-ends.
      // NOTE(review): a 401 close that arrives while a recovery/refresh is
      // already in flight falls through to the 'failed' report below —
      // confirm callers tolerate a transient 'failed' before the rebuilt
      // transport connects.
      if (code === 401 && !authRecoveryInFlight) {
        void recoverFromAuthFailure()
        return
      }
      onStateChange?.('failed', `Transport closed (code ${code})`)
    })
  }

  // ── 7. Transport rebuild (shared by proactive refresh + 401 recovery) ──
  // Every /bridge call bumps epoch server-side. Both refresh paths must
  // rebuild the transport with the new epoch — a JWT-only swap leaves the
  // old CCRClient heartbeating stale epoch → 409. SSE resumes from the old
  // transport's high-water-mark seq-num so no server-side replay.
  // Caller MUST set authRecoveryInFlight = true before calling (synchronously,
  // before any await) and clear it in a finally. This function doesn't manage
  // the flag — moving it here would be too late to prevent a double /bridge
  // fetch, and each fetch bumps epoch.
  async function rebuildTransport(
    fresh: RemoteCredentials,
    cause: Exclude<ConnectCause, 'initial'>,
  ): Promise<void> {
    connectCause = cause
    // Queue writes during rebuild — once /bridge returns, the old transport's
    // epoch is stale and its next write/heartbeat 409s. Without this gate,
    // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently
    // no-ops (closed uploader after 409) → permanent silent message loss.
    flushGate.start()
    try {
      const seq = transport.getLastSequenceNum()
      transport.close()
      transport = await createV2ReplTransport({
        sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId),
        ingressToken: fresh.worker_jwt,
        sessionId,
        epoch: fresh.worker_epoch,
        heartbeatIntervalMs: cfg.heartbeat_interval_ms,
        heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
        initialSequenceNum: seq,
        getAuthToken: () => fresh.worker_jwt,
        outboundOnly,
      })
      if (tornDown) {
        // Teardown fired during the async createV2ReplTransport window.
        // Don't wire/connect/schedule — we'd re-arm timers after cancelAll()
        // and fire onInboundMessage into a torn-down bridge.
        transport.close()
        return
      }
      wireTransportCallbacks()
      transport.connect()
      connectDeadline = setTimeout(
        onConnectTimeout,
        cfg.connect_timeout_ms,
        connectCause,
      )
      refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in)
      // Drain queued writes into the new uploader. Runs before
      // ccr.initialize() resolves (transport.connect() is fire-and-forget),
      // but the uploader serializes behind the initial PUT /worker. If
      // init fails (4091), events drop — but only recentPostedUUIDs
      // (per-instance) is populated, so re-enabling the bridge re-flushes.
      drainFlushGate()
    } finally {
      // End the gate on failure paths too — drainFlushGate already ended
      // it on success. Queued messages are dropped (transport still dead).
      flushGate.drop()
    }
  }

  // ── 8. 401 recovery (OAuth refresh + rebuild) ───────────────────────────
  async function recoverFromAuthFailure(): Promise<void> {
    // setOnClose already guards `!authRecoveryInFlight` but that check and
    // this set must be atomic against onRefresh — claim synchronously before
    // any await. Laptop wake fires both paths ~simultaneously.
    if (authRecoveryInFlight) return
    authRecoveryInFlight = true
    onStateChange?.('reconnecting', 'JWT expired — refreshing')
    logForDebugging('[remote-bridge] 401 on SSE — attempting JWT refresh')
    try {
      // Unconditionally try OAuth refresh — getAccessToken() returns expired
      // tokens as non-null strings, so !oauthToken doesn't catch expiry.
      // Pass the stale token so handleOAuth401Error's keychain-comparison
      // can detect if another tab already refreshed.
      const stale = getAccessToken()
      if (onAuth401) await onAuth401(stale ?? '')
      const oauthToken = getAccessToken() ?? stale
      if (!oauthToken || tornDown) {
        if (!tornDown) {
          onStateChange?.('failed', 'JWT refresh failed: no OAuth token')
        }
        return
      }

      const fresh = await withRetry(
        () =>
          fetchRemoteCredentials(
            sessionId,
            baseUrl,
            oauthToken,
            cfg.http_timeout_ms,
          ),
        'fetchRemoteCredentials (recovery)',
        cfg,
      )
      if (!fresh || tornDown) {
        if (!tornDown) {
          onStateChange?.('failed', 'JWT refresh failed after 401')
        }
        return
      }
      // If 401 interrupted the initial flush, writeBatch may have silently
      // no-op'd on the closed uploader (ccr.close() ran in the SSE wrapper
      // before our setOnClose callback). Reset so the new onConnect re-flushes.
      // (v1 scopes initialFlushDone inside the per-transport closure at
      // replBridge.ts:1027 so it resets naturally; v2 has it at outer scope.)
      initialFlushDone = false
      await rebuildTransport(fresh, 'auth_401_recovery')
      logForDebugging('[remote-bridge] Transport rebuilt after 401')
    } catch (err) {
      logForDebugging(
        `[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
      if (!tornDown) {
        onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
      }
    } finally {
      authRecoveryInFlight = false
    }
  }

  wireTransportCallbacks()

  // Start flushGate BEFORE connect so writeMessages() during handshake
  // queues instead of racing the history POST.
  if (initialMessages && initialMessages.length > 0) {
    flushGate.start()
  }
  transport.connect()
  connectDeadline = setTimeout(
    onConnectTimeout,
    cfg.connect_timeout_ms,
    connectCause,
  )

  // ── 9. History flush + drain helpers ────────────────────────────────────
  /** End the flush gate and send whatever was queued while it was active. */
  function drainFlushGate(): void {
    const msgs = flushGate.end()
    if (msgs.length === 0) return
    for (const msg of msgs) recentPostedUUIDs.add(msg.uuid)
    const events = toSDKMessages(msgs).map(m => ({
      ...m,
      session_id: sessionId,
    }))
    if (msgs.some(m => m.type === 'user')) {
      transport.reportState('running')
    }
    logForDebugging(
      `[remote-bridge] Drained ${msgs.length} queued message(s) after flush`,
    )
    void transport.writeBatch(events)
  }

  /** Send the (optionally capped) eligible initial history as one batch. */
  async function flushHistory(msgs: Message[]): Promise<void> {
    // v2 always creates a fresh server session (unconditional createCodeSession
    // above) — no session reuse, no double-post risk. Unlike v1, we do NOT
    // filter by previouslyFlushedUUIDs: that set persists across REPL enable/
    // disable cycles (useRef), so it would wrongly suppress history on re-enable.
    const eligible = msgs.filter(isEligibleBridgeMessage)
    const capped =
      initialHistoryCap > 0 && eligible.length > initialHistoryCap
        ? eligible.slice(-initialHistoryCap)
        : eligible
    if (capped.length < eligible.length) {
      logForDebugging(
        `[remote-bridge] Capped initial flush: ${eligible.length} -> ${capped.length} (cap=${initialHistoryCap})`,
      )
    }
    const events = toSDKMessages(capped).map(m => ({
      ...m,
      session_id: sessionId,
    }))
    if (events.length === 0) return
    // Mid-turn init: if Remote Control is enabled while a query is running,
    // the last eligible message is a user prompt or tool_result (both 'user'
    // type). Without this the init PUT's 'idle' sticks until the next user-
    // type message forwards via writeMessages — which for a pure-text turn
    // is never (only assistant chunks stream post-init). Check eligible (pre-
    // cap), not capped: the cap may truncate to a user message even when the
    // actual trailing message is assistant.
    if (eligible.at(-1)?.type === 'user') {
      transport.reportState('running')
    }
    logForDebugging(`[remote-bridge] Flushing ${events.length} history events`)
    await transport.writeBatch(events)
  }

  // ── 10. Teardown ────────────────────────────────────────────────────────
  // On SIGINT/SIGTERM or /exit, gracefulShutdown races runCleanupFunctions()
  // against a 2s cap before forceExit kills the process. Budget accordingly:
  //   - archive:      teardown_archive_timeout_ms (default 1500, cap 2000)
  //   - result write: fire-and-forget, archive latency covers the drain
  //   - 401 retry:    only if first archive 401s, shares the same budget
  async function teardown(): Promise<void> {
    if (tornDown) return
    tornDown = true
    refresh.cancelAll()
    clearTimeout(connectDeadline)
    flushGate.drop()

    // Fire the result message before archive — transport.write() only awaits
    // enqueue (SerialBatchEventUploader resolves once buffered, drain is
    // async). Archiving before close() gives the uploader's drain loop a
    // window (typical archive ≈ 100-500ms) to POST the result without an
    // explicit sleep. close() sets closed=true which interrupts drain at the
    // next while-check, so close-before-archive drops the result.
    transport.reportState('idle')
    void transport.write(makeResultMessage(sessionId))

    let token = getAccessToken()
    let status = await archiveSession(
      sessionId,
      baseUrl,
      token,
      orgUUID,
      cfg.teardown_archive_timeout_ms,
    )

    // Token is usually fresh (refresh scheduler runs 5min before expiry) but
    // laptop-wake past the refresh window leaves getAccessToken() returning a
    // stale string. Retry once on 401 — onAuth401 (= handleOAuth401Error)
    // clears keychain cache + force-refreshes. No proactive refresh on the
    // happy path: handleOAuth401Error force-refreshes even valid tokens,
    // which would waste budget 99% of the time. try/catch mirrors
    // recoverFromAuthFailure: keychain reads can throw (macOS locked after
    // wake); an uncaught throw here would skip transport.close + telemetry.
    if (status === 401 && onAuth401) {
      try {
        await onAuth401(token ?? '')
        token = getAccessToken()
        status = await archiveSession(
          sessionId,
          baseUrl,
          token,
          orgUUID,
          cfg.teardown_archive_timeout_ms,
        )
      } catch (err) {
        logForDebugging(
          `[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
          { level: 'error' },
        )
      }
    }

    transport.close()

    // Collapse the raw archive outcome into one categorical for telemetry.
    const archiveStatus: ArchiveTelemetryStatus =
      status === 'no_token'
        ? 'skipped_no_token'
        : status === 'timeout' || status === 'error'
          ? 'network_error'
          : status >= 500
            ? 'server_5xx'
            : status >= 400
              ? 'server_4xx'
              : 'ok'

    logForDebugging(`[remote-bridge] Torn down (archive=${status})`)
    logForDiagnosticsNoPII('info', 'bridge_repl_v2_teardown')
    logEvent(
      feature('CCR_MIRROR') && outboundOnly
        ? 'tengu_ccr_mirror_teardown'
        : 'tengu_bridge_repl_teardown',
      {
        v2: true,
        archive_status:
          archiveStatus as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        archive_ok: typeof status === 'number' && status < 400,
        archive_http_status: typeof status === 'number' ? status : undefined,
        archive_timeout: status === 'timeout',
        archive_no_token: status === 'no_token',
      },
    )
  }
  const unregister = registerCleanup(teardown)

  if (feature('CCR_MIRROR') && outboundOnly) {
    logEvent('tengu_ccr_mirror_started', {
      v2: true,
      expires_in_s: credentials.expires_in,
    })
  } else {
    logEvent('tengu_bridge_repl_started', {
      has_initial_messages: !!(initialMessages && initialMessages.length > 0),
      v2: true,
      expires_in_s: credentials.expires_in,
      inProtectedNamespace: isInProtectedNamespace(),
    })
  }

  // ── 11. Handle ──────────────────────────────────────────────────────────
  return {
    bridgeSessionId: sessionId,
    environmentId: '',
    sessionIngressUrl: credentials.api_base_url,
    writeMessages(messages) {
      const filtered = messages.filter(
        m =>
          isEligibleBridgeMessage(m) &&
          !initialMessageUUIDs.has(m.uuid) &&
          !recentPostedUUIDs.has(m.uuid),
      )
      if (filtered.length === 0) return

      // Fire onUserMessage for title derivation. Scan before the flushGate
      // check — prompts are title-worthy even if they queue. Keeps calling
      // on every title-worthy message until the callback returns true; the
      // caller owns the policy (derive at 1st and 3rd, skip if explicit).
      if (!userMessageCallbackDone) {
        for (const m of filtered) {
          const text = extractTitleText(m)
          if (text !== undefined && onUserMessage?.(text, sessionId)) {
            userMessageCallbackDone = true
            break
          }
        }
      }

      if (flushGate.enqueue(...filtered)) {
        logForDebugging(
          `[remote-bridge] Queued ${filtered.length} message(s) during flush`,
        )
        return
      }

      for (const msg of filtered) recentPostedUUIDs.add(msg.uuid)
      const events = toSDKMessages(filtered).map(m => ({
        ...m,
        session_id: sessionId,
      }))
      // v2 does not derive worker_status from events server-side (unlike v1
      // session-ingress session_status_updater.go). Push it from here so the
      // CCR web session list shows Running instead of stuck on Idle. A user
      // message in the batch marks turn start. CCRClient.reportState dedupes
      // consecutive same-state pushes.
      if (filtered.some(m => m.type === 'user')) {
        transport.reportState('running')
      }
      logForDebugging(`[remote-bridge] Sending ${filtered.length} message(s)`)
      void transport.writeBatch(events)
    },
    writeSdkMessages(messages: SDKMessage[]) {
      // Messages without a uuid are always forwarded (nothing to dedup on).
      const filtered = messages.filter(
        m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
      )
      if (filtered.length === 0) return
      for (const msg of filtered) {
        if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
      }
      const events = filtered.map(m => ({ ...m, session_id: sessionId }))
      void transport.writeBatch(events)
    },
    sendControlRequest(request: SDKControlRequest) {
      if (authRecoveryInFlight) {
        logForDebugging(
          `[remote-bridge] Dropping control_request during 401 recovery: ${request.request_id}`,
        )
        return
      }
      const event = { ...request, session_id: sessionId }
      if (request.request.subtype === 'can_use_tool') {
        transport.reportState('requires_action')
      }
      void transport.write(event)
      logForDebugging(
        `[remote-bridge] Sent control_request request_id=${request.request_id}`,
      )
    },
    sendControlResponse(response: SDKControlResponse) {
      if (authRecoveryInFlight) {
        logForDebugging(
          '[remote-bridge] Dropping control_response during 401 recovery',
        )
        return
      }
      const event = { ...response, session_id: sessionId }
      transport.reportState('running')
      void transport.write(event)
      logForDebugging('[remote-bridge] Sent control_response')
    },
    sendControlCancelRequest(requestId: string) {
      if (authRecoveryInFlight) {
        logForDebugging(
          `[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
        )
        return
      }
      const event = {
        type: 'control_cancel_request' as const,
        request_id: requestId,
        session_id: sessionId,
      }
      // Hook/classifier/channel/recheck resolved the permission locally —
      // interactiveHandler calls only cancelRequest (no sendResponse) on
      // those paths, so without this the server stays on requires_action.
      transport.reportState('running')
      void transport.write(event)
      logForDebugging(
        `[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
      )
    },
    sendResult() {
      if (authRecoveryInFlight) {
        logForDebugging('[remote-bridge] Dropping result during 401 recovery')
        return
      }
      transport.reportState('idle')
      void transport.write(makeResultMessage(sessionId))
      logForDebugging(`[remote-bridge] Sent result`)
    },
    async teardown() {
      // Unregister first so the cleanup registry doesn't run teardown again.
      unregister()
      await teardown()
    },
  }
}

// ─── Session API (v2 /code/sessions, no env) ─────────────────────────────────

/**
 * Retry an async init call with exponential backoff + jitter.
 *
 * `fn` signals failure by resolving to null; a rejection is not caught here
 * and propagates to the caller. The delay before retry k is
 * init_retry_base_delay_ms * 2^(k-1), perturbed by ±init_retry_jitter_fraction
 * and capped at init_retry_max_delay_ms. No sleep follows the final attempt.
 *
 * @param fn Operation to attempt; resolves to null on failure.
 * @param label Name used in the debug log line for each failed attempt.
 * @param cfg Source of the retry count, base delay, jitter and max delay.
 * @returns The first non-null result, or null once
 *   cfg.init_retry_max_attempts attempts have all resolved to null.
 */
async function withRetry<T>(
  fn: () => Promise<T | null>,
  label: string,
  cfg: EnvLessBridgeConfig,
): Promise<T | null> {
  const max = cfg.init_retry_max_attempts
  for (let attempt = 1; attempt <= max; attempt++) {
    const result = await fn()
    if (result !== null) return result
    if (attempt < max) {
      const base = cfg.init_retry_base_delay_ms * 2 ** (attempt - 1)
      const jitter =
        base * cfg.init_retry_jitter_fraction * (2 * Math.random() - 1)
      const delay = Math.min(base + jitter, cfg.init_retry_max_delay_ms)
      logForDebugging(
        `[remote-bridge] ${label} failed (attempt ${attempt}/${max}), retrying in ${Math.round(delay)}ms`,
      )
      await sleep(delay)
    }
  }
  return null
}

// Moved to codeSessionApi.ts so the SDK /bridge subpath can bundle them
// without pulling in this file's heavy CLI tree (analytics, transport).
export {
  createCodeSession,
  type RemoteCredentials,
} from './codeSessionApi.js'
import {
  createCodeSession,
  fetchRemoteCredentials as fetchRemoteCredentialsRaw,
  type RemoteCredentials,
} from './codeSessionApi.js'
import { getBridgeBaseUrlOverride } from './bridgeConfig.js'

/**
 * CLI-side wrapper around the raw credentials fetch in codeSessionApi.ts.
 *
 * It injects the trusted-device token and applies the CLAUDE_BRIDGE_BASE_URL
 * dev override. Both are env/GrowthBook reads that the SDK-facing
 * codeSessionApi.ts export must stay free of.
 *
 * @returns `null` when the raw fetch yields nothing. Otherwise the
 *   credentials, with `api_base_url` replaced by `baseUrl` when a base-URL
 *   override is active.
 */
export async function fetchRemoteCredentials(
  sessionId: string,
  baseUrl: string,
  accessToken: string,
  timeoutMs: number,
): Promise<RemoteCredentials | null> {
  const trustedDeviceToken = getTrustedDeviceToken()
  const fetched = await fetchRemoteCredentialsRaw(
    sessionId,
    baseUrl,
    accessToken,
    timeoutMs,
    trustedDeviceToken,
  )
  if (!fetched) return null

  if (getBridgeBaseUrlOverride()) {
    return { ...fetched, api_base_url: baseUrl }
  }
  return fetched
}

// Outcome of archiveSession: the HTTP status code when a response arrived,
// otherwise a sentinel string.
type ArchiveStatus = number | 'timeout' | 'error' | 'no_token'

// Single categorical for BQ `GROUP BY archive_status`. The booleans on
// _teardown predate it and are redundant with it. The exception is
// archive_timeout, which separates ECONNABORTED from other network errors.
// Both map to 'network_error' here, since the dominant cause in a 1.5s window
// is timeout.
type ArchiveTelemetryStatus =
  | 'ok'
  | 'skipped_no_token'
  | 'network_error'
  | 'server_4xx'
  | 'server_5xx'

/**
 * POST the archive request for a session.
 *
 * @returns `'no_token'` without any request when `accessToken` is falsy; the
 *   response's HTTP status for any response (no status code throws);
 *   `'timeout'` for an axios error with code ECONNABORTED; `'error'` for any
 *   other thrown failure.
 */
async function archiveSession(
  sessionId: string,
  baseUrl: string,
  accessToken: string | undefined,
  orgUUID: string,
  timeoutMs: number,
): Promise<ArchiveStatus> {
  if (!accessToken) {
    return 'no_token'
  }

  // Archive lives at the compat layer (/v1/sessions/*, not
  // /v1/code/sessions). compat.parseSessionID only accepts TagSession
  // (session_*), so cse_* ids are retagged.
  //
  // bridgeMain.ts caches compatId in sessionCompatIds to keep in-memory
  // titledSessions/logger keys consistent across a mid-session gate flip.
  // Here compatId is only a server URL path segment with no in-memory state,
  // so it is computed fresh and matches whatever the server currently
  // validates: if the gate is OFF, the server has been updated to accept
  // cse_* and that is what gets sent.
  const compatId = toCompatSessionId(sessionId)

  try {
    const archiveUrl = `${baseUrl}/v1/sessions/${compatId}/archive`
    // anthropic-beta + x-organization-uuid are required: without them the
    // compat gateway 404s before reaching the handler.
    const headers = {
      ...oauthHeaders(accessToken),
      'anthropic-beta': 'ccr-byoc-2025-07-29',
      'x-organization-uuid': orgUUID,
    }
    const { status } = await axios.post(
      archiveUrl,
      {},
      {
        headers,
        timeout: timeoutMs,
        // Accept every status so HTTP errors are returned, not thrown.
        validateStatus: () => true,
      },
    )
    logForDebugging(`[remote-bridge] Archive ${compatId} status=${status}`)
    return status
  } catch (err) {
    const reason = errorMessage(err)
    logForDebugging(`[remote-bridge] Archive failed: ${reason}`)
    const timedOut = axios.isAxiosError(err) && err.code === 'ECONNABORTED'
    return timedOut ? 'timeout' : 'error'
  }
}