Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
at main 200 lines 7.1 kB view raw
1//! a statusphere indexer: tracks xyz.statusphere.status records across the ATProto network. 2//! 3//! statusphere is a demo app where users set a single-emoji status on their bluesky profile. 4//! this example indexes those status records in real time, maintaining the current status 5//! per user and printing a periodic leaderboard of the top emoji statuses in use. 6//! 7//! see: https://github.com/bluesky-social/statusphere-example-app 8//! 9//! run with: 10//! HYDRANT_DATABASE_PATH=./statusphere.db cargo run --example statusphere 11//! 12//! the database persists records across restarts. on each start the full event 13//! history is replayed from the database to rebuild the in-memory index. 14//! (in a better app, we could for example use the ephemeral mode of hydrant, 15//! and use our db, or we could use hydrant to backfill multiple instances of the app.) 16 17use std::str::FromStr; 18use std::sync::Arc; 19use std::time::Duration; 20 21use chrono::DateTime; 22use futures::StreamExt; 23use hydrant::config::Config; 24use hydrant::control::{EventStream, Hydrant, ReposControl}; 25use hydrant::filter::FilterMode; 26use jacquard_common::types::did::Did; 27use jacquard_common::types::tid::Tid; 28use scc::HashMap; 29 30const COLLECTION: &str = "xyz.statusphere.status"; 31 32struct StatusEntry { 33 emoji: String, 34 created_at: String, 35} 36 37struct StatusIndex { 38 /// current status per DID: only the latest by createdAt is kept. 
39 current: HashMap<String, StatusEntry>, 40} 41 42impl StatusIndex { 43 fn new() -> Self { 44 Self { 45 current: HashMap::new(), 46 } 47 } 48 49 fn set(&self, did: String, emoji: String, created_at: &str) -> bool { 50 let is_newer = self 51 .current 52 .read_sync(&did, |_, e| created_at > e.created_at.as_str()) 53 .unwrap_or(true); 54 if is_newer { 55 self.current.upsert_sync( 56 did, 57 StatusEntry { 58 emoji, 59 created_at: created_at.to_owned(), 60 }, 61 ); 62 } 63 is_newer 64 } 65 66 fn delete(&self, did: &str) { 67 self.current.remove_sync(did); 68 } 69 70 fn top(&self, n: usize) -> Vec<(String, usize)> { 71 use std::collections::HashMap; 72 let mut counts: HashMap<String, usize> = HashMap::with_capacity(self.current.capacity()); 73 self.current.iter_sync(|_, e| { 74 *counts.entry(e.emoji.clone()).or_default() += 1; 75 true 76 }); 77 let mut ranked: Vec<_> = counts.into_iter().collect(); 78 ranked.sort_by(|a, b| b.1.cmp(&a.1)); 79 ranked.truncate(n); 80 ranked 81 } 82} 83 84async fn run_ticker(index: Arc<StatusIndex>) { 85 let mut interval = tokio::time::interval(Duration::from_secs(30)); 86 interval.tick().await; 87 loop { 88 interval.tick().await; 89 let top = index.top(10); 90 if top.is_empty() { 91 continue; 92 } 93 println!( 94 "\n--- top statuses ({} users tracked) ---", 95 index.current.len() 96 ); 97 for (emoji, count) in &top { 98 println!(" {emoji} ×{count}"); 99 } 100 println!("----------------------------------------\n"); 101 } 102} 103 104async fn handle_stream(index: Arc<StatusIndex>, repos: ReposControl, mut stream: EventStream) { 105 // get handle for did through the hydrant api 106 let get_handle = async |did: &Did<'_>| { 107 repos 108 .info(did) 109 .await 110 .ok() 111 .flatten() 112 .and_then(|info| info.handle) 113 .map(|h| h.to_string()) 114 .unwrap_or_else(|| did.to_string()) 115 }; 116 while let Some(event) = stream.next().await { 117 if let Some(rec) = event.record { 118 let did = rec.did.as_str().to_owned(); 119 match 
rec.action.as_str() { 120 "create" | "update" => { 121 let Some(record) = rec.record else { continue }; 122 let Some(emoji) = record 123 .get("status") 124 .and_then(|v| v.as_str()) 125 .map(|s| s.to_owned()) 126 else { 127 continue; 128 }; 129 let created_at = record 130 .get("createdAt") 131 .and_then(|v| v.as_str()) 132 .unwrap_or(""); 133 if index.set(did.clone(), emoji.clone(), created_at) { 134 let name = get_handle(&rec.did).await; 135 println!("[{created_at}] {name}: {emoji}"); 136 } 137 } 138 "delete" => { 139 let name = get_handle(&rec.did).await; 140 index.delete(&did); 141 // parse the tid to use as date since createdAt doesnt make sense here 142 let date = Tid::from_str(&rec.rkey) 143 .ok() 144 .and_then(|tid| DateTime::from_timestamp_micros(tid.timestamp() as i64)) 145 .map(|date| date.to_string()) 146 .unwrap_or_else(|| "invalid rkey".to_string()); 147 println!("[{date}] {name} cleared status"); 148 } 149 _ => {} 150 } 151 } else if let Some(account) = event.account { 152 // when an account is deactivated or deleted, drop their status. 153 if !account.active { 154 index.delete(account.did.as_str()); 155 } 156 } 157 } 158} 159 160#[tokio::main] 161async fn main() -> miette::Result<()> { 162 rustls::crypto::aws_lc_rs::default_provider() 163 .install_default() 164 .ok(); 165 166 tracing_subscriber::fmt() 167 .with_env_filter("hydrant=info") 168 .init(); 169 170 // config is loaded from environment variables (all prefixed with HYDRANT_). 171 // key defaults for this example: 172 // DATABASE_PATH=./hydrant.db | where to store the database. 173 // RELAY_HOST=wss://relay.fire.hose.cam/ | firehose source. 174 // CRAWLER_URLS=https://lightrail.microcosm.blue | crawler sources. in filter mode this defaults to `by-collection`. 175 let cfg = Config::from_env()?; 176 let hydrant = Hydrant::new(cfg).await?; 177 178 // discover only repos that publish xyz.statusphere.status records, 179 // and only store that collection (all other record types are dropped). 
180 hydrant 181 .filter 182 .set_mode(FilterMode::Filter) 183 .set_signals([COLLECTION]) 184 .set_collections([COLLECTION]) 185 .apply() 186 .await?; 187 188 // replay all persisted events from the start to rebuild the in-memory index, 189 // then switch to live tail. since the index is in-memory, we always need the 190 // full replay on startup. 191 let stream = hydrant.subscribe(Some(0)); 192 193 let index = Arc::new(StatusIndex::new()); 194 tokio::select! { 195 // this finally starts hydrant, so it will start crawling and backfilling etc. 196 r = hydrant.run()? => r, 197 _ = run_ticker(index.clone()) => Ok(()), 198 _ = handle_stream(index.clone(), hydrant.repos.clone(), stream) => Ok(()), 199 } 200}