A better Rust ATProto crate
1//! Example: Subscribe to a PDS's subscribeRepos endpoint 2//! 3//! This demonstrates consuming the repo event stream directly from a PDS, 4//! which is what a Relay does to ingest updates from PDSes. 5//! 6//! Usage: 7//! cargo run --example subscribe_repos -- atproto.systems 8 9use clap::Parser; 10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 11use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 12use miette::IntoDiagnostic; 13use n0_future::StreamExt; 14use url::Url; 15 16#[derive(Parser, Debug)] 17#[command(author, version, about = "Subscribe to a PDS's subscribeRepos endpoint")] 18struct Args { 19 /// PDS URL (e.g., atproto.systems or https://atproto.systems) 20 pds_url: String, 21 22 /// Starting cursor position 23 #[arg(short, long)] 24 cursor: Option<i64>, 25} 26 27fn normalize_url(input: &str) -> Result<Url, url::ParseError> { 28 // Strip any existing scheme 29 let without_scheme = input 30 .trim_start_matches("https://") 31 .trim_start_matches("http://") 32 .trim_start_matches("wss://") 33 .trim_start_matches("ws://"); 34 35 // Prepend wss:// 36 Url::parse(&format!("wss://{}", without_scheme)) 37} 38 39fn print_message(msg: &SubscribeReposMessage) { 40 match msg { 41 SubscribeReposMessage::Commit(commit) => { 42 println!( 43 "Commit | repo={} seq={} time={} rev={} commit={} ops={} prev={}", 44 commit.repo, 45 commit.seq, 46 commit.time, 47 commit.rev, 48 commit.commit, 49 commit.ops.len(), 50 commit.since, 51 ); 52 } 53 SubscribeReposMessage::Identity(identity) => { 54 println!( 55 "Identity | did={} seq={} time={} handle={:?}", 56 identity.did, identity.seq, identity.time, identity.handle 57 ); 58 } 59 SubscribeReposMessage::Account(account) => { 60 println!( 61 "Account | did={} seq={} time={} active={} status={:?}", 62 account.did, account.seq, account.time, account.active, account.status 63 ); 64 } 65 SubscribeReposMessage::Sync(sync) => { 66 println!( 67 "Sync | did={} seq={} time={} rev={} blocks={}b", 68 sync.did, 69 sync.seq, 70 sync.time, 71 sync.rev, 72 sync.blocks.len() 73 ); 74 } 75 SubscribeReposMessage::Info(info) => { 76 println!("Info | name={} message={:?}", info.name, info.message); 77 } 78 SubscribeReposMessage::Unknown(data) => { 79 println!("Unknown message: {:?}", data); 80 } 81 } 82} 83 84#[tokio::main] 85async fn main() -> miette::Result<()> { 86 let args = Args::parse(); 87 88 let base_url = normalize_url(&args.pds_url).into_diagnostic()?; 89 println!("Connecting to {}", base_url); 90 91 // Create subscription client 92 let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 93 94 // Subscribe with optional cursor 95 let params = if let Some(cursor) = args.cursor { 96 SubscribeRepos::new().cursor(cursor).build() 97 } else { 98 SubscribeRepos::new().build() 99 }; 100 let stream = client.subscribe(&params).await.into_diagnostic()?; 101 102 println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 103 104 // Set up Ctrl-C handler 105 let (tx, mut rx) = tokio::sync::oneshot::channel(); 106 tokio::spawn(async move { 107 tokio::signal::ctrl_c().await.ok(); 108 let _ = tx.send(()); 109 }); 110 111 // Convert to typed message stream 112 let (_sink, mut messages) = stream.into_stream(); 113 114 loop { 115 tokio::select! { 116 Some(result) = messages.next() => { 117 match result { 118 Ok(msg) => print_message(&msg), 119 Err(e) => eprintln!("Error: {}", e), 120 } 121 } 122 _ = &mut rx => { 123 println!("\nShutting down..."); 124 break; 125 } 126 } 127 } 128 129 Ok(()) 130}