@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at upstream/main 537 lines 14 kB view raw
<?php

/**
 * Manages a single daemon subprocess.
 *
 * A handle launches the daemon via `exec_daemon.php`, reads the message
 * stream the child writes to stdout, logs the child's stderr, restarts the
 * child after it exits (unless it has been told to shut down), detects hangs
 * via a heartbeat deadline, and forwards signals to the child.
 *
 * Handles are created with @{method:newFromConfig} and must be attached to a
 * pool with @{method:setDaemonPool} before they are updated, because logging
 * and process launch both go through the pool.
 */
final class PhutilDaemonHandle extends Phobject {

  const EVENT_DID_LAUNCH = 'daemon.didLaunch';
  const EVENT_DID_LOG = 'daemon.didLogMessage';
  const EVENT_DID_HEARTBEAT = 'daemon.didHeartbeat';
  const EVENT_WILL_GRACEFUL = 'daemon.willGraceful';
  const EVENT_WILL_EXIT = 'daemon.willExit';

  private $pool;
  private $properties;
  private $future;
  private $argv;

  // Epoch at (or after) which a stopped process may be started again. A
  // null value means no start is scheduled.
  private $restartAt;

  // Epoch at which the child last reported becoming busy, or null if idle.
  private $busyEpoch;

  private $daemonID;

  // Epoch after which a running child that has not sent a heartbeat message
  // is considered hung.
  private $deadline;

  // Epoch at which the next heartbeat event should be dispatched.
  private $heartbeat;

  // Unconsumed stdout data: a trailing partial line waiting for its newline.
  private $stdoutBuffer;

  private $shouldRestart = true;
  private $shouldShutdown;
  private $hibernating = false;
  private $shouldSendExitEvent = false;

  private function __construct() {
    // <empty>
  }

  /**
   * Build a new handle from a daemon configuration map.
   *
   * The map is validated with PhutilTypeSpec and missing optional keys are
   * filled with defaults. A daemon ID is generated for the new handle.
   *
   * @param map<string, wild> $config Configuration with a required `class`
   *   and optional `argv`, `load`, `log` and `down` keys.
   * @return PhutilDaemonHandle New handle, not yet attached to a pool.
   */
  public static function newFromConfig(array $config) {
    PhutilTypeSpec::checkMap(
      $config,
      array(
        'class' => 'string',
        'argv' => 'optional list<string>',
        'load' => 'optional list<string>',
        'log' => 'optional string|null',
        'down' => 'optional int',
      ));

    $config = $config + array(
      'argv' => array(),
      'load' => array(),
      'log' => null,
      'down' => 15,
    );

    $daemon = new self();
    $daemon->properties = $config;
    $daemon->daemonID = $daemon->generateDaemonID();

    return $daemon;
  }

  /**
   * Attach this handle to the pool which owns it.
   *
   * @param PhutilDaemonPool $daemon_pool Owning pool.
   * @return this
   */
  public function setDaemonPool(PhutilDaemonPool $daemon_pool) {
    $this->pool = $daemon_pool;
    return $this;
  }

  /**
   * @return PhutilDaemonPool|null Owning pool, if one has been set.
   */
  public function getDaemonPool() {
    return $this->pool;
  }

  /**
   * @return int|null Epoch at which the child reported becoming busy, or
   *   null if it is idle (or has never reported being busy).
   */
  public function getBusyEpoch() {
    return $this->busyEpoch;
  }

  /**
   * @return string Daemon class name from the configuration.
   */
  public function getDaemonClass() {
    return $this->getProperty('class');
  }

  /**
   * Read one value from the configuration map.
   *
   * @param string $key Configuration key.
   * @return wild Value for the key, or null if it is not present.
   */
  private function getProperty($key) {
    return idx($this->properties, $key);
  }

  /**
   * Set the arguments passed to `exec_daemon.php` after the class name.
   *
   * @param list<string> $arguments Command line arguments.
   * @return this
   */
  public function setCommandLineArguments(array $arguments) {
    $this->argv = $arguments;
    return $this;
  }

  /**
   * @return list<string>|null Arguments set with
   *   @{method:setCommandLineArguments}.
   */
  public function getCommandLineArguments() {
    return $this->argv;
  }

  /**
   * @return list<string> The `argv` list from the configuration.
   */
  public function getDaemonArguments() {
    return $this->getProperty('argv');
  }

  /**
   * Mark the handle as launched.
   *
   * Makes the process eligible to start on the next @{method:update},
   * arms the exit event, and dispatches the launch event.
   *
   * @return this
   */
  public function didLaunch() {
    $this->restartAt = time();
    $this->shouldSendExitEvent = true;

    $this->dispatchEvent(
      self::EVENT_DID_LAUNCH,
      array(
        'argv' => $this->getCommandLineArguments(),
        'explicitArgv' => $this->getDaemonArguments(),
      ));

    return $this;
  }

  /**
   * @return bool True if this handle currently holds a process future.
   */
  public function isRunning() {
    return (bool)$this->getFuture();
  }

  /**
   * @return bool True if the process is stopped, will be restarted, and
   *   stopped because it asked to hibernate.
   */
  public function isHibernating() {
    return
      !$this->isRunning() &&
      !$this->isDone() &&
      $this->hibernating;
  }

  /**
   * Restart a hibernating process immediately.
   *
   * Does nothing if the process is not hibernating.
   *
   * @return this
   */
  public function wakeFromHibernation() {
    if (!$this->isHibernating()) {
      return $this;
    }

    $this->logMessage(
      'WAKE',
      pht(
        'Process is being awakened from hibernation.'));

    // Pull the restart time forward to now so update() starts the process.
    $this->restartAt = time();
    $this->update();

    return $this;
  }

  /**
   * @return bool True if the process is stopped and will not be restarted.
   */
  public function isDone() {
    return (!$this->shouldRestart && !$this->isRunning());
  }

  /**
   * Advance the state of this handle.
   *
   * If the process is stopped, starts it when a restart is due. Otherwise
   * (and immediately after starting) drains the process output, handles
   * process exit by logging the outcome and scheduling a restart, and then
   * runs heartbeat and hang checks.
   *
   * @return void
   */
  public function update() {
    if (!$this->isRunning()) {
      if (!$this->shouldRestart) {
        return;
      }
      if (!$this->restartAt || (time() < $this->restartAt)) {
        return;
      }
      if ($this->shouldShutdown) {
        return;
      }
      $this->startDaemonProcess();
    }

    $future = $this->getFuture();

    $result = null;
    $caught = null;
    if ($future->canResolve()) {
      // The process has exited. Release our reference first so the handle
      // reports as not running; the local $future is still used below to
      // drain whatever output remains.
      $this->future = null;
      try {
        $result = $future->resolve();
      } catch (Exception $ex) {
        $caught = $ex;
      } catch (Throwable $ex) {
        $caught = $ex;
      }
    }

    list($stdout, $stderr) = $future->read();
    $future->discardBuffers();

    if (strlen($stdout)) {
      $this->didReadStdout($stdout);
    }

    $stderr = trim($stderr);
    if (strlen($stderr)) {
      foreach (phutil_split_lines($stderr, false) as $line) {
        $this->logMessage('STDE', $line);
      }
    }

    if ($result !== null || $caught !== null) {

      if ($caught) {
        $message = pht(
          'Process failed with exception: %s',
          $caught->getMessage());
        $this->logMessage('FAIL', $message);
      } else {
        list($err) = $result;

        if ($err) {
          $this->logMessage('FAIL', pht('Process exited with error %s.', $err));
        } else {
          $this->logMessage('DONE', pht('Process exited normally.'));
        }
      }

      if ($this->shouldShutdown) {
        $this->restartAt = null;
      } else {
        $this->scheduleRestart();
      }
    }

    $this->updateHeartbeatEvent();
    $this->updateHangDetection();
  }

  /**
   * Dispatch a heartbeat event if the heartbeat interval has elapsed.
   *
   * @return void
   */
  private function updateHeartbeatEvent() {
    if ($this->heartbeat > time()) {
      return;
    }

    $this->heartbeat = time() + self::getHeartbeatEventFrequency();
    $this->dispatchEvent(self::EVENT_DID_HEARTBEAT);
  }

  /**
   * Kill and reschedule the process if it has missed its heartbeat deadline.
   *
   * The deadline is set when the process starts and pushed forward each time
   * the child sends a heartbeat message.
   *
   * @return void
   */
  private function updateHangDetection() {
    if (!$this->isRunning()) {
      return;
    }

    if (time() > $this->deadline) {
      $this->logMessage('HANG', pht('Hang detected. Restarting process.'));
      $this->annihilateProcessGroup();
      $this->scheduleRestart();
    }
  }

  /**
   * Schedule the next process start and log how long we will wait.
   *
   * @return void
   */
  private function scheduleRestart() {
    // Wait a minimum of a few seconds before restarting, but we may wait
    // longer if the daemon has initiated hibernation.
    $default_restart = time() + self::getWaitBeforeRestart();
    if ($default_restart >= $this->restartAt) {
      $this->restartAt = $default_restart;
    }

    $this->logMessage(
      'WAIT',
      pht(
        'Waiting %s second(s) to restart process.',
        new PhutilNumber($this->restartAt - time())));
  }

  /**
   * Generate a unique ID for this daemon.
   *
   * The ID is the current PID, a colon, and random characters, truncated to
   * 12 characters in total.
   *
   * @return string A unique daemon ID.
   */
  private function generateDaemonID() {
    return substr(getmypid().':'.Filesystem::readRandomCharacters(12), 0, 12);
  }

  /**
   * @return string ID generated for this handle when it was constructed.
   */
  public function getDaemonID() {
    return $this->daemonID;
  }

  /**
   * @return ExecFuture|null Future for the running process, if any.
   */
  private function getFuture() {
    return $this->future;
  }

  /**
   * @return int|null PID of the child process, or null if there is no
   *   running process or its PID is not available yet.
   */
  private function getPID() {
    $future = $this->getFuture();

    if (!$future) {
      return null;
    }

    if (!$future->hasPID()) {
      return null;
    }

    return $future->getPID();
  }

  /**
   * @return int Size limit applied to the child's stdout and stderr buffers.
   */
  private function getCaptureBufferSize() {
    return 65535;
  }

  /**
   * @return int Seconds a running child may go without sending a heartbeat
   *   message before it is treated as hung.
   */
  private function getRequiredHeartbeatFrequency() {
    return 86400;
  }

  /**
   * @return int Minimum number of seconds to wait before restarting a
   *   process which has exited.
   */
  public static function getWaitBeforeRestart() {
    return 5;
  }

  /**
   * @return int Seconds between heartbeat events.
   */
  public static function getHeartbeatEventFrequency() {
    return 120;
  }

  /**
   * @return int Seconds to wait between SIGTERM and SIGKILL when killing the
   *   child's process group.
   */
  private function getKillDelay() {
    return 3;
  }

  /**
   * @return string Working directory for the child: the directory which
   *   contains `exec_daemon.php`.
   */
  private function getDaemonCWD() {
    $root = dirname(phutil_get_library_root('phabricator'));
    return $root.'/scripts/daemon/exec/';
  }

  /**
   * Build the future which runs the daemon subprocess.
   *
   * The configuration (without the `class` key) is JSON-encoded and written
   * to the child's stdin; the class and command line arguments are passed
   * on the command line.
   *
   * @return ExecFuture Future for the new process.
   */
  private function newExecFuture() {
    $class = $this->getDaemonClass();
    $argv = $this->getCommandLineArguments();
    $buffer_size = $this->getCaptureBufferSize();

    // NOTE: PHP implements proc_open() by running 'sh -c'. On most systems this
    // is bash, but on Ubuntu it's dash. When you proc_open() using bash, you
    // get one new process (the command you ran). When you proc_open() using
    // dash, you get two new processes: the command you ran and a parent
    // "dash -c" (or "sh -c") process. This means that the child process's PID
    // is actually the 'dash' PID, not the command's PID. To avoid this, use
    // 'exec' to replace the shell process with the real process; without this,
    // the child will call posix_getppid(), be given the pid of the 'sh -c'
    // process, and send it SIGUSR1 to keepalive which will terminate it
    // immediately. We also won't be able to do process group management because
    // the shell process won't properly posix_setsid() so the pgid of the child
    // won't be meaningful.

    $config = $this->properties;
    unset($config['class']);
    $config = phutil_json_encode($config);

    return id(new ExecFuture('exec ./exec_daemon.php %s %Ls', $class, $argv))
      ->setCWD($this->getDaemonCWD())
      ->setStdoutSizeLimit($buffer_size)
      ->setStderrSizeLimit($buffer_size)
      ->write($config);
  }

  /**
   * Dispatch an event to event listeners.
   *
   * The daemon ID, daemon class and child PID are always included in the
   * event data; on a key collision they take precedence over `$params`.
   * Exceptions raised by listeners are logged rather than propagated.
   *
   * @param string $type Event type.
   * @param array $params (optional) Event parameters.
   * @return void
   */
  private function dispatchEvent($type, array $params = array()) {
    $data = array(
      'id' => $this->getDaemonID(),
      'daemonClass' => $this->getDaemonClass(),
      'childPID' => $this->getPID(),
    ) + $params;

    $event = new PhutilEvent($type, $data);

    try {
      PhutilEventEngine::dispatchEvent($event);
    } catch (Exception $ex) {
      phlog($ex);
    }
  }

  /**
   * Forcibly stop the child and everything in its process group.
   *
   * Sends SIGTERM to the group, sleeps for the kill delay, then sends
   * SIGKILL. This blocks the caller for the duration of the delay. Does
   * nothing if there is no child PID or its process group can't be found.
   *
   * @return void
   */
  private function annihilateProcessGroup() {
    $pid = $this->getPID();
    if ($pid) {
      $pgid = posix_getpgid($pid);
      if ($pgid) {
        // A negative PID addresses the whole process group.
        posix_kill(-$pgid, SIGTERM);
        sleep($this->getKillDelay());
        posix_kill(-$pgid, SIGKILL);
      }
    }
  }

  /**
   * Start the daemon subprocess and register it with the overseer.
   *
   * Resets the hang deadline, heartbeat timer, stdout buffer and hibernation
   * flag before launching. Requires a pool to have been set.
   *
   * @return void
   */
  private function startDaemonProcess() {
    $this->logMessage('INIT', pht('Starting process.'));

    $this->deadline = time() + $this->getRequiredHeartbeatFrequency();
    $this->heartbeat = time() + self::getHeartbeatEventFrequency();
    $this->stdoutBuffer = '';
    $this->hibernating = false;

    $future = $this->newExecFuture();
    $this->future = $future;

    $pool = $this->getDaemonPool();
    $overseer = $pool->getOverseer();
    $overseer->addFutureToPool($future);
  }

  /**
   * Consume data read from the child's stdout.
   *
   * The child writes one JSON-encoded message per line. Data is buffered
   * until a full line is available; each complete line is decoded and
   * handled according to its message type (the first element of the decoded
   * list). Lines which can't be decoded or have an unknown type are logged
   * as malformed output.
   *
   * @param string $data Newly read stdout data.
   * @return void
   */
  private function didReadStdout($data) {
    $this->stdoutBuffer .= $data;
    while (true) {
      $pos = strpos($this->stdoutBuffer, "\n");
      if ($pos === false) {
        break;
      }
      $message = substr($this->stdoutBuffer, 0, $pos);
      $this->stdoutBuffer = substr($this->stdoutBuffer, $pos + 1);

      try {
        $structure = phutil_json_decode($message);
      } catch (PhutilJSONParserException $ex) {
        $structure = array();
      }

      switch (idx($structure, 0)) {
        case PhutilDaemon::MESSAGETYPE_STDOUT:
          $this->logMessage('STDO', idx($structure, 1));
          break;
        case PhutilDaemon::MESSAGETYPE_HEARTBEAT:
          $this->deadline = time() + $this->getRequiredHeartbeatFrequency();
          break;
        case PhutilDaemon::MESSAGETYPE_BUSY:
          // Keep the time of the first busy report; repeated busy messages
          // do not reset it.
          if (!$this->busyEpoch) {
            $this->busyEpoch = time();
          }
          break;
        case PhutilDaemon::MESSAGETYPE_IDLE:
          $this->busyEpoch = null;
          break;
        case PhutilDaemon::MESSAGETYPE_DOWN:
          // The daemon is exiting because it doesn't have enough work and it
          // is trying to scale the pool down. We should not restart it.
          $this->shouldRestart = false;
          $this->shouldShutdown = true;
          break;
        case PhutilDaemon::MESSAGETYPE_HIBERNATE:
          // The payload is expected to be a map with a "duration" key, but
          // it arrives on the child's stdout, so don't trust its shape. A
          // missing or non-map payload is treated as an empty map (duration
          // of 0) instead of being handed to idx() as-is.
          // NOTE(review): idx() is understood to require an array as its
          // first argument; confirm against libphutil.
          $config = idx($structure, 1);
          if (!is_array($config)) {
            $config = array();
          }
          $duration = (int)idx($config, 'duration', 0);
          $this->restartAt = time() + $duration;
          $this->hibernating = true;
          $this->busyEpoch = null;
          $this->logMessage(
            'ZZZZ',
            pht(
              'Process is preparing to hibernate for %s second(s).',
              new PhutilNumber($duration)));
          break;
        default:
          // If we can't parse this or it isn't a message we understand, just
          // emit the raw message.
          $this->logMessage('STDO', pht('<Malformed> %s', $message));
          break;
      }
    }
  }

  /**
   * Forward a signal to the child process unchanged.
   *
   * @param int $signo Signal number to forward.
   * @return void
   */
  public function didReceiveNotifySignal($signo) {
    $pid = $this->getPID();
    if ($pid) {
      posix_kill($pid, $signo);
    }
  }

  /**
   * Handle a reload signal by interrupting the child so it gets restarted.
   *
   * @param int $signo Signal number which was received.
   * @return void
   */
  public function didReceiveReloadSignal($signo) {
    $signame = phutil_get_signal_name($signo);
    if ($signame) {
      $sigmsg = pht(
        'Reloading in response to signal %d (%s).',
        $signo,
        $signame);
    } else {
      $sigmsg = pht(
        'Reloading in response to signal %d.',
        $signo);
    }

    $this->logMessage('RELO', $sigmsg, $signo);

    // This signal means "stop the current process gracefully, then launch
    // a new identical process once it exits". This can be used to update
    // daemons after code changes (the new processes will run the new code)
    // without aborting any running tasks.

    // We SIGINT the daemon but don't set the shutdown flag, so it will
    // naturally be restarted after it exits, as though it had exited after an
    // unhandled exception.

    $pid = $this->getPID();
    if ($pid) {
      posix_kill($pid, SIGINT);
    }
  }

  /**
   * Handle a graceful shutdown signal.
   *
   * Marks the handle so the process is not restarted, then sends SIGINT to
   * the child.
   *
   * @param int $signo Signal number which was received.
   * @return void
   */
  public function didReceiveGracefulSignal($signo) {
    $this->shouldShutdown = true;
    $this->shouldRestart = false;

    $signame = phutil_get_signal_name($signo);
    if ($signame) {
      $sigmsg = pht(
        'Graceful shutdown in response to signal %d (%s).',
        $signo,
        $signame);
    } else {
      $sigmsg = pht(
        'Graceful shutdown in response to signal %d.',
        $signo);
    }

    $this->logMessage('DONE', $sigmsg, $signo);

    $pid = $this->getPID();
    if ($pid) {
      posix_kill($pid, SIGINT);
    }
  }

  /**
   * Handle a terminate signal.
   *
   * Marks the handle so the process is not restarted, then kills the child's
   * whole process group.
   *
   * @param int $signo Signal number which was received.
   * @return void
   */
  public function didReceiveTerminateSignal($signo) {
    $this->shouldShutdown = true;
    $this->shouldRestart = false;

    $signame = phutil_get_signal_name($signo);
    if ($signame) {
      $sigmsg = pht(
        'Shutting down in response to signal %s (%s).',
        $signo,
        $signame);
    } else {
      $sigmsg = pht('Shutting down in response to signal %s.', $signo);
    }

    $this->logMessage('EXIT', $sigmsg, $signo);
    $this->annihilateProcessGroup();
  }

  /**
   * Log a message through the pool and dispatch it as a log event.
   *
   * @param string $type Short message type code, like `INIT` or `FAIL`.
   * @param string $message Message text.
   * @param wild $context (optional) Additional context for the message.
   * @return void
   */
  private function logMessage($type, $message, $context = null) {
    $this->getDaemonPool()->logMessage($type, $message, $context);

    $this->dispatchEvent(
      self::EVENT_DID_LOG,
      array(
        'type' => $type,
        'message' => $message,
        'context' => $context,
      ));
  }

  /**
   * Dispatch the exit event, at most once per launch.
   *
   * The event is only sent if @{method:didLaunch} armed it, and the flag is
   * cleared afterwards so repeated calls do not send it again.
   *
   * @return this
   */
  public function didExit() {
    if ($this->shouldSendExitEvent) {
      $this->dispatchEvent(self::EVENT_WILL_EXIT);
      $this->shouldSendExitEvent = false;
    }

    return $this;
  }

}