Compare changes

Changed files: +204 -47
.gitignore (+2 -1)
 /target
 .idea
-.env
+.env
+.DS_Store
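
The .env file stays ignored, which matters here because the new code in src/main.rs below reads all of its S3 settings from the environment via dotenvy. A minimal sketch of that ignored file, using the exact variable names the code reads; every value is a placeholder:

S3_REGION=us-east-1
S3_ENDPOINT=https://s3.example.com
S3_BUCKET_NAME=pds-backups
S3_ACCESS_KEY=placeholder-access-key
S3_SECRET_KEY=placeholder-secret-key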
Cargo.lock (+2)
···
  "log",
  "reqwest",
  "rust-s3",
+ "serde",
+ "serde_json",
  "tokio",
  "tokio-stream",
  "tokio-util",
Cargo.toml (+2)
···
 tokio-util = { version = "0.7.16", features = ["compat"] }
 tokio-stream = { version = "0.1.17", features = ["io-util"] }
 rust-s3 = "0.37.0"
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
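
serde (with derive) and serde_json back the new listBlobs pagination in src/main.rs below, which deserializes each com.atproto.sync.listBlobs page into a small cursor/cids struct. For orientation, a page has roughly this shape (field names per the atproto sync lexicon; the cursor and CID strings are made-up placeholders):

{
  "cursor": "3jzfcijpj2z2a",
  "cids": [
    "bafkreiaexamplecid1",
    "bafkreibexamplecid2"
  ]
}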
src/main.rs (+198 -46)
-use std::{env, fs};
 use std::path::PathBuf;
+use std::{env, fs};

+use anyhow;
+use async_compression::futures::bufread::{ZstdDecoder, ZstdEncoder};
 use clap::{Parser, Subcommand};
 use dotenvy::dotenv;
+use futures::io::BufReader;
 use log::{error, info};
 use reqwest::header::{ACCEPT, ACCEPT_ENCODING};
-use anyhow;
-use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tokio_stream::wrappers::LinesStream;
-use std::future::Future;
+use s3::creds::Credentials;
 use s3::{Bucket, Region};
-use s3::creds::Credentials;
+use std::future::Future;
+use tokio::io::AsyncWriteExt;
+use tokio_stream::wrappers::LinesStream;
+use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
+use tokio_util::io::StreamReader;
+use serde::Deserialize;

 #[derive(Parser, Debug)]
 #[command(author, version, about = "PDS Whatsit Compress Test CLI", long_about = None)]
···
 #[derive(Subcommand, Debug)]
 enum Commands {
     /// Export data from a PDS instance
-    Export {
+    Backup {
         /// Output directory path (defaults to ./export)
         // #[arg(short, long)]
         // out: Option<PathBuf>,
···
         #[arg(long, value_name = "URL")]
         pds_url: String,
     },
+
+    /// Import: download from S3, decompress zstd, and save locally
+    Restore {
+        /// The DID to import
+        #[arg(long)]
+        did: String,
+    },
 }

 fn init_logging() {
···
             fs::create_dir_all(parent)?;
         }
     }
-    if !path.exists() {
-        fs::create_dir_all(path)?;
-    }
     Ok(())
 }

 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     init_logging();

     let cli = Cli::parse();
+    // Custom region requires valid region name and endpoint
+    let region_name = env::var("S3_REGION")?;
+    let endpoint = env::var("S3_ENDPOINT")?;
+    let region = Region::Custom {
+        region: region_name,
+        endpoint,
+    };

-    match cli.command {
-        Commands::Export { did, pds_url } => {
+    let bucket = Bucket::new(
+        env::var("S3_BUCKET_NAME")?.as_str(),
+        region,
+        // Credentials are collected from environment, config, profile or instance metadata
+        Credentials::new(
+            Some(env::var("S3_ACCESS_KEY")?.as_str()),
+            Some(env::var("S3_SECRET_KEY")?.as_str()),
+            None,
+            None,
+            None,
+        )?,
+    )?;

-            info!(
-                "Export requested: did={}, pds_url={}",
-                did,
-                pds_url,
-            );
+    match cli.command {
+        Commands::Backup { did, pds_url } => {
+            info!("Export requested: did={}, pds_url={}", did, pds_url,);

-            match do_work(pds_url, did).await{
+            match do_backup(pds_url, did, bucket).await {
                 Ok(_) => {
                     info!("Export completed");
+                    Ok(())
                 }
                 Err(err) => {
                     error!("Export failed: {}", err);
+                    Err(err)
+                }
+            }
+        }
+        Commands::Restore { did } => {
+            info!("Import requested: did={}", did);
+            match do_restore(did, bucket).await {
+                Ok(path) => {
+                    info!("Import completed, wrote {}", path.display());
+                    Ok(())
+                }
+                Err(err) => {
+                    error!("Import failed: {}", err);
+                    Err(err)
                 }
             }
-            // Placeholder for actual export logic.
-            // Implement your export functionality here.
-            // println!(
-            //     "Export would run here with did={} pds_url={} out_dir={}",
-            //     did,
-            //     pds_url,
-            //     out_dir.display()
-            // );
         }
     }
 }

-async fn do_work(pds_url: String, did: String) -> anyhow::Result<()>{
+async fn do_backup(pds_url: String, did: String, bucket: Box<Bucket>) -> anyhow::Result<()> {
     use futures::TryStreamExt;
     let atproto_client = reqwest::Client::new();
+    let back_up_path = format!("users/{did}");

-
-    // Custom region requires valid region name and endpoint
-    let region_name = env::var("S3_REGION")?;
-    let endpoint = env::var("S3_ENDPOINT")?;
-    let region = Region::Custom { region: region_name, endpoint };
-
-    let bucket = Bucket::new(
-        env::var("S3_BUCKET_NAME")?.as_str(),
-        region,
-        // Credentials are collected from environment, config, profile or instance metadata
-        Credentials::new(Some(env::var("S3_ACCESS_KEY")?.as_str()), Some(env::var("S3_SECRET_KEY")?.as_str()), None, None, None)?,
-    )?;
-
-    let mut response = atproto_client
+    // 1) Backup the full repo CAR (compressed)
+    let response_reader = atproto_client
         .get(format!("{pds_url}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .header(ACCEPT, "application/vnd.ipld.car")
         .send()
···
         .error_for_status()?
         .bytes_stream()
         .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
-        .into_async_read()
-        .compat();
-    bucket.put_object_stream(&mut response, did.as_str()).await?;
+        .into_async_read();
+
+    let buf_reader = BufReader::new(response_reader);
+    let zstd_encoder = ZstdEncoder::new(buf_reader);
+    let mut zstd_tokio_reader = zstd_encoder.compat();
+
+    bucket
+        .put_object_stream_builder(format!("/{back_up_path}/{did}.car.zst").as_str())
+        .with_content_type("application/vnd.ipld.car")
+        .with_content_encoding("zstd")?
+        .execute_stream(&mut zstd_tokio_reader)
+        .await?;
+
+    // 2) Paginate listBlobs and upload each as zstd-compressed
+    #[derive(Deserialize)]
+    struct ListBlobsResponse {
+        #[allow(dead_code)]
+        cursor: Option<String>,
+        cids: Vec<String>,
+    }
+
+    let mut cursor: Option<String> = None;
+    let limit = 1000u32;
+
+    loop {
+        let mut url = format!(
+            "{}/xrpc/com.atproto.sync.listBlobs?did={}&limit={}",
+            pds_url, did, limit
+        );
+        if let Some(ref c) = cursor {
+            if !c.is_empty() {
+                url.push_str("&cursor=");
+                url.push_str(c);
+            }
+        }
+
+        info!("Listing blobs: {}", url);
+        let resp = atproto_client
+            .get(url)
+            .header(ACCEPT, "application/json")
+            .send()
+            .await?
+            .error_for_status()?;
+        let bytes = resp.bytes().await?;
+        let page: ListBlobsResponse = serde_json::from_slice(&bytes)?;
+
+        if page.cids.is_empty() {
+            if cursor.is_none() || cursor.as_deref() == Some("") {
+                break;
+            }
+        }
+
+        for cid in page.cids {
+            let blob_url = format!(
+                "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}",
+                pds_url, did, cid
+            );
+            info!("Downloading blob {}", cid);
+            let blob_reader = atproto_client
+                .get(blob_url)
+                .header(ACCEPT, "*/*")
+                .send()
+                .await?
+                .error_for_status()?
+                .bytes_stream()
+                .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
+                .into_async_read();
+
+            let blob_buf = BufReader::new(blob_reader);
+            let blob_zstd = ZstdEncoder::new(blob_buf);
+            let mut blob_tokio_reader = blob_zstd.compat();
+
+            let object_key = format!("/{}/blobs/{}.zst", back_up_path, cid);
+            bucket
+                .put_object_stream_builder(&object_key)
+                .with_content_type("application/octet-stream")
+                .with_content_encoding("zstd")?
+                .execute_stream(&mut blob_tokio_reader)
+                .await?;
+        }
+
+        // Update or finish based on cursor
+        match page.cursor {
+            Some(c) if !c.is_empty() => {
+                cursor = Some(c);
+            }
+            _ => break,
+        }
+    }
+
     Ok(())
+}

-
+async fn do_restore(did: String, bucket: Box<Bucket>) -> anyhow::Result<PathBuf> {
+    use futures::StreamExt;
+
+    let back_up_path = format!("users/{did}");
+
+    // Stream download from S3
+    let mut s3_stream = bucket
+        .get_object_stream(format!("/{back_up_path}/{did}.car.zst"))
+        .await?;
+
+    // Convert the stream of Bytes into a tokio AsyncRead
+    let byte_stream = s3_stream
+        .bytes()
+        .map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
+
+    let tokio_reader = StreamReader::new(byte_stream);
+
+    // Convert tokio AsyncRead -> futures AsyncRead, then buffer for decoder
+    let futures_reader = tokio_reader.compat();
+    let futures_buf = futures::io::BufReader::new(futures_reader);
+
+    // Zstd decode
+    let decoder = ZstdDecoder::new(futures_buf);
+
+    // Convert back to tokio AsyncRead for writing
+    let mut decoded_tokio_reader = decoder.compat();
+
+    // Prepare local output path, labeled as decompressed
+    let out_path: PathBuf = [
+        "export",
+        "users",
+        did.as_str(),
+        &format!("{}-decompressed.car", did),
+    ]
+    .iter()
+    .collect();
+    ensure_dir(&out_path)?;
+
+    let mut out_file = tokio::fs::File::create(&out_path).await?;
+
+    // Stream copy decoded content to file
+    tokio::io::copy(&mut decoded_tokio_reader, &mut out_file).await?;
+    out_file.flush().await?;
+
+    Ok(out_path)
 }
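
Putting it together: with clap's default kebab-case naming for subcommands and flags, the two paths would be exercised roughly as follows. This is a sketch; the DID and PDS URL are placeholders, and the S3_* variables from .env must be set:

# Backup: stream the repo CAR plus every blob to S3, zstd-compressed,
# under /users/<did>/ and /users/<did>/blobs/
cargo run -- backup --did did:plc:example123 --pds-url https://pds.example.com

# Restore: fetch /users/<did>/<did>.car.zst from S3 and write the
# decompressed CAR to export/users/<did>/<did>-decompressed.car
cargo run -- restore --did did:plc:example123

Note that compression and decompression both happen inline on the byte streams (response body -> ZstdEncoder -> S3, and S3 -> ZstdDecoder -> file), so backup never stages the CAR or blobs on local disk.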