mount an atproto PDS repository as a FUSE filesystem

rework to be multi-tenant

supports mounting multiple handles at once; each repo appears under its own DID directory

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 028f454a 8c332f8d (verified)
Changed files (+275 -104)

readme.txt (+21 -1)
···
  usage
  -----

- cargo run -- DID|handle
+ cargo run -- [DIDs|handles ...] [-m mountpoint]
+
+ example
+ -------
+
+ $ cargo run -- oppi.li icyphox.sh
+ ⠏ [00:00:00] using cached CAR file for...did:plc:qfpnj4og54vl56wngdriaxug
+ ⠏ [00:00:00] using cached CAR file for...did:plc:hwevmowznbiukdf6uk5dwrrq
+ mounted at "mnt"
+ hit enter to unmount and exit...
+
+
+ $ cat foo/**/sh.tangled.publicKey/* | jq -r '.key'
+ ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAICJPYX06+qKr9IHWfkgCtHbExoBOOwS/+iAWbog9bAdk icy@wyndle
+ ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIMj1Dn9YuFo2BNr993ymBa6nzyyIKAURIqMbUtfI8+4X op@mantis
+ .
+ .
+ .
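With the multi-tenant layout, each repo gets its own top-level directory named after its DID, one directory per collection NSID below that, and one JSON file per record. A rough sketch of the resulting tree (record keys invented for illustration):

mnt/
├── did:plc:qfpnj4og54vl56wngdriaxug/
│   └── sh.tangled.publicKey/
│       └── 3lbkzq...
└── did:plc:hwevmowznbiukdf6uk5dwrrq/
    └── sh.tangled.publicKey/
        └── 3lc7ma...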
src/error.rs (+6)
···
  pub enum Error {
      #[error("atproto error: {0}")]
      GetRepo(#[from] atrium_xrpc::Error<atrium_api::com::atproto::sync::get_repo::Error>),
+     #[error("repo build error: {0}")]
+     Repo(#[from] atrium_repo::repo::Error),
+     #[error("car store error: {0}")]
+     Car(#[from] atrium_repo::blockstore::CarError),
+     #[error("identity error: {0}")]
+     Identity(#[from] atrium_identity::Error),
      #[error("io error: {0}")]
      Io(#[from] std::io::Error),
  }
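All of the new variants use thiserror's #[from], which derives a From impl so the ? operator converts library errors into this Error without explicit map_err calls. A minimal self-contained sketch of the mechanism (read_config is a made-up example, not code from this repo):

use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}

// the ? below converts std::io::Error into Error::Io
// via the From impl that #[from] derives
fn read_config(path: &str) -> Result<String, Error> {
    Ok(std::fs::read_to_string(path)?)
}

fn main() {
    match read_config("/nonexistent") {
        Ok(s) => println!("{s}"),
        Err(e) => eprintln!("{e}"), // prints "io error: ..."
    }
}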
src/fs.rs (+173 -71)
···
- use std::{collections::BTreeMap, time};
+ use std::{
+     collections::{BTreeMap, hash_set::Iter},
+     time,
+ };

  use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
  use futures::StreamExt;
- use indexmap::IndexSet;
+ use indexmap::{IndexMap, IndexSet};
+
+ type Inode = usize;
+
  pub struct PdsFs<R> {
-     repo: Repository<R>,
+     repos: IndexMap<String, Repository<R>>,
      inodes: IndexSet<PdsFsEntry>,
  }

···
  pub enum PdsFsEntry {
      Zero,
      Root,
-     Collection(String),
+     Did(String),
+     Collection(PdsFsCollection),
      Record(PdsFsRecord),
  }

+ impl PdsFsEntry {
+     fn as_collection(&self) -> Option<&PdsFsCollection> {
+         match &self {
+             Self::Collection(c) => Some(c),
+             _ => None,
+         }
+     }
+
+     fn as_did(&self) -> Option<&String> {
+         match &self {
+             Self::Did(d) => Some(d),
+             _ => None,
+         }
+     }
+
+     fn unwrap_collection(&self) -> &PdsFsCollection {
+         self.as_collection().unwrap()
+     }
+
+     fn unwrap_did(&self) -> &String {
+         self.as_did().unwrap()
+     }
+ }
+
+ #[derive(Debug, Clone, PartialEq, Eq, Hash)]
+ pub struct PdsFsCollection {
+     parent: Inode,
+     nsid: String,
+ }
+
  #[derive(Debug, Clone, PartialEq, Eq, Hash)]
  pub struct PdsFsRecord {
-     collection: String,
+     parent: Inode,
      rkey: String,
  }

- impl PdsFsRecord {
-     fn key(&self) -> String {
-         format!("{}/{}", self.collection, self.rkey)
-     }
- }
+ // impl PdsFsRecord {
+ //     fn key(&self) -> String {
+ //         format!("{}/{}", self.collection, self.rkey)
+ //     }
+ // }

  const TTL: time::Duration = time::Duration::from_secs(300);
  const BLKSIZE: u32 = 512;
···
  where
      R: AsyncBlockStoreRead,
  {
-     pub async fn new(mut repo: Repository<R>) -> Self {
+     pub fn new() -> Self {
+         PdsFs {
+             repos: Default::default(),
+             inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]),
+         }
+     }
+
+     pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
          let mut mst = repo.tree();

-         // collect all keys and group by collection
+         let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone()));
+
          let mut keys = Box::pin(mst.keys());
-         let mut collections: BTreeMap<String, Vec<PdsFsRecord>> = BTreeMap::new();
+         while let Some(Ok(key)) = keys.next().await {
+             if let Some((collection_name, rkey)) = key.split_once("/") {
+                 let (collection_inode, _) =
+                     self.inodes
+                         .insert_full(PdsFsEntry::Collection(PdsFsCollection {
+                             parent: did_inode,
+                             nsid: collection_name.to_owned(),
+                         }));

-         while let Some(Ok(key)) = keys.next().await {
-             if let Some((collection, rkey)) = key.split_once("/") {
-                 let record = PdsFsRecord {
-                     collection: collection.to_owned(),
+                 self.inodes.insert(PdsFsEntry::Record(PdsFsRecord {
+                     parent: collection_inode,
                      rkey: rkey.to_owned(),
-                 };
-
-                 collections
-                     .entry(collection.to_owned())
-                     .or_insert_with(Vec::new)
-                     .push(record);
+                 }));
              }
          }

          drop(keys);
          drop(mst);

-         // build inode structure with proper ordering
-         let mut inodes = IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]);
-
-         // add collections first
-         for n in collections.keys().cloned() {
-             inodes.insert(PdsFsEntry::Collection(n));
-         }
-
-         // then add all records grouped by collection
-         for r in collections.values().flatten().cloned() {
-             inodes.insert(PdsFsEntry::Record(r));
-         }
-
-         println!("constructed {} inodes", inodes.len());
-
-         PdsFs { repo, inodes }
+         self.repos.insert(did, repo);
      }

      fn attr(&mut self, ino: u64) -> fuser::FileAttr {
···
              flags: 0,
              blksize: BLKSIZE,
          },
+         Some(PdsFsEntry::Did(_)) => fuser::FileAttr {
+             ino,
+             size: 0,
+             blocks: 0,
+             atime: time::UNIX_EPOCH,
+             mtime: time::UNIX_EPOCH,
+             ctime: time::UNIX_EPOCH,
+             crtime: time::UNIX_EPOCH,
+             kind: fuser::FileType::Directory,
+             perm: 0o755,
+             nlink: 2,
+             uid: 1000,
+             gid: 1000,
+             rdev: 0,
+             flags: 0,
+             blksize: BLKSIZE,
+         },
          Some(PdsFsEntry::Record(r)) => {
+             let col = self.inodes[r.parent].unwrap_collection();
+             let did = self.inodes[col.parent].unwrap_did();
+             let repo = &mut self.repos[did];
+             let key = format!("{}/{}", col.nsid, r.rkey);
              let rt = tokio::runtime::Runtime::new().unwrap();
              let size = rt
-                 .block_on(self.repo.get_raw::<ipld_core::ipld::Ipld>(&r.key()))
+                 .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
                  .ok()
                  .flatten()
                  .map_or(0, |v| serde_json::to_string(&v).unwrap().len())
···
          let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
              .into_iter()
              .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
-                 if let PdsFsEntry::Collection(name) = e {
-                     Some((i as u64, name.clone()))
+                 if let PdsFsEntry::Did(did) = e {
+                     Some((i as u64, did.clone()))
                  } else {
                      None
                  }
···
              }
              reply.ok()
          }
-         Some(PdsFsEntry::Collection(collection_name)) => {
-             let entries: Vec<_> = vec![
-                 (ino, ".".to_string()),
-                 (1, "..".to_string()), // Parent is root (inode 1)
-             ]
-             .into_iter()
-             .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
-                 if let PdsFsEntry::Record(record) = e {
-                     if record.collection == *collection_name {
-                         Some((i as u64, record.rkey.clone()))
+         Some(PdsFsEntry::Did(_)) => {
+             let entries = vec![(ino, ".".to_string()), (1, "..".to_string())]
+                 .into_iter()
+                 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
+                     if let PdsFsEntry::Collection(col) = e {
+                         if col.parent == ino as usize {
+                             Some((i as u64, col.nsid.clone()))
+                         } else {
+                             None
+                         }
                      } else {
                          None
                      }
-                 } else {
-                     None
+                 }))
+                 .into_iter()
+                 .enumerate()
+                 .skip(offset as usize);
+
+             for (index, (inode_num, name)) in entries {
+                 let full = reply.add(
+                     inode_num,
+                     (index + 1) as i64,
+                     if name.starts_with('.') {
+                         fuser::FileType::Directory
+                     } else {
+                         fuser::FileType::RegularFile
+                     },
+                     name,
+                 );
+                 if full {
+                     break;
                  }
-             }))
-             .collect();
+             }

-             for (index, (inode_num, name)) in
-                 entries.into_iter().enumerate().skip(offset as usize)
-             {
-                 if reply.add(
+             reply.ok();
+         }
+         Some(PdsFsEntry::Collection(c)) => {
+             let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())]
+                 .into_iter()
+                 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
+                     if let PdsFsEntry::Record(record) = e {
+                         if record.parent == ino as usize {
+                             Some((i as u64, record.rkey.clone()))
+                         } else {
+                             None
+                         }
+                     } else {
+                         None
+                     }
+                 }))
+                 .into_iter()
+                 .enumerate()
+                 .skip(offset as usize);
+
+             for (index, (inode_num, name)) in entries {
+                 let full = reply.add(
                      inode_num,
                      (index + 1) as i64,
                      if name.starts_with('.') {
···
                          fuser::FileType::RegularFile
                      },
                      name,
-                 ) {
+                 );
+                 if full {
                      break;
                  }
              }
+
              reply.ok()
          }
          _ => reply.error(libc::ENOENT),
···
      ) {
          match self.inodes.get_index(parent as usize) {
              Some(PdsFsEntry::Root) => {
-                 let collection = PdsFsEntry::Collection(name.to_string_lossy().to_string());
-                 if let Some(ino) = self.inodes.get_index_of(&collection) {
+                 let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
+                 if let Some(ino) = self.inodes.get_index_of(&did) {
+                     reply.entry(&TTL, &self.attr(ino as u64), 0);
+                 } else {
+                     reply.error(libc::ENOENT)
+                 }
+             }
+             Some(PdsFsEntry::Did(_)) => {
+                 let col = PdsFsEntry::Collection(PdsFsCollection {
+                     parent: parent as usize,
+                     nsid: name.to_string_lossy().to_string(),
+                 });
+                 if let Some(ino) = self.inodes.get_index_of(&col) {
                      reply.entry(&TTL, &self.attr(ino as u64), 0);
                  } else {
                      reply.error(libc::ENOENT)
                  }
              }
-             Some(PdsFsEntry::Collection(c)) => {
+             Some(PdsFsEntry::Collection(_)) => {
                  let record = PdsFsEntry::Record(PdsFsRecord {
-                     collection: c.to_owned(),
+                     parent: parent as usize,
                      rkey: name.to_string_lossy().to_string(),
                  });
                  if let Some(ino) = self.inodes.get_index_of(&record) {
···
          reply: fuser::ReplyData,
      ) {
          let rt = tokio::runtime::Runtime::new().unwrap();
-         if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize)
-             && let Ok(Some(val)) = rt.block_on(self.repo.get_raw::<ipld_core::ipld::Ipld>(&r.key()))
-         {
-             reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
-         } else {
-             reply.error(libc::ENOENT);
+         if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) {
+             let col = self.inodes[r.parent].unwrap_collection();
+             let did = self.inodes[col.parent].unwrap_did();
+             let repo = &mut self.repos[did];
+             let key = format!("{}/{}", col.nsid, r.rkey);
+
+             if let Ok(Some(val)) = rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
+                 reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
+                 return;
+             }
          }
+         reply.error(libc::ENOENT);
      }
  }
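The IndexSet doubles as the inode table: an entry's insertion index is its FUSE inode number, and Collection/Record entries now carry their parent's index instead of a collection string, so attr() and read() can walk Record -> Collection -> Did to find the right repo. A standalone sketch of that scheme (simplified Entry type and example values, not the repo's actual code):

use indexmap::IndexSet;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Entry {
    Zero,
    Root,
    Did(String),
    Collection { parent: usize, nsid: String },
    Record { parent: usize, rkey: String },
}

fn main() {
    // indexes 0 and 1 are reserved so the root directory
    // lands on inode 1, as FUSE expects
    let mut inodes: IndexSet<Entry> = IndexSet::from([Entry::Zero, Entry::Root]);

    // insert_full returns (index, newly_inserted); re-inserting an
    // existing collection is a no-op that still yields its index
    let (did, _) = inodes.insert_full(Entry::Did("did:plc:example".into()));
    let (col, _) = inodes.insert_full(Entry::Collection {
        parent: did,
        nsid: "sh.tangled.publicKey".into(),
    });
    inodes.insert(Entry::Record { parent: col, rkey: "self".into() });

    // walk a record (inode 4) back up to its owning DID
    if let Some(Entry::Record { parent, .. }) = inodes.get_index(4) {
        if let Some(Entry::Collection { parent, .. }) = inodes.get_index(*parent) {
            println!("record belongs to inode {parent}: {:?}", inodes[*parent]);
        }
    }
}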
src/main.rs (+75 -32)
···
  use atrium_repo::{Repository, blockstore::CarStore};
  use atrium_xrpc_client::isahc::IsahcClient;
  use fuser::MountOption;
- use indicatif::{ProgressBar, ProgressStyle};
+ use futures::{Stream, StreamExt, TryStream, TryStreamExt, stream};
+ use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
  use std::{
+     collections::HashMap,
      io::{Cursor, Write},
      path::PathBuf,
+     sync::Arc,
  };
  use xdg::BaseDirectories;

  fn main() {
      let rt = tokio::runtime::Runtime::new().unwrap();
      let matches = clap::command!()
-         .arg(clap::Arg::new("handle").index(1))
+         .arg(
+             clap::Arg::new("handles")
+                 .index(1)
+                 .required(true)
+                 .num_args(1..)
+                 .help("One or more handles to download and mount"),
+         )
          .arg(
              clap::Arg::new("mountpoint")
                  .short('m')
···
                  .value_parser(clap::value_parser!(PathBuf)),
          )
          .get_matches();
-     let handle = matches.get_one::<String>("handle").unwrap();
+     let handles = matches
+         .get_many::<String>("handles")
+         .unwrap()
+         .cloned()
+         .collect::<Vec<_>>();
      let mountpoint = matches
          .get_one::<PathBuf>("mountpoint")
          .map(ToOwned::to_owned)
          .unwrap_or(PathBuf::from("mnt"));
      let _ = std::fs::create_dir_all(&mountpoint);
-     let resolver = resolver::id_resolver();
-     let id = rt.block_on(async {
-         resolver
-             .resolve(&atrium_api::types::string::Handle::new(handle.into()).unwrap())
-             .await
-             .unwrap()
-     });

-     let pb = ProgressBar::new_spinner();
-     pb.set_style(
-         ProgressStyle::default_spinner()
-             .template("{spinner:.green} [{elapsed_precise}] {msg}")
-             .unwrap()
-             .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
+     let resolver = Arc::new(resolver::id_resolver());
+     let bars = Arc::new(MultiProgress::new());
+     let repos = rt.block_on(
+         stream::iter(handles)
+             .then(|handle| {
+                 let h = handle.clone();
+                 let r = Arc::clone(&resolver);
+                 let b = Arc::clone(&bars);
+                 async move {
+                     let id = r.resolve(&h).await?;
+                     let bytes = cached_download(&id, &b).await?;
+                     let repo = build_repo(bytes).await?;
+                     Ok::<_, error::Error>((id.did, repo))
+                 }
+             })
+             .collect::<Vec<_>>(),
      );
-
-     let bytes = rt.block_on(cached_download(&id, &pb)).unwrap();
-     let store = rt.block_on(async { CarStore::open(Cursor::new(bytes)).await.unwrap() });
-     let root = store.roots().next().unwrap();
-     let repo = rt.block_on(async { Repository::open(store, root).await.unwrap() });
+     let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok());
+     for e in errors {
+         eprintln!("{:?}", e.as_ref().unwrap_err());
+     }
+     let repos = success
+         .into_iter()
+         .map(|s| s.unwrap())
+         .collect::<HashMap<_, _>>();

      // construct the fs
-     let fs = rt.block_on(fs::PdsFs::new(repo));
+     let mut fs = fs::PdsFs::new();
+     for (did, repo) in repos {
+         rt.block_on(fs.add(did, repo))
+     }

      // mount
      let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())];
···
      println!("unmounted {mountpoint:?}");
  }

- async fn cached_download(id: &ResolvedIdentity, pb: &ProgressBar) -> Result<Vec<u8>, error::Error> {
+ async fn cached_download(
+     id: &ResolvedIdentity,
+     m: &MultiProgress,
+ ) -> Result<Vec<u8>, error::Error> {
+     let mut pb = ProgressBar::new_spinner();
+     pb.set_style(
+         ProgressStyle::default_spinner()
+             .template("{spinner:.green} [{elapsed_precise}] {msg}")
+             .unwrap()
+             .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
+     );
+     pb.enable_steady_tick(std::time::Duration::from_millis(100));
+     pb = m.add(pb);
+
      let dirs = BaseDirectories::new();

      let dir = dirs.get_data_home().expect("$HOME is absent").join("pdsfs");
···
      let exists = std::fs::exists(&file)?;

      let bytes = if !exists {
-         pb.set_message(format!("downloading CAR file for\t...\t{}", id.did));
-         download_car_file(id, pb).await?
+         pb.set_message(format!("downloading CAR file for...{}", id.did));
+         download_car_file(id, &pb).await?
      } else {
-         pb.set_message(format!("using cached CAR file for\t...\t{}", id.did));
-         tokio::fs::read(file).await?
+         pb.set_message(format!("using cached CAR file for...{}", id.did));
+         tokio::fs::read(&file).await?
      };

-     pb.set_message(format!(
-         "received {} bytes for \t...\t{}",
-         bytes.len(),
-         id.did
-     ));
+     // write to disk
+     if !exists {
+         tokio::fs::write(&file, &bytes).await?;
+     }
+
+     pb.finish();
      Ok(bytes)
  }
···

      Ok(bytes)
  }
+
+ async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> {
+     let store = CarStore::open(Cursor::new(bytes)).await?;
+     let root = store.roots().next().unwrap();
+     let repo = Repository::open(store, root).await?;
+     Ok(repo)
+ }
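main() now resolves, downloads, and opens each handle inside one stream::iter(...).then(...) pipeline, collects the per-handle Results, and partitions them so a failing handle is reported without aborting the rest. A minimal sketch of that control flow with stubbed-out work (the handle names and error text are made up):

use futures::{StreamExt, stream};

#[tokio::main]
async fn main() {
    let handles = vec!["oppi.li", "bad.invalid"];

    // .then() runs the async closure for each item one at a time,
    // yielding a stream of Results that collect() gathers up
    let results: Vec<Result<String, String>> = stream::iter(handles)
        .then(|h| async move {
            if h.ends_with(".invalid") {
                Err(format!("could not resolve {h}"))
            } else {
                Ok(format!("repo for {h}"))
            }
        })
        .collect()
        .await;

    // report failures, keep going with whatever succeeded
    let (ok, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(|r| r.is_ok());
    for e in &errs {
        eprintln!("{}", e.as_ref().unwrap_err());
    }
    println!("mounting {} repos", ok.len());
}

Note that StreamExt::then awaits each future before starting the next, so the CAR files download sequentially; the MultiProgress mostly serves to keep one tidy spinner line per DID rather than to show true parallelism.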