Cargo.lock  +2
Cargo.toml  +2
src/main.rs  +87 -14
···
 use tokio_stream::wrappers::LinesStream;
 use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
 use tokio_util::io::StreamReader;
+use serde::Deserialize;

 #[derive(Parser, Debug)]
 #[command(author, version, about = "PDS Whatsit Compress Test CLI", long_about = None)]
···
 #[derive(Subcommand, Debug)]
 enum Commands {
     /// Export data from a PDS instance
-    Export {
+    Backup {
         /// Output directory path (defaults to ./export)
         // #[arg(short, long)]
         // out: Option<PathBuf>,
···
     },

     /// Import: download from S3, decompress zstd, and save locally
-    Import {
+    Restore {
         /// The DID to import
         #[arg(long)]
         did: String,
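A note on the rename: with clap's derive API, the variant name is the user-facing subcommand name (kebab-cased by default), so this change alone turns `app export`/`app import` into `app backup`/`app restore`. A minimal sketch of the parsing behavior, using a hypothetical pared-down `Cli` rather than the PR's full struct (the real `Backup` also takes `pds_url`; assumes clap with the `derive` feature):

```rust
use clap::{Parser, Subcommand};

// Hypothetical minimal mirror of the PR's Cli, for illustration only.
#[derive(Parser, Debug)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
    Backup {
        #[arg(long)]
        did: String,
    },
    Restore {
        #[arg(long)]
        did: String,
    },
}

fn main() {
    // The `Backup` variant is exposed as the `backup` subcommand.
    let cli = Cli::try_parse_from(["app", "backup", "--did", "did:plc:example"]).unwrap();
    match cli.command {
        Commands::Backup { did } => println!("backup {did}"),
        Commands::Restore { did } => println!("restore {did}"),
    }
}
```

Anyone scripting against the old names will break; clap's alias attribute (e.g. `#[command(alias = "export")]` on `Backup`) could preserve them if desired.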
···
     )?;

     match cli.command {
-        Commands::Export { did, pds_url } => {
+        Commands::Backup { did, pds_url } => {
             info!("Export requested: did={}, pds_url={}", did, pds_url,);

-            match do_work(pds_url, did, bucket).await {
+            match do_backup(pds_url, did, bucket).await {
                 Ok(_) => {
                     info!("Export completed");
                     Ok(())
···
                 }
             }
         }
-        Commands::Import { did } => {
+        Commands::Restore { did } => {
             info!("Import requested: did={}", did);
-            match do_import(did, bucket).await {
+            match do_restore(did, bucket).await {
                 Ok(path) => {
                     info!("Import completed, wrote {}", path.display());
                     Ok(())
···
     }
 }

-async fn do_work(pds_url: String, did: String, bucket: Box<Bucket>) -> anyhow::Result<()> {
+async fn do_backup(pds_url: String, did: String, bucket: Box<Bucket>) -> anyhow::Result<()> {
     use futures::TryStreamExt;
     let atproto_client = reqwest::Client::new();
     let back_up_path = format!("users/{did}");

+    // 1) Backup the full repo CAR (compressed)
     let response_reader = atproto_client
         .get(format!("{pds_url}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .header(ACCEPT, "application/vnd.ipld.car")
···
         .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
         .into_async_read();

-    // Wrap the response in a buffered reader to satisfy AsyncBufRead for the encoder
     let buf_reader = BufReader::new(response_reader);
-
-    // Create a Zstd encoder that compresses on-the-fly while being read
     let zstd_encoder = ZstdEncoder::new(buf_reader);
-
-    // Convert futures::AsyncRead to tokio::io::AsyncRead for the S3 client
     let mut zstd_tokio_reader = zstd_encoder.compat();

-    // Stream the compressed data to S3 with appropriate headers
     bucket
         .put_object_stream_builder(format!("/{back_up_path}/{did}.car.zst").as_str())
         .with_content_type("application/vnd.ipld.car")
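Context for this hunk: the removed comments described the adapter chain, which is still the heart of the backup path — bytes stream → futures `AsyncRead` → `BufReader` (for `AsyncBufRead`) → `ZstdEncoder` → tokio `AsyncRead` via `.compat()` — so the CAR is compressed as S3 pulls it and is never fully buffered in memory. Pulled out in isolation as a sketch (`compressed_reader` is an illustrative helper, not in the PR; assumes `async-compression` with the `futures-io` and `zstd` features, `tokio-util` with `compat`, and reqwest's `stream` feature):

```rust
use async_compression::futures::bufread::ZstdEncoder;
use futures::io::BufReader;
use futures::TryStreamExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;

// Illustrative helper, not from the PR: adapt a reqwest response into a
// tokio `AsyncRead` that yields zstd-compressed bytes on the fly.
fn compressed_reader(resp: reqwest::Response) -> impl tokio::io::AsyncRead {
    let body = resp
        .bytes_stream()
        // reqwest errors -> io errors, as `into_async_read` requires
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        // Stream<Result<Bytes, _>> -> futures::AsyncRead
        .into_async_read();
    // AsyncRead -> AsyncBufRead (the encoder needs buffering), then encode
    let encoder = ZstdEncoder::new(BufReader::new(body));
    // futures::AsyncRead -> tokio::io::AsyncRead for the S3 client
    encoder.compat()
}
```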
···
         .execute_stream(&mut zstd_tokio_reader)
         .await?;

+    // 2) Paginate listBlobs and upload each as zstd-compressed
+    #[derive(Deserialize)]
+    struct ListBlobsResponse {
+        #[allow(dead_code)]
+        cursor: Option<String>,
+        cids: Vec<String>,
+    }
+
+    let mut cursor: Option<String> = None;
+    let limit = 1000u32;
+
+    loop {
+        let mut url = format!(
+            "{}/xrpc/com.atproto.sync.listBlobs?did={}&limit={}",
+            pds_url, did, limit
+        );
+        if let Some(ref c) = cursor {
+            if !c.is_empty() {
+                url.push_str("&cursor=");
+                url.push_str(c);
+            }
+        }
+
+        info!("Listing blobs: {}", url);
+        let resp = atproto_client
+            .get(url)
+            .header(ACCEPT, "application/json")
+            .send()
+            .await?
+            .error_for_status()?;
+        let bytes = resp.bytes().await?;
+        let page: ListBlobsResponse = serde_json::from_slice(&bytes)?;
+
+        if page.cids.is_empty() {
+            if cursor.is_none() || cursor.as_deref() == Some("") {
+                break;
+            }
+        }
+
+        for cid in page.cids {
+            let blob_url = format!(
+                "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}",
+                pds_url, did, cid
+            );
+            info!("Downloading blob {}", cid);
+            let blob_reader = atproto_client
+                .get(blob_url)
+                .header(ACCEPT, "*/*")
+                .send()
+                .await?
+                .error_for_status()?
+                .bytes_stream()
+                .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
+                .into_async_read();
+
+            let blob_buf = BufReader::new(blob_reader);
+            let blob_zstd = ZstdEncoder::new(blob_buf);
+            let mut blob_tokio_reader = blob_zstd.compat();
+
+            let object_key = format!("/{}/blobs/{}.zst", back_up_path, cid);
+            bucket
+                .put_object_stream_builder(&object_key)
+                .with_content_type("application/octet-stream")
+                .with_content_encoding("zstd")?
+                .execute_stream(&mut blob_tokio_reader)
+                .await?;
+        }
+
+        // Update or finish based on cursor
+        match page.cursor {
+            Some(c) if !c.is_empty() => {
+                cursor = Some(c);
+            }
+            _ => break,
+        }
+    }
+
     Ok(())
 }

-async fn do_import(did: String, bucket: Box<Bucket>) -> anyhow::Result<PathBuf> {
+async fn do_restore(did: String, bucket: Box<Bucket>) -> anyhow::Result<PathBuf> {
     use futures::StreamExt;

     let back_up_path = format!("users/{did}");
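For the restore side (only the signature is renamed in this hunk), the inverse pipeline applies: stream the stored object and decompress while writing to disk. Since the hunk doesn't show the S3 read API this fork uses, the sketch below assumes the caller already has a tokio `AsyncRead` over the stored `.car.zst` body; `decompress_to_file` is a hypothetical helper, not code from the PR (assumes `async-compression` with the `tokio` and `zstd` features):

```rust
use async_compression::tokio::bufread::ZstdDecoder;
use std::path::{Path, PathBuf};
use tokio::io::{AsyncRead, BufReader};

// Hypothetical helper: `compressed` would be the S3 object body in do_restore.
async fn decompress_to_file(
    compressed: impl AsyncRead + Unpin,
    out_path: &Path,
) -> anyhow::Result<PathBuf> {
    // The decoder wants buffered input; BufReader provides AsyncBufRead.
    let mut decoder = ZstdDecoder::new(BufReader::new(compressed));
    let mut file = tokio::fs::File::create(out_path).await?;
    // Stream decompressed bytes straight to disk, no full-file buffering.
    tokio::io::copy(&mut decoder, &mut file).await?;
    Ok(out_path.to_path_buf())
}
```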