A Rust implementation of skywatch-phash

feat: Add support for Jetstream and PLC fallback URLs

This commit adds support for fallback URLs for both Jetstream and PLC,
providing resilience when a primary server is unavailable. It also adds
retry logic for Jetstream connections, using exponential backoff capped
at a configurable maximum delay.

Skywatch c1ea582b 48e85261

Changed files
+95 -20
src
config
jetstream
+35 -7
src/config/mod.rs
··· 18 18 #[derive(Debug, Clone)] 19 19 pub struct JetstreamConfig { 20 20 pub url: String, 21 + pub fallback_urls: Vec<String>, 21 22 pub wanted_collections: Vec<String>, 22 23 pub cursor_update_interval: u64, 24 + pub retry_delay_secs: u64, 25 + pub max_retry_delay_secs: u64, 23 26 } 24 27 25 28 #[derive(Debug, Clone)] ··· 48 51 #[derive(Debug, Clone)] 49 52 pub struct PlcConfig { 50 53 pub endpoint: String, 54 + pub fallback_endpoints: Vec<String>, 51 55 } 52 56 53 57 #[derive(Debug, Clone)] ··· 85 89 "JETSTREAM_URL", 86 90 Some("wss://jetstream.atproto.tools/subscribe"), 87 91 )?, 92 + fallback_urls: get_env_list( 93 + "JETSTREAM_FALLBACK_URLS", 94 + vec![ 95 + "wss://jetstream1.us-east.fire.hose.cam/subscribe".to_string(), 96 + "wss://jetstream2.us-east.fire.hose.cam/subscribe".to_string(), 97 + ], 98 + ), 88 99 wanted_collections: vec!["app.bsky.feed.post".to_string()], 89 100 cursor_update_interval: get_env_u64("CURSOR_UPDATE_INTERVAL", 10_000), 101 + retry_delay_secs: get_env_u64("JETSTREAM_RETRY_DELAY_SECS", 5), 102 + max_retry_delay_secs: get_env_u64("JETSTREAM_MAX_RETRY_DELAY_SECS", 300), 90 103 }, 91 104 redis: RedisConfig { 92 105 url: get_env("REDIS_URL", Some("redis://localhost:6379"))?, ··· 105 118 }, 106 119 plc: PlcConfig { 107 120 endpoint: get_env("PLC_ENDPOINT", Some("https://plc.directory"))?, 121 + fallback_endpoints: get_env_list( 122 + "PLC_FALLBACK_ENDPOINTS", 123 + vec!["https://plc.directory".to_string()], 124 + ), 108 125 }, 109 126 automod: AutomodConfig { 110 127 handle: get_env("AUTOMOD_HANDLE", None) ··· 129 146 130 147 /// Get environment variable with optional default 131 148 fn get_env(key: &str, default: Option<&str>) -> Result<String> { 132 - env::var(key) 133 - .into_diagnostic() 134 - .or_else(|_| { 135 - default 136 - .ok_or_else(|| miette::miette!("Missing required environment variable: {}", key)) 137 - .map(String::from) 138 - }) 149 + env::var(key).into_diagnostic().or_else(|_| { 150 + default 151 + .ok_or_else(|| 
miette::miette!("Missing required environment variable: {}", key)) 152 + .map(String::from) 153 + }) 139 154 } 140 155 141 156 /// Get environment variable as u32 with default ··· 170 185 .map(|v| { 171 186 let v = v.to_lowercase(); 172 187 v == "true" || v == "1" || v == "yes" 188 + }) 189 + .unwrap_or(default) 190 + } 191 + 192 + /// Get environment variable as comma-separated list with default 193 + fn get_env_list(key: &str, default: Vec<String>) -> Vec<String> { 194 + env::var(key) 195 + .ok() 196 + .map(|v| { 197 + v.split(',') 198 + .map(|s| s.trim().to_string()) 199 + .filter(|s| !s.is_empty()) 200 + .collect() 173 201 }) 174 202 .unwrap_or(default) 175 203 }
+1
src/jetstream/events.rs
··· 91 91 #[cfg(test)] 92 92 mod tests { 93 93 use super::*; 94 + use jacquard_common::types::string::AtprotoStr; 94 95 use jacquard_common::types::value::{Array, Object}; 95 96 use jacquard_common::CowStr; 96 97 use std::collections::BTreeMap;
+2 -2
src/jetstream/mod.rs
··· 28 28 pub async fn subscribe( 29 29 self, 30 30 job_sender: mpsc::UnboundedSender<ImageJob>, 31 - mut shutdown_rx: tokio::sync::oneshot::Receiver<()>, 31 + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, 32 32 ) -> Result<()> { 33 33 info!("Connecting to Jetstream: {}", self.url); 34 34 ··· 109 109 } 110 110 } 111 111 } 112 - _ = &mut shutdown_rx => { 112 + _ = shutdown_rx.recv() => { 113 113 info!("Shutting down Jetstream client"); 114 114 info!("Processed {} total messages", message_count); 115 115
+3
src/lib.rs
··· 23 23 // Metrics 24 24 pub mod metrics; 25 25 26 + // PLC Directory client 27 + pub mod plc; 28 + 26 29 // Re-export commonly used types 27 30 pub use config::Config; 28 31 pub use types::{BlobCheck, BlobReference, ImageJob, MatchResult};
+54 -11
src/main.rs
··· 80 80 81 81 // Create shutdown channels 82 82 let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); 83 - let (jetstream_shutdown_tx, jetstream_shutdown_rx) = tokio::sync::oneshot::channel::<()>(); 83 + let (jetstream_shutdown_tx, _jetstream_shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); 84 84 85 85 // Create job channel for jetstream -> queue 86 86 let (job_tx, mut job_rx) = mpsc::unbounded_channel(); ··· 91 91 info!("Resuming from cursor: {}", c); 92 92 } 93 93 94 - // Start jetstream subscriber 95 - info!("Starting Jetstream subscriber..."); 94 + // Start jetstream subscriber with retry logic 95 + info!("Starting Jetstream subscriber with auto-retry..."); 96 96 let jetstream_config = config.clone(); 97 - let jetstream_handle = tokio::spawn(async move { 98 - let jetstream = JetstreamClient::new(jetstream_config.jetstream.url.clone(), cursor) 99 - .expect("Failed to create Jetstream client"); 97 + let jetstream_shutdown_broadcast = jetstream_shutdown_tx.clone(); 98 + let _jetstream_handle = tokio::spawn(async move { 99 + let mut retry_delay = jetstream_config.jetstream.retry_delay_secs; 100 + let max_retry_delay = jetstream_config.jetstream.max_retry_delay_secs; 101 + let mut url_index = 0; 102 + 103 + // Build list of URLs to try (primary + fallbacks) 104 + let mut urls = vec![jetstream_config.jetstream.url.clone()]; 105 + urls.extend(jetstream_config.jetstream.fallback_urls.clone()); 106 + 107 + info!("Jetstream URLs configured: {} total (1 primary + {} fallbacks)", 108 + urls.len(), urls.len() - 1); 109 + 110 + loop { 111 + let current_url = &urls[url_index]; 112 + let cursor = skywatch_phash_rs::jetstream::cursor::read_cursor(); 113 + 114 + info!("Attempting Jetstream connection to: {} (retry delay: {}s)", current_url, retry_delay); 100 115 101 - if let Err(e) = jetstream.subscribe(job_tx, jetstream_shutdown_rx).await { 102 - error!("Jetstream subscriber failed: {}", e); 116 + let jetstream = match 
JetstreamClient::new(current_url.clone(), cursor) { 117 + Ok(client) => client, 118 + Err(e) => { 119 + error!("Failed to create Jetstream client: {}", e); 120 + tokio::time::sleep(Duration::from_secs(retry_delay)).await; 121 + retry_delay = (retry_delay * 2).min(max_retry_delay); 122 + url_index = (url_index + 1) % urls.len(); 123 + continue; 124 + } 125 + }; 126 + 127 + // Create new shutdown receiver for this attempt 128 + let shutdown_rx = jetstream_shutdown_broadcast.subscribe(); 129 + 130 + match jetstream.subscribe(job_tx.clone(), shutdown_rx).await { 131 + Ok(_) => { 132 + info!("Jetstream subscriber completed normally (shutdown received)"); 133 + break; 134 + } 135 + Err(e) => { 136 + error!("Jetstream connection failed: {} - Retrying in {}s", e, retry_delay); 137 + tokio::time::sleep(Duration::from_secs(retry_delay)).await; 138 + 139 + // Exponential backoff with max cap 140 + retry_delay = (retry_delay * 2).min(max_retry_delay); 141 + 142 + // Rotate to next URL 143 + url_index = (url_index + 1) % urls.len(); 144 + if url_index == 0 { 145 + info!("Tried all Jetstream URLs, cycling back to primary"); 146 + } 147 + } 148 + } 103 149 } 104 150 }); 105 151 ··· 185 231 } 186 232 _ = all_workers_future => { 187 233 info!("All workers completed"); 188 - } 189 - _ = jetstream_handle => { 190 - info!("Jetstream subscriber completed"); 191 234 } 192 235 } 193 236