Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel

Migrate consumers to custom CBOR/CAR parsing

Changed files
+50 -75
src
+2 -2
src/Commands/ConsumeCommand.php
··· 115 115 if ($this->option('fresh')) { 116 116 $this->info('Starting fresh from the beginning'); 117 117 118 - return null; 118 + return 0; // Explicitly 0 means "start fresh, no cursor" 119 119 } 120 120 121 121 if ($this->option('cursor')) { ··· 127 127 128 128 $this->info('Resuming from stored cursor position'); 129 129 130 - return null; 130 + return null; // null means "use stored cursor" 131 131 } 132 132 133 133 private function startConsumer(string $mode, ?int $cursor): int
+42 -49
src/Services/FirehoseConsumer.php
··· 4 4 5 5 use Illuminate\Support\Arr; 6 6 use Illuminate\Support\Facades\Log; 7 - use Revolution\Bluesky\Core\CAR; 8 - use Revolution\Bluesky\Core\CBOR; 9 7 use SocialDept\Signal\Contracts\CursorStore; 8 + use SocialDept\Signal\Core\CAR; 9 + use SocialDept\Signal\Core\CBOR; 10 + use SocialDept\Signal\Core\CID; 10 11 use SocialDept\Signal\Events\AccountEvent; 11 12 use SocialDept\Signal\Events\CommitEvent; 12 13 use SocialDept\Signal\Events\IdentityEvent; ··· 45 46 { 46 47 $this->shouldStop = false; 47 48 48 - // Get cursor from storage if not provided 49 + // Get cursor from storage if not explicitly provided 50 + // null = use stored cursor, 0 = start fresh (no cursor), >0 = specific cursor 49 51 if ($cursor === null) { 50 52 $cursor = $this->cursorStore->get(); 51 53 } 52 54 53 - $url = $this->buildWebSocketUrl($cursor); 55 + // If cursor is explicitly 0, don't send it (fresh start) 56 + $url = $this->buildWebSocketUrl($cursor > 0 ? $cursor : null); 54 57 55 58 Log::info('Signal: Starting Firehose consumer', [ 56 59 'url' => $url, 57 - 'cursor' => $cursor, 60 + 'cursor' => $cursor > 0 ? $cursor : 'none (fresh start)', 58 61 'mode' => 'firehose', 59 62 ]); 60 63 ··· 176 179 $time = $payload['time']; 177 180 $timeUs = $payload['seq'] ?? 0; // Use seq as time_us equivalent 178 181 179 - // Parse CAR blocks 182 + // Parse CAR blocks (returns CID => block data map) 180 183 $records = $payload['blocks']; 184 + 181 185 $blocks = []; 182 186 if (! empty($records)) { 183 - $blocks = rescue(fn () => iterator_to_array(CAR::blockMap($records)), []); 187 + $blocks = rescue(fn () => CAR::blockMap($records, $did), [], function (\Throwable $e) { 188 + Log::warning('Signal: Failed to parse CAR blocks', [ 189 + 'error' => $e->getMessage(), 190 + 'trace' => $e->getTraceAsString(), 191 + ]); 192 + }); 184 193 } 185 194 186 195 // Process operations ··· 202 211 $rkey = ''; 203 212 204 213 if (str_contains($path, '/')) { 205 - [$collection, $rkey] = explode('/', $path); 214 + [$collection, $rkey] = explode('/', $path, 2); 206 215 } 207 216 208 - $record = $blocks[$path] ?? []; 217 + // Get record data from blocks using the op CID 218 + // Convert CID to string if it's an object 219 + $cidStr = $cid instanceof CID ? $cid->toString() : $cid; 220 + 221 + // For delete operations, there won't be a record 222 + $record = []; 223 + if ($action !== 'delete' && isset($blocks[$cidStr])) { 224 + // Decode the CBOR block to get the record data 225 + $decoded = rescue(fn () => CBOR::decode($blocks[$cidStr])); 226 + if (is_array($decoded)) { 227 + $record = $decoded; 228 + } 229 + } 209 230 210 231 // Convert to SignalEvent format for compatibility 211 - $event = $this->buildSignalEvent($did, $timeUs, $action, $collection, $rkey, $rev, $cid, $record); 232 + $event = $this->buildSignalEvent($did, $timeUs, $action, $collection, $rkey, $rev, $cidStr, $record); 212 233 213 - // Dispatch event with cursor update and logging 214 - $this->dispatchSignalEvent($event, 'Commit', [ 215 - 'collection' => $collection, 216 - 'operation' => $action, 217 - ]); 234 + // Dispatch event with cursor update 235 + $this->dispatchSignalEvent($event); 218 236 } 219 237 } 220 238 ··· 231 249 ?string $cid, 232 250 array $record 233 251 ): SignalEvent { 234 - $recordValue = $record['value'] ?? null; 252 + // Record is already the decoded data, or empty array for deletes 253 + $recordValue = ! empty($record) ? (object) $record : null; 235 254 236 255 $commitEvent = new CommitEvent( 237 256 rev: $rev, 238 257 operation: $operation, 239 258 collection: $collection, 240 259 rkey: $rkey, 241 - record: $recordValue ? (object) $recordValue : null, 260 + record: $recordValue, 242 261 cid: $cid 243 262 ); 244 263 ··· 251 270 } 252 271 253 272 /** 254 - * Dispatch a SignalEvent with cursor update and logging. 273 + * Dispatch a SignalEvent with cursor update. 255 274 */ 256 - protected function dispatchSignalEvent(SignalEvent $event, string $eventType, array $context = []): void 275 + protected function dispatchSignalEvent(SignalEvent $event): void 257 276 { 258 277 // Update cursor 259 278 $this->cursorStore->set($event->timeUs); 260 279 261 - // Check if any signals match this event 262 - $matchingSignals = $this->signalRegistry->getMatchingSignals($event); 263 - 264 - if ($matchingSignals->isNotEmpty()) { 265 - Log::info("Signal: {$eventType} event matched", array_merge([ 266 - 'matched_signals' => $matchingSignals->count(), 267 - 'signal_names' => $matchingSignals->map(fn ($s) => class_basename($s))->join(', '), 268 - ], $context)); 269 - } 270 - 271 280 // Dispatch to matching signals 272 281 $this->eventDispatcher->dispatch($event); 273 282 } ··· 306 315 identity: $identityEvent 307 316 ); 308 317 309 - // Dispatch event with cursor update and logging 310 - $this->dispatchSignalEvent($event, 'Identity', [ 311 - 'did' => $did, 312 - 'handle' => $handle, 313 - ]); 318 + // Dispatch event with cursor update 319 + $this->dispatchSignalEvent($event); 314 320 } 315 321 316 322 /** ··· 349 355 account: $accountEvent 350 356 ); 351 357 352 - // Dispatch event with cursor update and logging 353 - $this->dispatchSignalEvent($event, 'Account', [ 354 - 'did' => $did, 355 - 'active' => $active, 356 - 'status' => $status, 357 - ]); 358 + // Dispatch event with cursor update 359 + $this->dispatchSignalEvent($event); 358 360 } 359 361 360 362 /** ··· 441 443 if (! empty($params)) { 442 444 $url .= '?'.implode('&', $params); 443 445 } 444 - 445 - Log::warning('Signal: Firehose mode - NO server-side collection filtering', [ 446 - 'note' => 'All events will be received and filtered client-side', 447 - 'registered_collections' => $this->signalRegistry->all() 448 - ->flatMap(fn ($signal) => $signal->collections() ?? []) 449 - ->unique() 450 - ->values() 451 - ->toArray(), 452 - ]); 453 446 454 447 return $url; 455 448 }
+6 -24
src/Services/JetstreamConsumer.php
··· 39 39 { 40 40 $this->shouldStop = false; 41 41 42 - // Get cursor from storage if not provided 42 + // Get cursor from storage if not explicitly provided 43 + // null = use stored cursor, 0 = start fresh (no cursor), >0 = specific cursor 43 44 if ($cursor === null) { 44 45 $cursor = $this->cursorStore->get(); 45 46 } 46 47 47 - $url = $this->buildWebSocketUrl($cursor); 48 + // If cursor is explicitly 0, don't send it (fresh start) 49 + $url = $this->buildWebSocketUrl($cursor > 0 ? $cursor : null); 48 50 49 51 Log::info('Signal: Starting Jetstream consumer', [ 50 52 'url' => $url, 51 - 'cursor' => $cursor, 53 + 'cursor' => $cursor > 0 ? $cursor : 'none (fresh start)', 54 + 'mode' => 'firehose', 52 55 ]); 53 56 54 57 $this->connect($url); ··· 127 130 // Update cursor 128 131 $this->cursorStore->set($event->timeUs); 129 132 130 - // Check if any signals match this event 131 - $matchingSignals = $this->signalRegistry->getMatchingSignals($event); 132 - 133 - if ($matchingSignals->isNotEmpty()) { 134 - $collection = $event->getCollection() ?? $event->kind; 135 - $operation = $event->getOperation() ?? 'event'; 136 - 137 - Log::info('Signal: Event matched', [ 138 - 'collection' => $collection, 139 - 'operation' => $operation, 140 - 'matched_signals' => $matchingSignals->count(), 141 - 'signal_names' => $matchingSignals->map(fn ($s) => class_basename($s))->join(', '), 142 - ]); 143 - } 144 - 145 133 // Dispatch to matching signals 146 134 $this->eventDispatcher->dispatch($event); 147 135 ··· 244 232 foreach ($collections as $collection) { 245 233 $params[] = 'wantedCollections='.urlencode($collection); 246 234 } 247 - 248 - Log::info('Signal: Collection filters applied', [ 249 - 'collections' => $collections->toArray(), 250 - ]); 251 - } else { 252 - Log::warning('Signal: No collection filters - will receive ALL events'); 253 235 } 254 236 255 237 if (! empty($params)) {