A Rust application to showcase badge awards in the AT Protocol ecosystem.
at main 5.7 kB view raw
1//! Jetstream consumer for AT Protocol badge award events. 2//! 3//! Handles real-time processing of badge award events from Jetstream, 4//! with event queuing and cursor management for reliable consumption. 5 6use std::sync::Arc; 7use std::sync::atomic::{AtomicU64, Ordering}; 8use std::time::{Duration, Instant}; 9 10use crate::errors::ShowcaseError; 11use anyhow::Result; 12use async_trait::async_trait; 13use atproto_jetstream::{EventHandler, JetstreamEvent}; 14use tokio::sync::Mutex; 15use tokio::sync::mpsc; 16 17/// Type alias for badge event receiver to hide implementation details 18pub type BadgeEventReceiver = mpsc::UnboundedReceiver<AwardEvent>; 19 20/// Badge event types from Jetstream 21#[derive(Debug, Clone)] 22pub enum AwardEvent { 23 /// Record commit event for a badge award. 24 Commit { 25 /// DID of the record owner. 26 did: String, 27 /// Record key within the collection. 28 rkey: String, 29 /// Content identifier of the record. 30 cid: String, 31 /// The complete record data. 32 record: serde_json::Value, 33 }, 34 /// Record deletion event for a badge award. 35 Delete { 36 /// DID of the record owner. 37 did: String, 38 /// Record key within the collection. 39 rkey: String, 40 }, 41} 42 43/// Event handler that publishes badge events to an in-memory queue 44pub struct AwardEventHandler { 45 id: String, 46 event_sender: mpsc::UnboundedSender<AwardEvent>, 47} 48 49impl AwardEventHandler { 50 fn new(id: String, event_sender: mpsc::UnboundedSender<AwardEvent>) -> Self { 51 Self { id, event_sender } 52 } 53} 54 55#[async_trait] 56impl EventHandler for AwardEventHandler { 57 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 58 let award_event = match event { 59 JetstreamEvent::Commit { did, commit, .. } => { 60 if commit.collection != "community.lexicon.badge.award" { 61 return Ok(()); 62 } 63 64 AwardEvent::Commit { 65 did, 66 rkey: commit.rkey, 67 cid: commit.cid, 68 record: commit.record, 69 } 70 } 71 JetstreamEvent::Delete { did, commit, .. } => { 72 if commit.collection != "community.lexicon.badge.award" { 73 return Ok(()); 74 } 75 AwardEvent::Delete { 76 did, 77 rkey: commit.rkey, 78 } 79 } 80 JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => { 81 return Ok(()); 82 } 83 }; 84 85 if let Err(err) = self.event_sender.send(award_event) { 86 let showcase_error = ShowcaseError::ConsumerQueueSendFailed { 87 details: err.to_string(), 88 }; 89 tracing::error!(?showcase_error); 90 } 91 92 Ok(()) 93 } 94 95 fn handler_id(&self) -> String { 96 self.id.clone() 97 } 98} 99 100/// Cursor writer handler that periodically writes the latest time_us to a file 101pub struct CursorWriterHandler { 102 id: String, 103 cursor_path: String, 104 last_time_us: Arc<AtomicU64>, 105 last_write: Arc<Mutex<Instant>>, 106 write_interval: Duration, 107} 108 109impl CursorWriterHandler { 110 fn new(id: String, cursor_path: String) -> Self { 111 Self { 112 id, 113 cursor_path, 114 last_time_us: Arc::new(AtomicU64::new(0)), 115 last_write: Arc::new(Mutex::new(Instant::now())), 116 write_interval: Duration::from_secs(30), 117 } 118 } 119 120 async fn maybe_write_cursor(&self) -> Result<()> { 121 let current_time_us = self.last_time_us.load(Ordering::Relaxed); 122 if current_time_us == 0 { 123 return Ok(()); 124 } 125 126 let mut last_write = self.last_write.lock().await; 127 if last_write.elapsed() >= self.write_interval { 128 tokio::fs::write(&self.cursor_path, current_time_us.to_string()).await?; 129 *last_write = Instant::now(); 130 tracing::debug!("Wrote cursor to {}: {}", self.cursor_path, current_time_us); 131 } 132 Ok(()) 133 } 134} 135 136#[async_trait] 137impl EventHandler for CursorWriterHandler { 138 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 139 // Extract time_us from any event type 140 let time_us = match &event { 141 JetstreamEvent::Commit { time_us, .. } => *time_us, 142 JetstreamEvent::Delete { time_us, .. } => *time_us, 143 JetstreamEvent::Identity { time_us, .. } => *time_us, 144 JetstreamEvent::Account { time_us, .. } => *time_us, 145 }; 146 147 // Update the latest time_us 148 self.last_time_us.store(time_us, Ordering::Relaxed); 149 150 // Try to write the cursor periodically 151 if let Err(err) = self.maybe_write_cursor().await { 152 tracing::warn!("Failed to write cursor: {}", err); 153 } 154 155 Ok(()) 156 } 157 158 fn handler_id(&self) -> String { 159 self.id.clone() 160 } 161} 162 163/// Consumer factory for creating event handlers and queue receivers 164pub struct Consumer {} 165 166impl Consumer { 167 /// Create a badge event handler and return the receiver for processing 168 pub fn create_badge_handler(&self) -> (Arc<AwardEventHandler>, BadgeEventReceiver) { 169 let (sender, receiver) = mpsc::unbounded_channel(); 170 let handler = Arc::new(AwardEventHandler::new( 171 "badge-processor".to_string(), 172 sender, 173 )); 174 (handler, receiver) 175 } 176 177 /// Create a cursor writer handler for the given cursor path 178 pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc<CursorWriterHandler> { 179 Arc::new(CursorWriterHandler::new( 180 "cursor-writer".to_string(), 181 cursor_path, 182 )) 183 } 184}