@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
/**
 * Select and lease tasks from the worker task queue.
 *
 * Candidate tasks are gathered phase by phase (see the PHASE_* constants):
 * tasks holding a live lease (only when explicitly requested), tasks with no
 * lease owner, and tasks whose lease has expired. Unless
 * @{method:setSkipLease} is used, every selected task is leased to this
 * process before it is returned.
 */
final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {

  // Tasks that have an owner and whose lease has not run out yet.
  const PHASE_LEASED = 'leased';
  // Tasks with no lease owner at all.
  const PHASE_UNLEASED = 'unleased';
  // Tasks whose lease expiration time is already in the past.
  const PHASE_EXPIRED = 'expired';

  private $ids;
  private $objectPHIDs;
  private $limit;
  private $skipLease;
  private $leased = false;

  /**
   * Per its name, the default wait before a task is retried. This class does
   * not use the value itself.
   *
   * @return int Five minutes, expressed in seconds.
   */
  public static function getDefaultWaitBeforeRetry() {
    return phutil_units('5 minutes in seconds');
  }

  /**
   * Length of the lease that @{method:execute} grants to each task it leases.
   *
   * @return int Two hours, expressed in seconds.
   */
  public static function getDefaultLeaseDuration() {
    return phutil_units('2 hours in seconds');
  }

  /**
   * Peek at the head of the queue instead of leasing from it.
   *
   * With this flag set, tasks are chosen by the same queries used for
   * leasing, but no lease is written, so the queue's behavior is unchanged.
   * This is how callers can show which tasks are about to run.
   *
   * @param bool $skip True to skip lease acquisition entirely.
   * @return $this
   */
  public function setSkipLease($skip) {
    $this->skipLease = $skip;
    return $this;
  }

  /**
   * Only consider tasks with the given IDs.
   *
   * @param list<int> $ids Task IDs.
   * @return $this
   */
  public function withIDs(array $ids) {
    $this->ids = $ids;
    return $this;
  }

  /**
   * Only consider tasks attached to the given object PHIDs.
   *
   * @param list<string> $phids Object PHIDs.
   * @return $this
   */
  public function withObjectPHIDs(array $phids) {
    $this->objectPHIDs = $phids;
    return $this;
  }

  /**
   * Choose whether leased tasks, unleased tasks, or both are selected.
   *
   * The default (`false`) selects only unleased tasks. Passing `true` selects
   * only tasks holding a live lease, and `null` ignores lease status.
   *
   * Whenever leased tasks may be part of the result, lease acquisition must
   * be turned off with @{method:setSkipLease}. Otherwise @{method:execute}
   * throws. These modes exist for displaying task status.
   *
   * @param bool|null $leased `true` for leased tasks only, `false` for
   *   unleased tasks only (the default), or `null` for both.
   * @return $this
   */
  public function withLeasedTasks($leased) {
    $this->leased = $leased;
    return $this;
  }

  /**
   * Maximum number of tasks to select. Required by @{method:execute}.
   *
   * @param int $limit Task count.
   * @return $this
   */
  public function setLimit($limit) {
    $this->limit = $limit;
    return $this;
  }

  /**
   * Select tasks and, unless leasing is disabled, lease them to this process.
   *
   * Phases are visited in order until `limit` tasks have been found. Within
   * each phase, candidates are SELECTed first and only afterwards leased by a
   * separate UPDATE (see the notes in the loop).
   *
   * Result order: with leasing skipped, tasks come back in the order the
   * phases found them. Otherwise the final query reuses the ORDER BY of the
   * last phase the loop visited.
   *
   * @return map<int, PhabricatorWorkerActiveTask> Tasks keyed by task ID, or
   *   an empty array when nothing was selected/leased.
   */
  public function execute() {
    $this->assertLeaseQueryIsConfigured();

    $active_table = new PhabricatorWorkerActiveTask();
    $data_table = new PhabricatorWorkerTaskData();
    $owner_name = $this->getLeaseOwnershipName();

    $conn = $active_table->establishConnection('w');

    $limit = $this->limit;
    $found = 0;
    $found_ids = array();

    // The ORDER BY of the final query below uses the last phase visited here.
    $last_phase = null;

    foreach ($this->getPhasesInSelectionOrder() as $phase) {
      $last_phase = $phase;

      // NOTE: An `UPDATE ... WHERE ... ORDER BY id ASC` runs extremely slowly
      // (the ORDER BY is the cause, even though results look the same without
      // it). Dropping the ORDER BY makes binary-log read replicas complain
      // that the statement is not repeatable. Selecting first and then
      // updating the chosen rows avoids both problems.
      $candidates = queryfx_all(
        $conn,
        'SELECT id, leaseOwner FROM %T %Q %Q %Q',
        $active_table->getTableName(),
        $this->buildCustomWhereClause($conn, $phase),
        $this->buildOrderClause($conn, $phase),
        $this->buildLimitClause($conn, $limit - $found));

      if (!$candidates) {
        continue;
      }

      // NOTE: Another worker may lease some of these candidates first, so the
      // UPDATE can affect fewer rows than were selected. Over-selecting and
      // shuffling would reduce such races, but workers spend very little
      // time here compared to their total runtime, so this stays simple.
      if ($this->skipLease) {
        $found += count($candidates);
        $found_ids += array_fuse(ipull($candidates, 'id'));
      } else {
        queryfx(
          $conn,
          'UPDATE %T task
            SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d
            %Q',
          $active_table->getTableName(),
          $owner_name,
          self::getDefaultLeaseDuration(),
          $this->buildUpdateWhereClause($conn, $phase, $candidates));

        $found += $conn->getAffectedRows();
      }

      if ($found == $limit) {
        break;
      }
    }

    if (!$found) {
      return array();
    }

    $rows = queryfx_all(
      $conn,
      'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
        FROM %T task LEFT JOIN %T taskdata
        ON taskdata.id = task.dataID
        WHERE %Q %Q %Q',
      $active_table->getTableName(),
      $data_table->getTableName(),
      $this->buildTaskSelectionCondition($conn, $owner_name, $found_ids),
      $this->buildOrderClause($conn, $last_phase),
      $this->buildLimitClause($conn, $limit));

    $tasks = mpull($active_table->loadAllFromArray($rows), null, 'getID');

    foreach ($rows as $row) {
      $task = $tasks[$row['id']];
      $task->setServerTime($row['_serverTime']);

      // Tasks without stored data (or with empty data) get null.
      $task_data = null;
      if ($row['_taskData']) {
        $task_data = json_decode($row['_taskData'], true);
      }
      $task->setData($task_data);
    }

    if ($this->skipLease) {
      // Status queries report tasks in the order the phases found them.
      $tasks = array_select_keys($tasks, $found_ids);
    }

    return $tasks;
  }

  /**
   * Reject configurations that @{method:execute} cannot run.
   *
   * @return void
   */
  private function assertLeaseQueryIsConfigured() {
    if (!$this->limit) {
      throw new Exception(
        pht('You must %s when leasing tasks.', 'setLimit()'));
    }

    // buildUpdateWhereClause() refuses to lease tasks found in the leased
    // phase, so any query that may visit that phase must not lease.
    if ($this->leased !== false && !$this->skipLease) {
      throw new Exception(
        pht(
          'If you potentially select leased tasks using %s, '.
          'you MUST disable lease acquisition by calling %s.',
          'withLeasedTasks()',
          'setSkipLease()'));
    }
  }

  /**
   * List the phases to visit, in the order they should be tried.
   *
   * @return list<string> PHASE_* constants.
   */
  private function getPhasesInSelectionOrder() {
    $phases = array();

    // When leased tasks are wanted, look for them first.
    if ($this->leased !== false) {
      $phases[] = self::PHASE_LEASED;
    }

    // Otherwise prefer never-leased tasks. Fall back to tasks whose leases
    // expired (i.e., tasks which previously failed).
    if ($this->leased !== true) {
      $phases[] = self::PHASE_UNLEASED;
      $phases[] = self::PHASE_EXPIRED;
    }

    return $phases;
  }

  /**
   * Build the condition that picks out the tasks this execution selected.
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @param string $owner_name Lease owner written during this execution.
   * @param map<int, int> $task_ids IDs collected when leasing is skipped.
   * @return wild Query fragment produced by qsprintf().
   */
  private function buildTaskSelectionCondition(
    AphrontDatabaseConnection $conn,
    $owner_name,
    array $task_ids) {

    if ($this->skipLease) {
      return qsprintf($conn, 'task.id IN (%Ld)', $task_ids);
    }

    // Leased tasks are recognized by the owner name written in execute(),
    // and only while that lease is still live.
    return qsprintf(
      $conn,
      'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()',
      $owner_name);
  }

  /**
   * Build the WHERE clause that finds candidate tasks for one phase.
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @param string $phase One of the PHASE_* constants.
   * @return wild WHERE clause produced by formatWhereClause().
   */
  protected function buildCustomWhereClause(
    AphrontDatabaseConnection $conn,
    $phase) {

    $where = array();

    switch ($phase) {
      case self::PHASE_LEASED:
        // Owned, and the lease has not run out yet.
        $where[] = qsprintf($conn, 'leaseOwner IS NOT NULL');
        $where[] = qsprintf($conn, 'leaseExpires >= UNIX_TIMESTAMP()');
        break;
      case self::PHASE_UNLEASED:
        $where[] = qsprintf($conn, 'leaseOwner IS NULL');
        break;
      case self::PHASE_EXPIRED:
        // The complement of the time check used by PHASE_LEASED.
        $where[] = qsprintf($conn, 'leaseExpires < UNIX_TIMESTAMP()');
        break;
      default:
        throw new Exception(pht("Unknown phase '%s'!", $phase));
    }

    if ($this->ids !== null) {
      $where[] = qsprintf($conn, 'id IN (%Ld)', $this->ids);
    }

    if ($this->objectPHIDs !== null) {
      $where[] = qsprintf($conn, 'objectPHID IN (%Ls)', $this->objectPHIDs);
    }

    return $this->formatWhereClause($conn, $where);
  }

  /**
   * Build the WHERE clause that leases exactly the selected candidate rows.
   *
   * Each phase also re-checks the owner observed at SELECT time. A row whose
   * owner changed in the meantime (e.g. another worker leased it) is not
   * updated.
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @param string $phase One of the PHASE_* constants.
   * @param list<map<string, wild>> $rows Candidate rows (`id`, `leaseOwner`).
   * @return wild WHERE clause produced by formatWhereClause().
   */
  private function buildUpdateWhereClause(
    AphrontDatabaseConnection $conn,
    $phase,
    array $rows) {

    $where = array();

    // NOTE: This is basically a workaround for MySQL: `IN (NULL)` never
    // matches NULL, so unowned rows are matched with `IS NULL` instead.
    switch ($phase) {
      case self::PHASE_LEASED:
        throw new Exception(
          pht(
            'Trying to lease tasks selected in the leased phase! This is '.
            'intended to be impossible.'));
      case self::PHASE_UNLEASED:
        $where[] = qsprintf($conn, 'leaseOwner IS NULL');
        $where[] = qsprintf($conn, 'id IN (%Ld)', ipull($rows, 'id'));
        break;
      case self::PHASE_EXPIRED:
        // One "(id AND previous owner)" pair per row, OR-joined via %LO.
        $pairs = array();
        foreach ($rows as $candidate) {
          $pairs[] = qsprintf(
            $conn,
            '(id = %d AND leaseOwner = %s)',
            $candidate['id'],
            $candidate['leaseOwner']);
        }
        $where[] = qsprintf($conn, '%LO', $pairs);
        break;
      default:
        throw new Exception(pht('Unknown phase "%s"!', $phase));
    }

    return $this->formatWhereClause($conn, $where);
  }

  /**
   * Build the ORDER BY clause used for a phase.
   *
   * @param AphrontDatabaseConnection $conn_w Connection used for escaping.
   * @param string $phase One of the PHASE_* constants.
   * @return wild ORDER BY clause produced by qsprintf().
   */
  private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
    switch ($phase) {
      case self::PHASE_LEASED:
        // Ordering by lease acquisition time would be better, but that is
        // not recorded. Queue order is a reasonable approximation.
      case self::PHASE_UNLEASED:
        // New work is consumed in order of increasing priority, then FIFO.
        return qsprintf($conn_w, 'ORDER BY priority ASC, id ASC');
      case self::PHASE_EXPIRED:
        // Failed tasks are consumed in roughly the order they failed, which
        // need not match their original queue order. This matters for tasks
        // that soft-fail while waiting on other tasks. Such tasks must drift
        // toward the back of the queue, at least on average. Otherwise the
        // same blocked task would be retried over and over.
        return qsprintf($conn_w, 'ORDER BY leaseExpires ASC');
      default:
        throw new Exception(pht('Unknown phase "%s"!', $phase));
    }
  }

  /**
   * Build a LIMIT clause.
   *
   * @param AphrontDatabaseConnection $conn_w Connection used for escaping.
   * @param int $limit Maximum number of rows.
   * @return wild LIMIT clause produced by qsprintf().
   */
  private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) {
    return qsprintf($conn_w, 'LIMIT %d', $limit);
  }

  /**
   * Build the owner name that this query writes into `leaseOwner`.
   *
   * The format is "pid:time:host:sequence". The sequence number is a
   * per-process counter, so separate calls never produce the same name, even
   * within the same second.
   *
   * @return string Lease ownership name.
   */
  private function getLeaseOwnershipName() {
    static $sequence = 0;

    // TODO: A very long host name could overflow the 64-character column, so
    // only a truncated prefix of it is used. It might be better to use a
    // random hash as the identifier. The pid / time / host, which help with
    // diagnosis, could then be stored elsewhere, likely as a daemon ID that
    // records when and where code ran. See T6742.
    $truncator = id(new PhutilUTF8StringTruncator())
      ->setMaximumBytes(32)
      ->setTerminator('...');
    $short_host = $truncator->truncateString(php_uname('n'));

    return implode(
      ':',
      array(
        getmypid(),
        time(),
        $short_host,
        ++$sequence,
      ));
  }

}