Rust AppView - highly experimental!
1//! Database operation executor 2//! 3//! This module contains the actual database execution logic for all DatabaseOperation variants. 4//! 5//! ## Architecture 6//! 7//! - `execute_operation()` - Single operation execution with aggregate delta generation 8//! - `describe_operation()` - Logging helper for operation names and URIs 9//! 10//! ## Aggregate Deltas 11//! 12//! Counts are maintained automatically by database triggers. 13//! Operations return PostStatsDeltas for compatibility. 14 15use super::DatabaseOperation; 16use ipld_core::cid::Cid; 17 18pub fn describe_operation(op: &DatabaseOperation) -> (String, Option<String>) { 19 match op { 20 DatabaseOperation::InsertPost { rkey, actor_id, .. } => { 21 ("InsertPost".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 22 } 23 DatabaseOperation::InsertLike { rkey, actor_id, .. } => ( 24 "InsertLike".to_string(), 25 Some(format!("actor_id:{}/app.bsky.feed.like/{}", actor_id, rkey)), 26 ), 27 DatabaseOperation::InsertRepost { rkey, actor_id, .. } => ( 28 "InsertRepost".to_string(), 29 Some(format!("actor_id:{}/app.bsky.feed.repost/{}", actor_id, rkey)), 30 ), 31 DatabaseOperation::InsertFollow { rkey, actor_id, .. } => ( 32 "InsertFollow".to_string(), 33 Some(format!("actor_id:{}/app.bsky.graph.follow/{}", actor_id, rkey)), 34 ), 35 DatabaseOperation::InsertBlock { rkey, actor_id, .. } => ( 36 "InsertBlock".to_string(), 37 Some(format!("actor_id:{}/app.bsky.graph.block/{}", actor_id, rkey)), 38 ), 39 DatabaseOperation::InsertNotification { author_actor_id, record_type, record_rkey, .. } => { 40 ("InsertNotification".to_string(), Some(format!("actor_id:{}/{}/{}", author_actor_id, record_type, record_rkey))) 41 } 42 DatabaseOperation::NotifyReplyChain { reply_uri, .. } => { 43 ("NotifyReplyChain".to_string(), Some(reply_uri.clone())) 44 } 45 DatabaseOperation::UpsertActor { did, .. } => { 46 ("UpsertActor".to_string(), Some(did.clone())) 47 } 48 DatabaseOperation::UpsertProfile { actor_id, .. } => { 49 ("UpsertProfile".to_string(), Some(format!("actor_id:{}", actor_id))) 50 } 51 DatabaseOperation::UpsertStatus { actor_id, .. } => { 52 ("UpsertStatus".to_string(), Some(format!("actor_id:{}", actor_id))) 53 } 54 DatabaseOperation::UpsertPostgate { rkey, actor_id, .. } => { 55 ("UpsertPostgate".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 56 } 57 DatabaseOperation::UpsertThreadgate { rkey, actor_id, .. } => { 58 ("UpsertThreadgate".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 59 } 60 DatabaseOperation::UpsertList { rkey, actor_id, .. } => { 61 ("UpsertList".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 62 } 63 DatabaseOperation::InsertListItem { rkey, actor_id, .. } => { 64 ("InsertListItem".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 65 } 66 DatabaseOperation::InsertListBlock { rkey, actor_id, .. } => { 67 ("InsertListBlock".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 68 } 69 DatabaseOperation::UpsertFeedGenerator { rkey, actor_id, .. } => { 70 ("UpsertFeedGenerator".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 71 } 72 DatabaseOperation::UpsertStarterPack { rkey, actor_id, .. } => { 73 ("UpsertStarterPack".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 74 } 75 DatabaseOperation::InsertVerification { rkey, actor_id, .. } => { 76 ("InsertVerification".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) 77 } 78 DatabaseOperation::UpsertLabeler { actor_id, .. } => { 79 ("UpsertLabeler".to_string(), Some(format!("actor_id:{}", actor_id))) 80 } 81 DatabaseOperation::UpsertNotificationDeclaration { actor_id, .. } => ( 82 "UpsertNotificationDeclaration".to_string(), 83 Some(format!("actor_id:{}", actor_id)), 84 ), 85 DatabaseOperation::UpsertChatDeclaration { actor_id, .. } => { 86 ("UpsertChatDeclaration".to_string(), Some(format!("actor_id:{}", actor_id))) 87 } 88 DatabaseOperation::UpsertBookmark { rkey, actor_id, .. } => ( 89 "UpsertBookmark".to_string(), 90 Some(format!( 91 "actor_id:{}/community.lexicon.bookmarks.bookmark/{}", 92 actor_id, rkey 93 )), 94 ), 95 DatabaseOperation::DeleteRecord { 96 at_uri, collection, .. 97 } => ( 98 format!("DeleteRecord({:?})", collection), 99 Some(at_uri.clone()), 100 ), 101 DatabaseOperation::MaintainSelfLabels { at_uri, .. } => { 102 ("MaintainSelfLabels".to_string(), Some(at_uri.clone())) 103 } 104 DatabaseOperation::MaintainPostgateDetaches { post_uri, .. } => ( 105 "MaintainPostgateDetaches".to_string(), 106 Some(post_uri.clone()), 107 ), 108 } 109} 110 111/// Execute a single database operation 112/// 113/// This function will be called by the database writer for each operation. 114/// 115/// Counts and cache invalidations are maintained by database triggers. 116/// Execute database operations 117pub async fn execute_operation( 118 pool: &deadpool_postgres::Pool, 119 conn: &mut deadpool_postgres::Object, 120 op: DatabaseOperation, 121) -> eyre::Result<()> { 122 use crate::db; 123 124 match op { 125 DatabaseOperation::InsertPost { 126 rkey, 127 actor_id, 128 cid, 129 record, 130 source, 131 } => { 132 // Extract parent URI and embed URI before record is consumed 133 let parent_uri = record.reply.as_ref().map(|reply| reply.parent.uri.clone()); 134 135 let embed_uri = record.embed.as_ref().and_then(|embed_union| { 136 embed_union.as_bsky().and_then(|embed| match embed { 137 crate::types::records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()), 138 crate::types::records::AppBskyEmbed::RecordWithMedia(r) => { 139 Some(r.record.uri.clone()) 140 } 141 _ => None, 142 }) 143 }); 144 145 let start = std::time::Instant::now(); 146 let rows = 147 db::post_insert(conn, actor_id, rkey, cid, record, source).await?; 148 let elapsed_ms = start.elapsed().as_millis(); 149 150 // Log slow post inserts (>1s) 151 if elapsed_ms > 1000 { 152 tracing::warn!( 153 actor_id = actor_id, 154 rkey = %rkey, 155 elapsed_ms = elapsed_ms, 156 source = ?source, 157 "Slow post insert detected" 158 ); 159 } else if elapsed_ms > 500 { 160 tracing::info!( 161 actor_id = actor_id, 162 rkey = %rkey, 163 elapsed_ms = elapsed_ms, 164 source = ?source, 165 "Post insert timing" 166 ); 167 } 168 if rows > 0 { 169 // If this is a reply, increment parent post's reply count 170 if let Some(parent_uri) = parent_uri.as_ref() { 171 // Resolve parent URI to natural key 172 if let Ok(Some((parent_actor_id, parent_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, parent_uri).await { 173 // Append to parent's reply arrays (array-only tracking) 174 // Arrays are correlated (actor_id, rkey) pairs sorted by rkey for better columnstore compression 175 let _ = conn.execute( 176 "UPDATE posts 177 SET reply_actor_ids = (SELECT array_agg(actor_id ORDER BY rkey) FROM unnest(COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)), 178 reply_rkeys = (SELECT array_agg(rkey ORDER BY rkey) FROM unnest(COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)) 179 WHERE (actor_id, rkey) = ($1, $2) 180 AND NOT ($3 = ANY(COALESCE(reply_actor_ids, ARRAY[])))", 181 &[&parent_actor_id, &parent_rkey, &actor_id, &rkey], 182 ).await; 183 // Database trigger handles cache invalidation 184 } 185 } 186 187 // If this quotes/embeds a post, append to that post's quote arrays 188 if let Some(quoted_uri) = embed_uri.as_ref() { 189 // Resolve quoted URI to natural key 190 if let Ok(Some((quoted_actor_id, quoted_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, quoted_uri).await { 191 // Append to quoted post's quote arrays (array-only tracking) 192 // Arrays are correlated (actor_id, rkey) pairs sorted by rkey for better columnstore compression 193 let _ = conn.execute( 194 "UPDATE posts 195 SET quote_actor_ids = (SELECT array_agg(actor_id ORDER BY rkey) FROM unnest(COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)), 196 quote_rkeys = (SELECT array_agg(rkey ORDER BY rkey) FROM unnest(COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)) 197 WHERE (actor_id, rkey) = ($1, $2) 198 AND NOT ($3 = ANY(COALESCE(quote_actor_ids, ARRAY[])))", 199 &[&quoted_actor_id, &quoted_rkey, &actor_id, &rkey], 200 ).await; 201 // Database trigger handles cache invalidation 202 } 203 } 204 205 Ok(()) 206 } else { 207 Ok(()) 208 } 209 } 210 DatabaseOperation::InsertLike { 211 rkey, 212 actor_id, 213 record, 214 via_repost_key, 215 source, 216 } => { 217 218 // Extract URI before record is moved 219 let subject_uri = record.subject.uri.clone(); 220 221 let start = std::time::Instant::now(); 222 let rows = db::like_insert(conn, rkey, actor_id, record, via_repost_key, source).await?; 223 let elapsed_ms = start.elapsed().as_millis(); 224 225 // Log slow like inserts 226 if elapsed_ms > 1000 { 227 tracing::warn!( 228 subject_uri = %subject_uri, 229 elapsed_ms = elapsed_ms, 230 "Slow like insert detected" 231 ); 232 } 233 234 if rows > 0 { 235 // Note: like arrays are updated directly in db::like_insert via LikeArrayOp 236 // No deltas needed with array-only tracking 237 // Database trigger handles cache invalidation 238 239 Ok(()) 240 } else { 241 Ok(()) 242 } 243 } 244 DatabaseOperation::InsertRepost { 245 rkey, 246 actor_id, 247 cid, 248 record, 249 via_repost_key, 250 source, 251 } => { 252 253 // Extract URI before record is moved 254 let subject_uri = record.subject.uri.clone(); 255 256 let start = std::time::Instant::now(); 257 let rows = db::repost_insert(conn, rkey, actor_id, cid, record, via_repost_key, source).await?; 258 let elapsed_ms = start.elapsed().as_millis(); 259 260 // Log slow repost inserts 261 if elapsed_ms > 1000 { 262 tracing::warn!( 263 subject_uri = %subject_uri, 264 elapsed_ms = elapsed_ms, 265 "Slow repost insert detected" 266 ); 267 } 268 269 if rows > 0 { 270 // Note: repost arrays will be updated in db::repost_insert (dual-write to reposts table + post arrays) 271 // No deltas needed with array-only tracking 272 273 // Database trigger handles cache invalidation 274 275 Ok(()) 276 } else { 277 Ok(()) 278 } 279 } 280 DatabaseOperation::InsertFollow { 281 rkey, 282 actor_id, 283 subject_actor_id, 284 cid, 285 record, 286 } => { 287 let rows = db::follow_insert(conn, rkey, actor_id, subject_actor_id, cid, record).await?; 288 if rows > 0 { 289 // Counts maintained by triggers 290 Ok(()) 291 } else { 292 Ok(()) 293 } 294 } 295 DatabaseOperation::InsertBlock { 296 rkey, 297 actor_id, 298 subject_actor_id, 299 cid, 300 record, 301 } => { 302 db::block_insert(conn, rkey, actor_id, subject_actor_id, cid, record).await?; 303 Ok(()) // Blocks don't affect aggregates (yet) 304 } 305 DatabaseOperation::InsertNotification { 306 recipient_actor_id, 307 author_actor_id, 308 record_type, 309 record_rkey, 310 record_cid, 311 reason, 312 subject_actor_id, 313 subject_record_type, 314 subject_rkey, 315 created_at, 316 } => { 317 // Skip self-notifications (author notifying themselves) 318 if recipient_actor_id == author_actor_id { 319 return Ok(()); 320 } 321 322 323 // Check if thread is muted (for reply, quote, and mention notifications) 324 // Only posts can be thread roots, so only check for post subjects 325 if let (Some(subj_actor_id), Some("post"), Some(subj_rkey)) = 326 (subject_actor_id, subject_record_type.as_deref(), subject_rkey.as_ref()) 327 { 328 if db::is_thread_muted(conn, recipient_actor_id, subj_actor_id, *subj_rkey).await? { 329 tracing::debug!( 330 "Skipping notification for muted thread: recipient_actor_id={}, root_post_actor_id={}, root_post_rkey={}", 331 recipient_actor_id, 332 subj_actor_id, 333 subj_rkey 334 ); 335 return Ok(()); 336 } 337 } 338 339 // Add to PostgreSQL 340 if let Err(e) = crate::external::pg_notifications::add_notification( 341 pool, 342 recipient_actor_id, 343 author_actor_id, 344 &record_type, 345 record_rkey, // i64 (Copy type) 346 &record_cid, // &[u8] 347 &reason, 348 subject_actor_id, 349 subject_record_type.as_deref(), 350 subject_rkey, // Option<i64> (Copy type) 351 created_at, 352 ) 353 .await 354 { 355 tracing::warn!( 356 "Failed to add notification to PostgreSQL: recipient_actor_id={}, record={}/{}, error={}", 357 recipient_actor_id, 358 record_type, 359 record_rkey, 360 e 361 ); 362 metrics::counter!("batch_writer_notif_pg_error").increment(1); 363 } else { 364 tracing::debug!( 365 "Created notification in PostgreSQL: author_actor_id={} -> recipient_actor_id={} (reason: {})", 366 author_actor_id, 367 recipient_actor_id, 368 reason 369 ); 370 } 371 372 Ok(()) // Notifications don't affect aggregates 373 } 374 DatabaseOperation::NotifyReplyChain { 375 reply_uri, 376 author_actor_id, 377 cid, 378 created_at, 379 parent_uri, 380 root_uri, 381 max_levels, 382 already_notified_actor_ids, 383 } => { 384 let chain_start = std::time::Instant::now(); 385 386 // Track actor IDs we've already notified to avoid duplicates 387 let mut notified_actor_ids: std::collections::HashSet<i32> = 388 already_notified_actor_ids.into_iter().collect(); 389 390 // The reasonSubject is the root URI (or parent if no root) 391 let reason_subject = root_uri.unwrap_or_else(|| parent_uri.clone()); 392 393 // Walk up the reply chain, creating notifications for ancestors 394 let mut current_uri = parent_uri; 395 let mut level = 0; 396 397 while level < max_levels { 398 // Query the database to get the parent post author and its parent URI 399 let result = db::get_reply_chain_parent(conn, &current_uri).await?; 400 401 let Some((ancestor_actor_id, ancestor_parent_uri)) = result else { 402 // Post not found - stop walking 403 tracing::debug!( 404 "Reply chain walk stopped at level {}: post {} not found", 405 level, 406 current_uri 407 ); 408 break; 409 }; 410 411 // Skip if this is the reply author themselves 412 if ancestor_actor_id == author_actor_id { 413 tracing::debug!( 414 "Skipping reply-chain notification at level {}: ancestor is the reply author", 415 level 416 ); 417 } else if notified_actor_ids.contains(&ancestor_actor_id) { 418 // Already notified (either as direct parent or root) 419 tracing::debug!( 420 "Skipping reply-chain notification at level {}: actor_id={} already notified", 421 level, 422 ancestor_actor_id 423 ); 424 } else { 425 // Parse reason_subject URI early for thread mute check and notification 426 // Format: at://did:plc:xyz/app.bsky.feed.post/rkey 427 let subject_rkey_str = reason_subject.split('/').next_back().unwrap_or(""); 428 let subject_rkey_i64 = match parakeet_db::utils::tid::decode_tid(subject_rkey_str) { 429 Ok(rkey) => rkey, 430 Err(e) => { 431 tracing::warn!("Invalid subject TID in reason_subject: {} - {}", subject_rkey_str, e); 432 level += 1; 433 if let Some(parent) = ancestor_parent_uri { 434 current_uri = parent; 435 continue; 436 } else { 437 break; 438 } 439 } 440 }; 441 442 // Get subject_actor_id from reason_subject DID 443 let subject_actor_id = if let Some(subject_did) = reason_subject 444 .strip_prefix("at://") 445 .and_then(|s| s.split('/').next()) 446 { 447 match db::actor_id_from_did(conn, subject_did).await? { 448 Some(id) => id, 449 None => { 450 tracing::debug!( 451 "Skipping reply-chain notification at level {}: subject DID not found: {}", 452 level, 453 subject_did 454 ); 455 level += 1; 456 if let Some(parent) = ancestor_parent_uri { 457 current_uri = parent; 458 continue; 459 } else { 460 break; 461 } 462 } 463 } 464 } else { 465 tracing::warn!("Invalid reason_subject URI format: {}", reason_subject); 466 level += 1; 467 if let Some(parent) = ancestor_parent_uri { 468 current_uri = parent; 469 continue; 470 } else { 471 break; 472 } 473 }; 474 475 // Check if thread is muted for this ancestor 476 if db::is_thread_muted(conn, ancestor_actor_id, subject_actor_id, subject_rkey_i64).await? { 477 // Check if ancestor has muted this thread using actor_ids and i64 rkey 478 tracing::debug!( 479 "Skipping reply-chain notification at level {}: actor_id={} muted thread (root_post_actor_id={}, rkey={})", 480 level, 481 ancestor_actor_id, 482 subject_actor_id, 483 subject_rkey_i64 484 ); 485 } else { 486 // Create PostgreSQL notification for this ancestor 487 // Extract rkey from reply_uri (at://did/app.bsky.feed.post/rkey) 488 let reply_rkey_str = reply_uri.split('/').next_back().unwrap_or(""); 489 let reply_rkey_i64 = match parakeet_db::utils::tid::decode_tid(reply_rkey_str) { 490 Ok(rkey) => rkey, 491 Err(e) => { 492 tracing::warn!("Invalid reply TID: {} - {}", reply_rkey_str, e); 493 level += 1; 494 if let Some(parent) = ancestor_parent_uri { 495 current_uri = parent; 496 continue; 497 } else { 498 break; 499 } 500 } 501 }; 502 503 // Parse CID string and extract digest 504 let cid_digest = match Cid::try_from(cid.as_str()) { 505 Ok(cid_obj) => { 506 let cid_bytes = cid_obj.to_bytes(); 507 match parakeet_db::utils::cid::cid_to_digest_owned(&cid_bytes) { 508 Some(digest) => digest, 509 None => { 510 tracing::warn!("Invalid CID digest for CID: {}", cid); 511 level += 1; 512 if let Some(parent) = ancestor_parent_uri { 513 current_uri = parent; 514 continue; 515 } else { 516 break; 517 } 518 } 519 } 520 } 521 Err(e) => { 522 tracing::warn!("Failed to parse CID: {} - {}", cid, e); 523 level += 1; 524 if let Some(parent) = ancestor_parent_uri { 525 current_uri = parent; 526 continue; 527 } else { 528 break; 529 } 530 } 531 }; 532 533 if let Err(e) = crate::external::pg_notifications::add_notification( 534 pool, 535 ancestor_actor_id, 536 author_actor_id, 537 "post", // record_type (the reply is a post) 538 reply_rkey_i64, // record_rkey (i64) 539 &cid_digest, // record_cid (32-byte digest) 540 "reply", // reason 541 Some(subject_actor_id), // subject_actor_id (root/parent post author) 542 Some("post"), // subject_record_type 543 Some(subject_rkey_i64), // subject_rkey (Option<i64>) 544 created_at, 545 ) 546 .await 547 { 548 tracing::warn!( 549 "Failed to add reply-chain notification to PostgreSQL: recipient_actor_id={}, record_rkey={}, error={}", 550 ancestor_actor_id, 551 reply_rkey_i64, 552 e 553 ); 554 } else { 555 tracing::debug!( 556 "Created reply-chain notification at level {}: author_actor_id={} -> recipient_actor_id={}", 557 level, 558 author_actor_id, 559 ancestor_actor_id 560 ); 561 notified_actor_ids.insert(ancestor_actor_id); 562 } 563 } 564 } 565 566 // Move to the next ancestor 567 match ancestor_parent_uri { 568 Some(parent) => { 569 current_uri = parent; 570 level += 1; 571 } 572 None => { 573 // Reached the root of the thread 574 tracing::debug!("Reply chain walk reached root at level {}", level); 575 break; 576 } 577 } 578 } 579 580 let chain_elapsed_ms = chain_start.elapsed().as_millis(); 581 if chain_elapsed_ms > 1000 { 582 tracing::warn!( 583 reply_uri = %reply_uri, 584 elapsed_ms = chain_elapsed_ms, 585 levels_walked = level, 586 "Slow NotifyReplyChain detected" 587 ); 588 } 589 590 Ok(()) // Notifications don't affect aggregates 591 } 592 DatabaseOperation::UpsertActor { 593 did, 594 status, 595 sync_state, 596 handle, 597 account_created_at, 598 timestamp, 599 } => { 600 db::actor_upsert( 601 conn, 602 &did, 603 status.as_ref(), 604 &sync_state, 605 handle.as_deref(), 606 account_created_at.as_ref(), 607 timestamp, 608 ) 609 .await?; 610 Ok(()) 611 } 612 DatabaseOperation::UpsertProfile { actor_id, cid, record } => { 613 // Query DID from actor_id (needed by db functions) 614 let did = db::actor_did_from_id(conn, actor_id).await?; 615 616 db::profile_upsert(conn, actor_id, &did, cid, record).await?; 617 // Note: Handle resolution is now managed by Tap, not enqueued separately 618 Ok(()) 619 } 620 DatabaseOperation::UpsertStatus { actor_id, cid, record } => { 621 // Query DID from actor_id (needed by db functions) 622 let did = db::actor_did_from_id(conn, actor_id).await?; 623 624 db::status_upsert(conn, actor_id, &did, cid, record).await?; 625 Ok(()) 626 } 627 DatabaseOperation::UpsertPostgate { 628 rkey, 629 actor_id, 630 cid, 631 record, 632 } => { 633 db::postgate_upsert(conn, actor_id, rkey, cid, &record).await?; 634 Ok(()) 635 } 636 DatabaseOperation::UpsertThreadgate { 637 rkey, 638 actor_id, 639 cid, 640 record, 641 } => { 642 db::threadgate_upsert(conn, actor_id, rkey, cid, record).await?; 643 Ok(()) 644 } 645 DatabaseOperation::UpsertList { 646 rkey, 647 actor_id, 648 cid, 649 record, 650 } => { 651 // Lists use arbitrary string rkeys (not TID-based like posts) 652 let inserted = db::list_upsert(conn, actor_id, &rkey, cid, record).await?; 653 if inserted { 654 // lists_count maintained by application 655 Ok(()) 656 } else { 657 Ok(()) 658 } 659 } 660 DatabaseOperation::InsertListItem { 661 rkey, 662 actor_id, 663 subject_actor_id, 664 cid, 665 record, 666 } => { 667 db::list_item_insert(conn, actor_id, rkey, subject_actor_id, cid, record).await?; 668 Ok(()) 669 } 670 DatabaseOperation::InsertListBlock { 671 rkey, 672 actor_id, 673 cid, 674 record, 675 } => { 676 db::list_block_insert(conn, actor_id, rkey, cid, record).await?; 677 Ok(()) 678 } 679 DatabaseOperation::UpsertFeedGenerator { 680 rkey, 681 actor_id, 682 cid, 683 service_actor_id, 684 record, 685 } => { 686 // Service actor ID already resolved during reference extraction 687 let inserted = db::feedgen_upsert(conn, actor_id, &rkey, cid, service_actor_id, record).await?; 688 if inserted { 689 // feeds_count maintained by application 690 Ok(()) 691 } else { 692 Ok(()) 693 } 694 } 695 DatabaseOperation::UpsertStarterPack { 696 rkey, 697 actor_id, 698 cid, 699 record, 700 } => { 701 let inserted = db::starter_pack_upsert(conn, actor_id, rkey, cid, record).await?; 702 if inserted { 703 // starterpacks_count maintained by application 704 Ok(()) 705 } else { 706 Ok(()) 707 } 708 } 709 DatabaseOperation::InsertVerification { 710 rkey, 711 actor_id, 712 subject_actor_id, 713 cid, 714 record, 715 } => { 716 db::verification_insert(conn, actor_id, rkey, subject_actor_id, cid, record).await?; 717 Ok(()) 718 } 719 DatabaseOperation::UpsertLabeler { actor_id, cid, record } => { 720 // Query DID from actor_id (needed by db functions) 721 let did = db::actor_did_from_id(conn, actor_id).await?; 722 723 db::labeler_upsert(conn, actor_id, &did, cid, record).await?; 724 Ok(()) 725 } 726 DatabaseOperation::UpsertNotificationDeclaration { actor_id, record } => { 727 db::notif_decl_upsert(conn, actor_id, record).await?; 728 Ok(()) 729 } 730 DatabaseOperation::UpsertChatDeclaration { actor_id, record } => { 731 db::chat_decl_upsert(conn, actor_id, record).await?; 732 Ok(()) 733 } 734 DatabaseOperation::UpsertBookmark { actor_id, rkey, record } => { 735 db::bookmark_upsert(conn, actor_id, rkey, record).await?; 736 Ok(()) 737 } 738 DatabaseOperation::DeleteRecord { 739 actor_id, 740 collection, 741 rkey, 742 at_uri, 743 } => { 744 use crate::relay::types::CollectionType; 745 746 // actor_id is already resolved in the resolution phase 747 // No database lookup needed here! 748 749 // Execute type-specific delete operation and collect deltas 750 match collection { 751 CollectionType::BskyFeedLike => { 752 // Returns subject natural key if deleted 753 if let Some(subject) = db::like_delete(conn, rkey, actor_id).await? { 754 match subject { 755 db::LikeSubject::Post { actor_id: post_actor_id, rkey: post_rkey } => { 756 // Note: like arrays are updated directly in db::like_delete via array_remove() 757 // No deltas needed with array-only tracking 758 759 // Invalidate post cache 760 if let Ok(post_did) = db::actor_did_from_id(conn, post_actor_id).await { 761 let _post_uri = format!("at://{}/app.bsky.feed.post/{}", 762 post_did, 763 parakeet_db::models::i64_to_tid(post_rkey)); 764 // Database trigger handles cache invalidation 765 766 // Remove PostgreSQL notification (recipient = author of liked post) 767 if let Err(e) = 768 crate::external::pg_notifications::remove_notification( 769 pool, 770 actor_id, // author_actor_id (liker) 771 "like", // record_type 772 rkey, // record_rkey (i64) 773 ) 774 .await 775 { 776 tracing::warn!( 777 "Failed to remove like notification from PostgreSQL: actor_id={}, rkey={}, error={}", 778 actor_id, 779 rkey, 780 e 781 ); 782 } 783 } 784 } 785 db::LikeSubject::Feedgen { actor_id: _feedgen_actor_id, rkey: _feedgen_rkey } => { 786 // Database trigger handles cache invalidation for feedgen likes 787 } 788 db::LikeSubject::Labeler { actor_id: _labeler_actor_id } => { 789 // Database trigger handles cache invalidation for labeler likes 790 } 791 } 792 } 793 } 794 CollectionType::BskyFeedRepost => { 795 // Returns post URI if deleted 796 if let Some(_post_uri) = db::repost_delete(conn, rkey, actor_id).await? { 797 // Note: repost arrays will be updated in db::repost_delete (remove from post arrays) 798 // No deltas needed with array-only tracking 799 800 // Invalidate post cache since repost array changed 801 // Database trigger handles cache invalidation 802 803 // Remove PostgreSQL notification (recipient = author of reposted post) 804 if let Err(e) = 805 crate::external::pg_notifications::remove_notification( 806 pool, 807 actor_id, // author_actor_id (reposter) 808 "repost", // record_type 809 rkey, // record_rkey (i64) 810 ) 811 .await 812 { 813 tracing::warn!( 814 "Failed to remove repost notification from PostgreSQL: actor_id={}, rkey={}, error={}", 815 actor_id, 816 rkey, 817 e 818 ); 819 } 820 821 // Invalidate author feed cache when user deletes a repost 822 // Database trigger handles cache invalidation 823 } 824 } 825 CollectionType::BskyFollow => { 826 // Returns subject (target DID) if deleted 827 if let Some(_target_did) = db::follow_delete(conn, rkey, actor_id).await? { 828 // Counts maintained by triggers 829 830 // Remove PostgreSQL notification (recipient = followed user) 831 if let Err(e) = 832 crate::external::pg_notifications::remove_notification( 833 pool, 834 actor_id, // author_actor_id (follower) 835 "follow", // record_type 836 rkey, // record_rkey (i64) 837 ) 838 .await 839 { 840 tracing::warn!( 841 "Failed to remove follow notification from PostgreSQL: actor_id={}, rkey={}, error={}", 842 actor_id, 843 rkey, 844 e 845 ); 846 } 847 848 // Invalidate timeline cache when user unfollows someone 849 // Database trigger handles cache invalidation 850 } 851 } 852 CollectionType::BskyFeedPost => { 853 // Returns (parent natural key, embedded post natural key) if deleted 854 if let Some((parent_key, embedded_key)) = 855 db::post_delete(conn, actor_id, rkey).await? 856 { 857 // posts_count maintained by triggers 858 859 // Remove from parent's reply arrays if this was a reply 860 if let Some((parent_actor_id, parent_rkey)) = parent_key { 861 // Remove from reply arrays using array_remove() 862 let _ = conn.execute( 863 "UPDATE posts 864 SET reply_actor_ids = array_remove(reply_actor_ids, $3), 865 reply_rkeys = array_remove(reply_rkeys, $4) 866 WHERE (actor_id, rkey) = ($1, $2)", 867 &[&parent_actor_id, &parent_rkey, &actor_id, &rkey], 868 ).await; 869 870 // Invalidate parent post cache since reply array changed 871 // Construct URI using actor cache 872 if let Ok(parent_did) = db::actor_did_from_id(conn, parent_actor_id).await { 873 let _parent_uri = format!("at://{}/app.bsky.feed.post/{}", 874 parent_did, 875 parakeet_db::models::i64_to_tid(parent_rkey)); 876 // Database trigger handles cache invalidation 877 } 878 } 879 880 // Remove from embedded post's quote arrays if this quoted a record 881 if let Some((embed_actor_id, embed_rkey)) = embedded_key { 882 // Remove from quote arrays using array_remove() 883 let _ = conn.execute( 884 "UPDATE posts 885 SET quote_actor_ids = array_remove(quote_actor_ids, $3), 886 quote_rkeys = array_remove(quote_rkeys, $4) 887 WHERE (actor_id, rkey) = ($1, $2)", 888 &[&embed_actor_id, &embed_rkey, &actor_id, &rkey], 889 ).await; 890 891 // Invalidate quoted post cache since quote array changed 892 // Construct URI using actor cache 893 if let Ok(embed_did) = db::actor_did_from_id(conn, embed_actor_id).await { 894 let _embed_uri = format!("at://{}/app.bsky.feed.post/{}", 895 embed_did, 896 parakeet_db::models::i64_to_tid(embed_rkey)); 897 // Database trigger handles cache invalidation 898 } 899 } 900 901 // Remove PostgreSQL notification for this post 902 // Posts can create multiple notifications (reply, quote, mention) 903 // PostgreSQL uses (author_actor_id, record_type, record_rkey) as unique key 904 if let Err(e) = 905 crate::external::pg_notifications::remove_notification( 906 pool, 907 actor_id, // author_actor_id (post author) 908 "post", // record_type 909 rkey, // record_rkey (i64) 910 ) 911 .await 912 { 913 tracing::warn!( 914 "Failed to remove post notification from PostgreSQL: actor_id={}, rkey={}, error={}", 915 actor_id, 916 rkey, 917 e 918 ); 919 } 920 921 // Invalidate author feed cache when user deletes a post 922 // Database trigger handles cache invalidation 923 } 924 } 925 // Other delete operations that don't affect aggregates 926 CollectionType::BskyProfile => { 927 db::profile_delete(conn, actor_id).await?; 928 // Invalidate profile cache when user deletes profile record 929 // Database trigger handles cache invalidation 930 } 931 CollectionType::BskyStatus => { 932 db::status_delete(conn, actor_id).await?; 933 // Invalidate profile cache when user deletes custom status 934 // Database trigger handles cache invalidation 935 } 936 CollectionType::BskyBlock => { 937 db::block_delete(conn, rkey, actor_id).await?; 938 // Invalidate timeline cache when user unblocks someone 939 // Database trigger handles cache invalidation 940 } 941 CollectionType::BskyFeedGen => { 942 // Feedgens use arbitrary string rkeys - extract from at_uri 943 let rkey_str = parakeet_db::utils::at_uri::extract_rkey(&at_uri) 944 .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?; 945 let rows = db::feedgen_delete(conn, actor_id, rkey_str).await?; 946 if rows > 0 { 947 // feeds_count maintained by application 948 } 949 } 950 CollectionType::BskyFeedPostgate => { 951 db::postgate_delete(conn, actor_id, rkey).await?; 952 } 953 CollectionType::BskyFeedThreadgate => { 954 db::threadgate_delete(conn, actor_id, rkey).await?; 955 } 956 CollectionType::BskyList => { 957 // Lists use arbitrary string rkeys - extract from at_uri 958 let rkey_str = parakeet_db::utils::at_uri::extract_rkey(&at_uri) 959 .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?; 960 961 let rows = db::list_delete(conn, actor_id, rkey_str).await?; 962 if rows > 0 { 963 // lists_count maintained by application 964 } 965 } 966 CollectionType::BskyListBlock => { 967 db::list_block_delete(conn, actor_id, rkey).await?; 968 } 969 CollectionType::BskyListItem => { 970 db::list_item_delete(conn, actor_id, rkey).await?; 971 } 972 CollectionType::BskyStarterPack => { 973 let rows = db::starter_pack_delete(conn, actor_id, rkey).await?; 974 if rows > 0 { 975 // starterpacks_count maintained by application 976 } 977 } 978 CollectionType::BskyVerification => { 979 db::verification_delete(conn, actor_id, rkey).await?; 980 } 981 CollectionType::BskyLabelerService => { 982 db::labeler_delete(conn, actor_id).await?; 983 } 984 CollectionType::BskyNotificationDeclaration => { 985 db::notif_decl_delete(conn, actor_id).await?; 986 } 987 CollectionType::ChatActorDecl => { 988 db::chat_decl_delete(conn, actor_id).await?; 989 } 990 CollectionType::CommunityLexiconBookmark => { 991 db::bookmark_delete(conn, rkey, actor_id).await?; 992 } 993 _ => { 994 // Unsupported collection types - no-op 995 } 996 } 997 998 // Note: No generic records table to delete from anymore 999 1000 Ok(()) 1001 } 1002 DatabaseOperation::MaintainSelfLabels { 1003 actor_id, 1004 cid, 1005 at_uri, 1006 labels, 1007 } => { 1008 // Query DID from actor_id (needed by db function) 1009 let did = db::actor_did_from_id(conn, actor_id).await?; 1010 1011 db::maintain_self_labels(conn, &did, cid, &at_uri, labels).await?; 1012 Ok(()) 1013 } 1014 DatabaseOperation::MaintainPostgateDetaches { 1015 post_uri, 1016 detached_uris, 1017 disable_effective, 1018 } => { 1019 // Use optimized cached version that: 1020 // - Skips no-ops (posts with no quotes) 1021 // - Uses direct database lookups for IDs 1022 // - Does single efficient UPDATE 1023 db::postgate_maintain_detaches_cached(conn, &post_uri, &detached_uris, disable_effective).await?; 1024 Ok(()) 1025 } 1026 } 1027}