wip

Changed files
+68 -11
src
+68 -11
src/main.rs
··· 6 6 use log::{error, info}; 7 7 use reqwest::header::{ACCEPT, ACCEPT_ENCODING}; 8 8 use anyhow; 9 - use tokio_util::compat::FuturesAsyncReadCompatExt; 9 + use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; 10 + use tokio_util::io::StreamReader; 10 11 use tokio_stream::wrappers::LinesStream; 12 + use tokio::io::AsyncWriteExt; 11 13 use std::future::Future; 12 - use async_compression::futures::bufread::ZstdEncoder; 14 + use async_compression::futures::bufread::{ZstdEncoder, ZstdDecoder}; 13 15 use s3::{Bucket, Region}; 14 16 use s3::creds::Credentials; 15 17 use futures::io::BufReader; ··· 36 38 /// The PDS base URL 37 39 #[arg(long, value_name = "URL")] 38 40 pds_url: String, 41 + }, 42 + 43 + /// Import: download from S3, decompress zstd, and save locally 44 + Import { 45 + /// The DID to import 46 + #[arg(long)] 47 + did: String, 39 48 }, 40 49 } 41 50 ··· 83 92 error!("Export failed: {}", err); 84 93 } 85 94 } 86 - // Placeholder for actual export logic. 87 - // Implement your export functionality here. 88 - // println!( 89 - // "Export would run here with did={} pds_url={} out_dir={}", 90 - // did, 91 - // pds_url, 92 - // out_dir.display() 93 - // ); 95 + } 96 + Commands::Import { did } => { 97 + info!("Import requested: did={}", did); 98 + match do_import(did).await { 99 + Ok(path) => info!("Import completed, wrote {}", path.display()), 100 + Err(err) => error!("Import failed: {}", err), 101 + } 94 102 } 95 103 } 96 104 } ··· 140 148 .await?; 141 149 142 150 Ok(()) 151 + } 143 152 144 - 153 + async fn do_import(did: String) -> anyhow::Result<PathBuf> { 154 + use futures::StreamExt; 155 + 156 + let back_up_path = format!("users/{did}"); 157 + 158 + // S3 setup from env 159 + let region_name = env::var("S3_REGION")?; 160 + let endpoint = env::var("S3_ENDPOINT")?; 161 + let region = Region::Custom { region: region_name, endpoint }; 162 + 163 + let bucket = Bucket::new( 164 + env::var("S3_BUCKET_NAME")?.as_str(), 165 + region, 166 + Credentials::new(Some(env::var("S3_ACCESS_KEY")?.as_str()), Some(env::var("S3_SECRET_KEY")?.as_str()), None, None, None)?, 167 + )?; 168 + 169 + // Stream download from S3 170 + let mut s3_stream = bucket 171 + .get_object_stream(format!("/{back_up_path}/{did}.car.zst")) 172 + .await?; 173 + 174 + // Convert the stream of Bytes into a tokio AsyncRead 175 + let byte_stream = s3_stream 176 + .bytes() 177 + .map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); 178 + 179 + let tokio_reader = StreamReader::new(byte_stream); 180 + 181 + // Convert tokio AsyncRead -> futures AsyncRead, then buffer for decoder 182 + let futures_reader = tokio_reader.compat(); 183 + let futures_buf = futures::io::BufReader::new(futures_reader); 184 + 185 + // Zstd decode 186 + let decoder = ZstdDecoder::new(futures_buf); 187 + 188 + // Convert back to tokio AsyncRead for writing 189 + let mut decoded_tokio_reader = decoder.compat(); 190 + 191 + // Prepare local output path, labeled as decompressed 192 + let out_path: PathBuf = ["export", "users", did.as_str(), &format!("{}-decompressed.car", did)].iter().collect(); 193 + ensure_dir(&out_path)?; 194 + 195 + let mut out_file = tokio::fs::File::create(&out_path).await?; 196 + 197 + // Stream copy decoded content to file 198 + tokio::io::copy(&mut decoded_tokio_reader, &mut out_file).await?; 199 + out_file.flush().await?; 200 + 201 + Ok(out_path) 145 202 }