Highly ambitious ATProtocol AppView service and sdks
at main 40 kB view raw
//! Record CRUD operations and queries.
//!
//! This module handles all database operations related to ATProto records,
//! including insertion, updates, deletion, and complex queries with filtering,
//! sorting, and pagination.

use super::client::Database;
use super::cursor::{build_cursor_where_condition, decode_cursor, generate_cursor_from_record};
use super::query_builder::{
    bind_where_parameters, bind_where_parameters_scalar, build_order_by_clause_with_field_info,
    build_where_conditions,
};
use super::types::{SortField, WhereClause};
use crate::errors::DatabaseError;
use crate::models::{IndexedRecord, Record};
use sqlx::Row;

impl Database {
    /// Inserts a single record into the database.
    ///
    /// Uses ON CONFLICT to update existing records with matching URI and slice_uri
    /// (the `record_pkey` constraint), making the call an idempotent upsert:
    /// only `cid`, `json`, and `indexed_at` are refreshed on conflict.
    ///
    /// # Errors
    /// Returns `DatabaseError` when the underlying query fails.
    pub async fn insert_record(&self, record: &Record) -> Result<(), DatabaseError> {
        sqlx::query!(
            r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri")
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT ON CONSTRAINT record_pkey
            DO UPDATE SET
                "cid" = EXCLUDED."cid",
                "json" = EXCLUDED."json",
                "indexed_at" = EXCLUDED."indexed_at""#,
            record.uri,
            record.cid,
            record.did,
            record.collection,
            record.json,
            record.indexed_at,
            record.slice_uri
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Inserts multiple records in optimized batches.
    ///
    /// Automatically chunks records to stay within PostgreSQL parameter limits
    /// (65536 parameters, ~8000 records per batch with 7 fields each).
    ///
    /// # Errors
    /// Fails on the first chunk whose insert fails; chunks committed before
    /// the failure remain in the database.
    pub async fn batch_insert_records(&self, records: &[Record]) -> Result<(), DatabaseError> {
        if records.is_empty() {
            return Ok(());
        }

        // 8000 records * 7 columns = 56000 bind parameters, safely under the
        // PostgreSQL protocol limit of 65535.
        const BATCH_SIZE: usize = 8000;

        for chunk in records.chunks(BATCH_SIZE) {
            self.batch_insert_records_chunk(chunk).await?;
        }

        Ok(())
    }

    /// Internal helper to insert a single chunk of records.
64 async fn batch_insert_records_chunk(&self, records: &[Record]) -> Result<(), DatabaseError> { 65 let mut tx = self.pool.begin().await?; 66 67 let mut deduped = std::collections::HashMap::new(); 68 for record in records { 69 let key = (&record.uri, &record.slice_uri); 70 deduped.insert(key, record); 71 } 72 let records: Vec<&Record> = deduped.into_values().collect(); 73 74 let mut query = String::from( 75 r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri") VALUES "#, 76 ); 77 78 for (i, _) in records.iter().enumerate() { 79 if i > 0 { 80 query.push_str(", "); 81 } 82 let base = i * 7 + 1; 83 query.push_str(&format!( 84 "(${}, ${}, ${}, ${}, ${}, ${}, ${})", 85 base, 86 base + 1, 87 base + 2, 88 base + 3, 89 base + 4, 90 base + 5, 91 base + 6 92 )); 93 } 94 95 query.push_str( 96 r#" 97 ON CONFLICT ON CONSTRAINT record_pkey 98 DO UPDATE SET 99 "cid" = EXCLUDED."cid", 100 "json" = EXCLUDED."json", 101 "indexed_at" = EXCLUDED."indexed_at" 102 "#, 103 ); 104 105 let mut sqlx_query = sqlx::query(&query); 106 for record in records { 107 sqlx_query = sqlx_query 108 .bind(&record.uri) 109 .bind(&record.cid) 110 .bind(&record.did) 111 .bind(&record.collection) 112 .bind(&record.json) 113 .bind(record.indexed_at) 114 .bind(&record.slice_uri); 115 } 116 117 sqlx_query.execute(&mut *tx).await?; 118 tx.commit().await?; 119 120 Ok(()) 121 } 122 123 /// Gets a map of existing record CIDs for a specific actor, collection, and slice. 124 /// 125 /// Used during sync to determine which records need updating vs inserting. 
126 /// 127 /// # Returns 128 /// HashMap mapping URI -> CID 129 pub async fn get_existing_record_cids_for_slice( 130 &self, 131 did: &str, 132 collection: &str, 133 slice_uri: &str, 134 ) -> Result<std::collections::HashMap<String, String>, DatabaseError> { 135 let records = sqlx::query!( 136 r#"SELECT "uri", "cid" 137 FROM "record" 138 WHERE "did" = $1 AND "collection" = $2 AND "slice_uri" = $3"#, 139 did, 140 collection, 141 slice_uri 142 ) 143 .fetch_all(&self.pool) 144 .await?; 145 146 let mut cid_map = std::collections::HashMap::new(); 147 for record in records { 148 cid_map.insert(record.uri, record.cid); 149 } 150 Ok(cid_map) 151 } 152 153 /// Retrieves a single record by URI. 154 /// 155 /// # Returns 156 /// Some(IndexedRecord) if found, None otherwise 157 pub async fn get_record(&self, uri: &str) -> Result<Option<IndexedRecord>, DatabaseError> { 158 let record = sqlx::query_as::<_, Record>( 159 r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri" 160 FROM "record" 161 WHERE "uri" = $1"#, 162 ) 163 .bind(uri) 164 .fetch_optional(&self.pool) 165 .await?; 166 167 let indexed_record = record.map(|record| IndexedRecord { 168 uri: record.uri, 169 cid: record.cid, 170 did: record.did, 171 collection: record.collection, 172 value: record.json, 173 indexed_at: record.indexed_at.to_rfc3339(), 174 }); 175 176 Ok(indexed_record) 177 } 178 179 /// Updates an existing record. 180 /// 181 /// Returns error if no record with matching URI and slice_uri exists. 
182 pub async fn update_record(&self, record: &Record) -> Result<(), DatabaseError> { 183 let result = sqlx::query!( 184 r#"UPDATE "record" 185 SET "cid" = $1, "json" = $2, "indexed_at" = $3 186 WHERE "uri" = $4 AND "slice_uri" = $5"#, 187 record.cid, 188 record.json, 189 record.indexed_at, 190 record.uri, 191 record.slice_uri 192 ) 193 .execute(&self.pool) 194 .await?; 195 196 if result.rows_affected() == 0 { 197 return Err(DatabaseError::RecordNotFound { 198 uri: record.uri.clone(), 199 }); 200 } 201 202 Ok(()) 203 } 204 205 /// Queries records for a slice with advanced filtering, sorting, and pagination. 206 /// 207 /// Supports: 208 /// - Cursor-based pagination 209 /// - Multi-field sorting (with JSON path support) 210 /// - Complex WHERE conditions (AND/OR, eq/in/contains operators) 211 /// - Automatic handling of lexicon records vs regular records 212 /// 213 /// # Returns 214 /// Tuple of (records, next_cursor) 215 pub async fn get_slice_collections_records( 216 &self, 217 slice_uri: &str, 218 limit: Option<i32>, 219 cursor: Option<&str>, 220 sort_by: Option<&Vec<SortField>>, 221 where_clause: Option<&WhereClause>, 222 ) -> Result<(Vec<Record>, Option<String>), DatabaseError> { 223 // Default to 50 for API requests, but support unlimited queries for DataLoader 224 let limit = limit.unwrap_or(50); 225 226 let mut where_clauses = Vec::new(); 227 let mut param_count = 1; 228 229 // Extract collection name from where clause for lexicon lookup 230 let collection = where_clause 231 .as_ref() 232 .and_then(|wc| wc.conditions.get("collection")) 233 .and_then(|c| c.eq.as_ref()) 234 .and_then(|v| v.as_str()); 235 236 // Determine which sort fields are datetime fields 237 let field_types: Option<Vec<bool>> = if let Some(sort_fields) = sort_by { 238 if let Some(collection_name) = collection { 239 // Fetch lexicons to check field types for JSON fields 240 match self.get_lexicons_by_slice(slice_uri).await { 241 Ok(lexicons) => { 242 let types: Vec<bool> = sort_fields 243 
.iter() 244 .map(|field| { 245 // indexed_at is always a datetime table column 246 if field.field == "indexed_at" { 247 true 248 } else { 249 is_field_datetime(&lexicons, collection_name, &field.field) 250 } 251 }) 252 .collect(); 253 Some(types) 254 } 255 Err(_) => { 256 // Fallback: mark indexed_at as datetime even without lexicons 257 let types: Vec<bool> = sort_fields 258 .iter() 259 .map(|field| field.field == "indexed_at") 260 .collect(); 261 Some(types) 262 } 263 } 264 } else { 265 // No collection filter, but we can still identify table columns 266 let types: Vec<bool> = sort_fields 267 .iter() 268 .map(|field| field.field == "indexed_at") 269 .collect(); 270 Some(types) 271 } 272 } else { 273 None 274 }; 275 276 // Get first field type for ORDER BY (for backward compatibility) 277 let primary_field_is_datetime = field_types 278 .as_ref() 279 .and_then(|types| types.first().copied()); 280 281 // Build ORDER BY clause with datetime field information 282 let order_by = build_order_by_clause_with_field_info(sort_by, primary_field_is_datetime); 283 284 let is_lexicon = where_clause 285 .as_ref() 286 .and_then(|wc| wc.conditions.get("collection")) 287 .and_then(|c| c.eq.as_ref()) 288 .and_then(|v| v.as_str()) 289 == Some("network.slices.lexicon"); 290 291 // Check if where_clause already filters by "slice" field (for reverse joins) 292 let has_slice_filter = where_clause 293 .as_ref() 294 .map(|wc| wc.conditions.contains_key("slice")) 295 .unwrap_or(false); 296 297 if is_lexicon && !has_slice_filter { 298 // For lexicons without explicit slice filter, use the query slice_uri 299 where_clauses.push(format!("json->>'slice' = ${}", param_count)); 300 param_count += 1; 301 } else if !is_lexicon { 302 // For non-lexicons, use slice_uri column 303 where_clauses.push(format!("slice_uri = ${}", param_count)); 304 param_count += 1; 305 } 306 307 // Build all other WHERE conditions first (including collection filter) 308 // For non-lexicon records, exclude the 'slice' 
field since we handle it via slice_uri 309 let mut filtered_where_clause = None; 310 let filtered_clause; 311 312 if is_lexicon { 313 filtered_where_clause = where_clause; 314 } else if let Some(wc) = where_clause { 315 let mut filtered_conditions = std::collections::HashMap::new(); 316 for (field, condition) in &wc.conditions { 317 if field != "slice" { 318 filtered_conditions.insert(field.clone(), condition.clone()); 319 } 320 } 321 322 filtered_clause = WhereClause { 323 conditions: filtered_conditions, 324 or_conditions: wc.or_conditions.clone(), 325 and: wc.and.clone(), 326 or: wc.or.clone(), 327 }; 328 filtered_where_clause = Some(&filtered_clause); 329 } 330 331 let (and_conditions, or_conditions) = 332 build_where_conditions(filtered_where_clause, &mut param_count); 333 where_clauses.extend(and_conditions); 334 335 // Add cursor conditions last to ensure proper parameter order 336 let mut cursor_bind_values = Vec::new(); 337 if let Some(cursor_str) = cursor { 338 match decode_cursor(cursor_str, sort_by) { 339 Ok(decoded_cursor) => { 340 // Use the datetime field information we already computed 341 let field_type_slice = field_types.as_deref(); 342 let (cursor_where, bind_values) = build_cursor_where_condition( 343 &decoded_cursor, 344 sort_by, 345 &mut param_count, 346 field_type_slice, 347 ); 348 where_clauses.push(cursor_where); 349 cursor_bind_values = bind_values; 350 } 351 Err(e) => { 352 // Log the error but don't fail the request 353 eprintln!("Invalid cursor format: {}", e); 354 } 355 } 356 } 357 358 if !or_conditions.is_empty() { 359 let or_clause = format!("({})", or_conditions.join(" OR ")); 360 where_clauses.push(or_clause); 361 } 362 363 let where_sql = where_clauses 364 .into_iter() 365 .filter(|clause| !clause.is_empty()) 366 .collect::<Vec<_>>() 367 .join(" AND "); 368 369 // Assign limit parameter AFTER all other parameters 370 let limit_param = param_count; 371 372 let query = format!( 373 "SELECT uri, cid, did, collection, json, 
indexed_at, slice_uri 374 FROM record 375 WHERE {} 376 ORDER BY {} 377 LIMIT ${}", 378 where_sql, order_by, limit_param 379 ); 380 381 let mut query_builder = sqlx::query_as::<_, Record>(&query); 382 383 // Bind slice_uri only if we added it to the query 384 if is_lexicon && !has_slice_filter { 385 query_builder = query_builder.bind(slice_uri); 386 } else if !is_lexicon { 387 query_builder = query_builder.bind(slice_uri); 388 } 389 390 // Bind WHERE condition parameters (including collection filter) 391 query_builder = bind_where_parameters(query_builder, filtered_where_clause); 392 393 // Bind cursor values after WHERE conditions 394 for cursor_value in cursor_bind_values { 395 query_builder = query_builder.bind(cursor_value); 396 } 397 398 query_builder = query_builder.bind(limit as i64); 399 400 let mut records = query_builder.fetch_all(&self.pool).await?; 401 402 // Deduplicate lexicon records by URI (same URI can exist with different slice_uri values) 403 if is_lexicon { 404 let mut seen_uris = std::collections::HashSet::new(); 405 records.retain(|record| seen_uris.insert(record.uri.clone())); 406 } 407 408 // Only return cursor if we got a full page, indicating there might be more 409 let cursor = if records.len() < limit as usize { 410 None // Last page - no more results 411 } else { 412 records 413 .last() 414 .map(|record| generate_cursor_from_record(record, sort_by)) 415 }; 416 417 Ok((records, cursor)) 418 } 419 420 /// Counts records matching the given criteria. 421 /// 422 /// Used for pagination metadata and statistics. 
423 pub async fn count_slice_collections_records( 424 &self, 425 slice_uri: &str, 426 where_clause: Option<&WhereClause>, 427 ) -> Result<i64, DatabaseError> { 428 let mut where_clauses = Vec::new(); 429 let mut param_count = 1; 430 431 let is_lexicon = where_clause 432 .as_ref() 433 .and_then(|wc| wc.conditions.get("collection")) 434 .and_then(|c| c.eq.as_ref()) 435 .and_then(|v| v.as_str()) 436 == Some("network.slices.lexicon"); 437 438 // Check if where_clause already filters by "slice" field (for reverse joins) 439 let has_slice_filter = where_clause 440 .as_ref() 441 .map(|wc| wc.conditions.contains_key("slice")) 442 .unwrap_or(false); 443 444 if is_lexicon && !has_slice_filter { 445 // For lexicons without explicit slice filter, use the query slice_uri 446 where_clauses.push(format!("json->>'slice' = ${}", param_count)); 447 param_count += 1; 448 } else if !is_lexicon { 449 // For non-lexicons, use slice_uri column 450 where_clauses.push(format!("slice_uri = ${}", param_count)); 451 param_count += 1; 452 } 453 454 let (and_conditions, or_conditions) = 455 build_where_conditions(where_clause, &mut param_count); 456 where_clauses.extend(and_conditions); 457 458 if !or_conditions.is_empty() { 459 let or_clause = format!("({})", or_conditions.join(" OR ")); 460 where_clauses.push(or_clause); 461 } 462 463 let filtered_where_clauses: Vec<_> = where_clauses 464 .into_iter() 465 .filter(|clause| !clause.is_empty()) 466 .collect(); 467 let where_sql = if filtered_where_clauses.is_empty() { 468 String::new() 469 } else { 470 format!(" WHERE {}", filtered_where_clauses.join(" AND ")) 471 }; 472 473 let query = format!("SELECT COUNT(*) as count FROM record{}", where_sql); 474 475 let mut query_builder = sqlx::query_scalar::<_, i64>(&query); 476 477 // Bind slice_uri only if we added it to the query 478 if is_lexicon && !has_slice_filter { 479 query_builder = query_builder.bind(slice_uri); 480 } else if !is_lexicon { 481 query_builder = query_builder.bind(slice_uri); 482 
} 483 484 query_builder = bind_where_parameters_scalar(query_builder, where_clause); 485 486 let count = query_builder.fetch_one(&self.pool).await?; 487 Ok(count) 488 } 489 490 /// Queries aggregated records with GROUP BY support. 491 /// 492 /// # Arguments 493 /// * `slice_uri` - AT-URI of the slice to query 494 /// * `group_by_fields` - JSON paths to group by (e.g., ["releaseMbId", "releaseName"]) 495 /// * `where_clause` - Optional WHERE conditions 496 /// * `order_by_count` - Optional ordering ("asc" or "desc") 497 /// * `limit` - Maximum number of groups to return 498 /// 499 /// # Returns 500 /// Vec of (field_values, count) tuples 501 pub async fn get_aggregated_records( 502 &self, 503 slice_uri: &str, 504 group_by_fields: &[crate::models::GroupByField], 505 where_clause: Option<&WhereClause>, 506 order_by_count: Option<&str>, 507 limit: Option<i32>, 508 ) -> Result<Vec<serde_json::Value>, DatabaseError> { 509 if group_by_fields.is_empty() { 510 return Ok(Vec::new()); 511 } 512 513 let limit = limit.unwrap_or(50).min(1000); 514 let mut param_count = 1; 515 516 // Build SELECT clause with JSON field extraction and optional date truncation 517 let select_fields: Vec<String> = group_by_fields 518 .iter() 519 .enumerate() 520 .map(|(i, group_by_field)| { 521 match group_by_field { 522 crate::models::GroupByField::Simple(field) => { 523 // Check if it's a table column 524 if matches!( 525 field.as_str(), 526 "did" | "collection" | "uri" | "cid" | "indexed_at" 527 ) { 528 format!("\"{}\" as field_{}", field, i) 529 } else { 530 // JSON field 531 format!("json->>'{}' as field_{}", field, i) 532 } 533 } 534 crate::models::GroupByField::Truncated { field, interval } => { 535 // Date truncation using PostgreSQL's date_trunc function 536 let interval_str = interval.to_pg_interval(); 537 538 // Check if it's a table column 539 if field == "indexed_at" { 540 format!( 541 "date_trunc('{}', \"{}\")::text as field_{}", 542 interval_str, field, i 543 ) 544 } else { 545 // 
JSON field - cast to timestamp for date_trunc, then to text 546 format!( 547 "date_trunc('{}', (json->>'{}')::timestamp)::text as field_{}", 548 interval_str, field, i 549 ) 550 } 551 } 552 } 553 }) 554 .collect(); 555 556 let select_clause = format!("{}, COUNT(*) as count", select_fields.join(", ")); 557 558 // Build GROUP BY clause 559 let group_by_clause: Vec<String> = (0..group_by_fields.len()) 560 .map(|i| format!("field_{}", i)) 561 .collect(); 562 563 // Build WHERE clause 564 let mut where_clauses = vec![format!("slice_uri = ${}", param_count)]; 565 param_count += 1; 566 567 let (and_conditions, or_conditions) = 568 build_where_conditions(where_clause, &mut param_count); 569 where_clauses.extend(and_conditions); 570 571 if !or_conditions.is_empty() { 572 let or_clause = format!("({})", or_conditions.join(" OR ")); 573 where_clauses.push(or_clause); 574 } 575 576 let where_sql = format!(" WHERE {}", where_clauses.join(" AND ")); 577 578 // Build ORDER BY clause 579 let order_by_sql = match order_by_count { 580 Some("asc") => " ORDER BY count ASC", 581 Some("desc") | Some(_) | None => " ORDER BY count DESC", 582 }; 583 584 let query = format!( 585 "SELECT {} FROM record{} GROUP BY {} {} LIMIT {}", 586 select_clause, 587 where_sql, 588 group_by_clause.join(", "), 589 order_by_sql, 590 limit 591 ); 592 593 tracing::debug!("Generated SQL: {}", query); 594 595 let mut query_builder = sqlx::query(&query); 596 query_builder = query_builder.bind(slice_uri); 597 598 // Bind WHERE parameters manually 599 if let Some(clause) = where_clause { 600 for condition in clause.conditions.values() { 601 if let Some(eq_value) = &condition.eq { 602 if let Some(str_val) = eq_value.as_str() { 603 query_builder = query_builder.bind(str_val); 604 } else { 605 query_builder = query_builder.bind(eq_value.to_string()); 606 } 607 } 608 if let Some(in_values) = &condition.in_values { 609 let str_values: Vec<String> = in_values 610 .iter() 611 .filter_map(|v| v.as_str().map(|s| 
s.to_string())) 612 .collect(); 613 query_builder = query_builder.bind(str_values); 614 } 615 if let Some(contains_value) = &condition.contains { 616 query_builder = query_builder.bind(contains_value); 617 } 618 if let Some(gt_value) = &condition.gt { 619 if let Some(str_val) = gt_value.as_str() { 620 query_builder = query_builder.bind(str_val); 621 } else { 622 query_builder = query_builder.bind(gt_value.to_string()); 623 } 624 } 625 if let Some(gte_value) = &condition.gte { 626 if let Some(str_val) = gte_value.as_str() { 627 query_builder = query_builder.bind(str_val); 628 } else { 629 query_builder = query_builder.bind(gte_value.to_string()); 630 } 631 } 632 if let Some(lt_value) = &condition.lt { 633 if let Some(str_val) = lt_value.as_str() { 634 query_builder = query_builder.bind(str_val); 635 } else { 636 query_builder = query_builder.bind(lt_value.to_string()); 637 } 638 } 639 if let Some(lte_value) = &condition.lte { 640 if let Some(str_val) = lte_value.as_str() { 641 query_builder = query_builder.bind(str_val); 642 } else { 643 query_builder = query_builder.bind(lte_value.to_string()); 644 } 645 } 646 } 647 648 if let Some(or_conditions) = &clause.or_conditions { 649 for condition in or_conditions.values() { 650 if let Some(eq_value) = &condition.eq { 651 if let Some(str_val) = eq_value.as_str() { 652 query_builder = query_builder.bind(str_val); 653 } else { 654 query_builder = query_builder.bind(eq_value.to_string()); 655 } 656 } 657 if let Some(in_values) = &condition.in_values { 658 let str_values: Vec<String> = in_values 659 .iter() 660 .filter_map(|v| v.as_str().map(|s| s.to_string())) 661 .collect(); 662 query_builder = query_builder.bind(str_values); 663 } 664 if let Some(contains_value) = &condition.contains { 665 query_builder = query_builder.bind(contains_value); 666 } 667 if let Some(gt_value) = &condition.gt { 668 if let Some(str_val) = gt_value.as_str() { 669 query_builder = query_builder.bind(str_val); 670 } else { 671 query_builder = 
query_builder.bind(gt_value.to_string()); 672 } 673 } 674 if let Some(gte_value) = &condition.gte { 675 if let Some(str_val) = gte_value.as_str() { 676 query_builder = query_builder.bind(str_val); 677 } else { 678 query_builder = query_builder.bind(gte_value.to_string()); 679 } 680 } 681 if let Some(lt_value) = &condition.lt { 682 if let Some(str_val) = lt_value.as_str() { 683 query_builder = query_builder.bind(str_val); 684 } else { 685 query_builder = query_builder.bind(lt_value.to_string()); 686 } 687 } 688 if let Some(lte_value) = &condition.lte { 689 if let Some(str_val) = lte_value.as_str() { 690 query_builder = query_builder.bind(str_val); 691 } else { 692 query_builder = query_builder.bind(lte_value.to_string()); 693 } 694 } 695 } 696 } 697 } 698 699 let rows = query_builder.fetch_all(&self.pool).await?; 700 701 // Convert rows to JSON objects 702 let mut results = Vec::new(); 703 for row in rows { 704 let mut obj = serde_json::Map::new(); 705 706 // Extract grouped field values 707 for (i, group_by_field) in group_by_fields.iter().enumerate() { 708 let col_name = format!("field_{}", i); 709 let value: Option<String> = row.try_get(col_name.as_str()).ok(); 710 711 // Try to parse as JSON first (for arrays/objects), otherwise use as string 712 let json_value = if let Some(ref str_val) = value { 713 // Check if it looks like JSON (starts with [ or {) 714 if str_val.starts_with('[') || str_val.starts_with('{') { 715 // Try to parse as JSON 716 serde_json::from_str(str_val) 717 .unwrap_or_else(|_| serde_json::Value::String(str_val.clone())) 718 } else { 719 serde_json::Value::String(str_val.clone()) 720 } 721 } else { 722 serde_json::Value::Null 723 }; 724 725 obj.insert(group_by_field.field_name().to_string(), json_value); 726 } 727 728 // Extract count 729 let count: i64 = row.try_get("count").unwrap_or(0); 730 obj.insert("count".to_string(), serde_json::Value::Number(count.into())); 731 732 results.push(serde_json::Value::Object(obj)); 733 } 734 735 
Ok(results) 736 } 737 738 /// Deletes a record by URI. 739 /// 740 /// If slice_uri is provided, only deletes from that slice. 741 /// Otherwise deletes from all slices. 742 /// 743 /// # Returns 744 /// Number of rows affected 745 pub async fn delete_record_by_uri( 746 &self, 747 uri: &str, 748 slice_uri: Option<&str>, 749 ) -> Result<u64, DatabaseError> { 750 let result = if let Some(slice_uri) = slice_uri { 751 sqlx::query("DELETE FROM record WHERE uri = $1 AND slice_uri = $2") 752 .bind(uri) 753 .bind(slice_uri) 754 .execute(&self.pool) 755 .await? 756 } else { 757 sqlx::query("DELETE FROM record WHERE uri = $1") 758 .bind(uri) 759 .execute(&self.pool) 760 .await? 761 }; 762 Ok(result.rows_affected()) 763 } 764 765 /// Deletes all records for a specific slice. 766 /// 767 /// This is a destructive operation that removes all indexed records 768 /// from the specified slice. Records can be recovered by re-syncing. 769 /// 770 /// # Arguments 771 /// * `slice_uri` - AT-URI of the slice to clear 772 /// 773 /// # Returns 774 /// Number of records deleted 775 pub async fn delete_all_records_for_slice( 776 &self, 777 slice_uri: &str, 778 ) -> Result<u64, DatabaseError> { 779 let result = sqlx::query( 780 "DELETE FROM record WHERE slice_uri = $1 AND collection NOT LIKE 'network.slices.%'", 781 ) 782 .bind(slice_uri) 783 .execute(&self.pool) 784 .await?; 785 Ok(result.rows_affected()) 786 } 787 788 /// Deletes all records of a specific collection from a slice. 789 /// 790 /// Used when a lexicon is deleted to clean up all records of that type. 
791 /// 792 /// # Arguments 793 /// * `slice_uri` - AT-URI of the slice 794 /// * `collection` - Collection name (NSID) to delete 795 /// 796 /// # Returns 797 /// Number of records deleted 798 pub async fn delete_records_by_collection( 799 &self, 800 slice_uri: &str, 801 collection: &str, 802 ) -> Result<u64, DatabaseError> { 803 let result = sqlx::query("DELETE FROM record WHERE slice_uri = $1 AND collection = $2") 804 .bind(slice_uri) 805 .bind(collection) 806 .execute(&self.pool) 807 .await?; 808 Ok(result.rows_affected()) 809 } 810 811 /// Handles cascade deletion based on record type. 812 /// 813 /// When certain records are deleted, related data should be cleaned up: 814 /// - Lexicon deletion: removes all records of that collection type 815 /// - Slice deletion: removes all records and actors for that slice 816 /// 817 /// # Arguments 818 /// * `uri` - AT-URI of the deleted record 819 /// * `collection` - Collection name (e.g., "network.slices.lexicon") 820 pub async fn handle_cascade_deletion( 821 &self, 822 uri: &str, 823 collection: &str, 824 ) -> Result<(), DatabaseError> { 825 match collection { 826 "network.slices.lexicon" => { 827 // Get the lexicon record to extract collection name and slice URI 828 if let Ok(Some(lexicon_record)) = self.get_record(uri).await 829 && let (Some(nsid), Some(slice_uri_from_record)) = ( 830 lexicon_record.value.get("nsid").and_then(|v| v.as_str()), 831 lexicon_record.value.get("slice").and_then(|v| v.as_str()), 832 ) 833 { 834 // Delete all records of this collection type from the slice 835 let deleted = self 836 .delete_records_by_collection(slice_uri_from_record, nsid) 837 .await?; 838 tracing::info!( 839 "Cascade delete: removed {} records of collection {} from slice {}", 840 deleted, 841 nsid, 842 slice_uri_from_record 843 ); 844 } 845 } 846 "network.slices.slice" => { 847 // The URI itself is the slice URI 848 let slice_uri = uri; 849 850 // Delete all records for this slice 851 let records_deleted = 
self.delete_all_records_for_slice(slice_uri).await?; 852 tracing::info!( 853 "Cascade delete: removed {} records from slice {}", 854 records_deleted, 855 slice_uri 856 ); 857 858 // Delete all actors for this slice 859 let actors_deleted = 860 super::client::Database::delete_all_actors_for_slice(self, slice_uri).await?; 861 tracing::info!( 862 "Cascade delete: removed {} actors from slice {}", 863 actors_deleted, 864 slice_uri 865 ); 866 } 867 _ => { 868 // No cascade deletion needed for other collections 869 } 870 } 871 Ok(()) 872 } 873 874 /// Inserts or updates a record atomically. 875 /// 876 /// # Returns 877 /// true if inserted (new record), false if updated (existing record) 878 pub async fn upsert_record(&self, record: &Record) -> Result<bool, DatabaseError> { 879 let result = sqlx::query_scalar::<_, bool>( 880 r#" 881 INSERT INTO record (uri, cid, did, collection, json, indexed_at, slice_uri) 882 VALUES ($1, $2, $3, $4, $5, $6, $7) 883 ON CONFLICT ON CONSTRAINT record_pkey DO UPDATE 884 SET cid = EXCLUDED.cid, 885 json = EXCLUDED.json, 886 indexed_at = EXCLUDED.indexed_at 887 RETURNING (xmax = 0) 888 "#, 889 ) 890 .bind(&record.uri) 891 .bind(&record.cid) 892 .bind(&record.did) 893 .bind(&record.collection) 894 .bind(&record.json) 895 .bind(record.indexed_at) 896 .bind(&record.slice_uri) 897 .fetch_one(&self.pool) 898 .await?; 899 Ok(result) 900 } 901 902 /// Gets lexicon definitions for a specific slice. 903 /// 904 /// Filters for network.slices.lexicon records and transforms them 905 /// into the lexicon JSON format expected by the lexicon parser. 
906 pub async fn get_lexicons_by_slice( 907 &self, 908 slice_uri: &str, 909 ) -> Result<Vec<serde_json::Value>, DatabaseError> { 910 let records = sqlx::query_as::<_, Record>( 911 r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri" 912 FROM "record" 913 WHERE "collection" = 'network.slices.lexicon' 914 AND "json"->>'slice' = $1 915 ORDER BY "indexed_at" DESC"#, 916 ) 917 .bind(slice_uri) 918 .fetch_all(&self.pool) 919 .await?; 920 921 let lexicon_definitions: Vec<serde_json::Value> = records 922 .into_iter() 923 .filter_map(|record| { 924 let nsid = record.json.get("nsid")?.as_str()?; 925 let definitions_str = record.json.get("definitions")?.as_str()?; 926 let definitions: serde_json::Value = serde_json::from_str(definitions_str).ok()?; 927 928 Some(serde_json::json!({ 929 "lexicon": 1, 930 "id": nsid, 931 "defs": definitions 932 })) 933 }) 934 .collect(); 935 936 Ok(lexicon_definitions) 937 } 938} 939 940/// Helper function to check if a field is a datetime field in the lexicon 941fn is_field_datetime(lexicons: &[serde_json::Value], collection: &str, field: &str) -> bool { 942 for lexicon in lexicons { 943 if let Some(id) = lexicon.get("id").and_then(|v| v.as_str()) 944 && id == collection 945 && let Some(defs) = lexicon.get("defs") 946 && let Some(main) = defs.get("main") 947 && let Some(record) = main.get("record") 948 && let Some(properties) = record.get("properties") 949 && let Some(field_def) = properties.get(field) 950 && let Some(format) = field_def.get("format").and_then(|v| v.as_str()) 951 { 952 return format == "datetime"; 953 } 954 } 955 false 956} 957 958#[cfg(test)] 959mod tests { 960 use super::*; 961 962 #[test] 963 fn test_is_field_datetime_found() { 964 let lexicons = vec![serde_json::json!({ 965 "lexicon": 1, 966 "id": "app.bsky.feed.post", 967 "defs": { 968 "main": { 969 "record": { 970 "properties": { 971 "createdAt": { 972 "type": "string", 973 "format": "datetime" 974 }, 975 "text": { 976 "type": "string" 977 } 978 
} 979 } 980 } 981 } 982 })]; 983 984 assert!(is_field_datetime( 985 &lexicons, 986 "app.bsky.feed.post", 987 "createdAt" 988 )); 989 } 990 991 #[test] 992 fn test_is_field_datetime_not_datetime() { 993 let lexicons = vec![serde_json::json!({ 994 "lexicon": 1, 995 "id": "app.bsky.feed.post", 996 "defs": { 997 "main": { 998 "record": { 999 "properties": { 1000 "text": { 1001 "type": "string" 1002 } 1003 } 1004 } 1005 } 1006 } 1007 })]; 1008 1009 assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text")); 1010 } 1011 1012 #[test] 1013 fn test_is_field_datetime_missing_field() { 1014 let lexicons = vec![serde_json::json!({ 1015 "lexicon": 1, 1016 "id": "app.bsky.feed.post", 1017 "defs": { 1018 "main": { 1019 "record": { 1020 "properties": { 1021 "text": { 1022 "type": "string" 1023 } 1024 } 1025 } 1026 } 1027 } 1028 })]; 1029 1030 assert!(!is_field_datetime( 1031 &lexicons, 1032 "app.bsky.feed.post", 1033 "nonexistent" 1034 )); 1035 } 1036 1037 #[test] 1038 fn test_is_field_datetime_wrong_collection() { 1039 let lexicons = vec![serde_json::json!({ 1040 "lexicon": 1, 1041 "id": "app.bsky.feed.post", 1042 "defs": { 1043 "main": { 1044 "record": { 1045 "properties": { 1046 "createdAt": { 1047 "type": "string", 1048 "format": "datetime" 1049 } 1050 } 1051 } 1052 } 1053 } 1054 })]; 1055 1056 assert!(!is_field_datetime( 1057 &lexicons, 1058 "app.bsky.actor.profile", 1059 "createdAt" 1060 )); 1061 } 1062 1063 #[test] 1064 fn test_is_field_datetime_multiple_lexicons() { 1065 let lexicons = vec![ 1066 serde_json::json!({ 1067 "lexicon": 1, 1068 "id": "app.bsky.feed.post", 1069 "defs": { 1070 "main": { 1071 "record": { 1072 "properties": { 1073 "text": { 1074 "type": "string" 1075 } 1076 } 1077 } 1078 } 1079 } 1080 }), 1081 serde_json::json!({ 1082 "lexicon": 1, 1083 "id": "app.bsky.actor.profile", 1084 "defs": { 1085 "main": { 1086 "record": { 1087 "properties": { 1088 "createdAt": { 1089 "type": "string", 1090 "format": "datetime" 1091 } 1092 } 1093 } 1094 } 1095 } 
1096 }), 1097 ]; 1098 1099 assert!(is_field_datetime( 1100 &lexicons, 1101 "app.bsky.actor.profile", 1102 "createdAt" 1103 )); 1104 assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text")); 1105 } 1106}