//! Database operation executor //! //! This module contains the actual database execution logic for all DatabaseOperation variants. //! //! ## Architecture //! //! - `execute_operation()` - Single operation execution with aggregate delta generation //! - `describe_operation()` - Logging helper for operation names and URIs //! //! ## Aggregate Deltas //! //! Counts are maintained automatically by database triggers. //! Operations return PostStatsDeltas for compatibility. use super::DatabaseOperation; use ipld_core::cid::Cid; pub fn describe_operation(op: &DatabaseOperation) -> (String, Option) { match op { DatabaseOperation::InsertPost { rkey, actor_id, .. } => { ("InsertPost".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::InsertLike { rkey, actor_id, .. } => ( "InsertLike".to_string(), Some(format!("actor_id:{}/app.bsky.feed.like/{}", actor_id, rkey)), ), DatabaseOperation::InsertRepost { rkey, actor_id, .. } => ( "InsertRepost".to_string(), Some(format!("actor_id:{}/app.bsky.feed.repost/{}", actor_id, rkey)), ), DatabaseOperation::InsertFollow { rkey, actor_id, .. } => ( "InsertFollow".to_string(), Some(format!("actor_id:{}/app.bsky.graph.follow/{}", actor_id, rkey)), ), DatabaseOperation::InsertBlock { rkey, actor_id, .. } => ( "InsertBlock".to_string(), Some(format!("actor_id:{}/app.bsky.graph.block/{}", actor_id, rkey)), ), DatabaseOperation::InsertNotification { author_actor_id, record_type, record_rkey, .. } => { ("InsertNotification".to_string(), Some(format!("actor_id:{}/{}/{}", author_actor_id, record_type, record_rkey))) } DatabaseOperation::NotifyReplyChain { reply_uri, .. } => { ("NotifyReplyChain".to_string(), Some(reply_uri.clone())) } DatabaseOperation::UpsertActor { did, .. } => { ("UpsertActor".to_string(), Some(did.clone())) } DatabaseOperation::UpsertProfile { actor_id, .. } => { ("UpsertProfile".to_string(), Some(format!("actor_id:{}", actor_id))) } DatabaseOperation::UpsertStatus { actor_id, .. } => { ("UpsertStatus".to_string(), Some(format!("actor_id:{}", actor_id))) } DatabaseOperation::UpsertPostgate { rkey, actor_id, .. } => { ("UpsertPostgate".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::UpsertThreadgate { rkey, actor_id, .. } => { ("UpsertThreadgate".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::UpsertList { rkey, actor_id, .. } => { ("UpsertList".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::InsertListItem { rkey, actor_id, .. } => { ("InsertListItem".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::InsertListBlock { rkey, actor_id, .. } => { ("InsertListBlock".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::UpsertFeedGenerator { rkey, actor_id, .. } => { ("UpsertFeedGenerator".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::UpsertStarterPack { rkey, actor_id, .. } => { ("UpsertStarterPack".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::InsertVerification { rkey, actor_id, .. } => { ("InsertVerification".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey))) } DatabaseOperation::UpsertLabeler { actor_id, .. } => { ("UpsertLabeler".to_string(), Some(format!("actor_id:{}", actor_id))) } DatabaseOperation::UpsertNotificationDeclaration { actor_id, .. } => ( "UpsertNotificationDeclaration".to_string(), Some(format!("actor_id:{}", actor_id)), ), DatabaseOperation::UpsertChatDeclaration { actor_id, .. } => { ("UpsertChatDeclaration".to_string(), Some(format!("actor_id:{}", actor_id))) } DatabaseOperation::UpsertBookmark { rkey, actor_id, .. } => ( "UpsertBookmark".to_string(), Some(format!( "actor_id:{}/community.lexicon.bookmarks.bookmark/{}", actor_id, rkey )), ), DatabaseOperation::DeleteRecord { at_uri, collection, .. } => ( format!("DeleteRecord({:?})", collection), Some(at_uri.clone()), ), DatabaseOperation::MaintainSelfLabels { at_uri, .. } => { ("MaintainSelfLabels".to_string(), Some(at_uri.clone())) } DatabaseOperation::MaintainPostgateDetaches { post_uri, .. } => ( "MaintainPostgateDetaches".to_string(), Some(post_uri.clone()), ), } } /// Execute a single database operation /// /// This function will be called by the database writer for each operation. /// /// Counts and cache invalidations are maintained by database triggers. /// Execute database operations pub async fn execute_operation( pool: &deadpool_postgres::Pool, conn: &mut deadpool_postgres::Object, op: DatabaseOperation, ) -> eyre::Result<()> { use crate::db; match op { DatabaseOperation::InsertPost { rkey, actor_id, cid, record, source, } => { // Extract parent URI and embed URI before record is consumed let parent_uri = record.reply.as_ref().map(|reply| reply.parent.uri.clone()); let embed_uri = record.embed.as_ref().and_then(|embed_union| { embed_union.as_bsky().and_then(|embed| match embed { crate::types::records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()), crate::types::records::AppBskyEmbed::RecordWithMedia(r) => { Some(r.record.uri.clone()) } _ => None, }) }); let start = std::time::Instant::now(); let rows = db::post_insert(conn, actor_id, rkey, cid, record, source).await?; let elapsed_ms = start.elapsed().as_millis(); // Log slow post inserts (>1s) if elapsed_ms > 1000 { tracing::warn!( actor_id = actor_id, rkey = %rkey, elapsed_ms = elapsed_ms, source = ?source, "Slow post insert detected" ); } else if elapsed_ms > 500 { tracing::info!( actor_id = actor_id, rkey = %rkey, elapsed_ms = elapsed_ms, source = ?source, "Post insert timing" ); } if rows > 0 { // If this is a reply, increment parent post's reply count if let Some(parent_uri) = parent_uri.as_ref() { // Resolve parent URI to natural key if let Ok(Some((parent_actor_id, parent_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, parent_uri).await { // Append to parent's reply arrays (array-only tracking) // Arrays are correlated (actor_id, rkey) pairs sorted by rkey for better columnstore compression let _ = conn.execute( "UPDATE posts SET reply_actor_ids = (SELECT array_agg(actor_id ORDER BY rkey) FROM unnest(COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)), reply_rkeys = (SELECT array_agg(rkey ORDER BY rkey) FROM unnest(COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)) WHERE (actor_id, rkey) = ($1, $2) AND NOT ($3 = ANY(COALESCE(reply_actor_ids, ARRAY[])))", &[&parent_actor_id, &parent_rkey, &actor_id, &rkey], ).await; // Database trigger handles cache invalidation } } // If this quotes/embeds a post, append to that post's quote arrays if let Some(quoted_uri) = embed_uri.as_ref() { // Resolve quoted URI to natural key if let Ok(Some((quoted_actor_id, quoted_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, quoted_uri).await { // Append to quoted post's quote arrays (array-only tracking) // Arrays are correlated (actor_id, rkey) pairs sorted by rkey for better columnstore compression let _ = conn.execute( "UPDATE posts SET quote_actor_ids = (SELECT array_agg(actor_id ORDER BY rkey) FROM unnest(COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)), quote_rkeys = (SELECT array_agg(rkey ORDER BY rkey) FROM unnest(COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)) WHERE (actor_id, rkey) = ($1, $2) AND NOT ($3 = ANY(COALESCE(quote_actor_ids, ARRAY[])))", &["ed_actor_id, "ed_rkey, &actor_id, &rkey], ).await; // Database trigger handles cache invalidation } } Ok(()) } else { Ok(()) } } DatabaseOperation::InsertLike { rkey, actor_id, record, via_repost_key, source, } => { // Extract URI before record is moved let subject_uri = record.subject.uri.clone(); let start = std::time::Instant::now(); let rows = db::like_insert(conn, rkey, actor_id, record, via_repost_key, source).await?; let elapsed_ms = start.elapsed().as_millis(); // Log slow like inserts if elapsed_ms > 1000 { tracing::warn!( subject_uri = %subject_uri, elapsed_ms = elapsed_ms, "Slow like insert detected" ); } if rows > 0 { // Note: like arrays are updated directly in db::like_insert via LikeArrayOp // No deltas needed with array-only tracking // Database trigger handles cache invalidation Ok(()) } else { Ok(()) } } DatabaseOperation::InsertRepost { rkey, actor_id, cid, record, via_repost_key, source, } => { // Extract URI before record is moved let subject_uri = record.subject.uri.clone(); let start = std::time::Instant::now(); let rows = db::repost_insert(conn, rkey, actor_id, cid, record, via_repost_key, source).await?; let elapsed_ms = start.elapsed().as_millis(); // Log slow repost inserts if elapsed_ms > 1000 { tracing::warn!( subject_uri = %subject_uri, elapsed_ms = elapsed_ms, "Slow repost insert detected" ); } if rows > 0 { // Note: repost arrays will be updated in db::repost_insert (dual-write to reposts table + post arrays) // No deltas needed with array-only tracking // Database trigger handles cache invalidation Ok(()) } else { Ok(()) } } DatabaseOperation::InsertFollow { rkey, actor_id, subject_actor_id, cid, record, } => { let rows = db::follow_insert(conn, rkey, actor_id, subject_actor_id, cid, record).await?; if rows > 0 { // Counts maintained by triggers Ok(()) } else { Ok(()) } } DatabaseOperation::InsertBlock { rkey, actor_id, subject_actor_id, cid, record, } => { db::block_insert(conn, rkey, actor_id, subject_actor_id, cid, record).await?; Ok(()) // Blocks don't affect aggregates (yet) } DatabaseOperation::InsertNotification { recipient_actor_id, author_actor_id, record_type, record_rkey, record_cid, reason, subject_actor_id, subject_record_type, subject_rkey, created_at, } => { // Skip self-notifications (author notifying themselves) if recipient_actor_id == author_actor_id { return Ok(()); } // Check if thread is muted (for reply, quote, and mention notifications) // Only posts can be thread roots, so only check for post subjects if let (Some(subj_actor_id), Some("post"), Some(subj_rkey)) = (subject_actor_id, subject_record_type.as_deref(), subject_rkey.as_ref()) { if db::is_thread_muted(conn, recipient_actor_id, subj_actor_id, *subj_rkey).await? { tracing::debug!( "Skipping notification for muted thread: recipient_actor_id={}, root_post_actor_id={}, root_post_rkey={}", recipient_actor_id, subj_actor_id, subj_rkey ); return Ok(()); } } // Add to PostgreSQL if let Err(e) = crate::external::pg_notifications::add_notification( pool, recipient_actor_id, author_actor_id, &record_type, record_rkey, // i64 (Copy type) &record_cid, // &[u8] &reason, subject_actor_id, subject_record_type.as_deref(), subject_rkey, // Option (Copy type) created_at, ) .await { tracing::warn!( "Failed to add notification to PostgreSQL: recipient_actor_id={}, record={}/{}, error={}", recipient_actor_id, record_type, record_rkey, e ); metrics::counter!("batch_writer_notif_pg_error").increment(1); } else { tracing::debug!( "Created notification in PostgreSQL: author_actor_id={} -> recipient_actor_id={} (reason: {})", author_actor_id, recipient_actor_id, reason ); } Ok(()) // Notifications don't affect aggregates } DatabaseOperation::NotifyReplyChain { reply_uri, author_actor_id, cid, created_at, parent_uri, root_uri, max_levels, already_notified_actor_ids, } => { let chain_start = std::time::Instant::now(); // Track actor IDs we've already notified to avoid duplicates let mut notified_actor_ids: std::collections::HashSet = already_notified_actor_ids.into_iter().collect(); // The reasonSubject is the root URI (or parent if no root) let reason_subject = root_uri.unwrap_or_else(|| parent_uri.clone()); // Walk up the reply chain, creating notifications for ancestors let mut current_uri = parent_uri; let mut level = 0; while level < max_levels { // Query the database to get the parent post author and its parent URI let result = db::get_reply_chain_parent(conn, ¤t_uri).await?; let Some((ancestor_actor_id, ancestor_parent_uri)) = result else { // Post not found - stop walking tracing::debug!( "Reply chain walk stopped at level {}: post {} not found", level, current_uri ); break; }; // Skip if this is the reply author themselves if ancestor_actor_id == author_actor_id { tracing::debug!( "Skipping reply-chain notification at level {}: ancestor is the reply author", level ); } else if notified_actor_ids.contains(&ancestor_actor_id) { // Already notified (either as direct parent or root) tracing::debug!( "Skipping reply-chain notification at level {}: actor_id={} already notified", level, ancestor_actor_id ); } else { // Parse reason_subject URI early for thread mute check and notification // Format: at://did:plc:xyz/app.bsky.feed.post/rkey let subject_rkey_str = reason_subject.split('/').next_back().unwrap_or(""); let subject_rkey_i64 = match parakeet_db::utils::tid::decode_tid(subject_rkey_str) { Ok(rkey) => rkey, Err(e) => { tracing::warn!("Invalid subject TID in reason_subject: {} - {}", subject_rkey_str, e); level += 1; if let Some(parent) = ancestor_parent_uri { current_uri = parent; continue; } else { break; } } }; // Get subject_actor_id from reason_subject DID let subject_actor_id = if let Some(subject_did) = reason_subject .strip_prefix("at://") .and_then(|s| s.split('/').next()) { match db::actor_id_from_did(conn, subject_did).await? { Some(id) => id, None => { tracing::debug!( "Skipping reply-chain notification at level {}: subject DID not found: {}", level, subject_did ); level += 1; if let Some(parent) = ancestor_parent_uri { current_uri = parent; continue; } else { break; } } } } else { tracing::warn!("Invalid reason_subject URI format: {}", reason_subject); level += 1; if let Some(parent) = ancestor_parent_uri { current_uri = parent; continue; } else { break; } }; // Check if thread is muted for this ancestor if db::is_thread_muted(conn, ancestor_actor_id, subject_actor_id, subject_rkey_i64).await? { // Check if ancestor has muted this thread using actor_ids and i64 rkey tracing::debug!( "Skipping reply-chain notification at level {}: actor_id={} muted thread (root_post_actor_id={}, rkey={})", level, ancestor_actor_id, subject_actor_id, subject_rkey_i64 ); } else { // Create PostgreSQL notification for this ancestor // Extract rkey from reply_uri (at://did/app.bsky.feed.post/rkey) let reply_rkey_str = reply_uri.split('/').next_back().unwrap_or(""); let reply_rkey_i64 = match parakeet_db::utils::tid::decode_tid(reply_rkey_str) { Ok(rkey) => rkey, Err(e) => { tracing::warn!("Invalid reply TID: {} - {}", reply_rkey_str, e); level += 1; if let Some(parent) = ancestor_parent_uri { current_uri = parent; continue; } else { break; } } }; // Parse CID string and extract digest let cid_digest = match Cid::try_from(cid.as_str()) { Ok(cid_obj) => { let cid_bytes = cid_obj.to_bytes(); match parakeet_db::utils::cid::cid_to_digest_owned(&cid_bytes) { Some(digest) => digest, None => { tracing::warn!("Invalid CID digest for CID: {}", cid); level += 1; if let Some(parent) = ancestor_parent_uri { current_uri = parent; continue; } else { break; } } } } Err(e) => { tracing::warn!("Failed to parse CID: {} - {}", cid, e); level += 1; if let Some(parent) = ancestor_parent_uri { current_uri = parent; continue; } else { break; } } }; if let Err(e) = crate::external::pg_notifications::add_notification( pool, ancestor_actor_id, author_actor_id, "post", // record_type (the reply is a post) reply_rkey_i64, // record_rkey (i64) &cid_digest, // record_cid (32-byte digest) "reply", // reason Some(subject_actor_id), // subject_actor_id (root/parent post author) Some("post"), // subject_record_type Some(subject_rkey_i64), // subject_rkey (Option) created_at, ) .await { tracing::warn!( "Failed to add reply-chain notification to PostgreSQL: recipient_actor_id={}, record_rkey={}, error={}", ancestor_actor_id, reply_rkey_i64, e ); } else { tracing::debug!( "Created reply-chain notification at level {}: author_actor_id={} -> recipient_actor_id={}", level, author_actor_id, ancestor_actor_id ); notified_actor_ids.insert(ancestor_actor_id); } } } // Move to the next ancestor match ancestor_parent_uri { Some(parent) => { current_uri = parent; level += 1; } None => { // Reached the root of the thread tracing::debug!("Reply chain walk reached root at level {}", level); break; } } } let chain_elapsed_ms = chain_start.elapsed().as_millis(); if chain_elapsed_ms > 1000 { tracing::warn!( reply_uri = %reply_uri, elapsed_ms = chain_elapsed_ms, levels_walked = level, "Slow NotifyReplyChain detected" ); } Ok(()) // Notifications don't affect aggregates } DatabaseOperation::UpsertActor { did, status, sync_state, handle, account_created_at, timestamp, } => { db::actor_upsert( conn, &did, status.as_ref(), &sync_state, handle.as_deref(), account_created_at.as_ref(), timestamp, ) .await?; Ok(()) } DatabaseOperation::UpsertProfile { actor_id, cid, record } => { // Query DID from actor_id (needed by db functions) let did = db::actor_did_from_id(conn, actor_id).await?; db::profile_upsert(conn, actor_id, &did, cid, record).await?; // Note: Handle resolution is now managed by Tap, not enqueued separately Ok(()) } DatabaseOperation::UpsertStatus { actor_id, cid, record } => { // Query DID from actor_id (needed by db functions) let did = db::actor_did_from_id(conn, actor_id).await?; db::status_upsert(conn, actor_id, &did, cid, record).await?; Ok(()) } DatabaseOperation::UpsertPostgate { rkey, actor_id, cid, record, } => { db::postgate_upsert(conn, actor_id, rkey, cid, &record).await?; Ok(()) } DatabaseOperation::UpsertThreadgate { rkey, actor_id, cid, record, } => { db::threadgate_upsert(conn, actor_id, rkey, cid, record).await?; Ok(()) } DatabaseOperation::UpsertList { rkey, actor_id, cid, record, } => { // Lists use arbitrary string rkeys (not TID-based like posts) let inserted = db::list_upsert(conn, actor_id, &rkey, cid, record).await?; if inserted { // lists_count maintained by application Ok(()) } else { Ok(()) } } DatabaseOperation::InsertListItem { rkey, actor_id, subject_actor_id, cid, record, } => { db::list_item_insert(conn, actor_id, rkey, subject_actor_id, cid, record).await?; Ok(()) } DatabaseOperation::InsertListBlock { rkey, actor_id, cid, record, } => { db::list_block_insert(conn, actor_id, rkey, cid, record).await?; Ok(()) } DatabaseOperation::UpsertFeedGenerator { rkey, actor_id, cid, service_actor_id, record, } => { // Service actor ID already resolved during reference extraction let inserted = db::feedgen_upsert(conn, actor_id, &rkey, cid, service_actor_id, record).await?; if inserted { // feeds_count maintained by application Ok(()) } else { Ok(()) } } DatabaseOperation::UpsertStarterPack { rkey, actor_id, cid, record, } => { let inserted = db::starter_pack_upsert(conn, actor_id, rkey, cid, record).await?; if inserted { // starterpacks_count maintained by application Ok(()) } else { Ok(()) } } DatabaseOperation::InsertVerification { rkey, actor_id, subject_actor_id, cid, record, } => { db::verification_insert(conn, actor_id, rkey, subject_actor_id, cid, record).await?; Ok(()) } DatabaseOperation::UpsertLabeler { actor_id, cid, record } => { // Query DID from actor_id (needed by db functions) let did = db::actor_did_from_id(conn, actor_id).await?; db::labeler_upsert(conn, actor_id, &did, cid, record).await?; Ok(()) } DatabaseOperation::UpsertNotificationDeclaration { actor_id, record } => { db::notif_decl_upsert(conn, actor_id, record).await?; Ok(()) } DatabaseOperation::UpsertChatDeclaration { actor_id, record } => { db::chat_decl_upsert(conn, actor_id, record).await?; Ok(()) } DatabaseOperation::UpsertBookmark { actor_id, rkey, record } => { db::bookmark_upsert(conn, actor_id, rkey, record).await?; Ok(()) } DatabaseOperation::DeleteRecord { actor_id, collection, rkey, at_uri, } => { use crate::relay::types::CollectionType; // actor_id is already resolved in the resolution phase // No database lookup needed here! // Execute type-specific delete operation and collect deltas match collection { CollectionType::BskyFeedLike => { // Returns subject natural key if deleted if let Some(subject) = db::like_delete(conn, rkey, actor_id).await? { match subject { db::LikeSubject::Post { actor_id: post_actor_id, rkey: post_rkey } => { // Note: like arrays are updated directly in db::like_delete via array_remove() // No deltas needed with array-only tracking // Invalidate post cache if let Ok(post_did) = db::actor_did_from_id(conn, post_actor_id).await { let _post_uri = format!("at://{}/app.bsky.feed.post/{}", post_did, parakeet_db::models::i64_to_tid(post_rkey)); // Database trigger handles cache invalidation // Remove PostgreSQL notification (recipient = author of liked post) if let Err(e) = crate::external::pg_notifications::remove_notification( pool, actor_id, // author_actor_id (liker) "like", // record_type rkey, // record_rkey (i64) ) .await { tracing::warn!( "Failed to remove like notification from PostgreSQL: actor_id={}, rkey={}, error={}", actor_id, rkey, e ); } } } db::LikeSubject::Feedgen { actor_id: _feedgen_actor_id, rkey: _feedgen_rkey } => { // Database trigger handles cache invalidation for feedgen likes } db::LikeSubject::Labeler { actor_id: _labeler_actor_id } => { // Database trigger handles cache invalidation for labeler likes } } } } CollectionType::BskyFeedRepost => { // Returns post URI if deleted if let Some(_post_uri) = db::repost_delete(conn, rkey, actor_id).await? { // Note: repost arrays will be updated in db::repost_delete (remove from post arrays) // No deltas needed with array-only tracking // Invalidate post cache since repost array changed // Database trigger handles cache invalidation // Remove PostgreSQL notification (recipient = author of reposted post) if let Err(e) = crate::external::pg_notifications::remove_notification( pool, actor_id, // author_actor_id (reposter) "repost", // record_type rkey, // record_rkey (i64) ) .await { tracing::warn!( "Failed to remove repost notification from PostgreSQL: actor_id={}, rkey={}, error={}", actor_id, rkey, e ); } // Invalidate author feed cache when user deletes a repost // Database trigger handles cache invalidation } } CollectionType::BskyFollow => { // Returns subject (target DID) if deleted if let Some(_target_did) = db::follow_delete(conn, rkey, actor_id).await? { // Counts maintained by triggers // Remove PostgreSQL notification (recipient = followed user) if let Err(e) = crate::external::pg_notifications::remove_notification( pool, actor_id, // author_actor_id (follower) "follow", // record_type rkey, // record_rkey (i64) ) .await { tracing::warn!( "Failed to remove follow notification from PostgreSQL: actor_id={}, rkey={}, error={}", actor_id, rkey, e ); } // Invalidate timeline cache when user unfollows someone // Database trigger handles cache invalidation } } CollectionType::BskyFeedPost => { // Returns (parent natural key, embedded post natural key) if deleted if let Some((parent_key, embedded_key)) = db::post_delete(conn, actor_id, rkey).await? { // posts_count maintained by triggers // Remove from parent's reply arrays if this was a reply if let Some((parent_actor_id, parent_rkey)) = parent_key { // Remove from reply arrays using array_remove() let _ = conn.execute( "UPDATE posts SET reply_actor_ids = array_remove(reply_actor_ids, $3), reply_rkeys = array_remove(reply_rkeys, $4) WHERE (actor_id, rkey) = ($1, $2)", &[&parent_actor_id, &parent_rkey, &actor_id, &rkey], ).await; // Invalidate parent post cache since reply array changed // Construct URI using actor cache if let Ok(parent_did) = db::actor_did_from_id(conn, parent_actor_id).await { let _parent_uri = format!("at://{}/app.bsky.feed.post/{}", parent_did, parakeet_db::models::i64_to_tid(parent_rkey)); // Database trigger handles cache invalidation } } // Remove from embedded post's quote arrays if this quoted a record if let Some((embed_actor_id, embed_rkey)) = embedded_key { // Remove from quote arrays using array_remove() let _ = conn.execute( "UPDATE posts SET quote_actor_ids = array_remove(quote_actor_ids, $3), quote_rkeys = array_remove(quote_rkeys, $4) WHERE (actor_id, rkey) = ($1, $2)", &[&embed_actor_id, &embed_rkey, &actor_id, &rkey], ).await; // Invalidate quoted post cache since quote array changed // Construct URI using actor cache if let Ok(embed_did) = db::actor_did_from_id(conn, embed_actor_id).await { let _embed_uri = format!("at://{}/app.bsky.feed.post/{}", embed_did, parakeet_db::models::i64_to_tid(embed_rkey)); // Database trigger handles cache invalidation } } // Remove PostgreSQL notification for this post // Posts can create multiple notifications (reply, quote, mention) // PostgreSQL uses (author_actor_id, record_type, record_rkey) as unique key if let Err(e) = crate::external::pg_notifications::remove_notification( pool, actor_id, // author_actor_id (post author) "post", // record_type rkey, // record_rkey (i64) ) .await { tracing::warn!( "Failed to remove post notification from PostgreSQL: actor_id={}, rkey={}, error={}", actor_id, rkey, e ); } // Invalidate author feed cache when user deletes a post // Database trigger handles cache invalidation } } // Other delete operations that don't affect aggregates CollectionType::BskyProfile => { db::profile_delete(conn, actor_id).await?; // Invalidate profile cache when user deletes profile record // Database trigger handles cache invalidation } CollectionType::BskyStatus => { db::status_delete(conn, actor_id).await?; // Invalidate profile cache when user deletes custom status // Database trigger handles cache invalidation } CollectionType::BskyBlock => { db::block_delete(conn, rkey, actor_id).await?; // Invalidate timeline cache when user unblocks someone // Database trigger handles cache invalidation } CollectionType::BskyFeedGen => { // Feedgens use arbitrary string rkeys - extract from at_uri let rkey_str = parakeet_db::utils::at_uri::extract_rkey(&at_uri) .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?; let rows = db::feedgen_delete(conn, actor_id, rkey_str).await?; if rows > 0 { // feeds_count maintained by application } } CollectionType::BskyFeedPostgate => { db::postgate_delete(conn, actor_id, rkey).await?; } CollectionType::BskyFeedThreadgate => { db::threadgate_delete(conn, actor_id, rkey).await?; } CollectionType::BskyList => { // Lists use arbitrary string rkeys - extract from at_uri let rkey_str = parakeet_db::utils::at_uri::extract_rkey(&at_uri) .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?; let rows = db::list_delete(conn, actor_id, rkey_str).await?; if rows > 0 { // lists_count maintained by application } } CollectionType::BskyListBlock => { db::list_block_delete(conn, actor_id, rkey).await?; } CollectionType::BskyListItem => { db::list_item_delete(conn, actor_id, rkey).await?; } CollectionType::BskyStarterPack => { let rows = db::starter_pack_delete(conn, actor_id, rkey).await?; if rows > 0 { // starterpacks_count maintained by application } } CollectionType::BskyVerification => { db::verification_delete(conn, actor_id, rkey).await?; } CollectionType::BskyLabelerService => { db::labeler_delete(conn, actor_id).await?; } CollectionType::BskyNotificationDeclaration => { db::notif_decl_delete(conn, actor_id).await?; } CollectionType::ChatActorDecl => { db::chat_decl_delete(conn, actor_id).await?; } CollectionType::CommunityLexiconBookmark => { db::bookmark_delete(conn, rkey, actor_id).await?; } _ => { // Unsupported collection types - no-op } } // Note: No generic records table to delete from anymore Ok(()) } DatabaseOperation::MaintainSelfLabels { actor_id, cid, at_uri, labels, } => { // Query DID from actor_id (needed by db function) let did = db::actor_did_from_id(conn, actor_id).await?; db::maintain_self_labels(conn, &did, cid, &at_uri, labels).await?; Ok(()) } DatabaseOperation::MaintainPostgateDetaches { post_uri, detached_uris, disable_effective, } => { // Use optimized cached version that: // - Skips no-ops (posts with no quotes) // - Uses direct database lookups for IDs // - Does single efficient UPDATE db::postgate_maintain_detaches_cached(conn, &post_uri, &detached_uris, disable_effective).await?; Ok(()) } } }