// Source dump of Claude Code.
// At main · 255 lines · 9.9 kB · view raw
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import { PassThrough } from 'stream'
import { URL } from 'url'
import { getSessionId } from '../bootstrap/state.js'
import { getPollIntervalConfig } from '../bridge/pollConfig.js'
import { registerCleanup } from '../utils/cleanupRegistry.js'
import { setCommandLifecycleListener } from '../utils/commandLifecycle.js'
import { isDebugMode, logForDebugging } from '../utils/debug.js'
import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
import { isEnvTruthy } from '../utils/envUtils.js'
import { errorMessage } from '../utils/errors.js'
import { gracefulShutdown } from '../utils/gracefulShutdown.js'
import { logError } from '../utils/log.js'
import { writeToStdout } from '../utils/process.js'
import { getSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
import {
  setSessionMetadataChangedListener,
  setSessionStateChangedListener,
} from '../utils/sessionState.js'
import {
  setInternalEventReader,
  setInternalEventWriter,
} from '../utils/sessionStorage.js'
import { ndjsonSafeStringify } from './ndjsonSafeStringify.js'
import { StructuredIO } from './structuredIO.js'
import { CCRClient, CCRInitError } from './transports/ccrClient.js'
import { SSETransport } from './transports/SSETransport.js'
import type { Transport } from './transports/Transport.js'
import { getTransportForUrl } from './transports/transportUtils.js'

/**
 * Builds the transport request headers from the current process state.
 *
 * Used both for the initial connection and as the refresh callback handed to
 * the transport: because the session token is re-read on every call, a token
 * refreshed by the parent process (via token file or env var) can be picked up
 * on reconnection.
 *
 * @returns A fresh header map; `Authorization` is present only when a session
 *   ingress token is available.
 */
function buildTransportHeaders(): Record<string, string> {
  const headers: Record<string, string> = {}

  const sessionToken = getSessionIngressAuthToken()
  if (sessionToken) {
    headers['Authorization'] = `Bearer ${sessionToken}`
  }

  // Add environment runner version if available (set by Environment Manager)
  const erVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
  if (erVersion) {
    headers['x-environment-runner-version'] = erVersion
  }

  return headers
}

/**
 * Copies the initial prompt into the input stream, one line per chunk.
 *
 * Chunks from stdin may already contain a trailing newline, so it is stripped
 * before appending our own to avoid double-newline issues that cause
 * structuredIO to parse empty lines. String() handles both string chunks and
 * Buffer objects from process.stdin.
 *
 * @param initialPrompt - Source of prompt chunks.
 * @param stream - Input stream that receives the normalized chunks.
 * @returns A promise that settles when the prompt is exhausted or the stream
 *   has been ended; it rejects if the prompt iterable throws.
 */
async function forwardInitialPrompt(
  initialPrompt: AsyncIterable<string>,
  stream: PassThrough,
): Promise<void> {
  for await (const chunk of initialPrompt) {
    // The transport close callback (or close()) may end the stream while the
    // prompt is still being produced. Writing after end() raises a
    // write-after-end stream error, so stop forwarding instead.
    if (stream.writableEnded) {
      return
    }
    stream.write(String(chunk).replace(/\n$/, '') + '\n')
  }
}

/**
 * Bidirectional streaming for SDK mode with session tracking.
 *
 * Incoming transport data is piped into a PassThrough stream that the
 * StructuredIO base class reads from; outgoing messages go through `write()`.
 * The concrete transport is selected by getTransportForUrl. When CCR v2 is
 * enabled (CLAUDE_CODE_USE_CCR_V2) the transport must be an SSETransport and
 * outgoing events are routed through a CCRClient instead of the transport.
 */
export class RemoteIO extends StructuredIO {
  private url: URL
  private transport: Transport
  private inputStream: PassThrough
  /** True when CLAUDE_CODE_ENVIRONMENT_KIND is 'bridge'. */
  private readonly isBridge: boolean
  /** Snapshot of isDebugMode() taken at construction time. */
  private readonly isDebug: boolean
  /** Set only when CCR v2 is enabled; null otherwise. */
  private ccrClient: CCRClient | null = null
  private keepAliveTimer: ReturnType<typeof setInterval> | null = null

  /**
   * @param streamUrl - URL of the remote stream; parsed with `new URL`, so an
   *   invalid URL throws.
   * @param initialPrompt - Optional prompt chunks to feed into the input
   *   stream once the connection has been started.
   * @param replayUserMessages - Forwarded unchanged to StructuredIO.
   * @throws Error when CCR v2 is enabled but the selected transport is not an
   *   SSETransport.
   */
  constructor(
    streamUrl: string,
    initialPrompt?: AsyncIterable<string>,
    replayUserMessages?: boolean,
  ) {
    const inputStream = new PassThrough({ encoding: 'utf8' })
    super(inputStream, replayUserMessages)
    this.inputStream = inputStream
    this.url = new URL(streamUrl)

    // Prepare headers with session token if available
    const headers = buildTransportHeaders()
    if (!headers['Authorization']) {
      logForDebugging('[remote-io] No session ingress token available', {
        level: 'error',
      })
    }

    // Get appropriate transport based on URL protocol. The same builder is
    // passed as the refresh callback so reconnects re-read the session token.
    this.transport = getTransportForUrl(
      this.url,
      headers,
      getSessionId(),
      buildTransportHeaders,
    )

    // Set up data callback
    this.isBridge = process.env.CLAUDE_CODE_ENVIRONMENT_KIND === 'bridge'
    this.isDebug = isDebugMode()
    this.transport.setOnData((data: string) => {
      this.inputStream.write(data)
      if (this.isBridge && this.isDebug) {
        writeToStdout(data.endsWith('\n') ? data : data + '\n')
      }
    })

    // Set up close callback to handle connection failures
    this.transport.setOnClose(() => {
      // End the input stream to trigger graceful shutdown
      this.inputStream.end()
    })

    // Initialize CCR v2 client (heartbeats, epoch, state reporting, event writes).
    // The CCRClient constructor wires the SSE received-ack handler
    // synchronously, so new CCRClient() MUST run before transport.connect() —
    // otherwise early SSE frames hit an unwired onEventCallback and their
    // 'received' delivery acks are silently dropped.
    if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
      // CCR v2 is SSE+POST by definition. getTransportForUrl returns
      // SSETransport under the same env var, but the two checks live in
      // different files — assert the invariant so a future decoupling
      // fails loudly here instead of confusingly inside CCRClient.
      if (!(this.transport instanceof SSETransport)) {
        throw new Error(
          'CCR v2 requires SSETransport; check getTransportForUrl',
        )
      }

      // Capture the client in a local so the closures below need no
      // non-null assertions; the field is never reassigned afterwards.
      const ccrClient = new CCRClient(this.transport, this.url)
      this.ccrClient = ccrClient

      const init = ccrClient.initialize()
      // Consumers of restoredWorkerState see null on failure; the failure
      // itself is reported (and the process shut down) by the handler below.
      this.restoredWorkerState = init.catch(() => null)
      init.catch((error: unknown) => {
        logForDiagnosticsNoPII('error', 'cli_worker_lifecycle_init_failed', {
          reason: error instanceof CCRInitError ? error.reason : 'unknown',
        })
        logError(
          new Error(`CCRClient initialization failed: ${errorMessage(error)}`),
        )
        void gracefulShutdown(1, 'other')
      })
      registerCleanup(async () => ccrClient.close())

      // Register internal event writer for transcript persistence.
      // When set, sessionStorage writes transcript messages as CCR v2
      // internal events instead of v1 Session Ingress.
      setInternalEventWriter((eventType, payload, options) =>
        ccrClient.writeInternalEvent(eventType, payload, options),
      )

      // Register internal event readers for session resume.
      // When set, hydrateFromCCRv2InternalEvents() can fetch foreground
      // and subagent internal events to reconstruct conversation state.
      setInternalEventReader(
        () => ccrClient.readInternalEvents(),
        () => ccrClient.readSubagentInternalEvents(),
      )

      const LIFECYCLE_TO_DELIVERY = {
        started: 'processing',
        completed: 'processed',
      } as const
      setCommandLifecycleListener((uuid, state) => {
        ccrClient.reportDelivery(uuid, LIFECYCLE_TO_DELIVERY[state])
      })
      setSessionStateChangedListener((state, details) => {
        ccrClient.reportState(state, details)
      })
      setSessionMetadataChangedListener(metadata => {
        ccrClient.reportMetadata(metadata)
      })
    }

    // Start connection only after all callbacks are wired (setOnData above,
    // setOnEvent inside new CCRClient() when CCR v2 is enabled).
    void this.transport.connect()

    // Push a silent keep_alive frame on a fixed interval so upstream
    // proxies and the session-ingress layer don't GC an otherwise-idle
    // remote control session. The keep_alive type is filtered before
    // reaching any client UI (Query.ts drops it; structuredIO.ts drops it;
    // web/iOS/Android never see it in their message loop). Interval comes
    // from GrowthBook (tengu_bridge_poll_interval_config
    // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
    // Bridge-only: fixes Envoy idle timeout on bridge-topology sessions
    // (#21931). byoc workers ran without this before #21931 and do not
    // need it — different network path.
    const keepAliveIntervalMs =
      getPollIntervalConfig().session_keepalive_interval_v2_ms
    if (this.isBridge && keepAliveIntervalMs > 0) {
      this.keepAliveTimer = setInterval(() => {
        logForDebugging('[remote-io] keep_alive sent')
        void this.write({ type: 'keep_alive' }).catch(err => {
          logForDebugging(
            `[remote-io] keep_alive write failed: ${errorMessage(err)}`,
          )
        })
      }, keepAliveIntervalMs)
      // Do not let the keep-alive timer alone hold the process open.
      this.keepAliveTimer.unref?.()
    }

    // Register for graceful shutdown cleanup
    registerCleanup(async () => this.close())

    // If initial prompt is provided, send it through the input stream.
    // Fire-and-forget, but with a rejection handler so a failing prompt
    // source is logged instead of surfacing as an unhandled rejection.
    if (initialPrompt) {
      void forwardInitialPrompt(initialPrompt, this.inputStream).catch(
        (error: unknown) => {
          logError(
            new Error(
              `[remote-io] initial prompt forwarding failed: ${errorMessage(error)}`,
            ),
          )
        },
      )
    }
  }

  /**
   * Flushes internal events queued in the CCR client.
   *
   * @returns The CCR client's flush promise, or an already-resolved promise
   *   when CCR v2 is not enabled.
   */
  override flushInternalEvents(): Promise<void> {
    return this.ccrClient?.flushInternalEvents() ?? Promise.resolve()
  }

  /**
   * Number of internal events the CCR client reports as pending; 0 when CCR v2
   * is not enabled.
   */
  override get internalEventsPending(): number {
    return this.ccrClient?.internalEventsPending ?? 0
  }

  /**
   * Send output to the transport.
   * In bridge mode, control_request messages are always echoed to stdout so the
   * bridge parent can detect permission requests. Other messages are echoed only
   * in debug mode.
   *
   * @param message - Message to send; routed through the CCR client when CCR v2
   *   is enabled, otherwise written directly to the transport.
   */
  async write(message: StdoutMessage): Promise<void> {
    if (this.ccrClient) {
      await this.ccrClient.writeEvent(message)
    } else {
      await this.transport.write(message)
    }
    if (this.isBridge) {
      if (message.type === 'control_request' || this.isDebug) {
        writeToStdout(ndjsonSafeStringify(message) + '\n')
      }
    }
  }

  /**
   * Clean up connections gracefully: stops the keep-alive timer, closes the
   * transport and ends the input stream.
   */
  close(): void {
    if (this.keepAliveTimer) {
      clearInterval(this.keepAliveTimer)
      this.keepAliveTimer = null
    }
    this.transport.close()
    this.inputStream.end()
  }
}