Source dump of Claude Code
at main · 565 lines · 22 kB · view raw
// Non-React scheduler core for .claude/scheduled_tasks.json.
// Shared by REPL (via useScheduledTasks) and SDK/-p mode (print.ts).
//
// Lifecycle: poll getScheduledTasksEnabled() until true (flag flips when
// CronCreate runs or a skill on: trigger fires) → load tasks + watch the
// file + start a 1s check timer → on fire, call onFire(prompt). stop()
// tears everything down.

import type { FSWatcher } from 'chokidar'
import {
  getScheduledTasksEnabled,
  getSessionCronTasks,
  removeSessionCronTasks,
  setScheduledTasksEnabled,
} from '../bootstrap/state.js'
import {
  type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  logEvent,
} from '../services/analytics/index.js'
import { cronToHuman } from './cron.js'
import {
  type CronJitterConfig,
  type CronTask,
  DEFAULT_CRON_JITTER_CONFIG,
  findMissedTasks,
  getCronFilePath,
  hasCronTasksSync,
  jitteredNextCronRunMs,
  markCronTasksFired,
  oneShotJitteredNextCronRunMs,
  readCronTasks,
  removeCronTasks,
} from './cronTasks.js'
import {
  releaseSchedulerLock,
  tryAcquireSchedulerLock,
} from './cronTasksLock.js'
import { logForDebugging } from './debug.js'

const CHECK_INTERVAL_MS = 1000
// chokidar `awaitWriteFinish.stabilityThreshold`: the file must stay
// unchanged this long before the watcher reports an add/change.
const FILE_STABILITY_MS = 300
// How often a non-owning session re-probes the scheduler lock. Coarse
// because takeover only matters when the owning session has crashed.
const LOCK_PROBE_INTERVAL_MS = 5000

/**
 * True when a recurring task was created more than `maxAgeMs` ago and should
 * be deleted on its next fire. Permanent tasks never age. `maxAgeMs === 0`
 * means unlimited (never ages out). Sourced from
 * {@link CronJitterConfig.recurringMaxAgeMs} at call time.
 * Extracted for testability — the scheduler's check() is buried under
 * setInterval/chokidar/lock machinery.
 */
export function isRecurringTaskAged(
  t: CronTask,
  nowMs: number,
  maxAgeMs: number,
): boolean {
  if (maxAgeMs === 0) return false
  return Boolean(t.recurring && !t.permanent && nowMs - t.createdAt >= maxAgeMs)
}

type CronSchedulerOptions = {
  /** Called when a task fires (regular or missed-on-startup). */
  onFire: (prompt: string) => void
  /** While true, firing is deferred to the next tick. */
  isLoading: () => boolean
  /**
   * When true, bypasses the isLoading gate in check() and auto-enables the
   * scheduler without waiting for setScheduledTasksEnabled(). The
   * auto-enable is the load-bearing part — assistant mode has tasks in
   * scheduled_tasks.json at install time and shouldn't wait on a loader
   * skill to flip the flag. The isLoading bypass is minor post-#20425
   * (assistant mode now idles between turns like a normal REPL).
   */
  assistantMode?: boolean
  /**
   * When provided, receives the full CronTask on normal fires (and onFire is
   * NOT called for that fire). Lets daemon callers see the task id/cron/etc
   * instead of just the prompt string.
   */
  onFireTask?: (task: CronTask) => void
  /**
   * When provided, receives the missed one-shot tasks on initial load (and
   * onFire is NOT called with the pre-formatted notification). Daemon decides
   * how to surface them.
   */
  onMissed?: (tasks: CronTask[]) => void
  /**
   * Directory containing .claude/scheduled_tasks.json. When provided, the
   * scheduler never touches bootstrap state: getProjectRoot/getSessionId are
   * not read, and the getScheduledTasksEnabled() poll is skipped (enable()
   * runs immediately on start). Required for Agent SDK daemon callers.
   */
  dir?: string
  /**
   * Owner key written into the lock file. Defaults to getSessionId().
   * Daemon callers must pass a stable per-process UUID since they have no
   * session. PID remains the liveness probe regardless.
   */
  lockIdentity?: string
  /**
   * Returns the cron jitter config to use for this tick. Called once per
   * check() cycle. REPL callers pass a GrowthBook-backed implementation
   * (see cronJitterConfig.ts) for live tuning — ops can widen the jitter
   * window mid-session during a :00 load spike without restarting clients.
   * Agent SDK daemon callers omit this and get DEFAULT_CRON_JITTER_CONFIG,
   * which is safe since daemons restart on config change anyway, and the
   * growthbook.ts → config.ts → commands.ts → REPL chain stays out of
   * sdk.mjs.
   */
  getJitterConfig?: () => CronJitterConfig
  /**
   * Killswitch: polled once per check() tick. When true, check() bails
   * before firing anything — existing crons stop dead mid-session. CLI
   * callers inject `() => !isKairosCronEnabled()` so flipping the
   * tengu_kairos_cron gate off stops already-running schedulers (not just
   * new ones). Daemon callers omit this, same rationale as getJitterConfig.
   */
  isKilled?: () => boolean
  /**
   * Per-task gate applied before any side effect. Tasks returning false are
   * invisible to this scheduler: never fired, never stamped with
   * `lastFiredAt`, never deleted, never surfaced as missed, absent from
   * `getNextFireTime()`. The daemon cron worker uses `t => t.permanent` so
   * non-permanent tasks in the same scheduled_tasks.json are untouched.
   */
  filter?: (t: CronTask) => boolean
}

export type CronScheduler = {
  start: () => void
  stop: () => void
  /**
   * Epoch ms of the soonest scheduled fire across all loaded tasks, or null
   * if nothing is scheduled (no tasks, or all tasks already in-flight).
   * Daemon callers use this to decide whether to tear down an idle agent
   * subprocess or keep it warm for an imminent fire.
   */
  getNextFireTime: () => number | null
}

/**
 * Create a scheduler for file-backed (.claude/scheduled_tasks.json) and
 * session-only cron tasks. Nothing runs until start() is called; stop()
 * clears every timer, closes the file watcher and releases the scheduler
 * lock if this instance owns it.
 */
export function createCronScheduler(
  options: CronSchedulerOptions,
): CronScheduler {
  const {
    onFire,
    isLoading,
    assistantMode = false,
    onFireTask,
    onMissed,
    dir,
    lockIdentity,
    getJitterConfig,
    isKilled,
    filter,
  } = options
  const lockOpts = dir || lockIdentity ? { dir, lockIdentity } : undefined

  // File-backed tasks only. Session tasks (durable: false) are NOT loaded
  // here — they can be added/removed mid-session with no file event, so
  // check() reads them fresh from bootstrap state on every tick instead.
  let tasks: CronTask[] = []
  // Per-task next-fire times (epoch ms).
  const nextFireAt = new Map<string, number>()
  // Ids we've already enqueued a "missed task" prompt for — prevents
  // re-asking on every file change before the user answers.
  const missedAsked = new Set<string>()
  // Tasks currently enqueued but not yet removed from the file. Prevents
  // double-fire if the interval ticks again before removeCronTasks lands.
  const inFlight = new Set<string>()

  let enablePoll: ReturnType<typeof setInterval> | null = null
  let checkTimer: ReturnType<typeof setInterval> | null = null
  let lockProbeTimer: ReturnType<typeof setInterval> | null = null
  let watcher: FSWatcher | null = null
  let stopped = false
  let isOwner = false

  async function load(initial: boolean) {
    const next = await readCronTasks(dir)
    if (stopped) return
    tasks = next

    // Only surface missed tasks on initial load. Chokidar-triggered
    // reloads leave overdue tasks to check() (which anchors from createdAt
    // and fires immediately). This avoids a misleading "missed while Claude
    // was not running" prompt for tasks that became overdue mid-session.
    //
    // Recurring tasks are NOT surfaced or deleted — check() handles them
    // correctly (fires on first tick, reschedules forward). Only one-shot
    // missed tasks need user input (run once now, or discard forever).
    //
    // NOTE(review): this path does not consult `isOwner`, so two sessions
    // starting together in the same cwd may both surface the same missed
    // tasks — confirm whether that is intended.
    if (!initial) return

    const now = Date.now()
    const missed = findMissedTasks(next, now).filter(
      t => !t.recurring && !missedAsked.has(t.id) && (!filter || filter(t)),
    )
    if (missed.length > 0) {
      for (const t of missed) {
        missedAsked.add(t.id)
        // Prevent check() from re-firing the raw prompt while the async
        // removeCronTasks + chokidar reload chain is in progress.
        nextFireAt.set(t.id, Infinity)
      }
      logEvent('tengu_scheduled_task_missed', {
        count: missed.length,
        taskIds: missed
          .map(t => t.id)
          .join(
            ',',
          ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })
      if (onMissed) {
        onMissed(missed)
      } else {
        onFire(buildMissedTaskNotification(missed))
      }
      void removeCronTasks(
        missed.map(t => t.id),
        dir,
      ).catch(e =>
        logForDebugging(`[ScheduledTasks] failed to remove missed tasks: ${e}`),
      )
      logForDebugging(
        `[ScheduledTasks] surfaced ${missed.length} missed one-shot task(s)`,
      )
    }
  }

  // Fire-and-forget wrapper around load(). load() awaits readCronTasks()
  // and may call onFire/onMissed; if any of them throws, the rejection is
  // logged here instead of escaping as an unhandled promise rejection.
  function reload(initial: boolean): void {
    void load(initial).catch(e =>
      logForDebugging(`[ScheduledTasks] failed to load tasks: ${e}`),
    )
  }

  function check() {
    if (isKilled?.()) return
    if (isLoading() && !assistantMode) return
    const now = Date.now()
    const seen = new Set<string>()
    // File-backed recurring tasks that fired this tick. Batched into one
    // markCronTasksFired call after the loop so N fires = one write. Session
    // tasks excluded — they die with the process, no point persisting.
    const firedFileRecurring: string[] = []
    // Read once per tick. REPL callers pass getJitterConfig backed by
    // GrowthBook so a config push takes effect without restart. Daemon and
    // SDK callers omit it and get DEFAULT_CRON_JITTER_CONFIG (safe — jitter
    // is an ops lever for REPL fleet load-shedding, not a daemon concern).
    const jitterCfg = getJitterConfig?.() ?? DEFAULT_CRON_JITTER_CONFIG

    // Shared loop body. `isSession` routes the one-shot cleanup path:
    // session tasks are removed synchronously from memory, file tasks go
    // through the async removeCronTasks + chokidar reload. Named
    // processTask rather than `process` so it does not shadow Node's
    // global `process`.
    function processTask(t: CronTask, isSession: boolean) {
      if (filter && !filter(t)) return
      seen.add(t.id)
      if (inFlight.has(t.id)) return

      let next = nextFireAt.get(t.id)
      if (next === undefined) {
        // First sight — anchor from lastFiredAt (recurring) or createdAt.
        // Never-fired recurring tasks use createdAt: if isLoading delayed
        // this tick past the fire time, anchoring from `now` would compute
        // next-year for pinned crons (`30 14 27 2 *`). Fired-before tasks
        // use lastFiredAt: the reschedule below writes `now` back to disk,
        // so on next process spawn first-sight computes the SAME newNext we
        // set in-memory here. Without this, a daemon child despawning on
        // idle loses nextFireAt and the next spawn re-anchors from 10-day-
        // old createdAt → fires every task every cycle.
        next = t.recurring
          ? (jitteredNextCronRunMs(
              t.cron,
              t.lastFiredAt ?? t.createdAt,
              t.id,
              jitterCfg,
            ) ?? Infinity)
          : (oneShotJitteredNextCronRunMs(
              t.cron,
              t.createdAt,
              t.id,
              jitterCfg,
            ) ?? Infinity)
        nextFireAt.set(t.id, next)
        logForDebugging(
          `[ScheduledTasks] scheduled ${t.id} for ${next === Infinity ? 'never' : new Date(next).toISOString()}`,
        )
      }

      if (now < next) return

      logForDebugging(
        `[ScheduledTasks] firing ${t.id}${t.recurring ? ' (recurring)' : ''}`,
      )
      logEvent('tengu_scheduled_task_fire', {
        recurring: t.recurring ?? false,
        taskId:
          t.id as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })
      if (onFireTask) {
        onFireTask(t)
      } else {
        onFire(t.prompt)
      }

      // Aged-out recurring tasks fall through to the one-shot delete paths
      // below (session tasks get synchronous removal; file tasks get the
      // async inFlight/chokidar path). Fires one last time, then is removed.
      const aged = isRecurringTaskAged(t, now, jitterCfg.recurringMaxAgeMs)
      if (aged) {
        const ageHours = Math.floor((now - t.createdAt) / 1000 / 60 / 60)
        logForDebugging(
          `[ScheduledTasks] recurring task ${t.id} aged out (${ageHours}h since creation), deleting after final fire`,
        )
        logEvent('tengu_scheduled_task_expired', {
          taskId:
            t.id as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          ageHours,
        })
      }

      if (t.recurring && !aged) {
        // Recurring: reschedule from now (not from next) to avoid rapid
        // catch-up if the session was blocked. Jitter keeps us off the
        // exact :00 wall-clock boundary every cycle.
        const newNext =
          jitteredNextCronRunMs(t.cron, now, t.id, jitterCfg) ?? Infinity
        nextFireAt.set(t.id, newNext)
        // Persist lastFiredAt=now so next process spawn reconstructs this
        // same newNext on first-sight. Session tasks skip — process-local.
        if (!isSession) firedFileRecurring.push(t.id)
      } else if (isSession) {
        // One-shot (or aged-out recurring) session task: synchronous memory
        // removal. No inFlight window — the next tick will read a session
        // store without this id.
        removeSessionCronTasks([t.id])
        nextFireAt.delete(t.id)
      } else {
        // One-shot (or aged-out recurring) file task: delete from disk.
        // inFlight guards against double-fire during the async
        // removeCronTasks + chokidar reload.
        inFlight.add(t.id)
        void removeCronTasks([t.id], dir)
          .catch(e =>
            logForDebugging(
              `[ScheduledTasks] failed to remove task ${t.id}: ${e}`,
            ),
          )
          .finally(() => {
            // Drop the task from the in-memory snapshot BEFORE releasing
            // the inFlight guard. chokidar only reports the rewrite after
            // FILE_STABILITY_MS of write stability, so without this a tick
            // landing in that window would still find the task in `tasks`
            // with no nextFireAt entry, re-anchor it from its unchanged
            // on-disk timestamps (a fire time already in the past) and fire
            // it a second time. If the removal failed, this also stops the
            // task re-firing on every tick until the file is next reloaded.
            tasks = tasks.filter(x => x.id !== t.id)
            inFlight.delete(t.id)
          })
        nextFireAt.delete(t.id)
      }
    }

    // File-backed tasks: only when we own the scheduler lock. The lock
    // exists to stop two Claude sessions in the same cwd from double-firing
    // the same on-disk task.
    if (isOwner) {
      for (const t of tasks) processTask(t, false)
      // Batched lastFiredAt write. inFlight guards against double-fire
      // during the chokidar-triggered reload (same pattern as the
      // removeCronTasks path above) — the reload re-seeds `tasks` with the
      // just-written lastFiredAt, and first-sight on that yields the same
      // newNext we already set in-memory, so it's idempotent even without
      // inFlight. Guarding anyway keeps the semantics obvious.
      if (firedFileRecurring.length > 0) {
        for (const id of firedFileRecurring) inFlight.add(id)
        void markCronTasksFired(firedFileRecurring, now, dir)
          .catch(e =>
            logForDebugging(
              `[ScheduledTasks] failed to persist lastFiredAt: ${e}`,
            ),
          )
          .finally(() => {
            for (const id of firedFileRecurring) inFlight.delete(id)
          })
      }
    }
    // Session-only tasks: process-private, the lock does not apply — the
    // other session cannot see them and there is no double-fire risk. Read
    // fresh from bootstrap state every tick (no chokidar, no load()). This
    // is skipped on the daemon path (`dir !== undefined`) which never
    // touches bootstrap state.
    if (dir === undefined) {
      for (const t of getSessionCronTasks()) processTask(t, true)
    }

    if (seen.size === 0) {
      // No live tasks this tick — clear the whole schedule so
      // getNextFireTime() returns null. The eviction loop below is
      // unreachable here (seen is empty), so stale entries would
      // otherwise survive indefinitely and keep the daemon agent warm.
      nextFireAt.clear()
      return
    }
    // Evict schedule entries for tasks no longer present. When !isOwner,
    // file-task ids aren't in `seen` and get evicted — harmless: they
    // re-anchor from createdAt on the first owned tick.
    for (const id of nextFireAt.keys()) {
      if (!seen.has(id)) nextFireAt.delete(id)
    }
  }

  async function enable() {
    if (stopped) return
    if (enablePoll) {
      clearInterval(enablePoll)
      enablePoll = null
    }

    const { default: chokidar } = await import('chokidar')
    if (stopped) return

    // Acquire the per-project scheduler lock. Only the owning session runs
    // check(). Other sessions probe periodically to take over if the owner
    // dies. Prevents double-firing when multiple Claudes share a cwd.
    isOwner = await tryAcquireSchedulerLock(lockOpts).catch(() => false)
    if (stopped) {
      if (isOwner) {
        isOwner = false
        void releaseSchedulerLock(lockOpts)
      }
      return
    }
    if (!isOwner) {
      lockProbeTimer = setInterval(() => {
        void tryAcquireSchedulerLock(lockOpts)
          .then(owned => {
            if (stopped) {
              if (owned) void releaseSchedulerLock(lockOpts)
              return
            }
            if (owned) {
              isOwner = true
              if (lockProbeTimer) {
                clearInterval(lockProbeTimer)
                lockProbeTimer = null
              }
            }
          })
          .catch(e => logForDebugging(String(e), { level: 'error' }))
      }, LOCK_PROBE_INTERVAL_MS)
      lockProbeTimer.unref?.()
    }

    reload(true)

    const path = getCronFilePath(dir)
    watcher = chokidar.watch(path, {
      persistent: false,
      ignoreInitial: true,
      awaitWriteFinish: { stabilityThreshold: FILE_STABILITY_MS },
      ignorePermissionErrors: true,
    })
    watcher.on('add', () => reload(false))
    watcher.on('change', () => reload(false))
    watcher.on('unlink', () => {
      if (!stopped) {
        tasks = []
        nextFireAt.clear()
      }
    })

    checkTimer = setInterval(check, CHECK_INTERVAL_MS)
    // Don't keep the process alive for the scheduler alone — in -p text mode
    // the process should exit after the single turn even if a cron was created.
    checkTimer.unref?.()
  }

  return {
    start() {
      stopped = false
      // Daemon path (dir explicitly given): don't touch bootstrap state —
      // getScheduledTasksEnabled() would read a never-initialized flag. The
      // daemon is asking to schedule; just enable.
      if (dir !== undefined) {
        logForDebugging(
          `[ScheduledTasks] scheduler start() — dir=${dir}, hasTasks=${hasCronTasksSync(dir)}`,
        )
        void enable()
        return
      }
      logForDebugging(
        `[ScheduledTasks] scheduler start() — enabled=${getScheduledTasksEnabled()}, hasTasks=${hasCronTasksSync()}`,
      )
      // Auto-enable when scheduled_tasks.json has entries. CronCreateTool
      // also sets this when a task is created mid-session.
      if (
        !getScheduledTasksEnabled() &&
        (assistantMode || hasCronTasksSync())
      ) {
        setScheduledTasksEnabled(true)
      }
      if (getScheduledTasksEnabled()) {
        void enable()
        return
      }
      enablePoll = setInterval(
        en => {
          if (getScheduledTasksEnabled()) void en()
        },
        CHECK_INTERVAL_MS,
        enable,
      )
      enablePoll.unref?.()
    },
    stop() {
      stopped = true
      if (enablePoll) {
        clearInterval(enablePoll)
        enablePoll = null
      }
      if (checkTimer) {
        clearInterval(checkTimer)
        checkTimer = null
      }
      if (lockProbeTimer) {
        clearInterval(lockProbeTimer)
        lockProbeTimer = null
      }
      void watcher?.close()
      watcher = null
      if (isOwner) {
        isOwner = false
        void releaseSchedulerLock(lockOpts)
      }
    },
    getNextFireTime() {
      // nextFireAt uses Infinity for "never" (surfaced missed one-shots
      // awaiting removal, cron strings with no next run). Filter those out
      // so callers can distinguish "soon" from "nothing pending".
      let min = Infinity
      for (const t of nextFireAt.values()) {
        if (t < min) min = t
      }
      return min === Infinity ? null : min
    },
  }
}

/**
 * Build the missed-task notification text. The guidance comes before the
 * task list, and each prompt is wrapped in a code fence. This stops a
 * multi-line imperative prompt from being read as an instruction to act
 * immediately, which would be a self-inflicted prompt injection. The full
 * prompt body is preserved because the model DOES need to execute it after
 * the user confirms. The tasks are already deleted from the JSON before the
 * model sees this notification.
 */
export function buildMissedTaskNotification(missed: CronTask[]): string {
  const plural = missed.length > 1
  const header =
    `The following one-shot scheduled task${plural ? 's were' : ' was'} missed while Claude was not running. ` +
    `${plural ? 'They have' : 'It has'} already been removed from .claude/scheduled_tasks.json.\n\n` +
    `Do NOT execute ${plural ? 'these prompts' : 'this prompt'} yet. ` +
    `First use the AskUserQuestion tool to ask whether to run ${plural ? 'each one' : 'it'} now. ` +
    `Only execute if the user confirms.`

  const blocks = missed.map(t => {
    const meta = `[${cronToHuman(t.cron)}, created ${new Date(t.createdAt).toLocaleString()}]`
    // Use a fence one longer than any backtick run in the prompt so a
    // prompt containing ``` cannot close the fence early and un-wrap the
    // trailing text (CommonMark fence-matching rule).
    const longestRun = (t.prompt.match(/`+/g) ?? []).reduce(
      (max, run) => Math.max(max, run.length),
      0,
    )
    const fence = '`'.repeat(Math.max(3, longestRun + 1))
    return `${meta}\n${fence}\n${t.prompt}\n${fence}`
  })

  return `${header}\n\n${blocks.join('\n\n')}`
}