source dump of claude code
at main 461 lines 16 kB view raw
1/** 2 * Shared transport-layer helpers for bridge message handling. 3 * 4 * Extracted from replBridge.ts so both the env-based core (initBridgeCore) 5 * and the env-less core (initEnvLessBridgeCore) can use the same ingress 6 * parsing, control-request handling, and echo-dedup machinery. 7 * 8 * Everything here is pure — no closure over bridge-specific state. All 9 * collaborators (transport, sessionId, UUID sets, callbacks) are passed 10 * as params. 11 */ 12 13import { randomUUID } from 'crypto' 14import type { SDKMessage } from '../entrypoints/agentSdkTypes.js' 15import type { 16 SDKControlRequest, 17 SDKControlResponse, 18} from '../entrypoints/sdk/controlTypes.js' 19import type { SDKResultSuccess } from '../entrypoints/sdk/coreTypes.js' 20import { logEvent } from '../services/analytics/index.js' 21import { EMPTY_USAGE } from '../services/api/emptyUsage.js' 22import type { Message } from '../types/message.js' 23import { normalizeControlMessageKeys } from '../utils/controlMessageCompat.js' 24import { logForDebugging } from '../utils/debug.js' 25import { stripDisplayTagsAllowEmpty } from '../utils/displayTags.js' 26import { errorMessage } from '../utils/errors.js' 27import type { PermissionMode } from '../utils/permissions/PermissionMode.js' 28import { jsonParse } from '../utils/slowOperations.js' 29import type { ReplBridgeTransport } from './replBridgeTransport.js' 30 31// ─── Type guards ───────────────────────────────────────────────────────────── 32 33/** Type predicate for parsed WebSocket messages. SDKMessage is a 34 * discriminated union on `type` — validating the discriminant is 35 * sufficient for the predicate; callers narrow further via the union. */ 36export function isSDKMessage(value: unknown): value is SDKMessage { 37 return ( 38 value !== null && 39 typeof value === 'object' && 40 'type' in value && 41 typeof value.type === 'string' 42 ) 43} 44 45/** Type predicate for control_response messages from the server. */ 46export function isSDKControlResponse( 47 value: unknown, 48): value is SDKControlResponse { 49 return ( 50 value !== null && 51 typeof value === 'object' && 52 'type' in value && 53 value.type === 'control_response' && 54 'response' in value 55 ) 56} 57 58/** Type predicate for control_request messages from the server. */ 59export function isSDKControlRequest( 60 value: unknown, 61): value is SDKControlRequest { 62 return ( 63 value !== null && 64 typeof value === 'object' && 65 'type' in value && 66 value.type === 'control_request' && 67 'request_id' in value && 68 'request' in value 69 ) 70} 71 72/** 73 * True for message types that should be forwarded to the bridge transport. 74 * The server only wants user/assistant turns and slash-command system events; 75 * everything else (tool_result, progress, etc.) is internal REPL chatter. 76 */ 77export function isEligibleBridgeMessage(m: Message): boolean { 78 // Virtual messages (REPL inner calls) are display-only — bridge/SDK 79 // consumers see the REPL tool_use/result which summarizes the work. 80 if ((m.type === 'user' || m.type === 'assistant') && m.isVirtual) { 81 return false 82 } 83 return ( 84 m.type === 'user' || 85 m.type === 'assistant' || 86 (m.type === 'system' && m.subtype === 'local_command') 87 ) 88} 89 90/** 91 * Extract title-worthy text from a Message for onUserMessage. Returns 92 * undefined for messages that shouldn't title the session: non-user, meta 93 * (nudges), tool results, compact summaries, non-human origins (task 94 * notifications, channel messages), or pure display-tag content 95 * (<ide_opened_file>, <session-start-hook>, etc.). 96 * 97 * Synthetic interrupts ([Request interrupted by user]) are NOT filtered here — 98 * isSyntheticMessage lives in messages.ts (heavy import, pulls command 99 * registry). The initialMessages path in initReplBridge checks it; the 100 * writeMessages path reaching an interrupt as the *first* message is 101 * implausible (an interrupt implies a prior prompt already flowed through). 102 */ 103export function extractTitleText(m: Message): string | undefined { 104 if (m.type !== 'user' || m.isMeta || m.toolUseResult || m.isCompactSummary) 105 return undefined 106 if (m.origin && m.origin.kind !== 'human') return undefined 107 const content = m.message.content 108 let raw: string | undefined 109 if (typeof content === 'string') { 110 raw = content 111 } else { 112 for (const block of content) { 113 if (block.type === 'text') { 114 raw = block.text 115 break 116 } 117 } 118 } 119 if (!raw) return undefined 120 const clean = stripDisplayTagsAllowEmpty(raw) 121 return clean || undefined 122} 123 124// ─── Ingress routing ───────────────────────────────────────────────────────── 125 126/** 127 * Parse an ingress WebSocket message and route it to the appropriate handler. 128 * Ignores messages whose UUID is in recentPostedUUIDs (echoes of what we sent) 129 * or in recentInboundUUIDs (re-deliveries we've already forwarded — e.g. 130 * server replayed history after a transport swap lost the seq-num cursor). 131 */ 132export function handleIngressMessage( 133 data: string, 134 recentPostedUUIDs: BoundedUUIDSet, 135 recentInboundUUIDs: BoundedUUIDSet, 136 onInboundMessage: ((msg: SDKMessage) => void | Promise<void>) | undefined, 137 onPermissionResponse?: ((response: SDKControlResponse) => void) | undefined, 138 onControlRequest?: ((request: SDKControlRequest) => void) | undefined, 139): void { 140 try { 141 const parsed: unknown = normalizeControlMessageKeys(jsonParse(data)) 142 143 // control_response is not an SDKMessage — check before the type guard 144 if (isSDKControlResponse(parsed)) { 145 logForDebugging('[bridge:repl] Ingress message type=control_response') 146 onPermissionResponse?.(parsed) 147 return 148 } 149 150 // control_request from the server (initialize, set_model, can_use_tool). 151 // Must respond promptly or the server kills the WS (~10-14s timeout). 152 if (isSDKControlRequest(parsed)) { 153 logForDebugging( 154 `[bridge:repl] Inbound control_request subtype=${parsed.request.subtype}`, 155 ) 156 onControlRequest?.(parsed) 157 return 158 } 159 160 if (!isSDKMessage(parsed)) return 161 162 // Check for UUID to detect echoes of our own messages 163 const uuid = 164 'uuid' in parsed && typeof parsed.uuid === 'string' 165 ? parsed.uuid 166 : undefined 167 168 if (uuid && recentPostedUUIDs.has(uuid)) { 169 logForDebugging( 170 `[bridge:repl] Ignoring echo: type=${parsed.type} uuid=${uuid}`, 171 ) 172 return 173 } 174 175 // Defensive dedup: drop inbound prompts we've already forwarded. The 176 // SSE seq-num carryover (lastTransportSequenceNum) is the primary fix 177 // for history-replay; this catches edge cases where that negotiation 178 // fails (server ignores from_sequence_num, transport died before 179 // receiving any frames, etc). 180 if (uuid && recentInboundUUIDs.has(uuid)) { 181 logForDebugging( 182 `[bridge:repl] Ignoring re-delivered inbound: type=${parsed.type} uuid=${uuid}`, 183 ) 184 return 185 } 186 187 logForDebugging( 188 `[bridge:repl] Ingress message type=${parsed.type}${uuid ? ` uuid=${uuid}` : ''}`, 189 ) 190 191 if (parsed.type === 'user') { 192 if (uuid) recentInboundUUIDs.add(uuid) 193 logEvent('tengu_bridge_message_received', { 194 is_repl: true, 195 }) 196 // Fire-and-forget — handler may be async (attachment resolution). 197 void onInboundMessage?.(parsed) 198 } else { 199 logForDebugging( 200 `[bridge:repl] Ignoring non-user inbound message: type=${parsed.type}`, 201 ) 202 } 203 } catch (err) { 204 logForDebugging( 205 `[bridge:repl] Failed to parse ingress message: ${errorMessage(err)}`, 206 ) 207 } 208} 209 210// ─── Server-initiated control requests ─────────────────────────────────────── 211 212export type ServerControlRequestHandlers = { 213 transport: ReplBridgeTransport | null 214 sessionId: string 215 /** 216 * When true, all mutable requests (interrupt, set_model, set_permission_mode, 217 * set_max_thinking_tokens) reply with an error instead of false-success. 218 * initialize still replies success — the server kills the connection otherwise. 219 * Used by the outbound-only bridge mode and the SDK's /bridge subpath so claude.ai sees a 220 * proper error instead of "action succeeded but nothing happened locally". 221 */ 222 outboundOnly?: boolean 223 onInterrupt?: () => void 224 onSetModel?: (model: string | undefined) => void 225 onSetMaxThinkingTokens?: (maxTokens: number | null) => void 226 onSetPermissionMode?: ( 227 mode: PermissionMode, 228 ) => { ok: true } | { ok: false; error: string } 229} 230 231const OUTBOUND_ONLY_ERROR = 232 'This session is outbound-only. Enable Remote Control locally to allow inbound control.' 233 234/** 235 * Respond to inbound control_request messages from the server. The server 236 * sends these for session lifecycle events (initialize, set_model) and 237 * for turn-level coordination (interrupt, set_max_thinking_tokens). If we 238 * don't respond, the server hangs and kills the WS after ~10-14s. 239 * 240 * Previously a closure inside initBridgeCore's onWorkReceived; now takes 241 * collaborators as params so both cores can use it. 242 */ 243export function handleServerControlRequest( 244 request: SDKControlRequest, 245 handlers: ServerControlRequestHandlers, 246): void { 247 const { 248 transport, 249 sessionId, 250 outboundOnly, 251 onInterrupt, 252 onSetModel, 253 onSetMaxThinkingTokens, 254 onSetPermissionMode, 255 } = handlers 256 if (!transport) { 257 logForDebugging( 258 '[bridge:repl] Cannot respond to control_request: transport not configured', 259 ) 260 return 261 } 262 263 let response: SDKControlResponse 264 265 // Outbound-only: reply error for mutable requests so claude.ai doesn't show 266 // false success. initialize must still succeed (server kills the connection 267 // if it doesn't — see comment above). 268 if (outboundOnly && request.request.subtype !== 'initialize') { 269 response = { 270 type: 'control_response', 271 response: { 272 subtype: 'error', 273 request_id: request.request_id, 274 error: OUTBOUND_ONLY_ERROR, 275 }, 276 } 277 const event = { ...response, session_id: sessionId } 278 void transport.write(event) 279 logForDebugging( 280 `[bridge:repl] Rejected ${request.request.subtype} (outbound-only) request_id=${request.request_id}`, 281 ) 282 return 283 } 284 285 switch (request.request.subtype) { 286 case 'initialize': 287 // Respond with minimal capabilities — the REPL handles 288 // commands, models, and account info itself. 289 response = { 290 type: 'control_response', 291 response: { 292 subtype: 'success', 293 request_id: request.request_id, 294 response: { 295 commands: [], 296 output_style: 'normal', 297 available_output_styles: ['normal'], 298 models: [], 299 account: {}, 300 pid: process.pid, 301 }, 302 }, 303 } 304 break 305 306 case 'set_model': 307 onSetModel?.(request.request.model) 308 response = { 309 type: 'control_response', 310 response: { 311 subtype: 'success', 312 request_id: request.request_id, 313 }, 314 } 315 break 316 317 case 'set_max_thinking_tokens': 318 onSetMaxThinkingTokens?.(request.request.max_thinking_tokens) 319 response = { 320 type: 'control_response', 321 response: { 322 subtype: 'success', 323 request_id: request.request_id, 324 }, 325 } 326 break 327 328 case 'set_permission_mode': { 329 // The callback returns a policy verdict so we can send an error 330 // control_response without importing isAutoModeGateEnabled / 331 // isBypassPermissionsModeDisabled here (bootstrap-isolation). If no 332 // callback is registered (daemon context, which doesn't wire this — 333 // see daemonBridge.ts), return an error verdict rather than a silent 334 // false-success: the mode is never actually applied in that context, 335 // so success would lie to the client. 336 const verdict = onSetPermissionMode?.(request.request.mode) ?? { 337 ok: false, 338 error: 339 'set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)', 340 } 341 if (verdict.ok) { 342 response = { 343 type: 'control_response', 344 response: { 345 subtype: 'success', 346 request_id: request.request_id, 347 }, 348 } 349 } else { 350 response = { 351 type: 'control_response', 352 response: { 353 subtype: 'error', 354 request_id: request.request_id, 355 error: verdict.error, 356 }, 357 } 358 } 359 break 360 } 361 362 case 'interrupt': 363 onInterrupt?.() 364 response = { 365 type: 'control_response', 366 response: { 367 subtype: 'success', 368 request_id: request.request_id, 369 }, 370 } 371 break 372 373 default: 374 // Unknown subtype — respond with error so the server doesn't 375 // hang waiting for a reply that never comes. 376 response = { 377 type: 'control_response', 378 response: { 379 subtype: 'error', 380 request_id: request.request_id, 381 error: `REPL bridge does not handle control_request subtype: ${request.request.subtype}`, 382 }, 383 } 384 } 385 386 const event = { ...response, session_id: sessionId } 387 void transport.write(event) 388 logForDebugging( 389 `[bridge:repl] Sent control_response for ${request.request.subtype} request_id=${request.request_id} result=${response.response.subtype}`, 390 ) 391} 392 393// ─── Result message (for session archival on teardown) ─────────────────────── 394 395/** 396 * Build a minimal `SDKResultSuccess` message for session archival. 397 * The server needs this event before a WS close to trigger archival. 398 */ 399export function makeResultMessage(sessionId: string): SDKResultSuccess { 400 return { 401 type: 'result', 402 subtype: 'success', 403 duration_ms: 0, 404 duration_api_ms: 0, 405 is_error: false, 406 num_turns: 0, 407 result: '', 408 stop_reason: null, 409 total_cost_usd: 0, 410 usage: { ...EMPTY_USAGE }, 411 modelUsage: {}, 412 permission_denials: [], 413 session_id: sessionId, 414 uuid: randomUUID(), 415 } 416} 417 418// ─── BoundedUUIDSet (echo-dedup ring buffer) ───────────────────────────────── 419 420/** 421 * FIFO-bounded set backed by a circular buffer. Evicts the oldest entry 422 * when capacity is reached, keeping memory usage constant at O(capacity). 423 * 424 * Messages are added in chronological order, so evicted entries are always 425 * the oldest. The caller relies on external ordering (the hook's 426 * lastWrittenIndexRef) as the primary dedup — this set is a secondary 427 * safety net for echo filtering and race-condition dedup. 428 */ 429export class BoundedUUIDSet { 430 private readonly capacity: number 431 private readonly ring: (string | undefined)[] 432 private readonly set = new Set<string>() 433 private writeIdx = 0 434 435 constructor(capacity: number) { 436 this.capacity = capacity 437 this.ring = new Array<string | undefined>(capacity) 438 } 439 440 add(uuid: string): void { 441 if (this.set.has(uuid)) return 442 // Evict the entry at the current write position (if occupied) 443 const evicted = this.ring[this.writeIdx] 444 if (evicted !== undefined) { 445 this.set.delete(evicted) 446 } 447 this.ring[this.writeIdx] = uuid 448 this.set.add(uuid) 449 this.writeIdx = (this.writeIdx + 1) % this.capacity 450 } 451 452 has(uuid: string): boolean { 453 return this.set.has(uuid) 454 } 455 456 clear(): void { 457 this.set.clear() 458 this.ring.fill(undefined) 459 this.writeIdx = 0 460 } 461}