+253
src/Import/ImportService.php
+253
src/Import/ImportService.php
···
1
+
<?php
2
+
3
+
namespace SocialDept\AtpParity\Import;
4
+
5
+
use SocialDept\AtpClient\AtpClient;
6
+
use SocialDept\AtpClient\Facades\Atp;
7
+
use SocialDept\AtpParity\Events\ImportCompleted;
8
+
use SocialDept\AtpParity\Events\ImportFailed;
9
+
use SocialDept\AtpParity\Events\ImportProgress;
10
+
use SocialDept\AtpParity\Events\ImportStarted;
11
+
use SocialDept\AtpParity\MapperRegistry;
12
+
use SocialDept\AtpResolver\Facades\Resolver;
13
+
use Throwable;
14
+
15
+
/**
 * Orchestrates importing of AT Protocol records to Eloquent models.
 *
 * Imports either every registered collection for a DID or one specific
 * collection, paging through the repository with a cursor. Progress is
 * persisted on an ImportState row so an interrupted import can be resumed.
 */
class ImportService
{
    /**
     * Cache of clients by PDS endpoint.
     *
     * @var array<string, AtpClient>
     */
    protected array $clients = [];

    public function __construct(
        protected MapperRegistry $registry
    ) {}

    /**
     * Import all records for a user in registered collections.
     *
     * Collections without a registered lexicon are skipped silently; the
     * per-collection results are combined via ImportResult::aggregate().
     *
     * @param  array<string>|null  $collections  Specific collections to import, or null for all registered
     * @param  callable|null  $onProgress  Invoked with an ImportProgress after every fetched page
     */
    public function importUser(string $did, ?array $collections = null, ?callable $onProgress = null): ImportResult
    {
        $collections = $collections ?? $this->registry->lexicons();
        $results = [];

        foreach ($collections as $collection) {
            if (! $this->registry->hasLexicon($collection)) {
                continue;
            }

            $results[] = $this->importUserCollection($did, $collection, $onProgress);
        }

        return ImportResult::aggregate($did, $results);
    }

    /**
     * Import a specific collection for a user.
     *
     * Returns early with the stored result when the import state is already
     * complete, and with a failed result when no mapper is registered or the
     * PDS endpoint cannot be resolved. Any exception raised while paging is
     * caught, recorded on the state, announced through ImportFailed and
     * converted into a failed ImportResult; it is never rethrown.
     *
     * @param  callable|null  $onProgress  Invoked with an ImportProgress after every fetched page
     */
    public function importUserCollection(string $did, string $collection, ?callable $onProgress = null): ImportResult
    {
        $mapper = $this->registry->forLexicon($collection);

        if (! $mapper) {
            return ImportResult::failed($did, $collection, "No mapper registered for collection: {$collection}");
        }

        $state = ImportState::findOrCreateFor($did, $collection);

        if ($state->isComplete()) {
            return $state->toResult();
        }

        $pdsEndpoint = $this->resolvePds($did);

        if (! $pdsEndpoint) {
            $error = "Could not resolve PDS endpoint for DID: {$did}";
            $state->markFailed($error);
            event(new ImportFailed($did, $collection, $error));

            return ImportResult::failed($did, $collection, $error);
        }

        $state->markStarted();
        event(new ImportStarted($did, $collection));

        $client = $this->clientFor($pdsEndpoint);
        // Start from the persisted cursor so a resumed import continues where it stopped.
        $cursor = $state->cursor;
        $pageSize = config('parity.import.page_size', 100);
        // Delay between pages; multiplied by 1000 for usleep(), i.e. treated as milliseconds.
        $pageDelay = config('parity.import.page_delay', 100);
        $recordClass = $mapper->recordClass();

        try {
            do {
                $requestedCursor = $cursor;

                $response = $client->atproto->repo->listRecords(
                    repo: $did,
                    collection: $collection,
                    limit: $pageSize,
                    cursor: $cursor
                );

                // Counters cover the current page only.
                // NOTE(review): ImportState::updateProgress() presumably accumulates them — confirm.
                $synced = 0;
                $skipped = 0;
                $failed = 0;

                foreach ($response->records as $item) {
                    try {
                        $record = $recordClass::fromArray($item['value']);

                        $mapper->upsert($record, [
                            'uri' => $item['uri'],
                            'cid' => $item['cid'],
                        ]);

                        $synced++;
                    } catch (Throwable $e) {
                        // One bad record must not abort the whole import, but the
                        // cause is reported instead of being discarded silently.
                        report($e);
                        $failed++;
                    }
                }

                $cursor = $response->cursor;
                $state->updateProgress($synced, $skipped, $failed, $cursor);

                if ($onProgress) {
                    $onProgress(new ImportProgress(
                        did: $did,
                        collection: $collection,
                        recordsSynced: $state->records_synced,
                        cursor: $cursor
                    ));
                }

                event(new ImportProgress($did, $collection, $state->records_synced, $cursor));

                // A server that hands back the cursor it was given would keep this
                // loop running forever; fail the import instead.
                if ($cursor && $cursor === $requestedCursor) {
                    throw new \RuntimeException(
                        "Pagination cursor did not advance for collection: {$collection}"
                    );
                }

                if ($cursor && $pageDelay > 0) {
                    usleep($pageDelay * 1000);
                }
            } while ($cursor);

            $state->markCompleted();
            $result = $state->toResult();
            event(new ImportCompleted($result));

            return $result;
        } catch (Throwable $e) {
            $error = $e->getMessage();
            $state->markFailed($error);
            event(new ImportFailed($did, $collection, $error));

            return ImportResult::failed(
                did: $did,
                collection: $collection,
                error: $error,
                synced: $state->records_synced,
                skipped: $state->records_skipped,
                failed: $state->records_failed,
                cursor: $state->cursor
            );
        }
    }

    /**
     * Resume an interrupted import from cursor.
     *
     * When the state reports it cannot be resumed, its current result is
     * returned unchanged. Otherwise the status is set back to pending and the
     * regular collection import runs again for the state's DID and collection.
     */
    public function resume(ImportState $state, ?callable $onProgress = null): ImportResult
    {
        if (! $state->canResume()) {
            return $state->toResult();
        }

        $state->update(['status' => ImportState::STATUS_PENDING]);

        return $this->importUserCollection($state->did, $state->collection, $onProgress);
    }

    /**
     * Resume all interrupted imports.
     *
     * @return array<ImportResult>
     */
    public function resumeAll(?callable $onProgress = null): array
    {
        $results = [];

        ImportState::resumable()->each(function (ImportState $state) use (&$results, $onProgress) {
            $results[] = $this->resume($state, $onProgress);
        });

        return $results;
    }

    /**
     * Get import status for a DID/collection.
     *
     * Returns null when no state row exists for the pair.
     */
    public function getStatus(string $did, string $collection): ?ImportState
    {
        return ImportState::where('did', $did)
            ->where('collection', $collection)
            ->first();
    }

    /**
     * Get all import states for a DID.
     *
     * @return \Illuminate\Database\Eloquent\Collection<int, ImportState>
     */
    public function getStatusForUser(string $did): \Illuminate\Database\Eloquent\Collection
    {
        return ImportState::where('did', $did)->get();
    }

    /**
     * Check if a user's collection has been imported.
     *
     * True only when a state exists and reports itself complete.
     */
    public function isImported(string $did, string $collection): bool
    {
        $state = $this->getStatus($did, $collection);

        return $state && $state->isComplete();
    }

    /**
     * Reset an import state to allow re-importing.
     *
     * Deletes the state row(s) for the DID/collection pair.
     */
    public function reset(string $did, string $collection): void
    {
        ImportState::where('did', $did)
            ->where('collection', $collection)
            ->delete();
    }

    /**
     * Reset all import states for a user.
     */
    public function resetUser(string $did): void
    {
        ImportState::where('did', $did)->delete();
    }

    /**
     * Get or create a client for a PDS endpoint.
     *
     * Clients are memoised per endpoint for the lifetime of this service instance.
     */
    protected function clientFor(string $pdsEndpoint): AtpClient
    {
        return $this->clients[$pdsEndpoint] ??= Atp::public($pdsEndpoint);
    }

    /**
     * Resolve the PDS endpoint for a DID.
     *
     * Delegates to the Resolver facade; callers treat a falsy result as "not resolvable".
     */
    protected function resolvePds(string $did): ?string
    {
        return Resolver::resolvePds($did);
    }
}
+220
src/Support/RecordHelper.php
+220
src/Support/RecordHelper.php
···
1
+
<?php
2
+
3
+
namespace SocialDept\AtpParity\Support;
4
+
5
+
use Illuminate\Database\Eloquent\Model;
6
+
use SocialDept\AtpClient\AtpClient;
7
+
use SocialDept\AtpClient\Data\Responses\Atproto\Repo\GetRecordResponse;
8
+
use SocialDept\AtpClient\Facades\Atp;
9
+
use SocialDept\AtpParity\MapperRegistry;
10
+
use SocialDept\AtpResolver\Facades\Resolver;
11
+
use SocialDept\AtpSchema\Data\Data;
12
+
13
+
/**
 * Helper for integrating atp-parity with atp-client.
 *
 * Provides convenient methods for fetching records from the ATP network
 * and converting them to typed DTOs or Eloquent models.
 */
class RecordHelper
{
    /**
     * Cache of clients by PDS endpoint.
     *
     * @var array<string, AtpClient>
     */
    protected array $clients = [];

    public function __construct(
        protected MapperRegistry $registry
    ) {}

    /**
     * Get or create a client for a PDS endpoint.
     *
     * Clients are memoised per endpoint for the lifetime of this helper instance.
     */
    protected function clientFor(string $pdsEndpoint): AtpClient
    {
        return $this->clients[$pdsEndpoint] ??= Atp::public($pdsEndpoint);
    }

    /**
     * Resolve the PDS endpoint for a DID or handle.
     *
     * Delegates to the Resolver facade; callers treat a falsy result as "not resolvable".
     */
    protected function resolvePds(string $actor): ?string
    {
        return Resolver::resolvePds($actor);
    }

    /**
     * Convert a GetRecordResponse to a typed record DTO.
     *
     * An explicit record class always wins. Without one, the collection is
     * taken from the response URI and looked up in the mapper registry.
     *
     * @template T of Data
     *
     * @param  class-string<T>|null  $recordClass  Explicit record class, or null to auto-detect from mapper
     * @return T|array The typed record, or raw array if no mapper found and no class specified
     */
    public function hydrateRecord(GetRecordResponse $response, ?string $recordClass = null): mixed
    {
        if (! $recordClass) {
            $mapper = $this->registry->forLexicon($this->extractCollection($response->uri));

            if (! $mapper) {
                return $response->value;
            }

            $recordClass = $mapper->recordClass();
        }

        return $recordClass::fromArray($response->value);
    }

    /**
     * Fetch a record from the ATP network by URI and return as typed DTO.
     *
     * Returns null when the URI is not a three-segment at:// URI or the
     * repository's PDS endpoint cannot be resolved.
     *
     * @template T of Data
     *
     * @param  class-string<T>|null  $recordClass
     * @return T|array|null
     */
    public function fetch(string $uri, ?string $recordClass = null): mixed
    {
        $parts = $this->parseUri($uri);

        if (! $parts) {
            return null;
        }

        $response = $this->fetchResponse($parts);

        if (! $response) {
            return null;
        }

        return $this->hydrateRecord($response, $recordClass);
    }

    /**
     * Fetch a record by URI and convert directly to an Eloquent model.
     *
     * Returns null when the URI cannot be parsed, no mapper is registered for
     * its collection, or the PDS endpoint cannot be resolved. The mapper
     * lookup happens before any network access.
     *
     * @template TModel of Model
     *
     * @return TModel|null
     */
    public function fetchAsModel(string $uri): ?Model
    {
        $parts = $this->parseUri($uri);

        if (! $parts) {
            return null;
        }

        $mapper = $this->registry->forLexicon($parts['collection']);

        if (! $mapper) {
            return null;
        }

        $response = $this->fetchResponse($parts);

        if (! $response) {
            return null;
        }

        $recordClass = $mapper->recordClass();
        $record = $recordClass::fromArray($response->value);

        return $mapper->toModel($record, [
            'uri' => $response->uri,
            'cid' => $response->cid,
        ]);
    }

    /**
     * Fetch a record by URI and upsert to the database.
     *
     * Returns null under the same conditions as fetchAsModel(); otherwise the
     * result of the mapper's upsert() is returned.
     *
     * @template TModel of Model
     *
     * @return TModel|null
     */
    public function sync(string $uri): ?Model
    {
        $parts = $this->parseUri($uri);

        if (! $parts) {
            return null;
        }

        $mapper = $this->registry->forLexicon($parts['collection']);

        if (! $mapper) {
            return null;
        }

        $response = $this->fetchResponse($parts);

        if (! $response) {
            return null;
        }

        $recordClass = $mapper->recordClass();
        $record = $recordClass::fromArray($response->value);

        return $mapper->upsert($record, [
            'uri' => $response->uri,
            'cid' => $response->cid,
        ]);
    }

    /**
     * Resolve the repository's PDS and request the record described by the parsed URI.
     *
     * Shared by fetch(), fetchAsModel() and sync() so the lookup sequence lives in one place.
     *
     * @param  array{repo: string, collection: string, rkey: string}  $parts  Output of parseUri()
     * @return GetRecordResponse|null Null when the PDS endpoint cannot be resolved
     */
    protected function fetchResponse(array $parts): ?GetRecordResponse
    {
        $pdsEndpoint = $this->resolvePds($parts['repo']);

        if (! $pdsEndpoint) {
            return null;
        }

        return $this->clientFor($pdsEndpoint)->atproto->repo->getRecord(
            $parts['repo'],
            $parts['collection'],
            $parts['rkey']
        );
    }

    /**
     * Parse an AT Protocol URI into its components.
     *
     * Only URIs of the exact form at://<repo>/<collection>/<rkey> match;
     * anything with fewer or more path segments yields null.
     *
     * @return array{repo: string, collection: string, rkey: string}|null
     */
    protected function parseUri(string $uri): ?array
    {
        // at://did:plc:xxx/app.bsky.feed.post/rkey
        if (! preg_match('#^at://([^/]+)/([^/]+)/([^/]+)$#', $uri, $matches)) {
            return null;
        }

        return [
            'repo' => $matches[1],
            'collection' => $matches[2],
            'rkey' => $matches[3],
        ];
    }

    /**
     * Extract collection from AT Protocol URI.
     *
     * Returns an empty string when the URI does not contain a collection
     * segment followed by a slash.
     */
    protected function extractCollection(string $uri): string
    {
        // at://did:plc:xxx/app.bsky.feed.post/rkey
        if (preg_match('#^at://[^/]+/([^/]+)/#', $uri, $matches)) {
            return $matches[1];
        }

        return '';
    }
}