1//! Example: Subscribe to a PDS's subscribeRepos endpoint
2//!
3//! This demonstrates consuming the repo event stream directly from a PDS,
4//! which is what a Relay does to ingest updates from PDSes.
5//!
6//! Usage:
7//! cargo run --example subscribe_repos -- atproto.systems
8
9use clap::Parser;
10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
11use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
12use miette::IntoDiagnostic;
13use n0_future::StreamExt;
14use smol_str::ToSmolStr;
15use url::Url;
16
17#[derive(Parser, Debug)]
18#[command(
19 author,
20 version,
21 about = "Subscribe to a PDS's subscribeRepos endpoint"
22)]
23struct Args {
24 /// PDS URL (e.g., atproto.systems or https://atproto.systems)
25 pds_url: String,
26
27 /// Starting cursor position
28 #[arg(short, long)]
29 cursor: Option<i64>,
30}
31
32fn normalize_url(input: &str) -> Result<Url, url::ParseError> {
33 // Strip any existing scheme
34 let without_scheme = input
35 .trim_start_matches("https://")
36 .trim_start_matches("http://")
37 .trim_start_matches("wss://")
38 .trim_start_matches("ws://");
39
40 // Prepend wss://
41 Url::parse(&format!("wss://{}", without_scheme))
42}
43
44fn print_message(msg: &SubscribeReposMessage) {
45 match msg {
46 SubscribeReposMessage::Commit(commit) => {
47 println!(
48 "Commit | repo={} seq={} time={} rev={} commit={} ops={} prev={}",
49 commit.repo,
50 commit.seq,
51 commit.time,
52 commit.rev,
53 commit.commit,
54 commit.ops.len(),
55 commit
56 .since
57 .as_ref()
58 .map(|ts| ts.to_smolstr())
59 .unwrap_or_default(),
60 );
61 }
62 SubscribeReposMessage::Identity(identity) => {
63 println!(
64 "Identity | did={} seq={} time={} handle={:?}",
65 identity.did, identity.seq, identity.time, identity.handle
66 );
67 }
68 SubscribeReposMessage::Account(account) => {
69 println!(
70 "Account | did={} seq={} time={} active={} status={:?}",
71 account.did, account.seq, account.time, account.active, account.status
72 );
73 }
74 SubscribeReposMessage::Sync(sync) => {
75 println!(
76 "Sync | did={} seq={} time={} rev={} blocks={}b",
77 sync.did,
78 sync.seq,
79 sync.time,
80 sync.rev,
81 sync.blocks.len()
82 );
83 }
84 SubscribeReposMessage::Info(info) => {
85 println!("Info | name={} message={:?}", info.name, info.message);
86 }
87 SubscribeReposMessage::Unknown(data) => {
88 println!("Unknown message: {:?}", data);
89 }
90 }
91}
92
93#[tokio::main]
94async fn main() -> miette::Result<()> {
95 let args = Args::parse();
96
97 let base_url = normalize_url(&args.pds_url).into_diagnostic()?;
98 println!("Connecting to {}", base_url);
99
100 // Create subscription client
101 let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
102
103 // Subscribe with optional cursor
104 let params = if let Some(cursor) = args.cursor {
105 SubscribeRepos::new().cursor(cursor).build()
106 } else {
107 SubscribeRepos::new().build()
108 };
109 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
110
111 println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
112
113 // Set up Ctrl-C handler
114 let (tx, mut rx) = tokio::sync::oneshot::channel();
115 tokio::spawn(async move {
116 tokio::signal::ctrl_c().await.ok();
117 let _ = tx.send(());
118 });
119
120 // Convert to typed message stream
121 let (_sink, mut messages) = stream.into_stream();
122
123 loop {
124 tokio::select! {
125 Some(result) = messages.next() => {
126 match result {
127 Ok(msg) => print_message(&msg),
128 Err(e) => eprintln!("Error: {}", e),
129 }
130 }
131 _ = &mut rx => {
132 println!("\nShutting down...");
133 break;
134 }
135 }
136 }
137
138 Ok(())
139}