use axum::{Router, extract::State, routing::get}; use chrono::{DateTime, NaiveDateTime, Utc}; use futures::StreamExt as _; use listenfd::ListenFd; use maud::{DOCTYPE, Markup, html}; use serde_json::Value as JsonValue; use sqlx::postgres::PgPoolOptions; use sqlx::{Error, PgPool}; use tokio::net::TcpListener; use tower_http::trace::TraceLayer; pub mod firehose; use firehose::{FirehoseEvent, FirehoseOptions, subscribe_repos}; mod pds; use pds::get_all_active_dids_from_pdses; type Db = PgPool; #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); dotenvy::dotenv().ok(); let pds_hosts_str = std::env::var("PDS_LIST")?; let pds_hosts: Vec = pds_hosts_str .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); if pds_hosts.is_empty() { tracing::error!("Error: PDS_LIST environment variable is empty or contains only commas."); return Ok(()); } tracing::info!("Querying {} PDS(es): {:?}", pds_hosts.len(), pds_hosts); let _all_dids = get_all_active_dids_from_pdses(&pds_hosts).await?; let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let pool = PgPoolOptions::new() .max_connections(5) .connect(&db_url) .await?; sqlx::migrate!("./migrations").run(&pool).await?; tracing::info!("Database connected and migrations are up to date."); let web_server_pool = pool.clone(); tokio::spawn(async move { web_server(web_server_pool).await }); let relay_url = std::env::var("RELAY_URL").unwrap_or_default(); firehose_subscriber(pool, relay_url).await; Ok(()) } async fn firehose_subscriber(db: Db, relay_url: String) { tracing::info!("Starting firehose subscriber..."); let options = if relay_url.is_empty() { FirehoseOptions::default() } else { FirehoseOptions { relay_url, ..Default::default() } }; let mut stream = Box::pin(subscribe_repos(options)); while let Some(event_result) = stream.next().await { match event_result { Ok(FirehoseEvent::Commit(commit)) => { tracing::debug!( "Received a commit from {} with {} ops", commit.repo, commit.ops.len() ); match serde_json::to_value(&commit) { Ok(json_value) => { if let Err(e) = create_firehose_record(&db, &json_value).await { tracing::error!("Failed to write record to DB: {}", e); } } Err(e) => { tracing::error!("Failed to serialize commit to JSON: {}", e); } } } Ok(event) => { tracing::info!("Received other event: {:?}", event); } Err(e) => { tracing::error!("Firehose stream error: {}", e); } } } } async fn web_server(db: Db) { tracing::info!("Spinning up web server..."); let app = Router::new() .route("/", get(index)) .with_state(db) .layer(TraceLayer::new_for_http()); let mut listenfd = ListenFd::from_env(); let listener = match listenfd.take_tcp_listener(0).unwrap() { // if we are given a tcp listener on listen fd 0, we use that one Some(listener) => { listener.set_nonblocking(true).unwrap(); TcpListener::from_std(listener).unwrap() } // otherwise fall back to local listening _none => TcpListener::bind("127.0.0.1:3000").await.unwrap(), }; tracing::info!("Web server started!"); axum::serve(listener, app).await.unwrap(); } struct FirehoseMessage { message: JsonValue, } async fn index(State(db): State) -> Markup { let query_result = sqlx::query_as!( FirehoseMessage, "SELECT message FROM firehose_messages ORDER BY created_at DESC LIMIT 100" ) .fetch_all(&db) .await; let items = match query_result { Ok(messages) => messages, Err(e) => { tracing::error!("Failed to fetch messages from DB: {}", e); vec![] } }; page( "ShitSky", html! { p { "Hello from ShitSky" } @for item in &items { pre { (serde_json::to_string_pretty(&item.message).unwrap_or_default()) } } }, ) } fn page(title: &str, body: Markup) -> Markup { html! { (DOCTYPE) html { head { meta lang="en"; title { (title) } } body { (body) } } } } async fn create_firehose_record(db: &PgPool, record: &JsonValue) -> Result<(), Error> { let created_at_opt: Option = record .get("time") .and_then(|val| val.as_str()) .filter(|s| !s.is_empty()) .and_then(|s| s.parse::>().ok()) .map(|dt_utc| dt_utc.naive_utc()); if let Some(created_at) = created_at_opt { sqlx::query!( "INSERT INTO firehose_messages (message, created_at) VALUES ($1, $2)", record, created_at ) .execute(db) .await?; } else { sqlx::query!( "INSERT INTO firehose_messages (message) VALUES ($1)", record, ) .execute(db) .await?; } Ok(()) }