Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
at dev 5.5 kB view raw
<?php

namespace SocialDept\AtpParity\Commands;

use Illuminate\Console\Command;
use SocialDept\AtpParity\Events\ImportProgress;
use SocialDept\AtpParity\Import\ImportService;
use SocialDept\AtpParity\Jobs\ImportUserJob;
use SocialDept\AtpParity\MapperRegistry;

use function Laravel\Prompts\error;
use function Laravel\Prompts\info;
use function Laravel\Prompts\note;
use function Laravel\Prompts\warning;

/**
 * Artisan command `parity:import`.
 *
 * Imports AT Protocol records for a single DID, for every DID listed in a
 * file, or resumes previously interrupted imports. Imports run synchronously
 * through ImportService unless --queue is given, in which case an
 * ImportUserJob is dispatched per DID instead.
 */
class ImportCommand extends Command
{
    protected $signature = 'parity:import
        {did? : The DID to import}
        {--collection= : Specific collection to import}
        {--file= : File containing DIDs to import (one per line)}
        {--resume : Resume all interrupted imports}
        {--queue : Queue the import job instead of running synchronously}
        {--progress : Show progress output}';

    protected $description = 'Import AT Protocol records for a user or from a file of DIDs';

    /**
     * Entry point: pick the import mode from the given arguments and options.
     *
     * Precedence is --resume, then --file, then the `did` argument. When both
     * --file and a DID are supplied, only the file is processed.
     *
     * @return int self::SUCCESS or self::FAILURE
     */
    public function handle(ImportService $service, MapperRegistry $registry): int
    {
        if ($this->option('resume')) {
            return $this->handleResume($service);
        }

        $did = $this->argument('did');
        $file = $this->option('file');

        if (! $did && ! $file) {
            error('Please provide a DID or use --file to specify a file of DIDs');

            return self::FAILURE;
        }

        if ($file) {
            return $this->handleFile($file, $service);
        }

        return $this->importDid($did, $service, $registry);
    }

    /**
     * Resume every interrupted import and report how many succeeded or failed.
     *
     * @return int self::FAILURE when at least one resumed import failed,
     *             self::SUCCESS otherwise (including when nothing was pending)
     */
    protected function handleResume(ImportService $service): int
    {
        info('Resuming interrupted imports...');

        $results = $service->resumeAll($this->getProgressCallback());

        if (empty($results)) {
            note('No interrupted imports found');

            return self::SUCCESS;
        }

        $success = 0;
        $failed = 0;

        foreach ($results as $result) {
            if ($result->isSuccess()) {
                $success++;
            } else {
                $failed++;
            }
        }

        info("Resumed {$success} imports successfully");

        if ($failed > 0) {
            warning("{$failed} imports failed");
        }

        return $failed > 0 ? self::FAILURE : self::SUCCESS;
    }

    /**
     * Import every DID listed in a file (one DID per line).
     *
     * Blank lines are ignored. Lines that do not start with "did:" are
     * reported and skipped; they count towards neither the success nor the
     * failure total. With --queue, each dispatched job is counted as a
     * success.
     *
     * @param  string  $file  Path to the file containing the DIDs
     * @return int self::FAILURE when the file cannot be read or at least one
     *             synchronous import failed, self::SUCCESS otherwise
     */
    protected function handleFile(string $file, ImportService $service): int
    {
        if (! file_exists($file)) {
            error("File not found: {$file}");

            return self::FAILURE;
        }

        // file_exists() is also true for directories and unreadable paths;
        // reject those here instead of letting file() fail with a warning.
        if (! is_file($file) || ! is_readable($file)) {
            error("File is not readable: {$file}");

            return self::FAILURE;
        }

        $lines = file($file, FILE_IGNORE_NEW_LINES);

        if ($lines === false) {
            error("File is not readable: {$file}");

            return self::FAILURE;
        }

        // array_filter() preserves keys, so re-index with array_values() to
        // keep the "[current/total]" counter contiguous when the file
        // contains blank lines.
        $dids = array_values(array_filter(
            array_map('trim', $lines),
            static fn (string $line): bool => $line !== ''
        ));

        $total = count($dids);
        $success = 0;
        $failed = 0;

        // These do not change between iterations, so resolve them once.
        $queue = (bool) $this->option('queue');
        $collection = $this->option('collection');
        $collections = $this->getCollections();
        $progress = $this->getProgressCallback();

        info("Importing {$total} DIDs from {$file}");

        foreach ($dids as $index => $did) {
            if (! str_starts_with($did, 'did:')) {
                warning("Skipping invalid DID: {$did}");

                continue;
            }

            $current = $index + 1;
            note("[{$current}/{$total}] Importing {$did}");

            if ($queue) {
                ImportUserJob::dispatch($did, $collection);
                $success++;

                continue;
            }

            $result = $service->importUser($did, $collections, $progress);

            if ($result->isSuccess()) {
                $success++;
            } else {
                $failed++;
                warning("Failed: {$result->error}");
            }
        }

        info("Completed: {$success} successful, {$failed} failed");

        return $failed > 0 ? self::FAILURE : self::SUCCESS;
    }

    /**
     * Import (or queue the import of) a single DID and print a summary.
     *
     * @param  string  $did  Must start with "did:"; anything else is rejected
     * @param  MapperRegistry  $registry  Accepted for interface compatibility;
     *                                    not used by this method
     * @return int self::SUCCESS when the import succeeded or was queued,
     *             self::FAILURE on an invalid DID or a failed import
     */
    protected function importDid(string $did, ImportService $service, MapperRegistry $registry): int
    {
        if (! str_starts_with($did, 'did:')) {
            error("Invalid DID format: {$did}");

            return self::FAILURE;
        }

        $collections = $this->getCollections();
        $collectionDisplay = $collections ? implode(', ', $collections) : 'all registered';

        info("Importing {$did} ({$collectionDisplay})");

        if ($this->option('queue')) {
            ImportUserJob::dispatch($did, $this->option('collection'));
            note('Import job queued');

            return self::SUCCESS;
        }

        $result = $service->importUser($did, $collections, $this->getProgressCallback());

        if ($result->isSuccess()) {
            info("Import completed: {$result->recordsSynced} records synced");

            if ($result->recordsSkipped > 0) {
                note("{$result->recordsSkipped} records skipped");
            }

            if ($result->recordsFailed > 0) {
                warning("{$result->recordsFailed} records failed");
            }

            return self::SUCCESS;
        }

        error("Import failed: {$result->error}");

        if ($result->recordsSynced > 0) {
            note("Partial progress: {$result->recordsSynced} records synced before failure");
        }

        return self::FAILURE;
    }

    /**
     * Collections to import, derived from the --collection option.
     *
     * @return array|null Single-element array holding the requested
     *                    collection, or null when the option was not given
     */
    protected function getCollections(): ?array
    {
        $collection = $this->option('collection');

        return $collection ? [$collection] : null;
    }

    /**
     * Build the progress reporter used when --progress is set.
     *
     * The callback rewrites the current console line (carriage return, no
     * trailing newline) with the collection name and running synced count.
     *
     * @return callable|null Callback accepting an ImportProgress, or null
     *                       when --progress was not requested
     */
    protected function getProgressCallback(): ?callable
    {
        if (! $this->option('progress')) {
            return null;
        }

        return function (ImportProgress $progress) {
            $this->output->write("\r");
            $this->output->write("  [{$progress->collection}] {$progress->recordsSynced} records synced");
        };
    }
}