@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
/**
 * Query class for loading @{class:PhabricatorWorkerTrigger} objects, with
 * optional filtering by ID, PHID, trigger version and next scheduled event
 * epoch.
 *
 * Loaded triggers have their clock and action objects attached. Triggers
 * whose clock or action can not be constructed are dropped from the result.
 */
final class PhabricatorWorkerTriggerQuery
  extends PhabricatorPolicyAwareQuery {

  // NOTE: This is a PolicyAware query so it can work with other infrastructure
  // like handles; triggers themselves are low-level and do not have
  // meaningful policies.

  const ORDER_ID = 'id';
  const ORDER_EXECUTION = 'execution';
  const ORDER_VERSION = 'version';

  private $ids;
  private $phids;
  private $versionMin;
  private $versionMax;
  private $nextEpochMin;
  private $nextEpochMax;

  private $needEvents;
  private $order = self::ORDER_ID;

  /**
   * This query always reports `null` as its application class.
   *
   * NOTE(review): presumably this means "not tied to any application";
   * confirm against the parent class contract.
   *
   * @return null
   */
  public function getQueryApplicationClass() {
    return null;
  }

  /**
   * Restrict results to triggers with one of the given IDs.
   *
   * @param array $ids List of trigger IDs.
   * @return $this
   */
  public function withIDs(array $ids) {
    $this->ids = $ids;
    return $this;
  }

  /**
   * Restrict results to triggers with one of the given PHIDs.
   *
   * @param array $phids List of trigger PHIDs.
   * @return $this
   */
  public function withPHIDs(array $phids) {
    $this->phids = $phids;
    return $this;
  }

  /**
   * Restrict results to triggers whose `triggerVersion` lies in an inclusive
   * range. Pass `null` for either bound to leave that side unconstrained.
   *
   * @param int|null $min Inclusive lower bound, or null for no lower bound.
   * @param int|null $max Inclusive upper bound, or null for no upper bound.
   * @return $this
   */
  public function withVersionBetween($min, $max) {
    $this->versionMin = $min;
    $this->versionMax = $max;
    return $this;
  }

  /**
   * Restrict results to triggers whose `nextEventEpoch` lies in an inclusive
   * range. Pass `null` for either bound to leave that side unconstrained.
   *
   * Setting either bound makes the query join the trigger event table, so
   * only triggers which have an event row can be returned.
   *
   * @param int|null $min Inclusive lower bound, or null for no lower bound.
   * @param int|null $max Inclusive upper bound, or null for no upper bound.
   * @return $this
   */
  public function withNextEventBetween($min, $max) {
    $this->nextEpochMin = $min;
    $this->nextEpochMax = $max;
    return $this;
  }

  /**
   * Request that the event for each trigger be loaded and attached.
   *
   * @param bool $need_events True to load and attach events.
   * @return $this
   */
  public function needEvents($need_events) {
    $this->needEvents = $need_events;
    return $this;
  }

  /**
   * Set the result order.
   *
   * Note that using `ORDER_EXECUTION` will also filter results to include only
   * triggers which have been scheduled to execute. You should not use this
   * ordering when querying for specific triggers, e.g. by ID or PHID.
   *
   * @param string $order Result order constant.
   * @return $this
   */
  public function setOrder($order) {
    $this->order = $order;
    return $this;
  }

  protected function nextPage(array $page) {
    // NOTE: We don't implement paging because we don't currently ever need
    // it and paging ORDER_EXECUTION is a hassle.

    // (Before T13266, we raised an exception here, but since "nextPage()" is
    // now called even if we don't page we can't do that anymore. Just do
    // nothing instead.)
  }

  /**
   * Load one page of triggers and attach their events (if requested), clocks
   * and actions.
   *
   * @return array Loaded triggers, with unusable triggers removed.
   */
  protected function loadPage() {
    $task_table = new PhabricatorWorkerTrigger();

    $conn_r = $task_table->establishConnection('r');

    $rows = queryfx_all(
      $conn_r,
      'SELECT t.* FROM %T t %Q %Q %Q %Q',
      $task_table->getTableName(),
      $this->buildJoinClause($conn_r),
      $this->buildWhereClause($conn_r),
      $this->buildOrderClause($conn_r),
      $this->buildLimitClause($conn_r));

    $triggers = $task_table->loadAllFromArray($rows);

    // Nothing to attach. Returning early also keeps us from issuing the
    // event query with an empty ID list.
    if (!$triggers) {
      return $triggers;
    }

    if ($this->needEvents) {
      $this->attachTriggerEvents($triggers);
    }

    $triggers = $this->attachTriggerClocks($triggers);
    $triggers = $this->attachTriggerActions($triggers);

    return $triggers;
  }

  /**
   * Load the events for a nonempty list of triggers and attach each event to
   * its trigger.
   *
   * @param array $triggers Nonempty list of triggers.
   * @return void
   */
  private function attachTriggerEvents(array $triggers) {
    $ids = mpull($triggers, 'getID');

    $events = id(new PhabricatorWorkerTriggerEvent())->loadAllWhere(
      'triggerID IN (%Ld)',
      $ids);
    $events = mpull($events, null, 'getTriggerID');

    foreach ($triggers as $trigger) {
      // A trigger may have no matching event; whatever idx() yields for a
      // missing key is attached in that case.
      $event = idx($events, $trigger->getID());
      $trigger->attachEvent($event);
    }
  }

  /**
   * Build and attach the clock object for each trigger.
   *
   * Triggers whose clock class is not a subclass of
   * @{class:PhabricatorTriggerClock}, or whose clock constructor throws, are
   * removed from the list.
   *
   * @param array $triggers Map of triggers.
   * @return array The triggers which now have a clock, original keys kept.
   */
  private function attachTriggerClocks(array $triggers) {
    foreach ($triggers as $key => $trigger) {
      $clock_class = $trigger->getClockClass();
      if (!is_subclass_of($clock_class, PhabricatorTriggerClock::class)) {
        unset($triggers[$key]);
        continue;
      }

      try {
        $argv = array($trigger->getClockProperties());
        $clock = newv($clock_class, $argv);
      } catch (Exception $ex) {
        // Deliberate best-effort: a clock which can not be built makes the
        // trigger unusable, so drop it rather than failing the whole query.
        unset($triggers[$key]);
        continue;
      }

      $trigger->attachClock($clock);
    }

    return $triggers;
  }

  /**
   * Build and attach the action object for each trigger.
   *
   * Triggers whose action class is not a subclass of
   * @{class:PhabricatorTriggerAction}, or whose action constructor throws,
   * are removed from the list.
   *
   * @param array $triggers Map of triggers.
   * @return array The triggers which now have an action, original keys kept.
   */
  private function attachTriggerActions(array $triggers) {
    foreach ($triggers as $key => $trigger) {
      $action_class = $trigger->getActionClass();
      if (!is_subclass_of($action_class, PhabricatorTriggerAction::class)) {
        unset($triggers[$key]);
        continue;
      }

      try {
        $argv = array($trigger->getActionProperties());
        $action = newv($action_class, $argv);
      } catch (Exception $ex) {
        // Deliberate best-effort, as with clocks: drop the trigger.
        unset($triggers[$key]);
        continue;
      }

      $trigger->attachAction($action);
    }

    return $triggers;
  }

  /**
   * Build the JOIN clause.
   *
   * The event table is joined (as `e`) only when the query filters on the
   * next event epoch or orders by execution time, since those are the only
   * clauses which reference `e`.
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @return mixed Query fragment produced by qsprintf().
   */
  protected function buildJoinClause(AphrontDatabaseConnection $conn) {
    $joins = array();

    if (($this->nextEpochMin !== null) ||
        ($this->nextEpochMax !== null) ||
        ($this->order === self::ORDER_EXECUTION)) {
      $joins[] = qsprintf(
        $conn,
        'JOIN %T e ON e.triggerID = t.id',
        id(new PhabricatorWorkerTriggerEvent())->getTableName());
    }

    if ($joins) {
      return qsprintf($conn, '%LJ', $joins);
    } else {
      return qsprintf($conn, '');
    }
  }

  /**
   * Build the WHERE clause from whichever constraints have been set.
   *
   * @param AphrontDatabaseConnection $conn Connection used for escaping.
   * @return mixed Query fragment produced by formatWhereClause().
   */
  protected function buildWhereClause(AphrontDatabaseConnection $conn) {
    $where = array();

    if ($this->ids !== null) {
      $where[] = qsprintf(
        $conn,
        't.id IN (%Ld)',
        $this->ids);
    }

    if ($this->phids !== null) {
      $where[] = qsprintf(
        $conn,
        't.phid IN (%Ls)',
        $this->phids);
    }

    if ($this->versionMin !== null) {
      $where[] = qsprintf(
        $conn,
        't.triggerVersion >= %d',
        $this->versionMin);
    }

    if ($this->versionMax !== null) {
      $where[] = qsprintf(
        $conn,
        't.triggerVersion <= %d',
        $this->versionMax);
    }

    if ($this->nextEpochMin !== null) {
      $where[] = qsprintf(
        $conn,
        'e.nextEventEpoch >= %d',
        $this->nextEpochMin);
    }

    if ($this->nextEpochMax !== null) {
      $where[] = qsprintf(
        $conn,
        'e.nextEventEpoch <= %d',
        $this->nextEpochMax);
    }

    return $this->formatWhereClause($conn, $where);
  }

  /**
   * Build the ORDER BY clause for the configured order.
   *
   * @param AphrontDatabaseConnection $conn_r Connection used for escaping.
   * @return mixed Query fragment produced by qsprintf().
   * @throws Exception If the configured order is not a known constant.
   */
  private function buildOrderClause(AphrontDatabaseConnection $conn_r) {
    switch ($this->order) {
      case self::ORDER_ID:
        // Qualify the column: when the event table is joined, both `t` and
        // `e` have an `id` column.
        return qsprintf(
          $conn_r,
          'ORDER BY t.id DESC');
      case self::ORDER_EXECUTION:
        return qsprintf(
          $conn_r,
          'ORDER BY e.nextEventEpoch ASC, e.id ASC');
      case self::ORDER_VERSION:
        return qsprintf(
          $conn_r,
          'ORDER BY t.triggerVersion ASC');
      default:
        throw new Exception(
          pht(
            'Unsupported order "%s".',
            $this->order));
    }
  }

}