//! Bulk synchronization operations with the ATProto relay.
//!
//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
//! It provides:
//! - Memory-efficient batch processing with streaming writes
//! - Concurrent database operations using channel-based architecture
//! - HTTP/2 connection pooling for optimal network utilization
//! - Rate-limited PDS requests (3 concurrent per server)
//! - DID resolution with caching and chunked processing
//! - Parallel record validation against Lexicon schemas
//! - Actor indexing with pre-allocated data structures
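//!
//! # Example
//!
//! A minimal usage sketch (not compiled as a doctest). The relay URL, slice URI,
//! and collection NSID are placeholders, and obtaining a `Database` handle and a
//! `SliceCache` is left to the caller's setup code.
//!
//! ```ignore
//! use std::sync::Arc;
//! use tokio::sync::Mutex;
//!
//! // Inside an async context, with `database` and `cache` already constructed:
//! let service = SyncService::with_cache(
//!     database,
//!     "https://relay.example.com".to_string(),
//!     Arc::new(Mutex::new(cache)),
//! );
//!
//! // Backfill one collection into a slice, validating against its lexicons.
//! let (repos, records) = service
//!     .backfill_collections(
//!         "at://did:plc:example/network.slices.slice/abc", // placeholder slice URI
//!         Some(&["com.example.post".to_string()]),
//!         None,  // no external collections
//!         None,  // discover repos from the relay
//!         false, // validate records
//!         None,  // no repo cap
//!     )
//!     .await?;
//! println!("synced {} records from {} repos", records, repos);
//! ```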

use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
use chrono::Utc;
use futures_util::future;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tracing::{debug, error, info, warn};

use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
use crate::cache::SliceCache;
use crate::database::Database;
use crate::errors::SyncError;
use crate::logging::LogLevel;
use crate::logging::Logger;
use crate::models::{Actor, Record};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

// =============================================================================
// ATProto API Response Types
// =============================================================================

/// Record returned from ATProto `com.atproto.repo.listRecords` endpoint.
#[derive(Debug, Deserialize)]
struct AtProtoRecord {
    uri: String,
    cid: String,
    value: Value,
}

/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListRecordsResponse {
    records: Vec<AtProtoRecord>,
    cursor: Option<String>,
}

/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListReposByCollectionResponse {
    repos: Vec<RepoRef>,
    cursor: Option<String>,
}

/// Repository reference from the relay (contains DID).
#[derive(Debug, Deserialize)]
struct RepoRef {
    did: String,
}

// =============================================================================
// Internal Data Structures
// =============================================================================

/// Resolved ATProto actor data (DID, PDS, handle).
#[derive(Debug, Clone)]
struct AtpData {
    did: String,
    pds: String,
    handle: Option<String>,
}

// =============================================================================
// Public API Types
// =============================================================================

/// Result from syncing user collections (used in login flows).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncUserCollectionsResult {
    pub success: bool,
    pub repos_processed: i64,
    pub records_synced: i64,
    pub timed_out: bool,
    pub message: String,
}

// =============================================================================
// Sync Service
// =============================================================================

/// Service for synchronizing ATProto data from the relay.
///
/// Handles bulk backfills, user syncs, rate limiting, and validation.
#[derive(Clone)]
pub struct SyncService {
    client: Client,
    database: Database,
    relay_endpoint: String,
    cache: Option<Arc<Mutex<SliceCache>>>,
    logger: Option<Logger>,
    job_id: Option<Uuid>,
    user_did: Option<String>,
}

impl SyncService {
    /// Create a new SyncService with cache (for use outside job contexts).
    pub fn with_cache(
        database: Database,
        relay_endpoint: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: None,
            job_id: None,
            user_did: None,
        }
    }

    /// Create a new SyncService with logging and cache enabled for a specific job.
    pub fn with_logging_and_cache(
        database: Database,
        relay_endpoint: String,
        logger: Logger,
        job_id: Uuid,
        user_did: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: Some(logger),
            job_id: Some(job_id),
            user_did: Some(user_did),
        }
    }

    /// Log a message with job context (job_id, user_did, slice_uri).
    /// Only logs if this service was created with logging enabled.
    fn log_with_context(
        &self,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<serde_json::Value>,
    ) {
        if let (Some(logger), Some(job_id), Some(user_did)) =
            (&self.logger, &self.job_id, &self.user_did)
        {
            logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata);
        }
    }

    /// Backfill collections from the ATProto relay.
    ///
    /// This is the main entry point for bulk synchronization operations.
    ///
    /// # Arguments
    ///
    /// * `slice_uri` - The slice to backfill data into
    /// * `collections` - Primary collections (owned by slice domain) to backfill
    /// * `external_collections` - External collections (not owned by slice) to backfill
    /// * `repos` - Specific repos to sync (if None, fetches all repos for collections)
    /// * `skip_validation` - Skip Lexicon validation (useful for testing)
    /// * `max_repos` - Approximate cap on repos discovered per collection (bounds relay pagination)
    ///
    /// # Returns
    ///
    /// Tuple of (repos_processed, records_synced)
    ///
    /// # Performance Optimizations
    ///
    /// - Requests are grouped by PDS server with 3 concurrent requests max
    /// - Records are processed in 500-item batches to limit memory usage
    /// - Database writes happen concurrently via channels
    /// - HTTP/2 connection pooling reduces network overhead
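    ///
    /// # Example
    ///
    /// A sketch of a targeted backfill (not compiled as a doctest; the slice URI,
    /// collection NSID, and DID are placeholders):
    ///
    /// ```ignore
    /// // Re-sync a single known repo and skip Lexicon validation (e.g. in tests).
    /// let (repos_processed, records_synced) = service
    ///     .backfill_collections(
    ///         "at://did:plc:example/network.slices.slice/abc",
    ///         Some(&["com.example.post".to_string()]),
    ///         None,
    ///         Some(&["did:plc:someuser".to_string()]), // skip relay discovery
    ///         true, // skip_validation
    ///         None,
    ///     )
    ///     .await?;
    /// ```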
    pub async fn backfill_collections(
        &self,
        slice_uri: &str,
        collections: Option<&[String]>,
        external_collections: Option<&[String]>,
        repos: Option<&[String]>,
        skip_validation: bool,
        max_repos: Option<i32>,
    ) -> Result<(i64, i64), SyncError> {
        info!("Starting backfill operation");

        let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default();
        let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default();

        if !primary_collections.is_empty() {
            info!(
                "Processing {} primary collections: {}",
                primary_collections.len(),
                primary_collections.join(", ")
            );
        }

        if !external_collections.is_empty() {
            info!(
                "Processing {} external collections: {}",
                external_collections.len(),
                external_collections.join(", ")
            );
        }

        if primary_collections.is_empty() && external_collections.is_empty() {
            info!("No collections specified for backfill");
            return Ok((0, 0));
        }

        let all_collections = [&primary_collections[..], &external_collections[..]].concat();

        // Fetch repos to process (either provided or discovered from collections)
        let all_repos = if let Some(provided_repos) = repos {
            info!("Using {} provided repositories", provided_repos.len());
            provided_repos.to_vec()
        } else {
            info!("Fetching repositories for collections...");
            let mut unique_repos = std::collections::HashSet::new();

            // First, get all repos from primary collections
            let mut primary_repos = std::collections::HashSet::new();
            for collection in &primary_collections {
                match self
                    .get_repos_for_collection(collection, slice_uri, max_repos)
                    .await
                {
                    Ok(repos) => {
                        info!(
                            "Found {} repositories for primary collection \"{}\"",
                            repos.len(),
                            collection
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Info,
                            &format!(
                                "Found {} repositories for collection '{}'",
                                repos.len(),
                                collection
                            ),
                            Some(json!({"collection": collection, "repo_count": repos.len()})),
                        );
                        primary_repos.extend(repos);
                    }
                    Err(e) => {
                        error!(
                            "Failed to get repos for primary collection {}: {}",
                            collection, e
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Error,
                            &format!(
                                "Failed to fetch repositories for collection '{}': {}",
                                collection, e
                            ),
                            Some(json!({"collection": collection, "error": e.to_string()})),
                        );
                    }
                }
            }

            info!(
                "Found {} unique repositories from primary collections",
                primary_repos.len()
            );

            // Use primary repos for syncing (both primary and external collections)
            unique_repos.extend(primary_repos);

            let repos: Vec<String> = unique_repos.into_iter().collect();
            info!("Processing {} unique repositories", repos.len());
            repos
        };

        // Resolve DID -> PDS/handle mappings for all repos
        info!("Resolving ATP data for repositories...");
        let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
        info!(
            "Resolved ATP data for {}/{} repositories",
            atp_map.len(),
            all_repos.len()
        );

        // Only sync repos that successfully resolved
        let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
        let failed_resolutions = all_repos.len() - valid_repos.len();

        if failed_resolutions > 0 {
            info!(
                "{} repositories failed DID resolution and will be skipped",
                failed_resolutions
            );
        }

        info!("Starting sync for {} repositories...", valid_repos.len());

        // Group requests by PDS server for rate limiting and connection reuse
        // Pre-allocated capacity avoids HashMap resizing during insertions
        let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
            std::collections::HashMap::with_capacity(atp_map.len());

        for repo in &valid_repos {
            if let Some(atp_data) = atp_map.get(repo) {
                let pds_url = atp_data.pds.clone();
                for collection in &all_collections {
                    requests_by_pds
                        .entry(pds_url.clone())
                        .or_default()
                        .push((repo.clone(), collection.clone()));
                }
            }
        }

        info!(
            "Fetching records with rate limiting: {} PDS servers, {} total requests",
            requests_by_pds.len(),
            requests_by_pds.values().map(|v| v.len()).sum::<usize>()
        );

        // Process each PDS server with limited concurrency.
        // Three concurrent requests balance speed against memory usage:
        // fewer is noticeably slower, more creates memory pressure.
        let mut fetch_tasks = Vec::new();
        const MAX_CONCURRENT_PER_PDS: usize = 3;

        for (_pds_url, repo_collections) in requests_by_pds {
            let sync_service = self.clone();
            let atp_map_clone = atp_map.clone();
            let slice_uri_clone = slice_uri.to_string();

            // Process this PDS server's requests in chunks
            let pds_task = tokio::spawn(async move {
                let mut pds_results = Vec::new();

                // Split requests into chunks and process with limited concurrency
                for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
                    let mut chunk_tasks = Vec::new();

                    for (repo, collection) in chunk {
                        let repo_clone = repo.clone();
                        let collection_clone = collection.clone();
                        let sync_service_clone = sync_service.clone();
                        let atp_map_inner = atp_map_clone.clone();
                        let slice_uri_inner = slice_uri_clone.clone();

                        let task = tokio::spawn(async move {
                            match sync_service_clone
                                .fetch_records_for_repo_collection_with_atp_map(
                                    &repo_clone,
                                    &collection_clone,
                                    &atp_map_inner,
                                    &slice_uri_inner,
                                )
                                .await
                            {
                                Ok(records) => Ok((repo_clone, collection_clone, records)),
                                Err(e) => {
                                    // Handle common "not error" scenarios as empty results
                                    match &e {
                                        SyncError::ListRecords { status } => {
                                            if *status == 404 || *status == 400 {
                                                // Collection doesn't exist for this repo - return empty
                                                Ok((repo_clone, collection_clone, vec![]))
                                            } else {
                                                Err(e)
                                            }
                                        }
                                        SyncError::HttpRequest(_) => {
                                            // Network errors - treat as empty (like TypeScript version)
                                            Ok((repo_clone, collection_clone, vec![]))
                                        }
                                        _ => Err(e),
                                    }
                                }
                            }
                        });
                        chunk_tasks.push(task);
                    }

                    // Wait for this chunk to complete before starting the next
                    for task in chunk_tasks {
                        if let Ok(result) = task.await {
                            pds_results.push(result);
                        }
                    }

                    // Small delay between chunks to be kind to PDS servers
                    if chunk.len() == MAX_CONCURRENT_PER_PDS {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }

                pds_results
            });

            fetch_tasks.push(pds_task);
        }

        // Collect all results
        let mut successful_tasks = 0;
        let mut failed_tasks = 0;

        // Get lexicons for this slice
        let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
            Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
            Ok(_) => {
                warn!("No lexicons found for slice {}", slice_uri);
                None
            }
            Err(e) => {
                warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                None
            }
        };

        // Index actors first (ensuring actor records exist before inserting records)
        info!("Indexing actors...");
        self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
        info!("Indexed {} actors", valid_repos.len());

        // Set up concurrent database writer using channels
        // This allows fetching to continue while DB writes happen in parallel
        // 500-record batches optimize for memory usage and DB transaction size
        const BATCH_SIZE: usize = 500;
        let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4); // Small buffer; applies backpressure if the writer falls behind
        let database = self.database.clone();
        let total_indexed_records = Arc::new(Mutex::new(0i64));

        // Spawn database writer task
        let writer_task = tokio::spawn(async move {
            let mut write_count = 0i64;
            while let Some(batch) = rx.recv().await {
                let batch_size = batch.len() as i64;
                match database.batch_insert_records(&batch).await {
                    Ok(_) => {
                        write_count += batch_size;
                        info!(
                            "Database writer: Inserted batch of {} records (total: {})",
                            batch_size, write_count
                        );
                    }
                    Err(e) => {
                        error!("Database writer: Failed to insert batch: {}", e);
                        return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
                    }
                }
            }
            Ok(write_count)
        });

        // Process results from each PDS server
        let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);

        for pds_task in fetch_tasks {
            match pds_task.await {
                Ok(pds_results) => {
                    // Process each result from this PDS
                    for result in pds_results {
                        match result {
                            Ok((repo, collection, records)) => {
                                let mut validated_records = Vec::new();
                                let total_records = records.len();

                                // Skip validation if requested
                                if skip_validation {
                                    validated_records = records;
                                    info!(
                                        "Validation skipped - accepting all {} records for collection {} from repo {}",
                                        total_records, collection, repo
                                    );
                                }
                                // Validate each record if we have lexicons
                                else if let Some(ref lexicons) = lexicons {
                                    let mut validation_errors = Vec::new();

                                    // Process validations in chunks for better CPU cache locality
                                    // 50 records per chunk optimizes L2/L3 cache usage
                                    const VALIDATION_CHUNK_SIZE: usize = 50;
                                    for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
                                        for record in chunk {
                                            match slices_lexicon::validate_record(
                                                lexicons.clone(),
                                                &collection,
                                                record.json.clone(),
                                            ) {
                                                Ok(_) => {
                                                    validated_records.push(record.clone());
                                                }
                                                Err(e) => {
                                                    let error_msg = format!(
                                                        "Validation failed for record {} from {}: {}",
                                                        record.uri, repo, e
                                                    );
                                                    warn!("{}", error_msg);
                                                    validation_errors.push(json!({
                                                        "uri": record.uri,
                                                        "error": e.to_string()
                                                    }));

                                                    // Log individual validation failures
                                                    self.log_with_context(
                                                        slice_uri,
                                                        LogLevel::Warn,
                                                        &error_msg,
                                                        Some(json!({
                                                            "repo": repo,
                                                            "collection": collection,
                                                            "record_uri": record.uri,
                                                            "validation_error": e.to_string()
                                                        })),
                                                    );
                                                }
                                            }
                                        }
                                    }

                                    let valid_count = validated_records.len();
                                    let invalid_count = validation_errors.len();

                                    if invalid_count > 0 {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Warn,
                                            &format!(
                                                "Validation completed for {}/{}: {} valid, {} invalid records",
                                                repo, collection, valid_count, invalid_count
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count,
                                                "invalid_records": invalid_count,
                                                "validation_errors": validation_errors
                                            })),
                                        );
                                    } else {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Info,
                                            &format!(
                                                "All {} records validated successfully for {}/{}",
                                                valid_count, repo, collection
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count
                                            })),
                                        );
                                    }

                                    info!(
                                        "Validated {}/{} records for collection {} from repo {}",
                                        validated_records.len(),
                                        total_records,
                                        collection,
                                        repo
                                    );
                                } else {
                                    // No validator available, accept all records
                                    validated_records = records;
                                    self.log_with_context(
                                        slice_uri,
                                        LogLevel::Warn,
                                        &format!(
                                            "No lexicon validator available for collection {}",
                                            collection
                                        ),
                                        Some(json!({
                                            "collection": collection,
                                            "repo": repo,
                                            "accepted_records": total_records
                                        })),
                                    );
                                    warn!(
                                        "No lexicon validator available - accepting all records without validation for collection {}",
                                        collection
                                    );
                                }

                                // Add validated records to the batch buffer for the database writer
                                batch_buffer.extend(validated_records);
                                successful_tasks += 1;
                            }
                            Err(_) => {
                                failed_tasks += 1;
                            }
                        }
                    }
                }
                Err(_) => {
                    // PDS task failed - count all its requests as failed
                    failed_tasks += 1;
                }
            }

            // Send batch to writer when buffer is full
            if batch_buffer.len() >= BATCH_SIZE {
                let batch_to_send =
                    std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
                let batch_count = batch_to_send.len() as i64;
                info!(
                    "Sending batch of {} records to database writer",
                    batch_count
                );

                // Send to writer channel (awaits if the writer's buffer is full)
                if let Err(e) = tx.send(batch_to_send).await {
                    error!("Failed to send batch to writer: {}", e);
                    return Err(SyncError::Generic(format!(
                        "Failed to send batch to writer: {}",
                        e
                    )));
                }

                let mut total = total_indexed_records.lock().await;
                *total += batch_count;
            }
        }

        // Flush any remaining records in the buffer
        if !batch_buffer.is_empty() {
            let batch_count = batch_buffer.len() as i64;
            info!(
                "Sending final batch of {} records to database writer",
                batch_count
            );

            if let Err(e) = tx.send(batch_buffer).await {
                error!("Failed to send final batch to writer: {}", e);
                return Err(SyncError::Generic(format!(
                    "Failed to send final batch to writer: {}",
                    e
                )));
            }

            let mut total = total_indexed_records.lock().await;
            *total += batch_count;
        }

        // Close the channel and wait for writer to finish
        drop(tx);
        let write_result = writer_task
            .await
            .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;

        let final_count = match write_result {
            Ok(count) => count,
            Err(e) => return Err(e),
        };

        info!(
            "Debug: {} successful tasks, {} failed tasks",
            successful_tasks, failed_tasks
        );

        info!("Indexed {} new/changed records in batches", final_count);

        info!("Backfill complete!");

        Ok((valid_repos.len() as i64, final_count))
    }

    /// Fetch all repositories that have records in a given collection.
    ///
    /// Uses cursor-based pagination to fetch all repos from the relay.
    pub async fn get_repos_for_collection(
        &self,
        collection: &str,
        slice_uri: &str,
        max_repos: Option<i32>,
    ) -> Result<Vec<String>, SyncError> {
        let url = format!(
            "{}/xrpc/com.atproto.sync.listReposByCollection",
            self.relay_endpoint
        );
        let mut all_repos = Vec::new();
        let mut cursor: Option<String> = None;
        let mut page_count = 0;

        // AT Protocol docs: default 500 repos/page, max 2000 repos/page
        // We use 1000 repos/page for efficiency (recommended for large DID lists)
        const REPOS_PER_PAGE: usize = 1000; // Our configured page size

        // Calculate max pages based on repo limit, with a reasonable safety margin
        let max_pages = if let Some(limit) = max_repos {
            // Add 20% safety margin and ensure at least 5 pages
            ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
        } else {
            25 // Default fallback for unlimited
        };

        loop {
            page_count += 1;
            if page_count > max_pages {
                warn!(
                    "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
                    max_pages,
                    collection,
                    max_repos,
                    max_pages * REPOS_PER_PAGE,
                    REPOS_PER_PAGE
                );
                break;
            }

            let mut query_params = vec![
                ("collection", collection.to_string()),
730 ("limit", "1000".to_string()),
            ];
            if let Some(ref cursor_value) = cursor {
                query_params.push(("cursor", cursor_value.clone()));
            }

            let response = self.client.get(&url).query(&query_params).send().await?;

            if !response.status().is_success() {
                return Err(SyncError::ListRepos {
                    status: response.status().as_u16(),
                });
            }

            let repos_response: ListReposByCollectionResponse = response.json().await?;

            // Add repos from this page to our collection
            all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));

            // Check if there's a next page
            match repos_response.cursor {
                Some(next_cursor) if !next_cursor.is_empty() => {
                    cursor = Some(next_cursor);
                    // Log pagination progress if we have a logger
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Info,
                        &format!(
                            "Fetching next page of repositories for collection {}, total so far: {}",
                            collection,
                            all_repos.len()
                        ),
                        Some(json!({
                            "collection": collection,
                            "repos_count": all_repos.len(),
                            "has_more": true,
                            "page": page_count
                        })),
                    );
                }
                _ => break, // No more pages
            }
        }

        // Log final count
        self.log_with_context(
            slice_uri,
            LogLevel::Info,
            &format!(
                "Completed fetching repositories for collection {}, total: {}",
                collection,
                all_repos.len()
            ),
            Some(json!({
                "collection": collection,
                "total_repos": all_repos.len()
            })),
        );

        Ok(all_repos)
    }

    /// Fetch records for a repo/collection with retry logic.
    ///
    /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
    /// This handles cases where PDS URLs change.
    async fn fetch_records_for_repo_collection_with_atp_map(
        &self,
        repo: &str,
        collection: &str,
        atp_map: &std::collections::HashMap<String, AtpData>,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        let atp_data = atp_map
            .get(repo)
            .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;

        match self
            .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
            .await
        {
            Ok(records) => Ok(records),
            Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
                // 4xx/5xx error from PDS - try invalidating cache and retrying once
                debug!(
                    "PDS error {} for repo {}, attempting cache invalidation and retry",
                    status, repo
                );

                match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
                    .await
                {
                    Ok(fresh_actor_data) => {
                        debug!(
                            "Successfully re-resolved actor data for {}, retrying with PDS: {}",
                            repo, fresh_actor_data.pds
                        );
                        self.fetch_records_for_repo_collection(
                            repo,
                            collection,
                            &fresh_actor_data.pds,
                            slice_uri,
                        )
                        .await
                    }
                    Err(e) => {
                        debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
                        Err(SyncError::ListRecords { status }) // Return original error
                    }
                }
            }
            Err(e) => Err(e), // Other errors (network, etc.) - don't retry
        }
    }

    /// Fetch records for a specific repo and collection from its PDS.
    ///
    /// Only returns new or changed records (compared by CID).
    /// Uses cursor-based pagination to fetch all records.
    ///
    /// # Memory optimizations:
    /// - Pre-allocated Vec with 100 capacity (typical collection size)
    /// - Fetches in 100-record pages to limit response size
    /// - Reuses HTTP connections via client pooling
    async fn fetch_records_for_repo_collection(
        &self,
        repo: &str,
        collection: &str,
        pds_url: &str,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        // Get existing record CIDs to skip unchanged records
        let existing_cids = self
            .database
            .get_existing_record_cids_for_slice(repo, collection, slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;

        debug!(
            "Found {} existing records for {}/{}",
            existing_cids.len(),
            repo,
            collection
        );

        // Pre-allocate based on typical collection size (100 records)
        // This avoids Vec reallocations which can cause memory fragmentation
        let mut records = Vec::with_capacity(100);
        let mut cursor: Option<String> = None;
        let mut fetched_count = 0;
        let mut skipped_count = 0;

        loop {
            let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
            if let Some(ref c) = cursor {
                params.push(("cursor", c));
            }

            let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url);
            let response = self.client.get(&request_url).query(&params).send().await;

            let response = match response {
                Ok(resp) => resp,
                Err(e) => {
                    self.log_with_context(
                        slice_uri,
                        LogLevel::Error,
                        &format!("Failed to fetch records from {}: Network error: {}", repo, e),
                        Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "error": e.to_string()})),
                    );
                    return Err(SyncError::from(e));
                }
            };

            if !response.status().is_success() {
                let status = response.status().as_u16();

                // HTTP 400/404 are expected when collections don't exist - log as info, not error
                let (log_level, log_message) = if status == 400 || status == 404 {
                    (
                        LogLevel::Info,
                        format!(
                            "Collection '{}' not found for {}: HTTP {}",
                            collection, repo, status
                        ),
                    )
                } else {
                    (
                        LogLevel::Error,
                        format!(
                            "Failed to fetch records from {}: HTTP {} from PDS",
                            repo, status
                        ),
                    )
                };

                self.log_with_context(
                    slice_uri,
                    log_level,
                    &log_message,
                    Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "http_status": status})),
                );
                return Err(SyncError::ListRecords { status });
            }

            let list_response: ListRecordsResponse = response.json().await?;

            for atproto_record in list_response.records {
                // Check if we already have this record with the same CID
                if let Some(existing_cid) = existing_cids.get(&atproto_record.uri)
                    && existing_cid == &atproto_record.cid
                {
                    // Record unchanged, skip it
                    skipped_count += 1;
                    continue;
                }

                // Record is new or changed, include it
                // TODO: Consider using Arc<str> for frequently cloned strings
                let record = Record {
                    uri: atproto_record.uri,
                    cid: atproto_record.cid,
                    did: repo.to_string(),
                    collection: collection.to_string(),
                    json: atproto_record.value,
                    indexed_at: Utc::now(),
                    slice_uri: Some(slice_uri.to_string()),
                };
                records.push(record);
                fetched_count += 1;
            }

            cursor = list_response.cursor;
            if cursor.is_none() {
                break;
            }
        }

        // Log results for this repo/collection
        if fetched_count > 0 || skipped_count > 0 {
            self.log_with_context(
                slice_uri,
                LogLevel::Info,
                &format!(
                    "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
                    fetched_count, skipped_count, repo, collection
                ),
                Some(json!({
                    "repo": repo,
                    "collection": collection,
                    "new_records": fetched_count,
                    "skipped_records": skipped_count,
                    "pds_url": pds_url
                })),
            );
        }

        if skipped_count > 0 {
            info!(
                "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
                skipped_count, fetched_count, repo, collection
            );
        }

        Ok(records)
    }

    /// Resolve ATP data (DID, PDS, handle) for multiple repos.
    ///
    /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
    ///
    /// # Performance optimizations:
    /// - Processes DIDs in 50-item chunks to limit memory usage
    /// - 10 concurrent DNS resolutions max to avoid resolver exhaustion
    /// - Pre-allocated HashMap based on input size
    async fn get_atp_map_for_repos(
        &self,
        repos: &[String],
    ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
        let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
        const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
        const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions

        info!(
            "Resolving ATP data for {} repositories in chunks",
            repos.len()
        );

        for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());

            debug!(
                "Processing DID resolution chunk {}/{} (repos {}-{})",
                chunk_idx + 1,
                repos.len().div_ceil(CHUNK_SIZE),
                chunk_start,
                chunk_end - 1
            );

            // Process this chunk with limited concurrency
            let mut resolution_tasks = Vec::new();

            for batch in chunk.chunks(MAX_CONCURRENT) {
                let mut batch_futures = Vec::new();

                for repo in batch {
                    let repo_clone = repo.clone();
                    let self_clone = self.clone();

                    let fut = async move {
                        match self_clone.resolve_atp_data(&repo_clone).await {
                            Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
                            Err(e) => {
                                warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
                                None
                            }
                        }
                    };
                    batch_futures.push(fut);
                }

                // Wait for this batch to complete
                let batch_results = future::join_all(batch_futures).await;
                resolution_tasks.extend(batch_results);
            }

            // Add resolved data to map
            for (did, atp_data) in resolution_tasks.into_iter().flatten() {
                atp_map.insert(did, atp_data);
            }

            // Small delay between chunks to be kind to DNS resolvers
            if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        info!(
            "Successfully resolved ATP data for {}/{} repositories",
            atp_map.len(),
            repos.len()
        );
        Ok(atp_map)
    }

    /// Resolve ATP data for a single DID.
    ///
    /// Uses DID resolution to get the DID document, then extracts PDS and handle.
    /// Results are cached to avoid repeated lookups.
    async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
        debug!("Resolving ATP data for DID: {}", did);

        let dns_resolver = HickoryDnsResolver::create_resolver(&[]);

        match resolve_subject(&self.client, &dns_resolver, did).await {
            Ok(resolved_did) => {
                debug!("Successfully resolved subject: {}", resolved_did);

                let actor_data =
                    resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
                        .await
                        .map_err(|e| SyncError::Generic(e.to_string()))?;

                let atp_data = AtpData {
                    did: actor_data.did,
                    pds: actor_data.pds,
                    handle: actor_data.handle,
                };

                Ok(atp_data)
            }
            Err(e) => Err(SyncError::Generic(format!(
                "Failed to resolve subject for {}: {:?}",
                did, e
            ))),
        }
    }

    /// Index actors (DIDs with handles) into the database.
    ///
    /// Creates actor records for all repos being synced.
    async fn index_actors(
        &self,
        slice_uri: &str,
        repos: &[String],
        atp_map: &std::collections::HashMap<String, AtpData>,
    ) -> Result<(), SyncError> {
        let mut actors = Vec::new();
        let now = chrono::Utc::now().to_rfc3339();

        for repo in repos {
            if let Some(atp_data) = atp_map.get(repo) {
                actors.push(Actor {
                    did: atp_data.did.clone(),
                    handle: atp_data.handle.clone(),
                    slice_uri: slice_uri.to_string(),
                    indexed_at: now.clone(),
                });
            }
        }

        if !actors.is_empty() {
            self.database.batch_insert_actors(&actors).await?;
        }

        Ok(())
    }

    /// Get external collections for a slice.
    ///
    /// External collections are those that don't start with the slice's domain.
    /// For example, if slice domain is "com.example", then "app.bsky.feed.post" is external.
    async fn get_external_collections_for_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<String>, SyncError> {
        // Get the slice's domain
        let domain = self
            .database
            .get_slice_domain(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
            .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;

        // Get all collections (lexicons) for this slice
        let collections = self
            .database
            .get_slice_collections_list(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;

        // Filter for external collections (those that don't start with the slice domain)
        let external_collections: Vec<String> = collections
            .into_iter()
            .filter(|collection| !collection.starts_with(&domain))
            .collect();

        info!(
            "Found {} external collections for slice {} (domain: {}): {:?}",
            external_collections.len(),
            slice_uri,
            domain,
            external_collections
        );

        Ok(external_collections)
    }

    /// Sync a user's data for all external collections defined in the slice.
    ///
    /// Used during login flows to quickly sync a user's data.
    /// Automatically discovers which collections to sync based on slice configuration.
    /// Uses timeout protection to ensure responsive login flows.
    ///
    /// # Arguments
    ///
    /// * `user_did` - The user's DID to sync
    /// * `slice_uri` - The slice to sync into
    /// * `timeout_secs` - Maximum seconds to wait before timing out
    ///
    /// # Returns
    ///
    /// Result with repos_processed, records_synced, and timeout status
    pub async fn sync_user_collections(
        &self,
        user_did: &str,
        slice_uri: &str,
        timeout_secs: u64,
    ) -> Result<SyncUserCollectionsResult, SyncError> {
        info!(
            "Auto-discovering external collections for user {} in slice {}",
            user_did, slice_uri
        );

        // Auto-discover external collections from slice configuration
        let external_collections = self.get_external_collections_for_slice(slice_uri).await?;

        if external_collections.is_empty() {
            info!("No external collections found for slice {}", slice_uri);
            return Ok(SyncUserCollectionsResult {
                success: true,
                repos_processed: 0,
                records_synced: 0,
                timed_out: false,
                message: "No external collections to sync".to_string(),
            });
        }

        info!(
            "Syncing {} external collections for user {}: {:?}",
            external_collections.len(),
            user_did,
            external_collections
        );

        // Use backfill_collections with timeout, only syncing this specific user
        let sync_future = async {
            self.backfill_collections(
                slice_uri,
                None, // No primary collections for user sync
                Some(&external_collections),
                Some(&[user_did.to_string()]), // Only sync this user's repos
                false, // Always validate user collections
                None,  // No limit for user-specific sync
            )
            .await
        };

        match timeout(Duration::from_secs(timeout_secs), sync_future).await {
            Ok(result) => {
                let (repos_processed, records_synced) = result?;
                info!(
                    "User sync completed within timeout: {} repos, {} records",
                    repos_processed, records_synced
                );
                Ok(SyncUserCollectionsResult {
                    success: true,
                    repos_processed,
                    records_synced,
                    timed_out: false,
                    message: format!(
                        "Sync completed: {} repos, {} records",
                        repos_processed, records_synced
                    ),
                })
            }
            Err(_) => {
                // Timeout occurred - return partial success with guidance
                warn!(
                    "Sync for user {} timed out after {}s, suggest using async job",
                    user_did, timeout_secs
                );
                Ok(SyncUserCollectionsResult {
                    success: false,
                    repos_processed: 0,
                    records_synced: 0,
                    timed_out: true,
                    message: format!(
                        "Sync timed out after {}s - use startSync endpoint for larger syncs",
                        timeout_secs
                    ),
                })
            }
        }
    }
}