Elixir ATProtocol ingestion and sync library.
1defmodule Drinkup.Tap do
2 @moduledoc """
3 Module for handling events from a
4 [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) instance.
5
6 Tap is a complete sync and backfill solution which handles the firehose
7 connection itself, and automatically searches for repositories to backfill
8 from based on the options given to it. It's great for building an app that
9 wants all of a certain set of records within the AT Protocol network.
10
11 This module requires you to be running a properly configured Tap instance, it
12 doesn't spawn one for itself.
13
14 ## Usage
15
16 defmodule MyTapConsumer do
17 use Drinkup.Tap,
18 name: :my_tap,
19 host: "http://localhost:2480",
20 admin_password: System.get_env("TAP_PASSWORD")
21
22 @impl true
23 def handle_event(event) do
24 # Process event
25 :ok
26 end
27 end
28
29 # In your application supervision tree:
30 children = [MyTapConsumer]
31
32 You can also interact with the Tap HTTP API to manually start tracking
33 specific repositories or get information about what's going on.
34
35 # Add repos to track (triggers backfill)
36 Drinkup.Tap.add_repos(:my_tap, ["did:plc:abc123"])
37
38 # Get stats
39 {:ok, count} = Drinkup.Tap.get_repo_count(:my_tap)
40 """
41
42 alias Drinkup.Tap.Options
43
44 defmacro __using__(opts) do
45 quote location: :keep, bind_quoted: [opts: opts] do
46 use Supervisor
47 @behaviour Drinkup.Tap.Consumer
48
49 alias Drinkup.Tap.Options
50
51 # Store compile-time options as module attributes
52 @name Keyword.get(opts, :name)
53 @host Keyword.get(opts, :host, "http://localhost:2480")
54 @admin_password Keyword.get(opts, :admin_password)
55 @disable_acks Keyword.get(opts, :disable_acks, false)
56
57 @doc """
58 Starts the Tap consumer supervisor.
59
60 Accepts optional runtime configuration that overrides compile-time options.
61 """
62 def start_link(runtime_opts \\ []) do
63 opts = build_options(runtime_opts)
64 Supervisor.start_link(__MODULE__, opts, name: via_tuple(opts.name))
65 end
66
67 @impl true
68 def init(%Options{name: name} = options) do
69 # Register options in Registry for HTTP API access
70 Registry.register(Drinkup.Registry, {name, TapOptions}, options)
71
72 children = [
73 {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}},
74 {Drinkup.Tap.Socket, options}
75 ]
76
77 Supervisor.init(children, strategy: :one_for_one)
78 end
79
80 @doc """
81 Returns a child spec for adding this consumer to a supervision tree.
82
83 Runtime options override compile-time options.
84 """
85 def child_spec(runtime_opts) when is_list(runtime_opts) do
86 opts = build_options(runtime_opts)
87
88 %{
89 id: opts.name,
90 start: {__MODULE__, :start_link, [runtime_opts]},
91 type: :supervisor,
92 restart: :permanent,
93 shutdown: 500
94 }
95 end
96
97 def child_spec(_opts) do
98 raise ArgumentError, "child_spec expects a keyword list of options"
99 end
100
101 defoverridable child_spec: 1
102
103 # Build Options struct from compile-time and runtime options
104 defp build_options(runtime_opts) do
105 compile_opts = [
106 name: @name || __MODULE__,
107 host: @host,
108 admin_password: @admin_password,
109 disable_acks: @disable_acks
110 ]
111
112 merged =
113 compile_opts
114 |> Keyword.merge(runtime_opts)
115 |> Enum.reject(fn {_k, v} -> is_nil(v) end)
116 |> Map.new()
117 |> Map.put(:consumer, __MODULE__)
118
119 Options.from(merged)
120 end
121
122 defp via_tuple(name) do
123 {:via, Registry, {Drinkup.Registry, {name, TapSupervisor}}}
124 end
125 end
126 end
127
128 # HTTP API Functions
129
130 @doc """
131 Add DIDs to track.
132
133 Triggers backfill for the specified DIDs. Historical events will be fetched
134 from each repo's PDS, followed by live events from the firehose.
135
136 ## Parameters
137
138 - `name` - The name of the Tap consumer (the `:name` option passed to `use Drinkup.Tap`)
139 - `dids` - List of DID strings to add
140 """
141 @spec add_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
142 def add_repos(name, dids) when is_atom(name) and is_list(dids) do
143 with {:ok, options} <- get_options(name),
144 {:ok, response} <- make_request(options, :post, "/repos/add", %{dids: dids}) do
145 {:ok, response}
146 end
147 end
148
149 @doc """
150 Remove DIDs from tracking.
151
152 Stops syncing the specified repos and deletes tracked repo metadata. Does not
153 delete buffered events in the outbox.
154
155 ## Parameters
156
157 - `name` - The name of the Tap consumer (the `:name` option passed to `use Drinkup.Tap`)
158 - `dids` - List of DID strings to remove
159 """
160 @spec remove_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
161 def remove_repos(name, dids) when is_atom(name) and is_list(dids) do
162 with {:ok, options} <- get_options(name),
163 {:ok, response} <- make_request(options, :post, "/repos/remove", %{dids: dids}) do
164 {:ok, response}
165 end
166 end
167
168 @doc """
169 Resolve a DID to its DID document.
170
171 ## Parameters
172
173 - `name` - The name of the Tap consumer
174 - `did` - DID string to resolve
175 """
176 @spec resolve_did(atom(), String.t()) :: {:ok, term()} | {:error, term()}
177 def resolve_did(name, did) when is_atom(name) and is_binary(did) do
178 with {:ok, options} <- get_options(name),
179 {:ok, response} <- make_request(options, :get, "/resolve/#{did}") do
180 {:ok, response}
181 end
182 end
183
184 @doc """
185 Get info about a tracked repo.
186
187 Returns repo state, repo rev, record count, error info, and retry count.
188
189 ## Parameters
190
191 - `name` - The name of the Tap consumer
192 - `did` - DID string to get info for
193 """
194 @spec get_repo_info(atom(), String.t()) :: {:ok, term()} | {:error, term()}
195 def get_repo_info(name, did) when is_atom(name) and is_binary(did) do
196 with {:ok, options} <- get_options(name),
197 {:ok, response} <- make_request(options, :get, "/info/#{did}") do
198 {:ok, response}
199 end
200 end
201
202 @doc """
203 Get the total number of tracked repos.
204
205 ## Parameters
206
207 - `name` - The name of the Tap consumer
208 """
209 @spec get_repo_count(atom()) :: {:ok, integer()} | {:error, term()}
210 def get_repo_count(name) when is_atom(name) do
211 with {:ok, options} <- get_options(name),
212 {:ok, response} <- make_request(options, :get, "/stats/repo-count") do
213 {:ok, response}
214 end
215 end
216
217 @doc """
218 Get the total number of tracked records.
219
220 ## Parameters
221
222 - `name` - The name of the Tap consumer
223 """
224 @spec get_record_count(atom()) :: {:ok, integer()} | {:error, term()}
225 def get_record_count(name) when is_atom(name) do
226 with {:ok, options} <- get_options(name),
227 {:ok, response} <- make_request(options, :get, "/stats/record-count") do
228 {:ok, response}
229 end
230 end
231
232 @doc """
233 Get the number of events in the outbox buffer.
234
235 ## Parameters
236
237 - `name` - The name of the Tap consumer
238 """
239 @spec get_outbox_buffer(atom()) :: {:ok, integer()} | {:error, term()}
240 def get_outbox_buffer(name) when is_atom(name) do
241 with {:ok, options} <- get_options(name),
242 {:ok, response} <- make_request(options, :get, "/stats/outbox-buffer") do
243 {:ok, response}
244 end
245 end
246
247 @doc """
248 Get the number of events in the resync buffer.
249
250 ## Parameters
251
252 - `name` - The name of the Tap consumer
253 """
254 @spec get_resync_buffer(atom()) :: {:ok, integer()} | {:error, term()}
255 def get_resync_buffer(name) when is_atom(name) do
256 with {:ok, options} <- get_options(name),
257 {:ok, response} <- make_request(options, :get, "/stats/resync-buffer") do
258 {:ok, response}
259 end
260 end
261
262 @doc """
263 Get current firehose and list repos cursors.
264
265 ## Parameters
266
267 - `name` - The name of the Tap consumer
268 """
269 @spec get_cursors(atom()) :: {:ok, map()} | {:error, term()}
270 def get_cursors(name) when is_atom(name) do
271 with {:ok, options} <- get_options(name),
272 {:ok, response} <- make_request(options, :get, "/stats/cursors") do
273 {:ok, response}
274 end
275 end
276
277 @doc """
278 Check Tap health status.
279
280 Returns `{:ok, %{"status" => "ok"}}` if healthy.
281
282 ## Parameters
283
284 - `name` - The name of the Tap consumer
285 """
286 @spec health(atom()) :: {:ok, map()} | {:error, term()}
287 def health(name) when is_atom(name) do
288 with {:ok, options} <- get_options(name),
289 {:ok, response} <- make_request(options, :get, "/health") do
290 {:ok, response}
291 end
292 end
293
294 @spec get_options(atom()) :: {:ok, Options.t()} | {:error, :not_found}
295 defp get_options(name) do
296 case Registry.lookup(Drinkup.Registry, {name, TapOptions}) do
297 [{_pid, options}] -> {:ok, options}
298 [] -> {:error, :not_found}
299 end
300 end
301
302 @spec make_request(Options.t(), atom(), String.t(), map() | nil) ::
303 {:ok, term()} | {:error, term()}
304 defp make_request(options, method, path, body \\ nil) do
305 url = build_url(options.host, path)
306 headers = build_headers(options.admin_password)
307
308 request_opts = [
309 method: method,
310 url: url,
311 headers: headers
312 ]
313
314 request_opts =
315 if body do
316 Keyword.merge(request_opts, json: body)
317 else
318 request_opts
319 end
320
321 case Req.request(request_opts) do
322 {:ok, %{status: status, body: body}} when status in 200..299 ->
323 {:ok, body}
324
325 {:ok, %{status: status, body: body}} ->
326 {:error, {:http_error, status, body}}
327
328 {:error, reason} ->
329 {:error, reason}
330 end
331 end
332
333 @spec build_url(String.t(), String.t()) :: String.t()
334 defp build_url(host, path) do
335 host = String.trim_trailing(host, "/")
336 "#{host}#{path}"
337 end
338
339 @spec build_headers(String.t() | nil) :: list()
340 defp build_headers(nil), do: []
341
342 defp build_headers(admin_password) do
343 credentials = "admin:#{admin_password}"
344 auth_header = "Basic #{Base.encode64(credentials)}"
345 [{"authorization", auth_header}]
346 end
347end