1use std::{
2 collections::{BTreeMap, hash_set::Iter},
3 time,
4};
5
6use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
7use futures::StreamExt;
8use indexmap::{IndexMap, IndexSet};
9
/// Inode number as used internally: the position of an entry inside `PdsFs::inodes`.
type Inode = usize;
11
12pub struct PdsFs<R> {
13 repos: IndexMap<String, Repository<R>>,
14 inodes: IndexSet<PdsFsEntry>,
15}
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
18pub enum PdsFsEntry {
19 Zero,
20 Root,
21 Did(String),
22 Collection(PdsFsCollection),
23 Record(PdsFsRecord),
24}
25
26impl PdsFsEntry {
27 fn as_collection(&self) -> Option<&PdsFsCollection> {
28 match &self {
29 Self::Collection(c) => Some(c),
30 _ => None,
31 }
32 }
33
34 fn as_did(&self) -> Option<&String> {
35 match &self {
36 Self::Did(d) => Some(d),
37 _ => None,
38 }
39 }
40
41 fn unwrap_collection(&self) -> &PdsFsCollection {
42 self.as_collection().unwrap()
43 }
44
45 fn unwrap_did(&self) -> &String {
46 self.as_did().unwrap()
47 }
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Hash)]
51pub struct PdsFsCollection {
52 parent: Inode,
53 nsid: String,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Hash)]
57pub struct PdsFsRecord {
58 parent: Inode,
59 rkey: String,
60}
61
62// impl PdsFsRecord {
63// fn key(&self) -> String {
64// format!("{}/{}", self.collection, self.rkey)
65// }
66// }
67
/// Validity period handed to the kernel with every attribute and entry reply (five minutes).
const TTL: time::Duration = time::Duration::from_secs(5 * 60);
/// Block size reported in file attributes and used to derive block counts.
const BLKSIZE: u32 = 512;
70
71const ROOTDIR_ATTR: fuser::FileAttr = fuser::FileAttr {
72 ino: 1,
73 size: 0,
74 blocks: 0,
75 atime: time::UNIX_EPOCH,
76 mtime: time::UNIX_EPOCH,
77 ctime: time::UNIX_EPOCH,
78 crtime: time::UNIX_EPOCH,
79 kind: fuser::FileType::Directory,
80 perm: 0o755,
81 nlink: 2,
82 uid: 501,
83 gid: 20,
84 rdev: 0,
85 flags: 0,
86 blksize: BLKSIZE,
87};
88
89impl<R> PdsFs<R>
90where
91 R: AsyncBlockStoreRead,
92{
93 pub fn new() -> Self {
94 PdsFs {
95 repos: Default::default(),
96 inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]),
97 }
98 }
99
100 pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
101 let mut mst = repo.tree();
102
103 let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone()));
104
105 let mut keys = Box::pin(mst.keys());
106 while let Some(Ok(key)) = keys.next().await {
107 if let Some((collection_name, rkey)) = key.split_once("/") {
108 let (collection_inode, _) =
109 self.inodes
110 .insert_full(PdsFsEntry::Collection(PdsFsCollection {
111 parent: did_inode,
112 nsid: collection_name.to_owned(),
113 }));
114
115 self.inodes.insert(PdsFsEntry::Record(PdsFsRecord {
116 parent: collection_inode,
117 rkey: rkey.to_owned(),
118 }));
119 }
120 }
121
122 drop(keys);
123 drop(mst);
124
125 self.repos.insert(did, repo);
126 }
127
128 fn attr(&mut self, ino: u64) -> fuser::FileAttr {
129 match self.inodes.get_index(ino as usize) {
130 Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
131 Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
132 ino,
133 size: 0,
134 blocks: 0,
135 atime: time::UNIX_EPOCH,
136 mtime: time::UNIX_EPOCH,
137 ctime: time::UNIX_EPOCH,
138 crtime: time::UNIX_EPOCH,
139 kind: fuser::FileType::Directory,
140 perm: 0o755,
141 nlink: 2,
142 uid: 1000,
143 gid: 1000,
144 rdev: 0,
145 flags: 0,
146 blksize: BLKSIZE,
147 },
148 Some(PdsFsEntry::Did(_)) => fuser::FileAttr {
149 ino,
150 size: 0,
151 blocks: 0,
152 atime: time::UNIX_EPOCH,
153 mtime: time::UNIX_EPOCH,
154 ctime: time::UNIX_EPOCH,
155 crtime: time::UNIX_EPOCH,
156 kind: fuser::FileType::Directory,
157 perm: 0o755,
158 nlink: 2,
159 uid: 1000,
160 gid: 1000,
161 rdev: 0,
162 flags: 0,
163 blksize: BLKSIZE,
164 },
165 Some(PdsFsEntry::Record(r)) => {
166 let col = self.inodes[r.parent].unwrap_collection();
167 let did = self.inodes[col.parent].unwrap_did();
168 let repo = &mut self.repos[did];
169 let key = format!("{}/{}", col.nsid, r.rkey);
170 let rt = tokio::runtime::Runtime::new().unwrap();
171 let size = rt
172 .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
173 .ok()
174 .flatten()
175 .map_or(0, |v| serde_json::to_string(&v).unwrap().len())
176 as u64;
177 let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64;
178 fuser::FileAttr {
179 ino,
180 size,
181 blocks,
182 atime: time::UNIX_EPOCH,
183 mtime: time::UNIX_EPOCH,
184 ctime: time::UNIX_EPOCH,
185 crtime: time::UNIX_EPOCH,
186 kind: fuser::FileType::RegularFile,
187 perm: 0o644,
188 nlink: 1,
189 uid: 501,
190 gid: 20,
191 rdev: 0,
192 flags: 0,
193 blksize: 512,
194 }
195 }
196 _ => panic!("zero"),
197 }
198 }
199}
200
201impl<R> fuser::Filesystem for PdsFs<R>
202where
203 R: AsyncBlockStoreRead,
204{
205 fn getattr(
206 &mut self,
207 _req: &fuser::Request,
208 ino: u64,
209 _fh: Option<u64>,
210 reply: fuser::ReplyAttr,
211 ) {
212 if (ino as usize) < self.inodes.len() {
213 reply.attr(&TTL, &self.attr(ino as u64))
214 } else {
215 reply.error(libc::ENOENT)
216 }
217 }
218
219 fn readdir(
220 &mut self,
221 _req: &fuser::Request,
222 ino: u64,
223 _fh: u64,
224 offset: i64,
225 mut reply: fuser::ReplyDirectory,
226 ) {
227 match self.inodes.get_index(ino as usize) {
228 Some(PdsFsEntry::Root) => {
229 let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
230 .into_iter()
231 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
232 if let PdsFsEntry::Did(did) = e {
233 Some((i as u64, did.clone()))
234 } else {
235 None
236 }
237 }))
238 .collect();
239
240 for (index, (inode_num, name)) in
241 entries.into_iter().enumerate().skip(offset as usize)
242 {
243 if reply.add(
244 inode_num,
245 (index + 1) as i64,
246 fuser::FileType::Directory,
247 name,
248 ) {
249 break;
250 }
251 }
252 reply.ok()
253 }
254 Some(PdsFsEntry::Did(_)) => {
255 let entries = vec![(ino, ".".to_string()), (1, "..".to_string())]
256 .into_iter()
257 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
258 if let PdsFsEntry::Collection(col) = e {
259 if col.parent == ino as usize {
260 Some((i as u64, col.nsid.clone()))
261 } else {
262 None
263 }
264 } else {
265 None
266 }
267 }))
268 .into_iter()
269 .enumerate()
270 .skip(offset as usize);
271
272 for (index, (inode_num, name)) in entries {
273 let full = reply.add(
274 inode_num,
275 (index + 1) as i64,
276 if name.starts_with('.') {
277 fuser::FileType::Directory
278 } else {
279 fuser::FileType::RegularFile
280 },
281 name,
282 );
283 if full {
284 break;
285 }
286 }
287
288 reply.ok();
289 }
290 Some(PdsFsEntry::Collection(c)) => {
291 let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())]
292 .into_iter()
293 .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
294 if let PdsFsEntry::Record(record) = e {
295 if record.parent == ino as usize {
296 Some((i as u64, record.rkey.clone()))
297 } else {
298 None
299 }
300 } else {
301 None
302 }
303 }))
304 .into_iter()
305 .enumerate()
306 .skip(offset as usize);
307
308 for (index, (inode_num, name)) in entries {
309 let full = reply.add(
310 inode_num,
311 (index + 1) as i64,
312 if name.starts_with('.') {
313 fuser::FileType::Directory
314 } else {
315 fuser::FileType::RegularFile
316 },
317 name,
318 );
319 if full {
320 break;
321 }
322 }
323
324 reply.ok()
325 }
326 _ => reply.error(libc::ENOENT),
327 }
328 }
329
330 fn lookup(
331 &mut self,
332 _req: &fuser::Request,
333 parent: u64,
334 name: &std::ffi::OsStr,
335 reply: fuser::ReplyEntry,
336 ) {
337 match self.inodes.get_index(parent as usize) {
338 Some(PdsFsEntry::Root) => {
339 let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
340 if let Some(ino) = self.inodes.get_index_of(&did) {
341 reply.entry(&TTL, &self.attr(ino as u64), 0);
342 } else {
343 reply.error(libc::ENOENT)
344 }
345 }
346 Some(PdsFsEntry::Did(_)) => {
347 let col = PdsFsEntry::Collection(PdsFsCollection {
348 parent: parent as usize,
349 nsid: name.to_string_lossy().to_string(),
350 });
351 if let Some(ino) = self.inodes.get_index_of(&col) {
352 reply.entry(&TTL, &self.attr(ino as u64), 0);
353 } else {
354 reply.error(libc::ENOENT)
355 }
356 }
357 Some(PdsFsEntry::Collection(_)) => {
358 let record = PdsFsEntry::Record(PdsFsRecord {
359 parent: parent as usize,
360 rkey: name.to_string_lossy().to_string(),
361 });
362 if let Some(ino) = self.inodes.get_index_of(&record) {
363 reply.entry(&TTL, &self.attr(ino as u64), 0);
364 } else {
365 reply.error(libc::ENOENT)
366 }
367 }
368 _ => reply.error(libc::ENOENT),
369 }
370 }
371
372 fn read(
373 &mut self,
374 _req: &fuser::Request,
375 ino: u64,
376 _fh: u64,
377 offset: i64,
378 _size: u32,
379 _flags: i32,
380 _lock: Option<u64>,
381 reply: fuser::ReplyData,
382 ) {
383 let rt = tokio::runtime::Runtime::new().unwrap();
384 if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) {
385 let col = self.inodes[r.parent].unwrap_collection();
386 let did = self.inodes[col.parent].unwrap_did();
387 let repo = &mut self.repos[did];
388 let key = format!("{}/{}", col.nsid, r.rkey);
389
390 if let Ok(Some(val)) = rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
391 reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
392 return;
393 }
394 }
395 reply.error(libc::ENOENT);
396 }
397}