//! Highly ambitious ATProtocol AppView service and SDKs.
1//! Record CRUD operations and queries.
2//!
3//! This module handles all database operations related to ATProto records,
4//! including insertion, updates, deletion, and complex queries with filtering,
5//! sorting, and pagination.
6
7use super::client::Database;
8use super::cursor::{build_cursor_where_condition, decode_cursor, generate_cursor_from_record};
9use super::query_builder::{
10 bind_where_parameters, bind_where_parameters_scalar, build_order_by_clause_with_field_info,
11 build_where_conditions,
12};
13use super::types::{SortField, WhereClause};
14use crate::errors::DatabaseError;
15use crate::models::{IndexedRecord, Record};
16use sqlx::Row;
17
18impl Database {
    /// Inserts a single record into the database.
    ///
    /// Uses ON CONFLICT to update existing records with matching URI and slice_uri.
    /// On conflict only `cid`, `json`, and `indexed_at` are refreshed; `did` and
    /// `collection` are left untouched.
    ///
    /// # Errors
    /// Returns `DatabaseError` if the underlying query fails.
    pub async fn insert_record(&self, record: &Record) -> Result<(), DatabaseError> {
        // Bind order must match the positional placeholders $1..$7 below.
        sqlx::query!(
            r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri")
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT ON CONSTRAINT record_pkey
            DO UPDATE SET
                "cid" = EXCLUDED."cid",
                "json" = EXCLUDED."json",
                "indexed_at" = EXCLUDED."indexed_at""#,
            record.uri,
            record.cid,
            record.did,
            record.collection,
            record.json,
            record.indexed_at,
            record.slice_uri
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }
44
45 /// Inserts multiple records in optimized batches.
46 ///
47 /// Automatically chunks records to stay within PostgreSQL parameter limits
48 /// (65536 parameters, ~8000 records per batch with 7 fields each).
49 pub async fn batch_insert_records(&self, records: &[Record]) -> Result<(), DatabaseError> {
50 if records.is_empty() {
51 return Ok(());
52 }
53
54 const BATCH_SIZE: usize = 8000;
55
56 for chunk in records.chunks(BATCH_SIZE) {
57 self.batch_insert_records_chunk(chunk).await?;
58 }
59
60 Ok(())
61 }
62
63 /// Internal helper to insert a single chunk of records.
64 async fn batch_insert_records_chunk(&self, records: &[Record]) -> Result<(), DatabaseError> {
65 let mut tx = self.pool.begin().await?;
66
67 let mut deduped = std::collections::HashMap::new();
68 for record in records {
69 let key = (&record.uri, &record.slice_uri);
70 deduped.insert(key, record);
71 }
72 let records: Vec<&Record> = deduped.into_values().collect();
73
74 let mut query = String::from(
75 r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri") VALUES "#,
76 );
77
78 for (i, _) in records.iter().enumerate() {
79 if i > 0 {
80 query.push_str(", ");
81 }
82 let base = i * 7 + 1;
83 query.push_str(&format!(
84 "(${}, ${}, ${}, ${}, ${}, ${}, ${})",
85 base,
86 base + 1,
87 base + 2,
88 base + 3,
89 base + 4,
90 base + 5,
91 base + 6
92 ));
93 }
94
95 query.push_str(
96 r#"
97 ON CONFLICT ON CONSTRAINT record_pkey
98 DO UPDATE SET
99 "cid" = EXCLUDED."cid",
100 "json" = EXCLUDED."json",
101 "indexed_at" = EXCLUDED."indexed_at"
102 "#,
103 );
104
105 let mut sqlx_query = sqlx::query(&query);
106 for record in records {
107 sqlx_query = sqlx_query
108 .bind(&record.uri)
109 .bind(&record.cid)
110 .bind(&record.did)
111 .bind(&record.collection)
112 .bind(&record.json)
113 .bind(record.indexed_at)
114 .bind(&record.slice_uri);
115 }
116
117 sqlx_query.execute(&mut *tx).await?;
118 tx.commit().await?;
119
120 Ok(())
121 }
122
123 /// Gets a map of existing record CIDs for a specific actor, collection, and slice.
124 ///
125 /// Used during sync to determine which records need updating vs inserting.
126 ///
127 /// # Returns
128 /// HashMap mapping URI -> CID
129 pub async fn get_existing_record_cids_for_slice(
130 &self,
131 did: &str,
132 collection: &str,
133 slice_uri: &str,
134 ) -> Result<std::collections::HashMap<String, String>, DatabaseError> {
135 let records = sqlx::query!(
136 r#"SELECT "uri", "cid"
137 FROM "record"
138 WHERE "did" = $1 AND "collection" = $2 AND "slice_uri" = $3"#,
139 did,
140 collection,
141 slice_uri
142 )
143 .fetch_all(&self.pool)
144 .await?;
145
146 let mut cid_map = std::collections::HashMap::new();
147 for record in records {
148 cid_map.insert(record.uri, record.cid);
149 }
150 Ok(cid_map)
151 }
152
153 /// Retrieves a single record by URI.
154 ///
155 /// # Returns
156 /// Some(IndexedRecord) if found, None otherwise
157 pub async fn get_record(&self, uri: &str) -> Result<Option<IndexedRecord>, DatabaseError> {
158 let record = sqlx::query_as::<_, Record>(
159 r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri"
160 FROM "record"
161 WHERE "uri" = $1"#,
162 )
163 .bind(uri)
164 .fetch_optional(&self.pool)
165 .await?;
166
167 let indexed_record = record.map(|record| IndexedRecord {
168 uri: record.uri,
169 cid: record.cid,
170 did: record.did,
171 collection: record.collection,
172 value: record.json,
173 indexed_at: record.indexed_at.to_rfc3339(),
174 });
175
176 Ok(indexed_record)
177 }
178
179 /// Updates an existing record.
180 ///
181 /// Returns error if no record with matching URI and slice_uri exists.
182 pub async fn update_record(&self, record: &Record) -> Result<(), DatabaseError> {
183 let result = sqlx::query!(
184 r#"UPDATE "record"
185 SET "cid" = $1, "json" = $2, "indexed_at" = $3
186 WHERE "uri" = $4 AND "slice_uri" = $5"#,
187 record.cid,
188 record.json,
189 record.indexed_at,
190 record.uri,
191 record.slice_uri
192 )
193 .execute(&self.pool)
194 .await?;
195
196 if result.rows_affected() == 0 {
197 return Err(DatabaseError::RecordNotFound {
198 uri: record.uri.clone(),
199 });
200 }
201
202 Ok(())
203 }
204
    /// Queries records for a slice with advanced filtering, sorting, and pagination.
    ///
    /// Supports:
    /// - Cursor-based pagination
    /// - Multi-field sorting (with JSON path support)
    /// - Complex WHERE conditions (AND/OR, eq/in/contains operators)
    /// - Automatic handling of lexicon records vs regular records
    ///
    /// Bind-parameter order is significant and must mirror placeholder order:
    /// slice_uri (when added), then WHERE-clause values, then cursor values,
    /// then LIMIT. `param_count` is threaded through the helpers to keep the
    /// `$N` placeholders and the `.bind(...)` calls aligned.
    ///
    /// # Returns
    /// Tuple of (records, next_cursor)
    pub async fn get_slice_collections_records(
        &self,
        slice_uri: &str,
        limit: Option<i32>,
        cursor: Option<&str>,
        sort_by: Option<&Vec<SortField>>,
        where_clause: Option<&WhereClause>,
    ) -> Result<(Vec<Record>, Option<String>), DatabaseError> {
        // Default to 50 for API requests, but support unlimited queries for DataLoader
        let limit = limit.unwrap_or(50);

        let mut where_clauses = Vec::new();
        let mut param_count = 1;

        // Extract collection name from where clause for lexicon lookup
        let collection = where_clause
            .as_ref()
            .and_then(|wc| wc.conditions.get("collection"))
            .and_then(|c| c.eq.as_ref())
            .and_then(|v| v.as_str());

        // Determine which sort fields are datetime fields
        // (true = datetime, false = plain; index-aligned with `sort_by`).
        let field_types: Option<Vec<bool>> = if let Some(sort_fields) = sort_by {
            if let Some(collection_name) = collection {
                // Fetch lexicons to check field types for JSON fields
                match self.get_lexicons_by_slice(slice_uri).await {
                    Ok(lexicons) => {
                        let types: Vec<bool> = sort_fields
                            .iter()
                            .map(|field| {
                                // indexed_at is always a datetime table column
                                if field.field == "indexed_at" {
                                    true
                                } else {
                                    is_field_datetime(&lexicons, collection_name, &field.field)
                                }
                            })
                            .collect();
                        Some(types)
                    }
                    Err(_) => {
                        // Fallback: mark indexed_at as datetime even without lexicons
                        let types: Vec<bool> = sort_fields
                            .iter()
                            .map(|field| field.field == "indexed_at")
                            .collect();
                        Some(types)
                    }
                }
            } else {
                // No collection filter, but we can still identify table columns
                let types: Vec<bool> = sort_fields
                    .iter()
                    .map(|field| field.field == "indexed_at")
                    .collect();
                Some(types)
            }
        } else {
            None
        };

        // Get first field type for ORDER BY (for backward compatibility)
        let primary_field_is_datetime = field_types
            .as_ref()
            .and_then(|types| types.first().copied());

        // Build ORDER BY clause with datetime field information
        let order_by = build_order_by_clause_with_field_info(sort_by, primary_field_is_datetime);

        // Lexicon records are stored per-slice but addressed by json->>'slice',
        // so they take a different slice filter below.
        let is_lexicon = where_clause
            .as_ref()
            .and_then(|wc| wc.conditions.get("collection"))
            .and_then(|c| c.eq.as_ref())
            .and_then(|v| v.as_str())
            == Some("network.slices.lexicon");

        // Check if where_clause already filters by "slice" field (for reverse joins)
        let has_slice_filter = where_clause
            .as_ref()
            .map(|wc| wc.conditions.contains_key("slice"))
            .unwrap_or(false);

        if is_lexicon && !has_slice_filter {
            // For lexicons without explicit slice filter, use the query slice_uri
            where_clauses.push(format!("json->>'slice' = ${}", param_count));
            param_count += 1;
        } else if !is_lexicon {
            // For non-lexicons, use slice_uri column
            where_clauses.push(format!("slice_uri = ${}", param_count));
            param_count += 1;
        }

        // Build all other WHERE conditions first (including collection filter)
        // For non-lexicon records, exclude the 'slice' field since we handle it via slice_uri
        let mut filtered_where_clause = None;
        let filtered_clause;

        if is_lexicon {
            filtered_where_clause = where_clause;
        } else if let Some(wc) = where_clause {
            let mut filtered_conditions = std::collections::HashMap::new();
            for (field, condition) in &wc.conditions {
                if field != "slice" {
                    filtered_conditions.insert(field.clone(), condition.clone());
                }
            }

            filtered_clause = WhereClause {
                conditions: filtered_conditions,
                or_conditions: wc.or_conditions.clone(),
                and: wc.and.clone(),
                or: wc.or.clone(),
            };
            filtered_where_clause = Some(&filtered_clause);
        }

        let (and_conditions, or_conditions) =
            build_where_conditions(filtered_where_clause, &mut param_count);
        where_clauses.extend(and_conditions);

        // Add cursor conditions last to ensure proper parameter order
        let mut cursor_bind_values = Vec::new();
        if let Some(cursor_str) = cursor {
            match decode_cursor(cursor_str, sort_by) {
                Ok(decoded_cursor) => {
                    // Use the datetime field information we already computed
                    let field_type_slice = field_types.as_deref();
                    let (cursor_where, bind_values) = build_cursor_where_condition(
                        &decoded_cursor,
                        sort_by,
                        &mut param_count,
                        field_type_slice,
                    );
                    where_clauses.push(cursor_where);
                    cursor_bind_values = bind_values;
                }
                Err(e) => {
                    // Log the error but don't fail the request
                    // (a bad cursor degrades to "first page" behavior).
                    eprintln!("Invalid cursor format: {}", e);
                }
            }
        }

        if !or_conditions.is_empty() {
            let or_clause = format!("({})", or_conditions.join(" OR "));
            where_clauses.push(or_clause);
        }

        let where_sql = where_clauses
            .into_iter()
            .filter(|clause| !clause.is_empty())
            .collect::<Vec<_>>()
            .join(" AND ");

        // Assign limit parameter AFTER all other parameters
        let limit_param = param_count;

        let query = format!(
            "SELECT uri, cid, did, collection, json, indexed_at, slice_uri
            FROM record
            WHERE {}
            ORDER BY {}
            LIMIT ${}",
            where_sql, order_by, limit_param
        );

        let mut query_builder = sqlx::query_as::<_, Record>(&query);

        // Bind slice_uri only if we added it to the query
        // (must match the placeholder branch above exactly).
        if is_lexicon && !has_slice_filter {
            query_builder = query_builder.bind(slice_uri);
        } else if !is_lexicon {
            query_builder = query_builder.bind(slice_uri);
        }

        // Bind WHERE condition parameters (including collection filter)
        query_builder = bind_where_parameters(query_builder, filtered_where_clause);

        // Bind cursor values after WHERE conditions
        for cursor_value in cursor_bind_values {
            query_builder = query_builder.bind(cursor_value);
        }

        query_builder = query_builder.bind(limit as i64);

        let mut records = query_builder.fetch_all(&self.pool).await?;

        // Deduplicate lexicon records by URI (same URI can exist with different slice_uri values)
        if is_lexicon {
            let mut seen_uris = std::collections::HashSet::new();
            records.retain(|record| seen_uris.insert(record.uri.clone()));
        }

        // Only return cursor if we got a full page, indicating there might be more
        // NOTE(review): after lexicon dedup a "full" page may shrink below `limit`
        // and suppress the cursor even when more rows exist — confirm acceptable.
        let cursor = if records.len() < limit as usize {
            None // Last page - no more results
        } else {
            records
                .last()
                .map(|record| generate_cursor_from_record(record, sort_by))
        };

        Ok((records, cursor))
    }
419
    /// Counts records matching the given criteria.
    ///
    /// Used for pagination metadata and statistics.
    ///
    /// NOTE(review): unlike `get_slice_collections_records`, this passes the
    /// raw `where_clause` through without stripping the "slice" field for
    /// non-lexicon records — the count may therefore diverge from the rows the
    /// query path returns when a "slice" condition is present. Verify intended.
    pub async fn count_slice_collections_records(
        &self,
        slice_uri: &str,
        where_clause: Option<&WhereClause>,
    ) -> Result<i64, DatabaseError> {
        let mut where_clauses = Vec::new();
        let mut param_count = 1;

        // Lexicon records are addressed by json->>'slice' rather than the
        // slice_uri column (same convention as the query path).
        let is_lexicon = where_clause
            .as_ref()
            .and_then(|wc| wc.conditions.get("collection"))
            .and_then(|c| c.eq.as_ref())
            .and_then(|v| v.as_str())
            == Some("network.slices.lexicon");

        // Check if where_clause already filters by "slice" field (for reverse joins)
        let has_slice_filter = where_clause
            .as_ref()
            .map(|wc| wc.conditions.contains_key("slice"))
            .unwrap_or(false);

        if is_lexicon && !has_slice_filter {
            // For lexicons without explicit slice filter, use the query slice_uri
            where_clauses.push(format!("json->>'slice' = ${}", param_count));
            param_count += 1;
        } else if !is_lexicon {
            // For non-lexicons, use slice_uri column
            where_clauses.push(format!("slice_uri = ${}", param_count));
            param_count += 1;
        }

        let (and_conditions, or_conditions) =
            build_where_conditions(where_clause, &mut param_count);
        where_clauses.extend(and_conditions);

        if !or_conditions.is_empty() {
            let or_clause = format!("({})", or_conditions.join(" OR "));
            where_clauses.push(or_clause);
        }

        let filtered_where_clauses: Vec<_> = where_clauses
            .into_iter()
            .filter(|clause| !clause.is_empty())
            .collect();
        let where_sql = if filtered_where_clauses.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", filtered_where_clauses.join(" AND "))
        };

        let query = format!("SELECT COUNT(*) as count FROM record{}", where_sql);

        let mut query_builder = sqlx::query_scalar::<_, i64>(&query);

        // Bind slice_uri only if we added it to the query
        // (must mirror the placeholder branch above).
        if is_lexicon && !has_slice_filter {
            query_builder = query_builder.bind(slice_uri);
        } else if !is_lexicon {
            query_builder = query_builder.bind(slice_uri);
        }

        query_builder = bind_where_parameters_scalar(query_builder, where_clause);

        let count = query_builder.fetch_one(&self.pool).await?;
        Ok(count)
    }
489
    /// Queries aggregated records with GROUP BY support.
    ///
    /// # Arguments
    /// * `slice_uri` - AT-URI of the slice to query
    /// * `group_by_fields` - JSON paths to group by (e.g., ["releaseMbId", "releaseName"])
    /// * `where_clause` - Optional WHERE conditions
    /// * `order_by_count` - Optional ordering ("asc" or "desc")
    /// * `limit` - Maximum number of groups to return
    ///
    /// SECURITY NOTE(review): group-by field names are interpolated directly
    /// into the SQL text below (not bound as parameters). Ensure callers only
    /// supply validated field names (e.g. from lexicon definitions), never raw
    /// user input.
    ///
    /// # Returns
    /// Vec of (field_values, count) tuples
    pub async fn get_aggregated_records(
        &self,
        slice_uri: &str,
        group_by_fields: &[crate::models::GroupByField],
        where_clause: Option<&WhereClause>,
        order_by_count: Option<&str>,
        limit: Option<i32>,
    ) -> Result<Vec<serde_json::Value>, DatabaseError> {
        if group_by_fields.is_empty() {
            return Ok(Vec::new());
        }

        // Cap at 1000 groups regardless of what the caller asks for.
        let limit = limit.unwrap_or(50).min(1000);
        let mut param_count = 1;

        // Build SELECT clause with JSON field extraction and optional date truncation
        let select_fields: Vec<String> = group_by_fields
            .iter()
            .enumerate()
            .map(|(i, group_by_field)| {
                match group_by_field {
                    crate::models::GroupByField::Simple(field) => {
                        // Check if it's a table column
                        if matches!(
                            field.as_str(),
                            "did" | "collection" | "uri" | "cid" | "indexed_at"
                        ) {
                            format!("\"{}\" as field_{}", field, i)
                        } else {
                            // JSON field
                            format!("json->>'{}' as field_{}", field, i)
                        }
                    }
                    crate::models::GroupByField::Truncated { field, interval } => {
                        // Date truncation using PostgreSQL's date_trunc function
                        let interval_str = interval.to_pg_interval();

                        // Check if it's a table column
                        if field == "indexed_at" {
                            format!(
                                "date_trunc('{}', \"{}\")::text as field_{}",
                                interval_str, field, i
                            )
                        } else {
                            // JSON field - cast to timestamp for date_trunc, then to text
                            format!(
                                "date_trunc('{}', (json->>'{}')::timestamp)::text as field_{}",
                                interval_str, field, i
                            )
                        }
                    }
                }
            })
            .collect();

        let select_clause = format!("{}, COUNT(*) as count", select_fields.join(", "));

        // Build GROUP BY clause (references the field_N aliases from SELECT)
        let group_by_clause: Vec<String> = (0..group_by_fields.len())
            .map(|i| format!("field_{}", i))
            .collect();

        // Build WHERE clause
        let mut where_clauses = vec![format!("slice_uri = ${}", param_count)];
        param_count += 1;

        let (and_conditions, or_conditions) =
            build_where_conditions(where_clause, &mut param_count);
        where_clauses.extend(and_conditions);

        if !or_conditions.is_empty() {
            let or_clause = format!("({})", or_conditions.join(" OR "));
            where_clauses.push(or_clause);
        }

        let where_sql = format!(" WHERE {}", where_clauses.join(" AND "));

        // Build ORDER BY clause (any value other than "asc" falls through to DESC)
        let order_by_sql = match order_by_count {
            Some("asc") => " ORDER BY count ASC",
            Some("desc") | Some(_) | None => " ORDER BY count DESC",
        };

        let query = format!(
            "SELECT {} FROM record{} GROUP BY {} {} LIMIT {}",
            select_clause,
            where_sql,
            group_by_clause.join(", "),
            order_by_sql,
            limit
        );

        tracing::debug!("Generated SQL: {}", query);

        let mut query_builder = sqlx::query(&query);
        query_builder = query_builder.bind(slice_uri);

        // Bind WHERE parameters manually.
        // NOTE(review): bind order relies on iterating the same HashMaps in the
        // same order as `build_where_conditions` did when emitting placeholders;
        // this holds for an unmodified map but is fragile — confirm the helper
        // iterates identically.
        if let Some(clause) = where_clause {
            for condition in clause.conditions.values() {
                if let Some(eq_value) = &condition.eq {
                    if let Some(str_val) = eq_value.as_str() {
                        query_builder = query_builder.bind(str_val);
                    } else {
                        query_builder = query_builder.bind(eq_value.to_string());
                    }
                }
                if let Some(in_values) = &condition.in_values {
                    let str_values: Vec<String> = in_values
                        .iter()
                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
                        .collect();
                    query_builder = query_builder.bind(str_values);
                }
                if let Some(contains_value) = &condition.contains {
                    query_builder = query_builder.bind(contains_value);
                }
                if let Some(gt_value) = &condition.gt {
                    if let Some(str_val) = gt_value.as_str() {
                        query_builder = query_builder.bind(str_val);
                    } else {
                        query_builder = query_builder.bind(gt_value.to_string());
                    }
                }
                if let Some(gte_value) = &condition.gte {
                    if let Some(str_val) = gte_value.as_str() {
                        query_builder = query_builder.bind(str_val);
                    } else {
                        query_builder = query_builder.bind(gte_value.to_string());
                    }
                }
                if let Some(lt_value) = &condition.lt {
                    if let Some(str_val) = lt_value.as_str() {
                        query_builder = query_builder.bind(str_val);
                    } else {
                        query_builder = query_builder.bind(lt_value.to_string());
                    }
                }
                if let Some(lte_value) = &condition.lte {
                    if let Some(str_val) = lte_value.as_str() {
                        query_builder = query_builder.bind(str_val);
                    } else {
                        query_builder = query_builder.bind(lte_value.to_string());
                    }
                }
            }

            // Same binding logic for the OR branch of the clause.
            if let Some(or_conditions) = &clause.or_conditions {
                for condition in or_conditions.values() {
                    if let Some(eq_value) = &condition.eq {
                        if let Some(str_val) = eq_value.as_str() {
                            query_builder = query_builder.bind(str_val);
                        } else {
                            query_builder = query_builder.bind(eq_value.to_string());
                        }
                    }
                    if let Some(in_values) = &condition.in_values {
                        let str_values: Vec<String> = in_values
                            .iter()
                            .filter_map(|v| v.as_str().map(|s| s.to_string()))
                            .collect();
                        query_builder = query_builder.bind(str_values);
                    }
                    if let Some(contains_value) = &condition.contains {
                        query_builder = query_builder.bind(contains_value);
                    }
                    if let Some(gt_value) = &condition.gt {
                        if let Some(str_val) = gt_value.as_str() {
                            query_builder = query_builder.bind(str_val);
                        } else {
                            query_builder = query_builder.bind(gt_value.to_string());
                        }
                    }
                    if let Some(gte_value) = &condition.gte {
                        if let Some(str_val) = gte_value.as_str() {
                            query_builder = query_builder.bind(str_val);
                        } else {
                            query_builder = query_builder.bind(gte_value.to_string());
                        }
                    }
                    if let Some(lt_value) = &condition.lt {
                        if let Some(str_val) = lt_value.as_str() {
                            query_builder = query_builder.bind(str_val);
                        } else {
                            query_builder = query_builder.bind(lt_value.to_string());
                        }
                    }
                    if let Some(lte_value) = &condition.lte {
                        if let Some(str_val) = lte_value.as_str() {
                            query_builder = query_builder.bind(str_val);
                        } else {
                            query_builder = query_builder.bind(lte_value.to_string());
                        }
                    }
                }
            }
        }

        let rows = query_builder.fetch_all(&self.pool).await?;

        // Convert rows to JSON objects
        let mut results = Vec::new();
        for row in rows {
            let mut obj = serde_json::Map::new();

            // Extract grouped field values
            for (i, group_by_field) in group_by_fields.iter().enumerate() {
                let col_name = format!("field_{}", i);
                let value: Option<String> = row.try_get(col_name.as_str()).ok();

                // Try to parse as JSON first (for arrays/objects), otherwise use as string
                let json_value = if let Some(ref str_val) = value {
                    // Check if it looks like JSON (starts with [ or {)
                    if str_val.starts_with('[') || str_val.starts_with('{') {
                        // Try to parse as JSON
                        serde_json::from_str(str_val)
                            .unwrap_or_else(|_| serde_json::Value::String(str_val.clone()))
                    } else {
                        serde_json::Value::String(str_val.clone())
                    }
                } else {
                    serde_json::Value::Null
                };

                obj.insert(group_by_field.field_name().to_string(), json_value);
            }

            // Extract count (defaults to 0 if the column is missing/unreadable)
            let count: i64 = row.try_get("count").unwrap_or(0);
            obj.insert("count".to_string(), serde_json::Value::Number(count.into()));

            results.push(serde_json::Value::Object(obj));
        }

        Ok(results)
    }
737
738 /// Deletes a record by URI.
739 ///
740 /// If slice_uri is provided, only deletes from that slice.
741 /// Otherwise deletes from all slices.
742 ///
743 /// # Returns
744 /// Number of rows affected
745 pub async fn delete_record_by_uri(
746 &self,
747 uri: &str,
748 slice_uri: Option<&str>,
749 ) -> Result<u64, DatabaseError> {
750 let result = if let Some(slice_uri) = slice_uri {
751 sqlx::query("DELETE FROM record WHERE uri = $1 AND slice_uri = $2")
752 .bind(uri)
753 .bind(slice_uri)
754 .execute(&self.pool)
755 .await?
756 } else {
757 sqlx::query("DELETE FROM record WHERE uri = $1")
758 .bind(uri)
759 .execute(&self.pool)
760 .await?
761 };
762 Ok(result.rows_affected())
763 }
764
765 /// Deletes all records for a specific slice.
766 ///
767 /// This is a destructive operation that removes all indexed records
768 /// from the specified slice. Records can be recovered by re-syncing.
769 ///
770 /// # Arguments
771 /// * `slice_uri` - AT-URI of the slice to clear
772 ///
773 /// # Returns
774 /// Number of records deleted
775 pub async fn delete_all_records_for_slice(
776 &self,
777 slice_uri: &str,
778 ) -> Result<u64, DatabaseError> {
779 let result = sqlx::query(
780 "DELETE FROM record WHERE slice_uri = $1 AND collection NOT LIKE 'network.slices.%'",
781 )
782 .bind(slice_uri)
783 .execute(&self.pool)
784 .await?;
785 Ok(result.rows_affected())
786 }
787
788 /// Deletes all records of a specific collection from a slice.
789 ///
790 /// Used when a lexicon is deleted to clean up all records of that type.
791 ///
792 /// # Arguments
793 /// * `slice_uri` - AT-URI of the slice
794 /// * `collection` - Collection name (NSID) to delete
795 ///
796 /// # Returns
797 /// Number of records deleted
798 pub async fn delete_records_by_collection(
799 &self,
800 slice_uri: &str,
801 collection: &str,
802 ) -> Result<u64, DatabaseError> {
803 let result = sqlx::query("DELETE FROM record WHERE slice_uri = $1 AND collection = $2")
804 .bind(slice_uri)
805 .bind(collection)
806 .execute(&self.pool)
807 .await?;
808 Ok(result.rows_affected())
809 }
810
811 /// Handles cascade deletion based on record type.
812 ///
813 /// When certain records are deleted, related data should be cleaned up:
814 /// - Lexicon deletion: removes all records of that collection type
815 /// - Slice deletion: removes all records and actors for that slice
816 ///
817 /// # Arguments
818 /// * `uri` - AT-URI of the deleted record
819 /// * `collection` - Collection name (e.g., "network.slices.lexicon")
820 pub async fn handle_cascade_deletion(
821 &self,
822 uri: &str,
823 collection: &str,
824 ) -> Result<(), DatabaseError> {
825 match collection {
826 "network.slices.lexicon" => {
827 // Get the lexicon record to extract collection name and slice URI
828 if let Ok(Some(lexicon_record)) = self.get_record(uri).await
829 && let (Some(nsid), Some(slice_uri_from_record)) = (
830 lexicon_record.value.get("nsid").and_then(|v| v.as_str()),
831 lexicon_record.value.get("slice").and_then(|v| v.as_str()),
832 )
833 {
834 // Delete all records of this collection type from the slice
835 let deleted = self
836 .delete_records_by_collection(slice_uri_from_record, nsid)
837 .await?;
838 tracing::info!(
839 "Cascade delete: removed {} records of collection {} from slice {}",
840 deleted,
841 nsid,
842 slice_uri_from_record
843 );
844 }
845 }
846 "network.slices.slice" => {
847 // The URI itself is the slice URI
848 let slice_uri = uri;
849
850 // Delete all records for this slice
851 let records_deleted = self.delete_all_records_for_slice(slice_uri).await?;
852 tracing::info!(
853 "Cascade delete: removed {} records from slice {}",
854 records_deleted,
855 slice_uri
856 );
857
858 // Delete all actors for this slice
859 let actors_deleted =
860 super::client::Database::delete_all_actors_for_slice(self, slice_uri).await?;
861 tracing::info!(
862 "Cascade delete: removed {} actors from slice {}",
863 actors_deleted,
864 slice_uri
865 );
866 }
867 _ => {
868 // No cascade deletion needed for other collections
869 }
870 }
871 Ok(())
872 }
873
    /// Inserts or updates a record atomically.
    ///
    /// Distinguishes insert from update via PostgreSQL's `xmax` system column:
    /// for a freshly inserted row `xmax` is 0, while a row rewritten by the
    /// `ON CONFLICT DO UPDATE` path carries a nonzero `xmax`. `RETURNING
    /// (xmax = 0)` therefore yields true exactly when a new row was created.
    ///
    /// # Returns
    /// true if inserted (new record), false if updated (existing record)
    pub async fn upsert_record(&self, record: &Record) -> Result<bool, DatabaseError> {
        let result = sqlx::query_scalar::<_, bool>(
            r#"
            INSERT INTO record (uri, cid, did, collection, json, indexed_at, slice_uri)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT ON CONSTRAINT record_pkey DO UPDATE
            SET cid = EXCLUDED.cid,
                json = EXCLUDED.json,
                indexed_at = EXCLUDED.indexed_at
            RETURNING (xmax = 0)
            "#,
        )
        // Bind order must match the positional placeholders $1..$7 above.
        .bind(&record.uri)
        .bind(&record.cid)
        .bind(&record.did)
        .bind(&record.collection)
        .bind(&record.json)
        .bind(record.indexed_at)
        .bind(&record.slice_uri)
        .fetch_one(&self.pool)
        .await?;
        Ok(result)
    }
901
902 /// Gets lexicon definitions for a specific slice.
903 ///
904 /// Filters for network.slices.lexicon records and transforms them
905 /// into the lexicon JSON format expected by the lexicon parser.
906 pub async fn get_lexicons_by_slice(
907 &self,
908 slice_uri: &str,
909 ) -> Result<Vec<serde_json::Value>, DatabaseError> {
910 let records = sqlx::query_as::<_, Record>(
911 r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri"
912 FROM "record"
913 WHERE "collection" = 'network.slices.lexicon'
914 AND "json"->>'slice' = $1
915 ORDER BY "indexed_at" DESC"#,
916 )
917 .bind(slice_uri)
918 .fetch_all(&self.pool)
919 .await?;
920
921 let lexicon_definitions: Vec<serde_json::Value> = records
922 .into_iter()
923 .filter_map(|record| {
924 let nsid = record.json.get("nsid")?.as_str()?;
925 let definitions_str = record.json.get("definitions")?.as_str()?;
926 let definitions: serde_json::Value = serde_json::from_str(definitions_str).ok()?;
927
928 Some(serde_json::json!({
929 "lexicon": 1,
930 "id": nsid,
931 "defs": definitions
932 }))
933 })
934 .collect();
935
936 Ok(lexicon_definitions)
937 }
938}
939
940/// Helper function to check if a field is a datetime field in the lexicon
941fn is_field_datetime(lexicons: &[serde_json::Value], collection: &str, field: &str) -> bool {
942 for lexicon in lexicons {
943 if let Some(id) = lexicon.get("id").and_then(|v| v.as_str())
944 && id == collection
945 && let Some(defs) = lexicon.get("defs")
946 && let Some(main) = defs.get("main")
947 && let Some(record) = main.get("record")
948 && let Some(properties) = record.get("properties")
949 && let Some(field_def) = properties.get(field)
950 && let Some(format) = field_def.get("format").and_then(|v| v.as_str())
951 {
952 return format == "datetime";
953 }
954 }
955 false
956}
957
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal lexicon document with the given NSID and record properties.
    fn lexicon(id: &str, properties: serde_json::Value) -> serde_json::Value {
        serde_json::json!({
            "lexicon": 1,
            "id": id,
            "defs": {
                "main": {
                    "record": {
                        "properties": properties
                    }
                }
            }
        })
    }

    #[test]
    fn test_is_field_datetime_found() {
        // A field with format "datetime" is detected.
        let lexicons = vec![lexicon(
            "app.bsky.feed.post",
            serde_json::json!({
                "createdAt": { "type": "string", "format": "datetime" },
                "text": { "type": "string" }
            }),
        )];

        assert!(is_field_datetime(
            &lexicons,
            "app.bsky.feed.post",
            "createdAt"
        ));
    }

    #[test]
    fn test_is_field_datetime_not_datetime() {
        // A plain string field without a datetime format is rejected.
        let lexicons = vec![lexicon(
            "app.bsky.feed.post",
            serde_json::json!({
                "text": { "type": "string" }
            }),
        )];

        assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text"));
    }

    #[test]
    fn test_is_field_datetime_missing_field() {
        // A field absent from the lexicon properties is rejected.
        let lexicons = vec![lexicon(
            "app.bsky.feed.post",
            serde_json::json!({
                "text": { "type": "string" }
            }),
        )];

        assert!(!is_field_datetime(
            &lexicons,
            "app.bsky.feed.post",
            "nonexistent"
        ));
    }

    #[test]
    fn test_is_field_datetime_wrong_collection() {
        // A datetime field only counts when the collection id matches.
        let lexicons = vec![lexicon(
            "app.bsky.feed.post",
            serde_json::json!({
                "createdAt": { "type": "string", "format": "datetime" }
            }),
        )];

        assert!(!is_field_datetime(
            &lexicons,
            "app.bsky.actor.profile",
            "createdAt"
        ));
    }

    #[test]
    fn test_is_field_datetime_multiple_lexicons() {
        // Lookup selects the lexicon matching the requested collection.
        let lexicons = vec![
            lexicon(
                "app.bsky.feed.post",
                serde_json::json!({
                    "text": { "type": "string" }
                }),
            ),
            lexicon(
                "app.bsky.actor.profile",
                serde_json::json!({
                    "createdAt": { "type": "string", "format": "datetime" }
                }),
            ),
        ];

        assert!(is_field_datetime(
            &lexicons,
            "app.bsky.actor.profile",
            "createdAt"
        ));
        assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text"));
    }
}