Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
at dev 5.6 kB view raw
<?php

namespace SocialDept\AtpParity\Import;

use Illuminate\Database\Eloquent\Builder;
use Illuminate\Database\Eloquent\Model;

/**
 * Tracks import progress for a DID/collection pair.
 *
 * A row moves through the STATUS_* constants below: it is created as
 * "pending", switched to "in_progress" by markStarted(), and finishes as
 * either "completed" (markCompleted()) or "failed" (markFailed()).
 *
 * @property int $id
 * @property string $did
 * @property string $collection
 * @property string $status
 * @property string|null $cursor
 * @property int $records_synced
 * @property int $records_skipped
 * @property int $records_failed
 * @property \Carbon\Carbon|null $started_at
 * @property \Carbon\Carbon|null $completed_at
 * @property string|null $error
 * @property \Carbon\Carbon $created_at
 * @property \Carbon\Carbon $updated_at
 */
class ImportState extends Model
{
    public const STATUS_PENDING = 'pending';

    public const STATUS_IN_PROGRESS = 'in_progress';

    public const STATUS_COMPLETED = 'completed';

    public const STATUS_FAILED = 'failed';

    /**
     * Statuses from which an import may be picked up again.
     *
     * Single source of truth for canResume() and scopeResumable() so the
     * instance check and the query scope cannot drift apart.
     */
    private const RESUMABLE_STATUSES = [
        self::STATUS_IN_PROGRESS,
        self::STATUS_FAILED,
    ];

    protected $fillable = [
        'did',
        'collection',
        'status',
        'cursor',
        'records_synced',
        'records_skipped',
        'records_failed',
        'started_at',
        'completed_at',
        'error',
    ];

    protected $casts = [
        'records_synced' => 'integer',
        'records_skipped' => 'integer',
        'records_failed' => 'integer',
        'started_at' => 'datetime',
        'completed_at' => 'datetime',
    ];

    /**
     * Resolve the table name from configuration.
     *
     * Reads "parity.import.state_table" and falls back to
     * "parity_import_states" when the key is not configured.
     */
    public function getTable(): string
    {
        return config('parity.import.state_table', 'parity_import_states');
    }

    /**
     * Start the import process for this state.
     *
     * Sets the status to in-progress, stamps started_at with the current
     * time and clears any previously stored error message.
     *
     * @return self This instance, for chaining.
     */
    public function markStarted(): self
    {
        $this->update([
            'status' => self::STATUS_IN_PROGRESS,
            'started_at' => now(),
            'error' => null,
        ]);

        return $this;
    }

    /**
     * Mark the import as completed.
     *
     * Sets the status to completed, stamps completed_at with the current
     * time and clears the stored cursor.
     *
     * @return self This instance, for chaining.
     */
    public function markCompleted(): self
    {
        $this->update([
            'status' => self::STATUS_COMPLETED,
            'completed_at' => now(),
            'cursor' => null,
        ]);

        return $this;
    }

    /**
     * Mark the import as failed.
     *
     * The cursor and counters are left untouched, so a failed import stays
     * resumable (see canResume()).
     *
     * @param string $error Message stored in the "error" column.
     *
     * @return self This instance, for chaining.
     */
    public function markFailed(string $error): self
    {
        $this->update([
            'status' => self::STATUS_FAILED,
            'error' => $error,
        ]);

        return $this;
    }

    /**
     * Update progress during import.
     *
     * The counter increments and the cursor write are separate persistence
     * calls, so they are wrapped in one transaction on the model's
     * connection: either all of them are stored or none are. Without that,
     * a failure between calls could leave the counters ahead of the stored
     * cursor, and a resumed import would count the same batch twice.
     *
     * @param int         $synced  Amount added to records_synced (always applied, even when 0).
     * @param int         $skipped Amount added to records_skipped; ignored unless greater than 0.
     * @param int         $failed  Amount added to records_failed; ignored unless greater than 0.
     * @param string|null $cursor  New cursor value; null leaves the stored cursor unchanged.
     *
     * @return self This instance, for chaining.
     */
    public function updateProgress(int $synced, int $skipped = 0, int $failed = 0, ?string $cursor = null): self
    {
        $this->getConnection()->transaction(function () use ($synced, $skipped, $failed, $cursor): void {
            // Applied unconditionally, exactly as before the transaction was introduced.
            $this->increment('records_synced', $synced);

            if ($skipped > 0) {
                $this->increment('records_skipped', $skipped);
            }

            if ($failed > 0) {
                $this->increment('records_failed', $failed);
            }

            if ($cursor !== null) {
                $this->update(['cursor' => $cursor]);
            }
        });

        return $this;
    }

    /**
     * Check if this import can be resumed.
     *
     * True when the status is in-progress or failed. The cursor is not
     * inspected.
     */
    public function canResume(): bool
    {
        return in_array($this->status, self::RESUMABLE_STATUSES, true);
    }

    /**
     * Check if this import is complete.
     */
    public function isComplete(): bool
    {
        return $this->status === self::STATUS_COMPLETED;
    }

    /**
     * Check if this import is currently running.
     */
    public function isRunning(): bool
    {
        return $this->status === self::STATUS_IN_PROGRESS;
    }

    /**
     * Scope to pending imports.
     */
    public function scopePending(Builder $query): Builder
    {
        return $query->where('status', self::STATUS_PENDING);
    }

    /**
     * Scope to in-progress imports.
     */
    public function scopeInProgress(Builder $query): Builder
    {
        return $query->where('status', self::STATUS_IN_PROGRESS);
    }

    /**
     * Scope to completed imports.
     */
    public function scopeCompleted(Builder $query): Builder
    {
        return $query->where('status', self::STATUS_COMPLETED);
    }

    /**
     * Scope to failed imports.
     */
    public function scopeFailed(Builder $query): Builder
    {
        return $query->where('status', self::STATUS_FAILED);
    }

    /**
     * Scope to incomplete imports (pending, in_progress, or failed).
     */
    public function scopeIncomplete(Builder $query): Builder
    {
        return $query->whereIn('status', [
            self::STATUS_PENDING,
            self::STATUS_IN_PROGRESS,
            self::STATUS_FAILED,
        ]);
    }

    /**
     * Scope to resumable imports (in_progress or failed).
     *
     * Matches the same statuses as canResume(). Rows are not filtered on
     * whether a cursor is stored.
     */
    public function scopeResumable(Builder $query): Builder
    {
        return $query->whereIn('status', self::RESUMABLE_STATUSES);
    }

    /**
     * Find or create an import state for a DID/collection pair.
     *
     * A newly created row starts with the pending status; an existing row is
     * returned as stored.
     *
     * @param string $did        DID half of the lookup key.
     * @param string $collection Collection half of the lookup key.
     */
    public static function findOrCreateFor(string $did, string $collection): self
    {
        return static::firstOrCreate(
            ['did' => $did, 'collection' => $collection],
            ['status' => self::STATUS_PENDING]
        );
    }

    /**
     * Convert to ImportResult.
     *
     * Copies the identifying fields, the three counters, the cursor and the
     * error message; "completed" is taken from isComplete().
     */
    public function toResult(): ImportResult
    {
        return new ImportResult(
            did: $this->did,
            collection: $this->collection,
            recordsSynced: $this->records_synced,
            recordsSkipped: $this->records_skipped,
            recordsFailed: $this->records_failed,
            completed: $this->isComplete(),
            cursor: $this->cursor,
            error: $this->error,
        );
    }
}