// Source dump of Claude Code (branch: main, 2406 lines, 100 kB) — the two
// lines above/below this header were web-viewer chrome ("view raw"), not code.
1// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered 2import { randomUUID } from 'crypto' 3import { 4 createBridgeApiClient, 5 BridgeFatalError, 6 isExpiredErrorType, 7 isSuppressible403, 8} from './bridgeApi.js' 9import type { BridgeConfig, BridgeApiClient } from './types.js' 10import { logForDebugging } from '../utils/debug.js' 11import { logForDiagnosticsNoPII } from '../utils/diagLogs.js' 12import { 13 type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 14 logEvent, 15} from '../services/analytics/index.js' 16import { registerCleanup } from '../utils/cleanupRegistry.js' 17import { 18 handleIngressMessage, 19 handleServerControlRequest, 20 makeResultMessage, 21 isEligibleBridgeMessage, 22 extractTitleText, 23 BoundedUUIDSet, 24} from './bridgeMessaging.js' 25import { 26 decodeWorkSecret, 27 buildSdkUrl, 28 buildCCRv2SdkUrl, 29 sameSessionId, 30} from './workSecret.js' 31import { toCompatSessionId, toInfraSessionId } from './sessionIdCompat.js' 32import { updateSessionBridgeId } from '../utils/concurrentSessions.js' 33import { getTrustedDeviceToken } from './trustedDevice.js' 34import { HybridTransport } from '../cli/transports/HybridTransport.js' 35import { 36 type ReplBridgeTransport, 37 createV1ReplTransport, 38 createV2ReplTransport, 39} from './replBridgeTransport.js' 40import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js' 41import { isEnvTruthy, isInProtectedNamespace } from '../utils/envUtils.js' 42import { validateBridgeId } from './bridgeApi.js' 43import { 44 describeAxiosError, 45 extractHttpStatus, 46 logBridgeSkip, 47} from './debugUtils.js' 48import type { Message } from '../types/message.js' 49import type { SDKMessage } from '../entrypoints/agentSdkTypes.js' 50import type { PermissionMode } from '../utils/permissions/PermissionMode.js' 51import type { 52 SDKControlRequest, 53 SDKControlResponse, 54} from '../entrypoints/sdk/controlTypes.js' 55import { 
createCapacityWake, type CapacitySignal } from './capacityWake.js' 56import { FlushGate } from './flushGate.js' 57import { 58 DEFAULT_POLL_CONFIG, 59 type PollIntervalConfig, 60} from './pollConfigDefaults.js' 61import { errorMessage } from '../utils/errors.js' 62import { sleep } from '../utils/sleep.js' 63import { 64 wrapApiForFaultInjection, 65 registerBridgeDebugHandle, 66 clearBridgeDebugHandle, 67 injectBridgeFault, 68} from './bridgeDebug.js' 69 70export type ReplBridgeHandle = { 71 bridgeSessionId: string 72 environmentId: string 73 sessionIngressUrl: string 74 writeMessages(messages: Message[]): void 75 writeSdkMessages(messages: SDKMessage[]): void 76 sendControlRequest(request: SDKControlRequest): void 77 sendControlResponse(response: SDKControlResponse): void 78 sendControlCancelRequest(requestId: string): void 79 sendResult(): void 80 teardown(): Promise<void> 81} 82 83export type BridgeState = 'ready' | 'connected' | 'reconnecting' | 'failed' 84 85/** 86 * Explicit-param input to initBridgeCore. Everything initReplBridge reads 87 * from bootstrap state (cwd, session ID, git, OAuth) becomes a field here. 88 * A daemon caller (Agent SDK, PR 4) that never runs main.tsx fills these 89 * in itself. 90 */ 91export type BridgeCoreParams = { 92 dir: string 93 machineName: string 94 branch: string 95 gitRepoUrl: string | null 96 title: string 97 baseUrl: string 98 sessionIngressUrl: string 99 /** 100 * Opaque string sent as metadata.worker_type. Use BridgeWorkerType for 101 * the two CLI-originated values; daemon callers may send any string the 102 * backend recognizes (it's just a filter key on the web side). 103 */ 104 workerType: string 105 getAccessToken: () => string | undefined 106 /** 107 * POST /v1/sessions. Injected because `createSession.ts` lazy-loads 108 * `auth.ts`/`model.ts`/`oauth/client.ts` and `bun --outfile` inlines 109 * dynamic imports — the lazy-load doesn't help, the whole REPL tree ends 110 * up in the Agent SDK bundle. 
111 * 112 * REPL wrapper passes `createBridgeSession` from `createSession.ts`. 113 * Daemon wrapper passes `createBridgeSessionLean` from `sessionApi.ts` 114 * (HTTP-only, orgUUID+model supplied by the daemon caller). 115 * 116 * Receives `gitRepoUrl`+`branch` so the REPL wrapper can build the git 117 * source/outcome for claude.ai's session card. Daemon ignores them. 118 */ 119 createSession: (opts: { 120 environmentId: string 121 title: string 122 gitRepoUrl: string | null 123 branch: string 124 signal: AbortSignal 125 }) => Promise<string | null> 126 /** 127 * POST /v1/sessions/{id}/archive. Same injection rationale. Best-effort; 128 * the callback MUST NOT throw. 129 */ 130 archiveSession: (sessionId: string) => Promise<void> 131 /** 132 * Invoked on reconnect-after-env-lost to refresh the title. REPL wrapper 133 * reads session storage (picks up /rename); daemon returns the static 134 * title. Defaults to () => title. 135 */ 136 getCurrentTitle?: () => string 137 /** 138 * Converts internal Message[] → SDKMessage[] for writeMessages() and the 139 * initial-flush/drain paths. REPL wrapper passes the real toSDKMessages 140 * from utils/messages/mappers.ts. Daemon callers that only use 141 * writeSdkMessages() and pass no initialMessages can omit this — those 142 * code paths are unreachable. 143 * 144 * Injected rather than imported because mappers.ts transitively pulls in 145 * src/commands.ts via messages.ts → api.ts → prompts.ts, dragging the 146 * entire command registry + React tree into the Agent SDK bundle. 147 */ 148 toSDKMessages?: (messages: Message[]) => SDKMessage[] 149 /** 150 * OAuth 401 refresh handler passed to createBridgeApiClient. REPL wrapper 151 * passes handleOAuth401Error; daemon passes its AuthManager's handler. 152 * Injected because utils/auth.ts transitively pulls in the command 153 * registry via config.ts → file.ts → permissions/filesystem.ts → 154 * sessionStorage.ts → commands.ts. 
155 */ 156 onAuth401?: (staleAccessToken: string) => Promise<boolean> 157 /** 158 * Poll interval config getter for the work-poll heartbeat loop. REPL 159 * wrapper passes the GrowthBook-backed getPollIntervalConfig (allows ops 160 * to live-tune poll rates fleet-wide). Daemon passes a static config 161 * with a 60s heartbeat (5× headroom under the 300s work-lease TTL). 162 * Injected because growthbook.ts transitively pulls in the command 163 * registry via the same config.ts chain. 164 */ 165 getPollIntervalConfig?: () => PollIntervalConfig 166 /** 167 * Max initial messages to replay on connect. REPL wrapper reads from the 168 * tengu_bridge_initial_history_cap GrowthBook flag. Daemon passes no 169 * initialMessages so this is never read. Default 200 matches the flag 170 * default. 171 */ 172 initialHistoryCap?: number 173 // Same REPL-flush machinery as InitBridgeOptions — daemon omits these. 174 initialMessages?: Message[] 175 previouslyFlushedUUIDs?: Set<string> 176 onInboundMessage?: (msg: SDKMessage) => void 177 onPermissionResponse?: (response: SDKControlResponse) => void 178 onInterrupt?: () => void 179 onSetModel?: (model: string | undefined) => void 180 onSetMaxThinkingTokens?: (maxTokens: number | null) => void 181 /** 182 * Returns a policy verdict so this module can emit an error control_response 183 * without importing the policy checks itself (bootstrap-isolation constraint). 184 * The callback must guard `auto` (isAutoModeGateEnabled) and 185 * `bypassPermissions` (isBypassPermissionsModeDisabled AND 186 * isBypassPermissionsModeAvailable) BEFORE calling transitionPermissionMode — 187 * that function's internal auto-gate check is a defensive throw, not a 188 * graceful guard, and its side-effect order is setAutoModeActive(true) then 189 * throw, which corrupts the 3-way invariant documented in src/CLAUDE.md if 190 * the callback lets the throw escape here. 
191 */ 192 onSetPermissionMode?: ( 193 mode: PermissionMode, 194 ) => { ok: true } | { ok: false; error: string } 195 onStateChange?: (state: BridgeState, detail?: string) => void 196 /** 197 * Fires on each real user message to flow through writeMessages() until 198 * the callback returns true (done). Mirrors remoteBridgeCore.ts's 199 * onUserMessage so the REPL bridge can derive a session title from early 200 * prompts when none was set at init time (e.g. user runs /remote-control 201 * on an empty conversation, then types). Tool-result wrappers, meta 202 * messages, and display-tag-only messages are skipped. Receives 203 * currentSessionId so the wrapper can PATCH the title without a closure 204 * dance to reach the not-yet-returned handle. The caller owns the 205 * derive-at-count-1-and-3 policy; the transport just keeps calling until 206 * told to stop. Not fired for the writeSdkMessages daemon path (daemon 207 * sets its own title at init). Distinct from SessionSpawnOpts's 208 * onFirstUserMessage (spawn-bridge, PR #21250), which stays fire-once. 209 */ 210 onUserMessage?: (text: string, sessionId: string) => boolean 211 /** See InitBridgeOptions.perpetual. */ 212 perpetual?: boolean 213 /** 214 * Seeds lastTransportSequenceNum — the SSE event-stream high-water mark 215 * that's carried across transport swaps within one process. Daemon callers 216 * pass the value they persisted at shutdown so the FIRST SSE connect of a 217 * fresh process sends from_sequence_num and the server doesn't replay full 218 * history. REPL callers omit (fresh session each run → 0 is correct). 219 */ 220 initialSSESequenceNum?: number 221} 222 223/** 224 * Superset of ReplBridgeHandle. Adds getSSESequenceNum for daemon callers 225 * that persist the SSE seq-num across process restarts and pass it back as 226 * initialSSESequenceNum on the next start. 227 */ 228export type BridgeCoreHandle = ReplBridgeHandle & { 229 /** 230 * Current SSE sequence-number high-water mark. 
Updates as transports 231 * swap. Daemon callers persist this on shutdown and pass it back as 232 * initialSSESequenceNum on next start. 233 */ 234 getSSESequenceNum(): number 235} 236 237/** 238 * Poll error recovery constants. When the work poll starts failing (e.g. 239 * server 500s), we use exponential backoff and give up after this timeout. 240 * This is deliberately long — the server is the authority on when a session 241 * is truly dead. As long as the server accepts our poll, we keep waiting 242 * for it to re-dispatch the work item. 243 */ 244const POLL_ERROR_INITIAL_DELAY_MS = 2_000 245const POLL_ERROR_MAX_DELAY_MS = 60_000 246const POLL_ERROR_GIVE_UP_MS = 15 * 60 * 1000 247 248// Monotonically increasing counter for distinguishing init calls in logs 249let initSequence = 0 250 251/** 252 * Bootstrap-free core: env registration → session creation → poll loop → 253 * ingress WS → teardown. Reads nothing from bootstrap/state or 254 * sessionStorage — all context comes from params. Caller (initReplBridge 255 * below, or a daemon in PR 4) has already passed entitlement gates and 256 * gathered git/auth/title. 257 * 258 * Returns null on registration or session-creation failure. 259 */ 260export async function initBridgeCore( 261 params: BridgeCoreParams, 262): Promise<BridgeCoreHandle | null> { 263 const { 264 dir, 265 machineName, 266 branch, 267 gitRepoUrl, 268 title, 269 baseUrl, 270 sessionIngressUrl, 271 workerType, 272 getAccessToken, 273 createSession, 274 archiveSession, 275 getCurrentTitle = () => title, 276 toSDKMessages = () => { 277 throw new Error( 278 'BridgeCoreParams.toSDKMessages not provided. 
Pass it if you use writeMessages() or initialMessages — daemon callers that only use writeSdkMessages() never hit this path.', 279 ) 280 }, 281 onAuth401, 282 getPollIntervalConfig = () => DEFAULT_POLL_CONFIG, 283 initialHistoryCap = 200, 284 initialMessages, 285 previouslyFlushedUUIDs, 286 onInboundMessage, 287 onPermissionResponse, 288 onInterrupt, 289 onSetModel, 290 onSetMaxThinkingTokens, 291 onSetPermissionMode, 292 onStateChange, 293 onUserMessage, 294 perpetual, 295 initialSSESequenceNum = 0, 296 } = params 297 298 const seq = ++initSequence 299 300 // bridgePointer import hoisted: perpetual mode reads it before register; 301 // non-perpetual writes it after session create; both use clear at teardown. 302 const { writeBridgePointer, clearBridgePointer, readBridgePointer } = 303 await import('./bridgePointer.js') 304 305 // Perpetual mode: read the crash-recovery pointer and treat it as prior 306 // state. The pointer is written unconditionally after session create 307 // (crash-recovery for all sessions); perpetual mode just skips the 308 // teardown clear so it survives clean exits too. Only reuse 'repl' 309 // pointers — a crashed standalone bridge (`claude remote-control`) 310 // writes source:'standalone' with a different workerType. 311 const rawPrior = perpetual ? await readBridgePointer(dir) : null 312 const prior = rawPrior?.source === 'repl' ? rawPrior : null 313 314 logForDebugging( 315 `[bridge:repl] initBridgeCore #${seq} starting (initialMessages=${initialMessages?.length ?? 0}${prior ? ` perpetual prior=env:${prior.environmentId}` : ''})`, 316 ) 317 318 // 5. Register bridge environment 319 const rawApi = createBridgeApiClient({ 320 baseUrl, 321 getAccessToken, 322 runnerVersion: MACRO.VERSION, 323 onDebug: logForDebugging, 324 onAuth401, 325 getTrustedDeviceToken, 326 }) 327 // Ant-only: interpose so /bridge-kick can inject poll/register/heartbeat 328 // failures. Zero cost in external builds (rawApi passes through unchanged). 
329 const api = 330 process.env.USER_TYPE === 'ant' ? wrapApiForFaultInjection(rawApi) : rawApi 331 332 const bridgeConfig: BridgeConfig = { 333 dir, 334 machineName, 335 branch, 336 gitRepoUrl, 337 maxSessions: 1, 338 spawnMode: 'single-session', 339 verbose: false, 340 sandbox: false, 341 bridgeId: randomUUID(), 342 workerType, 343 environmentId: randomUUID(), 344 reuseEnvironmentId: prior?.environmentId, 345 apiBaseUrl: baseUrl, 346 sessionIngressUrl, 347 } 348 349 let environmentId: string 350 let environmentSecret: string 351 try { 352 const reg = await api.registerBridgeEnvironment(bridgeConfig) 353 environmentId = reg.environment_id 354 environmentSecret = reg.environment_secret 355 } catch (err) { 356 logBridgeSkip( 357 'registration_failed', 358 `[bridge:repl] Environment registration failed: ${errorMessage(err)}`, 359 ) 360 // Stale pointer may be the cause (expired/deleted env) — clear it so 361 // the next start doesn't retry the same dead ID. 362 if (prior) { 363 await clearBridgePointer(dir) 364 } 365 onStateChange?.('failed', errorMessage(err)) 366 return null 367 } 368 369 logForDebugging(`[bridge:repl] Environment registered: ${environmentId}`) 370 logForDiagnosticsNoPII('info', 'bridge_repl_env_registered') 371 logEvent('tengu_bridge_repl_env_registered', {}) 372 373 /** 374 * Reconnect-in-place: if the just-registered environmentId matches what 375 * was requested, call reconnectSession to force-stop stale workers and 376 * re-queue the session. Used at init (perpetual mode — env is alive but 377 * idle after clean teardown) and in doReconnect() Strategy 1 (env lost 378 * then resurrected). Returns true on success; caller falls back to 379 * fresh session creation on false. 
380 */ 381 async function tryReconnectInPlace( 382 requestedEnvId: string, 383 sessionId: string, 384 ): Promise<boolean> { 385 if (environmentId !== requestedEnvId) { 386 logForDebugging( 387 `[bridge:repl] Env mismatch (requested ${requestedEnvId}, got ${environmentId}) — cannot reconnect in place`, 388 ) 389 return false 390 } 391 // The pointer stores what createBridgeSession returned (session_*, 392 // compat/convert.go:41). /bridge/reconnect is an environments-layer 393 // endpoint — once the server's ccr_v2_compat_enabled gate is on it 394 // looks sessions up by their infra tag (cse_*) and returns "Session 395 // not found" for the session_* costume. We don't know the gate state 396 // pre-poll, so try both; the re-tag is a no-op if the ID is already 397 // cse_* (doReconnect Strategy 1 path — currentSessionId never mutates 398 // to cse_* but future-proof the check). 399 const infraId = toInfraSessionId(sessionId) 400 const candidates = 401 infraId === sessionId ? [sessionId] : [sessionId, infraId] 402 for (const id of candidates) { 403 try { 404 await api.reconnectSession(environmentId, id) 405 logForDebugging( 406 `[bridge:repl] Reconnected session ${id} in place on env ${environmentId}`, 407 ) 408 return true 409 } catch (err) { 410 logForDebugging( 411 `[bridge:repl] reconnectSession(${id}) failed: ${errorMessage(err)}`, 412 ) 413 } 414 } 415 logForDebugging( 416 '[bridge:repl] reconnectSession exhausted — falling through to fresh session', 417 ) 418 return false 419 } 420 421 // Perpetual init: env is alive but has no queued work after clean 422 // teardown. reconnectSession re-queues it. doReconnect() has the same 423 // call but only fires on poll 404 (env dead); 424 // here the env is alive but idle. 425 const reusedPriorSession = prior 426 ? await tryReconnectInPlace(prior.environmentId, prior.sessionId) 427 : false 428 if (prior && !reusedPriorSession) { 429 await clearBridgePointer(dir) 430 } 431 432 // 6. Create session on the bridge. 
Initial messages are NOT included as 433 // session creation events because those use STREAM_ONLY persistence and 434 // are published before the CCR UI subscribes, so they get lost. Instead, 435 // initial messages are flushed via the ingress WebSocket once it connects. 436 437 // Mutable session ID — updated when the environment+session pair is 438 // re-created after a connection loss. 439 let currentSessionId: string 440 441 442 if (reusedPriorSession && prior) { 443 currentSessionId = prior.sessionId 444 logForDebugging( 445 `[bridge:repl] Perpetual session reused: ${currentSessionId}`, 446 ) 447 // Server already has all initialMessages from the prior CLI run. Mark 448 // them as previously-flushed so the initial flush filter excludes them 449 // (previouslyFlushedUUIDs is a fresh Set on every CLI start). Duplicate 450 // UUIDs cause the server to kill the WebSocket. 451 if (initialMessages && previouslyFlushedUUIDs) { 452 for (const msg of initialMessages) { 453 previouslyFlushedUUIDs.add(msg.uuid) 454 } 455 } 456 } else { 457 const createdSessionId = await createSession({ 458 environmentId, 459 title, 460 gitRepoUrl, 461 branch, 462 signal: AbortSignal.timeout(15_000), 463 }) 464 465 if (!createdSessionId) { 466 logForDebugging( 467 '[bridge:repl] Session creation failed, deregistering environment', 468 ) 469 logEvent('tengu_bridge_repl_session_failed', {}) 470 await api.deregisterEnvironment(environmentId).catch(() => {}) 471 onStateChange?.('failed', 'Session creation failed') 472 return null 473 } 474 475 currentSessionId = createdSessionId 476 logForDebugging(`[bridge:repl] Session created: ${currentSessionId}`) 477 } 478 479 // Crash-recovery pointer: written now so a kill -9 at any point after 480 // this leaves a recoverable trail. Cleared in teardown (non-perpetual) 481 // or left alone (perpetual mode — pointer survives clean exit too). 482 // `claude remote-control --continue` from the same directory will detect 483 // it and offer to resume. 
484 await writeBridgePointer(dir, { 485 sessionId: currentSessionId, 486 environmentId, 487 source: 'repl', 488 }) 489 logForDiagnosticsNoPII('info', 'bridge_repl_session_created') 490 logEvent('tengu_bridge_repl_started', { 491 has_initial_messages: !!(initialMessages && initialMessages.length > 0), 492 inProtectedNamespace: isInProtectedNamespace(), 493 }) 494 495 // UUIDs of initial messages. Used for dedup in writeMessages to avoid 496 // re-sending messages that were already flushed on WebSocket open. 497 const initialMessageUUIDs = new Set<string>() 498 if (initialMessages) { 499 for (const msg of initialMessages) { 500 initialMessageUUIDs.add(msg.uuid) 501 } 502 } 503 504 // Bounded ring buffer of UUIDs for messages we've already sent to the 505 // server via the ingress WebSocket. Serves two purposes: 506 // 1. Echo filtering — ignore our own messages bouncing back on the WS. 507 // 2. Secondary dedup in writeMessages — catch race conditions where 508 // the hook's index-based tracking isn't sufficient. 509 // 510 // Seeded with initialMessageUUIDs so that when the server echoes back 511 // the initial conversation context over the ingress WebSocket, those 512 // messages are recognized as echoes and not re-injected into the REPL. 513 // 514 // Capacity of 2000 covers well over any realistic echo window (echoes 515 // arrive within milliseconds) and any messages that might be re-encountered 516 // after compaction. The hook's lastWrittenIndexRef is the primary dedup; 517 // this is a safety net. 518 const recentPostedUUIDs = new BoundedUUIDSet(2000) 519 for (const uuid of initialMessageUUIDs) { 520 recentPostedUUIDs.add(uuid) 521 } 522 523 // Bounded set of INBOUND prompt UUIDs we've already forwarded to the REPL. 524 // Defensive dedup for when the server re-delivers prompts (seq-num 525 // negotiation failure, server edge cases, transport swap races). The 526 // seq-num carryover below is the primary fix; this is the safety net. 
527 const recentInboundUUIDs = new BoundedUUIDSet(2000) 528 529 // 7. Start poll loop for work items — this is what makes the session 530 // "live" on claude.ai. When a user types there, the backend dispatches 531 // a work item to our environment. We poll for it, get the ingress token, 532 // and connect the ingress WebSocket. 533 // 534 // The poll loop keeps running: when work arrives it connects the ingress 535 // WebSocket, and if the WebSocket drops unexpectedly (code != 1000) it 536 // resumes polling to get a fresh ingress token and reconnect. 537 const pollController = new AbortController() 538 // Adapter over either HybridTransport (v1: WS reads + POST writes to 539 // Session-Ingress) or SSETransport+CCRClient (v2: SSE reads + POST 540 // writes to CCR /worker/*). The v1/v2 choice is made in onWorkReceived: 541 // server-driven via secret.use_code_sessions, with CLAUDE_BRIDGE_USE_CCR_V2 542 // as an ant-dev override. 543 let transport: ReplBridgeTransport | null = null 544 // Bumped on every onWorkReceived. Captured in createV2ReplTransport's .then() 545 // closure to detect stale resolutions: if two calls race while transport is 546 // null, both registerWorker() (bumping server epoch), and whichever resolves 547 // SECOND is the correct one — but the transport !== null check gets this 548 // backwards (first-to-resolve installs, second discards). The generation 549 // counter catches it independent of transport state. 550 let v2Generation = 0 551 // SSE sequence-number high-water mark carried across transport swaps. 552 // Without this, each new SSETransport starts at 0, sends no 553 // from_sequence_num / Last-Event-ID on its first connect, and the server 554 // replays the entire session event history — every prompt ever sent 555 // re-delivered as fresh inbound messages on every onWorkReceived. 556 // 557 // Seed only when we actually reconnected the prior session. 
If 558 // `reusedPriorSession` is false we fell through to `createSession()` — 559 // the caller's persisted seq-num belongs to a dead session and applying 560 // it to the fresh stream (starting at 1) silently drops events. Same 561 // hazard as doReconnect Strategy 2; same fix as the reset there. 562 let lastTransportSequenceNum = reusedPriorSession ? initialSSESequenceNum : 0 563 // Track the current work ID so teardown can call stopWork 564 let currentWorkId: string | null = null 565 // Session ingress JWT for the current work item — used for heartbeat auth. 566 let currentIngressToken: string | null = null 567 // Signal to wake the at-capacity sleep early when the transport is lost, 568 // so the poll loop immediately switches back to fast polling for new work. 569 const capacityWake = createCapacityWake(pollController.signal) 570 const wakePollLoop = capacityWake.wake 571 const capacitySignal = capacityWake.signal 572 // Gates message writes during the initial flush to prevent ordering 573 // races where new messages arrive at the server interleaved with history. 574 const flushGate = new FlushGate<Message>() 575 576 // Latch for onUserMessage — flips true when the callback returns true 577 // (policy says "done deriving"). If no callback, skip scanning entirely 578 // (daemon path — no title derivation needed). 579 let userMessageCallbackDone = !onUserMessage 580 581 // Shared counter for environment re-creations, used by both 582 // onEnvironmentLost and the abnormal-close handler. 583 const MAX_ENVIRONMENT_RECREATIONS = 3 584 let environmentRecreations = 0 585 let reconnectPromise: Promise<boolean> | null = null 586 587 /** 588 * Recover from onEnvironmentLost (poll returned 404 — env was reaped 589 * server-side). Tries two strategies in order: 590 * 591 * 1. Reconnect-in-place: idempotent re-register with reuseEnvironmentId 592 * → if the backend returns the same env ID, call reconnectSession() 593 * to re-queue the existing session. 
currentSessionId stays the same; 594 * the URL on the user's phone stays valid; previouslyFlushedUUIDs is 595 * preserved so history isn't re-sent. 596 * 597 * 2. Fresh session fallback: if the backend returns a different env ID 598 * (original TTL-expired, e.g. laptop slept >4h) or reconnectSession() 599 * throws, archive the old session and create a new one on the 600 * now-registered env. Old behavior before #20460 primitives landed. 601 * 602 * Uses a promise-based reentrancy guard so concurrent callers share the 603 * same reconnection attempt. 604 */ 605 async function reconnectEnvironmentWithSession(): Promise<boolean> { 606 if (reconnectPromise) { 607 return reconnectPromise 608 } 609 reconnectPromise = doReconnect() 610 try { 611 return await reconnectPromise 612 } finally { 613 reconnectPromise = null 614 } 615 } 616 617 async function doReconnect(): Promise<boolean> { 618 environmentRecreations++ 619 // Invalidate any in-flight v2 handshake — the environment is being 620 // recreated, so a stale transport arriving post-reconnect would be 621 // pointed at a dead session. 622 v2Generation++ 623 logForDebugging( 624 `[bridge:repl] Reconnecting after env lost (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`, 625 ) 626 627 if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) { 628 logForDebugging( 629 `[bridge:repl] Environment reconnect limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`, 630 ) 631 return false 632 } 633 634 // Close the stale transport. Capture seq BEFORE close — if Strategy 1 635 // (tryReconnectInPlace) succeeds we keep the SAME session, and the 636 // next transport must resume where this one left off, not replay from 637 // the last transport-swap checkpoint. 
638 if (transport) { 639 const seq = transport.getLastSequenceNum() 640 if (seq > lastTransportSequenceNum) { 641 lastTransportSequenceNum = seq 642 } 643 transport.close() 644 transport = null 645 } 646 // Transport is gone — wake the poll loop out of its at-capacity 647 // heartbeat sleep so it can fast-poll for re-dispatched work. 648 wakePollLoop() 649 // Reset flush gate so writeMessages() hits the !transport guard 650 // instead of silently queuing into a dead buffer. 651 flushGate.drop() 652 653 // Release the current work item (force=false — we may want the session 654 // back). Best-effort: the env is probably gone, so this likely 404s. 655 if (currentWorkId) { 656 const workIdBeingCleared = currentWorkId 657 await api 658 .stopWork(environmentId, workIdBeingCleared, false) 659 .catch(() => {}) 660 // When doReconnect runs concurrently with the poll loop (ws_closed 661 // handler case — void-called, unlike the awaited onEnvironmentLost 662 // path), onWorkReceived can fire during the stopWork await and set 663 // a fresh currentWorkId. If it did, the poll loop has already 664 // recovered on its own — defer to it rather than proceeding to 665 // archiveSession, which would destroy the session its new 666 // transport is connected to. 667 if (currentWorkId !== workIdBeingCleared) { 668 logForDebugging( 669 '[bridge:repl] Poll loop recovered during stopWork await — deferring to it', 670 ) 671 environmentRecreations = 0 672 return true 673 } 674 currentWorkId = null 675 currentIngressToken = null 676 } 677 678 // Bail out if teardown started while we were awaiting 679 if (pollController.signal.aborted) { 680 logForDebugging('[bridge:repl] Reconnect aborted by teardown') 681 return false 682 } 683 684 // Strategy 1: idempotent re-register with the server-issued env ID. 685 // If the backend resurrects the same env (fresh secret), we can 686 // reconnect the existing session. 
If it hands back a different ID, the 687 // original env is truly gone and we fall through to a fresh session. 688 const requestedEnvId = environmentId 689 bridgeConfig.reuseEnvironmentId = requestedEnvId 690 try { 691 const reg = await api.registerBridgeEnvironment(bridgeConfig) 692 environmentId = reg.environment_id 693 environmentSecret = reg.environment_secret 694 } catch (err) { 695 bridgeConfig.reuseEnvironmentId = undefined 696 logForDebugging( 697 `[bridge:repl] Environment re-registration failed: ${errorMessage(err)}`, 698 ) 699 return false 700 } 701 // Clear before any await — a stale value would poison the next fresh 702 // registration if doReconnect runs again. 703 bridgeConfig.reuseEnvironmentId = undefined 704 705 logForDebugging( 706 `[bridge:repl] Re-registered: requested=${requestedEnvId} got=${environmentId}`, 707 ) 708 709 // Bail out if teardown started while we were registering 710 if (pollController.signal.aborted) { 711 logForDebugging( 712 '[bridge:repl] Reconnect aborted after env registration, cleaning up', 713 ) 714 await api.deregisterEnvironment(environmentId).catch(() => {}) 715 return false 716 } 717 718 // Same race as above, narrower window: poll loop may have set up a 719 // transport during the registerBridgeEnvironment await. Bail before 720 // tryReconnectInPlace/archiveSession kill it server-side. 721 if (transport !== null) { 722 logForDebugging( 723 '[bridge:repl] Poll loop recovered during registerBridgeEnvironment await — deferring to it', 724 ) 725 environmentRecreations = 0 726 return true 727 } 728 729 // Strategy 1: same helper as perpetual init. currentSessionId stays 730 // the same on success; URL on mobile/web stays valid; 731 // previouslyFlushedUUIDs preserved (no re-flush). 
732 if (await tryReconnectInPlace(requestedEnvId, currentSessionId)) { 733 logEvent('tengu_bridge_repl_reconnected_in_place', {}) 734 environmentRecreations = 0 735 return true 736 } 737 // Env differs → TTL-expired/reaped; or reconnect failed. 738 // Don't deregister — we have a fresh secret for this env either way. 739 if (environmentId !== requestedEnvId) { 740 logEvent('tengu_bridge_repl_env_expired_fresh_session', {}) 741 } 742 743 // Strategy 2: fresh session on the now-registered environment. 744 // Archive the old session first — it's orphaned (bound to a dead env, 745 // or reconnectSession rejected it). Don't deregister the env — we just 746 // got a fresh secret for it and are about to use it. 747 await archiveSession(currentSessionId) 748 749 // Bail out if teardown started while we were archiving 750 if (pollController.signal.aborted) { 751 logForDebugging( 752 '[bridge:repl] Reconnect aborted after archive, cleaning up', 753 ) 754 await api.deregisterEnvironment(environmentId).catch(() => {}) 755 return false 756 } 757 758 // Re-read the current title in case the user renamed the session. 759 // REPL wrapper reads session storage; daemon wrapper returns the 760 // original title (nothing to refresh). 
761 const currentTitle = getCurrentTitle() 762 763 // Create a new session on the now-registered environment 764 const newSessionId = await createSession({ 765 environmentId, 766 title: currentTitle, 767 gitRepoUrl, 768 branch, 769 signal: AbortSignal.timeout(15_000), 770 }) 771 772 if (!newSessionId) { 773 logForDebugging( 774 '[bridge:repl] Session creation failed during reconnection', 775 ) 776 return false 777 } 778 779 // Bail out if teardown started during session creation (up to 15s) 780 if (pollController.signal.aborted) { 781 logForDebugging( 782 '[bridge:repl] Reconnect aborted after session creation, cleaning up', 783 ) 784 await archiveSession(newSessionId) 785 return false 786 } 787 788 currentSessionId = newSessionId 789 // Re-publish to the PID file so peer dedup (peerRegistry.ts) picks up the 790 // new ID — setReplBridgeHandle only fires at init/teardown, not reconnect. 791 void updateSessionBridgeId(toCompatSessionId(newSessionId)).catch(() => {}) 792 // Reset per-session transport state IMMEDIATELY after the session swap, 793 // before any await. If this runs after `await writeBridgePointer` below, 794 // there's a window where handle.bridgeSessionId already returns session B 795 // but getSSESequenceNum() still returns session A's seq — a daemon 796 // persistState() in that window writes {bridgeSessionId: B, seq: OLD_A}, 797 // which PASSES the session-ID validation check and defeats it entirely. 798 // 799 // The SSE seq-num is scoped to the session's event stream — carrying it 800 // over leaves the transport's lastSequenceNum stuck high (seq only 801 // advances when received > last), and its next internal reconnect would 802 // send from_sequence_num=OLD_SEQ against a stream starting at 1 → all 803 // events in the gap silently dropped. Inbound UUID dedup is also 804 // session-scoped. 
    // --- fresh-session path of the reconnect helper (continued) ---
    // Per-session transport state: both the SSE sequence number and the
    // inbound UUID dedup set are scoped to the session's event stream
    // (see the ordering note above), so they reset with the session swap.
    lastTransportSequenceNum = 0
    recentInboundUUIDs.clear()
    // Title derivation is session-scoped too: if the user typed during the
    // createSession await above, the callback fired against the OLD archived
    // session ID (PATCH lost) and the new session got `currentTitle` captured
    // BEFORE they typed. Reset so the next prompt can re-derive. Self-
    // correcting: if the caller's policy is already done (explicit title or
    // count ≥ 3), it returns true on the first post-reset call and re-latches.
    userMessageCallbackDone = !onUserMessage
    logForDebugging(`[bridge:repl] Re-created session: ${currentSessionId}`)

    // Rewrite the crash-recovery pointer with the new IDs so a crash after
    // this point resumes the right session. (The reconnect-in-place path
    // above doesn't touch the pointer — same session, same env.)
    await writeBridgePointer(dir, {
      sessionId: currentSessionId,
      environmentId,
      source: 'repl',
    })

    // Clear flushed UUIDs so initial messages are re-sent to the new session.
    // UUIDs are scoped per-session on the server, so re-flushing is safe.
    previouslyFlushedUUIDs?.clear()

    // Reset the counter so independent reconnections hours apart don't
    // exhaust the limit — it guards against rapid consecutive failures,
    // not lifetime total.
    environmentRecreations = 0

    return true
  }

  // Helper: get the current OAuth access token for session ingress auth.
  // Unlike the JWT path, OAuth tokens are refreshed by the standard OAuth
  // flow — no proactive scheduler needed. Returns undefined when no token
  // is available; callers decide whether that is fatal for the work item.
  function getOAuthToken(): string | undefined {
    return getAccessToken()
  }

  // Drain any messages that were queued during the initial flush.
  // Called after writeBatch completes (or fails) so queued messages
  // are sent in order after the historical messages.
  function drainFlushGate(): void {
    const msgs = flushGate.end()
    if (msgs.length === 0) return
    if (!transport) {
      // Permanent close already ran — nothing will ever drain these; log
      // and drop rather than queue into a buffer with no consumer.
      logForDebugging(
        `[bridge:repl] Cannot drain ${msgs.length} pending message(s): no transport`,
      )
      return
    }
    // Record outbound UUIDs before sending so the inbound path can dedup
    // server echoes of our own messages.
    for (const msg of msgs) {
      recentPostedUUIDs.add(msg.uuid)
    }
    const sdkMessages = toSDKMessages(msgs)
    const events = sdkMessages.map(sdkMsg => ({
      ...sdkMsg,
      session_id: currentSessionId,
    }))
    logForDebugging(
      `[bridge:repl] Drained ${msgs.length} pending message(s) after flush`,
    )
    // Fire-and-forget: the transport's uploader owns retry/backoff.
    void transport.writeBatch(events)
  }

  // Teardown reference — set after definition below. All callers are async
  // callbacks that run after assignment, so the reference is always valid.
  let doTeardownImpl: (() => Promise<void>) | null = null
  function triggerTeardown(): void {
    void doTeardownImpl?.()
  }

  /**
   * Body of the transport's setOnClose callback, hoisted to initBridgeCore
   * scope so /bridge-kick can fire it directly. setOnClose wraps this with
   * a stale-transport guard; debugFireClose calls it bare.
   *
   * With autoReconnect:true, this only fires on: clean close (1000),
   * permanent server rejection (4001/1002/4003), or 10-min budget
   * exhaustion. Transient drops are retried internally by the transport.
   */
  function handleTransportPermanentClose(closeCode: number | undefined): void {
    logForDebugging(
      `[bridge:repl] Transport permanently closed: code=${closeCode}`,
    )
    logEvent('tengu_bridge_repl_ws_closed', {
      code: closeCode,
    })
    // Capture SSE seq high-water mark before nulling. When called from
    // setOnClose the guard guarantees transport !== null; when fired from
    // /bridge-kick it may already be null (e.g. fired twice) — skip.
    if (transport) {
      const closedSeq = transport.getLastSequenceNum()
      if (closedSeq > lastTransportSequenceNum) {
        lastTransportSequenceNum = closedSeq
      }
      transport = null
    }
    // Transport is gone — wake the poll loop out of its at-capacity
    // heartbeat sleep so it's fast-polling by the time the reconnect
    // below completes and the server re-queues work.
    wakePollLoop()
    // Reset flush state so writeMessages() hits the !transport guard
    // (with a warning log) instead of silently queuing into a buffer
    // that will never be drained. Unlike onWorkReceived (which
    // preserves pending messages for the new transport), onClose is
    // a permanent close — no new transport will drain these.
    const dropped = flushGate.drop()
    if (dropped > 0) {
      logForDebugging(
        `[bridge:repl] Dropping ${dropped} pending message(s) on transport close (code=${closeCode})`,
        { level: 'warn' },
      )
    }

    if (closeCode === 1000) {
      // Clean close — session ended normally. Tear down the bridge.
      onStateChange?.('failed', 'session ended')
      pollController.abort()
      triggerTeardown()
      return
    }

    // Transport reconnect budget exhausted or permanent server
    // rejection. By this point the env has usually been reaped
    // server-side (BQ 2026-03-12: ~98% of ws_closed never recover
    // via poll alone). stopWork(force=false) can't re-dispatch work
    // from an archived env; reconnectEnvironmentWithSession can
    // re-activate it via POST /bridge/reconnect, or fall through
    // to a fresh session if the env is truly gone. The poll loop
    // (already woken above) picks up the re-queued work once
    // doReconnect completes.
    onStateChange?.(
      'reconnecting',
      `Remote Control connection lost (code ${closeCode})`,
    )
    logForDebugging(
      `[bridge:repl] Transport reconnect budget exhausted (code=${closeCode}), attempting env reconnect`,
    )
    void reconnectEnvironmentWithSession().then(success => {
      if (success) return
      // doReconnect has four abort-check return-false sites for
      // teardown-in-progress. Don't pollute the BQ failure signal
      // or double-teardown when the user just quit.
      if (pollController.signal.aborted) return
      // doReconnect returns false (never throws) on genuine failure.
      // The dangerous case: registerBridgeEnvironment succeeded (so
      // environmentId now points at a fresh valid env) but
      // createSession failed — poll loop would poll a sessionless
      // env getting null work with no errors, never hitting any
      // give-up path. Tear down explicitly.
      logForDebugging(
        '[bridge:repl] reconnectEnvironmentWithSession resolved false — tearing down',
      )
      logEvent('tengu_bridge_repl_reconnect_failed', {
        close_code: closeCode,
      })
      onStateChange?.('failed', 'reconnection failed')
      triggerTeardown()
    })
  }

  // Ant-only: SIGUSR2 → force doReconnect() for manual testing. Skips the
  // ~30s poll wait — fire-and-observe in the debug log immediately.
  // Windows has no USR signals; `process.on` would throw there.
  let sigusr2Handler: (() => void) | undefined
  if (process.env.USER_TYPE === 'ant' && process.platform !== 'win32') {
    sigusr2Handler = () => {
      logForDebugging(
        '[bridge:repl] SIGUSR2 received — forcing doReconnect() for testing',
      )
      void reconnectEnvironmentWithSession()
    }
    process.on('SIGUSR2', sigusr2Handler)
  }

  // Ant-only: /bridge-kick fault injection. handleTransportPermanentClose
  // is defined below and assigned into this slot so the slash command can
  // invoke it directly — the real setOnClose callback is buried inside
  // wireTransport which is itself inside onWorkReceived.
  let debugFireClose: ((code: number) => void) | null = null
  if (process.env.USER_TYPE === 'ant') {
    registerBridgeDebugHandle({
      fireClose: code => {
        if (!debugFireClose) {
          logForDebugging('[bridge:debug] fireClose: no transport wired yet')
          return
        }
        logForDebugging(`[bridge:debug] fireClose(${code}) — injecting`)
        debugFireClose(code)
      },
      forceReconnect: () => {
        logForDebugging('[bridge:debug] forceReconnect — injecting')
        void reconnectEnvironmentWithSession()
      },
      injectFault: injectBridgeFault,
      wakePollLoop,
      describe: () =>
        `env=${environmentId} session=${currentSessionId} transport=${transport?.getStateLabel() ?? 'null'} workId=${currentWorkId ?? 'null'}`,
    })
  }

  // Options for the work poll loop. Every callback closes over the
  // bridge's mutable session/transport state and reads it live.
  const pollOpts = {
    api,
    getCredentials: () => ({ environmentId, environmentSecret }),
    signal: pollController.signal,
    getPollIntervalConfig,
    onStateChange,
    getWsState: () => transport?.getStateLabel() ?? 'null',
    // REPL bridge is single-session: having any transport == at capacity.
    // No need to check isConnectedStatus() — even while the transport is
    // auto-reconnecting internally (up to 10 min), poll is heartbeat-only.
    isAtCapacity: () => transport !== null,
    capacitySignal,
    onFatalError: triggerTeardown,
    // Heartbeat credentials for the current work item; null (no heartbeat)
    // until work has actually been received.
    getHeartbeatInfo: () => {
      if (!currentWorkId || !currentIngressToken) {
        return null
      }
      return {
        environmentId,
        workId: currentWorkId,
        sessionToken: currentIngressToken,
      }
    },
    // Work-item JWT expired (or work gone). The transport is useless —
    // SSE reconnects and CCR writes use the same stale token. Without
    // this callback the poll loop would do a 10-min at-capacity backoff,
    // during which the work lease (300s TTL) expires and the server stops
    // forwarding prompts → ~25-min dead window observed in daemon logs.
    // Kill the transport + work state so isAtCapacity()=false; the loop
    // fast-polls and picks up the server's re-dispatched work in seconds.
    onHeartbeatFatal: (err: BridgeFatalError) => {
      logForDebugging(
        `[bridge:repl] heartbeatWork fatal (status=${err.status}) — tearing down work item for fast re-dispatch`,
      )
      if (transport) {
        // Preserve the SSE high-water mark so the replacement transport
        // resumes the stream instead of replaying from seq 0.
        const seq = transport.getLastSequenceNum()
        if (seq > lastTransportSequenceNum) {
          lastTransportSequenceNum = seq
        }
        transport.close()
        transport = null
      }
      flushGate.drop()
      // force=false → server re-queues. Likely already expired, but
      // idempotent and makes re-dispatch immediate if not.
      if (currentWorkId) {
        void api
          .stopWork(environmentId, currentWorkId, false)
          .catch((e: unknown) => {
            logForDebugging(
              `[bridge:repl] stopWork after heartbeat fatal: ${errorMessage(e)}`,
            )
          })
      }
      currentWorkId = null
      currentIngressToken = null
      wakePollLoop()
      onStateChange?.(
        'reconnecting',
        'Work item lease expired, fetching fresh token',
      )
    },
    // Poll lost the environment server-side. Attempt the full env+session
    // reconnect; hand the loop fresh credentials on success, null on failure.
    async onEnvironmentLost() {
      const success = await reconnectEnvironmentWithSession()
      if (!success) {
        return null
      }
      return { environmentId, environmentSecret }
    },
    onWorkReceived: (
      workSessionId: string,
      ingressToken: string,
      workId: string,
      serverUseCcrV2: boolean,
    ) => {
      // When new work arrives while a transport is already open, the
      // server has decided to re-dispatch (e.g. token rotation, server
      // restart). Close the existing transport and reconnect — discarding
      // the work causes a stuck 'reconnecting' state if the old WS dies
      // shortly after (the server won't re-dispatch a work item it
      // already delivered).
      // ingressToken (JWT) is stored for heartbeat auth (both v1 and v2).
      // Transport auth diverges — see the v1/v2 split below.
      if (transport?.isConnectedStatus()) {
        logForDebugging(
          `[bridge:repl] Work received while transport connected, replacing with fresh token (workId=${workId})`,
        )
      }

      logForDebugging(
        `[bridge:repl] Work received: workId=${workId} workSessionId=${workSessionId} currentSessionId=${currentSessionId} match=${sameSessionId(workSessionId, currentSessionId)}`,
      )

      // Refresh the crash-recovery pointer's mtime. Staleness checks file
      // mtime (not embedded timestamp) so this re-write bumps the clock —
      // a 5h+ session that crashes still has a fresh pointer. Fires once
      // per work dispatch (infrequent — bounded by user message rate).
      void writeBridgePointer(dir, {
        sessionId: currentSessionId,
        environmentId,
        source: 'repl',
      })

      // Reject foreign session IDs — the server shouldn't assign sessions
      // from other environments. Since we create env+session as a pair,
      // a mismatch indicates an unexpected server-side reassignment.
      //
      // Compare by underlying UUID, not by tagged-ID prefix. When CCR
      // v2's compat layer serves the session, createBridgeSession gets
      // session_* from the v1-facing API (compat/convert.go:41) but the
      // infrastructure layer delivers cse_* in the work queue
      // (container_manager.go:129). Same UUID, different tag.
      if (!sameSessionId(workSessionId, currentSessionId)) {
        logForDebugging(
          `[bridge:repl] Rejecting foreign session: expected=${currentSessionId} got=${workSessionId}`,
        )
        return
      }

      currentWorkId = workId
      currentIngressToken = ingressToken

      // Server decides per-session (secret.use_code_sessions from the work
      // secret, threaded through runWorkPollLoop). The env var is an ant-dev
      // override for forcing v2 before the server flag is on for your user —
      // requires ccr_v2_compat_enabled server-side or registerWorker 404s.
      //
      // Kept separate from CLAUDE_CODE_USE_CCR_V2 (the child-SDK transport
      // selector set by sessionRunner/environment-manager) to avoid the
      // inheritance hazard in spawn mode where the parent's orchestrator
      // var would leak into a v1 child.
      const useCcrV2 =
        serverUseCcrV2 || isEnvTruthy(process.env.CLAUDE_BRIDGE_USE_CCR_V2)

      // Auth is the one place v1 and v2 diverge hard:
      //
      // - v1 (Session-Ingress): accepts OAuth OR JWT. We prefer OAuth
      //   because the standard OAuth refresh flow handles expiry — no
      //   separate JWT refresh scheduler needed.
      //
      // - v2 (CCR /worker/*): REQUIRES the JWT. register_worker.go:32
      //   validates the session_id claim, which OAuth tokens don't carry.
      //   The JWT from the work secret has both that claim and the worker
      //   role (environment_auth.py:856). JWT refresh: when it expires the
      //   server re-dispatches work with a fresh one, and onWorkReceived
      //   fires again. createV2ReplTransport stores it via
      //   updateSessionIngressAuthToken() before touching the network.
      // v1 needs an OAuth token before any network work; bail (skip this
      // work item) when none is available — the poll loop will retry.
      let v1OauthToken: string | undefined
      if (!useCcrV2) {
        v1OauthToken = getOAuthToken()
        if (!v1OauthToken) {
          logForDebugging(
            '[bridge:repl] No OAuth token available for session ingress, skipping work',
          )
          return
        }
        updateSessionIngressAuthToken(v1OauthToken)
      }
      logEvent('tengu_bridge_repl_work_received', {})

      // Close the previous transport. Nullify BEFORE calling close() so
      // the close callback doesn't treat the programmatic close as
      // "session ended normally" and trigger a full teardown.
      if (transport) {
        const oldTransport = transport
        transport = null
        // Capture the SSE sequence high-water mark so the next transport
        // resumes the stream instead of replaying from seq 0. Use max() —
        // a transport that died early (never received any frames) would
        // otherwise reset a non-zero mark back to 0.
        const oldSeq = oldTransport.getLastSequenceNum()
        if (oldSeq > lastTransportSequenceNum) {
          lastTransportSequenceNum = oldSeq
        }
        oldTransport.close()
      }
      // Reset flush state — the old flush (if any) is no longer relevant.
      // Preserve pending messages so they're drained after the new
      // transport's flush completes (the hook has already advanced its
      // lastWrittenIndex and won't re-send them).
      flushGate.deactivate()

      // Closure adapter over the shared handleServerControlRequest —
      // captures transport/currentSessionId so the transport.setOnData
      // callback below doesn't need to thread them through.
      const onServerControlRequest = (request: SDKControlRequest): void =>
        handleServerControlRequest(request, {
          transport,
          sessionId: currentSessionId,
          onInterrupt,
          onSetModel,
          onSetMaxThinkingTokens,
          onSetPermissionMode,
        })

      let initialFlushDone = false

      // Wire callbacks onto a freshly constructed transport and connect.
      // Extracted so the (sync) v1 and (async) v2 construction paths can
      // share the identical callback + flush machinery.
      const wireTransport = (newTransport: ReplBridgeTransport): void => {
        transport = newTransport

        newTransport.setOnConnect(() => {
          // Guard: if transport was replaced by a newer onWorkReceived call
          // while the WS was connecting, ignore this stale callback.
          if (transport !== newTransport) return

          logForDebugging('[bridge:repl] Ingress transport connected')
          logEvent('tengu_bridge_repl_ws_connected', {})

          // Update the env var with the latest OAuth token so POST writes
          // (which read via getSessionIngressAuthToken()) use a fresh token.
          // v2 skips this — createV2ReplTransport already stored the JWT,
          // and overwriting it with OAuth would break subsequent /worker/*
          // requests (session_id claim check).
          if (!useCcrV2) {
            const freshToken = getOAuthToken()
            if (freshToken) {
              updateSessionIngressAuthToken(freshToken)
            }
          }

          // Reset teardownStarted so future teardowns are not blocked.
          teardownStarted = false

          // Flush initial messages only on first connect, not on every
          // WS reconnection. Re-flushing would cause duplicate messages.
          // IMPORTANT: onStateChange('connected') is deferred until the
          // flush completes. This prevents writeMessages() from sending
          // new messages that could arrive at the server interleaved with
          // the historical messages, and delays the web UI from showing
          // the session as active until history is persisted.
          if (
            !initialFlushDone &&
            initialMessages &&
            initialMessages.length > 0
          ) {
            initialFlushDone = true

            // Cap the initial flush to the most recent N messages. The full
            // history is UI-only (model doesn't see it) and large replays
            // cause slow session-ingress persistence (each event is a
            // threadstore write) plus elevated Firestore pressure. A 0 or
            // negative cap disables it.
            const historyCap = initialHistoryCap
            const eligibleMessages = initialMessages.filter(
              m =>
                isEligibleBridgeMessage(m) &&
                !previouslyFlushedUUIDs?.has(m.uuid),
            )
            const cappedMessages =
              historyCap > 0 && eligibleMessages.length > historyCap
                ? eligibleMessages.slice(-historyCap)
                : eligibleMessages
            if (cappedMessages.length < eligibleMessages.length) {
              logForDebugging(
                `[bridge:repl] Capped initial flush: ${eligibleMessages.length} -> ${cappedMessages.length} (cap=${historyCap})`,
              )
              logEvent('tengu_bridge_repl_history_capped', {
                eligible_count: eligibleMessages.length,
                capped_count: cappedMessages.length,
              })
            }
            const sdkMessages = toSDKMessages(cappedMessages)
            if (sdkMessages.length > 0) {
              logForDebugging(
                `[bridge:repl] Flushing ${sdkMessages.length} initial message(s) via transport`,
              )
              const events = sdkMessages.map(sdkMsg => ({
                ...sdkMsg,
                session_id: currentSessionId,
              }))
              const dropsBefore = newTransport.droppedBatchCount
              void newTransport
                .writeBatch(events)
                .then(() => {
                  // If any batch was dropped during this flush (SI down for
                  // maxConsecutiveFailures attempts), flush() still resolved
                  // normally but the events were NOT delivered. Don't mark
                  // UUIDs as flushed — keep them eligible for re-send on the
                  // next onWorkReceived (JWT refresh re-dispatch, line ~1144).
                  if (newTransport.droppedBatchCount > dropsBefore) {
                    logForDebugging(
                      `[bridge:repl] Initial flush dropped ${newTransport.droppedBatchCount - dropsBefore} batch(es) — not marking ${sdkMessages.length} UUID(s) as flushed`,
                    )
                    return
                  }
                  if (previouslyFlushedUUIDs) {
                    for (const sdkMsg of sdkMessages) {
                      if (sdkMsg.uuid) {
                        previouslyFlushedUUIDs.add(sdkMsg.uuid)
                      }
                    }
                  }
                })
                .catch(e =>
                  logForDebugging(`[bridge:repl] Initial flush failed: ${e}`),
                )
                .finally(() => {
                  // Guard: if transport was replaced during the flush,
                  // don't signal connected or drain — the new transport
                  // owns the lifecycle now.
                  if (transport !== newTransport) return
                  drainFlushGate()
                  onStateChange?.('connected')
                })
            } else {
              // All initial messages were already flushed (filtered by
              // previouslyFlushedUUIDs). No flush POST needed — clear
              // the flag and signal connected immediately. This is the
              // first connect for this transport (inside !initialFlushDone),
              // so no flush POST is in-flight — the flag was set before
              // connect() and must be cleared here.
              drainFlushGate()
              onStateChange?.('connected')
            }
          } else if (!flushGate.active) {
            // No initial messages or already flushed on first connect.
            // WS auto-reconnect path — only signal connected if no flush
            // POST is in-flight. If one is, .finally() owns the lifecycle.
            onStateChange?.('connected')
          }
        })

        newTransport.setOnData(data => {
          handleIngressMessage(
            data,
            recentPostedUUIDs,
            recentInboundUUIDs,
            onInboundMessage,
            onPermissionResponse,
            onServerControlRequest,
          )
        })

        // Body lives at initBridgeCore scope so /bridge-kick can call it
        // directly via debugFireClose. All referenced closures (transport,
        // wakePollLoop, flushGate, reconnectEnvironmentWithSession, etc.)
        // are already at that scope. The only lexical dependency on
        // wireTransport was `newTransport.getLastSequenceNum()` — but after
        // the guard below passes we know transport === newTransport.
        debugFireClose = handleTransportPermanentClose
        newTransport.setOnClose(closeCode => {
          // Guard: if transport was replaced, ignore stale close.
          if (transport !== newTransport) return
          handleTransportPermanentClose(closeCode)
        })

        // Start the flush gate before connect() to cover the WS handshake
        // window. Between transport assignment and setOnConnect firing,
        // writeMessages() could send messages via HTTP POST before the
        // initial flush starts. Starting the gate here ensures those
        // calls are queued. If there are no initial messages, the gate
        // stays inactive.
        if (
          !initialFlushDone &&
          initialMessages &&
          initialMessages.length > 0
        ) {
          flushGate.start()
        }

        newTransport.connect()
      } // end wireTransport

      // Bump unconditionally — ANY new transport (v1 or v2) invalidates an
      // in-flight v2 handshake. Also bumped in doReconnect().
      v2Generation++

      if (useCcrV2) {
        // workSessionId is the cse_* form (infrastructure-layer ID from the
        // work queue), which is what /v1/code/sessions/{id}/worker/* wants.
        // The session_* form (currentSessionId) is NOT usable here —
        // handler/convert.go:30 validates TagCodeSession.
        const sessionUrl = buildCCRv2SdkUrl(baseUrl, workSessionId)
        const thisGen = v2Generation
        logForDebugging(
          `[bridge:repl] CCR v2: sessionUrl=${sessionUrl} session=${workSessionId} gen=${thisGen}`,
        )
        void createV2ReplTransport({
          sessionUrl,
          ingressToken,
          sessionId: workSessionId,
          initialSequenceNum: lastTransportSequenceNum,
        }).then(
          t => {
            // Teardown started while registerWorker was in flight. Teardown
            // saw transport === null and skipped close(); installing now
            // would leak CCRClient heartbeat timers and reset
            // teardownStarted via wireTransport's side effects.
            if (pollController.signal.aborted) {
              t.close()
              return
            }
            // onWorkReceived may have fired again while registerWorker()
            // was in flight (server re-dispatch with a fresh JWT). The
            // transport !== null check alone gets the race wrong when BOTH
            // attempts saw transport === null — it keeps the first resolver
            // (stale epoch) and discards the second (correct epoch). The
            // generation check catches it regardless of transport state.
            if (thisGen !== v2Generation) {
              logForDebugging(
                `[bridge:repl] CCR v2: discarding stale handshake gen=${thisGen} current=${v2Generation}`,
              )
              t.close()
              return
            }
            wireTransport(t)
          },
          (err: unknown) => {
            logForDebugging(
              `[bridge:repl] CCR v2: createV2ReplTransport failed: ${errorMessage(err)}`,
              { level: 'error' },
            )
            logEvent('tengu_bridge_repl_ccr_v2_init_failed', {})
            // If a newer attempt is in flight or already succeeded, don't
            // touch its work item — our failure is irrelevant.
            if (thisGen !== v2Generation) return
            // Release the work item so the server re-dispatches immediately
            // instead of waiting for its own timeout. currentWorkId was set
            // above; without this, the session looks stuck to the user.
            if (currentWorkId) {
              void api
                .stopWork(environmentId, currentWorkId, false)
                .catch((e: unknown) => {
                  logForDebugging(
                    `[bridge:repl] stopWork after v2 init failure: ${errorMessage(e)}`,
                  )
                })
              currentWorkId = null
              currentIngressToken = null
            }
            wakePollLoop()
          },
        )
      } else {
        // v1: HybridTransport (WS reads + POST writes to Session-Ingress).
        // autoReconnect is true (default) — when the WS dies, the transport
        // reconnects automatically with exponential backoff. POST writes
        // continue during reconnection (they use getSessionIngressAuthToken()
        // independently of WS state). The poll loop remains as a secondary
        // fallback if the reconnect budget is exhausted (10 min).
        //
        // Auth: uses OAuth tokens directly instead of the JWT from the work
        // secret. refreshHeaders picks up the latest OAuth token on each
        // WS reconnect attempt.
        const wsUrl = buildSdkUrl(sessionIngressUrl, workSessionId)
        logForDebugging(`[bridge:repl] Ingress URL: ${wsUrl}`)
        logForDebugging(
          `[bridge:repl] Creating HybridTransport: session=${workSessionId}`,
        )
        // v1OauthToken was validated non-null above (we'd have returned early).
        const oauthToken = v1OauthToken ?? ''
        wireTransport(
          createV1ReplTransport(
            new HybridTransport(
              new URL(wsUrl),
              {
                Authorization: `Bearer ${oauthToken}`,
                'anthropic-version': '2023-06-01',
              },
              workSessionId,
              () => ({
                Authorization: `Bearer ${getOAuthToken() ?? oauthToken}`,
                'anthropic-version': '2023-06-01',
              }),
              // Cap retries so a persistently-failing session-ingress can't
              // pin the uploader drain loop for the lifetime of the bridge.
              // 50 attempts ≈ 20 min (15s POST timeout + 8s backoff + jitter
              // per cycle at steady state). Bridge-only — 1P keeps indefinite.
              {
                maxConsecutiveFailures: 50,
                isBridge: true,
                onBatchDropped: () => {
                  onStateChange?.(
                    'reconnecting',
                    'Lost sync with Remote Control — events could not be delivered',
                  )
                  // SI has been down ~20 min. Wake the poll loop so that when
                  // SI recovers, next poll → onWorkReceived → fresh transport
                  // → initial flush succeeds → onStateChange('connected') at
                  // ~line 1420. Without this, state stays 'reconnecting' even
                  // after SI recovers — daemon.ts:437 denies all permissions,
                  // useReplBridge.ts:311 keeps replBridgeSessionActive=false.
                  // If the env was archived during the outage, poll 404 →
                  // onEnvironmentLost recovery path handles it.
                  wakePollLoop()
                },
              },
            ),
          ),
        )
      }
    },
  }
  void startWorkPollLoop(pollOpts)

  // Perpetual mode: hourly mtime refresh of the crash-recovery pointer.
  // The onWorkReceived refresh only fires per user prompt — a
  // daemon idle for >4h would have a stale pointer, and the next restart
  // would clear it (readBridgePointer TTL check) → fresh session. The
  // standalone bridge (bridgeMain.ts) has an identical hourly timer.
  const pointerRefreshTimer = perpetual
    ? setInterval(() => {
        // doReconnect() reassigns currentSessionId/environmentId non-
        // atomically (env at ~:634, session at ~:719, awaits in between).
        // If this timer fires in that window, its fire-and-forget write can
        // race with (and overwrite) doReconnect's own pointer write at ~:740,
        // leaving the pointer at the now-archived old session. doReconnect
        // writes the pointer itself, so skipping here is free.
        if (reconnectPromise) return
        void writeBridgePointer(dir, {
          sessionId: currentSessionId,
          environmentId,
          source: 'repl',
        })
      }, 60 * 60_000)
    : null
  pointerRefreshTimer?.unref?.()

  // Push a silent keep_alive frame on a fixed interval so upstream proxies
  // and the session-ingress layer don't GC an otherwise-idle remote control
  // session. The keep_alive type is filtered before reaching any client UI
  // (Query.ts drops it; web/iOS/Android never see it in their message loop).
  // Interval comes from GrowthBook (tengu_bridge_poll_interval_config
  // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
  const keepAliveIntervalMs =
    getPollIntervalConfig().session_keepalive_interval_v2_ms
  const keepAliveTimer =
    keepAliveIntervalMs > 0
      ? setInterval(() => {
          // No transport → nothing to keep alive; skip this tick.
          if (!transport) return
          logForDebugging('[bridge:repl] keep_alive sent')
          void transport.write({ type: 'keep_alive' }).catch((err: unknown) => {
            logForDebugging(
              `[bridge:repl] keep_alive write failed: ${errorMessage(err)}`,
            )
          })
        }, keepAliveIntervalMs)
      : null
  keepAliveTimer?.unref?.()

  // Shared teardown sequence used by both cleanup registration and
  // the explicit teardown() method on the returned handle.
  let teardownStarted = false
  doTeardownImpl = async (): Promise<void> => {
    if (teardownStarted) {
      logForDebugging(
        `[bridge:repl] Teardown already in progress, skipping duplicate call env=${environmentId} session=${currentSessionId}`,
      )
      return
    }
    teardownStarted = true
    const teardownStart = Date.now()
    logForDebugging(
      `[bridge:repl] Teardown starting: env=${environmentId} session=${currentSessionId} workId=${currentWorkId ?? 'none'} transportState=${transport?.getStateLabel() ?? 'null'}`,
    )

    // Stop all periodic work and debug hooks before touching the network.
    if (pointerRefreshTimer !== null) {
      clearInterval(pointerRefreshTimer)
    }
    if (keepAliveTimer !== null) {
      clearInterval(keepAliveTimer)
    }
    if (sigusr2Handler) {
      process.off('SIGUSR2', sigusr2Handler)
    }
    if (process.env.USER_TYPE === 'ant') {
      clearBridgeDebugHandle()
      debugFireClose = null
    }
    pollController.abort()
    logForDebugging('[bridge:repl] Teardown: poll loop aborted')

    // Capture the live transport's seq BEFORE close() — close() is sync
    // (just aborts the SSE fetch) and does NOT invoke onClose, so the
    // setOnClose capture path never runs for explicit teardown.
    // Without this, getSSESequenceNum() after teardown returns the stale
    // lastTransportSequenceNum (captured at the last transport swap), and
    // daemon callers persisting that value lose all events since then.
    if (transport) {
      const finalSeq = transport.getLastSequenceNum()
      if (finalSeq > lastTransportSequenceNum) {
        lastTransportSequenceNum = finalSeq
      }
    }

    if (perpetual) {
      // Perpetual teardown is LOCAL-ONLY — do not send result, do not call
      // stopWork, do not close the transport. All of those signal the
      // server (and any mobile/attach subscribers) that the session is
      // ending. Instead: stop polling, let the socket die with the
      // process; the backend times the work-item lease back to pending on
      // its own (TTL 300s). Next daemon start reads the pointer and
      // reconnectSession re-queues work.
      transport = null
      flushGate.drop()
      // Refresh the pointer mtime so that sessions lasting longer than
      // BRIDGE_POINTER_TTL_MS (4h) don't appear stale on next start.
      await writeBridgePointer(dir, {
        sessionId: currentSessionId,
        environmentId,
        source: 'repl',
      })
      logForDebugging(
        `[bridge:repl] Teardown (perpetual): leaving env=${environmentId} session=${currentSessionId} alive on server, duration=${Date.now() - teardownStart}ms`,
      )
      return
    }

    // Fire the result message, then archive, THEN close. transport.write()
    // only enqueues (SerialBatchEventUploader resolves on buffer-add); the
    // stopWork/archive latency (~200-500ms) is the drain window for the
    // result POST. Closing BEFORE archive meant relying on HybridTransport's
    // void-ed 3s grace period, which nothing awaits — forceExit can kill the
    // socket mid-POST. Same reorder as remoteBridgeCore.ts teardown (#22803).
    const teardownTransport = transport
    transport = null
    flushGate.drop()
    if (teardownTransport) {
      void teardownTransport.write(makeResultMessage(currentSessionId))
    }

    // force=true — explicit user exit; do not let the server re-queue.
    const stopWorkP = currentWorkId
      ? api
          .stopWork(environmentId, currentWorkId, true)
          .then(() => {
            logForDebugging('[bridge:repl] Teardown: stopWork completed')
          })
          .catch((err: unknown) => {
            logForDebugging(
              `[bridge:repl] Teardown stopWork failed: ${errorMessage(err)}`,
            )
          })
      : Promise.resolve()

    // Run stopWork and archiveSession in parallel. gracefulShutdown.ts:407
    // races runCleanupFunctions() against 2s (NOT the 5s outer failsafe),
    // so archive is capped at 1.5s at the injection site to stay under budget.
    // archiveSession is contractually no-throw; the injected implementations
    // log their own success/failure internally.
    await Promise.all([stopWorkP, archiveSession(currentSessionId)])

    teardownTransport?.close()
    logForDebugging('[bridge:repl] Teardown: transport closed')

    await api.deregisterEnvironment(environmentId).catch((err: unknown) => {
      logForDebugging(
        `[bridge:repl] Teardown deregister failed: ${errorMessage(err)}`,
      )
    })

    // Clear the crash-recovery pointer — explicit disconnect or clean REPL
    // exit means the user is done with this session. Crash/kill-9 never
    // reaches this line, leaving the pointer for next-launch recovery.
    await clearBridgePointer(dir)

    logForDebugging(
      `[bridge:repl] Teardown complete: env=${environmentId} duration=${Date.now() - teardownStart}ms`,
    )
  }

  // 8.
  // Register cleanup for graceful shutdown
  const unregister = registerCleanup(() => doTeardownImpl?.())

  logForDebugging(
    `[bridge:repl] Ready: env=${environmentId} session=${currentSessionId}`,
  )
  onStateChange?.('ready')

  // Handle returned to the caller. Every method closes over the mutable
  // bridge state above (transport, currentSessionId, dedup sets, flushGate),
  // so behavior tracks reconnects/teardowns without the caller re-fetching.
  return {
    get bridgeSessionId() {
      return currentSessionId
    },
    get environmentId() {
      return environmentId
    },
    getSSESequenceNum() {
      // lastTransportSequenceNum only updates when a transport is CLOSED
      // (captured at swap/onClose). During normal operation the CURRENT
      // transport's live seq isn't reflected there. Merge both so callers
      // (e.g. daemon persistState()) get the actual high-water mark.
      const live = transport?.getLastSequenceNum() ?? 0
      return Math.max(lastTransportSequenceNum, live)
    },
    sessionIngressUrl,
    writeMessages(messages) {
      // Filter to user/assistant messages that haven't already been sent.
      // Two layers of dedup:
      // - initialMessageUUIDs: messages sent as session creation events
      // - recentPostedUUIDs: messages recently sent via POST
      const filtered = messages.filter(
        m =>
          isEligibleBridgeMessage(m) &&
          !initialMessageUUIDs.has(m.uuid) &&
          !recentPostedUUIDs.has(m.uuid),
      )
      if (filtered.length === 0) return

      // Fire onUserMessage for title derivation. Scan before the flushGate
      // check — prompts are title-worthy even if they queue behind the
      // initial history flush. Keeps calling on every title-worthy message
      // until the callback returns true; the caller owns the policy.
      if (!userMessageCallbackDone) {
        for (const m of filtered) {
          const text = extractTitleText(m)
          if (text !== undefined && onUserMessage?.(text, currentSessionId)) {
            userMessageCallbackDone = true
            break
          }
        }
      }

      // Queue messages while the initial flush is in progress to prevent
      // them from arriving at the server interleaved with history.
      if (flushGate.enqueue(...filtered)) {
        logForDebugging(
          `[bridge:repl] Queued ${filtered.length} message(s) during initial flush`,
        )
        return
      }

      if (!transport) {
        const types = filtered.map(m => m.type).join(',')
        logForDebugging(
          `[bridge:repl] Transport not configured, dropping ${filtered.length} message(s) [${types}] for session=${currentSessionId}`,
          { level: 'warn' },
        )
        return
      }

      // Track in the bounded ring buffer for echo filtering and dedup.
      for (const msg of filtered) {
        recentPostedUUIDs.add(msg.uuid)
      }

      logForDebugging(
        `[bridge:repl] Sending ${filtered.length} message(s) via transport`,
      )

      // Convert to SDK format and send via HTTP POST (HybridTransport).
      // The web UI receives them via the subscribe WebSocket.
      const sdkMessages = toSDKMessages(filtered)
      const events = sdkMessages.map(sdkMsg => ({
        ...sdkMsg,
        session_id: currentSessionId,
      }))
      void transport.writeBatch(events)
    },
    writeSdkMessages(messages) {
      // Daemon path: query() already yields SDKMessage, skip conversion.
      // Still run echo dedup (server bounces writes back on the WS).
      // No initialMessageUUIDs filter — daemon has no initial messages.
      // No flushGate — daemon never starts it (no initial flush).
      // Messages without a uuid always pass (nothing to dedup on).
      const filtered = messages.filter(
        m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
      )
      if (filtered.length === 0) return
      if (!transport) {
        logForDebugging(
          `[bridge:repl] Transport not configured, dropping ${filtered.length} SDK message(s) for session=${currentSessionId}`,
          { level: 'warn' },
        )
        return
      }
      for (const msg of filtered) {
        if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
      }
      const events = filtered.map(m => ({ ...m, session_id: currentSessionId }))
      void transport.writeBatch(events)
    },
    sendControlRequest(request: SDKControlRequest) {
      if (!transport) {
        logForDebugging(
          '[bridge:repl] Transport not configured, skipping control_request',
        )
        return
      }
      const event = { ...request, session_id: currentSessionId }
      void transport.write(event)
      logForDebugging(
        `[bridge:repl] Sent control_request request_id=${request.request_id}`,
      )
    },
    sendControlResponse(response: SDKControlResponse) {
      if (!transport) {
        logForDebugging(
          '[bridge:repl] Transport not configured, skipping control_response',
        )
        return
      }
      const event = { ...response, session_id: currentSessionId }
      void transport.write(event)
      logForDebugging('[bridge:repl] Sent control_response')
    },
    sendControlCancelRequest(requestId: string) {
      if (!transport) {
        logForDebugging(
          '[bridge:repl] Transport not configured, skipping control_cancel_request',
        )
        return
      }
      const event = {
        type: 'control_cancel_request' as const,
        request_id: requestId,
        session_id: currentSessionId,
      }
      void transport.write(event)
      logForDebugging(
        `[bridge:repl] Sent control_cancel_request request_id=${requestId}`,
      )
    },
    sendResult() {
      if (!transport) {
        logForDebugging(
          `[bridge:repl] sendResult: skipping, transport not configured session=${currentSessionId}`,
        )
        return
      }
      void transport.write(makeResultMessage(currentSessionId))
      logForDebugging(
        `[bridge:repl] Sent result for session=${currentSessionId}`,
      )
    },
    async teardown() {
      // Unregister first so the shutdown hook can't run the same teardown
      // concurrently; doTeardownImpl is also internally idempotent.
      unregister()
      await doTeardownImpl?.()
      logForDebugging('[bridge:repl] Torn down')
      logEvent('tengu_bridge_repl_teardown', {})
    },
  }
}

/**
 * Persistent poll loop for work items. Runs in the background for the
 * lifetime of the bridge connection.
 *
 * When a work item arrives, acknowledges it and calls onWorkReceived
 * with the session ID and ingress token (which connects the ingress
 * WebSocket). Then continues polling — the server will dispatch a new
 * work item if the ingress WebSocket drops, allowing automatic
 * reconnection without tearing down the bridge.
 */
async function startWorkPollLoop({
  api,
  getCredentials,
  signal,
  onStateChange,
  onWorkReceived,
  onEnvironmentLost,
  getWsState,
  isAtCapacity,
  capacitySignal,
  onFatalError,
  getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
  getHeartbeatInfo,
  onHeartbeatFatal,
}: {
  api: BridgeApiClient
  getCredentials: () => { environmentId: string; environmentSecret: string }
  signal: AbortSignal
  onStateChange?: (state: BridgeState, detail?: string) => void
  onWorkReceived: (
    sessionId: string,
    ingressToken: string,
    workId: string,
    useCodeSessions: boolean,
  ) => void
  /** Called when the environment has been deleted. Returns new credentials or null. */
  onEnvironmentLost?: () => Promise<{
    environmentId: string
    environmentSecret: string
  } | null>
  /** Returns the current WebSocket readyState label for diagnostic logging. */
  getWsState?: () => string
  /**
   * Returns true when the caller cannot accept new work (transport already
   * connected). When true, the loop polls at the configured at-capacity
   * interval as a heartbeat only. Server-side BRIDGE_LAST_POLL_TTL is
   * 4 hours — anything shorter than that is sufficient for liveness.
   */
  isAtCapacity?: () => boolean
  /**
   * Produces a signal that aborts when capacity frees up (transport lost),
   * merged with the loop signal. Used to interrupt the at-capacity sleep
   * so recovery polling starts immediately.
   */
  capacitySignal?: () => CapacitySignal
  /** Called on unrecoverable errors (e.g. server-side expiry) to trigger full teardown. */
  onFatalError?: () => void
  /** Poll interval config getter — defaults to DEFAULT_POLL_CONFIG. */
  getPollIntervalConfig?: () => PollIntervalConfig
  /**
   * Returns the current work ID and session ingress token for heartbeat.
   * When null, heartbeat is not possible (no active work item).
   */
  getHeartbeatInfo?: () => {
    environmentId: string
    workId: string
    sessionToken: string
  } | null
  /**
   * Called when heartbeatWork throws BridgeFatalError (401/403/404/410 —
   * JWT expired or work item gone). Caller should tear down the transport
   * + work state so isAtCapacity() flips to false and the loop fast-polls
   * for the server's re-dispatched work item. When provided, the loop
   * SKIPS the at-capacity backoff sleep (which would otherwise cause a
   * ~10-minute dead window before recovery). When omitted, falls back to
   * the backoff sleep to avoid a tight poll+heartbeat loop.
   */
  onHeartbeatFatal?: (err: BridgeFatalError) => void
}): Promise<void> {
  const MAX_ENVIRONMENT_RECREATIONS = 3

  logForDebugging(
    `[bridge:repl] Starting work poll loop for env=${getCredentials().environmentId}`,
  )

  // Error-budget tracking for the backoff/give-up logic in the catch block.
  let consecutiveErrors = 0
  let firstErrorTime: number | null = null
  let lastPollErrorTime: number | null = null
  let environmentRecreations = 0
  // Set when the at-capacity sleep overruns its deadline by a large margin
  // (process suspension). Consumed at the top of the next iteration to
  // force one fast-poll cycle — isAtCapacity() is `transport !== null`,
  // which stays true while the transport auto-reconnects, so the poll
  // loop would otherwise go straight back to a 10-minute sleep on a
  // transport that may be pointed at a dead socket.
  let suspensionDetected = false

  while (!signal.aborted) {
    // Capture credentials outside try so the catch block can detect
    // whether a concurrent reconnection replaced the environment.
    const { environmentId: envId, environmentSecret: envSecret } =
      getCredentials()
    const pollConfig = getPollIntervalConfig()
    try {
      const work = await api.pollForWork(
        envId,
        envSecret,
        signal,
        pollConfig.reclaim_older_than_ms,
      )

      // A successful poll proves the env is genuinely healthy — reset the
      // env-loss counter so events hours apart each start fresh. Outside
      // the state-change guard below because onEnvLost's success path
      // already emits 'ready'; emitting again here would be a duplicate.
      // (onEnvLost returning creds does NOT reset this — that would break
      // oscillation protection when the new env immediately dies.)
      environmentRecreations = 0

      // Reset error tracking on successful poll
      if (consecutiveErrors > 0) {
        logForDebugging(
          `[bridge:repl] Poll recovered after ${consecutiveErrors} consecutive error(s)`,
        )
        consecutiveErrors = 0
        firstErrorTime = null
        lastPollErrorTime = null
        onStateChange?.('ready')
      }

      if (!work) {
        // Read-and-clear: after a detected suspension, skip the at-capacity
        // branch exactly once. The pollForWork above already refreshed the
        // server's BRIDGE_LAST_POLL_TTL; this fast cycle gives any
        // re-dispatched work item a chance to land before we go back under.
        const skipAtCapacityOnce = suspensionDetected
        suspensionDetected = false
        if (isAtCapacity?.() && capacitySignal && !skipAtCapacityOnce) {
          const atCapMs = pollConfig.poll_interval_ms_at_capacity
          // Heartbeat loops WITHOUT polling. When at-capacity polling is also
          // enabled (atCapMs > 0), the loop tracks a deadline and breaks out
          // to poll at that interval — heartbeat and poll compose instead of
          // one suppressing the other. Breaks out when:
          // - Poll deadline reached (atCapMs > 0 only)
          // - Auth fails (JWT expired → poll refreshes tokens)
          // - Capacity wake fires (transport lost → poll for new work)
          // - Heartbeat config disabled (GrowthBook update)
          // - Loop aborted (shutdown)
          if (
            pollConfig.non_exclusive_heartbeat_interval_ms > 0 &&
            getHeartbeatInfo
          ) {
            logEvent('tengu_bridge_heartbeat_mode_entered', {
              heartbeat_interval_ms:
                pollConfig.non_exclusive_heartbeat_interval_ms,
            })
            // Deadline computed once at entry — GB updates to atCapMs don't
            // shift an in-flight deadline (next entry picks up the new value).
            const pollDeadline = atCapMs > 0 ? Date.now() + atCapMs : null
            let needsBackoff = false
            // Completed heartbeat cycles — reported in the mode-exited event.
            let hbCycles = 0
            while (
              !signal.aborted &&
              isAtCapacity() &&
              (pollDeadline === null || Date.now() < pollDeadline)
            ) {
              // Re-read config each cycle so a remote config change can
              // disable heartbeating mid-loop.
              const hbConfig = getPollIntervalConfig()
              if (hbConfig.non_exclusive_heartbeat_interval_ms <= 0) break

              const info = getHeartbeatInfo()
              if (!info) break

              // Capture capacity signal BEFORE the async heartbeat call so
              // a transport loss during the HTTP request is caught by the
              // subsequent sleep.
              const cap = capacitySignal()

              try {
                await api.heartbeatWork(
                  info.environmentId,
                  info.workId,
                  info.sessionToken,
                )
              } catch (err) {
                logForDebugging(
                  `[bridge:repl:heartbeat] Failed: ${errorMessage(err)}`,
                )
                if (err instanceof BridgeFatalError) {
                  cap.cleanup()
                  logEvent('tengu_bridge_heartbeat_error', {
                    status:
                      err.status as unknown as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
                    error_type: (err.status === 401 || err.status === 403
                      ? 'auth_failed'
                      : 'fatal') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
                  })
                  // JWT expired (401/403) or work item gone (404/410).
                  // Either way the current transport is dead — SSE
                  // reconnects and CCR writes will fail on the same
                  // stale token. If the caller gave us a recovery hook,
                  // tear down work state and skip backoff: isAtCapacity()
                  // flips to false, next outer-loop iteration fast-polls
                  // for the server's re-dispatched work item. Without
                  // the hook, backoff to avoid tight poll+heartbeat loop.
                  if (onHeartbeatFatal) {
                    onHeartbeatFatal(err)
                    logForDebugging(
                      `[bridge:repl:heartbeat] Fatal (status=${err.status}), work state cleared — fast-polling for re-dispatch`,
                    )
                  } else {
                    needsBackoff = true
                  }
                  break
                }
              }

              hbCycles++
              await sleep(
                hbConfig.non_exclusive_heartbeat_interval_ms,
                cap.signal,
              )
              cap.cleanup()
            }

            // Classify why the heartbeat loop ended, for analytics.
            // Order matters: error > shutdown > capacity > deadline > config.
            const exitReason = needsBackoff
              ? 'error'
              : signal.aborted
                ? 'shutdown'
                : !isAtCapacity()
                  ? 'capacity_changed'
                  : pollDeadline !== null && Date.now() >= pollDeadline
                    ? 'poll_due'
                    : 'config_disabled'
            logEvent('tengu_bridge_heartbeat_mode_exited', {
              reason:
                exitReason as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
              heartbeat_cycles: hbCycles,
            })

            // On auth_failed or fatal, backoff before polling to avoid a
            // tight poll+heartbeat loop. Fall through to the shared sleep
            // below — it's the same capacitySignal-wrapped sleep the legacy
            // path uses, and both need the suspension-overrun check.
            if (!needsBackoff) {
              if (exitReason === 'poll_due') {
                // bridgeApi throttles empty-poll logs (EMPTY_POLL_LOG_INTERVAL=100)
                // so the once-per-10min poll_due poll is invisible at counter=2.
                // Log it here so verification runs see both endpoints in the debug log.
                logForDebugging(
                  `[bridge:repl] Heartbeat poll_due after ${hbCycles} cycles — falling through to pollForWork`,
                )
              }
              continue
            }
          }
          // At-capacity sleep — reached by both the legacy path (heartbeat
          // disabled) and the heartbeat-backoff path (needsBackoff=true).
          // Merged so the suspension detector covers both; previously the
          // backoff path had no overrun check and could go straight back
          // under for 10 min after a laptop wake. Use atCapMs when enabled,
          // else the heartbeat interval as a floor (guaranteed > 0 on the
          // backoff path) so heartbeat-only configs don't tight-loop.
          const sleepMs =
            atCapMs > 0
              ? atCapMs
              : pollConfig.non_exclusive_heartbeat_interval_ms
          if (sleepMs > 0) {
            const cap = capacitySignal()
            const sleepStart = Date.now()
            await sleep(sleepMs, cap.signal)
            cap.cleanup()
            // Process-suspension detector. A setTimeout overshooting its
            // deadline by 60s means the process was suspended (laptop lid,
            // SIGSTOP, VM pause) — even a pathological GC pause is seconds,
            // not minutes. Early aborts (wakePollLoop → cap.signal) produce
            // overrun < 0 and fall through. Note: this only catches sleeps
            // that outlast their deadline; WebSocketTransport's ping
            // interval (10s granularity) is the primary detector for shorter
            // suspensions. This is the backstop for when that detector isn't
            // running (transport mid-reconnect, interval stopped).
            const overrun = Date.now() - sleepStart - sleepMs
            if (overrun > 60_000) {
              logForDebugging(
                `[bridge:repl] At-capacity sleep overran by ${Math.round(overrun / 1000)}s — process suspension detected, forcing one fast-poll cycle`,
              )
              logEvent('tengu_bridge_repl_suspension_detected', {
                overrun_ms: overrun,
              })
              suspensionDetected = true
            }
          }
        } else {
          // Not at capacity: plain idle sleep between polls.
          await sleep(pollConfig.poll_interval_ms_not_at_capacity, signal)
        }
        continue
      }

      // Decode before type dispatch — need the JWT for the explicit ack.
      let secret
      try {
        secret = decodeWorkSecret(work.secret)
      } catch (err) {
        logForDebugging(
          `[bridge:repl] Failed to decode work secret: ${errorMessage(err)}`,
        )
        logEvent('tengu_bridge_repl_work_secret_failed', {})
        // Can't ack (needs the JWT we failed to decode). stopWork uses OAuth.
        // Prevents XAUTOCLAIM re-delivering this poisoned item every cycle.
        await api.stopWork(envId, work.id, false).catch(() => {})
        continue
      }

      // Explicitly acknowledge to prevent redelivery. Non-fatal on failure:
      // server re-delivers, and the onWorkReceived callback handles dedup.
      logForDebugging(`[bridge:repl] Acknowledging workId=${work.id}`)
      try {
        await api.acknowledgeWork(envId, work.id, secret.session_ingress_token)
      } catch (err) {
        logForDebugging(
          `[bridge:repl] Acknowledge failed workId=${work.id}: ${errorMessage(err)}`,
        )
      }

      if (work.data.type === 'healthcheck') {
        logForDebugging('[bridge:repl] Healthcheck received')
        continue
      }

      if (work.data.type === 'session') {
        const workSessionId = work.data.id
        // Reject malformed IDs before handing them to the caller.
        try {
          validateBridgeId(workSessionId, 'session_id')
        } catch {
          logForDebugging(
            `[bridge:repl] Invalid session_id in work: ${workSessionId}`,
          )
          continue
        }

        onWorkReceived(
          workSessionId,
          secret.session_ingress_token,
          work.id,
          secret.use_code_sessions === true,
        )
        logForDebugging('[bridge:repl] Work accepted, continuing poll loop')
      }
    } catch (err) {
      if (signal.aborted) break

      // Detect permanent "environment deleted" error — no amount of
      // retrying will recover. Re-register a new environment instead.
      // Checked BEFORE the generic BridgeFatalError bail. pollForWork uses
      // validateStatus: s => s < 500, so 404 is always wrapped into a
      // BridgeFatalError by handleErrorStatus() — never an axios-shaped
      // error. The poll endpoint's only path param is the env ID; 404
      // unambiguously means env-gone (no-work is a 200 with null body).
      // The server sends error.type='not_found_error' (standard Anthropic
      // API shape), not a bridge-specific string — but status===404 is
      // the real signal and survives body-shape changes.
      if (
        err instanceof BridgeFatalError &&
        err.status === 404 &&
        onEnvironmentLost
      ) {
        // If credentials have already been refreshed by a concurrent
        // reconnection (e.g. WS close handler), the stale poll's error
        // is expected — skip onEnvironmentLost and retry with fresh creds.
        const currentEnvId = getCredentials().environmentId
        if (envId !== currentEnvId) {
          logForDebugging(
            `[bridge:repl] Stale poll error for old env=${envId}, current env=${currentEnvId} — skipping onEnvironmentLost`,
          )
          consecutiveErrors = 0
          firstErrorTime = null
          continue
        }

        environmentRecreations++
        logForDebugging(
          `[bridge:repl] Environment deleted, attempting re-registration (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
        )
        logEvent('tengu_bridge_repl_env_lost', {
          attempt: environmentRecreations,
        } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)

        if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
          logForDebugging(
            `[bridge:repl] Environment re-registration limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
          )
          onStateChange?.(
            'failed',
            'Environment deleted and re-registration limit reached',
          )
          onFatalError?.()
          break
        }

        onStateChange?.('reconnecting', 'environment lost, recreating session')
        const newCreds = await onEnvironmentLost()
        // doReconnect() makes several sequential network calls (1-5s).
        // If the user triggered teardown during that window, its internal
        // abort checks return false — but we need to re-check here to
        // avoid emitting a spurious 'failed' + onFatalError() during
        // graceful shutdown.
        if (signal.aborted) break
        if (newCreds) {
          // Credentials are updated in the outer scope via
          // reconnectEnvironmentWithSession — getCredentials() will
          // return the fresh values on the next poll iteration.
          // Do NOT reset environmentRecreations here — onEnvLost returning
          // creds only proves we tried to fix it, not that the env is
          // healthy. A successful poll (above) is the reset point; if the
          // new env immediately dies again we still want the limit to fire.
          consecutiveErrors = 0
          firstErrorTime = null
          onStateChange?.('ready')
          logForDebugging(
            `[bridge:repl] Re-registered environment: ${newCreds.environmentId}`,
          )
          continue
        }

        onStateChange?.(
          'failed',
          'Environment deleted and re-registration failed',
        )
        onFatalError?.()
        break
      }

      // Fatal errors (401/403/404/410) — no point retrying
      if (err instanceof BridgeFatalError) {
        const isExpiry = isExpiredErrorType(err.errorType)
        const isSuppressible = isSuppressible403(err)
        logForDebugging(
          `[bridge:repl] Fatal poll error: ${err.message} (status=${err.status}, type=${err.errorType ?? 'unknown'})${isSuppressible ? ' (suppressed)' : ''}`,
        )
        logEvent('tengu_bridge_repl_fatal_error', {
          status: err.status,
          error_type:
            err.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        })
        logForDiagnosticsNoPII(
          isExpiry ? 'info' : 'error',
          'bridge_repl_fatal_error',
          { status: err.status, error_type: err.errorType },
        )
        // Cosmetic 403 errors (e.g., external_poll_sessions scope,
        // environments:manage permission) — suppress user-visible error
        // but always trigger teardown so cleanup runs.
        if (!isSuppressible) {
          onStateChange?.(
            'failed',
            isExpiry
              ? 'session expired · /remote-control to reconnect'
              : err.message,
          )
        }
        // Always trigger teardown — matches bridgeMain.ts where fatalExit=true
        // is unconditional and post-loop cleanup always runs.
        onFatalError?.()
        break
      }

      const now = Date.now()

      // Detect system sleep/wake: if the gap since the last poll error
      // greatly exceeds the max backoff delay, the machine likely slept.
      // Reset error tracking so we retry with a fresh budget instead of
      // immediately giving up.
      if (
        lastPollErrorTime !== null &&
        now - lastPollErrorTime > POLL_ERROR_MAX_DELAY_MS * 2
      ) {
        logForDebugging(
          `[bridge:repl] Detected system sleep (${Math.round((now - lastPollErrorTime) / 1000)}s gap), resetting poll error budget`,
        )
        logForDiagnosticsNoPII('info', 'bridge_repl_poll_sleep_detected', {
          gapMs: now - lastPollErrorTime,
        })
        consecutiveErrors = 0
        firstErrorTime = null
      }
      lastPollErrorTime = now

      consecutiveErrors++
      if (firstErrorTime === null) {
        firstErrorTime = now
      }
      const elapsed = now - firstErrorTime
      const httpStatus = extractHttpStatus(err)
      const errMsg = describeAxiosError(err)
      const wsLabel = getWsState?.() ?? 'unknown'

      logForDebugging(
        `[bridge:repl] Poll error (attempt ${consecutiveErrors}, elapsed ${Math.round(elapsed / 1000)}s, ws=${wsLabel}): ${errMsg}`,
      )
      logEvent('tengu_bridge_repl_poll_error', {
        status: httpStatus,
        consecutiveErrors,
        elapsedMs: elapsed,
      } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)

      // Only transition to 'reconnecting' on the first error — stay
      // there until a successful poll (avoid flickering the UI state).
      if (consecutiveErrors === 1) {
        onStateChange?.('reconnecting', errMsg)
      }

      // Give up after continuous failures
      if (elapsed >= POLL_ERROR_GIVE_UP_MS) {
        logForDebugging(
          `[bridge:repl] Poll failures exceeded ${POLL_ERROR_GIVE_UP_MS / 1000}s (${consecutiveErrors} errors), giving up`,
        )
        logForDiagnosticsNoPII('info', 'bridge_repl_poll_give_up')
        logEvent('tengu_bridge_repl_poll_give_up', {
          consecutiveErrors,
          elapsedMs: elapsed,
          lastStatus: httpStatus,
        } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
        onStateChange?.('failed', 'connection to server lost')
        break
      }

      // Exponential backoff: 2s → 4s → 8s → 16s → 32s → 60s (cap)
      const backoff = Math.min(
        POLL_ERROR_INITIAL_DELAY_MS * 2 ** (consecutiveErrors - 1),
        POLL_ERROR_MAX_DELAY_MS,
      )
      // The poll_due heartbeat-loop exit leaves a healthy lease exposed to
      // this backoff path. Heartbeat before each sleep so /poll outages
      // (the VerifyEnvironmentSecretAuth DB path heartbeat was introduced to
      // avoid) don't kill the 300s lease TTL.
      if (getPollIntervalConfig().non_exclusive_heartbeat_interval_ms > 0) {
        const info = getHeartbeatInfo?.()
        if (info) {
          try {
            await api.heartbeatWork(
              info.environmentId,
              info.workId,
              info.sessionToken,
            )
          } catch {
            // Best-effort — if heartbeat also fails the lease dies, same as
            // pre-poll_due behavior (where the only heartbeat-loop exits were
            // ones where the lease was already dying).
          }
        }
      }
      await sleep(backoff, signal)
    }
  }

  logForDebugging(
    `[bridge:repl] Work poll loop ended (aborted=${signal.aborted}) env=${getCredentials().environmentId}`,
  )
}

// Exported for testing only
export {
  startWorkPollLoop as _startWorkPollLoopForTesting,
  POLL_ERROR_INITIAL_DELAY_MS as _POLL_ERROR_INITIAL_DELAY_MS_ForTesting,
  POLL_ERROR_MAX_DELAY_MS as _POLL_ERROR_MAX_DELAY_MS_ForTesting,
  POLL_ERROR_GIVE_UP_MS as _POLL_ERROR_GIVE_UP_MS_ForTesting,
}