A better Rust ATProto crate
1//! Example: Subscribe to Jetstream firehose 2//! 3//! Jetstream is a JSON-based alternative to the standard DAG-CBOR firehose. 4//! It streams all public network updates in a simplified format. 5//! 6//! Usage: 7//! cargo run --example subscribe_jetstream 8//! cargo run --example subscribe_jetstream -- jetstream2.us-west.bsky.network 9 10use clap::Parser; 11use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 12use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 13use miette::IntoDiagnostic; 14use n0_future::StreamExt; 15use url::Url; 16 17#[derive(Parser, Debug)] 18#[command(author, version, about = "Subscribe to Jetstream firehose")] 19struct Args { 20 /// Jetstream URL (e.g., jetstream1.us-east.fire.hose.cam) 21 #[arg(default_value = "jetstream1.us-east.fire.hose.cam")] 22 jetstream_url: String, 23} 24 25fn normalize_url(input: &str) -> Result<Url, url::ParseError> { 26 // Strip any existing scheme 27 let without_scheme = input 28 .trim_start_matches("https://") 29 .trim_start_matches("http://") 30 .trim_start_matches("wss://") 31 .trim_start_matches("ws://"); 32 33 // Prepend wss:// and parse 34 Url::parse(&format!("wss://{}", without_scheme)) 35} 36 37fn print_message(msg: &JetstreamMessage) { 38 match msg { 39 JetstreamMessage::Commit { 40 did, 41 time_us, 42 commit, 43 } => { 44 let op = match commit.operation { 45 CommitOperation::Create => "create", 46 CommitOperation::Update => "update", 47 CommitOperation::Delete => "delete", 48 }; 49 println!( 50 "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}", 51 did, time_us, op, commit.collection, commit.rkey, commit.cid 52 ); 53 } 54 JetstreamMessage::Identity { 55 did, 56 time_us, 57 identity, 58 } => { 59 println!( 60 "Identity | did={} time_us={} handle={:?} seq={} time={}", 61 did, time_us, identity.handle, identity.seq, identity.time 62 ); 63 } 64 JetstreamMessage::Account { 65 did, 66 time_us, 67 account, 68 } => { 69 println!( 70 "Account | did={} time_us={} active={} seq={} time={} status={:?}", 71 did, time_us, account.active, account.seq, account.time, account.status 72 ); 73 } 74 } 75} 76 77#[tokio::main] 78async fn main() -> miette::Result<()> { 79 let args = Args::parse(); 80 81 let base_url = normalize_url(&args.jetstream_url).into_diagnostic()?; 82 println!("Connecting to {}", base_url); 83 84 // Create subscription client 85 let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 86 87 // Subscribe with no filters (firehose mode) 88 // Enable compression if zstd feature is available 89 #[cfg(feature = "zstd")] 90 let params = { JetstreamParams::new().compress(true).build() }; 91 92 #[cfg(not(feature = "zstd"))] 93 let params = { JetstreamParams::new().build() }; 94 95 let stream = client.subscribe(&params).await.into_diagnostic()?; 96 97 println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 98 99 // Set up Ctrl-C handler 100 let (tx, mut rx) = tokio::sync::oneshot::channel(); 101 tokio::spawn(async move { 102 tokio::signal::ctrl_c().await.ok(); 103 let _ = tx.send(()); 104 }); 105 106 // Convert to typed message stream 107 let (_sink, mut messages) = stream.into_stream(); 108 109 let mut count = 0u64; 110 111 loop { 112 tokio::select! { 113 Some(result) = messages.next() => { 114 match result { 115 Ok(msg) => { 116 count += 1; 117 print_message(&msg); 118 } 119 Err(e) => eprintln!("Error: {}", e), 120 } 121 } 122 _ = &mut rx => { 123 println!("\nReceived {} messages", count); 124 println!("Shutting down..."); 125 break; 126 } 127 } 128 } 129 130 Ok(()) 131}