A Gleam WebSocket consumer for AT Protocol Jetstream events.

use stratus pkg instead of gun for websocket client, remove logging/retry options

+1
.gitignore
··· 2 2 *.ez 3 3 **/build 4 4 erl_crash.dump 5 + .claude
+50
CHANGELOG.md
··· 5 5 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), 6 6 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 7 7 8 + ## [2.0.0] - 2025-11-04 9 + 10 + ### Changed 11 + - **BREAKING**: Migrated from Erlang's `gun` library to Gleam's `stratus` WebSocket client 12 + - **BREAKING**: Removed three configuration fields from `JetstreamConfig`: 13 + - `max_backoff_seconds` (retry logic still works internally, capped at 60s) 14 + - `log_connection_events` (connection logging removed) 15 + - `log_retry_attempts` (retry logging removed) 16 + - Reduced Erlang FFI code from ~290 lines to ~40 lines 17 + - Improved code maintainability with more idiomatic Gleam implementation 18 + 19 + ### Added 20 + - Vendored `stratus` WebSocket client internally (moved to `goose/stratus`) until a new version is published 21 + - New dependency: `simplifile` (>= 2.0.0 and < 3.0.0) 22 + - New dependencies (from stratus): `gleam_otp`, `gleam_crypto`, `logging`, `exception`, `gramps` 23 + 24 + ### Removed 25 + - Dependency: `gun` (>= 2.2.0 and < 3.0.0) 26 + 27 + ### Migration Guide 28 + If you're upgrading from v1.x, remove the following fields from your `JetstreamConfig`: 29 + ```gleam 30 + // Before (v1.x) 31 + let config = goose.JetstreamConfig( 32 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 33 + wanted_collections: [], 34 + wanted_dids: [], 35 + cursor: option.None, 36 + max_message_size_bytes: option.None, 37 + compress: True, 38 + require_hello: False, 39 + max_backoff_seconds: 60, // Remove this 40 + log_connection_events: True, // Remove this 41 + log_retry_attempts: True, // Remove this 42 + ) 43 + 44 + // After (v2.x) 45 + let config = goose.JetstreamConfig( 46 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 47 + wanted_collections: [], 48 + wanted_dids: [], 49 + cursor: option.None, 50 + max_message_size_bytes: option.None, 51 + compress: True, 52 + require_hello: False, 53 + ) 54 + ``` 55 + 56 + All other functionality remains the same. Automatic retry with exponential backoff still works internally. 57 + 8 58 ## [1.1.0] - 2025-01-29 9 59 10 60 ### Added
+2 -44
README.md
··· 53 53 wanted_dids: [], 54 54 cursor: option.None, 55 55 max_message_size_bytes: option.None, 56 - compress: False, 56 + compress: True, 57 57 require_hello: False, 58 - // Retry configuration 59 - max_backoff_seconds: 60, // Max wait between retries 60 - log_connection_events: True, // Log connects/disconnects 61 - log_retry_attempts: False, // Skip verbose retry logs 62 58 ) 63 59 64 60 goose.start_consumer(config, handle_event) ··· 67 63 68 64 ## Configuration Options 69 65 70 - **Note:** Goose automatically handles connection failures with exponential backoff retry logic (1s, 2s, 4s, 8s, 16s, 32s, up to max). All connections automatically retry on failure, reconnect on disconnection, and distinguish between harmless timeouts and real errors. 66 + **Note:** Goose automatically handles connection failures with exponential backoff retry logic (1s, 2s, 4s, 8s, 16s, 32s, capped at 60s). All connections automatically retry on failure and reconnect on disconnection. 71 67 72 68 ### `wanted_collections` 73 69 An array of Collection NSIDs to filter which records you receive (default: empty = all collections) ··· 145 141 require_hello: True 146 142 ``` 147 143 148 - ### `max_backoff_seconds` 149 - Maximum wait time in seconds between retry attempts 150 - 151 - - Uses exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, then capped at this value 152 - - Default: `60` 153 - 154 - **Example:** 155 - ```gleam 156 - max_backoff_seconds: 120 // Allow up to 2 minute waits between retries 157 - ``` 158 - 159 - ### `log_connection_events` 160 - Whether to log connection state changes (connected, disconnected) 161 - 162 - - Set to `True` to log important connection events 163 - - Default: `True` 164 - - Recommended: `True` for production (know when disconnects happen) 165 - 166 - **Example:** 167 - ```gleam 168 - log_connection_events: True 169 - ``` 170 - 171 - ### `log_retry_attempts` 172 - Whether to log detailed retry attempt information 173 - 174 - - Set to `True` to log attempt numbers, errors, and backoff times 175 - - Default: `True` 176 - - Recommended: `False` for production (reduces log noise) 177 - 178 - **Example:** 179 - ```gleam 180 - log_retry_attempts: False // Production: skip verbose retry logs 181 - ``` 182 - 183 144 ## Full Configuration Example 184 145 185 146 ```gleam ··· 194 155 max_message_size_bytes: option.Some(2097152), // 2MB 195 156 compress: True, 196 157 require_hello: False, 197 - max_backoff_seconds: 60, 198 - log_connection_events: True, 199 - log_retry_attempts: False, 200 158 ) 201 159 202 160 goose.start_consumer(config, handle_event)
+9 -4
example/manifest.toml
··· 2 2 # You typically do not need to edit this file 3 3 4 4 packages = [ 5 - { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" }, 5 + { name = "exception", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "329D269D5C2A314F7364BD2711372B6F2C58FA6F39981572E5CA68624D291F8C" }, 6 6 { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 + { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, 8 + { name = "gleam_crypto", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_crypto", source = "hex", outer_checksum = "50774BAFFF1144E7872814C566C5D653D83A3EBF23ACC3156B757A1B6819086E" }, 7 9 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 8 10 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 9 11 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 12 + { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, 10 13 { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 11 - { name = "gleeunit", version = "1.7.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "CD701726CBCE5588B375D157B4391CFD0F2F134CD12D9B6998A395484DE05C58" }, 12 - { name = "goose", version = "1.0.0", build_tools = ["gleam"], requirements = ["ezstd", "gleam_erlang", "gleam_http", "gleam_json", "gleam_stdlib", "gun"], source = "local", path = ".." }, 13 - { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" }, 14 + { name = "gleeunit", version = "1.8.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "7AE0F64B26CC065ED705FF7CA5F4EDAB8015E72A883736FE251E46FACCCE1E08" }, 15 + { name = "goose", version = "2.0.0", build_tools = ["gleam"], requirements = ["exception", "ezstd", "gleam_crypto", "gleam_erlang", "gleam_http", "gleam_json", "gleam_otp", "gleam_stdlib", "gramps", "logging", "simplifile"], source = "local", path = ".." }, 16 + { name = "gramps", version = "6.0.0", build_tools = ["gleam"], requirements = ["gleam_crypto", "gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gramps", source = "hex", outer_checksum = "8B7195978FBFD30B43DF791A8A272041B81E45D245314D7A41FC57237AA882A0" }, 17 + { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, 18 + { name = "simplifile", version = "2.3.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0A868DAC6063D9E983477981839810DC2E553285AB4588B87E3E9C96A7FB4CB4" }, 14 19 ] 15 20 16 21 [requirements]
-3
example/src/example.gleam
··· 13 13 max_message_size_bytes: option.None, 14 14 compress: True, 15 15 require_hello: False, 16 - max_backoff_seconds: 60, 17 - log_connection_events: True, 18 - log_retry_attempts: True, 19 16 ) 20 17 21 18 io.println("Starting Jetstream consumer...")
+7 -2
gleam.toml
··· 1 1 name = "goose" 2 - version = "1.1.0" 2 + version = "2.0.0" 3 3 description = "A Gleam WebSocket consumer for AT Protocol Jetstream events" 4 4 licences = ["Apache-2.0"] 5 5 repository = { type = "github", user = "bigmoves", repo = "goose" } ··· 7 7 [dependencies] 8 8 gleam_stdlib = ">= 0.44.0 and < 2.0.0" 9 9 gleam_erlang = ">= 1.0.0 and < 2.0.0" 10 + gleam_otp = ">= 1.0.0 and < 2.0.0" 10 11 gleam_http = ">= 4.0.0 and < 5.0.0" 11 12 gleam_json = ">= 3.0.2 and < 4.0.0" 12 - gun = ">= 2.2.0 and < 3.0.0" 13 + gleam_crypto = ">= 1.5.0 and < 2.0.0" 13 14 ezstd = ">= 1.2.0 and < 2.0.0" 15 + simplifile = ">= 2.0.0 and < 3.0.0" 16 + logging = ">= 1.3.0 and < 2.0.0" 17 + exception = ">= 2.1.0 and < 3.0.0" 18 + gramps = ">= 6.0.0 and < 7.0.0" 14 19 15 20 [dev-dependencies] 16 21 gleeunit = ">= 1.0.0 and < 2.0.0"
+14 -4
manifest.toml
··· 2 2 # You typically do not need to edit this file 3 3 4 4 packages = [ 5 - { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" }, 5 + { name = "exception", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "329D269D5C2A314F7364BD2711372B6F2C58FA6F39981572E5CA68624D291F8C" }, 6 6 { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 + { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, 8 + { name = "gleam_crypto", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_crypto", source = "hex", outer_checksum = "50774BAFFF1144E7872814C566C5D653D83A3EBF23ACC3156B757A1B6819086E" }, 7 9 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 8 10 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 9 11 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 12 + { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, 10 13 { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 11 - { name = "gleeunit", version = "1.6.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "FDC68A8C492B1E9B429249062CD9BAC9B5538C6FBF584817205D0998C42E1DAC" }, 12 - { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" }, 14 + { name = "gleeunit", version = "1.8.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "7AE0F64B26CC065ED705FF7CA5F4EDAB8015E72A883736FE251E46FACCCE1E08" }, 15 + { name = "gramps", version = "6.0.0", build_tools = ["gleam"], requirements = ["gleam_crypto", "gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gramps", source = "hex", outer_checksum = "8B7195978FBFD30B43DF791A8A272041B81E45D245314D7A41FC57237AA882A0" }, 16 + { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, 17 + { name = "simplifile", version = "2.3.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0A868DAC6063D9E983477981839810DC2E553285AB4588B87E3E9C96A7FB4CB4" }, 13 18 ] 14 19 15 20 [requirements] 21 + exception = { version = ">= 2.1.0 and < 3.0.0" } 16 22 ezstd = { version = ">= 1.2.0 and < 2.0.0" } 23 + gleam_crypto = { version = ">= 1.5.0 and < 2.0.0" } 17 24 gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } 18 25 gleam_http = { version = ">= 4.0.0 and < 5.0.0" } 19 26 gleam_json = { version = ">= 3.0.2 and < 4.0.0" } 27 + gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } 20 28 gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 21 29 gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 22 - gun = { version = ">= 2.2.0 and < 3.0.0" } 30 + gramps = { version = ">= 6.0.0 and < 7.0.0" } 31 + logging = { version = ">= 1.3.0 and < 2.0.0" } 32 + simplifile = { version = ">= 2.0.0 and < 3.0.0" }
+172 -111
src/goose.gleam
··· 1 + import gleam/bit_array 1 2 import gleam/dynamic.{type Dynamic} 2 3 import gleam/dynamic/decode 3 - import gleam/erlang/atom 4 - import gleam/erlang/process.{type Pid} 5 - import gleam/int 6 - import gleam/io 4 + import gleam/erlang/process 5 + import gleam/http/request 7 6 import gleam/json 8 7 import gleam/list 9 8 import gleam/option.{type Option} 9 + import gleam/result 10 10 import gleam/string 11 + import goose/internal/zstd 12 + import goose/stratus 13 + import simplifile 11 14 12 15 /// Jetstream event types 13 16 pub type JetstreamEvent { ··· 46 49 max_message_size_bytes: Option(Int), 47 50 compress: Bool, 48 51 require_hello: Bool, 49 - /// Maximum backoff time in seconds for retry logic (default: 60) 50 - max_backoff_seconds: Int, 51 - /// Whether to log connection events (connected, disconnected) (default: True) 52 - log_connection_events: Bool, 53 - /// Whether to log retry attempts and errors (default: True) 54 - log_retry_attempts: Bool, 52 + ) 53 + } 54 + 55 + /// Internal state for WebSocket connection 56 + type ConnectionState { 57 + ConnectionState( 58 + config: JetstreamConfig, 59 + on_event: fn(String) -> Nil, 60 + decompressor: Option(Decompressor), 61 + ) 62 + } 63 + 64 + /// Decompression context holder 65 + type Decompressor { 66 + Decompressor( 67 + dctx: zstd.DecompressionContext, 68 + ddict: zstd.DecompressionDict, 55 69 ) 56 70 } 57 71 ··· 66 80 max_message_size_bytes: option.None, 67 81 compress: False, 68 82 require_hello: False, 69 - max_backoff_seconds: 60, 70 - log_connection_events: True, 71 - log_retry_attempts: True, 72 83 ) 73 84 } 74 85 ··· 131 142 } 132 143 } 133 144 134 - /// Connect to Jetstream WebSocket using Erlang gun library 135 - @external(erlang, "goose_ws_ffi", "connect") 136 - pub fn connect( 137 - url: String, 138 - handler_pid: Pid, 139 - compress: Bool, 140 - ) -> Result(Pid, Dynamic) 145 + /// Load the zstd decompression dictionary 146 + fn load_decompressor() -> Result(Decompressor, String) { 147 + // Get priv directory 148 + let priv_dir = get_priv_dir() 149 + let dict_path = priv_dir <> "/zstd_dictionary" 150 + 151 + // Read dictionary file 152 + use dict_data <- result.try( 153 + simplifile.read_bits(dict_path) 154 + |> result.map_error(fn(_) { 155 + "Failed to load zstd dictionary from " <> dict_path 156 + }), 157 + ) 158 + 159 + // Create decompression context and dictionary 160 + let dctx = zstd.create_decompression_context(1024 * 1024) 161 + let ddict = zstd.create_ddict(dict_data) 162 + 163 + // Select dictionary for context 164 + use _ <- result.try(zstd.select_ddict(dctx, ddict)) 165 + 166 + Ok(Decompressor(dctx: dctx, ddict: ddict)) 167 + } 168 + 169 + /// Get the priv directory path for this application 170 + @external(erlang, "goose_ffi", "priv_dir") 171 + fn get_priv_dir() -> String 172 + 173 + /// Decompress zstd-compressed data 174 + fn decompress_data( 175 + data: BitArray, 176 + decompressor: Decompressor, 177 + ) -> Result(String, String) { 178 + // Try decompress_using_ddict first (works for frames with content size) 179 + case zstd.decompress_using_ddict(data, decompressor.ddict) { 180 + Ok(decompressed) -> 181 + bit_array.to_string(decompressed) 182 + |> result.replace_error("Failed to decode decompressed data as UTF-8") 183 + Error(msg) -> { 184 + // Check if error is due to unknown content size 185 + case string.contains(msg, "ZSTD_CONTENTSIZE_UNKNOWN") { 186 + True -> { 187 + // Frame doesn't have content size, use streaming with dictionary-loaded context 188 + case zstd.decompress_streaming(decompressor.dctx, data) { 189 + Ok(decompressed) -> 190 + bit_array.to_string(decompressed) 191 + |> result.replace_error( 192 + "Failed to decode streaming decompressed data as UTF-8", 193 + ) 194 + Error(_stream_err) -> Error("Streaming decompression failed") 195 + } 196 + } 197 + False -> Error("Decompression failed") 198 + } 199 + } 200 + } 201 + } 141 202 142 203 /// Start consuming the Jetstream feed with automatic retry logic 143 204 /// ··· 170 231 retry_count: Int, 171 232 ) -> Nil { 172 233 let url = build_url(config) 173 - let self = process.self() 174 - let result = connect(url, self, config.compress) 234 + 235 + // Convert wss:// to https:// and ws:// to http:// for request parsing 236 + let http_url = 237 + url 238 + |> string.replace("wss://", "https://") 239 + |> string.replace("ws://", "http://") 240 + 241 + // Parse URL into HTTP request 242 + let assert Ok(req) = request.to(http_url) 243 + 244 + // Load decompressor if compression is enabled 245 + let decompressor = case config.compress { 246 + True -> 247 + case load_decompressor() { 248 + Ok(dec) -> option.Some(dec) 249 + Error(_err) -> option.None 250 + } 251 + False -> option.None 252 + } 253 + 254 + // Create initial state 255 + let state = 256 + ConnectionState( 257 + config: config, 258 + on_event: on_event, 259 + decompressor: decompressor, 260 + ) 261 + 262 + // Start WebSocket connection 263 + let result = 264 + stratus.new_with_initialiser( 265 + request: req, 266 + init: fn() { Ok(stratus.initialised(state)) }, 267 + ) 268 + |> stratus.on_message(handle_message) 269 + |> stratus.on_close(handle_close) 270 + |> stratus.with_connect_timeout(30_000) 271 + |> stratus.start() 175 272 176 273 case result { 177 - Ok(_conn_pid) -> { 178 - case config.log_connection_events { 179 - True -> io.println("Connected to Jetstream successfully") 180 - False -> Nil 181 - } 182 - // Start receiving with retry support 183 - receive_with_retry(config, on_event) 274 + Ok(_websocket) -> { 275 + // Keep process alive indefinitely 276 + process.sleep_forever() 184 277 } 185 - Error(err) -> { 278 + Error(_err) -> { 186 279 // Connection failed, calculate backoff and retry 187 - let backoff_seconds = calculate_backoff(retry_count, config) 188 - case config.log_retry_attempts { 189 - True -> { 190 - io.println( 191 - "Failed to connect to Jetstream (attempt " 192 - <> int.to_string(retry_count + 1) 193 - <> "): " 194 - <> string.inspect(err), 195 - ) 196 - io.println( 197 - "Retrying in " <> int.to_string(backoff_seconds) <> " seconds...", 198 - ) 199 - } 200 - False -> Nil 201 - } 280 + let backoff_seconds = calculate_backoff(retry_count) 202 281 203 282 // Sleep for backoff period 204 283 process.sleep(backoff_seconds * 1000) ··· 209 288 } 210 289 } 211 290 212 - /// Calculate exponential backoff with configurable maximum 213 - fn calculate_backoff(retry_count: Int, config: JetstreamConfig) -> Int { 214 - let backoff = case retry_count { 215 - 0 -> 1 216 - 1 -> 2 217 - 2 -> 4 218 - 3 -> 8 219 - 4 -> 16 220 - 5 -> 32 221 - _ -> config.max_backoff_seconds 222 - } 223 - 224 - // Cap at max_backoff_seconds 225 - case backoff > config.max_backoff_seconds { 226 - True -> config.max_backoff_seconds 227 - False -> backoff 228 - } 229 - } 230 - 231 - /// Receive messages with retry logic 232 - fn receive_with_retry( 233 - config: JetstreamConfig, 234 - on_event: fn(String) -> Nil, 235 - ) -> Nil { 236 - case receive_ws_message() { 237 - Ok(text) -> { 238 - on_event(text) 239 - receive_with_retry(config, on_event) 291 + /// Handle incoming WebSocket messages 292 + fn handle_message( 293 + state: ConnectionState, 294 + msg: stratus.Message(Nil), 295 + _conn: stratus.Connection, 296 + ) -> stratus.Next(ConnectionState, Nil) { 297 + case msg { 298 + stratus.Text(text) -> { 299 + state.on_event(text) 300 + stratus.continue(state) 240 301 } 241 - Error(error_dynamic) -> { 242 - // Decode error type 243 - let atm = atom.cast_from_dynamic(error_dynamic) 244 - let error_type = atom.to_string(atm) 245 - 246 - case error_type { 247 - "timeout" -> { 248 - // No messages in 60s, connection is alive - continue 249 - receive_with_retry(config, on_event) 250 - } 251 - "closed" -> { 252 - // Connection closed - log if configured 253 - case config.log_connection_events { 254 - True -> io.println("Jetstream connection closed, reconnecting...") 255 - False -> Nil 256 - } 257 - start_with_retry_internal(config, on_event, 0) 258 - } 259 - "connection_error" -> { 260 - // Connection error - log if configured 261 - case config.log_connection_events { 262 - True -> io.println("Jetstream connection error, reconnecting...") 263 - False -> Nil 264 - } 265 - start_with_retry_internal(config, on_event, 0) 266 - } 267 - _ -> { 268 - // Unknown error - log if retry logging is enabled 269 - case config.log_retry_attempts { 270 - True -> { 271 - io.println("Unknown Jetstream error: " <> error_type) 272 - io.println("Reconnecting...") 302 + stratus.Binary(data) -> { 303 + // Handle compressed binary data if decompressor is available 304 + case state.decompressor { 305 + option.Some(decompressor) -> { 306 + case decompress_data(data, decompressor) { 307 + Ok(text) -> { 308 + state.on_event(text) 309 + stratus.continue(state) 273 310 } 274 - False -> Nil 311 + Error(_err) -> { 312 + // Skip frames that fail to decompress 313 + stratus.continue(state) 314 + } 275 315 } 276 - start_with_retry_internal(config, on_event, 0) 316 + } 317 + option.None -> { 318 + // No decompressor, ignore binary messages 319 + stratus.continue(state) 277 320 } 278 321 } 279 322 } 323 + stratus.User(_) -> { 324 + // No custom user messages in this implementation 325 + stratus.continue(state) 326 + } 280 327 } 281 328 } 282 329 283 - /// Receive a WebSocket message from the message queue 284 - /// Returns Ok(text) for messages, or Error with one of: timeout, closed, connection_error 285 - @external(erlang, "goose_ffi", "receive_ws_message") 286 - fn receive_ws_message() -> Result(String, Dynamic) 330 + /// Handle WebSocket connection close 331 + fn handle_close(state: ConnectionState) -> Nil { 332 + // Reconnect from the beginning 333 + start_with_retry_internal(state.config, state.on_event, 0) 334 + } 335 + 336 + /// Calculate exponential backoff capped at 60 seconds 337 + fn calculate_backoff(retry_count: Int) -> Int { 338 + case retry_count { 339 + 0 -> 1 340 + 1 -> 2 341 + 2 -> 4 342 + 3 -> 8 343 + 4 -> 16 344 + 5 -> 32 345 + _ -> 60 346 + } 347 + } 287 348 288 349 /// Parse a JSON event string into a JetstreamEvent 289 350 pub fn parse_event(json_string: String) -> JetstreamEvent {
+51
src/goose/internal/zstd.gleam
··· 1 + import gleam/erlang/atom 2 + 3 + /// Opaque type for decompression context 4 + pub type DecompressionContext 5 + 6 + /// Opaque type for decompression dictionary 7 + pub type DecompressionDict 8 + 9 + /// Create a decompression context with a given buffer size 10 + @external(erlang, "ezstd", "create_decompression_context") 11 + pub fn create_decompression_context(buffer_size: Int) -> DecompressionContext 12 + 13 + /// Create a decompression dictionary from binary data 14 + @external(erlang, "ezstd", "create_ddict") 15 + pub fn create_ddict(dict_data: BitArray) -> DecompressionDict 16 + 17 + /// Select a dictionary for the decompression context 18 + /// Note: ezstd returns 'ok' atom, not {ok, undefined} 19 + @external(erlang, "ezstd", "select_ddict") 20 + fn select_ddict_ffi( 21 + dctx: DecompressionContext, 22 + ddict: DecompressionDict, 23 + ) -> atom.Atom 24 + 25 + /// Select a dictionary for the decompression context (safe wrapper) 26 + pub fn select_ddict( 27 + dctx: DecompressionContext, 28 + ddict: DecompressionDict, 29 + ) -> Result(Nil, String) { 30 + let result_atom = select_ddict_ffi(dctx, ddict) 31 + case atom.to_string(result_atom) { 32 + "ok" -> Ok(Nil) 33 + err -> Error("Failed to select dictionary: " <> err) 34 + } 35 + } 36 + 37 + /// Decompress data using a dictionary 38 + /// Note: ezstd can return either BitArray or {error, Binary} 39 + @external(erlang, "goose_ffi", "decompress_using_ddict_safe") 40 + pub fn decompress_using_ddict( 41 + data: BitArray, 42 + ddict: DecompressionDict, 43 + ) -> Result(BitArray, String) 44 + 45 + /// Decompress data using a streaming context (for frames without content size) 46 + /// Note: ezstd can return either BitArray or {error, Binary} 47 + @external(erlang, "goose_ffi", "decompress_streaming_safe") 48 + pub fn decompress_streaming( 49 + dctx: DecompressionContext, 50 + data: BitArray, 51 + ) -> Result(BitArray, String)
+943
src/goose/stratus.gleam
··· 1 + import exception 2 + import gleam/bit_array 3 + import gleam/bool 4 + import gleam/bytes_tree.{type BytesTree} 5 + import gleam/crypto 6 + import gleam/erlang/charlist 7 + import gleam/erlang/process.{type Selector, type Subject} 8 + import gleam/http.{Http, Https} 9 + import gleam/http/request.{type Request} 10 + import gleam/http/response.{type Response} 11 + import gleam/int 12 + import gleam/list 13 + import gleam/option.{type Option, None, Some} 14 + import gleam/otp/actor 15 + import gleam/result 16 + import gleam/string 17 + import gleam/uri 18 + import gramps/http as gramps_http 19 + import gramps/websocket.{ 20 + BinaryFrame, CloseFrame, Continuation, Control, Data as DataFrame, PingFrame, 21 + PongFrame, TextFrame, 22 + } 23 + import gramps/websocket/compression 24 + import logging 25 + import goose/stratus/internal/socket.{ 26 + type Socket, type SocketMessage, Cacerts, Once, Pull, Receive, 27 + } 28 + import goose/stratus/internal/ssl 29 + import goose/stratus/internal/tcp 30 + import goose/stratus/internal/transport.{type Transport, Ssl, Tcp} 31 + 32 + /// This holds some information needed to communicate with the WebSocket. 33 + pub opaque type Connection { 34 + Connection( 35 + socket: Socket, 36 + transport: Transport, 37 + context: Option(compression.Context), 38 + ) 39 + } 40 + 41 + pub type SocketReason { 42 + SocketClosed 43 + Timeout 44 + Badarg 45 + Terminated 46 + Eaddrinuse 47 + Eaddrnotavail 48 + Eafnosupport 49 + Ealready 50 + Econnaborted 51 + Econnrefused 52 + Econnreset 53 + Edestaddrreq 54 + Ehostdown 55 + Ehostunreach 56 + Einprogress 57 + Eisconn 58 + Emsgsize 59 + Enetdown 60 + Enetunreach 61 + Enopkg 62 + Enoprotoopt 63 + Enotconn 64 + Enotty 65 + Enotsock 66 + Eproto 67 + Eprotonosupport 68 + Eprototype 69 + Esocktnosupport 70 + Etimedout 71 + Ewouldblock 72 + Exbadport 73 + Exbadseq 74 + } 75 + 76 + pub type CustomCloseError { 77 + SocketFail(SocketReason) 78 + InvalidCode 79 + } 80 + 81 + fn convert_socket_reason(reason: socket.SocketReason) -> SocketReason { 82 + case reason { 83 + socket.Badarg -> Badarg 84 + socket.Closed -> SocketClosed 85 + socket.Eaddrinuse -> Eaddrinuse 86 + socket.Eaddrnotavail -> Eaddrnotavail 87 + socket.Eafnosupport -> Eafnosupport 88 + socket.Ealready -> Ealready 89 + socket.Econnaborted -> Econnaborted 90 + socket.Econnrefused -> Econnrefused 91 + socket.Econnreset -> Econnreset 92 + socket.Edestaddrreq -> Edestaddrreq 93 + socket.Ehostdown -> Ehostdown 94 + socket.Ehostunreach -> Ehostunreach 95 + socket.Einprogress -> Einprogress 96 + socket.Eisconn -> Eisconn 97 + socket.Emsgsize -> Emsgsize 98 + socket.Enetdown -> Enetdown 99 + socket.Enetunreach -> Enetunreach 100 + socket.Enopkg -> Enopkg 101 + socket.Enoprotoopt -> Enoprotoopt 102 + socket.Enotconn -> Enotconn 103 + socket.Enotsock -> Enotsock 104 + socket.Enotty -> Enotty 105 + socket.Eproto -> Eproto 106 + socket.Eprotonosupport -> Eprotonosupport 107 + socket.Eprototype -> Eprototype 108 + socket.Esocktnosupport -> Esocktnosupport 109 + socket.Etimedout -> Etimedout 110 + socket.Ewouldblock -> Ewouldblock 111 + socket.Exbadport -> Exbadport 112 + socket.Exbadseq -> Exbadseq 113 + socket.Terminated -> Terminated 114 + socket.Timeout -> Timeout 115 + } 116 + } 117 + 118 + fn from_socket_message(msg: SocketMessage) -> InternalMessage(user_message) { 119 + case msg { 120 + socket.Data(bits) -> Data(bits) 121 + socket.Err(reason) -> Err(convert_socket_reason(reason)) 122 + } 123 + } 124 + 125 + pub opaque type Next(state, user_message) { 126 + Continue(state: state, selector: Option(Selector(user_message))) 127 + NormalStop 128 + AbnormalStop(reason: String) 129 + } 130 + 131 + pub fn continue(state: state) -> Next(state, user_message) { 132 + Continue(state, None) 133 + } 134 + 135 + pub fn with_selector( 136 + next: Next(state, user_message), 137 + selector: Selector(user_message), 138 + ) -> Next(state, user_message) { 139 + case next { 140 + Continue(state, _) -> Continue(state, Some(selector)) 141 + _ -> next 142 + } 143 + } 144 + 145 + pub fn stop() -> Next(state, user_message) { 146 + NormalStop 147 + } 148 + 149 + pub fn stop_abnormal(reason: String) -> Next(state, user_message) { 150 + AbnormalStop(reason) 151 + } 152 + 153 + /// These are the messages emitted or received by the underlying process. You 154 + /// should only need to interact with `Message` below. 155 + pub opaque type InternalMessage(user_message) { 156 + UserMessage(user_message) 157 + Err(SocketReason) 158 + Data(BitArray) 159 + Closed 160 + Shutdown 161 + } 162 + 163 + /// This is the type of message your handler might receive. 164 + pub type Message(user_message) { 165 + Text(String) 166 + Binary(BitArray) 167 + User(user_message) 168 + } 169 + 170 + pub opaque type Builder(state, user_message) { 171 + Builder( 172 + request: Request(String), 173 + connect_timeout: Int, 174 + init: fn() -> Result(Initialised(state, user_message), String), 175 + loop: fn(state, Message(user_message), Connection) -> 176 + Next(state, user_message), 177 + on_close: fn(state) -> Nil, 178 + ) 179 + } 180 + 181 + // and `on_close`. 182 + /// This creates a builder to set up a WebSocket actor. This will use default 183 + /// values for the connection initialization timeout, and provide an empty 184 + /// function to be called when the server closes the connection. If you want to 185 + /// customize either of those, see the helper functions `with_connect_timeout` 186 + pub fn new( 187 + request req: Request(String), 188 + state state: state, 189 + ) -> Builder(state, user_message) { 190 + Builder( 191 + request: req, 192 + connect_timeout: 5000, 193 + init: fn() { Ok(initialised(state)) }, 194 + loop: fn(state, _msg, _conn) { continue(state) }, 195 + on_close: fn(_state) { Nil }, 196 + ) 197 + } 198 + 199 + pub type Initialised(state, user_message) { 200 + Initialised(state: state, selector: Option(Selector(user_message))) 201 + } 202 + 203 + pub fn initialised(state: state) -> Initialised(state, user_message) { 204 + Initialised(state:, selector: None) 205 + } 206 + 207 + pub fn selecting( 208 + initialised: Initialised(state, old_message), 209 + selector: Selector(user_message), 210 + ) -> Initialised(state, user_message) { 211 + Initialised(state: initialised.state, selector: Some(selector)) 212 + } 213 + 214 + pub fn new_with_initialiser( 215 + request req: Request(String), 216 + init init: fn() -> Result(Initialised(state, user_message), String), 217 + ) -> Builder(state, user_message) { 218 + Builder( 219 + request: req, 220 + connect_timeout: 5000, 221 + init: init, 222 + loop: fn(state, _msg, _conn) { continue(state) }, 223 + on_close: fn(_state) { Nil }, 224 + ) 225 + } 226 + 227 + pub fn on_message( 228 + builder: Builder(state, user_message), 229 + on_message: fn(state, Message(user_message), Connection) -> 230 + Next(state, user_message), 231 + ) -> Builder(state, user_message) { 232 + Builder(..builder, loop: on_message) 233 + } 234 + 235 + /// This sets the maximum amount of time you are willing to wait for both 236 + /// connecting to the server and receiving the upgrade response. This means 237 + /// that it may take up to `timeout * 2` to begin sending or receiving messages. 238 + /// This value defaults to 5 seconds. 239 + pub fn with_connect_timeout( 240 + builder: Builder(state, user_message), 241 + timeout: Int, 242 + ) -> Builder(state, user_message) { 243 + Builder(..builder, connect_timeout: timeout) 244 + } 245 + 246 + /// You can provide a function to be called when the connection is closed. This 247 + /// function receives the last value for the state of the WebSocket. 248 + /// 249 + /// NOTE: If you manually call `stratus.close`, this function will not be 250 + /// called. I'm unsure right now if this is a bug or working as intended. But 251 + /// you will be in the loop with the state value handy. 252 + pub fn on_close( 253 + builder: Builder(state, user_message), 254 + on_close: fn(state) -> Nil, 255 + ) -> Builder(state, user_message) { 256 + Builder(..builder, on_close: on_close) 257 + } 258 + 259 + type State(state, user_message) { 260 + State( 261 + buffer: BitArray, 262 + incomplete: Option(websocket.Frame), 263 + self: Subject(InternalMessage(user_message)), 264 + socket: Socket, 265 + user_state: state, 266 + compression: Option(compression.Compression), 267 + ) 268 + } 269 + 270 + /// These are the possible failures when calling `stratus.initialize`. 271 + pub type InitializationError { 272 + // The WebSocket handshake failed for the provided reason 273 + HandshakeFailed(HandshakeError) 274 + // The actor failed to start, most likely due to a timeout in your `init` 275 + ActorFailed(actor.StartError) 276 + // 277 + FailedToTransferSocket(SocketReason) 278 + } 279 + 280 + /// This opens the WebSocket connection with the provided `Builder`. It makes 281 + /// some assumptions about the request if you do not provide it. It will use 282 + /// ports 80 or 443 for `ws` or `wss` respectively. 283 + /// 284 + /// It will open the connection and perform the WebSocket handshake. If this 285 + /// fails, the actor will fail to start with the given reason as a string value. 286 + /// 287 + /// After that, received messages will be passed to your loop, and you can use 288 + /// the helper functions to send messages to the server. The `close` method will 289 + /// send a close frame and end the connection. 290 + pub fn start( 291 + builder: Builder(state, user_message), 292 + ) -> Result( 293 + actor.Started(Subject(InternalMessage(user_message))), 294 + InitializationError, 295 + ) { 296 + let transport = case builder.request.scheme { 297 + Https -> Ssl 298 + _ -> Tcp 299 + } 300 + 301 + let handshake_result = 302 + perform_handshake(builder.request, transport, builder.connect_timeout) 303 + |> result.map_error(fn(reason) { 304 + let msg = case reason { 305 + UpgradeFailed(resp) -> 306 + "WebSocket handshake failed with status " 307 + <> int.to_string(resp.status) 308 + reason -> "WebSocket handshake failed: " <> string.inspect(reason) 309 + } 310 + logging.log(logging.Error, msg) 311 + HandshakeFailed(reason) 312 + }) 313 + 314 + use handshake_response <- result.try(handshake_result) 315 + logging.log(logging.Debug, "Handshake successful") 316 + 317 + let extensions = 318 + handshake_response.response 319 + |> response.get_header("sec-websocket-extensions") 320 + |> result.map(string.split(_, "; ")) 321 + |> result.unwrap([]) 322 + 323 + let context_takeovers = websocket.get_context_takeovers(extensions) 324 + 325 + let assert Ok(_) = 326 + transport.set_opts( 327 + transport, 328 + handshake_response.socket, 329 + socket.convert_options([Receive(Once)]), 330 + ) 331 + 332 + actor.new_with_initialiser(1000, fn(subject) { 333 + let started_selector = process.select(process.new_selector(), subject) 334 + logging.log(logging.Debug, "Calling user initializer") 335 + use Initialised(user_state, user_selector) <- result.try(builder.init()) 336 + let selector = case user_selector { 337 + Some(selector) -> { 338 + selector 339 + |> process.map_selector(UserMessage) 340 + |> process.merge_selector(started_selector) 341 + |> process.merge_selector( 342 + process.map_selector(socket.selector(), fn(msg) { 343 + let assert Ok(msg) = msg 344 + from_socket_message(msg) 345 + }), 346 + ) 347 + } 348 + _ -> 349 + started_selector 350 + |> process.merge_selector( 351 + process.map_selector(socket.selector(), fn(msg) { 352 + let assert Ok(msg) = msg 353 + from_socket_message(msg) 354 + }), 355 + ) 356 + } 357 + let context = case websocket.has_deflate(extensions) { 358 + True -> Some(compression.init(context_takeovers)) 359 + False -> None 360 + } 361 + State( 362 + buffer: <<>>, 363 + incomplete: None, 364 + self: subject, 365 + socket: handshake_response.socket, 366 + user_state: user_state, 367 + compression: context, 368 + ) 369 + |> actor.initialised 370 + |> actor.selecting(selector) 371 + |> actor.returning(subject) 372 + |> Ok 373 + }) 374 + |> actor.on_message(fn(state, message) { 375 + case message { 376 + UserMessage(user_message) -> { 377 + let conn = 378 + Connection( 379 + state.socket, 380 + transport, 381 + option.map(state.compression, fn(context) { context.deflate }), 382 + ) 383 + let res = 384 + exception.rescue(fn() { 385 + builder.loop(state.user_state, User(user_message), conn) 386 + }) 387 + case res { 388 + // TODO: de-dupe this 389 + Ok(Continue(user_state, user_selector)) -> { 390 + let new_state = State(..state, user_state: user_state) 391 + case user_selector { 392 + Some(user_selector) -> { 393 + let selector = 394 + user_selector 395 + |> process.map_selector(UserMessage) 396 + |> process.merge_selector( 397 + process.map_selector(socket.selector(), fn(msg) { 398 + let assert Ok(msg) = msg 399 + from_socket_message(msg) 400 + }), 401 + ) 402 + new_state 403 + |> actor.continue 404 + |> actor.with_selector(selector) 405 + } 406 + _ -> actor.continue(new_state) 407 + } 408 + } 409 + Ok(NormalStop) -> actor.stop() 410 + Ok(AbnormalStop(reason)) -> actor.stop_abnormal(reason) 411 + Error(reason) -> { 412 + logging.log( 413 + logging.Error, 414 + "Caught error in user handler: " <> string.inspect(reason), 415 + ) 416 + actor.continue(state) 417 + } 418 + } 419 + } 420 + Err(reason) -> { 421 + close_contexts(state.compression) 422 + actor.stop_abnormal(string.inspect(reason)) 423 + } 424 + Data(bits) -> { 425 + let conn = 426 + Connection( 427 + state.socket, 428 + transport, 429 + option.map(state.compression, fn(context) { context.deflate }), 430 + ) 431 + let #(frames, rest) = 432 + websocket.decode_many_frames( 433 + bit_array.append(state.buffer, bits), 434 + option.map(state.compression, fn(context) { context.inflate }), 435 + [], 436 + ) 437 + let frames = websocket.aggregate_frames(frames, state.incomplete, []) 438 + case frames { 439 + Error(Nil) -> continue(state) 440 + Ok(frames) -> { 441 + list.fold_until(frames, continue(state), fn(acc, frame) { 442 + let assert Continue(prev_state, _selector) = acc 443 + case handle_frame(builder, prev_state, conn, frame) { 444 + Continue(..) as next -> list.Continue(next) 445 + err -> list.Stop(err) 446 + } 447 + }) 448 + } 449 + } 450 + |> fn(next) { 451 + case next { 452 + Continue(state, selector) -> { 453 + let assert Ok(_) = 454 + transport.set_opts( 455 + transport, 456 + state.socket, 457 + socket.convert_options([Receive(Once)]), 458 + ) 459 + let next = actor.continue(State(..state, buffer: rest)) 460 + case selector { 461 + Some(selector) -> actor.with_selector(next, selector) 462 + _ -> next 463 + } 464 + } 465 + NormalStop -> { 466 + close_contexts(state.compression) 467 + actor.stop() 468 + } 469 + AbnormalStop(reason) -> { 470 + close_contexts(state.compression) 471 + actor.stop_abnormal(reason) 472 + } 473 + } 474 + } 475 + } 476 + Closed -> { 477 + logging.log(logging.Debug, "Received closed frame") 478 + builder.on_close(state.user_state) 479 + close_contexts(state.compression) 480 + actor.stop() 481 + } 482 + // TODO: handle shutdown better? 483 + Shutdown -> { 484 + logging.log(logging.Debug, "Received shutdown messag") 485 + close_contexts(state.compression) 486 + actor.stop() 487 + } 488 + } 489 + }) 490 + |> actor.start 491 + |> result.map_error(ActorFailed) 492 + |> result.try(fn(started) { 493 + case transport { 494 + Tcp -> tcp.controlling_process(handshake_response.socket, started.pid) 495 + Ssl -> ssl.controlling_process(handshake_response.socket, started.pid) 496 + } 497 + |> result.map_error(fn(socket_reason) { 498 + FailedToTransferSocket(convert_socket_reason(socket_reason)) 499 + }) 500 + |> result.map(fn(_nil) { started }) 501 + }) 502 + |> result.map(fn(started) { 503 + let _ = case handshake_response.buffer { 504 + <<>> -> Nil 505 + data -> process.send(started.data, Data(data)) 506 + } 507 + started 508 + }) 509 + } 510 + 511 + fn handle_frame( 512 + builder: Builder(user_state, user_message), 513 + state: State(user_state, user_message), 514 + conn: Connection, 515 + frame: websocket.Frame, 516 + ) -> Next(State(user_state, user_message), InternalMessage(user_message)) { 517 + case frame { 518 + DataFrame(TextFrame(payload: data)) -> { 519 + let assert Ok(str) = bit_array.to_string(data) 520 + let res = 521 + exception.rescue(fn() { 522 + builder.loop(state.user_state, Text(str), conn) 523 + }) 524 + case res { 525 + // TODO: de-dupe this 526 + Ok(Continue(user_state, user_selector)) -> { 527 + let new_state = State(..state, user_state: user_state) 528 + case user_selector { 529 + Some(user_selector) -> { 530 + let selector = 531 + user_selector 532 + |> process.map_selector(UserMessage) 533 + |> process.merge_selector( 534 + process.map_selector(socket.selector(), fn(msg) { 535 + let assert Ok(msg) = msg 536 + from_socket_message(msg) 537 + }), 538 + ) 539 + Continue(new_state, Some(selector)) 540 + } 541 + _ -> continue(new_state) 542 + } 543 + } 544 + Ok(NormalStop) -> NormalStop 545 + Ok(AbnormalStop(reason)) -> AbnormalStop(reason) 546 + Error(reason) -> { 547 + logging.log( 548 + logging.Error, 549 + "Caught error in user handler: " <> string.inspect(reason), 550 + ) 551 + continue(state) 552 + } 553 + } 554 + } 555 + DataFrame(BinaryFrame(payload: data)) -> { 556 + let res = 557 + exception.rescue(fn() { 558 + builder.loop(state.user_state, Binary(data), conn) 559 + }) 560 + case res { 561 + // TODO: de-dupe this 562 + Ok(Continue(user_state, user_selector)) -> { 563 + let new_state = State(..state, user_state: user_state) 564 + case user_selector { 565 + Some(user_selector) -> { 566 + let selector = 567 + user_selector 568 + |> process.map_selector(UserMessage) 569 + |> process.merge_selector( 570 + process.map_selector(socket.selector(), fn(msg) { 571 + let assert Ok(msg) = msg 572 + from_socket_message(msg) 573 + }), 574 + ) 575 + Continue(new_state, Some(selector)) 576 + } 577 + _ -> continue(new_state) 578 + } 579 + } 580 + Ok(NormalStop) -> NormalStop 581 + Ok(AbnormalStop(reason)) -> AbnormalStop(reason) 582 + Error(reason) -> { 583 + logging.log( 584 + logging.Error, 585 + "Caught error in user handler: " <> string.inspect(reason), 586 + ) 587 + continue(state) 588 + } 589 + } 590 + } 591 + Control(PingFrame(payload)) -> { 592 + let mask = Some(<<0:unit(8)-size(4)>>) 593 + let frame = websocket.encode_pong_frame(payload, mask) 594 + 595 + let _ = transport.send(conn.transport, conn.socket, frame) 596 + continue(state) 597 + } 598 + Control(PongFrame(..)) -> { 599 + continue(state) 600 + } 601 + Control(CloseFrame(reason)) -> { 602 + logging.log( 603 + logging.Debug, 604 + "WebSocket closing: " <> string.inspect(reason), 605 + ) 606 + builder.on_close(state.user_state) 607 + NormalStop 608 + } 609 + Continuation(..) -> { 610 + continue(state) 611 + } 612 + } 613 + } 614 + 615 + /// The `Subject` returned from `initialize` is an opaque type. In order to 616 + /// send custom messages to your process, you can do this mapping. 617 + /// 618 + /// For example: 619 + /// ```gleam 620 + /// // using `process.send` 621 + /// MyMessage(some_data) 622 + /// |> stratus.to_user_message 623 + /// |> process.send(stratus_subject, _) 624 + /// // using `process.call` 625 + /// process.call(stratus_subject, fn(subj) { 626 + /// stratus.to_user_message(MyMessage(some_data, subj)) 627 + /// }) 628 + /// ``` 629 + pub fn to_user_message( 630 + user_message: user_message, 631 + ) -> InternalMessage(user_message) { 632 + UserMessage(user_message) 633 + } 634 + 635 + /// From within the actor loop, this is how you send a WebSocket text frame. 636 + /// This must be valid UTF-8, so it is a `String`. 637 + pub fn send_text_message( 638 + conn: Connection, 639 + msg: String, 640 + ) -> Result(Nil, SocketReason) { 641 + let frame = 642 + websocket.encode_text_frame( 643 + msg, 644 + conn.context, 645 + Some(crypto.strong_random_bytes(4)), 646 + ) 647 + transport.send(conn.transport, conn.socket, frame) 648 + |> result.map_error(convert_socket_reason) 649 + } 650 + 651 + /// From within the actor loop, this is how you send a WebSocket text frame. 652 + pub fn send_binary_message( 653 + conn: Connection, 654 + msg: BitArray, 655 + ) -> Result(Nil, SocketReason) { 656 + let frame = 657 + websocket.encode_binary_frame( 658 + msg, 659 + conn.context, 660 + Some(crypto.strong_random_bytes(4)), 661 + ) 662 + transport.send(conn.transport, conn.socket, frame) 663 + |> result.map_error(convert_socket_reason) 664 + } 665 + 666 + /// Send a ping frame with some data. 667 + pub fn send_ping(conn: Connection, data: BitArray) -> Result(Nil, SocketReason) { 668 + let size = bit_array.byte_size(data) 669 + let mask = case size { 670 + 0 -> Some(<<0:size(4)>>) 671 + _n -> Some(crypto.strong_random_bytes(4)) 672 + } 673 + let frame = websocket.encode_ping_frame(data, mask) 674 + transport.send(conn.transport, conn.socket, frame) 675 + |> result.map_error(convert_socket_reason) 676 + } 677 + 678 + pub type CloseReason { 679 + NotProvided 680 + /// Status code: 1000 681 + Normal(body: BitArray) 682 + /// Status code: 1001 683 + GoingAway(body: BitArray) 684 + /// Status code: 1002 685 + ProtocolError(body: BitArray) 686 + /// Status code: 1003 687 + UnexpectedDataType(body: BitArray) 688 + /// Status code: 1007 689 + InconsistentDataType(body: BitArray) 690 + /// Status code: 1008 691 + PolicyViolation(body: BitArray) 692 + /// Status code: 1009 693 + MessageTooBig(body: BitArray) 694 + /// Status code: 1010 695 + MissingExtensions(body: BitArray) 696 + /// Status code: 1011 697 + UnexpectedCondition(body: BitArray) 698 + /// Use [`close_custom`](#close_custom) to send a custom close code. 699 + Custom(CustomCloseReason) 700 + } 701 + 702 + /// Use [`close_custom`](#close_custom) to send a custom close code. 703 + pub opaque type CustomCloseReason { 704 + CustomCloseReason(code: Int, body: BitArray) 705 + } 706 + 707 + fn convert_close_reason(reason: CloseReason) -> websocket.CloseReason { 708 + case reason { 709 + NotProvided -> websocket.NotProvided 710 + GoingAway(body:) -> websocket.GoingAway(body:) 711 + InconsistentDataType(body:) -> websocket.InconsistentDataType(body:) 712 + MessageTooBig(body:) -> websocket.MessageTooBig(body:) 713 + MissingExtensions(body:) -> websocket.MissingExtensions(body:) 714 + Normal(body:) -> websocket.Normal(body:) 715 + PolicyViolation(body:) -> websocket.PolicyViolation(body:) 716 + ProtocolError(body:) -> websocket.ProtocolError(body:) 717 + UnexpectedCondition(body:) -> websocket.UnexpectedCondition(body:) 718 + UnexpectedDataType(body:) -> websocket.UnexpectedDataType(body:) 719 + Custom(CustomCloseReason(code:, body:)) -> 720 + websocket.CustomCloseReason(code:, body:) 721 + } 722 + } 723 + 724 + /// Closes the connection with a custom close code between 1000 and 4999. 725 + pub fn close_custom( 726 + conn: Connection, 727 + code code: Int, 728 + body body: BitArray, 729 + ) -> Result(Nil, CustomCloseError) { 730 + use <- bool.guard( 731 + when: code >= 5000 || code < 1000, 732 + return: Error(InvalidCode), 733 + ) 734 + 735 + close(conn, Custom(CustomCloseReason(code:, body:))) 736 + |> result.map_error(SocketFail) 737 + } 738 + 739 + pub fn close( 740 + conn: Connection, 741 + because reason: CloseReason, 742 + ) -> Result(Nil, SocketReason) { 743 + let reason = convert_close_reason(reason) 744 + let mask = crypto.strong_random_bytes(4) 745 + let frame = websocket.encode_close_frame(reason, Some(mask)) 746 + 747 + transport.send(conn.transport, conn.socket, frame) 748 + |> result.map_error(convert_socket_reason) 749 + } 750 + 751 + fn make_upgrade(req: Request(String)) -> BytesTree { 752 + let user_headers = case req.headers { 753 + [] -> "" 754 + _ -> 755 + req.headers 756 + |> list.filter(fn(pair) { 757 + let #(key, _value) = pair 758 + key != "host" 759 + && key != "upgrade" 760 + && key != "connection" 761 + && key != "sec-websocket-key" 762 + && key != "sec-websocket-version" 763 + }) 764 + |> list.map(fn(pair) { 765 + let #(key, value) = pair 766 + key <> ": " <> value 767 + }) 768 + |> string.join("\r\n") 769 + |> string.append("\r\n") 770 + } 771 + 772 + let path = case req.path { 773 + "" -> "/" 774 + path -> path 775 + } 776 + 777 + let query = 778 + req 779 + |> request.get_query 780 + |> result.map(uri.query_to_string) 781 + |> fn(str) { 782 + case str { 783 + Ok("") -> "" 784 + Ok(str) -> "?" <> str 785 + _ -> "" 786 + } 787 + } 788 + 789 + let port = 790 + req.port 791 + |> option.map(fn(port) { ":" <> int.to_string(port) }) 792 + |> option.unwrap("") 793 + 794 + bytes_tree.new() 795 + |> bytes_tree.append_string("GET " <> path <> query <> " HTTP/1.1\r\n") 796 + |> bytes_tree.append_string("host: " <> req.host <> port <> "\r\n") 797 + |> bytes_tree.append_string("upgrade: websocket\r\n") 798 + |> bytes_tree.append_string("connection: upgrade\r\n") 799 + |> bytes_tree.append_string( 800 + "sec-websocket-key: " <> websocket.make_client_key() <> "\r\n", 801 + ) 802 + |> bytes_tree.append_string("sec-websocket-version: 13\r\n") 803 + |> bytes_tree.append_string( 804 + "sec-websocket-extensions: permessage-deflate\r\n", 805 + ) 806 + |> bytes_tree.append_string(user_headers) 807 + |> bytes_tree.append_string("\r\n") 808 + } 809 + 810 + pub type HandshakeError { 811 + Sock(SocketReason) 812 + Protocol(BitArray) 813 + UpgradeFailed(Response(BitArray)) 814 + } 815 + 816 + type HandshakeResponse { 817 + HandshakeResponse( 818 + socket: Socket, 819 + response: Response(BitArray), 820 + buffer: BitArray, 821 + ) 822 + } 823 + 824 + fn perform_handshake( 825 + req: Request(String), 826 + transport: Transport, 827 + timeout: Int, 828 + ) -> Result(HandshakeResponse, HandshakeError) { 829 + let certs = case req.scheme { 830 + Https -> { 831 + let assert Ok(_ok) = ssl.start() 832 + [Cacerts(socket.get_certs()), socket.get_custom_matcher()] 833 + } 834 + Http -> [] 835 + } 836 + 837 + let opts = 838 + socket.convert_options( 839 + list.append(socket.default_options, [Receive(Pull), ..certs]), 840 + ) 841 + 842 + let port = 843 + option.lazy_unwrap(req.port, fn() { 844 + case transport { 845 + Ssl -> 443 846 + Tcp -> 80 847 + } 848 + }) 849 + 850 + logging.log( 851 + logging.Debug, 852 + "Making request to " <> req.host <> " at " <> int.to_string(port), 853 + ) 854 + 855 + use socket <- result.try( 856 + result.map_error( 857 + transport.connect( 858 + transport, 859 + charlist.from_string(req.host), 860 + port, 861 + opts, 862 + timeout, 863 + ), 864 + fn(err) { Sock(convert_socket_reason(err)) }, 865 + ), 866 + ) 867 + 868 + let upgrade_req = make_upgrade(req) 869 + 870 + use _nil <- result.try( 871 + result.map_error(transport.send(transport, socket, upgrade_req), fn(err) { 872 + Sock(convert_socket_reason(err)) 873 + }), 874 + ) 875 + 876 + logging.log( 877 + logging.Debug, 878 + "Sent upgrade request, waiting " <> int.to_string(timeout), 879 + ) 880 + 881 + use resp <- result.try( 882 + result.map_error( 883 + transport.receive_timeout(transport, socket, 0, timeout), 884 + fn(err) { Sock(convert_socket_reason(err)) }, 885 + ), 886 + ) 887 + 888 + resp 889 + |> gramps_http.read_response 890 + |> result.map_error(fn(_err) { Protocol(resp) }) 891 + |> result.try(fn(pair) { 892 + let #(resp, body) = pair 893 + let body_size = 894 + resp.headers 895 + |> list.key_find("content-length") 896 + |> result.try(int.parse) 897 + |> result.unwrap(0) 898 + case read_body(transport, socket, timeout, body_size, body) { 899 + Ok(#(body, rest)) -> { 900 + Ok(#(response.set_body(resp, body), rest)) 901 + } 902 + Error(reason) -> Error(Sock(reason)) 903 + } 904 + }) 905 + |> result.try(fn(pair) { 906 + let #(resp, buffer) = pair 907 + case resp.status { 908 + 101 -> Ok(HandshakeResponse(socket:, response: resp, buffer:)) 909 + _ -> Error(UpgradeFailed(resp)) 910 + } 911 + }) 912 + } 913 + 914 + fn read_body( 915 + transport: Transport, 916 + socket: Socket, 917 + timeout: Int, 918 + length: Int, 919 + body: BitArray, 920 + ) -> Result(#(BitArray, BitArray), SocketReason) { 921 + case body { 922 + <<data:bytes-size(length), rest:bits>> -> Ok(#(data, rest)) 923 + _ -> { 924 + case transport.receive_timeout(transport, socket, 0, timeout) { 925 + Ok(data) -> { 926 + read_body(transport, socket, timeout, length, <<body:bits, data:bits>>) 927 + } 928 + Error(reason) -> Error(convert_socket_reason(reason)) 929 + } 930 + } 931 + } 932 + } 933 + 934 + fn close_contexts(contexts: Option(compression.Compression)) -> Nil { 935 + case contexts { 936 + Some(compression) -> { 937 + compression.close(compression.deflate) 938 + compression.close(compression.inflate) 939 + Nil 940 + } 941 + _ -> Nil 942 + } 943 + }
+174
src/goose/stratus/internal/socket.gleam
··· 1 + import gleam/dynamic.{type Dynamic} 2 + import gleam/dynamic/decode 3 + import gleam/erlang/atom.{type Atom} 4 + import gleam/erlang/process.{type Selector} 5 + import gleam/list 6 + 7 + pub type Socket 8 + 9 + pub type SocketReason { 10 + Closed 11 + Timeout 12 + Badarg 13 + Terminated 14 + Eaddrinuse 15 + Eaddrnotavail 16 + Eafnosupport 17 + Ealready 18 + Econnaborted 19 + Econnrefused 20 + Econnreset 21 + Edestaddrreq 22 + Ehostdown 23 + Ehostunreach 24 + Einprogress 25 + Eisconn 26 + Emsgsize 27 + Enetdown 28 + Enetunreach 29 + Enopkg 30 + Enoprotoopt 31 + Enotconn 32 + Enotty 33 + Enotsock 34 + Eproto 35 + Eprotonosupport 36 + Eprototype 37 + Esocktnosupport 38 + Etimedout 39 + Ewouldblock 40 + Exbadport 41 + Exbadseq 42 + } 43 + 44 + pub type TcpOption = 45 + #(Atom, Dynamic) 46 + 47 + pub type ReceiveMode { 48 + Count(Int) 49 + Once 50 + Pull 51 + All 52 + } 53 + 54 + pub type PacketType { 55 + Binary 56 + List 57 + } 58 + 59 + pub type Options { 60 + Receive(ReceiveMode) 61 + PacketsOf(PacketType) 62 + SendTimeout(Int) 63 + SendTimeoutClose(Bool) 64 + Reuseaddr(Bool) 65 + Nodelay(Bool) 66 + Cacerts(Dynamic) 67 + CustomizeHostnameCheck(Dynamic) 68 + } 69 + 70 + pub const default_options = [ 71 + PacketsOf(Binary), 72 + SendTimeout(30_000), 73 + SendTimeoutClose(True), 74 + Reuseaddr(True), 75 + Nodelay(True), 76 + ] 77 + 78 + @external(erlang, "gleam@function", "identity") 79 + fn from(value: a) -> Dynamic 80 + 81 + pub fn convert_options(options: List(Options)) -> List(TcpOption) { 82 + let active = atom.create("active") 83 + list.map(options, fn(opt) { 84 + case opt { 85 + Receive(Count(count)) -> #(active, dynamic.int(count)) 86 + Receive(Once) -> #(active, from(atom.create("once"))) 87 + Receive(Pull) -> #(active, dynamic.bool(False)) 88 + Receive(All) -> #(active, dynamic.bool(True)) 89 + PacketsOf(Binary) -> #(atom.create("mode"), from(Binary)) 90 + PacketsOf(List) -> #(atom.create("mode"), from(List)) 91 + Cacerts(data) -> #(atom.create("cacerts"), data) 92 + Nodelay(bool) -> #(atom.create("nodelay"), dynamic.bool(bool)) 93 + Reuseaddr(bool) -> #(atom.create("reuseaddr"), dynamic.bool(bool)) 94 + SendTimeout(int) -> #(atom.create("send_timeout"), dynamic.int(int)) 95 + SendTimeoutClose(bool) -> #( 96 + atom.create("send_timeout_close"), 97 + dynamic.bool(bool), 98 + ) 99 + CustomizeHostnameCheck(funcs) -> #( 100 + atom.create("customize_hostname_check"), 101 + funcs, 102 + ) 103 + } 104 + }) 105 + } 106 + 107 + pub type Shutdown { 108 + Read 109 + Write 110 + ReadWrite 111 + } 112 + 113 + pub type SocketMessage { 114 + Data(BitArray) 115 + Err(SocketReason) 116 + } 117 + 118 + type ErlangSocketMessage { 119 + Ssl 120 + SslClosed 121 + SslError 122 + Tcp 123 + TcpClosed 124 + TcpError 125 + } 126 + 127 + pub fn selector() -> Selector(Result(SocketMessage, List(decode.DecodeError))) { 128 + process.new_selector() 129 + |> process.select_record(Tcp, 2, fn(data) { 130 + { 131 + use msg <- decode.field(2, decode.bit_array) 132 + decode.success(Data(msg)) 133 + } 134 + |> decode.run(data, _) 135 + }) 136 + |> process.select_record(Ssl, 2, fn(data) { 137 + { 138 + use msg <- decode.field(2, decode.bit_array) 139 + decode.success(Data(msg)) 140 + } 141 + |> decode.run(data, _) 142 + }) 143 + |> process.select_record(SslClosed, 1, fn(_socket) { Ok(Err(Closed)) }) 144 + |> process.select_record(TcpClosed, 1, fn(_socket) { Ok(Err(Closed)) }) 145 + |> process.select_record(TcpError, 2, fn(data) { 146 + { 147 + use reason <- decode.field(2, atom.decoder()) 148 + case parse_known_socket_reason(reason) { 149 + Ok(socket_reason) -> decode.success(Err(socket_reason)) 150 + Error(_) -> decode.failure(Err(Badarg), "SocketReason") 151 + } 152 + } 153 + |> decode.run(data, _) 154 + }) 155 + |> process.select_record(SslError, 2, fn(data) { 156 + { 157 + use reason <- decode.field(2, atom.decoder()) 158 + case parse_known_socket_reason(reason) { 159 + Ok(socket_reason) -> decode.success(Err(socket_reason)) 160 + Error(_) -> decode.failure(Err(Badarg), "SocketReason") 161 + } 162 + } 163 + |> decode.run(data, _) 164 + }) 165 + } 166 + 167 + @external(erlang, "public_key", "cacerts_get") 168 + pub fn get_certs() -> Dynamic 169 + 170 + @external(erlang, "stratus_ffi", "custom_sni_matcher") 171 + pub fn get_custom_matcher() -> Options 172 + 173 + @external(erlang, "stratus_ffi", "parse_known_socket_reason") 174 + fn parse_known_socket_reason(reason: Atom) -> Result(SocketReason, Dynamic)
+45
src/goose/stratus/internal/ssl.gleam
··· 1 + import gleam/bytes_tree.{type BytesTree} 2 + import gleam/erlang/charlist.{type Charlist} 3 + import gleam/erlang/process.{type Pid} 4 + import goose/stratus/internal/socket.{ 5 + type Shutdown, type Socket, type SocketReason, type TcpOption, 6 + } 7 + 8 + @external(erlang, "ssl", "connect") 9 + pub fn connect( 10 + address: Charlist, 11 + port: Int, 12 + options: List(TcpOption), 13 + timeout: Int, 14 + ) -> Result(Socket, SocketReason) 15 + 16 + @external(erlang, "stratus_ffi", "ssl_shutdown") 17 + pub fn shutdown(socket: Socket, how: Shutdown) -> Result(Nil, SocketReason) 18 + 19 + @external(erlang, "stratus_ffi", "ssl_send") 20 + pub fn send(socket: Socket, packet: BytesTree) -> Result(Nil, SocketReason) 21 + 22 + @external(erlang, "ssl", "recv") 23 + pub fn receive(socket: Socket, length: Int) -> Result(BitArray, SocketReason) 24 + 25 + @external(erlang, "ssl", "recv") 26 + pub fn receive_timeout( 27 + socket: Socket, 28 + length: Int, 29 + timeout: Int, 30 + ) -> Result(BitArray, SocketReason) 31 + 32 + @external(erlang, "stratus_ffi", "ssl_set_opts") 33 + pub fn set_opts( 34 + socket: Socket, 35 + opts: List(TcpOption), 36 + ) -> Result(Nil, SocketReason) 37 + 38 + @external(erlang, "stratus_ffi", "ssl_start") 39 + pub fn start() -> Result(Nil, Nil) 40 + 41 + @external(erlang, "stratus_ffi", "ssl_controlling_process") 42 + pub fn controlling_process( 43 + socket: Socket, 44 + new_owner: Pid, 45 + ) -> Result(Nil, SocketReason)
+42
src/goose/stratus/internal/tcp.gleam
··· 1 + import gleam/bytes_tree.{type BytesTree} 2 + import gleam/erlang/charlist.{type Charlist} 3 + import gleam/erlang/process.{type Pid} 4 + import goose/stratus/internal/socket.{ 5 + type Shutdown, type Socket, type SocketReason, type TcpOption, 6 + } 7 + 8 + @external(erlang, "gen_tcp", "connect") 9 + pub fn connect( 10 + address: Charlist, 11 + port: Int, 12 + options: List(TcpOption), 13 + timeout: Int, 14 + ) -> Result(Socket, SocketReason) 15 + 16 + @external(erlang, "stratus_ffi", "tcp_shutdown") 17 + pub fn shutdown(socket: Socket, how: Shutdown) -> Result(Nil, SocketReason) 18 + 19 + @external(erlang, "stratus_ffi", "tcp_send") 20 + pub fn send(socket: Socket, packet: BytesTree) -> Result(Nil, SocketReason) 21 + 22 + @external(erlang, "gen_tcp", "recv") 23 + pub fn receive(socket: Socket, length: Int) -> Result(BitArray, SocketReason) 24 + 25 + @external(erlang, "gen_tcp", "recv") 26 + pub fn receive_timeout( 27 + socket: Socket, 28 + length: Int, 29 + timeout: Int, 30 + ) -> Result(BitArray, SocketReason) 31 + 32 + @external(erlang, "stratus_ffi", "tcp_set_opts") 33 + pub fn set_opts( 34 + socket: Socket, 35 + opts: List(TcpOption), 36 + ) -> Result(Nil, SocketReason) 37 + 38 + @external(erlang, "stratus_ffi", "tcp_controlling_process") 39 + pub fn controlling_process( 40 + socket: Socket, 41 + new_owner: Pid, 42 + ) -> Result(Nil, SocketReason)
+81
src/goose/stratus/internal/transport.gleam
··· 1 + import gleam/bytes_tree.{type BytesTree} 2 + import gleam/erlang/charlist.{type Charlist} 3 + import goose/stratus/internal/socket.{ 4 + type Shutdown, type Socket, type SocketReason, type TcpOption, 5 + } 6 + import goose/stratus/internal/ssl 7 + import goose/stratus/internal/tcp 8 + 9 + pub type Transport { 10 + Tcp 11 + Ssl 12 + } 13 + 14 + pub fn connect( 15 + transport: Transport, 16 + host: Charlist, 17 + port: Int, 18 + options: List(TcpOption), 19 + timeout: Int, 20 + ) -> Result(Socket, SocketReason) { 21 + case transport { 22 + Ssl -> ssl.connect(host, port, options, timeout) 23 + Tcp -> tcp.connect(host, port, options, timeout) 24 + } 25 + } 26 + 27 + pub fn send( 28 + transport: Transport, 29 + socket: Socket, 30 + data: BytesTree, 31 + ) -> Result(Nil, SocketReason) { 32 + case transport { 33 + Ssl -> ssl.send(socket, data) 34 + Tcp -> tcp.send(socket, data) 35 + } 36 + } 37 + 38 + pub fn receive( 39 + transport: Transport, 40 + socket: Socket, 41 + length: Int, 42 + ) -> Result(BitArray, SocketReason) { 43 + case transport { 44 + Ssl -> ssl.receive(socket, length) 45 + Tcp -> tcp.receive(socket, length) 46 + } 47 + } 48 + 49 + pub fn receive_timeout( 50 + transport: Transport, 51 + socket: Socket, 52 + length: Int, 53 + timeout: Int, 54 + ) -> Result(BitArray, SocketReason) { 55 + case transport { 56 + Ssl -> ssl.receive_timeout(socket, length, timeout) 57 + Tcp -> tcp.receive_timeout(socket, length, timeout) 58 + } 59 + } 60 + 61 + pub fn shutdown( 62 + transport: Transport, 63 + socket: Socket, 64 + how: Shutdown, 65 + ) -> Result(Nil, SocketReason) { 66 + case transport { 67 + Ssl -> ssl.shutdown(socket, how) 68 + Tcp -> tcp.shutdown(socket, how) 69 + } 70 + } 71 + 72 + pub fn set_opts( 73 + transport: Transport, 74 + socket: Socket, 75 + opts: List(TcpOption), 76 + ) -> Result(Nil, SocketReason) { 77 + case transport { 78 + Tcp -> tcp.set_opts(socket, opts) 79 + Ssl -> ssl.set_opts(socket, opts) 80 + } 81 + }
+43 -20
src/goose_ffi.erl
··· 1 1 -module(goose_ffi). 2 - -export([receive_ws_message/0]). 2 + -export([priv_dir/0, decompress_using_ddict_safe/2, decompress_streaming_safe/2]). 3 3 4 - %% Receive a WebSocket text message from the process mailbox 5 - receive_ws_message() -> 6 - receive 7 - %% Handle messages forwarded from handler process 8 - {ws_text, Text} -> 9 - {ok, Text}; 10 - {ws_binary, _Binary} -> 11 - %% Ignore binary messages, try again 12 - receive_ws_message(); 13 - {ws_closed, _Reason} -> 14 - {error, closed}; 15 - {ws_error, _Reason} -> 16 - {error, connection_error}; 17 - _Other -> 18 - %% Ignore unexpected messages 19 - receive_ws_message() 20 - after 60000 -> 21 - %% Timeout - connection is still alive, just no messages 22 - {error, timeout} 4 + %% Get the priv directory for the goose application 5 + priv_dir() -> 6 + case code:priv_dir(goose) of 7 + {error, _} -> "./priv"; 8 + Path when is_list(Path) -> list_to_binary(Path); 9 + Path -> Path 10 + end. 11 + 12 + %% Safe wrapper for ezstd:decompress_using_ddict that always returns Result type 13 + decompress_using_ddict_safe(Data, DDict) -> 14 + case ezstd:decompress_using_ddict(Data, DDict) of 15 + Result when is_binary(Result) -> 16 + {ok, Result}; 17 + Result when is_list(Result) -> 18 + {ok, iolist_to_binary(Result)}; 19 + {error, Err} when is_binary(Err) -> 20 + %% Keep error as binary (Gleam String) 21 + {error, Err}; 22 + {error, Err} when is_list(Err) -> 23 + %% Convert list to binary 24 + {error, list_to_binary(Err)}; 25 + {error, Err} -> 26 + %% Convert any other type to binary 27 + {error, list_to_binary(lists:flatten(io_lib:format("~p", [Err])))} 28 + end. 29 + 30 + %% Safe wrapper for ezstd:decompress_streaming that always returns Result type 31 + decompress_streaming_safe(DCtx, Data) -> 32 + case ezstd:decompress_streaming(DCtx, Data) of 33 + Result when is_binary(Result) -> 34 + {ok, Result}; 35 + Result when is_list(Result) -> 36 + {ok, iolist_to_binary(Result)}; 37 + {error, Err} when is_binary(Err) -> 38 + %% Keep error as binary (Gleam String) 39 + {error, Err}; 40 + {error, Err} when is_list(Err) -> 41 + %% Convert list to binary 42 + {error, list_to_binary(Err)}; 43 + {error, Err} -> 44 + %% Convert any other type to binary 45 + {error, list_to_binary(lists:flatten(io_lib:format("~p", [Err])))} 23 46 end.
-216
src/goose_ws_ffi.erl
··· 1 - -module(goose_ws_ffi). 2 - -export([connect/3]). 3 - 4 - %% Connect to WebSocket using gun 5 - connect(Url, HandlerPid, Compress) -> 6 - %% Start gun application and dependencies 7 - application:ensure_all_started(ssl), 8 - application:ensure_all_started(gun), 9 - 10 - %% Start ezstd if compression is enabled 11 - case Compress of 12 - true -> application:ensure_all_started(ezstd); 13 - _ -> ok 14 - end, 15 - 16 - %% Spawn a connection process that will own the gun connection 17 - Parent = self(), 18 - spawn(fun() -> connect_worker(Url, HandlerPid, Compress, Parent) end), 19 - 20 - %% Wait for the connection result 21 - receive 22 - {connection_result, Result} -> Result 23 - after 60000 -> 24 - {error, connection_timeout} 25 - end. 26 - 27 - %% Worker process that owns the connection 28 - connect_worker(Url, HandlerPid, Compress, Parent) -> 29 - %% Parse URL using uri_string 30 - UriMap = uri_string:parse(Url), 31 - #{scheme := SchemeStr, host := Host, path := Path} = UriMap, 32 - 33 - %% Get query string if present and append to path 34 - Query = maps:get(query, UriMap, undefined), 35 - PathWithQuery = case Query of 36 - undefined -> Path; 37 - <<>> -> Path; 38 - Q -> <<Path/binary, "?", Q/binary>> 39 - end, 40 - 41 - %% Get port, use defaults if not specified 42 - Port = maps:get(port, uri_string:parse(Url), 43 - case SchemeStr of 44 - <<"wss">> -> 443; 45 - <<"ws">> -> 80; 46 - _ -> 443 47 - end), 48 - 49 - %% Determine transport 50 - Transport = case SchemeStr of 51 - <<"wss">> -> tls; 52 - <<"ws">> -> tcp; 53 - _ -> tls 54 - end, 55 - 56 - %% TLS options for secure connections 57 - TlsOpts = [{verify, verify_none}], %% For simplicity, disable cert verification 58 - %% In production, use proper CA certs 59 - 60 - %% Connection options 61 - Opts = case Transport of 62 - tls -> 63 - #{ 64 - transport => tls, 65 - tls_opts => TlsOpts, 66 - protocols => [http], 67 - retry => 10, 68 - retry_timeout => 1000 69 - }; 70 - tcp -> 71 - #{ 72 - transport => tcp, 73 - protocols => [http], 74 - retry => 10, 75 - retry_timeout => 1000 76 - } 77 - end, 78 - 79 - %% Convert host to list if needed 80 - HostStr = case is_binary(Host) of 81 - true -> binary_to_list(Host); 82 - false -> Host 83 - end, 84 - 85 - %% Ensure path with query is binary 86 - PathBin = case is_binary(PathWithQuery) of 87 - true -> PathWithQuery; 88 - false -> list_to_binary(PathWithQuery) 89 - end, 90 - 91 - %% Open connection (this process will be the owner) 92 - case gun:open(HostStr, Port, Opts) of 93 - {ok, ConnPid} -> 94 - %% Monitor the connection 95 - MRef = monitor(process, ConnPid), 96 - 97 - %% Wait for connection 98 - receive 99 - {gun_up, ConnPid, _Protocol} -> 100 - %% Upgrade to WebSocket (compression is controlled via query string, not headers) 101 - StreamRef = gun:ws_upgrade(ConnPid, binary_to_list(PathBin), []), 102 - 103 - %% Wait for upgrade 104 - receive 105 - {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _ResponseHeaders} -> 106 - %% Notify parent that connection is ready 107 - Parent ! {connection_result, {ok, ConnPid}}, 108 - %% Now handle messages in this process (the connection owner) 109 - handle_messages(ConnPid, StreamRef, HandlerPid, Compress); 110 - {gun_response, ConnPid, _, _, Status, Headers} -> 111 - gun:close(ConnPid), 112 - Parent ! {connection_result, {error, {upgrade_failed, Status, Headers}}}; 113 - {gun_error, ConnPid, _StreamRef, Reason} -> 114 - gun:close(ConnPid), 115 - Parent ! {connection_result, {error, {gun_error, Reason}}}; 116 - {'DOWN', MRef, process, ConnPid, Reason} -> 117 - Parent ! {connection_result, {error, {connection_down, Reason}}}; 118 - _Other -> 119 - gun:close(ConnPid), 120 - Parent ! {connection_result, {error, unexpected_message}} 121 - after 30000 -> 122 - gun:close(ConnPid), 123 - Parent ! {connection_result, {error, upgrade_timeout}} 124 - end; 125 - {'DOWN', MRef, process, ConnPid, Reason} -> 126 - Parent ! {connection_result, {error, {connection_failed, Reason}}}; 127 - _Other -> 128 - gun:close(ConnPid), 129 - Parent ! {connection_result, {error, unexpected_message}} 130 - after 30000 -> 131 - gun:close(ConnPid), 132 - Parent ! {connection_result, {error, connection_timeout}} 133 - end; 134 - {error, Reason} -> 135 - Parent ! {connection_result, {error, {open_failed, Reason}}} 136 - end. 137 - 138 - %% Handle incoming WebSocket messages 139 - handle_messages(ConnPid, StreamRef, HandlerPid, Compress) -> 140 - %% Load zstd dictionary if compression is enabled 141 - Decompressor = case Compress of 142 - true -> 143 - %% Load dictionary from priv directory 144 - PrivDir = code:priv_dir(goose), 145 - DictPath = filename:join(PrivDir, "zstd_dictionary"), 146 - case file:read_file(DictPath) of 147 - {ok, DictData} -> 148 - %% Create decompression context and dictionary 149 - DCtx = ezstd:create_decompression_context(1024 * 1024), 150 - DDict = ezstd:create_ddict(DictData), 151 - %% Select the dictionary for the decompression context 152 - ok = ezstd:select_ddict(DCtx, DDict), 153 - {ok, {DCtx, DDict}}; 154 - {error, Err} -> 155 - io:format("Failed to load zstd dictionary: ~p~n", [Err]), 156 - {error, Err} 157 - end; 158 - _ -> 159 - none 160 - end, 161 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor). 162 - 163 - %% Message handling loop 164 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) -> 165 - receive 166 - {gun_ws, _AnyConnPid, _AnyStreamRef, {text, Text}} -> 167 - HandlerPid ! {ws_text, Text}, 168 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 169 - {gun_ws, _AnyConnPid, _AnyStreamRef, {binary, Binary}} -> 170 - %% If compression is enabled, decompress the binary data 171 - case {Compress, Decompressor} of 172 - {true, {ok, {DCtx, DDict}}} -> 173 - %% Try decompress_using_ddict first (works for frames with content size) 174 - case ezstd:decompress_using_ddict(Binary, DDict) of 175 - Result when is_binary(Result) -> 176 - HandlerPid ! {ws_text, Result}; 177 - Result when is_list(Result) -> 178 - HandlerPid ! {ws_text, iolist_to_binary(Result)}; 179 - {error, <<"failed to decompress: ZSTD_CONTENTSIZE_UNKNOWN">>} -> 180 - %% Frame doesn't have content size, use streaming with dictionary-loaded context 181 - case ezstd:decompress_streaming(DCtx, Binary) of 182 - StreamResult when is_binary(StreamResult) -> 183 - HandlerPid ! {ws_text, StreamResult}; 184 - StreamResult when is_list(StreamResult) -> 185 - HandlerPid ! {ws_text, iolist_to_binary(StreamResult)}; 186 - {error, _StreamReason} -> 187 - %% Skip frames that fail to decompress 188 - ok 189 - end; 190 - {error, _Reason} -> 191 - %% Skip frames that fail to decompress 192 - ok 193 - end; 194 - _ -> 195 - %% No compression, ignore binary messages 196 - ok 197 - end, 198 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 199 - {gun_ws, ConnPid, StreamRef, close} -> 200 - HandlerPid ! {ws_closed, normal}, 201 - gun:close(ConnPid); 202 - {gun_down, ConnPid, _Protocol, Reason, _KilledStreams} -> 203 - HandlerPid ! {ws_error, Reason}, 204 - gun:close(ConnPid); 205 - {gun_error, ConnPid, StreamRef, Reason} -> 206 - HandlerPid ! {ws_error, Reason}, 207 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 208 - stop -> 209 - gun:close(ConnPid); 210 - _Other -> 211 - %% Ignore unexpected messages 212 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) 213 - after 30000 -> 214 - %% Heartbeat every 30 seconds to keep connection alive 215 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) 216 - end.
+116
src/stratus_ffi.erl
··· 1 + -module(stratus_ffi). 2 + 3 + -export([tcp_shutdown/2, tcp_send/2, ssl_shutdown/2, ssl_send/2, tcp_set_opts/2, 4 + ssl_set_opts/2, ssl_start/0, custom_sni_matcher/0, 5 + ssl_controlling_process/2, tcp_controlling_process/2, 6 + parse_known_socket_reason/1 ]). 7 + 8 + tcp_shutdown(Socket, How) -> 9 + case gen_tcp:shutdown(Socket, How) of 10 + ok -> 11 + {ok, nil}; 12 + {error, Reason} -> 13 + {error, Reason} 14 + end. 15 + 16 + tcp_send(Socket, Packet) -> 17 + case gen_tcp:send(Socket, Packet) of 18 + ok -> 19 + {ok, nil}; 20 + {error, Reason} -> 21 + {error, Reason} 22 + end. 23 + 24 + ssl_shutdown(Socket, How) -> 25 + case ssl:shutdown(Socket, How) of 26 + ok -> 27 + {ok, nil}; 28 + {error, Reason} -> 29 + {error, Reason} 30 + end. 31 + 32 + ssl_send(Socket, Packet) -> 33 + case ssl:send(Socket, Packet) of 34 + ok -> 35 + {ok, nil}; 36 + {error, Reason} -> 37 + {error, Reason} 38 + end. 39 + 40 + tcp_set_opts(Socket, Opts) -> 41 + case inet:setopts(Socket, Opts) of 42 + ok -> 43 + {ok, nil}; 44 + {error, Reason} -> 45 + {error, Reason} 46 + end. 47 + 48 + ssl_set_opts(Socket, Opts) -> 49 + case ssl:setopts(Socket, Opts) of 50 + ok -> 51 + {ok, nil}; 52 + {error, Reason} -> 53 + {error, Reason} 54 + end. 55 + 56 + ssl_start() -> 57 + case ssl:start() of 58 + ok -> 59 + {ok, nil}; 60 + {error, Reason} -> 61 + {error, Reason} 62 + end. 63 + 64 + % Thank you! https://github.com/erlang/otp/issues/4321 65 + custom_sni_matcher() -> 66 + {customize_hostname_check, 67 + [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]}. 68 + 69 + ssl_controlling_process(Socket, NewOwner) -> 70 + case ssl:controlling_process(Socket, NewOwner) of 71 + ok -> {ok, nil}; 72 + Error -> Error 73 + end. 74 + 75 + tcp_controlling_process(Socket, NewOwner) -> 76 + case gen_tcp:controlling_process(Socket, NewOwner) of 77 + ok -> {ok, nil}; 78 + Error -> Error 79 + end. 80 + 81 + parse_known_socket_reason(Reason) -> 82 + case Reason of 83 + closed -> {ok, closed}; 84 + timeout -> {ok, timeout}; 85 + badarg -> {ok, badarg}; 86 + terminated -> {ok, terminated}; 87 + eaddrinuse -> {ok, eaddrinuse}; 88 + eaddrnotavail -> {ok, eaddrnotavail}; 89 + eafnosupport -> {ok, eafnosupport}; 90 + ealready -> {ok, ealready}; 91 + econnaborted -> {ok, econnaborted}; 92 + econnrefused -> {ok, econnrefused}; 93 + econnreset -> {ok, econnreset}; 94 + edestaddrreq -> {ok, edestaddrreq}; 95 + ehostdown -> {ok, ehostdown}; 96 + ehostunreach -> {ok, ehostunreach}; 97 + einprogress -> {ok, einprogress}; 98 + eisconn -> {ok, eisconn}; 99 + emsgsize -> {ok, emsgsize}; 100 + enetdown -> {ok, enetdown}; 101 + enetunreach -> {ok, enetunreach}; 102 + enopkg -> {ok, enopkg}; 103 + enoprotoopt -> {ok, enoprotoopt}; 104 + enotconn -> {ok, enotconn}; 105 + enotty -> {ok, enotty}; 106 + enotsock -> {ok, enotsock}; 107 + eproto -> {ok, eproto}; 108 + eprotonosupport -> {ok, eprotonosupport}; 109 + eprototype -> {ok, eprototype}; 110 + esocktnosupport -> {ok, esocktnosupport}; 111 + etimedout -> {ok, etimedout}; 112 + ewouldblock -> {ok, ewouldblock}; 113 + exbadport -> {ok, exbadport}; 114 + exbadseq -> {ok, exbadseq}; 115 + Unknown -> {error, Unknown} 116 + end.
-24
test/goose_test.gleam
··· 26 26 max_message_size_bytes: option.None, 27 27 compress: False, 28 28 require_hello: False, 29 - max_backoff_seconds: 60, 30 - log_connection_events: True, 31 - log_retry_attempts: True, 32 29 ) 33 30 let url = goose.build_url(config) 34 31 ··· 47 44 max_message_size_bytes: option.None, 48 45 compress: False, 49 46 require_hello: False, 50 - max_backoff_seconds: 60, 51 - log_connection_events: True, 52 - log_retry_attempts: True, 53 47 ) 54 48 let url = goose.build_url(config) 55 49 ··· 68 62 max_message_size_bytes: option.None, 69 63 compress: False, 70 64 require_hello: False, 71 - max_backoff_seconds: 60, 72 - log_connection_events: True, 73 - log_retry_attempts: True, 74 65 ) 75 66 let url = goose.build_url(config) 76 67 ··· 89 80 max_message_size_bytes: option.None, 90 81 compress: False, 91 82 require_hello: False, 92 - max_backoff_seconds: 60, 93 - log_connection_events: True, 94 - log_retry_attempts: True, 95 83 ) 96 84 let url = goose.build_url(config) 97 85 ··· 110 98 max_message_size_bytes: option.Some(1_048_576), 111 99 compress: False, 112 100 require_hello: False, 113 - max_backoff_seconds: 60, 114 - log_connection_events: True, 115 - log_retry_attempts: True, 116 101 ) 117 102 let url = goose.build_url(config) 118 103 ··· 131 116 max_message_size_bytes: option.None, 132 117 compress: True, 133 118 require_hello: False, 134 - max_backoff_seconds: 60, 135 - log_connection_events: True, 136 - log_retry_attempts: True, 137 119 ) 138 120 let url = goose.build_url(config) 139 121 ··· 152 134 max_message_size_bytes: option.None, 153 135 compress: False, 154 136 require_hello: True, 155 - max_backoff_seconds: 60, 156 - log_connection_events: True, 157 - log_retry_attempts: True, 158 137 ) 159 138 let url = goose.build_url(config) 160 139 ··· 173 152 max_message_size_bytes: option.Some(2_097_152), 174 153 compress: True, 175 154 require_hello: True, 176 - max_backoff_seconds: 60, 177 - log_connection_events: True, 178 - log_retry_attempts: True, 179 155 ) 180 156 let url = goose.build_url(config) 181 157