1use std::sync::{Arc, Mutex};
2use std::time::{self, SystemTime, UNIX_EPOCH, Duration};
3
4use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
5use futures::StreamExt;
6use indexmap::{IndexMap, IndexSet};
7
/// Position of an entry in the `PdsFs` inode table; this index doubles as the
/// inode number handed to FUSE and as the `parent` link between entries.
type Inode = usize;
9
/// Decode the creation time embedded in a TID (timestamp identifier) record key.
///
/// A TID is exactly 13 characters of base32-sortable
/// (`234567abcdefghijklmnopqrstuvwxyz`, most significant character first). The
/// decoded 64-bit value holds microseconds since the Unix epoch in its upper
/// bits and a 10-bit clock identifier in its low bits.
///
/// Returns `None` when `tid` is not a syntactically valid TID:
/// - it has the wrong length;
/// - it contains a character outside the alphabet;
/// - its first character is beyond `j`.
///
/// Thirteen characters carry 65 bits, so a first character above `j` (value 15)
/// would set a bit that cannot fit in a `u64`. Such a bit would be silently
/// discarded, producing a wrong timestamp. Also returns `None` if the timestamp
/// overflows `SystemTime`.
fn tid_to_timestamp(tid: &str) -> Option<SystemTime> {
    const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";
    const TID_LEN: usize = 13;
    const CLOCK_ID_BITS: u32 = 10;
    // The leading character only has room for 4 bits (64 = 4 + 12 * 5).
    const MAX_FIRST_DIGIT: u64 = 0b1111;

    // Working on bytes is safe: any non-ASCII byte is simply not in the alphabet.
    let bytes = tid.as_bytes();
    if bytes.len() != TID_LEN {
        return None;
    }

    let mut value: u64 = 0;
    for (i, &byte) in bytes.iter().enumerate() {
        let digit = S32_CHAR.iter().position(|&c| c == byte)? as u64;
        if i == 0 && digit > MAX_FIRST_DIGIT {
            return None;
        }
        // Big-endian: each character shifts the previous ones up by 5 bits.
        value = (value << 5) | digit;
    }

    UNIX_EPOCH.checked_add(Duration::from_micros(value >> CLOCK_ID_BITS))
}
30
/// FUSE filesystem exposing loaded repositories as `/<did>/<collection>/<rkey>.json`.
///
/// All mutable state sits behind `Arc<Mutex<..>>`; clones of these handles are
/// available from `get_shared_state`, so code outside the filesystem can access
/// the same state.
pub struct PdsFs<R> {
    // Loaded repositories, keyed by DID.
    repos: Arc<Mutex<IndexMap<String, Repository<R>>>>,
    // Inode table: an entry's index in the set is its inode number.
    // `new` seeds index 0 with `PdsFsEntry::Zero` and index 1 with `PdsFsEntry::Root`.
    inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
    // Cached byte sizes of record files, keyed by inode.
    sizes: Arc<Mutex<IndexMap<Inode, u64>>>,
    // Record contents keyed by "<did>/<collection>/<rkey>"; `read` consults this
    // before falling back to the repository.
    content_cache: Arc<Mutex<IndexMap<String, String>>>,
    // Runtime used to `block_on` async repository reads from the synchronous FUSE callbacks.
    rt: tokio::runtime::Runtime,
}
38
/// One node of the filesystem tree. Entries live in an `IndexSet`, and an
/// entry's index there is its inode number.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PdsFsEntry {
    /// Placeholder occupying index 0 so that the root lands on inode 1.
    Zero,
    /// The mount root; lists one directory per DID.
    Root,
    /// A repository directory, named by its DID.
    Did(String),
    /// A collection directory inside a DID directory.
    Collection(PdsFsCollection),
    /// A single record, listed as `<rkey>.json` inside its collection directory.
    Record(PdsFsRecord),
}
47
48impl PdsFsEntry {
49 fn as_collection(&self) -> Option<&PdsFsCollection> {
50 match &self {
51 Self::Collection(c) => Some(c),
52 _ => None,
53 }
54 }
55
56 fn as_did(&self) -> Option<&String> {
57 match &self {
58 Self::Did(d) => Some(d),
59 _ => None,
60 }
61 }
62
63 fn unwrap_collection(&self) -> &PdsFsCollection {
64 self.as_collection().unwrap()
65 }
66
67 fn unwrap_did(&self) -> &String {
68 self.as_did().unwrap()
69 }
70}
71
/// Payload of `PdsFsEntry::Collection`: a collection directory within a DID directory.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PdsFsCollection {
    /// Inode of the owning `PdsFsEntry::Did` entry.
    pub parent: Inode,
    /// Collection NSID (the part of a repository key before `/`); used as the directory name.
    pub nsid: String,
}
77
/// Payload of `PdsFsEntry::Record`: one record file within a collection directory.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PdsFsRecord {
    /// Inode of the owning `PdsFsEntry::Collection` entry.
    pub parent: Inode,
    /// Record key (the part of a repository key after `/`); the file is listed as `<rkey>.json`.
    pub rkey: String,
}
83
84// impl PdsFsRecord {
85// fn key(&self) -> String {
86// format!("{}/{}", self.collection, self.rkey)
87// }
88// }
89
/// How long the kernel may cache the attributes and entries returned by `getattr` and `lookup`.
const TTL: time::Duration = time::Duration::from_secs(300);
/// Block size reported in every `FileAttr`; record sizes are divided by it to compute `blocks`.
const BLKSIZE: u32 = 512;
92
/// Attributes reported for the mount root (inode 1).
///
/// A 0o755 directory with every timestamp at the Unix epoch and a fixed owner.
/// NOTE(review): uid 501 / gid 20 look like macOS first-user / `staff` defaults
/// rather than the mounting user's ids — confirm this is intended.
const ROOTDIR_ATTR: fuser::FileAttr = fuser::FileAttr {
    ino: 1,
    size: 0,
    blocks: 0,
    atime: time::UNIX_EPOCH,
    mtime: time::UNIX_EPOCH,
    ctime: time::UNIX_EPOCH,
    crtime: time::UNIX_EPOCH,
    kind: fuser::FileType::Directory,
    perm: 0o755,
    nlink: 2,
    uid: 501,
    gid: 20,
    rdev: 0,
    flags: 0,
    blksize: BLKSIZE,
};
110
111impl<R> PdsFs<R>
112where
113 R: AsyncBlockStoreRead,
114{
115 pub fn new() -> Self {
116 PdsFs {
117 repos: Arc::new(Mutex::new(Default::default())),
118 inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))),
119 sizes: Arc::new(Mutex::new(Default::default())),
120 content_cache: Arc::new(Mutex::new(Default::default())),
121 rt: tokio::runtime::Runtime::new().unwrap(),
122 }
123 }
124
125 pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) {
126 (Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache))
127 }
128
129 pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
130 let mut mst = repo.tree();
131
132 let did_inode = {
133 let mut inodes = self.inodes.lock().unwrap();
134 let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone()));
135 did_inode
136 };
137
138 let mut keys = Box::pin(mst.keys());
139 while let Some(Ok(key)) = keys.next().await {
140 if let Some((collection_name, rkey)) = key.split_once("/") {
141 let mut inodes = self.inodes.lock().unwrap();
142 let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection {
143 parent: did_inode,
144 nsid: collection_name.to_owned(),
145 }));
146
147 inodes.insert(PdsFsEntry::Record(PdsFsRecord {
148 parent: collection_inode,
149 rkey: rkey.to_owned(),
150 }));
151 }
152 }
153
154 drop(keys);
155 drop(mst);
156
157 self.repos.lock().unwrap().insert(did, repo);
158 }
159
160 fn attr(&mut self, ino: u64) -> fuser::FileAttr {
161 let inodes = self.inodes.lock().unwrap();
162 match inodes.get_index(ino as usize) {
163 Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
164 Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
165 ino,
166 size: 0,
167 blocks: 0,
168 atime: time::UNIX_EPOCH,
169 mtime: time::UNIX_EPOCH,
170 ctime: time::UNIX_EPOCH,
171 crtime: time::UNIX_EPOCH,
172 kind: fuser::FileType::Directory,
173 perm: 0o755,
174 nlink: 2,
175 uid: 1000,
176 gid: 1000,
177 rdev: 0,
178 flags: 0,
179 blksize: BLKSIZE,
180 },
181 Some(PdsFsEntry::Did(_)) => fuser::FileAttr {
182 ino,
183 size: 0,
184 blocks: 0,
185 atime: time::UNIX_EPOCH,
186 mtime: time::UNIX_EPOCH,
187 ctime: time::UNIX_EPOCH,
188 crtime: time::UNIX_EPOCH,
189 kind: fuser::FileType::Directory,
190 perm: 0o755,
191 nlink: 2,
192 uid: 1000,
193 gid: 1000,
194 rdev: 0,
195 flags: 0,
196 blksize: BLKSIZE,
197 },
198 Some(PdsFsEntry::Record(r)) => {
199 let col = inodes[r.parent].unwrap_collection();
200 let did = inodes[col.parent].unwrap_did().clone();
201 let rkey = r.rkey.clone();
202 let collection_nsid = col.nsid.clone();
203 drop(inodes);
204
205 // Check cache first
206 let size = {
207 let sizes = self.sizes.lock().unwrap();
208 if let Some(&cached_size) = sizes.get(&(ino as usize)) {
209 cached_size
210 } else {
211 drop(sizes);
212 // Not in cache, try to fetch from repo
213 let mut repos = self.repos.lock().unwrap();
214 let repo = &mut repos[&did];
215 let key = format!("{}/{}", collection_nsid, rkey);
216 let size = self
217 .rt
218 .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
219 .ok()
220 .flatten()
221 .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len())
222 as u64;
223 // Cache it for next time
224 self.sizes.lock().unwrap().insert(ino as usize, size);
225 size
226 }
227 };
228 let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64;
229
230 // Decode TID to get creation timestamp
231 let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH);
232
233 fuser::FileAttr {
234 ino,
235 size,
236 blocks,
237 atime: timestamp,
238 mtime: timestamp,
239 ctime: timestamp,
240 crtime: timestamp,
241 kind: fuser::FileType::RegularFile,
242 perm: 0o644,
243 nlink: 1,
244 uid: 501,
245 gid: 20,
246 rdev: 0,
247 flags: 0,
248 blksize: BLKSIZE,
249 }
250 }
251 _ => panic!("zero"),
252 }
253 }
254}
255
256impl<R> fuser::Filesystem for PdsFs<R>
257where
258 R: AsyncBlockStoreRead,
259{
260 fn getattr(
261 &mut self,
262 _req: &fuser::Request,
263 ino: u64,
264 _fh: Option<u64>,
265 reply: fuser::ReplyAttr,
266 ) {
267 let len = self.inodes.lock().unwrap().len();
268 if (ino as usize) < len {
269 reply.attr(&TTL, &self.attr(ino as u64))
270 } else {
271 reply.error(libc::ENOENT)
272 }
273 }
274
275 fn readdir(
276 &mut self,
277 _req: &fuser::Request,
278 ino: u64,
279 _fh: u64,
280 offset: i64,
281 mut reply: fuser::ReplyDirectory,
282 ) {
283 let inodes = self.inodes.lock().unwrap();
284 match inodes.get_index(ino as usize) {
285 Some(PdsFsEntry::Root) => {
286 let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
287 .into_iter()
288 .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
289 if let PdsFsEntry::Did(did) = e {
290 Some((i as u64, did.clone()))
291 } else {
292 None
293 }
294 }))
295 .collect();
296 drop(inodes);
297
298 for (index, (inode_num, name)) in
299 entries.into_iter().enumerate().skip(offset as usize)
300 {
301 if reply.add(
302 inode_num,
303 (index + 1) as i64,
304 fuser::FileType::Directory,
305 name,
306 ) {
307 break;
308 }
309 }
310 reply.ok()
311 }
312 Some(PdsFsEntry::Did(_)) => {
313 let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())]
314 .into_iter()
315 .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
316 if let PdsFsEntry::Collection(col) = e {
317 if col.parent == ino as usize {
318 Some((i as u64, col.nsid.clone()))
319 } else {
320 None
321 }
322 } else {
323 None
324 }
325 }))
326 .collect();
327 drop(inodes);
328
329 for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
330 let full = reply.add(
331 inode_num,
332 (index + 1) as i64,
333 if name.starts_with('.') {
334 fuser::FileType::Directory
335 } else {
336 fuser::FileType::RegularFile
337 },
338 name,
339 );
340 if full {
341 break;
342 }
343 }
344
345 reply.ok();
346 }
347 Some(PdsFsEntry::Collection(c)) => {
348 let parent_ino = c.parent;
349 let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())]
350 .into_iter()
351 .chain(inodes.iter().enumerate().filter_map(|(i, e)| {
352 if let PdsFsEntry::Record(record) = e {
353 if record.parent == ino as usize {
354 Some((i as u64, format!("{}.json", record.rkey)))
355 } else {
356 None
357 }
358 } else {
359 None
360 }
361 }))
362 .collect();
363 drop(inodes);
364
365 for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
366 let full = reply.add(
367 inode_num,
368 (index + 1) as i64,
369 if name.starts_with('.') {
370 fuser::FileType::Directory
371 } else {
372 fuser::FileType::RegularFile
373 },
374 name,
375 );
376 if full {
377 break;
378 }
379 }
380
381 reply.ok()
382 }
383 _ => {
384 drop(inodes);
385 reply.error(libc::ENOENT)
386 }
387 }
388 }
389
390 fn lookup(
391 &mut self,
392 _req: &fuser::Request,
393 parent: u64,
394 name: &std::ffi::OsStr,
395 reply: fuser::ReplyEntry,
396 ) {
397 let inodes = self.inodes.lock().unwrap();
398 match inodes.get_index(parent as usize) {
399 Some(PdsFsEntry::Root) => {
400 let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
401 if let Some(ino) = inodes.get_index_of(&did) {
402 drop(inodes);
403 reply.entry(&TTL, &self.attr(ino as u64), 0);
404 } else {
405 drop(inodes);
406 reply.error(libc::ENOENT)
407 }
408 }
409 Some(PdsFsEntry::Did(_)) => {
410 let col = PdsFsEntry::Collection(PdsFsCollection {
411 parent: parent as usize,
412 nsid: name.to_string_lossy().to_string(),
413 });
414 if let Some(ino) = inodes.get_index_of(&col) {
415 drop(inodes);
416 reply.entry(&TTL, &self.attr(ino as u64), 0);
417 } else {
418 drop(inodes);
419 reply.error(libc::ENOENT)
420 }
421 }
422 Some(PdsFsEntry::Collection(_)) => {
423 let name_str = name.to_string_lossy();
424 let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string();
425 let record = PdsFsEntry::Record(PdsFsRecord {
426 parent: parent as usize,
427 rkey,
428 });
429 if let Some(ino) = inodes.get_index_of(&record) {
430 drop(inodes);
431 reply.entry(&TTL, &self.attr(ino as u64), 0);
432 } else {
433 drop(inodes);
434 reply.error(libc::ENOENT)
435 }
436 }
437 _ => {
438 drop(inodes);
439 reply.error(libc::ENOENT)
440 }
441 }
442 }
443
444 fn read(
445 &mut self,
446 _req: &fuser::Request,
447 ino: u64,
448 _fh: u64,
449 offset: i64,
450 _size: u32,
451 _flags: i32,
452 _lock: Option<u64>,
453 reply: fuser::ReplyData,
454 ) {
455 let inodes = self.inodes.lock().unwrap();
456 if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) {
457 let col = inodes[r.parent].unwrap_collection();
458 let did = inodes[col.parent].unwrap_did().clone();
459 let key = format!("{}/{}", col.nsid, r.rkey);
460 let cache_key = format!("{}/{}", did, key);
461 drop(inodes);
462
463 // Check content cache first (for new records from firehose)
464 {
465 let cache = self.content_cache.lock().unwrap();
466 if let Some(content) = cache.get(&cache_key) {
467 reply.data(&content.as_bytes()[offset as usize..]);
468 return;
469 }
470 }
471
472 // Fall back to repo
473 let mut repos = self.repos.lock().unwrap();
474 let repo = &mut repos[&did];
475 if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
476 reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
477 return;
478 }
479 } else {
480 drop(inodes);
481 }
482 reply.error(libc::ENOENT);
483 }
484}