# Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Jetstream do
  @moduledoc """
  Module for handling events from an AT Protocol
  [Jetstream](https://github.com/bluesky-social/jetstream) instance.

  Jetstream is an abstraction over the raw AT Protocol firehose that converts
  the CBOR-encoded events into easier to handle JSON objects, and also provides
  the ability to filter the events received by repository DID or collection
  NSID. This is useful when you know specifically which repos or collections you
  want events from, and thus reduces the amount of bandwidth consumed vs
  consuming the raw firehose directly.

  If you need a solution for easy backfilling from repositories and not just a
  firehose translation layer, check out `Drinkup.Tap`.

  ## Usage

      defmodule MyJetstreamConsumer do
        use Drinkup.Jetstream,
          name: :my_jetstream,
          wanted_collections: ["app.bsky.feed.post"]

        @impl true
        def handle_event(event) do
          IO.inspect(event)
        end
      end

      # In your application supervision tree:
      children = [MyJetstreamConsumer]

  ## Configuration

  See `Drinkup.Jetstream.Consumer` for all available configuration options.

  ## Dynamic Filter Updates

  You can update filters after the connection is established:

      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_collections: ["app.bsky.graph.follow"],
        wanted_dids: ["did:plc:abc123"]
      })

  ## Public Instances

  By default Drinkup connects to `jetstream2.us-east.bsky.network`.

  Bluesky operates a few different Jetstream instances:
  - `wss://jetstream1.us-east.bsky.network`
  - `wss://jetstream2.us-east.bsky.network`
  - `wss://jetstream1.us-west.bsky.network`
  - `wss://jetstream2.us-west.bsky.network`

  There are also some third-party instances not run by Bluesky PBC, including but not limited to:
  - `wss://jetstream.fire.hose.cam`
  - `wss://jetstream2.fr.hose.cam`
  - `wss://jetstream1.us-east.fire.hose.cam`

  https://firehose.stream/ also hosts several instances around the world.
  """

  require Logger

  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      use Supervisor
      @behaviour Drinkup.Jetstream.Consumer

      alias Drinkup.Jetstream.Options

      # Store compile-time options as module attributes. Any of them can be
      # overridden by the runtime options given to `start_link/1`.
      @name Keyword.get(opts, :name)
      @host Keyword.get(opts, :host, "wss://jetstream2.us-east.bsky.network")
      @wanted_collections Keyword.get(opts, :wanted_collections, [])
      @wanted_dids Keyword.get(opts, :wanted_dids, [])
      @cursor Keyword.get(opts, :cursor)
      @require_hello Keyword.get(opts, :require_hello, false)
      @max_message_size_bytes Keyword.get(opts, :max_message_size_bytes)

      @doc """
      Starts the Jetstream consumer supervisor.

      Accepts optional runtime configuration that overrides compile-time options.
      The supervisor is registered in `Drinkup.Registry` under the consumer name.
      """
      def start_link(runtime_opts \\ []) do
        opts = build_options(runtime_opts)
        Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
      end

      @impl true
      def init(%Options{name: name} = options) do
        # The task supervisor and the socket are started side by side, each
        # restarted independently of the other (`:one_for_one`).
        children = [
          {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}},
          {Drinkup.Jetstream.Socket, options}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      @doc """
      Returns a child spec for adding this consumer to a supervision tree.

      Runtime options override compile-time options. Raises `ArgumentError`
      when given anything other than a keyword list.
      """
      def child_spec(runtime_opts) when is_list(runtime_opts) do
        # Options are built here only to resolve the final name used as the
        # child id; `start_link/1` rebuilds them from the same runtime options.
        opts = build_options(runtime_opts)

        %{
          id: opts.name,
          start: {__MODULE__, :start_link, [runtime_opts]},
          type: :supervisor,
          restart: :permanent,
          # This child is itself a supervisor, so it must be given unlimited
          # time to shut down its own subtree; a finite timeout lets the parent
          # kill it before its children have been terminated.
          shutdown: :infinity
        }
      end

      def child_spec(_opts) do
        raise ArgumentError, "child_spec expects a keyword list of options"
      end

      defoverridable child_spec: 1

      # Builds the Options struct from compile-time and runtime options.
      # Runtime values win over compile-time ones; entries whose final value is
      # `nil` are dropped before the struct is built, and the using module is
      # recorded under `:consumer`.
      defp build_options(runtime_opts) do
        compile_opts = [
          name: @name || __MODULE__,
          host: @host,
          wanted_collections: @wanted_collections,
          wanted_dids: @wanted_dids,
          cursor: @cursor,
          require_hello: @require_hello,
          max_message_size_bytes: @max_message_size_bytes
        ]

        merged =
          compile_opts
          |> Keyword.merge(runtime_opts)
          |> Enum.reject(fn {_k, v} -> is_nil(v) end)
          |> Map.new()
          |> Map.put(:consumer, __MODULE__)

        Options.from(merged)
      end

      # Registry name under which the consumer supervisor is registered.
      defp via_tuple(name) do
        {:via, Registry, {Drinkup.Registry, {name, JetstreamSupervisor}}}
      end
    end
  end

  # Options Update API

  @typedoc """
  Options that can be updated dynamically via `update_options/2`.

  - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
  - `:wanted_dids` - List of DIDs to filter (max 10,000)
  - `:max_message_size_bytes` - Maximum message size to receive

  Empty arrays will disable the corresponding filter (i.e., receive all).
  """
  @type update_opts :: %{
          optional(:wanted_collections) => [String.t()],
          optional(:wanted_dids) => [String.t()],
          optional(:max_message_size_bytes) => integer()
        }

  @doc """
  Update filters and options for an active Jetstream connection.

  Sends an options update message to the Jetstream server over the websocket
  connection. This allows you to dynamically change which collections and DIDs
  you're interested in without reconnecting.

  ## Parameters

  - `name` - The name of the Jetstream consumer (the `:name` option passed to `use Drinkup.Jetstream`)
  - `opts` - Map with optional fields:
    - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
    - `:wanted_dids` - List of DIDs to filter (max 10,000)
    - `:max_message_size_bytes` - Maximum message size to receive

  ## Examples

      # Filter to only posts
      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_collections: ["app.bsky.feed.post"]
      })

      # Filter to specific DIDs
      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_dids: ["did:plc:abc123", "did:plc:def456"]
      })

      # Disable all filters (receive all events)
      Drinkup.Jetstream.update_options(:my_jetstream, %{
        wanted_collections: [],
        wanted_dids: []
      })

  ## Return Value

  Returns `:ok` once the message has been handed to the websocket connection,
  or `{:error, :not_connected}` if no connection is registered for `name`.

  Note: The server may reject invalid updates (e.g., too many collections/DIDs).
  Invalid updates will result in the connection being closed by the server.
  """
  @spec update_options(atom(), update_opts()) :: :ok | {:error, term()}
  def update_options(name, opts) when is_atom(name) and is_map(opts) do
    # A failed lookup falls straight through `with` and is returned unchanged.
    with {:ok, {conn, stream}} <- find_connection(name) do
      message = build_options_update_message(opts)
      :ok = :gun.ws_send(conn, stream, {:text, message})

      Logger.debug("[Drinkup.Jetstream] Sent options update")
      :ok
    end
  end

  # Looks up the `{conn, stream}` pair stored in `Drinkup.Registry` under
  # `{name, JetstreamConnection}`.
  @spec find_connection(atom()) :: {:ok, {pid(), :gun.stream_ref()}} | {:error, :not_connected}
  defp find_connection(name) do
    case Registry.lookup(Drinkup.Registry, {name, JetstreamConnection}) do
      [{_socket_pid, {conn, stream}}] ->
        {:ok, {conn, stream}}

      [] ->
        {:error, :not_connected}
    end
  end

  # Encodes the `options_update` message as JSON. Only keys present in `opts`
  # (with a non-nil value) are included in the payload.
  # NOTE(review): keys absent from `opts` are omitted from the payload entirely;
  # confirm how the server treats omitted fields (kept as-is vs. reset).
  @spec build_options_update_message(update_opts()) :: String.t()
  defp build_options_update_message(opts) do
    payload =
      %{}
      |> maybe_add_wanted_collections(Map.get(opts, :wanted_collections))
      |> maybe_add_wanted_dids(Map.get(opts, :wanted_dids))
      |> maybe_add_max_message_size(Map.get(opts, :max_message_size_bytes))

    Jason.encode!(%{"type" => "options_update", "payload" => payload})
  end

  # Adds "wantedCollections" to the payload unless the value is nil.
  @spec maybe_add_wanted_collections(map(), [String.t()] | nil) :: map()
  defp maybe_add_wanted_collections(payload, nil), do: payload

  defp maybe_add_wanted_collections(payload, collections) when is_list(collections) do
    Map.put(payload, "wantedCollections", collections)
  end

  # Adds "wantedDids" to the payload unless the value is nil.
  @spec maybe_add_wanted_dids(map(), [String.t()] | nil) :: map()
  defp maybe_add_wanted_dids(payload, nil), do: payload

  defp maybe_add_wanted_dids(payload, dids) when is_list(dids) do
    Map.put(payload, "wantedDids", dids)
  end

  # Adds "maxMessageSizeBytes" to the payload unless the value is nil.
  @spec maybe_add_max_message_size(map(), integer() | nil) :: map()
  defp maybe_add_max_message_size(payload, nil), do: payload

  defp maybe_add_max_message_size(payload, max_size) when is_integer(max_size) do
    Map.put(payload, "maxMessageSizeBytes", max_size)
  end
end