Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
# Real-World Examples

Learn from production-ready Signal examples covering common use cases.

## Social Media Analytics

Track engagement metrics across Bluesky.

```php
<?php

namespace App\Signals;

use App\Models\EngagementMetric;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;
use Illuminate\Support\Facades\DB;

/**
 * Counts newly created posts, likes, reposts and follows in hourly buckets.
 *
 * Each (collection, hour) pair is one row in `engagement_metrics`. The table
 * needs a unique index on (`collection`, `hour`) for the seeding step below
 * to be race-free.
 */
class EngagementTrackerSignal extends Signal
{
    /** Only repository commits are of interest. */
    public function eventTypes(): array
    {
        return ['commit'];
    }

    /** Collections whose creations are counted. */
    public function collections(): ?array
    {
        return [
            'app.bsky.feed.post',
            'app.bsky.feed.like',
            'app.bsky.feed.repost',
            'app.bsky.graph.follow',
        ];
    }

    /** Count creations only; updates and deletes are ignored. */
    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    /** Run on the queue so database writes never block the firehose consumer. */
    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Add one to the counter for the event's collection and hour.
     */
    public function handle(SignalEvent $event): void
    {
        $bucket = [
            'collection' => $event->getCollection(),
            // copy() so truncating to the hour never mutates the event's own
            // timestamp (assumes a Carbon instance, as startOfHour() implies).
            'hour' => $event->getTimestamp()->copy()->startOfHour(),
        ];

        // Seed the bucket at zero if it does not exist yet. A raw
        // "count + 1" expression is only valid in an UPDATE, never in the
        // INSERT that creates the row, so seeding and counting are separate.
        DB::table('engagement_metrics')->insertOrIgnore(
            $bucket + ['count' => 0, 'updated_at' => now()]
        );

        // Atomic in-database increment: safe with many concurrent workers.
        DB::table('engagement_metrics')
            ->where($bucket)
            ->increment('count', 1, ['updated_at' => now()]);
    }
}
```

**Use case:** Build analytics dashboards showing posts/hour, likes/hour, follows/hour.

## Content Moderation

Automatically flag problematic content.
```php
<?php

namespace App\Signals;

use App\Models\FlaggedPost;
use App\Services\ModerationService;
use Illuminate\Support\Facades\Log;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Runs every new post through the moderation service and stores the ones
 * that need human review.
 */
class ModerationSignal extends Signal
{
    public function __construct(
        private ModerationService $moderation
    ) {}

    /** Only repository commits are of interest. */
    public function eventTypes(): array
    {
        return ['commit'];
    }

    /** Only posts are analysed. */
    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    /** Only newly created posts are analysed. */
    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    /** Analysis runs on the queue rather than inline. */
    public function shouldQueue(): bool
    {
        return true;
    }

    /** Dedicated queue so moderation work can be scaled separately. */
    public function queue(): string
    {
        return 'moderation';
    }

    /**
     * Analyse the post text and record a FlaggedPost when review is needed.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        // Posts without text (e.g. media-only) have nothing to analyse.
        if (!isset($record->text)) {
            return;
        }

        $result = $this->moderation->analyze($record->text);

        if ($result->needsReview) {
            FlaggedPost::create([
                'did' => $event->did,
                'rkey' => $event->commit->rkey,
                'text' => $record->text,
                'reason' => $result->reason,
                'confidence' => $result->confidence,
                'flagged_at' => now(),
            ]);
        }
    }

    /**
     * Log the failure with the author's DID and the error message.
     *
     * Log is imported at the top of the file: without the import the call
     * would resolve to App\Signals\Log and throw while reporting a failure.
     */
    public function failed(SignalEvent $event, \Throwable $exception): void
    {
        Log::error('Moderation signal failed', [
            'did' => $event->did,
            'error' => $exception->getMessage(),
        ]);
    }
}
```

**Use case:** Automated content moderation with human review queue.

## User Activity Feed

Build a personalized activity feed.
```php
<?php

namespace App\Signals;

use App\Models\Activity;
use App\Models\User;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Fans out posts, likes and reposts by tracked users into the activity
 * feeds of their followers.
 */
class ActivityFeedSignal extends Signal
{
    /** Collection NSID => activity type stored on the Activity row. */
    private const ACTIVITY_TYPES = [
        'app.bsky.feed.post' => 'post',
        'app.bsky.feed.like' => 'like',
        'app.bsky.feed.repost' => 'repost',
    ];

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return [
            'app.bsky.feed.post',
            'app.bsky.feed.like',
            'app.bsky.feed.repost',
        ];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Write one Activity row per follower of the acting user.
     */
    public function handle(SignalEvent $event): void
    {
        // Events from accounts we do not track are dropped.
        $actor = User::where('did', $event->did)->first();

        if ($actor === null) {
            return;
        }

        // Nobody to fan out to.
        $recipientIds = $actor->followers()->pluck('id');

        if ($recipientIds->isEmpty()) {
            return;
        }

        $activityType = $this->getActivityType($event->getCollection());

        $recipientIds->each(function ($recipientId) use ($event, $activityType): void {
            Activity::create([
                'user_id' => $recipientId,
                'actor_did' => $event->did,
                'type' => $activityType,
                'data' => $event->toArray(),
                'created_at' => $event->getTimestamp(),
            ]);
        });
    }

    /**
     * Translate a collection NSID into the feed's activity type,
     * falling back to 'unknown' for anything unmapped.
     */
    private function getActivityType(string $collection): string
    {
        return self::ACTIVITY_TYPES[$collection] ?? 'unknown';
    }
}
```

**Use case:** Show users activity from people they follow.

## Real-Time Notifications

Send notifications for mentions and interactions.
```php
<?php

namespace App\Signals;

use App\Models\User;
use App\Notifications\MentionedInPost;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Notifies local users when a newly created post mentions them.
 */
class MentionNotificationSignal extends Signal
{
    private const MENTION_FEATURE = 'app.bsky.richtext.facet#mention';

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Extract mentioned DIDs from the post's facets and notify every
     * matching local user.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        // Mentions only exist as rich-text facets.
        if (!isset($record->facets)) {
            return;
        }

        $mentionedDids = collect($record->facets)
            // Facets without features contribute nothing.
            ->flatMap(fn ($facet) => $facet->features ?? [])
            // Features come from arbitrary repos: tolerate a missing $type.
            ->filter(fn ($feature) => ($feature->{'$type'} ?? null) === self::MENTION_FEATURE)
            ->pluck('did')
            ->filter() // drop mention features that carry no DID
            ->unique()
            ->values();

        if ($mentionedDids->isEmpty()) {
            return;
        }

        $text = $record->text ?? '';
        $uri = "at://{$event->did}/app.bsky.feed.post/{$event->commit->rkey}";

        // One query for all mentioned users instead of one query per mention.
        $recipients = User::whereIn('did', $mentionedDids->all())->get();

        foreach ($recipients as $recipient) {
            $recipient->notify(new MentionedInPost(
                authorDid: $event->did,
                text: $text,
                uri: $uri
            ));
        }
    }
}
```

**Use case:** Real-time notifications when users are mentioned.

## Follow Tracker

Track follow relationships and send notifications.
```php
<?php

namespace App\Signals;

use App\Models\Follow;
use App\Models\User;
use App\Notifications\NewFollower;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Mirrors follow and unfollow commits into the local `follows` table and
 * notifies local users about new followers.
 */
class FollowTrackerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.graph.follow'];
    }

    /** Creations add a follow, deletions remove it. */
    public function operations(): ?array
    {
        return [
            SignalCommitOperation::Create,
            SignalCommitOperation::Delete,
        ];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Route the commit to the follow or unfollow handler.
     */
    public function handle(SignalEvent $event): void
    {
        if ($event->getOperation() !== SignalCommitOperation::Create) {
            $this->handleUnfollow($event);

            return;
        }

        $record = $event->getRecord();

        // A create commit without a record cannot be stored.
        if (!is_object($record)) {
            return;
        }

        $this->handleNewFollow($event, $record);
    }

    /**
     * Store the follow and notify the followed user if they are local.
     */
    private function handleNewFollow(SignalEvent $event, object $record): void
    {
        $subject = $record->subject ?? null;

        // Records come from arbitrary repos; skip ones without a target DID.
        if (!is_string($subject) || $subject === '') {
            return;
        }

        // The rkey must be stored: a delete commit carries no record, so
        // (follower DID, rkey) is the only way handleUnfollow() can find the
        // row again. firstOrCreate keeps replayed events from duplicating it.
        $follow = Follow::firstOrCreate(
            [
                'follower_did' => $event->did,
                'rkey' => $event->commit->rkey,
            ],
            [
                'following_did' => $subject,
                'created_at' => $record->createdAt ?? now(),
            ]
        );

        // Already known (replay or retry): do not notify twice.
        if (!$follow->wasRecentlyCreated) {
            return;
        }

        User::where('did', $subject)
            ->first()
            ?->notify(new NewFollower($event->did));
    }

    /**
     * Remove the follow identified by the follower's DID and record key.
     */
    private function handleUnfollow(SignalEvent $event): void
    {
        Follow::where('follower_did', $event->did)
            ->where('rkey', $event->commit->rkey)
            ->delete();
    }
}
```

**Use case:** Track follows and notify users of new followers.

## Search Indexer

Index posts for full-text search.
```php
<?php

namespace App\Signals;

use App\Models\Post;
use Laravel\Scout\Searchable;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Keeps the local `Post` table and its search index in step with post
 * commits: creates and updates are indexed, deletes are removed.
 */
class SearchIndexerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /** Indexing work goes to its own queue. */
    public function queue(): string
    {
        return 'indexing';
    }

    /**
     * Dispatch on the commit operation.
     */
    public function handle(SignalEvent $event): void
    {
        match ($event->getOperation()) {
            SignalCommitOperation::Delete => $this->deletePost($event),
            SignalCommitOperation::Create,
            SignalCommitOperation::Update => $this->indexPost($event),
        };
    }

    /**
     * Upsert the post row and push it to the search index.
     */
    private function indexPost(SignalEvent $event): void
    {
        $record = $event->getRecord();

        // A post is identified by its author DID plus record key.
        $identity = [
            'did' => $event->did,
            'rkey' => $event->commit->rkey,
        ];

        $attributes = [
            'text' => $record->text ?? '',
            'created_at' => $record->createdAt ?? now(),
            'indexed_at' => now(),
        ];

        $indexed = Post::updateOrCreate($identity, $attributes);

        // Explicitly sync this model to the search index.
        $indexed->searchable();
    }

    /**
     * Remove the post from the search index and the database, if we have it.
     */
    private function deletePost(SignalEvent $event): void
    {
        $existing = Post::where('did', $event->did)
            ->where('rkey', $event->commit->rkey)
            ->first();

        if (!$existing) {
            return;
        }

        $existing->unsearchable();
        $existing->delete();
    }
}
```

**Use case:** Full-text search across all Bluesky posts.

## Trend Detection

Identify trending topics and hashtags.
```php
<?php

namespace App\Signals;

use App\Models\TrendingTopic;
use Illuminate\Support\Facades\Cache;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Counts hashtag usage in new posts over a one-hour window and records
 * hashtags that pass a threshold as trending topics.
 */
class TrendDetectionSignal extends Signal
{
    /** Uses within one window before a hashtag is recorded as trending. */
    private const TRENDING_THRESHOLD = 100;

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Extract every hashtag from the post text and count it.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        if (!isset($record->text)) {
            return;
        }

        // The `u` modifier with \p{L}/\p{N} matches non-ASCII hashtags too;
        // a byte-wise \w would cut them at the first multibyte character.
        $found = preg_match_all('/#([\p{L}\p{N}_]+)/u', $record->text, $matches);

        // false means the text was not valid UTF-8: nothing to count.
        if ($found === false || $found === 0) {
            return;
        }

        foreach ($matches[1] as $hashtag) {
            // Fold case so "#PHP" and "#php" feed the same counter.
            $this->incrementHashtag(mb_strtolower($hashtag));
        }
    }

    /**
     * Bump the hourly counter for a hashtag and record it as trending once
     * the threshold is exceeded.
     */
    private function incrementHashtag(string $hashtag): void
    {
        $key = "trending:hashtag:{$hashtag}";

        // add() writes only when the key is absent, so the one-hour TTL is
        // attached when the window opens. Setting it after increment() is too
        // late: some stores create the key on increment with no expiry.
        Cache::add($key, 0, now()->addHour());

        $count = (int) Cache::increment($key);

        if ($count > self::TRENDING_THRESHOLD) {
            TrendingTopic::updateOrCreate(
                ['hashtag' => $hashtag],
                ['count' => $count, 'updated_at' => now()]
            );
        }
    }
}
```

**Use case:** Identify trending hashtags and topics in real-time.

## Custom AppView

Index custom collections for your AppView.
```php
<?php

namespace App\Signals;

use App\Models\Publication;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Indexes the custom `app.offprint.beta.*` collections for an AppView.
 */
class PublicationIndexerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return [
            'app.offprint.beta.publication',
            'app.offprint.beta.post',
        ];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Send publication commits to handlePublication() and everything else
     * to handlePost().
     */
    public function handle(SignalEvent $event): void
    {
        $collection = $event->getCollection();
        $operation = $event->getOperation();

        match ($collection) {
            'app.offprint.beta.publication' => $this->handlePublication($event, $operation),
            default => $this->handlePost($event, $operation),
        };
    }

    /**
     * Upsert a publication on create/update; remove it on delete.
     */
    private function handlePublication(SignalEvent $event, SignalCommitOperation $operation): void
    {
        if ($operation !== SignalCommitOperation::Delete) {
            $record = $event->getRecord();

            Publication::updateOrCreate(
                [
                    'did' => $event->did,
                    'rkey' => $event->commit->rkey,
                ],
                [
                    'title' => $record->title ?? '',
                    'description' => $record->description ?? null,
                    'created_at' => $record->createdAt ?? now(),
                ]
            );

            return;
        }

        // Delete: the row is located by author DID plus record key.
        Publication::where('did', $event->did)
            ->where('rkey', $event->commit->rkey)
            ->delete();
    }

    /**
     * Placeholder for indexing `app.offprint.beta.post` records.
     */
    private function handlePost(SignalEvent $event, SignalCommitOperation $operation): void
    {
        // Handle custom post records
    }
}
```

**Use case:** Build AT Protocol AppViews with custom collections.

## Rate-Limited API Integration

Integrate with external APIs respecting rate limits.
```php
<?php

namespace App\Signals;

use App\Services\ExternalAPIService;
use Illuminate\Support\Facades\RateLimiter;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Mirrors newly created posts to an external API, staying within that
 * API's rate limit.
 */
class APIIntegrationSignal extends Signal
{
    /** Rate limiter bucket shared by every job of this Signal. */
    private const LIMITER_KEY = 'external-api';

    /** Calls allowed per decay window. */
    private const MAX_CALLS = 100;

    /** Length of the rate-limit window in seconds. */
    private const DECAY_SECONDS = 60;

    public function __construct(
        private ExternalAPIService $api
    ) {}

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    /**
     * Mirror creations only. Without this filter delete commits, which
     * carry no record, would be forwarded as posts with empty text.
     */
    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Forward the post if the rate limit allows it, otherwise retry in a
     * minute.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        // Rate limit: 100 calls per minute
        $executed = RateLimiter::attempt(
            self::LIMITER_KEY,
            self::MAX_CALLS,
            function () use ($event, $record) {
                $this->api->sendPost([
                    'author' => $event->did,
                    'text' => $record->text ?? '',
                    'timestamp' => $event->getTimestamp(),
                ]);
            },
            self::DECAY_SECONDS
        );

        if (!$executed) {
            // Limit exhausted: re-queue for later.
            // NOTE(review): the queued closure captures $this and $event, so
            // both must be serializable on the queue connection — confirm.
            dispatch(fn () => $this->handle($event))
                ->delay(now()->addMinutes(1));
        }
    }
}
```

**Use case:** Mirror content to external platforms with rate limiting.

## Multi-Collection Analytics

Track engagement across multiple collection types.
```php
<?php

namespace App\Signals;

use App\Models\UserMetrics;
use SocialDept\AtpSignals\Enums\SignalCommitOperation;
use SocialDept\AtpSignals\Events\SignalEvent;
use SocialDept\AtpSignals\Signals\Signal;

/**
 * Maintains per-user totals of posts, likes and follows.
 */
class UserMetricsSignal extends Signal
{
    /** Collection NSID => counter column on the UserMetrics row. */
    private const COUNTER_COLUMNS = [
        'app.bsky.feed.post' => 'total_posts',
        'app.bsky.feed.like' => 'total_likes',
        'app.bsky.graph.follow' => 'total_follows',
    ];

    public function eventTypes(): array
    {
        return ['commit'];
    }

    /** Wildcards subscribe to every feed and graph collection. */
    public function collections(): ?array
    {
        return ['app.bsky.feed.*', 'app.bsky.graph.*'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Increment the counter that matches the event's collection and stamp
     * the user's last activity time.
     */
    public function handle(SignalEvent $event): void
    {
        $collection = $event->getCollection();

        $userMetrics = UserMetrics::firstOrCreate(
            ['did' => $event->did],
            ['total_posts' => 0, 'total_likes' => 0, 'total_follows' => 0]
        );

        // Collections matched by the wildcards but not counted have no column.
        $column = self::COUNTER_COLUMNS[$collection] ?? null;

        if ($column !== null) {
            $userMetrics->increment($column);
        }

        // Any matching event counts as activity, counted or not.
        $userMetrics->touch('last_activity_at');
    }
}
```

**Use case:** User activity metrics and leaderboards.

## Performance Tips

### Batch Database Operations

```php
public function handle(SignalEvent $event): void
{
    // Bad - individual inserts
    Post::create([...]);

    // Good - batch inserts.
    // The buffer lives in the shared cache, so every queue worker reads and
    // writes the same key. Hold an atomic lock around the read-modify-write;
    // without it two jobs can load the same buffer and one overwrites the
    // other's rows. Requires a cache store that supports atomic locks.
    Cache::lock('pending_posts:lock', 10)->block(5, function () {
        $posts = Cache::get('pending_posts', []);
        $posts[] = [...];

        if (count($posts) >= 100) {
            Post::insert($posts);
            Cache::forget('pending_posts');

            return;
        }

        // Rows still buffered when this TTL lapses are dropped, so pair the
        // buffer with a scheduled flush if events can arrive slowly.
        Cache::put('pending_posts', $posts, now()->addMinutes(5));
    });
}
```

### Use Queues for Heavy Operations

```php
public function shouldQueue(): bool
{
    // Queue if operation takes > 100ms
    return true;
}
```

### Add Indexes for Filtering

```php
// Migration for fast lookups
Schema::table('posts', function (Blueprint $table) {
    $table->index(['did', 'rkey']);
    $table->index('created_at');
});
```

## Next Steps

- **[Review signal architecture →](signals.md)** - Understand Signal structure
- **[Learn about filtering →](filtering.md)** - Master event filtering
- **[Explore queue integration →](queues.md)** - Build high-performance Signals
- **[Configure your setup →](configuration.md)** - Optimize configuration