use sqlx::PgPool; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use tokio::sync::Mutex; use tracing::{debug, warn}; /// Handles persistence of Jetstream cursor position to Postgres /// /// The cursor tracks the last processed event's time_us to enable resumption /// after disconnections or restarts. Writes are debounced to reduce DB load. pub struct PostgresCursorHandler { pool: PgPool, cursor_id: String, last_time_us: Arc, last_write: Arc>, write_interval: Duration, } impl PostgresCursorHandler { /// Create a new cursor handler /// /// # Arguments /// * `pool` - Database connection pool /// * `cursor_id` - Unique identifier for this cursor (e.g., "default", "instance-1") /// * `write_interval_secs` - Minimum seconds between cursor writes to reduce DB load pub fn new(pool: PgPool, cursor_id: String, write_interval_secs: u64) -> Self { Self { pool, cursor_id, last_time_us: Arc::new(AtomicU64::new(0)), last_write: Arc::new(Mutex::new(Instant::now())), write_interval: Duration::from_secs(write_interval_secs), } } /// Update the in-memory cursor position from an event's time_us /// /// This is called for every event but only writes to DB at intervals pub fn update_position(&self, time_us: u64) { self.last_time_us.store(time_us, Ordering::Relaxed); } /// Conditionally write cursor to Postgres if interval has elapsed /// /// This implements debouncing to avoid excessive DB writes while ensuring /// we don't lose more than write_interval seconds of progress on restart pub async fn maybe_write_cursor(&self) -> anyhow::Result<()> { let current_time_us = self.last_time_us.load(Ordering::Relaxed); if current_time_us == 0 { return Ok(()); } let mut last_write = self.last_write.lock().await; if last_write.elapsed() >= self.write_interval { sqlx::query!( r#" INSERT INTO jetstream_cursor (id, time_us, updated_at) VALUES ($1, $2, NOW()) ON CONFLICT (id) DO UPDATE SET time_us = $2, updated_at = NOW() "#, self.cursor_id, current_time_us as i64 ) .execute(&self.pool) .await?; *last_write = Instant::now(); debug!( cursor = current_time_us, cursor_id = %self.cursor_id, "Updated jetstream cursor in Postgres" ); } Ok(()) } /// Force immediate write of cursor to Postgres, bypassing interval check /// /// Used during graceful shutdown to ensure latest position is persisted pub async fn force_write_cursor(&self) -> anyhow::Result<()> { let current_time_us = self.last_time_us.load(Ordering::Relaxed); if current_time_us == 0 { return Ok(()); } sqlx::query!( r#" INSERT INTO jetstream_cursor (id, time_us, updated_at) VALUES ($1, $2, NOW()) ON CONFLICT (id) DO UPDATE SET time_us = $2, updated_at = NOW() "#, self.cursor_id, current_time_us as i64 ) .execute(&self.pool) .await?; let mut last_write = self.last_write.lock().await; *last_write = Instant::now(); debug!( cursor = current_time_us, cursor_id = %self.cursor_id, "Force wrote jetstream cursor to Postgres" ); Ok(()) } /// Read the last persisted cursor position from Postgres /// /// Returns None if no cursor exists or cursor is 0 (indicating fresh start) /// This should be called on startup to resume from last position pub async fn read_cursor(pool: &PgPool, cursor_id: &str) -> Option { match sqlx::query!( r#" SELECT time_us FROM jetstream_cursor WHERE id = $1 "#, cursor_id ) .fetch_optional(pool) .await { Ok(Some(row)) => { let time_us = row.time_us; if time_us > 0 { Some(time_us) } else { None } } Ok(None) => None, Err(e) => { warn!(error = ?e, "Failed to read cursor from Postgres"); None } } } }