source dump of claude code
at main 544 lines 21 kB view raw
// Anthropic voice_stream speech-to-text client for push-to-talk.
//
// Only reachable in ant builds (gated by feature('VOICE_MODE') in useVoice.ts import).
//
// Connects to Anthropic's voice_stream WebSocket endpoint using the same
// OAuth credentials as Claude Code. The endpoint uses conversation_engine
// backed models for speech-to-text. Designed for hold-to-talk: hold the
// keybinding to record, release to stop and submit.
//
// The wire protocol uses JSON control messages (KeepAlive, CloseStream) and
// binary audio frames. The server responds with TranscriptText and
// TranscriptEndpoint JSON messages.

import type { ClientRequest, IncomingMessage } from 'http'
import WebSocket from 'ws'
import { getOauthConfig } from '../constants/oauth.js'
import {
  checkAndRefreshOAuthTokenIfNeeded,
  getClaudeAIOAuthTokens,
  isAnthropicAuthEnabled,
} from '../utils/auth.js'
import { logForDebugging } from '../utils/debug.js'
import { getUserAgent } from '../utils/http.js'
import { logError } from '../utils/log.js'
import { getWebSocketTLSOptions } from '../utils/mtls.js'
import { getWebSocketProxyAgent, getWebSocketProxyUrl } from '../utils/proxy.js'
import { jsonParse, jsonStringify } from '../utils/slowOperations.js'

// Pre-serialized JSON control frames sent as text messages on the socket.
// Kept as string constants so the hot keepalive path does no serialization.
const KEEPALIVE_MSG = '{"type":"KeepAlive"}'
const CLOSE_STREAM_MSG = '{"type":"CloseStream"}'

import { getFeatureValue_CACHED_MAY_BE_STALE } from './analytics/growthbook.js'

// ─── Constants ───────────────────────────────────────────────────────

// WebSocket upgrade path on the API host (see connectVoiceStream for why
// the api host is targeted instead of claude.ai).
const VOICE_STREAM_PATH = '/api/ws/speech_to_text/voice_stream'

// How often a KeepAlive frame is sent while the socket is open, to prevent
// the server's idle timeout from closing the connection mid-recording.
const KEEPALIVE_INTERVAL_MS = 8_000

// finalize() resolution timers. `noData` fires when no TranscriptText
// arrives post-CloseStream — the server has nothing; don't wait out the
// full ~3-5s WS teardown to confirm emptiness. `safety` is the last-
// resort cap if the WS hangs. Exported so tests can shorten them.
44export const FINALIZE_TIMEOUTS_MS = { 45 safety: 5_000, 46 noData: 1_500, 47} 48 49// ─── Types ────────────────────────────────────────────────────────── 50 51export type VoiceStreamCallbacks = { 52 onTranscript: (text: string, isFinal: boolean) => void 53 onError: (error: string, opts?: { fatal?: boolean }) => void 54 onClose: () => void 55 onReady: (connection: VoiceStreamConnection) => void 56} 57 58// How finalize() resolved. `no_data_timeout` means zero server messages 59// after CloseStream — the silent-drop signature (anthropics/anthropic#287008). 60export type FinalizeSource = 61 | 'post_closestream_endpoint' 62 | 'no_data_timeout' 63 | 'safety_timeout' 64 | 'ws_close' 65 | 'ws_already_closed' 66 67export type VoiceStreamConnection = { 68 send: (audioChunk: Buffer) => void 69 finalize: () => Promise<FinalizeSource> 70 close: () => void 71 isConnected: () => boolean 72} 73 74// The voice_stream endpoint returns transcript chunks and endpoint markers. 75type VoiceStreamTranscriptText = { 76 type: 'TranscriptText' 77 data: string 78} 79 80type VoiceStreamTranscriptEndpoint = { 81 type: 'TranscriptEndpoint' 82} 83 84type VoiceStreamTranscriptError = { 85 type: 'TranscriptError' 86 error_code?: string 87 description?: string 88} 89 90type VoiceStreamMessage = 91 | VoiceStreamTranscriptText 92 | VoiceStreamTranscriptEndpoint 93 | VoiceStreamTranscriptError 94 | { type: 'error'; message?: string } 95 96// ─── Availability ────────────────────────────────────────────────────── 97 98export function isVoiceStreamAvailable(): boolean { 99 // voice_stream uses the same OAuth as Claude Code — available when the 100 // user is authenticated with Anthropic (Claude.ai subscriber or has 101 // valid OAuth tokens). 
102 if (!isAnthropicAuthEnabled()) { 103 return false 104 } 105 const tokens = getClaudeAIOAuthTokens() 106 return tokens !== null && tokens.accessToken !== null 107} 108 109// ─── Connection ──────────────────────────────────────────────────────── 110 111export async function connectVoiceStream( 112 callbacks: VoiceStreamCallbacks, 113 options?: { language?: string; keyterms?: string[] }, 114): Promise<VoiceStreamConnection | null> { 115 // Ensure OAuth token is fresh before connecting 116 await checkAndRefreshOAuthTokenIfNeeded() 117 118 const tokens = getClaudeAIOAuthTokens() 119 if (!tokens?.accessToken) { 120 logForDebugging('[voice_stream] No OAuth token available') 121 return null 122 } 123 124 // voice_stream is a private_api route, but /api/ws/ is also exposed on 125 // the api.anthropic.com listener (service_definitions.yaml private-api: 126 // visibility.external: true). We target that host instead of claude.ai 127 // because the claude.ai CF zone uses TLS fingerprinting and challenges 128 // non-browser clients (anthropics/claude-code#34094). Same private-api 129 // pod, same OAuth Bearer auth — just a CF zone that doesn't block us. 130 // Desktop dictation still uses claude.ai (Swift URLSession has a 131 // browser-class JA3 fingerprint, so CF lets it through). 132 const wsBaseUrl = 133 process.env.VOICE_STREAM_BASE_URL || 134 getOauthConfig() 135 .BASE_API_URL.replace('https://', 'wss://') 136 .replace('http://', 'ws://') 137 138 if (process.env.VOICE_STREAM_BASE_URL) { 139 logForDebugging( 140 `[voice_stream] Using VOICE_STREAM_BASE_URL override: ${process.env.VOICE_STREAM_BASE_URL}`, 141 ) 142 } 143 144 const params = new URLSearchParams({ 145 encoding: 'linear16', 146 sample_rate: '16000', 147 channels: '1', 148 endpointing_ms: '300', 149 utterance_end_ms: '1000', 150 language: options?.language ?? 'en', 151 }) 152 153 // Route through conversation-engine with Deepgram Nova 3 (bypassing 154 // the server's project_bell_v2_config GrowthBook gate). 
The server 155 // side is anthropics/anthropic#278327 + #281372; this lets us ramp 156 // clients independently. 157 const isNova3 = getFeatureValue_CACHED_MAY_BE_STALE( 158 'tengu_cobalt_frost', 159 false, 160 ) 161 if (isNova3) { 162 params.set('use_conversation_engine', 'true') 163 params.set('stt_provider', 'deepgram-nova3') 164 logForDebugging('[voice_stream] Nova 3 gate enabled (tengu_cobalt_frost)') 165 } 166 167 // Append keyterms as query params — the voice_stream proxy forwards 168 // these to the STT service which applies appropriate boosting. 169 if (options?.keyterms?.length) { 170 for (const term of options.keyterms) { 171 params.append('keyterms', term) 172 } 173 } 174 175 const url = `${wsBaseUrl}${VOICE_STREAM_PATH}?${params.toString()}` 176 177 logForDebugging(`[voice_stream] Connecting to ${url}`) 178 179 const headers: Record<string, string> = { 180 Authorization: `Bearer ${tokens.accessToken}`, 181 'User-Agent': getUserAgent(), 182 'x-app': 'cli', 183 } 184 185 const tlsOptions = getWebSocketTLSOptions() 186 const wsOptions = 187 typeof Bun !== 'undefined' 188 ? { 189 headers, 190 proxy: getWebSocketProxyUrl(url), 191 tls: tlsOptions || undefined, 192 } 193 : { headers, agent: getWebSocketProxyAgent(url), ...tlsOptions } 194 195 const ws = new WebSocket(url, wsOptions) 196 197 let keepaliveTimer: ReturnType<typeof setInterval> | null = null 198 let connected = false 199 // Set to true once CloseStream has been sent (or the ws is closed). 200 // After this, further audio sends are dropped. 201 let finalized = false 202 // Set to true when finalize() is first called, to prevent double-fire. 203 let finalizing = false 204 // Set when the HTTP upgrade was rejected (unexpected-response). The 205 // close event that follows (1006 from our req.destroy()) is just 206 // mechanical teardown; the upgrade handler already reported the error. 207 let upgradeRejected = false 208 // Resolves finalize(). 
Four triggers: TranscriptEndpoint post-CloseStream 209 // (~300ms); no-data timer (1.5s); WS close (~3-5s); safety timer (5s). 210 let resolveFinalize: ((source: FinalizeSource) => void) | null = null 211 let cancelNoDataTimer: (() => void) | null = null 212 213 // Define the connection object before event handlers so it can be passed 214 // to onReady when the WebSocket opens. 215 const connection: VoiceStreamConnection = { 216 send(audioChunk: Buffer): void { 217 if (ws.readyState !== WebSocket.OPEN) { 218 return 219 } 220 if (finalized) { 221 // After CloseStream has been sent, the server rejects further audio. 222 // Drop the chunk to avoid a protocol error. 223 logForDebugging( 224 `[voice_stream] Dropping audio chunk after CloseStream: ${String(audioChunk.length)} bytes`, 225 ) 226 return 227 } 228 logForDebugging( 229 `[voice_stream] Sending audio chunk: ${String(audioChunk.length)} bytes`, 230 ) 231 // Copy the buffer before sending: NAPI Buffer objects from native 232 // modules may share a pooled ArrayBuffer. Creating a view with 233 // `new Uint8Array(buf.buffer, offset, len)` can reference stale or 234 // overlapping memory by the time the ws library reads it. 235 // `Buffer.from()` makes an owned copy that the ws library can safely 236 // consume as a binary WebSocket frame. 237 ws.send(Buffer.from(audioChunk)) 238 }, 239 finalize(): Promise<FinalizeSource> { 240 if (finalizing || finalized) { 241 // Already finalized or WebSocket already closed — resolve immediately. 
242 return Promise.resolve('ws_already_closed') 243 } 244 finalizing = true 245 246 return new Promise<FinalizeSource>(resolve => { 247 const safetyTimer = setTimeout( 248 () => resolveFinalize?.('safety_timeout'), 249 FINALIZE_TIMEOUTS_MS.safety, 250 ) 251 const noDataTimer = setTimeout( 252 () => resolveFinalize?.('no_data_timeout'), 253 FINALIZE_TIMEOUTS_MS.noData, 254 ) 255 cancelNoDataTimer = () => { 256 clearTimeout(noDataTimer) 257 cancelNoDataTimer = null 258 } 259 260 resolveFinalize = (source: FinalizeSource) => { 261 clearTimeout(safetyTimer) 262 clearTimeout(noDataTimer) 263 resolveFinalize = null 264 cancelNoDataTimer = null 265 // Legacy Deepgram can leave an interim in lastTranscriptText 266 // with no TranscriptEndpoint (websocket_manager.py sends 267 // TranscriptChunk and TranscriptEndpoint as independent 268 // channel items). All resolve triggers must promote it; 269 // centralize here. No-op when the close handler already did. 270 if (lastTranscriptText) { 271 logForDebugging( 272 `[voice_stream] Promoting unreported interim before ${source} resolve`, 273 ) 274 const t = lastTranscriptText 275 lastTranscriptText = '' 276 callbacks.onTranscript(t, true) 277 } 278 logForDebugging(`[voice_stream] Finalize resolved via ${source}`) 279 resolve(source) 280 } 281 282 // If the WebSocket is already closed, resolve immediately. 283 if ( 284 ws.readyState === WebSocket.CLOSED || 285 ws.readyState === WebSocket.CLOSING 286 ) { 287 resolveFinalize('ws_already_closed') 288 return 289 } 290 291 // Defer CloseStream to the next event-loop iteration so any audio 292 // callbacks already queued by the native recording module are flushed 293 // to the WebSocket before the server is told to stop accepting audio. 294 // Without this, stopRecording() can return synchronously while the 295 // native module still has a pending onData callback in the event queue, 296 // causing audio to arrive after CloseStream. 
297 setTimeout(() => { 298 finalized = true 299 if (ws.readyState === WebSocket.OPEN) { 300 logForDebugging('[voice_stream] Sending CloseStream (finalize)') 301 ws.send(CLOSE_STREAM_MSG) 302 } 303 }, 0) 304 }) 305 }, 306 close(): void { 307 finalized = true 308 if (keepaliveTimer) { 309 clearInterval(keepaliveTimer) 310 keepaliveTimer = null 311 } 312 connected = false 313 if (ws.readyState === WebSocket.OPEN) { 314 ws.close() 315 } 316 }, 317 isConnected(): boolean { 318 return connected && ws.readyState === WebSocket.OPEN 319 }, 320 } 321 322 ws.on('open', () => { 323 logForDebugging('[voice_stream] WebSocket connected') 324 connected = true 325 326 // Send an immediate KeepAlive so the server knows the client is active. 327 // Audio hardware initialisation can take >1s, so this prevents the 328 // server from closing the connection before audio capture starts. 329 logForDebugging('[voice_stream] Sending initial KeepAlive') 330 ws.send(KEEPALIVE_MSG) 331 332 // Send periodic keepalive to prevent idle timeout 333 keepaliveTimer = setInterval( 334 ws => { 335 if (ws.readyState === WebSocket.OPEN) { 336 logForDebugging('[voice_stream] Sending periodic KeepAlive') 337 ws.send(KEEPALIVE_MSG) 338 } 339 }, 340 KEEPALIVE_INTERVAL_MS, 341 ws, 342 ) 343 344 // Pass the connection to the caller so it can start sending audio. 345 // This fires only after the WebSocket is truly open, guaranteeing 346 // that send() calls will not be silently dropped. 347 callbacks.onReady(connection) 348 }) 349 350 // Track the last TranscriptText so that when TranscriptEndpoint arrives 351 // we can emit it as the final transcript. The server sometimes sends 352 // multiple non-cumulative TranscriptText messages without endpoints 353 // between them; the TranscriptText handler auto-finalizes previous 354 // segments when it detects the text has changed non-cumulatively. 
355 let lastTranscriptText = '' 356 357 ws.on('message', (raw: Buffer | string) => { 358 const text = raw.toString() 359 logForDebugging( 360 `[voice_stream] Message received (${String(text.length)} chars): ${text.slice(0, 200)}`, 361 ) 362 let msg: VoiceStreamMessage 363 try { 364 msg = jsonParse(text) as VoiceStreamMessage 365 } catch { 366 return 367 } 368 369 switch (msg.type) { 370 case 'TranscriptText': { 371 const transcript = msg.data 372 logForDebugging(`[voice_stream] TranscriptText: "${transcript ?? ''}"`) 373 // Data arrived after CloseStream — disarm the no-data timer so 374 // a slow-but-real flush isn't cut off. Only disarm once finalized 375 // (CloseStream sent); pre-CloseStream data racing the deferred 376 // send would cancel the timer prematurely, falling back to the 377 // slower 5s safety timeout instead of the 1.5s no-data timer. 378 if (finalized) { 379 cancelNoDataTimer?.() 380 } 381 if (transcript) { 382 // Detect when the server has moved to a new speech segment. 383 // Progressive refinements extend or shorten the previous text 384 // (e.g., "hello" → "hello world", or "hello wor" → "hello wo"). 385 // A new segment starts with completely different text (neither 386 // is a prefix of the other). When detected, emit the previous 387 // text as final so the caller can accumulate it, preventing 388 // the new segment from overwriting and losing the old one. 389 // 390 // Nova 3's interims are cumulative across segments AND can 391 // revise earlier text ("Hello?" → "Hello."). Revision breaks 392 // the prefix check, causing false auto-finalize → the same 393 // text committed once AND re-appearing in the cumulative 394 // interim = duplication. Nova 3 only endpoints on the final 395 // flush, so auto-finalize is never correct for it. 
396 if (!isNova3 && lastTranscriptText) { 397 const prev = lastTranscriptText.trimStart() 398 const next = transcript.trimStart() 399 if ( 400 prev && 401 next && 402 !next.startsWith(prev) && 403 !prev.startsWith(next) 404 ) { 405 logForDebugging( 406 `[voice_stream] Auto-finalizing previous segment (new segment detected): "${lastTranscriptText}"`, 407 ) 408 callbacks.onTranscript(lastTranscriptText, true) 409 } 410 } 411 lastTranscriptText = transcript 412 // Emit as interim so the caller can show a live preview. 413 callbacks.onTranscript(transcript, false) 414 } 415 break 416 } 417 case 'TranscriptEndpoint': { 418 logForDebugging( 419 `[voice_stream] TranscriptEndpoint received, lastTranscriptText="${lastTranscriptText}"`, 420 ) 421 // The server signals the end of an utterance. Emit the last 422 // TranscriptText as a final transcript so the caller can commit it. 423 const finalText = lastTranscriptText 424 lastTranscriptText = '' 425 if (finalText) { 426 callbacks.onTranscript(finalText, true) 427 } 428 // When TranscriptEndpoint arrives after CloseStream was sent, 429 // the server has flushed its final transcript — nothing more is 430 // coming. Resolve finalize now so the caller reads the 431 // accumulated buffer immediately (~300ms) instead of waiting 432 // for the WebSocket close event (~3-5s of server teardown). 433 // `finalized` (not `finalizing`) is the right gate: it flips 434 // inside the setTimeout(0) that actually sends CloseStream, so 435 // a TranscriptEndpoint that races the deferred send still waits. 436 if (finalized) { 437 resolveFinalize?.('post_closestream_endpoint') 438 } 439 break 440 } 441 case 'TranscriptError': { 442 const desc = 443 msg.description ?? msg.error_code ?? 'unknown transcription error' 444 logForDebugging(`[voice_stream] TranscriptError: ${desc}`) 445 if (!finalizing) { 446 callbacks.onError(desc) 447 } 448 break 449 } 450 case 'error': { 451 const errorDetail = msg.message ?? 
jsonStringify(msg) 452 logForDebugging(`[voice_stream] Server error: ${errorDetail}`) 453 if (!finalizing) { 454 callbacks.onError(errorDetail) 455 } 456 break 457 } 458 default: 459 break 460 } 461 }) 462 463 ws.on('close', (code, reason) => { 464 const reasonStr = reason?.toString() ?? '' 465 logForDebugging( 466 `[voice_stream] WebSocket closed: code=${String(code)} reason="${reasonStr}"`, 467 ) 468 connected = false 469 if (keepaliveTimer) { 470 clearInterval(keepaliveTimer) 471 keepaliveTimer = null 472 } 473 // If the server closed the connection before sending TranscriptEndpoint, 474 // promote the last interim transcript to final so no text is lost. 475 if (lastTranscriptText) { 476 logForDebugging( 477 '[voice_stream] Promoting unreported interim transcript to final on close', 478 ) 479 const finalText = lastTranscriptText 480 lastTranscriptText = '' 481 callbacks.onTranscript(finalText, true) 482 } 483 // During finalize, suppress onError — the session already delivered 484 // whatever it had. useVoice's onError path wipes accumulatedRef, 485 // which would destroy the transcript before the finalize .then() 486 // reads it. `finalizing` (not resolveFinalize) is the gate: set once 487 // at finalize() entry, never cleared, so it stays accurate after the 488 // fast path or a timer already resolved. 489 resolveFinalize?.('ws_close') 490 if (!finalizing && !upgradeRejected && code !== 1000 && code !== 1005) { 491 callbacks.onError( 492 `Connection closed: code ${String(code)}${reasonStr ? `${reasonStr}` : ''}`, 493 ) 494 } 495 callbacks.onClose() 496 }) 497 498 // The ws library fires 'unexpected-response' when the HTTP upgrade 499 // returns a non-101 status. Listening lets us surface the actual status 500 // and flag 4xx as fatal (same token/TLS fingerprint won't change on 501 // retry). 
With a listener registered, ws does NOT abort on our behalf — 502 // we destroy the request; 'error' does not fire, 'close' does (suppressed 503 // via upgradeRejected above). 504 // 505 // Bun's ws shim historically didn't implement this event (a warning 506 // is logged once at registration). Under Bun a non-101 upgrade falls 507 // through to the generic 'error' + 'close' 1002 path with no recoverable 508 // status; the attemptGenRef guard in useVoice.ts still surfaces the 509 // retry-attempt failure, the user just sees "Expected 101 status code" 510 // instead of "HTTP 503". No harm — the gen fix is the load-bearing part. 511 ws.on('unexpected-response', (req: ClientRequest, res: IncomingMessage) => { 512 const status = res.statusCode ?? 0 513 // Bun's ws implementation on Windows can fire this event for a 514 // successful 101 Switching Protocols response (anthropics/claude-code#40510). 515 // 101 is never a rejection — bail before we destroy a working upgrade. 516 if (status === 101) { 517 logForDebugging( 518 '[voice_stream] unexpected-response fired with 101; ignoring', 519 ) 520 return 521 } 522 logForDebugging( 523 `[voice_stream] Upgrade rejected: status=${String(status)} cf-mitigated=${String(res.headers['cf-mitigated'])} cf-ray=${String(res.headers['cf-ray'])}`, 524 ) 525 upgradeRejected = true 526 res.resume() 527 req.destroy() 528 if (finalizing) return 529 callbacks.onError( 530 `WebSocket upgrade rejected with HTTP ${String(status)}`, 531 { fatal: status >= 400 && status < 500 }, 532 ) 533 }) 534 535 ws.on('error', (err: Error) => { 536 logError(err) 537 logForDebugging(`[voice_stream] WebSocket error: ${err.message}`) 538 if (!finalizing) { 539 callbacks.onError(`Voice stream connection error: ${err.message}`) 540 } 541 }) 542 543 return connection 544}