# Elixir ATProtocol ingestion and sync library.
# at main 347 lines 10 kB view raw
defmodule Drinkup.Tap do
  @moduledoc """
  Module for handling events from a
  [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) instance.

  Tap is a complete sync and backfill solution which handles the firehose
  connection itself, and automatically searches for repositories to backfill
  from based on the options given to it. It's great for building an app that
  wants all of a certain set of records within the AT Protocol network.

  This module requires you to be running a properly configured Tap instance;
  it doesn't spawn one for itself.

  ## Usage

      defmodule MyTapConsumer do
        use Drinkup.Tap,
          name: :my_tap,
          host: "http://localhost:2480"

        @impl true
        def handle_event(event) do
          # Process event
          :ok
        end
      end

      # In your application supervision tree:
      children = [
        {MyTapConsumer, admin_password: System.get_env("TAP_PASSWORD")}
      ]

  Options given to `use Drinkup.Tap` are stored in module attributes when the
  consumer module is compiled, so a call such as `System.get_env/1` placed
  there is evaluated at build time. Pass secrets and other
  environment-dependent values as runtime options instead (as above); runtime
  options override the compile-time ones.

  Supported options: `:name` (defaults to the consumer module), `:host`
  (defaults to `"http://localhost:2480"`), `:admin_password` and
  `:disable_acks` (defaults to `false`).

  You can also interact with the Tap HTTP API to manually start tracking
  specific repositories or get information about what's going on.

      # Add repos to track (triggers backfill)
      Drinkup.Tap.add_repos(:my_tap, ["did:plc:abc123"])

      # Get stats
      {:ok, count} = Drinkup.Tap.get_repo_count(:my_tap)

  ## Return values of the HTTP API functions

  Every HTTP API function in this module returns one of:

    * `{:ok, body}` - the response body of a 2xx response
    * `{:error, :not_found}` - no consumer options are registered under `name`
    * `{:error, {:http_error, status, body}}` - the server answered with a
      non-2xx status
    * `{:error, reason}` - the request itself failed
  """

  alias Drinkup.Tap.Options

  @doc """
  Turns the calling module into a Tap consumer supervisor.

  Injects `start_link/1`, `init/1` and an overridable `child_spec/1`, and
  declares the `Drinkup.Tap.Consumer` behaviour on the calling module.
  """
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      use Supervisor
      @behaviour Drinkup.Tap.Consumer

      alias Drinkup.Tap.Options

      # Compile-time options are frozen into module attributes here; anything
      # passed to start_link/1 or child_spec/1 is merged over them at runtime.
      @name Keyword.get(opts, :name)
      @host Keyword.get(opts, :host, "http://localhost:2480")
      @admin_password Keyword.get(opts, :admin_password)
      @disable_acks Keyword.get(opts, :disable_acks, false)

      @doc """
      Starts the Tap consumer supervisor.

      Accepts optional runtime configuration that overrides compile-time options.
      The supervisor is registered in `Drinkup.Registry` under the consumer name.
      """
      def start_link(runtime_opts \\ []) do
        options = build_options(runtime_opts)
        Supervisor.start_link(__MODULE__, options, name: via_tuple(options.name))
      end

      @impl true
      def init(%Options{name: name} = options) do
        # Publish the resolved options under {name, TapOptions} so the HTTP API
        # functions in Drinkup.Tap can look them up by consumer name.
        Registry.register(Drinkup.Registry, {name, TapOptions}, options)

        task_supervisor_name = {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}

        children = [
          {Task.Supervisor, name: task_supervisor_name},
          {Drinkup.Tap.Socket, options}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      @doc """
      Returns a child spec for adding this consumer to a supervision tree.

      Runtime options override compile-time options. Raises `ArgumentError`
      when given anything other than a list.
      """
      def child_spec(runtime_opts) when is_list(runtime_opts) do
        options = build_options(runtime_opts)

        %{
          id: options.name,
          start: {__MODULE__, :start_link, [runtime_opts]},
          type: :supervisor,
          restart: :permanent,
          # This child is itself a supervisor: give it unlimited time so its
          # own children can terminate in order instead of being killed after
          # a short timeout.
          shutdown: :infinity
        }
      end

      def child_spec(_opts) do
        raise ArgumentError, "child_spec expects a keyword list of options"
      end

      defoverridable child_spec: 1

      # Merges runtime options over the compile-time ones, drops nil values,
      # tags the result with the consumer module and builds the Options struct.
      defp build_options(runtime_opts) do
        compile_opts = [
          name: @name || __MODULE__,
          host: @host,
          admin_password: @admin_password,
          disable_acks: @disable_acks
        ]

        compile_opts
        |> Keyword.merge(runtime_opts)
        |> Enum.reject(fn {_key, value} -> is_nil(value) end)
        |> Map.new()
        |> Map.put(:consumer, __MODULE__)
        |> Options.from()
      end

      # Registry name under which the consumer supervisor itself is registered.
      defp via_tuple(name) do
        {:via, Registry, {Drinkup.Registry, {name, TapSupervisor}}}
      end
    end
  end

  # HTTP API Functions

  @doc """
  Add DIDs to track.

  Triggers backfill for the specified DIDs. Historical events will be fetched
  from each repo's PDS, followed by live events from the firehose.

  Sends `POST /repos/add` with the DIDs as a JSON body.

  ## Parameters

  - `name` - The name of the Tap consumer (the `:name` option passed to `use Drinkup.Tap`)
  - `dids` - List of DID strings to add
  """
  @spec add_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
  def add_repos(name, dids) when is_atom(name) and is_list(dids) do
    request(name, :post, "/repos/add", %{dids: dids})
  end

  @doc """
  Remove DIDs from tracking.

  Stops syncing the specified repos and deletes tracked repo metadata. Does not
  delete buffered events in the outbox.

  Sends `POST /repos/remove` with the DIDs as a JSON body.

  ## Parameters

  - `name` - The name of the Tap consumer (the `:name` option passed to `use Drinkup.Tap`)
  - `dids` - List of DID strings to remove
  """
  @spec remove_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
  def remove_repos(name, dids) when is_atom(name) and is_list(dids) do
    request(name, :post, "/repos/remove", %{dids: dids})
  end

  @doc """
  Resolve a DID to its DID document.

  Sends `GET /resolve/<did>`. The DID is placed in the path as given.

  ## Parameters

  - `name` - The name of the Tap consumer
  - `did` - DID string to resolve
  """
  @spec resolve_did(atom(), String.t()) :: {:ok, term()} | {:error, term()}
  def resolve_did(name, did) when is_atom(name) and is_binary(did) do
    request(name, :get, "/resolve/#{did}")
  end

  @doc """
  Get info about a tracked repo.

  Returns repo state, repo rev, record count, error info, and retry count.

  Sends `GET /info/<did>`. The DID is placed in the path as given.

  ## Parameters

  - `name` - The name of the Tap consumer
  - `did` - DID string to get info for
  """
  @spec get_repo_info(atom(), String.t()) :: {:ok, term()} | {:error, term()}
  def get_repo_info(name, did) when is_atom(name) and is_binary(did) do
    request(name, :get, "/info/#{did}")
  end

  @doc """
  Get the total number of tracked repos.

  Sends `GET /stats/repo-count`.

  ## Parameters

  - `name` - The name of the Tap consumer
  """
  @spec get_repo_count(atom()) :: {:ok, integer()} | {:error, term()}
  def get_repo_count(name) when is_atom(name) do
    request(name, :get, "/stats/repo-count")
  end

  @doc """
  Get the total number of tracked records.

  Sends `GET /stats/record-count`.

  ## Parameters

  - `name` - The name of the Tap consumer
  """
  @spec get_record_count(atom()) :: {:ok, integer()} | {:error, term()}
  def get_record_count(name) when is_atom(name) do
    request(name, :get, "/stats/record-count")
  end

  @doc """
  Get the number of events in the outbox buffer.

  Sends `GET /stats/outbox-buffer`.

  ## Parameters

  - `name` - The name of the Tap consumer
  """
  @spec get_outbox_buffer(atom()) :: {:ok, integer()} | {:error, term()}
  def get_outbox_buffer(name) when is_atom(name) do
    request(name, :get, "/stats/outbox-buffer")
  end

  @doc """
  Get the number of events in the resync buffer.

  Sends `GET /stats/resync-buffer`.

  ## Parameters

  - `name` - The name of the Tap consumer
  """
  @spec get_resync_buffer(atom()) :: {:ok, integer()} | {:error, term()}
  def get_resync_buffer(name) when is_atom(name) do
    request(name, :get, "/stats/resync-buffer")
  end

  @doc """
  Get current firehose and list repos cursors.

  Sends `GET /stats/cursors`.

  ## Parameters

  - `name` - The name of the Tap consumer
  """
  @spec get_cursors(atom()) :: {:ok, map()} | {:error, term()}
  def get_cursors(name) when is_atom(name) do
    request(name, :get, "/stats/cursors")
  end

  @doc """
  Check Tap health status.

  Returns `{:ok, %{"status" => "ok"}}` if healthy.

  Sends `GET /health`.

  ## Parameters

  - `name` - The name of the Tap consumer
  """
  @spec health(atom()) :: {:ok, map()} | {:error, term()}
  def health(name) when is_atom(name) do
    request(name, :get, "/health")
  end

  # Shared entry point for every HTTP API function: resolves the options
  # registered for `name`, then performs the request. A failed lookup is
  # returned unchanged as {:error, :not_found}.
  @spec request(atom(), atom(), String.t(), map() | nil) :: {:ok, term()} | {:error, term()}
  defp request(name, method, path, body \\ nil) do
    with {:ok, options} <- get_options(name) do
      make_request(options, method, path, body)
    end
  end

  # Looks up the options a running consumer registered under {name, TapOptions}.
  @spec get_options(atom()) :: {:ok, Options.t()} | {:error, :not_found}
  defp get_options(name) do
    case Registry.lookup(Drinkup.Registry, {name, TapOptions}) do
      [{_pid, options}] -> {:ok, options}
      [] -> {:error, :not_found}
    end
  end

  # Performs one request against the Tap HTTP API and normalizes the outcome:
  # 2xx -> {:ok, body}; any other status -> {:error, {:http_error, status, body}};
  # request failure -> {:error, reason}. A non-nil `body` is sent as JSON.
  @spec make_request(Options.t(), atom(), String.t(), map() | nil) ::
          {:ok, term()} | {:error, term()}
  defp make_request(options, method, path, body) do
    base_opts = [
      method: method,
      url: build_url(options.host, path),
      headers: build_headers(options.admin_password)
    ]

    request_opts = if body, do: Keyword.merge(base_opts, json: body), else: base_opts

    case Req.request(request_opts) do
      {:ok, %{status: status, body: response_body}} when status in 200..299 ->
        {:ok, response_body}

      {:ok, %{status: status, body: response_body}} ->
        {:error, {:http_error, status, response_body}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Joins host and path, tolerating trailing slashes on the configured host.
  @spec build_url(String.t(), String.t()) :: String.t()
  defp build_url(host, path) do
    String.trim_trailing(host, "/") <> path
  end

  # Builds HTTP Basic credentials for the fixed "admin" user; no password
  # configured means no authorization header is sent.
  @spec build_headers(String.t() | nil) :: list()
  defp build_headers(nil), do: []

  defp build_headers(admin_password) do
    encoded = Base.encode64("admin:#{admin_password}")
    [{"authorization", "Basic #{encoded}"}]
  end
end