Rust AppView - highly experimental!
use consumer::indexing as relay;
use deadpool_postgres::Runtime;
use eyre::OptionExt as _;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::signal::ctrl_c;
use tokio_postgres::NoTls;

/// Custom time formatter for tracing - shows only HH:MM:SS in local time
struct SimpleTime;

impl tracing_subscriber::fmt::time::FormatTime for SimpleTime {
    fn format_time(&self, w: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
        let now = chrono::Local::now();
        write!(w, "{}", now.format("%H:%M:%S"))
    }
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Install color-eyre for beautiful error reports
    color_eyre::install()?;

    // Initialize tracing with structured, concise output
    // RUST_LOG env var controls verbosity (default: info)
    tracing_subscriber::fmt()
        .with_timer(SimpleTime)
        .with_max_level(
            std::env::var("RUST_LOG")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(tracing::Level::INFO),
        )
        .compact()
        .with_target(false)
        .with_file(true)
        .with_line_number(true)
        .with_thread_ids(true)
        .init();
    PrometheusBuilder::new().install()?;

    let cli = consumer::parse();
    let mut conf = consumer::load_config()?;

    // Configure database connection pool for 30-worker database writer architecture
    // 30 workers + 15 headroom for burst traffic + other operations = 45 connections
    conf.database.pool = Some(deadpool_postgres::PoolConfig {
        max_size: 45,
        ..Default::default()
    });
    tracing::info!(
        "Database connection pool configured with max_size=45 for 30-worker database writer"
    );

    let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;

    // Note: DID/handle resolution is now handled by Tap

    let tracker = tokio_util::task::TaskTracker::new();
    let (stop_tx, stop) = tokio::sync::watch::channel(false);

    // Calculate retention cutoff from config if retention_days is set
    let _retention_cutoff = conf.retention_days.map(|days| {
        let cutoff = chrono::Utc::now() - chrono::Duration::days(i64::from(days));
        tracing::info!(
            retention_days = days,
            cutoff_date = %cutoff.format("%Y-%m-%d"),
            "Retention enabled: only processing records newer than cutoff"
        );
        cutoff
    });

    // Create shared database writer infrastructure if indexer mode is enabled
    let shared_database_writer = if cli.indexer {
        // Create channel for database writer (bounded to provide backpressure)
        // When channel fills up, workers will block, propagating backpressure
        let (tap_tx, tap_rx) =
            tokio::sync::mpsc::channel::<consumer::database_writer::WriterEvent>(10_000);

        // Spawn single database writer task
        let batch_events_processed = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
        let batch_operations_processed = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));

        // Create unified ID cache for fast DID→actor_id and URI→post_id lookups
        // Actor cache: 5min TTL, 50k capacity
        // Post cache: 10min TTL, 50k capacity
        let id_cache = parakeet_db::id_cache::IdCache::new();
        tracing::info!(
            "ID cache initialized (actors: 5min TTL/50k capacity, posts: 10min TTL/50k capacity)"
        );

        // Spawn database writer with Tap as primary source
        let database_writer_handle = consumer::spawn_database_writer_tap(
            pool.clone(),
            tap_rx,
            batch_events_processed.clone(),
            batch_operations_processed.clone(),
            id_cache,
            stop.clone(),
        );
        drop(tracker.spawn(database_writer_handle));

        // Create worker supervisor (shared for all supervised workers)
        let worker_supervisor = consumer::worker_core::WorkerSupervisor::new();

        // Note: Handle resolution is now managed by Tap
        // Worker supervisor manages its own tasks via TaskTracker
        // We don't need to explicitly track it in the main tracker

        Some((
            tap_tx,
            batch_events_processed,
            batch_operations_processed,
            worker_supervisor,
        ))
    } else {
        None
    };

    if cli.indexer {
        let indexer_cfg = conf
            .indexer
            .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?;

        // Get the shared infrastructure from the database writer setup
        let (
            tap_tx,
            batch_events_processed,
            batch_operations_processed,
            worker_supervisor,
        ) = shared_database_writer.as_ref().unwrap();

        // Create Tap configuration
        let tap_config = consumer::streaming::sources::tap::consumer::TapConfig {
            websocket_url: indexer_cfg.tap_websocket_url.clone(),
            admin_url: indexer_cfg.tap_admin_url.clone(),
            admin_password: indexer_cfg.tap_admin_password.clone(),
            max_pending_acks: indexer_cfg.max_pending_acks,
            reconnect_backoff_ms: 1000,
            reconnect_max_backoff_ms: 60000,
        };

        // Create Tap indexer factory and spawn with supervision
        let tap_indexer_factory = relay::TapIndexerFactory::new(
            pool.clone(),
            tap_config,
            indexer_cfg.workers,
            tap_tx.clone(),
            batch_events_processed.clone(),
            batch_operations_processed.clone(),
        );

        worker_supervisor.spawn(
            tap_indexer_factory,
            stop.clone(),
            consumer::worker_core::RestartPolicy::Always, // Critical worker - always restart
        );
    }

    // Wait for Control+C
    tokio::select! {
        _ = ctrl_c() => {
            tracing::info!("Received SIGINT (Control+C), initiating shutdown...");
        }
    }

    // Step 1: Signal all tasks to stop
    tracing::info!("Sending stop signal to all tasks...");
    stop_tx.send(true).unwrap();

    // Step 2: Drop the shared database writer senders to close the channels
    // This must happen BEFORE we wait, so tasks can see the channels close
    // Tasks will drop their clones as they exit, and once all are dropped,
    // the database writer will drain and exit
    if let Some((
        tap_tx,
        _batch_events_processed,
        _batch_operations_processed,
        _worker_supervisor,
    )) = shared_database_writer
    {
        tracing::info!("Closing database writer channels...");
        drop(tap_tx);
    }

    // Step 3: Close the tracker to prevent new tasks
    tracker.close();

    // Step 4: Wait for all tasks to complete
    // Tasks should respect the stop signal and exit cleanly
    // The database writer will drain its queue before exiting
    tracing::info!("Waiting for all tasks to complete...");
    tracker.wait().await;

    tracing::info!("Consumer shutdown complete");
    Ok(())
}
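The `stop.clone()` handles passed to the database writer and the worker supervisor are tokio watch receivers, and shutdown is coordinated in the order signal → close tracker → wait. Below is a minimal standalone sketch of that pattern using only tokio and tokio-util; the worker body and its 100 ms poll interval are illustrative assumptions, not taken from the consumer crate.

// Standalone sketch of the stop-signal + TaskTracker shutdown pattern used in main().
// The worker loop here is hypothetical; only tokio and tokio-util APIs are assumed.
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);

    tracker.spawn(async move {
        let mut stop = stop_rx;
        loop {
            tokio::select! {
                // Exit when the stop flag flips to true (or the sender is dropped).
                res = stop.changed() => {
                    if res.is_err() || *stop.borrow() {
                        break;
                    }
                }
                // Placeholder for a unit of real work.
                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {}
            }
        }
    });

    // Same order as the real shutdown path: signal, close the tracker, wait.
    stop_tx.send(true).unwrap();
    tracker.close();
    tracker.wait().await;
    println!("all workers drained");
}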