//! Mount an atproto PDS repository as a FUSE filesystem.
1use std::{ 2 collections::{BTreeMap, hash_set::Iter}, 3 time, 4}; 5 6use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead}; 7use futures::StreamExt; 8use indexmap::{IndexMap, IndexSet}; 9 10type Inode = usize; 11 12pub struct PdsFs<R> { 13 repos: IndexMap<String, Repository<R>>, 14 inodes: IndexSet<PdsFsEntry>, 15} 16 17#[derive(Debug, Clone, PartialEq, Eq, Hash)] 18pub enum PdsFsEntry { 19 Zero, 20 Root, 21 Did(String), 22 Collection(PdsFsCollection), 23 Record(PdsFsRecord), 24} 25 26impl PdsFsEntry { 27 fn as_collection(&self) -> Option<&PdsFsCollection> { 28 match &self { 29 Self::Collection(c) => Some(c), 30 _ => None, 31 } 32 } 33 34 fn as_did(&self) -> Option<&String> { 35 match &self { 36 Self::Did(d) => Some(d), 37 _ => None, 38 } 39 } 40 41 fn unwrap_collection(&self) -> &PdsFsCollection { 42 self.as_collection().unwrap() 43 } 44 45 fn unwrap_did(&self) -> &String { 46 self.as_did().unwrap() 47 } 48} 49 50#[derive(Debug, Clone, PartialEq, Eq, Hash)] 51pub struct PdsFsCollection { 52 parent: Inode, 53 nsid: String, 54} 55 56#[derive(Debug, Clone, PartialEq, Eq, Hash)] 57pub struct PdsFsRecord { 58 parent: Inode, 59 rkey: String, 60} 61 62// impl PdsFsRecord { 63// fn key(&self) -> String { 64// format!("{}/{}", self.collection, self.rkey) 65// } 66// } 67 68const TTL: time::Duration = time::Duration::from_secs(300); 69const BLKSIZE: u32 = 512; 70 71const ROOTDIR_ATTR: fuser::FileAttr = fuser::FileAttr { 72 ino: 1, 73 size: 0, 74 blocks: 0, 75 atime: time::UNIX_EPOCH, 76 mtime: time::UNIX_EPOCH, 77 ctime: time::UNIX_EPOCH, 78 crtime: time::UNIX_EPOCH, 79 kind: fuser::FileType::Directory, 80 perm: 0o755, 81 nlink: 2, 82 uid: 501, 83 gid: 20, 84 rdev: 0, 85 flags: 0, 86 blksize: BLKSIZE, 87}; 88 89impl<R> PdsFs<R> 90where 91 R: AsyncBlockStoreRead, 92{ 93 pub fn new() -> Self { 94 PdsFs { 95 repos: Default::default(), 96 inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]), 97 } 98 } 99 100 pub async fn add(&mut self, did: String, mut repo: 
Repository<R>) { 101 let mut mst = repo.tree(); 102 103 let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone())); 104 105 let mut keys = Box::pin(mst.keys()); 106 while let Some(Ok(key)) = keys.next().await { 107 if let Some((collection_name, rkey)) = key.split_once("/") { 108 let (collection_inode, _) = 109 self.inodes 110 .insert_full(PdsFsEntry::Collection(PdsFsCollection { 111 parent: did_inode, 112 nsid: collection_name.to_owned(), 113 })); 114 115 self.inodes.insert(PdsFsEntry::Record(PdsFsRecord { 116 parent: collection_inode, 117 rkey: rkey.to_owned(), 118 })); 119 } 120 } 121 122 drop(keys); 123 drop(mst); 124 125 self.repos.insert(did, repo); 126 } 127 128 fn attr(&mut self, ino: u64) -> fuser::FileAttr { 129 match self.inodes.get_index(ino as usize) { 130 Some(PdsFsEntry::Root) => ROOTDIR_ATTR, 131 Some(PdsFsEntry::Collection(_)) => fuser::FileAttr { 132 ino, 133 size: 0, 134 blocks: 0, 135 atime: time::UNIX_EPOCH, 136 mtime: time::UNIX_EPOCH, 137 ctime: time::UNIX_EPOCH, 138 crtime: time::UNIX_EPOCH, 139 kind: fuser::FileType::Directory, 140 perm: 0o755, 141 nlink: 2, 142 uid: 1000, 143 gid: 1000, 144 rdev: 0, 145 flags: 0, 146 blksize: BLKSIZE, 147 }, 148 Some(PdsFsEntry::Did(_)) => fuser::FileAttr { 149 ino, 150 size: 0, 151 blocks: 0, 152 atime: time::UNIX_EPOCH, 153 mtime: time::UNIX_EPOCH, 154 ctime: time::UNIX_EPOCH, 155 crtime: time::UNIX_EPOCH, 156 kind: fuser::FileType::Directory, 157 perm: 0o755, 158 nlink: 2, 159 uid: 1000, 160 gid: 1000, 161 rdev: 0, 162 flags: 0, 163 blksize: BLKSIZE, 164 }, 165 Some(PdsFsEntry::Record(r)) => { 166 let col = self.inodes[r.parent].unwrap_collection(); 167 let did = self.inodes[col.parent].unwrap_did(); 168 let repo = &mut self.repos[did]; 169 let key = format!("{}/{}", col.nsid, r.rkey); 170 let rt = tokio::runtime::Runtime::new().unwrap(); 171 let size = rt 172 .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) 173 .ok() 174 .flatten() 175 .map_or(0, |v| 
serde_json::to_string(&v).unwrap().len()) 176 as u64; 177 let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64; 178 fuser::FileAttr { 179 ino, 180 size, 181 blocks, 182 atime: time::UNIX_EPOCH, 183 mtime: time::UNIX_EPOCH, 184 ctime: time::UNIX_EPOCH, 185 crtime: time::UNIX_EPOCH, 186 kind: fuser::FileType::RegularFile, 187 perm: 0o644, 188 nlink: 1, 189 uid: 501, 190 gid: 20, 191 rdev: 0, 192 flags: 0, 193 blksize: 512, 194 } 195 } 196 _ => panic!("zero"), 197 } 198 } 199} 200 201impl<R> fuser::Filesystem for PdsFs<R> 202where 203 R: AsyncBlockStoreRead, 204{ 205 fn getattr( 206 &mut self, 207 _req: &fuser::Request, 208 ino: u64, 209 _fh: Option<u64>, 210 reply: fuser::ReplyAttr, 211 ) { 212 if (ino as usize) < self.inodes.len() { 213 reply.attr(&TTL, &self.attr(ino as u64)) 214 } else { 215 reply.error(libc::ENOENT) 216 } 217 } 218 219 fn readdir( 220 &mut self, 221 _req: &fuser::Request, 222 ino: u64, 223 _fh: u64, 224 offset: i64, 225 mut reply: fuser::ReplyDirectory, 226 ) { 227 match self.inodes.get_index(ino as usize) { 228 Some(PdsFsEntry::Root) => { 229 let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())] 230 .into_iter() 231 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 232 if let PdsFsEntry::Did(did) = e { 233 Some((i as u64, did.clone())) 234 } else { 235 None 236 } 237 })) 238 .collect(); 239 240 for (index, (inode_num, name)) in 241 entries.into_iter().enumerate().skip(offset as usize) 242 { 243 if reply.add( 244 inode_num, 245 (index + 1) as i64, 246 fuser::FileType::Directory, 247 name, 248 ) { 249 break; 250 } 251 } 252 reply.ok() 253 } 254 Some(PdsFsEntry::Did(_)) => { 255 let entries = vec![(ino, ".".to_string()), (1, "..".to_string())] 256 .into_iter() 257 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 258 if let PdsFsEntry::Collection(col) = e { 259 if col.parent == ino as usize { 260 Some((i as u64, col.nsid.clone())) 261 } else { 262 None 263 } 264 } else { 265 None 266 } 267 })) 268 
.into_iter() 269 .enumerate() 270 .skip(offset as usize); 271 272 for (index, (inode_num, name)) in entries { 273 let full = reply.add( 274 inode_num, 275 (index + 1) as i64, 276 if name.starts_with('.') { 277 fuser::FileType::Directory 278 } else { 279 fuser::FileType::RegularFile 280 }, 281 name, 282 ); 283 if full { 284 break; 285 } 286 } 287 288 reply.ok(); 289 } 290 Some(PdsFsEntry::Collection(c)) => { 291 let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())] 292 .into_iter() 293 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 294 if let PdsFsEntry::Record(record) = e { 295 if record.parent == ino as usize { 296 Some((i as u64, record.rkey.clone())) 297 } else { 298 None 299 } 300 } else { 301 None 302 } 303 })) 304 .into_iter() 305 .enumerate() 306 .skip(offset as usize); 307 308 for (index, (inode_num, name)) in entries { 309 let full = reply.add( 310 inode_num, 311 (index + 1) as i64, 312 if name.starts_with('.') { 313 fuser::FileType::Directory 314 } else { 315 fuser::FileType::RegularFile 316 }, 317 name, 318 ); 319 if full { 320 break; 321 } 322 } 323 324 reply.ok() 325 } 326 _ => reply.error(libc::ENOENT), 327 } 328 } 329 330 fn lookup( 331 &mut self, 332 _req: &fuser::Request, 333 parent: u64, 334 name: &std::ffi::OsStr, 335 reply: fuser::ReplyEntry, 336 ) { 337 match self.inodes.get_index(parent as usize) { 338 Some(PdsFsEntry::Root) => { 339 let did = PdsFsEntry::Did(name.to_string_lossy().to_string()); 340 if let Some(ino) = self.inodes.get_index_of(&did) { 341 reply.entry(&TTL, &self.attr(ino as u64), 0); 342 } else { 343 reply.error(libc::ENOENT) 344 } 345 } 346 Some(PdsFsEntry::Did(_)) => { 347 let col = PdsFsEntry::Collection(PdsFsCollection { 348 parent: parent as usize, 349 nsid: name.to_string_lossy().to_string(), 350 }); 351 if let Some(ino) = self.inodes.get_index_of(&col) { 352 reply.entry(&TTL, &self.attr(ino as u64), 0); 353 } else { 354 reply.error(libc::ENOENT) 355 } 356 } 357 
Some(PdsFsEntry::Collection(_)) => { 358 let record = PdsFsEntry::Record(PdsFsRecord { 359 parent: parent as usize, 360 rkey: name.to_string_lossy().to_string(), 361 }); 362 if let Some(ino) = self.inodes.get_index_of(&record) { 363 reply.entry(&TTL, &self.attr(ino as u64), 0); 364 } else { 365 reply.error(libc::ENOENT) 366 } 367 } 368 _ => reply.error(libc::ENOENT), 369 } 370 } 371 372 fn read( 373 &mut self, 374 _req: &fuser::Request, 375 ino: u64, 376 _fh: u64, 377 offset: i64, 378 _size: u32, 379 _flags: i32, 380 _lock: Option<u64>, 381 reply: fuser::ReplyData, 382 ) { 383 let rt = tokio::runtime::Runtime::new().unwrap(); 384 if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) { 385 let col = self.inodes[r.parent].unwrap_collection(); 386 let did = self.inodes[col.parent].unwrap_did(); 387 let repo = &mut self.repos[did]; 388 let key = format!("{}/{}", col.nsid, r.rkey); 389 390 if let Ok(Some(val)) = rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) { 391 reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); 392 return; 393 } 394 } 395 reply.error(libc::ENOENT); 396 } 397}