@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at recaptime-dev/main 390 lines 10 kB view raw
1<?php 2 3/** 4 * Scaffolding for implementing robust background processing scripts. 5 * 6 * 7 * Autoscaling 8 * =========== 9 * 10 * Autoscaling automatically launches copies of a daemon when it is busy 11 * (scaling the pool up) and stops them when they're idle (scaling the pool 12 * down). This is appropriate for daemons which perform highly parallelizable 13 * work. 14 * 15 * To make a daemon support autoscaling, the implementation should look 16 * something like this: 17 * 18 * while (!$this->shouldExit()) { 19 * if (work_available()) { 20 * $this->willBeginWork(); 21 * do_work(); 22 * $this->sleep(0); 23 * } else { 24 * $this->willBeginIdle(); 25 * $this->sleep(1); 26 * } 27 * } 28 * 29 * In particular, call @{method:willBeginWork} before becoming busy, and 30 * @{method:willBeginIdle} when no work is available. If the daemon is launched 31 * into an autoscale pool, this will cause the pool to automatically scale up 32 * when busy and down when idle. 33 * 34 * Launching a daemon which does not make these callbacks into an autoscale 35 * pool will have no effect. 36 * 37 * @task overseer Communicating With the Overseer 38 * @task autoscale Autoscaling Daemon Pools 39 */ 40abstract class PhutilDaemon extends Phobject { 41 42 const MESSAGETYPE_STDOUT = 'stdout'; 43 const MESSAGETYPE_HEARTBEAT = 'heartbeat'; 44 const MESSAGETYPE_BUSY = 'busy'; 45 const MESSAGETYPE_IDLE = 'idle'; 46 const MESSAGETYPE_DOWN = 'down'; 47 const MESSAGETYPE_HIBERNATE = 'hibernate'; 48 49 const WORKSTATE_BUSY = 'busy'; 50 const WORKSTATE_IDLE = 'idle'; 51 52 private $argv; 53 private $traceMode; 54 private $traceMemory; 55 private $verbose; 56 private $notifyReceived; 57 private $inGracefulShutdown; 58 private $workState = null; 59 private $idleSince = null; 60 private $scaledownDuration; 61 62 final public function setVerbose($verbose) { 63 $this->verbose = $verbose; 64 return $this; 65 } 66 67 final public function getVerbose() { 68 return $this->verbose; 69 } 70 71 final public function setScaledownDuration($scaledown_duration) { 72 $this->scaledownDuration = $scaledown_duration; 73 return $this; 74 } 75 76 final public function getScaledownDuration() { 77 return $this->scaledownDuration; 78 } 79 80 final public function __construct(array $argv) { 81 $this->argv = $argv; 82 83 $router = PhutilSignalRouter::getRouter(); 84 $handler_key = 'daemon.term'; 85 if (!$router->getHandler($handler_key)) { 86 $handler = new PhutilCallbackSignalHandler( 87 SIGTERM, 88 self::class.'::onTermSignal'); 89 $router->installHandler($handler_key, $handler); 90 } 91 92 pcntl_signal(SIGINT, array($this, 'onGracefulSignal')); 93 pcntl_signal(SIGUSR2, array($this, 'onNotifySignal')); 94 95 // Without discard mode, this consumes unbounded amounts of memory. Keep 96 // memory bounded. 97 PhutilServiceProfiler::getInstance()->enableDiscardMode(); 98 99 $this->beginStdoutCapture(); 100 } 101 102 final public function __destruct() { 103 $this->endStdoutCapture(); 104 } 105 106 final public function stillWorking() { 107 $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null); 108 109 if ($this->traceMemory) { 110 $daemon = get_class($this); 111 fprintf( 112 STDERR, 113 "%s %s %s\n", 114 '<RAMS>', 115 $daemon, 116 pht( 117 'Memory Usage: %s KB', 118 new PhutilNumber(memory_get_usage() / 1024, 1))); 119 } 120 } 121 122 final public function shouldExit() { 123 return $this->inGracefulShutdown; 124 } 125 126 final protected function shouldHibernate($duration) { 127 // Don't hibernate if we don't have very long to sleep. 128 if ($duration < 30) { 129 return false; 130 } 131 132 // Never hibernate if we're part of a pool and could scale down instead. 133 // We only hibernate the last process to drop the pool size to zero. 134 if ($this->getScaledownDuration()) { 135 return false; 136 } 137 138 // Don't hibernate for too long. 139 $duration = min($duration, phutil_units('3 minutes in seconds')); 140 141 $this->emitOverseerMessage( 142 self::MESSAGETYPE_HIBERNATE, 143 array( 144 'duration' => $duration, 145 )); 146 147 $this->log( 148 pht( 149 'Preparing to hibernate for %s second(s).', 150 new PhutilNumber($duration))); 151 152 return true; 153 } 154 155 final protected function sleep($duration) { 156 $this->notifyReceived = false; 157 $this->willSleep($duration); 158 $this->stillWorking(); 159 160 $scale_down = $this->getScaledownDuration(); 161 162 $max_sleep = 60; 163 if ($scale_down) { 164 $max_sleep = min($max_sleep, $scale_down); 165 } 166 167 if ($scale_down) { 168 if ($this->workState == self::WORKSTATE_IDLE) { 169 $dur = $this->getIdleDuration(); 170 $this->log(pht('Idle for %s seconds.', $dur)); 171 } 172 } 173 174 while ($duration > 0 && 175 !$this->notifyReceived && 176 !$this->shouldExit()) { 177 178 // If this is an autoscaling clone and we've been idle for too long, 179 // we're going to scale the pool down by exiting and not restarting. The 180 // DOWN message tells the overseer that we don't want to be restarted. 181 if ($scale_down) { 182 if ($this->workState == self::WORKSTATE_IDLE) { 183 if ($this->idleSince && ($this->idleSince + $scale_down < time())) { 184 $this->inGracefulShutdown = true; 185 $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); 186 $this->log( 187 pht( 188 'Daemon was idle for more than %s second(s), '. 189 'scaling pool down.', 190 new PhutilNumber($scale_down))); 191 break; 192 } 193 } 194 } 195 196 sleep(min($duration, $max_sleep)); 197 $duration -= $max_sleep; 198 $this->stillWorking(); 199 } 200 } 201 202 protected function willSleep($duration) { 203 return; 204 } 205 206 public static function onTermSignal($signo) { 207 self::didCatchSignal($signo); 208 } 209 210 final protected function getArgv() { 211 return $this->argv; 212 } 213 214 final public function execute() { 215 $this->willRun(); 216 $this->run(); 217 } 218 219 abstract protected function run(); 220 221 final public function setTraceMemory() { 222 $this->traceMemory = true; 223 return $this; 224 } 225 226 final public function getTraceMemory() { 227 return $this->traceMemory; 228 } 229 230 final public function setTraceMode() { 231 $this->traceMode = true; 232 PhutilServiceProfiler::installEchoListener(); 233 PhutilConsole::getConsole()->getServer()->setEnableLog(true); 234 $this->didSetTraceMode(); 235 return $this; 236 } 237 238 final public function getTraceMode() { 239 return $this->traceMode; 240 } 241 242 final public function onGracefulSignal($signo) { 243 self::didCatchSignal($signo); 244 $this->inGracefulShutdown = true; 245 } 246 247 final public function onNotifySignal($signo) { 248 self::didCatchSignal($signo); 249 $this->notifyReceived = true; 250 $this->onNotify($signo); 251 } 252 253 protected function onNotify($signo) { 254 // This is a hook for subclasses. 255 } 256 257 protected function willRun() { 258 // This is a hook for subclasses. 259 } 260 261 protected function didSetTraceMode() { 262 // This is a hook for subclasses. 263 } 264 265 final protected function log($message) { 266 if ($this->verbose) { 267 $daemon = get_class($this); 268 fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message); 269 } 270 } 271 272 private static function didCatchSignal($signo) { 273 $signame = phutil_get_signal_name($signo); 274 fprintf( 275 STDERR, 276 "%s Caught signal %s (%s).\n", 277 '<SGNL>', 278 $signo, 279 $signame); 280 } 281 282 283/* -( Communicating With the Overseer )------------------------------------ */ 284 285 286 private function beginStdoutCapture() { 287 ob_start(array($this, 'didReceiveStdout'), 2); 288 } 289 290 private function endStdoutCapture() { 291 ob_end_flush(); 292 } 293 294 public function didReceiveStdout($data) { 295 if (!strlen($data)) { 296 return ''; 297 } 298 299 return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data); 300 } 301 302 private function encodeOverseerMessage($type, $data) { 303 $structure = array($type); 304 305 if ($data !== null) { 306 $structure[] = $data; 307 } 308 309 return json_encode($structure)."\n"; 310 } 311 312 private function emitOverseerMessage($type, $data) { 313 $this->endStdoutCapture(); 314 echo $this->encodeOverseerMessage($type, $data); 315 $this->beginStdoutCapture(); 316 } 317 318 public static function errorListener($event, $value, array $metadata) { 319 // If the caller has redirected the error log to a file, PHP won't output 320 // messages to stderr, so the overseer can't capture them. Install a 321 // listener which just echoes errors to stderr, so the overseer is always 322 // aware of errors. 323 324 $console = PhutilConsole::getConsole(); 325 $message = idx($metadata, 'default_message'); 326 327 if ($message) { 328 $console->writeErr("%s\n", $message); 329 } 330 if (idx($metadata, 'trace')) { 331 $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); 332 $console->writeErr("%s\n", $trace); 333 } 334 } 335 336 337/* -( Autoscaling )-------------------------------------------------------- */ 338 339 340 /** 341 * Prepare to become busy. This may autoscale the pool up. 342 * 343 * This notifies the overseer that the daemon has become busy. If daemons 344 * that are part of an autoscale pool are continuously busy for a prolonged 345 * period of time, the overseer may scale up the pool. 346 * 347 * @return $this 348 * @task autoscale 349 */ 350 protected function willBeginWork() { 351 if ($this->workState != self::WORKSTATE_BUSY) { 352 $this->workState = self::WORKSTATE_BUSY; 353 $this->idleSince = null; 354 $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); 355 } 356 357 return $this; 358 } 359 360 361 /** 362 * Prepare to idle. This may autoscale the pool down. 363 * 364 * This notifies the overseer that the daemon is no longer busy. If daemons 365 * that are part of an autoscale pool are idle for a prolonged period of 366 * time, they may exit to scale the pool down. 367 * 368 * @return $this 369 * @task autoscale 370 */ 371 protected function willBeginIdle() { 372 if ($this->workState != self::WORKSTATE_IDLE) { 373 $this->workState = self::WORKSTATE_IDLE; 374 $this->idleSince = time(); 375 $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); 376 } 377 378 return $this; 379 } 380 381 protected function getIdleDuration() { 382 if (!$this->idleSince) { 383 return null; 384 } 385 386 $now = time(); 387 return ($now - $this->idleSince); 388 } 389 390}