···1+use clap::Parser;
2+use std::path::PathBuf;
3+use std::time::{Duration, Instant};
4+5+use jetstream::{
6+ events::{commit::CommitEvent, JetstreamEvent::Commit},
7+ DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
8+};
9+10+/// Aggregate links in the at-mosphere
11+#[derive(Parser, Debug)]
12+#[command(version, about, long_about = None)]
13+struct Args {
14+ /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
15+ /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
16+ #[arg(long)]
17+ jetstream: String,
18+ /// Location to store persist data to disk
19+ #[arg(long)]
20+ data: PathBuf,
21+}
22+23+#[tokio::main]
24+async fn main() -> anyhow::Result<()> {
25+ let args = Args::parse();
26+27+ let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
28+ endpoint: DefaultJetstreamEndpoints::endpoint_or_shortcut(&args.jetstream),
29+ compression: JetstreamCompression::Zstd,
30+ ..Default::default()
31+ };
32+33+ let jetstream: JetstreamConnector<serde_json::Value> = JetstreamConnector::new(config)?;
34+ let receiver = jetstream.connect().await?;
35+36+ println!("Jetstream ready");
37+38+ let print_throttle = Duration::from_millis(400);
39+ let mut last = Instant::now();
40+ while let Ok(event) = receiver.recv_async().await {
41+ if let Commit(CommitEvent::Create { commit, .. }) = event {
42+ let now = Instant::now();
43+ let since = now - last;
44+ if since >= print_throttle {
45+ let overshoot = since - print_throttle; // adjust to keep the rate on average
46+ last = now - overshoot;
47+ println!(
48+ "{}: {}",
49+ &*commit.info.collection,
50+ serde_json::to_string(&commit.record)?
51+ );
52+ }
53+ }
54+ }
55+56+ Ok(())
57+}