//! Jetstream event handler for QuickDID //! //! This module provides the event handler for processing AT Protocol Jetstream events, //! specifically handling Account and Identity events to maintain cache consistency. use crate::handle_resolver::HandleResolver; use crate::metrics::MetricsPublisher; use anyhow::Result; use atproto_jetstream::{EventHandler, JetstreamEvent}; use std::sync::Arc; use tracing::{debug, info, warn}; /// Jetstream event handler for QuickDID /// /// This handler processes AT Protocol events from the Jetstream firehose to keep /// the handle resolver cache in sync with the network state. /// /// # Event Processing /// /// ## Account Events /// - When an account is marked as "deleted" or "deactivated", the DID is purged from the cache /// - Metrics are tracked for successful and failed purge operations /// /// ## Identity Events /// - When an identity event contains a handle, the handle-to-DID mapping is updated /// - When an identity event lacks a handle (indicating removal), the DID is purged /// - Metrics are tracked for successful and failed update/purge operations /// /// # Example /// /// ```no_run /// use quickdid::jetstream_handler::QuickDidEventHandler; /// use quickdid::handle_resolver::HandleResolver; /// use quickdid::metrics::MetricsPublisher; /// use std::sync::Arc; /// /// # async fn example(resolver: Arc, metrics: Arc) { /// let handler = QuickDidEventHandler::new(resolver, metrics); /// // Register with a JetstreamConsumer /// # } /// ``` pub struct QuickDidEventHandler { resolver: Arc, metrics: Arc, } impl QuickDidEventHandler { /// Create a new Jetstream event handler /// /// # Arguments /// /// * `resolver` - The handle resolver to use for cache operations /// * `metrics` - The metrics publisher for tracking event processing pub fn new(resolver: Arc, metrics: Arc) -> Self { Self { resolver, metrics } } } #[async_trait::async_trait] impl EventHandler for QuickDidEventHandler { fn handler_id(&self) -> String { "quickdid_handler".to_string() } async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { match event { JetstreamEvent::Account { did, kind, .. } => { // If account kind is "deleted" or "deactivated", purge the DID if kind == "deleted" || kind == "deactivated" { info!(did = %did, kind = %kind, "Purging account"); match self.resolver.purge(&did).await { Ok(()) => { self.metrics.incr("jetstream.account.purged").await; } Err(e) => { warn!(did = %did, error = ?e, "Failed to purge DID"); self.metrics.incr("jetstream.account.purge_error").await; } } } self.metrics.incr("jetstream.account.processed").await; } JetstreamEvent::Identity { did, identity, .. } => { // Extract handle from identity JSON if available if !identity.is_null() { if let Some(handle_value) = identity.get("handle") { if let Some(handle) = handle_value.as_str() { info!(handle = %handle, did = %did, "Updating identity mapping"); match self.resolver.set(handle, &did).await { Ok(()) => { self.metrics.incr("jetstream.identity.updated").await; } Err(e) => { warn!(handle = %handle, did = %did, error = ?e, "Failed to update mapping"); self.metrics.incr("jetstream.identity.update_error").await; } } } else { // No handle or invalid handle, purge the DID info!(did = %did, "Purging identity without valid handle"); match self.resolver.purge(&did).await { Ok(()) => { self.metrics.incr("jetstream.identity.purged").await; } Err(e) => { warn!(did = %did, error = ?e, "Failed to purge DID"); self.metrics.incr("jetstream.identity.purge_error").await; } } } } else { // No handle field, purge the DID info!(did = %did, "Purging identity without handle field"); match self.resolver.purge(&did).await { Ok(()) => { self.metrics.incr("jetstream.identity.purged").await; } Err(e) => { warn!(did = %did, error = ?e, "Failed to purge DID"); self.metrics.incr("jetstream.identity.purge_error").await; } } } } else { // Null identity means removed, purge the DID info!(did = %did, "Purging identity with null info"); match self.resolver.purge(&did).await { Ok(()) => { self.metrics.incr("jetstream.identity.purged").await; } Err(e) => { warn!(did = %did, error = ?e, "Failed to purge DID"); self.metrics.incr("jetstream.identity.purge_error").await; } } } self.metrics.incr("jetstream.identity.processed").await; } _ => { // Other event types we don't care about debug!("Ignoring unhandled Jetstream event type"); } } Ok(()) } } #[cfg(test)] mod tests { use super::*; use crate::handle_resolver::HandleResolverError; use crate::metrics::NoOpMetricsPublisher; use async_trait::async_trait; use serde_json::json; /// Mock resolver for testing struct MockResolver { purge_called: std::sync::Arc>>, set_called: std::sync::Arc>>, } impl MockResolver { fn new() -> Self { Self { purge_called: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), set_called: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), } } fn get_purge_calls(&self) -> Vec { self.purge_called.lock().unwrap().clone() } fn get_set_calls(&self) -> Vec<(String, String)> { self.set_called.lock().unwrap().clone() } } #[async_trait] impl HandleResolver for MockResolver { async fn resolve(&self, _handle: &str) -> Result<(String, u64), HandleResolverError> { unimplemented!("Not needed for tests") } async fn purge(&self, subject: &str) -> Result<(), HandleResolverError> { self.purge_called.lock().unwrap().push(subject.to_string()); Ok(()) } async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> { self.set_called .lock() .unwrap() .push((handle.to_string(), did.to_string())); Ok(()) } } #[tokio::test] async fn test_account_deleted_event() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver.clone(), metrics); // Create a deleted account event let event = JetstreamEvent::Account { did: "did:plc:test123".to_string(), kind: "deleted".to_string(), time_us: 0, identity: json!(null), }; handler.handle_event(event).await.unwrap(); // Verify the DID was purged let purge_calls = resolver.get_purge_calls(); assert_eq!(purge_calls.len(), 1); assert_eq!(purge_calls[0], "did:plc:test123"); } #[tokio::test] async fn test_account_deactivated_event() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver.clone(), metrics); // Create a deactivated account event let event = JetstreamEvent::Account { did: "did:plc:test456".to_string(), kind: "deactivated".to_string(), time_us: 0, identity: json!(null), }; handler.handle_event(event).await.unwrap(); // Verify the DID was purged let purge_calls = resolver.get_purge_calls(); assert_eq!(purge_calls.len(), 1); assert_eq!(purge_calls[0], "did:plc:test456"); } #[tokio::test] async fn test_account_active_event() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver.clone(), metrics); // Create an active account event (should not purge) let event = JetstreamEvent::Account { did: "did:plc:test789".to_string(), kind: "active".to_string(), time_us: 0, identity: json!(null), }; handler.handle_event(event).await.unwrap(); // Verify the DID was NOT purged let purge_calls = resolver.get_purge_calls(); assert_eq!(purge_calls.len(), 0); } #[tokio::test] async fn test_identity_with_handle_event() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver.clone(), metrics); // Create an identity event with a handle let event = JetstreamEvent::Identity { did: "did:plc:testuser".to_string(), kind: "update".to_string(), time_us: 0, identity: json!({ "handle": "alice.bsky.social" }), }; handler.handle_event(event).await.unwrap(); // Verify the set method was called let set_calls = resolver.get_set_calls(); assert_eq!(set_calls.len(), 1); assert_eq!( set_calls[0], ( "alice.bsky.social".to_string(), "did:plc:testuser".to_string() ) ); // Verify no purge was called let purge_calls = resolver.get_purge_calls(); assert_eq!(purge_calls.len(), 0); } #[tokio::test] async fn test_identity_without_handle_event() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver.clone(), metrics); // Create an identity event without a handle field let event = JetstreamEvent::Identity { did: "did:plc:nohandle".to_string(), kind: "update".to_string(), time_us: 0, identity: json!({ "other_field": "value" }), }; handler.handle_event(event).await.unwrap(); // Verify the DID was purged let purge_calls = resolver.get_purge_calls(); assert_eq!(purge_calls.len(), 1); assert_eq!(purge_calls[0], "did:plc:nohandle"); // Verify set was not called let set_calls = resolver.get_set_calls(); assert_eq!(set_calls.len(), 0); } #[tokio::test] async fn test_identity_with_null_identity() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver.clone(), metrics); // Create an identity event with null identity let event = JetstreamEvent::Identity { did: "did:plc:nullidentity".to_string(), kind: "delete".to_string(), time_us: 0, identity: json!(null), }; handler.handle_event(event).await.unwrap(); // Verify the DID was purged let purge_calls = resolver.get_purge_calls(); assert_eq!(purge_calls.len(), 1); assert_eq!(purge_calls[0], "did:plc:nullidentity"); // Verify set was not called let set_calls = resolver.get_set_calls(); assert_eq!(set_calls.len(), 0); } #[tokio::test] async fn test_handler_id() { let resolver = Arc::new(MockResolver::new()); let metrics = Arc::new(NoOpMetricsPublisher::new()); let handler = QuickDidEventHandler::new(resolver, metrics); assert_eq!(handler.handler_id(), "quickdid_handler"); } }