@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at recaptime-dev/main 240 lines 6.0 kB view raw
1<?php 2 3final class PhabricatorWorkerTriggerQuery 4 extends PhabricatorPolicyAwareQuery { 5 6 // NOTE: This is a PolicyAware query so it can work with other infrastructure 7 // like handles; triggers themselves are low-level and do not have 8 // meaningful policies. 9 10 const ORDER_ID = 'id'; 11 const ORDER_EXECUTION = 'execution'; 12 const ORDER_VERSION = 'version'; 13 14 private $ids; 15 private $phids; 16 private $versionMin; 17 private $versionMax; 18 private $nextEpochMin; 19 private $nextEpochMax; 20 21 private $needEvents; 22 private $order = self::ORDER_ID; 23 24 public function getQueryApplicationClass() { 25 return null; 26 } 27 28 public function withIDs(array $ids) { 29 $this->ids = $ids; 30 return $this; 31 } 32 33 public function withPHIDs(array $phids) { 34 $this->phids = $phids; 35 return $this; 36 } 37 38 public function withVersionBetween($min, $max) { 39 $this->versionMin = $min; 40 $this->versionMax = $max; 41 return $this; 42 } 43 44 public function withNextEventBetween($min, $max) { 45 $this->nextEpochMin = $min; 46 $this->nextEpochMax = $max; 47 return $this; 48 } 49 50 public function needEvents($need_events) { 51 $this->needEvents = $need_events; 52 return $this; 53 } 54 55 /** 56 * Set the result order. 57 * 58 * Note that using `ORDER_EXECUTION` will also filter results to include only 59 * triggers which have been scheduled to execute. You should not use this 60 * ordering when querying for specific triggers, e.g. by ID or PHID. 61 * 62 * @param string $order Result order constant. 63 * @return $this 64 */ 65 public function setOrder($order) { 66 $this->order = $order; 67 return $this; 68 } 69 70 protected function nextPage(array $page) { 71 // NOTE: We don't implement paging because we don't currently ever need 72 // it and paging ORDER_EXECUTION is a hassle. 73 74 // (Before T13266, we raised an exception here, but since "nextPage()" is 75 // now called even if we don't page we can't do that anymore. Just do 76 // nothing instead.) 77 } 78 79 protected function loadPage() { 80 $task_table = new PhabricatorWorkerTrigger(); 81 82 $conn_r = $task_table->establishConnection('r'); 83 84 $rows = queryfx_all( 85 $conn_r, 86 'SELECT t.* FROM %T t %Q %Q %Q %Q', 87 $task_table->getTableName(), 88 $this->buildJoinClause($conn_r), 89 $this->buildWhereClause($conn_r), 90 $this->buildOrderClause($conn_r), 91 $this->buildLimitClause($conn_r)); 92 93 $triggers = $task_table->loadAllFromArray($rows); 94 95 if ($triggers) { 96 if ($this->needEvents) { 97 $ids = mpull($triggers, 'getID'); 98 99 $events = id(new PhabricatorWorkerTriggerEvent())->loadAllWhere( 100 'triggerID IN (%Ld)', 101 $ids); 102 $events = mpull($events, null, 'getTriggerID'); 103 104 foreach ($triggers as $key => $trigger) { 105 $event = idx($events, $trigger->getID()); 106 $trigger->attachEvent($event); 107 } 108 } 109 110 foreach ($triggers as $key => $trigger) { 111 $clock_class = $trigger->getClockClass(); 112 if (!is_subclass_of($clock_class, PhabricatorTriggerClock::class)) { 113 unset($triggers[$key]); 114 continue; 115 } 116 117 try { 118 $argv = array($trigger->getClockProperties()); 119 $clock = newv($clock_class, $argv); 120 } catch (Exception $ex) { 121 unset($triggers[$key]); 122 continue; 123 } 124 125 $trigger->attachClock($clock); 126 } 127 128 129 foreach ($triggers as $key => $trigger) { 130 $action_class = $trigger->getActionClass(); 131 if (!is_subclass_of($action_class, PhabricatorTriggerAction::class)) { 132 unset($triggers[$key]); 133 continue; 134 } 135 136 try { 137 $argv = array($trigger->getActionProperties()); 138 $action = newv($action_class, $argv); 139 } catch (Exception $ex) { 140 unset($triggers[$key]); 141 continue; 142 } 143 144 $trigger->attachAction($action); 145 } 146 } 147 148 return $triggers; 149 } 150 151 protected function buildJoinClause(AphrontDatabaseConnection $conn) { 152 $joins = array(); 153 154 if (($this->nextEpochMin !== null) || 155 ($this->nextEpochMax !== null) || 156 ($this->order == self::ORDER_EXECUTION)) { 157 $joins[] = qsprintf( 158 $conn, 159 'JOIN %T e ON e.triggerID = t.id', 160 id(new PhabricatorWorkerTriggerEvent())->getTableName()); 161 } 162 163 if ($joins) { 164 return qsprintf($conn, '%LJ', $joins); 165 } else { 166 return qsprintf($conn, ''); 167 } 168 } 169 170 protected function buildWhereClause(AphrontDatabaseConnection $conn) { 171 $where = array(); 172 173 if ($this->ids !== null) { 174 $where[] = qsprintf( 175 $conn, 176 't.id IN (%Ld)', 177 $this->ids); 178 } 179 180 if ($this->phids !== null) { 181 $where[] = qsprintf( 182 $conn, 183 't.phid IN (%Ls)', 184 $this->phids); 185 } 186 187 if ($this->versionMin !== null) { 188 $where[] = qsprintf( 189 $conn, 190 't.triggerVersion >= %d', 191 $this->versionMin); 192 } 193 194 if ($this->versionMax !== null) { 195 $where[] = qsprintf( 196 $conn, 197 't.triggerVersion <= %d', 198 $this->versionMax); 199 } 200 201 if ($this->nextEpochMin !== null) { 202 $where[] = qsprintf( 203 $conn, 204 'e.nextEventEpoch >= %d', 205 $this->nextEpochMin); 206 } 207 208 if ($this->nextEpochMax !== null) { 209 $where[] = qsprintf( 210 $conn, 211 'e.nextEventEpoch <= %d', 212 $this->nextEpochMax); 213 } 214 215 return $this->formatWhereClause($conn, $where); 216 } 217 218 private function buildOrderClause(AphrontDatabaseConnection $conn_r) { 219 switch ($this->order) { 220 case self::ORDER_ID: 221 return qsprintf( 222 $conn_r, 223 'ORDER BY id DESC'); 224 case self::ORDER_EXECUTION: 225 return qsprintf( 226 $conn_r, 227 'ORDER BY e.nextEventEpoch ASC, e.id ASC'); 228 case self::ORDER_VERSION: 229 return qsprintf( 230 $conn_r, 231 'ORDER BY t.triggerVersion ASC'); 232 default: 233 throw new Exception( 234 pht( 235 'Unsupported order "%s".', 236 $this->order)); 237 } 238 } 239 240}