Highly ambitious ATProtocol AppView service and sdks
at main 11 kB view raw
1//! GraphQL HTTP handler for Axum 2 3use async_graphql::dynamic::Schema; 4use async_graphql::http::{WebSocket as GraphQLWebSocket, WebSocketProtocols, WsMessage}; 5use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; 6use axum::{ 7 extract::{ 8 Query, State, WebSocketUpgrade, 9 ws::{Message, WebSocket}, 10 }, 11 http::{HeaderMap, StatusCode}, 12 response::{Html, Response}, 13}; 14use futures_util::{SinkExt, StreamExt}; 15use serde::Deserialize; 16use std::sync::Arc; 17use tokio::sync::RwLock; 18 19use crate::AppState; 20use crate::errors::AppError; 21use crate::graphql::GraphQLContext; 22 23/// Global schema cache (one schema per slice) 24/// This prevents rebuilding the schema on every request 25type SchemaCache = Arc<RwLock<std::collections::HashMap<String, Schema>>>; 26 27lazy_static::lazy_static! { 28 static ref SCHEMA_CACHE: SchemaCache = Arc::new(RwLock::new(std::collections::HashMap::new())); 29} 30 31#[derive(Deserialize, Default)] 32pub struct GraphQLParams { 33 pub slice: Option<String>, 34} 35 36/// GraphQL query handler 37/// Accepts slice URI from either query parameter (?slice=...) or HTTP header (X-Slice-Uri) 38pub async fn graphql_handler( 39 State(state): State<AppState>, 40 Query(params): Query<GraphQLParams>, 41 headers: HeaderMap, 42 req: GraphQLRequest, 43) -> Result<GraphQLResponse, (StatusCode, String)> { 44 // Get slice URI from query param or header 45 let slice_uri = params 46 .slice 47 .or_else(|| { 48 headers 49 .get("x-slice-uri") 50 .and_then(|h| h.to_str().ok()) 51 .map(|s| s.to_string()) 52 }) 53 .ok_or_else(|| { 54 ( 55 StatusCode::BAD_REQUEST, 56 "Missing slice parameter. Provide either ?slice=... query parameter or X-Slice-Uri header".to_string(), 57 ) 58 })?; 59 60 let schema = match get_or_build_schema(&state, &slice_uri).await { 61 Ok(s) => s, 62 Err(e) => { 63 tracing::error!("Failed to get GraphQL schema: {:?}", e); 64 return Ok(async_graphql::Response::from_errors(vec![ 65 async_graphql::ServerError::new(format!("Schema error: {:?}", e), None), 66 ]) 67 .into()); 68 } 69 }; 70 71 // Extract optional bearer token for mutations 72 let auth_token = headers 73 .get("authorization") 74 .and_then(|h| h.to_str().ok()) 75 .and_then(|s| s.strip_prefix("Bearer ")) 76 .map(|s| s.to_string()); 77 78 // Create GraphQL context with DataLoader and auth 79 let gql_context = GraphQLContext::with_auth( 80 state.database.clone(), 81 auth_token.clone(), 82 state.config.auth_base_url.clone(), 83 Some(state.auth_cache.clone()), 84 ); 85 86 // Verify auth token and get user DID for mutations 87 let mut request = req.into_inner().data(gql_context).data(state.database_pool.clone()); 88 89 if let Some(token) = auth_token { 90 // Verify token and add user DID to context 91 match crate::auth::verify_oauth_token_cached( 92 &token, 93 &state.config.auth_base_url, 94 Some(state.auth_cache.clone()), 95 ) 96 .await 97 { 98 Ok(user_info) => { 99 request = request.data(user_info.sub); 100 } 101 Err(_) => { 102 // Invalid token - let the mutation handle the error 103 } 104 } 105 } 106 107 // Execute query with context 108 Ok(schema.execute(request).await.into()) 109} 110 111/// GraphiQL UI handler 112/// Configures GraphiQL with the slice URI in headers 113pub async fn graphql_playground( 114 Query(params): Query<GraphQLParams>, 115) -> Result<Html<String>, (StatusCode, String)> { 116 let slice_uri = params.slice.ok_or_else(|| { 117 ( 118 StatusCode::BAD_REQUEST, 119 "Missing slice parameter. Provide ?slice=... query parameter".to_string(), 120 ) 121 })?; 122 123 // Create GraphiQL with pre-configured headers using React 19 and modern ESM 124 let graphiql_html = format!( 125 r#"<!doctype html> 126<html lang="en"> 127 <head> 128 <meta charset="UTF-8" /> 129 <meta name="viewport" content="width=device-width, initial-scale=1.0" /> 130 <title>Slices GraphiQL</title> 131 <style> 132 body {{ 133 margin: 0; 134 }} 135 136 #graphiql {{ 137 height: 100dvh; 138 }} 139 140 .loading {{ 141 height: 100%; 142 display: flex; 143 align-items: center; 144 justify-content: center; 145 font-size: 4rem; 146 }} 147 </style> 148 <link rel="stylesheet" href="https://esm.sh/graphiql/dist/style.css" /> 149 <link 150 rel="stylesheet" 151 href="https://esm.sh/@graphiql/plugin-explorer/dist/style.css" 152 /> 153 <script type="importmap"> 154 {{ 155 "imports": {{ 156 "react": "https://esm.sh/react@19.1.0", 157 "react/": "https://esm.sh/react@19.1.0/", 158 159 "react-dom": "https://esm.sh/react-dom@19.1.0", 160 "react-dom/": "https://esm.sh/react-dom@19.1.0/", 161 162 "graphiql": "https://esm.sh/graphiql?standalone&external=react,react-dom,@graphiql/react,graphql", 163 "graphiql/": "https://esm.sh/graphiql/", 164 "@graphiql/plugin-explorer": "https://esm.sh/@graphiql/plugin-explorer?standalone&external=react,@graphiql/react,graphql", 165 "@graphiql/react": "https://esm.sh/@graphiql/react?standalone&external=react,react-dom,graphql,@graphiql/toolkit,@emotion/is-prop-valid", 166 167 "@graphiql/toolkit": "https://esm.sh/@graphiql/toolkit?standalone&external=graphql", 168 "graphql": "https://esm.sh/graphql@16.11.0", 169 "@emotion/is-prop-valid": "data:text/javascript," 170 }} 171 }} 172 </script> 173 <script type="module"> 174 import React from 'react'; 175 import ReactDOM from 'react-dom/client'; 176 import {{ GraphiQL, HISTORY_PLUGIN }} from 'graphiql'; 177 import {{ createGraphiQLFetcher }} from '@graphiql/toolkit'; 178 import {{ explorerPlugin }} from '@graphiql/plugin-explorer'; 179 import 'graphiql/setup-workers/esm.sh'; 180 181 const fetcher = createGraphiQLFetcher({{ 182 url: '/graphql', 183 subscriptionUrl: '/graphql/ws?slice={}', 184 headers: {{ 185 'X-Slice-Uri': '{}' 186 }} 187 }}); 188 const plugins = [HISTORY_PLUGIN, explorerPlugin()]; 189 190 function App() {{ 191 return React.createElement(GraphiQL, {{ 192 fetcher, 193 plugins, 194 defaultEditorToolsVisibility: true, 195 }}); 196 }} 197 198 const container = document.getElementById('graphiql'); 199 const root = ReactDOM.createRoot(container); 200 root.render(React.createElement(App)); 201 </script> 202 </head> 203 <body> 204 <div id="graphiql"> 205 <div class="loading">Loading…</div> 206 </div> 207 </body> 208</html>"#, 209 slice_uri.replace("'", "\\'").replace("\"", "\\\""), 210 slice_uri.replace("'", "\\'").replace("\"", "\\\"") 211 ); 212 213 Ok(Html(graphiql_html)) 214} 215 216/// GraphQL WebSocket handler for subscriptions 217/// Accepts slice URI from query parameter (?slice=...) 218pub async fn graphql_subscription_handler( 219 State(state): State<AppState>, 220 Query(params): Query<GraphQLParams>, 221 ws: WebSocketUpgrade, 222) -> Result<Response, (StatusCode, String)> { 223 let slice_uri = params.slice.ok_or_else(|| { 224 ( 225 StatusCode::BAD_REQUEST, 226 "Missing slice parameter. Provide ?slice=... query parameter".to_string(), 227 ) 228 })?; 229 230 let schema = match get_or_build_schema(&state, &slice_uri).await { 231 Ok(s) => s, 232 Err(e) => { 233 tracing::error!("Failed to get GraphQL schema: {:?}", e); 234 return Err(( 235 StatusCode::INTERNAL_SERVER_ERROR, 236 format!("Schema error: {:?}", e), 237 )); 238 } 239 }; 240 241 // Create GraphQL context with DataLoader (subscriptions don't need auth typically) 242 let gql_context = GraphQLContext::with_auth( 243 state.database.clone(), 244 None, 245 state.config.auth_base_url.clone(), 246 Some(state.auth_cache.clone()), 247 ); 248 249 // Upgrade to WebSocket and handle GraphQL subscriptions manually 250 let db_pool = state.database_pool.clone(); 251 Ok(ws 252 .protocols(["graphql-transport-ws", "graphql-ws"]) 253 .on_upgrade(move |socket| handle_graphql_ws(socket, schema, gql_context, db_pool))) 254} 255 256/// Handle GraphQL WebSocket connection 257async fn handle_graphql_ws(socket: WebSocket, schema: Schema, gql_context: GraphQLContext, state_pool: sqlx::PgPool) { 258 let (ws_sender, ws_receiver) = socket.split(); 259 260 // Convert axum WebSocket messages to strings for async-graphql 261 let input = ws_receiver.filter_map(|msg| { 262 futures_util::future::ready(match msg { 263 Ok(Message::Text(text)) => Some(text.to_string()), 264 _ => None, // Ignore other message types 265 }) 266 }); 267 268 // Create GraphQL WebSocket handler with context and database pool 269 let mut stream = GraphQLWebSocket::new(schema.clone(), input, WebSocketProtocols::GraphQLWS) 270 .on_connection_init(move |_| { 271 let gql_ctx = gql_context.clone(); 272 let pool = state_pool.clone(); 273 async move { 274 let mut data = async_graphql::Data::default(); 275 data.insert(gql_ctx); 276 data.insert(pool); 277 Ok(data) 278 } 279 }); 280 281 // Send GraphQL messages back through WebSocket 282 let mut ws_sender = ws_sender; 283 while let Some(msg) = stream.next().await { 284 let axum_msg = match msg { 285 WsMessage::Text(text) => Message::Text(text.into()), 286 WsMessage::Close(code, reason) => Message::Close(Some(axum::extract::ws::CloseFrame { 287 code, 288 reason: reason.into(), 289 })), 290 }; 291 292 if ws_sender.send(axum_msg).await.is_err() { 293 break; 294 } 295 } 296} 297 298/// Gets schema from cache or builds it if not cached 299async fn get_or_build_schema(state: &AppState, slice_uri: &str) -> Result<Schema, AppError> { 300 // Check cache first 301 { 302 let cache = SCHEMA_CACHE.read().await; 303 if let Some(schema) = cache.get(slice_uri) { 304 return Ok(schema.clone()); 305 } 306 } 307 308 // Build schema 309 let schema = 310 crate::graphql::build_graphql_schema( 311 state.database.clone(), 312 slice_uri.to_string(), 313 state.config.auth_base_url.clone(), 314 ) 315 .await 316 .map_err(|e| AppError::Internal(format!("Failed to build GraphQL schema: {}", e)))?; 317 318 // Cache it 319 { 320 let mut cache = SCHEMA_CACHE.write().await; 321 cache.insert(slice_uri.to_string(), schema.clone()); 322 } 323 324 Ok(schema) 325} 326 327/// Invalidates the cached GraphQL schema for a given slice 328/// 329/// This should be called when lexicon records are created, updated, or deleted 330/// to ensure the schema is rebuilt with the new lexicon definitions. 331pub async fn invalidate_schema_cache(slice_uri: &str) { 332 let mut cache = SCHEMA_CACHE.write().await; 333 cache.remove(slice_uri); 334 tracing::debug!("Invalidated GraphQL schema cache for slice: {}", slice_uri); 335}