@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
3/**
4 * Manages repository synchronization for cluster repositories.
5 *
6 * @task config Configuring Synchronization
7 * @task sync Cluster Synchronization
8 * @task internal Internals
9 */
10final class DiffusionRepositoryClusterEngine extends Phobject {
11
// Repository being synchronized; assigned by setRepository().
12 private $repository;
// Viewer the engine acts for; assigned by setViewer().
13 private $viewer;
// Optional PHID to attribute actions to. getEffectiveActingAsPHID() falls
// back to the viewer's PHID when this is not set.
14 private $actingAsPHID;
// Optional log sink; assigned by setLog().
15 private $logger;
16
// The three properties below are populated by
// synchronizeWorkingCopyBeforeWrite() and read by
// synchronizeWorkingCopyAfterWrite().
// Lock object obtained from getWriteLock() for the current write.
17 private $clusterWriteLock;
// Version returned by the pre-write read synchronization.
18 private $clusterWriteVersion;
// "<pid>.<random characters>" token identifying this writer.
19 private $clusterWriteOwner;
20
21
22/* -( Configuring Synchronization )---------------------------------------- */
23
24
/**
 * Select the repository this engine synchronizes.
 *
 * @param PhabricatorRepository $repository Repository to operate on.
 * @return $this
 * @task config
 */
public function setRepository(PhabricatorRepository $repository) {
  $this->repository = $repository;

  return $this;
}
29
/**
 * Get the repository this engine synchronizes.
 *
 * @return PhabricatorRepository|null The value last passed to
 *   setRepository(), or null if it was never called.
 * @task config
 */
public function getRepository() {
  return $this->repository;
}
33
/**
 * Select the viewer this engine acts on behalf of.
 *
 * @param PhabricatorUser $viewer Acting viewer.
 * @return $this
 * @task config
 */
public function setViewer(PhabricatorUser $viewer) {
  $this->viewer = $viewer;

  return $this;
}
38
/**
 * Get the viewer this engine acts on behalf of.
 *
 * @return PhabricatorUser|null The value last passed to setViewer(), or
 *   null if it was never called.
 * @task config
 */
public function getViewer() {
  return $this->viewer;
}
42
/**
 * Attach a log sink which receives engine messages and timing properties.
 *
 * Without a log, messages passed to logLine() and logText() are dropped.
 *
 * @param DiffusionRepositoryClusterEngineLogInterface $log Log sink.
 * @return $this
 * @task config
 */
public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
  $this->logger = $log;

  return $this;
}
47
/**
 * Override the PHID that writes and maintenance records are attributed to.
 *
 * @param string|null $acting_as_phid PHID to act as. A falsey value makes
 *   the engine fall back to the viewer's PHID.
 * @return $this
 * @task config
 */
public function setActingAsPHID($acting_as_phid) {
  $this->actingAsPHID = $acting_as_phid;

  return $this;
}
52
/**
 * Get the explicitly configured acting-as PHID.
 *
 * This is the raw configured value; it does not fall back to the viewer.
 *
 * @return string|null The value last passed to setActingAsPHID().
 * @task config
 */
public function getActingAsPHID() {
  return $this->actingAsPHID;
}
56
/**
 * Resolve the PHID actions should be attributed to.
 *
 * @return string The configured acting-as PHID when one is set, otherwise
 *   the PHID of the viewer.
 * @task config
 */
private function getEffectiveActingAsPHID() {
  $acting_as_phid = $this->actingAsPHID;

  // Only consult the viewer when no explicit override was configured.
  if (!$acting_as_phid) {
    $acting_as_phid = $this->getViewer()->getPHID();
  }

  return $acting_as_phid;
}
64
65
66/* -( Cluster Synchronization )-------------------------------------------- */
67
68
/**
 * Synchronize repository version information after creating a repository.
 *
 * This initializes working copy versions for all currently bound devices to
 * 0, so that we don't get stuck making an ambiguous choice about which
 * devices are leaders when we later synchronize before a read.
 *
 * When synchronization is not enabled for the repository this is a no-op.
 *
 * @return $this
 * @throws Exception If the repository cluster service can not be loaded.
 * @task sync
 */
public function synchronizeWorkingCopyAfterCreation() {
  if (!$this->shouldEnableSynchronization(false)) {
    // Return the engine on this path too, so the method is chainable no
    // matter which branch runs (it previously returned null here).
    return $this;
  }

  $repository = $this->getRepository();
  $repository_phid = $repository->getPHID();

  $service = $repository->loadAlmanacService();
  if (!$service) {
    throw new Exception(pht('Failed to load repository cluster service.'));
  }

  // Every actively bound device starts out at version 0.
  foreach ($service->getActiveBindings() as $binding) {
    PhabricatorRepositoryWorkingCopyVersion::updateVersion(
      $repository_phid,
      $binding->getDevicePHID(),
      0);
  }

  return $this;
}
101
102
/**
 * Reset cluster version numbers after a repository switches between hosted
 * and observed.
 *
 * Devices currently at the highest recorded version are reset to version 0;
 * all other devices are demoted. When synchronization is not enabled for the
 * repository, or no version records exist, this is a no-op.
 *
 * @return $this
 * @task sync
 */
public function synchronizeWorkingCopyAfterHostingChange() {
  if (!$this->shouldEnableSynchronization(false)) {
    // Return the engine on this path too, so the method is chainable no
    // matter which branch runs (it previously returned null here).
    return $this;
  }

  $repository = $this->getRepository();
  $repository_phid = $repository->getPHID();

  $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
    $repository_phid);
  $versions = mpull($versions, null, 'getDevicePHID');

  // After converting a hosted repository to observed, or vice versa, we
  // need to reset version numbers because the clocks for observed and hosted
  // repositories run on different units.

  // We identify all the cluster leaders and reset their version to 0.
  // We identify all the cluster followers and demote them.

  // This allows the cluster to start over again at version 0 but keep the
  // same leaders.

  if (!$versions) {
    return $this;
  }

  $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
  foreach ($versions as $version) {
    $device_phid = $version->getDevicePHID();

    $is_leader = ($version->getRepositoryVersion() == $max_version);
    if ($is_leader) {
      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $device_phid,
        0);
    } else {
      PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
        $repository_phid,
        $device_phid);
    }
  }

  return $this;
}
148
149
/**
 * Bring this device's working copy up to date before it serves a read.
 *
 * Acquires the per-device read lock (waiting up to two minutes), compares
 * this device's recorded version against the newest version recorded for any
 * device, fetches from a leader (hosted repositories) or from the remote
 * (observed repositories) when this device is behind, and records the new
 * version.
 *
 * The read lock is released on every exit path, including failures.
 *
 * @return int|null Version this device holds after synchronizing, or null
 *   when synchronization is not enabled.
 * @throws Exception If the read lock can not be acquired, if leaders are
 *   ambiguous, if this device is not bound to the cluster service, or if
 *   the underlying fetch fails.
 * @task sync
 */
public function synchronizeWorkingCopyBeforeRead() {
  if (!$this->shouldEnableSynchronization(true)) {
    return;
  }

  $repository = $this->getRepository();
  $repository_phid = $repository->getPHID();

  $device = AlmanacKeys::getLiveDevice();
  $device_phid = $device->getPHID();

  $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
    $repository_phid,
    $device_phid);

  $lock_wait = phutil_units('2 minutes in seconds');

  $this->logLine(
    pht(
      'Acquiring read lock for repository "%s" on device "%s"...',
      $repository->getDisplayName(),
      $device->getName()));

  try {
    $start = PhabricatorTime::getNow();
    $read_lock->lock($lock_wait);
    $waited = (PhabricatorTime::getNow() - $start);

    if ($waited) {
      $this->logLine(
        pht(
          'Acquired read lock after %s second(s).',
          new PhutilNumber($waited)));
    } else {
      $this->logLine(
        pht(
          'Acquired read lock immediately.'));
    }
  } catch (PhutilLockException $ex) {
    throw new Exception(
      pht(
        'Failed to acquire read lock after waiting %s second(s). You '.
        'may be able to retry later. (%s)',
        new PhutilNumber($lock_wait),
        $ex->getHint()),
      0,
      $ex);
  }

  try {
    $result_version = $this->synchronizeWorkingCopyWhileReadLocked(
      $repository,
      $device);
  } catch (Exception $ex) {
    // Don't leave the read lock held when synchronization fails. Failing
    // to unlock must not hide the original failure, so that error is
    // deliberately dropped.
    try {
      $read_lock->unlock();
    } catch (Exception $unlock_ex) {
      // Ignore.
    }

    throw $ex;
  }

  $read_lock->unlock();

  return $result_version;
}


/**
 * Perform the version comparison and fetch for a read synchronization.
 *
 * The caller is expected to hold the read lock and to release it afterward.
 *
 * @param PhabricatorRepository $repository Repository being synchronized.
 * @param object $device Live device, as returned by
 *   AlmanacKeys::getLiveDevice().
 * @return int Version this device holds after synchronizing.
 * @throws Exception See synchronizeWorkingCopyBeforeRead().
 * @task internal
 */
private function synchronizeWorkingCopyWhileReadLocked(
  PhabricatorRepository $repository,
  $device) {

  $repository_phid = $repository->getPHID();
  $device_phid = $device->getPHID();

  $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
    $repository_phid);
  $versions = mpull($versions, null, 'getDevicePHID');

  $this_version = idx($versions, $device_phid);
  if ($this_version) {
    $this_version = (int)$this_version->getRepositoryVersion();
  } else {
    $this_version = null;
  }

  if ($versions) {
    // This is the normal case, where we have some version information and
    // can identify which nodes are leaders. If the current node is not a
    // leader, we want to fetch from a leader and then update our version.

    $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
    if (($this_version === null) || ($max_version > $this_version)) {
      if ($repository->isHosted()) {
        $fetchable = array();
        foreach ($versions as $version) {
          if ($version->getRepositoryVersion() == $max_version) {
            $fetchable[] = $version->getDevicePHID();
          }
        }

        $this->synchronizeWorkingCopyFromDevices(
          $fetchable,
          $this_version,
          $max_version);
      } else {
        $this->synchronizeWorkingCopyFromRemote();
      }

      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $device_phid,
        $max_version);
    } else {
      $this->logLine(
        pht(
          'Device "%s" is already a cluster leader and does not need '.
          'to be synchronized.',
          $device->getName()));
    }

    return $max_version;
  }

  // If no version records exist yet, we need to be careful, because we
  // can not tell which nodes are leaders.

  // There might be several nodes with arbitrary existing data, and we have
  // no way to tell which one has the "right" data. If we pick wrong, we
  // might erase some or all of the data in the repository.

  // Since this is dangerous, we refuse to guess unless there is only one
  // device. If we're the only device in the group, we obviously must be
  // a leader.

  $service = $repository->loadAlmanacService();
  if (!$service) {
    throw new Exception(pht('Failed to load repository cluster service.'));
  }

  $bindings = $service->getActiveBindings();
  $device_map = array();
  foreach ($bindings as $binding) {
    $device_map[$binding->getDevicePHID()] = true;
  }

  if (count($device_map) > 1) {
    throw new Exception(
      pht(
        'Repository "%s" exists on more than one device, but no device '.
        'has any repository version information. There is no way for the '.
        'software to determine which copy of the existing data is '.
        'authoritative. Promote a device or see "Ambiguous Leaders" in '.
        'the documentation.',
        $repository->getDisplayName()));
  }

  if (empty($device_map[$device->getPHID()])) {
    throw new Exception(
      pht(
        'Repository "%s" is being synchronized on device "%s", but '.
        'this device is not bound to the corresponding cluster '.
        'service ("%s").',
        $repository->getDisplayName(),
        $device->getName(),
        $service->getName()));
  }

  // The current device is the only device in service, so it must be a
  // leader. We can safely have any future nodes which come online read
  // from it.
  PhabricatorRepositoryWorkingCopyVersion::updateVersion(
    $repository_phid,
    $device_phid,
    0);

  return 0;
}
310
311
/**
 * Prepare this device to receive a write.
 *
 * Acquires the global write lock for the repository (retrying for up to two
 * minutes and logging the active writer while waiting), refuses to continue
 * if any version record is still marked as writing, synchronizes the working
 * copy with synchronizeWorkingCopyBeforeRead(), and then records a write
 * marker with willWrite(). The lock, the pre-write version and the owner
 * token are kept on the engine for synchronizeWorkingCopyAfterWrite().
 *
 * If anything fails after the global write lock has been acquired, the lock
 * is released before the exception propagates.
 *
 * When a log is attached, the "writeWait" and "readWait" durations (in
 * seconds, from microtime()) are written to it.
 *
 * @return void
 * @throws Exception If the write lock can not be acquired, a previous write
 *   was interrupted, or read synchronization fails.
 * @task sync
 */
public function synchronizeWorkingCopyBeforeWrite() {
  if (!$this->shouldEnableSynchronization(true)) {
    return;
  }

  $repository = $this->getRepository();
  $viewer = $this->getViewer();

  $repository_phid = $repository->getPHID();

  $device = AlmanacKeys::getLiveDevice();
  $device_phid = $device->getPHID();

  $table = new PhabricatorRepositoryWorkingCopyVersion();
  $locked_connection = $table->establishConnection('w');

  $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
    $repository_phid);

  $write_lock->setExternalConnection($locked_connection);

  $this->logLine(
    pht(
      'Acquiring write lock for repository "%s"...',
      $repository->getDisplayName()));

  // See T13590. On the HTTP pathway, it's possible for us to hit the script
  // time limit while holding the durable write lock if a user makes a big
  // push. Remove the time limit before we acquire the durable lock.
  set_time_limit(0);

  $lock_wait = phutil_units('2 minutes in seconds');
  try {
    $write_wait_start = microtime(true);

    $start = PhabricatorTime::getNow();
    $step_wait = 1;

    while (true) {
      try {
        $write_lock->lock((int)floor($step_wait));
        $write_wait_end = microtime(true);
        break;
      } catch (PhutilLockException $ex) {
        $waited = (PhabricatorTime::getNow() - $start);
        if ($waited > $lock_wait) {
          throw $ex;
        }
        $this->logActiveWriter($viewer, $repository);
      }

      // Wait a little longer before the next message we print.
      $step_wait = $step_wait + 0.5;
      $step_wait = min($step_wait, 3);
    }

    $waited = (PhabricatorTime::getNow() - $start);
    if ($waited) {
      $this->logLine(
        pht(
          'Acquired write lock after %s second(s).',
          new PhutilNumber($waited)));
    } else {
      $this->logLine(
        pht(
          'Acquired write lock immediately.'));
    }
  } catch (PhutilLockException $ex) {
    throw new Exception(
      pht(
        'Failed to acquire write lock after waiting %s second(s). You '.
        'may be able to retry later. (%s)',
        new PhutilNumber($lock_wait),
        $ex->getHint()),
      0,
      $ex);
  }

  // From here on we hold the global write lock. Any failure before the
  // write marker is recorded must release it again.
  try {
    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    foreach ($versions as $version) {
      if ($version->getIsWriting()) {
        throw new Exception(
          pht(
            'An previous write to this repository was interrupted; refusing '.
            'new writes. This issue requires operator intervention to resolve, '.
            'see "Write Interruptions" in the "Cluster: Repositories" in the '.
            'documentation for instructions.'));
      }
    }

    $read_wait_start = microtime(true);
    $max_version = $this->synchronizeWorkingCopyBeforeRead();
    $read_wait_end = microtime(true);

    $pid = getmypid();
    $hash = Filesystem::readRandomCharacters(12);
    $this->clusterWriteOwner = "{$pid}.{$hash}";

    PhabricatorRepositoryWorkingCopyVersion::willWrite(
      $locked_connection,
      $repository_phid,
      $device_phid,
      array(
        'userPHID' => $this->getEffectiveActingAsPHID(),
        'epoch' => PhabricatorTime::getNow(),
        'devicePHID' => $device_phid,
      ),
      $this->clusterWriteOwner);
  } catch (Exception $ex) {
    // Failing to unlock must not hide the original failure, so that error
    // is deliberately dropped.
    try {
      $write_lock->unlock();
    } catch (Exception $unlock_ex) {
      // Ignore.
    }

    throw $ex;
  }

  $this->clusterWriteVersion = $max_version;
  $this->clusterWriteLock = $write_lock;

  $write_wait = ($write_wait_end - $write_wait_start);
  $read_wait = ($read_wait_end - $read_wait_start);

  $log = $this->logger;
  if ($log) {
    $log->writeClusterEngineLogProperty('writeWait', $write_wait);
    $log->writeClusterEngineLogProperty('readWait', $read_wait);
  }
}
444
445
/**
 * Record a newly discovered version for this device.
 *
 * Only applies to repositories which are not hosted. The stored version is
 * updated when this device has no version record yet, or when the new
 * version is greater than the recorded one.
 *
 * @param int $new_version Version produced by discovery.
 * @return void
 * @task sync
 */
public function synchronizeWorkingCopyAfterDiscovery($new_version) {
  if (!$this->shouldEnableSynchronization(true)) {
    return;
  }

  $repository = $this->getRepository();
  $repository_phid = $repository->getPHID();
  if ($repository->isHosted()) {
    return;
  }

  $device_phid = AlmanacKeys::getLiveDevice()->getPHID();

  // NOTE: We are not holding a lock here because this method is only called
  // from PhabricatorRepositoryDiscoveryEngine, which already holds a device
  // lock. Even if we do race here and record an older version, the
  // consequences are mild: we only do extra work to correct it later.

  $by_device = mpull(
    PhabricatorRepositoryWorkingCopyVersion::loadVersions($repository_phid),
    null,
    'getDevicePHID');

  $record = idx($by_device, $device_phid);
  $known_version = $record
    ? (int)$record->getRepositoryVersion()
    : null;

  $is_newer = ($known_version === null) || ($new_version > $known_version);
  if (!$is_newer) {
    return;
  }

  PhabricatorRepositoryWorkingCopyVersion::updateVersion(
    $repository_phid,
    $device_phid,
    $new_version);
}
483
484
/**
 * Record the outcome of a write and release the cluster write lock.
 *
 * Must be called after synchronizeWorkingCopyBeforeWrite(). The new version
 * is the ID of the push event returned by a limit-1 push event query for the
 * repository, or the pre-write version when there is no push event. If the
 * database connection fails, the release is retried once per second for up
 * to five minutes.
 *
 * @return void
 * @throws Exception If no write lock is held, or if the write marker could
 *   not be released within the retry window. In the latter case the last
 *   connection error is attached as the previous exception.
 * @task sync
 */
public function synchronizeWorkingCopyAfterWrite() {
  if (!$this->shouldEnableSynchronization(true)) {
    return;
  }

  if (!$this->clusterWriteLock) {
    throw new Exception(
      pht(
        'Trying to synchronize after write, but not holding a write '.
        'lock!'));
  }

  $repository = $this->getRepository();
  $repository_phid = $repository->getPHID();

  $device = AlmanacKeys::getLiveDevice();
  $device_phid = $device->getPHID();

  // It is possible that we've lost the global lock while receiving the push.
  // For example, the master database may have been restarted between the
  // time we acquired the global lock and now, when the push has finished.

  // We wrote a durable lock while we were holding the global lock,
  // essentially upgrading our lock. We can still safely release this upgraded
  // lock even if we're no longer holding the global lock.

  // If we fail to release the lock, the repository will be frozen until
  // an operator can figure out what happened, so we try pretty hard to
  // reconnect to the database and release the lock.

  $now = PhabricatorTime::getNow();
  $duration = phutil_units('5 minutes in seconds');
  $try_until = $now + $duration;

  $did_release = false;
  $already_failed = false;

  // Most recent connection failure, kept so the final error can carry the
  // root cause.
  $connection_exception = null;

  while (PhabricatorTime::getNow() <= $try_until) {
    try {
      // NOTE: This means we're still bumping the version when pushes fail. We
      // could select only un-rejected events instead to bump a little less
      // often.

      $new_log = id(new PhabricatorRepositoryPushEventQuery())
        ->setViewer(PhabricatorUser::getOmnipotentUser())
        ->withRepositoryPHIDs(array($repository_phid))
        ->setLimit(1)
        ->executeOne();

      $old_version = $this->clusterWriteVersion;
      if ($new_log) {
        $new_version = $new_log->getID();
      } else {
        $new_version = $old_version;
      }

      PhabricatorRepositoryWorkingCopyVersion::didWrite(
        $repository_phid,
        $device_phid,
        $old_version,
        $new_version,
        $this->clusterWriteOwner);
      $did_release = true;
      break;
    } catch (AphrontConnectionQueryException $ex) {
      $connection_exception = $ex;
    } catch (AphrontConnectionLostQueryException $ex) {
      $connection_exception = $ex;
    }

    // Only announce the failure once, on the first failed attempt.
    if (!$already_failed) {
      $already_failed = true;
      $this->logLine(
        pht('CRITICAL. Failed to release cluster write lock!'));

      $this->logLine(
        pht(
          'The connection to the master database was lost while receiving '.
          'the write.'));

      $this->logLine(
        pht(
          'This process will spend %s more second(s) attempting to '.
          'recover, then give up.',
          new PhutilNumber($duration)));
    }

    sleep(1);
  }

  if (!$did_release) {
    throw new Exception(
      pht(
        'Failed to reconnect to master database and release held write '.
        'lock ("%s") on device "%s" for repository "%s" after trying '.
        'for %s seconds. This repository will be frozen.',
        $this->clusterWriteOwner,
        $device->getName(),
        $repository->getDisplayName(),
        new PhutilNumber($duration)),
      0,
      $connection_exception);
  }

  if ($already_failed) {
    $this->logLine(
      pht('RECOVERED. Link to master database was restored.'));
  }
  $this->logLine(pht('Released cluster write lock.'));

  // We can continue even if we've lost this lock, everything is still
  // consistent.
  try {
    $this->clusterWriteLock->unlock();
  } catch (Exception $ex) {
    // Ignore.
  }

  // Clear all per-write state so nothing stale survives into a later write.
  $this->clusterWriteLock = null;
  $this->clusterWriteOwner = null;
  $this->clusterWriteVersion = null;
}
606
607
608/* -( Internals )---------------------------------------------------------- */
609
610
/**
 * Decide whether cluster synchronization applies to the current repository.
 *
 * @param bool $require_device When true, synchronization is only enabled
 *   if AlmanacKeys::getLiveDevice() returns a device.
 * @return bool True when the repository has an Almanac service PHID,
 *   supports synchronization, and (if required) a live device exists.
 * @task internal
 */
private function shouldEnableSynchronization($require_device) {
  $repository = $this->getRepository();

  // Repositories without a cluster service are never synchronized.
  if (!$repository->getAlmanacServicePHID()) {
    return false;
  }

  if (!$repository->supportsSynchronization()) {
    return false;
  }

  if (!$require_device) {
    return true;
  }

  return (bool)AlmanacKeys::getLiveDevice();
}
635
636
/**
 * Fetch all refs from the repository's remote URI into the local working
 * copy.
 *
 * @return void
 * @throws Exception If the repository is not Git, has no working copy on
 *   this device, or the fetch command fails. Fetch failures are logged
 *   before being rethrown.
 * @task internal
 */
private function synchronizeWorkingCopyFromRemote() {
  $repository = $this->getRepository();
  $device = AlmanacKeys::getLiveDevice();

  $local_path = $repository->getLocalPath();
  $remote_envelope = $repository->getRemoteURIEnvelope();

  if (!$repository->isGit()) {
    throw new Exception(pht('Remote sync only supported for git!'));
  }

  $this->requireWorkingCopy();

  $fetch_argv = array(
    'fetch --prune -- %P %s',
    $remote_envelope,
    '+refs/*:refs/*',
  );

  $fetch_future = DiffusionCommandEngine::newCommandEngine($repository)
    ->setArgv($fetch_argv)
    ->setSudoAsDaemon(true)
    ->setCredentialPHID($repository->getCredentialPHID())
    ->setURI($repository->getRemoteURIObject())
    ->newFuture();

  $fetch_future->setCWD($local_path);

  try {
    $fetch_future->resolvex();
  } catch (Exception $ex) {
    $failure = pht(
      'Synchronization of "%s" from remote failed: %s',
      $device->getName(),
      $ex->getMessage());
    $this->logLine($failure);

    throw $ex;
  }
}
679
680
/**
 * Synchronize this device from one of several up-to-date devices.
 *
 * Candidates are the active bindings of the cluster service whose device is
 * in the given list and whose "protocol" property is "ssh". They are tried
 * in random order until one succeeds.
 *
 * @param list<string> $device_phids PHIDs of devices holding the newest
 *   version.
 * @param int|null $local_version Version this device currently holds.
 * @param int $remote_version Version being fetched.
 * @return void
 * @throws Exception If the service can not be loaded, no candidate is
 *   fetchable, or every candidate fails (the last failure is thrown).
 * @task internal
 */
private function synchronizeWorkingCopyFromDevices(
  array $device_phids,
  $local_version,
  $remote_version) {

  $service = $this->getRepository()->loadAlmanacService();
  if (!$service) {
    throw new Exception(pht('Failed to load repository cluster service.'));
  }

  $leader_phids = array_fuse($device_phids);

  $candidates = array();
  foreach ($service->getActiveBindings() as $candidate) {
    // We can't fetch from nodes which don't have the newest version.
    if (!empty($leader_phids[$candidate->getDevicePHID()])) {
      // TODO: For now, only fetch over SSH. We could support fetching over
      // HTTP eventually.
      if ($candidate->getAlmanacPropertyValue('protocol') == 'ssh') {
        $candidates[] = $candidate;
      }
    }
  }

  if (!$candidates) {
    throw new Exception(
      pht(
        'Leader lost: no up-to-date nodes in repository cluster are '.
        'fetchable.'));
  }

  // If we can synchronize from multiple sources, choose one at random.
  shuffle($candidates);

  $last_failure = null;
  foreach ($candidates as $candidate) {
    try {
      $this->synchronizeWorkingCopyFromBinding(
        $candidate,
        $local_version,
        $remote_version);

      // The first successful fetch is enough.
      return;
    } catch (Exception $ex) {
      $last_failure = $ex;
    }
  }

  if ($last_failure) {
    throw $last_failure;
  }
}
744
745
/**
 * Fetch all refs from the device behind a single binding and record the
 * attempt as a sync event.
 *
 * A PhabricatorRepositorySyncEvent is saved whether the fetch succeeds or
 * fails. It carries the elapsed time, a result code and a result type, plus
 * stdout/stderr or the exception message on failure.
 *
 * @param AlmanacBinding $binding Binding of the leader to fetch from.
 * @param int|null $local_version Version this device currently holds.
 * @param int $remote_version Version held by the leader.
 * @return void
 * @throws Exception If the repository is not Git, has no working copy on
 *   this device, or the fetch fails.
 * @task internal
 */
private function synchronizeWorkingCopyFromBinding(
  AlmanacBinding $binding,
  $local_version,
  $remote_version) {

  $repository = $this->getRepository();
  $device = AlmanacKeys::getLiveDevice();
  $leader = $binding->getDevice();

  $this->logLine(
    pht(
      'Synchronizing this device ("%s") from cluster leader ("%s").',
      $device->getName(),
      $leader->getName()));

  $leader_uri = $repository->getClusterRepositoryURIFromBinding($binding);
  $working_copy = $repository->getLocalPath();

  if (!$repository->isGit()) {
    throw new Exception(pht('Binding sync only supported for git!'));
  }

  $this->requireWorkingCopy();

  $fetch_argv = array(
    'fetch --prune -- %s %s',
    $leader_uri,
    '+refs/*:refs/*',
  );

  $fetch_future = DiffusionCommandEngine::newCommandEngine($repository)
    ->setArgv($fetch_argv)
    ->setConnectAsDevice(true)
    ->setSudoAsDaemon(true)
    ->setURI($leader_uri)
    ->newFuture();

  $fetch_future->setCWD($working_copy);

  $sync_event = PhabricatorRepositorySyncEvent::initializeNewEvent()
    ->setRepositoryPHID($repository->getPHID())
    ->setEpoch(PhabricatorTime::getNow())
    ->setDevicePHID($device->getPHID())
    ->setFromDevicePHID($leader->getPHID())
    ->setDeviceVersion($local_version)
    ->setFromDeviceVersion($remote_version);

  $started_at = microtime(true);

  try {
    $fetch_future->resolvex();
  } catch (Exception $ex) {
    $sync_event->setSyncWait(phutil_microseconds_since($started_at));

    if ($ex instanceof CommandException) {
      // The command itself ran and failed: distinguish a timeout kill from
      // an ordinary error.
      $result_type = $fetch_future->getWasKilledByTimeout()
        ? PhabricatorRepositorySyncEvent::RESULT_TIMEOUT
        : PhabricatorRepositorySyncEvent::RESULT_ERROR;

      $sync_event
        ->setResultCode($ex->getError())
        ->setResultType($result_type)
        ->setProperty('stdout', $ex->getStdout())
        ->setProperty('stderr', $ex->getStderr());
    } else {
      $sync_event
        ->setResultCode(1)
        ->setResultType(PhabricatorRepositorySyncEvent::RESULT_EXCEPTION)
        ->setProperty('message', $ex->getMessage());
    }

    $sync_event->save();

    $this->logLine(
      pht(
        'Synchronization of "%s" from leader "%s" failed: %s',
        $device->getName(),
        $leader->getName(),
        $ex->getMessage()));

    throw $ex;
  }

  $sync_event
    ->setSyncWait(phutil_microseconds_since($started_at))
    ->setResultCode(0)
    ->setResultType(PhabricatorRepositorySyncEvent::RESULT_SYNC)
    ->save();
}
839
840
/**
 * Write one message to the log as a "# "-prefixed, newline-terminated line.
 *
 * @param string $message Message text, without prefix or newline.
 * @return $this
 * @task internal
 */
private function logLine($message) {
  $line = '# '.$message."\n";

  return $this->logText($line);
}
847
848
/**
 * Write raw text to the attached log, if there is one.
 *
 * @param string $message Text to write verbatim.
 * @return $this
 * @task internal
 */
private function logText($message) {
  // Logging is optional: without a log sink the text is dropped.
  if (!$this->logger) {
    return $this;
  }

  $this->logger->writeClusterEngineLogMessage($message);

  return $this;
}
859
/**
 * Require that the repository's local path exists on this device.
 *
 * @return void
 * @throws Exception If the local path does not exist.
 * @task internal
 */
private function requireWorkingCopy() {
  $repository = $this->getRepository();

  if (Filesystem::pathExists($repository->getLocalPath())) {
    return;
  }

  $device = AlmanacKeys::getLiveDevice();

  $message = pht(
    'Repository "%s" does not have a working copy on this device '.
    'yet, so it can not be synchronized. Wait for the daemons to '.
    'construct one or run `bin/repository update %s` on this host '.
    '("%s") to build it explicitly.',
    $repository->getDisplayName(),
    $repository->getMonogram(),
    $device->getName());

  throw new Exception($message);
}
878
/**
 * Log who currently holds the write marker for a repository.
 *
 * When a writer record is found, the message names the user and device and
 * how long the write has been running; otherwise a generic message is
 * logged.
 *
 * @param PhabricatorUser $viewer Viewer used to load handles.
 * @param PhabricatorRepository $repository Repository being waited on.
 * @return void
 * @task internal
 */
private function logActiveWriter(
  PhabricatorUser $viewer,
  PhabricatorRepository $repository) {

  $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter(
    $repository->getPHID());

  if ($writer) {
    $writer_phid = $writer->getWriteProperty('userPHID');
    $writer_device_phid = $writer->getWriteProperty('devicePHID');
    $started = $writer->getWriteProperty('epoch');

    $handles = $viewer->loadHandles(
      array($writer_phid, $writer_device_phid));

    $elapsed = (PhabricatorTime::getNow() - $started) + 1;

    $message = pht(
      'Waiting for %s to finish writing (on device "%s" for %ss)...',
      $handles[$writer_phid]->getName(),
      $handles[$writer_device_phid]->getName(),
      new PhutilNumber($elapsed));
  } else {
    $message = pht('Waiting on another user to finish writing...');
  }

  $this->logLine($message);
}
906
/**
 * Build a push event describing a maintenance operation.
 *
 * The event is populated with the repository, the current time, the
 * effective acting-as PHID and an "accept" reject code. This method does
 * not save it.
 *
 * @return PhabricatorRepositoryPushEvent New event.
 */
public function newMaintenanceEvent() {
  $viewer = $this->getViewer();
  $repository = $this->getRepository();
  $created_at = PhabricatorTime::getNow();

  return PhabricatorRepositoryPushEvent::initializeNewEvent($viewer)
    ->setRepositoryPHID($repository->getPHID())
    ->setEpoch($created_at)
    ->setPusherPHID($this->getEffectiveActingAsPHID())
    ->setRejectCode(PhabricatorRepositoryPushLog::REJECT_ACCEPT);
}
920
/**
 * Build a push log describing a maintenance operation.
 *
 * The log is populated with the live device PHID (null when there is no
 * live device), the repository, the current time, the effective acting-as
 * PHID, maintenance change flags and ref type, and an empty new ref. This
 * method does not save it.
 *
 * @return PhabricatorRepositoryPushLog New log.
 */
public function newMaintenanceLog() {
  $viewer = $this->getViewer();
  $repository = $this->getRepository();
  $created_at = PhabricatorTime::getNow();

  $device = AlmanacKeys::getLiveDevice();
  $device_phid = $device ? $device->getPHID() : null;

  $maintenance_log = PhabricatorRepositoryPushLog::initializeNewLog($viewer)
    ->setDevicePHID($device_phid)
    ->setRepositoryPHID($repository->getPHID())
    ->attachRepository($repository)
    ->setEpoch($created_at)
    ->setPusherPHID($this->getEffectiveActingAsPHID())
    ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_MAINTENANCE)
    ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_MAINTENANCE)
    ->setRefNew('');

  return $maintenance_log;
}
943
944}