1//! Bulk synchronization operations with the ATProto relay.
2//!
3//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
4//! It provides:
5//! - Memory-efficient batch processing with streaming writes
6//! - Concurrent database operations using channel-based architecture
7//! - HTTP/2 connection pooling for optimal network utilization
8//! - Rate-limited PDS requests (3 concurrent per server)
9//! - DID resolution with caching and chunked processing
10//! - Record validation against Lexicon schemas
11//! - Actor indexing with pre-allocated data structures
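//!
//! # Example
//!
//! A minimal usage sketch (hedged, not a doctest): the `Database` and `SliceCache`
//! constructors shown here are placeholders and may not match the real APIs in this
//! crate; collection names and URIs are illustrative.
//!
//! ```ignore
//! use std::sync::Arc;
//! use tokio::sync::Mutex;
//!
//! async fn run() -> Result<(), Box<dyn std::error::Error>> {
//!     let database = Database::connect_from_env().await?;        // assumed constructor
//!     let cache = Arc::new(Mutex::new(SliceCache::default()));   // assumed constructor
//!     let sync = SyncService::with_cache(
//!         database,
//!         "https://relay.example.com".to_string(),
//!         cache,
//!     );
//!
//!     // Backfill one primary collection into a slice, skipping validation.
//!     let collections = vec!["com.example.feed.post".to_string()];
//!     let (repos, records) = sync
//!         .backfill_collections(
//!             "at://did:plc:example/com.example.slice/self", // illustrative slice URI
//!             Some(collections.as_slice()),
//!             None,  // no external collections
//!             None,  // discover repos from the relay
//!             true,  // skip_validation
//!             Some(1_000),
//!         )
//!         .await?;
//!     println!("synced {records} records from {repos} repos");
//!     Ok(())
//! }
//! ```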
12
13use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
14use chrono::Utc;
15use futures_util::future;
16use reqwest::Client;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use tokio::sync::mpsc;
20use tokio::time::{Duration, timeout};
21use tracing::{debug, error, info, warn};
22
23use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
24use crate::cache::SliceCache;
25use crate::database::Database;
26use crate::errors::SyncError;
27use crate::logging::LogLevel;
28use crate::logging::Logger;
29use crate::models::{Actor, Record};
30use serde_json::json;
31use std::sync::Arc;
32use tokio::sync::Mutex;
33use uuid::Uuid;
34
35// =============================================================================
36// ATProto API Response Types
37// =============================================================================
38
39/// Record returned from ATProto `com.atproto.repo.listRecords` endpoint.
40#[derive(Debug, Deserialize)]
41struct AtProtoRecord {
42 uri: String,
43 cid: String,
44 value: Value,
45}
46
47/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
48#[derive(Debug, Deserialize)]
49struct ListRecordsResponse {
50 records: Vec<AtProtoRecord>,
51 cursor: Option<String>,
52}
53
54/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
55#[derive(Debug, Deserialize)]
56struct ListReposByCollectionResponse {
57 repos: Vec<RepoRef>,
58 cursor: Option<String>,
59}
60
61/// Repository reference from the relay (contains DID).
62#[derive(Debug, Deserialize)]
63struct RepoRef {
64 did: String,
65}
66
67// =============================================================================
68// Internal Data Structures
69// =============================================================================
70
71/// Resolved ATProto actor data (DID, PDS, handle).
72#[derive(Debug, Clone)]
73struct AtpData {
74 did: String,
75 pds: String,
76 handle: Option<String>,
77}
78
79// =============================================================================
80// Public API Types
81// =============================================================================
82
83/// Result from syncing user collections (used in login flows).
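///
/// Serializes with camelCase field names; an illustrative JSON shape (values are
/// examples only):
///
/// ```json
/// {
///   "success": true,
///   "reposProcessed": 1,
///   "recordsSynced": 42,
///   "timedOut": false,
///   "message": "Sync completed: 1 repos, 42 records"
/// }
/// ```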
84#[derive(Debug, Serialize)]
85#[serde(rename_all = "camelCase")]
86pub struct SyncUserCollectionsResult {
87 pub success: bool,
88 pub repos_processed: i64,
89 pub records_synced: i64,
90 pub timed_out: bool,
91 pub message: String,
92}
93
94// =============================================================================
95// Sync Service
96// =============================================================================
97
98/// Service for synchronizing ATProto data from the relay.
99///
100/// Handles bulk backfills, user syncs, rate limiting, and validation.
101#[derive(Clone)]
102pub struct SyncService {
103 client: Client,
104 database: Database,
105 relay_endpoint: String,
106 cache: Option<Arc<Mutex<SliceCache>>>,
107 logger: Option<Logger>,
108 job_id: Option<Uuid>,
109 user_did: Option<String>,
110}
111
112impl SyncService {
113 /// Create a new SyncService with cache (for use outside job contexts).
114 pub fn with_cache(
115 database: Database,
116 relay_endpoint: String,
117 cache: Arc<Mutex<SliceCache>>,
118 ) -> Self {
119 // Create HTTP client with connection pooling and optimized settings
120 let client = Client::builder()
121 .pool_idle_timeout(Duration::from_secs(90))
122 .pool_max_idle_per_host(10)
123 .http2_keep_alive_interval(Some(Duration::from_secs(30)))
124 .http2_keep_alive_timeout(Duration::from_secs(10))
125 .timeout(Duration::from_secs(30))
126 .build()
127 .unwrap_or_else(|_| Client::new());
128
129 Self {
130 client,
131 database,
132 relay_endpoint,
133 cache: Some(cache),
134 logger: None,
135 job_id: None,
136 user_did: None,
137 }
138 }
139
140 /// Create a new SyncService with logging and cache enabled for a specific job.
141 pub fn with_logging_and_cache(
142 database: Database,
143 relay_endpoint: String,
144 logger: Logger,
145 job_id: Uuid,
146 user_did: String,
147 cache: Arc<Mutex<SliceCache>>,
148 ) -> Self {
149 // Create HTTP client with connection pooling and optimized settings
150 let client = Client::builder()
151 .pool_idle_timeout(Duration::from_secs(90))
152 .pool_max_idle_per_host(10)
153 .http2_keep_alive_interval(Some(Duration::from_secs(30)))
154 .http2_keep_alive_timeout(Duration::from_secs(10))
155 .timeout(Duration::from_secs(30))
156 .build()
157 .unwrap_or_else(|_| Client::new());
158
159 Self {
160 client,
161 database,
162 relay_endpoint,
163 cache: Some(cache),
164 logger: Some(logger),
165 job_id: Some(job_id),
166 user_did: Some(user_did),
167 }
168 }
169
170 /// Log a message with job context (job_id, user_did, slice_uri).
171 /// Only logs if this service was created with logging enabled.
172 fn log_with_context(
173 &self,
174 slice_uri: &str,
175 level: LogLevel,
176 message: &str,
177 metadata: Option<serde_json::Value>,
178 ) {
179 if let (Some(logger), Some(job_id), Some(user_did)) =
180 (&self.logger, &self.job_id, &self.user_did)
181 {
182 logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata);
183 }
184 }
185
186 /// Backfill collections from the ATProto relay.
187 ///
188 /// This is the main entry point for bulk synchronization operations.
189 ///
190 /// # Arguments
191 ///
192 /// * `slice_uri` - The slice to backfill data into
193 /// * `collections` - Primary collections (owned by slice domain) to backfill
194 /// * `external_collections` - External collections (not owned by slice) to backfill
195 /// * `repos` - Specific repos to sync (if None, fetches all repos for collections)
196 /// * `skip_validation` - Skip Lexicon validation (useful for testing)
197 ///
198 /// # Returns
199 ///
200 /// Tuple of (repos_processed, records_synced)
201 ///
202 /// # Performance Optimizations
203 ///
204 /// - Requests are grouped by PDS server with 3 concurrent requests max
205 /// - Records are processed in 500-item batches to limit memory usage
206 /// - Database writes happen concurrently via channels
207 /// - HTTP/2 connection pooling reduces network overhead
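    ///
    /// # Example
    ///
    /// A hedged sketch (not a doctest) showing a targeted re-sync of two known
    /// repos with validation enabled; the DIDs, collection name, and slice URI
    /// are illustrative placeholders:
    ///
    /// ```ignore
    /// let collections = vec!["com.example.feed.post".to_string()];
    /// let repos = vec![
    ///     "did:plc:alice".to_string(),
    ///     "did:plc:bob".to_string(),
    /// ];
    /// let (repos_processed, records_synced) = sync
    ///     .backfill_collections(
    ///         "at://did:plc:example/com.example.slice/self",
    ///         Some(collections.as_slice()),
    ///         None,                    // no external collections
    ///         Some(repos.as_slice()),  // skip relay discovery
    ///         false,                   // validate against lexicons
    ///         None,                    // no repo cap needed
    ///     )
    ///     .await?;
    /// ```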
208 pub async fn backfill_collections(
209 &self,
210 slice_uri: &str,
211 collections: Option<&[String]>,
212 external_collections: Option<&[String]>,
213 repos: Option<&[String]>,
214 skip_validation: bool,
215 max_repos: Option<i32>,
216 ) -> Result<(i64, i64), SyncError> {
217 info!("Starting backfill operation");
218
219 let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default();
220 let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default();
221
222 if !primary_collections.is_empty() {
223 info!(
224 "Processing {} primary collections: {}",
225 primary_collections.len(),
226 primary_collections.join(", ")
227 );
228 }
229
230 if !external_collections.is_empty() {
231 info!(
232 "Processing {} external collections: {}",
233 external_collections.len(),
234 external_collections.join(", ")
235 );
236 }
237
238 if primary_collections.is_empty() && external_collections.is_empty() {
239 info!("No collections specified for backfill");
240 return Ok((0, 0));
241 }
242
243 let all_collections = [&primary_collections[..], &external_collections[..]].concat();
244
245 // Fetch repos to process (either provided or discovered from collections)
246 let all_repos = if let Some(provided_repos) = repos {
247 info!("Using {} provided repositories", provided_repos.len());
248 provided_repos.to_vec()
249 } else {
250 info!("Fetching repositories for collections...");
251 let mut unique_repos = std::collections::HashSet::new();
252
253 // First, get all repos from primary collections
254 let mut primary_repos = std::collections::HashSet::new();
255 for collection in &primary_collections {
256 match self.get_repos_for_collection(collection, slice_uri, max_repos).await {
257 Ok(repos) => {
258 info!(
259 "Found {} repositories for primary collection \"{}\"",
260 repos.len(),
261 collection
262 );
263 self.log_with_context(
264 slice_uri,
265 LogLevel::Info,
266 &format!(
267 "Found {} repositories for collection '{}'",
268 repos.len(),
269 collection
270 ),
271 Some(json!({"collection": collection, "repo_count": repos.len()})),
272 );
273 primary_repos.extend(repos);
274 }
275 Err(e) => {
276 error!(
277 "Failed to get repos for primary collection {}: {}",
278 collection, e
279 );
280 self.log_with_context(
281 slice_uri,
282 LogLevel::Error,
283 &format!(
284 "Failed to fetch repositories for collection '{}': {}",
285 collection, e
286 ),
287 Some(json!({"collection": collection, "error": e.to_string()})),
288 );
289 }
290 }
291 }
292
293 info!(
294 "Found {} unique repositories from primary collections",
295 primary_repos.len()
296 );
297
298 // Use primary repos for syncing (both primary and external collections)
299 unique_repos.extend(primary_repos);
300
301 let repos: Vec<String> = unique_repos.into_iter().collect();
302 info!("Processing {} unique repositories", repos.len());
303 repos
304 };
305
306 // Resolve DID -> PDS/handle mappings for all repos
307 info!("Resolving ATP data for repositories...");
308 let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
309 info!(
310 "Resolved ATP data for {}/{} repositories",
311 atp_map.len(),
312 all_repos.len()
313 );
314
315 // Only sync repos that successfully resolved
316 let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
317 let failed_resolutions = all_repos.len() - valid_repos.len();
318
319 if failed_resolutions > 0 {
320 info!(
321 "{} repositories failed DID resolution and will be skipped",
322 failed_resolutions
323 );
324 }
325
326 info!("Starting sync for {} repositories...", valid_repos.len());
327
328 // Group requests by PDS server for rate limiting and connection reuse
329 // Pre-allocated capacity avoids HashMap resizing during insertions
330 let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
331 std::collections::HashMap::with_capacity(atp_map.len());
332
333 for repo in &valid_repos {
334 if let Some(atp_data) = atp_map.get(repo) {
335 let pds_url = atp_data.pds.clone();
336 for collection in &all_collections {
337 requests_by_pds
338 .entry(pds_url.clone())
339 .or_default()
340 .push((repo.clone(), collection.clone()));
341 }
342 }
343 }
344
345 info!(
346 "Fetching records with rate limiting: {} PDS servers, {} total requests",
347 requests_by_pds.len(),
348 requests_by_pds.values().map(|v| v.len()).sum::<usize>()
349 );
350
351 // Process each PDS server with limited concurrency
352 // 3 concurrent requests per PDS balances throughput against memory usage:
353 // fewer is unnecessarily slow, more creates memory pressure
354 let mut fetch_tasks = Vec::new();
355 const MAX_CONCURRENT_PER_PDS: usize = 3;
356
357 for (_pds_url, repo_collections) in requests_by_pds {
358 let sync_service = self.clone();
359 let atp_map_clone = atp_map.clone();
360 let slice_uri_clone = slice_uri.to_string();
361
362 // Process this PDS server's requests in chunks
363 let pds_task = tokio::spawn(async move {
364 let mut pds_results = Vec::new();
365
366 // Split requests into chunks and process with limited concurrency
367 for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
368 let mut chunk_tasks = Vec::new();
369
370 for (repo, collection) in chunk {
371 let repo_clone = repo.clone();
372 let collection_clone = collection.clone();
373 let sync_service_clone = sync_service.clone();
374 let atp_map_inner = atp_map_clone.clone();
375 let slice_uri_inner = slice_uri_clone.clone();
376
377 let task = tokio::spawn(async move {
378 match sync_service_clone
379 .fetch_records_for_repo_collection_with_atp_map(
380 &repo_clone,
381 &collection_clone,
382 &atp_map_inner,
383 &slice_uri_inner,
384 )
385 .await
386 {
387 Ok(records) => Ok((repo_clone, collection_clone, records)),
388 Err(e) => {
389 // Treat common benign failures (missing collection, transient network error) as empty results
390 match &e {
391 SyncError::ListRecords { status } => {
392 if *status == 404 || *status == 400 {
393 // Collection doesn't exist for this repo - return empty
394 Ok((repo_clone, collection_clone, vec![]))
395 } else {
396 Err(e)
397 }
398 }
399 SyncError::HttpRequest(_) => {
400 // Network errors - treat as empty (like TypeScript version)
401 Ok((repo_clone, collection_clone, vec![]))
402 }
403 _ => Err(e),
404 }
405 }
406 }
407 });
408 chunk_tasks.push(task);
409 }
410
411 // Wait for this chunk to complete before starting the next
412 for task in chunk_tasks {
413 if let Ok(result) = task.await {
414 pds_results.push(result);
415 }
416 }
417
418 // Small delay between chunks to be kind to PDS servers
419 if chunk.len() == MAX_CONCURRENT_PER_PDS {
420 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
421 }
422 }
423
424 pds_results
425 });
426
427 fetch_tasks.push(pds_task);
428 }
429
430 // Collect all results
431 let mut successful_tasks = 0;
432 let mut failed_tasks = 0;
433
434 // Get lexicons for this slice
435 let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
436 Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
437 Ok(_) => {
438 warn!("No lexicons found for slice {}", slice_uri);
439 None
440 }
441 Err(e) => {
442 warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
443 None
444 }
445 };
446
447 // Index actors first (ensuring actor records exist before inserting records)
448 info!("Indexing actors...");
449 self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
450 info!("Indexed {} actors", valid_repos.len());
451
452 // Set up concurrent database writer using channels
453 // This allows fetching to continue while DB writes happen in parallel
454 // 500-record batches optimize for memory usage and DB transaction size
455 const BATCH_SIZE: usize = 500;
456 let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4); // Bounded buffer applies backpressure if the writer falls behind
457 let database = self.database.clone();
458 let total_indexed_records = Arc::new(Mutex::new(0i64));
459
460 // Spawn database writer task
461 let writer_task = tokio::spawn(async move {
462 let mut write_count = 0i64;
463 while let Some(batch) = rx.recv().await {
464 let batch_size = batch.len() as i64;
465 match database.batch_insert_records(&batch).await {
466 Ok(_) => {
467 write_count += batch_size;
468 info!("Database writer: Inserted batch of {} records (total: {})", batch_size, write_count);
469 }
470 Err(e) => {
471 error!("Database writer: Failed to insert batch: {}", e);
472 return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
473 }
474 }
475 }
476 Ok(write_count)
477 });
478
479 // Process results from each PDS server
480 let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);
481
482 for pds_task in fetch_tasks {
483 match pds_task.await {
484 Ok(pds_results) => {
485 // Process each result from this PDS
486 for result in pds_results {
487 match result {
488 Ok((repo, collection, records)) => {
489 let mut validated_records = Vec::new();
490 let total_records = records.len();
491
492 // Skip validation if requested
493 if skip_validation {
494 validated_records = records;
495 info!(
496 "Validation skipped - accepting all {} records for collection {} from repo {}",
497 total_records, collection, repo
498 );
499 }
500 // Validate each record if we have lexicons
501 else if let Some(ref lexicons) = lexicons {
502 let mut validation_errors = Vec::new();
503
504 // Validate records in fixed-size chunks; validation runs sequentially
505 // within each fetched batch
506 const VALIDATION_CHUNK_SIZE: usize = 50;
507 for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
508 for record in chunk {
509 match slices_lexicon::validate_record(
510 lexicons.clone(),
511 &collection,
512 record.json.clone(),
513 ) {
514 Ok(_) => {
515 validated_records.push(record.clone());
516 }
517 Err(e) => {
518 let error_msg = format!(
519 "Validation failed for record {} from {}: {}",
520 record.uri, repo, e
521 );
522 warn!("{}", error_msg);
523 validation_errors.push(json!({
524 "uri": record.uri,
525 "error": e.to_string()
526 }));
527
528 // Log individual validation failures
529 self.log_with_context(
530 slice_uri,
531 LogLevel::Warn,
532 &error_msg,
533 Some(json!({
534 "repo": repo,
535 "collection": collection,
536 "record_uri": record.uri,
537 "validation_error": e.to_string()
538 })),
539 );
540 }
541 }
542 }
543 }
544
545 let valid_count = validated_records.len();
546 let invalid_count = validation_errors.len();
547
548 if invalid_count > 0 {
549 self.log_with_context(slice_uri, LogLevel::Warn,
550 &format!("Validation completed for {}/{}: {} valid, {} invalid records",
551 repo, collection, valid_count, invalid_count),
552 Some(json!({
553 "repo": repo,
554 "collection": collection,
555 "valid_records": valid_count,
556 "invalid_records": invalid_count,
557 "validation_errors": validation_errors
558 }))
559 );
560 } else {
561 self.log_with_context(
562 slice_uri,
563 LogLevel::Info,
564 &format!(
565 "All {} records validated successfully for {}/{}",
566 valid_count, repo, collection
567 ),
568 Some(json!({
569 "repo": repo,
570 "collection": collection,
571 "valid_records": valid_count
572 })),
573 );
574 }
575
576 info!(
577 "Validated {}/{} records for collection {} from repo {}",
578 validated_records.len(),
579 total_records,
580 collection,
581 repo
582 );
583 } else {
584 // No validator available, accept all records
585 validated_records = records;
586 self.log_with_context(slice_uri, LogLevel::Warn,
587 &format!("No lexicon validator available for collection {}", collection),
588 Some(json!({"collection": collection, "repo": repo, "accepted_records": total_records}))
589 );
590 warn!(
591 "No lexicon validator available - accepting all records without validation for collection {}",
592 collection
593 );
594 }
595
596 // Add to batch buffer instead of all_records
597 batch_buffer.extend(validated_records);
598 successful_tasks += 1;
599 }
600 Err(_) => {
601 failed_tasks += 1;
602 }
603 }
604 }
605 }
606 Err(_) => {
607 // PDS task panicked or was cancelled - count it as a failed task
608 failed_tasks += 1;
609 }
610 }
611
612 // Send batch to writer when buffer is full
613 if batch_buffer.len() >= BATCH_SIZE {
614 let batch_to_send = std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
615 let batch_count = batch_to_send.len() as i64;
616 info!("Sending batch of {} records to database writer", batch_count);
617
618 // Send to writer channel (non-blocking)
619 if let Err(e) = tx.send(batch_to_send).await {
620 error!("Failed to send batch to writer: {}", e);
621 return Err(SyncError::Generic(format!("Failed to send batch to writer: {}", e)));
622 }
623
624 let mut total = total_indexed_records.lock().await;
625 *total += batch_count;
626 }
627 }
628
629 // Flush any remaining records in the buffer
630 if !batch_buffer.is_empty() {
631 let batch_count = batch_buffer.len() as i64;
632 info!("Sending final batch of {} records to database writer", batch_count);
633
634 if let Err(e) = tx.send(batch_buffer).await {
635 error!("Failed to send final batch to writer: {}", e);
636 return Err(SyncError::Generic(format!("Failed to send final batch to writer: {}", e)));
637 }
638
639 let mut total = total_indexed_records.lock().await;
640 *total += batch_count;
641 }
642
643 // Close the channel and wait for writer to finish
644 drop(tx);
645 let write_result = writer_task.await
646 .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;
647
648 let final_count = match write_result {
649 Ok(count) => count,
650 Err(e) => return Err(e),
651 };
652
653 info!(
654 "Fetch tasks complete: {} succeeded, {} failed",
655 successful_tasks, failed_tasks
656 );
657
658 info!(
659 "Indexed {} new/changed records in batches",
660 final_count
661 );
662
663 info!("Backfill complete!");
664
665 Ok((valid_repos.len() as i64, final_count))
666 }
667
668 /// Fetch all repositories that have records in a given collection.
669 ///
670 /// Uses cursor-based pagination to fetch all repos from the relay.
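    ///
    /// # Example
    ///
    /// A hedged sketch (not a doctest); the collection NSID and slice URI are
    /// illustrative:
    ///
    /// ```ignore
    /// let dids = sync
    ///     .get_repos_for_collection(
    ///         "com.example.feed.post",
    ///         "at://did:plc:example/com.example.slice/self",
    ///         Some(5_000), // caps pagination, not an exact repo limit
    ///     )
    ///     .await?;
    /// println!("{} repos advertise this collection", dids.len());
    /// ```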
671 pub async fn get_repos_for_collection(
672 &self,
673 collection: &str,
674 slice_uri: &str,
675 max_repos: Option<i32>,
676 ) -> Result<Vec<String>, SyncError> {
677 let url = format!(
678 "{}/xrpc/com.atproto.sync.listReposByCollection",
679 self.relay_endpoint
680 );
681 let mut all_repos = Vec::new();
682 let mut cursor: Option<String> = None;
683 let mut page_count = 0;
684
685 // AT Protocol docs: default 500 repos/page, max 2000 repos/page
686 // We use 1000 repos/page for efficiency (recommended for large DID lists)
687 const REPOS_PER_PAGE: usize = 1000; // Our configured page size
688
689 // Calculate max pages based on repo limit, with a reasonable safety margin
690 let max_pages = if let Some(limit) = max_repos {
691 // Add 20% safety margin and ensure at least 5 pages
692 ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
693 } else {
694 25 // Default fallback for unlimited
695 };
696
697 loop {
698 page_count += 1;
699 if page_count > max_pages {
700 warn!(
701 "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
702 max_pages, collection, max_repos, max_pages * REPOS_PER_PAGE, REPOS_PER_PAGE
703 );
704 break;
705 }
706
707 let mut query_params = vec![
708 ("collection", collection.to_string()),
709 ("limit", "1000".to_string()),
710 ];
711 if let Some(ref cursor_value) = cursor {
712 query_params.push(("cursor", cursor_value.clone()));
713 }
714
715 let response = self.client.get(&url).query(&query_params).send().await?;
716
717 if !response.status().is_success() {
718 return Err(SyncError::ListRepos {
719 status: response.status().as_u16(),
720 });
721 }
722
723 let repos_response: ListReposByCollectionResponse = response.json().await?;
724
725 // Add repos from this page to our collection
726 all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));
727
728 // Check if there's a next page
729 match repos_response.cursor {
730 Some(next_cursor) if !next_cursor.is_empty() => {
731 cursor = Some(next_cursor);
732 // Log pagination progress if we have a logger
733 self.log_with_context(slice_uri, LogLevel::Info,
734 &format!("Fetching next page of repositories for collection {}, total so far: {}", collection, all_repos.len()),
735 Some(json!({
736 "collection": collection,
737 "repos_count": all_repos.len(),
738 "has_more": true,
739 "page": page_count
740 }))
741 );
742 }
743 _ => break, // No more pages
744 }
745 }
746
747 // Log final count
748 self.log_with_context(
749 slice_uri,
750 LogLevel::Info,
751 &format!(
752 "Completed fetching repositories for collection {}, total: {}",
753 collection,
754 all_repos.len()
755 ),
756 Some(json!({
757 "collection": collection,
758 "total_repos": all_repos.len()
759 })),
760 );
761
762 Ok(all_repos)
763 }
764
765 /// Fetch records for a repo/collection with retry logic.
766 ///
767 /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
768 /// This handles cases where PDS URLs change.
769 async fn fetch_records_for_repo_collection_with_atp_map(
770 &self,
771 repo: &str,
772 collection: &str,
773 atp_map: &std::collections::HashMap<String, AtpData>,
774 slice_uri: &str,
775 ) -> Result<Vec<Record>, SyncError> {
776 let atp_data = atp_map
777 .get(repo)
778 .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;
779
780 match self
781 .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
782 .await
783 {
784 Ok(records) => Ok(records),
785 Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
786 // 4xx/5xx error from PDS - try invalidating cache and retrying once
787 debug!(
788 "PDS error {} for repo {}, attempting cache invalidation and retry",
789 status, repo
790 );
791
792 match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
793 .await
794 {
795 Ok(fresh_actor_data) => {
796 debug!(
797 "Successfully re-resolved actor data for {}, retrying with PDS: {}",
798 repo, fresh_actor_data.pds
799 );
800 self.fetch_records_for_repo_collection(
801 repo,
802 collection,
803 &fresh_actor_data.pds,
804 slice_uri,
805 )
806 .await
807 }
808 Err(e) => {
809 debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
810 Err(SyncError::ListRecords { status }) // Return original error
811 }
812 }
813 }
814 Err(e) => Err(e), // Other errors (network, etc.) - don't retry
815 }
816 }
817
818 /// Fetch records for a specific repo and collection from its PDS.
819 ///
820 /// Only returns new or changed records (compared by CID).
821 /// Uses cursor-based pagination to fetch all records.
822 ///
823 /// # Memory optimizations:
824 /// - Pre-allocated Vec with 100 capacity (typical collection size)
825 /// - Fetches in 100-record pages to limit response size
826 /// - Reuses HTTP connections via client pooling
827 async fn fetch_records_for_repo_collection(
828 &self,
829 repo: &str,
830 collection: &str,
831 pds_url: &str,
832 slice_uri: &str,
833 ) -> Result<Vec<Record>, SyncError> {
834 // Get existing record CIDs to skip unchanged records
835 let existing_cids = self
836 .database
837 .get_existing_record_cids_for_slice(repo, collection, slice_uri)
838 .await
839 .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;
840
841 debug!(
842 "Found {} existing records for {}/{}",
843 existing_cids.len(),
844 repo,
845 collection
846 );
847
848 // Pre-allocate based on typical collection size (100 records)
849 // This avoids Vec reallocations which can cause memory fragmentation
850 let mut records = Vec::with_capacity(100);
851 let mut cursor: Option<String> = None;
852 let mut fetched_count = 0;
853 let mut skipped_count = 0;
854
855 loop {
856 let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
857 if let Some(ref c) = cursor {
858 params.push(("cursor", c));
859 }
860
861 let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url);
862 let response = self.client.get(&request_url).query(&params).send().await;
863
864 let response = match response {
865 Ok(resp) => resp,
866 Err(e) => {
867 self.log_with_context(slice_uri, LogLevel::Error,
868 &format!("Failed to fetch records from {}: Network error: {}", repo, e),
869 Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "error": e.to_string()}))
870 );
871 return Err(SyncError::from(e));
872 }
873 };
874
875 if !response.status().is_success() {
876 let status = response.status().as_u16();
877
878 // HTTP 400/404 are expected when collections don't exist - log as info, not error
879 let (log_level, log_message) = if status == 400 || status == 404 {
880 (
881 LogLevel::Info,
882 format!(
883 "Collection '{}' not found for {}: HTTP {}",
884 collection, repo, status
885 ),
886 )
887 } else {
888 (
889 LogLevel::Error,
890 format!(
891 "Failed to fetch records from {}: HTTP {} from PDS",
892 repo, status
893 ),
894 )
895 };
896
897 self.log_with_context(slice_uri, log_level,
898 &log_message,
899 Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "http_status": status}))
900 );
901 return Err(SyncError::ListRecords { status });
902 }
903
904 let list_response: ListRecordsResponse = response.json().await?;
905
906 for atproto_record in list_response.records {
907 // Check if we already have this record with the same CID
908 if let Some(existing_cid) = existing_cids.get(&atproto_record.uri)
909 && existing_cid == &atproto_record.cid
910 {
911 // Record unchanged, skip it
912 skipped_count += 1;
913 continue;
914 }
915
916 // Record is new or changed, include it
917 // TODO: Consider using Arc<str> for frequently cloned strings
918 let record = Record {
919 uri: atproto_record.uri,
920 cid: atproto_record.cid,
921 did: repo.to_string(),
922 collection: collection.to_string(),
923 json: atproto_record.value,
924 indexed_at: Utc::now(),
925 slice_uri: Some(slice_uri.to_string()),
926 };
927 records.push(record);
928 fetched_count += 1;
929 }
930
931 cursor = list_response.cursor;
932 if cursor.is_none() {
933 break;
934 }
935 }
936
937 // Log results for this repo/collection
938 if fetched_count > 0 || skipped_count > 0 {
939 self.log_with_context(
940 slice_uri,
941 LogLevel::Info,
942 &format!(
943 "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
944 fetched_count, skipped_count, repo, collection
945 ),
946 Some(json!({
947 "repo": repo,
948 "collection": collection,
949 "new_records": fetched_count,
950 "skipped_records": skipped_count,
951 "pds_url": pds_url
952 })),
953 );
954 }
955
956 if skipped_count > 0 {
957 info!(
958 "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
959 skipped_count, fetched_count, repo, collection
960 );
961 }
962
963 Ok(records)
964 }
965
966 /// Resolve ATP data (DID, PDS, handle) for multiple repos.
967 ///
968 /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
969 ///
970 /// # Performance optimizations:
971 /// - Processes DIDs in 50-item chunks to limit memory usage
972 /// - 10 concurrent DNS resolutions max to avoid resolver exhaustion
973 /// - Pre-allocated HashMap based on input size
974 async fn get_atp_map_for_repos(
975 &self,
976 repos: &[String],
977 ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
978 let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
979 const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
980 const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions
981
982 info!("Resolving ATP data for {} repositories in chunks", repos.len());
983
984 for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
985 let chunk_start = chunk_idx * CHUNK_SIZE;
986 let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());
987
988 debug!(
989 "Processing DID resolution chunk {}/{} (repos {}-{})",
990 chunk_idx + 1,
991 repos.len().div_ceil(CHUNK_SIZE),
992 chunk_start,
993 chunk_end - 1
994 );
995
996 // Process this chunk with limited concurrency
997 let mut resolution_tasks = Vec::new();
998
999 for batch in chunk.chunks(MAX_CONCURRENT) {
1000 let mut batch_futures = Vec::new();
1001
1002 for repo in batch {
1003 let repo_clone = repo.clone();
1004 let self_clone = self.clone();
1005
1006 let fut = async move {
1007 match self_clone.resolve_atp_data(&repo_clone).await {
1008 Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
1009 Err(e) => {
1010 warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
1011 None
1012 }
1013 }
1014 };
1015 batch_futures.push(fut);
1016 }
1017
1018 // Wait for this batch to complete
1019 let batch_results = future::join_all(batch_futures).await;
1020 resolution_tasks.extend(batch_results);
1021 }
1022
1023 // Add resolved data to map
1024 for (did, atp_data) in resolution_tasks.into_iter().flatten() {
1025 atp_map.insert(did, atp_data);
1026 }
1027
1028 // Small delay between chunks to be kind to DNS resolvers
1029 if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
1030 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1031 }
1032 }
1033
1034 info!("Successfully resolved ATP data for {}/{} repositories", atp_map.len(), repos.len());
1035 Ok(atp_map)
1036 }
1037
1038 /// Resolve ATP data for a single DID.
1039 ///
1040 /// Uses DID resolution to get the DID document, then extracts PDS and handle.
1041 /// Results are cached to avoid repeated lookups.
1042 async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
1043 debug!("Resolving ATP data for DID: {}", did);
1044
1045 let dns_resolver = HickoryDnsResolver::create_resolver(&[]);
1046
1047 match resolve_subject(&self.client, &dns_resolver, did).await {
1048 Ok(resolved_did) => {
1049 debug!("Successfully resolved subject: {}", resolved_did);
1050
1051 let actor_data =
1052 resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
1053 .await
1054 .map_err(|e| SyncError::Generic(e.to_string()))?;
1055
1056 let atp_data = AtpData {
1057 did: actor_data.did,
1058 pds: actor_data.pds,
1059 handle: actor_data.handle,
1060 };
1061
1062 Ok(atp_data)
1063 }
1064 Err(e) => Err(SyncError::Generic(format!(
1065 "Failed to resolve subject for {}: {:?}",
1066 did, e
1067 ))),
1068 }
1069 }
1070
1071 /// Index actors (DIDs with handles) into the database.
1072 ///
1073 /// Creates actor records for all repos being synced.
1074 async fn index_actors(
1075 &self,
1076 slice_uri: &str,
1077 repos: &[String],
1078 atp_map: &std::collections::HashMap<String, AtpData>,
1079 ) -> Result<(), SyncError> {
1080 let mut actors = Vec::with_capacity(repos.len()); // Pre-allocate: one actor per resolved repo
1081 let now = chrono::Utc::now().to_rfc3339();
1082
1083 for repo in repos {
1084 if let Some(atp_data) = atp_map.get(repo) {
1085 actors.push(Actor {
1086 did: atp_data.did.clone(),
1087 handle: atp_data.handle.clone(),
1088 slice_uri: slice_uri.to_string(),
1089 indexed_at: now.clone(),
1090 });
1091 }
1092 }
1093
1094 if !actors.is_empty() {
1095 self.database.batch_insert_actors(&actors).await?;
1096 }
1097
1098 Ok(())
1099 }
1100
1101 /// Get external collections for a slice.
1102 ///
1103 /// External collections are those that don't start with the slice's domain.
1104 /// For example, if slice domain is "com.example", then "app.bsky.feed.post" is external.
1105 async fn get_external_collections_for_slice(
1106 &self,
1107 slice_uri: &str,
1108 ) -> Result<Vec<String>, SyncError> {
1109 // Get the slice's domain
1110 let domain = self
1111 .database
1112 .get_slice_domain(slice_uri)
1113 .await
1114 .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
1115 .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;
1116
1117 // Get all collections (lexicons) for this slice
1118 let collections = self
1119 .database
1120 .get_slice_collections_list(slice_uri)
1121 .await
1122 .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;
1123
1124 // Filter for external collections (those that don't start with the slice domain)
1125 let external_collections: Vec<String> = collections
1126 .into_iter()
1127 .filter(|collection| !collection.starts_with(&domain))
1128 .collect();
1129
1130 info!(
1131 "Found {} external collections for slice {} (domain: {}): {:?}",
1132 external_collections.len(),
1133 slice_uri,
1134 domain,
1135 external_collections
1136 );
1137
1138 Ok(external_collections)
1139 }
1140
1141 /// Sync user's data for all external collections defined in the slice.
1142 ///
1143 /// Used during login flows to quickly sync a user's data.
1144 /// Automatically discovers which collections to sync based on slice configuration.
1145 /// Uses timeout protection to ensure responsive login flows.
1146 ///
1147 /// # Arguments
1148 ///
1149 /// * `user_did` - The user's DID to sync
1150 /// * `slice_uri` - The slice to sync into
1151 /// * `timeout_secs` - Maximum seconds to wait before timing out
1152 ///
1153 /// # Returns
1154 ///
1155 /// Result with repos_processed, records_synced, and timeout status
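    ///
    /// # Example
    ///
    /// Illustrative login-flow usage (not a doctest); the DID and slice URI are
    /// placeholders:
    ///
    /// ```ignore
    /// let result = sync
    ///     .sync_user_collections(
    ///         "did:plc:exampleuser",
    ///         "at://did:plc:example/com.example.slice/self",
    ///         10, // seconds before falling back to an async job
    ///     )
    ///     .await?;
    /// if result.timed_out {
    ///     // Kick off a background sync job instead of blocking the login flow.
    /// }
    /// ```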
1156 pub async fn sync_user_collections(
1157 &self,
1158 user_did: &str,
1159 slice_uri: &str,
1160 timeout_secs: u64,
1161 ) -> Result<SyncUserCollectionsResult, SyncError> {
1162 info!(
1163 "Auto-discovering external collections for user {} in slice {}",
1164 user_did, slice_uri
1165 );
1166
1167 // Auto-discover external collections from slice configuration
1168 let external_collections = self.get_external_collections_for_slice(slice_uri).await?;
1169
1170 if external_collections.is_empty() {
1171 info!("No external collections found for slice {}", slice_uri);
1172 return Ok(SyncUserCollectionsResult {
1173 success: true,
1174 repos_processed: 0,
1175 records_synced: 0,
1176 timed_out: false,
1177 message: "No external collections to sync".to_string(),
1178 });
1179 }
1180
1181 info!(
1182 "Syncing {} external collections for user {}: {:?}",
1183 external_collections.len(),
1184 user_did,
1185 external_collections
1186 );
1187
1188 // Use backfill_collections with timeout, only syncing this specific user
1189 let sync_future = async {
1190 self.backfill_collections(
1191 slice_uri,
1192 None, // No primary collections for user sync
1193 Some(&external_collections),
1194 Some(&[user_did.to_string()]), // Only sync this user's repos
1195 false, // Always validate user collections
1196 None, // No limit for user-specific sync
1197 )
1198 .await
1199 };
1200
1201 match timeout(Duration::from_secs(timeout_secs), sync_future).await {
1202 Ok(result) => {
1203 let (repos_processed, records_synced) = result?;
1204 info!(
1205 "User sync completed within timeout: {} repos, {} records",
1206 repos_processed, records_synced
1207 );
1208 Ok(SyncUserCollectionsResult {
1209 success: true,
1210 repos_processed,
1211 records_synced,
1212 timed_out: false,
1213 message: format!(
1214 "Sync completed: {} repos, {} records",
1215 repos_processed, records_synced
1216 ),
1217 })
1218 }
1219 Err(_) => {
1220 // Timeout occurred - return partial success with guidance
1221 warn!(
1222 "Sync for user {} timed out after {}s, suggest using async job",
1223 user_did, timeout_secs
1224 );
1225 Ok(SyncUserCollectionsResult {
1226 success: false,
1227 repos_processed: 0,
1228 records_synced: 0,
1229 timed_out: true,
1230 message: format!(
1231 "Sync timed out after {}s - use startSync endpoint for larger syncs",
1232 timeout_secs
1233 ),
1234 })
1235 }
1236 }
1237 }
1238}