@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
at upstream/main 261 lines 7.3 kB view raw
<?php

/**
 * Shared base class for Git-over-SSH workflows.
 *
 * Adds Git-specific behavior on top of @{class:DiffusionSSHWorkflow}:
 * newline handling for error output, cluster engine log plumbing, optional
 * protocol logging and wire protocol filtering of passthru traffic, byte
 * accounting for that traffic, and proxying of a request to cluster services
 * with retry when a service fails before any bytes were exchanged.
 */
abstract class DiffusionGitSSHWorkflow
  extends DiffusionSSHWorkflow
  implements DiffusionRepositoryClusterEngineLogInterface {

  // Key/value properties recorded via writeClusterEngineLogProperty().
  private $engineLogProperties = array();

  // Optional PhabricatorProtocolLog; null when protocol logging is off.
  private $protocolLog;

  // Optional DiffusionGitWireProtocol used to filter passthru bytes.
  private $wireProtocol;

  // Byte counters maintained by the passthru read/write callbacks.
  private $ioBytesRead = 0;
  private $ioBytesWritten = 0;

  // Bookkeeping used by the proxy retry logic.
  private $requestAttempts = 0;
  private $requestFailures = 0;

  /**
   * Write an error message, terminated with a newline.
   *
   * @param string $message Message text, without a trailing newline.
   * @return wild Whatever the parent implementation returns.
   */
  protected function writeError($message) {
    // Git assumes we'll add our own newlines.
    return parent::writeError($message."\n");
  }

  /**
   * Write a cluster engine log message to the error channel.
   *
   * This calls the parent writeError() directly, so (unlike this class's
   * writeError()) no newline is appended: the message is written exactly as
   * given. The error channel is then updated right away.
   *
   * @param string $message Message text, including any trailing newline.
   * @return void
   */
  public function writeClusterEngineLogMessage($message) {
    parent::writeError($message);
    $this->getErrorChannel()->update();
  }

  /**
   * Record a cluster engine log property, replacing any previous value
   * stored under the same key.
   *
   * @param string $key Property name.
   * @param wild $value Property value.
   * @return void
   */
  public function writeClusterEngineLogProperty($key, $value) {
    $this->engineLogProperties[$key] = $value;
  }

  /**
   * Read a previously recorded cluster engine log property.
   *
   * @param string $key Property name.
   * @param wild $default Value to return if the property was never written.
   * @return wild Stored value, or `$default`.
   */
  protected function getClusterEngineLogProperty($key, $default = null) {
    return idx($this->engineLogProperties, $key, $default);
  }

  /**
   * Identify the repository this request targets, using the first value of
   * the "dir" argument as the repository path and requiring a Git
   * repository.
   *
   * @return wild Result of loadRepositoryWithPath() for that path.
   */
  protected function identifyRepository() {
    $args = $this->getArgs();
    $path = head($args->getArg('dir'));
    return $this->loadRepositoryWithPath(
      $path,
      PhabricatorRepositoryType::REPOSITORY_TYPE_GIT);
  }

  /**
   * Block until the Git client closes its side of the connection.
   *
   * @return void
   */
  protected function waitForGitClient() {
    $io_channel = $this->getIOChannel();

    // If we don't wait for the client to close the connection, `git` will
    // consider it an early abort and fail. Sit around until Git is comfortable
    // that it really received all the data.
    while ($io_channel->isOpenForReading()) {
      $io_channel->update();
      $this->getErrorChannel()->flush();
      PhutilChannel::waitForAny(array($io_channel));
    }
  }

  /**
   * Raise an exception explaining that the repository is not a Git
   * repository. This method always throws.
   *
   * @param PhabricatorRepository $repository Repository that was requested.
   * @throws Exception Always.
   */
  protected function raiseWrongVCSException(
    PhabricatorRepository $repository) {
    throw new Exception(
      pht(
        'This repository ("%s") is not a Git repository. Use "%s" to '.
        'interact with this repository.',
        $repository->getDisplayName(),
        $repository->getVersionControlSystem()));
  }

  /**
   * Build a passthru command whose reads and writes are routed through this
   * workflow's message callbacks, so traffic can be logged, filtered, and
   * counted.
   *
   * @return wild The parent's passthru command with both callbacks set.
   */
  protected function newPassthruCommand() {
    return parent::newPassthruCommand()
      ->setWillWriteCallback(array($this, 'willWriteMessageCallback'))
      ->setWillReadCallback(array($this, 'willReadMessageCallback'));
  }

  /**
   * Build a protocol log for this request, if one is wanted.
   *
   * As written, this always returns `null` (no logging). See the comment in
   * the body for how to enable a log during development.
   *
   * @param bool $is_proxy True if this request is being proxied.
   * @return PhabricatorProtocolLog|null Log to write to, or `null`.
   */
  protected function newProtocolLog($is_proxy) {
    if ($is_proxy) {
      return null;
    }

    // While developing, do this to write a full protocol log to disk:
    //
    // return new PhabricatorProtocolLog('/tmp/git-protocol.log');

    return null;
  }

  /**
   * @return PhabricatorProtocolLog|null Active protocol log, if any.
   */
  final protected function getProtocolLog() {
    return $this->protocolLog;
  }

  /**
   * Set the protocol log which passthru traffic is recorded to.
   *
   * @param PhabricatorProtocolLog $log Log to record traffic to.
   * @return this
   */
  final protected function setProtocolLog(PhabricatorProtocolLog $log) {
    $this->protocolLog = $log;

    // Return $this so this setter chains like setWireProtocol() does.
    return $this;
  }

  /**
   * @return DiffusionGitWireProtocol|null Active wire protocol, if any.
   */
  final protected function getWireProtocol() {
    return $this->wireProtocol;
  }

  /**
   * Set the wire protocol which passthru traffic is filtered through.
   *
   * @param DiffusionGitWireProtocol $protocol Protocol filter to apply.
   * @return this
   */
  final protected function setWireProtocol(
    DiffusionGitWireProtocol $protocol) {
    $this->wireProtocol = $protocol;
    return $this;
  }

  /**
   * Passthru callback installed as the "will write" callback.
   *
   * Counts the bytes, records them in the protocol log (if any), then lets
   * the wire protocol (if any) transform them.
   *
   * @param PhabricatorSSHPassthruCommand $command Command doing the write.
   * @param string $message Raw bytes about to be written.
   * @return string Bytes to actually write.
   */
  public function willWriteMessageCallback(
    PhabricatorSSHPassthruCommand $command,
    $message) {

    // Written bytes are counted before the protocol layer sees them.
    $this->ioBytesWritten += strlen($message);

    $log = $this->getProtocolLog();
    if ($log) {
      $log->didWriteBytes($message);
    }

    $protocol = $this->getWireProtocol();
    if ($protocol) {
      $message = $protocol->willWriteBytes($message);
    }

    return $message;
  }

  /**
   * Passthru callback installed as the "will read" callback.
   *
   * Records the bytes in the protocol log (if any), lets the wire protocol
   * (if any) transform them, then counts whatever the protocol emitted.
   *
   * @param PhabricatorSSHPassthruCommand $command Command doing the read.
   * @param string $message Raw bytes which were read.
   * @return string Bytes to pass along.
   */
  public function willReadMessageCallback(
    PhabricatorSSHPassthruCommand $command,
    $message) {

    $log = $this->getProtocolLog();
    if ($log) {
      $log->didReadBytes($message);
    }

    $protocol = $this->getWireProtocol();
    if ($protocol) {
      $message = $protocol->willReadBytes($message);
    }

    // Note that bytes aren't counted until they're emitted by the protocol
    // layer. This means the underlying command might emit bytes, but if they
    // are buffered by the protocol layer they won't count as read bytes yet.

    $this->ioBytesRead += strlen($message);

    return $message;
  }

  /**
   * @return int Bytes counted so far by the read callback.
   */
  final protected function getIOBytesRead() {
    return $this->ioBytesRead;
  }

  /**
   * @return int Bytes counted so far by the write callback.
   */
  final protected function getIOBytesWritten() {
    return $this->ioBytesWritten;
  }

  /**
   * Proxy this request to a cluster service, moving on to the next service
   * if one fails and a retry is still possible.
   *
   * Services are tried in the order returned by getAlmanacServiceRefs(). A
   * failed service is discarded, and shouldRetryRequest() decides whether
   * the next one may be tried.
   *
   * @param bool $for_write Passed through to getAlmanacServiceRefs().
   * @return wild Result of the last passthru command executed. A falsy
   *   value is treated as success.
   */
  final protected function executeRepositoryProxyOperations($for_write) {
    $device = AlmanacKeys::getLiveDevice();

    $refs = $this->getAlmanacServiceRefs($for_write);
    $err = 1;

    while (true) {
      $ref = head($refs);

      $command = $this->getProxyCommandForServiceRef($ref);

      if ($device) {
        $this->writeClusterEngineLogMessage(
          pht(
            "# Request received by \"%s\", forwarding to cluster ".
            "host \"%s\".\n",
            $device->getName(),
            $ref->getDeviceName()));
      }

      $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);

      $future = id(new ExecFuture('%C', $command))
        ->setEnv($this->getEnvironment());

      // Must be recorded before the attempt: shouldRetryRequest() checks
      // that failures never outnumber attempts.
      $this->didBeginRequest();

      $err = $this->newPassthruCommand()
        ->setIOChannel($this->getIOChannel())
        ->setCommandChannelFromExecFuture($future)
        ->execute();

      // TODO: Currently, when proxying, we do not write an event log on the
      // proxy. Perhaps we should write a "proxy log". This is not very useful
      // for statistics or auditing, but could be useful for diagnostics.
      // Marking the proxy logs as proxied (and recording devicePHID on all
      // logs) would make differentiating between these use cases easier.

      if (!$err) {
        $this->waitForGitClient();
        return $err;
      }

      // Throw away this service: the request failed and we're treating the
      // failure as persistent, so we don't want to retry another request to
      // the same host.
      array_shift($refs);

      $should_retry = $this->shouldRetryRequest($refs);
      if (!$should_retry) {
        return $err;
      }

      // If we haven't bailed out yet, we'll retry the request with the next
      // service.
    }

    throw new Exception(pht('Reached an unreachable place.'));
  }

  /**
   * Record that a request attempt is about to be made.
   *
   * @return this
   */
  private function didBeginRequest() {
    $this->requestAttempts++;
    return $this;
  }

  /**
   * Record a failed attempt and decide whether another service may be tried.
   *
   * A retry is refused when no services remain, or when any bytes have
   * already been counted as read or written. In each case a message
   * explaining the decision is written to the cluster engine log.
   *
   * @param list<wild> $remaining_refs Service refs not yet attempted.
   * @return bool True if the request should be retried.
   * @throws Exception If more failures than attempts have been recorded.
   */
  private function shouldRetryRequest(array $remaining_refs) {
    $this->requestFailures++;

    if ($this->requestFailures > $this->requestAttempts) {
      throw new Exception(
        pht(
          "Workflow has recorded more failures than attempts; there is a ".
          "missing call to \"didBeginRequest()\".\n"));
    }

    if (!$remaining_refs) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# All available services failed to serve the request, ".
          "giving up.\n"));
      return false;
    }

    $read_len = $this->getIOBytesRead();
    if ($read_len) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# Client already read from service (%s bytes), unable to retry.\n",
          new PhutilNumber($read_len)));
      return false;
    }

    $write_len = $this->getIOBytesWritten();
    if ($write_len) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# Client already wrote to service (%s bytes), unable to retry.\n",
          new PhutilNumber($write_len)));
      return false;
    }

    // The total is attempts made so far plus services still untried.
    $this->writeClusterEngineLogMessage(
      pht(
        "# Service request failed, retrying (making attempt %s of %s).\n",
        new PhutilNumber($this->requestAttempts + 1),
        new PhutilNumber($this->requestAttempts + count($remaining_refs))));

    return true;
  }

}