···
    dest: mpsc::Sender<ExportPage>,
    source_workers: usize,
    until: Option<Dt>,
-) -> anyhow::Result<()> {
    // queue up the week bundles that should be available
    let weeks = Arc::new(Mutex::new(
        until
···
        while let Some(week) = weeks.lock().await.pop() {
            let when = Into::<Dt>::into(week).to_rfc3339();
            log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
-            week_to_pages(source.clone(), week, dest.clone()).await?;
        }
        log::info!("done with the weeks ig");
        Ok(())
···
    // wait for the big backfill to finish
    while let Some(res) = workers.join_next().await {
-        res??;
    }
-    log::info!("finished fetching backfill in {:?}", t_step.elapsed());
-    Ok(())
}
···
    dest: mpsc::Sender<ExportPage>,
    source_workers: usize,
    until: Option<Dt>,
+) -> anyhow::Result<&'static str> {
    // queue up the week bundles that should be available
    let weeks = Arc::new(Mutex::new(
        until
···
        while let Some(week) = weeks.lock().await.pop() {
            let when = Into::<Dt>::into(week).to_rfc3339();
            log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
+            week_to_pages(source.clone(), week, dest.clone())
+                .await
+                .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
        }
        log::info!("done with the weeks ig");
        Ok(())
···
    // wait for the big backfill to finish
    while let Some(res) = workers.join_next().await {
+        res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
+            .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
    }
+    log::info!(
+        "finished fetching backfill in {:?}. senders remaining: {}",
+        t_step.elapsed(),
+        dest.strong_count()
+    );
+    Ok("backfill")
}
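Aside on the `res??` / double-`inspect_err` pattern above: `JoinSet::join_next` yields `Result<T, JoinError>`, and each worker's `T` is itself an `anyhow::Result`, so one `?` surfaces panics or aborts and the second surfaces the task's own error. A minimal, self-contained sketch of the same join pattern with the labeled `&'static str` return (the task bodies are stand-ins, not this crate's code):

use tokio::task::JoinSet;

// Join labeled tasks; `??` splits the JoinError from the task's own error.
async fn join_labeled() -> anyhow::Result<()> {
    let mut workers: JoinSet<anyhow::Result<&'static str>> = JoinSet::new();
    workers.spawn(async { Ok("backfill") }); // stand-in task body
    workers.spawn(async { Ok("poller") }); // stand-in task body
    while let Some(res) = workers.join_next().await {
        // first `?`: the task panicked or was aborted (JoinError)
        // second `?`: the task ran to completion but returned Err
        let label = res??;
        log::info!("{label} finished cleanly");
    }
    Ok(())
}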
+40 −226
src/bin/allegedly.rs
-use allegedly::{
-    Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill,
-    backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
-};
use clap::{CommandFactory, Parser, Subcommand};
-use reqwest::Url;
-use std::{net::SocketAddr, path::PathBuf, time::Instant};
-use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Parser)]
struct Cli {
-    /// Upstream PLC server
-    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
-    #[clap(default_value = "https://plc.directory")]
-    upstream: Url,
    #[command(subcommand)]
    command: Commands,
}
···
enum Commands {
    /// Use weekly bundled ops to get a complete directory mirror FAST
    Backfill {
-        /// Remote URL prefix to fetch bundles from
-        #[arg(long)]
-        #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
-        http: Url,
-        /// Local folder to fetch bundles from (overrides `http`)
-        #[arg(long)]
-        dir: Option<PathBuf>,
-        /// Parallel bundle fetchers
-        ///
-        /// Default: 4 for http fetches, 1 for local folder
-        #[arg(long)]
-        source_workers: Option<usize>,
-        /// Bulk load into did-method-plc-compatible postgres instead of stdout
-        ///
-        /// Pass a postgres connection url like "postgresql://localhost:5432"
-        #[arg(long)]
-        to_postgres: Option<Url>,
-        /// Delete all operations from the postgres db before starting
-        ///
-        /// only used if `--to-postgres` is present
-        #[arg(long, action)]
-        postgres_reset: bool,
-        /// Stop at the week ending before this date
-        #[arg(long)]
-        until: Option<Dt>,
-        /// After the weekly imports, poll upstream until we're caught up
-        #[arg(long, action)]
-        catch_up: bool,
    },
    /// Scrape a PLC server, collecting ops into weekly bundles
    ///
···
    },
    /// Wrap a did-method-plc server, syncing upstream and blocking op submits
    Mirror {
-        /// the wrapped did-method-plc server
-        #[arg(long, env = "ALLEGEDLY_WRAP")]
-        wrap: Url,
-        /// the wrapped did-method-plc server's database (write access required)
-        #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
-        wrap_pg: Url,
-        /// wrapping server listen address
-        #[arg(short, long, env = "ALLEGEDLY_BIND")]
-        #[clap(default_value = "127.0.0.1:8000")]
-        bind: SocketAddr,
-        /// obtain a certificate from letsencrypt
-        ///
-        /// for now this will force listening on all interfaces at :80 and :443
-        /// (:80 will serve an "https required" error, *will not* redirect)
-        #[arg(
-            long,
-            conflicts_with("bind"),
-            requires("acme_cache_path"),
-            env = "ALLEGEDLY_ACME_DOMAIN"
-        )]
-        acme_domain: Vec<String>,
-        /// which local directory to keep the letsencrypt certs in
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
-        acme_cache_path: Option<PathBuf>,
-        /// which public acme directory to use
-        ///
-        /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
-        #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
-        acme_directory_url: Url,
    },
    /// Poll an upstream PLC server and log new ops to stdout
    Tail {
···
    },
}

-async fn pages_to_stdout(
-    mut rx: mpsc::Receiver<ExportPage>,
-    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> anyhow::Result<()> {
-    let mut last_at = None;
-    while let Some(page) = rx.recv().await {
-        for op in &page.ops {
-            println!("{op}");
-        }
-        if notify_last_at.is_some()
-            && let Some(s) = PageBoundaryState::new(&page)
-        {
-            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
-        }
-    }
-    if let Some(notify) = notify_last_at {
-        log::trace!("notifying last_at: {last_at:?}");
-        if notify.send(last_at).is_err() {
-            log::error!("receiver for last_at dropped, can't notify");
-        };
-    }
-    Ok(())
-}
-
-/// page forwarder who drops its channels on receipt of a small page
-///
-/// PLC will return up to 1000 ops on a page, and returns full pages until it
-/// has caught up, so this is a (hacky?) way to stop polling once we're up.
-fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
-    let (tx, fwd) = mpsc::channel(1);
-    tokio::task::spawn(async move {
-        while let Some(page) = rx.recv().await
-            && page.ops.len() > 900
-        {
-            tx.send(page).await.unwrap();
-        }
-    });
-    fwd
-}
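The "drops its channels" trick above leans on tokio mpsc semantics: when the spawned forwarder exits, its `Sender` and upstream `Receiver` are dropped, every downstream `recv()` starts returning `None`, and writers shut down on their own. A standalone sketch of just that mechanic (not this crate's code):

use tokio::sync::mpsc;

// When the last Sender is dropped, recv() drains the buffer then yields None.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap();
    drop(tx); // last sender gone
    assert_eq!(rx.recv().await, Some(1)); // buffered item still delivered
    assert_eq!(rx.recv().await, None); // channel closed
}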
-
#[tokio::main]
-async fn main() {
    let args = Cli::parse();
    let matches = Cli::command().get_matches();
    let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
    bin_init(name);

    let t0 = Instant::now();
    match args.command {
-        Commands::Backfill {
-            http,
-            dir,
-            source_workers,
-            to_postgres,
-            postgres_reset,
-            until,
-            catch_up,
-        } => {
-            let (tx, rx) = mpsc::channel(32); // these are big pages
-            tokio::task::spawn(async move {
-                if let Some(dir) = dir {
-                    log::info!("Reading weekly bundles from local folder {dir:?}");
-                    backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
-                        .await
-                        .unwrap();
-                } else {
181- log::info!("Fetching weekly bundles from from {http}");
-                    backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
-                        .await
-                        .unwrap();
-                }
-            });
-
-            // postgres writer will notify us as soon as the very last op's time is known
-            // so we can start catching up while pg is restoring indexes and stuff
-            let (notify_last_at, rx_last) = if catch_up {
-                let (tx, rx) = oneshot::channel();
-                (Some(tx), Some(rx))
-            } else {
-                (None, None)
-            };
-
-            let to_postgres_url_bulk = to_postgres.clone();
-            let bulk_out_write = tokio::task::spawn(async move {
-                if let Some(ref url) = to_postgres_url_bulk {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    backfill_to_pg(db, postgres_reset, rx, notify_last_at)
-                        .await
-                        .unwrap();
-                } else {
-                    pages_to_stdout(rx, notify_last_at).await.unwrap();
-                }
-            });
-
-            if let Some(rx_last) = rx_last {
-                let mut upstream = args.upstream;
-                upstream.set_path("/export");
-                // wait until the time for `after` is known
-                let last_at = rx_last.await.unwrap();
-                log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
-                let (tx, rx) = mpsc::channel(256); // these are small pages
-                tokio::task::spawn(
-                    async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
-                );
-                bulk_out_write.await.unwrap();
-                log::info!("writing catch-up pages");
-                let full_pages = full_pages(rx);
-                if let Some(url) = to_postgres {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    pages_to_pg(db, full_pages).await.unwrap();
-                } else {
-                    pages_to_stdout(full_pages, None).await.unwrap();
-                }
-            }
-        }
        Commands::Bundle {
            dest,
            after,
            clobber,
        } => {
-            let mut url = args.upstream;
            url.set_path("/export");
            let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
-            tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
            log::trace!("ensuring output directory exists");
-            std::fs::create_dir_all(&dest).unwrap();
-            pages_to_weeks(rx, dest, clobber).await.unwrap();
-        }
-        Commands::Mirror {
-            wrap,
-            wrap_pg,
-            bind,
-            acme_domain,
-            acme_cache_path,
-            acme_directory_url,
-        } => {
-            let db = Db::new(wrap_pg.as_str()).await.unwrap();
-            let latest = db
-                .get_latest()
                .await
-                .unwrap()
-                .expect("there to be at least one op in the db. did you backfill?");
-
-            let (tx, rx) = mpsc::channel(2);
-            // upstream poller
-            let mut url = args.upstream.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting poll reader...");
-                url.set_path("/export");
-                tokio::task::spawn(
-                    async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
-                );
-            });
-            // db writer
-            let poll_db = db.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting db writer...");
-                pages_to_pg(poll_db, rx).await.unwrap();
-            });
-
-            let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
-                (_, false, Some(cache_path)) => ListenConf::Acme {
-                    domains: acme_domain,
-                    cache_path,
-                    directory_url: acme_directory_url.to_string(),
-                },
-                (bind, true, None) => ListenConf::Bind(bind),
-                (_, _, _) => unreachable!(),
-            };
-
-            serve(&args.upstream, wrap, listen_conf).await.unwrap();
        }
        Commands::Tail { after } => {
-            let mut url = args.upstream;
            url.set_path("/export");
            let start_at = after.or_else(|| Some(chrono::Utc::now()));
            let (tx, rx) = mpsc::channel(1);
-            tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
-            pages_to_stdout(rx, None).await.unwrap();
        }
    }
    log::info!("whew, {:?}. goodbye!", t0.elapsed());
}
+use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
use clap::{CommandFactory, Parser, Subcommand};
+use std::{path::PathBuf, time::Instant};
+use tokio::fs::create_dir_all;
+use tokio::sync::mpsc;
+
+mod backfill;
+mod mirror;

#[derive(Debug, Parser)]
struct Cli {
+    #[command(flatten)]
+    globals: GlobalArgs,
+
    #[command(subcommand)]
    command: Commands,
}
···
enum Commands {
    /// Use weekly bundled ops to get a complete directory mirror FAST
    Backfill {
+        #[command(flatten)]
+        args: backfill::Args,
    },
    /// Scrape a PLC server, collecting ops into weekly bundles
    ///
···
    },
    /// Wrap a did-method-plc server, syncing upstream and blocking op submits
    Mirror {
+        #[command(flatten)]
+        args: mirror::Args,
    },
    /// Poll an upstream PLC server and log new ops to stdout
    Tail {
···
    },
}

#[tokio::main]
+async fn main() -> anyhow::Result<()> {
    let args = Cli::parse();
    let matches = Cli::command().get_matches();
    let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
    bin_init(name);

+    let globals = args.globals.clone();
+
    let t0 = Instant::now();
    match args.command {
+        Commands::Backfill { args } => backfill::run(globals, args).await?,
        Commands::Bundle {
            dest,
            after,
            clobber,
        } => {
+            let mut url = globals.upstream;
            url.set_path("/export");
            let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
+            tokio::task::spawn(async move {
+                poll_upstream(Some(after), url, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
            log::trace!("ensuring output directory exists");
+            create_dir_all(&dest)
                .await
+                .expect("to ensure output dir exists");
+            pages_to_weeks(rx, dest, clobber)
+                .await
+                .expect("to write bundles to output files");
        }
+        Commands::Mirror { args } => mirror::run(globals, args).await?,
        Commands::Tail { after } => {
+            let mut url = globals.upstream;
            url.set_path("/export");
            let start_at = after.or_else(|| Some(chrono::Utc::now()));
            let (tx, rx) = mpsc::channel(1);
+            tokio::task::spawn(async move {
+                poll_upstream(start_at, url, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
+            pages_to_stdout(rx, None)
+                .await
+                .expect("to write pages to stdout");
        }
    }
    log::info!("whew, {:?}. goodbye!", t0.elapsed());
+    Ok(())
}
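`GlobalArgs` itself isn't shown in this diff; judging by the `--upstream` flag that moved out of `Cli` and the `args.globals.clone()` call above, it plausibly looks like the sketch below (the exact derive set is an assumption, though `Clone` and clap's `Args` are implied by how it's used):

use clap::Args;
use reqwest::Url;

// Assumed shape of the flattened globals, reconstructed from this diff.
#[derive(Debug, Clone, Args)]
pub struct GlobalArgs {
    /// Upstream PLC server
    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
    #[clap(default_value = "https://plc.directory")]
    pub upstream: Url,
}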