1//! Example: Subscribe to Jetstream firehose
2//!
3//! Jetstream is a JSON-based alternative to the standard DAG-CBOR firehose.
4//! It streams all public network updates in a simplified format.
5//!
6//! Usage:
7//! cargo run --example subscribe_jetstream
8//! cargo run --example subscribe_jetstream -- jetstream2.us-west.bsky.network
9
10use clap::Parser;
11use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams};
12use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
13use miette::IntoDiagnostic;
14use n0_future::StreamExt;
15use url::Url;
16
17#[derive(Parser, Debug)]
18#[command(author, version, about = "Subscribe to Jetstream firehose")]
19struct Args {
20 /// Jetstream URL (e.g., jetstream1.us-east.fire.hose.cam)
21 #[arg(default_value = "jetstream1.us-east.fire.hose.cam")]
22 jetstream_url: String,
23}
24
25fn normalize_url(input: &str) -> Result<Url, url::ParseError> {
26 // Strip any existing scheme
27 let without_scheme = input
28 .trim_start_matches("https://")
29 .trim_start_matches("http://")
30 .trim_start_matches("wss://")
31 .trim_start_matches("ws://");
32
33 // Prepend wss:// and parse
34 Url::parse(&format!("wss://{}", without_scheme))
35}
36
37fn print_message(msg: &JetstreamMessage) {
38 match msg {
39 JetstreamMessage::Commit {
40 did,
41 time_us,
42 commit,
43 } => {
44 let op = match commit.operation {
45 CommitOperation::Create => "create",
46 CommitOperation::Update => "update",
47 CommitOperation::Delete => "delete",
48 };
49 println!(
50 "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}",
51 did, time_us, op, commit.collection, commit.rkey, commit.cid
52 );
53 }
54 JetstreamMessage::Identity {
55 did,
56 time_us,
57 identity,
58 } => {
59 println!(
60 "Identity | did={} time_us={} handle={:?} seq={} time={}",
61 did, time_us, identity.handle, identity.seq, identity.time
62 );
63 }
64 JetstreamMessage::Account {
65 did,
66 time_us,
67 account,
68 } => {
69 println!(
70 "Account | did={} time_us={} active={} seq={} time={} status={:?}",
71 did, time_us, account.active, account.seq, account.time, account.status
72 );
73 }
74 }
75}
76
77#[tokio::main]
78async fn main() -> miette::Result<()> {
79 let args = Args::parse();
80
81 let base_url = normalize_url(&args.jetstream_url).into_diagnostic()?;
82 println!("Connecting to {}", base_url);
83
84 // Create subscription client
85 let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
86
87 // Subscribe with no filters (firehose mode)
88 // Enable compression if zstd feature is available
89 #[cfg(feature = "zstd")]
90 let params = { JetstreamParams::new().compress(true).build() };
91
92 #[cfg(not(feature = "zstd"))]
93 let params = { JetstreamParams::new().build() };
94
95 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
96
97 println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
98
99 // Set up Ctrl-C handler
100 let (tx, mut rx) = tokio::sync::oneshot::channel();
101 tokio::spawn(async move {
102 tokio::signal::ctrl_c().await.ok();
103 let _ = tx.send(());
104 });
105
106 // Convert to typed message stream
107 let (_sink, mut messages) = stream.into_stream();
108
109 let mut count = 0u64;
110
111 loop {
112 tokio::select! {
113 Some(result) = messages.next() => {
114 match result {
115 Ok(msg) => {
116 count += 1;
117 print_message(&msg);
118 }
119 Err(e) => eprintln!("Error: {}", e),
120 }
121 }
122 _ = &mut rx => {
123 println!("\nReceived {} messages", count);
124 println!("Shutting down...");
125 break;
126 }
127 }
128 }
129
130 Ok(())
131}