Highly ambitious ATProtocol AppView service and SDKs
//! Bulk synchronization operations with the ATProto relay.
//!
//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
//! It provides:
//! - Memory-efficient batch processing with streaming writes
//! - Concurrent database operations using channel-based architecture
//! - HTTP/2 connection pooling for optimal network utilization
//! - Rate-limited PDS requests (3 concurrent per server)
//! - DID resolution with caching and chunked processing
//! - Parallel record validation against Lexicon schemas
//! - Actor indexing with pre-allocated data structures

use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
use chrono::Utc;
use futures_util::future;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tracing::{debug, error, info, warn};

use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
use crate::cache::SliceCache;
use crate::database::Database;
use crate::errors::SyncError;
use crate::logging::LogLevel;
use crate::logging::Logger;
use crate::models::{Actor, Record};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

// =============================================================================
// ATProto API Response Types
// =============================================================================

/// Record returned from ATProto `com.atproto.repo.listRecords` endpoint.
#[derive(Debug, Deserialize)]
struct AtProtoRecord {
    uri: String,
    cid: String,
    value: Value,
}

/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListRecordsResponse {
    records: Vec<AtProtoRecord>,
    cursor: Option<String>,
}

/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListReposByCollectionResponse {
    repos: Vec<RepoRef>,
    cursor: Option<String>,
}

/// Repository reference from the relay (contains DID).
#[derive(Debug, Deserialize)]
struct RepoRef {
    did: String,
}

// =============================================================================
// Internal Data Structures
// =============================================================================

/// Resolved ATProto actor data (DID, PDS, handle).
#[derive(Debug, Clone)]
struct AtpData {
    did: String,
    pds: String,
    handle: Option<String>,
}

// =============================================================================
// Public API Types
// =============================================================================

/// Result from syncing user collections (used in login flows).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncUserCollectionsResult {
    pub success: bool,
    pub repos_processed: i64,
    pub records_synced: i64,
    pub timed_out: bool,
    pub message: String,
}

// =============================================================================
// Sync Service
// =============================================================================

/// Service for synchronizing ATProto data from the relay.
///
/// Handles bulk backfills, user syncs, rate limiting, and validation.
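///
/// # Example
///
/// A minimal usage sketch. The `database` handle, `cache`, and relay URL below are
/// placeholders; how they are constructed depends on the rest of this crate.
///
/// ```ignore
/// // Hypothetical wiring: `database: Database` and `cache: Arc<Mutex<SliceCache>>`
/// // come from application setup code.
/// let sync = SyncService::with_cache(
///     database,
///     "https://relay.example.com".to_string(),
///     cache,
/// );
/// ```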
#[derive(Clone)]
pub struct SyncService {
    client: Client,
    database: Database,
    relay_endpoint: String,
    cache: Option<Arc<Mutex<SliceCache>>>,
    logger: Option<Logger>,
    job_id: Option<Uuid>,
    user_did: Option<String>,
}

impl SyncService {
    /// Create a new SyncService with cache (for use outside job contexts).
    pub fn with_cache(
        database: Database,
        relay_endpoint: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: None,
            job_id: None,
            user_did: None,
        }
    }

    /// Create a new SyncService with logging and cache enabled for a specific job
    pub fn with_logging_and_cache(
        database: Database,
        relay_endpoint: String,
        logger: Logger,
        job_id: Uuid,
        user_did: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: Some(logger),
            job_id: Some(job_id),
            user_did: Some(user_did),
        }
    }

    /// Log a message with job context (job_id, user_did, slice_uri).
    /// Only logs if this service was created with logging enabled.
    fn log_with_context(
        &self,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<serde_json::Value>,
    ) {
        if let (Some(logger), Some(job_id), Some(user_did)) =
            (&self.logger, &self.job_id, &self.user_did)
        {
            logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata);
        }
    }

    /// Backfill collections from the ATProto relay.
    ///
    /// This is the main entry point for bulk synchronization operations.
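    ///
    /// A hedged call sketch (the collection NSID and repo cap below are illustrative
    /// placeholders; `sync_service` and `slice_uri` are assumed to exist):
    ///
    /// ```ignore
    /// let (repos_processed, records_synced) = sync_service
    ///     .backfill_collections(
    ///         &slice_uri,
    ///         Some(&["com.example.post".to_string()]), // primary collections
    ///         None,       // no external collections
    ///         None,       // discover repos from the relay
    ///         false,      // validate against the slice's lexicons
    ///         Some(5000), // cap repository discovery
    ///     )
    ///     .await?;
    /// ```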
    ///
    /// # Arguments
    ///
    /// * `slice_uri` - The slice to backfill data into
    /// * `collections` - Primary collections (owned by slice domain) to backfill
    /// * `external_collections` - External collections (not owned by slice) to backfill
    /// * `repos` - Specific repos to sync (if None, fetches all repos for collections)
    /// * `skip_validation` - Skip Lexicon validation (useful for testing)
    /// * `max_repos` - Approximate cap on repositories discovered per collection (bounds relay pagination)
    ///
    /// # Returns
    ///
    /// Tuple of (repos_processed, records_synced)
    ///
    /// # Performance Optimizations
    ///
    /// - Requests are grouped by PDS server with 3 concurrent requests max
    /// - Records are processed in 500-item batches to limit memory usage
    /// - Database writes happen concurrently via channels
    /// - HTTP/2 connection pooling reduces network overhead
    pub async fn backfill_collections(
        &self,
        slice_uri: &str,
        collections: Option<&[String]>,
        external_collections: Option<&[String]>,
        repos: Option<&[String]>,
        skip_validation: bool,
        max_repos: Option<i32>,
    ) -> Result<(i64, i64), SyncError> {
        info!("Starting backfill operation");

        let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default();
        let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default();

        if !primary_collections.is_empty() {
            info!(
                "Processing {} primary collections: {}",
                primary_collections.len(),
                primary_collections.join(", ")
            );
        }

        if !external_collections.is_empty() {
            info!(
                "Processing {} external collections: {}",
                external_collections.len(),
                external_collections.join(", ")
            );
        }

        if primary_collections.is_empty() && external_collections.is_empty() {
            info!("No collections specified for backfill");
            return Ok((0, 0));
        }

        let all_collections = [&primary_collections[..], &external_collections[..]].concat();

        // Fetch repos to process (either provided or discovered from collections)
        let all_repos = if let Some(provided_repos) = repos {
            info!("Using {} provided repositories", provided_repos.len());
            provided_repos.to_vec()
        } else {
            info!("Fetching repositories for collections...");
            let mut unique_repos = std::collections::HashSet::new();

            // First, get all repos from primary collections
            let mut primary_repos = std::collections::HashSet::new();
            for collection in &primary_collections {
                match self
                    .get_repos_for_collection(collection, slice_uri, max_repos)
                    .await
                {
                    Ok(repos) => {
                        info!(
                            "Found {} repositories for primary collection \"{}\"",
                            repos.len(),
                            collection
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Info,
                            &format!(
                                "Found {} repositories for collection '{}'",
                                repos.len(),
                                collection
                            ),
                            Some(json!({"collection": collection, "repo_count": repos.len()})),
                        );
                        primary_repos.extend(repos);
                    }
                    Err(e) => {
                        error!(
                            "Failed to get repos for primary collection {}: {}",
                            collection, e
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Error,
                            &format!(
                                "Failed to fetch repositories for collection '{}': {}",
                                collection, e
                            ),
                            Some(json!({"collection": collection, "error": e.to_string()})),
                        );
                    }
                }
            }

            info!(
                "Found {} unique repositories from primary collections",
                primary_repos.len()
            );

            // Use primary repos for syncing (both primary and external collections)
            unique_repos.extend(primary_repos);

            let repos: Vec<String> = unique_repos.into_iter().collect();
            info!("Processing {} unique repositories", repos.len());
            repos
        };

        // Resolve DID -> PDS/handle mappings for all repos
        info!("Resolving ATP data for repositories...");
        let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
        info!(
            "Resolved ATP data for {}/{} repositories",
            atp_map.len(),
            all_repos.len()
        );

        // Only sync repos that successfully resolved
        let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
        let failed_resolutions = all_repos.len() - valid_repos.len();

        if failed_resolutions > 0 {
            info!(
                "{} repositories failed DID resolution and will be skipped",
                failed_resolutions
            );
        }

        info!("Starting sync for {} repositories...", valid_repos.len());

        // Group requests by PDS server for rate limiting and connection reuse
        // Pre-allocated capacity avoids HashMap resizing during insertions
        let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
            std::collections::HashMap::with_capacity(atp_map.len());

        for repo in &valid_repos {
            if let Some(atp_data) = atp_map.get(repo) {
                let pds_url = atp_data.pds.clone();
                for collection in &all_collections {
                    requests_by_pds
                        .entry(pds_url.clone())
                        .or_default()
                        .push((repo.clone(), collection.clone()));
                }
            }
        }

        info!(
            "Fetching records with rate limiting: {} PDS servers, {} total requests",
            requests_by_pds.len(),
            requests_by_pds.values().map(|v| v.len()).sum::<usize>()
        );

        // Process each PDS server with limited concurrency
        // 3 concurrent requests balances speed vs memory usage
        // Lower than 3 = too slow, Higher than 3 = memory pressure
        let mut fetch_tasks = Vec::new();
        const MAX_CONCURRENT_PER_PDS: usize = 3;

        for (_pds_url, repo_collections) in requests_by_pds {
            let sync_service = self.clone();
            let atp_map_clone = atp_map.clone();
            let slice_uri_clone = slice_uri.to_string();

            // Process this PDS server's requests in chunks
            let pds_task = tokio::spawn(async move {
                let mut pds_results = Vec::new();

                // Split requests into chunks and process with limited concurrency
                for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
                    let mut chunk_tasks = Vec::new();

                    for (repo, collection) in chunk {
                        let repo_clone = repo.clone();
                        let collection_clone = collection.clone();
                        let sync_service_clone = sync_service.clone();
                        let atp_map_inner = atp_map_clone.clone();
                        let slice_uri_inner = slice_uri_clone.clone();

                        let task = tokio::spawn(async move {
                            match sync_service_clone
                                .fetch_records_for_repo_collection_with_atp_map(
                                    &repo_clone,
                                    &collection_clone,
                                    &atp_map_inner,
                                    &slice_uri_inner,
                                )
                                .await
                            {
                                Ok(records) => Ok((repo_clone, collection_clone, records)),
                                Err(e) => {
                                    // Handle common "not error" scenarios as empty results
                                    match &e {
                                        SyncError::ListRecords { status } => {
                                            if *status == 404 || *status == 400 {
                                                // Collection doesn't exist for this repo - return empty
                                                Ok((repo_clone, collection_clone, vec![]))
                                            } else {
                                                Err(e)
                                            }
                                        }
                                        SyncError::HttpRequest(_) => {
                                            // Network errors - treat as empty (like TypeScript version)
                                            Ok((repo_clone, collection_clone, vec![]))
                                        }
                                        _ => Err(e),
                                    }
                                }
                            }
                        });
                        chunk_tasks.push(task);
                    }

                    // Wait for this chunk to complete before starting the next
                    for task in chunk_tasks {
                        if let Ok(result) = task.await {
                            pds_results.push(result);
                        }
                    }

                    // Small delay between chunks to be kind to PDS servers
                    if chunk.len() == MAX_CONCURRENT_PER_PDS {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }

                pds_results
            });

            fetch_tasks.push(pds_task);
        }

        // Collect all results
        let mut successful_tasks = 0;
        let mut failed_tasks = 0;

        // Get lexicons for this slice
        let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
            Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
            Ok(_) => {
                warn!("No lexicons found for slice {}", slice_uri);
                None
            }
            Err(e) => {
                warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                None
            }
        };

        // Index actors first (ensuring actor records exist before inserting records)
        info!("Indexing actors...");
        self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
        info!("Indexed {} actors", valid_repos.len());

        // Set up concurrent database writer using channels
        // This allows fetching to continue while DB writes happen in parallel
        // 500-record batches optimize for memory usage and DB transaction size
        const BATCH_SIZE: usize = 500;
        let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4); // Small buffer bounds memory; sends apply backpressure when the writer lags
        let database = self.database.clone();
        let total_indexed_records = Arc::new(Mutex::new(0i64));

        // Spawn database writer task
        let writer_task = tokio::spawn(async move {
            let mut write_count = 0i64;
            while let Some(batch) = rx.recv().await {
                let batch_size = batch.len() as i64;
                match database.batch_insert_records(&batch).await {
                    Ok(_) => {
                        write_count += batch_size;
                        info!(
                            "Database writer: Inserted batch of {} records (total: {})",
                            batch_size, write_count
                        );
                    }
                    Err(e) => {
                        error!("Database writer: Failed to insert batch: {}", e);
                        return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
                    }
                }
            }
            Ok(write_count)
        });

        // Process results from each PDS server
        let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);

        for pds_task in fetch_tasks {
            match pds_task.await {
                Ok(pds_results) => {
                    // Process each result from this PDS
                    for result in pds_results {
                        match result {
                            Ok((repo, collection, records)) => {
                                let mut validated_records = Vec::new();
                                let total_records = records.len();

                                // Skip validation if requested
                                if skip_validation {
                                    validated_records = records;
                                    info!(
                                        "Validation skipped - accepting all {} records for collection {} from repo {}",
                                        total_records, collection, repo
                                    );
                                }
                                // Validate each record if we have lexicons
                                else if let Some(ref lexicons) = lexicons {
                                    let mut validation_errors = Vec::new();

                                    // Process validations in chunks for better CPU cache locality
                                    // 50 records per chunk optimizes L2/L3 cache usage
                                    const VALIDATION_CHUNK_SIZE: usize = 50;
                                    for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
                                        for record in chunk {
                                            match slices_lexicon::validate_record(
                                                lexicons.clone(),
                                                &collection,
                                                record.json.clone(),
                                            ) {
                                                Ok(_) => {
                                                    validated_records.push(record.clone());
                                                }
                                                Err(e) => {
                                                    let error_msg = format!(
                                                        "Validation failed for record {} from {}: {}",
                                                        record.uri, repo, e
                                                    );
                                                    warn!("{}", error_msg);
                                                    validation_errors.push(json!({
                                                        "uri": record.uri,
                                                        "error": e.to_string()
                                                    }));

                                                    // Log individual validation failures
                                                    self.log_with_context(
                                                        slice_uri,
                                                        LogLevel::Warn,
                                                        &error_msg,
                                                        Some(json!({
                                                            "repo": repo,
                                                            "collection": collection,
                                                            "record_uri": record.uri,
                                                            "validation_error": e.to_string()
                                                        })),
                                                    );
                                                }
                                            }
                                        }
                                    }

                                    let valid_count = validated_records.len();
                                    let invalid_count = validation_errors.len();

                                    if invalid_count > 0 {
                                        self.log_with_context(slice_uri, LogLevel::Warn,
                                            &format!("Validation completed for {}/{}: {} valid, {} invalid records",
                                                repo, collection, valid_count, invalid_count),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count,
                                                "invalid_records": invalid_count,
                                                "validation_errors": validation_errors
                                            }))
                                        );
                                    } else {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Info,
                                            &format!(
                                                "All {} records validated successfully for {}/{}",
                                                valid_count, repo, collection
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count
                                            })),
                                        );
                                    }

                                    info!(
                                        "Validated {}/{} records for collection {} from repo {}",
                                        validated_records.len(),
                                        total_records,
                                        collection,
                                        repo
                                    );
                                } else {
                                    // No validator available, accept all records
                                    validated_records = records;
                                    self.log_with_context(slice_uri, LogLevel::Warn,
                                        &format!("No lexicon validator available for collection {}", collection),
                                        Some(json!({"collection": collection, "repo": repo, "accepted_records": total_records}))
                                    );
                                    warn!(
                                        "No lexicon validator available - accepting all records without validation for collection {}",
                                        collection
                                    );
                                }

                                // Add to the batch buffer, which is streamed to the database writer
                                batch_buffer.extend(validated_records);
                                successful_tasks += 1;
                            }
                            Err(_) => {
                                failed_tasks += 1;
                            }
                        }
                    }
                }
                Err(_) => {
                    // PDS task failed - count all its requests as failed
                    failed_tasks += 1;
                }
            }

            // Send batch to writer when buffer is full
            if batch_buffer.len() >= BATCH_SIZE {
                let batch_to_send =
                    std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
                let batch_count = batch_to_send.len() as i64;
                info!(
                    "Sending batch of {} records to database writer",
                    batch_count
                );

                // Send to writer channel (awaits if the writer falls behind)
                if let Err(e) = tx.send(batch_to_send).await {
                    error!("Failed to send batch to writer: {}", e);
                    return Err(SyncError::Generic(format!(
                        "Failed to send batch to writer: {}",
                        e
                    )));
                }

                let mut total = total_indexed_records.lock().await;
                *total += batch_count;
            }
        }

        // Flush any remaining records in the buffer
        if !batch_buffer.is_empty() {
            let batch_count = batch_buffer.len() as i64;
            info!(
                "Sending final batch of {} records to database writer",
                batch_count
            );

            if let Err(e) = tx.send(batch_buffer).await {
                error!("Failed to send final batch to writer: {}", e);
                return Err(SyncError::Generic(format!(
                    "Failed to send final batch to writer: {}",
                    e
                )));
            }

            let mut total = total_indexed_records.lock().await;
            *total += batch_count;
        }

        // Close the channel and wait for writer to finish
        drop(tx);
        let write_result = writer_task
            .await
            .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;

        let final_count = match write_result {
            Ok(count) => count,
            Err(e) => return Err(e),
        };

        info!(
            "Debug: {} successful tasks, {} failed tasks",
            successful_tasks, failed_tasks
        );

        info!("Indexed {} new/changed records in batches", final_count);

        info!("Backfill complete!");

        Ok((valid_repos.len() as i64, final_count))
    }

    /// Fetch all repositories that have records in a given collection.
    ///
    /// Uses cursor-based pagination to fetch all repos from the relay.
    pub async fn get_repos_for_collection(
        &self,
        collection: &str,
        slice_uri: &str,
        max_repos: Option<i32>,
    ) -> Result<Vec<String>, SyncError> {
        let url = format!(
            "{}/xrpc/com.atproto.sync.listReposByCollection",
            self.relay_endpoint
        );
        let mut all_repos = Vec::new();
        let mut cursor: Option<String> = None;
        let mut page_count = 0;

        // AT Protocol docs: default 500 repos/page, max 2000 repos/page
        // We use 1000 repos/page for efficiency (recommended for large DID lists)
        const REPOS_PER_PAGE: usize = 1000; // Our configured page size

        // Calculate max pages based on repo limit, with a reasonable safety margin
        let max_pages = if let Some(limit) = max_repos {
            // Add 20% safety margin and ensure at least 5 pages
            ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
        } else {
            25 // Default fallback for unlimited
        };

        loop {
            page_count += 1;
            if page_count > max_pages {
                warn!(
                    "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
                    max_pages,
                    collection,
                    max_repos,
                    max_pages * REPOS_PER_PAGE,
                    REPOS_PER_PAGE
                );
                break;
            }

            let mut query_params = vec![
                ("collection", collection.to_string()),
                ("limit", "1000".to_string()),
            ];
            if let Some(ref cursor_value) = cursor {
                query_params.push(("cursor", cursor_value.clone()));
            }

            let response = self.client.get(&url).query(&query_params).send().await?;

            if !response.status().is_success() {
                return Err(SyncError::ListRepos {
                    status: response.status().as_u16(),
                });
            }

            let repos_response: ListReposByCollectionResponse = response.json().await?;

            // Add repos from this page to our collection
            all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));

            // Check if there's a next page
            match repos_response.cursor {
                Some(next_cursor) if !next_cursor.is_empty() => {
                    cursor = Some(next_cursor);
                    // Log pagination progress if we have a logger
                    self.log_with_context(slice_uri, LogLevel::Info,
                        &format!("Fetching next page of repositories for collection {}, total so far: {}", collection, all_repos.len()),
                        Some(json!({
                            "collection": collection,
                            "repos_count": all_repos.len(),
                            "has_more": true,
                            "page": page_count
                        }))
                    );
                }
                _ => break, // No more pages
            }
        }

        // Log final count
        self.log_with_context(
            slice_uri,
            LogLevel::Info,
            &format!(
                "Completed fetching repositories for collection {}, total: {}",
                collection,
                all_repos.len()
            ),
            Some(json!({
                "collection": collection,
                "total_repos": all_repos.len()
            })),
        );

        Ok(all_repos)
    }

    /// Fetch records for a repo/collection with retry logic.
    ///
    /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
    /// This handles cases where PDS URLs change.
    async fn fetch_records_for_repo_collection_with_atp_map(
        &self,
        repo: &str,
        collection: &str,
        atp_map: &std::collections::HashMap<String, AtpData>,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        let atp_data = atp_map
            .get(repo)
            .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;

        match self
            .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
            .await
        {
            Ok(records) => Ok(records),
            Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
                // 4xx/5xx error from PDS - try invalidating cache and retrying once
                debug!(
                    "PDS error {} for repo {}, attempting cache invalidation and retry",
                    status, repo
                );

                match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
                    .await
                {
                    Ok(fresh_actor_data) => {
                        debug!(
                            "Successfully re-resolved actor data for {}, retrying with PDS: {}",
                            repo, fresh_actor_data.pds
                        );
                        self.fetch_records_for_repo_collection(
                            repo,
                            collection,
                            &fresh_actor_data.pds,
                            slice_uri,
                        )
                        .await
                    }
                    Err(e) => {
                        debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
                        Err(SyncError::ListRecords { status }) // Return original error
                    }
                }
            }
            Err(e) => Err(e), // Other errors (network, etc.) - don't retry
        }
    }

    /// Fetch records for a specific repo and collection from its PDS.
    ///
    /// Only returns new or changed records (compared by CID).
    /// Uses cursor-based pagination to fetch all records.
    ///
    /// # Memory optimizations:
    /// - Pre-allocated Vec with 100 capacity (typical collection size)
    /// - Fetches in 100-record pages to limit response size
    /// - Reuses HTTP connections via client pooling
    async fn fetch_records_for_repo_collection(
        &self,
        repo: &str,
        collection: &str,
        pds_url: &str,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        // Get existing record CIDs to skip unchanged records
        let existing_cids = self
            .database
            .get_existing_record_cids_for_slice(repo, collection, slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;

        debug!(
            "Found {} existing records for {}/{}",
            existing_cids.len(),
            repo,
            collection
        );

        // Pre-allocate based on typical collection size (100 records)
        // This avoids Vec reallocations which can cause memory fragmentation
        let mut records = Vec::with_capacity(100);
        let mut cursor: Option<String> = None;
        let mut fetched_count = 0;
        let mut skipped_count = 0;

        loop {
            let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
            if let Some(ref c) = cursor {
                params.push(("cursor", c));
            }

            let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url);
            let response = self.client.get(&request_url).query(&params).send().await;

            let response = match response {
                Ok(resp) => resp,
                Err(e) => {
                    self.log_with_context(slice_uri, LogLevel::Error,
                        &format!("Failed to fetch records from {}: Network error: {}", repo, e),
                        Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "error": e.to_string()}))
                    );
                    return Err(SyncError::from(e));
                }
            };

            if !response.status().is_success() {
                let status = response.status().as_u16();

                // HTTP 400/404 are expected when collections don't exist - log as info, not error
                let (log_level, log_message) = if status == 400 || status == 404 {
                    (
                        LogLevel::Info,
                        format!(
                            "Collection '{}' not found for {}: HTTP {}",
                            collection, repo, status
                        ),
                    )
                } else {
                    (
                        LogLevel::Error,
                        format!(
                            "Failed to fetch records from {}: HTTP {} from PDS",
                            repo, status
                        ),
                    )
                };

                self.log_with_context(slice_uri, log_level,
                    &log_message,
                    Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "http_status": status}))
                );
                return Err(SyncError::ListRecords { status });
            }

            let list_response: ListRecordsResponse = response.json().await?;

            for atproto_record in list_response.records {
                // Check if we already have this record with the same CID
                if let Some(existing_cid) = existing_cids.get(&atproto_record.uri)
                    && existing_cid == &atproto_record.cid
                {
                    // Record unchanged, skip it
                    skipped_count += 1;
                    continue;
                }

                // Record is new or changed, include it
                // TODO: Consider using Arc<str> for frequently cloned strings
                let record = Record {
                    uri: atproto_record.uri,
                    cid: atproto_record.cid,
                    did: repo.to_string(),
                    collection: collection.to_string(),
                    json: atproto_record.value,
                    indexed_at: Utc::now(),
                    slice_uri: Some(slice_uri.to_string()),
                };
                records.push(record);
                fetched_count += 1;
            }

            cursor = list_response.cursor;
            if cursor.is_none() {
                break;
            }
        }

        // Log results for this repo/collection
        if fetched_count > 0 || skipped_count > 0 {
            self.log_with_context(
                slice_uri,
                LogLevel::Info,
                &format!(
                    "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
                    fetched_count, skipped_count, repo, collection
                ),
                Some(json!({
                    "repo": repo,
                    "collection": collection,
                    "new_records": fetched_count,
                    "skipped_records": skipped_count,
                    "pds_url": pds_url
                })),
            );
        }

        if skipped_count > 0 {
            info!(
                "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
                skipped_count, fetched_count, repo, collection
            );
        }

        Ok(records)
    }

    /// Resolve ATP data (DID, PDS, handle) for multiple repos.
    ///
    /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
    ///
    /// # Performance optimizations:
    /// - Processes DIDs in 50-item chunks to limit memory usage
    /// - 10 concurrent DNS resolutions max to avoid resolver exhaustion
    /// - Pre-allocated HashMap based on input size
    async fn get_atp_map_for_repos(
        &self,
        repos: &[String],
    ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
        let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
        const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
        const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions

        info!(
            "Resolving ATP data for {} repositories in chunks",
            repos.len()
        );

        for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());

            debug!(
                "Processing DID resolution chunk {}/{} (repos {}-{})",
                chunk_idx + 1,
                repos.len().div_ceil(CHUNK_SIZE),
                chunk_start,
                chunk_end - 1
            );

            // Process this chunk with limited concurrency
            let mut resolution_tasks = Vec::new();

            for batch in chunk.chunks(MAX_CONCURRENT) {
                let mut batch_futures = Vec::new();

                for repo in batch {
                    let repo_clone = repo.clone();
                    let self_clone = self.clone();

                    let fut = async move {
                        match self_clone.resolve_atp_data(&repo_clone).await {
                            Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
                            Err(e) => {
                                warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
                                None
                            }
                        }
                    };
                    batch_futures.push(fut);
                }

                // Wait for this batch to complete
                let batch_results = future::join_all(batch_futures).await;
                resolution_tasks.extend(batch_results);
            }

            // Add resolved data to map
            for (did, atp_data) in resolution_tasks.into_iter().flatten() {
                atp_map.insert(did, atp_data);
            }

            // Small delay between chunks to be kind to DNS resolvers
            if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        info!(
            "Successfully resolved ATP data for {}/{} repositories",
            atp_map.len(),
            repos.len()
        );
        Ok(atp_map)
    }

    /// Resolve ATP data for a single DID.
    ///
    /// Uses DID resolution to get the DID document, then extracts PDS and handle.
    /// Results are cached to avoid repeated lookups.
    async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
        debug!("Resolving ATP data for DID: {}", did);

        let dns_resolver = HickoryDnsResolver::create_resolver(&[]);

        match resolve_subject(&self.client, &dns_resolver, did).await {
            Ok(resolved_did) => {
                debug!("Successfully resolved subject: {}", resolved_did);

                let actor_data =
                    resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
                        .await
                        .map_err(|e| SyncError::Generic(e.to_string()))?;

                let atp_data = AtpData {
                    did: actor_data.did,
                    pds: actor_data.pds,
                    handle: actor_data.handle,
                };

                Ok(atp_data)
            }
            Err(e) => Err(SyncError::Generic(format!(
                "Failed to resolve subject for {}: {:?}",
                did, e
            ))),
        }
    }

    /// Index actors (DIDs with handles) into the database.
    ///
    /// Creates actor records for all repos being synced.
    async fn index_actors(
        &self,
        slice_uri: &str,
        repos: &[String],
        atp_map: &std::collections::HashMap<String, AtpData>,
    ) -> Result<(), SyncError> {
        let mut actors = Vec::new();
        let now = chrono::Utc::now().to_rfc3339();

        for repo in repos {
            if let Some(atp_data) = atp_map.get(repo) {
                actors.push(Actor {
                    did: atp_data.did.clone(),
                    handle: atp_data.handle.clone(),
                    slice_uri: slice_uri.to_string(),
                    indexed_at: now.clone(),
                });
            }
        }

        if !actors.is_empty() {
            self.database.batch_insert_actors(&actors).await?;
        }

        Ok(())
    }

    /// Get external collections for a slice.
    ///
    /// External collections are those that don't start with the slice's domain.
    /// For example, if the slice domain is "com.example", then "app.bsky.feed.post" is external.
    async fn get_external_collections_for_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<String>, SyncError> {
        // Get the slice's domain
        let domain = self
            .database
            .get_slice_domain(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
            .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;

        // Get all collections (lexicons) for this slice
        let collections = self
            .database
            .get_slice_collections_list(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;

        // Filter for external collections (those that don't start with the slice domain)
        let external_collections: Vec<String> = collections
            .into_iter()
            .filter(|collection| !collection.starts_with(&domain))
            .collect();

        info!(
            "Found {} external collections for slice {} (domain: {}): {:?}",
            external_collections.len(),
            slice_uri,
            domain,
            external_collections
        );

        Ok(external_collections)
    }

    /// Sync a user's data for all external collections defined in the slice.
    ///
    /// Used during login flows to quickly sync a user's data.
    /// Automatically discovers which collections to sync based on slice configuration.
    /// Uses timeout protection to ensure responsive login flows.
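    ///
    /// A call sketch (the DID and 30-second budget are illustrative placeholders;
    /// `sync_service` and `slice_uri` are assumed to exist):
    ///
    /// ```ignore
    /// let result = sync_service
    ///     .sync_user_collections("did:plc:example", &slice_uri, 30)
    ///     .await?;
    /// if result.timed_out {
    ///     // Large accounts should fall back to an asynchronous sync job.
    /// }
    /// ```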
    ///
    /// # Arguments
    ///
    /// * `user_did` - The user's DID to sync
    /// * `slice_uri` - The slice to sync into
    /// * `timeout_secs` - Maximum seconds to wait before timing out
    ///
    /// # Returns
    ///
    /// Result with repos_processed, records_synced, and timeout status
    pub async fn sync_user_collections(
        &self,
        user_did: &str,
        slice_uri: &str,
        timeout_secs: u64,
    ) -> Result<SyncUserCollectionsResult, SyncError> {
        info!(
            "Auto-discovering external collections for user {} in slice {}",
            user_did, slice_uri
        );

        // Auto-discover external collections from slice configuration
        let external_collections = self.get_external_collections_for_slice(slice_uri).await?;

        if external_collections.is_empty() {
            info!("No external collections found for slice {}", slice_uri);
            return Ok(SyncUserCollectionsResult {
                success: true,
                repos_processed: 0,
                records_synced: 0,
                timed_out: false,
                message: "No external collections to sync".to_string(),
            });
        }

        info!(
            "Syncing {} external collections for user {}: {:?}",
            external_collections.len(),
            user_did,
            external_collections
        );

        // Use backfill_collections with timeout, only syncing this specific user
        let sync_future = async {
            self.backfill_collections(
                slice_uri,
                None, // No primary collections for user sync
                Some(&external_collections),
                Some(&[user_did.to_string()]), // Only sync this user's repos
                false, // Always validate user collections
                None, // No limit for user-specific sync
            )
            .await
        };

        match timeout(Duration::from_secs(timeout_secs), sync_future).await {
            Ok(result) => {
                let (repos_processed, records_synced) = result?;
                info!(
                    "User sync completed within timeout: {} repos, {} records",
                    repos_processed, records_synced
                );
                Ok(SyncUserCollectionsResult {
                    success: true,
                    repos_processed,
                    records_synced,
                    timed_out: false,
                    message: format!(
                        "Sync completed: {} repos, {} records",
                        repos_processed, records_synced
                    ),
                })
            }
            Err(_) => {
                // Timeout occurred - return partial success with guidance
                warn!(
                    "Sync for user {} timed out after {}s, suggest using async job",
                    user_did, timeout_secs
                );
                Ok(SyncUserCollectionsResult {
                    success: false,
                    repos_processed: 0,
                    records_synced: 0,
                    timed_out: true,
                    message: format!(
                        "Sync timed out after {}s - use startSync endpoint for larger syncs",
                        timeout_secs
                    ),
                })
            }
        }
    }
}