@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at recaptime-dev/main 344 lines 10 kB view raw
1<?php 2 3/** 4 * @task config Configuring Retries and Failures 5 */ 6abstract class PhabricatorWorker extends Phobject { 7 8 private $data; 9 private static $runAllTasksInProcess = false; 10 private $queuedTasks = array(); 11 private $currentWorkerTask; 12 13 // NOTE: Lower priority numbers execute first. The priority numbers have to 14 // have the same ordering that IDs do (lowest first) so MySQL can use a 15 // multipart key across both of them efficiently. 16 17 const PRIORITY_ALERTS = 1000; 18 const PRIORITY_DEFAULT = 2000; 19 const PRIORITY_COMMIT = 2500; 20 const PRIORITY_BULK = 3000; 21 const PRIORITY_INDEX = 3500; 22 const PRIORITY_IMPORT = 4000; 23 24 /** 25 * Special owner indicating that the task has yielded. 26 */ 27 const YIELD_OWNER = '(yield)'; 28 29/* -( Configuring Retries and Failures )----------------------------------- */ 30 31 32 /** 33 * Return the number of seconds this worker needs hold a lease on the task for 34 * while it performs work. For most tasks you can leave this at `null`, which 35 * will give you a default lease (currently 2 hours). 36 * 37 * For tasks which may take a very long time to complete, you should return 38 * an upper bound on the amount of time the task may require. 39 * 40 * @return int|null Number of seconds this task needs to remain leased for, 41 * or null for a default lease. 42 * 43 * @task config 44 */ 45 public function getRequiredLeaseTime() { 46 return null; 47 } 48 49 50 /** 51 * Return the maximum number of times this task may be retried before it is 52 * considered permanently failed. By default, tasks retry indefinitely. You 53 * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an 54 * immediate permanent failure. 55 * 56 * @return int|null Number of times the task will retry before permanent 57 * failure. Return `null` to retry indefinitely. 58 * 59 * @task config 60 */ 61 public function getMaximumRetryCount() { 62 return null; 63 } 64 65 66 /** 67 * Return the number of seconds a task should wait after a failure before 68 * retrying. For most tasks you can leave this at `null`, which will give you 69 * a short default retry period (currently 60 seconds). 70 * 71 * @param PhabricatorWorkerTask $task The task itself. This object is 72 * probably useful mostly to examine the 73 * failure count if you want to implement 74 * staggered retries, or to examine the 75 * execution exception if you want to react to 76 * different failures in different ways. 77 * @return int|null Number of seconds to wait between retries, 78 * or null for a default retry period 79 * (currently 60 seconds). 80 * 81 * @task config 82 */ 83 public function getWaitBeforeRetry(PhabricatorWorkerTask $task) { 84 return null; 85 } 86 87 public function setCurrentWorkerTask(PhabricatorWorkerTask $task) { 88 $this->currentWorkerTask = $task; 89 return $this; 90 } 91 92 /** 93 * @return PhabricatorWorkerTask 94 */ 95 public function getCurrentWorkerTask() { 96 return $this->currentWorkerTask; 97 } 98 99 public function getCurrentWorkerTaskID() { 100 $task = $this->getCurrentWorkerTask(); 101 if (!$task) { 102 return null; 103 } 104 return $task->getID(); 105 } 106 107 /** 108 * Perform some preparations and set up context, then call the final 109 * functionality (e.g. publishFeedStory() or importEvents() or whatever 110 * work the PhabricatorWorker subclass is supposed to do). 111 * 112 * @return void 113 */ 114 abstract protected function doWork(); 115 116 final public function __construct($data) { 117 $this->data = $data; 118 } 119 120 final protected function getTaskData() { 121 return $this->data; 122 } 123 124 final protected function getTaskDataValue($key, $default = null) { 125 $data = $this->getTaskData(); 126 if (!is_array($data)) { 127 throw new PhabricatorWorkerPermanentFailureException( 128 pht('Expected task data to be a dictionary.')); 129 } 130 return idx($data, $key, $default); 131 } 132 133 /** 134 * Public wrapper function which calls the doWork() function 135 * 136 * @return void 137 */ 138 final public function executeTask() { 139 $this->doWork(); 140 } 141 142 final public static function scheduleTask( 143 $task_class, 144 $data, 145 $options = array()) { 146 147 PhutilTypeSpec::checkMap( 148 $options, 149 array( 150 'priority' => 'optional int|null', 151 'objectPHID' => 'optional string|null', 152 'containerPHID' => 'optional string|null', 153 'delayUntil' => 'optional int|null', 154 )); 155 156 $priority = idx($options, 'priority'); 157 if ($priority === null) { 158 $priority = self::PRIORITY_DEFAULT; 159 } 160 $object_phid = idx($options, 'objectPHID'); 161 $container_phid = idx($options, 'containerPHID'); 162 163 $task = id(new PhabricatorWorkerActiveTask()) 164 ->setTaskClass($task_class) 165 ->setData($data) 166 ->setPriority($priority) 167 ->setObjectPHID($object_phid) 168 ->setContainerPHID($container_phid); 169 170 $delay = idx($options, 'delayUntil'); 171 if ($delay) { 172 $task->setLeaseExpires($delay); 173 } 174 175 if (self::$runAllTasksInProcess) { 176 // Do the work in-process. 177 $worker = newv($task_class, array($data)); 178 179 while (true) { 180 try { 181 $worker->executeTask(); 182 $worker->flushTaskQueue(); 183 184 $task_result = PhabricatorWorkerArchiveTask::RESULT_SUCCESS; 185 break; 186 } catch (PhabricatorWorkerPermanentFailureException $ex) { 187 $proxy = new Exception( 188 pht( 189 'In-process task ("%s") failed permanently.', 190 $task_class), 191 0, 192 $ex); 193 194 phlog($proxy); 195 196 $task_result = PhabricatorWorkerArchiveTask::RESULT_FAILURE; 197 break; 198 } catch (PhabricatorWorkerYieldException $ex) { 199 phlog( 200 pht( 201 'In-process task "%s" yielded for %s seconds, sleeping...', 202 $task_class, 203 $ex->getDuration())); 204 sleep($ex->getDuration()); 205 } 206 } 207 208 // Now, save a task row and immediately archive it so we can return an 209 // object with a valid ID. 210 $task->openTransaction(); 211 $task->save(); 212 $archived = $task->archiveTask($task_result, 0); 213 $task->saveTransaction(); 214 215 return $archived; 216 } else { 217 $task->save(); 218 return $task; 219 } 220 } 221 222 223 public function renderForDisplay(PhabricatorUser $viewer) { 224 return null; 225 } 226 227 /** 228 * Set this flag to execute scheduled tasks synchronously, in the same 229 * process. This is useful for debugging, and otherwise dramatically worse 230 * in every way imaginable. 231 */ 232 public static function setRunAllTasksInProcess($all) { 233 self::$runAllTasksInProcess = $all; 234 } 235 236 final protected function log($pattern /* , ... */) { 237 $console = PhutilConsole::getConsole(); 238 $argv = func_get_args(); 239 call_user_func_array(array($console, 'writeLog'), $argv); 240 return $this; 241 } 242 243 244 /** 245 * Queue a task to be executed after this one succeeds. 246 * 247 * The followup task will be queued only if this task completes cleanly. 248 * 249 * @param string $class Task class to queue. 250 * @param array $data Data for the followup task. 251 * @param array $options (optional) Options for the followup task. 252 * @return $this 253 */ 254 final protected function queueTask( 255 $class, 256 array $data, 257 array $options = array()) { 258 $this->queuedTasks[] = array($class, $data, $options); 259 return $this; 260 } 261 262 263 /** 264 * Get tasks queued as followups by @{method:queueTask}. 265 * 266 * @return list<array{string, mixed, int|null}> Queued task specifications. 267 */ 268 final protected function getQueuedTasks() { 269 return $this->queuedTasks; 270 } 271 272 273 /** 274 * Schedule any queued tasks, then empty the task queue. 275 * 276 * By default, the queue is flushed only if a task succeeds. You can call 277 * this method to force the queue to flush before failing (for example, if 278 * you are using queues to improve locking behavior). 279 * 280 * @param map<string, mixed> $defaults (optional) Default options. 281 */ 282 final public function flushTaskQueue($defaults = array()) { 283 foreach ($this->getQueuedTasks() as $task) { 284 list($class, $data, $options) = $task; 285 286 $options = $options + $defaults; 287 288 self::scheduleTask($class, $data, $options); 289 } 290 291 $this->queuedTasks = array(); 292 } 293 294 295 /** 296 * Awaken tasks that have yielded. 297 * 298 * Reschedules the specified tasks if they are currently queued in a yielded, 299 * unleased, unretried state so they'll execute sooner. This can let the 300 * queue avoid unnecessary waits. 301 * 302 * This method does not provide any assurances about when these tasks will 303 * execute, or even guarantee that it will have any effect at all. 304 * 305 * @param array<int> $ids List of task IDs to try to awaken. 306 * @return void 307 */ 308 final public static function awakenTaskIDs(array $ids) { 309 if (!$ids) { 310 return; 311 } 312 313 $table = new PhabricatorWorkerActiveTask(); 314 $conn_w = $table->establishConnection('w'); 315 316 // NOTE: At least for now, we're keeping these tasks yielded, just 317 // pretending that they threw a shorter yield than they really did. 318 319 // Overlap the windows here to handle minor client/server time differences 320 // and because it's likely correct to push these tasks to the head of their 321 // respective priorities. There is a good chance they are ready to execute. 322 $window = phutil_units('1 hour in seconds'); 323 $epoch_ago = (PhabricatorTime::getNow() - $window); 324 325 queryfx( 326 $conn_w, 327 'UPDATE %T SET leaseExpires = %d 328 WHERE id IN (%Ld) 329 AND leaseOwner = %s 330 AND leaseExpires > %d 331 AND failureCount = 0', 332 $table->getTableName(), 333 $epoch_ago, 334 $ids, 335 self::YIELD_OWNER, 336 $epoch_ago); 337 } 338 339 protected function newContentSource() { 340 return PhabricatorContentSource::newForSource( 341 PhabricatorDaemonContentSource::SOURCECONST); 342 } 343 344}