auth dns over atproto
at main 339 lines 11 kB view raw
1use std::path::PathBuf; 2use std::sync::Arc; 3use std::time::Instant; 4 5use anyhow::{Context, Result}; 6use onis_common::metrics::PrometheusHandle; 7use sqlx::SqlitePool; 8use tokio::sync::RwLock; 9 10use onis_common::config::DatabaseConfig; 11use onis_common::db; 12 13use crate::tap::{RecordAction, TapEvent, TapRecordEvent}; 14 15pub struct AppState { 16 /// Reverse index: domain → DID, verification status 17 pub index: SqlitePool, 18 /// Base directory for per-DID SQLite databases 19 pub db_dir: PathBuf, 20 /// Database pool configuration 21 pub db_config: DatabaseConfig, 22 /// Cache of open per-DID database pools 23 pub user_dbs: RwLock<std::collections::HashMap<String, SqlitePool>>, 24 /// Prometheus metrics handle for /metrics endpoint 25 pub metrics_handle: PrometheusHandle, 26} 27 28impl AppState { 29 pub async fn new( 30 index_path: &std::path::Path, 31 db_dir: PathBuf, 32 db_config: DatabaseConfig, 33 metrics_handle: PrometheusHandle, 34 ) -> Result<Self> { 35 let index = db::open_index_db(index_path, &db_config).await?; 36 Ok(Self { 37 index, 38 db_dir, 39 db_config, 40 user_dbs: RwLock::new(std::collections::HashMap::new()), 41 metrics_handle, 42 }) 43 } 44 45 /// Get or create a per-DID database pool. 46 pub async fn get_user_db(&self, did: &str) -> Result<SqlitePool> { 47 { 48 let dbs = self.user_dbs.read().await; 49 if let Some(pool) = dbs.get(did) { 50 return Ok(pool.clone()); 51 } 52 } 53 54 let path = db::did_to_db_path(&self.db_dir, did); 55 let pool = db::open_user_db(&path, &self.db_config).await?; 56 57 let mut dbs = self.user_dbs.write().await; 58 dbs.insert(did.to_string(), pool.clone()); 59 Ok(pool) 60 } 61} 62 63/// Look up the domain for a record by rkey, then delete the row. 64/// Returns the domain if the record existed. 65async fn delete_by_rkey(pool: &SqlitePool, table: &str, rkey: &str) -> Result<Option<String>> { 66 let select = format!("SELECT domain FROM {table} WHERE rkey = ?"); 67 let domain: Option<(String,)> = sqlx::query_as(&select) 68 .bind(rkey) 69 .fetch_optional(pool) 70 .await?; 71 72 let delete = format!("DELETE FROM {table} WHERE rkey = ?"); 73 sqlx::query(&delete) 74 .bind(rkey) 75 .execute(pool) 76 .await 77 .with_context(|| format!("failed to delete from {table}"))?; 78 79 Ok(domain.map(|(d,)| d)) 80} 81 82/// Yield each candidate zone for a domain by walking up the tree. 83/// e.g. "a.b.example.com" → ["a.b.example.com", "b.example.com", "example.com"] 84pub(crate) fn domain_ancestors(domain: &str) -> impl Iterator<Item = String> + '_ { 85 let parts: Vec<&str> = domain.split('.').collect(); 86 (0..parts.len().saturating_sub(1)).map(move |i| parts[i..].join(".")) 87} 88 89/// Check if the user has a declared zone that covers the given domain. 90async fn has_zone_for(pool: &SqlitePool, domain: &str) -> Result<bool> { 91 for candidate in domain_ancestors(domain) { 92 let found: Option<(i64,)> = 93 sqlx::query_as("SELECT 1 FROM zones WHERE domain = ?") 94 .bind(&candidate) 95 .fetch_optional(pool) 96 .await?; 97 if found.is_some() { 98 return Ok(true); 99 } 100 } 101 Ok(false) 102} 103 104pub async fn handle_event(state: Arc<AppState>, event: TapEvent) -> Result<()> { 105 match event.event_type.as_str() { 106 "record" => { 107 let rec = event.record.context("record event missing record field")?; 108 let collection = match rec.collection.as_str() { 109 "systems.kiri.zone" => "zone", 110 "systems.kiri.dns" => "dns", 111 _ => return Ok(()), 112 }; 113 let action = match rec.action { 114 RecordAction::Create => "create", 115 RecordAction::Update => "update", 116 RecordAction::Delete => "delete", 117 }; 118 119 let result = match collection { 120 "zone" => handle_zone_event(state, rec).await, 121 _ => handle_record_event(state, rec).await, 122 }; 123 124 let status = if result.is_ok() { "success" } else { "error" }; 125 metrics::counter!( 126 "appview_firehose_events_total", 127 "collection" => collection, 128 "action" => action, 129 "status" => status, 130 ) 131 .increment(1); 132 133 result 134 } 135 "identity" => { 136 if let Some(ident) = event.identity { 137 tracing::debug!( 138 did = %ident.did, 139 handle = %ident.handle, 140 active = ident.is_active, 141 "identity event" 142 ); 143 } 144 Ok(()) 145 } 146 other => { 147 tracing::debug!("ignoring tap event type: {other}"); 148 Ok(()) 149 } 150 } 151} 152 153async fn handle_record_event(state: Arc<AppState>, event: TapRecordEvent) -> Result<()> { 154 let did = &event.did; 155 let rkey = &event.rkey; 156 157 match event.action { 158 RecordAction::Create | RecordAction::Update => { 159 let record_value = event 160 .record 161 .context("create/update event missing record payload")?; 162 163 let domain = record_value 164 .get("domain") 165 .and_then(|v| v.as_str()) 166 .context("record missing domain field")? 167 .to_lowercase(); 168 169 let record_type = record_value 170 .get("record") 171 .and_then(|v| v.get("$type")) 172 .and_then(|v| v.as_str()) 173 .context("record missing $type")?; 174 175 let type_name = record_type 176 .strip_prefix("systems.kiri.dns#") 177 .context("unexpected $type prefix")? 178 .trim_end_matches("Record"); 179 180 let user_db = state.get_user_db(did).await?; 181 182 if !has_zone_for(&user_db, &domain).await? { 183 tracing::warn!( 184 did = %did, 185 domain = %domain, 186 "record for {domain} has no matching zone, skipping" 187 ); 188 metrics::counter!("appview_records_skipped_no_zone_total").increment(1); 189 return Ok(()); 190 } 191 192 let data = serde_json::to_string(&record_value)?; 193 let now = chrono::Utc::now().timestamp(); 194 195 let write_start = Instant::now(); 196 sqlx::query( 197 "INSERT INTO records (rkey, domain, record_type, data, created_at, updated_at) 198 VALUES (?, ?, ?, ?, ?, ?) 199 ON CONFLICT(rkey) DO UPDATE SET 200 domain = excluded.domain, 201 record_type = excluded.record_type, 202 data = excluded.data, 203 updated_at = excluded.updated_at", 204 ) 205 .bind(rkey) 206 .bind(&domain) 207 .bind(type_name) 208 .bind(&data) 209 .bind(now) 210 .bind(now) 211 .execute(&user_db) 212 .await 213 .context("failed to upsert record")?; 214 metrics::histogram!("appview_sqlite_write_duration_seconds") 215 .record(write_start.elapsed().as_secs_f64()); 216 217 tracing::info!( 218 did = %did, 219 rkey = %rkey, 220 domain = %domain, 221 record_type = %type_name, 222 live = event.live, 223 "record upserted" 224 ); 225 } 226 227 RecordAction::Delete => { 228 let Ok(user_db) = state.get_user_db(did).await else { 229 tracing::debug!(did = %did, rkey = %rkey, "delete for unknown DID, skipping"); 230 return Ok(()); 231 }; 232 233 let write_start = Instant::now(); 234 let domain = delete_by_rkey(&user_db, "records", rkey).await?; 235 metrics::histogram!("appview_sqlite_write_duration_seconds") 236 .record(write_start.elapsed().as_secs_f64()); 237 238 if let Some(domain) = domain { 239 tracing::info!( 240 did = %did, 241 rkey = %rkey, 242 domain = %domain, 243 live = event.live, 244 "record deleted" 245 ); 246 } 247 } 248 } 249 250 Ok(()) 251} 252 253async fn handle_zone_event(state: Arc<AppState>, event: TapRecordEvent) -> Result<()> { 254 let did = &event.did; 255 let rkey = &event.rkey; 256 257 match event.action { 258 RecordAction::Create | RecordAction::Update => { 259 let record_value = event 260 .record 261 .context("create/update zone event missing record payload")?; 262 263 let domain = record_value 264 .get("domain") 265 .and_then(|v| v.as_str()) 266 .context("zone record missing domain field")? 267 .to_lowercase(); 268 269 let now = chrono::Utc::now().timestamp(); 270 let user_db = state.get_user_db(did).await?; 271 272 let write_start = Instant::now(); 273 sqlx::query( 274 "INSERT INTO zones (rkey, domain, created_at) 275 VALUES (?, ?, ?) 276 ON CONFLICT(rkey) DO UPDATE SET domain = excluded.domain", 277 ) 278 .bind(rkey) 279 .bind(&domain) 280 .bind(now) 281 .execute(&user_db) 282 .await 283 .context("failed to upsert zone")?; 284 285 sqlx::query( 286 "INSERT INTO zone_index (zone, did, first_seen) 287 VALUES (?, ?, ?) 288 ON CONFLICT(zone, did) DO NOTHING", 289 ) 290 .bind(&domain) 291 .bind(did) 292 .bind(now) 293 .execute(&state.index) 294 .await 295 .context("failed to update zone index")?; 296 metrics::histogram!("appview_sqlite_write_duration_seconds") 297 .record(write_start.elapsed().as_secs_f64()); 298 299 tracing::info!( 300 did = %did, 301 rkey = %rkey, 302 domain = %domain, 303 live = event.live, 304 "zone upserted" 305 ); 306 } 307 308 RecordAction::Delete => { 309 let Ok(user_db) = state.get_user_db(did).await else { 310 tracing::debug!(did = %did, rkey = %rkey, "zone delete for unknown DID, skipping"); 311 return Ok(()); 312 }; 313 314 let write_start = Instant::now(); 315 let domain = delete_by_rkey(&user_db, "zones", rkey).await?; 316 317 if let Some(domain) = domain { 318 sqlx::query("DELETE FROM zone_index WHERE zone = ? AND did = ?") 319 .bind(&domain) 320 .bind(did) 321 .execute(&state.index) 322 .await 323 .context("failed to delete from zone index")?; 324 metrics::histogram!("appview_sqlite_write_duration_seconds") 325 .record(write_start.elapsed().as_secs_f64()); 326 327 tracing::info!( 328 did = %did, 329 rkey = %rkey, 330 domain = %domain, 331 live = event.live, 332 "zone deleted" 333 ); 334 } 335 } 336 } 337 338 Ok(()) 339}