use consumer::indexing as relay;
use deadpool_postgres::Runtime;
use eyre::OptionExt as _;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::signal::ctrl_c;
use tokio_postgres::NoTls;

/// Custom time formatter for tracing - shows only HH:MM:SS in local time
struct SimpleTime;

impl tracing_subscriber::fmt::time::FormatTime for SimpleTime {
    fn format_time(&self, w: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
        let now = chrono::Local::now();
        write!(w, "{}", now.format("%H:%M:%S"))
    }
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Install color-eyre for beautiful error reports
    color_eyre::install()?;

    // Initialize tracing with structured, concise output.
    // The RUST_LOG env var controls verbosity (default: info).
    tracing_subscriber::fmt()
        .with_timer(SimpleTime)
        .with_max_level(
            std::env::var("RUST_LOG")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(tracing::Level::INFO),
        )
        .compact()
        .with_target(false)
        .with_file(true)
        .with_line_number(true)
        .with_thread_ids(true)
        .init();

    PrometheusBuilder::new().install()?;

    let cli = consumer::parse();
    let mut conf = consumer::load_config()?;

    // Configure the database connection pool for the 30-worker database writer architecture:
    // 30 workers + 15 headroom for burst traffic and other operations = 45 connections.
    conf.database.pool = Some(deadpool_postgres::PoolConfig {
        max_size: 45,
        ..Default::default()
    });
    tracing::info!(
        "Database connection pool configured with max_size=45 for 30-worker database writer"
    );

    let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;

    // Note: DID/handle resolution is now handled by Tap

    let tracker = tokio_util::task::TaskTracker::new();
    let (stop_tx, stop) = tokio::sync::watch::channel(false);

    // Calculate the retention cutoff from config if retention_days is set
    let _retention_cutoff = conf.retention_days.map(|days| {
        let cutoff = chrono::Utc::now() - chrono::Duration::days(i64::from(days));
        tracing::info!(
            retention_days = days,
            cutoff_date = %cutoff.format("%Y-%m-%d"),
            "Retention enabled: only processing records newer than cutoff"
        );
        cutoff
    });

    // Create the shared database writer infrastructure if indexer mode is enabled
    let shared_database_writer = if cli.indexer {
        // Create the channel for the database writer (bounded to provide backpressure).
        // When the channel fills up, workers block, propagating backpressure upstream.
        // The message type is inferred from the receiver passed to the database writer.
        let (tap_tx, tap_rx) = tokio::sync::mpsc::channel(10_000);

        // Counters shared between the database writer and the indexer workers
        let batch_events_processed = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
        let batch_operations_processed = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));

        // Create the unified ID cache for fast DID→actor_id and URI→post_id lookups.
        // Actor cache: 5min TTL, 50k capacity. Post cache: 10min TTL, 50k capacity.
        let id_cache = parakeet_db::id_cache::IdCache::new();
        tracing::info!(
            "ID cache initialized (actors: 5min TTL/50k capacity, posts: 10min TTL/50k capacity)"
        );

        // Spawn the single database writer task, with Tap as its primary source
        let database_writer_handle = consumer::spawn_database_writer_tap(
            pool.clone(),
            tap_rx,
            batch_events_processed.clone(),
            batch_operations_processed.clone(),
            id_cache,
            stop.clone(),
        );
        drop(tracker.spawn(database_writer_handle));

        // Create the worker supervisor (shared by all supervised workers)
        let worker_supervisor = consumer::worker_core::WorkerSupervisor::new();

        // Note: Handle resolution is now managed by Tap.
        // The worker supervisor manages its own tasks via TaskTracker,
        // so it does not need to be tracked in the main tracker.

        Some((
            tap_tx,
            batch_events_processed,
            batch_operations_processed,
            worker_supervisor,
        ))
    } else {
        None
    };

    if cli.indexer {
        let indexer_cfg = conf
            .indexer
            .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?;

        // Get the shared infrastructure from the database writer setup
        let (
            tap_tx,
            batch_events_processed,
            batch_operations_processed,
            worker_supervisor,
        ) = shared_database_writer.as_ref().unwrap();

        // Create the Tap configuration
        let tap_config = consumer::streaming::sources::tap::consumer::TapConfig {
            websocket_url: indexer_cfg.tap_websocket_url.clone(),
            admin_url: indexer_cfg.tap_admin_url.clone(),
            admin_password: indexer_cfg.tap_admin_password.clone(),
            max_pending_acks: indexer_cfg.max_pending_acks,
            reconnect_backoff_ms: 1000,
            reconnect_max_backoff_ms: 60000,
        };

        // Create the Tap indexer factory and spawn it under supervision
        let tap_indexer_factory = relay::TapIndexerFactory::new(
            pool.clone(),
            tap_config,
            indexer_cfg.workers,
            tap_tx.clone(),
            batch_events_processed.clone(),
            batch_operations_processed.clone(),
        );
        worker_supervisor.spawn(
            tap_indexer_factory,
            stop.clone(),
            consumer::worker_core::RestartPolicy::Always, // Critical worker - always restart
        );
    }

    // Wait for Control+C
    tokio::select! {
        _ = ctrl_c() => {
            tracing::info!("Received SIGINT (Control+C), initiating shutdown...");
        }
    }

    // Step 1: Signal all tasks to stop
    tracing::info!("Sending stop signal to all tasks...");
    stop_tx.send(true).unwrap();

    // Step 2: Drop the shared database writer senders to close the channels.
    // This must happen BEFORE we wait, so tasks can see the channels close.
    // Tasks drop their clones as they exit, and once all senders are gone
    // the database writer drains its queue and exits.
    if let Some((
        tap_tx,
        _batch_events_processed,
        _batch_operations_processed,
        _worker_supervisor,
    )) = shared_database_writer
    {
        tracing::info!("Closing database writer channels...");
        drop(tap_tx);
    }

    // Step 3: Close the tracker so no new tasks can be spawned
    tracker.close();

    // Step 4: Wait for all tasks to complete.
    // Tasks should respect the stop signal and exit cleanly;
    // the database writer will drain its queue before exiting.
    tracing::info!("Waiting for all tasks to complete...");
    tracker.wait().await;

    tracing::info!("Consumer shutdown complete");
    Ok(())
}