src/main.rs (+54 -50)
···
-use std::{env, fs};
 use std::path::PathBuf;
+use std::{env, fs};
 
+use anyhow;
+use async_compression::futures::bufread::{ZstdDecoder, ZstdEncoder};
 use clap::{Parser, Subcommand};
 use dotenvy::dotenv;
+use futures::io::BufReader;
 use log::{error, info};
 use reqwest::header::{ACCEPT, ACCEPT_ENCODING};
-use anyhow;
+use s3::creds::Credentials;
+use s3::{Bucket, Region};
+use std::future::Future;
+use tokio::io::AsyncWriteExt;
+use tokio_stream::wrappers::LinesStream;
 use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
 use tokio_util::io::StreamReader;
-use tokio_stream::wrappers::LinesStream;
-use tokio::io::AsyncWriteExt;
-use std::future::Future;
-use async_compression::futures::bufread::{ZstdEncoder, ZstdDecoder};
-use s3::{Bucket, Region};
-use s3::creds::Credentials;
-use futures::io::BufReader;
 
 #[derive(Parser, Debug)]
 #[command(author, version, about = "PDS Whatsit Compress Test CLI", long_about = None)]
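The derive attributes above sit on the collapsed Cli definition. For orientation while reading the hunks below, this sketch reconstructs the CLI shape implied by the match arms later in the diff; the PR's actual definitions are collapsed and may differ in attributes and doc comments:

use clap::{Parser, Subcommand};

#[derive(Parser, Debug)]
#[command(author, version, about = "PDS Whatsit Compress Test CLI", long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
    Export { did: String, pds_url: String },
    Import { did: String },
}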
···
             fs::create_dir_all(parent)?;
         }
     }
-    if !path.exists() {
-        fs::create_dir_all(path)?;
-    }
     Ok(())
 }
 
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     init_logging();
 
     let cli = Cli::parse();
+    // Custom region requires valid region name and endpoint
+    let region_name = env::var("S3_REGION")?;
+    let endpoint = env::var("S3_ENDPOINT")?;
+    let region = Region::Custom {
+        region: region_name,
+        endpoint,
+    };
 
-    match cli.command {
-        Commands::Export { did, pds_url } => {
+    let bucket = Bucket::new(
+        env::var("S3_BUCKET_NAME")?.as_str(),
+        region,
+        // Credentials are collected from environment, config, profile or instance metadata
+        Credentials::new(
+            Some(env::var("S3_ACCESS_KEY")?.as_str()),
+            Some(env::var("S3_SECRET_KEY")?.as_str()),
+            None,
+            None,
+            None,
+        )?,
+    )?;
 
-            info!(
-                "Export requested: did={}, pds_url={}",
-                did,
-                pds_url,
-            );
+    match cli.command {
+        Commands::Export { did, pds_url } => {
+            info!("Export requested: did={}, pds_url={}", did, pds_url);
 
-            match do_work(pds_url, did).await{
+            match do_work(pds_url, did, bucket).await {
                 Ok(_) => {
                     info!("Export completed");
+                    Ok(())
                 }
                 Err(err) => {
                     error!("Export failed: {}", err);
+                    Err(err)
                 }
             }
         }
         Commands::Import { did } => {
             info!("Import requested: did={}", did);
-            match do_import(did).await {
-                Ok(path) => info!("Import completed, wrote {}", path.display()),
-                Err(err) => error!("Import failed: {}", err),
+            match do_import(did, bucket).await {
+                Ok(path) => {
+                    info!("Import completed, wrote {}", path.display());
+                    Ok(())
+                }
+                Err(err) => {
+                    error!("Import failed: {}", err);
+                    Err(err)
+                }
             }
         }
     }
 }
 
-async fn do_work(pds_url: String, did: String) -> anyhow::Result<()>{
+async fn do_work(pds_url: String, did: String, bucket: Box<Bucket>) -> anyhow::Result<()> {
     use futures::TryStreamExt;
     let atproto_client = reqwest::Client::new();
     let back_up_path = format!("users/{did}");
 
-    // Custom region requires valid region name and endpoint
-    let region_name = env::var("S3_REGION")?;
-    let endpoint = env::var("S3_ENDPOINT")?;
-    let region = Region::Custom { region: region_name, endpoint };
-
-    let bucket = Bucket::new(
-        env::var("S3_BUCKET_NAME")?.as_str(),
-        region,
-        // Credentials are collected from environment, config, profile or instance metadata
-        Credentials::new(Some(env::var("S3_ACCESS_KEY")?.as_str()), Some(env::var("S3_SECRET_KEY")?.as_str()), None, None, None)?,
-    )?;
-
     let response_reader = atproto_client
         .get(format!("{pds_url}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .header(ACCEPT, "application/vnd.ipld.car")
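Note on configuration: the relocated bucket setup reads five environment variables before any command runs, so a missing variable now fails fast from main. The variable names below come from the diff; the values are illustrative assumptions only (e.g. a local MinIO endpoint), suitable for a .env file given the dotenvy import:

S3_REGION=us-east-1
S3_ENDPOINT=http://localhost:9000
S3_BUCKET_NAME=pds-backups
S3_ACCESS_KEY=minioadmin
S3_SECRET_KEY=minioadmin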
···
     Ok(())
 }
 
-async fn do_import(did: String) -> anyhow::Result<PathBuf> {
+async fn do_import(did: String, bucket: Box<Bucket>) -> anyhow::Result<PathBuf> {
     use futures::StreamExt;
 
     let back_up_path = format!("users/{did}");
 
-    // S3 setup from env
-    let region_name = env::var("S3_REGION")?;
-    let endpoint = env::var("S3_ENDPOINT")?;
-    let region = Region::Custom { region: region_name, endpoint };
-
-    let bucket = Bucket::new(
-        env::var("S3_BUCKET_NAME")?.as_str(),
-        region,
-        Credentials::new(Some(env::var("S3_ACCESS_KEY")?.as_str()), Some(env::var("S3_SECRET_KEY")?.as_str()), None, None, None)?,
-    )?;
-
     // Stream download from S3
     let mut s3_stream = bucket
         .get_object_stream(format!("/{back_up_path}/{did}.car.zst"))
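The collapsed region after this hunk wires the downloaded byte stream into the zstd decoder (its last line, decoder.compat(), appears as context in the next hunk). A minimal sketch of the pattern these imports support, assuming the stream yields std::io::Result<bytes::Bytes> items; this is not a verbatim copy of the hidden lines:

use async_compression::futures::bufread::ZstdDecoder;
use futures::{io::BufReader, Stream};
use tokio::io::AsyncRead;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use tokio_util::io::StreamReader;

// Hypothetical helper: turn a raw byte stream into a zstd-decoding tokio reader.
fn zstd_reader<S>(byte_stream: S) -> impl AsyncRead
where
    S: Stream<Item = std::io::Result<bytes::Bytes>>,
{
    let tokio_reader = StreamReader::new(byte_stream); // tokio AsyncRead over raw bytes
    let futures_reader = tokio_reader.compat(); // adapt tokio -> futures AsyncRead
    let decoder = ZstdDecoder::new(BufReader::new(futures_reader)); // decompress zstd
    decoder.compat() // adapt back so tokio::io::copy can consume it
}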
···
     let mut decoded_tokio_reader = decoder.compat();
 
     // Prepare local output path, labeled as decompressed
-    let out_path: PathBuf = ["export", "users", did.as_str(), &format!("{}-decompressed.car", did)].iter().collect();
+    let out_path: PathBuf = [
+        "export",
+        "users",
+        did.as_str(),
+        &format!("{}-decompressed.car", did),
+    ]
+    .iter()
+    .collect();
     ensure_dir(&out_path)?;
 
     let mut out_file = tokio::fs::File::create(&out_path).await?;
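The final hunk is layout-only: rustfmt splits the component array across lines, and the collected path is unchanged. A self-contained check of the iter().collect::<PathBuf>() idiom, using a hypothetical DID value for illustration:

use std::path::PathBuf;

fn main() {
    let did = "did-plc-example"; // hypothetical value, for illustration only
    let out_path: PathBuf = ["export", "users", did, &format!("{did}-decompressed.car")]
        .iter()
        .collect(); // each array element becomes one path component
    assert_eq!(
        out_path,
        PathBuf::from("export/users/did-plc-example/did-plc-example-decompressed.car")
    );
}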