atproto repo as vfs

dont use vfs crate

ptr.pet 71bf0826 b85225eb

verified
Changed files
+153 -224
src
-33
Cargo.lock
··· 112 "serde_json", 113 "tokio", 114 "url", 115 - "vfs", 116 ] 117 118 [[package]] ··· 744 dependencies = [ 745 "rand_core 0.6.4", 746 "subtle", 747 - ] 748 - 749 - [[package]] 750 - name = "filetime" 751 - version = "0.2.26" 752 - source = "registry+https://github.com/rust-lang/crates.io-index" 753 - checksum = "bc0505cd1b6fa6580283f6bdf70a73fcf4aba1184038c90902b92b3dd0df63ed" 754 - dependencies = [ 755 - "cfg-if", 756 - "libc", 757 - "libredox", 758 - "windows-sys 0.60.2", 759 ] 760 761 [[package]] ··· 1728 version = "0.2.15" 1729 source = "registry+https://github.com/rust-lang/crates.io-index" 1730 checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" 1731 - 1732 - [[package]] 1733 - name = "libredox" 1734 - version = "0.1.10" 1735 - source = "registry+https://github.com/rust-lang/crates.io-index" 1736 - checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" 1737 - dependencies = [ 1738 - "bitflags", 1739 - "libc", 1740 - "redox_syscall", 1741 - ] 1742 1743 [[package]] 1744 name = "litemap" ··· 3556 version = "0.9.5" 3557 source = "registry+https://github.com/rust-lang/crates.io-index" 3558 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 3559 - 3560 - [[package]] 3561 - name = "vfs" 3562 - version = "0.12.2" 3563 - source = "registry+https://github.com/rust-lang/crates.io-index" 3564 - checksum = "9e723b9e1c02a3cf9f9d0de6a4ddb8cdc1df859078902fe0ae0589d615711ae6" 3565 - dependencies = [ 3566 - "filetime", 3567 - ] 3568 3569 [[package]] 3570 name = "want"
··· 112 "serde_json", 113 "tokio", 114 "url", 115 ] 116 117 [[package]] ··· 743 dependencies = [ 744 "rand_core 0.6.4", 745 "subtle", 746 ] 747 748 [[package]] ··· 1715 version = "0.2.15" 1716 source = "registry+https://github.com/rust-lang/crates.io-index" 1717 checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" 1718 1719 [[package]] 1720 name = "litemap" ··· 3532 version = "0.9.5" 3533 source = "registry+https://github.com/rust-lang/crates.io-index" 3534 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 3535 3536 [[package]] 3537 name = "want"
-1
Cargo.toml
··· 4 edition = "2021" 5 6 [dependencies] 7 - vfs = { version = "0.12" } 8 bpaf = { version = "0.9", features = ["derive"] } 9 anyhow = "1.0" 10 scc = "2.1"
··· 4 edition = "2021" 5 6 [dependencies] 7 bpaf = { version = "0.9", features = ["derive"] } 8 anyhow = "1.0" 9 scc = "2.1"
+24 -19
src/fuse.rs
··· 1 - use super::*; 2 use easy_fuser::{prelude::*, templates::DefaultFuseHandler}; 3 4 use std::{ 5 ffi::{OsStr, OsString}, 6 - io::{Read, Seek, SeekFrom}, 7 path::PathBuf, 8 time::UNIX_EPOCH, 9 }; 10 11 pub struct AtpFuse { 12 pub fs: Arc<AtpFS>, 13 pub inner: DefaultFuseHandler, 14 } 15 16 impl AtpFuse { ··· 45 46 fn vfs_metadata_attr(&self, vfs_path: &str) -> FuseResult<FileAttribute> { 47 let meta = self 48 - .fs 49 - .metadata(vfs_path) 50 .map_err(|_| ErrorKind::FileNotFound.to_error("Not found"))?; 51 52 let (kind, perm, nlink) = match meta.file_type { 53 - VfsFileType::Directory => (FileKind::Directory, 0o755, 2), 54 - VfsFileType::File => (FileKind::RegularFile, 0o644, 1), 55 }; 56 57 Ok(FileAttribute { ··· 115 let vfs_path = self.path_to_str(&file_id); 116 117 let stream = self 118 - .fs 119 - .read_dir(&vfs_path) 120 .map_err(|_| ErrorKind::InputOutputError.to_error("Read dir failed"))?; 121 122 let mut entries = vec![ ··· 125 ]; 126 127 for name in stream { 128 - let kind = name 129 - .ends_with(".json") 130 - .then_some(FileKind::RegularFile) 131 - .unwrap_or(FileKind::Directory); 132 entries.push((OsString::from(name), kind)); 133 } 134 ··· 145 _flags: FUSEOpenFlags, 146 _lock_owner: Option<u64>, 147 ) -> FuseResult<Vec<u8>> { 148 - let vfs_path = self.path_to_str(&file_id); 149 - let mut reader = self 150 - .fs 151 - .open_file(&vfs_path) 152 - .map_err(|_| ErrorKind::FileNotFound.to_error("File not found"))?; 153 - 154 // Only support absolute start seeks for now. 155 let pos = match seek { 156 SeekFrom::Start(p) => p, ··· 163 return Ok(Vec::new()); 164 } 165 166 // Seek to the requested position. 167 reader 168 .seek(SeekFrom::Start(pos)) 169 .map_err(|_| ErrorKind::InputOutputError.to_error("Seek failed"))?; 170 171 // Read up to `size` bytes into the buffer. 172 - // We use take to limit the read, then read_to_end or just read into buffer. 173 let mut buf = vec![0u8; size as usize]; 174 let n = reader 175 .read(&mut buf)
··· 1 use easy_fuser::{prelude::*, templates::DefaultFuseHandler}; 2 + use tokio::runtime::Handle; 3 4 use std::{ 5 ffi::{OsStr, OsString}, 6 + io::{Cursor, Read, Seek, SeekFrom}, 7 path::PathBuf, 8 + sync::Arc, 9 time::UNIX_EPOCH, 10 }; 11 12 + use crate::{AtpFS, FileType}; 13 + 14 pub struct AtpFuse { 15 pub fs: Arc<AtpFS>, 16 pub inner: DefaultFuseHandler, 17 + pub runtime: Handle, 18 } 19 20 impl AtpFuse { ··· 49 50 fn vfs_metadata_attr(&self, vfs_path: &str) -> FuseResult<FileAttribute> { 51 let meta = self 52 + .runtime 53 + .block_on(self.fs.metadata(vfs_path)) 54 .map_err(|_| ErrorKind::FileNotFound.to_error("Not found"))?; 55 56 let (kind, perm, nlink) = match meta.file_type { 57 + FileType::Directory => (FileKind::Directory, 0o755, 2), 58 + FileType::File => (FileKind::RegularFile, 0o644, 1), 59 }; 60 61 Ok(FileAttribute { ··· 119 let vfs_path = self.path_to_str(&file_id); 120 121 let stream = self 122 + .runtime 123 + .block_on(self.fs.read_dir(&vfs_path)) 124 .map_err(|_| ErrorKind::InputOutputError.to_error("Read dir failed"))?; 125 126 let mut entries = vec![ ··· 129 ]; 130 131 for name in stream { 132 + let kind = if name.ends_with(".json") { 133 + FileKind::RegularFile 134 + } else { 135 + FileKind::Directory 136 + }; 137 entries.push((OsString::from(name), kind)); 138 } 139 ··· 150 _flags: FUSEOpenFlags, 151 _lock_owner: Option<u64>, 152 ) -> FuseResult<Vec<u8>> { 153 // Only support absolute start seeks for now. 154 let pos = match seek { 155 SeekFrom::Start(p) => p, ··· 162 return Ok(Vec::new()); 163 } 164 165 + let vfs_path = self.path_to_str(&file_id); 166 + let data = self 167 + .runtime 168 + .block_on(self.fs.open_file(&vfs_path)) 169 + .map_err(|_| ErrorKind::FileNotFound.to_error("File not found"))?; 170 + let mut reader = Cursor::new(data.as_slice()); 171 + 172 // Seek to the requested position. 173 reader 174 .seek(SeekFrom::Start(pos)) 175 .map_err(|_| ErrorKind::InputOutputError.to_error("Seek failed"))?; 176 177 // Read up to `size` bytes into the buffer. 178 let mut buf = vec![0u8; size as usize]; 179 let n = reader 180 .read(&mut buf)
+122 -166
src/lib.rs
··· 1 use anyhow::{anyhow, Result}; 2 - #[cfg(target_arch = "wasm32")] 3 - use futures::executor::block_on; 4 use jacquard::{ 5 api::com_atproto::repo::{describe_repo::DescribeRepo, list_records::ListRecords}, 6 client::{credential_session::CredentialSession, Agent, BasicClient, MemorySessionStore}, 7 identity::{resolver::IdentityResolver, slingshot_resolver_default}, 8 types::{did::Did, nsid::Nsid, string::Handle}, 9 - xrpc::XrpcClient, 10 }; 11 use scc::{HashMap, HashSet}; 12 use url::Url; 13 - use vfs::{error::VfsErrorKind, FileSystem, SeekAndRead, VfsFileType, VfsMetadata, VfsResult}; 14 15 - use std::{collections::HashMap as StdHashMap, fmt::Debug, sync::Arc}; 16 17 pub mod cli; 18 #[cfg(target_os = "linux")] ··· 41 .ok_or_else(|| anyhow!("no pds endpoint in did doc")) 42 } 43 44 #[derive(Debug)] 45 struct CachedPage { 46 - files: StdHashMap<String, Vec<u8>>, 47 next_cursor: Option<String>, 48 } 49 ··· 52 client: BasicClient, 53 cache: HashMap<String, Arc<CachedPage>>, 54 root_cache: HashSet<String>, 55 - #[cfg(not(target_arch = "wasm32"))] 56 - handle: tokio::runtime::Handle, 57 } 58 59 impl Debug for AtpFS { 60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 61 - f.debug_struct("AtProtoFS").field("did", &self.did).finish() 62 } 63 } 64 65 impl AtpFS { 66 - pub fn new(did: Did<'static>, pds: Url) -> Self { 67 - #[cfg(not(target_arch = "wasm32"))] 68 - let handle = tokio::runtime::Handle::current(); 69 - 70 let store = MemorySessionStore::default(); 71 let session = 72 CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default())); 73 74 - #[cfg(not(target_arch = "wasm32"))] 75 - tokio::task::block_in_place(|| handle.block_on(session.set_endpoint(pds))); 76 - 77 - #[cfg(target_arch = "wasm32")] 78 - block_on(session.set_endpoint(pds)); 79 80 Self { 81 did, 82 client: Agent::new(session), 83 cache: HashMap::default(), 84 root_cache: HashSet::default(), 85 - #[cfg(not(target_arch = "wasm32"))] 86 - handle, 87 } 88 } 89 90 - #[cfg(not(target_arch = "wasm32"))] 91 - fn block_on<F: std::future::Future>(&self, future: F) -> F::Output { 92 - tokio::task::block_in_place(move || self.handle.block_on(future)) 93 - } 94 - 95 - #[cfg(target_arch = "wasm32")] 96 - fn block_on<F: std::future::Future>(&self, future: F) -> F::Output { 97 - block_on(future) 98 - } 99 - 100 fn segments<'a, 's>(&'s self, path: &'a str) -> Vec<&'a str> { 101 path.trim_matches('/') 102 .split('/') ··· 104 .collect() 105 } 106 107 - fn vfs_dir_metadata() -> VfsMetadata { 108 - VfsMetadata { 109 - file_type: VfsFileType::Directory, 110 len: 0, 111 - created: None, 112 - modified: None, 113 - accessed: None, 114 } 115 } 116 117 - fn vfs_file_metadata(len: u64) -> VfsMetadata { 118 - VfsMetadata { 119 - file_type: VfsFileType::File, 120 len, 121 - created: None, 122 - modified: None, 123 - accessed: None, 124 } 125 } 126 127 - async fn ensure_root_loaded(&self) -> VfsResult<String> { 128 if self.root_cache.is_empty() { 129 let request = DescribeRepo::new().repo(self.did.clone()).build(); 130 ··· 132 .client 133 .send(request) 134 .await 135 - .map_err(|e| VfsErrorKind::Other(e.to_string()))?; 136 137 let output = response 138 .into_output() 139 - .map_err(|e| VfsErrorKind::Other(e.to_string()))?; 140 141 for col in output.collections { 142 let _ = self.root_cache.insert_async(col.to_string()).await; 143 } 144 } 145 - return Ok("".to_string()); 146 } 147 148 - async fn ensure_loaded(&self, path: &str) -> VfsResult<String> { 149 let segs = self.segments(path); 150 151 if segs.is_empty() { 152 - return self.ensure_root_loaded().await; 153 } 154 155 let collection = segs[0]; ··· 158 } 159 160 if !self.root_cache.contains(collection) { 161 - return Err(VfsErrorKind::FileNotFound.into()); 162 } 163 164 let mut current_key = collection.to_string(); ··· 174 parent_cursor = Some(cursor); 175 current_key = format!("{}/next", current_key); 176 } else { 177 - return Err(VfsErrorKind::FileNotFound.into()); 178 } 179 } else if segment.ends_with(".json") { 180 break; 181 } else { 182 - return Err(VfsErrorKind::FileNotFound.into()); 183 } 184 } 185 ··· 188 Ok(current_key) 189 } 190 191 - async fn fetch_page_if_missing(&self, key: &str, cursor: Option<String>) -> VfsResult<()> { 192 if self.cache.contains(key) { 193 return Ok(()); 194 } ··· 206 .client 207 .send(request.build()) 208 .await 209 - .map_err(|e| VfsErrorKind::Other(e.to_string()))?; 210 211 let output = response 212 .into_output() 213 - .map_err(|e| VfsErrorKind::Other(e.to_string()))?; 214 215 let mut files = StdHashMap::new(); 216 for rec in output.records { 217 if let Some(rkey) = rec.uri.rkey() { 218 let filename = format!("{}.json", rkey.0); 219 let content = serde_json::to_vec_pretty(&rec.value).unwrap_or_default(); 220 - files.insert(filename, content); 221 } 222 } 223 ··· 234 235 Ok(()) 236 } 237 - } 238 239 - impl FileSystem for AtpFS { 240 - fn read_dir(&self, path: &str) -> VfsResult<Box<dyn Iterator<Item = String> + Send>> { 241 - self.block_on(async { 242 - let segs = self.segments(path); 243 - 244 - if segs.is_empty() { 245 - self.ensure_root_loaded().await?; 246 - let mut keys = Vec::new(); 247 - self.root_cache.scan(|k| keys.push(k.clone())); 248 - return Ok(Box::new(keys.into_iter()) as Box<dyn Iterator<Item = String> + Send>); 249 - } 250 251 - let cache_key = self.ensure_loaded(path).await?; 252 253 - if path.ends_with(".json") { 254 - return Err(VfsErrorKind::Other("not a directory".into()).into()); 255 - } 256 257 - let page = self 258 - .cache 259 - .read(&cache_key, |_, v| v.clone()) 260 - .ok_or(VfsErrorKind::FileNotFound)?; 261 262 - let mut entries: Vec<String> = page.files.keys().cloned().collect(); 263 - if page.next_cursor.is_some() { 264 - entries.push("next".to_string()); 265 - } 266 267 - Ok(Box::new(entries.into_iter()) as Box<dyn Iterator<Item = String> + Send>) 268 - }) 269 - } 270 271 - fn create_dir(&self, _path: &str) -> VfsResult<()> { 272 - Err(VfsErrorKind::NotSupported.into()) 273 } 274 275 - fn open_file(&self, path: &str) -> VfsResult<Box<dyn SeekAndRead + Send>> { 276 - self.block_on(async { 277 - let parent_path = std::path::Path::new(path) 278 - .parent() 279 - .unwrap_or(std::path::Path::new("")) 280 - .to_str() 281 - .unwrap(); 282 - let cache_key = self.ensure_loaded(parent_path).await?; 283 - let filename = path.split('/').last().ok_or(VfsErrorKind::FileNotFound)?; 284 - 285 - let content = self 286 - .cache 287 - .read(&cache_key, |_, page| page.files.get(filename).cloned()) 288 - .flatten(); 289 290 - if let Some(data) = content { 291 - return Ok(Box::new(std::io::Cursor::new(data)) as Box<dyn SeekAndRead + Send>); 292 - } 293 294 - Err(VfsErrorKind::FileNotFound.into()) 295 - }) 296 } 297 298 - fn metadata(&self, path: &str) -> VfsResult<VfsMetadata> { 299 - self.block_on(async { 300 - let segs = self.segments(path); 301 - if segs.is_empty() { 302 - return Ok(AtpFS::vfs_dir_metadata()); 303 - } 304 - 305 - if segs.len() == 1 { 306 - self.ensure_root_loaded().await?; 307 - if self.root_cache.contains(segs[0]) { 308 - return Ok(AtpFS::vfs_dir_metadata()); 309 - } else { 310 - return Err(VfsErrorKind::FileNotFound.into()); 311 - } 312 - } 313 314 - if let Some(last) = segs.last() { 315 - if *last == "next" { 316 - let parent = &path[0..path.len() - 5]; 317 - let cache_key = self.ensure_loaded(parent).await?; 318 - let has_next = self 319 - .cache 320 - .read(&cache_key, |_, v| v.next_cursor.is_some()) 321 - .unwrap_or(false); 322 - if has_next { 323 - return Ok(AtpFS::vfs_dir_metadata()); 324 - } 325 - return Err(VfsErrorKind::FileNotFound.into()); 326 - } 327 } 328 - 329 - if path.ends_with(".json") { 330 - let parent_path = std::path::Path::new(path) 331 - .parent() 332 - .unwrap() 333 - .to_str() 334 - .unwrap(); 335 - let cache_key = self.ensure_loaded(parent_path).await?; 336 - let filename = segs.last().unwrap(); 337 338 - let len = self 339 .cache 340 - .read(&cache_key, |_, page| { 341 - page.files.get(*filename).map(|f| f.len()) 342 - }) 343 - .flatten(); 344 - 345 - if let Some(l) = len { 346 - return Ok(AtpFS::vfs_file_metadata(l as u64)); 347 } 348 - return Err(VfsErrorKind::FileNotFound.into()); 349 } 350 - 351 - Err(VfsErrorKind::FileNotFound.into()) 352 - }) 353 - } 354 355 - fn exists(&self, path: &str) -> VfsResult<bool> { 356 - Ok(self.metadata(path).is_ok()) 357 - } 358 359 - fn create_file(&self, _: &str) -> VfsResult<Box<dyn vfs::SeekAndWrite + Send>> { 360 - Err(VfsErrorKind::NotSupported.into()) 361 - } 362 363 - fn append_file(&self, _: &str) -> VfsResult<Box<dyn vfs::SeekAndWrite + Send>> { 364 - Err(VfsErrorKind::NotSupported.into()) 365 - } 366 367 - fn remove_file(&self, _path: &str) -> VfsResult<()> { 368 - Err(VfsErrorKind::NotSupported.into()) 369 } 370 371 - fn remove_dir(&self, _path: &str) -> VfsResult<()> { 372 - Err(VfsErrorKind::NotSupported.into()) 373 } 374 }
··· 1 use anyhow::{anyhow, Result}; 2 use jacquard::{ 3 api::com_atproto::repo::{describe_repo::DescribeRepo, list_records::ListRecords}, 4 client::{credential_session::CredentialSession, Agent, BasicClient, MemorySessionStore}, 5 identity::{resolver::IdentityResolver, slingshot_resolver_default}, 6 + prelude::*, 7 types::{did::Did, nsid::Nsid, string::Handle}, 8 }; 9 use scc::{HashMap, HashSet}; 10 use url::Url; 11 12 + use std::{ 13 + collections::HashMap as StdHashMap, 14 + fmt::Debug, 15 + io::{self, ErrorKind}, 16 + sync::Arc, 17 + }; 18 19 pub mod cli; 20 #[cfg(target_os = "linux")] ··· 43 .ok_or_else(|| anyhow!("no pds endpoint in did doc")) 44 } 45 46 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 47 + pub enum FileType { 48 + File, 49 + Directory, 50 + } 51 + 52 + #[derive(Debug, Clone)] 53 + pub struct Metadata { 54 + pub file_type: FileType, 55 + pub len: u64, 56 + } 57 + 58 #[derive(Debug)] 59 struct CachedPage { 60 + files: StdHashMap<String, Arc<Vec<u8>>>, 61 next_cursor: Option<String>, 62 } 63 ··· 66 client: BasicClient, 67 cache: HashMap<String, Arc<CachedPage>>, 68 root_cache: HashSet<String>, 69 } 70 71 impl Debug for AtpFS { 72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 73 + f.debug_struct("AtpFS").field("did", &self.did).finish() 74 } 75 } 76 77 impl AtpFS { 78 + pub async fn new(did: Did<'static>, pds: Url) -> Self { 79 let store = MemorySessionStore::default(); 80 let session = 81 CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default())); 82 83 + session.set_endpoint(pds).await; 84 85 Self { 86 did, 87 client: Agent::new(session), 88 cache: HashMap::default(), 89 root_cache: HashSet::default(), 90 } 91 } 92 93 fn segments<'a, 's>(&'s self, path: &'a str) -> Vec<&'a str> { 94 path.trim_matches('/') 95 .split('/') ··· 97 .collect() 98 } 99 100 + fn dir_metadata() -> Metadata { 101 + Metadata { 102 + file_type: FileType::Directory, 103 len: 0, 104 } 105 } 106 107 + fn file_metadata(len: u64) -> Metadata { 108 + Metadata { 109 + file_type: FileType::File, 110 len, 111 } 112 } 113 114 + async fn ensure_root_loaded(&self) -> io::Result<()> { 115 if self.root_cache.is_empty() { 116 let request = DescribeRepo::new().repo(self.did.clone()).build(); 117 ··· 119 .client 120 .send(request) 121 .await 122 + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 123 124 let output = response 125 .into_output() 126 + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 127 128 for col in output.collections { 129 let _ = self.root_cache.insert_async(col.to_string()).await; 130 } 131 } 132 + return Ok(()); 133 } 134 135 + async fn ensure_loaded(&self, path: &str) -> io::Result<String> { 136 let segs = self.segments(path); 137 138 if segs.is_empty() { 139 + self.ensure_root_loaded().await?; 140 + return Ok("".to_string()); 141 } 142 143 let collection = segs[0]; ··· 146 } 147 148 if !self.root_cache.contains(collection) { 149 + return Err(ErrorKind::NotFound.into()); 150 } 151 152 let mut current_key = collection.to_string(); ··· 162 parent_cursor = Some(cursor); 163 current_key = format!("{}/next", current_key); 164 } else { 165 + return Err(ErrorKind::NotFound.into()); 166 } 167 } else if segment.ends_with(".json") { 168 break; 169 } else { 170 + return Err(ErrorKind::NotFound.into()); 171 } 172 } 173 ··· 176 Ok(current_key) 177 } 178 179 + async fn fetch_page_if_missing(&self, key: &str, cursor: Option<String>) -> io::Result<()> { 180 if self.cache.contains(key) { 181 return Ok(()); 182 } ··· 194 .client 195 .send(request.build()) 196 .await 197 + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 198 199 let output = response 200 .into_output() 201 + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; 202 203 let mut files = StdHashMap::new(); 204 for rec in output.records { 205 if let Some(rkey) = rec.uri.rkey() { 206 let filename = format!("{}.json", rkey.0); 207 let content = serde_json::to_vec_pretty(&rec.value).unwrap_or_default(); 208 + files.insert(filename, Arc::new(content)); 209 } 210 } 211 ··· 222 223 Ok(()) 224 } 225 226 + pub async fn read_dir(&self, path: &str) -> io::Result<Vec<String>> { 227 + let segs = self.segments(path); 228 229 + if segs.is_empty() { 230 + self.ensure_root_loaded().await?; 231 + let mut keys = Vec::new(); 232 + self.root_cache.scan(|k| keys.push(k.clone())); 233 + return Ok(keys); 234 + } 235 236 + let cache_key = self.ensure_loaded(path).await?; 237 238 + if path.ends_with(".json") { 239 + return Err(io::Error::new(ErrorKind::Other, "not a directory")); 240 + } 241 242 + let page = self 243 + .cache 244 + .read(&cache_key, |_, v| v.clone()) 245 + .ok_or(ErrorKind::NotFound)?; 246 247 + let mut entries: Vec<String> = page.files.keys().cloned().collect(); 248 + if page.next_cursor.is_some() { 249 + entries.push("next".to_string()); 250 + } 251 252 + Ok(entries) 253 } 254 255 + pub async fn open_file(&self, path: &str) -> io::Result<Arc<Vec<u8>>> { 256 + let parent_path = std::path::Path::new(path) 257 + .parent() 258 + .unwrap_or(std::path::Path::new("")) 259 + .to_str() 260 + .unwrap(); 261 + let cache_key = self.ensure_loaded(parent_path).await?; 262 + let filename = path.split('/').last().ok_or(ErrorKind::NotFound)?; 263 264 + let content = self 265 + .cache 266 + .read(&cache_key, |_, page| page.files.get(filename).cloned()) 267 + .flatten(); 268 269 + content.ok_or(ErrorKind::NotFound.into()) 270 } 271 272 + pub async fn metadata(&self, path: &str) -> io::Result<Metadata> { 273 + let segs = self.segments(path); 274 + if segs.is_empty() { 275 + return Ok(Self::dir_metadata()); 276 + } 277 278 + if segs.len() == 1 { 279 + self.ensure_root_loaded().await?; 280 + if self.root_cache.contains(segs[0]) { 281 + return Ok(Self::dir_metadata()); 282 + } else { 283 + return Err(ErrorKind::NotFound.into()); 284 } 285 + } 286 287 + if let Some(last) = segs.last() { 288 + if *last == "next" { 289 + let parent = &path[0..path.len() - 5]; 290 + let cache_key = self.ensure_loaded(parent).await?; 291 + let has_next = self 292 .cache 293 + .read(&cache_key, |_, v| v.next_cursor.is_some()) 294 + .unwrap_or(false); 295 + if has_next { 296 + return Ok(Self::dir_metadata()); 297 } 298 + return Err(ErrorKind::NotFound.into()); 299 } 300 + } 301 302 + if path.ends_with(".json") { 303 + let parent_path = std::path::Path::new(path) 304 + .parent() 305 + .unwrap() 306 + .to_str() 307 + .unwrap(); 308 + let cache_key = self.ensure_loaded(parent_path).await?; 309 + let filename = segs.last().unwrap(); 310 311 + let len = self 312 + .cache 313 + .read(&cache_key, |_, page| { 314 + page.files.get(*filename).map(|f| f.len()) 315 + }) 316 + .flatten(); 317 318 + if let Some(l) = len { 319 + return Ok(Self::file_metadata(l as u64)); 320 + } 321 + return Err(ErrorKind::NotFound.into()); 322 + } 323 324 + Err(ErrorKind::NotFound.into()) 325 } 326 327 + pub async fn exists(&self, path: &str) -> io::Result<bool> { 328 + Ok(self.metadata(path).await.is_ok()) 329 } 330 }
+7 -5
src/main.rs
··· 3 cli::{opts, SubCommand}, 4 resolve_did, resolve_pds, AtpFS, 5 }; 6 - use vfs::FileSystem; 7 8 use std::sync::Arc; 9 10 async fn run_app(args: Vec<String>) -> Result<()> { 11 let opts = opts().run_inner(args.as_slice()); ··· 23 let pds = resolve_pds(&did).await?; 24 println!("resolved PDS: {}", pds); 25 26 - let fs = Arc::new(AtpFS::new(did, pds)); 27 28 match opts.cmd { 29 SubCommand::Ls { path } => { 30 - println!("Listing: {}", path); 31 - let iterator = fs.read_dir(&path)?; 32 - for item in iterator { 33 println!("{}", item); 34 } 35 } ··· 40 41 let options = vec![MountOption::RO, MountOption::FSName("atproto".to_string())]; 42 43 let fuse_handler = AtpFuse { 44 fs, 45 inner: DefaultFuseHandler::new(), 46 }; 47 48 println!("mounting at {:?}...", mount_point);
··· 3 cli::{opts, SubCommand}, 4 resolve_did, resolve_pds, AtpFS, 5 }; 6 7 use std::sync::Arc; 8 + use tokio::runtime::Handle; 9 10 async fn run_app(args: Vec<String>) -> Result<()> { 11 let opts = opts().run_inner(args.as_slice()); ··· 23 let pds = resolve_pds(&did).await?; 24 println!("resolved PDS: {}", pds); 25 26 + let fs = Arc::new(AtpFS::new(did, pds).await); 27 28 match opts.cmd { 29 SubCommand::Ls { path } => { 30 + let files = fs.read_dir(&path).await?; 31 + for item in files { 32 println!("{}", item); 33 } 34 } ··· 39 40 let options = vec![MountOption::RO, MountOption::FSName("atproto".to_string())]; 41 42 + let handle = Handle::current(); 43 + 44 let fuse_handler = AtpFuse { 45 fs, 46 inner: DefaultFuseHandler::new(), 47 + runtime: handle, 48 }; 49 50 println!("mounting at {:?}...", mount_point);