A better Rust ATProto crate
1//! Jetstream subscription support
2//!
3//! Jetstream is a simplified JSON-based alternative to the atproto firehose.
4//! Unlike subscribeRepos which uses DAG-CBOR, Jetstream uses JSON encoding.
5
6use crate::types::cid::Cid;
7use crate::types::nsid::Nsid;
8use crate::types::string::{Datetime, Did, Handle, Rkey};
9use crate::xrpc::{MessageEncoding, SubscriptionResp, XrpcSubscription};
10use crate::{CowStr, Data, IntoStatic, RawData};
11use alloc::vec::Vec;
12use serde::{Deserialize, Serialize};
13
14/// Parameters for subscribing to Jetstream
15#[cfg_attr(feature = "std", derive(bon::Builder))]
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17#[serde(rename_all = "camelCase")]
18#[cfg_attr(feature = "std", builder(start_fn = new))]
19pub struct JetstreamParams<'a> {
20 /// Filter by collection NSIDs (max 100)
21 #[serde(skip_serializing_if = "Option::is_none")]
22 #[serde(borrow)]
23 #[builder(into)]
24 pub wanted_collections: Option<Vec<Nsid<'a>>>,
25
26 /// Filter by DIDs (max 10,000)
27 #[serde(skip_serializing_if = "Option::is_none")]
28 #[serde(borrow)]
29 #[builder(into)]
30 pub wanted_dids: Option<Vec<Did<'a>>>,
31
32 /// Unix microseconds timestamp to start playback
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub cursor: Option<i64>,
35
36 /// Maximum payload size in bytes
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub max_message_size_bytes: Option<u64>,
39
40 /// Enable zstd compression
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub compress: Option<bool>,
43
44 /// Pause stream until first options update
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub require_hello: Option<bool>,
47}
48
49/// Commit operation type
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "lowercase")]
52pub enum CommitOperation {
53 /// Create a new record
54 Create,
55 /// Update an existing record
56 Update,
57 /// Delete a record
58 Delete,
59}
60
61/// Commit event details (minimal validation)
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub struct RawJetstreamCommit<'a> {
64 /// Revision string
65 #[serde(borrow)]
66 pub rev: CowStr<'a>,
67 /// Operation type
68 pub operation: CommitOperation,
69 /// Collection NSID
70 #[serde(borrow)]
71 pub collection: CowStr<'a>,
72 /// Record key
73 #[serde(borrow)]
74 pub rkey: CowStr<'a>,
75 /// Record data (present for create/update)
76 #[serde(skip_serializing_if = "Option::is_none")]
77 #[serde(borrow)]
78 pub record: Option<RawData<'a>>,
79 /// Content identifier
80 #[serde(skip_serializing_if = "Option::is_none")]
81 #[serde(borrow)]
82 pub cid: Option<CowStr<'a>>,
83}
84
85/// Commit event details (additional validation)
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87pub struct JetstreamCommit<'a> {
88 /// Revision string
89 #[serde(borrow)]
90 pub rev: CowStr<'a>,
91 /// Operation type
92 pub operation: CommitOperation,
93 /// Collection NSID
94 #[serde(borrow)]
95 pub collection: Nsid<'a>,
96 /// Record key
97 #[serde(borrow)]
98 pub rkey: Rkey<'a>,
99 /// Record data (present for create/update)
100 #[serde(skip_serializing_if = "Option::is_none")]
101 #[serde(borrow)]
102 pub record: Option<Data<'a>>,
103 /// Content identifier
104 #[serde(skip_serializing_if = "Option::is_none")]
105 #[serde(borrow)]
106 pub cid: Option<Cid<'a>>,
107}
108
109/// Identity event details
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub struct JetstreamIdentity<'a> {
112 /// DID
113 #[serde(borrow)]
114 pub did: Did<'a>,
115 /// Handle
116 #[serde(skip_serializing_if = "Option::is_none")]
117 #[serde(borrow)]
118 pub handle: Option<Handle<'a>>,
119 /// Sequence number
120 pub seq: i64,
121 /// Timestamp
122 pub time: Datetime,
123}
124
125/// Account event details
126#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
127pub struct JetstreamAccount<'a> {
128 /// Account active status
129 pub active: bool,
130 /// DID
131 #[serde(borrow)]
132 pub did: Did<'a>,
133 /// Sequence number
134 pub seq: i64,
135 /// Timestamp
136 pub time: Datetime,
137 /// Optional status message
138 #[serde(skip_serializing_if = "Option::is_none")]
139 #[serde(borrow)]
140 pub status: Option<CowStr<'a>>,
141}
142
143/// Jetstream event message
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145#[serde(tag = "kind")]
146#[serde(rename_all = "lowercase")]
147pub enum JetstreamMessage<'a> {
148 /// Commit event
149 Commit {
150 /// DID
151 #[serde(borrow)]
152 did: Did<'a>,
153 /// Unix microseconds timestamp
154 time_us: i64,
155 /// Commit details
156 #[serde(borrow)]
157 commit: JetstreamCommit<'a>,
158 },
159 /// Identity event
160 Identity {
161 /// DID
162 #[serde(borrow)]
163 did: Did<'a>,
164 /// Unix microseconds timestamp
165 time_us: i64,
166 /// Identity details
167 #[serde(borrow)]
168 identity: JetstreamIdentity<'a>,
169 },
170 /// Account event
171 Account {
172 /// DID
173 #[serde(borrow)]
174 did: Did<'a>,
175 /// Unix microseconds timestamp
176 time_us: i64,
177 /// Account details
178 #[serde(borrow)]
179 account: JetstreamAccount<'a>,
180 },
181}
182
183/// Jetstream event message
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185#[serde(tag = "kind")]
186#[serde(rename_all = "lowercase")]
187pub enum RawJetstreamMessage<'a> {
188 /// Commit event
189 Commit {
190 /// DID
191 #[serde(borrow)]
192 did: Did<'a>,
193 /// Unix microseconds timestamp
194 time_us: i64,
195 /// Commit details
196 #[serde(borrow)]
197 commit: RawJetstreamCommit<'a>,
198 },
199 /// Identity event
200 Identity {
201 /// DID
202 #[serde(borrow)]
203 did: Did<'a>,
204 /// Unix microseconds timestamp
205 time_us: i64,
206 /// Identity details
207 #[serde(borrow)]
208 identity: JetstreamIdentity<'a>,
209 },
210 /// Account event
211 Account {
212 /// DID
213 #[serde(borrow)]
214 did: Did<'a>,
215 /// Unix microseconds timestamp
216 time_us: i64,
217 /// Account details
218 #[serde(borrow)]
219 account: JetstreamAccount<'a>,
220 },
221 /// Unknown messsage type
222 #[serde(untagged)]
223 Unknown(RawData<'a>),
224}
225
226impl IntoStatic for CommitOperation {
227 type Output = CommitOperation;
228
229 fn into_static(self) -> Self::Output {
230 self
231 }
232}
233
234impl IntoStatic for JetstreamCommit<'_> {
235 type Output = JetstreamCommit<'static>;
236
237 fn into_static(self) -> Self::Output {
238 JetstreamCommit {
239 rev: self.rev.into_static(),
240 operation: self.operation,
241 collection: self.collection.into_static(),
242 rkey: self.rkey.into_static(),
243 record: self.record.map(|r| r.into_static()),
244 cid: self.cid.map(|c| c.into_static()),
245 }
246 }
247}
248
249impl IntoStatic for RawJetstreamCommit<'_> {
250 type Output = RawJetstreamCommit<'static>;
251
252 fn into_static(self) -> Self::Output {
253 RawJetstreamCommit {
254 rev: self.rev.into_static(),
255 operation: self.operation,
256 collection: self.collection.into_static(),
257 rkey: self.rkey.into_static(),
258 record: self.record.map(|r| r.into_static()),
259 cid: self.cid.map(|c| c.into_static()),
260 }
261 }
262}
263
264impl IntoStatic for JetstreamIdentity<'_> {
265 type Output = JetstreamIdentity<'static>;
266
267 fn into_static(self) -> Self::Output {
268 JetstreamIdentity {
269 did: self.did.into_static(),
270 handle: self.handle.map(|h| h.into_static()),
271 seq: self.seq,
272 time: self.time,
273 }
274 }
275}
276
277impl IntoStatic for JetstreamAccount<'_> {
278 type Output = JetstreamAccount<'static>;
279
280 fn into_static(self) -> Self::Output {
281 JetstreamAccount {
282 active: self.active,
283 did: self.did.into_static(),
284 seq: self.seq,
285 time: self.time,
286 status: self.status.map(|s| s.into_static()),
287 }
288 }
289}
290
291impl IntoStatic for JetstreamMessage<'_> {
292 type Output = JetstreamMessage<'static>;
293
294 fn into_static(self) -> Self::Output {
295 match self {
296 JetstreamMessage::Commit {
297 did,
298 time_us,
299 commit,
300 } => JetstreamMessage::Commit {
301 did: did.into_static(),
302 time_us,
303 commit: commit.into_static(),
304 },
305 JetstreamMessage::Identity {
306 did,
307 time_us,
308 identity,
309 } => JetstreamMessage::Identity {
310 did: did.into_static(),
311 time_us,
312 identity: identity.into_static(),
313 },
314 JetstreamMessage::Account {
315 did,
316 time_us,
317 account,
318 } => JetstreamMessage::Account {
319 did: did.into_static(),
320 time_us,
321 account: account.into_static(),
322 },
323 }
324 }
325}
326
327impl IntoStatic for RawJetstreamMessage<'_> {
328 type Output = RawJetstreamMessage<'static>;
329
330 fn into_static(self) -> Self::Output {
331 match self {
332 RawJetstreamMessage::Commit {
333 did,
334 time_us,
335 commit,
336 } => RawJetstreamMessage::Commit {
337 did: did.into_static(),
338 time_us,
339 commit: commit.into_static(),
340 },
341 RawJetstreamMessage::Identity {
342 did,
343 time_us,
344 identity,
345 } => RawJetstreamMessage::Identity {
346 did: did.into_static(),
347 time_us,
348 identity: identity.into_static(),
349 },
350 RawJetstreamMessage::Account {
351 did,
352 time_us,
353 account,
354 } => RawJetstreamMessage::Account {
355 did: did.into_static(),
356 time_us,
357 account: account.into_static(),
358 },
359 RawJetstreamMessage::Unknown(data) => RawJetstreamMessage::Unknown(data.into_static()),
360 }
361 }
362}
363
364/// Stream response type for Jetstream subscriptions
365pub struct JetstreamStream;
366
367impl SubscriptionResp for JetstreamStream {
368 const NSID: &'static str = "jetstream";
369 const ENCODING: MessageEncoding = MessageEncoding::Json;
370
371 /// Typed Jetstream message
372 type Message<'de> = JetstreamMessage<'de>;
373
374 /// Generic error type
375 type Error<'de> = crate::xrpc::GenericError<'de>;
376}
377
378impl<'a> XrpcSubscription for JetstreamParams<'a> {
379 const NSID: &'static str = "jetstream";
380 const ENCODING: MessageEncoding = MessageEncoding::Json;
381 const CUSTOM_PATH: Option<&'static str> = Some("/subscribe");
382 type Stream = JetstreamStream;
383}
384
385impl IntoStatic for JetstreamParams<'_> {
386 type Output = JetstreamParams<'static>;
387
388 fn into_static(self) -> Self::Output {
389 JetstreamParams {
390 wanted_collections: self
391 .wanted_collections
392 .map(|v| v.into_iter().map(|s| s.into_static()).collect()),
393 wanted_dids: self
394 .wanted_dids
395 .map(|v| v.into_iter().map(|s| s.into_static()).collect()),
396 cursor: self.cursor,
397 max_message_size_bytes: self.max_message_size_bytes,
398 compress: self.compress,
399 require_hello: self.require_hello,
400 }
401 }
402}
403
404/// Parameters for subscribing to Jetstream
405#[cfg_attr(feature = "std", derive(bon::Builder))]
406#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
407#[serde(rename_all = "camelCase")]
408#[cfg_attr(feature = "std", builder(start_fn = new))]
409pub struct RawJetstreamParams<'a> {
410 /// Filter by collection NSIDs (max 100)
411 #[serde(skip_serializing_if = "Option::is_none")]
412 #[serde(borrow)]
413 #[builder(into)]
414 pub wanted_collections: Option<Vec<crate::CowStr<'a>>>,
415
416 /// Filter by DIDs (max 10,000)
417 #[serde(skip_serializing_if = "Option::is_none")]
418 #[serde(borrow)]
419 #[builder(into)]
420 pub wanted_dids: Option<Vec<crate::CowStr<'a>>>,
421
422 /// Unix microseconds timestamp to start playback
423 #[serde(skip_serializing_if = "Option::is_none")]
424 pub cursor: Option<i64>,
425
426 /// Maximum payload size in bytes
427 #[serde(skip_serializing_if = "Option::is_none")]
428 pub max_message_size_bytes: Option<u64>,
429
430 /// Enable zstd compression
431 #[serde(skip_serializing_if = "Option::is_none")]
432 pub compress: Option<bool>,
433
434 /// Pause stream until first options update
435 #[serde(skip_serializing_if = "Option::is_none")]
436 pub require_hello: Option<bool>,
437}
438
439/// Stream response type for Jetstream subscriptions
440pub struct JetstreamRawStream;
441
442impl SubscriptionResp for JetstreamRawStream {
443 const NSID: &'static str = "jetstream";
444 const ENCODING: MessageEncoding = MessageEncoding::Json;
445
446 /// Typed Jetstream message
447 type Message<'de> = RawJetstreamMessage<'de>;
448
449 /// Generic error type
450 type Error<'de> = crate::xrpc::GenericError<'de>;
451}
452
453impl<'a> XrpcSubscription for RawJetstreamParams<'a> {
454 const NSID: &'static str = "jetstream";
455 const ENCODING: MessageEncoding = MessageEncoding::Json;
456 const CUSTOM_PATH: Option<&'static str> = Some("/subscribe");
457 type Stream = JetstreamRawStream;
458}
459
460impl IntoStatic for RawJetstreamParams<'_> {
461 type Output = RawJetstreamParams<'static>;
462
463 fn into_static(self) -> Self::Output {
464 RawJetstreamParams {
465 wanted_collections: self
466 .wanted_collections
467 .map(|v| v.into_iter().map(|s| s.into_static()).collect()),
468 wanted_dids: self
469 .wanted_dids
470 .map(|v| v.into_iter().map(|s| s.into_static()).collect()),
471 cursor: self.cursor,
472 max_message_size_bytes: self.max_message_size_bytes,
473 compress: self.compress,
474 require_hello: self.require_hello,
475 }
476 }
477}