1//! Database operation executor
2//!
3//! This module contains the actual database execution logic for all DatabaseOperation variants.
4//!
5//! ## Architecture
6//!
7//! - `execute_operation()` - Single operation execution with aggregate delta generation
8//! - `describe_operation()` - Logging helper for operation names and URIs
9//!
10//! ## Aggregate Deltas
11//!
12//! Counts are maintained automatically by database triggers.
13//! Operations return PostStatsDeltas for compatibility.
14
15use super::DatabaseOperation;
16use ipld_core::cid::Cid;
17
18pub fn describe_operation(op: &DatabaseOperation) -> (String, Option<String>) {
19 match op {
20 DatabaseOperation::InsertPost { rkey, actor_id, .. } => {
21 ("InsertPost".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
22 }
23 DatabaseOperation::InsertLike { rkey, actor_id, .. } => (
24 "InsertLike".to_string(),
25 Some(format!("actor_id:{}/app.bsky.feed.like/{}", actor_id, rkey)),
26 ),
27 DatabaseOperation::InsertRepost { rkey, actor_id, .. } => (
28 "InsertRepost".to_string(),
29 Some(format!("actor_id:{}/app.bsky.feed.repost/{}", actor_id, rkey)),
30 ),
31 DatabaseOperation::InsertFollow { rkey, actor_id, .. } => (
32 "InsertFollow".to_string(),
33 Some(format!("actor_id:{}/app.bsky.graph.follow/{}", actor_id, rkey)),
34 ),
35 DatabaseOperation::InsertBlock { rkey, actor_id, .. } => (
36 "InsertBlock".to_string(),
37 Some(format!("actor_id:{}/app.bsky.graph.block/{}", actor_id, rkey)),
38 ),
39 DatabaseOperation::InsertNotification { author_actor_id, record_type, record_rkey, .. } => {
40 ("InsertNotification".to_string(), Some(format!("actor_id:{}/{}/{}", author_actor_id, record_type, record_rkey)))
41 }
42 DatabaseOperation::NotifyReplyChain { reply_uri, .. } => {
43 ("NotifyReplyChain".to_string(), Some(reply_uri.clone()))
44 }
45 DatabaseOperation::UpsertActor { did, .. } => {
46 ("UpsertActor".to_string(), Some(did.clone()))
47 }
48 DatabaseOperation::UpsertProfile { actor_id, .. } => {
49 ("UpsertProfile".to_string(), Some(format!("actor_id:{}", actor_id)))
50 }
51 DatabaseOperation::UpsertStatus { actor_id, .. } => {
52 ("UpsertStatus".to_string(), Some(format!("actor_id:{}", actor_id)))
53 }
54 DatabaseOperation::UpsertPostgate { rkey, actor_id, .. } => {
55 ("UpsertPostgate".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
56 }
57 DatabaseOperation::UpsertThreadgate { rkey, actor_id, .. } => {
58 ("UpsertThreadgate".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
59 }
60 DatabaseOperation::UpsertList { rkey, actor_id, .. } => {
61 ("UpsertList".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
62 }
63 DatabaseOperation::InsertListItem { rkey, actor_id, .. } => {
64 ("InsertListItem".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
65 }
66 DatabaseOperation::InsertListBlock { rkey, actor_id, .. } => {
67 ("InsertListBlock".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
68 }
69 DatabaseOperation::UpsertFeedGenerator { rkey, actor_id, .. } => {
70 ("UpsertFeedGenerator".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
71 }
72 DatabaseOperation::UpsertStarterPack { rkey, actor_id, .. } => {
73 ("UpsertStarterPack".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
74 }
75 DatabaseOperation::InsertVerification { rkey, actor_id, .. } => {
76 ("InsertVerification".to_string(), Some(format!("actor_id:{}:rkey:{}", actor_id, rkey)))
77 }
78 DatabaseOperation::UpsertLabeler { actor_id, .. } => {
79 ("UpsertLabeler".to_string(), Some(format!("actor_id:{}", actor_id)))
80 }
81 DatabaseOperation::UpsertNotificationDeclaration { actor_id, .. } => (
82 "UpsertNotificationDeclaration".to_string(),
83 Some(format!("actor_id:{}", actor_id)),
84 ),
85 DatabaseOperation::UpsertChatDeclaration { actor_id, .. } => {
86 ("UpsertChatDeclaration".to_string(), Some(format!("actor_id:{}", actor_id)))
87 }
88 DatabaseOperation::UpsertBookmark { rkey, actor_id, .. } => (
89 "UpsertBookmark".to_string(),
90 Some(format!(
91 "actor_id:{}/community.lexicon.bookmarks.bookmark/{}",
92 actor_id, rkey
93 )),
94 ),
95 DatabaseOperation::DeleteRecord {
96 at_uri, collection, ..
97 } => (
98 format!("DeleteRecord({:?})", collection),
99 Some(at_uri.clone()),
100 ),
101 DatabaseOperation::MaintainSelfLabels { at_uri, .. } => {
102 ("MaintainSelfLabels".to_string(), Some(at_uri.clone()))
103 }
104 DatabaseOperation::MaintainPostgateDetaches { post_uri, .. } => (
105 "MaintainPostgateDetaches".to_string(),
106 Some(post_uri.clone()),
107 ),
108 }
109}
110
111/// Execute a single database operation
112///
113/// This function will be called by the database writer for each operation.
114///
115/// Counts and cache invalidations are maintained by database triggers.
116/// Execute database operations
117pub async fn execute_operation(
118 pool: &deadpool_postgres::Pool,
119 conn: &mut deadpool_postgres::Object,
120 op: DatabaseOperation,
121) -> eyre::Result<()> {
122 use crate::db;
123
124 match op {
125 DatabaseOperation::InsertPost {
126 rkey,
127 actor_id,
128 cid,
129 record,
130 source,
131 } => {
132 // Extract parent URI and embed URI before record is consumed
133 let parent_uri = record.reply.as_ref().map(|reply| reply.parent.uri.clone());
134
135 let embed_uri = record.embed.as_ref().and_then(|embed_union| {
136 embed_union.as_bsky().and_then(|embed| match embed {
137 crate::types::records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()),
138 crate::types::records::AppBskyEmbed::RecordWithMedia(r) => {
139 Some(r.record.uri.clone())
140 }
141 _ => None,
142 })
143 });
144
145 let start = std::time::Instant::now();
146 let rows =
147 db::post_insert(conn, actor_id, rkey, cid, record, source).await?;
148 let elapsed_ms = start.elapsed().as_millis();
149
150 // Log slow post inserts (>1s)
151 if elapsed_ms > 1000 {
152 tracing::warn!(
153 actor_id = actor_id,
154 rkey = %rkey,
155 elapsed_ms = elapsed_ms,
156 source = ?source,
157 "Slow post insert detected"
158 );
159 } else if elapsed_ms > 500 {
160 tracing::info!(
161 actor_id = actor_id,
162 rkey = %rkey,
163 elapsed_ms = elapsed_ms,
164 source = ?source,
165 "Post insert timing"
166 );
167 }
168 if rows > 0 {
169 // If this is a reply, increment parent post's reply count
170 if let Some(parent_uri) = parent_uri.as_ref() {
171 // Resolve parent URI to natural key
172 if let Ok(Some((parent_actor_id, parent_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, parent_uri).await {
173 // Append to parent's reply arrays (array-only tracking)
174 // Arrays are correlated (actor_id, rkey) pairs sorted by rkey for better columnstore compression
175 let _ = conn.execute(
176 "UPDATE posts
177 SET reply_actor_ids = (SELECT array_agg(actor_id ORDER BY rkey) FROM unnest(COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)),
178 reply_rkeys = (SELECT array_agg(rkey ORDER BY rkey) FROM unnest(COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey))
179 WHERE (actor_id, rkey) = ($1, $2)
180 AND NOT ($3 = ANY(COALESCE(reply_actor_ids, ARRAY[])))",
181 &[&parent_actor_id, &parent_rkey, &actor_id, &rkey],
182 ).await;
183 // Database trigger handles cache invalidation
184 }
185 }
186
187 // If this quotes/embeds a post, append to that post's quote arrays
188 if let Some(quoted_uri) = embed_uri.as_ref() {
189 // Resolve quoted URI to natural key
190 if let Ok(Some((quoted_actor_id, quoted_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, quoted_uri).await {
191 // Append to quoted post's quote arrays (array-only tracking)
192 // Arrays are correlated (actor_id, rkey) pairs sorted by rkey for better columnstore compression
193 let _ = conn.execute(
194 "UPDATE posts
195 SET quote_actor_ids = (SELECT array_agg(actor_id ORDER BY rkey) FROM unnest(COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey)),
196 quote_rkeys = (SELECT array_agg(rkey ORDER BY rkey) FROM unnest(COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3, COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4) AS t(actor_id, rkey))
197 WHERE (actor_id, rkey) = ($1, $2)
198 AND NOT ($3 = ANY(COALESCE(quote_actor_ids, ARRAY[])))",
199 &["ed_actor_id, "ed_rkey, &actor_id, &rkey],
200 ).await;
201 // Database trigger handles cache invalidation
202 }
203 }
204
205 Ok(())
206 } else {
207 Ok(())
208 }
209 }
210 DatabaseOperation::InsertLike {
211 rkey,
212 actor_id,
213 record,
214 via_repost_key,
215 source,
216 } => {
217
218 // Extract URI before record is moved
219 let subject_uri = record.subject.uri.clone();
220
221 let start = std::time::Instant::now();
222 let rows = db::like_insert(conn, rkey, actor_id, record, via_repost_key, source).await?;
223 let elapsed_ms = start.elapsed().as_millis();
224
225 // Log slow like inserts
226 if elapsed_ms > 1000 {
227 tracing::warn!(
228 subject_uri = %subject_uri,
229 elapsed_ms = elapsed_ms,
230 "Slow like insert detected"
231 );
232 }
233
234 if rows > 0 {
235 // Note: like arrays are updated directly in db::like_insert via LikeArrayOp
236 // No deltas needed with array-only tracking
237 // Database trigger handles cache invalidation
238
239 Ok(())
240 } else {
241 Ok(())
242 }
243 }
244 DatabaseOperation::InsertRepost {
245 rkey,
246 actor_id,
247 cid,
248 record,
249 via_repost_key,
250 source,
251 } => {
252
253 // Extract URI before record is moved
254 let subject_uri = record.subject.uri.clone();
255
256 let start = std::time::Instant::now();
257 let rows = db::repost_insert(conn, rkey, actor_id, cid, record, via_repost_key, source).await?;
258 let elapsed_ms = start.elapsed().as_millis();
259
260 // Log slow repost inserts
261 if elapsed_ms > 1000 {
262 tracing::warn!(
263 subject_uri = %subject_uri,
264 elapsed_ms = elapsed_ms,
265 "Slow repost insert detected"
266 );
267 }
268
269 if rows > 0 {
270 // Note: repost arrays will be updated in db::repost_insert (dual-write to reposts table + post arrays)
271 // No deltas needed with array-only tracking
272
273 // Database trigger handles cache invalidation
274
275 Ok(())
276 } else {
277 Ok(())
278 }
279 }
280 DatabaseOperation::InsertFollow {
281 rkey,
282 actor_id,
283 subject_actor_id,
284 cid,
285 record,
286 } => {
287 let rows = db::follow_insert(conn, rkey, actor_id, subject_actor_id, cid, record).await?;
288 if rows > 0 {
289 // Counts maintained by triggers
290 Ok(())
291 } else {
292 Ok(())
293 }
294 }
295 DatabaseOperation::InsertBlock {
296 rkey,
297 actor_id,
298 subject_actor_id,
299 cid,
300 record,
301 } => {
302 db::block_insert(conn, rkey, actor_id, subject_actor_id, cid, record).await?;
303 Ok(()) // Blocks don't affect aggregates (yet)
304 }
305 DatabaseOperation::InsertNotification {
306 recipient_actor_id,
307 author_actor_id,
308 record_type,
309 record_rkey,
310 record_cid,
311 reason,
312 subject_actor_id,
313 subject_record_type,
314 subject_rkey,
315 created_at,
316 } => {
317 // Skip self-notifications (author notifying themselves)
318 if recipient_actor_id == author_actor_id {
319 return Ok(());
320 }
321
322
323 // Check if thread is muted (for reply, quote, and mention notifications)
324 // Only posts can be thread roots, so only check for post subjects
325 if let (Some(subj_actor_id), Some("post"), Some(subj_rkey)) =
326 (subject_actor_id, subject_record_type.as_deref(), subject_rkey.as_ref())
327 {
328 if db::is_thread_muted(conn, recipient_actor_id, subj_actor_id, *subj_rkey).await? {
329 tracing::debug!(
330 "Skipping notification for muted thread: recipient_actor_id={}, root_post_actor_id={}, root_post_rkey={}",
331 recipient_actor_id,
332 subj_actor_id,
333 subj_rkey
334 );
335 return Ok(());
336 }
337 }
338
339 // Add to PostgreSQL
340 if let Err(e) = crate::external::pg_notifications::add_notification(
341 pool,
342 recipient_actor_id,
343 author_actor_id,
344 &record_type,
345 record_rkey, // i64 (Copy type)
346 &record_cid, // &[u8]
347 &reason,
348 subject_actor_id,
349 subject_record_type.as_deref(),
350 subject_rkey, // Option<i64> (Copy type)
351 created_at,
352 )
353 .await
354 {
355 tracing::warn!(
356 "Failed to add notification to PostgreSQL: recipient_actor_id={}, record={}/{}, error={}",
357 recipient_actor_id,
358 record_type,
359 record_rkey,
360 e
361 );
362 metrics::counter!("batch_writer_notif_pg_error").increment(1);
363 } else {
364 tracing::debug!(
365 "Created notification in PostgreSQL: author_actor_id={} -> recipient_actor_id={} (reason: {})",
366 author_actor_id,
367 recipient_actor_id,
368 reason
369 );
370 }
371
372 Ok(()) // Notifications don't affect aggregates
373 }
374 DatabaseOperation::NotifyReplyChain {
375 reply_uri,
376 author_actor_id,
377 cid,
378 created_at,
379 parent_uri,
380 root_uri,
381 max_levels,
382 already_notified_actor_ids,
383 } => {
384 let chain_start = std::time::Instant::now();
385
386 // Track actor IDs we've already notified to avoid duplicates
387 let mut notified_actor_ids: std::collections::HashSet<i32> =
388 already_notified_actor_ids.into_iter().collect();
389
390 // The reasonSubject is the root URI (or parent if no root)
391 let reason_subject = root_uri.unwrap_or_else(|| parent_uri.clone());
392
393 // Walk up the reply chain, creating notifications for ancestors
394 let mut current_uri = parent_uri;
395 let mut level = 0;
396
397 while level < max_levels {
398 // Query the database to get the parent post author and its parent URI
399 let result = db::get_reply_chain_parent(conn, ¤t_uri).await?;
400
401 let Some((ancestor_actor_id, ancestor_parent_uri)) = result else {
402 // Post not found - stop walking
403 tracing::debug!(
404 "Reply chain walk stopped at level {}: post {} not found",
405 level,
406 current_uri
407 );
408 break;
409 };
410
411 // Skip if this is the reply author themselves
412 if ancestor_actor_id == author_actor_id {
413 tracing::debug!(
414 "Skipping reply-chain notification at level {}: ancestor is the reply author",
415 level
416 );
417 } else if notified_actor_ids.contains(&ancestor_actor_id) {
418 // Already notified (either as direct parent or root)
419 tracing::debug!(
420 "Skipping reply-chain notification at level {}: actor_id={} already notified",
421 level,
422 ancestor_actor_id
423 );
424 } else {
425 // Parse reason_subject URI early for thread mute check and notification
426 // Format: at://did:plc:xyz/app.bsky.feed.post/rkey
427 let subject_rkey_str = reason_subject.split('/').next_back().unwrap_or("");
428 let subject_rkey_i64 = match parakeet_db::utils::tid::decode_tid(subject_rkey_str) {
429 Ok(rkey) => rkey,
430 Err(e) => {
431 tracing::warn!("Invalid subject TID in reason_subject: {} - {}", subject_rkey_str, e);
432 level += 1;
433 if let Some(parent) = ancestor_parent_uri {
434 current_uri = parent;
435 continue;
436 } else {
437 break;
438 }
439 }
440 };
441
442 // Get subject_actor_id from reason_subject DID
443 let subject_actor_id = if let Some(subject_did) = reason_subject
444 .strip_prefix("at://")
445 .and_then(|s| s.split('/').next())
446 {
447 match db::actor_id_from_did(conn, subject_did).await? {
448 Some(id) => id,
449 None => {
450 tracing::debug!(
451 "Skipping reply-chain notification at level {}: subject DID not found: {}",
452 level,
453 subject_did
454 );
455 level += 1;
456 if let Some(parent) = ancestor_parent_uri {
457 current_uri = parent;
458 continue;
459 } else {
460 break;
461 }
462 }
463 }
464 } else {
465 tracing::warn!("Invalid reason_subject URI format: {}", reason_subject);
466 level += 1;
467 if let Some(parent) = ancestor_parent_uri {
468 current_uri = parent;
469 continue;
470 } else {
471 break;
472 }
473 };
474
475 // Check if thread is muted for this ancestor
476 if db::is_thread_muted(conn, ancestor_actor_id, subject_actor_id, subject_rkey_i64).await? {
477 // Check if ancestor has muted this thread using actor_ids and i64 rkey
478 tracing::debug!(
479 "Skipping reply-chain notification at level {}: actor_id={} muted thread (root_post_actor_id={}, rkey={})",
480 level,
481 ancestor_actor_id,
482 subject_actor_id,
483 subject_rkey_i64
484 );
485 } else {
486 // Create PostgreSQL notification for this ancestor
487 // Extract rkey from reply_uri (at://did/app.bsky.feed.post/rkey)
488 let reply_rkey_str = reply_uri.split('/').next_back().unwrap_or("");
489 let reply_rkey_i64 = match parakeet_db::utils::tid::decode_tid(reply_rkey_str) {
490 Ok(rkey) => rkey,
491 Err(e) => {
492 tracing::warn!("Invalid reply TID: {} - {}", reply_rkey_str, e);
493 level += 1;
494 if let Some(parent) = ancestor_parent_uri {
495 current_uri = parent;
496 continue;
497 } else {
498 break;
499 }
500 }
501 };
502
503 // Parse CID string and extract digest
504 let cid_digest = match Cid::try_from(cid.as_str()) {
505 Ok(cid_obj) => {
506 let cid_bytes = cid_obj.to_bytes();
507 match parakeet_db::utils::cid::cid_to_digest_owned(&cid_bytes) {
508 Some(digest) => digest,
509 None => {
510 tracing::warn!("Invalid CID digest for CID: {}", cid);
511 level += 1;
512 if let Some(parent) = ancestor_parent_uri {
513 current_uri = parent;
514 continue;
515 } else {
516 break;
517 }
518 }
519 }
520 }
521 Err(e) => {
522 tracing::warn!("Failed to parse CID: {} - {}", cid, e);
523 level += 1;
524 if let Some(parent) = ancestor_parent_uri {
525 current_uri = parent;
526 continue;
527 } else {
528 break;
529 }
530 }
531 };
532
533 if let Err(e) = crate::external::pg_notifications::add_notification(
534 pool,
535 ancestor_actor_id,
536 author_actor_id,
537 "post", // record_type (the reply is a post)
538 reply_rkey_i64, // record_rkey (i64)
539 &cid_digest, // record_cid (32-byte digest)
540 "reply", // reason
541 Some(subject_actor_id), // subject_actor_id (root/parent post author)
542 Some("post"), // subject_record_type
543 Some(subject_rkey_i64), // subject_rkey (Option<i64>)
544 created_at,
545 )
546 .await
547 {
548 tracing::warn!(
549 "Failed to add reply-chain notification to PostgreSQL: recipient_actor_id={}, record_rkey={}, error={}",
550 ancestor_actor_id,
551 reply_rkey_i64,
552 e
553 );
554 } else {
555 tracing::debug!(
556 "Created reply-chain notification at level {}: author_actor_id={} -> recipient_actor_id={}",
557 level,
558 author_actor_id,
559 ancestor_actor_id
560 );
561 notified_actor_ids.insert(ancestor_actor_id);
562 }
563 }
564 }
565
566 // Move to the next ancestor
567 match ancestor_parent_uri {
568 Some(parent) => {
569 current_uri = parent;
570 level += 1;
571 }
572 None => {
573 // Reached the root of the thread
574 tracing::debug!("Reply chain walk reached root at level {}", level);
575 break;
576 }
577 }
578 }
579
580 let chain_elapsed_ms = chain_start.elapsed().as_millis();
581 if chain_elapsed_ms > 1000 {
582 tracing::warn!(
583 reply_uri = %reply_uri,
584 elapsed_ms = chain_elapsed_ms,
585 levels_walked = level,
586 "Slow NotifyReplyChain detected"
587 );
588 }
589
590 Ok(()) // Notifications don't affect aggregates
591 }
592 DatabaseOperation::UpsertActor {
593 did,
594 status,
595 sync_state,
596 handle,
597 account_created_at,
598 timestamp,
599 } => {
600 db::actor_upsert(
601 conn,
602 &did,
603 status.as_ref(),
604 &sync_state,
605 handle.as_deref(),
606 account_created_at.as_ref(),
607 timestamp,
608 )
609 .await?;
610 Ok(())
611 }
612 DatabaseOperation::UpsertProfile { actor_id, cid, record } => {
613 // Query DID from actor_id (needed by db functions)
614 let did = db::actor_did_from_id(conn, actor_id).await?;
615
616 db::profile_upsert(conn, actor_id, &did, cid, record).await?;
617 // Note: Handle resolution is now managed by Tap, not enqueued separately
618 Ok(())
619 }
620 DatabaseOperation::UpsertStatus { actor_id, cid, record } => {
621 // Query DID from actor_id (needed by db functions)
622 let did = db::actor_did_from_id(conn, actor_id).await?;
623
624 db::status_upsert(conn, actor_id, &did, cid, record).await?;
625 Ok(())
626 }
627 DatabaseOperation::UpsertPostgate {
628 rkey,
629 actor_id,
630 cid,
631 record,
632 } => {
633 db::postgate_upsert(conn, actor_id, rkey, cid, &record).await?;
634 Ok(())
635 }
636 DatabaseOperation::UpsertThreadgate {
637 rkey,
638 actor_id,
639 cid,
640 record,
641 } => {
642 db::threadgate_upsert(conn, actor_id, rkey, cid, record).await?;
643 Ok(())
644 }
645 DatabaseOperation::UpsertList {
646 rkey,
647 actor_id,
648 cid,
649 record,
650 } => {
651 // Lists use arbitrary string rkeys (not TID-based like posts)
652 let inserted = db::list_upsert(conn, actor_id, &rkey, cid, record).await?;
653 if inserted {
654 // lists_count maintained by application
655 Ok(())
656 } else {
657 Ok(())
658 }
659 }
660 DatabaseOperation::InsertListItem {
661 rkey,
662 actor_id,
663 subject_actor_id,
664 cid,
665 record,
666 } => {
667 db::list_item_insert(conn, actor_id, rkey, subject_actor_id, cid, record).await?;
668 Ok(())
669 }
670 DatabaseOperation::InsertListBlock {
671 rkey,
672 actor_id,
673 cid,
674 record,
675 } => {
676 db::list_block_insert(conn, actor_id, rkey, cid, record).await?;
677 Ok(())
678 }
679 DatabaseOperation::UpsertFeedGenerator {
680 rkey,
681 actor_id,
682 cid,
683 service_actor_id,
684 record,
685 } => {
686 // Service actor ID already resolved during reference extraction
687 let inserted = db::feedgen_upsert(conn, actor_id, &rkey, cid, service_actor_id, record).await?;
688 if inserted {
689 // feeds_count maintained by application
690 Ok(())
691 } else {
692 Ok(())
693 }
694 }
695 DatabaseOperation::UpsertStarterPack {
696 rkey,
697 actor_id,
698 cid,
699 record,
700 } => {
701 let inserted = db::starter_pack_upsert(conn, actor_id, rkey, cid, record).await?;
702 if inserted {
703 // starterpacks_count maintained by application
704 Ok(())
705 } else {
706 Ok(())
707 }
708 }
709 DatabaseOperation::InsertVerification {
710 rkey,
711 actor_id,
712 subject_actor_id,
713 cid,
714 record,
715 } => {
716 db::verification_insert(conn, actor_id, rkey, subject_actor_id, cid, record).await?;
717 Ok(())
718 }
719 DatabaseOperation::UpsertLabeler { actor_id, cid, record } => {
720 // Query DID from actor_id (needed by db functions)
721 let did = db::actor_did_from_id(conn, actor_id).await?;
722
723 db::labeler_upsert(conn, actor_id, &did, cid, record).await?;
724 Ok(())
725 }
726 DatabaseOperation::UpsertNotificationDeclaration { actor_id, record } => {
727 db::notif_decl_upsert(conn, actor_id, record).await?;
728 Ok(())
729 }
730 DatabaseOperation::UpsertChatDeclaration { actor_id, record } => {
731 db::chat_decl_upsert(conn, actor_id, record).await?;
732 Ok(())
733 }
734 DatabaseOperation::UpsertBookmark { actor_id, rkey, record } => {
735 db::bookmark_upsert(conn, actor_id, rkey, record).await?;
736 Ok(())
737 }
738 DatabaseOperation::DeleteRecord {
739 actor_id,
740 collection,
741 rkey,
742 at_uri,
743 } => {
744 use crate::relay::types::CollectionType;
745
746 // actor_id is already resolved in the resolution phase
747 // No database lookup needed here!
748
749 // Execute type-specific delete operation and collect deltas
750 match collection {
751 CollectionType::BskyFeedLike => {
752 // Returns subject natural key if deleted
753 if let Some(subject) = db::like_delete(conn, rkey, actor_id).await? {
754 match subject {
755 db::LikeSubject::Post { actor_id: post_actor_id, rkey: post_rkey } => {
756 // Note: like arrays are updated directly in db::like_delete via array_remove()
757 // No deltas needed with array-only tracking
758
759 // Invalidate post cache
760 if let Ok(post_did) = db::actor_did_from_id(conn, post_actor_id).await {
761 let _post_uri = format!("at://{}/app.bsky.feed.post/{}",
762 post_did,
763 parakeet_db::models::i64_to_tid(post_rkey));
764 // Database trigger handles cache invalidation
765
766 // Remove PostgreSQL notification (recipient = author of liked post)
767 if let Err(e) =
768 crate::external::pg_notifications::remove_notification(
769 pool,
770 actor_id, // author_actor_id (liker)
771 "like", // record_type
772 rkey, // record_rkey (i64)
773 )
774 .await
775 {
776 tracing::warn!(
777 "Failed to remove like notification from PostgreSQL: actor_id={}, rkey={}, error={}",
778 actor_id,
779 rkey,
780 e
781 );
782 }
783 }
784 }
785 db::LikeSubject::Feedgen { actor_id: _feedgen_actor_id, rkey: _feedgen_rkey } => {
786 // Database trigger handles cache invalidation for feedgen likes
787 }
788 db::LikeSubject::Labeler { actor_id: _labeler_actor_id } => {
789 // Database trigger handles cache invalidation for labeler likes
790 }
791 }
792 }
793 }
794 CollectionType::BskyFeedRepost => {
795 // Returns post URI if deleted
796 if let Some(_post_uri) = db::repost_delete(conn, rkey, actor_id).await? {
797 // Note: repost arrays will be updated in db::repost_delete (remove from post arrays)
798 // No deltas needed with array-only tracking
799
800 // Invalidate post cache since repost array changed
801 // Database trigger handles cache invalidation
802
803 // Remove PostgreSQL notification (recipient = author of reposted post)
804 if let Err(e) =
805 crate::external::pg_notifications::remove_notification(
806 pool,
807 actor_id, // author_actor_id (reposter)
808 "repost", // record_type
809 rkey, // record_rkey (i64)
810 )
811 .await
812 {
813 tracing::warn!(
814 "Failed to remove repost notification from PostgreSQL: actor_id={}, rkey={}, error={}",
815 actor_id,
816 rkey,
817 e
818 );
819 }
820
821 // Invalidate author feed cache when user deletes a repost
822 // Database trigger handles cache invalidation
823 }
824 }
825 CollectionType::BskyFollow => {
826 // Returns subject (target DID) if deleted
827 if let Some(_target_did) = db::follow_delete(conn, rkey, actor_id).await? {
828 // Counts maintained by triggers
829
830 // Remove PostgreSQL notification (recipient = followed user)
831 if let Err(e) =
832 crate::external::pg_notifications::remove_notification(
833 pool,
834 actor_id, // author_actor_id (follower)
835 "follow", // record_type
836 rkey, // record_rkey (i64)
837 )
838 .await
839 {
840 tracing::warn!(
841 "Failed to remove follow notification from PostgreSQL: actor_id={}, rkey={}, error={}",
842 actor_id,
843 rkey,
844 e
845 );
846 }
847
848 // Invalidate timeline cache when user unfollows someone
849 // Database trigger handles cache invalidation
850 }
851 }
852 CollectionType::BskyFeedPost => {
853 // Returns (parent natural key, embedded post natural key) if deleted
854 if let Some((parent_key, embedded_key)) =
855 db::post_delete(conn, actor_id, rkey).await?
856 {
857 // posts_count maintained by triggers
858
859 // Remove from parent's reply arrays if this was a reply
860 if let Some((parent_actor_id, parent_rkey)) = parent_key {
861 // Remove from reply arrays using array_remove()
862 let _ = conn.execute(
863 "UPDATE posts
864 SET reply_actor_ids = array_remove(reply_actor_ids, $3),
865 reply_rkeys = array_remove(reply_rkeys, $4)
866 WHERE (actor_id, rkey) = ($1, $2)",
867 &[&parent_actor_id, &parent_rkey, &actor_id, &rkey],
868 ).await;
869
870 // Invalidate parent post cache since reply array changed
871 // Construct URI using actor cache
872 if let Ok(parent_did) = db::actor_did_from_id(conn, parent_actor_id).await {
873 let _parent_uri = format!("at://{}/app.bsky.feed.post/{}",
874 parent_did,
875 parakeet_db::models::i64_to_tid(parent_rkey));
876 // Database trigger handles cache invalidation
877 }
878 }
879
880 // Remove from embedded post's quote arrays if this quoted a record
881 if let Some((embed_actor_id, embed_rkey)) = embedded_key {
882 // Remove from quote arrays using array_remove()
883 let _ = conn.execute(
884 "UPDATE posts
885 SET quote_actor_ids = array_remove(quote_actor_ids, $3),
886 quote_rkeys = array_remove(quote_rkeys, $4)
887 WHERE (actor_id, rkey) = ($1, $2)",
888 &[&embed_actor_id, &embed_rkey, &actor_id, &rkey],
889 ).await;
890
891 // Invalidate quoted post cache since quote array changed
892 // Construct URI using actor cache
893 if let Ok(embed_did) = db::actor_did_from_id(conn, embed_actor_id).await {
894 let _embed_uri = format!("at://{}/app.bsky.feed.post/{}",
895 embed_did,
896 parakeet_db::models::i64_to_tid(embed_rkey));
897 // Database trigger handles cache invalidation
898 }
899 }
900
901 // Remove PostgreSQL notification for this post
902 // Posts can create multiple notifications (reply, quote, mention)
903 // PostgreSQL uses (author_actor_id, record_type, record_rkey) as unique key
904 if let Err(e) =
905 crate::external::pg_notifications::remove_notification(
906 pool,
907 actor_id, // author_actor_id (post author)
908 "post", // record_type
909 rkey, // record_rkey (i64)
910 )
911 .await
912 {
913 tracing::warn!(
914 "Failed to remove post notification from PostgreSQL: actor_id={}, rkey={}, error={}",
915 actor_id,
916 rkey,
917 e
918 );
919 }
920
921 // Invalidate author feed cache when user deletes a post
922 // Database trigger handles cache invalidation
923 }
924 }
925 // Other delete operations that don't affect aggregates
926 CollectionType::BskyProfile => {
927 db::profile_delete(conn, actor_id).await?;
928 // Invalidate profile cache when user deletes profile record
929 // Database trigger handles cache invalidation
930 }
931 CollectionType::BskyStatus => {
932 db::status_delete(conn, actor_id).await?;
933 // Invalidate profile cache when user deletes custom status
934 // Database trigger handles cache invalidation
935 }
936 CollectionType::BskyBlock => {
937 db::block_delete(conn, rkey, actor_id).await?;
938 // Invalidate timeline cache when user unblocks someone
939 // Database trigger handles cache invalidation
940 }
941 CollectionType::BskyFeedGen => {
942 // Feedgens use arbitrary string rkeys - extract from at_uri
943 let rkey_str = parakeet_db::utils::at_uri::extract_rkey(&at_uri)
944 .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?;
945 let rows = db::feedgen_delete(conn, actor_id, rkey_str).await?;
946 if rows > 0 {
947 // feeds_count maintained by application
948 }
949 }
950 CollectionType::BskyFeedPostgate => {
951 db::postgate_delete(conn, actor_id, rkey).await?;
952 }
953 CollectionType::BskyFeedThreadgate => {
954 db::threadgate_delete(conn, actor_id, rkey).await?;
955 }
956 CollectionType::BskyList => {
957 // Lists use arbitrary string rkeys - extract from at_uri
958 let rkey_str = parakeet_db::utils::at_uri::extract_rkey(&at_uri)
959 .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?;
960
961 let rows = db::list_delete(conn, actor_id, rkey_str).await?;
962 if rows > 0 {
963 // lists_count maintained by application
964 }
965 }
966 CollectionType::BskyListBlock => {
967 db::list_block_delete(conn, actor_id, rkey).await?;
968 }
969 CollectionType::BskyListItem => {
970 db::list_item_delete(conn, actor_id, rkey).await?;
971 }
972 CollectionType::BskyStarterPack => {
973 let rows = db::starter_pack_delete(conn, actor_id, rkey).await?;
974 if rows > 0 {
975 // starterpacks_count maintained by application
976 }
977 }
978 CollectionType::BskyVerification => {
979 db::verification_delete(conn, actor_id, rkey).await?;
980 }
981 CollectionType::BskyLabelerService => {
982 db::labeler_delete(conn, actor_id).await?;
983 }
984 CollectionType::BskyNotificationDeclaration => {
985 db::notif_decl_delete(conn, actor_id).await?;
986 }
987 CollectionType::ChatActorDecl => {
988 db::chat_decl_delete(conn, actor_id).await?;
989 }
990 CollectionType::CommunityLexiconBookmark => {
991 db::bookmark_delete(conn, rkey, actor_id).await?;
992 }
993 _ => {
994 // Unsupported collection types - no-op
995 }
996 }
997
998 // Note: No generic records table to delete from anymore
999
1000 Ok(())
1001 }
1002 DatabaseOperation::MaintainSelfLabels {
1003 actor_id,
1004 cid,
1005 at_uri,
1006 labels,
1007 } => {
1008 // Query DID from actor_id (needed by db function)
1009 let did = db::actor_did_from_id(conn, actor_id).await?;
1010
1011 db::maintain_self_labels(conn, &did, cid, &at_uri, labels).await?;
1012 Ok(())
1013 }
1014 DatabaseOperation::MaintainPostgateDetaches {
1015 post_uri,
1016 detached_uris,
1017 disable_effective,
1018 } => {
1019 // Use optimized cached version that:
1020 // - Skips no-ops (posts with no quotes)
1021 // - Uses direct database lookups for IDs
1022 // - Does single efficient UPDATE
1023 db::postgate_maintain_detaches_cached(conn, &post_uri, &detached_uris, disable_effective).await?;
1024 Ok(())
1025 }
1026 }
1027}