@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at upstream/main 944 lines 27 kB view raw
<?php

/**
 * Manages repository synchronization for cluster repositories.
 *
 * Version records for each (repository, device) pair are stored through
 * @{class:PhabricatorRepositoryWorkingCopyVersion}. Devices holding the
 * highest recorded version are treated as cluster leaders; other devices
 * fetch from a leader before serving reads or accepting writes.
 *
 * @task config Configuring Synchronization
 * @task sync Cluster Synchronization
 * @task internal Internals
 */
final class DiffusionRepositoryClusterEngine extends Phobject {

  private $repository;
  private $viewer;
  private $actingAsPHID;
  private $logger;

  // State held between synchronizeWorkingCopyBeforeWrite() and
  // synchronizeWorkingCopyAfterWrite().
  private $clusterWriteLock;
  private $clusterWriteVersion;
  private $clusterWriteOwner;


/* -( Configuring Synchronization )---------------------------------------- */


  /**
   * @task config
   */
  public function setRepository(PhabricatorRepository $repository) {
    $this->repository = $repository;
    return $this;
  }

  /**
   * @task config
   */
  public function getRepository() {
    return $this->repository;
  }

  /**
   * @task config
   */
  public function setViewer(PhabricatorUser $viewer) {
    $this->viewer = $viewer;
    return $this;
  }

  /**
   * @task config
   */
  public function getViewer() {
    return $this->viewer;
  }

  /**
   * Set an optional log sink. When no log is set, log messages and timing
   * properties are silently discarded.
   *
   * @task config
   */
  public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
    $this->logger = $log;
    return $this;
  }

  /**
   * @task config
   */
  public function setActingAsPHID($acting_as_phid) {
    $this->actingAsPHID = $acting_as_phid;
    return $this;
  }

  /**
   * @task config
   */
  public function getActingAsPHID() {
    return $this->actingAsPHID;
  }

  /**
   * Get the PHID writes are attributed to: the explicit "acting as" PHID if
   * one was set, otherwise the PHID of the viewer.
   *
   * @task config
   */
  private function getEffectiveActingAsPHID() {
    if ($this->actingAsPHID) {
      return $this->actingAsPHID;
    }

    return $this->getViewer()->getPHID();
  }


/* -( Cluster Synchronization )-------------------------------------------- */


  /**
   * Synchronize repository version information after creating a repository.
   *
   * This initializes working copy versions for all currently bound devices to
   * 0, so that we don't get stuck making an ambiguous choice about which
   * devices are leaders when we later synchronize before a read.
   *
   * @return this|null This engine, or `null` if synchronization is not
   *   enabled for the repository.
   * @task sync
   */
  public function synchronizeWorkingCopyAfterCreation() {
    if (!$this->shouldEnableSynchronization(false)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $service = $repository->loadAlmanacService();
    if (!$service) {
      throw new Exception(pht('Failed to load repository cluster service.'));
    }

    $bindings = $service->getActiveBindings();
    foreach ($bindings as $binding) {
      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $binding->getDevicePHID(),
        0);
    }

    return $this;
  }


  /**
   * Reset version information after a repository changes between hosted
   * and observed.
   *
   * @return this|null This engine, or `null` if synchronization is not
   *   enabled for the repository.
   * @task sync
   */
  public function synchronizeWorkingCopyAfterHostingChange() {
    if (!$this->shouldEnableSynchronization(false)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    // After converting a hosted repository to observed, or vice versa, we
    // need to reset version numbers because the clocks for observed and hosted
    // repositories run on different units.

    // We identify all the cluster leaders and reset their version to 0.
    // We identify all the cluster followers and demote them.

    // This allows the cluster to start over again at version 0 but keep the
    // same leaders.

    if ($versions) {
      $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
      foreach ($versions as $version) {
        $device_phid = $version->getDevicePHID();

        if ($version->getRepositoryVersion() == $max_version) {
          PhabricatorRepositoryWorkingCopyVersion::updateVersion(
            $repository_phid,
            $device_phid,
            0);
        } else {
          PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
            $repository_phid,
            $device_phid);
        }
      }
    }

    return $this;
  }


  /**
   * Bring the working copy on this device up to date before reading from it.
   *
   * Acquires the device read lock (waiting up to two minutes), synchronizes
   * the working copy if this device is behind the cluster leaders, and then
   * releases the lock. The lock is released on failure, too.
   *
   * @return int|null Current cluster version of the repository, or `null` if
   *   synchronization is not enabled.
   * @task sync
   */
  public function synchronizeWorkingCopyBeforeRead() {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
      $repository_phid,
      $device_phid);

    $lock_wait = phutil_units('2 minutes in seconds');

    $this->logLine(
      pht(
        'Acquiring read lock for repository "%s" on device "%s"...',
        $repository->getDisplayName(),
        $device->getName()));

    try {
      $start = PhabricatorTime::getNow();
      $read_lock->lock($lock_wait);
      $waited = (PhabricatorTime::getNow() - $start);

      if ($waited) {
        $this->logLine(
          pht(
            'Acquired read lock after %s second(s).',
            new PhutilNumber($waited)));
      } else {
        $this->logLine(
          pht(
            'Acquired read lock immediately.'));
      }
    } catch (PhutilLockException $ex) {
      throw new Exception(
        pht(
          'Failed to acquire read lock after waiting %s second(s). You '.
          'may be able to retry later. (%s)',
          new PhutilNumber($lock_wait),
          $ex->getHint()),
        0,
        $ex);
    }

    // Everything below runs while holding the read lock. If synchronization
    // fails, release the lock before propagating the failure so we do not
    // keep other readers on this device waiting on a lock nobody is using.
    try {
      $result_version = $this->synchronizeWorkingCopyUnderReadLock(
        $repository,
        $device);
    } catch (Exception $ex) {
      $read_lock->unlock();
      throw $ex;
    }

    $read_lock->unlock();

    return $result_version;
  }


  /**
   * Perform the body of a pre-read synchronization. The caller must hold the
   * device read lock.
   *
   * @param PhabricatorRepository Repository to synchronize.
   * @param wild Live device, as returned by `AlmanacKeys::getLiveDevice()`.
   * @return int Current cluster version of the repository.
   * @task internal
   */
  private function synchronizeWorkingCopyUnderReadLock(
    PhabricatorRepository $repository,
    $device) {

    $repository_phid = $repository->getPHID();
    $device_phid = $device->getPHID();

    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    $this_version = idx($versions, $device_phid);
    if ($this_version) {
      $this_version = (int)$this_version->getRepositoryVersion();
    } else {
      $this_version = null;
    }

    if ($versions) {
      // This is the normal case, where we have some version information and
      // can identify which nodes are leaders. If the current node is not a
      // leader, we want to fetch from a leader and then update our version.

      $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
      if (($this_version === null) || ($max_version > $this_version)) {
        if ($repository->isHosted()) {
          $fetchable = array();
          foreach ($versions as $version) {
            if ($version->getRepositoryVersion() == $max_version) {
              $fetchable[] = $version->getDevicePHID();
            }
          }

          $this->synchronizeWorkingCopyFromDevices(
            $fetchable,
            $this_version,
            $max_version);
        } else {
          $this->synchronizeWorkingCopyFromRemote();
        }

        PhabricatorRepositoryWorkingCopyVersion::updateVersion(
          $repository_phid,
          $device_phid,
          $max_version);
      } else {
        $this->logLine(
          pht(
            'Device "%s" is already a cluster leader and does not need '.
            'to be synchronized.',
            $device->getName()));
      }

      return $max_version;
    }

    // If no version records exist yet, we need to be careful, because we
    // can not tell which nodes are leaders.

    // There might be several nodes with arbitrary existing data, and we have
    // no way to tell which one has the "right" data. If we pick wrong, we
    // might erase some or all of the data in the repository.

    // Since this is dangerous, we refuse to guess unless there is only one
    // device. If we're the only device in the group, we obviously must be
    // a leader.

    $service = $repository->loadAlmanacService();
    if (!$service) {
      throw new Exception(pht('Failed to load repository cluster service.'));
    }

    $bindings = $service->getActiveBindings();
    $device_map = array();
    foreach ($bindings as $binding) {
      $device_map[$binding->getDevicePHID()] = true;
    }

    if (count($device_map) > 1) {
      throw new Exception(
        pht(
          'Repository "%s" exists on more than one device, but no device '.
          'has any repository version information. There is no way for the '.
          'software to determine which copy of the existing data is '.
          'authoritative. Promote a device or see "Ambiguous Leaders" in '.
          'the documentation.',
          $repository->getDisplayName()));
    }

    if (empty($device_map[$device->getPHID()])) {
      throw new Exception(
        pht(
          'Repository "%s" is being synchronized on device "%s", but '.
          'this device is not bound to the corresponding cluster '.
          'service ("%s").',
          $repository->getDisplayName(),
          $device->getName(),
          $service->getName()));
    }

    // The current device is the only device in service, so it must be a
    // leader. We can safely have any future nodes which come online read
    // from it.
    PhabricatorRepositoryWorkingCopyVersion::updateVersion(
      $repository_phid,
      $device_phid,
      0);

    return 0;
  }


  /**
   * Prepare this device to receive a write.
   *
   * Acquires the global write lock for the repository (waiting up to roughly
   * two minutes), refuses the write if a previous write was interrupted,
   * synchronizes the working copy, and then records a durable write marker.
   * On success, the lock is retained until
   * @{method:synchronizeWorkingCopyAfterWrite} is called. On failure, the
   * lock is released before the exception propagates.
   *
   * @return void
   * @task sync
   */
  public function synchronizeWorkingCopyBeforeWrite() {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    $repository = $this->getRepository();
    $viewer = $this->getViewer();

    $repository_phid = $repository->getPHID();

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    $table = new PhabricatorRepositoryWorkingCopyVersion();
    $locked_connection = $table->establishConnection('w');

    $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
      $repository_phid);

    $write_lock->setExternalConnection($locked_connection);

    $this->logLine(
      pht(
        'Acquiring write lock for repository "%s"...',
        $repository->getDisplayName()));

    // See T13590. On the HTTP pathway, it's possible for us to hit the script
    // time limit while holding the durable write lock if a user makes a big
    // push. Remove the time limit before we acquire the durable lock.
    set_time_limit(0);

    $lock_wait = phutil_units('2 minutes in seconds');
    try {
      $write_wait_start = microtime(true);

      $start = PhabricatorTime::getNow();
      $step_wait = 1;

      while (true) {
        try {
          $write_lock->lock((int)floor($step_wait));
          $write_wait_end = microtime(true);
          break;
        } catch (PhutilLockException $ex) {
          $waited = (PhabricatorTime::getNow() - $start);
          if ($waited > $lock_wait) {
            throw $ex;
          }
          $this->logActiveWriter($viewer, $repository);
        }

        // Wait a little longer before the next message we print.
        $step_wait = $step_wait + 0.5;
        $step_wait = min($step_wait, 3);
      }

      $waited = (PhabricatorTime::getNow() - $start);
      if ($waited) {
        $this->logLine(
          pht(
            'Acquired write lock after %s second(s).',
            new PhutilNumber($waited)));
      } else {
        $this->logLine(
          pht(
            'Acquired write lock immediately.'));
      }
    } catch (PhutilLockException $ex) {
      throw new Exception(
        pht(
          'Failed to acquire write lock after waiting %s second(s). You '.
          'may be able to retry later. (%s)',
          new PhutilNumber($lock_wait),
          $ex->getHint()),
        0,
        $ex);
    }

    // From here on we hold the global write lock. Any failure before the
    // durable write marker is recorded must release it, or other writers
    // keep waiting on a lock that no write is using.
    try {
      $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
        $repository_phid);
      foreach ($versions as $version) {
        if (!$version->getIsWriting()) {
          continue;
        }

        throw new Exception(
          pht(
            'An previous write to this repository was interrupted; refusing '.
            'new writes. This issue requires operator intervention to resolve, '.
            'see "Write Interruptions" in the "Cluster: Repositories" in the '.
            'documentation for instructions.'));
      }

      $read_wait_start = microtime(true);
      $max_version = $this->synchronizeWorkingCopyBeforeRead();
      $read_wait_end = microtime(true);

      $pid = getmypid();
      $hash = Filesystem::readRandomCharacters(12);
      $write_owner = "{$pid}.{$hash}";

      PhabricatorRepositoryWorkingCopyVersion::willWrite(
        $locked_connection,
        $repository_phid,
        $device_phid,
        array(
          'userPHID' => $this->getEffectiveActingAsPHID(),
          'epoch' => PhabricatorTime::getNow(),
          'devicePHID' => $device_phid,
        ),
        $write_owner);
    } catch (Exception $ex) {
      $write_lock->unlock();
      throw $ex;
    }

    // Only record write state on the engine once the durable marker exists,
    // so a failed attempt does not leave a stale owner behind.
    $this->clusterWriteOwner = $write_owner;
    $this->clusterWriteVersion = $max_version;
    $this->clusterWriteLock = $write_lock;

    $write_wait = ($write_wait_end - $write_wait_start);
    $read_wait = ($read_wait_end - $read_wait_start);

    $log = $this->logger;
    if ($log) {
      $log->writeClusterEngineLogProperty('writeWait', $write_wait);
      $log->writeClusterEngineLogProperty('readWait', $read_wait);
    }
  }


  /**
   * Record a newly discovered version for an observed repository.
   *
   * Does nothing for hosted repositories. Otherwise, updates the version
   * recorded for this device if it has no version yet or if the new version
   * is greater than the recorded one.
   *
   * @param int New version. (Presumably an integer: it is compared
   *   numerically against the stored version. Confirm against the caller.)
   * @return void
   * @task sync
   */
  public function synchronizeWorkingCopyAfterDiscovery($new_version) {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();
    if ($repository->isHosted()) {
      return;
    }

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    // NOTE: We are not holding a lock here because this method is only called
    // from PhabricatorRepositoryDiscoveryEngine, which already holds a device
    // lock. Even if we do race here and record an older version, the
    // consequences are mild: we only do extra work to correct it later.

    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    $this_version = idx($versions, $device_phid);
    if ($this_version) {
      $this_version = (int)$this_version->getRepositoryVersion();
    } else {
      $this_version = null;
    }

    if (($this_version === null) || ($new_version > $this_version)) {
      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $device_phid,
        $new_version);
    }
  }


  /**
   * Finish a write: bump the repository version for this device, clear the
   * durable write marker, and release the global write lock.
   *
   * If the database connection is lost, this retries once per second for up
   * to five minutes before giving up.
   *
   * @return void
   * @task sync
   */
  public function synchronizeWorkingCopyAfterWrite() {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    if (!$this->clusterWriteLock) {
      throw new Exception(
        pht(
          'Trying to synchronize after write, but not holding a write '.
          'lock!'));
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    // It is possible that we've lost the global lock while receiving the push.
    // For example, the master database may have been restarted between the
    // time we acquired the global lock and now, when the push has finished.

    // We wrote a durable lock while we were holding the global lock,
    // essentially upgrading our lock. We can still safely release this upgraded
    // lock even if we're no longer holding the global lock.

    // If we fail to release the lock, the repository will be frozen until
    // an operator can figure out what happened, so we try pretty hard to
    // reconnect to the database and release the lock.

    $now = PhabricatorTime::getNow();
    $duration = phutil_units('5 minutes in seconds');
    $try_until = $now + $duration;

    $did_release = false;
    $already_failed = false;

    // Most recent connection failure, kept so that it can be attached to the
    // final exception if we never manage to recover.
    $connection_exception = null;

    while (PhabricatorTime::getNow() <= $try_until) {
      try {
        // NOTE: This means we're still bumping the version when pushes fail. We
        // could select only un-rejected events instead to bump a little less
        // often.

        $new_log = id(new PhabricatorRepositoryPushEventQuery())
          ->setViewer(PhabricatorUser::getOmnipotentUser())
          ->withRepositoryPHIDs(array($repository_phid))
          ->setLimit(1)
          ->executeOne();

        $old_version = $this->clusterWriteVersion;
        if ($new_log) {
          $new_version = $new_log->getID();
        } else {
          $new_version = $old_version;
        }

        PhabricatorRepositoryWorkingCopyVersion::didWrite(
          $repository_phid,
          $device_phid,
          $this->clusterWriteVersion,
          $new_version,
          $this->clusterWriteOwner);
        $did_release = true;
        break;
      } catch (AphrontConnectionQueryException $ex) {
        $connection_exception = $ex;
      } catch (AphrontConnectionLostQueryException $ex) {
        $connection_exception = $ex;
      }

      if (!$already_failed) {
        $already_failed = true;
        $this->logLine(
          pht('CRITICAL. Failed to release cluster write lock!'));

        $this->logLine(
          pht(
            'The connection to the master database was lost while receiving '.
            'the write.'));

        $this->logLine(
          pht(
            'This process will spend %s more second(s) attempting to '.
            'recover, then give up.',
            new PhutilNumber($duration)));
      }

      sleep(1);
    }

    if ($did_release) {
      if ($already_failed) {
        $this->logLine(
          pht('RECOVERED. Link to master database was restored.'));
      }
      $this->logLine(pht('Released cluster write lock.'));
    } else {
      throw new Exception(
        pht(
          'Failed to reconnect to master database and release held write '.
          'lock ("%s") on device "%s" for repository "%s" after trying '.
          'for %s seconds. This repository will be frozen.',
          $this->clusterWriteOwner,
          $device->getName(),
          $repository->getDisplayName(),
          new PhutilNumber($duration)),
        0,
        $connection_exception);
    }

    // We can continue even if we've lost this lock, everything is still
    // consistent.
    try {
      $this->clusterWriteLock->unlock();
    } catch (Exception $ex) {
      // Ignore.
    }

    $this->clusterWriteLock = null;
    $this->clusterWriteOwner = null;
    $this->clusterWriteVersion = null;
  }


/* -( Internals )---------------------------------------------------------- */


  /**
   * Decide whether cluster synchronization applies to this repository.
   *
   * Synchronization is disabled if the repository has no Almanac service
   * PHID or does not support synchronization. When a device is required, it
   * is also disabled if there is no live device.
   *
   * @param bool True to also require that this host is a live device.
   * @return bool True if synchronization should be performed.
   * @task internal
   */
  private function shouldEnableSynchronization($require_device) {
    $repository = $this->getRepository();

    $service_phid = $repository->getAlmanacServicePHID();
    if (!$service_phid) {
      return false;
    }

    if (!$repository->supportsSynchronization()) {
      return false;
    }

    if ($require_device) {
      $device = AlmanacKeys::getLiveDevice();
      if (!$device) {
        return false;
      }
    }

    return true;
  }


  /**
   * Synchronize the local working copy by fetching from the repository's
   * remote URI. Only Git repositories are supported.
   *
   * @return void
   * @task internal
   */
  private function synchronizeWorkingCopyFromRemote() {
    $repository = $this->getRepository();
    $device = AlmanacKeys::getLiveDevice();

    $local_path = $repository->getLocalPath();
    $fetch_uri = $repository->getRemoteURIEnvelope();

    if ($repository->isGit()) {
      $this->requireWorkingCopy();

      $argv = array(
        'fetch --prune -- %P %s',
        $fetch_uri,
        '+refs/*:refs/*',
      );
    } else {
      throw new Exception(pht('Remote sync only supported for git!'));
    }

    $future = DiffusionCommandEngine::newCommandEngine($repository)
      ->setArgv($argv)
      ->setSudoAsDaemon(true)
      ->setCredentialPHID($repository->getCredentialPHID())
      ->setURI($repository->getRemoteURIObject())
      ->newFuture();

    $future->setCWD($local_path);

    try {
      $future->resolvex();
    } catch (Exception $ex) {
      $this->logLine(
        pht(
          'Synchronization of "%s" from remote failed: %s',
          $device->getName(),
          $ex->getMessage()));
      throw $ex;
    }
  }


  /**
   * Synchronize the local working copy from one of several up-to-date
   * devices.
   *
   * Candidate bindings are tried in random order until one succeeds. If all
   * of them fail, the last failure is rethrown.
   *
   * @param list<phid> PHIDs of devices which have the newest version.
   * @param int|null Version of the working copy on this device, if any.
   * @param int Version of the working copy on the leaders.
   * @return void
   * @task internal
   */
  private function synchronizeWorkingCopyFromDevices(
    array $device_phids,
    $local_version,
    $remote_version) {

    $repository = $this->getRepository();

    $service = $repository->loadAlmanacService();
    if (!$service) {
      throw new Exception(pht('Failed to load repository cluster service.'));
    }

    $device_map = array_fuse($device_phids);
    $bindings = $service->getActiveBindings();

    $fetchable = array();
    foreach ($bindings as $binding) {
      // We can't fetch from nodes which don't have the newest version.
      $device_phid = $binding->getDevicePHID();
      if (empty($device_map[$device_phid])) {
        continue;
      }

      // TODO: For now, only fetch over SSH. We could support fetching over
      // HTTP eventually.
      if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
        continue;
      }

      $fetchable[] = $binding;
    }

    if (!$fetchable) {
      throw new Exception(
        pht(
          'Leader lost: no up-to-date nodes in repository cluster are '.
          'fetchable.'));
    }

    // If we can synchronize from multiple sources, choose one at random.
    shuffle($fetchable);

    $caught = null;
    foreach ($fetchable as $binding) {
      try {
        $this->synchronizeWorkingCopyFromBinding(
          $binding,
          $local_version,
          $remote_version);
        $caught = null;
        break;
      } catch (Exception $ex) {
        $caught = $ex;
      }
    }

    if ($caught) {
      throw $caught;
    }
  }


  /**
   * Synchronize the local working copy by fetching from a specific binding,
   * and record the outcome as a sync event. Only Git repositories are
   * supported.
   *
   * @param AlmanacBinding Binding for the leader to fetch from.
   * @param int|null Version of the working copy on this device, if any.
   * @param int Version of the working copy on the leader.
   * @return void
   * @task internal
   */
  private function synchronizeWorkingCopyFromBinding(
    AlmanacBinding $binding,
    $local_version,
    $remote_version) {

    $repository = $this->getRepository();
    $device = AlmanacKeys::getLiveDevice();

    $this->logLine(
      pht(
        'Synchronizing this device ("%s") from cluster leader ("%s").',
        $device->getName(),
        $binding->getDevice()->getName()));

    $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
    $local_path = $repository->getLocalPath();

    if ($repository->isGit()) {
      $this->requireWorkingCopy();

      $argv = array(
        'fetch --prune -- %s %s',
        $fetch_uri,
        '+refs/*:refs/*',
      );
    } else {
      throw new Exception(pht('Binding sync only supported for git!'));
    }

    $future = DiffusionCommandEngine::newCommandEngine($repository)
      ->setArgv($argv)
      ->setConnectAsDevice(true)
      ->setSudoAsDaemon(true)
      ->setURI($fetch_uri)
      ->newFuture();

    $future->setCWD($local_path);

    $log = PhabricatorRepositorySyncEvent::initializeNewEvent()
      ->setRepositoryPHID($repository->getPHID())
      ->setEpoch(PhabricatorTime::getNow())
      ->setDevicePHID($device->getPHID())
      ->setFromDevicePHID($binding->getDevice()->getPHID())
      ->setDeviceVersion($local_version)
      ->setFromDeviceVersion($remote_version);

    $sync_start = microtime(true);

    try {
      $future->resolvex();
    } catch (Exception $ex) {
      $log->setSyncWait(phutil_microseconds_since($sync_start));

      if ($ex instanceof CommandException) {
        if ($future->getWasKilledByTimeout()) {
          $result_type = PhabricatorRepositorySyncEvent::RESULT_TIMEOUT;
        } else {
          $result_type = PhabricatorRepositorySyncEvent::RESULT_ERROR;
        }

        $log
          ->setResultCode($ex->getError())
          ->setResultType($result_type)
          ->setProperty('stdout', $ex->getStdout())
          ->setProperty('stderr', $ex->getStderr());
      } else {
        $log
          ->setResultCode(1)
          ->setResultType(PhabricatorRepositorySyncEvent::RESULT_EXCEPTION)
          ->setProperty('message', $ex->getMessage());
      }

      $log->save();

      $this->logLine(
        pht(
          'Synchronization of "%s" from leader "%s" failed: %s',
          $device->getName(),
          $binding->getDevice()->getName(),
          $ex->getMessage()));

      throw $ex;
    }

    $log
      ->setSyncWait(phutil_microseconds_since($sync_start))
      ->setResultCode(0)
      ->setResultType(PhabricatorRepositorySyncEvent::RESULT_SYNC)
      ->save();
  }


  /**
   * Write a single line to the log, formatted as a "#" comment line.
   *
   * @param string Message to log.
   * @return this
   * @task internal
   */
  private function logLine($message) {
    return $this->logText("# {$message}\n");
  }


  /**
   * Write raw text to the log, if a log is configured.
   *
   * @param string Text to log.
   * @return this
   * @task internal
   */
  private function logText($message) {
    $log = $this->logger;
    if ($log) {
      $log->writeClusterEngineLogMessage($message);
    }
    return $this;
  }


  /**
   * Raise an exception if the repository has no working copy at its local
   * path on this device.
   *
   * @return void
   * @task internal
   */
  private function requireWorkingCopy() {
    $repository = $this->getRepository();
    $local_path = $repository->getLocalPath();

    if (!Filesystem::pathExists($local_path)) {
      $device = AlmanacKeys::getLiveDevice();

      throw new Exception(
        pht(
          'Repository "%s" does not have a working copy on this device '.
          'yet, so it can not be synchronized. Wait for the daemons to '.
          'construct one or run `bin/repository update %s` on this host '.
          '("%s") to build it explicitly.',
          $repository->getDisplayName(),
          $repository->getMonogram(),
          $device->getName()));
    }
  }


  /**
   * Log a message describing who currently holds the write lock, for
   * display while we wait to acquire it.
   *
   * @param PhabricatorUser Viewer used to load handles.
   * @param PhabricatorRepository Repository being written to.
   * @return void
   * @task internal
   */
  private function logActiveWriter(
    PhabricatorUser $viewer,
    PhabricatorRepository $repository) {

    $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter(
      $repository->getPHID());
    if (!$writer) {
      $this->logLine(pht('Waiting on another user to finish writing...'));
      return;
    }

    $user_phid = $writer->getWriteProperty('userPHID');
    $device_phid = $writer->getWriteProperty('devicePHID');
    $epoch = $writer->getWriteProperty('epoch');

    $phids = array($user_phid, $device_phid);
    $handles = $viewer->loadHandles($phids);

    $duration = (PhabricatorTime::getNow() - $epoch) + 1;

    $this->logLine(
      pht(
        'Waiting for %s to finish writing (on device "%s" for %ss)...',
        $handles[$user_phid]->getName(),
        $handles[$device_phid]->getName(),
        new PhutilNumber($duration)));
  }


  /**
   * Build a new, unsaved push event for a maintenance operation, attributed
   * to the effective acting PHID and marked as accepted.
   *
   * @return PhabricatorRepositoryPushEvent New push event.
   */
  public function newMaintenanceEvent() {
    $viewer = $this->getViewer();
    $repository = $this->getRepository();
    $now = PhabricatorTime::getNow();

    $event = PhabricatorRepositoryPushEvent::initializeNewEvent($viewer)
      ->setRepositoryPHID($repository->getPHID())
      ->setEpoch($now)
      ->setPusherPHID($this->getEffectiveActingAsPHID())
      ->setRejectCode(PhabricatorRepositoryPushLog::REJECT_ACCEPT);

    return $event;
  }


  /**
   * Build a new, unsaved push log for a maintenance operation. The device
   * PHID is left `null` when this host is not a live device.
   *
   * @return PhabricatorRepositoryPushLog New push log.
   */
  public function newMaintenanceLog() {
    $viewer = $this->getViewer();
    $repository = $this->getRepository();
    $now = PhabricatorTime::getNow();

    $device = AlmanacKeys::getLiveDevice();
    if ($device) {
      $device_phid = $device->getPHID();
    } else {
      $device_phid = null;
    }

    return PhabricatorRepositoryPushLog::initializeNewLog($viewer)
      ->setDevicePHID($device_phid)
      ->setRepositoryPHID($repository->getPHID())
      ->attachRepository($repository)
      ->setEpoch($now)
      ->setPusherPHID($this->getEffectiveActingAsPHID())
      ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_MAINTENANCE)
      ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_MAINTENANCE)
      ->setRefNew('');
  }

}