Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

upstream getrecord

+19 -57
Cargo.lock
··· 263 263 264 264 [[package]] 265 265 name = "atrium-api" 266 - version = "0.25.3" 267 - source = "git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits#c4364f318d337bbc3e3e3aaf97c9f971e95f5f7e" 268 - dependencies = [ 269 - "atrium-common 0.1.2 (git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits)", 270 - "atrium-xrpc 0.12.3 (git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits)", 271 - "chrono", 272 - "http", 273 - "ipld-core", 274 - "langtag", 275 - "regex", 276 - "serde", 277 - "serde_bytes", 278 - "serde_json", 279 - "thiserror 1.0.69", 280 - "trait-variant", 281 - ] 282 - 283 - [[package]] 284 - name = "atrium-api" 285 266 version = "0.25.4" 286 267 source = "registry+https://github.com/rust-lang/crates.io-index" 287 268 checksum = "46355d3245edc7b3160b2a45fe55d09a6963ebd3eee0252feb6b72fb0eb71463" 288 269 dependencies = [ 289 - "atrium-common 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", 290 - "atrium-xrpc 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", 270 + "atrium-common", 271 + "atrium-xrpc", 291 272 "chrono", 292 273 "http", 293 274 "ipld-core", ··· 317 298 ] 318 299 319 300 [[package]] 320 - name = "atrium-common" 321 - version = "0.1.2" 322 - source = "git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits#c4364f318d337bbc3e3e3aaf97c9f971e95f5f7e" 323 - dependencies = [ 324 - "dashmap", 325 - "lru", 326 - "moka", 327 - "thiserror 1.0.69", 328 - "tokio", 329 - "trait-variant", 330 - "web-time", 331 - ] 332 - 333 - [[package]] 334 301 name = "atrium-identity" 335 302 version = "0.1.5" 336 303 source = "registry+https://github.com/rust-lang/crates.io-index" 337 304 checksum = "c9e2d42bb4dbea038f4f5f45e3af2a89d61a9894a75f06aa550b74a60d2be380" 338 305 dependencies = [ 339 - "atrium-api 0.25.4", 340 - "atrium-common 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", 341 - "atrium-xrpc 
0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", 306 + "atrium-api", 307 + "atrium-common", 308 + "atrium-xrpc", 342 309 "serde", 343 310 "serde_html_form", 344 311 "serde_json", ··· 352 319 source = "registry+https://github.com/rust-lang/crates.io-index" 353 320 checksum = "ca22dc4eaf77fd9bf050b21192ac58cd654a437d28e000ec114ebd93a51d36f5" 354 321 dependencies = [ 355 - "atrium-api 0.25.4", 356 - "atrium-common 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", 322 + "atrium-api", 323 + "atrium-common", 357 324 "atrium-identity", 358 - "atrium-xrpc 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", 325 + "atrium-xrpc", 359 326 "base64 0.22.1", 360 327 "chrono", 361 328 "dashmap", ··· 390 357 ] 391 358 392 359 [[package]] 393 - name = "atrium-xrpc" 394 - version = "0.12.3" 395 - source = "git+https://github.com/uniphil/atrium?branch=fix%2Fnsid-allow-nonleading-name-digits#c4364f318d337bbc3e3e3aaf97c9f971e95f5f7e" 396 - dependencies = [ 397 - "http", 398 - "serde", 399 - "serde_html_form", 400 - "serde_json", 401 - "thiserror 1.0.69", 402 - "trait-variant", 403 - ] 404 - 405 - [[package]] 406 360 name = "auto_enums" 407 361 version = "0.8.7" 408 362 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2634 2588 dependencies = [ 2635 2589 "anyhow", 2636 2590 "async-trait", 2637 - "atrium-api 0.25.3", 2591 + "atrium-api", 2638 2592 "chrono", 2639 2593 "clap", 2640 2594 "futures-util", ··· 4797 4751 name = "slingshot" 4798 4752 version = "0.1.0" 4799 4753 dependencies = [ 4754 + "atrium-api", 4755 + "atrium-common", 4756 + "atrium-identity", 4757 + "atrium-oauth", 4800 4758 "clap", 4801 4759 "ctrlc", 4802 4760 "env_logger", 4803 4761 "foyer", 4762 + "hickory-resolver", 4804 4763 "jetstream", 4805 4764 "log", 4806 4765 "metrics", 4807 4766 "metrics-exporter-prometheus 0.17.2", 4808 4767 "poem", 4809 4768 "poem-openapi", 4769 + "reqwest", 4810 4770 "serde", 4811 4771 "serde_json", 4812 4772 "thiserror 2.0.12", 
4773 + "time", 4813 4774 "tokio", 4814 4775 "tokio-util", 4776 + "url", 4815 4777 ] 4816 4778 4817 4779 [[package]] ··· 5880 5842 name = "who-am-i" 5881 5843 version = "0.1.0" 5882 5844 dependencies = [ 5883 - "atrium-api 0.25.4", 5884 - "atrium-common 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", 5845 + "atrium-api", 5846 + "atrium-common", 5885 5847 "atrium-identity", 5886 5848 "atrium-oauth", 5887 5849 "axum",
+1 -1
jetstream/Cargo.toml
··· 10 10 11 11 [dependencies] 12 12 async-trait = "0.1.83" 13 - atrium-api = { git = "https://github.com/uniphil/atrium", branch = "fix/nsid-allow-nonleading-name-digits", default-features = false, features = [ 13 + atrium-api = { version = "0.25.4", default-features = false, features = [ 14 14 "namespace-appbsky", 15 15 ] } 16 16 tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
+8
slingshot/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 + atrium-api = { version = "0.25.4", default-features = false } 8 + atrium-common = "0.1.2" 9 + atrium-identity = "0.1.5" 10 + atrium-oauth = "0.1.3" 7 11 clap = { version = "4.5.41", features = ["derive"] } 8 12 ctrlc = "3.4.7" 9 13 env_logger = "0.11.8" 10 14 foyer = { version = "0.18.0", features = ["serde"] } 15 + hickory-resolver = "0.25.2" 11 16 jetstream = { path = "../jetstream", features = ["metrics"] } 12 17 log = "0.4.27" 13 18 metrics = "0.24.2" 14 19 metrics-exporter-prometheus = { version = "0.17.1", features = ["http-listener"] } 15 20 poem = "3.1.12" 16 21 poem-openapi = { version = "5.1.16", features = ["scalar"] } 22 + reqwest = { version = "0.12.22", features = ["json"] } 17 23 serde = { version = "1.0.219", features = ["derive"] } 18 24 serde_json = { version = "1.0.141", features = ["raw_value"] } 19 25 thiserror = "2.0.12" 26 + time = { version = "0.3.41", features = ["serde"] } 20 27 tokio = { version = "1.47.0", features = ["full"] } 21 28 tokio-util = "0.7.15" 29 + url = "2.5.4"
+7
slingshot/readme.md
··· 1 + # slingshot: atproto record edge cache 2 + 3 + local dev running: 4 + 5 + ```bash 6 + ulimit -n 4096 && RUST_LOG=info,slingshot=trace cargo run -- --jetstream us-east-1 --cache-dir ./foyer 7 + ```
+49
slingshot/src/error.rs
··· 19 19 } 20 20 21 21 #[derive(Debug, Error)] 22 + pub enum IdentityError { 23 + #[error("whatever: {0}")] 24 + WhateverError(String), 25 + #[error("bad DID: {0}")] 26 + BadDid(&'static str), 27 + #[error("identity types got mixed up: {0}")] 28 + IdentityValTypeMixup(String), 29 + #[error("foyer error: {0}")] 30 + FoyerError(#[from] foyer::Error), 31 + 32 + #[error("failed to resolve: {0}")] 33 + ResolutionFailed(#[from] atrium_identity::Error), 34 + // #[error("identity resolved but no handle found for user")] 35 + // NoHandle, 36 + #[error("found handle {0:?} but it appears invalid: {1}")] 37 + InvalidHandle(String, &'static str), 38 + 39 + #[error("could not convert atrium did doc to partial mini doc: {0}")] 40 + BadDidDoc(String), 41 + 42 + #[error("wrong key for clearing refresh queue: {0}")] 43 + RefreshQueueKeyError(&'static str), 44 + } 45 + 46 + #[derive(Debug, Error)] 22 47 pub enum MainTaskError { 23 48 #[error(transparent)] 24 49 ConsumerTaskError(#[from] ConsumerError), 25 50 #[error(transparent)] 26 51 ServerTaskError(#[from] ServerError), 52 + #[error(transparent)] 53 + IdentityTaskError(#[from] IdentityError), 54 + } 55 + 56 + #[derive(Debug, Error)] 57 + pub enum RecordError { 58 + #[error("identity error: {0}")] 59 + IdentityError(#[from] IdentityError), 60 + #[error("repo could not be validated as either a DID or an atproto handle")] 61 + BadRepo, 62 + #[error("could not get record: {0}")] 63 + NotFound(&'static str), 64 + #[error("could not parse pds url: {0}")] 65 + UrlParseError(#[from] url::ParseError), 66 + #[error("reqwest send failed: {0}")] 67 + SendError(reqwest::Error), 68 + #[error("reqwest raised for status: {0}")] 69 + StatusError(reqwest::Error), 70 + #[error("reqwest failed to parse json: {0}")] 71 + ParseJsonError(reqwest::Error), 72 + #[error("upstream getRecord did not include a CID")] 73 + MissingUpstreamCid, 74 + #[error("upstream CID was not valid: {0}")] 75 + BadUpstreamCid(String), 27 76 }
+2 -2
slingshot/src/firehose_cache.rs
··· 3 3 use std::path::Path; 4 4 5 5 pub async fn firehose_cache( 6 - dir: impl AsRef<Path>, 6 + cache_dir: impl AsRef<Path>, 7 7 ) -> Result<HybridCache<String, CachedRecord>, String> { 8 8 let cache = HybridCacheBuilder::new() 9 9 .with_name("firehose") 10 10 .memory(64 * 2_usize.pow(20)) 11 11 .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 12 12 .storage(Engine::large()) 13 - .with_device_options(DirectFsDeviceOptions::new(dir)) 13 + .with_device_options(DirectFsDeviceOptions::new(cache_dir)) 14 14 .build() 15 15 .await 16 16 .map_err(|e| format!("foyer setup error: {e:?}"))?;
+510
slingshot/src/identity.rs
··· 1 + use hickory_resolver::{ResolveError, TokioResolver}; 2 + use std::collections::{HashSet, VecDeque}; 3 + use std::path::Path; 4 + use std::sync::Arc; 5 + /// for now we're gonna just keep doing more cache 6 + /// 7 + /// plc.directory x foyer, ttl kept with data, refresh deferred to background on fetch 8 + /// 9 + /// things we need: 10 + /// 11 + /// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param 12 + /// 2. DID -> PDS resolution: so we know where to getRecord 13 + /// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this 14 + use std::time::Duration; 15 + use tokio::sync::Mutex; 16 + 17 + use crate::error::IdentityError; 18 + use atrium_api::{ 19 + did_doc::DidDocument, 20 + types::string::{Did, Handle}, 21 + }; 22 + use atrium_common::resolver::Resolver; 23 + use atrium_identity::{ 24 + did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL}, 25 + handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver}, 26 + }; 27 + use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but 28 + use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; 29 + use serde::{Deserialize, Serialize}; 30 + use time::UtcDateTime; 31 + 32 + /// once we have something resolved, don't re-resolve until after this period 33 + const MIN_TTL: Duration = Duration::from_secs(4 * 3600); // probably should have a max ttl 34 + const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 35 + 36 + #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 37 + enum IdentityKey { 38 + Handle(Handle), 39 + Did(Did), 40 + } 41 + 42 + #[derive(Debug, Serialize, Deserialize)] 43 + struct IdentityVal(UtcDateTime, IdentityData); 44 + 45 + #[derive(Debug, Serialize, Deserialize)] 46 + enum IdentityData { 47 + NotFound, 48 + Did(Did), 49 + Doc(PartialMiniDoc), 50 + } 51 + 52 + /// partial representation of a 
com.bad-example.identity mini atproto doc 53 + /// 54 + /// partial because the handle is not verified 55 + #[derive(Debug, Clone, Serialize, Deserialize)] 56 + struct PartialMiniDoc { 57 + /// an atproto handle (**unverified**) 58 + /// 59 + /// the first valid atproto handle from the did doc's aka 60 + unverified_handle: Handle, 61 + /// the did's atproto pds url (TODO: type this?) 62 + /// 63 + /// note: atrium *does* actually parse it into a URI, it just doesn't return 64 + /// that for some reason 65 + pds: String, 66 + /// for now we're just pulling this straight from the did doc 67 + /// 68 + /// would be nice to type and validate it 69 + /// 70 + /// this is the publicKeyMultibase from the did doc. 71 + /// legacy key encoding not supported. 72 + /// `id`, `type`, and `controller` must be checked, but aren't stored. 73 + signing_key: String, 74 + } 75 + 76 + impl TryFrom<DidDocument> for PartialMiniDoc { 77 + type Error = String; 78 + fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> { 79 + // must use the first valid handle 80 + let mut unverified_handle = None; 81 + let Some(ref doc_akas) = did_doc.also_known_as else { 82 + return Err("did doc missing `also_known_as`".to_string()); 83 + }; 84 + for aka in doc_akas { 85 + let Some(maybe_handle) = aka.strip_prefix("at://") else { 86 + continue; 87 + }; 88 + let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else { 89 + continue; 90 + }; 91 + unverified_handle = Some(valid_handle); 92 + break; 93 + } 94 + let Some(unverified_handle) = unverified_handle else { 95 + return Err("no valid atproto handles in `also_known_as`".to_string()); 96 + }; 97 + 98 + // atrium seems to have service endpoint getters 99 + let Some(pds) = did_doc.get_pds_endpoint() else { 100 + return Err("no valid pds service found".to_string()); 101 + }; 102 + 103 + // TODO can't use atrium's get_signing_key() because it fails to check type and controller 104 + // so if we check those and reject it, we might miss a 
later valid key in the array 105 + // (todo is to fix atrium) 106 + // actually: atrium might be flexible for legacy reps. for now we're rejecting legacy rep. 107 + 108 + // must use the first valid signing key 109 + let mut signing_key = None; 110 + let Some(verification_methods) = did_doc.verification_method else { 111 + return Err("no verification methods found".to_string()); 112 + }; 113 + for method in verification_methods { 114 + if method.id != format!("{}#atproto", did_doc.id) { 115 + continue; 116 + } 117 + if method.r#type != "Multikey" { 118 + continue; 119 + } 120 + if method.controller != did_doc.id { 121 + continue; 122 + } 123 + let Some(key) = method.public_key_multibase else { 124 + continue; 125 + }; 126 + signing_key = Some(key); 127 + break; 128 + } 129 + let Some(signing_key) = signing_key else { 130 + return Err("no valid atproto signing key found in verification methods".to_string()); 131 + }; 132 + 133 + Ok(PartialMiniDoc { 134 + unverified_handle, 135 + pds, 136 + signing_key, 137 + }) 138 + } 139 + } 140 + 141 + /// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz) 142 + /// 143 + /// the hashset allows testing for presence of items in the queue. 144 + /// this has absolutely no support for multiple queue consumers. 
145 + #[derive(Debug, Default)] 146 + struct RefreshQueue { 147 + queue: VecDeque<IdentityKey>, 148 + items: HashSet<IdentityKey>, 149 + } 150 + 151 + #[derive(Clone)] 152 + pub struct Identity { 153 + handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>, 154 + did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>, 155 + cache: HybridCache<IdentityKey, IdentityVal>, 156 + /// multi-producer *single consumer* queue 157 + refresh_queue: Arc<Mutex<RefreshQueue>>, 158 + /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 159 + refresher: Arc<Mutex<()>>, 160 + } 161 + 162 + impl Identity { 163 + pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> { 164 + let http_client = Arc::new(DefaultHttpClient::default()); 165 + let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig { 166 + dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(), 167 + http_client: http_client.clone(), 168 + }); 169 + let did_resolver = CommonDidResolver::new(CommonDidResolverConfig { 170 + plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(), 171 + http_client: http_client.clone(), 172 + }); 173 + 174 + let cache = HybridCacheBuilder::new() 175 + .with_name("identity") 176 + .memory(16 * 2_usize.pow(20)) 177 + .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v)) 178 + .storage(Engine::large()) 179 + .with_device_options(DirectFsDeviceOptions::new(cache_dir)) 180 + .build() 181 + .await?; 182 + 183 + Ok(Self { 184 + handle_resolver: Arc::new(handle_resolver), 185 + did_resolver: Arc::new(did_resolver), 186 + cache, 187 + refresh_queue: Default::default(), 188 + refresher: Default::default(), 189 + }) 190 + } 191 + 192 + /// Resolve (and verify!) 
an atproto handle to a DID 193 + /// 194 + /// The result can be stale 195 + /// 196 + /// `None` if the handle can't be found or verification fails 197 + pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> { 198 + let Some(did) = self.handle_to_unverified_did(&handle).await? else { 199 + return Ok(None); 200 + }; 201 + let Some(doc) = self.did_to_partial_mini_doc(&did).await? else { 202 + return Ok(None); 203 + }; 204 + if doc.unverified_handle != handle { 205 + return Ok(None); 206 + } 207 + Ok(Some(did)) 208 + } 209 + 210 + /// Resolve (and verify!) a DID to a pds url 211 + /// 212 + /// This *also* incidentally resolves and verifies the handle, which might 213 + /// make it slower than expected 214 + pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> { 215 + let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else { 216 + return Ok(None); 217 + }; 218 + Ok(Some(mini_doc.pds)) 219 + } 220 + 221 + /// Resolve (and cache but **not verify**) a handle to a DID 222 + async fn handle_to_unverified_did( 223 + &self, 224 + handle: &Handle, 225 + ) -> Result<Option<Did>, IdentityError> { 226 + let key = IdentityKey::Handle(handle.clone()); 227 + let entry = self 228 + .cache 229 + .fetch(key.clone(), { 230 + let handle = handle.clone(); 231 + let resolver = self.handle_resolver.clone(); 232 + || async move { 233 + match resolver.resolve(&handle).await { 234 + Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))), 235 + Err(atrium_identity::Error::NotFound) => { 236 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)) 237 + } 238 + Err(other) => Err(foyer::Error::Other(Box::new( 239 + IdentityError::ResolutionFailed(other), 240 + ))), 241 + } 242 + } 243 + }) 244 + .await?; 245 + 246 + let now = UtcDateTime::now(); 247 + let IdentityVal(last_fetch, data) = entry.value(); 248 + match data { 249 + IdentityData::Doc(_) => { 250 + log::error!("identity value mixup: got a 
doc from a handle key (should be a did)"); 251 + Err(IdentityError::IdentityValTypeMixup(handle.to_string())) 252 + } 253 + IdentityData::NotFound => { 254 + if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 255 + self.queue_refresh(key).await; 256 + } 257 + Ok(None) 258 + } 259 + IdentityData::Did(did) => { 260 + if (now - *last_fetch) >= MIN_TTL { 261 + self.queue_refresh(key).await; 262 + } 263 + Ok(Some(did.clone())) 264 + } 265 + } 266 + } 267 + 268 + /// Fetch (and cache) a partial mini doc from a did 269 + async fn did_to_partial_mini_doc( 270 + &self, 271 + did: &Did, 272 + ) -> Result<Option<PartialMiniDoc>, IdentityError> { 273 + let key = IdentityKey::Did(did.clone()); 274 + let entry = self 275 + .cache 276 + .fetch(key.clone(), { 277 + let did = did.clone(); 278 + let resolver = self.did_resolver.clone(); 279 + || async move { 280 + match resolver.resolve(&did).await { 281 + Ok(did_doc) => { 282 + // TODO: fix in atrium: should verify id is did 283 + if did_doc.id != did.to_string() { 284 + return Err(foyer::Error::other(Box::new( 285 + IdentityError::BadDidDoc( 286 + "did doc's id did not match did".to_string(), 287 + ), 288 + ))); 289 + } 290 + let mini_doc = did_doc.try_into().map_err(|e| { 291 + foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e))) 292 + })?; 293 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc))) 294 + } 295 + Err(atrium_identity::Error::NotFound) => { 296 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)) 297 + } 298 + Err(other) => Err(foyer::Error::Other(Box::new( 299 + IdentityError::ResolutionFailed(other), 300 + ))), 301 + } 302 + } 303 + }) 304 + .await?; 305 + 306 + let now = UtcDateTime::now(); 307 + let IdentityVal(last_fetch, data) = entry.value(); 308 + match data { 309 + IdentityData::Did(_) => { 310 + log::error!("identity value mixup: got a did from a did key (should be a doc)"); 311 + Err(IdentityError::IdentityValTypeMixup(did.to_string())) 312 + } 313 + IdentityData::NotFound => { 314 + 
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 315 + self.queue_refresh(key).await; 316 + } 317 + Ok(None) 318 + } 319 + IdentityData::Doc(mini_did) => { 320 + if (now - *last_fetch) >= MIN_TTL { 321 + self.queue_refresh(key).await; 322 + } 323 + Ok(Some(mini_did.clone())) 324 + } 325 + } 326 + } 327 + 328 + /// put a refresh task on the queue 329 + /// 330 + /// this can be safely called from multiple concurrent tasks 331 + async fn queue_refresh(&self, key: IdentityKey) { 332 + // todo: max queue size 333 + let mut q = self.refresh_queue.lock().await; 334 + if !q.items.contains(&key) { 335 + q.items.insert(key.clone()); 336 + q.queue.push_back(key); 337 + } 338 + } 339 + 340 + /// find out what's next in the queue. concurrent consumers are not allowed. 341 + /// 342 + /// intent is to leave the item in the queue while refreshing, so that a 343 + /// producer will not re-add it if it's in progress. there's definitely 344 + /// better ways to do this, but this is ~simple for as far as a single 345 + /// consumer can take us. 346 + /// 347 + /// we could take it from the queue but leave it in the set and remove from 348 + /// set later, but splitting them apart feels more bug-prone. 349 + async fn peek_refresh(&self) -> Option<IdentityKey> { 350 + let q = self.refresh_queue.lock().await; 351 + q.queue.front().cloned() 352 + } 353 + 354 + /// call to clear the latest key from the refresh queue. concurrent consumers not allowed. 
355 + /// 356 + /// must provide the last peeked refresh queue item as a small safety check 357 + async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> { 358 + let mut q = self.refresh_queue.lock().await; 359 + 360 + let Some(queue_key) = q.queue.pop_front() else { 361 + // gone from queue + since we're in an error condition, make sure it's not stuck in items 362 + // (not toctou because we have the lock) 363 + // bolder here than below and removing from items because if the queue is *empty*, then we 364 + // know it hasn't been re-added since losing sync. 365 + if q.items.remove(key) { 366 + log::error!("identity refresh: queue de-sync: key was in items but not in queue"); 367 + } else { 368 + log::warn!( 369 + "identity refresh: tried to complete with wrong key. are multiple queue consumers running?" 370 + ); 371 + } 372 + return Err(IdentityError::RefreshQueueKeyError("no key in queue")); 373 + }; 374 + 375 + if queue_key != *key { 376 + // extra weird case here, what's the most defensive behaviour? 377 + // we have two keys: ours should have been first but isn't. this shouldn't happen, so let's 378 + // just leave items alone for it. risks unbounded growth but we're in a bad place already. 379 + // the other key is the one we just popped. we didn't want it, so maybe we should put it 380 + // back, BUT if we somehow ended up with concurrent consumers, we have bigger problems. take 381 + // responsibility for taking it instead: remove it from items as well, and just drop it. 382 + // 383 + // hope that whoever calls us takes this error seriously. 384 + if q.items.remove(&queue_key) { 385 + log::warn!( 386 + "identity refresh: queue de-sync + dropping a bystander key without refreshing it!" 387 + ); 388 + } else { 389 + // you thought things couldn't get weirder? 
(i mean hopefully they can't) 390 + log::error!("identity refresh: queue de-sync + bystander key also de-sync!?"); 391 + } 392 + return Err(IdentityError::RefreshQueueKeyError( 393 + "wrong key at front of queue", 394 + )); 395 + } 396 + 397 + if q.items.remove(key) { 398 + Ok(()) 399 + } else { 400 + log::error!("identity refresh: queue de-sync: key not in items"); 401 + Err(IdentityError::RefreshQueueKeyError("key not in items")) 402 + } 403 + } 404 + 405 + /// run the refresh queue consumer 406 + pub async fn run_refresher(&self) -> Result<(), IdentityError> { 407 + let _guard = self 408 + .refresher 409 + .try_lock() 410 + .expect("there to only be one refresher running"); 411 + loop { 412 + let Some(task_key) = self.peek_refresh().await else { 413 + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 414 + continue; 415 + }; 416 + match task_key { 417 + IdentityKey::Handle(ref handle) => { 418 + log::trace!("refreshing handle {handle:?}"); 419 + match self.handle_resolver.resolve(handle).await { 420 + Ok(did) => { 421 + self.cache.insert( 422 + task_key.clone(), 423 + IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 424 + ); 425 + } 426 + Err(atrium_identity::Error::NotFound) => { 427 + self.cache.insert( 428 + task_key.clone(), 429 + IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 430 + ); 431 + } 432 + Err(err) => { 433 + log::warn!( 434 + "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 435 + ); 436 + } 437 + } 438 + self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 439 + } 440 + IdentityKey::Did(ref did) => { 441 + log::trace!("refreshing did doc: {did:?}"); 442 + 443 + match self.did_resolver.resolve(did).await { 444 + Ok(did_doc) => { 445 + // TODO: fix in atrium: should verify id is did 446 + if did_doc.id != did.to_string() { 447 + log::warn!( 448 + "refreshed did doc failed: wrong did doc id. dropping refresh." 
449 + ); 450 + continue; 451 + } 452 + let mini_doc = match did_doc.try_into() { 453 + Ok(md) => md, 454 + Err(e) => { 455 + log::warn!( 456 + "converting mini doc failed: {e:?}. dropping refresh." 457 + ); 458 + continue; 459 + } 460 + }; 461 + self.cache.insert( 462 + task_key.clone(), 463 + IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 464 + ); 465 + } 466 + Err(atrium_identity::Error::NotFound) => { 467 + self.cache.insert( 468 + task_key.clone(), 469 + IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 470 + ); 471 + } 472 + Err(err) => { 473 + log::warn!( 474 + "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 475 + ); 476 + } 477 + } 478 + 479 + self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 480 + } 481 + } 482 + } 483 + } 484 + } 485 + 486 + pub struct HickoryDnsTxtResolver(TokioResolver); 487 + 488 + impl HickoryDnsTxtResolver { 489 + fn new() -> Result<Self, ResolveError> { 490 + Ok(Self(TokioResolver::builder_tokio()?.build())) 491 + } 492 + } 493 + 494 + impl DnsTxtResolver for HickoryDnsTxtResolver { 495 + async fn resolve( 496 + &self, 497 + query: &str, 498 + ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> { 499 + match self.0.txt_lookup(query).await { 500 + Ok(r) => { 501 + metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1); 502 + Ok(r.iter().map(|r| r.to_string()).collect()) 503 + } 504 + Err(e) => { 505 + metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1); 506 + Err(e.into()) 507 + } 508 + } 509 + } 510 + }
+3 -1
slingshot/src/lib.rs
··· 1 1 mod consumer; 2 2 pub mod error; 3 3 mod firehose_cache; 4 + mod identity; 4 5 mod record; 5 6 mod server; 6 7 7 8 pub use consumer::consume; 8 9 pub use firehose_cache::firehose_cache; 9 - pub use record::CachedRecord; 10 + pub use identity::Identity; 11 + pub use record::{CachedRecord, Repo}; 10 12 pub use server::serve;
+34 -3
slingshot/src/main.rs
··· 1 1 // use foyer::HybridCache; 2 2 // use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3 3 use metrics_exporter_prometheus::PrometheusBuilder; 4 - use slingshot::{consume, error::MainTaskError, firehose_cache, serve}; 4 + use slingshot::{Identity, Repo, consume, error::MainTaskError, firehose_cache, serve}; 5 + use std::path::PathBuf; 5 6 6 7 use clap::Parser; 7 8 use tokio_util::sync::CancellationToken; ··· 19 20 /// reduces CPU at the expense of more ingress bandwidth 20 21 #[arg(long, action)] 21 22 jetstream_no_zstd: bool, 23 + /// where to keep disk caches 24 + #[arg(long)] 25 + cache_dir: PathBuf, 22 26 } 23 27 24 28 #[tokio::main] ··· 38 42 log::info!("metrics listening at http://0.0.0.0:8765"); 39 43 } 40 44 45 + std::fs::create_dir_all(&args.cache_dir).map_err(|e| { 46 + format!( 47 + "failed to ensure cache parent dir: {e:?} (dir: {:?})", 48 + args.cache_dir 49 + ) 50 + })?; 51 + let cache_dir = args.cache_dir.canonicalize().map_err(|e| { 52 + format!( 53 + "failed to canonicalize cache_dir: {e:?} (dir: {:?})", 54 + args.cache_dir 55 + ) 56 + })?; 57 + log::info!("cache dir ready at {cache_dir:?}."); 58 + 41 59 log::info!("setting up firehose cache..."); 42 - let cache = firehose_cache("./foyer").await?; 60 + let cache = firehose_cache(cache_dir.join("./firehose")).await?; 43 61 log::info!("firehose cache ready."); 44 62 45 63 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 46 64 65 + log::info!("starting identity service..."); 66 + let identity = Identity::new(cache_dir.join("./identity")) 67 + .await 68 + .map_err(|e| format!("identity setup failed: {e:?}"))?; 69 + log::info!("identity service ready."); 70 + let identity_refresher = identity.clone(); 71 + tasks.spawn(async move { 72 + identity_refresher.run_refresher().await?; 73 + Ok(()) 74 + }); 75 + 76 + let repo = Repo::new(identity); 77 + 47 78 let server_shutdown = shutdown.clone(); 48 79 let server_cache_handle = cache.clone(); 
49 80 tasks.spawn(async move { 50 - serve(server_cache_handle, server_shutdown).await?; 81 + serve(server_cache_handle, repo, server_shutdown).await?; 51 82 Ok(()) 52 83 }); 53 84
+100 -1
slingshot/src/record.rs
··· 1 - use jetstream::exports::Cid; 1 + //! cached record storage 2 + 3 + use crate::{Identity, error::RecordError}; 4 + use atrium_api::types::string::{Cid, Did, Handle}; 5 + use reqwest::Client; 2 6 use serde::{Deserialize, Serialize}; 3 7 use serde_json::value::RawValue; 8 + use std::str::FromStr; 9 + use std::time::Duration; 10 + use url::Url; 4 11 5 12 #[derive(Debug, Serialize, Deserialize)] 6 13 pub struct RawRecord { ··· 34 41 Found(RawRecord), 35 42 Deleted, 36 43 } 44 + 45 + //////// upstream record fetching 46 + 47 + #[derive(Deserialize)] 48 + struct RecordResponseObject { 49 + #[allow(dead_code)] // expect it to be there but we ignore it 50 + uri: String, 51 + /// CID for this exact version of the record 52 + /// 53 + /// this is optional in the spec and that's potentially TODO for slingshot 54 + cid: Option<String>, 55 + /// the record itself as JSON 56 + value: Box<RawValue>, 57 + } 58 + 59 + #[derive(Clone)] 60 + pub struct Repo { 61 + identity: Identity, 62 + client: Client, 63 + } 64 + 65 + impl Repo { 66 + pub fn new(identity: Identity) -> Self { 67 + let client = Client::builder() 68 + .user_agent(format!( 69 + "microcosm slingshot v{} (dev: @bad-example.com)", 70 + env!("CARGO_PKG_VERSION") 71 + )) 72 + .no_proxy() 73 + .timeout(Duration::from_secs(10)) 74 + .build() 75 + .unwrap(); 76 + Repo { identity, client } 77 + } 78 + 79 + pub async fn get_record( 80 + &self, 81 + did_or_handle: String, 82 + collection: String, 83 + rkey: String, 84 + cid: Option<String>, 85 + ) -> Result<CachedRecord, RecordError> { 86 + let did = match Did::new(did_or_handle.clone()) { 87 + Ok(did) => did, 88 + Err(_) => { 89 + let handle = Handle::new(did_or_handle).map_err(|_| RecordError::BadRepo)?; 90 + let Some(did) = self.identity.handle_to_did(handle).await? else { 91 + return Err(RecordError::NotFound("could not resolve and verify handle")); 92 + }; 93 + did 94 + } 95 + }; 96 + let Some(pds) = self.identity.did_to_pds(did.clone()).await? 
else { 97 + return Err(RecordError::NotFound("could not get pds for DID")); 98 + }; 99 + 100 + // TODO: throttle by host probably, generally guard against outgoing requests 101 + 102 + let mut params = vec![ 103 + ("repo", did.to_string()), 104 + ("collection", collection), 105 + ("rkey", rkey), 106 + ]; 107 + if let Some(cid) = cid { 108 + params.push(("cid", cid)); 109 + } 110 + let mut url = Url::parse_with_params(&pds, &params)?; 111 + url.set_path("/xrpc/com.atproto.repo.getRecord"); 112 + 113 + let res = self 114 + .client 115 + .get(url) 116 + .send() 117 + .await 118 + .map_err(RecordError::SendError)? 119 + .error_for_status() 120 + .map_err(RecordError::StatusError)? // TODO atproto error handling (think about handling not found) 121 + .json::<RecordResponseObject>() 122 + .await 123 + .map_err(RecordError::ParseJsonError)?; // todo... 124 + 125 + let Some(cid) = res.cid else { 126 + return Err(RecordError::MissingUpstreamCid); 127 + }; 128 + let cid = Cid::from_str(&cid).map_err(|e| RecordError::BadUpstreamCid(e.to_string()))?; 129 + 130 + Ok(CachedRecord::Found(RawRecord { 131 + cid, 132 + record: res.value.to_string(), 133 + })) 134 + } 135 + }
+38 -11
slingshot/src/server.rs
··· 1 - use crate::{CachedRecord, error::ServerError}; 1 + use crate::{CachedRecord, Repo, error::ServerError}; 2 2 use foyer::HybridCache; 3 + use std::sync::Arc; 3 4 use tokio_util::sync::CancellationToken; 4 5 5 6 use poem::{Route, Server, listener::TcpListener}; ··· 94 95 95 96 struct Xrpc { 96 97 cache: HybridCache<String, CachedRecord>, 98 + repo: Arc<Repo>, 97 99 } 98 100 99 101 #[OpenApi] ··· 112 114 /// 113 115 /// NOTE: handles should be accepted here but this is still TODO in slingshot 114 116 #[oai(example = "example_did")] 115 - repo: Query<String>, 117 + Query(repo): Query<String>, 116 118 /// The NSID of the record collection 117 119 #[oai(example = "example_collection")] 118 - collection: Query<String>, 120 + Query(collection): Query<String>, 119 121 /// The Record key 120 122 #[oai(example = "example_rkey")] 121 - rkey: Query<String>, 123 + Query(rkey): Query<String>, 122 124 /// Optional: the CID of the version of the record. 123 125 /// 124 126 /// If not specified, then return the most recent version. ··· 126 128 /// If specified and a newer version of the record exists, returns 404 not 127 129 /// found. That is: slingshot only retains the most recent version of a 128 130 /// record. 
129 - cid: Query<Option<String>>, 131 + Query(cid): Query<Option<String>>, 130 132 ) -> GetRecordResponse { 131 133 // TODO: yeah yeah 132 - let at_uri = format!("at://{}/{}/{}", &*repo, &*collection, &*rkey); 134 + let at_uri = format!("at://{repo}/{collection}/{rkey}"); 133 135 134 136 let entry = self 135 137 .cache 136 - .fetch(at_uri.clone(), || async move { todo!() }) 138 + .fetch(at_uri.clone(), { 139 + let cid = cid.clone(); 140 + let repo_api = self.repo.clone(); 141 + || async move { 142 + repo_api 143 + .get_record(repo, collection, rkey, cid) 144 + .await 145 + .map_err(|e| foyer::Error::Other(Box::new(e))) 146 + } 147 + }) 137 148 .await 138 - .unwrap(); 149 + .unwrap(); // todo 139 150 140 151 // TODO: actual 404 141 152 ··· 165 176 })), 166 177 } 167 178 } 179 + 180 + // TODO 181 + // #[oai(path = "/com.atproto.identity.resolveHandle", method = "get")] 182 + // #[oai(path = "/com.atproto.identity.resolveDid", method = "get")] 183 + // but these are both not specified to do bidirectional validation, which is what we want to offer 184 + // com.atproto.identity.resolveIdentity seems right, but requires returning the full did-doc 185 + // would be nice if there were two queries: 186 + // did -> verified handle + pds url 187 + // handle -> verified did + pds url 188 + // 189 + // we could do horrible things and implement resolveIdentity with only a stripped-down fake did doc 190 + // but this will *definitely* cause problems because eg. 
we're not currently storing pubkeys and 191 + // those are a little bit important 168 192 } 169 193 170 194 pub async fn serve( 171 195 cache: HybridCache<String, CachedRecord>, 196 + repo: Repo, 172 197 _shutdown: CancellationToken, 173 198 ) -> Result<(), ServerError> { 174 - let api_service = OpenApiService::new(Xrpc { cache }, "Slingshot", env!("CARGO_PKG_VERSION")) 175 - .server("http://localhost:3000") 176 - .url_prefix("/xrpc"); 199 + let repo = Arc::new(repo); 200 + let api_service = 201 + OpenApiService::new(Xrpc { cache, repo }, "Slingshot", env!("CARGO_PKG_VERSION")) 202 + .server("http://localhost:3000") 203 + .url_prefix("/xrpc"); 177 204 178 205 let app = Route::new() 179 206 .nest("/", api_service.scalar())