nushell on your web browser
nushell wasm terminal
at main 12 kB view raw
1use crate::error::to_shell_err; 2use crate::globals::{get_pwd, print_to_console, register_task, remove_task}; 3use anyhow::{Result, anyhow}; 4use futures::future::{AbortHandle, Abortable}; 5use jacquard::types::aturi::AtUri; 6use jacquard::types::ident::AtIdentifier; 7use jacquard::types::string::Rkey; 8use jacquard::{ 9 api::com_atproto::{ 10 repo::{get_record::GetRecord, list_records::ListRecords}, 11 sync::get_repo::GetRepo, 12 }, 13 client::{Agent, MemorySessionStore, credential_session::CredentialSession}, 14 identity::{resolver::IdentityResolver, slingshot_resolver_default}, 15 prelude::*, 16 types::{did::Did, nsid::Nsid}, 17}; 18use jacquard_repo::{ 19 Repository, 20 car::parse_car_bytes, 21 storage::{BlockStore, MemoryBlockStore}, 22}; 23use nu_engine::CallExt; 24use nu_protocol::{ 25 Category, PipelineData, ShellError, Signature, SyntaxShape, Value, 26 engine::{Command, EngineState, Stack}, 27}; 28use nu_protocol::{IntoPipelineData, Type}; 29use std::io::Write; 30use std::str::FromStr; 31use std::sync::Arc; 32use url::Url; 33use wasm_bindgen_futures::spawn_local; 34 35#[derive(Clone)] 36pub struct Fetch; 37 38impl Command for Fetch { 39 fn name(&self) -> &str { 40 "fetch" 41 } 42 43 fn signature(&self) -> Signature { 44 Signature::build(self.name()) 45 .required( 46 "uri", 47 SyntaxShape::String, 48 "HTTP URI or AT URI (at://identifier[/collection[/rkey]])", 49 ) 50 .named("output", SyntaxShape::Filepath, "output path", Some('o')) 51 .input_output_type(Type::Nothing, Type::Nothing) 52 .category(Category::Network) 53 } 54 55 fn description(&self) -> &str { 56 "fetch files using HTTP or fetch records from an AT Protocol repository.\nsupports full repo (CAR), collection, or a single record." 57 } 58 59 fn run( 60 &self, 61 engine_state: &EngineState, 62 stack: &mut Stack, 63 call: &nu_protocol::engine::Call, 64 _input: PipelineData, 65 ) -> Result<PipelineData, ShellError> { 66 let uri_arg: String = call.req(engine_state, stack, 0)?; 67 let output_arg: Option<String> = call.get_flag(engine_state, stack, "output")?; 68 let output_span = call.get_flag_span(stack, "output"); 69 70 let at_uri = uri_arg 71 .starts_with("at://") 72 .then(|| { 73 AtUri::from_str(&uri_arg).map_err(|err| ShellError::GenericError { 74 error: "invalid AT URI format".into(), 75 msg: err.to_string(), 76 span: Some(call.arguments_span()), 77 help: None, 78 inner: vec![], 79 }) 80 }) 81 .transpose()?; 82 let uri = at_uri 83 .is_none() 84 .then(|| { 85 Url::from_str(&uri_arg).map_err(|err| ShellError::GenericError { 86 error: "invalid URI format".into(), 87 msg: err.to_string(), 88 span: Some(call.arguments_span()), 89 help: None, 90 inner: vec![], 91 }) 92 }) 93 .transpose()?; 94 95 // Determine target directory 96 let pwd = get_pwd(); 97 let base_dir = if let Some(out) = output_arg.as_ref() { 98 pwd.join(out.trim_end_matches('/')) 99 .map_err(to_shell_err(output_span.unwrap()))? 100 } else { 101 pwd.join("").unwrap() 102 }; 103 104 let final_path = if let Some(at_uri) = at_uri.as_ref() { 105 match (at_uri.collection(), at_uri.rkey()) { 106 (None, _) => base_dir 107 .join(at_uri.authority()) 108 .map_err(to_shell_err(call.span()))?, 109 (Some(collection), None) => base_dir 110 .join(collection) 111 .map_err(to_shell_err(call.span()))?, 112 // use rkey as file name 113 (Some(_), Some(rkey)) => output_arg 114 .is_some() 115 .then(|| Ok(base_dir.clone())) 116 .unwrap_or_else(|| base_dir.join(&rkey.0).map_err(to_shell_err(call.span())))?, 117 } 118 } else if let Some(uri) = uri.as_ref() { 119 // choose file name from uri 120 output_arg 121 .is_some() 122 .then(|| Ok(base_dir.clone())) 123 .unwrap_or_else(|| { 124 base_dir 125 .join( 126 uri.path() 127 .split('/') 128 .rfind(|a| !a.is_empty()) 129 .unwrap_or("index"), 130 ) 131 .map_err(to_shell_err(call.span())) 132 })? 133 } else { 134 // todo: make these into an enum so we dont need this 135 unreachable!("either of at_uri or uri should be set") 136 }; 137 138 if !final_path.exists().map_err(to_shell_err(call.span()))? { 139 // if http uri or at uri with rkey, we create parent dir since we'll 140 // write just a single file, otherwise we create this path as directory 141 if at_uri.as_ref().map_or(true, |at| at.rkey().is_some()) { 142 final_path 143 .parent() 144 .create_dir_all() 145 .map_err(to_shell_err(call.span()))?; 146 } else { 147 final_path 148 .create_dir_all() 149 .map_err(to_shell_err(call.span()))?; 150 } 151 } 152 153 let (abort_handle, abort_registration) = AbortHandle::new_pair(); 154 let task_desc = format!("{name} {uri_arg}", name = self.name()); 155 let task_id = register_task(task_desc.clone(), abort_handle); 156 157 spawn_local(async move { 158 let task_future = async { 159 if let Some(at_uri) = at_uri { 160 let identifier = at_uri.authority().clone(); 161 match (at_uri.collection(), at_uri.rkey()) { 162 (Some(coll), Some(key)) => { 163 fetch_record(identifier, coll.clone(), key.0.clone(), final_path).await 164 } 165 (Some(coll), None) => { 166 fetch_collection(identifier, coll.clone(), final_path).await 167 } 168 _ => fetch_repo(identifier, final_path).await, 169 } 170 } else if let Some(uri) = uri { 171 fetch_file(uri, final_path).await 172 } else { 173 Ok(()) 174 } 175 }; 176 177 let abortable = Abortable::new(task_future, abort_registration); 178 179 match abortable.await { 180 Ok(result) => { 181 remove_task(task_id); 182 match result { 183 Ok(_) => {} 184 Err(e) => { 185 let _ = print_to_console( 186 &format!("\x1b[31m✖\x1b[0m ({task_desc}) error: {e}"), 187 false, 188 ); 189 } 190 } 191 } 192 Err(_) => { 193 remove_task(task_id); 194 } 195 } 196 }); 197 198 Ok(Value::nothing(call.head).into_pipeline_data()) 199 } 200} 201 202async fn fetch_file(uri: Url, target_path: vfs::VfsPath) -> Result<()> { 203 let response = reqwest::get(uri).await?; 204 let content = response.bytes().await?; 205 target_path.create_file()?.write_all(&content)?; 206 Ok(()) 207} 208 209async fn resolve_did(identifier: AtIdentifier<'_>) -> Result<Did<'_>> { 210 match identifier { 211 AtIdentifier::Did(did) => Ok(did), 212 AtIdentifier::Handle(handle) => { 213 let did = slingshot_resolver_default().resolve_handle(&handle).await?; 214 Ok(did) 215 } 216 } 217} 218 219async fn resolve_pds(did: &Did<'_>) -> Result<Url> { 220 slingshot_resolver_default() 221 .resolve_did_doc(did) 222 .await? 223 .parse()? 224 .pds_endpoint() 225 .ok_or_else(|| anyhow!("no pds endpoint in did doc")) 226} 227 228async fn create_agent(pds: Url) -> BasicClient { 229 let store = MemorySessionStore::default(); 230 let session = CredentialSession::new(Arc::new(store), Arc::new(slingshot_resolver_default())); 231 session.set_endpoint(pds).await; 232 Agent::new(session) 233} 234 235async fn fetch_record( 236 identifier: AtIdentifier<'_>, 237 collection: Nsid<'_>, 238 rkey: Rkey<'_>, 239 target_path: vfs::VfsPath, 240) -> anyhow::Result<()> { 241 let did = resolve_did(identifier).await?; 242 let pds = resolve_pds(&did).await?; 243 let client = create_agent(pds).await; 244 245 let nsid = Nsid::new(&collection)?; 246 247 let request = GetRecord::new() 248 .repo(did) 249 .collection(nsid) 250 .rkey(rkey.clone()); 251 252 let output = client.send(request.build()).await?.into_output()?; 253 let file = target_path.create_file()?; 254 serde_json::to_writer_pretty(file, &output.value)?; 255 256 Ok(()) 257} 258 259async fn fetch_collection( 260 identifier: AtIdentifier<'_>, 261 collection: Nsid<'_>, 262 target_path: vfs::VfsPath, 263) -> anyhow::Result<()> { 264 let did = resolve_did(identifier).await?; 265 let pds = resolve_pds(&did).await?; 266 let client = create_agent(pds).await; 267 268 let mut cursor: Option<String> = None; 269 270 loop { 271 let request = ListRecords::new() 272 .repo(did.clone()) 273 .collection(collection.clone()) 274 .limit(100) 275 .cursor(cursor.map(Into::into)); 276 277 let response = client.send(request.build()).await?; 278 279 let output = response.into_output()?; 280 281 for rec in output.records { 282 if let Some(rkey) = rec.uri.rkey() { 283 let file = target_path.join(&rkey.0)?.create_file()?; 284 serde_json::to_writer_pretty(file, &rec.value)?; 285 } 286 } 287 288 if let Some(c) = output.cursor { 289 cursor = Some(c.into()); 290 } else { 291 break; 292 } 293 } 294 295 Ok(()) 296} 297 298async fn fetch_repo(identifier: AtIdentifier<'_>, target_path: vfs::VfsPath) -> Result<()> { 299 let did = resolve_did(identifier).await?; 300 let pds = resolve_pds(&did).await?; 301 let client = create_agent(pds).await; 302 303 let response = client.send(GetRepo::new().did(did).build()).await?; 304 305 // parse the car and create the repo 306 let car_bytes = response.into_output()?.body; 307 let parsed = parse_car_bytes(&car_bytes).await?; 308 let storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 309 let repo = Repository::from_commit(storage, &parsed.root).await?; 310 311 // Iterate over all leaves in the MST 312 // repo.mst() gives us the Mst<S> 313 // mst.leaves() returns Vec<(SmolStr, IpldCid)> 314 let leaves = repo.mst().leaves_sequential().await?; 315 316 for (key, cid) in leaves { 317 // key format: collection/rkey 318 let key_str = key.as_str(); 319 320 if let Some((collection, rkey)) = key_str.split_once('/') { 321 // Get the record block from storage 322 // The record data is just a CBOR block. We need to convert it to JSON for readability. 323 if let Some(block_bytes) = repo.storage().get(&cid).await? { 324 // Deserialize DAG-CBOR to generic JSON Value 325 let json_val: jacquard::RawData = serde_ipld_dagcbor::from_slice(&block_bytes) 326 .map_err(|e| anyhow!("failed to deserialize record {}: {}", key, e))?; 327 328 let coll_dir = target_path.join(collection)?; 329 if !coll_dir.exists()? { 330 coll_dir.create_dir_all()?; 331 } 332 let file = coll_dir.join(rkey)?.create_file()?; 333 334 serde_json::to_writer_pretty(file, &json_val)?; 335 } 336 } 337 } 338 339 Ok(()) 340}