A Gleam WebSocket consumer for AT Protocol Jetstream events.

Compare changes

Choose any two refs to compare.

+1
.gitignore
··· 2 *.ez 3 **/build 4 erl_crash.dump
··· 2 *.ez 3 **/build 4 erl_crash.dump 5 + .claude
+84
CHANGELOG.md
···
··· 1 + # Changelog 2 + 3 + All notable changes to this project will be documented in this file. 4 + 5 + The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), 6 + and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 7 + 8 + ## [2.0.0] - 2025-11-04 9 + 10 + ### Changed 11 + - **BREAKING**: Migrated from Erlang's `gun` library to Gleam's `stratus` WebSocket client 12 + - **BREAKING**: Removed three configuration fields from `JetstreamConfig`: 13 + - `max_backoff_seconds` (retry logic still works internally, capped at 60s) 14 + - `log_connection_events` (connection logging removed) 15 + - `log_retry_attempts` (retry logging removed) 16 + - Reduced Erlang FFI code from ~290 lines to ~40 lines 17 + - Improved code maintainability with more idiomatic Gleam implementation 18 + 19 + ### Added 20 + - Vendored `stratus` WebSocket client internally (moved to `goose/stratus`) until a new version is published 21 + - New dependency: `simplifile` (>= 2.0.0 and < 3.0.0) 22 + - New dependencies (from stratus): `gleam_otp`, `gleam_crypto`, `logging`, `exception`, `gramps` 23 + 24 + ### Removed 25 + - Dependency: `gun` (>= 2.2.0 and < 3.0.0) 26 + 27 + ### Migration Guide 28 + If you're upgrading from v1.x, remove the following fields from your `JetstreamConfig`: 29 + ```gleam 30 + // Before (v1.x) 31 + let config = goose.JetstreamConfig( 32 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 33 + wanted_collections: [], 34 + wanted_dids: [], 35 + cursor: option.None, 36 + max_message_size_bytes: option.None, 37 + compress: True, 38 + require_hello: False, 39 + max_backoff_seconds: 60, // Remove this 40 + log_connection_events: True, // Remove this 41 + log_retry_attempts: True, // Remove this 42 + ) 43 + 44 + // After (v2.x) 45 + let config = goose.JetstreamConfig( 46 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 47 + wanted_collections: [], 48 + wanted_dids: [], 49 + cursor: option.None, 50 + max_message_size_bytes: option.None, 51 + compress: True, 52 + require_hello: False, 53 + ) 54 + ``` 55 + 56 + All other functionality remains the same. Automatic retry with exponential backoff still works internally. 57 + 58 + ## [1.1.0] - 2025-01-29 59 + 60 + ### Added 61 + - Automatic retry logic with exponential backoff for all connections 62 + - Three new configuration fields in `JetstreamConfig`: 63 + - `max_backoff_seconds: Int` - Maximum wait time between retries (default: 60) 64 + - `log_connection_events: Bool` - Log connection state changes (default: True) 65 + - `log_retry_attempts: Bool` - Log detailed retry information (default: True) 66 + 67 + ### Changed 68 + - `start_consumer()` now automatically retries failed connections and handles disconnections 69 + - Enhanced error handling distinguishes between harmless timeouts and real connection failures 70 + 71 + ### Fixed 72 + - Connection failures no longer cause application to stop 73 + - Harmless 60-second timeouts no longer trigger unnecessary reconnections 74 + - WebSocket disconnections are handled gracefully with automatic reconnection 75 + 76 + ## [1.0.0] - 2024-10-28 77 + 78 + ### Added 79 + - Initial release 80 + - WebSocket consumer for AT Protocol Jetstream events 81 + - Support for collection and DID filtering 82 + - Zstd compression support 83 + - Cursor-based replay 84 + - Event parsing for Commit, Identity, and Account events
+26 -16
README.md
··· 6 A Gleam WebSocket consumer for AT Protocol Jetstream events. 7 8 ```sh 9 - gleam add goose@1 10 ``` 11 12 - ## Example 13 14 ```gleam 15 import goose 16 import gleam/io 17 - import gleam/option 18 19 pub fn main() { 20 - // Create a default configuration 21 let config = goose.default_config() 22 23 - // Or configure with custom options 24 - let config = goose.JetstreamConfig( 25 - endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 26 - wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"], 27 - wanted_dids: [], 28 - cursor: option.None, 29 - max_message_size_bytes: option.None, 30 - compress: False, 31 - require_hello: False, 32 - ) 33 - 34 - // Start consuming events 35 goose.start_consumer(config, fn(json_event) { 36 let event = goose.parse_event(json_event) 37 ··· 53 } 54 ``` 55 56 ## Configuration Options 57 58 ### `wanted_collections` 59 An array of Collection NSIDs to filter which records you receive (default: empty = all collections)
··· 6 A Gleam WebSocket consumer for AT Protocol Jetstream events. 7 8 ```sh 9 + gleam add goose 10 ``` 11 12 + ## Quick Start 13 14 ```gleam 15 import goose 16 import gleam/io 17 18 pub fn main() { 19 + // Use default config with automatic retry logic 20 let config = goose.default_config() 21 22 goose.start_consumer(config, fn(json_event) { 23 let event = goose.parse_event(json_event) 24 ··· 40 } 41 ``` 42 43 + ### Custom Configuration 44 + 45 + ```gleam 46 + import goose 47 + import gleam/option 48 + 49 + pub fn main() { 50 + let config = goose.JetstreamConfig( 51 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 52 + wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"], 53 + wanted_dids: [], 54 + cursor: option.None, 55 + max_message_size_bytes: option.None, 56 + compress: True, 57 + require_hello: False, 58 + ) 59 + 60 + goose.start_consumer(config, handle_event) 61 + } 62 + ``` 63 + 64 ## Configuration Options 65 + 66 + **Note:** Goose automatically handles connection failures with exponential backoff retry logic (1s, 2s, 4s, 8s, 16s, 32s, capped at 60s). All connections automatically retry on failure and reconnect on disconnection. 67 68 ### `wanted_collections` 69 An array of Collection NSIDs to filter which records you receive (default: empty = all collections)
+9 -4
example/manifest.toml
··· 2 # You typically do not need to edit this file 3 4 packages = [ 5 - { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" }, 6 { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 8 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 9 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 10 { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 11 - { name = "gleeunit", version = "1.7.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "CD701726CBCE5588B375D157B4391CFD0F2F134CD12D9B6998A395484DE05C58" }, 12 - { name = "goose", version = "1.0.0", build_tools = ["gleam"], requirements = ["ezstd", "gleam_erlang", "gleam_http", "gleam_json", "gleam_stdlib", "gun"], source = "local", path = ".." }, 13 - { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" }, 14 ] 15 16 [requirements]
··· 2 # You typically do not need to edit this file 3 4 packages = [ 5 + { name = "exception", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "329D269D5C2A314F7364BD2711372B6F2C58FA6F39981572E5CA68624D291F8C" }, 6 { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 + { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, 8 + { name = "gleam_crypto", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_crypto", source = "hex", outer_checksum = "50774BAFFF1144E7872814C566C5D653D83A3EBF23ACC3156B757A1B6819086E" }, 9 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 10 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 11 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 12 + { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, 13 { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 14 + { name = "gleeunit", version = "1.8.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "7AE0F64B26CC065ED705FF7CA5F4EDAB8015E72A883736FE251E46FACCCE1E08" }, 15 + { name = "goose", version = "2.0.0", build_tools = ["gleam"], requirements = ["exception", "ezstd", "gleam_crypto", "gleam_erlang", "gleam_http", "gleam_json", "gleam_otp", "gleam_stdlib", "gramps", "logging", "simplifile"], source = "local", path = ".." }, 16 + { name = "gramps", version = "6.0.0", build_tools = ["gleam"], requirements = ["gleam_crypto", "gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gramps", source = "hex", outer_checksum = "8B7195978FBFD30B43DF791A8A272041B81E45D245314D7A41FC57237AA882A0" }, 17 + { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, 18 + { name = "simplifile", version = "2.3.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0A868DAC6063D9E983477981839810DC2E553285AB4588B87E3E9C96A7FB4CB4" }, 19 ] 20 21 [requirements]
+1 -1
example/src/example.gleam
··· 19 io.println("Connected to: " <> config.endpoint) 20 io.println("Listening for all events...\n") 21 22 - // Start consuming and log all events 23 goose.start_consumer(config, fn(json_event) { 24 let event = goose.parse_event(json_event) 25
··· 19 io.println("Connected to: " <> config.endpoint) 20 io.println("Listening for all events...\n") 21 22 + // Start consuming and log all events (automatically retries on failure) 23 goose.start_consumer(config, fn(json_event) { 24 let event = goose.parse_event(json_event) 25
+7 -2
gleam.toml
··· 1 name = "goose" 2 - version = "1.0.0" 3 description = "A Gleam WebSocket consumer for AT Protocol Jetstream events" 4 licences = ["Apache-2.0"] 5 repository = { type = "github", user = "bigmoves", repo = "goose" } ··· 7 [dependencies] 8 gleam_stdlib = ">= 0.44.0 and < 2.0.0" 9 gleam_erlang = ">= 1.0.0 and < 2.0.0" 10 gleam_http = ">= 4.0.0 and < 5.0.0" 11 gleam_json = ">= 3.0.2 and < 4.0.0" 12 - gun = ">= 2.2.0 and < 3.0.0" 13 ezstd = ">= 1.2.0 and < 2.0.0" 14 15 [dev-dependencies] 16 gleeunit = ">= 1.0.0 and < 2.0.0"
··· 1 name = "goose" 2 + version = "2.0.0" 3 description = "A Gleam WebSocket consumer for AT Protocol Jetstream events" 4 licences = ["Apache-2.0"] 5 repository = { type = "github", user = "bigmoves", repo = "goose" } ··· 7 [dependencies] 8 gleam_stdlib = ">= 0.44.0 and < 2.0.0" 9 gleam_erlang = ">= 1.0.0 and < 2.0.0" 10 + gleam_otp = ">= 1.0.0 and < 2.0.0" 11 gleam_http = ">= 4.0.0 and < 5.0.0" 12 gleam_json = ">= 3.0.2 and < 4.0.0" 13 + gleam_crypto = ">= 1.5.0 and < 2.0.0" 14 ezstd = ">= 1.2.0 and < 2.0.0" 15 + simplifile = ">= 2.0.0 and < 3.0.0" 16 + logging = ">= 1.3.0 and < 2.0.0" 17 + exception = ">= 2.1.0 and < 3.0.0" 18 + gramps = ">= 6.0.0 and < 7.0.0" 19 20 [dev-dependencies] 21 gleeunit = ">= 1.0.0 and < 2.0.0"
+14 -4
manifest.toml
··· 2 # You typically do not need to edit this file 3 4 packages = [ 5 - { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" }, 6 { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 8 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 9 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 10 { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 11 - { name = "gleeunit", version = "1.6.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "FDC68A8C492B1E9B429249062CD9BAC9B5538C6FBF584817205D0998C42E1DAC" }, 12 - { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" }, 13 ] 14 15 [requirements] 16 ezstd = { version = ">= 1.2.0 and < 2.0.0" } 17 gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } 18 gleam_http = { version = ">= 4.0.0 and < 5.0.0" } 19 gleam_json = { version = ">= 3.0.2 and < 4.0.0" } 20 gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 21 gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 22 - gun = { version = ">= 2.2.0 and < 3.0.0" }
··· 2 # You typically do not need to edit this file 3 4 packages = [ 5 + { name = "exception", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "329D269D5C2A314F7364BD2711372B6F2C58FA6F39981572E5CA68624D291F8C" }, 6 { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 + { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, 8 + { name = "gleam_crypto", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_crypto", source = "hex", outer_checksum = "50774BAFFF1144E7872814C566C5D653D83A3EBF23ACC3156B757A1B6819086E" }, 9 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 10 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 11 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 12 + { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, 13 { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 14 + { name = "gleeunit", version = "1.8.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "7AE0F64B26CC065ED705FF7CA5F4EDAB8015E72A883736FE251E46FACCCE1E08" }, 15 + { name = "gramps", version = "6.0.0", build_tools = ["gleam"], requirements = ["gleam_crypto", "gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gramps", source = "hex", outer_checksum = "8B7195978FBFD30B43DF791A8A272041B81E45D245314D7A41FC57237AA882A0" }, 16 + { name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" }, 17 + { name = "simplifile", version = "2.3.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0A868DAC6063D9E983477981839810DC2E553285AB4588B87E3E9C96A7FB4CB4" }, 18 ] 19 20 [requirements] 21 + exception = { version = ">= 2.1.0 and < 3.0.0" } 22 ezstd = { version = ">= 1.2.0 and < 2.0.0" } 23 + gleam_crypto = { version = ">= 1.5.0 and < 2.0.0" } 24 gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } 25 gleam_http = { version = ">= 4.0.0 and < 5.0.0" } 26 gleam_json = { version = ">= 3.0.2 and < 4.0.0" } 27 + gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } 28 gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 29 gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 30 + gramps = { version = ">= 6.0.0 and < 7.0.0" } 31 + logging = { version = ">= 1.3.0 and < 2.0.0" } 32 + simplifile = { version = ">= 2.0.0 and < 3.0.0" }
+51
src/goose/internal/zstd.gleam
···
··· 1 + import gleam/erlang/atom 2 + 3 + /// Opaque type for decompression context 4 + pub type DecompressionContext 5 + 6 + /// Opaque type for decompression dictionary 7 + pub type DecompressionDict 8 + 9 + /// Create a decompression context with a given buffer size 10 + @external(erlang, "ezstd", "create_decompression_context") 11 + pub fn create_decompression_context(buffer_size: Int) -> DecompressionContext 12 + 13 + /// Create a decompression dictionary from binary data 14 + @external(erlang, "ezstd", "create_ddict") 15 + pub fn create_ddict(dict_data: BitArray) -> DecompressionDict 16 + 17 + /// Select a dictionary for the decompression context 18 + /// Note: ezstd returns 'ok' atom, not {ok, undefined} 19 + @external(erlang, "ezstd", "select_ddict") 20 + fn select_ddict_ffi( 21 + dctx: DecompressionContext, 22 + ddict: DecompressionDict, 23 + ) -> atom.Atom 24 + 25 + /// Select a dictionary for the decompression context (safe wrapper) 26 + pub fn select_ddict( 27 + dctx: DecompressionContext, 28 + ddict: DecompressionDict, 29 + ) -> Result(Nil, String) { 30 + let result_atom = select_ddict_ffi(dctx, ddict) 31 + case atom.to_string(result_atom) { 32 + "ok" -> Ok(Nil) 33 + err -> Error("Failed to select dictionary: " <> err) 34 + } 35 + } 36 + 37 + /// Decompress data using a dictionary 38 + /// Note: ezstd can return either BitArray or {error, Binary} 39 + @external(erlang, "goose_ffi", "decompress_using_ddict_safe") 40 + pub fn decompress_using_ddict( 41 + data: BitArray, 42 + ddict: DecompressionDict, 43 + ) -> Result(BitArray, String) 44 + 45 + /// Decompress data using a streaming context (for frames without content size) 46 + /// Note: ezstd can return either BitArray or {error, Binary} 47 + @external(erlang, "goose_ffi", "decompress_streaming_safe") 48 + pub fn decompress_streaming( 49 + dctx: DecompressionContext, 50 + data: BitArray, 51 + ) -> Result(BitArray, String)
+174
src/goose/stratus/internal/socket.gleam
···
··· 1 + import gleam/dynamic.{type Dynamic} 2 + import gleam/dynamic/decode 3 + import gleam/erlang/atom.{type Atom} 4 + import gleam/erlang/process.{type Selector} 5 + import gleam/list 6 + 7 + pub type Socket 8 + 9 + pub type SocketReason { 10 + Closed 11 + Timeout 12 + Badarg 13 + Terminated 14 + Eaddrinuse 15 + Eaddrnotavail 16 + Eafnosupport 17 + Ealready 18 + Econnaborted 19 + Econnrefused 20 + Econnreset 21 + Edestaddrreq 22 + Ehostdown 23 + Ehostunreach 24 + Einprogress 25 + Eisconn 26 + Emsgsize 27 + Enetdown 28 + Enetunreach 29 + Enopkg 30 + Enoprotoopt 31 + Enotconn 32 + Enotty 33 + Enotsock 34 + Eproto 35 + Eprotonosupport 36 + Eprototype 37 + Esocktnosupport 38 + Etimedout 39 + Ewouldblock 40 + Exbadport 41 + Exbadseq 42 + } 43 + 44 + pub type TcpOption = 45 + #(Atom, Dynamic) 46 + 47 + pub type ReceiveMode { 48 + Count(Int) 49 + Once 50 + Pull 51 + All 52 + } 53 + 54 + pub type PacketType { 55 + Binary 56 + List 57 + } 58 + 59 + pub type Options { 60 + Receive(ReceiveMode) 61 + PacketsOf(PacketType) 62 + SendTimeout(Int) 63 + SendTimeoutClose(Bool) 64 + Reuseaddr(Bool) 65 + Nodelay(Bool) 66 + Cacerts(Dynamic) 67 + CustomizeHostnameCheck(Dynamic) 68 + } 69 + 70 + pub const default_options = [ 71 + PacketsOf(Binary), 72 + SendTimeout(30_000), 73 + SendTimeoutClose(True), 74 + Reuseaddr(True), 75 + Nodelay(True), 76 + ] 77 + 78 + @external(erlang, "gleam@function", "identity") 79 + fn from(value: a) -> Dynamic 80 + 81 + pub fn convert_options(options: List(Options)) -> List(TcpOption) { 82 + let active = atom.create("active") 83 + list.map(options, fn(opt) { 84 + case opt { 85 + Receive(Count(count)) -> #(active, dynamic.int(count)) 86 + Receive(Once) -> #(active, from(atom.create("once"))) 87 + Receive(Pull) -> #(active, dynamic.bool(False)) 88 + Receive(All) -> #(active, dynamic.bool(True)) 89 + PacketsOf(Binary) -> #(atom.create("mode"), from(Binary)) 90 + PacketsOf(List) -> #(atom.create("mode"), from(List)) 91 + Cacerts(data) -> #(atom.create("cacerts"), data) 92 + Nodelay(bool) -> #(atom.create("nodelay"), dynamic.bool(bool)) 93 + Reuseaddr(bool) -> #(atom.create("reuseaddr"), dynamic.bool(bool)) 94 + SendTimeout(int) -> #(atom.create("send_timeout"), dynamic.int(int)) 95 + SendTimeoutClose(bool) -> #( 96 + atom.create("send_timeout_close"), 97 + dynamic.bool(bool), 98 + ) 99 + CustomizeHostnameCheck(funcs) -> #( 100 + atom.create("customize_hostname_check"), 101 + funcs, 102 + ) 103 + } 104 + }) 105 + } 106 + 107 + pub type Shutdown { 108 + Read 109 + Write 110 + ReadWrite 111 + } 112 + 113 + pub type SocketMessage { 114 + Data(BitArray) 115 + Err(SocketReason) 116 + } 117 + 118 + type ErlangSocketMessage { 119 + Ssl 120 + SslClosed 121 + SslError 122 + Tcp 123 + TcpClosed 124 + TcpError 125 + } 126 + 127 + pub fn selector() -> Selector(Result(SocketMessage, List(decode.DecodeError))) { 128 + process.new_selector() 129 + |> process.select_record(Tcp, 2, fn(data) { 130 + { 131 + use msg <- decode.field(2, decode.bit_array) 132 + decode.success(Data(msg)) 133 + } 134 + |> decode.run(data, _) 135 + }) 136 + |> process.select_record(Ssl, 2, fn(data) { 137 + { 138 + use msg <- decode.field(2, decode.bit_array) 139 + decode.success(Data(msg)) 140 + } 141 + |> decode.run(data, _) 142 + }) 143 + |> process.select_record(SslClosed, 1, fn(_socket) { Ok(Err(Closed)) }) 144 + |> process.select_record(TcpClosed, 1, fn(_socket) { Ok(Err(Closed)) }) 145 + |> process.select_record(TcpError, 2, fn(data) { 146 + { 147 + use reason <- decode.field(2, atom.decoder()) 148 + case parse_known_socket_reason(reason) { 149 + Ok(socket_reason) -> decode.success(Err(socket_reason)) 150 + Error(_) -> decode.failure(Err(Badarg), "SocketReason") 151 + } 152 + } 153 + |> decode.run(data, _) 154 + }) 155 + |> process.select_record(SslError, 2, fn(data) { 156 + { 157 + use reason <- decode.field(2, atom.decoder()) 158 + case parse_known_socket_reason(reason) { 159 + Ok(socket_reason) -> decode.success(Err(socket_reason)) 160 + Error(_) -> decode.failure(Err(Badarg), "SocketReason") 161 + } 162 + } 163 + |> decode.run(data, _) 164 + }) 165 + } 166 + 167 + @external(erlang, "public_key", "cacerts_get") 168 + pub fn get_certs() -> Dynamic 169 + 170 + @external(erlang, "stratus_ffi", "custom_sni_matcher") 171 + pub fn get_custom_matcher() -> Options 172 + 173 + @external(erlang, "stratus_ffi", "parse_known_socket_reason") 174 + fn parse_known_socket_reason(reason: Atom) -> Result(SocketReason, Dynamic)
+45
src/goose/stratus/internal/ssl.gleam
···
··· 1 + import gleam/bytes_tree.{type BytesTree} 2 + import gleam/erlang/charlist.{type Charlist} 3 + import gleam/erlang/process.{type Pid} 4 + import goose/stratus/internal/socket.{ 5 + type Shutdown, type Socket, type SocketReason, type TcpOption, 6 + } 7 + 8 + @external(erlang, "ssl", "connect") 9 + pub fn connect( 10 + address: Charlist, 11 + port: Int, 12 + options: List(TcpOption), 13 + timeout: Int, 14 + ) -> Result(Socket, SocketReason) 15 + 16 + @external(erlang, "stratus_ffi", "ssl_shutdown") 17 + pub fn shutdown(socket: Socket, how: Shutdown) -> Result(Nil, SocketReason) 18 + 19 + @external(erlang, "stratus_ffi", "ssl_send") 20 + pub fn send(socket: Socket, packet: BytesTree) -> Result(Nil, SocketReason) 21 + 22 + @external(erlang, "ssl", "recv") 23 + pub fn receive(socket: Socket, length: Int) -> Result(BitArray, SocketReason) 24 + 25 + @external(erlang, "ssl", "recv") 26 + pub fn receive_timeout( 27 + socket: Socket, 28 + length: Int, 29 + timeout: Int, 30 + ) -> Result(BitArray, SocketReason) 31 + 32 + @external(erlang, "stratus_ffi", "ssl_set_opts") 33 + pub fn set_opts( 34 + socket: Socket, 35 + opts: List(TcpOption), 36 + ) -> Result(Nil, SocketReason) 37 + 38 + @external(erlang, "stratus_ffi", "ssl_start") 39 + pub fn start() -> Result(Nil, Nil) 40 + 41 + @external(erlang, "stratus_ffi", "ssl_controlling_process") 42 + pub fn controlling_process( 43 + socket: Socket, 44 + new_owner: Pid, 45 + ) -> Result(Nil, SocketReason)
+42
src/goose/stratus/internal/tcp.gleam
···
··· 1 + import gleam/bytes_tree.{type BytesTree} 2 + import gleam/erlang/charlist.{type Charlist} 3 + import gleam/erlang/process.{type Pid} 4 + import goose/stratus/internal/socket.{ 5 + type Shutdown, type Socket, type SocketReason, type TcpOption, 6 + } 7 + 8 + @external(erlang, "gen_tcp", "connect") 9 + pub fn connect( 10 + address: Charlist, 11 + port: Int, 12 + options: List(TcpOption), 13 + timeout: Int, 14 + ) -> Result(Socket, SocketReason) 15 + 16 + @external(erlang, "stratus_ffi", "tcp_shutdown") 17 + pub fn shutdown(socket: Socket, how: Shutdown) -> Result(Nil, SocketReason) 18 + 19 + @external(erlang, "stratus_ffi", "tcp_send") 20 + pub fn send(socket: Socket, packet: BytesTree) -> Result(Nil, SocketReason) 21 + 22 + @external(erlang, "gen_tcp", "recv") 23 + pub fn receive(socket: Socket, length: Int) -> Result(BitArray, SocketReason) 24 + 25 + @external(erlang, "gen_tcp", "recv") 26 + pub fn receive_timeout( 27 + socket: Socket, 28 + length: Int, 29 + timeout: Int, 30 + ) -> Result(BitArray, SocketReason) 31 + 32 + @external(erlang, "stratus_ffi", "tcp_set_opts") 33 + pub fn set_opts( 34 + socket: Socket, 35 + opts: List(TcpOption), 36 + ) -> Result(Nil, SocketReason) 37 + 38 + @external(erlang, "stratus_ffi", "tcp_controlling_process") 39 + pub fn controlling_process( 40 + socket: Socket, 41 + new_owner: Pid, 42 + ) -> Result(Nil, SocketReason)
+81
src/goose/stratus/internal/transport.gleam
···
··· 1 + import gleam/bytes_tree.{type BytesTree} 2 + import gleam/erlang/charlist.{type Charlist} 3 + import goose/stratus/internal/socket.{ 4 + type Shutdown, type Socket, type SocketReason, type TcpOption, 5 + } 6 + import goose/stratus/internal/ssl 7 + import goose/stratus/internal/tcp 8 + 9 + pub type Transport { 10 + Tcp 11 + Ssl 12 + } 13 + 14 + pub fn connect( 15 + transport: Transport, 16 + host: Charlist, 17 + port: Int, 18 + options: List(TcpOption), 19 + timeout: Int, 20 + ) -> Result(Socket, SocketReason) { 21 + case transport { 22 + Ssl -> ssl.connect(host, port, options, timeout) 23 + Tcp -> tcp.connect(host, port, options, timeout) 24 + } 25 + } 26 + 27 + pub fn send( 28 + transport: Transport, 29 + socket: Socket, 30 + data: BytesTree, 31 + ) -> Result(Nil, SocketReason) { 32 + case transport { 33 + Ssl -> ssl.send(socket, data) 34 + Tcp -> tcp.send(socket, data) 35 + } 36 + } 37 + 38 + pub fn receive( 39 + transport: Transport, 40 + socket: Socket, 41 + length: Int, 42 + ) -> Result(BitArray, SocketReason) { 43 + case transport { 44 + Ssl -> ssl.receive(socket, length) 45 + Tcp -> tcp.receive(socket, length) 46 + } 47 + } 48 + 49 + pub fn receive_timeout( 50 + transport: Transport, 51 + socket: Socket, 52 + length: Int, 53 + timeout: Int, 54 + ) -> Result(BitArray, SocketReason) { 55 + case transport { 56 + Ssl -> ssl.receive_timeout(socket, length, timeout) 57 + Tcp -> tcp.receive_timeout(socket, length, timeout) 58 + } 59 + } 60 + 61 + pub fn shutdown( 62 + transport: Transport, 63 + socket: Socket, 64 + how: Shutdown, 65 + ) -> Result(Nil, SocketReason) { 66 + case transport { 67 + Ssl -> ssl.shutdown(socket, how) 68 + Tcp -> tcp.shutdown(socket, how) 69 + } 70 + } 71 + 72 + pub fn set_opts( 73 + transport: Transport, 74 + socket: Socket, 75 + opts: List(TcpOption), 76 + ) -> Result(Nil, SocketReason) { 77 + case transport { 78 + Tcp -> tcp.set_opts(socket, opts) 79 + Ssl -> ssl.set_opts(socket, opts) 80 + } 81 + }
+943
src/goose/stratus.gleam
···
··· 1 + import exception 2 + import gleam/bit_array 3 + import gleam/bool 4 + import gleam/bytes_tree.{type BytesTree} 5 + import gleam/crypto 6 + import gleam/erlang/charlist 7 + import gleam/erlang/process.{type Selector, type Subject} 8 + import gleam/http.{Http, Https} 9 + import gleam/http/request.{type Request} 10 + import gleam/http/response.{type Response} 11 + import gleam/int 12 + import gleam/list 13 + import gleam/option.{type Option, None, Some} 14 + import gleam/otp/actor 15 + import gleam/result 16 + import gleam/string 17 + import gleam/uri 18 + import goose/stratus/internal/socket.{ 19 + type Socket, type SocketMessage, Cacerts, Once, Pull, Receive, 20 + } 21 + import goose/stratus/internal/ssl 22 + import goose/stratus/internal/tcp 23 + import goose/stratus/internal/transport.{type Transport, Ssl, Tcp} 24 + import gramps/http as gramps_http 25 + import gramps/websocket.{ 26 + BinaryFrame, CloseFrame, Continuation, Control, Data as DataFrame, PingFrame, 27 + PongFrame, TextFrame, 28 + } 29 + import gramps/websocket/compression 30 + import logging 31 + 32 + /// This holds some information needed to communicate with the WebSocket. 33 + pub opaque type Connection { 34 + Connection( 35 + socket: Socket, 36 + transport: Transport, 37 + context: Option(compression.Context), 38 + ) 39 + } 40 + 41 + pub type SocketReason { 42 + SocketClosed 43 + Timeout 44 + Badarg 45 + Terminated 46 + Eaddrinuse 47 + Eaddrnotavail 48 + Eafnosupport 49 + Ealready 50 + Econnaborted 51 + Econnrefused 52 + Econnreset 53 + Edestaddrreq 54 + Ehostdown 55 + Ehostunreach 56 + Einprogress 57 + Eisconn 58 + Emsgsize 59 + Enetdown 60 + Enetunreach 61 + Enopkg 62 + Enoprotoopt 63 + Enotconn 64 + Enotty 65 + Enotsock 66 + Eproto 67 + Eprotonosupport 68 + Eprototype 69 + Esocktnosupport 70 + Etimedout 71 + Ewouldblock 72 + Exbadport 73 + Exbadseq 74 + } 75 + 76 + pub type CustomCloseError { 77 + SocketFail(SocketReason) 78 + InvalidCode 79 + } 80 + 81 + fn convert_socket_reason(reason: socket.SocketReason) -> SocketReason { 82 + case reason { 83 + socket.Badarg -> Badarg 84 + socket.Closed -> SocketClosed 85 + socket.Eaddrinuse -> Eaddrinuse 86 + socket.Eaddrnotavail -> Eaddrnotavail 87 + socket.Eafnosupport -> Eafnosupport 88 + socket.Ealready -> Ealready 89 + socket.Econnaborted -> Econnaborted 90 + socket.Econnrefused -> Econnrefused 91 + socket.Econnreset -> Econnreset 92 + socket.Edestaddrreq -> Edestaddrreq 93 + socket.Ehostdown -> Ehostdown 94 + socket.Ehostunreach -> Ehostunreach 95 + socket.Einprogress -> Einprogress 96 + socket.Eisconn -> Eisconn 97 + socket.Emsgsize -> Emsgsize 98 + socket.Enetdown -> Enetdown 99 + socket.Enetunreach -> Enetunreach 100 + socket.Enopkg -> Enopkg 101 + socket.Enoprotoopt -> Enoprotoopt 102 + socket.Enotconn -> Enotconn 103 + socket.Enotsock -> Enotsock 104 + socket.Enotty -> Enotty 105 + socket.Eproto -> Eproto 106 + socket.Eprotonosupport -> Eprotonosupport 107 + socket.Eprototype -> Eprototype 108 + socket.Esocktnosupport -> Esocktnosupport 109 + socket.Etimedout -> Etimedout 110 + socket.Ewouldblock -> Ewouldblock 111 + socket.Exbadport -> Exbadport 112 + socket.Exbadseq -> Exbadseq 113 + socket.Terminated -> Terminated 114 + socket.Timeout -> Timeout 115 + } 116 + } 117 + 118 + fn from_socket_message(msg: SocketMessage) -> InternalMessage(user_message) { 119 + case msg { 120 + socket.Data(bits) -> Data(bits) 121 + socket.Err(reason) -> Err(convert_socket_reason(reason)) 122 + } 123 + } 124 + 125 + pub opaque type Next(state, user_message) { 126 + Continue(state: state, selector: Option(Selector(user_message))) 127 + NormalStop 128 + AbnormalStop(reason: String) 129 + } 130 + 131 + pub fn continue(state: state) -> Next(state, user_message) { 132 + Continue(state, None) 133 + } 134 + 135 + pub fn with_selector( 136 + next: Next(state, user_message), 137 + selector: Selector(user_message), 138 + ) -> Next(state, user_message) { 139 + case next { 140 + Continue(state, _) -> Continue(state, Some(selector)) 141 + _ -> next 142 + } 143 + } 144 + 145 + pub fn stop() -> Next(state, user_message) { 146 + NormalStop 147 + } 148 + 149 + pub fn stop_abnormal(reason: String) -> Next(state, user_message) { 150 + AbnormalStop(reason) 151 + } 152 + 153 + /// These are the messages emitted or received by the underlying process. You 154 + /// should only need to interact with `Message` below. 155 + pub opaque type InternalMessage(user_message) { 156 + UserMessage(user_message) 157 + Err(SocketReason) 158 + Data(BitArray) 159 + Closed 160 + Shutdown 161 + } 162 + 163 + /// This is the type of message your handler might receive. 164 + pub type Message(user_message) { 165 + Text(String) 166 + Binary(BitArray) 167 + User(user_message) 168 + } 169 + 170 + pub opaque type Builder(state, user_message) { 171 + Builder( 172 + request: Request(String), 173 + connect_timeout: Int, 174 + init: fn() -> Result(Initialised(state, user_message), String), 175 + loop: fn(state, Message(user_message), Connection) -> 176 + Next(state, user_message), 177 + on_close: fn(state) -> Nil, 178 + ) 179 + } 180 + 181 + // and `on_close`. 182 + /// This creates a builder to set up a WebSocket actor. This will use default 183 + /// values for the connection initialization timeout, and provide an empty 184 + /// function to be called when the server closes the connection. If you want to 185 + /// customize either of those, see the helper functions `with_connect_timeout` 186 + pub fn new( 187 + request req: Request(String), 188 + state state: state, 189 + ) -> Builder(state, user_message) { 190 + Builder( 191 + request: req, 192 + connect_timeout: 5000, 193 + init: fn() { Ok(initialised(state)) }, 194 + loop: fn(state, _msg, _conn) { continue(state) }, 195 + on_close: fn(_state) { Nil }, 196 + ) 197 + } 198 + 199 + pub type Initialised(state, user_message) { 200 + Initialised(state: state, selector: Option(Selector(user_message))) 201 + } 202 + 203 + pub fn initialised(state: state) -> Initialised(state, user_message) { 204 + Initialised(state:, selector: None) 205 + } 206 + 207 + pub fn selecting( 208 + initialised: Initialised(state, old_message), 209 + selector: Selector(user_message), 210 + ) -> Initialised(state, user_message) { 211 + Initialised(state: initialised.state, selector: Some(selector)) 212 + } 213 + 214 + pub fn new_with_initialiser( 215 + request req: Request(String), 216 + init init: fn() -> Result(Initialised(state, user_message), String), 217 + ) -> Builder(state, user_message) { 218 + Builder( 219 + request: req, 220 + connect_timeout: 5000, 221 + init: init, 222 + loop: fn(state, _msg, _conn) { continue(state) }, 223 + on_close: fn(_state) { Nil }, 224 + ) 225 + } 226 + 227 + pub fn on_message( 228 + builder: Builder(state, user_message), 229 + on_message: fn(state, Message(user_message), Connection) -> 230 + Next(state, user_message), 231 + ) -> Builder(state, user_message) { 232 + Builder(..builder, loop: on_message) 233 + } 234 + 235 + /// This sets the maximum amount of time you are willing to wait for both 236 + /// connecting to the server and receiving the upgrade response. This means 237 + /// that it may take up to `timeout * 2` to begin sending or receiving messages. 238 + /// This value defaults to 5 seconds. 239 + pub fn with_connect_timeout( 240 + builder: Builder(state, user_message), 241 + timeout: Int, 242 + ) -> Builder(state, user_message) { 243 + Builder(..builder, connect_timeout: timeout) 244 + } 245 + 246 + /// You can provide a function to be called when the connection is closed. This 247 + /// function receives the last value for the state of the WebSocket. 248 + /// 249 + /// NOTE: If you manually call `stratus.close`, this function will not be 250 + /// called. I'm unsure right now if this is a bug or working as intended. But 251 + /// you will be in the loop with the state value handy. 252 + pub fn on_close( 253 + builder: Builder(state, user_message), 254 + on_close: fn(state) -> Nil, 255 + ) -> Builder(state, user_message) { 256 + Builder(..builder, on_close: on_close) 257 + } 258 + 259 + type State(state, user_message) { 260 + State( 261 + buffer: BitArray, 262 + incomplete: Option(websocket.Frame), 263 + self: Subject(InternalMessage(user_message)), 264 + socket: Socket, 265 + user_state: state, 266 + compression: Option(compression.Compression), 267 + ) 268 + } 269 + 270 + /// These are the possible failures when calling `stratus.initialize`. 271 + pub type InitializationError { 272 + // The WebSocket handshake failed for the provided reason 273 + HandshakeFailed(HandshakeError) 274 + // The actor failed to start, most likely due to a timeout in your `init` 275 + ActorFailed(actor.StartError) 276 + // 277 + FailedToTransferSocket(SocketReason) 278 + } 279 + 280 + /// This opens the WebSocket connection with the provided `Builder`. It makes 281 + /// some assumptions about the request if you do not provide it. It will use 282 + /// ports 80 or 443 for `ws` or `wss` respectively. 283 + /// 284 + /// It will open the connection and perform the WebSocket handshake. If this 285 + /// fails, the actor will fail to start with the given reason as a string value. 286 + /// 287 + /// After that, received messages will be passed to your loop, and you can use 288 + /// the helper functions to send messages to the server. The `close` method will 289 + /// send a close frame and end the connection. 290 + pub fn start( 291 + builder: Builder(state, user_message), 292 + ) -> Result( 293 + actor.Started(Subject(InternalMessage(user_message))), 294 + InitializationError, 295 + ) { 296 + let transport = case builder.request.scheme { 297 + Https -> Ssl 298 + _ -> Tcp 299 + } 300 + 301 + let handshake_result = 302 + perform_handshake(builder.request, transport, builder.connect_timeout) 303 + |> result.map_error(fn(reason) { 304 + let msg = case reason { 305 + UpgradeFailed(resp) -> 306 + "WebSocket handshake failed with status " 307 + <> int.to_string(resp.status) 308 + reason -> "WebSocket handshake failed: " <> string.inspect(reason) 309 + } 310 + logging.log(logging.Error, msg) 311 + HandshakeFailed(reason) 312 + }) 313 + 314 + use handshake_response <- result.try(handshake_result) 315 + logging.log(logging.Debug, "Handshake successful") 316 + 317 + let extensions = 318 + handshake_response.response 319 + |> response.get_header("sec-websocket-extensions") 320 + |> result.map(string.split(_, "; ")) 321 + |> result.unwrap([]) 322 + 323 + let context_takeovers = websocket.get_context_takeovers(extensions) 324 + 325 + let assert Ok(_) = 326 + transport.set_opts( 327 + transport, 328 + handshake_response.socket, 329 + socket.convert_options([Receive(Once)]), 330 + ) 331 + 332 + actor.new_with_initialiser(1000, fn(subject) { 333 + let started_selector = process.select(process.new_selector(), subject) 334 + logging.log(logging.Debug, "Calling user initializer") 335 + use Initialised(user_state, user_selector) <- result.try(builder.init()) 336 + let selector = case user_selector { 337 + Some(selector) -> { 338 + selector 339 + |> process.map_selector(UserMessage) 340 + |> process.merge_selector(started_selector) 341 + |> process.merge_selector( 342 + process.map_selector(socket.selector(), fn(msg) { 343 + let assert Ok(msg) = msg 344 + from_socket_message(msg) 345 + }), 346 + ) 347 + } 348 + _ -> 349 + started_selector 350 + |> process.merge_selector( 351 + process.map_selector(socket.selector(), fn(msg) { 352 + let assert Ok(msg) = msg 353 + from_socket_message(msg) 354 + }), 355 + ) 356 + } 357 + let context = case websocket.has_deflate(extensions) { 358 + True -> Some(compression.init(context_takeovers)) 359 + False -> None 360 + } 361 + State( 362 + buffer: <<>>, 363 + incomplete: None, 364 + self: subject, 365 + socket: handshake_response.socket, 366 + user_state: user_state, 367 + compression: context, 368 + ) 369 + |> actor.initialised 370 + |> actor.selecting(selector) 371 + |> actor.returning(subject) 372 + |> Ok 373 + }) 374 + |> actor.on_message(fn(state, message) { 375 + case message { 376 + UserMessage(user_message) -> { 377 + let conn = 378 + Connection( 379 + state.socket, 380 + transport, 381 + option.map(state.compression, fn(context) { context.deflate }), 382 + ) 383 + let res = 384 + exception.rescue(fn() { 385 + builder.loop(state.user_state, User(user_message), conn) 386 + }) 387 + case res { 388 + // TODO: de-dupe this 389 + Ok(Continue(user_state, user_selector)) -> { 390 + let new_state = State(..state, user_state: user_state) 391 + case user_selector { 392 + Some(user_selector) -> { 393 + let selector = 394 + user_selector 395 + |> process.map_selector(UserMessage) 396 + |> process.merge_selector( 397 + process.map_selector(socket.selector(), fn(msg) { 398 + let assert Ok(msg) = msg 399 + from_socket_message(msg) 400 + }), 401 + ) 402 + new_state 403 + |> actor.continue 404 + |> actor.with_selector(selector) 405 + } 406 + _ -> actor.continue(new_state) 407 + } 408 + } 409 + Ok(NormalStop) -> actor.stop() 410 + Ok(AbnormalStop(reason)) -> actor.stop_abnormal(reason) 411 + Error(reason) -> { 412 + logging.log( 413 + logging.Error, 414 + "Caught error in user handler: " <> string.inspect(reason), 415 + ) 416 + actor.continue(state) 417 + } 418 + } 419 + } 420 + Err(reason) -> { 421 + close_contexts(state.compression) 422 + actor.stop_abnormal(string.inspect(reason)) 423 + } 424 + Data(bits) -> { 425 + let conn = 426 + Connection( 427 + state.socket, 428 + transport, 429 + option.map(state.compression, fn(context) { context.deflate }), 430 + ) 431 + let #(frames, rest) = 432 + websocket.decode_many_frames( 433 + bit_array.append(state.buffer, bits), 434 + option.map(state.compression, fn(context) { context.inflate }), 435 + [], 436 + ) 437 + let frames = websocket.aggregate_frames(frames, state.incomplete, []) 438 + case frames { 439 + Error(Nil) -> continue(state) 440 + Ok(frames) -> { 441 + list.fold_until(frames, continue(state), fn(acc, frame) { 442 + let assert Continue(prev_state, _selector) = acc 443 + case handle_frame(builder, prev_state, conn, frame) { 444 + Continue(..) as next -> list.Continue(next) 445 + err -> list.Stop(err) 446 + } 447 + }) 448 + } 449 + } 450 + |> fn(next) { 451 + case next { 452 + Continue(state, selector) -> { 453 + let assert Ok(_) = 454 + transport.set_opts( 455 + transport, 456 + state.socket, 457 + socket.convert_options([Receive(Once)]), 458 + ) 459 + let next = actor.continue(State(..state, buffer: rest)) 460 + case selector { 461 + Some(selector) -> actor.with_selector(next, selector) 462 + _ -> next 463 + } 464 + } 465 + NormalStop -> { 466 + close_contexts(state.compression) 467 + actor.stop() 468 + } 469 + AbnormalStop(reason) -> { 470 + close_contexts(state.compression) 471 + actor.stop_abnormal(reason) 472 + } 473 + } 474 + } 475 + } 476 + Closed -> { 477 + logging.log(logging.Debug, "Received closed frame") 478 + builder.on_close(state.user_state) 479 + close_contexts(state.compression) 480 + actor.stop() 481 + } 482 + // TODO: handle shutdown better? 483 + Shutdown -> { 484 + logging.log(logging.Debug, "Received shutdown messag") 485 + close_contexts(state.compression) 486 + actor.stop() 487 + } 488 + } 489 + }) 490 + |> actor.start 491 + |> result.map_error(ActorFailed) 492 + |> result.try(fn(started) { 493 + case transport { 494 + Tcp -> tcp.controlling_process(handshake_response.socket, started.pid) 495 + Ssl -> ssl.controlling_process(handshake_response.socket, started.pid) 496 + } 497 + |> result.map_error(fn(socket_reason) { 498 + FailedToTransferSocket(convert_socket_reason(socket_reason)) 499 + }) 500 + |> result.map(fn(_nil) { started }) 501 + }) 502 + |> result.map(fn(started) { 503 + let _ = case handshake_response.buffer { 504 + <<>> -> Nil 505 + data -> process.send(started.data, Data(data)) 506 + } 507 + started 508 + }) 509 + } 510 + 511 + fn handle_frame( 512 + builder: Builder(user_state, user_message), 513 + state: State(user_state, user_message), 514 + conn: Connection, 515 + frame: websocket.Frame, 516 + ) -> Next(State(user_state, user_message), InternalMessage(user_message)) { 517 + case frame { 518 + DataFrame(TextFrame(payload: data)) -> { 519 + let assert Ok(str) = bit_array.to_string(data) 520 + let res = 521 + exception.rescue(fn() { 522 + builder.loop(state.user_state, Text(str), conn) 523 + }) 524 + case res { 525 + // TODO: de-dupe this 526 + Ok(Continue(user_state, user_selector)) -> { 527 + let new_state = State(..state, user_state: user_state) 528 + case user_selector { 529 + Some(user_selector) -> { 530 + let selector = 531 + user_selector 532 + |> process.map_selector(UserMessage) 533 + |> process.merge_selector( 534 + process.map_selector(socket.selector(), fn(msg) { 535 + let assert Ok(msg) = msg 536 + from_socket_message(msg) 537 + }), 538 + ) 539 + Continue(new_state, Some(selector)) 540 + } 541 + _ -> continue(new_state) 542 + } 543 + } 544 + Ok(NormalStop) -> NormalStop 545 + Ok(AbnormalStop(reason)) -> AbnormalStop(reason) 546 + Error(reason) -> { 547 + logging.log( 548 + logging.Error, 549 + "Caught error in user handler: " <> string.inspect(reason), 550 + ) 551 + continue(state) 552 + } 553 + } 554 + } 555 + DataFrame(BinaryFrame(payload: data)) -> { 556 + let res = 557 + exception.rescue(fn() { 558 + builder.loop(state.user_state, Binary(data), conn) 559 + }) 560 + case res { 561 + // TODO: de-dupe this 562 + Ok(Continue(user_state, user_selector)) -> { 563 + let new_state = State(..state, user_state: user_state) 564 + case user_selector { 565 + Some(user_selector) -> { 566 + let selector = 567 + user_selector 568 + |> process.map_selector(UserMessage) 569 + |> process.merge_selector( 570 + process.map_selector(socket.selector(), fn(msg) { 571 + let assert Ok(msg) = msg 572 + from_socket_message(msg) 573 + }), 574 + ) 575 + Continue(new_state, Some(selector)) 576 + } 577 + _ -> continue(new_state) 578 + } 579 + } 580 + Ok(NormalStop) -> NormalStop 581 + Ok(AbnormalStop(reason)) -> AbnormalStop(reason) 582 + Error(reason) -> { 583 + logging.log( 584 + logging.Error, 585 + "Caught error in user handler: " <> string.inspect(reason), 586 + ) 587 + continue(state) 588 + } 589 + } 590 + } 591 + Control(PingFrame(payload)) -> { 592 + let mask = Some(<<0:unit(8)-size(4)>>) 593 + let frame = websocket.encode_pong_frame(payload, mask) 594 + 595 + let _ = transport.send(conn.transport, conn.socket, frame) 596 + continue(state) 597 + } 598 + Control(PongFrame(..)) -> { 599 + continue(state) 600 + } 601 + Control(CloseFrame(reason)) -> { 602 + logging.log( 603 + logging.Debug, 604 + "WebSocket closing: " <> string.inspect(reason), 605 + ) 606 + builder.on_close(state.user_state) 607 + NormalStop 608 + } 609 + Continuation(..) -> { 610 + continue(state) 611 + } 612 + } 613 + } 614 + 615 + /// The `Subject` returned from `initialize` is an opaque type. In order to 616 + /// send custom messages to your process, you can do this mapping. 617 + /// 618 + /// For example: 619 + /// ```gleam 620 + /// // using `process.send` 621 + /// MyMessage(some_data) 622 + /// |> stratus.to_user_message 623 + /// |> process.send(stratus_subject, _) 624 + /// // using `process.call` 625 + /// process.call(stratus_subject, fn(subj) { 626 + /// stratus.to_user_message(MyMessage(some_data, subj)) 627 + /// }) 628 + /// ``` 629 + pub fn to_user_message( 630 + user_message: user_message, 631 + ) -> InternalMessage(user_message) { 632 + UserMessage(user_message) 633 + } 634 + 635 + /// From within the actor loop, this is how you send a WebSocket text frame. 636 + /// This must be valid UTF-8, so it is a `String`. 637 + pub fn send_text_message( 638 + conn: Connection, 639 + msg: String, 640 + ) -> Result(Nil, SocketReason) { 641 + let frame = 642 + websocket.encode_text_frame( 643 + msg, 644 + conn.context, 645 + Some(crypto.strong_random_bytes(4)), 646 + ) 647 + transport.send(conn.transport, conn.socket, frame) 648 + |> result.map_error(convert_socket_reason) 649 + } 650 + 651 + /// From within the actor loop, this is how you send a WebSocket text frame. 652 + pub fn send_binary_message( 653 + conn: Connection, 654 + msg: BitArray, 655 + ) -> Result(Nil, SocketReason) { 656 + let frame = 657 + websocket.encode_binary_frame( 658 + msg, 659 + conn.context, 660 + Some(crypto.strong_random_bytes(4)), 661 + ) 662 + transport.send(conn.transport, conn.socket, frame) 663 + |> result.map_error(convert_socket_reason) 664 + } 665 + 666 + /// Send a ping frame with some data. 667 + pub fn send_ping(conn: Connection, data: BitArray) -> Result(Nil, SocketReason) { 668 + let size = bit_array.byte_size(data) 669 + let mask = case size { 670 + 0 -> Some(<<0:size(4)>>) 671 + _n -> Some(crypto.strong_random_bytes(4)) 672 + } 673 + let frame = websocket.encode_ping_frame(data, mask) 674 + transport.send(conn.transport, conn.socket, frame) 675 + |> result.map_error(convert_socket_reason) 676 + } 677 + 678 + pub type CloseReason { 679 + NotProvided 680 + /// Status code: 1000 681 + Normal(body: BitArray) 682 + /// Status code: 1001 683 + GoingAway(body: BitArray) 684 + /// Status code: 1002 685 + ProtocolError(body: BitArray) 686 + /// Status code: 1003 687 + UnexpectedDataType(body: BitArray) 688 + /// Status code: 1007 689 + InconsistentDataType(body: BitArray) 690 + /// Status code: 1008 691 + PolicyViolation(body: BitArray) 692 + /// Status code: 1009 693 + MessageTooBig(body: BitArray) 694 + /// Status code: 1010 695 + MissingExtensions(body: BitArray) 696 + /// Status code: 1011 697 + UnexpectedCondition(body: BitArray) 698 + /// Use [`close_custom`](#close_custom) to send a custom close code. 699 + Custom(CustomCloseReason) 700 + } 701 + 702 + /// Use [`close_custom`](#close_custom) to send a custom close code. 703 + pub opaque type CustomCloseReason { 704 + CustomCloseReason(code: Int, body: BitArray) 705 + } 706 + 707 + fn convert_close_reason(reason: CloseReason) -> websocket.CloseReason { 708 + case reason { 709 + NotProvided -> websocket.NotProvided 710 + GoingAway(body:) -> websocket.GoingAway(body:) 711 + InconsistentDataType(body:) -> websocket.InconsistentDataType(body:) 712 + MessageTooBig(body:) -> websocket.MessageTooBig(body:) 713 + MissingExtensions(body:) -> websocket.MissingExtensions(body:) 714 + Normal(body:) -> websocket.Normal(body:) 715 + PolicyViolation(body:) -> websocket.PolicyViolation(body:) 716 + ProtocolError(body:) -> websocket.ProtocolError(body:) 717 + UnexpectedCondition(body:) -> websocket.UnexpectedCondition(body:) 718 + UnexpectedDataType(body:) -> websocket.UnexpectedDataType(body:) 719 + Custom(CustomCloseReason(code:, body:)) -> 720 + websocket.CustomCloseReason(code:, body:) 721 + } 722 + } 723 + 724 + /// Closes the connection with a custom close code between 1000 and 4999. 725 + pub fn close_custom( 726 + conn: Connection, 727 + code code: Int, 728 + body body: BitArray, 729 + ) -> Result(Nil, CustomCloseError) { 730 + use <- bool.guard( 731 + when: code >= 5000 || code < 1000, 732 + return: Error(InvalidCode), 733 + ) 734 + 735 + close(conn, Custom(CustomCloseReason(code:, body:))) 736 + |> result.map_error(SocketFail) 737 + } 738 + 739 + pub fn close( 740 + conn: Connection, 741 + because reason: CloseReason, 742 + ) -> Result(Nil, SocketReason) { 743 + let reason = convert_close_reason(reason) 744 + let mask = crypto.strong_random_bytes(4) 745 + let frame = websocket.encode_close_frame(reason, Some(mask)) 746 + 747 + transport.send(conn.transport, conn.socket, frame) 748 + |> result.map_error(convert_socket_reason) 749 + } 750 + 751 + fn make_upgrade(req: Request(String)) -> BytesTree { 752 + let user_headers = case req.headers { 753 + [] -> "" 754 + _ -> 755 + req.headers 756 + |> list.filter(fn(pair) { 757 + let #(key, _value) = pair 758 + key != "host" 759 + && key != "upgrade" 760 + && key != "connection" 761 + && key != "sec-websocket-key" 762 + && key != "sec-websocket-version" 763 + }) 764 + |> list.map(fn(pair) { 765 + let #(key, value) = pair 766 + key <> ": " <> value 767 + }) 768 + |> string.join("\r\n") 769 + |> string.append("\r\n") 770 + } 771 + 772 + let path = case req.path { 773 + "" -> "/" 774 + path -> path 775 + } 776 + 777 + let query = 778 + req 779 + |> request.get_query 780 + |> result.map(uri.query_to_string) 781 + |> fn(str) { 782 + case str { 783 + Ok("") -> "" 784 + Ok(str) -> "?" <> str 785 + _ -> "" 786 + } 787 + } 788 + 789 + let port = 790 + req.port 791 + |> option.map(fn(port) { ":" <> int.to_string(port) }) 792 + |> option.unwrap("") 793 + 794 + bytes_tree.new() 795 + |> bytes_tree.append_string("GET " <> path <> query <> " HTTP/1.1\r\n") 796 + |> bytes_tree.append_string("host: " <> req.host <> port <> "\r\n") 797 + |> bytes_tree.append_string("upgrade: websocket\r\n") 798 + |> bytes_tree.append_string("connection: upgrade\r\n") 799 + |> bytes_tree.append_string( 800 + "sec-websocket-key: " <> websocket.make_client_key() <> "\r\n", 801 + ) 802 + |> bytes_tree.append_string("sec-websocket-version: 13\r\n") 803 + |> bytes_tree.append_string( 804 + "sec-websocket-extensions: permessage-deflate\r\n", 805 + ) 806 + |> bytes_tree.append_string(user_headers) 807 + |> bytes_tree.append_string("\r\n") 808 + } 809 + 810 + pub type HandshakeError { 811 + Sock(SocketReason) 812 + Protocol(BitArray) 813 + UpgradeFailed(Response(BitArray)) 814 + } 815 + 816 + type HandshakeResponse { 817 + HandshakeResponse( 818 + socket: Socket, 819 + response: Response(BitArray), 820 + buffer: BitArray, 821 + ) 822 + } 823 + 824 + fn perform_handshake( 825 + req: Request(String), 826 + transport: Transport, 827 + timeout: Int, 828 + ) -> Result(HandshakeResponse, HandshakeError) { 829 + let certs = case req.scheme { 830 + Https -> { 831 + let assert Ok(_ok) = ssl.start() 832 + [Cacerts(socket.get_certs()), socket.get_custom_matcher()] 833 + } 834 + Http -> [] 835 + } 836 + 837 + let opts = 838 + socket.convert_options( 839 + list.append(socket.default_options, [Receive(Pull), ..certs]), 840 + ) 841 + 842 + let port = 843 + option.lazy_unwrap(req.port, fn() { 844 + case transport { 845 + Ssl -> 443 846 + Tcp -> 80 847 + } 848 + }) 849 + 850 + logging.log( 851 + logging.Debug, 852 + "Making request to " <> req.host <> " at " <> int.to_string(port), 853 + ) 854 + 855 + use socket <- result.try( 856 + result.map_error( 857 + transport.connect( 858 + transport, 859 + charlist.from_string(req.host), 860 + port, 861 + opts, 862 + timeout, 863 + ), 864 + fn(err) { Sock(convert_socket_reason(err)) }, 865 + ), 866 + ) 867 + 868 + let upgrade_req = make_upgrade(req) 869 + 870 + use _nil <- result.try( 871 + result.map_error(transport.send(transport, socket, upgrade_req), fn(err) { 872 + Sock(convert_socket_reason(err)) 873 + }), 874 + ) 875 + 876 + logging.log( 877 + logging.Debug, 878 + "Sent upgrade request, waiting " <> int.to_string(timeout), 879 + ) 880 + 881 + use resp <- result.try( 882 + result.map_error( 883 + transport.receive_timeout(transport, socket, 0, timeout), 884 + fn(err) { Sock(convert_socket_reason(err)) }, 885 + ), 886 + ) 887 + 888 + resp 889 + |> gramps_http.read_response 890 + |> result.map_error(fn(_err) { Protocol(resp) }) 891 + |> result.try(fn(pair) { 892 + let #(resp, body) = pair 893 + let body_size = 894 + resp.headers 895 + |> list.key_find("content-length") 896 + |> result.try(int.parse) 897 + |> result.unwrap(0) 898 + case read_body(transport, socket, timeout, body_size, body) { 899 + Ok(#(body, rest)) -> { 900 + Ok(#(response.set_body(resp, body), rest)) 901 + } 902 + Error(reason) -> Error(Sock(reason)) 903 + } 904 + }) 905 + |> result.try(fn(pair) { 906 + let #(resp, buffer) = pair 907 + case resp.status { 908 + 101 -> Ok(HandshakeResponse(socket:, response: resp, buffer:)) 909 + _ -> Error(UpgradeFailed(resp)) 910 + } 911 + }) 912 + } 913 + 914 + fn read_body( 915 + transport: Transport, 916 + socket: Socket, 917 + timeout: Int, 918 + length: Int, 919 + body: BitArray, 920 + ) -> Result(#(BitArray, BitArray), SocketReason) { 921 + case body { 922 + <<data:bytes-size(length), rest:bits>> -> Ok(#(data, rest)) 923 + _ -> { 924 + case transport.receive_timeout(transport, socket, 0, timeout) { 925 + Ok(data) -> { 926 + read_body(transport, socket, timeout, length, <<body:bits, data:bits>>) 927 + } 928 + Error(reason) -> Error(convert_socket_reason(reason)) 929 + } 930 + } 931 + } 932 + } 933 + 934 + fn close_contexts(contexts: Option(compression.Compression)) -> Nil { 935 + case contexts { 936 + Some(compression) -> { 937 + compression.close(compression.deflate) 938 + compression.close(compression.inflate) 939 + Nil 940 + } 941 + _ -> Nil 942 + } 943 + }
+206 -30
src/goose.gleam
··· 1 import gleam/dynamic.{type Dynamic} 2 import gleam/dynamic/decode 3 - import gleam/erlang/process.{type Pid} 4 - import gleam/io 5 import gleam/json 6 import gleam/list 7 import gleam/option.{type Option} 8 import gleam/string 9 10 /// Jetstream event types 11 pub type JetstreamEvent { ··· 47 ) 48 } 49 50 /// Create a default configuration for US East endpoint 51 pub fn default_config() -> JetstreamConfig { 52 JetstreamConfig( 53 endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", ··· 119 } 120 } 121 122 - /// Connect to Jetstream WebSocket using Erlang gun library 123 - @external(erlang, "goose_ws_ffi", "connect") 124 - pub fn connect( 125 - url: String, 126 - handler_pid: Pid, 127 - compress: Bool, 128 - ) -> Result(Pid, Dynamic) 129 130 - /// Start consuming the Jetstream feed 131 pub fn start_consumer( 132 config: JetstreamConfig, 133 on_event: fn(String) -> Nil, 134 ) -> Nil { 135 let url = build_url(config) 136 - let self = process.self() 137 - let result = connect(url, self, config.compress) 138 139 case result { 140 - Ok(_conn_pid) -> { 141 - receive_loop(on_event) 142 } 143 - Error(err) -> { 144 - io.println("Failed to connect to Jetstream") 145 - io.println_error(string.inspect(err)) 146 } 147 } 148 } 149 150 - /// Receive loop for WebSocket messages 151 - fn receive_loop(on_event: fn(String) -> Nil) -> Nil { 152 - // Call Erlang to receive one message 153 - case receive_ws_message() { 154 - Ok(text) -> { 155 - on_event(text) 156 - receive_loop(on_event) 157 } 158 - Error(_) -> { 159 - // Timeout or error, continue loop 160 - receive_loop(on_event) 161 } 162 } 163 } 164 165 - /// Receive a WebSocket message from the message queue 166 - @external(erlang, "goose_ffi", "receive_ws_message") 167 - fn receive_ws_message() -> Result(String, Nil) 168 169 /// Parse a JSON event string into a JetstreamEvent 170 pub fn parse_event(json_string: String) -> JetstreamEvent {
··· 1 + import gleam/bit_array 2 import gleam/dynamic.{type Dynamic} 3 import gleam/dynamic/decode 4 + import gleam/erlang/process 5 + import gleam/http/request 6 import gleam/json 7 import gleam/list 8 import gleam/option.{type Option} 9 + import gleam/result 10 import gleam/string 11 + import goose/internal/zstd 12 + import goose/stratus 13 + import simplifile 14 15 /// Jetstream event types 16 pub type JetstreamEvent { ··· 52 ) 53 } 54 55 + /// Internal state for WebSocket connection 56 + type ConnectionState { 57 + ConnectionState( 58 + config: JetstreamConfig, 59 + on_event: fn(String) -> Nil, 60 + decompressor: Option(Decompressor), 61 + ) 62 + } 63 + 64 + /// Decompression context holder 65 + type Decompressor { 66 + Decompressor(dctx: zstd.DecompressionContext, ddict: zstd.DecompressionDict) 67 + } 68 + 69 /// Create a default configuration for US East endpoint 70 + /// Includes automatic retry with exponential backoff (1s, 2s, 4s, 8s, 16s, 32s, capped at 60s) 71 pub fn default_config() -> JetstreamConfig { 72 JetstreamConfig( 73 endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", ··· 139 } 140 } 141 142 + /// Load the zstd decompression dictionary 143 + fn load_decompressor() -> Result(Decompressor, String) { 144 + // Get priv directory 145 + let priv_dir = get_priv_dir() 146 + let dict_path = priv_dir <> "/zstd_dictionary" 147 + 148 + // Read dictionary file 149 + use dict_data <- result.try( 150 + simplifile.read_bits(dict_path) 151 + |> result.map_error(fn(_) { 152 + "Failed to load zstd dictionary from " <> dict_path 153 + }), 154 + ) 155 + 156 + // Create decompression context and dictionary 157 + let dctx = zstd.create_decompression_context(1024 * 1024) 158 + let ddict = zstd.create_ddict(dict_data) 159 + 160 + // Select dictionary for context 161 + use _ <- result.try(zstd.select_ddict(dctx, ddict)) 162 + 163 + Ok(Decompressor(dctx: dctx, ddict: ddict)) 164 + } 165 166 + /// Get the priv directory path for this application 167 + @external(erlang, "goose_ffi", "priv_dir") 168 + fn get_priv_dir() -> String 169 + 170 + /// Decompress zstd-compressed data 171 + fn decompress_data( 172 + data: BitArray, 173 + decompressor: Decompressor, 174 + ) -> Result(String, String) { 175 + // Try decompress_using_ddict first (works for frames with content size) 176 + case zstd.decompress_using_ddict(data, decompressor.ddict) { 177 + Ok(decompressed) -> 178 + bit_array.to_string(decompressed) 179 + |> result.replace_error("Failed to decode decompressed data as UTF-8") 180 + Error(msg) -> { 181 + // Check if error is due to unknown content size 182 + case string.contains(msg, "ZSTD_CONTENTSIZE_UNKNOWN") { 183 + True -> { 184 + // Frame doesn't have content size, use streaming with dictionary-loaded context 185 + case zstd.decompress_streaming(decompressor.dctx, data) { 186 + Ok(decompressed) -> 187 + bit_array.to_string(decompressed) 188 + |> result.replace_error( 189 + "Failed to decode streaming decompressed data as UTF-8", 190 + ) 191 + Error(_stream_err) -> Error("Streaming decompression failed") 192 + } 193 + } 194 + False -> Error("Decompression failed") 195 + } 196 + } 197 + } 198 + } 199 + 200 + /// Start consuming the Jetstream feed with automatic retry logic 201 + /// 202 + /// Handles connection failures gracefully with exponential backoff and automatic reconnection. 203 + /// The retry behavior is configured through the JetstreamConfig fields: 204 + /// - max_backoff_seconds: Maximum wait time between retries 205 + /// - log_connection_events: Log connects/disconnects 206 + /// - log_retry_attempts: Log retry attempts and errors 207 + /// 208 + /// Example: 209 + /// ```gleam 210 + /// let config = goose.default_config() 211 + /// 212 + /// goose.start_consumer(config, fn(event_json) { 213 + /// // Handle event 214 + /// io.println(event_json) 215 + /// }) 216 + /// ``` 217 pub fn start_consumer( 218 config: JetstreamConfig, 219 on_event: fn(String) -> Nil, 220 ) -> Nil { 221 + start_with_retry_internal(config, on_event, 0) 222 + } 223 + 224 + /// Internal function to handle connection with retry 225 + fn start_with_retry_internal( 226 + config: JetstreamConfig, 227 + on_event: fn(String) -> Nil, 228 + retry_count: Int, 229 + ) -> Nil { 230 let url = build_url(config) 231 + 232 + // Convert wss:// to https:// and ws:// to http:// for request parsing 233 + let http_url = 234 + url 235 + |> string.replace("wss://", "https://") 236 + |> string.replace("ws://", "http://") 237 + 238 + // Parse URL into HTTP request 239 + let assert Ok(req) = request.to(http_url) 240 + 241 + // Load decompressor if compression is enabled 242 + let decompressor = case config.compress { 243 + True -> 244 + case load_decompressor() { 245 + Ok(dec) -> option.Some(dec) 246 + Error(_err) -> option.None 247 + } 248 + False -> option.None 249 + } 250 + 251 + // Create initial state 252 + let state = 253 + ConnectionState( 254 + config: config, 255 + on_event: on_event, 256 + decompressor: decompressor, 257 + ) 258 + 259 + // Start WebSocket connection 260 + let result = 261 + stratus.new_with_initialiser(request: req, init: fn() { 262 + Ok(stratus.initialised(state)) 263 + }) 264 + |> stratus.on_message(handle_message) 265 + |> stratus.on_close(handle_close) 266 + |> stratus.with_connect_timeout(30_000) 267 + |> stratus.start() 268 269 case result { 270 + Ok(_websocket) -> { 271 + // Keep process alive indefinitely 272 + process.sleep_forever() 273 } 274 + Error(_err) -> { 275 + // Connection failed, calculate backoff and retry 276 + let backoff_seconds = calculate_backoff(retry_count) 277 + 278 + // Sleep for backoff period 279 + process.sleep(backoff_seconds * 1000) 280 + 281 + // Retry connection 282 + start_with_retry_internal(config, on_event, retry_count + 1) 283 } 284 } 285 } 286 287 + /// Handle incoming WebSocket messages 288 + fn handle_message( 289 + state: ConnectionState, 290 + msg: stratus.Message(Nil), 291 + _conn: stratus.Connection, 292 + ) -> stratus.Next(ConnectionState, Nil) { 293 + case msg { 294 + stratus.Text(text) -> { 295 + state.on_event(text) 296 + stratus.continue(state) 297 + } 298 + stratus.Binary(data) -> { 299 + // Handle compressed binary data if decompressor is available 300 + case state.decompressor { 301 + option.Some(decompressor) -> { 302 + case decompress_data(data, decompressor) { 303 + Ok(text) -> { 304 + state.on_event(text) 305 + stratus.continue(state) 306 + } 307 + Error(_err) -> { 308 + // Skip frames that fail to decompress 309 + stratus.continue(state) 310 + } 311 + } 312 + } 313 + option.None -> { 314 + // No decompressor, ignore binary messages 315 + stratus.continue(state) 316 + } 317 + } 318 } 319 + stratus.User(_) -> { 320 + // No custom user messages in this implementation 321 + stratus.continue(state) 322 } 323 } 324 } 325 326 + /// Handle WebSocket connection close 327 + fn handle_close(state: ConnectionState) -> Nil { 328 + // Reconnect from the beginning 329 + start_with_retry_internal(state.config, state.on_event, 0) 330 + } 331 + 332 + /// Calculate exponential backoff capped at 60 seconds 333 + fn calculate_backoff(retry_count: Int) -> Int { 334 + case retry_count { 335 + 0 -> 1 336 + 1 -> 2 337 + 2 -> 4 338 + 3 -> 8 339 + 4 -> 16 340 + 5 -> 32 341 + _ -> 60 342 + } 343 + } 344 345 /// Parse a JSON event string into a JetstreamEvent 346 pub fn parse_event(json_string: String) -> JetstreamEvent {
+43 -20
src/goose_ffi.erl
··· 1 -module(goose_ffi). 2 - -export([receive_ws_message/0]). 3 4 - %% Receive a WebSocket text message from the process mailbox 5 - receive_ws_message() -> 6 - receive 7 - %% Handle messages forwarded from handler process 8 - {ws_text, Text} -> 9 - {ok, Text}; 10 - {ws_binary, _Binary} -> 11 - %% Ignore binary messages, try again 12 - receive_ws_message(); 13 - {ws_closed, _Reason} -> 14 - {error, nil}; 15 - {ws_error, _Reason} -> 16 - {error, nil}; 17 - _Other -> 18 - %% Ignore unexpected messages 19 - receive_ws_message() 20 - after 60000 -> 21 - %% Timeout - return error to continue loop 22 - {error, nil} 23 end.
··· 1 -module(goose_ffi). 2 + -export([priv_dir/0, decompress_using_ddict_safe/2, decompress_streaming_safe/2]). 3 4 + %% Get the priv directory for the goose application 5 + priv_dir() -> 6 + case code:priv_dir(goose) of 7 + {error, _} -> "./priv"; 8 + Path when is_list(Path) -> list_to_binary(Path); 9 + Path -> Path 10 + end. 11 + 12 + %% Safe wrapper for ezstd:decompress_using_ddict that always returns Result type 13 + decompress_using_ddict_safe(Data, DDict) -> 14 + case ezstd:decompress_using_ddict(Data, DDict) of 15 + Result when is_binary(Result) -> 16 + {ok, Result}; 17 + Result when is_list(Result) -> 18 + {ok, iolist_to_binary(Result)}; 19 + {error, Err} when is_binary(Err) -> 20 + %% Keep error as binary (Gleam String) 21 + {error, Err}; 22 + {error, Err} when is_list(Err) -> 23 + %% Convert list to binary 24 + {error, list_to_binary(Err)}; 25 + {error, Err} -> 26 + %% Convert any other type to binary 27 + {error, list_to_binary(lists:flatten(io_lib:format("~p", [Err])))} 28 + end. 29 + 30 + %% Safe wrapper for ezstd:decompress_streaming that always returns Result type 31 + decompress_streaming_safe(DCtx, Data) -> 32 + case ezstd:decompress_streaming(DCtx, Data) of 33 + Result when is_binary(Result) -> 34 + {ok, Result}; 35 + Result when is_list(Result) -> 36 + {ok, iolist_to_binary(Result)}; 37 + {error, Err} when is_binary(Err) -> 38 + %% Keep error as binary (Gleam String) 39 + {error, Err}; 40 + {error, Err} when is_list(Err) -> 41 + %% Convert list to binary 42 + {error, list_to_binary(Err)}; 43 + {error, Err} -> 44 + %% Convert any other type to binary 45 + {error, list_to_binary(lists:flatten(io_lib:format("~p", [Err])))} 46 end.
-202
src/goose_ws_ffi.erl
··· 1 - -module(goose_ws_ffi). 2 - -export([connect/3]). 3 - 4 - %% Connect to WebSocket using gun 5 - connect(Url, HandlerPid, Compress) -> 6 - %% Start gun application and dependencies 7 - application:ensure_all_started(ssl), 8 - application:ensure_all_started(gun), 9 - 10 - %% Start ezstd if compression is enabled 11 - case Compress of 12 - true -> application:ensure_all_started(ezstd); 13 - _ -> ok 14 - end, 15 - 16 - %% Spawn a connection process that will own the gun connection 17 - Parent = self(), 18 - spawn(fun() -> connect_worker(Url, HandlerPid, Compress, Parent) end), 19 - 20 - %% Wait for the connection result 21 - receive 22 - {connection_result, Result} -> Result 23 - after 60000 -> 24 - {error, connection_timeout} 25 - end. 26 - 27 - %% Worker process that owns the connection 28 - connect_worker(Url, HandlerPid, Compress, Parent) -> 29 - %% Parse URL using uri_string 30 - UriMap = uri_string:parse(Url), 31 - #{scheme := SchemeStr, host := Host, path := Path} = UriMap, 32 - 33 - %% Get query string if present and append to path 34 - Query = maps:get(query, UriMap, undefined), 35 - PathWithQuery = case Query of 36 - undefined -> Path; 37 - <<>> -> Path; 38 - Q -> <<Path/binary, "?", Q/binary>> 39 - end, 40 - 41 - %% Get port, use defaults if not specified 42 - Port = maps:get(port, uri_string:parse(Url), 43 - case SchemeStr of 44 - <<"wss">> -> 443; 45 - <<"ws">> -> 80; 46 - _ -> 443 47 - end), 48 - 49 - %% Determine transport 50 - Transport = case SchemeStr of 51 - <<"wss">> -> tls; 52 - <<"ws">> -> tcp; 53 - _ -> tls 54 - end, 55 - 56 - %% TLS options for secure connections 57 - TlsOpts = [{verify, verify_none}], %% For simplicity, disable cert verification 58 - %% In production, use proper CA certs 59 - 60 - %% Connection options 61 - Opts = case Transport of 62 - tls -> 63 - #{ 64 - transport => tls, 65 - tls_opts => TlsOpts, 66 - protocols => [http], 67 - retry => 10, 68 - retry_timeout => 1000 69 - }; 70 - tcp -> 71 - #{ 72 - transport => tcp, 73 - protocols => [http], 74 - retry => 10, 75 - retry_timeout => 1000 76 - } 77 - end, 78 - 79 - %% Convert host to list if needed 80 - HostStr = case is_binary(Host) of 81 - true -> binary_to_list(Host); 82 - false -> Host 83 - end, 84 - 85 - %% Ensure path with query is binary 86 - PathBin = case is_binary(PathWithQuery) of 87 - true -> PathWithQuery; 88 - false -> list_to_binary(PathWithQuery) 89 - end, 90 - 91 - %% Open connection (this process will be the owner) 92 - case gun:open(HostStr, Port, Opts) of 93 - {ok, ConnPid} -> 94 - %% Monitor the connection 95 - MRef = monitor(process, ConnPid), 96 - 97 - %% Wait for connection 98 - receive 99 - {gun_up, ConnPid, _Protocol} -> 100 - %% Upgrade to WebSocket (compression is controlled via query string, not headers) 101 - StreamRef = gun:ws_upgrade(ConnPid, binary_to_list(PathBin), []), 102 - 103 - %% Wait for upgrade 104 - receive 105 - {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _ResponseHeaders} -> 106 - %% Notify parent that connection is ready 107 - Parent ! {connection_result, {ok, ConnPid}}, 108 - %% Now handle messages in this process (the connection owner) 109 - handle_messages(ConnPid, StreamRef, HandlerPid, Compress); 110 - {gun_response, ConnPid, _, _, Status, Headers} -> 111 - gun:close(ConnPid), 112 - Parent ! {connection_result, {error, {upgrade_failed, Status, Headers}}}; 113 - {gun_error, ConnPid, _StreamRef, Reason} -> 114 - gun:close(ConnPid), 115 - Parent ! {connection_result, {error, {gun_error, Reason}}}; 116 - {'DOWN', MRef, process, ConnPid, Reason} -> 117 - Parent ! {connection_result, {error, {connection_down, Reason}}}; 118 - _Other -> 119 - gun:close(ConnPid), 120 - Parent ! {connection_result, {error, unexpected_message}} 121 - after 30000 -> 122 - gun:close(ConnPid), 123 - Parent ! {connection_result, {error, upgrade_timeout}} 124 - end; 125 - {'DOWN', MRef, process, ConnPid, Reason} -> 126 - Parent ! {connection_result, {error, {connection_failed, Reason}}}; 127 - _Other -> 128 - gun:close(ConnPid), 129 - Parent ! {connection_result, {error, unexpected_message}} 130 - after 30000 -> 131 - gun:close(ConnPid), 132 - Parent ! {connection_result, {error, connection_timeout}} 133 - end; 134 - {error, Reason} -> 135 - Parent ! {connection_result, {error, {open_failed, Reason}}} 136 - end. 137 - 138 - %% Handle incoming WebSocket messages 139 - handle_messages(ConnPid, StreamRef, HandlerPid, Compress) -> 140 - %% Load zstd dictionary if compression is enabled 141 - Decompressor = case Compress of 142 - true -> 143 - %% Load dictionary from priv directory 144 - PrivDir = code:priv_dir(goose), 145 - DictPath = filename:join(PrivDir, "zstd_dictionary"), 146 - case file:read_file(DictPath) of 147 - {ok, DictData} -> 148 - %% Create decompression dictionary (returns reference directly) 149 - DDict = ezstd:create_ddict(DictData), 150 - {ok, DDict}; 151 - {error, Err} -> 152 - io:format("Failed to read zstd dictionary: ~p~n", [Err]), 153 - {error, Err} 154 - end; 155 - _ -> 156 - none 157 - end, 158 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor). 159 - 160 - %% Message handling loop 161 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) -> 162 - receive 163 - {gun_ws, _AnyConnPid, _AnyStreamRef, {text, Text}} -> 164 - HandlerPid ! {ws_text, Text}, 165 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 166 - {gun_ws, _AnyConnPid, _AnyStreamRef, {binary, Binary}} -> 167 - %% If compression is enabled, decompress the binary data 168 - case {Compress, Decompressor} of 169 - {true, {ok, DDict}} -> 170 - try 171 - %% decompress_using_ddict returns the decompressed data directly 172 - Decompressed = ezstd:decompress_using_ddict(Binary, DDict), 173 - %% Ensure it's treated as a binary (not iolist) 174 - DecompressedBin = iolist_to_binary([Decompressed]), 175 - HandlerPid ! {ws_text, DecompressedBin} 176 - catch 177 - Error:Reason:_Stacktrace -> 178 - io:format("Decompression failed: ~p:~p~n", [Error, Reason]) 179 - end; 180 - _ -> 181 - %% No compression, ignore binary messages 182 - ok 183 - end, 184 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 185 - {gun_ws, ConnPid, StreamRef, close} -> 186 - HandlerPid ! {ws_closed, normal}, 187 - gun:close(ConnPid); 188 - {gun_down, ConnPid, _Protocol, Reason, _KilledStreams} -> 189 - HandlerPid ! {ws_error, Reason}, 190 - gun:close(ConnPid); 191 - {gun_error, ConnPid, StreamRef, Reason} -> 192 - HandlerPid ! {ws_error, Reason}, 193 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 194 - stop -> 195 - gun:close(ConnPid); 196 - _Other -> 197 - %% Ignore unexpected messages 198 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) 199 - after 30000 -> 200 - %% Heartbeat every 30 seconds to keep connection alive 201 - handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) 202 - end.
···
+116
src/stratus_ffi.erl
···
··· 1 + -module(stratus_ffi). 2 + 3 + -export([tcp_shutdown/2, tcp_send/2, ssl_shutdown/2, ssl_send/2, tcp_set_opts/2, 4 + ssl_set_opts/2, ssl_start/0, custom_sni_matcher/0, 5 + ssl_controlling_process/2, tcp_controlling_process/2, 6 + parse_known_socket_reason/1 ]). 7 + 8 + tcp_shutdown(Socket, How) -> 9 + case gen_tcp:shutdown(Socket, How) of 10 + ok -> 11 + {ok, nil}; 12 + {error, Reason} -> 13 + {error, Reason} 14 + end. 15 + 16 + tcp_send(Socket, Packet) -> 17 + case gen_tcp:send(Socket, Packet) of 18 + ok -> 19 + {ok, nil}; 20 + {error, Reason} -> 21 + {error, Reason} 22 + end. 23 + 24 + ssl_shutdown(Socket, How) -> 25 + case ssl:shutdown(Socket, How) of 26 + ok -> 27 + {ok, nil}; 28 + {error, Reason} -> 29 + {error, Reason} 30 + end. 31 + 32 + ssl_send(Socket, Packet) -> 33 + case ssl:send(Socket, Packet) of 34 + ok -> 35 + {ok, nil}; 36 + {error, Reason} -> 37 + {error, Reason} 38 + end. 39 + 40 + tcp_set_opts(Socket, Opts) -> 41 + case inet:setopts(Socket, Opts) of 42 + ok -> 43 + {ok, nil}; 44 + {error, Reason} -> 45 + {error, Reason} 46 + end. 47 + 48 + ssl_set_opts(Socket, Opts) -> 49 + case ssl:setopts(Socket, Opts) of 50 + ok -> 51 + {ok, nil}; 52 + {error, Reason} -> 53 + {error, Reason} 54 + end. 55 + 56 + ssl_start() -> 57 + case ssl:start() of 58 + ok -> 59 + {ok, nil}; 60 + {error, Reason} -> 61 + {error, Reason} 62 + end. 63 + 64 + % Thank you! https://github.com/erlang/otp/issues/4321 65 + custom_sni_matcher() -> 66 + {customize_hostname_check, 67 + [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]}. 68 + 69 + ssl_controlling_process(Socket, NewOwner) -> 70 + case ssl:controlling_process(Socket, NewOwner) of 71 + ok -> {ok, nil}; 72 + Error -> Error 73 + end. 74 + 75 + tcp_controlling_process(Socket, NewOwner) -> 76 + case gen_tcp:controlling_process(Socket, NewOwner) of 77 + ok -> {ok, nil}; 78 + Error -> Error 79 + end. 80 + 81 + parse_known_socket_reason(Reason) -> 82 + case Reason of 83 + closed -> {ok, closed}; 84 + timeout -> {ok, timeout}; 85 + badarg -> {ok, badarg}; 86 + terminated -> {ok, terminated}; 87 + eaddrinuse -> {ok, eaddrinuse}; 88 + eaddrnotavail -> {ok, eaddrnotavail}; 89 + eafnosupport -> {ok, eafnosupport}; 90 + ealready -> {ok, ealready}; 91 + econnaborted -> {ok, econnaborted}; 92 + econnrefused -> {ok, econnrefused}; 93 + econnreset -> {ok, econnreset}; 94 + edestaddrreq -> {ok, edestaddrreq}; 95 + ehostdown -> {ok, ehostdown}; 96 + ehostunreach -> {ok, ehostunreach}; 97 + einprogress -> {ok, einprogress}; 98 + eisconn -> {ok, eisconn}; 99 + emsgsize -> {ok, emsgsize}; 100 + enetdown -> {ok, enetdown}; 101 + enetunreach -> {ok, enetunreach}; 102 + enopkg -> {ok, enopkg}; 103 + enoprotoopt -> {ok, enoprotoopt}; 104 + enotconn -> {ok, enotconn}; 105 + enotty -> {ok, enotty}; 106 + enotsock -> {ok, enotsock}; 107 + eproto -> {ok, eproto}; 108 + eprotonosupport -> {ok, eprotonosupport}; 109 + eprototype -> {ok, eprototype}; 110 + esocktnosupport -> {ok, esocktnosupport}; 111 + etimedout -> {ok, etimedout}; 112 + ewouldblock -> {ok, ewouldblock}; 113 + exbadport -> {ok, exbadport}; 114 + exbadseq -> {ok, exbadseq}; 115 + Unknown -> {error, Unknown} 116 + end.