QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 14 kB view raw
1//! Jetstream event handler for QuickDID 2//! 3//! This module provides the event handler for processing AT Protocol Jetstream events, 4//! specifically handling Account and Identity events to maintain cache consistency. 5 6use crate::handle_resolver::HandleResolver; 7use crate::metrics::MetricsPublisher; 8use anyhow::Result; 9use atproto_jetstream::{EventHandler, JetstreamEvent}; 10use std::sync::Arc; 11use tracing::{debug, info, warn}; 12 13/// Jetstream event handler for QuickDID 14/// 15/// This handler processes AT Protocol events from the Jetstream firehose to keep 16/// the handle resolver cache in sync with the network state. 17/// 18/// # Event Processing 19/// 20/// ## Account Events 21/// - When an account is marked as "deleted" or "deactivated", the DID is purged from the cache 22/// - Metrics are tracked for successful and failed purge operations 23/// 24/// ## Identity Events 25/// - When an identity event contains a handle, the handle-to-DID mapping is updated 26/// - When an identity event lacks a handle (indicating removal), the DID is purged 27/// - Metrics are tracked for successful and failed update/purge operations 28/// 29/// # Example 30/// 31/// ```no_run 32/// use quickdid::jetstream_handler::QuickDidEventHandler; 33/// use quickdid::handle_resolver::HandleResolver; 34/// use quickdid::metrics::MetricsPublisher; 35/// use std::sync::Arc; 36/// 37/// # async fn example(resolver: Arc<dyn HandleResolver>, metrics: Arc<dyn MetricsPublisher>) { 38/// let handler = QuickDidEventHandler::new(resolver, metrics); 39/// // Register with a JetstreamConsumer 40/// # } 41/// ``` 42pub struct QuickDidEventHandler { 43 resolver: Arc<dyn HandleResolver>, 44 metrics: Arc<dyn MetricsPublisher>, 45} 46 47impl QuickDidEventHandler { 48 /// Create a new Jetstream event handler 49 /// 50 /// # Arguments 51 /// 52 /// * `resolver` - The handle resolver to use for cache operations 53 /// * `metrics` - The metrics publisher for tracking event processing 54 pub fn new(resolver: Arc<dyn HandleResolver>, metrics: Arc<dyn MetricsPublisher>) -> Self { 55 Self { resolver, metrics } 56 } 57} 58 59#[async_trait::async_trait] 60impl EventHandler for QuickDidEventHandler { 61 fn handler_id(&self) -> String { 62 "quickdid_handler".to_string() 63 } 64 65 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 66 match event { 67 JetstreamEvent::Account { did, kind, .. } => { 68 // If account kind is "deleted" or "deactivated", purge the DID 69 if kind == "deleted" || kind == "deactivated" { 70 info!(did = %did, kind = %kind, "Purging account"); 71 match self.resolver.purge(&did).await { 72 Ok(()) => { 73 self.metrics.incr("jetstream.account.purged").await; 74 } 75 Err(e) => { 76 warn!(did = %did, error = ?e, "Failed to purge DID"); 77 self.metrics.incr("jetstream.account.purge_error").await; 78 } 79 } 80 } 81 self.metrics.incr("jetstream.account.processed").await; 82 } 83 JetstreamEvent::Identity { did, identity, .. } => { 84 // Extract handle from identity JSON if available 85 if !identity.is_null() { 86 if let Some(handle_value) = identity.get("handle") { 87 if let Some(handle) = handle_value.as_str() { 88 info!(handle = %handle, did = %did, "Updating identity mapping"); 89 match self.resolver.set(handle, &did).await { 90 Ok(()) => { 91 self.metrics.incr("jetstream.identity.updated").await; 92 } 93 Err(e) => { 94 warn!(handle = %handle, did = %did, error = ?e, "Failed to update mapping"); 95 self.metrics.incr("jetstream.identity.update_error").await; 96 } 97 } 98 } else { 99 // No handle or invalid handle, purge the DID 100 info!(did = %did, "Purging identity without valid handle"); 101 match self.resolver.purge(&did).await { 102 Ok(()) => { 103 self.metrics.incr("jetstream.identity.purged").await; 104 } 105 Err(e) => { 106 warn!(did = %did, error = ?e, "Failed to purge DID"); 107 self.metrics.incr("jetstream.identity.purge_error").await; 108 } 109 } 110 } 111 } else { 112 // No handle field, purge the DID 113 info!(did = %did, "Purging identity without handle field"); 114 match self.resolver.purge(&did).await { 115 Ok(()) => { 116 self.metrics.incr("jetstream.identity.purged").await; 117 } 118 Err(e) => { 119 warn!(did = %did, error = ?e, "Failed to purge DID"); 120 self.metrics.incr("jetstream.identity.purge_error").await; 121 } 122 } 123 } 124 } else { 125 // Null identity means removed, purge the DID 126 info!(did = %did, "Purging identity with null info"); 127 match self.resolver.purge(&did).await { 128 Ok(()) => { 129 self.metrics.incr("jetstream.identity.purged").await; 130 } 131 Err(e) => { 132 warn!(did = %did, error = ?e, "Failed to purge DID"); 133 self.metrics.incr("jetstream.identity.purge_error").await; 134 } 135 } 136 } 137 self.metrics.incr("jetstream.identity.processed").await; 138 } 139 _ => { 140 // Other event types we don't care about 141 debug!("Ignoring unhandled Jetstream event type"); 142 } 143 } 144 Ok(()) 145 } 146} 147 148#[cfg(test)] 149mod tests { 150 use super::*; 151 use crate::handle_resolver::HandleResolverError; 152 use crate::metrics::NoOpMetricsPublisher; 153 use async_trait::async_trait; 154 use serde_json::json; 155 156 /// Mock resolver for testing 157 struct MockResolver { 158 purge_called: std::sync::Arc<std::sync::Mutex<Vec<String>>>, 159 set_called: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>>, 160 } 161 162 impl MockResolver { 163 fn new() -> Self { 164 Self { 165 purge_called: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), 166 set_called: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), 167 } 168 } 169 170 fn get_purge_calls(&self) -> Vec<String> { 171 self.purge_called.lock().unwrap().clone() 172 } 173 174 fn get_set_calls(&self) -> Vec<(String, String)> { 175 self.set_called.lock().unwrap().clone() 176 } 177 } 178 179 #[async_trait] 180 impl HandleResolver for MockResolver { 181 async fn resolve(&self, _handle: &str) -> Result<(String, u64), HandleResolverError> { 182 unimplemented!("Not needed for tests") 183 } 184 185 async fn purge(&self, subject: &str) -> Result<(), HandleResolverError> { 186 self.purge_called.lock().unwrap().push(subject.to_string()); 187 Ok(()) 188 } 189 190 async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> { 191 self.set_called 192 .lock() 193 .unwrap() 194 .push((handle.to_string(), did.to_string())); 195 Ok(()) 196 } 197 } 198 199 #[tokio::test] 200 async fn test_account_deleted_event() { 201 let resolver = Arc::new(MockResolver::new()); 202 let metrics = Arc::new(NoOpMetricsPublisher::new()); 203 let handler = QuickDidEventHandler::new(resolver.clone(), metrics); 204 205 // Create a deleted account event 206 let event = JetstreamEvent::Account { 207 did: "did:plc:test123".to_string(), 208 kind: "deleted".to_string(), 209 time_us: 0, 210 identity: json!(null), 211 }; 212 213 handler.handle_event(event).await.unwrap(); 214 215 // Verify the DID was purged 216 let purge_calls = resolver.get_purge_calls(); 217 assert_eq!(purge_calls.len(), 1); 218 assert_eq!(purge_calls[0], "did:plc:test123"); 219 } 220 221 #[tokio::test] 222 async fn test_account_deactivated_event() { 223 let resolver = Arc::new(MockResolver::new()); 224 let metrics = Arc::new(NoOpMetricsPublisher::new()); 225 let handler = QuickDidEventHandler::new(resolver.clone(), metrics); 226 227 // Create a deactivated account event 228 let event = JetstreamEvent::Account { 229 did: "did:plc:test456".to_string(), 230 kind: "deactivated".to_string(), 231 time_us: 0, 232 identity: json!(null), 233 }; 234 235 handler.handle_event(event).await.unwrap(); 236 237 // Verify the DID was purged 238 let purge_calls = resolver.get_purge_calls(); 239 assert_eq!(purge_calls.len(), 1); 240 assert_eq!(purge_calls[0], "did:plc:test456"); 241 } 242 243 #[tokio::test] 244 async fn test_account_active_event() { 245 let resolver = Arc::new(MockResolver::new()); 246 let metrics = Arc::new(NoOpMetricsPublisher::new()); 247 let handler = QuickDidEventHandler::new(resolver.clone(), metrics); 248 249 // Create an active account event (should not purge) 250 let event = JetstreamEvent::Account { 251 did: "did:plc:test789".to_string(), 252 kind: "active".to_string(), 253 time_us: 0, 254 identity: json!(null), 255 }; 256 257 handler.handle_event(event).await.unwrap(); 258 259 // Verify the DID was NOT purged 260 let purge_calls = resolver.get_purge_calls(); 261 assert_eq!(purge_calls.len(), 0); 262 } 263 264 #[tokio::test] 265 async fn test_identity_with_handle_event() { 266 let resolver = Arc::new(MockResolver::new()); 267 let metrics = Arc::new(NoOpMetricsPublisher::new()); 268 let handler = QuickDidEventHandler::new(resolver.clone(), metrics); 269 270 // Create an identity event with a handle 271 let event = JetstreamEvent::Identity { 272 did: "did:plc:testuser".to_string(), 273 kind: "update".to_string(), 274 time_us: 0, 275 identity: json!({ 276 "handle": "alice.bsky.social" 277 }), 278 }; 279 280 handler.handle_event(event).await.unwrap(); 281 282 // Verify the set method was called 283 let set_calls = resolver.get_set_calls(); 284 assert_eq!(set_calls.len(), 1); 285 assert_eq!( 286 set_calls[0], 287 ( 288 "alice.bsky.social".to_string(), 289 "did:plc:testuser".to_string() 290 ) 291 ); 292 293 // Verify no purge was called 294 let purge_calls = resolver.get_purge_calls(); 295 assert_eq!(purge_calls.len(), 0); 296 } 297 298 #[tokio::test] 299 async fn test_identity_without_handle_event() { 300 let resolver = Arc::new(MockResolver::new()); 301 let metrics = Arc::new(NoOpMetricsPublisher::new()); 302 let handler = QuickDidEventHandler::new(resolver.clone(), metrics); 303 304 // Create an identity event without a handle field 305 let event = JetstreamEvent::Identity { 306 did: "did:plc:nohandle".to_string(), 307 kind: "update".to_string(), 308 time_us: 0, 309 identity: json!({ 310 "other_field": "value" 311 }), 312 }; 313 314 handler.handle_event(event).await.unwrap(); 315 316 // Verify the DID was purged 317 let purge_calls = resolver.get_purge_calls(); 318 assert_eq!(purge_calls.len(), 1); 319 assert_eq!(purge_calls[0], "did:plc:nohandle"); 320 321 // Verify set was not called 322 let set_calls = resolver.get_set_calls(); 323 assert_eq!(set_calls.len(), 0); 324 } 325 326 #[tokio::test] 327 async fn test_identity_with_null_identity() { 328 let resolver = Arc::new(MockResolver::new()); 329 let metrics = Arc::new(NoOpMetricsPublisher::new()); 330 let handler = QuickDidEventHandler::new(resolver.clone(), metrics); 331 332 // Create an identity event with null identity 333 let event = JetstreamEvent::Identity { 334 did: "did:plc:nullidentity".to_string(), 335 kind: "delete".to_string(), 336 time_us: 0, 337 identity: json!(null), 338 }; 339 340 handler.handle_event(event).await.unwrap(); 341 342 // Verify the DID was purged 343 let purge_calls = resolver.get_purge_calls(); 344 assert_eq!(purge_calls.len(), 1); 345 assert_eq!(purge_calls[0], "did:plc:nullidentity"); 346 347 // Verify set was not called 348 let set_calls = resolver.get_set_calls(); 349 assert_eq!(set_calls.len(), 0); 350 } 351 352 #[tokio::test] 353 async fn test_handler_id() { 354 let resolver = Arc::new(MockResolver::new()); 355 let metrics = Arc::new(NoOpMetricsPublisher::new()); 356 let handler = QuickDidEventHandler::new(resolver, metrics); 357 358 assert_eq!(handler.handler_id(), "quickdid_handler"); 359 } 360}