Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel

Implement identity and account event handling in FirehoseConsumer

Changed files
+100 -19
src
+100 -19
src/Services/FirehoseConsumer.php
··· 7 7 use Revolution\Bluesky\Core\CAR; 8 8 use Revolution\Bluesky\Core\CBOR; 9 9 use SocialDept\Signal\Contracts\CursorStore; 10 + use SocialDept\Signal\Events\AccountEvent; 10 11 use SocialDept\Signal\Events\CommitEvent; 12 + use SocialDept\Signal\Events\IdentityEvent; 11 13 use SocialDept\Signal\Events\SignalEvent; 12 14 use SocialDept\Signal\Exceptions\ConnectionException; 13 15 use SocialDept\Signal\Support\WebSocketConnection; ··· 208 210 // Convert to SignalEvent format for compatibility 209 211 $event = $this->buildSignalEvent($did, $timeUs, $action, $collection, $rkey, $rev, $cid, $record); 210 212 211 - // Update cursor 212 - $this->cursorStore->set($timeUs); 213 - 214 - // Check if any signals match this event 215 - $matchingSignals = $this->signalRegistry->getMatchingSignals($event); 216 - 217 - if ($matchingSignals->isNotEmpty()) { 218 - Log::info('Signal: Event matched', [ 219 - 'collection' => $collection, 220 - 'operation' => $action, 221 - 'matched_signals' => $matchingSignals->count(), 222 - 'signal_names' => $matchingSignals->map(fn ($s) => class_basename($s))->join(', '), 223 - ]); 224 - } 225 - 226 - // Dispatch to matching signals 227 - $this->eventDispatcher->dispatch($event); 213 + // Dispatch event with cursor update and logging 214 + $this->dispatchSignalEvent($event, 'Commit', [ 215 + 'collection' => $collection, 216 + 'operation' => $action, 217 + ]); 228 218 } 229 219 } 230 220 ··· 261 251 } 262 252 263 253 /** 254 + * Dispatch a SignalEvent with cursor update and logging. 255 + */ 256 + protected function dispatchSignalEvent(SignalEvent $event, string $eventType, array $context = []): void 257 + { 258 + // Update cursor 259 + $this->cursorStore->set($event->timeUs); 260 + 261 + // Check if any signals match this event 262 + $matchingSignals = $this->signalRegistry->getMatchingSignals($event); 263 + 264 + if ($matchingSignals->isNotEmpty()) { 265 + Log::info("Signal: {$eventType} event matched", array_merge([ 266 + 'matched_signals' => $matchingSignals->count(), 267 + 'signal_names' => $matchingSignals->map(fn ($s) => class_basename($s))->join(', '), 268 + ], $context)); 269 + } 270 + 271 + // Dispatch to matching signals 272 + $this->eventDispatcher->dispatch($event); 273 + } 274 + 275 + /** 264 276 * Handle identity event from Firehose. 265 277 */ 266 278 protected function handleIdentity(array $payload): void 267 279 { 268 - // Identity events are received but not currently processed 280 + // Validate required fields 281 + if (! isset($payload['did'])) { 282 + Log::debug('Signal: Invalid identity payload - missing did', ['payload' => $payload]); 283 + 284 + return; 285 + } 286 + 287 + $did = $payload['did']; 288 + $handle = $payload['handle'] ?? null; 289 + $seq = $payload['seq'] ?? 0; 290 + $time = $payload['time'] ?? null; 291 + $timeUs = $seq; // Use seq as timeUs equivalent for cursor management 292 + 293 + // Create IdentityEvent 294 + $identityEvent = new IdentityEvent( 295 + did: $did, 296 + handle: $handle, 297 + seq: $seq, 298 + time: $time 299 + ); 300 + 301 + // Create SignalEvent wrapper 302 + $event = new SignalEvent( 303 + did: $did, 304 + timeUs: $timeUs, 305 + kind: 'identity', 306 + identity: $identityEvent 307 + ); 308 + 309 + // Dispatch event with cursor update and logging 310 + $this->dispatchSignalEvent($event, 'Identity', [ 311 + 'did' => $did, 312 + 'handle' => $handle, 313 + ]); 269 314 } 270 315 271 316 /** ··· 273 318 */ 274 319 protected function handleAccount(array $payload): void 275 320 { 276 - // Account events are received but not currently processed 321 + // Validate required fields 322 + if (! isset($payload['did'], $payload['active'])) { 323 + Log::debug('Signal: Invalid account payload - missing required fields', ['payload' => $payload]); 324 + 325 + return; 326 + } 327 + 328 + $did = $payload['did']; 329 + $active = (bool) $payload['active']; 330 + $status = $payload['status'] ?? null; 331 + $seq = $payload['seq'] ?? 0; 332 + $time = $payload['time'] ?? null; 333 + $timeUs = $seq; // Use seq as timeUs equivalent for cursor management 334 + 335 + // Create AccountEvent 336 + $accountEvent = new AccountEvent( 337 + did: $did, 338 + active: $active, 339 + status: $status, 340 + seq: $seq, 341 + time: $time 342 + ); 343 + 344 + // Create SignalEvent wrapper 345 + $event = new SignalEvent( 346 + did: $did, 347 + timeUs: $timeUs, 348 + kind: 'account', 349 + account: $accountEvent 350 + ); 351 + 352 + // Dispatch event with cursor update and logging 353 + $this->dispatchSignalEvent($event, 'Account', [ 354 + 'did' => $did, 355 + 'active' => $active, 356 + 'status' => $status, 357 + ]); 277 358 } 278 359 279 360 /**