// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered /** * Env-less Remote Control bridge core. * * "Env-less" = no Environments API layer. Distinct from "CCR v2" (the * /worker/* transport protocol) — the env-based path (replBridge.ts) can also * use CCR v2 transport via CLAUDE_CODE_USE_CCR_V2. This file is about removing * the poll/dispatch layer, not about which transport protocol is underneath. * * Unlike initBridgeCore (env-based, ~2400 lines), this connects directly * to the session-ingress layer without the Environments API work-dispatch * layer: * * 1. POST /v1/code/sessions (OAuth, no env_id) → session.id * 2. POST /v1/code/sessions/{id}/bridge (OAuth) → {worker_jwt, expires_in, api_base_url, worker_epoch} * Each /bridge call bumps epoch — it IS the register. No separate /worker/register. * 3. createV2ReplTransport(worker_jwt, worker_epoch) → SSE + CCRClient * 4. createTokenRefreshScheduler → proactive /bridge re-call (new JWT + new epoch) * 5. 401 on SSE → rebuild transport with fresh /bridge credentials (same seq-num) * * No register/poll/ack/stop/heartbeat/deregister environment lifecycle. * The Environments API historically existed because CCR's /worker/* * endpoints required a session_id+role=worker JWT that only the work-dispatch * layer could mint. Server PR #292605 (renamed in #293280) adds the /bridge endpoint as a direct * OAuth→worker_jwt exchange, making the env layer optional for REPL sessions. * * Gated by `tengu_bridge_repl_v2` GrowthBook flag in initReplBridge.ts. * REPL-only — daemon/print stay on env-based. 
*/ import { feature } from 'bun:bundle' import axios from 'axios' import { createV2ReplTransport, type ReplBridgeTransport, } from './replBridgeTransport.js' import { buildCCRv2SdkUrl } from './workSecret.js' import { toCompatSessionId } from './sessionIdCompat.js' import { FlushGate } from './flushGate.js' import { createTokenRefreshScheduler } from './jwtUtils.js' import { getTrustedDeviceToken } from './trustedDevice.js' import { getEnvLessBridgeConfig, type EnvLessBridgeConfig, } from './envLessBridgeConfig.js' import { handleIngressMessage, handleServerControlRequest, makeResultMessage, isEligibleBridgeMessage, extractTitleText, BoundedUUIDSet, } from './bridgeMessaging.js' import { logBridgeSkip } from './debugUtils.js' import { logForDebugging } from '../utils/debug.js' import { logForDiagnosticsNoPII } from '../utils/diagLogs.js' import { isInProtectedNamespace } from '../utils/envUtils.js' import { errorMessage } from '../utils/errors.js' import { sleep } from '../utils/sleep.js' import { registerCleanup } from '../utils/cleanupRegistry.js' import { type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, logEvent, } from '../services/analytics/index.js' import type { ReplBridgeHandle, BridgeState } from './replBridge.js' import type { Message } from '../types/message.js' import type { SDKMessage } from '../entrypoints/agentSdkTypes.js' import type { SDKControlRequest, SDKControlResponse, } from '../entrypoints/sdk/controlTypes.js' import type { PermissionMode } from '../utils/permissions/PermissionMode.js' const ANTHROPIC_VERSION = '2023-06-01' // Telemetry discriminator for ws_connected. 'initial' is the default and // never passed to rebuildTransport (which can only be called post-init); // Exclude<> makes that constraint explicit at both signatures. 
type ConnectCause = 'initial' | 'proactive_refresh' | 'auth_401_recovery'

/** Builds the OAuth request headers shared by the session API calls. */
function oauthHeaders(accessToken: string): Record<string, string> {
  return {
    Authorization: `Bearer ${accessToken}`,
    'Content-Type': 'application/json',
    'anthropic-version': ANTHROPIC_VERSION,
  }
}

export type EnvLessBridgeParams = {
  baseUrl: string
  orgUUID: string
  title: string
  getAccessToken: () => string | undefined
  /**
   * Invoked with the (possibly stale) OAuth token before re-reading
   * getAccessToken(). The resolved value is never inspected in this file, so
   * it is typed as `unknown` to accept any async implementation.
   */
  onAuth401?: (staleAccessToken: string) => Promise<unknown>
  /**
   * Converts internal Message[] → SDKMessage[] for writeMessages() and the
   * initial-flush/drain paths. Injected rather than imported — mappers.ts
   * transitively pulls in src/commands.ts (entire command registry + React
   * tree) which would bloat bundles that don't already have it.
   */
  toSDKMessages: (messages: Message[]) => SDKMessage[]
  initialHistoryCap: number
  initialMessages?: Message[]
  onInboundMessage?: (msg: SDKMessage) => void | Promise<void>
  /**
   * Fired on each title-worthy user message seen in writeMessages() until
   * the callback returns true (done). Mirrors replBridge.ts's onUserMessage —
   * caller derives a title and PATCHes /v1/sessions/{id} so auto-started
   * sessions don't stay at the generic fallback. The caller owns the
   * derive-at-count-1-and-3 policy; the transport just keeps calling until
   * told to stop. sessionId is the raw cse_* — updateBridgeSessionTitle
   * retags internally.
   */
  onUserMessage?: (text: string, sessionId: string) => boolean
  onPermissionResponse?: (response: SDKControlResponse) => void
  onInterrupt?: () => void
  onSetModel?: (model: string | undefined) => void
  onSetMaxThinkingTokens?: (maxTokens: number | null) => void
  onSetPermissionMode?: (
    mode: PermissionMode,
  ) => { ok: true } | { ok: false; error: string }
  onStateChange?: (state: BridgeState, detail?: string) => void
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Threaded to createV2ReplTransport and
   * handleServerControlRequest.
   */
  outboundOnly?: boolean
  /** Free-form tags for session categorization (e.g. ['ccr-mirror']). */
  tags?: string[]
}

/**
 * Create a session, fetch a worker JWT, connect the v2 transport.
 *
 * Returns null on any pre-flight failure (session create failed, /bridge
 * failed, transport setup failed). Caller (initReplBridge) surfaces this
 * as a generic "initialization failed" state.
 *
 * Note the asymmetry between failure paths: a missing OAuth token returns
 * null without calling onStateChange, whereas the three pre-flight failures
 * above each report 'failed' through onStateChange before returning. The two
 * later failures also fire a best-effort archive of the just-created session.
 *
 * @param params Connection settings and callbacks; see EnvLessBridgeParams.
 * @returns The bridge handle, or null when initialization did not complete.
 */
export async function initEnvLessBridgeCore(
  params: EnvLessBridgeParams,
): Promise<ReplBridgeHandle | null> {
  const {
    baseUrl,
    orgUUID,
    title,
    getAccessToken,
    onAuth401,
    toSDKMessages,
    initialHistoryCap,
    initialMessages,
    onInboundMessage,
    onUserMessage,
    onPermissionResponse,
    onInterrupt,
    onSetModel,
    onSetMaxThinkingTokens,
    onSetPermissionMode,
    onStateChange,
    outboundOnly,
    tags,
  } = params

  const cfg = await getEnvLessBridgeConfig()

  // ── 1. Create session (POST /v1/code/sessions, no env_id) ───────────────
  const accessToken = getAccessToken()
  if (!accessToken) {
    logForDebugging('[remote-bridge] No OAuth token')
    return null
  }

  const createdSessionId = await withRetry(
    () =>
      createCodeSession(baseUrl, accessToken, title, cfg.http_timeout_ms, tags),
    'createCodeSession',
    cfg,
  )
  if (!createdSessionId) {
    onStateChange?.('failed', 'Session creation failed — see debug log')
    logBridgeSkip('v2_session_create_failed', undefined, true)
    return null
  }
  const sessionId: string = createdSessionId
  logForDebugging(`[remote-bridge] Created session ${sessionId}`)
  logForDiagnosticsNoPII('info', 'bridge_repl_v2_session_created')

  // ── 2. Fetch bridge credentials (POST /bridge → worker_jwt, expires_in, api_base_url) ──
  const credentials = await withRetry(
    () =>
      fetchRemoteCredentials(
        sessionId,
        baseUrl,
        accessToken,
        cfg.http_timeout_ms,
      ),
    'fetchRemoteCredentials',
    cfg,
  )
  if (!credentials) {
    onStateChange?.('failed', 'Remote credentials fetch failed — see debug log')
    logBridgeSkip('v2_remote_creds_failed', undefined, true)
    // Best-effort cleanup of the session created in step 1; result ignored.
    void archiveSession(
      sessionId,
      baseUrl,
      accessToken,
      orgUUID,
      cfg.http_timeout_ms,
    )
    return null
  }
  logForDebugging(
    `[remote-bridge] Fetched bridge credentials (expires_in=${credentials.expires_in}s)`,
  )

  // ── 3. Build v2 transport (SSETransport + CCRClient) ────────────────────
  const sessionUrl = buildCCRv2SdkUrl(credentials.api_base_url, sessionId)
  logForDebugging(`[remote-bridge] v2 session URL: ${sessionUrl}`)

  let transport: ReplBridgeTransport
  try {
    transport = await createV2ReplTransport({
      sessionUrl,
      ingressToken: credentials.worker_jwt,
      sessionId,
      epoch: credentials.worker_epoch,
      heartbeatIntervalMs: cfg.heartbeat_interval_ms,
      heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
      // Per-instance closure — keeps the worker JWT out of
      // process.env.CLAUDE_CODE_SESSION_ACCESS_TOKEN, which mcp/client.ts
      // reads ungatedly and would otherwise send to user-configured ws/http
      // MCP servers. Frozen-at-construction is correct: transport is fully
      // rebuilt on refresh (rebuildTransport below).
      getAuthToken: () => credentials.worker_jwt,
      outboundOnly,
    })
  } catch (err) {
    logForDebugging(
      `[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
      { level: 'error' },
    )
    onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
    logBridgeSkip('v2_transport_setup_failed', undefined, true)
    // Best-effort cleanup of the session created in step 1; result ignored.
    void archiveSession(
      sessionId,
      baseUrl,
      accessToken,
      orgUUID,
      cfg.http_timeout_ms,
    )
    return null
  }
  logForDebugging(
    `[remote-bridge] v2 transport created (epoch=${credentials.worker_epoch})`,
  )
  onStateChange?.('ready')

  // ── 4. State ────────────────────────────────────────────────────────────

  // Echo dedup: messages we POST come back on the read stream. Seeded with
  // initial message UUIDs so server echoes of flushed history are recognized.
  // Both sets cover initial UUIDs — recentPostedUUIDs is a bounded ring buffer
  // (cfg.uuid_dedup_buffer_size) and could evict them after enough live
  // writes; initialMessageUUIDs is the unbounded fallback. Defense-in-depth;
  // mirrors replBridge.ts.
  const recentPostedUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
  const initialMessageUUIDs = new Set<string>()
  if (initialMessages) {
    for (const msg of initialMessages) {
      initialMessageUUIDs.add(msg.uuid)
      recentPostedUUIDs.add(msg.uuid)
    }
  }

  // Defensive dedup for re-delivered inbound prompts (seq-num negotiation
  // edge cases, server history replay after transport swap).
  const recentInboundUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)

  // FlushGate: queue live writes while the history flush POST is in flight,
  // so the server receives [history..., live...] in order.
  const flushGate = new FlushGate()

  let initialFlushDone = false
  let tornDown = false
  let authRecoveryInFlight = false
  // Latch for onUserMessage — flips true when the callback returns true
  // (policy says "done deriving"). sessionId is const (no re-create path —
  // rebuildTransport swaps JWT/epoch, same session), so no reset needed.
  let userMessageCallbackDone = !onUserMessage

  // Telemetry: why did onConnect fire? Set by rebuildTransport before
  // wireTransportCallbacks; read asynchronously by onConnect. Race-safe
  // because authRecoveryInFlight serializes rebuild callers, and a fresh
  // initEnvLessBridgeCore() call gets a fresh closure defaulting to 'initial'.
  let connectCause: ConnectCause = 'initial'

  // Deadline for onConnect after transport.connect(). Cleared by onConnect
  // (connected) and onClose (got a close — not silent). If neither fires
  // before cfg.connect_timeout_ms, onConnectTimeout emits — the only
  // signal for the `started → (silence)` gap.
  let connectDeadline: ReturnType<typeof setTimeout> | undefined
  function onConnectTimeout(cause: ConnectCause): void {
    if (tornDown) return
    logEvent('tengu_bridge_repl_connect_timeout', {
      v2: true,
      elapsed_ms: cfg.connect_timeout_ms,
      cause:
        cause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
    })
  }

  // ── 5. JWT refresh scheduler ────────────────────────────────────────────
  // Schedule a callback ahead of expiry (cfg.token_refresh_buffer_ms before
  // response.expires_in elapses). On fire, re-fetch /bridge with OAuth →
  // rebuild transport with fresh credentials.
  // Each /bridge call bumps epoch server-side, so a JWT-only swap would leave
  // the old CCRClient heartbeating with a stale epoch → 409 within 20s.
  // JWT is opaque — do not decode.
  const refresh = createTokenRefreshScheduler({
    refreshBufferMs: cfg.token_refresh_buffer_ms,
    getAccessToken: async () => {
      // Unconditionally refresh OAuth before calling /bridge — getAccessToken()
      // returns expired tokens as non-null strings (doesn't check expiresAt),
      // so truthiness doesn't mean valid. Pass the stale token to onAuth401
      // so handleOAuth401Error's keychain-comparison can detect parallel refresh.
      const stale = getAccessToken()
      if (onAuth401) await onAuth401(stale ?? '')
      return getAccessToken() ?? stale
    },
    onRefresh: (sid, oauthToken) => {
      void (async () => {
        // Laptop wake: overdue proactive timer + SSE 401 fire ~simultaneously.
        // Claim the flag BEFORE the /bridge fetch so the other path skips
        // entirely — prevents double epoch bump (each /bridge call bumps; if
        // both fetch, the first rebuild gets a stale epoch and 409s).
        if (authRecoveryInFlight || tornDown) {
          logForDebugging(
            '[remote-bridge] Recovery already in flight, skipping proactive refresh',
          )
          return
        }
        authRecoveryInFlight = true
        try {
          const fresh = await withRetry(
            () =>
              fetchRemoteCredentials(
                sid,
                baseUrl,
                oauthToken,
                cfg.http_timeout_ms,
              ),
            'fetchRemoteCredentials (proactive)',
            cfg,
          )
          // A failed fetch is silent here (no state change): the existing
          // transport is left in place.
          if (!fresh || tornDown) return
          await rebuildTransport(fresh, 'proactive_refresh')
          logForDebugging(
            '[remote-bridge] Transport rebuilt (proactive refresh)',
          )
        } catch (err) {
          logForDebugging(
            `[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII(
            'error',
            'bridge_repl_v2_proactive_refresh_failed',
          )
          if (!tornDown) {
            onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
          }
        } finally {
          authRecoveryInFlight = false
        }
      })()
    },
    label: 'remote',
  })
  refresh.scheduleFromExpiresIn(sessionId, credentials.expires_in)

  // ── 6. Wire callbacks (extracted so transport-rebuild can re-wire) ──────
  function wireTransportCallbacks(): void {
    transport.setOnConnect(() => {
      clearTimeout(connectDeadline)
      logForDebugging('[remote-bridge] v2 transport connected')
      logForDiagnosticsNoPII('info', 'bridge_repl_v2_transport_connected')
      logEvent('tengu_bridge_repl_ws_connected', {
        v2: true,
        cause:
          connectCause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })

      if (!initialFlushDone && initialMessages && initialMessages.length > 0) {
        initialFlushDone = true
        // Capture current transport — if 401/teardown happens mid-flush,
        // the stale .finally() must not drain the gate or signal connected.
        // (Same guard pattern as replBridge.ts:1119.)
        const flushTransport = transport
        void flushHistory(initialMessages)
          .catch(e =>
            logForDebugging(`[remote-bridge] flushHistory failed: ${e}`),
          )
          .finally(() => {
            // authRecoveryInFlight catches the v1-vs-v2 asymmetry: v1 nulls
            // transport synchronously in setOnClose (replBridge.ts:1175), so
            // transport !== flushTransport trips immediately. v2 doesn't
            // null — transport is reassigned only inside rebuildTransport,
            // after several awaits. authRecoveryInFlight is set synchronously
            // by rebuildTransport's callers before their first await.
            if (
              transport !== flushTransport ||
              tornDown ||
              authRecoveryInFlight
            ) {
              return
            }
            drainFlushGate()
            onStateChange?.('connected')
          })
      } else if (!flushGate.active) {
        onStateChange?.('connected')
      }
    })

    transport.setOnData((data: string) => {
      handleIngressMessage(
        data,
        recentPostedUUIDs,
        recentInboundUUIDs,
        onInboundMessage,
        // Remote client answered the permission prompt — the turn resumes.
        // Without this the server stays on requires_action until the next
        // user message or turn-end result.
        onPermissionResponse
          ? res => {
              transport.reportState('running')
              onPermissionResponse(res)
            }
          : undefined,
        req =>
          handleServerControlRequest(req, {
            transport,
            sessionId,
            onInterrupt,
            onSetModel,
            onSetMaxThinkingTokens,
            onSetPermissionMode,
            outboundOnly,
          }),
      )
    })

    transport.setOnClose((code?: number) => {
      clearTimeout(connectDeadline)
      if (tornDown) return
      logForDebugging(`[remote-bridge] v2 transport closed (code=${code})`)
      logEvent('tengu_bridge_repl_ws_closed', { code, v2: true })
      // onClose fires only for TERMINAL failures: 401 (JWT invalid),
      // 4090 (CCR epoch mismatch), 4091 (CCR init failed), or SSE 10-min
      // reconnect budget exhausted. Transient disconnects are handled
      // transparently inside SSETransport. 401 we can recover from (fetch
      // fresh JWT, rebuild transport); all other codes are dead-ends.
      if (code === 401 && !authRecoveryInFlight) {
        void recoverFromAuthFailure()
        return
      }
      onStateChange?.('failed', `Transport closed (code ${code})`)
    })
  }

  // ── 7. Transport rebuild (shared by proactive refresh + 401 recovery) ──
  // Every /bridge call bumps epoch server-side. Both refresh paths must
  // rebuild the transport with the new epoch — a JWT-only swap leaves the
  // old CCRClient heartbeating stale epoch → 409. SSE resumes from the old
  // transport's high-water-mark seq-num so no server-side replay.
  // Caller MUST set authRecoveryInFlight = true before calling (synchronously,
  // before any await) and clear it in a finally. This function doesn't manage
  // the flag — moving it here would be too late to prevent a double /bridge
  // fetch, and each fetch bumps epoch.
  async function rebuildTransport(
    fresh: RemoteCredentials,
    cause: Exclude<ConnectCause, 'initial'>,
  ): Promise<void> {
    connectCause = cause
    // Queue writes during rebuild — once /bridge returns, the old transport's
    // epoch is stale and its next write/heartbeat 409s. Without this gate,
    // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently
    // no-ops (closed uploader after 409) → permanent silent message loss.
    flushGate.start()
    try {
      const seq = transport.getLastSequenceNum()
      transport.close()
      transport = await createV2ReplTransport({
        sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId),
        ingressToken: fresh.worker_jwt,
        sessionId,
        epoch: fresh.worker_epoch,
        heartbeatIntervalMs: cfg.heartbeat_interval_ms,
        heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
        initialSequenceNum: seq,
        getAuthToken: () => fresh.worker_jwt,
        outboundOnly,
      })
      if (tornDown) {
        // Teardown fired during the async createV2ReplTransport window.
        // Don't wire/connect/schedule — we'd re-arm timers after cancelAll()
        // and fire onInboundMessage into a torn-down bridge.
        transport.close()
        return
      }
      wireTransportCallbacks()
      transport.connect()
      connectDeadline = setTimeout(
        onConnectTimeout,
        cfg.connect_timeout_ms,
        connectCause,
      )
      refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in)
      // Drain queued writes into the new uploader. Runs before
      // ccr.initialize() resolves (transport.connect() is fire-and-forget),
      // but the uploader serializes behind the initial PUT /worker. If
      // init fails (4091), events drop — but only recentPostedUUIDs
      // (per-instance) is populated, so re-enabling the bridge re-flushes.
      drainFlushGate()
    } finally {
      // End the gate on failure paths too — drainFlushGate already ended
      // it on success. Queued messages are dropped (transport still dead).
      flushGate.drop()
    }
  }

  // ── 8. 401 recovery (OAuth refresh + rebuild) ───────────────────────────
  async function recoverFromAuthFailure(): Promise<void> {
    // setOnClose already guards `!authRecoveryInFlight` but that check and
    // this set must be atomic against onRefresh — claim synchronously before
    // any await. Laptop wake fires both paths ~simultaneously.
    if (authRecoveryInFlight) return
    authRecoveryInFlight = true
    onStateChange?.('reconnecting', 'JWT expired — refreshing')
    logForDebugging('[remote-bridge] 401 on SSE — attempting JWT refresh')
    try {
      // Unconditionally try OAuth refresh — getAccessToken() returns expired
      // tokens as non-null strings, so !oauthToken doesn't catch expiry.
      // Pass the stale token so handleOAuth401Error's keychain-comparison
      // can detect if another tab already refreshed.
      const stale = getAccessToken()
      if (onAuth401) await onAuth401(stale ?? '')
      const oauthToken = getAccessToken() ?? stale
      if (!oauthToken || tornDown) {
        if (!tornDown) {
          onStateChange?.('failed', 'JWT refresh failed: no OAuth token')
        }
        return
      }

      const fresh = await withRetry(
        () =>
          fetchRemoteCredentials(
            sessionId,
            baseUrl,
            oauthToken,
            cfg.http_timeout_ms,
          ),
        'fetchRemoteCredentials (recovery)',
        cfg,
      )
      if (!fresh || tornDown) {
        if (!tornDown) {
          onStateChange?.('failed', 'JWT refresh failed after 401')
        }
        return
      }
      // If 401 interrupted the initial flush, writeBatch may have silently
      // no-op'd on the closed uploader (ccr.close() ran in the SSE wrapper
      // before our setOnClose callback). Reset so the new onConnect re-flushes.
      // (v1 scopes initialFlushDone inside the per-transport closure at
      // replBridge.ts:1027 so it resets naturally; v2 has it at outer scope.)
      initialFlushDone = false
      await rebuildTransport(fresh, 'auth_401_recovery')
      logForDebugging('[remote-bridge] Transport rebuilt after 401')
    } catch (err) {
      logForDebugging(
        `[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
      if (!tornDown) {
        onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
      }
    } finally {
      authRecoveryInFlight = false
    }
  }

  wireTransportCallbacks()

  // Start flushGate BEFORE connect so writeMessages() during handshake
  // queues instead of racing the history POST.
  if (initialMessages && initialMessages.length > 0) {
    flushGate.start()
  }
  transport.connect()
  connectDeadline = setTimeout(
    onConnectTimeout,
    cfg.connect_timeout_ms,
    connectCause,
  )

  // ── 9. History flush + drain helpers ────────────────────────────────────
  // Ends the gate and forwards whatever was queued while it was active.
  function drainFlushGate(): void {
    const msgs = flushGate.end()
    if (msgs.length === 0) return
    for (const msg of msgs) recentPostedUUIDs.add(msg.uuid)
    const events = toSDKMessages(msgs).map(m => ({
      ...m,
      session_id: sessionId,
    }))
    if (msgs.some(m => m.type === 'user')) {
      transport.reportState('running')
    }
    logForDebugging(
      `[remote-bridge] Drained ${msgs.length} queued message(s) after flush`,
    )
    void transport.writeBatch(events)
  }

  // Posts the (optionally capped) eligible initial history as one batch.
  async function flushHistory(msgs: Message[]): Promise<void> {
    // v2 always creates a fresh server session (unconditional createCodeSession
    // above) — no session reuse, no double-post risk. Unlike v1, we do NOT
    // filter by previouslyFlushedUUIDs: that set persists across REPL enable/
    // disable cycles (useRef), so it would wrongly suppress history on re-enable.
    const eligible = msgs.filter(isEligibleBridgeMessage)
    // A cap of 0 (or less) means "no cap"; otherwise keep the newest entries.
    const capped =
      initialHistoryCap > 0 && eligible.length > initialHistoryCap
        ? eligible.slice(-initialHistoryCap)
        : eligible
    if (capped.length < eligible.length) {
      logForDebugging(
        `[remote-bridge] Capped initial flush: ${eligible.length} -> ${capped.length} (cap=${initialHistoryCap})`,
      )
    }
    const events = toSDKMessages(capped).map(m => ({
      ...m,
      session_id: sessionId,
    }))
    if (events.length === 0) return
    // Mid-turn init: if Remote Control is enabled while a query is running,
    // the last eligible message is a user prompt or tool_result (both 'user'
    // type). Without this the init PUT's 'idle' sticks until the next user-
    // type message forwards via writeMessages — which for a pure-text turn
    // is never (only assistant chunks stream post-init). Check eligible (pre-
    // cap), not capped: the cap may truncate to a user message even when the
    // actual trailing message is assistant.
    if (eligible.at(-1)?.type === 'user') {
      transport.reportState('running')
    }
    logForDebugging(`[remote-bridge] Flushing ${events.length} history events`)
    await transport.writeBatch(events)
  }

  // ── 10. Teardown ────────────────────────────────────────────────────────
  // On SIGINT, SIGTERM, or /exit, gracefulShutdown races
  // runCleanupFunctions() against a 2s cap before forceExit kills the
  // process. Budget accordingly:
  //   - archive: teardown_archive_timeout_ms (default 1500, cap 2000)
  //   - result write: fire-and-forget, archive latency covers the drain
  //   - 401 retry: only if first archive 401s, shares the same budget
  async function teardown(): Promise<void> {
    // Idempotent: registered as a cleanup AND exposed on the handle.
    if (tornDown) return
    tornDown = true
    refresh.cancelAll()
    clearTimeout(connectDeadline)
    flushGate.drop()

    // Fire the result message before archive — transport.write() only awaits
    // enqueue (SerialBatchEventUploader resolves once buffered, drain is
    // async). Archiving before close() gives the uploader's drain loop a
    // window (typical archive ≈ 100-500ms) to POST the result without an
    // explicit sleep. close() sets closed=true which interrupts drain at the
    // next while-check, so close-before-archive drops the result.
    transport.reportState('idle')
    void transport.write(makeResultMessage(sessionId))

    let token = getAccessToken()
    let status = await archiveSession(
      sessionId,
      baseUrl,
      token,
      orgUUID,
      cfg.teardown_archive_timeout_ms,
    )

    // Token is usually fresh (refresh scheduler runs ahead of expiry) but
    // laptop-wake past the refresh window leaves getAccessToken() returning a
    // stale string. Retry once on 401 — onAuth401 (= handleOAuth401Error)
    // clears keychain cache + force-refreshes. No proactive refresh on the
    // happy path: handleOAuth401Error force-refreshes even valid tokens,
    // which would waste budget 99% of the time. try/catch mirrors
    // recoverFromAuthFailure: keychain reads can throw (macOS locked after
    // wake); an uncaught throw here would skip transport.close + telemetry.
    if (status === 401 && onAuth401) {
      try {
        await onAuth401(token ?? '')
        token = getAccessToken()
        status = await archiveSession(
          sessionId,
          baseUrl,
          token,
          orgUUID,
          cfg.teardown_archive_timeout_ms,
        )
      } catch (err) {
        logForDebugging(
          `[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
          { level: 'error' },
        )
      }
    }

    transport.close()

    // Collapse the raw archive outcome into one categorical for telemetry.
    const archiveStatus: ArchiveTelemetryStatus =
      status === 'no_token'
        ? 'skipped_no_token'
        : status === 'timeout' || status === 'error'
          ? 'network_error'
          : status >= 500
            ? 'server_5xx'
            : status >= 400
              ? 'server_4xx'
              : 'ok'

    logForDebugging(`[remote-bridge] Torn down (archive=${status})`)
    logForDiagnosticsNoPII('info', 'bridge_repl_v2_teardown')
    logEvent(
      feature('CCR_MIRROR') && outboundOnly
        ? 'tengu_ccr_mirror_teardown'
        : 'tengu_bridge_repl_teardown',
      {
        v2: true,
        archive_status:
          archiveStatus as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        archive_ok: typeof status === 'number' && status < 400,
        archive_http_status: typeof status === 'number' ? status : undefined,
        archive_timeout: status === 'timeout',
        archive_no_token: status === 'no_token',
      },
    )
  }
  const unregister = registerCleanup(teardown)

  if (feature('CCR_MIRROR') && outboundOnly) {
    logEvent('tengu_ccr_mirror_started', {
      v2: true,
      expires_in_s: credentials.expires_in,
    })
  } else {
    logEvent('tengu_bridge_repl_started', {
      has_initial_messages: !!(initialMessages && initialMessages.length > 0),
      v2: true,
      expires_in_s: credentials.expires_in,
      inProtectedNamespace: isInProtectedNamespace(),
    })
  }

  // ── 11. Handle ──────────────────────────────────────────────────────────
  return {
    bridgeSessionId: sessionId,
    // No Environments API layer on this path, so there is no environment id.
    environmentId: '',
    sessionIngressUrl: credentials.api_base_url,
    writeMessages(messages) {
      const filtered = messages.filter(
        m =>
          isEligibleBridgeMessage(m) &&
          !initialMessageUUIDs.has(m.uuid) &&
          !recentPostedUUIDs.has(m.uuid),
      )
      if (filtered.length === 0) return

      // Fire onUserMessage for title derivation. Scan before the flushGate
      // check — prompts are title-worthy even if they queue. Keeps calling
      // on every title-worthy message until the callback returns true; the
      // caller owns the policy (derive at 1st and 3rd, skip if explicit).
      if (!userMessageCallbackDone) {
        for (const m of filtered) {
          const text = extractTitleText(m)
          if (text !== undefined && onUserMessage?.(text, sessionId)) {
            userMessageCallbackDone = true
            break
          }
        }
      }

      // enqueue() returning true means the gate is active and took the batch.
      if (flushGate.enqueue(...filtered)) {
        logForDebugging(
          `[remote-bridge] Queued ${filtered.length} message(s) during flush`,
        )
        return
      }

      for (const msg of filtered) recentPostedUUIDs.add(msg.uuid)
      const events = toSDKMessages(filtered).map(m => ({
        ...m,
        session_id: sessionId,
      }))
      // v2 does not derive worker_status from events server-side (unlike v1
      // session-ingress session_status_updater.go). Push it from here so the
      // CCR web session list shows Running instead of stuck on Idle. A user
      // message in the batch marks turn start. CCRClient.reportState dedupes
      // consecutive same-state pushes.
      if (filtered.some(m => m.type === 'user')) {
        transport.reportState('running')
      }
      logForDebugging(`[remote-bridge] Sending ${filtered.length} message(s)`)
      void transport.writeBatch(events)
    },
    writeSdkMessages(messages: SDKMessage[]) {
      // Messages without a uuid cannot be deduped and always pass through.
      const filtered = messages.filter(
        m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
      )
      if (filtered.length === 0) return
      for (const msg of filtered) {
        if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
      }
      const events = filtered.map(m => ({ ...m, session_id: sessionId }))
      void transport.writeBatch(events)
    },
    sendControlRequest(request: SDKControlRequest) {
      if (authRecoveryInFlight) {
        logForDebugging(
          `[remote-bridge] Dropping control_request during 401 recovery: ${request.request_id}`,
        )
        return
      }
      const event = { ...request, session_id: sessionId }
      if (request.request.subtype === 'can_use_tool') {
        transport.reportState('requires_action')
      }
      void transport.write(event)
      logForDebugging(
        `[remote-bridge] Sent control_request request_id=${request.request_id}`,
      )
    },
    sendControlResponse(response: SDKControlResponse) {
      if (authRecoveryInFlight) {
        logForDebugging(
          '[remote-bridge] Dropping control_response during 401 recovery',
        )
        return
      }
      const event = { ...response, session_id: sessionId }
      transport.reportState('running')
      void transport.write(event)
      logForDebugging('[remote-bridge] Sent control_response')
    },
    sendControlCancelRequest(requestId: string) {
      if (authRecoveryInFlight) {
        logForDebugging(
          `[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
        )
        return
      }
      const event = {
        type: 'control_cancel_request' as const,
        request_id: requestId,
        session_id: sessionId,
      }
      // Hook/classifier/channel/recheck resolved the permission locally —
      // interactiveHandler calls only cancelRequest (no sendResponse) on
      // those paths, so without this the server stays on requires_action.
      transport.reportState('running')
      void transport.write(event)
      logForDebugging(
        `[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
      )
    },
    sendResult() {
      if (authRecoveryInFlight) {
        logForDebugging('[remote-bridge] Dropping result during 401 recovery')
        return
      }
      transport.reportState('idle')
      void transport.write(makeResultMessage(sessionId))
      logForDebugging(`[remote-bridge] Sent result`)
    },
    async teardown() {
      // Drop the cleanup registration first so teardown is not re-run at exit.
      unregister()
      await teardown()
    },
  }
}

// ─── Session API (v2 /code/sessions, no env) ─────────────────────────────────

/**
 * Retry an async init call with exponential backoff + jitter.
 *
 * A `null` result from `fn` is the only retry trigger; a rejection from `fn`
 * is not caught here and propagates to the caller.
 *
 * @param fn Operation to attempt; resolves to null to signal failure.
 * @param label Name used in the debug log line between attempts.
 * @param cfg Supplies init_retry_max_attempts, init_retry_base_delay_ms,
 *   init_retry_jitter_fraction and init_retry_max_delay_ms.
 * @returns The first non-null result, or null once attempts are exhausted.
 */
async function withRetry<T>(
  fn: () => Promise<T | null>,
  label: string,
  cfg: EnvLessBridgeConfig,
): Promise<T | null> {
  const max = cfg.init_retry_max_attempts
  for (let attempt = 1; attempt <= max; attempt++) {
    const result = await fn()
    if (result !== null) return result
    // No sleep after the final attempt — fall through to return null.
    if (attempt < max) {
      const base = cfg.init_retry_base_delay_ms * 2 ** (attempt - 1)
      // Symmetric jitter: ±(base × jitter fraction).
      const jitter =
        base * cfg.init_retry_jitter_fraction * (2 * Math.random() - 1)
      const delay = Math.min(base + jitter, cfg.init_retry_max_delay_ms)
      logForDebugging(
        `[remote-bridge] ${label} failed (attempt ${attempt}/${max}), retrying in ${Math.round(delay)}ms`,
      )
      await sleep(delay)
    }
  }
  return null
}

// Moved to codeSessionApi.ts so the SDK /bridge subpath can bundle them
// without pulling in this file's heavy CLI tree (analytics, transport).
export {
  createCodeSession,
  type RemoteCredentials,
} from './codeSessionApi.js'
import {
  createCodeSession,
  fetchRemoteCredentials as fetchRemoteCredentialsRaw,
  type RemoteCredentials,
} from './codeSessionApi.js'
import { getBridgeBaseUrlOverride } from './bridgeConfig.js'

// CLI-side wrapper that applies the CLAUDE_BRIDGE_BASE_URL dev override and
// injects the trusted-device token (both are env/GrowthBook reads that the
// SDK-facing codeSessionApi.ts export must stay free of).
export async function fetchRemoteCredentials(
  sessionId: string,
  baseUrl: string,
  accessToken: string,
  timeoutMs: number,
): Promise<RemoteCredentials | null> {
  // Delegate to the SDK-safe implementation, adding the trusted-device token
  // that only the CLI side reads.
  const creds = await fetchRemoteCredentialsRaw(
    sessionId,
    baseUrl,
    accessToken,
    timeoutMs,
    getTrustedDeviceToken(),
  )
  if (!creds) return null
  // When a base-URL override is configured, route worker traffic to the
  // caller-supplied baseUrl instead of the api_base_url the server returned.
  return getBridgeBaseUrlOverride()
    ? { ...creds, api_base_url: baseUrl }
    : creds
}

/**
 * Outcome of an archive attempt: the HTTP status code when a response was
 * received, otherwise a tag describing why no status is available.
 */
type ArchiveStatus = number | 'timeout' | 'error' | 'no_token'

// Single categorical for BQ `GROUP BY archive_status`. The booleans on
// _teardown predate this and are redundant with it (except archive_timeout,
// which distinguishes ECONNABORTED from other network errors — both map to
// 'network_error' here since the dominant cause in a 1.5s window is timeout).
type ArchiveTelemetryStatus =
  | 'ok'
  | 'skipped_no_token'
  | 'network_error'
  | 'server_4xx'
  | 'server_5xx'

/**
 * POSTs an archive request for the session to the compat-layer endpoint.
 *
 * @param sessionId Raw session id; retagged via toCompatSessionId for the URL.
 * @param baseUrl API origin the archive URL is built from.
 * @param accessToken OAuth token; when falsy no request is made.
 * @param orgUUID Sent as the x-organization-uuid header.
 * @param timeoutMs Axios request timeout.
 * @returns 'no_token' when accessToken is falsy; the response status for any
 *   HTTP response (validateStatus accepts every code, so 4xx/5xx are returned
 *   rather than thrown); 'timeout' when axios fails with ECONNABORTED;
 *   'error' for any other failure raised by the request.
 */
async function archiveSession(
  sessionId: string,
  baseUrl: string,
  accessToken: string | undefined,
  orgUUID: string,
  timeoutMs: number,
): Promise<ArchiveStatus> {
  if (!accessToken) return 'no_token'
  // Archive lives at the compat layer (/v1/sessions/*, not /v1/code/sessions).
  // compat.parseSessionID only accepts TagSession (session_*), so retag cse_*.
  // anthropic-beta + x-organization-uuid are required — without them the
  // compat gateway 404s before reaching the handler.
  //
  // Unlike bridgeMain.ts (which caches compatId in sessionCompatIds to keep
  // in-memory titledSessions/logger keys consistent across a mid-session
  // gate flip), this compatId is only a server URL path segment — no
  // in-memory state. Fresh compute matches whatever the server currently
  // validates: if the gate is OFF, the server has been updated to accept
  // cse_* and we correctly send it.
  const compatId = toCompatSessionId(sessionId)
  try {
    const response = await axios.post(
      `${baseUrl}/v1/sessions/${compatId}/archive`,
      {},
      {
        headers: {
          ...oauthHeaders(accessToken),
          'anthropic-beta': 'ccr-byoc-2025-07-29',
          'x-organization-uuid': orgUUID,
        },
        timeout: timeoutMs,
        // Never reject on status — the caller classifies the code itself.
        validateStatus: () => true,
      },
    )
    logForDebugging(
      `[remote-bridge] Archive ${compatId} status=${response.status}`,
    )
    return response.status
  } catch (err) {
    const msg = errorMessage(err)
    logForDebugging(`[remote-bridge] Archive failed: ${msg}`)
    return axios.isAxiosError(err) && err.code === 'ECONNABORTED'
      ? 'timeout'
      : 'error'
  }
}