//! Bulk synchronization operations with the ATProto relay.
//!
//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
//! It provides:
//! - Memory-efficient batch processing with streaming writes
//! - Concurrent database operations using channel-based architecture
//! - HTTP/2 connection pooling for optimal network utilization
//! - Rate-limited PDS requests (3 concurrent per server)
//! - DID resolution with caching and chunked processing
//! - Parallel record validation against Lexicon schemas
//! - Actor indexing with pre-allocated data structures

use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
use chrono::Utc;
use futures_util::future;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tracing::{debug, error, info, warn};

use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
use crate::cache::SliceCache;
use crate::database::Database;
use crate::errors::SyncError;
use crate::logging::LogLevel;
use crate::logging::Logger;
use crate::models::{Actor, Record};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

// =============================================================================
// ATProto API Response Types
// =============================================================================

/// Record returned from ATProto `com.atproto.repo.listRecords` endpoint.
#[derive(Debug, Deserialize)]
struct AtProtoRecord {
    uri: String,
    cid: String,
    value: Value,
}

/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListRecordsResponse {
    records: Vec<AtProtoRecord>,
    cursor: Option<String>,
}

/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListReposByCollectionResponse {
    repos: Vec<RepoRef>,
    cursor: Option<String>,
}

/// Repository reference from the relay (contains DID).
#[derive(Debug, Deserialize)]
struct RepoRef {
    did: String,
}

// =============================================================================
// Internal Data Structures
// =============================================================================

/// Resolved ATProto actor data (DID, PDS, handle).
#[derive(Debug, Clone)]
struct AtpData {
    did: String,
    pds: String,
    handle: Option<String>,
}

// =============================================================================
// Public API Types
// =============================================================================

/// Result from syncing user collections (used in login flows).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncUserCollectionsResult {
    pub success: bool,
    pub repos_processed: i64,
    pub records_synced: i64,
    pub timed_out: bool,
    pub message: String,
}

// =============================================================================
// Sync Service
// =============================================================================

/// Service for synchronizing ATProto data from the relay.
///
/// Handles bulk backfills, user syncs, rate limiting, and validation.
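///
/// # Example
///
/// A minimal construction sketch; `database`, `cache`, and the relay URL below
/// are placeholders for values supplied by the application.
///
/// ```ignore
/// use std::sync::Arc;
/// use tokio::sync::Mutex;
///
/// // `database` is an already-connected `Database`; `cache` is a `SliceCache`.
/// let sync_service = SyncService::with_cache(
///     database,
///     "https://relay.example.com".to_string(),
///     Arc::new(Mutex::new(cache)),
/// );
/// ```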
101#[derive(Clone)] 102pub struct SyncService { 103 client: Client, 104 database: Database, 105 relay_endpoint: String, 106 cache: Option<Arc<Mutex<SliceCache>>>, 107 logger: Option<Logger>, 108 job_id: Option<Uuid>, 109 user_did: Option<String>, 110} 111 112impl SyncService { 113 /// Create a new SyncService with cache (for use outside job contexts). 114 pub fn with_cache( 115 database: Database, 116 relay_endpoint: String, 117 cache: Arc<Mutex<SliceCache>>, 118 ) -> Self { 119 // Create HTTP client with connection pooling and optimized settings 120 let client = Client::builder() 121 .pool_idle_timeout(Duration::from_secs(90)) 122 .pool_max_idle_per_host(10) 123 .http2_keep_alive_interval(Some(Duration::from_secs(30))) 124 .http2_keep_alive_timeout(Duration::from_secs(10)) 125 .timeout(Duration::from_secs(30)) 126 .build() 127 .unwrap_or_else(|_| Client::new()); 128 129 Self { 130 client, 131 database, 132 relay_endpoint, 133 cache: Some(cache), 134 logger: None, 135 job_id: None, 136 user_did: None, 137 } 138 } 139 140 /// Create a new SyncService with logging and cache enabled for a specific job 141 pub fn with_logging_and_cache( 142 database: Database, 143 relay_endpoint: String, 144 logger: Logger, 145 job_id: Uuid, 146 user_did: String, 147 cache: Arc<Mutex<SliceCache>>, 148 ) -> Self { 149 // Create HTTP client with connection pooling and optimized settings 150 let client = Client::builder() 151 .pool_idle_timeout(Duration::from_secs(90)) 152 .pool_max_idle_per_host(10) 153 .http2_keep_alive_interval(Some(Duration::from_secs(30))) 154 .http2_keep_alive_timeout(Duration::from_secs(10)) 155 .timeout(Duration::from_secs(30)) 156 .build() 157 .unwrap_or_else(|_| Client::new()); 158 159 Self { 160 client, 161 database, 162 relay_endpoint, 163 cache: Some(cache), 164 logger: Some(logger), 165 job_id: Some(job_id), 166 user_did: Some(user_did), 167 } 168 } 169 170 /// Log a message with job context (job_id, user_did, slice_uri). 171 /// Only logs if this service was created with logging enabled. 172 fn log_with_context( 173 &self, 174 slice_uri: &str, 175 level: LogLevel, 176 message: &str, 177 metadata: Option<serde_json::Value>, 178 ) { 179 if let (Some(logger), Some(job_id), Some(user_did)) = 180 (&self.logger, &self.job_id, &self.user_did) 181 { 182 logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata); 183 } 184 } 185 186 /// Backfill collections from the ATProto relay. 187 /// 188 /// This is the main entry point for bulk synchronization operations. 
    pub async fn backfill_collections(
        &self,
        slice_uri: &str,
        collections: Option<&[String]>,
        external_collections: Option<&[String]>,
        repos: Option<&[String]>,
        skip_validation: bool,
        max_repos: Option<i32>,
    ) -> Result<(i64, i64), SyncError> {
        info!("Starting backfill operation");

        let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default();
        let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default();

        if !primary_collections.is_empty() {
            info!(
                "Processing {} primary collections: {}",
                primary_collections.len(),
                primary_collections.join(", ")
            );
        }

        if !external_collections.is_empty() {
            info!(
                "Processing {} external collections: {}",
                external_collections.len(),
                external_collections.join(", ")
            );
        }

        if primary_collections.is_empty() && external_collections.is_empty() {
            info!("No collections specified for backfill");
            return Ok((0, 0));
        }

        let all_collections = [&primary_collections[..], &external_collections[..]].concat();

        // Fetch repos to process (either provided or discovered from collections)
        let all_repos = if let Some(provided_repos) = repos {
            info!("Using {} provided repositories", provided_repos.len());
            provided_repos.to_vec()
        } else {
            info!("Fetching repositories for collections...");
            let mut unique_repos = std::collections::HashSet::new();

            // First, get all repos from primary collections
            let mut primary_repos = std::collections::HashSet::new();
            for collection in &primary_collections {
                match self.get_repos_for_collection(collection, slice_uri, max_repos).await {
                    Ok(repos) => {
                        info!(
                            "Found {} repositories for primary collection \"{}\"",
                            repos.len(),
                            collection
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Info,
                            &format!(
                                "Found {} repositories for collection '{}'",
                                repos.len(),
                                collection
                            ),
                            Some(json!({"collection": collection, "repo_count": repos.len()})),
                        );
                        primary_repos.extend(repos);
                    }
                    Err(e) => {
                        error!(
                            "Failed to get repos for primary collection {}: {}",
                            collection, e
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Error,
                            &format!(
                                "Failed to fetch repositories for collection '{}': {}",
                                collection, e
                            ),
                            Some(json!({"collection": collection, "error": e.to_string()})),
                        );
                    }
                }
            }

            info!(
                "Found {} unique repositories from primary collections",
                primary_repos.len()
            );

            // Use primary repos for syncing (both primary and external collections)
            unique_repos.extend(primary_repos);

            let repos: Vec<String> = unique_repos.into_iter().collect();
            info!("Processing {} unique repositories", repos.len());
            repos
        };

        // Resolve DID -> PDS/handle mappings for all repos
        info!("Resolving ATP data for repositories...");
        let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
        info!(
            "Resolved ATP data for {}/{} repositories",
            atp_map.len(),
            all_repos.len()
        );

        // Only sync repos that successfully resolved
        let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
        let failed_resolutions = all_repos.len() - valid_repos.len();

        if failed_resolutions > 0 {
            info!(
                "{} repositories failed DID resolution and will be skipped",
                failed_resolutions
            );
        }

        info!("Starting sync for {} repositories...", valid_repos.len());

        // Group requests by PDS server for rate limiting and connection reuse.
        // Pre-allocated capacity avoids HashMap resizing during insertions.
        let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
            std::collections::HashMap::with_capacity(atp_map.len());

        for repo in &valid_repos {
            if let Some(atp_data) = atp_map.get(repo) {
                let pds_url = atp_data.pds.clone();
                for collection in &all_collections {
                    requests_by_pds
                        .entry(pds_url.clone())
                        .or_default()
                        .push((repo.clone(), collection.clone()));
                }
            }
        }

        info!(
            "Fetching records with rate limiting: {} PDS servers, {} total requests",
            requests_by_pds.len(),
            requests_by_pds.values().map(|v| v.len()).sum::<usize>()
        );

        // Process each PDS server with limited concurrency.
        // 3 concurrent requests balance speed and memory usage:
        // fewer is unnecessarily slow, more creates memory pressure.
        let mut fetch_tasks = Vec::new();
        const MAX_CONCURRENT_PER_PDS: usize = 3;

        for (_pds_url, repo_collections) in requests_by_pds {
            let sync_service = self.clone();
            let atp_map_clone = atp_map.clone();
            let slice_uri_clone = slice_uri.to_string();

            // Process this PDS server's requests in chunks
            let pds_task = tokio::spawn(async move {
                let mut pds_results = Vec::new();

                // Split requests into chunks and process with limited concurrency
                for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
                    let mut chunk_tasks = Vec::new();

                    for (repo, collection) in chunk {
                        let repo_clone = repo.clone();
                        let collection_clone = collection.clone();
                        let sync_service_clone = sync_service.clone();
                        let atp_map_inner = atp_map_clone.clone();
                        let slice_uri_inner = slice_uri_clone.clone();

                        let task = tokio::spawn(async move {
                            match sync_service_clone
                                .fetch_records_for_repo_collection_with_atp_map(
                                    &repo_clone,
                                    &collection_clone,
                                    &atp_map_inner,
                                    &slice_uri_inner,
                                )
                                .await
                            {
                                Ok(records) => Ok((repo_clone, collection_clone, records)),
                                Err(e) => {
                                    // Treat common "not really an error" cases as empty results
                                    match &e {
                                        SyncError::ListRecords { status } => {
                                            if *status == 404 || *status == 400 {
                                                // Collection doesn't exist for this repo - return empty
                                                Ok((repo_clone, collection_clone, vec![]))
                                            } else {
                                                Err(e)
                                            }
                                        }
                                        SyncError::HttpRequest(_) => {
                                            // Network errors - treat as empty (matching the TypeScript implementation)
                                            Ok((repo_clone, collection_clone, vec![]))
                                        }
                                        _ => Err(e),
                                    }
                                }
                            }
                        });
                        chunk_tasks.push(task);
                    }

                    // Wait for this chunk to complete before starting the next
                    for task in chunk_tasks {
                        if let Ok(result) = task.await {
                            pds_results.push(result);
                        }
                    }

                    // Small delay between chunks to be kind to PDS servers
                    if chunk.len() == MAX_CONCURRENT_PER_PDS {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }

                pds_results
            });

            fetch_tasks.push(pds_task);
        }

        // Collect all results
        let mut successful_tasks = 0;
        let mut failed_tasks = 0;

        // Get lexicons for this slice
        let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
            Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
            Ok(_) => {
                warn!("No lexicons found for slice {}", slice_uri);
                None
            }
            Err(e) => {
                warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                None
            }
        };

        // Index actors first (ensuring actor records exist before inserting records)
        info!("Indexing actors...");
        self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
        info!("Indexed {} actors", valid_repos.len());

        // Set up a concurrent database writer using channels.
        // This allows fetching to continue while DB writes happen in parallel.
        // 500-record batches balance memory usage and DB transaction size.
        const BATCH_SIZE: usize = 500;
        let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4); // Small buffer lets fetching run a few batches ahead of the writer
        let database = self.database.clone();
        let total_indexed_records = Arc::new(Mutex::new(0i64));

        // Spawn database writer task
        let writer_task = tokio::spawn(async move {
            let mut write_count = 0i64;
            while let Some(batch) = rx.recv().await {
                let batch_size = batch.len() as i64;
                match database.batch_insert_records(&batch).await {
                    Ok(_) => {
                        write_count += batch_size;
                        info!(
                            "Database writer: Inserted batch of {} records (total: {})",
                            batch_size, write_count
                        );
                    }
                    Err(e) => {
                        error!("Database writer: Failed to insert batch: {}", e);
                        return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
                    }
                }
            }
            Ok(write_count)
        });

        // Process results from each PDS server
        let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);

        for pds_task in fetch_tasks {
            match pds_task.await {
                Ok(pds_results) => {
                    // Process each result from this PDS
                    for result in pds_results {
                        match result {
                            Ok((repo, collection, records)) => {
                                let mut validated_records = Vec::new();
                                let total_records = records.len();

                                // Skip validation if requested
                                if skip_validation {
                                    validated_records = records;
                                    info!(
                                        "Validation skipped - accepting all {} records for collection {} from repo {}",
                                        total_records, collection, repo
                                    );
                                }
                                // Validate each record if we have lexicons
                                else if let Some(ref lexicons) = lexicons {
                                    let mut validation_errors = Vec::new();

                                    // Process validations in chunks for better CPU cache locality
                                    // (50 records per chunk)
                                    const VALIDATION_CHUNK_SIZE: usize = 50;
                                    for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
                                        for record in chunk {
                                            match slices_lexicon::validate_record(
                                                lexicons.clone(),
                                                &collection,
                                                record.json.clone(),
                                            ) {
                                                Ok(_) => {
                                                    validated_records.push(record.clone());
                                                }
                                                Err(e) => {
                                                    let error_msg = format!(
                                                        "Validation failed for record {} from {}: {}",
                                                        record.uri, repo, e
                                                    );
                                                    warn!("{}", error_msg);
                                                    validation_errors.push(json!({
                                                        "uri": record.uri,
                                                        "error": e.to_string()
                                                    }));

                                                    // Log individual validation failures
                                                    self.log_with_context(
                                                        slice_uri,
                                                        LogLevel::Warn,
                                                        &error_msg,
                                                        Some(json!({
                                                            "repo": repo,
                                                            "collection": collection,
                                                            "record_uri": record.uri,
                                                            "validation_error": e.to_string()
                                                        })),
                                                    );
                                                }
                                            }
                                        }
                                    }

                                    let valid_count = validated_records.len();
                                    let invalid_count = validation_errors.len();

                                    if invalid_count > 0 {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Warn,
                                            &format!(
                                                "Validation completed for {}/{}: {} valid, {} invalid records",
                                                repo, collection, valid_count, invalid_count
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count,
                                                "invalid_records": invalid_count,
                                                "validation_errors": validation_errors
                                            })),
                                        );
                                    } else {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Info,
                                            &format!(
                                                "All {} records validated successfully for {}/{}",
                                                valid_count, repo, collection
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count
                                            })),
                                        );
                                    }

                                    info!(
                                        "Validated {}/{} records for collection {} from repo {}",
                                        validated_records.len(),
                                        total_records,
                                        collection,
                                        repo
                                    );
                                } else {
                                    // No validator available, accept all records
                                    validated_records = records;
                                    self.log_with_context(
                                        slice_uri,
                                        LogLevel::Warn,
                                        &format!("No lexicon validator available for collection {}", collection),
                                        Some(json!({"collection": collection, "repo": repo, "accepted_records": total_records})),
                                    );
                                    warn!(
                                        "No lexicon validator available - accepting all records without validation for collection {}",
                                        collection
                                    );
                                }

                                // Accumulate into the batch buffer rather than holding all records in memory
                                batch_buffer.extend(validated_records);
                                successful_tasks += 1;
                            }
                            Err(_) => {
                                failed_tasks += 1;
                            }
                        }
                    }
                }
                Err(_) => {
                    // PDS task failed - count all its requests as failed
                    failed_tasks += 1;
                }
            }

            // Send batch to writer when buffer is full
            if batch_buffer.len() >= BATCH_SIZE {
                let batch_to_send =
                    std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
                let batch_count = batch_to_send.len() as i64;
                info!("Sending batch of {} records to database writer", batch_count);

                // Send to the writer channel (awaits only if the writer falls behind)
                if let Err(e) = tx.send(batch_to_send).await {
                    error!("Failed to send batch to writer: {}", e);
                    return Err(SyncError::Generic(format!(
                        "Failed to send batch to writer: {}",
                        e
                    )));
                }

                let mut total = total_indexed_records.lock().await;
                *total += batch_count;
            }
        }

        // Flush any remaining records in the buffer
        if !batch_buffer.is_empty() {
            let batch_count = batch_buffer.len() as i64;
            info!("Sending final batch of {} records to database writer", batch_count);

            if let Err(e) = tx.send(batch_buffer).await {
                error!("Failed to send final batch to writer: {}", e);
                return Err(SyncError::Generic(format!(
                    "Failed to send final batch to writer: {}",
                    e
                )));
            }

            let mut total = total_indexed_records.lock().await;
            *total += batch_count;
        }

        // Close the channel and wait for the writer to finish
        drop(tx);
        let write_result = writer_task
            .await
            .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;

        let final_count = match write_result {
            Ok(count) => count,
            Err(e) => return Err(e),
        };

        info!(
            "Debug: {} successful tasks, {} failed tasks",
            successful_tasks, failed_tasks
        );

        info!("Indexed {} new/changed records in batches", final_count);

        info!("Backfill complete!");

        Ok((valid_repos.len() as i64, final_count))
    }

    /// Fetch all repositories that have records in a given collection.
    ///
    /// Uses cursor-based pagination to fetch all repos from the relay.
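    ///
    /// # Example
    ///
    /// A hedged sketch; it assumes a constructed `SyncService` bound to
    /// `sync_service`, and the collection NSID, slice URI, and cap are
    /// placeholder values.
    ///
    /// ```ignore
    /// let repos = sync_service
    ///     .get_repos_for_collection(
    ///         "com.example.feed.post",                      // collection NSID
    ///         "at://did:plc:example/app.example.slice/123", // slice (for log context)
    ///         Some(5_000),                                  // soft cap on discovery
    ///     )
    ///     .await?;
    /// println!("relay reports {} repos with this collection", repos.len());
    /// ```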
    pub async fn get_repos_for_collection(
        &self,
        collection: &str,
        slice_uri: &str,
        max_repos: Option<i32>,
    ) -> Result<Vec<String>, SyncError> {
        let url = format!(
            "{}/xrpc/com.atproto.sync.listReposByCollection",
            self.relay_endpoint
        );
        let mut all_repos = Vec::new();
        let mut cursor: Option<String> = None;
        let mut page_count = 0;

        // AT Protocol docs: default 500 repos/page, max 2000 repos/page.
        // We use 1000 repos/page for efficiency (recommended for large DID lists).
        const REPOS_PER_PAGE: usize = 1000; // Our configured page size

        // Calculate max pages based on the repo limit, with a reasonable safety margin
        let max_pages = if let Some(limit) = max_repos {
            // Add a 20% safety margin and ensure at least 5 pages
            ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
        } else {
            25 // Default fallback for unlimited
        };

        loop {
            page_count += 1;
            if page_count > max_pages {
                warn!(
                    "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
                    max_pages, collection, max_repos, max_pages * REPOS_PER_PAGE, REPOS_PER_PAGE
                );
                break;
            }

            let mut query_params = vec![
                ("collection", collection.to_string()),
                ("limit", REPOS_PER_PAGE.to_string()),
            ];
            if let Some(ref cursor_value) = cursor {
                query_params.push(("cursor", cursor_value.clone()));
            }

            let response = self.client.get(&url).query(&query_params).send().await?;

            if !response.status().is_success() {
                return Err(SyncError::ListRepos {
                    status: response.status().as_u16(),
                });
            }

            let repos_response: ListReposByCollectionResponse = response.json().await?;

            // Add repos from this page to our collection
            all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));

            // Check if there's a next page
            match repos_response.cursor {
                Some(next_cursor) if !next_cursor.is_empty() => {
                    cursor = Some(next_cursor);
                    // Log pagination progress if we have a logger
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Info,
                        &format!(
                            "Fetching next page of repositories for collection {}, total so far: {}",
                            collection,
                            all_repos.len()
                        ),
                        Some(json!({
                            "collection": collection,
                            "repos_count": all_repos.len(),
                            "has_more": true,
                            "page": page_count
                        })),
                    );
                }
                _ => break, // No more pages
            }
        }

        // Log final count
        self.log_with_context(
            slice_uri,
            LogLevel::Info,
            &format!(
                "Completed fetching repositories for collection {}, total: {}",
                collection,
                all_repos.len()
            ),
            Some(json!({
                "collection": collection,
                "total_repos": all_repos.len()
            })),
        );

        Ok(all_repos)
    }

    /// Fetch records for a repo/collection with retry logic.
    ///
    /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
    /// This handles cases where PDS URLs change.
    async fn fetch_records_for_repo_collection_with_atp_map(
        &self,
        repo: &str,
        collection: &str,
        atp_map: &std::collections::HashMap<String, AtpData>,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        let atp_data = atp_map
            .get(repo)
            .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;

        match self
            .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
            .await
        {
            Ok(records) => Ok(records),
            Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
                // 4xx/5xx error from PDS - try invalidating cache and retrying once
                debug!(
                    "PDS error {} for repo {}, attempting cache invalidation and retry",
                    status, repo
                );

                match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
                    .await
                {
                    Ok(fresh_actor_data) => {
                        debug!(
                            "Successfully re-resolved actor data for {}, retrying with PDS: {}",
                            repo, fresh_actor_data.pds
                        );
                        self.fetch_records_for_repo_collection(
                            repo,
                            collection,
                            &fresh_actor_data.pds,
                            slice_uri,
                        )
                        .await
                    }
                    Err(e) => {
                        debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
                        Err(SyncError::ListRecords { status }) // Return original error
                    }
                }
            }
            Err(e) => Err(e), // Other errors (network, etc.) - don't retry
        }
    }

    /// Fetch records for a specific repo and collection from its PDS.
    ///
    /// Only returns new or changed records (compared by CID).
    /// Uses cursor-based pagination to fetch all records.
    ///
    /// # Memory optimizations
    ///
    /// - Pre-allocated Vec with 100 capacity (typical collection size)
    /// - Fetches in 100-record pages to limit response size
    /// - Reuses HTTP connections via client pooling
    async fn fetch_records_for_repo_collection(
        &self,
        repo: &str,
        collection: &str,
        pds_url: &str,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        // Get existing record CIDs to skip unchanged records
        let existing_cids = self
            .database
            .get_existing_record_cids_for_slice(repo, collection, slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;

        debug!(
            "Found {} existing records for {}/{}",
            existing_cids.len(),
            repo,
            collection
        );

        // Pre-allocate based on typical collection size (100 records).
        // This avoids Vec reallocations which can cause memory fragmentation.
        let mut records = Vec::with_capacity(100);
        let mut cursor: Option<String> = None;
        let mut fetched_count = 0;
        let mut skipped_count = 0;

        loop {
            let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
            if let Some(ref c) = cursor {
                params.push(("cursor", c));
            }

            let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url);
            let response = self.client.get(&request_url).query(&params).send().await;

            let response = match response {
                Ok(resp) => resp,
                Err(e) => {
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Error,
                        &format!("Failed to fetch records from {}: Network error: {}", repo, e),
                        Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "error": e.to_string()})),
                    );
                    return Err(SyncError::from(e));
                }
            };

            if !response.status().is_success() {
                let status = response.status().as_u16();

                // HTTP 400/404 are expected when collections don't exist - log as info, not error
                let (log_level, log_message) = if status == 400 || status == 404 {
                    (
                        LogLevel::Info,
                        format!(
                            "Collection '{}' not found for {}: HTTP {}",
                            collection, repo, status
                        ),
                    )
                } else {
                    (
                        LogLevel::Error,
                        format!(
                            "Failed to fetch records from {}: HTTP {} from PDS",
                            repo, status
                        ),
                    )
                };

                self.log_with_context(
                    slice_uri,
                    log_level,
                    &log_message,
                    Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "http_status": status})),
                );
                return Err(SyncError::ListRecords { status });
            }

            let list_response: ListRecordsResponse = response.json().await?;

            for atproto_record in list_response.records {
                // Check if we already have this record with the same CID
                if let Some(existing_cid) = existing_cids.get(&atproto_record.uri)
                    && existing_cid == &atproto_record.cid
                {
                    // Record unchanged, skip it
                    skipped_count += 1;
                    continue;
                }

                // Record is new or changed, include it
                // TODO: Consider using Arc<str> for frequently cloned strings
                let record = Record {
                    uri: atproto_record.uri,
                    cid: atproto_record.cid,
                    did: repo.to_string(),
                    collection: collection.to_string(),
                    json: atproto_record.value,
                    indexed_at: Utc::now(),
                    slice_uri: Some(slice_uri.to_string()),
                };
                records.push(record);
                fetched_count += 1;
            }

            cursor = list_response.cursor;
            if cursor.is_none() {
                break;
            }
        }

        // Log results for this repo/collection
        if fetched_count > 0 || skipped_count > 0 {
            self.log_with_context(
                slice_uri,
                LogLevel::Info,
                &format!(
                    "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
                    fetched_count, skipped_count, repo, collection
                ),
                Some(json!({
                    "repo": repo,
                    "collection": collection,
                    "new_records": fetched_count,
                    "skipped_records": skipped_count,
                    "pds_url": pds_url
                })),
            );
        }

        if skipped_count > 0 {
            info!(
                "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
                skipped_count, fetched_count, repo, collection
            );
        }

        Ok(records)
    }

    /// Resolve ATP data (DID, PDS, handle) for multiple repos.
    ///
    /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
    ///
    /// # Performance optimizations
    ///
    /// - Processes DIDs in 50-item chunks to limit memory usage
    /// - 10 concurrent DNS resolutions max to avoid resolver exhaustion
    /// - Pre-allocated HashMap based on input size
    async fn get_atp_map_for_repos(
        &self,
        repos: &[String],
    ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
        let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
        const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
        const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions

        info!("Resolving ATP data for {} repositories in chunks", repos.len());

        for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());

            debug!(
                "Processing DID resolution chunk {}/{} (repos {}-{})",
                chunk_idx + 1,
                repos.len().div_ceil(CHUNK_SIZE),
                chunk_start,
                chunk_end - 1
            );

            // Process this chunk with limited concurrency
            let mut resolution_tasks = Vec::new();

            for batch in chunk.chunks(MAX_CONCURRENT) {
                let mut batch_futures = Vec::new();

                for repo in batch {
                    let repo_clone = repo.clone();
                    let self_clone = self.clone();

                    let fut = async move {
                        match self_clone.resolve_atp_data(&repo_clone).await {
                            Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
                            Err(e) => {
                                warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
                                None
                            }
                        }
                    };
                    batch_futures.push(fut);
                }

                // Wait for this batch to complete
                let batch_results = future::join_all(batch_futures).await;
                resolution_tasks.extend(batch_results);
            }

            // Add resolved data to map
            for (did, atp_data) in resolution_tasks.into_iter().flatten() {
                atp_map.insert(did, atp_data);
            }

            // Small delay between chunks to be kind to DNS resolvers
            if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        info!(
            "Successfully resolved ATP data for {}/{} repositories",
            atp_map.len(),
            repos.len()
        );
        Ok(atp_map)
    }

    /// Resolve ATP data for a single DID.
    ///
    /// Uses DID resolution to get the DID document, then extracts PDS and handle.
    /// Results are cached to avoid repeated lookups.
    async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
        debug!("Resolving ATP data for DID: {}", did);

        let dns_resolver = HickoryDnsResolver::create_resolver(&[]);

        match resolve_subject(&self.client, &dns_resolver, did).await {
            Ok(resolved_did) => {
                debug!("Successfully resolved subject: {}", resolved_did);

                let actor_data =
                    resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
                        .await
                        .map_err(|e| SyncError::Generic(e.to_string()))?;

                let atp_data = AtpData {
                    did: actor_data.did,
                    pds: actor_data.pds,
                    handle: actor_data.handle,
                };

                Ok(atp_data)
            }
            Err(e) => Err(SyncError::Generic(format!(
                "Failed to resolve subject for {}: {:?}",
                did, e
            ))),
        }
    }

    /// Index actors (DIDs with handles) into the database.
    ///
    /// Creates actor records for all repos being synced.
    async fn index_actors(
        &self,
        slice_uri: &str,
        repos: &[String],
        atp_map: &std::collections::HashMap<String, AtpData>,
    ) -> Result<(), SyncError> {
        let mut actors = Vec::new();
        let now = chrono::Utc::now().to_rfc3339();

        for repo in repos {
            if let Some(atp_data) = atp_map.get(repo) {
                actors.push(Actor {
                    did: atp_data.did.clone(),
                    handle: atp_data.handle.clone(),
                    slice_uri: slice_uri.to_string(),
                    indexed_at: now.clone(),
                });
            }
        }

        if !actors.is_empty() {
            self.database.batch_insert_actors(&actors).await?;
        }

        Ok(())
    }

    /// Get external collections for a slice.
    ///
    /// External collections are those that don't start with the slice's domain.
    /// For example, if the slice domain is "com.example", then "app.bsky.feed.post" is external.
    async fn get_external_collections_for_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<String>, SyncError> {
        // Get the slice's domain
        let domain = self
            .database
            .get_slice_domain(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
            .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;

        // Get all collections (lexicons) for this slice
        let collections = self
            .database
            .get_slice_collections_list(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;

        // Filter for external collections (those that don't start with the slice domain)
        let external_collections: Vec<String> = collections
            .into_iter()
            .filter(|collection| !collection.starts_with(&domain))
            .collect();

        info!(
            "Found {} external collections for slice {} (domain: {}): {:?}",
            external_collections.len(),
            slice_uri,
            domain,
            external_collections
        );

        Ok(external_collections)
    }

    /// Sync a user's data for all external collections defined in the slice.
    ///
    /// Used during login flows to quickly sync a user's data.
    /// Automatically discovers which collections to sync based on the slice configuration.
    /// Uses timeout protection to keep login flows responsive.
    ///
    /// # Arguments
    ///
    /// * `user_did` - The user's DID to sync
    /// * `slice_uri` - The slice to sync into
    /// * `timeout_secs` - Maximum seconds to wait before timing out
    ///
    /// # Returns
    ///
    /// Result with repos_processed, records_synced, and timeout status
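    ///
    /// # Example
    ///
    /// A hedged sketch of a login-flow sync; it assumes a constructed
    /// `SyncService` bound to `sync_service`, and the DID, slice URI, and
    /// timeout are placeholder values.
    ///
    /// ```ignore
    /// let result = sync_service
    ///     .sync_user_collections(
    ///         "did:plc:exampleuser",                        // user to sync
    ///         "at://did:plc:example/app.example.slice/123", // slice to sync into
    ///         10,                                           // give up after 10 seconds
    ///     )
    ///     .await?;
    /// if result.timed_out {
    ///     // Fall back to an async job for larger repos
    /// }
    /// ```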
    pub async fn sync_user_collections(
        &self,
        user_did: &str,
        slice_uri: &str,
        timeout_secs: u64,
    ) -> Result<SyncUserCollectionsResult, SyncError> {
        info!(
            "Auto-discovering external collections for user {} in slice {}",
            user_did, slice_uri
        );

        // Auto-discover external collections from slice configuration
        let external_collections = self.get_external_collections_for_slice(slice_uri).await?;

        if external_collections.is_empty() {
            info!("No external collections found for slice {}", slice_uri);
            return Ok(SyncUserCollectionsResult {
                success: true,
                repos_processed: 0,
                records_synced: 0,
                timed_out: false,
                message: "No external collections to sync".to_string(),
            });
        }

        info!(
            "Syncing {} external collections for user {}: {:?}",
            external_collections.len(),
            user_did,
            external_collections
        );

        // Use backfill_collections with timeout, only syncing this specific user
        let sync_future = async {
            self.backfill_collections(
                slice_uri,
                None,                          // No primary collections for user sync
                Some(&external_collections),
                Some(&[user_did.to_string()]), // Only sync this user's repos
                false,                         // Always validate user collections
                None,                          // No limit for user-specific sync
            )
            .await
        };

        match timeout(Duration::from_secs(timeout_secs), sync_future).await {
            Ok(result) => {
                let (repos_processed, records_synced) = result?;
                info!(
                    "User sync completed within timeout: {} repos, {} records",
                    repos_processed, records_synced
                );
                Ok(SyncUserCollectionsResult {
                    success: true,
                    repos_processed,
                    records_synced,
                    timed_out: false,
                    message: format!(
                        "Sync completed: {} repos, {} records",
                        repos_processed, records_synced
                    ),
                })
            }
            Err(_) => {
                // Timeout occurred - return partial success with guidance
                warn!(
                    "Sync for user {} timed out after {}s, suggest using async job",
                    user_did, timeout_secs
                );
                Ok(SyncUserCollectionsResult {
                    success: false,
                    repos_processed: 0,
                    records_synced: 0,
                    timed_out: true,
                    message: format!(
                        "Sync timed out after {}s - use startSync endpoint for larger syncs",
                        timeout_secs
                    ),
                })
            }
        }
    }
}