at main 314 lines 11 kB view raw
1use std::net::SocketAddr; 2use std::sync::Arc; 3 4use axum::{ 5 Json, Router, 6 extract::State, 7 http::{StatusCode, header}, 8 middleware, 9 response::{Html, IntoResponse}, 10 routing::get, 11}; 12use jacquard::api::com_atproto::repo::{ 13 get_record::GetRecordRequest, list_records::ListRecordsRequest, 14}; 15use jacquard::client::UnauthenticatedSession; 16use jacquard::identity::JacquardResolver; 17use jacquard::types::did_doc::DidDocument; 18use jacquard::types::string::Did; 19use jacquard_axum::IntoRouter; 20use jacquard_axum::did_web::did_web_router; 21use jacquard_axum::service_auth::ServiceAuth; 22use serde::Serialize; 23use tower_http::cors::CorsLayer; 24use tower_http::trace::TraceLayer; 25use tracing::info; 26use weaver_api::app_bsky::actor::get_profile::GetProfileRequest as BskyGetProfileRequest; 27use weaver_api::app_bsky::feed::get_posts::GetPostsRequest as BskyGetPostsRequest; 28use weaver_api::com_atproto::identity::resolve_handle::ResolveHandleRequest; 29use weaver_api::sh_weaver::actor::{ 30 get_actor_entries::GetActorEntriesRequest, get_actor_notebooks::GetActorNotebooksRequest, 31 get_profile::GetProfileRequest, 32}; 33use weaver_api::sh_weaver::collab::get_collaboration_state::GetCollaborationStateRequest; 34use weaver_api::sh_weaver::collab::get_resource_participants::GetResourceParticipantsRequest; 35use weaver_api::sh_weaver::collab::get_resource_sessions::GetResourceSessionsRequest; 36use weaver_api::sh_weaver::domain::{ 37 generate_document::GenerateDocumentRequest, resolve_by_domain::ResolveByDomainRequest, 38 resolve_document::ResolveDocumentRequest, 39}; 40use weaver_api::sh_weaver::edit::get_contributors::GetContributorsRequest; 41use weaver_api::sh_weaver::edit::get_edit_history::GetEditHistoryRequest; 42use weaver_api::sh_weaver::edit::list_drafts::ListDraftsRequest; 43use weaver_api::sh_weaver::notebook::{ 44 get_book_entry::GetBookEntryRequest, get_entry::GetEntryRequest, 45 get_entry_feed::GetEntryFeedRequest, get_entry_notebooks::GetEntryNotebooksRequest, 46 get_notebook::GetNotebookRequest, get_notebook_feed::GetNotebookFeedRequest, 47 resolve_entry::ResolveEntryRequest, resolve_global_notebook::ResolveGlobalNotebookRequest, 48 resolve_notebook::ResolveNotebookRequest, 49}; 50 51use crate::clickhouse::Client; 52use crate::config::ShardConfig; 53use crate::endpoints::{actor, bsky, collab, domain, edit, identity, notebook, repo}; 54use crate::error::{IndexError, ServerError}; 55use crate::sqlite::ShardRouter; 56 57pub use weaver_common::telemetry::{self, TelemetryConfig}; 58 59/// Identity resolver type (unauthenticated, just for handle/DID resolution) 60pub type Resolver = UnauthenticatedSession<JacquardResolver>; 61 62/// Shared application state 63#[derive(Clone)] 64pub struct AppState { 65 pub clickhouse: Arc<Client>, 66 pub shards: Arc<ShardRouter>, 67 pub resolver: Resolver, 68 /// Our service DID (expected audience for service auth JWTs) 69 pub service_did: Did<'static>, 70} 71 72impl AppState { 73 pub fn new(clickhouse: Client, shard_config: ShardConfig, service_did: Did<'static>) -> Self { 74 Self { 75 clickhouse: Arc::new(clickhouse), 76 shards: Arc::new(ShardRouter::new(shard_config.base_path)), 77 resolver: UnauthenticatedSession::new_public(), 78 service_did, 79 } 80 } 81} 82 83impl ServiceAuth for AppState { 84 type Resolver = UnauthenticatedSession<JacquardResolver>; 85 86 fn service_did(&self) -> &Did<'_> { 87 &self.service_did 88 } 89 90 fn resolver(&self) -> &Self::Resolver { 91 &self.resolver 92 } 93 94 fn require_lxm(&self) -> bool { 95 true 96 } 97} 98 99/// Build the axum router with all XRPC endpoints 100pub fn router(state: AppState, did_doc: DidDocument<'static>) -> Router { 101 Router::new() 102 .route("/", get(landing)) 103 .route( 104 "/assets/IoskeleyMono-Regular.woff2", 105 get(font_ioskeley_regular), 106 ) 107 .route("/assets/IoskeleyMono-Bold.woff2", get(font_ioskeley_bold)) 108 .route( 109 "/assets/IoskeleyMono-Italic.woff2", 110 get(font_ioskeley_italic), 111 ) 112 .route("/xrpc/_health", get(health)) 113 .route("/metrics", get(metrics)) 114 // com.atproto.identity.* endpoints 115 .merge(ResolveHandleRequest::into_router(identity::resolve_handle)) 116 // com.atproto.repo.* endpoints (record cache) 117 .merge(GetRecordRequest::into_router(repo::get_record)) 118 .merge(ListRecordsRequest::into_router(repo::list_records)) 119 // app.bsky.* passthrough endpoints 120 .merge(BskyGetProfileRequest::into_router(bsky::get_profile)) 121 .merge(BskyGetPostsRequest::into_router(bsky::get_posts)) 122 // sh.weaver.actor.* endpoints 123 .merge(GetProfileRequest::into_router(actor::get_profile)) 124 .merge(GetActorNotebooksRequest::into_router( 125 actor::get_actor_notebooks, 126 )) 127 .merge(GetActorEntriesRequest::into_router( 128 actor::get_actor_entries, 129 )) 130 // sh.weaver.notebook.* endpoints 131 .merge(ResolveNotebookRequest::into_router( 132 notebook::resolve_notebook, 133 )) 134 .merge(GetNotebookRequest::into_router(notebook::get_notebook)) 135 .merge(GetEntryRequest::into_router(notebook::get_entry)) 136 .merge(ResolveEntryRequest::into_router(notebook::resolve_entry)) 137 .merge(GetNotebookFeedRequest::into_router( 138 notebook::get_notebook_feed, 139 )) 140 .merge(GetEntryFeedRequest::into_router(notebook::get_entry_feed)) 141 .merge(GetBookEntryRequest::into_router(notebook::get_book_entry)) 142 .merge(GetEntryNotebooksRequest::into_router( 143 notebook::get_entry_notebooks, 144 )) 145 .merge(ResolveGlobalNotebookRequest::into_router( 146 notebook::resolve_global_notebook, 147 )) 148 // Internal endpoint for Caddy on-demand TLS verification 149 .route("/internal/verify-domain", get(domain::verify_domain)) 150 // sh.weaver.domain.* endpoints 151 .merge(ResolveByDomainRequest::into_router( 152 domain::resolve_by_domain, 153 )) 154 .merge(ResolveDocumentRequest::into_router( 155 domain::resolve_document, 156 )) 157 .merge(GenerateDocumentRequest::into_router( 158 domain::generate_document, 159 )) 160 // sh.weaver.collab.* endpoints 161 .merge(GetResourceParticipantsRequest::into_router( 162 collab::get_resource_participants, 163 )) 164 .merge(GetCollaborationStateRequest::into_router( 165 collab::get_collaboration_state, 166 )) 167 .merge(GetResourceSessionsRequest::into_router( 168 collab::get_resource_sessions, 169 )) 170 // sh.weaver.edit.* endpoints 171 .merge(GetEditHistoryRequest::into_router(edit::get_edit_history)) 172 .merge(GetContributorsRequest::into_router(edit::get_contributors)) 173 .merge(ListDraftsRequest::into_router(edit::list_drafts)) 174 .layer(middleware::from_fn(telemetry::http_metrics)) 175 .layer(TraceLayer::new_for_http()) 176 .layer(CorsLayer::permissive().max_age(std::time::Duration::from_secs(86400))) 177 .with_state(state) 178 .merge(did_web_router(did_doc)) 179} 180 181/// Prometheus metrics endpoint 182async fn metrics() -> String { 183 telemetry::render() 184} 185 186// Embedded font files 187const IOSKELEY_MONO_REGULAR: &[u8] = 188 include_bytes!("../../weaver-app/assets/fonts/ioskeley-mono/IoskeleyMono-Regular.woff2"); 189const IOSKELEY_MONO_BOLD: &[u8] = 190 include_bytes!("../../weaver-app/assets/fonts/ioskeley-mono/IoskeleyMono-Bold.woff2"); 191const IOSKELEY_MONO_ITALIC: &[u8] = 192 include_bytes!("../../weaver-app/assets/fonts/ioskeley-mono/IoskeleyMono-Italic.woff2"); 193 194/// Serve the Ioskeley Mono Regular font 195async fn font_ioskeley_regular() -> impl IntoResponse { 196 ( 197 [(header::CONTENT_TYPE, "font/woff2")], 198 IOSKELEY_MONO_REGULAR, 199 ) 200} 201/// Serve the Ioskeley Mono Regular font 202async fn font_ioskeley_bold() -> impl IntoResponse { 203 ([(header::CONTENT_TYPE, "font/woff2")], IOSKELEY_MONO_BOLD) 204} 205 206/// Serve the Ioskeley Mono Regular font 207async fn font_ioskeley_italic() -> impl IntoResponse { 208 ([(header::CONTENT_TYPE, "font/woff2")], IOSKELEY_MONO_ITALIC) 209} 210 211const LANDING_HTML: &str = include_str!("./landing.html"); 212 213/// Landing page 214async fn landing() -> Html<&'static str> { 215 Html(LANDING_HTML) 216} 217 218/// Health check response 219#[derive(Serialize)] 220struct HealthResponse { 221 status: &'static str, 222 clickhouse: bool, 223 shard_count: usize, 224} 225 226/// Health check endpoint 227/// 228/// Returns 200 OK with stats if healthy, 503 if ClickHouse unreachable. 229async fn health(State(state): State<AppState>) -> impl IntoResponse { 230 let clickhouse_ok = state.clickhouse.execute("SELECT 1").await.is_ok(); 231 let shard_count = state.shards.shard_count(); 232 233 let response = HealthResponse { 234 status: if clickhouse_ok { "ok" } else { "degraded" }, 235 clickhouse: clickhouse_ok, 236 shard_count, 237 }; 238 239 let status = if clickhouse_ok { 240 StatusCode::OK 241 } else { 242 StatusCode::SERVICE_UNAVAILABLE 243 }; 244 245 (status, Json(response)) 246} 247 248/// Server configuration 249#[derive(Debug, Clone)] 250pub struct ServerConfig { 251 pub host: String, 252 pub port: u16, 253 /// Service DID for this indexer (used as expected audience for service auth) 254 pub service_did: Did<'static>, 255} 256 257impl Default for ServerConfig { 258 fn default() -> Self { 259 Self { 260 host: "0.0.0.0".to_string(), 261 port: 3000, 262 // Default to a placeholder - should be overridden in production 263 service_did: Did::new_static("did:web:index.weaver.sh").unwrap(), 264 } 265 } 266} 267 268impl ServerConfig { 269 pub fn from_env() -> Self { 270 let host = std::env::var("SERVER_HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); 271 let port = std::env::var("SERVER_PORT") 272 .ok() 273 .and_then(|s| s.parse().ok()) 274 .unwrap_or(3000); 275 let service_did = std::env::var("SERVICE_DID") 276 .ok() 277 .and_then(|s| Did::new_owned(s).ok()) 278 .unwrap_or_else(|| Did::new_static("did:web:index.weaver.sh").unwrap()); 279 280 Self { 281 host, 282 port, 283 service_did, 284 } 285 } 286 287 pub fn addr(&self) -> SocketAddr { 288 format!("{}:{}", self.host, self.port) 289 .parse() 290 .expect("valid socket address") 291 } 292} 293 294/// Run the HTTP server 295pub async fn run( 296 state: AppState, 297 config: ServerConfig, 298 did_doc: DidDocument<'static>, 299) -> Result<(), IndexError> { 300 let addr = config.addr(); 301 let app = router(state, did_doc); 302 303 info!("Starting HTTP server on {}", addr); 304 305 let listener = tokio::net::TcpListener::bind(addr) 306 .await 307 .map_err(|e| ServerError::Bind { addr, source: e })?; 308 309 axum::serve(listener, app) 310 .await 311 .map_err(|e| ServerError::Serve { source: e })?; 312 313 Ok(()) 314}