//! Highly ambitious ATProtocol AppView service and SDKs.
1use anyhow::Result;
2use async_trait::async_trait;
3use atproto_jetstream::{
4 CancellationToken, Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent,
5};
6use chrono::Utc;
7use reqwest::Client;
8use std::collections::HashSet;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tracing::{error, info, warn};
12
13use crate::actor_resolver::resolve_actor_data;
14use crate::cache::{CacheBackend, CacheFactory, SliceCache};
15use crate::database::Database;
16use crate::errors::JetstreamError;
17use crate::graphql::{publish_jetstream_log, RecordOperation, RecordUpdateEvent, PUBSUB};
18use crate::jetstream_cursor::PostgresCursorHandler;
19use crate::logging::{LogEntry, LogLevel, Logger};
20use crate::models::{Actor, Record};
21
/// Consumes the ATProto Jetstream event stream and indexes matching records
/// into per-slice storage.
///
/// Owns the websocket consumer, the per-kind lookup caches, and an optional
/// Postgres-backed cursor handler for resumable processing.
pub struct JetstreamConsumer {
    // Underlying Jetstream websocket consumer (atproto_jetstream).
    consumer: Consumer,
    // Slice configuration plus record/actor storage.
    database: Database,
    // HTTP client used when resolving actor data (handle, PDS).
    http_client: Client,
    // Cache of which DIDs are actors, keyed per slice.
    actor_cache: Arc<Mutex<SliceCache>>,
    // Cache of lexicon schemas per slice.
    lexicon_cache: Arc<Mutex<SliceCache>>,
    // Cache of each slice's domain.
    domain_cache: Arc<Mutex<SliceCache>>,
    // Cache of each slice's configured collection list.
    collections_cache: Arc<Mutex<SliceCache>>,
    // Total events processed; shared with the event handler and status task.
    pub event_count: Arc<std::sync::atomic::AtomicU64>,
    // Optional debounced cursor persistence for resumable consumption.
    cursor_handler: Option<Arc<PostgresCursorHandler>>,
    // Cached list of slice URIs, refreshed by `load_slice_configurations`.
    slices_list: Arc<RwLock<Vec<String>>>,
}
34
// Event handler registered with the Jetstream consumer. Implements the
// `EventHandler` trait and holds clones of the consumer's shared state so
// commit/delete events can be validated and indexed per slice.
struct SliceEventHandler {
    // Slice configuration plus record/actor storage.
    database: Database,
    // HTTP client used when resolving actor data (handle, PDS).
    http_client: Client,
    // Total events processed; shared with the owning consumer.
    event_count: Arc<std::sync::atomic::AtomicU64>,
    // Cache of which DIDs are actors, keyed per slice.
    actor_cache: Arc<Mutex<SliceCache>>,
    // Cache of lexicon schemas per slice.
    lexicon_cache: Arc<Mutex<SliceCache>>,
    // Cache of each slice's domain.
    domain_cache: Arc<Mutex<SliceCache>>,
    // Cache of each slice's configured collection list.
    collections_cache: Arc<Mutex<SliceCache>>,
    // Optional debounced cursor persistence for resumable consumption.
    cursor_handler: Option<Arc<PostgresCursorHandler>>,
    // Cached list of slice URIs, shared with the owning consumer.
    slices_list: Arc<RwLock<Vec<String>>>,
}
47
#[async_trait]
impl EventHandler for SliceEventHandler {
    /// Called once per Jetstream event: counts it, advances the (debounced)
    /// cursor, then dispatches commit/delete events to the slice-aware
    /// handlers. Per-event handler errors are logged and published to the
    /// jetstream log but never propagated, so one bad event cannot stall the
    /// stream.
    async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
        // Relaxed ordering is sufficient: the counter only feeds progress logs.
        let count = self
            .event_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1;

        if count.is_multiple_of(10000) {
            info!("Jetstream consumer has processed {} events", count);
        }

        // Extract and update cursor position from event
        let time_us = match &event {
            JetstreamEvent::Commit { time_us, .. } => *time_us,
            JetstreamEvent::Delete { time_us, .. } => *time_us,
            JetstreamEvent::Identity { time_us, .. } => *time_us,
            JetstreamEvent::Account { time_us, .. } => *time_us,
        };

        // NOTE(review): the cursor advances before the event below is handled,
        // so if the cursor is persisted and the process restarts, an event
        // whose handling failed may be skipped on resume - confirm this
        // at-most-once behavior is intended.
        if let Some(cursor_handler) = &self.cursor_handler {
            cursor_handler.update_position(time_us);

            // Periodically write cursor to DB (debounced by handler)
            if let Err(e) = cursor_handler.maybe_write_cursor().await {
                error!("Failed to write cursor: {}", e);
            }
        }

        match event {
            JetstreamEvent::Commit { did, commit, .. } => {
                if let Err(e) = self.handle_commit_event(&did, commit).await {
                    let message = format!("Error handling commit event: {}", e);
                    error!("{}", message);
                    Logger::global().log_jetstream(
                        LogLevel::Error,
                        &message,
                        Some(serde_json::json!({
                            "error": e.to_string(),
                            "did": did,
                            "event_type": "commit"
                        })),
                    );
                }
            }
            JetstreamEvent::Delete { did, commit, .. } => {
                if let Err(e) = self.handle_delete_event(&did, commit).await {
                    let message = format!("Error handling delete event: {}", e);
                    error!("{}", message);
                    Logger::global().log_jetstream(
                        LogLevel::Error,
                        &message,
                        Some(serde_json::json!({
                            "error": e.to_string(),
                            "did": did,
                            "event_type": "delete"
                        })),
                    );
                }
            }
            _ => {
                // Ignore other event types (identity, account, etc.)
            }
        }
        Ok(())
    }

    /// Stable identifier for this handler registration.
    fn handler_id(&self) -> String {
        "slice-records-indexer".to_string()
    }
}
119
120impl SliceEventHandler {
121 /// Check if DID is an actor for the given slice
122 async fn is_actor_cached(
123 &self,
124 did: &str,
125 slice_uri: &str,
126 ) -> Result<Option<bool>, anyhow::Error> {
127 match self.actor_cache.lock().await.is_actor(did, slice_uri).await {
128 Ok(result) => Ok(result),
129 Err(e) => {
130 warn!(
131 error = ?e,
132 did = did,
133 slice_uri = slice_uri,
134 "Actor cache error"
135 );
136 Ok(None)
137 }
138 }
139 }
140
141 /// Cache that an actor exists
142 async fn cache_actor_exists(&self, did: &str, slice_uri: &str) {
143 if let Err(e) = self
144 .actor_cache
145 .lock()
146 .await
147 .cache_actor_exists(did, slice_uri)
148 .await
149 {
150 warn!(
151 error = ?e,
152 did = did,
153 slice_uri = slice_uri,
154 "Failed to cache actor exists"
155 );
156 }
157 }
158
159 /// Remove actor from cache
160 async fn remove_actor_from_cache(&self, did: &str, slice_uri: &str) {
161 if let Err(e) = self
162 .actor_cache
163 .lock()
164 .await
165 .remove_actor(did, slice_uri)
166 .await
167 {
168 warn!(
169 error = ?e,
170 did = did,
171 slice_uri = slice_uri,
172 "Failed to remove actor from cache"
173 );
174 }
175 }
176
177 /// Log to database AND publish to GraphQL subscribers
178 ///
179 /// This helper ensures that log entries are both persisted to the database
180 /// and streamed in real-time to any active GraphQL subscriptions.
181 async fn log_and_publish(
182 &self,
183 level: LogLevel,
184 message: &str,
185 metadata: Option<serde_json::Value>,
186 slice_uri: Option<&str>,
187 ) {
188 // First, log to database (batched write)
189 Logger::global().log_jetstream_with_slice(level.clone(), message, metadata.clone(), slice_uri);
190
191 // Then, publish to GraphQL subscribers in real-time
192 // We need to query the most recent log entry we just created
193 // Since logging is batched, we create a LogEntry manually for immediate publishing
194 if let Some(slice) = slice_uri {
195 let log_entry = LogEntry {
196 id: 0, // Will be set by database, not critical for real-time streaming
197 created_at: Utc::now(),
198 log_type: "jetstream".to_string(),
199 job_id: None,
200 user_did: None,
201 slice_uri: Some(slice.to_string()),
202 level: level.as_str().to_string(),
203 message: message.to_string(),
204 metadata,
205 };
206
207 publish_jetstream_log(log_entry).await;
208 }
209 }
210
211 /// Get slice collections from cache with database fallback
212 async fn get_slice_collections(
213 &self,
214 slice_uri: &str,
215 ) -> Result<Option<HashSet<String>>, anyhow::Error> {
216 // Try cache first
217 let cache_result = {
218 let mut cache = self.collections_cache.lock().await;
219 cache.get_slice_collections(slice_uri).await
220 };
221
222 match cache_result {
223 Ok(Some(collections)) => Ok(Some(collections)),
224 Ok(None) => {
225 // Cache miss - load from database
226 match self.database.get_slice_collections_list(slice_uri).await {
227 Ok(collections) => {
228 let collections_set: HashSet<String> = collections.into_iter().collect();
229 // Cache the result
230 let _ = self
231 .collections_cache
232 .lock()
233 .await
234 .cache_slice_collections(slice_uri, &collections_set)
235 .await;
236 Ok(Some(collections_set))
237 }
238 Err(e) => Err(e.into()),
239 }
240 }
241 Err(e) => Err(e),
242 }
243 }
244
245 /// Get slice domain from cache with database fallback
246 async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, anyhow::Error> {
247 // Try cache first
248 let cache_result = {
249 let mut cache = self.domain_cache.lock().await;
250 cache.get_slice_domain(slice_uri).await
251 };
252
253 match cache_result {
254 Ok(Some(domain)) => Ok(Some(domain)),
255 Ok(None) => {
256 // Cache miss - load from database
257 match self.database.get_slice_domain(slice_uri).await {
258 Ok(Some(domain)) => {
259 // Cache the result
260 let _ = self
261 .domain_cache
262 .lock()
263 .await
264 .cache_slice_domain(slice_uri, &domain)
265 .await;
266 Ok(Some(domain))
267 }
268 Ok(None) => Ok(None),
269 Err(e) => Err(e.into()),
270 }
271 }
272 Err(e) => Err(e),
273 }
274 }
275
276 /// Get slice lexicons from cache with database fallback
277 async fn get_slice_lexicons(
278 &self,
279 slice_uri: &str,
280 ) -> Result<Option<Vec<serde_json::Value>>, anyhow::Error> {
281 // Try cache first
282 let cache_result = {
283 let mut cache = self.lexicon_cache.lock().await;
284 cache.get_lexicons(slice_uri).await
285 };
286
287 match cache_result {
288 Ok(Some(lexicons)) => Ok(Some(lexicons)),
289 Ok(None) => {
290 // Cache miss - load from database
291 match self.database.get_lexicons_by_slice(slice_uri).await {
292 Ok(lexicons) if !lexicons.is_empty() => {
293 // Cache the result
294 let _ = self
295 .lexicon_cache
296 .lock()
297 .await
298 .cache_lexicons(slice_uri, &lexicons)
299 .await;
300 Ok(Some(lexicons))
301 }
302 Ok(_) => Ok(None), // Empty lexicons
303 Err(e) => Err(e.into()),
304 }
305 }
306 Err(e) => Err(e),
307 }
308 }
309 async fn handle_commit_event(
310 &self,
311 did: &str,
312 commit: atproto_jetstream::JetstreamEventCommit,
313 ) -> Result<()> {
314 // Get all slices from cached list
315 let slices = self.slices_list.read().await.clone();
316
317 // Process each slice
318 for slice_uri in slices {
319 // Get collections for this slice (with caching)
320 let collections = match self.get_slice_collections(&slice_uri).await {
321 Ok(Some(collections)) => collections,
322 Ok(None) => continue, // No collections for this slice
323 Err(e) => {
324 error!("Failed to get collections for slice {}: {}", slice_uri, e);
325 continue;
326 }
327 };
328
329 if collections.contains(&commit.collection) {
330 // Special handling for network.slices.lexicon records
331 // These should only be indexed to the slice specified in their JSON data
332 let is_lexicon_for_this_slice = if commit.collection == "network.slices.lexicon" {
333 if let Some(target_slice_uri) =
334 commit.record.get("slice").and_then(|v| v.as_str())
335 {
336 // Skip this slice if it's not the target slice for this lexicon
337 if slice_uri != target_slice_uri {
338 continue;
339 }
340 true // This is a lexicon record for this specific slice
341 } else {
342 // No target slice specified, skip this lexicon record entirely
343 continue;
344 }
345 } else {
346 false
347 };
348
349 // Get the domain for this slice (with caching)
350 let domain = match self.get_slice_domain(&slice_uri).await {
351 Ok(Some(domain)) => domain,
352 Ok(None) => continue, // No domain, skip
353 Err(e) => {
354 error!("Failed to get domain for slice {}: {}", slice_uri, e);
355 continue;
356 }
357 };
358
359 // Check if this is a primary collection (starts with slice domain)
360 // Lexicon records for this slice are always treated as primary
361 let is_primary_collection =
362 commit.collection.starts_with(&domain) || is_lexicon_for_this_slice;
363
364 // For external collections, check actor status BEFORE expensive validation
365 if !is_primary_collection {
366 let is_actor = match self.is_actor_cached(did, &slice_uri).await {
367 Ok(Some(cached_result)) => cached_result,
368 Ok(None) => {
369 // Cache miss means this DID is not an actor we've synced
370 // For external collections, we only care about actors we've already added
371 false
372 }
373 Err(e) => {
374 error!("Error checking actor status: {}", e);
375 continue;
376 }
377 };
378
379 if !is_actor {
380 // Not an actor - skip validation entirely for external collections
381 continue;
382 }
383 }
384
385 // Get lexicons for validation (after actor check for external collections)
386 let lexicons = match self.get_slice_lexicons(&slice_uri).await {
387 Ok(Some(lexicons)) => lexicons,
388 Ok(None) => {
389 info!(
390 "No lexicons found for slice {} - skipping validation",
391 slice_uri
392 );
393 continue;
394 }
395 Err(e) => {
396 error!("Failed to get lexicons for slice {}: {}", slice_uri, e);
397 continue;
398 }
399 };
400
401 // Validate the record against the slice's lexicons
402 let validation_result = match slices_lexicon::validate_record(
403 lexicons.clone(),
404 &commit.collection,
405 commit.record.clone(),
406 ) {
407 Ok(_) => {
408 info!(
409 "Record validated for collection {} in slice {}",
410 commit.collection, slice_uri
411 );
412 true
413 }
414 Err(e) => {
415 let message = format!(
416 "Validation failed for collection {} in slice {}",
417 commit.collection, slice_uri
418 );
419 error!("{}: {}", message, e);
420 self.log_and_publish(
421 LogLevel::Warn,
422 &message,
423 Some(serde_json::json!({
424 "collection": commit.collection,
425 "slice_uri": slice_uri,
426 "did": did
427 })),
428 Some(&slice_uri),
429 ).await;
430 false
431 }
432 };
433
434 if !validation_result {
435 continue; // Skip this slice if validation fails
436 }
437
438 if is_primary_collection {
439 // Primary collection - ensure actor exists and index ALL records
440 info!(
441 "Primary collection {} for slice {} (domain: {}) - indexing record",
442 commit.collection, slice_uri, domain
443 );
444
445 // Ensure actor exists for primary collections (except lexicons)
446 // Lexicons don't create actors - they just get indexed
447 if !is_lexicon_for_this_slice {
448 let is_cached =
449 matches!(self.is_actor_cached(did, &slice_uri).await, Ok(Some(_)));
450
451 if !is_cached {
452 // Actor not in cache - create it
453 info!("Creating new actor {} for slice {}", did, slice_uri);
454
455 // Resolve actor data (handle, PDS)
456 match resolve_actor_data(&self.http_client, did).await {
457 Ok(actor_data) => {
458 let actor = Actor {
459 did: actor_data.did.clone(),
460 handle: actor_data.handle,
461 slice_uri: slice_uri.clone(),
462 indexed_at: Utc::now().to_rfc3339(),
463 };
464
465 // Insert into database
466 if let Err(e) =
467 self.database.batch_insert_actors(&[actor]).await
468 {
469 error!("Failed to create actor {}: {}", did, e);
470 } else {
471 // Add to cache after successful database insert
472 self.cache_actor_exists(did, &slice_uri).await;
473 info!("Created actor {} for slice {}", did, slice_uri);
474 }
475 }
476 Err(e) => {
477 error!("Failed to resolve actor data for {}: {}", did, e);
478 }
479 }
480 }
481 }
482
483 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
484
485 let record = Record {
486 uri: uri.clone(),
487 cid: commit.cid.clone(),
488 did: did.to_string(),
489 collection: commit.collection.clone(),
490 json: commit.record.clone(),
491 indexed_at: Utc::now(),
492 slice_uri: Some(slice_uri.clone()),
493 };
494
495 match self.database.upsert_record(&record).await {
496 Ok(is_insert) => {
497 let message = if is_insert {
498 format!("Record inserted in {}", commit.collection)
499 } else {
500 format!("Record updated in {}", commit.collection)
501 };
502 self.log_and_publish(
503 LogLevel::Info,
504 &message,
505 Some(serde_json::json!({
506 "did": did,
507 "kind": "commit",
508 "commit": {
509 "rev": commit.rev,
510 "operation": commit.operation,
511 "collection": commit.collection,
512 "rkey": commit.rkey,
513 "record": commit.record,
514 "cid": commit.cid
515 },
516 "indexed_operation": if is_insert { "insert" } else { "update" },
517 "record_type": "primary"
518 })),
519 Some(&slice_uri),
520 ).await;
521
522 // Broadcast to GraphQL subscribers
523 let event = RecordUpdateEvent {
524 uri: uri.clone(),
525 cid: commit.cid.clone(),
526 did: did.to_string(),
527 collection: commit.collection.clone(),
528 value: commit.record.clone(),
529 slice_uri: slice_uri.clone(),
530 indexed_at: record.indexed_at.to_rfc3339(),
531 operation: if is_insert {
532 RecordOperation::Create
533 } else {
534 RecordOperation::Update
535 },
536 };
537 PUBSUB.publish(event).await;
538 }
539 Err(e) => {
540 let message = "Failed to insert/update record";
541 self.log_and_publish(
542 LogLevel::Error,
543 message,
544 Some(serde_json::json!({
545 "did": did,
546 "kind": "commit",
547 "commit": {
548 "rev": commit.rev,
549 "operation": commit.operation,
550 "collection": commit.collection,
551 "rkey": commit.rkey,
552 "record": commit.record,
553 "cid": commit.cid
554 },
555 "error": e.to_string(),
556 "record_type": "primary"
557 })),
558 Some(&slice_uri),
559 ).await;
560 return Err(anyhow::anyhow!("Database error: {}", e));
561 }
562 }
563
564 info!(
565 "Successfully indexed {} record from primary collection: {}",
566 commit.operation, uri
567 );
568 break;
569 } else {
570 // External collection - we already checked actor status, so just index
571 info!(
572 "External collection {} - DID {} is actor in slice {} - indexing",
573 commit.collection, did, slice_uri
574 );
575
576 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
577
578 let record = Record {
579 uri: uri.clone(),
580 cid: commit.cid.clone(),
581 did: did.to_string(),
582 collection: commit.collection.clone(),
583 json: commit.record.clone(),
584 indexed_at: Utc::now(),
585 slice_uri: Some(slice_uri.clone()),
586 };
587
588 match self.database.upsert_record(&record).await {
589 Ok(is_insert) => {
590 let message = if is_insert {
591 format!("Record inserted in {}", commit.collection)
592 } else {
593 format!("Record updated in {}", commit.collection)
594 };
595 self.log_and_publish(
596 LogLevel::Info,
597 &message,
598 Some(serde_json::json!({
599 "did": did,
600 "kind": "commit",
601 "commit": {
602 "rev": commit.rev,
603 "operation": commit.operation,
604 "collection": commit.collection,
605 "rkey": commit.rkey,
606 "record": commit.record,
607 "cid": commit.cid
608 },
609 "indexed_operation": if is_insert { "insert" } else { "update" },
610 "record_type": "external"
611 })),
612 Some(&slice_uri),
613 ).await;
614
615 // Broadcast to GraphQL subscribers
616 let event = RecordUpdateEvent {
617 uri: uri.clone(),
618 cid: commit.cid.clone(),
619 did: did.to_string(),
620 collection: commit.collection.clone(),
621 value: commit.record.clone(),
622 slice_uri: slice_uri.clone(),
623 indexed_at: record.indexed_at.to_rfc3339(),
624 operation: if is_insert {
625 RecordOperation::Create
626 } else {
627 RecordOperation::Update
628 },
629 };
630 PUBSUB.publish(event).await;
631 }
632 Err(e) => {
633 let message = "Failed to insert/update record";
634 self.log_and_publish(
635 LogLevel::Error,
636 message,
637 Some(serde_json::json!({
638 "did": did,
639 "kind": "commit",
640 "commit": {
641 "rev": commit.rev,
642 "operation": commit.operation,
643 "collection": commit.collection,
644 "rkey": commit.rkey,
645 "record": commit.record,
646 "cid": commit.cid
647 },
648 "error": e.to_string(),
649 "record_type": "external"
650 })),
651 Some(&slice_uri),
652 ).await;
653 return Err(anyhow::anyhow!("Database error: {}", e));
654 }
655 }
656
657 info!(
658 "Successfully indexed {} record from external collection: {}",
659 commit.operation, uri
660 );
661 break;
662 }
663 }
664 }
665
666 Ok(())
667 }
668
    /// Handle a Jetstream delete event.
    ///
    /// Determines which slices the delete is relevant to (primary collections
    /// always; external collections only when `did` is a cached actor), runs
    /// cascade deletion, deletes the record, logs/broadcasts per relevant
    /// slice, and finally garbage-collects actors left with no records.
    async fn handle_delete_event(
        &self,
        did: &str,
        commit: atproto_jetstream::JetstreamEventDelete,
    ) -> Result<()> {
        let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);

        // Get all slices from cached list (snapshot, no lock held across awaits)
        let slices = self.slices_list.read().await.clone();

        // Slices for which this delete should be logged/broadcast.
        let mut relevant_slices: Vec<String> = Vec::new();

        for slice_uri in slices {
            // Get collections for this slice (with caching)
            let collections = match self.get_slice_collections(&slice_uri).await {
                Ok(Some(collections)) => collections,
                Ok(None) => continue, // No collections for this slice
                Err(e) => {
                    error!("Failed to get collections for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            if !collections.contains(&commit.collection) {
                continue;
            }

            // Get the domain for this slice (with caching)
            let domain = match self.get_slice_domain(&slice_uri).await {
                Ok(Some(domain)) => domain,
                Ok(None) => continue, // No domain, skip
                Err(e) => {
                    error!("Failed to get domain for slice {}: {}", slice_uri, e);
                    continue;
                }
            };

            // Check if this is a primary collection (starts with slice domain)
            let is_primary_collection = commit.collection.starts_with(&domain);

            if is_primary_collection {
                // Primary collection - always process deletes
                relevant_slices.push(slice_uri.clone());
            } else {
                // External collection - only process if DID is an actor in this slice
                let is_actor = match self.is_actor_cached(did, &slice_uri).await {
                    Ok(Some(cached_result)) => cached_result,
                    _ => false,
                };
                if is_actor {
                    relevant_slices.push(slice_uri.clone());
                }
            }
        }

        if relevant_slices.is_empty() {
            // No relevant slices found, skip deletion
            return Ok(());
        }

        // Handle cascade deletion before deleting the record
        // (best-effort: a cascade failure is logged but does not abort).
        if let Err(e) = self
            .database
            .handle_cascade_deletion(&uri, &commit.collection)
            .await
        {
            warn!("Cascade deletion failed for {}: {}", uri, e);
        }

        // Delete the record and log only for relevant slices.
        // NOTE(review): `None` is passed for the slice filter, so the record
        // is deleted for ALL slices even though only `relevant_slices` get
        // logged/broadcast - confirm this is intended.
        match self.database.delete_record_by_uri(&uri, None).await {
            Ok(rows_affected) => {
                if rows_affected > 0 {
                    info!(
                        "Deleted record: {} ({} rows) for {} slice(s)",
                        uri,
                        rows_affected,
                        relevant_slices.len()
                    );
                    let message = format!("Record deleted from {}", commit.collection);

                    // Log to each relevant slice and check if actor cleanup is needed
                    for slice_uri in &relevant_slices {
                        self.log_and_publish(
                            LogLevel::Info,
                            &message,
                            Some(serde_json::json!({
                                "operation": "delete",
                                "collection": commit.collection,
                                "did": did,
                                "uri": uri,
                                "rows_affected": rows_affected
                            })),
                            Some(slice_uri),
                        ).await;

                        // Broadcast delete event to GraphQL subscribers
                        let event = RecordUpdateEvent {
                            uri: uri.clone(),
                            cid: String::new(), // No CID for delete events
                            did: did.to_string(),
                            collection: commit.collection.clone(),
                            value: serde_json::json!({}), // Empty value for deletes
                            slice_uri: slice_uri.clone(),
                            indexed_at: Utc::now().to_rfc3339(),
                            operation: RecordOperation::Delete,
                        };
                        PUBSUB.publish(event).await;
                    }

                    // Check if actor should be cleaned up (no more records)
                    for slice_uri in &relevant_slices {
                        match self.database.actor_has_records(did, slice_uri).await {
                            Ok(has_records) => {
                                if !has_records {
                                    // No more records for this actor in this slice - clean up
                                    match self.database.delete_actor(did, slice_uri).await {
                                        Ok(deleted) => {
                                            if deleted > 0 {
                                                info!(
                                                    "Cleaned up actor {} from slice {} (no records remaining)",
                                                    did, slice_uri
                                                );
                                                // Remove from cache only after DB delete succeeded
                                                self.remove_actor_from_cache(did, slice_uri).await;
                                            }
                                        }
                                        Err(e) => {
                                            error!(
                                                "Failed to delete actor {} from slice {}: {}",
                                                did, slice_uri, e
                                            );
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                error!(
                                    "Failed to check if actor {} has records in slice {}: {}",
                                    did, slice_uri, e
                                );
                            }
                        }
                    }
                }
            }
            Err(e) => {
                let message = "Failed to delete record";
                error!("{}: {}", message, e);

                // Log error to each relevant slice
                for slice_uri in relevant_slices {
                    self.log_and_publish(
                        LogLevel::Error,
                        message,
                        Some(serde_json::json!({
                            "operation": "delete",
                            "collection": commit.collection,
                            "did": did,
                            "uri": uri,
                            "error": e.to_string()
                        })),
                        Some(&slice_uri),
                    ).await;
                }
            }
        }

        Ok(())
    }
839}
840
841impl JetstreamConsumer {
842 /// Create a new Jetstream consumer with optional cursor support and Redis cache
843 ///
844 /// # Arguments
845 /// * `database` - Database connection for slice configurations and record storage
846 /// * `jetstream_hostname` - Optional custom jetstream hostname
847 /// * `cursor_handler` - Optional cursor handler for resumable event processing
848 /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from
849 /// * `redis_url` - Optional Redis URL for caching (falls back to in-memory if not provided)
850 pub async fn new(
851 database: Database,
852 jetstream_hostname: Option<String>,
853 cursor_handler: Option<Arc<PostgresCursorHandler>>,
854 initial_cursor: Option<i64>,
855 redis_url: Option<String>,
856 ) -> Result<Self, JetstreamError> {
857 let config = ConsumerTaskConfig {
858 user_agent: "slice-server/1.0".to_string(),
859 compression: false,
860 zstd_dictionary_location: String::new(),
861 jetstream_hostname: jetstream_hostname
862 .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()),
863 collections: Vec::new(),
864 dids: Vec::new(),
865 max_message_size_bytes: None,
866 cursor: initial_cursor,
867 require_hello: true,
868 };
869
870 let consumer = Consumer::new(config);
871 let http_client = Client::new();
872
873 // Determine cache backend based on Redis URL
874 let cache_backend = if let Some(redis_url) = redis_url {
875 CacheBackend::Redis {
876 url: redis_url,
877 ttl_seconds: None,
878 }
879 } else {
880 CacheBackend::InMemory { ttl_seconds: None }
881 };
882
883 // Create cache instances
884 let actor_cache = Arc::new(Mutex::new(
885 CacheFactory::create_slice_cache(cache_backend.clone())
886 .await
887 .map_err(|e| JetstreamError::ConnectionFailed {
888 message: format!("Failed to create actor cache: {}", e),
889 })?,
890 ));
891
892 let lexicon_cache = Arc::new(Mutex::new(
893 CacheFactory::create_slice_cache(cache_backend.clone())
894 .await
895 .map_err(|e| JetstreamError::ConnectionFailed {
896 message: format!("Failed to create lexicon cache: {}", e),
897 })?,
898 ));
899
900 let domain_cache = Arc::new(Mutex::new(
901 CacheFactory::create_slice_cache(cache_backend.clone())
902 .await
903 .map_err(|e| JetstreamError::ConnectionFailed {
904 message: format!("Failed to create domain cache: {}", e),
905 })?,
906 ));
907
908 let collections_cache = Arc::new(Mutex::new(
909 CacheFactory::create_slice_cache(cache_backend)
910 .await
911 .map_err(|e| JetstreamError::ConnectionFailed {
912 message: format!("Failed to create collections cache: {}", e),
913 })?,
914 ));
915
916 Ok(Self {
917 consumer,
918 database,
919 http_client,
920 actor_cache,
921 lexicon_cache,
922 domain_cache,
923 collections_cache,
924 event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
925 cursor_handler,
926 slices_list: Arc::new(RwLock::new(Vec::new())),
927 })
928 }
929
930 /// Load slice configurations
931 pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> {
932 info!("Loading slice configurations...");
933
934 // Get all slices and update cached list
935 let slices = self.database.get_all_slices().await?;
936 *self.slices_list.write().await = slices.clone();
937 info!("Found {} total slices in database", slices.len());
938
939 Ok(())
940 }
941
942 /// Preload actor cache to avoid database hits during event processing
943 async fn preload_actor_cache(&self) -> Result<(), JetstreamError> {
944 info!("Preloading actor cache...");
945
946 let actors = self.database.get_all_actors().await?;
947 info!("Found {} actors to cache", actors.len());
948
949 match self.actor_cache.lock().await.preload_actors(actors).await {
950 Ok(_) => {
951 info!("Actor cache preloaded successfully");
952 Ok(())
953 }
954 Err(e) => {
955 warn!(error = ?e, "Failed to preload actors to cache");
956 Ok(()) // Don't fail startup if preload fails
957 }
958 }
959 }
960
    /// Start consuming events from Jetstream
    ///
    /// Loads slice configuration, preloads the actor cache, registers the
    /// event handler, spawns a once-a-minute status logger, then runs the
    /// consumer until `cancellation_token` fires or the connection fails.
    /// On exit (clean or not) the cursor is force-written so the latest
    /// position survives a restart.
    pub async fn start_consuming(
        &self,
        cancellation_token: CancellationToken,
    ) -> Result<(), JetstreamError> {
        info!("Starting Jetstream consumer");

        // Load initial slice configurations
        self.load_slice_configurations().await?;

        // Preload actor cache
        self.preload_actor_cache().await?;

        // Create and register the event handler; it shares this consumer's
        // database, caches, counter, cursor handler, and slice list.
        let handler = Arc::new(SliceEventHandler {
            database: self.database.clone(),
            http_client: self.http_client.clone(),
            event_count: self.event_count.clone(),
            actor_cache: self.actor_cache.clone(),
            lexicon_cache: self.lexicon_cache.clone(),
            domain_cache: self.domain_cache.clone(),
            collections_cache: self.collections_cache.clone(),
            cursor_handler: self.cursor_handler.clone(),
            slices_list: self.slices_list.clone(),
        });

        self.consumer.register_handler(handler).await.map_err(|e| {
            JetstreamError::ConnectionFailed {
                message: format!("Failed to register event handler: {}", e),
            }
        })?;

        // Start periodic status reporting (with cancellation support)
        let event_count_for_status = self.event_count.clone();
        let cancellation_token_for_status = cancellation_token.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed);
                        info!(
                            "Jetstream consumer status: {} total events processed",
                            count
                        );
                    }
                    _ = cancellation_token_for_status.cancelled() => {
                        info!("Status reporting task cancelled");
                        break;
                    }
                }
            }
        });

        // Start the consumer (blocks here until cancellation or failure).
        info!("Starting Jetstream background consumer...");
        let result = self
            .consumer
            .run_background(cancellation_token)
            .await
            .map_err(|e| JetstreamError::ConnectionFailed {
                message: format!("Consumer failed: {}", e),
            });

        // Force write cursor on shutdown to ensure latest position is persisted
        // (deliberately done BEFORE the `?` below so it runs even on error).
        if let Some(cursor_handler) = &self.cursor_handler {
            if let Err(e) = cursor_handler.force_write_cursor().await {
                error!("Failed to write final cursor position: {}", e);
            } else {
                info!("Final cursor position written to database");
            }
        }

        result?;
        Ok(())
    }
1037
    /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors
    ///
    /// Spawns a detached background task that refreshes every 5 minutes.
    /// Reload failures are logged and the loop keeps running; the task only
    /// ends when the runtime shuts down.
    pub fn start_configuration_reloader(consumer: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes
            // `interval` fires immediately on first tick; consume it so the
            // first reload happens after a full period.
            interval.tick().await; // Skip first immediate tick

            loop {
                interval.tick().await;

                if let Err(e) = consumer.load_slice_configurations().await {
                    error!("Failed to reload slice configurations: {}", e);
                }

                if let Err(e) = consumer.preload_actor_cache().await {
                    error!("Failed to reload actor cache: {}", e);
                }
            }
        });
    }
1057}