1//! Bulk record processing for COPY operations
2//!
3//! This module processes UnresolvedBulk events into ResolvedBulk events by:
4//! 1. Grouping records by type (likes, follows, posts, etc.)
5//! 2. Bulk resolving all foreign keys in single queries
6//! 3. Converting to COPY-ready data structures
7//!
//! Records that can't be bulk processed (profiles, gates, etc.)
//! are converted to individual DatabaseOperations instead.
10
11use super::bulk_types::{
12 BlockCopyData, BulkOperations, FeedgenLikeCopyData, FollowCopyData, LabelerLikeCopyData,
13 PostCopyData, PostLikeCopyData, RepostCopyData,
14 UnresolvedRecord,
15};
16use super::EventSource;
17use crate::db::{bulk_resolve, composite_builders};
18use crate::relay::types::RecordTypes;
19use crate::types::records::{AppBskyEmbed, MediaEmbed};
20use crate::Result;
21use std::collections::HashMap;
22
/// Metadata for a like record extracted from UnresolvedRecord
///
/// Built by `process_bulk_records` once the record key has passed TID
/// validation and conversion, then consumed by `process_likes_bulk`.
struct LikeMetadata {
    // Record key converted from its TID string form by `parakeet_db::models::tid_to_i64`
    rkey: i64,
    // The like record itself; its `subject` (and optional `via`) are resolved in bulk
    record: crate::types::records::AppBskyFeedLike,
}
28
/// Processed like data separated by type
///
/// Returned by `process_likes_bulk`; `process_bulk_records` moves each vector
/// into the `BulkOperations` field of the same name.
struct ProcessedLikes {
    // Likes whose subject URI contains "/app.bsky.feed.post/"
    post_likes: Vec<PostLikeCopyData>,
    // Likes whose subject URI contains "/app.bsky.feed.generator/" and whose feedgen resolved
    feedgen_likes: Vec<FeedgenLikeCopyData>,
    // Likes whose subject URI contains "/app.bsky.labeler.service/" and whose labeler resolved
    labeler_likes: Vec<LabelerLikeCopyData>,
}
35
/// Metadata for a follow record extracted from UnresolvedRecord
///
/// Built by `process_bulk_records` once the record key has passed TID
/// validation and conversion, then consumed by `process_follows_bulk`.
struct FollowMetadata {
    // Record key converted from its TID string form by `parakeet_db::models::tid_to_i64`
    rkey: i64,
    // The follow record; `subject` is the followed DID, `created_at` is copied to the output row
    record: crate::types::records::AppBskyGraphFollow,
}
41
/// Metadata for a repost record extracted from UnresolvedRecord
///
/// Built by `process_bulk_records` once the record key has passed TID
/// validation and conversion, then consumed by `process_reposts_bulk`.
struct RepostMetadata {
    // Record key converted from its TID string form by `parakeet_db::models::tid_to_i64`
    rkey: i64,
    // CID taken from the UnresolvedRecord; its digest is stored on the repost row
    cid: ipld_core::cid::Cid,
    // The repost record; `subject` is the reposted post, `via` an optional intermediate repost
    record: crate::types::records::AppBskyFeedRepost,
}
48
/// Metadata for a block record extracted from UnresolvedRecord
///
/// Built by `process_bulk_records` once the record key has passed TID
/// validation and conversion, then consumed by `process_blocks_bulk`.
struct BlockMetadata {
    // Record key converted from its TID string form by `parakeet_db::models::tid_to_i64`
    rkey: i64,
    // The block record; `subject` is the blocked DID, `created_at` is copied to the output row
    record: crate::types::records::AppBskyGraphBlock,
}
54
/// Metadata for a post record extracted from UnresolvedRecord
///
/// Built by `process_bulk_records` once the record key has passed TID
/// validation and conversion, then consumed by `process_posts_bulk`.
struct PostMetadata {
    // Record key converted from its TID string form by `parakeet_db::models::tid_to_i64`
    rkey: i64,
    // CID taken from the UnresolvedRecord; its digest is stored on the post row
    cid: ipld_core::cid::Cid,
    // The post record (text, reply refs, embed, facets, langs, tags)
    record: crate::types::records::AppBskyFeedPost,
}
61
62/// Process a batch of unresolved records into bulk operations
63///
64/// This function:
65/// 1. Groups records by type
66/// 2. Resolves all foreign keys in bulk (single query per FK type)
67/// 3. Converts to COPY-ready structures
68/// 4. Returns BulkOperations + individual operations
69pub async fn process_bulk_records(
70 conn: &deadpool_postgres::Object,
71 repo: &str,
72 actor_id: i32,
73 records: Vec<UnresolvedRecord>,
74 _source: EventSource,
75) -> Result<BulkOperations> {
76 let start = std::time::Instant::now();
77 let total_records = records.len();
78
79 tracing::info!(
80 repo = %repo,
81 actor_id = actor_id,
82 records = total_records,
83 "Starting bulk record processing"
84 );
85
86 let mut bulk_ops = BulkOperations::new();
87
88 // Group records by type - extracting only metadata we need
89 let mut likes = Vec::new();
90 let mut follows = Vec::new();
91 let mut reposts = Vec::new();
92 let mut blocks = Vec::new();
93 let mut posts = Vec::new();
94 let mut individual_records = Vec::new();
95
96 for unresolved in records {
97 // Extract metadata we need before moving the record
98 let UnresolvedRecord { at_uri, rkey, cid, record } = unresolved;
99 let record = *record; // Unbox
100
101 match record {
102 RecordTypes::AppBskyFeedLike(like_record) => {
103 // Validate TID timestamp (matches behavior in individual handlers)
104 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
105 tracing::warn!(rkey = %rkey, error = %e, "Invalid like TID timestamp, skipping");
106 continue;
107 }
108 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
109 Ok(val) => val,
110 Err(e) => {
111 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert like TID to i64, skipping");
112 continue;
113 }
114 };
115 likes.push(LikeMetadata {
116 rkey: rkey_i64,
117 record: like_record,
118 });
119 }
120 RecordTypes::AppBskyGraphFollow(follow_record) => {
121 // Validate TID timestamp
122 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
123 tracing::warn!(rkey = %rkey, error = %e, "Invalid follow TID timestamp, skipping");
124 continue;
125 }
126 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
127 Ok(val) => val,
128 Err(e) => {
129 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert follow TID to i64, skipping");
130 continue;
131 }
132 };
133 follows.push(FollowMetadata {
134 rkey: rkey_i64,
135 record: follow_record,
136 });
137 }
138 RecordTypes::AppBskyFeedRepost(repost_record) => {
139 // Validate TID timestamp
140 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
141 tracing::warn!(rkey = %rkey, error = %e, "Invalid repost TID timestamp, skipping");
142 continue;
143 }
144 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
145 Ok(val) => val,
146 Err(e) => {
147 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert repost TID to i64, skipping");
148 continue;
149 }
150 };
151 reposts.push(RepostMetadata {
152 rkey: rkey_i64,
153 cid,
154 record: repost_record,
155 });
156 }
157 RecordTypes::AppBskyGraphBlock(block_record) => {
158 // Validate TID timestamp
159 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
160 tracing::warn!(rkey = %rkey, error = %e, "Invalid block TID timestamp, skipping");
161 continue;
162 }
163 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
164 Ok(val) => val,
165 Err(e) => {
166 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert block TID to i64, skipping");
167 continue;
168 }
169 };
170 blocks.push(BlockMetadata {
171 rkey: rkey_i64,
172 record: block_record,
173 });
174 }
175 RecordTypes::AppBskyFeedPost(post_record) => {
176 // Validate TID timestamp for posts
177 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
178 tracing::warn!(rkey = %rkey, error = %e, "Invalid post TID timestamp, skipping");
179 continue;
180 }
181 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
182 Ok(val) => val,
183 Err(e) => {
184 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert post TID to i64, skipping");
185 continue;
186 }
187 };
188 posts.push(PostMetadata {
189 rkey: rkey_i64,
190 cid,
191 record: post_record,
192 });
193 }
194 other => {
195 // Everything else (profiles, gates, lists, etc.) goes through individual path
196 // These may have non-TID rkeys (e.g., profiles use "self")
197 individual_records.push(UnresolvedRecord {
198 at_uri,
199 rkey,
200 cid,
201 record: Box::new(other),
202 });
203 }
204 }
205 }
206
207 tracing::debug!(
208 likes = likes.len(),
209 follows = follows.len(),
210 reposts = reposts.len(),
211 blocks = blocks.len(),
212 posts = posts.len(),
213 individual = individual_records.len(),
214 "Grouped records by type"
215 );
216
217 // Process likes in bulk
218 if !likes.is_empty() {
219 let processed_likes = process_likes_bulk(conn, actor_id, likes).await?;
220 bulk_ops.post_likes = processed_likes.post_likes;
221 bulk_ops.feedgen_likes = processed_likes.feedgen_likes;
222 bulk_ops.labeler_likes = processed_likes.labeler_likes;
223 }
224
225 // Process follows in bulk
226 if !follows.is_empty() {
227 let follow_data = process_follows_bulk(conn, actor_id, follows).await?;
228 bulk_ops.follows = follow_data;
229 }
230
231 // Process reposts in bulk
232 if !reposts.is_empty() {
233 let repost_data = process_reposts_bulk(conn, actor_id, reposts).await?;
234 bulk_ops.reposts = repost_data;
235 }
236
237 // Process blocks in bulk
238 if !blocks.is_empty() {
239 let block_data = process_blocks_bulk(conn, actor_id, blocks).await?;
240 bulk_ops.blocks = block_data;
241 }
242
243 // Process posts in bulk
244 if !posts.is_empty() {
245 let post_data = process_posts_bulk(conn, repo, actor_id, posts).await?;
246 bulk_ops.posts = post_data;
247 }
248
249 // Convert individual records to DatabaseOperations
250 // These are records that can't be bulk-processed (posts, profiles, gates, lists, etc)
251 // We need to properly resolve all referenced actors to create stubs and maintain referential integrity
252 for unresolved in individual_records {
253 let UnresolvedRecord { at_uri, rkey, cid, record } = unresolved;
254 let record = *record; // Unbox
255
256 // Extract all referenced actors/posts from the record
257 let refs = super::extract_references(&record);
258
259 // Resolve subject actor if present (e.g., for follows, blocks)
260 let subject_actor_id = if let Some(subject_did) = refs.subject_did {
261 let (actor_id, _, _) = crate::db::operations::feed::get_actor_id(conn, &subject_did).await?;
262 Some(actor_id)
263 } else {
264 None
265 };
266
267 // Resolve ALL additional DIDs (feedgen service DID, threadgate/listblock list owners)
268 // These actors need to exist but don't have direct FK relationships in the record table
269 // We ensure they exist here to prevent FK violations in related operations
270 for additional_did in &refs.additional_dids {
271 let _ = crate::db::operations::feed::get_actor_id(conn, additional_did).await?;
272 }
273
274 // For FeedGenerator records, the first (and only) additional DID is the service actor
275 // For other record types with additional_dids, we just ensure they exist above
276 let service_actor_id = if let Some(first_did) = refs.additional_dids.first() {
277 match crate::db::operations::feed::get_actor_id(conn, first_did).await {
278 Ok((actor_id, _, _)) => Some(actor_id),
279 Err(e) => {
280 tracing::warn!(
281 at_uri = %at_uri,
282 service_did = %first_did,
283 error = ?e,
284 "Failed to resolve service actor ID - skipping record"
285 );
286 None
287 }
288 }
289 } else {
290 None
291 };
292
293 // For posts, resolve parent/root/quoted authors and mentioned actors
294 let (parent_author_actor_id, root_author_actor_id, quoted_author_actor_id, mentioned_actor_ids) =
295 if let RecordTypes::AppBskyFeedPost(ref post) = record {
296 // Resolve parent author
297 let parent_author = if let Some(ref reply) = post.reply {
298 let parent_did = parakeet_db::utils::at_uri::extract_did(&reply.parent.uri);
299 if let Some(did) = parent_did {
300 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?;
301 Some(aid)
302 } else {
303 None
304 }
305 } else {
306 None
307 };
308
309 // Resolve root author (if different from parent)
310 let root_author = if let Some(ref reply) = post.reply {
311 if reply.root.uri != reply.parent.uri {
312 let root_did = parakeet_db::utils::at_uri::extract_did(&reply.root.uri);
313 if let Some(did) = root_did {
314 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?;
315 Some(aid)
316 } else {
317 None
318 }
319 } else {
320 parent_author
321 }
322 } else {
323 None
324 };
325
326 // Resolve quoted post author (from embed)
327 let quoted_author = if let Some(ref embed) = post.embed {
328 if let Some(bsky_embed) = embed.as_bsky() {
329 let quote_uri = match bsky_embed {
330 crate::types::records::AppBskyEmbed::Record(r) => Some(&r.record.uri),
331 crate::types::records::AppBskyEmbed::RecordWithMedia(rwm) => Some(&rwm.record.uri),
332 _ => None,
333 };
334
335 if let Some(uri) = quote_uri {
336 let quoted_did = parakeet_db::utils::at_uri::extract_did(uri);
337 if let Some(did) = quoted_did {
338 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?;
339 Some(aid)
340 } else {
341 None
342 }
343 } else {
344 None
345 }
346 } else {
347 None
348 }
349 } else {
350 None
351 };
352
353 // Resolve mentioned actors from facets
354 let mut mentions = Vec::new();
355 if let Some(ref facets) = post.facets {
356 use jacquard_api::app_bsky::richtext::facet::FacetFeaturesItem;
357 for facet in facets {
358 for feature in &facet.features {
359 if let FacetFeaturesItem::Mention(mention) = feature {
360 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, mention.did.as_ref()).await?;
361 mentions.push(aid);
362 }
363 }
364 }
365 }
366
367 (parent_author, root_author, quoted_author, mentions)
368 } else {
369 (None, None, None, Vec::new())
370 };
371
372 // Resolve via_repost natural key if present (for likes and reposts that came via a repost)
373 let via_repost_key = if let (Some(via_uri), Some(via_cid)) = (&refs.via_uri, &refs.via_cid) {
374 // Extract the DID and rkey from the via URI
375 if let Some((via_did, via_rkey, _collection)) = parakeet_db::utils::at_uri::parse_at_uri(via_uri) {
376 // Ensure the via repost actor exists
377 let (via_actor_id, _, _) = crate::db::operations::feed::get_actor_id(conn, via_did).await?;
378
379 // Get or create the repost stub using the public re-export
380 let (repost_key, _was_created) = crate::db::operations::feed::get_repost_id(
381 conn,
382 via_actor_id,
383 via_rkey,
384 via_cid,
385 ).await?;
386
387 Some(repost_key)
388 } else {
389 None
390 }
391 } else {
392 None
393 };
394
395 let resolved_actor_ids = super::operations::ResolvedActorIds {
396 subject_actor_id,
397 service_actor_id,
398 parent_author_actor_id,
399 root_author_actor_id,
400 quoted_author_actor_id,
401 mentioned_actor_ids,
402 via_repost_key,
403 };
404
405 let processed = super::operations::process_record_to_operations(
406 repo,
407 actor_id,
408 resolved_actor_ids,
409 cid,
410 record,
411 at_uri,
412 rkey,
413 _source,
414 );
415
416 // Add all operations from this record
417 bulk_ops.individual_ops.extend(processed.operations);
418 }
419
420 let elapsed = start.elapsed();
421 tracing::info!(
422 repo = %repo,
423 total_records = total_records,
424 bulk_ops = bulk_ops.total_count(),
425 duration_ms = elapsed.as_millis(),
426 "Bulk record processing completed"
427 );
428
429 Ok(bulk_ops)
430}
431
432/// Process likes in bulk, returning three separate vectors by type
433async fn process_likes_bulk(
434 conn: &deadpool_postgres::Object,
435 actor_id: i32,
436 likes: Vec<LikeMetadata>,
437) -> Result<ProcessedLikes> {
438 let mut post_like_data = Vec::new();
439 let mut feedgen_like_data = Vec::new();
440 let mut labeler_like_data = Vec::new();
441
442 // Collect all subject URIs and CIDs for bulk resolution
443 // We need to own the CID strings to avoid lifetime issues
444 let cid_strings: Vec<String> = likes.iter()
445 .map(|like| like.record.subject.cid.to_string())
446 .collect();
447
448 // Separate likes by subject type
449 let mut post_likes = Vec::new();
450 let mut feedgen_likes = Vec::new();
451 let mut labeler_likes = Vec::new();
452
453 for (i, like) in likes.iter().enumerate() {
454 let subject_uri = &like.record.subject.uri;
455 if subject_uri.contains("/app.bsky.feed.post/") {
456 post_likes.push(i);
457 } else if subject_uri.contains("/app.bsky.feed.generator/") {
458 feedgen_likes.push(i);
459 } else if subject_uri.contains("/app.bsky.labeler.service/") {
460 labeler_likes.push(i);
461 } else {
462 tracing::warn!(uri = %subject_uri, "Unknown subject type in like");
463 }
464 }
465
466 // Bulk resolve posts
467 let post_uri_cid_pairs: Vec<(&str, &str)> = post_likes.iter()
468 .map(|&i| (likes[i].record.subject.uri.as_str(), cid_strings[i].as_str()))
469 .collect();
470 let resolved_posts = if !post_uri_cid_pairs.is_empty() {
471 bulk_resolve::resolve_and_ensure_posts_bulk(conn, &post_uri_cid_pairs).await?
472 } else {
473 HashMap::new()
474 };
475
476 // Bulk resolve feedgens
477 let feedgen_uris: Vec<&str> = feedgen_likes.iter()
478 .map(|&i| likes[i].record.subject.uri.as_str())
479 .collect();
480 let resolved_feedgens = if !feedgen_uris.is_empty() {
481 bulk_resolve::resolve_feedgen_uris_bulk(conn, &feedgen_uris).await?
482 } else {
483 HashMap::new()
484 };
485
486 // Bulk resolve labelers
487 // Extract DIDs from labeler AT URIs (at://did:plc:.../app.bsky.labeler.service/self)
488 let labeler_dids: Vec<&str> = labeler_likes.iter()
489 .filter_map(|&i| {
490 let uri = &likes[i].record.subject.uri;
491 parakeet_db::utils::at_uri::extract_did(uri)
492 })
493 .collect();
494 let resolved_labelers = if !labeler_dids.is_empty() {
495 bulk_resolve::resolve_labeler_dids_bulk(conn, &labeler_dids).await?
496 } else {
497 HashMap::new()
498 };
499
500 // Bulk resolve via_repost_ids
501 // Collect all via URIs with their corresponding subject info
502 let mut via_cid_strings: Vec<String> = Vec::new();
503 let mut via_subject_cid_strings: Vec<String> = Vec::new();
504 let mut via_repost_indices: Vec<usize> = Vec::new();
505
506 for (i, like) in likes.iter().enumerate() {
507 if let Some(ref via) = like.record.via {
508 via_cid_strings.push(via.cid.to_string());
509 via_subject_cid_strings.push(cid_strings[i].clone());
510 via_repost_indices.push(i);
511 }
512 }
513
514 let via_repost_data: Vec<(&str, &str, &str, &str)> = via_repost_indices.iter()
515 .enumerate()
516 .map(|(via_idx, &like_idx)| {
517 let like = &likes[like_idx];
518 let via = like.record.via.as_ref().unwrap(); // Safe because we filtered above
519 (
520 via.uri.as_str(), // repost URI
521 via_cid_strings[via_idx].as_str(), // repost CID
522 like.record.subject.uri.as_str(), // subject post URI (what was reposted and liked)
523 via_subject_cid_strings[via_idx].as_str(), // subject post CID
524 )
525 })
526 .collect();
527
528 let resolved_via_reposts = if !via_repost_data.is_empty() {
529 bulk_resolve::resolve_and_ensure_reposts_bulk(conn, &via_repost_data).await?
530 } else {
531 HashMap::new()
532 };
533
534 // Build type-specific like data for each like
535 for like in likes {
536 let subject_uri = &like.record.subject.uri;
537
538 if subject_uri.contains("/app.bsky.feed.post/") {
539 // Post like
540 let &(post_actor_id, post_rkey) = resolved_posts.get(subject_uri.as_str())
541 .ok_or_else(|| eyre::eyre!("Post not found for URI: {}", subject_uri))?;
542
543 // Resolve via_repost natural keys if present
544 let (via_repost_actor_id, via_repost_rkey) = like.record.via.as_ref()
545 .and_then(|via| resolved_via_reposts.get(via.uri.as_str()).copied())
546 .unzip();
547
548 post_like_data.push(PostLikeCopyData {
549 actor_id,
550 rkey: like.rkey,
551 post_actor_id,
552 post_rkey,
553 via_repost_actor_id,
554 via_repost_rkey,
555 });
556 } else if subject_uri.contains("/app.bsky.feed.generator/") {
557 // Feedgen like - only include if feedgen exists (no auto-stubbing)
558 if let Some(&(feed_actor_id, ref feed_rkey)) = resolved_feedgens.get(subject_uri.as_str()) {
559 feedgen_like_data.push(FeedgenLikeCopyData {
560 actor_id,
561 rkey: like.rkey,
562 feed_actor_id,
563 feed_rkey: feed_rkey.clone(),
564 });
565 } else {
566 tracing::debug!(uri = %subject_uri, "Feedgen not found for like, skipping");
567 }
568 } else if subject_uri.contains("/app.bsky.labeler.service/") {
569 // Labeler like - only include if labeler exists (no auto-stubbing)
570 // Extract DID from labeler AT URI
571 if let Some(labeler_did) = parakeet_db::utils::at_uri::extract_did(subject_uri) {
572 if let Some(&labeler_actor_id) = resolved_labelers.get(labeler_did) {
573 labeler_like_data.push(LabelerLikeCopyData {
574 actor_id,
575 rkey: like.rkey,
576 labeler_actor_id,
577 });
578 } else {
579 tracing::debug!(uri = %subject_uri, "Labeler not found for like, skipping");
580 }
581 } else {
582 tracing::warn!(uri = %subject_uri, "Failed to extract DID from labeler URI");
583 }
584 } else {
585 tracing::warn!(uri = %subject_uri, "Unknown subject type in like");
586 }
587 }
588
589 Ok(ProcessedLikes {
590 post_likes: post_like_data,
591 feedgen_likes: feedgen_like_data,
592 labeler_likes: labeler_like_data,
593 })
594}
595
596/// Process follows in bulk
597async fn process_follows_bulk(
598 conn: &deadpool_postgres::Object,
599 actor_id: i32,
600 follows: Vec<FollowMetadata>,
601) -> Result<Vec<FollowCopyData>> {
602 let mut follow_data = Vec::with_capacity(follows.len());
603
604 // Collect all subject DIDs for bulk resolution
605 let subject_dids: Vec<&str> = follows.iter().map(|f| f.record.subject.as_str()).collect();
606
607 // Resolve all subject actors (creating stubs if needed)
608 let mut resolved_actors = bulk_resolve::resolve_actor_dids_bulk(conn, &subject_dids).await?;
609
610 // Create stubs for missing actors
611 let missing_dids: Vec<&str> = subject_dids
612 .iter()
613 .filter(|&&did| !resolved_actors.contains_key(did))
614 .copied()
615 .collect();
616
617 if !missing_dids.is_empty() {
618 let created = bulk_resolve::create_actor_stubs_bulk(conn, &missing_dids).await?;
619 resolved_actors.extend(created);
620 }
621
622 // Build FollowCopyData for each follow
623 for follow in follows {
624 let subject_actor_id = *resolved_actors.get(follow.record.subject.as_str())
625 .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", follow.record.subject))?;
626
627 follow_data.push(FollowCopyData {
628 actor_id,
629 rkey: follow.rkey, // Already converted to i64
630 subject_actor_id,
631 created_at: follow.record.created_at,
632 });
633 }
634
635 Ok(follow_data)
636}
637
638/// Process reposts in bulk
639async fn process_reposts_bulk(
640 conn: &deadpool_postgres::Object,
641 actor_id: i32,
642 reposts: Vec<RepostMetadata>,
643) -> Result<Vec<RepostCopyData>> {
644 let mut repost_data = Vec::with_capacity(reposts.len());
645
646 // Collect all subject URIs and CIDs for bulk resolution
647 let cid_strings: Vec<String> = reposts.iter()
648 .map(|repost| repost.record.subject.cid.to_string())
649 .collect();
650
651 let uri_cid_pairs: Vec<(&str, &str)> = reposts.iter()
652 .zip(cid_strings.iter())
653 .map(|(repost, cid_str)| (repost.record.subject.uri.as_str(), cid_str.as_str()))
654 .collect();
655
656 // Bulk resolve all subject posts
657 let resolved_posts = bulk_resolve::resolve_and_ensure_posts_bulk(conn, &uri_cid_pairs).await?;
658
659 // Bulk resolve via_repost_ids
660 // Collect all via URIs with their corresponding subject info
661 let mut via_cid_strings: Vec<String> = Vec::new();
662 let mut via_subject_cid_strings: Vec<String> = Vec::new();
663 let mut via_repost_indices: Vec<usize> = Vec::new();
664
665 for (i, repost) in reposts.iter().enumerate() {
666 if let Some(ref via) = repost.record.via {
667 via_cid_strings.push(via.cid.to_string());
668 via_subject_cid_strings.push(cid_strings[i].clone());
669 via_repost_indices.push(i);
670 }
671 }
672
673 let via_repost_data: Vec<(&str, &str, &str, &str)> = via_repost_indices.iter()
674 .enumerate()
675 .map(|(via_idx, &repost_idx)| {
676 let repost = &reposts[repost_idx];
677 let via = repost.record.via.as_ref().unwrap(); // Safe because we filtered above
678 (
679 via.uri.as_str(), // via repost URI
680 via_cid_strings[via_idx].as_str(), // via repost CID
681 repost.record.subject.uri.as_str(), // subject post URI (what was reposted)
682 via_subject_cid_strings[via_idx].as_str(), // subject post CID
683 )
684 })
685 .collect();
686
687 let resolved_via_reposts = if !via_repost_data.is_empty() {
688 bulk_resolve::resolve_and_ensure_reposts_bulk(conn, &via_repost_data).await?
689 } else {
690 HashMap::new()
691 };
692
693 // Build RepostCopyData for each repost
694 for repost in reposts {
695 let &(post_actor_id, post_rkey) = resolved_posts.get(repost.record.subject.uri.as_str())
696 .ok_or_else(|| eyre::eyre!("Post not found for URI: {}", repost.record.subject.uri))?;
697
698 // Get CID digest (real CID for reposts)
699 let cid_bytes = repost.cid.to_bytes();
700 let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes)
701 .ok_or_else(|| eyre::eyre!("Invalid CID for repost"))?;
702
703 // Resolve via_repost natural keys if present
704 let (via_repost_actor_id, via_repost_rkey) = repost.record.via.as_ref()
705 .and_then(|via| resolved_via_reposts.get(via.uri.as_str()).copied())
706 .unzip();
707
708 repost_data.push(RepostCopyData {
709 actor_id,
710 rkey: repost.rkey, // Already converted to i64
711 post_actor_id,
712 post_rkey,
713 cid: cid_digest.to_vec(),
714 created_at: repost.record.created_at,
715 via_repost_actor_id,
716 via_repost_rkey,
717 });
718 }
719
720 Ok(repost_data)
721}
722
723/// Process blocks in bulk
724async fn process_blocks_bulk(
725 conn: &deadpool_postgres::Object,
726 actor_id: i32,
727 blocks: Vec<BlockMetadata>,
728) -> Result<Vec<BlockCopyData>> {
729 let mut block_data = Vec::with_capacity(blocks.len());
730
731 // Collect all subject DIDs for bulk resolution
732 let subject_dids: Vec<&str> = blocks.iter().map(|b| b.record.subject.as_str()).collect();
733
734 // Resolve all subject actors
735 let mut resolved_actors = bulk_resolve::resolve_actor_dids_bulk(conn, &subject_dids).await?;
736
737 // Create stubs for missing actors
738 let missing_dids: Vec<&str> = subject_dids
739 .iter()
740 .filter(|&&did| !resolved_actors.contains_key(did))
741 .copied()
742 .collect();
743
744 if !missing_dids.is_empty() {
745 let created = bulk_resolve::create_actor_stubs_bulk(conn, &missing_dids).await?;
746 resolved_actors.extend(created);
747 }
748
749 // Build BlockCopyData for each block
750 for block in blocks {
751 let subject_actor_id = *resolved_actors.get(block.record.subject.as_str())
752 .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", block.record.subject))?;
753
754 block_data.push(BlockCopyData {
755 actor_id,
756 rkey: block.rkey, // Already converted to i64
757 subject_actor_id,
758 created_at: block.record.created_at,
759 });
760 }
761
762 Ok(block_data)
763}
764
765/// Process posts in bulk
766///
767/// Converts PostMetadata into PostCopyData by:
768/// 1. Resolving all FK references (parent/root posts, mentions) - stubs already exist
769/// 2. Extracting and processing embeds (images, video, external, record)
770/// 3. Extracting facets (links, mentions, tags)
771/// 4. Compressing content and tokenizing for search
772///
773/// All referenced entities (posts, actors) have stubs created by resolve_all_references_bulk
774async fn process_posts_bulk(
775 conn: &deadpool_postgres::Object,
776 _repo: &str,
777 actor_id: i32,
778 posts: Vec<PostMetadata>,
779) -> Result<Vec<PostCopyData>> {
780 use crate::search::SearchTokenizer;
781 use crate::types::records::EmbedOuter;
782 use crate::utils::{extract_mentions_and_tags, merge_tags};
783 use parakeet_db::compression::PostContentCodec;
784
785 let mut post_data = Vec::with_capacity(posts.len());
786
787 // Step 1: Collect all parent/root/quoted URIs for bulk lookup
788 let mut post_uris_to_lookup: Vec<String> = Vec::new();
789 for post in &posts {
790 // Collect parent/root from replies
791 if let Some(reply) = &post.record.reply {
792 post_uris_to_lookup.push(reply.parent.uri.to_string());
793 // Only add root if different from parent
794 if reply.root.uri != reply.parent.uri {
795 post_uris_to_lookup.push(reply.root.uri.to_string());
796 }
797 }
798
799 // Collect quoted post URIs from embeds
800 // Only collect post URIs - embeds can also be feedgens, profiles, lists, etc.
801 if let Some(ref embed) = post.record.embed {
802 if let Some(bsky_embed) = embed.as_bsky() {
803 match bsky_embed {
804 AppBskyEmbed::Record(r) => {
805 if r.record.uri.as_str().contains("/app.bsky.feed.post/") {
806 post_uris_to_lookup.push(r.record.uri.to_string());
807 }
808 }
809 AppBskyEmbed::RecordWithMedia(rwm) => {
810 if rwm.record.uri.as_str().contains("/app.bsky.feed.post/") {
811 post_uris_to_lookup.push(rwm.record.uri.to_string());
812 }
813 }
814 _ => {}
815 }
816 }
817 }
818 }
819
820 // Step 2: Bulk resolve post URIs to post_ids (stubs already exist)
821 let uri_to_post_id = if !post_uris_to_lookup.is_empty() {
822 let uris: Vec<&str> = post_uris_to_lookup.iter().map(|s| s.as_str()).collect();
823 bulk_resolve::resolve_post_uris_bulk(conn, &uris).await?
824 } else {
825 std::collections::HashMap::new()
826 };
827
828 // Step 3: Extract mentions and tags from all posts in one pass
829 // Store tags indexed by post position for later use
830 let mut mention_dids: Vec<String> = Vec::new();
831 let mut tags_by_post: Vec<Vec<String>> = Vec::with_capacity(posts.len());
832
833 for post in &posts {
834 if let Some(facets) = &post.record.facets {
835 let (mentions, tags) = extract_mentions_and_tags(facets.as_slice());
836 mention_dids.extend(mentions);
837 tags_by_post.push(tags);
838 } else {
839 tags_by_post.push(Vec::new());
840 }
841 }
842
843 // Step 4: Bulk resolve mention DIDs to actor_ids (stubs already exist)
844 let did_to_actor_id = if !mention_dids.is_empty() {
845 let dids: Vec<&str> = mention_dids.iter().map(|s| s.as_str()).collect();
846 bulk_resolve::resolve_actor_dids_bulk(conn, &dids).await?
847 } else {
848 std::collections::HashMap::new()
849 };
850
851 // Step 5: Process each post
852 let codec = PostContentCodec::new();
853 let tokenizer = SearchTokenizer::new();
854
855 for (post_idx, post_meta) in posts.into_iter().enumerate() {
856 let PostMetadata { rkey, cid, record } = post_meta;
857
858 // Extract CID digest
859 let cid_bytes = cid.to_bytes();
860 let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes)
861 .expect("Valid CID should have digest");
862
863 // Resolve parent and root post natural keys
864 let (parent_post_actor_id, parent_post_rkey, root_post_actor_id, root_post_rkey) = if let Some(reply) = &record.reply {
865 let parent_nk = uri_to_post_id.get(reply.parent.uri.as_str()).copied();
866 let root_nk = if reply.root.uri != reply.parent.uri {
867 uri_to_post_id.get(reply.root.uri.as_str()).copied()
868 } else {
869 parent_nk // Root is same as parent
870 };
871 let (parent_actor_id, parent_rkey) = parent_nk.unzip();
872 let (root_actor_id, root_rkey) = root_nk.unzip();
873 (parent_actor_id, parent_rkey, root_actor_id, root_rkey)
874 } else {
875 (None, None, None, None)
876 };
877
878 // Use pre-extracted tags from Step 3
879 let tags_from_facets = tags_by_post[post_idx].clone();
880
881 // Merge tags from facets and record.tags
882 let tags = merge_tags(Some(tags_from_facets), record.tags.clone());
883
884 // Compress content
885 let content_compressed = if record.text.is_empty() {
886 None
887 } else {
888 Some(codec.compress(&record.text)?)
889 };
890
891 // Tokenize for search
892 let tokens = tokenizer.tokenize(&record.text);
893
894 // Extract embed type and subtype
895 // Inline lexicon_to_embed_type mapping
896 let lexicon_to_embed_type = |lexicon: &str| -> String {
897 match lexicon {
898 "app.bsky.embed.images" => "images",
899 "app.bsky.embed.video" => "video",
900 "app.bsky.embed.external" => "external",
901 "app.bsky.embed.record" => "record",
902 "app.bsky.embed.recordWithMedia" => "record_with_media",
903 _ => lexicon,
904 }.to_string()
905 };
906
907 let (embed_type, embed_subtype) = if let Some(ref embed_outer) = record.embed {
908 let type_str = EmbedOuter::as_str(embed_outer);
909 let embed_type = Some(lexicon_to_embed_type(type_str));
910 let embed_subtype = EmbedOuter::subtype(embed_outer)
911 .map(lexicon_to_embed_type);
912 (embed_type, embed_subtype)
913 } else {
914 (None, None)
915 };
916
917 // For backfills, violates_threadgate is always false (we don't check gates for historical data)
918 let violates_threadgate = false;
919
920 // Build composite fields from embed using composite_builders
921 let (ext_embed, video_embed, image_1, image_2, image_3, image_4, embedded_post_actor_id, embedded_post_rkey, record_detached) =
922 if let Some(embed_outer) = record.embed.and_then(|e| e.into_bsky()) {
923 match embed_outer {
924 AppBskyEmbed::Images(images) => {
925 let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images);
926 (None, None, i1, i2, i3, i4, None, None, None)
927 }
928 AppBskyEmbed::Video(video) => {
929 (None, composite_builders::build_video_embed(&video), None, None, None, None, None, None, None)
930 }
931 AppBskyEmbed::External(external) => {
932 (composite_builders::build_ext_embed(&external), None, None, None, None, None, None, None, None)
933 }
934 AppBskyEmbed::Record(rec) => {
935 let embedded_post_nk = uri_to_post_id.get(rec.record.uri.as_str()).copied();
936 let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip();
937 (None, None, None, None, None, None, embedded_actor_id, embedded_rkey, Some(false))
938 }
939 AppBskyEmbed::RecordWithMedia(rwm) => {
940 // Process media part
941 let (ext, vid, i1, i2, i3, i4) = match &rwm.media {
942 MediaEmbed::Images(images) => {
943 let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images);
944 (None, None, i1, i2, i3, i4)
945 }
946 MediaEmbed::Video(video) => {
947 (None, composite_builders::build_video_embed(&video), None, None, None, None)
948 }
949 MediaEmbed::External(external) => {
950 (composite_builders::build_ext_embed(&external), None, None, None, None, None)
951 }
952 };
953 // Process record part
954 let embedded_post_nk = uri_to_post_id.get(rwm.record.uri.as_str()).copied();
955 let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip();
956 (ext, vid, i1, i2, i3, i4, embedded_actor_id, embedded_rkey, Some(false))
957 }
958 }
959 } else {
960 (None, None, None, None, None, None, None, None, None)
961 };
962
963 // Build facet composites and mention array using composite_builders
964 let (facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8, mentions) =
965 if let Some(ref facets) = record.facets {
966 let (f1, f2, f3, f4, f5, f6, f7, f8) = composite_builders::build_facet_embeds(facets, &did_to_actor_id);
967 let m = composite_builders::extract_mention_actor_ids(facets, &did_to_actor_id);
968 (f1, f2, f3, f4, f5, f6, f7, f8, m)
969 } else {
970 (None, None, None, None, None, None, None, None, None)
971 };
972
973 post_data.push(PostCopyData {
974 actor_id,
975 rkey,
976 cid: cid_digest.to_vec(),
977 content_compressed,
978 langs: record.langs.unwrap_or_default(),
979 tags,
980 parent_post_actor_id,
981 parent_post_rkey,
982 root_post_actor_id,
983 root_post_rkey,
984 embed_type,
985 embed_subtype,
986 violates_threadgate,
987 tokens,
988 ext_embed,
989 video_embed,
990 image_1,
991 image_2,
992 image_3,
993 image_4,
994 embedded_post_actor_id,
995 embedded_post_rkey,
996 record_detached,
997 facet_1,
998 facet_2,
999 facet_3,
1000 facet_4,
1001 facet_5,
1002 facet_6,
1003 facet_7,
1004 facet_8,
1005 mentions,
1006 });
1007 }
1008
1009 Ok(post_data)
1010}