QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
//! Jetstream event handler for QuickDID
//!
//! This module provides the event handler for processing AT Protocol Jetstream events,
//! specifically handling Account and Identity events to maintain cache consistency.
5
6use crate::handle_resolver::HandleResolver;
7use crate::metrics::MetricsPublisher;
8use anyhow::Result;
9use atproto_jetstream::{EventHandler, JetstreamEvent};
10use std::sync::Arc;
11use tracing::{debug, info, warn};
12
13/// Jetstream event handler for QuickDID
14///
15/// This handler processes AT Protocol events from the Jetstream firehose to keep
16/// the handle resolver cache in sync with the network state.
17///
18/// # Event Processing
19///
20/// ## Account Events
21/// - When an account is marked as "deleted" or "deactivated", the DID is purged from the cache
22/// - Metrics are tracked for successful and failed purge operations
23///
24/// ## Identity Events
25/// - When an identity event contains a handle, the handle-to-DID mapping is updated
26/// - When an identity event lacks a handle (indicating removal), the DID is purged
27/// - Metrics are tracked for successful and failed update/purge operations
28///
29/// # Example
30///
31/// ```no_run
32/// use quickdid::jetstream_handler::QuickDidEventHandler;
33/// use quickdid::handle_resolver::HandleResolver;
34/// use quickdid::metrics::MetricsPublisher;
35/// use std::sync::Arc;
36///
37/// # async fn example(resolver: Arc<dyn HandleResolver>, metrics: Arc<dyn MetricsPublisher>) {
38/// let handler = QuickDidEventHandler::new(resolver, metrics);
39/// // Register with a JetstreamConsumer
40/// # }
41/// ```
42pub struct QuickDidEventHandler {
43 resolver: Arc<dyn HandleResolver>,
44 metrics: Arc<dyn MetricsPublisher>,
45}
46
47impl QuickDidEventHandler {
48 /// Create a new Jetstream event handler
49 ///
50 /// # Arguments
51 ///
52 /// * `resolver` - The handle resolver to use for cache operations
53 /// * `metrics` - The metrics publisher for tracking event processing
54 pub fn new(resolver: Arc<dyn HandleResolver>, metrics: Arc<dyn MetricsPublisher>) -> Self {
55 Self { resolver, metrics }
56 }
57}
58
59#[async_trait::async_trait]
60impl EventHandler for QuickDidEventHandler {
61 fn handler_id(&self) -> String {
62 "quickdid_handler".to_string()
63 }
64
65 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
66 match event {
67 JetstreamEvent::Account { did, kind, .. } => {
68 // If account kind is "deleted" or "deactivated", purge the DID
69 if kind == "deleted" || kind == "deactivated" {
70 info!(did = %did, kind = %kind, "Purging account");
71 match self.resolver.purge(&did).await {
72 Ok(()) => {
73 self.metrics.incr("jetstream.account.purged").await;
74 }
75 Err(e) => {
76 warn!(did = %did, error = ?e, "Failed to purge DID");
77 self.metrics.incr("jetstream.account.purge_error").await;
78 }
79 }
80 }
81 self.metrics.incr("jetstream.account.processed").await;
82 }
83 JetstreamEvent::Identity { did, identity, .. } => {
84 // Extract handle from identity JSON if available
85 if !identity.is_null() {
86 if let Some(handle_value) = identity.get("handle") {
87 if let Some(handle) = handle_value.as_str() {
88 info!(handle = %handle, did = %did, "Updating identity mapping");
89 match self.resolver.set(handle, &did).await {
90 Ok(()) => {
91 self.metrics.incr("jetstream.identity.updated").await;
92 }
93 Err(e) => {
94 warn!(handle = %handle, did = %did, error = ?e, "Failed to update mapping");
95 self.metrics.incr("jetstream.identity.update_error").await;
96 }
97 }
98 } else {
99 // No handle or invalid handle, purge the DID
100 info!(did = %did, "Purging identity without valid handle");
101 match self.resolver.purge(&did).await {
102 Ok(()) => {
103 self.metrics.incr("jetstream.identity.purged").await;
104 }
105 Err(e) => {
106 warn!(did = %did, error = ?e, "Failed to purge DID");
107 self.metrics.incr("jetstream.identity.purge_error").await;
108 }
109 }
110 }
111 } else {
112 // No handle field, purge the DID
113 info!(did = %did, "Purging identity without handle field");
114 match self.resolver.purge(&did).await {
115 Ok(()) => {
116 self.metrics.incr("jetstream.identity.purged").await;
117 }
118 Err(e) => {
119 warn!(did = %did, error = ?e, "Failed to purge DID");
120 self.metrics.incr("jetstream.identity.purge_error").await;
121 }
122 }
123 }
124 } else {
125 // Null identity means removed, purge the DID
126 info!(did = %did, "Purging identity with null info");
127 match self.resolver.purge(&did).await {
128 Ok(()) => {
129 self.metrics.incr("jetstream.identity.purged").await;
130 }
131 Err(e) => {
132 warn!(did = %did, error = ?e, "Failed to purge DID");
133 self.metrics.incr("jetstream.identity.purge_error").await;
134 }
135 }
136 }
137 self.metrics.incr("jetstream.identity.processed").await;
138 }
139 _ => {
140 // Other event types we don't care about
141 debug!("Ignoring unhandled Jetstream event type");
142 }
143 }
144 Ok(())
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle_resolver::HandleResolverError;
    use crate::metrics::NoOpMetricsPublisher;
    use async_trait::async_trait;
    use serde_json::json;
    use std::sync::Mutex;

    /// Resolver double that records every `purge` and `set` call it receives.
    #[derive(Default)]
    struct MockResolver {
        purge_called: Mutex<Vec<String>>,
        set_called: Mutex<Vec<(String, String)>>,
    }

    impl MockResolver {
        fn new() -> Self {
            Self::default()
        }

        /// Snapshot of the subjects passed to `purge`, in call order.
        fn purge_calls(&self) -> Vec<String> {
            self.purge_called.lock().unwrap().clone()
        }

        /// Snapshot of the `(handle, did)` pairs passed to `set`, in call order.
        fn set_calls(&self) -> Vec<(String, String)> {
            self.set_called.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl HandleResolver for MockResolver {
        async fn resolve(&self, _handle: &str) -> Result<(String, u64), HandleResolverError> {
            unimplemented!("Not needed for tests")
        }

        async fn purge(&self, subject: &str) -> Result<(), HandleResolverError> {
            self.purge_called.lock().unwrap().push(subject.to_string());
            Ok(())
        }

        async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> {
            self.set_called
                .lock()
                .unwrap()
                .push((handle.to_string(), did.to_string()));
            Ok(())
        }
    }

    /// Build a handler wired to a fresh mock resolver and a no-op metrics sink.
    fn handler_with_mock() -> (Arc<MockResolver>, QuickDidEventHandler) {
        let resolver = Arc::new(MockResolver::new());
        let metrics = Arc::new(NoOpMetricsPublisher::new());
        let handler = QuickDidEventHandler::new(resolver.clone(), metrics);
        (resolver, handler)
    }

    /// Account event with a null `account` payload.
    fn account_event(did: &str, kind: &str) -> JetstreamEvent {
        JetstreamEvent::Account {
            did: did.to_string(),
            kind: kind.to_string(),
            time_us: 0,
            account: json!(null),
        }
    }

    /// Identity event carrying the given `identity` payload.
    fn identity_event(did: &str, kind: &str, identity: serde_json::Value) -> JetstreamEvent {
        JetstreamEvent::Identity {
            did: did.to_string(),
            kind: kind.to_string(),
            time_us: 0,
            identity,
        }
    }

    #[tokio::test]
    async fn test_account_deleted_event() {
        let (resolver, handler) = handler_with_mock();

        handler
            .handle_event(account_event("did:plc:test123", "deleted"))
            .await
            .unwrap();

        // The DID must have been purged exactly once
        assert_eq!(resolver.purge_calls(), vec!["did:plc:test123".to_string()]);
    }

    #[tokio::test]
    async fn test_account_deactivated_event() {
        let (resolver, handler) = handler_with_mock();

        handler
            .handle_event(account_event("did:plc:test456", "deactivated"))
            .await
            .unwrap();

        // The DID must have been purged exactly once
        assert_eq!(resolver.purge_calls(), vec!["did:plc:test456".to_string()]);
    }

    #[tokio::test]
    async fn test_account_active_event() {
        let (resolver, handler) = handler_with_mock();

        // An active account must leave the cache untouched
        handler
            .handle_event(account_event("did:plc:test789", "active"))
            .await
            .unwrap();

        assert!(resolver.purge_calls().is_empty());
    }

    #[tokio::test]
    async fn test_identity_with_handle_event() {
        let (resolver, handler) = handler_with_mock();

        let event = identity_event(
            "did:plc:testuser",
            "update",
            json!({ "handle": "alice.bsky.social" }),
        );
        handler.handle_event(event).await.unwrap();

        // The mapping is stored...
        assert_eq!(
            resolver.set_calls(),
            vec![(
                "alice.bsky.social".to_string(),
                "did:plc:testuser".to_string()
            )]
        );
        // ...and nothing is purged
        assert!(resolver.purge_calls().is_empty());
    }

    #[tokio::test]
    async fn test_identity_without_handle_event() {
        let (resolver, handler) = handler_with_mock();

        let event = identity_event(
            "did:plc:nohandle",
            "update",
            json!({ "other_field": "value" }),
        );
        handler.handle_event(event).await.unwrap();

        // A missing handle field purges the DID and stores nothing
        assert_eq!(resolver.purge_calls(), vec!["did:plc:nohandle".to_string()]);
        assert!(resolver.set_calls().is_empty());
    }

    #[tokio::test]
    async fn test_identity_with_non_string_handle() {
        let (resolver, handler) = handler_with_mock();

        let event = identity_event("did:plc:badhandle", "update", json!({ "handle": 42 }));
        handler.handle_event(event).await.unwrap();

        // A handle that is not a string purges the DID and stores nothing
        assert_eq!(
            resolver.purge_calls(),
            vec!["did:plc:badhandle".to_string()]
        );
        assert!(resolver.set_calls().is_empty());
    }

    #[tokio::test]
    async fn test_identity_with_null_identity() {
        let (resolver, handler) = handler_with_mock();

        let event = identity_event("did:plc:nullidentity", "delete", json!(null));
        handler.handle_event(event).await.unwrap();

        // A null identity payload purges the DID and stores nothing
        assert_eq!(
            resolver.purge_calls(),
            vec!["did:plc:nullidentity".to_string()]
        );
        assert!(resolver.set_calls().is_empty());
    }

    #[tokio::test]
    async fn test_handler_id() {
        let (_resolver, handler) = handler_with_mock();

        assert_eq!(handler.handler_id(), "quickdid_handler");
    }
}