A Gleam WebSocket consumer for AT Protocol Jetstream events.

initial commit

+4
.gitignore
··· 1 + *.beam 2 + *.ez 3 + **/build 4 + erl_crash.dump
+160
README.md
··· 1 + # goose 2 + 3 + [![Package Version](https://img.shields.io/hexpm/v/goose)](https://hex.pm/packages/goose) 4 + [![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/goose/) 5 + 6 + A Gleam WebSocket consumer for AT Protocol Jetstream events. 7 + 8 + ```sh 9 + gleam add goose@1 10 + ``` 11 + 12 + ## Example 13 + 14 + ```gleam 15 + import goose 16 + import gleam/io 17 + import gleam/option 18 + 19 + pub fn main() { 20 + // Create a default configuration 21 + let config = goose.default_config() 22 + 23 + // Or configure with custom options 24 + let config = goose.JetstreamConfig( 25 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 26 + wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"], 27 + wanted_dids: [], 28 + cursor: option.None, 29 + max_message_size_bytes: option.None, 30 + compress: False, 31 + require_hello: False, 32 + ) 33 + 34 + // Start consuming events 35 + goose.start_consumer(config, fn(json_event) { 36 + let event = goose.parse_event(json_event) 37 + 38 + case event { 39 + goose.CommitEvent(did, time_us, commit) -> { 40 + io.println("Commit: " <> commit.operation <> " in " <> commit.collection) 41 + } 42 + goose.IdentityEvent(did, time_us, identity) -> { 43 + io.println("Identity: " <> identity.handle) 44 + } 45 + goose.AccountEvent(did, time_us, account) -> { 46 + io.println("Account updated: " <> did) 47 + } 48 + goose.UnknownEvent(_) -> { 49 + io.println("Unknown event type") 50 + } 51 + } 52 + }) 53 + } 54 + ``` 55 + 56 + ## Configuration Options 57 + 58 + ### `wanted_collections` 59 + An array of Collection NSIDs to filter which records you receive (default: empty = all collections) 60 + 61 + - Supports NSID path prefixes like `app.bsky.graph.*` or `app.bsky.*` 62 + - The prefix before `.*` must pass NSID validation 63 + - Incomplete prefixes like `app.bsky.graph.fo*` are not supported 64 + - Account and Identity events are always received regardless of this filter 65 + - Maximum 100 collections/prefixes 66 + 67 + **Example:** 68 + ```gleam 69 + wanted_collections: ["app.bsky.feed.post", "app.bsky.graph.*"] 70 + ``` 71 + 72 + ### `wanted_dids` 73 + An array of Repo DIDs to filter which records you receive (default: empty = all repos) 74 + 75 + - Maximum 10,000 DIDs 76 + 77 + **Example:** 78 + ```gleam 79 + wanted_dids: ["did:plc:example123", "did:plc:example456"] 80 + ``` 81 + 82 + ### `cursor` 83 + A unix microseconds timestamp to begin playback from 84 + 85 + - Absent cursor or future timestamp results in live-tail operation 86 + - When reconnecting, use `time_us` from your most recently processed event 87 + - Consider subtracting a few seconds as a buffer to ensure gapless playback 88 + 89 + **Example:** 90 + ```gleam 91 + cursor: option.Some(1234567890123456) 92 + ``` 93 + 94 + ### `max_message_size_bytes` 95 + The maximum size of a payload that this client would like to receive 96 + 97 + - Zero means no limit 98 + - Negative values are treated as zero 99 + - Default: 0 (no maximum size) 100 + 101 + **Example:** 102 + ```gleam 103 + max_message_size_bytes: option.Some(1048576) // 1MB limit 104 + ``` 105 + 106 + ### `compress` 107 + Enable zstd compression for WebSocket frames 108 + 109 + - Set to `True` to enable compression 110 + - Default: `False` 111 + - Uses zstandard compression with Jetstream's custom dictionary 112 + - Reduces bandwidth by approximately 50% 113 + - Messages are automatically decompressed before reaching your callback 114 + - Requires the `ezstd` library (automatically handled as a dependency) 115 + 116 + **Example:** 117 + ```gleam 118 + compress: True 119 + ``` 120 + 121 + **Note:** Compression is transparent to your application - compressed messages are automatically decompressed before being passed to your event handler. The bandwidth savings occur on the wire between the server and your client. 122 + 123 + ### `require_hello` 124 + Pause replay/live-tail until server receives a `SubscriberOptionsUpdatePayload` 125 + 126 + - Set to `True` to require initial handshake 127 + - Default: `False` 128 + 129 + **Example:** 130 + ```gleam 131 + require_hello: True 132 + ``` 133 + 134 + ## Full Configuration Example 135 + 136 + ```gleam 137 + import goose 138 + import gleam/option 139 + 140 + let config = goose.JetstreamConfig( 141 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 142 + wanted_collections: ["app.bsky.feed.post", "app.bsky.graph.*"], 143 + wanted_dids: ["did:plc:example123"], 144 + cursor: option.Some(1234567890123456), 145 + max_message_size_bytes: option.Some(2097152), // 2MB 146 + compress: True, 147 + require_hello: False, 148 + ) 149 + 150 + goose.start_consumer(config, handle_event) 151 + ``` 152 + 153 + Further documentation can be found at <https://hexdocs.pm/goose>. 154 + 155 + ## Development 156 + 157 + ```sh 158 + gleam build # Build the project 159 + gleam test # Run the tests 160 + ```
+7
example/README.md
··· 1 + # Goose Example - Log All Events 2 + 3 + This example logs all events from the AT Protocol Jetstream. 4 + 5 + ```sh 6 + gleam run 7 + ```
+21
example/gleam.toml
··· 1 + name = "example" 2 + version = "1.0.0" 3 + 4 + # Fill out these fields if you intend to generate HTML documentation or publish 5 + # your project to the Hex package manager. 6 + # 7 + # description = "" 8 + # licences = ["Apache-2.0"] 9 + # repository = { type = "github", user = "", repo = "" } 10 + # links = [{ title = "Website", href = "" }] 11 + # 12 + # For a full reference of all the available options, you can have a look at 13 + # https://gleam.run/writing-gleam/gleam-toml/. 14 + 15 + [dependencies] 16 + gleam_stdlib = ">= 0.44.0 and < 2.0.0" 17 + goose = { path = "../" } 18 + ezstd = ">= 1.2.0 and < 2.0.0" 19 + 20 + [dev-dependencies] 21 + gleeunit = ">= 1.0.0 and < 2.0.0"
+20
example/manifest.toml
··· 1 + # This file was generated by Gleam 2 + # You typically do not need to edit this file 3 + 4 + packages = [ 5 + { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" }, 6 + { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 + { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 8 + { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 9 + { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 10 + { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 11 + { name = "gleeunit", version = "1.7.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "CD701726CBCE5588B375D157B4391CFD0F2F134CD12D9B6998A395484DE05C58" }, 12 + { name = "goose", version = "1.0.0", build_tools = ["gleam"], requirements = ["ezstd", "gleam_erlang", "gleam_http", "gleam_json", "gleam_stdlib", "gun"], source = "local", path = ".." }, 13 + { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" }, 14 + ] 15 + 16 + [requirements] 17 + ezstd = { version = ">= 1.2.0 and < 2.0.0" } 18 + gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 19 + gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 20 + goose = { path = "../" }
+74
example/src/example.gleam
··· 1 + import gleam/io 2 + import gleam/option 3 + import gleam/string 4 + import goose 5 + 6 + pub fn main() -> Nil { 7 + let config = 8 + goose.JetstreamConfig( 9 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 10 + wanted_collections: [], 11 + wanted_dids: [], 12 + cursor: option.None, 13 + max_message_size_bytes: option.None, 14 + compress: True, 15 + require_hello: False, 16 + ) 17 + 18 + io.println("Starting Jetstream consumer...") 19 + io.println("Connected to: " <> config.endpoint) 20 + io.println("Listening for all events...\n") 21 + 22 + // Start consuming and log all events 23 + goose.start_consumer(config, fn(json_event) { 24 + let event = goose.parse_event(json_event) 25 + 26 + case event { 27 + goose.CommitEvent(did, time_us, commit) -> { 28 + io.println( 29 + "COMMIT | " 30 + <> commit.operation 31 + <> " | " 32 + <> commit.collection 33 + <> " | " 34 + <> commit.rkey 35 + <> " | DID: " 36 + <> did 37 + <> " | Time: " 38 + <> string.inspect(time_us), 39 + ) 40 + } 41 + goose.IdentityEvent(did, time_us, identity) -> { 42 + io.println( 43 + "IDENTITY | Handle: " 44 + <> identity.handle 45 + <> " | DID: " 46 + <> did 47 + <> " | Seq: " 48 + <> string.inspect(identity.seq) 49 + <> " | Time: " 50 + <> string.inspect(time_us), 51 + ) 52 + } 53 + goose.AccountEvent(did, time_us, account) -> { 54 + let status = case account.active { 55 + True -> "ACTIVE" 56 + False -> "INACTIVE" 57 + } 58 + io.println( 59 + "ACCOUNT | Status: " 60 + <> status 61 + <> " | DID: " 62 + <> did 63 + <> " | Seq: " 64 + <> string.inspect(account.seq) 65 + <> " | Time: " 66 + <> string.inspect(time_us), 67 + ) 68 + } 69 + goose.UnknownEvent(raw) -> { 70 + io.println("UNKNOWN | " <> raw) 71 + } 72 + } 73 + }) 74 + }
+13
example/test/example_test.gleam
··· 1 + import gleeunit 2 + 3 + pub fn main() -> Nil { 4 + gleeunit.main() 5 + } 6 + 7 + // gleeunit test functions end in `_test` 8 + pub fn hello_world_test() { 9 + let name = "Joe" 10 + let greeting = "Hello, " <> name <> "!" 11 + 12 + assert greeting == "Hello, Joe!" 13 + }
+24
gleam.toml
··· 1 + name = "goose" 2 + version = "1.0.0" 3 + 4 + # Fill out these fields if you intend to generate HTML documentation or publish 5 + # your project to the Hex package manager. 6 + # 7 + # description = "" 8 + # licences = ["Apache-2.0"] 9 + # repository = { type = "github", user = "", repo = "" } 10 + # links = [{ title = "Website", href = "" }] 11 + # 12 + # For a full reference of all the available options, you can have a look at 13 + # https://gleam.run/writing-gleam/gleam-toml/. 14 + 15 + [dependencies] 16 + gleam_stdlib = ">= 0.44.0 and < 2.0.0" 17 + gleam_erlang = ">= 1.0.0 and < 2.0.0" 18 + gleam_http = ">= 4.0.0 and < 5.0.0" 19 + gleam_json = ">= 3.0.2 and < 4.0.0" 20 + gun = ">= 2.2.0 and < 3.0.0" 21 + ezstd = ">= 1.2.0 and < 2.0.0" 22 + 23 + [dev-dependencies] 24 + gleeunit = ">= 1.0.0 and < 2.0.0"
+22
manifest.toml
··· 1 + # This file was generated by Gleam 2 + # You typically do not need to edit this file 3 + 4 + packages = [ 5 + { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" }, 6 + { name = "ezstd", version = "1.2.3", build_tools = ["rebar3"], requirements = [], otp_app = "ezstd", source = "hex", outer_checksum = "DE32E0B41BA36A9ED46DB8215DA74777D2F141BB75F67BFC05DBB4B7C3386DEE" }, 7 + { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 8 + { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 9 + { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 10 + { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" }, 11 + { name = "gleeunit", version = "1.6.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "FDC68A8C492B1E9B429249062CD9BAC9B5538C6FBF584817205D0998C42E1DAC" }, 12 + { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" }, 13 + ] 14 + 15 + [requirements] 16 + ezstd = { version = ">= 1.2.0 and < 2.0.0" } 17 + gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } 18 + gleam_http = { version = ">= 4.0.0 and < 5.0.0" } 19 + gleam_json = { version = ">= 3.0.2 and < 4.0.0" } 20 + gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 21 + gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 22 + gun = { version = ">= 2.2.0 and < 3.0.0" }
priv/zstd_dictionary

This is a binary file and will not be displayed.

+270
src/goose.gleam
··· 1 + import gleam/dynamic.{type Dynamic} 2 + import gleam/dynamic/decode 3 + import gleam/erlang/process.{type Pid} 4 + import gleam/io 5 + import gleam/json 6 + import gleam/list 7 + import gleam/option.{type Option} 8 + import gleam/string 9 + 10 + /// Jetstream event types 11 + pub type JetstreamEvent { 12 + CommitEvent(did: String, time_us: Int, commit: CommitData) 13 + IdentityEvent(did: String, time_us: Int, identity: IdentityData) 14 + AccountEvent(did: String, time_us: Int, account: AccountData) 15 + UnknownEvent(raw: String) 16 + } 17 + 18 + pub type CommitData { 19 + CommitData( 20 + rev: String, 21 + operation: String, 22 + collection: String, 23 + rkey: String, 24 + record: Option(Dynamic), 25 + cid: Option(String), 26 + ) 27 + } 28 + 29 + pub type IdentityData { 30 + IdentityData(did: String, handle: String, seq: Int, time: String) 31 + } 32 + 33 + pub type AccountData { 34 + AccountData(active: Bool, did: String, seq: Int, time: String) 35 + } 36 + 37 + /// Configuration for Jetstream consumer 38 + pub type JetstreamConfig { 39 + JetstreamConfig( 40 + endpoint: String, 41 + wanted_collections: List(String), 42 + wanted_dids: List(String), 43 + cursor: Option(Int), 44 + max_message_size_bytes: Option(Int), 45 + compress: Bool, 46 + require_hello: Bool, 47 + ) 48 + } 49 + 50 + /// Create a default configuration for US East endpoint 51 + pub fn default_config() -> JetstreamConfig { 52 + JetstreamConfig( 53 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 54 + wanted_collections: [], 55 + wanted_dids: [], 56 + cursor: option.None, 57 + max_message_size_bytes: option.None, 58 + compress: False, 59 + require_hello: False, 60 + ) 61 + } 62 + 63 + /// Build the WebSocket URL with query parameters 64 + pub fn build_url(config: JetstreamConfig) -> String { 65 + let base = config.endpoint 66 + let mut_params = [] 67 + 68 + // Add wanted collections (each as a separate query parameter) 69 + let mut_params = case config.wanted_collections { 70 + [] -> mut_params 71 + collections -> { 72 + let collection_params = 73 + list.map(collections, fn(col) { "wantedCollections=" <> col }) 74 + list.append(collection_params, mut_params) 75 + } 76 + } 77 + 78 + // Add wanted DIDs (each as a separate query parameter) 79 + let mut_params = case config.wanted_dids { 80 + [] -> mut_params 81 + dids -> { 82 + let did_params = list.map(dids, fn(did) { "wantedDids=" <> did }) 83 + list.append(did_params, mut_params) 84 + } 85 + } 86 + 87 + // Add cursor if specified 88 + let mut_params = case config.cursor { 89 + option.None -> mut_params 90 + option.Some(cursor_val) -> 91 + list.append(["cursor=" <> string.inspect(cursor_val)], mut_params) 92 + } 93 + 94 + // Add maxMessageSizeBytes if specified 95 + let mut_params = case config.max_message_size_bytes { 96 + option.None -> mut_params 97 + option.Some(size_val) -> 98 + list.append(["maxMessageSizeBytes=" <> string.inspect(size_val)], mut_params) 99 + } 100 + 101 + // Add compress parameter (always include it) 102 + let mut_params = case config.compress { 103 + False -> list.append(["compress=false"], mut_params) 104 + True -> list.append(["compress=true"], mut_params) 105 + } 106 + 107 + // Add requireHello parameter (always include it) 108 + let mut_params = case config.require_hello { 109 + False -> list.append(["requireHello=false"], mut_params) 110 + True -> list.append(["requireHello=true"], mut_params) 111 + } 112 + 113 + case mut_params { 114 + [] -> base 115 + params -> base <> "?" <> string.join(list.reverse(params), "&") 116 + } 117 + } 118 + 119 + /// Connect to Jetstream WebSocket using Erlang gun library 120 + @external(erlang, "goose_ws_ffi", "connect") 121 + pub fn connect( 122 + url: String, 123 + handler_pid: Pid, 124 + compress: Bool, 125 + ) -> Result(Pid, Dynamic) 126 + 127 + /// Start consuming the Jetstream feed 128 + pub fn start_consumer( 129 + config: JetstreamConfig, 130 + on_event: fn(String) -> Nil, 131 + ) -> Nil { 132 + let url = build_url(config) 133 + let self = process.self() 134 + let result = connect(url, self, config.compress) 135 + 136 + case result { 137 + Ok(_conn_pid) -> { 138 + receive_loop(on_event) 139 + } 140 + Error(err) -> { 141 + io.println("Failed to connect to Jetstream") 142 + io.println_error(string.inspect(err)) 143 + } 144 + } 145 + } 146 + 147 + /// Receive loop for WebSocket messages 148 + fn receive_loop(on_event: fn(String) -> Nil) -> Nil { 149 + // Call Erlang to receive one message 150 + case receive_ws_message() { 151 + Ok(text) -> { 152 + on_event(text) 153 + receive_loop(on_event) 154 + } 155 + Error(_) -> { 156 + // Timeout or error, continue loop 157 + receive_loop(on_event) 158 + } 159 + } 160 + } 161 + 162 + /// Receive a WebSocket message from the message queue 163 + @external(erlang, "goose_ffi", "receive_ws_message") 164 + fn receive_ws_message() -> Result(String, Nil) 165 + 166 + /// Parse a JSON event string into a JetstreamEvent 167 + pub fn parse_event(json_string: String) -> JetstreamEvent { 168 + // Try to parse as commit event first 169 + case json.parse(json_string, commit_event_decoder()) { 170 + Ok(event) -> event 171 + Error(_) -> { 172 + // Try identity event 173 + case json.parse(json_string, identity_event_decoder()) { 174 + Ok(event) -> event 175 + Error(_) -> { 176 + // Try account event 177 + case json.parse(json_string, account_event_decoder()) { 178 + Ok(event) -> event 179 + Error(_) -> UnknownEvent(json_string) 180 + } 181 + } 182 + } 183 + } 184 + } 185 + } 186 + 187 + /// Decoder for commit events 188 + fn commit_event_decoder() { 189 + use did <- decode.field("did", decode.string) 190 + use time_us <- decode.field("time_us", decode.int) 191 + use commit <- decode.field("commit", commit_data_decoder()) 192 + decode.success(CommitEvent(did: did, time_us: time_us, commit: commit)) 193 + } 194 + 195 + /// Decoder for commit data - handles both create/update (with record) and delete (without) 196 + fn commit_data_decoder() { 197 + // Try decoder with record and cid fields first (for create/update) 198 + // If that fails, try without (for delete) 199 + decode.one_of(commit_with_record_decoder(), or: [ 200 + commit_without_record_decoder(), 201 + ]) 202 + } 203 + 204 + /// Decoder for commit with record (create/update operations) 205 + fn commit_with_record_decoder() { 206 + use rev <- decode.field("rev", decode.string) 207 + use operation <- decode.field("operation", decode.string) 208 + use collection <- decode.field("collection", decode.string) 209 + use rkey <- decode.field("rkey", decode.string) 210 + use record <- decode.field("record", decode.dynamic) 211 + use cid <- decode.field("cid", decode.string) 212 + decode.success(CommitData( 213 + rev: rev, 214 + operation: operation, 215 + collection: collection, 216 + rkey: rkey, 217 + record: option.Some(record), 218 + cid: option.Some(cid), 219 + )) 220 + } 221 + 222 + /// Decoder for commit without record (delete operations) 223 + fn commit_without_record_decoder() { 224 + use rev <- decode.field("rev", decode.string) 225 + use operation <- decode.field("operation", decode.string) 226 + use collection <- decode.field("collection", decode.string) 227 + use rkey <- decode.field("rkey", decode.string) 228 + decode.success(CommitData( 229 + rev: rev, 230 + operation: operation, 231 + collection: collection, 232 + rkey: rkey, 233 + record: option.None, 234 + cid: option.None, 235 + )) 236 + } 237 + 238 + /// Decoder for identity events 239 + fn identity_event_decoder() { 240 + use did <- decode.field("did", decode.string) 241 + use time_us <- decode.field("time_us", decode.int) 242 + use identity <- decode.field("identity", identity_data_decoder()) 243 + decode.success(IdentityEvent(did: did, time_us: time_us, identity: identity)) 244 + } 245 + 246 + /// Decoder for identity data 247 + fn identity_data_decoder() { 248 + use did <- decode.field("did", decode.string) 249 + use handle <- decode.field("handle", decode.string) 250 + use seq <- decode.field("seq", decode.int) 251 + use time <- decode.field("time", decode.string) 252 + decode.success(IdentityData(did: did, handle: handle, seq: seq, time: time)) 253 + } 254 + 255 + /// Decoder for account events 256 + fn account_event_decoder() { 257 + use did <- decode.field("did", decode.string) 258 + use time_us <- decode.field("time_us", decode.int) 259 + use account <- decode.field("account", account_data_decoder()) 260 + decode.success(AccountEvent(did: did, time_us: time_us, account: account)) 261 + } 262 + 263 + /// Decoder for account data 264 + fn account_data_decoder() { 265 + use active <- decode.field("active", decode.bool) 266 + use did <- decode.field("did", decode.string) 267 + use seq <- decode.field("seq", decode.int) 268 + use time <- decode.field("time", decode.string) 269 + decode.success(AccountData(active: active, did: did, seq: seq, time: time)) 270 + }
+23
src/goose_ffi.erl
··· 1 + -module(goose_ffi). 2 + -export([receive_ws_message/0]). 3 + 4 + %% Receive a WebSocket text message from the process mailbox 5 + receive_ws_message() -> 6 + receive 7 + %% Handle messages forwarded from handler process 8 + {ws_text, Text} -> 9 + {ok, Text}; 10 + {ws_binary, _Binary} -> 11 + %% Ignore binary messages, try again 12 + receive_ws_message(); 13 + {ws_closed, _Reason} -> 14 + {error, nil}; 15 + {ws_error, _Reason} -> 16 + {error, nil}; 17 + _Other -> 18 + %% Ignore unexpected messages 19 + receive_ws_message() 20 + after 60000 -> 21 + %% Timeout - return error to continue loop 22 + {error, nil} 23 + end.
+202
src/goose_ws_ffi.erl
··· 1 + -module(goose_ws_ffi). 2 + -export([connect/3]). 3 + 4 + %% Connect to WebSocket using gun 5 + connect(Url, HandlerPid, Compress) -> 6 + %% Start gun application and dependencies 7 + application:ensure_all_started(ssl), 8 + application:ensure_all_started(gun), 9 + 10 + %% Start ezstd if compression is enabled 11 + case Compress of 12 + true -> application:ensure_all_started(ezstd); 13 + _ -> ok 14 + end, 15 + 16 + %% Spawn a connection process that will own the gun connection 17 + Parent = self(), 18 + spawn(fun() -> connect_worker(Url, HandlerPid, Compress, Parent) end), 19 + 20 + %% Wait for the connection result 21 + receive 22 + {connection_result, Result} -> Result 23 + after 60000 -> 24 + {error, connection_timeout} 25 + end. 26 + 27 + %% Worker process that owns the connection 28 + connect_worker(Url, HandlerPid, Compress, Parent) -> 29 + %% Parse URL using uri_string 30 + UriMap = uri_string:parse(Url), 31 + #{scheme := SchemeStr, host := Host, path := Path} = UriMap, 32 + 33 + %% Get query string if present and append to path 34 + Query = maps:get(query, UriMap, undefined), 35 + PathWithQuery = case Query of 36 + undefined -> Path; 37 + <<>> -> Path; 38 + Q -> <<Path/binary, "?", Q/binary>> 39 + end, 40 + 41 + %% Get port, use defaults if not specified 42 + Port = maps:get(port, uri_string:parse(Url), 43 + case SchemeStr of 44 + <<"wss">> -> 443; 45 + <<"ws">> -> 80; 46 + _ -> 443 47 + end), 48 + 49 + %% Determine transport 50 + Transport = case SchemeStr of 51 + <<"wss">> -> tls; 52 + <<"ws">> -> tcp; 53 + _ -> tls 54 + end, 55 + 56 + %% TLS options for secure connections 57 + TlsOpts = [{verify, verify_none}], %% For simplicity, disable cert verification 58 + %% In production, use proper CA certs 59 + 60 + %% Connection options 61 + Opts = case Transport of 62 + tls -> 63 + #{ 64 + transport => tls, 65 + tls_opts => TlsOpts, 66 + protocols => [http], 67 + retry => 10, 68 + retry_timeout => 1000 69 + }; 70 + tcp -> 71 + #{ 72 + transport => tcp, 73 + protocols => [http], 74 + retry => 10, 75 + retry_timeout => 1000 76 + } 77 + end, 78 + 79 + %% Convert host to list if needed 80 + HostStr = case is_binary(Host) of 81 + true -> binary_to_list(Host); 82 + false -> Host 83 + end, 84 + 85 + %% Ensure path with query is binary 86 + PathBin = case is_binary(PathWithQuery) of 87 + true -> PathWithQuery; 88 + false -> list_to_binary(PathWithQuery) 89 + end, 90 + 91 + %% Open connection (this process will be the owner) 92 + case gun:open(HostStr, Port, Opts) of 93 + {ok, ConnPid} -> 94 + %% Monitor the connection 95 + MRef = monitor(process, ConnPid), 96 + 97 + %% Wait for connection 98 + receive 99 + {gun_up, ConnPid, _Protocol} -> 100 + %% Upgrade to WebSocket (compression is controlled via query string, not headers) 101 + StreamRef = gun:ws_upgrade(ConnPid, binary_to_list(PathBin), []), 102 + 103 + %% Wait for upgrade 104 + receive 105 + {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _ResponseHeaders} -> 106 + %% Notify parent that connection is ready 107 + Parent ! {connection_result, {ok, ConnPid}}, 108 + %% Now handle messages in this process (the connection owner) 109 + handle_messages(ConnPid, StreamRef, HandlerPid, Compress); 110 + {gun_response, ConnPid, _, _, Status, Headers} -> 111 + gun:close(ConnPid), 112 + Parent ! {connection_result, {error, {upgrade_failed, Status, Headers}}}; 113 + {gun_error, ConnPid, _StreamRef, Reason} -> 114 + gun:close(ConnPid), 115 + Parent ! {connection_result, {error, {gun_error, Reason}}}; 116 + {'DOWN', MRef, process, ConnPid, Reason} -> 117 + Parent ! {connection_result, {error, {connection_down, Reason}}}; 118 + _Other -> 119 + gun:close(ConnPid), 120 + Parent ! {connection_result, {error, unexpected_message}} 121 + after 30000 -> 122 + gun:close(ConnPid), 123 + Parent ! {connection_result, {error, upgrade_timeout}} 124 + end; 125 + {'DOWN', MRef, process, ConnPid, Reason} -> 126 + Parent ! {connection_result, {error, {connection_failed, Reason}}}; 127 + _Other -> 128 + gun:close(ConnPid), 129 + Parent ! {connection_result, {error, unexpected_message}} 130 + after 30000 -> 131 + gun:close(ConnPid), 132 + Parent ! {connection_result, {error, connection_timeout}} 133 + end; 134 + {error, Reason} -> 135 + Parent ! {connection_result, {error, {open_failed, Reason}}} 136 + end. 137 + 138 + %% Handle incoming WebSocket messages 139 + handle_messages(ConnPid, StreamRef, HandlerPid, Compress) -> 140 + %% Load zstd dictionary if compression is enabled 141 + Decompressor = case Compress of 142 + true -> 143 + %% Load dictionary from priv directory 144 + PrivDir = code:priv_dir(goose), 145 + DictPath = filename:join(PrivDir, "zstd_dictionary"), 146 + case file:read_file(DictPath) of 147 + {ok, DictData} -> 148 + %% Create decompression dictionary (returns reference directly) 149 + DDict = ezstd:create_ddict(DictData), 150 + {ok, DDict}; 151 + {error, Err} -> 152 + io:format("Failed to read zstd dictionary: ~p~n", [Err]), 153 + {error, Err} 154 + end; 155 + _ -> 156 + none 157 + end, 158 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor). 159 + 160 + %% Message handling loop 161 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) -> 162 + receive 163 + {gun_ws, _AnyConnPid, _AnyStreamRef, {text, Text}} -> 164 + HandlerPid ! {ws_text, Text}, 165 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 166 + {gun_ws, _AnyConnPid, _AnyStreamRef, {binary, Binary}} -> 167 + %% If compression is enabled, decompress the binary data 168 + case {Compress, Decompressor} of 169 + {true, {ok, DDict}} -> 170 + try 171 + %% decompress_using_ddict returns the decompressed data directly 172 + Decompressed = ezstd:decompress_using_ddict(Binary, DDict), 173 + %% Ensure it's treated as a binary (not iolist) 174 + DecompressedBin = iolist_to_binary([Decompressed]), 175 + HandlerPid ! {ws_text, DecompressedBin} 176 + catch 177 + Error:Reason:_Stacktrace -> 178 + io:format("Decompression failed: ~p:~p~n", [Error, Reason]) 179 + end; 180 + _ -> 181 + %% No compression, ignore binary messages 182 + ok 183 + end, 184 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 185 + {gun_ws, ConnPid, StreamRef, close} -> 186 + HandlerPid ! {ws_closed, normal}, 187 + gun:close(ConnPid); 188 + {gun_down, ConnPid, _Protocol, Reason, _KilledStreams} -> 189 + HandlerPid ! {ws_error, Reason}, 190 + gun:close(ConnPid); 191 + {gun_error, ConnPid, StreamRef, Reason} -> 192 + HandlerPid ! {ws_error, Reason}, 193 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor); 194 + stop -> 195 + gun:close(ConnPid); 196 + _Other -> 197 + %% Ignore unexpected messages 198 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) 199 + after 30000 -> 200 + %% Heartbeat every 30 seconds to keep connection alive 201 + handle_messages_loop(ConnPid, StreamRef, HandlerPid, Compress, Decompressor) 202 + end.
+233
test/goose_test.gleam
··· 1 + import gleam/option 2 + import gleeunit 3 + import goose 4 + 5 + pub fn main() -> Nil { 6 + gleeunit.main() 7 + } 8 + 9 + // Test build_url with default config (no filters) 10 + pub fn build_url_default_test() { 11 + let config = goose.default_config() 12 + let url = goose.build_url(config) 13 + 14 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe" 15 + } 16 + 17 + // Test build_url with wanted collections 18 + pub fn build_url_with_collections_test() { 19 + let config = goose.JetstreamConfig( 20 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 21 + wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"], 22 + wanted_dids: [], 23 + cursor: option.None, 24 + max_message_size_bytes: option.None, 25 + compress: False, 26 + require_hello: False, 27 + ) 28 + let url = goose.build_url(config) 29 + 30 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.feed.post" 31 + } 32 + 33 + // Test build_url with wanted DIDs 34 + pub fn build_url_with_dids_test() { 35 + let config = goose.JetstreamConfig( 36 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 37 + wanted_collections: [], 38 + wanted_dids: ["did:plc:example123", "did:plc:example456"], 39 + cursor: option.None, 40 + max_message_size_bytes: option.None, 41 + compress: False, 42 + require_hello: False, 43 + ) 44 + let url = goose.build_url(config) 45 + 46 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?wantedDids=did:plc:example456&wantedDids=did:plc:example123" 47 + } 48 + 49 + // Test build_url with both collections and DIDs 50 + pub fn build_url_with_both_test() { 51 + let config = goose.JetstreamConfig( 52 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 53 + wanted_collections: ["app.bsky.feed.post"], 54 + wanted_dids: ["did:plc:example123"], 55 + cursor: option.None, 56 + max_message_size_bytes: option.None, 57 + compress: False, 58 + require_hello: False, 59 + ) 60 + let url = goose.build_url(config) 61 + 62 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedDids=did:plc:example123" 63 + } 64 + 65 + // Test build_url with cursor 66 + pub fn build_url_with_cursor_test() { 67 + let config = goose.JetstreamConfig( 68 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 69 + wanted_collections: [], 70 + wanted_dids: [], 71 + cursor: option.Some(1234567890123456), 72 + max_message_size_bytes: option.None, 73 + compress: False, 74 + require_hello: False, 75 + ) 76 + let url = goose.build_url(config) 77 + 78 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?cursor=1234567890123456" 79 + } 80 + 81 + // Test build_url with max_message_size_bytes 82 + pub fn build_url_with_max_size_test() { 83 + let config = goose.JetstreamConfig( 84 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 85 + wanted_collections: [], 86 + wanted_dids: [], 87 + cursor: option.None, 88 + max_message_size_bytes: option.Some(1048576), 89 + compress: False, 90 + require_hello: False, 91 + ) 92 + let url = goose.build_url(config) 93 + 94 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?maxMessageSizeBytes=1048576" 95 + } 96 + 97 + // Test build_url with compress enabled 98 + pub fn build_url_with_compress_test() { 99 + let config = goose.JetstreamConfig( 100 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 101 + wanted_collections: [], 102 + wanted_dids: [], 103 + cursor: option.None, 104 + max_message_size_bytes: option.None, 105 + compress: True, 106 + require_hello: False, 107 + ) 108 + let url = goose.build_url(config) 109 + 110 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?compress=true" 111 + } 112 + 113 + // Test build_url with require_hello enabled 114 + pub fn build_url_with_require_hello_test() { 115 + let config = goose.JetstreamConfig( 116 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 117 + wanted_collections: [], 118 + wanted_dids: [], 119 + cursor: option.None, 120 + max_message_size_bytes: option.None, 121 + compress: False, 122 + require_hello: True, 123 + ) 124 + let url = goose.build_url(config) 125 + 126 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?requireHello=true" 127 + } 128 + 129 + // Test build_url with all options combined 130 + pub fn build_url_with_all_options_test() { 131 + let config = goose.JetstreamConfig( 132 + endpoint: "wss://jetstream2.us-east.bsky.network/subscribe", 133 + wanted_collections: ["app.bsky.feed.post"], 134 + wanted_dids: ["did:plc:example123"], 135 + cursor: option.Some(9876543210), 136 + max_message_size_bytes: option.Some(2097152), 137 + compress: True, 138 + require_hello: True, 139 + ) 140 + let url = goose.build_url(config) 141 + 142 + assert url == "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedDids=did:plc:example123&cursor=9876543210&maxMessageSizeBytes=2097152&compress=true&requireHello=true" 143 + } 144 + 145 + // Test parsing a commit event (create operation with record) 146 + pub fn parse_commit_event_create_test() { 147 + let json = "{\"did\":\"did:plc:test123\",\"time_us\":1234567890,\"commit\":{\"rev\":\"abc123\",\"operation\":\"create\",\"collection\":\"app.bsky.feed.post\",\"rkey\":\"post123\",\"record\":{\"text\":\"Hello world\"},\"cid\":\"cid123\"}}" 148 + 149 + let event = goose.parse_event(json) 150 + 151 + case event { 152 + goose.CommitEvent(did, time_us, commit) -> { 153 + assert did == "did:plc:test123" 154 + assert time_us == 1234567890 155 + assert commit.rev == "abc123" 156 + assert commit.operation == "create" 157 + assert commit.collection == "app.bsky.feed.post" 158 + assert commit.rkey == "post123" 159 + } 160 + _ -> panic as "Expected CommitEvent" 161 + } 162 + } 163 + 164 + // Test parsing a commit event (delete operation without record) 165 + pub fn parse_commit_event_delete_test() { 166 + let json = "{\"did\":\"did:plc:test456\",\"time_us\":9876543210,\"commit\":{\"rev\":\"xyz789\",\"operation\":\"delete\",\"collection\":\"app.bsky.feed.like\",\"rkey\":\"like456\"}}" 167 + 168 + let event = goose.parse_event(json) 169 + 170 + case event { 171 + goose.CommitEvent(did, time_us, commit) -> { 172 + assert did == "did:plc:test456" 173 + assert time_us == 9876543210 174 + assert commit.rev == "xyz789" 175 + assert commit.operation == "delete" 176 + assert commit.collection == "app.bsky.feed.like" 177 + assert commit.rkey == "like456" 178 + } 179 + _ -> panic as "Expected CommitEvent" 180 + } 181 + } 182 + 183 + // Test parsing an identity event 184 + pub fn parse_identity_event_test() { 185 + let json = "{\"did\":\"did:plc:identity123\",\"time_us\":1111111111,\"identity\":{\"did\":\"did:plc:identity123\",\"handle\":\"alice.bsky.social\",\"seq\":42,\"time\":\"2024-01-01T00:00:00Z\"}}" 186 + 187 + let event = goose.parse_event(json) 188 + 189 + case event { 190 + goose.IdentityEvent(did, time_us, identity) -> { 191 + assert did == "did:plc:identity123" 192 + assert time_us == 1111111111 193 + assert identity.did == "did:plc:identity123" 194 + assert identity.handle == "alice.bsky.social" 195 + assert identity.seq == 42 196 + assert identity.time == "2024-01-01T00:00:00Z" 197 + } 198 + _ -> panic as "Expected IdentityEvent" 199 + } 200 + } 201 + 202 + // Test parsing an account event 203 + pub fn parse_account_event_test() { 204 + let json = "{\"did\":\"did:plc:account789\",\"time_us\":2222222222,\"account\":{\"active\":true,\"did\":\"did:plc:account789\",\"seq\":99,\"time\":\"2024-01-02T00:00:00Z\"}}" 205 + 206 + let event = goose.parse_event(json) 207 + 208 + case event { 209 + goose.AccountEvent(did, time_us, account) -> { 210 + assert did == "did:plc:account789" 211 + assert time_us == 2222222222 212 + assert account.active == True 213 + assert account.did == "did:plc:account789" 214 + assert account.seq == 99 215 + assert account.time == "2024-01-02T00:00:00Z" 216 + } 217 + _ -> panic as "Expected AccountEvent" 218 + } 219 + } 220 + 221 + // Test parsing unknown/invalid JSON 222 + pub fn parse_unknown_event_test() { 223 + let json = "{\"unknown\":\"event\",\"type\":\"something\"}" 224 + 225 + let event = goose.parse_event(json) 226 + 227 + case event { 228 + goose.UnknownEvent(raw) -> { 229 + assert raw == json 230 + } 231 + _ -> panic as "Expected UnknownEvent" 232 + } 233 + }