+3
-2
Cargo.lock
+3
-2
Cargo.lock
···
3638
3638
3639
3639
[[package]]
3640
3640
name = "tokio-util"
3641
-
version = "0.7.14"
3641
+
version = "0.7.15"
3642
3642
source = "registry+https://github.com/rust-lang/crates.io-index"
3643
-
checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034"
3643
+
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
3644
3644
dependencies = [
3645
3645
"bytes",
3646
3646
"futures-core",
···
3846
3846
"thiserror 2.0.12",
3847
3847
"tikv-jemallocator",
3848
3848
"tokio",
3849
+
"tokio-util",
3849
3850
]
3850
3851
3851
3852
[[package]]
+74
-81
jetstream/src/lib.rs
+74
-81
jetstream/src/lib.rs
···
363
363
retry_attempt += 1;
364
364
if let Ok((ws_stream, _)) = connect_async(req).await {
365
365
let t_connected = Instant::now();
366
-
log::trace!("jetstream connected. starting websocket task...");
366
+
log::info!("jetstream connected. starting websocket task...");
367
367
if let Err(e) =
368
368
websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor)
369
369
.await
···
374
374
}
375
375
log::error!("Jetstream closed after encountering error: {e:?}");
376
376
} else {
377
-
log::error!("Jetstream connection closed cleanly");
377
+
log::warn!("Jetstream connection closed cleanly");
378
378
}
379
379
if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
380
+
log::warn!("Jetstream: more than {success_threshold_s}s since last reconnect, reconnecting immediately.");
380
381
retry_attempt = 0;
381
382
}
382
383
}
383
384
384
385
if retry_attempt >= max_retries {
385
-
log::error!("hit max retries, bye");
386
+
log::error!("jetstream: hit max retries, bye");
386
387
break;
387
388
}
388
389
···
422
423
let mut closing_connection = false;
423
424
loop {
424
425
match socket_read.next().await {
425
-
Some(Ok(message)) => {
426
-
match message {
427
-
Message::Text(json) => {
428
-
let event: JetstreamEvent = match serde_json::from_str(&json) {
429
-
Ok(ev) => ev,
430
-
Err(e) => {
431
-
log::warn!(
432
-
"failed to parse json: {e:?} (from {})",
433
-
json.get(..24).unwrap_or(&json)
434
-
);
435
-
continue;
436
-
}
437
-
};
438
-
let event_cursor = event.cursor;
426
+
Some(Ok(message)) => match message {
427
+
Message::Text(json) => {
428
+
let event: JetstreamEvent = match serde_json::from_str(&json) {
429
+
Ok(ev) => ev,
430
+
Err(e) => {
431
+
log::warn!(
432
+
"failed to parse json: {e:?} (from {})",
433
+
json.get(..24).unwrap_or(&json)
434
+
);
435
+
continue;
436
+
}
437
+
};
438
+
let event_cursor = event.cursor;
439
439
440
-
if let Some(last) = last_cursor {
441
-
if event_cursor <= *last {
442
-
log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
443
-
continue;
444
-
}
440
+
if let Some(last) = last_cursor {
441
+
if event_cursor <= *last {
442
+
log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
443
+
continue;
445
444
}
445
+
}
446
446
447
-
if send_channel.send(event).await.is_err() {
448
-
// We can assume that all receivers have been dropped, so we can close
449
-
// the connection and exit the task.
450
-
log::info!(
447
+
if send_channel.send(event).await.is_err() {
448
+
log::warn!(
451
449
"All receivers for the Jetstream connection have been dropped, closing connection."
452
450
);
453
-
socket_write.close().await?;
454
-
return Err(JetstreamEventError::ReceiverClosedError);
455
-
} else if let Some(last) = last_cursor.as_mut() {
456
-
*last = event_cursor;
457
-
}
451
+
socket_write.close().await?;
452
+
return Err(JetstreamEventError::ReceiverClosedError);
453
+
} else if let Some(last) = last_cursor.as_mut() {
454
+
*last = event_cursor;
458
455
}
459
-
Message::Binary(zstd_json) => {
460
-
let mut cursor = IoCursor::new(zstd_json);
461
-
let decoder = zstd::stream::Decoder::with_prepared_dictionary(
462
-
&mut cursor,
463
-
&dictionary,
464
-
)
465
-
.map_err(JetstreamEventError::CompressionDictionaryError)?;
456
+
}
457
+
Message::Binary(zstd_json) => {
458
+
let mut cursor = IoCursor::new(zstd_json);
459
+
let decoder =
460
+
zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
461
+
.map_err(JetstreamEventError::CompressionDictionaryError)?;
466
462
467
-
let event: JetstreamEvent = match serde_json::from_reader(decoder) {
468
-
Ok(ev) => ev,
469
-
Err(e) => {
470
-
log::warn!("failed to parse json: {e:?}");
471
-
continue;
472
-
}
473
-
};
474
-
let event_cursor = event.cursor;
463
+
let event: JetstreamEvent = match serde_json::from_reader(decoder) {
464
+
Ok(ev) => ev,
465
+
Err(e) => {
466
+
log::warn!("failed to parse json: {e:?}");
467
+
continue;
468
+
}
469
+
};
470
+
let event_cursor = event.cursor;
475
471
476
-
if let Some(last) = last_cursor {
477
-
if event_cursor <= *last {
478
-
log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
479
-
continue;
480
-
}
472
+
if let Some(last) = last_cursor {
473
+
if event_cursor <= *last {
474
+
log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
475
+
continue;
481
476
}
477
+
}
482
478
483
-
if send_channel.send(event).await.is_err() {
484
-
// We can assume that all receivers have been dropped, so we can close
485
-
// the connection and exit the task.
486
-
log::info!(
479
+
if send_channel.send(event).await.is_err() {
480
+
log::warn!(
487
481
"All receivers for the Jetstream connection have been dropped, closing connection."
488
482
);
489
-
socket_write.close().await?;
490
-
return Err(JetstreamEventError::ReceiverClosedError);
491
-
} else if let Some(last) = last_cursor.as_mut() {
492
-
*last = event_cursor;
493
-
}
483
+
socket_write.close().await?;
484
+
return Err(JetstreamEventError::ReceiverClosedError);
485
+
} else if let Some(last) = last_cursor.as_mut() {
486
+
*last = event_cursor;
494
487
}
495
-
Message::Ping(vec) => {
496
-
log::trace!("Ping recieved, responding");
497
-
socket_write
498
-
.send(Message::Pong(vec))
499
-
.await
500
-
.map_err(JetstreamEventError::PingPongError)?;
501
-
}
502
-
Message::Close(close_frame) => {
503
-
log::trace!("Close recieved. I guess we just log here?");
504
-
if let Some(close_frame) = close_frame {
505
-
let reason = close_frame.reason;
506
-
let code = close_frame.code;
507
-
log::trace!("Connection closed. Reason: {reason}, Code: {code}");
508
-
}
509
-
}
510
-
Message::Pong(pong) => {
511
-
let pong_payload = String::from_utf8(pong.to_vec())
512
-
.unwrap_or("Invalid payload".to_string());
513
-
log::trace!("Pong recieved. Payload: {pong_payload}");
488
+
}
489
+
Message::Ping(vec) => {
490
+
log::trace!("Ping recieved, responding");
491
+
socket_write
492
+
.send(Message::Pong(vec))
493
+
.await
494
+
.map_err(JetstreamEventError::PingPongError)?;
495
+
}
496
+
Message::Close(close_frame) => {
497
+
log::trace!("Close recieved. I guess we just log here?");
498
+
if let Some(close_frame) = close_frame {
499
+
let reason = close_frame.reason;
500
+
let code = close_frame.code;
501
+
log::trace!("Connection closed. Reason: {reason}, Code: {code}");
514
502
}
515
-
Message::Frame(_) => (),
503
+
}
504
+
Message::Pong(pong) => {
505
+
let pong_payload =
506
+
String::from_utf8(pong.to_vec()).unwrap_or("Invalid payload".to_string());
507
+
log::trace!("Pong recieved. Payload: {pong_payload}");
516
508
}
517
-
}
509
+
Message::Frame(_) => (),
510
+
},
518
511
Some(Err(error)) => {
519
512
log::error!("Web socket error: {error}");
520
513
closing_connection = true;
+1
ufos/Cargo.toml
+1
ufos/Cargo.toml
+3
-8
ufos/src/consumer.rs
+3
-8
ufos/src/consumer.rs
···
68
68
let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret);
69
69
tokio::task::spawn(async move {
70
70
let r = batcher.run().await;
71
-
log::info!("batcher ended: {r:?}");
71
+
log::warn!("batcher ended: {r:?}");
72
72
});
73
73
Ok(batch_reciever)
74
74
}
···
93
93
pub async fn run(&mut self) -> anyhow::Result<()> {
94
94
// TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish
95
95
loop {
96
-
match timeout(
97
-
Duration::from_millis(30_000),
98
-
self.jetstream_receiver.recv(),
99
-
)
100
-
.await
101
-
{
96
+
match timeout(Duration::from_secs_f64(30.), self.jetstream_receiver.recv()).await {
102
97
Err(_elapsed) => self.no_events_step().await?,
103
98
Ok(Some(event)) => self.handle_event(event).await?,
104
99
Ok(None) => anyhow::bail!("channel closed"),
···
198
193
Some(Ok(t)) => format!("{:?}", t),
199
194
Some(Err(e)) => format!("+{:?}", e.duration()),
200
195
};
201
-
log::trace!(
196
+
log::info!(
202
197
"sending batch now from {beginning}, {}, queue capacity: {}, referrer: {referrer}",
203
198
if small { "small" } else { "full" },
204
199
self.batch_sender.capacity(),
+2
ufos/src/error.rs
+2
ufos/src/error.rs
+2
-2
ufos/src/main.rs
+2
-2
ufos/src/main.rs
···
100
100
let rolling = write_store
101
101
.background_tasks(args.reroll)?
102
102
.run(args.backfill);
103
-
let storing = write_store.receive_batches(batches);
103
+
let consuming = write_store.receive_batches(batches);
104
104
105
105
let stating = do_update_stuff(read_store);
106
106
107
107
tokio::select! {
108
108
z = serving => log::warn!("serve task ended: {z:?}"),
109
109
z = rolling => log::warn!("rollup task ended: {z:?}"),
110
-
z = storing => log::warn!("storage task ended: {z:?}"),
110
+
z = consuming => log::warn!("consuming task ended: {z:?}"),
111
111
z = stating => log::warn!("status task ended: {z:?}"),
112
112
};
113
113
+36
-13
ufos/src/storage.rs
+36
-13
ufos/src/storage.rs
···
7
7
use jetstream::exports::{Did, Nsid};
8
8
use std::collections::{HashMap, HashSet};
9
9
use std::path::Path;
10
+
use std::time::{Duration, SystemTime};
10
11
use tokio::sync::mpsc::Receiver;
12
+
use tokio_util::sync::CancellationToken;
11
13
12
14
pub type StorageResult<T> = Result<T, StorageError>;
13
15
···
22
24
Self: Sized;
23
25
}
24
26
25
-
pub trait StoreWriter<B: StoreBackground>: Send + Sync
27
+
#[async_trait]
28
+
pub trait StoreWriter<B: StoreBackground>: Clone + Send + Sync
26
29
where
27
30
Self: 'static,
28
31
{
29
32
fn background_tasks(&mut self, reroll: bool) -> StorageResult<B>;
30
33
31
-
fn receive_batches<const LIMIT: usize>(
32
-
mut self,
34
+
async fn receive_batches<const LIMIT: usize>(
35
+
self,
33
36
mut batches: Receiver<EventBatch<LIMIT>>,
34
-
) -> impl std::future::Future<Output = StorageResult<()>> + Send
35
-
where
36
-
Self: Sized,
37
-
{
38
-
async {
39
-
tokio::task::spawn_blocking(move || {
40
-
while let Some(event_batch) = batches.blocking_recv() {
41
-
self.insert_batch(event_batch)?;
37
+
) -> StorageResult<()> {
38
+
while let Some(event_batch) = batches.recv().await {
39
+
let token = CancellationToken::new();
40
+
let cancelled = token.clone();
41
+
tokio::spawn(async move {
42
+
let started = SystemTime::now();
43
+
let mut concerned = false;
44
+
loop {
45
+
tokio::select! {
46
+
_ = tokio::time::sleep(Duration::from_secs_f64(1.)) => {
47
+
log::warn!("taking a long time to insert an event batch ({:?})...", started.elapsed());
48
+
concerned = true;
49
+
}
50
+
_ = cancelled.cancelled() => {
51
+
if concerned {
52
+
log::warn!("finally inserted slow event batch after {:?}", started.elapsed());
53
+
}
54
+
break
55
+
}
56
+
}
42
57
}
43
-
Ok::<(), StorageError>(())
58
+
});
59
+
tokio::task::spawn_blocking({
60
+
let mut me = self.clone();
61
+
move || {
62
+
let _guard = token.drop_guard();
63
+
me.insert_batch(event_batch)
64
+
}
44
65
})
45
-
.await?
66
+
.await??;
46
67
}
68
+
69
+
Err(StorageError::BatchSenderExited)
47
70
}
48
71
49
72
fn insert_batch<const LIMIT: usize>(