Elixir ATProtocol ingestion and sync library.
at main 273 lines 9.0 kB view raw
defmodule Drinkup.Jetstream do
  @moduledoc """
  Module for handling events from an AT Protocol
  [Jetstream](https://github.com/bluesky-social/jetstream) instance.

  Jetstream is an abstraction over the raw AT Protocol firehose that converts
  the CBOR-encoded events into easier to handle JSON objects, and also provides
  the ability to filter the events received by repository DID or collection
  NSID. This is useful when you know specifically which repos or collections you
  want events from, and thus reduces the amount of bandwidth consumed vs
  consuming the raw firehose directly.

  If you need a solution for easy backfilling from repositories and not just a
  firehose translation layer, check out `Drinkup.Tap`.

  ## Usage

      defmodule MyJetstreamConsumer do
        use Drinkup.Jetstream,
          name: :my_jetstream,
          wanted_collections: ["app.bsky.feed.post"]

        @impl true
        def handle_event(event) do
          IO.inspect(event)
        end
      end

      # In your application supervision tree:
      children = [MyJetstreamConsumer]

  ## Configuration

  See `Drinkup.Jetstream.Consumer` for all available configuration options.

  ## Dynamic Filter Updates

  You can update filters after the connection is established:

      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_collections: ["app.bsky.graph.follow"],
        wanted_dids: ["did:plc:abc123"]
      })

  ## Public Instances

  By default Drinkup connects to `jetstream2.us-east.bsky.network`.

  Bluesky operates a few different Jetstream instances:
  - `wss://jetstream1.us-east.bsky.network`
  - `wss://jetstream2.us-east.bsky.network`
  - `wss://jetstream1.us-west.bsky.network`
  - `wss://jetstream2.us-west.bsky.network`

  There are also some third-party instances not run by Bluesky PBC, including but not limited to:
  - `wss://jetstream.fire.hose.cam`
  - `wss://jetstream2.fr.hose.cam`
  - `wss://jetstream1.us-east.fire.hose.cam`

  https://firehose.stream/ also hosts several instances around the world.
  """

  require Logger

  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      use Supervisor
      @behaviour Drinkup.Jetstream.Consumer

      alias Drinkup.Jetstream.Options

      # Store compile-time options as module attributes
      @name Keyword.get(opts, :name)
      @host Keyword.get(opts, :host, "wss://jetstream2.us-east.bsky.network")
      @wanted_collections Keyword.get(opts, :wanted_collections, [])
      @wanted_dids Keyword.get(opts, :wanted_dids, [])
      @cursor Keyword.get(opts, :cursor)
      @require_hello Keyword.get(opts, :require_hello, false)
      @max_message_size_bytes Keyword.get(opts, :max_message_size_bytes)

      @doc """
      Starts the Jetstream consumer supervisor.

      Accepts optional runtime configuration that overrides compile-time options.
      """
      def start_link(runtime_opts \\ []) do
        opts = build_options(runtime_opts)
        Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
      end

      @impl true
      def init(%Options{name: name} = options) do
        # The task supervisor is registered under the consumer's name in
        # Drinkup.Registry; the socket child receives the full options struct.
        children = [
          {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}},
          {Drinkup.Jetstream.Socket, options}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      @doc """
      Returns a child spec for adding this consumer to a supervision tree.

      Runtime options override compile-time options. The spec's `:id` is the
      resolved consumer name, so several consumers with distinct names can live
      under the same parent supervisor.

      Raises `ArgumentError` if given anything other than a keyword list.
      """
      def child_spec(runtime_opts) when is_list(runtime_opts) do
        # Options are resolved here only to derive the id; start_link/1 rebuilds
        # them from the same runtime_opts when the child is actually started.
        opts = build_options(runtime_opts)

        %{
          id: opts.name,
          start: {__MODULE__, :start_link, [runtime_opts]},
          type: :supervisor,
          restart: :permanent,
          # This child is itself a supervisor: give its subtree unbounded time
          # to shut down in order instead of being killed after a short timeout.
          shutdown: :infinity
        }
      end

      def child_spec(_opts) do
        raise ArgumentError, "child_spec expects a keyword list of options"
      end

      defoverridable child_spec: 1

      # Build Options struct from compile-time and runtime options.
      # Runtime values win over compile-time ones; nil values are dropped
      # before the merged options are handed to Options.from/1.
      defp build_options(runtime_opts) do
        compile_opts = [
          name: @name || __MODULE__,
          host: @host,
          wanted_collections: @wanted_collections,
          wanted_dids: @wanted_dids,
          cursor: @cursor,
          require_hello: @require_hello,
          max_message_size_bytes: @max_message_size_bytes
        ]

        merged =
          compile_opts
          |> Keyword.merge(runtime_opts)
          |> Enum.reject(fn {_k, v} -> is_nil(v) end)
          |> Map.new()
          |> Map.put(:consumer, __MODULE__)

        Options.from(merged)
      end

      defp via_tuple(name) do
        {:via, Registry, {Drinkup.Registry, {name, JetstreamSupervisor}}}
      end
    end
  end

  # Options Update API

  @typedoc """
  Options that can be updated dynamically via `update_options/2`.

  - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
  - `:wanted_dids` - List of DIDs to filter (max 10,000)
  - `:max_message_size_bytes` - Maximum message size to receive

  Empty arrays will disable the corresponding filter (i.e., receive all).
  """
  @type update_opts :: %{
          optional(:wanted_collections) => [String.t()],
          optional(:wanted_dids) => [String.t()],
          optional(:max_message_size_bytes) => integer()
        }

  @doc """
  Update filters and options for an active Jetstream connection.

  Sends an options update message to the Jetstream server over the websocket
  connection. This allows you to dynamically change which collections and DIDs
  you're interested in without reconnecting.

  ## Parameters

  - `name` - The name of the Jetstream consumer (the `:name` option passed to `use Drinkup.Jetstream`)
  - `opts` - Map with optional fields:
    - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
    - `:wanted_dids` - List of DIDs to filter (max 10,000)
    - `:max_message_size_bytes` - Maximum message size to receive

  Keys that are absent (or `nil`) are left out of the update payload.

  ## Examples

      # Filter to only posts
      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_collections: ["app.bsky.feed.post"]
      })

      # Filter to specific DIDs
      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_dids: ["did:plc:abc123", "did:plc:def456"]
      })

      # Disable all filters (receive all events)
      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_collections: [],
        wanted_dids: []
      })

  ## Return Value

  Returns `:ok` once the update frame has been passed to `:gun.ws_send/3`, or
  `{:error, :not_connected}` if no connection is registered under `name`.

  Note: The server may reject invalid updates (e.g., too many collections/DIDs).
  Invalid updates will result in the connection being closed by the server.
  """
  @spec update_options(atom(), update_opts()) :: :ok | {:error, term()}
  def update_options(name, opts) when is_atom(name) and is_map(opts) do
    # A failed lookup falls straight through `with` and is returned unchanged.
    with {:ok, {conn, stream}} <- find_connection(name) do
      message = build_options_update_message(opts)
      :ok = :gun.ws_send(conn, stream, {:text, message})

      Logger.debug("[Drinkup.Jetstream] Sent options update")
      :ok
    end
  end

  # Resolves the gun connection pid and stream ref registered for `name`.
  @spec find_connection(atom()) :: {:ok, {pid(), :gun.stream_ref()}} | {:error, :not_connected}
  defp find_connection(name) do
    # Look up the connection details from Registry
    case Registry.lookup(Drinkup.Registry, {name, JetstreamConnection}) do
      [{_socket_pid, {conn, stream}}] ->
        {:ok, {conn, stream}}

      [] ->
        {:error, :not_connected}
    end
  end

  # Encodes an "options_update" message; the payload carries only the options
  # that were actually supplied.
  @spec build_options_update_message(update_opts()) :: String.t()
  defp build_options_update_message(opts) do
    payload =
      %{}
      |> maybe_add_wanted_collections(Map.get(opts, :wanted_collections))
      |> maybe_add_wanted_dids(Map.get(opts, :wanted_dids))
      |> maybe_add_max_message_size(Map.get(opts, :max_message_size_bytes))

    message = %{
      "type" => "options_update",
      "payload" => payload
    }

    Jason.encode!(message)
  end

  @spec maybe_add_wanted_collections(map(), [String.t()] | nil) :: map()
  defp maybe_add_wanted_collections(payload, nil), do: payload

  defp maybe_add_wanted_collections(payload, collections) when is_list(collections) do
    Map.put(payload, "wantedCollections", collections)
  end

  @spec maybe_add_wanted_dids(map(), [String.t()] | nil) :: map()
  defp maybe_add_wanted_dids(payload, nil), do: payload

  defp maybe_add_wanted_dids(payload, dids) when is_list(dids) do
    Map.put(payload, "wantedDids", dids)
  end

  @spec maybe_add_max_message_size(map(), integer() | nil) :: map()
  defp maybe_add_max_message_size(payload, nil), do: payload

  defp maybe_add_max_message_size(payload, max_size) when is_integer(max_size) do
    Map.put(payload, "maxMessageSizeBytes", max_size)
  end
end