@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at recaptime-dev/main 345 lines 11 kB view raw
<?php

/**
 * Select and lease tasks from the worker task queue.
 */
final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {

  const PHASE_LEASED = 'leased';
  const PHASE_UNLEASED = 'unleased';
  const PHASE_EXPIRED = 'expired';

  private $ids;
  private $objectPHIDs;
  private $limit;
  private $skipLease;
  private $leased = false;

  /**
   * Default retry wait, expressed in seconds.
   *
   * @return int Five minutes, in seconds.
   */
  public static function getDefaultWaitBeforeRetry() {
    return phutil_units('5 minutes in seconds');
  }

  /**
   * Default lease duration applied when tasks are leased by @{method:execute}.
   *
   * @return int Two hours, in seconds.
   */
  public static function getDefaultLeaseDuration() {
    return phutil_units('2 hours in seconds');
  }

  /**
   * Set this flag to select tasks from the top of the queue without leasing
   * them.
   *
   * This can be used to show which tasks are coming up next without altering
   * the queue's behavior.
   *
   * @param bool $skip True to skip the lease acquisition step.
   * @return $this
   */
  public function setSkipLease($skip) {
    $this->skipLease = $skip;
    return $this;
  }

  /**
   * Restrict the query to tasks with the given IDs.
   *
   * @param list<int> $ids Task IDs.
   * @return $this
   */
  public function withIDs(array $ids) {
    $this->ids = $ids;
    return $this;
  }

  /**
   * Restrict the query to tasks whose `objectPHID` is in the given list.
   *
   * @param list<string> $phids Object PHIDs.
   * @return $this
   */
  public function withObjectPHIDs(array $phids) {
    $this->objectPHIDs = $phids;
    return $this;
  }

  /**
   * Select only leased tasks, only unleased tasks, or both types of task.
   *
   * By default, queries select only unleased tasks (equivalent to passing
   * `false` to this method). You can pass `true` to select only leased tasks,
   * or `null` to ignore the lease status of tasks.
   *
   * If your result set potentially includes leased tasks, you must disable
   * leasing using @{method:setSkipLease}. These options are intended for use
   * when displaying task status information.
   *
   * @param mixed $leased `true` to select only leased tasks, `false` to select
   *   only unleased tasks (default), or `null` to select both.
   * @return $this
   */
  public function withLeasedTasks($leased) {
    $this->leased = $leased;
    return $this;
  }

  /**
   * Set the maximum number of tasks to select (and lease). Required: a falsy
   * limit makes @{method:execute} throw.
   *
   * @param int $limit Maximum number of tasks.
   * @return $this
   */
  public function setLimit($limit) {
    $this->limit = $limit;
    return $this;
  }

  /**
   * Select (and, unless @{method:setSkipLease} is set, lease) up to the
   * configured limit of tasks.
   *
   * Phases are tried in order (leased, then unleased, then expired, subject
   * to @{method:withLeasedTasks}) until the limit is reached. Each returned
   * task has its server time and decoded task data attached.
   *
   * @return array Task objects keyed by task ID. In skip-lease mode they are
   *   ordered by the phase in which they were found; otherwise they are
   *   ordered by priority, then ID.
   * @throws Exception If no limit was set, or if leased tasks may be selected
   *   while lease acquisition is still enabled.
   */
  public function execute() {
    if (!$this->limit) {
      throw new Exception(
        pht('You must %s when leasing tasks.', 'setLimit()'));
    }

    if ($this->leased !== false) {
      if (!$this->skipLease) {
        throw new Exception(
          pht(
            'If you potentially select leased tasks using %s, '.
            'you MUST disable lease acquisition by calling %s.',
            'withLeasedTasks()',
            'setSkipLease()'));
      }
    }

    $task_table = new PhabricatorWorkerActiveTask();
    $taskdata_table = new PhabricatorWorkerTaskData();
    $lease_ownership_name = $this->getLeaseOwnershipName();

    $conn_w = $task_table->establishConnection('w');

    // Try to satisfy the request from new, unleased tasks first. If we don't
    // find enough tasks, try tasks with expired leases (i.e., tasks which have
    // previously failed).

    // If we're selecting leased tasks, look for them first.

    $phases = array();
    if ($this->leased !== false) {
      $phases[] = self::PHASE_LEASED;
    }
    if ($this->leased !== true) {
      $phases[] = self::PHASE_UNLEASED;
      $phases[] = self::PHASE_EXPIRED;
    }
    $limit = $this->limit;

    // Number of tasks selected (skip-lease mode) or actually leased so far.
    $leased = 0;
    $task_ids = array();
    foreach ($phases as $phase) {
      // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
      // goes very, very slowly. The `ORDER BY` triggers this, although we get
      // the same apparent results without it. Without the ORDER BY, binary
      // read slaves complain that the query isn't repeatable. To avoid both
      // problems, do a SELECT and then an UPDATE.

      $rows = queryfx_all(
        $conn_w,
        'SELECT id, leaseOwner FROM %T %Q %Q %Q',
        $task_table->getTableName(),
        $this->buildCustomWhereClause($conn_w, $phase),
        $this->buildOrderClause($conn_w, $phase),
        $this->buildLimitClause($conn_w, $limit - $leased));

      // NOTE: Sometimes, we'll race with another worker and they'll grab
      // this task before we do. We could reduce how often this happens by
      // selecting more tasks than we need, then shuffling them and trying
      // to lock only the number we're actually after. However, the amount
      // of time workers spend here should be very small relative to their
      // total runtime, so keep it simple for the moment.

      if ($rows) {
        if ($this->skipLease) {
          $leased += count($rows);
          // Keyed by ID; insertion order records the phase order used to
          // reorder the final result below.
          $task_ids += array_fuse(ipull($rows, 'id'));
        } else {
          queryfx(
            $conn_w,
            'UPDATE %T task
              SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d
              %Q',
            $task_table->getTableName(),
            $lease_ownership_name,
            self::getDefaultLeaseDuration(),
            $this->buildUpdateWhereClause($conn_w, $phase, $rows));

          // Only rows we actually won count; others may have been taken by a
          // racing worker between the SELECT and the UPDATE.
          $leased += $conn_w->getAffectedRows();
        }

        if ($leased >= $limit) {
          break;
        }
      }
    }

    if (!$leased) {
      return array();
    }

    if ($this->skipLease) {
      $selection_condition = qsprintf(
        $conn_w,
        'task.id IN (%Ld)',
        $task_ids);
    } else {
      $selection_condition = qsprintf(
        $conn_w,
        'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()',
        $lease_ownership_name);
    }

    // Order the final selection explicitly (priority, then FIFO) instead of
    // by whichever phase the loop above happened to finish on. In skip-lease
    // mode the rows are reordered into phase order afterward anyway.
    $final_order = $this->buildOrderClause($conn_w, self::PHASE_UNLEASED);

    $data = queryfx_all(
      $conn_w,
      'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
        FROM %T task LEFT JOIN %T taskdata
          ON taskdata.id = task.dataID
        WHERE %Q %Q %Q',
      $task_table->getTableName(),
      $taskdata_table->getTableName(),
      $selection_condition,
      $final_order,
      $this->buildLimitClause($conn_w, $limit));

    $tasks = $task_table->loadAllFromArray($data);
    $tasks = mpull($tasks, null, 'getID');

    foreach ($data as $row) {
      $tasks[$row['id']]->setServerTime($row['_serverTime']);
      if ($row['_taskData']) {
        $task_data = json_decode($row['_taskData'], true);
      } else {
        $task_data = null;
      }
      $tasks[$row['id']]->setData($task_data);
    }

    if ($this->skipLease) {
      // Reorder rows into the original phase order if this is a status query.
      $tasks = array_select_keys($tasks, $task_ids);
    }

    return $tasks;
  }

  /**
   * Build the WHERE clause used to SELECT candidate tasks for one phase,
   * combining the phase's lease condition with any ID / object PHID filters.
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @param string $phase One of the PHASE_* constants.
   * @return mixed Formatted WHERE clause.
   * @throws Exception If the phase is unknown.
   */
  protected function buildCustomWhereClause(
    AphrontDatabaseConnection $conn,
    $phase) {

    $where = array();

    switch ($phase) {
      case self::PHASE_LEASED:
        $where[] = qsprintf(
          $conn,
          'leaseOwner IS NOT NULL');
        $where[] = qsprintf(
          $conn,
          'leaseExpires >= UNIX_TIMESTAMP()');
        break;
      case self::PHASE_UNLEASED:
        $where[] = qsprintf(
          $conn,
          'leaseOwner IS NULL');
        break;
      case self::PHASE_EXPIRED:
        $where[] = qsprintf(
          $conn,
          'leaseExpires < UNIX_TIMESTAMP()');
        break;
      default:
        throw new Exception(pht("Unknown phase '%s'!", $phase));
    }

    if ($this->ids !== null) {
      $where[] = qsprintf($conn, 'id IN (%Ld)', $this->ids);
    }

    if ($this->objectPHIDs !== null) {
      $where[] = qsprintf($conn, 'objectPHID IN (%Ls)', $this->objectPHIDs);
    }

    return $this->formatWhereClause($conn, $where);
  }

  /**
   * Build the WHERE clause for the lease-acquiring UPDATE, matching only the
   * selected rows and only while they still have the owner observed at
   * SELECT time (so a racing worker's lease is not stolen).
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @param string $phase PHASE_UNLEASED or PHASE_EXPIRED.
   * @param list<array> $rows Rows with `id` and `leaseOwner` keys.
   * @return mixed Formatted WHERE clause.
   * @throws Exception For PHASE_LEASED or an unknown phase.
   */
  private function buildUpdateWhereClause(
    AphrontDatabaseConnection $conn,
    $phase,
    array $rows) {

    $where = array();

    // NOTE: This is basically working around the MySQL behavior that
    // `IN (NULL)` doesn't match NULL.

    switch ($phase) {
      case self::PHASE_LEASED:
        throw new Exception(
          pht(
            'Trying to lease tasks selected in the leased phase! This is '.
            'intended to be impossible.'));
      case self::PHASE_UNLEASED:
        $where[] = qsprintf($conn, 'leaseOwner IS NULL');
        $where[] = qsprintf($conn, 'id IN (%Ld)', ipull($rows, 'id'));
        break;
      case self::PHASE_EXPIRED:
        $in = array();
        foreach ($rows as $row) {
          $in[] = qsprintf(
            $conn,
            '(id = %d AND leaseOwner = %s)',
            $row['id'],
            $row['leaseOwner']);
        }
        $where[] = qsprintf($conn, '%LO', $in);
        break;
      default:
        throw new Exception(pht('Unknown phase "%s"!', $phase));
    }

    return $this->formatWhereClause($conn, $where);
  }

  /**
   * Build the ORDER BY clause for one phase.
   *
   * @param AphrontDatabaseConnection $conn_w Connection used for escaping.
   * @param string $phase One of the PHASE_* constants.
   * @return mixed Formatted ORDER BY clause.
   * @throws Exception If the phase is unknown.
   */
  private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
    switch ($phase) {
      case self::PHASE_LEASED:
        // Ideally we'd probably order these by lease acquisition time, but
        // we don't have that handy and this is a good approximation.
        return qsprintf($conn_w, 'ORDER BY priority ASC, id ASC');
      case self::PHASE_UNLEASED:
        // When selecting new tasks, we want to consume them in order of
        // increasing priority (and then FIFO).
        return qsprintf($conn_w, 'ORDER BY priority ASC, id ASC');
      case self::PHASE_EXPIRED:
        // When selecting failed tasks, we want to consume them in roughly
        // FIFO order of their failures, which is not necessarily their original
        // queue order.

        // Particularly, this is important for tasks which use soft failures to
        // indicate that they are waiting on other tasks to complete: we need to
        // push them to the end of the queue after they fail, at least on
        // average, so we don't deadlock retrying the same blocked task over
        // and over again.
        return qsprintf($conn_w, 'ORDER BY leaseExpires ASC');
      default:
        throw new Exception(pht('Unknown phase "%s"!', $phase));
    }
  }

  /**
   * Build a `LIMIT n` clause.
   *
   * @param AphrontDatabaseConnection $conn_w Connection used for escaping.
   * @param int $limit Row limit.
   * @return mixed Formatted LIMIT clause.
   */
  private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) {
    return qsprintf($conn_w, 'LIMIT %d', $limit);
  }

  /**
   * Build a lease owner name of the form "pid:time:host:sequence". The
   * per-process sequence makes each call produce a distinct name, so the
   * final SELECT in execute() only sees tasks leased by that call.
   *
   * @return string Lease owner identifier.
   */
  private function getLeaseOwnershipName() {
    static $sequence = 0;

    // TODO: If the host name is very long, this can overflow the 64-character
    // column, so we pick just the first part of the host name. It might be
    // useful to just use a random hash as the identifier instead and put the
    // pid / time / host (which are somewhat useful diagnostically) elsewhere.
    // Likely, we could store a daemon ID instead and use that to identify
    // when and where code executed. See T6742.

    $host = php_uname('n');
    $host = id(new PhutilUTF8StringTruncator())
      ->setMaximumBytes(32)
      ->setTerminator('...')
      ->truncateString($host);

    $parts = array(
      getmypid(),
      time(),
      $host,
      ++$sequence,
    );

    return implode(':', $parts);
  }

}