A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
//! a statusphere indexer: tracks `xyz.statusphere.status` records across the ATProto network.
//!
//! statusphere is a demo app where users set a single-emoji status on their bluesky profile.
//! this example indexes those status records in real time, maintaining the current status
//! per user and printing a periodic leaderboard of the top emoji statuses in use.
//!
//! see: https://github.com/bluesky-social/statusphere-example-app
//!
//! run with:
//!
//! ```sh
//! HYDRANT_DATABASE_PATH=./statusphere.db cargo run --example statusphere
//! ```
//!
//! the database persists records across restarts. on each start, the full event
//! history is replayed from the database to rebuild the in-memory index.
//! (a more complete app could instead run hydrant in ephemeral mode and persist
//! into its own database, or use hydrant to backfill multiple instances of the app.)
16
17use std::str::FromStr;
18use std::sync::Arc;
19use std::time::Duration;
20
21use chrono::DateTime;
22use futures::StreamExt;
23use hydrant::config::Config;
24use hydrant::control::{EventStream, Hydrant, ReposControl};
25use hydrant::filter::FilterMode;
26use jacquard_common::types::did::Did;
27use jacquard_common::types::tid::Tid;
28use scc::HashMap;
29
/// collection name (NSID) of statusphere status records. `main` uses it both as
/// the repo-discovery signal and as the only collection hydrant stores.
const COLLECTION: &str = "xyz.statusphere.status";

/// the current status of one account, taken from its latest status record.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StatusEntry {
    /// the record's `status` string (presumably a single emoji; not validated here).
    emoji: String,
    /// the record's raw `createdAt` string; empty when the record had none.
    created_at: String,
}
36
37struct StatusIndex {
38 /// current status per DID: only the latest by createdAt is kept.
39 current: HashMap<String, StatusEntry>,
40}
41
42impl StatusIndex {
43 fn new() -> Self {
44 Self {
45 current: HashMap::new(),
46 }
47 }
48
49 fn set(&self, did: String, emoji: String, created_at: &str) -> bool {
50 let is_newer = self
51 .current
52 .read_sync(&did, |_, e| created_at > e.created_at.as_str())
53 .unwrap_or(true);
54 if is_newer {
55 self.current.upsert_sync(
56 did,
57 StatusEntry {
58 emoji,
59 created_at: created_at.to_owned(),
60 },
61 );
62 }
63 is_newer
64 }
65
66 fn delete(&self, did: &str) {
67 self.current.remove_sync(did);
68 }
69
70 fn top(&self, n: usize) -> Vec<(String, usize)> {
71 use std::collections::HashMap;
72 let mut counts: HashMap<String, usize> = HashMap::with_capacity(self.current.capacity());
73 self.current.iter_sync(|_, e| {
74 *counts.entry(e.emoji.clone()).or_default() += 1;
75 true
76 });
77 let mut ranked: Vec<_> = counts.into_iter().collect();
78 ranked.sort_by(|a, b| b.1.cmp(&a.1));
79 ranked.truncate(n);
80 ranked
81 }
82}
83
84async fn run_ticker(index: Arc<StatusIndex>) {
85 let mut interval = tokio::time::interval(Duration::from_secs(30));
86 interval.tick().await;
87 loop {
88 interval.tick().await;
89 let top = index.top(10);
90 if top.is_empty() {
91 continue;
92 }
93 println!(
94 "\n--- top statuses ({} users tracked) ---",
95 index.current.len()
96 );
97 for (emoji, count) in &top {
98 println!(" {emoji} ×{count}");
99 }
100 println!("----------------------------------------\n");
101 }
102}
103
104async fn handle_stream(index: Arc<StatusIndex>, repos: ReposControl, mut stream: EventStream) {
105 // get handle for did through the hydrant api
106 let get_handle = async |did: &Did<'_>| {
107 repos
108 .info(did)
109 .await
110 .ok()
111 .flatten()
112 .and_then(|info| info.handle)
113 .map(|h| h.to_string())
114 .unwrap_or_else(|| did.to_string())
115 };
116 while let Some(event) = stream.next().await {
117 if let Some(rec) = event.record {
118 let did = rec.did.as_str().to_owned();
119 match rec.action.as_str() {
120 "create" | "update" => {
121 let Some(record) = rec.record else { continue };
122 let Some(emoji) = record
123 .get("status")
124 .and_then(|v| v.as_str())
125 .map(|s| s.to_owned())
126 else {
127 continue;
128 };
129 let created_at = record
130 .get("createdAt")
131 .and_then(|v| v.as_str())
132 .unwrap_or("");
133 if index.set(did.clone(), emoji.clone(), created_at) {
134 let name = get_handle(&rec.did).await;
135 println!("[{created_at}] {name}: {emoji}");
136 }
137 }
138 "delete" => {
139 let name = get_handle(&rec.did).await;
140 index.delete(&did);
141 // parse the tid to use as date since createdAt doesnt make sense here
142 let date = Tid::from_str(&rec.rkey)
143 .ok()
144 .and_then(|tid| DateTime::from_timestamp_micros(tid.timestamp() as i64))
145 .map(|date| date.to_string())
146 .unwrap_or_else(|| "invalid rkey".to_string());
147 println!("[{date}] {name} cleared status");
148 }
149 _ => {}
150 }
151 } else if let Some(account) = event.account {
152 // when an account is deactivated or deleted, drop their status.
153 if !account.active {
154 index.delete(account.did.as_str());
155 }
156 }
157 }
158}
159
160#[tokio::main]
161async fn main() -> miette::Result<()> {
162 rustls::crypto::aws_lc_rs::default_provider()
163 .install_default()
164 .ok();
165
166 tracing_subscriber::fmt()
167 .with_env_filter("hydrant=info")
168 .init();
169
170 // config is loaded from environment variables (all prefixed with HYDRANT_).
171 // key defaults for this example:
172 // DATABASE_PATH=./hydrant.db | where to store the database.
173 // RELAY_HOST=wss://relay.fire.hose.cam/ | firehose source.
174 // CRAWLER_URLS=https://lightrail.microcosm.blue | crawler sources. in filter mode this defaults to `by-collection`.
175 let cfg = Config::from_env()?;
176 let hydrant = Hydrant::new(cfg).await?;
177
178 // discover only repos that publish xyz.statusphere.status records,
179 // and only store that collection (all other record types are dropped).
180 hydrant
181 .filter
182 .set_mode(FilterMode::Filter)
183 .set_signals([COLLECTION])
184 .set_collections([COLLECTION])
185 .apply()
186 .await?;
187
188 // replay all persisted events from the start to rebuild the in-memory index,
189 // then switch to live tail. since the index is in-memory, we always need the
190 // full replay on startup.
191 let stream = hydrant.subscribe(Some(0));
192
193 let index = Arc::new(StatusIndex::new());
194 tokio::select! {
195 // this finally starts hydrant, so it will start crawling and backfilling etc.
196 r = hydrant.run()? => r,
197 _ = run_ticker(index.clone()) => Ok(()),
198 _ = handle_stream(index.clone(), hydrant.repos.clone(), stream) => Ok(()),
199 }
200}