Highly ambitious ATProtocol AppView service and sdks
at main 4.6 kB view raw
1use sqlx::PgPool; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicU64, Ordering}; 4use std::time::{Duration, Instant}; 5use tokio::sync::Mutex; 6use tracing::{debug, warn}; 7 8/// Handles persistence of Jetstream cursor position to Postgres 9/// 10/// The cursor tracks the last processed event's time_us to enable resumption 11/// after disconnections or restarts. Writes are debounced to reduce DB load. 12pub struct PostgresCursorHandler { 13 pool: PgPool, 14 cursor_id: String, 15 last_time_us: Arc<AtomicU64>, 16 last_write: Arc<Mutex<Instant>>, 17 write_interval: Duration, 18} 19 20impl PostgresCursorHandler { 21 /// Create a new cursor handler 22 /// 23 /// # Arguments 24 /// * `pool` - Database connection pool 25 /// * `cursor_id` - Unique identifier for this cursor (e.g., "default", "instance-1") 26 /// * `write_interval_secs` - Minimum seconds between cursor writes to reduce DB load 27 pub fn new(pool: PgPool, cursor_id: String, write_interval_secs: u64) -> Self { 28 Self { 29 pool, 30 cursor_id, 31 last_time_us: Arc::new(AtomicU64::new(0)), 32 last_write: Arc::new(Mutex::new(Instant::now())), 33 write_interval: Duration::from_secs(write_interval_secs), 34 } 35 } 36 37 /// Update the in-memory cursor position from an event's time_us 38 /// 39 /// This is called for every event but only writes to DB at intervals 40 pub fn update_position(&self, time_us: u64) { 41 self.last_time_us.store(time_us, Ordering::Relaxed); 42 } 43 44 /// Conditionally write cursor to Postgres if interval has elapsed 45 /// 46 /// This implements debouncing to avoid excessive DB writes while ensuring 47 /// we don't lose more than write_interval seconds of progress on restart 48 pub async fn maybe_write_cursor(&self) -> anyhow::Result<()> { 49 let current_time_us = self.last_time_us.load(Ordering::Relaxed); 50 if current_time_us == 0 { 51 return Ok(()); 52 } 53 54 let mut last_write = self.last_write.lock().await; 55 if last_write.elapsed() >= self.write_interval { 56 sqlx::query!( 57 r#" 58 INSERT INTO jetstream_cursor (id, time_us, updated_at) 59 VALUES ($1, $2, NOW()) 60 ON CONFLICT (id) 61 DO UPDATE SET time_us = $2, updated_at = NOW() 62 "#, 63 self.cursor_id, 64 current_time_us as i64 65 ) 66 .execute(&self.pool) 67 .await?; 68 69 *last_write = Instant::now(); 70 debug!( 71 cursor = current_time_us, 72 cursor_id = %self.cursor_id, 73 "Updated jetstream cursor in Postgres" 74 ); 75 } 76 Ok(()) 77 } 78 79 /// Force immediate write of cursor to Postgres, bypassing interval check 80 /// 81 /// Used during graceful shutdown to ensure latest position is persisted 82 pub async fn force_write_cursor(&self) -> anyhow::Result<()> { 83 let current_time_us = self.last_time_us.load(Ordering::Relaxed); 84 if current_time_us == 0 { 85 return Ok(()); 86 } 87 88 sqlx::query!( 89 r#" 90 INSERT INTO jetstream_cursor (id, time_us, updated_at) 91 VALUES ($1, $2, NOW()) 92 ON CONFLICT (id) 93 DO UPDATE SET time_us = $2, updated_at = NOW() 94 "#, 95 self.cursor_id, 96 current_time_us as i64 97 ) 98 .execute(&self.pool) 99 .await?; 100 101 let mut last_write = self.last_write.lock().await; 102 *last_write = Instant::now(); 103 104 debug!( 105 cursor = current_time_us, 106 cursor_id = %self.cursor_id, 107 "Force wrote jetstream cursor to Postgres" 108 ); 109 Ok(()) 110 } 111 112 /// Read the last persisted cursor position from Postgres 113 /// 114 /// Returns None if no cursor exists or cursor is 0 (indicating fresh start) 115 /// This should be called on startup to resume from last position 116 pub async fn read_cursor(pool: &PgPool, cursor_id: &str) -> Option<i64> { 117 match sqlx::query!( 118 r#" 119 SELECT time_us 120 FROM jetstream_cursor 121 WHERE id = $1 122 "#, 123 cursor_id 124 ) 125 .fetch_optional(pool) 126 .await 127 { 128 Ok(Some(row)) => { 129 let time_us = row.time_us; 130 if time_us > 0 { Some(time_us) } else { None } 131 } 132 Ok(None) => None, 133 Err(e) => { 134 warn!(error = ?e, "Failed to read cursor from Postgres"); 135 None 136 } 137 } 138 } 139}