//! Bulk synchronization operations with the ATProto relay.
//!
//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
//! It provides:
//! - Memory-efficient batch processing with streaming writes
//! - Concurrent database operations using a channel-based architecture
//! - HTTP/2 connection pooling for optimal network utilization
//! - Rate-limited PDS requests (3 concurrent per server)
//! - DID resolution with caching and chunked processing
//! - Parallel record validation against Lexicon schemas
//! - Actor indexing with pre-allocated data structures

use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
use chrono::Utc;
use futures_util::future;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tracing::{debug, error, info, warn};

use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
use crate::cache::SliceCache;
use crate::database::Database;
use crate::errors::SyncError;
use crate::jobs::is_job_cancelled;
use crate::logging::LogLevel;
use crate::logging::Logger;
use crate::models::{Actor, Record};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

// =============================================================================
// ATProto API Response Types
// =============================================================================

/// Record returned from ATProto `com.atproto.repo.listRecords` endpoint.
#[derive(Debug, Deserialize)]
struct AtProtoRecord {
    uri: String,
    cid: String,
    value: Value,
}

/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListRecordsResponse {
    records: Vec<AtProtoRecord>,
    cursor: Option<String>,
}

/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListReposByCollectionResponse {
    repos: Vec<RepoRef>,
    cursor: Option<String>,
}

/// Repository reference from the relay (contains DID).
#[derive(Debug, Deserialize)]
struct RepoRef {
    did: String,
}

// =============================================================================
// Internal Data Structures
// =============================================================================

/// Resolved ATProto actor data (DID, PDS, handle).
#[derive(Debug, Clone)]
struct AtpData {
    did: String,
    pds: String,
    handle: Option<String>,
}

// =============================================================================
// Public API Types
// =============================================================================

/// Result from syncing user collections (used in login flows).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncUserCollectionsResult {
    pub success: bool,
    pub repos_processed: i64,
    pub records_synced: i64,
    pub timed_out: bool,
    pub message: String,
}

// =============================================================================
// Sync Service
// =============================================================================

/// Service for synchronizing ATProto data from the relay.
///
/// Handles bulk backfills, user syncs, rate limiting, and validation.
#[derive(Clone)]
pub struct SyncService {
    client: Client,
    database: Database,
    relay_endpoint: String,
    cache: Option<Arc<Mutex<SliceCache>>>,
    logger: Option<Logger>,
    job_id: Option<Uuid>,
    user_did: Option<String>,
}

impl SyncService {
    /// Create a new SyncService with cache (for use outside job contexts).
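    ///
    /// A minimal construction sketch (hedged: `Database::connect` and
    /// `SliceCache::new` are hypothetical stand-ins for whatever constructors
    /// this crate actually exposes):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use tokio::sync::Mutex;
    ///
    /// // Hypothetical constructors; substitute the real ones from this crate.
    /// let database = Database::connect("postgres://localhost/slices").await?;
    /// let cache = Arc::new(Mutex::new(SliceCache::new()));
    ///
    /// let sync = SyncService::with_cache(
    ///     database,
    ///     "https://relay.example.com".to_string(),
    ///     cache,
    /// );
    /// ```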
    pub fn with_cache(
        database: Database,
        relay_endpoint: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: None,
            job_id: None,
            user_did: None,
        }
    }

    /// Create a new SyncService with logging and cache enabled for a specific job.
    pub fn with_logging_and_cache(
        database: Database,
        relay_endpoint: String,
        logger: Logger,
        job_id: Uuid,
        user_did: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: Some(logger),
            job_id: Some(job_id),
            user_did: Some(user_did),
        }
    }

    /// Log a message with job context (job_id, user_did, slice_uri).
    /// Only logs if this service was created with logging enabled.
    fn log_with_context(
        &self,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
    ) {
        if let (Some(logger), Some(job_id), Some(user_did)) =
            (&self.logger, &self.job_id, &self.user_did)
        {
            logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata);
        }
    }

    /// Check if this job has been cancelled.
    ///
    /// Returns `Ok(())` if not cancelled, `Err(SyncError::Cancelled)` if cancelled.
    /// Only checks if this service has a job_id (created with logging enabled).
    async fn check_cancellation(&self) -> Result<(), SyncError> {
        if let Some(job_id) = self.job_id {
            let is_cancelled = is_job_cancelled(self.database.pool(), job_id)
                .await
                .map_err(|e| SyncError::DatabaseQuery(e.to_string()))?;

            if is_cancelled {
                info!("Job {} has been cancelled, stopping sync", job_id);
                return Err(SyncError::Cancelled);
            }
        }
        Ok(())
    }

    /// Backfill collections from the ATProto relay.
    ///
    /// This is the main entry point for bulk synchronization operations.
    ///
    /// # Arguments
    ///
    /// * `slice_uri` - The slice to backfill data into
    /// * `collections` - Primary collections (owned by the slice domain) to backfill
    /// * `external_collections` - External collections (not owned by the slice) to backfill
    /// * `repos` - Specific repos to sync (if `None`, fetches all repos for the collections)
    /// * `skip_validation` - Skip Lexicon validation (useful for testing)
    /// * `max_repos` - Optional cap on how many repositories to discover per collection
    ///
    /// # Returns
    ///
    /// Tuple of `(repos_processed, records_synced)`.
    ///
    /// # Performance Optimizations
    ///
    /// - Requests are grouped by PDS server with 3 concurrent requests max
    /// - Records are processed in 500-item batches to limit memory usage
    /// - Database writes happen concurrently via channels
    /// - HTTP/2 connection pooling reduces network overhead
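    ///
    /// # Example
    ///
    /// A hedged sketch of a full backfill for one primary collection, assuming
    /// `sync` is a `SyncService` and `slice_uri` is the slice's AT-URI; the
    /// collection NSID and repo cap are placeholders:
    ///
    /// ```ignore
    /// let (repos, records) = sync
    ///     .backfill_collections(
    ///         slice_uri,
    ///         Some(&["com.example.feed.post".to_string()]),
    ///         None,   // no external collections
    ///         None,   // discover repos from the relay
    ///         false,  // validate against Lexicon schemas
    ///         Some(10_000),
    ///     )
    ///     .await?;
    /// info!("backfilled {} records from {} repos", records, repos);
    /// ```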
    pub async fn backfill_collections(
        &self,
        slice_uri: &str,
        collections: Option<&[String]>,
        external_collections: Option<&[String]>,
        repos: Option<&[String]>,
        skip_validation: bool,
        max_repos: Option<i64>,
    ) -> Result<(i64, i64), SyncError> {
        info!("Starting backfill operation");

        let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default();
        let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default();

        if !primary_collections.is_empty() {
            info!(
                "Processing {} primary collections: {}",
                primary_collections.len(),
                primary_collections.join(", ")
            );
        }
        if !external_collections.is_empty() {
            info!(
                "Processing {} external collections: {}",
                external_collections.len(),
                external_collections.join(", ")
            );
        }

        if primary_collections.is_empty() && external_collections.is_empty() {
            info!("No collections specified for backfill");
            return Ok((0, 0));
        }

        let all_collections = [&primary_collections[..], &external_collections[..]].concat();

        // Fetch repos to process (either provided or discovered from collections)
        let all_repos = if let Some(provided_repos) = repos {
            info!("Using {} provided repositories", provided_repos.len());
            provided_repos.to_vec()
        } else {
            info!("Fetching repositories for collections...");
            let mut unique_repos = std::collections::HashSet::new();

            // First, get all repos from primary collections
            let mut primary_repos = std::collections::HashSet::new();
            for collection in &primary_collections {
                // Check for cancellation between collections
                self.check_cancellation().await?;

                match self
                    .get_repos_for_collection(collection, slice_uri, max_repos)
                    .await
                {
                    Ok(repos) => {
                        info!(
                            "Found {} repositories for primary collection \"{}\"",
                            repos.len(),
                            collection
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Info,
                            &format!(
                                "Found {} repositories for collection '{}'",
                                repos.len(),
                                collection
                            ),
                            Some(json!({"collection": collection, "repo_count": repos.len()})),
                        );
                        primary_repos.extend(repos);
                    }
                    Err(e) => {
                        error!(
                            "Failed to get repos for primary collection {}: {}",
                            collection, e
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Error,
                            &format!(
                                "Failed to fetch repositories for collection '{}': {}",
                                collection, e
                            ),
                            Some(json!({"collection": collection, "error": e.to_string()})),
                        );
                    }
                }
            }

            info!(
                "Found {} unique repositories from primary collections",
                primary_repos.len()
            );

            // Use primary repos for syncing (both primary and external collections)
            unique_repos.extend(primary_repos);

            let repos: Vec<String> = unique_repos.into_iter().collect();
            info!("Processing {} unique repositories", repos.len());
            repos
        };

        // Resolve DID -> PDS/handle mappings for all repos
        info!("Resolving ATP data for repositories...");
        let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
        info!(
            "Resolved ATP data for {}/{} repositories",
            atp_map.len(),
            all_repos.len()
        );

        // Check for cancellation after DID resolution (before spawning expensive fetch tasks)
        self.check_cancellation().await?;

        // Only sync repos that successfully resolved
        let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
        let failed_resolutions = all_repos.len() - valid_repos.len();
        if failed_resolutions > 0 {
            info!(
                "{} repositories failed DID resolution and will be skipped",
                failed_resolutions
            );
        }

        info!("Starting sync for {} repositories...", valid_repos.len());
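        // Worked example of the grouping below: if did:A and did:B both resolve
        // to pds1 and two collections are being synced, pds1 receives four
        // (repo, collection) requests, processed three at a time with a short
        // pause between chunks.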
        // Group requests by PDS server for rate limiting and connection reuse.
        // Pre-allocated capacity avoids HashMap resizing during insertions.
        let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
            std::collections::HashMap::with_capacity(atp_map.len());

        for repo in &valid_repos {
            if let Some(atp_data) = atp_map.get(repo) {
                let pds_url = atp_data.pds.clone();
                for collection in &all_collections {
                    requests_by_pds
                        .entry(pds_url.clone())
                        .or_default()
                        .push((repo.clone(), collection.clone()));
                }
            }
        }

        info!(
            "Fetching records with rate limiting: {} PDS servers, {} total requests",
            requests_by_pds.len(),
            requests_by_pds.values().map(|v| v.len()).sum::<usize>()
        );

        // Process each PDS server with limited concurrency.
        // 3 concurrent requests balances speed against memory usage:
        // fewer than 3 is too slow, more than 3 creates memory pressure.
        let mut fetch_tasks = Vec::new();
        const MAX_CONCURRENT_PER_PDS: usize = 3;

        for (_pds_url, repo_collections) in requests_by_pds {
            let sync_service = self.clone();
            let atp_map_clone = atp_map.clone();
            let slice_uri_clone = slice_uri.to_string();

            // Process this PDS server's requests in chunks
            let pds_task = tokio::spawn(async move {
                let mut pds_results = Vec::new();

                // Split requests into chunks and process with limited concurrency
                for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
                    let mut chunk_tasks = Vec::new();

                    for (repo, collection) in chunk {
                        let repo_clone = repo.clone();
                        let collection_clone = collection.clone();
                        let sync_service_clone = sync_service.clone();
                        let atp_map_inner = atp_map_clone.clone();
                        let slice_uri_inner = slice_uri_clone.clone();

                        let task = tokio::spawn(async move {
                            match sync_service_clone
                                .fetch_records_for_repo_collection_with_atp_map(
                                    &repo_clone,
                                    &collection_clone,
                                    &atp_map_inner,
                                    &slice_uri_inner,
                                )
                                .await
                            {
                                Ok(records) => Ok((repo_clone, collection_clone, records)),
                                Err(e) => {
                                    // Handle common "not an error" scenarios as empty results
                                    match &e {
                                        SyncError::ListRecords { status } => {
                                            if *status == 404 || *status == 400 {
                                                // Collection doesn't exist for this repo - return empty
                                                Ok((repo_clone, collection_clone, vec![]))
                                            } else {
                                                Err(e)
                                            }
                                        }
                                        SyncError::HttpRequest(_) => {
                                            // Network errors - treat as empty (like the TypeScript version)
                                            Ok((repo_clone, collection_clone, vec![]))
                                        }
                                        _ => Err(e),
                                    }
                                }
                            }
                        });
                        chunk_tasks.push(task);
                    }

                    // Wait for this chunk to complete before starting the next
                    for task in chunk_tasks {
                        if let Ok(result) = task.await {
                            pds_results.push(result);
                        }
                    }

                    // Small delay between chunks to be kind to PDS servers
                    // (skipped after the final, partially filled chunk)
                    if chunk.len() == MAX_CONCURRENT_PER_PDS {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }

                pds_results
            });

            fetch_tasks.push(pds_task);
        }

        // Collect all results
        let mut successful_tasks = 0;
        let mut failed_tasks = 0;

        // Get lexicons for this slice
        let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
            Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
            Ok(_) => {
                warn!("No lexicons found for slice {}", slice_uri);
                None
            }
            Err(e) => {
                warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                None
            }
        };

        // Index actors first (ensuring actor records exist before inserting records)
        info!("Indexing actors...");
        self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
        info!("Indexed {} actors", valid_repos.len());
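        // The writer below is the single consumer of a bounded mpsc channel:
        // fetch/validation work sends 500-record batches into the channel and
        // the writer task drains them into batch_insert_records, so network I/O
        // and database writes overlap instead of alternating.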
        // Set up a concurrent database writer using channels.
        // This allows fetching to continue while DB writes happen in parallel.
        // 500-record batches balance memory usage and DB transaction size.
        const BATCH_SIZE: usize = 500;
        let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4); // Small buffer; applies backpressure if the writer falls behind
        let database = self.database.clone();
        let total_indexed_records = Arc::new(Mutex::new(0i64));

        // Spawn database writer task
        let writer_task = tokio::spawn(async move {
            let mut write_count = 0i64;
            while let Some(batch) = rx.recv().await {
                let batch_size = batch.len() as i64;
                match database.batch_insert_records(&batch).await {
                    Ok(_) => {
                        write_count += batch_size;
                        info!(
                            "Database writer: Inserted batch of {} records (total: {})",
                            batch_size, write_count
                        );
                    }
                    Err(e) => {
                        error!("Database writer: Failed to insert batch: {}", e);
                        return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
                    }
                }
            }
            Ok(write_count)
        });

        // Process results from each PDS server
        let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);

        for pds_task in fetch_tasks {
            // Check for cancellation between processing PDS results
            self.check_cancellation().await?;

            match pds_task.await {
                Ok(pds_results) => {
                    // Process each result from this PDS
                    for result in pds_results {
                        match result {
                            Ok((repo, collection, records)) => {
                                let mut validated_records = Vec::new();
                                let total_records = records.len();

                                // Skip validation if requested
                                if skip_validation {
                                    validated_records = records;
                                    info!(
                                        "Validation skipped - accepting all {} records for collection {} from repo {}",
                                        total_records, collection, repo
                                    );
                                }
                                // Validate each record if we have lexicons
                                else if let Some(ref lexicons) = lexicons {
                                    let mut validation_errors = Vec::new();

                                    // Process validations in chunks for better CPU cache locality
                                    // (50 records per chunk)
                                    const VALIDATION_CHUNK_SIZE: usize = 50;
                                    for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
                                        // Check for cancellation between validation chunks
                                        self.check_cancellation().await?;

                                        for record in chunk {
                                            match slices_lexicon::validate_record(
                                                lexicons.clone(),
                                                &collection,
                                                record.json.clone(),
                                            ) {
                                                Ok(_) => {
                                                    validated_records.push(record.clone());
                                                }
                                                Err(e) => {
                                                    let error_msg = format!(
                                                        "Validation failed for record {} from {}: {}",
                                                        record.uri, repo, e
                                                    );
                                                    warn!("{}", error_msg);
                                                    validation_errors.push(json!({
                                                        "uri": record.uri,
                                                        "error": e.to_string()
                                                    }));

                                                    // Log individual validation failures
                                                    self.log_with_context(
                                                        slice_uri,
                                                        LogLevel::Warn,
                                                        &error_msg,
                                                        Some(json!({
                                                            "repo": repo,
                                                            "collection": collection,
                                                            "record_uri": record.uri,
                                                            "validation_error": e.to_string()
                                                        })),
                                                    );
                                                }
                                            }
                                        }
                                    }

                                    let valid_count = validated_records.len();
                                    let invalid_count = validation_errors.len();

                                    if invalid_count > 0 {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Warn,
                                            &format!(
                                                "Validation completed for {}/{}: {} valid, {} invalid records",
                                                repo, collection, valid_count, invalid_count
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count,
                                                "invalid_records": invalid_count,
                                                "validation_errors": validation_errors
                                            })),
                                        );
                                    } else {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Info,
                                            &format!(
                                                "All {} records validated successfully for {}/{}",
                                                valid_count, repo, collection
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count
                                            })),
                                        );
                                    }

                                    info!(
                                        "Validated {}/{} records for collection {} from repo {}",
                                        validated_records.len(),
                                        total_records,
                                        collection,
                                        repo
                                    );
                                } else {
                                    // No validator available, accept all records
                                    validated_records = records;
                                    self.log_with_context(
                                        slice_uri,
                                        LogLevel::Warn,
                                        &format!(
                                            "No lexicon validator available for collection {}",
                                            collection
                                        ),
                                        Some(json!({
                                            "collection": collection,
                                            "repo": repo,
                                            "accepted_records": total_records
                                        })),
                                    );
                                    warn!(
                                        "No lexicon validator available - accepting all records without validation for collection {}",
                                        collection
                                    );
                                }

                                // Add to the batch buffer instead of one large Vec
                                batch_buffer.extend(validated_records);
                                successful_tasks += 1;
                            }
                            Err(_) => {
                                failed_tasks += 1;
                            }
                        }
                    }
                }
                Err(_) => {
                    // PDS task failed - count all its requests as failed
                    failed_tasks += 1;
                }
            }

            // Send batch to writer when buffer is full
            if batch_buffer.len() >= BATCH_SIZE {
                let batch_to_send =
                    std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
                let batch_count = batch_to_send.len() as i64;

                info!(
                    "Sending batch of {} records to database writer",
                    batch_count
                );

                // Send to the writer channel (awaits if the writer is behind)
                if let Err(e) = tx.send(batch_to_send).await {
                    error!("Failed to send batch to writer: {}", e);
                    return Err(SyncError::Generic(format!(
                        "Failed to send batch to writer: {}",
                        e
                    )));
                }

                let mut total = total_indexed_records.lock().await;
                *total += batch_count;
            }
        }

        // Flush any remaining records in the buffer
        if !batch_buffer.is_empty() {
            let batch_count = batch_buffer.len() as i64;
            info!(
                "Sending final batch of {} records to database writer",
                batch_count
            );
            if let Err(e) = tx.send(batch_buffer).await {
                error!("Failed to send final batch to writer: {}", e);
                return Err(SyncError::Generic(format!(
                    "Failed to send final batch to writer: {}",
                    e
                )));
            }
            let mut total = total_indexed_records.lock().await;
            *total += batch_count;
        }

        // Close the channel and wait for the writer to finish
        drop(tx);
        let write_result = writer_task
            .await
            .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;
        let final_count = match write_result {
            Ok(count) => count,
            Err(e) => return Err(e),
        };

        info!(
            "Debug: {} successful tasks, {} failed tasks",
            successful_tasks, failed_tasks
        );
        info!("Indexed {} new/changed records in batches", final_count);
        info!("Backfill complete!");

        Ok((valid_repos.len() as i64, final_count))
    }

    /// Fetch all repositories that have records in a given collection.
    ///
    /// Uses cursor-based pagination to fetch all repos from the relay.
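    ///
    /// # Example
    ///
    /// A hedged call sketch, assuming `sync` is a `SyncService`; the
    /// collection NSID and repo cap are placeholders:
    ///
    /// ```ignore
    /// let dids = sync
    ///     .get_repos_for_collection("com.example.feed.post", slice_uri, Some(5_000))
    ///     .await?;
    /// info!("{} repos announce the collection", dids.len());
    /// ```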
    pub async fn get_repos_for_collection(
        &self,
        collection: &str,
        slice_uri: &str,
        max_repos: Option<i64>,
    ) -> Result<Vec<String>, SyncError> {
        let url = format!(
            "{}/xrpc/com.atproto.sync.listReposByCollection",
            self.relay_endpoint
        );

        let mut all_repos = Vec::new();
        let mut cursor: Option<String> = None;
        let mut page_count = 0;

        // AT Protocol docs: default 500 repos/page, max 2000 repos/page.
        // We use 1000 repos/page for efficiency (recommended for large DID lists).
        const REPOS_PER_PAGE: usize = 1000; // Our configured page size

        // Calculate max pages based on the repo limit, with a reasonable safety margin
        let max_pages = if let Some(limit) = max_repos {
            // Add a 20% safety margin and ensure at least 5 pages
            ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
        } else {
            25 // Default fallback for unlimited
        };

        loop {
            page_count += 1;
            if page_count > max_pages {
                warn!(
                    "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
                    max_pages,
                    collection,
                    max_repos,
                    max_pages * REPOS_PER_PAGE,
                    REPOS_PER_PAGE
                );
                break;
            }

            let mut query_params = vec![
                ("collection", collection.to_string()),
                ("limit", REPOS_PER_PAGE.to_string()),
            ];
            if let Some(ref cursor_value) = cursor {
                query_params.push(("cursor", cursor_value.clone()));
            }

            let response = self.client.get(&url).query(&query_params).send().await?;

            if !response.status().is_success() {
                return Err(SyncError::ListRepos {
                    status: response.status().as_u16(),
                });
            }

            let repos_response: ListReposByCollectionResponse = response.json().await?;

            // Add repos from this page to our collection
            all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));

            // Check if there's a next page
            match repos_response.cursor {
                Some(next_cursor) if !next_cursor.is_empty() => {
                    cursor = Some(next_cursor);

                    // Log pagination progress if we have a logger
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Info,
                        &format!(
                            "Fetching next page of repositories for collection {}, total so far: {}",
                            collection,
                            all_repos.len()
                        ),
                        Some(json!({
                            "collection": collection,
                            "repos_count": all_repos.len(),
                            "has_more": true,
                            "page": page_count
                        })),
                    );
                }
                _ => break, // No more pages
            }
        }

        // Log the final count
        self.log_with_context(
            slice_uri,
            LogLevel::Info,
            &format!(
                "Completed fetching repositories for collection {}, total: {}",
                collection,
                all_repos.len()
            ),
            Some(json!({
                "collection": collection,
                "total_repos": all_repos.len()
            })),
        );

        Ok(all_repos)
    }

    /// Fetch records for a repo/collection with retry logic.
    ///
    /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
    /// This handles cases where PDS URLs change.
    async fn fetch_records_for_repo_collection_with_atp_map(
        &self,
        repo: &str,
        collection: &str,
        atp_map: &std::collections::HashMap<String, AtpData>,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        let atp_data = atp_map
            .get(repo)
            .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;

        match self
            .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
            .await
        {
            Ok(records) => Ok(records),
            Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
                // 4xx/5xx error from the PDS - try invalidating the cache and retrying once
                debug!(
                    "PDS error {} for repo {}, attempting cache invalidation and retry",
                    status, repo
                );

                match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
                    .await
                {
                    Ok(fresh_actor_data) => {
                        debug!(
                            "Successfully re-resolved actor data for {}, retrying with PDS: {}",
                            repo, fresh_actor_data.pds
                        );
                        self.fetch_records_for_repo_collection(
                            repo,
                            collection,
                            &fresh_actor_data.pds,
                            slice_uri,
                        )
                        .await
                    }
                    Err(e) => {
                        debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
                        Err(SyncError::ListRecords { status }) // Return the original error
                    }
                }
            }
            Err(e) => Err(e), // Other errors (network, etc.) - don't retry
        }
    }

    /// Fetch records for a specific repo and collection from its PDS.
    ///
    /// Only returns new or changed records (compared by CID).
    /// Uses cursor-based pagination to fetch all records.
    ///
    /// # Memory optimizations
    ///
    /// - Pre-allocated Vec with capacity 100 (typical collection size)
    /// - Fetches in 100-record pages to limit response size
    /// - Reuses HTTP connections via client pooling
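    ///
    /// Change detection in miniature (`existing_cids` is keyed by record URI,
    /// as returned by `get_existing_record_cids_for_slice`):
    ///
    /// ```ignore
    /// if existing_cids.get(&record.uri) == Some(&record.cid) {
    ///     skipped_count += 1; // unchanged, skip
    /// } else {
    ///     records.push(record); // new or changed, keep
    /// }
    /// ```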
    async fn fetch_records_for_repo_collection(
        &self,
        repo: &str,
        collection: &str,
        pds_url: &str,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        // Get existing record CIDs so unchanged records can be skipped
        let existing_cids = self
            .database
            .get_existing_record_cids_for_slice(repo, collection, slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;

        debug!(
            "Found {} existing records for {}/{}",
            existing_cids.len(),
            repo,
            collection
        );

        // Pre-allocate based on a typical collection size (100 records).
        // This avoids Vec reallocations, which can cause memory fragmentation.
        let mut records = Vec::with_capacity(100);
        let mut cursor: Option<String> = None;
        let mut fetched_count = 0;
        let mut skipped_count = 0;

        loop {
            let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
            if let Some(ref c) = cursor {
                params.push(("cursor", c));
            }

            let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url);
            let response = self.client.get(&request_url).query(&params).send().await;

            let response = match response {
                Ok(resp) => resp,
                Err(e) => {
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Error,
                        &format!("Failed to fetch records from {}: Network error: {}", repo, e),
                        Some(json!({
                            "repo": repo,
                            "collection": collection,
                            "pds_url": pds_url,
                            "error": e.to_string()
                        })),
                    );
                    return Err(SyncError::from(e));
                }
            };

            if !response.status().is_success() {
                let status = response.status().as_u16();

                // HTTP 400/404 are expected when collections don't exist - log as info, not error
                let (log_level, log_message) = if status == 400 || status == 404 {
                    (
                        LogLevel::Info,
                        format!(
                            "Collection '{}' not found for {}: HTTP {}",
                            collection, repo, status
                        ),
                    )
                } else {
                    (
                        LogLevel::Error,
                        format!(
                            "Failed to fetch records from {}: HTTP {} from PDS",
                            repo, status
                        ),
                    )
                };

                self.log_with_context(
                    slice_uri,
                    log_level,
                    &log_message,
                    Some(json!({
                        "repo": repo,
                        "collection": collection,
                        "pds_url": pds_url,
                        "http_status": status
                    })),
                );
                return Err(SyncError::ListRecords { status });
            }

            let list_response: ListRecordsResponse = response.json().await?;

            for atproto_record in list_response.records {
                // Check if we already have this record with the same CID
                if let Some(existing_cid) = existing_cids.get(&atproto_record.uri)
                    && existing_cid == &atproto_record.cid
                {
                    // Record unchanged, skip it
                    skipped_count += 1;
                    continue;
                }

                // Record is new or changed, include it
                // TODO: Consider using Arc<str> for frequently cloned strings
                let record = Record {
                    uri: atproto_record.uri,
                    cid: atproto_record.cid,
                    did: repo.to_string(),
                    collection: collection.to_string(),
                    json: atproto_record.value,
                    indexed_at: Utc::now(),
                    slice_uri: Some(slice_uri.to_string()),
                };

                records.push(record);
                fetched_count += 1;
            }

            cursor = list_response.cursor;
            if cursor.is_none() {
                break;
            }
        }

        // Log results for this repo/collection
        if fetched_count > 0 || skipped_count > 0 {
            self.log_with_context(
                slice_uri,
                LogLevel::Info,
                &format!(
                    "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
                    fetched_count, skipped_count, repo, collection
                ),
                Some(json!({
                    "repo": repo,
                    "collection": collection,
                    "new_records": fetched_count,
                    "skipped_records": skipped_count,
                    "pds_url": pds_url
                })),
            );
        }

        if skipped_count > 0 {
            info!(
                "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
                skipped_count, fetched_count, repo, collection
            );
        }

        Ok(records)
    }

    /// Resolve ATP data (DID, PDS, handle) for multiple repos.
    ///
    /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
    ///
    /// # Performance optimizations
    ///
    /// - Processes DIDs in 50-item chunks to limit memory usage
    /// - At most 10 concurrent DNS resolutions to avoid resolver exhaustion
    /// - Pre-allocated HashMap based on input size
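    ///
    /// Shape of the loop below (sketch only; `resolve` stands in for
    /// `resolve_atp_data`):
    ///
    /// ```ignore
    /// for chunk in repos.chunks(50) {
    ///     for batch in chunk.chunks(10) {
    ///         // resolve up to 10 DIDs concurrently
    ///         let results = future::join_all(batch.iter().map(resolve)).await;
    ///         // keep successes, log failures
    ///     }
    ///     // brief pause before the next chunk to be kind to resolvers
    /// }
    /// ```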
    async fn get_atp_map_for_repos(
        &self,
        repos: &[String],
    ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
        let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
        const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
        const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions

        info!(
            "Resolving ATP data for {} repositories in chunks",
            repos.len()
        );

        for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());

            debug!(
                "Processing DID resolution chunk {}/{} (repos {}-{})",
                chunk_idx + 1,
                repos.len().div_ceil(CHUNK_SIZE),
                chunk_start,
                chunk_end - 1
            );

            // Process this chunk with limited concurrency
            let mut resolution_tasks = Vec::new();

            for batch in chunk.chunks(MAX_CONCURRENT) {
                let mut batch_futures = Vec::new();

                for repo in batch {
                    let repo_clone = repo.clone();
                    let self_clone = self.clone();

                    let fut = async move {
                        match self_clone.resolve_atp_data(&repo_clone).await {
                            Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
                            Err(e) => {
                                warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
                                None
                            }
                        }
                    };
                    batch_futures.push(fut);
                }

                // Wait for this batch to complete
                let batch_results = future::join_all(batch_futures).await;
                resolution_tasks.extend(batch_results);
            }

            // Add resolved data to the map
            for (did, atp_data) in resolution_tasks.into_iter().flatten() {
                atp_map.insert(did, atp_data);
            }

            // Small delay between chunks to be kind to DNS resolvers
            if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        info!(
            "Successfully resolved ATP data for {}/{} repositories",
            atp_map.len(),
            repos.len()
        );

        Ok(atp_map)
    }

    /// Resolve ATP data for a single DID.
    ///
    /// Uses DID resolution to get the DID document, then extracts PDS and handle.
    /// Results are cached to avoid repeated lookups.
    async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
        debug!("Resolving ATP data for DID: {}", did);

        let dns_resolver = HickoryDnsResolver::create_resolver(&[]);

        match resolve_subject(&self.client, &dns_resolver, did).await {
            Ok(resolved_did) => {
                debug!("Successfully resolved subject: {}", resolved_did);

                let actor_data =
                    resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
                        .await
                        .map_err(|e| SyncError::Generic(e.to_string()))?;

                let atp_data = AtpData {
                    did: actor_data.did,
                    pds: actor_data.pds,
                    handle: actor_data.handle,
                };

                Ok(atp_data)
            }
            Err(e) => Err(SyncError::Generic(format!(
                "Failed to resolve subject for {}: {:?}",
                did, e
            ))),
        }
    }

    /// Index actors (DIDs with handles) into the database.
    ///
    /// Creates actor records for all repos being synced.
    async fn index_actors(
        &self,
        slice_uri: &str,
        repos: &[String],
        atp_map: &std::collections::HashMap<String, AtpData>,
    ) -> Result<(), SyncError> {
        let mut actors = Vec::new();
        let now = chrono::Utc::now().to_rfc3339();

        for repo in repos {
            if let Some(atp_data) = atp_map.get(repo) {
                actors.push(Actor {
                    did: atp_data.did.clone(),
                    handle: atp_data.handle.clone(),
                    slice_uri: slice_uri.to_string(),
                    indexed_at: now.clone(),
                });
            }
        }

        if !actors.is_empty() {
            self.database.batch_insert_actors(&actors).await?;
        }

        Ok(())
    }

    /// Get external collections for a slice.
    ///
    /// External collections are those that don't start with the slice's domain.
    /// For example, if the slice domain is "com.example", then "app.bsky.feed.post" is external.
    async fn get_external_collections_for_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<String>, SyncError> {
        // Get the slice's domain
        let domain = self
            .database
            .get_slice_domain(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
            .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;

        // Get all collections (lexicons) for this slice
        let collections = self
            .database
            .get_slice_collections_list(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;

        // Filter for external collections (those that don't start with the slice domain)
        let external_collections: Vec<String> = collections
            .into_iter()
            .filter(|collection| !collection.starts_with(&domain))
            .collect();

        info!(
            "Found {} external collections for slice {} (domain: {}): {:?}",
            external_collections.len(),
            slice_uri,
            domain,
            external_collections
        );

        Ok(external_collections)
    }

    /// Sync a user's data for all external collections defined in the slice.
    ///
    /// Used during login flows to quickly sync a user's data.
    /// Automatically discovers which collections to sync based on the slice configuration.
    /// Uses timeout protection to keep login flows responsive.
    ///
    /// # Arguments
    ///
    /// * `user_did` - The user's DID to sync
    /// * `slice_uri` - The slice to sync into
    /// * `timeout_secs` - Maximum seconds to wait before timing out
    ///
    /// # Returns
    ///
    /// Result with `repos_processed`, `records_synced`, and timeout status.
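    ///
    /// # Example
    ///
    /// A hedged login-flow sketch, assuming `sync` is a `SyncService`; the DID
    /// and timeout are placeholders:
    ///
    /// ```ignore
    /// let result = sync.sync_user_collections("did:plc:example", slice_uri, 10).await?;
    /// if result.timed_out {
    ///     // Fall back to an async sync job for larger accounts
    /// }
    /// ```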
    pub async fn sync_user_collections(
        &self,
        user_did: &str,
        slice_uri: &str,
        timeout_secs: u64,
    ) -> Result<SyncUserCollectionsResult, SyncError> {
        info!(
            "Auto-discovering external collections for user {} in slice {}",
            user_did, slice_uri
        );

        // Auto-discover external collections from the slice configuration
        let external_collections = self.get_external_collections_for_slice(slice_uri).await?;

        if external_collections.is_empty() {
            info!("No external collections found for slice {}", slice_uri);
            return Ok(SyncUserCollectionsResult {
                success: true,
                repos_processed: 0,
                records_synced: 0,
                timed_out: false,
                message: "No external collections to sync".to_string(),
            });
        }

        info!(
            "Syncing {} external collections for user {}: {:?}",
            external_collections.len(),
            user_did,
            external_collections
        );

        // Use backfill_collections with a timeout, only syncing this specific user
        let sync_future = async {
            self.backfill_collections(
                slice_uri,
                None,                          // No primary collections for user sync
                Some(&external_collections),
                Some(&[user_did.to_string()]), // Only sync this user's repos
                false,                         // Always validate user collections
                None,                          // No limit for user-specific sync
            )
            .await
        };

        match timeout(Duration::from_secs(timeout_secs), sync_future).await {
            Ok(result) => {
                let (repos_processed, records_synced) = result?;
                info!(
                    "User sync completed within timeout: {} repos, {} records",
                    repos_processed, records_synced
                );
                Ok(SyncUserCollectionsResult {
                    success: true,
                    repos_processed,
                    records_synced,
                    timed_out: false,
                    message: format!(
                        "Sync completed: {} repos, {} records",
                        repos_processed, records_synced
                    ),
                })
            }
            Err(_) => {
                // Timeout occurred - return partial success with guidance
                warn!(
                    "Sync for user {} timed out after {}s, suggest using async job",
                    user_did, timeout_secs
                );
                Ok(SyncUserCollectionsResult {
                    success: false,
                    repos_processed: 0,
                    records_synced: 0,
                    timed_out: true,
                    message: format!(
                        "Sync timed out after {}s - use startSync endpoint for larger syncs",
                        timeout_secs
                    ),
                })
            }
        }
    }
}