source dump of claude code
at main 404 lines 12 kB view raw
import { randomUUID } from 'crypto'
import { getOauthConfig } from '../constants/oauth.js'
import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
import type {
  SDKControlCancelRequest,
  SDKControlRequest,
  SDKControlRequestInner,
  SDKControlResponse,
} from '../entrypoints/sdk/controlTypes.js'
import { logForDebugging } from '../utils/debug.js'
import { errorMessage } from '../utils/errors.js'
import { logError } from '../utils/log.js'
import { getWebSocketTLSOptions } from '../utils/mtls.js'
import { getWebSocketProxyAgent, getWebSocketProxyUrl } from '../utils/proxy.js'
import { jsonParse, jsonStringify } from '../utils/slowOperations.js'

/** Base delay before an automatic reconnect attempt. */
const RECONNECT_DELAY_MS = 2000
/**
 * Upper bound on consecutive automatic reconnect attempts for transient
 * (non-4001) closes. The counter is reset whenever a connection opens.
 */
const MAX_RECONNECT_ATTEMPTS = 5
/** Interval between keep-alive pings while connected. */
const PING_INTERVAL_MS = 30000
/** Delay between tearing down and re-opening the socket in `reconnect()`. */
const FORCE_RECONNECT_DELAY_MS = 500

/**
 * Maximum retries for 4001 (session not found). During compaction the
 * server may briefly consider the session stale; a short retry window
 * lets the client recover without giving up permanently.
 */
const MAX_SESSION_NOT_FOUND_RETRIES = 3

/** Close code the server uses for "session not found". */
const SESSION_NOT_FOUND_CLOSE_CODE = 4001

/**
 * WebSocket close codes that indicate a permanent server-side rejection.
 * The client stops reconnecting immediately.
 * Note: 4001 (session not found) is handled separately with limited
 * retries since it can be transient during compaction.
 */
const PERMANENT_CLOSE_CODES = new Set([
  4003, // unauthorized
])

type WebSocketState = 'connecting' | 'connected' | 'closed'

type SessionsMessage =
  | SDKMessage
  | SDKControlRequest
  | SDKControlResponse
  | SDKControlCancelRequest

/**
 * Type guard for inbound payloads: accepts any non-null object that carries
 * a string `type` field.
 */
function isSessionsMessage(value: unknown): value is SessionsMessage {
  if (typeof value !== 'object' || value === null || !('type' in value)) {
    return false
  }
  // Accept any message with a string `type` field. Downstream handlers
  // (sdkMessageAdapter, RemoteSessionManager) decide what to do with
  // unknown types. A hardcoded allowlist here would silently drop new
  // message types the backend starts sending before the client is updated.
  return typeof value.type === 'string'
}

export type SessionsWebSocketCallbacks = {
  onMessage: (message: SessionsMessage) => void
  onClose?: () => void
  onError?: (error: Error) => void
  onConnected?: () => void
  /** Fired when a transient close is detected and a reconnect is scheduled.
   *  onClose fires only for permanent close (server ended / attempts exhausted). */
  onReconnecting?: () => void
}

// Common interface between globalThis.WebSocket and ws.WebSocket
type WebSocketLike = {
  close(): void
  send(data: string): void
  ping?(): void // Bun & ws both support this
}

/**
 * WebSocket client for connecting to CCR sessions via /v1/sessions/ws/{id}/subscribe
 *
 * Protocol:
 * 1. Connect to wss://api.anthropic.com/v1/sessions/ws/{sessionId}/subscribe?organization_uuid=...
 * 2. Authenticate via the `Authorization: Bearer <token>` request header
 * 3. Receive SDKMessage stream from the session
 *
 * Every socket event handler is guarded by an identity check against
 * `this.ws`, so events emitted by a socket that has already been replaced or
 * closed by this client never touch the state of the current connection.
 */
export class SessionsWebSocket {
  private ws: WebSocketLike | null = null
  private state: WebSocketState = 'closed'
  private reconnectAttempts = 0
  private sessionNotFoundRetries = 0
  private pingInterval: NodeJS.Timeout | null = null
  private reconnectTimer: NodeJS.Timeout | null = null
  /**
   * Bumped by every `connect()` and `close()`. An in-flight `connect()` that
   * resumes after an `await` compares its captured value to detect that it
   * has been superseded or cancelled.
   */
  private connectGeneration = 0

  constructor(
    private readonly sessionId: string,
    private readonly orgUuid: string,
    private readonly getAccessToken: () => string,
    private readonly callbacks: SessionsWebSocketCallbacks,
  ) {}

  /**
   * Connect to the sessions WebSocket endpoint.
   *
   * No-op when a connection attempt is already in flight or a connection is
   * already established (use `reconnect()` to replace a live connection).
   * If socket creation throws, the state is rolled back to `closed` and the
   * error is rethrown to the caller.
   */
  async connect(): Promise<void> {
    if (this.state === 'connecting') {
      logForDebugging('[SessionsWebSocket] Already connecting')
      return
    }
    if (this.state === 'connected') {
      // Opening a second socket here would orphan the live one.
      logForDebugging('[SessionsWebSocket] Already connected')
      return
    }

    // A manual connect() supersedes any pending scheduled reconnect.
    this.clearReconnectTimer()

    const generation = ++this.connectGeneration
    this.state = 'connecting'

    try {
      const baseUrl = getOauthConfig().BASE_API_URL.replace(
        'https://',
        'wss://',
      )
      const url = `${baseUrl}/v1/sessions/ws/${this.sessionId}/subscribe?organization_uuid=${this.orgUuid}`

      logForDebugging(`[SessionsWebSocket] Connecting to ${url}`)

      // Get fresh token for each connection attempt
      const accessToken = this.getAccessToken()
      const headers: Record<string, string> = {
        Authorization: `Bearer ${accessToken}`,
        'anthropic-version': '2023-06-01',
      }

      if (typeof Bun !== 'undefined') {
        this.openBunSocket(url, headers)
      } else {
        await this.openNodeSocket(url, headers, generation)
      }
    } catch (error) {
      // Without this rollback the client would stay in 'connecting' forever
      // and every later connect() would return early.
      if (generation === this.connectGeneration) {
        this.state = 'closed'
      }
      throw error
    }
  }

  /**
   * Open the socket with Bun's built-in WebSocket and wire its events.
   */
  private openBunSocket(url: string, headers: Record<string, string>): void {
    // Bun's WebSocket supports headers/proxy options but the DOM typings don't
    // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
    const ws = new globalThis.WebSocket(url, {
      headers,
      proxy: getWebSocketProxyUrl(url),
      tls: getWebSocketTLSOptions() || undefined,
    } as unknown as string[])
    this.ws = ws

    ws.addEventListener('open', () => {
      if (!this.isCurrentSocket(ws)) return
      this.handleOpen()
    })

    ws.addEventListener('message', (event: MessageEvent) => {
      if (!this.isCurrentSocket(ws)) return
      const data =
        typeof event.data === 'string' ? event.data : String(event.data)
      this.handleMessage(data)
    })

    ws.addEventListener('error', () => {
      if (!this.isCurrentSocket(ws)) return
      const err = new Error('[SessionsWebSocket] WebSocket error')
      logError(err)
      this.callbacks.onError?.(err)
    })

    // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
    ws.addEventListener('close', (event: CloseEvent) => {
      logForDebugging(
        `[SessionsWebSocket] Closed: code=${event.code} reason=${event.reason}`,
      )
      if (!this.isCurrentSocket(ws)) return
      this.handleClose(event.code)
    })

    ws.addEventListener('pong', () => {
      logForDebugging('[SessionsWebSocket] Pong received')
    })
  }

  /**
   * Open the socket with the `ws` package and wire its events.
   *
   * @param generation - value of `connectGeneration` captured by the calling
   *   `connect()`; used to detect a `close()` that happened while the dynamic
   *   import was pending.
   */
  private async openNodeSocket(
    url: string,
    headers: Record<string, string>,
    generation: number,
  ): Promise<void> {
    const { default: WS } = await import('ws')

    // close() (or a newer connect()) may have run while we were awaiting the
    // import; creating a socket now would resurrect a cancelled attempt.
    if (generation !== this.connectGeneration) {
      logForDebugging(
        '[SessionsWebSocket] Connection attempt cancelled before socket creation',
      )
      return
    }

    const ws = new WS(url, {
      headers,
      agent: getWebSocketProxyAgent(url),
      ...getWebSocketTLSOptions(),
    })
    this.ws = ws

    ws.on('open', () => {
      if (!this.isCurrentSocket(ws)) return
      this.handleOpen()
    })

    ws.on('message', (data: Buffer) => {
      if (!this.isCurrentSocket(ws)) return
      this.handleMessage(data.toString())
    })

    ws.on('error', (err: Error) => {
      if (!this.isCurrentSocket(ws)) return
      logError(new Error(`[SessionsWebSocket] Error: ${err.message}`))
      this.callbacks.onError?.(err)
    })

    ws.on('close', (code: number, reason: Buffer) => {
      logForDebugging(
        `[SessionsWebSocket] Closed: code=${code} reason=${reason.toString()}`,
      )
      if (!this.isCurrentSocket(ws)) return
      this.handleClose(code)
    })

    ws.on('pong', () => {
      logForDebugging('[SessionsWebSocket] Pong received')
    })
  }

  /**
   * True when `socket` is the socket this client currently owns. Events from
   * any other socket (closed or replaced) must be ignored.
   */
  private isCurrentSocket(socket: WebSocketLike): boolean {
    return this.ws === socket
  }

  /**
   * Handle a successful open: mark connected, reset both retry budgets,
   * start keep-alive pings and notify the owner.
   */
  private handleOpen(): void {
    logForDebugging(
      '[SessionsWebSocket] Connection opened, authenticated via headers',
    )
    // Auth is handled via headers, so we're immediately connected
    this.state = 'connected'
    this.reconnectAttempts = 0
    this.sessionNotFoundRetries = 0
    this.startPingInterval()
    this.callbacks.onConnected?.()
  }

  /**
   * Handle incoming WebSocket message: parse the JSON payload and forward it
   * to `onMessage` when it has a string `type`; parse failures are logged.
   */
  private handleMessage(data: string): void {
    try {
      const message: unknown = jsonParse(data)

      // Forward SDK messages to callback
      if (isSessionsMessage(message)) {
        this.callbacks.onMessage(message)
      } else {
        logForDebugging(
          `[SessionsWebSocket] Ignoring message type: ${typeof message === 'object' && message !== null && 'type' in message ? String(message.type) : 'unknown'}`,
        )
      }
    } catch (error) {
      logError(
        new Error(
          `[SessionsWebSocket] Failed to parse message: ${errorMessage(error)}`,
        ),
      )
    }
  }

  /**
   * Handle WebSocket close of the current socket and decide between
   * scheduling a reconnect and reporting a permanent close via `onClose`.
   */
  private handleClose(closeCode: number): void {
    this.stopPingInterval()

    if (this.state === 'closed') {
      return
    }

    this.ws = null

    const previousState = this.state
    this.state = 'closed'

    // Permanent codes: stop reconnecting — server has definitively ended the session
    if (PERMANENT_CLOSE_CODES.has(closeCode)) {
      logForDebugging(
        `[SessionsWebSocket] Permanent close code ${closeCode}, not reconnecting`,
      )
      this.callbacks.onClose?.()
      return
    }

    // 4001 (session not found) can be transient during compaction: the
    // server may briefly consider the session stale while the CLI worker
    // is busy with the compaction API call and not emitting events.
    if (closeCode === SESSION_NOT_FOUND_CLOSE_CODE) {
      this.sessionNotFoundRetries++
      if (this.sessionNotFoundRetries > MAX_SESSION_NOT_FOUND_RETRIES) {
        logForDebugging(
          `[SessionsWebSocket] 4001 retry budget exhausted (${MAX_SESSION_NOT_FOUND_RETRIES}), not reconnecting`,
        )
        this.callbacks.onClose?.()
        return
      }
      // Linear back-off: delay grows with each consecutive 4001.
      this.scheduleReconnect(
        RECONNECT_DELAY_MS * this.sessionNotFoundRetries,
        `4001 attempt ${this.sessionNotFoundRetries}/${MAX_SESSION_NOT_FOUND_RETRIES}`,
      )
      return
    }

    // A close while an automatic recovery is already in progress (either
    // counter is non-zero) must consume the retry budget too. Otherwise the
    // first failed reconnect attempt ends the session and
    // MAX_RECONNECT_ATTEMPTS can never be reached, because the counter is
    // reset to zero on every successful open.
    const recovering =
      this.reconnectAttempts > 0 || this.sessionNotFoundRetries > 0

    // Attempt reconnection if we were connected (or were already recovering).
    // A failure of the very first connect() is still reported as a close.
    if (
      (previousState === 'connected' || recovering) &&
      this.reconnectAttempts < MAX_RECONNECT_ATTEMPTS
    ) {
      this.reconnectAttempts++
      this.scheduleReconnect(
        RECONNECT_DELAY_MS,
        `attempt ${this.reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS}`,
      )
    } else {
      logForDebugging('[SessionsWebSocket] Not reconnecting')
      this.callbacks.onClose?.()
    }
  }

  /**
   * Schedule an automatic reconnect after `delay` ms and notify the owner
   * through `onReconnecting`.
   */
  private scheduleReconnect(delay: number, label: string): void {
    logForDebugging(
      `[SessionsWebSocket] Scheduling reconnect (${label}) in ${delay}ms`,
    )
    // Arm the timer before notifying, so a close() issued from inside the
    // callback cancels the reconnect instead of racing with it.
    this.armReconnectTimer(delay)
    this.callbacks.onReconnecting?.()
  }

  /**
   * (Re)arm the single reconnect timer, cancelling any pending one.
   */
  private armReconnectTimer(delay: number): void {
    this.clearReconnectTimer()
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.connectInBackground()
    }, delay)
  }

  /**
   * Cancel the pending reconnect timer, if any.
   */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }

  /**
   * Fire-and-forget `connect()` for timer callbacks. A rejection is logged
   * and surfaced through `onError` followed by `onClose`, rather than
   * becoming an unhandled promise rejection.
   */
  private connectInBackground(): void {
    this.connect().catch((error: unknown) => {
      const err = new Error(
        `[SessionsWebSocket] Failed to connect: ${errorMessage(error)}`,
      )
      logError(err)
      this.callbacks.onError?.(err)
      this.callbacks.onClose?.()
    })
  }

  /**
   * Start (or restart) the keep-alive ping interval.
   */
  private startPingInterval(): void {
    this.stopPingInterval()

    this.pingInterval = setInterval(() => {
      if (this.ws && this.state === 'connected') {
        try {
          this.ws.ping?.()
        } catch {
          // Ignore ping errors, close handler will deal with connection issues
        }
      }
    }, PING_INTERVAL_MS)
  }

  /**
   * Stop ping interval
   */
  private stopPingInterval(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval)
      this.pingInterval = null
    }
  }

  /**
   * Send a control response back to the session.
   * Logs an error and drops the message when not connected.
   */
  sendControlResponse(response: SDKControlResponse): void {
    if (!this.ws || this.state !== 'connected') {
      logError(new Error('[SessionsWebSocket] Cannot send: not connected'))
      return
    }

    logForDebugging('[SessionsWebSocket] Sending control response')
    this.ws.send(jsonStringify(response))
  }

  /**
   * Send a control request to the session (e.g., interrupt).
   * The request is wrapped in a `control_request` envelope with a freshly
   * generated `request_id`. Logs an error and drops it when not connected.
   */
  sendControlRequest(request: SDKControlRequestInner): void {
    if (!this.ws || this.state !== 'connected') {
      logError(new Error('[SessionsWebSocket] Cannot send: not connected'))
      return
    }

    const controlRequest: SDKControlRequest = {
      type: 'control_request',
      request_id: randomUUID(),
      request,
    }

    logForDebugging(
      `[SessionsWebSocket] Sending control request: ${request.subtype}`,
    )
    this.ws.send(jsonStringify(controlRequest))
  }

  /**
   * Check if connected
   */
  isConnected(): boolean {
    return this.state === 'connected'
  }

  /**
   * Close the WebSocket connection and cancel any pending reconnect or
   * in-flight connection attempt.
   */
  close(): void {
    logForDebugging('[SessionsWebSocket] Closing connection')
    this.state = 'closed'
    // Invalidate any connect() that is still awaiting socket creation.
    this.connectGeneration++
    this.stopPingInterval()
    this.clearReconnectTimer()

    const socket = this.ws
    if (socket) {
      // Detach before closing: once this.ws no longer points at the socket,
      // the identity guard in every event handler ignores its late
      // open/error/close events, so they cannot disturb a later connection.
      this.ws = null
      socket.close()
    }
  }

  /**
   * Force reconnect - closes existing connection and establishes a new one.
   * Useful when the subscription becomes stale (e.g., after container shutdown).
   */
  reconnect(): void {
    logForDebugging('[SessionsWebSocket] Force reconnecting')
    this.reconnectAttempts = 0
    this.sessionNotFoundRetries = 0
    this.close()
    // Small delay before reconnecting (stored in reconnectTimer so it can be cancelled)
    this.armReconnectTimer(FORCE_RECONNECT_DELAY_MS)
  }
}