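//! atproto repo as VFS: exposes an AT Protocol repository as a virtual filesystem
//! (collections as directories, records as `<rkey>.json` files, `next` for the following page).
//! Source listing header: branch `main`, 330 lines, 9.7 kB.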
1use anyhow::{anyhow, Result}; 2use jacquard::{ 3 api::com_atproto::repo::{describe_repo::DescribeRepo, list_records::ListRecords}, 4 client::{credential_session::CredentialSession, Agent, BasicClient, MemorySessionStore}, 5 identity::{resolver::IdentityResolver, slingshot_resolver_default}, 6 prelude::*, 7 types::{did::Did, nsid::Nsid, string::Handle}, 8}; 9use scc::{HashMap, HashSet}; 10use url::Url; 11 12use std::{ 13 collections::HashMap as StdHashMap, 14 fmt::Debug, 15 io::{self, ErrorKind}, 16 sync::Arc, 17}; 18 19pub mod cli; 20#[cfg(target_os = "linux")] 21pub mod fuse; 22 23pub async fn resolve_did(identifier: &str) -> Result<Did<'static>> { 24 if let Ok(did) = Did::new_owned(identifier) { 25 return Ok(did); 26 } 27 28 let handle = Handle::new(identifier).map_err(|e| anyhow!("invalid handle: {}", e))?; 29 let did = slingshot_resolver_default() 30 .resolve_handle(&handle) 31 .await 32 .map_err(|e| anyhow!("resolution failed: {}", e))?; 33 34 Ok(did) 35} 36 37pub async fn resolve_pds(did: &Did<'_>) -> Result<Url> { 38 slingshot_resolver_default() 39 .resolve_did_doc(did) 40 .await? 41 .parse()? 
42 .pds_endpoint() 43 .ok_or_else(|| anyhow!("no pds endpoint in did doc")) 44} 45 46#[derive(Debug, Clone, Copy, PartialEq, Eq)] 47pub enum FileType { 48 File, 49 Directory, 50} 51 52#[derive(Debug, Clone)] 53pub struct Metadata { 54 pub file_type: FileType, 55 pub len: u64, 56} 57 58#[derive(Debug)] 59struct CachedPage { 60 files: StdHashMap<String, Arc<Vec<u8>>>, 61 next_cursor: Option<String>, 62} 63 64pub struct AtpFS { 65 did: Did<'static>, 66 client: BasicClient, 67 cache: HashMap<String, Arc<CachedPage>>, 68 root_cache: HashSet<String>, 69} 70 71impl Debug for AtpFS { 72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 73 f.debug_struct("AtpFS").field("did", &self.did).finish() 74 } 75} 76 77impl AtpFS { 78 pub async fn new(did: Did<'static>, pds: Url) -> Self { 79 let store = MemorySessionStore::default(); 80 let session = 81 CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default())); 82 83 session.set_endpoint(pds).await; 84 85 Self { 86 did, 87 client: Agent::new(session), 88 cache: HashMap::default(), 89 root_cache: HashSet::default(), 90 } 91 } 92 93 fn segments<'a, 's>(&'s self, path: &'a str) -> Vec<&'a str> { 94 path.trim_matches('/') 95 .split('/') 96 .filter(|s| !s.is_empty()) 97 .collect() 98 } 99 100 fn dir_metadata() -> Metadata { 101 Metadata { 102 file_type: FileType::Directory, 103 len: 0, 104 } 105 } 106 107 fn file_metadata(len: u64) -> Metadata { 108 Metadata { 109 file_type: FileType::File, 110 len, 111 } 112 } 113 114 async fn ensure_root_loaded(&self) -> io::Result<()> { 115 if self.root_cache.is_empty() { 116 let request = DescribeRepo::new().repo(self.did.clone()).build(); 117 118 let response = self 119 .client 120 .send(request) 121 .await 122 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 123 124 let output = response 125 .into_output() 126 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 127 128 for col in output.collections { 129 let _ = 
self.root_cache.insert_async(col.to_string()).await; 130 } 131 } 132 return Ok(()); 133 } 134 135 async fn ensure_loaded(&self, path: &str) -> io::Result<String> { 136 let segs = self.segments(path); 137 138 if segs.is_empty() { 139 self.ensure_root_loaded().await?; 140 return Ok("".to_string()); 141 } 142 143 let collection = segs[0]; 144 if self.root_cache.is_empty() { 145 self.ensure_root_loaded().await?; 146 } 147 148 if !self.root_cache.contains(collection) { 149 return Err(ErrorKind::NotFound.into()); 150 } 151 152 let mut current_key = collection.to_string(); 153 let mut parent_cursor: Option<String> = None; 154 155 for (_i, segment) in segs.iter().enumerate().skip(1) { 156 if *segment == "next" { 157 self.fetch_page_if_missing(&current_key, parent_cursor.clone()) 158 .await?; 159 let next_cursor_val = self.cache.read(&current_key, |_, v| v.next_cursor.clone()); 160 161 if let Some(Some(cursor)) = next_cursor_val { 162 parent_cursor = Some(cursor); 163 current_key = format!("{}/next", current_key); 164 } else { 165 return Err(ErrorKind::NotFound.into()); 166 } 167 } else if segment.ends_with(".json") { 168 break; 169 } else { 170 return Err(ErrorKind::NotFound.into()); 171 } 172 } 173 174 self.fetch_page_if_missing(&current_key, parent_cursor) 175 .await?; 176 Ok(current_key) 177 } 178 179 async fn fetch_page_if_missing(&self, key: &str, cursor: Option<String>) -> io::Result<()> { 180 if self.cache.contains(key) { 181 return Ok(()); 182 } 183 184 let parts: Vec<&str> = key.split('/').collect(); 185 let collection = parts[0]; 186 187 let request = ListRecords::new() 188 .repo(self.did.clone()) 189 .collection(Nsid::new(collection).expect("nsid should be valid")) 190 .limit(100) 191 .cursor(cursor.map(Into::into)); 192 193 let response = self 194 .client 195 .send(request.build()) 196 .await 197 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 198 199 let output = response 200 .into_output() 201 .map_err(|e| io::Error::new(ErrorKind::Other, 
e.to_string()))?; 202 203 let mut files = StdHashMap::new(); 204 for rec in output.records { 205 if let Some(rkey) = rec.uri.rkey() { 206 let filename = format!("{}.json", rkey.0); 207 let content = serde_json::to_vec_pretty(&rec.value).unwrap_or_default(); 208 files.insert(filename, Arc::new(content)); 209 } 210 } 211 212 let _ = self 213 .cache 214 .insert_async( 215 key.to_string(), 216 Arc::new(CachedPage { 217 files, 218 next_cursor: output.cursor.map(Into::into), 219 }), 220 ) 221 .await; 222 223 Ok(()) 224 } 225 226 pub async fn read_dir(&self, path: &str) -> io::Result<Vec<String>> { 227 let segs = self.segments(path); 228 229 if segs.is_empty() { 230 self.ensure_root_loaded().await?; 231 let mut keys = Vec::new(); 232 self.root_cache.scan(|k| keys.push(k.clone())); 233 return Ok(keys); 234 } 235 236 let cache_key = self.ensure_loaded(path).await?; 237 238 if path.ends_with(".json") { 239 return Err(io::Error::new(ErrorKind::Other, "not a directory")); 240 } 241 242 let page = self 243 .cache 244 .read(&cache_key, |_, v| v.clone()) 245 .ok_or(ErrorKind::NotFound)?; 246 247 let mut entries: Vec<String> = page.files.keys().cloned().collect(); 248 if page.next_cursor.is_some() { 249 entries.push("next".to_string()); 250 } 251 252 Ok(entries) 253 } 254 255 pub async fn open_file(&self, path: &str) -> io::Result<Arc<Vec<u8>>> { 256 let parent_path = std::path::Path::new(path) 257 .parent() 258 .unwrap_or(std::path::Path::new("")) 259 .to_str() 260 .unwrap(); 261 let cache_key = self.ensure_loaded(parent_path).await?; 262 let filename = path.split('/').last().ok_or(ErrorKind::NotFound)?; 263 264 let content = self 265 .cache 266 .read(&cache_key, |_, page| page.files.get(filename).cloned()) 267 .flatten(); 268 269 content.ok_or(ErrorKind::NotFound.into()) 270 } 271 272 pub async fn metadata(&self, path: &str) -> io::Result<Metadata> { 273 let segs = self.segments(path); 274 if segs.is_empty() { 275 return Ok(Self::dir_metadata()); 276 } 277 278 if segs.len() == 1 
{ 279 self.ensure_root_loaded().await?; 280 if self.root_cache.contains(segs[0]) { 281 return Ok(Self::dir_metadata()); 282 } else { 283 return Err(ErrorKind::NotFound.into()); 284 } 285 } 286 287 if let Some(last) = segs.last() { 288 if *last == "next" { 289 let parent = &path[0..path.len() - 5]; 290 let cache_key = self.ensure_loaded(parent).await?; 291 let has_next = self 292 .cache 293 .read(&cache_key, |_, v| v.next_cursor.is_some()) 294 .unwrap_or(false); 295 if has_next { 296 return Ok(Self::dir_metadata()); 297 } 298 return Err(ErrorKind::NotFound.into()); 299 } 300 } 301 302 if path.ends_with(".json") { 303 let parent_path = std::path::Path::new(path) 304 .parent() 305 .unwrap() 306 .to_str() 307 .unwrap(); 308 let cache_key = self.ensure_loaded(parent_path).await?; 309 let filename = segs.last().unwrap(); 310 311 let len = self 312 .cache 313 .read(&cache_key, |_, page| { 314 page.files.get(*filename).map(|f| f.len()) 315 }) 316 .flatten(); 317 318 if let Some(l) = len { 319 return Ok(Self::file_metadata(l as u64)); 320 } 321 return Err(ErrorKind::NotFound.into()); 322 } 323 324 Err(ErrorKind::NotFound.into()) 325 } 326 327 pub async fn exists(&self, path: &str) -> io::Result<bool> { 328 Ok(self.metadata(path).await.is_ok()) 329 } 330}