Highly ambitious ATProtocol AppView service and SDKs
//! PubSub infrastructure for broadcasting GraphQL subscription events
//!
//! This module provides a publish-subscribe mechanism for broadcasting record
//! updates from the Jetstream consumer to active GraphQL subscriptions.
//!
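//! A minimal usage sketch (illustrative only; the URIs and field values below
//! are placeholders, not real data):
//!
//! ```ignore
//! // Jetstream consumer side: broadcast an indexed record to any listeners.
//! PUBSUB
//!     .publish(RecordUpdateEvent {
//!         uri: "at://did:plc:example/app.example.post/1".to_string(),
//!         cid: "bafyexample".to_string(),
//!         did: "did:plc:example".to_string(),
//!         collection: "app.example.post".to_string(),
//!         value: serde_json::json!({ "text": "hello" }),
//!         slice_uri: "example://slice".to_string(),
//!         indexed_at: "2024-01-01T00:00:00Z".to_string(),
//!         operation: RecordOperation::Create,
//!     })
//!     .await;
//!
//! // GraphQL subscription side: receive events for the same slice.
//! let mut rx = PUBSUB.subscribe("example://slice").await;
//! while let Ok(event) = rx.recv().await {
//!     // forward `event` to the client stream
//! }
//! ```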
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use tracing::{debug, info};

/// Event broadcast when a record is created or updated
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RecordUpdateEvent {
    pub uri: String,
    pub cid: String,
    pub did: String,
    pub collection: String,
    pub value: serde_json::Value,
    pub slice_uri: String,
    pub indexed_at: String,
    pub operation: RecordOperation,
}

/// Type of record operation
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum RecordOperation {
    Create,
    Update,
    Delete,
}

/// PubSub manager for broadcasting events to subscribers
///
/// Each slice has its own broadcast channel to avoid cross-slice event leaking.
/// Channels are created lazily when the first subscriber for a slice connects.
pub struct GraphQLPubSub {
    /// Map of slice_uri -> broadcast sender
    /// Using a broadcast channel allows multiple subscribers per slice
    channels: Arc<RwLock<HashMap<String, broadcast::Sender<RecordUpdateEvent>>>>,
    /// Channel capacity (number of events to buffer)
    capacity: usize,
}

impl GraphQLPubSub {
    /// Create a new PubSub manager
    ///
    /// # Arguments
    /// * `capacity` - Number of events to buffer per slice (default: 1000)
    pub fn new(capacity: usize) -> Self {
        info!("Initializing GraphQL PubSub with capacity {}", capacity);
        Self {
            channels: Arc::new(RwLock::new(HashMap::new())),
            capacity,
        }
    }

    /// Publish an event to all subscribers of a slice
    ///
    /// If no subscribers exist, the event is dropped silently.
    pub async fn publish(&self, event: RecordUpdateEvent) {
        let slice_uri = event.slice_uri.clone();

        let channels = self.channels.read().await;
        if let Some(sender) = channels.get(&slice_uri) {
            // Try to send, ignore if no active receivers
            match sender.send(event.clone()) {
                Ok(receiver_count) => {
                    debug!(
                        "Published {} event for {} to {} subscriber(s)",
                        match event.operation {
                            RecordOperation::Create => "CREATE",
                            RecordOperation::Update => "UPDATE",
                            RecordOperation::Delete => "DELETE",
                        },
                        event.collection,
                        receiver_count
                    );
                }
                Err(_) => {
                    // No receivers, which is fine
                    debug!("No active subscribers for slice {}", slice_uri);
                }
            }
        }
    }

    /// Subscribe to events for a specific slice
    ///
    /// Returns a receiver that will receive all future events for the slice.
    /// Creates a new broadcast channel if one doesn't exist yet.
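    ///
    /// The underlying channel is a `tokio::sync::broadcast` channel, so a
    /// subscriber that falls more than `capacity` events behind sees
    /// `RecvError::Lagged` instead of the missed events. A receive loop can
    /// handle that explicitly; an illustrative sketch (slice URI is a
    /// placeholder):
    ///
    /// ```ignore
    /// use tokio::sync::broadcast::error::RecvError;
    ///
    /// let mut rx = PUBSUB.subscribe("example://slice").await;
    /// loop {
    ///     match rx.recv().await {
    ///         Ok(event) => { /* forward `event` to the client */ }
    ///         Err(RecvError::Lagged(skipped)) => {
    ///             tracing::warn!("subscriber lagged, skipped {} event(s)", skipped);
    ///         }
    ///         Err(RecvError::Closed) => break,
    ///     }
    /// }
    /// ```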
    pub async fn subscribe(&self, slice_uri: &str) -> broadcast::Receiver<RecordUpdateEvent> {
        let mut channels = self.channels.write().await;

        let sender = channels.entry(slice_uri.to_string()).or_insert_with(|| {
            info!("Creating new broadcast channel for slice: {}", slice_uri);
            let (tx, _) = broadcast::channel(self.capacity);
            tx
        });

        sender.subscribe()
    }

    /// Get statistics about active channels and subscribers
    pub async fn stats(&self) -> PubSubStats {
        let channels = self.channels.read().await;
        PubSubStats {
            active_channels: channels.len(),
            total_subscribers: channels.values().map(|s| s.receiver_count()).sum(),
        }
    }

    /// Clean up channels with no subscribers
    ///
    /// Should be called periodically so the channel map does not grow without bound.
    pub async fn cleanup_empty_channels(&self) {
        let mut channels = self.channels.write().await;
        let before_count = channels.len();

        channels.retain(|slice_uri, sender| {
            let has_subscribers = sender.receiver_count() > 0;
            if !has_subscribers {
                debug!("Removing empty broadcast channel for slice: {}", slice_uri);
            }
            has_subscribers
        });

        let removed = before_count - channels.len();
        if removed > 0 {
            info!("Cleaned up {} empty broadcast channel(s)", removed);
        }
    }
}

/// Statistics about the PubSub system
#[derive(Debug, Clone)]
pub struct PubSubStats {
    pub active_channels: usize,
    pub total_subscribers: usize,
}

impl Default for GraphQLPubSub {
    fn default() -> Self {
        Self::new(1000)
    }
}

// Global PubSub instance.
// This is initialized lazily on first use and shared across the Jetstream
// consumer and the GraphQL subscription handlers.
lazy_static::lazy_static! {
    pub static ref PUBSUB: GraphQLPubSub = GraphQLPubSub::default();
}

/// Start periodic cleanup task for empty channels
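///
/// This spawns a background task with `tokio::spawn`, so it must be called
/// from within a running Tokio runtime. A minimal startup sketch (the rest of
/// `main` is hypothetical):
///
/// ```ignore
/// #[tokio::main]
/// async fn main() {
///     start_cleanup_task();
///     // ... start the Jetstream consumer and the GraphQL server ...
/// }
/// ```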
pub fn start_cleanup_task() {
    tokio::spawn(async {
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes
        loop {
            interval.tick().await;
            PUBSUB.cleanup_empty_channels().await;

            let stats = PUBSUB.stats().await;
            info!(
                "PubSub stats: {} active channels, {} total subscribers",
                stats.active_channels, stats.total_subscribers
            );
        }
    });
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_pubsub_broadcast() {
        let pubsub = GraphQLPubSub::new(100);

        // Subscribe to events
        let mut rx = pubsub.subscribe("test://slice").await;

        // Publish an event
        let event = RecordUpdateEvent {
            uri: "at://did:plc:test/app.test/123".to_string(),
            cid: "bafytest".to_string(),
            did: "did:plc:test".to_string(),
            collection: "app.test".to_string(),
            value: serde_json::json!({"text": "Hello"}),
            slice_uri: "test://slice".to_string(),
            indexed_at: "2024-01-01T00:00:00Z".to_string(),
            operation: RecordOperation::Create,
        };

        pubsub.publish(event.clone()).await;

        // Receive the event
        let received = rx.recv().await.unwrap();
        assert_eq!(received.uri, event.uri);
        assert_eq!(received.collection, event.collection);
    }

    #[tokio::test]
    async fn test_multiple_subscribers() {
        let pubsub = GraphQLPubSub::new(100);

        let mut rx1 = pubsub.subscribe("test://slice").await;
        let mut rx2 = pubsub.subscribe("test://slice").await;

        let event = RecordUpdateEvent {
            uri: "at://did:plc:test/app.test/123".to_string(),
            cid: "bafytest".to_string(),
            did: "did:plc:test".to_string(),
            collection: "app.test".to_string(),
            value: serde_json::json!({"text": "Hello"}),
            slice_uri: "test://slice".to_string(),
            indexed_at: "2024-01-01T00:00:00Z".to_string(),
            operation: RecordOperation::Create,
        };

        pubsub.publish(event.clone()).await;

        // Both subscribers should receive the event
        let received1 = rx1.recv().await.unwrap();
        let received2 = rx2.recv().await.unwrap();

        assert_eq!(received1.uri, event.uri);
        assert_eq!(received2.uri, event.uri);
    }

    #[tokio::test]
    async fn test_cleanup_empty_channels() {
        let pubsub = GraphQLPubSub::new(100);

        // Create a subscriber and drop it
        {
            let _rx = pubsub.subscribe("test://slice").await;
            assert_eq!(pubsub.stats().await.active_channels, 1);
        }

        // Cleanup should remove the empty channel
        pubsub.cleanup_empty_channels().await;
        assert_eq!(pubsub.stats().await.active_channels, 0);
    }
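
    // Illustrative addition: exercises the per-slice isolation described in the
    // `GraphQLPubSub` docs -- events published to one slice must not be
    // delivered to subscribers of a different slice. Slice URIs are test
    // placeholders.
    #[tokio::test]
    async fn test_slice_isolation() {
        let pubsub = GraphQLPubSub::new(100);

        let mut rx_a = pubsub.subscribe("test://slice-a").await;
        let mut rx_b = pubsub.subscribe("test://slice-b").await;

        let event = RecordUpdateEvent {
            uri: "at://did:plc:test/app.test/123".to_string(),
            cid: "bafytest".to_string(),
            did: "did:plc:test".to_string(),
            collection: "app.test".to_string(),
            value: serde_json::json!({"text": "Hello"}),
            slice_uri: "test://slice-b".to_string(),
            indexed_at: "2024-01-01T00:00:00Z".to_string(),
            operation: RecordOperation::Create,
        };

        pubsub.publish(event.clone()).await;

        // Only the slice-b subscriber sees the event.
        let received = rx_b.recv().await.unwrap();
        assert_eq!(received.slice_uri, event.slice_uri);
        assert!(rx_a.try_recv().is_err());
    }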
}