Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
PHP 98.3%
Other 1.7%
13 2 11

Clone this repository

https://tangled.org/socialde.pt/atp-signals
git@tangled.org:socialde.pt/atp-signals

For self-hosted knots, clone URLs may differ based on your setup.

README.md

Signal#

Laravel package for building Signals that respond to AT Protocol events

Signal provides a clean, Laravel-style interface for consuming real-time events from the AT Protocol. Supports both Jetstream (simplified JSON events) and Firehose (raw CBOR/CAR format) for maximum flexibility. Build reactive applications, AppViews, and custom indexers that respond to posts, likes, follows, and other social interactions on the AT Protocol network.


Features#

  • 🔄 Dual-Mode Support - Choose between Jetstream (JSON) or Firehose (CBOR/CAR) based on your needs
  • 🔌 WebSocket Connection - Connect to AT Protocol with automatic reconnection and exponential backoff
  • 🎯 Signal-based Architecture - Clean, testable event handlers (avoiding Laravel's "listener" naming collision)
  • Wildcard Collection Filtering - Match multiple collections with patterns like app.bsky.feed.*
  • 🏗️ AppView Ready - Full support for custom collections and building AT Protocol AppViews
  • 💾 Cursor Management - Resume from last position after disconnections (Database, Redis, or File storage)
  • Queue Integration - Process events asynchronously with Laravel queues
  • 🔍 Auto-Discovery - Automatically find and register Signals in app/Signals
  • 🧪 Testing Tools - Test your Signals with sample data
  • 🛠️ Artisan Commands - Full CLI support for managing and testing Signals

Table of Contents#


Installation#

Install the package via Composer:

composer require socialdept/signal

Run the installation command:

php artisan signal:install

This will:

  • Publish the configuration file to config/signal.php
  • Publish the database migration
  • Run migrations (with confirmation)
  • Display next steps

Manual Installation#

If you prefer manual installation:

php artisan vendor:publish --tag=signal-config
php artisan vendor:publish --tag=signal-migrations
php artisan migrate

Quick Start#

1. Create Your First Signal#

php artisan make:signal NewPostSignal

This creates app/Signals/NewPostSignal.php:

<?php

namespace App\Signals;

use SocialDept\Signal\Events\SignalEvent;
use SocialDept\Signal\Signals\Signal;

class NewPostSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        logger()->info('New post created', [
            'did' => $event->did,
            'text' => $record->text ?? null,
        ]);
    }
}

2. Start Consuming Events#

php artisan signal:consume

Your Signal will now respond to new posts on the AT Protocol network in real-time!


Jetstream vs Firehose#

Signal supports two modes for consuming AT Protocol events. Choose based on your use case:

Jetstream Mode (Default)#

Best for: Standard Bluesky collections, production efficiency, lower bandwidth

php artisan signal:consume --mode=jetstream

Characteristics:

  • ✅ Simplified JSON events (easy to work with)
  • ✅ Server-side collection filtering (efficient)
  • ✅ Lower bandwidth and processing overhead
  • ⚠️ Only standard app.bsky.* collections get create/update operations
  • ⚠️ Custom collections only receive delete operations

Jetstream URL options:

  • US East: wss://jetstream2.us-east.bsky.network (default)
  • US West: wss://jetstream1.us-west.bsky.network

Firehose Mode#

Best for: Custom collections, AppViews, comprehensive indexing

php artisan signal:consume --mode=firehose

Characteristics:

  • All operations (create, update, delete) for all collections
  • ✅ Perfect for custom collections (e.g., app.yourapp.*.collection)
  • ✅ Full CBOR/CAR decoding with package revolution/laravel-bluesky
  • ⚠️ Client-side filtering only (higher bandwidth)
  • ⚠️ More processing overhead

When to use Firehose:

  • Building an AT Protocol AppView
  • Working with custom collections
  • Need create/update events for non-standard collections
  • Building comprehensive indexes

Configuration#

Set your preferred mode in .env:

# Use Jetstream (default)
SIGNAL_MODE=jetstream

# Or use Firehose for custom collections
SIGNAL_MODE=firehose

Example: Custom Collections#

If you're tracking custom collections like app.offprint.beta.publication, you must use Firehose mode:

class PublicationSignal extends Signal
{
    public function collections(): ?array
    {
        return ['app.offprint.beta.publication'];
    }

    public function handle(SignalEvent $event): void
    {
        // With Jetstream: Only sees deletes ❌
        // With Firehose: Sees creates, updates, deletes ✅
    }
}

Creating Signals#

Basic Signal Structure#

Every Signal extends the base Signal class and must implement:

use SocialDept\Signal\Enums\SignalEventType;
use SocialDept\Signal\Events\SignalEvent;
use SocialDept\Signal\Signals\Signal;

class MySignal extends Signal
{
    // Required: Define which event types to listen for
    public function eventTypes(): array
    {
        return [SignalEventType::Commit];

        // Or use strings:
        // return ['commit'];
    }

    // Required: Handle the event
    public function handle(SignalEvent $event): void
    {
        // Your logic here
    }
}

Enums vs Strings: Signal supports both typed enums and strings for better IDE support and type safety. Use whichever you prefer!

Event Types#

Three event types are available:

Enum String Description Use Cases
SignalEventType::Commit 'commit' Repository commits (posts, likes, follows, etc.) Content creation, social interactions
SignalEventType::Identity 'identity' Identity changes (handle updates) User profile tracking
SignalEventType::Account 'account' Account status changes Account monitoring

Accessing Event Data#

use SocialDept\Signal\Enums\SignalCommitOperation;

public function handle(SignalEvent $event): void
{
    // Common properties
    $did = $event->did;           // User's DID
    $kind = $event->kind;         // Event type
    $timestamp = $event->timeUs;  // Microsecond timestamp

    // Commit events
    if ($event->isCommit()) {
        $collection = $event->getCollection();  // e.g., 'app.bsky.feed.post'
        $operation = $event->getOperation();    // SignalCommitOperation enum
        $record = $event->getRecord();          // The actual record data
        $rkey = $event->commit->rkey;           // Record key

        // Use enum for type-safe comparisons
        if ($operation === SignalCommitOperation::Create) {
            // Handle new records
        }

        // Or get string value
        $operationString = $operation->value; // 'create', 'update', or 'delete'
    }

    // Identity events
    if ($event->isIdentity()) {
        $handle = $event->identity->handle;
    }

    // Account events
    if ($event->isAccount()) {
        $active = $event->account->active;
        $status = $event->account->status;
    }
}

Filtering Events#

Collection Filtering (with Wildcards!)#

Filter events by AT Protocol collection.

Important:

  • Jetstream mode: Exact collection names are sent as URL parameters for server-side filtering. Wildcards work for client-side filtering only.
  • Firehose mode: All filtering is client-side. Wildcards work normally.
// Exact match - only posts
public function collections(): ?array
{
    return ['app.bsky.feed.post'];
}

// Wildcard - all feed events
public function collections(): ?array
{
    return ['app.bsky.feed.*'];
}

// Multiple patterns
public function collections(): ?array
{
    return [
        'app.bsky.feed.post',
        'app.bsky.feed.repost',
        'app.bsky.graph.*',  // All graph collections
    ];
}

// No filter - all collections
public function collections(): ?array
{
    return null;
}

Common Collection Patterns#

Pattern Matches
app.bsky.feed.* Posts, likes, reposts, etc.
app.bsky.graph.* Follows, blocks, mutes
app.bsky.actor.* Profile updates
app.bsky.* All Bluesky collections

Operation Filtering#

Filter events by operation type (only applies to commit events):

use SocialDept\Signal\Enums\SignalCommitOperation;

// Only handle creates (using enum)
public function operations(): ?array
{
    return [SignalCommitOperation::Create];
}

// Only handle creates and updates (using enums)
public function operations(): ?array
{
    return [
        SignalCommitOperation::Create,
        SignalCommitOperation::Update,
    ];
}

// Only handle deletes (using string)
public function operations(): ?array
{
    return ['delete'];
}

// No filter - all operations (default)
public function operations(): ?array
{
    return null;
}

Available operations:

Enum String Description
SignalCommitOperation::Create 'create' New records created
SignalCommitOperation::Update 'update' Existing records modified
SignalCommitOperation::Delete 'delete' Records removed

Example use cases:

use SocialDept\Signal\Enums\SignalCommitOperation;

// Signal that only handles new posts (not edits)
class NewPostSignal extends Signal
{
    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Create];
    }
}

// Signal that only handles content updates
class ContentUpdateSignal extends Signal
{
    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Update];
    }
}

// Signal that handles deletions for cleanup
class CleanupSignal extends Signal
{
    public function collections(): ?array
    {
        return ['app.bsky.feed.*'];
    }

    public function operations(): ?array
    {
        return [SignalCommitOperation::Delete];
    }
}

DID Filtering#

Filter events by specific users:

public function dids(): ?array
{
    return [
        'did:plc:z72i7hdynmk6r22z27h6tvur',  // Specific user
        'did:plc:ragtjsm2j2vknwkz3zp4oxrd',  // Another user
    ];
}

Custom Filtering#

Add complex filtering logic:

public function shouldHandle(SignalEvent $event): bool
{
    // Only handle posts with images
    if ($event->isCommit() && $event->commit->collection === 'app.bsky.feed.post') {
        $record = $event->getRecord();
        return isset($record->embed);
    }

    return true;
}

Queue Integration#

Process events asynchronously using Laravel queues:

class HeavyProcessingSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    // Enable queueing
    public function shouldQueue(): bool
    {
        return true;
    }

    // Optional: Customize queue
    public function queue(): string
    {
        return 'high-priority';
    }

    // Optional: Customize connection
    public function queueConnection(): string
    {
        return 'redis';
    }

    public function handle(SignalEvent $event): void
    {
        // This runs in a queue job
        $this->performExpensiveOperation($event);
    }

    // Handle failures
    public function failed(SignalEvent $event, \Throwable $exception): void
    {
        Log::error('Signal failed', [
            'event' => $event->toArray(),
            'error' => $exception->getMessage(),
        ]);
    }
}

Configuration#

Configuration is stored in config/signal.php:

Consumer Mode#

Choose between Jetstream (JSON) or Firehose (CBOR) mode:

'mode' => env('SIGNAL_MODE', 'jetstream'),

Options:

  • jetstream - JSON events, server-side filtering (default)
  • firehose - CBOR events, client-side filtering (required for custom collections)

Jetstream Configuration#

'websocket_url' => env('SIGNAL_JETSTREAM_URL', 'wss://jetstream2.us-east.bsky.network'),

Available endpoints:

  • US East: wss://jetstream2.us-east.bsky.network (default)
  • US West: wss://jetstream1.us-west.bsky.network

Firehose Configuration#

'firehose' => [
    'host' => env('SIGNAL_FIREHOSE_HOST', 'bsky.network'),
],

The raw firehose endpoint is: wss://{host}/xrpc/com.atproto.sync.subscribeRepos

Cursor Storage#

Choose how to store cursor positions:

'cursor_storage' => env('SIGNAL_CURSOR_STORAGE', 'database'),
Driver Best For Configuration
database Production, multi-server Default connection
redis High performance, distributed Redis connection
file Development, single server Storage path

Environment Variables#

Add to your .env:

# Consumer Mode
SIGNAL_MODE=jetstream  # or 'firehose' for custom collections

# Jetstream Configuration
SIGNAL_JETSTREAM_URL=wss://jetstream2.us-east.bsky.network

# Firehose Configuration (only needed if using firehose mode)
SIGNAL_FIREHOSE_HOST=bsky.network

# Optional Configuration
SIGNAL_CURSOR_STORAGE=database
SIGNAL_QUEUE_CONNECTION=redis
SIGNAL_QUEUE=signal
SIGNAL_BATCH_SIZE=100
SIGNAL_RATE_LIMIT=1000

Auto-Discovery#

Signals are automatically discovered from app/Signals. Disable if needed:

'auto_discovery' => [
    'enabled' => true,
    'path' => app_path('Signals'),
    'namespace' => 'App\\Signals',
],

Or manually register Signals:

'signals' => [
    \App\Signals\NewPostSignal::class,
    \App\Signals\NewFollowSignal::class,
],

Programmatic Usage#

You can start and stop the consumer programmatically using the Signal facade:

use SocialDept\Signal\Facades\Signal;

// Start consuming events (uses mode from config)
Signal::start();

// Start from a specific cursor
Signal::start(cursor: 123456789);

// Check which mode is active
$mode = Signal::getMode(); // Returns 'jetstream' or 'firehose'

// Stop consuming events
Signal::stop();

The facade automatically resolves the correct consumer (Jetstream or Firehose) based on your config('signal.mode') setting. This allows you to:

  • Switch between modes by changing configuration
  • Start consumers from application code (e.g., in a custom command)
  • Integrate Signal into existing application workflows
// Example: Start consumer based on environment
if (app()->environment('production')) {
    config(['signal.mode' => 'jetstream']); // Use efficient Jetstream
} else {
    config(['signal.mode' => 'firehose']); // Use comprehensive Firehose for testing
}

Signal::start();

Available Commands#

signal:install#

Install the package (publish config, migrations, run migrations)

php artisan signal:install

signal:consume#

Start consuming events from AT Protocol

# Use default mode from config
php artisan signal:consume

# Override mode
php artisan signal:consume --mode=jetstream
php artisan signal:consume --mode=firehose

# Start from specific cursor
php artisan signal:consume --cursor=123456789

# Start fresh (ignore stored cursor)
php artisan signal:consume --fresh

# Combine options
php artisan signal:consume --mode=firehose --fresh

signal:list#

List all registered Signals

php artisan signal:list

signal:make#

Create a new Signal class

php artisan make:signal NewPostSignal

# With options
php artisan make:signal FollowSignal --type=commit --collection=app.bsky.graph.follow

signal:test#

Test a Signal with sample data

php artisan signal:test NewPostSignal

Testing#

Signal includes a comprehensive test suite. Test your Signals:

Unit Testing#

use SocialDept\Signal\Events\CommitEvent;
use SocialDept\Signal\Events\SignalEvent;

class NewPostSignalTest extends TestCase
{
    /** @test */
    public function it_handles_new_posts()
    {
        $signal = new NewPostSignal();

        $event = new SignalEvent(
            did: 'did:plc:test',
            timeUs: time() * 1000000,
            kind: 'commit',
            commit: new CommitEvent(
                rev: 'test',
                operation: 'create',
                collection: 'app.bsky.feed.post',
                rkey: 'test',
                record: (object) [
                    'text' => 'Hello World!',
                    'createdAt' => now()->toIso8601String(),
                ],
            ),
        );

        $signal->handle($event);

        // Assert your expected behavior
    }
}

Testing with Artisan#

php artisan signal:test NewPostSignal

External Resources#


Examples#

Monitor All Feed Activity#

class FeedMonitorSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.*'];
    }

    public function handle(SignalEvent $event): void
    {
        // Handles posts, likes, reposts, etc.
        Log::info('Feed activity', [
            'collection' => $event->getCollection(),
            'operation' => $event->getOperation(),
            'did' => $event->did,
        ]);
    }
}

Track New Follows#

class NewFollowSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.graph.follow'];
    }

    public function handle(SignalEvent $event): void
    {
        if ($event->commit->isCreate()) {
            $record = $event->getRecord();

            // Store follow relationship
            Follow::create([
                'follower_did' => $event->did,
                'following_did' => $record->subject,
            ]);
        }
    }
}

Content Moderation#

class ModerationSignal extends Signal
{
    public function eventTypes(): array
    {
        return ['commit'];
    }

    public function collections(): ?array
    {
        return ['app.bsky.feed.post'];
    }

    public function shouldQueue(): bool
    {
        return true;
    }

    public function handle(SignalEvent $event): void
    {
        $record = $event->getRecord();

        if ($this->containsProhibitedContent($record->text)) {
            $this->flagForModeration($event->did, $record);
        }
    }
}

Requirements#

  • PHP 8.2 or higher
  • Laravel 11.0 or higher
  • WebSocket support (enabled by default in most environments)

License#

The MIT License (MIT). Please see LICENSE for more information.


Support#

For issues, questions, or feature requests:

  • Read the README.md before opening issues
  • Search through existing issues
  • Open new issue

Built for the AT Protocol ecosystem • Made with ❤️ by Social Dept