+1
api/Cargo.lock
+1
api/Cargo.lock
+1
api/Cargo.toml
+1
api/Cargo.toml
+67
-2
api/src/graphql/handler.rs
+67
-2
api/src/graphql/handler.rs
···
1
1
//! GraphQL HTTP handler for Axum
2
2
3
3
use async_graphql::dynamic::Schema;
4
+
use async_graphql::http::{WebSocket as GraphQLWebSocket, WebSocketProtocols, WsMessage};
4
5
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
5
6
use axum::{
6
-
extract::{Query, State},
7
+
extract::{ws::{WebSocket, Message}, Query, State, WebSocketUpgrade},
7
8
http::{HeaderMap, StatusCode},
8
-
response::Html,
9
+
response::{Html, Response},
9
10
};
11
+
use futures_util::{StreamExt, SinkExt};
10
12
use serde::Deserialize;
11
13
use std::sync::Arc;
12
14
use tokio::sync::RwLock;
···
167
169
);
168
170
169
171
Ok(Html(graphiql_html))
172
+
}
173
+
174
+
/// GraphQL WebSocket handler for subscriptions
175
+
/// Accepts slice URI from query parameter (?slice=...)
176
+
pub async fn graphql_subscription_handler(
177
+
State(state): State<AppState>,
178
+
Query(params): Query<GraphQLParams>,
179
+
ws: WebSocketUpgrade,
180
+
) -> Result<Response, (StatusCode, String)> {
181
+
let slice_uri = params.slice.ok_or_else(|| {
182
+
(
183
+
StatusCode::BAD_REQUEST,
184
+
"Missing slice parameter. Provide ?slice=... query parameter".to_string(),
185
+
)
186
+
})?;
187
+
188
+
let schema = match get_or_build_schema(&state, &slice_uri).await {
189
+
Ok(s) => s,
190
+
Err(e) => {
191
+
tracing::error!("Failed to get GraphQL schema: {:?}", e);
192
+
return Err((
193
+
StatusCode::INTERNAL_SERVER_ERROR,
194
+
format!("Schema error: {:?}", e),
195
+
));
196
+
}
197
+
};
198
+
199
+
// Upgrade to WebSocket and handle GraphQL subscriptions manually
200
+
Ok(ws
201
+
.protocols(["graphql-transport-ws", "graphql-ws"])
202
+
.on_upgrade(move |socket| handle_graphql_ws(socket, schema)))
203
+
}
204
+
205
+
/// Handle GraphQL WebSocket connection
206
+
async fn handle_graphql_ws(socket: WebSocket, schema: Schema) {
207
+
let (ws_sender, ws_receiver) = socket.split();
208
+
209
+
// Convert axum WebSocket messages to strings for async-graphql
210
+
let input = ws_receiver.filter_map(|msg| {
211
+
futures_util::future::ready(match msg {
212
+
Ok(Message::Text(text)) => Some(text.to_string()),
213
+
_ => None, // Ignore other message types
214
+
})
215
+
});
216
+
217
+
// Create GraphQL WebSocket handler
218
+
let mut stream = GraphQLWebSocket::new(schema, input, WebSocketProtocols::GraphQLWS);
219
+
220
+
// Send GraphQL messages back through WebSocket
221
+
let mut ws_sender = ws_sender;
222
+
while let Some(msg) = stream.next().await {
223
+
let axum_msg = match msg {
224
+
WsMessage::Text(text) => Message::Text(text.into()),
225
+
WsMessage::Close(code, reason) => Message::Close(Some(axum::extract::ws::CloseFrame {
226
+
code,
227
+
reason: reason.into(),
228
+
})),
229
+
};
230
+
231
+
if ws_sender.send(axum_msg).await.is_err() {
232
+
break;
233
+
}
234
+
}
170
235
}
171
236
172
237
/// Gets schema from cache or builds it if not cached
+3
-1
api/src/graphql/mod.rs
+3
-1
api/src/graphql/mod.rs
···
7
7
mod dataloaders;
8
8
mod types;
9
9
pub mod handler;
10
+
pub mod pubsub;
10
11
11
12
pub use schema_builder::build_graphql_schema;
12
-
pub use handler::{graphql_handler, graphql_playground};
13
+
pub use handler::{graphql_handler, graphql_playground, graphql_subscription_handler};
14
+
pub use pubsub::{RecordUpdateEvent, RecordOperation, PUBSUB};
+246
api/src/graphql/pubsub.rs
+246
api/src/graphql/pubsub.rs
···
1
+
//! PubSub infrastructure for broadcasting GraphQL subscription events
2
+
//!
3
+
//! This module provides a publish-subscribe mechanism for broadcasting record
4
+
//! updates from the Jetstream consumer to active GraphQL subscriptions.
5
+
6
+
use serde::{Deserialize, Serialize};
7
+
use std::collections::HashMap;
8
+
use std::sync::Arc;
9
+
use tokio::sync::{broadcast, RwLock};
10
+
use tracing::{debug, info};
11
+
12
+
/// Event broadcast when a record is created or updated
13
+
#[derive(Clone, Debug, Serialize, Deserialize)]
14
+
pub struct RecordUpdateEvent {
15
+
pub uri: String,
16
+
pub cid: String,
17
+
pub did: String,
18
+
pub collection: String,
19
+
pub value: serde_json::Value,
20
+
pub slice_uri: String,
21
+
pub indexed_at: String,
22
+
pub operation: RecordOperation,
23
+
}
24
+
25
+
/// Type of record operation
26
+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
27
+
pub enum RecordOperation {
28
+
Create,
29
+
Update,
30
+
Delete,
31
+
}
32
+
33
+
34
+
/// PubSub manager for broadcasting events to subscribers
35
+
///
36
+
/// Each slice has its own broadcast channel to avoid cross-slice event leaking.
37
+
/// Channels are created lazily when the first subscriber for a slice connects.
38
+
pub struct GraphQLPubSub {
39
+
/// Map of slice_uri -> broadcast sender
40
+
/// Using broadcast channel allows multiple subscribers per slice
41
+
channels: Arc<RwLock<HashMap<String, broadcast::Sender<RecordUpdateEvent>>>>,
42
+
/// Channel capacity (number of events to buffer)
43
+
capacity: usize,
44
+
}
45
+
46
+
impl GraphQLPubSub {
47
+
/// Create a new PubSub manager
48
+
///
49
+
/// # Arguments
50
+
/// * `capacity` - Number of events to buffer per slice (default: 1000)
51
+
pub fn new(capacity: usize) -> Self {
52
+
info!("Initializing GraphQL PubSub with capacity {}", capacity);
53
+
Self {
54
+
channels: Arc::new(RwLock::new(HashMap::new())),
55
+
capacity,
56
+
}
57
+
}
58
+
59
+
/// Publish an event to all subscribers of a slice
60
+
///
61
+
/// If no subscribers exist, the event is dropped silently.
62
+
pub async fn publish(&self, event: RecordUpdateEvent) {
63
+
let slice_uri = event.slice_uri.clone();
64
+
65
+
let channels = self.channels.read().await;
66
+
if let Some(sender) = channels.get(&slice_uri) {
67
+
// Try to send, ignore if no active receivers
68
+
match sender.send(event.clone()) {
69
+
Ok(receiver_count) => {
70
+
debug!(
71
+
"Published {} event for {} to {} subscriber(s)",
72
+
match event.operation {
73
+
RecordOperation::Create => "CREATE",
74
+
RecordOperation::Update => "UPDATE",
75
+
RecordOperation::Delete => "DELETE",
76
+
},
77
+
event.collection,
78
+
receiver_count
79
+
);
80
+
}
81
+
Err(_) => {
82
+
// No receivers, which is fine
83
+
debug!("No active subscribers for slice {}", slice_uri);
84
+
}
85
+
}
86
+
}
87
+
}
88
+
89
+
/// Subscribe to events for a specific slice
90
+
///
91
+
/// Returns a receiver that will receive all future events for the slice.
92
+
/// Creates a new broadcast channel if one doesn't exist yet.
93
+
pub async fn subscribe(&self, slice_uri: &str) -> broadcast::Receiver<RecordUpdateEvent> {
94
+
let mut channels = self.channels.write().await;
95
+
96
+
let sender = channels.entry(slice_uri.to_string()).or_insert_with(|| {
97
+
info!("Creating new broadcast channel for slice: {}", slice_uri);
98
+
let (tx, _) = broadcast::channel(self.capacity);
99
+
tx
100
+
});
101
+
102
+
sender.subscribe()
103
+
}
104
+
105
+
/// Get statistics about active channels and subscribers
106
+
pub async fn stats(&self) -> PubSubStats {
107
+
let channels = self.channels.read().await;
108
+
PubSubStats {
109
+
active_channels: channels.len(),
110
+
total_subscribers: channels.values().map(|s| s.receiver_count()).sum(),
111
+
}
112
+
}
113
+
114
+
/// Clean up channels with no subscribers
115
+
///
116
+
/// Should be called periodically to prevent memory leaks
117
+
pub async fn cleanup_empty_channels(&self) {
118
+
let mut channels = self.channels.write().await;
119
+
let before_count = channels.len();
120
+
121
+
channels.retain(|slice_uri, sender| {
122
+
let has_subscribers = sender.receiver_count() > 0;
123
+
if !has_subscribers {
124
+
debug!("Removing empty broadcast channel for slice: {}", slice_uri);
125
+
}
126
+
has_subscribers
127
+
});
128
+
129
+
let removed = before_count - channels.len();
130
+
if removed > 0 {
131
+
info!("Cleaned up {} empty broadcast channel(s)", removed);
132
+
}
133
+
}
134
+
}
135
+
136
+
/// Statistics about the PubSub system
137
+
#[derive(Debug, Clone)]
138
+
pub struct PubSubStats {
139
+
pub active_channels: usize,
140
+
pub total_subscribers: usize,
141
+
}
142
+
143
+
impl Default for GraphQLPubSub {
144
+
fn default() -> Self {
145
+
Self::new(1000)
146
+
}
147
+
}
148
+
149
+
// Global PubSub instance
150
+
// This is initialized once at application startup and shared across
151
+
// the Jetstream consumer and GraphQL subscription handlers.
152
+
lazy_static::lazy_static! {
153
+
pub static ref PUBSUB: GraphQLPubSub = GraphQLPubSub::default();
154
+
}
155
+
156
+
/// Start periodic cleanup task for empty channels
157
+
pub fn start_cleanup_task() {
158
+
tokio::spawn(async {
159
+
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes
160
+
loop {
161
+
interval.tick().await;
162
+
PUBSUB.cleanup_empty_channels().await;
163
+
164
+
let stats = PUBSUB.stats().await;
165
+
info!(
166
+
"PubSub stats: {} active channels, {} total subscribers",
167
+
stats.active_channels, stats.total_subscribers
168
+
);
169
+
}
170
+
});
171
+
}
172
+
173
+
#[cfg(test)]
174
+
mod tests {
175
+
use super::*;
176
+
177
+
#[tokio::test]
178
+
async fn test_pubsub_broadcast() {
179
+
let pubsub = GraphQLPubSub::new(100);
180
+
181
+
// Subscribe to events
182
+
let mut rx = pubsub.subscribe("test://slice").await;
183
+
184
+
// Publish an event
185
+
let event = RecordUpdateEvent {
186
+
uri: "at://did:plc:test/app.test/123".to_string(),
187
+
cid: "bafytest".to_string(),
188
+
did: "did:plc:test".to_string(),
189
+
collection: "app.test".to_string(),
190
+
value: serde_json::json!({"text": "Hello"}),
191
+
slice_uri: "test://slice".to_string(),
192
+
indexed_at: "2024-01-01T00:00:00Z".to_string(),
193
+
operation: RecordOperation::Create,
194
+
};
195
+
196
+
pubsub.publish(event.clone()).await;
197
+
198
+
// Receive the event
199
+
let received = rx.recv().await.unwrap();
200
+
assert_eq!(received.uri, event.uri);
201
+
assert_eq!(received.collection, event.collection);
202
+
}
203
+
204
+
#[tokio::test]
205
+
async fn test_multiple_subscribers() {
206
+
let pubsub = GraphQLPubSub::new(100);
207
+
208
+
let mut rx1 = pubsub.subscribe("test://slice").await;
209
+
let mut rx2 = pubsub.subscribe("test://slice").await;
210
+
211
+
let event = RecordUpdateEvent {
212
+
uri: "at://did:plc:test/app.test/123".to_string(),
213
+
cid: "bafytest".to_string(),
214
+
did: "did:plc:test".to_string(),
215
+
collection: "app.test".to_string(),
216
+
value: serde_json::json!({"text": "Hello"}),
217
+
slice_uri: "test://slice".to_string(),
218
+
indexed_at: "2024-01-01T00:00:00Z".to_string(),
219
+
operation: RecordOperation::Create,
220
+
};
221
+
222
+
pubsub.publish(event.clone()).await;
223
+
224
+
// Both subscribers should receive the event
225
+
let received1 = rx1.recv().await.unwrap();
226
+
let received2 = rx2.recv().await.unwrap();
227
+
228
+
assert_eq!(received1.uri, event.uri);
229
+
assert_eq!(received2.uri, event.uri);
230
+
}
231
+
232
+
#[tokio::test]
233
+
async fn test_cleanup_empty_channels() {
234
+
let pubsub = GraphQLPubSub::new(100);
235
+
236
+
// Create a subscriber and drop it
237
+
{
238
+
let _rx = pubsub.subscribe("test://slice").await;
239
+
assert_eq!(pubsub.stats().await.active_channels, 1);
240
+
}
241
+
242
+
// Cleanup should remove the empty channel
243
+
pubsub.cleanup_empty_channels().await;
244
+
assert_eq!(pubsub.stats().await.active_channels, 0);
245
+
}
246
+
}
+272
-3
api/src/graphql/schema_builder.rs
+272
-3
api/src/graphql/schema_builder.rs
···
3
3
//! This module generates GraphQL schemas at runtime based on lexicon definitions
4
4
//! stored in the database, enabling flexible querying of slice records.
5
5
6
-
use async_graphql::dynamic::{Field, FieldFuture, FieldValue, Object, Schema, Scalar, TypeRef, InputObject, InputValue, Enum, EnumItem};
6
+
use async_graphql::dynamic::{Field, FieldFuture, FieldValue, Object, Schema, Scalar, TypeRef, InputObject, InputValue, Enum, EnumItem, Subscription, SubscriptionField, SubscriptionFieldFuture};
7
7
use async_graphql::{Error, Value as GraphQLValue};
8
8
use base64::engine::general_purpose;
9
9
use base64::Engine;
···
14
14
15
15
use crate::database::Database;
16
16
use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
17
+
use crate::graphql::PUBSUB;
17
18
18
19
/// Metadata about a collection for cross-referencing
19
20
#[derive(Clone)]
···
567
568
// Build Mutation type
568
569
let mutation = create_mutation_type(database.clone(), slice_uri.clone());
569
570
571
+
// Build Subscription type with collection-specific subscriptions
572
+
let subscription = create_subscription_type(slice_uri.clone(), &lexicons);
573
+
570
574
// Build and return the schema
571
-
let mut schema_builder = Schema::build(query.type_name(), Some(mutation.type_name()), None)
575
+
let mut schema_builder = Schema::build(query.type_name(), Some(mutation.type_name()), Some(subscription.type_name()))
572
576
.register(query)
573
-
.register(mutation);
577
+
.register(mutation)
578
+
.register(subscription);
574
579
575
580
// Register JSON scalar type for complex fields
576
581
let json_scalar = Scalar::new("JSON");
···
606
611
// Register PageInfo type
607
612
let page_info_type = create_page_info_type();
608
613
schema_builder = schema_builder.register(page_info_type);
614
+
615
+
// Register RecordUpdate type for subscriptions
616
+
let record_update_type = create_record_update_type();
617
+
schema_builder = schema_builder.register(record_update_type);
609
618
610
619
// Register all object types
611
620
for obj in objects_to_register {
···
1736
1745
}
1737
1746
}
1738
1747
}
1748
+
1749
+
/// Creates the RecordUpdate type for subscription events
1750
+
fn create_record_update_type() -> Object {
1751
+
let mut record_update = Object::new("RecordUpdate");
1752
+
1753
+
record_update = record_update.field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1754
+
FieldFuture::new(async move {
1755
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1756
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1757
+
if let GraphQLValue::Object(obj) = value {
1758
+
if let Some(uri) = obj.get("uri") {
1759
+
return Ok(Some(uri.clone()));
1760
+
}
1761
+
}
1762
+
Ok(None)
1763
+
})
1764
+
}));
1765
+
1766
+
record_update = record_update.field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1767
+
FieldFuture::new(async move {
1768
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1769
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1770
+
if let GraphQLValue::Object(obj) = value {
1771
+
if let Some(cid) = obj.get("cid") {
1772
+
return Ok(Some(cid.clone()));
1773
+
}
1774
+
}
1775
+
Ok(None)
1776
+
})
1777
+
}));
1778
+
1779
+
record_update = record_update.field(Field::new("did", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1780
+
FieldFuture::new(async move {
1781
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1782
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1783
+
if let GraphQLValue::Object(obj) = value {
1784
+
if let Some(did) = obj.get("did") {
1785
+
return Ok(Some(did.clone()));
1786
+
}
1787
+
}
1788
+
Ok(None)
1789
+
})
1790
+
}));
1791
+
1792
+
record_update = record_update.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1793
+
FieldFuture::new(async move {
1794
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1795
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1796
+
if let GraphQLValue::Object(obj) = value {
1797
+
if let Some(collection) = obj.get("collection") {
1798
+
return Ok(Some(collection.clone()));
1799
+
}
1800
+
}
1801
+
Ok(None)
1802
+
})
1803
+
}));
1804
+
1805
+
record_update = record_update.field(Field::new("indexedAt", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1806
+
FieldFuture::new(async move {
1807
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1808
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1809
+
if let GraphQLValue::Object(obj) = value {
1810
+
if let Some(indexed_at) = obj.get("indexedAt") {
1811
+
return Ok(Some(indexed_at.clone()));
1812
+
}
1813
+
}
1814
+
Ok(None)
1815
+
})
1816
+
}));
1817
+
1818
+
record_update = record_update.field(Field::new("operation", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1819
+
FieldFuture::new(async move {
1820
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1821
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1822
+
if let GraphQLValue::Object(obj) = value {
1823
+
if let Some(operation) = obj.get("operation") {
1824
+
return Ok(Some(operation.clone()));
1825
+
}
1826
+
}
1827
+
Ok(None)
1828
+
})
1829
+
}));
1830
+
1831
+
record_update = record_update.field(Field::new("value", TypeRef::named_nn("JSON"), |ctx| {
1832
+
FieldFuture::new(async move {
1833
+
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1834
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
1835
+
if let GraphQLValue::Object(obj) = value {
1836
+
if let Some(val) = obj.get("value") {
1837
+
return Ok(Some(val.clone()));
1838
+
}
1839
+
}
1840
+
Ok(None)
1841
+
})
1842
+
}));
1843
+
1844
+
record_update
1845
+
}
1846
+
1847
+
/// Creates the Subscription root type with collection-specific subscriptions
1848
+
fn create_subscription_type(slice_uri: String, lexicons: &[serde_json::Value]) -> Subscription {
1849
+
let mut subscription = Subscription::new("Subscription");
1850
+
1851
+
// For each record collection, create {collection}Created, {collection}Updated, {collection}Deleted subscriptions
1852
+
for lexicon in lexicons {
1853
+
let nsid = match lexicon.get("id").and_then(|n| n.as_str()) {
1854
+
Some(n) => n,
1855
+
None => continue,
1856
+
};
1857
+
1858
+
let defs = match lexicon.get("defs") {
1859
+
Some(d) => d,
1860
+
None => continue,
1861
+
};
1862
+
1863
+
// Only process record types (skip queries, procedures, etc.)
1864
+
let is_record = defs
1865
+
.get("main")
1866
+
.and_then(|m| m.get("type"))
1867
+
.and_then(|t| t.as_str())
1868
+
== Some("record");
1869
+
1870
+
if !is_record {
1871
+
continue;
1872
+
}
1873
+
1874
+
let fields = extract_collection_fields(defs);
1875
+
if fields.is_empty() {
1876
+
continue;
1877
+
}
1878
+
1879
+
let type_name = nsid_to_type_name(nsid);
1880
+
let field_base_name = nsid_to_join_field_name(nsid);
1881
+
1882
+
// {collection}Created subscription
1883
+
let created_field_name = format!("{}Created", field_base_name);
1884
+
let slice_for_created = slice_uri.clone();
1885
+
let nsid_for_created = nsid.to_string();
1886
+
let type_name_for_created = type_name.clone();
1887
+
1888
+
subscription = subscription.field(SubscriptionField::new(
1889
+
&created_field_name,
1890
+
TypeRef::named_nn(&type_name_for_created),
1891
+
move |_ctx| {
1892
+
let slice_uri = slice_for_created.clone();
1893
+
let collection = nsid_for_created.clone();
1894
+
1895
+
SubscriptionFieldFuture::new(async move {
1896
+
let mut receiver = PUBSUB.subscribe(&slice_uri).await;
1897
+
1898
+
let stream = async_stream::stream! {
1899
+
while let Ok(event) = receiver.recv().await {
1900
+
// Filter by collection and operation
1901
+
if event.collection != collection || event.operation != crate::graphql::RecordOperation::Create {
1902
+
continue;
1903
+
}
1904
+
1905
+
// Convert to RecordContainer and yield
1906
+
let indexed_record = crate::models::IndexedRecord {
1907
+
uri: event.uri,
1908
+
cid: event.cid,
1909
+
did: event.did,
1910
+
collection: event.collection,
1911
+
value: event.value,
1912
+
indexed_at: event.indexed_at,
1913
+
};
1914
+
let container = RecordContainer {
1915
+
record: indexed_record,
1916
+
};
1917
+
yield Ok(FieldValue::owned_any(container));
1918
+
}
1919
+
};
1920
+
1921
+
Ok(stream)
1922
+
})
1923
+
},
1924
+
)
1925
+
.description(format!("Subscribe to {} record creation events", nsid)));
1926
+
1927
+
// {collection}Updated subscription
1928
+
let updated_field_name = format!("{}Updated", field_base_name);
1929
+
let slice_for_updated = slice_uri.clone();
1930
+
let nsid_for_updated = nsid.to_string();
1931
+
let type_name_for_updated = type_name.clone();
1932
+
1933
+
subscription = subscription.field(SubscriptionField::new(
1934
+
&updated_field_name,
1935
+
TypeRef::named_nn(&type_name_for_updated),
1936
+
move |_ctx| {
1937
+
let slice_uri = slice_for_updated.clone();
1938
+
let collection = nsid_for_updated.clone();
1939
+
1940
+
SubscriptionFieldFuture::new(async move {
1941
+
let mut receiver = PUBSUB.subscribe(&slice_uri).await;
1942
+
1943
+
let stream = async_stream::stream! {
1944
+
while let Ok(event) = receiver.recv().await {
1945
+
// Filter by collection and operation
1946
+
if event.collection != collection || event.operation != crate::graphql::RecordOperation::Update {
1947
+
continue;
1948
+
}
1949
+
1950
+
// Convert to RecordContainer and yield
1951
+
let indexed_record = crate::models::IndexedRecord {
1952
+
uri: event.uri,
1953
+
cid: event.cid,
1954
+
did: event.did,
1955
+
collection: event.collection,
1956
+
value: event.value,
1957
+
indexed_at: event.indexed_at,
1958
+
};
1959
+
let container = RecordContainer {
1960
+
record: indexed_record,
1961
+
};
1962
+
yield Ok(FieldValue::owned_any(container));
1963
+
}
1964
+
};
1965
+
1966
+
Ok(stream)
1967
+
})
1968
+
},
1969
+
)
1970
+
.description(format!("Subscribe to {} record update events", nsid)));
1971
+
1972
+
// {collection}Deleted subscription - returns just the URI string
1973
+
let deleted_field_name = format!("{}Deleted", field_base_name);
1974
+
let slice_for_deleted = slice_uri.clone();
1975
+
let nsid_for_deleted = nsid.to_string();
1976
+
1977
+
subscription = subscription.field(SubscriptionField::new(
1978
+
&deleted_field_name,
1979
+
TypeRef::named_nn(TypeRef::STRING),
1980
+
move |_ctx| {
1981
+
let slice_uri = slice_for_deleted.clone();
1982
+
let collection = nsid_for_deleted.clone();
1983
+
1984
+
SubscriptionFieldFuture::new(async move {
1985
+
let mut receiver = PUBSUB.subscribe(&slice_uri).await;
1986
+
1987
+
let stream = async_stream::stream! {
1988
+
while let Ok(event) = receiver.recv().await {
1989
+
// Filter by collection and operation
1990
+
if event.collection != collection || event.operation != crate::graphql::RecordOperation::Delete {
1991
+
continue;
1992
+
}
1993
+
1994
+
// For deletes, just return the URI
1995
+
yield Ok(FieldValue::value(GraphQLValue::String(event.uri)));
1996
+
}
1997
+
};
1998
+
1999
+
Ok(stream)
2000
+
})
2001
+
},
2002
+
)
2003
+
.description(format!("Subscribe to {} record deletion events. Returns the URI of deleted records.", nsid)));
2004
+
}
2005
+
2006
+
subscription
2007
+
}
+48
api/src/jetstream.rs
+48
api/src/jetstream.rs
···
14
14
use crate::cache::{CacheBackend, CacheFactory, SliceCache};
15
15
use crate::database::Database;
16
16
use crate::errors::JetstreamError;
17
+
use crate::graphql::{RecordOperation, RecordUpdateEvent, PUBSUB};
17
18
use crate::jetstream_cursor::PostgresCursorHandler;
18
19
use crate::logging::{LogLevel, Logger};
19
20
use crate::models::{Actor, Record};
···
480
481
})),
481
482
Some(&slice_uri),
482
483
);
484
+
485
+
// Broadcast to GraphQL subscribers
486
+
let event = RecordUpdateEvent {
487
+
uri: uri.clone(),
488
+
cid: commit.cid.clone(),
489
+
did: did.to_string(),
490
+
collection: commit.collection.clone(),
491
+
value: commit.record.clone(),
492
+
slice_uri: slice_uri.clone(),
493
+
indexed_at: record.indexed_at.to_rfc3339(),
494
+
operation: if is_insert {
495
+
RecordOperation::Create
496
+
} else {
497
+
RecordOperation::Update
498
+
},
499
+
};
500
+
PUBSUB.publish(event).await;
483
501
}
484
502
Err(e) => {
485
503
let message = "Failed to insert/update record";
···
556
574
})),
557
575
Some(&slice_uri),
558
576
);
577
+
578
+
// Broadcast to GraphQL subscribers
579
+
let event = RecordUpdateEvent {
580
+
uri: uri.clone(),
581
+
cid: commit.cid.clone(),
582
+
did: did.to_string(),
583
+
collection: commit.collection.clone(),
584
+
value: commit.record.clone(),
585
+
slice_uri: slice_uri.clone(),
586
+
indexed_at: record.indexed_at.to_rfc3339(),
587
+
operation: if is_insert {
588
+
RecordOperation::Create
589
+
} else {
590
+
RecordOperation::Update
591
+
},
592
+
};
593
+
PUBSUB.publish(event).await;
559
594
}
560
595
Err(e) => {
561
596
let message = "Failed to insert/update record";
···
689
724
})),
690
725
Some(slice_uri),
691
726
);
727
+
728
+
// Broadcast delete event to GraphQL subscribers
729
+
let event = RecordUpdateEvent {
730
+
uri: uri.clone(),
731
+
cid: String::new(), // No CID for delete events
732
+
did: did.to_string(),
733
+
collection: commit.collection.clone(),
734
+
value: serde_json::json!({}), // Empty value for deletes
735
+
slice_uri: slice_uri.clone(),
736
+
indexed_at: Utc::now().to_rfc3339(),
737
+
operation: RecordOperation::Delete,
738
+
};
739
+
PUBSUB.publish(event).await;
692
740
}
693
741
694
742
// Check if actor should be cleaned up (no more records)
+8
-1
api/src/main.rs
+8
-1
api/src/main.rs
···
106
106
// Start log cleanup background task
107
107
start_log_cleanup_task(pool.clone());
108
108
109
+
// Start GraphQL PubSub cleanup task
110
+
graphql::pubsub::start_cleanup_task();
111
+
109
112
// Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
110
113
let process_type = env::var("PROCESS_TYPE")
111
114
.or_else(|_| env::var("FLY_PROCESS_GROUP"))
···
390
393
"/xrpc/network.slices.slice.getSyncSummary",
391
394
get(xrpc::network::slices::slice::get_sync_summary::handler),
392
395
)
393
-
// GraphQL endpoint
396
+
// GraphQL endpoints
394
397
.route(
395
398
"/graphql",
396
399
get(graphql::graphql_playground).post(graphql::graphql_handler),
400
+
)
401
+
.route(
402
+
"/graphql/ws",
403
+
get(graphql::graphql_subscription_handler),
397
404
)
398
405
// Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
399
406
.route(