jetstream/Cargo.toml  (+1 -1)
jetstream/examples/arbitrary_record.rs  (+22 -5)
···
 use clap::Parser;
 use jetstream::{
     events::{
-        commit::CommitEvent,
-        JetstreamEvent::Commit,
+        CommitOp,
+        EventKind,
+        JetstreamEvent,
     },
     DefaultJetstreamEndpoints,
     JetstreamCompression,
···
     let args = Args::parse();

     let dids = args.did.unwrap_or_default();
-    let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
+    let config: JetstreamConfig = JetstreamConfig {
         endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
         wanted_collections: vec![args.nsid.clone()],
         wanted_dids: dids.clone(),
···
     );

     while let Some(event) = receiver.recv().await {
-        if let Commit(CommitEvent::CreateOrUpdate { commit, .. }) = event {
-            println!("got record: {:?}", commit.record);
+        if let JetstreamEvent {
+            kind: EventKind::Commit,
+            commit: Some(commit),
+            ..
+        } = event
+        {
+            if commit.collection != args.nsid {
+                continue;
+            }
+            if !(commit.operation == CommitOp::Create || commit.operation == CommitOp::Update) {
+                continue;
+            }
+            let Some(rec) = commit.record else { continue };
+            println!(
+                "New or updated record! ({})\n{:?}\n",
+                commit.rkey.as_str(),
+                rec.get()
+            );
         }
     }
jetstream/examples/basic.rs  (+20 -31)
···
 use clap::Parser;
 use jetstream::{
     events::{
-        commit::{
-            CommitEvent,
-            CommitType,
-        },
-        JetstreamEvent::Commit,
+        CommitEvent,
+        CommitOp,
+        EventKind,
+        JetstreamEvent,
     },
     DefaultJetstreamEndpoints,
     JetstreamCompression,
···
     /// The DIDs to listen for events on, if not provided we will listen for all DIDs.
     #[arg(short, long)]
     did: Option<Vec<string::Did>>,
-    /// The NSID for the collection to listen for (e.g. `app.bsky.feed.post`).
-    #[arg(short, long)]
-    nsid: string::Nsid,
 }

 #[tokio::main]
···
     let dids = args.did.unwrap_or_default();
     let config = JetstreamConfig {
         endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
-        wanted_collections: vec![args.nsid.clone()],
+        wanted_collections: vec![string::Nsid::new("app.bsky.feed.post".to_string()).unwrap()],
         wanted_dids: dids.clone(),
         compression: JetstreamCompression::Zstd,
         ..Default::default()
···
     let jetstream = JetstreamConnector::new(config)?;
     let mut receiver = jetstream.connect().await?;

-    println!(
-        "Listening for '{}' events on DIDs: {:?}",
-        args.nsid.as_str(),
-        dids
-    );
+    println!("Listening for 'app.bsky.feed.post' events on DIDs: {dids:?}");

     while let Some(event) = receiver.recv().await {
-        if let Commit(commit) = event {
-            match commit {
-                CommitEvent::CreateOrUpdate { info: _, commit }
-                    if commit.info.operation == CommitType::Create =>
-                {
-                    if let AppBskyFeedPost(record) = commit.record {
-                        println!(
-                            "New post created! ({})\n\n'{}'",
-                            commit.info.rkey.as_str(),
-                            record.text
-                        );
-                    }
-                }
-                CommitEvent::Delete { info: _, commit } => {
-                    println!("A post has been deleted. ({})", commit.rkey.as_str());
-                }
-                _ => {}
+        if let JetstreamEvent {
+            kind: EventKind::Commit,
+            commit:
+                Some(CommitEvent {
+                    operation: CommitOp::Create,
+                    rkey,
+                    record: Some(record),
+                    ..
+                }),
+            ..
+        } = event
+        {
+            if let Ok(AppBskyFeedPost(rec)) = serde_json::from_str(record.get()) {
+                println!("New post created! ({})\n{:?}\n", rkey.as_str(), rec.text);
             }
         }
     }
jetstream/src/events/account.rs  (-40, deleted)
···
-use chrono::Utc;
-use serde::Deserialize;
-
-use crate::{
-    events::EventInfo,
-    exports,
-};
-
-/// An event representing a change to an account.
-#[derive(Deserialize, Debug)]
-pub struct AccountEvent {
-    /// Basic metadata included with every event.
-    #[serde(flatten)]
-    pub info: EventInfo,
-    /// Account specific data bundled with this event.
-    pub account: AccountData,
-}
-
-/// Account specific data bundled with an account event.
-#[derive(Deserialize, Debug)]
-pub struct AccountData {
-    /// Whether the account is currently active.
-    pub active: bool,
-    /// The DID of the account.
-    pub did: exports::Did,
-    pub seq: u64,
-    pub time: chrono::DateTime<Utc>,
-    /// If `active` is `false` this will be present to explain why the account is inactive.
-    pub status: Option<AccountStatus>,
-}
-
-/// The possible reasons an account might be listed as inactive.
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "lowercase")]
-pub enum AccountStatus {
-    Deactivated,
-    Deleted,
-    Suspended,
-    TakenDown,
-}
jetstream/src/events/commit.rs  (-55, deleted)
···
-use serde::Deserialize;
-
-use crate::{
-    events::EventInfo,
-    exports,
-};
-
-/// An event representing a repo commit, which can be a `create`, `update`, or `delete` operation.
-#[derive(Deserialize, Debug)]
-#[serde(untagged, rename_all = "snake_case")]
-pub enum CommitEvent<R> {
-    CreateOrUpdate {
-        #[serde(flatten)]
-        info: EventInfo,
-        commit: CommitData<R>,
-    },
-    Delete {
-        #[serde(flatten)]
-        info: EventInfo,
-        commit: CommitInfo,
-    },
-}
-
-/// The type of commit operation that was performed.
-#[derive(Deserialize, Debug, PartialEq)]
-#[serde(rename_all = "snake_case")]
-pub enum CommitType {
-    Create,
-    Update,
-    Delete,
-}
-
-/// Basic commit specific info bundled with every event, also the only data included with a `delete`
-/// operation.
-#[derive(Deserialize, Debug)]
-pub struct CommitInfo {
-    /// The type of commit operation that was performed.
-    pub operation: CommitType,
-    pub rev: String,
-    pub rkey: exports::RecordKey,
-    /// The NSID of the record type that this commit is associated with.
-    pub collection: exports::Nsid,
-}
-
-/// Detailed data bundled with a commit event. This data is only included when the event is
-/// `create` or `update`.
-#[derive(Deserialize, Debug)]
-pub struct CommitData<R> {
-    #[serde(flatten)]
-    pub info: CommitInfo,
-    /// The CID of the record that was operated on.
-    pub cid: exports::Cid,
-    /// The record that was operated on.
-    pub record: R,
-}
jetstream/src/events/identity.rs  (-28, deleted)
···
-use chrono::Utc;
-use serde::Deserialize;
-
-use crate::{
-    events::EventInfo,
-    exports,
-};
-
-/// An event representing a change to an identity.
-#[derive(Deserialize, Debug)]
-pub struct IdentityEvent {
-    /// Basic metadata included with every event.
-    #[serde(flatten)]
-    pub info: EventInfo,
-    /// Identity specific data bundled with this event.
-    pub identity: IdentityData,
-}
-
-/// Identity specific data bundled with an identity event.
-#[derive(Deserialize, Debug)]
-pub struct IdentityData {
-    /// The DID of the identity.
-    pub did: exports::Did,
-    /// The handle associated with the identity.
-    pub handle: Option<exports::Handle>,
-    pub seq: u64,
-    pub time: chrono::DateTime<Utc>,
-}
jetstream/src/events/mod.rs  (+90 -32)
···
-pub mod account;
-pub mod commit;
-pub mod identity;
-
 use std::time::{
     Duration,
     SystemTime,
···
     UNIX_EPOCH,
 };

+use chrono::Utc;
 use serde::Deserialize;
+use serde_json::value::RawValue;

 use crate::exports;

 /// Opaque wrapper for the time_us cursor used by jetstream
-///
-/// Generally, you should use a cursor
 #[derive(Deserialize, Debug, Clone, PartialEq, PartialOrd)]
 pub struct Cursor(u64);

-/// Basic data that is included with every event.
-#[derive(Deserialize, Debug)]
-pub struct EventInfo {
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub struct JetstreamEvent {
+    #[serde(rename = "time_us")]
+    pub cursor: Cursor,
     pub did: exports::Did,
-    pub time_us: Cursor,
     pub kind: EventKind,
+    pub commit: Option<CommitEvent>,
+    pub identity: Option<IdentityEvent>,
+    pub account: Option<AccountEvent>,
 }

-#[derive(Deserialize, Debug)]
-#[serde(untagged)]
-pub enum JetstreamEvent<R> {
-    Commit(commit::CommitEvent<R>),
-    Identity(identity::IdentityEvent),
-    Account(account::AccountEvent),
-}
-
-#[derive(Deserialize, Debug)]
+#[derive(Debug, Deserialize, PartialEq)]
 #[serde(rename_all = "snake_case")]
 pub enum EventKind {
     Commit,
···
     Account,
 }

-impl<R> JetstreamEvent<R> {
-    pub fn cursor(&self) -> Cursor {
-        match self {
-            JetstreamEvent::Commit(commit::CommitEvent::CreateOrUpdate { info, .. }) => {
-                info.time_us.clone()
-            }
-            JetstreamEvent::Commit(commit::CommitEvent::Delete { info, .. }) => {
-                info.time_us.clone()
-            }
-            JetstreamEvent::Identity(e) => e.info.time_us.clone(),
-            JetstreamEvent::Account(e) => e.info.time_us.clone(),
-        }
-    }
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub struct CommitEvent {
+    pub collection: exports::Nsid,
+    pub rkey: exports::RecordKey,
+    pub rev: String,
+    pub operation: CommitOp,
+    pub record: Option<Box<RawValue>>,
+    pub cid: Option<exports::Cid>,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum CommitOp {
+    Create,
+    Update,
+    Delete,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct IdentityEvent {
+    pub did: exports::Did,
+    pub handle: Option<exports::Handle>,
+    pub seq: u64,
+    pub time: chrono::DateTime<Utc>,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct AccountEvent {
+    pub active: bool,
+    pub did: exports::Did,
+    pub seq: u64,
+    pub time: chrono::DateTime<Utc>,
+    pub status: Option<String>,
 }

 impl Cursor {
···
         UNIX_EPOCH + Duration::from_micros(c.0)
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_parse_commit_event() -> anyhow::Result<()> {
+        let json = r#"{
+            "rev":"3llrdsginou2i",
+            "operation":"create",
+            "collection":"app.bsky.feed.post",
+            "rkey":"3llrdsglqdc2s",
+            "cid": "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy",
+            "record": {"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}
+        }"#;
+        let commit: CommitEvent = serde_json::from_str(json)?;
+        assert_eq!(
+            commit.cid.unwrap(),
+            "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()?
+        );
+        assert_eq!(
+            commit.record.unwrap().get(),
+            r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"#
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_parse_whole_event() -> anyhow::Result<()> {
+        let json = r#"{"did":"did:plc:ai3dzf35cth7s3st7n7jsd7r","time_us":1743526687419798,"kind":"commit","commit":{"rev":"3llrdsginou2i","operation":"create","collection":"app.bsky.feed.post","rkey":"3llrdsglqdc2s","record":{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"},"cid":"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"}}"#;
+        let event: JetstreamEvent = serde_json::from_str(json)?;
+        assert_eq!(event.kind, EventKind::Commit);
+        assert!(event.commit.is_some());
+        let commit = event.commit.unwrap();
+        assert_eq!(
+            commit.cid.unwrap(),
+            "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()?
+        );
+        assert_eq!(
+            commit.record.unwrap().get(),
+            r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"#
+        );
+        Ok(())
+    }
+}
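
The flattened event above carries the record as an unparsed JSON slice (Option<Box<RawValue>>), so each consumer decides when, and into what type, to deserialize it. A minimal sketch of that pattern (illustrative only, not part of the diff; MyPost is a hypothetical record type and anyhow is assumed for error handling):

use serde::Deserialize;

// Hypothetical record type; stands in for whatever shape a consumer cares about.
#[derive(Deserialize)]
struct MyPost {
    text: String,
}

fn handle(event: jetstream::events::JetstreamEvent) -> anyhow::Result<()> {
    if let Some(commit) = event.commit {
        if let Some(raw) = commit.record {
            // RawValue keeps the record unparsed during the initial event decode;
            // typed deserialization happens only here, on demand.
            let post: MyPost = serde_json::from_str(raw.get())?;
            println!("{} -> {}", commit.rkey.as_str(), post.text);
        }
    }
    Ok(())
}
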
jetstream/src/lib.rs  (+20 -29)
···
         Cursor as IoCursor,
         Read,
     },
-    marker::PhantomData,
     time::{
         Duration,
         Instant,
     },
 };

-use atrium_api::record::KnownRecord;
 use futures_util::{
     stream::StreamExt,
     SinkExt,
 };
-use serde::de::DeserializeOwned;
 use tokio::{
     net::TcpStream,
     sync::mpsc::{
···
 const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");

 /// A receiver channel for consuming Jetstream events.
-pub type JetstreamReceiver<R> = Receiver<JetstreamEvent<R>>;
+pub type JetstreamReceiver = Receiver<JetstreamEvent>;

 /// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
-type JetstreamSender<R> = Sender<JetstreamEvent<R>>;
+type JetstreamSender = Sender<JetstreamEvent>;

 /// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
 /// receive and consume events. See [JetstreamConnector::connect] for more info.
-pub struct JetstreamConnector<R: DeserializeOwned> {
+pub struct JetstreamConnector {
     /// The configuration for the Jetstream connection.
-    config: JetstreamConfig<R>,
+    config: JetstreamConfig,
 }

 pub enum JetstreamCompression {
···
     }
 }

-pub struct JetstreamConfig<R: DeserializeOwned = KnownRecord> {
+pub struct JetstreamConfig {
     /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
     /// `wss://jetstream1.us-east.bsky.network/subscribe`.
     pub endpoint: String,
···
     /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory
     /// usage while events are buffered.
    pub channel_size: usize,
-    /// Marker for record deserializable type.
-    ///
-    /// See examples/arbitrary_record.rs for an example using serde_json::Value
-    ///
-    /// You can omit this if you construct `JetstreamConfig { a: b, ..Default::default() }.
-    /// If you have to specify it, use `std::marker::PhantomData` with no type parameters.
-    pub record_type: PhantomData<R>,
 }

-impl<R: DeserializeOwned> Default for JetstreamConfig<R> {
+impl Default for JetstreamConfig {
     fn default() -> Self {
         JetstreamConfig {
             endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
···
             omit_user_agent_jetstream_info: false,
             replay_on_reconnect: false,
             channel_size: 4096, // a few seconds of firehose buffer
-            record_type: PhantomData,
         }
     }
 }

-impl<R: DeserializeOwned> JetstreamConfig<R> {
+impl JetstreamConfig {
     /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
     pub fn get_request_builder(
         &self,
···
     }
 }

-impl<R: DeserializeOwned + Send + 'static> JetstreamConnector<R> {
+impl JetstreamConnector {
     /// Create a Jetstream connector with a valid [JetstreamConfig].
     ///
     /// After creation, you can call [connect] to connect to the provided Jetstream instance.
-    pub fn new(config: JetstreamConfig<R>) -> Result<Self, ConfigValidationError> {
+    pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
         // We validate the configuration here so any issues are caught early.
         config.validate()?;
         Ok(JetstreamConnector { config })
···
     ///
     /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
     /// of this receiver are dropped, the connection and task are automatically closed.
-    pub async fn connect(&self) -> Result<JetstreamReceiver<R>, ConnectionError> {
+    pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> {
         self.connect_cursor(None).await
     }

···
     pub async fn connect_cursor(
         &self,
         cursor: Option<Cursor>,
-    ) -> Result<JetstreamReceiver<R>, ConnectionError> {
+    ) -> Result<JetstreamReceiver, ConnectionError> {
         // We validate the config again for good measure. Probably not necessary but it can't hurt.
         self.config
             .validate()
···

 /// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
 /// receivers that are listening for them.
-async fn websocket_task<R: DeserializeOwned>(
+async fn websocket_task(
     dictionary: DecoderDictionary<'_>,
     ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
-    send_channel: JetstreamSender<R>,
+    send_channel: JetstreamSender,
     last_cursor: &mut Option<Cursor>,
 ) -> Result<(), JetstreamEventError> {
     // TODO: Use the write half to allow the user to change configuration settings on the fly.
···
             Some(Ok(message)) => {
                 match message {
                     Message::Text(json) => {
-                        let event: JetstreamEvent<R> = serde_json::from_str(&json)
+                        let event: JetstreamEvent = serde_json::from_str(&json)
                             .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
-                        let event_cursor = event.cursor();
+                        let event_cursor = event.cursor.clone();

                         if let Some(last) = last_cursor {
                             if event_cursor <= *last {
···
                             .read_to_string(&mut json)
                             .map_err(JetstreamEventError::CompressionDecoderError)?;

-                        let event: JetstreamEvent<R> = serde_json::from_str(&json)
-                            .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
-                        let event_cursor = event.cursor();
+                        let event: JetstreamEvent = serde_json::from_str(&json).map_err(|e| {
+                            eprintln!("failed to parse jetstream event: {e:?} {json}");
+                            JetstreamEventError::ReceivedMalformedJSON(e)
+                        })?;
+                        let event_cursor = event.cursor.clone();

                         if let Some(last) = last_cursor {
                             if event_cursor <= *last {
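
With the record generic gone from the connector, config construction and connection are plain non-generic calls. A minimal end-to-end sketch (illustrative only, assuming the crate items shown above; errors handled with expect for brevity):

use jetstream::{JetstreamConfig, JetstreamConnector};

#[tokio::main]
async fn main() {
    // Defaults come from the Default impl above; note there is no longer a
    // `record_type: PhantomData` marker field to fill in.
    let config = JetstreamConfig::default();
    let connector = JetstreamConnector::new(config).expect("config should validate");
    let mut receiver = connector.connect().await.expect("failed to connect to jetstream");
    while let Some(event) = receiver.recv().await {
        // commit / identity / account are Options, discriminated by event.kind.
        println!("event at cursor {:?} from {:?}", event.cursor, event.did);
    }
}
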
ufos/src/consumer.rs  (+58 -53)
···
 use jetstream::{
-    events::{
-        account::AccountEvent,
-        commit::{CommitData, CommitEvent, CommitInfo, CommitType},
-        Cursor, EventInfo, JetstreamEvent,
-    },
+    events::{CommitEvent, CommitOp, Cursor, EventKind, JetstreamEvent},
     exports::Did,
     DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
     JetstreamReceiver,
···

 #[derive(Debug)]
 struct Batcher {
-    jetstream_receiver: JetstreamReceiver<serde_json::Value>,
+    jetstream_receiver: JetstreamReceiver,
     batch_sender: Sender<EventBatch>,
     current_batch: EventBatch,
 }
···
     } else {
         eprintln!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
     }
-    let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
+    let config: JetstreamConfig = JetstreamConfig {
         endpoint,
         compression: if no_compress {
             JetstreamCompression::None
         } else {
             JetstreamCompression::Zstd
         },
-        channel_size: 64, // small because we'd rather buffer events into batches
+        channel_size: 64, // small because we expect to consume events quickly
         ..Default::default()
     };
     let jetstream_receiver = JetstreamConnector::new(config)?
···
 }

 impl Batcher {
-    fn new(
-        jetstream_receiver: JetstreamReceiver<serde_json::Value>,
-        batch_sender: Sender<EventBatch>,
-    ) -> Self {
+    fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<EventBatch>) -> Self {
         Self {
             jetstream_receiver,
             batch_sender,
···
         }
     }

-    async fn handle_event(
-        &mut self,
-        event: JetstreamEvent<serde_json::Value>,
-    ) -> anyhow::Result<()> {
-        let event_cursor = event.cursor();
+    async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> {
+        let event_cursor = event.cursor;

         if let Some(earliest) = &self.current_batch.first_jetstream_cursor {
             if event_cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
···
             self.current_batch.first_jetstream_cursor = Some(event_cursor.clone());
         }

-        match event {
-            JetstreamEvent::Commit(CommitEvent::CreateOrUpdate { commit, info }) => {
-                match commit.info.operation {
-                    CommitType::Create => self.handle_create_record(commit, info).await?,
-                    CommitType::Update => {
-                        self.handle_modify_record(modify_update(commit, info))
-                            .await?
+        match event.kind {
+            EventKind::Commit if event.commit.is_some() => {
+                let commit = event.commit.unwrap();
+                match commit.operation {
+                    CommitOp::Create => {
+                        self.handle_create_record(event.did, commit, event_cursor.clone())
+                            .await?;
                     }
-                    CommitType::Delete => {
-                        panic!("jetstream Commit::CreateOrUpdate had Delete operation type")
+                    CommitOp::Update => {
+                        self.handle_modify_record(modify_update(
+                            event.did,
+                            commit,
+                            event_cursor.clone(),
+                        ))
+                        .await?;
+                    }
+                    CommitOp::Delete => {
+                        self.handle_modify_record(modify_delete(
+                            event.did,
+                            commit,
+                            event_cursor.clone(),
+                        ))
+                        .await?;
                     }
                 }
             }
-            JetstreamEvent::Commit(CommitEvent::Delete { commit, info }) => {
-                self.handle_modify_record(modify_delete(commit, info))
-                    .await?
-            }
-            JetstreamEvent::Account(AccountEvent { info, account }) if !account.active => {
-                self.handle_remove_account(info.did, info.time_us).await?
+            EventKind::Account if event.account.is_some() => {
+                let account = event.account.unwrap();
+                if !account.active {
+                    self.handle_remove_account(account.did, event_cursor.clone())
+                        .await?;
+                }
             }
-            JetstreamEvent::Account(_) => {} // ignore account *activations*
-            JetstreamEvent::Identity(_) => {} // identity events are noops for us
+            _ => {}
         };
         self.current_batch.last_jetstream_cursor = Some(event_cursor.clone());

···

     async fn handle_create_record(
         &mut self,
-        commit: CommitData<serde_json::Value>,
-        info: EventInfo,
+        did: Did,
+        commit: CommitEvent,
+        cursor: Cursor,
     ) -> anyhow::Result<()> {
         if !self
             .current_batch
             .record_creates
-            .contains_key(&commit.info.collection)
+            .contains_key(&commit.collection)
             && self.current_batch.record_creates.len() >= MAX_BATCHED_COLLECTIONS
         {
             self.send_current_batch_now().await?;
         }
+        let record = serde_json::from_str(commit.record.unwrap().get())?;
         let record = CreateRecord {
-            did: info.did,
-            rkey: commit.info.rkey,
-            record: commit.record,
-            cursor: info.time_us,
+            did,
+            rkey: commit.rkey,
+            record,
+            cursor,
         };
         let collection = self
             .current_batch
             .record_creates
-            .entry(commit.info.collection)
+            .entry(commit.collection)
             .or_default();
         collection.total_seen += 1;
         collection.samples.push_front(record);
···
     }
 }

-fn modify_update(commit: CommitData<serde_json::Value>, info: EventInfo) -> ModifyRecord {
+fn modify_update(did: Did, commit: CommitEvent, cursor: Cursor) -> ModifyRecord {
+    let record = serde_json::from_str(commit.record.unwrap().get()).unwrap();
     ModifyRecord::Update(UpdateRecord {
-        did: info.did,
-        collection: commit.info.collection,
-        rkey: commit.info.rkey,
-        record: commit.record,
-        cursor: info.time_us,
+        did,
+        collection: commit.collection,
+        rkey: commit.rkey,
+        record,
+        cursor,
     })
 }

-fn modify_delete(commit_info: CommitInfo, info: EventInfo) -> ModifyRecord {
+fn modify_delete(did: Did, commit: CommitEvent, cursor: Cursor) -> ModifyRecord {
     ModifyRecord::Delete(DeleteRecord {
-        did: info.did,
-        collection: commit_info.collection,
-        rkey: commit_info.rkey,
-        cursor: info.time_us,
+        did,
+        collection: commit.collection,
+        rkey: commit.rkey,
+        cursor,
     })
 }
ufos/src/lib.rs  (+2 -1)
ufos/src/main.rs  (+2 -2)
···
 use clap::Parser;
 use std::path::PathBuf;
-use ufos::{consumer, server, store};
+use ufos::{consumer, server, storage_fjall};

 #[cfg(not(target_env = "msvc"))]
 use tikv_jemallocator::Jemalloc;
···

     let args = Args::parse();
     let (storage, cursor) =
-        store::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?;
+        storage_fjall::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?;

     println!("starting server with storage...");
     let serving = server::serve(storage.clone());
ufos/src/server.rs  (+1 -1)
ufos/src/store.rs → ufos/src/storage_fjall.rs  (+1 -1)
···
         // TODO: see rw_loop: enforce single-thread.
         loop {
             let t_sleep = Instant::now();
-            sleep(Duration::from_secs_f64(0.8)).await; // TODO: minimize during replay
+            sleep(Duration::from_secs_f64(0.08)).await; // TODO: minimize during replay
             let slept_for = t_sleep.elapsed();
             let queue_size = receiver.len();