+75
-22
crates/atproto-jetstream/src/consumer.rs
+75
-22
crates/atproto-jetstream/src/consumer.rs
···
2
2
//!
3
3
//! WebSocket event consumption with background processing and
4
4
//! customizable event handler dispatch.
5
+
//!
6
+
//! ## Memory Efficiency
7
+
//!
8
+
//! This module is optimized for high-throughput event processing with minimal allocations:
9
+
//!
10
+
//! - **Arc-based event sharing**: Events are wrapped in `Arc` and shared across all handlers,
11
+
//! avoiding expensive clones of event data structures.
12
+
//! - **Zero-copy handler IDs**: Handler identifiers use string slices to avoid allocations
13
+
//! during registration and dispatch.
14
+
//! - **Optimized query building**: WebSocket query strings are built with pre-allocated
15
+
//! capacity to minimize reallocations.
16
+
//!
17
+
//! ## Usage
18
+
//!
19
+
//! Implement the `EventHandler` trait to process events:
20
+
//!
21
+
//! ```rust
22
+
//! use atproto_jetstream::{EventHandler, JetstreamEvent};
23
+
//! use async_trait::async_trait;
24
+
//! use std::sync::Arc;
25
+
//! use anyhow::Result;
26
+
//!
27
+
//! struct MyHandler;
28
+
//!
29
+
//! #[async_trait]
30
+
//! impl EventHandler for MyHandler {
31
+
//! async fn handle_event(&self, event: Arc<JetstreamEvent>) -> Result<()> {
32
+
//! // Process event without cloning
33
+
//! Ok(())
34
+
//! }
35
+
//!
36
+
//! fn handler_id(&self) -> &str {
37
+
//! "my-handler"
38
+
//! }
39
+
//! }
40
+
//! ```
5
41
6
42
use crate::errors::ConsumerError;
7
43
use anyhow::Result;
···
133
169
#[async_trait]
134
170
pub trait EventHandler: Send + Sync {
135
171
/// Handle a received event
136
-
async fn handle_event(&self, event: JetstreamEvent) -> Result<()>;
172
+
///
173
+
/// Events are wrapped in Arc to enable efficient sharing across multiple handlers
174
+
/// without cloning the entire event data structure.
175
+
async fn handle_event(&self, event: Arc<JetstreamEvent>) -> Result<()>;
137
176
138
177
/// Get the handler's identifier
139
-
fn handler_id(&self) -> String;
178
+
///
179
+
/// Returns a string slice to avoid unnecessary allocations.
180
+
fn handler_id(&self) -> &str;
140
181
}
141
182
142
183
#[cfg_attr(debug_assertions, derive(Debug))]
···
167
208
pub struct Consumer {
168
209
config: ConsumerTaskConfig,
169
210
handlers: Arc<RwLock<HashMap<String, Arc<dyn EventHandler>>>>,
170
-
event_sender: Arc<RwLock<Option<broadcast::Sender<JetstreamEvent>>>>,
211
+
event_sender: Arc<RwLock<Option<broadcast::Sender<Arc<JetstreamEvent>>>>>,
171
212
}
172
213
173
214
impl Consumer {
···
185
226
let handler_id = handler.handler_id();
186
227
let mut handlers = self.handlers.write().await;
187
228
188
-
if handlers.contains_key(&handler_id) {
229
+
if handlers.contains_key(handler_id) {
189
230
return Err(ConsumerError::HandlerRegistrationFailed(format!(
190
231
"Handler with ID '{}' already registered",
191
232
handler_id
···
193
234
.into());
194
235
}
195
236
196
-
handlers.insert(handler_id.clone(), handler);
237
+
handlers.insert(handler_id.to_string(), handler);
197
238
Ok(())
198
239
}
199
240
···
205
246
}
206
247
207
248
/// Get a broadcast receiver for events
208
-
pub async fn get_event_receiver(&self) -> Result<broadcast::Receiver<JetstreamEvent>> {
249
+
///
250
+
/// Events are wrapped in Arc to enable efficient sharing without cloning.
251
+
pub async fn get_event_receiver(&self) -> Result<broadcast::Receiver<Arc<JetstreamEvent>>> {
209
252
let sender_guard = self.event_sender.read().await;
210
253
match sender_guard.as_ref() {
211
254
Some(sender) => Ok(sender.subscribe()),
···
249
292
tracing::info!("Starting Jetstream consumer");
250
293
251
294
// Build WebSocket URL with query parameters
252
-
let mut query_params = vec![];
295
+
// Pre-allocate capacity to avoid reallocations during string building
296
+
let capacity = 50 // Base parameters
297
+
+ self.config.collections.len() * 30 // Estimate per collection
298
+
+ self.config.dids.len() * 60; // Estimate per DID
299
+
let mut query_string = String::with_capacity(capacity);
253
300
254
301
// Add compression parameter
255
-
query_params.push(format!("compress={}", self.config.compression));
302
+
query_string.push_str("compress=");
303
+
query_string.push_str(if self.config.compression { "true" } else { "false" });
256
304
257
305
// Add requireHello parameter
258
-
query_params.push(format!("requireHello={}", self.config.require_hello));
306
+
query_string.push_str("&requireHello=");
307
+
query_string.push_str(if self.config.require_hello { "true" } else { "false" });
259
308
260
309
// Add wantedCollections if specified (each collection as a separate query parameter)
261
310
if !self.config.collections.is_empty() && !self.config.require_hello {
262
311
for collection in &self.config.collections {
263
-
query_params.push(format!(
264
-
"wantedCollections={}",
265
-
urlencoding::encode(collection)
266
-
));
312
+
query_string.push_str("&wantedCollections=");
313
+
query_string.push_str(&urlencoding::encode(collection));
267
314
}
268
315
}
269
316
270
317
// Add wantedDids if specified (each DID as a separate query parameter)
271
318
if !self.config.dids.is_empty() && !self.config.require_hello {
272
319
for did in &self.config.dids {
273
-
query_params.push(format!("wantedDids={}", urlencoding::encode(did)));
320
+
query_string.push_str("&wantedDids=");
321
+
query_string.push_str(&urlencoding::encode(did));
274
322
}
275
323
}
276
324
277
325
// Add maxMessageSizeBytes if specified
278
326
if let Some(max_size) = self.config.max_message_size_bytes {
279
-
query_params.push(format!("maxMessageSizeBytes={}", max_size));
327
+
use std::fmt::Write;
328
+
write!(&mut query_string, "&maxMessageSizeBytes={}", max_size).unwrap();
280
329
}
281
330
282
331
// Add cursor if specified
283
332
if let Some(cursor) = self.config.cursor {
284
-
query_params.push(format!("cursor={}", cursor));
333
+
use std::fmt::Write;
334
+
write!(&mut query_string, "&cursor={}", cursor).unwrap();
285
335
}
286
-
287
-
let query_string = query_params.join("&");
288
336
let ws_url = Uri::from_str(&format!(
289
337
"wss://{}/subscribe?{}",
290
338
self.config.jetstream_hostname, query_string
···
404
452
}
405
453
406
454
/// Dispatch event to all registered handlers
455
+
///
456
+
/// Wraps the event in Arc once and shares it across all handlers,
457
+
/// avoiding expensive clones of the event data structure.
407
458
async fn dispatch_to_handlers(&self, event: JetstreamEvent) -> Result<()> {
408
459
let handlers = self.handlers.read().await;
460
+
let event = Arc::new(event);
409
461
410
462
for (handler_id, handler) in handlers.iter() {
411
463
let handler_span = tracing::debug_span!("handler_dispatch", handler_id = %handler_id);
464
+
let event_ref = Arc::clone(&event);
412
465
async {
413
-
if let Err(err) = handler.handle_event(event.clone()).await {
466
+
if let Err(err) = handler.handle_event(event_ref).await {
414
467
tracing::error!(
415
468
error = ?err,
416
469
handler_id = %handler_id,
···
440
493
441
494
#[async_trait]
442
495
impl EventHandler for LoggingHandler {
443
-
async fn handle_event(&self, _event: JetstreamEvent) -> Result<()> {
496
+
async fn handle_event(&self, _event: Arc<JetstreamEvent>) -> Result<()> {
444
497
Ok(())
445
498
}
446
499
447
-
fn handler_id(&self) -> String {
448
-
self.id.clone()
500
+
fn handler_id(&self) -> &str {
501
+
&self.id
449
502
}
450
503
}
451
504