1//! Example: Subscribe to a PDS's subscribeRepos endpoint
2//!
3//! This demonstrates consuming the repo event stream directly from a PDS,
4//! which is what a Relay does to ingest updates from PDSes.
5//!
6//! Usage:
7//! cargo run --example subscribe_repos -- atproto.systems
8
9use clap::Parser;
10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
11use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
12use miette::IntoDiagnostic;
13use n0_future::StreamExt;
14use url::Url;
15
// Command-line arguments for this example, parsed with clap's derive API.
// NOTE: clap's derive turns the `///` comments on the fields below into the
// per-argument `--help` text, so treat them as user-facing strings.
#[derive(Parser, Debug)]
#[command(author, version, about = "Subscribe to a PDS's subscribeRepos endpoint")]
struct Args {
    // Required positional argument. `main` passes it through `normalize_url`,
    // which replaces any leading http(s)/ws(s) scheme with `wss://`.
    /// PDS URL (e.g., atproto.systems or https://atproto.systems)
    pds_url: String,

    // Optional (`-c` / `--cursor`). When absent, `main` builds the
    // subscription request without a cursor.
    /// Starting cursor position
    #[arg(short, long)]
    cursor: Option<i64>,
}
26
27fn normalize_url(input: &str) -> Result<Url, url::ParseError> {
28 // Strip any existing scheme
29 let without_scheme = input
30 .trim_start_matches("https://")
31 .trim_start_matches("http://")
32 .trim_start_matches("wss://")
33 .trim_start_matches("ws://");
34
35 // Prepend wss://
36 Url::parse(&format!("wss://{}", without_scheme))
37}
38
39fn print_message(msg: &SubscribeReposMessage) {
40 match msg {
41 SubscribeReposMessage::Commit(commit) => {
42 println!(
43 "Commit | repo={} seq={} time={} rev={} commit={} ops={} prev={}",
44 commit.repo,
45 commit.seq,
46 commit.time,
47 commit.rev,
48 commit.commit,
49 commit.ops.len(),
50 commit.since,
51 );
52 }
53 SubscribeReposMessage::Identity(identity) => {
54 println!(
55 "Identity | did={} seq={} time={} handle={:?}",
56 identity.did, identity.seq, identity.time, identity.handle
57 );
58 }
59 SubscribeReposMessage::Account(account) => {
60 println!(
61 "Account | did={} seq={} time={} active={} status={:?}",
62 account.did, account.seq, account.time, account.active, account.status
63 );
64 }
65 SubscribeReposMessage::Sync(sync) => {
66 println!(
67 "Sync | did={} seq={} time={} rev={} blocks={}b",
68 sync.did,
69 sync.seq,
70 sync.time,
71 sync.rev,
72 sync.blocks.len()
73 );
74 }
75 SubscribeReposMessage::Info(info) => {
76 println!("Info | name={} message={:?}", info.name, info.message);
77 }
78 SubscribeReposMessage::Unknown(data) => {
79 println!("Unknown message: {:?}", data);
80 }
81 }
82}
83
84#[tokio::main]
85async fn main() -> miette::Result<()> {
86 let args = Args::parse();
87
88 let base_url = normalize_url(&args.pds_url).into_diagnostic()?;
89 println!("Connecting to {}", base_url);
90
91 // Create subscription client
92 let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
93
94 // Subscribe with optional cursor
95 let params = if let Some(cursor) = args.cursor {
96 SubscribeRepos::new().cursor(cursor).build()
97 } else {
98 SubscribeRepos::new().build()
99 };
100 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
101
102 println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
103
104 // Set up Ctrl-C handler
105 let (tx, mut rx) = tokio::sync::oneshot::channel();
106 tokio::spawn(async move {
107 tokio::signal::ctrl_c().await.ok();
108 let _ = tx.send(());
109 });
110
111 // Convert to typed message stream
112 let (_sink, mut messages) = stream.into_stream();
113
114 loop {
115 tokio::select! {
116 Some(result) = messages.next() => {
117 match result {
118 Ok(msg) => print_message(&msg),
119 Err(e) => eprintln!("Error: {}", e),
120 }
121 }
122 _ = &mut rx => {
123 println!("\nShutting down...");
124 break;
125 }
126 }
127 }
128
129 Ok(())
130}