mod actor_resolver;
mod api;
mod atproto_extensions;
mod auth;
mod cache;
mod database;
mod errors;
mod graphql;
mod jetstream;
mod jetstream_cursor;
mod jobs;
mod logging;
mod models;
mod sync;
mod xrpc;

use axum::{
    Router,
    routing::{get, post},
};
use sqlx::PgPool;
use std::env;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::Mutex;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::info;

use crate::database::Database;
use crate::errors::AppError;
use crate::jetstream::JetstreamConsumer;
use crate::jetstream_cursor::PostgresCursorHandler;
use crate::logging::{Logger, start_log_cleanup_task};

#[derive(Clone)]
pub struct Config {
    pub auth_base_url: String,
    pub relay_endpoint: String,
    pub system_slice_uri: String,
    pub default_max_sync_repos: i32,
}

#[derive(Clone)]
pub struct AppState {
    database: Database,
    database_pool: PgPool,
    config: Config,
    pub jetstream_connected: Arc<AtomicBool>,
    // NOTE: the inner type parameter was missing in the source; `cache::SliceCache` is
    // assumed to be the concrete type returned by `CacheFactory::create_slice_cache`.
    pub auth_cache: Arc<Mutex<cache::SliceCache>>,
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    // Load environment variables from .env file
    dotenvy::dotenv().ok();

    // Initialize tracing
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // Database connection
    let database_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgresql://slice:slice@localhost:5432/slice".to_string());

    // Configure database pool for high-concurrency workload
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(20) // Increased from the default of 10
        .min_connections(5) // Keep some connections warm
        .acquire_timeout(std::time::Duration::from_secs(30)) // Match request timeouts
        .idle_timeout(std::time::Duration::from_secs(300)) // 5-minute idle timeout
        .max_lifetime(std::time::Duration::from_secs(1800)) // 30-minute max lifetime
        .connect(&database_url)
        .await?;

    // Run migrations if needed
    sqlx::migrate!("./migrations").run(&pool).await?;

    let database = Database::new(pool.clone());

    let auth_base_url =
        env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
    let relay_endpoint = env::var("RELAY_ENDPOINT")
        .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
    let system_slice_uri = env::var("SYSTEM_SLICE_URI").unwrap_or_else(|_| {
        "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z".to_string()
    });
    let default_max_sync_repos = env::var("DEFAULT_MAX_SYNC_REPOS")
        .unwrap_or_else(|_| "5000".to_string())
        .parse::<i32>()
        .unwrap_or(5000);

    let config = Config {
        auth_base_url,
        relay_endpoint,
        system_slice_uri,
        default_max_sync_repos,
    };

    // Initialize global logger
    Logger::init_global(pool.clone());

    // Start log cleanup background task
    start_log_cleanup_task(pool.clone());

    // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
    let process_type = env::var("PROCESS_TYPE")
        .or_else(|_| env::var("FLY_PROCESS_GROUP"))
        .unwrap_or_else(|_| "all".to_string());
    info!("Starting with process type: {}", process_type);

    let is_all = process_type == "all";
    let is_app = process_type == "app";
    let is_worker = process_type == "worker";

    // Start job queue runner (in worker or all processes)
    if is_worker || is_all {
        info!("Starting sync job queue runner for worker process");
        let pool_for_runner = pool.clone();
        tokio::spawn(async move {
            tracing::info!("Starting job queue runner...");
            match jobs::registry()
                .runner(&pool_for_runner)
                .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management)
                .run()
                .await
            {
                Ok(handle) => {
                    tracing::info!("Job runner started successfully, keeping handle alive...");
                    // CRITICAL: we must keep the handle alive for the runner to continue
                    // processing jobs; the runner will stop if the handle is dropped.
                    std::future::pending::<()>().await; // Keep the task alive indefinitely
                    drop(handle); // This line will never be reached
                }
                Err(e) => {
                    tracing::error!("Failed to start job runner: {}", e);
                }
            }
        });
    } else {
        info!("Skipping sync job queue runner for app process");
    }

    // Create shared jetstream connection status
    let jetstream_connected = Arc::new(AtomicBool::new(false));

    // Start Jetstream consumer (in app or all processes)
    if is_app || is_all {
        info!("Starting Jetstream consumer for app process");
        let database_for_jetstream = database.clone();
        let pool_for_jetstream = pool.clone();
        let jetstream_connected_clone = jetstream_connected.clone();

        tokio::spawn(async move {
            let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok();
            let redis_url = env::var("REDIS_URL").ok();
            let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS")
                .unwrap_or_else(|_| "5".to_string())
                .parse::<u64>() // interval in seconds; u64 is assumed, the type was missing in the source
                .unwrap_or(5);

            // Reconnection rate limiting (5 retries per minute max)
            const MAX_RECONNECTS_PER_MINUTE: u32 = 5;
            const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60);
            let mut reconnect_count = 0u32;
            let mut window_start = std::time::Instant::now();
            let mut retry_delay = tokio::time::Duration::from_secs(5);
            const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300);

            // Configuration reloader setup (run once)
            let mut config_reloader_started = false;

            loop {
                // Rate limiting: reset counter if the window has passed
                let now = std::time::Instant::now();
                if now.duration_since(window_start) >= RECONNECT_WINDOW {
                    reconnect_count = 0;
                    window_start = now;
                }

                // Check rate limit
                if reconnect_count >= MAX_RECONNECTS_PER_MINUTE {
                    let wait_time = RECONNECT_WINDOW - now.duration_since(window_start);
                    tracing::warn!(
                        "Rate limit exceeded: {} reconnects in last minute, waiting {:?}",
                        reconnect_count,
                        wait_time
                    );
                    tokio::time::sleep(wait_time).await;
                    continue;
                }
                reconnect_count += 1;

                // Read cursor position from database
                let initial_cursor =
                    PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await;
                if let Some(cursor) = initial_cursor {
                    tracing::info!("Resuming from cursor position: {}", cursor);
                } else {
                    tracing::info!("No cursor found, starting from latest events");
                }

                // Create cursor handler
                let cursor_handler = Arc::new(PostgresCursorHandler::new(
                    pool_for_jetstream.clone(),
                    "default".to_string(),
                    cursor_write_interval,
                ));

                // Create consumer with cursor support and Redis cache
                let consumer_result = JetstreamConsumer::new(
                    database_for_jetstream.clone(),
                    jetstream_hostname.clone(),
                    Some(cursor_handler.clone()),
                    initial_cursor,
                    redis_url.clone(),
                )
                .await;

                let consumer_arc = match consumer_result {
                    Ok(consumer) => {
                        let arc = Arc::new(consumer);
                        // Start configuration reloader only once
                        if !config_reloader_started {
                            JetstreamConsumer::start_configuration_reloader(arc.clone());
                            config_reloader_started = true;
                        }
                        arc
                    }
                    Err(e) => {
                        tracing::error!(
                            "Failed to create Jetstream consumer: {} - retry in {:?}",
                            e,
                            retry_delay
                        );
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                        continue;
                    }
                };

                // Reset retry delay on successful creation
                retry_delay = tokio::time::Duration::from_secs(5);

                tracing::info!("Starting Jetstream consumer with cursor support...");
                jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed);

                // Start consuming with cancellation token
                let cancellation_token = atproto_jetstream::CancellationToken::new();
                match consumer_arc.start_consuming(cancellation_token).await {
                    Ok(_) => {
                        tracing::info!("Jetstream consumer shut down normally");
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                    }
                    Err(e) => {
                        tracing::error!("Jetstream consumer failed: {} - will reconnect", e);
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                    }
                }
            }
        });
    } else {
        info!("Skipping Jetstream consumer for worker process");
    }

    // Create auth cache for token/session caching (5 minute TTL)
    let redis_url = env::var("REDIS_URL").ok();
    let auth_cache_backend = if let Some(redis_url) = redis_url {
        cache::CacheBackend::Redis {
            url: redis_url,
            ttl_seconds: Some(300),
        }
    } else {
        cache::CacheBackend::InMemory {
            ttl_seconds: Some(300),
        }
    };
    let auth_cache = Arc::new(Mutex::new(
        cache::CacheFactory::create_slice_cache(auth_cache_backend).await?,
    ));

    let state = AppState {
        database: database.clone(),
        database_pool: pool,
        config,
        jetstream_connected,
        auth_cache,
    };

    // Build application with routes
    let app = Router::new()
        // Health check endpoint
        .route(
            "/",
            get(|| async {
                r#"
   ███████╗██╗     ██╗ ██████╗███████╗███████╗
   ██╔════╝██║     ██║██╔════╝██╔════╝██╔════╝
   ███████╗██║     ██║██║     █████╗  ███████╗
   ╚════██║██║     ██║██║     ██╔══╝  ╚════██║
   ███████║███████╗██║╚██████╗███████╗███████║
   ╚══════╝╚══════╝╚═╝ ╚═════╝╚══════╝╚══════╝
"#
            }),
        )
        // XRPC endpoints
        .route(
            "/xrpc/com.atproto.repo.uploadBlob",
            post(xrpc::com::atproto::repo::upload_blob::handler),
        )
        .route(
            "/xrpc/network.slices.slice.startSync",
            post(xrpc::network::slices::slice::start_sync::handler),
        )
        .route(
            "/xrpc/network.slices.slice.syncUserCollections",
            post(xrpc::network::slices::slice::sync_user_collections::handler),
        )
        .route(
            "/xrpc/network.slices.slice.clearSliceRecords",
            post(xrpc::network::slices::slice::clear_slice_records::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobStatus",
            get(xrpc::network::slices::slice::get_job_status::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobHistory",
            get(xrpc::network::slices::slice::get_job_history::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobLogs",
            get(xrpc::network::slices::slice::get_job_logs::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJetstreamLogs",
            get(xrpc::network::slices::slice::get_jetstream_logs::handler),
        )
        .route(
            "/xrpc/network.slices.slice.stats",
            get(xrpc::network::slices::slice::stats::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSparklines",
            post(xrpc::network::slices::slice::get_sparklines::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSliceRecords",
            post(xrpc::network::slices::slice::get_slice_records::handler),
        )
        .route(
            "/xrpc/network.slices.slice.openapi",
            get(xrpc::network::slices::slice::openapi::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJetstreamStatus",
            get(xrpc::network::slices::slice::get_jetstream_status::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getActors",
            post(xrpc::network::slices::slice::get_actors::handler),
        )
        .route(
            "/xrpc/network.slices.slice.createOAuthClient",
            post(xrpc::network::slices::slice::create_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getOAuthClients",
            get(xrpc::network::slices::slice::get_oauth_clients::handler),
        )
        .route(
            "/xrpc/network.slices.slice.updateOAuthClient",
            post(xrpc::network::slices::slice::update_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.deleteOAuthClient",
            post(xrpc::network::slices::slice::delete_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSyncSummary",
            get(xrpc::network::slices::slice::get_sync_summary::handler),
        )
        // GraphQL endpoint
        .route(
            "/graphql",
            get(graphql::graphql_playground).post(graphql::graphql_handler),
        )
        // Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
        .route(
            "/xrpc/{*method}",
            get(api::xrpc_dynamic::dynamic_xrpc_handler),
        )
        .route(
            "/xrpc/{*method}",
            post(api::xrpc_dynamic::dynamic_xrpc_post_handler),
        )
        .layer(TraceLayer::new_for_http())
        .layer(CorsLayer::permissive())
        .with_state(state);

    // Start HTTP server (in app or all processes)
    if is_app || is_all {
        info!("Starting HTTP server for app process");
        let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string());
        let addr = format!("0.0.0.0:{}", port);
        let listener = tokio::net::TcpListener::bind(&addr).await?;
        info!("Server running on http://{}", addr);
        axum::serve(listener, app).await?;
    } else {
        info!("Worker process, no HTTP server started");
        std::future::pending::<()>().await;
    }

    Ok(())
}
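
// ---------------------------------------------------------------------------
// Reference (derived from the code above): environment variables read at
// startup and the fallbacks used when they are unset. The run commands below
// are illustrative only, not part of this crate.
//
//   DATABASE_URL                          postgresql://slice:slice@localhost:5432/slice
//   AUTH_BASE_URL                         http://localhost:8081
//   RELAY_ENDPOINT                        https://relay1.us-west.bsky.network
//   SYSTEM_SLICE_URI                      at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z
//   DEFAULT_MAX_SYNC_REPOS                5000
//   PROCESS_TYPE / FLY_PROCESS_GROUP      "app", "worker", or "all" (default "all")
//   JETSTREAM_HOSTNAME                    optional; passed through to JetstreamConsumer::new
//   JETSTREAM_CURSOR_WRITE_INTERVAL_SECS  5
//   REDIS_URL                             optional; auth cache falls back to in-memory when unset
//   PORT                                  3000
//   RUST_LOG                              tracing EnvFilter directives (read via from_default_env)
//
// Example (illustrative): running the split processes locally
//   PROCESS_TYPE=app    cargo run   # HTTP server + Jetstream consumer
//   PROCESS_TYPE=worker cargo run   # job queue runner only
//   cargo run                       # "all": everything in one process
// ---------------------------------------------------------------------------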