Mount an atproto PDS repository as a FUSE filesystem.
at main 16 kB view raw
1use std::sync::{Arc, Mutex}; 2use std::time::{self, SystemTime, UNIX_EPOCH, Duration}; 3 4use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead}; 5use futures::StreamExt; 6use indexmap::{IndexMap, IndexSet}; 7 8type Inode = usize; 9 10/// Decode a TID (timestamp identifier) to get the timestamp in microseconds since Unix epoch 11fn tid_to_timestamp(tid: &str) -> Option<SystemTime> { 12 const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz"; 13 14 if tid.len() != 13 { 15 return None; 16 } 17 18 let mut value: u64 = 0; 19 for ch in tid.chars() { 20 let pos = S32_CHAR.iter().position(|&c| c as char == ch)?; 21 // Big-endian: first character is most significant 22 value = (value << 5) | (pos as u64); 23 } 24 25 // Extract timestamp from upper bits (shifted by 10) 26 let micros = value >> 10; 27 28 UNIX_EPOCH.checked_add(Duration::from_micros(micros)) 29} 30 31pub struct PdsFs<R> { 32 repos: Arc<Mutex<IndexMap<String, Repository<R>>>>, 33 inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 34 sizes: Arc<Mutex<IndexMap<Inode, u64>>>, 35 content_cache: Arc<Mutex<IndexMap<String, String>>>, 36 rt: tokio::runtime::Runtime, 37} 38 39#[derive(Debug, Clone, PartialEq, Eq, Hash)] 40pub enum PdsFsEntry { 41 Zero, 42 Root, 43 Did(String), 44 Collection(PdsFsCollection), 45 Record(PdsFsRecord), 46} 47 48impl PdsFsEntry { 49 fn as_collection(&self) -> Option<&PdsFsCollection> { 50 match &self { 51 Self::Collection(c) => Some(c), 52 _ => None, 53 } 54 } 55 56 fn as_did(&self) -> Option<&String> { 57 match &self { 58 Self::Did(d) => Some(d), 59 _ => None, 60 } 61 } 62 63 fn unwrap_collection(&self) -> &PdsFsCollection { 64 self.as_collection().unwrap() 65 } 66 67 fn unwrap_did(&self) -> &String { 68 self.as_did().unwrap() 69 } 70} 71 72#[derive(Debug, Clone, PartialEq, Eq, Hash)] 73pub struct PdsFsCollection { 74 pub parent: Inode, 75 pub nsid: String, 76} 77 78#[derive(Debug, Clone, PartialEq, Eq, Hash)] 79pub struct PdsFsRecord { 80 pub parent: Inode, 81 pub rkey: String, 82} 
83 84// impl PdsFsRecord { 85// fn key(&self) -> String { 86// format!("{}/{}", self.collection, self.rkey) 87// } 88// } 89 90const TTL: time::Duration = time::Duration::from_secs(300); 91const BLKSIZE: u32 = 512; 92 93const ROOTDIR_ATTR: fuser::FileAttr = fuser::FileAttr { 94 ino: 1, 95 size: 0, 96 blocks: 0, 97 atime: time::UNIX_EPOCH, 98 mtime: time::UNIX_EPOCH, 99 ctime: time::UNIX_EPOCH, 100 crtime: time::UNIX_EPOCH, 101 kind: fuser::FileType::Directory, 102 perm: 0o755, 103 nlink: 2, 104 uid: 501, 105 gid: 20, 106 rdev: 0, 107 flags: 0, 108 blksize: BLKSIZE, 109}; 110 111impl<R> PdsFs<R> 112where 113 R: AsyncBlockStoreRead, 114{ 115 pub fn new() -> Self { 116 PdsFs { 117 repos: Arc::new(Mutex::new(Default::default())), 118 inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))), 119 sizes: Arc::new(Mutex::new(Default::default())), 120 content_cache: Arc::new(Mutex::new(Default::default())), 121 rt: tokio::runtime::Runtime::new().unwrap(), 122 } 123 } 124 125 pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) { 126 (Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache)) 127 } 128 129 pub async fn add(&mut self, did: String, mut repo: Repository<R>) { 130 let mut mst = repo.tree(); 131 132 let did_inode = { 133 let mut inodes = self.inodes.lock().unwrap(); 134 let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone())); 135 did_inode 136 }; 137 138 let mut keys = Box::pin(mst.keys()); 139 while let Some(Ok(key)) = keys.next().await { 140 if let Some((collection_name, rkey)) = key.split_once("/") { 141 let mut inodes = self.inodes.lock().unwrap(); 142 let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection { 143 parent: did_inode, 144 nsid: collection_name.to_owned(), 145 })); 146 147 
inodes.insert(PdsFsEntry::Record(PdsFsRecord { 148 parent: collection_inode, 149 rkey: rkey.to_owned(), 150 })); 151 } 152 } 153 154 drop(keys); 155 drop(mst); 156 157 self.repos.lock().unwrap().insert(did, repo); 158 } 159 160 fn attr(&mut self, ino: u64) -> fuser::FileAttr { 161 let inodes = self.inodes.lock().unwrap(); 162 match inodes.get_index(ino as usize) { 163 Some(PdsFsEntry::Root) => ROOTDIR_ATTR, 164 Some(PdsFsEntry::Collection(_)) => fuser::FileAttr { 165 ino, 166 size: 0, 167 blocks: 0, 168 atime: time::UNIX_EPOCH, 169 mtime: time::UNIX_EPOCH, 170 ctime: time::UNIX_EPOCH, 171 crtime: time::UNIX_EPOCH, 172 kind: fuser::FileType::Directory, 173 perm: 0o755, 174 nlink: 2, 175 uid: 1000, 176 gid: 1000, 177 rdev: 0, 178 flags: 0, 179 blksize: BLKSIZE, 180 }, 181 Some(PdsFsEntry::Did(_)) => fuser::FileAttr { 182 ino, 183 size: 0, 184 blocks: 0, 185 atime: time::UNIX_EPOCH, 186 mtime: time::UNIX_EPOCH, 187 ctime: time::UNIX_EPOCH, 188 crtime: time::UNIX_EPOCH, 189 kind: fuser::FileType::Directory, 190 perm: 0o755, 191 nlink: 2, 192 uid: 1000, 193 gid: 1000, 194 rdev: 0, 195 flags: 0, 196 blksize: BLKSIZE, 197 }, 198 Some(PdsFsEntry::Record(r)) => { 199 let col = inodes[r.parent].unwrap_collection(); 200 let did = inodes[col.parent].unwrap_did().clone(); 201 let rkey = r.rkey.clone(); 202 let collection_nsid = col.nsid.clone(); 203 drop(inodes); 204 205 // Check cache first 206 let size = { 207 let sizes = self.sizes.lock().unwrap(); 208 if let Some(&cached_size) = sizes.get(&(ino as usize)) { 209 cached_size 210 } else { 211 drop(sizes); 212 // Not in cache, try to fetch from repo 213 let mut repos = self.repos.lock().unwrap(); 214 let repo = &mut repos[&did]; 215 let key = format!("{}/{}", collection_nsid, rkey); 216 let size = self 217 .rt 218 .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) 219 .ok() 220 .flatten() 221 .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len()) 222 as u64; 223 // Cache it for next time 224 
self.sizes.lock().unwrap().insert(ino as usize, size); 225 size 226 } 227 }; 228 let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64; 229 230 // Decode TID to get creation timestamp 231 let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH); 232 233 fuser::FileAttr { 234 ino, 235 size, 236 blocks, 237 atime: timestamp, 238 mtime: timestamp, 239 ctime: timestamp, 240 crtime: timestamp, 241 kind: fuser::FileType::RegularFile, 242 perm: 0o644, 243 nlink: 1, 244 uid: 501, 245 gid: 20, 246 rdev: 0, 247 flags: 0, 248 blksize: BLKSIZE, 249 } 250 } 251 _ => panic!("zero"), 252 } 253 } 254} 255 256impl<R> fuser::Filesystem for PdsFs<R> 257where 258 R: AsyncBlockStoreRead, 259{ 260 fn getattr( 261 &mut self, 262 _req: &fuser::Request, 263 ino: u64, 264 _fh: Option<u64>, 265 reply: fuser::ReplyAttr, 266 ) { 267 let len = self.inodes.lock().unwrap().len(); 268 if (ino as usize) < len { 269 reply.attr(&TTL, &self.attr(ino as u64)) 270 } else { 271 reply.error(libc::ENOENT) 272 } 273 } 274 275 fn readdir( 276 &mut self, 277 _req: &fuser::Request, 278 ino: u64, 279 _fh: u64, 280 offset: i64, 281 mut reply: fuser::ReplyDirectory, 282 ) { 283 let inodes = self.inodes.lock().unwrap(); 284 match inodes.get_index(ino as usize) { 285 Some(PdsFsEntry::Root) => { 286 let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())] 287 .into_iter() 288 .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 289 if let PdsFsEntry::Did(did) = e { 290 Some((i as u64, did.clone())) 291 } else { 292 None 293 } 294 })) 295 .collect(); 296 drop(inodes); 297 298 for (index, (inode_num, name)) in 299 entries.into_iter().enumerate().skip(offset as usize) 300 { 301 if reply.add( 302 inode_num, 303 (index + 1) as i64, 304 fuser::FileType::Directory, 305 name, 306 ) { 307 break; 308 } 309 } 310 reply.ok() 311 } 312 Some(PdsFsEntry::Did(_)) => { 313 let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())] 314 .into_iter() 315 
.chain(inodes.iter().enumerate().filter_map(|(i, e)| { 316 if let PdsFsEntry::Collection(col) = e { 317 if col.parent == ino as usize { 318 Some((i as u64, col.nsid.clone())) 319 } else { 320 None 321 } 322 } else { 323 None 324 } 325 })) 326 .collect(); 327 drop(inodes); 328 329 for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { 330 let full = reply.add( 331 inode_num, 332 (index + 1) as i64, 333 if name.starts_with('.') { 334 fuser::FileType::Directory 335 } else { 336 fuser::FileType::RegularFile 337 }, 338 name, 339 ); 340 if full { 341 break; 342 } 343 } 344 345 reply.ok(); 346 } 347 Some(PdsFsEntry::Collection(c)) => { 348 let parent_ino = c.parent; 349 let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())] 350 .into_iter() 351 .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 352 if let PdsFsEntry::Record(record) = e { 353 if record.parent == ino as usize { 354 Some((i as u64, format!("{}.json", record.rkey))) 355 } else { 356 None 357 } 358 } else { 359 None 360 } 361 })) 362 .collect(); 363 drop(inodes); 364 365 for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { 366 let full = reply.add( 367 inode_num, 368 (index + 1) as i64, 369 if name.starts_with('.') { 370 fuser::FileType::Directory 371 } else { 372 fuser::FileType::RegularFile 373 }, 374 name, 375 ); 376 if full { 377 break; 378 } 379 } 380 381 reply.ok() 382 } 383 _ => { 384 drop(inodes); 385 reply.error(libc::ENOENT) 386 } 387 } 388 } 389 390 fn lookup( 391 &mut self, 392 _req: &fuser::Request, 393 parent: u64, 394 name: &std::ffi::OsStr, 395 reply: fuser::ReplyEntry, 396 ) { 397 let inodes = self.inodes.lock().unwrap(); 398 match inodes.get_index(parent as usize) { 399 Some(PdsFsEntry::Root) => { 400 let did = PdsFsEntry::Did(name.to_string_lossy().to_string()); 401 if let Some(ino) = inodes.get_index_of(&did) { 402 drop(inodes); 403 reply.entry(&TTL, &self.attr(ino as u64), 0); 404 } 
else { 405 drop(inodes); 406 reply.error(libc::ENOENT) 407 } 408 } 409 Some(PdsFsEntry::Did(_)) => { 410 let col = PdsFsEntry::Collection(PdsFsCollection { 411 parent: parent as usize, 412 nsid: name.to_string_lossy().to_string(), 413 }); 414 if let Some(ino) = inodes.get_index_of(&col) { 415 drop(inodes); 416 reply.entry(&TTL, &self.attr(ino as u64), 0); 417 } else { 418 drop(inodes); 419 reply.error(libc::ENOENT) 420 } 421 } 422 Some(PdsFsEntry::Collection(_)) => { 423 let name_str = name.to_string_lossy(); 424 let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string(); 425 let record = PdsFsEntry::Record(PdsFsRecord { 426 parent: parent as usize, 427 rkey, 428 }); 429 if let Some(ino) = inodes.get_index_of(&record) { 430 drop(inodes); 431 reply.entry(&TTL, &self.attr(ino as u64), 0); 432 } else { 433 drop(inodes); 434 reply.error(libc::ENOENT) 435 } 436 } 437 _ => { 438 drop(inodes); 439 reply.error(libc::ENOENT) 440 } 441 } 442 } 443 444 fn read( 445 &mut self, 446 _req: &fuser::Request, 447 ino: u64, 448 _fh: u64, 449 offset: i64, 450 _size: u32, 451 _flags: i32, 452 _lock: Option<u64>, 453 reply: fuser::ReplyData, 454 ) { 455 let inodes = self.inodes.lock().unwrap(); 456 if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) { 457 let col = inodes[r.parent].unwrap_collection(); 458 let did = inodes[col.parent].unwrap_did().clone(); 459 let key = format!("{}/{}", col.nsid, r.rkey); 460 let cache_key = format!("{}/{}", did, key); 461 drop(inodes); 462 463 // Check content cache first (for new records from firehose) 464 { 465 let cache = self.content_cache.lock().unwrap(); 466 if let Some(content) = cache.get(&cache_key) { 467 reply.data(&content.as_bytes()[offset as usize..]); 468 return; 469 } 470 } 471 472 // Fall back to repo 473 let mut repos = self.repos.lock().unwrap(); 474 let repo = &mut repos[&did]; 475 if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) { 476 
reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); 477 return; 478 } 479 } else { 480 drop(inodes); 481 } 482 reply.error(libc::ENOENT); 483 } 484}