// Source dump of Claude Code
// (at main; 370 lines, 16 kB; view raw)
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import { CCRClient } from '../cli/transports/ccrClient.js'
import type { HybridTransport } from '../cli/transports/HybridTransport.js'
import { SSETransport } from '../cli/transports/SSETransport.js'
import { logForDebugging } from '../utils/debug.js'
import { errorMessage } from '../utils/errors.js'
import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
import type { SessionState } from '../utils/sessionState.js'
import { registerWorker } from './workSecret.js'

/**
 * Transport abstraction for replBridge. Covers exactly the surface that
 * replBridge.ts uses against HybridTransport so the v1/v2 choice is
 * confined to the construction site.
 *
 * - v1: HybridTransport (WS reads + POST writes to Session-Ingress)
 * - v2: SSETransport (reads) + CCRClient (writes to CCR v2 /worker/*)
 *
 * The v2 write path goes through CCRClient.writeEvent → SerialBatchEventUploader,
 * NOT through SSETransport.write() — SSETransport.write() targets the
 * Session-Ingress POST URL shape, which is wrong for CCR v2.
 */
export type ReplBridgeTransport = {
  write(message: StdoutMessage): Promise<void>
  writeBatch(messages: StdoutMessage[]): Promise<void>
  close(): void
  isConnectedStatus(): boolean
  getStateLabel(): string
  setOnData(callback: (data: string) => void): void
  setOnClose(callback: (closeCode?: number) => void): void
  setOnConnect(callback: () => void): void
  connect(): void
  /**
   * High-water mark of the underlying read stream's event sequence numbers.
   * replBridge reads this before swapping transports so the new one can
   * resume from where the old one left off (otherwise the server replays
   * the entire session history from seq 0).
   *
   * v1 returns 0 — Session-Ingress WS doesn't use SSE sequence numbers;
   * replay-on-reconnect is handled by the server-side message cursor.
   */
  getLastSequenceNum(): number
  /**
   * Monotonic count of batches dropped via maxConsecutiveFailures.
   * Snapshot before writeBatch() and compare after to detect silent drops
   * (writeBatch() resolves normally even when batches were dropped).
   * v2 returns 0 — the v2 write path doesn't set maxConsecutiveFailures.
   */
  readonly droppedBatchCount: number
  /**
   * PUT /worker state (v2 only; v1 is a no-op). `requires_action` tells
   * the backend a permission prompt is pending — claude.ai shows the
   * "waiting for input" indicator. REPL/daemon callers don't need this
   * (user watches the REPL locally); multi-session worker callers do.
   */
  reportState(state: SessionState): void
  /** PUT /worker external_metadata (v2 only; v1 is a no-op). */
  reportMetadata(metadata: Record<string, unknown>): void
  /**
   * POST /worker/events/{id}/delivery (v2 only; v1 is a no-op). Populates
   * CCR's processing_at/processed_at columns. `received` is auto-fired by
   * CCRClient on every SSE frame and is not exposed here.
   */
  reportDelivery(eventId: string, status: 'processing' | 'processed'): void
  /**
   * Drain the write queue before close() (v2 only; v1 resolves
   * immediately — HybridTransport POSTs are already awaited per-write).
   */
  flush(): Promise<void>
}

/**
 * v1 adapter: HybridTransport already has the full surface (it extends
 * WebSocketTransport which has setOnConnect + getStateLabel). This is a
 * no-op wrapper that exists only so replBridge's `transport` variable
 * has a single type.
 *
 * @param hybrid The v1 transport every call is forwarded to.
 * @returns A ReplBridgeTransport view over `hybrid`; the v2-only reporting
 *   methods are no-ops and flush() resolves immediately.
 */
export function createV1ReplTransport(
  hybrid: HybridTransport,
): ReplBridgeTransport {
  return {
    write: msg => hybrid.write(msg),
    writeBatch: msgs => hybrid.writeBatch(msgs),
    close: () => hybrid.close(),
    isConnectedStatus: () => hybrid.isConnectedStatus(),
    getStateLabel: () => hybrid.getStateLabel(),
    setOnData: cb => hybrid.setOnData(cb),
    setOnClose: cb => hybrid.setOnClose(cb),
    setOnConnect: cb => hybrid.setOnConnect(cb),
    connect: () => void hybrid.connect(),
    // v1 Session-Ingress WS doesn't use SSE sequence numbers; replay
    // semantics are different. Always return 0 so the seq-num carryover
    // logic in replBridge is a no-op for v1.
    getLastSequenceNum: () => 0,
    // Getter (not a snapshot) so callers always see the live counter.
    get droppedBatchCount() {
      return hybrid.droppedBatchCount
    },
    reportState: () => {},
    reportMetadata: () => {},
    reportDelivery: () => {},
    flush: () => Promise.resolve(),
  }
}

/**
 * v2 adapter: wrap SSETransport (reads) + CCRClient (writes, heartbeat,
 * state, delivery tracking).
 *
 * Auth: v2 endpoints validate the JWT's session_id claim (register_worker.go:32)
 * and worker role (environment_auth.py:856). OAuth tokens have neither.
 * This is the inverse of the v1 replBridge path, which deliberately uses OAuth.
 * The JWT is refreshed when the poll loop re-dispatches work — the caller
 * invokes createV2ReplTransport again with the fresh token.
 *
 * Registration happens here (not in the caller) so the entire v2 handshake
 * is one async step. registerWorker failure propagates — replBridge will
 * catch it and stay on the poll loop.
 *
 * Teardown contract: the transport is torn down by exactly one of
 *   - close()            (caller-initiated; onClose is NOT fired),
 *   - epoch mismatch     (onClose(4090)),
 *   - initialize failure (onClose(4091)),
 *   - SSE stream closing (onClose(status ?? 4092)).
 * Whichever happens first wins; later triggers are ignored, so onClose fires
 * at most once and writeBatch()/isConnectedStatus() observe the teardown.
 *
 * @returns A transport whose connect() has not been called yet.
 * @throws Whatever registerWorker rejects with, when `opts.epoch` is omitted.
 */
export async function createV2ReplTransport(opts: {
  sessionUrl: string
  ingressToken: string
  sessionId: string
  /**
   * SSE sequence-number high-water mark from the previous transport.
   * Passed to the new SSETransport so its first connect() sends
   * from_sequence_num / Last-Event-ID and the server resumes from where
   * the old stream left off. Without this, every transport swap asks the
   * server to replay the entire session history from seq 0.
   */
  initialSequenceNum?: number
  /**
   * Worker epoch from POST /bridge response. When provided, the server
   * already bumped epoch (the /bridge call IS the register — see server
   * PR #293280). When omitted (v1 CCR-v2 path via replBridge.ts poll loop),
   * call registerWorker as before.
   */
  epoch?: number
  /** CCRClient heartbeat interval. Defaults to 20s when omitted. */
  heartbeatIntervalMs?: number
  /** ±fraction per-beat jitter. Defaults to 0 (no jitter) when omitted. */
  heartbeatJitterFraction?: number
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Use for mirror-mode attachments that forward events
   * but never receive inbound prompts or control requests.
   */
  outboundOnly?: boolean
  /**
   * Per-instance auth header source. When provided, CCRClient + SSETransport
   * read auth from this closure instead of the process-wide
   * CLAUDE_CODE_SESSION_ACCESS_TOKEN env var. Required for callers managing
   * multiple concurrent sessions — the env-var path stomps across sessions.
   * When omitted, falls back to the env var (single-session callers).
   */
  getAuthToken?: () => string | undefined
}): Promise<ReplBridgeTransport> {
  const {
    sessionUrl,
    ingressToken,
    sessionId,
    initialSequenceNum,
    getAuthToken,
  } = opts

  // Auth header builder. If getAuthToken is provided, read from it
  // (per-instance, multi-session safe). Otherwise write ingressToken to
  // the process-wide env var (legacy single-session path — CCRClient's
  // default getAuthHeaders reads it via getSessionIngressAuthHeaders).
  let getAuthHeaders: (() => Record<string, string>) | undefined
  if (getAuthToken) {
    getAuthHeaders = (): Record<string, string> => {
      const token = getAuthToken()
      if (!token) return {}
      return { Authorization: `Bearer ${token}` }
    }
  } else {
    // CCRClient.request() and SSETransport.connect() both read auth via
    // getSessionIngressAuthHeaders() → this env var. Set it before either
    // touches the network.
    updateSessionIngressAuthToken(ingressToken)
  }

  const epoch = opts.epoch ?? (await registerWorker(sessionUrl, ingressToken))
  logForDebugging(
    `[bridge:repl] CCR v2: worker sessionId=${sessionId} epoch=${epoch}${opts.epoch !== undefined ? ' (from /bridge)' : ' (via registerWorker)'}`,
  )

  // Derive SSE stream URL. Same logic as transportUtils.ts:26-33 but
  // starting from an http(s) base instead of a --sdk-url that might be ws://.
  const sseUrl = new URL(sessionUrl)
  sseUrl.pathname = sseUrl.pathname.replace(/\/$/, '') + '/worker/events/stream'

  // Shared mutable state. Declared before the CCRClient is constructed so
  // every callback handed to it closes over already-initialized bindings.
  let onCloseCb: ((closeCode?: number) => void) | undefined
  let onConnectCb: (() => void) | undefined
  let ccrInitialized = false
  // True once the transport has been torn down by ANY path (close(), epoch
  // mismatch, init failure, SSE close). writeBatch() and the readiness
  // accessors consult it.
  let closed = false

  const sse = new SSETransport(
    sseUrl,
    {},
    sessionId,
    undefined,
    initialSequenceNum,
    getAuthHeaders,
  )

  /**
   * Internal teardown for failure paths: close both halves and notify
   * replBridge with `closeCode`. Idempotent — only the first teardown
   * (including an explicit close()) has any effect, so onClose never
   * double-fires (e.g. 4090 from an epoch mismatch followed by 4091 when
   * the same failure rejects ccr.initialize()).
   *
   * Each step is guarded separately so that one throwing step cannot skip
   * the remaining ones (leaking the SSE stream or leaving replBridge
   * unaware that the transport died).
   */
  const tearDownOnce = (closeCode: number, context: string): void => {
    if (closed) return
    closed = true
    const steps: Array<() => void> = [
      () => ccr.close(),
      () => sse.close(),
      () => onCloseCb?.(closeCode),
    ]
    for (const step of steps) {
      try {
        step()
      } catch (cleanupErr: unknown) {
        logForDebugging(
          `[bridge:repl] CCR v2: error during ${context} cleanup: ${errorMessage(cleanupErr)}`,
          { level: 'error' },
        )
      }
    }
  }

  const ccr = new CCRClient(sse, new URL(sessionUrl), {
    getAuthHeaders,
    heartbeatIntervalMs: opts.heartbeatIntervalMs,
    heartbeatJitterFraction: opts.heartbeatJitterFraction,
    // Default is process.exit(1) — correct for spawn-mode children. In-process,
    // that kills the REPL. Close instead: replBridge's onClose wakes the poll
    // loop, which picks up the server's re-dispatch (with fresh epoch).
    onEpochMismatch: () => {
      logForDebugging(
        '[bridge:repl] CCR v2: epoch superseded (409) — closing for poll-loop recovery',
      )
      // tearDownOnce never throws, so the throw below always executes.
      // That matters: if cleanup could escape here, handleEpochMismatch's
      // `never` return type would be violated at runtime and control would
      // fall through in the caller (request()).
      tearDownOnce(4090, 'epoch-mismatch')
      // Don't return — the calling request() code continues after the 409
      // branch, so callers see the logged warning and a false return. We
      // throw to unwind; the uploaders catch it as a send failure.
      throw new Error('epoch superseded')
    },
  })

  // CCRClient's constructor wired sse.setOnEvent → reportDelivery('received').
  // remoteIO.ts additionally sends 'processing'/'processed' via
  // setCommandLifecycleListener, which the in-process query loop fires. This
  // transport's only caller (replBridge/daemonBridge) has no such wiring — the
  // daemon's agent child is a separate process (ProcessTransport), and its
  // notifyCommandLifecycle calls fire with listener=null in its own module
  // scope. So events stay at 'received' forever, and reconnectSession re-queues
  // them on every daemon restart (observed: 21→24→25 phantom prompts as
  // "user sent a new message while you were working" system-reminders).
  //
  // Fix: ACK 'processed' immediately alongside 'received'. The window between
  // SSE receipt and transcript-write is narrow (queue → SDK → child stdin →
  // model); a crash there loses one prompt vs. the observed N-prompt flood on
  // every restart. Overwrite the constructor's wiring to do both — setOnEvent
  // replaces, not appends (SSETransport.ts:658).
  sse.setOnEvent(event => {
    ccr.reportDelivery(event.event_id, 'received')
    ccr.reportDelivery(event.event_id, 'processed')
  })

  // Both sse.connect() and ccr.initialize() are deferred to connect() below.
  // replBridge's calling order is newTransport → setOnConnect → setOnData →
  // setOnClose → connect(), and both calls need those callbacks wired first:
  // sse.connect() opens the stream (events flow to onData/onClose immediately),
  // and ccr.initialize().then() fires onConnectCb.
  //
  // onConnect fires once ccr.initialize() resolves. Writes go via
  // CCRClient HTTP POST (SerialBatchEventUploader), not SSE, so the
  // write path is ready the moment workerEpoch is set. SSE.connect()
  // awaits its read loop and never resolves — don't gate on it.
  // The SSE stream opens in parallel (~30ms) and starts delivering
  // inbound events via setOnData; outbound doesn't need to wait for it.

  return {
    write(msg) {
      return ccr.writeEvent(msg)
    },
    async writeBatch(msgs) {
      // SerialBatchEventUploader already batches internally (maxBatchSize=100);
      // sequential enqueue preserves order and the uploader coalesces.
      // Check closed between writes to avoid sending partial batches after
      // transport teardown (close(), epoch mismatch, init failure, SSE drop).
      for (const m of msgs) {
        if (closed) break
        await ccr.writeEvent(m)
      }
    },
    close() {
      // Caller-initiated close: intentionally does not fire onClose.
      closed = true
      ccr.close()
      sse.close()
    },
    isConnectedStatus() {
      // Write-readiness, not read-readiness — replBridge checks this
      // before calling writeBatch. SSE open state is orthogonal. A torn-down
      // transport is never write-ready, even if it initialized earlier.
      return ccrInitialized && !closed
    },
    getStateLabel() {
      // SSETransport doesn't expose its state string; synthesize from
      // what we can observe. replBridge only uses this for debug logging.
      if (closed || sse.isClosedStatus()) return 'closed'
      if (sse.isConnectedStatus()) return ccrInitialized ? 'connected' : 'init'
      return 'connecting'
    },
    setOnData(cb) {
      sse.setOnData(cb)
    },
    setOnClose(cb) {
      onCloseCb = cb
      // SSE reconnect-budget exhaustion fires onClose(undefined) — map to
      // 4092 so ws_closed telemetry can distinguish it from HTTP-status
      // closes (SSETransport:280 passes response.status). Stop CCRClient's
      // heartbeat timer before notifying replBridge. (sse.close() doesn't
      // invoke this, so the epoch-mismatch path above isn't double-firing;
      // the `closed` guard keeps that true even if that ever changes.)
      sse.setOnClose(code => {
        if (closed) return
        closed = true
        ccr.close()
        cb(code ?? 4092)
      })
    },
    setOnConnect(cb) {
      onConnectCb = cb
    },
    getLastSequenceNum() {
      return sse.getLastSequenceNum()
    },
    // v2 write path (CCRClient) doesn't set maxConsecutiveFailures — no drops.
    droppedBatchCount: 0,
    reportState(state) {
      ccr.reportState(state)
    },
    reportMetadata(metadata) {
      ccr.reportMetadata(metadata)
    },
    reportDelivery(eventId, status) {
      ccr.reportDelivery(eventId, status)
    },
    flush() {
      return ccr.flush()
    },
    connect() {
      // Outbound-only: skip the SSE read stream entirely — no inbound
      // events to receive, no delivery ACKs to send. Only the CCRClient
      // write path (POST /worker/events) and heartbeat are needed.
      if (!opts.outboundOnly) {
        // Fire-and-forget — SSETransport.connect() awaits readStream()
        // (the read loop) and only resolves on stream close/error. The
        // spawn-mode path in remoteIO.ts does the same void discard.
        void sse.connect()
      }
      void ccr.initialize(epoch).then(
        () => {
          // The transport may have been torn down while initialize() was in
          // flight; don't announce readiness on a dead transport.
          if (closed) return
          ccrInitialized = true
          logForDebugging(
            `[bridge:repl] v2 transport ready for writes (epoch=${epoch}, sse=${sse.isConnectedStatus() ? 'open' : 'opening'})`,
          )
          onConnectCb?.()
        },
        (err: unknown) => {
          logForDebugging(
            `[bridge:repl] CCR v2 initialize failed: ${errorMessage(err)}`,
            { level: 'error' },
          )
          // Close transport resources and notify replBridge via onClose
          // so the poll loop can retry on the next work dispatch.
          // Without this callback, replBridge never learns the transport
          // failed to initialize and sits with transport === null forever.
          // 4091 = init failure, distinguishable from 4090 epoch mismatch.
          // No-op if an earlier teardown (e.g. the epoch mismatch that
          // caused this rejection) already notified replBridge.
          tearDownOnce(4091, 'init-failure')
        },
      )
    },
  }
}