A better Rust ATProto crate
at main 4.2 kB view raw
1//! Example: Subscribe to a PDS's subscribeRepos endpoint 2//! 3//! This demonstrates consuming the repo event stream directly from a PDS, 4//! which is what a Relay does to ingest updates from PDSes. 5//! 6//! Usage: 7//! cargo run --example subscribe_repos -- atproto.systems 8 9use clap::Parser; 10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 11use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 12use miette::IntoDiagnostic; 13use n0_future::StreamExt; 14use smol_str::ToSmolStr; 15use url::Url; 16 17#[derive(Parser, Debug)] 18#[command( 19 author, 20 version, 21 about = "Subscribe to a PDS's subscribeRepos endpoint" 22)] 23struct Args { 24 /// PDS URL (e.g., atproto.systems or https://atproto.systems) 25 pds_url: String, 26 27 /// Starting cursor position 28 #[arg(short, long)] 29 cursor: Option<i64>, 30} 31 32fn normalize_url(input: &str) -> Result<Url, url::ParseError> { 33 // Strip any existing scheme 34 let without_scheme = input 35 .trim_start_matches("https://") 36 .trim_start_matches("http://") 37 .trim_start_matches("wss://") 38 .trim_start_matches("ws://"); 39 40 // Prepend wss:// 41 Url::parse(&format!("wss://{}", without_scheme)) 42} 43 44fn print_message(msg: &SubscribeReposMessage) { 45 match msg { 46 SubscribeReposMessage::Commit(commit) => { 47 println!( 48 "Commit | repo={} seq={} time={} rev={} commit={} ops={} prev={}", 49 commit.repo, 50 commit.seq, 51 commit.time, 52 commit.rev, 53 commit.commit, 54 commit.ops.len(), 55 commit 56 .since 57 .as_ref() 58 .map(|ts| ts.to_smolstr()) 59 .unwrap_or_default(), 60 ); 61 } 62 SubscribeReposMessage::Identity(identity) => { 63 println!( 64 "Identity | did={} seq={} time={} handle={:?}", 65 identity.did, identity.seq, identity.time, identity.handle 66 ); 67 } 68 SubscribeReposMessage::Account(account) => { 69 println!( 70 "Account | did={} seq={} time={} active={} status={:?}", 71 account.did, account.seq, account.time, account.active, account.status 72 ); 73 } 74 SubscribeReposMessage::Sync(sync) => { 75 println!( 76 "Sync | did={} seq={} time={} rev={} blocks={}b", 77 sync.did, 78 sync.seq, 79 sync.time, 80 sync.rev, 81 sync.blocks.len() 82 ); 83 } 84 SubscribeReposMessage::Info(info) => { 85 println!("Info | name={} message={:?}", info.name, info.message); 86 } 87 SubscribeReposMessage::Unknown(data) => { 88 println!("Unknown message: {:?}", data); 89 } 90 } 91} 92 93#[tokio::main] 94async fn main() -> miette::Result<()> { 95 let args = Args::parse(); 96 97 let base_url = normalize_url(&args.pds_url).into_diagnostic()?; 98 println!("Connecting to {}", base_url); 99 100 // Create subscription client 101 let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 102 103 // Subscribe with optional cursor 104 let params = if let Some(cursor) = args.cursor { 105 SubscribeRepos::new().cursor(cursor).build() 106 } else { 107 SubscribeRepos::new().build() 108 }; 109 let stream = client.subscribe(&params).await.into_diagnostic()?; 110 111 println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 112 113 // Set up Ctrl-C handler 114 let (tx, mut rx) = tokio::sync::oneshot::channel(); 115 tokio::spawn(async move { 116 tokio::signal::ctrl_c().await.ok(); 117 let _ = tx.send(()); 118 }); 119 120 // Convert to typed message stream 121 let (_sink, mut messages) = stream.into_stream(); 122 123 loop { 124 tokio::select! { 125 Some(result) = messages.next() => { 126 match result { 127 Ok(msg) => print_message(&msg), 128 Err(e) => eprintln!("Error: {}", e), 129 } 130 } 131 _ = &mut rx => { 132 println!("\nShutting down..."); 133 break; 134 } 135 } 136 } 137 138 Ok(()) 139}