use anyhow::Result; use async_trait::async_trait; use atproto_jetstream::{ CancellationToken, Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, }; use chrono::Utc; use reqwest::Client; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; use tracing::{error, info, warn}; use crate::actor_resolver::resolve_actor_data; use crate::cache::{CacheBackend, CacheFactory, SliceCache}; use crate::database::Database; use crate::errors::JetstreamError; use crate::graphql::{publish_jetstream_log, RecordOperation, RecordUpdateEvent, PUBSUB}; use crate::jetstream_cursor::PostgresCursorHandler; use crate::logging::{LogEntry, LogLevel, Logger}; use crate::models::{Actor, Record}; pub struct JetstreamConsumer { consumer: Consumer, database: Database, http_client: Client, actor_cache: Arc>, lexicon_cache: Arc>, domain_cache: Arc>, collections_cache: Arc>, pub event_count: Arc, cursor_handler: Option>, slices_list: Arc>>, } // Event handler that implements the EventHandler trait struct SliceEventHandler { database: Database, http_client: Client, event_count: Arc, actor_cache: Arc>, lexicon_cache: Arc>, domain_cache: Arc>, collections_cache: Arc>, cursor_handler: Option>, slices_list: Arc>>, } #[async_trait] impl EventHandler for SliceEventHandler { async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { let count = self .event_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; if count.is_multiple_of(10000) { info!("Jetstream consumer has processed {} events", count); } // Extract and update cursor position from event let time_us = match &event { JetstreamEvent::Commit { time_us, .. } => *time_us, JetstreamEvent::Delete { time_us, .. } => *time_us, JetstreamEvent::Identity { time_us, .. } => *time_us, JetstreamEvent::Account { time_us, .. } => *time_us, }; if let Some(cursor_handler) = &self.cursor_handler { cursor_handler.update_position(time_us); // Periodically write cursor to DB (debounced by handler) if let Err(e) = cursor_handler.maybe_write_cursor().await { error!("Failed to write cursor: {}", e); } } match event { JetstreamEvent::Commit { did, commit, .. } => { if let Err(e) = self.handle_commit_event(&did, commit).await { let message = format!("Error handling commit event: {}", e); error!("{}", message); Logger::global().log_jetstream( LogLevel::Error, &message, Some(serde_json::json!({ "error": e.to_string(), "did": did, "event_type": "commit" })), ); } } JetstreamEvent::Delete { did, commit, .. } => { if let Err(e) = self.handle_delete_event(&did, commit).await { let message = format!("Error handling delete event: {}", e); error!("{}", message); Logger::global().log_jetstream( LogLevel::Error, &message, Some(serde_json::json!({ "error": e.to_string(), "did": did, "event_type": "delete" })), ); } } _ => { // Ignore other event types (identity, account, etc.) } } Ok(()) } fn handler_id(&self) -> String { "slice-records-indexer".to_string() } } impl SliceEventHandler { /// Check if DID is an actor for the given slice async fn is_actor_cached( &self, did: &str, slice_uri: &str, ) -> Result, anyhow::Error> { match self.actor_cache.lock().await.is_actor(did, slice_uri).await { Ok(result) => Ok(result), Err(e) => { warn!( error = ?e, did = did, slice_uri = slice_uri, "Actor cache error" ); Ok(None) } } } /// Cache that an actor exists async fn cache_actor_exists(&self, did: &str, slice_uri: &str) { if let Err(e) = self .actor_cache .lock() .await .cache_actor_exists(did, slice_uri) .await { warn!( error = ?e, did = did, slice_uri = slice_uri, "Failed to cache actor exists" ); } } /// Remove actor from cache async fn remove_actor_from_cache(&self, did: &str, slice_uri: &str) { if let Err(e) = self .actor_cache .lock() .await .remove_actor(did, slice_uri) .await { warn!( error = ?e, did = did, slice_uri = slice_uri, "Failed to remove actor from cache" ); } } /// Log to database AND publish to GraphQL subscribers /// /// This helper ensures that log entries are both persisted to the database /// and streamed in real-time to any active GraphQL subscriptions. async fn log_and_publish( &self, level: LogLevel, message: &str, metadata: Option, slice_uri: Option<&str>, ) { // First, log to database (batched write) Logger::global().log_jetstream_with_slice(level.clone(), message, metadata.clone(), slice_uri); // Then, publish to GraphQL subscribers in real-time // We need to query the most recent log entry we just created // Since logging is batched, we create a LogEntry manually for immediate publishing if let Some(slice) = slice_uri { let log_entry = LogEntry { id: 0, // Will be set by database, not critical for real-time streaming created_at: Utc::now(), log_type: "jetstream".to_string(), job_id: None, user_did: None, slice_uri: Some(slice.to_string()), level: level.as_str().to_string(), message: message.to_string(), metadata, }; publish_jetstream_log(log_entry).await; } } /// Get slice collections from cache with database fallback async fn get_slice_collections( &self, slice_uri: &str, ) -> Result>, anyhow::Error> { // Try cache first let cache_result = { let mut cache = self.collections_cache.lock().await; cache.get_slice_collections(slice_uri).await }; match cache_result { Ok(Some(collections)) => Ok(Some(collections)), Ok(None) => { // Cache miss - load from database match self.database.get_slice_collections_list(slice_uri).await { Ok(collections) => { let collections_set: HashSet = collections.into_iter().collect(); // Cache the result let _ = self .collections_cache .lock() .await .cache_slice_collections(slice_uri, &collections_set) .await; Ok(Some(collections_set)) } Err(e) => Err(e.into()), } } Err(e) => Err(e), } } /// Get slice domain from cache with database fallback async fn get_slice_domain(&self, slice_uri: &str) -> Result, anyhow::Error> { // Try cache first let cache_result = { let mut cache = self.domain_cache.lock().await; cache.get_slice_domain(slice_uri).await }; match cache_result { Ok(Some(domain)) => Ok(Some(domain)), Ok(None) => { // Cache miss - load from database match self.database.get_slice_domain(slice_uri).await { Ok(Some(domain)) => { // Cache the result let _ = self .domain_cache .lock() .await .cache_slice_domain(slice_uri, &domain) .await; Ok(Some(domain)) } Ok(None) => Ok(None), Err(e) => Err(e.into()), } } Err(e) => Err(e), } } /// Get slice lexicons from cache with database fallback async fn get_slice_lexicons( &self, slice_uri: &str, ) -> Result>, anyhow::Error> { // Try cache first let cache_result = { let mut cache = self.lexicon_cache.lock().await; cache.get_lexicons(slice_uri).await }; match cache_result { Ok(Some(lexicons)) => Ok(Some(lexicons)), Ok(None) => { // Cache miss - load from database match self.database.get_lexicons_by_slice(slice_uri).await { Ok(lexicons) if !lexicons.is_empty() => { // Cache the result let _ = self .lexicon_cache .lock() .await .cache_lexicons(slice_uri, &lexicons) .await; Ok(Some(lexicons)) } Ok(_) => Ok(None), // Empty lexicons Err(e) => Err(e.into()), } } Err(e) => Err(e), } } async fn handle_commit_event( &self, did: &str, commit: atproto_jetstream::JetstreamEventCommit, ) -> Result<()> { // Get all slices from cached list let slices = self.slices_list.read().await.clone(); // Process each slice for slice_uri in slices { // Get collections for this slice (with caching) let collections = match self.get_slice_collections(&slice_uri).await { Ok(Some(collections)) => collections, Ok(None) => continue, // No collections for this slice Err(e) => { error!("Failed to get collections for slice {}: {}", slice_uri, e); continue; } }; if collections.contains(&commit.collection) { // Special handling for network.slices.lexicon records // These should only be indexed to the slice specified in their JSON data let is_lexicon_for_this_slice = if commit.collection == "network.slices.lexicon" { if let Some(target_slice_uri) = commit.record.get("slice").and_then(|v| v.as_str()) { // Skip this slice if it's not the target slice for this lexicon if slice_uri != target_slice_uri { continue; } true // This is a lexicon record for this specific slice } else { // No target slice specified, skip this lexicon record entirely continue; } } else { false }; // Get the domain for this slice (with caching) let domain = match self.get_slice_domain(&slice_uri).await { Ok(Some(domain)) => domain, Ok(None) => continue, // No domain, skip Err(e) => { error!("Failed to get domain for slice {}: {}", slice_uri, e); continue; } }; // Check if this is a primary collection (starts with slice domain) // Lexicon records for this slice are always treated as primary let is_primary_collection = commit.collection.starts_with(&domain) || is_lexicon_for_this_slice; // For external collections, check actor status BEFORE expensive validation if !is_primary_collection { let is_actor = match self.is_actor_cached(did, &slice_uri).await { Ok(Some(cached_result)) => cached_result, Ok(None) => { // Cache miss means this DID is not an actor we've synced // For external collections, we only care about actors we've already added false } Err(e) => { error!("Error checking actor status: {}", e); continue; } }; if !is_actor { // Not an actor - skip validation entirely for external collections continue; } } // Get lexicons for validation (after actor check for external collections) let lexicons = match self.get_slice_lexicons(&slice_uri).await { Ok(Some(lexicons)) => lexicons, Ok(None) => { info!( "No lexicons found for slice {} - skipping validation", slice_uri ); continue; } Err(e) => { error!("Failed to get lexicons for slice {}: {}", slice_uri, e); continue; } }; // Validate the record against the slice's lexicons let validation_result = match slices_lexicon::validate_record( lexicons.clone(), &commit.collection, commit.record.clone(), ) { Ok(_) => { info!( "Record validated for collection {} in slice {}", commit.collection, slice_uri ); true } Err(e) => { let message = format!( "Validation failed for collection {} in slice {}", commit.collection, slice_uri ); error!("{}: {}", message, e); self.log_and_publish( LogLevel::Warn, &message, Some(serde_json::json!({ "collection": commit.collection, "slice_uri": slice_uri, "did": did })), Some(&slice_uri), ).await; false } }; if !validation_result { continue; // Skip this slice if validation fails } if is_primary_collection { // Primary collection - ensure actor exists and index ALL records info!( "Primary collection {} for slice {} (domain: {}) - indexing record", commit.collection, slice_uri, domain ); // Ensure actor exists for primary collections (except lexicons) // Lexicons don't create actors - they just get indexed if !is_lexicon_for_this_slice { let is_cached = matches!(self.is_actor_cached(did, &slice_uri).await, Ok(Some(_))); if !is_cached { // Actor not in cache - create it info!("Creating new actor {} for slice {}", did, slice_uri); // Resolve actor data (handle, PDS) match resolve_actor_data(&self.http_client, did).await { Ok(actor_data) => { let actor = Actor { did: actor_data.did.clone(), handle: actor_data.handle, slice_uri: slice_uri.clone(), indexed_at: Utc::now().to_rfc3339(), }; // Insert into database if let Err(e) = self.database.batch_insert_actors(&[actor]).await { error!("Failed to create actor {}: {}", did, e); } else { // Add to cache after successful database insert self.cache_actor_exists(did, &slice_uri).await; info!("Created actor {} for slice {}", did, slice_uri); } } Err(e) => { error!("Failed to resolve actor data for {}: {}", did, e); } } } } let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); let record = Record { uri: uri.clone(), cid: commit.cid.clone(), did: did.to_string(), collection: commit.collection.clone(), json: commit.record.clone(), indexed_at: Utc::now(), slice_uri: Some(slice_uri.clone()), }; match self.database.upsert_record(&record).await { Ok(is_insert) => { let message = if is_insert { format!("Record inserted in {}", commit.collection) } else { format!("Record updated in {}", commit.collection) }; self.log_and_publish( LogLevel::Info, &message, Some(serde_json::json!({ "did": did, "kind": "commit", "commit": { "rev": commit.rev, "operation": commit.operation, "collection": commit.collection, "rkey": commit.rkey, "record": commit.record, "cid": commit.cid }, "indexed_operation": if is_insert { "insert" } else { "update" }, "record_type": "primary" })), Some(&slice_uri), ).await; // Broadcast to GraphQL subscribers let event = RecordUpdateEvent { uri: uri.clone(), cid: commit.cid.clone(), did: did.to_string(), collection: commit.collection.clone(), value: commit.record.clone(), slice_uri: slice_uri.clone(), indexed_at: record.indexed_at.to_rfc3339(), operation: if is_insert { RecordOperation::Create } else { RecordOperation::Update }, }; PUBSUB.publish(event).await; } Err(e) => { let message = "Failed to insert/update record"; self.log_and_publish( LogLevel::Error, message, Some(serde_json::json!({ "did": did, "kind": "commit", "commit": { "rev": commit.rev, "operation": commit.operation, "collection": commit.collection, "rkey": commit.rkey, "record": commit.record, "cid": commit.cid }, "error": e.to_string(), "record_type": "primary" })), Some(&slice_uri), ).await; return Err(anyhow::anyhow!("Database error: {}", e)); } } info!( "Successfully indexed {} record from primary collection: {}", commit.operation, uri ); break; } else { // External collection - we already checked actor status, so just index info!( "External collection {} - DID {} is actor in slice {} - indexing", commit.collection, did, slice_uri ); let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); let record = Record { uri: uri.clone(), cid: commit.cid.clone(), did: did.to_string(), collection: commit.collection.clone(), json: commit.record.clone(), indexed_at: Utc::now(), slice_uri: Some(slice_uri.clone()), }; match self.database.upsert_record(&record).await { Ok(is_insert) => { let message = if is_insert { format!("Record inserted in {}", commit.collection) } else { format!("Record updated in {}", commit.collection) }; self.log_and_publish( LogLevel::Info, &message, Some(serde_json::json!({ "did": did, "kind": "commit", "commit": { "rev": commit.rev, "operation": commit.operation, "collection": commit.collection, "rkey": commit.rkey, "record": commit.record, "cid": commit.cid }, "indexed_operation": if is_insert { "insert" } else { "update" }, "record_type": "external" })), Some(&slice_uri), ).await; // Broadcast to GraphQL subscribers let event = RecordUpdateEvent { uri: uri.clone(), cid: commit.cid.clone(), did: did.to_string(), collection: commit.collection.clone(), value: commit.record.clone(), slice_uri: slice_uri.clone(), indexed_at: record.indexed_at.to_rfc3339(), operation: if is_insert { RecordOperation::Create } else { RecordOperation::Update }, }; PUBSUB.publish(event).await; } Err(e) => { let message = "Failed to insert/update record"; self.log_and_publish( LogLevel::Error, message, Some(serde_json::json!({ "did": did, "kind": "commit", "commit": { "rev": commit.rev, "operation": commit.operation, "collection": commit.collection, "rkey": commit.rkey, "record": commit.record, "cid": commit.cid }, "error": e.to_string(), "record_type": "external" })), Some(&slice_uri), ).await; return Err(anyhow::anyhow!("Database error: {}", e)); } } info!( "Successfully indexed {} record from external collection: {}", commit.operation, uri ); break; } } } Ok(()) } async fn handle_delete_event( &self, did: &str, commit: atproto_jetstream::JetstreamEventDelete, ) -> Result<()> { let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); // Get all slices from cached list let slices = self.slices_list.read().await.clone(); let mut relevant_slices: Vec = Vec::new(); for slice_uri in slices { // Get collections for this slice (with caching) let collections = match self.get_slice_collections(&slice_uri).await { Ok(Some(collections)) => collections, Ok(None) => continue, // No collections for this slice Err(e) => { error!("Failed to get collections for slice {}: {}", slice_uri, e); continue; } }; if !collections.contains(&commit.collection) { continue; } // Get the domain for this slice (with caching) let domain = match self.get_slice_domain(&slice_uri).await { Ok(Some(domain)) => domain, Ok(None) => continue, // No domain, skip Err(e) => { error!("Failed to get domain for slice {}: {}", slice_uri, e); continue; } }; // Check if this is a primary collection (starts with slice domain) let is_primary_collection = commit.collection.starts_with(&domain); if is_primary_collection { // Primary collection - always process deletes relevant_slices.push(slice_uri.clone()); } else { // External collection - only process if DID is an actor in this slice let is_actor = match self.is_actor_cached(did, &slice_uri).await { Ok(Some(cached_result)) => cached_result, _ => false, }; if is_actor { relevant_slices.push(slice_uri.clone()); } } } if relevant_slices.is_empty() { // No relevant slices found, skip deletion return Ok(()); } // Handle cascade deletion before deleting the record if let Err(e) = self .database .handle_cascade_deletion(&uri, &commit.collection) .await { warn!("Cascade deletion failed for {}: {}", uri, e); } // Delete the record and log only for relevant slices match self.database.delete_record_by_uri(&uri, None).await { Ok(rows_affected) => { if rows_affected > 0 { info!( "Deleted record: {} ({} rows) for {} slice(s)", uri, rows_affected, relevant_slices.len() ); let message = format!("Record deleted from {}", commit.collection); // Log to each relevant slice and check if actor cleanup is needed for slice_uri in &relevant_slices { self.log_and_publish( LogLevel::Info, &message, Some(serde_json::json!({ "operation": "delete", "collection": commit.collection, "did": did, "uri": uri, "rows_affected": rows_affected })), Some(slice_uri), ).await; // Broadcast delete event to GraphQL subscribers let event = RecordUpdateEvent { uri: uri.clone(), cid: String::new(), // No CID for delete events did: did.to_string(), collection: commit.collection.clone(), value: serde_json::json!({}), // Empty value for deletes slice_uri: slice_uri.clone(), indexed_at: Utc::now().to_rfc3339(), operation: RecordOperation::Delete, }; PUBSUB.publish(event).await; } // Check if actor should be cleaned up (no more records) for slice_uri in &relevant_slices { match self.database.actor_has_records(did, slice_uri).await { Ok(has_records) => { if !has_records { // No more records for this actor in this slice - clean up match self.database.delete_actor(did, slice_uri).await { Ok(deleted) => { if deleted > 0 { info!( "Cleaned up actor {} from slice {} (no records remaining)", did, slice_uri ); // Remove from cache self.remove_actor_from_cache(did, slice_uri).await; } } Err(e) => { error!( "Failed to delete actor {} from slice {}: {}", did, slice_uri, e ); } } } } Err(e) => { error!( "Failed to check if actor {} has records in slice {}: {}", did, slice_uri, e ); } } } } } Err(e) => { let message = "Failed to delete record"; error!("{}: {}", message, e); // Log error to each relevant slice for slice_uri in relevant_slices { self.log_and_publish( LogLevel::Error, message, Some(serde_json::json!({ "operation": "delete", "collection": commit.collection, "did": did, "uri": uri, "error": e.to_string() })), Some(&slice_uri), ).await; } } } Ok(()) } } impl JetstreamConsumer { /// Create a new Jetstream consumer with optional cursor support and Redis cache /// /// # Arguments /// * `database` - Database connection for slice configurations and record storage /// * `jetstream_hostname` - Optional custom jetstream hostname /// * `cursor_handler` - Optional cursor handler for resumable event processing /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from /// * `redis_url` - Optional Redis URL for caching (falls back to in-memory if not provided) pub async fn new( database: Database, jetstream_hostname: Option, cursor_handler: Option>, initial_cursor: Option, redis_url: Option, ) -> Result { let config = ConsumerTaskConfig { user_agent: "slice-server/1.0".to_string(), compression: false, zstd_dictionary_location: String::new(), jetstream_hostname: jetstream_hostname .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()), collections: Vec::new(), dids: Vec::new(), max_message_size_bytes: None, cursor: initial_cursor, require_hello: true, }; let consumer = Consumer::new(config); let http_client = Client::new(); // Determine cache backend based on Redis URL let cache_backend = if let Some(redis_url) = redis_url { CacheBackend::Redis { url: redis_url, ttl_seconds: None, } } else { CacheBackend::InMemory { ttl_seconds: None } }; // Create cache instances let actor_cache = Arc::new(Mutex::new( CacheFactory::create_slice_cache(cache_backend.clone()) .await .map_err(|e| JetstreamError::ConnectionFailed { message: format!("Failed to create actor cache: {}", e), })?, )); let lexicon_cache = Arc::new(Mutex::new( CacheFactory::create_slice_cache(cache_backend.clone()) .await .map_err(|e| JetstreamError::ConnectionFailed { message: format!("Failed to create lexicon cache: {}", e), })?, )); let domain_cache = Arc::new(Mutex::new( CacheFactory::create_slice_cache(cache_backend.clone()) .await .map_err(|e| JetstreamError::ConnectionFailed { message: format!("Failed to create domain cache: {}", e), })?, )); let collections_cache = Arc::new(Mutex::new( CacheFactory::create_slice_cache(cache_backend) .await .map_err(|e| JetstreamError::ConnectionFailed { message: format!("Failed to create collections cache: {}", e), })?, )); Ok(Self { consumer, database, http_client, actor_cache, lexicon_cache, domain_cache, collections_cache, event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)), cursor_handler, slices_list: Arc::new(RwLock::new(Vec::new())), }) } /// Load slice configurations pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> { info!("Loading slice configurations..."); // Get all slices and update cached list let slices = self.database.get_all_slices().await?; *self.slices_list.write().await = slices.clone(); info!("Found {} total slices in database", slices.len()); Ok(()) } /// Preload actor cache to avoid database hits during event processing async fn preload_actor_cache(&self) -> Result<(), JetstreamError> { info!("Preloading actor cache..."); let actors = self.database.get_all_actors().await?; info!("Found {} actors to cache", actors.len()); match self.actor_cache.lock().await.preload_actors(actors).await { Ok(_) => { info!("Actor cache preloaded successfully"); Ok(()) } Err(e) => { warn!(error = ?e, "Failed to preload actors to cache"); Ok(()) // Don't fail startup if preload fails } } } /// Start consuming events from Jetstream pub async fn start_consuming( &self, cancellation_token: CancellationToken, ) -> Result<(), JetstreamError> { info!("Starting Jetstream consumer"); // Load initial slice configurations self.load_slice_configurations().await?; // Preload actor cache self.preload_actor_cache().await?; // Create and register the event handler let handler = Arc::new(SliceEventHandler { database: self.database.clone(), http_client: self.http_client.clone(), event_count: self.event_count.clone(), actor_cache: self.actor_cache.clone(), lexicon_cache: self.lexicon_cache.clone(), domain_cache: self.domain_cache.clone(), collections_cache: self.collections_cache.clone(), cursor_handler: self.cursor_handler.clone(), slices_list: self.slices_list.clone(), }); self.consumer.register_handler(handler).await.map_err(|e| { JetstreamError::ConnectionFailed { message: format!("Failed to register event handler: {}", e), } })?; // Start periodic status reporting (with cancellation support) let event_count_for_status = self.event_count.clone(); let cancellation_token_for_status = cancellation_token.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute loop { tokio::select! { _ = interval.tick() => { let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed); info!( "Jetstream consumer status: {} total events processed", count ); } _ = cancellation_token_for_status.cancelled() => { info!("Status reporting task cancelled"); break; } } } }); // Start the consumer info!("Starting Jetstream background consumer..."); let result = self .consumer .run_background(cancellation_token) .await .map_err(|e| JetstreamError::ConnectionFailed { message: format!("Consumer failed: {}", e), }); // Force write cursor on shutdown to ensure latest position is persisted if let Some(cursor_handler) = &self.cursor_handler { if let Err(e) = cursor_handler.force_write_cursor().await { error!("Failed to write final cursor position: {}", e); } else { info!("Final cursor position written to database"); } } result?; Ok(()) } /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors pub fn start_configuration_reloader(consumer: Arc) { tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes interval.tick().await; // Skip first immediate tick loop { interval.tick().await; if let Err(e) = consumer.load_slice_configurations().await { error!("Failed to reload slice configurations: {}", e); } if let Err(e) = consumer.preload_actor_cache().await { error!("Failed to reload actor cache: {}", e); } } }); } }