Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Jetstream.Options do
  @moduledoc """
  Configuration options for a Jetstream event stream connection.

  Jetstream is a simplified JSON event stream that converts the CBOR-encoded
  ATProto Firehose into lightweight, friendly JSON. It provides zstd compression
  and filtering capabilities for collections and DIDs.

  ## Options

  - `:consumer` (required) - Module implementing the `Drinkup.Jetstream.Consumer` behaviour
  - `:name` - Unique name for this Jetstream instance in the supervision tree (default: `Drinkup.Jetstream`)
  - `:host` - Jetstream service URL (default: `"wss://jetstream2.us-east.bsky.network"`)
  - `:wanted_collections` - List of collection NSIDs or prefixes to filter (default: `[]` = all collections)
  - `:wanted_dids` - List of DIDs to filter (default: `[]` = all repos)
  - `:cursor` - Unix timestamp in microseconds to resume from (default: `nil` = live-tail)
  - `:require_hello` - Pause replay/live-tail until the first options update is sent (default: `false`)
  - `:max_message_size_bytes` - Maximum message size to receive (default: `nil` = no limit)

  ## Example

      %{
        consumer: MyJetstreamConsumer,
        name: MyJetstream,
        host: "wss://jetstream2.us-east.bsky.network",
        wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
        wanted_dids: ["did:plc:abc123"],
        cursor: 1725519626134432
      }

  ## Collection Filters

  The `wanted_collections` option supports:
  - Full NSIDs: `"app.bsky.feed.post"`
  - NSID prefixes: `"app.bsky.graph.*"`, `"app.bsky.*"`

  You can specify up to 100 collection filters.

  ## DID Filters

  The `wanted_dids` option accepts a list of DID strings.
  You can specify up to 10,000 DIDs.

  ## Compression

  Jetstream always uses zstd compression with a custom dictionary.
  This is handled automatically by the socket implementation.
  """

  use TypedStruct

  @default_host "wss://jetstream2.us-east.bsky.network"

  @typedoc """
  Map of configuration options accepted by `Drinkup.Jetstream.child_spec/1`.
  """
  @type options() :: %{
          required(:consumer) => consumer(),
          optional(:name) => name(),
          optional(:host) => host(),
          optional(:wanted_collections) => wanted_collections(),
          optional(:wanted_dids) => wanted_dids(),
          optional(:cursor) => cursor(),
          optional(:require_hello) => require_hello(),
          optional(:max_message_size_bytes) => max_message_size_bytes()
        }

  @typedoc """
  Module implementing the `Drinkup.Jetstream.Consumer` behaviour.
  """
  @type consumer() :: module()

  @typedoc """
  Unique identifier for this Jetstream instance in the supervision tree.

  Used for Registry lookups and naming child processes.
  """
  @type name() :: atom()

  @typedoc """
  WebSocket URL of the Jetstream service.

  Defaults to `"wss://jetstream2.us-east.bsky.network"`, which is a public Bluesky instance.
  """
  @type host() :: String.t()

  @typedoc """
  List of collection NSIDs or NSID prefixes to filter.

  Examples:
  - `["app.bsky.feed.post"]` - Only posts
  - `["app.bsky.graph.*"]` - All graph collections
  - `["app.bsky.*"]` - All Bluesky app collections

  You can specify up to 100 collection filters.
  Defaults to `[]` (all collections).
  """
  @type wanted_collections() :: [String.t()]

  @typedoc """
  List of DIDs to filter events by.

  You can specify up to 10,000 DIDs.
  Defaults to `[]` (all repos).
  """
  @type wanted_dids() :: [String.t()]

  @typedoc """
  Unix timestamp, in microseconds, to resume streaming from.

  When provided, Jetstream will replay events starting from this timestamp.
  Useful for resuming after a restart without missing events. The cursor is
  automatically tracked and updated as events are received.

  Defaults to `nil` (live-tail from current time).
  """
  @type cursor() :: pos_integer() | nil

  @typedoc """
  Whether to pause replay/live-tail until the first options update is sent.

  When `true`, the connection will wait for a `Drinkup.Jetstream.update_options/2`
  call before starting to receive events.

  Defaults to `false`.
  """
  @type require_hello() :: boolean()

  @typedoc """
  Maximum message size in bytes that the client would like to receive.

  Zero or `nil` means no limit. Negative values are treated as zero.
  Defaults to `nil` (no maximum size).
  """
  @type max_message_size_bytes() :: integer() | nil

  typedstruct do
    field :consumer, consumer(), enforce: true
    field :name, name(), default: Drinkup.Jetstream
    field :host, host(), default: @default_host
    # TODO: Add NSID prefix validation once available in atex
    field :wanted_collections, wanted_collections(), default: []
    field :wanted_dids, wanted_dids(), default: []
    field :cursor, cursor()
    field :require_hello, require_hello(), default: false
    field :max_message_size_bytes, max_message_size_bytes()
  end

  @spec from(options()) :: t()
  def from(%{consumer: _} = options), do: struct(__MODULE__, options)
end
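`from/1` relies on `Kernel.struct/2`, which fills every omitted key with its declared default and silently discards keys the struct does not define; in `from/1`, the `%{consumer: _}` pattern is the only required-key check. The following is a minimal sketch of that behaviour using a plain `defstruct`, so it runs without TypedStruct or Drinkup installed. `OptionsSketch` is a hypothetical stand-in, not the real `Drinkup.Jetstream.Options`.

```elixir
defmodule OptionsSketch do
  # Hypothetical stand-in for Drinkup.Jetstream.Options, using defstruct
  # instead of TypedStruct so the example has no dependencies.
  @default_host "wss://jetstream2.us-east.bsky.network"

  @enforce_keys [:consumer]
  defstruct consumer: nil,
            name: Drinkup.Jetstream,
            host: @default_host,
            wanted_collections: [],
            wanted_dids: [],
            cursor: nil,
            require_hello: false,
            max_message_size_bytes: nil

  # Same shape as Options.from/1: the pattern match rejects maps without
  # :consumer, and struct/2 fills defaults and drops unknown keys.
  # struct/2 does not check @enforce_keys; the pattern match does that job.
  def from(%{consumer: _} = options), do: struct(__MODULE__, options)
end

opts =
  OptionsSketch.from(%{
    consumer: MyJetstreamConsumer,
    wanted_dids: ["did:plc:abc123"],
    typo_key: 1
  })
```

A consequence of this design is that a misspelled key such as `wanted_collection:` is dropped without warning rather than rejected, so the struct quietly falls back to its default for that field.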
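Each filtering option corresponds to a query parameter that Jetstream's README documents for its `/subscribe` endpoint (`wantedCollections`, `wantedDids`, `cursor`, `requireHello`, `maxMessageSizeBytes`). The sketch below shows one plausible mapping, assuming list filters are sent as repeated query keys. `JetstreamUrlSketch.subscribe_url/1` is hypothetical; Drinkup's socket does not necessarily build its URL this way. Jetstream's `compress` parameter is left out because the moduledoc says compression is handled by the socket implementation.

```elixir
defmodule JetstreamUrlSketch do
  @default_host "wss://jetstream2.us-east.bsky.network"

  # Hypothetical helper: builds a Jetstream subscribe URL from an options map.
  # Query parameter names follow Jetstream's README; Drinkup may differ.
  def subscribe_url(opts) do
    host = Map.get(opts, :host, @default_host)

    # One repeated key per collection or DID filter.
    filters =
      Enum.map(Map.get(opts, :wanted_collections, []), &{"wantedCollections", &1}) ++
        Enum.map(Map.get(opts, :wanted_dids, []), &{"wantedDids", &1})

    hello = if Map.get(opts, :require_hello, false), do: [{"requireHello", "true"}], else: []

    params =
      filters ++
        optional("cursor", Map.get(opts, :cursor)) ++
        optional("maxMessageSizeBytes", Map.get(opts, :max_message_size_bytes)) ++
        hello

    # URI.encode_query/1 percent-encodes values, e.g. ":" in DIDs becomes %3A.
    case URI.encode_query(params) do
      "" -> host <> "/subscribe"
      query -> host <> "/subscribe?" <> query
    end
  end

  # nil values are omitted from the query string entirely.
  defp optional(_key, nil), do: []
  defp optional(key, value), do: [{key, value}]
end
```

Keys such as `:consumer` and `:name` are simply ignored here, since they configure the local process tree rather than the server subscription.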
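Collection filtering is applied by the Jetstream server, so consumers do not need to repeat it. To make the two filter forms concrete, here is a client-side sketch assuming that a filter ending in `.*` matches any NSID beginning with the text before the `*`, that any other filter must match exactly, and that an empty list matches everything. `CollectionFilterSketch.matches?/2` is hypothetical and not part of Drinkup.

```elixir
defmodule CollectionFilterSketch do
  # Hypothetical helper illustrating wanted_collections semantics:
  # [] matches everything, "a.b.*" matches by prefix, anything else is exact.
  def matches?([], _collection), do: true

  def matches?(filters, collection) do
    Enum.any?(filters, fn filter ->
      if String.ends_with?(filter, ".*") do
        # Keep the trailing dot so "app.bsky.graph.*" does not match
        # an unrelated NSID such as "app.bsky.graphx.foo".
        String.starts_with?(collection, String.trim_trailing(filter, "*"))
      else
        filter == collection
      end
    end)
  end
end
```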
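The moduledoc caps filters at 100 collections and 10,000 DIDs, but `from/1` does not check either limit; it passes the lists through unchanged. Code that builds filters dynamically (for example, from a follow list) might check the counts before starting the stream. `FilterLimitsSketch.validate/1` is a hypothetical helper, not a Drinkup function, and its error tuples are an arbitrary choice.

```elixir
defmodule FilterLimitsSketch do
  # Limits taken from the Drinkup.Jetstream.Options moduledoc.
  @max_collections 100
  @max_dids 10_000

  # Hypothetical pre-flight check; returns :ok or {:error, reason}.
  def validate(opts) do
    n_collections = length(Map.get(opts, :wanted_collections, []))
    n_dids = length(Map.get(opts, :wanted_dids, []))

    cond do
      n_collections > @max_collections -> {:error, {:too_many_collections, n_collections}}
      n_dids > @max_dids -> {:error, {:too_many_dids, n_dids}}
      true -> :ok
    end
  end
end
```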
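Because the cursor is a Unix timestamp in microseconds, a resume point can be computed from a `DateTime` with `DateTime.to_unix/2`. The sketch below shows that conversion and an optional rewind; `CursorSketch.from_datetime/2` is a hypothetical helper, and the five-second margin in the usage is an arbitrary illustrative choice. Rewinding means events inside that window may be delivered again, so a consumer using a margin should tolerate duplicates.

```elixir
defmodule CursorSketch do
  # Hypothetical helper: turns a DateTime into a Jetstream cursor
  # (Unix time in microseconds), optionally rewound by a safety margin.
  def from_datetime(%DateTime{} = dt, rewind_seconds \\ 0) do
    dt
    |> DateTime.add(-rewind_seconds, :second)
    |> DateTime.to_unix(:microsecond)
  end
end

# The moduledoc's example cursor, 1725519626134432, is this instant.
resume_at = ~U[2024-09-05 07:00:26.134432Z]

# Rewind 5 seconds before resuming.
cursor = CursorSketch.from_datetime(resume_at, 5)
```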
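With `require_hello: true`, Jetstream holds back events until the client sends its first options update over the socket. As we read Jetstream's README, that update is a JSON text frame whose `type` is `"options_update"` and whose `payload` carries `wantedCollections`, `wantedDids`, and `maxMessageSizeBytes`. The sketch below only builds that map from the Elixir-side option names; `OptionsUpdateSketch.message/1` is hypothetical, it is not what `Drinkup.Jetstream.update_options/2` is confirmed to send, and JSON encoding (for example with Jason) is omitted so the example needs no dependencies.

```elixir
defmodule OptionsUpdateSketch do
  # Hypothetical helper: builds an options_update message map using the
  # camelCase keys from Jetstream's README (assumed wire format).
  def message(opts) do
    %{
      "type" => "options_update",
      "payload" => %{
        "wantedCollections" => Map.get(opts, :wanted_collections, []),
        "wantedDids" => Map.get(opts, :wanted_dids, []),
        # nil maps to 0, which Jetstream treats as "no limit".
        "maxMessageSizeBytes" => Map.get(opts, :max_message_size_bytes) || 0
      }
    }
  end
end
```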
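The `max_message_size_bytes` typedoc folds three inputs into "no limit": `nil`, zero, and negative values. The limit itself is applied by the Jetstream server, but if client code needs to reason about the effective value, normalizing it once keeps that rule in one place. `MessageSizeSketch` is hypothetical and only mirrors the documented semantics, returning `:infinity` for the unlimited cases.

```elixir
defmodule MessageSizeSketch do
  # Hypothetical helper mirroring the documented semantics of
  # :max_message_size_bytes: nil, 0, and negative values mean "no limit".
  def effective_limit(nil), do: :infinity
  def effective_limit(n) when is_integer(n) and n <= 0, do: :infinity
  def effective_limit(n) when is_integer(n), do: n

  # True when a message of `size` bytes fits under the configured limit.
  def allowed?(size, limit) do
    case effective_limit(limit) do
      :infinity -> true
      max -> size <= max
    end
  end
end
```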