1use std::path::PathBuf;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::{Context, Result};
6use onis_common::metrics::PrometheusHandle;
7use sqlx::SqlitePool;
8use tokio::sync::RwLock;
9
10use onis_common::config::DatabaseConfig;
11use onis_common::db;
12
13use crate::tap::{RecordAction, TapEvent, TapRecordEvent};
14
/// Shared state handed to every event handler in this module.
pub struct AppState {
    /// Reverse index: domain → DID, verification status.
    /// In this file it receives the `zone_index` (zone, did, first_seen) writes.
    pub index: SqlitePool,
    /// Base directory for per-DID SQLite databases; combined with a DID via
    /// `db::did_to_db_path` to locate that user's database file.
    pub db_dir: PathBuf,
    /// Database pool configuration, passed to every `db::open_*` call.
    pub db_config: DatabaseConfig,
    /// Cache of open per-DID database pools, keyed by DID string.
    pub user_dbs: RwLock<std::collections::HashMap<String, SqlitePool>>,
    /// Prometheus metrics handle for /metrics endpoint
    pub metrics_handle: PrometheusHandle,
}
27
28impl AppState {
29 pub async fn new(
30 index_path: &std::path::Path,
31 db_dir: PathBuf,
32 db_config: DatabaseConfig,
33 metrics_handle: PrometheusHandle,
34 ) -> Result<Self> {
35 let index = db::open_index_db(index_path, &db_config).await?;
36 Ok(Self {
37 index,
38 db_dir,
39 db_config,
40 user_dbs: RwLock::new(std::collections::HashMap::new()),
41 metrics_handle,
42 })
43 }
44
45 /// Get or create a per-DID database pool.
46 pub async fn get_user_db(&self, did: &str) -> Result<SqlitePool> {
47 {
48 let dbs = self.user_dbs.read().await;
49 if let Some(pool) = dbs.get(did) {
50 return Ok(pool.clone());
51 }
52 }
53
54 let path = db::did_to_db_path(&self.db_dir, did);
55 let pool = db::open_user_db(&path, &self.db_config).await?;
56
57 let mut dbs = self.user_dbs.write().await;
58 dbs.insert(did.to_string(), pool.clone());
59 Ok(pool)
60 }
61}
62
63/// Look up the domain for a record by rkey, then delete the row.
64/// Returns the domain if the record existed.
65async fn delete_by_rkey(pool: &SqlitePool, table: &str, rkey: &str) -> Result<Option<String>> {
66 let select = format!("SELECT domain FROM {table} WHERE rkey = ?");
67 let domain: Option<(String,)> = sqlx::query_as(&select)
68 .bind(rkey)
69 .fetch_optional(pool)
70 .await?;
71
72 let delete = format!("DELETE FROM {table} WHERE rkey = ?");
73 sqlx::query(&delete)
74 .bind(rkey)
75 .execute(pool)
76 .await
77 .with_context(|| format!("failed to delete from {table}"))?;
78
79 Ok(domain.map(|(d,)| d))
80}
81
/// Walk up the DNS tree from `domain`, yielding every candidate zone name.
///
/// The full name comes first, then each parent obtained by dropping the
/// leftmost label; the final (rightmost) label on its own is never yielded.
/// e.g. "a.b.example.com" → ["a.b.example.com", "b.example.com", "example.com"]
///
/// A name without any `.` yields nothing.
pub(crate) fn domain_ancestors(domain: &str) -> impl Iterator<Item = String> + '_ {
    // Every label starts either at offset 0 or right after a dot. There is one
    // more label start than there are dots, so taking `dot_count` starts skips
    // exactly the last label.
    let dot_count = domain.matches('.').count();
    std::iter::once(0)
        .chain(domain.match_indices('.').map(|(pos, _)| pos + 1))
        .take(dot_count)
        .map(move |start| domain[start..].to_string())
}
88
89/// Check if the user has a declared zone that covers the given domain.
90async fn has_zone_for(pool: &SqlitePool, domain: &str) -> Result<bool> {
91 for candidate in domain_ancestors(domain) {
92 let found: Option<(i64,)> =
93 sqlx::query_as("SELECT 1 FROM zones WHERE domain = ?")
94 .bind(&candidate)
95 .fetch_optional(pool)
96 .await?;
97 if found.is_some() {
98 return Ok(true);
99 }
100 }
101 Ok(false)
102}
103
104pub async fn handle_event(state: Arc<AppState>, event: TapEvent) -> Result<()> {
105 match event.event_type.as_str() {
106 "record" => {
107 let rec = event.record.context("record event missing record field")?;
108 let collection = match rec.collection.as_str() {
109 "systems.kiri.zone" => "zone",
110 "systems.kiri.dns" => "dns",
111 _ => return Ok(()),
112 };
113 let action = match rec.action {
114 RecordAction::Create => "create",
115 RecordAction::Update => "update",
116 RecordAction::Delete => "delete",
117 };
118
119 let result = match collection {
120 "zone" => handle_zone_event(state, rec).await,
121 _ => handle_record_event(state, rec).await,
122 };
123
124 let status = if result.is_ok() { "success" } else { "error" };
125 metrics::counter!(
126 "appview_firehose_events_total",
127 "collection" => collection,
128 "action" => action,
129 "status" => status,
130 )
131 .increment(1);
132
133 result
134 }
135 "identity" => {
136 if let Some(ident) = event.identity {
137 tracing::debug!(
138 did = %ident.did,
139 handle = %ident.handle,
140 active = ident.is_active,
141 "identity event"
142 );
143 }
144 Ok(())
145 }
146 other => {
147 tracing::debug!("ignoring tap event type: {other}");
148 Ok(())
149 }
150 }
151}
152
153async fn handle_record_event(state: Arc<AppState>, event: TapRecordEvent) -> Result<()> {
154 let did = &event.did;
155 let rkey = &event.rkey;
156
157 match event.action {
158 RecordAction::Create | RecordAction::Update => {
159 let record_value = event
160 .record
161 .context("create/update event missing record payload")?;
162
163 let domain = record_value
164 .get("domain")
165 .and_then(|v| v.as_str())
166 .context("record missing domain field")?
167 .to_lowercase();
168
169 let record_type = record_value
170 .get("record")
171 .and_then(|v| v.get("$type"))
172 .and_then(|v| v.as_str())
173 .context("record missing $type")?;
174
175 let type_name = record_type
176 .strip_prefix("systems.kiri.dns#")
177 .context("unexpected $type prefix")?
178 .trim_end_matches("Record");
179
180 let user_db = state.get_user_db(did).await?;
181
182 if !has_zone_for(&user_db, &domain).await? {
183 tracing::warn!(
184 did = %did,
185 domain = %domain,
186 "record for {domain} has no matching zone, skipping"
187 );
188 metrics::counter!("appview_records_skipped_no_zone_total").increment(1);
189 return Ok(());
190 }
191
192 let data = serde_json::to_string(&record_value)?;
193 let now = chrono::Utc::now().timestamp();
194
195 let write_start = Instant::now();
196 sqlx::query(
197 "INSERT INTO records (rkey, domain, record_type, data, created_at, updated_at)
198 VALUES (?, ?, ?, ?, ?, ?)
199 ON CONFLICT(rkey) DO UPDATE SET
200 domain = excluded.domain,
201 record_type = excluded.record_type,
202 data = excluded.data,
203 updated_at = excluded.updated_at",
204 )
205 .bind(rkey)
206 .bind(&domain)
207 .bind(type_name)
208 .bind(&data)
209 .bind(now)
210 .bind(now)
211 .execute(&user_db)
212 .await
213 .context("failed to upsert record")?;
214 metrics::histogram!("appview_sqlite_write_duration_seconds")
215 .record(write_start.elapsed().as_secs_f64());
216
217 tracing::info!(
218 did = %did,
219 rkey = %rkey,
220 domain = %domain,
221 record_type = %type_name,
222 live = event.live,
223 "record upserted"
224 );
225 }
226
227 RecordAction::Delete => {
228 let Ok(user_db) = state.get_user_db(did).await else {
229 tracing::debug!(did = %did, rkey = %rkey, "delete for unknown DID, skipping");
230 return Ok(());
231 };
232
233 let write_start = Instant::now();
234 let domain = delete_by_rkey(&user_db, "records", rkey).await?;
235 metrics::histogram!("appview_sqlite_write_duration_seconds")
236 .record(write_start.elapsed().as_secs_f64());
237
238 if let Some(domain) = domain {
239 tracing::info!(
240 did = %did,
241 rkey = %rkey,
242 domain = %domain,
243 live = event.live,
244 "record deleted"
245 );
246 }
247 }
248 }
249
250 Ok(())
251}
252
253async fn handle_zone_event(state: Arc<AppState>, event: TapRecordEvent) -> Result<()> {
254 let did = &event.did;
255 let rkey = &event.rkey;
256
257 match event.action {
258 RecordAction::Create | RecordAction::Update => {
259 let record_value = event
260 .record
261 .context("create/update zone event missing record payload")?;
262
263 let domain = record_value
264 .get("domain")
265 .and_then(|v| v.as_str())
266 .context("zone record missing domain field")?
267 .to_lowercase();
268
269 let now = chrono::Utc::now().timestamp();
270 let user_db = state.get_user_db(did).await?;
271
272 let write_start = Instant::now();
273 sqlx::query(
274 "INSERT INTO zones (rkey, domain, created_at)
275 VALUES (?, ?, ?)
276 ON CONFLICT(rkey) DO UPDATE SET domain = excluded.domain",
277 )
278 .bind(rkey)
279 .bind(&domain)
280 .bind(now)
281 .execute(&user_db)
282 .await
283 .context("failed to upsert zone")?;
284
285 sqlx::query(
286 "INSERT INTO zone_index (zone, did, first_seen)
287 VALUES (?, ?, ?)
288 ON CONFLICT(zone, did) DO NOTHING",
289 )
290 .bind(&domain)
291 .bind(did)
292 .bind(now)
293 .execute(&state.index)
294 .await
295 .context("failed to update zone index")?;
296 metrics::histogram!("appview_sqlite_write_duration_seconds")
297 .record(write_start.elapsed().as_secs_f64());
298
299 tracing::info!(
300 did = %did,
301 rkey = %rkey,
302 domain = %domain,
303 live = event.live,
304 "zone upserted"
305 );
306 }
307
308 RecordAction::Delete => {
309 let Ok(user_db) = state.get_user_db(did).await else {
310 tracing::debug!(did = %did, rkey = %rkey, "zone delete for unknown DID, skipping");
311 return Ok(());
312 };
313
314 let write_start = Instant::now();
315 let domain = delete_by_rkey(&user_db, "zones", rkey).await?;
316
317 if let Some(domain) = domain {
318 sqlx::query("DELETE FROM zone_index WHERE zone = ? AND did = ?")
319 .bind(&domain)
320 .bind(did)
321 .execute(&state.index)
322 .await
323 .context("failed to delete from zone index")?;
324 metrics::histogram!("appview_sqlite_write_duration_seconds")
325 .record(write_start.elapsed().as_secs_f64());
326
327 tracing::info!(
328 did = %did,
329 rkey = %rkey,
330 domain = %domain,
331 live = event.live,
332 "zone deleted"
333 );
334 }
335 }
336 }
337
338 Ok(())
339}