Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Compare changes

Choose any two refs to compare.

+11 -11
Cargo.lock
··· 992 992 993 993 [[package]] 994 994 name = "clap" 995 - version = "4.5.47" 995 + version = "4.5.48" 996 996 source = "registry+https://github.com/rust-lang/crates.io-index" 997 - checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931" 997 + checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" 998 998 dependencies = [ 999 999 "clap_builder", 1000 1000 "clap_derive", ··· 1002 1002 1003 1003 [[package]] 1004 1004 name = "clap_builder" 1005 - version = "4.5.47" 1005 + version = "4.5.48" 1006 1006 source = "registry+https://github.com/rust-lang/crates.io-index" 1007 - checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6" 1007 + checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" 1008 1008 dependencies = [ 1009 1009 "anstream", 1010 1010 "anstyle", ··· 1375 1375 checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" 1376 1376 dependencies = [ 1377 1377 "data-encoding", 1378 - "syn 1.0.109", 1378 + "syn 2.0.106", 1379 1379 ] 1380 1380 1381 1381 [[package]] ··· 1796 1796 [[package]] 1797 1797 name = "fjall" 1798 1798 version = "2.11.2" 1799 - source = "git+https://github.com/fjall-rs/fjall.git#42d811f7c8cc9004407d520d37d2a1d8d246c03d" 1799 + source = "git+https://github.com/fjall-rs/fjall.git?rev=fb229572bb7d1d6966a596994dc1708e47ec57d8#fb229572bb7d1d6966a596994dc1708e47ec57d8" 1800 1800 dependencies = [ 1801 1801 "byteorder", 1802 1802 "byteview", ··· 3045 3045 checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" 3046 3046 dependencies = [ 3047 3047 "cfg-if", 3048 - "windows-targets 0.48.5", 3048 + "windows-targets 0.52.6", 3049 3049 ] 3050 3050 3051 3051 [[package]] ··· 4539 4539 4540 4540 [[package]] 4541 4541 name = "reqwest" 4542 - version = "0.12.22" 4542 + version = "0.12.23" 4543 4543 source = "registry+https://github.com/rust-lang/crates.io-index" 4544 - checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" 4544 + checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" 4545 4545 dependencies = [ 4546 4546 "async-compression", 4547 4547 "base64 0.22.1", ··· 6049 6049 "clap", 6050 6050 "dropshot", 6051 6051 "env_logger", 6052 - "fjall 2.11.2 (git+https://github.com/fjall-rs/fjall.git)", 6052 + "fjall 2.11.2 (git+https://github.com/fjall-rs/fjall.git?rev=fb229572bb7d1d6966a596994dc1708e47ec57d8)", 6053 6053 "getrandom 0.3.3", 6054 6054 "http", 6055 6055 "jetstream", ··· 6440 6440 source = "registry+https://github.com/rust-lang/crates.io-index" 6441 6441 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" 6442 6442 dependencies = [ 6443 - "windows-sys 0.48.0", 6443 + "windows-sys 0.59.0", 6444 6444 ] 6445 6445 6446 6446 [[package]]
+48 -13
constellation/src/bin/main.rs
··· 26 26 #[arg(long)] 27 27 #[clap(default_value = "0.0.0.0:6789")] 28 28 bind: SocketAddr, 29 + /// optionally disable the metrics server 30 + #[arg(long)] 31 + #[clap(default_value_t = false)] 32 + collect_metrics: bool, 29 33 /// metrics server's listen address 30 34 #[arg(long)] 31 35 #[clap(default_value = "0.0.0.0:8765")] ··· 54 58 /// Saved jsonl from jetstream to use instead of a live subscription 55 59 #[arg(short, long)] 56 60 fixture: Option<PathBuf>, 61 + /// run a scan across the target id table and write all key -> ids to id -> keys 62 + #[arg(long, action)] 63 + repair_target_ids: bool, 57 64 } 58 65 59 66 #[derive(Debug, Clone, ValueEnum)] ··· 89 96 let bind = args.bind; 90 97 let metrics_bind = args.bind_metrics; 91 98 99 + let collect_metrics = args.collect_metrics; 92 100 let stay_alive = CancellationToken::new(); 93 101 94 102 match args.backend { ··· 99 107 stream, 100 108 bind, 101 109 metrics_bind, 110 + collect_metrics, 102 111 stay_alive, 103 112 ), 104 113 #[cfg(feature = "rocks")] ··· 115 124 rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?; 116 125 } 117 126 println!("rocks ready."); 118 - run( 119 - rocks, 120 - fixture, 121 - args.data, 122 - stream, 123 - bind, 124 - metrics_bind, 125 - stay_alive, 126 - ) 127 + std::thread::scope(|s| { 128 + if args.repair_target_ids { 129 + let rocks = rocks.clone(); 130 + let stay_alive = stay_alive.clone(); 131 + s.spawn(move || { 132 + let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive); 133 + eprintln!("repair finished: {rep:?}"); 134 + rep 135 + }); 136 + } 137 + s.spawn(|| { 138 + let r = run( 139 + rocks, 140 + fixture, 141 + args.data, 142 + stream, 143 + bind, 144 + metrics_bind, 145 + collect_metrics, 146 + stay_alive, 147 + ); 148 + eprintln!("run finished: {r:?}"); 149 + r 150 + }); 151 + }); 152 + Ok(()) 127 153 } 128 154 } 129 155 } 130 156 157 + #[allow(clippy::too_many_lines)] 158 + #[allow(clippy::too_many_arguments)] 131 159 fn run( 132 160 mut storage: impl LinkStorage, 133 161 fixture: Option<PathBuf>, ··· 135 163 stream: String, 136 164 bind: SocketAddr, 137 165 metrics_bind: SocketAddr, 166 + collect_metrics: bool, 138 167 stay_alive: CancellationToken, 139 168 ) -> Result<()> { 140 169 ctrlc::set_handler({ ··· 179 208 .build() 180 209 .expect("axum startup") 181 210 .block_on(async { 182 - install_metrics_server(metrics_bind)?; 211 + // Install metrics server only if requested 212 + if collect_metrics { 213 + install_metrics_server(metrics_bind)?; 214 + } 183 215 serve(readable, bind, staying_alive).await 184 216 }) 185 217 .unwrap(); ··· 187 219 } 188 220 }); 189 221 190 - s.spawn(move || { // monitor thread 222 + // only spawn monitoring thread if the metrics server is running 223 + if collect_metrics { 224 + s.spawn(move || { // monitor thread 191 225 let stay_alive = stay_alive.clone(); 192 226 let check_alive = stay_alive.clone(); 193 227 ··· 213 247 214 248 'monitor: loop { 215 249 match readable.get_stats() { 216 - Ok(StorageStats { dids, targetables, linking_records }) => { 250 + Ok(StorageStats { dids, targetables, linking_records, .. }) => { 217 251 metrics::gauge!("storage.stats.dids").set(dids as f64); 218 252 metrics::gauge!("storage.stats.targetables").set(targetables as f64); 219 253 metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); ··· 239 273 } 240 274 } 241 275 stay_alive.drop_guard(); 242 - }); 276 + }); 277 + } 243 278 }); 244 279 245 280 println!("byeeee");
+13 -6
constellation/src/consumer/jetstream.rs
··· 226 226 println!("jetstream closed the websocket cleanly."); 227 227 break; 228 228 } 229 - r => eprintln!("jetstream: close result after error: {r:?}"), 229 + Err(_) => { 230 + counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1); 231 + println!("jetstream failed to close the websocket cleanly."); 232 + break; 233 + } 234 + Ok(r) => { 235 + eprintln!("jetstream: close result after error: {r:?}"); 236 + counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error") 237 + .increment(1); 238 + // if we didn't immediately get ConnectionClosed, we should keep polling read 239 + // until we get it. 240 + continue; 241 + } 230 242 } 231 - counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error") 232 - .increment(1); 233 - // if we didn't immediately get ConnectionClosed, we should keep polling read 234 - // until we get it. 235 - continue; 236 243 } 237 244 }; 238 245
+8 -6
constellation/src/server/filters.rs
··· 5 5 Ok({ 6 6 if let Some(link) = parse_any_link(s) { 7 7 match link { 8 - Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| { 9 - format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}") 10 - }), 11 - Link::Did(did) => Some(format!( 12 - "https://atproto-browser-plus-links.vercel.app/at/{did}" 13 - )), 8 + Link::AtUri(at_uri) => at_uri 9 + .strip_prefix("at://") 10 + .map(|noproto| format!("https://pdsls.dev/at://{noproto}")), 11 + Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")), 14 12 Link::Uri(uri) => Some(uri), 15 13 } 16 14 } else { ··· 22 20 pub fn human_number(n: &u64) -> askama::Result<String> { 23 21 Ok(n.to_formatted_string(&Locale::en)) 24 22 } 23 + 24 + pub fn to_u64(n: usize) -> askama::Result<u64> { 25 + Ok(n as u64) 26 + }
+181 -17
constellation/src/server/mod.rs
··· 14 14 use std::collections::{HashMap, HashSet}; 15 15 use std::time::{Duration, UNIX_EPOCH}; 16 16 use tokio::net::{TcpListener, ToSocketAddrs}; 17 - use tokio::task::block_in_place; 17 + use tokio::task::spawn_blocking; 18 18 use tokio_util::sync::CancellationToken; 19 19 20 20 use crate::storage::{LinkReader, StorageStats}; ··· 25 25 26 26 use acceptable::{acceptable, ExtractAccept}; 27 27 28 - const DEFAULT_CURSOR_LIMIT: u64 = 16; 29 - const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100; 28 + const DEFAULT_CURSOR_LIMIT: u64 = 100; 29 + const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000; 30 30 31 31 fn get_default_cursor_limit() -> u64 { 32 32 DEFAULT_CURSOR_LIMIT 33 33 } 34 34 35 - const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this 35 + fn to500(e: tokio::task::JoinError) -> http::StatusCode { 36 + eprintln!("handler error: {e}"); 37 + http::StatusCode::INTERNAL_SERVER_ERROR 38 + } 36 39 37 40 pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 38 41 where ··· 45 48 "/", 46 49 get({ 47 50 let store = store.clone(); 48 - move |accept| async { block_in_place(|| hello(accept, store)) } 51 + move |accept| async { 52 + spawn_blocking(|| hello(accept, store)) 53 + .await 54 + .map_err(to500)? 55 + } 56 + }), 57 + ) 58 + .route( 59 + "/xrpc/blue.microcosm.links.getManyToManyCounts", 60 + get({ 61 + let store = store.clone(); 62 + move |accept, query| async { 63 + spawn_blocking(|| get_many_to_many_counts(accept, query, store)) 64 + .await 65 + .map_err(to500)? 66 + } 49 67 }), 50 68 ) 51 69 .route( 52 70 "/links/count", 53 71 get({ 54 72 let store = store.clone(); 55 - move |accept, query| async { block_in_place(|| count_links(accept, query, store)) } 73 + move |accept, query| async { 74 + spawn_blocking(|| count_links(accept, query, store)) 75 + .await 76 + .map_err(to500)? 77 + } 56 78 }), 57 79 ) 58 80 .route( ··· 60 82 get({ 61 83 let store = store.clone(); 62 84 move |accept, query| async { 63 - block_in_place(|| count_distinct_dids(accept, query, store)) 85 + spawn_blocking(|| count_distinct_dids(accept, query, store)) 86 + .await 87 + .map_err(to500)? 64 88 } 65 89 }), 66 90 ) ··· 69 93 get({ 70 94 let store = store.clone(); 71 95 move |accept, query| async { 72 - block_in_place(|| get_backlinks(accept, query, store)) 96 + spawn_blocking(|| get_backlinks(accept, query, store)) 97 + .await 98 + .map_err(to500)? 73 99 } 74 100 }), 75 101 ) ··· 77 103 "/links", 78 104 get({ 79 105 let store = store.clone(); 80 - move |accept, query| async { block_in_place(|| get_links(accept, query, store)) } 106 + move |accept, query| async { 107 + spawn_blocking(|| get_links(accept, query, store)) 108 + .await 109 + .map_err(to500)? 110 + } 81 111 }), 82 112 ) 83 113 .route( ··· 85 115 get({ 86 116 let store = store.clone(); 87 117 move |accept, query| async { 88 - block_in_place(|| get_distinct_dids(accept, query, store)) 118 + spawn_blocking(|| get_distinct_dids(accept, query, store)) 119 + .await 120 + .map_err(to500)? 89 121 } 90 122 }), 91 123 ) ··· 95 127 get({ 96 128 let store = store.clone(); 97 129 move |accept, query| async { 98 - block_in_place(|| count_all_links(accept, query, store)) 130 + spawn_blocking(|| count_all_links(accept, query, store)) 131 + .await 132 + .map_err(to500)? 99 133 } 100 134 }), 101 135 ) ··· 104 138 get({ 105 139 let store = store.clone(); 106 140 move |accept, query| async { 107 - block_in_place(|| explore_links(accept, query, store)) 141 + spawn_blocking(|| explore_links(accept, query, store)) 142 + .await 143 + .map_err(to500)? 108 144 } 109 145 }), 110 146 ) ··· 163 199 #[template(path = "hello.html.j2")] 164 200 struct HelloReponse { 165 201 help: &'static str, 166 - days_indexed: u64, 202 + days_indexed: Option<u64>, 167 203 stats: StorageStats, 168 204 } 169 205 fn hello( ··· 173 209 let stats = store 174 210 .get_stats() 175 211 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 176 - let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS)) 177 - .elapsed() 212 + let days_indexed = stats 213 + .started_at 214 + .map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed()) 215 + .transpose() 178 216 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)? 179 - .as_secs() 180 - / 86400; 217 + .map(|d| d.as_secs() / 86_400); 181 218 Ok(acceptable(accept, HelloReponse { 182 219 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.", 183 220 days_indexed, 184 221 stats, 185 222 })) 223 + } 224 + 225 + #[derive(Clone, Deserialize)] 226 + #[serde(rename_all = "camelCase")] 227 + struct GetManyToManyCountsQuery { 228 + subject: String, 229 + source: String, 230 + /// path to the secondary link in the linking record 231 + path_to_other: String, 232 + /// filter to linking records (join of the m2m) by these DIDs 233 + #[serde(default)] 234 + did: Vec<String>, 235 + /// filter to specific secondary records 236 + #[serde(default)] 237 + other_subject: Vec<String>, 238 + cursor: Option<OpaqueApiCursor>, 239 + /// Set the max number of links to return per page of results 240 + #[serde(default = "get_default_cursor_limit")] 241 + limit: u64, 242 + } 243 + #[derive(Serialize)] 244 + struct OtherSubjectCount { 245 + subject: String, 246 + total: u64, 247 + distinct: u64, 248 + } 249 + #[derive(Template, Serialize)] 250 + #[template(path = "get-many-to-many-counts.html.j2")] 251 + struct GetManyToManyCountsResponse { 252 + counts_by_other_subject: Vec<OtherSubjectCount>, 253 + cursor: Option<OpaqueApiCursor>, 254 + #[serde(skip_serializing)] 255 + query: GetManyToManyCountsQuery, 256 + } 257 + fn get_many_to_many_counts( 258 + accept: ExtractAccept, 259 + query: axum_extra::extract::Query<GetManyToManyCountsQuery>, 260 + store: impl LinkReader, 261 + ) -> Result<impl IntoResponse, http::StatusCode> { 262 + let cursor_key = query 263 + .cursor 264 + .clone() 265 + .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 266 + .transpose()? 267 + .map(|c| c.next); 268 + 269 + let limit = query.limit; 270 + if limit > DEFAULT_CURSOR_LIMIT_MAX { 271 + return Err(http::StatusCode::BAD_REQUEST); 272 + } 273 + 274 + let filter_dids: HashSet<Did> = HashSet::from_iter( 275 + query 276 + .did 277 + .iter() 278 + .map(|d| d.trim()) 279 + .filter(|d| !d.is_empty()) 280 + .map(|d| Did(d.to_string())), 281 + ); 282 + 283 + let filter_other_subjects: HashSet<String> = HashSet::from_iter( 284 + query 285 + .other_subject 286 + .iter() 287 + .map(|s| s.trim().to_string()) 288 + .filter(|s| !s.is_empty()), 289 + ); 290 + 291 + let Some((collection, path)) = query.source.split_once(':') else { 292 + return Err(http::StatusCode::BAD_REQUEST); 293 + }; 294 + let path = format!(".{path}"); 295 + 296 + let path_to_other = format!(".{}", query.path_to_other); 297 + 298 + let paged = store 299 + .get_many_to_many_counts( 300 + &query.subject, 301 + collection, 302 + &path, 303 + &path_to_other, 304 + limit, 305 + cursor_key, 306 + &filter_dids, 307 + &filter_other_subjects, 308 + ) 309 + .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 310 + 311 + let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 312 + 313 + let items = paged 314 + .items 315 + .into_iter() 316 + .map(|(subject, total, distinct)| OtherSubjectCount { 317 + subject, 318 + total, 319 + distinct, 320 + }) 321 + .collect(); 322 + 323 + Ok(acceptable( 324 + accept, 325 + GetManyToManyCountsResponse { 326 + counts_by_other_subject: items, 327 + cursor, 328 + query: (*query).clone(), 329 + }, 330 + )) 186 331 } 187 332 188 333 #[derive(Clone, Deserialize)] ··· 582 727 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 583 728 } 584 729 } 730 + 731 + #[derive(Serialize, Deserialize)] // for bincode 732 + struct ApiKeyedCursor { 733 + next: String, // the key 734 + } 735 + 736 + impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor { 737 + type Error = bincode::Error; 738 + 739 + fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 740 + bincode::DefaultOptions::new().deserialize(&item.0) 741 + } 742 + } 743 + 744 + impl From<ApiKeyedCursor> for OpaqueApiCursor { 745 + fn from(item: ApiKeyedCursor) -> Self { 746 + OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 747 + } 748 + }
+78 -1
constellation/src/storage/mem_store.rs
··· 1 - use super::{LinkReader, LinkStorage, PagedAppendingCollection, StorageStats}; 1 + use super::{ 2 + LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats, 3 + }; 2 4 use crate::{ActionableEvent, CountsByCount, Did, RecordId}; 3 5 use anyhow::Result; 4 6 use links::CollectedLink; ··· 132 134 } 133 135 134 136 impl LinkReader for MemStorage { 137 + fn get_many_to_many_counts( 138 + &self, 139 + target: &str, 140 + collection: &str, 141 + path: &str, 142 + path_to_other: &str, 143 + limit: u64, 144 + after: Option<String>, 145 + filter_dids: &HashSet<Did>, 146 + filter_to_targets: &HashSet<String>, 147 + ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 148 + let data = self.0.lock().unwrap(); 149 + let Some(paths) = data.targets.get(&Target::new(target)) else { 150 + return Ok(PagedOrderedCollection::default()); 151 + }; 152 + let Some(linkers) = paths.get(&Source::new(collection, path)) else { 153 + return Ok(PagedOrderedCollection::default()); 154 + }; 155 + 156 + let path_to_other = RecordPath::new(path_to_other); 157 + let filter_to_targets: HashSet<Target> = 158 + HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s))); 159 + 160 + let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new(); 161 + for (did, rkey) in linkers.iter().flatten().cloned() { 162 + if !filter_dids.is_empty() && !filter_dids.contains(&did) { 163 + continue; 164 + } 165 + if let Some(fwd_target) = data 166 + .links 167 + .get(&did) 168 + .unwrap_or(&HashMap::new()) 169 + .get(&RepoId { 170 + collection: collection.to_string(), 171 + rkey, 172 + }) 173 + .unwrap_or(&Vec::new()) 174 + .iter() 175 + .filter_map(|(path, target)| { 176 + if *path == path_to_other 177 + && (filter_to_targets.is_empty() || filter_to_targets.contains(target)) 178 + { 179 + Some(target) 180 + } else { 181 + None 182 + } 183 + }) 184 + .take(1) 185 + .next() 186 + { 187 + let e = grouped_counts.entry(fwd_target.clone()).or_default(); 188 + e.0 += 1; 189 + e.1.insert(did.clone()); 190 + } 191 + } 192 + let mut items: Vec<(String, u64, u64)> = grouped_counts 193 + .iter() 194 + .map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64)) 195 + .collect(); 196 + items.sort(); 197 + items = items 198 + .into_iter() 199 + .skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false)) 200 + .take(limit as usize) 201 + .collect(); 202 + let next = if items.len() as u64 >= limit { 203 + items.last().map(|(t, _, _)| t.clone()) 204 + } else { 205 + None 206 + }; 207 + Ok(PagedOrderedCollection { items, next }) 208 + } 209 + 135 210 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 136 211 let data = self.0.lock().unwrap(); 137 212 let Some(paths) = data.targets.get(&Target::new(target)) else { ··· 353 428 dids, 354 429 targetables, 355 430 linking_records, 431 + started_at: None, 432 + other_data: Default::default(), 356 433 }) 357 434 } 358 435 }
+225
constellation/src/storage/mod.rs
··· 19 19 pub total: u64, 20 20 } 21 21 22 + /// A paged collection whose keys are sorted instead of indexed 23 + /// 24 + /// this has weaker guarantees than PagedAppendingCollection: it might 25 + /// return a totally consistent snapshot. but it should avoid duplicates 26 + /// and each page should at least be internally consistent. 27 + #[derive(Debug, PartialEq, Default)] 28 + pub struct PagedOrderedCollection<T, K: Ord> { 29 + pub items: Vec<T>, 30 + pub next: Option<K>, 31 + } 32 + 22 33 #[derive(Debug, Deserialize, Serialize, PartialEq)] 23 34 pub struct StorageStats { 24 35 /// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here. ··· 33 44 /// records with multiple links are single-counted. 34 45 /// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it. 35 46 pub linking_records: u64, 47 + 48 + /// first jetstream cursor when this instance first started 49 + pub started_at: Option<u64>, 50 + 51 + /// anything else we want to throw in 52 + pub other_data: HashMap<String, u64>, 36 53 } 37 54 38 55 pub trait LinkStorage: Send + Sync { ··· 48 65 } 49 66 50 67 pub trait LinkReader: Clone + Send + Sync + 'static { 68 + #[allow(clippy::too_many_arguments)] 69 + fn get_many_to_many_counts( 70 + &self, 71 + target: &str, 72 + collection: &str, 73 + path: &str, 74 + path_to_other: &str, 75 + limit: u64, 76 + after: Option<String>, 77 + filter_dids: &HashSet<Did>, 78 + filter_to_targets: &HashSet<String>, 79 + ) -> Result<PagedOrderedCollection<(String, u64, u64), String>>; 80 + 51 81 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 52 82 53 83 fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; ··· 1326 1356 counts 1327 1357 }); 1328 1358 assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1); 1359 + }); 1360 + 1361 + //////// many-to-many ///////// 1362 + 1363 + test_each_storage!(get_m2m_counts_empty, |storage| { 1364 + assert_eq!( 1365 + storage.get_many_to_many_counts( 1366 + "a.com", 1367 + "a.b.c", 1368 + ".d.e", 1369 + ".f.g", 1370 + 10, 1371 + None, 1372 + &HashSet::new(), 1373 + &HashSet::new(), 1374 + )?, 1375 + PagedOrderedCollection { 1376 + items: vec![], 1377 + next: None, 1378 + } 1379 + ); 1380 + }); 1381 + 1382 + test_each_storage!(get_m2m_counts_single, |storage| { 1383 + storage.push( 1384 + &ActionableEvent::CreateLinks { 1385 + record_id: RecordId { 1386 + did: "did:plc:asdf".into(), 1387 + collection: "app.t.c".into(), 1388 + rkey: "asdf".into(), 1389 + }, 1390 + links: vec![ 1391 + CollectedLink { 1392 + target: Link::Uri("a.com".into()), 1393 + path: ".abc.uri".into(), 1394 + }, 1395 + CollectedLink { 1396 + target: Link::Uri("b.com".into()), 1397 + path: ".def.uri".into(), 1398 + }, 1399 + CollectedLink { 1400 + target: Link::Uri("b.com".into()), 1401 + path: ".ghi.uri".into(), 1402 + }, 1403 + ], 1404 + }, 1405 + 0, 1406 + )?; 1407 + assert_eq!( 1408 + storage.get_many_to_many_counts( 1409 + "a.com", 1410 + "app.t.c", 1411 + ".abc.uri", 1412 + ".def.uri", 1413 + 10, 1414 + None, 1415 + &HashSet::new(), 1416 + &HashSet::new(), 1417 + )?, 1418 + PagedOrderedCollection { 1419 + items: vec![("b.com".to_string(), 1, 1)], 1420 + next: None, 1421 + } 1422 + ); 1423 + }); 1424 + 1425 + test_each_storage!(get_m2m_counts_filters, |storage| { 1426 + storage.push( 1427 + &ActionableEvent::CreateLinks { 1428 + record_id: RecordId { 1429 + did: "did:plc:asdf".into(), 1430 + collection: "app.t.c".into(), 1431 + rkey: "asdf".into(), 1432 + }, 1433 + links: vec![ 1434 + CollectedLink { 1435 + target: Link::Uri("a.com".into()), 1436 + path: ".abc.uri".into(), 1437 + }, 1438 + CollectedLink { 1439 + target: Link::Uri("b.com".into()), 1440 + path: ".def.uri".into(), 1441 + }, 1442 + ], 1443 + }, 1444 + 0, 1445 + )?; 1446 + storage.push( 1447 + &ActionableEvent::CreateLinks { 1448 + record_id: RecordId { 1449 + did: "did:plc:asdfasdf".into(), 1450 + collection: "app.t.c".into(), 1451 + rkey: "asdf".into(), 1452 + }, 1453 + links: vec![ 1454 + CollectedLink { 1455 + target: Link::Uri("a.com".into()), 1456 + path: ".abc.uri".into(), 1457 + }, 1458 + CollectedLink { 1459 + target: Link::Uri("b.com".into()), 1460 + path: ".def.uri".into(), 1461 + }, 1462 + ], 1463 + }, 1464 + 1, 1465 + )?; 1466 + storage.push( 1467 + &ActionableEvent::CreateLinks { 1468 + record_id: RecordId { 1469 + did: "did:plc:fdsa".into(), 1470 + collection: "app.t.c".into(), 1471 + rkey: "asdf".into(), 1472 + }, 1473 + links: vec![ 1474 + CollectedLink { 1475 + target: Link::Uri("a.com".into()), 1476 + path: ".abc.uri".into(), 1477 + }, 1478 + CollectedLink { 1479 + target: Link::Uri("c.com".into()), 1480 + path: ".def.uri".into(), 1481 + }, 1482 + ], 1483 + }, 1484 + 2, 1485 + )?; 1486 + storage.push( 1487 + &ActionableEvent::CreateLinks { 1488 + record_id: RecordId { 1489 + did: "did:plc:fdsa".into(), 1490 + collection: "app.t.c".into(), 1491 + rkey: "asdf2".into(), 1492 + }, 1493 + links: vec![ 1494 + CollectedLink { 1495 + target: Link::Uri("a.com".into()), 1496 + path: ".abc.uri".into(), 1497 + }, 1498 + CollectedLink { 1499 + target: Link::Uri("c.com".into()), 1500 + path: ".def.uri".into(), 1501 + }, 1502 + ], 1503 + }, 1504 + 3, 1505 + )?; 1506 + assert_eq!( 1507 + storage.get_many_to_many_counts( 1508 + "a.com", 1509 + "app.t.c", 1510 + ".abc.uri", 1511 + ".def.uri", 1512 + 10, 1513 + None, 1514 + &HashSet::new(), 1515 + &HashSet::new(), 1516 + )?, 1517 + PagedOrderedCollection { 1518 + items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),], 1519 + next: None, 1520 + } 1521 + ); 1522 + assert_eq!( 1523 + storage.get_many_to_many_counts( 1524 + "a.com", 1525 + "app.t.c", 1526 + ".abc.uri", 1527 + ".def.uri", 1528 + 10, 1529 + None, 1530 + &HashSet::from_iter([Did("did:plc:fdsa".to_string())]), 1531 + &HashSet::new(), 1532 + )?, 1533 + PagedOrderedCollection { 1534 + items: vec![("c.com".to_string(), 2, 1),], 1535 + next: None, 1536 + } 1537 + ); 1538 + assert_eq!( 1539 + storage.get_many_to_many_counts( 1540 + "a.com", 1541 + "app.t.c", 1542 + ".abc.uri", 1543 + ".def.uri", 1544 + 10, 1545 + None, 1546 + &HashSet::new(), 1547 + &HashSet::from_iter(["b.com".to_string()]), 1548 + )?, 1549 + PagedOrderedCollection { 1550 + items: vec![("b.com".to_string(), 2, 2),], 1551 + next: None, 1552 + } 1553 + ); 1329 1554 }); 1330 1555 }
+342 -40
constellation/src/storage/rocks_store.rs
··· 1 - use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats}; 1 + use super::{ 2 + ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, 3 + StorageStats, 4 + }; 2 5 use crate::{CountsByCount, Did, RecordId}; 3 6 use anyhow::{bail, Result}; 4 7 use bincode::Options as BincodeOptions; ··· 11 14 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 12 15 }; 13 16 use serde::{Deserialize, Serialize}; 14 - use std::collections::{HashMap, HashSet}; 17 + use std::collections::{BTreeMap, HashMap, HashSet}; 15 18 use std::io::Read; 16 19 use std::marker::PhantomData; 17 20 use std::path::{Path, PathBuf}; ··· 20 23 Arc, 21 24 }; 22 25 use std::thread; 23 - use std::time::{Duration, Instant}; 26 + use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 24 27 use tokio_util::sync::CancellationToken; 25 28 26 29 static DID_IDS_CF: &str = "did_ids"; ··· 29 32 static LINK_TARGETS_CF: &str = "link_targets"; 30 33 31 34 static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor"; 35 + static STARTED_AT_KEY: &str = "jetstream_first_cursor"; 36 + // add reverse mappings for targets if this db was running before that was a thing 37 + static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state"; 38 + 39 + static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started 40 + 41 + #[derive(Debug, Clone, Serialize, Deserialize)] 42 + struct TargetIdRepairState { 43 + /// start time for repair, microseconds timestamp 44 + current_us_started_at: u64, 45 + /// id table's latest id when repair started 46 + id_when_started: u64, 47 + /// id table id 48 + latest_repaired_i: u64, 49 + } 50 + impl AsRocksValue for TargetIdRepairState {} 51 + impl ValueFromRocks for TargetIdRepairState {} 32 52 33 53 // todo: actually understand and set these options probably better 34 54 fn rocks_opts_base() -> Options { ··· 56 76 #[derive(Debug, Clone)] 57 77 pub struct RocksStorage { 58 78 pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun) 59 - did_id_table: IdTable<Did, DidIdValue, true>, 60 - target_id_table: IdTable<TargetKey, TargetId, false>, 79 + did_id_table: IdTable<Did, DidIdValue>, 80 + target_id_table: IdTable<TargetKey, TargetId>, 61 81 is_writer: bool, 62 82 backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>, 63 83 } ··· 85 105 fn cf_descriptor(&self) -> ColumnFamilyDescriptor { 86 106 ColumnFamilyDescriptor::new(&self.name, rocks_opts_base()) 87 107 } 88 - fn init<const WITH_REVERSE: bool>( 89 - self, 90 - db: &DBWithThreadMode<MultiThreaded>, 91 - ) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> { 108 + fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> { 92 109 if db.cf_handle(&self.name).is_none() { 93 110 bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?"); 94 111 } ··· 119 136 } 120 137 } 121 138 #[derive(Debug, Clone)] 122 - struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool> 139 + struct IdTable<Orig, IdVal: IdTableValue> 123 140 where 124 141 Orig: KeyFromRocks, 125 142 for<'a> &'a Orig: AsRocksKey, ··· 127 144 base: IdTableBase<Orig, IdVal>, 128 145 priv_id_seq: u64, 129 146 } 130 - impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE> 147 + impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal> 131 148 where 132 149 Orig: KeyFromRocks, 133 150 for<'v> &'v IdVal: AsRocksValue, ··· 139 156 _key_marker: PhantomData, 140 157 _val_marker: PhantomData, 141 158 name: name.into(), 142 - id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1 159 + id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1 143 160 } 144 161 } 145 162 fn get_id_val( ··· 178 195 id_value 179 196 })) 180 197 } 198 + 181 199 fn estimate_count(&self) -> u64 { 182 200 self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved 183 201 } 184 - } 185 - impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true> 186 - where 187 - Orig: KeyFromRocks, 188 - for<'v> &'v IdVal: AsRocksValue, 189 - for<'k> &'k Orig: AsRocksKey, 190 - { 202 + 191 203 fn get_or_create_id_val( 192 204 &mut self, 193 205 db: &DBWithThreadMode<MultiThreaded>, ··· 215 227 } 216 228 } 217 229 } 218 - impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false> 219 - where 220 - Orig: KeyFromRocks, 221 - for<'v> &'v IdVal: AsRocksValue, 222 - for<'k> &'k Orig: AsRocksKey, 223 - { 224 - fn get_or_create_id_val( 225 - &mut self, 226 - db: &DBWithThreadMode<MultiThreaded>, 227 - batch: &mut WriteBatch, 228 - orig: &Orig, 229 - ) -> Result<IdVal> { 230 - let cf = db.cf_handle(&self.base.name).unwrap(); 231 - self.__get_or_create_id_val(&cf, db, batch, orig) 232 - } 233 - } 234 230 235 231 impl IdTableValue for DidIdValue { 236 232 fn new(v: u64) -> Self { ··· 249 245 } 250 246 } 251 247 248 + fn now() -> u64 { 249 + SystemTime::now() 250 + .duration_since(UNIX_EPOCH) 251 + .unwrap() 252 + .as_micros() as u64 253 + } 254 + 252 255 impl RocksStorage { 253 256 pub fn new(path: impl AsRef<Path>) -> Result<Self> { 254 257 Self::describe_metrics(); 255 - RocksStorage::open_readmode(path, false) 258 + let me = RocksStorage::open_readmode(path, false)?; 259 + me.global_init()?; 260 + Ok(me) 256 261 } 257 262 258 263 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> { ··· 260 265 } 261 266 262 267 fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> { 263 - let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF); 264 - let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF); 268 + let did_id_table = IdTable::setup(DID_IDS_CF); 269 + let target_id_table = IdTable::setup(TARGET_IDS_CF); 265 270 271 + // note: global stuff like jetstream cursor goes in the default cf 272 + // these are bonus extra cfs 266 273 let cfs = vec![ 267 274 // id reference tables 268 275 did_id_table.cf_descriptor(), ··· 296 303 is_writer: !readonly, 297 304 backup_task: None.into(), 298 305 }) 306 + } 307 + 308 + fn global_init(&self) -> Result<()> { 309 + let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some(); 310 + if first_run { 311 + self.db.put(STARTED_AT_KEY, _rv(now()))?; 312 + 313 + // hack / temporary: if we're a new db, put in a completed repair 314 + // state so we don't run repairs (repairs are for old-code dbs) 315 + let completed = TargetIdRepairState { 316 + id_when_started: 0, 317 + current_us_started_at: 0, 318 + latest_repaired_i: 0, 319 + }; 320 + self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?; 321 + } 322 + Ok(()) 323 + } 324 + 325 + pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> { 326 + let mut state = match self 327 + .db 328 + .get(TARGET_ID_REPAIR_STATE_KEY)? 329 + .map(|s| _vr(&s)) 330 + .transpose()? 331 + { 332 + Some(s) => s, 333 + None => TargetIdRepairState { 334 + id_when_started: self.did_id_table.priv_id_seq, 335 + current_us_started_at: now(), 336 + latest_repaired_i: 0, 337 + }, 338 + }; 339 + 340 + eprintln!("initial repair state: {state:?}"); 341 + 342 + let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap(); 343 + 344 + let mut iter = self.db.raw_iterator_cf(&cf); 345 + iter.seek_to_first(); 346 + 347 + eprintln!("repair iterator sent to first key"); 348 + 349 + // skip ahead if we're done some, or take a single first step 350 + for _ in 0..state.latest_repaired_i { 351 + iter.next(); 352 + } 353 + 354 + eprintln!( 355 + "repair iterator skipped to {}th key", 356 + state.latest_repaired_i 357 + ); 358 + 359 + let mut maybe_done = false; 360 + 361 + let mut write_fast = rocksdb::WriteOptions::default(); 362 + write_fast.set_sync(false); 363 + write_fast.disable_wal(true); 364 + 365 + while !stay_alive.is_cancelled() && !maybe_done { 366 + // let mut batch = WriteBatch::default(); 367 + 368 + let mut any_written = false; 369 + 370 + for _ in 0..1000 { 371 + if state.latest_repaired_i % 1_000_000 == 0 { 372 + eprintln!("target iter at {}", state.latest_repaired_i); 373 + } 374 + state.latest_repaired_i += 1; 375 + 376 + if !iter.valid() { 377 + eprintln!("invalid iter, are we done repairing?"); 378 + maybe_done = true; 379 + break; 380 + }; 381 + 382 + // eprintln!("iterator seems to be valid! getting the key..."); 383 + let raw_key = iter.key().unwrap(); 384 + if raw_key.len() == 8 { 385 + // eprintln!("found an 8-byte key, skipping it since it's probably an id..."); 386 + iter.next(); 387 + continue; 388 + } 389 + let target: TargetKey = _kr::<TargetKey>(raw_key)?; 390 + let target_id: TargetId = _vr(iter.value().unwrap())?; 391 + 392 + self.db 393 + .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?; 394 + any_written = true; 395 + iter.next(); 396 + } 397 + 398 + if any_written { 399 + self.db 400 + .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?; 401 + std::thread::sleep(breather); 402 + } 403 + } 404 + 405 + eprintln!("repair iterator done."); 406 + 407 + Ok(false) 299 408 } 300 409 301 410 pub fn start_backup( ··· 826 935 } 827 936 828 937 impl LinkReader for RocksStorage { 938 + fn get_many_to_many_counts( 939 + &self, 940 + target: &str, 941 + collection: &str, 942 + path: &str, 943 + path_to_other: &str, 944 + limit: u64, 945 + after: Option<String>, 946 + filter_dids: &HashSet<Did>, 947 + filter_to_targets: &HashSet<String>, 948 + ) -> Result<PagedOrderedCollection<(String, u64, u64), String>> { 949 + let collection = Collection(collection.to_string()); 950 + let path = RPath(path.to_string()); 951 + 952 + let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone()); 953 + 954 + // unfortunately the cursor is a, uh, stringified number. 955 + // this was easier for the memstore (plain target, not target id), and 956 + // making it generic is a bit awful. 957 + // so... parse the number out of a string here :( 958 + // TODO: this should bubble up to a BAD_REQUEST response 959 + let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?; 960 + 961 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 962 + eprintln!("nothin doin for this target, {target_key:?}"); 963 + return Ok(Default::default()); 964 + }; 965 + 966 + let filter_did_ids: HashMap<DidId, bool> = filter_dids 967 + .iter() 968 + .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 969 + .collect::<Result<Vec<DidIdValue>>>()? 970 + .into_iter() 971 + .map(|DidIdValue(id, active)| (id, active)) 972 + .collect(); 973 + 974 + // stored targets are keyed by triples of (target, collection, path). 975 + // target filtering only consideres the target itself, so we actually 976 + // need to do a prefix iteration of all target ids for this target and 977 + // keep them all. 978 + // i *think* the number of keys at a target prefix should usually be 979 + // pretty small, so this is hopefully fine. but if it turns out to be 980 + // large, we can push this filtering back into the main links loop and 981 + // do forward db queries per backlink to get the raw target back out. 982 + let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 983 + for t in filter_to_targets { 984 + for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { 985 + filter_to_target_ids.insert(target_id); 986 + } 987 + } 988 + 989 + let linkers = self.get_target_linkers(&target_id)?; 990 + 991 + let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new(); 992 + 993 + for (did_id, rkey) in linkers.0 { 994 + if did_id.is_empty() { 995 + continue; 996 + } 997 + 998 + if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) { 999 + continue; 1000 + } 1001 + 1002 + let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey); 1003 + let Some(targets) = self.get_record_link_targets(&record_link_key)? else { 1004 + continue; 1005 + }; 1006 + 1007 + let Some(fwd_target) = targets 1008 + .0 1009 + .into_iter() 1010 + .filter_map(|RecordLinkTarget(rpath, target_id)| { 1011 + if rpath.0 == path_to_other 1012 + && (filter_to_target_ids.is_empty() 1013 + || filter_to_target_ids.contains(&target_id)) 1014 + { 1015 + Some(target_id) 1016 + } else { 1017 + None 1018 + } 1019 + }) 1020 + .take(1) 1021 + .next() 1022 + else { 1023 + eprintln!("no forward match"); 1024 + continue; 1025 + }; 1026 + 1027 + // small relief: we page over target ids, so we can already bail 1028 + // reprocessing previous pages here 1029 + if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) { 1030 + continue; 1031 + } 1032 + 1033 + // aand we can skip target ids that must be on future pages 1034 + // (this check continues after the did-lookup, which we have to do) 1035 + let page_is_full = grouped_counts.len() as u64 >= limit; 1036 + if page_is_full { 1037 + let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh 1038 + if fwd_target > *current_max { 1039 + continue; 1040 + } 1041 + } 1042 + 1043 + // bit painful: 2-step lookup to make sure this did is active 1044 + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { 1045 + eprintln!("failed to look up did from did_id {did_id:?}"); 1046 + continue; 1047 + }; 1048 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { 1049 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1050 + continue; 1051 + }; 1052 + if !active { 1053 + continue; 1054 + } 1055 + 1056 + // page-management, continued 1057 + // if we have a full page, and we're inserting a *new* key less than 1058 + // the current max, then we can evict the current max 1059 + let mut should_evict = false; 1060 + let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| { 1061 + // this is a *new* key, so kick the max if we're full 1062 + should_evict = page_is_full; 1063 + Default::default() 1064 + }); 1065 + entry.0 += 1; 1066 + entry.1.insert(did_id); 1067 + 1068 + if should_evict { 1069 + grouped_counts.pop_last(); 1070 + } 1071 + } 1072 + 1073 + let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len()); 1074 + for (target_id, (n, dids)) in &grouped_counts { 1075 + let Some(target) = self 1076 + .target_id_table 1077 + .get_val_from_id(&self.db, target_id.0)? 1078 + else { 1079 + eprintln!("failed to look up target from target_id {target_id:?}"); 1080 + continue; 1081 + }; 1082 + items.push((target.0 .0, *n, dids.len() as u64)); 1083 + } 1084 + 1085 + let next = if grouped_counts.len() as u64 >= limit { 1086 + // yeah.... it's a number saved as a string......sorry 1087 + grouped_counts 1088 + .keys() 1089 + .next_back() 1090 + .map(|k| format!("{}", k.0)) 1091 + } else { 1092 + None 1093 + }; 1094 + 1095 + Ok(PagedOrderedCollection { items, next }) 1096 + } 1097 + 829 1098 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 830 1099 let target_key = TargetKey( 831 1100 Target(target.to_string()), ··· 1042 1311 .map(|s| s.parse::<u64>()) 1043 1312 .transpose()? 1044 1313 .unwrap_or(0); 1314 + let started_at = self 1315 + .db 1316 + .get(STARTED_AT_KEY)? 1317 + .map(|c| _vr(&c)) 1318 + .transpose()? 1319 + .unwrap_or(COZY_FIRST_CURSOR); 1320 + 1321 + let other_data = self 1322 + .db 1323 + .get(TARGET_ID_REPAIR_STATE_KEY)? 1324 + .map(|s| _vr(&s)) 1325 + .transpose()? 1326 + .map( 1327 + |TargetIdRepairState { 1328 + current_us_started_at, 1329 + id_when_started, 1330 + latest_repaired_i, 1331 + }| { 1332 + HashMap::from([ 1333 + ("current_us_started_at".to_string(), current_us_started_at), 1334 + ("id_when_started".to_string(), id_when_started), 1335 + ("latest_repaired_i".to_string(), latest_repaired_i), 1336 + ]) 1337 + }, 1338 + ) 1339 + .unwrap_or(HashMap::default()); 1340 + 1045 1341 Ok(StorageStats { 1046 1342 dids, 1047 1343 targetables, 1048 1344 linking_records, 1345 + started_at: Some(started_at), 1346 + other_data, 1049 1347 }) 1050 1348 } 1051 1349 } ··· 1071 1369 impl AsRocksValue for &TargetId {} 1072 1370 impl KeyFromRocks for TargetKey {} 1073 1371 impl ValueFromRocks for TargetId {} 1372 + 1373 + // temp? 1374 + impl KeyFromRocks for TargetId {} 1375 + impl AsRocksValue for &TargetKey {} 1074 1376 1075 1377 // target_links table 1076 1378 impl AsRocksKey for &TargetId {} ··· 1142 1444 } 1143 1445 1144 1446 // target ids 1145 - #[derive(Debug, Clone, Serialize, Deserialize)] 1447 + #[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)] 1146 1448 struct TargetId(u64); // key 1147 1449 1148 - #[derive(Debug, Clone, Serialize, Deserialize)] 1450 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] 1149 1451 pub struct Target(pub String); // the actual target/uri 1150 1452 1151 1453 // targets (uris, dids, etc.): the reverse index
+1 -1
constellation/templates/dids.html.j2
··· 27 27 {% for did in linking_dids %} 28 28 <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }} 29 29 -> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a> 30 - -> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre> 30 + -> browse <a href="https://pdsls.dev/at://{{ did.0 }}">this DID record</a></pre> 31 31 {% endfor %} 32 32 33 33 {% if let Some(c) = cursor %}
+1 -1
constellation/templates/get-backlinks.html.j2
··· 1 1 {% extends "base.html.j2" %} 2 2 {% import "try-it-macros.html.j2" as try_it %} 3 3 4 - {% block title %}Links{% endblock %} 4 + {% block title %}Backlinks{% endblock %} 5 5 {% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %} 6 6 7 7 {% block content %}
+67
constellation/templates/get-many-to-many-counts.html.j2
··· 1 + {% extends "base.html.j2" %} 2 + {% import "try-it-macros.html.j2" as try_it %} 3 + 4 + {% block title %}Many to Many counts{% endblock %} 5 + {% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %} 6 + 7 + {% block content %} 8 + 9 + {% call try_it::get_many_to_many_counts( 10 + query.subject, 11 + query.source, 12 + query.path_to_other, 13 + query.did, 14 + query.other_subject, 15 + query.limit, 16 + ) %} 17 + 18 + <h2> 19 + Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code> 20 + {% if let Some(browseable_uri) = query.subject|to_browseable %} 21 + <small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small> 22 + {% endif %} 23 + </h2> 24 + 25 + <p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}โ†’{{ query.path_to_other }}</code></p> 26 + 27 + <ul> 28 + <li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li> 29 + <li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li> 30 + </ul> 31 + 32 + <h3>Counts by other subject:</h3> 33 + 34 + {% for counts in counts_by_other_subject %} 35 + <pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }} 36 + <strong>Joining records</strong>: {{ counts.total }} 37 + <strong>Unique joiner ids</strong>: {{ counts.distinct }} 38 + -> {% if let Some(browseable_uri) = counts.subject|to_browseable -%} 39 + <a href="{{ browseable_uri }}">browse record</a> 40 + {%- endif %}</pre> 41 + {% endfor %} 42 + 43 + {% if let Some(c) = cursor %} 44 + <form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts"> 45 + <input type="hidden" name="subject" value="{{ query.subject }}" /> 46 + <input type="hidden" name="source" value="{{ query.source }}" /> 47 + <input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" /> 48 + {% for did in query.did %} 49 + <input type="hidden" name="did" value="{{ did }}" /> 50 + {% endfor %} 51 + {% for otherSubject in query.other_subject %} 52 + <input type="hidden" name="otherSubject" value="{{ otherSubject }}" /> 53 + {% endfor %} 54 + <input type="hidden" name="limit" value="{{ query.limit }}" /> 55 + <input type="hidden" name="cursor" value={{ c|json|safe }} /> 56 + <button type="submit">next page&hellip;</button> 57 + </form> 58 + {% else %} 59 + <button disabled><em>end of results</em></button> 60 + {% endif %} 61 + 62 + <details> 63 + <summary>Raw JSON response</summary> 64 + <pre class="code">{{ self|tojson }}</pre> 65 + </details> 66 + 67 + {% endblock %}
+38 -2
constellation/templates/hello.html.j2
··· 19 19 <p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p> 20 20 21 21 <p> 22 - This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/> 22 + This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat"> 23 + {%- if let Some(days) = days_indexed %} 24 + {{ days|human_number }} 25 + {% else %} 26 + ??? 27 + {% endif -%} 28 + </span> days.<br/> 23 29 <small>(indexing new records in real time, backfill coming soon!)</small> 24 30 </p> 25 31 26 - <p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p> 32 + {# {% for k, v in stats.other_data.iter() %} 33 + <p><strong>{{ k }}</strong>: {{ v }}</p> 34 + {% endfor %} #} 35 + 36 + <p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p> 27 37 28 38 29 39 <h2>API Endpoints</h2> ··· 43 53 44 54 <p style="margin-bottom: 0"><strong>Try it:</strong></p> 45 55 {% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %} 56 + 57 + 58 + <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3> 59 + 60 + <p>TODO: description</p> 61 + 62 + <h4>Query parameters:</h4> 63 + 64 + <ul> 65 + <li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li> 66 + <li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li> 67 + <li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li> 68 + <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 69 + <li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple users. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li> 70 + <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 71 + </ul> 72 + 73 + <p style="margin-bottom: 0"><strong>Try it:</strong></p> 74 + {% call try_it::get_many_to_many_counts( 75 + "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue", 76 + "sh.tangled.label.op:add[].key", 77 + "subject", 78 + [""], 79 + [""], 80 + 25, 81 + ) %} 46 82 47 83 48 84 <h3 class="route"><code>GET /links</code></h3>
+43 -1
constellation/templates/try-it-macros.html.j2
··· 1 1 {% macro get_backlinks(subject, source, dids, limit) %} 2 2 <form method="get" action="/xrpc/blue.microcosm.links.getBacklinks"> 3 - <pre class="code"><strong>GET</strong> /links 3 + <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks 4 4 ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." /> 5 5 &source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" /> 6 6 {%- for did in dids %}{% if !did.is_empty() %} ··· 20 20 p.insertBefore(document.createTextNode('&did= '), didPlaceholder); 21 21 p.insertBefore(i, didPlaceholder); 22 22 p.insertBefore(document.createTextNode('\n '), didPlaceholder); 23 + }); 24 + </script> 25 + {% endmacro %} 26 + 27 + {% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %} 28 + <form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts"> 29 + <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts 30 + ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." /> 31 + &source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" /> 32 + &pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" /> 33 + {%- for did in dids %}{% if !did.is_empty() %} 34 + &did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %} 35 + <span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button> 36 + {%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %} 37 + &otherSubject= <input type="text" name="did" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %} 38 + <span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button> 39 + &limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre> 40 + </form> 41 + <script> 42 + const m2mAddDidButton = document.getElementById('m2m-add-did'); 43 + const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder'); 44 + m2mAddDidButton.addEventListener('click', e => { 45 + e.preventDefault(); 46 + const i = document.createElement('input'); 47 + i.placeholder = 'did:plc:...'; 48 + i.name = "did" 49 + const p = m2mAddDidButton.parentNode; 50 + p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder); 51 + p.insertBefore(i, m2mDidPlaceholder); 52 + p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder); 53 + }); 54 + const m2mAddSubjectButton = document.getElementById('m2m-add-subject'); 55 + const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder'); 56 + m2mAddSubjectButton.addEventListener('click', e => { 57 + e.preventDefault(); 58 + const i = document.createElement('input'); 59 + i.placeholder = 'at-uri, did, uri...'; 60 + i.name = "otherSubject" 61 + const p = m2mAddSubjectButton.parentNode; 62 + p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder); 63 + p.insertBefore(i, m2mSubjectPlaceholder); 64 + p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder); 23 65 }); 24 66 </script> 25 67 {% endmacro %}
+95
lexicons/blue.microcosm/links/getBacklinks.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getBacklinks", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "a list of records linking to any record, identity, or uri", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "subject", 12 + "source" 13 + ], 14 + "properties": { 15 + "subject": { 16 + "type": "string", 17 + "format": "uri", 18 + "description": "the target being linked to (at-uri, did, or uri)" 19 + }, 20 + "source": { 21 + "type": "string", 22 + "description": "collection and path specification (e.g., 'app.bsky.feed.like:subject.uri')" 23 + }, 24 + "did": { 25 + "type": "array", 26 + "description": "filter links to those from specific users", 27 + "items": { 28 + "type": "string", 29 + "format": "did" 30 + } 31 + }, 32 + "limit": { 33 + "type": "integer", 34 + "minimum": 1, 35 + "maximum": 100, 36 + "default": 16, 37 + "description": "number of results to return" 38 + } 39 + } 40 + }, 41 + "output": { 42 + "encoding": "application/json", 43 + "schema": { 44 + "type": "object", 45 + "required": [ 46 + "total", 47 + "records" 48 + ], 49 + "properties": { 50 + "total": { 51 + "type": "integer", 52 + "description": "total number of matching links" 53 + }, 54 + "records": { 55 + "type": "array", 56 + "items": { 57 + "type": "ref", 58 + "ref": "#linkRecord" 59 + } 60 + }, 61 + "cursor": { 62 + "type": "string", 63 + "description": "pagination cursor" 64 + } 65 + } 66 + } 67 + } 68 + }, 69 + "linkRecord": { 70 + "type": "object", 71 + "required": [ 72 + "did", 73 + "collection", 74 + "rkey" 75 + ], 76 + "properties": { 77 + "did": { 78 + "type": "string", 79 + "format": "did", 80 + "description": "the DID of the linking record's repository" 81 + }, 82 + "collection": { 83 + "type": "string", 84 + "format": "nsid", 85 + "description": "the collection of the linking record" 86 + }, 87 + "rkey": { 88 + "type": "string", 89 + "format": "record-key", 90 + "description": "the record key of the linking record" 91 + } 92 + } 93 + } 94 + } 95 + }
+99
lexicons/blue.microcosm/links/getManyToManyCounts.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "blue.microcosm.links.getManyToManyCounts", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "count many-to-many relationships with secondary link paths", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "subject", 12 + "source", 13 + "pathToOther" 14 + ], 15 + "properties": { 16 + "subject": { 17 + "type": "string", 18 + "format": "uri", 19 + "description": "the primary target being linked to (at-uri, did, or uri)" 20 + }, 21 + "source": { 22 + "type": "string", 23 + "description": "collection and path specification for the primary link" 24 + }, 25 + "pathToOther": { 26 + "type": "string", 27 + "description": "path to the secondary link in the many-to-many record (e.g., 'otherThing.uri')" 28 + }, 29 + "did": { 30 + "type": "array", 31 + "description": "filter links to those from specific users", 32 + "items": { 33 + "type": "string", 34 + "format": "did" 35 + } 36 + }, 37 + "otherSubject": { 38 + "type": "array", 39 + "description": "filter secondary links to specific subjects", 40 + "items": { 41 + "type": "string" 42 + } 43 + }, 44 + "limit": { 45 + "type": "integer", 46 + "minimum": 1, 47 + "maximum": 100, 48 + "default": 16, 49 + "description": "number of results to return" 50 + } 51 + } 52 + }, 53 + "output": { 54 + "encoding": "application/json", 55 + "schema": { 56 + "type": "object", 57 + "required": [ 58 + "counts_by_other_subject" 59 + ], 60 + "properties": { 61 + "counts_by_other_subject": { 62 + "type": "array", 63 + "items": { 64 + "type": "ref", 65 + "ref": "#countBySubject" 66 + } 67 + }, 68 + "cursor": { 69 + "type": "string", 70 + "description": "pagination cursor" 71 + } 72 + } 73 + } 74 + } 75 + }, 76 + "countBySubject": { 77 + "type": "object", 78 + "required": [ 79 + "subject", 80 + "total", 81 + "distinct" 82 + ], 83 + "properties": { 84 + "subject": { 85 + "type": "string", 86 + "description": "the secondary subject being counted" 87 + }, 88 + "total": { 89 + "type": "integer", 90 + "description": "total number of links to this subject" 91 + }, 92 + "distinct": { 93 + "type": "integer", 94 + "description": "number of distinct DIDs linking to this subject" 95 + } 96 + } 97 + } 98 + } 99 + }
+56
lexicons/com.bad-example/identity/resolveMiniDoc.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "com.bad-example.identity.resolveMiniDoc", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "like com.atproto.identity.resolveIdentity but instead of the full didDoc it returns an atproto-relevant subset", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "identifier" 12 + ], 13 + "properties": { 14 + "identifier": { 15 + "type": "string", 16 + "format": "at-identifier", 17 + "description": "handle or DID to resolve" 18 + } 19 + } 20 + }, 21 + "output": { 22 + "encoding": "application/json", 23 + "schema": { 24 + "type": "object", 25 + "required": [ 26 + "did", 27 + "handle", 28 + "pds", 29 + "signing_key" 30 + ], 31 + "properties": { 32 + "did": { 33 + "type": "string", 34 + "format": "did", 35 + "description": "DID, bi-directionally verified if a handle was provided in the query" 36 + }, 37 + "handle": { 38 + "type": "string", 39 + "format": "handle", 40 + "description": "the validated handle of the account or 'handle.invalid' if the handle did not bi-directionally match the DID document" 41 + }, 42 + "pds": { 43 + "type": "string", 44 + "format": "uri", 45 + "description": "the identity's PDS URL" 46 + }, 47 + "signing_key": { 48 + "type": "string", 49 + "description": "the atproto signing key publicKeyMultibase" 50 + } 51 + } 52 + } 53 + } 54 + } 55 + } 56 + }
+54
lexicons/com.bad-example/repo/getUriRecord.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "com.bad-example.repo.getUriRecord", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "ergonomic complement to com.atproto.repo.getRecord which accepts an at-uri instead of individual repo/collection/rkey params", 8 + "parameters": { 9 + "type": "params", 10 + "required": [ 11 + "at_uri" 12 + ], 13 + "properties": { 14 + "at_uri": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "the at-uri of the record (identifier can be a DID or handle)" 18 + }, 19 + "cid": { 20 + "type": "string", 21 + "format": "cid", 22 + "description": "optional CID of the version of the record. if not specified, return the most recent version. if specified and a newer version exists, returns 404." 23 + } 24 + } 25 + }, 26 + "output": { 27 + "encoding": "application/json", 28 + "schema": { 29 + "type": "object", 30 + "required": [ 31 + "uri", 32 + "value" 33 + ], 34 + "properties": { 35 + "uri": { 36 + "type": "string", 37 + "format": "at-uri", 38 + "description": "at-uri for this record" 39 + }, 40 + "cid": { 41 + "type": "string", 42 + "format": "cid", 43 + "description": "CID for this exact version of the record" 44 + }, 45 + "value": { 46 + "type": "unknown", 47 + "description": "the record itself" 48 + } 49 + } 50 + } 51 + } 52 + } 53 + } 54 + }
+11 -11
slingshot/src/server.rs
··· 437 437 Ok(did) => did, 438 438 Err(_) => { 439 439 let Ok(alleged_handle) = Handle::new(identifier) else { 440 - return invalid("identifier was not a valid DID or handle"); 440 + return invalid("Identifier was not a valid DID or handle"); 441 441 }; 442 442 443 443 match self.identity.handle_to_did(alleged_handle.clone()).await { ··· 453 453 Err(e) => { 454 454 log::debug!("failed to resolve handle: {e}"); 455 455 // TODO: ServerError not BadRequest 456 - return invalid("errored while trying to resolve handle to DID"); 456 + return invalid("Errored while trying to resolve handle to DID"); 457 457 } 458 458 } 459 459 } 460 460 }; 461 461 let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else { 462 - return invalid("failed to get DID doc"); 462 + return invalid("Failed to get DID doc"); 463 463 }; 464 464 let Some(partial_doc) = partial_doc else { 465 - return invalid("failed to find DID doc"); 465 + return invalid("Failed to find DID doc"); 466 466 }; 467 467 468 468 // ok so here's where we're at: ··· 483 483 .handle_to_did(partial_doc.unverified_handle.clone()) 484 484 .await 485 485 else { 486 - return invalid("failed to get did doc's handle"); 486 + return invalid("Failed to get DID doc's handle"); 487 487 }; 488 488 let Some(handle_did) = handle_did else { 489 - return invalid("failed to resolve did doc's handle"); 489 + return invalid("Failed to resolve DID doc's handle"); 490 490 }; 491 491 if handle_did == did { 492 492 partial_doc.unverified_handle.to_string() ··· 516 516 let Ok(handle) = Handle::new(repo) else { 517 517 return GetRecordResponse::BadRequest(xrpc_error( 518 518 "InvalidRequest", 519 - "repo was not a valid DID or handle", 519 + "Repo was not a valid DID or handle", 520 520 )); 521 521 }; 522 522 match self.identity.handle_to_did(handle).await { ··· 534 534 log::debug!("handle resolution failed: {e}"); 535 535 return GetRecordResponse::ServerError(xrpc_error( 536 536 "ResolutionFailed", 537 - "errored while trying to resolve handle to DID", 537 + "Errored while trying to resolve handle to DID", 538 538 )); 539 539 } 540 540 } ··· 544 544 let Ok(collection) = Nsid::new(collection) else { 545 545 return GetRecordResponse::BadRequest(xrpc_error( 546 546 "InvalidRequest", 547 - "invalid NSID for collection", 547 + "Invalid NSID for collection", 548 548 )); 549 549 }; 550 550 551 551 let Ok(rkey) = RecordKey::new(rkey) else { 552 - return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey")); 552 + return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey")); 553 553 }; 554 554 555 555 let cid: Option<Cid> = if let Some(cid) = cid { 556 556 let Ok(cid) = Cid::from_str(&cid) else { 557 - return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID")); 557 + return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID")); 558 558 }; 559 559 Some(cid) 560 560 } else {
+2
spacedust/src/error.rs
··· 30 30 TooManySourcesWanted, 31 31 #[error("more wantedSubjectDids were requested than allowed (max 10,000)")] 32 32 TooManyDidsWanted, 33 + #[error("more wantedSubjectPrefixes were requested than allowed (max 100)")] 34 + TooManySubjectPrefixesWanted, 33 35 #[error("more wantedSubjects were requested than allowed (max 50,000)")] 34 36 TooManySubjectsWanted, 35 37 }
+11 -2
spacedust/src/server.rs
··· 227 227 #[serde(default)] 228 228 pub wanted_subjects: HashSet<String>, 229 229 #[serde(default)] 230 + pub wanted_subject_prefixes: HashSet<String>, 231 + #[serde(default)] 230 232 pub wanted_subject_dids: HashSet<String>, 231 233 #[serde(default)] 232 234 pub wanted_sources: HashSet<String>, ··· 241 243 /// 242 244 /// The at-uri must be url-encoded 243 245 /// 244 - /// Pass this parameter multiple times to specify multiple collections, like 246 + /// Pass this parameter multiple times to specify multiple subjects, like 245 247 /// `wantedSubjects=[...]&wantedSubjects=[...]` 246 248 pub wanted_subjects: String, 249 + /// One or more at-uri, URI, or DID prefixes to receive links about 250 + /// 251 + /// The uri must be url-encoded 252 + /// 253 + /// Pass this parameter multiple times to specify multiple prefixes, like 254 + /// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]` 255 + pub wanted_subject_prefixes: String, 247 256 /// One or more DIDs to receive links about 248 257 /// 249 - /// Pass this parameter multiple times to specify multiple collections 258 + /// Pass this parameter multiple times to specify multiple subjects 250 259 pub wanted_subject_dids: String, 251 260 /// One or more link sources to receive links about 252 261 ///
+10 -1
spacedust/src/subscriber.rs
··· 124 124 let query = &self.query; 125 125 126 126 // subject + subject DIDs are logical OR 127 - if !(query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty() 127 + if !(query.wanted_subjects.is_empty() 128 + && query.wanted_subject_prefixes.is_empty() 129 + && query.wanted_subject_dids.is_empty() 128 130 || query.wanted_subjects.contains(&properties.subject) 131 + || query 132 + .wanted_subject_prefixes 133 + .iter() 134 + .any(|p| properties.subject.starts_with(p)) 129 135 || properties 130 136 .subject_did 131 137 .as_ref() ··· 154 160 } 155 161 if opts.wanted_subject_dids.len() > 10_000 { 156 162 return Err(SubscriberUpdateError::TooManyDidsWanted); 163 + } 164 + if opts.wanted_subject_prefixes.len() > 100 { 165 + return Err(SubscriberUpdateError::TooManySubjectPrefixesWanted); 157 166 } 158 167 if opts.wanted_subjects.len() > 50_000 { 159 168 return Err(SubscriberUpdateError::TooManySubjectsWanted);
+1 -1
ufos/Cargo.toml
··· 13 13 clap = { version = "4.5.31", features = ["derive"] } 14 14 dropshot = "0.16.0" 15 15 env_logger = "0.11.7" 16 - fjall = { git = "https://github.com/fjall-rs/fjall.git", features = ["lz4"] } 16 + fjall = { git = "https://github.com/fjall-rs/fjall.git", rev = "fb229572bb7d1d6966a596994dc1708e47ec57d8", features = ["lz4"] } 17 17 getrandom = "0.3.3" 18 18 http = "1.3.1" 19 19 jetstream = { path = "../jetstream", features = ["metrics"] }