microcosm.blue / Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
Compare changes
+720 -587 across 16 changed files:
Cargo.lock
Cargo.toml
favicon.ico
readme.md
src/backfill.rs
src/bin/allegedly.rs
src/bin/backfill.rs
src/bin/mirror.rs
src/bin/mod.rs
src/client.rs
src/lib.rs
src/mirror.rs
src/plc_pg.rs
src/poll.rs
src/ratelimit.rs
src/weekly.rs
Cargo.lock  +14
···
  "governor",
  "http-body-util",
  "log",
+ "native-tls",
  "poem",
+ "postgres-native-tls",
  "reqwest",
  "reqwest-middleware",
  "reqwest-retry",
···
 version = "1.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
+
+[[package]]
+name = "postgres-native-tls"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1f39498473c92f7b6820ae970382c1d83178a3454c618161cb772e8598d9f6f"
+dependencies = [
+ "native-tls",
+ "tokio",
+ "tokio-native-tls",
+ "tokio-postgres",
+]
 
 [[package]]
 name = "postgres-protocol"
Cargo.toml  +2
···
 governor = "0.10.1"
 http-body-util = "0.1.3"
 log = "0.4.28"
+native-tls = "0.2.14"
 poem = { version = "3.1.12", features = ["acme", "compression"] }
+postgres-native-tls = "0.5.1"
 reqwest = { version = "0.12.23", features = ["stream", "json"] }
 reqwest-middleware = "0.4.2"
 reqwest-retry = "0.7.0"
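
The two new dependencies exist so Allegedly can reach a TLS-guarded postgres. A minimal sketch of how they compose, assuming a PEM root certificate (it mirrors the `get_tls` helper added in src/plc_pg.rs further down):

use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;

// Build a connector that tokio-postgres accepts, trusting one extra root cert.
fn connector_from_pem(pem_bytes: &[u8]) -> anyhow::Result<MakeTlsConnector> {
    let cert = Certificate::from_pem(pem_bytes)?;
    let tls = TlsConnector::builder().add_root_certificate(cert).build()?;
    Ok(MakeTlsConnector::new(tls))
}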
favicon.ico
This is a binary file and will not be displayed.
readme.md  +1 -1
···
     --upstream "https://plc.directory" \
     --wrap "http://127.0.0.1:3000" \
     --acme-domain "plc.wtf" \
-    --acme-cache-dir ./acme-cache \
+    --acme-cache-path ./acme-cache \
     --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
 ```
 
src/backfill.rs  +12 -5
···
     dest: mpsc::Sender<ExportPage>,
     source_workers: usize,
     until: Option<Dt>,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<&'static str> {
     // queue up the week bundles that should be available
     let weeks = Arc::new(Mutex::new(
         until
···
         while let Some(week) = weeks.lock().await.pop() {
             let when = Into::<Dt>::into(week).to_rfc3339();
             log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
-            week_to_pages(source.clone(), week, dest.clone()).await?;
+            week_to_pages(source.clone(), week, dest.clone())
+                .await
+                .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
         }
         log::info!("done with the weeks ig");
         Ok(())
···
 
     // wait for the big backfill to finish
     while let Some(res) = workers.join_next().await {
-        res??;
+        res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
+            .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
     }
-    log::info!("finished fetching backfill in {:?}", t_step.elapsed());
-    Ok(())
+    log::info!(
+        "finished fetching backfill in {:?}. senders remaining: {}",
+        t_step.elapsed(),
+        dest.strong_count()
+    );
+    Ok("backfill")
 }
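
backfill now resolves to a static name instead of `()`. That return type is what lets the reworked bins drive every pipeline stage from one JoinSet and log which stage just finished. A sketch of that supervisor shape (the same loop appears in src/bin/backfill.rs and src/bin/mirror.rs below):

use tokio::task::JoinSet;

// Each stage resolves to its own name on success; any join error or
// stage error tears the whole pipeline down.
async fn supervise(mut tasks: JoinSet<anyhow::Result<&'static str>>) -> anyhow::Result<()> {
    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) => return Err(e.into()), // panicked or cancelled
            Ok(Err(e)) => return Err(e),    // stage returned an error
            Ok(Ok(name)) => log::trace!("{name} done, {} stages left", tasks.len()),
        }
    }
    Ok(())
}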
src/bin/allegedly.rs  +40 -226
···
-use allegedly::{
-    Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill,
-    backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
-};
+use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
 use clap::{CommandFactory, Parser, Subcommand};
-use reqwest::Url;
-use std::{net::SocketAddr, path::PathBuf, time::Instant};
-use tokio::sync::{mpsc, oneshot};
+use std::{path::PathBuf, time::Instant};
+use tokio::fs::create_dir_all;
+use tokio::sync::mpsc;
+
+mod backfill;
+mod mirror;
 
 #[derive(Debug, Parser)]
 struct Cli {
-    /// Upstream PLC server
-    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
-    #[clap(default_value = "https://plc.directory")]
-    upstream: Url,
+    #[command(flatten)]
+    globals: GlobalArgs,
+
     #[command(subcommand)]
     command: Commands,
 }
···
 enum Commands {
     /// Use weekly bundled ops to get a complete directory mirror FAST
     Backfill {
-        /// Remote URL prefix to fetch bundles from
-        #[arg(long)]
-        #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
-        http: Url,
-        /// Local folder to fetch bundles from (overrides `http`)
-        #[arg(long)]
-        dir: Option<PathBuf>,
-        /// Parallel bundle fetchers
-        ///
-        /// Default: 4 for http fetches, 1 for local folder
-        #[arg(long)]
-        source_workers: Option<usize>,
-        /// Bulk load into did-method-plc-compatible postgres instead of stdout
-        ///
-        /// Pass a postgres connection url like "postgresql://localhost:5432"
-        #[arg(long)]
-        to_postgres: Option<Url>,
-        /// Delete all operations from the postgres db before starting
-        ///
-        /// only used if `--to-postgres` is present
-        #[arg(long, action)]
-        postgres_reset: bool,
-        /// Stop at the week ending before this date
-        #[arg(long)]
-        until: Option<Dt>,
-        /// After the weekly imports, poll upstream until we're caught up
-        #[arg(long, action)]
-        catch_up: bool,
+        #[command(flatten)]
+        args: backfill::Args,
     },
     /// Scrape a PLC server, collecting ops into weekly bundles
     ///
···
     },
     /// Wrap a did-method-plc server, syncing upstream and blocking op submits
     Mirror {
-        /// the wrapped did-method-plc server
-        #[arg(long, env = "ALLEGEDLY_WRAP")]
-        wrap: Url,
-        /// the wrapped did-method-plc server's database (write access required)
-        #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
-        wrap_pg: Url,
-        /// wrapping server listen address
-        #[arg(short, long, env = "ALLEGEDLY_BIND")]
-        #[clap(default_value = "127.0.0.1:8000")]
-        bind: SocketAddr,
-        /// obtain a certificate from letsencrypt
-        ///
-        /// for now this will force listening on all interfaces at :80 and :443
-        /// (:80 will serve an "https required" error, *will not* redirect)
-        #[arg(
-            long,
-            conflicts_with("bind"),
-            requires("acme_cache_path"),
-            env = "ALLEGEDLY_ACME_DOMAIN"
-        )]
-        acme_domain: Vec<String>,
-        /// which local directory to keep the letsencrypt certs in
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
-        acme_cache_path: Option<PathBuf>,
-        /// which public acme directory to use
-        ///
-        /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
-        #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
-        acme_directory_url: Url,
+        #[command(flatten)]
+        args: mirror::Args,
     },
     /// Poll an upstream PLC server and log new ops to stdout
     Tail {
···
     },
 }
 
-async fn pages_to_stdout(
-    mut rx: mpsc::Receiver<ExportPage>,
-    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> anyhow::Result<()> {
-    let mut last_at = None;
-    while let Some(page) = rx.recv().await {
-        for op in &page.ops {
-            println!("{op}");
-        }
-        if notify_last_at.is_some()
-            && let Some(s) = PageBoundaryState::new(&page)
-        {
-            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
-        }
-    }
-    if let Some(notify) = notify_last_at {
-        log::trace!("notifying last_at: {last_at:?}");
-        if notify.send(last_at).is_err() {
-            log::error!("receiver for last_at dropped, can't notify");
-        };
-    }
-    Ok(())
-}
-
-/// page forwarder who drops its channels on receipt of a small page
-///
-/// PLC will return up to 1000 ops on a page, and returns full pages until it
-/// has caught up, so this is a (hacky?) way to stop polling once we're up.
-fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
-    let (tx, fwd) = mpsc::channel(1);
-    tokio::task::spawn(async move {
-        while let Some(page) = rx.recv().await
-            && page.ops.len() > 900
-        {
-            tx.send(page).await.unwrap();
-        }
-    });
-    fwd
-}
-
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     let args = Cli::parse();
     let matches = Cli::command().get_matches();
     let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
     bin_init(name);
 
+    let globals = args.globals.clone();
+
     let t0 = Instant::now();
     match args.command {
-        Commands::Backfill {
-            http,
-            dir,
-            source_workers,
-            to_postgres,
-            postgres_reset,
-            until,
-            catch_up,
-        } => {
-            let (tx, rx) = mpsc::channel(32); // these are big pages
-            tokio::task::spawn(async move {
-                if let Some(dir) = dir {
-                    log::info!("Reading weekly bundles from local folder {dir:?}");
-                    backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
-                        .await
-                        .unwrap();
-                } else {
-                    log::info!("Fetching weekly bundles from from {http}");
-                    backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
-                        .await
-                        .unwrap();
-                }
-            });
-
-            // postgres writer will notify us as soon as the very last op's time is known
-            // so we can start catching up while pg is restoring indexes and stuff
-            let (notify_last_at, rx_last) = if catch_up {
-                let (tx, rx) = oneshot::channel();
-                (Some(tx), Some(rx))
-            } else {
-                (None, None)
-            };
-
-            let to_postgres_url_bulk = to_postgres.clone();
-            let bulk_out_write = tokio::task::spawn(async move {
-                if let Some(ref url) = to_postgres_url_bulk {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    backfill_to_pg(db, postgres_reset, rx, notify_last_at)
-                        .await
-                        .unwrap();
-                } else {
-                    pages_to_stdout(rx, notify_last_at).await.unwrap();
-                }
-            });
-
-            if let Some(rx_last) = rx_last {
-                let mut upstream = args.upstream;
-                upstream.set_path("/export");
-                // wait until the time for `after` is known
-                let last_at = rx_last.await.unwrap();
-                log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
-                let (tx, rx) = mpsc::channel(256); // these are small pages
-                tokio::task::spawn(
-                    async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
-                );
-                bulk_out_write.await.unwrap();
-                log::info!("writing catch-up pages");
-                let full_pages = full_pages(rx);
-                if let Some(url) = to_postgres {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    pages_to_pg(db, full_pages).await.unwrap();
-                } else {
-                    pages_to_stdout(full_pages, None).await.unwrap();
-                }
-            }
-        }
+        Commands::Backfill { args } => backfill::run(globals, args).await?,
         Commands::Bundle {
             dest,
             after,
             clobber,
         } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
             let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
-            tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
+            tokio::task::spawn(async move {
+                poll_upstream(Some(after), url, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
             log::trace!("ensuring output directory exists");
-            std::fs::create_dir_all(&dest).unwrap();
-            pages_to_weeks(rx, dest, clobber).await.unwrap();
-        }
-        Commands::Mirror {
-            wrap,
-            wrap_pg,
-            bind,
-            acme_domain,
-            acme_cache_path,
-            acme_directory_url,
-        } => {
-            let db = Db::new(wrap_pg.as_str()).await.unwrap();
-            let latest = db
-                .get_latest()
+            create_dir_all(&dest)
                 .await
-                .unwrap()
-                .expect("there to be at least one op in the db. did you backfill?");
-
-            let (tx, rx) = mpsc::channel(2);
-            // upstream poller
-            let mut url = args.upstream.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting poll reader...");
-                url.set_path("/export");
-                tokio::task::spawn(
-                    async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
-                );
-            });
-            // db writer
-            let poll_db = db.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting db writer...");
-                pages_to_pg(poll_db, rx).await.unwrap();
-            });
-
-            let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
-                (_, false, Some(cache_path)) => ListenConf::Acme {
-                    domains: acme_domain,
-                    cache_path,
-                    directory_url: acme_directory_url.to_string(),
-                },
-                (bind, true, None) => ListenConf::Bind(bind),
-                (_, _, _) => unreachable!(),
-            };
-
-            serve(&args.upstream, wrap, listen_conf).await.unwrap();
+                .expect("to ensure output dir exists");
+            pages_to_weeks(rx, dest, clobber)
+                .await
+                .expect("to write bundles to output files");
         }
+        Commands::Mirror { args } => mirror::run(globals, args).await?,
         Commands::Tail { after } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
             let start_at = after.or_else(|| Some(chrono::Utc::now()));
             let (tx, rx) = mpsc::channel(1);
-            tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
-            pages_to_stdout(rx, None).await.unwrap();
+            tokio::task::spawn(async move {
+                poll_upstream(start_at, url, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
+            pages_to_stdout(rx, None)
+                .await
+                .expect("to write pages to stdout");
         }
     }
     log::info!("whew, {:?}. goodbye!", t0.elapsed());
+    Ok(())
 }
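
The `mod backfill;` / `mod mirror;` declarations are doing double duty: cargo already compiles each file in src/bin/ as its own binary, so src/bin/backfill.rs and src/bin/mirror.rs are built once as standalone `backfill` and `mirror` executables and again as modules of allegedly.rs. That appears to be why each carries `#[allow(dead_code)]` on its own `main`: compiled as a module, that entrypoint is never reached.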
src/bin/backfill.rs  +199 (new file)
···
use allegedly::{
    Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs,
    bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
};
use clap::Parser;
use reqwest::Url;
use std::path::PathBuf;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};

pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";

#[derive(Debug, clap::Args)]
pub struct Args {
    /// Remote URL prefix to fetch bundles from
    #[arg(long)]
    #[clap(default_value = DEFAULT_HTTP)]
    http: Url,
    /// Local folder to fetch bundles from (overrides `http`)
    #[arg(long)]
    dir: Option<PathBuf>,
    /// Don't do weekly bulk-loading at all.
    ///
    /// overrides `http` and `dir`, makes catch_up redundant
    #[arg(long, action)]
    no_bulk: bool,
    /// Parallel bundle fetchers
    ///
    /// Default: 4 for http fetches, 1 for local folder
    #[arg(long)]
    source_workers: Option<usize>,
    /// Bulk load into did-method-plc-compatible postgres instead of stdout
    ///
    /// Pass a postgres connection url like "postgresql://localhost:5432"
    #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
    to_postgres: Option<Url>,
    /// Cert for postgres (if needed)
    #[arg(long)]
    postgres_cert: Option<PathBuf>,
    /// Delete all operations from the postgres db before starting
    ///
    /// only used if `--to-postgres` is present
    #[arg(long, action)]
    postgres_reset: bool,
    /// Stop at the week ending before this date
    #[arg(long)]
    until: Option<Dt>,
    /// After the weekly imports, poll upstream until we're caught up
    #[arg(long, action)]
    catch_up: bool,
}

pub async fn run(
    GlobalArgs { upstream }: GlobalArgs,
    Args {
        http,
        dir,
        no_bulk,
        source_workers,
        to_postgres,
        postgres_cert,
        postgres_reset,
        until,
        catch_up,
    }: Args,
) -> anyhow::Result<()> {
    let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();

    let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages

    // a bulk sink can notify us as soon as the very last op's time is known
    // so we can start catching up while the sink might restore indexes and such
    let (found_last_tx, found_last_out) = if catch_up {
        let (tx, rx) = oneshot::channel();
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
    let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter

    // set up sources
    if no_bulk {
        // simple mode, just poll upstream from the beginning
        if http != DEFAULT_HTTP.parse()? {
            log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
        }
        if let Some(d) = dir {
            log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
        }
        if let Some(u) = until {
            log::warn!(
                "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
            );
        }
        let mut upstream = upstream;
        upstream.set_path("/export");
        tasks.spawn(poll_upstream(None, upstream, poll_tx));
        tasks.spawn(full_pages(poll_out, full_tx));
        tasks.spawn(pages_to_stdout(full_out, None));
    } else {
        // fun mode

        // set up bulk sources
        if let Some(dir) = dir {
            if http != DEFAULT_HTTP.parse()? {
                anyhow::bail!(
                    "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
                );
            }
            tasks.spawn(backfill(
                FolderSource(dir),
                bulk_tx,
                source_workers.unwrap_or(1),
                until,
            ));
        } else {
            tasks.spawn(backfill(
                HttpSource(http),
                bulk_tx,
                source_workers.unwrap_or(4),
                until,
            ));
        }

        // and the catch-up source...
        if let Some(last) = found_last_out {
            tasks.spawn(async move {
                let mut upstream = upstream;
                upstream.set_path("/export");
                poll_upstream(last.await?, upstream, poll_tx).await
            });
        }

        // set up sinks
        if let Some(pg_url) = to_postgres {
            log::trace!("connecting to postgres...");
            let db = Db::new(pg_url.as_str(), postgres_cert).await?;
            log::trace!("connected to postgres");

            tasks.spawn(backfill_to_pg(
                db.clone(),
                postgres_reset,
                bulk_out,
                found_last_tx,
            ));
            if catch_up {
                tasks.spawn(pages_to_pg(db, full_out));
            }
        } else {
            tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
            if catch_up {
                tasks.spawn(pages_to_stdout(full_out, None));
            }
        }
    }

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}

#[derive(Debug, Parser)]
struct CliArgs {
    #[command(flatten)]
    globals: GlobalArgs,
    #[command(flatten)]
    args: Args,
}

#[allow(dead_code)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = CliArgs::parse();
    bin_init("backfill");
    run(args.globals, args.args).await?;
    Ok(())
}
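
The subtle part of run() is the bulk-to-catch-up handoff: the bulk sink reports the newest createdAt it saw over a oneshot the moment it is known, so polling can begin while the sink is still restoring indexes. Isolated as a sketch (names from the diff; `export_url`, `reset`, and the surrounding channels are assumed context):

// the bulk sink resolves found_last_tx as soon as the final op's time is known
let (found_last_tx, found_last_out) = tokio::sync::oneshot::channel();
tasks.spawn(backfill_to_pg(db.clone(), reset, bulk_out, Some(found_last_tx)));
tasks.spawn(async move {
    // blocks only until the timestamp arrives, not until the bulk load commits
    let last_at = found_last_out.await?; // Option<Dt>
    poll_upstream(last_at, export_url, poll_tx).await
});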
src/bin/mirror.rs  +128 (new file)
···
use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve};
use clap::Parser;
use reqwest::Url;
use std::{net::SocketAddr, path::PathBuf};
use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet};

#[derive(Debug, clap::Args)]
pub struct Args {
    /// the wrapped did-method-plc server
    #[arg(long, env = "ALLEGEDLY_WRAP")]
    wrap: Url,
    /// the wrapped did-method-plc server's database (write access required)
    #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
    wrap_pg: Url,
    /// path to tls cert for the wrapped postgres db, if needed
    #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
    wrap_pg_cert: Option<PathBuf>,
    /// wrapping server listen address
    #[arg(short, long, env = "ALLEGEDLY_BIND")]
    #[clap(default_value = "127.0.0.1:8000")]
    bind: SocketAddr,
    /// obtain a certificate from letsencrypt
    ///
    /// for now this will force listening on all interfaces at :80 and :443
    /// (:80 will serve an "https required" error, *will not* redirect)
    #[arg(
        long,
        conflicts_with("bind"),
        requires("acme_cache_path"),
        env = "ALLEGEDLY_ACME_DOMAIN"
    )]
    acme_domain: Vec<String>,
    /// which local directory to keep the letsencrypt certs in
    #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
    acme_cache_path: Option<PathBuf>,
    /// which public acme directory to use
    ///
    /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
    #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
    #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
    acme_directory_url: Url,
}

pub async fn run(
    GlobalArgs { upstream }: GlobalArgs,
    Args {
        wrap,
        wrap_pg,
        wrap_pg_cert,
        bind,
        acme_domain,
        acme_cache_path,
        acme_directory_url,
    }: Args,
) -> anyhow::Result<()> {
    let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?;

    // TODO: allow starting up with polling backfill from beginning?
    log::debug!("getting the latest op from the db...");
    let latest = db
        .get_latest()
        .await?
        .expect("there to be at least one op in the db. did you backfill?");

    let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
        (_, false, Some(cache_path)) => {
            log::info!("configuring acme for https at {acme_domain:?}...");
            create_dir_all(&cache_path).await?;
            ListenConf::Acme {
                domains: acme_domain,
                cache_path,
                directory_url: acme_directory_url.to_string(),
            }
        }
        (bind, true, None) => ListenConf::Bind(bind),
        (_, _, _) => unreachable!(),
    };

    let mut tasks = JoinSet::new();

    let (send_page, recv_page) = mpsc::channel(8);

    let mut poll_url = upstream.clone();
    poll_url.set_path("/export");

    tasks.spawn(poll_upstream(Some(latest), poll_url, send_page));
    tasks.spawn(pages_to_pg(db.clone(), recv_page));
    tasks.spawn(serve(upstream, wrap, listen_conf));

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}

#[derive(Debug, Parser)]
struct CliArgs {
    #[command(flatten)]
    globals: GlobalArgs,
    #[command(flatten)]
    args: Args,
}

#[allow(dead_code)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = CliArgs::parse();
    bin_init("mirror");
    run(args.globals, args.args).await?;
    Ok(())
}
src/bin/mod.rs  +14 (new file)
···
use reqwest::Url;

#[derive(Debug, Clone, clap::Args)]
pub struct GlobalArgs {
    /// Upstream PLC server
    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
    #[clap(default_value = "https://plc.directory")]
    pub upstream: Url,
}

#[allow(dead_code)]
fn main() {
    panic!("this is not actually a module")
}
src/client.rs  +4 -1
···
 );
 
 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
-    let inner = Client::builder().user_agent(UA).build().unwrap();
+    let inner = Client::builder()
+        .user_agent(UA)
+        .build()
+        .expect("reqwest client to build");
 
     let policy = ExponentialBackoff::builder().build_with_max_retries(12);
 
src/lib.rs  +79 -10
···
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, oneshot};
 
 mod backfill;
 mod client;
···
 mod poll;
 mod ratelimit;
 mod weekly;
+
+pub mod bin;
 
 pub use backfill::backfill;
 pub use client::{CLIENT, UA};
···
 /// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
 #[derive(Debug)]
 pub struct ExportPage {
-    pub ops: Vec<String>,
+    pub ops: Vec<Op>,
 }
 
 impl ExportPage {
···
 /// A fully-deserialized plc operation
 ///
 /// including the plc's wrapping with timestmap and nullified state
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
-pub struct Op<'a> {
-    pub did: &'a str,
-    pub cid: &'a str,
+pub struct Op {
+    pub did: String,
+    pub cid: String,
     pub created_at: Dt,
     pub nullified: bool,
-    #[serde(borrow)]
-    pub operation: &'a serde_json::value::RawValue,
+    pub operation: Box<serde_json::value::RawValue>,
+}
+
+#[cfg(test)]
+impl PartialEq for Op {
+    fn eq(&self, other: &Self) -> bool {
+        self.did == other.did
+            && self.cid == other.cid
+            && self.created_at == other.created_at
+            && self.nullified == other.nullified
+            && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap()
+                == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap()
+    }
 }
 
 /// Database primary key for an op
···
     pub cid: String,
 }
 
-impl From<&Op<'_>> for OpKey {
-    fn from(Op { did, cid, .. }: &Op<'_>) -> Self {
+impl From<&Op> for OpKey {
+    fn from(Op { did, cid, .. }: &Op) -> Self {
         Self {
             did: did.to_string(),
             cid: cid.to_string(),
         }
     }
+}
+
+/// page forwarder who drops its channels on receipt of a small page
+///
+/// PLC will return up to 1000 ops on a page, and returns full pages until it
+/// has caught up, so this is a (hacky?) way to stop polling once we're up.
+pub async fn full_pages(
+    mut rx: mpsc::Receiver<ExportPage>,
+    tx: mpsc::Sender<ExportPage>,
+) -> anyhow::Result<&'static str> {
+    while let Some(page) = rx.recv().await {
+        let n = page.ops.len();
+        if n < 900 {
+            let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
+            let Some(age) = last_age else {
+                log::info!("full_pages done, empty final page");
+                return Ok("full pages (hmm)");
+            };
+            if age <= chrono::TimeDelta::hours(6) {
+                log::info!("full_pages done, final page of {n} ops");
+            } else {
+                log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
+            }
+            return Ok("full pages (cool)");
+        }
+        log::trace!("full_pages: continuing with page of {n} ops");
+        tx.send(page).await?;
+    }
+    Err(anyhow::anyhow!(
+        "full_pages ran out of source material, sender closed"
+    ))
+}
+
+pub async fn pages_to_stdout(
+    mut rx: mpsc::Receiver<ExportPage>,
+    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
+) -> anyhow::Result<&'static str> {
+    let mut last_at = None;
+    while let Some(page) = rx.recv().await {
+        for op in &page.ops {
+            println!("{}", serde_json::to_string(op)?);
+        }
+        if notify_last_at.is_some()
+            && let Some(s) = PageBoundaryState::new(&page)
+        {
+            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
+        }
+    }
+    if let Some(notify) = notify_last_at {
+        log::trace!("notifying last_at: {last_at:?}");
+        if notify.send(last_at).is_err() {
+            log::error!("receiver for last_at dropped, can't notify");
+        };
+    }
+    Ok("pages_to_stdout")
 }
 
 pub fn logo(name: &str) -> String {
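
full_pages moves into the library as a plain channel-to-channel future. The stop condition leans on PLC behavior: /export serves pages of up to 1000 ops and keeps them full until the reader has caught up, so the first page under 900 ops means we're current. Roughly how the backfill bin wires it (assumed context: `tasks` is the JoinSet, `last_at` and `export_url` come from earlier stages):

let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128);
let (full_tx, full_out) = mpsc::channel(1);
tasks.spawn(poll_upstream(last_at, export_url, poll_tx)); // fetches pages indefinitely
tasks.spawn(full_pages(poll_out, full_tx));               // forwards while pages stay full
tasks.spawn(pages_to_stdout(full_out, None));             // sink ends when full_pages drops full_tx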
src/mirror.rs  +46 -24
···
     format!(
         r#"{}
 
-This is a PLC[1] mirror running Allegedly[2] in mirror mode. Allegedly synchronizes and proxies to a downstream PLC reference server instance[3] (why?[4]).
+This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
+synchronizes a local PLC reference server instance[2] (why?[3]).
 
 
 Configured upstream:
···
 
 Available APIs:
 
-- All PLC GET requests [5].
-- Rejects POSTs. This is a mirror.
+- GET /_health Health and version info
+
+- GET /* Proxies to wrapped server; see PLC API docs:
+         https://web.plc.directory/api/redoc
+
+- POST /* Always rejected. This is a mirror.
+
+
+tip: try `GET /{{did}}` to resolve an identity
+
+
+Allegedly is a suite of open-source CLI tools for working with PLC logs:
 
-try `GET /{{did}}` to resolve an identity
+https://tangled.org/@microcosm.blue/Allegedly
 
 
 [1] https://web.plc.directory
-[2] https://tangled.org/@microcosm.blue/Allegedly
-[3] https://github.com/did-method-plc/did-method-plc
-[4] https://updates.microcosm.blue/3lz7nwvh4zc2u
-[5] https://web.plc.directory/api/redoc
-
+[2] https://github.com/did-method-plc/did-method-plc
+[3] https://updates.microcosm.blue/3lz7nwvh4zc2u
 "#,
         logo("mirror")
     )
+}
+
+#[handler]
+fn favicon() -> impl IntoResponse {
+    include_bytes!("../favicon.ico").with_content_type("image/x-icon")
 }
 
 fn failed_to_reach_wrapped() -> String {
···
     Bind(SocketAddr),
 }
 
-pub async fn serve(upstream: &Url, plc: Url, listen: ListenConf) -> std::io::Result<()> {
+pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> {
+    log::info!("starting server...");
+
     // not using crate CLIENT: don't want the retries etc
     let client = Client::builder()
         .user_agent(UA)
         .timeout(Duration::from_secs(10)) // fallback
         .build()
-        .unwrap();
+        .expect("reqwest client to build");
 
     let state = State {
         client,
···
 
     let app = Route::new()
         .at("/", get(hello))
+        .at("/favicon.ico", get(favicon))
         .at("/_health", get(health))
         .at("/:any", get(proxy).post(nope))
         .with(AddData::new(state))
         .with(Cors::new().allow_credentials(false))
         .with(Compression::new())
         .with(GovernorMiddleware::new(Quota::per_minute(
-            3000.try_into().unwrap(),
+            3000.try_into().expect("ratelimit middleware to build"),
         )))
         .with(CatchPanic::new())
         .with(Tracing);
···
             }
             let auto_cert = auto_cert.build().expect("acme config to build");
 
-            run_insecure_notice();
-            run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await
+            let notice_task = tokio::task::spawn(run_insecure_notice());
+            let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await;
+            log::warn!("server task ended, aborting insecure server task...");
+            notice_task.abort();
+            app_res?;
+            notice_task.await??;
         }
-        ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await,
+        ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?,
     }
+
+    Ok("server (uh oh?)")
 }
 
 async fn run<A, L>(app: A, listener: L) -> std::io::Result<()>
···
 }
 
 /// kick off a tiny little server on a tokio task to tell people to use 443
-fn run_insecure_notice() {
+async fn run_insecure_notice() -> Result<(), std::io::Error> {
     #[handler]
     fn oop_plz_be_secure() -> (StatusCode, String) {
         (
···
         )
     }
 
-    let app = Route::new().at("/", get(oop_plz_be_secure)).with(Tracing);
-    let listener = TcpListener::bind("0.0.0.0:80");
-    tokio::task::spawn(async move {
-        Server::new(listener)
-            .name("allegedly (mirror:80 helper)")
-            .run(app)
-            .await
-    });
+    let app = Route::new()
+        .at("/", get(oop_plz_be_secure))
+        .at("/favicon.ico", get(favicon))
+        .with(Tracing);
+    Server::new(TcpListener::bind("0.0.0.0:80"))
+        .name("allegedly (mirror:80 helper)")
+        .run(app)
+        .await
 }
src/plc_pg.rs  +69 -45
···
-use crate::{Dt, ExportPage, Op, PageBoundaryState};
+use crate::{Dt, ExportPage, PageBoundaryState};
+use native_tls::{Certificate, TlsConnector};
+use postgres_native_tls::MakeTlsConnector;
+use std::path::PathBuf;
 use std::pin::pin;
 use std::time::Instant;
-use tokio::sync::{mpsc, oneshot};
+use tokio::{
+    sync::{mpsc, oneshot},
+    task::{JoinHandle, spawn},
+};
 use tokio_postgres::{
-    Client, Error as PgError, NoTls,
+    Client, Error as PgError, NoTls, Socket,
     binary_copy::BinaryCopyInWriter,
     connect,
+    tls::MakeTlsConnect,
    types::{Json, Type},
 };
 
+fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> {
+    let cert = std::fs::read(cert)?;
+    let cert = Certificate::from_pem(&cert)?;
+    let connector = TlsConnector::builder().add_root_certificate(cert).build()?;
+    Ok(MakeTlsConnector::new(connector))
+}
+
+async fn get_client_and_task<T>(
+    uri: &str,
+    connector: T,
+) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError>
+where
+    T: MakeTlsConnect<Socket>,
+    <T as MakeTlsConnect<Socket>>::Stream: Send + 'static,
+{
+    let (client, connection) = connect(uri, connector).await?;
+    Ok((client, spawn(connection)))
+}
+
 /// a little tokio-postgres helper
 ///
 /// it's clone for easiness. it doesn't share any resources underneath after
-/// cloning at all so it's not meant for
-#[derive(Debug, Clone)]
+/// cloning *at all* so it's not meant for eg. handling public web requests
+#[derive(Clone)]
 pub struct Db {
     pg_uri: String,
+    cert: Option<MakeTlsConnector>,
 }
 
 impl Db {
-    pub async fn new(pg_uri: &str) -> Result<Self, anyhow::Error> {
+    pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> {
         // we're going to interact with did-method-plc's database, so make sure
         // it's what we expect: check for db migrations.
         log::trace!("checking migrations...");
-        let (client, connection) = connect(pg_uri, NoTls).await?;
-        let connection_task = tokio::task::spawn(async move {
-            connection
-                .await
-                .inspect_err(|e| log::error!("connection ended with error: {e}"))
-                .unwrap();
-        });
+
+        let connector = cert.map(get_tls).transpose()?;
+
+        let (client, conn_task) = if let Some(ref connector) = connector {
+            get_client_and_task(pg_uri, connector.clone()).await?
+        } else {
+            get_client_and_task(pg_uri, NoTls).await?
+        };
+
         let migrations: Vec<String> = client
             .query("SELECT name FROM kysely_migration ORDER BY name", &[])
             .await?
···
         );
         drop(client);
         // make sure the connection worker thing doesn't linger
-        connection_task.await?;
+        conn_task.await??;
         log::info!("db connection succeeded and plc migrations appear as expected");
 
         Ok(Self {
             pg_uri: pg_uri.to_string(),
+            cert: connector,
         })
     }
 
-    pub async fn connect(&self) -> Result<Client, PgError> {
+    pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> {
         log::trace!("connecting postgres...");
-        let (client, connection) = connect(&self.pg_uri, NoTls).await?;
-
-        // send the connection away to do the actual communication work
-        // apparently the connection will complete when the client drops
-        tokio::task::spawn(async move {
-            connection
-                .await
-                .inspect_err(|e| log::error!("connection ended with error: {e}"))
-                .unwrap();
-        });
-
-        Ok(client)
+        if let Some(ref connector) = self.cert {
+            get_client_and_task(&self.pg_uri, connector.clone()).await
+        } else {
+            get_client_and_task(&self.pg_uri, NoTls).await
+        }
     }
 
     pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> {
-        let client = self.connect().await?;
+        let (client, task) = self.connect().await?;
         let dt: Option<Dt> = client
             .query_opt(
                 r#"SELECT "createdAt"
···
             )
             .await?
             .map(|row| row.get(0));
+        drop(task);
         Ok(dt)
     }
 }
 
-pub async fn pages_to_pg(db: Db, mut pages: mpsc::Receiver<ExportPage>) -> Result<(), PgError> {
-    let mut client = db.connect().await?;
+pub async fn pages_to_pg(
+    db: Db,
+    mut pages: mpsc::Receiver<ExportPage>,
+) -> anyhow::Result<&'static str> {
+    log::info!("starting pages_to_pg writer...");
+
+    let (mut client, task) = db.connect().await?;
 
     let ops_stmt = client
         .prepare(
···
     while let Some(page) = pages.recv().await {
         log::trace!("writing page with {} ops", page.ops.len());
         let tx = client.transaction().await?;
-        for s in page.ops {
-            let Ok(op) = serde_json::from_str::<Op>(&s) else {
-                log::warn!("ignoring unparseable op {s:?}");
-                continue;
-            };
+        for op in page.ops {
             ops_inserted += tx
                 .execute(
                     &ops_stmt,
···
         }
         tx.commit().await?;
     }
+    drop(task);
 
     log::info!(
         "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
         t0.elapsed()
     );
-    Ok(())
+    Ok("pages_to_pg")
 }
 
 /// Dump rows into an empty operations table quickly
···
     reset: bool,
     mut pages: mpsc::Receiver<ExportPage>,
     notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> Result<(), PgError> {
-    let mut client = db.connect().await?;
+) -> anyhow::Result<&'static str> {
+    let (mut client, task) = db.connect().await?;
 
     let t0 = Instant::now();
     let tx = client.transaction().await?;
···
     let mut writer = pin!(BinaryCopyInWriter::new(sync, types));
     let mut last_at = None;
     while let Some(page) = pages.recv().await {
-        for s in &page.ops {
-            let Ok(op) = serde_json::from_str::<Op>(s) else {
-                log::warn!("ignoring unparseable op: {s:?}");
-                continue;
-            };
+        for op in &page.ops {
             writer
                 .as_mut()
                 .write(&[
                     &op.did,
-                    &Json(op.operation),
+                    &Json(op.operation.clone()),
                     &op.cid,
                     &op.nullified,
                     &op.created_at,
···
             last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
         }
     }
+    log::debug!("finished receiving bulk pages");
 
     if let Some(notify) = notify_last_at {
         log::trace!("notifying last_at: {last_at:?}");
···
     log::trace!("set tables LOGGED: {:?}", t_step.elapsed());
 
     tx.commit().await?;
+    drop(task);
     log::info!("total backfill time: {:?}", t0.elapsed());
 
-    Ok(())
+    Ok("backfill_to_pg")
 }
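
The new connection surface in use, roughly (URL and cert path are placeholders): Db::new takes the optional root-cert path once, and connect() now hands back the connection's JoinHandle instead of detaching a task that unwraps errors, so callers decide when the connection worker may end.

let db = Db::new("postgresql://localhost:5432", Some(PathBuf::from("./pg-ca.pem"))).await?;
let (client, conn_task) = db.connect().await?;
// ... use client ...
drop(client);      // the connection future completes once its client is gone
conn_task.await??; // surface any connection error here instead of a panic in a task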
+66
-254
src/poll.rs
···
26
26
pk: (String, String), // did, cid
27
27
}
28
28
29
29
-
impl From<Op<'_>> for LastOp {
29
29
+
impl From<Op> for LastOp {
30
30
fn from(op: Op) -> Self {
31
31
Self {
32
32
created_at: op.created_at,
33
33
-
pk: (op.did.to_string(), op.cid.to_string()),
33
33
+
pk: (op.did, op.cid),
34
34
}
35
35
}
36
36
}
37
37
38
38
+
impl From<&Op> for LastOp {
39
39
+
fn from(op: &Op) -> Self {
40
40
+
Self {
41
41
+
created_at: op.created_at,
42
42
+
pk: (op.did.clone(), op.cid.clone()),
43
43
+
}
44
44
+
}
45
45
+
}
46
46
+
47
47
+
// bit of a hack
38
48
impl From<Dt> for LastOp {
39
49
fn from(dt: Dt) -> Self {
40
50
Self {
···
51
61
keys_at: Vec<OpKey>, // expected to ~always be length one
52
62
}
53
63
54
54
-
// ok so this is silly.
55
55
-
//
56
56
-
// i think i had some idea that deferring parsing to later steps would make it
57
57
-
// easier to do things like sometimes not parsing at all (where the output is
58
58
-
// also json lines), and maybe avoid some memory shuffling.
59
59
-
// but since the input already has to be split into lines, keeping them as line
60
60
-
// strings is probably the worst option: space-inefficient, allows garbage, and
61
61
-
// leads to, well, this impl.
62
62
-
//
63
63
-
// it almost could have been slick if the *original* was just reused, and the
64
64
-
// parsed ops were just kind of on the side referencing into it, but i'm lazy
65
65
-
// and didn't get it there.
66
66
-
//
67
67
-
// should unrefactor to make Op own its data again, parse (and deal with errors)
68
68
-
// upfront, and probably greatly simplify everything downstream. simple.
64
64
+
/// track keys at final createdAt to deduplicate the start of the next page
69
65
impl PageBoundaryState {
70
66
pub fn new(page: &ExportPage) -> Option<Self> {
71
71
-
let mut skips = 0;
72
72
-
73
67
// grab the very last op
74
74
-
let (last_at, last_key) = loop {
75
75
-
let Some(s) = page.ops.iter().rev().nth(skips).cloned() else {
76
76
-
// there are no ops left? oop. bail.
77
77
-
// last_at and existing keys remain in tact if there was no later op
78
78
-
return None;
79
79
-
};
80
80
-
if s.is_empty() {
81
81
-
// annoying: ignore any trailing blank lines
82
82
-
skips += 1;
83
83
-
continue;
84
84
-
}
85
85
-
let Ok(op) = serde_json::from_str::<Op>(&s)
86
86
-
.inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with."))
87
87
-
else {
88
88
-
// doubly annoying: skip over trailing garbage??
89
89
-
skips += 1;
90
90
-
continue;
91
91
-
};
92
92
-
break (op.created_at, Into::<OpKey>::into(&op));
93
93
-
};
68
68
+
let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
94
69
95
70
// set initial state
96
71
let mut me = Self {
···
99
74
};
100
75
101
76
// and make sure all keys at this time are captured from the back
102
102
-
me.capture_nth_last_at(page, last_at, skips);
77
77
+
me.capture_nth_last_at(page, last_at, 1);
103
78
104
79
Some(me)
105
80
}
···
108
83
let to_remove: Vec<usize> = page
109
84
.ops
110
85
.iter()
111
111
-
.map(|s| serde_json::from_str::<Op>(s).inspect_err(|e|
112
112
-
log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with.")))
113
86
.enumerate()
114
114
-
.take_while(|(_, opr)| opr.as_ref().map(|op| op.created_at == self.last_at).unwrap_or(false))
115
115
-
.filter_map(|(i, opr)| {
116
116
-
if self.keys_at.contains(&(&opr.expect("any Errs were filtered by take_while")).into()) {
117
117
-
Some(i)
118
118
-
} else { None }
119
119
-
})
87
87
+
.take_while(|(_, op)| op.created_at == self.last_at)
88
88
+
.filter(|(_, op)| self.keys_at.contains(&(*op).into()))
89
89
+
.map(|(i, _)| i)
120
90
.collect();
121
91
122
122
-
// actually remove them. last to first to indices don't shift
92
92
+
// actually remove them. last to first so indices don't shift
123
93
for dup_idx in to_remove.into_iter().rev() {
124
94
page.ops.remove(dup_idx);
125
95
}
126
96
127
97
// grab the very last op
128
128
-
let mut skips = 0;
129
129
-
let (last_at, last_key) = loop {
130
130
-
let Some(s) = page.ops.iter().rev().nth(skips).cloned() else {
131
131
-
// there are no ops left? oop. bail.
132
132
-
// last_at and existing keys remain in tact if there was no later op
133
133
-
return;
134
134
-
};
135
135
-
if s.is_empty() {
136
136
-
// annoying: trim off any trailing blank lines
137
137
-
skips += 1;
138
138
-
continue;
139
139
-
}
140
140
-
let Ok(op) = serde_json::from_str::<Op>(&s)
141
141
-
.inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with."))
142
142
-
else {
143
143
-
// doubly annoying: skip over trailing garbage??
144
144
-
skips += 1;
145
145
-
continue;
146
146
-
};
147
147
-
break (op.created_at, Into::<OpKey>::into(&op));
98
98
+
let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
99
99
+
// there are no ops left? oop. bail.
100
100
+
// last_at and existing keys remain in tact
101
101
+
return;
148
102
};
149
103
150
104
// reset state (as long as time actually moved forward on this page)
···
157
111
self.keys_at.push(last_key);
158
112
}
159
113
// and make sure all keys at this time are captured from the back
160
160
-
self.capture_nth_last_at(page, last_at, skips);
114
114
+
self.capture_nth_last_at(page, last_at, 1);
161
115
}
162
116
163
117
/// walk backwards from 2nd last and collect keys until created_at changes
···
166
120
.iter()
167
121
.rev()
168
122
.skip(skips)
169
169
-
.skip(1) // we alredy added the very last one
170
170
-
.map(|s| serde_json::from_str::<Op>(s).inspect_err(|e|
171
171
-
log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with.")))
172
172
-
.take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false))
173
173
-
.for_each(|opr| {
174
174
-
let op = &opr.expect("any Errs were filtered by take_while");
123
123
+
.take_while(|op| op.created_at == last_at)
124
124
+
.for_each(|op| {
175
125
self.keys_at.push(op.into());
176
126
});
177
127
}
···
180
130
pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
181
131
log::trace!("Getting page: {url}");
182
132
183
183
-
let ops: Vec<String> = CLIENT
133
133
+
let ops: Vec<Op> = CLIENT
184
134
.get(url)
185
135
.send()
186
136
.await?
···
190
140
.trim()
191
141
.split('\n')
192
142
.filter_map(|s| {
193
193
-
let s = s.trim();
194
194
-
if s.is_empty() { None } else { Some(s) }
143
143
+
serde_json::from_str::<Op>(s)
144
144
+
.inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
145
145
+
.ok()
195
146
})
196
196
-
.map(Into::into)
197
147
.collect();
198
148
199
199
-
let last_op = ops
200
200
-
.last()
201
201
-
.filter(|s| !s.is_empty())
202
202
-
.map(|s| serde_json::from_str::<Op>(s))
203
203
-
.transpose()?
204
204
-
.map(Into::into)
205
205
-
.inspect(|at| log::trace!("new last op: {at:?}"));
149
149
+
let last_op = ops.last().map(Into::into);
206
150
207
151
Ok((ExportPage { ops }, last_op))
208
152
}
···
211
155
after: Option<Dt>,
212
156
base: Url,
213
157
dest: mpsc::Sender<ExportPage>,
214
214
-
) -> anyhow::Result<()> {
158
158
+
) -> anyhow::Result<&'static str> {
159
159
+
log::info!("starting upstream poller after {after:?}");
215
160
let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
216
161
let mut prev_last: Option<LastOp> = after.map(Into::into);
217
162
let mut boundary_state: Option<PageBoundaryState> = None;
···
252
197
const FIVES_TS: i64 = 1431648000;
253
198
const NEXT_TS: i64 = 1431648001;
254
199
255
255
-
fn valid_op() -> serde_json::Value {
256
256
-
serde_json::json!({
200
200
+
fn valid_op() -> Op {
201
201
+
serde_json::from_value(serde_json::json!({
257
202
"did": "did",
258
203
"cid": "cid",
259
204
"createdAt": "2015-05-15T00:00:00Z",
260
205
"nullified": false,
261
206
"operation": {},
262
262
-
})
207
207
+
}))
208
208
+
.unwrap()
263
209
}
264
210
265
265
-
fn next_op() -> serde_json::Value {
266
266
-
serde_json::json!({
211
211
+
fn next_op() -> Op {
212
212
+
serde_json::from_value(serde_json::json!({
267
213
"did": "didnext",
268
214
"cid": "cidnext",
269
215
"createdAt": "2015-05-15T00:00:01Z",
270
216
"nullified": false,
271
217
"operation": {},
272
272
-
})
218
218
+
}))
219
219
+
.unwrap()
273
220
}
274
221
275
222
fn base_state() -> PageBoundaryState {
276
223
let page = ExportPage {
277
277
-
ops: vec![valid_op().to_string()],
224
224
+
ops: vec![valid_op()],
278
225
};
279
279
-
PageBoundaryState::new(&page).unwrap()
226
226
+
PageBoundaryState::new(&page).expect("to have a base page boundary state")
280
227
}
281
228
282
229
#[test]
···
287
234
}
288
235
289
236
#[test]
290
290
-
fn test_boundary_new_empty_op() {
291
291
-
let page = ExportPage {
292
292
-
ops: vec!["".to_string()],
293
293
-
};
294
294
-
let state = PageBoundaryState::new(&page);
295
295
-
assert!(state.is_none());
296
296
-
}
297
297
-
298
298
-
#[test]
299
299
-
fn test_boundary_new_ignores_bad_op() {
300
300
-
let page = ExportPage {
301
301
-
ops: vec!["bad".to_string()],
302
302
-
};
303
303
-
let state = PageBoundaryState::new(&page);
304
304
-
assert!(state.is_none());
305
305
-
}
306
306
-
307
307
-
#[test]
308
308
-
fn test_boundary_new_multiple_bad_end() {
309
309
-
let page = ExportPage {
310
310
-
ops: vec![
311
311
-
"bad".to_string(),
312
312
-
"".to_string(),
313
313
-
"foo".to_string(),
314
314
-
"".to_string(),
315
315
-
],
316
316
-
};
317
317
-
let state = PageBoundaryState::new(&page);
318
318
-
assert!(state.is_none());
319
319
-
}
320
320
-
321
321
-
#[test]
322
237
fn test_boundary_new_one_op() {
323
238
let page = ExportPage {
324
324
-
ops: vec![valid_op().to_string()],
239
239
+
ops: vec![valid_op()],
325
240
};
326
241
let state = PageBoundaryState::new(&page).unwrap();
327
242
assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
···
335
250
}
336
251
337
252
#[test]
338
338
-
fn test_boundary_new_one_op_with_stuff() {
339
339
-
let expect_same_state = |m, ops| {
340
340
-
let this_state = PageBoundaryState::new(&ExportPage { ops }).unwrap();
341
341
-
-            assert_eq!(this_state, base_state(), "{}", m);
-        };
-
-        expect_same_state("empty before", vec!["".to_string(), valid_op().to_string()]);
-
-        expect_same_state("empty after", vec![valid_op().to_string(), "".to_string()]);
-
-        expect_same_state(
-            "bad before, empty after",
-            vec!["bad".to_string(), valid_op().to_string(), "".to_string()],
-        );
-
-        expect_same_state(
-            "bad and empty before and after",
-            vec![
-                "".to_string(),
-                "bad".to_string(),
-                valid_op().to_string(),
-                "".to_string(),
-                "bad".to_string(),
-            ],
-        );
-    }
-
-    #[test]
     fn test_add_new_empty() {
         let mut state = base_state();
         state.apply_to_next(&mut ExportPage { ops: vec![] });
···
     }

     #[test]
-    fn test_add_new_empty_op() {
-        let mut state = base_state();
-        state.apply_to_next(&mut ExportPage {
-            ops: vec!["".to_string()],
-        });
-        assert_eq!(state, base_state());
-    }
-
-    #[test]
-    fn test_add_new_ignores_bad_op() {
-        let mut state = base_state();
-        state.apply_to_next(&mut ExportPage {
-            ops: vec!["bad".to_string()],
-        });
-        assert_eq!(state, base_state());
-    }
-
-    #[test]
-    fn test_add_new_multiple_bad() {
-        let mut page = ExportPage {
-            ops: vec![
-                "bad".to_string(),
-                "".to_string(),
-                "foo".to_string(),
-                "".to_string(),
-            ],
-        };
-
-        let mut state = base_state();
-        state.apply_to_next(&mut page);
-        assert_eq!(state, base_state());
-    }
-
-    #[test]
     fn test_add_new_same_op() {
         let mut page = ExportPage {
-            ops: vec![valid_op().to_string()],
+            ops: vec![valid_op()],
         };
         let mut state = base_state();
         state.apply_to_next(&mut page);
···
     fn test_add_new_same_time() {
         // make an op with a different OpKey
         let mut op = valid_op();
-        op.as_object_mut()
-            .unwrap()
-            .insert("cid".to_string(), "cid2".into());
-        let mut page = ExportPage {
-            ops: vec![op.to_string()],
-        };
+        op.cid = "cid2".to_string();
+        let mut page = ExportPage { ops: vec![op] };

         let mut state = base_state();
         state.apply_to_next(&mut page);
···
     fn test_add_new_same_time_dup_before() {
         // make an op with a different OpKey
         let mut op = valid_op();
-        op.as_object_mut()
-            .unwrap()
-            .insert("cid".to_string(), "cid2".into());
+        op.cid = "cid2".to_string();
         let mut page = ExportPage {
-            ops: vec![valid_op().to_string(), op.to_string()],
+            ops: vec![valid_op(), op],
         };

         let mut state = base_state();
···
     fn test_add_new_same_time_dup_after() {
         // make an op with a different OpKey
         let mut op = valid_op();
-        op.as_object_mut()
-            .unwrap()
-            .insert("cid".to_string(), "cid2".into());
-        let mut page = ExportPage {
-            ops: vec![op.to_string(), valid_op().to_string()],
-        };
-
-        let mut state = base_state();
-        state.apply_to_next(&mut page);
-        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
-        assert_eq!(
-            state.keys_at,
-            vec![
-                OpKey {
-                    cid: "cid".to_string(),
-                    did: "did".to_string(),
-                },
-                OpKey {
-                    cid: "cid2".to_string(),
-                    did: "did".to_string(),
-                },
-            ]
-        );
-    }
-
-    #[test]
-    fn test_add_new_same_time_blank_after() {
-        // make an op with a different OpKey
-        let mut op = valid_op();
-        op.as_object_mut()
-            .unwrap()
-            .insert("cid".to_string(), "cid2".into());
+        op.cid = "cid2".to_string();
         let mut page = ExportPage {
-            ops: vec![op.to_string(), "".to_string()],
+            ops: vec![op, valid_op()],
         };

         let mut state = base_state();
···
     #[test]
     fn test_add_new_next_time() {
         let mut page = ExportPage {
-            ops: vec![next_op().to_string()],
+            ops: vec![next_op()],
         };
         let mut state = base_state();
         state.apply_to_next(&mut page);
···
     #[test]
     fn test_add_new_next_time_with_dup() {
         let mut page = ExportPage {
-            ops: vec![valid_op().to_string(), next_op().to_string()],
+            ops: vec![valid_op(), next_op()],
         };
         let mut state = base_state();
         state.apply_to_next(&mut page);
···
             },]
         );
         assert_eq!(page.ops.len(), 1);
-        assert_eq!(page.ops[0], next_op().to_string());
+        assert_eq!(page.ops[0], next_op());
     }

     #[test]
     fn test_add_new_next_time_with_dup_and_new_prev_same_time() {
         // make an op with a different OpKey
         let mut op = valid_op();
-        op.as_object_mut()
-            .unwrap()
-            .insert("cid".to_string(), "cid2".into());
+        op.cid = "cid2".to_string();

         let mut page = ExportPage {
             ops: vec![
-                valid_op().to_string(), // should get dropped
-                op.to_string(), // should be kept
-                next_op().to_string(),
+                valid_op(), // should get dropped
+                op.clone(), // should be kept
+                next_op(),
             ],
         };
         let mut state = base_state();
···
             },]
         );
         assert_eq!(page.ops.len(), 2);
-        assert_eq!(page.ops[0], op.to_string());
-        assert_eq!(page.ops[1], next_op().to_string());
+        assert_eq!(page.ops[0], op);
+        assert_eq!(page.ops[1], next_op());
     }

     #[test]
     fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() {
         // make an op with a different OpKey
         let mut op = valid_op();
-        op.as_object_mut()
-            .unwrap()
-            .insert("cid".to_string(), "cid2".into());
+        op.cid = "cid2".to_string();

         let mut page = ExportPage {
             ops: vec![
-                op.to_string(), // should be kept
-                valid_op().to_string(), // should get dropped
-                next_op().to_string(),
+                op.clone(), // should be kept
+                valid_op(), // should get dropped
+                next_op(),
             ],
         };
         let mut state = base_state();
···
             },]
         );
         assert_eq!(page.ops.len(), 2);
-        assert_eq!(page.ops[0], op.to_string());
-        assert_eq!(page.ops[1], next_op().to_string());
+        assert_eq!(page.ops[0], op);
+        assert_eq!(page.ops[1], next_op());
     }
 }
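The rewritten tests only compile if `ExportPage` carries typed ops instead of raw JSON strings, which is why the "bad op" and "empty op" tests above disappear: unparseable input can no longer reach `apply_to_next`. A hedged sketch of the shapes these tests imply; the field names and derives here are inferred from the assertions (`op.cid`, `op.created_at`, the `Clone`/`PartialEq` usage), not taken from the diff:

```rust
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

// Inferred from test usage only; the real definitions live elsewhere in the
// crate and almost certainly carry more fields. `Dt` in the tests is
// presumably an alias along these lines.
pub type Dt = DateTime<Utc>;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // assumption: PLC ops use camelCase JSON
pub struct Op {
    pub did: String,
    pub cid: String,
    pub created_at: Dt,
}

// Dedup key for ops that share the boundary timestamp.
#[derive(Clone, Debug, PartialEq)]
pub struct OpKey {
    pub cid: String,
    pub did: String,
}

pub struct ExportPage {
    pub ops: Vec<Op>, // previously Vec<String>
}
```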
+15 -7 src/ratelimit.rs
···
     let period = quota.replenish_interval() / factor;
     let burst = quota
         .burst_size()
-        .checked_mul(factor.try_into().unwrap())
-        .unwrap();
+        .checked_mul(factor.try_into().expect("factor to be non-zero"))
+        .expect("burst to be able to multiply");
     Quota::with_period(period).map(|q| q.allow_burst(burst))
 }
 
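`scale_quota` grants a whole prefix a bigger budget by shrinking the replenish interval and growing the burst by the same factor. A minimal sketch of that arithmetic against governor's public `Quota` API; the 10 req/s base rate and the factor of 8 are illustrative values, not from the diff:

```rust
use governor::Quota;
use std::num::NonZeroU32;

fn main() {
    // Illustrative base quota: 10 requests/second for a single IP.
    let per_ip = Quota::per_second(NonZeroU32::new(10).unwrap());

    // Scale by 8 for a shared /56: replenish 8x as often, allow an 8x burst.
    let factor: u32 = 8;
    let period = per_ip.replenish_interval() / factor;
    let burst = per_ip
        .burst_size()
        .checked_mul(NonZeroU32::new(factor).unwrap())
        .unwrap();

    // Quota::with_period returns None for a zero period, hence the Option.
    let per_56 = Quota::with_period(period).map(|q| q.allow_burst(burst));
    assert!(per_56.is_some());
}
```

The `expect`s the diff swaps in for bare `unwrap`s mean a failure here at least names which invariant broke.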
···
     pub fn new(quota: Quota) -> Self {
         Self {
             per_ip: RateLimiter::keyed(quota),
-            ip6_56: RateLimiter::keyed(scale_quota(quota, 8).unwrap()),
-            ip6_48: RateLimiter::keyed(scale_quota(quota, 256).unwrap()),
+            ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")),
+            ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")),
         }
     }
     pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> {
···
             .map_err(asdf);
         let check_56 = self
             .ip6_56
-            .check_key(a.octets()[..7].try_into().unwrap())
+            .check_key(
+                a.octets()[..7]
+                    .try_into()
+                    .expect("to check ip6 /56 limiter"),
+            )
             .map_err(asdf);
         let check_48 = self
             .ip6_48
-            .check_key(a.octets()[..6].try_into().unwrap())
+            .check_key(
+                a.octets()[..6]
+                    .try_into()
+                    .expect("to check ip6 /48 limiter"),
+            )
             .map_err(asdf);
         check_ip.and(check_56).and(check_48)
     }
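The `[u8; 7]` and `[u8; 6]` limiter keys are simply the first 56 and 48 bits of the address, so every host inside a prefix lands in the same bucket. A quick sketch of that octet slicing, using a hypothetical documentation-range address:

```rust
use std::net::Ipv6Addr;

fn main() {
    // Hypothetical address; 2001:db8::/32 is reserved for documentation.
    let a: Ipv6Addr = "2001:db8:aaaa:bb00::1".parse().unwrap();

    // First 7 octets = /56 key, first 6 octets = /48 key.
    let key_56: [u8; 7] = a.octets()[..7].try_into().unwrap();
    let key_48: [u8; 6] = a.octets()[..6].try_into().unwrap();

    // The /48 key is a strict prefix of the /56 key.
    assert_eq!(&key_56[..6], &key_48[..]);
}
```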
···
         let remote = req
             .remote_addr()
             .as_socket_addr()
-            .unwrap_or_else(|| panic!("failed to get request's remote addr")) // TODO
+            .expect("failed to get request's remote addr") // TODO
             .ip();

         log::trace!("remote: {remote}");
+31 -14 src/weekly.rs
···
     async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
         let FolderSource(dir) = self;
         let path = dir.join(format!("{}.jsonl.gz", week.0));
-        Ok(File::open(path).await?)
+        log::debug!("opening folder source: {path:?}");
+        let file = File::open(path)
+            .await
+            .inspect_err(|e| log::error!("failed to open file: {e}"))?;
+        Ok(file)
     }
 }
 
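This diff leans heavily on `Result::inspect_err` (stable since Rust 1.76) for a log-and-propagate style: the error is logged with context at the failure site, and `?` still hands it to the caller. In miniature, with a hypothetical helper that is not from the repo:

```rust
// Same shape as reader_for above: log here, let the caller decide what to do.
fn read_config(path: &str) -> anyhow::Result<String> {
    let s = std::fs::read_to_string(path)
        .inspect_err(|e| log::error!("failed to read {path}: {e}"))?;
    Ok(s)
}
```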
···
     let mut week_t0 = total_t0;

     while let Some(page) = rx.recv().await {
-        for mut s in page.ops {
-            let Ok(op) = serde_json::from_str::<Op>(&s)
-                .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}"))
-            else {
-                continue;
-            };
+        for op in page.ops {
             let op_week = op.created_at.into();
             if current_week.map(|w| w != op_week).unwrap_or(true) {
                 encoder.shutdown().await?;
···
                 week_ops = 0;
                 week_t0 = now;
             }
-            s.push('\n'); // hack
-            log::trace!("writing: {s}");
-            encoder.write_all(s.as_bytes()).await?;
+            log::trace!("writing: {op:?}");
+            encoder
+                .write_all(serde_json::to_string(&op)?.as_bytes())
+                .await?;
             total_ops += 1;
             week_ops += 1;
         }
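With ops parsed upstream, the writer serializes at the last moment instead of carrying pre-framed strings (the old `push('\n')` "hack"). A sketch of that write path under assumptions: `Record` is a hypothetical stand-in for `Op`, and a newline is appended explicitly, since `serde_json::to_string` does not emit one and JSONL framing requires it:

```rust
use async_compression::tokio::write::GzipEncoder;
use serde::Serialize;
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Stand-in for Op; illustrative only.
#[derive(Serialize)]
struct Record {
    did: String,
    cid: String,
}

async fn write_jsonl_gz<W: AsyncWrite + Unpin>(
    encoder: &mut GzipEncoder<W>,
    rec: &Record,
) -> anyhow::Result<()> {
    let mut line = serde_json::to_string(rec)?;
    line.push('\n'); // JSONL framing: exactly one record per line
    encoder.write_all(line.as_bytes()).await?;
    Ok(())
}
```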
···
     dest: mpsc::Sender<ExportPage>,
 ) -> anyhow::Result<()> {
     use futures::TryStreamExt;
-    let decoder = GzipDecoder::new(BufReader::new(source.reader_for(week).await?));
+    let reader = source
+        .reader_for(week)
+        .await
+        .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?;
+    let decoder = GzipDecoder::new(BufReader::new(reader));
     let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000));

-    while let Some(chunk) = chunks.try_next().await? {
-        let ops: Vec<String> = chunk.into_iter().collect();
+    while let Some(chunk) = chunks
+        .try_next()
+        .await
+        .inspect_err(|e| log::error!("failed to get next chunk: {e}"))?
+    {
+        let ops: Vec<Op> = chunk
+            .into_iter()
+            .filter_map(|s| {
+                serde_json::from_str::<Op>(&s)
+                    .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
+                    .ok()
+            })
+            .collect();
         let page = ExportPage { ops };
-        dest.send(page).await?;
+        dest.send(page)
+            .await
+            .inspect_err(|e| log::error!("failed to send page: {e}"))?;
     }
     Ok(())
 }
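`week_to_pages` now parses as it pages: gunzip, split into lines, chunk, and keep only the lines that deserialize, warning on the rest. The same pattern as a self-contained sketch, with a hypothetical path and a generic `serde_json::Value` standing in for `Op`:

```rust
use async_compression::tokio::bufread::GzipDecoder;
use futures::TryStreamExt;
use std::pin::pin;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

async fn parse_jsonl_gz(path: &str) -> anyhow::Result<Vec<serde_json::Value>> {
    // gunzip -> buffered lines -> chunks of up to 1000 lines
    let decoder = GzipDecoder::new(BufReader::new(File::open(path).await?));
    let mut chunks =
        pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(1000));

    let mut out = Vec::new();
    while let Some(chunk) = chunks.try_next().await? {
        // keep parseable lines, warn on the rest, mirroring the filter_map above
        out.extend(chunk.into_iter().filter_map(|s| {
            serde_json::from_str::<serde_json::Value>(&s)
                .inspect_err(|e| log::warn!("skipping bad line: {e}"))
                .ok()
        }));
    }
    Ok(out)
}
```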