use consumer::indexing as relay;
use deadpool_postgres::Runtime;
use eyre::OptionExt as _;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::signal::ctrl_c;
use tokio_postgres::NoTls;

/// Custom time formatter for tracing - shows only HH:MM:SS in local time
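///
/// Produces timestamps like `14:03:27`. A minimal usage sketch, mirroring the
/// subscriber setup in `main` below:
///
/// ```ignore
/// tracing_subscriber::fmt().with_timer(SimpleTime).init();
/// ```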
struct SimpleTime;

impl tracing_subscriber::fmt::time::FormatTime for SimpleTime {
    fn format_time(&self, w: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
        let now = chrono::Local::now();
        write!(w, "{}", now.format("%H:%M:%S"))
    }
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Install color-eyre for beautiful error reports
    color_eyre::install()?;

    // Initialize tracing with structured, concise output
    // RUST_LOG env var controls verbosity (default: info)
    tracing_subscriber::fmt()
        .with_timer(SimpleTime)
        .with_max_level(
            std::env::var("RUST_LOG")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(tracing::Level::INFO),
        )
        .compact()
        .with_target(false)
        .with_file(true)
        .with_line_number(true)
        .with_thread_ids(true)
        .init();
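    // Expose Prometheus metrics: `install()` registers the global metrics recorder and
    // spawns the scrape endpoint (by default on 0.0.0.0:9000) on this Tokio runtime.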
    PrometheusBuilder::new().install()?;

    let cli = consumer::parse();
    let mut conf = consumer::load_config()?;

    // Configure the connection pool for the 30-worker database writer architecture:
    // 30 workers + 15 connections of headroom for burst traffic and other operations = 45
    conf.database.pool = Some(deadpool_postgres::PoolConfig {
        max_size: 45,
        ..Default::default()
    });
    tracing::info!(
        "Database connection pool configured with max_size=45 for 30-worker database writer"
    );

    let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
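    // deadpool opens connections lazily as they are requested, so max_size is an
    // upper bound rather than 45 connections held open from startup.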

    // Note: DID/handle resolution is now handled by Tap

    let tracker = tokio_util::task::TaskTracker::new();
    let (stop_tx, stop) = tokio::sync::watch::channel(false);
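    // Shutdown is cooperative: every worker holds a clone of `stop` and is expected
    // to select over its work and the signal, along the lines of:
    //     tokio::select! {
    //         _ = stop.changed() => break,
    //         work = rx.recv() => { /* handle */ }
    //     }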

    // Calculate retention cutoff from config if retention_days is set
    let _retention_cutoff = conf.retention_days.map(|days| {
        let cutoff = chrono::Utc::now() - chrono::Duration::days(i64::from(days));
        tracing::info!(
            retention_days = days,
            cutoff_date = %cutoff.format("%Y-%m-%d"),
            "Retention enabled: only processing records newer than cutoff"
        );
        cutoff
    });
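    // The cutoff is not consumed in main (hence the `_` prefix); the log line above
    // is currently its only observable effect here.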

    // Create shared database writer infrastructure if indexer mode is enabled
    let shared_database_writer = if cli.indexer {
        // Create a channel for the database writer (bounded to provide backpressure).
        // When the channel fills up, workers await on `send`, propagating backpressure upstream.
        let (tap_tx, tap_rx) =
            tokio::sync::mpsc::channel::<consumer::database_writer::WriterEvent>(10_000);

        // Spawn single database writer task
        let batch_events_processed = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
        let batch_operations_processed = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));

        // Create unified ID cache for fast DID→actor_id and URI→post_id lookups
        // Actor cache: 5min TTL, 50k capacity
        // Post cache: 10min TTL, 50k capacity
        let id_cache = parakeet_db::id_cache::IdCache::new();
        tracing::info!(
            "ID cache initialized (actors: 5min TTL/50k capacity, posts: 10min TTL/50k capacity)"
        );

        // Spawn the database writer with Tap as the primary source
        let database_writer_task = consumer::spawn_database_writer_tap(
            pool.clone(),
            tap_rx,
            batch_events_processed.clone(),
            batch_operations_processed.clone(),
            id_cache,
            stop.clone(),
        );
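        // `drop` discards the JoinHandle; the TaskTracker keeps tracking the task until
        // it completes, so `tracker.wait()` below still covers the database writer.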
        drop(tracker.spawn(database_writer_task));

        // Create worker supervisor (shared for all supervised workers)
        let worker_supervisor = consumer::worker_core::WorkerSupervisor::new();

        // Note: handle resolution is now managed by Tap.
        // The worker supervisor manages its own tasks via a TaskTracker,
        // so we don't need to track it explicitly in the main tracker.

        Some((
            tap_tx,
            batch_events_processed,
            batch_operations_processed,
            worker_supervisor,
        ))
    } else {
        None
    };
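    // `shared_database_writer` is `Some` exactly when `cli.indexer` is set; the
    // indexer block below relies on that invariant.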

    if cli.indexer {
        let indexer_cfg = conf
            .indexer
            .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?;

        // Get the shared infrastructure from the database writer setup
        let (
            tap_tx,
            batch_events_processed,
            batch_operations_processed,
            worker_supervisor,
        ) = shared_database_writer
            .as_ref()
            .expect("shared database writer is created whenever --indexer is set");

        // Create Tap configuration
        let tap_config = consumer::streaming::sources::tap::consumer::TapConfig {
            websocket_url: indexer_cfg.tap_websocket_url.clone(),
            admin_url: indexer_cfg.tap_admin_url.clone(),
            admin_password: indexer_cfg.tap_admin_password.clone(),
            max_pending_acks: indexer_cfg.max_pending_acks,
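            // Reconnect backoff starts at 1s and is capped at 60s between attempts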
            reconnect_backoff_ms: 1_000,
            reconnect_max_backoff_ms: 60_000,
        };

        // Create the Tap indexer factory and spawn it with supervision
        let tap_indexer_factory = relay::TapIndexerFactory::new(
            pool.clone(),
            tap_config,
            indexer_cfg.workers,
            tap_tx.clone(),
            batch_events_processed.clone(),
            batch_operations_processed.clone(),
        );

        worker_supervisor.spawn(
            tap_indexer_factory,
            stop.clone(),
            consumer::worker_core::RestartPolicy::Always, // Critical worker - always restart
        );
    }

    // Wait for Control+C (SIGINT)
    if let Err(error) = ctrl_c().await {
        tracing::warn!(%error, "Failed to listen for Control+C; shutting down immediately");
    } else {
        tracing::info!("Received SIGINT (Control+C), initiating shutdown...");
    }

    // Step 1: Signal all tasks to stop.
    // The `stop` receiver created above is still alive in this scope, so this cannot fail.
    tracing::info!("Sending stop signal to all tasks...");
    stop_tx.send(true).expect("stop channel receiver dropped unexpectedly");

    // Step 2: Drop the shared database writer senders to close the channels.
    // This must happen BEFORE we wait, so tasks can see the channels close.
    // Tasks will drop their clones as they exit, and once all are dropped,
    // the database writer will drain and exit.
    if let Some((
        tap_tx,
        _batch_events_processed,
        _batch_operations_processed,
        _worker_supervisor,
    )) = shared_database_writer
    {
        tracing::info!("Closing database writer channels...");
        drop(tap_tx);
    }

    // Step 3: Close the tracker so `wait()` can complete once all tracked tasks
    // have finished (closing does not cancel tasks that are still running)
    tracker.close();

    // Step 4: Wait for all tasks to complete.
    // Tasks should respect the stop signal and exit cleanly;
    // the database writer will drain its queue before exiting.
    tracing::info!("Waiting for all tasks to complete...");
    tracker.wait().await;

    tracing::info!("Consumer shutdown complete");
    Ok(())
}