@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
/**
 * Base class for task queue workers.
 *
 * A subclass implements @{method:doWork} to perform its work. Tasks are
 * created with @{method:scheduleTask}; a running worker may queue followup
 * tasks with @{method:queueTask}, which are scheduled when the queue is
 * flushed by @{method:flushTaskQueue}.
 *
 * @task config Configuring Retries and Failures
 */
abstract class PhabricatorWorker extends Phobject {

  // Task data handed to the constructor; read back with getTaskData().
  private $data;

  // When true, scheduleTask() executes tasks synchronously in this process.
  private static $runAllTasksInProcess = false;

  // Followup tasks queued by queueTask(). Each entry is a list of
  // array($class, $data, $options).
  private $queuedTasks = array();

  // Task this worker is currently executing, if one has been set with
  // setCurrentWorkerTask().
  private $currentWorkerTask;

  // NOTE: Lower priority numbers execute first. The priority numbers have to
  // have the same ordering that IDs do (lowest first) so MySQL can use a
  // multipart key across both of them efficiently.

  const PRIORITY_ALERTS  = 1000;
  const PRIORITY_DEFAULT = 2000;
  const PRIORITY_COMMIT  = 2500;
  const PRIORITY_BULK    = 3000;
  const PRIORITY_INDEX   = 3500;
  const PRIORITY_IMPORT  = 4000;

  /**
   * Special owner indicating that the task has yielded.
   */
  const YIELD_OWNER = '(yield)';

/* -(  Configuring Retries and Failures  )----------------------------------- */


  /**
   * Return the number of seconds this worker needs to hold a lease on the
   * task for while it performs work. For most tasks you can leave this at
   * `null`, which will give you a default lease (currently 2 hours).
   *
   * For tasks which may take a very long time to complete, you should return
   * an upper bound on the amount of time the task may require.
   *
   * @return int|null Number of seconds this task needs to remain leased for,
   *                  or null for a default lease.
   *
   * @task config
   */
  public function getRequiredLeaseTime() {
    return null;
  }


  /**
   * Return the maximum number of times this task may be retried before it is
   * considered permanently failed. By default, tasks retry indefinitely. You
   * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause
   * an immediate permanent failure.
   *
   * @return int|null Number of times the task will retry before permanent
   *                  failure. Return `null` to retry indefinitely.
   *
   * @task config
   */
  public function getMaximumRetryCount() {
    return null;
  }


  /**
   * Return the number of seconds a task should wait after a failure before
   * retrying. For most tasks you can leave this at `null`, which will give
   * you a short default retry period (currently 60 seconds).
   *
   * @param PhabricatorWorkerTask $task The task itself. This object is
   *                                    probably useful mostly to examine the
   *                                    failure count if you want to implement
   *                                    staggered retries, or to examine the
   *                                    execution exception if you want to
   *                                    react to different failures in
   *                                    different ways.
   * @return int|null                   Number of seconds to wait between
   *                                    retries, or null for a default retry
   *                                    period (currently 60 seconds).
   *
   * @task config
   */
  public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
    return null;
  }

  /**
   * Record the task this worker is executing.
   *
   * @param PhabricatorWorkerTask $task Task being executed.
   * @return $this
   */
  public function setCurrentWorkerTask(PhabricatorWorkerTask $task) {
    $this->currentWorkerTask = $task;
    return $this;
  }

  /**
   * Get the task recorded with @{method:setCurrentWorkerTask}.
   *
   * @return PhabricatorWorkerTask|null The current task, or null if none has
   *                                    been set.
   */
  public function getCurrentWorkerTask() {
    return $this->currentWorkerTask;
  }

  /**
   * Get the ID of the task recorded with @{method:setCurrentWorkerTask}.
   *
   * @return mixed|null Result of `getID()` on the current task, or null if
   *                    no current task has been set.
   */
  public function getCurrentWorkerTaskID() {
    $task = $this->getCurrentWorkerTask();
    if (!$task) {
      return null;
    }
    return $task->getID();
  }

  /**
   * Perform some preparations and set up context, then call the final
   * functionality (e.g. publishFeedStory() or importEvents() or whatever
   * work the PhabricatorWorker subclass is supposed to do).
   *
   * @return void
   */
  abstract protected function doWork();

  /**
   * Construct a worker for a unit of work.
   *
   * @param mixed $data Task data, stored as-is and made available through
   *                    @{method:getTaskData}.
   */
  final public function __construct($data) {
    $this->data = $data;
  }

  /**
   * Get the task data this worker was constructed with.
   *
   * @return mixed Task data, exactly as passed to the constructor.
   */
  final protected function getTaskData() {
    return $this->data;
  }

  /**
   * Read a single value out of dictionary-shaped task data.
   *
   * @param string $key     Key to read from the task data.
   * @param mixed  $default (optional) Value to return if the key is absent.
   * @return mixed          The stored value, or `$default`.
   * @throws PhabricatorWorkerPermanentFailureException If the task data is
   *                        not an array.
   */
  final protected function getTaskDataValue($key, $default = null) {
    $data = $this->getTaskData();
    if (!is_array($data)) {
      throw new PhabricatorWorkerPermanentFailureException(
        pht('Expected task data to be a dictionary.'));
    }
    return idx($data, $key, $default);
  }

  /**
   * Public wrapper function which calls the doWork() function
   *
   * @return void
   */
  final public function executeTask() {
    $this->doWork();
  }

  /**
   * Schedule a task for execution.
   *
   * Normally this saves a new active task row and returns it. If
   * @{method:setRunAllTasksInProcess} has been enabled, the task is instead
   * executed synchronously in this process and then saved and immediately
   * archived. In that mode `delayUntil` does not postpone execution, a
   * yielding task is retried after sleeping for the yield duration, and a
   * permanent failure is logged and recorded as a failed result. Any other
   * exception thrown by the task propagates to the caller.
   *
   * @param string $task_class Name of the worker class to run.
   * @param mixed  $data       Task data to pass to the worker.
   * @param map<string, mixed> $options (optional) Supported keys:
   *   - `priority` (int|null): task priority; defaults to
   *     `PRIORITY_DEFAULT` when null or absent.
   *   - `objectPHID` (string|null): stored on the task.
   *   - `containerPHID` (string|null): stored on the task.
   *   - `delayUntil` (int|null): if truthy, used as the task's initial
   *     lease expiration.
   * @return object The saved active task, or (in-process mode) the result of
   *                archiving it.
   */
  final public static function scheduleTask(
    $task_class,
    $data,
    $options = array()) {

    PhutilTypeSpec::checkMap(
      $options,
      array(
        'priority' => 'optional int|null',
        'objectPHID' => 'optional string|null',
        'containerPHID' => 'optional string|null',
        'delayUntil' => 'optional int|null',
      ));

    $priority = idx($options, 'priority');
    if ($priority === null) {
      $priority = self::PRIORITY_DEFAULT;
    }
    $object_phid = idx($options, 'objectPHID');
    $container_phid = idx($options, 'containerPHID');

    $task = id(new PhabricatorWorkerActiveTask())
      ->setTaskClass($task_class)
      ->setData($data)
      ->setPriority($priority)
      ->setObjectPHID($object_phid)
      ->setContainerPHID($container_phid);

    $delay = idx($options, 'delayUntil');
    if ($delay) {
      $task->setLeaseExpires($delay);
    }

    if (self::$runAllTasksInProcess) {
      // Do the work in-process.
      $worker = newv($task_class, array($data));

      // Keep retrying for as long as the task yields; stop on success or on
      // a permanent failure.
      while (true) {
        try {
          $worker->executeTask();
          $worker->flushTaskQueue();

          $task_result = PhabricatorWorkerArchiveTask::RESULT_SUCCESS;
          break;
        } catch (PhabricatorWorkerPermanentFailureException $ex) {
          // Wrap the failure so the log entry names the task class while
          // keeping the original exception as the previous exception.
          $proxy = new Exception(
            pht(
              'In-process task ("%s") failed permanently.',
              $task_class),
            0,
            $ex);

          phlog($proxy);

          $task_result = PhabricatorWorkerArchiveTask::RESULT_FAILURE;
          break;
        } catch (PhabricatorWorkerYieldException $ex) {
          phlog(
            pht(
              'In-process task "%s" yielded for %s seconds, sleeping...',
              $task_class,
              $ex->getDuration()));
          sleep($ex->getDuration());
        }
      }

      // Now, save a task row and immediately archive it so we can return an
      // object with a valid ID.
      $task->openTransaction();
        $task->save();
        $archived = $task->archiveTask($task_result, 0);
      $task->saveTransaction();

      return $archived;
    } else {
      $task->save();
      return $task;
    }
  }


  /**
   * Render this task for display to a viewer.
   *
   * The base implementation renders nothing.
   *
   * @param PhabricatorUser $viewer User viewing the task.
   * @return mixed|null Always null in the base implementation.
   */
  public function renderForDisplay(PhabricatorUser $viewer) {
    return null;
  }

  /**
   * Set this flag to execute scheduled tasks synchronously, in the same
   * process. This is useful for debugging, and otherwise dramatically worse
   * in every way imaginable.
   *
   * @param bool $all True to run tasks in-process when they are scheduled.
   * @return void
   */
  public static function setRunAllTasksInProcess($all) {
    self::$runAllTasksInProcess = $all;
  }

  /**
   * Write a message to the console log.
   *
   * Every argument is forwarded unchanged to `writeLog()` on the console
   * returned by `PhutilConsole::getConsole()`.
   *
   * @param string $pattern Message pattern, optionally followed by further
   *                        arguments (presumably format parameters; see
   *                        `PhutilConsole::writeLog()`).
   * @return $this
   */
  final protected function log($pattern /* , ... */) {
    $console = PhutilConsole::getConsole();
    $argv = func_get_args();
    call_user_func_array(array($console, 'writeLog'), $argv);
    return $this;
  }


  /**
   * Queue a task to be executed after this one succeeds.
   *
   * The followup task will be queued only if this task completes cleanly.
   *
   * @param string $class   Task class to queue.
   * @param array  $data    Data for the followup task.
   * @param array  $options (optional) Options for the followup task.
   * @return $this
   */
  final protected function queueTask(
    $class,
    array $data,
    array $options = array()) {
    $this->queuedTasks[] = array($class, $data, $options);
    return $this;
  }


  /**
   * Get tasks queued as followups by @{method:queueTask}.
   *
   * @return list<array{string, array, array}> Queued task specifications,
   *   each a list of task class, task data and task options.
   */
  final protected function getQueuedTasks() {
    return $this->queuedTasks;
  }


  /**
   * Schedule any queued tasks, then empty the task queue.
   *
   * By default, the queue is flushed only if a task succeeds. You can call
   * this method to force the queue to flush before failing (for example, if
   * you are using queues to improve locking behavior).
   *
   * Each followup is removed from the queue as soon as it has been
   * scheduled, so if scheduling one of them throws, the followups which were
   * already scheduled are not scheduled a second time by a later flush. The
   * followup which failed, and any after it, remain queued.
   *
   * @param map<string, mixed> $defaults (optional) Default options.
   * @return void
   */
  final public function flushTaskQueue($defaults = array()) {
    // This iterates over a copy of the queue, so removing entries from the
    // property inside the loop does not disturb the iteration.
    foreach ($this->getQueuedTasks() as $task) {
      list($class, $data, $options) = $task;

      // Options set explicitly on the followup win over the defaults.
      $options = $options + $defaults;

      self::scheduleTask($class, $data, $options);

      // The followup just scheduled is at the head of the remaining queue;
      // drop it now rather than waiting for the whole flush to finish.
      array_shift($this->queuedTasks);
    }

    $this->queuedTasks = array();
  }


  /**
   * Awaken tasks that have yielded.
   *
   * Reschedules the specified tasks if they are currently queued in a yielded,
   * unleased, unretried state so they'll execute sooner. This can let the
   * queue avoid unnecessary waits.
   *
   * This method does not provide any assurances about when these tasks will
   * execute, or even guarantee that it will have any effect at all.
   *
   * @param array<int> $ids List of task IDs to try to awaken.
   * @return void
   */
  final public static function awakenTaskIDs(array $ids) {
    if (!$ids) {
      return;
    }

    $table = new PhabricatorWorkerActiveTask();
    $conn_w = $table->establishConnection('w');

    // NOTE: At least for now, we're keeping these tasks yielded, just
    // pretending that they threw a shorter yield than they really did.

    // Overlap the windows here to handle minor client/server time differences
    // and because it's likely correct to push these tasks to the head of their
    // respective priorities. There is a good chance they are ready to execute.
    $window = phutil_units('1 hour in seconds');
    $epoch_ago = (PhabricatorTime::getNow() - $window);

    // The same cutoff is used both as the new lease expiration and as the
    // threshold a task's current expiration must exceed to be updated.
    queryfx(
      $conn_w,
      'UPDATE %T SET leaseExpires = %d
          WHERE id IN (%Ld)
            AND leaseOwner = %s
            AND leaseExpires > %d
            AND failureCount = 0',
      $table->getTableName(),
      $epoch_ago,
      $ids,
      self::YIELD_OWNER,
      $epoch_ago);
  }

  /**
   * Build the content source used for changes made by this worker.
   *
   * @return mixed Result of `PhabricatorContentSource::newForSource()` for
   *               the daemon content source constant.
   */
  protected function newContentSource() {
    return PhabricatorContentSource::newForSource(
      PhabricatorDaemonContentSource::SOURCECONST);
  }

}