Cargo.lock    +2
Cargo.toml    +2
src/main.rs   +198 -46
···
-use std::{env, fs};
 use std::path::PathBuf;
+use std::{env, fs};
 
+use anyhow;
+use async_compression::futures::bufread::{ZstdDecoder, ZstdEncoder};
 use clap::{Parser, Subcommand};
 use dotenvy::dotenv;
+use futures::io::BufReader;
 use log::{error, info};
 use reqwest::header::{ACCEPT, ACCEPT_ENCODING};
-use anyhow;
-use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tokio_stream::wrappers::LinesStream;
-use std::future::Future;
+use s3::creds::Credentials;
 use s3::{Bucket, Region};
-use s3::creds::Credentials;
+use std::future::Future;
+use tokio::io::AsyncWriteExt;
+use tokio_stream::wrappers::LinesStream;
+use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
+use tokio_util::io::StreamReader;
+use serde::Deserialize;
 
 #[derive(Parser, Debug)]
 #[command(author, version, about = "PDS Whatsit Compress Test CLI", long_about = None)]
···
 #[derive(Subcommand, Debug)]
 enum Commands {
     /// Export data from a PDS instance
-    Export {
+    Backup {
         /// Output directory path (defaults to ./export)
         // #[arg(short, long)]
         // out: Option<PathBuf>,
···
         #[arg(long, value_name = "URL")]
         pds_url: String,
     },
+
+    /// Import: download from S3, decompress zstd, and save locally
+    Restore {
+        /// The DID to import
+        #[arg(long)]
+        did: String,
+    },
 }
 
 fn init_logging() {
···
             fs::create_dir_all(parent)?;
         }
     }
-    if !path.exists() {
-        fs::create_dir_all(path)?;
-    }
     Ok(())
 }
 
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     init_logging();
 
     let cli = Cli::parse();
+    // Custom region requires valid region name and endpoint
+    let region_name = env::var("S3_REGION")?;
+    let endpoint = env::var("S3_ENDPOINT")?;
+    let region = Region::Custom {
+        region: region_name,
+        endpoint,
+    };
 
-    match cli.command {
-        Commands::Export { did, pds_url } => {
+    let bucket = Bucket::new(
+        env::var("S3_BUCKET_NAME")?.as_str(),
+        region,
+        // Credentials are collected from environment, config, profile or instance metadata
+        Credentials::new(
+            Some(env::var("S3_ACCESS_KEY")?.as_str()),
+            Some(env::var("S3_SECRET_KEY")?.as_str()),
+            None,
+            None,
+            None,
+        )?,
+    )?;
 
-            info!(
-                "Export requested: did={}, pds_url={}",
-                did,
-                pds_url,
-            );
+    match cli.command {
+        Commands::Backup { did, pds_url } => {
+            info!("Export requested: did={}, pds_url={}", did, pds_url,);
 
-            match do_work(pds_url, did).await{
+            match do_backup(pds_url, did, bucket).await {
                 Ok(_) => {
                     info!("Export completed");
+                    Ok(())
                 }
                 Err(err) => {
                     error!("Export failed: {}", err);
+                    Err(err)
+                }
+            }
+        }
+        Commands::Restore { did } => {
+            info!("Import requested: did={}", did);
+            match do_restore(did, bucket).await {
+                Ok(path) => {
+                    info!("Import completed, wrote {}", path.display());
+                    Ok(())
+                }
+                Err(err) => {
+                    error!("Import failed: {}", err);
+                    Err(err)
                 }
             }
-            // Placeholder for actual export logic.
-            // Implement your export functionality here.
-            // println!(
-            //     "Export would run here with did={} pds_url={} out_dir={}",
-            //     did,
-            //     pds_url,
-            //     out_dir.display()
-            // );
         }
     }
 }
 
-async fn do_work(pds_url: String, did: String) -> anyhow::Result<()>{
+async fn do_backup(pds_url: String, did: String, bucket: Box<Bucket>) -> anyhow::Result<()> {
     use futures::TryStreamExt;
     let atproto_client = reqwest::Client::new();
+    let back_up_path = format!("users/{did}");
 
-
-    // Custom region requires valid region name and endpoint
-    let region_name = env::var("S3_REGION")?;
-    let endpoint = env::var("S3_ENDPOINT")?;
-    let region = Region::Custom { region: region_name, endpoint };
-
-    let bucket = Bucket::new(
-        env::var("S3_BUCKET_NAME")?.as_str(),
-        region,
-        // Credentials are collected from environment, config, profile or instance metadata
-        Credentials::new(Some(env::var("S3_ACCESS_KEY")?.as_str()), Some(env::var("S3_SECRET_KEY")?.as_str()), None, None, None)?,
-    )?;
-
-    let mut response = atproto_client
+    // 1) Backup the full repo CAR (compressed)
+    let response_reader = atproto_client
         .get(format!("{pds_url}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .header(ACCEPT, "application/vnd.ipld.car")
         .send()
···
         .error_for_status()?
         .bytes_stream()
         .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
-        .into_async_read()
-        .compat();
-    bucket.put_object_stream(&mut response, did.as_str()).await?;
+        .into_async_read();
+
+    let buf_reader = BufReader::new(response_reader);
+    let zstd_encoder = ZstdEncoder::new(buf_reader);
+    let mut zstd_tokio_reader = zstd_encoder.compat();
+
+    bucket
+        .put_object_stream_builder(format!("/{back_up_path}/{did}.car.zst").as_str())
+        .with_content_type("application/vnd.ipld.car")
+        .with_content_encoding("zstd")?
+        .execute_stream(&mut zstd_tokio_reader)
+        .await?;
+
+    // 2) Paginate listBlobs and upload each as zstd-compressed
+    #[derive(Deserialize)]
+    struct ListBlobsResponse {
+        #[allow(dead_code)]
+        cursor: Option<String>,
+        cids: Vec<String>,
+    }
+
+    let mut cursor: Option<String> = None;
+    let limit = 1000u32;
+
+    loop {
+        let mut url = format!(
+            "{}/xrpc/com.atproto.sync.listBlobs?did={}&limit={}",
+            pds_url, did, limit
+        );
+        if let Some(ref c) = cursor {
+            if !c.is_empty() {
+                url.push_str("&cursor=");
+                url.push_str(c);
+            }
+        }
+
+        info!("Listing blobs: {}", url);
+        let resp = atproto_client
+            .get(url)
+            .header(ACCEPT, "application/json")
+            .send()
+            .await?
+            .error_for_status()?;
+        let bytes = resp.bytes().await?;
+        let page: ListBlobsResponse = serde_json::from_slice(&bytes)?;
+
+        if page.cids.is_empty() {
+            if cursor.is_none() || cursor.as_deref() == Some("") {
+                break;
+            }
+        }
+
+        for cid in page.cids {
+            let blob_url = format!(
+                "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}",
+                pds_url, did, cid
+            );
+            info!("Downloading blob {}", cid);
+            let blob_reader = atproto_client
+                .get(blob_url)
+                .header(ACCEPT, "*/*")
+                .send()
+                .await?
+                .error_for_status()?
+                .bytes_stream()
+                .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
+                .into_async_read();
+
+            let blob_buf = BufReader::new(blob_reader);
+            let blob_zstd = ZstdEncoder::new(blob_buf);
+            let mut blob_tokio_reader = blob_zstd.compat();
+
+            let object_key = format!("/{}/blobs/{}.zst", back_up_path, cid);
+            bucket
+                .put_object_stream_builder(&object_key)
+                .with_content_type("application/octet-stream")
+                .with_content_encoding("zstd")?
+                .execute_stream(&mut blob_tokio_reader)
+                .await?;
+        }
+
+        // Update or finish based on cursor
+        match page.cursor {
+            Some(c) if !c.is_empty() => {
+                cursor = Some(c);
+            }
+            _ => break,
+        }
+    }
+
     Ok(())
+}
 
-
+async fn do_restore(did: String, bucket: Box<Bucket>) -> anyhow::Result<PathBuf> {
+    use futures::StreamExt;
+
+    let back_up_path = format!("users/{did}");
+
+    // Stream download from S3
+    let mut s3_stream = bucket
+        .get_object_stream(format!("/{back_up_path}/{did}.car.zst"))
+        .await?;
+
+    // Convert the stream of Bytes into a tokio AsyncRead
+    let byte_stream = s3_stream
+        .bytes()
+        .map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
+
+    let tokio_reader = StreamReader::new(byte_stream);
+
+    // Convert tokio AsyncRead -> futures AsyncRead, then buffer for decoder
+    let futures_reader = tokio_reader.compat();
+    let futures_buf = futures::io::BufReader::new(futures_reader);
+
+    // Zstd decode
+    let decoder = ZstdDecoder::new(futures_buf);
+
+    // Convert back to tokio AsyncRead for writing
+    let mut decoded_tokio_reader = decoder.compat();
+
+    // Prepare local output path, labeled as decompressed
+    let out_path: PathBuf = [
+        "export",
+        "users",
+        did.as_str(),
+        &format!("{}-decompressed.car", did),
+    ]
+    .iter()
+    .collect();
+    ensure_dir(&out_path)?;
+
+    let mut out_file = tokio::fs::File::create(&out_path).await?;
+
+    // Stream copy decoded content to file
+    tokio::io::copy(&mut decoded_tokio_reader, &mut out_file).await?;
+    out_file.flush().await?;
+
+    Ok(out_path)
 }
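
Example usage: both subcommands read S3_REGION, S3_ENDPOINT, S3_BUCKET_NAME, S3_ACCESS_KEY, and S3_SECRET_KEY from the environment before doing anything else (a .env file should also work, since dotenvy is already imported). Assuming clap's default kebab-case naming for the derive-generated subcommands and flags, and with placeholder values for the DID and PDS URL, the two commands would be invoked roughly as:

    cargo run -- backup --did did:plc:example --pds-url https://pds.example.com
    cargo run -- restore --did did:plc:example

backup streams the repo CAR and each blob to S3 under users/<did>/ as zstd-compressed objects; restore downloads the CAR, decompresses it, and writes it to export/users/<did>/<did>-decompressed.car.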