A library for ATProtocol identities.
1# atproto-jetstream
2
3WebSocket consumer for AT Protocol Jetstream events.
4
5## Overview
6
7Real-time event streaming with Zstandard compression, automatic reconnection, and configurable event filtering for AT Protocol repository changes.
8
9## Features
10
11- **WebSocket streaming**: High-performance event consumption with automatic reconnection handling
12- **Event filtering**: Configurable filtering by collections and DIDs for targeted event processing
13- **Zstandard compression**: Built-in support for compressed event streams with custom dictionaries
14- **Event handlers**: Flexible handler system supporting multiple custom event processors
15- **Background processing**: Asynchronous event processing with graceful shutdown via cancellation tokens
16- **Structured errors**: Comprehensive error handling with detailed error codes
17
18## CLI Tools
19
20The following command-line tool is available when built with the `clap` feature:
21
22- **`atproto-jetstream-consumer`**: Real-time event stream consumer with filtering, compression, and background processing support
23
24## Usage
25
26### Basic Event Consumer
27
28```rust
29use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
30use async_trait::async_trait;
31
32struct MyEventHandler;
33
34#[async_trait]
35impl EventHandler for MyEventHandler {
36 async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
37 println!("Received event: {:?}", event);
38 Ok(())
39 }
40
41 fn handler_id(&self) -> String {
42 "my-handler".to_string()
43 }
44}
45
46let config = ConsumerTaskConfig {
47 user_agent: "my-app/1.0".to_string(),
48 compression: false,
49 zstd_dictionary_location: String::new(),
50 jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
51 collections: vec!["app.bsky.feed.post".to_string()],
52};
53
54let consumer = Consumer::new(config);
55consumer.register_handler(std::sync::Arc::new(MyEventHandler)).await?;
56
57let cancellation_token = CancellationToken::new();
58consumer.run_background(cancellation_token).await?;
59```
60
61### With Compression
62
63```rust
64let config = ConsumerTaskConfig {
65 compression: true,
66 zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
67 // ... other config
68};
69
70// Download dictionary first:
71// curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
72```
73
74## Command Line Examples
75
76```bash
77# Basic streaming
78cargo run --bin atproto-jetstream-consumer \
79 --hostname jetstream1.us-east.bsky.network \
80 --collections app.bsky.feed.post \
81 --user-agent "my-consumer/1.0"
82
83# With compression
84cargo run --bin atproto-jetstream-consumer \
85 --hostname jetstream1.us-east.bsky.network \
86 --collections app.bsky.feed.post \
87 --compression \
88 --zstd-dictionary ./data/zstd_dictionary
89
90# With filtering
91cargo run --bin atproto-jetstream-consumer \
92 --hostname jetstream1.us-east.bsky.network \
93 --collections "app.bsky.feed.post,app.bsky.actor.profile" \
94 --dids "did:plc:user123,did:plc:user456"
95```
96
97## License
98
99MIT License