//! forked from microcosm.blue/Allegedly
//!
//! Server tools to backfill, tail, mirror, and verify PLC logs.
1use allegedly::{
2 Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, SeqPage, backfill, backfill_to_fjall,
3 backfill_to_pg,
4 bin::{GlobalArgs, bin_init},
5 full_pages, full_pages_seq, logo, pages_to_pg, pages_to_stdout, poll_upstream,
6 poll_upstream_seq,
7};
8use clap::Parser;
9use reqwest::Url;
10use std::{path::PathBuf, time::Duration};
11use tokio::{
12 sync::{mpsc, oneshot},
13 task::JoinSet,
14};
15
/// Default remote URL prefix for fetching weekly bundles (see `Args::http`).
pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";
17
18#[derive(Debug, clap::Args)]
19pub struct Args {
20 /// Remote URL prefix to fetch bundles from
21 #[arg(long)]
22 #[clap(default_value = DEFAULT_HTTP)]
23 http: Url,
24 /// Local folder to fetch bundles from (overrides `http`)
25 #[arg(long)]
26 dir: Option<PathBuf>,
27 /// Don't do weekly bulk-loading at all.
28 ///
29 /// overrides `http` and `dir`, makes catch_up redundant
30 #[arg(long, action)]
31 no_bulk: bool,
32 /// Parallel bundle fetchers
33 ///
34 /// Default: 4 for http fetches, 1 for local folder
35 #[arg(long)]
36 source_workers: Option<usize>,
37 /// Bulk load into did-method-plc-compatible postgres instead of stdout
38 ///
39 /// Pass a postgres connection url like "postgresql://localhost:5432"
40 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
41 to_postgres: Option<Url>,
42 /// Cert for postgres (if needed)
43 #[arg(long)]
44 postgres_cert: Option<PathBuf>,
45 /// Delete all operations from the db before starting
46 ///
47 /// only used if `--to-postgres` or `--to-fjall` is present
48 #[arg(long, action)]
49 reset: bool,
50 /// Load into a local fjall embedded database
51 /// (doesnt support bulk yet unless loading from another fjall db)
52 ///
53 /// Pass a directory path for the fjall database
54 #[arg(long, conflicts_with_all = ["to_postgres", "postgres_cert"])]
55 to_fjall: Option<PathBuf>,
56 /// Stop at the week ending before this date
57 #[arg(long)]
58 until: Option<Dt>,
59 /// After the weekly imports, poll upstream until we're caught up
60 #[arg(long, action)]
61 catch_up: bool,
62}
63
64pub async fn run(
65 GlobalArgs {
66 upstream,
67 upstream_throttle_ms,
68 }: GlobalArgs,
69 Args {
70 http,
71 dir,
72 no_bulk,
73 source_workers,
74 to_postgres,
75 postgres_cert,
76 reset,
77 to_fjall,
78 until,
79 catch_up,
80 }: Args,
81) -> anyhow::Result<()> {
82 let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();
83
84 let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages
85
86 // a bulk sink can notify us as soon as the very last op's time is known
87 // so we can start catching up while the sink might restore indexes and such
88 let (found_last_tx, found_last_out) = if catch_up {
89 let (tx, rx) = oneshot::channel();
90 (Some(tx), Some(rx))
91 } else {
92 (None, None)
93 };
94
95 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
96 let (full_tx, full_out) = mpsc::channel::<ExportPage>(1); // don't need to buffer at this filter
97
98 // set up sources
99 if no_bulk {
100 // simple mode, just poll upstream from teh beginning
101 if http != DEFAULT_HTTP.parse()? {
102 log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
103 }
104 if let Some(d) = dir {
105 log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
106 }
107 if let Some(u) = until {
108 log::warn!(
109 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
110 );
111 }
112 let mut upstream = upstream;
113 upstream.set_path("/export");
114 let throttle = Duration::from_millis(upstream_throttle_ms);
115 if let Some(fjall_path) = to_fjall {
116 log::trace!("opening fjall db at {fjall_path:?}...");
117 let db = FjallDb::open(&fjall_path)?;
118 log::trace!("opened fjall db");
119
120 let (poll_tx, poll_out) = mpsc::channel::<SeqPage>(128); // normal/small pages
121 let (full_tx, full_out) = mpsc::channel::<SeqPage>(1); // don't need to buffer at this filter
122
123 tasks.spawn(poll_upstream_seq(None, upstream, throttle, poll_tx));
124 tasks.spawn(full_pages_seq(poll_out, full_tx));
125 tasks.spawn(backfill_to_fjall(db, reset, full_out, None));
126 } else {
127 tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx));
128 tasks.spawn(full_pages(poll_out, full_tx));
129 tasks.spawn(pages_to_stdout(full_out, None));
130 }
131 } else {
132 // fun mode
133
134 // set up bulk sources
135 if let Some(dir) = dir {
136 if http != DEFAULT_HTTP.parse()? {
137 anyhow::bail!(
138 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
139 );
140 }
141 tasks.spawn(backfill(
142 FolderSource(dir),
143 bulk_tx,
144 source_workers.unwrap_or(1),
145 until,
146 ));
147 } else {
148 tasks.spawn(backfill(
149 HttpSource(http),
150 bulk_tx,
151 source_workers.unwrap_or(4),
152 until,
153 ));
154 }
155
156 // and the catch-up source...
157 if let Some(last) = found_last_out {
158 let throttle = Duration::from_millis(upstream_throttle_ms);
159 tasks.spawn(async move {
160 let mut upstream = upstream;
161 upstream.set_path("/export");
162
163 poll_upstream(last.await?, upstream, throttle, poll_tx).await
164 });
165 }
166
167 // set up sinks
168 if let Some(pg_url) = to_postgres {
169 log::trace!("connecting to postgres...");
170 let db = Db::new(pg_url.as_str(), postgres_cert).await?;
171 log::trace!("connected to postgres");
172
173 tasks.spawn(backfill_to_pg(db.clone(), reset, bulk_out, found_last_tx));
174 if catch_up {
175 tasks.spawn(pages_to_pg(db, full_out));
176 }
177 } else {
178 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
179 if catch_up {
180 tasks.spawn(pages_to_stdout(full_out, None));
181 }
182 }
183 }
184
185 while let Some(next) = tasks.join_next().await {
186 match next {
187 Err(e) if e.is_panic() => {
188 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
189 return Err(e.into());
190 }
191 Err(e) => {
192 log::error!("a joinset task failed to join: {e}");
193 return Err(e.into());
194 }
195 Ok(Err(e)) => {
196 log::error!("a joinset task completed with error: {e}");
197 return Err(e);
198 }
199 Ok(Ok(name)) => {
200 log::trace!("a task completed: {name:?}. {} left", tasks.len());
201 }
202 }
203 }
204
205 Ok(())
206}
207
// Top-level CLI for the standalone backfill binary: globals shared across
// Allegedly binaries plus the backfill-specific Args.
// (plain `//` comments on purpose: `///` here would become clap help text)
#[derive(Debug, Parser)]
struct CliArgs {
    // options common to all binaries (upstream url, throttle)
    #[command(flatten)]
    globals: GlobalArgs,
    // backfill-specific options
    #[command(flatten)]
    args: Args,
}
215
216#[allow(dead_code)]
217#[tokio::main]
218async fn main() -> anyhow::Result<()> {
219 let args = CliArgs::parse();
220 bin_init(false);
221 log::info!("{}", logo("backfill"));
222 run(args.globals, args.args).await?;
223 Ok(())
224}