Highly ambitious ATProtocol AppView service and SDKs
1use sqlx::PgPool;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::{Duration, Instant};
5use tokio::sync::Mutex;
6use tracing::{debug, warn};
7
8/// Handles persistence of Jetstream cursor position to Postgres
9///
10/// The cursor tracks the last processed event's time_us to enable resumption
11/// after disconnections or restarts. Writes are debounced to reduce DB load.
12pub struct PostgresCursorHandler {
13 pool: PgPool,
14 cursor_id: String,
15 last_time_us: Arc<AtomicU64>,
16 last_write: Arc<Mutex<Instant>>,
17 write_interval: Duration,
18}
19
20impl PostgresCursorHandler {
21 /// Create a new cursor handler
22 ///
23 /// # Arguments
24 /// * `pool` - Database connection pool
25 /// * `cursor_id` - Unique identifier for this cursor (e.g., "default", "instance-1")
26 /// * `write_interval_secs` - Minimum seconds between cursor writes to reduce DB load
27 pub fn new(pool: PgPool, cursor_id: String, write_interval_secs: u64) -> Self {
28 Self {
29 pool,
30 cursor_id,
31 last_time_us: Arc::new(AtomicU64::new(0)),
32 last_write: Arc::new(Mutex::new(Instant::now())),
33 write_interval: Duration::from_secs(write_interval_secs),
34 }
35 }
36
37 /// Update the in-memory cursor position from an event's time_us
38 ///
39 /// This is called for every event but only writes to DB at intervals
40 pub fn update_position(&self, time_us: u64) {
41 self.last_time_us.store(time_us, Ordering::Relaxed);
42 }
43
44 /// Conditionally write cursor to Postgres if interval has elapsed
45 ///
46 /// This implements debouncing to avoid excessive DB writes while ensuring
47 /// we don't lose more than write_interval seconds of progress on restart
48 pub async fn maybe_write_cursor(&self) -> anyhow::Result<()> {
49 let current_time_us = self.last_time_us.load(Ordering::Relaxed);
50 if current_time_us == 0 {
51 return Ok(());
52 }
53
54 let mut last_write = self.last_write.lock().await;
55 if last_write.elapsed() >= self.write_interval {
56 sqlx::query!(
57 r#"
58 INSERT INTO jetstream_cursor (id, time_us, updated_at)
59 VALUES ($1, $2, NOW())
60 ON CONFLICT (id)
61 DO UPDATE SET time_us = $2, updated_at = NOW()
62 "#,
63 self.cursor_id,
64 current_time_us as i64
65 )
66 .execute(&self.pool)
67 .await?;
68
69 *last_write = Instant::now();
70 debug!(
71 cursor = current_time_us,
72 cursor_id = %self.cursor_id,
73 "Updated jetstream cursor in Postgres"
74 );
75 }
76 Ok(())
77 }
78
79 /// Force immediate write of cursor to Postgres, bypassing interval check
80 ///
81 /// Used during graceful shutdown to ensure latest position is persisted
82 pub async fn force_write_cursor(&self) -> anyhow::Result<()> {
83 let current_time_us = self.last_time_us.load(Ordering::Relaxed);
84 if current_time_us == 0 {
85 return Ok(());
86 }
87
88 sqlx::query!(
89 r#"
90 INSERT INTO jetstream_cursor (id, time_us, updated_at)
91 VALUES ($1, $2, NOW())
92 ON CONFLICT (id)
93 DO UPDATE SET time_us = $2, updated_at = NOW()
94 "#,
95 self.cursor_id,
96 current_time_us as i64
97 )
98 .execute(&self.pool)
99 .await?;
100
101 let mut last_write = self.last_write.lock().await;
102 *last_write = Instant::now();
103
104 debug!(
105 cursor = current_time_us,
106 cursor_id = %self.cursor_id,
107 "Force wrote jetstream cursor to Postgres"
108 );
109 Ok(())
110 }
111
112 /// Read the last persisted cursor position from Postgres
113 ///
114 /// Returns None if no cursor exists or cursor is 0 (indicating fresh start)
115 /// This should be called on startup to resume from last position
116 pub async fn read_cursor(pool: &PgPool, cursor_id: &str) -> Option<i64> {
117 match sqlx::query!(
118 r#"
119 SELECT time_us
120 FROM jetstream_cursor
121 WHERE id = $1
122 "#,
123 cursor_id
124 )
125 .fetch_optional(pool)
126 .await
127 {
128 Ok(Some(row)) => {
129 let time_us = row.time_us;
130 if time_us > 0 { Some(time_us) } else { None }
131 }
132 Ok(None) => None,
133 Err(e) => {
134 warn!(error = ?e, "Failed to read cursor from Postgres");
135 None
136 }
137 }
138 }
139}