//! Bulk synchronization operations with the ATProto relay.
//!
//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
//! It provides:
//! - Memory-efficient batch processing with streaming writes
//! - Concurrent database operations using channel-based architecture
//! - HTTP/2 connection pooling for optimal network utilization
//! - Rate-limited PDS requests (3 concurrent per server)
//! - DID resolution with caching and chunked processing
//! - Parallel record validation against Lexicon schemas
//! - Actor indexing with pre-allocated data structures

use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
use chrono::Utc;
use futures_util::future;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tracing::{debug, error, info, warn};

use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
use crate::cache::SliceCache;
use crate::database::Database;
use crate::errors::SyncError;
use crate::jobs::is_job_cancelled;
use crate::logging::LogLevel;
use crate::logging::Logger;
use crate::models::{Actor, Record};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

// =============================================================================
// ATProto API Response Types
// =============================================================================

/// Record returned from ATProto `com.atproto.repo.listRecords` endpoint.
#[derive(Debug, Deserialize)]
struct AtProtoRecord {
    uri: String,
    cid: String,
    value: Value,
}

/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListRecordsResponse {
    records: Vec<AtProtoRecord>,
    cursor: Option<String>,
}

/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListReposByCollectionResponse {
    repos: Vec<RepoRef>,
    cursor: Option<String>,
}

/// Repository reference from the relay (contains DID).
#[derive(Debug, Deserialize)]
struct RepoRef {
    did: String,
}

// =============================================================================
// Internal Data Structures
// =============================================================================

/// Resolved ATProto actor data (DID, PDS, handle).
#[derive(Debug, Clone)]
struct AtpData {
    did: String,
    pds: String,
    handle: Option<String>,
}

// =============================================================================
// Public API Types
// =============================================================================

/// Result from syncing user collections (used in login flows).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncUserCollectionsResult {
    pub success: bool,
    pub repos_processed: i64,
    pub records_synced: i64,
    pub timed_out: bool,
    pub message: String,
}

// =============================================================================
// Sync Service
// =============================================================================

/// Service for synchronizing ATProto data from the relay.
///
/// Handles bulk backfills, user syncs, rate limiting, and validation.
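///
/// # Example
///
/// A minimal construction sketch (illustrative only; assumes a `Database` handle,
/// a `SliceCache`, a `Logger`, and a job `Uuid` already exist elsewhere, and uses
/// a placeholder relay URL and DID):
///
/// ```ignore
/// use std::sync::Arc;
/// use tokio::sync::Mutex;
///
/// let cache = Arc::new(Mutex::new(slice_cache));
///
/// // Outside a job context: cache only, no per-job logging.
/// let sync = SyncService::with_cache(
///     database.clone(),
///     "https://relay.example.com".to_string(),
///     cache.clone(),
/// );
///
/// // Inside a job context: same HTTP client settings, plus job-scoped logging.
/// let job_sync = SyncService::with_logging_and_cache(
///     database,
///     "https://relay.example.com".to_string(),
///     logger,
///     job_id,
///     "did:plc:example".to_string(),
///     cache,
/// );
/// ```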
#[derive(Clone)]
pub struct SyncService {
    client: Client,
    database: Database,
    relay_endpoint: String,
    cache: Option<Arc<Mutex<SliceCache>>>,
    logger: Option<Logger>,
    job_id: Option<Uuid>,
    user_did: Option<String>,
}

impl SyncService {
    /// Create a new SyncService with cache (for use outside job contexts).
    pub fn with_cache(
        database: Database,
        relay_endpoint: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: None,
            job_id: None,
            user_did: None,
        }
    }

    /// Create a new SyncService with logging and cache enabled for a specific job.
    pub fn with_logging_and_cache(
        database: Database,
        relay_endpoint: String,
        logger: Logger,
        job_id: Uuid,
        user_did: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: Some(logger),
            job_id: Some(job_id),
            user_did: Some(user_did),
        }
    }

    /// Log a message with job context (job_id, user_did, slice_uri).
    /// Only logs if this service was created with logging enabled.
    fn log_with_context(
        &self,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<serde_json::Value>,
    ) {
        if let (Some(logger), Some(job_id), Some(user_did)) =
            (&self.logger, &self.job_id, &self.user_did)
        {
            logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata);
        }
    }

    /// Check if this job has been cancelled.
    ///
    /// Returns Ok(()) if not cancelled, Err(SyncError::Cancelled) if cancelled.
    /// Only checks if this service has a job_id (created with logging enabled).
    async fn check_cancellation(&self) -> Result<(), SyncError> {
        if let Some(job_id) = self.job_id {
            let is_cancelled = is_job_cancelled(self.database.pool(), job_id)
                .await
                .map_err(|e| SyncError::DatabaseQuery(e.to_string()))?;

            if is_cancelled {
                info!("Job {} has been cancelled, stopping sync", job_id);
                return Err(SyncError::Cancelled);
            }
        }
        Ok(())
    }

    /// Backfill collections from the ATProto relay.
    ///
    /// This is the main entry point for bulk synchronization operations.
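    ///
    /// A typical invocation might look like the following sketch (placeholder
    /// slice URI and collection NSID; not taken from a real deployment):
    ///
    /// ```ignore
    /// let collections = vec!["com.example.feed.post".to_string()];
    /// let (repos_processed, records_synced) = sync_service
    ///     .backfill_collections(
    ///         "at://did:plc:example/com.example.slice/self", // slice to backfill into
    ///         Some(collections.as_slice()),                  // primary collections
    ///         None,                                          // no external collections
    ///         None,                                          // discover repos via the relay
    ///         false,                                         // validate against lexicons
    ///         Some(10_000),                                  // approximate cap on repos
    ///     )
    ///     .await?;
    /// ```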
208 /// 209 /// # Arguments 210 /// 211 /// * `slice_uri` - The slice to backfill data into 212 /// * `collections` - Primary collections (owned by slice domain) to backfill 213 /// * `external_collections` - External collections (not owned by slice) to backfill 214 /// * `repos` - Specific repos to sync (if None, fetches all repos for collections) 215 /// * `skip_validation` - Skip Lexicon validation (useful for testing) 216 /// 217 /// # Returns 218 /// 219 /// Tuple of (repos_processed, records_synced) 220 /// 221 /// # Performance Optimizations 222 /// 223 /// - Requests are grouped by PDS server with 3 concurrent requests max 224 /// - Records are processed in 500-item batches to limit memory usage 225 /// - Database writes happen concurrently via channels 226 /// - HTTP/2 connection pooling reduces network overhead 227 pub async fn backfill_collections( 228 &self, 229 slice_uri: &str, 230 collections: Option<&[String]>, 231 external_collections: Option<&[String]>, 232 repos: Option<&[String]>, 233 skip_validation: bool, 234 max_repos: Option<i32>, 235 ) -> Result<(i64, i64), SyncError> { 236 info!("Starting backfill operation"); 237 238 let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default(); 239 let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default(); 240 241 if !primary_collections.is_empty() { 242 info!( 243 "Processing {} primary collections: {}", 244 primary_collections.len(), 245 primary_collections.join(", ") 246 ); 247 } 248 249 if !external_collections.is_empty() { 250 info!( 251 "Processing {} external collections: {}", 252 external_collections.len(), 253 external_collections.join(", ") 254 ); 255 } 256 257 if primary_collections.is_empty() && external_collections.is_empty() { 258 info!("No collections specified for backfill"); 259 return Ok((0, 0)); 260 } 261 262 let all_collections = [&primary_collections[..], &external_collections[..]].concat(); 263 264 // Fetch repos to process (either provided or discovered from collections) 265 let all_repos = if let Some(provided_repos) = repos { 266 info!("Using {} provided repositories", provided_repos.len()); 267 provided_repos.to_vec() 268 } else { 269 info!("Fetching repositories for collections..."); 270 let mut unique_repos = std::collections::HashSet::new(); 271 272 // First, get all repos from primary collections 273 let mut primary_repos = std::collections::HashSet::new(); 274 for collection in &primary_collections { 275 // Check for cancellation between collections 276 self.check_cancellation().await?; 277 278 match self.get_repos_for_collection(collection, slice_uri, max_repos).await { 279 Ok(repos) => { 280 info!( 281 "Found {} repositories for primary collection \"{}\"", 282 repos.len(), 283 collection 284 ); 285 self.log_with_context( 286 slice_uri, 287 LogLevel::Info, 288 &format!( 289 "Found {} repositories for collection '{}'", 290 repos.len(), 291 collection 292 ), 293 Some(json!({"collection": collection, "repo_count": repos.len()})), 294 ); 295 primary_repos.extend(repos); 296 } 297 Err(e) => { 298 error!( 299 "Failed to get repos for primary collection {}: {}", 300 collection, e 301 ); 302 self.log_with_context( 303 slice_uri, 304 LogLevel::Error, 305 &format!( 306 "Failed to fetch repositories for collection '{}': {}", 307 collection, e 308 ), 309 Some(json!({"collection": collection, "error": e.to_string()})), 310 ); 311 } 312 } 313 } 314 315 info!( 316 "Found {} unique repositories from primary collections", 317 primary_repos.len() 318 ); 319 320 // Use 
            // Use primary repos for syncing (both primary and external collections)
            unique_repos.extend(primary_repos);

            let repos: Vec<String> = unique_repos.into_iter().collect();
            info!("Processing {} unique repositories", repos.len());
            repos
        };

        // Resolve DID -> PDS/handle mappings for all repos
        info!("Resolving ATP data for repositories...");
        let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
        info!(
            "Resolved ATP data for {}/{} repositories",
            atp_map.len(),
            all_repos.len()
        );

        // Check for cancellation after DID resolution (before spawning expensive fetch tasks)
        self.check_cancellation().await?;

        // Only sync repos that successfully resolved
        let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
        let failed_resolutions = all_repos.len() - valid_repos.len();

        if failed_resolutions > 0 {
            info!(
                "{} repositories failed DID resolution and will be skipped",
                failed_resolutions
            );
        }

        info!("Starting sync for {} repositories...", valid_repos.len());

        // Group requests by PDS server for rate limiting and connection reuse
        // Pre-allocated capacity avoids HashMap resizing during insertions
        let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
            std::collections::HashMap::with_capacity(atp_map.len());

        for repo in &valid_repos {
            if let Some(atp_data) = atp_map.get(repo) {
                let pds_url = atp_data.pds.clone();
                for collection in &all_collections {
                    requests_by_pds
                        .entry(pds_url.clone())
                        .or_default()
                        .push((repo.clone(), collection.clone()));
                }
            }
        }

        info!(
            "Fetching records with rate limiting: {} PDS servers, {} total requests",
            requests_by_pds.len(),
            requests_by_pds.values().map(|v| v.len()).sum::<usize>()
        );

        // Process each PDS server with limited concurrency.
        // 3 concurrent requests balances speed vs memory usage:
        // lower than 3 = too slow, higher than 3 = memory pressure.
        let mut fetch_tasks = Vec::new();
        const MAX_CONCURRENT_PER_PDS: usize = 3;

        for (_pds_url, repo_collections) in requests_by_pds {
            let sync_service = self.clone();
            let atp_map_clone = atp_map.clone();
            let slice_uri_clone = slice_uri.to_string();

            // Process this PDS server's requests in chunks
            let pds_task = tokio::spawn(async move {
                let mut pds_results = Vec::new();

                // Split requests into chunks and process with limited concurrency
                for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
                    let mut chunk_tasks = Vec::new();

                    for (repo, collection) in chunk {
                        let repo_clone = repo.clone();
                        let collection_clone = collection.clone();
                        let sync_service_clone = sync_service.clone();
                        let atp_map_inner = atp_map_clone.clone();
                        let slice_uri_inner = slice_uri_clone.clone();

                        let task = tokio::spawn(async move {
                            match sync_service_clone
                                .fetch_records_for_repo_collection_with_atp_map(
                                    &repo_clone,
                                    &collection_clone,
                                    &atp_map_inner,
                                    &slice_uri_inner,
                                )
                                .await
                            {
                                Ok(records) => Ok((repo_clone, collection_clone, records)),
                                Err(e) => {
                                    // Handle common "not error" scenarios as empty results
                                    match &e {
                                        SyncError::ListRecords { status } => {
                                            if *status == 404 || *status == 400 {
                                                // Collection doesn't exist for this repo - return empty
                                                Ok((repo_clone, collection_clone, vec![]))
                                            } else {
                                                Err(e)
                                            }
                                        }
                                        SyncError::HttpRequest(_) => {
                                            // Network errors - treat as empty (like TypeScript version)
                                            Ok((repo_clone, collection_clone, vec![]))
                                        }
                                        _ => Err(e),
                                    }
                                }
                            }
                        });
                        chunk_tasks.push(task);
                    }

                    // Wait for this chunk to complete before starting the next
                    for task in chunk_tasks {
                        if let Ok(result) = task.await {
                            pds_results.push(result);
                        }
                    }

                    // Small delay between chunks to be kind to PDS servers
                    if chunk.len() == MAX_CONCURRENT_PER_PDS {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }

                pds_results
            });

            fetch_tasks.push(pds_task);
        }

        // Collect all results
        let mut successful_tasks = 0;
        let mut failed_tasks = 0;

        // Get lexicons for this slice
        let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
            Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
            Ok(_) => {
                warn!("No lexicons found for slice {}", slice_uri);
                None
            }
            Err(e) => {
                warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                None
            }
        };

        // Index actors first (ensuring actor records exist before inserting records)
        info!("Indexing actors...");
        self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
        info!("Indexed {} actors", valid_repos.len());

        // Set up concurrent database writer using channels.
        // This allows fetching to continue while DB writes happen in parallel.
        // 500-record batches optimize for memory usage and DB transaction size.
        const BATCH_SIZE: usize = 500;
        // Small channel buffer smooths bursts while still applying backpressure if the writer falls behind
        let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4);
        let database = self.database.clone();
        let total_indexed_records = Arc::new(Mutex::new(0i64));

        // Spawn database writer task
        let writer_task = tokio::spawn(async move {
            let mut write_count = 0i64;
            while let Some(batch) = rx.recv().await {
                let batch_size = batch.len() as i64;
                match database.batch_insert_records(&batch).await {
                    Ok(_) => {
                        write_count += batch_size;
                        info!(
                            "Database writer: Inserted batch of {} records (total: {})",
                            batch_size, write_count
                        );
                    }
                    Err(e) => {
                        error!("Database writer: Failed to insert batch: {}", e);
                        return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
                    }
                }
            }
            Ok(write_count)
        });

        // Process results from each PDS server
        let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);

        for pds_task in fetch_tasks {
            // Check for cancellation between processing PDS results
            self.check_cancellation().await?;

            match pds_task.await {
                Ok(pds_results) => {
                    // Process each result from this PDS
                    for result in pds_results {
                        match result {
                            Ok((repo, collection, records)) => {
                                let mut validated_records = Vec::new();
                                let total_records = records.len();

                                // Skip validation if requested
                                if skip_validation {
                                    validated_records = records;
                                    info!(
                                        "Validation skipped - accepting all {} records for collection {} from repo {}",
                                        total_records, collection, repo
                                    );
                                }
                                // Validate each record if we have lexicons
                                else if let Some(ref lexicons) = lexicons {
                                    let mut validation_errors = Vec::new();

                                    // Process validations in chunks for better CPU cache locality
                                    // 50 records per chunk optimizes L2/L3 cache usage
                                    const VALIDATION_CHUNK_SIZE: usize = 50;
                                    for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
                                        // Check for cancellation between validation chunks
                                        self.check_cancellation().await?;

                                        for record in chunk {
                                            match slices_lexicon::validate_record(
                                                lexicons.clone(),
                                                &collection,
                                                record.json.clone(),
                                            ) {
                                                Ok(_) => {
                                                    validated_records.push(record.clone());
                                                }
                                                Err(e) => {
                                                    let error_msg = format!(
                                                        "Validation failed for record {} from {}: {}",
                                                        record.uri, repo, e
                                                    );
                                                    warn!("{}", error_msg);
                                                    validation_errors.push(json!({
                                                        "uri": record.uri,
                                                        "error": e.to_string()
                                                    }));

                                                    // Log individual validation failures
                                                    self.log_with_context(
                                                        slice_uri,
                                                        LogLevel::Warn,
                                                        &error_msg,
                                                        Some(json!({
                                                            "repo": repo,
                                                            "collection": collection,
                                                            "record_uri": record.uri,
                                                            "validation_error": e.to_string()
                                                        })),
                                                    );
                                                }
                                            }
                                        }
                                    }

                                    let valid_count = validated_records.len();
                                    let invalid_count = validation_errors.len();

                                    if invalid_count > 0 {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Warn,
                                            &format!(
                                                "Validation completed for {}/{}: {} valid, {} invalid records",
                                                repo, collection, valid_count, invalid_count
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count,
                                                "invalid_records": invalid_count,
                                                "validation_errors": validation_errors
                                            })),
                                        );
                                    } else {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Info,
                                            &format!(
                                                "All {} records validated successfully for {}/{}",
                                                valid_count, repo, collection
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count
                                            })),
                                        );
                                    }

                                    info!(
                                        "Validated {}/{} records for collection {} from repo {}",
                                        validated_records.len(),
                                        total_records,
                                        collection,
                                        repo
                                    );
                                } else {
                                    // No validator available, accept all records
                                    validated_records = records;
                                    self.log_with_context(
                                        slice_uri,
                                        LogLevel::Warn,
                                        &format!("No lexicon validator available for collection {}", collection),
                                        Some(json!({"collection": collection, "repo": repo, "accepted_records": total_records})),
                                    );
                                    warn!(
                                        "No lexicon validator available - accepting all records without validation for collection {}",
                                        collection
                                    );
                                }

                                // Add to batch buffer instead of all_records
                                batch_buffer.extend(validated_records);
                                successful_tasks += 1;
                            }
                            Err(_) => {
                                failed_tasks += 1;
                            }
                        }
                    }
                }
                Err(_) => {
                    // PDS task failed - count the whole task as failed
                    failed_tasks += 1;
                }
            }

            // Send batch to writer when buffer is full
            if batch_buffer.len() >= BATCH_SIZE {
                let batch_to_send =
                    std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
                let batch_count = batch_to_send.len() as i64;
                info!(
                    "Sending batch of {} records to database writer",
                    batch_count
                );

                // Send to writer channel (awaits if the channel buffer is full)
                if let Err(e) = tx.send(batch_to_send).await {
                    error!("Failed to send batch to writer: {}", e);
                    return Err(SyncError::Generic(format!(
                        "Failed to send batch to writer: {}",
                        e
                    )));
                }

                let mut total = total_indexed_records.lock().await;
                *total += batch_count;
            }
        }

        // Flush any remaining records in the buffer
        if !batch_buffer.is_empty() {
            let batch_count = batch_buffer.len() as i64;
            info!(
                "Sending final batch of {} records to database writer",
                batch_count
            );

            if let Err(e) = tx.send(batch_buffer).await {
                error!("Failed to send final batch to writer: {}", e);
                return Err(SyncError::Generic(format!(
                    "Failed to send final batch to writer: {}",
                    e
                )));
            }

            let mut total = total_indexed_records.lock().await;
            *total += batch_count;
        }

        // Close the channel and wait for writer to finish
        drop(tx);
        let write_result = writer_task
            .await
            .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;

        let final_count = match write_result {
            Ok(count) => count,
            Err(e) => return Err(e),
        };

        info!(
            "Debug: {} successful tasks, {} failed tasks",
            successful_tasks, failed_tasks
        );

        info!("Indexed {} new/changed records in batches", final_count);

        info!("Backfill complete!");

        Ok((valid_repos.len() as i64, final_count))
    }

    /// Fetch all repositories that have records in a given collection.
    ///
    /// Uses cursor-based pagination to fetch all repos from the relay.
    pub async fn get_repos_for_collection(
        &self,
        collection: &str,
        slice_uri: &str,
        max_repos: Option<i32>,
    ) -> Result<Vec<String>, SyncError> {
        let url = format!(
            "{}/xrpc/com.atproto.sync.listReposByCollection",
            self.relay_endpoint
        );
        let mut all_repos = Vec::new();
        let mut cursor: Option<String> = None;
        let mut page_count = 0;

        // AT Protocol docs: default 500 repos/page, max 2000 repos/page
        // We use 1000 repos/page for efficiency (recommended for large DID lists)
        const REPOS_PER_PAGE: usize = 1000; // Our configured page size

        // Calculate max pages based on repo limit, with a reasonable safety margin
        let max_pages = if let Some(limit) = max_repos {
            // Add 20% safety margin and ensure at least 5 pages
            ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
        } else {
            25 // Default fallback for unlimited
        };

        loop {
            page_count += 1;
            if page_count > max_pages {
                warn!(
                    "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
                    max_pages,
                    collection,
                    max_repos,
                    max_pages * REPOS_PER_PAGE,
                    REPOS_PER_PAGE
                );
                break;
            }

            let mut query_params = vec![
                ("collection", collection.to_string()),
                ("limit", "1000".to_string()),
            ];
            if let Some(ref cursor_value) = cursor {
                query_params.push(("cursor", cursor_value.clone()));
            }

            let response = self.client.get(&url).query(&query_params).send().await?;

            if !response.status().is_success() {
                return Err(SyncError::ListRepos {
                    status: response.status().as_u16(),
                });
            }

            let repos_response: ListReposByCollectionResponse = response.json().await?;

            // Add repos from this page to our collection
            all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));

            // Check if there's a next page
            match repos_response.cursor {
                Some(next_cursor) if !next_cursor.is_empty() => {
                    cursor = Some(next_cursor);
                    // Log pagination progress if we have a logger
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Info,
                        &format!(
                            "Fetching next page of repositories for collection {}, total so far: {}",
                            collection,
                            all_repos.len()
                        ),
                        Some(json!({
                            "collection": collection,
                            "repos_count": all_repos.len(),
                            "has_more": true,
                            "page": page_count
                        })),
                    );
                }
                _ => break, // No more pages
            }
        }

        // Log final count
        self.log_with_context(
            slice_uri,
            LogLevel::Info,
            &format!(
                "Completed fetching repositories for collection {}, total: {}",
                collection,
                all_repos.len()
            ),
            Some(json!({
                "collection": collection,
                "total_repos": all_repos.len()
            })),
        );

        Ok(all_repos)
    }

    /// Fetch records for a repo/collection with retry logic.
    ///
    /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
    /// This handles cases where PDS URLs change.
    async fn fetch_records_for_repo_collection_with_atp_map(
        &self,
        repo: &str,
        collection: &str,
        atp_map: &std::collections::HashMap<String, AtpData>,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        let atp_data = atp_map
            .get(repo)
            .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;

        match self
            .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
            .await
        {
            Ok(records) => Ok(records),
            Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
                // 4xx/5xx error from PDS - try invalidating cache and retrying once
                debug!(
                    "PDS error {} for repo {}, attempting cache invalidation and retry",
                    status, repo
                );

                match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
                    .await
                {
                    Ok(fresh_actor_data) => {
                        debug!(
                            "Successfully re-resolved actor data for {}, retrying with PDS: {}",
                            repo, fresh_actor_data.pds
                        );
                        self.fetch_records_for_repo_collection(
                            repo,
                            collection,
                            &fresh_actor_data.pds,
                            slice_uri,
                        )
                        .await
                    }
                    Err(e) => {
                        debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
                        Err(SyncError::ListRecords { status }) // Return original error
                    }
                }
            }
            Err(e) => Err(e), // Other errors (network, etc.) - don't retry
        }
    }

    /// Fetch records for a specific repo and collection from its PDS.
    ///
    /// Only returns new or changed records (compared by CID).
    /// Uses cursor-based pagination to fetch all records.
871 /// 872 /// # Memory optimizations: 873 /// - Pre-allocated Vec with 100 capacity (typical collection size) 874 /// - Fetches in 100-record pages to limit response size 875 /// - Reuses HTTP connections via client pooling 876 async fn fetch_records_for_repo_collection( 877 &self, 878 repo: &str, 879 collection: &str, 880 pds_url: &str, 881 slice_uri: &str, 882 ) -> Result<Vec<Record>, SyncError> { 883 // Get existing record CIDs to skip unchanged records 884 let existing_cids = self 885 .database 886 .get_existing_record_cids_for_slice(repo, collection, slice_uri) 887 .await 888 .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?; 889 890 debug!( 891 "Found {} existing records for {}/{}", 892 existing_cids.len(), 893 repo, 894 collection 895 ); 896 897 // Pre-allocate based on typical collection size (100 records) 898 // This avoids Vec reallocations which can cause memory fragmentation 899 let mut records = Vec::with_capacity(100); 900 let mut cursor: Option<String> = None; 901 let mut fetched_count = 0; 902 let mut skipped_count = 0; 903 904 loop { 905 let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")]; 906 if let Some(ref c) = cursor { 907 params.push(("cursor", c)); 908 } 909 910 let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url); 911 let response = self.client.get(&request_url).query(&params).send().await; 912 913 let response = match response { 914 Ok(resp) => resp, 915 Err(e) => { 916 self.log_with_context(slice_uri, LogLevel::Error, 917 &format!("Failed to fetch records from {}: Network error: {}", repo, e), 918 Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "error": e.to_string()})) 919 ); 920 return Err(SyncError::from(e)); 921 } 922 }; 923 924 if !response.status().is_success() { 925 let status = response.status().as_u16(); 926 927 // HTTP 400/404 are expected when collections don't exist - log as info, not error 928 let (log_level, log_message) = if status == 400 || status == 404 { 929 ( 930 LogLevel::Info, 931 format!( 932 "Collection '{}' not found for {}: HTTP {}", 933 collection, repo, status 934 ), 935 ) 936 } else { 937 ( 938 LogLevel::Error, 939 format!( 940 "Failed to fetch records from {}: HTTP {} from PDS", 941 repo, status 942 ), 943 ) 944 }; 945 946 self.log_with_context(slice_uri, log_level, 947 &log_message, 948 Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "http_status": status})) 949 ); 950 return Err(SyncError::ListRecords { status }); 951 } 952 953 let list_response: ListRecordsResponse = response.json().await?; 954 955 for atproto_record in list_response.records { 956 // Check if we already have this record with the same CID 957 if let Some(existing_cid) = existing_cids.get(&atproto_record.uri) 958 && existing_cid == &atproto_record.cid 959 { 960 // Record unchanged, skip it 961 skipped_count += 1; 962 continue; 963 } 964 965 // Record is new or changed, include it 966 // TODO: Consider using Arc<str> for frequently cloned strings 967 let record = Record { 968 uri: atproto_record.uri, 969 cid: atproto_record.cid, 970 did: repo.to_string(), 971 collection: collection.to_string(), 972 json: atproto_record.value, 973 indexed_at: Utc::now(), 974 slice_uri: Some(slice_uri.to_string()), 975 }; 976 records.push(record); 977 fetched_count += 1; 978 } 979 980 cursor = list_response.cursor; 981 if cursor.is_none() { 982 break; 983 } 984 } 985 986 // Log results for this repo/collection 987 if fetched_count > 0 || skipped_count > 0 
            self.log_with_context(
                slice_uri,
                LogLevel::Info,
                &format!(
                    "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
                    fetched_count, skipped_count, repo, collection
                ),
                Some(json!({
                    "repo": repo,
                    "collection": collection,
                    "new_records": fetched_count,
                    "skipped_records": skipped_count,
                    "pds_url": pds_url
                })),
            );
        }

        if skipped_count > 0 {
            info!(
                "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
                skipped_count, fetched_count, repo, collection
            );
        }

        Ok(records)
    }

    /// Resolve ATP data (DID, PDS, handle) for multiple repos.
    ///
    /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
    ///
    /// # Performance optimizations
    ///
    /// - Processes DIDs in 50-item chunks to limit memory usage
    /// - 10 concurrent DNS resolutions max to avoid resolver exhaustion
    /// - Pre-allocated HashMap based on input size
    async fn get_atp_map_for_repos(
        &self,
        repos: &[String],
    ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
        let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
        const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
        const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions

        info!(
            "Resolving ATP data for {} repositories in chunks",
            repos.len()
        );

        for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());

            debug!(
                "Processing DID resolution chunk {}/{} (repos {}-{})",
                chunk_idx + 1,
                repos.len().div_ceil(CHUNK_SIZE),
                chunk_start,
                chunk_end - 1
            );

            // Process this chunk with limited concurrency
            let mut resolution_tasks = Vec::new();

            for batch in chunk.chunks(MAX_CONCURRENT) {
                let mut batch_futures = Vec::new();

                for repo in batch {
                    let repo_clone = repo.clone();
                    let self_clone = self.clone();

                    let fut = async move {
                        match self_clone.resolve_atp_data(&repo_clone).await {
                            Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
                            Err(e) => {
                                warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
                                None
                            }
                        }
                    };
                    batch_futures.push(fut);
                }

                // Wait for this batch to complete
                let batch_results = future::join_all(batch_futures).await;
                resolution_tasks.extend(batch_results);
            }

            // Add resolved data to map
            for (did, atp_data) in resolution_tasks.into_iter().flatten() {
                atp_map.insert(did, atp_data);
            }

            // Small delay between chunks to be kind to DNS resolvers
            if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        info!(
            "Successfully resolved ATP data for {}/{} repositories",
            atp_map.len(),
            repos.len()
        );
        Ok(atp_map)
    }

    /// Resolve ATP data for a single DID.
    ///
    /// Uses DID resolution to get the DID document, then extracts PDS and handle.
    /// Results are cached to avoid repeated lookups.
    async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
        debug!("Resolving ATP data for DID: {}", did);

        let dns_resolver = HickoryDnsResolver::create_resolver(&[]);

        match resolve_subject(&self.client, &dns_resolver, did).await {
            Ok(resolved_did) => {
                debug!("Successfully resolved subject: {}", resolved_did);

                let actor_data =
                    resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
                        .await
                        .map_err(|e| SyncError::Generic(e.to_string()))?;

                let atp_data = AtpData {
                    did: actor_data.did,
                    pds: actor_data.pds,
                    handle: actor_data.handle,
                };

                Ok(atp_data)
            }
            Err(e) => Err(SyncError::Generic(format!(
                "Failed to resolve subject for {}: {:?}",
                did, e
            ))),
        }
    }

    /// Index actors (DIDs with handles) into the database.
    ///
    /// Creates actor records for all repos being synced.
    async fn index_actors(
        &self,
        slice_uri: &str,
        repos: &[String],
        atp_map: &std::collections::HashMap<String, AtpData>,
    ) -> Result<(), SyncError> {
        let mut actors = Vec::new();
        let now = chrono::Utc::now().to_rfc3339();

        for repo in repos {
            if let Some(atp_data) = atp_map.get(repo) {
                actors.push(Actor {
                    did: atp_data.did.clone(),
                    handle: atp_data.handle.clone(),
                    slice_uri: slice_uri.to_string(),
                    indexed_at: now.clone(),
                });
            }
        }

        if !actors.is_empty() {
            self.database.batch_insert_actors(&actors).await?;
        }

        Ok(())
    }

    /// Get external collections for a slice.
    ///
    /// External collections are those that don't start with the slice's domain.
    /// For example, if the slice domain is "com.example", then "app.bsky.feed.post" is external.
    async fn get_external_collections_for_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<String>, SyncError> {
        // Get the slice's domain
        let domain = self
            .database
            .get_slice_domain(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
            .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;

        // Get all collections (lexicons) for this slice
        let collections = self
            .database
            .get_slice_collections_list(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;

        // Filter for external collections (those that don't start with the slice domain)
        let external_collections: Vec<String> = collections
            .into_iter()
            .filter(|collection| !collection.starts_with(&domain))
            .collect();

        info!(
            "Found {} external collections for slice {} (domain: {}): {:?}",
            external_collections.len(),
            slice_uri,
            domain,
            external_collections
        );

        Ok(external_collections)
    }

    /// Sync user's data for all external collections defined in the slice.
    ///
    /// Used during login flows to quickly sync a user's data.
    /// Automatically discovers which collections to sync based on slice configuration.
    /// Uses timeout protection to ensure responsive login flows.
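    ///
    /// A login-flow call might look like this sketch (placeholder DID, slice URI,
    /// and timeout; not taken from a real deployment):
    ///
    /// ```ignore
    /// let result = sync_service
    ///     .sync_user_collections(
    ///         "did:plc:example",
    ///         "at://did:plc:example/com.example.slice/self",
    ///         10, // seconds before giving up and suggesting an async job
    ///     )
    ///     .await?;
    /// if result.timed_out {
    ///     // Fall back to an asynchronous sync job for larger accounts
    /// }
    /// ```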
1202 /// 1203 /// # Arguments 1204 /// 1205 /// * `user_did` - The user's DID to sync 1206 /// * `slice_uri` - The slice to sync into 1207 /// * `timeout_secs` - Maximum seconds to wait before timing out 1208 /// 1209 /// # Returns 1210 /// 1211 /// Result with repos_processed, records_synced, and timeout status 1212 pub async fn sync_user_collections( 1213 &self, 1214 user_did: &str, 1215 slice_uri: &str, 1216 timeout_secs: u64, 1217 ) -> Result<SyncUserCollectionsResult, SyncError> { 1218 info!( 1219 "Auto-discovering external collections for user {} in slice {}", 1220 user_did, slice_uri 1221 ); 1222 1223 // Auto-discover external collections from slice configuration 1224 let external_collections = self.get_external_collections_for_slice(slice_uri).await?; 1225 1226 if external_collections.is_empty() { 1227 info!("No external collections found for slice {}", slice_uri); 1228 return Ok(SyncUserCollectionsResult { 1229 success: true, 1230 repos_processed: 0, 1231 records_synced: 0, 1232 timed_out: false, 1233 message: "No external collections to sync".to_string(), 1234 }); 1235 } 1236 1237 info!( 1238 "Syncing {} external collections for user {}: {:?}", 1239 external_collections.len(), 1240 user_did, 1241 external_collections 1242 ); 1243 1244 // Use backfill_collections with timeout, only syncing this specific user 1245 let sync_future = async { 1246 self.backfill_collections( 1247 slice_uri, 1248 None, // No primary collections for user sync 1249 Some(&external_collections), 1250 Some(&[user_did.to_string()]), // Only sync this user's repos 1251 false, // Always validate user collections 1252 None, // No limit for user-specific sync 1253 ) 1254 .await 1255 }; 1256 1257 match timeout(Duration::from_secs(timeout_secs), sync_future).await { 1258 Ok(result) => { 1259 let (repos_processed, records_synced) = result?; 1260 info!( 1261 "User sync completed within timeout: {} repos, {} records", 1262 repos_processed, records_synced 1263 ); 1264 Ok(SyncUserCollectionsResult { 1265 success: true, 1266 repos_processed, 1267 records_synced, 1268 timed_out: false, 1269 message: format!( 1270 "Sync completed: {} repos, {} records", 1271 repos_processed, records_synced 1272 ), 1273 }) 1274 } 1275 Err(_) => { 1276 // Timeout occurred - return partial success with guidance 1277 warn!( 1278 "Sync for user {} timed out after {}s, suggest using async job", 1279 user_did, timeout_secs 1280 ); 1281 Ok(SyncUserCollectionsResult { 1282 success: false, 1283 repos_processed: 0, 1284 records_synced: 0, 1285 timed_out: true, 1286 message: format!( 1287 "Sync timed out after {}s - use startSync endpoint for larger syncs", 1288 timeout_secs 1289 ), 1290 }) 1291 } 1292 } 1293 } 1294}