@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
3/**
4 * Schedule and execute event triggers, which run code at specific times.
5 *
6 * Also performs garbage collection of old logs, caches, etc.
7 *
8 * @task garbage Garbage Collection
9 */
10final class PhabricatorTriggerDaemon
11 extends PhabricatorDaemon {
12
13 const COUNTER_VERSION = 'trigger.version';
14 const COUNTER_CURSOR = 'trigger.cursor';
15
16 private $garbageCollectors;
17 private $nextCollection;
18
19 private $anyNuanceData;
20 private $nuanceSources;
21 private $nuanceCursors;
22
23 private $calendarEngine;
24
25 protected function run() {
26
27 // The trigger daemon is a low-level infrastructure daemon which schedules
28 // and executes chronological events. Examples include a subscription which
29 // generates a bill on the 12th of every month, or a reminder email 15
30 // minutes before a meeting.
31
32 // Only one trigger daemon can run at a time, and very little work should
33 // happen in the daemon process. In general, triggered events should
34 // just schedule a task into the normal daemon worker queue and then
35 // return. This allows the real work to take longer to execute without
36 // disrupting other triggers.
37
38 // The trigger mechanism guarantees that events will execute exactly once,
39 // but does not guarantee that they will execute at precisely the specified
40 // time. Under normal circumstances, they should execute within a minute or
41 // so of the desired time, so this mechanism can be used for things like
42 // meeting reminders.
43
44 // If the trigger queue backs up (for example, because it is overwhelmed by
45 // trigger updates, doesn't run for a while, or a trigger action is written
46 // inefficiently) or the daemon queue backs up (usually for similar
47 // reasons), events may execute an arbitrarily long time after they were
48 // scheduled to execute. In some cases (like billing a subscription) this
49 // may be desirable; in other cases (like sending a meeting reminder) the
50 // action may want to check the current time and see if the event is still
51 // relevant.
52
53 // The trigger daemon works in two phases:
54 //
55 // 1. A scheduling phase processes recently updated triggers and
56 // schedules them for future execution. For example, this phase would
57 // see that a meeting trigger had been changed recently, determine
58 // when the reminder for it should execute, and then schedule the
59 // action to execute at that future date.
60 // 2. An execution phase runs the actions for any scheduled events which
61 // are due to execute.
62 //
63 // The major goal of this design is to deliver on the guarantee that events
64 // will execute exactly once. It prevents race conditions in scheduling
65 // and execution by ensuring there is only one writer for either of these
66 // phases. Without this separation of responsibilities, web processes
67 // trying to reschedule events after an update could race with other web
68 // processes or the daemon.
69
70 // We want to start the first GC cycle right away, not wait 4 hours.
71 $this->nextCollection = PhabricatorTime::getNow();
72
73 do {
74 PhabricatorCaches::destroyRequestCache();
75
76 $lock = PhabricatorGlobalLock::newLock('trigger');
77
78 try {
79 $lock->lock(5);
80 } catch (PhutilLockException $ex) {
81 throw new Exception(
82 pht(
83 'Another process is holding the trigger lock. Usually, this '.
84 'means another copy of the trigger daemon is running elsewhere. '.
85 'Multiple processes are not permitted to update triggers '.
86 'simultaneously.'),
87 0,
88 $ex);
89 }
90
91 // Run the scheduling phase. This finds updated triggers which we have
92 // not scheduled yet and schedules them.
93 $last_version = $this->loadCurrentCursor();
94 $head_version = $this->loadCurrentVersion();
95
96 // The cursor points at the next record to process, so we can only skip
97 // this step if we're ahead of the version number.
98 if ($last_version <= $head_version) {
99 $this->scheduleTriggers($last_version);
100 }
101
102 // Run the execution phase. This finds events which are due to execute
103 // and runs them.
104 $this->executeTriggers();
105
106 $lock->unlock();
107
108 $sleep_duration = $this->getSleepDuration();
109 $sleep_duration = $this->runNuanceImportCursors($sleep_duration);
110 $sleep_duration = $this->runGarbageCollection($sleep_duration);
111 $sleep_duration = $this->runCalendarNotifier($sleep_duration);
112
113 if ($this->shouldHibernate($sleep_duration)) {
114 break;
115 }
116
117 $this->sleep($sleep_duration);
118 } while (!$this->shouldExit());
119 }
120
121
122 /**
123 * Process all of the triggers which have been updated since the last time
124 * the daemon ran, scheduling them into the event table.
125 *
126 * @param int $cursor Cursor for the next version update to process.
127 * @return void
128 */
129 private function scheduleTriggers($cursor) {
130 $limit = 100;
131
132 $query = id(new PhabricatorWorkerTriggerQuery())
133 ->setViewer($this->getViewer())
134 ->withVersionBetween($cursor, null)
135 ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_VERSION)
136 ->needEvents(true)
137 ->setLimit($limit);
138 while (true) {
139 $triggers = $query->execute();
140
141 foreach ($triggers as $trigger) {
142 $event = $trigger->getEvent();
143 if ($event) {
144 $last_epoch = $event->getLastEventEpoch();
145 } else {
146 $last_epoch = null;
147 }
148
149 $next_epoch = $trigger->getNextEventEpoch(
150 $last_epoch,
151 $is_reschedule = false);
152
153 $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)
154 ->setLastEventEpoch($last_epoch)
155 ->setNextEventEpoch($next_epoch);
156
157 $new_event->openTransaction();
158 if ($event) {
159 $event->delete();
160 }
161
162 // Always save the new event. Note that we save it even if the next
163 // epoch is `null`, indicating that it will never fire, because we
164 // would lose the last epoch information if we delete it.
165 //
166 // In particular, some events may want to execute exactly once.
167 // Retaining the last epoch allows them to do this, even if the
168 // trigger is updated.
169 $new_event->save();
170
171 // Move the cursor forward to make sure we don't reprocess this
172 // trigger until it is updated again.
173 $this->updateCursor($trigger->getTriggerVersion() + 1);
174 $new_event->saveTransaction();
175 }
176
177 // If we saw fewer than a full page of updated triggers, we're caught
178 // up, so we can move on to the execution phase.
179 if (count($triggers) < $limit) {
180 break;
181 }
182
183 // Otherwise, skip past the stuff we just processed and grab another
184 // page of updated triggers.
185 $min = last($triggers)->getTriggerVersion() + 1;
186 $query->withVersionBetween($min, null);
187
188 $this->stillWorking();
189 }
190 }
191
192
193 /**
194 * Run scheduled event triggers which are due for execution.
195 *
196 * @return void
197 */
198 private function executeTriggers() {
199
200 // We run only a limited number of triggers before ending the execution
201 // phase. If we ran until exhaustion, we could end up executing very
202 // out-of-date triggers if there was a long backlog: trigger changes
203 // during this phase are not reflected in the event table until we run
204 // another scheduling phase.
205
206 // If we exit this phase with triggers still ready to execute we'll
207 // jump back into the scheduling phase immediately, so this just makes
208 // sure we don't spend an unreasonably long amount of time without
209 // processing trigger updates and doing rescheduling.
210
211 $limit = 100;
212 $now = PhabricatorTime::getNow();
213
214 $triggers = id(new PhabricatorWorkerTriggerQuery())
215 ->setViewer($this->getViewer())
216 ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)
217 ->withNextEventBetween(null, $now)
218 ->needEvents(true)
219 ->setLimit($limit)
220 ->execute();
221 foreach ($triggers as $trigger) {
222 $event = $trigger->getEvent();
223
224 // Execute the trigger action.
225 $trigger->executeTrigger(
226 $event->getLastEventEpoch(),
227 $event->getNextEventEpoch());
228
229 // Now that we've executed the trigger, the current trigger epoch is
230 // going to become the last epoch.
231 $last_epoch = $event->getNextEventEpoch();
232
233 // If this is a recurring trigger, give it an opportunity to reschedule.
234 $reschedule_epoch = $trigger->getNextEventEpoch(
235 $last_epoch,
236 $is_reschedule = true);
237
238 // Don't reschedule events unless the next occurrence is in the future.
239 if (($reschedule_epoch !== null) &&
240 ($last_epoch !== null) &&
241 ($reschedule_epoch <= $last_epoch)) {
242 throw new Exception(
243 pht(
244 'Trigger is attempting to perform a routine reschedule where '.
245 'the next event (at %s) does not occur after the previous event '.
246 '(at %s). Routine reschedules must strictly move event triggers '.
247 'forward through time to avoid executing a trigger an infinite '.
248 'number of times instantaneously.',
249 $reschedule_epoch,
250 $last_epoch));
251 }
252
253 $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)
254 ->setLastEventEpoch($last_epoch)
255 ->setNextEventEpoch($reschedule_epoch);
256
257 $event->openTransaction();
258 // Remove the event we just processed.
259 $event->delete();
260
261 // See note in the scheduling phase about this; we save the new event
262 // even if the next epoch is `null`.
263 $new_event->save();
264 $event->saveTransaction();
265 }
266 }
267
268
269 /**
270 * Get the number of seconds to sleep for before starting the next scheduling
271 * phase.
272 *
273 * If no events are scheduled soon, we'll sleep briefly. Otherwise,
274 * we'll sleep until the next scheduled event.
275 *
276 * @return int Number of seconds to sleep for.
277 */
278 private function getSleepDuration() {
279 $sleep = phutil_units('3 minutes in seconds');
280
281 $next_triggers = id(new PhabricatorWorkerTriggerQuery())
282 ->setViewer($this->getViewer())
283 ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)
284 ->withNextEventBetween(0, null)
285 ->setLimit(1)
286 ->needEvents(true)
287 ->execute();
288 if ($next_triggers) {
289 $next_trigger = head($next_triggers);
290 $next_epoch = $next_trigger->getEvent()->getNextEventEpoch();
291 $until = max(0, $next_epoch - PhabricatorTime::getNow());
292 $sleep = min($sleep, $until);
293 }
294
295 return $sleep;
296 }
297
298
299/* -( Counters )----------------------------------------------------------- */
300
301
302 private function loadCurrentCursor() {
303 return $this->loadCurrentCounter(self::COUNTER_CURSOR);
304 }
305
306 private function loadCurrentVersion() {
307 return $this->loadCurrentCounter(self::COUNTER_VERSION);
308 }
309
310 private function updateCursor($value) {
311 LiskDAO::overwriteCounterValue(
312 id(new PhabricatorWorkerTrigger())->establishConnection('w'),
313 self::COUNTER_CURSOR,
314 $value);
315 }
316
317 private function loadCurrentCounter($counter_name) {
318 return (int)LiskDAO::loadCurrentCounterValue(
319 id(new PhabricatorWorkerTrigger())->establishConnection('w'),
320 $counter_name);
321 }
322
323
324/* -( Garbage Collection )------------------------------------------------- */
325
326
327 /**
328 * Run the garbage collector for up to a specified number of seconds.
329 *
330 * @param int $duration Number of seconds the GC may run for.
331 * @return int Number of seconds remaining in the time budget.
332 * @task garbage
333 */
334 private function runGarbageCollection($duration) {
335 $run_until = (PhabricatorTime::getNow() + $duration);
336
337 // NOTE: We always run at least one GC cycle to make sure the GC can make
338 // progress even if the trigger queue is busy.
339 do {
340 $more_garbage = $this->updateGarbageCollection();
341 if (!$more_garbage) {
342 // If we don't have any more collection work to perform, we're all
343 // done.
344 break;
345 }
346 } while (PhabricatorTime::getNow() <= $run_until);
347
348 $remaining = max(0, $run_until - PhabricatorTime::getNow());
349
350 return $remaining;
351 }
352
353
354 /**
355 * Update garbage collection, possibly collecting a small amount of garbage.
356 *
357 * @return bool True if there is more garbage to collect.
358 * @task garbage
359 */
360 private function updateGarbageCollection() {
361 // If we're ready to start the next collection cycle, load all the
362 // collectors.
363 $next = $this->nextCollection;
364 if ($next && (PhabricatorTime::getNow() >= $next)) {
365 $this->nextCollection = null;
366
367 $all_collectors = PhabricatorGarbageCollector::getAllCollectors();
368 $this->garbageCollectors = $all_collectors;
369 }
370
371 // If we're in a collection cycle, continue collection.
372 if ($this->garbageCollectors) {
373 foreach ($this->garbageCollectors as $key => $collector) {
374 $more_garbage = $collector->runCollector();
375 if (!$more_garbage) {
376 unset($this->garbageCollectors[$key]);
377 }
378 // We only run one collection per call, to prevent triggers from being
379 // thrown too far off schedule if there's a lot of garbage to collect.
380 break;
381 }
382
383 if ($this->garbageCollectors) {
384 // If we have more work to do, return true.
385 return true;
386 }
387
388 // Otherwise, reschedule another cycle in 4 hours.
389 $now = PhabricatorTime::getNow();
390 $wait = phutil_units('4 hours in seconds');
391 $this->nextCollection = $now + $wait;
392 }
393
394 return false;
395 }
396
397
398/* -( Nuance Importers )--------------------------------------------------- */
399
400
401 private function runNuanceImportCursors($duration) {
402 $run_until = (PhabricatorTime::getNow() + $duration);
403
404 do {
405 $more_data = $this->updateNuanceImportCursors();
406 if (!$more_data) {
407 break;
408 }
409 } while (PhabricatorTime::getNow() <= $run_until);
410
411 $remaining = max(0, $run_until - PhabricatorTime::getNow());
412
413 return $remaining;
414 }
415
416
417 private function updateNuanceImportCursors() {
418 $nuance_class = PhabricatorNuanceApplication::class;
419 if (!PhabricatorApplication::isClassInstalled($nuance_class)) {
420 return false;
421 }
422
423 // If we haven't loaded sources yet, load them first.
424 if (!$this->nuanceSources && !$this->nuanceCursors) {
425 $this->anyNuanceData = false;
426
427 $sources = id(new NuanceSourceQuery())
428 ->setViewer($this->getViewer())
429 ->withIsDisabled(false)
430 ->withHasImportCursors(true)
431 ->execute();
432 if (!$sources) {
433 return false;
434 }
435
436 $this->nuanceSources = array_reverse($sources);
437 }
438
439 // If we don't have any cursors, move to the next source and generate its
440 // cursors.
441 if (!$this->nuanceCursors) {
442 $source = array_pop($this->nuanceSources);
443
444 $definition = $source->getDefinition()
445 ->setViewer($this->getViewer())
446 ->setSource($source);
447
448 $cursors = $definition->getImportCursors();
449 $this->nuanceCursors = array_reverse($cursors);
450 }
451
452 // Update the next cursor.
453 $cursor = array_pop($this->nuanceCursors);
454 if ($cursor) {
455 $more_data = $cursor->importFromSource();
456 if ($more_data) {
457 $this->anyNuanceData = true;
458 }
459 }
460
461 if (!$this->nuanceSources && !$this->nuanceCursors) {
462 return $this->anyNuanceData;
463 }
464
465 return true;
466 }
467
468
469/* -( Calendar Notifier )-------------------------------------------------- */
470
471
472 private function runCalendarNotifier($duration) {
473 $run_until = (PhabricatorTime::getNow() + $duration);
474
475 if (!$this->calendarEngine) {
476 $this->calendarEngine = new PhabricatorCalendarNotificationEngine();
477 }
478
479 $this->calendarEngine->publishNotifications();
480
481 $remaining = max(0, $run_until - PhabricatorTime::getNow());
482 return $remaining;
483 }
484
485}