Mirror of https://git.olaren.dev/Olaren/moot-graph
1use axum::{Router, extract::State, routing::get};
2use chrono::{DateTime, NaiveDateTime, Utc};
3use futures::StreamExt as _;
4use listenfd::ListenFd;
5use maud::{DOCTYPE, Markup, html};
6use serde_json::Value as JsonValue;
7use sqlx::postgres::PgPoolOptions;
8use sqlx::{Error, PgPool};
9use tokio::net::TcpListener;
10use tower_http::trace::TraceLayer;
11
12pub mod firehose;
13use firehose::{FirehoseEvent, FirehoseOptions, subscribe_repos};
14
15mod pds;
16use pds::get_all_active_dids_from_pdses;
17
18type Db = PgPool;
19
20#[tokio::main]
21async fn main() -> Result<(), Box<dyn std::error::Error>> {
22 tracing_subscriber::fmt::init();
23 dotenvy::dotenv().ok();
24
25 let pds_hosts_str = std::env::var("PDS_LIST")?;
26
27 let pds_hosts: Vec<String> = pds_hosts_str
28 .split(',')
29 .map(|s| s.trim().to_string())
30 .filter(|s| !s.is_empty())
31 .collect();
32
33 if pds_hosts.is_empty() {
34 tracing::error!("Error: PDS_LIST environment variable is empty or contains only commas.");
35 return Ok(());
36 }
37
38 tracing::info!("Querying {} PDS(es): {:?}", pds_hosts.len(), pds_hosts);
39
40 let _all_dids = get_all_active_dids_from_pdses(&pds_hosts).await?;
41
42 let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
43 let pool = PgPoolOptions::new()
44 .max_connections(5)
45 .connect(&db_url)
46 .await?;
47
48 sqlx::migrate!("./migrations").run(&pool).await?;
49
50 tracing::info!("Database connected and migrations are up to date.");
51
52 let web_server_pool = pool.clone();
53 tokio::spawn(async move { web_server(web_server_pool).await });
54
55 let relay_url = std::env::var("RELAY_URL").unwrap_or_default();
56 firehose_subscriber(pool, relay_url).await;
57
58 Ok(())
59}
60
61async fn firehose_subscriber(db: Db, relay_url: String) {
62 tracing::info!("Starting firehose subscriber...");
63
64 let options = if relay_url.is_empty() {
65 FirehoseOptions::default()
66 } else {
67 FirehoseOptions {
68 relay_url,
69 ..Default::default()
70 }
71 };
72
73 let mut stream = Box::pin(subscribe_repos(options));
74
75 while let Some(event_result) = stream.next().await {
76 match event_result {
77 Ok(FirehoseEvent::Commit(commit)) => {
78 tracing::debug!(
79 "Received a commit from {} with {} ops",
80 commit.repo,
81 commit.ops.len()
82 );
83
84 match serde_json::to_value(&commit) {
85 Ok(json_value) => {
86 if let Err(e) = create_firehose_record(&db, &json_value).await {
87 tracing::error!("Failed to write record to DB: {}", e);
88 }
89 }
90 Err(e) => {
91 tracing::error!("Failed to serialize commit to JSON: {}", e);
92 }
93 }
94 }
95 Ok(event) => {
96 tracing::info!("Received other event: {:?}", event);
97 }
98 Err(e) => {
99 tracing::error!("Firehose stream error: {}", e);
100 }
101 }
102 }
103}
104
105async fn web_server(db: Db) {
106 tracing::info!("Spinning up web server...");
107
108 let app = Router::new()
109 .route("/", get(index))
110 .with_state(db)
111 .layer(TraceLayer::new_for_http());
112
113 let mut listenfd = ListenFd::from_env();
114 let listener = match listenfd.take_tcp_listener(0).unwrap() {
115 // if we are given a tcp listener on listen fd 0, we use that one
116 Some(listener) => {
117 listener.set_nonblocking(true).unwrap();
118 TcpListener::from_std(listener).unwrap()
119 }
120 // otherwise fall back to local listening
121 _none => TcpListener::bind("127.0.0.1:3000").await.unwrap(),
122 };
123
124 tracing::info!("Web server started!");
125 axum::serve(listener, app).await.unwrap();
126}
127
128struct FirehoseMessage {
129 message: JsonValue,
130}
131
132async fn index(State(db): State<Db>) -> Markup {
133 let query_result = sqlx::query_as!(
134 FirehoseMessage,
135 "SELECT message FROM firehose_messages ORDER BY created_at DESC LIMIT 100"
136 )
137 .fetch_all(&db)
138 .await;
139
140 let items = match query_result {
141 Ok(messages) => messages,
142 Err(e) => {
143 tracing::error!("Failed to fetch messages from DB: {}", e);
144 vec![]
145 }
146 };
147
148 page(
149 "ShitSky",
150 html! {
151 p { "Hello from ShitSky" }
152 @for item in &items {
153 pre {
154 (serde_json::to_string_pretty(&item.message).unwrap_or_default())
155 }
156 }
157 },
158 )
159}
160
161fn page(title: &str, body: Markup) -> Markup {
162 html! {
163 (DOCTYPE)
164 html {
165 head {
166 meta lang="en";
167 title { (title) }
168 }
169 body {
170 (body)
171 }
172 }
173 }
174}
175
176async fn create_firehose_record(db: &PgPool, record: &JsonValue) -> Result<(), Error> {
177 let created_at_opt: Option<NaiveDateTime> = record
178 .get("time")
179 .and_then(|val| val.as_str())
180 .filter(|s| !s.is_empty())
181 .and_then(|s| s.parse::<DateTime<Utc>>().ok())
182 .map(|dt_utc| dt_utc.naive_utc());
183
184 if let Some(created_at) = created_at_opt {
185 sqlx::query!(
186 "INSERT INTO firehose_messages (message, created_at) VALUES ($1, $2)",
187 record,
188 created_at
189 )
190 .execute(db)
191 .await?;
192 } else {
193 sqlx::query!(
194 "INSERT INTO firehose_messages (message) VALUES ($1)",
195 record,
196 )
197 .execute(db)
198 .await?;
199 }
200
201 Ok(())
202}