Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Changed files: +67 -32 across 6 files

constellation/src/consumer/jetstream.rs (+13 -6)
···
                 println!("jetstream closed the websocket cleanly.");
                 break;
             }
-            r => eprintln!("jetstream: close result after error: {r:?}"),
+            Err(_) => {
+                counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
+                println!("jetstream failed to close the websocket cleanly.");
+                break;
+            }
+            Ok(r) => {
+                eprintln!("jetstream: close result after error: {r:?}");
+                counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
+                    .increment(1);
+                // if we didn't immediately get ConnectionClosed, we should keep polling read
+                // until we get it.
+                continue;
+            }
         }
-        counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
-            .increment(1);
-        // if we didn't immediately get ConnectionClosed, we should keep polling read
-        // until we get it.
-        continue;
     }
 };
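
The new match arms split the old catch-all: a failed close handshake is counted as a "dirty close" and breaks out of the loop, while any other read result is counted as a "read error" and the loop keeps polling until ConnectionClosed actually arrives. A minimal, self-contained sketch of that decision logic follows; CloseOutcome and next_step are hypothetical names standing in for the tungstenite types the real consumer matches on, and the stream URL in main is a made-up example.

use metrics::counter;

// Hypothetical stand-in for the websocket read result seen after an error.
#[derive(Debug)]
enum CloseOutcome {
    CleanClose,         // the peer completed the close handshake
    DirtyClose,         // the close handshake itself errored out
    OtherFrame(String), // something else arrived; ConnectionClosed hasn't shown up yet
}

enum NextStep {
    Break,
    Continue,
}

fn next_step(outcome: CloseOutcome, stream: &str) -> NextStep {
    match outcome {
        CloseOutcome::CleanClose => {
            println!("jetstream closed the websocket cleanly.");
            NextStep::Break
        }
        CloseOutcome::DirtyClose => {
            counter!("jetstream_read_fail", "url" => stream.to_owned(), "reason" => "dirty close")
                .increment(1);
            println!("jetstream failed to close the websocket cleanly.");
            NextStep::Break
        }
        CloseOutcome::OtherFrame(r) => {
            eprintln!("jetstream: close result after error: {r:?}");
            counter!("jetstream_read_fail", "url" => stream.to_owned(), "reason" => "read error")
                .increment(1);
            // keep polling read until ConnectionClosed actually arrives
            NextStep::Continue
        }
    }
}

fn main() {
    // With no metrics recorder installed, counter! is a no-op, so this just prints.
    match next_step(CloseOutcome::DirtyClose, "wss://example.jetstream.host/subscribe") {
        NextStep::Break => println!("-> break out of the read loop"),
        NextStep::Continue => println!("-> keep polling read"),
    }
}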

constellation/src/server/filters.rs (+4 -6)
···
 Ok({
     if let Some(link) = parse_any_link(s) {
         match link {
-            Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
-                format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
-            }),
-            Link::Did(did) => Some(format!(
-                "https://atproto-browser-plus-links.vercel.app/at/{did}"
-            )),
+            Link::AtUri(at_uri) => at_uri
+                .strip_prefix("at://")
+                .map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
+            Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
             Link::Uri(uri) => Some(uri),
         }
     } else {
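
Both at-URIs and bare DIDs now link out to pdsls.dev instead of the Vercel-hosted atproto browser, and the at:// scheme is kept in the pdsls path. A self-contained sketch of the resulting URLs: the Link enum below is a simplified stand-in for the crate's own type, and the DID, rkey, and URI values are made-up examples.

// Simplified stand-in for the crate's Link type, for illustration only.
enum Link {
    AtUri(String),
    Did(String),
    Uri(String),
}

fn browse_url(link: Link) -> Option<String> {
    match link {
        Link::AtUri(at_uri) => at_uri
            .strip_prefix("at://")
            .map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
        Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
        Link::Uri(uri) => Some(uri),
    }
}

fn main() {
    // hypothetical example values
    let post = Link::AtUri("at://did:plc:example/app.bsky.feed.post/3kexamplerkey".to_string());
    let did = Link::Did("did:plc:example".to_string());
    let plain = Link::Uri("https://example.com/some-page".to_string());
    assert_eq!(
        browse_url(post).as_deref(),
        Some("https://pdsls.dev/at://did:plc:example/app.bsky.feed.post/3kexamplerkey")
    );
    assert_eq!(
        browse_url(did).as_deref(),
        Some("https://pdsls.dev/at://did:plc:example")
    );
    assert_eq!(browse_url(plain).as_deref(), Some("https://example.com/some-page"));
}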

constellation/templates/dids.html.j2 (+1 -1)
···
 {% for did in linking_dids %}
 <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
  -> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
- -> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
+ -> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre>
 {% endfor %}

 {% if let Some(c) = cursor %}

slingshot/src/firehose_cache.rs (+4 -2)
···

 pub async fn firehose_cache(
     cache_dir: impl AsRef<Path>,
+    memory_mb: usize,
+    disk_gb: usize,
 ) -> Result<HybridCache<String, CachedRecord>, String> {
     let cache = HybridCacheBuilder::new()
         .with_name("firehose")
-        .memory(64 * 2_usize.pow(20))
+        .memory(memory_mb * 2_usize.pow(20))
         .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v))
         .storage(Engine::large())
         .with_device_options(
             DirectFsDeviceOptions::new(cache_dir)
-                .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
+                .with_capacity(disk_gb * 2_usize.pow(30))
                 .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
         )
         .build()
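
The previously hard-coded capacities (64 MiB of memory, 1 GiB of disk) become parameters, with the unit math unchanged: megabytes scale by 2^20 and gigabytes by 2^30. A quick sanity check of what the new defaults work out to in bytes, with the default values taken from the CLI flags in main.rs below:

fn main() {
    // Defaults from the new CLI flags in main.rs: --cache-memory-mb 64, --cache-disk-gb 1.
    let memory_mb: usize = 64;
    let disk_gb: usize = 1;
    assert_eq!(memory_mb * 2_usize.pow(20), 67_108_864); // 64 MiB in-memory cache
    assert_eq!(disk_gb * 2_usize.pow(30), 1_073_741_824); // 1 GiB disk cache
    // The unchanged 16 MiB file size still caps the largest record the disk cache can hold.
    assert_eq!(16 * 2_usize.pow(20), 16_777_216);
    println!("defaults match the previously hard-coded capacities");
}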

slingshot/src/main.rs (+26 -5)
···
     /// where to keep disk caches
     #[arg(long)]
     cache_dir: PathBuf,
+    /// memory cache size in MB
+    #[arg(long, default_value_t = 64)]
+    cache_memory_mb: usize,
+    /// disk cache size in GB
+    #[arg(long, default_value_t = 1)]
+    cache_disk_gb: usize,
+    /// host for HTTP server (when not using --domain)
+    #[arg(long, default_value = "127.0.0.1")]
+    host: String,
+    /// port for HTTP server (when not using --domain)
+    #[arg(long, default_value_t = 3000)]
+    port: u16,
+    /// port for metrics/prometheus server
+    #[arg(long, default_value_t = 8765)]
+    metrics_port: u16,
     /// the domain pointing to this server
     ///
     /// if present:
···

     let args = Args::parse();

-    if let Err(e) = install_metrics_server() {
+    if let Err(e) = install_metrics_server(args.metrics_port) {
         log::error!("failed to install metrics server: {e:?}");
     } else {
-        log::info!("metrics listening at http://0.0.0.0:8765");
+        log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
     }

     std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
···
     log::info!("cache dir ready at at {cache_dir:?}.");

     log::info!("setting up firehose cache...");
-    let cache = firehose_cache(cache_dir.join("./firehose")).await?;
+    let cache = firehose_cache(
+        cache_dir.join("./firehose"),
+        args.cache_memory_mb,
+        args.cache_disk_gb,
+    )
+    .await?;
     log::info!("firehose cache ready.");

     let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
···
         args.domain,
         args.acme_contact,
         args.certs,
+        args.host,
+        args.port,
         server_shutdown,
     )
     .await?;
···
     Ok(())
 }

-fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
+fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> {
     log::info!("installing metrics server...");
     let host = [0, 0, 0, 0];
-    let port = 8765;
     PrometheusBuilder::new()
         .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
         .set_bucket_duration(std::time::Duration::from_secs(300))?
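
Taken together, a deployment that does not use --domain can now be tuned entirely from the command line, for example (binary name and flag values are illustrative):

slingshot --cache-dir /var/cache/slingshot --cache-memory-mb 256 --cache-disk-gb 8 --host 0.0.0.0 --port 8080 --metrics-port 9100

Leaving the new flags off preserves the previous behavior: a 64 MB memory / 1 GB disk cache, the HTTP server on 127.0.0.1:3000, and the Prometheus endpoint on 0.0.0.0:8765.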

slingshot/src/server.rs (+19 -12)
···
             Ok(did) => did,
             Err(_) => {
                 let Ok(alleged_handle) = Handle::new(identifier) else {
-                    return invalid("identifier was not a valid DID or handle");
+                    return invalid("Identifier was not a valid DID or handle");
                 };

                 match self.identity.handle_to_did(alleged_handle.clone()).await {
···
                     Err(e) => {
                         log::debug!("failed to resolve handle: {e}");
                         // TODO: ServerError not BadRequest
-                        return invalid("errored while trying to resolve handle to DID");
+                        return invalid("Errored while trying to resolve handle to DID");
                     }
                 }
             }
         };
         let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
-            return invalid("failed to get DID doc");
+            return invalid("Failed to get DID doc");
         };
         let Some(partial_doc) = partial_doc else {
-            return invalid("failed to find DID doc");
+            return invalid("Failed to find DID doc");
         };

         // ok so here's where we're at:
···
             .handle_to_did(partial_doc.unverified_handle.clone())
             .await
         else {
-            return invalid("failed to get did doc's handle");
+            return invalid("Failed to get DID doc's handle");
         };
         let Some(handle_did) = handle_did else {
-            return invalid("failed to resolve did doc's handle");
+            return invalid("Failed to resolve DID doc's handle");
         };
         if handle_did == did {
             partial_doc.unverified_handle.to_string()
···
         let Ok(handle) = Handle::new(repo) else {
             return GetRecordResponse::BadRequest(xrpc_error(
                 "InvalidRequest",
-                "repo was not a valid DID or handle",
+                "Repo was not a valid DID or handle",
             ));
         };
         match self.identity.handle_to_did(handle).await {
···
                 log::debug!("handle resolution failed: {e}");
                 return GetRecordResponse::ServerError(xrpc_error(
                     "ResolutionFailed",
-                    "errored while trying to resolve handle to DID",
+                    "Errored while trying to resolve handle to DID",
                 ));
             }
         }
···
         let Ok(collection) = Nsid::new(collection) else {
             return GetRecordResponse::BadRequest(xrpc_error(
                 "InvalidRequest",
-                "invalid NSID for collection",
+                "Invalid NSID for collection",
             ));
         };

         let Ok(rkey) = RecordKey::new(rkey) else {
-            return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
+            return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
         };

         let cid: Option<Cid> = if let Some(cid) = cid {
             let Ok(cid) = Cid::from_str(&cid) else {
-                return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
+                return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
             };
             Some(cid)
         } else {
···
     domain: Option<String>,
     acme_contact: Option<String>,
     certs: Option<PathBuf>,
+    host: String,
+    port: u16,
     shutdown: CancellationToken,
 ) -> Result<(), ServerError> {
     let repo = Arc::new(repo);
···
         )
         .await
     } else {
-        run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await
+        run(
+            TcpListener::bind(format!("{host}:{port}")),
+            app,
+            shutdown,
+        )
+        .await
     }
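
Beyond the error-message capitalization, the functional change here is that serve() now receives host and port and binds them when no --domain is configured, rather than the hard-coded 127.0.0.1:3000. A minimal sketch of that bind, assuming tokio's TcpListener (the listener type actually passed to run() may differ), with the defaults reproducing the old address:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-ins for the new host/port arguments; the defaults reproduce the
    // previously hard-coded "127.0.0.1:3000".
    let host = "127.0.0.1".to_string();
    let port: u16 = 3000;
    let listener = TcpListener::bind(format!("{host}:{port}")).await?;
    println!("would serve the app on {}", listener.local_addr()?);
    Ok(())
}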