use crate::error::to_shell_err;
use crate::globals::{get_pwd, print_to_console, register_task, remove_task};
use anyhow::{Result, anyhow};
use futures::future::{AbortHandle, Abortable};
use jacquard::types::aturi::AtUri;
use jacquard::types::ident::AtIdentifier;
use jacquard::types::string::Rkey;
use jacquard::{
    api::com_atproto::{
        repo::{get_record::GetRecord, list_records::ListRecords},
        sync::get_repo::GetRepo,
    },
    client::{Agent, MemorySessionStore, credential_session::CredentialSession},
    identity::{resolver::IdentityResolver, slingshot_resolver_default},
    prelude::*,
    types::{did::Did, nsid::Nsid},
};
use jacquard_repo::{
    Repository,
    car::parse_car_bytes,
    storage::{BlockStore, MemoryBlockStore},
};
use nu_engine::CallExt;
use nu_protocol::{
    Category, PipelineData, ShellError, Signature, SyntaxShape, Value,
    engine::{Command, EngineState, Stack},
};
use nu_protocol::{IntoPipelineData, Type};
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;
use wasm_bindgen_futures::spawn_local;

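/// `fetch`: download a file over HTTP, or pull records from an AT Protocol
/// repository (full repo as CAR, a single collection, or a single record) and
/// write the results under the current directory or the `--output` path.
///
/// Example invocations (targets are hypothetical):
///
/// ```text
/// fetch https://example.com/data.json
/// fetch at://alice.example.com                        # whole repo, one JSON file per record
/// fetch at://alice.example.com/app.bsky.feed.post     # one collection
/// fetch at://alice.example.com/app.bsky.feed.post/3abc -o post.json
/// ```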
#[derive(Clone)]
pub struct Fetch;

impl Command for Fetch {
    fn name(&self) -> &str {
        "fetch"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .required(
                "uri",
                SyntaxShape::String,
                "HTTP URI or AT URI (at://identifier[/collection[/rkey]])",
            )
            .named("output", SyntaxShape::Filepath, "output path", Some('o'))
            .input_output_type(Type::Nothing, Type::Nothing)
            .category(Category::Network)
    }

    fn description(&self) -> &str {
        "fetch files using HTTP or fetch records from an AT Protocol repository.\nsupports full repo (CAR), collection, or a single record."
    }

    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &nu_protocol::engine::Call,
        _input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let uri_arg: String = call.req(engine_state, stack, 0)?;
        let output_arg: Option<String> = call.get_flag(engine_state, stack, "output")?;
        let output_span = call.get_flag_span(stack, "output");

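        // parse the argument as an AT URI when it starts with `at://`,
        // otherwise fall back to parsing it as a regular URL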
        let at_uri = uri_arg
            .starts_with("at://")
            .then(|| {
                AtUri::from_str(&uri_arg).map_err(|err| ShellError::GenericError {
                    error: "invalid AT URI format".into(),
                    msg: err.to_string(),
                    span: Some(call.arguments_span()),
                    help: None,
                    inner: vec![],
                })
            })
            .transpose()?;
        let uri = at_uri
            .is_none()
            .then(|| {
                Url::from_str(&uri_arg).map_err(|err| ShellError::GenericError {
                    error: "invalid URI format".into(),
                    msg: err.to_string(),
                    span: Some(call.arguments_span()),
                    help: None,
                    inner: vec![],
                })
            })
            .transpose()?;

        // Determine target directory
        let pwd = get_pwd();
        let base_dir = if let Some(out) = output_arg.as_ref() {
            pwd.join(out.trim_end_matches('/'))
                .map_err(to_shell_err(output_span.unwrap()))?
        } else {
            pwd.join("").unwrap()
        };

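        // work out the final path: a directory for a full repo or a collection,
        // a single file for an HTTP download or a record with an rkey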
        let final_path = if let Some(at_uri) = at_uri.as_ref() {
            match (at_uri.collection(), at_uri.rkey()) {
                (None, _) => base_dir
                    .join(at_uri.authority())
                    .map_err(to_shell_err(call.span()))?,
                (Some(collection), None) => base_dir
                    .join(collection)
                    .map_err(to_shell_err(call.span()))?,
                // use rkey as file name
                (Some(_), Some(rkey)) => output_arg
                    .is_some()
                    .then(|| Ok(base_dir.clone()))
                    .unwrap_or_else(|| base_dir.join(&rkey.0).map_err(to_shell_err(call.span())))?,
            }
        } else if let Some(uri) = uri.as_ref() {
            // choose file name from uri
            output_arg
                .is_some()
                .then(|| Ok(base_dir.clone()))
                .unwrap_or_else(|| {
                    base_dir
                        .join(
                            uri.path()
                                .split('/')
                                .rfind(|a| !a.is_empty())
                                .unwrap_or("index"),
                        )
                        .map_err(to_shell_err(call.span()))
                })?
        } else {
            // todo: make these into an enum so we don't need this
            unreachable!("either at_uri or uri should be set")
        };

        if !final_path.exists().map_err(to_shell_err(call.span()))? {
            // if we have an http uri or an at uri with an rkey, we'll write a single
            // file, so only create the parent dir; otherwise create this path as a directory
            if at_uri.as_ref().map_or(true, |at| at.rkey().is_some()) {
                final_path
                    .parent()
                    .create_dir_all()
                    .map_err(to_shell_err(call.span()))?;
            } else {
                final_path
                    .create_dir_all()
                    .map_err(to_shell_err(call.span()))?;
            }
        }

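        // run the download as an abortable background task on the local (wasm)
        // executor; the registered abort handle lets the task list cancel it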
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let task_desc = format!("{name} {uri_arg}", name = self.name());
        let task_id = register_task(task_desc.clone(), abort_handle);

        spawn_local(async move {
            let task_future = async {
                if let Some(at_uri) = at_uri {
                    let identifier = at_uri.authority().clone();
                    match (at_uri.collection(), at_uri.rkey()) {
                        (Some(coll), Some(key)) => {
                            fetch_record(identifier, coll.clone(), key.0.clone(), final_path).await
                        }
                        (Some(coll), None) => {
                            fetch_collection(identifier, coll.clone(), final_path).await
                        }
                        _ => fetch_repo(identifier, final_path).await,
                    }
                } else if let Some(uri) = uri {
                    fetch_file(uri, final_path).await
                } else {
                    Ok(())
                }
            };

            let abortable = Abortable::new(task_future, abort_registration);

            match abortable.await {
                Ok(result) => {
                    remove_task(task_id);
                    match result {
                        Ok(_) => {}
                        Err(e) => {
                            let _ = print_to_console(
                                &format!("\x1b[31m✖\x1b[0m ({task_desc}) error: {e}"),
                                false,
                            );
                        }
                    }
                }
                Err(_) => {
                    remove_task(task_id);
                }
            }
        });

        Ok(Value::nothing(call.head).into_pipeline_data())
    }
}

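/// Download `uri` over HTTP and write the response body to `target_path`.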
async fn fetch_file(uri: Url, target_path: vfs::VfsPath) -> Result<()> {
    let response = reqwest::get(uri).await?;
    let content = response.bytes().await?;
    target_path.create_file()?.write_all(&content)?;
    Ok(())
}

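/// Resolve an `AtIdentifier` to a DID, resolving handles through the default
/// slingshot resolver.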
async fn resolve_did(identifier: AtIdentifier<'_>) -> Result<Did<'_>> {
    match identifier {
        AtIdentifier::Did(did) => Ok(did),
        AtIdentifier::Handle(handle) => {
            let did = slingshot_resolver_default().resolve_handle(&handle).await?;
            Ok(did)
        }
    }
}

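/// Resolve the PDS endpoint for `did` from its DID document.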
async fn resolve_pds(did: &Did<'_>) -> Result<Url> {
    slingshot_resolver_default()
        .resolve_did_doc(did)
        .await?
        .parse()?
        .pds_endpoint()
        .ok_or_else(|| anyhow!("no pds endpoint in did doc"))
}

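/// Build an agent pointed at the given PDS endpoint; no login is performed,
/// so requests are unauthenticated.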
async fn create_agent(pds: Url) -> BasicClient {
    let store = MemorySessionStore::default();
    let session = CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default()));
    session.set_endpoint(pds).await;
    Agent::new(session)
}

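/// Fetch a single record with `com.atproto.repo.getRecord` and write it to
/// `target_path` as pretty-printed JSON.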
async fn fetch_record(
    identifier: AtIdentifier<'_>,
    collection: Nsid<'_>,
    rkey: Rkey<'_>,
    target_path: vfs::VfsPath,
) -> anyhow::Result<()> {
    let did = resolve_did(identifier).await?;
    let pds = resolve_pds(&did).await?;
    let client = create_agent(pds).await;

    let nsid = Nsid::new(&collection)?;

    let request = GetRecord::new()
        .repo(did)
        .collection(nsid)
        .rkey(rkey.clone());

    let output = client.send(request.build()).await?.into_output()?;
    let file = target_path.create_file()?;
    serde_json::to_writer_pretty(file, &output.value)?;

    Ok(())
}

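/// Fetch all records in a collection with `com.atproto.repo.listRecords`,
/// following the pagination cursor, and write each record to
/// `target_path/<rkey>` as pretty-printed JSON.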
async fn fetch_collection(
    identifier: AtIdentifier<'_>,
    collection: Nsid<'_>,
    target_path: vfs::VfsPath,
) -> anyhow::Result<()> {
    let did = resolve_did(identifier).await?;
    let pds = resolve_pds(&did).await?;
    let client = create_agent(pds).await;

    let mut cursor: Option<String> = None;

    loop {
        let request = ListRecords::new()
            .repo(did.clone())
            .collection(collection.clone())
            .limit(100)
            .cursor(cursor.map(Into::into));

        let response = client.send(request.build()).await?;

        let output = response.into_output()?;

        for rec in output.records {
            if let Some(rkey) = rec.uri.rkey() {
                let file = target_path.join(&rkey.0)?.create_file()?;
                serde_json::to_writer_pretty(file, &rec.value)?;
            }
        }

        if let Some(c) = output.cursor {
            cursor = Some(c.into());
        } else {
            break;
        }
    }

    Ok(())
}

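/// Fetch the whole repository as a CAR with `com.atproto.sync.getRepo`, walk
/// the MST, and write every record to `target_path/<collection>/<rkey>` as
/// pretty-printed JSON.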
async fn fetch_repo(identifier: AtIdentifier<'_>, target_path: vfs::VfsPath) -> Result<()> {
    let did = resolve_did(identifier).await?;
    let pds = resolve_pds(&did).await?;
    let client = create_agent(pds).await;

    let response = client.send(GetRepo::new().did(did).build()).await?;

    // parse the car and create the repo
    let car_bytes = response.into_output()?.body;
    let parsed = parse_car_bytes(&car_bytes).await?;
    let storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
    let repo = Repository::from_commit(storage, &parsed.root).await?;

    // Iterate over all leaves in the MST:
    // repo.mst() gives us the Mst<S>, and leaves_sequential()
    // yields the (key, cid) pair for every record
    let leaves = repo.mst().leaves_sequential().await?;

    for (key, cid) in leaves {
        // key format: collection/rkey
        let key_str = key.as_str();

        if let Some((collection, rkey)) = key_str.split_once('/') {
            // Get the record block from storage.
            // The record data is a DAG-CBOR block; convert it to JSON for readability.
            if let Some(block_bytes) = repo.storage().get(&cid).await? {
                // Deserialize DAG-CBOR to a generic JSON value
                let json_val: jacquard::RawData = serde_ipld_dagcbor::from_slice(&block_bytes)
                    .map_err(|e| anyhow!("failed to deserialize record {}: {}", key, e))?;

                let coll_dir = target_path.join(collection)?;
                if !coll_dir.exists()? {
                    coll_dir.create_dir_all()?;
                }
                let file = coll_dir.join(rkey)?.create_file()?;

                serde_json::to_writer_pretty(file, &json_val)?;
            }
        }
    }

    Ok(())
}