A Rust application to showcase badge awards in the AT Protocol ecosystem.
1//! Jetstream consumer for AT Protocol badge award events.
2//!
3//! Handles real-time processing of badge award events from Jetstream,
4//! with event queuing and cursor management for reliable consumption.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
10use crate::errors::ShowcaseError;
11use anyhow::Result;
12use async_trait::async_trait;
13use atproto_jetstream::{EventHandler, JetstreamEvent};
14use tokio::sync::Mutex;
15use tokio::sync::mpsc;
16
17/// Type alias for badge event receiver to hide implementation details
18pub type BadgeEventReceiver = mpsc::UnboundedReceiver<AwardEvent>;
19
20/// Badge event types from Jetstream
21#[derive(Debug, Clone)]
22pub enum AwardEvent {
23 /// Record commit event for a badge award.
24 Commit {
25 /// DID of the record owner.
26 did: String,
27 /// Record key within the collection.
28 rkey: String,
29 /// Content identifier of the record.
30 cid: String,
31 /// The complete record data.
32 record: serde_json::Value,
33 },
34 /// Record deletion event for a badge award.
35 Delete {
36 /// DID of the record owner.
37 did: String,
38 /// Record key within the collection.
39 rkey: String,
40 },
41}
42
43/// Event handler that publishes badge events to an in-memory queue
44pub struct AwardEventHandler {
45 id: String,
46 event_sender: mpsc::UnboundedSender<AwardEvent>,
47}
48
49impl AwardEventHandler {
50 fn new(id: String, event_sender: mpsc::UnboundedSender<AwardEvent>) -> Self {
51 Self { id, event_sender }
52 }
53}
54
55#[async_trait]
56impl EventHandler for AwardEventHandler {
57 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
58 let award_event = match event {
59 JetstreamEvent::Commit { did, commit, .. } => {
60 if commit.collection != "community.lexicon.badge.award" {
61 return Ok(());
62 }
63
64 AwardEvent::Commit {
65 did,
66 rkey: commit.rkey,
67 cid: commit.cid,
68 record: commit.record,
69 }
70 }
71 JetstreamEvent::Delete { did, commit, .. } => {
72 if commit.collection != "community.lexicon.badge.award" {
73 return Ok(());
74 }
75 AwardEvent::Delete {
76 did,
77 rkey: commit.rkey,
78 }
79 }
80 JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => {
81 return Ok(());
82 }
83 };
84
85 if let Err(err) = self.event_sender.send(award_event) {
86 let showcase_error = ShowcaseError::ConsumerQueueSendFailed {
87 details: err.to_string(),
88 };
89 tracing::error!(?showcase_error);
90 }
91
92 Ok(())
93 }
94
95 fn handler_id(&self) -> String {
96 self.id.clone()
97 }
98}
99
100/// Cursor writer handler that periodically writes the latest time_us to a file
101pub struct CursorWriterHandler {
102 id: String,
103 cursor_path: String,
104 last_time_us: Arc<AtomicU64>,
105 last_write: Arc<Mutex<Instant>>,
106 write_interval: Duration,
107}
108
109impl CursorWriterHandler {
110 fn new(id: String, cursor_path: String) -> Self {
111 Self {
112 id,
113 cursor_path,
114 last_time_us: Arc::new(AtomicU64::new(0)),
115 last_write: Arc::new(Mutex::new(Instant::now())),
116 write_interval: Duration::from_secs(30),
117 }
118 }
119
120 async fn maybe_write_cursor(&self) -> Result<()> {
121 let current_time_us = self.last_time_us.load(Ordering::Relaxed);
122 if current_time_us == 0 {
123 return Ok(());
124 }
125
126 let mut last_write = self.last_write.lock().await;
127 if last_write.elapsed() >= self.write_interval {
128 tokio::fs::write(&self.cursor_path, current_time_us.to_string()).await?;
129 *last_write = Instant::now();
130 tracing::debug!("Wrote cursor to {}: {}", self.cursor_path, current_time_us);
131 }
132 Ok(())
133 }
134}
135
136#[async_trait]
137impl EventHandler for CursorWriterHandler {
138 async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
139 // Extract time_us from any event type
140 let time_us = match &event {
141 JetstreamEvent::Commit { time_us, .. } => *time_us,
142 JetstreamEvent::Delete { time_us, .. } => *time_us,
143 JetstreamEvent::Identity { time_us, .. } => *time_us,
144 JetstreamEvent::Account { time_us, .. } => *time_us,
145 };
146
147 // Update the latest time_us
148 self.last_time_us.store(time_us, Ordering::Relaxed);
149
150 // Try to write the cursor periodically
151 if let Err(err) = self.maybe_write_cursor().await {
152 tracing::warn!("Failed to write cursor: {}", err);
153 }
154
155 Ok(())
156 }
157
158 fn handler_id(&self) -> String {
159 self.id.clone()
160 }
161}
162
/// Consumer factory for creating event handlers and queue receivers.
///
/// The type has no fields. It can be built with `Consumer {}` or
/// `Consumer::default()`.
#[derive(Debug, Default)]
pub struct Consumer {}
165
166impl Consumer {
167 /// Create a badge event handler and return the receiver for processing
168 pub fn create_badge_handler(&self) -> (Arc<AwardEventHandler>, BadgeEventReceiver) {
169 let (sender, receiver) = mpsc::unbounded_channel();
170 let handler = Arc::new(AwardEventHandler::new(
171 "badge-processor".to_string(),
172 sender,
173 ));
174 (handler, receiver)
175 }
176
177 /// Create a cursor writer handler for the given cursor path
178 pub fn create_cursor_writer_handler(&self, cursor_path: String) -> Arc<CursorWriterHandler> {
179 Arc::new(CursorWriterHandler::new(
180 "cursor-writer".to_string(),
181 cursor_path,
182 ))
183 }
184}