Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1<?php
2
3namespace SocialDept\AtpParity\Import;
4
5use SocialDept\AtpClient\AtpClient;
6use SocialDept\AtpClient\Facades\Atp;
7use SocialDept\AtpParity\Events\ImportCompleted;
8use SocialDept\AtpParity\Events\ImportFailed;
9use SocialDept\AtpParity\Events\ImportProgress;
10use SocialDept\AtpParity\Events\ImportStarted;
11use SocialDept\AtpParity\MapperRegistry;
12use SocialDept\AtpResolver\Facades\Resolver;
13use Throwable;
14
15/**
16 * Orchestrates importing of AT Protocol records to Eloquent models.
17 *
18 * Supports importing individual users, specific collections, or entire
19 * networks through cursor-based pagination with progress tracking.
20 */
21class ImportService
22{
23 /**
24 * Cache of clients by PDS endpoint.
25 *
26 * @var array<string, AtpClient>
27 */
28 protected array $clients = [];
29
30 public function __construct(
31 protected MapperRegistry $registry
32 ) {}
33
34 /**
35 * Import all records for a user in registered collections.
36 *
37 * @param array<string>|null $collections Specific collections to import, or null for all registered
38 */
39 public function importUser(string $did, ?array $collections = null, ?callable $onProgress = null): ImportResult
40 {
41 $collections = $collections ?? $this->registry->lexicons();
42 $results = [];
43
44 foreach ($collections as $collection) {
45 if (! $this->registry->hasLexicon($collection)) {
46 continue;
47 }
48
49 $results[] = $this->importUserCollection($did, $collection, $onProgress);
50 }
51
52 return ImportResult::aggregate($did, $results);
53 }
54
55 /**
56 * Import a specific collection for a user.
57 */
58 public function importUserCollection(string $did, string $collection, ?callable $onProgress = null): ImportResult
59 {
60 $mapper = $this->registry->forLexicon($collection);
61
62 if (! $mapper) {
63 return ImportResult::failed($did, $collection, "No mapper registered for collection: {$collection}");
64 }
65
66 $state = ImportState::findOrCreateFor($did, $collection);
67
68 if ($state->isComplete()) {
69 return $state->toResult();
70 }
71
72 $pdsEndpoint = $this->resolvePds($did);
73
74 if (! $pdsEndpoint) {
75 $error = "Could not resolve PDS endpoint for DID: {$did}";
76 $state->markFailed($error);
77 event(new ImportFailed($did, $collection, $error));
78
79 return ImportResult::failed($did, $collection, $error);
80 }
81
82 $state->markStarted();
83 event(new ImportStarted($did, $collection));
84
85 $client = $this->clientFor($pdsEndpoint);
86 $cursor = $state->cursor;
87 $pageSize = config('parity.import.page_size', 100);
88 $pageDelay = config('parity.import.page_delay', 100);
89 $recordClass = $mapper->recordClass();
90
91 try {
92 do {
93 $response = $client->atproto->repo->listRecords(
94 repo: $did,
95 collection: $collection,
96 limit: $pageSize,
97 cursor: $cursor
98 );
99
100 $synced = 0;
101 $skipped = 0;
102 $failed = 0;
103
104 foreach ($response->records as $item) {
105 try {
106 $record = $recordClass::fromArray($item['value']);
107
108 $mapper->upsert($record, [
109 'uri' => $item['uri'],
110 'cid' => $item['cid'],
111 ]);
112
113 $synced++;
114 } catch (Throwable $e) {
115 $failed++;
116 }
117 }
118
119 $cursor = $response->cursor;
120 $state->updateProgress($synced, $skipped, $failed, $cursor);
121
122 if ($onProgress) {
123 $onProgress(new ImportProgress(
124 did: $did,
125 collection: $collection,
126 recordsSynced: $state->records_synced,
127 cursor: $cursor
128 ));
129 }
130
131 event(new ImportProgress($did, $collection, $state->records_synced, $cursor));
132
133 if ($cursor && $pageDelay > 0) {
134 usleep($pageDelay * 1000);
135 }
136 } while ($cursor);
137
138 $state->markCompleted();
139 $result = $state->toResult();
140 event(new ImportCompleted($result));
141
142 return $result;
143 } catch (Throwable $e) {
144 $error = $e->getMessage();
145 $state->markFailed($error);
146 event(new ImportFailed($did, $collection, $error));
147
148 return ImportResult::failed(
149 did: $did,
150 collection: $collection,
151 error: $error,
152 synced: $state->records_synced,
153 skipped: $state->records_skipped,
154 failed: $state->records_failed,
155 cursor: $state->cursor
156 );
157 }
158 }
159
160 /**
161 * Resume an interrupted import from cursor.
162 */
163 public function resume(ImportState $state, ?callable $onProgress = null): ImportResult
164 {
165 if (! $state->canResume()) {
166 return $state->toResult();
167 }
168
169 $state->update(['status' => ImportState::STATUS_PENDING]);
170
171 return $this->importUserCollection($state->did, $state->collection, $onProgress);
172 }
173
174 /**
175 * Resume all interrupted imports.
176 *
177 * @return array<ImportResult>
178 */
179 public function resumeAll(?callable $onProgress = null): array
180 {
181 $results = [];
182
183 ImportState::resumable()->each(function (ImportState $state) use (&$results, $onProgress) {
184 $results[] = $this->resume($state, $onProgress);
185 });
186
187 return $results;
188 }
189
190 /**
191 * Get import status for a DID/collection.
192 */
193 public function getStatus(string $did, string $collection): ?ImportState
194 {
195 return ImportState::where('did', $did)
196 ->where('collection', $collection)
197 ->first();
198 }
199
200 /**
201 * Get all import states for a DID.
202 *
203 * @return \Illuminate\Database\Eloquent\Collection<int, ImportState>
204 */
205 public function getStatusForUser(string $did): \Illuminate\Database\Eloquent\Collection
206 {
207 return ImportState::where('did', $did)->get();
208 }
209
210 /**
211 * Check if a user's collection has been imported.
212 */
213 public function isImported(string $did, string $collection): bool
214 {
215 $state = $this->getStatus($did, $collection);
216
217 return $state && $state->isComplete();
218 }
219
220 /**
221 * Reset an import state to allow re-importing.
222 */
223 public function reset(string $did, string $collection): void
224 {
225 ImportState::where('did', $did)
226 ->where('collection', $collection)
227 ->delete();
228 }
229
230 /**
231 * Reset all import states for a user.
232 */
233 public function resetUser(string $did): void
234 {
235 ImportState::where('did', $did)->delete();
236 }
237
238 /**
239 * Get or create a client for a PDS endpoint.
240 */
241 protected function clientFor(string $pdsEndpoint): AtpClient
242 {
243 return $this->clients[$pdsEndpoint] ??= Atp::public($pdsEndpoint);
244 }
245
246 /**
247 * Resolve the PDS endpoint for a DID.
248 */
249 protected function resolvePds(string $did): ?string
250 {
251 return Resolver::resolvePds($did);
252 }
253}