Mirror of https://git.olaren.dev/Olaren/moot-graph
at main 5.7 kB view raw
1use axum::{Router, extract::State, routing::get}; 2use chrono::{DateTime, NaiveDateTime, Utc}; 3use futures::StreamExt as _; 4use listenfd::ListenFd; 5use maud::{DOCTYPE, Markup, html}; 6use serde_json::Value as JsonValue; 7use sqlx::postgres::PgPoolOptions; 8use sqlx::{Error, PgPool}; 9use tokio::net::TcpListener; 10use tower_http::trace::TraceLayer; 11 12pub mod firehose; 13use firehose::{FirehoseEvent, FirehoseOptions, subscribe_repos}; 14 15mod pds; 16use pds::get_all_active_dids_from_pdses; 17 18type Db = PgPool; 19 20#[tokio::main] 21async fn main() -> Result<(), Box<dyn std::error::Error>> { 22 tracing_subscriber::fmt::init(); 23 dotenvy::dotenv().ok(); 24 25 let pds_hosts_str = std::env::var("PDS_LIST")?; 26 27 let pds_hosts: Vec<String> = pds_hosts_str 28 .split(',') 29 .map(|s| s.trim().to_string()) 30 .filter(|s| !s.is_empty()) 31 .collect(); 32 33 if pds_hosts.is_empty() { 34 tracing::error!("Error: PDS_LIST environment variable is empty or contains only commas."); 35 return Ok(()); 36 } 37 38 tracing::info!("Querying {} PDS(es): {:?}", pds_hosts.len(), pds_hosts); 39 40 let _all_dids = get_all_active_dids_from_pdses(&pds_hosts).await?; 41 42 let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); 43 let pool = PgPoolOptions::new() 44 .max_connections(5) 45 .connect(&db_url) 46 .await?; 47 48 sqlx::migrate!("./migrations").run(&pool).await?; 49 50 tracing::info!("Database connected and migrations are up to date."); 51 52 let web_server_pool = pool.clone(); 53 tokio::spawn(async move { web_server(web_server_pool).await }); 54 55 let relay_url = std::env::var("RELAY_URL").unwrap_or_default(); 56 firehose_subscriber(pool, relay_url).await; 57 58 Ok(()) 59} 60 61async fn firehose_subscriber(db: Db, relay_url: String) { 62 tracing::info!("Starting firehose subscriber..."); 63 64 let options = if relay_url.is_empty() { 65 FirehoseOptions::default() 66 } else { 67 FirehoseOptions { 68 relay_url, 69 ..Default::default() 70 } 71 }; 72 73 let mut stream = Box::pin(subscribe_repos(options)); 74 75 while let Some(event_result) = stream.next().await { 76 match event_result { 77 Ok(FirehoseEvent::Commit(commit)) => { 78 tracing::debug!( 79 "Received a commit from {} with {} ops", 80 commit.repo, 81 commit.ops.len() 82 ); 83 84 match serde_json::to_value(&commit) { 85 Ok(json_value) => { 86 if let Err(e) = create_firehose_record(&db, &json_value).await { 87 tracing::error!("Failed to write record to DB: {}", e); 88 } 89 } 90 Err(e) => { 91 tracing::error!("Failed to serialize commit to JSON: {}", e); 92 } 93 } 94 } 95 Ok(event) => { 96 tracing::info!("Received other event: {:?}", event); 97 } 98 Err(e) => { 99 tracing::error!("Firehose stream error: {}", e); 100 } 101 } 102 } 103} 104 105async fn web_server(db: Db) { 106 tracing::info!("Spinning up web server..."); 107 108 let app = Router::new() 109 .route("/", get(index)) 110 .with_state(db) 111 .layer(TraceLayer::new_for_http()); 112 113 let mut listenfd = ListenFd::from_env(); 114 let listener = match listenfd.take_tcp_listener(0).unwrap() { 115 // if we are given a tcp listener on listen fd 0, we use that one 116 Some(listener) => { 117 listener.set_nonblocking(true).unwrap(); 118 TcpListener::from_std(listener).unwrap() 119 } 120 // otherwise fall back to local listening 121 _none => TcpListener::bind("127.0.0.1:3000").await.unwrap(), 122 }; 123 124 tracing::info!("Web server started!"); 125 axum::serve(listener, app).await.unwrap(); 126} 127 128struct FirehoseMessage { 129 message: JsonValue, 130} 131 132async fn index(State(db): State<Db>) -> Markup { 133 let query_result = sqlx::query_as!( 134 FirehoseMessage, 135 "SELECT message FROM firehose_messages ORDER BY created_at DESC LIMIT 100" 136 ) 137 .fetch_all(&db) 138 .await; 139 140 let items = match query_result { 141 Ok(messages) => messages, 142 Err(e) => { 143 tracing::error!("Failed to fetch messages from DB: {}", e); 144 vec![] 145 } 146 }; 147 148 page( 149 "ShitSky", 150 html! { 151 p { "Hello from ShitSky" } 152 @for item in &items { 153 pre { 154 (serde_json::to_string_pretty(&item.message).unwrap_or_default()) 155 } 156 } 157 }, 158 ) 159} 160 161fn page(title: &str, body: Markup) -> Markup { 162 html! { 163 (DOCTYPE) 164 html { 165 head { 166 meta lang="en"; 167 title { (title) } 168 } 169 body { 170 (body) 171 } 172 } 173 } 174} 175 176async fn create_firehose_record(db: &PgPool, record: &JsonValue) -> Result<(), Error> { 177 let created_at_opt: Option<NaiveDateTime> = record 178 .get("time") 179 .and_then(|val| val.as_str()) 180 .filter(|s| !s.is_empty()) 181 .and_then(|s| s.parse::<DateTime<Utc>>().ok()) 182 .map(|dt_utc| dt_utc.naive_utc()); 183 184 if let Some(created_at) = created_at_opt { 185 sqlx::query!( 186 "INSERT INTO firehose_messages (message, created_at) VALUES ($1, $2)", 187 record, 188 created_at 189 ) 190 .execute(db) 191 .await?; 192 } else { 193 sqlx::query!( 194 "INSERT INTO firehose_messages (message) VALUES ($1)", 195 record, 196 ) 197 .execute(db) 198 .await?; 199 } 200 201 Ok(()) 202}