Highly ambitious ATProtocol AppView service and sdks
138
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 0ca7304524bed6a8d5c490bd23137f28bb082167 139 lines 4.6 kB view raw
1use sqlx::PgPool; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicU64, Ordering}; 4use std::time::{Duration, Instant}; 5use tokio::sync::Mutex; 6use tracing::{debug, warn}; 7 8/// Handles persistence of Jetstream cursor position to Postgres 9/// 10/// The cursor tracks the last processed event's time_us to enable resumption 11/// after disconnections or restarts. Writes are debounced to reduce DB load. 12pub struct PostgresCursorHandler { 13 pool: PgPool, 14 cursor_id: String, 15 last_time_us: Arc<AtomicU64>, 16 last_write: Arc<Mutex<Instant>>, 17 write_interval: Duration, 18} 19 20impl PostgresCursorHandler { 21 /// Create a new cursor handler 22 /// 23 /// # Arguments 24 /// * `pool` - Database connection pool 25 /// * `cursor_id` - Unique identifier for this cursor (e.g., "default", "instance-1") 26 /// * `write_interval_secs` - Minimum seconds between cursor writes to reduce DB load 27 pub fn new(pool: PgPool, cursor_id: String, write_interval_secs: u64) -> Self { 28 Self { 29 pool, 30 cursor_id, 31 last_time_us: Arc::new(AtomicU64::new(0)), 32 last_write: Arc::new(Mutex::new(Instant::now())), 33 write_interval: Duration::from_secs(write_interval_secs), 34 } 35 } 36 37 /// Update the in-memory cursor position from an event's time_us 38 /// 39 /// This is called for every event but only writes to DB at intervals 40 pub fn update_position(&self, time_us: u64) { 41 self.last_time_us.store(time_us, Ordering::Relaxed); 42 } 43 44 /// Conditionally write cursor to Postgres if interval has elapsed 45 /// 46 /// This implements debouncing to avoid excessive DB writes while ensuring 47 /// we don't lose more than write_interval seconds of progress on restart 48 pub async fn maybe_write_cursor(&self) -> anyhow::Result<()> { 49 let current_time_us = self.last_time_us.load(Ordering::Relaxed); 50 if current_time_us == 0 { 51 return Ok(()); 52 } 53 54 let mut last_write = self.last_write.lock().await; 55 if last_write.elapsed() >= self.write_interval { 56 sqlx::query!( 57 r#" 58 INSERT INTO jetstream_cursor (id, time_us, updated_at) 59 VALUES ($1, $2, NOW()) 60 ON CONFLICT (id) 61 DO UPDATE SET time_us = $2, updated_at = NOW() 62 "#, 63 self.cursor_id, 64 current_time_us as i64 65 ) 66 .execute(&self.pool) 67 .await?; 68 69 *last_write = Instant::now(); 70 debug!( 71 cursor = current_time_us, 72 cursor_id = %self.cursor_id, 73 "Updated jetstream cursor in Postgres" 74 ); 75 } 76 Ok(()) 77 } 78 79 /// Force immediate write of cursor to Postgres, bypassing interval check 80 /// 81 /// Used during graceful shutdown to ensure latest position is persisted 82 pub async fn force_write_cursor(&self) -> anyhow::Result<()> { 83 let current_time_us = self.last_time_us.load(Ordering::Relaxed); 84 if current_time_us == 0 { 85 return Ok(()); 86 } 87 88 sqlx::query!( 89 r#" 90 INSERT INTO jetstream_cursor (id, time_us, updated_at) 91 VALUES ($1, $2, NOW()) 92 ON CONFLICT (id) 93 DO UPDATE SET time_us = $2, updated_at = NOW() 94 "#, 95 self.cursor_id, 96 current_time_us as i64 97 ) 98 .execute(&self.pool) 99 .await?; 100 101 let mut last_write = self.last_write.lock().await; 102 *last_write = Instant::now(); 103 104 debug!( 105 cursor = current_time_us, 106 cursor_id = %self.cursor_id, 107 "Force wrote jetstream cursor to Postgres" 108 ); 109 Ok(()) 110 } 111 112 /// Read the last persisted cursor position from Postgres 113 /// 114 /// Returns None if no cursor exists or cursor is 0 (indicating fresh start) 115 /// This should be called on startup to resume from last position 116 pub async fn read_cursor(pool: &PgPool, cursor_id: &str) -> Option<i64> { 117 match sqlx::query!( 118 r#" 119 SELECT time_us 120 FROM jetstream_cursor 121 WHERE id = $1 122 "#, 123 cursor_id 124 ) 125 .fetch_optional(pool) 126 .await 127 { 128 Ok(Some(row)) => { 129 let time_us = row.time_us; 130 if time_us > 0 { Some(time_us) } else { None } 131 } 132 Ok(None) => None, 133 Err(e) => { 134 warn!(error = ?e, "Failed to read cursor from Postgres"); 135 None 136 } 137 } 138 } 139}