Highly ambitious ATProtocol AppView service and sdks
138
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 9cca9073c186d254eb784f7d3b15fd97ffeca64f 1057 lines 44 kB view raw
1use anyhow::Result; 2use async_trait::async_trait; 3use atproto_jetstream::{ 4 CancellationToken, Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, 5}; 6use chrono::Utc; 7use reqwest::Client; 8use std::collections::HashSet; 9use std::sync::Arc; 10use tokio::sync::{Mutex, RwLock}; 11use tracing::{error, info, warn}; 12 13use crate::actor_resolver::resolve_actor_data; 14use crate::cache::{CacheBackend, CacheFactory, SliceCache}; 15use crate::database::Database; 16use crate::errors::JetstreamError; 17use crate::graphql::{publish_jetstream_log, RecordOperation, RecordUpdateEvent, PUBSUB}; 18use crate::jetstream_cursor::PostgresCursorHandler; 19use crate::logging::{LogEntry, LogLevel, Logger}; 20use crate::models::{Actor, Record}; 21 22pub struct JetstreamConsumer { 23 consumer: Consumer, 24 database: Database, 25 http_client: Client, 26 actor_cache: Arc<Mutex<SliceCache>>, 27 lexicon_cache: Arc<Mutex<SliceCache>>, 28 domain_cache: Arc<Mutex<SliceCache>>, 29 collections_cache: Arc<Mutex<SliceCache>>, 30 pub event_count: Arc<std::sync::atomic::AtomicU64>, 31 cursor_handler: Option<Arc<PostgresCursorHandler>>, 32 slices_list: Arc<RwLock<Vec<String>>>, 33} 34 35// Event handler that implements the EventHandler trait 36struct SliceEventHandler { 37 database: Database, 38 http_client: Client, 39 event_count: Arc<std::sync::atomic::AtomicU64>, 40 actor_cache: Arc<Mutex<SliceCache>>, 41 lexicon_cache: Arc<Mutex<SliceCache>>, 42 domain_cache: Arc<Mutex<SliceCache>>, 43 collections_cache: Arc<Mutex<SliceCache>>, 44 cursor_handler: Option<Arc<PostgresCursorHandler>>, 45 slices_list: Arc<RwLock<Vec<String>>>, 46} 47 48#[async_trait] 49impl EventHandler for SliceEventHandler { 50 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 51 let count = self 52 .event_count 53 .fetch_add(1, std::sync::atomic::Ordering::Relaxed) 54 + 1; 55 56 if count.is_multiple_of(10000) { 57 info!("Jetstream consumer has processed {} events", count); 58 } 59 60 // Extract and update cursor position from event 61 let time_us = match &event { 62 JetstreamEvent::Commit { time_us, .. } => *time_us, 63 JetstreamEvent::Delete { time_us, .. } => *time_us, 64 JetstreamEvent::Identity { time_us, .. } => *time_us, 65 JetstreamEvent::Account { time_us, .. } => *time_us, 66 }; 67 68 if let Some(cursor_handler) = &self.cursor_handler { 69 cursor_handler.update_position(time_us); 70 71 // Periodically write cursor to DB (debounced by handler) 72 if let Err(e) = cursor_handler.maybe_write_cursor().await { 73 error!("Failed to write cursor: {}", e); 74 } 75 } 76 77 match event { 78 JetstreamEvent::Commit { did, commit, .. } => { 79 if let Err(e) = self.handle_commit_event(&did, commit).await { 80 let message = format!("Error handling commit event: {}", e); 81 error!("{}", message); 82 Logger::global().log_jetstream( 83 LogLevel::Error, 84 &message, 85 Some(serde_json::json!({ 86 "error": e.to_string(), 87 "did": did, 88 "event_type": "commit" 89 })), 90 ); 91 } 92 } 93 JetstreamEvent::Delete { did, commit, .. } => { 94 if let Err(e) = self.handle_delete_event(&did, commit).await { 95 let message = format!("Error handling delete event: {}", e); 96 error!("{}", message); 97 Logger::global().log_jetstream( 98 LogLevel::Error, 99 &message, 100 Some(serde_json::json!({ 101 "error": e.to_string(), 102 "did": did, 103 "event_type": "delete" 104 })), 105 ); 106 } 107 } 108 _ => { 109 // Ignore other event types (identity, account, etc.) 110 } 111 } 112 Ok(()) 113 } 114 115 fn handler_id(&self) -> String { 116 "slice-records-indexer".to_string() 117 } 118} 119 120impl SliceEventHandler { 121 /// Check if DID is an actor for the given slice 122 async fn is_actor_cached( 123 &self, 124 did: &str, 125 slice_uri: &str, 126 ) -> Result<Option<bool>, anyhow::Error> { 127 match self.actor_cache.lock().await.is_actor(did, slice_uri).await { 128 Ok(result) => Ok(result), 129 Err(e) => { 130 warn!( 131 error = ?e, 132 did = did, 133 slice_uri = slice_uri, 134 "Actor cache error" 135 ); 136 Ok(None) 137 } 138 } 139 } 140 141 /// Cache that an actor exists 142 async fn cache_actor_exists(&self, did: &str, slice_uri: &str) { 143 if let Err(e) = self 144 .actor_cache 145 .lock() 146 .await 147 .cache_actor_exists(did, slice_uri) 148 .await 149 { 150 warn!( 151 error = ?e, 152 did = did, 153 slice_uri = slice_uri, 154 "Failed to cache actor exists" 155 ); 156 } 157 } 158 159 /// Remove actor from cache 160 async fn remove_actor_from_cache(&self, did: &str, slice_uri: &str) { 161 if let Err(e) = self 162 .actor_cache 163 .lock() 164 .await 165 .remove_actor(did, slice_uri) 166 .await 167 { 168 warn!( 169 error = ?e, 170 did = did, 171 slice_uri = slice_uri, 172 "Failed to remove actor from cache" 173 ); 174 } 175 } 176 177 /// Log to database AND publish to GraphQL subscribers 178 /// 179 /// This helper ensures that log entries are both persisted to the database 180 /// and streamed in real-time to any active GraphQL subscriptions. 181 async fn log_and_publish( 182 &self, 183 level: LogLevel, 184 message: &str, 185 metadata: Option<serde_json::Value>, 186 slice_uri: Option<&str>, 187 ) { 188 // First, log to database (batched write) 189 Logger::global().log_jetstream_with_slice(level.clone(), message, metadata.clone(), slice_uri); 190 191 // Then, publish to GraphQL subscribers in real-time 192 // We need to query the most recent log entry we just created 193 // Since logging is batched, we create a LogEntry manually for immediate publishing 194 if let Some(slice) = slice_uri { 195 let log_entry = LogEntry { 196 id: 0, // Will be set by database, not critical for real-time streaming 197 created_at: Utc::now(), 198 log_type: "jetstream".to_string(), 199 job_id: None, 200 user_did: None, 201 slice_uri: Some(slice.to_string()), 202 level: level.as_str().to_string(), 203 message: message.to_string(), 204 metadata, 205 }; 206 207 publish_jetstream_log(log_entry).await; 208 } 209 } 210 211 /// Get slice collections from cache with database fallback 212 async fn get_slice_collections( 213 &self, 214 slice_uri: &str, 215 ) -> Result<Option<HashSet<String>>, anyhow::Error> { 216 // Try cache first 217 let cache_result = { 218 let mut cache = self.collections_cache.lock().await; 219 cache.get_slice_collections(slice_uri).await 220 }; 221 222 match cache_result { 223 Ok(Some(collections)) => Ok(Some(collections)), 224 Ok(None) => { 225 // Cache miss - load from database 226 match self.database.get_slice_collections_list(slice_uri).await { 227 Ok(collections) => { 228 let collections_set: HashSet<String> = collections.into_iter().collect(); 229 // Cache the result 230 let _ = self 231 .collections_cache 232 .lock() 233 .await 234 .cache_slice_collections(slice_uri, &collections_set) 235 .await; 236 Ok(Some(collections_set)) 237 } 238 Err(e) => Err(e.into()), 239 } 240 } 241 Err(e) => Err(e), 242 } 243 } 244 245 /// Get slice domain from cache with database fallback 246 async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, anyhow::Error> { 247 // Try cache first 248 let cache_result = { 249 let mut cache = self.domain_cache.lock().await; 250 cache.get_slice_domain(slice_uri).await 251 }; 252 253 match cache_result { 254 Ok(Some(domain)) => Ok(Some(domain)), 255 Ok(None) => { 256 // Cache miss - load from database 257 match self.database.get_slice_domain(slice_uri).await { 258 Ok(Some(domain)) => { 259 // Cache the result 260 let _ = self 261 .domain_cache 262 .lock() 263 .await 264 .cache_slice_domain(slice_uri, &domain) 265 .await; 266 Ok(Some(domain)) 267 } 268 Ok(None) => Ok(None), 269 Err(e) => Err(e.into()), 270 } 271 } 272 Err(e) => Err(e), 273 } 274 } 275 276 /// Get slice lexicons from cache with database fallback 277 async fn get_slice_lexicons( 278 &self, 279 slice_uri: &str, 280 ) -> Result<Option<Vec<serde_json::Value>>, anyhow::Error> { 281 // Try cache first 282 let cache_result = { 283 let mut cache = self.lexicon_cache.lock().await; 284 cache.get_lexicons(slice_uri).await 285 }; 286 287 match cache_result { 288 Ok(Some(lexicons)) => Ok(Some(lexicons)), 289 Ok(None) => { 290 // Cache miss - load from database 291 match self.database.get_lexicons_by_slice(slice_uri).await { 292 Ok(lexicons) if !lexicons.is_empty() => { 293 // Cache the result 294 let _ = self 295 .lexicon_cache 296 .lock() 297 .await 298 .cache_lexicons(slice_uri, &lexicons) 299 .await; 300 Ok(Some(lexicons)) 301 } 302 Ok(_) => Ok(None), // Empty lexicons 303 Err(e) => Err(e.into()), 304 } 305 } 306 Err(e) => Err(e), 307 } 308 } 309 async fn handle_commit_event( 310 &self, 311 did: &str, 312 commit: atproto_jetstream::JetstreamEventCommit, 313 ) -> Result<()> { 314 // Get all slices from cached list 315 let slices = self.slices_list.read().await.clone(); 316 317 // Process each slice 318 for slice_uri in slices { 319 // Get collections for this slice (with caching) 320 let collections = match self.get_slice_collections(&slice_uri).await { 321 Ok(Some(collections)) => collections, 322 Ok(None) => continue, // No collections for this slice 323 Err(e) => { 324 error!("Failed to get collections for slice {}: {}", slice_uri, e); 325 continue; 326 } 327 }; 328 329 if collections.contains(&commit.collection) { 330 // Special handling for network.slices.lexicon records 331 // These should only be indexed to the slice specified in their JSON data 332 let is_lexicon_for_this_slice = if commit.collection == "network.slices.lexicon" { 333 if let Some(target_slice_uri) = 334 commit.record.get("slice").and_then(|v| v.as_str()) 335 { 336 // Skip this slice if it's not the target slice for this lexicon 337 if slice_uri != target_slice_uri { 338 continue; 339 } 340 true // This is a lexicon record for this specific slice 341 } else { 342 // No target slice specified, skip this lexicon record entirely 343 continue; 344 } 345 } else { 346 false 347 }; 348 349 // Get the domain for this slice (with caching) 350 let domain = match self.get_slice_domain(&slice_uri).await { 351 Ok(Some(domain)) => domain, 352 Ok(None) => continue, // No domain, skip 353 Err(e) => { 354 error!("Failed to get domain for slice {}: {}", slice_uri, e); 355 continue; 356 } 357 }; 358 359 // Check if this is a primary collection (starts with slice domain) 360 // Lexicon records for this slice are always treated as primary 361 let is_primary_collection = 362 commit.collection.starts_with(&domain) || is_lexicon_for_this_slice; 363 364 // For external collections, check actor status BEFORE expensive validation 365 if !is_primary_collection { 366 let is_actor = match self.is_actor_cached(did, &slice_uri).await { 367 Ok(Some(cached_result)) => cached_result, 368 Ok(None) => { 369 // Cache miss means this DID is not an actor we've synced 370 // For external collections, we only care about actors we've already added 371 false 372 } 373 Err(e) => { 374 error!("Error checking actor status: {}", e); 375 continue; 376 } 377 }; 378 379 if !is_actor { 380 // Not an actor - skip validation entirely for external collections 381 continue; 382 } 383 } 384 385 // Get lexicons for validation (after actor check for external collections) 386 let lexicons = match self.get_slice_lexicons(&slice_uri).await { 387 Ok(Some(lexicons)) => lexicons, 388 Ok(None) => { 389 info!( 390 "No lexicons found for slice {} - skipping validation", 391 slice_uri 392 ); 393 continue; 394 } 395 Err(e) => { 396 error!("Failed to get lexicons for slice {}: {}", slice_uri, e); 397 continue; 398 } 399 }; 400 401 // Validate the record against the slice's lexicons 402 let validation_result = match slices_lexicon::validate_record( 403 lexicons.clone(), 404 &commit.collection, 405 commit.record.clone(), 406 ) { 407 Ok(_) => { 408 info!( 409 "Record validated for collection {} in slice {}", 410 commit.collection, slice_uri 411 ); 412 true 413 } 414 Err(e) => { 415 let message = format!( 416 "Validation failed for collection {} in slice {}", 417 commit.collection, slice_uri 418 ); 419 error!("{}: {}", message, e); 420 self.log_and_publish( 421 LogLevel::Warn, 422 &message, 423 Some(serde_json::json!({ 424 "collection": commit.collection, 425 "slice_uri": slice_uri, 426 "did": did 427 })), 428 Some(&slice_uri), 429 ).await; 430 false 431 } 432 }; 433 434 if !validation_result { 435 continue; // Skip this slice if validation fails 436 } 437 438 if is_primary_collection { 439 // Primary collection - ensure actor exists and index ALL records 440 info!( 441 "Primary collection {} for slice {} (domain: {}) - indexing record", 442 commit.collection, slice_uri, domain 443 ); 444 445 // Ensure actor exists for primary collections (except lexicons) 446 // Lexicons don't create actors - they just get indexed 447 if !is_lexicon_for_this_slice { 448 let is_cached = 449 matches!(self.is_actor_cached(did, &slice_uri).await, Ok(Some(_))); 450 451 if !is_cached { 452 // Actor not in cache - create it 453 info!("Creating new actor {} for slice {}", did, slice_uri); 454 455 // Resolve actor data (handle, PDS) 456 match resolve_actor_data(&self.http_client, did).await { 457 Ok(actor_data) => { 458 let actor = Actor { 459 did: actor_data.did.clone(), 460 handle: actor_data.handle, 461 slice_uri: slice_uri.clone(), 462 indexed_at: Utc::now().to_rfc3339(), 463 }; 464 465 // Insert into database 466 if let Err(e) = 467 self.database.batch_insert_actors(&[actor]).await 468 { 469 error!("Failed to create actor {}: {}", did, e); 470 } else { 471 // Add to cache after successful database insert 472 self.cache_actor_exists(did, &slice_uri).await; 473 info!("Created actor {} for slice {}", did, slice_uri); 474 } 475 } 476 Err(e) => { 477 error!("Failed to resolve actor data for {}: {}", did, e); 478 } 479 } 480 } 481 } 482 483 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 484 485 let record = Record { 486 uri: uri.clone(), 487 cid: commit.cid.clone(), 488 did: did.to_string(), 489 collection: commit.collection.clone(), 490 json: commit.record.clone(), 491 indexed_at: Utc::now(), 492 slice_uri: Some(slice_uri.clone()), 493 }; 494 495 match self.database.upsert_record(&record).await { 496 Ok(is_insert) => { 497 let message = if is_insert { 498 format!("Record inserted in {}", commit.collection) 499 } else { 500 format!("Record updated in {}", commit.collection) 501 }; 502 self.log_and_publish( 503 LogLevel::Info, 504 &message, 505 Some(serde_json::json!({ 506 "did": did, 507 "kind": "commit", 508 "commit": { 509 "rev": commit.rev, 510 "operation": commit.operation, 511 "collection": commit.collection, 512 "rkey": commit.rkey, 513 "record": commit.record, 514 "cid": commit.cid 515 }, 516 "indexed_operation": if is_insert { "insert" } else { "update" }, 517 "record_type": "primary" 518 })), 519 Some(&slice_uri), 520 ).await; 521 522 // Broadcast to GraphQL subscribers 523 let event = RecordUpdateEvent { 524 uri: uri.clone(), 525 cid: commit.cid.clone(), 526 did: did.to_string(), 527 collection: commit.collection.clone(), 528 value: commit.record.clone(), 529 slice_uri: slice_uri.clone(), 530 indexed_at: record.indexed_at.to_rfc3339(), 531 operation: if is_insert { 532 RecordOperation::Create 533 } else { 534 RecordOperation::Update 535 }, 536 }; 537 PUBSUB.publish(event).await; 538 } 539 Err(e) => { 540 let message = "Failed to insert/update record"; 541 self.log_and_publish( 542 LogLevel::Error, 543 message, 544 Some(serde_json::json!({ 545 "did": did, 546 "kind": "commit", 547 "commit": { 548 "rev": commit.rev, 549 "operation": commit.operation, 550 "collection": commit.collection, 551 "rkey": commit.rkey, 552 "record": commit.record, 553 "cid": commit.cid 554 }, 555 "error": e.to_string(), 556 "record_type": "primary" 557 })), 558 Some(&slice_uri), 559 ).await; 560 return Err(anyhow::anyhow!("Database error: {}", e)); 561 } 562 } 563 564 info!( 565 "Successfully indexed {} record from primary collection: {}", 566 commit.operation, uri 567 ); 568 break; 569 } else { 570 // External collection - we already checked actor status, so just index 571 info!( 572 "External collection {} - DID {} is actor in slice {} - indexing", 573 commit.collection, did, slice_uri 574 ); 575 576 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 577 578 let record = Record { 579 uri: uri.clone(), 580 cid: commit.cid.clone(), 581 did: did.to_string(), 582 collection: commit.collection.clone(), 583 json: commit.record.clone(), 584 indexed_at: Utc::now(), 585 slice_uri: Some(slice_uri.clone()), 586 }; 587 588 match self.database.upsert_record(&record).await { 589 Ok(is_insert) => { 590 let message = if is_insert { 591 format!("Record inserted in {}", commit.collection) 592 } else { 593 format!("Record updated in {}", commit.collection) 594 }; 595 self.log_and_publish( 596 LogLevel::Info, 597 &message, 598 Some(serde_json::json!({ 599 "did": did, 600 "kind": "commit", 601 "commit": { 602 "rev": commit.rev, 603 "operation": commit.operation, 604 "collection": commit.collection, 605 "rkey": commit.rkey, 606 "record": commit.record, 607 "cid": commit.cid 608 }, 609 "indexed_operation": if is_insert { "insert" } else { "update" }, 610 "record_type": "external" 611 })), 612 Some(&slice_uri), 613 ).await; 614 615 // Broadcast to GraphQL subscribers 616 let event = RecordUpdateEvent { 617 uri: uri.clone(), 618 cid: commit.cid.clone(), 619 did: did.to_string(), 620 collection: commit.collection.clone(), 621 value: commit.record.clone(), 622 slice_uri: slice_uri.clone(), 623 indexed_at: record.indexed_at.to_rfc3339(), 624 operation: if is_insert { 625 RecordOperation::Create 626 } else { 627 RecordOperation::Update 628 }, 629 }; 630 PUBSUB.publish(event).await; 631 } 632 Err(e) => { 633 let message = "Failed to insert/update record"; 634 self.log_and_publish( 635 LogLevel::Error, 636 message, 637 Some(serde_json::json!({ 638 "did": did, 639 "kind": "commit", 640 "commit": { 641 "rev": commit.rev, 642 "operation": commit.operation, 643 "collection": commit.collection, 644 "rkey": commit.rkey, 645 "record": commit.record, 646 "cid": commit.cid 647 }, 648 "error": e.to_string(), 649 "record_type": "external" 650 })), 651 Some(&slice_uri), 652 ).await; 653 return Err(anyhow::anyhow!("Database error: {}", e)); 654 } 655 } 656 657 info!( 658 "Successfully indexed {} record from external collection: {}", 659 commit.operation, uri 660 ); 661 break; 662 } 663 } 664 } 665 666 Ok(()) 667 } 668 669 async fn handle_delete_event( 670 &self, 671 did: &str, 672 commit: atproto_jetstream::JetstreamEventDelete, 673 ) -> Result<()> { 674 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 675 676 // Get all slices from cached list 677 let slices = self.slices_list.read().await.clone(); 678 679 let mut relevant_slices: Vec<String> = Vec::new(); 680 681 for slice_uri in slices { 682 // Get collections for this slice (with caching) 683 let collections = match self.get_slice_collections(&slice_uri).await { 684 Ok(Some(collections)) => collections, 685 Ok(None) => continue, // No collections for this slice 686 Err(e) => { 687 error!("Failed to get collections for slice {}: {}", slice_uri, e); 688 continue; 689 } 690 }; 691 692 if !collections.contains(&commit.collection) { 693 continue; 694 } 695 696 // Get the domain for this slice (with caching) 697 let domain = match self.get_slice_domain(&slice_uri).await { 698 Ok(Some(domain)) => domain, 699 Ok(None) => continue, // No domain, skip 700 Err(e) => { 701 error!("Failed to get domain for slice {}: {}", slice_uri, e); 702 continue; 703 } 704 }; 705 706 // Check if this is a primary collection (starts with slice domain) 707 let is_primary_collection = commit.collection.starts_with(&domain); 708 709 if is_primary_collection { 710 // Primary collection - always process deletes 711 relevant_slices.push(slice_uri.clone()); 712 } else { 713 // External collection - only process if DID is an actor in this slice 714 let is_actor = match self.is_actor_cached(did, &slice_uri).await { 715 Ok(Some(cached_result)) => cached_result, 716 _ => false, 717 }; 718 if is_actor { 719 relevant_slices.push(slice_uri.clone()); 720 } 721 } 722 } 723 724 if relevant_slices.is_empty() { 725 // No relevant slices found, skip deletion 726 return Ok(()); 727 } 728 729 // Handle cascade deletion before deleting the record 730 if let Err(e) = self 731 .database 732 .handle_cascade_deletion(&uri, &commit.collection) 733 .await 734 { 735 warn!("Cascade deletion failed for {}: {}", uri, e); 736 } 737 738 // Delete the record and log only for relevant slices 739 match self.database.delete_record_by_uri(&uri, None).await { 740 Ok(rows_affected) => { 741 if rows_affected > 0 { 742 info!( 743 "Deleted record: {} ({} rows) for {} slice(s)", 744 uri, 745 rows_affected, 746 relevant_slices.len() 747 ); 748 let message = format!("Record deleted from {}", commit.collection); 749 750 // Log to each relevant slice and check if actor cleanup is needed 751 for slice_uri in &relevant_slices { 752 self.log_and_publish( 753 LogLevel::Info, 754 &message, 755 Some(serde_json::json!({ 756 "operation": "delete", 757 "collection": commit.collection, 758 "did": did, 759 "uri": uri, 760 "rows_affected": rows_affected 761 })), 762 Some(slice_uri), 763 ).await; 764 765 // Broadcast delete event to GraphQL subscribers 766 let event = RecordUpdateEvent { 767 uri: uri.clone(), 768 cid: String::new(), // No CID for delete events 769 did: did.to_string(), 770 collection: commit.collection.clone(), 771 value: serde_json::json!({}), // Empty value for deletes 772 slice_uri: slice_uri.clone(), 773 indexed_at: Utc::now().to_rfc3339(), 774 operation: RecordOperation::Delete, 775 }; 776 PUBSUB.publish(event).await; 777 } 778 779 // Check if actor should be cleaned up (no more records) 780 for slice_uri in &relevant_slices { 781 match self.database.actor_has_records(did, slice_uri).await { 782 Ok(has_records) => { 783 if !has_records { 784 // No more records for this actor in this slice - clean up 785 match self.database.delete_actor(did, slice_uri).await { 786 Ok(deleted) => { 787 if deleted > 0 { 788 info!( 789 "Cleaned up actor {} from slice {} (no records remaining)", 790 did, slice_uri 791 ); 792 // Remove from cache 793 self.remove_actor_from_cache(did, slice_uri).await; 794 } 795 } 796 Err(e) => { 797 error!( 798 "Failed to delete actor {} from slice {}: {}", 799 did, slice_uri, e 800 ); 801 } 802 } 803 } 804 } 805 Err(e) => { 806 error!( 807 "Failed to check if actor {} has records in slice {}: {}", 808 did, slice_uri, e 809 ); 810 } 811 } 812 } 813 } 814 } 815 Err(e) => { 816 let message = "Failed to delete record"; 817 error!("{}: {}", message, e); 818 819 // Log error to each relevant slice 820 for slice_uri in relevant_slices { 821 self.log_and_publish( 822 LogLevel::Error, 823 message, 824 Some(serde_json::json!({ 825 "operation": "delete", 826 "collection": commit.collection, 827 "did": did, 828 "uri": uri, 829 "error": e.to_string() 830 })), 831 Some(&slice_uri), 832 ).await; 833 } 834 } 835 } 836 837 Ok(()) 838 } 839} 840 841impl JetstreamConsumer { 842 /// Create a new Jetstream consumer with optional cursor support and Redis cache 843 /// 844 /// # Arguments 845 /// * `database` - Database connection for slice configurations and record storage 846 /// * `jetstream_hostname` - Optional custom jetstream hostname 847 /// * `cursor_handler` - Optional cursor handler for resumable event processing 848 /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from 849 /// * `redis_url` - Optional Redis URL for caching (falls back to in-memory if not provided) 850 pub async fn new( 851 database: Database, 852 jetstream_hostname: Option<String>, 853 cursor_handler: Option<Arc<PostgresCursorHandler>>, 854 initial_cursor: Option<i64>, 855 redis_url: Option<String>, 856 ) -> Result<Self, JetstreamError> { 857 let config = ConsumerTaskConfig { 858 user_agent: "slice-server/1.0".to_string(), 859 compression: false, 860 zstd_dictionary_location: String::new(), 861 jetstream_hostname: jetstream_hostname 862 .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()), 863 collections: Vec::new(), 864 dids: Vec::new(), 865 max_message_size_bytes: None, 866 cursor: initial_cursor, 867 require_hello: true, 868 }; 869 870 let consumer = Consumer::new(config); 871 let http_client = Client::new(); 872 873 // Determine cache backend based on Redis URL 874 let cache_backend = if let Some(redis_url) = redis_url { 875 CacheBackend::Redis { 876 url: redis_url, 877 ttl_seconds: None, 878 } 879 } else { 880 CacheBackend::InMemory { ttl_seconds: None } 881 }; 882 883 // Create cache instances 884 let actor_cache = Arc::new(Mutex::new( 885 CacheFactory::create_slice_cache(cache_backend.clone()) 886 .await 887 .map_err(|e| JetstreamError::ConnectionFailed { 888 message: format!("Failed to create actor cache: {}", e), 889 })?, 890 )); 891 892 let lexicon_cache = Arc::new(Mutex::new( 893 CacheFactory::create_slice_cache(cache_backend.clone()) 894 .await 895 .map_err(|e| JetstreamError::ConnectionFailed { 896 message: format!("Failed to create lexicon cache: {}", e), 897 })?, 898 )); 899 900 let domain_cache = Arc::new(Mutex::new( 901 CacheFactory::create_slice_cache(cache_backend.clone()) 902 .await 903 .map_err(|e| JetstreamError::ConnectionFailed { 904 message: format!("Failed to create domain cache: {}", e), 905 })?, 906 )); 907 908 let collections_cache = Arc::new(Mutex::new( 909 CacheFactory::create_slice_cache(cache_backend) 910 .await 911 .map_err(|e| JetstreamError::ConnectionFailed { 912 message: format!("Failed to create collections cache: {}", e), 913 })?, 914 )); 915 916 Ok(Self { 917 consumer, 918 database, 919 http_client, 920 actor_cache, 921 lexicon_cache, 922 domain_cache, 923 collections_cache, 924 event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)), 925 cursor_handler, 926 slices_list: Arc::new(RwLock::new(Vec::new())), 927 }) 928 } 929 930 /// Load slice configurations 931 pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> { 932 info!("Loading slice configurations..."); 933 934 // Get all slices and update cached list 935 let slices = self.database.get_all_slices().await?; 936 *self.slices_list.write().await = slices.clone(); 937 info!("Found {} total slices in database", slices.len()); 938 939 Ok(()) 940 } 941 942 /// Preload actor cache to avoid database hits during event processing 943 async fn preload_actor_cache(&self) -> Result<(), JetstreamError> { 944 info!("Preloading actor cache..."); 945 946 let actors = self.database.get_all_actors().await?; 947 info!("Found {} actors to cache", actors.len()); 948 949 match self.actor_cache.lock().await.preload_actors(actors).await { 950 Ok(_) => { 951 info!("Actor cache preloaded successfully"); 952 Ok(()) 953 } 954 Err(e) => { 955 warn!(error = ?e, "Failed to preload actors to cache"); 956 Ok(()) // Don't fail startup if preload fails 957 } 958 } 959 } 960 961 /// Start consuming events from Jetstream 962 pub async fn start_consuming( 963 &self, 964 cancellation_token: CancellationToken, 965 ) -> Result<(), JetstreamError> { 966 info!("Starting Jetstream consumer"); 967 968 // Load initial slice configurations 969 self.load_slice_configurations().await?; 970 971 // Preload actor cache 972 self.preload_actor_cache().await?; 973 974 // Create and register the event handler 975 let handler = Arc::new(SliceEventHandler { 976 database: self.database.clone(), 977 http_client: self.http_client.clone(), 978 event_count: self.event_count.clone(), 979 actor_cache: self.actor_cache.clone(), 980 lexicon_cache: self.lexicon_cache.clone(), 981 domain_cache: self.domain_cache.clone(), 982 collections_cache: self.collections_cache.clone(), 983 cursor_handler: self.cursor_handler.clone(), 984 slices_list: self.slices_list.clone(), 985 }); 986 987 self.consumer.register_handler(handler).await.map_err(|e| { 988 JetstreamError::ConnectionFailed { 989 message: format!("Failed to register event handler: {}", e), 990 } 991 })?; 992 993 // Start periodic status reporting (with cancellation support) 994 let event_count_for_status = self.event_count.clone(); 995 let cancellation_token_for_status = cancellation_token.clone(); 996 tokio::spawn(async move { 997 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute 998 loop { 999 tokio::select! { 1000 _ = interval.tick() => { 1001 let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed); 1002 info!( 1003 "Jetstream consumer status: {} total events processed", 1004 count 1005 ); 1006 } 1007 _ = cancellation_token_for_status.cancelled() => { 1008 info!("Status reporting task cancelled"); 1009 break; 1010 } 1011 } 1012 } 1013 }); 1014 1015 // Start the consumer 1016 info!("Starting Jetstream background consumer..."); 1017 let result = self 1018 .consumer 1019 .run_background(cancellation_token) 1020 .await 1021 .map_err(|e| JetstreamError::ConnectionFailed { 1022 message: format!("Consumer failed: {}", e), 1023 }); 1024 1025 // Force write cursor on shutdown to ensure latest position is persisted 1026 if let Some(cursor_handler) = &self.cursor_handler { 1027 if let Err(e) = cursor_handler.force_write_cursor().await { 1028 error!("Failed to write final cursor position: {}", e); 1029 } else { 1030 info!("Final cursor position written to database"); 1031 } 1032 } 1033 1034 result?; 1035 Ok(()) 1036 } 1037 1038 /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors 1039 pub fn start_configuration_reloader(consumer: Arc<Self>) { 1040 tokio::spawn(async move { 1041 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes 1042 interval.tick().await; // Skip first immediate tick 1043 1044 loop { 1045 interval.tick().await; 1046 1047 if let Err(e) = consumer.load_slice_configurations().await { 1048 error!("Failed to reload slice configurations: {}", e); 1049 } 1050 1051 if let Err(e) = consumer.preload_actor_cache().await { 1052 error!("Failed to reload actor cache: {}", e); 1053 } 1054 } 1055 }); 1056 } 1057}