use std::sync::{Arc, Mutex};
use std::time::{self, Duration, SystemTime, UNIX_EPOCH};

use atrium_repo::{blockstore::AsyncBlockStoreRead, Repository};
use futures::StreamExt;
use indexmap::{IndexMap, IndexSet};

type Inode = usize;

/// Decode a TID (timestamp identifier) into the `SystemTime` it encodes.
///
/// A TID is 13 characters of sortable base32. The decoded 64-bit value stores
/// microseconds since the Unix epoch in its upper bits, with the low 10 bits
/// reserved for a clock identifier.
fn tid_to_timestamp(tid: &str) -> Option<SystemTime> {
    const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";

    if tid.len() != 13 {
        return None;
    }

    let mut value: u64 = 0;
    for ch in tid.chars() {
        let pos = S32_CHAR.iter().position(|&c| c as char == ch)?;
        // Big-endian: the first character is the most significant.
        value = (value << 5) | (pos as u64);
    }

    // Drop the 10-bit clock identifier to get microseconds since the epoch.
    let micros = value >> 10;
    UNIX_EPOCH.checked_add(Duration::from_micros(micros))
}

/// A read-only FUSE view of one or more PDS repositories, laid out as
/// `/<did>/<collection-nsid>/<rkey>.json`.
pub struct PdsFs<R> {
    repos: Arc<Mutex<IndexMap<String, Repository<R>>>>,
    inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
    sizes: Arc<Mutex<IndexMap<Inode, u64>>>,
    content_cache: Arc<Mutex<IndexMap<String, String>>>,
    rt: tokio::runtime::Runtime,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PdsFsEntry {
    /// Placeholder at index 0 so that set indices line up with FUSE inode
    /// numbers, which start at 1.
    Zero,
    Root,
    Did(String),
    Collection(PdsFsCollection),
    Record(PdsFsRecord),
}

impl PdsFsEntry {
    fn as_collection(&self) -> Option<&PdsFsCollection> {
        match self {
            Self::Collection(c) => Some(c),
            _ => None,
        }
    }

    fn as_did(&self) -> Option<&String> {
        match self {
            Self::Did(d) => Some(d),
            _ => None,
        }
    }

    fn unwrap_collection(&self) -> &PdsFsCollection {
        self.as_collection().unwrap()
    }

    fn unwrap_did(&self) -> &String {
        self.as_did().unwrap()
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PdsFsCollection {
    pub parent: Inode,
    pub nsid: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PdsFsRecord {
    pub parent: Inode,
    pub rkey: String,
}

const TTL: time::Duration = time::Duration::from_secs(300);
const BLKSIZE: u32 = 512;

const ROOTDIR_ATTR: fuser::FileAttr = fuser::FileAttr {
    ino: 1,
    size: 0,
    blocks: 0,
    atime: time::UNIX_EPOCH,
    mtime: time::UNIX_EPOCH,
    ctime: time::UNIX_EPOCH,
    crtime: time::UNIX_EPOCH,
    kind: fuser::FileType::Directory,
    perm: 0o755,
    nlink: 2,
    uid: 501,
    gid: 20,
    rdev: 0,
    flags: 0,
    blksize: BLKSIZE,
};

impl<R> PdsFs<R>
where
    R: AsyncBlockStoreRead,
{
    pub fn new() -> Self {
        PdsFs {
            repos: Arc::new(Mutex::new(Default::default())),
            inodes: Arc::new(Mutex::new(IndexSet::from([
                PdsFsEntry::Zero,
                PdsFsEntry::Root,
            ]))),
            sizes: Arc::new(Mutex::new(Default::default())),
            content_cache: Arc::new(Mutex::new(Default::default())),
            rt: tokio::runtime::Runtime::new().unwrap(),
        }
    }

    /// Hand out clones of the shared state so that another task (e.g. a
    /// firehose consumer) can update the filesystem while it is mounted.
    pub fn get_shared_state(
        &self,
    ) -> (
        Arc<Mutex<IndexMap<String, Repository<R>>>>,
        Arc<Mutex<IndexSet<PdsFsEntry>>>,
        Arc<Mutex<IndexMap<Inode, u64>>>,
        Arc<Mutex<IndexMap<String, String>>>,
    ) {
        (
            Arc::clone(&self.repos),
            Arc::clone(&self.inodes),
            Arc::clone(&self.sizes),
            Arc::clone(&self.content_cache),
        )
    }

    /// Register a repository: walk its MST once to allocate inodes for every
    /// collection and record, then store the repo for later reads.
    pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
        let mut mst = repo.tree();

        let did_inode = {
            let mut inodes = self.inodes.lock().unwrap();
            let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone()));
            did_inode
        };

        let mut keys = Box::pin(mst.keys());
        while let Some(Ok(key)) = keys.next().await {
            // MST keys have the form "<collection-nsid>/<rkey>".
            if let Some((collection_name, rkey)) = key.split_once('/') {
                let mut inodes = self.inodes.lock().unwrap();
                let (collection_inode, _) =
                    inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection {
                        parent: did_inode,
                        nsid: collection_name.to_owned(),
                    }));
                inodes.insert(PdsFsEntry::Record(PdsFsRecord {
                    parent: collection_inode,
                    rkey: rkey.to_owned(),
                }));
            }
        }

        drop(keys);
        drop(mst);
        self.repos.lock().unwrap().insert(did, repo);
    }

    fn attr(&mut self, ino: u64) -> fuser::FileAttr {
        let inodes = self.inodes.lock().unwrap();
        match inodes.get_index(ino as usize) {
            Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
            Some(PdsFsEntry::Did(_)) | Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
                ino,
                size: 0,
                blocks: 0,
                atime: time::UNIX_EPOCH,
                mtime: time::UNIX_EPOCH,
                ctime: time::UNIX_EPOCH,
                crtime: time::UNIX_EPOCH,
                kind: fuser::FileType::Directory,
                perm: 0o755,
                nlink: 2,
                uid: 501,
                gid: 20,
                rdev: 0,
                flags: 0,
                blksize: BLKSIZE,
            },
            Some(PdsFsEntry::Record(r)) => {
                let col = inodes[r.parent].unwrap_collection();
                let did = inodes[col.parent].unwrap_did().clone();
                let rkey = r.rkey.clone();
                let collection_nsid = col.nsid.clone();
                drop(inodes);

                // Check the size cache first.
                let size = {
                    let sizes = self.sizes.lock().unwrap();
                    if let Some(&cached_size) = sizes.get(&(ino as usize)) {
                        cached_size
                    } else {
                        drop(sizes);
                        // Not cached: fetch the record and measure its
                        // pretty-printed JSON length (matching `read`).
                        let mut repos = self.repos.lock().unwrap();
                        let repo = &mut repos[&did];
                        let key = format!("{}/{}", collection_nsid, rkey);
                        let size = self
                            .rt
                            .block_on(repo.get_raw::<serde_json::Value>(&key))
                            .ok()
                            .flatten()
                            .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len())
                            as u64;

                        // Cache it for next time.
                        self.sizes.lock().unwrap().insert(ino as usize, size);
                        size
                    }
                };

                let blocks = size.div_ceil(BLKSIZE as u64);

                // The rkey is a TID, so it encodes the record's creation time.
                let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH);

                fuser::FileAttr {
                    ino,
                    size,
                    blocks,
                    atime: timestamp,
                    mtime: timestamp,
                    ctime: timestamp,
                    crtime: timestamp,
                    kind: fuser::FileType::RegularFile,
                    perm: 0o644,
                    nlink: 1,
                    uid: 501,
                    gid: 20,
                    rdev: 0,
                    flags: 0,
                    blksize: BLKSIZE,
                }
            }
            _ => panic!("attr() called for unknown inode {ino}"),
        }
    }
}

impl<R> fuser::Filesystem for PdsFs<R>
where
    R: AsyncBlockStoreRead,
{
    fn getattr(
        &mut self,
        _req: &fuser::Request,
        ino: u64,
        _fh: Option<u64>,
        reply: fuser::ReplyAttr,
    ) {
        let len = self.inodes.lock().unwrap().len();
        if (ino as usize) < len {
            reply.attr(&TTL, &self.attr(ino))
        } else {
            reply.error(libc::ENOENT)
        }
    }

    fn readdir(
        &mut self,
        _req: &fuser::Request,
        ino: u64,
        _fh: u64,
        offset: i64,
        mut reply: fuser::ReplyDirectory,
    ) {
        let inodes = self.inodes.lock().unwrap();
        match inodes.get_index(ino as usize) {
            Some(PdsFsEntry::Root) => {
                // The root lists one directory per DID.
                let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
                    .into_iter()
                    .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
                        if let PdsFsEntry::Did(did) = e {
                            Some((i as u64, did.clone()))
                        } else {
                            None
                        }
                    }))
                    .collect();
                drop(inodes);

                for (index, (inode_num, name)) in
                    entries.into_iter().enumerate().skip(offset as usize)
                {
                    // `ReplyDirectory::add` returns true once the buffer is full.
                    if reply.add(
                        inode_num,
                        (index + 1) as i64,
                        fuser::FileType::Directory,
                        name,
                    ) {
                        break;
                    }
                }
                reply.ok()
            }
            Some(PdsFsEntry::Did(_)) => {
                // A DID directory lists one subdirectory per collection NSID.
                let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())]
                    .into_iter()
                    .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
                        if let PdsFsEntry::Collection(col) = e {
                            if col.parent == ino as usize {
                                Some((i as u64, col.nsid.clone()))
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    }))
                    .collect();
                drop(inodes);

                for (index, (inode_num, name)) in
                    entries.into_iter().enumerate().skip(offset as usize)
                {
                    // Everything here is a directory: ".", "..", and the
                    // collections themselves.
                    let full = reply.add(
                        inode_num,
                        (index + 1) as i64,
                        fuser::FileType::Directory,
                        name,
                    );
                    if full {
                        break;
                    }
                }
                reply.ok();
            }
            Some(PdsFsEntry::Collection(c)) => {
                let parent_ino = c.parent;
                // A collection directory lists one "<rkey>.json" file per record.
".".to_string()), (parent_ino as u64, "..".to_string())] .into_iter() .chain(inodes.iter().enumerate().filter_map(|(i, e)| { if let PdsFsEntry::Record(record) = e { if record.parent == ino as usize { Some((i as u64, format!("{}.json", record.rkey))) } else { None } } else { None } })) .collect(); drop(inodes); for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { let full = reply.add( inode_num, (index + 1) as i64, if name.starts_with('.') { fuser::FileType::Directory } else { fuser::FileType::RegularFile }, name, ); if full { break; } } reply.ok() } _ => { drop(inodes); reply.error(libc::ENOENT) } } } fn lookup( &mut self, _req: &fuser::Request, parent: u64, name: &std::ffi::OsStr, reply: fuser::ReplyEntry, ) { let inodes = self.inodes.lock().unwrap(); match inodes.get_index(parent as usize) { Some(PdsFsEntry::Root) => { let did = PdsFsEntry::Did(name.to_string_lossy().to_string()); if let Some(ino) = inodes.get_index_of(&did) { drop(inodes); reply.entry(&TTL, &self.attr(ino as u64), 0); } else { drop(inodes); reply.error(libc::ENOENT) } } Some(PdsFsEntry::Did(_)) => { let col = PdsFsEntry::Collection(PdsFsCollection { parent: parent as usize, nsid: name.to_string_lossy().to_string(), }); if let Some(ino) = inodes.get_index_of(&col) { drop(inodes); reply.entry(&TTL, &self.attr(ino as u64), 0); } else { drop(inodes); reply.error(libc::ENOENT) } } Some(PdsFsEntry::Collection(_)) => { let name_str = name.to_string_lossy(); let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string(); let record = PdsFsEntry::Record(PdsFsRecord { parent: parent as usize, rkey, }); if let Some(ino) = inodes.get_index_of(&record) { drop(inodes); reply.entry(&TTL, &self.attr(ino as u64), 0); } else { drop(inodes); reply.error(libc::ENOENT) } } _ => { drop(inodes); reply.error(libc::ENOENT) } } } fn read( &mut self, _req: &fuser::Request, ino: u64, _fh: u64, offset: i64, _size: u32, _flags: i32, _lock: Option, reply: fuser::ReplyData, ) { let inodes = self.inodes.lock().unwrap(); if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) { let col = inodes[r.parent].unwrap_collection(); let did = inodes[col.parent].unwrap_did().clone(); let key = format!("{}/{}", col.nsid, r.rkey); let cache_key = format!("{}/{}", did, key); drop(inodes); // Check content cache first (for new records from firehose) { let cache = self.content_cache.lock().unwrap(); if let Some(content) = cache.get(&cache_key) { reply.data(&content.as_bytes()[offset as usize..]); return; } } // Fall back to repo let mut repos = self.repos.lock().unwrap(); let repo = &mut repos[&did]; if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::(&key)) { reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); return; } } else { drop(inodes); } reply.error(libc::ENOENT); } }