Highly ambitious ATProtocol AppView service and sdks
at main 8.2 kB view raw
1//! PubSub infrastructure for broadcasting GraphQL subscription events 2//! 3//! This module provides a publish-subscribe mechanism for broadcasting record 4//! updates from the Jetstream consumer to active GraphQL subscriptions. 5 6use serde::{Deserialize, Serialize}; 7use std::collections::HashMap; 8use std::sync::Arc; 9use tokio::sync::{RwLock, broadcast}; 10use tracing::{debug, info}; 11 12/// Event broadcast when a record is created or updated 13#[derive(Clone, Debug, Serialize, Deserialize)] 14pub struct RecordUpdateEvent { 15 pub uri: String, 16 pub cid: String, 17 pub did: String, 18 pub collection: String, 19 pub value: serde_json::Value, 20 pub slice_uri: String, 21 pub indexed_at: String, 22 pub operation: RecordOperation, 23} 24 25/// Type of record operation 26#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] 27pub enum RecordOperation { 28 Create, 29 Update, 30 Delete, 31} 32 33/// PubSub manager for broadcasting events to subscribers 34/// 35/// Each slice has its own broadcast channel to avoid cross-slice event leaking. 36/// Channels are created lazily when the first subscriber for a slice connects. 37pub struct GraphQLPubSub { 38 /// Map of slice_uri -> broadcast sender 39 /// Using broadcast channel allows multiple subscribers per slice 40 channels: Arc<RwLock<HashMap<String, broadcast::Sender<RecordUpdateEvent>>>>, 41 /// Channel capacity (number of events to buffer) 42 capacity: usize, 43} 44 45impl GraphQLPubSub { 46 /// Create a new PubSub manager 47 /// 48 /// # Arguments 49 /// * `capacity` - Number of events to buffer per slice (default: 1000) 50 pub fn new(capacity: usize) -> Self { 51 info!("Initializing GraphQL PubSub with capacity {}", capacity); 52 Self { 53 channels: Arc::new(RwLock::new(HashMap::new())), 54 capacity, 55 } 56 } 57 58 /// Publish an event to all subscribers of a slice 59 /// 60 /// If no subscribers exist, the event is dropped silently. 61 pub async fn publish(&self, event: RecordUpdateEvent) { 62 let slice_uri = event.slice_uri.clone(); 63 64 let channels = self.channels.read().await; 65 if let Some(sender) = channels.get(&slice_uri) { 66 // Try to send, ignore if no active receivers 67 match sender.send(event.clone()) { 68 Ok(receiver_count) => { 69 debug!( 70 "Published {} event for {} to {} subscriber(s)", 71 match event.operation { 72 RecordOperation::Create => "CREATE", 73 RecordOperation::Update => "UPDATE", 74 RecordOperation::Delete => "DELETE", 75 }, 76 event.collection, 77 receiver_count 78 ); 79 } 80 Err(_) => { 81 // No receivers, which is fine 82 debug!("No active subscribers for slice {}", slice_uri); 83 } 84 } 85 } 86 } 87 88 /// Subscribe to events for a specific slice 89 /// 90 /// Returns a receiver that will receive all future events for the slice. 91 /// Creates a new broadcast channel if one doesn't exist yet. 92 pub async fn subscribe(&self, slice_uri: &str) -> broadcast::Receiver<RecordUpdateEvent> { 93 let mut channels = self.channels.write().await; 94 95 let sender = channels.entry(slice_uri.to_string()).or_insert_with(|| { 96 info!("Creating new broadcast channel for slice: {}", slice_uri); 97 let (tx, _) = broadcast::channel(self.capacity); 98 tx 99 }); 100 101 sender.subscribe() 102 } 103 104 /// Get statistics about active channels and subscribers 105 pub async fn stats(&self) -> PubSubStats { 106 let channels = self.channels.read().await; 107 PubSubStats { 108 active_channels: channels.len(), 109 total_subscribers: channels.values().map(|s| s.receiver_count()).sum(), 110 } 111 } 112 113 /// Clean up channels with no subscribers 114 /// 115 /// Should be called periodically to prevent memory leaks 116 pub async fn cleanup_empty_channels(&self) { 117 let mut channels = self.channels.write().await; 118 let before_count = channels.len(); 119 120 channels.retain(|slice_uri, sender| { 121 let has_subscribers = sender.receiver_count() > 0; 122 if !has_subscribers { 123 debug!("Removing empty broadcast channel for slice: {}", slice_uri); 124 } 125 has_subscribers 126 }); 127 128 let removed = before_count - channels.len(); 129 if removed > 0 { 130 info!("Cleaned up {} empty broadcast channel(s)", removed); 131 } 132 } 133} 134 135/// Statistics about the PubSub system 136#[derive(Debug, Clone)] 137pub struct PubSubStats { 138 pub active_channels: usize, 139 pub total_subscribers: usize, 140} 141 142impl Default for GraphQLPubSub { 143 fn default() -> Self { 144 Self::new(1000) 145 } 146} 147 148// Global PubSub instance 149// This is initialized once at application startup and shared across 150// the Jetstream consumer and GraphQL subscription handlers. 151lazy_static::lazy_static! { 152 pub static ref PUBSUB: GraphQLPubSub = GraphQLPubSub::default(); 153} 154 155/// Start periodic cleanup task for empty channels 156pub fn start_cleanup_task() { 157 tokio::spawn(async { 158 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes 159 loop { 160 interval.tick().await; 161 PUBSUB.cleanup_empty_channels().await; 162 163 let stats = PUBSUB.stats().await; 164 info!( 165 "PubSub stats: {} active channels, {} total subscribers", 166 stats.active_channels, stats.total_subscribers 167 ); 168 } 169 }); 170} 171 172#[cfg(test)] 173mod tests { 174 use super::*; 175 176 #[tokio::test] 177 async fn test_pubsub_broadcast() { 178 let pubsub = GraphQLPubSub::new(100); 179 180 // Subscribe to events 181 let mut rx = pubsub.subscribe("test://slice").await; 182 183 // Publish an event 184 let event = RecordUpdateEvent { 185 uri: "at://did:plc:test/app.test/123".to_string(), 186 cid: "bafytest".to_string(), 187 did: "did:plc:test".to_string(), 188 collection: "app.test".to_string(), 189 value: serde_json::json!({"text": "Hello"}), 190 slice_uri: "test://slice".to_string(), 191 indexed_at: "2024-01-01T00:00:00Z".to_string(), 192 operation: RecordOperation::Create, 193 }; 194 195 pubsub.publish(event.clone()).await; 196 197 // Receive the event 198 let received = rx.recv().await.unwrap(); 199 assert_eq!(received.uri, event.uri); 200 assert_eq!(received.collection, event.collection); 201 } 202 203 #[tokio::test] 204 async fn test_multiple_subscribers() { 205 let pubsub = GraphQLPubSub::new(100); 206 207 let mut rx1 = pubsub.subscribe("test://slice").await; 208 let mut rx2 = pubsub.subscribe("test://slice").await; 209 210 let event = RecordUpdateEvent { 211 uri: "at://did:plc:test/app.test/123".to_string(), 212 cid: "bafytest".to_string(), 213 did: "did:plc:test".to_string(), 214 collection: "app.test".to_string(), 215 value: serde_json::json!({"text": "Hello"}), 216 slice_uri: "test://slice".to_string(), 217 indexed_at: "2024-01-01T00:00:00Z".to_string(), 218 operation: RecordOperation::Create, 219 }; 220 221 pubsub.publish(event.clone()).await; 222 223 // Both subscribers should receive the event 224 let received1 = rx1.recv().await.unwrap(); 225 let received2 = rx2.recv().await.unwrap(); 226 227 assert_eq!(received1.uri, event.uri); 228 assert_eq!(received2.uri, event.uri); 229 } 230 231 #[tokio::test] 232 async fn test_cleanup_empty_channels() { 233 let pubsub = GraphQLPubSub::new(100); 234 235 // Create a subscriber and drop it 236 { 237 let _rx = pubsub.subscribe("test://slice").await; 238 assert_eq!(pubsub.stats().await.active_channels, 1); 239 } 240 241 // Cleanup should remove the empty channel 242 pubsub.cleanup_empty_channels().await; 243 assert_eq!(pubsub.stats().await.active_channels, 0); 244 } 245}