//! Bulk synchronization operations with the ATProto relay.
//!
//! This module handles backfilling and syncing data from the ATProto network via the relay endpoint.
//! It provides:
//! - Memory-efficient batch processing with streaming writes
//! - Concurrent database operations using channel-based architecture
//! - HTTP/2 connection pooling for optimal network utilization
//! - Rate-limited PDS requests (3 concurrent per server)
//! - DID resolution with caching and chunked processing
//! - Parallel record validation against Lexicon schemas
//! - Actor indexing with pre-allocated data structures
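//!
//! # Example (sketch)
//!
//! A minimal sketch of how this service is typically driven, assuming a
//! `Database` handle, a relay endpoint string, and a shared `SliceCache`
//! already exist (variable names here are illustrative, not part of this API):
//!
//! ```ignore
//! let service = SyncService::with_cache(database, relay_endpoint, cache);
//! // Backfill a slice's collections, discovering repos via the relay:
//! let (repos_processed, records_synced) = service
//!     .backfill_collections(&slice_uri, Some(collections.as_slice()), None, None, false, None)
//!     .await?;
//! info!("backfilled {} records from {} repos", records_synced, repos_processed);
//! ```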

use atproto_identity::resolve::{HickoryDnsResolver, resolve_subject};
use chrono::Utc;
use futures_util::future;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tracing::{debug, error, info, warn};

use crate::actor_resolver::{resolve_actor_data_cached, resolve_actor_data_with_retry};
use crate::cache::SliceCache;
use crate::database::Database;
use crate::errors::SyncError;
use crate::jobs::is_job_cancelled;
use crate::logging::LogLevel;
use crate::logging::Logger;
use crate::models::{Actor, Record};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

// =============================================================================
// ATProto API Response Types
// =============================================================================

/// Record returned from the ATProto `com.atproto.repo.listRecords` endpoint.
#[derive(Debug, Deserialize)]
struct AtProtoRecord {
    uri: String,
    cid: String,
    value: Value,
}

/// Response from `com.atproto.repo.listRecords` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListRecordsResponse {
    records: Vec<AtProtoRecord>,
    cursor: Option<String>,
}

/// Response from `com.atproto.sync.listReposByCollection` with cursor-based pagination.
#[derive(Debug, Deserialize)]
struct ListReposByCollectionResponse {
    repos: Vec<RepoRef>,
    cursor: Option<String>,
}

/// Repository reference from the relay (contains the DID).
#[derive(Debug, Deserialize)]
struct RepoRef {
    did: String,
}

// =============================================================================
// Internal Data Structures
// =============================================================================

/// Resolved ATProto actor data (DID, PDS, handle).
#[derive(Debug, Clone)]
struct AtpData {
    did: String,
    pds: String,
    handle: Option<String>,
}

// =============================================================================
// Public API Types
// =============================================================================

/// Result from syncing user collections (used in login flows).
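///
/// Serialized with camelCase field names for API responses. A minimal sketch of
/// constructing and serializing it (values are illustrative):
///
/// ```ignore
/// let result = SyncUserCollectionsResult {
///     success: true,
///     repos_processed: 1,
///     records_synced: 42,
///     timed_out: false,
///     message: "Sync completed: 1 repos, 42 records".to_string(),
/// };
/// // `rename_all = "camelCase"` yields keys such as `reposProcessed` and `timedOut`.
/// let body = serde_json::to_string(&result)?;
/// ```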
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncUserCollectionsResult {
    pub success: bool,
    pub repos_processed: i64,
    pub records_synced: i64,
    pub timed_out: bool,
    pub message: String,
}

// =============================================================================
// Sync Service
// =============================================================================

/// Service for synchronizing ATProto data from the relay.
///
/// Handles bulk backfills, user syncs, rate limiting, and validation.
#[derive(Clone)]
pub struct SyncService {
    client: Client,
    database: Database,
    relay_endpoint: String,
    cache: Option<Arc<Mutex<SliceCache>>>,
    logger: Option<Logger>,
    job_id: Option<Uuid>,
    user_did: Option<String>,
}

impl SyncService {
    /// Create a new SyncService with cache (for use outside job contexts).
    pub fn with_cache(
        database: Database,
        relay_endpoint: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: None,
            job_id: None,
            user_did: None,
        }
    }

    /// Create a new SyncService with logging and cache enabled for a specific job.
    pub fn with_logging_and_cache(
        database: Database,
        relay_endpoint: String,
        logger: Logger,
        job_id: Uuid,
        user_did: String,
        cache: Arc<Mutex<SliceCache>>,
    ) -> Self {
        // Create HTTP client with connection pooling and optimized settings
        let client = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(10)
            .http2_keep_alive_interval(Some(Duration::from_secs(30)))
            .http2_keep_alive_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_else(|_| Client::new());

        Self {
            client,
            database,
            relay_endpoint,
            cache: Some(cache),
            logger: Some(logger),
            job_id: Some(job_id),
            user_did: Some(user_did),
        }
    }

    /// Log a message with job context (job_id, user_did, slice_uri).
    /// Only logs if this service was created with logging enabled.
    fn log_with_context(
        &self,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<serde_json::Value>,
    ) {
        if let (Some(logger), Some(job_id), Some(user_did)) =
            (&self.logger, &self.job_id, &self.user_did)
        {
            logger.log_sync_job(*job_id, user_did, slice_uri, level, message, metadata);
        }
    }

    /// Check if this job has been cancelled.
    ///
    /// Returns Ok(()) if not cancelled, Err(SyncError::Cancelled) if cancelled.
    /// Only checks if this service has a job_id (created with logging enabled).
    async fn check_cancellation(&self) -> Result<(), SyncError> {
        if let Some(job_id) = self.job_id {
            let is_cancelled = is_job_cancelled(self.database.pool(), job_id)
                .await
                .map_err(|e| SyncError::DatabaseQuery(e.to_string()))?;

            if is_cancelled {
                info!("Job {} has been cancelled, stopping sync", job_id);
                return Err(SyncError::Cancelled);
            }
        }
        Ok(())
    }

    /// Backfill collections from the ATProto relay.
    ///
    /// This is the main entry point for bulk synchronization operations.
    ///
    /// # Arguments
    ///
    /// * `slice_uri` - The slice to backfill data into
    /// * `collections` - Primary collections (owned by the slice domain) to backfill
    /// * `external_collections` - External collections (not owned by the slice) to backfill
    /// * `repos` - Specific repos to sync (if None, fetches all repos for the collections)
    /// * `skip_validation` - Skip Lexicon validation (useful for testing)
    /// * `max_repos` - Optional soft cap on how many repos are discovered per collection
    ///
    /// # Returns
    ///
    /// Tuple of (repos_processed, records_synced)
    ///
    /// # Performance Optimizations
    ///
    /// - Requests are grouped by PDS server with at most 3 concurrent requests each
    /// - Records are processed in 500-item batches to limit memory usage
    /// - Database writes happen concurrently via channels
    /// - HTTP/2 connection pooling reduces network overhead
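    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch of a full backfill, assuming a configured `SyncService`
    /// named `service` and pre-built collection lists (illustrative names, not
    /// part of this API):
    ///
    /// ```ignore
    /// let (repos_processed, records_synced) = service
    ///     .backfill_collections(
    ///         &slice_uri,
    ///         Some(primary_collections.as_slice()),   // collections owned by the slice domain
    ///         Some(external_collections.as_slice()),  // e.g. well-known app.bsky.* collections
    ///         None,                                   // discover repos via the relay
    ///         false,                                  // validate against Lexicon schemas
    ///         Some(10_000),                           // soft cap on discovered repos
    ///     )
    ///     .await?;
    /// ```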
    pub async fn backfill_collections(
        &self,
        slice_uri: &str,
        collections: Option<&[String]>,
        external_collections: Option<&[String]>,
        repos: Option<&[String]>,
        skip_validation: bool,
        max_repos: Option<i32>,
    ) -> Result<(i64, i64), SyncError> {
        info!("Starting backfill operation");

        let primary_collections = collections.map(|c| c.to_vec()).unwrap_or_default();
        let external_collections = external_collections.map(|c| c.to_vec()).unwrap_or_default();

        if !primary_collections.is_empty() {
            info!(
                "Processing {} primary collections: {}",
                primary_collections.len(),
                primary_collections.join(", ")
            );
        }

        if !external_collections.is_empty() {
            info!(
                "Processing {} external collections: {}",
                external_collections.len(),
                external_collections.join(", ")
            );
        }

        if primary_collections.is_empty() && external_collections.is_empty() {
            info!("No collections specified for backfill");
            return Ok((0, 0));
        }

        let all_collections = [&primary_collections[..], &external_collections[..]].concat();

        // Fetch repos to process (either provided or discovered from collections)
        let all_repos = if let Some(provided_repos) = repos {
            info!("Using {} provided repositories", provided_repos.len());
            provided_repos.to_vec()
        } else {
            info!("Fetching repositories for collections...");
            let mut unique_repos = std::collections::HashSet::new();

            // First, get all repos from primary collections
            let mut primary_repos = std::collections::HashSet::new();
            for collection in &primary_collections {
                // Check for cancellation between collections
                self.check_cancellation().await?;

                match self.get_repos_for_collection(collection, slice_uri, max_repos).await {
                    Ok(repos) => {
                        info!(
                            "Found {} repositories for primary collection \"{}\"",
                            repos.len(),
                            collection
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Info,
                            &format!(
                                "Found {} repositories for collection '{}'",
                                repos.len(),
                                collection
                            ),
                            Some(json!({"collection": collection, "repo_count": repos.len()})),
                        );
                        primary_repos.extend(repos);
                    }
                    Err(e) => {
                        error!(
                            "Failed to get repos for primary collection {}: {}",
                            collection, e
                        );
                        self.log_with_context(
                            slice_uri,
                            LogLevel::Error,
                            &format!(
                                "Failed to fetch repositories for collection '{}': {}",
                                collection, e
                            ),
                            Some(json!({"collection": collection, "error": e.to_string()})),
                        );
                    }
                }
            }

            info!(
                "Found {} unique repositories from primary collections",
                primary_repos.len()
            );

            // Use primary repos for syncing (both primary and external collections)
            unique_repos.extend(primary_repos);

            let repos: Vec<String> = unique_repos.into_iter().collect();
            info!("Processing {} unique repositories", repos.len());
            repos
        };

        // Resolve DID -> PDS/handle mappings for all repos
        info!("Resolving ATP data for repositories...");
        let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
        info!(
            "Resolved ATP data for {}/{} repositories",
            atp_map.len(),
            all_repos.len()
        );

        // Check for cancellation after DID resolution (before spawning expensive fetch tasks)
        self.check_cancellation().await?;

        // Only sync repos that successfully resolved
        let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
        let failed_resolutions = all_repos.len() - valid_repos.len();

        if failed_resolutions > 0 {
            info!(
                "{} repositories failed DID resolution and will be skipped",
                failed_resolutions
            );
        }

        info!("Starting sync for {} repositories...", valid_repos.len());

        // Group requests by PDS server for rate limiting and connection reuse
        // Pre-allocated capacity avoids HashMap resizing during insertions
        let mut requests_by_pds: std::collections::HashMap<String, Vec<(String, String)>> =
            std::collections::HashMap::with_capacity(atp_map.len());

        for repo in &valid_repos {
            if let Some(atp_data) = atp_map.get(repo) {
                let pds_url = atp_data.pds.clone();
                for collection in &all_collections {
                    requests_by_pds
                        .entry(pds_url.clone())
                        .or_default()
                        .push((repo.clone(), collection.clone()));
                }
            }
        }

        info!(
            "Fetching records with rate limiting: {} PDS servers, {} total requests",
            requests_by_pds.len(),
            requests_by_pds.values().map(|v| v.len()).sum::<usize>()
        );

        // Process each PDS server with limited concurrency.
        // 3 concurrent requests balances speed against memory usage:
        // fewer than 3 is too slow, more than 3 creates memory pressure.
        let mut fetch_tasks = Vec::new();
        const MAX_CONCURRENT_PER_PDS: usize = 3;

        for (_pds_url, repo_collections) in requests_by_pds {
            let sync_service = self.clone();
            let atp_map_clone = atp_map.clone();
            let slice_uri_clone = slice_uri.to_string();

            // Process this PDS server's requests in chunks
            let pds_task = tokio::spawn(async move {
                let mut pds_results = Vec::new();

                // Split requests into chunks and process with limited concurrency
                for chunk in repo_collections.chunks(MAX_CONCURRENT_PER_PDS) {
                    let mut chunk_tasks = Vec::new();

                    for (repo, collection) in chunk {
                        let repo_clone = repo.clone();
                        let collection_clone = collection.clone();
                        let sync_service_clone = sync_service.clone();
                        let atp_map_inner = atp_map_clone.clone();
                        let slice_uri_inner = slice_uri_clone.clone();

                        let task = tokio::spawn(async move {
                            match sync_service_clone
                                .fetch_records_for_repo_collection_with_atp_map(
                                    &repo_clone,
                                    &collection_clone,
                                    &atp_map_inner,
                                    &slice_uri_inner,
                                )
                                .await
                            {
                                Ok(records) => Ok((repo_clone, collection_clone, records)),
                                Err(e) => {
                                    // Handle common non-error scenarios as empty results
                                    match &e {
                                        SyncError::ListRecords { status } => {
                                            if *status == 404 || *status == 400 {
                                                // Collection doesn't exist for this repo - return empty
                                                Ok((repo_clone, collection_clone, vec![]))
                                            } else {
                                                Err(e)
                                            }
                                        }
                                        SyncError::HttpRequest(_) => {
                                            // Network errors - treat as empty (matches the TypeScript version)
                                            Ok((repo_clone, collection_clone, vec![]))
                                        }
                                        _ => Err(e),
                                    }
                                }
                            }
                        });
                        chunk_tasks.push(task);
                    }

                    // Wait for this chunk to complete before starting the next
                    for task in chunk_tasks {
                        if let Ok(result) = task.await {
                            pds_results.push(result);
                        }
                    }

                    // Small delay between chunks to be kind to PDS servers
                    if chunk.len() == MAX_CONCURRENT_PER_PDS {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }

                pds_results
            });

            fetch_tasks.push(pds_task);
        }

        // Collect all results
        let mut successful_tasks = 0;
        let mut failed_tasks = 0;

        // Get lexicons for this slice
        let lexicons = match self.database.get_lexicons_by_slice(slice_uri).await {
            Ok(lexicons) if !lexicons.is_empty() => Some(lexicons),
            Ok(_) => {
                warn!("No lexicons found for slice {}", slice_uri);
                None
            }
            Err(e) => {
                warn!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                None
            }
        };

        // Index actors first (ensuring actor records exist before inserting records)
        info!("Indexing actors...");
        self.index_actors(slice_uri, &valid_repos, &atp_map).await?;
        info!("Indexed {} actors", valid_repos.len());

        // Set up concurrent database writer using channels
        // This allows fetching to continue while DB writes happen in parallel
        // 500-record batches optimize for memory usage and DB transaction size
        const BATCH_SIZE: usize = 500;
        let (tx, mut rx) = mpsc::channel::<Vec<Record>>(4); // Small buffer; applies backpressure if the writer falls behind
        let database = self.database.clone();
        let total_indexed_records = Arc::new(Mutex::new(0i64));

        // Spawn database writer task
        let writer_task = tokio::spawn(async move {
            let mut write_count = 0i64;
            while let Some(batch) = rx.recv().await {
                let batch_size = batch.len() as i64;
                match database.batch_insert_records(&batch).await {
                    Ok(_) => {
                        write_count += batch_size;
                        info!(
                            "Database writer: Inserted batch of {} records (total: {})",
                            batch_size, write_count
                        );
                    }
                    Err(e) => {
                        error!("Database writer: Failed to insert batch: {}", e);
                        return Err(SyncError::Generic(format!("Failed to insert batch: {}", e)));
                    }
                }
            }
            Ok(write_count)
        });

        // Process results from each PDS server
        let mut batch_buffer = Vec::with_capacity(BATCH_SIZE);

        for pds_task in fetch_tasks {
            // Check for cancellation between processing PDS results
            self.check_cancellation().await?;

            match pds_task.await {
                Ok(pds_results) => {
                    // Process each result from this PDS
                    for result in pds_results {
                        match result {
                            Ok((repo, collection, records)) => {
                                let mut validated_records = Vec::new();
                                let total_records = records.len();

                                // Skip validation if requested
                                if skip_validation {
                                    validated_records = records;
                                    info!(
                                        "Validation skipped - accepting all {} records for collection {} from repo {}",
                                        total_records, collection, repo
                                    );
                                }
                                // Validate each record if we have lexicons
                                else if let Some(ref lexicons) = lexicons {
                                    let mut validation_errors = Vec::new();

                                    // Process validations in chunks for better CPU cache locality
                                    // 50 records per chunk optimizes L2/L3 cache usage
                                    const VALIDATION_CHUNK_SIZE: usize = 50;
                                    for chunk in records.chunks(VALIDATION_CHUNK_SIZE) {
                                        // Check for cancellation between validation chunks
                                        self.check_cancellation().await?;

                                        for record in chunk {
                                            match slices_lexicon::validate_record(
                                                lexicons.clone(),
                                                &collection,
                                                record.json.clone(),
                                            ) {
                                                Ok(_) => {
                                                    validated_records.push(record.clone());
                                                }
                                                Err(e) => {
                                                    let error_msg = format!(
                                                        "Validation failed for record {} from {}: {}",
                                                        record.uri, repo, e
                                                    );
                                                    warn!("{}", error_msg);
                                                    validation_errors.push(json!({
                                                        "uri": record.uri,
                                                        "error": e.to_string()
                                                    }));

                                                    // Log individual validation failures
                                                    self.log_with_context(
                                                        slice_uri,
                                                        LogLevel::Warn,
                                                        &error_msg,
                                                        Some(json!({
                                                            "repo": repo,
                                                            "collection": collection,
                                                            "record_uri": record.uri,
                                                            "validation_error": e.to_string()
                                                        })),
                                                    );
                                                }
                                            }
                                        }
                                    }

                                    let valid_count = validated_records.len();
                                    let invalid_count = validation_errors.len();

                                    if invalid_count > 0 {
                                        self.log_with_context(slice_uri, LogLevel::Warn,
                                            &format!("Validation completed for {}/{}: {} valid, {} invalid records",
                                                repo, collection, valid_count, invalid_count),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count,
                                                "invalid_records": invalid_count,
                                                "validation_errors": validation_errors
                                            }))
                                        );
                                    } else {
                                        self.log_with_context(
                                            slice_uri,
                                            LogLevel::Info,
                                            &format!(
                                                "All {} records validated successfully for {}/{}",
                                                valid_count, repo, collection
                                            ),
                                            Some(json!({
                                                "repo": repo,
                                                "collection": collection,
                                                "valid_records": valid_count
                                            })),
                                        );
                                    }

                                    info!(
                                        "Validated {}/{} records for collection {} from repo {}",
                                        validated_records.len(),
                                        total_records,
                                        collection,
                                        repo
                                    );
                                } else {
                                    // No validator available, accept all records
                                    validated_records = records;
                                    self.log_with_context(slice_uri, LogLevel::Warn,
                                        &format!("No lexicon validator available for collection {}", collection),
                                        Some(json!({"collection": collection, "repo": repo, "accepted_records": total_records}))
                                    );
                                    warn!(
                                        "No lexicon validator available - accepting all records without validation for collection {}",
                                        collection
                                    );
                                }

                                // Accumulate validated records for the database writer
                                batch_buffer.extend(validated_records);
                                successful_tasks += 1;
                            }
                            Err(_) => {
                                failed_tasks += 1;
                            }
                        }
                    }
                }
                Err(_) => {
                    // PDS task panicked or was aborted - count it as a failure
                    failed_tasks += 1;
                }
            }

            // Send batch to writer when buffer is full
            if batch_buffer.len() >= BATCH_SIZE {
                let batch_to_send =
                    std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
                let batch_count = batch_to_send.len() as i64;
                info!(
                    "Sending batch of {} records to database writer",
                    batch_count
                );

                // Hand the batch to the writer task (awaits if the channel buffer is full)
                if let Err(e) = tx.send(batch_to_send).await {
                    error!("Failed to send batch to writer: {}", e);
                    return Err(SyncError::Generic(format!(
                        "Failed to send batch to writer: {}",
                        e
                    )));
                }

                let mut total = total_indexed_records.lock().await;
                *total += batch_count;
            }
        }

        // Flush any remaining records in the buffer
        if !batch_buffer.is_empty() {
            let batch_count = batch_buffer.len() as i64;
            info!(
                "Sending final batch of {} records to database writer",
                batch_count
            );

            if let Err(e) = tx.send(batch_buffer).await {
                error!("Failed to send final batch to writer: {}", e);
                return Err(SyncError::Generic(format!(
                    "Failed to send final batch to writer: {}",
                    e
                )));
            }

            let mut total = total_indexed_records.lock().await;
            *total += batch_count;
        }

        // Close the channel and wait for writer to finish
        drop(tx);
        let write_result = writer_task
            .await
            .map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;

        let final_count = match write_result {
            Ok(count) => count,
            Err(e) => return Err(e),
        };

        info!(
            "Debug: {} successful tasks, {} failed tasks",
            successful_tasks, failed_tasks
        );

        info!("Indexed {} new/changed records in batches", final_count);

        info!("Backfill complete!");

        Ok((valid_repos.len() as i64, final_count))
    }

    /// Fetch all repositories that have records in a given collection.
    ///
    /// Uses cursor-based pagination to fetch all repos from the relay; when
    /// `max_repos` is provided, pagination is bounded to roughly that many repos.
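    ///
    /// # Example (sketch)
    ///
    /// Assuming a configured `SyncService` named `service` (illustrative):
    ///
    /// ```ignore
    /// let repos = service
    ///     .get_repos_for_collection("app.bsky.feed.post", &slice_uri, Some(50_000))
    ///     .await?;
    /// info!("discovered {} repos", repos.len());
    /// ```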
    pub async fn get_repos_for_collection(
        &self,
        collection: &str,
        slice_uri: &str,
        max_repos: Option<i32>,
    ) -> Result<Vec<String>, SyncError> {
        let url = format!(
            "{}/xrpc/com.atproto.sync.listReposByCollection",
            self.relay_endpoint
        );
        let mut all_repos = Vec::new();
        let mut cursor: Option<String> = None;
        let mut page_count = 0;

        // AT Protocol docs: default 500 repos/page, max 2000 repos/page
        // We use 1000 repos/page for efficiency (recommended for large DID lists)
        const REPOS_PER_PAGE: usize = 1000; // Our configured page size

        // Calculate max pages based on repo limit, with a reasonable safety margin
        let max_pages = if let Some(limit) = max_repos {
            // Add 20% safety margin and ensure at least 5 pages
            ((limit as usize * 120 / 100) / REPOS_PER_PAGE).max(5)
        } else {
            25 // Default fallback for unlimited
        };

        loop {
            page_count += 1;
            if page_count > max_pages {
                warn!(
                    "Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
                    max_pages,
                    collection,
                    max_repos,
                    max_pages * REPOS_PER_PAGE,
                    REPOS_PER_PAGE
                );
                break;
            }

            let mut query_params = vec![
                ("collection", collection.to_string()),
                ("limit", "1000".to_string()),
            ];
            if let Some(ref cursor_value) = cursor {
                query_params.push(("cursor", cursor_value.clone()));
            }

            let response = self.client.get(&url).query(&query_params).send().await?;

            if !response.status().is_success() {
                return Err(SyncError::ListRepos {
                    status: response.status().as_u16(),
                });
            }

            let repos_response: ListReposByCollectionResponse = response.json().await?;

            // Add repos from this page to our collection
            all_repos.extend(repos_response.repos.into_iter().map(|r| r.did));

            // Check if there's a next page
            match repos_response.cursor {
                Some(next_cursor) if !next_cursor.is_empty() => {
                    cursor = Some(next_cursor);
                    // Log pagination progress if we have a logger
                    self.log_with_context(slice_uri, LogLevel::Info,
                        &format!("Fetching next page of repositories for collection {}, total so far: {}", collection, all_repos.len()),
                        Some(json!({
                            "collection": collection,
                            "repos_count": all_repos.len(),
                            "has_more": true,
                            "page": page_count
                        }))
                    );
                }
                _ => break, // No more pages
            }
        }

        // Log final count
        self.log_with_context(
            slice_uri,
            LogLevel::Info,
            &format!(
                "Completed fetching repositories for collection {}, total: {}",
                collection,
                all_repos.len()
            ),
            Some(json!({
                "collection": collection,
                "total_repos": all_repos.len()
            })),
        );

        Ok(all_repos)
    }

    /// Fetch records for a repo/collection with retry logic.
    ///
    /// If the PDS returns an error, invalidates the cached DID resolution and retries once.
    /// This handles cases where PDS URLs change.
    async fn fetch_records_for_repo_collection_with_atp_map(
        &self,
        repo: &str,
        collection: &str,
        atp_map: &std::collections::HashMap<String, AtpData>,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        let atp_data = atp_map
            .get(repo)
            .ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;

        match self
            .fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri)
            .await
        {
            Ok(records) => Ok(records),
            Err(SyncError::ListRecords { status }) if (400..600).contains(&status) => {
                // 4xx/5xx error from PDS - try invalidating cache and retrying once
                debug!(
                    "PDS error {} for repo {}, attempting cache invalidation and retry",
                    status, repo
                );

                match resolve_actor_data_with_retry(&self.client, repo, self.cache.clone(), true)
                    .await
                {
                    Ok(fresh_actor_data) => {
                        debug!(
                            "Successfully re-resolved actor data for {}, retrying with PDS: {}",
                            repo, fresh_actor_data.pds
                        );
                        self.fetch_records_for_repo_collection(
                            repo,
                            collection,
                            &fresh_actor_data.pds,
                            slice_uri,
                        )
                        .await
                    }
                    Err(e) => {
                        debug!("Failed to re-resolve actor data for {}: {:?}", repo, e);
                        Err(SyncError::ListRecords { status }) // Return original error
                    }
                }
            }
            Err(e) => Err(e), // Other errors (network, etc.) - don't retry
        }
    }

    /// Fetch records for a specific repo and collection from its PDS.
    ///
    /// Only returns new or changed records (compared by CID).
    /// Uses cursor-based pagination to fetch all records.
    ///
    /// # Memory optimizations
    /// - Pre-allocated Vec with 100 capacity (typical collection size)
    /// - Fetches in 100-record pages to limit response size
    /// - Reuses HTTP connections via client pooling
    async fn fetch_records_for_repo_collection(
        &self,
        repo: &str,
        collection: &str,
        pds_url: &str,
        slice_uri: &str,
    ) -> Result<Vec<Record>, SyncError> {
        // Get existing record CIDs to skip unchanged records
        let existing_cids = self
            .database
            .get_existing_record_cids_for_slice(repo, collection, slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get existing CIDs: {}", e)))?;

        debug!(
            "Found {} existing records for {}/{}",
            existing_cids.len(),
            repo,
            collection
        );

        // Pre-allocate based on typical collection size (100 records)
        // This avoids Vec reallocations which can cause memory fragmentation
        let mut records = Vec::with_capacity(100);
        let mut cursor: Option<String> = None;
        let mut fetched_count = 0;
        let mut skipped_count = 0;

        loop {
            let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
            if let Some(ref c) = cursor {
                params.push(("cursor", c));
            }

            let request_url = format!("{}/xrpc/com.atproto.repo.listRecords", pds_url);
            let response = self.client.get(&request_url).query(&params).send().await;

            let response = match response {
                Ok(resp) => resp,
                Err(e) => {
                    self.log_with_context(slice_uri, LogLevel::Error,
                        &format!("Failed to fetch records from {}: Network error: {}", repo, e),
                        Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "error": e.to_string()}))
                    );
                    return Err(SyncError::from(e));
                }
            };

            if !response.status().is_success() {
                let status = response.status().as_u16();

                // HTTP 400/404 are expected when collections don't exist - log as info, not error
                let (log_level, log_message) = if status == 400 || status == 404 {
                    (
                        LogLevel::Info,
                        format!(
                            "Collection '{}' not found for {}: HTTP {}",
                            collection, repo, status
                        ),
                    )
                } else {
                    (
                        LogLevel::Error,
                        format!(
                            "Failed to fetch records from {}: HTTP {} from PDS",
                            repo, status
                        ),
                    )
                };

                self.log_with_context(slice_uri, log_level,
                    &log_message,
                    Some(json!({"repo": repo, "collection": collection, "pds_url": pds_url, "http_status": status}))
                );
                return Err(SyncError::ListRecords { status });
            }

            let list_response: ListRecordsResponse = response.json().await?;

            for atproto_record in list_response.records {
                // Check if we already have this record with the same CID
                if let Some(existing_cid) = existing_cids.get(&atproto_record.uri)
                    && existing_cid == &atproto_record.cid
                {
                    // Record unchanged, skip it
                    skipped_count += 1;
                    continue;
                }

                // Record is new or changed, include it
                // TODO: Consider using Arc<str> for frequently cloned strings
                let record = Record {
                    uri: atproto_record.uri,
                    cid: atproto_record.cid,
                    did: repo.to_string(),
                    collection: collection.to_string(),
                    json: atproto_record.value,
                    indexed_at: Utc::now(),
                    slice_uri: Some(slice_uri.to_string()),
                };
                records.push(record);
                fetched_count += 1;
            }

            cursor = list_response.cursor;
            if cursor.is_none() {
                break;
            }
        }

        // Log results for this repo/collection
        if fetched_count > 0 || skipped_count > 0 {
            self.log_with_context(
                slice_uri,
                LogLevel::Info,
                &format!(
                    "Fetched {} new/changed, skipped {} unchanged records from {}/{}",
                    fetched_count, skipped_count, repo, collection
                ),
                Some(json!({
                    "repo": repo,
                    "collection": collection,
                    "new_records": fetched_count,
                    "skipped_records": skipped_count,
                    "pds_url": pds_url
                })),
            );
        }

        if skipped_count > 0 {
            info!(
                "Skipped {} unchanged records, fetched {} new/changed records for {}/{}",
                skipped_count, fetched_count, repo, collection
            );
        }

        Ok(records)
    }

    /// Resolve ATP data (DID, PDS, handle) for multiple repos.
    ///
    /// Returns a map of DID -> AtpData. Failed resolutions are logged but don't fail the operation.
    ///
    /// # Performance optimizations
    /// - Processes DIDs in 50-item chunks to limit memory usage
    /// - 10 concurrent DNS resolutions max to avoid resolver exhaustion
    /// - Pre-allocated HashMap based on input size
    async fn get_atp_map_for_repos(
        &self,
        repos: &[String],
    ) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
        let mut atp_map = std::collections::HashMap::with_capacity(repos.len());
        const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
        const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions

        info!(
            "Resolving ATP data for {} repositories in chunks",
            repos.len()
        );

        for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE, repos.len());

            debug!(
                "Processing DID resolution chunk {}/{} (repos {}-{})",
                chunk_idx + 1,
                repos.len().div_ceil(CHUNK_SIZE),
                chunk_start,
                chunk_end - 1
            );

            // Process this chunk with limited concurrency
            let mut resolution_tasks = Vec::new();

            for batch in chunk.chunks(MAX_CONCURRENT) {
                let mut batch_futures = Vec::new();

                for repo in batch {
                    let repo_clone = repo.clone();
                    let self_clone = self.clone();

                    let fut = async move {
                        match self_clone.resolve_atp_data(&repo_clone).await {
                            Ok(atp_data) => Some((atp_data.did.clone(), atp_data)),
                            Err(e) => {
                                warn!("Failed to resolve ATP data for {}: {:?}", repo_clone, e);
                                None
                            }
                        }
                    };
                    batch_futures.push(fut);
                }

                // Wait for this batch to complete
                let batch_results = future::join_all(batch_futures).await;
                resolution_tasks.extend(batch_results);
            }

            // Add resolved data to map
            for (did, atp_data) in resolution_tasks.into_iter().flatten() {
                atp_map.insert(did, atp_data);
            }

            // Small delay between chunks to be kind to DNS resolvers
            if chunk_idx < repos.len().div_ceil(CHUNK_SIZE) - 1 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }

        info!(
            "Successfully resolved ATP data for {}/{} repositories",
            atp_map.len(),
            repos.len()
        );
        Ok(atp_map)
    }

    /// Resolve ATP data for a single DID.
    ///
    /// Uses DID resolution to get the DID document, then extracts PDS and handle.
    /// Results are cached to avoid repeated lookups.
    async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
        debug!("Resolving ATP data for DID: {}", did);

        let dns_resolver = HickoryDnsResolver::create_resolver(&[]);

        match resolve_subject(&self.client, &dns_resolver, did).await {
            Ok(resolved_did) => {
                debug!("Successfully resolved subject: {}", resolved_did);

                let actor_data =
                    resolve_actor_data_cached(&self.client, &resolved_did, self.cache.clone())
                        .await
                        .map_err(|e| SyncError::Generic(e.to_string()))?;

                let atp_data = AtpData {
                    did: actor_data.did,
                    pds: actor_data.pds,
                    handle: actor_data.handle,
                };

                Ok(atp_data)
            }
            Err(e) => Err(SyncError::Generic(format!(
                "Failed to resolve subject for {}: {:?}",
                did, e
            ))),
        }
    }

    /// Index actors (DIDs with handles) into the database.
    ///
    /// Creates actor records for all repos being synced.
    async fn index_actors(
        &self,
        slice_uri: &str,
        repos: &[String],
        atp_map: &std::collections::HashMap<String, AtpData>,
    ) -> Result<(), SyncError> {
        let mut actors = Vec::new();
        let now = chrono::Utc::now().to_rfc3339();

        for repo in repos {
            if let Some(atp_data) = atp_map.get(repo) {
                actors.push(Actor {
                    did: atp_data.did.clone(),
                    handle: atp_data.handle.clone(),
                    slice_uri: slice_uri.to_string(),
                    indexed_at: now.clone(),
                });
            }
        }

        if !actors.is_empty() {
            self.database.batch_insert_actors(&actors).await?;
        }

        Ok(())
    }

    /// Get external collections for a slice.
    ///
    /// External collections are those that don't start with the slice's domain.
    /// For example, if the slice domain is "com.example", then "app.bsky.feed.post" is external.
    async fn get_external_collections_for_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<String>, SyncError> {
        // Get the slice's domain
        let domain = self
            .database
            .get_slice_domain(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice domain: {}", e)))?
            .ok_or_else(|| SyncError::Generic(format!("Slice not found: {}", slice_uri)))?;

        // Get all collections (lexicons) for this slice
        let collections = self
            .database
            .get_slice_collections_list(slice_uri)
            .await
            .map_err(|e| SyncError::Generic(format!("Failed to get slice collections: {}", e)))?;

        // Filter for external collections (those that don't start with the slice domain)
        let external_collections: Vec<String> = collections
            .into_iter()
            .filter(|collection| !collection.starts_with(&domain))
            .collect();

        info!(
            "Found {} external collections for slice {} (domain: {}): {:?}",
            external_collections.len(),
            slice_uri,
            domain,
            external_collections
        );

        Ok(external_collections)
    }

    /// Sync a user's data for all external collections defined in the slice.
    ///
    /// Used during login flows to quickly sync a user's data.
    /// Automatically discovers which collections to sync based on the slice configuration.
    /// Uses timeout protection to keep login flows responsive.
    ///
    /// # Arguments
    ///
    /// * `user_did` - The user's DID to sync
    /// * `slice_uri` - The slice to sync into
    /// * `timeout_secs` - Maximum seconds to wait before timing out
    ///
    /// # Returns
    ///
    /// Result with repos_processed, records_synced, and timeout status
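    ///
    /// # Example (sketch)
    ///
    /// A login-flow sketch, assuming a configured `SyncService` named `service`
    /// (illustrative names):
    ///
    /// ```ignore
    /// let result = service.sync_user_collections(&user_did, &slice_uri, 30).await?;
    /// if result.timed_out {
    ///     // Fall back to a background job (the startSync endpoint) for large accounts.
    /// }
    /// ```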
    pub async fn sync_user_collections(
        &self,
        user_did: &str,
        slice_uri: &str,
        timeout_secs: u64,
    ) -> Result<SyncUserCollectionsResult, SyncError> {
        info!(
            "Auto-discovering external collections for user {} in slice {}",
            user_did, slice_uri
        );

        // Auto-discover external collections from slice configuration
        let external_collections = self.get_external_collections_for_slice(slice_uri).await?;

        if external_collections.is_empty() {
            info!("No external collections found for slice {}", slice_uri);
            return Ok(SyncUserCollectionsResult {
                success: true,
                repos_processed: 0,
                records_synced: 0,
                timed_out: false,
                message: "No external collections to sync".to_string(),
            });
        }

        info!(
            "Syncing {} external collections for user {}: {:?}",
            external_collections.len(),
            user_did,
            external_collections
        );

        // Use backfill_collections with timeout, only syncing this specific user
        let sync_future = async {
            self.backfill_collections(
                slice_uri,
                None, // No primary collections for user sync
                Some(&external_collections),
                Some(&[user_did.to_string()]), // Only sync this user's repos
                false, // Always validate user collections
                None,  // No limit for user-specific sync
            )
            .await
        };

        match timeout(Duration::from_secs(timeout_secs), sync_future).await {
            Ok(result) => {
                let (repos_processed, records_synced) = result?;
                info!(
                    "User sync completed within timeout: {} repos, {} records",
                    repos_processed, records_synced
                );
                Ok(SyncUserCollectionsResult {
                    success: true,
                    repos_processed,
                    records_synced,
                    timed_out: false,
                    message: format!(
                        "Sync completed: {} repos, {} records",
                        repos_processed, records_synced
                    ),
                })
            }
            Err(_) => {
                // Timeout occurred - return partial success with guidance
                warn!(
                    "Sync for user {} timed out after {}s, suggest using async job",
                    user_did, timeout_secs
                );
                Ok(SyncUserCollectionsResult {
                    success: false,
                    repos_processed: 0,
                    records_synced: 0,
                    timed_out: true,
                    message: format!(
                        "Sync timed out after {}s - use startSync endpoint for larger syncs",
                        timeout_secs
                    ),
                })
            }
        }
    }
}