source dump of claude code
at main 188 lines 5.5 kB view raw
import type { ToolUseBlock } from '@anthropic-ai/sdk/resources/index.mjs'
import type { CanUseToolFn } from '../../hooks/useCanUseTool.js'
import { findToolByName, type ToolUseContext } from '../../Tool.js'
import type { AssistantMessage, Message } from '../../types/message.js'
import { all } from '../../utils/generators.js'
import { type MessageUpdateLazy, runToolUse } from './toolExecution.js'

/** Concurrency cap used when the environment does not provide a usable one. */
const DEFAULT_MAX_TOOL_USE_CONCURRENCY = 10

/**
 * Reads the concurrency cap for concurrently-run tool batches from the
 * `CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY` environment variable.
 *
 * @returns The parsed value when it is a positive integer; otherwise
 *   {@link DEFAULT_MAX_TOOL_USE_CONCURRENCY}. Unset, empty, non-numeric, zero
 *   and negative values all fall back to the default.
 */
function getMaxToolUseConcurrency(): number {
  const parsed = parseInt(
    process.env.CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY ?? '',
    10,
  )
  // `parsed > 0` is false for NaN, 0 and negatives, so all of them get the
  // default. A non-positive cap is never a meaningful concurrency limit.
  return parsed > 0 ? parsed : DEFAULT_MAX_TOOL_USE_CONCURRENCY
}

/**
 * One update emitted by {@link runTools}: an optional message plus the tool-use
 * context that is current at the time of the update.
 */
export type MessageUpdate = {
  message?: Message
  newContext: ToolUseContext
}

/**
 * Runs a list of tool-use blocks, batch by batch, and streams the resulting
 * updates.
 *
 * The blocks are partitioned once, up front, using the context passed in.
 * Consecutive concurrency-safe blocks form one batch that is run through
 * `runToolsConcurrently`; every other block is its own batch and is run through
 * `runToolsSerially`.
 *
 * Context handling differs between the two modes:
 * - Concurrent batch: every tool in the batch is started with the context as it
 *   was when the batch began. Context modifiers reported during the batch are
 *   queued per tool-use ID and applied only after the batch has finished, in
 *   the order the blocks appear in the batch. A final update carrying only
 *   `newContext` is then yielded.
 * - Serial batch: the context reported by each update replaces the current
 *   context immediately.
 *
 * @param toolUseMessages - Tool-use blocks to execute, in order.
 * @param assistantMessages - Assistant messages searched for the message that
 *   contains each tool-use block.
 * @param canUseTool - Passed through unchanged to `runToolUse`.
 * @param toolUseContext - Initial context.
 * @returns An async generator of {@link MessageUpdate} values.
 */
export async function* runTools(
  toolUseMessages: ToolUseBlock[],
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
  let currentContext = toolUseContext
  for (const { isConcurrencySafe, blocks } of partitionToolCalls(
    toolUseMessages,
    currentContext,
  )) {
    if (isConcurrencySafe) {
      // Modifiers are held back until the whole batch is done so that the
      // concurrently running tools never observe each other's changes.
      const queuedContextModifiers = new Map<
        string,
        ((context: ToolUseContext) => ToolUseContext)[]
      >()
      // Run concurrency-safe batch concurrently
      for await (const update of runToolsConcurrently(
        blocks,
        assistantMessages,
        canUseTool,
        currentContext,
      )) {
        if (update.contextModifier) {
          const { toolUseID, modifyContext } = update.contextModifier
          const queued = queuedContextModifiers.get(toolUseID)
          if (queued) {
            queued.push(modifyContext)
          } else {
            queuedContextModifiers.set(toolUseID, [modifyContext])
          }
        }
        yield {
          message: update.message,
          newContext: currentContext,
        }
      }
      // Apply queued modifiers in block order (not completion order) so the
      // resulting context does not depend on which tool finished first.
      for (const block of blocks) {
        for (const modifier of queuedContextModifiers.get(block.id) ?? []) {
          currentContext = modifier(currentContext)
        }
      }
      yield { newContext: currentContext }
    } else {
      // Run non-concurrency-safe batch serially
      for await (const update of runToolsSerially(
        blocks,
        assistantMessages,
        canUseTool,
        currentContext,
      )) {
        if (update.newContext) {
          currentContext = update.newContext
        }
        yield {
          message: update.message,
          newContext: currentContext,
        }
      }
    }
  }
}

/** A run of tool-use blocks that are executed together in one mode. */
type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }

/**
 * Partition tool calls into batches where each batch is either:
 * 1. A single non-concurrency-safe tool, or
 * 2. Multiple consecutive concurrency-safe tools
 *
 * A tool call counts as concurrency-safe only when its tool is found by name,
 * its input passes the tool's `inputSchema.safeParse`, and the tool's
 * `isConcurrencySafe` returns a truthy value for the parsed input. Anything
 * else (unknown tool, invalid input, or a throwing check) is treated as not
 * concurrency-safe.
 *
 * @param toolUseMessages - Tool-use blocks in execution order.
 * @param toolUseContext - Context whose `options.tools` is searched by name.
 * @returns Batches in the same order as the input blocks.
 */
function partitionToolCalls(
  toolUseMessages: ToolUseBlock[],
  toolUseContext: ToolUseContext,
): Batch[] {
  const batches: Batch[] = []
  for (const toolUse of toolUseMessages) {
    const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
    const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
    let isConcurrencySafe = false
    if (parsedInput?.success) {
      try {
        isConcurrencySafe = Boolean(tool?.isConcurrencySafe(parsedInput.data))
      } catch {
        // If isConcurrencySafe throws (e.g., due to shell-quote parse failure),
        // treat as not concurrency-safe to be conservative
        isConcurrencySafe = false
      }
    }

    const previousBatch = batches[batches.length - 1]
    if (isConcurrencySafe && previousBatch?.isConcurrencySafe) {
      // Extend the current run of concurrency-safe calls.
      previousBatch.blocks.push(toolUse)
    } else {
      batches.push({ isConcurrencySafe, blocks: [toolUse] })
    }
  }
  return batches
}

/**
 * Finds the assistant message whose content contains the `tool_use` block with
 * the given ID.
 *
 * NOTE(review): the result is asserted non-null without a runtime check, as in
 * the inline lookups this helper replaces. Callers are presumably expected to
 * pass the assistant messages that produced the tool-use blocks; if no message
 * matches, `undefined` is returned despite the declared type.
 */
function findAssistantMessageForToolUse(
  assistantMessages: AssistantMessage[],
  toolUseID: string,
): AssistantMessage {
  return assistantMessages.find(assistantMessage =>
    assistantMessage.message.content.some(
      block => block.type === 'tool_use' && block.id === toolUseID,
    ),
  )!
}

/**
 * Runs the given tool-use blocks one after another.
 *
 * Each tool's ID is added to the in-progress set before it runs and removed
 * afterwards. Context modifiers reported by a tool are applied immediately, so
 * later updates and later tools see the modified context.
 *
 * The removal happens in a `finally` block, so the ID is also cleared when
 * `runToolUse` throws or when the consumer stops iterating early.
 */
async function* runToolsSerially(
  toolUseMessages: ToolUseBlock[],
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
  let currentContext = toolUseContext

  for (const toolUse of toolUseMessages) {
    toolUseContext.setInProgressToolUseIDs(prev =>
      new Set(prev).add(toolUse.id),
    )
    try {
      for await (const update of runToolUse(
        toolUse,
        findAssistantMessageForToolUse(assistantMessages, toolUse.id),
        canUseTool,
        currentContext,
      )) {
        if (update.contextModifier) {
          currentContext = update.contextModifier.modifyContext(currentContext)
        }
        yield {
          message: update.message,
          newContext: currentContext,
        }
      }
    } finally {
      // Never leave a tool marked as in progress once its run has ended,
      // whether it completed, threw, or was abandoned by the consumer.
      markToolUseAsComplete(toolUseContext, toolUse.id)
    }
  }
}

/**
 * Runs the given tool-use blocks through `all`, capped at
 * `getMaxToolUseConcurrency()`.
 *
 * Every tool receives the same `toolUseContext`; updates are forwarded as-is,
 * including any `contextModifier`, which the caller is responsible for
 * applying. Each tool's ID is added to the in-progress set when its generator
 * starts and removed in a `finally` block when that generator ends for any
 * reason.
 */
async function* runToolsConcurrently(
  toolUseMessages: ToolUseBlock[],
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdateLazy, void> {
  yield* all(
    toolUseMessages.map(async function* (toolUse) {
      toolUseContext.setInProgressToolUseIDs(prev =>
        new Set(prev).add(toolUse.id),
      )
      try {
        yield* runToolUse(
          toolUse,
          findAssistantMessageForToolUse(assistantMessages, toolUse.id),
          canUseTool,
          toolUseContext,
        )
      } finally {
        markToolUseAsComplete(toolUseContext, toolUse.id)
      }
    }),
    getMaxToolUseConcurrency(),
  )
}

/**
 * Removes `toolUseID` from the in-progress tool-use ID set.
 *
 * A new `Set` is built rather than mutating the previous one, so the updater
 * always returns a fresh object.
 */
function markToolUseAsComplete(
  toolUseContext: ToolUseContext,
  toolUseID: string,
) {
  toolUseContext.setInProgressToolUseIDs(prev => {
    const next = new Set(prev)
    next.delete(toolUseID)
    return next
  })
}