+1
-1
CLAUDE.md
+1
-1
CLAUDE.md
···
43
43
44
44
Avoid creating new errors with the `anyhow!(...)` macro.
45
45
46
-
When a function call would return `anyhow::Error`, use the following pattern to log the error in addition to any code specific handling that must occur
46
+
When a function call would return `anyhow::Error`, use the following pattern to log the error in addition to any code specific handling that must occur:
47
47
48
48
```
49
49
If let Err(err) = result {
+3
-1
CLAUDE.prompts.md
+3
-1
CLAUDE.prompts.md
···
6
6
7
7
## Review and ensure error correctness
8
8
9
-
Review all of the errors in the `atproto-xrpcs` crate and ensure their names, messages, documentation, and usage are correct. Each error must have a globally unique identifier and error numbers must be ordered consistently. Think very very hard.
9
+
Review all of the errors in the `atproto-jetstream` crate and ensure their names, messages, documentation, and usage are correct. Each error must have a globally unique identifier and error numbers must be ordered consistently. Think very very hard.
10
10
11
11
Review all of the errors and identify any that are unused. Think very very hard.
12
12
···
55
55
4. Update the `README.md` files in each of the project crates. Each `README.md` file should include a high level overview of what the crate provides and include a summary of each binary produced by the crate.
56
56
57
57
5. Update the `README.md` file in the root of the project that describes the project as collection of components used to create ATProtocol applications. Note that parts of this was extracted from the open sourced https://tangled.sh/@smokesignal.events/smokesignal project. This project is open source under the MIT license.
58
+
59
+
6. Ensure all crates can be packaged and published.
58
60
59
61
Avoid introducing new dependencies. Think very very hard.
+189
-10
Cargo.lock
+189
-10
Cargo.lock
···
57
57
58
58
[[package]]
59
59
name = "atproto-client"
60
-
version = "0.5.0"
60
+
version = "0.6.0"
61
61
dependencies = [
62
62
"anyhow",
63
63
"atproto-identity",
64
64
"atproto-oauth",
65
65
"atproto-record",
66
+
"bytes",
66
67
"reqwest",
67
68
"reqwest-chain",
68
69
"reqwest-middleware",
···
76
77
77
78
[[package]]
78
79
name = "atproto-identity"
79
-
version = "0.5.0"
80
+
version = "0.6.0"
80
81
dependencies = [
81
82
"anyhow",
82
83
"async-trait",
···
100
101
]
101
102
102
103
[[package]]
104
+
name = "atproto-jetstream"
105
+
version = "0.6.0"
106
+
dependencies = [
107
+
"anyhow",
108
+
"async-trait",
109
+
"atproto-identity",
110
+
"futures",
111
+
"http",
112
+
"serde",
113
+
"serde_json",
114
+
"thiserror 2.0.12",
115
+
"tokio",
116
+
"tokio-util",
117
+
"tokio-websockets",
118
+
"tracing",
119
+
"tracing-subscriber",
120
+
"urlencoding",
121
+
"zstd",
122
+
]
123
+
124
+
[[package]]
103
125
name = "atproto-oauth"
104
-
version = "0.5.0"
126
+
version = "0.6.0"
105
127
dependencies = [
106
128
"anyhow",
107
129
"async-trait",
···
132
154
133
155
[[package]]
134
156
name = "atproto-oauth-axum"
135
-
version = "0.5.0"
157
+
version = "0.6.0"
136
158
dependencies = [
137
159
"anyhow",
138
160
"async-trait",
···
158
180
159
181
[[package]]
160
182
name = "atproto-record"
161
-
version = "0.5.0"
183
+
version = "0.6.0"
162
184
dependencies = [
163
185
"anyhow",
164
186
"atproto-identity",
···
177
199
178
200
[[package]]
179
201
name = "atproto-xrpcs"
180
-
version = "0.5.0"
202
+
version = "0.6.0"
181
203
dependencies = [
182
204
"anyhow",
183
205
"async-trait",
···
204
226
205
227
[[package]]
206
228
name = "atproto-xrpcs-helloworld"
207
-
version = "0.5.0"
229
+
version = "0.6.0"
208
230
dependencies = [
209
231
"anyhow",
210
232
"async-trait",
···
382
404
source = "registry+https://github.com/rust-lang/crates.io-index"
383
405
checksum = "16595d3be041c03b09d08d0858631facccee9221e579704070e6e9e4915d3bc7"
384
406
dependencies = [
407
+
"jobserver",
408
+
"libc",
385
409
"shlex",
386
410
]
387
411
···
437
461
]
438
462
439
463
[[package]]
464
+
name = "core-foundation"
465
+
version = "0.10.1"
466
+
source = "registry+https://github.com/rust-lang/crates.io-index"
467
+
checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6"
468
+
dependencies = [
469
+
"core-foundation-sys",
470
+
"libc",
471
+
]
472
+
473
+
[[package]]
440
474
name = "core-foundation-sys"
441
475
version = "0.8.7"
442
476
source = "registry+https://github.com/rust-lang/crates.io-index"
···
700
734
]
701
735
702
736
[[package]]
737
+
name = "futures"
738
+
version = "0.3.31"
739
+
source = "registry+https://github.com/rust-lang/crates.io-index"
740
+
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
741
+
dependencies = [
742
+
"futures-channel",
743
+
"futures-core",
744
+
"futures-executor",
745
+
"futures-io",
746
+
"futures-sink",
747
+
"futures-task",
748
+
"futures-util",
749
+
]
750
+
751
+
[[package]]
703
752
name = "futures-channel"
704
753
version = "0.3.31"
705
754
source = "registry+https://github.com/rust-lang/crates.io-index"
706
755
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
707
756
dependencies = [
708
757
"futures-core",
758
+
"futures-sink",
709
759
]
710
760
711
761
[[package]]
···
715
765
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
716
766
717
767
[[package]]
768
+
name = "futures-executor"
769
+
version = "0.3.31"
770
+
source = "registry+https://github.com/rust-lang/crates.io-index"
771
+
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
772
+
dependencies = [
773
+
"futures-core",
774
+
"futures-task",
775
+
"futures-util",
776
+
]
777
+
778
+
[[package]]
718
779
name = "futures-io"
719
780
version = "0.3.31"
720
781
source = "registry+https://github.com/rust-lang/crates.io-index"
721
782
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
722
783
723
784
[[package]]
785
+
name = "futures-macro"
786
+
version = "0.3.31"
787
+
source = "registry+https://github.com/rust-lang/crates.io-index"
788
+
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
789
+
dependencies = [
790
+
"proc-macro2",
791
+
"quote",
792
+
"syn",
793
+
]
794
+
795
+
[[package]]
724
796
name = "futures-sink"
725
797
version = "0.3.31"
726
798
source = "registry+https://github.com/rust-lang/crates.io-index"
···
738
810
source = "registry+https://github.com/rust-lang/crates.io-index"
739
811
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
740
812
dependencies = [
813
+
"futures-channel",
741
814
"futures-core",
815
+
"futures-io",
816
+
"futures-macro",
817
+
"futures-sink",
742
818
"futures-task",
819
+
"memchr",
743
820
"pin-project-lite",
744
821
"pin-utils",
745
822
"slab",
···
1194
1271
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
1195
1272
1196
1273
[[package]]
1274
+
name = "jobserver"
1275
+
version = "0.1.33"
1276
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1277
+
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
1278
+
dependencies = [
1279
+
"getrandom 0.3.3",
1280
+
"libc",
1281
+
]
1282
+
1283
+
[[package]]
1197
1284
name = "js-sys"
1198
1285
version = "0.3.77"
1199
1286
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1395
1482
"openssl-probe",
1396
1483
"openssl-sys",
1397
1484
"schannel",
1398
-
"security-framework",
1485
+
"security-framework 2.11.1",
1399
1486
"security-framework-sys",
1400
1487
"tempfile",
1401
1488
]
···
1946
2033
]
1947
2034
1948
2035
[[package]]
2036
+
name = "rustls-native-certs"
2037
+
version = "0.8.1"
2038
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2039
+
checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
2040
+
dependencies = [
2041
+
"openssl-probe",
2042
+
"rustls-pki-types",
2043
+
"schannel",
2044
+
"security-framework 3.2.0",
2045
+
]
2046
+
2047
+
[[package]]
1949
2048
name = "rustls-pki-types"
1950
2049
version = "1.12.0"
1951
2050
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2021
2120
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
2022
2121
dependencies = [
2023
2122
"bitflags",
2024
-
"core-foundation",
2123
+
"core-foundation 0.9.4",
2124
+
"core-foundation-sys",
2125
+
"libc",
2126
+
"security-framework-sys",
2127
+
]
2128
+
2129
+
[[package]]
2130
+
name = "security-framework"
2131
+
version = "3.2.0"
2132
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2133
+
checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
2134
+
dependencies = [
2135
+
"bitflags",
2136
+
"core-foundation 0.10.1",
2025
2137
"core-foundation-sys",
2026
2138
"libc",
2027
2139
"security-framework-sys",
···
2155
2267
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
2156
2268
2157
2269
[[package]]
2270
+
name = "signal-hook-registry"
2271
+
version = "1.4.5"
2272
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2273
+
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
2274
+
dependencies = [
2275
+
"libc",
2276
+
]
2277
+
2278
+
[[package]]
2158
2279
name = "signature"
2159
2280
version = "2.2.0"
2160
2281
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2165
2286
]
2166
2287
2167
2288
[[package]]
2289
+
name = "simdutf8"
2290
+
version = "0.1.5"
2291
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2292
+
checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
2293
+
2294
+
[[package]]
2168
2295
name = "slab"
2169
2296
version = "0.4.9"
2170
2297
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2249
2376
checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
2250
2377
dependencies = [
2251
2378
"bitflags",
2252
-
"core-foundation",
2379
+
"core-foundation 0.9.4",
2253
2380
"system-configuration-sys",
2254
2381
]
2255
2382
···
2367
2494
"bytes",
2368
2495
"libc",
2369
2496
"mio",
2497
+
"parking_lot",
2370
2498
"pin-project-lite",
2499
+
"signal-hook-registry",
2371
2500
"socket2",
2372
2501
"tokio-macros",
2373
2502
"windows-sys 0.52.0",
···
2418
2547
]
2419
2548
2420
2549
[[package]]
2550
+
name = "tokio-websockets"
2551
+
version = "0.11.4"
2552
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2553
+
checksum = "9fcaf159b4e7a376b05b5bfd77bfd38f3324f5fce751b4213bfc7eaa47affb4e"
2554
+
dependencies = [
2555
+
"base64",
2556
+
"bytes",
2557
+
"futures-core",
2558
+
"futures-sink",
2559
+
"http",
2560
+
"httparse",
2561
+
"rand 0.9.1",
2562
+
"ring",
2563
+
"rustls-native-certs",
2564
+
"rustls-pki-types",
2565
+
"simdutf8",
2566
+
"tokio",
2567
+
"tokio-rustls",
2568
+
"tokio-util",
2569
+
]
2570
+
2571
+
[[package]]
2421
2572
name = "tower"
2422
2573
version = "0.5.2"
2423
2574
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3246
3397
"quote",
3247
3398
"syn",
3248
3399
]
3400
+
3401
+
[[package]]
3402
+
name = "zstd"
3403
+
version = "0.13.3"
3404
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3405
+
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
3406
+
dependencies = [
3407
+
"zstd-safe",
3408
+
]
3409
+
3410
+
[[package]]
3411
+
name = "zstd-safe"
3412
+
version = "7.2.4"
3413
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3414
+
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
3415
+
dependencies = [
3416
+
"zstd-sys",
3417
+
]
3418
+
3419
+
[[package]]
3420
+
name = "zstd-sys"
3421
+
version = "2.0.15+zstd.1.5.7"
3422
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3423
+
checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237"
3424
+
dependencies = [
3425
+
"cc",
3426
+
"pkg-config",
3427
+
]
+13
-5
Cargo.toml
+13
-5
Cargo.toml
···
2
2
members = [
3
3
"crates/atproto-client",
4
4
"crates/atproto-identity",
5
+
"crates/atproto-jetstream",
5
6
"crates/atproto-oauth-axum",
6
7
"crates/atproto-oauth",
7
8
"crates/atproto-record",
···
20
21
categories = ["command-line-utilities", "web-programming"]
21
22
22
23
[workspace.dependencies]
23
-
atproto-client = { version = "0.5.0", path = "crates/atproto-client" }
24
-
atproto-identity = { version = "0.5.0", path = "crates/atproto-identity" }
25
-
atproto-oauth = { version = "0.5.0", path = "crates/atproto-oauth" }
26
-
atproto-record = { version = "0.5.0", path = "crates/atproto-record" }
27
-
atproto-xrpcs = { version = "0.5.0", path = "crates/atproto-xrpcs" }
24
+
atproto-client = { version = "0.6.0", path = "crates/atproto-client" }
25
+
atproto-identity = { version = "0.6.0", path = "crates/atproto-identity" }
26
+
atproto-oauth = { version = "0.6.0", path = "crates/atproto-oauth" }
27
+
atproto-record = { version = "0.6.0", path = "crates/atproto-record" }
28
+
atproto-xrpcs = { version = "0.6.0", path = "crates/atproto-xrpcs" }
29
+
atproto-jetstream = { version = "0.6.0", path = "crates/atproto-jetstream" }
28
30
29
31
anyhow = "1.0"
30
32
async-trait = "0.1.88"
···
32
34
chrono = {version = "0.4.41", default-features = false, features = ["std", "now"]}
33
35
ecdsa = { version = "0.16.9", features = ["std"] }
34
36
elliptic-curve = { version = "0.13.8", features = ["jwk", "serde"] }
37
+
futures = "0.3"
35
38
hickory-resolver = { version = "0.25" }
39
+
http = "1.3.1"
36
40
k256 = "0.13.4"
37
41
lru = "0.12"
38
42
multibase = "0.9.1"
···
47
51
sha2 = "0.10.9"
48
52
thiserror = "2.0"
49
53
tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread"] }
54
+
tokio-websockets = { version = "0.11.4", features = ["client", "rustls-native-roots", "rand", "ring"] }
55
+
tokio-util = "0.7"
50
56
tracing = { version = "0.1", features = ["async-await"] }
51
57
ulid = "1.2.1"
58
+
urlencoding = "2.1"
59
+
zstd = "0.13"
52
60
53
61
[workspace.lints.rust]
54
62
unsafe_code = "forbid"
+1
-1
Dockerfile
+1
-1
Dockerfile
···
56
56
LABEL org.opencontainers.image.description="AT Protocol identity management tools"
57
57
LABEL org.opencontainers.image.authors="Nick Gerakines <nick.gerakines@gmail.com>"
58
58
LABEL org.opencontainers.image.source="https://tangled.sh/@smokesignal.events/atproto-identity-rs"
59
-
LABEL org.opencontainers.image.version="0.5.0"
59
+
LABEL org.opencontainers.image.version="0.6.0"
60
60
LABEL org.opencontainers.image.licenses="MIT"
61
61
62
62
# Document available binaries
+86
-12
README.md
+86
-12
README.md
···
1
1
# AT Protocol Identity & Record Library
2
2
3
-
A comprehensive Rust library for AT Protocol identity management, record signing, verification, and OAuth operations. This library provides full functionality for DID resolution, handle resolution, identity document management, cryptographic record operations, and OAuth 2.0 flows across multiple DID methods.
3
+
A comprehensive collection of Rust crates for building AT Protocol applications. This workspace provides complete functionality for identity management, record operations, OAuth 2.0 flows, event streaming, XRPC services, and HTTP client operations across multiple DID methods.
4
4
5
-
This project was extracted from the open-sourced [Smokesignal](https://tangled.sh/@smokesignal.events/smokesignal) project and is released under the MIT license.
5
+
This project was extracted from the open-sourced [smokesignal.events](https://tangled.sh/@smokesignal.events/smokesignal) project and is released under the MIT license.
6
6
7
7
## Project Overview
8
8
···
10
10
11
11
## Crates
12
12
13
-
This workspace contains seven specialized crates:
13
+
This workspace contains eight specialized crates:
14
14
15
15
### [`atproto-identity`](crates/atproto-identity/)
16
16
**Core identity management and cryptographic operations**
···
78
78
- **JWT Authentication**: Demonstrates integration with `atproto-xrpcs` authorization extractors
79
79
- **Service Discovery**: Complete service document with verification methods and service endpoints
80
80
81
+
### [`atproto-jetstream`](crates/atproto-jetstream/)
82
+
**AT Protocol Jetstream event consumer library**
83
+
84
+
- **Event Stream Consumer**: High-performance WebSocket-based event consumption from Jetstream instances
85
+
- **Event Handler Registration**: Flexible event handler system supporting multiple concurrent handlers
86
+
- **Compression Support**: Optional Zstandard compression with dictionary support for efficient data transfer
87
+
- **Graceful Shutdown**: Cancellation token support for clean shutdown and resource cleanup
88
+
- **Error Handling**: Comprehensive error types following project conventions with structured logging
89
+
81
90
## Command Line Tools
82
91
83
-
The library includes 8 command-line utilities across the crates:
92
+
The library includes 10 command-line utilities across the crates:
84
93
85
94
### Identity Operations (`atproto-identity`)
86
95
- **`atproto-identity-resolve`** - Resolve AT Protocol handles and DIDs to identity documents
···
92
101
- **`atproto-record-sign`** - Sign AT Protocol records with embedded signature metadata
93
102
- **`atproto-record-verify`** - Verify AT Protocol record signatures with issuer authentication
94
103
104
+
### HTTP Client Operations (`atproto-client`)
105
+
- **`atproto-client-dpop`** - Test DPoP authentication flows with AT Protocol services
106
+
95
107
### OAuth Operations (`atproto-oauth-axum`)
96
108
- **`atproto-oauth-tool`** - Complete OAuth client flow with local server and token acquisition
97
109
98
110
### XRPC Services (`atproto-xrpcs-helloworld`)
99
111
- **`atproto-xrpcs-helloworld`** - Example AT Protocol XRPC service with DID web identity and authentication
100
112
113
+
### Event Streaming (`atproto-jetstream`)
114
+
- **`atproto-jetstream-consumer`** - Stream AT Protocol events from Jetstream instances with configurable handlers
115
+
101
116
## Quick Start
102
117
103
118
Add the crates to your `Cargo.toml`:
104
119
105
120
```toml
106
121
[dependencies]
107
-
atproto-identity = "0.5.0"
108
-
atproto-record = "0.5.0"
109
-
atproto-oauth = "0.5.0"
110
-
atproto-client = "0.5.0"
111
-
atproto-oauth-axum = "0.5.0"
112
-
atproto-xrpcs = "0.5.0"
113
-
atproto-xrpcs-helloworld = "0.5.0"
122
+
atproto-identity = "0.6.0"
123
+
atproto-record = "0.6.0"
124
+
atproto-oauth = "0.6.0"
125
+
atproto-client = "0.6.0"
126
+
atproto-oauth-axum = "0.6.0"
127
+
atproto-xrpcs = "0.6.0"
128
+
atproto-jetstream = "0.6.0"
114
129
```
115
130
116
131
### Basic Identity Resolution
···
256
271
}
257
272
```
258
273
274
+
### Jetstream Event Streaming
275
+
276
+
```rust
277
+
use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
278
+
use async_trait::async_trait;
279
+
use std::sync::Arc;
280
+
281
+
// Custom event handler
282
+
struct PostEventHandler;
283
+
284
+
#[async_trait]
285
+
impl EventHandler for PostEventHandler {
286
+
async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
287
+
if event.kind == "commit" {
288
+
println!("Received post event: {:?}", event);
289
+
}
290
+
Ok(())
291
+
}
292
+
293
+
fn handler_id(&self) -> String {
294
+
"post-handler".to_string()
295
+
}
296
+
}
297
+
298
+
#[tokio::main]
299
+
async fn main() -> anyhow::Result<()> {
300
+
let config = ConsumerTaskConfig {
301
+
user_agent: "my-app/1.0".to_string(),
302
+
compression: false,
303
+
zstd_dictionary_location: String::new(),
304
+
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
305
+
collections: vec!["app.bsky.feed.post".to_string()],
306
+
};
307
+
308
+
let consumer = Consumer::new(config);
309
+
let handler = Arc::new(PostEventHandler);
310
+
311
+
consumer.register_handler(handler).await?;
312
+
313
+
let cancellation_token = CancellationToken::new();
314
+
consumer.run_background(cancellation_token).await?;
315
+
316
+
Ok(())
317
+
}
318
+
```
319
+
259
320
## CLI Usage Examples
260
321
261
322
```bash
···
292
353
# Run example XRPC service with DID web identity
293
354
EXTERNAL_BASE=localhost:8080 SERVICE_KEY=did:key:zQ3shNzMp4oaaQ1... \
294
355
cargo run --bin atproto-xrpcs-helloworld
356
+
357
+
# Stream events from Jetstream
358
+
cargo run --bin atproto-jetstream-consumer \
359
+
--hostname jetstream1.us-east.bsky.network \
360
+
--collections app.bsky.feed.post \
361
+
--user-agent "my-consumer/1.0"
362
+
363
+
# Stream with compression (requires dictionary file)
364
+
cargo run --bin atproto-jetstream-consumer \
365
+
--hostname jetstream1.us-east.bsky.network \
366
+
--collections app.bsky.feed.post \
367
+
--compression \
368
+
--zstd-dictionary ./data/zstd_dictionary
295
369
```
296
370
297
371
## Features
···
399
473
400
474
## Acknowledgments
401
475
402
-
This library was extracted from the [Smokesignal](https://tangled.sh/@smokesignal.events/smokesignal) project, an open-source event and RSVP management and discovery application. We thank the Smokesignal contributors for their foundational work on AT Protocol identity management and record operations.
476
+
This library was extracted from the [smokesignal.events](https://tangled.sh/@smokesignal.events/smokesignal) project, an open-source event and RSVP management and discovery application built on AT Protocol. We thank the smokesignal.events contributors for their foundational work on AT Protocol identity management, record operations, and event streaming infrastructure.
+2
-1
crates/atproto-client/Cargo.toml
+2
-1
crates/atproto-client/Cargo.toml
···
1
1
[package]
2
2
name = "atproto-client"
3
-
version = "0.5.0"
3
+
version = "0.6.0"
4
4
description = "HTTP client for AT Protocol services with OAuth and identity integration"
5
5
readme = "README.md"
6
6
homepage = "https://tangled.sh/@smokesignal.events/atproto-identity-rs"
···
29
29
tokio.workspace = true
30
30
tracing.workspace = true
31
31
urlencoding = "2.1.3"
32
+
bytes = "1.10.1"
32
33
33
34
[lints]
34
35
workspace = true
+47
-3
crates/atproto-client/README.md
+47
-3
crates/atproto-client/README.md
···
25
25
26
26
```toml
27
27
[dependencies]
28
-
atproto-client = "0.5.0"
28
+
atproto-client = "0.6.0"
29
29
```
30
30
31
31
## Usage
···
330
330
- `tracing` - Structured logging for debugging and monitoring
331
331
- `thiserror` - Structured error type derivation
332
332
333
-
## Library Only
333
+
## Command Line Tools
334
334
335
-
This crate is designed as a library and does not provide command line tools. All functionality is accessed programmatically through the Rust API. For command line operations, see the [`atproto-identity`](../atproto-identity) and [`atproto-record`](../atproto-record) crates which include CLI tools for identity resolution and record signing operations.
335
+
The crate includes one command-line tool for DPoP authentication testing:
336
+
337
+
### `atproto-client-dpop`
338
+
339
+
A command-line tool for testing DPoP authentication flows with AT Protocol services. This tool demonstrates the complete DPoP authentication process including proof generation, HTTP request signing, and token usage.
340
+
341
+
**Features:**
342
+
- **DPoP Proof Generation**: Creates DPoP proofs for HTTP requests using private keys
343
+
- **OAuth Integration**: Supports OAuth access tokens with DPoP binding
344
+
- **HTTP Client Testing**: Tests DPoP authentication against real AT Protocol endpoints
345
+
- **Request Signing**: Demonstrates proper DPoP header generation and validation
346
+
- **Token Management**: Shows how to use DPoP-bound access tokens for API requests
347
+
348
+
```bash
349
+
# Test DPoP authentication with an AT Protocol endpoint
350
+
cargo run --bin atproto-client-dpop \
351
+
--private-key did:key:zQ3shNzMp4oaaQ1gQRzCxMGXFrSW3NEM1M9T6KCY9eA7HhyEA \
352
+
--access-token your_access_token \
353
+
--issuer did:plc:issuer123 \
354
+
--url https://pds.example.com/xrpc/com.atproto.repo.listRecords \
355
+
--method GET
356
+
357
+
# Example POST request with DPoP authentication
358
+
cargo run --bin atproto-client-dpop \
359
+
--private-key did:key:zQ3shNzMp4oaaQ1gQRzCxMGXFrSW3NEM1M9T6KCY9eA7HhyEA \
360
+
--access-token your_access_token \
361
+
--issuer did:plc:issuer123 \
362
+
--url https://pds.example.com/xrpc/com.atproto.repo.createRecord \
363
+
--method POST \
364
+
--data '{"repo":"did:plc:user123","collection":"app.bsky.feed.post","record":{"$type":"app.bsky.feed.post","text":"Hello AT Protocol!"}}'
365
+
```
366
+
367
+
**Arguments:**
368
+
- `--private-key` - DID key string for DPoP proof signing
369
+
- `--access-token` - OAuth access token for authentication
370
+
- `--issuer` - Issuer DID for proof validation
371
+
- `--url` - Target URL for the authenticated request
372
+
- `--method` - HTTP method (GET, POST, PUT, DELETE)
373
+
- `--data` - Optional JSON data for POST/PUT requests
374
+
375
+
This tool is useful for:
376
+
- Testing DPoP implementation against AT Protocol services
377
+
- Validating authentication flows during development
378
+
- Debugging DPoP proof generation and validation
379
+
- Learning how DPoP authentication works in practice
336
380
337
381
## Contributing
338
382
+94
-10
crates/atproto-client/src/client.rs
+94
-10
crates/atproto-client/src/client.rs
···
4
4
//! with support for DPoP (Demonstration of Proof-of-Possession) authentication.
5
5
6
6
use crate::errors::{ClientError, DPoPError};
7
-
use anyhow::Result;
7
+
use anyhow::{Context, Result};
8
8
use atproto_identity::key::KeyData;
9
9
use atproto_oauth::dpop::{DpopRetry, request_dpop};
10
+
use bytes::Bytes;
10
11
use reqwest::header::HeaderMap;
11
12
use reqwest_chain::ChainMiddleware;
12
13
use reqwest_middleware::ClientBuilder;
···
41
42
/// Returns `ClientError::HttpRequestFailed` if the HTTP request fails,
42
43
/// or `ClientError::JsonParseFailed` if JSON parsing fails.
43
44
pub async fn get_json(http_client: &reqwest::Client, url: &str) -> Result<serde_json::Value> {
44
-
let http_response =
45
-
http_client
46
-
.get(url)
47
-
.send()
48
-
.await
49
-
.map_err(|error| ClientError::HttpRequestFailed {
50
-
url: url.to_string(),
51
-
error,
52
-
})?;
45
+
let empty = HeaderMap::default();
46
+
get_json_with_headers(http_client, url, &empty).await
47
+
}
48
+
49
+
/// Performs an unauthenticated HTTP GET request with additional headers and parses the response as JSON.
50
+
///
51
+
/// # Arguments
52
+
///
53
+
/// * `http_client` - The HTTP client to use for the request
54
+
/// * `url` - The URL to request
55
+
/// * `additional_headers` - Additional HTTP headers to include in the request
56
+
///
57
+
/// # Returns
58
+
///
59
+
/// The parsed JSON response as a `serde_json::Value`
60
+
///
61
+
/// # Errors
62
+
///
63
+
/// Returns `ClientError::HttpRequestFailed` if the HTTP request fails,
64
+
/// or `ClientError::JsonParseFailed` if JSON parsing fails.
65
+
pub async fn get_json_with_headers(
66
+
http_client: &reqwest::Client,
67
+
url: &str,
68
+
additional_headers: &HeaderMap,
69
+
) -> Result<serde_json::Value> {
70
+
let http_response = http_client
71
+
.get(url)
72
+
.headers(additional_headers.clone())
73
+
.send()
74
+
.instrument(tracing::info_span!("get_json_with_headers", url = %url))
75
+
.await
76
+
.map_err(|error| ClientError::HttpRequestFailed {
77
+
url: url.to_string(),
78
+
error,
79
+
})?;
53
80
54
81
let value = http_response
55
82
.json::<serde_json::Value>()
···
60
87
})?;
61
88
62
89
Ok(value)
90
+
}
91
+
92
+
/// Performs an unauthenticated HTTP GET request and returns the response as bytes.
93
+
///
94
+
/// # Arguments
95
+
///
96
+
/// * `http_client` - The HTTP client to use for the request
97
+
/// * `url` - The URL to request
98
+
///
99
+
/// # Returns
100
+
///
101
+
/// The response body as bytes
102
+
///
103
+
/// # Errors
104
+
///
105
+
/// Returns `ClientError::HttpRequestFailed` if the HTTP request fails,
106
+
/// or an error if streaming the response bytes fails.
107
+
pub async fn get_bytes(http_client: &reqwest::Client, url: &str) -> Result<Bytes> {
108
+
let empty = HeaderMap::default();
109
+
get_bytes_with_headers(http_client, url, &empty).await
110
+
}
111
+
112
+
/// Performs an unauthenticated HTTP GET request with additional headers and returns the response as bytes.
113
+
///
114
+
/// # Arguments
115
+
///
116
+
/// * `http_client` - The HTTP client to use for the request
117
+
/// * `url` - The URL to request
118
+
/// * `additional_headers` - Additional HTTP headers to include in the request
119
+
///
120
+
/// # Returns
121
+
///
122
+
/// The response body as bytes
123
+
///
124
+
/// # Errors
125
+
///
126
+
/// Returns `ClientError::HttpRequestFailed` if the HTTP request fails,
127
+
/// or an error if streaming the response bytes fails.
128
+
pub async fn get_bytes_with_headers(
129
+
http_client: &reqwest::Client,
130
+
url: &str,
131
+
additional_headers: &HeaderMap,
132
+
) -> Result<Bytes> {
133
+
let http_response = http_client
134
+
.get(url)
135
+
.headers(additional_headers.clone())
136
+
.send()
137
+
.instrument(tracing::info_span!("get_bytes_with_headers", url = %url))
138
+
.await
139
+
.map_err(|error| ClientError::HttpRequestFailed {
140
+
url: url.to_string(),
141
+
error,
142
+
})?;
143
+
http_response
144
+
.bytes()
145
+
.await
146
+
.context("failed streaming bytes")
63
147
}
64
148
65
149
/// Performs a DPoP-authenticated HTTP GET request and parses the response as JSON.
+33
-1
crates/atproto-client/src/com_atproto_repo.rs
+33
-1
crates/atproto-client/src/com_atproto_repo.rs
···
28
28
use std::collections::HashMap;
29
29
30
30
use anyhow::Result;
31
+
use bytes::Bytes;
31
32
use serde::{Deserialize, Serialize, de::DeserializeOwned};
32
33
33
34
use crate::{
34
-
client::{DPoPAuth, get_dpop_json, get_json, post_dpop_json},
35
+
client::{DPoPAuth, get_bytes, get_dpop_json, get_json, post_dpop_json},
35
36
errors::SimpleError,
36
37
url::URLBuilder,
37
38
};
···
55
56
},
56
57
/// Error response from the server
57
58
Error(SimpleError),
59
+
}
60
+
61
+
/// Retrieves a blob from an AT Protocol repository by DID and CID.
62
+
///
63
+
/// # Arguments
64
+
///
65
+
/// * `http_client` - HTTP client for making requests
66
+
/// * `base_url` - Base URL of the AT Protocol server
67
+
/// * `did` - Repository identifier (DID) containing the blob
68
+
/// * `cid` - Content identifier (CID) of the blob to retrieve
69
+
///
70
+
/// # Returns
71
+
///
72
+
/// The blob data as bytes
73
+
pub async fn get_blob(
74
+
http_client: &reqwest::Client,
75
+
base_url: &str,
76
+
did: &str,
77
+
cid: &str,
78
+
) -> Result<Bytes> {
79
+
let mut url_builder = URLBuilder::new(base_url);
80
+
url_builder.path("/xrpc/com.atproto.sync.getBlob");
81
+
82
+
url_builder.param("did", did);
83
+
url_builder.param("cid", cid);
84
+
85
+
let url = url_builder.build();
86
+
87
+
tracing::info!(?url, "get_blob");
88
+
89
+
get_bytes(http_client, &url).await
58
90
}
59
91
60
92
/// Retrieves a record from an AT Protocol repository.
+1
-1
crates/atproto-identity/Cargo.toml
+1
-1
crates/atproto-identity/Cargo.toml
···
1
1
[package]
2
2
name = "atproto-identity"
3
-
version = "0.5.0"
3
+
version = "0.6.0"
4
4
description = "AT Protocol identity management - DID resolution, handle resolution, and cryptographic operations"
5
5
readme = "README.md"
6
6
homepage = "https://tangled.sh/@smokesignal.events/atproto-identity-rs"
+1
-1
crates/atproto-identity/README.md
+1
-1
crates/atproto-identity/README.md
+3
-3
crates/atproto-identity/src/model.rs
+3
-3
crates/atproto-identity/src/model.rs
···
10
10
11
11
/// AT Protocol service configuration from a DID document.
12
12
/// Represents services like Personal Data Servers (PDS).
13
-
#[derive(Clone, Deserialize, Debug, PartialEq)]
13
+
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
14
14
#[serde(rename_all = "camelCase")]
15
15
pub struct Service {
16
16
/// Unique identifier for the service.
···
27
27
28
28
/// Cryptographic verification method from a DID document.
29
29
/// Used to verify signatures and authenticate identity operations.
30
-
#[derive(Clone, Deserialize, Debug, PartialEq)]
30
+
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
31
31
#[serde(tag = "type")]
32
32
pub enum VerificationMethod {
33
33
/// Multikey verification method with multibase-encoded public key.
···
57
57
58
58
/// Complete DID document containing identity information.
59
59
/// Contains services, verification methods, and aliases for a DID.
60
-
#[derive(Clone, Deserialize, Debug, PartialEq)]
60
+
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
61
61
#[serde(rename_all = "camelCase")]
62
62
pub struct Document {
63
63
/// The DID identifier (e.g., "did:plc:abc123").
+35
crates/atproto-jetstream/Cargo.toml
+35
crates/atproto-jetstream/Cargo.toml
···
1
+
[package]
2
+
name = "atproto-jetstream"
3
+
version = "0.6.0"
4
+
description = "AT Protocol Jetstream event consumer library with WebSocket streaming and compression support"
5
+
readme = "README.md"
6
+
homepage = "https://tangled.sh/@smokesignal.events/atproto-identity-rs"
7
+
documentation = "https://docs.rs/atproto-jetstream"
8
+
9
+
edition.workspace = true
10
+
rust-version.workspace = true
11
+
repository.workspace = true
12
+
authors.workspace = true
13
+
license.workspace = true
14
+
keywords.workspace = true
15
+
categories.workspace = true
16
+
17
+
[dependencies]
18
+
tokio = { workspace = true, features = ["full"] }
19
+
tokio-util.workspace = true
20
+
futures.workspace = true
21
+
serde.workspace = true
22
+
serde_json.workspace = true
23
+
zstd.workspace = true
24
+
anyhow.workspace = true
25
+
thiserror.workspace = true
26
+
tracing.workspace = true
27
+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
28
+
async-trait.workspace = true
29
+
atproto-identity.workspace = true
30
+
urlencoding.workspace = true
31
+
tokio-websockets.workspace = true
32
+
http.workspace = true
33
+
34
+
[lints]
35
+
workspace = true
+313
crates/atproto-jetstream/README.md
+313
crates/atproto-jetstream/README.md
···
1
+
# atproto-jetstream
2
+
3
+
A Rust library for consuming AT Protocol Jetstream events with high-performance WebSocket streaming, flexible event handling, and optional Zstandard compression support.
4
+
5
+
## Overview
6
+
7
+
`atproto-jetstream` provides a comprehensive async stream consumer for AT Protocol Jetstream events. This library enables real-time consumption of AT Protocol repository events, identity changes, and account updates through WebSocket connections with support for filtering, compression, and graceful shutdown patterns.
8
+
9
+
This project was extracted from the open-sourced [Smokesignal](https://tangled.sh/@smokesignal.events/smokesignal) project and is designed to be a standalone, reusable library for AT Protocol event stream consumption.
10
+
11
+
## Features
12
+
13
+
- **High-Performance WebSocket Streaming**: Async WebSocket-based event consumption with automatic reconnection
14
+
- **Flexible Event Handler System**: Register multiple custom event handlers with unique identifiers
15
+
- **Zstandard Compression**: Optional compression support with custom dictionaries for bandwidth optimization
16
+
- **Event Filtering**: Filter events by collections and DIDs for targeted consumption
17
+
- **Graceful Shutdown**: Cancellation token support for clean shutdown and resource cleanup
18
+
- **Message Size Management**: Configurable maximum message sizes and rate limiting
19
+
- **Cursor Support**: Resume streaming from specific points using cursor positioning
20
+
- **Structured Error Handling**: Comprehensive error types with detailed error codes following project conventions
21
+
- **Built-in Event Broadcasting**: Event broadcasting to multiple consumers with `tokio::sync::broadcast`
22
+
- **Tracing Integration**: Full structured logging support for debugging and monitoring
23
+
24
+
## Usage
25
+
26
+
### Basic Event Consumer
27
+
28
+
```rust
29
+
use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
30
+
use async_trait::async_trait;
31
+
use anyhow::Result;
32
+
33
+
// Create a custom event handler
34
+
struct MyEventHandler;
35
+
36
+
#[async_trait]
37
+
impl EventHandler for MyEventHandler {
38
+
async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
39
+
println!("Received event: {:?}", event);
40
+
Ok(())
41
+
}
42
+
43
+
fn handler_id(&self) -> String {
44
+
"my-handler".to_string()
45
+
}
46
+
}
47
+
48
+
#[tokio::main]
49
+
async fn main() -> Result<()> {
50
+
let config = ConsumerTaskConfig {
51
+
user_agent: "my-app/1.0".to_string(),
52
+
compression: false,
53
+
zstd_dictionary_location: String::new(),
54
+
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
55
+
collections: vec!["app.bsky.feed.post".to_string()],
56
+
};
57
+
58
+
let consumer = Consumer::new(config);
59
+
let handler = std::sync::Arc::new(MyEventHandler);
60
+
61
+
consumer.register_handler(handler).await?;
62
+
63
+
let cancellation_token = CancellationToken::new();
64
+
consumer.run_background(cancellation_token).await?;
65
+
66
+
Ok(())
67
+
}
68
+
```
69
+
70
+
### Using Multiple Event Handlers
71
+
72
+
```rust
73
+
use atproto_jetstream::{Consumer, LoggingHandler};
74
+
use std::sync::Arc;
75
+
76
+
// Register multiple handlers
77
+
let consumer = Consumer::new(config);
78
+
79
+
let logging_handler = Arc::new(LoggingHandler::new("logger".to_string()));
80
+
let custom_handler = Arc::new(MyEventHandler);
81
+
82
+
consumer.register_handler(logging_handler).await?;
83
+
consumer.register_handler(custom_handler).await?;
84
+
```
85
+
86
+
### With Compression Support
87
+
88
+
```rust
89
+
let config = ConsumerTaskConfig {
90
+
user_agent: "my-app/1.0".to_string(),
91
+
compression: true,
92
+
zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
93
+
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
94
+
collections: vec!["app.bsky.feed.post".to_string()],
95
+
};
96
+
97
+
// Download the Zstandard dictionary first:
98
+
// mkdir -p data/
99
+
// curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
100
+
```
101
+
102
+
## Installation
103
+
104
+
Add this to your `Cargo.toml`:
105
+
106
+
```toml
107
+
[dependencies]
108
+
atproto-jetstream = "0.1.0"
109
+
```
110
+
111
+
## Command Line Tools
112
+
113
+
The crate includes a command-line tool for consuming AT Protocol Jetstream events:
114
+
115
+
### `atproto-jetstream-consumer`
116
+
117
+
A comprehensive command-line tool for consuming AT Protocol Jetstream events with real-time streaming, filtering capabilities, and optional compression support. This tool provides an easy way to monitor AT Protocol event streams for development, testing, and production monitoring.
118
+
119
+
**Features:**
120
+
- **Real-Time Event Streaming**: Connects to AT Protocol Jetstream instances for live event consumption
121
+
- **Event Filtering**: Filter events by specific collections and DIDs for targeted monitoring
122
+
- **Compression Support**: Optional Zstandard compression with dictionary support for bandwidth optimization
123
+
- **Flexible Output**: Structured JSON output for each event with customizable logging levels
124
+
- **Connection Management**: Automatic reconnection handling and graceful shutdown on interruption
125
+
- **Configurable Parameters**: Extensive configuration options for hostname, collections, message sizes, and more
126
+
127
+
```bash
128
+
# Basic event streaming from Jetstream
129
+
cargo run --bin atproto-jetstream-consumer \
130
+
--hostname jetstream1.us-east.bsky.network \
131
+
--collections app.bsky.feed.post \
132
+
--user-agent "my-consumer/1.0"
133
+
134
+
# Stream specific collections with filtering
135
+
cargo run --bin atproto-jetstream-consumer \
136
+
--hostname jetstream1.us-east.bsky.network \
137
+
--collections "app.bsky.feed.post,app.bsky.actor.profile" \
138
+
--dids "did:plc:user123,did:plc:user456" \
139
+
--user-agent "filtered-consumer/1.0"
140
+
141
+
# With Zstandard compression enabled
142
+
cargo run --bin atproto-jetstream-consumer \
143
+
--hostname jetstream1.us-east.bsky.network \
144
+
--collections app.bsky.feed.post \
145
+
--compression \
146
+
--zstd-dictionary ./data/zstd_dictionary \
147
+
--user-agent "compressed-consumer/1.0"
148
+
149
+
# Advanced configuration with message size limits and cursor
150
+
cargo run --bin atproto-jetstream-consumer \
151
+
--hostname jetstream1.us-east.bsky.network \
152
+
--collections app.bsky.feed.post \
153
+
--max-message-size 1048576 \
154
+
--cursor 1234567890 \
155
+
--require-hello \
156
+
--user-agent "advanced-consumer/1.0"
157
+
```
158
+
159
+
**Command Line Arguments:**
160
+
- `--hostname` - Jetstream hostname to connect to (e.g., jetstream1.us-east.bsky.network)
161
+
- `--collections` - Comma-separated list of AT Protocol collections to subscribe to
162
+
- `--dids` - Optional comma-separated list of DIDs to filter events for
163
+
- `--user-agent` - User-Agent string for the WebSocket connection
164
+
- `--compression` - Enable Zstandard compression (requires zstd-dictionary)
165
+
- `--zstd-dictionary` - Path to Zstandard dictionary file for compression
166
+
- `--max-message-size` - Maximum message size in bytes (default: 56000)
167
+
- `--cursor` - Optional cursor position to start streaming from
168
+
- `--require-hello` - Require hello message before receiving events (default: true)
169
+
170
+
**Setting up Compression:**
171
+
To use compression, you need to download the Zstandard dictionary:
172
+
173
+
```bash
174
+
# Create data directory and download dictionary
175
+
mkdir -p data/
176
+
curl -o data/zstd_dictionary \
177
+
https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
178
+
179
+
# Use with compression enabled
180
+
cargo run --bin atproto-jetstream-consumer \
181
+
--hostname jetstream1.us-east.bsky.network \
182
+
--collections app.bsky.feed.post \
183
+
--compression \
184
+
--zstd-dictionary ./data/zstd_dictionary
185
+
```
186
+
187
+
**Output Format:**
188
+
The tool outputs structured JSON for each received event:
189
+
190
+
```json
191
+
{
192
+
"kind": "commit",
193
+
"time_us": 1704067200000000,
194
+
"did": "did:plc:user123",
195
+
"commit": {
196
+
"rev": "3l2uygzaf5c2b",
197
+
"operation": "create",
198
+
"collection": "app.bsky.feed.post",
199
+
"rkey": "3l2uygzaf5c2c",
200
+
"cid": "bafyreif5n4jf6jfczjqzckzqxdxm5qnz4jf6jfczjqzckzqxdxm5qnz4",
201
+
"record": {
202
+
"$type": "app.bsky.feed.post",
203
+
"text": "Hello AT Protocol!",
204
+
"createdAt": "2024-01-01T00:00:00Z"
205
+
}
206
+
}
207
+
}
208
+
```
209
+
210
+
This tool is ideal for:
211
+
- **Development and Testing**: Monitor AT Protocol events during application development
212
+
- **Production Monitoring**: Track repository changes and user activity in real-time
213
+
- **Data Analysis**: Collect AT Protocol events for analysis and research
214
+
- **Integration Testing**: Verify that your applications are generating expected events
215
+
- **System Monitoring**: Monitor the health and activity of AT Protocol networks
216
+
217
+
## Event Types
218
+
219
+
The library handles `JetstreamEvent` structures with the following fields:
220
+
221
+
- `kind`: Event type (e.g., "commit", "identity", "account")
222
+
- `time_us`: Event timestamp in microseconds
223
+
- `commit`: Optional commit data for repository events
224
+
- `identity`: Optional identity change data
225
+
- `account`: Optional account-related data
226
+
227
+
## Modules
228
+
229
+
- **[`consumer`]** - Core consumer implementation with WebSocket streaming and event handling
230
+
- **[`lib`]** - Public library interface and re-exports
231
+
232
+
## Error Handling
233
+
234
+
The crate uses comprehensive structured error types with unique identifiers:
235
+
236
+
```
237
+
error-atproto-jetstream-<domain>-<number> <message>: <details>
238
+
```
239
+
240
+
All errors follow the project convention:
241
+
242
+
- `ConsumerError::ConnectionFailed` - WebSocket connection establishment failures
243
+
- `ConsumerError::DecompressionFailed` - Zstandard decompression operation failures
244
+
- `ConsumerError::DeserializationFailed` - JSON event parsing failures
245
+
- `ConsumerError::HandlerRegistrationFailed` - Event handler registration conflicts
246
+
- `ConsumerError::EventSenderNotInitialized` - Event broadcasting setup errors
247
+
- `ConsumerError::MessageConversionFailed` - WebSocket message format errors
248
+
- `ConsumerError::UpdateSerializationFailed` - Subscription update serialization errors
249
+
- `ConsumerError::UpdateSendFailed` - Subscription update transmission errors
250
+
- `ConsumerError::DecompressorCreationFailed` - Zstandard decompressor initialization errors
251
+
252
+
```rust
253
+
use atproto_jetstream::consumer::ConsumerError;
254
+
255
+
// Example error handling
256
+
match consumer_result {
257
+
Err(ConsumerError::ConnectionFailed(details)) => {
258
+
println!("Failed to connect to Jetstream: {}", details);
259
+
}
260
+
Err(ConsumerError::DecompressionFailed(error)) => {
261
+
println!("Decompression failed: {}", error);
262
+
}
263
+
Err(ConsumerError::HandlerRegistrationFailed(error)) => {
264
+
println!("Handler registration failed: {}", error);
265
+
}
266
+
Ok(()) => println!("Consumer operation successful"),
267
+
}
268
+
```
269
+
270
+
## Dependencies
271
+
272
+
This crate builds on:
273
+
274
+
- `tokio` - Async runtime for WebSocket connections and event handling
275
+
- `tokio-websockets` - WebSocket client implementation for Jetstream connections
276
+
- `tokio-util` - Additional utilities including cancellation token support
277
+
- `futures` - Stream and sink traits for async WebSocket operations
278
+
- `zstd` - Zstandard compression support with dictionary-based decompression
279
+
- `serde_json` - JSON serialization and deserialization for AT Protocol events
280
+
- `http` - HTTP types for WebSocket headers and URI parsing
281
+
- `urlencoding` - URL encoding for query parameters in WebSocket connections
282
+
- `async_trait` - Async trait support for event handler implementations
283
+
- `anyhow` - Error handling utilities and result types
284
+
- `tracing` - Structured logging for debugging and monitoring
285
+
- `thiserror` - Structured error type derivation
286
+
287
+
## AT Protocol Jetstream
288
+
289
+
This library implements a client for [AT Protocol Jetstream](https://github.com/bluesky-social/jetstream), which provides:
290
+
291
+
- **Real-Time Event Streaming**: Live consumption of AT Protocol repository events
292
+
- **Efficient Compression**: Zstandard compression with custom dictionaries for bandwidth optimization
293
+
- **Event Filtering**: Server-side filtering by collections and DIDs for targeted consumption
294
+
- **High Performance**: WebSocket-based streaming designed for high-throughput event processing
295
+
- **Reliability**: Built-in connection management and error recovery patterns
296
+
297
+
## Contributing
298
+
299
+
Contributions are welcome! Please ensure that:
300
+
301
+
1. All tests pass: `cargo test`
302
+
2. Code is properly formatted: `cargo fmt`
303
+
3. No linting issues: `cargo clippy`
304
+
4. New functionality includes appropriate tests and documentation
305
+
5. Error handling follows the project's structured error format
306
+
307
+
## License
308
+
309
+
This project is licensed under the MIT License. See the LICENSE file for details.
310
+
311
+
## Acknowledgments
312
+
313
+
This library was extracted from the [Smokesignal](https://tangled.sh/@smokesignal.events/smokesignal) project, an open-source event and RSVP management and discovery application.
+180
crates/atproto-jetstream/src/bin/atproto-jetstream-consumer.rs
+180
crates/atproto-jetstream/src/bin/atproto-jetstream-consumer.rs
···
1
+
//! AT Protocol Jetstream consumer tool for streaming events.
2
+
//!
3
+
//! This binary tool connects to a Jetstream instance and streams AT Protocol
4
+
//! events from specified collections, with optional zstd compression support.
5
+
6
+
use anyhow::Result;
7
+
use atproto_identity::config::{CertificateBundles, default_env, optional_env, version};
8
+
use atproto_jetstream::{CancellationToken, Consumer, ConsumerTaskConfig, LoggingHandler};
9
+
use std::{env, sync::Arc};
10
+
use tokio::signal;
11
+
12
+
fn print_usage() {
13
+
println!("AT Protocol Jetstream Consumer Tool");
14
+
println!();
15
+
println!("Usage:");
16
+
println!(" atproto-jetstream-consumer <jetstream_hostname> <zstd_dictionary> [collection...]");
17
+
println!();
18
+
println!("Arguments:");
19
+
println!(" jetstream_hostname Hostname of the Jetstream instance to connect to");
20
+
println!(
21
+
" zstd_dictionary Path to zstd dictionary file (use 'none' to disable compression)"
22
+
);
23
+
println!(" collection Zero or more AT Protocol collections to subscribe to");
24
+
println!();
25
+
println!("Environment Variables:");
26
+
println!(" CERTIFICATE_BUNDLES Optional path to additional CA certificates");
27
+
println!(" USER_AGENT Custom user agent string");
28
+
println!();
29
+
println!("Examples:");
30
+
println!(" # Subscribe to feed posts with compression");
31
+
println!(
32
+
" atproto-jetstream-consumer jetstream1.us-east.bsky.network /path/to/dict.zstd app.bsky.feed.post"
33
+
);
34
+
println!();
35
+
println!(" # Subscribe to multiple collections without compression");
36
+
println!(
37
+
" atproto-jetstream-consumer jetstream1.us-east.bsky.network none app.bsky.feed.post app.bsky.feed.repost"
38
+
);
39
+
println!();
40
+
println!(" # Subscribe to all collections");
41
+
println!(" atproto-jetstream-consumer jetstream1.us-east.bsky.network none");
42
+
}
43
+
44
+
#[tokio::main]
45
+
async fn main() -> Result<()> {
46
+
// Initialize tracing
47
+
tracing_subscriber::fmt()
48
+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
49
+
.init();
50
+
51
+
// Parse command line arguments
52
+
let args: Vec<String> = env::args().skip(1).collect();
53
+
54
+
if args.len() < 2 || args.iter().any(|arg| arg == "--help" || arg == "-h") {
55
+
print_usage();
56
+
return Ok(());
57
+
}
58
+
59
+
let jetstream_hostname = &args[0];
60
+
let zstd_dictionary_path = &args[1];
61
+
let collections: Vec<String> = args[2..].iter().map(|s| s.to_string()).collect();
62
+
63
+
tracing::info!(
64
+
hostname = %jetstream_hostname,
65
+
dictionary = %zstd_dictionary_path,
66
+
collections = ?collections,
67
+
"Starting Jetstream consumer"
68
+
);
69
+
70
+
// Handle environment variables
71
+
let _certificate_bundles: CertificateBundles =
72
+
optional_env("CERTIFICATE_BUNDLES").try_into()?;
73
+
let default_user_agent = format!(
74
+
"atproto-jetstream-rs ({}; +https://tangled.sh/@smokesignal.events/atproto-identity-rs)",
75
+
version()?
76
+
);
77
+
let user_agent = default_env("USER_AGENT", &default_user_agent);
78
+
79
+
tracing::info!(user_agent = %user_agent, "Configuration loaded");
80
+
81
+
// Load zstd dictionary if specified
82
+
let compression_enabled = zstd_dictionary_path != "none";
83
+
84
+
// Create consumer configuration
85
+
let config = ConsumerTaskConfig {
86
+
user_agent,
87
+
compression: compression_enabled,
88
+
zstd_dictionary_location: if compression_enabled {
89
+
zstd_dictionary_path.to_string()
90
+
} else {
91
+
String::new()
92
+
},
93
+
jetstream_hostname: jetstream_hostname.to_string(),
94
+
collections: if collections.is_empty() {
95
+
tracing::info!("No collections specified, subscribing to all");
96
+
vec![]
97
+
} else {
98
+
collections
99
+
},
100
+
dids: vec![], // Default to all DIDs
101
+
max_message_size_bytes: None, // Default to no limit
102
+
cursor: None, // Default to live-tail
103
+
require_hello: true, // Default to true as requested
104
+
};
105
+
106
+
// Create consumer
107
+
let consumer = Consumer::new(config);
108
+
109
+
// Register logging handler
110
+
let logging_handler = Arc::new(LoggingHandler::new("jetstream-logger".to_string()));
111
+
consumer.register_handler(logging_handler).await?;
112
+
113
+
tracing::info!("Jetstream consumer registered and ready");
114
+
115
+
// Set up cancellation token for graceful shutdown
116
+
let cancellation_token = CancellationToken::new();
117
+
let cancellation_token_clone = cancellation_token.clone();
118
+
119
+
// Set up signal handling for graceful shutdown
120
+
let signal_handler = tokio::spawn(async move {
121
+
#[cfg(unix)]
122
+
{
123
+
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
124
+
.expect("Failed to create SIGINT handler");
125
+
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
126
+
.expect("Failed to create SIGTERM handler");
127
+
128
+
tokio::select! {
129
+
_ = sigint.recv() => {
130
+
tracing::info!("Received SIGINT, initiating graceful shutdown");
131
+
}
132
+
_ = sigterm.recv() => {
133
+
tracing::info!("Received SIGTERM, initiating graceful shutdown");
134
+
}
135
+
}
136
+
}
137
+
138
+
#[cfg(windows)]
139
+
{
140
+
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
141
+
tracing::info!("Received Ctrl+C, initiating graceful shutdown");
142
+
}
143
+
144
+
cancellation_token_clone.cancel();
145
+
});
146
+
147
+
// Run consumer
148
+
let consumer_task = tokio::spawn(async move {
149
+
if let Err(err) = consumer.run_background(cancellation_token).await {
150
+
tracing::error!(error = ?err, "Consumer failed");
151
+
return Err(err);
152
+
}
153
+
Ok(())
154
+
});
155
+
156
+
tracing::info!("Consumer started, press Ctrl+C to stop");
157
+
158
+
// Wait for either the consumer to finish or signal handler
159
+
tokio::select! {
160
+
result = consumer_task => {
161
+
match result {
162
+
Ok(Ok(())) => tracing::info!("Consumer finished successfully"),
163
+
Ok(Err(err)) => {
164
+
tracing::error!(error = ?err, "Consumer failed");
165
+
return Err(err);
166
+
}
167
+
Err(err) => {
168
+
tracing::error!(error = ?err, "Consumer task panicked");
169
+
return Err(err.into());
170
+
}
171
+
}
172
+
}
173
+
_ = signal_handler => {
174
+
tracing::info!("Signal handler completed");
175
+
}
176
+
}
177
+
178
+
tracing::info!("Jetstream consumer shutting down");
179
+
Ok(())
180
+
}
+497
crates/atproto-jetstream/src/consumer.rs
+497
crates/atproto-jetstream/src/consumer.rs
···
1
+
//! Async stream consumer for AT Protocol Jetstream events
2
+
//!
3
+
//! This module provides structures for consuming events from an async stream
4
+
//! and dispatching them to registered event handlers.
5
+
6
+
use anyhow::Result;
7
+
use async_trait::async_trait;
8
+
use futures::{SinkExt, StreamExt};
9
+
use http::Uri;
10
+
use serde::{Deserialize, Serialize};
11
+
use std::sync::Arc;
12
+
use std::{collections::HashMap, str::FromStr};
13
+
use tokio::sync::{RwLock, broadcast};
14
+
use tokio::time::{Instant, sleep};
15
+
use tokio_util::sync::CancellationToken;
16
+
use tokio_websockets::{ClientBuilder, Message};
17
+
use tracing::Instrument;
18
+
19
+
const MAX_MESSAGE_SIZE: usize = 56000;
20
+
21
+
/// Configuration for the Jetstream consumer task
22
+
#[derive(Clone, Debug)]
23
+
pub struct ConsumerTaskConfig {
24
+
/// User-Agent header value for WebSocket connections
25
+
pub user_agent: String,
26
+
/// Enable Zstandard compression for messages
27
+
pub compression: bool,
28
+
/// Path to Zstandard dictionary file (required if compression is enabled)
29
+
pub zstd_dictionary_location: String,
30
+
/// Hostname of the Jetstream instance to connect to
31
+
pub jetstream_hostname: String,
32
+
/// AT Protocol collections to subscribe to (empty for all)
33
+
pub collections: Vec<String>,
34
+
/// DIDs to filter events for (empty for all)
35
+
pub dids: Vec<String>,
36
+
/// Maximum message size in bytes (None for unlimited)
37
+
pub max_message_size_bytes: Option<u64>,
38
+
/// Optional cursor position to start streaming from
39
+
pub cursor: Option<i64>,
40
+
/// Whether to require a hello message before receiving events
41
+
pub require_hello: bool,
42
+
}
43
+
44
+
/// Event data structure for Jetstream events
45
+
#[derive(Debug, Clone, Serialize, Deserialize)]
46
+
#[serde(untagged)]
47
+
pub enum JetstreamEvent {
48
+
/// Repository commit event (create/update operations)
49
+
Commit {
50
+
/// DID of the repository that was updated
51
+
did: String,
52
+
/// Event timestamp in microseconds since Unix epoch
53
+
time_us: u64,
54
+
/// Event type identifier
55
+
kind: String,
56
+
57
+
#[serde(rename = "commit")]
58
+
/// Commit operation details
59
+
commit: JetstreamEventCommit,
60
+
},
61
+
62
+
/// Repository delete event
63
+
Delete {
64
+
/// DID of the repository that was updated
65
+
did: String,
66
+
/// Event timestamp in microseconds since Unix epoch
67
+
time_us: u64,
68
+
/// Event type identifier
69
+
kind: String,
70
+
71
+
#[serde(rename = "commit")]
72
+
/// Delete operation details
73
+
commit: JetstreamEventDelete,
74
+
},
75
+
76
+
/// Identity document update event
77
+
Identity {
78
+
/// DID whose identity was updated
79
+
did: String,
80
+
/// Event timestamp in microseconds since Unix epoch
81
+
time_us: u64,
82
+
/// Event type identifier
83
+
kind: String,
84
+
85
+
#[serde(rename = "identity")]
86
+
/// Identity document data
87
+
identity: serde_json::Value,
88
+
},
89
+
90
+
/// Account-related event
91
+
Account {
92
+
/// DID of the account
93
+
did: String,
94
+
/// Event timestamp in microseconds since Unix epoch
95
+
time_us: u64,
96
+
/// Event type identifier
97
+
kind: String,
98
+
99
+
#[serde(rename = "account")]
100
+
/// Account data
101
+
identity: serde_json::Value,
102
+
},
103
+
}
104
+
105
+
/// Repository commit operation details
106
+
#[derive(Debug, Clone, Serialize, Deserialize)]
107
+
pub struct JetstreamEventCommit {
108
+
/// Repository revision identifier
109
+
pub rev: String,
110
+
/// Operation type (create, update)
111
+
pub operation: String,
112
+
/// AT Protocol collection name
113
+
pub collection: String,
114
+
/// Record key within the collection
115
+
pub rkey: String,
116
+
/// Content identifier (CID) of the record
117
+
pub cid: String,
118
+
/// Record data as JSON
119
+
pub record: serde_json::Value,
120
+
}
121
+
122
+
/// Repository delete operation details
123
+
#[derive(Debug, Clone, Serialize, Deserialize)]
124
+
pub struct JetstreamEventDelete {
125
+
/// Repository revision identifier
126
+
pub rev: String,
127
+
/// Operation type (delete)
128
+
pub operation: String,
129
+
/// AT Protocol collection name
130
+
pub collection: String,
131
+
/// Record key that was deleted
132
+
pub rkey: String,
133
+
}
134
+
135
+
/// Trait for handling Jetstream events
136
+
#[async_trait]
137
+
pub trait EventHandler: Send + Sync {
138
+
/// Handle a received event
139
+
async fn handle_event(&self, event: JetstreamEvent) -> Result<()>;
140
+
141
+
/// Get the handler's identifier
142
+
fn handler_id(&self) -> String;
143
+
}
144
+
145
+
/// Errors specific to the consumer module
146
+
#[derive(thiserror::Error, Debug)]
147
+
pub enum ConsumerError {
148
+
/// WebSocket connection establishment failed
149
+
#[error("error-atproto-jetstream-consumer-1 WebSocket connection failed: {0}")]
150
+
ConnectionFailed(String),
151
+
/// Message decompression operation failed
152
+
#[error("error-atproto-jetstream-consumer-2 Message decompression failed: {0}")]
153
+
DecompressionFailed(String),
154
+
/// JSON deserialization of event data failed
155
+
#[error("error-atproto-jetstream-consumer-3 Event deserialization failed: {0}")]
156
+
DeserializationFailed(String),
157
+
/// Event handler registration failed (e.g., duplicate handler ID)
158
+
#[error("error-atproto-jetstream-consumer-4 Handler registration failed: {0}")]
159
+
HandlerRegistrationFailed(String),
160
+
/// Event broadcast sender not initialized (consumer not running)
161
+
#[error("error-atproto-jetstream-consumer-5 Event sender not initialized: {0}")]
162
+
EventSenderNotInitialized(String),
163
+
/// WebSocket message format conversion failed
164
+
#[error("error-atproto-jetstream-consumer-6 Message conversion failed: {0}")]
165
+
MessageConversionFailed(String),
166
+
/// Serialization of subscription update message failed
167
+
#[error("error-atproto-jetstream-consumer-7 Update serialization failed: {0}")]
168
+
UpdateSerializationFailed(String),
169
+
/// Sending subscription update message failed
170
+
#[error("error-atproto-jetstream-consumer-8 Update send failed: {0}")]
171
+
UpdateSendFailed(String),
172
+
/// Zstandard decompressor initialization failed
173
+
#[error("error-atproto-jetstream-consumer-9 Decompressor creation failed: {0}")]
174
+
DecompressorCreationFailed(String),
175
+
}
176
+
177
+
#[derive(Debug, Clone, Serialize, Deserialize)]
178
+
#[serde(tag = "type", content = "payload")]
179
+
pub(crate) enum SubscriberSourcedMessage {
180
+
#[serde(rename = "options_update")]
181
+
Update {
182
+
#[serde(rename = "wantedCollections")]
183
+
wanted_collections: Vec<String>,
184
+
185
+
#[serde(rename = "wantedDids", skip_serializing_if = "Vec::is_empty", default)]
186
+
wanted_dids: Vec<String>,
187
+
188
+
#[serde(rename = "maxMessageSizeBytes")]
189
+
max_message_size_bytes: u64,
190
+
191
+
#[serde(skip_serializing_if = "Option::is_none")]
192
+
cursor: Option<i64>,
193
+
},
194
+
}
195
+
196
+
/// Main consumer structure for handling async streams and event dispatching
197
+
pub struct Consumer {
198
+
config: ConsumerTaskConfig,
199
+
handlers: Arc<RwLock<HashMap<String, Arc<dyn EventHandler>>>>,
200
+
event_sender: Arc<RwLock<Option<broadcast::Sender<JetstreamEvent>>>>,
201
+
}
202
+
203
+
impl Consumer {
204
+
/// Create a new consumer with the given configuration
205
+
pub fn new(config: ConsumerTaskConfig) -> Self {
206
+
Self {
207
+
config,
208
+
handlers: Arc::new(RwLock::new(HashMap::new())),
209
+
event_sender: Arc::new(RwLock::new(None)),
210
+
}
211
+
}
212
+
213
+
/// Register an event handler
214
+
pub async fn register_handler(&self, handler: Arc<dyn EventHandler>) -> Result<()> {
215
+
let handler_id = handler.handler_id();
216
+
let mut handlers = self.handlers.write().await;
217
+
218
+
if handlers.contains_key(&handler_id) {
219
+
return Err(ConsumerError::HandlerRegistrationFailed(format!(
220
+
"Handler with ID '{}' already registered",
221
+
handler_id
222
+
))
223
+
.into());
224
+
}
225
+
226
+
handlers.insert(handler_id.clone(), handler);
227
+
tracing::info!(handler_id = %handler_id, "Event handler registered");
228
+
Ok(())
229
+
}
230
+
231
+
/// Unregister an event handler
232
+
pub async fn unregister_handler(&self, handler_id: &str) -> Result<()> {
233
+
let mut handlers = self.handlers.write().await;
234
+
handlers.remove(handler_id);
235
+
tracing::info!(handler_id = %handler_id, "Event handler unregistered");
236
+
Ok(())
237
+
}
238
+
239
+
/// Get a broadcast receiver for events
240
+
pub async fn get_event_receiver(&self) -> Result<broadcast::Receiver<JetstreamEvent>> {
241
+
let sender_guard = self.event_sender.read().await;
242
+
match sender_guard.as_ref() {
243
+
Some(sender) => Ok(sender.subscribe()),
244
+
None => Err(ConsumerError::EventSenderNotInitialized(
245
+
"consumer not running".to_string(),
246
+
)
247
+
.into()),
248
+
}
249
+
}
250
+
251
+
/// Run the consumer in the background
252
+
///
253
+
/// # Example
254
+
/// ```rust,no_run
255
+
/// use atproto_jetstream::{Consumer, ConsumerTaskConfig, CancellationToken};
256
+
///
257
+
/// # async fn example() -> anyhow::Result<()> {
258
+
/// let config = ConsumerTaskConfig {
259
+
/// user_agent: "my-app/1.0".to_string(),
260
+
/// compression: false,
261
+
/// zstd_dictionary_location: String::new(),
262
+
/// jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
263
+
/// collections: vec!["app.bsky.feed.post".to_string()],
264
+
/// dids: vec![], // Subscribe to all DIDs
265
+
/// max_message_size_bytes: None, // No limit
266
+
/// cursor: None, // Live-tail from current time
267
+
/// require_hello: true, // Wait for initial options update
268
+
/// };
269
+
///
270
+
/// let consumer = Consumer::new(config);
271
+
/// let cancellation_token = CancellationToken::new();
272
+
///
273
+
/// // To cancel the consumer later:
274
+
/// // cancellation_token.cancel();
275
+
///
276
+
/// consumer.run_background(cancellation_token).await?;
277
+
/// # Ok(())
278
+
/// # }
279
+
/// ```
280
+
pub async fn run_background(&self, cancellation_token: CancellationToken) -> Result<()> {
281
+
tracing::info!("Starting Jetstream consumer");
282
+
283
+
// Build WebSocket URL with query parameters
284
+
let mut query_params = vec![];
285
+
286
+
// Add compression parameter
287
+
query_params.push(format!("compress={}", self.config.compression));
288
+
289
+
// Add requireHello parameter
290
+
query_params.push(format!("requireHello={}", self.config.require_hello));
291
+
292
+
// Add wantedCollections if specified
293
+
if !self.config.collections.is_empty() {
294
+
let collections = self
295
+
.config
296
+
.collections
297
+
.iter()
298
+
.map(|c| urlencoding::encode(c))
299
+
.collect::<Vec<_>>()
300
+
.join(",");
301
+
query_params.push(format!("wantedCollections={}", collections));
302
+
}
303
+
304
+
// Add wantedDids if specified
305
+
if !self.config.dids.is_empty() {
306
+
let dids = self
307
+
.config
308
+
.dids
309
+
.iter()
310
+
.map(|d| urlencoding::encode(d))
311
+
.collect::<Vec<_>>()
312
+
.join(",");
313
+
query_params.push(format!("wantedDids={}", dids));
314
+
}
315
+
316
+
// Add maxMessageSizeBytes if specified
317
+
if let Some(max_size) = self.config.max_message_size_bytes {
318
+
query_params.push(format!("maxMessageSizeBytes={}", max_size));
319
+
}
320
+
321
+
// Add cursor if specified
322
+
if let Some(cursor) = self.config.cursor {
323
+
query_params.push(format!("cursor={}", cursor));
324
+
}
325
+
326
+
let query_string = query_params.join("&");
327
+
let ws_url = Uri::from_str(&format!(
328
+
"wss://{}/subscribe?{}",
329
+
self.config.jetstream_hostname, query_string
330
+
))?;
331
+
332
+
tracing::info!(url = %ws_url, "Connecting to Jetstream");
333
+
334
+
let (mut client, _) = ClientBuilder::from_uri(ws_url)
335
+
.add_header(
336
+
http::header::USER_AGENT,
337
+
http::HeaderValue::from_str(&self.config.user_agent)?,
338
+
)?
339
+
.connect()
340
+
.await?;
341
+
342
+
let update = SubscriberSourcedMessage::Update {
343
+
wanted_collections: self.config.collections.clone(),
344
+
wanted_dids: self.config.dids.clone(),
345
+
max_message_size_bytes: self
346
+
.config
347
+
.max_message_size_bytes
348
+
.unwrap_or(MAX_MESSAGE_SIZE as u64),
349
+
cursor: self.config.cursor,
350
+
};
351
+
let serialized_update = serde_json::to_string(&update)
352
+
.map_err(|err| ConsumerError::UpdateSerializationFailed(err.to_string()))?;
353
+
354
+
client
355
+
.send(Message::text(serialized_update))
356
+
.await
357
+
.map_err(|err| ConsumerError::UpdateSendFailed(err.to_string()))?;
358
+
359
+
let mut decompressor = if self.config.compression {
360
+
// mkdir -p data/ && curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
361
+
let data: Vec<u8> = std::fs::read(self.config.zstd_dictionary_location.clone())?;
362
+
zstd::bulk::Decompressor::with_dictionary(&data)
363
+
.map_err(|err| ConsumerError::DecompressorCreationFailed(err.to_string()))?
364
+
} else {
365
+
zstd::bulk::Decompressor::new()
366
+
.map_err(|err| ConsumerError::DecompressorCreationFailed(err.to_string()))?
367
+
};
368
+
369
+
let interval = std::time::Duration::from_secs(120);
370
+
let sleeper = sleep(interval);
371
+
tokio::pin!(sleeper);
372
+
373
+
loop {
374
+
tokio::select! {
375
+
() = cancellation_token.cancelled() => {
376
+
break;
377
+
},
378
+
() = &mut sleeper => {
379
+
// consumer_control_insert(&self.pool, &self.config.jetstream_hostname, time_usec).await?;
380
+
381
+
sleeper.as_mut().reset(Instant::now() + interval);
382
+
},
383
+
item = client.next() => {
384
+
if item.is_none() {
385
+
tracing::warn!("jetstream connection closed");
386
+
break;
387
+
}
388
+
let item = item.unwrap();
389
+
390
+
if let Err(err) = item {
391
+
tracing::error!(error = ?err, "error processing jetstream message");
392
+
continue;
393
+
}
394
+
let item = item.unwrap();
395
+
396
+
let event = if self.config.compression {
397
+
if !item.is_binary() {
398
+
tracing::debug!("compression enabled but message from jetstream is not binary");
399
+
continue;
400
+
}
401
+
let payload = item.into_payload();
402
+
403
+
let decoded = decompressor.decompress(&payload, MAX_MESSAGE_SIZE * 3);
404
+
if let Err(err) = decoded {
405
+
tracing::debug!(err = ?err, "cannot decompress message");
406
+
continue;
407
+
}
408
+
let decoded = decoded.unwrap();
409
+
serde_json::from_slice::<JetstreamEvent>(&decoded)
410
+
.map_err(|err| ConsumerError::DeserializationFailed(err.to_string()))
411
+
} else {
412
+
if !item.is_text() {
413
+
tracing::debug!("compression disabled but message from jetstream is binary");
414
+
continue;
415
+
}
416
+
item.as_text()
417
+
.ok_or_else(|| ConsumerError::MessageConversionFailed("cannot convert message to text".to_string()))
418
+
.and_then(|value| {
419
+
serde_json::from_str::<JetstreamEvent>(value)
420
+
.map_err(|err| ConsumerError::DeserializationFailed(err.to_string()))
421
+
})
422
+
};
423
+
if let Err(err) = event {
424
+
tracing::error!(error = ?err, "error processing jetstream message");
425
+
426
+
continue;
427
+
}
428
+
let event = event.unwrap();
429
+
430
+
if let Err(err) = self.dispatch_to_handlers(event).await {
431
+
tracing::error!(error = ?err, "Failed to process message");
432
+
}
433
+
434
+
}
435
+
}
436
+
}
437
+
438
+
// Clean up
439
+
{
440
+
let mut sender_guard = self.event_sender.write().await;
441
+
*sender_guard = None;
442
+
}
443
+
444
+
tracing::info!("Consumer background task finished");
445
+
Ok(())
446
+
}
447
+
448
+
/// Dispatch event to all registered handlers
449
+
async fn dispatch_to_handlers(&self, event: JetstreamEvent) -> Result<()> {
450
+
let handlers = self.handlers.read().await;
451
+
452
+
for (handler_id, handler) in handlers.iter() {
453
+
let handler_span = tracing::debug_span!("handler_dispatch", handler_id = %handler_id);
454
+
async {
455
+
if let Err(err) = handler.handle_event(event.clone()).await {
456
+
tracing::error!(
457
+
error = ?err,
458
+
handler_id = %handler_id,
459
+
"Handler failed to process event"
460
+
);
461
+
}
462
+
}
463
+
.instrument(handler_span)
464
+
.await;
465
+
}
466
+
467
+
Ok(())
468
+
}
469
+
}
470
+
471
+
/// Example event handler implementation
472
+
pub struct LoggingHandler {
473
+
id: String,
474
+
}
475
+
476
+
impl LoggingHandler {
477
+
/// Create a new logging handler with the specified ID
478
+
pub fn new(id: String) -> Self {
479
+
Self { id }
480
+
}
481
+
}
482
+
483
+
#[async_trait]
484
+
impl EventHandler for LoggingHandler {
485
+
async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
486
+
tracing::info!(
487
+
handler_id = %self.id,
488
+
event = ?event,
489
+
"Processing event"
490
+
);
491
+
Ok(())
492
+
}
493
+
494
+
fn handler_id(&self) -> String {
495
+
self.id.clone()
496
+
}
497
+
}
+24
crates/atproto-jetstream/src/lib.rs
+24
crates/atproto-jetstream/src/lib.rs
···
1
+
//! AT Protocol Jetstream event consumer library.
2
+
//!
3
+
//! Provides async stream consumption and event handling for AT Protocol Jetstream
4
+
//! with support for WebSocket connections, event dispatching, and handler registration.
5
+
//!
6
+
//! ## Key Features
7
+
//!
8
+
//! - **Async Stream Consumer**: High-performance WebSocket-based event consumption
9
+
//! - **Event Handler Registration**: Flexible event handler system with multiple handlers
10
+
//! - **Compression Support**: Optional Zstandard compression with dictionary support
11
+
//! - **Graceful Shutdown**: Cancellation token support for clean shutdown
12
+
//! - **Error Handling**: Comprehensive error types following project conventions
13
+
14
+
#![warn(missing_docs)]
15
+
16
+
pub mod consumer;
17
+
18
+
pub use consumer::{
19
+
Consumer, ConsumerError, ConsumerTaskConfig, EventHandler, JetstreamEvent,
20
+
JetstreamEventCommit, JetstreamEventDelete, LoggingHandler,
21
+
};
22
+
23
+
// Re-export CancellationToken for convenience
24
+
pub use tokio_util::sync::CancellationToken;
+1
-1
crates/atproto-oauth-axum/Cargo.toml
+1
-1
crates/atproto-oauth-axum/Cargo.toml
+1
-1
crates/atproto-oauth-axum/README.md
+1
-1
crates/atproto-oauth-axum/README.md
+28
crates/atproto-oauth-axum/src/bin/atproto-oauth-tool.rs
+28
crates/atproto-oauth-axum/src/bin/atproto-oauth-tool.rs
···
1
+
//! # AT Protocol OAuth CLI Tool
2
+
//!
3
+
//! Command-line tool for managing AT Protocol OAuth authentication flows.
4
+
//! Provides functionality to initiate OAuth login flows and refresh access tokens
5
+
//! for AT Protocol services.
6
+
//!
7
+
//! ## Commands
8
+
//!
9
+
//! - `login <private_signing_key> <subject>`: Start OAuth login flow
10
+
//! - `refresh <private_signing_key> <subject> <private_dpop_key> <refresh_token>`: Refresh OAuth tokens
11
+
//!
12
+
//! ## Features
13
+
//!
14
+
//! - Complete OAuth 2.0 authorization code flow with PKCE
15
+
//! - DPoP (Demonstration of Proof-of-Possession) token support
16
+
//! - AT Protocol identity resolution and DID document management
17
+
//! - OAuth server endpoints for client metadata and callback handling
18
+
//! - Support for both `did:plc` and `did:web` identity methods
19
+
//!
20
+
//! ## Environment Variables
21
+
//!
22
+
//! - `EXTERNAL_BASE`: External hostname for OAuth endpoints (required)
23
+
//! - `PORT`: HTTP server port (default: 8080)
24
+
//! - `PLC_HOSTNAME`: PLC directory hostname (default: plc.directory)
25
+
//! - `USER_AGENT`: HTTP User-Agent header (auto-generated)
26
+
//! - `DNS_NAMESERVERS`: Custom DNS nameservers (optional)
27
+
//! - `CERTIFICATE_BUNDLES`: Additional CA certificates (optional)
28
+
1
29
use anyhow::Result;
2
30
use async_trait::async_trait;
3
31
use atproto_identity::{
+1
-1
crates/atproto-oauth/Cargo.toml
+1
-1
crates/atproto-oauth/Cargo.toml
···
1
1
[package]
2
2
name = "atproto-oauth"
3
-
version = "0.5.0"
3
+
version = "0.6.0"
4
4
description = "OAuth workflow implementation for AT Protocol - PKCE, DPoP, and secure authentication flows"
5
5
readme = "README.md"
6
6
homepage = "https://tangled.sh/@smokesignal.events/atproto-identity-rs"
+1
-1
crates/atproto-oauth/README.md
+1
-1
crates/atproto-oauth/README.md
+1
-1
crates/atproto-record/Cargo.toml
+1
-1
crates/atproto-record/Cargo.toml
···
1
1
[package]
2
2
name = "atproto-record"
3
-
version = "0.5.0"
3
+
version = "0.6.0"
4
4
description = "AT Protocol record signature operations - cryptographic signing and verification for AT Protocol records"
5
5
readme = "README.md"
6
6
homepage = "https://tangled.sh/@smokesignal.events/atproto-identity-rs"
+2
-1
crates/atproto-xrpcs-helloworld/Cargo.toml
+2
-1
crates/atproto-xrpcs-helloworld/Cargo.toml
···
1
1
[package]
2
2
name = "atproto-xrpcs-helloworld"
3
-
version = "0.5.0"
3
+
version = "0.6.0"
4
+
description = "Complete example implementation of an AT Protocol XRPC service with DID web functionality and JWT authentication"
4
5
edition.workspace = true
5
6
rust-version.workspace = true
6
7
repository.workspace = true
+23
crates/atproto-xrpcs-helloworld/src/main.rs
+23
crates/atproto-xrpcs-helloworld/src/main.rs
···
1
+
//! # AT Protocol XRPC Hello World Service
2
+
//!
3
+
//! A demonstration XRPC service implementation showcasing the AT Protocol ecosystem.
4
+
//! This service provides a simple "Hello, World!" endpoint that supports both authenticated
5
+
//! and unauthenticated requests.
6
+
//!
7
+
//! ## Features
8
+
//!
9
+
//! - AT Protocol identity resolution and DID document management
10
+
//! - XRPC service endpoint with optional authentication
11
+
//! - DID:web identity publishing via `.well-known` endpoints
12
+
//! - JWT-based request authentication using AT Protocol standards
13
+
//!
14
+
//! ## Environment Variables
15
+
//!
16
+
//! - `SERVICE_KEY`: Private key for service identity (required)
17
+
//! - `EXTERNAL_BASE`: External hostname for service endpoints (required)
18
+
//! - `PORT`: HTTP server port (default: 8080)
19
+
//! - `PLC_HOSTNAME`: PLC directory hostname (default: plc.directory)
20
+
//! - `USER_AGENT`: HTTP User-Agent header (auto-generated)
21
+
//! - `DNS_NAMESERVERS`: Custom DNS nameservers (optional)
22
+
//! - `CERTIFICATE_BUNDLES`: Additional CA certificates (optional)
23
+
1
24
use anyhow::Result;
2
25
use async_trait::async_trait;
3
26
use atproto_identity::{
+2
-1
crates/atproto-xrpcs/Cargo.toml
+2
-1
crates/atproto-xrpcs/Cargo.toml