1use anyhow::{anyhow, Result};
2use jacquard::{
3 api::com_atproto::repo::{describe_repo::DescribeRepo, list_records::ListRecords},
4 client::{credential_session::CredentialSession, Agent, BasicClient, MemorySessionStore},
5 identity::{resolver::IdentityResolver, slingshot_resolver_default},
6 prelude::*,
7 types::{did::Did, nsid::Nsid, string::Handle},
8};
9use scc::{HashMap, HashSet};
10use url::Url;
11
12use std::{
13 collections::HashMap as StdHashMap,
14 fmt::Debug,
15 io::{self, ErrorKind},
16 sync::Arc,
17};
18
19pub mod cli;
20#[cfg(target_os = "linux")]
21pub mod fuse;
22
23pub async fn resolve_did(identifier: &str) -> Result<Did<'static>> {
24 if let Ok(did) = Did::new_owned(identifier) {
25 return Ok(did);
26 }
27
28 let handle = Handle::new(identifier).map_err(|e| anyhow!("invalid handle: {}", e))?;
29 let did = slingshot_resolver_default()
30 .resolve_handle(&handle)
31 .await
32 .map_err(|e| anyhow!("resolution failed: {}", e))?;
33
34 Ok(did)
35}
36
37pub async fn resolve_pds(did: &Did<'_>) -> Result<Url> {
38 slingshot_resolver_default()
39 .resolve_did_doc(did)
40 .await?
41 .parse()?
42 .pds_endpoint()
43 .ok_or_else(|| anyhow!("no pds endpoint in did doc"))
44}
45
/// Kind of entry reported in [`Metadata`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// A regular file.
    File,
    /// A directory.
    Directory,
}
51
52#[derive(Debug, Clone)]
53pub struct Metadata {
54 pub file_type: FileType,
55 pub len: u64,
56}
57
/// One cached page of a collection listing.
#[derive(Debug)]
struct CachedPage {
    /// File name (`<rkey>.json`) mapped to the file's contents.
    files: StdHashMap<String, Arc<Vec<u8>>>,
    /// Cursor for the page that follows this one, if the server reported one.
    next_cursor: Option<String>,
}
63
64pub struct AtpFS {
65 did: Did<'static>,
66 client: BasicClient,
67 cache: HashMap<String, Arc<CachedPage>>,
68 root_cache: HashSet<String>,
69}
70
71impl Debug for AtpFS {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("AtpFS").field("did", &self.did).finish()
74 }
75}
76
77impl AtpFS {
78 pub async fn new(did: Did<'static>, pds: Url) -> Self {
79 let store = MemorySessionStore::default();
80 let session =
81 CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default()));
82
83 session.set_endpoint(pds).await;
84
85 Self {
86 did,
87 client: Agent::new(session),
88 cache: HashMap::default(),
89 root_cache: HashSet::default(),
90 }
91 }
92
93 fn segments<'a, 's>(&'s self, path: &'a str) -> Vec<&'a str> {
94 path.trim_matches('/')
95 .split('/')
96 .filter(|s| !s.is_empty())
97 .collect()
98 }
99
100 fn dir_metadata() -> Metadata {
101 Metadata {
102 file_type: FileType::Directory,
103 len: 0,
104 }
105 }
106
107 fn file_metadata(len: u64) -> Metadata {
108 Metadata {
109 file_type: FileType::File,
110 len,
111 }
112 }
113
114 async fn ensure_root_loaded(&self) -> io::Result<()> {
115 if self.root_cache.is_empty() {
116 let request = DescribeRepo::new().repo(self.did.clone()).build();
117
118 let response = self
119 .client
120 .send(request)
121 .await
122 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
123
124 let output = response
125 .into_output()
126 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
127
128 for col in output.collections {
129 let _ = self.root_cache.insert_async(col.to_string()).await;
130 }
131 }
132 return Ok(());
133 }
134
135 async fn ensure_loaded(&self, path: &str) -> io::Result<String> {
136 let segs = self.segments(path);
137
138 if segs.is_empty() {
139 self.ensure_root_loaded().await?;
140 return Ok("".to_string());
141 }
142
143 let collection = segs[0];
144 if self.root_cache.is_empty() {
145 self.ensure_root_loaded().await?;
146 }
147
148 if !self.root_cache.contains(collection) {
149 return Err(ErrorKind::NotFound.into());
150 }
151
152 let mut current_key = collection.to_string();
153 let mut parent_cursor: Option<String> = None;
154
155 for (_i, segment) in segs.iter().enumerate().skip(1) {
156 if *segment == "next" {
157 self.fetch_page_if_missing(¤t_key, parent_cursor.clone())
158 .await?;
159 let next_cursor_val = self.cache.read(¤t_key, |_, v| v.next_cursor.clone());
160
161 if let Some(Some(cursor)) = next_cursor_val {
162 parent_cursor = Some(cursor);
163 current_key = format!("{}/next", current_key);
164 } else {
165 return Err(ErrorKind::NotFound.into());
166 }
167 } else if segment.ends_with(".json") {
168 break;
169 } else {
170 return Err(ErrorKind::NotFound.into());
171 }
172 }
173
174 self.fetch_page_if_missing(¤t_key, parent_cursor)
175 .await?;
176 Ok(current_key)
177 }
178
179 async fn fetch_page_if_missing(&self, key: &str, cursor: Option<String>) -> io::Result<()> {
180 if self.cache.contains(key) {
181 return Ok(());
182 }
183
184 let parts: Vec<&str> = key.split('/').collect();
185 let collection = parts[0];
186
187 let request = ListRecords::new()
188 .repo(self.did.clone())
189 .collection(Nsid::new(collection).expect("nsid should be valid"))
190 .limit(100)
191 .cursor(cursor.map(Into::into));
192
193 let response = self
194 .client
195 .send(request.build())
196 .await
197 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
198
199 let output = response
200 .into_output()
201 .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;
202
203 let mut files = StdHashMap::new();
204 for rec in output.records {
205 if let Some(rkey) = rec.uri.rkey() {
206 let filename = format!("{}.json", rkey.0);
207 let content = serde_json::to_vec_pretty(&rec.value).unwrap_or_default();
208 files.insert(filename, Arc::new(content));
209 }
210 }
211
212 let _ = self
213 .cache
214 .insert_async(
215 key.to_string(),
216 Arc::new(CachedPage {
217 files,
218 next_cursor: output.cursor.map(Into::into),
219 }),
220 )
221 .await;
222
223 Ok(())
224 }
225
226 pub async fn read_dir(&self, path: &str) -> io::Result<Vec<String>> {
227 let segs = self.segments(path);
228
229 if segs.is_empty() {
230 self.ensure_root_loaded().await?;
231 let mut keys = Vec::new();
232 self.root_cache.scan(|k| keys.push(k.clone()));
233 return Ok(keys);
234 }
235
236 let cache_key = self.ensure_loaded(path).await?;
237
238 if path.ends_with(".json") {
239 return Err(io::Error::new(ErrorKind::Other, "not a directory"));
240 }
241
242 let page = self
243 .cache
244 .read(&cache_key, |_, v| v.clone())
245 .ok_or(ErrorKind::NotFound)?;
246
247 let mut entries: Vec<String> = page.files.keys().cloned().collect();
248 if page.next_cursor.is_some() {
249 entries.push("next".to_string());
250 }
251
252 Ok(entries)
253 }
254
255 pub async fn open_file(&self, path: &str) -> io::Result<Arc<Vec<u8>>> {
256 let parent_path = std::path::Path::new(path)
257 .parent()
258 .unwrap_or(std::path::Path::new(""))
259 .to_str()
260 .unwrap();
261 let cache_key = self.ensure_loaded(parent_path).await?;
262 let filename = path.split('/').last().ok_or(ErrorKind::NotFound)?;
263
264 let content = self
265 .cache
266 .read(&cache_key, |_, page| page.files.get(filename).cloned())
267 .flatten();
268
269 content.ok_or(ErrorKind::NotFound.into())
270 }
271
272 pub async fn metadata(&self, path: &str) -> io::Result<Metadata> {
273 let segs = self.segments(path);
274 if segs.is_empty() {
275 return Ok(Self::dir_metadata());
276 }
277
278 if segs.len() == 1 {
279 self.ensure_root_loaded().await?;
280 if self.root_cache.contains(segs[0]) {
281 return Ok(Self::dir_metadata());
282 } else {
283 return Err(ErrorKind::NotFound.into());
284 }
285 }
286
287 if let Some(last) = segs.last() {
288 if *last == "next" {
289 let parent = &path[0..path.len() - 5];
290 let cache_key = self.ensure_loaded(parent).await?;
291 let has_next = self
292 .cache
293 .read(&cache_key, |_, v| v.next_cursor.is_some())
294 .unwrap_or(false);
295 if has_next {
296 return Ok(Self::dir_metadata());
297 }
298 return Err(ErrorKind::NotFound.into());
299 }
300 }
301
302 if path.ends_with(".json") {
303 let parent_path = std::path::Path::new(path)
304 .parent()
305 .unwrap()
306 .to_str()
307 .unwrap();
308 let cache_key = self.ensure_loaded(parent_path).await?;
309 let filename = segs.last().unwrap();
310
311 let len = self
312 .cache
313 .read(&cache_key, |_, page| {
314 page.files.get(*filename).map(|f| f.len())
315 })
316 .flatten();
317
318 if let Some(l) = len {
319 return Ok(Self::file_metadata(l as u64));
320 }
321 return Err(ErrorKind::NotFound.into());
322 }
323
324 Err(ErrorKind::NotFound.into())
325 }
326
327 pub async fn exists(&self, path: &str) -> io::Result<bool> {
328 Ok(self.metadata(path).await.is_ok())
329 }
330}