+35
-7
src/config/mod.rs
+35
-7
src/config/mod.rs
···
18
18
/// Settings for the Jetstream subscriber connection and its reconnect policy.
#[derive(Debug, Clone)]
pub struct JetstreamConfig {
    /// Primary Jetstream URL; the subscriber tries this one first.
    pub url: String,
    /// Additional URLs rotated through, after the primary, when a connection
    /// attempt fails. Loaded from `JETSTREAM_FALLBACK_URLS` (comma-separated).
    pub fallback_urls: Vec<String>,
    /// Collection identifiers of interest.
    // NOTE(review): presumably used as the subscription filter — confirm
    // against the Jetstream client.
    pub wanted_collections: Vec<String>,
    /// Value of `CURSOR_UPDATE_INTERVAL`.
    // NOTE(review): unit is not visible here (messages vs. time) — confirm.
    pub cursor_update_interval: u64,
    /// Initial delay, in seconds, before retrying a failed connection.
    pub retry_delay_secs: u64,
    /// Upper bound, in seconds, for the doubling retry delay.
    pub max_retry_delay_secs: u64,
}
24
27
25
28
#[derive(Debug, Clone)]
···
48
51
/// Settings for reaching the PLC directory service.
#[derive(Debug, Clone)]
pub struct PlcConfig {
    /// Primary PLC endpoint, loaded from `PLC_ENDPOINT`.
    pub endpoint: String,
    /// Alternative endpoints, loaded from `PLC_FALLBACK_ENDPOINTS`
    /// (comma-separated).
    // NOTE(review): presumably tried when the primary endpoint fails — the
    // consumer of this field is not visible here; confirm.
    pub fallback_endpoints: Vec<String>,
}
52
56
53
57
#[derive(Debug, Clone)]
···
85
89
"JETSTREAM_URL",
86
90
Some("wss://jetstream.atproto.tools/subscribe"),
87
91
)?,
92
+
fallback_urls: get_env_list(
93
+
"JETSTREAM_FALLBACK_URLS",
94
+
vec![
95
+
"wss://jetstream1.us-east.fire.hose.cam/subscribe".to_string(),
96
+
"wss://jetstream2.us-east.fire.hose.cam/subscribe".to_string(),
97
+
],
98
+
),
88
99
wanted_collections: vec!["app.bsky.feed.post".to_string()],
89
100
cursor_update_interval: get_env_u64("CURSOR_UPDATE_INTERVAL", 10_000),
101
+
retry_delay_secs: get_env_u64("JETSTREAM_RETRY_DELAY_SECS", 5),
102
+
max_retry_delay_secs: get_env_u64("JETSTREAM_MAX_RETRY_DELAY_SECS", 300),
90
103
},
91
104
redis: RedisConfig {
92
105
url: get_env("REDIS_URL", Some("redis://localhost:6379"))?,
···
105
118
},
106
119
plc: PlcConfig {
107
120
endpoint: get_env("PLC_ENDPOINT", Some("https://plc.directory"))?,
121
+
fallback_endpoints: get_env_list(
122
+
"PLC_FALLBACK_ENDPOINTS",
123
+
vec!["https://plc.directory".to_string()],
124
+
),
108
125
},
109
126
automod: AutomodConfig {
110
127
handle: get_env("AUTOMOD_HANDLE", None)
···
129
146
130
147
/// Get environment variable with optional default
131
148
fn get_env(key: &str, default: Option<&str>) -> Result<String> {
132
-
env::var(key)
133
-
.into_diagnostic()
134
-
.or_else(|_| {
135
-
default
136
-
.ok_or_else(|| miette::miette!("Missing required environment variable: {}", key))
137
-
.map(String::from)
138
-
})
149
+
env::var(key).into_diagnostic().or_else(|_| {
150
+
default
151
+
.ok_or_else(|| miette::miette!("Missing required environment variable: {}", key))
152
+
.map(String::from)
153
+
})
139
154
}
140
155
141
156
/// Get environment variable as u32 with default
···
170
185
.map(|v| {
171
186
let v = v.to_lowercase();
172
187
v == "true" || v == "1" || v == "yes"
188
+
})
189
+
.unwrap_or(default)
190
+
}
191
+
192
+
/// Read the environment variable `key` as a comma-separated list.
///
/// Each entry is trimmed of surrounding whitespace and empty entries are
/// dropped. `default` is returned only when `env::var` fails for `key`; a
/// variable that is set but contains no non-empty entries yields an empty
/// list.
fn get_env_list(key: &str, default: Vec<String>) -> Vec<String> {
    match env::var(key) {
        Ok(raw) => raw
            .split(',')
            .map(str::trim)
            .filter(|entry| !entry.is_empty())
            .map(str::to_owned)
            .collect(),
        Err(_) => default,
    }
}
+1
src/jetstream/events.rs
+1
src/jetstream/events.rs
+2
-2
src/jetstream/mod.rs
+2
-2
src/jetstream/mod.rs
···
28
28
pub async fn subscribe(
29
29
self,
30
30
job_sender: mpsc::UnboundedSender<ImageJob>,
31
-
mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
31
+
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
32
32
) -> Result<()> {
33
33
info!("Connecting to Jetstream: {}", self.url);
34
34
···
109
109
}
110
110
}
111
111
}
112
-
_ = &mut shutdown_rx => {
112
+
_ = shutdown_rx.recv() => {
113
113
info!("Shutting down Jetstream client");
114
114
info!("Processed {} total messages", message_count);
115
115
+3
src/lib.rs
+3
src/lib.rs
+54
-11
src/main.rs
+54
-11
src/main.rs
···
80
80
81
81
// Create shutdown channels
82
82
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
83
-
let (jetstream_shutdown_tx, jetstream_shutdown_rx) = tokio::sync::oneshot::channel::<()>();
83
+
let (jetstream_shutdown_tx, _jetstream_shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
84
84
85
85
// Create job channel for jetstream -> queue
86
86
let (job_tx, mut job_rx) = mpsc::unbounded_channel();
···
91
91
info!("Resuming from cursor: {}", c);
92
92
}
93
93
94
-
// Start jetstream subscriber
95
-
info!("Starting Jetstream subscriber...");
94
+
// Start jetstream subscriber with retry logic
95
+
info!("Starting Jetstream subscriber with auto-retry...");
96
96
let jetstream_config = config.clone();
97
-
let jetstream_handle = tokio::spawn(async move {
98
-
let jetstream = JetstreamClient::new(jetstream_config.jetstream.url.clone(), cursor)
99
-
.expect("Failed to create Jetstream client");
97
+
let jetstream_shutdown_broadcast = jetstream_shutdown_tx.clone();
98
+
let _jetstream_handle = tokio::spawn(async move {
99
+
let mut retry_delay = jetstream_config.jetstream.retry_delay_secs;
100
+
let max_retry_delay = jetstream_config.jetstream.max_retry_delay_secs;
101
+
let mut url_index = 0;
102
+
103
+
// Build list of URLs to try (primary + fallbacks)
104
+
let mut urls = vec![jetstream_config.jetstream.url.clone()];
105
+
urls.extend(jetstream_config.jetstream.fallback_urls.clone());
106
+
107
+
info!("Jetstream URLs configured: {} total (1 primary + {} fallbacks)",
108
+
urls.len(), urls.len() - 1);
109
+
110
+
loop {
111
+
let current_url = &urls[url_index];
112
+
let cursor = skywatch_phash_rs::jetstream::cursor::read_cursor();
113
+
114
+
info!("Attempting Jetstream connection to: {} (retry delay: {}s)", current_url, retry_delay);
100
115
101
-
if let Err(e) = jetstream.subscribe(job_tx, jetstream_shutdown_rx).await {
102
-
error!("Jetstream subscriber failed: {}", e);
116
+
let jetstream = match JetstreamClient::new(current_url.clone(), cursor) {
117
+
Ok(client) => client,
118
+
Err(e) => {
119
+
error!("Failed to create Jetstream client: {}", e);
120
+
tokio::time::sleep(Duration::from_secs(retry_delay)).await;
121
+
retry_delay = (retry_delay * 2).min(max_retry_delay);
122
+
url_index = (url_index + 1) % urls.len();
123
+
continue;
124
+
}
125
+
};
126
+
127
+
// Create new shutdown receiver for this attempt
128
+
let shutdown_rx = jetstream_shutdown_broadcast.subscribe();
129
+
130
+
match jetstream.subscribe(job_tx.clone(), shutdown_rx).await {
131
+
Ok(_) => {
132
+
info!("Jetstream subscriber completed normally (shutdown received)");
133
+
break;
134
+
}
135
+
Err(e) => {
136
+
error!("Jetstream connection failed: {} - Retrying in {}s", e, retry_delay);
137
+
tokio::time::sleep(Duration::from_secs(retry_delay)).await;
138
+
139
+
// Exponential backoff with max cap
140
+
retry_delay = (retry_delay * 2).min(max_retry_delay);
141
+
142
+
// Rotate to next URL
143
+
url_index = (url_index + 1) % urls.len();
144
+
if url_index == 0 {
145
+
info!("Tried all Jetstream URLs, cycling back to primary");
146
+
}
147
+
}
148
+
}
103
149
}
104
150
});
105
151
···
185
231
}
186
232
_ = all_workers_future => {
187
233
info!("All workers completed");
188
-
}
189
-
_ = jetstream_handle => {
190
-
info!("Jetstream subscriber completed");
191
234
}
192
235
}
193
236