Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Changed files: +67 -32
constellation/src/consumer/jetstream.rs (+13 -6)

···
                 println!("jetstream closed the websocket cleanly.");
                 break;
             }
-            r => eprintln!("jetstream: close result after error: {r:?}"),
+            Err(_) => {
+                counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
+                println!("jetstream failed to close the websocket cleanly.");
+                break;
+            }
+            Ok(r) => {
+                eprintln!("jetstream: close result after error: {r:?}");
+                counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
+                    .increment(1);
+                // if we didn't immediately get ConnectionClosed, we should keep polling read
+                // until we get it.
+                continue;
+            }
         }
-        counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
-            .increment(1);
-        // if we didn't immediately get ConnectionClosed, we should keep polling read
-        // until we get it.
-        continue;
     }
 };
constellation/src/server/filters.rs (+4 -6)

···
 Ok({
     if let Some(link) = parse_any_link(s) {
         match link {
-            Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
-                format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
-            }),
-            Link::Did(did) => Some(format!(
-                "https://atproto-browser-plus-links.vercel.app/at/{did}"
-            )),
+            Link::AtUri(at_uri) => at_uri
+                .strip_prefix("at://")
+                .map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
+            Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
             Link::Uri(uri) => Some(uri),
         }
     } else {
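The filters change above swaps the external record browser from atproto-browser-plus-links.vercel.app to pdsls.dev. A minimal, self-contained sketch of the resulting URL mapping, using a hypothetical helper and made-up identifiers rather than the crate's actual Link/parse_any_link types:

    // Sketch only (hypothetical helper, not from the crate): the pdsls.dev mapping
    // now produced for AT-URIs and bare DIDs.
    fn to_pdsls(identifier: &str) -> String {
        // drop the at:// scheme if present, then point at the pdsls.dev record browser
        let noproto = identifier.strip_prefix("at://").unwrap_or(identifier);
        format!("https://pdsls.dev/at://{noproto}")
    }

    fn main() {
        assert_eq!(
            to_pdsls("at://did:plc:abc123/app.bsky.feed.post/3kxyz"),
            "https://pdsls.dev/at://did:plc:abc123/app.bsky.feed.post/3kxyz"
        );
        assert_eq!(to_pdsls("did:plc:abc123"), "https://pdsls.dev/at://did:plc:abc123");
    }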
constellation/templates/dids.html.j2 (+1 -1)

···
 {% for did in linking_dids %}
 <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
   -> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
-  -> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
+  -> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre>
 {% endfor %}

 {% if let Some(c) = cursor %}
slingshot/src/firehose_cache.rs (+4 -2)

···
 pub async fn firehose_cache(
     cache_dir: impl AsRef<Path>,
+    memory_mb: usize,
+    disk_gb: usize,
 ) -> Result<HybridCache<String, CachedRecord>, String> {
     let cache = HybridCacheBuilder::new()
         .with_name("firehose")
-        .memory(64 * 2_usize.pow(20))
+        .memory(memory_mb * 2_usize.pow(20))
         .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v))
         .storage(Engine::large())
         .with_device_options(
             DirectFsDeviceOptions::new(cache_dir)
-                .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
+                .with_capacity(disk_gb * 2_usize.pow(30))
                 .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
         )
         .build()
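firehose_cache now takes its sizes as whole megabytes and gigabytes and converts them to bytes inside the builder (binary units: 2_usize.pow(20) and 2_usize.pow(30)). A small self-contained check of that arithmetic, using the new CLI defaults as example values:

    // Sketch of the size conversion firehose_cache performs internally.
    fn main() {
        let memory_mb: usize = 64; // default of the new --cache-memory-mb flag
        let disk_gb: usize = 1;    // default of the new --cache-disk-gb flag
        assert_eq!(memory_mb * 2_usize.pow(20), 67_108_864);  // 64 MiB in bytes
        assert_eq!(disk_gb * 2_usize.pow(30), 1_073_741_824); // 1 GiB in bytes
    }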
slingshot/src/main.rs (+26 -5)

···
     /// where to keep disk caches
     #[arg(long)]
     cache_dir: PathBuf,
+    /// memory cache size in MB
+    #[arg(long, default_value_t = 64)]
+    cache_memory_mb: usize,
+    /// disk cache size in GB
+    #[arg(long, default_value_t = 1)]
+    cache_disk_gb: usize,
+    /// host for HTTP server (when not using --domain)
+    #[arg(long, default_value = "127.0.0.1")]
+    host: String,
+    /// port for HTTP server (when not using --domain)
+    #[arg(long, default_value_t = 3000)]
+    port: u16,
+    /// port for metrics/prometheus server
+    #[arg(long, default_value_t = 8765)]
+    metrics_port: u16,
     /// the domain pointing to this server
     ///
     /// if present:
···
     let args = Args::parse();

-    if let Err(e) = install_metrics_server() {
+    if let Err(e) = install_metrics_server(args.metrics_port) {
         log::error!("failed to install metrics server: {e:?}");
     } else {
-        log::info!("metrics listening at http://0.0.0.0:8765");
+        log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
     }

     std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
···
     log::info!("cache dir ready at at {cache_dir:?}.");

     log::info!("setting up firehose cache...");
-    let cache = firehose_cache(cache_dir.join("./firehose")).await?;
+    let cache = firehose_cache(
+        cache_dir.join("./firehose"),
+        args.cache_memory_mb,
+        args.cache_disk_gb,
+    )
+    .await?;
     log::info!("firehose cache ready.");

     let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
···
         args.domain,
         args.acme_contact,
         args.certs,
+        args.host,
+        args.port,
         server_shutdown,
     )
     .await?;
···
     Ok(())
 }

-fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
+fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> {
     log::info!("installing metrics server...");
     let host = [0, 0, 0, 0];
-    let port = 8765;
     PrometheusBuilder::new()
         .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
         .set_bucket_duration(std::time::Duration::from_secs(300))?
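The new flags default to the values that were previously hard-coded, so existing invocations keep their old behavior. A hypothetical check of those defaults (it assumes Args derives clap::Parser, as Args::parse() above implies, and that --cache-dir is the only other required argument, which the truncated struct does not confirm):

    // Hypothetical sketch, not part of the diff: the new flags' defaults match the
    // values that used to be hard-coded.
    fn check_defaults() {
        use clap::Parser;
        let args = Args::parse_from(["slingshot", "--cache-dir", "/tmp/slingshot"]);
        assert_eq!(args.cache_memory_mb, 64); // was .memory(64 * 2_usize.pow(20))
        assert_eq!(args.cache_disk_gb, 1);    // was .with_capacity(2_usize.pow(30))
        assert_eq!(args.host, "127.0.0.1");   // was TcpListener::bind("127.0.0.1:3000")
        assert_eq!(args.port, 3000);
        assert_eq!(args.metrics_port, 8765);  // was `let port = 8765;`
    }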
slingshot/src/server.rs (+19 -12)

···
             Ok(did) => did,
             Err(_) => {
                 let Ok(alleged_handle) = Handle::new(identifier) else {
-                    return invalid("identifier was not a valid DID or handle");
+                    return invalid("Identifier was not a valid DID or handle");
                 };

                 match self.identity.handle_to_did(alleged_handle.clone()).await {
···
                     Err(e) => {
                         log::debug!("failed to resolve handle: {e}");
                         // TODO: ServerError not BadRequest
-                        return invalid("errored while trying to resolve handle to DID");
+                        return invalid("Errored while trying to resolve handle to DID");
                     }
                 }
             }
         };
         let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
-            return invalid("failed to get DID doc");
+            return invalid("Failed to get DID doc");
         };
         let Some(partial_doc) = partial_doc else {
-            return invalid("failed to find DID doc");
+            return invalid("Failed to find DID doc");
         };

         // ok so here's where we're at:
···
             .handle_to_did(partial_doc.unverified_handle.clone())
             .await
         else {
-            return invalid("failed to get did doc's handle");
+            return invalid("Failed to get DID doc's handle");
         };
         let Some(handle_did) = handle_did else {
-            return invalid("failed to resolve did doc's handle");
+            return invalid("Failed to resolve DID doc's handle");
         };
         if handle_did == did {
             partial_doc.unverified_handle.to_string()
···
                 let Ok(handle) = Handle::new(repo) else {
                     return GetRecordResponse::BadRequest(xrpc_error(
                         "InvalidRequest",
-                        "repo was not a valid DID or handle",
+                        "Repo was not a valid DID or handle",
                     ));
                 };
                 match self.identity.handle_to_did(handle).await {
···
                         log::debug!("handle resolution failed: {e}");
                         return GetRecordResponse::ServerError(xrpc_error(
                             "ResolutionFailed",
-                            "errored while trying to resolve handle to DID",
+                            "Errored while trying to resolve handle to DID",
                         ));
                     }
                 }
···
         let Ok(collection) = Nsid::new(collection) else {
             return GetRecordResponse::BadRequest(xrpc_error(
                 "InvalidRequest",
-                "invalid NSID for collection",
+                "Invalid NSID for collection",
             ));
         };

         let Ok(rkey) = RecordKey::new(rkey) else {
-            return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
+            return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
         };

         let cid: Option<Cid> = if let Some(cid) = cid {
             let Ok(cid) = Cid::from_str(&cid) else {
-                return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
+                return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
             };
             Some(cid)
         } else {
···
     domain: Option<String>,
     acme_contact: Option<String>,
     certs: Option<PathBuf>,
+    host: String,
+    port: u16,
     shutdown: CancellationToken,
 ) -> Result<(), ServerError> {
     let repo = Arc::new(repo);
···
         )
         .await
     } else {
-        run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await
+        run(
+            TcpListener::bind(format!("{host}:{port}")),
+            app,
+            shutdown,
+        )
+        .await
     }
 }