source dump of claude code
at main 308 lines 9.9 kB view raw
1import { 2 OUTPUT_FILE_TAG, 3 STATUS_TAG, 4 SUMMARY_TAG, 5 TASK_ID_TAG, 6 TASK_NOTIFICATION_TAG, 7 TASK_TYPE_TAG, 8 TOOL_USE_ID_TAG, 9} from '../../constants/xml.js' 10import type { AppState } from '../../state/AppState.js' 11import { 12 isTerminalTaskStatus, 13 type TaskStatus, 14 type TaskType, 15} from '../../Task.js' 16import type { TaskState } from '../../tasks/types.js' 17import { enqueuePendingNotification } from '../messageQueueManager.js' 18import { enqueueSdkEvent } from '../sdkEventQueue.js' 19import { getTaskOutputDelta, getTaskOutputPath } from './diskOutput.js' 20 21// Standard polling interval for all tasks 22export const POLL_INTERVAL_MS = 1000 23 24// Duration to display killed tasks before eviction 25export const STOPPED_DISPLAY_MS = 3_000 26 27// Grace period for terminal local_agent tasks in the coordinator panel 28export const PANEL_GRACE_MS = 30_000 29 30// Attachment type for task status updates 31export type TaskAttachment = { 32 type: 'task_status' 33 taskId: string 34 toolUseId?: string 35 taskType: TaskType 36 status: TaskStatus 37 description: string 38 deltaSummary: string | null // New output since last attachment 39} 40 41type SetAppState = (updater: (prev: AppState) => AppState) => void 42 43/** 44 * Update a task's state in AppState. 45 * Helper function for task implementations. 46 * Generic to allow type-safe updates for specific task types. 47 */ 48export function updateTaskState<T extends TaskState>( 49 taskId: string, 50 setAppState: SetAppState, 51 updater: (task: T) => T, 52): void { 53 setAppState(prev => { 54 const task = prev.tasks?.[taskId] as T | undefined 55 if (!task) { 56 return prev 57 } 58 const updated = updater(task) 59 if (updated === task) { 60 // Updater returned the same reference (early-return no-op). Skip the 61 // spread so s.tasks subscribers don't re-render on unchanged state. 62 return prev 63 } 64 return { 65 ...prev, 66 tasks: { 67 ...prev.tasks, 68 [taskId]: updated, 69 }, 70 } 71 }) 72} 73 74/** 75 * Register a new task in AppState. 76 */ 77export function registerTask(task: TaskState, setAppState: SetAppState): void { 78 let isReplacement = false 79 setAppState(prev => { 80 const existing = prev.tasks[task.id] 81 isReplacement = existing !== undefined 82 // Carry forward UI-held state on re-register (resumeAgentBackground 83 // replaces the task; user's retain shouldn't reset). startTime keeps 84 // the panel sort stable; messages + diskLoaded preserve the viewed 85 // transcript across the replace (the user's just-appended prompt lives 86 // in messages and isn't on disk yet). 87 const merged = 88 existing && 'retain' in existing 89 ? { 90 ...task, 91 retain: existing.retain, 92 startTime: existing.startTime, 93 messages: existing.messages, 94 diskLoaded: existing.diskLoaded, 95 pendingMessages: existing.pendingMessages, 96 } 97 : task 98 return { ...prev, tasks: { ...prev.tasks, [task.id]: merged } } 99 }) 100 101 // Replacement (resume) — not a new start. Skip to avoid double-emit. 102 if (isReplacement) return 103 104 enqueueSdkEvent({ 105 type: 'system', 106 subtype: 'task_started', 107 task_id: task.id, 108 tool_use_id: task.toolUseId, 109 description: task.description, 110 task_type: task.type, 111 workflow_name: 112 'workflowName' in task 113 ? (task.workflowName as string | undefined) 114 : undefined, 115 prompt: 'prompt' in task ? (task.prompt as string) : undefined, 116 }) 117} 118 119/** 120 * Eagerly evict a terminal task from AppState. 121 * The task must be in a terminal state (completed/failed/killed) with notified=true. 122 * This allows memory to be freed without waiting for the next query loop iteration. 123 * The lazy GC in generateTaskAttachments() remains as a safety net. 124 */ 125export function evictTerminalTask( 126 taskId: string, 127 setAppState: SetAppState, 128): void { 129 setAppState(prev => { 130 const task = prev.tasks?.[taskId] 131 if (!task) return prev 132 if (!isTerminalTaskStatus(task.status)) return prev 133 if (!task.notified) return prev 134 // Panel grace period — blocks eviction until deadline passes. 135 // 'retain' in task narrows to LocalAgentTaskState (the only type with 136 // that field); evictAfter is optional so 'evictAfter' in task would 137 // miss tasks that haven't had it set yet. 138 if ('retain' in task && (task.evictAfter ?? Infinity) > Date.now()) { 139 return prev 140 } 141 const { [taskId]: _, ...remainingTasks } = prev.tasks 142 return { ...prev, tasks: remainingTasks } 143 }) 144} 145 146/** 147 * Get all running tasks. 148 */ 149export function getRunningTasks(state: AppState): TaskState[] { 150 const tasks = state.tasks ?? {} 151 return Object.values(tasks).filter(task => task.status === 'running') 152} 153 154/** 155 * Generate attachments for tasks with new output or status changes. 156 * Called by the framework to create push notifications. 157 */ 158export async function generateTaskAttachments(state: AppState): Promise<{ 159 attachments: TaskAttachment[] 160 // Only the offset patch — NOT the full task. The task may transition to 161 // completed during getTaskOutputDelta's async disk read, and spreading the 162 // full stale snapshot would clobber that transition (zombifying the task). 163 updatedTaskOffsets: Record<string, number> 164 evictedTaskIds: string[] 165}> { 166 const attachments: TaskAttachment[] = [] 167 const updatedTaskOffsets: Record<string, number> = {} 168 const evictedTaskIds: string[] = [] 169 const tasks = state.tasks ?? {} 170 171 for (const taskState of Object.values(tasks)) { 172 if (taskState.notified) { 173 switch (taskState.status) { 174 case 'completed': 175 case 'failed': 176 case 'killed': 177 // Evict terminal tasks — they've been consumed and can be GC'd 178 evictedTaskIds.push(taskState.id) 179 continue 180 case 'pending': 181 // Keep in map — hasn't run yet, but parent already knows about it 182 continue 183 case 'running': 184 // Fall through to running logic below 185 break 186 } 187 } 188 189 if (taskState.status === 'running') { 190 const delta = await getTaskOutputDelta( 191 taskState.id, 192 taskState.outputOffset, 193 ) 194 if (delta.content) { 195 updatedTaskOffsets[taskState.id] = delta.newOffset 196 } 197 } 198 199 // Completed tasks are NOT notified here — each task type handles its own 200 // completion notification via enqueuePendingNotification(). Generating 201 // attachments here would race with those per-type callbacks, causing 202 // dual delivery (one inline attachment + one separate API turn). 203 } 204 205 return { attachments, updatedTaskOffsets, evictedTaskIds } 206} 207 208/** 209 * Apply the outputOffset patches and evictions from generateTaskAttachments. 210 * Merges patches against FRESH prev.tasks (not the stale pre-await snapshot), 211 * so concurrent status transitions aren't clobbered. 212 */ 213export function applyTaskOffsetsAndEvictions( 214 setAppState: SetAppState, 215 updatedTaskOffsets: Record<string, number>, 216 evictedTaskIds: string[], 217): void { 218 const offsetIds = Object.keys(updatedTaskOffsets) 219 if (offsetIds.length === 0 && evictedTaskIds.length === 0) { 220 return 221 } 222 setAppState(prev => { 223 let changed = false 224 const newTasks = { ...prev.tasks } 225 for (const id of offsetIds) { 226 const fresh = newTasks[id] 227 // Re-check status on fresh state — task may have completed during the 228 // await. If it's no longer running, the offset update is moot. 229 if (fresh?.status === 'running') { 230 newTasks[id] = { ...fresh, outputOffset: updatedTaskOffsets[id]! } 231 changed = true 232 } 233 } 234 for (const id of evictedTaskIds) { 235 const fresh = newTasks[id] 236 // Re-check terminal+notified on fresh state (TOCTOU: resume may have 237 // replaced the task during the generateTaskAttachments await) 238 if (!fresh || !isTerminalTaskStatus(fresh.status) || !fresh.notified) { 239 continue 240 } 241 if ('retain' in fresh && (fresh.evictAfter ?? Infinity) > Date.now()) { 242 continue 243 } 244 delete newTasks[id] 245 changed = true 246 } 247 return changed ? { ...prev, tasks: newTasks } : prev 248 }) 249} 250 251/** 252 * Poll all running tasks and check for updates. 253 * This is the main polling loop called by the framework. 254 */ 255export async function pollTasks( 256 getAppState: () => AppState, 257 setAppState: SetAppState, 258): Promise<void> { 259 const state = getAppState() 260 const { attachments, updatedTaskOffsets, evictedTaskIds } = 261 await generateTaskAttachments(state) 262 263 applyTaskOffsetsAndEvictions(setAppState, updatedTaskOffsets, evictedTaskIds) 264 265 // Send notifications for completed tasks 266 for (const attachment of attachments) { 267 enqueueTaskNotification(attachment) 268 } 269} 270 271/** 272 * Enqueue a task notification to the message queue. 273 */ 274function enqueueTaskNotification(attachment: TaskAttachment): void { 275 const statusText = getStatusText(attachment.status) 276 277 const outputPath = getTaskOutputPath(attachment.taskId) 278 const toolUseIdLine = attachment.toolUseId 279 ? `\n<${TOOL_USE_ID_TAG}>${attachment.toolUseId}</${TOOL_USE_ID_TAG}>` 280 : '' 281 const message = `<${TASK_NOTIFICATION_TAG}> 282<${TASK_ID_TAG}>${attachment.taskId}</${TASK_ID_TAG}>${toolUseIdLine} 283<${TASK_TYPE_TAG}>${attachment.taskType}</${TASK_TYPE_TAG}> 284<${OUTPUT_FILE_TAG}>${outputPath}</${OUTPUT_FILE_TAG}> 285<${STATUS_TAG}>${attachment.status}</${STATUS_TAG}> 286<${SUMMARY_TAG}>Task "${attachment.description}" ${statusText}</${SUMMARY_TAG}> 287</${TASK_NOTIFICATION_TAG}>` 288 289 enqueuePendingNotification({ value: message, mode: 'task-notification' }) 290} 291 292/** 293 * Get human-readable status text. 294 */ 295function getStatusText(status: TaskStatus): string { 296 switch (status) { 297 case 'completed': 298 return 'completed successfully' 299 case 'failed': 300 return 'failed' 301 case 'killed': 302 return 'was stopped' 303 case 'running': 304 return 'is running' 305 case 'pending': 306 return 'is pending' 307 } 308}