//! Highly ambitious ATProtocol AppView service and SDKs.
1mod actor_resolver; 2mod api; 3mod atproto_extensions; 4mod auth; 5mod cache; 6mod database; 7mod errors; 8mod graphql; 9mod jetstream; 10mod jetstream_cursor; 11mod jobs; 12mod logging; 13mod models; 14mod sync; 15mod xrpc; 16 17use axum::{ 18 Router, 19 routing::{get, post}, 20}; 21use sqlx::PgPool; 22use std::env; 23use std::sync::Arc; 24use std::sync::atomic::AtomicBool; 25use tokio::sync::Mutex; 26use tower_http::{cors::CorsLayer, trace::TraceLayer}; 27use tracing::info; 28 29use crate::database::Database; 30use crate::errors::AppError; 31use crate::jetstream::JetstreamConsumer; 32use crate::jetstream_cursor::PostgresCursorHandler; 33use crate::logging::{Logger, start_log_cleanup_task}; 34 35#[derive(Clone)] 36pub struct Config { 37 pub auth_base_url: String, 38 pub relay_endpoint: String, 39 pub system_slice_uri: String, 40 pub default_max_sync_repos: i32, 41} 42 43#[derive(Clone)] 44pub struct AppState { 45 database: Database, 46 database_pool: PgPool, 47 config: Config, 48 pub jetstream_connected: Arc<AtomicBool>, 49 pub auth_cache: Arc<Mutex<cache::SliceCache>>, 50} 51 52#[tokio::main] 53async fn main() -> Result<(), AppError> { 54 // Load environment variables from .env file 55 dotenvy::dotenv().ok(); 56 57 // Initialize tracing 58 tracing_subscriber::fmt() 59 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) 60 .init(); 61 62 // Database connection 63 let database_url = env::var("DATABASE_URL") 64 .unwrap_or_else(|_| "postgresql://slice:slice@localhost:5432/slice".to_string()); 65 66 // Configure database pool for high-concurrency workload 67 let pool = sqlx::postgres::PgPoolOptions::new() 68 .max_connections(20) // Increase from default of 10 69 .min_connections(5) // Keep some connections warm 70 .acquire_timeout(std::time::Duration::from_secs(30)) // Match request timeouts 71 .idle_timeout(std::time::Duration::from_secs(300)) // 5 minutes idle timeout 72 .max_lifetime(std::time::Duration::from_secs(1800)) // 30 minutes max lifetime 
73 .connect(&database_url) 74 .await?; 75 76 // Run migrations if needed 77 sqlx::migrate!("./migrations").run(&pool).await?; 78 79 let database = Database::new(pool.clone()); 80 81 let auth_base_url = 82 env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string()); 83 84 let relay_endpoint = env::var("RELAY_ENDPOINT") 85 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()); 86 87 let system_slice_uri = env::var("SYSTEM_SLICE_URI").unwrap_or_else(|_| { 88 "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z".to_string() 89 }); 90 91 let default_max_sync_repos = env::var("DEFAULT_MAX_SYNC_REPOS") 92 .unwrap_or_else(|_| "5000".to_string()) 93 .parse::<i32>() 94 .unwrap_or(5000); 95 96 let config = Config { 97 auth_base_url, 98 relay_endpoint, 99 system_slice_uri, 100 default_max_sync_repos, 101 }; 102 103 // Initialize global logger 104 Logger::init_global(pool.clone()); 105 106 // Start log cleanup background task 107 start_log_cleanup_task(pool.clone()); 108 109 // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP) 110 let process_type = env::var("PROCESS_TYPE") 111 .or_else(|_| env::var("FLY_PROCESS_GROUP")) 112 .unwrap_or_else(|_| "all".to_string()); 113 114 info!("Starting with process type: {}", process_type); 115 116 let is_all = process_type == "all"; 117 let is_app = process_type == "app"; 118 let is_worker = process_type == "worker"; 119 120 // Start job queue runner (in worker or all processes) 121 if is_worker || is_all { 122 info!("Starting sync job queue runner for worker process"); 123 let pool_for_runner = pool.clone(); 124 tokio::spawn(async move { 125 tracing::info!("Starting job queue runner..."); 126 match jobs::registry() 127 .runner(&pool_for_runner) 128 .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management) 129 .run() 130 .await 131 { 132 Ok(handle) => { 133 tracing::info!("Job runner started successfully, 
keeping handle alive..."); 134 // CRITICAL: We must keep the handle alive for the runner to continue processing jobs 135 // The runner will stop if the handle is dropped 136 std::future::pending::<()>().await; // Keep the task alive indefinitely 137 drop(handle); // This line will never be reached 138 } 139 Err(e) => { 140 tracing::error!("Failed to start job runner: {}", e); 141 } 142 } 143 }); 144 } else { 145 info!("Skipping sync job queue runner for app process"); 146 } 147 148 // Create shared jetstream connection status 149 let jetstream_connected = Arc::new(AtomicBool::new(false)); 150 151 // Start Jetstream consumer (in app or all processes) 152 if is_app || is_all { 153 info!("Starting Jetstream consumer for app process"); 154 let database_for_jetstream = database.clone(); 155 let pool_for_jetstream = pool.clone(); 156 let jetstream_connected_clone = jetstream_connected.clone(); 157 tokio::spawn(async move { 158 let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok(); 159 let redis_url = env::var("REDIS_URL").ok(); 160 let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS") 161 .unwrap_or_else(|_| "5".to_string()) 162 .parse::<u64>() 163 .unwrap_or(5); 164 165 // Reconnection rate limiting (5 retries per minute max) 166 const MAX_RECONNECTS_PER_MINUTE: u32 = 5; 167 const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60); 168 let mut reconnect_count = 0u32; 169 let mut window_start = std::time::Instant::now(); 170 171 let mut retry_delay = tokio::time::Duration::from_secs(5); 172 const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300); 173 174 // Configuration reloader setup (run once) 175 let mut config_reloader_started = false; 176 177 loop { 178 // Rate limiting: reset counter if window has passed 179 let now = std::time::Instant::now(); 180 if now.duration_since(window_start) >= RECONNECT_WINDOW { 181 reconnect_count = 0; 182 window_start = now; 183 } 184 185 // Check 
rate limit 186 if reconnect_count >= MAX_RECONNECTS_PER_MINUTE { 187 let wait_time = RECONNECT_WINDOW - now.duration_since(window_start); 188 tracing::warn!( 189 "Rate limit exceeded: {} reconnects in last minute, waiting {:?}", 190 reconnect_count, 191 wait_time 192 ); 193 tokio::time::sleep(wait_time).await; 194 continue; 195 } 196 197 reconnect_count += 1; 198 199 // Read cursor position from database 200 let initial_cursor = 201 PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await; 202 if let Some(cursor) = initial_cursor { 203 tracing::info!("Resuming from cursor position: {}", cursor); 204 } else { 205 tracing::info!("No cursor found, starting from latest events"); 206 } 207 208 // Create cursor handler 209 let cursor_handler = Arc::new(PostgresCursorHandler::new( 210 pool_for_jetstream.clone(), 211 "default".to_string(), 212 cursor_write_interval, 213 )); 214 215 // Create consumer with cursor support and Redis cache 216 let consumer_result = JetstreamConsumer::new( 217 database_for_jetstream.clone(), 218 jetstream_hostname.clone(), 219 Some(cursor_handler.clone()), 220 initial_cursor, 221 redis_url.clone(), 222 ) 223 .await; 224 225 let consumer_arc = match consumer_result { 226 Ok(consumer) => { 227 let arc = Arc::new(consumer); 228 229 // Start configuration reloader only once 230 if !config_reloader_started { 231 JetstreamConsumer::start_configuration_reloader(arc.clone()); 232 config_reloader_started = true; 233 } 234 235 arc 236 } 237 Err(e) => { 238 tracing::error!( 239 "Failed to create Jetstream consumer: {} - retry in {:?}", 240 e, 241 retry_delay 242 ); 243 jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 244 tokio::time::sleep(retry_delay).await; 245 retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 246 continue; 247 } 248 }; 249 250 // Reset retry delay on successful creation 251 retry_delay = tokio::time::Duration::from_secs(5); 252 253 tracing::info!("Starting Jetstream consumer 
with cursor support..."); 254 jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed); 255 256 // Start consuming with cancellation token 257 let cancellation_token = atproto_jetstream::CancellationToken::new(); 258 match consumer_arc.start_consuming(cancellation_token).await { 259 Ok(_) => { 260 tracing::info!("Jetstream consumer shut down normally"); 261 jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 262 } 263 Err(e) => { 264 tracing::error!("Jetstream consumer failed: {} - will reconnect", e); 265 jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 266 tokio::time::sleep(retry_delay).await; 267 retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 268 } 269 } 270 } 271 }); 272 } else { 273 info!("Skipping Jetstream consumer for worker process"); 274 } 275 276 // Create auth cache for token/session caching (5 minute TTL) 277 let redis_url = env::var("REDIS_URL").ok(); 278 let auth_cache_backend = if let Some(redis_url) = redis_url { 279 cache::CacheBackend::Redis { 280 url: redis_url, 281 ttl_seconds: Some(300), 282 } 283 } else { 284 cache::CacheBackend::InMemory { 285 ttl_seconds: Some(300), 286 } 287 }; 288 let auth_cache = Arc::new(Mutex::new( 289 cache::CacheFactory::create_slice_cache(auth_cache_backend).await?, 290 )); 291 292 let state = AppState { 293 database: database.clone(), 294 database_pool: pool, 295 config, 296 jetstream_connected, 297 auth_cache, 298 }; 299 300 // Build application with routes 301 let app = Router::new() 302 // Health check endpoint 303 .route( 304 "/", 305 get(|| async { 306 r#" 307███████╗██╗ ██╗ ██████╗███████╗███████╗ 308██╔════╝██║ ██║██╔════╝██╔════╝██╔════╝ 309███████╗██║ ██║██║ █████╗ ███████╗ 310╚════██║██║ ██║██║ ██╔══╝ ╚════██║ 311███████║███████╗██║╚██████╗███████╗███████║ 312╚══════╝╚══════╝╚═╝ ╚═════╝╚══════╝╚══════╝ 313 "# 314 }), 315 ) 316 // XRPC endpoints 317 .route( 318 "/xrpc/com.atproto.repo.uploadBlob", 319 
post(xrpc::com::atproto::repo::upload_blob::handler), 320 ) 321 .route( 322 "/xrpc/network.slices.slice.startSync", 323 post(xrpc::network::slices::slice::start_sync::handler), 324 ) 325 .route( 326 "/xrpc/network.slices.slice.syncUserCollections", 327 post(xrpc::network::slices::slice::sync_user_collections::handler), 328 ) 329 .route( 330 "/xrpc/network.slices.slice.clearSliceRecords", 331 post(xrpc::network::slices::slice::clear_slice_records::handler), 332 ) 333 .route( 334 "/xrpc/network.slices.slice.getJobStatus", 335 get(xrpc::network::slices::slice::get_job_status::handler), 336 ) 337 .route( 338 "/xrpc/network.slices.slice.getJobHistory", 339 get(xrpc::network::slices::slice::get_job_history::handler), 340 ) 341 .route( 342 "/xrpc/network.slices.slice.getJobLogs", 343 get(xrpc::network::slices::slice::get_job_logs::handler), 344 ) 345 .route( 346 "/xrpc/network.slices.slice.getJetstreamLogs", 347 get(xrpc::network::slices::slice::get_jetstream_logs::handler), 348 ) 349 .route( 350 "/xrpc/network.slices.slice.stats", 351 get(xrpc::network::slices::slice::stats::handler), 352 ) 353 .route( 354 "/xrpc/network.slices.slice.getSparklines", 355 post(xrpc::network::slices::slice::get_sparklines::handler), 356 ) 357 .route( 358 "/xrpc/network.slices.slice.getSliceRecords", 359 post(xrpc::network::slices::slice::get_slice_records::handler), 360 ) 361 .route( 362 "/xrpc/network.slices.slice.openapi", 363 get(xrpc::network::slices::slice::openapi::handler), 364 ) 365 .route( 366 "/xrpc/network.slices.slice.getJetstreamStatus", 367 get(xrpc::network::slices::slice::get_jetstream_status::handler), 368 ) 369 .route( 370 "/xrpc/network.slices.slice.getActors", 371 post(xrpc::network::slices::slice::get_actors::handler), 372 ) 373 .route( 374 "/xrpc/network.slices.slice.createOAuthClient", 375 post(xrpc::network::slices::slice::create_oauth_client::handler), 376 ) 377 .route( 378 "/xrpc/network.slices.slice.getOAuthClients", 379 
get(xrpc::network::slices::slice::get_oauth_clients::handler), 380 ) 381 .route( 382 "/xrpc/network.slices.slice.updateOAuthClient", 383 post(xrpc::network::slices::slice::update_oauth_client::handler), 384 ) 385 .route( 386 "/xrpc/network.slices.slice.deleteOAuthClient", 387 post(xrpc::network::slices::slice::delete_oauth_client::handler), 388 ) 389 .route( 390 "/xrpc/network.slices.slice.getSyncSummary", 391 get(xrpc::network::slices::slice::get_sync_summary::handler), 392 ) 393 // GraphQL endpoint 394 .route( 395 "/graphql", 396 get(graphql::graphql_playground).post(graphql::graphql_handler), 397 ) 398 // Dynamic collection-specific XRPC endpoints (wildcard routes must come last) 399 .route( 400 "/xrpc/{*method}", 401 get(api::xrpc_dynamic::dynamic_xrpc_handler), 402 ) 403 .route( 404 "/xrpc/{*method}", 405 post(api::xrpc_dynamic::dynamic_xrpc_post_handler), 406 ) 407 .layer(TraceLayer::new_for_http()) 408 .layer(CorsLayer::permissive()) 409 .with_state(state); 410 411 // Start HTTP server (in app or all processes) 412 if is_app || is_all { 413 info!("Starting HTTP server for app process"); 414 let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string()); 415 let addr = format!("0.0.0.0:{}", port); 416 417 let listener = tokio::net::TcpListener::bind(&addr).await?; 418 info!("Server running on http://{}", addr); 419 420 axum::serve(listener, app).await?; 421 } else { 422 info!("Worker process, no HTTP server started"); 423 std::future::pending::<()>().await; 424 } 425 426 Ok(()) 427}