//! Bulk record processing for COPY operations
//!
//! This module processes UnresolvedBulk events into ResolvedBulk events by:
//! 1. Grouping records by type (likes, follows, posts, etc.)
//! 2. Bulk resolving all foreign keys in single queries
//! 3. Converting to COPY-ready data structures
//!
//! Records that can't be bulk processed (profiles, gates, etc.) are converted to
//! individual DatabaseOperations.

use super::bulk_types::{
    BlockCopyData, BulkOperations, FeedgenLikeCopyData, FollowCopyData, LabelerLikeCopyData,
    PostCopyData, PostLikeCopyData, RepostCopyData, UnresolvedRecord,
};
use super::EventSource;
use crate::db::{bulk_resolve, composite_builders};
use crate::relay::types::RecordTypes;
use crate::types::records::{AppBskyEmbed, MediaEmbed};
use crate::Result;
use std::collections::HashMap;

/// Metadata for a like record extracted from UnresolvedRecord
struct LikeMetadata {
    rkey: i64,
    record: crate::types::records::AppBskyFeedLike,
}

/// Processed like data separated by type
struct ProcessedLikes {
    post_likes: Vec<PostLikeCopyData>,
    feedgen_likes: Vec<FeedgenLikeCopyData>,
    labeler_likes: Vec<LabelerLikeCopyData>,
}

/// Metadata for a follow record extracted from UnresolvedRecord
struct FollowMetadata {
    rkey: i64,
    record: crate::types::records::AppBskyGraphFollow,
}

/// Metadata for a repost record extracted from UnresolvedRecord
struct RepostMetadata {
    rkey: i64,
    cid: ipld_core::cid::Cid,
    record: crate::types::records::AppBskyFeedRepost,
}

/// Metadata for a block record extracted from UnresolvedRecord
struct BlockMetadata {
    rkey: i64,
    record: crate::types::records::AppBskyGraphBlock,
}

/// Metadata for a post record extracted from UnresolvedRecord
struct PostMetadata {
    rkey: i64,
    cid: ipld_core::cid::Cid,
    record: crate::types::records::AppBskyFeedPost,
}

/// Process a batch of unresolved records into bulk operations
///
/// This function:
/// 1. Groups records by type
/// 2. Resolves all foreign keys in bulk (single query per FK type)
/// 3. Converts to COPY-ready structures
/// 4. Returns BulkOperations + individual operations
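///
/// # Example
///
/// A minimal call-site sketch, not taken from this codebase: the pool, `actor_id`,
/// `decode_commit`, `EventSource::Backfill`, and `copy_writer` are hypothetical; the
/// rest names items from this module.
///
/// ```rust,ignore
/// let conn = pool.get().await?; // deadpool_postgres::Object
/// let records: Vec<UnresolvedRecord> = decode_commit(commit)?;
/// let bulk_ops = process_bulk_records(
///     &conn,
///     "did:plc:example", // repo DID
///     actor_id,
///     records,
///     EventSource::Backfill,
/// )
/// .await?;
/// copy_writer.apply(bulk_ops).await?; // feed the COPY-ready data to the writer
/// ```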
pub async fn process_bulk_records(
    conn: &deadpool_postgres::Object,
    repo: &str,
    actor_id: i32,
    records: Vec<UnresolvedRecord>,
    source: EventSource,
) -> Result<BulkOperations> {
    let start = std::time::Instant::now();
    let total_records = records.len();
    tracing::info!(
        repo = %repo,
        actor_id = actor_id,
        records = total_records,
        "Starting bulk record processing"
    );

    let mut bulk_ops = BulkOperations::new();

    // Group records by type, extracting only the metadata we need
    let mut likes = Vec::new();
    let mut follows = Vec::new();
    let mut reposts = Vec::new();
    let mut blocks = Vec::new();
    let mut posts = Vec::new();
    let mut individual_records = Vec::new();
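
    // The bulk-copyable collections all use TID rkeys (13-character base32-sortable
    // timestamps). Each arm below validates the TID and decodes it to an i64 sort key.
    // Illustrative round trip (the helpers are the ones called below; the TID is made up):
    //
    //     crate::database_writer::validate_tid_timestamp("3jzfcijpj2z2a")?; // rejects bogus timestamps
    //     let key: i64 = parakeet_db::models::tid_to_i64("3jzfcijpj2z2a")?; // sortable integer rkey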
    for unresolved in records {
        // Extract the metadata we need before moving the record
        let UnresolvedRecord { at_uri, rkey, cid, record } = unresolved;
        let record = *record; // Unbox

        match record {
            RecordTypes::AppBskyFeedLike(like_record) => {
                // Validate TID timestamp (matches behavior in individual handlers)
                if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
                    tracing::warn!(rkey = %rkey, error = %e, "Invalid like TID timestamp, skipping");
                    continue;
                }
                let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
                    Ok(val) => val,
                    Err(e) => {
                        tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert like TID to i64, skipping");
                        continue;
                    }
                };
                likes.push(LikeMetadata { rkey: rkey_i64, record: like_record });
            }
            RecordTypes::AppBskyGraphFollow(follow_record) => {
                // Validate TID timestamp
                if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
                    tracing::warn!(rkey = %rkey, error = %e, "Invalid follow TID timestamp, skipping");
                    continue;
                }
                let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
                    Ok(val) => val,
                    Err(e) => {
                        tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert follow TID to i64, skipping");
                        continue;
                    }
                };
                follows.push(FollowMetadata { rkey: rkey_i64, record: follow_record });
            }
            RecordTypes::AppBskyFeedRepost(repost_record) => {
                // Validate TID timestamp
                if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
                    tracing::warn!(rkey = %rkey, error = %e, "Invalid repost TID timestamp, skipping");
                    continue;
                }
                let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
                    Ok(val) => val,
                    Err(e) => {
                        tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert repost TID to i64, skipping");
                        continue;
                    }
                };
                reposts.push(RepostMetadata { rkey: rkey_i64, cid, record: repost_record });
            }
            RecordTypes::AppBskyGraphBlock(block_record) => {
                // Validate TID timestamp
                if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
                    tracing::warn!(rkey = %rkey, error = %e, "Invalid block TID timestamp, skipping");
                    continue;
                }
                let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
                    Ok(val) => val,
                    Err(e) => {
                        tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert block TID to i64, skipping");
                        continue;
                    }
                };
                blocks.push(BlockMetadata { rkey: rkey_i64, record: block_record });
            }
            RecordTypes::AppBskyFeedPost(post_record) => {
                // Validate TID timestamp for posts
                if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) {
                    tracing::warn!(rkey = %rkey, error = %e, "Invalid post TID timestamp, skipping");
                    continue;
                }
                let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) {
                    Ok(val) => val,
                    Err(e) => {
                        tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert post TID to i64, skipping");
                        continue;
                    }
                };
                posts.push(PostMetadata { rkey: rkey_i64, cid, record: post_record });
            }
            other => {
                // Everything else (profiles, gates, lists, etc.) goes through the
                // individual path. These may have non-TID rkeys (e.g., profiles use "self").
                individual_records.push(UnresolvedRecord {
                    at_uri,
                    rkey,
                    cid,
                    record: Box::new(other),
                });
            }
        }
    }

    tracing::debug!(
        likes = likes.len(),
        follows = follows.len(),
        reposts = reposts.len(),
        blocks = blocks.len(),
        posts = posts.len(),
        individual = individual_records.len(),
        "Grouped records by type"
    );

    // Process likes in bulk
    if !likes.is_empty() {
        let processed_likes = process_likes_bulk(conn, actor_id, likes).await?;
        bulk_ops.post_likes = processed_likes.post_likes;
        bulk_ops.feedgen_likes = processed_likes.feedgen_likes;
        bulk_ops.labeler_likes = processed_likes.labeler_likes;
    }

    // Process follows in bulk
    if !follows.is_empty() {
        bulk_ops.follows = process_follows_bulk(conn, actor_id, follows).await?;
    }

    // Process reposts in bulk
    if !reposts.is_empty() {
        bulk_ops.reposts = process_reposts_bulk(conn, actor_id, reposts).await?;
    }

    // Process blocks in bulk
    if !blocks.is_empty() {
        bulk_ops.blocks = process_blocks_bulk(conn, actor_id, blocks).await?;
    }

    // Process posts in bulk
    if !posts.is_empty() {
        bulk_ops.posts = process_posts_bulk(conn, repo, actor_id, posts).await?;
    }

    // Convert individual records to DatabaseOperations.
    // These are records that can't be bulk-processed (profiles, gates, lists, etc.).
    // We resolve all referenced actors here to create stubs and maintain referential integrity.
    for unresolved in individual_records {
        let UnresolvedRecord { at_uri, rkey, cid, record } = unresolved;
        let record = *record; // Unbox

        // Extract all referenced actors/posts from the record
        let refs = super::extract_references(&record);

        // Resolve the subject actor if present (e.g., for follows, blocks)
        let subject_actor_id = if let Some(subject_did) = refs.subject_did {
            let (actor_id, _, _) =
                crate::db::operations::feed::get_actor_id(conn, &subject_did).await?;
            Some(actor_id)
        } else {
            None
        };

        // Resolve ALL additional DIDs (feedgen service DID, threadgate/listblock list owners).
        // These actors need to exist but don't have direct FK relationships in the record table;
        // we ensure they exist here to prevent FK violations in related operations.
        for additional_did in &refs.additional_dids {
            let _ = crate::db::operations::feed::get_actor_id(conn, additional_did).await?;
        }

        // For FeedGenerator records, the first (and only) additional DID is the service actor.
        // For other record types with additional_dids, ensuring they exist above is enough.
        let service_actor_id = if let Some(first_did) = refs.additional_dids.first() {
            match crate::db::operations::feed::get_actor_id(conn, first_did).await {
                Ok((actor_id, _, _)) => Some(actor_id),
                Err(e) => {
                    tracing::warn!(
                        at_uri = %at_uri,
                        service_did = %first_did,
                        error = ?e,
                        "Failed to resolve service actor ID - skipping record"
                    );
                    None
                }
            }
        } else {
            None
        };

        // For posts, resolve parent/root/quoted authors and mentioned actors
        let (parent_author_actor_id, root_author_actor_id, quoted_author_actor_id, mentioned_actor_ids) =
            if let RecordTypes::AppBskyFeedPost(ref post) = record {
                // Resolve the parent author
                let parent_author = if let Some(ref reply) = post.reply {
                    let parent_did = parakeet_db::utils::at_uri::extract_did(&reply.parent.uri);
                    if let Some(did) = parent_did {
                        let (aid, _, _) =
                            crate::db::operations::feed::get_actor_id(conn, did).await?;
                        Some(aid)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Resolve the root author (if different from the parent)
                let root_author = if let Some(ref reply) = post.reply {
                    if reply.root.uri != reply.parent.uri {
                        let root_did = parakeet_db::utils::at_uri::extract_did(&reply.root.uri);
                        if let Some(did) = root_did {
                            let (aid, _, _) =
                                crate::db::operations::feed::get_actor_id(conn, did).await?;
                            Some(aid)
                        } else {
                            None
                        }
                    } else {
                        parent_author
                    }
                } else {
                    None
                };

                // Resolve the quoted post author (from the embed)
                let quoted_author = if let Some(ref embed) = post.embed {
                    if let Some(bsky_embed) = embed.as_bsky() {
                        let quote_uri = match bsky_embed {
                            crate::types::records::AppBskyEmbed::Record(r) => Some(&r.record.uri),
                            crate::types::records::AppBskyEmbed::RecordWithMedia(rwm) => {
                                Some(&rwm.record.uri)
                            }
                            _ => None,
                        };
                        if let Some(uri) = quote_uri {
                            let quoted_did = parakeet_db::utils::at_uri::extract_did(uri);
                            if let Some(did) = quoted_did {
                                let (aid, _, _) =
                                    crate::db::operations::feed::get_actor_id(conn, did).await?;
                                Some(aid)
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Resolve mentioned actors from facets
                let mut mentions = Vec::new();
                if let Some(ref facets) = post.facets {
                    use jacquard_api::app_bsky::richtext::facet::FacetFeaturesItem;
                    for facet in facets {
                        for feature in &facet.features {
                            if let FacetFeaturesItem::Mention(mention) = feature {
                                let (aid, _, _) = crate::db::operations::feed::get_actor_id(
                                    conn,
                                    mention.did.as_ref(),
                                )
                                .await?;
                                mentions.push(aid);
                            }
                        }
                    }
                }

                (parent_author, root_author, quoted_author, mentions)
            } else {
                (None, None, None, Vec::new())
            };
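
        // An AT-URI has the form at://<did>/<collection>/<rkey>. A hypothetical via URI:
        //
        //     at://did:plc:abc123/app.bsky.feed.repost/3jzfcijpj2z2a
        //
        // parse_at_uri is assumed (per the destructuring below) to return the pieces as
        // (did, rkey, collection): Some(("did:plc:abc123", "3jzfcijpj2z2a", "app.bsky.feed.repost")).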
        // Resolve the via_repost natural key if present (for likes and reposts made via a repost)
        let via_repost_key = if let (Some(via_uri), Some(via_cid)) = (&refs.via_uri, &refs.via_cid) {
            // Extract the DID and rkey from the via URI
            if let Some((via_did, via_rkey, _collection)) =
                parakeet_db::utils::at_uri::parse_at_uri(via_uri)
            {
                // Ensure the via repost actor exists
                let (via_actor_id, _, _) =
                    crate::db::operations::feed::get_actor_id(conn, via_did).await?;
                // Get or create the repost stub using the public re-export
                let (repost_key, _was_created) = crate::db::operations::feed::get_repost_id(
                    conn,
                    via_actor_id,
                    via_rkey,
                    via_cid,
                )
                .await?;
                Some(repost_key)
            } else {
                None
            }
        } else {
            None
        };

        let resolved_actor_ids = super::operations::ResolvedActorIds {
            subject_actor_id,
            service_actor_id,
            parent_author_actor_id,
            root_author_actor_id,
            quoted_author_actor_id,
            mentioned_actor_ids,
            via_repost_key,
        };

        let processed = super::operations::process_record_to_operations(
            repo,
            actor_id,
            resolved_actor_ids,
            cid,
            record,
            at_uri,
            rkey,
            source,
        );

        // Add all operations from this record
        bulk_ops.individual_ops.extend(processed.operations);
    }

    let elapsed = start.elapsed();
    tracing::info!(
        repo = %repo,
        total_records = total_records,
        bulk_ops = bulk_ops.total_count(),
        duration_ms = elapsed.as_millis(),
        "Bulk record processing completed"
    );

    Ok(bulk_ops)
}

/// Process likes in bulk, returning three separate vectors by type
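///
/// Likes are split by what their subject URI points at. The three recognized shapes
/// (the DIDs and rkeys here are hypothetical):
///
/// ```text
/// at://did:plc:xyz/app.bsky.feed.post/3k...        -> post like
/// at://did:plc:xyz/app.bsky.feed.generator/my-feed -> feedgen like
/// at://did:plc:xyz/app.bsky.labeler.service/self   -> labeler like
/// ```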
async fn process_likes_bulk(
    conn: &deadpool_postgres::Object,
    actor_id: i32,
    likes: Vec<LikeMetadata>,
) -> Result<ProcessedLikes> {
    let mut post_like_data = Vec::new();
    let mut feedgen_like_data = Vec::new();
    let mut labeler_like_data = Vec::new();

    // Collect all subject CIDs for bulk resolution.
    // We need to own the CID strings to avoid lifetime issues.
    let cid_strings: Vec<String> = likes
        .iter()
        .map(|like| like.record.subject.cid.to_string())
        .collect();

    // Separate likes by subject type
    let mut post_likes = Vec::new();
    let mut feedgen_likes = Vec::new();
    let mut labeler_likes = Vec::new();
    for (i, like) in likes.iter().enumerate() {
        let subject_uri = &like.record.subject.uri;
        if subject_uri.contains("/app.bsky.feed.post/") {
            post_likes.push(i);
        } else if subject_uri.contains("/app.bsky.feed.generator/") {
            feedgen_likes.push(i);
        } else if subject_uri.contains("/app.bsky.labeler.service/") {
            labeler_likes.push(i);
        } else {
            tracing::warn!(uri = %subject_uri, "Unknown subject type in like");
        }
    }

    // Bulk resolve posts
    let post_uri_cid_pairs: Vec<(&str, &str)> = post_likes
        .iter()
        .map(|&i| (likes[i].record.subject.uri.as_str(), cid_strings[i].as_str()))
        .collect();
    let resolved_posts = if !post_uri_cid_pairs.is_empty() {
        bulk_resolve::resolve_and_ensure_posts_bulk(conn, &post_uri_cid_pairs).await?
    } else {
        HashMap::new()
    };

    // Bulk resolve feedgens
    let feedgen_uris: Vec<&str> = feedgen_likes
        .iter()
        .map(|&i| likes[i].record.subject.uri.as_str())
        .collect();
    let resolved_feedgens = if !feedgen_uris.is_empty() {
        bulk_resolve::resolve_feedgen_uris_bulk(conn, &feedgen_uris).await?
    } else {
        HashMap::new()
    };

    // Bulk resolve labelers.
    // Extract DIDs from labeler AT URIs (at://did:plc:.../app.bsky.labeler.service/self).
    let labeler_dids: Vec<&str> = labeler_likes
        .iter()
        .filter_map(|&i| {
            let uri = &likes[i].record.subject.uri;
            parakeet_db::utils::at_uri::extract_did(uri)
        })
        .collect();
    let resolved_labelers = if !labeler_dids.is_empty() {
        bulk_resolve::resolve_labeler_dids_bulk(conn, &labeler_dids).await?
    } else {
        HashMap::new()
    };

    // Bulk resolve via_repost_ids.
    // Collect all via URIs with their corresponding subject info.
    let mut via_cid_strings: Vec<String> = Vec::new();
    let mut via_subject_cid_strings: Vec<String> = Vec::new();
    let mut via_repost_indices: Vec<usize> = Vec::new();
    for (i, like) in likes.iter().enumerate() {
        if let Some(ref via) = like.record.via {
            via_cid_strings.push(via.cid.to_string());
            via_subject_cid_strings.push(cid_strings[i].clone());
            via_repost_indices.push(i);
        }
    }
    let via_repost_data: Vec<(&str, &str, &str, &str)> = via_repost_indices
        .iter()
        .enumerate()
        .map(|(via_idx, &like_idx)| {
            let like = &likes[like_idx];
            let via = like.record.via.as_ref().unwrap(); // Safe because we filtered above
            (
                via.uri.as_str(),                          // repost URI
                via_cid_strings[via_idx].as_str(),         // repost CID
                like.record.subject.uri.as_str(),          // subject post URI (what was reposted and liked)
                via_subject_cid_strings[via_idx].as_str(), // subject post CID
            )
        })
        .collect();
    let resolved_via_reposts = if !via_repost_data.is_empty() {
        bulk_resolve::resolve_and_ensure_reposts_bulk(conn, &via_repost_data).await?
    } else {
        HashMap::new()
    };
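
    // `Option::unzip` turns Option<(A, B)> into (Option<A>, Option<B>), which is how the
    // optional via-repost natural key below splits into its two nullable columns:
    //
    //     assert_eq!(Some((7i32, 9i64)).unzip(), (Some(7), Some(9)));
    //     assert_eq!(None::<(i32, i64)>.unzip(), (None, None));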
    // Build type-specific like data for each like
    for like in likes {
        let subject_uri = &like.record.subject.uri;
        if subject_uri.contains("/app.bsky.feed.post/") {
            // Post like
            let &(post_actor_id, post_rkey) = resolved_posts
                .get(subject_uri.as_str())
                .ok_or_else(|| eyre::eyre!("Post not found for URI: {}", subject_uri))?;

            // Resolve via_repost natural keys if present
            let (via_repost_actor_id, via_repost_rkey) = like
                .record
                .via
                .as_ref()
                .and_then(|via| resolved_via_reposts.get(via.uri.as_str()).copied())
                .unzip();

            post_like_data.push(PostLikeCopyData {
                actor_id,
                rkey: like.rkey,
                post_actor_id,
                post_rkey,
                via_repost_actor_id,
                via_repost_rkey,
            });
        } else if subject_uri.contains("/app.bsky.feed.generator/") {
            // Feedgen like - only include if the feedgen exists (no auto-stubbing)
            if let Some(&(feed_actor_id, ref feed_rkey)) =
                resolved_feedgens.get(subject_uri.as_str())
            {
                feedgen_like_data.push(FeedgenLikeCopyData {
                    actor_id,
                    rkey: like.rkey,
                    feed_actor_id,
                    feed_rkey: feed_rkey.clone(),
                });
            } else {
                tracing::debug!(uri = %subject_uri, "Feedgen not found for like, skipping");
            }
        } else if subject_uri.contains("/app.bsky.labeler.service/") {
            // Labeler like - only include if the labeler exists (no auto-stubbing).
            // Extract the DID from the labeler AT URI.
            if let Some(labeler_did) = parakeet_db::utils::at_uri::extract_did(subject_uri) {
                if let Some(&labeler_actor_id) = resolved_labelers.get(labeler_did) {
                    labeler_like_data.push(LabelerLikeCopyData {
                        actor_id,
                        rkey: like.rkey,
                        labeler_actor_id,
                    });
                } else {
                    tracing::debug!(uri = %subject_uri, "Labeler not found for like, skipping");
                }
            } else {
                tracing::warn!(uri = %subject_uri, "Failed to extract DID from labeler URI");
            }
        } else {
            tracing::warn!(uri = %subject_uri, "Unknown subject type in like");
        }
    }

    Ok(ProcessedLikes {
        post_likes: post_like_data,
        feedgen_likes: feedgen_like_data,
        labeler_likes: labeler_like_data,
    })
}

/// Process follows in bulk
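///
/// Subject actors are resolved in two phases: one bulk lookup for existing rows, then one
/// bulk stub insert for whatever is missing. Shape-only sketch (the functions are the ones
/// used below; the DIDs are hypothetical):
///
/// ```rust,ignore
/// let dids = ["did:plc:alice", "did:plc:bob"];
/// let mut resolved = bulk_resolve::resolve_actor_dids_bulk(conn, &dids).await?;
/// let missing: Vec<&str> = dids.iter().filter(|&&did| !resolved.contains_key(did)).copied().collect();
/// resolved.extend(bulk_resolve::create_actor_stubs_bulk(conn, &missing).await?);
/// ```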
async fn process_follows_bulk(
    conn: &deadpool_postgres::Object,
    actor_id: i32,
    follows: Vec<FollowMetadata>,
) -> Result<Vec<FollowCopyData>> {
    let mut follow_data = Vec::with_capacity(follows.len());

    // Collect all subject DIDs for bulk resolution
    let subject_dids: Vec<&str> = follows.iter().map(|f| f.record.subject.as_str()).collect();

    // Resolve all subject actors (creating stubs if needed)
    let mut resolved_actors = bulk_resolve::resolve_actor_dids_bulk(conn, &subject_dids).await?;

    // Create stubs for missing actors
    let missing_dids: Vec<&str> = subject_dids
        .iter()
        .filter(|&&did| !resolved_actors.contains_key(did))
        .copied()
        .collect();
    if !missing_dids.is_empty() {
        let created = bulk_resolve::create_actor_stubs_bulk(conn, &missing_dids).await?;
        resolved_actors.extend(created);
    }

    // Build FollowCopyData for each follow
    for follow in follows {
        let subject_actor_id = *resolved_actors
            .get(follow.record.subject.as_str())
            .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", follow.record.subject))?;
        follow_data.push(FollowCopyData {
            actor_id,
            rkey: follow.rkey, // Already converted to i64
            subject_actor_id,
            created_at: follow.record.created_at,
        });
    }

    Ok(follow_data)
}

/// Process reposts in bulk
async fn process_reposts_bulk(
    conn: &deadpool_postgres::Object,
    actor_id: i32,
    reposts: Vec<RepostMetadata>,
) -> Result<Vec<RepostCopyData>> {
    let mut repost_data = Vec::with_capacity(reposts.len());

    // Collect all subject URIs and CIDs for bulk resolution
    let cid_strings: Vec<String> = reposts
        .iter()
        .map(|repost| repost.record.subject.cid.to_string())
        .collect();
    let uri_cid_pairs: Vec<(&str, &str)> = reposts
        .iter()
        .zip(cid_strings.iter())
        .map(|(repost, cid_str)| (repost.record.subject.uri.as_str(), cid_str.as_str()))
        .collect();

    // Bulk resolve all subject posts
    let resolved_posts = bulk_resolve::resolve_and_ensure_posts_bulk(conn, &uri_cid_pairs).await?;

    // Bulk resolve via_repost_ids.
    // Collect all via URIs with their corresponding subject info.
    let mut via_cid_strings: Vec<String> = Vec::new();
    let mut via_subject_cid_strings: Vec<String> = Vec::new();
    let mut via_repost_indices: Vec<usize> = Vec::new();
    for (i, repost) in reposts.iter().enumerate() {
        if let Some(ref via) = repost.record.via {
            via_cid_strings.push(via.cid.to_string());
            via_subject_cid_strings.push(cid_strings[i].clone());
            via_repost_indices.push(i);
        }
    }
    let via_repost_data: Vec<(&str, &str, &str, &str)> = via_repost_indices
        .iter()
        .enumerate()
        .map(|(via_idx, &repost_idx)| {
            let repost = &reposts[repost_idx];
            let via = repost.record.via.as_ref().unwrap(); // Safe because we filtered above
            (
                via.uri.as_str(),                          // via repost URI
                via_cid_strings[via_idx].as_str(),         // via repost CID
                repost.record.subject.uri.as_str(),        // subject post URI (what was reposted)
                via_subject_cid_strings[via_idx].as_str(), // subject post CID
            )
        })
        .collect();
    let resolved_via_reposts = if !via_repost_data.is_empty() {
        bulk_resolve::resolve_and_ensure_reposts_bulk(conn, &via_repost_data).await?
    } else {
        HashMap::new()
    };

    // Build RepostCopyData for each repost
    for repost in reposts {
        let &(post_actor_id, post_rkey) = resolved_posts
            .get(repost.record.subject.uri.as_str())
            .ok_or_else(|| eyre::eyre!("Post not found for URI: {}", repost.record.subject.uri))?;

        // Get the CID digest (real CID for reposts)
        let cid_bytes = repost.cid.to_bytes();
        let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes)
            .ok_or_else(|| eyre::eyre!("Invalid CID for repost"))?;

        // Resolve via_repost natural keys if present
        let (via_repost_actor_id, via_repost_rkey) = repost
            .record
            .via
            .as_ref()
            .and_then(|via| resolved_via_reposts.get(via.uri.as_str()).copied())
            .unzip();

        repost_data.push(RepostCopyData {
            actor_id,
            rkey: repost.rkey, // Already converted to i64
            post_actor_id,
            post_rkey,
            cid: cid_digest.to_vec(),
            created_at: repost.record.created_at,
            via_repost_actor_id,
            via_repost_rkey,
        });
    }

    Ok(repost_data)
}

/// Process blocks in bulk
async fn process_blocks_bulk(
    conn: &deadpool_postgres::Object,
    actor_id: i32,
    blocks: Vec<BlockMetadata>,
) -> Result<Vec<BlockCopyData>> {
    let mut block_data = Vec::with_capacity(blocks.len());

    // Collect all subject DIDs for bulk resolution
    let subject_dids: Vec<&str> = blocks.iter().map(|b| b.record.subject.as_str()).collect();

    // Resolve all subject actors
    let mut resolved_actors = bulk_resolve::resolve_actor_dids_bulk(conn, &subject_dids).await?;

    // Create stubs for missing actors
    let missing_dids: Vec<&str> = subject_dids
        .iter()
        .filter(|&&did| !resolved_actors.contains_key(did))
        .copied()
        .collect();
    if !missing_dids.is_empty() {
        let created = bulk_resolve::create_actor_stubs_bulk(conn, &missing_dids).await?;
        resolved_actors.extend(created);
    }

    // Build BlockCopyData for each block
    for block in blocks {
        let subject_actor_id = *resolved_actors
            .get(block.record.subject.as_str())
            .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", block.record.subject))?;
        block_data.push(BlockCopyData {
            actor_id,
            rkey: block.rkey, // Already converted to i64
            subject_actor_id,
            created_at: block.record.created_at,
        });
    }

    Ok(block_data)
}

/// Process posts in bulk
///
/// Converts PostMetadata into PostCopyData by:
/// 1. Resolving all FK references (parent/root posts, mentions) - stubs already exist
/// 2. Extracting and processing embeds (images, video, external, record)
/// 3. Extracting facets (links, mentions, tags)
/// 4. Compressing content and tokenizing for search
///
/// All referenced entities (posts, actors) have stubs created by resolve_all_references_bulk.
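///
/// # Pipeline sketch
///
/// Shape-only illustration of the five steps (every name below appears in this function;
/// the control flow is condensed):
///
/// ```rust,ignore
/// // Steps 1-2: gather reply/quote URIs, bulk-resolve them to natural keys
/// let uri_to_post_id = bulk_resolve::resolve_post_uris_bulk(conn, &uris).await?; // uri -> (actor_id, rkey)
/// // Steps 3-4: gather mention DIDs from facets, bulk-resolve them to actor ids
/// let did_to_actor_id = bulk_resolve::resolve_actor_dids_bulk(conn, &dids).await?;
/// // Step 5: per-post CPU work only - compress, tokenize, build composite columns
/// ```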
async fn process_posts_bulk(
    conn: &deadpool_postgres::Object,
    _repo: &str,
    actor_id: i32,
    posts: Vec<PostMetadata>,
) -> Result<Vec<PostCopyData>> {
    use crate::search::SearchTokenizer;
    use crate::types::records::EmbedOuter;
    use crate::utils::{extract_mentions_and_tags, merge_tags};
    use parakeet_db::compression::PostContentCodec;

    let mut post_data = Vec::with_capacity(posts.len());

    // Step 1: Collect all parent/root/quoted URIs for bulk lookup
    let mut post_uris_to_lookup: Vec<String> = Vec::new();
    for post in &posts {
        // Collect parent/root from replies
        if let Some(reply) = &post.record.reply {
            post_uris_to_lookup.push(reply.parent.uri.to_string());
            // Only add the root if different from the parent
            if reply.root.uri != reply.parent.uri {
                post_uris_to_lookup.push(reply.root.uri.to_string());
            }
        }
        // Collect quoted post URIs from embeds.
        // Only collect post URIs - embeds can also be feedgens, profiles, lists, etc.
        if let Some(ref embed) = post.record.embed {
            if let Some(bsky_embed) = embed.as_bsky() {
                match bsky_embed {
                    AppBskyEmbed::Record(r) => {
                        if r.record.uri.as_str().contains("/app.bsky.feed.post/") {
                            post_uris_to_lookup.push(r.record.uri.to_string());
                        }
                    }
                    AppBskyEmbed::RecordWithMedia(rwm) => {
                        if rwm.record.uri.as_str().contains("/app.bsky.feed.post/") {
                            post_uris_to_lookup.push(rwm.record.uri.to_string());
                        }
                    }
                    _ => {}
                }
            }
        }
    }

    // Step 2: Bulk resolve post URIs to post_ids (stubs already exist)
    let uri_to_post_id = if !post_uris_to_lookup.is_empty() {
        let uris: Vec<&str> = post_uris_to_lookup.iter().map(|s| s.as_str()).collect();
        bulk_resolve::resolve_post_uris_bulk(conn, &uris).await?
    } else {
        std::collections::HashMap::new()
    };

    // Step 3: Extract mentions and tags from all posts in one pass.
    // Store tags indexed by post position for later use.
    let mut mention_dids: Vec<String> = Vec::new();
    let mut tags_by_post: Vec<Vec<String>> = Vec::with_capacity(posts.len());
    for post in &posts {
        if let Some(facets) = &post.record.facets {
            let (mentions, tags) = extract_mentions_and_tags(facets.as_slice());
            mention_dids.extend(mentions);
            tags_by_post.push(tags);
        } else {
            tags_by_post.push(Vec::new());
        }
    }

    // Step 4: Bulk resolve mention DIDs to actor_ids (stubs already exist)
    let did_to_actor_id = if !mention_dids.is_empty() {
        let dids: Vec<&str> = mention_dids.iter().map(|s| s.as_str()).collect();
        bulk_resolve::resolve_actor_dids_bulk(conn, &dids).await?
    } else {
        std::collections::HashMap::new()
    };
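
    // Invariant: Step 3 pushed exactly one entry per post, so Step 5 below can index
    // `tags_by_post` by position.
    debug_assert_eq!(tags_by_post.len(), posts.len());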
    // Step 5: Process each post
    let codec = PostContentCodec::new();
    let tokenizer = SearchTokenizer::new();

    for (post_idx, post_meta) in posts.into_iter().enumerate() {
        let PostMetadata { rkey, cid, record } = post_meta;

        // Extract the CID digest
        let cid_bytes = cid.to_bytes();
        let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes)
            .expect("Valid CID should have digest");

        // Resolve parent and root post natural keys
        let (parent_post_actor_id, parent_post_rkey, root_post_actor_id, root_post_rkey) =
            if let Some(reply) = &record.reply {
                let parent_nk = uri_to_post_id.get(reply.parent.uri.as_str()).copied();
                let root_nk = if reply.root.uri != reply.parent.uri {
                    uri_to_post_id.get(reply.root.uri.as_str()).copied()
                } else {
                    parent_nk // Root is the same as the parent
                };
                let (parent_actor_id, parent_rkey) = parent_nk.unzip();
                let (root_actor_id, root_rkey) = root_nk.unzip();
                (parent_actor_id, parent_rkey, root_actor_id, root_rkey)
            } else {
                (None, None, None, None)
            };

        // Use the pre-extracted tags from Step 3
        let tags_from_facets = tags_by_post[post_idx].clone();
        // Merge tags from facets and record.tags
        let tags = merge_tags(Some(tags_from_facets), record.tags.clone());

        // Compress content
        let content_compressed = if record.text.is_empty() {
            None
        } else {
            Some(codec.compress(&record.text)?)
        };

        // Tokenize for search
        let tokens = tokenizer.tokenize(&record.text);

        // Extract embed type and subtype (inline lexicon_to_embed_type mapping)
        let lexicon_to_embed_type = |lexicon: &str| -> String {
            match lexicon {
                "app.bsky.embed.images" => "images",
                "app.bsky.embed.video" => "video",
                "app.bsky.embed.external" => "external",
                "app.bsky.embed.record" => "record",
                "app.bsky.embed.recordWithMedia" => "record_with_media",
                _ => lexicon,
            }
            .to_string()
        };
        let (embed_type, embed_subtype) = if let Some(ref embed_outer) = record.embed {
            let type_str = EmbedOuter::as_str(embed_outer);
            let embed_type = Some(lexicon_to_embed_type(type_str));
            let embed_subtype = EmbedOuter::subtype(embed_outer).map(lexicon_to_embed_type);
            (embed_type, embed_subtype)
        } else {
            (None, None)
        };

        // For backfills, violates_threadgate is always false (we don't check gates for
        // historical data).
        let violates_threadgate = false;
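
        // Embeds and facets are flattened into fixed-width composite columns
        // (image_1..image_4, facet_1..facet_8) so each post stays a single COPY row with
        // no child tables. The four-image cap matches the app.bsky.embed.images lexicon;
        // the eight-facet cap appears to be this schema's own limit.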
None, None, None) } MediaEmbed::External(external) => { (composite_builders::build_ext_embed(&external), None, None, None, None, None) } }; // Process record part let embedded_post_nk = uri_to_post_id.get(rwm.record.uri.as_str()).copied(); let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip(); (ext, vid, i1, i2, i3, i4, embedded_actor_id, embedded_rkey, Some(false)) } } } else { (None, None, None, None, None, None, None, None, None) }; // Build facet composites and mention array using composite_builders let (facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8, mentions) = if let Some(ref facets) = record.facets { let (f1, f2, f3, f4, f5, f6, f7, f8) = composite_builders::build_facet_embeds(facets, &did_to_actor_id); let m = composite_builders::extract_mention_actor_ids(facets, &did_to_actor_id); (f1, f2, f3, f4, f5, f6, f7, f8, m) } else { (None, None, None, None, None, None, None, None, None) }; post_data.push(PostCopyData { actor_id, rkey, cid: cid_digest.to_vec(), content_compressed, langs: record.langs.unwrap_or_default(), tags, parent_post_actor_id, parent_post_rkey, root_post_actor_id, root_post_rkey, embed_type, embed_subtype, violates_threadgate, tokens, ext_embed, video_embed, image_1, image_2, image_3, image_4, embedded_post_actor_id, embedded_post_rkey, record_detached, facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8, mentions, }); } Ok(post_data) }