source dump of claude code
at main 3419 lines 126 kB view raw
1import type { 2 BetaContentBlock, 3 BetaContentBlockParam, 4 BetaImageBlockParam, 5 BetaJSONOutputFormat, 6 BetaMessage, 7 BetaMessageDeltaUsage, 8 BetaMessageStreamParams, 9 BetaOutputConfig, 10 BetaRawMessageStreamEvent, 11 BetaRequestDocumentBlock, 12 BetaStopReason, 13 BetaToolChoiceAuto, 14 BetaToolChoiceTool, 15 BetaToolResultBlockParam, 16 BetaToolUnion, 17 BetaUsage, 18 BetaMessageParam as MessageParam, 19} from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs' 20import type { TextBlockParam } from '@anthropic-ai/sdk/resources/index.mjs' 21import type { Stream } from '@anthropic-ai/sdk/streaming.mjs' 22import { randomUUID } from 'crypto' 23import { 24 getAPIProvider, 25 isFirstPartyAnthropicBaseUrl, 26} from 'src/utils/model/providers.js' 27import { 28 getAttributionHeader, 29 getCLISyspromptPrefix, 30} from '../../constants/system.js' 31import { 32 getEmptyToolPermissionContext, 33 type QueryChainTracking, 34 type Tool, 35 type ToolPermissionContext, 36 type Tools, 37 toolMatchesName, 38} from '../../Tool.js' 39import type { AgentDefinition } from '../../tools/AgentTool/loadAgentsDir.js' 40import { 41 type ConnectorTextBlock, 42 type ConnectorTextDelta, 43 isConnectorTextBlock, 44} from '../../types/connectorText.js' 45import type { 46 AssistantMessage, 47 Message, 48 StreamEvent, 49 SystemAPIErrorMessage, 50 UserMessage, 51} from '../../types/message.js' 52import { 53 type CacheScope, 54 logAPIPrefix, 55 splitSysPromptPrefix, 56 toolToAPISchema, 57} from '../../utils/api.js' 58import { getOauthAccountInfo } from '../../utils/auth.js' 59import { 60 getBedrockExtraBodyParamsBetas, 61 getMergedBetas, 62 getModelBetas, 63} from '../../utils/betas.js' 64import { getOrCreateUserID } from '../../utils/config.js' 65import { 66 CAPPED_DEFAULT_MAX_TOKENS, 67 getModelMaxOutputTokens, 68 getSonnet1mExpTreatmentEnabled, 69} from '../../utils/context.js' 70import { resolveAppliedEffort } from '../../utils/effort.js' 71import { isEnvTruthy } from 
'../../utils/envUtils.js' 72import { errorMessage } from '../../utils/errors.js' 73import { computeFingerprintFromMessages } from '../../utils/fingerprint.js' 74import { captureAPIRequest, logError } from '../../utils/log.js' 75import { 76 createAssistantAPIErrorMessage, 77 createUserMessage, 78 ensureToolResultPairing, 79 normalizeContentFromAPI, 80 normalizeMessagesForAPI, 81 stripAdvisorBlocks, 82 stripCallerFieldFromAssistantMessage, 83 stripToolReferenceBlocksFromUserMessage, 84} from '../../utils/messages.js' 85import { 86 getDefaultOpusModel, 87 getDefaultSonnetModel, 88 getSmallFastModel, 89 isNonCustomOpusModel, 90} from '../../utils/model/model.js' 91import { 92 asSystemPrompt, 93 type SystemPrompt, 94} from '../../utils/systemPromptType.js' 95import { tokenCountFromLastAPIResponse } from '../../utils/tokens.js' 96import { getDynamicConfig_BLOCKS_ON_INIT } from '../analytics/growthbook.js' 97import { 98 currentLimits, 99 extractQuotaStatusFromError, 100 extractQuotaStatusFromHeaders, 101} from '../claudeAiLimits.js' 102import { getAPIContextManagement } from '../compact/apiMicrocompact.js' 103 104/* eslint-disable @typescript-eslint/no-require-imports */ 105const autoModeStateModule = feature('TRANSCRIPT_CLASSIFIER') 106 ? 
(require('../../utils/permissions/autoModeState.js') as typeof import('../../utils/permissions/autoModeState.js')) 107 : null 108 109import { feature } from 'bun:bundle' 110import type { ClientOptions } from '@anthropic-ai/sdk' 111import { 112 APIConnectionTimeoutError, 113 APIError, 114 APIUserAbortError, 115} from '@anthropic-ai/sdk/error' 116import { 117 getAfkModeHeaderLatched, 118 getCacheEditingHeaderLatched, 119 getFastModeHeaderLatched, 120 getLastApiCompletionTimestamp, 121 getPromptCache1hAllowlist, 122 getPromptCache1hEligible, 123 getSessionId, 124 getThinkingClearLatched, 125 setAfkModeHeaderLatched, 126 setCacheEditingHeaderLatched, 127 setFastModeHeaderLatched, 128 setLastMainRequestId, 129 setPromptCache1hAllowlist, 130 setPromptCache1hEligible, 131 setThinkingClearLatched, 132} from 'src/bootstrap/state.js' 133import { 134 AFK_MODE_BETA_HEADER, 135 CONTEXT_1M_BETA_HEADER, 136 CONTEXT_MANAGEMENT_BETA_HEADER, 137 EFFORT_BETA_HEADER, 138 FAST_MODE_BETA_HEADER, 139 PROMPT_CACHING_SCOPE_BETA_HEADER, 140 REDACT_THINKING_BETA_HEADER, 141 STRUCTURED_OUTPUTS_BETA_HEADER, 142 TASK_BUDGETS_BETA_HEADER, 143} from 'src/constants/betas.js' 144import type { QuerySource } from 'src/constants/querySource.js' 145import type { Notification } from 'src/context/notifications.js' 146import { addToTotalSessionCost } from 'src/cost-tracker.js' 147import { getFeatureValue_CACHED_MAY_BE_STALE } from 'src/services/analytics/growthbook.js' 148import type { AgentId } from 'src/types/ids.js' 149import { 150 ADVISOR_TOOL_INSTRUCTIONS, 151 getExperimentAdvisorModels, 152 isAdvisorEnabled, 153 isValidAdvisorModel, 154 modelSupportsAdvisor, 155} from 'src/utils/advisor.js' 156import { getAgentContext } from 'src/utils/agentContext.js' 157import { isClaudeAISubscriber } from 'src/utils/auth.js' 158import { 159 getToolSearchBetaHeader, 160 modelSupportsStructuredOutputs, 161 shouldIncludeFirstPartyOnlyBetas, 162 shouldUseGlobalCacheScope, 163} from 'src/utils/betas.js' 164import { 
CLAUDE_IN_CHROME_MCP_SERVER_NAME } from 'src/utils/claudeInChrome/common.js' 165import { CHROME_TOOL_SEARCH_INSTRUCTIONS } from 'src/utils/claudeInChrome/prompt.js' 166import { getMaxThinkingTokensForModel } from 'src/utils/context.js' 167import { logForDebugging } from 'src/utils/debug.js' 168import { logForDiagnosticsNoPII } from 'src/utils/diagLogs.js' 169import { type EffortValue, modelSupportsEffort } from 'src/utils/effort.js' 170import { 171 isFastModeAvailable, 172 isFastModeCooldown, 173 isFastModeEnabled, 174 isFastModeSupportedByModel, 175} from 'src/utils/fastMode.js' 176import { returnValue } from 'src/utils/generators.js' 177import { headlessProfilerCheckpoint } from 'src/utils/headlessProfiler.js' 178import { isMcpInstructionsDeltaEnabled } from 'src/utils/mcpInstructionsDelta.js' 179import { calculateUSDCost } from 'src/utils/modelCost.js' 180import { endQueryProfile, queryCheckpoint } from 'src/utils/queryProfiler.js' 181import { 182 modelSupportsAdaptiveThinking, 183 modelSupportsThinking, 184 type ThinkingConfig, 185} from 'src/utils/thinking.js' 186import { 187 extractDiscoveredToolNames, 188 isDeferredToolsDeltaEnabled, 189 isToolSearchEnabled, 190} from 'src/utils/toolSearch.js' 191import { API_MAX_MEDIA_PER_REQUEST } from '../../constants/apiLimits.js' 192import { ADVISOR_BETA_HEADER } from '../../constants/betas.js' 193import { 194 formatDeferredToolLine, 195 isDeferredTool, 196 TOOL_SEARCH_TOOL_NAME, 197} from '../../tools/ToolSearchTool/prompt.js' 198import { count } from '../../utils/array.js' 199import { insertBlockAfterToolResults } from '../../utils/contentArray.js' 200import { validateBoundedIntEnvVar } from '../../utils/envValidation.js' 201import { safeParseJSON } from '../../utils/json.js' 202import { getInferenceProfileBackingModel } from '../../utils/model/bedrock.js' 203import { 204 normalizeModelStringForAPI, 205 parseUserSpecifiedModel, 206} from '../../utils/model/model.js' 207import { 208 startSessionActivity, 209 
stopSessionActivity, 210} from '../../utils/sessionActivity.js' 211import { jsonStringify } from '../../utils/slowOperations.js' 212import { 213 isBetaTracingEnabled, 214 type LLMRequestNewContext, 215 startLLMRequestSpan, 216} from '../../utils/telemetry/sessionTracing.js' 217/* eslint-enable @typescript-eslint/no-require-imports */ 218import { 219 type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 220 logEvent, 221} from '../analytics/index.js' 222import { 223 consumePendingCacheEdits, 224 getPinnedCacheEdits, 225 markToolsSentToAPIState, 226 pinCacheEdits, 227} from '../compact/microCompact.js' 228import { getInitializationStatus } from '../lsp/manager.js' 229import { isToolFromMcpServer } from '../mcp/utils.js' 230import { withStreamingVCR, withVCR } from '../vcr.js' 231import { CLIENT_REQUEST_ID_HEADER, getAnthropicClient } from './client.js' 232import { 233 API_ERROR_MESSAGE_PREFIX, 234 CUSTOM_OFF_SWITCH_MESSAGE, 235 getAssistantMessageFromError, 236 getErrorMessageIfRefusal, 237} from './errors.js' 238import { 239 EMPTY_USAGE, 240 type GlobalCacheStrategy, 241 logAPIError, 242 logAPIQuery, 243 logAPISuccessAndDuration, 244 type NonNullableUsage, 245} from './logging.js' 246import { 247 CACHE_TTL_1HOUR_MS, 248 checkResponseForCacheBreak, 249 recordPromptState, 250} from './promptCacheBreakDetection.js' 251import { 252 CannotRetryError, 253 FallbackTriggeredError, 254 is529Error, 255 type RetryContext, 256 withRetry, 257} from './withRetry.js' 258 259// Define a type that represents valid JSON values 260type JsonValue = string | number | boolean | null | JsonObject | JsonArray 261type JsonObject = { [key: string]: JsonValue } 262type JsonArray = JsonValue[] 263 264/** 265 * Assemble the extra body parameters for the API request, based on the 266 * CLAUDE_CODE_EXTRA_BODY environment variable if present and on any beta 267 * headers (primarily for Bedrock requests). 
268 * 269 * @param betaHeaders - An array of beta headers to include in the request. 270 * @returns A JSON object representing the extra body parameters. 271 */ 272export function getExtraBodyParams(betaHeaders?: string[]): JsonObject { 273 // Parse user's extra body parameters first 274 const extraBodyStr = process.env.CLAUDE_CODE_EXTRA_BODY 275 let result: JsonObject = {} 276 277 if (extraBodyStr) { 278 try { 279 // Parse as JSON, which can be null, boolean, number, string, array or object 280 const parsed = safeParseJSON(extraBodyStr) 281 // We expect an object with key-value pairs to spread into API parameters 282 if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) { 283 // Shallow clone — safeParseJSON is LRU-cached and returns the same 284 // object reference for the same string. Mutating `result` below 285 // would poison the cache, causing stale values to persist. 286 result = { ...(parsed as JsonObject) } 287 } else { 288 logForDebugging( 289 `CLAUDE_CODE_EXTRA_BODY env var must be a JSON object, but was given ${extraBodyStr}`, 290 { level: 'error' }, 291 ) 292 } 293 } catch (error) { 294 logForDebugging( 295 `Error parsing CLAUDE_CODE_EXTRA_BODY: ${errorMessage(error)}`, 296 { level: 'error' }, 297 ) 298 } 299 } 300 301 // Anti-distillation: send fake_tools opt-in for 1P CLI only 302 if ( 303 feature('ANTI_DISTILLATION_CC') 304 ? 
process.env.CLAUDE_CODE_ENTRYPOINT === 'cli' && 305 shouldIncludeFirstPartyOnlyBetas() && 306 getFeatureValue_CACHED_MAY_BE_STALE( 307 'tengu_anti_distill_fake_tool_injection', 308 false, 309 ) 310 : false 311 ) { 312 result.anti_distillation = ['fake_tools'] 313 } 314 315 // Handle beta headers if provided 316 if (betaHeaders && betaHeaders.length > 0) { 317 if (result.anthropic_beta && Array.isArray(result.anthropic_beta)) { 318 // Add to existing array, avoiding duplicates 319 const existingHeaders = result.anthropic_beta as string[] 320 const newHeaders = betaHeaders.filter( 321 header => !existingHeaders.includes(header), 322 ) 323 result.anthropic_beta = [...existingHeaders, ...newHeaders] 324 } else { 325 // Create new array with the beta headers 326 result.anthropic_beta = betaHeaders 327 } 328 } 329 330 return result 331} 332 333export function getPromptCachingEnabled(model: string): boolean { 334 // Global disable takes precedence 335 if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) return false 336 337 // Check if we should disable for small/fast model 338 if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU)) { 339 const smallFastModel = getSmallFastModel() 340 if (model === smallFastModel) return false 341 } 342 343 // Check if we should disable for default Sonnet 344 if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_SONNET)) { 345 const defaultSonnet = getDefaultSonnetModel() 346 if (model === defaultSonnet) return false 347 } 348 349 // Check if we should disable for default Opus 350 if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_OPUS)) { 351 const defaultOpus = getDefaultOpusModel() 352 if (model === defaultOpus) return false 353 } 354 355 return true 356} 357 358export function getCacheControl({ 359 scope, 360 querySource, 361}: { 362 scope?: CacheScope 363 querySource?: QuerySource 364} = {}): { 365 type: 'ephemeral' 366 ttl?: '1h' 367 scope?: CacheScope 368} { 369 return { 370 type: 'ephemeral', 371 ...(should1hCacheTTL(querySource) 
&& { ttl: '1h' }), 372 ...(scope === 'global' && { scope }), 373 } 374} 375 376/** 377 * Determines if 1h TTL should be used for prompt caching. 378 * 379 * Only applied when: 380 * 1. User is eligible (ant or subscriber within rate limits) 381 * 2. The query source matches a pattern in the GrowthBook allowlist 382 * 383 * GrowthBook config shape: { allowlist: string[] } 384 * Patterns support trailing '*' for prefix matching. 385 * Examples: 386 * - { allowlist: ["repl_main_thread*", "sdk"] } — main thread + SDK only 387 * - { allowlist: ["repl_main_thread*", "sdk", "agent:*"] } — also subagents 388 * - { allowlist: ["*"] } — all sources 389 * 390 * The allowlist is cached in STATE for session stability — prevents mixed 391 * TTLs when GrowthBook's disk cache updates mid-request. 392 */ 393function should1hCacheTTL(querySource?: QuerySource): boolean { 394 // 3P Bedrock users get 1h TTL when opted in via env var — they manage their own billing 395 // No GrowthBook gating needed since 3P users don't have GrowthBook configured 396 if ( 397 getAPIProvider() === 'bedrock' && 398 isEnvTruthy(process.env.ENABLE_PROMPT_CACHING_1H_BEDROCK) 399 ) { 400 return true 401 } 402 403 // Latch eligibility in bootstrap state for session stability — prevents 404 // mid-session overage flips from changing the cache_control TTL, which 405 // would bust the server-side prompt cache (~20K tokens per flip). 
406 let userEligible = getPromptCache1hEligible() 407 if (userEligible === null) { 408 userEligible = 409 process.env.USER_TYPE === 'ant' || 410 (isClaudeAISubscriber() && !currentLimits.isUsingOverage) 411 setPromptCache1hEligible(userEligible) 412 } 413 if (!userEligible) return false 414 415 // Cache allowlist in bootstrap state for session stability — prevents mixed 416 // TTLs when GrowthBook's disk cache updates mid-request 417 let allowlist = getPromptCache1hAllowlist() 418 if (allowlist === null) { 419 const config = getFeatureValue_CACHED_MAY_BE_STALE<{ 420 allowlist?: string[] 421 }>('tengu_prompt_cache_1h_config', {}) 422 allowlist = config.allowlist ?? [] 423 setPromptCache1hAllowlist(allowlist) 424 } 425 426 return ( 427 querySource !== undefined && 428 allowlist.some(pattern => 429 pattern.endsWith('*') 430 ? querySource.startsWith(pattern.slice(0, -1)) 431 : querySource === pattern, 432 ) 433 ) 434} 435 436/** 437 * Configure effort parameters for API request. 438 * 439 */ 440function configureEffortParams( 441 effortValue: EffortValue | undefined, 442 outputConfig: BetaOutputConfig, 443 extraBodyParams: Record<string, unknown>, 444 betas: string[], 445 model: string, 446): void { 447 if (!modelSupportsEffort(model) || 'effort' in outputConfig) { 448 return 449 } 450 451 if (effortValue === undefined) { 452 betas.push(EFFORT_BETA_HEADER) 453 } else if (typeof effortValue === 'string') { 454 // Send string effort level as is 455 outputConfig.effort = effortValue 456 betas.push(EFFORT_BETA_HEADER) 457 } else if (process.env.USER_TYPE === 'ant') { 458 // Numeric effort override - ant-only (uses anthropic_internal) 459 const existingInternal = 460 (extraBodyParams.anthropic_internal as Record<string, unknown>) || {} 461 extraBodyParams.anthropic_internal = { 462 ...existingInternal, 463 effort_override: effortValue, 464 } 465 } 466} 467 468// output_config.task_budget — API-side token budget awareness for the model. 
469// Stainless SDK types don't yet include task_budget on BetaOutputConfig, so we 470// define the wire shape locally and cast. The API validates on receipt; see 471// api/api/schemas/messages/request/output_config.py:12-39 in the monorepo. 472// Beta: task-budgets-2026-03-13 (EAP, claude-strudel-eap only as of Mar 2026). 473type TaskBudgetParam = { 474 type: 'tokens' 475 total: number 476 remaining?: number 477} 478 479export function configureTaskBudgetParams( 480 taskBudget: Options['taskBudget'], 481 outputConfig: BetaOutputConfig & { task_budget?: TaskBudgetParam }, 482 betas: string[], 483): void { 484 if ( 485 !taskBudget || 486 'task_budget' in outputConfig || 487 !shouldIncludeFirstPartyOnlyBetas() 488 ) { 489 return 490 } 491 outputConfig.task_budget = { 492 type: 'tokens', 493 total: taskBudget.total, 494 ...(taskBudget.remaining !== undefined && { 495 remaining: taskBudget.remaining, 496 }), 497 } 498 if (!betas.includes(TASK_BUDGETS_BETA_HEADER)) { 499 betas.push(TASK_BUDGETS_BETA_HEADER) 500 } 501} 502 503export function getAPIMetadata() { 504 // https://docs.google.com/document/d/1dURO9ycXXQCBS0V4Vhl4poDBRgkelFc5t2BNPoEgH5Q/edit?tab=t.0#heading=h.5g7nec5b09w5 505 let extra: JsonObject = {} 506 const extraStr = process.env.CLAUDE_CODE_EXTRA_METADATA 507 if (extraStr) { 508 const parsed = safeParseJSON(extraStr, false) 509 if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) { 510 extra = parsed as JsonObject 511 } else { 512 logForDebugging( 513 `CLAUDE_CODE_EXTRA_METADATA env var must be a JSON object, but was given ${extraStr}`, 514 { level: 'error' }, 515 ) 516 } 517 } 518 519 return { 520 user_id: jsonStringify({ 521 ...extra, 522 device_id: getOrCreateUserID(), 523 // Only include OAuth account UUID when actively using OAuth authentication 524 account_uuid: getOauthAccountInfo()?.accountUuid ?? 
'', 525 session_id: getSessionId(), 526 }), 527 } 528} 529 530export async function verifyApiKey( 531 apiKey: string, 532 isNonInteractiveSession: boolean, 533): Promise<boolean> { 534 // Skip API verification if running in print mode (isNonInteractiveSession) 535 if (isNonInteractiveSession) { 536 return true 537 } 538 539 try { 540 // WARNING: if you change this to use a non-Haiku model, this request will fail in 1P unless it uses getCLISyspromptPrefix. 541 const model = getSmallFastModel() 542 const betas = getModelBetas(model) 543 return await returnValue( 544 withRetry( 545 () => 546 getAnthropicClient({ 547 apiKey, 548 maxRetries: 3, 549 model, 550 source: 'verify_api_key', 551 }), 552 async anthropic => { 553 const messages: MessageParam[] = [{ role: 'user', content: 'test' }] 554 // biome-ignore lint/plugin: API key verification is intentionally a minimal direct call 555 await anthropic.beta.messages.create({ 556 model, 557 max_tokens: 1, 558 messages, 559 temperature: 1, 560 ...(betas.length > 0 && { betas }), 561 metadata: getAPIMetadata(), 562 ...getExtraBodyParams(), 563 }) 564 return true 565 }, 566 { maxRetries: 2, model, thinkingConfig: { type: 'disabled' } }, // Use fewer retries for API key verification 567 ), 568 ) 569 } catch (errorFromRetry) { 570 let error = errorFromRetry 571 if (errorFromRetry instanceof CannotRetryError) { 572 error = errorFromRetry.originalError 573 } 574 logError(error) 575 // Check for authentication error 576 if ( 577 error instanceof Error && 578 error.message.includes( 579 '{"type":"error","error":{"type":"authentication_error","message":"invalid x-api-key"}}', 580 ) 581 ) { 582 return false 583 } 584 throw error 585 } 586} 587 588export function userMessageToMessageParam( 589 message: UserMessage, 590 addCache = false, 591 enablePromptCaching: boolean, 592 querySource?: QuerySource, 593): MessageParam { 594 if (addCache) { 595 if (typeof message.message.content === 'string') { 596 return { 597 role: 'user', 598 
content: [ 599 { 600 type: 'text', 601 text: message.message.content, 602 ...(enablePromptCaching && { 603 cache_control: getCacheControl({ querySource }), 604 }), 605 }, 606 ], 607 } 608 } else { 609 return { 610 role: 'user', 611 content: message.message.content.map((_, i) => ({ 612 ..._, 613 ...(i === message.message.content.length - 1 614 ? enablePromptCaching 615 ? { cache_control: getCacheControl({ querySource }) } 616 : {} 617 : {}), 618 })), 619 } 620 } 621 } 622 // Clone array content to prevent in-place mutations (e.g., insertCacheEditsBlock's 623 // splice) from contaminating the original message. Without cloning, multiple calls 624 // to addCacheBreakpoints share the same array and each splices in duplicate cache_edits. 625 return { 626 role: 'user', 627 content: Array.isArray(message.message.content) 628 ? [...message.message.content] 629 : message.message.content, 630 } 631} 632 633export function assistantMessageToMessageParam( 634 message: AssistantMessage, 635 addCache = false, 636 enablePromptCaching: boolean, 637 querySource?: QuerySource, 638): MessageParam { 639 if (addCache) { 640 if (typeof message.message.content === 'string') { 641 return { 642 role: 'assistant', 643 content: [ 644 { 645 type: 'text', 646 text: message.message.content, 647 ...(enablePromptCaching && { 648 cache_control: getCacheControl({ querySource }), 649 }), 650 }, 651 ], 652 } 653 } else { 654 return { 655 role: 'assistant', 656 content: message.message.content.map((_, i) => ({ 657 ..._, 658 ...(i === message.message.content.length - 1 && 659 _.type !== 'thinking' && 660 _.type !== 'redacted_thinking' && 661 (feature('CONNECTOR_TEXT') ? !isConnectorTextBlock(_) : true) 662 ? enablePromptCaching 663 ? 
{ cache_control: getCacheControl({ querySource }) } 664 : {} 665 : {}), 666 })), 667 } 668 } 669 } 670 return { 671 role: 'assistant', 672 content: message.message.content, 673 } 674} 675 676export type Options = { 677 getToolPermissionContext: () => Promise<ToolPermissionContext> 678 model: string 679 toolChoice?: BetaToolChoiceTool | BetaToolChoiceAuto | undefined 680 isNonInteractiveSession: boolean 681 extraToolSchemas?: BetaToolUnion[] 682 maxOutputTokensOverride?: number 683 fallbackModel?: string 684 onStreamingFallback?: () => void 685 querySource: QuerySource 686 agents: AgentDefinition[] 687 allowedAgentTypes?: string[] 688 hasAppendSystemPrompt: boolean 689 fetchOverride?: ClientOptions['fetch'] 690 enablePromptCaching?: boolean 691 skipCacheWrite?: boolean 692 temperatureOverride?: number 693 effortValue?: EffortValue 694 mcpTools: Tools 695 hasPendingMcpServers?: boolean 696 queryTracking?: QueryChainTracking 697 agentId?: AgentId // Only set for subagents 698 outputFormat?: BetaJSONOutputFormat 699 fastMode?: boolean 700 advisorModel?: string 701 addNotification?: (notif: Notification) => void 702 // API-side task budget (output_config.task_budget). Distinct from the 703 // tokenBudget.ts +500k auto-continue feature — this one is sent to the API 704 // so the model can pace itself. `remaining` is computed by the caller 705 // (query.ts decrements across the agentic loop). 
706 taskBudget?: { total: number; remaining?: number } 707} 708 709export async function queryModelWithoutStreaming({ 710 messages, 711 systemPrompt, 712 thinkingConfig, 713 tools, 714 signal, 715 options, 716}: { 717 messages: Message[] 718 systemPrompt: SystemPrompt 719 thinkingConfig: ThinkingConfig 720 tools: Tools 721 signal: AbortSignal 722 options: Options 723}): Promise<AssistantMessage> { 724 // Store the assistant message but continue consuming the generator to ensure 725 // logAPISuccessAndDuration gets called (which happens after all yields) 726 let assistantMessage: AssistantMessage | undefined 727 for await (const message of withStreamingVCR(messages, async function* () { 728 yield* queryModel( 729 messages, 730 systemPrompt, 731 thinkingConfig, 732 tools, 733 signal, 734 options, 735 ) 736 })) { 737 if (message.type === 'assistant') { 738 assistantMessage = message 739 } 740 } 741 if (!assistantMessage) { 742 // If the signal was aborted, throw APIUserAbortError instead of a generic error 743 // This allows callers to handle abort scenarios gracefully 744 if (signal.aborted) { 745 throw new APIUserAbortError() 746 } 747 throw new Error('No assistant message found') 748 } 749 return assistantMessage 750} 751 752export async function* queryModelWithStreaming({ 753 messages, 754 systemPrompt, 755 thinkingConfig, 756 tools, 757 signal, 758 options, 759}: { 760 messages: Message[] 761 systemPrompt: SystemPrompt 762 thinkingConfig: ThinkingConfig 763 tools: Tools 764 signal: AbortSignal 765 options: Options 766}): AsyncGenerator< 767 StreamEvent | AssistantMessage | SystemAPIErrorMessage, 768 void 769> { 770 return yield* withStreamingVCR(messages, async function* () { 771 yield* queryModel( 772 messages, 773 systemPrompt, 774 thinkingConfig, 775 tools, 776 signal, 777 options, 778 ) 779 }) 780} 781 782/** 783 * Determines if an LSP tool should be deferred (tool appears with defer_loading: true) 784 * because LSP initialization is not yet complete. 
785 */ 786function shouldDeferLspTool(tool: Tool): boolean { 787 if (!('isLsp' in tool) || !tool.isLsp) { 788 return false 789 } 790 const status = getInitializationStatus() 791 // Defer when pending or not started 792 return status.status === 'pending' || status.status === 'not-started' 793} 794 795/** 796 * Per-attempt timeout for non-streaming fallback requests, in milliseconds. 797 * Reads API_TIMEOUT_MS when set so slow backends and the streaming path 798 * share the same ceiling. 799 * 800 * Remote sessions default to 120s to stay under CCR's container idle-kill 801 * (~5min) so a hung fallback to a wedged backend surfaces a clean 802 * APIConnectionTimeoutError instead of stalling past SIGKILL. 803 * 804 * Otherwise defaults to 300s — long enough for slow backends without 805 * approaching the API's 10-minute non-streaming boundary. 806 */ 807function getNonstreamingFallbackTimeoutMs(): number { 808 const override = parseInt(process.env.API_TIMEOUT_MS || '', 10) 809 if (override) return override 810 return isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) ? 120_000 : 300_000 811} 812 813/** 814 * Helper generator for non-streaming API requests. 815 * Encapsulates the common pattern of creating a withRetry generator, 816 * iterating to yield system messages, and returning the final BetaMessage. 
/**
 * Helper generator for non-streaming API requests.
 * Encapsulates the common pattern of creating a withRetry generator,
 * iterating to yield system messages, and returning the final BetaMessage.
 *
 * @param clientOptions - Model/fetch/source used to construct the Anthropic client.
 * @param retryOptions - Retry policy forwarded to withRetry (model, fallback,
 *   thinking config, abort signal, 529 continuation, query source).
 * @param paramsFromContext - Builds the request params for the current retry
 *   attempt (called fresh each attempt so fallback state is reflected).
 * @param onAttempt - Instrumentation hook invoked per attempt with the start
 *   timestamp and the max_tokens actually requested.
 * @param captureRequest - Hook that receives the raw params before the
 *   non-streaming adjustment, for request capture/debugging.
 */
export async function* executeNonStreamingRequest(
  clientOptions: {
    model: string
    fetchOverride?: Options['fetchOverride']
    source: string
  },
  retryOptions: {
    model: string
    fallbackModel?: string
    thinkingConfig: ThinkingConfig
    fastMode?: boolean
    signal: AbortSignal
    initialConsecutive529Errors?: number
    querySource?: QuerySource
  },
  paramsFromContext: (context: RetryContext) => BetaMessageStreamParams,
  onAttempt: (attempt: number, start: number, maxOutputTokens: number) => void,
  captureRequest: (params: BetaMessageStreamParams) => void,
  /**
   * Request ID of the failed streaming attempt this fallback is recovering
   * from. Emitted in tengu_nonstreaming_fallback_error for funnel correlation.
   */
  originatingRequestId?: string | null,
): AsyncGenerator<SystemAPIErrorMessage, BetaMessage> {
  const fallbackTimeoutMs = getNonstreamingFallbackTimeoutMs()
  // maxRetries: 0 on the client — retry policy is owned by withRetry, not the SDK.
  const generator = withRetry(
    () =>
      getAnthropicClient({
        maxRetries: 0,
        model: clientOptions.model,
        fetchOverride: clientOptions.fetchOverride,
        source: clientOptions.source,
      }),
    async (anthropic, attempt, context) => {
      const start = Date.now()
      // Rebuild params per attempt so the retry context (e.g. model fallback)
      // is reflected in the outgoing request.
      const retryParams = paramsFromContext(context)
      captureRequest(retryParams)
      onAttempt(attempt, start, retryParams.max_tokens)

      // Clamp to the non-streaming token ceiling before sending.
      const adjustedParams = adjustParamsForNonStreaming(
        retryParams,
        MAX_NON_STREAMING_TOKENS,
      )

      try {
        // biome-ignore lint/plugin: non-streaming API call
        return await anthropic.beta.messages.create(
          {
            ...adjustedParams,
            model: normalizeModelStringForAPI(adjustedParams.model),
          },
          {
            signal: retryOptions.signal,
            timeout: fallbackTimeoutMs,
          },
        )
      } catch (err) {
        // User aborts are not errors — re-throw immediately without logging
        if (err instanceof APIUserAbortError) throw err

        // Instrumentation: record when the non-streaming request errors (including
        // timeouts). Lets us distinguish "fallback hung past container kill"
        // (no event) from "fallback hit the bounded timeout" (this event).
        logForDiagnosticsNoPII('error', 'cli_nonstreaming_fallback_error')
        logEvent('tengu_nonstreaming_fallback_error', {
          model:
            clientOptions.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          error:
            err instanceof Error
              ? (err.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
              : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
          attempt,
          timeout_ms: fallbackTimeoutMs,
          request_id: (originatingRequestId ??
            'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        })
        throw err
      }
    },
    {
      model: retryOptions.model,
      fallbackModel: retryOptions.fallbackModel,
      thinkingConfig: retryOptions.thinkingConfig,
      // fastMode is only forwarded when the feature is currently on.
      ...(isFastModeEnabled() && { fastMode: retryOptions.fastMode }),
      signal: retryOptions.signal,
      initialConsecutive529Errors: retryOptions.initialConsecutive529Errors,
      querySource: retryOptions.querySource,
    },
  )

  // Pump the retry generator by hand: forward intermediate system messages to
  // our caller as yields, then surface the generator's *return* value (the
  // final BetaMessage) as this generator's return value.
  let e
  do {
    e = await generator.next()
    if (!e.done && e.value.type === 'system') {
      yield e.value
    }
  } while (!e.done)

  return e.value as BetaMessage
}

/**
 * Extracts the request ID from the most recent assistant message in the
 * conversation. Used to link consecutive API requests in analytics so we can
 * join them for cache-hit-rate analysis and incremental token tracking.
 *
 * Deriving this from the message array (rather than global state) ensures each
 * query chain (main thread, subagent, teammate) tracks its own request chain
 * independently, and rollback/undo naturally updates the value.
 */
927 */ 928function getPreviousRequestIdFromMessages( 929 messages: Message[], 930): string | undefined { 931 for (let i = messages.length - 1; i >= 0; i--) { 932 const msg = messages[i]! 933 if (msg.type === 'assistant' && msg.requestId) { 934 return msg.requestId 935 } 936 } 937 return undefined 938} 939 940function isMedia( 941 block: BetaContentBlockParam, 942): block is BetaImageBlockParam | BetaRequestDocumentBlock { 943 return block.type === 'image' || block.type === 'document' 944} 945 946function isToolResult( 947 block: BetaContentBlockParam, 948): block is BetaToolResultBlockParam { 949 return block.type === 'tool_result' 950} 951 952/** 953 * Ensures messages contain at most `limit` media items (images + documents). 954 * Strips oldest media first to preserve the most recent. 955 */ 956export function stripExcessMediaItems( 957 messages: (UserMessage | AssistantMessage)[], 958 limit: number, 959): (UserMessage | AssistantMessage)[] { 960 let toRemove = 0 961 for (const msg of messages) { 962 if (!Array.isArray(msg.message.content)) continue 963 for (const block of msg.message.content) { 964 if (isMedia(block)) toRemove++ 965 if (isToolResult(block) && Array.isArray(block.content)) { 966 for (const nested of block.content) { 967 if (isMedia(nested)) toRemove++ 968 } 969 } 970 } 971 } 972 toRemove -= limit 973 if (toRemove <= 0) return messages 974 975 return messages.map(msg => { 976 if (toRemove <= 0) return msg 977 const content = msg.message.content 978 if (!Array.isArray(content)) return msg 979 980 const before = toRemove 981 const stripped = content 982 .map(block => { 983 if ( 984 toRemove <= 0 || 985 !isToolResult(block) || 986 !Array.isArray(block.content) 987 ) 988 return block 989 const filtered = block.content.filter(n => { 990 if (toRemove > 0 && isMedia(n)) { 991 toRemove-- 992 return false 993 } 994 return true 995 }) 996 return filtered.length === block.content.length 997 ? 
block 998 : { ...block, content: filtered } 999 }) 1000 .filter(block => { 1001 if (toRemove > 0 && isMedia(block)) { 1002 toRemove-- 1003 return false 1004 } 1005 return true 1006 }) 1007 1008 return before === toRemove 1009 ? msg 1010 : { 1011 ...msg, 1012 message: { ...msg.message, content: stripped }, 1013 } 1014 }) as (UserMessage | AssistantMessage)[] 1015} 1016 1017async function* queryModel( 1018 messages: Message[], 1019 systemPrompt: SystemPrompt, 1020 thinkingConfig: ThinkingConfig, 1021 tools: Tools, 1022 signal: AbortSignal, 1023 options: Options, 1024): AsyncGenerator< 1025 StreamEvent | AssistantMessage | SystemAPIErrorMessage, 1026 void 1027> { 1028 // Check cheap conditions first — the off-switch await blocks on GrowthBook 1029 // init (~10ms). For non-Opus models (haiku, sonnet) this skips the await 1030 // entirely. Subscribers don't hit this path at all. 1031 if ( 1032 !isClaudeAISubscriber() && 1033 isNonCustomOpusModel(options.model) && 1034 ( 1035 await getDynamicConfig_BLOCKS_ON_INIT<{ activated: boolean }>( 1036 'tengu-off-switch', 1037 { 1038 activated: false, 1039 }, 1040 ) 1041 ).activated 1042 ) { 1043 logEvent('tengu_off_switch_query', {}) 1044 yield getAssistantMessageFromError( 1045 new Error(CUSTOM_OFF_SWITCH_MESSAGE), 1046 options.model, 1047 ) 1048 return 1049 } 1050 1051 // Derive previous request ID from the last assistant message in this query chain. 1052 // This is scoped per message array (main thread, subagent, teammate each have their own), 1053 // so concurrent agents don't clobber each other's request chain tracking. 1054 // Also naturally handles rollback/undo since removed messages won't be in the array. 1055 const previousRequestId = getPreviousRequestIdFromMessages(messages) 1056 1057 const resolvedModel = 1058 getAPIProvider() === 'bedrock' && 1059 options.model.includes('application-inference-profile') 1060 ? ((await getInferenceProfileBackingModel(options.model)) ?? 
1061 options.model) 1062 : options.model 1063 1064 queryCheckpoint('query_tool_schema_build_start') 1065 const isAgenticQuery = 1066 options.querySource.startsWith('repl_main_thread') || 1067 options.querySource.startsWith('agent:') || 1068 options.querySource === 'sdk' || 1069 options.querySource === 'hook_agent' || 1070 options.querySource === 'verification_agent' 1071 const betas = getMergedBetas(options.model, { isAgenticQuery }) 1072 1073 // Always send the advisor beta header when advisor is enabled, so 1074 // non-agentic queries (compact, side_question, extract_memories, etc.) 1075 // can parse advisor server_tool_use blocks already in the conversation history. 1076 if (isAdvisorEnabled()) { 1077 betas.push(ADVISOR_BETA_HEADER) 1078 } 1079 1080 let advisorModel: string | undefined 1081 if (isAgenticQuery && isAdvisorEnabled()) { 1082 let advisorOption = options.advisorModel 1083 1084 const advisorExperiment = getExperimentAdvisorModels() 1085 if (advisorExperiment !== undefined) { 1086 if ( 1087 normalizeModelStringForAPI(advisorExperiment.baseModel) === 1088 normalizeModelStringForAPI(options.model) 1089 ) { 1090 // Override the advisor model if the base model matches. We 1091 // should only have experiment models if the user cannot 1092 // configure it themselves. 
1093 advisorOption = advisorExperiment.advisorModel 1094 } 1095 } 1096 1097 if (advisorOption) { 1098 const normalizedAdvisorModel = normalizeModelStringForAPI( 1099 parseUserSpecifiedModel(advisorOption), 1100 ) 1101 if (!modelSupportsAdvisor(options.model)) { 1102 logForDebugging( 1103 `[AdvisorTool] Skipping advisor - base model ${options.model} does not support advisor`, 1104 ) 1105 } else if (!isValidAdvisorModel(normalizedAdvisorModel)) { 1106 logForDebugging( 1107 `[AdvisorTool] Skipping advisor - ${normalizedAdvisorModel} is not a valid advisor model`, 1108 ) 1109 } else { 1110 advisorModel = normalizedAdvisorModel 1111 logForDebugging( 1112 `[AdvisorTool] Server-side tool enabled with ${advisorModel} as the advisor model`, 1113 ) 1114 } 1115 } 1116 } 1117 1118 // Check if tool search is enabled (checks mode, model support, and threshold for auto mode) 1119 // This is async because it may need to calculate MCP tool description sizes for TstAuto mode 1120 let useToolSearch = await isToolSearchEnabled( 1121 options.model, 1122 tools, 1123 options.getToolPermissionContext, 1124 options.agents, 1125 'query', 1126 ) 1127 1128 // Precompute once — isDeferredTool does 2 GrowthBook lookups per call 1129 const deferredToolNames = new Set<string>() 1130 if (useToolSearch) { 1131 for (const t of tools) { 1132 if (isDeferredTool(t)) deferredToolNames.add(t.name) 1133 } 1134 } 1135 1136 // Even if tool search mode is enabled, skip if there are no deferred tools 1137 // AND no MCP servers are still connecting. When servers are pending, keep 1138 // ToolSearch available so the model can discover tools after they connect. 
1139 if ( 1140 useToolSearch && 1141 deferredToolNames.size === 0 && 1142 !options.hasPendingMcpServers 1143 ) { 1144 logForDebugging( 1145 'Tool search disabled: no deferred tools available to search', 1146 ) 1147 useToolSearch = false 1148 } 1149 1150 // Filter out ToolSearchTool if tool search is not enabled for this model 1151 // ToolSearchTool returns tool_reference blocks which unsupported models can't handle 1152 let filteredTools: Tools 1153 1154 if (useToolSearch) { 1155 // Dynamic tool loading: Only include deferred tools that have been discovered 1156 // via tool_reference blocks in the message history. This eliminates the need 1157 // to predeclare all deferred tools upfront and removes limits on tool quantity. 1158 const discoveredToolNames = extractDiscoveredToolNames(messages) 1159 1160 filteredTools = tools.filter(tool => { 1161 // Always include non-deferred tools 1162 if (!deferredToolNames.has(tool.name)) return true 1163 // Always include ToolSearchTool (so it can discover more tools) 1164 if (toolMatchesName(tool, TOOL_SEARCH_TOOL_NAME)) return true 1165 // Only include deferred tools that have been discovered 1166 return discoveredToolNames.has(tool.name) 1167 }) 1168 } else { 1169 filteredTools = tools.filter( 1170 t => !toolMatchesName(t, TOOL_SEARCH_TOOL_NAME), 1171 ) 1172 } 1173 1174 // Add tool search beta header if enabled - required for defer_loading to be accepted 1175 // Header differs by provider: 1P/Foundry use advanced-tool-use, Vertex/Bedrock use tool-search-tool 1176 // For Bedrock, this header must go in extraBodyParams, not the betas array 1177 const toolSearchHeader = useToolSearch ? getToolSearchBetaHeader() : null 1178 if (toolSearchHeader && getAPIProvider() !== 'bedrock') { 1179 if (!betas.includes(toolSearchHeader)) { 1180 betas.push(toolSearchHeader) 1181 } 1182 } 1183 1184 // Determine if cached microcompact is enabled for this model. 1185 // Computed once here (in async context) and captured by paramsFromContext. 
1186 // The beta header is also captured here to avoid a top-level import of the 1187 // ant-only CACHE_EDITING_BETA_HEADER constant. 1188 let cachedMCEnabled = false 1189 let cacheEditingBetaHeader = '' 1190 if (feature('CACHED_MICROCOMPACT')) { 1191 const { 1192 isCachedMicrocompactEnabled, 1193 isModelSupportedForCacheEditing, 1194 getCachedMCConfig, 1195 } = await import('../compact/cachedMicrocompact.js') 1196 const betas = await import('src/constants/betas.js') 1197 cacheEditingBetaHeader = betas.CACHE_EDITING_BETA_HEADER 1198 const featureEnabled = isCachedMicrocompactEnabled() 1199 const modelSupported = isModelSupportedForCacheEditing(options.model) 1200 cachedMCEnabled = featureEnabled && modelSupported 1201 const config = getCachedMCConfig() 1202 logForDebugging( 1203 `Cached MC gate: enabled=${featureEnabled} modelSupported=${modelSupported} model=${options.model} supportedModels=${jsonStringify(config.supportedModels)}`, 1204 ) 1205 } 1206 1207 const useGlobalCacheFeature = shouldUseGlobalCacheScope() 1208 const willDefer = (t: Tool) => 1209 useToolSearch && (deferredToolNames.has(t.name) || shouldDeferLspTool(t)) 1210 // MCP tools are per-user → dynamic tool section → can't globally cache. 1211 // Only gate when an MCP tool will actually render (not defer_loading). 1212 const needsToolBasedCacheMarker = 1213 useGlobalCacheFeature && 1214 filteredTools.some(t => t.isMcp === true && !willDefer(t)) 1215 1216 // Ensure prompt_caching_scope beta header is present when global cache is enabled. 1217 if ( 1218 useGlobalCacheFeature && 1219 !betas.includes(PROMPT_CACHING_SCOPE_BETA_HEADER) 1220 ) { 1221 betas.push(PROMPT_CACHING_SCOPE_BETA_HEADER) 1222 } 1223 1224 // Determine global cache strategy for logging 1225 const globalCacheStrategy: GlobalCacheStrategy = useGlobalCacheFeature 1226 ? needsToolBasedCacheMarker 1227 ? 
'none' 1228 : 'system_prompt' 1229 : 'none' 1230 1231 // Build tool schemas, adding defer_loading for MCP tools when tool search is enabled 1232 // Note: We pass the full `tools` list (not filteredTools) to toolToAPISchema so that 1233 // ToolSearchTool's prompt can list ALL available MCP tools. The filtering only affects 1234 // which tools are actually sent to the API, not what the model sees in tool descriptions. 1235 const toolSchemas = await Promise.all( 1236 filteredTools.map(tool => 1237 toolToAPISchema(tool, { 1238 getToolPermissionContext: options.getToolPermissionContext, 1239 tools, 1240 agents: options.agents, 1241 allowedAgentTypes: options.allowedAgentTypes, 1242 model: options.model, 1243 deferLoading: willDefer(tool), 1244 }), 1245 ), 1246 ) 1247 1248 if (useToolSearch) { 1249 const includedDeferredTools = count(filteredTools, t => 1250 deferredToolNames.has(t.name), 1251 ) 1252 logForDebugging( 1253 `Dynamic tool loading: ${includedDeferredTools}/${deferredToolNames.size} deferred tools included`, 1254 ) 1255 } 1256 1257 queryCheckpoint('query_tool_schema_build_end') 1258 1259 // Normalize messages before building system prompt (needed for fingerprinting) 1260 // Instrumentation: Track message count before normalization 1261 logEvent('tengu_api_before_normalize', { 1262 preNormalizedMessageCount: messages.length, 1263 }) 1264 1265 queryCheckpoint('query_message_normalization_start') 1266 let messagesForAPI = normalizeMessagesForAPI(messages, filteredTools) 1267 queryCheckpoint('query_message_normalization_end') 1268 1269 // Model-specific post-processing: strip tool-search-specific fields if the 1270 // selected model doesn't support tool search. 1271 // 1272 // Why is this needed in addition to normalizeMessagesForAPI? 1273 // - normalizeMessagesForAPI uses isToolSearchEnabledNoModelCheck() because it's 1274 // called from ~20 places (analytics, feedback, sharing, etc.), many of which 1275 // don't have model context. 
Adding model to its signature would be a large refactor. 1276 // - This post-processing uses the model-aware isToolSearchEnabled() check 1277 // - This handles mid-conversation model switching (e.g., Sonnet → Haiku) where 1278 // stale tool-search fields from the previous model would cause 400 errors 1279 // 1280 // Note: For assistant messages, normalizeMessagesForAPI already normalized the 1281 // tool inputs, so stripCallerFieldFromAssistantMessage only needs to remove the 1282 // 'caller' field (not re-normalize inputs). 1283 if (!useToolSearch) { 1284 messagesForAPI = messagesForAPI.map(msg => { 1285 switch (msg.type) { 1286 case 'user': 1287 // Strip tool_reference blocks from tool_result content 1288 return stripToolReferenceBlocksFromUserMessage(msg) 1289 case 'assistant': 1290 // Strip 'caller' field from tool_use blocks 1291 return stripCallerFieldFromAssistantMessage(msg) 1292 default: 1293 return msg 1294 } 1295 }) 1296 } 1297 1298 // Repair tool_use/tool_result pairing mismatches that can occur when resuming 1299 // remote/teleport sessions. Inserts synthetic error tool_results for orphaned 1300 // tool_uses and strips orphaned tool_results referencing non-existent tool_uses. 1301 messagesForAPI = ensureToolResultPairing(messagesForAPI) 1302 1303 // Strip advisor blocks — the API rejects them without the beta header. 1304 if (!betas.includes(ADVISOR_BETA_HEADER)) { 1305 messagesForAPI = stripAdvisorBlocks(messagesForAPI) 1306 } 1307 1308 // Strip excess media items before making the API call. 1309 // The API rejects requests with >100 media items but returns a confusing error. 1310 // Rather than erroring (which is hard to recover from in Cowork/CCD), we 1311 // silently drop the oldest media items to stay within the limit. 
1312 messagesForAPI = stripExcessMediaItems( 1313 messagesForAPI, 1314 API_MAX_MEDIA_PER_REQUEST, 1315 ) 1316 1317 // Instrumentation: Track message count after normalization 1318 logEvent('tengu_api_after_normalize', { 1319 postNormalizedMessageCount: messagesForAPI.length, 1320 }) 1321 1322 // Compute fingerprint from first user message for attribution. 1323 // Must run BEFORE injecting synthetic messages (e.g. deferred tool names) 1324 // so the fingerprint reflects the actual user input. 1325 const fingerprint = computeFingerprintFromMessages(messagesForAPI) 1326 1327 // When the delta attachment is enabled, deferred tools are announced 1328 // via persisted deferred_tools_delta attachments instead of this 1329 // ephemeral prepend (which busts cache whenever the pool changes). 1330 if (useToolSearch && !isDeferredToolsDeltaEnabled()) { 1331 const deferredToolList = tools 1332 .filter(t => deferredToolNames.has(t.name)) 1333 .map(formatDeferredToolLine) 1334 .sort() 1335 .join('\n') 1336 if (deferredToolList) { 1337 messagesForAPI = [ 1338 createUserMessage({ 1339 content: `<available-deferred-tools>\n${deferredToolList}\n</available-deferred-tools>`, 1340 isMeta: true, 1341 }), 1342 ...messagesForAPI, 1343 ] 1344 } 1345 } 1346 1347 // Chrome tool-search instructions: when the delta attachment is enabled, 1348 // these are carried as a client-side block in mcp_instructions_delta 1349 // (attachments.ts) instead of here. This per-request sys-prompt append 1350 // busts the prompt cache when chrome connects late. 1351 const hasChromeTools = filteredTools.some(t => 1352 isToolFromMcpServer(t.name, CLAUDE_IN_CHROME_MCP_SERVER_NAME), 1353 ) 1354 const injectChromeHere = 1355 useToolSearch && hasChromeTools && !isMcpInstructionsDeltaEnabled() 1356 1357 // filter(Boolean) works by converting each element to a boolean - empty strings become false and are filtered out. 
1358 systemPrompt = asSystemPrompt( 1359 [ 1360 getAttributionHeader(fingerprint), 1361 getCLISyspromptPrefix({ 1362 isNonInteractive: options.isNonInteractiveSession, 1363 hasAppendSystemPrompt: options.hasAppendSystemPrompt, 1364 }), 1365 ...systemPrompt, 1366 ...(advisorModel ? [ADVISOR_TOOL_INSTRUCTIONS] : []), 1367 ...(injectChromeHere ? [CHROME_TOOL_SEARCH_INSTRUCTIONS] : []), 1368 ].filter(Boolean), 1369 ) 1370 1371 // Prepend system prompt block for easy API identification 1372 logAPIPrefix(systemPrompt) 1373 1374 const enablePromptCaching = 1375 options.enablePromptCaching ?? getPromptCachingEnabled(options.model) 1376 const system = buildSystemPromptBlocks(systemPrompt, enablePromptCaching, { 1377 skipGlobalCacheForSystemPrompt: needsToolBasedCacheMarker, 1378 querySource: options.querySource, 1379 }) 1380 const useBetas = betas.length > 0 1381 1382 // Build minimal context for detailed tracing (when beta tracing is enabled) 1383 // Note: The actual new_context message extraction is done in sessionTracing.ts using 1384 // hash-based tracking per querySource (agent) from the messagesForAPI array 1385 const extraToolSchemas = [...(options.extraToolSchemas ?? [])] 1386 if (advisorModel) { 1387 // Server tools must be in the tools array by API contract. Appended after 1388 // toolSchemas (which carries the cache_control marker) so toggling /advisor 1389 // only churns the small suffix, not the cached prefix. 1390 extraToolSchemas.push({ 1391 type: 'advisor_20260301', 1392 name: 'advisor', 1393 model: advisorModel, 1394 } as unknown as BetaToolUnion) 1395 } 1396 const allTools = [...toolSchemas, ...extraToolSchemas] 1397 1398 const isFastMode = 1399 isFastModeEnabled() && 1400 isFastModeAvailable() && 1401 !isFastModeCooldown() && 1402 isFastModeSupportedByModel(options.model) && 1403 !!options.fastMode 1404 1405 // Sticky-on latches for dynamic beta headers. 
Each header, once first 1406 // sent, keeps being sent for the rest of the session so mid-session 1407 // toggles don't change the server-side cache key and bust ~50-70K tokens. 1408 // Latches are cleared on /clear and /compact via clearBetaHeaderLatches(). 1409 // Per-call gates (isAgenticQuery, querySource===repl_main_thread) stay 1410 // per-call so non-agentic queries keep their own stable header set. 1411 1412 let afkHeaderLatched = getAfkModeHeaderLatched() === true 1413 if (feature('TRANSCRIPT_CLASSIFIER')) { 1414 if ( 1415 !afkHeaderLatched && 1416 isAgenticQuery && 1417 shouldIncludeFirstPartyOnlyBetas() && 1418 (autoModeStateModule?.isAutoModeActive() ?? false) 1419 ) { 1420 afkHeaderLatched = true 1421 setAfkModeHeaderLatched(true) 1422 } 1423 } 1424 1425 let fastModeHeaderLatched = getFastModeHeaderLatched() === true 1426 if (!fastModeHeaderLatched && isFastMode) { 1427 fastModeHeaderLatched = true 1428 setFastModeHeaderLatched(true) 1429 } 1430 1431 let cacheEditingHeaderLatched = getCacheEditingHeaderLatched() === true 1432 if (feature('CACHED_MICROCOMPACT')) { 1433 if ( 1434 !cacheEditingHeaderLatched && 1435 cachedMCEnabled && 1436 getAPIProvider() === 'firstParty' && 1437 options.querySource === 'repl_main_thread' 1438 ) { 1439 cacheEditingHeaderLatched = true 1440 setCacheEditingHeaderLatched(true) 1441 } 1442 } 1443 1444 // Only latch from agentic queries so a classifier call doesn't flip the 1445 // main thread's context_management mid-turn. 
1446 let thinkingClearLatched = getThinkingClearLatched() === true 1447 if (!thinkingClearLatched && isAgenticQuery) { 1448 const lastCompletion = getLastApiCompletionTimestamp() 1449 if ( 1450 lastCompletion !== null && 1451 Date.now() - lastCompletion > CACHE_TTL_1HOUR_MS 1452 ) { 1453 thinkingClearLatched = true 1454 setThinkingClearLatched(true) 1455 } 1456 } 1457 1458 const effort = resolveAppliedEffort(options.model, options.effortValue) 1459 1460 if (feature('PROMPT_CACHE_BREAK_DETECTION')) { 1461 // Exclude defer_loading tools from the hash -- the API strips them from the 1462 // prompt, so they never affect the actual cache key. Including them creates 1463 // false-positive "tool schemas changed" breaks when tools are discovered or 1464 // MCP servers reconnect. 1465 const toolsForCacheDetection = allTools.filter( 1466 t => !('defer_loading' in t && t.defer_loading), 1467 ) 1468 // Capture everything that could affect the server-side cache key. 1469 // Pass latched header values (not live state) so break detection 1470 // reflects what we actually send, not what the user toggled. 1471 recordPromptState({ 1472 system, 1473 toolSchemas: toolsForCacheDetection, 1474 querySource: options.querySource, 1475 model: options.model, 1476 agentId: options.agentId, 1477 fastMode: fastModeHeaderLatched, 1478 globalCacheStrategy, 1479 betas, 1480 autoModeActive: afkHeaderLatched, 1481 isUsingOverage: currentLimits.isUsingOverage ?? false, 1482 cachedMCEnabled: cacheEditingHeaderLatched, 1483 effortValue: effort, 1484 extraBodyParams: getExtraBodyParams(), 1485 }) 1486 } 1487 1488 const newContext: LLMRequestNewContext | undefined = isBetaTracingEnabled() 1489 ? 
{ 1490 systemPrompt: systemPrompt.join('\n\n'), 1491 querySource: options.querySource, 1492 tools: jsonStringify(allTools), 1493 } 1494 : undefined 1495 1496 // Capture the span so we can pass it to endLLMRequestSpan later 1497 // This ensures responses are matched to the correct request when multiple requests run in parallel 1498 const llmSpan = startLLMRequestSpan( 1499 options.model, 1500 newContext, 1501 messagesForAPI, 1502 isFastMode, 1503 ) 1504 1505 const startIncludingRetries = Date.now() 1506 let start = Date.now() 1507 let attemptNumber = 0 1508 const attemptStartTimes: number[] = [] 1509 let stream: Stream<BetaRawMessageStreamEvent> | undefined = undefined 1510 let streamRequestId: string | null | undefined = undefined 1511 let clientRequestId: string | undefined = undefined 1512 // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins -- Response is available in Node 18+ and is used by the SDK 1513 let streamResponse: Response | undefined = undefined 1514 1515 // Release all stream resources to prevent native memory leaks. 1516 // The Response object holds native TLS/socket buffers that live outside the 1517 // V8 heap (observed on the Node.js/npm path; see GH #32920), so we must 1518 // explicitly cancel and release it regardless of how the generator exits. 1519 function releaseStreamResources(): void { 1520 cleanupStream(stream) 1521 stream = undefined 1522 if (streamResponse) { 1523 streamResponse.body?.cancel().catch(() => {}) 1524 streamResponse = undefined 1525 } 1526 } 1527 1528 // Consume pending cache edits ONCE before paramsFromContext is defined. 1529 // paramsFromContext is called multiple times (logging, retries), so consuming 1530 // inside it would cause the first call to steal edits from subsequent calls. 1531 const consumedCacheEdits = cachedMCEnabled ? consumePendingCacheEdits() : null 1532 const consumedPinnedEdits = cachedMCEnabled ? 
getPinnedCacheEdits() : [] 1533 1534 // Capture the betas sent in the last API request, including the ones that 1535 // were dynamically added, so we can log and send it to telemetry. 1536 let lastRequestBetas: string[] | undefined 1537 1538 const paramsFromContext = (retryContext: RetryContext) => { 1539 const betasParams = [...betas] 1540 1541 // Append 1M beta dynamically for the Sonnet 1M experiment. 1542 if ( 1543 !betasParams.includes(CONTEXT_1M_BETA_HEADER) && 1544 getSonnet1mExpTreatmentEnabled(retryContext.model) 1545 ) { 1546 betasParams.push(CONTEXT_1M_BETA_HEADER) 1547 } 1548 1549 // For Bedrock, include both model-based betas and dynamically-added tool search header 1550 const bedrockBetas = 1551 getAPIProvider() === 'bedrock' 1552 ? [ 1553 ...getBedrockExtraBodyParamsBetas(retryContext.model), 1554 ...(toolSearchHeader ? [toolSearchHeader] : []), 1555 ] 1556 : [] 1557 const extraBodyParams = getExtraBodyParams(bedrockBetas) 1558 1559 const outputConfig: BetaOutputConfig = { 1560 ...((extraBodyParams.output_config as BetaOutputConfig) ?? 
{}), 1561 } 1562 1563 configureEffortParams( 1564 effort, 1565 outputConfig, 1566 extraBodyParams, 1567 betasParams, 1568 options.model, 1569 ) 1570 1571 configureTaskBudgetParams( 1572 options.taskBudget, 1573 outputConfig as BetaOutputConfig & { task_budget?: TaskBudgetParam }, 1574 betasParams, 1575 ) 1576 1577 // Merge outputFormat into extraBodyParams.output_config alongside effort 1578 // Requires structured-outputs beta header per SDK (see parse() in messages.mjs) 1579 if (options.outputFormat && !('format' in outputConfig)) { 1580 outputConfig.format = options.outputFormat as BetaJSONOutputFormat 1581 // Add beta header if not already present and provider supports it 1582 if ( 1583 modelSupportsStructuredOutputs(options.model) && 1584 !betasParams.includes(STRUCTURED_OUTPUTS_BETA_HEADER) 1585 ) { 1586 betasParams.push(STRUCTURED_OUTPUTS_BETA_HEADER) 1587 } 1588 } 1589 1590 // Retry context gets preference because it tries to course correct if we exceed the context window limit 1591 const maxOutputTokens = 1592 retryContext?.maxTokensOverride || 1593 options.maxOutputTokensOverride || 1594 getMaxOutputTokensForModel(options.model) 1595 1596 const hasThinking = 1597 thinkingConfig.type !== 'disabled' && 1598 !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_THINKING) 1599 let thinking: BetaMessageStreamParams['thinking'] | undefined = undefined 1600 1601 // IMPORTANT: Do not change the adaptive-vs-budget thinking selection below 1602 // without notifying the model launch DRI and research. This is a sensitive 1603 // setting that can greatly affect model quality and bashing. 1604 if (hasThinking && modelSupportsThinking(options.model)) { 1605 if ( 1606 !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_ADAPTIVE_THINKING) && 1607 modelSupportsAdaptiveThinking(options.model) 1608 ) { 1609 // For models that support adaptive thinking, always use adaptive 1610 // thinking without a budget. 
1611 thinking = { 1612 type: 'adaptive', 1613 } satisfies BetaMessageStreamParams['thinking'] 1614 } else { 1615 // For models that do not support adaptive thinking, use the default 1616 // thinking budget unless explicitly specified. 1617 let thinkingBudget = getMaxThinkingTokensForModel(options.model) 1618 if ( 1619 thinkingConfig.type === 'enabled' && 1620 thinkingConfig.budgetTokens !== undefined 1621 ) { 1622 thinkingBudget = thinkingConfig.budgetTokens 1623 } 1624 thinkingBudget = Math.min(maxOutputTokens - 1, thinkingBudget) 1625 thinking = { 1626 budget_tokens: thinkingBudget, 1627 type: 'enabled', 1628 } satisfies BetaMessageStreamParams['thinking'] 1629 } 1630 } 1631 1632 // Get API context management strategies if enabled 1633 const contextManagement = getAPIContextManagement({ 1634 hasThinking, 1635 isRedactThinkingActive: betasParams.includes(REDACT_THINKING_BETA_HEADER), 1636 clearAllThinking: thinkingClearLatched, 1637 }) 1638 1639 const enablePromptCaching = 1640 options.enablePromptCaching ?? getPromptCachingEnabled(retryContext.model) 1641 1642 // Fast mode: header is latched session-stable (cache-safe), but 1643 // `speed='fast'` stays dynamic so cooldown still suppresses the actual 1644 // fast-mode request without changing the cache key. 1645 let speed: BetaMessageStreamParams['speed'] 1646 const isFastModeForRetry = 1647 isFastModeEnabled() && 1648 isFastModeAvailable() && 1649 !isFastModeCooldown() && 1650 isFastModeSupportedByModel(options.model) && 1651 !!retryContext.fastMode 1652 if (isFastModeForRetry) { 1653 speed = 'fast' 1654 } 1655 if (fastModeHeaderLatched && !betasParams.includes(FAST_MODE_BETA_HEADER)) { 1656 betasParams.push(FAST_MODE_BETA_HEADER) 1657 } 1658 1659 // AFK mode beta: latched once auto mode is first activated. Still gated 1660 // by isAgenticQuery per-call so classifiers/compaction don't get it. 
1661 if (feature('TRANSCRIPT_CLASSIFIER')) { 1662 if ( 1663 afkHeaderLatched && 1664 shouldIncludeFirstPartyOnlyBetas() && 1665 isAgenticQuery && 1666 !betasParams.includes(AFK_MODE_BETA_HEADER) 1667 ) { 1668 betasParams.push(AFK_MODE_BETA_HEADER) 1669 } 1670 } 1671 1672 // Cache editing beta: header is latched session-stable; useCachedMC 1673 // (controls cache_edits body behavior) stays live so edits stop when 1674 // the feature disables but the header doesn't flip. 1675 const useCachedMC = 1676 cachedMCEnabled && 1677 getAPIProvider() === 'firstParty' && 1678 options.querySource === 'repl_main_thread' 1679 if ( 1680 cacheEditingHeaderLatched && 1681 getAPIProvider() === 'firstParty' && 1682 options.querySource === 'repl_main_thread' && 1683 !betasParams.includes(cacheEditingBetaHeader) 1684 ) { 1685 betasParams.push(cacheEditingBetaHeader) 1686 logForDebugging( 1687 'Cache editing beta header enabled for cached microcompact', 1688 ) 1689 } 1690 1691 // Only send temperature when thinking is disabled — the API requires 1692 // temperature: 1 when thinking is enabled, which is already the default. 1693 const temperature = !hasThinking 1694 ? (options.temperatureOverride ?? 
1) 1695 : undefined 1696 1697 lastRequestBetas = betasParams 1698 1699 return { 1700 model: normalizeModelStringForAPI(options.model), 1701 messages: addCacheBreakpoints( 1702 messagesForAPI, 1703 enablePromptCaching, 1704 options.querySource, 1705 useCachedMC, 1706 consumedCacheEdits, 1707 consumedPinnedEdits, 1708 options.skipCacheWrite, 1709 ), 1710 system, 1711 tools: allTools, 1712 tool_choice: options.toolChoice, 1713 ...(useBetas && { betas: betasParams }), 1714 metadata: getAPIMetadata(), 1715 max_tokens: maxOutputTokens, 1716 thinking, 1717 ...(temperature !== undefined && { temperature }), 1718 ...(contextManagement && 1719 useBetas && 1720 betasParams.includes(CONTEXT_MANAGEMENT_BETA_HEADER) && { 1721 context_management: contextManagement, 1722 }), 1723 ...extraBodyParams, 1724 ...(Object.keys(outputConfig).length > 0 && { 1725 output_config: outputConfig, 1726 }), 1727 ...(speed !== undefined && { speed }), 1728 } 1729 } 1730 1731 // Compute log scalars synchronously so the fire-and-forget .then() closure 1732 // captures only primitives instead of paramsFromContext's full closure scope 1733 // (messagesForAPI, system, allTools, betas — the entire request-building 1734 // context), which would otherwise be pinned until the promise resolves. 1735 { 1736 const queryParams = paramsFromContext({ 1737 model: options.model, 1738 thinkingConfig, 1739 }) 1740 const logMessagesLength = queryParams.messages.length 1741 const logBetas = useBetas ? (queryParams.betas ?? []) : [] 1742 const logThinkingType = queryParams.thinking?.type ?? 'disabled' 1743 const logEffortValue = queryParams.output_config?.effort 1744 void options.getToolPermissionContext().then(permissionContext => { 1745 logAPIQuery({ 1746 model: options.model, 1747 messagesLength: logMessagesLength, 1748 temperature: options.temperatureOverride ?? 
1, 1749 betas: logBetas, 1750 permissionMode: permissionContext.mode, 1751 querySource: options.querySource, 1752 queryTracking: options.queryTracking, 1753 thinkingType: logThinkingType, 1754 effortValue: logEffortValue, 1755 fastMode: isFastMode, 1756 previousRequestId, 1757 }) 1758 }) 1759 } 1760 1761 const newMessages: AssistantMessage[] = [] 1762 let ttftMs = 0 1763 let partialMessage: BetaMessage | undefined = undefined 1764 const contentBlocks: (BetaContentBlock | ConnectorTextBlock)[] = [] 1765 let usage: NonNullableUsage = EMPTY_USAGE 1766 let costUSD = 0 1767 let stopReason: BetaStopReason | null = null 1768 let didFallBackToNonStreaming = false 1769 let fallbackMessage: AssistantMessage | undefined 1770 let maxOutputTokens = 0 1771 let responseHeaders: globalThis.Headers | undefined = undefined 1772 let research: unknown = undefined 1773 let isFastModeRequest = isFastMode // Keep separate state as it may change if falling back 1774 let isAdvisorInProgress = false 1775 1776 try { 1777 queryCheckpoint('query_client_creation_start') 1778 const generator = withRetry( 1779 () => 1780 getAnthropicClient({ 1781 maxRetries: 0, // Disabled auto-retry in favor of manual implementation 1782 model: options.model, 1783 fetchOverride: options.fetchOverride, 1784 source: options.querySource, 1785 }), 1786 async (anthropic, attempt, context) => { 1787 attemptNumber = attempt 1788 isFastModeRequest = context.fastMode ?? false 1789 start = Date.now() 1790 attemptStartTimes.push(start) 1791 // Client has been created by withRetry's getClient() call. This fires 1792 // once per attempt; on retries the client is usually cached (withRetry 1793 // only calls getClient() again after auth errors), so the delta from 1794 // client_creation_start is meaningful on attempt 1. 
1795 queryCheckpoint('query_client_creation_end') 1796 1797 const params = paramsFromContext(context) 1798 captureAPIRequest(params, options.querySource) // Capture for bug reports 1799 1800 maxOutputTokens = params.max_tokens 1801 1802 // Fire immediately before the fetch is dispatched. .withResponse() below 1803 // awaits until response headers arrive, so this MUST be before the await 1804 // or the "Network TTFB" phase measurement is wrong. 1805 queryCheckpoint('query_api_request_sent') 1806 if (!options.agentId) { 1807 headlessProfilerCheckpoint('api_request_sent') 1808 } 1809 1810 // Generate and track client request ID so timeouts (which return no 1811 // server request ID) can still be correlated with server logs. 1812 // First-party only — 3P providers don't log it (inc-4029 class). 1813 clientRequestId = 1814 getAPIProvider() === 'firstParty' && isFirstPartyAnthropicBaseUrl() 1815 ? randomUUID() 1816 : undefined 1817 1818 // Use raw stream instead of BetaMessageStream to avoid O(n²) partial JSON parsing 1819 // BetaMessageStream calls partialParse() on every input_json_delta, which we don't need 1820 // since we handle tool input accumulation ourselves 1821 // biome-ignore lint/plugin: main conversation loop handles attribution separately 1822 const result = await anthropic.beta.messages 1823 .create( 1824 { ...params, stream: true }, 1825 { 1826 signal, 1827 ...(clientRequestId && { 1828 headers: { [CLIENT_REQUEST_ID_HEADER]: clientRequestId }, 1829 }), 1830 }, 1831 ) 1832 .withResponse() 1833 queryCheckpoint('query_response_headers_received') 1834 streamRequestId = result.request_id 1835 streamResponse = result.response 1836 return result.data 1837 }, 1838 { 1839 model: options.model, 1840 fallbackModel: options.fallbackModel, 1841 thinkingConfig, 1842 ...(isFastModeEnabled() ? 
{ fastMode: isFastMode } : false), 1843 signal, 1844 querySource: options.querySource, 1845 }, 1846 ) 1847 1848 let e 1849 do { 1850 e = await generator.next() 1851 1852 // yield API error messages (the stream has a 'controller' property, error messages don't) 1853 if (!('controller' in e.value)) { 1854 yield e.value 1855 } 1856 } while (!e.done) 1857 stream = e.value as Stream<BetaRawMessageStreamEvent> 1858 1859 // reset state 1860 newMessages.length = 0 1861 ttftMs = 0 1862 partialMessage = undefined 1863 contentBlocks.length = 0 1864 usage = EMPTY_USAGE 1865 stopReason = null 1866 isAdvisorInProgress = false 1867 1868 // Streaming idle timeout watchdog: abort the stream if no chunks arrive 1869 // for STREAM_IDLE_TIMEOUT_MS. Unlike the stall detection below (which only 1870 // fires when the *next* chunk arrives), this uses setTimeout to actively 1871 // kill hung streams. Without this, a silently dropped connection can hang 1872 // the session indefinitely since the SDK's request timeout only covers the 1873 // initial fetch(), not the streaming body. 
1874 const streamWatchdogEnabled = isEnvTruthy( 1875 process.env.CLAUDE_ENABLE_STREAM_WATCHDOG, 1876 ) 1877 const STREAM_IDLE_TIMEOUT_MS = 1878 parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000 1879 const STREAM_IDLE_WARNING_MS = STREAM_IDLE_TIMEOUT_MS / 2 1880 let streamIdleAborted = false 1881 // performance.now() snapshot when watchdog fires, for measuring abort propagation delay 1882 let streamWatchdogFiredAt: number | null = null 1883 let streamIdleWarningTimer: ReturnType<typeof setTimeout> | null = null 1884 let streamIdleTimer: ReturnType<typeof setTimeout> | null = null 1885 function clearStreamIdleTimers(): void { 1886 if (streamIdleWarningTimer !== null) { 1887 clearTimeout(streamIdleWarningTimer) 1888 streamIdleWarningTimer = null 1889 } 1890 if (streamIdleTimer !== null) { 1891 clearTimeout(streamIdleTimer) 1892 streamIdleTimer = null 1893 } 1894 } 1895 function resetStreamIdleTimer(): void { 1896 clearStreamIdleTimers() 1897 if (!streamWatchdogEnabled) { 1898 return 1899 } 1900 streamIdleWarningTimer = setTimeout( 1901 warnMs => { 1902 logForDebugging( 1903 `Streaming idle warning: no chunks received for ${warnMs / 1000}s`, 1904 { level: 'warn' }, 1905 ) 1906 logForDiagnosticsNoPII('warn', 'cli_streaming_idle_warning') 1907 }, 1908 STREAM_IDLE_WARNING_MS, 1909 STREAM_IDLE_WARNING_MS, 1910 ) 1911 streamIdleTimer = setTimeout(() => { 1912 streamIdleAborted = true 1913 streamWatchdogFiredAt = performance.now() 1914 logForDebugging( 1915 `Streaming idle timeout: no chunks received for ${STREAM_IDLE_TIMEOUT_MS / 1000}s, aborting stream`, 1916 { level: 'error' }, 1917 ) 1918 logForDiagnosticsNoPII('error', 'cli_streaming_idle_timeout') 1919 logEvent('tengu_streaming_idle_timeout', { 1920 model: 1921 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1922 request_id: (streamRequestId ?? 
1923 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1924 timeout_ms: STREAM_IDLE_TIMEOUT_MS, 1925 }) 1926 releaseStreamResources() 1927 }, STREAM_IDLE_TIMEOUT_MS) 1928 } 1929 resetStreamIdleTimer() 1930 1931 startSessionActivity('api_call') 1932 try { 1933 // stream in and accumulate state 1934 let isFirstChunk = true 1935 let lastEventTime: number | null = null // Set after first chunk to avoid measuring TTFB as a stall 1936 const STALL_THRESHOLD_MS = 30_000 // 30 seconds 1937 let totalStallTime = 0 1938 let stallCount = 0 1939 1940 for await (const part of stream) { 1941 resetStreamIdleTimer() 1942 const now = Date.now() 1943 1944 // Detect and log streaming stalls (only after first event to avoid counting TTFB) 1945 if (lastEventTime !== null) { 1946 const timeSinceLastEvent = now - lastEventTime 1947 if (timeSinceLastEvent > STALL_THRESHOLD_MS) { 1948 stallCount++ 1949 totalStallTime += timeSinceLastEvent 1950 logForDebugging( 1951 `Streaming stall detected: ${(timeSinceLastEvent / 1000).toFixed(1)}s gap between events (stall #${stallCount})`, 1952 { level: 'warn' }, 1953 ) 1954 logEvent('tengu_streaming_stall', { 1955 stall_duration_ms: timeSinceLastEvent, 1956 stall_count: stallCount, 1957 total_stall_time_ms: totalStallTime, 1958 event_type: 1959 part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1960 model: 1961 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1962 request_id: (streamRequestId ?? 
1963 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1964 }) 1965 } 1966 } 1967 lastEventTime = now 1968 1969 if (isFirstChunk) { 1970 logForDebugging('Stream started - received first chunk') 1971 queryCheckpoint('query_first_chunk_received') 1972 if (!options.agentId) { 1973 headlessProfilerCheckpoint('first_chunk') 1974 } 1975 endQueryProfile() 1976 isFirstChunk = false 1977 } 1978 1979 switch (part.type) { 1980 case 'message_start': { 1981 partialMessage = part.message 1982 ttftMs = Date.now() - start 1983 usage = updateUsage(usage, part.message?.usage) 1984 // Capture research from message_start if available (internal only). 1985 // Always overwrite with the latest value. 1986 if ( 1987 process.env.USER_TYPE === 'ant' && 1988 'research' in (part.message as unknown as Record<string, unknown>) 1989 ) { 1990 research = (part.message as unknown as Record<string, unknown>) 1991 .research 1992 } 1993 break 1994 } 1995 case 'content_block_start': 1996 switch (part.content_block.type) { 1997 case 'tool_use': 1998 contentBlocks[part.index] = { 1999 ...part.content_block, 2000 input: '', 2001 } 2002 break 2003 case 'server_tool_use': 2004 contentBlocks[part.index] = { 2005 ...part.content_block, 2006 input: '' as unknown as { [key: string]: unknown }, 2007 } 2008 if ((part.content_block.name as string) === 'advisor') { 2009 isAdvisorInProgress = true 2010 logForDebugging(`[AdvisorTool] Advisor tool called`) 2011 logEvent('tengu_advisor_tool_call', { 2012 model: 2013 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2014 advisor_model: (advisorModel ?? 2015 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2016 }) 2017 } 2018 break 2019 case 'text': 2020 contentBlocks[part.index] = { 2021 ...part.content_block, 2022 // awkwardly, the sdk sometimes returns text as part of a 2023 // content_block_start message, then returns the same text 2024 // again in a content_block_delta message. 
we ignore it here 2025 // since there doesn't seem to be a way to detect when a 2026 // content_block_delta message duplicates the text. 2027 text: '', 2028 } 2029 break 2030 case 'thinking': 2031 contentBlocks[part.index] = { 2032 ...part.content_block, 2033 // also awkward 2034 thinking: '', 2035 // initialize signature to ensure field exists even if signature_delta never arrives 2036 signature: '', 2037 } 2038 break 2039 default: 2040 // even more awkwardly, the sdk mutates the contents of text blocks 2041 // as it works. we want the blocks to be immutable, so that we can 2042 // accumulate state ourselves. 2043 contentBlocks[part.index] = { ...part.content_block } 2044 if ( 2045 (part.content_block.type as string) === 'advisor_tool_result' 2046 ) { 2047 isAdvisorInProgress = false 2048 logForDebugging(`[AdvisorTool] Advisor tool result received`) 2049 } 2050 break 2051 } 2052 break 2053 case 'content_block_delta': { 2054 const contentBlock = contentBlocks[part.index] 2055 const delta = part.delta as typeof part.delta | ConnectorTextDelta 2056 if (!contentBlock) { 2057 logEvent('tengu_streaming_error', { 2058 error_type: 2059 'content_block_not_found_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2060 part_type: 2061 part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2062 part_index: part.index, 2063 }) 2064 throw new RangeError('Content block not found') 2065 } 2066 if ( 2067 feature('CONNECTOR_TEXT') && 2068 delta.type === 'connector_text_delta' 2069 ) { 2070 if (contentBlock.type !== 'connector_text') { 2071 logEvent('tengu_streaming_error', { 2072 error_type: 2073 'content_block_type_mismatch_connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2074 expected_type: 2075 'connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2076 actual_type: 2077 contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2078 }) 2079 throw new Error('Content 
block is not a connector_text block') 2080 } 2081 contentBlock.connector_text += delta.connector_text 2082 } else { 2083 switch (delta.type) { 2084 case 'citations_delta': 2085 // TODO: handle citations 2086 break 2087 case 'input_json_delta': 2088 if ( 2089 contentBlock.type !== 'tool_use' && 2090 contentBlock.type !== 'server_tool_use' 2091 ) { 2092 logEvent('tengu_streaming_error', { 2093 error_type: 2094 'content_block_type_mismatch_input_json' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2095 expected_type: 2096 'tool_use' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2097 actual_type: 2098 contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2099 }) 2100 throw new Error('Content block is not a input_json block') 2101 } 2102 if (typeof contentBlock.input !== 'string') { 2103 logEvent('tengu_streaming_error', { 2104 error_type: 2105 'content_block_input_not_string' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2106 input_type: 2107 typeof contentBlock.input as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2108 }) 2109 throw new Error('Content block input is not a string') 2110 } 2111 contentBlock.input += delta.partial_json 2112 break 2113 case 'text_delta': 2114 if (contentBlock.type !== 'text') { 2115 logEvent('tengu_streaming_error', { 2116 error_type: 2117 'content_block_type_mismatch_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2118 expected_type: 2119 'text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2120 actual_type: 2121 contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2122 }) 2123 throw new Error('Content block is not a text block') 2124 } 2125 contentBlock.text += delta.text 2126 break 2127 case 'signature_delta': 2128 if ( 2129 feature('CONNECTOR_TEXT') && 2130 contentBlock.type === 'connector_text' 2131 ) { 2132 contentBlock.signature = delta.signature 2133 break 2134 } 2135 if 
(contentBlock.type !== 'thinking') { 2136 logEvent('tengu_streaming_error', { 2137 error_type: 2138 'content_block_type_mismatch_thinking_signature' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2139 expected_type: 2140 'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2141 actual_type: 2142 contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2143 }) 2144 throw new Error('Content block is not a thinking block') 2145 } 2146 contentBlock.signature = delta.signature 2147 break 2148 case 'thinking_delta': 2149 if (contentBlock.type !== 'thinking') { 2150 logEvent('tengu_streaming_error', { 2151 error_type: 2152 'content_block_type_mismatch_thinking_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2153 expected_type: 2154 'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2155 actual_type: 2156 contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2157 }) 2158 throw new Error('Content block is not a thinking block') 2159 } 2160 contentBlock.thinking += delta.thinking 2161 break 2162 } 2163 } 2164 // Capture research from content_block_delta if available (internal only). 2165 // Always overwrite with the latest value. 
          if (process.env.USER_TYPE === 'ant' && 'research' in part) {
            research = (part as { research: unknown }).research
          }
          break
        }
        case 'content_block_stop': {
          const contentBlock = contentBlocks[part.index]
          // A stop for an index we never saw a content_block_start for means
          // the stream is corrupted; log a structured event before throwing.
          if (!contentBlock) {
            logEvent('tengu_streaming_error', {
              error_type:
                'content_block_not_found_stop' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
              part_type:
                part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
              part_index: part.index,
            })
            throw new RangeError('Content block not found')
          }
          // partialMessage is set on message_start; its absence here also
          // indicates an out-of-order / corrupted event stream.
          if (!partialMessage) {
            logEvent('tengu_streaming_error', {
              error_type:
                'partial_message_not_found' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
              part_type:
                part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
            })
            throw new Error('Message not found')
          }
          // Each completed content block is yielded immediately as its own
          // single-block assistant message (built from the message_start
          // envelope). The later message_delta handler mutates the last of
          // these in place to backfill final usage/stop_reason.
          const m: AssistantMessage = {
            message: {
              ...partialMessage,
              content: normalizeContentFromAPI(
                [contentBlock] as BetaContentBlock[],
                tools,
                options.agentId,
              ),
            },
            requestId: streamRequestId ?? undefined,
            type: 'assistant',
            uuid: randomUUID(),
            timestamp: new Date().toISOString(),
            // ant-internal research payload and advisor model are attached
            // only when present, to keep the serialized message minimal.
            ...(process.env.USER_TYPE === 'ant' &&
              research !== undefined && { research }),
            ...(advisorModel && { advisorModel }),
          }
          newMessages.push(m)
          yield m
          break
        }
        case 'message_delta': {
          usage = updateUsage(usage, part.usage)
          // Capture research from message_delta if available (internal only).
          // Always overwrite with the latest value. Also write back to
          // already-yielded messages since message_delta arrives after
          // content_block_stop.
2219 if ( 2220 process.env.USER_TYPE === 'ant' && 2221 'research' in (part as unknown as Record<string, unknown>) 2222 ) { 2223 research = (part as unknown as Record<string, unknown>).research 2224 for (const msg of newMessages) { 2225 msg.research = research 2226 } 2227 } 2228 2229 // Write final usage and stop_reason back to the last yielded 2230 // message. Messages are created at content_block_stop from 2231 // partialMessage, which was set at message_start before any tokens 2232 // were generated (output_tokens: 0, stop_reason: null). 2233 // message_delta arrives after content_block_stop with the real 2234 // values. 2235 // 2236 // IMPORTANT: Use direct property mutation, not object replacement. 2237 // The transcript write queue holds a reference to message.message 2238 // and serializes it lazily (100ms flush interval). Object 2239 // replacement ({ ...lastMsg.message, usage }) would disconnect 2240 // the queued reference; direct mutation ensures the transcript 2241 // captures the final values. 2242 stopReason = part.delta.stop_reason 2243 2244 const lastMsg = newMessages.at(-1) 2245 if (lastMsg) { 2246 lastMsg.message.usage = usage 2247 lastMsg.message.stop_reason = stopReason 2248 } 2249 2250 // Update cost 2251 const costUSDForPart = calculateUSDCost(resolvedModel, usage) 2252 costUSD += addToTotalSessionCost( 2253 costUSDForPart, 2254 usage, 2255 options.model, 2256 ) 2257 2258 const refusalMessage = getErrorMessageIfRefusal( 2259 part.delta.stop_reason, 2260 options.model, 2261 ) 2262 if (refusalMessage) { 2263 yield refusalMessage 2264 } 2265 2266 if (stopReason === 'max_tokens') { 2267 logEvent('tengu_max_tokens_reached', { 2268 max_tokens: maxOutputTokens, 2269 }) 2270 yield createAssistantAPIErrorMessage({ 2271 content: `${API_ERROR_MESSAGE_PREFIX}: Claude's response exceeded the ${ 2272 maxOutputTokens 2273 } output token maximum. 
To configure this behavior, set the CLAUDE_CODE_MAX_OUTPUT_TOKENS environment variable.`, 2274 apiError: 'max_output_tokens', 2275 error: 'max_output_tokens', 2276 }) 2277 } 2278 2279 if (stopReason === 'model_context_window_exceeded') { 2280 logEvent('tengu_context_window_exceeded', { 2281 max_tokens: maxOutputTokens, 2282 output_tokens: usage.output_tokens, 2283 }) 2284 // Reuse the max_output_tokens recovery path — from the model's 2285 // perspective, both mean "response was cut off, continue from 2286 // where you left off." 2287 yield createAssistantAPIErrorMessage({ 2288 content: `${API_ERROR_MESSAGE_PREFIX}: The model has reached its context window limit.`, 2289 apiError: 'max_output_tokens', 2290 error: 'max_output_tokens', 2291 }) 2292 } 2293 break 2294 } 2295 case 'message_stop': 2296 break 2297 } 2298 2299 yield { 2300 type: 'stream_event', 2301 event: part, 2302 ...(part.type === 'message_start' ? { ttftMs } : undefined), 2303 } 2304 } 2305 // Clear the idle timeout watchdog now that the stream loop has exited 2306 clearStreamIdleTimers() 2307 2308 // If the stream was aborted by our idle timeout watchdog, fall back to 2309 // non-streaming retry rather than treating it as a completed stream. 2310 if (streamIdleAborted) { 2311 // Instrumentation: proves the for-await exited after the watchdog fired 2312 // (vs. hung forever). exit_delay_ms measures abort propagation latency: 2313 // 0-10ms = abort worked; >>1000ms = something else woke the loop. 2314 const exitDelayMs = 2315 streamWatchdogFiredAt !== null 2316 ? Math.round(performance.now() - streamWatchdogFiredAt) 2317 : -1 2318 logForDiagnosticsNoPII( 2319 'info', 2320 'cli_stream_loop_exited_after_watchdog_clean', 2321 ) 2322 logEvent('tengu_stream_loop_exited_after_watchdog', { 2323 request_id: (streamRequestId ?? 
2324 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2325 exit_delay_ms: exitDelayMs, 2326 exit_path: 2327 'clean' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2328 model: 2329 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2330 }) 2331 // Prevent double-emit: this throw lands in the catch block below, 2332 // whose exit_path='error' probe guards on streamWatchdogFiredAt. 2333 streamWatchdogFiredAt = null 2334 throw new Error('Stream idle timeout - no chunks received') 2335 } 2336 2337 // Detect when the stream completed without producing any assistant messages. 2338 // This covers two proxy failure modes: 2339 // 1. No events at all (!partialMessage): proxy returned 200 with non-SSE body 2340 // 2. Partial events (partialMessage set but no content blocks completed AND 2341 // no stop_reason received): proxy returned message_start but stream ended 2342 // before content_block_stop and before message_delta with stop_reason 2343 // BetaMessageStream had the first check in _endRequest() but the raw Stream 2344 // does not - without it the generator silently returns no assistant messages, 2345 // causing "Execution error" in -p mode. 2346 // Note: We must check stopReason to avoid false positives. For example, with 2347 // structured output (--json-schema), the model calls a StructuredOutput tool 2348 // on turn 1, then on turn 2 responds with end_turn and no content blocks. 2349 // That's a legitimate empty response, not an incomplete stream. 2350 if (!partialMessage || (newMessages.length === 0 && !stopReason)) { 2351 logForDebugging( 2352 !partialMessage 2353 ? 
'Stream completed without receiving message_start event - triggering non-streaming fallback' 2354 : 'Stream completed with message_start but no content blocks completed - triggering non-streaming fallback', 2355 { level: 'error' }, 2356 ) 2357 logEvent('tengu_stream_no_events', { 2358 model: 2359 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2360 request_id: (streamRequestId ?? 2361 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2362 }) 2363 throw new Error('Stream ended without receiving any events') 2364 } 2365 2366 // Log summary if any stalls occurred during streaming 2367 if (stallCount > 0) { 2368 logForDebugging( 2369 `Streaming completed with ${stallCount} stall(s), total stall time: ${(totalStallTime / 1000).toFixed(1)}s`, 2370 { level: 'warn' }, 2371 ) 2372 logEvent('tengu_streaming_stall_summary', { 2373 stall_count: stallCount, 2374 total_stall_time_ms: totalStallTime, 2375 model: 2376 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2377 request_id: (streamRequestId ?? 
2378 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2379 }) 2380 } 2381 2382 // Check if the cache actually broke based on response tokens 2383 if (feature('PROMPT_CACHE_BREAK_DETECTION')) { 2384 void checkResponseForCacheBreak( 2385 options.querySource, 2386 usage.cache_read_input_tokens, 2387 usage.cache_creation_input_tokens, 2388 messages, 2389 options.agentId, 2390 streamRequestId, 2391 ) 2392 } 2393 2394 // Process fallback percentage header and quota status if available 2395 // streamResponse is set when the stream is created in the withRetry callback above 2396 // TypeScript's control flow analysis can't track that streamResponse is set in the callback 2397 // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins 2398 const resp = streamResponse as unknown as Response | undefined 2399 if (resp) { 2400 extractQuotaStatusFromHeaders(resp.headers) 2401 // Store headers for gateway detection 2402 responseHeaders = resp.headers 2403 } 2404 } catch (streamingError) { 2405 // Clear the idle timeout watchdog on error path too 2406 clearStreamIdleTimers() 2407 2408 // Instrumentation: if the watchdog had already fired and the for-await 2409 // threw (rather than exiting cleanly), record that the loop DID exit and 2410 // how long after the watchdog. Distinguishes true hangs from error exits. 2411 if (streamIdleAborted && streamWatchdogFiredAt !== null) { 2412 const exitDelayMs = Math.round( 2413 performance.now() - streamWatchdogFiredAt, 2414 ) 2415 logForDiagnosticsNoPII( 2416 'info', 2417 'cli_stream_loop_exited_after_watchdog_error', 2418 ) 2419 logEvent('tengu_stream_loop_exited_after_watchdog', { 2420 request_id: (streamRequestId ?? 2421 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2422 exit_delay_ms: exitDelayMs, 2423 exit_path: 2424 'error' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2425 error_name: 2426 streamingError instanceof Error 2427 ? 
(streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) 2428 : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS), 2429 model: 2430 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2431 }) 2432 } 2433 2434 if (streamingError instanceof APIUserAbortError) { 2435 // Check if the abort signal was triggered by the user (ESC key) 2436 // If the signal is aborted, it's a user-initiated abort 2437 // If not, it's likely a timeout from the SDK 2438 if (signal.aborted) { 2439 // This is a real user abort (ESC key was pressed) 2440 logForDebugging( 2441 `Streaming aborted by user: ${errorMessage(streamingError)}`, 2442 ) 2443 if (isAdvisorInProgress) { 2444 logEvent('tengu_advisor_tool_interrupted', { 2445 model: 2446 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2447 advisor_model: (advisorModel ?? 2448 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2449 }) 2450 } 2451 throw streamingError 2452 } else { 2453 // The SDK threw APIUserAbortError but our signal wasn't aborted 2454 // This means it's a timeout from the SDK's internal timeout 2455 logForDebugging( 2456 `Streaming timeout (SDK abort): ${streamingError.message}`, 2457 { level: 'error' }, 2458 ) 2459 // Throw a more specific error for timeout 2460 throw new APIConnectionTimeoutError({ message: 'Request timed out' }) 2461 } 2462 } 2463 2464 // When the flag is enabled, skip the non-streaming fallback and let the 2465 // error propagate to withRetry. The mid-stream fallback causes double tool 2466 // execution when streaming tool execution is active: the partial stream 2467 // starts a tool, then the non-streaming retry produces the same tool_use 2468 // and runs it again. See inc-4258. 
2469 const disableFallback = 2470 isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_NONSTREAMING_FALLBACK) || 2471 getFeatureValue_CACHED_MAY_BE_STALE( 2472 'tengu_disable_streaming_to_non_streaming_fallback', 2473 false, 2474 ) 2475 2476 if (disableFallback) { 2477 logForDebugging( 2478 `Error streaming (non-streaming fallback disabled): ${errorMessage(streamingError)}`, 2479 { level: 'error' }, 2480 ) 2481 logEvent('tengu_streaming_fallback_to_non_streaming', { 2482 model: 2483 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2484 error: 2485 streamingError instanceof Error 2486 ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) 2487 : (String( 2488 streamingError, 2489 ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS), 2490 attemptNumber, 2491 maxOutputTokens, 2492 thinkingType: 2493 thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2494 fallback_disabled: true, 2495 request_id: (streamRequestId ?? 2496 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2497 fallback_cause: (streamIdleAborted 2498 ? 'watchdog' 2499 : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2500 }) 2501 throw streamingError 2502 } 2503 2504 logForDebugging( 2505 `Error streaming, falling back to non-streaming mode: ${errorMessage(streamingError)}`, 2506 { level: 'error' }, 2507 ) 2508 didFallBackToNonStreaming = true 2509 if (options.onStreamingFallback) { 2510 options.onStreamingFallback() 2511 } 2512 2513 logEvent('tengu_streaming_fallback_to_non_streaming', { 2514 model: 2515 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2516 error: 2517 streamingError instanceof Error 2518 ? 
(streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS) 2519 : (String( 2520 streamingError, 2521 ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS), 2522 attemptNumber, 2523 maxOutputTokens, 2524 thinkingType: 2525 thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2526 fallback_disabled: false, 2527 request_id: (streamRequestId ?? 2528 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2529 fallback_cause: (streamIdleAborted 2530 ? 'watchdog' 2531 : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2532 }) 2533 2534 // Fall back to non-streaming mode with retries. 2535 // If the streaming failure was itself a 529, count it toward the 2536 // consecutive-529 budget so total 529s-before-model-fallback is the 2537 // same whether the overload was hit in streaming or non-streaming mode. 2538 // This is a speculative fix for https://github.com/anthropics/claude-code/issues/1513 2539 // Instrumentation: proves executeNonStreamingRequest was entered (vs. the 2540 // fallback event firing but the call itself hanging at dispatch). 2541 logForDiagnosticsNoPII('info', 'cli_nonstreaming_fallback_started') 2542 logEvent('tengu_nonstreaming_fallback_started', { 2543 request_id: (streamRequestId ?? 2544 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2545 model: 2546 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2547 fallback_cause: (streamIdleAborted 2548 ? 'watchdog' 2549 : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2550 }) 2551 const result = yield* executeNonStreamingRequest( 2552 { model: options.model, source: options.querySource }, 2553 { 2554 model: options.model, 2555 fallbackModel: options.fallbackModel, 2556 thinkingConfig, 2557 ...(isFastModeEnabled() && { fastMode: isFastMode }), 2558 signal, 2559 initialConsecutive529Errors: is529Error(streamingError) ? 
1 : 0, 2560 querySource: options.querySource, 2561 }, 2562 paramsFromContext, 2563 (attempt, _startTime, tokens) => { 2564 attemptNumber = attempt 2565 maxOutputTokens = tokens 2566 }, 2567 params => captureAPIRequest(params, options.querySource), 2568 streamRequestId, 2569 ) 2570 2571 const m: AssistantMessage = { 2572 message: { 2573 ...result, 2574 content: normalizeContentFromAPI( 2575 result.content, 2576 tools, 2577 options.agentId, 2578 ), 2579 }, 2580 requestId: streamRequestId ?? undefined, 2581 type: 'assistant', 2582 uuid: randomUUID(), 2583 timestamp: new Date().toISOString(), 2584 ...(process.env.USER_TYPE === 'ant' && 2585 research !== undefined && { 2586 research, 2587 }), 2588 ...(advisorModel && { 2589 advisorModel, 2590 }), 2591 } 2592 newMessages.push(m) 2593 fallbackMessage = m 2594 yield m 2595 } finally { 2596 clearStreamIdleTimers() 2597 } 2598 } catch (errorFromRetry) { 2599 // FallbackTriggeredError must propagate to query.ts, which performs the 2600 // actual model switch. Swallowing it here would turn the fallback into a 2601 // no-op — the user would just see "Model fallback triggered: X -> Y" as 2602 // an error message with no actual retry on the fallback model. 2603 if (errorFromRetry instanceof FallbackTriggeredError) { 2604 throw errorFromRetry 2605 } 2606 2607 // Check if this is a 404 error during stream creation that should trigger 2608 // non-streaming fallback. This handles gateways that return 404 for streaming 2609 // endpoints but work fine with non-streaming. Before v2.1.8, BetaMessageStream 2610 // threw 404s during iteration (caught by inner catch with fallback), but now 2611 // with raw streams, 404s are thrown during creation (caught here). 
2612 const is404StreamCreationError = 2613 !didFallBackToNonStreaming && 2614 errorFromRetry instanceof CannotRetryError && 2615 errorFromRetry.originalError instanceof APIError && 2616 errorFromRetry.originalError.status === 404 2617 2618 if (is404StreamCreationError) { 2619 // 404 is thrown at .withResponse() before streamRequestId is assigned, 2620 // and CannotRetryError means every retry failed — so grab the failed 2621 // request's ID from the error header instead. 2622 const failedRequestId = 2623 (errorFromRetry.originalError as APIError).requestID ?? 'unknown' 2624 logForDebugging( 2625 'Streaming endpoint returned 404, falling back to non-streaming mode', 2626 { level: 'warn' }, 2627 ) 2628 didFallBackToNonStreaming = true 2629 if (options.onStreamingFallback) { 2630 options.onStreamingFallback() 2631 } 2632 2633 logEvent('tengu_streaming_fallback_to_non_streaming', { 2634 model: 2635 options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2636 error: 2637 '404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2638 attemptNumber, 2639 maxOutputTokens, 2640 thinkingType: 2641 thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2642 request_id: 2643 failedRequestId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2644 fallback_cause: 2645 '404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 2646 }) 2647 2648 try { 2649 // Fall back to non-streaming mode 2650 const result = yield* executeNonStreamingRequest( 2651 { model: options.model, source: options.querySource }, 2652 { 2653 model: options.model, 2654 fallbackModel: options.fallbackModel, 2655 thinkingConfig, 2656 ...(isFastModeEnabled() && { fastMode: isFastMode }), 2657 signal, 2658 }, 2659 paramsFromContext, 2660 (attempt, _startTime, tokens) => { 2661 attemptNumber = attempt 2662 maxOutputTokens = tokens 2663 }, 2664 params => captureAPIRequest(params, options.querySource), 
2665 failedRequestId, 2666 ) 2667 2668 const m: AssistantMessage = { 2669 message: { 2670 ...result, 2671 content: normalizeContentFromAPI( 2672 result.content, 2673 tools, 2674 options.agentId, 2675 ), 2676 }, 2677 requestId: streamRequestId ?? undefined, 2678 type: 'assistant', 2679 uuid: randomUUID(), 2680 timestamp: new Date().toISOString(), 2681 ...(process.env.USER_TYPE === 'ant' && 2682 research !== undefined && { research }), 2683 ...(advisorModel && { advisorModel }), 2684 } 2685 newMessages.push(m) 2686 fallbackMessage = m 2687 yield m 2688 2689 // Continue to success logging below 2690 } catch (fallbackError) { 2691 // Propagate model-fallback signal to query.ts (see comment above). 2692 if (fallbackError instanceof FallbackTriggeredError) { 2693 throw fallbackError 2694 } 2695 2696 // Fallback also failed, handle as normal error 2697 logForDebugging( 2698 `Non-streaming fallback also failed: ${errorMessage(fallbackError)}`, 2699 { level: 'error' }, 2700 ) 2701 2702 let error = fallbackError 2703 let errorModel = options.model 2704 if (fallbackError instanceof CannotRetryError) { 2705 error = fallbackError.originalError 2706 errorModel = fallbackError.retryContext.model 2707 } 2708 2709 if (error instanceof APIError) { 2710 extractQuotaStatusFromError(error) 2711 } 2712 2713 const requestId = 2714 streamRequestId || 2715 (error instanceof APIError ? error.requestID : undefined) || 2716 (error instanceof APIError 2717 ? 
(error.error as { request_id?: string })?.request_id 2718 : undefined) 2719 2720 logAPIError({ 2721 error, 2722 model: errorModel, 2723 messageCount: messagesForAPI.length, 2724 messageTokens: tokenCountFromLastAPIResponse(messagesForAPI), 2725 durationMs: Date.now() - start, 2726 durationMsIncludingRetries: Date.now() - startIncludingRetries, 2727 attempt: attemptNumber, 2728 requestId, 2729 clientRequestId, 2730 didFallBackToNonStreaming, 2731 queryTracking: options.queryTracking, 2732 querySource: options.querySource, 2733 llmSpan, 2734 fastMode: isFastModeRequest, 2735 previousRequestId, 2736 }) 2737 2738 if (error instanceof APIUserAbortError) { 2739 releaseStreamResources() 2740 return 2741 } 2742 2743 yield getAssistantMessageFromError(error, errorModel, { 2744 messages, 2745 messagesForAPI, 2746 }) 2747 releaseStreamResources() 2748 return 2749 } 2750 } else { 2751 // Original error handling for non-404 errors 2752 logForDebugging(`Error in API request: ${errorMessage(errorFromRetry)}`, { 2753 level: 'error', 2754 }) 2755 2756 let error = errorFromRetry 2757 let errorModel = options.model 2758 if (errorFromRetry instanceof CannotRetryError) { 2759 error = errorFromRetry.originalError 2760 errorModel = errorFromRetry.retryContext.model 2761 } 2762 2763 // Extract quota status from error headers if it's a rate limit error 2764 if (error instanceof APIError) { 2765 extractQuotaStatusFromError(error) 2766 } 2767 2768 // Extract requestId from stream, error header, or error body 2769 const requestId = 2770 streamRequestId || 2771 (error instanceof APIError ? error.requestID : undefined) || 2772 (error instanceof APIError 2773 ? 
(error.error as { request_id?: string })?.request_id 2774 : undefined) 2775 2776 logAPIError({ 2777 error, 2778 model: errorModel, 2779 messageCount: messagesForAPI.length, 2780 messageTokens: tokenCountFromLastAPIResponse(messagesForAPI), 2781 durationMs: Date.now() - start, 2782 durationMsIncludingRetries: Date.now() - startIncludingRetries, 2783 attempt: attemptNumber, 2784 requestId, 2785 clientRequestId, 2786 didFallBackToNonStreaming, 2787 queryTracking: options.queryTracking, 2788 querySource: options.querySource, 2789 llmSpan, 2790 fastMode: isFastModeRequest, 2791 previousRequestId, 2792 }) 2793 2794 // Don't yield an assistant error message for user aborts 2795 // The interruption message is handled in query.ts 2796 if (error instanceof APIUserAbortError) { 2797 releaseStreamResources() 2798 return 2799 } 2800 2801 yield getAssistantMessageFromError(error, errorModel, { 2802 messages, 2803 messagesForAPI, 2804 }) 2805 releaseStreamResources() 2806 return 2807 } 2808 } finally { 2809 stopSessionActivity('api_call') 2810 // Must be in the finally block: if the generator is terminated early 2811 // via .return() (e.g. consumer breaks out of for-await-of, or query.ts 2812 // encounters an abort), code after the try/finally never executes. 2813 // Without this, the Response object's native TLS/socket buffers leak 2814 // until the generator itself is GC'd (see GH #32920). 2815 releaseStreamResources() 2816 2817 // Non-streaming fallback cost: the streaming path tracks cost in the 2818 // message_delta handler before any yield. Fallback pushes to newMessages 2819 // then yields, so tracking must be here to survive .return() at the yield. 
2820 if (fallbackMessage) { 2821 const fallbackUsage = fallbackMessage.message.usage 2822 usage = updateUsage(EMPTY_USAGE, fallbackUsage) 2823 stopReason = fallbackMessage.message.stop_reason 2824 const fallbackCost = calculateUSDCost(resolvedModel, fallbackUsage) 2825 costUSD += addToTotalSessionCost( 2826 fallbackCost, 2827 fallbackUsage, 2828 options.model, 2829 ) 2830 } 2831 } 2832 2833 // Mark all registered tools as sent to API so they become eligible for deletion 2834 if (feature('CACHED_MICROCOMPACT') && cachedMCEnabled) { 2835 markToolsSentToAPIState() 2836 } 2837 2838 // Track the last requestId for the main conversation chain so shutdown 2839 // can send a cache eviction hint to inference. Exclude backgrounded 2840 // sessions (Ctrl+B) which share the repl_main_thread querySource but 2841 // run inside an agent context — they are independent conversation chains 2842 // whose cache should not be evicted when the foreground session clears. 2843 if ( 2844 streamRequestId && 2845 !getAgentContext() && 2846 (options.querySource.startsWith('repl_main_thread') || 2847 options.querySource === 'sdk') 2848 ) { 2849 setLastMainRequestId(streamRequestId) 2850 } 2851 2852 // Precompute scalars so the fire-and-forget .then() closure doesn't pin the 2853 // full messagesForAPI array (the entire conversation up to the context window 2854 // limit) until getToolPermissionContext() resolves. 2855 const logMessageCount = messagesForAPI.length 2856 const logMessageTokens = tokenCountFromLastAPIResponse(messagesForAPI) 2857 void options.getToolPermissionContext().then(permissionContext => { 2858 logAPISuccessAndDuration({ 2859 model: 2860 newMessages[0]?.message.model ?? partialMessage?.model ?? options.model, 2861 preNormalizedModel: options.model, 2862 usage, 2863 start, 2864 startIncludingRetries, 2865 attempt: attemptNumber, 2866 messageCount: logMessageCount, 2867 messageTokens: logMessageTokens, 2868 requestId: streamRequestId ?? 
null, 2869 stopReason, 2870 ttftMs, 2871 didFallBackToNonStreaming, 2872 querySource: options.querySource, 2873 headers: responseHeaders, 2874 costUSD, 2875 queryTracking: options.queryTracking, 2876 permissionMode: permissionContext.mode, 2877 // Pass newMessages for beta tracing - extraction happens in logging.ts 2878 // only when beta tracing is enabled 2879 newMessages, 2880 llmSpan, 2881 globalCacheStrategy, 2882 requestSetupMs: start - startIncludingRetries, 2883 attemptStartTimes, 2884 fastMode: isFastModeRequest, 2885 previousRequestId, 2886 betas: lastRequestBetas, 2887 }) 2888 }) 2889 2890 // Defensive: also release on normal completion (no-op if finally already ran). 2891 releaseStreamResources() 2892} 2893 2894/** 2895 * Cleans up stream resources to prevent memory leaks. 2896 * @internal Exported for testing 2897 */ 2898export function cleanupStream( 2899 stream: Stream<BetaRawMessageStreamEvent> | undefined, 2900): void { 2901 if (!stream) { 2902 return 2903 } 2904 try { 2905 // Abort the stream via its controller if not already aborted 2906 if (!stream.controller.signal.aborted) { 2907 stream.controller.abort() 2908 } 2909 } catch { 2910 // Ignore - stream may already be closed 2911 } 2912} 2913 2914/** 2915 * Updates usage statistics with new values from streaming API events. 2916 * Note: Anthropic's streaming API provides cumulative usage totals, not incremental deltas. 2917 * Each event contains the complete usage up to that point in the stream. 2918 * 2919 * Input-related tokens (input_tokens, cache_creation_input_tokens, cache_read_input_tokens) 2920 * are typically set in message_start and remain constant. message_delta events may send 2921 * explicit 0 values for these fields, which should not overwrite the values from message_start. 2922 * We only update these fields if they have a non-null, non-zero value. 
2923 */ 2924export function updateUsage( 2925 usage: Readonly<NonNullableUsage>, 2926 partUsage: BetaMessageDeltaUsage | undefined, 2927): NonNullableUsage { 2928 if (!partUsage) { 2929 return { ...usage } 2930 } 2931 return { 2932 input_tokens: 2933 partUsage.input_tokens !== null && partUsage.input_tokens > 0 2934 ? partUsage.input_tokens 2935 : usage.input_tokens, 2936 cache_creation_input_tokens: 2937 partUsage.cache_creation_input_tokens !== null && 2938 partUsage.cache_creation_input_tokens > 0 2939 ? partUsage.cache_creation_input_tokens 2940 : usage.cache_creation_input_tokens, 2941 cache_read_input_tokens: 2942 partUsage.cache_read_input_tokens !== null && 2943 partUsage.cache_read_input_tokens > 0 2944 ? partUsage.cache_read_input_tokens 2945 : usage.cache_read_input_tokens, 2946 output_tokens: partUsage.output_tokens ?? usage.output_tokens, 2947 server_tool_use: { 2948 web_search_requests: 2949 partUsage.server_tool_use?.web_search_requests ?? 2950 usage.server_tool_use.web_search_requests, 2951 web_fetch_requests: 2952 partUsage.server_tool_use?.web_fetch_requests ?? 2953 usage.server_tool_use.web_fetch_requests, 2954 }, 2955 service_tier: usage.service_tier, 2956 cache_creation: { 2957 // SDK type BetaMessageDeltaUsage is missing cache_creation, but it's real! 2958 ephemeral_1h_input_tokens: 2959 (partUsage as BetaUsage).cache_creation?.ephemeral_1h_input_tokens ?? 2960 usage.cache_creation.ephemeral_1h_input_tokens, 2961 ephemeral_5m_input_tokens: 2962 (partUsage as BetaUsage).cache_creation?.ephemeral_5m_input_tokens ?? 2963 usage.cache_creation.ephemeral_5m_input_tokens, 2964 }, 2965 // cache_deleted_input_tokens: returned by the API when cache editing 2966 // deletes KV cache content, but not in SDK types. Kept off NonNullableUsage 2967 // so the string is eliminated from external builds by dead code elimination. 2968 // Uses the same > 0 guard as other token fields to prevent message_delta 2969 // from overwriting the real value with 0. 
2970 ...(feature('CACHED_MICROCOMPACT') 2971 ? { 2972 cache_deleted_input_tokens: 2973 (partUsage as unknown as { cache_deleted_input_tokens?: number }) 2974 .cache_deleted_input_tokens != null && 2975 (partUsage as unknown as { cache_deleted_input_tokens: number }) 2976 .cache_deleted_input_tokens > 0 2977 ? (partUsage as unknown as { cache_deleted_input_tokens: number }) 2978 .cache_deleted_input_tokens 2979 : ((usage as unknown as { cache_deleted_input_tokens?: number }) 2980 .cache_deleted_input_tokens ?? 0), 2981 } 2982 : {}), 2983 inference_geo: usage.inference_geo, 2984 iterations: partUsage.iterations ?? usage.iterations, 2985 speed: (partUsage as BetaUsage).speed ?? usage.speed, 2986 } 2987} 2988 2989/** 2990 * Accumulates usage from one message into a total usage object. 2991 * Used to track cumulative usage across multiple assistant turns. 2992 */ 2993export function accumulateUsage( 2994 totalUsage: Readonly<NonNullableUsage>, 2995 messageUsage: Readonly<NonNullableUsage>, 2996): NonNullableUsage { 2997 return { 2998 input_tokens: totalUsage.input_tokens + messageUsage.input_tokens, 2999 cache_creation_input_tokens: 3000 totalUsage.cache_creation_input_tokens + 3001 messageUsage.cache_creation_input_tokens, 3002 cache_read_input_tokens: 3003 totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens, 3004 output_tokens: totalUsage.output_tokens + messageUsage.output_tokens, 3005 server_tool_use: { 3006 web_search_requests: 3007 totalUsage.server_tool_use.web_search_requests + 3008 messageUsage.server_tool_use.web_search_requests, 3009 web_fetch_requests: 3010 totalUsage.server_tool_use.web_fetch_requests + 3011 messageUsage.server_tool_use.web_fetch_requests, 3012 }, 3013 service_tier: messageUsage.service_tier, // Use the most recent service tier 3014 cache_creation: { 3015 ephemeral_1h_input_tokens: 3016 totalUsage.cache_creation.ephemeral_1h_input_tokens + 3017 messageUsage.cache_creation.ephemeral_1h_input_tokens, 3018 
ephemeral_5m_input_tokens: 3019 totalUsage.cache_creation.ephemeral_5m_input_tokens + 3020 messageUsage.cache_creation.ephemeral_5m_input_tokens, 3021 }, 3022 // See comment in updateUsage — field is not on NonNullableUsage to keep 3023 // the string out of external builds. 3024 ...(feature('CACHED_MICROCOMPACT') 3025 ? { 3026 cache_deleted_input_tokens: 3027 ((totalUsage as unknown as { cache_deleted_input_tokens?: number }) 3028 .cache_deleted_input_tokens ?? 0) + 3029 (( 3030 messageUsage as unknown as { cache_deleted_input_tokens?: number } 3031 ).cache_deleted_input_tokens ?? 0), 3032 } 3033 : {}), 3034 inference_geo: messageUsage.inference_geo, // Use the most recent 3035 iterations: messageUsage.iterations, // Use the most recent 3036 speed: messageUsage.speed, // Use the most recent 3037 } 3038} 3039 3040function isToolResultBlock( 3041 block: unknown, 3042): block is { type: 'tool_result'; tool_use_id: string } { 3043 return ( 3044 block !== null && 3045 typeof block === 'object' && 3046 'type' in block && 3047 (block as { type: string }).type === 'tool_result' && 3048 'tool_use_id' in block 3049 ) 3050} 3051 3052type CachedMCEditsBlock = { 3053 type: 'cache_edits' 3054 edits: { type: 'delete'; cache_reference: string }[] 3055} 3056 3057type CachedMCPinnedEdits = { 3058 userMessageIndex: number 3059 block: CachedMCEditsBlock 3060} 3061 3062// Exported for testing cache_reference placement constraints 3063export function addCacheBreakpoints( 3064 messages: (UserMessage | AssistantMessage)[], 3065 enablePromptCaching: boolean, 3066 querySource?: QuerySource, 3067 useCachedMC = false, 3068 newCacheEdits?: CachedMCEditsBlock | null, 3069 pinnedEdits?: CachedMCPinnedEdits[], 3070 skipCacheWrite = false, 3071): MessageParam[] { 3072 logEvent('tengu_api_cache_breakpoints', { 3073 totalMessageCount: messages.length, 3074 cachingEnabled: enablePromptCaching, 3075 skipCacheWrite, 3076 }) 3077 3078 // Exactly one message-level cache_control marker per request. 
Mycro's 3079 // turn-to-turn eviction (page_manager/index.rs: Index::insert) frees 3080 // local-attention KV pages at any cached prefix position NOT in 3081 // cache_store_int_token_boundaries. With two markers the second-to-last 3082 // position is protected and its locals survive an extra turn even though 3083 // nothing will ever resume from there — with one marker they're freed 3084 // immediately. For fire-and-forget forks (skipCacheWrite) we shift the 3085 // marker to the second-to-last message: that's the last shared-prefix 3086 // point, so the write is a no-op merge on mycro (entry already exists) 3087 // and the fork doesn't leave its own tail in the KVCC. Dense pages are 3088 // refcounted and survive via the new hash either way. 3089 const markerIndex = skipCacheWrite ? messages.length - 2 : messages.length - 1 3090 const result = messages.map((msg, index) => { 3091 const addCache = index === markerIndex 3092 if (msg.type === 'user') { 3093 return userMessageToMessageParam( 3094 msg, 3095 addCache, 3096 enablePromptCaching, 3097 querySource, 3098 ) 3099 } 3100 return assistantMessageToMessageParam( 3101 msg, 3102 addCache, 3103 enablePromptCaching, 3104 querySource, 3105 ) 3106 }) 3107 3108 if (!useCachedMC) { 3109 return result 3110 } 3111 3112 // Track all cache_references being deleted to prevent duplicates across blocks. 3113 const seenDeleteRefs = new Set<string>() 3114 3115 // Helper to deduplicate a cache_edits block against already-seen deletions 3116 const deduplicateEdits = (block: CachedMCEditsBlock): CachedMCEditsBlock => { 3117 const uniqueEdits = block.edits.filter(edit => { 3118 if (seenDeleteRefs.has(edit.cache_reference)) { 3119 return false 3120 } 3121 seenDeleteRefs.add(edit.cache_reference) 3122 return true 3123 }) 3124 return { ...block, edits: uniqueEdits } 3125 } 3126 3127 // Re-insert all previously-pinned cache_edits at their original positions 3128 for (const pinned of pinnedEdits ?? 
[]) { 3129 const msg = result[pinned.userMessageIndex] 3130 if (msg && msg.role === 'user') { 3131 if (!Array.isArray(msg.content)) { 3132 msg.content = [{ type: 'text', text: msg.content as string }] 3133 } 3134 const dedupedBlock = deduplicateEdits(pinned.block) 3135 if (dedupedBlock.edits.length > 0) { 3136 insertBlockAfterToolResults(msg.content, dedupedBlock) 3137 } 3138 } 3139 } 3140 3141 // Insert new cache_edits into the last user message and pin them 3142 if (newCacheEdits && result.length > 0) { 3143 const dedupedNewEdits = deduplicateEdits(newCacheEdits) 3144 if (dedupedNewEdits.edits.length > 0) { 3145 for (let i = result.length - 1; i >= 0; i--) { 3146 const msg = result[i] 3147 if (msg && msg.role === 'user') { 3148 if (!Array.isArray(msg.content)) { 3149 msg.content = [{ type: 'text', text: msg.content as string }] 3150 } 3151 insertBlockAfterToolResults(msg.content, dedupedNewEdits) 3152 // Pin so this block is re-sent at the same position in future calls 3153 pinCacheEdits(i, newCacheEdits) 3154 3155 logForDebugging( 3156 `Added cache_edits block with ${dedupedNewEdits.edits.length} deletion(s) to message[${i}]: ${dedupedNewEdits.edits.map(e => e.cache_reference).join(', ')}`, 3157 ) 3158 break 3159 } 3160 } 3161 } 3162 } 3163 3164 // Add cache_reference to tool_result blocks that are within the cached prefix. 3165 // Must be done AFTER cache_edits insertion since that modifies content arrays. 3166 if (enablePromptCaching) { 3167 // Find the last message containing a cache_control marker 3168 let lastCCMsg = -1 3169 for (let i = 0; i < result.length; i++) { 3170 const msg = result[i]! 3171 if (Array.isArray(msg.content)) { 3172 for (const block of msg.content) { 3173 if (block && typeof block === 'object' && 'cache_control' in block) { 3174 lastCCMsg = i 3175 } 3176 } 3177 } 3178 } 3179 3180 // Add cache_reference to tool_result blocks that are strictly before 3181 // the last cache_control marker. 
The API requires cache_reference to 3182 // appear "before or on" the last cache_control — we use strict "before" 3183 // to avoid edge cases where cache_edits splicing shifts block indices. 3184 // 3185 // Create new objects instead of mutating in-place to avoid contaminating 3186 // blocks reused by secondary queries that use models without cache_editing support. 3187 if (lastCCMsg >= 0) { 3188 for (let i = 0; i < lastCCMsg; i++) { 3189 const msg = result[i]! 3190 if (msg.role !== 'user' || !Array.isArray(msg.content)) { 3191 continue 3192 } 3193 let cloned = false 3194 for (let j = 0; j < msg.content.length; j++) { 3195 const block = msg.content[j] 3196 if (block && isToolResultBlock(block)) { 3197 if (!cloned) { 3198 msg.content = [...msg.content] 3199 cloned = true 3200 } 3201 msg.content[j] = Object.assign({}, block, { 3202 cache_reference: block.tool_use_id, 3203 }) 3204 } 3205 } 3206 } 3207 } 3208 } 3209 3210 return result 3211} 3212 3213export function buildSystemPromptBlocks( 3214 systemPrompt: SystemPrompt, 3215 enablePromptCaching: boolean, 3216 options?: { 3217 skipGlobalCacheForSystemPrompt?: boolean 3218 querySource?: QuerySource 3219 }, 3220): TextBlockParam[] { 3221 // IMPORTANT: Do not add any more blocks for caching or you will get a 400 3222 return splitSysPromptPrefix(systemPrompt, { 3223 skipGlobalCacheForSystemPrompt: options?.skipGlobalCacheForSystemPrompt, 3224 }).map(block => { 3225 return { 3226 type: 'text' as const, 3227 text: block.text, 3228 ...(enablePromptCaching && 3229 block.cacheScope !== null && { 3230 cache_control: getCacheControl({ 3231 scope: block.cacheScope, 3232 querySource: options?.querySource, 3233 }), 3234 }), 3235 } 3236 }) 3237} 3238 3239type HaikuOptions = Omit<Options, 'model' | 'getToolPermissionContext'> 3240 3241export async function queryHaiku({ 3242 systemPrompt = asSystemPrompt([]), 3243 userPrompt, 3244 outputFormat, 3245 signal, 3246 options, 3247}: { 3248 systemPrompt: SystemPrompt 3249 userPrompt: string 
3250 outputFormat?: BetaJSONOutputFormat 3251 signal: AbortSignal 3252 options: HaikuOptions 3253}): Promise<AssistantMessage> { 3254 const result = await withVCR( 3255 [ 3256 createUserMessage({ 3257 content: systemPrompt.map(text => ({ type: 'text', text })), 3258 }), 3259 createUserMessage({ 3260 content: userPrompt, 3261 }), 3262 ], 3263 async () => { 3264 const messages = [ 3265 createUserMessage({ 3266 content: userPrompt, 3267 }), 3268 ] 3269 3270 const result = await queryModelWithoutStreaming({ 3271 messages, 3272 systemPrompt, 3273 thinkingConfig: { type: 'disabled' }, 3274 tools: [], 3275 signal, 3276 options: { 3277 ...options, 3278 model: getSmallFastModel(), 3279 enablePromptCaching: options.enablePromptCaching ?? false, 3280 outputFormat, 3281 async getToolPermissionContext() { 3282 return getEmptyToolPermissionContext() 3283 }, 3284 }, 3285 }) 3286 return [result] 3287 }, 3288 ) 3289 // We don't use streaming for Haiku so this is safe 3290 return result[0]! as AssistantMessage 3291} 3292 3293type QueryWithModelOptions = Omit<Options, 'getToolPermissionContext'> 3294 3295/** 3296 * Query a specific model through the Claude Code infrastructure. 3297 * This goes through the full query pipeline including proper authentication, 3298 * betas, and headers - unlike direct API calls. 
3299 */ 3300export async function queryWithModel({ 3301 systemPrompt = asSystemPrompt([]), 3302 userPrompt, 3303 outputFormat, 3304 signal, 3305 options, 3306}: { 3307 systemPrompt: SystemPrompt 3308 userPrompt: string 3309 outputFormat?: BetaJSONOutputFormat 3310 signal: AbortSignal 3311 options: QueryWithModelOptions 3312}): Promise<AssistantMessage> { 3313 const result = await withVCR( 3314 [ 3315 createUserMessage({ 3316 content: systemPrompt.map(text => ({ type: 'text', text })), 3317 }), 3318 createUserMessage({ 3319 content: userPrompt, 3320 }), 3321 ], 3322 async () => { 3323 const messages = [ 3324 createUserMessage({ 3325 content: userPrompt, 3326 }), 3327 ] 3328 3329 const result = await queryModelWithoutStreaming({ 3330 messages, 3331 systemPrompt, 3332 thinkingConfig: { type: 'disabled' }, 3333 tools: [], 3334 signal, 3335 options: { 3336 ...options, 3337 enablePromptCaching: options.enablePromptCaching ?? false, 3338 outputFormat, 3339 async getToolPermissionContext() { 3340 return getEmptyToolPermissionContext() 3341 }, 3342 }, 3343 }) 3344 return [result] 3345 }, 3346 ) 3347 return result[0]! as AssistantMessage 3348} 3349 3350// Non-streaming requests have a 10min max per the docs: 3351// https://platform.claude.com/docs/en/api/errors#long-requests 3352// The SDK's 21333-token cap is derived from 10min × 128k tokens/hour, but we 3353// bypass it by setting a client-level timeout, so we can cap higher. 3354export const MAX_NON_STREAMING_TOKENS = 64_000 3355 3356/** 3357 * Adjusts thinking budget when max_tokens is capped for non-streaming fallback. 
3358 * Ensures the API constraint: max_tokens > thinking.budget_tokens 3359 * 3360 * @param params - The parameters that will be sent to the API 3361 * @param maxTokensCap - The maximum allowed tokens (MAX_NON_STREAMING_TOKENS) 3362 * @returns Adjusted parameters with thinking budget capped if needed 3363 */ 3364export function adjustParamsForNonStreaming< 3365 T extends { 3366 max_tokens: number 3367 thinking?: BetaMessageStreamParams['thinking'] 3368 }, 3369>(params: T, maxTokensCap: number): T { 3370 const cappedMaxTokens = Math.min(params.max_tokens, maxTokensCap) 3371 3372 // Adjust thinking budget if it would exceed capped max_tokens 3373 // to maintain the constraint: max_tokens > thinking.budget_tokens 3374 const adjustedParams = { ...params } 3375 if ( 3376 adjustedParams.thinking?.type === 'enabled' && 3377 adjustedParams.thinking.budget_tokens 3378 ) { 3379 adjustedParams.thinking = { 3380 ...adjustedParams.thinking, 3381 budget_tokens: Math.min( 3382 adjustedParams.thinking.budget_tokens, 3383 cappedMaxTokens - 1, // Must be at least 1 less than max_tokens 3384 ), 3385 } 3386 } 3387 3388 return { 3389 ...adjustedParams, 3390 max_tokens: cappedMaxTokens, 3391 } 3392} 3393 3394function isMaxTokensCapEnabled(): boolean { 3395 // 3P default: false (not validated on Bedrock/Vertex) 3396 return getFeatureValue_CACHED_MAY_BE_STALE('tengu_otk_slot_v1', false) 3397} 3398 3399export function getMaxOutputTokensForModel(model: string): number { 3400 const maxOutputTokens = getModelMaxOutputTokens(model) 3401 3402 // Slot-reservation cap: drop default to 8k for all models. BQ p99 output 3403 // = 4,911 tokens; 32k/64k defaults over-reserve 8-16× slot capacity. 3404 // Requests hitting the cap get one clean retry at 64k (query.ts 3405 // max_output_tokens_escalate). Math.min keeps models with lower native 3406 // defaults (e.g. claude-3-opus at 4k) at their native value. Applied 3407 // before the env-var override so CLAUDE_CODE_MAX_OUTPUT_TOKENS still wins. 
3408 const defaultTokens = isMaxTokensCapEnabled() 3409 ? Math.min(maxOutputTokens.default, CAPPED_DEFAULT_MAX_TOKENS) 3410 : maxOutputTokens.default 3411 3412 const result = validateBoundedIntEnvVar( 3413 'CLAUDE_CODE_MAX_OUTPUT_TOKENS', 3414 process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS, 3415 defaultTokens, 3416 maxOutputTokens.upperLimit, 3417 ) 3418 return result.effective 3419}