A library for ATProtocol identities.
# atproto-jetstream

WebSocket consumer for AT Protocol Jetstream events.

## Overview

Real-time event streaming with Zstandard compression, automatic reconnection, and configurable event filtering for AT Protocol repository changes.

## Features

- **WebSocket streaming**: High-performance event consumption with automatic reconnection handling
- **Event filtering**: Configurable filtering by collections and DIDs for targeted event processing
- **Zstandard compression**: Built-in support for compressed event streams with custom dictionaries
- **Event handlers**: Flexible handler system supporting multiple custom event processors
- **Background processing**: Asynchronous event processing with graceful shutdown via cancellation tokens
- **Structured errors**: Comprehensive error handling with detailed error codes

## CLI Tools

The following command-line tool is available when built with the `clap` feature:

- **`atproto-jetstream-consumer`**: Real-time event stream consumer with filtering, compression, and background processing support

## Usage

### Basic Event Consumer

```rust
use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
use async_trait::async_trait;

struct MyEventHandler;

#[async_trait]
impl EventHandler for MyEventHandler {
    async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
        println!("Received event: {:?}", event);
        Ok(())
    }

    fn handler_id(&self) -> String {
        "my-handler".to_string()
    }
}

let config = ConsumerTaskConfig {
    user_agent: "my-app/1.0".to_string(),
    compression: false,
    zstd_dictionary_location: String::new(),
    jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
    collections: vec!["app.bsky.feed.post".to_string()],
};

let consumer = Consumer::new(config);
consumer.register_handler(std::sync::Arc::new(MyEventHandler)).await?;

let cancellation_token = CancellationToken::new();
consumer.run_background(cancellation_token).await?;
```

### With Compression

```rust
let config = ConsumerTaskConfig {
    compression: true,
    zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
    // ... other config
};

// Download dictionary first:
// curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
```

## Command Line Examples

```bash
# Basic streaming
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --user-agent "my-consumer/1.0"

# With compression
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary

# With filtering
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections "app.bsky.feed.post,app.bsky.actor.profile" \
    --dids "did:plc:user123,did:plc:user456"
```

## License

MIT License