//! forked from microcosm.blue/Allegedly
//!
//! Server tools to backfill, tail, mirror, and verify PLC logs
1use allegedly::{
2 Db, ExperimentalConf, FjallDb, ListenConf,
3 bin::{GlobalArgs, InstrumentationArgs, bin_init},
4 logo, pages_to_pg, poll_upstream, poll_upstream_seq, seq_pages_to_fjall, serve, serve_fjall,
5 tail_upstream_stream,
6};
7use clap::Parser;
8use reqwest::Url;
9use std::{net::SocketAddr, path::PathBuf, time::Duration};
10use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet};
11
12#[derive(Debug, clap::Args)]
13pub struct Args {
14 /// the wrapped did-method-plc server (not needed when using --wrap-fjall)
15 #[arg(long, env = "ALLEGEDLY_WRAP")]
16 wrap: Option<Url>,
17 /// the wrapped did-method-plc server's database (write access required)
18 #[arg(long, env = "ALLEGEDLY_WRAP_PG", conflicts_with = "wrap_fjall")]
19 wrap_pg: Option<Url>,
20 /// path to tls cert for the wrapped postgres db, if needed
21 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
22 wrap_pg_cert: Option<PathBuf>,
23 /// path to a local fjall database directory (alternative to postgres)
24 #[arg(long, env = "ALLEGEDLY_WRAP_FJALL", conflicts_with_all = ["wrap_pg", "wrap_pg_cert"])]
25 wrap_fjall: Option<PathBuf>,
26 /// compact the fjall db on startup
27 #[arg(
28 long,
29 env = "ALLEGEDLY_FJALL_COMPACT",
30 conflicts_with_all = ["wrap_pg", "wrap_pg_cert"]
31 )]
32 compact_fjall: bool,
33 /// wrapping server listen address
34 #[arg(short, long, env = "ALLEGEDLY_BIND")]
35 #[clap(default_value = "127.0.0.1:8000")]
36 bind: SocketAddr,
37 /// obtain a certificate from letsencrypt
38 ///
39 /// for now this will force listening on all interfaces at :80 and :443
40 /// (:80 will serve an "https required" error, *will not* redirect)
41 #[arg(
42 long,
43 conflicts_with("bind"),
44 requires("acme_cache_path"),
45 env = "ALLEGEDLY_ACME_DOMAIN"
46 )]
47 acme_domain: Vec<String>,
48 /// which local directory to keep the letsencrypt certs in
49 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
50 acme_cache_path: Option<PathBuf>,
51 /// which public acme directory to use
52 ///
53 /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
54 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
55 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
56 acme_directory_url: Url,
57 /// try to listen for ipv6
58 #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")]
59 acme_ipv6: bool,
60 /// only accept experimental requests at this hostname
61 ///
62 /// a cert will be provisioned for it from letsencrypt. if you're not using
63 /// acme (eg., behind a tls-terminating reverse proxy), open a feature request.
64 #[arg(
65 long,
66 requires("acme_domain"),
67 env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN"
68 )]
69 experimental_acme_domain: Option<String>,
70 /// accept writes! by forwarding them upstream
71 #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")]
72 experimental_write_upstream: bool,
73 /// switch from polling to /export/stream once the latest op is within
74 /// this many days of now (plc.directory only supports ~1 week of backfill)
75 #[arg(long, env = "ALLEGEDLY_STREAM_CUTOVER_DAYS", default_value = "5")]
76 stream_cutover_days: u32,
77}
78
/// Run the mirror: keep a local copy of the upstream PLC op log in sync and
/// serve it.
///
/// Two modes, selected by `Args`:
/// - `--wrap-fjall`: serve directly from a local fjall db, tailing upstream by
///   polling `/export` and cutting over to `/export/stream` near the tip.
/// - `--wrap`: front a wrapped did-method-plc server; when `sync` is true,
///   also poll upstream `/export` into its postgres db (`--wrap-pg`).
///
/// `sync == false` is read-only wrap mode (no upstream synchronization).
///
/// Returns the first task error (or panic) from the spawned task set, or
/// `Ok(())` once every spawned task has completed cleanly.
pub async fn run(
    GlobalArgs {
        upstream,
        upstream_throttle_ms,
    }: GlobalArgs,
    Args {
        wrap,
        wrap_pg,
        wrap_pg_cert,
        wrap_fjall,
        compact_fjall,
        bind,
        acme_domain,
        acme_cache_path,
        acme_directory_url,
        acme_ipv6,
        experimental_acme_domain,
        experimental_write_upstream,
        stream_cutover_days,
    }: Args,
    sync: bool,
) -> anyhow::Result<()> {
    // choose how to listen: acme-managed https when domains and a cert cache
    // dir are both configured, plain tcp bind otherwise
    let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
        (_, false, Some(cache_path)) => {
            create_dir_all(&cache_path).await?;
            let mut domains = acme_domain.clone();
            // the experimental hostname also gets a cert provisioned
            if let Some(ref experimental_domain) = experimental_acme_domain {
                domains.push(experimental_domain.clone())
            }
            log::info!("configuring acme for https at {domains:?}...");
            ListenConf::Acme {
                domains,
                cache_path,
                directory_url: acme_directory_url.to_string(),
                ipv6: acme_ipv6,
            }
        }
        (bind, true, None) => ListenConf::Bind(bind),
        // the mixed states (domains without cache path, or vice versa) are
        // excluded by the clap `requires` constraints on the two args
        // (NOTE(review): this relies on clap validation always holding)
        (_, _, _) => unreachable!(),
    };

    let experimental_conf = ExperimentalConf {
        acme_domain: experimental_acme_domain,
        write_upstream: experimental_write_upstream,
    };

    let mut tasks = JoinSet::new();

    if let Some(fjall_path) = wrap_fjall {
        // fjall mode: we own the log locally — sync it and serve from it
        let db = FjallDb::open(&fjall_path)?;
        if compact_fjall {
            log::info!("compacting fjall...");
            db.compact()?;
        }

        log::debug!("getting the latest seq from fjall...");
        let latest_seq = db
            .get_latest()?
            .map(|(seq, _)| seq)
            .expect("there to be at least one op in the db. did you backfill?");
        log::info!("starting seq polling from seq {latest_seq}...");

        // page flow: (poll|stream) task -> send_page -> seq_pages_to_fjall
        let (send_page, recv_page) = mpsc::channel::<allegedly::SeqPage>(8);

        let mut export_url = upstream.clone();
        export_url.set_path("/export");
        let mut stream_url = upstream.clone();
        stream_url.set_path("/export/stream");
        let throttle = Duration::from_millis(upstream_throttle_ms);
        // 86_400 seconds per day
        let cutover_age = Duration::from_secs(stream_cutover_days as u64 * 86_400);

        // the poll -> stream task: poll until we're caught up, then switch to stream.
        // on stream disconnect, fall back to polling to resync.
        let send_page_bg = send_page.clone();
        tasks.spawn(async move {
            let mut current_seq = latest_seq;
            loop {
                log::info!("seq polling from seq {current_seq}");
                let (inner_tx, mut inner_rx) = mpsc::channel::<allegedly::SeqPage>(8);

                // run poller; it ends only when the channel closes
                let poll_url = export_url.clone();
                let poll_task = tokio::spawn(poll_upstream_seq(
                    Some(current_seq),
                    poll_url,
                    throttle,
                    inner_tx,
                ));

                // drain pages from poller until the last op is within cutover_age of now,
                // meaning we're close enough to the tip that the stream can cover the rest
                let mut last_seq_from_poll = current_seq;

                while let Some(page) = inner_rx.recv().await {
                    let near_tip = page.ops.last().map_or(false, |op| {
                        let age = chrono::Utc::now().signed_duration_since(op.created_at);
                        // NOTE(review): a future-dated op makes to_std() fail and
                        // counts as "not near tip" — presumably rare; confirm ok
                        age.to_std().map_or(false, |d| d <= cutover_age)
                    });
                    if let Some(last) = page.ops.last() {
                        last_seq_from_poll = last.seq;
                    }
                    // NOTE(review): a send error (writer gone) is ignored here,
                    // unlike the stream phase below which exits on a closed dest
                    let _ = send_page_bg.send(page).await;
                    if near_tip {
                        break;
                    }
                }

                // stop the poller; the stream below resumes from the last seq we
                // actually forwarded, so pages still buffered in inner_rx are
                // safe to drop
                poll_task.abort();
                current_seq = last_seq_from_poll;

                // switch to streaming
                log::info!("caught up at seq {current_seq}, switching to /export/stream");
                let (stream_inner_tx, mut stream_inner_rx) = mpsc::channel::<allegedly::SeqPage>(8);
                let stream_task = tokio::spawn(tail_upstream_stream(
                    Some(current_seq),
                    stream_url.clone(),
                    stream_inner_tx,
                ));

                while let Some(page) = stream_inner_rx.recv().await {
                    // track the highest seq forwarded so a stream reconnect
                    // resumes from the right place
                    if let Some(last) = page.ops.last() {
                        current_seq = last.seq;
                    }
                    if send_page_bg.send(page).await.is_err() {
                        // the fjall writer side is gone; nothing left to do
                        stream_task.abort();
                        return anyhow::Ok("fjall-poll-stream (dest closed)");
                    }
                }

                // stream ended/errored — loop back to polling to resync
                match stream_task.await {
                    Ok(Ok(())) => log::info!("stream closed cleanly, resyncing via poll"),
                    Ok(Err(e)) => log::warn!("stream error: {e}, resyncing via poll"),
                    Err(e) => log::warn!("stream task join error: {e}"),
                }
            }
        });

        // writer: apply received pages to fjall; server: serve the log from it
        tasks.spawn(seq_pages_to_fjall(db.clone(), recv_page));
        tasks.spawn(serve_fjall(upstream, listen_conf, experimental_conf, db));
    } else {
        // wrap mode: front an existing did-method-plc server
        let wrap = wrap.ok_or(anyhow::anyhow!(
            "--wrap is required unless using --wrap-fjall"
        ))?;

        let db: Option<Db> = if sync {
            let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!(
                "a wrapped reference postgres (--wrap-pg) or fjall db (--wrap-fjall) must be provided to sync"
            ))?;
            let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?;

            log::debug!("getting the latest op from the db...");
            let latest = db
                .get_latest()
                .await?
                .expect("there to be at least one op in the db. did you backfill?");
            log::debug!("starting polling from {latest}...");

            let (send_page, recv_page) = mpsc::channel(8);

            let mut poll_url = upstream.clone();
            poll_url.set_path("/export");
            let throttle = Duration::from_millis(upstream_throttle_ms);

            // poll upstream /export into the wrapped server's postgres db
            tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page));
            tasks.spawn(pages_to_pg(db.clone(), recv_page));
            Some(db)
        } else {
            // read-only wrap: serve without a db handle
            None
        };

        tasks.spawn(serve(upstream, wrap, listen_conf, experimental_conf, db));
    }

    // supervise: the first task failure (error or panic) tears everything down
    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}
275
// Full command line for the standalone `mirror` binary: the flags shared
// across allegedly binaries (flattened) plus the mirror-specific `Args`.
// (deliberately a `//` comment: a `///` doc comment here would become the
// clap "about" text and change `--help` output)
#[derive(Debug, Parser)]
struct CliArgs {
    // upstream url + throttle, shared by all allegedly binaries
    #[command(flatten)]
    globals: GlobalArgs,
    // logging / opentelemetry setup flags
    #[command(flatten)]
    instrumentation: InstrumentationArgs,
    // mirror-specific options (wrap target, db, listen/acme config)
    #[command(flatten)]
    args: Args,
    /// Run the mirror in wrap mode, no upstream synchronization (read-only)
    #[arg(long, action)]
    wrap_mode: bool,
}
288
289#[allow(dead_code)]
290#[tokio::main]
291async fn main() -> anyhow::Result<()> {
292 let args = CliArgs::parse();
293 bin_init(args.instrumentation.enable_opentelemetry);
294 log::info!("{}", logo("mirror"));
295 run(args.globals, args.args, !args.wrap_mode).await?;
296 Ok(())
297}