at main 519 lines 18 kB view raw
1use crate::cache_impl; 2use crate::fetch::Fetcher; 3#[cfg(all(feature = "fullstack-server", feature = "server"))] 4use axum::Extension; 5use dioxus::prelude::*; 6use dioxus::{CapturedError, Result}; 7use jacquard::{ 8 IntoStatic, 9 bytes::Bytes, 10 prelude::*, 11 smol_str::{SmolStr, format_smolstr}, 12 types::{cid::Cid, collection::Collection, ident::AtIdentifier, nsid::Nsid, string::Rkey}, 13 xrpc::XrpcExt, 14}; 15use std::{sync::Arc, time::Duration}; 16use weaver_api::com_atproto::repo::get_record::GetRecord; 17use weaver_api::com_atproto::sync::get_blob::GetBlob; 18use weaver_api::sh_weaver::notebook::entry::Entry; 19use weaver_api::sh_weaver::publish::blob::Blob as PublishedBlob; 20use weaver_common::WeaverExt; 21 22#[derive(Clone)] 23pub struct BlobCache { 24 fetcher: Arc<Fetcher>, 25 cache: cache_impl::Cache<Cid<'static>, Bytes>, 26 map: cache_impl::Cache<SmolStr, Cid<'static>>, 27} 28 29impl BlobCache { 30 pub fn new(fetcher: Arc<Fetcher>) -> Self { 31 let cache = cache_impl::new_cache(100, Duration::from_secs(12000)); 32 let map = cache_impl::new_cache(500, Duration::from_secs(12000)); 33 34 Self { 35 fetcher, 36 cache, 37 map, 38 } 39 } 40 41 /// Resolve DID and PDS URL from an identifier 42 async fn resolve_ident( 43 &self, 44 ident: &AtIdentifier<'_>, 45 ) -> Result<(jacquard::types::string::Did<'static>, jacquard::url::Url)> { 46 match ident { 47 AtIdentifier::Did(did) => { 48 let pds = self.fetcher.pds_for_did(did).await?; 49 Ok((did.clone().into_static(), pds)) 50 } 51 AtIdentifier::Handle(handle) => { 52 let (did, pds) = self.fetcher.pds_for_handle(handle).await?; 53 Ok((did, pds)) 54 } 55 } 56 } 57 58 /// Fetch a blob by CID from a specific DID's PDS 59 async fn fetch_blob( 60 &self, 61 did: &jacquard::types::string::Did<'_>, 62 pds_url: jacquard::url::Url, 63 cid: &Cid<'_>, 64 ) -> Result<Bytes> { 65 match self 66 .fetcher 67 .xrpc(pds_url.clone()) 68 .send(&GetBlob::new().cid(cid.clone()).did(did.clone()).build()) 69 .await 70 { 71 
Ok(blob_stream) => Ok(blob_stream.buffer().clone()), 72 Err(e) => { 73 tracing::warn!( 74 did = %did, 75 cid = %cid, 76 pds = %pds_url, 77 error = %e, 78 "PDS blob fetch failed, falling back to Bluesky CDN" 79 ); 80 // Fallback to Bluesky CDN (works for blobs stored on bsky PDSes) 81 let bytes = reqwest::get(format!( 82 "https://cdn.bsky.app/img/feed_fullsize/plain/{}/{}@jpeg", 83 did, cid 84 )) 85 .await? 86 .bytes() 87 .await?; 88 Ok(bytes) 89 } 90 } 91 } 92 93 pub async fn cache( 94 &self, 95 ident: AtIdentifier<'static>, 96 cid: Cid<'static>, 97 name: Option<SmolStr>, 98 ) -> Result<()> { 99 let (repo_did, pds_url) = self.resolve_ident(&ident).await?; 100 101 if self.get_cid(&cid).is_some() { 102 return Ok(()); 103 } 104 105 let blob = self.fetch_blob(&repo_did, pds_url, &cid).await?; 106 107 self.cache.insert(cid.clone(), blob); 108 if let Some(name) = name { 109 self.map.insert(name, cid); 110 } 111 112 Ok(()) 113 } 114 115 /// Resolve an image from a published entry by name. 116 /// 117 /// Looks up the entry record at `{ident}/sh.weaver.notebook.entry/{rkey}`, 118 /// finds the image by name in the embeds, and returns the blob bytes. 119 pub async fn resolve_from_entry( 120 &self, 121 ident: &AtIdentifier<'_>, 122 rkey: &str, 123 name: &str, 124 ) -> Result<Bytes> { 125 let (repo_did, pds_url) = self.resolve_ident(ident).await?; 126 127 // Fetch the entry record 128 let resp = self 129 .fetcher 130 .xrpc(pds_url.clone()) 131 .send( 132 &GetRecord::new() 133 .repo(AtIdentifier::Did(repo_did.clone())) 134 .collection(Nsid::raw(<Entry as Collection>::NSID)) 135 .rkey(Rkey::new(rkey).map_err(|e| CapturedError::from_display(e))?) 
136 .build(), 137 ) 138 .await 139 .map_err(|e| { 140 CapturedError::from_display( 141 format_smolstr!("Failed to fetch entry: {}", e) 142 .as_str() 143 .to_string(), 144 ) 145 })?; 146 147 let record = resp.into_output().map_err(|e| { 148 CapturedError::from_display( 149 format_smolstr!("Failed to parse entry: {}", e) 150 .as_str() 151 .to_string(), 152 ) 153 })?; 154 155 // Parse the entry 156 let entry: Entry = jacquard::from_data(&record.value).map_err(|e| { 157 CapturedError::from_display( 158 format_smolstr!("Failed to deserialize entry: {}", e) 159 .as_str() 160 .to_string(), 161 ) 162 })?; 163 164 // Find the image by name 165 let cid = entry 166 .embeds 167 .as_ref() 168 .and_then(|e| e.images.as_ref()) 169 .and_then(|imgs| { 170 imgs.images 171 .iter() 172 .find(|img| img.name.as_ref().map(|n| n.as_ref()) == Some(name)) 173 }) 174 .map(|img| img.image.blob().cid().clone().into_static()) 175 .ok_or_else(|| { 176 CapturedError::from_display( 177 format_smolstr!("Image '{}' not found in entry", name) 178 .as_str() 179 .to_string(), 180 ) 181 })?; 182 183 // Check cache first 184 if let Some(bytes) = self.get_cid(&cid) { 185 return Ok(bytes); 186 } 187 188 // Fetch and cache the blob 189 let blob = self.fetch_blob(&repo_did, pds_url, &cid).await?; 190 self.cache.insert(cid.clone(), blob.clone()); 191 self.map.insert(name.into(), cid); 192 193 Ok(blob) 194 } 195 196 /// Resolve an image from a draft (unpublished) entry via PublishedBlob record. 197 /// 198 /// Looks up the PublishedBlob record at `{ident}/sh.weaver.publish.blob/{blob_rkey}`, 199 /// gets the CID from it, and returns the blob bytes. 
200 pub async fn resolve_from_draft( 201 &self, 202 ident: &AtIdentifier<'_>, 203 blob_rkey: &str, 204 ) -> Result<Bytes> { 205 let (repo_did, pds_url) = self.resolve_ident(ident).await?; 206 207 // Fetch the PublishedBlob record 208 let resp = self 209 .fetcher 210 .xrpc(pds_url.clone()) 211 .send( 212 &GetRecord::new() 213 .repo(AtIdentifier::Did(repo_did.clone())) 214 .collection(Nsid::raw(<PublishedBlob as Collection>::NSID)) 215 .rkey(Rkey::new(blob_rkey).map_err(|e| CapturedError::from_display(e))?) 216 .build(), 217 ) 218 .await 219 .map_err(|e| { 220 CapturedError::from_display( 221 format_smolstr!("Failed to fetch PublishedBlob: {}", e) 222 .as_str() 223 .to_string(), 224 ) 225 })?; 226 227 let record = resp.into_output().map_err(|e| { 228 CapturedError::from_display( 229 format_smolstr!("Failed to parse PublishedBlob: {}", e) 230 .as_str() 231 .to_string(), 232 ) 233 })?; 234 235 // Parse the PublishedBlob 236 let published: PublishedBlob = jacquard::from_data(&record.value).map_err(|e| { 237 CapturedError::from_display( 238 format_smolstr!("Failed to deserialize PublishedBlob: {}", e) 239 .as_str() 240 .to_string(), 241 ) 242 })?; 243 244 // Get CID from the upload blob ref 245 let cid = published.upload.blob().cid().clone().into_static(); 246 247 // Check cache first 248 if let Some(bytes) = self.get_cid(&cid) { 249 return Ok(bytes); 250 } 251 252 // Fetch and cache the blob 253 let blob = self.fetch_blob(&repo_did, pds_url, &cid).await?; 254 self.cache.insert(cid, blob.clone()); 255 256 Ok(blob) 257 } 258 259 /// Resolve an image from a notebook entry by name. 260 /// 261 /// Looks up the notebook by title or path, iterates through entries to find 262 /// the image by name, and returns the blob bytes. Used for `/image/{notebook}/{name}` paths. 263 /// Cache key uses `{notebook_key}_{image_name}` to avoid collisions across notebooks. 
264 pub async fn resolve_from_notebook( 265 &self, 266 notebook_key: &str, 267 image_name: &str, 268 ) -> Result<Bytes> { 269 // Try scoped cache key first: {notebook_key}_{image_name} 270 let cache_key = format_smolstr!("{}_{}", notebook_key, image_name); 271 if let Some(bytes) = self.get_named(&cache_key) { 272 return Ok(bytes); 273 } 274 275 // Use Fetcher's notebook lookup (works with title or path) 276 let notebook = self 277 .fetcher 278 .get_notebook_by_key(notebook_key) 279 .await? 280 .ok_or_else(|| { 281 CapturedError::from_display( 282 format_smolstr!("Notebook '{}' not found", notebook_key) 283 .as_str() 284 .to_string(), 285 ) 286 })?; 287 288 let (view, entry_refs) = notebook.as_ref(); 289 290 // Get the DID from the notebook URI for blob fetching 291 let notebook_did = jacquard::types::aturi::AtUri::new(view.uri.as_ref()) 292 .map_err(|e| { 293 CapturedError::from_display( 294 format_smolstr!("Invalid notebook URI: {}", e) 295 .as_str() 296 .to_string(), 297 ) 298 })? 299 .authority() 300 .clone() 301 .into_static(); 302 let repo_did = match &notebook_did { 303 AtIdentifier::Did(d) => d.clone(), 304 AtIdentifier::Handle(h) => self 305 .fetcher 306 .resolve_handle(h) 307 .await 308 .map_err(|e| CapturedError::from_display(e))?, 309 }; 310 let pds_url = self 311 .fetcher 312 .pds_for_did(&repo_did) 313 .await 314 .map_err(|e| CapturedError::from_display(e))?; 315 316 // Iterate through entries to find the image 317 let client = self.fetcher.get_client(); 318 for entry_ref in entry_refs { 319 // Parse the entry URI to get rkey 320 let entry_uri = jacquard::types::aturi::AtUri::new(entry_ref.entry.uri.as_ref()) 321 .map_err(|e| { 322 CapturedError::from_display( 323 format_smolstr!("Invalid entry URI: {}", e) 324 .as_str() 325 .to_string(), 326 ) 327 })?; 328 let rkey = entry_uri 329 .rkey() 330 .ok_or_else(|| CapturedError::from_display("Entry URI missing rkey"))?; 331 332 // Fetch entry using client's cached method 333 let (_entry_view, entry) = match 
client 334 .fetch_entry_by_rkey(&notebook_did, rkey.0.as_str()) 335 .await 336 { 337 Ok(result) => result, 338 Err(_) => continue, 339 }; 340 341 // Check if this entry has the image we're looking for 342 if let Some(embeds) = &entry.embeds { 343 if let Some(images) = &embeds.images { 344 if let Some(img) = images 345 .images 346 .iter() 347 .find(|i| i.name.as_deref() == Some(image_name)) 348 { 349 let cid = img.image.blob().cid().clone().into_static(); 350 351 // Check blob cache 352 if let Some(bytes) = self.get_cid(&cid) { 353 // Also cache with scoped key for next time 354 self.map.insert(cache_key, cid); 355 return Ok(bytes); 356 } 357 358 // Fetch and cache the blob 359 let blob = self.fetch_blob(&repo_did, pds_url, &cid).await?; 360 self.cache.insert(cid.clone(), blob.clone()); 361 self.map.insert(cache_key, cid); 362 return Ok(blob); 363 } 364 } 365 } 366 } 367 368 Err(CapturedError::from_display( 369 format_smolstr!( 370 "Image '{}' not found in notebook '{}'", 371 image_name, 372 notebook_key 373 ) 374 .as_str() 375 .to_string(), 376 )) 377 } 378 379 /// Insert bytes directly into cache (for pre-warming after upload) 380 pub fn insert_bytes(&self, cid: Cid<'static>, bytes: Bytes, name: Option<SmolStr>) { 381 self.cache.insert(cid.clone(), bytes); 382 if let Some(name) = name { 383 self.map.insert(name, cid); 384 } 385 } 386 387 pub fn get_cid(&self, cid: &Cid<'static>) -> Option<Bytes> { 388 self.cache.get(cid) 389 } 390 391 pub fn get_named(&self, name: &SmolStr) -> Option<Bytes> { 392 self.map.get(name).and_then(|cid| self.cache.get(&cid)) 393 } 394} 395 396/// Build an image response with appropriate headers for immutable blobs. 
#[cfg(all(feature = "fullstack-server", feature = "server"))]
fn build_image_response(bytes: jacquard::bytes::Bytes) -> axum::response::Response {
    use axum::{
        http::header::{CACHE_CONTROL, CONTENT_TYPE},
        response::IntoResponse,
    };
    use mime_sniffer::MimeTypeSniffer;

    // Sniff the media type from the leading bytes. If sniffing fails, fall back
    // to `image/jpeg`, the registered JPEG type (`image/jpg` is not registered).
    let mime = bytes.sniff_mime_type().unwrap_or("image/jpeg").to_string();
    // NOTE(review): every route that uses this helper gets a one-year immutable
    // policy. That includes name-based URLs whose CID can change when an image is
    // replaced; only `/blob/{cid}` is truly content-addressed.
    let headers = [
        (CONTENT_TYPE, mime),
        (
            CACHE_CONTROL,
            "public, max-age=31536000, immutable".to_string(),
        ),
    ];
    (headers, bytes).into_response()
}

/// Return a 404 response for missing images.
#[cfg(all(feature = "fullstack-server", feature = "server"))]
fn image_not_found() -> axum::response::Response {
    use axum::{http::StatusCode, response::IntoResponse};
    (StatusCode::NOT_FOUND, "Image not found").into_response()
}

/// Serves an image by name within a notebook (`/{notebook}/image/{name}`).
///
/// Checks the unscoped name index first, then falls back to walking the
/// notebook's entries. Any resolution failure becomes a 404; the cause is
/// logged at debug level.
#[cfg(all(feature = "fullstack-server", feature = "server"))]
#[get("/{notebook}/image/{name}", blob_cache: Extension<Arc<crate::blobcache::BlobCache>>)]
pub async fn image_named(notebook: SmolStr, name: SmolStr) -> Result<axum::response::Response> {
    if let Some(bytes) = blob_cache.get_named(&name) {
        return Ok(build_image_response(bytes));
    }

    match blob_cache.resolve_from_notebook(&notebook, &name).await {
        Ok(bytes) => Ok(build_image_response(bytes)),
        Err(e) => {
            tracing::debug!(
                notebook = %notebook,
                name = %name,
                error = ?e,
                "notebook image lookup failed; responding 404"
            );
            Ok(image_not_found())
        }
    }
}

/// Serves a blob by CID (`/{_notebook}/blob/{cid}`) from the cache only.
///
/// A malformed CID, or one that is not cached, yields a 404. This route never
/// fetches from the network.
#[cfg(all(feature = "fullstack-server", feature = "server"))]
#[get("/{_notebook}/blob/{cid}", blob_cache: Extension<Arc<crate::blobcache::BlobCache>>)]
pub async fn blob(_notebook: SmolStr, cid: SmolStr) -> Result<axum::response::Response> {
    let cached = Cid::new_owned(cid.as_bytes())
        .ok()
        .and_then(|parsed| blob_cache.get_cid(&parsed));
    Ok(cached.map_or_else(image_not_found, build_image_response))
}

// Route: /image/{notebook}/{name} - notebook entry image by name
#[cfg(all(feature = "fullstack-server", feature = "server"))]
#[get("/image/{notebook}/{name}", blob_cache: Extension<Arc<crate::blobcache::BlobCache>>)]
pub async fn image_notebook(notebook: SmolStr, name: SmolStr) -> Result<axum::response::Response> {
    // Try name-based lookup first (backwards compat with cached entries).
    // NOTE(review): this index is not scoped per notebook, so a same-named image
    // cached from a different notebook is served here.
    if let Some(bytes) = blob_cache.get_named(&name) {
        return Ok(build_image_response(bytes));
    }

    let resolved = blob_cache.resolve_from_notebook(&notebook, &name).await;
    Ok(image_response_or_404(resolved, "notebook"))
}

// Route: /image/{ident}/draft/{blob_rkey} - draft image (unpublished)
#[cfg(all(feature = "fullstack-server", feature = "server"))]
#[get("/image/{ident}/draft/{blob_rkey}", blob_cache: Extension<Arc<crate::blobcache::BlobCache>>)]
pub async fn image_draft(ident: SmolStr, blob_rkey: SmolStr) -> Result<axum::response::Response> {
    Ok(serve_draft_image(&blob_cache, ident, &blob_rkey).await)
}

// Route: /image/{ident}/draft/{blob_rkey}/{name} - draft image with name (name is decorative)
#[cfg(all(feature = "fullstack-server", feature = "server"))]
#[get("/image/{ident}/draft/{blob_rkey}/{_name}", blob_cache: Extension<Arc<crate::blobcache::BlobCache>>)]
pub async fn image_draft_named(
    ident: SmolStr,
    blob_rkey: SmolStr,
    _name: SmolStr,
) -> Result<axum::response::Response> {
    // The trailing name segment only makes the URL readable; it is not used.
    Ok(serve_draft_image(&blob_cache, ident, &blob_rkey).await)
}

// Route: /image/{ident}/{rkey}/{name} - published entry image
#[cfg(all(feature = "fullstack-server", feature = "server"))]
#[get("/image/{ident}/{rkey}/{name}", blob_cache: Extension<Arc<crate::blobcache::BlobCache>>)]
pub async fn image_entry(
    ident: SmolStr,
    rkey: SmolStr,
    name: SmolStr,
) -> Result<axum::response::Response> {
    let Ok(at_ident) = AtIdentifier::new_owned(ident.clone()) else {
        return Ok(image_not_found());
    };

    let resolved = blob_cache.resolve_from_entry(&at_ident, &rkey, &name).await;
    Ok(image_response_or_404(resolved, "entry"))
}

/// Resolves a draft image through its `PublishedBlob` record. Shared by both
/// draft routes. An unparsable `ident` or any resolution failure yields a 404.
#[cfg(all(feature = "fullstack-server", feature = "server"))]
async fn serve_draft_image(
    blob_cache: &crate::blobcache::BlobCache,
    ident: SmolStr,
    blob_rkey: &str,
) -> axum::response::Response {
    let Ok(at_ident) = AtIdentifier::new_owned(ident) else {
        return image_not_found();
    };
    let resolved = blob_cache.resolve_from_draft(&at_ident, blob_rkey).await;
    image_response_or_404(resolved, "draft")
}

/// Turns a resolver result into a response: the image on success, otherwise a
/// 404. The failure cause is logged at debug level instead of being discarded.
#[cfg(all(feature = "fullstack-server", feature = "server"))]
fn image_response_or_404(resolved: Result<Bytes>, route: &str) -> axum::response::Response {
    match resolved {
        Ok(bytes) => build_image_response(bytes),
        Err(e) => {
            tracing::debug!(route, error = ?e, "image resolution failed; responding 404");
            image_not_found()
        }
    }
}