atproto-jetstream#
A Rust library for consuming AT Protocol Jetstream events with high-performance WebSocket streaming, flexible event handling, and optional Zstandard compression support.
Overview#
atproto-jetstream provides an async stream consumer for AT Protocol Jetstream events. It consumes repository events, identity changes, and account updates in real time over WebSocket connections, with support for filtering, compression, and graceful shutdown.
This project was extracted from the open-sourced Smokesignal project and is designed to be a standalone, reusable library for AT Protocol event stream consumption.
Features#
- High-Performance WebSocket Streaming: Async WebSocket-based event consumption with automatic reconnection
- Flexible Event Handler System: Register multiple custom event handlers with unique identifiers
- Zstandard Compression: Optional compression support with custom dictionaries for bandwidth optimization
- Event Filtering: Filter events by collections and DIDs for targeted consumption
- Graceful Shutdown: Cancellation token support for clean shutdown and resource cleanup
- Message Size Management: Configurable maximum message sizes and rate limiting
- Cursor Support: Resume streaming from specific points using cursor positioning
- Structured Error Handling: Comprehensive error types with detailed error codes following project conventions
- Built-in Event Broadcasting: Event broadcasting to multiple consumers with tokio::sync::broadcast
- Tracing Integration: Full structured logging support for debugging and monitoring
Usage#
Basic Event Consumer#
use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
use async_trait::async_trait;
use anyhow::Result;

// Create a custom event handler
struct MyEventHandler;

#[async_trait]
impl EventHandler for MyEventHandler {
    async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
        println!("Received event: {:?}", event);
        Ok(())
    }

    fn handler_id(&self) -> String {
        "my-handler".to_string()
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerTaskConfig {
        user_agent: "my-app/1.0".to_string(),
        compression: false,
        zstd_dictionary_location: String::new(),
        jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
        collections: vec!["app.bsky.feed.post".to_string()],
    };

    let consumer = Consumer::new(config);
    let handler = std::sync::Arc::new(MyEventHandler);
    consumer.register_handler(handler).await?;

    let cancellation_token = CancellationToken::new();
    consumer.run_background(cancellation_token).await?;
    Ok(())
}
Using Multiple Event Handlers#
use atproto_jetstream::{Consumer, LoggingHandler};
use std::sync::Arc;

// Register multiple handlers on one consumer.
// `config` and `MyEventHandler` are defined as in the basic example above.
let consumer = Consumer::new(config);
let logging_handler = Arc::new(LoggingHandler::new("logger".to_string()));
let custom_handler = Arc::new(MyEventHandler);
consumer.register_handler(logging_handler).await?;
consumer.register_handler(custom_handler).await?;
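The idea of registering several handlers under unique identifiers can be sketched with the standard library alone. This is a minimal, synchronous illustration and not the crate's implementation: the `Handler` trait, `Registry`, and `Named` types are hypothetical stand-ins, and the real `EventHandler` trait is async.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, simplified stand-in for the crate's `EventHandler` trait
/// (synchronous here so the sketch needs only the standard library).
trait Handler {
    fn handler_id(&self) -> String;
    fn handle_event(&self, event: &str) -> Result<(), String>;
}

/// Hypothetical registry keyed by handler ID.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Arc<dyn Handler>>,
}

impl Registry {
    /// Registers a handler, rejecting a second handler with the same ID.
    fn register(&mut self, handler: Arc<dyn Handler>) -> Result<(), String> {
        let id = handler.handler_id();
        if self.handlers.contains_key(&id) {
            return Err(format!("handler already registered: {}", id));
        }
        self.handlers.insert(id, handler);
        Ok(())
    }

    /// Delivers one event to every handler and returns how many succeeded.
    /// A failing handler does not stop delivery to the others.
    fn dispatch(&self, event: &str) -> usize {
        self.handlers
            .values()
            .filter(|h| h.handle_event(event).is_ok())
            .count()
    }
}

/// Example handler that accepts every event.
struct Named(&'static str);

impl Handler for Named {
    fn handler_id(&self) -> String {
        self.0.to_string()
    }

    fn handle_event(&self, _event: &str) -> Result<(), String> {
        Ok(())
    }
}
```

Keying the map by `handler_id()` is what makes a duplicate registration detectable, which is presumably the kind of conflict that `ConsumerError::HandlerRegistrationFailed` reports.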
With Compression Support#
let config = ConsumerTaskConfig {
    user_agent: "my-app/1.0".to_string(),
    compression: true,
    zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
    jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
    collections: vec!["app.bsky.feed.post".to_string()],
};
// Download the Zstandard dictionary first (-L follows GitHub's redirect to the raw file):
// mkdir -p data/
// curl -L -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
Installation#
Add this to your Cargo.toml:
[dependencies]
atproto-jetstream = "0.1.0"
Command Line Tools#
The crate includes a command-line tool for consuming AT Protocol Jetstream events:
atproto-jetstream-consumer#
A comprehensive command-line tool for consuming AT Protocol Jetstream events with real-time streaming, filtering capabilities, and optional compression support. This tool provides an easy way to monitor AT Protocol event streams for development, testing, and production monitoring.
Features:
- Real-Time Event Streaming: Connects to AT Protocol Jetstream instances for live event consumption
- Event Filtering: Filter events by specific collections and DIDs for targeted monitoring
- Compression Support: Optional Zstandard compression with dictionary support for bandwidth optimization
- Flexible Output: Structured JSON output for each event with customizable logging levels
- Connection Management: Automatic reconnection handling and graceful shutdown on interruption
- Configurable Parameters: Extensive configuration options for hostname, collections, message sizes, and more
# Basic event streaming from Jetstream
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --user-agent "my-consumer/1.0"

# Stream specific collections with filtering
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections "app.bsky.feed.post,app.bsky.actor.profile" \
    --dids "did:plc:user123,did:plc:user456" \
    --user-agent "filtered-consumer/1.0"

# With Zstandard compression enabled
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary \
    --user-agent "compressed-consumer/1.0"

# Advanced configuration with message size limits and cursor
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --max-message-size 1048576 \
    --cursor 1234567890 \
    --require-hello \
    --user-agent "advanced-consumer/1.0"
Command Line Arguments:
- --hostname - Jetstream hostname to connect to (e.g., jetstream1.us-east.bsky.network)
- --collections - Comma-separated list of AT Protocol collections to subscribe to
- --dids - Optional comma-separated list of DIDs to filter events for
- --user-agent - User-Agent string for the WebSocket connection
- --compression - Enable Zstandard compression (requires zstd-dictionary)
- --zstd-dictionary - Path to Zstandard dictionary file for compression
- --max-message-size - Maximum message size in bytes (default: 56000)
- --cursor - Optional cursor position to start streaming from
- --require-hello - Require hello message before receiving events (default: true)
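Choosing a value for --cursor: according to the upstream Jetstream documentation, a cursor is a Unix timestamp in microseconds, the same unit as the time_us field of each event. The sketch below shows one way to compute a cursor for "replay roughly the last N seconds". The function names are hypothetical helpers and are not part of this crate.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Converts a point in time to a Jetstream cursor (Unix time in microseconds).
/// Returns `None` for times before the Unix epoch or beyond the `u64` range.
fn cursor_at(time: SystemTime) -> Option<u64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    since_epoch
        .as_secs()
        .checked_mul(1_000_000)?
        .checked_add(u64::from(since_epoch.subsec_micros()))
}

/// Cursor for replaying roughly the last `lookback` of events, measured
/// back from `now`.
fn cursor_from_lookback(now: SystemTime, lookback: Duration) -> Option<u64> {
    cursor_at(now.checked_sub(lookback)?)
}
```

For example, `cursor_from_lookback(SystemTime::now(), Duration::from_secs(300))` yields a value that can be passed to --cursor to resume from about five minutes ago, subject to how much history the Jetstream instance retains.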
Setting up Compression: To use compression, you need to download the Zstandard dictionary:
# Create data directory and download dictionary
# (-L follows GitHub's redirect to the raw file)
mkdir -p data/
curl -L -o data/zstd_dictionary \
    https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary

# Use with compression enabled
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary
Output Format: The tool outputs structured JSON for each received event:
{
  "kind": "commit",
  "time_us": 1704067200000000,
  "did": "did:plc:user123",
  "commit": {
    "rev": "3l2uygzaf5c2b",
    "operation": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "3l2uygzaf5c2c",
    "cid": "bafyreif5n4jf6jfczjqzckzqxdxm5qnz4jf6jfczjqzckzqxdxm5qnz4",
    "record": {
      "$type": "app.bsky.feed.post",
      "text": "Hello AT Protocol!",
      "createdAt": "2024-01-01T00:00:00Z"
    }
  }
}
This tool is ideal for:
- Development and Testing: Monitor AT Protocol events during application development
- Production Monitoring: Track repository changes and user activity in real-time
- Data Analysis: Collect AT Protocol events for analysis and research
- Integration Testing: Verify that your applications are generating expected events
- System Monitoring: Monitor the health and activity of AT Protocol networks
Event Types#
The library handles JetstreamEvent structures with the following fields:
- kind: Event type (e.g., "commit", "identity", "account")
- time_us: Event timestamp in microseconds
- commit: Optional commit data for repository events
- identity: Optional identity change data
- account: Optional account-related data
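Because the payload fields are optional, a handler typically has to check kind together with the matching field. The following sketch shows one way to do that. It assumes a simplified, hypothetical `Event` struct whose payloads are plain strings; the crate's actual `JetstreamEvent` definition and payload types may differ.

```rust
/// Hypothetical, simplified mirror of the fields listed above; payloads are
/// plain strings here instead of the crate's real types.
struct Event {
    kind: String,
    time_us: u64,
    commit: Option<String>,
    identity: Option<String>,
    account: Option<String>,
}

/// The payload that matches the event's kind.
#[derive(Debug, PartialEq)]
enum Payload<'a> {
    Commit(&'a str),
    Identity(&'a str),
    Account(&'a str),
    /// Unknown kind, or a kind whose matching field is absent.
    Unrecognized,
}

impl Event {
    /// Pairs `kind` with its optional field so callers handle one value
    /// instead of checking three `Option`s.
    fn payload(&self) -> Payload<'_> {
        match (self.kind.as_str(), &self.commit, &self.identity, &self.account) {
            ("commit", Some(c), _, _) => Payload::Commit(c),
            ("identity", _, Some(i), _) => Payload::Identity(i),
            ("account", _, _, Some(a)) => Payload::Account(a),
            _ => Payload::Unrecognized,
        }
    }
}
```

Treating a kind with a missing payload as `Unrecognized` keeps the handler from panicking on malformed or future event shapes.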
Modules#
- consumer - Core consumer implementation with WebSocket streaming and event handling
- lib - Public library interface and re-exports
Error Handling#
The crate uses comprehensive structured error types with unique identifiers:
error-atproto-jetstream-<domain>-<number> <message>: <details>
All errors follow this convention. ConsumerError defines the following variants:
- ConsumerError::ConnectionFailed - WebSocket connection establishment failures
- ConsumerError::DecompressionFailed - Zstandard decompression operation failures
- ConsumerError::DeserializationFailed - JSON event parsing failures
- ConsumerError::HandlerRegistrationFailed - Event handler registration conflicts
- ConsumerError::EventSenderNotInitialized - Event broadcasting setup errors
- ConsumerError::MessageConversionFailed - WebSocket message format errors
- ConsumerError::UpdateSerializationFailed - Subscription update serialization errors
- ConsumerError::UpdateSendFailed - Subscription update transmission errors
- ConsumerError::DecompressorCreationFailed - Zstandard decompressor initialization errors
use atproto_jetstream::consumer::ConsumerError;

// Example error handling; `consumer_result` is the Result<(), ConsumerError>
// returned by a consumer operation.
match consumer_result {
    Err(ConsumerError::ConnectionFailed(details)) => {
        println!("Failed to connect to Jetstream: {}", details);
    }
    Err(ConsumerError::DecompressionFailed(error)) => {
        println!("Decompression failed: {}", error);
    }
    Err(ConsumerError::HandlerRegistrationFailed(error)) => {
        println!("Handler registration failed: {}", error);
    }
    // ConsumerError has further variants, so the match needs a catch-all arm.
    Err(other) => {
        println!("Consumer error: {}", other);
    }
    Ok(()) => println!("Consumer operation successful"),
}
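To make the error-atproto-jetstream-<domain>-<number> <message>: <details> format concrete, here is a minimal sketch of an error type that renders in that shape. `SketchError`, the domain name consumer, the number 1, and the message text are all made up for illustration; they are not the crate's actual identifiers, and the crate itself derives its errors with thiserror rather than a hand-written Display.

```rust
use std::fmt;

/// Hypothetical error type; the domain, number, and message below are
/// invented for illustration and are not the crate's actual codes.
#[derive(Debug)]
enum SketchError {
    ConnectionFailed(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Renders as "<identifier> <message>: <details>".
            SketchError::ConnectionFailed(details) => write!(
                f,
                "error-atproto-jetstream-consumer-1 WebSocket connection failed: {}",
                details
            ),
        }
    }
}

impl std::error::Error for SketchError {}

/// Extracts the identifier (the text before the first space) from a
/// rendered error message.
fn error_code(rendered: &str) -> &str {
    rendered.split(' ').next().unwrap_or(rendered)
}
```

Keeping the identifier as the first whitespace-delimited token makes it straightforward to pull the code out of log lines for alerting or searching.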
Dependencies#
This crate builds on:
- tokio - Async runtime for WebSocket connections and event handling
- tokio-websockets - WebSocket client implementation for Jetstream connections
- tokio-util - Additional utilities including cancellation token support
- futures - Stream and sink traits for async WebSocket operations
- zstd - Zstandard compression support with dictionary-based decompression
- serde_json - JSON serialization and deserialization for AT Protocol events
- http - HTTP types for WebSocket headers and URI parsing
- urlencoding - URL encoding for query parameters in WebSocket connections
- async_trait - Async trait support for event handler implementations
- anyhow - Error handling utilities and result types
- tracing - Structured logging for debugging and monitoring
- thiserror - Structured error type derivation
AT Protocol Jetstream#
This library implements a client for AT Protocol Jetstream, which provides:
- Real-Time Event Streaming: Live consumption of AT Protocol repository events
- Efficient Compression: Zstandard compression with custom dictionaries for bandwidth optimization
- Event Filtering: Server-side filtering by collections and DIDs for targeted consumption
- High Performance: WebSocket-based streaming designed for high-throughput event processing
- Reliability: Built-in connection management and error recovery patterns
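Server-side filtering is requested through query parameters on the Jetstream WebSocket endpoint. The sketch below builds such a URL using the /subscribe path and the wantedCollections, wantedDids, cursor, and compress parameters described in the upstream Jetstream documentation. The `subscribe_url` and `encode` functions are hypothetical helpers written for this example; how this crate assembles its URL internally is not shown here and may differ.

```rust
/// Percent-encodes everything except RFC 3986 unreserved characters.
fn encode(value: &str) -> String {
    let mut out = String::new();
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Builds a Jetstream subscribe URL with optional server-side filters.
/// Repeated `wantedCollections` / `wantedDids` parameters express lists.
fn subscribe_url(
    hostname: &str,
    collections: &[&str],
    dids: &[&str],
    cursor: Option<u64>,
    compress: bool,
) -> String {
    let mut params: Vec<String> = Vec::new();
    for collection in collections {
        params.push(format!("wantedCollections={}", encode(collection)));
    }
    for did in dids {
        params.push(format!("wantedDids={}", encode(did)));
    }
    if let Some(cursor) = cursor {
        params.push(format!("cursor={}", cursor));
    }
    if compress {
        params.push("compress=true".to_string());
    }

    let mut url = format!("wss://{}/subscribe", hostname);
    if !params.is_empty() {
        url.push('?');
        url.push_str(&params.join("&"));
    }
    url
}
```

Because the filters travel in the URL, the server only sends matching events, which is where the bandwidth savings over client-side filtering come from.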
Contributing#
Contributions are welcome! Please ensure that:
- All tests pass: cargo test
- Code is properly formatted: cargo fmt
- No linting issues: cargo clippy
- New functionality includes appropriate tests and documentation
- Error handling follows the project's structured error format
License#
This project is licensed under the MIT License. See the LICENSE file for details.
Acknowledgments#
This library was extracted from the Smokesignal project, an open-source event and RSVP management and discovery application.