Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1<?php
2
3namespace SocialDept\AtpParity\Commands;
4
5use Illuminate\Console\Command;
6use SocialDept\AtpParity\Events\ImportProgress;
7use SocialDept\AtpParity\Import\ImportService;
8use SocialDept\AtpParity\Jobs\ImportUserJob;
9use SocialDept\AtpParity\MapperRegistry;
10
11use function Laravel\Prompts\error;
12use function Laravel\Prompts\info;
13use function Laravel\Prompts\note;
14use function Laravel\Prompts\warning;
15
16class ImportCommand extends Command
17{
18 protected $signature = 'parity:import
19 {did? : The DID to import}
20 {--collection= : Specific collection to import}
21 {--file= : File containing DIDs to import (one per line)}
22 {--resume : Resume all interrupted imports}
23 {--queue : Queue the import job instead of running synchronously}
24 {--progress : Show progress output}';
25
26 protected $description = 'Import AT Protocol records for a user or from a file of DIDs';
27
28 public function handle(ImportService $service, MapperRegistry $registry): int
29 {
30 if ($this->option('resume')) {
31 return $this->handleResume($service);
32 }
33
34 $did = $this->argument('did');
35 $file = $this->option('file');
36
37 if (! $did && ! $file) {
38 error('Please provide a DID or use --file to specify a file of DIDs');
39
40 return self::FAILURE;
41 }
42
43 if ($file) {
44 return $this->handleFile($file, $service);
45 }
46
47 return $this->importDid($did, $service, $registry);
48 }
49
50 protected function handleResume(ImportService $service): int
51 {
52 info('Resuming interrupted imports...');
53
54 $results = $service->resumeAll($this->getProgressCallback());
55
56 if (empty($results)) {
57 note('No interrupted imports found');
58
59 return self::SUCCESS;
60 }
61
62 $success = 0;
63 $failed = 0;
64
65 foreach ($results as $result) {
66 if ($result->isSuccess()) {
67 $success++;
68 } else {
69 $failed++;
70 }
71 }
72
73 info("Resumed {$success} imports successfully");
74
75 if ($failed > 0) {
76 warning("{$failed} imports failed");
77 }
78
79 return $failed > 0 ? self::FAILURE : self::SUCCESS;
80 }
81
82 protected function handleFile(string $file, ImportService $service): int
83 {
84 if (! file_exists($file)) {
85 error("File not found: {$file}");
86
87 return self::FAILURE;
88 }
89
90 $dids = array_filter(array_map('trim', file($file)));
91 $total = count($dids);
92 $success = 0;
93 $failed = 0;
94
95 info("Importing {$total} DIDs from {$file}");
96
97 foreach ($dids as $index => $did) {
98 if (! str_starts_with($did, 'did:')) {
99 warning("Skipping invalid DID: {$did}");
100
101 continue;
102 }
103
104 $current = $index + 1;
105 note("[{$current}/{$total}] Importing {$did}");
106
107 if ($this->option('queue')) {
108 ImportUserJob::dispatch($did, $this->option('collection'));
109 $success++;
110 } else {
111 $result = $service->importUser($did, $this->getCollections(), $this->getProgressCallback());
112
113 if ($result->isSuccess()) {
114 $success++;
115 } else {
116 $failed++;
117 warning("Failed: {$result->error}");
118 }
119 }
120 }
121
122 info("Completed: {$success} successful, {$failed} failed");
123
124 return $failed > 0 ? self::FAILURE : self::SUCCESS;
125 }
126
127 protected function importDid(string $did, ImportService $service, MapperRegistry $registry): int
128 {
129 if (! str_starts_with($did, 'did:')) {
130 error("Invalid DID format: {$did}");
131
132 return self::FAILURE;
133 }
134
135 $collections = $this->getCollections();
136 $collectionDisplay = $collections ? implode(', ', $collections) : 'all registered';
137
138 info("Importing {$did} ({$collectionDisplay})");
139
140 if ($this->option('queue')) {
141 ImportUserJob::dispatch($did, $this->option('collection'));
142 note('Import job queued');
143
144 return self::SUCCESS;
145 }
146
147 $result = $service->importUser($did, $collections, $this->getProgressCallback());
148
149 if ($result->isSuccess()) {
150 info("Import completed: {$result->recordsSynced} records synced");
151
152 if ($result->recordsSkipped > 0) {
153 note("{$result->recordsSkipped} records skipped");
154 }
155
156 if ($result->recordsFailed > 0) {
157 warning("{$result->recordsFailed} records failed");
158 }
159
160 return self::SUCCESS;
161 }
162
163 error("Import failed: {$result->error}");
164
165 if ($result->recordsSynced > 0) {
166 note("Partial progress: {$result->recordsSynced} records synced before failure");
167 }
168
169 return self::FAILURE;
170 }
171
172 protected function getCollections(): ?array
173 {
174 $collection = $this->option('collection');
175
176 return $collection ? [$collection] : null;
177 }
178
179 protected function getProgressCallback(): ?callable
180 {
181 if (! $this->option('progress')) {
182 return null;
183 }
184
185 return function (ImportProgress $progress) {
186 $this->output->write("\r");
187 $this->output->write(" [{$progress->collection}] {$progress->recordsSynced} records synced");
188 };
189 }
190}