@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
/**
 * Scaffolding for implementing robust background processing scripts.
 *
 *
 * Autoscaling
 * ===========
 *
 * Autoscaling automatically launches copies of a daemon when it is busy
 * (scaling the pool up) and stops them when they're idle (scaling the pool
 * down). This is appropriate for daemons which perform highly parallelizable
 * work.
 *
 * To make a daemon support autoscaling, the implementation should look
 * something like this:
 *
 *   while (!$this->shouldExit()) {
 *     if (work_available()) {
 *       $this->willBeginWork();
 *       do_work();
 *       $this->sleep(0);
 *     } else {
 *       $this->willBeginIdle();
 *       $this->sleep(1);
 *     }
 *   }
 *
 * In particular, call @{method:willBeginWork} before becoming busy, and
 * @{method:willBeginIdle} when no work is available. If the daemon is launched
 * into an autoscale pool, this will cause the pool to automatically scale up
 * when busy and down when idle.
 *
 * Launching a daemon which does not make these callbacks into an autoscale
 * pool will have no effect.
 *
 * @task overseer Communicating With the Overseer
 * @task autoscale Autoscaling Daemon Pools
 */
abstract class PhutilDaemon extends Phobject {

  // Message types sent to the overseer as the first element of each
  // JSON-encoded message (see encodeOverseerMessage()).
  const MESSAGETYPE_STDOUT = 'stdout';
  const MESSAGETYPE_HEARTBEAT = 'heartbeat';
  const MESSAGETYPE_BUSY = 'busy';
  const MESSAGETYPE_IDLE = 'idle';
  const MESSAGETYPE_DOWN = 'down';
  const MESSAGETYPE_HIBERNATE = 'hibernate';

  // Values stored in $workState by willBeginWork() and willBeginIdle().
  const WORKSTATE_BUSY = 'busy';
  const WORKSTATE_IDLE = 'idle';

  // Argument vector handed to the constructor.
  private $argv;

  // Set to true by setTraceMode().
  private $traceMode;

  // Set to true by setTraceMemory(); makes stillWorking() report memory use.
  private $traceMemory;

  // When truthy, log() writes messages to stderr.
  private $verbose;

  // Set by the SIGUSR2 handler; interrupts an in-progress sleep().
  private $notifyReceived;

  // Set by the SIGINT handler, or by sleep() when scaling the pool down.
  private $inGracefulShutdown;

  // One of the WORKSTATE_* constants, or null before either
  // willBeginWork() or willBeginIdle() has been called.
  private $workState = null;

  // Unix timestamp at which the daemon last became idle, or null when busy.
  private $idleSince = null;

  // Idle time, in seconds, after which sleep() scales the pool down.
  private $scaledownDuration;


  /**
   * Enable or disable verbose logging through @{method:log}.
   *
   * @param bool $verbose Any truthy value enables verbose output.
   * @return $this
   */
  final public function setVerbose($verbose) {
    $this->verbose = $verbose;
    return $this;
  }

  /**
   * Get the verbose flag set by @{method:setVerbose}.
   *
   * @return bool|null The stored value, or null if it was never set.
   */
  final public function getVerbose() {
    return $this->verbose;
  }

  /**
   * Set how long the daemon may stay idle before @{method:sleep} scales the
   * pool down by shutting this daemon down.
   *
   * A falsey value disables scale-down and allows
   * @{method:shouldHibernate} to hibernate instead.
   *
   * @param int $scaledown_duration Idle limit, in seconds.
   * @return $this
   * @task autoscale
   */
  final public function setScaledownDuration($scaledown_duration) {
    $this->scaledownDuration = $scaledown_duration;
    return $this;
  }

  /**
   * Get the idle limit set by @{method:setScaledownDuration}.
   *
   * @return int|null Idle limit in seconds, or null if it was never set.
   * @task autoscale
   */
  final public function getScaledownDuration() {
    return $this->scaledownDuration;
  }

  /**
   * Construct the daemon, install its signal handlers and begin capturing
   * stdout.
   *
   * @param array $argv Argument vector, available to subclasses through
   *   @{method:getArgv}.
   */
  final public function __construct(array $argv) {
    $this->argv = $argv;

    // The SIGTERM handler is registered with the process-wide router under a
    // fixed key, so it is installed at most once.
    $router = PhutilSignalRouter::getRouter();
    $handler_key = 'daemon.term';
    if (!$router->getHandler($handler_key)) {
      $handler = new PhutilCallbackSignalHandler(
        SIGTERM,
        self::class.'::onTermSignal');
      $router->installHandler($handler_key, $handler);
    }

    pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
    pcntl_signal(SIGUSR2, array($this, 'onNotifySignal'));

    // Without discard mode, this consumes unbounded amounts of memory. Keep
    // memory bounded.
    PhutilServiceProfiler::getInstance()->enableDiscardMode();

    $this->beginStdoutCapture();
  }

  /**
   * Stop capturing stdout, flushing anything still buffered.
   */
  final public function __destruct() {
    $this->endStdoutCapture();
  }

  /**
   * Send a heartbeat to the overseer.
   *
   * When memory tracing is enabled (see @{method:setTraceMemory}), this also
   * writes the current memory usage to stderr.
   *
   * @return void
   * @task overseer
   */
  final public function stillWorking() {
    $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null);

    if ($this->traceMemory) {
      $daemon = get_class($this);
      fprintf(
        STDERR,
        "%s %s %s\n",
        '<RAMS>',
        $daemon,
        pht(
          'Memory Usage: %s KB',
          new PhutilNumber(memory_get_usage() / 1024, 1)));
    }
  }

  /**
   * Test whether the daemon has been asked to shut down gracefully.
   *
   * The flag is set by the SIGINT handler and by @{method:sleep} when it
   * decides to scale the pool down.
   *
   * @return bool|null Truthy once a graceful shutdown has begun; null before
   *   the flag has ever been set.
   */
  final public function shouldExit() {
    return $this->inGracefulShutdown;
  }

  /**
   * Decide whether to hibernate for an upcoming idle period and, if so,
   * notify the overseer.
   *
   * When this returns true, a HIBERNATE message carrying the (capped)
   * duration has already been emitted.
   *
   * @param int $duration How long the daemon expects to be idle, in seconds.
   * @return bool True if the hibernate message was sent.
   */
  final protected function shouldHibernate($duration) {
    // Don't hibernate if we don't have very long to sleep.
    if ($duration < 30) {
      return false;
    }

    // Never hibernate if we're part of a pool and could scale down instead.
    // We only hibernate the last process to drop the pool size to zero.
    if ($this->getScaledownDuration()) {
      return false;
    }

    // Don't hibernate for too long.
    $duration = min($duration, phutil_units('3 minutes in seconds'));

    $this->emitOverseerMessage(
      self::MESSAGETYPE_HIBERNATE,
      array(
        'duration' => $duration,
      ));

    $this->log(
      pht(
        'Preparing to hibernate for %s second(s).',
        new PhutilNumber($duration)));

    return true;
  }

  /**
   * Sleep for up to a given duration, sending heartbeats to the overseer.
   *
   * The sleep is split into slices of at most 60 seconds (or the scale-down
   * duration, if that is shorter), with a heartbeat after each slice. It ends
   * early when a notify signal arrives or a graceful shutdown begins. If a
   * scale-down duration is configured and the daemon has been idle for longer
   * than that, this begins a graceful shutdown and sends the overseer a DOWN
   * message instead of sleeping further.
   *
   * @param int $duration Maximum time to sleep, in seconds.
   * @return void
   */
  final protected function sleep($duration) {
    $this->notifyReceived = false;
    $this->willSleep($duration);
    $this->stillWorking();

    $scale_down = $this->getScaledownDuration();

    $max_sleep = 60;
    if ($scale_down) {
      // Wake up often enough to notice the idle limit being crossed.
      $max_sleep = min($max_sleep, $scale_down);

      if ($this->workState === self::WORKSTATE_IDLE) {
        $dur = $this->getIdleDuration();
        $this->log(pht('Idle for %s seconds.', $dur));
      }
    }

    while ($duration > 0 &&
      !$this->notifyReceived &&
      !$this->shouldExit()) {

      // If this is an autoscaling clone and we've been idle for too long,
      // we're going to scale the pool down by exiting and not restarting. The
      // DOWN message tells the overseer that we don't want to be restarted.
      if ($scale_down && $this->workState === self::WORKSTATE_IDLE) {
        if ($this->idleSince && ($this->idleSince + $scale_down < time())) {
          $this->inGracefulShutdown = true;
          $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);
          $this->log(
            pht(
              'Daemon was idle for more than %s second(s), '.
              'scaling pool down.',
              new PhutilNumber($scale_down)));
          break;
        }
      }

      // A full slice is subtracted even when the final sleep was shorter;
      // that only drives $duration to zero or below, which ends the loop.
      sleep(min($duration, $max_sleep));
      $duration -= $max_sleep;
      $this->stillWorking();
    }
  }

  /**
   * Hook for subclasses, called at the start of @{method:sleep}.
   *
   * @param int $duration The duration passed to @{method:sleep}.
   * @return void
   */
  protected function willSleep($duration) {
    return;
  }

  /**
   * SIGTERM callback installed by the constructor. Only reports the signal
   * on stderr.
   *
   * @param int $signo Signal number.
   * @return void
   */
  public static function onTermSignal($signo) {
    self::didCatchSignal($signo);
  }

  /**
   * Get the argument vector passed to the constructor.
   *
   * @return array
   */
  final protected function getArgv() {
    return $this->argv;
  }

  /**
   * Run the daemon: invoke the @{method:willRun} hook, then @{method:run}.
   *
   * @return void
   */
  final public function execute() {
    $this->willRun();
    $this->run();
  }

  /**
   * Daemon body, implemented by subclasses. Called by @{method:execute}.
   */
  abstract protected function run();

  /**
   * Enable memory tracing, which makes @{method:stillWorking} report memory
   * usage on stderr.
   *
   * @return $this
   */
  final public function setTraceMemory() {
    $this->traceMemory = true;
    return $this;
  }

  /**
   * Test whether memory tracing was enabled.
   *
   * @return bool|null True once @{method:setTraceMemory} was called, null
   *   otherwise.
   */
  final public function getTraceMemory() {
    return $this->traceMemory;
  }

  /**
   * Enable trace mode.
   *
   * Installs the service profiler's echo listener, enables logging on the
   * console server, then invokes the @{method:didSetTraceMode} hook.
   *
   * @return $this
   */
  final public function setTraceMode() {
    $this->traceMode = true;
    PhutilServiceProfiler::installEchoListener();
    PhutilConsole::getConsole()->getServer()->setEnableLog(true);
    $this->didSetTraceMode();
    return $this;
  }

  /**
   * Test whether trace mode was enabled.
   *
   * @return bool|null True once @{method:setTraceMode} was called, null
   *   otherwise.
   */
  final public function getTraceMode() {
    return $this->traceMode;
  }

  /**
   * SIGINT handler: report the signal and begin a graceful shutdown.
   *
   * @param int $signo Signal number.
   * @return void
   */
  final public function onGracefulSignal($signo) {
    self::didCatchSignal($signo);
    $this->inGracefulShutdown = true;
  }

  /**
   * SIGUSR2 handler: report the signal, flag the notification so that
   * @{method:sleep} stops sleeping, then invoke the @{method:onNotify} hook.
   *
   * @param int $signo Signal number.
   * @return void
   */
  final public function onNotifySignal($signo) {
    self::didCatchSignal($signo);
    $this->notifyReceived = true;
    $this->onNotify($signo);
  }

  /**
   * Hook for subclasses, called when a notify signal is received.
   *
   * @param int $signo Signal number.
   * @return void
   */
  protected function onNotify($signo) {
    // This is a hook for subclasses.
  }

  /**
   * Hook for subclasses, called by @{method:execute} before @{method:run}.
   *
   * @return void
   */
  protected function willRun() {
    // This is a hook for subclasses.
  }

  /**
   * Hook for subclasses, called after trace mode has been enabled.
   *
   * @return void
   */
  protected function didSetTraceMode() {
    // This is a hook for subclasses.
  }

  /**
   * Write a message to stderr if verbose mode is enabled.
   *
   * @param string $message Message to log.
   * @return void
   */
  final protected function log($message) {
    if ($this->verbose) {
      $daemon = get_class($this);
      fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message);
    }
  }

  /**
   * Report a caught signal on stderr.
   *
   * @param int $signo Signal number.
   * @return void
   */
  private static function didCatchSignal($signo) {
    $signame = phutil_get_signal_name($signo);
    fprintf(
      STDERR,
      "%s Caught signal %s (%s).\n",
      '<SGNL>',
      $signo,
      $signame);
  }


/* -( Communicating With the Overseer )------------------------------------ */


  /**
   * Start routing everything written to stdout through
   * @{method:didReceiveStdout}.
   *
   * NOTE(review): the chunk size of 2 looks intended to make PHP invoke the
   * callback after nearly every write rather than buffering - confirm
   * against the `ob_start()` documentation.
   *
   * @return void
   * @task overseer
   */
  private function beginStdoutCapture() {
    ob_start(array($this, 'didReceiveStdout'), 2);
  }

  /**
   * Stop the stdout capture, flushing any buffered output.
   *
   * @return void
   * @task overseer
   */
  private function endStdoutCapture() {
    ob_end_flush();
  }

  /**
   * Output buffer callback: wrap captured stdout in an overseer message.
   *
   * This is public only because it is used as a callback.
   *
   * @param string $data Captured output.
   * @return string Encoded STDOUT message, or an empty string when there is
   *   no data.
   * @task overseer
   */
  public function didReceiveStdout($data) {
    if (!strlen($data)) {
      return '';
    }

    return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data);
  }

  /**
   * Encode a message as a single line of JSON.
   *
   * The encoded structure is a list whose first element is the message type.
   * The payload follows as the second element unless it is null.
   *
   * @param string $type One of the MESSAGETYPE_* constants.
   * @param mixed|null $data Optional payload.
   * @return string JSON text terminated by a newline.
   * @task overseer
   */
  private function encodeOverseerMessage($type, $data) {
    $structure = array($type);

    if ($data !== null) {
      $structure[] = $data;
    }

    $json = json_encode($structure);

    if ($json === false) {
      // Encoding fails outright when the payload contains malformed UTF-8,
      // which captured stdout may well do. Concatenating the `false` result
      // would emit a bare newline and silently lose the message, so retry
      // leniently: substitute invalid byte sequences where the runtime
      // supports it, and otherwise degrade the unencodable value to null
      // while still delivering the message type.
      $flags = JSON_PARTIAL_OUTPUT_ON_ERROR;
      if (defined('JSON_INVALID_UTF8_SUBSTITUTE')) {
        $flags |= JSON_INVALID_UTF8_SUBSTITUTE;
      }
      $json = json_encode($structure, $flags);
    }

    return $json."\n";
  }

  /**
   * Send a message to the overseer by writing it to stdout.
   *
   * Capture is suspended around the write so the message is emitted as-is
   * instead of being wrapped by @{method:didReceiveStdout} as a STDOUT
   * message.
   *
   * @param string $type One of the MESSAGETYPE_* constants.
   * @param mixed|null $data Optional payload.
   * @return void
   * @task overseer
   */
  private function emitOverseerMessage($type, $data) {
    $this->endStdoutCapture();
    echo $this->encodeOverseerMessage($type, $data);
    $this->beginStdoutCapture();
  }

  /**
   * Error listener which echoes errors to stderr.
   *
   * @param wild $event Unused.
   * @param wild $value Unused.
   * @param array $metadata Error metadata; the `default_message` and `trace`
   *   keys are read if present.
   * @return void
   * @task overseer
   */
  public static function errorListener($event, $value, array $metadata) {
    // If the caller has redirected the error log to a file, PHP won't output
    // messages to stderr, so the overseer can't capture them. Install a
    // listener which just echoes errors to stderr, so the overseer is always
    // aware of errors.

    $console = PhutilConsole::getConsole();
    $message = idx($metadata, 'default_message');

    if ($message) {
      $console->writeErr("%s\n", $message);
    }
    if (idx($metadata, 'trace')) {
      $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']);
      $console->writeErr("%s\n", $trace);
    }
  }


/* -( Autoscaling )-------------------------------------------------------- */


  /**
   * Prepare to become busy. This may autoscale the pool up.
   *
   * This notifies the overseer that the daemon has become busy. If daemons
   * that are part of an autoscale pool are continuously busy for a prolonged
   * period of time, the overseer may scale up the pool.
   *
   * The overseer is only notified on a transition into the busy state;
   * repeated calls while already busy do nothing.
   *
   * @return $this
   * @task autoscale
   */
  protected function willBeginWork() {
    if ($this->workState !== self::WORKSTATE_BUSY) {
      $this->workState = self::WORKSTATE_BUSY;
      $this->idleSince = null;
      $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);
    }

    return $this;
  }


  /**
   * Prepare to idle. This may autoscale the pool down.
   *
   * This notifies the overseer that the daemon is no longer busy. If daemons
   * that are part of an autoscale pool are idle for a prolonged period of
   * time, they may exit to scale the pool down.
   *
   * The overseer is only notified on a transition into the idle state, which
   * is also when the idle timer starts; repeated calls while already idle do
   * not reset it.
   *
   * @return $this
   * @task autoscale
   */
  protected function willBeginIdle() {
    if ($this->workState !== self::WORKSTATE_IDLE) {
      $this->workState = self::WORKSTATE_IDLE;
      $this->idleSince = time();
      $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);
    }

    return $this;
  }

  /**
   * Get how long the daemon has been idle.
   *
   * @return int|null Seconds since the daemon last became idle, or null if
   *   no idle start time is recorded.
   * @task autoscale
   */
  protected function getIdleDuration() {
    if (!$this->idleSince) {
      return null;
    }

    $now = time();
    return ($now - $this->idleSince);
  }

}
387 return ($now - $this->idleSince);
388 }
389
390}