Server tools to backfill, tail, mirror, and verify PLC logs

fjall: fix mirror test to wait for backfill to receive page

ptr.pet 6dbca756 ccedaa32

verified
+61 -54
+61 -54
tests/fjall_mirror_test.rs
··· 1 1 use allegedly::{ 2 - ExperimentalConf, FjallDb, ListenConf, backfill_to_fjall, poll_upstream, serve_fjall, 2 + ExperimentalConf, FjallDb, ListenConf, backfill_to_fjall, bin::bin_init, poll_upstream, 3 + serve_fjall, 3 4 }; 4 - use reqwest::Url; 5 + use futures::TryFutureExt; 6 + use reqwest::{StatusCode, Url}; 5 7 use std::time::Duration; 6 8 use tokio::sync::mpsc; 7 9 8 10 #[tokio::test] 9 11 async fn test_fjall_mirror_mode() -> anyhow::Result<()> { 10 - let _ = tracing_subscriber::fmt::try_init(); 11 - 12 - // setup 12 + bin_init(false); 13 13 let temp_dir = tempfile::tempdir()?; 14 14 let db_path = temp_dir.path().join("fjall.db"); 15 15 let db = FjallDb::open(&db_path)?; ··· 18 18 let (backfill_tx, backfill_rx) = mpsc::channel(1); 19 19 let (upstream_tx, mut upstream_rx) = mpsc::channel(1); 20 20 21 + let upstream_url: Url = "https://plc.directory".parse()?; 22 + 21 23 // spawn upstream poller 22 - let upstream_url: Url = "https://plc.directory/export".parse()?; 23 - tokio::spawn(async move { 24 - // poll fresh data so our data matches the upstream 25 - let start_at = chrono::Utc::now() - chrono::Duration::try_minutes(5).unwrap(); 26 - let _ = poll_upstream( 27 - Some(start_at), 28 - upstream_url, 29 - Duration::from_millis(100), 30 - upstream_tx, 31 - ) 32 - .await; 24 + tokio::spawn({ 25 + let mut base = upstream_url.clone(); 26 + base.set_path("/export"); 27 + async move { 28 + // poll fresh data so our data matches the upstream 29 + let start_at = chrono::Utc::now() - chrono::Duration::try_minutes(5).unwrap(); 30 + let _ = poll_upstream( 31 + Some(start_at), 32 + base, 33 + Duration::from_millis(100), 34 + upstream_tx, 35 + ) 36 + .inspect_err(|err| log::error!("failed to poll upstream: {err}")) 37 + .await; 38 + } 33 39 }); 34 40 35 - // bridge: take 1 page from upstream and forward to backfill 36 - println!("waiting for page from upstream..."); 41 + log::info!("waiting for page from upstream..."); 37 42 let page = upstream_rx 38 43 .recv() 39 44 .await 40 45 .expect("to receive page from upstream"); 41 - println!("received page with {} ops", page.ops.len()); 46 + log::info!("received page with {} ops", page.ops.len()); 42 47 let sample_did = page.ops.last().unwrap().did.clone(); 48 + println!("will check did {sample_did}"); 43 49 44 50 backfill_tx.send(page).await?; 45 - drop(backfill_tx); // close backfill input 46 - 47 - backfill_to_fjall(db.clone(), false, backfill_rx, None).await?; 51 + let backfill_handle = tokio::spawn(backfill_to_fjall(db.clone(), false, backfill_rx, None)); 52 + // since we are using a channel with 1 capacity, we can wait that the backfill task received 53 + // the page by reserving on the channel, and then drop the sender to signal the backfill task to finish 54 + let _ = backfill_tx.reserve().await; 55 + drop(backfill_tx); 56 + backfill_handle.await??; 48 57 49 - // get free port 58 + // todo: should probably use a random port here but shrug 50 59 let listener = std::net::TcpListener::bind("127.0.0.1:17548")?; 51 60 let port = listener.local_addr()?.port(); 52 61 drop(listener); ··· 57 66 write_upstream: false, 58 67 }; 59 68 60 - let db_for_server = db.clone(); 61 - let server_handle = tokio::spawn(async move { 62 - let upstream: Url = "https://plc.directory".parse().unwrap(); 63 - serve_fjall(upstream, listen_conf, exp_conf, db_for_server).await 69 + let server_handle = tokio::spawn({ 70 + let db = db.clone(); 71 + let upstream = upstream_url.clone(); 72 + serve_fjall(upstream, listen_conf, exp_conf, db) 73 + .inspect_err(|err| log::error!("failed to serve: {err}")) 64 74 }); 75 + let base_url = format!("http://127.0.0.1:{}", port); 65 76 66 - // wait for server to be ready (retry loop) 77 + // wait for server to be ready 67 78 let client = reqwest::Client::new(); 68 - let base_url = format!("http://127.0.0.1:{}", port); 69 - let mut ready = false; 79 + let health_url = format!("{base_url}/_health"); 80 + let mut ready = None; 70 81 for _ in 0..50 { 71 - if client 72 - .get(format!("{}/_health", base_url)) 73 - .send() 74 - .await 75 - .is_ok() 76 - { 77 - ready = true; 82 + let resp = match client.get(&health_url).send().await { 83 + Ok(resp) => resp, 84 + Err(err) => { 85 + log::warn!("failed to get health: {err}"); 86 + continue; 87 + } 88 + }; 89 + if resp.status().is_success() { 90 + let json: serde_json::Value = resp.json().await?; 91 + ready = Some(json); 78 92 break; 79 93 } 80 94 tokio::time::sleep(Duration::from_millis(100)).await; 81 95 } 82 - assert!(ready, "server failed to start"); 83 - 84 - // verify health 85 - let resp = client.get(format!("{}/_health", base_url)).send().await?; 86 - assert!(resp.status().is_success()); 87 - let json: serde_json::Value = resp.json().await?; 88 - assert_eq!(json["server"], "allegedly (mirror/fjall)"); 96 + assert!(ready.is_some(), "server failed to start"); 97 + assert_eq!(ready.unwrap()["server"], "allegedly (mirror/fjall)"); 89 98 90 99 // verify did resolution against upstream 91 - let upstream_resp = client 92 - .get(format!("https://plc.directory/{}", sample_did)) 93 - .send() 94 - .await?; 95 - assert!(upstream_resp.status().is_success()); 100 + let mut doc_url = upstream_url.clone(); 101 + doc_url.set_path(&format!("/{sample_did}")); 102 + let upstream_resp = client.get(doc_url).send().await?; 103 + assert_eq!(upstream_resp.status(), StatusCode::OK); 96 104 let upstream_doc: serde_json::Value = upstream_resp.json().await?; 97 105 98 - let resp = client 99 - .get(format!("{}/{}", base_url, sample_did)) 100 - .send() 101 - .await?; 102 - assert!(resp.status().is_success()); 106 + let local_doc_url = format!("{base_url}/{sample_did}"); 107 + let resp = client.get(local_doc_url).send().await?; 108 + assert_eq!(resp.status(), StatusCode::OK); 103 109 let doc: serde_json::Value = resp.json().await?; 110 + 104 111 assert_eq!( 105 112 doc, upstream_doc, 106 113 "local doc != upstream doc.\nlocal: {:#?}\nupstream: {:#?}",