Highly ambitious ATProtocol AppView service and SDKs
at main 44 kB view raw
//! Jetstream consumer that indexes AT Protocol records into per-slice storage.
//!
//! Subscribes to a Jetstream firehose, filters commit/delete events against
//! each configured slice's collection list, validates records against the
//! slice's lexicons, and persists records plus actor bookkeeping, while
//! mirroring log entries and record updates to GraphQL subscribers.

use anyhow::Result;
use async_trait::async_trait;
use atproto_jetstream::{
    CancellationToken, Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent,
};
use chrono::Utc;
use reqwest::Client;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};

use crate::actor_resolver::resolve_actor_data;
use crate::cache::{CacheBackend, CacheFactory, SliceCache};
use crate::database::Database;
use crate::errors::JetstreamError;
use crate::graphql::{publish_jetstream_log, RecordOperation, RecordUpdateEvent, PUBSUB};
use crate::jetstream_cursor::PostgresCursorHandler;
use crate::logging::{LogEntry, LogLevel, Logger};
use crate::models::{Actor, Record};

/// Long-lived consumer owning the Jetstream connection plus the caches and
/// shared state that are cloned into the per-event [`SliceEventHandler`].
pub struct JetstreamConsumer {
    consumer: Consumer,
    database: Database,
    http_client: Client,
    // Per-concern caches (Redis or in-memory, chosen at construction);
    // shared with the event handler via Arc<Mutex<_>> (tokio Mutex, so
    // guards may be held across awaits on the cache calls).
    actor_cache: Arc<Mutex<SliceCache>>,
    lexicon_cache: Arc<Mutex<SliceCache>>,
    domain_cache: Arc<Mutex<SliceCache>>,
    collections_cache: Arc<Mutex<SliceCache>>,
    // Total events processed; public so callers can read it for status/metrics.
    pub event_count: Arc<std::sync::atomic::AtomicU64>,
    // Optional cursor persistence for resumable consumption.
    cursor_handler: Option<Arc<PostgresCursorHandler>>,
    // Cached list of all slice URIs; refreshed by load_slice_configurations.
    slices_list: Arc<RwLock<Vec<String>>>,
}

// Event handler that implements the EventHandler trait.
// Holds clones of the consumer's shared state so it can run independently
// inside the Jetstream consumer task.
struct SliceEventHandler {
    database: Database,
    http_client: Client,
    event_count: Arc<std::sync::atomic::AtomicU64>,
    actor_cache: Arc<Mutex<SliceCache>>,
    lexicon_cache: Arc<Mutex<SliceCache>>,
    domain_cache: Arc<Mutex<SliceCache>>,
    collections_cache: Arc<Mutex<SliceCache>>,
    cursor_handler: Option<Arc<PostgresCursorHandler>>,
    slices_list: Arc<RwLock<Vec<String>>>,
}

#[async_trait]
impl EventHandler for SliceEventHandler {
    /// Entry point for every Jetstream event: bumps the event counter,
    /// advances the cursor, and dispatches commits/deletes. Errors from the
    /// per-event handlers are logged (DB + tracing) but not propagated, so a
    /// bad event never stops the stream.
    async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
        let count = self
            .event_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;

        if count.is_multiple_of(10000) {
            info!("Jetstream consumer has processed {} events", count);
        }

        // Extract and update cursor position from event
        let time_us = match &event {
            JetstreamEvent::Commit { time_us, .. } => *time_us,
            JetstreamEvent::Delete { time_us, .. } => *time_us,
            JetstreamEvent::Identity { time_us, .. } => *time_us,
            JetstreamEvent::Account { time_us, .. } => *time_us,
        };

        if let Some(cursor_handler) = &self.cursor_handler {
            // In-memory position update (synchronous); the durable write below
            // is debounced by the handler itself.
            cursor_handler.update_position(time_us);

            // Periodically write cursor to DB (debounced by handler)
            if let Err(e) = cursor_handler.maybe_write_cursor().await {
                error!("Failed to write cursor: {}", e);
            }
        }

        match event {
            JetstreamEvent::Commit { did, commit, .. } => {
                if let Err(e) = self.handle_commit_event(&did, commit).await {
                    let message = format!("Error handling commit event: {}", e);
                    error!("{}", message);
                    Logger::global().log_jetstream(
                        LogLevel::Error,
                        &message,
                        Some(serde_json::json!({
                            "error": e.to_string(),
                            "did": did,
                            "event_type": "commit"
                        })),
                    );
                }
            }
            JetstreamEvent::Delete { did, commit, .. } => {
                if let Err(e) = self.handle_delete_event(&did, commit).await {
                    let message = format!("Error handling delete event: {}", e);
                    error!("{}", message);
                    Logger::global().log_jetstream(
                        LogLevel::Error,
                        &message,
                        Some(serde_json::json!({
                            "error": e.to_string(),
                            "did": did,
                            "event_type": "delete"
                        })),
                    );
                }
            }
            _ => {
                // Ignore other event types (identity, account, etc.)
            }
        }
        Ok(())
    }

    /// Stable identifier used by the consumer when registering this handler.
    fn handler_id(&self) -> String {
        "slice-records-indexer".to_string()
    }
}

impl SliceEventHandler {
    /// Check if DID is an actor for the given slice.
    ///
    /// Cache errors are downgraded to `Ok(None)` (treated as a miss) so that
    /// a flaky cache backend cannot abort event processing.
    async fn is_actor_cached(
        &self,
        did: &str,
        slice_uri: &str,
    ) -> Result<Option<bool>, anyhow::Error> {
        match self.actor_cache.lock().await.is_actor(did, slice_uri).await {
            Ok(result) => Ok(result),
            Err(e) => {
                warn!(
                    error = ?e,
                    did = did,
                    slice_uri = slice_uri,
                    "Actor cache error"
                );
                Ok(None)
            }
        }
    }

    /// Cache that an actor exists (best-effort; failures are only logged).
    async fn cache_actor_exists(&self, did: &str, slice_uri: &str) {
        if let Err(e) = self
            .actor_cache
            .lock()
            .await
            .cache_actor_exists(did, slice_uri)
            .await
        {
            warn!(
                error = ?e,
                did = did,
                slice_uri = slice_uri,
                "Failed to cache actor exists"
            );
        }
    }

    /// Remove actor from cache (best-effort; failures are only logged).
    async fn remove_actor_from_cache(&self, did: &str, slice_uri: &str) {
        if let Err(e) = self
            .actor_cache
            .lock()
            .await
            .remove_actor(did, slice_uri)
            .await
        {
            warn!(
                error = ?e,
                did = did,
                slice_uri = slice_uri,
                "Failed to remove actor from cache"
            );
        }
    }

    /// Log to database AND publish to GraphQL subscribers
    ///
    /// This helper ensures that log entries are both persisted to the database
    /// and streamed in real-time to any active GraphQL subscriptions.
    ///
    /// Real-time publishing only happens when `slice_uri` is provided, since
    /// the GraphQL subscription is keyed by slice.
    async fn log_and_publish(
        &self,
        level: LogLevel,
        message: &str,
        metadata: Option<serde_json::Value>,
        slice_uri: Option<&str>,
    ) {
        // First, log to database (batched write)
        Logger::global().log_jetstream_with_slice(level.clone(), message, metadata.clone(), slice_uri);

        // Then, publish to GraphQL subscribers in real-time
        // We need to query the most recent log entry we just created
        // Since logging is batched, we create a LogEntry manually for immediate publishing
        if let Some(slice) = slice_uri {
            let log_entry = LogEntry {
                id: 0, // Will be set by database, not critical for real-time streaming
                created_at: Utc::now(),
                log_type: "jetstream".to_string(),
                job_id: None,
                user_did: None,
                slice_uri: Some(slice.to_string()),
                level: level.as_str().to_string(),
                message: message.to_string(),
                metadata,
            };

            publish_jetstream_log(log_entry).await;
        }
    }

    /// Get slice collections from cache with database fallback.
    ///
    /// On a cache miss the database result is written back to the cache
    /// best-effort (write-back failure is ignored).
    async fn get_slice_collections(
        &self,
        slice_uri: &str,
    ) -> Result<Option<HashSet<String>>, anyhow::Error> {
        // Try cache first
        let cache_result = {
            // Scope the lock so it is not held across the database fallback.
            let mut cache = self.collections_cache.lock().await;
            cache.get_slice_collections(slice_uri).await
        };

        match cache_result {
            Ok(Some(collections)) => Ok(Some(collections)),
            Ok(None) => {
                // Cache miss - load from database
                match self.database.get_slice_collections_list(slice_uri).await {
                    Ok(collections) => {
                        let collections_set: HashSet<String> = collections.into_iter().collect();
                        // Cache the result
                        let _ = self
                            .collections_cache
                            .lock()
                            .await
                            .cache_slice_collections(slice_uri, &collections_set)
                            .await;
                        Ok(Some(collections_set))
                    }
                    Err(e) => Err(e.into()),
                }
            }
            Err(e) => Err(e),
        }
    }

    /// Get slice domain from cache with database fallback.
    /// Returns `Ok(None)` when the slice has no domain configured.
    async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, anyhow::Error> {
        // Try cache first
        let cache_result = {
            let mut cache = self.domain_cache.lock().await;
            cache.get_slice_domain(slice_uri).await
        };

        match cache_result {
            Ok(Some(domain)) => Ok(Some(domain)),
            Ok(None) => {
                // Cache miss - load from database
                match self.database.get_slice_domain(slice_uri).await {
                    Ok(Some(domain)) => {
                        // Cache the result
                        let _ = self
                            .domain_cache
                            .lock()
                            .await
                            .cache_slice_domain(slice_uri, &domain)
                            .await;
                        Ok(Some(domain))
                    }
                    Ok(None) => Ok(None),
                    Err(e) => Err(e.into()),
                }
            }
            Err(e) => Err(e),
        }
    }

    /// Get slice lexicons from cache with database fallback.
    /// An empty lexicon list from the database is normalized to `Ok(None)`
    /// and is NOT cached, so it will be re-queried on the next event.
    async fn get_slice_lexicons(
        &self,
        slice_uri: &str,
    ) -> Result<Option<Vec<serde_json::Value>>, anyhow::Error> {
        // Try cache first
        let cache_result = {
            let mut cache = self.lexicon_cache.lock().await;
            cache.get_lexicons(slice_uri).await
        };

        match cache_result {
            Ok(Some(lexicons)) => Ok(Some(lexicons)),
            Ok(None) => {
                // Cache miss - load from database
                match self.database.get_lexicons_by_slice(slice_uri).await {
                    Ok(lexicons) if !lexicons.is_empty() => {
                        // Cache the result
                        let _ = self
                            .lexicon_cache
                            .lock()
                            .await
                            .cache_lexicons(slice_uri, &lexicons)
                            .await;
                        Ok(Some(lexicons))
                    }
                    Ok(_) => Ok(None), // Empty lexicons
                    Err(e) => Err(e.into()),
                }
            }
            Err(e) => Err(e),
        }
    }

    /// Index a commit event into the first slice it matches.
    ///
    /// For each configured slice: skip unless the commit's collection is in
    /// the slice's collection set; classify the collection as primary
    /// (prefixed by the slice domain, or a `network.slices.lexicon` record
    /// targeting this slice) or external (actor-gated); validate against the
    /// slice's lexicons; then upsert the record, log, and broadcast. The
    /// per-slice loop `break`s after the first successful index, so a record
    /// is stored for at most one slice per event.
    async fn handle_commit_event(
        &self,
        did: &str,
        commit: atproto_jetstream::JetstreamEventCommit,
    ) -> Result<()> {
        // Get all slices from cached list
        let slices = self.slices_list.read().await.clone();

        // Process each slice
        for slice_uri in slices {
            // Get collections for this slice (with caching)
            let collections = match self.get_slice_collections(&slice_uri).await {
                Ok(Some(collections)) => collections,
                Ok(None) => continue, // No collections for this slice
                Err(e) => {
                    error!("Failed to get collections for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            if collections.contains(&commit.collection) {
                // Special handling for network.slices.lexicon records
                // These should only be indexed to the slice specified in their JSON data
                let is_lexicon_for_this_slice = if commit.collection == "network.slices.lexicon" {
                    if let Some(target_slice_uri) =
                        commit.record.get("slice").and_then(|v| v.as_str())
                    {
                        // Skip this slice if it's not the target slice for this lexicon
                        if slice_uri != target_slice_uri {
                            continue;
                        }
                        true // This is a lexicon record for this specific slice
                    } else {
                        // No target slice specified, skip this lexicon record entirely
                        continue;
                    }
                } else {
                    false
                };

                // Get the domain for this slice (with caching)
                let domain = match self.get_slice_domain(&slice_uri).await {
                    Ok(Some(domain)) => domain,
                    Ok(None) => continue, // No domain, skip
                    Err(e) => {
                        error!("Failed to get domain for slice {}: {}", slice_uri, e);
                        continue;
                    }
                };

                // Check if this is a primary collection (starts with slice domain)
                // Lexicon records for this slice are always treated as primary
                let is_primary_collection =
                    commit.collection.starts_with(&domain) || is_lexicon_for_this_slice;

                // For external collections, check actor status BEFORE expensive validation
                if !is_primary_collection {
                    let is_actor = match self.is_actor_cached(did, &slice_uri).await {
                        Ok(Some(cached_result)) => cached_result,
                        Ok(None) => {
                            // Cache miss means this DID is not an actor we've synced
                            // For external collections, we only care about actors we've already added
                            false
                        }
                        Err(e) => {
                            error!("Error checking actor status: {}", e);
                            continue;
                        }
                    };

                    if !is_actor {
                        // Not an actor - skip validation entirely for external collections
                        continue;
                    }
                }

                // Get lexicons for validation (after actor check for external collections)
                let lexicons = match self.get_slice_lexicons(&slice_uri).await {
                    Ok(Some(lexicons)) => lexicons,
                    Ok(None) => {
                        info!(
                            "No lexicons found for slice {} - skipping validation",
                            slice_uri
                        );
                        continue;
                    }
                    Err(e) => {
                        error!("Failed to get lexicons for slice {}: {}", slice_uri, e);
                        continue;
                    }
                };

                // Validate the record against the slice's lexicons
                let validation_result = match slices_lexicon::validate_record(
                    lexicons.clone(),
                    &commit.collection,
                    commit.record.clone(),
                ) {
                    Ok(_) => {
                        info!(
                            "Record validated for collection {} in slice {}",
                            commit.collection, slice_uri
                        );
                        true
                    }
                    Err(e) => {
                        let message = format!(
                            "Validation failed for collection {} in slice {}",
                            commit.collection, slice_uri
                        );
                        error!("{}: {}", message, e);
                        self.log_and_publish(
                            LogLevel::Warn,
                            &message,
                            Some(serde_json::json!({
                                "collection": commit.collection,
                                "slice_uri": slice_uri,
                                "did": did
                            })),
                            Some(&slice_uri),
                        ).await;
                        false
                    }
                };

                if !validation_result {
                    continue; // Skip this slice if validation fails
                }

                if is_primary_collection {
                    // Primary collection - ensure actor exists and index ALL records
                    info!(
                        "Primary collection {} for slice {} (domain: {}) - indexing record",
                        commit.collection, slice_uri, domain
                    );

                    // Ensure actor exists for primary collections (except lexicons)
                    // Lexicons don't create actors - they just get indexed
                    if !is_lexicon_for_this_slice {
                        // NOTE(review): matches Ok(Some(_)) — a cached `Some(false)`
                        // would also count as "cached" and skip actor creation.
                        // Confirm the cache never stores negative entries.
                        let is_cached =
                            matches!(self.is_actor_cached(did, &slice_uri).await, Ok(Some(_)));

                        if !is_cached {
                            // Actor not in cache - create it
                            info!("Creating new actor {} for slice {}", did, slice_uri);

                            // Resolve actor data (handle, PDS)
                            match resolve_actor_data(&self.http_client, did).await {
                                Ok(actor_data) => {
                                    let actor = Actor {
                                        did: actor_data.did.clone(),
                                        handle: actor_data.handle,
                                        slice_uri: slice_uri.clone(),
                                        indexed_at: Utc::now().to_rfc3339(),
                                    };

                                    // Insert into database
                                    if let Err(e) =
                                        self.database.batch_insert_actors(&[actor]).await
                                    {
                                        error!("Failed to create actor {}: {}", did, e);
                                    } else {
                                        // Add to cache after successful database insert
                                        self.cache_actor_exists(did, &slice_uri).await;
                                        info!("Created actor {} for slice {}", did, slice_uri);
                                    }
                                }
                                Err(e) => {
                                    // Resolution failure is non-fatal: the record is
                                    // still indexed below without an actor row.
                                    error!("Failed to resolve actor data for {}: {}", did, e);
                                }
                            }
                        }
                    }

                    // Canonical AT URI for the record.
                    let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);

                    let record = Record {
                        uri: uri.clone(),
                        cid: commit.cid.clone(),
                        did: did.to_string(),
                        collection: commit.collection.clone(),
                        json: commit.record.clone(),
                        indexed_at: Utc::now(),
                        slice_uri: Some(slice_uri.clone()),
                    };

                    match self.database.upsert_record(&record).await {
                        Ok(is_insert) => {
                            let message = if is_insert {
                                format!("Record inserted in {}", commit.collection)
                            } else {
                                format!("Record updated in {}", commit.collection)
                            };
                            self.log_and_publish(
                                LogLevel::Info,
                                &message,
                                Some(serde_json::json!({
                                    "did": did,
                                    "kind": "commit",
                                    "commit": {
                                        "rev": commit.rev,
                                        "operation": commit.operation,
                                        "collection": commit.collection,
                                        "rkey": commit.rkey,
                                        "record": commit.record,
                                        "cid": commit.cid
                                    },
                                    "indexed_operation": if is_insert { "insert" } else { "update" },
                                    "record_type": "primary"
                                })),
                                Some(&slice_uri),
                            ).await;

                            // Broadcast to GraphQL subscribers
                            let event = RecordUpdateEvent {
                                uri: uri.clone(),
                                cid: commit.cid.clone(),
                                did: did.to_string(),
                                collection: commit.collection.clone(),
                                value: commit.record.clone(),
                                slice_uri: slice_uri.clone(),
                                indexed_at: record.indexed_at.to_rfc3339(),
                                operation: if is_insert {
                                    RecordOperation::Create
                                } else {
                                    RecordOperation::Update
                                },
                            };
                            PUBSUB.publish(event).await;
                        }
                        Err(e) => {
                            let message = "Failed to insert/update record";
                            self.log_and_publish(
                                LogLevel::Error,
                                message,
                                Some(serde_json::json!({
                                    "did": did,
                                    "kind": "commit",
                                    "commit": {
                                        "rev": commit.rev,
                                        "operation": commit.operation,
                                        "collection": commit.collection,
                                        "rkey": commit.rkey,
                                        "record": commit.record,
                                        "cid": commit.cid
                                    },
                                    "error": e.to_string(),
                                    "record_type": "primary"
                                })),
                                Some(&slice_uri),
                            ).await;
                            // Database failure aborts the whole event (the outer
                            // handler logs it); remaining slices are not tried.
                            return Err(anyhow::anyhow!("Database error: {}", e));
                        }
                    }

                    info!(
                        "Successfully indexed {} record from primary collection: {}",
                        commit.operation, uri
                    );
                    break;
                } else {
                    // External collection - we already checked actor status, so just index
                    info!(
                        "External collection {} - DID {} is actor in slice {} - indexing",
                        commit.collection, did, slice_uri
                    );

                    let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);

                    let record = Record {
                        uri: uri.clone(),
                        cid: commit.cid.clone(),
                        did: did.to_string(),
                        collection: commit.collection.clone(),
                        json: commit.record.clone(),
                        indexed_at: Utc::now(),
                        slice_uri: Some(slice_uri.clone()),
                    };

                    match self.database.upsert_record(&record).await {
                        Ok(is_insert) => {
                            let message = if is_insert {
                                format!("Record inserted in {}", commit.collection)
                            } else {
                                format!("Record updated in {}", commit.collection)
                            };
                            self.log_and_publish(
                                LogLevel::Info,
                                &message,
                                Some(serde_json::json!({
                                    "did": did,
                                    "kind": "commit",
                                    "commit": {
                                        "rev": commit.rev,
                                        "operation": commit.operation,
                                        "collection": commit.collection,
                                        "rkey": commit.rkey,
                                        "record": commit.record,
                                        "cid": commit.cid
                                    },
                                    "indexed_operation": if is_insert { "insert" } else { "update" },
                                    "record_type": "external"
                                })),
                                Some(&slice_uri),
                            ).await;

                            // Broadcast to GraphQL subscribers
                            let event = RecordUpdateEvent {
                                uri: uri.clone(),
                                cid: commit.cid.clone(),
                                did: did.to_string(),
                                collection: commit.collection.clone(),
                                value: commit.record.clone(),
                                slice_uri: slice_uri.clone(),
                                indexed_at: record.indexed_at.to_rfc3339(),
                                operation: if is_insert {
                                    RecordOperation::Create
                                } else {
                                    RecordOperation::Update
                                },
                            };
                            PUBSUB.publish(event).await;
                        }
                        Err(e) => {
                            let message = "Failed to insert/update record";
                            self.log_and_publish(
                                LogLevel::Error,
                                message,
                                Some(serde_json::json!({
                                    "did": did,
                                    "kind": "commit",
                                    "commit": {
                                        "rev": commit.rev,
                                        "operation": commit.operation,
                                        "collection": commit.collection,
                                        "rkey": commit.rkey,
                                        "record": commit.record,
                                        "cid": commit.cid
                                    },
                                    "error": e.to_string(),
                                    "record_type": "external"
                                })),
                                Some(&slice_uri),
                            ).await;
                            return Err(anyhow::anyhow!("Database error: {}", e));
                        }
                    }

                    info!(
                        "Successfully indexed {} record from external collection: {}",
                        commit.operation, uri
                    );
                    break;
                }
            }
        }

        Ok(())
    }

    /// Handle a delete event: find every slice the record is relevant to
    /// (primary by domain prefix, or external with the DID as a known actor),
    /// run cascade deletion, delete the record, broadcast per slice, and
    /// clean up actors that no longer have any records in a slice.
    async fn handle_delete_event(
        &self,
        did: &str,
        commit: atproto_jetstream::JetstreamEventDelete,
    ) -> Result<()> {
        let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);

        // Get all slices from cached list
        let slices = self.slices_list.read().await.clone();

        let mut relevant_slices: Vec<String> = Vec::new();

        for slice_uri in slices {
            // Get collections for this slice (with caching)
            let collections = match self.get_slice_collections(&slice_uri).await {
                Ok(Some(collections)) => collections,
                Ok(None) => continue, // No collections for this slice
                Err(e) => {
                    error!("Failed to get collections for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            if !collections.contains(&commit.collection) {
                continue;
            }

            // Get the domain for this slice (with caching)
            let domain = match self.get_slice_domain(&slice_uri).await {
                Ok(Some(domain)) => domain,
                Ok(None) => continue, // No domain, skip
                Err(e) => {
                    error!("Failed to get domain for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            // Check if this is a primary collection (starts with slice domain)
            let is_primary_collection = commit.collection.starts_with(&domain);

            if is_primary_collection {
                // Primary collection - always process deletes
                relevant_slices.push(slice_uri.clone());
            } else {
                // External collection - only process if DID is an actor in this slice
                let is_actor = match self.is_actor_cached(did, &slice_uri).await {
                    Ok(Some(cached_result)) => cached_result,
                    _ => false,
                };
                if is_actor {
                    relevant_slices.push(slice_uri.clone());
                }
            }
        }

        if relevant_slices.is_empty() {
            // No relevant slices found, skip deletion
            return Ok(());
        }

        // Handle cascade deletion before deleting the record
        if let Err(e) = self
            .database
            .handle_cascade_deletion(&uri, &commit.collection)
            .await
        {
            warn!("Cascade deletion failed for {}: {}", uri, e);
        }

        // Delete the record and log only for relevant slices
        // NOTE(review): the None slice filter deletes the URI across all
        // slices, while logging is limited to relevant_slices — confirm
        // that is the intended scope.
        match self.database.delete_record_by_uri(&uri, None).await {
            Ok(rows_affected) => {
                if rows_affected > 0 {
                    info!(
                        "Deleted record: {} ({} rows) for {} slice(s)",
                        uri,
                        rows_affected,
                        relevant_slices.len()
                    );
                    let message = format!("Record deleted from {}", commit.collection);

                    // Log to each relevant slice and check if actor cleanup is needed
                    for slice_uri in &relevant_slices {
                        self.log_and_publish(
                            LogLevel::Info,
                            &message,
                            Some(serde_json::json!({
                                "operation": "delete",
                                "collection": commit.collection,
                                "did": did,
                                "uri": uri,
                                "rows_affected": rows_affected
                            })),
                            Some(slice_uri),
                        ).await;

                        // Broadcast delete event to GraphQL subscribers
                        let event = RecordUpdateEvent {
                            uri: uri.clone(),
                            cid: String::new(), // No CID for delete events
                            did: did.to_string(),
                            collection: commit.collection.clone(),
                            value: serde_json::json!({}), // Empty value for deletes
                            slice_uri: slice_uri.clone(),
                            indexed_at: Utc::now().to_rfc3339(),
                            operation: RecordOperation::Delete,
                        };
                        PUBSUB.publish(event).await;
                    }

                    // Check if actor should be cleaned up (no more records)
                    for slice_uri in &relevant_slices {
                        match self.database.actor_has_records(did, slice_uri).await {
                            Ok(has_records) => {
                                if !has_records {
                                    // No more records for this actor in this slice - clean up
                                    match self.database.delete_actor(did, slice_uri).await {
                                        Ok(deleted) => {
                                            if deleted > 0 {
                                                info!(
                                                    "Cleaned up actor {} from slice {} (no records remaining)",
                                                    did, slice_uri
                                                );
                                                // Remove from cache
                                                self.remove_actor_from_cache(did, slice_uri).await;
                                            }
                                        }
                                        Err(e) => {
                                            error!(
                                                "Failed to delete actor {} from slice {}: {}",
                                                did, slice_uri, e
                                            );
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                error!(
                                    "Failed to check if actor {} has records in slice {}: {}",
                                    did, slice_uri, e
                                );
                            }
                        }
                    }
                }
            }
            Err(e) => {
                let message = "Failed to delete record";
                error!("{}: {}", message, e);

                // Log error to each relevant slice
                for slice_uri in relevant_slices {
                    self.log_and_publish(
                        LogLevel::Error,
                        message,
                        Some(serde_json::json!({
                            "operation": "delete",
                            "collection": commit.collection,
                            "did": did,
                            "uri": uri,
                            "error": e.to_string()
                        })),
                        Some(&slice_uri),
                    ).await;
                }
            }
        }

        Ok(())
    }
}

impl JetstreamConsumer {
    /// Create a new Jetstream consumer with optional cursor support and Redis cache
    ///
    /// # Arguments
    /// * `database` - Database connection for slice configurations and record storage
    /// * `jetstream_hostname` - Optional custom jetstream hostname
    /// * `cursor_handler` - Optional cursor handler for resumable event processing
    /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from
    /// * `redis_url` - Optional Redis URL for caching (falls back to in-memory if not provided)
    ///
    /// # Errors
    /// Returns `JetstreamError::ConnectionFailed` if any of the four cache
    /// instances cannot be created.
    pub async fn new(
        database: Database,
        jetstream_hostname: Option<String>,
        cursor_handler: Option<Arc<PostgresCursorHandler>>,
        initial_cursor: Option<i64>,
        redis_url: Option<String>,
    ) -> Result<Self, JetstreamError> {
        let config = ConsumerTaskConfig {
            user_agent: "slice-server/1.0".to_string(),
            compression: false,
            zstd_dictionary_location: String::new(),
            jetstream_hostname: jetstream_hostname
                .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()),
            // Empty filters: subscribe to all collections and all DIDs;
            // filtering happens in the event handler.
            collections: Vec::new(),
            dids: Vec::new(),
            max_message_size_bytes: None,
            cursor: initial_cursor,
            require_hello: true,
        };

        let consumer = Consumer::new(config);
        let http_client = Client::new();

        // Determine cache backend based on Redis URL
        let cache_backend = if let Some(redis_url) = redis_url {
            CacheBackend::Redis {
                url: redis_url,
                ttl_seconds: None,
            }
        } else {
            CacheBackend::InMemory { ttl_seconds: None }
        };

        // Create cache instances
        let actor_cache = Arc::new(Mutex::new(
            CacheFactory::create_slice_cache(cache_backend.clone())
                .await
                .map_err(|e| JetstreamError::ConnectionFailed {
                    message: format!("Failed to create actor cache: {}", e),
                })?,
        ));

        let lexicon_cache = Arc::new(Mutex::new(
            CacheFactory::create_slice_cache(cache_backend.clone())
                .await
                .map_err(|e| JetstreamError::ConnectionFailed {
                    message: format!("Failed to create lexicon cache: {}", e),
                })?,
        ));

        let domain_cache = Arc::new(Mutex::new(
            CacheFactory::create_slice_cache(cache_backend.clone())
                .await
                .map_err(|e| JetstreamError::ConnectionFailed {
                    message: format!("Failed to create domain cache: {}", e),
                })?,
        ));

        let collections_cache = Arc::new(Mutex::new(
            CacheFactory::create_slice_cache(cache_backend)
                .await
                .map_err(|e| JetstreamError::ConnectionFailed {
                    message: format!("Failed to create collections cache: {}", e),
                })?,
        ));

        Ok(Self {
            consumer,
            database,
            http_client,
            actor_cache,
            lexicon_cache,
            domain_cache,
            collections_cache,
            event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            cursor_handler,
            slices_list: Arc::new(RwLock::new(Vec::new())),
        })
    }

    /// Load slice configurations
    ///
    /// Refreshes the shared `slices_list` from the database; also used by
    /// the periodic configuration reloader.
    pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> {
        info!("Loading slice configurations...");

        // Get all slices and update cached list
        let slices = self.database.get_all_slices().await?;
        *self.slices_list.write().await = slices.clone();
        info!("Found {} total slices in database", slices.len());

        Ok(())
    }

    /// Preload actor cache to avoid database hits during event processing.
    /// Preload failures are logged but never fail startup.
    async fn preload_actor_cache(&self) -> Result<(), JetstreamError> {
        info!("Preloading actor cache...");

        let actors = self.database.get_all_actors().await?;
        info!("Found {} actors to cache", actors.len());

        match self.actor_cache.lock().await.preload_actors(actors).await {
            Ok(_) => {
                info!("Actor cache preloaded successfully");
                Ok(())
            }
            Err(e) => {
                warn!(error = ?e, "Failed to preload actors to cache");
                Ok(()) // Don't fail startup if preload fails
            }
        }
    }

    /// Start consuming events from Jetstream.
    ///
    /// Loads configuration, preloads the actor cache, registers the event
    /// handler, spawns a cancellable status-report task, and runs the
    /// consumer until `cancellation_token` fires. On shutdown the cursor is
    /// force-written so the latest position is persisted regardless of the
    /// consumer's result.
    pub async fn start_consuming(
        &self,
        cancellation_token: CancellationToken,
    ) -> Result<(), JetstreamError> {
        info!("Starting Jetstream consumer");

        // Load initial slice configurations
        self.load_slice_configurations().await?;

        // Preload actor cache
        self.preload_actor_cache().await?;

        // Create and register the event handler
        let handler = Arc::new(SliceEventHandler {
            database: self.database.clone(),
            http_client: self.http_client.clone(),
            event_count: self.event_count.clone(),
            actor_cache: self.actor_cache.clone(),
            lexicon_cache: self.lexicon_cache.clone(),
            domain_cache: self.domain_cache.clone(),
            collections_cache: self.collections_cache.clone(),
            cursor_handler: self.cursor_handler.clone(),
            slices_list: self.slices_list.clone(),
        });

        self.consumer.register_handler(handler).await.map_err(|e| {
            JetstreamError::ConnectionFailed {
                message: format!("Failed to register event handler: {}", e),
            }
        })?;

        // Start periodic status reporting (with cancellation support)
        let event_count_for_status = self.event_count.clone();
        let cancellation_token_for_status = cancellation_token.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed);
                        info!(
                            "Jetstream consumer status: {} total events processed",
                            count
                        );
                    }
                    _ = cancellation_token_for_status.cancelled() => {
                        info!("Status reporting task cancelled");
                        break;
                    }
                }
            }
        });

        // Start the consumer
        info!("Starting Jetstream background consumer...");
        let result = self
            .consumer
            .run_background(cancellation_token)
            .await
            .map_err(|e| JetstreamError::ConnectionFailed {
                message: format!("Consumer failed: {}", e),
            });

        // Force write cursor on shutdown to ensure latest position is persisted
        if let Some(cursor_handler) = &self.cursor_handler {
            if let Err(e) = cursor_handler.force_write_cursor().await {
                error!("Failed to write final cursor position: {}", e);
            } else {
                info!("Final cursor position written to database");
            }
        }

        result?;
        Ok(())
    }

    /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors
    pub fn start_configuration_reloader(consumer: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes
            interval.tick().await; // Skip first immediate tick

            loop {
                interval.tick().await;

                if let Err(e) = consumer.load_slice_configurations().await {
                    error!("Failed to reload slice configurations: {}", e);
                }

                if let Err(e) = consumer.preload_actor_cache().await {
                    error!("Failed to reload actor cache: {}", e);
                }
            }
        });
    }
}