Highly ambitious ATProtocol AppView service and SDKs
at main 17 kB view raw
//! Service entry point: wires up the database pool, background workers
//! (job-queue runner, Jetstream consumer), shared state, and the axum
//! HTTP router, then starts the server depending on the process role.

mod actor_resolver;
mod api;
mod atproto_extensions;
mod auth;
mod cache;
mod database;
mod errors;
mod graphql;
mod jetstream;
mod jetstream_cursor;
mod jobs;
mod logging;
mod models;
mod sync;
mod xrpc;

use axum::{
    Router,
    routing::{get, post},
};
use sqlx::PgPool;
use std::env;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::Mutex;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::info;

use crate::database::Database;
use crate::errors::AppError;
use crate::jetstream::JetstreamConsumer;
use crate::jetstream_cursor::PostgresCursorHandler;
use crate::logging::{Logger, start_log_cleanup_task};

/// Runtime configuration assembled in `main` from environment variables,
/// falling back to hard-coded defaults when a variable is unset.
#[derive(Clone)]
pub struct Config {
    /// Base URL of the auth service (env `AUTH_BASE_URL`).
    pub auth_base_url: String,
    /// ATProto relay endpoint (env `RELAY_ENDPOINT`).
    pub relay_endpoint: String,
    /// AT-URI of the system slice record (env `SYSTEM_SLICE_URI`).
    pub system_slice_uri: String,
    /// Cap on repos per sync (env `DEFAULT_MAX_SYNC_REPOS`, default 5000).
    pub default_max_sync_repos: i32,
}

/// Shared application state handed to every axum handler via `with_state`.
/// `Clone` is cheap: the heavy members are pools/`Arc`s.
#[derive(Clone)]
pub struct AppState {
    database: Database,
    database_pool: PgPool,
    config: Config,
    /// Set/cleared by the Jetstream background task so handlers can report
    /// connection status (see getJetstreamStatus route).
    pub jetstream_connected: Arc<AtomicBool>,
    /// Token/session cache; Redis-backed when REDIS_URL is set, else in-memory.
    pub auth_cache: Arc<Mutex<cache::SliceCache>>,
}

/// Process startup. The `PROCESS_TYPE`/`FLY_PROCESS_GROUP` env var selects a
/// role: "app" (HTTP server + Jetstream), "worker" (job runner only), or
/// "all" (everything, the default).
#[tokio::main]
async fn main() -> Result<(), AppError> {
    // Load environment variables from .env file
    dotenvy::dotenv().ok();

    // Initialize tracing
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // Database connection
    let database_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgresql://slice:slice@localhost:5432/slice".to_string());

    // Configure database pool for high-concurrency workload
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(20) // Increase from default of 10
        .min_connections(5) // Keep some connections warm
        .acquire_timeout(std::time::Duration::from_secs(30)) // Match request timeouts
        .idle_timeout(std::time::Duration::from_secs(300)) // 5 minutes idle timeout
        .max_lifetime(std::time::Duration::from_secs(1800)) // 30 minutes max lifetime
        .connect(&database_url)
        .await?;

    // Run migrations if needed
    sqlx::migrate!("./migrations").run(&pool).await?;

    let database = Database::new(pool.clone());

    // --- Environment-driven configuration, each with a local-dev default ---
    let auth_base_url =
        env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());

    let relay_endpoint = env::var("RELAY_ENDPOINT")
        .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());

    let system_slice_uri = env::var("SYSTEM_SLICE_URI").unwrap_or_else(|_| {
        "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z".to_string()
    });

    // Falls back to 5000 both when unset and when the value fails to parse.
    let default_max_sync_repos = env::var("DEFAULT_MAX_SYNC_REPOS")
        .unwrap_or_else(|_| "5000".to_string())
        .parse::<i32>()
        .unwrap_or(5000);

    let config = Config {
        auth_base_url,
        relay_endpoint,
        system_slice_uri,
        default_max_sync_repos,
    };

    // Initialize global logger
    Logger::init_global(pool.clone());

    // Start log cleanup background task
    start_log_cleanup_task(pool.clone());

    // Start GraphQL PubSub cleanup task
    graphql::pubsub::start_cleanup_task();

    // Initialize Redis pub/sub for cross-process sync job updates
    let redis_url = env::var("REDIS_URL").ok();
    graphql::schema_ext::sync::initialize_redis_pubsub(redis_url);

    // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
    let process_type = env::var("PROCESS_TYPE")
        .or_else(|_| env::var("FLY_PROCESS_GROUP"))
        .unwrap_or_else(|_| "all".to_string());

    info!("Starting with process type: {}", process_type);

    // Any other value behaves like a worker-less, serverless no-op process:
    // all three flags end up false and both conditional sections are skipped.
    let is_all = process_type == "all";
    let is_app = process_type == "app";
    let is_worker = process_type == "worker";

    // Start job queue runner (in worker or all processes)
    if is_worker || is_all {
        info!("Starting sync job queue runner for worker process");
        let pool_for_runner = pool.clone();
        tokio::spawn(async move {
            tracing::info!("Starting job queue runner...");
            match jobs::registry()
                .runner(&pool_for_runner)
                .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management)
                .run()
                .await
            {
                Ok(handle) => {
                    tracing::info!("Job runner started successfully, keeping handle alive...");
                    // CRITICAL: We must keep the handle alive for the runner to continue processing jobs
                    // The runner will stop if the handle is dropped
                    std::future::pending::<()>().await; // Keep the task alive indefinitely
                    drop(handle); // This line will never be reached
                }
                Err(e) => {
                    // NOTE(review): runner startup failure is logged but not
                    // retried; the process keeps running without a job runner.
                    tracing::error!("Failed to start job runner: {}", e);
                }
            }
        });
    } else {
        info!("Skipping sync job queue runner for app process");
    }

    // Create shared jetstream connection status
    let jetstream_connected = Arc::new(AtomicBool::new(false));

    // Start Jetstream consumer (in app or all processes)
    if is_app || is_all {
        info!("Starting Jetstream consumer for app process");
        let database_for_jetstream = database.clone();
        let pool_for_jetstream = pool.clone();
        let jetstream_connected_clone = jetstream_connected.clone();
        // Long-lived reconnect loop: create consumer -> consume until it
        // stops/fails -> back off (exponential, capped) -> repeat, with an
        // additional attempts-per-minute rate limit on top of the backoff.
        tokio::spawn(async move {
            let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok();
            let redis_url = env::var("REDIS_URL").ok();
            // Seconds between cursor persistence writes; default 5 on
            // missing or unparseable env value.
            let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS")
                .unwrap_or_else(|_| "5".to_string())
                .parse::<u64>()
                .unwrap_or(5);

            // Reconnection rate limiting (5 retries per minute max)
            const MAX_RECONNECTS_PER_MINUTE: u32 = 5;
            const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60);
            let mut reconnect_count = 0u32;
            let mut window_start = std::time::Instant::now();

            // Exponential backoff state: starts at 5s, doubles on failure,
            // capped at MAX_RETRY_DELAY (300s).
            let mut retry_delay = tokio::time::Duration::from_secs(5);
            const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300);

            // Configuration reloader setup (run once)
            let mut config_reloader_started = false;

            loop {
                // Rate limiting: reset counter if window has passed
                let now = std::time::Instant::now();
                if now.duration_since(window_start) >= RECONNECT_WINDOW {
                    reconnect_count = 0;
                    window_start = now;
                    retry_delay = tokio::time::Duration::from_secs(5); // Reset delay after window passes
                }

                // Check rate limit
                if reconnect_count >= MAX_RECONNECTS_PER_MINUTE {
                    let wait_time = RECONNECT_WINDOW - now.duration_since(window_start);
                    tracing::warn!(
                        "Rate limit exceeded: {} reconnects in last minute, waiting {:?}",
                        reconnect_count,
                        wait_time
                    );
                    tokio::time::sleep(wait_time).await;
                    continue;
                }

                reconnect_count += 1;
                tracing::info!("Jetstream connection attempt #{} (retry delay: {:?})", reconnect_count, retry_delay);

                // Read cursor position from database
                let initial_cursor =
                    PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await;
                if let Some(cursor) = initial_cursor {
                    tracing::info!("Resuming from cursor position: {}", cursor);
                } else {
                    tracing::info!("No cursor found, starting from latest events");
                }

                // Create cursor handler
                let cursor_handler = Arc::new(PostgresCursorHandler::new(
                    pool_for_jetstream.clone(),
                    "default".to_string(),
                    cursor_write_interval,
                ));

                // Create consumer with cursor support and Redis cache
                let consumer_result = JetstreamConsumer::new(
                    database_for_jetstream.clone(),
                    jetstream_hostname.clone(),
                    Some(cursor_handler.clone()),
                    initial_cursor,
                    redis_url.clone(),
                )
                .await;

                let consumer_arc = match consumer_result {
                    Ok(consumer) => {
                        let arc = Arc::new(consumer);

                        // Start configuration reloader only once
                        // (survives reconnects; guarded by the flag above)
                        if !config_reloader_started {
                            JetstreamConsumer::start_configuration_reloader(arc.clone());
                            config_reloader_started = true;
                        }

                        arc
                    }
                    Err(e) => {
                        tracing::error!(
                            "Failed to create Jetstream consumer: {} - retry in {:?}",
                            e,
                            retry_delay
                        );
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                        continue;
                    }
                };

                // Reset retry delay on successful creation
                retry_delay = tokio::time::Duration::from_secs(5);

                tracing::info!("Starting Jetstream consumer with cursor support...");
                jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed);

                // Start consuming with cancellation token
                let cancellation_token = atproto_jetstream::CancellationToken::new();
                // Both arms back off before reconnecting; a clean shutdown is
                // treated the same as a failure for backoff purposes.
                match consumer_arc.start_consuming(cancellation_token).await {
                    Ok(_) => {
                        tracing::info!("Jetstream consumer shut down normally - reconnecting in {:?}", retry_delay);
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                    }
                    Err(e) => {
                        tracing::error!("Jetstream consumer failed: {} - reconnecting in {:?}", e, retry_delay);
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                    }
                }
            }
        });
    } else {
        info!("Skipping Jetstream consumer for worker process");
    }

    // Create auth cache for token/session caching (5 minute TTL)
    let redis_url = env::var("REDIS_URL").ok();
    let auth_cache_backend = if let Some(redis_url) = redis_url {
        cache::CacheBackend::Redis {
            url: redis_url,
            ttl_seconds: Some(300),
        }
    } else {
        cache::CacheBackend::InMemory {
            ttl_seconds: Some(300),
        }
    };
    let auth_cache = Arc::new(Mutex::new(
        cache::CacheFactory::create_slice_cache(auth_cache_backend).await?,
    ));

    let state = AppState {
        database: database.clone(),
        database_pool: pool,
        config,
        jetstream_connected,
        auth_cache,
    };

    // Build application with routes
    // NOTE: the router is constructed in every process type even though only
    // app/all processes serve it.
    let app = Router::new()
        // Health check endpoint
        .route(
            "/",
            get(|| async {
                r#"
███████╗██╗     ██╗ ██████╗███████╗███████╗
██╔════╝██║     ██║██╔════╝██╔════╝██╔════╝
███████╗██║     ██║██║     █████╗  ███████╗
╚════██║██║     ██║██║     ██╔══╝  ╚════██║
███████║███████╗██║╚██████╗███████╗███████║
╚══════╝╚══════╝╚═╝ ╚═════╝╚══════╝╚══════╝
    "#
            }),
        )
        // XRPC endpoints
        .route(
            "/xrpc/com.atproto.repo.uploadBlob",
            post(xrpc::com::atproto::repo::upload_blob::handler),
        )
        .route(
            "/xrpc/network.slices.slice.startSync",
            post(xrpc::network::slices::slice::start_sync::handler),
        )
        .route(
            "/xrpc/network.slices.slice.syncUserCollections",
            post(xrpc::network::slices::slice::sync_user_collections::handler),
        )
        .route(
            "/xrpc/network.slices.slice.clearSliceRecords",
            post(xrpc::network::slices::slice::clear_slice_records::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobStatus",
            get(xrpc::network::slices::slice::get_job_status::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobHistory",
            get(xrpc::network::slices::slice::get_job_history::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobLogs",
            get(xrpc::network::slices::slice::get_job_logs::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJetstreamLogs",
            get(xrpc::network::slices::slice::get_jetstream_logs::handler),
        )
        .route(
            "/xrpc/network.slices.slice.stats",
            get(xrpc::network::slices::slice::stats::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSparklines",
            post(xrpc::network::slices::slice::get_sparklines::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSliceRecords",
            post(xrpc::network::slices::slice::get_slice_records::handler),
        )
        .route(
            "/xrpc/network.slices.slice.openapi",
            get(xrpc::network::slices::slice::openapi::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJetstreamStatus",
            get(xrpc::network::slices::slice::get_jetstream_status::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getActors",
            post(xrpc::network::slices::slice::get_actors::handler),
        )
        .route(
            "/xrpc/network.slices.slice.createOAuthClient",
            post(xrpc::network::slices::slice::create_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getOAuthClients",
            get(xrpc::network::slices::slice::get_oauth_clients::handler),
        )
        .route(
            "/xrpc/network.slices.slice.updateOAuthClient",
            post(xrpc::network::slices::slice::update_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.deleteOAuthClient",
            post(xrpc::network::slices::slice::delete_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSyncSummary",
            get(xrpc::network::slices::slice::get_sync_summary::handler),
        )
        // GraphQL endpoints
        .route(
            "/graphql",
            get(graphql::graphql_playground).post(graphql::graphql_handler),
        )
        .route("/graphql/ws", get(graphql::graphql_subscription_handler))
        // Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
        .route(
            "/xrpc/{*method}",
            get(api::xrpc_dynamic::dynamic_xrpc_handler),
        )
        .route(
            "/xrpc/{*method}",
            post(api::xrpc_dynamic::dynamic_xrpc_post_handler),
        )
        .layer(TraceLayer::new_for_http())
        .layer(CorsLayer::permissive())
        .with_state(state);

    // Start HTTP server (in app or all processes)
    if is_app || is_all {
        info!("Starting HTTP server for app process");
        let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string());
        let addr = format!("0.0.0.0:{}", port);

        let listener = tokio::net::TcpListener::bind(&addr).await?;
        info!("Server running on http://{}", addr);

        axum::serve(listener, app).await?;
    } else {
        info!("Worker process, no HTTP server started");
        // Park the main task forever so spawned background tasks keep running.
        std::future::pending::<()>().await;
    }

    Ok(())
}