Highly ambitious ATProtocol AppView service and SDKs

add jetstream cursor tracking, update atproto libs to 0.13

+4 -1
api/.env.example
···
  # Authentication service base URL
  AUTH_BASE_URL=http://localhost:8081

- # AT Protocol relay endpoint for syncing data
+ # AT Protocol relay endpoint for backfill
  RELAY_ENDPOINT=https://relay1.us-west.bsky.network
+
+ # AT Protocol Jetstream hostname
+ JETSTREAM_HOSTNAME=jetstream2.us-west.bsky.network

  # System slice URI
  SYSTEM_SLICE_URI=at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z
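For context, a minimal sketch of how these variables are consumed at startup (mirroring the lookup in api/src/main.rs; the helper name is illustrative):

    use std::env;

    // Sketch: resolve the Jetstream hostname from the environment.
    // Returning None lets JetstreamConsumer::new fall back to its built-in
    // default ("jetstream1.us-east.bsky.network", per api/src/jetstream.rs).
    fn jetstream_hostname() -> Option<String> {
        env::var("JETSTREAM_HOSTNAME").ok()
    }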
+11 -10
api/Cargo.lock
···

  [[package]]
  name = "atproto-client"
- version = "0.11.2"
+ version = "0.13.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "188c4bae6a3260c4d57149e7061415d440422ef11d68a16f581422ff181a66d9"
+ checksum = "c34ed7ebeec01cd7775c1c7841838c142d123d403983ed7179b31850435b5c7c"
  dependencies = [
   "anyhow",
   "atproto-identity",
···

  [[package]]
  name = "atproto-identity"
- version = "0.11.2"
+ version = "0.13.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "f4bf47131d663bcb76feaeb9403c09e12f00e5a2e0a7d805bd8caf4c6fdf01fa"
+ checksum = "b956c07726fce812630be63c5cb31b1961cbb70f0a05614278523102d78c3a48"
  dependencies = [
   "anyhow",
   "async-trait",
···
   "thiserror 2.0.14",
   "tokio",
   "tracing",
+  "urlencoding",
  ]

  [[package]]
  name = "atproto-jetstream"
- version = "0.11.2"
+ version = "0.13.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "178b4af2b79ee11f25e69bca5c12907d0699843a4a847d737827dcff68a20af4"
+ checksum = "7b1897fb2f7c6d02d46f7b8d25d653c141cee4a68a10efd135d46201a95034db"
  dependencies = [
   "anyhow",
   "async-trait",
···

  [[package]]
  name = "atproto-oauth"
- version = "0.11.2"
+ version = "0.13.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "919d64f13696fb700ed604b09c526b223f6bf063eb35f46d691198079cfbc789"
+ checksum = "3ea205901c33d074a1b498591d0511bcd788b6772ec0ca6e09a92c4327ddbdff"
  dependencies = [
   "anyhow",
   "async-trait",
···

  [[package]]
  name = "atproto-record"
- version = "0.11.2"
+ version = "0.13.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "623f3eb1ba1e7b99903dc525f8b115ea8b6439b0d598507201f1f88aa6a37646"
+ checksum = "0550f74423ca745132dc07ba1cb01f2f08243a7bf7497f5c3f2185a774c92ca2"
  dependencies = [
   "anyhow",
   "atproto-identity",
+4 -4
api/Cargo.toml
···
  async-trait = "0.1"

  # AT Protocol client
- atproto-client = "0.11.2"
- atproto-identity = "0.11.2"
- atproto-oauth = "0.11.2"
- atproto-jetstream = "0.11.2"
+ atproto-client = "0.13.0"
+ atproto-identity = "0.13.0"
+ atproto-oauth = "0.13.0"
+ atproto-jetstream = "0.13.0"


  # Middleware for HTTP requests with retry logic
+13
api/migrations/011_jetstream_cursor.sql
···
+ -- Add jetstream cursor table for tracking event processing position
+ CREATE TABLE IF NOT EXISTS jetstream_cursor (
+     id TEXT PRIMARY KEY DEFAULT 'default',
+     time_us BIGINT NOT NULL,
+     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+ );
+
+ -- Index for tracking cursor freshness
+ CREATE INDEX idx_jetstream_cursor_updated_at ON jetstream_cursor(updated_at);
+
+ -- Insert default cursor starting at 0 (will be updated when events are processed)
+ INSERT INTO jetstream_cursor (id, time_us) VALUES ('default', 0)
+ ON CONFLICT (id) DO NOTHING;
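The indexed updated_at column makes cursor staleness cheap to monitor. A sketch of a health-check query via sqlx (the function name and its use are illustrative, not part of this commit):

    use sqlx::PgPool;

    // Sketch: seconds since the cursor row was last written.
    // Uses idx_jetstream_cursor_updated_at added by this migration.
    async fn cursor_age_seconds(pool: &PgPool) -> anyhow::Result<Option<f64>> {
        let age = sqlx::query_scalar::<_, f64>(
            "SELECT EXTRACT(EPOCH FROM (NOW() - updated_at))::float8
             FROM jetstream_cursor WHERE id = 'default'",
        )
        .fetch_optional(pool)
        .await?;
        Ok(age) // None means the row is missing, i.e. the migration has not run
    }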
+3 -3
api/src/api/xrpc_dynamic.rs
···
          validate: false,
      };

-     let result = create_record(&http_client, &dpop_auth, &pds_url, create_request)
+     let result = create_record(&http_client, &atproto_client::client::Auth::DPoP(dpop_auth), &pds_url, create_request)
          .await
          .map_err(|_e| status_to_error_response(StatusCode::INTERNAL_SERVER_ERROR))?;
···
          validate: false,
      };

-     let result = put_record(&http_client, &dpop_auth, &pds_url, put_request)
+     let result = put_record(&http_client, &atproto_client::client::Auth::DPoP(dpop_auth), &pds_url, put_request)
          .await
          .map_err(|_| status_to_error_response(StatusCode::INTERNAL_SERVER_ERROR))?;
···
          swap_commit: None,
      };

-     delete_record(&http_client, &dpop_auth, &pds_url, delete_request)
+     delete_record(&http_client, &atproto_client::client::Auth::DPoP(dpop_auth), &pds_url, delete_request)
          .await
          .map_err(|_| status_to_error_response(StatusCode::INTERNAL_SERVER_ERROR))?;
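These three call sites change because atproto-client 0.13 takes an Auth enum where 0.11 took a bare DPoPAuth. If the inline wrapping gets noisy, a tiny helper can centralize it; a sketch (the dpop helper is ours, not part of the crate):

    use atproto_client::client::{Auth, DPoPAuth};

    // Sketch: wrap the DPoP credentials once, then pass &auth to
    // create_record / put_record / delete_record.
    fn dpop(auth: DPoPAuth) -> Auth {
        Auth::DPoP(auth)
    }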
+9 -18
api/src/atproto_extensions.rs
···

  use serde::{Deserialize, Serialize};
  use atproto_client::client::DPoPAuth;
- use atproto_client::url::URLBuilder;
  use thiserror::Error;
  use atproto_oauth::dpop::{DpopRetry, request_dpop};
  use reqwest_middleware::ClientBuilder;
···
      UploadFailed { status: u16, message: String },
  }

- /// Request for uploading a blob
- #[derive(Serialize, Deserialize, Debug)]
- pub struct UploadBlobRequest {
-     // Note: For blob uploads, the data is sent as the request body, not JSON
-     // So this struct is mainly for reference - we'll handle the actual upload differently
- }

  /// Response from blob upload
  #[cfg_attr(debug_assertions, derive(Debug))]
···
      blob_data: Vec<u8>,
      mime_type: &str,
  ) -> Result<UploadBlobResponse, BlobUploadError> {
-     // Build the URL
-     let mut url_builder = URLBuilder::new(base_url);
-     url_builder.path("/xrpc/com.atproto.repo.uploadBlob");
-     let url = url_builder.build();
-
+     // Build the URL using standard string formatting
+     let url = format!("{}/xrpc/com.atproto.repo.uploadBlob", base_url.trim_end_matches('/'));
+
      // For blob uploads, we need to use a different approach than post_dpop_json
      // since we're sending binary data, not JSON
      // We need to use the same DPoP mechanism but with binary body
-
+
      // Use the internal post_dpop function but for binary data
      post_dpop_binary(http_client, dpop_auth, &url, blob_data, mime_type)
          .await
···
      if !http_response.status().is_success() {
          let status = http_response.status();
          let error_text = http_response.text().await.unwrap_or_else(|_| "unknown".to_string());
-         return Err(BlobUploadError::UploadFailed {
-             status: status.as_u16(),
-             message: error_text
+         return Err(BlobUploadError::UploadFailed {
+             status: status.as_u16(),
+             message: error_text
          });
      }

      let value = http_response
          .json::<serde_json::Value>()
          .await
          .map_err(|e| BlobUploadError::HttpRequest(e.into()))?;
-
+
      Ok(value)
- }
+ }
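Dropping URLBuilder for plain string formatting is behavior-preserving; trim_end_matches keeps the path correct when the PDS base URL carries a trailing slash. A quick illustration (the hostname is illustrative):

    // Sketch: the new URL construction tolerates a trailing slash on the base URL.
    fn upload_url(base_url: &str) -> String {
        format!("{}/xrpc/com.atproto.repo.uploadBlob", base_url.trim_end_matches('/'))
    }

    #[test]
    fn trailing_slash_is_normalized() {
        assert_eq!(
            upload_url("https://pds.example.com/"),
            "https://pds.example.com/xrpc/com.atproto.repo.uploadBlob"
        );
    }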
+55 -20
api/src/jetstream.rs
···

  use crate::actor_resolver::resolve_actor_data;
  use crate::database::Database;
+ use crate::jetstream_cursor::PostgresCursorHandler;
  use crate::models::{Record, Actor};
  use crate::errors::SliceError;
  use crate::logging::{Logger, LogLevel};
···
      consumer: Consumer,
      database: Database,
      http_client: Client,
-     // Track which collections we should index for each slice
      slice_collections: Arc<RwLock<HashMap<String, HashSet<String>>>>,
-     // Track domains for each slice (slice_uri -> domain)
      slice_domains: Arc<RwLock<HashMap<String, String>>>,
-     // Cache for actor lookups
      actor_cache: Arc<RwLock<HashMap<(String, String), bool>>>,
-     // Lexicon cache for each slice
      slice_lexicons: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
-     // Event counter for health monitoring
      pub event_count: Arc<std::sync::atomic::AtomicU64>,
+     cursor_handler: Option<Arc<PostgresCursorHandler>>,
  }

  // Event handler that implements the EventHandler trait
···
      slice_collections: Arc<RwLock<HashMap<String, HashSet<String>>>>,
      slice_domains: Arc<RwLock<HashMap<String, String>>>,
      event_count: Arc<std::sync::atomic::AtomicU64>,
-     // Cache for (did, slice_uri) -> is_actor lookups
      actor_cache: Arc<RwLock<HashMap<(String, String), bool>>>,
-     // Lexicon cache for each slice
      slice_lexicons: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
+     cursor_handler: Option<Arc<PostgresCursorHandler>>,
  }

  #[async_trait]
  impl EventHandler for SliceEventHandler {
      async fn handle_event(&self, event: JetstreamEvent) -> Result<()> {
-         // Increment event counter
          let count = self.event_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
-
-         // Log every 10000 events to show activity (console only, not in DB)
-         if count % 10000 == 0 {
+
+         if count.is_multiple_of(10000) {
              info!("Jetstream consumer has processed {} events", count);
          }
-
+
+         // Extract and update cursor position from event
+         let time_us = match &event {
+             JetstreamEvent::Commit { time_us, .. } => *time_us,
+             JetstreamEvent::Delete { time_us, .. } => *time_us,
+             JetstreamEvent::Identity { time_us, .. } => *time_us,
+             JetstreamEvent::Account { time_us, .. } => *time_us,
+         };
+
+         if let Some(cursor_handler) = &self.cursor_handler {
+             cursor_handler.update_position(time_us);
+
+             // Periodically write cursor to DB (debounced by handler)
+             if let Err(e) = cursor_handler.maybe_write_cursor().await {
+                 error!("Failed to write cursor: {}", e);
+             }
+         }
+
          match event {
              JetstreamEvent::Commit { did, commit, .. } => {
                  if let Err(e) = self.handle_commit_event(&did, commit).await {
···
  }

  impl JetstreamConsumer {
-     pub async fn new(database: Database, jetstream_hostname: Option<String>) -> Result<Self, SliceError> {
+     /// Create a new Jetstream consumer with optional cursor support
+     ///
+     /// # Arguments
+     /// * `database` - Database connection for slice configurations and record storage
+     /// * `jetstream_hostname` - Optional custom jetstream hostname
+     /// * `cursor_handler` - Optional cursor handler for resumable event processing
+     /// * `initial_cursor` - Optional starting cursor position (time_us) to resume from
+     pub async fn new(
+         database: Database,
+         jetstream_hostname: Option<String>,
+         cursor_handler: Option<Arc<PostgresCursorHandler>>,
+         initial_cursor: Option<i64>,
+     ) -> Result<Self, SliceError> {
          let config = ConsumerTaskConfig {
              user_agent: "slice-server/1.0".to_string(),
              compression: false,
              zstd_dictionary_location: String::new(),
              jetstream_hostname: jetstream_hostname
                  .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()),
-             collections: Vec::new(), // We'll update this dynamically based on slice configs
-             dids: Vec::new(), // Subscribe to all DIDs
+             collections: Vec::new(),
+             dids: Vec::new(),
              max_message_size_bytes: None,
-             cursor: None,
-             require_hello: true, // Match official example - enables proper handshake
+             cursor: initial_cursor,
+             require_hello: true,
          };

          let consumer = Consumer::new(config);
···
              actor_cache: Arc::new(RwLock::new(HashMap::new())),
              slice_lexicons: Arc::new(RwLock::new(HashMap::new())),
              event_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
+             cursor_handler,
          })
      }
···
              event_count: self.event_count.clone(),
              actor_cache: self.actor_cache.clone(),
              slice_lexicons: self.slice_lexicons.clone(),
+             cursor_handler: self.cursor_handler.clone(),
          });

          self.consumer.register_handler(handler).await
···
          // Start the consumer
          info!("Starting Jetstream background consumer...");
-         self.consumer.run_background(cancellation_token).await
+         let result = self.consumer.run_background(cancellation_token).await
              .map_err(|e| SliceError::JetstreamError {
                  message: format!("Consumer failed: {}", e),
-             })?;
-
+             });
+
+         // Force write cursor on shutdown to ensure latest position is persisted
+         if let Some(cursor_handler) = &self.cursor_handler {
+             if let Err(e) = cursor_handler.force_write_cursor().await {
+                 error!("Failed to write final cursor position: {}", e);
+             } else {
+                 info!("Final cursor position written to database");
+             }
+         }
+
+         result?;
          Ok(())
      }
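Taken together, the new four-argument constructor wires up as below; a sketch of a call site (crate-local types PostgresCursorHandler, JetstreamConsumer, Database, and SliceError are assumed to be in scope, as in api/src/main.rs further down):

    use std::sync::Arc;
    use sqlx::PgPool;

    // Sketch: build a consumer that resumes from the persisted cursor,
    // or from the live tail when no cursor row exists yet.
    async fn build_consumer(
        pool: PgPool,
        database: Database,
    ) -> Result<JetstreamConsumer, SliceError> {
        let initial_cursor = PostgresCursorHandler::read_cursor(&pool, "default").await;
        let cursor_handler = Arc::new(PostgresCursorHandler::new(
            pool,
            "default".to_string(),
            5, // persist the cursor at most every 5 seconds
        ));
        JetstreamConsumer::new(
            database,
            None, // None => default jetstream hostname
            Some(cursor_handler),
            initial_cursor, // None => no cursor sent, consume from latest
        )
        .await
    }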
+143
api/src/jetstream_cursor.rs
···
+ use sqlx::PgPool;
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicU64, Ordering};
+ use std::time::{Duration, Instant};
+ use tokio::sync::Mutex;
+ use tracing::{debug, warn};
+
+ /// Handles persistence of Jetstream cursor position to Postgres
+ ///
+ /// The cursor tracks the last processed event's time_us to enable resumption
+ /// after disconnections or restarts. Writes are debounced to reduce DB load.
+ pub struct PostgresCursorHandler {
+     pool: PgPool,
+     cursor_id: String,
+     last_time_us: Arc<AtomicU64>,
+     last_write: Arc<Mutex<Instant>>,
+     write_interval: Duration,
+ }
+
+ impl PostgresCursorHandler {
+     /// Create a new cursor handler
+     ///
+     /// # Arguments
+     /// * `pool` - Database connection pool
+     /// * `cursor_id` - Unique identifier for this cursor (e.g., "default", "instance-1")
+     /// * `write_interval_secs` - Minimum seconds between cursor writes to reduce DB load
+     pub fn new(pool: PgPool, cursor_id: String, write_interval_secs: u64) -> Self {
+         Self {
+             pool,
+             cursor_id,
+             last_time_us: Arc::new(AtomicU64::new(0)),
+             last_write: Arc::new(Mutex::new(Instant::now())),
+             write_interval: Duration::from_secs(write_interval_secs),
+         }
+     }
+
+     /// Update the in-memory cursor position from an event's time_us
+     ///
+     /// This is called for every event but only writes to DB at intervals
+     pub fn update_position(&self, time_us: u64) {
+         self.last_time_us.store(time_us, Ordering::Relaxed);
+     }
+
+     /// Conditionally write cursor to Postgres if interval has elapsed
+     ///
+     /// This implements debouncing to avoid excessive DB writes while ensuring
+     /// we don't lose more than write_interval seconds of progress on restart
+     pub async fn maybe_write_cursor(&self) -> anyhow::Result<()> {
+         let current_time_us = self.last_time_us.load(Ordering::Relaxed);
+         if current_time_us == 0 {
+             return Ok(());
+         }
+
+         let mut last_write = self.last_write.lock().await;
+         if last_write.elapsed() >= self.write_interval {
+             sqlx::query!(
+                 r#"
+                 INSERT INTO jetstream_cursor (id, time_us, updated_at)
+                 VALUES ($1, $2, NOW())
+                 ON CONFLICT (id)
+                 DO UPDATE SET time_us = $2, updated_at = NOW()
+                 "#,
+                 self.cursor_id,
+                 current_time_us as i64
+             )
+             .execute(&self.pool)
+             .await?;
+
+             *last_write = Instant::now();
+             debug!(
+                 cursor = current_time_us,
+                 cursor_id = %self.cursor_id,
+                 "Updated jetstream cursor in Postgres"
+             );
+         }
+         Ok(())
+     }
+
+     /// Force immediate write of cursor to Postgres, bypassing interval check
+     ///
+     /// Used during graceful shutdown to ensure latest position is persisted
+     pub async fn force_write_cursor(&self) -> anyhow::Result<()> {
+         let current_time_us = self.last_time_us.load(Ordering::Relaxed);
+         if current_time_us == 0 {
+             return Ok(());
+         }
+
+         sqlx::query!(
+             r#"
+             INSERT INTO jetstream_cursor (id, time_us, updated_at)
+             VALUES ($1, $2, NOW())
+             ON CONFLICT (id)
+             DO UPDATE SET time_us = $2, updated_at = NOW()
+             "#,
+             self.cursor_id,
+             current_time_us as i64
+         )
+         .execute(&self.pool)
+         .await?;
+
+         let mut last_write = self.last_write.lock().await;
+         *last_write = Instant::now();
+
+         debug!(
+             cursor = current_time_us,
+             cursor_id = %self.cursor_id,
+             "Force wrote jetstream cursor to Postgres"
+         );
+         Ok(())
+     }
+
+     /// Read the last persisted cursor position from Postgres
+     ///
+     /// Returns None if no cursor exists or cursor is 0 (indicating fresh start)
+     /// This should be called on startup to resume from last position
+     pub async fn read_cursor(pool: &PgPool, cursor_id: &str) -> Option<i64> {
+         match sqlx::query!(
+             r#"
+             SELECT time_us
+             FROM jetstream_cursor
+             WHERE id = $1
+             "#,
+             cursor_id
+         )
+         .fetch_optional(pool)
+         .await
+         {
+             Ok(Some(row)) => {
+                 let time_us = row.time_us;
+                 if time_us > 0 {
+                     Some(time_us)
+                 } else {
+                     None
+                 }
+             }
+             Ok(None) => None,
+             Err(e) => {
+                 warn!(error = ?e, "Failed to read cursor from Postgres");
+                 None
+             }
+         }
+     }
+ }
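The handler's contract, sketched as a worker loop: update_position is a relaxed atomic store and safe to call per event, while maybe_write_cursor touches Postgres at most once per write_interval, so a crash replays at most that many seconds of events. The loop and names below are illustrative:

    use std::sync::Arc;

    // Sketch: the intended per-event call pattern.
    async fn track(handler: Arc<PostgresCursorHandler>, times_us: Vec<u64>) -> anyhow::Result<()> {
        for time_us in times_us {
            handler.update_position(time_us);    // cheap: relaxed atomic store
            handler.maybe_write_cursor().await?; // no-op until the interval elapses
        }
        handler.force_write_cursor().await       // persist the final position on shutdown
    }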
+86 -106
api/src/main.rs
···
  mod database;
  mod errors;
  mod jetstream;
+ mod jetstream_cursor;
  mod jobs;
  mod logging;
  mod models;
···
  use crate::database::Database;
  use crate::errors::AppError;
  use crate::jetstream::JetstreamConsumer;
- use crate::logging::{LogLevel, Logger, start_log_cleanup_task};
+ use crate::jetstream_cursor::PostgresCursorHandler;
+ use crate::logging::{Logger, start_log_cleanup_task};

  #[derive(Clone)]
  pub struct Config {
···
      // Create shared jetstream connection status
      let jetstream_connected = Arc::new(AtomicBool::new(false));

-     // Start Jetstream consumer for real-time indexing with automatic recovery
+     // Start Jetstream consumer with cursor persistence and improved reconnection logic
      let database_for_jetstream = database.clone();
+     let pool_for_jetstream = pool.clone();
      let jetstream_connected_clone = jetstream_connected.clone();
      tokio::spawn(async move {
-         let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok(); // Optional, will use default if not set
+         let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok();
+         let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS")
+             .unwrap_or_else(|_| "5".to_string())
+             .parse::<u64>()
+             .unwrap_or(5);

-         // Create initial consumer to start configuration reloader (only once)
-         let initial_consumer = match JetstreamConsumer::new(
-             database_for_jetstream.clone(),
-             jetstream_hostname.clone(),
-         )
-         .await
-         {
-             Ok(consumer) => {
-                 let consumer_arc = std::sync::Arc::new(consumer);
-                 // Start configuration reloader ONCE - it will run independently
-                 JetstreamConsumer::start_configuration_reloader(consumer_arc.clone());
-                 Some(consumer_arc)
+         // Reconnection rate limiting (5 retries per minute max)
+         const MAX_RECONNECTS_PER_MINUTE: u32 = 5;
+         const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60);
+         let mut reconnect_count = 0u32;
+         let mut window_start = std::time::Instant::now();
+
+         let mut retry_delay = tokio::time::Duration::from_secs(5);
+         const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300);
+
+         // Configuration reloader setup (run once)
+         let mut config_reloader_started = false;
+
+         loop {
+             // Rate limiting: reset counter if window has passed
+             let now = std::time::Instant::now();
+             if now.duration_since(window_start) >= RECONNECT_WINDOW {
+                 reconnect_count = 0;
+                 window_start = now;
              }
-             Err(e) => {
-                 tracing::error!("Failed to create initial Jetstream consumer: {}", e);
-                 None
+
+             // Check rate limit
+             if reconnect_count >= MAX_RECONNECTS_PER_MINUTE {
+                 let wait_time = RECONNECT_WINDOW - now.duration_since(window_start);
+                 tracing::warn!(
+                     "Rate limit exceeded: {} reconnects in last minute, waiting {:?}",
+                     reconnect_count, wait_time
+                 );
+                 tokio::time::sleep(wait_time).await;
+                 continue;
              }
-         };

-         // Retry loop for Jetstream consumer with exponential backoff
-         let mut retry_delay = tokio::time::Duration::from_secs(5); // Start with 5 seconds
-         const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300); // Cap at 5 minutes
-         let mut current_consumer = initial_consumer;
+             reconnect_count += 1;

-         loop {
-             tracing::info!("Starting Jetstream consumer...");
-             Logger::global().log_jetstream(
-                 LogLevel::Info,
-                 "Starting Jetstream consumer",
-                 Some(serde_json::json!({"action": "starting_consumer"})),
-             );
+             // Read cursor position from database
+             let initial_cursor = PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await;
+             if let Some(cursor) = initial_cursor {
+                 tracing::info!("Resuming from cursor position: {}", cursor);
+             } else {
+                 tracing::info!("No cursor found, starting from latest events");
+             }

-             // Use existing consumer or create new one
-             let consumer_arc = match current_consumer.take() {
-                 Some(existing) => existing,
-                 None => {
-                     match JetstreamConsumer::new(
-                         database_for_jetstream.clone(),
-                         jetstream_hostname.clone(),
-                     )
-                     .await
-                     {
-                         Ok(consumer) => std::sync::Arc::new(consumer),
-                         Err(e) => {
-                             let message = format!(
-                                 "Failed to create Jetstream consumer: {} - will retry in {:?}",
-                                 e, retry_delay
-                             );
-                             tracing::error!("{}", message);
-                             Logger::global().log_jetstream(
-                                 LogLevel::Error,
-                                 &message,
-                                 Some(serde_json::json!({
-                                     "error": e.to_string(),
-                                     "retry_delay_secs": retry_delay.as_secs(),
-                                     "action": "consumer_creation_failed"
-                                 })),
-                             );
-                             jetstream_connected_clone
-                                 .store(false, std::sync::atomic::Ordering::Relaxed);
+             // Create cursor handler
+             let cursor_handler = Arc::new(PostgresCursorHandler::new(
+                 pool_for_jetstream.clone(),
+                 "default".to_string(),
+                 cursor_write_interval,
+             ));

-                             // Wait before retrying
-                             tokio::time::sleep(retry_delay).await;
-                             retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
-                             continue;
-                         }
+             // Create consumer with cursor support
+             let consumer_result = JetstreamConsumer::new(
+                 database_for_jetstream.clone(),
+                 jetstream_hostname.clone(),
+                 Some(cursor_handler.clone()),
+                 initial_cursor,
+             ).await;
+
+             let consumer_arc = match consumer_result {
+                 Ok(consumer) => {
+                     let arc = Arc::new(consumer);
+
+                     // Start configuration reloader only once
+                     if !config_reloader_started {
+                         JetstreamConsumer::start_configuration_reloader(arc.clone());
+                         config_reloader_started = true;
                      }
+
+                     arc
+                 }
+                 Err(e) => {
+                     tracing::error!("Failed to create Jetstream consumer: {} - retry in {:?}", e, retry_delay);
+                     jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
+                     tokio::time::sleep(retry_delay).await;
+                     retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
+                     continue;
                  }
              };

-             // Reset retry delay on successful connection
+             // Reset retry delay on successful creation
              retry_delay = tokio::time::Duration::from_secs(5);

-             // Mark as connected when consumer starts successfully
+             tracing::info!("Starting Jetstream consumer with cursor support...");
              jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed);
-             tracing::info!("Jetstream consumer connected successfully");
-             Logger::global().log_jetstream(
-                 LogLevel::Info,
-                 "Jetstream consumer connected",
-                 Some(serde_json::json!({
-                     "action": "consumer_connected"
-                 })),
-             );

-             // Start consuming events
+             // Start consuming with cancellation token
              let cancellation_token = atproto_jetstream::CancellationToken::new();
              match consumer_arc.start_consuming(cancellation_token).await {
-                 Err(e) => {
-                     tracing::error!(
-                         "Jetstream consumer disconnected: {} - will retry in {:?}",
-                         e,
-                         retry_delay
-                     );
-                     Logger::global().log_jetstream(
-                         LogLevel::Error,
-                         &format!("Jetstream consumer disconnected: {}", e),
-                         Some(serde_json::json!({
-                             "error": e.to_string(),
-                             "retry_delay_secs": retry_delay.as_secs(),
-                             "action": "consumer_disconnected"
-                         })),
-                     );
-                     // Mark as disconnected on failure
+                 Ok(_) => {
+                     tracing::info!("Jetstream consumer shut down normally");
                      jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
                  }
-                 Ok(_) => {
-                     tracing::info!("Jetstream consumer closed normally");
-                     Logger::global().log_jetstream(
-                         LogLevel::Info,
-                         "Jetstream consumer closed normally",
-                         Some(serde_json::json!({
-                             "action": "consumer_closed"
-                         })),
-                     );
-                     // This shouldn't happen in normal operation since start_consuming should run indefinitely
+                 Err(e) => {
+                     tracing::error!("Jetstream consumer failed: {} - will reconnect", e);
                      jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
+                     tokio::time::sleep(retry_delay).await;
+                     retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                  }
              }
-
-             // Wait before retrying with exponential backoff
-             tokio::time::sleep(retry_delay).await;
-
-             // Increase retry delay, but cap it at MAX_RETRY_DELAY
-             retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
          }
      });
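The loop layers two protections: exponential backoff (5 s doubling to a 300 s cap) for consecutive failures, plus a hard ceiling of five reconnects per rolling minute. That window logic could be factored out; a sketch under the same constants (the ReconnectGuard type is ours, not part of this commit):

    use std::time::{Duration, Instant};

    // Sketch: at most `max` attempts per `window`, mirroring
    // MAX_RECONNECTS_PER_MINUTE / RECONNECT_WINDOW in main.rs.
    struct ReconnectGuard {
        max: u32,
        window: Duration,
        count: u32,
        window_start: Instant,
    }

    impl ReconnectGuard {
        fn new(max: u32, window: Duration) -> Self {
            Self { max, window, count: 0, window_start: Instant::now() }
        }

        /// Returns how long to wait before the next attempt is allowed,
        /// or None if the attempt may proceed immediately.
        fn check(&mut self) -> Option<Duration> {
            let now = Instant::now();
            if now.duration_since(self.window_start) >= self.window {
                self.count = 0;
                self.window_start = now;
            }
            if self.count >= self.max {
                return Some(self.window - now.duration_since(self.window_start));
            }
            self.count += 1;
            None
        }
    }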