/** * In-process teammate runner * * Wraps runAgent() for in-process teammates, providing: * - AsyncLocalStorage-based context isolation via runWithTeammateContext() * - Progress tracking and AppState updates * - Idle notification to leader when complete * - Plan mode approval flow support * - Cleanup on completion or abort */ import { feature } from 'bun:bundle' import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs' import { getSystemPrompt } from '../../constants/prompts.js' import { TEAMMATE_MESSAGE_TAG } from '../../constants/xml.js' import type { CanUseToolFn } from '../../hooks/useCanUseTool.js' import { processMailboxPermissionResponse, registerPermissionCallback, unregisterPermissionCallback, } from '../../hooks/useSwarmPermissionPoller.js' import { type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, logEvent, } from '../../services/analytics/index.js' import { getAutoCompactThreshold } from '../../services/compact/autoCompact.js' import { buildPostCompactMessages, compactConversation, ERROR_MESSAGE_USER_ABORT, } from '../../services/compact/compact.js' import { resetMicrocompactState } from '../../services/compact/microCompact.js' import type { AppState } from '../../state/AppState.js' import type { Tool, ToolUseContext } from '../../Tool.js' import { appendTeammateMessage } from '../../tasks/InProcessTeammateTask/InProcessTeammateTask.js' import type { InProcessTeammateTaskState, TeammateIdentity, } from '../../tasks/InProcessTeammateTask/types.js' import { appendCappedMessage } from '../../tasks/InProcessTeammateTask/types.js' import { createActivityDescriptionResolver, createProgressTracker, getProgressUpdate, updateProgressFromMessage, } from '../../tasks/LocalAgentTask/LocalAgentTask.js' import type { CustomAgentDefinition } from '../../tools/AgentTool/loadAgentsDir.js' import { runAgent } from '../../tools/AgentTool/runAgent.js' import { awaitClassifierAutoApproval } from '../../tools/BashTool/bashPermissions.js' import { BASH_TOOL_NAME } from '../../tools/BashTool/toolName.js' import { SEND_MESSAGE_TOOL_NAME } from '../../tools/SendMessageTool/constants.js' import { TASK_CREATE_TOOL_NAME } from '../../tools/TaskCreateTool/constants.js' import { TASK_GET_TOOL_NAME } from '../../tools/TaskGetTool/constants.js' import { TASK_LIST_TOOL_NAME } from '../../tools/TaskListTool/constants.js' import { TASK_UPDATE_TOOL_NAME } from '../../tools/TaskUpdateTool/constants.js' import { TEAM_CREATE_TOOL_NAME } from '../../tools/TeamCreateTool/constants.js' import { TEAM_DELETE_TOOL_NAME } from '../../tools/TeamDeleteTool/constants.js' import type { Message } from '../../types/message.js' import type { PermissionDecision } from '../../types/permissions.js' import { createAssistantAPIErrorMessage, createUserMessage, } from '../../utils/messages.js' import { evictTaskOutput } from '../../utils/task/diskOutput.js' import { evictTerminalTask } from '../../utils/task/framework.js' import { tokenCountWithEstimation } from '../../utils/tokens.js' import { createAbortController } from '../abortController.js' import { type AgentContext, runWithAgentContext } from '../agentContext.js' import { count } from '../array.js' import { logForDebugging } from '../debug.js' import { cloneFileStateCache } from '../fileStateCache.js' import { SUBAGENT_REJECT_MESSAGE, SUBAGENT_REJECT_MESSAGE_WITH_REASON_PREFIX, } from '../messages.js' import type { ModelAlias } from '../model/aliases.js' import { applyPermissionUpdates, persistPermissionUpdates, } from '../permissions/PermissionUpdate.js' import type { PermissionUpdate } from '../permissions/PermissionUpdateSchema.js' import { hasPermissionsToUseTool } from '../permissions/permissions.js' import { emitTaskTerminatedSdk } from '../sdkEventQueue.js' import { sleep } from '../sleep.js' import { jsonStringify } from '../slowOperations.js' import { asSystemPrompt } from '../systemPromptType.js' import { claimTask, listTasks, type Task, updateTask } from '../tasks.js' import type { TeammateContext } from '../teammateContext.js' import { runWithTeammateContext } from '../teammateContext.js' import { createIdleNotification, getLastPeerDmSummary, isPermissionResponse, isShutdownRequest, markMessageAsReadByIndex, readMailbox, writeToMailbox, } from '../teammateMailbox.js' import { unregisterAgent as unregisterPerfettoAgent } from '../telemetry/perfettoTracing.js' import { createContentReplacementState } from '../toolResultStorage.js' import { TEAM_LEAD_NAME } from './constants.js' import { getLeaderSetToolPermissionContext, getLeaderToolUseConfirmQueue, } from './leaderPermissionBridge.js' import { createPermissionRequest, sendPermissionRequestViaMailbox, } from './permissionSync.js' import { TEAMMATE_SYSTEM_PROMPT_ADDENDUM } from './teammatePromptAddendum.js' type SetAppStateFn = (updater: (prev: AppState) => AppState) => void const PERMISSION_POLL_INTERVAL_MS = 500 /** * Creates a canUseTool function for in-process teammates that properly resolves * 'ask' permissions via the UI rather than treating them as denials. * * Always uses the leader's ToolUseConfirm dialog with a worker badge when * the bridge is available, giving teammates the same tool-specific UI * (BashPermissionRequest, FileEditToolDiff, etc.) as the leader's own tools. * * Falls back to the mailbox system when the bridge is unavailable: * sends a permission request to the leader's inbox, waits for the response * in the teammate's own mailbox. */ function createInProcessCanUseTool( identity: TeammateIdentity, abortController: AbortController, onPermissionWaitMs?: (waitMs: number) => void, ): CanUseToolFn { return async ( tool, input, toolUseContext, assistantMessage, toolUseID, forceDecision, ) => { const result = forceDecision ?? (await hasPermissionsToUseTool( tool, input, toolUseContext, assistantMessage, toolUseID, )) // Pass through allow/deny decisions directly if (result.behavior !== 'ask') { return result } // For bash commands, try classifier auto-approval before showing leader dialog. // Agents await the classifier result (rather than racing it against user // interaction like the main agent). if ( feature('BASH_CLASSIFIER') && tool.name === BASH_TOOL_NAME && result.pendingClassifierCheck ) { const classifierDecision = await awaitClassifierAutoApproval( result.pendingClassifierCheck, abortController.signal, toolUseContext.options.isNonInteractiveSession, ) if (classifierDecision) { return { behavior: 'allow', updatedInput: input as Record, decisionReason: classifierDecision, } } } // Check if aborted before showing UI if (abortController.signal.aborted) { return { behavior: 'ask', message: SUBAGENT_REJECT_MESSAGE } } const appState = toolUseContext.getAppState() const description = await (tool as Tool).description(input as never, { isNonInteractiveSession: toolUseContext.options.isNonInteractiveSession, toolPermissionContext: appState.toolPermissionContext, tools: toolUseContext.options.tools, }) if (abortController.signal.aborted) { return { behavior: 'ask', message: SUBAGENT_REJECT_MESSAGE } } const setToolUseConfirmQueue = getLeaderToolUseConfirmQueue() // Standard path: use ToolUseConfirm dialog with worker badge if (setToolUseConfirmQueue) { return new Promise(resolve => { let decisionMade = false const permissionStartMs = Date.now() // Report permission wait time to the caller so it can be // subtracted from the displayed elapsed time. const reportPermissionWait = () => { onPermissionWaitMs?.(Date.now() - permissionStartMs) } const onAbortListener = () => { if (decisionMade) return decisionMade = true reportPermissionWait() resolve({ behavior: 'ask', message: SUBAGENT_REJECT_MESSAGE }) setToolUseConfirmQueue(queue => queue.filter(item => item.toolUseID !== toolUseID), ) } abortController.signal.addEventListener('abort', onAbortListener, { once: true, }) setToolUseConfirmQueue(queue => [ ...queue, { assistantMessage, tool: tool as Tool, description, input, toolUseContext, toolUseID, permissionResult: result, permissionPromptStartTimeMs: permissionStartMs, workerBadge: identity.color ? { name: identity.agentName, color: identity.color } : undefined, onUserInteraction() { // No-op for teammates (no classifier auto-approval) }, onAbort() { if (decisionMade) return decisionMade = true abortController.signal.removeEventListener( 'abort', onAbortListener, ) reportPermissionWait() resolve({ behavior: 'ask', message: SUBAGENT_REJECT_MESSAGE }) }, async onAllow( updatedInput: Record, permissionUpdates: PermissionUpdate[], feedback?: string, contentBlocks?: ContentBlockParam[], ) { if (decisionMade) return decisionMade = true abortController.signal.removeEventListener( 'abort', onAbortListener, ) reportPermissionWait() persistPermissionUpdates(permissionUpdates) // Write back permission updates to the leader's shared context if (permissionUpdates.length > 0) { const setToolPermissionContext = getLeaderSetToolPermissionContext() if (setToolPermissionContext) { const currentAppState = toolUseContext.getAppState() const updatedContext = applyPermissionUpdates( currentAppState.toolPermissionContext, permissionUpdates, ) // Preserve the leader's mode to prevent workers' // transformed 'acceptEdits' context from leaking back // to the coordinator setToolPermissionContext(updatedContext, { preserveMode: true, }) } } const trimmedFeedback = feedback?.trim() resolve({ behavior: 'allow', updatedInput, userModified: false, acceptFeedback: trimmedFeedback || undefined, ...(contentBlocks && contentBlocks.length > 0 && { contentBlocks }), }) }, onReject(feedback?: string, contentBlocks?: ContentBlockParam[]) { if (decisionMade) return decisionMade = true abortController.signal.removeEventListener( 'abort', onAbortListener, ) reportPermissionWait() const message = feedback ? `${SUBAGENT_REJECT_MESSAGE_WITH_REASON_PREFIX}${feedback}` : SUBAGENT_REJECT_MESSAGE resolve({ behavior: 'ask', message, contentBlocks }) }, async recheckPermission() { if (decisionMade) return const freshResult = await hasPermissionsToUseTool( tool, input, toolUseContext, assistantMessage, toolUseID, ) if (freshResult.behavior === 'allow') { decisionMade = true abortController.signal.removeEventListener( 'abort', onAbortListener, ) reportPermissionWait() setToolUseConfirmQueue(queue => queue.filter(item => item.toolUseID !== toolUseID), ) resolve({ ...freshResult, updatedInput: input, userModified: false, }) } }, }, ]) }) } // Fallback: use mailbox system when leader UI queue is unavailable return new Promise(resolve => { const request = createPermissionRequest({ toolName: (tool as Tool).name, toolUseId: toolUseID, input, description, permissionSuggestions: result.suggestions, workerId: identity.agentId, workerName: identity.agentName, workerColor: identity.color, teamName: identity.teamName, }) // Register callback to be invoked when the leader responds registerPermissionCallback({ requestId: request.id, toolUseId: toolUseID, onAllow( updatedInput: Record | undefined, permissionUpdates: PermissionUpdate[], _feedback?: string, contentBlocks?: ContentBlockParam[], ) { cleanup() persistPermissionUpdates(permissionUpdates) const finalInput = updatedInput && Object.keys(updatedInput).length > 0 ? updatedInput : input resolve({ behavior: 'allow', updatedInput: finalInput, userModified: false, ...(contentBlocks && contentBlocks.length > 0 && { contentBlocks }), }) }, onReject(feedback?: string, contentBlocks?: ContentBlockParam[]) { cleanup() const message = feedback ? `${SUBAGENT_REJECT_MESSAGE_WITH_REASON_PREFIX}${feedback}` : SUBAGENT_REJECT_MESSAGE resolve({ behavior: 'ask', message, contentBlocks }) }, }) // Send request to leader's mailbox void sendPermissionRequestViaMailbox(request) // Poll teammate's mailbox for the response const pollInterval = setInterval( async (abortController, cleanup, resolve, identity, request) => { if (abortController.signal.aborted) { cleanup() resolve({ behavior: 'ask', message: SUBAGENT_REJECT_MESSAGE }) return } const allMessages = await readMailbox( identity.agentName, identity.teamName, ) for (let i = 0; i < allMessages.length; i++) { const msg = allMessages[i] if (msg && !msg.read) { const parsed = isPermissionResponse(msg.text) if (parsed && parsed.request_id === request.id) { await markMessageAsReadByIndex( identity.agentName, identity.teamName, i, ) if (parsed.subtype === 'success') { processMailboxPermissionResponse({ requestId: parsed.request_id, decision: 'approved', updatedInput: parsed.response?.updated_input, permissionUpdates: parsed.response?.permission_updates, }) } else { processMailboxPermissionResponse({ requestId: parsed.request_id, decision: 'rejected', feedback: parsed.error, }) } return // Callback already resolves the promise } } } }, PERMISSION_POLL_INTERVAL_MS, abortController, cleanup, resolve, identity, request, ) const onAbortListener = () => { cleanup() resolve({ behavior: 'ask', message: SUBAGENT_REJECT_MESSAGE }) } abortController.signal.addEventListener('abort', onAbortListener, { once: true, }) function cleanup() { clearInterval(pollInterval) unregisterPermissionCallback(request.id) abortController.signal.removeEventListener('abort', onAbortListener) } }) } } /** * Formats a message as XML for injection into the conversation. * This ensures the model sees messages in the same format as tmux teammates. */ function formatAsTeammateMessage( from: string, content: string, color?: string, summary?: string, ): string { const colorAttr = color ? ` color="${color}"` : '' const summaryAttr = summary ? ` summary="${summary}"` : '' return `<${TEAMMATE_MESSAGE_TAG} teammate_id="${from}"${colorAttr}${summaryAttr}>\n${content}\n` } /** * Configuration for running an in-process teammate. */ export type InProcessRunnerConfig = { /** Teammate identity for context */ identity: TeammateIdentity /** Task ID in AppState */ taskId: string /** Initial prompt for the teammate */ prompt: string /** Optional agent definition (for specialized agents) */ agentDefinition?: CustomAgentDefinition /** Teammate context for AsyncLocalStorage */ teammateContext: TeammateContext /** Parent's tool use context */ toolUseContext: ToolUseContext /** Abort controller linked to parent */ abortController: AbortController /** Optional model override for this teammate */ model?: string /** Optional system prompt override for this teammate */ systemPrompt?: string /** How to apply the system prompt: 'replace' or 'append' to default */ systemPromptMode?: 'default' | 'replace' | 'append' /** Tool permissions to auto-allow for this teammate */ allowedTools?: string[] /** Whether this teammate can show permission prompts for unlisted tools. * When false (default), unlisted tools are auto-denied. */ allowPermissionPrompts?: boolean /** Short description of the task (used as summary for the initial prompt header) */ description?: string /** request_id of the API call that spawned this teammate, for lineage * tracing on tengu_api_* events. */ invokingRequestId?: string } /** * Result from running an in-process teammate. */ export type InProcessRunnerResult = { /** Whether the run completed successfully */ success: boolean /** Error message if failed */ error?: string /** Messages produced by the agent */ messages: Message[] } /** * Updates task state in AppState. */ function updateTaskState( taskId: string, updater: (task: InProcessTeammateTaskState) => InProcessTeammateTaskState, setAppState: SetAppStateFn, ): void { setAppState(prev => { const task = prev.tasks[taskId] if (!task || task.type !== 'in_process_teammate') { return prev } const updated = updater(task) if (updated === task) { return prev } return { ...prev, tasks: { ...prev.tasks, [taskId]: updated, }, } }) } /** * Sends a message to the leader's file-based mailbox. * Uses the same mailbox system as tmux teammates for consistency. */ async function sendMessageToLeader( from: string, text: string, color: string | undefined, teamName: string, ): Promise { await writeToMailbox( TEAM_LEAD_NAME, { from, text, timestamp: new Date().toISOString(), color, }, teamName, ) } /** * Sends idle notification to the leader via file-based mailbox. * Uses agentName (not agentId) for consistency with process-based teammates. */ async function sendIdleNotification( agentName: string, agentColor: string | undefined, teamName: string, options?: { idleReason?: 'available' | 'interrupted' | 'failed' summary?: string completedTaskId?: string completedStatus?: 'resolved' | 'blocked' | 'failed' failureReason?: string }, ): Promise { const notification = createIdleNotification(agentName, options) await sendMessageToLeader( agentName, jsonStringify(notification), agentColor, teamName, ) } /** * Find an available task from the team's task list. * A task is available if it's pending, has no owner, and is not blocked. */ function findAvailableTask(tasks: Task[]): Task | undefined { const unresolvedTaskIds = new Set( tasks.filter(t => t.status !== 'completed').map(t => t.id), ) return tasks.find(task => { if (task.status !== 'pending') return false if (task.owner) return false return task.blockedBy.every(id => !unresolvedTaskIds.has(id)) }) } /** * Format a task as a prompt for the teammate to work on. */ function formatTaskAsPrompt(task: Task): string { let prompt = `Complete all open tasks. Start with task #${task.id}: \n\n ${task.subject}` if (task.description) { prompt += `\n\n${task.description}` } return prompt } /** * Try to claim an available task from the team's task list. * Returns the formatted prompt if a task was claimed, or undefined if none available. */ async function tryClaimNextTask( taskListId: string, agentName: string, ): Promise { try { const tasks = await listTasks(taskListId) const availableTask = findAvailableTask(tasks) if (!availableTask) { return undefined } const result = await claimTask(taskListId, availableTask.id, agentName) if (!result.success) { logForDebugging( `[inProcessRunner] Failed to claim task #${availableTask.id}: ${result.reason}`, ) return undefined } // Also set status to in_progress so the UI reflects it immediately await updateTask(taskListId, availableTask.id, { status: 'in_progress' }) logForDebugging( `[inProcessRunner] Claimed task #${availableTask.id}: ${availableTask.subject}`, ) return formatTaskAsPrompt(availableTask) } catch (err) { logForDebugging(`[inProcessRunner] Error checking task list: ${err}`) return undefined } } /** * Result of waiting for messages. */ type WaitResult = | { type: 'shutdown_request' request: ReturnType originalMessage: string } | { type: 'new_message' message: string from: string color?: string summary?: string } | { type: 'aborted' } /** * Waits for new prompts or shutdown request. * Polls the teammate's mailbox every 500ms, checking for: * - Shutdown request from leader (returned to caller for model decision) * - New messages/prompts from leader * - Abort signal * * This keeps the teammate alive in 'idle' state instead of terminating. * Does NOT auto-approve shutdown - the model should make that decision. */ async function waitForNextPromptOrShutdown( identity: TeammateIdentity, abortController: AbortController, taskId: string, getAppState: () => AppState, setAppState: SetAppStateFn, taskListId: string, ): Promise { const POLL_INTERVAL_MS = 500 logForDebugging( `[inProcessRunner] ${identity.agentName} starting poll loop (abort=${abortController.signal.aborted})`, ) let pollCount = 0 while (!abortController.signal.aborted) { // Check for in-memory pending messages on every iteration (from transcript viewing) const appState = getAppState() const task = appState.tasks[taskId] if ( task && task.type === 'in_process_teammate' && task.pendingUserMessages.length > 0 ) { const message = task.pendingUserMessages[0]! // Safe: checked length > 0 // Pop the message from the queue setAppState(prev => { const prevTask = prev.tasks[taskId] if (!prevTask || prevTask.type !== 'in_process_teammate') { return prev } return { ...prev, tasks: { ...prev.tasks, [taskId]: { ...prevTask, pendingUserMessages: prevTask.pendingUserMessages.slice(1), }, }, } }) logForDebugging( `[inProcessRunner] ${identity.agentName} found pending user message (poll #${pollCount})`, ) return { type: 'new_message', message, from: 'user', } } // Wait before next poll (skip on first iteration to check immediately) if (pollCount > 0) { await sleep(POLL_INTERVAL_MS) } pollCount++ // Check for abort if (abortController.signal.aborted) { logForDebugging( `[inProcessRunner] ${identity.agentName} aborted while waiting (poll #${pollCount})`, ) return { type: 'aborted' } } // Check for messages in mailbox logForDebugging( `[inProcessRunner] ${identity.agentName} poll #${pollCount}: checking mailbox`, ) try { // Read all messages and scan unread for shutdown requests first. // Shutdown requests are prioritized over regular messages to prevent // starvation when peer-to-peer messages flood the queue. const allMessages = await readMailbox( identity.agentName, identity.teamName, ) // Scan all unread messages for shutdown requests (highest priority). // readMailbox() already reads all messages from disk, so this scan // adds only ~1-2ms of JSON parsing overhead. let shutdownIndex = -1 let shutdownParsed: ReturnType = null for (let i = 0; i < allMessages.length; i++) { const m = allMessages[i] if (m && !m.read) { const parsed = isShutdownRequest(m.text) if (parsed) { shutdownIndex = i shutdownParsed = parsed break } } } if (shutdownIndex !== -1) { const msg = allMessages[shutdownIndex]! const skippedUnread = count( allMessages.slice(0, shutdownIndex), m => !m.read, ) logForDebugging( `[inProcessRunner] ${identity.agentName} received shutdown request from ${shutdownParsed?.from} (prioritized over ${skippedUnread} unread messages)`, ) await markMessageAsReadByIndex( identity.agentName, identity.teamName, shutdownIndex, ) return { type: 'shutdown_request', request: shutdownParsed, originalMessage: msg.text, } } // No shutdown request found. Prioritize team-lead messages over peer // messages — the leader represents user intent and coordination, so // their messages should not be starved behind peer-to-peer chatter. // Fall back to FIFO for peer messages. let selectedIndex = -1 // Check for unread team-lead messages first for (let i = 0; i < allMessages.length; i++) { const m = allMessages[i] if (m && !m.read && m.from === TEAM_LEAD_NAME) { selectedIndex = i break } } // Fall back to first unread message (any sender) if (selectedIndex === -1) { selectedIndex = allMessages.findIndex(m => !m.read) } if (selectedIndex !== -1) { const msg = allMessages[selectedIndex] if (msg) { logForDebugging( `[inProcessRunner] ${identity.agentName} received new message from ${msg.from} (index ${selectedIndex})`, ) await markMessageAsReadByIndex( identity.agentName, identity.teamName, selectedIndex, ) return { type: 'new_message', message: msg.text, from: msg.from, color: msg.color, summary: msg.summary, } } } } catch (err) { logForDebugging( `[inProcessRunner] ${identity.agentName} poll error: ${err}`, ) // Continue polling even if one read fails } // Check the team's task list for unclaimed tasks const taskPrompt = await tryClaimNextTask(taskListId, identity.agentName) if (taskPrompt) { return { type: 'new_message', message: taskPrompt, from: 'task-list', } } } logForDebugging( `[inProcessRunner] ${identity.agentName} exiting poll loop (abort=${abortController.signal.aborted}, polls=${pollCount})`, ) return { type: 'aborted' } } /** * Runs an in-process teammate with a continuous prompt loop. * * Executes runAgent() within the teammate's AsyncLocalStorage context, * tracks progress, updates task state, sends idle notification on completion, * then waits for new prompts or shutdown requests. * * Unlike background tasks, teammates stay alive and can receive multiple prompts. * The loop only exits on abort or after shutdown is approved by the model. * * @param config - Runner configuration * @returns Result with messages and success status */ export async function runInProcessTeammate( config: InProcessRunnerConfig, ): Promise { const { identity, taskId, prompt, description, agentDefinition, teammateContext, toolUseContext, abortController, model, systemPrompt, systemPromptMode, allowedTools, allowPermissionPrompts, invokingRequestId, } = config const { setAppState } = toolUseContext logForDebugging( `[inProcessRunner] Starting agent loop for ${identity.agentId}`, ) // Create AgentContext for analytics attribution const agentContext: AgentContext = { agentId: identity.agentId, parentSessionId: identity.parentSessionId, agentName: identity.agentName, teamName: identity.teamName, agentColor: identity.color, planModeRequired: identity.planModeRequired, isTeamLead: false, agentType: 'teammate', invokingRequestId, invocationKind: 'spawn', invocationEmitted: false, } // Build system prompt based on systemPromptMode let teammateSystemPrompt: string if (systemPromptMode === 'replace' && systemPrompt) { teammateSystemPrompt = systemPrompt } else { const fullSystemPromptParts = await getSystemPrompt( toolUseContext.options.tools, toolUseContext.options.mainLoopModel, undefined, toolUseContext.options.mcpClients, ) const systemPromptParts = [ ...fullSystemPromptParts, TEAMMATE_SYSTEM_PROMPT_ADDENDUM, ] // If custom agent definition provided, append its prompt if (agentDefinition) { const customPrompt = agentDefinition.getSystemPrompt() if (customPrompt) { systemPromptParts.push(`\n# Custom Agent Instructions\n${customPrompt}`) } // Log agent memory loaded event for in-process teammates if (agentDefinition.memory) { logEvent('tengu_agent_memory_loaded', { ...(process.env.USER_TYPE === 'ant' ? { agent_type: agentDefinition.agentType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, } : {}), scope: agentDefinition.memory as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, source: 'in-process-teammate' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, }) } } // Append mode: add provided system prompt after default if (systemPromptMode === 'append' && systemPrompt) { systemPromptParts.push(systemPrompt) } teammateSystemPrompt = systemPromptParts.join('\n') } // Resolve agent definition - use full system prompt with teammate addendum // IMPORTANT: Set permissionMode to 'default' so teammates always get full tool // access regardless of the leader's permission mode. const resolvedAgentDefinition: CustomAgentDefinition = { agentType: identity.agentName, whenToUse: `In-process teammate: ${identity.agentName}`, getSystemPrompt: () => teammateSystemPrompt, // Inject team-essential tools so teammates can always respond to // shutdown requests, send messages, and coordinate via the task list, // even with explicit tool lists tools: agentDefinition?.tools ? [ ...new Set([ ...agentDefinition.tools, SEND_MESSAGE_TOOL_NAME, TEAM_CREATE_TOOL_NAME, TEAM_DELETE_TOOL_NAME, TASK_CREATE_TOOL_NAME, TASK_GET_TOOL_NAME, TASK_LIST_TOOL_NAME, TASK_UPDATE_TOOL_NAME, ]), ] : ['*'], source: 'projectSettings', permissionMode: 'default', // Propagate model from custom agent definition so getAgentModel() // can use it as a fallback when no tool-level model is specified ...(agentDefinition?.model ? { model: agentDefinition.model } : {}), } // All messages across all prompts const allMessages: Message[] = [] // Wrap initial prompt with XML for proper styling in transcript view const wrappedInitialPrompt = formatAsTeammateMessage( 'team-lead', prompt, undefined, description, ) let currentPrompt = wrappedInitialPrompt let shouldExit = false // Try to claim an available task immediately so the UI can show activity // from the very start. The idle loop handles claiming for subsequent tasks. // Use parentSessionId as the task list ID since the leader creates tasks // under its session ID, not the team name. await tryClaimNextTask(identity.parentSessionId, identity.agentName) try { // Add initial prompt to task.messages for display (wrapped with XML) updateTaskState( taskId, task => ({ ...task, messages: appendCappedMessage( task.messages, createUserMessage({ content: wrappedInitialPrompt }), ), }), setAppState, ) // Per-teammate content replacement state. The while-loop below calls // runAgent repeatedly over an accumulating `allMessages` buffer (which // carries FULL original tool result content, not previews — query() yields // originals, enforcement is non-mutating). Without persisting state across // iterations, each call gets a fresh empty state from createSubagentContext // and makes holistic replace-globally-largest decisions, diverging from // earlier iterations' incremental frozen-first decisions → wire prefix // differs → cache miss. Gated on parent to inherit feature-flag-off. let teammateReplacementState = toolUseContext.contentReplacementState ? createContentReplacementState() : undefined // Main teammate loop - runs until abort or shutdown approved while (!abortController.signal.aborted && !shouldExit) { logForDebugging( `[inProcessRunner] ${identity.agentId} processing prompt: ${currentPrompt.substring(0, 50)}...`, ) // Create a per-turn abort controller for this iteration. // This allows Escape to stop current work without killing the whole teammate. // The lifecycle abortController still kills the whole teammate if needed. const currentWorkAbortController = createAbortController() // Store the work controller in task state so UI can abort it updateTaskState( taskId, task => ({ ...task, currentWorkAbortController }), setAppState, ) // Prepare prompt messages for this iteration // For the first iteration, start fresh // For subsequent iterations, pass accumulated messages as context const userMessage = createUserMessage({ content: currentPrompt }) const promptMessages: Message[] = [userMessage] // Check if compaction is needed before building context let contextMessages = allMessages const tokenCount = tokenCountWithEstimation(allMessages) if ( tokenCount > getAutoCompactThreshold(toolUseContext.options.mainLoopModel) ) { logForDebugging( `[inProcessRunner] ${identity.agentId} compacting history (${tokenCount} tokens)`, ) // Create an isolated copy of toolUseContext so that compaction // does not clear the main session's readFileState cache or // trigger the main session's UI callbacks. const isolatedContext: ToolUseContext = { ...toolUseContext, readFileState: cloneFileStateCache(toolUseContext.readFileState), onCompactProgress: undefined, setStreamMode: undefined, } const compactedSummary = await compactConversation( allMessages, isolatedContext, { systemPrompt: asSystemPrompt([]), userContext: {}, systemContext: {}, toolUseContext: isolatedContext, forkContextMessages: [], }, true, // suppressFollowUpQuestions undefined, // customInstructions true, // isAutoCompact ) contextMessages = buildPostCompactMessages(compactedSummary) // Reset microcompact state since full compact replaces all // messages — old tool IDs are no longer relevant resetMicrocompactState() // Reset content replacement state — compact replaces all messages // so old tool_use_ids are gone. Stale Map entries are harmless // (UUID keys never match) but accumulate memory over long runs. if (teammateReplacementState) { teammateReplacementState = createContentReplacementState() } // Update allMessages in place with compacted version allMessages.length = 0 allMessages.push(...contextMessages) // Mirror compaction into task.messages — otherwise the AppState // mirror grows unbounded (500 turns = 500+ messages, 10-50MB). // Replace with the compacted messages, matching allMessages. updateTaskState( taskId, task => ({ ...task, messages: [...contextMessages, userMessage] }), setAppState, ) } // Pass previous messages as context to preserve conversation history // allMessages accumulates all previous messages (user + assistant) from prior iterations const forkContextMessages = contextMessages.length > 0 ? [...contextMessages] : undefined // Add the user message to allMessages so it's included in future context // This ensures the full conversation (user + assistant turns) is preserved allMessages.push(userMessage) // Create fresh progress tracker for this prompt const tracker = createProgressTracker() const resolveActivity = createActivityDescriptionResolver( toolUseContext.options.tools, ) const iterationMessages: Message[] = [] // Read current permission mode from task state (may have been cycled by leader via Shift+Tab) const currentAppState = toolUseContext.getAppState() const currentTask = currentAppState.tasks[taskId] const currentPermissionMode = currentTask && currentTask.type === 'in_process_teammate' ? currentTask.permissionMode : 'default' const iterationAgentDefinition = { ...resolvedAgentDefinition, permissionMode: currentPermissionMode, } // Track if this iteration was interrupted by work abort (not lifecycle abort) let workWasAborted = false // Run agent within contexts await runWithTeammateContext(teammateContext, async () => { return runWithAgentContext(agentContext, async () => { // Mark task as running (not idle) updateTaskState( taskId, task => ({ ...task, status: 'running', isIdle: false }), setAppState, ) // Run the normal agent loop - same runAgent() used by AgentTool/subagents. // This calls query() internally, so we share the core API infrastructure. // Pass forkContextMessages to preserve conversation history across prompts. // In-process teammates are async but run in the same process as the leader, // so they CAN show permission prompts (unlike true background agents). // Use currentWorkAbortController so Escape stops this turn only, not the teammate. for await (const message of runAgent({ agentDefinition: iterationAgentDefinition, promptMessages, toolUseContext, canUseTool: createInProcessCanUseTool( identity, currentWorkAbortController, (waitMs: number) => { updateTaskState( taskId, task => ({ ...task, totalPausedMs: (task.totalPausedMs ?? 0) + waitMs, }), setAppState, ) }, ), isAsync: true, canShowPermissionPrompts: allowPermissionPrompts ?? true, forkContextMessages, querySource: 'agent:custom', override: { abortController: currentWorkAbortController }, model: model as ModelAlias | undefined, preserveToolUseResults: true, availableTools: toolUseContext.options.tools, allowedTools, contentReplacementState: teammateReplacementState, })) { // Check lifecycle abort first (kills whole teammate) if (abortController.signal.aborted) { logForDebugging( `[inProcessRunner] ${identity.agentId} lifecycle aborted`, ) break } // Check work abort (stops current turn only) if (currentWorkAbortController.signal.aborted) { logForDebugging( `[inProcessRunner] ${identity.agentId} current work aborted (Escape pressed)`, ) workWasAborted = true break } iterationMessages.push(message) allMessages.push(message) updateProgressFromMessage( tracker, message, resolveActivity, toolUseContext.options.tools, ) const progress = getProgressUpdate(tracker) updateTaskState( taskId, task => { // Track in-progress tool use IDs for animation in transcript view let inProgressToolUseIDs = task.inProgressToolUseIDs if (message.type === 'assistant') { for (const block of message.message.content) { if (block.type === 'tool_use') { inProgressToolUseIDs = new Set([ ...(inProgressToolUseIDs ?? []), block.id, ]) } } } else if (message.type === 'user') { const content = message.message.content if (Array.isArray(content)) { for (const block of content) { if ( typeof block === 'object' && 'type' in block && block.type === 'tool_result' ) { if (inProgressToolUseIDs) { inProgressToolUseIDs = new Set(inProgressToolUseIDs) inProgressToolUseIDs.delete(block.tool_use_id) } } } } } return { ...task, progress, messages: appendCappedMessage(task.messages, message), inProgressToolUseIDs, } }, setAppState, ) } return { success: true, messages: iterationMessages } }) }) // Clear the work controller from state (it's no longer valid) updateTaskState( taskId, task => ({ ...task, currentWorkAbortController: undefined }), setAppState, ) // Check if lifecycle aborted during agent run (kills whole teammate) if (abortController.signal.aborted) { break } // If work was aborted (Escape), log it and add interrupt message, then continue to idle state if (workWasAborted) { logForDebugging( `[inProcessRunner] ${identity.agentId} work interrupted, returning to idle`, ) // Add interrupt message to teammate's messages so it appears in their scrollback const interruptMessage = createAssistantAPIErrorMessage({ content: ERROR_MESSAGE_USER_ABORT, }) updateTaskState( taskId, task => ({ ...task, messages: appendCappedMessage(task.messages, interruptMessage), }), setAppState, ) } // Check if already idle before updating (to skip duplicate notification) const prevAppState = toolUseContext.getAppState() const prevTask = prevAppState.tasks[taskId] const wasAlreadyIdle = prevTask?.type === 'in_process_teammate' && prevTask.isIdle // Mark task as idle (NOT completed) and notify any waiters updateTaskState( taskId, task => { // Call any registered idle callbacks task.onIdleCallbacks?.forEach(cb => cb()) return { ...task, isIdle: true, onIdleCallbacks: [] } }, setAppState, ) // Note: We do NOT automatically send the teammate's response to the leader. // Teammates should use the Teammate tool to communicate with the leader. // This matches process-based teammates where output is not visible to the leader. // Only send idle notification on transition to idle (not if already idle) if (!wasAlreadyIdle) { await sendIdleNotification( identity.agentName, identity.color, identity.teamName, { idleReason: workWasAborted ? 'interrupted' : 'available', summary: getLastPeerDmSummary(allMessages), }, ) } else { logForDebugging( `[inProcessRunner] Skipping duplicate idle notification for ${identity.agentName}`, ) } logForDebugging( `[inProcessRunner] ${identity.agentId} finished prompt, waiting for next`, ) // Wait for next message or shutdown const waitResult = await waitForNextPromptOrShutdown( identity, abortController, taskId, toolUseContext.getAppState, setAppState, identity.parentSessionId, ) switch (waitResult.type) { case 'shutdown_request': // Pass shutdown request to model for decision // Format as teammate-message for consistency with how tmux teammates receive it // The model will use approveShutdown or rejectShutdown tool logForDebugging( `[inProcessRunner] ${identity.agentId} received shutdown request - passing to model`, ) currentPrompt = formatAsTeammateMessage( waitResult.request?.from || 'team-lead', waitResult.originalMessage, ) // Add shutdown request to task.messages for transcript display appendTeammateMessage( taskId, createUserMessage({ content: currentPrompt }), setAppState, ) break case 'new_message': // New prompt from leader or teammate logForDebugging( `[inProcessRunner] ${identity.agentId} received new message from ${waitResult.from}`, ) // Messages from the user should be plain text (not wrapped in XML) // Messages from other teammates get XML wrapper for identification if (waitResult.from === 'user') { currentPrompt = waitResult.message } else { currentPrompt = formatAsTeammateMessage( waitResult.from, waitResult.message, waitResult.color, waitResult.summary, ) // Add to task.messages for transcript display (only for non-user messages) // Messages from 'user' come from pendingUserMessages which are already // added by injectUserMessageToTeammate appendTeammateMessage( taskId, createUserMessage({ content: currentPrompt }), setAppState, ) } break case 'aborted': logForDebugging( `[inProcessRunner] ${identity.agentId} aborted while waiting`, ) shouldExit = true break } } // Mark as completed when exiting the loop let alreadyTerminal = false let toolUseId: string | undefined updateTaskState( taskId, task => { // killInProcessTeammate may have already set status:killed + // notified:true + cleared fields. Don't overwrite (would flip // killed → completed and double-emit the SDK bookend). if (task.status !== 'running') { alreadyTerminal = true return task } toolUseId = task.toolUseId task.onIdleCallbacks?.forEach(cb => cb()) task.unregisterCleanup?.() return { ...task, status: 'completed' as const, notified: true, endTime: Date.now(), messages: task.messages?.length ? [task.messages.at(-1)!] : undefined, pendingUserMessages: [], inProgressToolUseIDs: undefined, abortController: undefined, unregisterCleanup: undefined, currentWorkAbortController: undefined, onIdleCallbacks: [], } }, setAppState, ) void evictTaskOutput(taskId) // Eagerly evict task from AppState since it's been consumed evictTerminalTask(taskId, setAppState) // notified:true pre-set → no XML notification → print.ts won't emit // the SDK task_notification. Close the task_started bookend directly. if (!alreadyTerminal) { emitTaskTerminatedSdk(taskId, 'completed', { toolUseId, summary: identity.agentId, }) } unregisterPerfettoAgent(identity.agentId) return { success: true, messages: allMessages } } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logForDebugging( `[inProcessRunner] Agent ${identity.agentId} failed: ${errorMessage}`, ) // Mark task as failed and notify any waiters let alreadyTerminal = false let toolUseId: string | undefined updateTaskState( taskId, task => { if (task.status !== 'running') { alreadyTerminal = true return task } toolUseId = task.toolUseId task.onIdleCallbacks?.forEach(cb => cb()) task.unregisterCleanup?.() return { ...task, status: 'failed' as const, notified: true, error: errorMessage, isIdle: true, endTime: Date.now(), onIdleCallbacks: [], messages: task.messages?.length ? [task.messages.at(-1)!] : undefined, pendingUserMessages: [], inProgressToolUseIDs: undefined, abortController: undefined, unregisterCleanup: undefined, currentWorkAbortController: undefined, } }, setAppState, ) void evictTaskOutput(taskId) // Eagerly evict task from AppState since it's been consumed evictTerminalTask(taskId, setAppState) // notified:true pre-set → no XML notification → close SDK bookend directly. if (!alreadyTerminal) { emitTaskTerminatedSdk(taskId, 'failed', { toolUseId, summary: identity.agentId, }) } // Send idle notification with failure via file-based mailbox await sendIdleNotification( identity.agentName, identity.color, identity.teamName, { idleReason: 'failed', completedStatus: 'failed', failureReason: errorMessage, }, ) unregisterPerfettoAgent(identity.agentId) return { success: false, error: errorMessage, messages: allMessages, } } } /** * Starts an in-process teammate in the background. * * This is the main entry point called after spawn. It starts the agent * execution loop in a fire-and-forget manner. * * @param config - Runner configuration */ export function startInProcessTeammate(config: InProcessRunnerConfig): void { // Extract agentId before the closure so the catch handler doesn't retain // the full config object (including toolUseContext) while the promise is // pending - which can be hours for a long-running teammate. const agentId = config.identity.agentId void runInProcessTeammate(config).catch(error => { logForDebugging(`[inProcessRunner] Unhandled error in ${agentId}: ${error}`) }) }