···2name = "allegedly"
3description = "public ledger server tools and services (for the PLC)"
4license = "MIT OR Apache-2.0"
5-version = "0.2.0"
6edition = "2024"
7default-run = "allegedly"
8···15governor = "0.10.1"
16http-body-util = "0.1.3"
17log = "0.4.28"
000018poem = { version = "3.1.12", features = ["acme", "compression"] }
19-reqwest = { version = "0.12.23", features = ["stream", "json"] }
020reqwest-middleware = "0.4.2"
21reqwest-retry = "0.7.0"
22rustls = "0.23.32"
···27tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
28tokio-stream = { version = "0.1.17", features = ["io-util"] }
29tokio-util = { version = "0.7.16", features = ["compat"] }
0030tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
···2name = "allegedly"
3description = "public ledger server tools and services (for the PLC)"
4license = "MIT OR Apache-2.0"
5+version = "0.3.3"
6edition = "2024"
7default-run = "allegedly"
8···15governor = "0.10.1"
16http-body-util = "0.1.3"
17log = "0.4.28"
18+native-tls = "0.2.14"
19+opentelemetry = "0.30.0"
20+opentelemetry-otlp = { version = "0.30.0" }
21+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
22poem = { version = "3.1.12", features = ["acme", "compression"] }
23+postgres-native-tls = "0.5.1"
24+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
25reqwest-middleware = "0.4.2"
26reqwest-retry = "0.7.0"
27rustls = "0.23.32"
···32tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
33tokio-stream = { version = "0.1.17", features = ["io-util"] }
34tokio-util = { version = "0.7.16", features = ["compat"] }
35+tracing = "0.1.41"
36+tracing-opentelemetry = "0.31.0"
37tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
···0000000000000000000000000000000000000
···1+use allegedly::{ExportPage, poll_upstream};
2+3+#[tokio::main]
4+async fn main() {
5+ // set to `None` to replay from the beginning of the PLC history
6+ let after = Some(chrono::Utc::now());
7+8+ // the PLC server to poll for new ops
9+ let upstream = "https://plc.wtf/export".parse().unwrap();
10+11+ // self-rate-limit (plc.directory's limit interval is 600ms)
12+ let throttle = std::time::Duration::from_millis(300);
13+14+ // pages are sent out of the poller via a tokio mpsc channel
15+ let (tx, mut rx) = tokio::sync::mpsc::channel(1);
16+17+ // spawn a tokio task to run the poller
18+ tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
19+20+ // receive pages of plc ops from the poller
21+ while let Some(ExportPage { ops }) = rx.recv().await {
22+ println!("received {} plc ops", ops.len());
23+24+ for op in ops {
25+ // in this example we're alerting when changes are found for one
26+ // specific identity
27+ if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
28+ println!(
29+ "Update found for {}! cid={}\n -> operation: {}",
30+ op.did,
31+ op.cid,
32+ op.operation.get()
33+ );
34+ }
35+ }
36+ }
37+}
favicon.ico
This is a binary file and will not be displayed.
+26-8
readme.md
···26 sudo allegedly mirror \
27 --upstream "https://plc.directory" \
28 --wrap "http://127.0.0.1:3000" \
029 --acme-domain "plc.wtf" \
30- --acme-cache-dir ./acme-cache \
31- --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
0000000000000000032 ```
3334···61- monitoring of the various tasks
62- health check pings
63- expose metrics/tracing
64-- read-only flag for mirror wrapper
65- bundle: write directly to s3-compatible object storage
66- helpers for automating periodic `bundle` runs
676869### new things
7071-- experimental: websocket version of /export
72-- experimental: accept writes by forwarding them upstream
73-- experimental: serve a tlog
74-- experimental: embed a log database directly for fast and efficient mirroring
75-- experimental: support multiple upstreams?
7677- [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
78- [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
···26 sudo allegedly mirror \
27 --upstream "https://plc.directory" \
28 --wrap "http://127.0.0.1:3000" \
29+ --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \
30 --acme-domain "plc.wtf" \
31+ --acme-domain "alt.plc.wtf" \
32+ --experimental-acme-domain "experimental.plc.wtf" \
33+ --acme-cache-path ./acme-cache \
34+ --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \
35+ --acme-ipv6 \
36+ --experimental-write-upstream
37+ ```
38+39+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
40+41+ ```bash
42+ sudo allegedly wrap \
43+ --wrap "http://127.0.0.1:3000" \
44+ --acme-ipv6 \
45+ --acme-cache-path ./acme-cache \
46+ --acme-domain "plc.wtf" \
47+ --experimental-acme-domain "experimental.plc.wtf" \
48+ --experimental-write-upstream \
49+ --upstream "https://plc.wtf"
50 ```
5152···79- monitoring of the various tasks
80- health check pings
81- expose metrics/tracing
82+- [x] read-only flag for mirror wrapper
83- bundle: write directly to s3-compatible object storage
84- helpers for automating periodic `bundle` runs
858687### new things
8889+- [ ] experimental: websocket version of /export
90+- [x] experimental: accept writes by forwarding them upstream
91+- [ ] experimental: serve a tlog
92+- [ ] experimental: embed a log database directly for fast and efficient mirroring
93+- [ ] experimental: support multiple upstreams?
9495- [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
96- [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
+12-5
src/backfill.rs
···13 dest: mpsc::Sender<ExportPage>,
14 source_workers: usize,
15 until: Option<Dt>,
16-) -> anyhow::Result<()> {
17 // queue up the week bundles that should be available
18 let weeks = Arc::new(Mutex::new(
19 until
···39 while let Some(week) = weeks.lock().await.pop() {
40 let when = Into::<Dt>::into(week).to_rfc3339();
41 log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
42- week_to_pages(source.clone(), week, dest.clone()).await?;
0043 }
44 log::info!("done with the weeks ig");
45 Ok(())
···5051 // wait for the big backfill to finish
52 while let Some(res) = workers.join_next().await {
53- res??;
054 }
55- log::info!("finished fetching backfill in {:?}", t_step.elapsed());
56- Ok(())
000057}
···13 dest: mpsc::Sender<ExportPage>,
14 source_workers: usize,
15 until: Option<Dt>,
16+) -> anyhow::Result<&'static str> {
17 // queue up the week bundles that should be available
18 let weeks = Arc::new(Mutex::new(
19 until
···39 while let Some(week) = weeks.lock().await.pop() {
40 let when = Into::<Dt>::into(week).to_rfc3339();
41 log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
42+ week_to_pages(source.clone(), week, dest.clone())
43+ .await
44+ .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
45 }
46 log::info!("done with the weeks ig");
47 Ok(())
···5253 // wait for the big backfill to finish
54 while let Some(res) = workers.join_next().await {
55+ res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
56+ .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
57 }
58+ log::info!(
59+ "finished fetching backfill in {:?}. senders remaining: {}",
60+ t_step.elapsed(),
61+ dest.strong_count()
62+ );
63+ Ok("backfill")
64}
+65-223
src/bin/allegedly.rs
···1-use allegedly::{
2- Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill,
3- backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
4-};
5use clap::{CommandFactory, Parser, Subcommand};
6-use reqwest::Url;
7-use std::{net::SocketAddr, path::PathBuf, time::Instant};
8-use tokio::sync::{mpsc, oneshot};
000910#[derive(Debug, Parser)]
11struct Cli {
12- /// Upstream PLC server
13- #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
14- #[clap(default_value = "https://plc.directory")]
15- upstream: Url,
16 #[command(subcommand)]
17 command: Commands,
18}
···21enum Commands {
22 /// Use weekly bundled ops to get a complete directory mirror FAST
23 Backfill {
24- /// Remote URL prefix to fetch bundles from
25- #[arg(long)]
26- #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
27- http: Url,
28- /// Local folder to fetch bundles from (overrides `http`)
29- #[arg(long)]
30- dir: Option<PathBuf>,
31- /// Parallel bundle fetchers
32- ///
33- /// Default: 4 for http fetches, 1 for local folder
34- #[arg(long)]
35- source_workers: Option<usize>,
36- /// Bulk load into did-method-plc-compatible postgres instead of stdout
37- ///
38- /// Pass a postgres connection url like "postgresql://localhost:5432"
39- #[arg(long)]
40- to_postgres: Option<Url>,
41- /// Delete all operations from the postgres db before starting
42- ///
43- /// only used if `--to-postgres` is present
44- #[arg(long, action)]
45- postgres_reset: bool,
46- /// Stop at the week ending before this date
47- #[arg(long)]
48- until: Option<Dt>,
49- /// After the weekly imports, poll upstream until we're caught up
50- #[arg(long, action)]
51- catch_up: bool,
52 },
53 /// Scrape a PLC server, collecting ops into weekly bundles
54 ///
···73 },
74 /// Wrap a did-method-plc server, syncing upstream and blocking op submits
75 Mirror {
76- /// the wrapped did-method-plc server
77- #[arg(long, env = "ALLEGEDLY_WRAP")]
78- wrap: Url,
79- /// the wrapped did-method-plc server's database (write access required)
80- #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
81- wrap_pg: Url,
82- /// wrapping server listen address
83- #[arg(short, long, env = "ALLEGEDLY_BIND")]
84- #[clap(default_value = "127.0.0.1:8000")]
85- bind: SocketAddr,
86- /// obtain a certificate from letsencrypt
87- ///
88- /// for now this will force listening on all interfaces at :80 and :443
89- /// (:80 will serve an "https required" error, *will not* redirect)
90- #[arg(
91- long,
92- conflicts_with("bind"),
93- requires("acme_cache_path"),
94- env = "ALLEGEDLY_ACME_DOMAIN"
95- )]
96- acme_domain: Vec<String>,
97- /// which local directory to keep the letsencrypt certs in
98- #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
99- acme_cache_path: Option<PathBuf>,
100- /// which public acme directory to use
101- ///
102- /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
103- #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
104- #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
105- acme_directory_url: Url,
106 },
107 /// Poll an upstream PLC server and log new ops to stdout
108 Tail {
···112 },
113}
114115-async fn pages_to_stdout(
116- mut rx: mpsc::Receiver<ExportPage>,
117- notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
118-) -> anyhow::Result<()> {
119- let mut last_at = None;
120- while let Some(page) = rx.recv().await {
121- for op in &page.ops {
122- println!("{op}");
123- }
124- if notify_last_at.is_some()
125- && let Some(s) = PageBoundaryState::new(&page)
126- {
127- last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
128 }
129 }
130- if let Some(notify) = notify_last_at {
131- log::trace!("notifying last_at: {last_at:?}");
132- if notify.send(last_at).is_err() {
133- log::error!("receiver for last_at dropped, can't notify");
134- };
135- }
136- Ok(())
137-}
138-139-/// page forwarder who drops its channels on receipt of a small page
140-///
141-/// PLC will return up to 1000 ops on a page, and returns full pages until it
142-/// has caught up, so this is a (hacky?) way to stop polling once we're caught up.
143-fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
144- let (tx, fwd) = mpsc::channel(1);
145- tokio::task::spawn(async move {
146- while let Some(page) = rx.recv().await
147- && page.ops.len() > 900
148- {
149- tx.send(page).await.unwrap();
150- }
151- });
152- fwd
153}
154155#[tokio::main]
156-async fn main() {
157 let args = Cli::parse();
158 let matches = Cli::command().get_matches();
159 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
160- bin_init(name);
000161162 let t0 = Instant::now();
163 match args.command {
164- Commands::Backfill {
165- http,
166- dir,
167- source_workers,
168- to_postgres,
169- postgres_reset,
170- until,
171- catch_up,
172- } => {
173- let (tx, rx) = mpsc::channel(32); // these are big pages
174- tokio::task::spawn(async move {
175- if let Some(dir) = dir {
176- log::info!("Reading weekly bundles from local folder {dir:?}");
177- backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
178- .await
179- .unwrap();
180- } else {
181- log::info!("Fetching weekly bundles from from {http}");
182- backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
183- .await
184- .unwrap();
185- }
186- });
187-188- // postgres writer will notify us as soon as the very last op's time is known
189- // so we can start catching up while pg is restoring indexes and stuff
190- let (notify_last_at, rx_last) = if catch_up {
191- let (tx, rx) = oneshot::channel();
192- (Some(tx), Some(rx))
193- } else {
194- (None, None)
195- };
196-197- let to_postgres_url_bulk = to_postgres.clone();
198- let bulk_out_write = tokio::task::spawn(async move {
199- if let Some(ref url) = to_postgres_url_bulk {
200- let db = Db::new(url.as_str()).await.unwrap();
201- backfill_to_pg(db, postgres_reset, rx, notify_last_at)
202- .await
203- .unwrap();
204- } else {
205- pages_to_stdout(rx, notify_last_at).await.unwrap();
206- }
207- });
208-209- if let Some(rx_last) = rx_last {
210- let mut upstream = args.upstream;
211- upstream.set_path("/export");
212- // wait until the time for `after` is known
213- let last_at = rx_last.await.unwrap();
214- log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
215- let (tx, rx) = mpsc::channel(256); // these are small pages
216- tokio::task::spawn(
217- async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
218- );
219- bulk_out_write.await.unwrap();
220- log::info!("writing catch-up pages");
221- let full_pages = full_pages(rx);
222- if let Some(url) = to_postgres {
223- let db = Db::new(url.as_str()).await.unwrap();
224- pages_to_pg(db, full_pages).await.unwrap();
225- } else {
226- pages_to_stdout(full_pages, None).await.unwrap();
227- }
228- }
229- }
230 Commands::Bundle {
231 dest,
232 after,
233 clobber,
234 } => {
235- let mut url = args.upstream;
236 url.set_path("/export");
0237 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
238- tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
239- log::trace!("ensuring output directory exists");
240- std::fs::create_dir_all(&dest).unwrap();
241- pages_to_weeks(rx, dest, clobber).await.unwrap();
242- }
243- Commands::Mirror {
244- wrap,
245- wrap_pg,
246- bind,
247- acme_domain,
248- acme_cache_path,
249- acme_directory_url,
250- } => {
251- let db = Db::new(wrap_pg.as_str()).await.unwrap();
252- let latest = db
253- .get_latest()
254- .await
255- .unwrap()
256- .expect("there to be at least one op in the db. did you backfill?");
257-258- let (tx, rx) = mpsc::channel(2);
259- // upstream poller
260- let mut url = args.upstream.clone();
261 tokio::task::spawn(async move {
262- log::info!("starting poll reader...");
263- url.set_path("/export");
264- tokio::task::spawn(
265- async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
266- );
267- });
268- // db writer
269- let poll_db = db.clone();
270- tokio::task::spawn(async move {
271- log::info!("starting db writer...");
272- pages_to_pg(poll_db, rx).await.unwrap();
273 });
274-275- let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
276- (_, false, Some(cache_path)) => ListenConf::Acme {
277- domains: acme_domain,
278- cache_path,
279- directory_url: acme_directory_url.to_string(),
280- },
281- (bind, true, None) => ListenConf::Bind(bind),
282- (_, _, _) => unreachable!(),
283- };
284-285- serve(&args.upstream, wrap, listen_conf).await.unwrap();
286 }
00287 Commands::Tail { after } => {
288- let mut url = args.upstream;
289 url.set_path("/export");
290 let start_at = after.or_else(|| Some(chrono::Utc::now()));
0291 let (tx, rx) = mpsc::channel(1);
292- tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
293- pages_to_stdout(rx, None).await.unwrap();
000000294 }
295 }
296 log::info!("whew, {:?}. goodbye!", t0.elapsed());
0297}