···
 name = "allegedly"
 description = "public ledger server tools and services (for the PLC)"
 license = "MIT OR Apache-2.0"
-version = "0.2.0"
+version = "0.3.3"
 edition = "2024"
 default-run = "allegedly"
···
 governor = "0.10.1"
 http-body-util = "0.1.3"
 log = "0.4.28"
+native-tls = "0.2.14"
+opentelemetry = "0.30.0"
+opentelemetry-otlp = { version = "0.30.0" }
+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
 poem = { version = "3.1.12", features = ["acme", "compression"] }
-reqwest = { version = "0.12.23", features = ["stream", "json"] }
+postgres-native-tls = "0.5.1"
+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
 reqwest-middleware = "0.4.2"
 reqwest-retry = "0.7.0"
 rustls = "0.23.32"
···
 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
 tokio-stream = { version = "0.1.17", features = ["io-util"] }
 tokio-util = { version = "0.7.16", features = ["compat"] }
+tracing = "0.1.41"
+tracing-opentelemetry = "0.31.0"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
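The new `native-tls` / `postgres-native-tls` dependencies back the postgres cert options introduced further down (`--wrap-pg-cert`, `--postgres-cert`, and the extra argument to `Db::new`). As a rough sketch of the usual wiring these crates provide for `tokio-postgres` (not the crate's actual `Db` code, and the helper name here is made up):

```rust
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use std::path::Path;

// hypothetical helper: connect to postgres, trusting a private/self-signed CA cert
async fn connect_with_cert(url: &str, cert_path: &Path) -> anyhow::Result<tokio_postgres::Client> {
    let pem = tokio::fs::read(cert_path).await?;
    let connector = TlsConnector::builder()
        .add_root_certificate(Certificate::from_pem(&pem)?)
        .build()?;
    let tls = MakeTlsConnector::new(connector);

    let (client, connection) = tokio_postgres::connect(url, tls).await?;
    // the connection future must be polled for the client to make progress
    tokio::spawn(connection);
    Ok(client)
}
```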
+37
examples/poll.rs
+use allegedly::{ExportPage, poll_upstream};
+
+#[tokio::main]
+async fn main() {
+    // set to `None` to replay from the beginning of the PLC history
+    let after = Some(chrono::Utc::now());
+
+    // the PLC server to poll for new ops
+    let upstream = "https://plc.wtf/export".parse().unwrap();
+
+    // self-rate-limit (plc.directory's limit interval is 600ms)
+    let throttle = std::time::Duration::from_millis(300);
+
+    // pages are sent out of the poller via a tokio mpsc channel
+    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+
+    // spawn a tokio task to run the poller
+    tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
+
+    // receive pages of plc ops from the poller
+    while let Some(ExportPage { ops }) = rx.recv().await {
+        println!("received {} plc ops", ops.len());
+
+        for op in ops {
+            // in this example we're alerting when changes are found for one
+            // specific identity
+            if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
+                println!(
+                    "Update found for {}! cid={}\n -> operation: {}",
+                    op.did,
+                    op.cid,
+                    op.operation.get()
+                );
+            }
+        }
+    }
+}
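If this lands as `examples/poll.rs`, it should run with `cargo run --example poll` from the repo root; note that it polls the live `plc.wtf` server, so it needs network access and keeps printing matches until interrupted.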
favicon.ico
This is a binary file and will not be displayed.
+26 -8
readme.md
···
   sudo allegedly mirror \
     --upstream "https://plc.directory" \
     --wrap "http://127.0.0.1:3000" \
+    --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \
     --acme-domain "plc.wtf" \
-    --acme-cache-dir ./acme-cache \
-    --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
+    --acme-domain "alt.plc.wtf" \
+    --experimental-acme-domain "experimental.plc.wtf" \
+    --acme-cache-path ./acme-cache \
+    --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \
+    --acme-ipv6 \
+    --experimental-write-upstream
+  ```
+
+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
+
+  ```bash
+  sudo allegedly wrap \
+    --wrap "http://127.0.0.1:3000" \
+    --acme-ipv6 \
+    --acme-cache-path ./acme-cache \
+    --acme-domain "plc.wtf" \
+    --experimental-acme-domain "experimental.plc.wtf" \
+    --experimental-write-upstream \
+    --upstream "https://plc.wtf"
   ```
 
···
 - monitoring of the various tasks
 - health check pings
 - expose metrics/tracing
-- read-only flag for mirror wrapper
+- [x] read-only flag for mirror wrapper
 - bundle: write directly to s3-compatible object storage
 - helpers for automating periodic `bundle` runs
 
 ### new things
 
-- experimental: websocket version of /export
-- experimental: accept writes by forwarding them upstream
-- experimental: serve a tlog
-- experimental: embed a log database directly for fast and efficient mirroring
-- experimental: support multiple upstreams?
+- [ ] experimental: websocket version of /export
+- [x] experimental: accept writes by forwarding them upstream
+- [ ] experimental: serve a tlog
+- [ ] experimental: embed a log database directly for fast and efficient mirroring
+- [ ] experimental: support multiple upstreams?
 
 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
 - [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
+12 -5
src/backfill.rs
···
     dest: mpsc::Sender<ExportPage>,
     source_workers: usize,
     until: Option<Dt>,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<&'static str> {
     // queue up the week bundles that should be available
     let weeks = Arc::new(Mutex::new(
         until
···
         while let Some(week) = weeks.lock().await.pop() {
             let when = Into::<Dt>::into(week).to_rfc3339();
             log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
-            week_to_pages(source.clone(), week, dest.clone()).await?;
+            week_to_pages(source.clone(), week, dest.clone())
+                .await
+                .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
         }
         log::info!("done with the weeks ig");
         Ok(())
···
     // wait for the big backfill to finish
     while let Some(res) = workers.join_next().await {
-        res??;
+        res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
+            .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
     }
-    log::info!("finished fetching backfill in {:?}", t_step.elapsed());
-    Ok(())
+    log::info!(
+        "finished fetching backfill in {:?}. senders remaining: {}",
+        t_step.elapsed(),
+        dest.strong_count()
+    );
+    Ok("backfill")
 }
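Returning a `&'static str` task name instead of `()` lets the `JoinSet` supervisor in the new `backfill::run` (later in this diff) log which worker finished, and the `dest.strong_count()` in the final log line reports how many senders on the page channel are still alive, which helps when a downstream sink never seems to observe the channel closing.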
+65 -223
src/bin/allegedly.rs
-use allegedly::{
-    Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill,
-    backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
-};
+use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init};
+use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream};
 use clap::{CommandFactory, Parser, Subcommand};
-use reqwest::Url;
-use std::{net::SocketAddr, path::PathBuf, time::Instant};
-use tokio::sync::{mpsc, oneshot};
+use std::{path::PathBuf, time::Duration, time::Instant};
+use tokio::fs::create_dir_all;
+use tokio::sync::mpsc;
+
+mod backfill;
+mod mirror;
 
 #[derive(Debug, Parser)]
 struct Cli {
-    /// Upstream PLC server
-    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
-    #[clap(default_value = "https://plc.directory")]
-    upstream: Url,
+    #[command(flatten)]
+    globals: GlobalArgs,
+
     #[command(subcommand)]
     command: Commands,
 }
···
 enum Commands {
     /// Use weekly bundled ops to get a complete directory mirror FAST
     Backfill {
-        /// Remote URL prefix to fetch bundles from
-        #[arg(long)]
-        #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
-        http: Url,
-        /// Local folder to fetch bundles from (overrides `http`)
-        #[arg(long)]
-        dir: Option<PathBuf>,
-        /// Parallel bundle fetchers
-        ///
-        /// Default: 4 for http fetches, 1 for local folder
-        #[arg(long)]
-        source_workers: Option<usize>,
-        /// Bulk load into did-method-plc-compatible postgres instead of stdout
-        ///
-        /// Pass a postgres connection url like "postgresql://localhost:5432"
-        #[arg(long)]
-        to_postgres: Option<Url>,
-        /// Delete all operations from the postgres db before starting
-        ///
-        /// only used if `--to-postgres` is present
-        #[arg(long, action)]
-        postgres_reset: bool,
-        /// Stop at the week ending before this date
-        #[arg(long)]
-        until: Option<Dt>,
-        /// After the weekly imports, poll upstream until we're caught up
-        #[arg(long, action)]
-        catch_up: bool,
+        #[command(flatten)]
+        args: backfill::Args,
     },
     /// Scrape a PLC server, collecting ops into weekly bundles
     ///
···
     },
     /// Wrap a did-method-plc server, syncing upstream and blocking op submits
     Mirror {
-        /// the wrapped did-method-plc server
-        #[arg(long, env = "ALLEGEDLY_WRAP")]
-        wrap: Url,
-        /// the wrapped did-method-plc server's database (write access required)
-        #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
-        wrap_pg: Url,
-        /// wrapping server listen address
-        #[arg(short, long, env = "ALLEGEDLY_BIND")]
-        #[clap(default_value = "127.0.0.1:8000")]
-        bind: SocketAddr,
-        /// obtain a certificate from letsencrypt
-        ///
-        /// for now this will force listening on all interfaces at :80 and :443
-        /// (:80 will serve an "https required" error, *will not* redirect)
-        #[arg(
-            long,
-            conflicts_with("bind"),
-            requires("acme_cache_path"),
-            env = "ALLEGEDLY_ACME_DOMAIN"
-        )]
-        acme_domain: Vec<String>,
-        /// which local directory to keep the letsencrypt certs in
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
-        acme_cache_path: Option<PathBuf>,
-        /// which public acme directory to use
-        ///
-        /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
-        #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
-        acme_directory_url: Url,
+        #[command(flatten)]
+        args: mirror::Args,
+        #[command(flatten)]
+        instrumentation: InstrumentationArgs,
+    },
+    /// Wrap any did-method-plc server, without syncing upstream (read-only)
+    Wrap {
+        #[command(flatten)]
+        args: mirror::Args,
+        #[command(flatten)]
+        instrumentation: InstrumentationArgs,
     },
     /// Poll an upstream PLC server and log new ops to stdout
     Tail {
···
     },
 }
 
-async fn pages_to_stdout(
-    mut rx: mpsc::Receiver<ExportPage>,
-    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> anyhow::Result<()> {
-    let mut last_at = None;
-    while let Some(page) = rx.recv().await {
-        for op in &page.ops {
-            println!("{op}");
-        }
-        if notify_last_at.is_some()
-            && let Some(s) = PageBoundaryState::new(&page)
-        {
-            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
+impl Commands {
+    fn enable_otel(&self) -> bool {
+        match self {
+            Commands::Mirror {
+                instrumentation, ..
+            }
+            | Commands::Wrap {
+                instrumentation, ..
+            } => instrumentation.enable_opentelemetry,
+            _ => false,
         }
     }
-    if let Some(notify) = notify_last_at {
-        log::trace!("notifying last_at: {last_at:?}");
-        if notify.send(last_at).is_err() {
-            log::error!("receiver for last_at dropped, can't notify");
-        };
-    }
-    Ok(())
-}
-
-/// page forwarder who drops its channels on receipt of a small page
-///
-/// PLC will return up to 1000 ops on a page, and returns full pages until it
-/// has caught up, so this is a (hacky?) way to stop polling once we're up.
-fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
-    let (tx, fwd) = mpsc::channel(1);
-    tokio::task::spawn(async move {
-        while let Some(page) = rx.recv().await
-            && page.ops.len() > 900
-        {
-            tx.send(page).await.unwrap();
-        }
-    });
-    fwd
 }
 
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     let args = Cli::parse();
     let matches = Cli::command().get_matches();
     let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
-    bin_init(name);
+    bin_init(args.command.enable_otel());
+    log::info!("{}", logo(name));
+
+    let globals = args.globals.clone();
 
     let t0 = Instant::now();
     match args.command {
-        Commands::Backfill {
-            http,
-            dir,
-            source_workers,
-            to_postgres,
-            postgres_reset,
-            until,
-            catch_up,
-        } => {
-            let (tx, rx) = mpsc::channel(32); // these are big pages
-            tokio::task::spawn(async move {
-                if let Some(dir) = dir {
-                    log::info!("Reading weekly bundles from local folder {dir:?}");
-                    backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
-                        .await
-                        .unwrap();
-                } else {
-                    log::info!("Fetching weekly bundles from from {http}");
-                    backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
-                        .await
-                        .unwrap();
-                }
-            });
-
-            // postgres writer will notify us as soon as the very last op's time is known
-            // so we can start catching up while pg is restoring indexes and stuff
-            let (notify_last_at, rx_last) = if catch_up {
-                let (tx, rx) = oneshot::channel();
-                (Some(tx), Some(rx))
-            } else {
-                (None, None)
-            };
-
-            let to_postgres_url_bulk = to_postgres.clone();
-            let bulk_out_write = tokio::task::spawn(async move {
-                if let Some(ref url) = to_postgres_url_bulk {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    backfill_to_pg(db, postgres_reset, rx, notify_last_at)
-                        .await
-                        .unwrap();
-                } else {
-                    pages_to_stdout(rx, notify_last_at).await.unwrap();
-                }
-            });
-
-            if let Some(rx_last) = rx_last {
-                let mut upstream = args.upstream;
-                upstream.set_path("/export");
-                // wait until the time for `after` is known
-                let last_at = rx_last.await.unwrap();
-                log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
-                let (tx, rx) = mpsc::channel(256); // these are small pages
-                tokio::task::spawn(
-                    async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
-                );
-                bulk_out_write.await.unwrap();
-                log::info!("writing catch-up pages");
-                let full_pages = full_pages(rx);
-                if let Some(url) = to_postgres {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    pages_to_pg(db, full_pages).await.unwrap();
-                } else {
-                    pages_to_stdout(full_pages, None).await.unwrap();
-                }
-            }
-        }
+        Commands::Backfill { args } => backfill::run(globals, args).await?,
         Commands::Bundle {
             dest,
             after,
             clobber,
         } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
+            let throttle = Duration::from_millis(globals.upstream_throttle_ms);
             let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
-            tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
-            log::trace!("ensuring output directory exists");
-            std::fs::create_dir_all(&dest).unwrap();
-            pages_to_weeks(rx, dest, clobber).await.unwrap();
-        }
-        Commands::Mirror {
-            wrap,
-            wrap_pg,
-            bind,
-            acme_domain,
-            acme_cache_path,
-            acme_directory_url,
-        } => {
-            let db = Db::new(wrap_pg.as_str()).await.unwrap();
-            let latest = db
-                .get_latest()
-                .await
-                .unwrap()
-                .expect("there to be at least one op in the db. did you backfill?");
-
-            let (tx, rx) = mpsc::channel(2);
-            // upstream poller
-            let mut url = args.upstream.clone();
             tokio::task::spawn(async move {
-                log::info!("starting poll reader...");
-                url.set_path("/export");
-                tokio::task::spawn(
-                    async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
-                );
-            });
-            // db writer
-            let poll_db = db.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting db writer...");
-                pages_to_pg(poll_db, rx).await.unwrap();
+                poll_upstream(Some(after), url, throttle, tx)
+                    .await
+                    .expect("to poll upstream")
             });
-
-            let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
-                (_, false, Some(cache_path)) => ListenConf::Acme {
-                    domains: acme_domain,
-                    cache_path,
-                    directory_url: acme_directory_url.to_string(),
-                },
-                (bind, true, None) => ListenConf::Bind(bind),
-                (_, _, _) => unreachable!(),
-            };
-
-            serve(&args.upstream, wrap, listen_conf).await.unwrap();
+            log::trace!("ensuring output directory exists");
+            create_dir_all(&dest)
+                .await
+                .expect("to ensure output dir exists");
+            pages_to_weeks(rx, dest, clobber)
+                .await
+                .expect("to write bundles to output files");
         }
+        Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?,
+        Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?,
         Commands::Tail { after } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
             let start_at = after.or_else(|| Some(chrono::Utc::now()));
+            let throttle = Duration::from_millis(globals.upstream_throttle_ms);
             let (tx, rx) = mpsc::channel(1);
-            tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
-            pages_to_stdout(rx, None).await.unwrap();
+            tokio::task::spawn(async move {
+                poll_upstream(start_at, url, throttle, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
+            pages_to_stdout(rx, None)
+                .await
+                .expect("to write pages to stdout");
         }
     }
     log::info!("whew, {:?}. goodbye!", t0.elapsed());
+    Ok(())
 }
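With `pages_to_stdout` and `full_pages` now imported from the `allegedly` library, both this binary and the standalone `backfill` binary can spawn them straight into a `JoinSet`. Judging from the new call site (`full_pages(poll_out, full_tx)` joined as an `anyhow::Result<&'static str>`), the moved forwarder presumably looks roughly like the sketch below, which mirrors the removed in-binary version rather than quoting the actual library code:

```rust
use allegedly::ExportPage;
use tokio::sync::mpsc;

/// Forward pages while they're full; a short page means the poller has caught
/// up, so stop and drop both channel ends (same trick as the removed version).
pub async fn full_pages(
    mut rx: mpsc::Receiver<ExportPage>,
    tx: mpsc::Sender<ExportPage>,
) -> anyhow::Result<&'static str> {
    while let Some(page) = rx.recv().await {
        if page.ops.len() <= 900 {
            break;
        }
        if tx.send(page).await.is_err() {
            // receiver dropped; nothing left to forward to
            break;
        }
    }
    Ok("full_pages")
}
```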
+207
src/bin/backfill.rs
+use allegedly::{
+    Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg,
+    bin::{GlobalArgs, bin_init},
+    full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream,
+};
+use clap::Parser;
+use reqwest::Url;
+use std::{path::PathBuf, time::Duration};
+use tokio::{
+    sync::{mpsc, oneshot},
+    task::JoinSet,
+};
+
+pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";
+
+#[derive(Debug, clap::Args)]
+pub struct Args {
+    /// Remote URL prefix to fetch bundles from
+    #[arg(long)]
+    #[clap(default_value = DEFAULT_HTTP)]
+    http: Url,
+    /// Local folder to fetch bundles from (overrides `http`)
+    #[arg(long)]
+    dir: Option<PathBuf>,
+    /// Don't do weekly bulk-loading at all.
+    ///
+    /// overrides `http` and `dir`, makes catch_up redundant
+    #[arg(long, action)]
+    no_bulk: bool,
+    /// Parallel bundle fetchers
+    ///
+    /// Default: 4 for http fetches, 1 for local folder
+    #[arg(long)]
+    source_workers: Option<usize>,
+    /// Bulk load into did-method-plc-compatible postgres instead of stdout
+    ///
+    /// Pass a postgres connection url like "postgresql://localhost:5432"
+    #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
+    to_postgres: Option<Url>,
+    /// Cert for postgres (if needed)
+    #[arg(long)]
+    postgres_cert: Option<PathBuf>,
+    /// Delete all operations from the postgres db before starting
+    ///
+    /// only used if `--to-postgres` is present
+    #[arg(long, action)]
+    postgres_reset: bool,
+    /// Stop at the week ending before this date
+    #[arg(long)]
+    until: Option<Dt>,
+    /// After the weekly imports, poll upstream until we're caught up
+    #[arg(long, action)]
+    catch_up: bool,
+}
+
+pub async fn run(
+    GlobalArgs {
+        upstream,
+        upstream_throttle_ms,
+    }: GlobalArgs,
+    Args {
+        http,
+        dir,
+        no_bulk,
+        source_workers,
+        to_postgres,
+        postgres_cert,
+        postgres_reset,
+        until,
+        catch_up,
+    }: Args,
+) -> anyhow::Result<()> {
+    let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();
+
+    let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages
+
+    // a bulk sink can notify us as soon as the very last op's time is known
+    // so we can start catching up while the sink might restore indexes and such
+    let (found_last_tx, found_last_out) = if catch_up {
+        let (tx, rx) = oneshot::channel();
+        (Some(tx), Some(rx))
+    } else {
+        (None, None)
+    };
+
+    let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
+    let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter
+
+    // set up sources
+    if no_bulk {
+        // simple mode, just poll upstream from the beginning
+        if http != DEFAULT_HTTP.parse()? {
+            log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
+        }
+        if let Some(d) = dir {
+            log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
+        }
+        if let Some(u) = until {
+            log::warn!(
+                "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
+            );
+        }
+        let mut upstream = upstream;
+        upstream.set_path("/export");
+        let throttle = Duration::from_millis(upstream_throttle_ms);
+        tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx));
+        tasks.spawn(full_pages(poll_out, full_tx));
+        tasks.spawn(pages_to_stdout(full_out, None));
+    } else {
+        // fun mode
+
+        // set up bulk sources
+        if let Some(dir) = dir {
+            if http != DEFAULT_HTTP.parse()? {
+                anyhow::bail!(
+                    "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
+                );
+            }
+            tasks.spawn(backfill(
+                FolderSource(dir),
+                bulk_tx,
+                source_workers.unwrap_or(1),
+                until,
+            ));
+        } else {
+            tasks.spawn(backfill(
+                HttpSource(http),
+                bulk_tx,
+                source_workers.unwrap_or(4),
+                until,
+            ));
+        }
+
+        // and the catch-up source...
+        if let Some(last) = found_last_out {
+            let throttle = Duration::from_millis(upstream_throttle_ms);
+            tasks.spawn(async move {
+                let mut upstream = upstream;
+                upstream.set_path("/export");
+
+                poll_upstream(last.await?, upstream, throttle, poll_tx).await
+            });
+        }
+
+        // set up sinks
+        if let Some(pg_url) = to_postgres {
+            log::trace!("connecting to postgres...");
+            let db = Db::new(pg_url.as_str(), postgres_cert).await?;
+            log::trace!("connected to postgres");
+
+            tasks.spawn(backfill_to_pg(
+                db.clone(),
+                postgres_reset,
+                bulk_out,
+                found_last_tx,
+            ));
+            if catch_up {
+                tasks.spawn(pages_to_pg(db, full_out));
+            }
+        } else {
+            tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
+            if catch_up {
+                tasks.spawn(pages_to_stdout(full_out, None));
+            }
+        }
+    }
+
+    while let Some(next) = tasks.join_next().await {
+        match next {
+            Err(e) if e.is_panic() => {
+                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
+                return Err(e.into());
+            }
+            Err(e) => {
+                log::error!("a joinset task failed to join: {e}");
+                return Err(e.into());
+            }
+            Ok(Err(e)) => {
+                log::error!("a joinset task completed with error: {e}");
+                return Err(e);
+            }
+            Ok(Ok(name)) => {
+                log::trace!("a task completed: {name:?}. {} left", tasks.len());
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[derive(Debug, Parser)]
+struct CliArgs {
+    #[command(flatten)]
+    globals: GlobalArgs,
+    #[command(flatten)]
+    args: Args,
+}
+
+#[allow(dead_code)]
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let args = CliArgs::parse();
+    bin_init(false);
+    log::info!("{}", logo("backfill"));
+    run(args.globals, args.args).await?;
+    Ok(())
+}
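For orientation while reading: the flattened `GlobalArgs` and `InstrumentationArgs` structs live in the library's `bin` module, which isn't part of this diff. Pieced together from how they're destructured and used above, they presumably look something like the sketch below; the `upstream` attributes are carried over from the old `Cli` field, while the throttle default and the exact flag/env names are guesses:

```rust
use clap::Args;
use reqwest::Url;

// sketch only: the real definitions live in allegedly's `bin` module
#[derive(Debug, Clone, Args)]
pub struct GlobalArgs {
    /// Upstream PLC server
    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
    #[clap(default_value = "https://plc.directory")]
    pub upstream: Url,
    /// Self-imposed delay between upstream /export polls, in milliseconds
    /// (default here is a guess; plc.directory rate-limits at roughly 600ms)
    #[arg(long)]
    #[clap(default_value_t = 600)]
    pub upstream_throttle_ms: u64,
}

#[derive(Debug, Clone, Args)]
pub struct InstrumentationArgs {
    /// Ship traces to an OpenTelemetry collector (flag name is a guess)
    #[arg(long, action)]
    pub enable_opentelemetry: bool,
}
```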