use anyhow::{anyhow, Result}; use jacquard::{ api::com_atproto::repo::{describe_repo::DescribeRepo, list_records::ListRecords}, client::{credential_session::CredentialSession, Agent, BasicClient, MemorySessionStore}, identity::{resolver::IdentityResolver, slingshot_resolver_default}, prelude::*, types::{did::Did, nsid::Nsid, string::Handle}, }; use scc::{HashMap, HashSet}; use url::Url; use std::{ collections::HashMap as StdHashMap, fmt::Debug, io::{self, ErrorKind}, sync::Arc, }; pub mod cli; #[cfg(target_os = "linux")] pub mod fuse; pub async fn resolve_did(identifier: &str) -> Result> { if let Ok(did) = Did::new_owned(identifier) { return Ok(did); } let handle = Handle::new(identifier).map_err(|e| anyhow!("invalid handle: {}", e))?; let did = slingshot_resolver_default() .resolve_handle(&handle) .await .map_err(|e| anyhow!("resolution failed: {}", e))?; Ok(did) } pub async fn resolve_pds(did: &Did<'_>) -> Result { slingshot_resolver_default() .resolve_did_doc(did) .await? .parse()? .pds_endpoint() .ok_or_else(|| anyhow!("no pds endpoint in did doc")) } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FileType { File, Directory, } #[derive(Debug, Clone)] pub struct Metadata { pub file_type: FileType, pub len: u64, } #[derive(Debug)] struct CachedPage { files: StdHashMap>>, next_cursor: Option, } pub struct AtpFS { did: Did<'static>, client: BasicClient, cache: HashMap>, root_cache: HashSet, } impl Debug for AtpFS { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("AtpFS").field("did", &self.did).finish() } } impl AtpFS { pub async fn new(did: Did<'static>, pds: Url) -> Self { let store = MemorySessionStore::default(); let session = CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default())); session.set_endpoint(pds).await; Self { did, client: Agent::new(session), cache: HashMap::default(), root_cache: HashSet::default(), } } fn segments<'a, 's>(&'s self, path: &'a str) -> Vec<&'a str> { path.trim_matches('/') .split('/') .filter(|s| !s.is_empty()) .collect() } fn dir_metadata() -> Metadata { Metadata { file_type: FileType::Directory, len: 0, } } fn file_metadata(len: u64) -> Metadata { Metadata { file_type: FileType::File, len, } } async fn ensure_root_loaded(&self) -> io::Result<()> { if self.root_cache.is_empty() { let request = DescribeRepo::new().repo(self.did.clone()).build(); let response = self .client .send(request) .await .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; let output = response .into_output() .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; for col in output.collections { let _ = self.root_cache.insert_async(col.to_string()).await; } } return Ok(()); } async fn ensure_loaded(&self, path: &str) -> io::Result { let segs = self.segments(path); if segs.is_empty() { self.ensure_root_loaded().await?; return Ok("".to_string()); } let collection = segs[0]; if self.root_cache.is_empty() { self.ensure_root_loaded().await?; } if !self.root_cache.contains(collection) { return Err(ErrorKind::NotFound.into()); } let mut current_key = collection.to_string(); let mut parent_cursor: Option = None; for (_i, segment) in segs.iter().enumerate().skip(1) { if *segment == "next" { self.fetch_page_if_missing(¤t_key, parent_cursor.clone()) .await?; let next_cursor_val = self.cache.read(¤t_key, |_, v| v.next_cursor.clone()); if let Some(Some(cursor)) = next_cursor_val { parent_cursor = Some(cursor); current_key = format!("{}/next", current_key); } else { return Err(ErrorKind::NotFound.into()); } } else if segment.ends_with(".json") { break; } else { return Err(ErrorKind::NotFound.into()); } } self.fetch_page_if_missing(¤t_key, parent_cursor) .await?; Ok(current_key) } async fn fetch_page_if_missing(&self, key: &str, cursor: Option) -> io::Result<()> { if self.cache.contains(key) { return Ok(()); } let parts: Vec<&str> = key.split('/').collect(); let collection = parts[0]; let request = ListRecords::new() .repo(self.did.clone()) .collection(Nsid::new(collection).expect("nsid should be valid")) .limit(100) .cursor(cursor.map(Into::into)); let response = self .client .send(request.build()) .await .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; let output = response .into_output() .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; let mut files = StdHashMap::new(); for rec in output.records { if let Some(rkey) = rec.uri.rkey() { let filename = format!("{}.json", rkey.0); let content = serde_json::to_vec_pretty(&rec.value).unwrap_or_default(); files.insert(filename, Arc::new(content)); } } let _ = self .cache .insert_async( key.to_string(), Arc::new(CachedPage { files, next_cursor: output.cursor.map(Into::into), }), ) .await; Ok(()) } pub async fn read_dir(&self, path: &str) -> io::Result> { let segs = self.segments(path); if segs.is_empty() { self.ensure_root_loaded().await?; let mut keys = Vec::new(); self.root_cache.scan(|k| keys.push(k.clone())); return Ok(keys); } let cache_key = self.ensure_loaded(path).await?; if path.ends_with(".json") { return Err(io::Error::new(ErrorKind::Other, "not a directory")); } let page = self .cache .read(&cache_key, |_, v| v.clone()) .ok_or(ErrorKind::NotFound)?; let mut entries: Vec = page.files.keys().cloned().collect(); if page.next_cursor.is_some() { entries.push("next".to_string()); } Ok(entries) } pub async fn open_file(&self, path: &str) -> io::Result>> { let parent_path = std::path::Path::new(path) .parent() .unwrap_or(std::path::Path::new("")) .to_str() .unwrap(); let cache_key = self.ensure_loaded(parent_path).await?; let filename = path.split('/').last().ok_or(ErrorKind::NotFound)?; let content = self .cache .read(&cache_key, |_, page| page.files.get(filename).cloned()) .flatten(); content.ok_or(ErrorKind::NotFound.into()) } pub async fn metadata(&self, path: &str) -> io::Result { let segs = self.segments(path); if segs.is_empty() { return Ok(Self::dir_metadata()); } if segs.len() == 1 { self.ensure_root_loaded().await?; if self.root_cache.contains(segs[0]) { return Ok(Self::dir_metadata()); } else { return Err(ErrorKind::NotFound.into()); } } if let Some(last) = segs.last() { if *last == "next" { let parent = &path[0..path.len() - 5]; let cache_key = self.ensure_loaded(parent).await?; let has_next = self .cache .read(&cache_key, |_, v| v.next_cursor.is_some()) .unwrap_or(false); if has_next { return Ok(Self::dir_metadata()); } return Err(ErrorKind::NotFound.into()); } } if path.ends_with(".json") { let parent_path = std::path::Path::new(path) .parent() .unwrap() .to_str() .unwrap(); let cache_key = self.ensure_loaded(parent_path).await?; let filename = segs.last().unwrap(); let len = self .cache .read(&cache_key, |_, page| { page.files.get(*filename).map(|f| f.len()) }) .flatten(); if let Some(l) = len { return Ok(Self::file_metadata(l as u64)); } return Err(ErrorKind::NotFound.into()); } Err(ErrorKind::NotFound.into()) } pub async fn exists(&self, path: &str) -> io::Result { Ok(self.metadata(path).await.is_ok()) } }