use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; use anyhow::{Context, Result}; use onis_common::metrics::PrometheusHandle; use sqlx::SqlitePool; use tokio::sync::RwLock; use onis_common::config::DatabaseConfig; use onis_common::db; use crate::tap::{RecordAction, TapEvent, TapRecordEvent}; pub struct AppState { /// Reverse index: domain → DID, verification status pub index: SqlitePool, /// Base directory for per-DID SQLite databases pub db_dir: PathBuf, /// Database pool configuration pub db_config: DatabaseConfig, /// Cache of open per-DID database pools pub user_dbs: RwLock>, /// Prometheus metrics handle for /metrics endpoint pub metrics_handle: PrometheusHandle, } impl AppState { pub async fn new( index_path: &std::path::Path, db_dir: PathBuf, db_config: DatabaseConfig, metrics_handle: PrometheusHandle, ) -> Result { let index = db::open_index_db(index_path, &db_config).await?; Ok(Self { index, db_dir, db_config, user_dbs: RwLock::new(std::collections::HashMap::new()), metrics_handle, }) } /// Get or create a per-DID database pool. pub async fn get_user_db(&self, did: &str) -> Result { { let dbs = self.user_dbs.read().await; if let Some(pool) = dbs.get(did) { return Ok(pool.clone()); } } let path = db::did_to_db_path(&self.db_dir, did); let pool = db::open_user_db(&path, &self.db_config).await?; let mut dbs = self.user_dbs.write().await; dbs.insert(did.to_string(), pool.clone()); Ok(pool) } } /// Look up the domain for a record by rkey, then delete the row. /// Returns the domain if the record existed. async fn delete_by_rkey(pool: &SqlitePool, table: &str, rkey: &str) -> Result> { let select = format!("SELECT domain FROM {table} WHERE rkey = ?"); let domain: Option<(String,)> = sqlx::query_as(&select) .bind(rkey) .fetch_optional(pool) .await?; let delete = format!("DELETE FROM {table} WHERE rkey = ?"); sqlx::query(&delete) .bind(rkey) .execute(pool) .await .with_context(|| format!("failed to delete from {table}"))?; Ok(domain.map(|(d,)| d)) } /// Yield each candidate zone for a domain by walking up the tree. /// e.g. "a.b.example.com" → ["a.b.example.com", "b.example.com", "example.com"] pub(crate) fn domain_ancestors(domain: &str) -> impl Iterator + '_ { let parts: Vec<&str> = domain.split('.').collect(); (0..parts.len().saturating_sub(1)).map(move |i| parts[i..].join(".")) } /// Check if the user has a declared zone that covers the given domain. async fn has_zone_for(pool: &SqlitePool, domain: &str) -> Result { for candidate in domain_ancestors(domain) { let found: Option<(i64,)> = sqlx::query_as("SELECT 1 FROM zones WHERE domain = ?") .bind(&candidate) .fetch_optional(pool) .await?; if found.is_some() { return Ok(true); } } Ok(false) } pub async fn handle_event(state: Arc, event: TapEvent) -> Result<()> { match event.event_type.as_str() { "record" => { let rec = event.record.context("record event missing record field")?; let collection = match rec.collection.as_str() { "systems.kiri.zone" => "zone", "systems.kiri.dns" => "dns", _ => return Ok(()), }; let action = match rec.action { RecordAction::Create => "create", RecordAction::Update => "update", RecordAction::Delete => "delete", }; let result = match collection { "zone" => handle_zone_event(state, rec).await, _ => handle_record_event(state, rec).await, }; let status = if result.is_ok() { "success" } else { "error" }; metrics::counter!( "appview_firehose_events_total", "collection" => collection, "action" => action, "status" => status, ) .increment(1); result } "identity" => { if let Some(ident) = event.identity { tracing::debug!( did = %ident.did, handle = %ident.handle, active = ident.is_active, "identity event" ); } Ok(()) } other => { tracing::debug!("ignoring tap event type: {other}"); Ok(()) } } } async fn handle_record_event(state: Arc, event: TapRecordEvent) -> Result<()> { let did = &event.did; let rkey = &event.rkey; match event.action { RecordAction::Create | RecordAction::Update => { let record_value = event .record .context("create/update event missing record payload")?; let domain = record_value .get("domain") .and_then(|v| v.as_str()) .context("record missing domain field")? .to_lowercase(); let record_type = record_value .get("record") .and_then(|v| v.get("$type")) .and_then(|v| v.as_str()) .context("record missing $type")?; let type_name = record_type .strip_prefix("systems.kiri.dns#") .context("unexpected $type prefix")? .trim_end_matches("Record"); let user_db = state.get_user_db(did).await?; if !has_zone_for(&user_db, &domain).await? { tracing::warn!( did = %did, domain = %domain, "record for {domain} has no matching zone, skipping" ); metrics::counter!("appview_records_skipped_no_zone_total").increment(1); return Ok(()); } let data = serde_json::to_string(&record_value)?; let now = chrono::Utc::now().timestamp(); let write_start = Instant::now(); sqlx::query( "INSERT INTO records (rkey, domain, record_type, data, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(rkey) DO UPDATE SET domain = excluded.domain, record_type = excluded.record_type, data = excluded.data, updated_at = excluded.updated_at", ) .bind(rkey) .bind(&domain) .bind(type_name) .bind(&data) .bind(now) .bind(now) .execute(&user_db) .await .context("failed to upsert record")?; metrics::histogram!("appview_sqlite_write_duration_seconds") .record(write_start.elapsed().as_secs_f64()); tracing::info!( did = %did, rkey = %rkey, domain = %domain, record_type = %type_name, live = event.live, "record upserted" ); } RecordAction::Delete => { let Ok(user_db) = state.get_user_db(did).await else { tracing::debug!(did = %did, rkey = %rkey, "delete for unknown DID, skipping"); return Ok(()); }; let write_start = Instant::now(); let domain = delete_by_rkey(&user_db, "records", rkey).await?; metrics::histogram!("appview_sqlite_write_duration_seconds") .record(write_start.elapsed().as_secs_f64()); if let Some(domain) = domain { tracing::info!( did = %did, rkey = %rkey, domain = %domain, live = event.live, "record deleted" ); } } } Ok(()) } async fn handle_zone_event(state: Arc, event: TapRecordEvent) -> Result<()> { let did = &event.did; let rkey = &event.rkey; match event.action { RecordAction::Create | RecordAction::Update => { let record_value = event .record .context("create/update zone event missing record payload")?; let domain = record_value .get("domain") .and_then(|v| v.as_str()) .context("zone record missing domain field")? .to_lowercase(); let now = chrono::Utc::now().timestamp(); let user_db = state.get_user_db(did).await?; let write_start = Instant::now(); sqlx::query( "INSERT INTO zones (rkey, domain, created_at) VALUES (?, ?, ?) ON CONFLICT(rkey) DO UPDATE SET domain = excluded.domain", ) .bind(rkey) .bind(&domain) .bind(now) .execute(&user_db) .await .context("failed to upsert zone")?; sqlx::query( "INSERT INTO zone_index (zone, did, first_seen) VALUES (?, ?, ?) ON CONFLICT(zone, did) DO NOTHING", ) .bind(&domain) .bind(did) .bind(now) .execute(&state.index) .await .context("failed to update zone index")?; metrics::histogram!("appview_sqlite_write_duration_seconds") .record(write_start.elapsed().as_secs_f64()); tracing::info!( did = %did, rkey = %rkey, domain = %domain, live = event.live, "zone upserted" ); } RecordAction::Delete => { let Ok(user_db) = state.get_user_db(did).await else { tracing::debug!(did = %did, rkey = %rkey, "zone delete for unknown DID, skipping"); return Ok(()); }; let write_start = Instant::now(); let domain = delete_by_rkey(&user_db, "zones", rkey).await?; if let Some(domain) = domain { sqlx::query("DELETE FROM zone_index WHERE zone = ? AND did = ?") .bind(&domain) .bind(did) .execute(&state.index) .await .context("failed to delete from zone index")?; metrics::histogram!("appview_sqlite_write_duration_seconds") .record(write_start.elapsed().as_secs_f64()); tracing::info!( did = %did, rkey = %rkey, domain = %domain, live = event.live, "zone deleted" ); } } } Ok(()) }