// Rust AppView - highly experimental!
1//! Bulk record processing for COPY operations 2//! 3//! This module processes UnresolvedBulk events into ResolvedBulk events by: 4//! 1. Grouping records by type (likes, follows, posts, etc.) 5//! 2. Bulk resolving all foreign keys in single queries 6//! 3. Converting to COPY-ready data structures 7//! 8//! For records that can't be bulk processed (profiles, gates, etc.), 9//! they're converted to individual DatabaseOperations. 10 11use super::bulk_types::{ 12 BlockCopyData, BulkOperations, FeedgenLikeCopyData, FollowCopyData, LabelerLikeCopyData, 13 PostCopyData, PostLikeCopyData, RepostCopyData, 14 UnresolvedRecord, 15}; 16use super::EventSource; 17use crate::db::{bulk_resolve, composite_builders}; 18use crate::relay::types::RecordTypes; 19use crate::types::records::{AppBskyEmbed, MediaEmbed}; 20use crate::Result; 21use std::collections::HashMap; 22 23/// Metadata for a like record extracted from UnresolvedRecord 24struct LikeMetadata { 25 rkey: i64, 26 record: crate::types::records::AppBskyFeedLike, 27} 28 29/// Processed like data separated by type 30struct ProcessedLikes { 31 post_likes: Vec<PostLikeCopyData>, 32 feedgen_likes: Vec<FeedgenLikeCopyData>, 33 labeler_likes: Vec<LabelerLikeCopyData>, 34} 35 36/// Metadata for a follow record extracted from UnresolvedRecord 37struct FollowMetadata { 38 rkey: i64, 39 record: crate::types::records::AppBskyGraphFollow, 40} 41 42/// Metadata for a repost record extracted from UnresolvedRecord 43struct RepostMetadata { 44 rkey: i64, 45 cid: ipld_core::cid::Cid, 46 record: crate::types::records::AppBskyFeedRepost, 47} 48 49/// Metadata for a block record extracted from UnresolvedRecord 50struct BlockMetadata { 51 rkey: i64, 52 record: crate::types::records::AppBskyGraphBlock, 53} 54 55/// Metadata for a post record extracted from UnresolvedRecord 56struct PostMetadata { 57 rkey: i64, 58 cid: ipld_core::cid::Cid, 59 record: crate::types::records::AppBskyFeedPost, 60} 61 62/// Process a batch of unresolved records into 
bulk operations 63/// 64/// This function: 65/// 1. Groups records by type 66/// 2. Resolves all foreign keys in bulk (single query per FK type) 67/// 3. Converts to COPY-ready structures 68/// 4. Returns BulkOperations + individual operations 69pub async fn process_bulk_records( 70 conn: &deadpool_postgres::Object, 71 repo: &str, 72 actor_id: i32, 73 records: Vec<UnresolvedRecord>, 74 _source: EventSource, 75) -> Result<BulkOperations> { 76 let start = std::time::Instant::now(); 77 let total_records = records.len(); 78 79 tracing::info!( 80 repo = %repo, 81 actor_id = actor_id, 82 records = total_records, 83 "Starting bulk record processing" 84 ); 85 86 let mut bulk_ops = BulkOperations::new(); 87 88 // Group records by type - extracting only metadata we need 89 let mut likes = Vec::new(); 90 let mut follows = Vec::new(); 91 let mut reposts = Vec::new(); 92 let mut blocks = Vec::new(); 93 let mut posts = Vec::new(); 94 let mut individual_records = Vec::new(); 95 96 for unresolved in records { 97 // Extract metadata we need before moving the record 98 let UnresolvedRecord { at_uri, rkey, cid, record } = unresolved; 99 let record = *record; // Unbox 100 101 match record { 102 RecordTypes::AppBskyFeedLike(like_record) => { 103 // Validate TID timestamp (matches behavior in individual handlers) 104 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) { 105 tracing::warn!(rkey = %rkey, error = %e, "Invalid like TID timestamp, skipping"); 106 continue; 107 } 108 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) { 109 Ok(val) => val, 110 Err(e) => { 111 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert like TID to i64, skipping"); 112 continue; 113 } 114 }; 115 likes.push(LikeMetadata { 116 rkey: rkey_i64, 117 record: like_record, 118 }); 119 } 120 RecordTypes::AppBskyGraphFollow(follow_record) => { 121 // Validate TID timestamp 122 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) { 123 tracing::warn!(rkey = 
%rkey, error = %e, "Invalid follow TID timestamp, skipping"); 124 continue; 125 } 126 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) { 127 Ok(val) => val, 128 Err(e) => { 129 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert follow TID to i64, skipping"); 130 continue; 131 } 132 }; 133 follows.push(FollowMetadata { 134 rkey: rkey_i64, 135 record: follow_record, 136 }); 137 } 138 RecordTypes::AppBskyFeedRepost(repost_record) => { 139 // Validate TID timestamp 140 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) { 141 tracing::warn!(rkey = %rkey, error = %e, "Invalid repost TID timestamp, skipping"); 142 continue; 143 } 144 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) { 145 Ok(val) => val, 146 Err(e) => { 147 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert repost TID to i64, skipping"); 148 continue; 149 } 150 }; 151 reposts.push(RepostMetadata { 152 rkey: rkey_i64, 153 cid, 154 record: repost_record, 155 }); 156 } 157 RecordTypes::AppBskyGraphBlock(block_record) => { 158 // Validate TID timestamp 159 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) { 160 tracing::warn!(rkey = %rkey, error = %e, "Invalid block TID timestamp, skipping"); 161 continue; 162 } 163 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) { 164 Ok(val) => val, 165 Err(e) => { 166 tracing::warn!(rkey = %rkey, error = ?e, "Failed to convert block TID to i64, skipping"); 167 continue; 168 } 169 }; 170 blocks.push(BlockMetadata { 171 rkey: rkey_i64, 172 record: block_record, 173 }); 174 } 175 RecordTypes::AppBskyFeedPost(post_record) => { 176 // Validate TID timestamp for posts 177 if let Err(e) = crate::database_writer::validate_tid_timestamp(&rkey) { 178 tracing::warn!(rkey = %rkey, error = %e, "Invalid post TID timestamp, skipping"); 179 continue; 180 } 181 let rkey_i64 = match parakeet_db::models::tid_to_i64(&rkey) { 182 Ok(val) => val, 183 Err(e) => { 184 tracing::warn!(rkey = %rkey, error = ?e, 
"Failed to convert post TID to i64, skipping"); 185 continue; 186 } 187 }; 188 posts.push(PostMetadata { 189 rkey: rkey_i64, 190 cid, 191 record: post_record, 192 }); 193 } 194 other => { 195 // Everything else (profiles, gates, lists, etc.) goes through individual path 196 // These may have non-TID rkeys (e.g., profiles use "self") 197 individual_records.push(UnresolvedRecord { 198 at_uri, 199 rkey, 200 cid, 201 record: Box::new(other), 202 }); 203 } 204 } 205 } 206 207 tracing::debug!( 208 likes = likes.len(), 209 follows = follows.len(), 210 reposts = reposts.len(), 211 blocks = blocks.len(), 212 posts = posts.len(), 213 individual = individual_records.len(), 214 "Grouped records by type" 215 ); 216 217 // Process likes in bulk 218 if !likes.is_empty() { 219 let processed_likes = process_likes_bulk(conn, actor_id, likes).await?; 220 bulk_ops.post_likes = processed_likes.post_likes; 221 bulk_ops.feedgen_likes = processed_likes.feedgen_likes; 222 bulk_ops.labeler_likes = processed_likes.labeler_likes; 223 } 224 225 // Process follows in bulk 226 if !follows.is_empty() { 227 let follow_data = process_follows_bulk(conn, actor_id, follows).await?; 228 bulk_ops.follows = follow_data; 229 } 230 231 // Process reposts in bulk 232 if !reposts.is_empty() { 233 let repost_data = process_reposts_bulk(conn, actor_id, reposts).await?; 234 bulk_ops.reposts = repost_data; 235 } 236 237 // Process blocks in bulk 238 if !blocks.is_empty() { 239 let block_data = process_blocks_bulk(conn, actor_id, blocks).await?; 240 bulk_ops.blocks = block_data; 241 } 242 243 // Process posts in bulk 244 if !posts.is_empty() { 245 let post_data = process_posts_bulk(conn, repo, actor_id, posts).await?; 246 bulk_ops.posts = post_data; 247 } 248 249 // Convert individual records to DatabaseOperations 250 // These are records that can't be bulk-processed (posts, profiles, gates, lists, etc) 251 // We need to properly resolve all referenced actors to create stubs and maintain referential integrity 252 
for unresolved in individual_records { 253 let UnresolvedRecord { at_uri, rkey, cid, record } = unresolved; 254 let record = *record; // Unbox 255 256 // Extract all referenced actors/posts from the record 257 let refs = super::extract_references(&record); 258 259 // Resolve subject actor if present (e.g., for follows, blocks) 260 let subject_actor_id = if let Some(subject_did) = refs.subject_did { 261 let (actor_id, _, _) = crate::db::operations::feed::get_actor_id(conn, &subject_did).await?; 262 Some(actor_id) 263 } else { 264 None 265 }; 266 267 // Resolve ALL additional DIDs (feedgen service DID, threadgate/listblock list owners) 268 // These actors need to exist but don't have direct FK relationships in the record table 269 // We ensure they exist here to prevent FK violations in related operations 270 for additional_did in &refs.additional_dids { 271 let _ = crate::db::operations::feed::get_actor_id(conn, additional_did).await?; 272 } 273 274 // For FeedGenerator records, the first (and only) additional DID is the service actor 275 // For other record types with additional_dids, we just ensure they exist above 276 let service_actor_id = if let Some(first_did) = refs.additional_dids.first() { 277 match crate::db::operations::feed::get_actor_id(conn, first_did).await { 278 Ok((actor_id, _, _)) => Some(actor_id), 279 Err(e) => { 280 tracing::warn!( 281 at_uri = %at_uri, 282 service_did = %first_did, 283 error = ?e, 284 "Failed to resolve service actor ID - skipping record" 285 ); 286 None 287 } 288 } 289 } else { 290 None 291 }; 292 293 // For posts, resolve parent/root/quoted authors and mentioned actors 294 let (parent_author_actor_id, root_author_actor_id, quoted_author_actor_id, mentioned_actor_ids) = 295 if let RecordTypes::AppBskyFeedPost(ref post) = record { 296 // Resolve parent author 297 let parent_author = if let Some(ref reply) = post.reply { 298 let parent_did = parakeet_db::utils::at_uri::extract_did(&reply.parent.uri); 299 if let Some(did) = 
parent_did { 300 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?; 301 Some(aid) 302 } else { 303 None 304 } 305 } else { 306 None 307 }; 308 309 // Resolve root author (if different from parent) 310 let root_author = if let Some(ref reply) = post.reply { 311 if reply.root.uri != reply.parent.uri { 312 let root_did = parakeet_db::utils::at_uri::extract_did(&reply.root.uri); 313 if let Some(did) = root_did { 314 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?; 315 Some(aid) 316 } else { 317 None 318 } 319 } else { 320 parent_author 321 } 322 } else { 323 None 324 }; 325 326 // Resolve quoted post author (from embed) 327 let quoted_author = if let Some(ref embed) = post.embed { 328 if let Some(bsky_embed) = embed.as_bsky() { 329 let quote_uri = match bsky_embed { 330 crate::types::records::AppBskyEmbed::Record(r) => Some(&r.record.uri), 331 crate::types::records::AppBskyEmbed::RecordWithMedia(rwm) => Some(&rwm.record.uri), 332 _ => None, 333 }; 334 335 if let Some(uri) = quote_uri { 336 let quoted_did = parakeet_db::utils::at_uri::extract_did(uri); 337 if let Some(did) = quoted_did { 338 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, did).await?; 339 Some(aid) 340 } else { 341 None 342 } 343 } else { 344 None 345 } 346 } else { 347 None 348 } 349 } else { 350 None 351 }; 352 353 // Resolve mentioned actors from facets 354 let mut mentions = Vec::new(); 355 if let Some(ref facets) = post.facets { 356 use jacquard_api::app_bsky::richtext::facet::FacetFeaturesItem; 357 for facet in facets { 358 for feature in &facet.features { 359 if let FacetFeaturesItem::Mention(mention) = feature { 360 let (aid, _, _) = crate::db::operations::feed::get_actor_id(conn, mention.did.as_ref()).await?; 361 mentions.push(aid); 362 } 363 } 364 } 365 } 366 367 (parent_author, root_author, quoted_author, mentions) 368 } else { 369 (None, None, None, Vec::new()) 370 }; 371 372 // Resolve via_repost natural key if 
present (for likes and reposts that came via a repost) 373 let via_repost_key = if let (Some(via_uri), Some(via_cid)) = (&refs.via_uri, &refs.via_cid) { 374 // Extract the DID and rkey from the via URI 375 if let Some((via_did, via_rkey, _collection)) = parakeet_db::utils::at_uri::parse_at_uri(via_uri) { 376 // Ensure the via repost actor exists 377 let (via_actor_id, _, _) = crate::db::operations::feed::get_actor_id(conn, via_did).await?; 378 379 // Get or create the repost stub using the public re-export 380 let (repost_key, _was_created) = crate::db::operations::feed::get_repost_id( 381 conn, 382 via_actor_id, 383 via_rkey, 384 via_cid, 385 ).await?; 386 387 Some(repost_key) 388 } else { 389 None 390 } 391 } else { 392 None 393 }; 394 395 let resolved_actor_ids = super::operations::ResolvedActorIds { 396 subject_actor_id, 397 service_actor_id, 398 parent_author_actor_id, 399 root_author_actor_id, 400 quoted_author_actor_id, 401 mentioned_actor_ids, 402 via_repost_key, 403 }; 404 405 let processed = super::operations::process_record_to_operations( 406 repo, 407 actor_id, 408 resolved_actor_ids, 409 cid, 410 record, 411 at_uri, 412 rkey, 413 _source, 414 ); 415 416 // Add all operations from this record 417 bulk_ops.individual_ops.extend(processed.operations); 418 } 419 420 let elapsed = start.elapsed(); 421 tracing::info!( 422 repo = %repo, 423 total_records = total_records, 424 bulk_ops = bulk_ops.total_count(), 425 duration_ms = elapsed.as_millis(), 426 "Bulk record processing completed" 427 ); 428 429 Ok(bulk_ops) 430} 431 432/// Process likes in bulk, returning three separate vectors by type 433async fn process_likes_bulk( 434 conn: &deadpool_postgres::Object, 435 actor_id: i32, 436 likes: Vec<LikeMetadata>, 437) -> Result<ProcessedLikes> { 438 let mut post_like_data = Vec::new(); 439 let mut feedgen_like_data = Vec::new(); 440 let mut labeler_like_data = Vec::new(); 441 442 // Collect all subject URIs and CIDs for bulk resolution 443 // We need to own the 
CID strings to avoid lifetime issues 444 let cid_strings: Vec<String> = likes.iter() 445 .map(|like| like.record.subject.cid.to_string()) 446 .collect(); 447 448 // Separate likes by subject type 449 let mut post_likes = Vec::new(); 450 let mut feedgen_likes = Vec::new(); 451 let mut labeler_likes = Vec::new(); 452 453 for (i, like) in likes.iter().enumerate() { 454 let subject_uri = &like.record.subject.uri; 455 if subject_uri.contains("/app.bsky.feed.post/") { 456 post_likes.push(i); 457 } else if subject_uri.contains("/app.bsky.feed.generator/") { 458 feedgen_likes.push(i); 459 } else if subject_uri.contains("/app.bsky.labeler.service/") { 460 labeler_likes.push(i); 461 } else { 462 tracing::warn!(uri = %subject_uri, "Unknown subject type in like"); 463 } 464 } 465 466 // Bulk resolve posts 467 let post_uri_cid_pairs: Vec<(&str, &str)> = post_likes.iter() 468 .map(|&i| (likes[i].record.subject.uri.as_str(), cid_strings[i].as_str())) 469 .collect(); 470 let resolved_posts = if !post_uri_cid_pairs.is_empty() { 471 bulk_resolve::resolve_and_ensure_posts_bulk(conn, &post_uri_cid_pairs).await? 472 } else { 473 HashMap::new() 474 }; 475 476 // Bulk resolve feedgens 477 let feedgen_uris: Vec<&str> = feedgen_likes.iter() 478 .map(|&i| likes[i].record.subject.uri.as_str()) 479 .collect(); 480 let resolved_feedgens = if !feedgen_uris.is_empty() { 481 bulk_resolve::resolve_feedgen_uris_bulk(conn, &feedgen_uris).await? 482 } else { 483 HashMap::new() 484 }; 485 486 // Bulk resolve labelers 487 // Extract DIDs from labeler AT URIs (at://did:plc:.../app.bsky.labeler.service/self) 488 let labeler_dids: Vec<&str> = labeler_likes.iter() 489 .filter_map(|&i| { 490 let uri = &likes[i].record.subject.uri; 491 parakeet_db::utils::at_uri::extract_did(uri) 492 }) 493 .collect(); 494 let resolved_labelers = if !labeler_dids.is_empty() { 495 bulk_resolve::resolve_labeler_dids_bulk(conn, &labeler_dids).await? 
496 } else { 497 HashMap::new() 498 }; 499 500 // Bulk resolve via_repost_ids 501 // Collect all via URIs with their corresponding subject info 502 let mut via_cid_strings: Vec<String> = Vec::new(); 503 let mut via_subject_cid_strings: Vec<String> = Vec::new(); 504 let mut via_repost_indices: Vec<usize> = Vec::new(); 505 506 for (i, like) in likes.iter().enumerate() { 507 if let Some(ref via) = like.record.via { 508 via_cid_strings.push(via.cid.to_string()); 509 via_subject_cid_strings.push(cid_strings[i].clone()); 510 via_repost_indices.push(i); 511 } 512 } 513 514 let via_repost_data: Vec<(&str, &str, &str, &str)> = via_repost_indices.iter() 515 .enumerate() 516 .map(|(via_idx, &like_idx)| { 517 let like = &likes[like_idx]; 518 let via = like.record.via.as_ref().unwrap(); // Safe because we filtered above 519 ( 520 via.uri.as_str(), // repost URI 521 via_cid_strings[via_idx].as_str(), // repost CID 522 like.record.subject.uri.as_str(), // subject post URI (what was reposted and liked) 523 via_subject_cid_strings[via_idx].as_str(), // subject post CID 524 ) 525 }) 526 .collect(); 527 528 let resolved_via_reposts = if !via_repost_data.is_empty() { 529 bulk_resolve::resolve_and_ensure_reposts_bulk(conn, &via_repost_data).await? 
530 } else { 531 HashMap::new() 532 }; 533 534 // Build type-specific like data for each like 535 for like in likes { 536 let subject_uri = &like.record.subject.uri; 537 538 if subject_uri.contains("/app.bsky.feed.post/") { 539 // Post like 540 let &(post_actor_id, post_rkey) = resolved_posts.get(subject_uri.as_str()) 541 .ok_or_else(|| eyre::eyre!("Post not found for URI: {}", subject_uri))?; 542 543 // Resolve via_repost natural keys if present 544 let (via_repost_actor_id, via_repost_rkey) = like.record.via.as_ref() 545 .and_then(|via| resolved_via_reposts.get(via.uri.as_str()).copied()) 546 .unzip(); 547 548 post_like_data.push(PostLikeCopyData { 549 actor_id, 550 rkey: like.rkey, 551 post_actor_id, 552 post_rkey, 553 via_repost_actor_id, 554 via_repost_rkey, 555 }); 556 } else if subject_uri.contains("/app.bsky.feed.generator/") { 557 // Feedgen like - only include if feedgen exists (no auto-stubbing) 558 if let Some(&(feed_actor_id, ref feed_rkey)) = resolved_feedgens.get(subject_uri.as_str()) { 559 feedgen_like_data.push(FeedgenLikeCopyData { 560 actor_id, 561 rkey: like.rkey, 562 feed_actor_id, 563 feed_rkey: feed_rkey.clone(), 564 }); 565 } else { 566 tracing::debug!(uri = %subject_uri, "Feedgen not found for like, skipping"); 567 } 568 } else if subject_uri.contains("/app.bsky.labeler.service/") { 569 // Labeler like - only include if labeler exists (no auto-stubbing) 570 // Extract DID from labeler AT URI 571 if let Some(labeler_did) = parakeet_db::utils::at_uri::extract_did(subject_uri) { 572 if let Some(&labeler_actor_id) = resolved_labelers.get(labeler_did) { 573 labeler_like_data.push(LabelerLikeCopyData { 574 actor_id, 575 rkey: like.rkey, 576 labeler_actor_id, 577 }); 578 } else { 579 tracing::debug!(uri = %subject_uri, "Labeler not found for like, skipping"); 580 } 581 } else { 582 tracing::warn!(uri = %subject_uri, "Failed to extract DID from labeler URI"); 583 } 584 } else { 585 tracing::warn!(uri = %subject_uri, "Unknown subject type in like"); 
586 } 587 } 588 589 Ok(ProcessedLikes { 590 post_likes: post_like_data, 591 feedgen_likes: feedgen_like_data, 592 labeler_likes: labeler_like_data, 593 }) 594} 595 596/// Process follows in bulk 597async fn process_follows_bulk( 598 conn: &deadpool_postgres::Object, 599 actor_id: i32, 600 follows: Vec<FollowMetadata>, 601) -> Result<Vec<FollowCopyData>> { 602 let mut follow_data = Vec::with_capacity(follows.len()); 603 604 // Collect all subject DIDs for bulk resolution 605 let subject_dids: Vec<&str> = follows.iter().map(|f| f.record.subject.as_str()).collect(); 606 607 // Resolve all subject actors (creating stubs if needed) 608 let mut resolved_actors = bulk_resolve::resolve_actor_dids_bulk(conn, &subject_dids).await?; 609 610 // Create stubs for missing actors 611 let missing_dids: Vec<&str> = subject_dids 612 .iter() 613 .filter(|&&did| !resolved_actors.contains_key(did)) 614 .copied() 615 .collect(); 616 617 if !missing_dids.is_empty() { 618 let created = bulk_resolve::create_actor_stubs_bulk(conn, &missing_dids).await?; 619 resolved_actors.extend(created); 620 } 621 622 // Build FollowCopyData for each follow 623 for follow in follows { 624 let subject_actor_id = *resolved_actors.get(follow.record.subject.as_str()) 625 .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", follow.record.subject))?; 626 627 follow_data.push(FollowCopyData { 628 actor_id, 629 rkey: follow.rkey, // Already converted to i64 630 subject_actor_id, 631 created_at: follow.record.created_at, 632 }); 633 } 634 635 Ok(follow_data) 636} 637 638/// Process reposts in bulk 639async fn process_reposts_bulk( 640 conn: &deadpool_postgres::Object, 641 actor_id: i32, 642 reposts: Vec<RepostMetadata>, 643) -> Result<Vec<RepostCopyData>> { 644 let mut repost_data = Vec::with_capacity(reposts.len()); 645 646 // Collect all subject URIs and CIDs for bulk resolution 647 let cid_strings: Vec<String> = reposts.iter() 648 .map(|repost| repost.record.subject.cid.to_string()) 649 .collect(); 650 651 
let uri_cid_pairs: Vec<(&str, &str)> = reposts.iter() 652 .zip(cid_strings.iter()) 653 .map(|(repost, cid_str)| (repost.record.subject.uri.as_str(), cid_str.as_str())) 654 .collect(); 655 656 // Bulk resolve all subject posts 657 let resolved_posts = bulk_resolve::resolve_and_ensure_posts_bulk(conn, &uri_cid_pairs).await?; 658 659 // Bulk resolve via_repost_ids 660 // Collect all via URIs with their corresponding subject info 661 let mut via_cid_strings: Vec<String> = Vec::new(); 662 let mut via_subject_cid_strings: Vec<String> = Vec::new(); 663 let mut via_repost_indices: Vec<usize> = Vec::new(); 664 665 for (i, repost) in reposts.iter().enumerate() { 666 if let Some(ref via) = repost.record.via { 667 via_cid_strings.push(via.cid.to_string()); 668 via_subject_cid_strings.push(cid_strings[i].clone()); 669 via_repost_indices.push(i); 670 } 671 } 672 673 let via_repost_data: Vec<(&str, &str, &str, &str)> = via_repost_indices.iter() 674 .enumerate() 675 .map(|(via_idx, &repost_idx)| { 676 let repost = &reposts[repost_idx]; 677 let via = repost.record.via.as_ref().unwrap(); // Safe because we filtered above 678 ( 679 via.uri.as_str(), // via repost URI 680 via_cid_strings[via_idx].as_str(), // via repost CID 681 repost.record.subject.uri.as_str(), // subject post URI (what was reposted) 682 via_subject_cid_strings[via_idx].as_str(), // subject post CID 683 ) 684 }) 685 .collect(); 686 687 let resolved_via_reposts = if !via_repost_data.is_empty() { 688 bulk_resolve::resolve_and_ensure_reposts_bulk(conn, &via_repost_data).await? 
689 } else { 690 HashMap::new() 691 }; 692 693 // Build RepostCopyData for each repost 694 for repost in reposts { 695 let &(post_actor_id, post_rkey) = resolved_posts.get(repost.record.subject.uri.as_str()) 696 .ok_or_else(|| eyre::eyre!("Post not found for URI: {}", repost.record.subject.uri))?; 697 698 // Get CID digest (real CID for reposts) 699 let cid_bytes = repost.cid.to_bytes(); 700 let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes) 701 .ok_or_else(|| eyre::eyre!("Invalid CID for repost"))?; 702 703 // Resolve via_repost natural keys if present 704 let (via_repost_actor_id, via_repost_rkey) = repost.record.via.as_ref() 705 .and_then(|via| resolved_via_reposts.get(via.uri.as_str()).copied()) 706 .unzip(); 707 708 repost_data.push(RepostCopyData { 709 actor_id, 710 rkey: repost.rkey, // Already converted to i64 711 post_actor_id, 712 post_rkey, 713 cid: cid_digest.to_vec(), 714 created_at: repost.record.created_at, 715 via_repost_actor_id, 716 via_repost_rkey, 717 }); 718 } 719 720 Ok(repost_data) 721} 722 723/// Process blocks in bulk 724async fn process_blocks_bulk( 725 conn: &deadpool_postgres::Object, 726 actor_id: i32, 727 blocks: Vec<BlockMetadata>, 728) -> Result<Vec<BlockCopyData>> { 729 let mut block_data = Vec::with_capacity(blocks.len()); 730 731 // Collect all subject DIDs for bulk resolution 732 let subject_dids: Vec<&str> = blocks.iter().map(|b| b.record.subject.as_str()).collect(); 733 734 // Resolve all subject actors 735 let mut resolved_actors = bulk_resolve::resolve_actor_dids_bulk(conn, &subject_dids).await?; 736 737 // Create stubs for missing actors 738 let missing_dids: Vec<&str> = subject_dids 739 .iter() 740 .filter(|&&did| !resolved_actors.contains_key(did)) 741 .copied() 742 .collect(); 743 744 if !missing_dids.is_empty() { 745 let created = bulk_resolve::create_actor_stubs_bulk(conn, &missing_dids).await?; 746 resolved_actors.extend(created); 747 } 748 749 // Build BlockCopyData for each block 750 for block in 
blocks { 751 let subject_actor_id = *resolved_actors.get(block.record.subject.as_str()) 752 .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", block.record.subject))?; 753 754 block_data.push(BlockCopyData { 755 actor_id, 756 rkey: block.rkey, // Already converted to i64 757 subject_actor_id, 758 created_at: block.record.created_at, 759 }); 760 } 761 762 Ok(block_data) 763} 764 765/// Process posts in bulk 766/// 767/// Converts PostMetadata into PostCopyData by: 768/// 1. Resolving all FK references (parent/root posts, mentions) - stubs already exist 769/// 2. Extracting and processing embeds (images, video, external, record) 770/// 3. Extracting facets (links, mentions, tags) 771/// 4. Compressing content and tokenizing for search 772/// 773/// All referenced entities (posts, actors) have stubs created by resolve_all_references_bulk 774async fn process_posts_bulk( 775 conn: &deadpool_postgres::Object, 776 _repo: &str, 777 actor_id: i32, 778 posts: Vec<PostMetadata>, 779) -> Result<Vec<PostCopyData>> { 780 use crate::search::SearchTokenizer; 781 use crate::types::records::EmbedOuter; 782 use crate::utils::{extract_mentions_and_tags, merge_tags}; 783 use parakeet_db::compression::PostContentCodec; 784 785 let mut post_data = Vec::with_capacity(posts.len()); 786 787 // Step 1: Collect all parent/root/quoted URIs for bulk lookup 788 let mut post_uris_to_lookup: Vec<String> = Vec::new(); 789 for post in &posts { 790 // Collect parent/root from replies 791 if let Some(reply) = &post.record.reply { 792 post_uris_to_lookup.push(reply.parent.uri.to_string()); 793 // Only add root if different from parent 794 if reply.root.uri != reply.parent.uri { 795 post_uris_to_lookup.push(reply.root.uri.to_string()); 796 } 797 } 798 799 // Collect quoted post URIs from embeds 800 // Only collect post URIs - embeds can also be feedgens, profiles, lists, etc. 
801 if let Some(ref embed) = post.record.embed { 802 if let Some(bsky_embed) = embed.as_bsky() { 803 match bsky_embed { 804 AppBskyEmbed::Record(r) => { 805 if r.record.uri.as_str().contains("/app.bsky.feed.post/") { 806 post_uris_to_lookup.push(r.record.uri.to_string()); 807 } 808 } 809 AppBskyEmbed::RecordWithMedia(rwm) => { 810 if rwm.record.uri.as_str().contains("/app.bsky.feed.post/") { 811 post_uris_to_lookup.push(rwm.record.uri.to_string()); 812 } 813 } 814 _ => {} 815 } 816 } 817 } 818 } 819 820 // Step 2: Bulk resolve post URIs to post_ids (stubs already exist) 821 let uri_to_post_id = if !post_uris_to_lookup.is_empty() { 822 let uris: Vec<&str> = post_uris_to_lookup.iter().map(|s| s.as_str()).collect(); 823 bulk_resolve::resolve_post_uris_bulk(conn, &uris).await? 824 } else { 825 std::collections::HashMap::new() 826 }; 827 828 // Step 3: Extract mentions and tags from all posts in one pass 829 // Store tags indexed by post position for later use 830 let mut mention_dids: Vec<String> = Vec::new(); 831 let mut tags_by_post: Vec<Vec<String>> = Vec::with_capacity(posts.len()); 832 833 for post in &posts { 834 if let Some(facets) = &post.record.facets { 835 let (mentions, tags) = extract_mentions_and_tags(facets.as_slice()); 836 mention_dids.extend(mentions); 837 tags_by_post.push(tags); 838 } else { 839 tags_by_post.push(Vec::new()); 840 } 841 } 842 843 // Step 4: Bulk resolve mention DIDs to actor_ids (stubs already exist) 844 let did_to_actor_id = if !mention_dids.is_empty() { 845 let dids: Vec<&str> = mention_dids.iter().map(|s| s.as_str()).collect(); 846 bulk_resolve::resolve_actor_dids_bulk(conn, &dids).await? 
847 } else { 848 std::collections::HashMap::new() 849 }; 850 851 // Step 5: Process each post 852 let codec = PostContentCodec::new(); 853 let tokenizer = SearchTokenizer::new(); 854 855 for (post_idx, post_meta) in posts.into_iter().enumerate() { 856 let PostMetadata { rkey, cid, record } = post_meta; 857 858 // Extract CID digest 859 let cid_bytes = cid.to_bytes(); 860 let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes) 861 .expect("Valid CID should have digest"); 862 863 // Resolve parent and root post natural keys 864 let (parent_post_actor_id, parent_post_rkey, root_post_actor_id, root_post_rkey) = if let Some(reply) = &record.reply { 865 let parent_nk = uri_to_post_id.get(reply.parent.uri.as_str()).copied(); 866 let root_nk = if reply.root.uri != reply.parent.uri { 867 uri_to_post_id.get(reply.root.uri.as_str()).copied() 868 } else { 869 parent_nk // Root is same as parent 870 }; 871 let (parent_actor_id, parent_rkey) = parent_nk.unzip(); 872 let (root_actor_id, root_rkey) = root_nk.unzip(); 873 (parent_actor_id, parent_rkey, root_actor_id, root_rkey) 874 } else { 875 (None, None, None, None) 876 }; 877 878 // Use pre-extracted tags from Step 3 879 let tags_from_facets = tags_by_post[post_idx].clone(); 880 881 // Merge tags from facets and record.tags 882 let tags = merge_tags(Some(tags_from_facets), record.tags.clone()); 883 884 // Compress content 885 let content_compressed = if record.text.is_empty() { 886 None 887 } else { 888 Some(codec.compress(&record.text)?) 
889 }; 890 891 // Tokenize for search 892 let tokens = tokenizer.tokenize(&record.text); 893 894 // Extract embed type and subtype 895 // Inline lexicon_to_embed_type mapping 896 let lexicon_to_embed_type = |lexicon: &str| -> String { 897 match lexicon { 898 "app.bsky.embed.images" => "images", 899 "app.bsky.embed.video" => "video", 900 "app.bsky.embed.external" => "external", 901 "app.bsky.embed.record" => "record", 902 "app.bsky.embed.recordWithMedia" => "record_with_media", 903 _ => lexicon, 904 }.to_string() 905 }; 906 907 let (embed_type, embed_subtype) = if let Some(ref embed_outer) = record.embed { 908 let type_str = EmbedOuter::as_str(embed_outer); 909 let embed_type = Some(lexicon_to_embed_type(type_str)); 910 let embed_subtype = EmbedOuter::subtype(embed_outer) 911 .map(lexicon_to_embed_type); 912 (embed_type, embed_subtype) 913 } else { 914 (None, None) 915 }; 916 917 // For backfills, violates_threadgate is always false (we don't check gates for historical data) 918 let violates_threadgate = false; 919 920 // Build composite fields from embed using composite_builders 921 let (ext_embed, video_embed, image_1, image_2, image_3, image_4, embedded_post_actor_id, embedded_post_rkey, record_detached) = 922 if let Some(embed_outer) = record.embed.and_then(|e| e.into_bsky()) { 923 match embed_outer { 924 AppBskyEmbed::Images(images) => { 925 let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images); 926 (None, None, i1, i2, i3, i4, None, None, None) 927 } 928 AppBskyEmbed::Video(video) => { 929 (None, composite_builders::build_video_embed(&video), None, None, None, None, None, None, None) 930 } 931 AppBskyEmbed::External(external) => { 932 (composite_builders::build_ext_embed(&external), None, None, None, None, None, None, None, None) 933 } 934 AppBskyEmbed::Record(rec) => { 935 let embedded_post_nk = uri_to_post_id.get(rec.record.uri.as_str()).copied(); 936 let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip(); 937 (None, None, None, 
None, None, None, embedded_actor_id, embedded_rkey, Some(false)) 938 } 939 AppBskyEmbed::RecordWithMedia(rwm) => { 940 // Process media part 941 let (ext, vid, i1, i2, i3, i4) = match &rwm.media { 942 MediaEmbed::Images(images) => { 943 let (i1, i2, i3, i4) = composite_builders::build_image_embeds(&images); 944 (None, None, i1, i2, i3, i4) 945 } 946 MediaEmbed::Video(video) => { 947 (None, composite_builders::build_video_embed(&video), None, None, None, None) 948 } 949 MediaEmbed::External(external) => { 950 (composite_builders::build_ext_embed(&external), None, None, None, None, None) 951 } 952 }; 953 // Process record part 954 let embedded_post_nk = uri_to_post_id.get(rwm.record.uri.as_str()).copied(); 955 let (embedded_actor_id, embedded_rkey) = embedded_post_nk.unzip(); 956 (ext, vid, i1, i2, i3, i4, embedded_actor_id, embedded_rkey, Some(false)) 957 } 958 } 959 } else { 960 (None, None, None, None, None, None, None, None, None) 961 }; 962 963 // Build facet composites and mention array using composite_builders 964 let (facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8, mentions) = 965 if let Some(ref facets) = record.facets { 966 let (f1, f2, f3, f4, f5, f6, f7, f8) = composite_builders::build_facet_embeds(facets, &did_to_actor_id); 967 let m = composite_builders::extract_mention_actor_ids(facets, &did_to_actor_id); 968 (f1, f2, f3, f4, f5, f6, f7, f8, m) 969 } else { 970 (None, None, None, None, None, None, None, None, None) 971 }; 972 973 post_data.push(PostCopyData { 974 actor_id, 975 rkey, 976 cid: cid_digest.to_vec(), 977 content_compressed, 978 langs: record.langs.unwrap_or_default(), 979 tags, 980 parent_post_actor_id, 981 parent_post_rkey, 982 root_post_actor_id, 983 root_post_rkey, 984 embed_type, 985 embed_subtype, 986 violates_threadgate, 987 tokens, 988 ext_embed, 989 video_embed, 990 image_1, 991 image_2, 992 image_3, 993 image_4, 994 embedded_post_actor_id, 995 embedded_post_rkey, 996 record_detached, 997 facet_1, 998 
facet_2, 999 facet_3, 1000 facet_4, 1001 facet_5, 1002 facet_6, 1003 facet_7, 1004 facet_8, 1005 mentions, 1006 }); 1007 } 1008 1009 Ok(post_data) 1010}