@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at recaptime-dev/main 485 lines 16 kB view raw
1<?php 2 3/** 4 * Schedule and execute event triggers, which run code at specific times. 5 * 6 * Also performs garbage collection of old logs, caches, etc. 7 * 8 * @task garbage Garbage Collection 9 */ 10final class PhabricatorTriggerDaemon 11 extends PhabricatorDaemon { 12 13 const COUNTER_VERSION = 'trigger.version'; 14 const COUNTER_CURSOR = 'trigger.cursor'; 15 16 private $garbageCollectors; 17 private $nextCollection; 18 19 private $anyNuanceData; 20 private $nuanceSources; 21 private $nuanceCursors; 22 23 private $calendarEngine; 24 25 protected function run() { 26 27 // The trigger daemon is a low-level infrastructure daemon which schedules 28 // and executes chronological events. Examples include a subscription which 29 // generates a bill on the 12th of every month, or a reminder email 15 30 // minutes before a meeting. 31 32 // Only one trigger daemon can run at a time, and very little work should 33 // happen in the daemon process. In general, triggered events should 34 // just schedule a task into the normal daemon worker queue and then 35 // return. This allows the real work to take longer to execute without 36 // disrupting other triggers. 37 38 // The trigger mechanism guarantees that events will execute exactly once, 39 // but does not guarantee that they will execute at precisely the specified 40 // time. Under normal circumstances, they should execute within a minute or 41 // so of the desired time, so this mechanism can be used for things like 42 // meeting reminders. 43 44 // If the trigger queue backs up (for example, because it is overwhelmed by 45 // trigger updates, doesn't run for a while, or a trigger action is written 46 // inefficiently) or the daemon queue backs up (usually for similar 47 // reasons), events may execute an arbitrarily long time after they were 48 // scheduled to execute. In some cases (like billing a subscription) this 49 // may be desirable; in other cases (like sending a meeting reminder) the 50 // action may want to check the current time and see if the event is still 51 // relevant. 52 53 // The trigger daemon works in two phases: 54 // 55 // 1. A scheduling phase processes recently updated triggers and 56 // schedules them for future execution. For example, this phase would 57 // see that a meeting trigger had been changed recently, determine 58 // when the reminder for it should execute, and then schedule the 59 // action to execute at that future date. 60 // 2. An execution phase runs the actions for any scheduled events which 61 // are due to execute. 62 // 63 // The major goal of this design is to deliver on the guarantee that events 64 // will execute exactly once. It prevents race conditions in scheduling 65 // and execution by ensuring there is only one writer for either of these 66 // phases. Without this separation of responsibilities, web processes 67 // trying to reschedule events after an update could race with other web 68 // processes or the daemon. 69 70 // We want to start the first GC cycle right away, not wait 4 hours. 71 $this->nextCollection = PhabricatorTime::getNow(); 72 73 do { 74 PhabricatorCaches::destroyRequestCache(); 75 76 $lock = PhabricatorGlobalLock::newLock('trigger'); 77 78 try { 79 $lock->lock(5); 80 } catch (PhutilLockException $ex) { 81 throw new Exception( 82 pht( 83 'Another process is holding the trigger lock. Usually, this '. 84 'means another copy of the trigger daemon is running elsewhere. '. 85 'Multiple processes are not permitted to update triggers '. 86 'simultaneously.'), 87 0, 88 $ex); 89 } 90 91 // Run the scheduling phase. This finds updated triggers which we have 92 // not scheduled yet and schedules them. 93 $last_version = $this->loadCurrentCursor(); 94 $head_version = $this->loadCurrentVersion(); 95 96 // The cursor points at the next record to process, so we can only skip 97 // this step if we're ahead of the version number. 98 if ($last_version <= $head_version) { 99 $this->scheduleTriggers($last_version); 100 } 101 102 // Run the execution phase. This finds events which are due to execute 103 // and runs them. 104 $this->executeTriggers(); 105 106 $lock->unlock(); 107 108 $sleep_duration = $this->getSleepDuration(); 109 $sleep_duration = $this->runNuanceImportCursors($sleep_duration); 110 $sleep_duration = $this->runGarbageCollection($sleep_duration); 111 $sleep_duration = $this->runCalendarNotifier($sleep_duration); 112 113 if ($this->shouldHibernate($sleep_duration)) { 114 break; 115 } 116 117 $this->sleep($sleep_duration); 118 } while (!$this->shouldExit()); 119 } 120 121 122 /** 123 * Process all of the triggers which have been updated since the last time 124 * the daemon ran, scheduling them into the event table. 125 * 126 * @param int $cursor Cursor for the next version update to process. 127 * @return void 128 */ 129 private function scheduleTriggers($cursor) { 130 $limit = 100; 131 132 $query = id(new PhabricatorWorkerTriggerQuery()) 133 ->setViewer($this->getViewer()) 134 ->withVersionBetween($cursor, null) 135 ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_VERSION) 136 ->needEvents(true) 137 ->setLimit($limit); 138 while (true) { 139 $triggers = $query->execute(); 140 141 foreach ($triggers as $trigger) { 142 $event = $trigger->getEvent(); 143 if ($event) { 144 $last_epoch = $event->getLastEventEpoch(); 145 } else { 146 $last_epoch = null; 147 } 148 149 $next_epoch = $trigger->getNextEventEpoch( 150 $last_epoch, 151 $is_reschedule = false); 152 153 $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger) 154 ->setLastEventEpoch($last_epoch) 155 ->setNextEventEpoch($next_epoch); 156 157 $new_event->openTransaction(); 158 if ($event) { 159 $event->delete(); 160 } 161 162 // Always save the new event. Note that we save it even if the next 163 // epoch is `null`, indicating that it will never fire, because we 164 // would lose the last epoch information if we delete it. 165 // 166 // In particular, some events may want to execute exactly once. 167 // Retaining the last epoch allows them to do this, even if the 168 // trigger is updated. 169 $new_event->save(); 170 171 // Move the cursor forward to make sure we don't reprocess this 172 // trigger until it is updated again. 173 $this->updateCursor($trigger->getTriggerVersion() + 1); 174 $new_event->saveTransaction(); 175 } 176 177 // If we saw fewer than a full page of updated triggers, we're caught 178 // up, so we can move on to the execution phase. 179 if (count($triggers) < $limit) { 180 break; 181 } 182 183 // Otherwise, skip past the stuff we just processed and grab another 184 // page of updated triggers. 185 $min = last($triggers)->getTriggerVersion() + 1; 186 $query->withVersionBetween($min, null); 187 188 $this->stillWorking(); 189 } 190 } 191 192 193 /** 194 * Run scheduled event triggers which are due for execution. 195 * 196 * @return void 197 */ 198 private function executeTriggers() { 199 200 // We run only a limited number of triggers before ending the execution 201 // phase. If we ran until exhaustion, we could end up executing very 202 // out-of-date triggers if there was a long backlog: trigger changes 203 // during this phase are not reflected in the event table until we run 204 // another scheduling phase. 205 206 // If we exit this phase with triggers still ready to execute we'll 207 // jump back into the scheduling phase immediately, so this just makes 208 // sure we don't spend an unreasonably long amount of time without 209 // processing trigger updates and doing rescheduling. 210 211 $limit = 100; 212 $now = PhabricatorTime::getNow(); 213 214 $triggers = id(new PhabricatorWorkerTriggerQuery()) 215 ->setViewer($this->getViewer()) 216 ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) 217 ->withNextEventBetween(null, $now) 218 ->needEvents(true) 219 ->setLimit($limit) 220 ->execute(); 221 foreach ($triggers as $trigger) { 222 $event = $trigger->getEvent(); 223 224 // Execute the trigger action. 225 $trigger->executeTrigger( 226 $event->getLastEventEpoch(), 227 $event->getNextEventEpoch()); 228 229 // Now that we've executed the trigger, the current trigger epoch is 230 // going to become the last epoch. 231 $last_epoch = $event->getNextEventEpoch(); 232 233 // If this is a recurring trigger, give it an opportunity to reschedule. 234 $reschedule_epoch = $trigger->getNextEventEpoch( 235 $last_epoch, 236 $is_reschedule = true); 237 238 // Don't reschedule events unless the next occurrence is in the future. 239 if (($reschedule_epoch !== null) && 240 ($last_epoch !== null) && 241 ($reschedule_epoch <= $last_epoch)) { 242 throw new Exception( 243 pht( 244 'Trigger is attempting to perform a routine reschedule where '. 245 'the next event (at %s) does not occur after the previous event '. 246 '(at %s). Routine reschedules must strictly move event triggers '. 247 'forward through time to avoid executing a trigger an infinite '. 248 'number of times instantaneously.', 249 $reschedule_epoch, 250 $last_epoch)); 251 } 252 253 $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger) 254 ->setLastEventEpoch($last_epoch) 255 ->setNextEventEpoch($reschedule_epoch); 256 257 $event->openTransaction(); 258 // Remove the event we just processed. 259 $event->delete(); 260 261 // See note in the scheduling phase about this; we save the new event 262 // even if the next epoch is `null`. 263 $new_event->save(); 264 $event->saveTransaction(); 265 } 266 } 267 268 269 /** 270 * Get the number of seconds to sleep for before starting the next scheduling 271 * phase. 272 * 273 * If no events are scheduled soon, we'll sleep briefly. Otherwise, 274 * we'll sleep until the next scheduled event. 275 * 276 * @return int Number of seconds to sleep for. 277 */ 278 private function getSleepDuration() { 279 $sleep = phutil_units('3 minutes in seconds'); 280 281 $next_triggers = id(new PhabricatorWorkerTriggerQuery()) 282 ->setViewer($this->getViewer()) 283 ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) 284 ->withNextEventBetween(0, null) 285 ->setLimit(1) 286 ->needEvents(true) 287 ->execute(); 288 if ($next_triggers) { 289 $next_trigger = head($next_triggers); 290 $next_epoch = $next_trigger->getEvent()->getNextEventEpoch(); 291 $until = max(0, $next_epoch - PhabricatorTime::getNow()); 292 $sleep = min($sleep, $until); 293 } 294 295 return $sleep; 296 } 297 298 299/* -( Counters )----------------------------------------------------------- */ 300 301 302 private function loadCurrentCursor() { 303 return $this->loadCurrentCounter(self::COUNTER_CURSOR); 304 } 305 306 private function loadCurrentVersion() { 307 return $this->loadCurrentCounter(self::COUNTER_VERSION); 308 } 309 310 private function updateCursor($value) { 311 LiskDAO::overwriteCounterValue( 312 id(new PhabricatorWorkerTrigger())->establishConnection('w'), 313 self::COUNTER_CURSOR, 314 $value); 315 } 316 317 private function loadCurrentCounter($counter_name) { 318 return (int)LiskDAO::loadCurrentCounterValue( 319 id(new PhabricatorWorkerTrigger())->establishConnection('w'), 320 $counter_name); 321 } 322 323 324/* -( Garbage Collection )------------------------------------------------- */ 325 326 327 /** 328 * Run the garbage collector for up to a specified number of seconds. 329 * 330 * @param int $duration Number of seconds the GC may run for. 331 * @return int Number of seconds remaining in the time budget. 332 * @task garbage 333 */ 334 private function runGarbageCollection($duration) { 335 $run_until = (PhabricatorTime::getNow() + $duration); 336 337 // NOTE: We always run at least one GC cycle to make sure the GC can make 338 // progress even if the trigger queue is busy. 339 do { 340 $more_garbage = $this->updateGarbageCollection(); 341 if (!$more_garbage) { 342 // If we don't have any more collection work to perform, we're all 343 // done. 344 break; 345 } 346 } while (PhabricatorTime::getNow() <= $run_until); 347 348 $remaining = max(0, $run_until - PhabricatorTime::getNow()); 349 350 return $remaining; 351 } 352 353 354 /** 355 * Update garbage collection, possibly collecting a small amount of garbage. 356 * 357 * @return bool True if there is more garbage to collect. 358 * @task garbage 359 */ 360 private function updateGarbageCollection() { 361 // If we're ready to start the next collection cycle, load all the 362 // collectors. 363 $next = $this->nextCollection; 364 if ($next && (PhabricatorTime::getNow() >= $next)) { 365 $this->nextCollection = null; 366 367 $all_collectors = PhabricatorGarbageCollector::getAllCollectors(); 368 $this->garbageCollectors = $all_collectors; 369 } 370 371 // If we're in a collection cycle, continue collection. 372 if ($this->garbageCollectors) { 373 foreach ($this->garbageCollectors as $key => $collector) { 374 $more_garbage = $collector->runCollector(); 375 if (!$more_garbage) { 376 unset($this->garbageCollectors[$key]); 377 } 378 // We only run one collection per call, to prevent triggers from being 379 // thrown too far off schedule if there's a lot of garbage to collect. 380 break; 381 } 382 383 if ($this->garbageCollectors) { 384 // If we have more work to do, return true. 385 return true; 386 } 387 388 // Otherwise, reschedule another cycle in 4 hours. 389 $now = PhabricatorTime::getNow(); 390 $wait = phutil_units('4 hours in seconds'); 391 $this->nextCollection = $now + $wait; 392 } 393 394 return false; 395 } 396 397 398/* -( Nuance Importers )--------------------------------------------------- */ 399 400 401 private function runNuanceImportCursors($duration) { 402 $run_until = (PhabricatorTime::getNow() + $duration); 403 404 do { 405 $more_data = $this->updateNuanceImportCursors(); 406 if (!$more_data) { 407 break; 408 } 409 } while (PhabricatorTime::getNow() <= $run_until); 410 411 $remaining = max(0, $run_until - PhabricatorTime::getNow()); 412 413 return $remaining; 414 } 415 416 417 private function updateNuanceImportCursors() { 418 $nuance_class = PhabricatorNuanceApplication::class; 419 if (!PhabricatorApplication::isClassInstalled($nuance_class)) { 420 return false; 421 } 422 423 // If we haven't loaded sources yet, load them first. 424 if (!$this->nuanceSources && !$this->nuanceCursors) { 425 $this->anyNuanceData = false; 426 427 $sources = id(new NuanceSourceQuery()) 428 ->setViewer($this->getViewer()) 429 ->withIsDisabled(false) 430 ->withHasImportCursors(true) 431 ->execute(); 432 if (!$sources) { 433 return false; 434 } 435 436 $this->nuanceSources = array_reverse($sources); 437 } 438 439 // If we don't have any cursors, move to the next source and generate its 440 // cursors. 441 if (!$this->nuanceCursors) { 442 $source = array_pop($this->nuanceSources); 443 444 $definition = $source->getDefinition() 445 ->setViewer($this->getViewer()) 446 ->setSource($source); 447 448 $cursors = $definition->getImportCursors(); 449 $this->nuanceCursors = array_reverse($cursors); 450 } 451 452 // Update the next cursor. 453 $cursor = array_pop($this->nuanceCursors); 454 if ($cursor) { 455 $more_data = $cursor->importFromSource(); 456 if ($more_data) { 457 $this->anyNuanceData = true; 458 } 459 } 460 461 if (!$this->nuanceSources && !$this->nuanceCursors) { 462 return $this->anyNuanceData; 463 } 464 465 return true; 466 } 467 468 469/* -( Calendar Notifier )-------------------------------------------------- */ 470 471 472 private function runCalendarNotifier($duration) { 473 $run_until = (PhabricatorTime::getNow() + $duration); 474 475 if (!$this->calendarEngine) { 476 $this->calendarEngine = new PhabricatorCalendarNotificationEngine(); 477 } 478 479 $this->calendarEngine->publishNotifications(); 480 481 $remaining = max(0, $run_until - PhabricatorTime::getNow()); 482 return $remaining; 483 } 484 485}