source dump of claude code
at main 451 lines 14 kB view raw
import { constants as fsConstants } from 'fs'
import {
  type FileHandle,
  mkdir,
  open,
  stat,
  symlink,
  unlink,
} from 'fs/promises'
import { join } from 'path'
import { getSessionId } from '../../bootstrap/state.js'
import { getErrnoCode } from '../errors.js'
import { readFileRange, tailFile } from '../fsOperations.js'
import { logError } from '../log.js'
import { getProjectTempDir } from '../permissions/filesystem.js'

// SECURITY: O_NOFOLLOW prevents following symlinks when opening task output files.
// Without this, an attacker in the sandbox could create symlinks in the tasks directory
// pointing to arbitrary files, causing Claude Code on the host to write to those files.
// O_NOFOLLOW is not available on Windows, but the sandbox attack vector is Unix-only.
const O_NOFOLLOW = fsConstants.O_NOFOLLOW ?? 0

const DEFAULT_MAX_READ_BYTES = 8 * 1024 * 1024 // 8MB

/**
 * Disk cap for task output files. In file mode (bash), a watchdog polls
 * file size and kills the process. In pipe mode (hooks), DiskTaskOutput
 * drops chunks past this limit. Shared so both caps stay in sync.
 */
export const MAX_TASK_OUTPUT_BYTES = 5 * 1024 * 1024 * 1024
export const MAX_TASK_OUTPUT_BYTES_DISPLAY = '5GB'

/**
 * Get the task output directory for this session.
 * Uses project temp directory so reads are auto-allowed by checkReadableInternalPath.
 *
 * The session ID is included so concurrent sessions in the same project don't
 * clobber each other's output files. Startup cleanup in one session previously
 * unlinked in-flight output files from other sessions — the writing process's fd
 * keeps the inode alive but reads via path fail ENOENT, and getStdout() returned
 * empty string (inc-4586 / boris-20260309-060423).
 *
 * The session ID is captured at FIRST CALL, not re-read on every invocation.
 * /clear calls regenerateSessionId(), which would otherwise cause
 * ensureOutputDir() to create a new-session path while existing TaskOutput
 * instances still hold old-session paths — open() would ENOENT. Background
 * bash tasks surviving /clear need their output files to stay reachable.
 */
let _taskOutputDir: string | undefined
export function getTaskOutputDir(): string {
  if (_taskOutputDir === undefined) {
    _taskOutputDir = join(getProjectTempDir(), getSessionId(), 'tasks')
  }
  return _taskOutputDir
}

/** Test helper — clears the memoized dir. */
export function _resetTaskOutputDirForTest(): void {
  _taskOutputDir = undefined
}

/**
 * Ensure the task output directory exists (mkdir -p semantics).
 */
async function ensureOutputDir(): Promise<void> {
  await mkdir(getTaskOutputDir(), { recursive: true })
}

/**
 * Get the output file path for a task: `<task output dir>/<taskId>.output`.
 */
export function getTaskOutputPath(taskId: string): string {
  return join(getTaskOutputDir(), `${taskId}.output`)
}

// Tracks fire-and-forget promises (initTaskOutput, initTaskOutputAsSymlink,
// evictTaskOutput, #drain) so tests can drain before teardown. Prevents the
// async-ENOENT-after-teardown flake class (#24957, #25065): a voided async
// resumes after preload's afterEach nuked the temp dir → ENOENT → unhandled
// rejection → flaky test failure. allSettled so a rejection doesn't short-
// circuit the drain and leave other ops racing the rmSync.
const _pendingOps = new Set<Promise<unknown>>()
function track<T>(p: Promise<T>): Promise<T> {
  _pendingOps.add(p)
  void p.finally(() => _pendingOps.delete(p)).catch(() => {})
  return p
}

/**
 * Encapsulates async disk writes for a single task's output.
 *
 * Uses a flat array as a write queue processed by a single drain loop,
 * so each chunk can be GC'd immediately after its write completes.
 * This avoids the memory retention problem of chained .then() closures
 * where every reaction captures its data until the whole chain resolves.
 */
export class DiskTaskOutput {
  #path: string
  #fileHandle: FileHandle | null = null
  #queue: string[] = []
  #bytesWritten = 0
  #capped = false
  #flushPromise: Promise<void> | null = null
  #flushResolve: (() => void) | null = null

  constructor(taskId: string) {
    this.#path = getTaskOutputPath(taskId)
  }

  /**
   * Queue `content` for writing and start a background drain if none is
   * running. Once the (approximate) byte cap is exceeded, a single
   * truncation marker is queued instead and all later content is dropped.
   */
  append(content: string): void {
    if (this.#capped) {
      return
    }
    // content.length (UTF-16 code units) undercounts UTF-8 bytes by at most ~3×.
    // Acceptable for a coarse disk-fill guard — avoids re-scanning every chunk.
    this.#bytesWritten += content.length
    if (this.#bytesWritten > MAX_TASK_OUTPUT_BYTES) {
      this.#capped = true
      this.#queue.push(
        `\n[output truncated: exceeded ${MAX_TASK_OUTPUT_BYTES_DISPLAY} disk cap]\n`,
      )
    } else {
      this.#queue.push(content)
    }
    if (!this.#flushPromise) {
      this.#flushPromise = new Promise<void>(resolve => {
        this.#flushResolve = resolve
      })
      void track(this.#drain())
    }
  }

  /**
   * Resolves when the current drain finishes (successfully or after giving
   * up on errors); resolves immediately when no drain is in progress.
   */
  flush(): Promise<void> {
    return this.#flushPromise ?? Promise.resolve()
  }

  /**
   * Drop queued chunks that have not yet been handed to appendFile.
   * A write already in flight is not aborted.
   */
  cancel(): void {
    this.#queue.length = 0
  }

  async #drainAllChunks(): Promise<void> {
    while (true) {
      try {
        if (!this.#fileHandle) {
          await ensureOutputDir()
          this.#fileHandle = await open(
            this.#path,
            process.platform === 'win32'
              ? 'a'
              : fsConstants.O_WRONLY |
                  fsConstants.O_APPEND |
                  fsConstants.O_CREAT |
                  O_NOFOLLOW,
          )
        }
        while (true) {
          await this.#writeAllChunks()
          if (this.#queue.length === 0) {
            break
          }
        }
      } finally {
        // Always release the fd, including when open/appendFile rejected.
        if (this.#fileHandle) {
          const fileHandle = this.#fileHandle
          this.#fileHandle = null
          await fileHandle.close()
        }
      }
      // you could have another .append() while we're waiting for the file to close, so we check the queue again before fully exiting
      if (this.#queue.length) {
        continue
      }

      break
    }
  }

  #writeAllChunks(): Promise<void> {
    // This code is extremely precise.
    // You **must not** add an await here!! That will cause memory to balloon as the queue grows.
    // It's okay to add an `await` to the caller of this method (e.g. #drainAllChunks) because that won't cause Buffer[] to be kept alive in memory.
    return this.#fileHandle!.appendFile(
      // This variable needs to get GC'd ASAP.
      this.#queueToBuffers(),
    )
  }

  /** Keep this in a separate method so that GC doesn't keep it alive for any longer than it should. */
  #queueToBuffers(): Buffer {
    // Use .splice to in-place mutate the array, informing the GC it can free it.
    const queue = this.#queue.splice(0, this.#queue.length)

    let totalLength = 0
    for (const str of queue) {
      totalLength += Buffer.byteLength(str, 'utf8')
    }

    const buffer = Buffer.allocUnsafe(totalLength)
    let offset = 0
    for (const str of queue) {
      offset += buffer.write(str, offset, 'utf8')
    }

    return buffer
  }

  /**
   * Background drain started by append(). Always settles the pending flush
   * promise, whether or not the writes succeeded.
   *
   * The final "is the queue empty?" check must run in the same synchronous
   * turn as clearing #flushPromise. If it lived only inside #drainAllChunks,
   * an append() landing in the microtask gap between #drainAllChunks
   * returning and this method resuming would see a non-null #flushPromise,
   * queue its chunk without starting a drain, and then be stranded: flush()
   * would resolve without it and, absent a later append, it would never be
   * written. Looping here closes that gap.
   */
  async #drain(): Promise<void> {
    try {
      let keepGoing: boolean
      do {
        keepGoing = await this.#drainWithRetry()
      } while (keepGoing && this.#queue.length > 0)
    } finally {
      // No await between the loop's last queue check and this point, so no
      // append() can slip in and be left behind unwritten.
      const resolve = this.#flushResolve
      this.#flushPromise = null
      this.#flushResolve = null
      resolve?.()
    }
  }

  /**
   * One drain attempt with a single retry.
   *
   * Transient fs errors (EMFILE on busy CI, EPERM on Windows pending-
   * delete) previously rode up through `void this.#drain()` as an
   * unhandled rejection while the flush promise resolved anyway — callers
   * saw an empty file with no error. Retry once for the transient case
   * (queue is intact if open() failed), then log and give up.
   *
   * @returns true when the queue was written out, so the caller may loop to
   *   pick up late appends; false after giving up (or when there was nothing
   *   left to retry), so a persistent failure cannot spin forever.
   */
  async #drainWithRetry(): Promise<boolean> {
    try {
      await this.#drainAllChunks()
      return true
    } catch (e) {
      logError(e)
    }
    if (this.#queue.length === 0) {
      return false
    }
    try {
      await this.#drainAllChunks()
      return true
    } catch (e2) {
      logError(e2)
      return false
    }
  }
}

const outputs = new Map<string, DiskTaskOutput>()

/**
 * Test helper — cancel pending writes, await in-flight ops, clear the map.
 * backgroundShells.test.ts and other task tests spawn real shells that
 * write through this module without afterEach cleanup; their entries
 * leak into diskOutput.test.ts on the same shard.
 *
 * Awaits all tracked promises until the set stabilizes — a settling promise
 * may spawn another (initTaskOutputAsSymlink's catch → initTaskOutput).
 * Call this in afterEach BEFORE rmSync to avoid async-ENOENT-after-teardown.
 */
export async function _clearOutputsForTest(): Promise<void> {
  for (const output of outputs.values()) {
    output.cancel()
  }
  while (_pendingOps.size > 0) {
    await Promise.allSettled([..._pendingOps])
  }
  outputs.clear()
}

/** Return the DiskTaskOutput for `taskId`, creating and registering it on first use. */
function getOrCreateOutput(taskId: string): DiskTaskOutput {
  let output = outputs.get(taskId)
  if (!output) {
    output = new DiskTaskOutput(taskId)
    outputs.set(taskId, output)
  }
  return output
}

/**
 * Append output to a task's disk file asynchronously.
 * Creates the file if it doesn't exist.
 */
export function appendTaskOutput(taskId: string, content: string): void {
  getOrCreateOutput(taskId).append(content)
}

/**
 * Wait for all pending writes for a task to complete.
 * Useful before reading output to ensure all data is flushed.
 */
export async function flushTaskOutput(taskId: string): Promise<void> {
  const output = outputs.get(taskId)
  if (output) {
    await output.flush()
  }
}

/**
 * Evict a task's DiskTaskOutput from the in-memory map after flushing.
 * Unlike cleanupTaskOutput, this does not delete the output file on disk.
 * Call this when a task completes and its output has been consumed.
 */
export function evictTaskOutput(taskId: string): Promise<void> {
  return track(
    (async () => {
      const output = outputs.get(taskId)
      if (output) {
        await output.flush()
        outputs.delete(taskId)
      }
    })(),
  )
}

/**
 * Get delta (new content) since last read.
 * Reads only from the byte offset, up to maxBytes — never loads the full file.
 * On a missing file or any read error, returns empty content and leaves the
 * offset unchanged (non-ENOENT errors are logged).
 */
export async function getTaskOutputDelta(
  taskId: string,
  fromOffset: number,
  maxBytes: number = DEFAULT_MAX_READ_BYTES,
): Promise<{ content: string; newOffset: number }> {
  try {
    const result = await readFileRange(
      getTaskOutputPath(taskId),
      fromOffset,
      maxBytes,
    )
    if (!result) {
      return { content: '', newOffset: fromOffset }
    }
    return {
      content: result.content,
      newOffset: fromOffset + result.bytesRead,
    }
  } catch (e) {
    const code = getErrnoCode(e)
    if (code === 'ENOENT') {
      return { content: '', newOffset: fromOffset }
    }
    logError(e)
    return { content: '', newOffset: fromOffset }
  }
}

/**
 * Get output for a task, reading the tail of the file.
 * Caps at maxBytes to avoid loading multi-GB files into memory; when the
 * file is larger, a header noting how many KB were omitted is prepended.
 */
export async function getTaskOutput(
  taskId: string,
  maxBytes: number = DEFAULT_MAX_READ_BYTES,
): Promise<string> {
  try {
    const { content, bytesTotal, bytesRead } = await tailFile(
      getTaskOutputPath(taskId),
      maxBytes,
    )
    if (bytesTotal > bytesRead) {
      return `[${Math.round((bytesTotal - bytesRead) / 1024)}KB of earlier output omitted]\n${content}`
    }
    return content
  } catch (e) {
    const code = getErrnoCode(e)
    if (code === 'ENOENT') {
      return ''
    }
    logError(e)
    return ''
  }
}

/**
 * Get the current size (offset) of a task's output file.
 * Returns 0 when the file is missing or cannot be stat'ed.
 */
export async function getTaskOutputSize(taskId: string): Promise<number> {
  try {
    return (await stat(getTaskOutputPath(taskId))).size
  } catch (e) {
    const code = getErrnoCode(e)
    if (code === 'ENOENT') {
      return 0
    }
    logError(e)
    return 0
  }
}

/**
 * Clean up a task's output file and write queue.
 */
export async function cleanupTaskOutput(taskId: string): Promise<void> {
  const output = outputs.get(taskId)
  if (output) {
    output.cancel()
    outputs.delete(taskId)
  }

  try {
    await unlink(getTaskOutputPath(taskId))
  } catch (e) {
    const code = getErrnoCode(e)
    if (code === 'ENOENT') {
      return
    }
    logError(e)
  }
}

/**
 * Initialize output file for a new task.
 * Creates an empty file to ensure the path exists.
 */
export function initTaskOutput(taskId: string): Promise<string> {
  return track(
    (async () => {
      await ensureOutputDir()
      const outputPath = getTaskOutputPath(taskId)
      // SECURITY: O_NOFOLLOW prevents symlink-following attacks from the sandbox.
      // O_EXCL ensures we create a new file and fail if something already exists at this path.
      // On Windows, use string flags — numeric O_EXCL can produce EINVAL through libuv.
      const fh = await open(
        outputPath,
        process.platform === 'win32'
          ? 'wx'
          : fsConstants.O_WRONLY |
              fsConstants.O_CREAT |
              fsConstants.O_EXCL |
              O_NOFOLLOW,
      )
      await fh.close()
      return outputPath
    })(),
  )
}

/**
 * Initialize output file as a symlink to another file (e.g., agent transcript).
 * Tries to create the symlink first; if a file already exists, removes it and retries.
 * If that still fails, the error is logged and a plain empty output file is
 * created via initTaskOutput instead.
 */
export function initTaskOutputAsSymlink(
  taskId: string,
  targetPath: string,
): Promise<string> {
  return track(
    (async () => {
      try {
        await ensureOutputDir()
        const outputPath = getTaskOutputPath(taskId)

        try {
          await symlink(targetPath, outputPath)
        } catch {
          await unlink(outputPath)
          await symlink(targetPath, outputPath)
        }

        return outputPath
      } catch (error) {
        logError(error)
        return initTaskOutput(taskId)
      }
    })(),
  )
}