Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
# Real-World Examples

Learn from production-ready Signal examples covering common use cases.

## Social Media Analytics

Track engagement metrics across Bluesky.

```php
10<?php
11
12namespace App\Signals;
13
14use App\Models\EngagementMetric;
15use SocialDept\AtpSignals\Enums\SignalCommitOperation;
16use SocialDept\AtpSignals\Events\SignalEvent;
17use SocialDept\AtpSignals\Signals\Signal;
18use Illuminate\Support\Facades\DB;
19
/**
 * Keeps an hourly tally of newly created posts, likes, reposts and follows.
 *
 * Every matching commit adds one to the `engagement_metrics` row identified by
 * (collection, hour). The tally relies on a UNIQUE index over those two
 * columns: MySQL, PostgreSQL and SQLite need it for the upsert in handle() to
 * recognise an existing bucket.
 */
class EngagementTrackerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return [
            'app.bsky.feed.post',
            'app.bsky.feed.like',
            'app.bsky.feed.repost',
            'app.bsky.graph.follow',
        ];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Add one to the counter for the event's collection in the event's hour.
     *
     * A single upsert is used so the first event of an hour inserts a row with
     * count = 1 and every later event increments it. `updateOrInsert()` with
     * `DB::raw('count + 1')` would send that raw expression on the INSERT path
     * as well, where there is no existing value to add to.
     */
    public function handle(SignalEvent $event): void
    {
        $collection = $event->getCollection();

        // copy() so that truncating to the hour never modifies the timestamp
        // instance held by the event.
        $hour = $event->getTimestamp()->copy()->startOfHour();
        $now = now();

        DB::table('engagement_metrics')->upsert(
            [
                [
                    'collection' => $collection,
                    'hour' => $hour,
                    'count' => 1,
                    'updated_at' => $now,
                ],
            ],
            ['collection', 'hour'],
            [
                // Table-qualified: PostgreSQL rejects a bare `count` inside
                // ON CONFLICT ... DO UPDATE as an ambiguous column reference.
                'count' => DB::raw('engagement_metrics.count + 1'),
                'updated_at' => $now,
            ]
        );
    }
}
```

**Use case:** Build analytics dashboards showing posts/hour, likes/hour, follows/hour.

## Content Moderation

Automatically flag problematic content.

```php
75<?php
76
77namespace App\Signals;
78
79use App\Models\FlaggedPost;
80use App\Services\ModerationService;
81use SocialDept\AtpSignals\Enums\SignalCommitOperation;
82use SocialDept\AtpSignals\Events\SignalEvent;
83use SocialDept\AtpSignals\Signals\Signal;
84
/**
 * Sends the text of newly created posts through the moderation service and
 * stores a FlaggedPost row for every post the service marks as needing review.
 *
 * Runs on the dedicated "moderation" queue.
 */
class ModerationSignal extends Signal
{
    public function __construct(
        private ModerationService $moderation
    ) {}

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    public function queue(): string
    {
        return 'moderation';
    }

    /**
     * Analyze the post text and record the post when review is required.
     *
     * Posts without a `text` field are skipped.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        if (!isset($record->text)) {
            return;
        }

        $result = $this->moderation->analyze($record->text);

        if ($result->needsReview) {
            FlaggedPost::create([
                'did' => $event->did,
                'rkey' => $event->commit->rkey,
                'text' => $record->text,
                'reason' => $result->reason,
                'confidence' => $result->confidence,
                'flagged_at' => now(),
            ]);
        }
    }

    /**
     * Log the author DID and the error message of a failed run.
     */
    public function failed(SignalEvent $event, \Throwable $exception): void
    {
        // Fully qualified on purpose: the Log facade is not imported here, and
        // a bare `Log::` would resolve to the non-existent App\Signals\Log.
        \Illuminate\Support\Facades\Log::error('Moderation signal failed', [
            'did' => $event->did,
            'error' => $exception->getMessage(),
        ]);
    }
}
```

**Use case:** Automated content moderation with human review queue.

## User Activity Feed

Build a personalized activity feed.

```php
155<?php
156
157namespace App\Signals;
158
159use App\Models\Activity;
160use App\Models\User;
161use SocialDept\AtpSignals\Enums\SignalCommitOperation;
162use SocialDept\AtpSignals\Events\SignalEvent;
163use SocialDept\AtpSignals\Signals\Signal;
164
/**
 * Fans out new posts, likes and reposts of tracked users into the activity
 * feed of each of their followers.
 */
class ActivityFeedSignal extends Signal
{
    /** Maps a collection NSID to the activity type stored for it. */
    private const ACTIVITY_TYPES = [
        'app.bsky.feed.post' => 'post',
        'app.bsky.feed.like' => 'like',
        'app.bsky.feed.repost' => 'repost',
    ];

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return array_keys(self::ACTIVITY_TYPES);
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Write one Activity row per follower of the event's author.
     *
     * Nothing is written when the author is not a known user or has no
     * followers.
     */
    public function handle(SignalEvent $event): void
    {
        // Only authors we have a User row for are tracked.
        $author = User::where('did', $event->did)->first();

        if ($author === null) {
            return;
        }

        $recipientIds = $author->followers()->pluck('id');

        if ($recipientIds->isEmpty()) {
            return;
        }

        $collection = $event->getCollection();

        $recipientIds->each(function ($recipientId) use ($event, $collection): void {
            Activity::create([
                'user_id' => $recipientId,
                'actor_did' => $event->did,
                'type' => $this->getActivityType($collection),
                'data' => $event->toArray(),
                'created_at' => $event->getTimestamp(),
            ]);
        });
    }

    /**
     * Translate a collection NSID into an activity type; anything that is not
     * a post, like or repost becomes 'unknown'.
     */
    private function getActivityType(string $collection): string
    {
        return self::ACTIVITY_TYPES[$collection] ?? 'unknown';
    }
}
```

**Use case:** Show users activity from people they follow.

## Real-Time Notifications

Send notifications for mentions and interactions.

```php
240<?php
241
242namespace App\Signals;
243
244use App\Models\User;
245use App\Notifications\MentionedInPost;
246use SocialDept\AtpSignals\Enums\SignalCommitOperation;
247use SocialDept\AtpSignals\Events\SignalEvent;
248use SocialDept\AtpSignals\Signals\Signal;
249
/**
 * Notifies local users when a newly created post mentions them.
 *
 * Mentions are read from the post's rich-text facets, not parsed out of the
 * text.
 */
class MentionNotificationSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Send a MentionedInPost notification to every known user mentioned in
     * the post.
     *
     * Posts without facets, and mentions of DIDs that have no User row, are
     * ignored.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        if (!isset($record->facets)) {
            return;
        }

        $mentionedDids = collect($record->facets)
            ->filter(fn ($facet) => isset($facet->features))
            ->flatMap(fn ($facet) => $facet->features)
            // `?? null` tolerates features that carry no `$type` at all
            // instead of raising an undefined-property warning.
            ->filter(fn ($feature) => ($feature->{'$type'} ?? null) === 'app.bsky.richtext.facet#mention')
            ->pluck('did')
            // Drop mention features that are missing their `did`.
            ->filter()
            ->unique()
            ->values();

        if ($mentionedDids->isEmpty()) {
            return;
        }

        $uri = "at://{$event->did}/app.bsky.feed.post/{$event->commit->rkey}";

        // One query for all mentioned DIDs rather than one query per mention.
        User::whereIn('did', $mentionedDids)
            ->get()
            ->each(function (User $user) use ($event, $record, $uri): void {
                $user->notify(new MentionedInPost(
                    authorDid: $event->did,
                    text: $record->text ?? '',
                    uri: $uri
                ));
            });
    }
}
```

**Use case:** Real-time notifications when users are mentioned.

## Follow Tracker

Track follow relationships and send notifications.

```php
310<?php
311
312namespace App\Signals;
313
314use App\Models\Follow;
315use App\Models\User;
316use App\Notifications\NewFollower;
317use SocialDept\AtpSignals\Enums\SignalCommitOperation;
318use SocialDept\AtpSignals\Events\SignalEvent;
319use SocialDept\AtpSignals\Signals\Signal;
320
/**
 * Mirrors follow records into the local `Follow` table and notifies local
 * users about new followers.
 *
 * A follow is identified by (follower DID, record key). The record key is
 * stored on creation because it is the only identifier available when the
 * follow record is later deleted.
 */
class FollowTrackerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.graph.follow'];
    }

    public function operations(): ?array
    {
        return [
            SignalCommitOperation::Create,
            SignalCommitOperation::Delete,
        ];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Route create commits to handleNewFollow() and everything else (deletes,
     * given operations() above) to handleUnfollow().
     */
    public function handle(SignalEvent $event): void
    {
        if ($event->getOperation() === SignalCommitOperation::Create) {
            $this->handleNewFollow($event, $event->getRecord());

            return;
        }

        $this->handleUnfollow($event);
    }

    /**
     * Store the follow and notify the followed user when they are a local user.
     */
    private function handleNewFollow(SignalEvent $event, object $record): void
    {
        // A follow record without a subject cannot be stored or notified about.
        if (!isset($record->subject)) {
            return;
        }

        Follow::create([
            'follower_did' => $event->did,
            'following_did' => $record->subject,
            // Without the rkey, handleUnfollow() could never find this row.
            // NOTE(review): `rkey` must be mass-assignable on Follow - confirm.
            'rkey' => $event->commit->rkey,
            'created_at' => $record->createdAt ?? now(),
        ]);

        $followedUser = User::where('did', $record->subject)->first();

        if ($followedUser) {
            $followedUser->notify(new NewFollower($event->did));
        }
    }

    /**
     * Remove the follow whose record was deleted, matched by follower DID and
     * record key.
     */
    private function handleUnfollow(SignalEvent $event): void
    {
        Follow::where('follower_did', $event->did)
            ->where('rkey', $event->commit->rkey)
            ->delete();
    }
}
```

**Use case:** Track follows and notify users of new followers.

## Search Indexer

Index posts for full-text search.

```php
390<?php
391
392namespace App\Signals;
393
394use App\Models\Post;
395use Laravel\Scout\Searchable;
396use SocialDept\AtpSignals\Enums\SignalCommitOperation;
397use SocialDept\AtpSignals\Events\SignalEvent;
398use SocialDept\AtpSignals\Signals\Signal;
399
/**
 * Keeps the local `Post` table and its search index in step with post
 * commits. Work is processed on the "indexing" queue.
 */
class SearchIndexerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    public function queue(): string
    {
        return 'indexing';
    }

    /**
     * Creates and updates are (re)indexed; deletes remove the post.
     */
    public function handle(SignalEvent $event): void
    {
        match ($event->getOperation()) {
            SignalCommitOperation::Create,
            SignalCommitOperation::Update => $this->indexPost($event),
            SignalCommitOperation::Delete => $this->deletePost($event),
        };
    }

    /**
     * Upsert the post by (did, rkey) and push it to the search index.
     */
    private function indexPost(SignalEvent $event): void
    {
        $record = $event->getRecord();

        $identity = [
            'did' => $event->did,
            'rkey' => $event->commit->rkey,
        ];

        $attributes = [
            'text' => $record->text ?? '',
            'created_at' => $record->createdAt ?? now(),
            'indexed_at' => now(),
        ];

        // Explicitly push the saved model to the search index.
        Post::updateOrCreate($identity, $attributes)->searchable();
    }

    /**
     * Remove the post from the search index and the database, if we have it.
     */
    private function deletePost(SignalEvent $event): void
    {
        $post = Post::where('did', $event->did)
            ->where('rkey', $event->commit->rkey)
            ->first();

        if (!$post) {
            return;
        }

        $post->unsearchable();
        $post->delete();
    }
}
```

**Use case:** Full-text search across all Bluesky posts.

## Trend Detection

Identify trending topics and hashtags.

```php
474<?php
475
476namespace App\Signals;
477
478use App\Models\TrendingTopic;
479use Illuminate\Support\Facades\Cache;
480use SocialDept\AtpSignals\Enums\SignalCommitOperation;
481use SocialDept\AtpSignals\Events\SignalEvent;
482use SocialDept\AtpSignals\Signals\Signal;
483
/**
 * Counts hashtag usage in newly created posts and records hashtags that pass
 * a usage threshold as trending topics.
 *
 * Counters live in the cache under `trending:hashtag:{tag}` and expire one
 * hour after their first use.
 */
class TrendDetectionSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Extract every hashtag from the post text and count it.
     *
     * Posts without text, without hashtags, or whose text cannot be matched
     * (for example invalid UTF-8) are skipped.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        if (!isset($record->text)) {
            return;
        }

        // The `u` modifier plus Unicode classes let non-ASCII hashtags match.
        // preg_match_all() returns 0 on no match and false on error; either
        // way there is nothing to count.
        if (!preg_match_all('/#([\p{L}\p{M}\p{N}_]+)/u', $record->text, $matches)) {
            return;
        }

        foreach ($matches[1] as $hashtag) {
            $this->incrementHashtag($hashtag);
        }
    }

    /**
     * Bump the hourly counter for a hashtag and persist it as a trending topic
     * once it has been used more than 100 times.
     */
    private function incrementHashtag(string $hashtag): void
    {
        $key = "trending:hashtag:{$hashtag}";

        // Seed the counter with its one-hour expiry BEFORE incrementing.
        // add() only writes when the key is absent, so an existing counter
        // keeps its original expiry. Incrementing first would leave the key
        // without the intended TTL.
        Cache::add($key, 0, now()->addHour());

        $count = Cache::increment($key);

        // Update trending topics if threshold reached
        if ($count > 100) {
            TrendingTopic::updateOrCreate(
                ['hashtag' => $hashtag],
                ['count' => $count, 'updated_at' => now()]
            );
        }
    }
}
```

**Use case:** Identify trending hashtags and topics in real-time.

## Custom AppView

Index custom collections for your AppView.

```php
551<?php
552
553namespace App\Signals;
554
555use App\Models\Publication;
556use SocialDept\AtpSignals\Enums\SignalCommitOperation;
557use SocialDept\AtpSignals\Events\SignalEvent;
558use SocialDept\AtpSignals\Signals\Signal;
559
/**
 * Indexes the custom `app.offprint.beta.*` collections for an AppView.
 *
 * Publication records are mirrored into the `Publication` model; handling of
 * post records is left as a stub.
 */
class PublicationIndexerSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return [
            'app.offprint.beta.publication',
            'app.offprint.beta.post',
        ];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Dispatch by collection: publications go to handlePublication(), every
     * other collection to handlePost().
     */
    public function handle(SignalEvent $event): void
    {
        $collection = $event->getCollection();
        $operation = $event->getOperation();

        match ($collection) {
            'app.offprint.beta.publication' => $this->handlePublication($event, $operation),
            default => $this->handlePost($event, $operation),
        };
    }

    /**
     * Delete the publication on a delete commit; otherwise create or update it
     * from the record, keyed by (did, rkey).
     */
    private function handlePublication(SignalEvent $event, SignalCommitOperation $operation): void
    {
        $identity = [
            'did' => $event->did,
            'rkey' => $event->commit->rkey,
        ];

        if ($operation !== SignalCommitOperation::Delete) {
            $record = $event->getRecord();

            Publication::updateOrCreate($identity, [
                'title' => $record->title ?? '',
                'description' => $record->description ?? null,
                'created_at' => $record->createdAt ?? now(),
            ]);

            return;
        }

        Publication::where('did', $identity['did'])
            ->where('rkey', $identity['rkey'])
            ->delete();
    }

    /**
     * Placeholder for indexing custom post records; intentionally empty.
     */
    private function handlePost(SignalEvent $event, SignalCommitOperation $operation): void
    {
        // Handle custom post records
    }
}
```

**Use case:** Build AT Protocol AppViews with custom collections.

## Rate-Limited API Integration

Integrate with external APIs respecting rate limits.

```php
630<?php
631
632namespace App\Signals;
633
634use App\Services\ExternalAPIService;
635use Illuminate\Support\Facades\RateLimiter;
636use SocialDept\AtpSignals\Events\SignalEvent;
637use SocialDept\AtpSignals\Signals\Signal;
638
/**
 * Forwards post commits to an external API, limited to 100 calls per minute
 * under the shared 'external-api' rate-limiter key.
 */
class APIIntegrationSignal extends Signal
{
    public function __construct(
        private ExternalAPIService $api
    ) {}

    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Send the post to the external API, or schedule another attempt one
     * minute later when the rate limit is exhausted.
     */
    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        $forward = function () use ($event, $record) {
            $this->api->sendPost([
                'author' => $event->did,
                'text' => $record->text ?? '',
                'timestamp' => $event->getTimestamp(),
            ]);
        };

        // Rate limit: 100 calls per minute
        $perMinute = 100;

        if (RateLimiter::attempt('external-api', $perMinute, $forward)) {
            return;
        }

        // Limit reached: re-queue for later
        dispatch(fn() => $this->handle($event))
            ->delay(now()->addMinutes(1));
    }
}
```

**Use case:** Mirror content to external platforms with rate limiting.

## Multi-Collection Analytics

Track engagement across multiple collection types.

```php
693<?php
694
695namespace App\Signals;
696
697use App\Models\UserMetrics;
698use SocialDept\AtpSignals\Enums\SignalCommitOperation;
699use SocialDept\AtpSignals\Events\SignalEvent;
700use SocialDept\AtpSignals\Signals\Signal;
701
/**
 * Maintains per-user totals of posts, likes and follows, plus a
 * `last_activity_at` timestamp, across the feed and graph collections.
 */
class UserMetricsSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.*', 'app.bsky.graph.*'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    /**
     * Increment the counter that matches the event's collection (if any) and
     * refresh the user's last-activity timestamp.
     */
    public function handle(SignalEvent $event): void
    {
        // Collections without a counter (e.g. reposts, blocks) map to null
        // and only refresh the timestamp.
        $counter = match ($event->getCollection()) {
            'app.bsky.feed.post' => 'total_posts',
            'app.bsky.feed.like' => 'total_likes',
            'app.bsky.graph.follow' => 'total_follows',
            default => null,
        };

        $initialTotals = ['total_posts' => 0, 'total_likes' => 0, 'total_follows' => 0];
        $metrics = UserMetrics::firstOrCreate(['did' => $event->did], $initialTotals);

        if ($counter !== null) {
            $metrics->increment($counter);
        }

        $metrics->touch('last_activity_at');
    }
}
```

**Use case:** User activity metrics and leaderboards.

## Performance Tips

### Batch Database Operations

```php
public function handle(SignalEvent $event): void
{
    // Bad - one INSERT statement per event:
    //
    //     Post::create([/* ...attributes... */]);

    // Good - buffer rows in the cache and write them in batches of 100.
    //
    // The lock makes the read-modify-write below atomic. Without it, two
    // queue workers can read the same buffer and the later write silently
    // discards the other worker's row. The lock auto-releases after 10s and
    // block() waits up to 5s to acquire it.
    Cache::lock('pending_posts:lock', 10)->block(5, function () {
        $posts = Cache::get('pending_posts', []);
        $posts[] = [/* ...attributes for this event... */];

        if (count($posts) >= 100) {
            Post::insert($posts);
            Cache::forget('pending_posts');

            return;
        }

        // Stored without an expiry so a partial batch is never dropped.
        // Flush leftovers from a scheduled task if traffic can go quiet.
        Cache::forever('pending_posts', $posts);
    });
}
```

### Use Queues for Heavy Operations

```php
/**
 * Rule of thumb: return true when handle() takes longer than about 100ms.
 */
public function shouldQueue(): bool
{
    return true;
}
```

### Add Indexes for Filtering

```php
// Migration: index the columns the Signals above filter on.
Schema::table('posts', static function (Blueprint $table): void {
    // Composite index for lookups by (did, rkey).
    $table->index(['did', 'rkey']);

    // Separate index for filtering or ordering by creation time.
    $table->index('created_at');
});
```

## Next Steps

- **[Review signal architecture →](signals.md)** - Understand Signal structure
- **[Learn about filtering →](filtering.md)** - Master event filtering
- **[Explore queue integration →](queues.md)** - Build high-performance Signals
- **[Configure your setup →](configuration.md)** - Optimize configuration