Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
at dev 7.5 kB view raw
<?php

namespace SocialDept\AtpParity\Import;

use RuntimeException;
use SocialDept\AtpClient\AtpClient;
use SocialDept\AtpClient\Facades\Atp;
use SocialDept\AtpParity\Events\ImportCompleted;
use SocialDept\AtpParity\Events\ImportFailed;
use SocialDept\AtpParity\Events\ImportProgress;
use SocialDept\AtpParity\Events\ImportStarted;
use SocialDept\AtpParity\MapperRegistry;
use SocialDept\AtpResolver\Facades\Resolver;
use Throwable;

/**
 * Orchestrates importing of AT Protocol records to Eloquent models.
 *
 * Supports importing individual users, specific collections, or entire
 * networks through cursor-based pagination with progress tracking.
 *
 * Progress for each DID/collection pair is persisted through ImportState,
 * which is what allows an interrupted import to be resumed from its cursor.
 */
class ImportService
{
    /**
     * Cache of clients by PDS endpoint.
     *
     * @var array<string, AtpClient>
     */
    protected array $clients = [];

    /**
     * @param  MapperRegistry  $registry  Registry used to look up the mapper for each collection.
     */
    public function __construct(
        protected MapperRegistry $registry
    ) {}

    /**
     * Import all records for a user in registered collections.
     *
     * Collections for which the registry reports no lexicon are skipped
     * without producing a result entry.
     *
     * @param  string  $did  DID of the repository to import.
     * @param  array<string>|null  $collections  Specific collections to import, or null for all registered
     * @param  callable|null  $onProgress  Optional callback, forwarded to importUserCollection().
     * @return ImportResult Aggregate of the per-collection results.
     */
    public function importUser(string $did, ?array $collections = null, ?callable $onProgress = null): ImportResult
    {
        $collections = $collections ?? $this->registry->lexicons();
        $results = [];

        foreach ($collections as $collection) {
            if (! $this->registry->hasLexicon($collection)) {
                continue;
            }

            $results[] = $this->importUserCollection($did, $collection, $onProgress);
        }

        return ImportResult::aggregate($did, $results);
    }

    /**
     * Import a specific collection for a user.
     *
     * Pages through listRecords starting from the cursor stored on the
     * ImportState, upserting every record through the collection's mapper.
     * After each page the state is updated, the optional callback is invoked
     * and an ImportProgress event is dispatched.
     *
     * This method does not throw for import errors: any Throwable raised
     * while paging is converted into a failed ImportResult, the state is
     * marked failed and an ImportFailed event is dispatched.
     *
     * @param  string  $did  DID of the repository to import.
     * @param  string  $collection  Collection (lexicon NSID) to import.
     * @param  callable|null  $onProgress  Optional callback receiving an ImportProgress after each page.
     * @return ImportResult Result of this run, or the stored result if the import was already complete.
     */
    public function importUserCollection(string $did, string $collection, ?callable $onProgress = null): ImportResult
    {
        $mapper = $this->registry->forLexicon($collection);

        if (! $mapper) {
            return ImportResult::failed($did, $collection, "No mapper registered for collection: {$collection}");
        }

        $state = ImportState::findOrCreateFor($did, $collection);

        // A finished import is never re-run implicitly; callers must reset() first.
        if ($state->isComplete()) {
            return $state->toResult();
        }

        $pdsEndpoint = $this->resolvePds($did);

        if (! $pdsEndpoint) {
            $error = "Could not resolve PDS endpoint for DID: {$did}";
            $state->markFailed($error);
            event(new ImportFailed($did, $collection, $error));

            return ImportResult::failed($did, $collection, $error);
        }

        $state->markStarted();
        event(new ImportStarted($did, $collection));

        $client = $this->clientFor($pdsEndpoint);
        $cursor = $state->cursor;
        $pageSize = config('parity.import.page_size', 100);
        // Multiplied by 1000 before usleep(), i.e. the setting is in milliseconds.
        $pageDelay = config('parity.import.page_delay', 100);
        $recordClass = $mapper->recordClass();

        try {
            do {
                $response = $client->atproto->repo->listRecords(
                    repo: $did,
                    collection: $collection,
                    limit: $pageSize,
                    cursor: $cursor
                );

                // Per-page counters; the running totals are read back from $state below.
                $synced = 0;
                // Nothing in this loop skips a record; the counter exists because
                // updateProgress() takes it.
                $skipped = 0;
                $failed = 0;

                foreach ($response->records as $item) {
                    try {
                        $record = $recordClass::fromArray($item['value']);

                        $mapper->upsert($record, [
                            'uri' => $item['uri'],
                            'cid' => $item['cid'],
                        ]);

                        $synced++;
                    } catch (Throwable $e) {
                        // Deliberate best-effort: one bad record is counted as failed
                        // and must not abort the rest of the page.
                        $failed++;
                    }
                }

                $previousCursor = $cursor;
                $cursor = $response->cursor;

                // Compare explicitly instead of relying on truthiness: PHP treats the
                // string "0" as falsy, which would end pagination early on such a cursor.
                $hasMore = $cursor !== null && $cursor !== '';

                $state->updateProgress($synced, $skipped, $failed, $cursor);

                $progress = new ImportProgress(
                    did: $did,
                    collection: $collection,
                    recordsSynced: $state->records_synced,
                    cursor: $cursor
                );

                if ($onProgress) {
                    $onProgress($progress);
                }

                event($progress);

                // A server that hands back the cursor it was just given would make this
                // loop request the same page forever; fail the import instead.
                if ($hasMore && $cursor === $previousCursor) {
                    throw new RuntimeException(
                        "Pagination stalled: PDS returned an unchanged cursor for collection: {$collection}"
                    );
                }

                // Throttle between pages, but not after the last one.
                if ($hasMore && $pageDelay > 0) {
                    usleep($pageDelay * 1000);
                }
            } while ($hasMore);

            $state->markCompleted();
            $result = $state->toResult();
            event(new ImportCompleted($result));

            return $result;
        } catch (Throwable $e) {
            $error = $e->getMessage();
            $state->markFailed($error);
            event(new ImportFailed($did, $collection, $error));

            // Report the counters and cursor held by the state so the caller can see
            // how far the import got before failing.
            return ImportResult::failed(
                did: $did,
                collection: $collection,
                error: $error,
                synced: $state->records_synced,
                skipped: $state->records_skipped,
                failed: $state->records_failed,
                cursor: $state->cursor
            );
        }
    }

    /**
     * Resume an interrupted import from cursor.
     *
     * If the state reports that it cannot be resumed, its current result is
     * returned unchanged. Otherwise the status is set back to pending and the
     * import is re-run via importUserCollection(), which reads the cursor
     * stored on the ImportState for this DID/collection.
     *
     * @param  ImportState  $state  State of the import to resume.
     * @param  callable|null  $onProgress  Optional callback, forwarded to importUserCollection().
     */
    public function resume(ImportState $state, ?callable $onProgress = null): ImportResult
    {
        if (! $state->canResume()) {
            return $state->toResult();
        }

        $state->update(['status' => ImportState::STATUS_PENDING]);

        return $this->importUserCollection($state->did, $state->collection, $onProgress);
    }

    /**
     * Resume all interrupted imports.
     *
     * Runs resume() for every state yielded by ImportState::resumable().
     *
     * @param  callable|null  $onProgress  Optional callback, forwarded to each resumed import.
     * @return array<ImportResult>
     */
    public function resumeAll(?callable $onProgress = null): array
    {
        $results = [];

        ImportState::resumable()->each(function (ImportState $state) use (&$results, $onProgress) {
            $results[] = $this->resume($state, $onProgress);
        });

        return $results;
    }

    /**
     * Get import status for a DID/collection.
     *
     * @return ImportState|null The first matching state, or null when none exists.
     */
    public function getStatus(string $did, string $collection): ?ImportState
    {
        return ImportState::where('did', $did)
            ->where('collection', $collection)
            ->first();
    }

    /**
     * Get all import states for a DID.
     *
     * @return \Illuminate\Database\Eloquent\Collection<int, ImportState>
     */
    public function getStatusForUser(string $did): \Illuminate\Database\Eloquent\Collection
    {
        return ImportState::where('did', $did)->get();
    }

    /**
     * Check if a user's collection has been imported.
     *
     * @return bool True only when a state exists and reports itself complete.
     */
    public function isImported(string $did, string $collection): bool
    {
        $state = $this->getStatus($did, $collection);

        return $state !== null && $state->isComplete();
    }

    /**
     * Reset an import state to allow re-importing.
     *
     * Deletes the ImportState rows for the DID/collection pair.
     */
    public function reset(string $did, string $collection): void
    {
        ImportState::where('did', $did)
            ->where('collection', $collection)
            ->delete();
    }

    /**
     * Reset all import states for a user.
     *
     * Deletes every ImportState row for the DID.
     */
    public function resetUser(string $did): void
    {
        ImportState::where('did', $did)->delete();
    }

    /**
     * Get or create a client for a PDS endpoint.
     *
     * Clients are memoised per endpoint in $clients for the lifetime of
     * this service instance.
     */
    protected function clientFor(string $pdsEndpoint): AtpClient
    {
        return $this->clients[$pdsEndpoint] ??= Atp::public($pdsEndpoint);
    }

    /**
     * Resolve the PDS endpoint for a DID.
     *
     * @return string|null The resolved endpoint, or null when the resolver returns none.
     */
    protected function resolvePds(string $did): ?string
    {
        return Resolver::resolvePds($did);
    }
}