
atproto-jetstream#

A Rust library for consuming AT Protocol Jetstream events with high-performance WebSocket streaming, flexible event handling, and optional Zstandard compression support.

Overview#

atproto-jetstream provides an async stream consumer for AT Protocol Jetstream events. It consumes repository events, identity changes, and account updates in real time over WebSocket connections, with support for filtering, compression, and graceful shutdown.

This project was extracted from the open-sourced Smokesignal project and is designed to be a standalone, reusable library for AT Protocol event stream consumption.

Features#

  • High-Performance WebSocket Streaming: Async WebSocket-based event consumption with automatic reconnection
  • Flexible Event Handler System: Register multiple custom event handlers with unique identifiers
  • Zstandard Compression: Optional compression support with custom dictionaries for bandwidth optimization
  • Event Filtering: Filter events by collections and DIDs for targeted consumption
  • Graceful Shutdown: Cancellation token support for clean shutdown and resource cleanup
  • Message Size Management: Configurable maximum message sizes and rate limiting
  • Cursor Support: Resume streaming from specific points using cursor positioning
  • Structured Error Handling: Comprehensive error types with detailed error codes following project conventions
  • Built-in Event Broadcasting: Event broadcasting to multiple consumers with tokio::sync::broadcast
  • Tracing Integration: Full structured logging support for debugging and monitoring

Usage#

Basic Event Consumer#

use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
use async_trait::async_trait;
use anyhow::Result;

// Create a custom event handler
struct MyEventHandler;

#[async_trait]
impl EventHandler for MyEventHandler {
    async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
        println!("Received event: {:?}", event);
        Ok(())
    }

    fn handler_id(&self) -> String {
        "my-handler".to_string()
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerTaskConfig {
        user_agent: "my-app/1.0".to_string(),
        compression: false,
        zstd_dictionary_location: String::new(),
        jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
        collections: vec!["app.bsky.feed.post".to_string()],
    };

    let consumer = Consumer::new(config);
    let handler = std::sync::Arc::new(MyEventHandler);
    
    consumer.register_handler(handler).await?;
    
    let cancellation_token = CancellationToken::new();
    consumer.run_background(cancellation_token).await?;
    
    Ok(())
}

Using Multiple Event Handlers#

use atproto_jetstream::{Consumer, LoggingHandler};
use std::sync::Arc;

// Register multiple handlers; `config` and `MyEventHandler` are defined as in the basic example above
let consumer = Consumer::new(config);

let logging_handler = Arc::new(LoggingHandler::new("logger".to_string()));
let custom_handler = Arc::new(MyEventHandler);

consumer.register_handler(logging_handler).await?;
consumer.register_handler(custom_handler).await?;
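
Registration depends on each handler reporting a unique ID. The following is a minimal sketch of how such a registry could behave, not the crate's implementation: `Registry`, the synchronous `Handler` trait, and the string-typed events are hypothetical stand-ins built only on the standard library. Duplicate IDs are rejected, and each event is passed to every registered handler.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, synchronous stand-in for the crate's async `EventHandler`.
trait Handler {
    fn handle_event(&self, event: &str) -> Result<(), String>;
    fn handler_id(&self) -> String;
}

/// Hypothetical registry keyed by handler ID.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Arc<dyn Handler>>,
}

impl Registry {
    /// Rejects a handler whose ID is already registered.
    fn register(&mut self, handler: Arc<dyn Handler>) -> Result<(), String> {
        let id = handler.handler_id();
        if self.handlers.contains_key(&id) {
            return Err(format!("handler already registered: {id}"));
        }
        self.handlers.insert(id, handler);
        Ok(())
    }

    /// Passes the event to every handler and returns how many succeeded.
    /// A failing handler does not stop delivery to the others.
    fn dispatch(&self, event: &str) -> usize {
        self.handlers
            .values()
            .filter(|h| h.handle_event(event).is_ok())
            .count()
    }
}

/// Example handler that accepts every event.
struct Named(&'static str);

impl Handler for Named {
    fn handle_event(&self, _event: &str) -> Result<(), String> {
        Ok(())
    }

    fn handler_id(&self) -> String {
        self.0.to_string()
    }
}
```

Registering `Named("logger")` twice on the same `Registry` returns an error the second time, which is the kind of conflict the `HandlerRegistrationFailed` error describes.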

With Compression Support#

let config = ConsumerTaskConfig {
    user_agent: "my-app/1.0".to_string(),
    compression: true,
    zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
    jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
    collections: vec!["app.bsky.feed.post".to_string()],
};

// Download the Zstandard dictionary first:
// mkdir -p data/
// curl -L -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
// (-L follows the redirect GitHub issues for raw file URLs)

Installation#

Add this to your Cargo.toml:

[dependencies]
atproto-jetstream = "0.1.0"

Command Line Tools#

The crate includes a command-line tool for consuming AT Protocol Jetstream events:

atproto-jetstream-consumer#

A command-line tool that streams AT Protocol Jetstream events in real time, with collection and DID filtering and optional compression. It is useful for monitoring event streams during development, testing, and production.

Features:

  • Real-Time Event Streaming: Connects to AT Protocol Jetstream instances for live event consumption
  • Event Filtering: Filter events by specific collections and DIDs for targeted monitoring
  • Compression Support: Optional Zstandard compression with dictionary support for bandwidth optimization
  • Flexible Output: Structured JSON output for each event with customizable logging levels
  • Connection Management: Automatic reconnection handling and graceful shutdown on interruption
  • Configurable Parameters: Extensive configuration options for hostname, collections, message sizes, and more

# Basic event streaming from Jetstream
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --user-agent "my-consumer/1.0"

# Stream specific collections with filtering
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections "app.bsky.feed.post,app.bsky.actor.profile" \
    --dids "did:plc:user123,did:plc:user456" \
    --user-agent "filtered-consumer/1.0"

# With Zstandard compression enabled
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary \
    --user-agent "compressed-consumer/1.0"

# Advanced configuration with message size limits and cursor
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --max-message-size 1048576 \
    --cursor 1234567890 \
    --require-hello \
    --user-agent "advanced-consumer/1.0"

Command Line Arguments:

  • --hostname - Jetstream hostname to connect to (e.g., jetstream1.us-east.bsky.network)
  • --collections - Comma-separated list of AT Protocol collections to subscribe to
  • --dids - Optional comma-separated list of DIDs to filter events for
  • --user-agent - User-Agent string for the WebSocket connection
  • --compression - Enable Zstandard compression (requires --zstd-dictionary)
  • --zstd-dictionary - Path to Zstandard dictionary file for compression
  • --max-message-size - Maximum message size in bytes (default: 56000)
  • --cursor - Optional cursor position to start streaming from
  • --require-hello - Require hello message before receiving events (default: true)
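
As a rough illustration of how these options could map onto a Jetstream connection, the sketch below builds a subscribe URL using the query parameter names documented by the Jetstream server (`wantedCollections`, `wantedDids`, `compress`, `maxMessageSizeBytes`, `cursor`, `requireHello`). The `Options` struct and both functions are hypothetical, and whether this crate assembles its URL the same way is an assumption.

```rust
/// Hypothetical mirror of the command-line options; not the crate's own type.
struct Options<'a> {
    hostname: &'a str,
    collections: &'a [&'a str],
    dids: &'a [&'a str],
    compression: bool,
    max_message_size: Option<u64>,
    cursor: Option<i64>,
    require_hello: bool,
}

/// Percent-encodes everything except RFC 3986 unreserved characters.
fn encode(value: &str) -> String {
    let mut out = String::new();
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            other => out.push_str(&format!("%{other:02X}")),
        }
    }
    out
}

/// Builds the WebSocket URL from the options (assumed parameter mapping).
fn subscribe_url(opts: &Options) -> String {
    let mut params: Vec<String> = Vec::new();
    // Repeated parameters: one per collection and one per DID.
    for collection in opts.collections {
        params.push(format!("wantedCollections={}", encode(collection)));
    }
    for did in opts.dids {
        params.push(format!("wantedDids={}", encode(did)));
    }
    if opts.compression {
        params.push("compress=true".to_string());
    }
    if let Some(size) = opts.max_message_size {
        params.push(format!("maxMessageSizeBytes={size}"));
    }
    if let Some(cursor) = opts.cursor {
        params.push(format!("cursor={cursor}"));
    }
    if opts.require_hello {
        params.push("requireHello=true".to_string());
    }
    let mut url = format!("wss://{}/subscribe", opts.hostname);
    if !params.is_empty() {
        url.push('?');
        url.push_str(&params.join("&"));
    }
    url
}
```

Optional parameters are only appended when set, so a consumer that passes no filters connects to the bare `/subscribe` endpoint.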

Setting up Compression: To use compression, you need to download the Zstandard dictionary:

# Create data directory and download dictionary
mkdir -p data/
curl -L -o data/zstd_dictionary \
  https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary

# Use with compression enabled
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary
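
Because compression depends on the dictionary file being present, an application may want to check for it before starting a consumer. The helper below is a hypothetical pre-flight check using only the standard library; it is not part of the crate, and the error messages are invented for illustration.

```rust
use std::path::Path;

/// Hypothetical pre-flight check: compression needs an existing dictionary file.
fn check_compression(compression: bool, dictionary: &str) -> Result<(), String> {
    // Nothing to verify when compression is disabled.
    if !compression {
        return Ok(());
    }
    if dictionary.is_empty() {
        return Err("compression enabled but no dictionary path given".to_string());
    }
    if !Path::new(dictionary).is_file() {
        return Err(format!("dictionary not found: {dictionary}"));
    }
    Ok(())
}
```

Calling `check_compression(true, "./data/zstd_dictionary")` before building the configuration surfaces a missing download early instead of at connection time.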

Output Format: The tool outputs structured JSON for each received event:

{
  "kind": "commit",
  "time_us": 1704067200000000,
  "did": "did:plc:user123",
  "commit": {
    "rev": "3l2uygzaf5c2b",
    "operation": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "3l2uygzaf5c2c",
    "cid": "bafyreif5n4jf6jfczjqzckzqxdxm5qnz4jf6jfczjqzckzqxdxm5qnz4",
    "record": {
      "$type": "app.bsky.feed.post",
      "text": "Hello AT Protocol!",
      "createdAt": "2024-01-01T00:00:00Z"
    }
  }
}

This tool is ideal for:

  • Development and Testing: Monitor AT Protocol events during application development
  • Production Monitoring: Track repository changes and user activity in real-time
  • Data Analysis: Collect AT Protocol events for analysis and research
  • Integration Testing: Verify that your applications are generating expected events
  • System Monitoring: Monitor the health and activity of AT Protocol networks

Event Types#

The library handles JetstreamEvent structures with the following fields:

  • kind: Event type (e.g., "commit", "identity", "account")
  • did: DID of the repository the event belongs to, as shown in the output format above
  • time_us: Event timestamp in microseconds
  • commit: Optional commit data for repository events
  • identity: Optional identity change data
  • account: Optional account-related data
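
The `time_us` field counts microseconds since the Unix epoch, and the Jetstream documentation describes cursors as Unix-microsecond timestamps as well. Under that assumption, a consumer could resume slightly before the last event it processed. The sketch below uses only the standard library; `event_time` and `rewind_cursor` are hypothetical helpers, not crate functions.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Converts a `time_us` value (microseconds since the Unix epoch) to a `SystemTime`.
fn event_time(time_us: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_micros(time_us)
}

/// Hypothetical helper: a cursor that replays `rewind` worth of events
/// before the last one seen, clamped at zero.
fn rewind_cursor(last_time_us: u64, rewind: Duration) -> u64 {
    last_time_us.saturating_sub(rewind.as_micros() as u64)
}
```

For the sample event shown earlier, `event_time(1704067200000000)` corresponds to 2024-01-01T00:00:00Z, and rewinding by five seconds yields a cursor of 1704067195000000.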

Modules#

  • [consumer] - Core consumer implementation with WebSocket streaming and event handling
  • [lib] - Public library interface and re-exports

Error Handling#

Errors are structured types whose messages carry a unique identifier. All errors follow the project convention:

error-atproto-jetstream-<domain>-<number> <message>: <details>

The ConsumerError enum defines the following variants:

  • ConsumerError::ConnectionFailed - WebSocket connection establishment failures
  • ConsumerError::DecompressionFailed - Zstandard decompression operation failures
  • ConsumerError::DeserializationFailed - JSON event parsing failures
  • ConsumerError::HandlerRegistrationFailed - Event handler registration conflicts
  • ConsumerError::EventSenderNotInitialized - Event broadcasting setup errors
  • ConsumerError::MessageConversionFailed - WebSocket message format errors
  • ConsumerError::UpdateSerializationFailed - Subscription update serialization errors
  • ConsumerError::UpdateSendFailed - Subscription update transmission errors
  • ConsumerError::DecompressorCreationFailed - Zstandard decompressor initialization errors

use atproto_jetstream::consumer::ConsumerError;

// Example error handling
match consumer_result {
    Err(ConsumerError::ConnectionFailed(details)) => {
        println!("Failed to connect to Jetstream: {}", details);
    }
    Err(ConsumerError::DecompressionFailed(error)) => {
        println!("Decompression failed: {}", error);
    }
    Err(ConsumerError::HandlerRegistrationFailed(error)) => {
        println!("Handler registration failed: {}", error);
    }
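    // Catch-all keeps the match exhaustive for the remaining variants
    Err(other) => {
        println!("Consumer error: {}", other);
    }
    Ok(()) => println!("Consumer operation successful"),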
}
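
To make the identifier format concrete, here is a minimal sketch of an error type that renders messages in the `error-atproto-jetstream-<domain>-<number> <message>: <details>` layout. It is written against the standard library only (the crate itself lists thiserror as a dependency), and the `SketchError` type, the `consumer` domain, the numbers, and the message texts are all invented for illustration.

```rust
use std::fmt;

/// Hypothetical error type; the variant names echo the crate's list, but the
/// numbers and messages are assumptions.
#[derive(Debug)]
enum SketchError {
    ConnectionFailed(String),
    DecompressionFailed(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Layout: error-atproto-jetstream-<domain>-<number> <message>: <details>
        let (number, message, details) = match self {
            SketchError::ConnectionFailed(d) => (1, "connection failed", d),
            SketchError::DecompressionFailed(d) => (2, "decompression failed", d),
        };
        write!(f, "error-atproto-jetstream-consumer-{number} {message}: {details}")
    }
}

impl std::error::Error for SketchError {}
```

A stable prefix like this lets log searches match on the identifier alone while the details after the colon vary per occurrence.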

Dependencies#

This crate builds on:

  • tokio - Async runtime for WebSocket connections and event handling
  • tokio-websockets - WebSocket client implementation for Jetstream connections
  • tokio-util - Additional utilities including cancellation token support
  • futures - Stream and sink traits for async WebSocket operations
  • zstd - Zstandard compression support with dictionary-based decompression
  • serde_json - JSON serialization and deserialization for AT Protocol events
  • http - HTTP types for WebSocket headers and URI parsing
  • urlencoding - URL encoding for query parameters in WebSocket connections
  • async_trait - Async trait support for event handler implementations
  • anyhow - Error handling utilities and result types
  • tracing - Structured logging for debugging and monitoring
  • thiserror - Structured error type derivation

AT Protocol Jetstream#

This library implements a client for AT Protocol Jetstream, which provides:

  • Real-Time Event Streaming: Live consumption of AT Protocol repository events
  • Efficient Compression: Zstandard compression with custom dictionaries for bandwidth optimization
  • Event Filtering: Server-side filtering by collections and DIDs for targeted consumption
  • High Performance: WebSocket-based streaming designed for high-throughput event processing
  • Reliability: Built-in connection management and error recovery patterns

Contributing#

Contributions are welcome! Please ensure that:

  1. All tests pass: cargo test
  2. Code is properly formatted: cargo fmt
  3. No linting issues: cargo clippy
  4. New functionality includes appropriate tests and documentation
  5. Error handling follows the project's structured error format

License#

This project is licensed under the MIT License. See the LICENSE file for details.

Acknowledgments#

This library was extracted from the Smokesignal project, an open-source event and RSVP management and discovery application.