A Gleam WebSocket consumer for AT Protocol Jetstream events.

feat: add automatic retry logic to start_consumer (v1.1.0)

- merge retry configuration into JetstreamConfig
- add max_backoff_seconds, log_connection_events, log_retry_attempts fields
- start_consumer now always retries with exponential backoff
- distinguish between harmless timeouts and real connection failures
- update documentation with integrated retry options
- update example and tests
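
The backoff schedule in the bullets above can be sketched as a small pure function. This is an illustration under stated assumptions, not the code in this change: `backoff_seconds` is a hypothetical helper, and it treats negative retry counts as attempt 0.

```gleam
import gleam/int
import gleam/io
import gleam/list
import gleam/string

/// Hypothetical helper (not part of goose): seconds to wait before the
/// retry with 0-based index `retry_count`. Doubles from 1s up to 32s,
/// then falls back to `max_seconds`; the result never exceeds `max_seconds`.
pub fn backoff_seconds(retry_count: Int, max_seconds: Int) -> Int {
  case retry_count {
    n if n <= 0 -> int.min(1, max_seconds)
    n if n >= 6 -> max_seconds
    n -> int.min(2 * backoff_seconds(n - 1, max_seconds), max_seconds)
  }
}

pub fn main() {
  // Waits for the first eight attempts with a 60-second cap.
  [0, 1, 2, 3, 4, 5, 6, 7]
  |> list.map(backoff_seconds(_, 60))
  |> string.inspect
  |> io.println
  // prints [1, 2, 4, 8, 16, 32, 60, 60]
}
```

With a smaller cap such as 10, the same function yields 1, 2, 4, 8 and then 10 for every later attempt.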

+34
CHANGELOG.md
···
+ # Changelog
+
+ All notable changes to this project will be documented in this file.
+
+ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+ ## [1.1.0] - 2025-01-29
+
+ ### Added
+ - Automatic retry logic with exponential backoff for all connections
+ - Three new configuration fields in `JetstreamConfig`:
+   - `max_backoff_seconds: Int` - Maximum wait time between retries (default: 60)
+   - `log_connection_events: Bool` - Log connection state changes (default: True)
+   - `log_retry_attempts: Bool` - Log detailed retry information (default: True)
+
+ ### Changed
+ - `start_consumer()` now automatically retries failed connections and handles disconnections
+ - Enhanced error handling distinguishes between harmless timeouts and real connection failures
+
+ ### Fixed
+ - Connection failures no longer cause application to stop
+ - Harmless 60-second timeouts no longer trigger unnecessary reconnections
+ - WebSocket disconnections are handled gracefully with automatic reconnection
+
+ ## [1.0.0] - 2024-10-28
+
+ ### Added
+ - Initial release
+ - WebSocket consumer for AT Protocol Jetstream events
+ - Support for collection and DID filtering
+ - Zstd compression support
+ - Cursor-based replay
+ - Event parsing for Commit, Identity, and Account events
+67 -15
README.md
···
  gleam add goose@1
  ```

- ## Example
+ ## Quick Start

  ```gleam
  import goose
  import gleam/io
- import gleam/option

  pub fn main() {
-   // Create a default configuration
+   // Use default config with automatic retry logic
    let config = goose.default_config()

-   // Or configure with custom options
-   let config = goose.JetstreamConfig(
-     endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
-     wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
-     wanted_dids: [],
-     cursor: option.None,
-     max_message_size_bytes: option.None,
-     compress: False,
-     require_hello: False,
-   )
-
-   // Start consuming events
    goose.start_consumer(config, fn(json_event) {
      let event = goose.parse_event(json_event)

···
  }
  ```

+ ### Custom Configuration
+
+ ```gleam
+ import goose
+ import gleam/option
+
+ pub fn main() {
+   let config = goose.JetstreamConfig(
+     endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
+     wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
+     wanted_dids: [],
+     cursor: option.None,
+     max_message_size_bytes: option.None,
+     compress: False,
+     require_hello: False,
+     // Retry configuration
+     max_backoff_seconds: 60,  // Max wait between retries
+     log_connection_events: True,  // Log connects/disconnects
+     log_retry_attempts: False,  // Skip verbose retry logs
+   )
+
+   goose.start_consumer(config, handle_event)
+ }
+ ```
+
  ## Configuration Options
+
+ **Note:** Goose automatically handles connection failures with exponential backoff retry logic (1s, 2s, 4s, 8s, 16s, 32s, up to max). All connections automatically retry on failure, reconnect on disconnection, and distinguish between harmless timeouts and real errors.

  ### `wanted_collections`
  An array of Collection NSIDs to filter which records you receive (default: empty = all collections)
···
  require_hello: True
  ```

+ ### `max_backoff_seconds`
+ Maximum wait time in seconds between retry attempts
+
+ - Uses exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, then capped at this value
+ - Default: `60`
+
+ **Example:**
+ ```gleam
+ max_backoff_seconds: 120  // Allow up to 2 minute waits between retries
+ ```
+
+ ### `log_connection_events`
+ Whether to log connection state changes (connected, disconnected)
+
+ - Set to `True` to log important connection events
+ - Default: `True`
+ - Recommended: `True` for production (know when disconnects happen)
+
+ **Example:**
+ ```gleam
+ log_connection_events: True
+ ```
+
+ ### `log_retry_attempts`
+ Whether to log detailed retry attempt information
+
+ - Set to `True` to log attempt numbers, errors, and backoff times
+ - Default: `True`
+ - Recommended: `False` for production (reduces log noise)
+
+ **Example:**
+ ```gleam
+ log_retry_attempts: False  // Production: skip verbose retry logs
+ ```
+
  ## Full Configuration Example

  ```gleam
···
      max_message_size_bytes: option.Some(2097152), // 2MB
      compress: True,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: False,
    )

    goose.start_consumer(config, handle_event)
+4 -1
example/src/example.gleam
···
      max_message_size_bytes: option.None,
      compress: True,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )

    io.println("Starting Jetstream consumer...")
    io.println("Connected to: " <> config.endpoint)
    io.println("Listening for all events...\n")

-   // Start consuming and log all events
+   // Start consuming and log all events (automatically retries on failure)
    goose.start_consumer(config, fn(json_event) {
      let event = goose.parse_event(json_event)

+1 -1
gleam.toml
···
  name = "goose"
- version = "1.0.0"
+ version = "1.1.0"
  description = "A Gleam WebSocket consumer for AT Protocol Jetstream events"
  licences = ["Apache-2.0"]
  repository = { type = "github", user = "bigmoves", repo = "goose" }
+131 -12
src/goose.gleam
···
  import gleam/dynamic.{type Dynamic}
  import gleam/dynamic/decode
+ import gleam/erlang/atom
  import gleam/erlang/process.{type Pid}
+ import gleam/int
  import gleam/io
  import gleam/json
  import gleam/list
···
      max_message_size_bytes: Option(Int),
      compress: Bool,
      require_hello: Bool,
+     /// Maximum backoff time in seconds for retry logic (default: 60)
+     max_backoff_seconds: Int,
+     /// Whether to log connection events (connected, disconnected) (default: True)
+     log_connection_events: Bool,
+     /// Whether to log retry attempts and errors (default: True)
+     log_retry_attempts: Bool,
    )
  }

  /// Create a default configuration for US East endpoint
+ /// Includes automatic retry with exponential backoff (1s, 2s, 4s, 8s, 16s, 32s, capped at 60s)
  pub fn default_config() -> JetstreamConfig {
    JetstreamConfig(
      endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
···
      max_message_size_bytes: option.None,
      compress: False,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
  }

···
    compress: Bool,
  ) -> Result(Pid, Dynamic)

- /// Start consuming the Jetstream feed
+ /// Start consuming the Jetstream feed with automatic retry logic
+ ///
+ /// Handles connection failures gracefully with exponential backoff and automatic reconnection.
+ /// The retry behavior is configured through the JetstreamConfig fields:
+ /// - max_backoff_seconds: Maximum wait time between retries
+ /// - log_connection_events: Log connects/disconnects
+ /// - log_retry_attempts: Log retry attempts and errors
+ ///
+ /// Example:
+ /// ```gleam
+ /// let config = goose.default_config()
+ ///
+ /// goose.start_consumer(config, fn(event_json) {
+ ///   // Handle event
+ ///   io.println(event_json)
+ /// })
+ /// ```
  pub fn start_consumer(
    config: JetstreamConfig,
    on_event: fn(String) -> Nil,
  ) -> Nil {
+   start_with_retry_internal(config, on_event, 0)
+ }
+
+ /// Internal function to handle connection with retry
+ fn start_with_retry_internal(
+   config: JetstreamConfig,
+   on_event: fn(String) -> Nil,
+   retry_count: Int,
+ ) -> Nil {
    let url = build_url(config)
    let self = process.self()
    let result = connect(url, self, config.compress)

    case result {
      Ok(_conn_pid) -> {
-       receive_loop(on_event)
+       case config.log_connection_events {
+         True -> io.println("Connected to Jetstream successfully")
+         False -> Nil
+       }
+       // Start receiving with retry support
+       receive_with_retry(config, on_event)
      }
      Error(err) -> {
-       io.println("Failed to connect to Jetstream")
-       io.println_error(string.inspect(err))
+       // Connection failed, calculate backoff and retry
+       let backoff_seconds = calculate_backoff(retry_count, config)
+       case config.log_retry_attempts {
+         True -> {
+           io.println(
+             "Failed to connect to Jetstream (attempt "
+             <> int.to_string(retry_count + 1)
+             <> "): "
+             <> string.inspect(err),
+           )
+           io.println(
+             "Retrying in " <> int.to_string(backoff_seconds) <> " seconds...",
+           )
+         }
+         False -> Nil
+       }
+
+       // Sleep for backoff period
+       process.sleep(backoff_seconds * 1000)
+
+       // Retry connection
+       start_with_retry_internal(config, on_event, retry_count + 1)
      }
    }
  }

- /// Receive loop for WebSocket messages
- fn receive_loop(on_event: fn(String) -> Nil) -> Nil {
-   // Call Erlang to receive one message
+ /// Calculate exponential backoff with configurable maximum
+ fn calculate_backoff(retry_count: Int, config: JetstreamConfig) -> Int {
+   let backoff = case retry_count {
+     0 -> 1
+     1 -> 2
+     2 -> 4
+     3 -> 8
+     4 -> 16
+     5 -> 32
+     _ -> config.max_backoff_seconds
+   }
+
+   // Cap at max_backoff_seconds
+   case backoff > config.max_backoff_seconds {
+     True -> config.max_backoff_seconds
+     False -> backoff
+   }
+ }
+
+ /// Receive messages with retry logic
+ fn receive_with_retry(
+   config: JetstreamConfig,
+   on_event: fn(String) -> Nil,
+ ) -> Nil {
    case receive_ws_message() {
      Ok(text) -> {
        on_event(text)
-       receive_loop(on_event)
+       receive_with_retry(config, on_event)
      }
-     Error(_) -> {
-       // Timeout or error, continue loop
-       receive_loop(on_event)
+     Error(error_dynamic) -> {
+       // Decode error type
+       let atm = atom.cast_from_dynamic(error_dynamic)
+       let error_type = atom.to_string(atm)
+
+       case error_type {
+         "timeout" -> {
+           // No messages in 60s, connection is alive - continue
+           receive_with_retry(config, on_event)
+         }
+         "closed" -> {
+           // Connection closed - log if configured
+           case config.log_connection_events {
+             True -> io.println("Jetstream connection closed, reconnecting...")
+             False -> Nil
+           }
+           start_with_retry_internal(config, on_event, 0)
+         }
+         "connection_error" -> {
+           // Connection error - log if configured
+           case config.log_connection_events {
+             True -> io.println("Jetstream connection error, reconnecting...")
+             False -> Nil
+           }
+           start_with_retry_internal(config, on_event, 0)
+         }
+         _ -> {
+           // Unknown error - log if retry logging is enabled
+           case config.log_retry_attempts {
+             True -> {
+               io.println("Unknown Jetstream error: " <> error_type)
+               io.println("Reconnecting...")
+             }
+             False -> Nil
+           }
+           start_with_retry_internal(config, on_event, 0)
+         }
+       }
      }
    }
  }

  /// Receive a WebSocket message from the message queue
+ /// Returns Ok(text) for messages, or Error with one of: timeout, closed, connection_error
  @external(erlang, "goose_ffi", "receive_ws_message")
- fn receive_ws_message() -> Result(String, Nil)
+ fn receive_ws_message() -> Result(String, Dynamic)

  /// Parse a JSON event string into a JetstreamEvent
  pub fn parse_event(json_string: String) -> JetstreamEvent {
+4 -4
src/goose_ffi.erl
···
          %% Ignore binary messages, try again
          receive_ws_message();
      {ws_closed, _Reason} ->
-         {error, nil};
+         {error, closed};
      {ws_error, _Reason} ->
-         {error, nil};
+         {error, connection_error};
      _Other ->
          %% Ignore unexpected messages
          receive_ws_message()
  after 60000 ->
-     %% Timeout - return error to continue loop
-     {error, nil}
+     %% Timeout - connection is still alive, just no messages
+     {error, timeout}
  end.
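
The three reason atoms returned by the FFI (`timeout`, `closed`, `connection_error`) could also be given a typed representation on the Gleam side. The sketch below is one possible shape, not what this change does: `ReceiveError`, `classify`, and `should_reconnect` are hypothetical names, and the atom is assumed to have already been converted to a `String`.

```gleam
import gleam/io

/// Hypothetical typed view of the reason atoms returned by the FFI.
pub type ReceiveError {
  Timeout
  Closed
  ConnectionError
  Unknown(String)
}

/// Map an atom name (already converted to a String) to a ReceiveError.
pub fn classify(reason: String) -> ReceiveError {
  case reason {
    "timeout" -> Timeout
    "closed" -> Closed
    "connection_error" -> ConnectionError
    other -> Unknown(other)
  }
}

/// A timeout means the socket is still alive; every other reason reconnects.
pub fn should_reconnect(error: ReceiveError) -> Bool {
  case error {
    Timeout -> False
    Closed | ConnectionError | Unknown(_) -> True
  }
}

pub fn main() {
  case should_reconnect(classify("timeout")) {
    True -> io.println("reconnect")
    False -> io.println("keep waiting")
  }
  // prints keep waiting
}
```

A custom type lets the compiler check that every reason is handled, which string matching alone does not.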
+24
test/goose_test.gleam
···
      max_message_size_bytes: option.None,
      compress: False,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.None,
      compress: False,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.None,
      compress: False,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.None,
      compress: False,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.Some(1_048_576),
      compress: False,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.None,
      compress: True,
      require_hello: False,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.None,
      compress: False,
      require_hello: True,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

···
      max_message_size_bytes: option.Some(2_097_152),
      compress: True,
      require_hello: True,
+     max_backoff_seconds: 60,
+     log_connection_events: True,
+     log_retry_attempts: True,
    )
    let url = goose.build_url(config)

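
Each test above repeats the three new fields. One way to avoid that repetition, sketched here as a suggestion and not as part of this change, is Gleam's record update syntax on top of a default value. `Config` is a local stand-in with a subset of the `JetstreamConfig` fields so the example is self-contained; with the real library the same pattern would presumably start from `goose.default_config()`.

```gleam
import gleam/io
import gleam/option.{type Option}

/// Stand-in for goose.JetstreamConfig (subset of fields, for illustration).
pub type Config {
  Config(
    max_message_size_bytes: Option(Int),
    compress: Bool,
    require_hello: Bool,
    max_backoff_seconds: Int,
    log_connection_events: Bool,
    log_retry_attempts: Bool,
  )
}

/// Defaults mirroring the values used throughout the tests.
pub fn default_config() -> Config {
  Config(
    max_message_size_bytes: option.None,
    compress: False,
    require_hello: False,
    max_backoff_seconds: 60,
    log_connection_events: True,
    log_retry_attempts: True,
  )
}

pub fn main() {
  let base = default_config()
  // Record update: override only the fields under test.
  let config = Config(..base, compress: True, require_hello: True)
  case config.compress && config.max_backoff_seconds == 60 {
    True -> io.println("ok")
    False -> io.println("unexpected")
  }
  // prints ok
}
```

With this pattern, adding another field to the config type only requires touching the default constructor, not every test.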