Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
1<?php
2
3namespace SocialDept\AtpParity\Import;
4
5use Illuminate\Database\Eloquent\Builder;
6use Illuminate\Database\Eloquent\Model;
7
8/**
9 * Tracks import progress for a DID/collection pair.
10 *
11 * @property int $id
12 * @property string $did
13 * @property string $collection
14 * @property string $status
15 * @property string|null $cursor
16 * @property int $records_synced
17 * @property int $records_skipped
18 * @property int $records_failed
19 * @property \Carbon\Carbon|null $started_at
20 * @property \Carbon\Carbon|null $completed_at
21 * @property string|null $error
22 * @property \Carbon\Carbon $created_at
23 * @property \Carbon\Carbon $updated_at
24 */
25class ImportState extends Model
26{
27 public const STATUS_PENDING = 'pending';
28
29 public const STATUS_IN_PROGRESS = 'in_progress';
30
31 public const STATUS_COMPLETED = 'completed';
32
33 public const STATUS_FAILED = 'failed';
34
35 protected $fillable = [
36 'did',
37 'collection',
38 'status',
39 'cursor',
40 'records_synced',
41 'records_skipped',
42 'records_failed',
43 'started_at',
44 'completed_at',
45 'error',
46 ];
47
48 protected $casts = [
49 'records_synced' => 'integer',
50 'records_skipped' => 'integer',
51 'records_failed' => 'integer',
52 'started_at' => 'datetime',
53 'completed_at' => 'datetime',
54 ];
55
56 public function getTable(): string
57 {
58 return config('parity.import.state_table', 'parity_import_states');
59 }
60
61 /**
62 * Start the import process for this state.
63 */
64 public function markStarted(): self
65 {
66 $this->update([
67 'status' => self::STATUS_IN_PROGRESS,
68 'started_at' => now(),
69 'error' => null,
70 ]);
71
72 return $this;
73 }
74
75 /**
76 * Mark the import as completed.
77 */
78 public function markCompleted(): self
79 {
80 $this->update([
81 'status' => self::STATUS_COMPLETED,
82 'completed_at' => now(),
83 'cursor' => null,
84 ]);
85
86 return $this;
87 }
88
89 /**
90 * Mark the import as failed.
91 */
92 public function markFailed(string $error): self
93 {
94 $this->update([
95 'status' => self::STATUS_FAILED,
96 'error' => $error,
97 ]);
98
99 return $this;
100 }
101
102 /**
103 * Update progress during import.
104 */
105 public function updateProgress(int $synced, int $skipped = 0, int $failed = 0, ?string $cursor = null): self
106 {
107 $this->increment('records_synced', $synced);
108
109 if ($skipped > 0) {
110 $this->increment('records_skipped', $skipped);
111 }
112
113 if ($failed > 0) {
114 $this->increment('records_failed', $failed);
115 }
116
117 if ($cursor !== null) {
118 $this->update(['cursor' => $cursor]);
119 }
120
121 return $this;
122 }
123
124 /**
125 * Check if this import can be resumed.
126 */
127 public function canResume(): bool
128 {
129 return $this->status === self::STATUS_IN_PROGRESS
130 || $this->status === self::STATUS_FAILED;
131 }
132
133 /**
134 * Check if this import is complete.
135 */
136 public function isComplete(): bool
137 {
138 return $this->status === self::STATUS_COMPLETED;
139 }
140
141 /**
142 * Check if this import is currently running.
143 */
144 public function isRunning(): bool
145 {
146 return $this->status === self::STATUS_IN_PROGRESS;
147 }
148
149 /**
150 * Scope to pending imports.
151 */
152 public function scopePending(Builder $query): Builder
153 {
154 return $query->where('status', self::STATUS_PENDING);
155 }
156
157 /**
158 * Scope to in-progress imports.
159 */
160 public function scopeInProgress(Builder $query): Builder
161 {
162 return $query->where('status', self::STATUS_IN_PROGRESS);
163 }
164
165 /**
166 * Scope to completed imports.
167 */
168 public function scopeCompleted(Builder $query): Builder
169 {
170 return $query->where('status', self::STATUS_COMPLETED);
171 }
172
173 /**
174 * Scope to failed imports.
175 */
176 public function scopeFailed(Builder $query): Builder
177 {
178 return $query->where('status', self::STATUS_FAILED);
179 }
180
181 /**
182 * Scope to incomplete imports (pending, in_progress, or failed).
183 */
184 public function scopeIncomplete(Builder $query): Builder
185 {
186 return $query->whereIn('status', [
187 self::STATUS_PENDING,
188 self::STATUS_IN_PROGRESS,
189 self::STATUS_FAILED,
190 ]);
191 }
192
193 /**
194 * Scope to resumable imports (in_progress or failed with cursor).
195 */
196 public function scopeResumable(Builder $query): Builder
197 {
198 return $query->whereIn('status', [
199 self::STATUS_IN_PROGRESS,
200 self::STATUS_FAILED,
201 ]);
202 }
203
204 /**
205 * Find or create an import state for a DID/collection pair.
206 */
207 public static function findOrCreateFor(string $did, string $collection): self
208 {
209 return static::firstOrCreate(
210 ['did' => $did, 'collection' => $collection],
211 ['status' => self::STATUS_PENDING]
212 );
213 }
214
215 /**
216 * Convert to ImportResult.
217 */
218 public function toResult(): ImportResult
219 {
220 return new ImportResult(
221 did: $this->did,
222 collection: $this->collection,
223 recordsSynced: $this->records_synced,
224 recordsSkipped: $this->records_skipped,
225 recordsFailed: $this->records_failed,
226 completed: $this->isComplete(),
227 cursor: $this->cursor,
228 error: $this->error,
229 );
230 }
231}