···11-# The directory Mix will write compiled artifacts to.
21/_build/
33-44-# If you run "mix test --cover", coverage assets end up here.
52/cover/
66-77-# The directory Mix downloads your dependencies sources to.
83/deps/
99-1010-# Where third-party dependencies like ExDoc output generated docs.
114/doc/
1212-1313-# If the VM crashes, it generates a dump, let's ignore it too.
145erl_crash.dump
1515-1616-# Also ignore archive artifacts (built via "mix archive.build").
176*.ez
1818-1919-# Ignore package tarball (built via "mix hex.build").
207drinkup-*.tar
2121-2222-# Temporary files, for example, from tests.
238/tmp/
2424-2525-# Nix
269.envrc
2710.direnv
2828-result1111+result
1212+priv/dets/
+40
AGENTS.md
···11+# Agent Guidelines for Drinkup
22+33+## Commands
44+55+- **Test**: `mix test` (all), `mix test test/path/to/file_test.exs` (single file), `mix test test/path/to/file_test.exs:42` (single test at line)
66+- **Format**: `mix format` (auto-formats all code)
77+- **Lint**: `mix credo` (static analysis), `mix credo --strict` (strict mode)
88+- **Compile**: `mix compile`
99+- **Docs**: `mix docs`
1010+- **Type Check**: `mix dialyzer` (if configured)
1111+1212+## Code Style
1313+1414+- **Imports**: Use `alias` for modules (e.g., `alias Drinkup.Firehose.{Event, Options}`), `require` for macros (e.g., `require Logger`)
1515+- **Formatting**: Elixir 1.18+, auto-formatted via `.formatter.exs` with `import_deps: [:typedstruct]`
1616+- **Naming**: snake_case for functions/variables, PascalCase for modules, `:lowercase_atoms` for atoms, `@behaviour` (not `@behavior`)
1717+- **Types**: Use `@type` and `@spec` for all functions; use TypedStruct for structs with `enforce: true` for required fields
1818+- **Moduledocs**: Public modules need `@moduledoc`, public functions need `@doc` with examples
1919+- **Error Handling**: Return `{:ok, result}` or `{:error, reason}` tuples; use `with` for chaining operations; log errors with `Logger.error("#{Exception.format(:error, e, __STACKTRACE__)}")`
2020+- **Pattern Matching**: Prefer pattern matching in function heads over conditionals; use guard clauses when appropriate
2121+- **OTP**: Use `child_spec/1` for custom supervisor specs; `:gen_statem` for state machines; `Task.Supervisor` for concurrent tasks; Registry for named lookups
2222+- **Tests**: Use ExUnit with `use ExUnit.Case`; use `doctest Module` for documentation examples
2323+- **Dependencies**: Core deps include gun (WebSocket), car (CAR format), cbor (encoding), TypedStruct (typed structs), Credo (linting)
2424+2525+## Project Structure
2626+2727+- **Namespace**: All firehose functionality under `Drinkup.Firehose.*`
2828+ - `Drinkup.Firehose` - Main supervisor
2929+ - `Drinkup.Firehose.Consumer` - Behaviour for handling all events
3030+ - `Drinkup.Firehose.RecordConsumer` - Macro for handling commit record events with filtering
3131+ - `Drinkup.Firehose.Event` - Event types (`Commit`, `Sync`, `Identity`, `Account`, `Info`)
3232+ - `Drinkup.Firehose.Socket` - `:gen_statem` WebSocket connection manager
3333+- **Consumer Pattern**: Implement `@behaviour Drinkup.Firehose.Consumer` with `handle_event/1`
3434+- **RecordConsumer Pattern**: `use Drinkup.Firehose.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]` with `handle_create/1`, `handle_update/1`, `handle_delete/1` overrides
3535+3636+## Important Notes
3737+3838+- **Update CHANGELOG.md** when adding features, changes, or fixes under `## [Unreleased]` with appropriate sections (`Added`, `Changed`, `Fixed`, `Deprecated`, `Removed`, `Security`)
3939+- **WebSocket States**: Socket uses `:disconnected` → `:connecting_http` → `:connecting_ws` → `:connected` flow
4040+- **Sequence Tracking**: Use `Event.valid_seq?/2` to validate sequence numbers from firehose
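The implementation of `Event.valid_seq?/2` is not part of this diff. The sketch below is one plausible reading of the sequence-tracking note, under three assumptions: a `nil` last sequence accepts any integer, a payload without a `seq` (such as an info event) is accepted, and otherwise the new value must be strictly greater. `SeqSketch` is a hypothetical module name, not part of Drinkup.

```elixir
defmodule SeqSketch do
  @moduledoc false

  # Hypothetical stand-in for Event.valid_seq?/2; the real rules may differ.
  @spec valid_seq?(integer() | nil, term()) :: boolean()
  # Assumption: payloads without a seq (e.g. info events) are always accepted.
  def valid_seq?(_last, nil), do: true
  # Assumption: with no previous seq, any integer is a valid starting point.
  def valid_seq?(nil, seq) when is_integer(seq), do: true
  # Otherwise the sequence must advance.
  def valid_seq?(last, seq) when is_integer(last) and is_integer(seq), do: seq > last
  # Anything else (non-integer seq) is rejected.
  def valid_seq?(_last, _seq), do: false
end
```

A strictly-greater check rejects replays and reordering but tolerates gaps, which matches the "out of sequence or invalid" wording logged by the socket.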
+22-2
CHANGELOG.md
···66and this project adheres to
77[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
8899+## [Unreleased]
1010+1111+### Breaking Changes
1212+1313+- Existing behaviour moved to `Drinkup.Firehose` namespace, to make way for
1414+ alternate sync systems.
1515+1616+### Added
1717+1818+- Support for the
1919+ [Tap](https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md)
2020+ sync and backfill utility service, via `Drinkup.Tap`.
2121+- Support for [Jetstream](https://github.com/bluesky-social/jetstream), a
2222+ simplified JSON event stream for ATProto, via `Drinkup.Jetstream`.
2323+2424+### Changed
2525+2626+- Refactor core connection logic for websockets into `Drinkup.Socket` to make it
2727+ easy to use across multiple different services.
2828+929## [0.1.0] - 2025-05-26
10301131Initial release.
12321313-[unreleased]: https://github.com/cometsh/elixir-car/compare/v0.1.0...HEAD
1414-[0.1.0]: https://github.com/cometsh/elixir-car/releases/tag/v0.1.0
3333+[unreleased]: https://github.com/cometsh/drinkup/compare/v0.1.0...HEAD
3434+[0.1.0]: https://github.com/cometsh/drinkup/releases/tag/v0.1.0
+60-58
README.md
···11# Drinkup
2233-An Elixir library for listening to events from an ATProtocol relay
44-(firehose/`com.atproto.sync.subscribeRepos`). Eventually aiming to support any
55-ATProtocol subscription.
33+An Elixir library for consuming various AT Protocol sync services.
6477-## TODO
55+Drinkup provides a unified interface for connecting to various AT Protocol data
66+streams, handling reconnection logic, sequence tracking, and event dispatch.
77+Choose the sync service that fits your needs:
8899-- Support for different subscriptions other than
1010- `com.atproto.sync.subscribeRepo'.
1111-- Validation (signatures, making sure to only track handle active accounts,
1212- etc.) (see
1313- [Firehose Validation Best Practices](https://atproto.com/specs/sync#firehose-validation-best-practices))
1414-- Look into backfilling? See if there's better ways to do it.
1515-- Built-in solutions for tracking resumption? (probably a pluggable solution to
1616- allow for different things like Mnesia, Postgres, etc.)
1717-- Testing of multi-node/distribution.
1818-- Tests
1919-- Documentation
99+- **Firehose** - Raw event stream from the full AT Protocol network.
1010+- **Jetstream** - Lightweight, cherry-picked event stream with filtering by
1111+ record collections and DIDs.
1212+- **Tap** - Managed backfill and indexing solution.
20132114## Installation
22152323-Add `drinkup` to your `mix.exs`.
1616+Add `drinkup` to your `mix.exs`:
24172518```elixir
2619def deps do
···32253326Documentation can be found on HexDocs at https://hexdocs.pm/drinkup.
34273535-## Example Usage
2828+## Quick Start
36293737-First, create a module implementing the `Drinkup.Consumer` behaviour (only
3838-requires a `handle_event/1` function):
3030+### Firehose
39314032```elixir
4141-defmodule ExampleConsumer do
4242- @behaviour Drinkup.Consumer
3333+defmodule MyApp.FirehoseConsumer do
3434+ @behaviour Drinkup.Firehose.Consumer
43354444- def handle_event(%Drinkup.Event.Commit{} = event) do
4545- IO.inspect(event, label: "Got commit event")
3636+ def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
3737+ IO.inspect(event, label: "Commit")
4638 end
47394840 def handle_event(_), do: :noop
4941end
4242+4343+# In your supervision tree:
4444+children = [{Drinkup.Firehose, %{consumer: MyApp.FirehoseConsumer}}]
5045```
51465252-Then add Drinkup and your consumer to your application's supervision tree:
4747+### Jetstream
53485449```elixir
5555-defmodule MyApp.Application do
5656- use Application
5050+defmodule MyApp.JetstreamConsumer do
5151+ @behaviour Drinkup.Jetstream.Consumer
57525858- def start(_type, _args) do
5959- children = [{Drinkup, %{consumer: ExampleConsumer}}]
6060- Supervisor.start_link(children, strategy: :one_for_one)
5353+ def handle_event(%Drinkup.Jetstream.Event.Commit{} = event) do
5454+ IO.inspect(event, label: "Commit")
6155 end
5656+5757+ def handle_event(_), do: :noop
6258end
6363-```
64596565-You should then be able to start your application and start seeing
6666-`Got commit event: ...` in the terminal.
6767-6868-### Record Consumer
6060+# In your supervision tree:
6161+children = [
6262+ {Drinkup.Jetstream, %{
6363+ consumer: MyApp.JetstreamConsumer,
6464+ wanted_collections: ["app.bsky.feed.post"]
6565+ }}
6666+]
6767+```
69687070-One of the main reasons for listening to an ATProto relay is to synchronise a
7171-database with records. As a result, Drinkup provides a light extension around a
7272-basic consumer, the `RecordConsumer`, which only listens to commit events, and
7373-transforms them into a slightly nicer structure to work around, calling your
7474-`handle_create/1`, `handle_update/1`, and `handle_delete/1` functions for each
7575-record it comes across. It also allows for filtering of specific types of
7676-records either by full name or with a
7777-[Regex](https://hexdocs.pm/elixir/1.18.4/Regex.html) match.
6969+### Tap
78707971```elixir
8080-defmodule ExampleRecordConsumer do
8181- # Will respond to any events either `app.bsky.feed.post` records, or anything under `app.bsky.graph`.
8282- use Drinkup.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]
8383- alias Drinkup.RecordConsumer.Record
7272+defmodule MyApp.TapConsumer do
7373+ @behaviour Drinkup.Tap.Consumer
84748585- def handle_create(%Record{type: "app.bsky.feed.post"} = record) do
8686- IO.inspect(record, label: "Bluesky post created")
7575+ def handle_event(%Drinkup.Tap.Event.Record{} = event) do
7676+ IO.inspect(event, label: "Record")
8777 end
88788989- def handle_create(%Record{type: "app.bsky.graph" <> _} = record) do
9090- IO.inspect(record, label: "Bluesky graph updated")
9191- end
7979+ def handle_event(_), do: :noop
8080+end
92819393- def handle_update(record) do
9494- # ...
9595- end
8282+# In your supervision tree:
8383+children = [
8484+ {Drinkup.Tap, %{
8585+ consumer: MyApp.TapConsumer,
8686+ host: "http://localhost:2480"
8787+ }}
8888+]
96899797- def handle_delete(record) do
9898- # ...
9999- end
100100-end
9090+# Track specific repos:
9191+Drinkup.Tap.add_repos(Drinkup.Tap, ["did:plc:abc123"])
10192```
9393+9494+See [the examples](./examples) for some more complete samples.
9595+9696+## TODO
9797+9898+- Validation for Firehose events (signatures, active account tracking); see
9999+ [Firehose Validation Best Practices](https://atproto.com/specs/sync#firehose-validation-best-practices)
100100+- Pluggable cursor persistence (Mnesia, Postgres, etc.)
101101+- Multi-node/distribution testing
102102+- More comprehensive test coverage
103103+- Additional documentation
102104103105## Special thanks
104106
···11-defmodule Drinkup.Consumer do
22- @moduledoc """
33- An unopinionated consumer of the Firehose. Will receive all events, not just commits.
44- """
55-66- alias Drinkup.Event
77-88- @callback handle_event(Event.t()) :: any()
99-end
···11+defmodule Drinkup.Firehose.Consumer do
22+ @moduledoc """
33+ An unopinionated consumer of the Firehose. Will receive all events, not just commits.
44+ """
55+66+ alias Drinkup.Firehose.Event
77+88+ @callback handle_event(Event.t()) :: any()
99+end
+53
lib/firehose/event/account.ex
···11+defmodule Drinkup.Firehose.Event.Account do
22+ @moduledoc """
33+ Struct for account events from the ATProto Firehose.
44+ """
55+66+ use TypedStruct
77+88+ @type status() ::
99+ :takendown
1010+ | :suspended
1111+ | :deleted
1212+ | :deactivated
1313+ | :desynchronized
1414+ | :throttled
1515+ | String.t()
1616+1717+ typedstruct enforce: true do
1818+ field :seq, integer()
1919+ field :did, String.t()
2020+ field :time, NaiveDateTime.t()
2121+ field :active, bool()
2222+ field :status, status(), enforce: false
2323+ end
2424+2525+ @spec from(map()) :: t()
2626+ def from(%{"seq" => seq, "did" => did, "time" => time, "active" => active} = msg) do
2727+ status = recognise_status(Map.get(msg, "status"))
2828+ time = NaiveDateTime.from_iso8601!(time)
2929+3030+ %__MODULE__{
3131+ seq: seq,
3232+ did: did,
3333+ time: time,
3434+ active: active,
3535+ status: status
3636+ }
3737+ end
3838+3939+ @spec recognise_status(String.t() | nil) :: status() | nil
4040+ defp recognise_status(status)
4141+ when status in [
4242+ "takendown",
4343+ "suspended",
4444+ "deleted",
4545+ "deactivated",
4646+ "desynchronized",
4747+ "throttled"
4848+ ],
4949+ do: String.to_atom(status)
5050+5151+ defp recognise_status(status) when is_binary(status), do: status
5252+ defp recognise_status(nil), do: nil
5353+end
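The status handling above only converts strings to atoms when they appear in a fixed allow-list, so `String.to_atom/1` cannot be driven by arbitrary network input. A dependency-free sketch of the same pattern follows. `StatusSketch` is a hypothetical module used only for illustration.

```elixir
defmodule StatusSketch do
  @moduledoc false

  # Known statuses, mirroring the list in the guard above.
  @known ~w(takendown suspended deleted deactivated desynchronized throttled)

  @spec recognise(String.t() | nil) :: atom() | String.t() | nil
  # Only allow-listed strings become atoms, so the atom table stays bounded.
  def recognise(status) when status in @known, do: String.to_atom(status)
  # Unknown statuses are passed through unchanged for forward compatibility.
  def recognise(status) when is_binary(status), do: status
  # A missing status stays nil.
  def recognise(nil), do: nil
end
```

Consumers that match on the atom forms should keep a catch-all clause, since an unrecognised status arrives as a plain string.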
+100
lib/firehose/event/commit.ex
···11+defmodule Drinkup.Firehose.Event.Commit do
22+ @moduledoc """
33+ Struct for commit events from the ATProto Firehose.
44+ """
55+66+ # TODO: see atp specs
77+ @type tid() :: String.t()
88+99+ alias __MODULE__.RepoOp
1010+ use TypedStruct
1111+1212+ typedstruct enforce: true do
1313+ field :seq, integer()
1414+ # DEPRECATED
1515+ field :rebase, bool()
1616+ # DEPRECATED
1717+ field :too_big, bool()
1818+ field :repo, String.t()
1919+ field :commit, binary()
2020+ field :rev, tid()
2121+ field :since, tid() | nil
2222+ field :blocks, CAR.Archive.t()
2323+ field :ops, list(RepoOp.t())
2424+ # DEPRECATED
2525+ field :blobs, list(binary())
2626+ field :prev_data, binary(), enforce: false
2727+ field :time, NaiveDateTime.t()
2828+ end
2929+3030+ @spec from(map()) :: t()
3131+ def from(
3232+ %{
3333+ "seq" => seq,
3434+ "rebase" => rebase,
3535+ "tooBig" => too_big,
3636+ "repo" => repo,
3737+ "commit" => commit,
3838+ "rev" => rev,
3939+ "since" => since,
4040+ "blocks" => %CBOR.Tag{value: blocks},
4141+ "ops" => ops,
4242+ "blobs" => blobs,
4343+ "time" => time
4444+ } = msg
4545+ ) do
4646+ prev_data =
4747+ Map.get(msg, "prevData")
4848+4949+ time = NaiveDateTime.from_iso8601!(time)
5050+ {:ok, blocks} = CAR.decode(blocks)
5151+5252+ %__MODULE__{
5353+ seq: seq,
5454+ rebase: rebase,
5555+ too_big: too_big,
5656+ repo: repo,
5757+ commit: commit,
5858+ rev: rev,
5959+ since: since,
6060+ blocks: blocks,
6161+ ops: Enum.map(ops, &RepoOp.from(&1, blocks)),
6262+ blobs: blobs,
6363+ prev_data: prev_data,
6464+ time: time
6565+ }
6666+ end
6767+6868+ defmodule RepoOp do
6969+ typedstruct enforce: true do
7070+ @type action() :: :create | :update | :delete | String.t()
7171+7272+ field :action, action()
7373+ field :path, String.t()
7474+ field :cid, binary()
7575+ field :prev, binary(), enforce: false
7676+ field :record, map() | nil
7777+ end
7878+7979+ @spec from(map(), CAR.Archive.t()) :: t()
8080+ def from(%{"action" => action, "path" => path, "cid" => cid} = op, %CAR.Archive{} = blocks) do
8181+ prev = Map.get(op, "prev")
8282+ record = CAR.Archive.get_block(blocks, cid)
8383+8484+ %__MODULE__{
8585+ action: recognise_action(action),
8686+ path: path,
8787+ cid: cid,
8888+ prev: prev,
8989+ record: record
9090+ }
9191+ end
9292+9393+ @spec recognise_action(String.t() | nil) :: action() | nil
9494+ defp recognise_action(action) when action in ["create", "update", "delete"],
9595+ do: String.to_atom(action)
9696+9797+ defp recognise_action(action) when is_binary(action), do: action
9898+ defp recognise_action(nil), do: nil
9999+ end
100100+end
+27
lib/firehose/event/identity.ex
···11+defmodule Drinkup.Firehose.Event.Identity do
22+ @moduledoc """
33+ Struct for identity events from the ATProto Firehose.
44+ """
55+66+ use TypedStruct
77+88+ typedstruct enforce: true do
99+ field :seq, integer()
1010+ field :did, String.t()
1111+ field :time, NaiveDateTime.t()
1212+ field :handle, String.t() | nil
1313+ end
1414+1515+ @spec from(map()) :: t()
1616+ def from(%{"seq" => seq, "did" => did, "time" => time} = msg) do
1717+ handle = Map.get(msg, "handle")
1818+ time = NaiveDateTime.from_iso8601!(time)
1919+2020+ %__MODULE__{
2121+ seq: seq,
2222+ did: did,
2323+ time: time,
2424+ handle: handle
2525+ }
2626+ end
2727+end
+22
lib/firehose/event/info.ex
···11+defmodule Drinkup.Firehose.Event.Info do
22+ @moduledoc """
33+ Struct for info events from the ATProto Firehose.
44+ """
55+66+ use TypedStruct
77+88+ typedstruct enforce: true do
99+ field :name, String.t()
1010+ field :message, String.t() | nil
1111+ end
1212+1313+ @spec from(map()) :: t()
1414+ def from(%{"name" => name} = msg) do
1515+ message = Map.get(msg, "message")
1616+1717+ %__MODULE__{
1818+ name: name,
1919+ message: message
2020+ }
2121+ end
2222+end
+28
lib/firehose/event/sync.ex
···11+defmodule Drinkup.Firehose.Event.Sync do
22+ @moduledoc """
33+ Struct for sync events from the ATProto Firehose.
44+ """
55+66+ use TypedStruct
77+88+ typedstruct enforce: true do
99+ field :seq, integer()
1010+ field :did, String.t()
1111+ field :blocks, binary()
1212+ field :rev, String.t()
1313+ field :time, NaiveDateTime.t()
1414+ end
1515+1616+ @spec from(map()) :: t()
1717+ def from(%{"seq" => seq, "did" => did, "blocks" => blocks, "rev" => rev, "time" => time}) do
1818+ time = NaiveDateTime.from_iso8601!(time)
1919+2020+ %__MODULE__{
2121+ seq: seq,
2222+ did: did,
2323+ blocks: blocks,
2424+ rev: rev,
2525+ time: time
2626+ }
2727+ end
2828+end
···11+defmodule Drinkup.Firehose.Options do
22+ @moduledoc """
33+ Configuration options for ATProto Firehose relay subscriptions.
44+55+ This module defines the configuration structure for connecting to and
66+ consuming events from an ATProto Firehose relay. The Firehose streams
77+ real-time repository events from the AT Protocol network.
88+99+ ## Options
1010+1111+ - `:consumer` (required) - Module implementing `Drinkup.Firehose.Consumer` behaviour
1212+ - `:name` - Unique name for this Firehose instance in the supervision tree (default: `Drinkup.Firehose`)
1313+ - `:host` - Firehose relay URL (default: `"https://bsky.network"`)
1414+ - `:cursor` - Optional sequence number to resume streaming from
1515+1616+ ## Example
1717+1818+ %{
1919+ consumer: MyFirehoseConsumer,
2020+ name: MyFirehose,
2121+ host: "https://bsky.network",
2222+ cursor: 12345
2323+ }
2424+ """
2525+2626+ use TypedStruct
2727+2828+ @default_host "https://bsky.network"
2929+3030+ @typedoc """
3131+ Map of configuration options accepted by `Drinkup.Firehose.child_spec/1`.
3232+ """
3333+ @type options() :: %{
3434+ required(:consumer) => consumer(),
3535+ optional(:name) => name(),
3636+ optional(:host) => host(),
3737+ optional(:cursor) => cursor()
3838+ }
3939+4040+ @typedoc """
4141+ Module implementing the `Drinkup.Firehose.Consumer` behaviour.
4242+ """
4343+ @type consumer() :: module()
4444+4545+ @typedoc """
4646+ Unique identifier for this Firehose instance in the supervision tree.
4747+4848+ Used for Registry lookups and naming child processes.
4949+ """
5050+ @type name() :: atom()
5151+5252+ @typedoc """
5353+ HTTP/HTTPS URL of the ATProto Firehose relay.
5454+5555+ Defaults to `"https://bsky.network"` which is the public Bluesky relay.
5656+5757+ You can find a list of third-party relays at https://compare.hose.cam/.
5858+ """
5959+ @type host() :: String.t()
6060+6161+ @typedoc """
6262+ Optional sequence number to resume streaming from.
6363+6464+ When provided, the Firehose will replay events starting from this sequence
6565+ number. Useful for resuming after a restart without missing events. The
6666+ cursor is automatically tracked and updated as events are received.
6767+ """
6868+ @type cursor() :: pos_integer() | nil
6969+7070+ typedstruct do
7171+ field :consumer, consumer(), enforce: true
7272+ field :name, name(), default: Drinkup.Firehose
7373+ field :host, host(), default: @default_host
7474+ field :cursor, cursor()
7575+ end
7676+7777+ @spec from(options()) :: t()
7878+ def from(%{consumer: _} = options), do: struct(__MODULE__, options)
7979+end
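One detail of `from/1` is worth spelling out: `struct/2` fills defaults and silently drops unknown keys, but it does not check enforced keys. The `%{consumer: _}` pattern in the function head is what rejects a map without a consumer. The sketch below reproduces this with a plain `defstruct` instead of TypedStruct. `OptionsSketch` and `MyConsumer` are hypothetical names.

```elixir
defmodule OptionsSketch do
  @moduledoc false

  # Plain-struct approximation of the TypedStruct definition above.
  @enforce_keys [:consumer]
  defstruct [:consumer, :cursor, name: Drinkup.Firehose, host: "https://bsky.network"]

  # The pattern match, not struct/2, is what requires :consumer.
  def from(%{consumer: _} = options), do: struct(__MODULE__, options)
end

# Defaults are filled in and the unknown :bogus key is discarded.
opts = OptionsSketch.from(%{consumer: MyConsumer, bogus: 1})
IO.inspect(opts.host, label: "host")

# struct/2 on its own does not raise for a missing enforced key.
IO.inspect(struct(OptionsSketch, %{}).consumer, label: "consumer without from/1")
```

If unknown keys should be an error rather than silently ignored, `struct!/2` is the stricter alternative.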
+85
lib/firehose/record_consumer.ex
···11+defmodule Drinkup.Firehose.RecordConsumer do
22+ @moduledoc """
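33+ An opinionated consumer of the Firehose that handles only commit events, dispatching each record operation to `handle_create/1`, `handle_update/1`, or `handle_delete/1`.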
44+ """
55+66+ @callback handle_create(any()) :: any()
77+ @callback handle_update(any()) :: any()
88+ @callback handle_delete(any()) :: any()
99+1010+ defmacro __using__(opts) do
1111+ {collections, _opts} = Keyword.pop(opts, :collections, [])
1212+1313+ quote location: :keep do
1414+ @behaviour Drinkup.Firehose.Consumer
1515+ @behaviour Drinkup.Firehose.RecordConsumer
1616+1717+ def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
1818+ event.ops
1919+ |> Enum.filter(fn %{path: path} ->
2020+ path |> String.split("/") |> Enum.at(0) |> matches_collections?()
2121+ end)
2222+ |> Enum.map(&Drinkup.Firehose.RecordConsumer.Record.from(&1, event.repo))
2323+ |> Enum.each(&apply(__MODULE__, :"handle_#{&1.action}", [&1]))
2424+ end
2525+2626+ def handle_event(_event), do: :noop
2727+2828+ unquote(
2929+ if collections == [] do
3030+ quote do
3131+ def matches_collections?(_type), do: true
3232+ end
3333+ else
3434+ quote do
3535+ def matches_collections?(nil), do: false
3636+3737+ def matches_collections?(type) when is_binary(type),
3838+ do:
3939+ Enum.any?(unquote(collections), fn
4040+ matcher when is_binary(matcher) -> type == matcher
4141+ matcher -> Regex.match?(matcher, type)
4242+ end)
4343+ end
4444+ end
4545+ )
4646+4747+ @impl true
4848+ def handle_create(_record), do: nil
4949+ @impl true
5050+ def handle_update(_record), do: nil
5151+ @impl true
5252+ def handle_delete(_record), do: nil
5353+5454+ defoverridable handle_create: 1, handle_update: 1, handle_delete: 1
5555+ end
5656+ end
5757+5858+ defmodule Record do
5959+ alias Drinkup.Firehose.Event.Commit.RepoOp
6060+ use TypedStruct
6161+6262+ typedstruct do
6363+ field :type, String.t()
6464+ field :rkey, String.t()
6565+ field :did, String.t()
6666+ field :action, :create | :update | :delete
6767+ field :cid, binary() | nil
6868+ field :record, map() | nil
6969+ end
7070+7171+ @spec from(RepoOp.t(), String.t()) :: t()
7272+ def from(%RepoOp{action: action, path: path, cid: cid, record: record}, did) do
7373+ [type, rkey] = String.split(path, "/")
7474+7575+ %__MODULE__{
7676+ type: type,
7777+ rkey: rkey,
7878+ did: did,
7979+ action: action,
8080+ cid: cid,
8181+ record: record
8282+ }
8383+ end
8484+ end
8585+end
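The filtering generated by `__using__/1` takes the collection from the first segment of an op's path and compares it against each matcher, using equality for strings and `Regex.match?/2` for regexes. A standalone sketch of that logic follows. `CollectionMatchSketch` is a hypothetical module, and the paths are made-up examples.

```elixir
defmodule CollectionMatchSketch do
  @moduledoc false

  # The collection is the part of "collection/rkey" before the first slash.
  @spec collection_of(String.t()) :: String.t()
  def collection_of(path), do: path |> String.split("/", parts: 2) |> hd()

  # An empty matcher list accepts everything, like the collections == [] branch above.
  @spec matches?(String.t(), [String.t() | Regex.t()]) :: boolean()
  def matches?(_collection, []), do: true

  def matches?(collection, matchers) when is_binary(collection) do
    Enum.any?(matchers, fn
      matcher when is_binary(matcher) -> collection == matcher
      %Regex{} = matcher -> Regex.match?(matcher, collection)
    end)
  end
end

matchers = [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]

"app.bsky.graph.follow/3kabc"
|> CollectionMatchSketch.collection_of()
|> CollectionMatchSketch.matches?(matchers)
|> IO.inspect(label: "graph follow matches")
```

Regex matchers are unanchored, so a pattern such as `~r/app\.bsky\.graph\..+/` also matches any longer name that merely contains it. Adding `^` and `$` makes the match exact.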
+90
lib/firehose/socket.ex
···11+defmodule Drinkup.Firehose.Socket do
22+ @moduledoc """
33+ WebSocket connection handler for ATProto relay subscriptions.
44+55+ Implements the Drinkup.Socket behaviour to manage connections to an ATProto
66+ Firehose relay, handling CAR/CBOR-encoded frames and dispatching events to
77+ the configured consumer.
88+ """
99+1010+ use Drinkup.Socket
1111+1212+ require Logger
1313+ alias Drinkup.Firehose.{Event, Options}
1414+1515+ @op_regular 1
1616+ @op_error -1
1717+1818+ @impl true
1919+ def init(opts) do
2020+ options = Keyword.fetch!(opts, :options)
2121+ {:ok, %{seq: options.cursor, options: options, host: options.host}}
2222+ end
2323+2424+ def start_link(%Options{} = options, statem_opts) do
2525+ # Build opts for Drinkup.Socket from Options struct
2626+ socket_opts = [
2727+ host: options.host,
2828+ cursor: options.cursor,
2929+ options: options
3030+ ]
3131+3232+ Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
3333+ end
3434+3535+ @impl true
3636+ def build_path(%{seq: seq}) do
3737+ cursor_param = if seq, do: %{cursor: seq}, else: %{}
3838+ "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(cursor_param)
3939+ end
4040+4141+ @impl true
4242+ def handle_frame({:binary, frame}, {%{seq: seq, options: options} = data, _conn, _stream}) do
4343+ with {:ok, header, next} <- CAR.DagCbor.decode(frame),
4444+ {:ok, payload, _} <- CAR.DagCbor.decode(next),
4545+ {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
4646+ true <- Event.valid_seq?(seq, payload["seq"]) do
4747+ new_seq = payload["seq"] || seq
4848+4949+ case Event.from(type, payload) do
5050+ nil ->
5151+ Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
5252+5353+ message ->
5454+ Event.dispatch(message, options)
5555+ end
5656+5757+ {:ok, %{data | seq: new_seq}}
5858+ else
5959+ false ->
6060+ Logger.error("Got out of sequence or invalid `seq` from Firehose")
6161+ :noop
6262+6363+ {%{"op" => @op_error, "t" => type}, payload} ->
6464+ Logger.error("Got error from Firehose: #{inspect({type, payload})}")
6565+ :noop
6666+6767+ {:error, reason} ->
6868+ Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
6969+ :noop
7070+ end
7171+ end
7272+7373+ @impl true
7474+ def handle_frame(:close, _data) do
7575+ Logger.info("Websocket closed, reason unknown")
7676+ nil
7777+ end
7878+7979+ @impl true
8080+ def handle_frame({:close, errno, reason}, _data) do
8181+ Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
8282+ nil
8383+ end
8484+8585+ @impl true
8686+ def handle_frame({:text, _text}, _data) do
8787+ Logger.warning("Received unexpected text frame from Firehose")
8888+ :noop
8989+ end
9090+end
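A small observation on `build_path/1`: `URI.encode_query/1` returns an empty string for an empty map, so the path ends in a bare `?` when there is no cursor. Servers generally tolerate this. The sketch below shows the current behaviour next to a variant that omits the trailing `?`. `PathSketch` and `build_path_trimmed/1` are hypothetical names, not part of the diff.

```elixir
defmodule PathSketch do
  @moduledoc false

  @base "/xrpc/com.atproto.sync.subscribeRepos"

  # Same logic as build_path/1 above.
  def build_path(%{seq: seq}) do
    cursor_param = if seq, do: %{cursor: seq}, else: %{}
    @base <> "?" <> URI.encode_query(cursor_param)
  end

  # Hypothetical variant: only append a query string when a cursor exists.
  def build_path_trimmed(%{seq: nil}), do: @base
  def build_path_trimmed(%{seq: seq}), do: @base <> "?" <> URI.encode_query(%{cursor: seq})
end

IO.puts(PathSketch.build_path(%{seq: 12345}))
IO.puts(PathSketch.build_path(%{seq: nil}))
IO.puts(PathSketch.build_path_trimmed(%{seq: nil}))
```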
···11+defmodule Drinkup.Jetstream.Consumer do
22+ @moduledoc """
33+ Consumer behaviour for handling Jetstream events.
44+55+ Implement this behaviour to process events from a Jetstream instance.
66+ Events are dispatched asynchronously via `Task.Supervisor`.
77+88+ Unlike Tap, Jetstream does not require event acknowledgments. Events are
99+ processed in a fire-and-forget manner.
1010+1111+ ## Example
1212+1313+ defmodule MyJetstreamConsumer do
1414+ @behaviour Drinkup.Jetstream.Consumer
1515+1616+ def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :create} = event) do
1717+ # Handle new record creation
1818+ IO.inspect(event, label: "New record")
1919+ :ok
2020+ end
2121+2222+ def handle_event(%Drinkup.Jetstream.Event.Commit{operation: :delete} = event) do
2323+ # Handle record deletion
2424+ IO.inspect(event, label: "Deleted record")
2525+ :ok
2626+ end
2727+2828+ def handle_event(%Drinkup.Jetstream.Event.Identity{} = event) do
2929+ # Handle identity changes
3030+ IO.inspect(event, label: "Identity update")
3131+ :ok
3232+ end
3333+3434+ def handle_event(%Drinkup.Jetstream.Event.Account{active: false} = event) do
3535+ # Handle account deactivation
3636+ IO.inspect(event, label: "Account inactive")
3737+ :ok
3838+ end
3939+4040+ def handle_event(_event), do: :ok
4141+ end
4242+4343+ ## Event Types
4444+4545+ The consumer will receive one of three event types:
4646+4747+ - `Drinkup.Jetstream.Event.Commit` - Repository commits (create, update, delete)
4848+ - `Drinkup.Jetstream.Event.Identity` - Identity updates (handle changes, etc.)
4949+ - `Drinkup.Jetstream.Event.Account` - Account status changes (active, taken down, etc.)
5050+5151+ ## Error Handling
5252+5353+ If your `handle_event/1` implementation raises an exception, it will be logged
5454+ but will not affect the stream. The error is caught and logged by the event
5555+ dispatcher.
5656+ """
5757+5858+ alias Drinkup.Jetstream.Event
5959+6060+ @callback handle_event(Event.t()) :: any()
6161+end
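The moduledoc says a raising `handle_event/1` is logged without affecting the stream. The dispatcher itself is outside this excerpt, so the following is only a sketch of how that could be achieved with `Task.Supervisor` and a `rescue` clause. `DispatchSketch`, `safe_handle/2`, and `CrashyConsumer` are hypothetical names, and the real dispatcher may be structured differently.

```elixir
defmodule CrashyConsumer do
  @moduledoc false

  # Example consumer: raises for one event, succeeds for everything else.
  def handle_event(:boom), do: raise("consumer failed")
  def handle_event(_event), do: :ok
end

defmodule DispatchSketch do
  @moduledoc false
  require Logger

  # Fire-and-forget: each event is handled in its own supervised task.
  def dispatch(supervisor, consumer, event) do
    Task.Supervisor.start_child(supervisor, fn -> safe_handle(consumer, event) end)
  end

  # Runs the callback and turns exceptions into a logged :error result.
  def safe_handle(consumer, event) do
    consumer.handle_event(event)
    :ok
  rescue
    e ->
      Logger.error(Exception.format(:error, e, __STACKTRACE__))
      :error
  end
end

{:ok, supervisor} = Task.Supervisor.start_link()
{:ok, _pid} = DispatchSketch.dispatch(supervisor, CrashyConsumer, :fine)
```

Because each event runs in a separate task, slow or crashing handlers do not block the socket process, but ordering between events is not guaranteed.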
+106
lib/jetstream/event/account.ex
···11+defmodule Drinkup.Jetstream.Event.Account do
22+ @moduledoc """
33+ Struct for account events from Jetstream.
44+55+ Represents a change to an account's status on a host (e.g., PDS or Relay).
66+ The semantics of this event are that the status is at the host which emitted
77+ the event, not necessarily that at the currently active PDS.
88+99+ For example, a Relay takedown would emit a takedown with `active: false`,
1010+ even if the PDS is still active.
1111+ """
1212+1313+ use TypedStruct
1414+1515+ typedstruct enforce: true do
1616+ @typedoc """
1717+ The status of an inactive account.
1818+1919+ Known values from the ATProto lexicon:
2020+ - `:takendown` - Account has been taken down
2121+ - `:suspended` - Account is suspended
2222+ - `:deleted` - Account has been deleted
2323+ - `:deactivated` - Account has been deactivated by the user
2424+ - `:desynchronized` - Account is out of sync
2525+ - `:throttled` - Account is throttled
2626+2727+ The status can also be any other string value for future compatibility.
2828+ """
2929+ @type status() ::
3030+ :takendown
3131+ | :suspended
3232+ | :deleted
3333+ | :deactivated
3434+ | :desynchronized
3535+ | :throttled
3636+ | String.t()
3737+3838+ field :did, String.t()
3939+ field :time_us, integer()
4040+ field :kind, :account, default: :account
4141+ field :active, boolean()
4242+ field :seq, integer()
4343+ field :time, NaiveDateTime.t()
4444+ field :status, status() | nil
4545+ end
4646+4747+ @doc """
4848+ Parses a Jetstream account payload into an Account struct.
4949+5050+ ## Example Payload (Active)
5151+5252+ %{
5353+ "active" => true,
5454+ "did" => "did:plc:ufbl4k27gp6kzas5glhz7fim",
5555+ "seq" => 1409753013,
5656+ "time" => "2024-09-05T06:11:04.870Z"
5757+ }
5858+5959+ ## Example Payload (Inactive)
6060+6161+ %{
6262+ "active" => false,
6363+ "did" => "did:plc:abc123",
6464+ "seq" => 1409753014,
6565+ "time" => "2024-09-05T06:12:00.000Z",
6666+ "status" => "takendown"
6767+ }
6868+ """
6969+ @spec from(String.t(), integer(), map()) :: t()
7070+ def from(
7171+ did,
7272+ time_us,
7373+ %{
7474+ "active" => active,
7575+ "seq" => seq,
7676+ "time" => time
7777+ } = account
7878+ ) do
7979+ %__MODULE__{
8080+ did: did,
8181+ time_us: time_us,
8282+ active: active,
8383+ seq: seq,
8484+ time: parse_datetime(time),
8585+ status: parse_status(Map.get(account, "status"))
8686+ }
8787+ end
8888+8989+ @spec parse_datetime(String.t()) :: NaiveDateTime.t()
9090+ defp parse_datetime(time_str) do
9191+ case NaiveDateTime.from_iso8601(time_str) do
9292+ {:ok, datetime} -> datetime
9393+ {:error, _} -> raise "Invalid datetime format: #{time_str}"
9494+ end
9595+ end
9696+9797+ @spec parse_status(String.t() | nil) :: status() | nil
9898+ defp parse_status(nil), do: nil
9999+ defp parse_status("takendown"), do: :takendown
100100+ defp parse_status("suspended"), do: :suspended
101101+ defp parse_status("deleted"), do: :deleted
102102+ defp parse_status("deactivated"), do: :deactivated
103103+ defp parse_status("desynchronized"), do: :desynchronized
104104+ defp parse_status("throttled"), do: :throttled
105105+ defp parse_status(status) when is_binary(status), do: status
106106+end
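`parse_datetime/1` relies on `NaiveDateTime.from_iso8601/1`, which accepts a trailing `Z` or numeric offset but discards it rather than applying it. For the `Z`-suffixed timestamps in the example payloads this is harmless. The sketch below contrasts it with `DateTime.from_iso8601/1`, which normalises to UTC. `DatetimeSketch` is a hypothetical module, and the `+02:00` input is an invented example.

```elixir
defmodule DatetimeSketch do
  @moduledoc false

  # Mirrors parse_datetime/1 above: any offset in the input is dropped, not applied.
  def parse_naive(str) do
    case NaiveDateTime.from_iso8601(str) do
      {:ok, naive} -> naive
      {:error, reason} -> raise ArgumentError, "invalid datetime #{inspect(str)}: #{inspect(reason)}"
    end
  end

  # Alternative: DateTime.from_iso8601/1 converts the timestamp to UTC.
  def parse_utc(str) do
    case DateTime.from_iso8601(str) do
      {:ok, utc, _offset_seconds} -> utc
      {:error, reason} -> raise ArgumentError, "invalid datetime #{inspect(str)}: #{inspect(reason)}"
    end
  end
end

IO.inspect(DatetimeSketch.parse_naive("2024-09-05T06:11:04.870Z"), label: "naive, Z suffix")
IO.inspect(DatetimeSketch.parse_naive("2024-09-05T08:11:04.870+02:00"), label: "naive, offset dropped")
IO.inspect(DatetimeSketch.parse_utc("2024-09-05T08:11:04.870+02:00"), label: "utc, offset applied")
```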
+78
lib/jetstream/event/commit.ex
···11+defmodule Drinkup.Jetstream.Event.Commit do
22+ @moduledoc """
33+ Struct for commit events from Jetstream.
44+55+ Represents a repository commit containing either a create, update, or delete
66+ operation on a record. Unlike the Firehose commit events, Jetstream provides
77+ simplified JSON structures without CAR/CBOR encoding.
88+ """
99+1010+ use TypedStruct
1111+1212+ typedstruct enforce: true do
1313+ @typedoc """
1414+ The operation type for this commit.
1515+1616+ - `:create` - A new record was created
1717+ - `:update` - An existing record was updated
1818+ - `:delete` - An existing record was deleted
1919+ """
2020+ @type operation() :: :create | :update | :delete
2121+2222+ field :did, String.t()
2323+ field :time_us, integer()
2424+ field :kind, :commit, default: :commit
2525+ field :operation, operation()
2626+ field :collection, String.t()
2727+ field :rkey, String.t()
2828+ field :rev, String.t()
2929+ field :record, map() | nil
3030+ field :cid, String.t() | nil
3131+ end
3232+3333+ @doc """
3434+ Parses a Jetstream commit payload into a Commit struct.
3535+3636+ ## Example Payload
3737+3838+ %{
3939+ "rev" => "3l3qo2vutsw2b",
4040+ "operation" => "create",
4141+ "collection" => "app.bsky.feed.like",
4242+ "rkey" => "3l3qo2vuowo2b",
4343+ "record" => %{
4444+ "$type" => "app.bsky.feed.like",
4545+ "createdAt" => "2024-09-09T19:46:02.102Z",
4646+ "subject" => %{...}
4747+ },
4848+ "cid" => "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
4949+ }
5050+ """
5151+ @spec from(String.t(), integer(), map()) :: t()
5252+ def from(
5353+ did,
5454+ time_us,
5555+ %{
5656+ "rev" => rev,
5757+ "operation" => operation,
5858+ "collection" => collection,
5959+ "rkey" => rkey
6060+ } = commit
6161+ ) do
6262+ %__MODULE__{
6363+ did: did,
6464+ time_us: time_us,
6565+ operation: parse_operation(operation),
6666+ collection: collection,
6767+ rkey: rkey,
6868+ rev: rev,
6969+ record: Map.get(commit, "record"),
7070+ cid: Map.get(commit, "cid")
7171+ }
7272+ end
7373+7474+ @spec parse_operation(String.t()) :: operation()
7575+ defp parse_operation("create"), do: :create
7676+ defp parse_operation("update"), do: :update
7777+ defp parse_operation("delete"), do: :delete
7878+end
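Review note: `parse_operation/1` above has no fallback clause, so an operation string other than `"create"`, `"update"`, or `"delete"` would raise a `FunctionClauseError` inside `from/3`. A minimal sketch of a tagged-tuple variant follows. The module name `OperationSketch` and the `{:unknown_operation, value}` error shape are assumptions for illustration and are not part of Drinkup.

```elixir
defmodule OperationSketch do
  @moduledoc false
  # Hypothetical stand-in, not part of Drinkup: maps Jetstream operation
  # strings to atoms and reports anything unexpected instead of raising.

  @type operation :: :create | :update | :delete

  @spec parse(term()) :: {:ok, operation()} | {:error, {:unknown_operation, term()}}
  def parse("create"), do: {:ok, :create}
  def parse("update"), do: {:ok, :update}
  def parse("delete"), do: {:ok, :delete}
  # Fallback clause: anything else is reported to the caller.
  def parse(other), do: {:error, {:unknown_operation, other}}
end
```

A caller could then decide whether to log and drop the event or to crash, which fits the `{:ok, result}` / `{:error, reason}` convention listed in AGENTS.md.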
+58
lib/jetstream/event/identity.ex
···11+defmodule Drinkup.Jetstream.Event.Identity do
22+ @moduledoc """
33+ Struct for identity events from Jetstream.
44+55+ Represents a change to an account's identity, such as an updated handle,
66+ signing key, or PDS hosting endpoint. This serves as a signal to downstream
77+ services to refresh their identity cache.
88+ """
99+1010+ use TypedStruct
1111+1212+ typedstruct enforce: true do
1313+ field :did, String.t()
1414+ field :time_us, integer()
1515+ field :kind, :identity, default: :identity
1616+ field :handle, String.t() | nil
1717+ field :seq, integer()
1818+ field :time, NaiveDateTime.t()
1919+ end
2020+2121+ @doc """
2222+ Parses a Jetstream identity payload into an Identity struct.
2323+2424+ ## Example Payload
2525+2626+ %{
2727+ "did" => "did:plc:ufbl4k27gp6kzas5glhz7fim",
2828+ "handle" => "yohenrique.bsky.social",
2929+ "seq" => 1409752997,
3030+ "time" => "2024-09-05T06:11:04.870Z"
3131+ }
3232+ """
3333+ @spec from(String.t(), integer(), map()) :: t()
3434+ def from(
3535+ did,
3636+ time_us,
3737+ %{
3838+ "seq" => seq,
3939+ "time" => time
4040+ } = identity
4141+ ) do
4242+ %__MODULE__{
4343+ did: did,
4444+ time_us: time_us,
4545+ handle: Map.get(identity, "handle"),
4646+ seq: seq,
4747+ time: parse_datetime(time)
4848+ }
4949+ end
5050+5151+ @spec parse_datetime(String.t()) :: NaiveDateTime.t()
5252+ defp parse_datetime(time_str) do
5353+ case NaiveDateTime.from_iso8601(time_str) do
5454+ {:ok, datetime} -> datetime
5555+ {:error, _} -> raise "Invalid datetime format: #{time_str}"
5656+ end
5757+ end
5858+end
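Review note: `parse_datetime/1` above relies on `NaiveDateTime.from_iso8601/1`, which accepts the trailing `Z` in Jetstream timestamps but discards the offset. The sketch below contrasts it with `DateTime.from_iso8601/1`, which keeps the value in UTC. `DatetimeSketch` is a hypothetical helper name used only for this illustration.

```elixir
defmodule DatetimeSketch do
  @moduledoc false
  # Hypothetical helper for illustration only; not part of Drinkup.

  # Same call the Identity module uses: the offset ("Z") is parsed and dropped.
  @spec naive(String.t()) :: {:ok, NaiveDateTime.t()} | {:error, atom()}
  def naive(time_str), do: NaiveDateTime.from_iso8601(time_str)

  # Alternative that keeps the timestamp as a UTC DateTime.
  @spec utc(String.t()) :: {:ok, DateTime.t()} | {:error, atom()}
  def utc(time_str) do
    case DateTime.from_iso8601(time_str) do
      {:ok, datetime, _offset_seconds} -> {:ok, datetime}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

Returning the error tuple rather than raising would also let the caller decide how to treat a malformed `"time"` field.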
+100
lib/jetstream/event.ex
···11+defmodule Drinkup.Jetstream.Event do
22+ @moduledoc """
33+ Event handling and dispatch for Jetstream events.
44+55+ Parses incoming JSON events from Jetstream and dispatches them to the
66+ configured consumer via Task.Supervisor.
77+ """
88+99+ require Logger
1010+ alias Drinkup.Jetstream.{Event, Options}
1111+1212+ @type t() :: Event.Commit.t() | Event.Identity.t() | Event.Account.t()
1313+1414+ @doc """
1515+ Parse a JSON map into an event struct.
1616+1717+ Jetstream events have a top-level structure with a "kind" field that
1818+ determines the event type, and a nested object with the event data.
1919+2020+ ## Example Event Structure
2121+2222+ %{
2323+ "did" => "did:plc:...",
2424+ "time_us" => 1726880765818347,
2525+ "kind" => "commit",
2626+ "commit" => %{...}
2727+ }
2828+2929+ Returns the appropriate event struct based on the "kind" field, or `nil`
3030+ if the event type is not recognized.
3131+ """
3232+ @spec from(map()) :: t() | nil
3333+ def from(%{"did" => did, "time_us" => time_us, "kind" => kind} = payload) do
3434+ case kind do
3535+ "commit" ->
3636+ case Map.get(payload, "commit") do
3737+ nil ->
3838+ Logger.warning("Commit event missing 'commit' field: #{inspect(payload)}")
3939+ nil
4040+4141+ commit ->
4242+ Event.Commit.from(did, time_us, commit)
4343+ end
4444+4545+ "identity" ->
4646+ case Map.get(payload, "identity") do
4747+ nil ->
4848+ Logger.warning("Identity event missing 'identity' field: #{inspect(payload)}")
4949+ nil
5050+5151+ identity ->
5252+ Event.Identity.from(did, time_us, identity)
5353+ end
5454+5555+ "account" ->
5656+ case Map.get(payload, "account") do
5757+ nil ->
5858+ Logger.warning("Account event missing 'account' field: #{inspect(payload)}")
5959+ nil
6060+6161+ account ->
6262+ Event.Account.from(did, time_us, account)
6363+ end
6464+6565+ _ ->
6666+ Logger.warning("Received unrecognized event kind from Jetstream: #{inspect(kind)}")
6767+ nil
6868+ end
6969+ end
7070+7171+ def from(payload) do
7272+ Logger.warning("Received invalid event structure from Jetstream: #{inspect(payload)}")
7373+ nil
7474+ end
7575+7676+ @doc """
7777+ Dispatch an event to the consumer via Task.Supervisor.
7878+7979+ Spawns a task that processes the event via the consumer's `handle_event/1`
8080+ callback. Unlike Tap, Jetstream does not require acknowledgments.
8181+ """
8282+ @spec dispatch(t(), Options.t()) :: :ok
8383+ def dispatch(event, %Options{consumer: consumer, name: name}) do
8484+ supervisor_name = {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}
8585+8686+ {:ok, _pid} =
8787+ Task.Supervisor.start_child(supervisor_name, fn ->
8888+ try do
8989+ consumer.handle_event(event)
9090+ rescue
9191+ e ->
9292+ Logger.error(
9393+ "Error in Jetstream event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
9494+ )
9595+ end
9696+ end)
9797+9898+ :ok
9999+ end
100100+end
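Review note: the dispatch pattern above (one supervised task per event, with exceptions rescued and logged) can be tried in isolation. The sketch below assumes an anonymous `Task.Supervisor` and a plain 1-arity function in place of the consumer's `handle_event/1` callback. `DispatchSketch` is a hypothetical name, not Drinkup's API.

```elixir
defmodule DispatchSketch do
  @moduledoc false
  require Logger

  # Hypothetical stand-in for Event.dispatch/2: `handler` is any 1-arity
  # function, playing the role of the consumer's handle_event/1 callback.
  @spec dispatch(Supervisor.supervisor(), term(), (term() -> any())) :: :ok
  def dispatch(supervisor, event, handler) do
    {:ok, _pid} =
      Task.Supervisor.start_child(supervisor, fn ->
        try do
          handler.(event)
        rescue
          e ->
            # A failing handler is logged and does not affect the caller.
            Logger.error(Exception.format(:error, e, __STACKTRACE__))
        end
      end)

    :ok
  end
end

{:ok, sup} = Task.Supervisor.start_link()
parent = self()

# The first handler raises and the error is only logged; the second runs normally.
:ok = DispatchSketch.dispatch(sup, :boom, fn _event -> raise "handler failed" end)
:ok = DispatchSketch.dispatch(sup, :hello, fn event -> send(parent, {:handled, event}) end)
```

Because each event runs in its own task, handlers may complete out of order. Consumers that need ordering would have to serialize on their side.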
+151
lib/jetstream/options.ex
···11+defmodule Drinkup.Jetstream.Options do
22+ @moduledoc """
33+ Configuration options for Jetstream event stream connection.
44+55+ Jetstream is a simplified JSON event stream that converts the CBOR-encoded
66+ ATProto Firehose into lightweight, friendly JSON. It provides zstd compression
77+ and filtering capabilities for collections and DIDs.
88+99+ ## Options
1010+1111+ - `:consumer` (required) - Module implementing `Drinkup.Jetstream.Consumer` behaviour
1212+ - `:name` - Unique name for this Jetstream instance in the supervision tree (default: `Drinkup.Jetstream`)
1313+ - `:host` - Jetstream service URL (default: `"wss://jetstream2.us-east.bsky.network"`)
1414+ - `:wanted_collections` - List of collection NSIDs or prefixes to filter (default: `[]` = all collections)
1515+ - `:wanted_dids` - List of DIDs to filter (default: `[]` = all repos)
1616+ - `:cursor` - Unix microseconds timestamp to resume from (default: `nil` = live-tail)
1717+ - `:require_hello` - Pause replay until first options update is sent (default: `false`)
1818+ - `:max_message_size_bytes` - Maximum message size to receive (default: `nil` = no limit)
1919+2020+ ## Example
2121+2222+ %{
2323+ consumer: MyJetstreamConsumer,
2424+ name: MyJetstream,
2525+ host: "wss://jetstream2.us-east.bsky.network",
2626+ wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
2727+ wanted_dids: ["did:plc:abc123"],
2828+ cursor: 1725519626134432
2929+ }
3030+3131+ ## Collection Filters
3232+3333+ The `wanted_collections` option supports:
3434+ - Full NSIDs: `"app.bsky.feed.post"`
3535+ - NSID prefixes: `"app.bsky.graph.*"`, `"app.bsky.*"`
3636+3737+ You can specify up to 100 collection filters.
3838+3939+ ## DID Filters
4040+4141+ The `wanted_dids` option accepts a list of DID strings.
4242+ You can specify up to 10,000 DIDs.
4343+4444+ ## Compression
4545+4646+ Jetstream always uses zstd compression with a custom dictionary.
4747+ This is handled automatically by the socket implementation.
4848+ """
4949+5050+ use TypedStruct
5151+5252+ @default_host "wss://jetstream2.us-east.bsky.network"
5353+5454+ @typedoc """
5555+ Map of configuration options accepted by `Drinkup.Jetstream.child_spec/1`.
5656+ """
5757+ @type options() :: %{
5858+ required(:consumer) => consumer(),
5959+ optional(:name) => name(),
6060+ optional(:host) => host(),
6161+ optional(:wanted_collections) => wanted_collections(),
6262+ optional(:wanted_dids) => wanted_dids(),
6363+ optional(:cursor) => cursor(),
6464+ optional(:require_hello) => require_hello(),
6565+ optional(:max_message_size_bytes) => max_message_size_bytes()
6666+ }
6767+6868+ @typedoc """
6969+ Module implementing the `Drinkup.Jetstream.Consumer` behaviour.
7070+ """
7171+ @type consumer() :: module()
7272+7373+ @typedoc """
7474+ Unique identifier for this Jetstream instance in the supervision tree.
7575+7676+ Used for Registry lookups and naming child processes.
7777+ """
7878+ @type name() :: atom()
7979+8080+ @typedoc """
8181+ WebSocket URL of the Jetstream service.
8282+8383+ Defaults to `"wss://jetstream2.us-east.bsky.network"` which is a public Bluesky instance.
8484+ """
8585+ @type host() :: String.t()
8686+8787+ @typedoc """
8888+ List of collection NSIDs or NSID prefixes to filter.
8989+9090+ Examples:
9191+ - `["app.bsky.feed.post"]` - Only posts
9292+ - `["app.bsky.graph.*"]` - All graph collections
9393+ - `["app.bsky.*"]` - All Bluesky app collections
9494+9595+ You can specify up to 100 collection filters.
9696+ Defaults to `[]` (all collections).
9797+ """
9898+ @type wanted_collections() :: [String.t()]
9999+100100+ @typedoc """
101101+ List of DIDs to filter events by.
102102+103103+ You can specify up to 10,000 DIDs.
104104+ Defaults to `[]` (all repos).
105105+ """
106106+ @type wanted_dids() :: [String.t()]
107107+108108+ @typedoc """
109109+ Unix microseconds timestamp to resume streaming from.
110110+111111+ When provided, Jetstream will replay events starting from this timestamp.
112112+ Useful for resuming after a restart without missing events. The cursor is
113113+ automatically tracked and updated as events are received.
114114+115115+ Defaults to `nil` (live-tail from current time).
116116+ """
117117+ @type cursor() :: pos_integer() | nil
118118+119119+ @typedoc """
120120+ Whether to pause replay/live-tail until the first options update is sent.
121121+122122+ When `true`, the connection will wait for a `Drinkup.Jetstream.update_options/2`
123123+ call before starting to receive events.
124124+125125+ Defaults to `false`.
126126+ """
127127+ @type require_hello() :: boolean()
128128+129129+ @typedoc """
130130+ Maximum message size in bytes that the client would like to receive.
131131+132132+ Zero or `nil` means no limit. Negative values are treated as zero.
133133+ Defaults to `nil` (no maximum size).
134134+ """
135135+ @type max_message_size_bytes() :: integer() | nil
136136+137137+ typedstruct do
138138+ field :consumer, consumer(), enforce: true
139139+ field :name, name(), default: Drinkup.Jetstream
140140+ field :host, host(), default: @default_host
141141+ # TODO: Add NSID prefix validation once available in atex
142142+ field :wanted_collections, wanted_collections(), default: []
143143+ field :wanted_dids, wanted_dids(), default: []
144144+ field :cursor, cursor()
145145+ field :require_hello, require_hello(), default: false
146146+ field :max_message_size_bytes, max_message_size_bytes()
147147+ end
148148+149149+ @spec from(options()) :: t()
150150+ def from(%{consumer: _} = options), do: struct(__MODULE__, options)
151151+end
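Review note: `from/1` above builds the struct with `struct/2`, which silently drops keys the struct does not define, so a misspelled option such as `wanted_colections` would be ignored without warning. The sketch below contrasts `struct/2` with `struct!/2` on a plain `defstruct`. `OptionsSketch` is a hypothetical module, and whether a strict variant is wanted here is an assumption rather than something the diff states.

```elixir
defmodule OptionsSketch do
  @moduledoc false
  # Hypothetical stand-in for an options struct; not Drinkup's real module.
  @enforce_keys [:consumer]
  defstruct [:consumer, name: :default_name, wanted_collections: []]

  # Lenient: unknown keys are dropped and enforced keys are not checked.
  def from(options) when is_map(options), do: struct(__MODULE__, options)

  # Strict: raises KeyError on unknown keys and ArgumentError when an
  # enforced key is missing.
  def from!(options) when is_map(options), do: struct!(__MODULE__, options)
end
```

With the lenient version, `OptionsSketch.from(%{consumer: MyConsumer, wanted_colections: ["x"]})` returns a struct whose `wanted_collections` is still `[]`, while the strict version raises on the same input.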
+201
lib/jetstream/socket.ex
···11+defmodule Drinkup.Jetstream.Socket do
22+ @moduledoc """
33+ WebSocket connection handler for Jetstream event streams.
44+55+ Implements the Drinkup.Socket behaviour to manage connections to a Jetstream
66+ service, handling zstd-compressed JSON events and dispatching them to the
77+ configured consumer.
88+ """
99+1010+ use Drinkup.Socket
1111+1212+ require Logger
1313+ alias Drinkup.Jetstream.{Event, Options}
1414+1515+ @dict_path "priv/jetstream/zstd_dictionary"
1616+ @external_resource @dict_path
1717+ @zstd_dict File.read!(@dict_path)
1818+1919+ @impl true
2020+ def init(opts) do
2121+ options = Keyword.fetch!(opts, :options)
2222+2323+ {:ok, %{options: options, host: options.host, cursor: options.cursor}}
2424+ end
2525+2626+ def start_link(%Options{} = options, statem_opts) do
2727+ socket_opts = [
2828+ host: options.host,
2929+ options: options
3030+ ]
3131+3232+ statem_opts =
3333+ Keyword.put(
3434+ statem_opts,
3535+ :name,
3636+ {:via, Registry, {Drinkup.Registry, {options.name, JetstreamSocket}}}
3737+ )
3838+3939+ Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
4040+ end
4141+4242+ @impl true
4343+ def build_path(%{options: options}) do
4444+ query_params = [compress: "true"]
4545+4646+ query_params =
4747+ query_params
4848+ |> put_collections(options.wanted_collections)
4949+ |> put_dids(options.wanted_dids)
5050+ |> put_cursor(options.cursor)
5151+ |> put_max_size(options.max_message_size_bytes)
5252+ |> put_require_hello(options.require_hello)
5353+5454+ "/subscribe?" <> URI.encode_query(query_params)
5555+ end
5656+5757+ @impl true
5858+ def handle_frame(
5959+ {:binary, compressed_data},
6060+ {%{options: options} = data, _conn, _stream}
6161+ ) do
6262+ case decompress_and_parse(compressed_data) do
6363+ {:ok, payload} ->
6464+ case Event.from(payload) do
6565+ nil ->
6666+ # Event.from already logs warnings for unrecognized events
6767+ :noop
6868+6969+ event ->
7070+ Event.dispatch(event, options)
7171+ # Update cursor with the event's time_us
7272+ new_cursor = Map.get(payload, "time_us")
7373+ {:ok, %{data | cursor: new_cursor}}
7474+ end
7575+7676+ # TODO: sometimes getting ZSTD_CONTENTSIZE_UNKNOWN
7777+ {:error, reason} ->
7878+ Logger.error(
7979+ "[Drinkup.Jetstream.Socket] Failed to decompress/parse frame: #{inspect(reason)}"
8080+ )
8181+8282+ :noop
8383+ end
8484+ end
8585+8686+ @impl true
8787+ def handle_frame({:text, json}, {%{options: options} = data, _conn, _stream}) do
8888+ # Text frames shouldn't happen since we force compression, but handle them anyway
8989+ case Jason.decode(json) do
9090+ {:ok, payload} ->
9191+ case Event.from(payload) do
9292+ nil ->
9393+ :noop
9494+9595+ event ->
9696+ Event.dispatch(event, options)
9797+ new_cursor = Map.get(payload, "time_us")
9898+ {:ok, %{data | cursor: new_cursor}}
9999+ end
100100+101101+ {:error, reason} ->
102102+ Logger.error("[Drinkup.Jetstream.Socket] Failed to decode JSON: #{inspect(reason)}")
103103+ :noop
104104+ end
105105+ end
106106+107107+ @impl true
108108+ def handle_frame(:close, _data) do
109109+ Logger.info("[Drinkup.Jetstream.Socket] WebSocket closed, reason unknown")
110110+ nil
111111+ end
112112+113113+ @impl true
114114+ def handle_frame({:close, errno, reason}, _data) do
115115+ Logger.info(
116116+ "[Drinkup.Jetstream.Socket] WebSocket closed, errno: #{errno}, reason: #{inspect(reason)}"
117117+ )
118118+119119+ nil
120120+ end
121121+122122+ @impl true
123123+ def handle_connected({user_data, conn, stream}) do
124124+ # Register connection for options updates
125125+ Registry.register(
126126+ Drinkup.Registry,
127127+ {user_data.options.name, JetstreamConnection},
128128+ {conn, stream}
129129+ )
130130+131131+ {:ok, user_data}
132132+ end
133133+134134+ @impl true
135135+ def handle_disconnected(_reason, {user_data, _conn, _stream}) do
136136+ # Unregister connection when disconnected
137137+ Registry.unregister(Drinkup.Registry, {user_data.options.name, JetstreamConnection})
138138+ {:ok, user_data}
139139+ end
140140+141141+ # Can't use `create_ddict` as the value of `@zstd_dict` because it returns a reference :(
142142+ @spec get_dictionary() :: reference()
143143+ defp get_dictionary() do
144144+ case :ezstd.create_ddict(@zstd_dict) do
145145+ {:error, reason} ->
146146+ raise ArgumentError,
147147+ "failed to create Jetstream's ZSTD dictionary: #{inspect(reason)}"
148148+149149+ dict ->
150150+ dict
151151+ end
152152+ end
153153+154154+ @spec decompress_and_parse(binary()) :: {:ok, map()} | {:error, term()}
155155+ defp decompress_and_parse(compressed_data) do
156156+ with ctx when is_reference(ctx) <-
157157+ :ezstd.create_decompression_context(byte_size(compressed_data)),
158158+ :ok <- :ezstd.select_ddict(ctx, get_dictionary()),
159159+ iolist when is_list(iolist) <- :ezstd.decompress_streaming(ctx, compressed_data),
160160+ decompressed <- IO.iodata_to_binary(iolist),
161161+ {:ok, payload} <- JSON.decode(decompressed) do
162162+ {:ok, payload}
163163+ else
164164+ {:error, reason} -> {:error, reason}
165165+ end
166166+ end
167167+168168+ @spec put_collections(keyword(), [String.t()]) :: keyword()
169169+ defp put_collections(params, []), do: params
170170+171171+ defp put_collections(params, collections) when is_list(collections) do
172172+ Enum.reduce(collections, params, fn collection, acc ->
173173+ [{:wantedCollections, collection} | acc]
174174+ end)
175175+ end
176176+177177+ @spec put_dids(keyword(), [String.t()]) :: keyword()
178178+ defp put_dids(params, []), do: params
179179+180180+ defp put_dids(params, dids) when is_list(dids) do
181181+ Enum.reduce(dids, params, fn did, acc ->
182182+ [{:wantedDids, did} | acc]
183183+ end)
184184+ end
185185+186186+ @spec put_cursor(keyword(), integer() | nil) :: keyword()
187187+ defp put_cursor(params, nil), do: params
188188+189189+ defp put_cursor(params, cursor) when is_integer(cursor), do: [{:cursor, cursor} | params]
190190+191191+ @spec put_max_size(keyword(), integer() | nil) :: keyword()
192192+ defp put_max_size(params, nil), do: params
193193+194194+ defp put_max_size(params, max_size) when is_integer(max_size),
195195+ do: [{:maxMessageSizeBytes, max_size} | params]
196196+197197+ @spec put_require_hello(keyword(), boolean()) :: keyword()
198198+ defp put_require_hello(params, false), do: params
199199+200200+ defp put_require_hello(params, true), do: [{:requireHello, "true"} | params]
201201+end
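Review note: `build_path/1` above relies on `URI.encode_query/1` accepting a keyword list with repeated keys, which is how multiple `wantedCollections` and `wantedDids` values end up in the query string. The sketch below reproduces the idea in isolation. `QuerySketch` and its `put_repeated/3` helper are hypothetical names, and the `Enum.reverse/1` call is an addition that keeps parameters in insertion order (the `put_*` helpers above prepend, so their output order is reversed).

```elixir
defmodule QuerySketch do
  @moduledoc false
  # Hypothetical helper mirroring the put_* functions above; not Drinkup's API.

  @spec build_path([String.t()], integer() | nil) :: String.t()
  def build_path(collections, cursor) do
    [compress: "true"]
    |> put_repeated(:wantedCollections, collections)
    |> put_cursor(cursor)
    # Parameters were prepended, so restore insertion order before encoding.
    |> Enum.reverse()
    |> then(&("/subscribe?" <> URI.encode_query(&1)))
  end

  # Adds one {key, value} pair per value, producing repeated query keys.
  defp put_repeated(params, key, values) do
    Enum.reduce(values, params, fn value, acc -> [{key, value} | acc] end)
  end

  defp put_cursor(params, nil), do: params
  defp put_cursor(params, cursor) when is_integer(cursor), do: [{:cursor, cursor} | params]
end
```

Parameter order should not matter to a server that parses the query string by key, so the reversal is cosmetic and mainly makes the resulting URL easier to read in logs.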
+207
lib/jetstream.ex
···11+defmodule Drinkup.Jetstream do
22+ @moduledoc """
33+ Supervisor for Jetstream event stream connections.
44+55+ Jetstream is a simplified JSON event stream that converts the CBOR-encoded
66+ ATProto Firehose into lightweight, friendly JSON events. It provides zstd
77+ compression and filtering capabilities for collections and DIDs.
88+99+ ## Usage
1010+1111+ Add Jetstream to your supervision tree:
1212+1313+ children = [
1414+ {Drinkup.Jetstream, %{
1515+ consumer: MyJetstreamConsumer,
1616+ name: MyJetstream,
1717+ wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"]
1818+ }}
1919+ ]
2020+2121+ ## Configuration
2222+2323+ See `Drinkup.Jetstream.Options` for all available configuration options.
2424+2525+ ## Dynamic Filter Updates
2626+2727+ You can update filters after the connection is established:
2828+2929+ Drinkup.Jetstream.update_options(MyJetstream, %{
3030+ wanted_collections: ["app.bsky.graph.follow"],
3131+ wanted_dids: ["did:plc:abc123"]
3232+ })
3333+3434+ ## Public Instances
3535+3636+ By default Drinkup connects to `jetstream2.us-east.bsky.network`.
3737+3838+ Bluesky operates a few different Jetstream instances:
3939+ - `jetstream1.us-east.bsky.network`
4040+ - `jetstream2.us-east.bsky.network`
4141+ - `jetstream1.us-west.bsky.network`
4242+ - `jetstream2.us-west.bsky.network`
4444+ There are also some third-party instances not run by Bluesky PBC:
4545+ - `jetstream.fire.hose.cam`
4646+ - `jetstream2.fr.hose.cam`
4747+ - `jetstream1.us-east.fire.hose.cam`
4848+ """
4949+5050+ use Supervisor
5151+ require Logger
5252+ alias Drinkup.Jetstream.Options
5353+5454+ @dialyzer nowarn_function: {:init, 1}
5555+5656+ @impl true
5757+ def init({%Options{name: name} = drinkup_options, supervisor_options}) do
5858+ children = [
5959+ {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}},
6060+ {Drinkup.Jetstream.Socket, drinkup_options}
6161+ ]
6262+6363+ Supervisor.start_link(
6464+ children,
6565+ supervisor_options ++
6666+ [name: {:via, Registry, {Drinkup.Registry, {name, JetstreamSupervisor}}}]
6767+ )
6868+ end
6969+7070+ @spec child_spec(Options.options()) :: Supervisor.child_spec()
7171+ def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
7272+7373+ @spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
7474+ def child_spec({drinkup_options, supervisor_options}) do
7575+ %{
7676+ id: Map.get(drinkup_options, :name, __MODULE__),
7777+ start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
7878+ type: :supervisor,
7979+ restart: :permanent,
8080+ shutdown: 500
8181+ }
8282+ end
8383+8484+ # Options Update API
8585+8686+ @typedoc """
8787+ Options that can be updated dynamically via `update_options/2`.
8888+8989+ - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
9090+ - `:wanted_dids` - List of DIDs to filter (max 10,000)
9191+ - `:max_message_size_bytes` - Maximum message size to receive
9393+ Empty lists disable the corresponding filter (i.e., receive all events).
9494+ """
9595+ @type update_opts :: %{
9696+ optional(:wanted_collections) => [String.t()],
9797+ optional(:wanted_dids) => [String.t()],
9898+ optional(:max_message_size_bytes) => integer()
9999+ }
100100+101101+ @doc """
102102+ Update filters and options for an active Jetstream connection.
103103+104104+ Sends an options update message to the Jetstream server over the websocket
105105+ connection. This allows you to dynamically change which collections and DIDs
106106+ you're interested in without reconnecting.
107107+108108+ ## Parameters
109109+110110+ - `name` - The name of the Jetstream instance (default: `Drinkup.Jetstream`)
111111+ - `opts` - Map with optional fields:
112112+ - `:wanted_collections` - List of collection NSIDs or prefixes (max 100)
113113+ - `:wanted_dids` - List of DIDs to filter (max 10,000)
114114+ - `:max_message_size_bytes` - Maximum message size to receive
115115+116116+ ## Examples
117117+118118+ # Filter to only posts
119119+ Drinkup.Jetstream.update_options(MyJetstream, %{
120120+ wanted_collections: ["app.bsky.feed.post"]
121121+ })
122122+123123+ # Filter to specific DIDs
124124+ Drinkup.Jetstream.update_options(MyJetstream, %{
125125+ wanted_dids: ["did:plc:abc123", "did:plc:def456"]
126126+ })
127127+128128+ # Disable all filters (receive all events)
129129+ Drinkup.Jetstream.update_options(MyJetstream, %{
130130+ wanted_collections: [],
131131+ wanted_dids: []
132132+ })
133133+134134+ ## Return Value
135135+136136+ Returns `:ok` if the message was sent successfully, or `{:error, reason}` if
137137+ the socket process could not be found or the message could not be sent.
138138+139139+ Note: The server may reject invalid updates (e.g., too many collections/DIDs).
140140+ Invalid updates will result in the connection being closed by the server.
141141+ """
142142+ @spec update_options(atom(), update_opts()) :: :ok | {:error, term()}
143143+ def update_options(name \\ Drinkup.Jetstream, opts) when is_map(opts) do
144144+ case find_connection(name) do
145145+ {:ok, {conn, stream}} ->
146146+ message = build_options_update_message(opts)
147147+ :ok = :gun.ws_send(conn, stream, {:text, message})
148148+149149+ Logger.debug("[Drinkup.Jetstream] Sent options update")
150150+ :ok
151151+152152+ {:error, reason} ->
153153+ {:error, reason}
154154+ end
155155+ end
156156+157157+ # Private functions
158158+159159+ @spec find_connection(atom()) :: {:ok, {pid(), :gun.stream_ref()}} | {:error, :not_connected}
160160+ defp find_connection(name) do
161161+ # Look up the connection details from Registry
162162+ case Registry.lookup(Drinkup.Registry, {name, JetstreamConnection}) do
163163+ [{_socket_pid, {conn, stream}}] ->
164164+ {:ok, {conn, stream}}
165165+166166+ [] ->
167167+ {:error, :not_connected}
168168+ end
169169+ end
170170+171171+ @spec build_options_update_message(update_opts()) :: String.t()
172172+ defp build_options_update_message(opts) do
173173+ payload =
174174+ %{}
175175+ |> maybe_add_wanted_collections(Map.get(opts, :wanted_collections))
176176+ |> maybe_add_wanted_dids(Map.get(opts, :wanted_dids))
177177+ |> maybe_add_max_message_size(Map.get(opts, :max_message_size_bytes))
178178+179179+ message = %{
180180+ "type" => "options_update",
181181+ "payload" => payload
182182+ }
183183+184184+ Jason.encode!(message)
185185+ end
186186+187187+ @spec maybe_add_wanted_collections(map(), [String.t()] | nil) :: map()
188188+ defp maybe_add_wanted_collections(payload, nil), do: payload
189189+190190+ defp maybe_add_wanted_collections(payload, collections) when is_list(collections) do
191191+ Map.put(payload, "wantedCollections", collections)
192192+ end
193193+194194+ @spec maybe_add_wanted_dids(map(), [String.t()] | nil) :: map()
195195+ defp maybe_add_wanted_dids(payload, nil), do: payload
196196+197197+ defp maybe_add_wanted_dids(payload, dids) when is_list(dids) do
198198+ Map.put(payload, "wantedDids", dids)
199199+ end
200200+201201+ @spec maybe_add_max_message_size(map(), integer() | nil) :: map()
202202+ defp maybe_add_max_message_size(payload, nil), do: payload
203203+204204+ defp maybe_add_max_message_size(payload, max_size) when is_integer(max_size) do
205205+ Map.put(payload, "maxMessageSizeBytes", max_size)
206206+ end
207207+end
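Review note: the three `maybe_add_*` helpers above all follow the same shape (skip `nil`, otherwise store the value under a camelCase wire key). The sketch below shows the resulting `options_update` message as a plain map, using a single key table instead of three helpers. `UpdatePayloadSketch` is a hypothetical module, and JSON encoding is left out so the example has no dependencies.

```elixir
defmodule UpdatePayloadSketch do
  @moduledoc false
  # Hypothetical refactoring sketch; not Drinkup's implementation.

  # Option key in Elixir => field name sent over the wire.
  @keys [
    wanted_collections: "wantedCollections",
    wanted_dids: "wantedDids",
    max_message_size_bytes: "maxMessageSizeBytes"
  ]

  @spec build(map()) :: map()
  def build(opts) when is_map(opts) do
    payload =
      Enum.reduce(@keys, %{}, fn {key, wire_name}, acc ->
        case Map.get(opts, key) do
          # Absent (or nil) options are left out of the payload entirely.
          nil -> acc
          value -> Map.put(acc, wire_name, value)
        end
      end)

    %{"type" => "options_update", "payload" => payload}
  end
end
```

An empty list is kept in the payload rather than skipped, which matches the documented behaviour that empty filters mean "receive all".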
-22
lib/options.ex
···11-defmodule Drinkup.Options do
22- use TypedStruct
33-44- @default_host "https://bsky.network"
55-66- @type options() :: %{
77- required(:consumer) => module(),
88- optional(:name) => atom(),
99- optional(:host) => String.t(),
1010- optional(:cursor) => pos_integer()
1111- }
1212-1313- typedstruct do
1414- field :consumer, module(), enforce: true
1515- field :name, atom(), default: Drinkup
1616- field :host, String.t(), default: @default_host
1717- field :cursor, pos_integer() | nil
1818- end
1919-2020- @spec from(options()) :: t()
2121- def from(%{consumer: _} = options), do: struct(__MODULE__, options)
2222-end
-85
lib/record_consumer.ex
···11-defmodule Drinkup.RecordConsumer do
22- @moduledoc """
33- An opinionated consumer of the Firehose that eats consumers
44- """
55-66- @callback handle_create(any()) :: any()
77- @callback handle_update(any()) :: any()
88- @callback handle_delete(any()) :: any()
99-1010- defmacro __using__(opts) do
1111- {collections, _opts} = Keyword.pop(opts, :collections, [])
1212-1313- quote location: :keep do
1414- @behaviour Drinkup.Consumer
1515- @behaviour Drinkup.RecordConsumer
1616-1717- def handle_event(%Drinkup.Event.Commit{} = event) do
1818- event.ops
1919- |> Enum.filter(fn %{path: path} ->
2020- path |> String.split("/") |> Enum.at(0) |> matches_collections?()
2121- end)
2222- |> Enum.map(&Drinkup.RecordConsumer.Record.from(&1, event.repo))
2323- |> Enum.each(&apply(__MODULE__, :"handle_#{&1.action}", [&1]))
2424- end
2525-2626- def handle_event(_event), do: :noop
2727-2828- unquote(
2929- if collections == [] do
3030- quote do
3131- def matches_collections?(_type), do: true
3232- end
3333- else
3434- quote do
3535- def matches_collections?(nil), do: false
3636-3737- def matches_collections?(type) when is_binary(type),
3838- do:
3939- Enum.any?(unquote(collections), fn
4040- matcher when is_binary(matcher) -> type == matcher
4141- matcher -> Regex.match?(matcher, type)
4242- end)
4343- end
4444- end
4545- )
4646-4747- @impl true
4848- def handle_create(_record), do: nil
4949- @impl true
5050- def handle_update(_record), do: nil
5151- @impl true
5252- def handle_delete(_record), do: nil
5353-5454- defoverridable handle_create: 1, handle_update: 1, handle_delete: 1
5555- end
5656- end
5757-5858- defmodule Record do
5959- alias Drinkup.Event.Commit.RepoOp
6060- use TypedStruct
6161-6262- typedstruct do
6363- field :type, String.t()
6464- field :rkey, String.t()
6565- field :did, String.t()
6666- field :action, :create | :update | :delete
6767- field :cid, binary() | nil
6868- field :record, map() | nil
6969- end
7070-7171- @spec from(RepoOp.t(), String.t()) :: t()
7272- def from(%RepoOp{action: action, path: path, cid: cid, record: record}, did) do
7373- [type, rkey] = String.split(path, "/")
7474-7575- %__MODULE__{
7676- type: type,
7777- rkey: rkey,
7878- did: did,
7979- action: action,
8080- cid: cid,
8181- record: record
8282- }
8383- end
8484- end
8585-end
+282-98
lib/socket.ex
···11defmodule Drinkup.Socket do
22- @moduledoc """
33- gen_statem process for managing the websocket connection to an ATProto relay.
44- """
22+ # TODO: talk about how to implement, but that it's for internal use
33+ @moduledoc false
5465 require Logger
77- alias Drinkup.{Event, Options}
8697 @behaviour :gen_statem
1010- @timeout :timer.seconds(5)
1111- # TODO: `flow` determines messages in buffer. Determine ideal value?
1212- @flow 10
88+99+ @type frame ::
1010+ {:binary, binary()}
1111+ | {:text, String.t()}
1212+ | :close
1313+ | {:close, errno :: integer(), reason :: binary()}
1414+1515+ @type user_data :: term()
1616+1717+ @type reconnect_strategy ::
1818+ :exponential
1919+ | {:exponential, max_backoff :: pos_integer()}
2020+ | {:custom, (attempt :: pos_integer() -> delay_ms :: pos_integer())}
2121+2222+ @type option ::
2323+ {:host, String.t()}
2424+ | {:flow, pos_integer()}
2525+ | {:timeout, pos_integer()}
2626+ | {:tls_opts, keyword()}
2727+ | {:gun_opts, map()}
2828+ | {:reconnect_strategy, reconnect_strategy()}
2929+ | {atom(), term()}
3030+3131+ @callback init(opts :: keyword()) :: {:ok, user_data()} | {:error, reason :: term()}
3232+3333+ @callback build_path(data :: user_data()) :: String.t()
3434+3535+ @callback handle_frame(
3636+ frame :: frame(),
3737+ data :: {user_data(), conn :: pid() | nil, stream :: :gun.stream_ref() | nil}
3838+ ) ::
3939+ {:ok, new_data :: user_data()} | :noop | nil | {:error, reason :: term()}
4040+4141+ @callback handle_connected(data :: {user_data(), conn :: pid(), stream :: :gun.stream_ref()}) ::
4242+ {:ok, new_data :: user_data()}
4343+4444+ @callback handle_disconnected(
4545+ reason :: term(),
4646+ data :: {user_data(), conn :: pid() | nil, stream :: :gun.stream_ref() | nil}
4747+ ) ::
4848+ {:ok, new_data :: user_data()}
4949+5050+ @optional_callbacks handle_connected: 1, handle_disconnected: 2
5151+5252+ defstruct [
5353+ :module,
5454+ :user_data,
5555+ :options,
5656+ :conn,
5757+ :stream,
5858+ reconnect_attempts: 0
5959+ ]
6060+6161+ defmacro __using__(_opts) do
6262+ quote do
6363+ @behaviour Drinkup.Socket
6464+6565+ def start_link(opts, statem_opts \\ [])
6666+6767+ def start_link(opts, statem_opts) do
6868+ Drinkup.Socket.start_link(__MODULE__, opts, statem_opts)
6969+ end
7070+7171+ defoverridable start_link: 2
7272+7373+ def child_spec(opts) do
7474+ %{
7575+ id: __MODULE__,
7676+ start: {__MODULE__, :start_link, [opts, []]},
7777+ type: :worker,
7878+ restart: :permanent,
7979+ shutdown: 500
8080+ }
8181+ end
8282+8383+ defoverridable child_spec: 1
13841414- @op_regular 1
1515- @op_error -1
8585+ @impl true
8686+ def handle_connected({user_data, _conn, _stream}), do: {:ok, user_data}
16871717- defstruct [:options, :seq, :conn, :stream]
8888+ @impl true
8989+ def handle_disconnected(_reason, {user_data, _conn, _stream}), do: {:ok, user_data}
9090+9191+ defoverridable handle_connected: 1, handle_disconnected: 2
9292+ end
9393+ end
18941995 @impl true
2096 def callback_mode, do: [:state_functions, :state_enter]
21972222- def child_spec(opts) do
2323- %{
2424- id: __MODULE__,
2525- start: {__MODULE__, :start_link, [opts, []]},
2626- type: :worker,
2727- restart: :permanent,
2828- shutdown: 500
2929- }
3030- end
9898+ @doc """
9999+ Start a WebSocket connection process.
311003232- def start_link(%Options{} = options, statem_opts) do
3333- :gen_statem.start_link(__MODULE__, options, statem_opts)
101101+ ## Parameters
102102+103103+ * `module` - The module implementing the Drinkup.Socket behaviour
104104+ * `opts` - Keyword list of options (see module documentation)
105105+ * `statem_opts` - Options passed to `:gen_statem.start_link/3`
106106+ """
107107+ def start_link(module, opts, statem_opts) do
108108+ :gen_statem.start_link(__MODULE__, {module, opts}, statem_opts)
34109 end
3511036111 @impl true
3737- def init(%{cursor: seq} = options) do
3838- data = %__MODULE__{seq: seq, options: options}
3939- {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
112112+ def init({module, opts}) do
113113+ case module.init(opts) do
114114+ {:ok, user_data} ->
115115+ options = parse_options(opts)
116116+117117+ data = %__MODULE__{
118118+ module: module,
119119+ user_data: user_data,
120120+ options: options,
121121+ reconnect_attempts: 0
122122+ }
123123+124124+ {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
125125+126126+ {:error, reason} ->
127127+ {:stop, {:init_failed, reason}}
128128+ end
40129 end
411304242- def disconnected(:enter, _from, data) do
4343- Logger.debug("Initial connection")
4444- # TODO: differentiate between initial & reconnects, probably stuff to do with seq
4545- {:next_state, :disconnected, data}
131131+ # :disconnected state - waiting to connect or reconnect
132132+133133+ def disconnected(:enter, _from, _data) do
134134+ Logger.debug("[Drinkup.Socket] Entering disconnected state")
135135+ :keep_state_and_data
46136 end
4713748138 def disconnected(:internal, :connect, data) do
49139 {:next_state, :connecting_http, data}
50140 end
51141142142+ def disconnected({:timeout, :reconnect}, :reconnect, data) do
143143+ {:next_state, :connecting_http, data}
144144+ end
145145+146146+ # :connecting_http state - establishing HTTP connection with TLS
147147+52148 def connecting_http(:enter, _from, %{options: options} = data) do
5353- Logger.debug("Connecting to http")
149149+ Logger.debug("[Drinkup.Socket] Connecting to HTTP")
5415055151 %{host: host, port: port} = URI.new!(options.host)
561525757- {:ok, conn} =
5858- :gun.open(:binary.bin_to_list(host), port, %{
5959- retry: 0,
6060- protocols: [:http],
6161- connect_timeout: @timeout,
6262- domain_lookup_timeout: @timeout,
6363- tls_handshake_timeout: @timeout,
6464- tls_opts: [
6565- verify: :verify_peer,
6666- cacerts: :certifi.cacerts(),
6767- depth: 3,
6868- customize_hostname_check: [
6969- match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
7070- ]
7171- ]
7272- })
153153+ gun_opts =
154154+ Map.merge(
155155+ %{
156156+ retry: 0,
157157+ protocols: [:http],
158158+ connect_timeout: options.timeout,
159159+ domain_lookup_timeout: options.timeout,
160160+ tls_handshake_timeout: options.timeout,
161161+ tls_opts: options.tls_opts
162162+ },
163163+ options.gun_opts
164164+ )
165165+166166+ case :gun.open(:binary.bin_to_list(host), port, gun_opts) do
167167+ {:ok, conn} ->
168168+ {:keep_state, %{data | conn: conn}, [{:state_timeout, options.timeout, :connect_timeout}]}
731697474- {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
170170+ {:error, reason} ->
171171+ Logger.error("[Drinkup.Socket] Failed to open connection: #{inspect(reason)}")
172172+ {:stop, {:connect_failed, reason}}
173173+ end
75174 end
7617577176 def connecting_http(:info, {:gun_up, _conn, :http}, data) do
78177 {:next_state, :connecting_ws, data}
79178 end
801798181- def connecting_http(:state_timeout, :connect_timeout, _data) do
8282- {:stop, :connect_http_timeout}
180180+ def connecting_http(:state_timeout, :connect_timeout, data) do
181181+ Logger.error("[Drinkup.Socket] HTTP connection timeout")
182182+ trigger_reconnect(data)
83183 end
841848585- def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
8686- Logger.debug("Upgrading connection to websocket")
8787- path = "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(%{cursor: seq})
8888- stream = :gun.ws_upgrade(conn, path, [], %{flow: @flow})
8989- {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
185185+ # :connecting_ws state - upgrading to WebSocket
186186+187187+ def connecting_ws(
188188+ :enter,
189189+ _from,
190190+ %{module: module, user_data: user_data, options: options} = data
191191+ ) do
192192+ Logger.debug("[Drinkup.Socket] Upgrading connection to WebSocket")
193193+194194+ path = module.build_path(user_data)
195195+ stream = :gun.ws_upgrade(data.conn, path, [], %{flow: options.flow})
196196+197197+ {:keep_state, %{data | stream: stream}, [{:state_timeout, options.timeout, :upgrade_timeout}]}
90198 end
9119992200 def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
93201 {:next_state, :connected, data}
94202 end
952039696- def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
9797- {:stop, :connect_ws_timeout}
204204+ def connecting_ws(:info, {:gun_response, _conn, _stream, _fin, status, _headers}, data) do
205205+ Logger.error("[Drinkup.Socket] WebSocket upgrade failed with status: #{status}")
206206+ trigger_reconnect(data)
98207 end
99208100100- def connected(:enter, _from, _data) do
101101- Logger.debug("Connected to websocket")
102102- :keep_state_and_data
209209+ def connecting_ws(:info, {:gun_error, _conn, _stream, reason}, data) do
210210+ Logger.error("[Drinkup.Socket] WebSocket upgrade error: #{inspect(reason)}")
211211+ trigger_reconnect(data)
103212 end
104213105105- def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, %{options: options} = data) do
106106- # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
107107- # Will also need support for JSON frames
108108- with {:ok, header, next} <- CAR.DagCbor.decode(frame),
109109- {:ok, payload, _} <- CAR.DagCbor.decode(next),
110110- {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
111111- true <- Event.valid_seq?(data.seq, payload["seq"]) do
112112- data = %{data | seq: payload["seq"] || data.seq}
113113- message = Event.from(type, payload)
114114- :ok = :gun.update_flow(conn, stream, @flow)
214214+ def connecting_ws(:state_timeout, :upgrade_timeout, data) do
215215+ Logger.error("[Drinkup.Socket] WebSocket upgrade timeout")
216216+ trigger_reconnect(data)
217217+ end
218218+219219+ # :connected state - active WebSocket connection
220220+221221+ def connected(
222222+ :enter,
223223+ _from,
224224+ %{module: module, user_data: user_data, conn: conn, stream: stream} = data
225225+ ) do
226226+ Logger.debug("[Drinkup.Socket] WebSocket connected")
115227116116- case message do
117117- nil ->
118118- Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
228228+ case module.handle_connected({user_data, conn, stream}) do
229229+ {:ok, new_user_data} ->
230230+ {:keep_state, %{data | user_data: new_user_data, reconnect_attempts: 0}}
231231+232232+ _ ->
233233+ {:keep_state, %{data | reconnect_attempts: 0}}
234234+ end
235235+ end
236236+237237+ def connected(
238238+ :info,
239239+ {:gun_ws, conn, _stream, frame},
240240+ %{module: module, user_data: user_data, options: options, conn: conn, stream: stream} =
241241+ data
242242+ ) do
243243+ result = module.handle_frame(frame, {user_data, conn, stream})
119244120120- message ->
121121- Event.dispatch(message, options)
122122- end
245245+ :ok = :gun.update_flow(conn, stream, options.flow)
123246124124- {:keep_state, data}
125125- else
126126- false ->
127127- Logger.error("Got out of sequence or invalid `seq` from Firehose")
128128- {:keep_state, data}
247247+ case result do
248248+ {:ok, new_user_data} ->
249249+ {:keep_state, %{data | user_data: new_user_data}}
129250130130- {%{"op" => @op_error, "t" => type}, payload} ->
131131- Logger.error("Got error from Firehose: #{inspect({type, payload})}")
132132- {:keep_state, data}
251251+ result when result in [:noop, nil] ->
252252+ :keep_state_and_data
133253134254 {:error, reason} ->
135135- Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
136136- {:keep_state, data}
255255+ Logger.error("[Drinkup.Socket] Frame handler error: #{inspect(reason)}")
256256+ :keep_state_and_data
137257 end
138258 end
139259140140- def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
141141- Logger.info("Websocket closed, reason unknown")
142142- {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
260260+ def connected(:info, {:gun_ws, _conn, _stream, :close}, data) do
261261+ Logger.info("[Drinkup.Socket] WebSocket closed by remote")
262262+ trigger_reconnect(data, :remote_close)
143263 end
144264145145- def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
146146- Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
147147- {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
265265+ def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, data) do
266266+ Logger.info("[Drinkup.Socket] WebSocket closed: #{errno} - #{inspect(reason)}")
267267+ trigger_reconnect(data, {:remote_close, errno, reason})
148268 end
149269150270 def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
151271 when old_conn != new_conn do
152152- Logger.debug("Ignoring received :gun_down for a previous connection.")
272272+ Logger.debug("[Drinkup.Socket] Ignoring :gun_down for old connection")
153273 :keep_state_and_data
154274 end
155275156156- def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
157157- Logger.info("Websocket connection killed. Attempting to reconnect")
158158- {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
276276+ def connected(:info, {:gun_down, _conn, _proto, reason, _killed_streams}, data) do
277277+ Logger.info("[Drinkup.Socket] Connection down: #{inspect(reason)}")
278278+ trigger_reconnect(data, {:connection_down, reason})
159279 end
160280161161- def connected(:internal, :reconnect, %{conn: conn} = data) do
281281+ def connected(
282282+ :internal,
283283+ :reconnect,
284284+ %{conn: conn, options: options, reconnect_attempts: attempts} = data
285285+ ) do
162286 :ok = :gun.close(conn)
163287 :ok = :gun.flush(conn)
164288165165- # TODO: reconnect backoff
166166- {:next_state, :disconnected, %{data | conn: nil, stream: nil},
167167- [{:next_event, :internal, :connect}]}
289289+ backoff = calculate_backoff(attempts, options.reconnect_strategy)
290290+291291+ Logger.info("[Drinkup.Socket] Reconnecting in #{backoff}ms (attempt #{attempts + 1})")
292292+293293+ {:next_state, :disconnected,
294294+ %{data | conn: nil, stream: nil, reconnect_attempts: attempts + 1},
295295+ [{{:timeout, :reconnect}, backoff, :reconnect}]}
296296+ end
297297+298298+ # Helper functions
299299+300300+ defp trigger_reconnect(data, reason \\ :unknown) do
301301+ %{module: module, user_data: user_data, conn: conn, stream: stream} = data
302302+303303+ case module.handle_disconnected(reason, {user_data, conn, stream}) do
304304+ {:ok, new_user_data} ->
305305+ {:keep_state, %{data | user_data: new_user_data}, [{:next_event, :internal, :reconnect}]}
306306+307307+ _ ->
308308+ {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
309309+ end
310310+ end
311311+312312+ defp parse_options(opts) do
313313+ %{
314314+ host: Keyword.fetch!(opts, :host),
315315+ flow: Keyword.get(opts, :flow, 10),
316316+ timeout: Keyword.get(opts, :timeout, :timer.seconds(5)),
317317+ tls_opts: Keyword.get(opts, :tls_opts, default_tls_opts()),
318318+ gun_opts: Keyword.get(opts, :gun_opts, %{}),
319319+ reconnect_strategy: Keyword.get(opts, :reconnect_strategy, :exponential)
320320+ }
321321+ end
322322+323323+ defp default_tls_opts do
324324+ [
325325+ verify: :verify_peer,
326326+ cacerts: :certifi.cacerts(),
327327+ depth: 3,
328328+ customize_hostname_check: [
329329+ match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
330330+ ]
331331+ ]
332332+ end
333333+334334+ defp calculate_backoff(attempt, strategy) do
335335+ case strategy do
336336+ :exponential ->
337337+ exponential_backoff(attempt, :timer.seconds(60))
338338+339339+ {:exponential, max_backoff} ->
340340+ exponential_backoff(attempt, max_backoff)
341341+342342+ {:custom, func} when is_function(func, 1) ->
343343+ func.(attempt)
344344+ end
345345+ end
346346+347347+ defp exponential_backoff(attempt, max_backoff) do
348348+ base = :timer.seconds(1)
349349+ delay = min(base * :math.pow(2, attempt), max_backoff)
350350+ jitter = :rand.uniform(max(trunc(delay * 0.1), 1))
351351+ trunc(delay) + jitter
168352 end
169353end
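The reconnect delay computed by `exponential_backoff/2` above can be tried in isolation. The following is a minimal sketch, not the module's actual code: `BackoffSketch` and `delay/2` are hypothetical names, and it swaps in integer arithmetic (`Integer.pow/2`) plus a floor of 1 on the jitter range, so that a very small cap cannot pass 0 to `:rand.uniform/1`.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper for illustration only; not part of Drinkup.
  # Base delay of one second, doubled on every attempt.
  @base_ms 1_000

  @spec delay(non_neg_integer(), pos_integer()) :: pos_integer()
  def delay(attempt, max_backoff_ms)
      when is_integer(attempt) and attempt >= 0 and
             is_integer(max_backoff_ms) and max_backoff_ms > 0 do
    # Cap the exponential growth at the configured maximum.
    capped = min(@base_ms * Integer.pow(2, attempt), max_backoff_ms)

    # Jitter of up to 10% of the capped delay; :rand.uniform/1 needs N >= 1.
    jitter_range = max(div(capped, 10), 1)

    capped + :rand.uniform(jitter_range)
  end
end
```

With these assumptions, `BackoffSketch.delay(3, 60_000)` falls between 8_001 and 8_800 ms: an 8_000 ms base plus up to 10% jitter. Note that the jitter is added after capping, so the result can exceed the cap by up to 10%.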
+46
lib/tap/consumer.ex
···11+defmodule Drinkup.Tap.Consumer do
22+ @moduledoc """
33+ Consumer behaviour for handling Tap events.
44+55+ Implement this behaviour to process events from a Tap indexer/backfill service.
66+ Events are dispatched asynchronously via `Task.Supervisor` and acknowledged
77+ to Tap based on the return value of `handle_event/1`.
88+99+ ## Event Acknowledgment
1010+1111+ By default, events are acknowledged to Tap based on your return value:
1212+1313+ - `:ok`, `{:ok, any()}`, or `nil` → Success, event is acked to Tap
1414+ - `{:error, reason}` → Failure, event is NOT acked (Tap will retry after timeout)
1515+ - Exception raised → Failure, event is NOT acked (Tap will retry after timeout)
1616+1717+ Any other value will log a warning and acknowledge the event anyway.
1818+1919+ If you set `disable_acks: true` in your Tap options, no acks are sent regardless
2020+ of the return value. This matches Tap's `TAP_DISABLE_ACKS` environment variable.
2121+2222+ ## Example
2323+2424+ defmodule MyTapConsumer do
2525+ @behaviour Drinkup.Tap.Consumer
2626+2727+ def handle_event(%Drinkup.Tap.Event.Record{action: :create} = record) do
2828+ # Handle new record creation
2929+ case save_to_database(record) do
3030+ :ok -> :ok # Success - event will be acked
3131+ {:error, reason} -> {:error, reason} # Failure - Tap will retry
3232+ end
3333+ end
3434+3535+ def handle_event(%Drinkup.Tap.Event.Identity{} = identity) do
3636+ # Handle identity changes
3737+ update_identity(identity)
3838+ :ok # Success - event will be acked
3939+ end
4040+ end
4141+ """
4242+4343+ alias Drinkup.Tap.Event
4444+4545+ @callback handle_event(Event.Record.t() | Event.Identity.t()) :: any()
4646+end
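The acknowledgment rules in the moduledoc above reduce to a small decision table. Below is a minimal sketch of that table as a pure function. `AckPolicySketch` and `decide/1` are hypothetical names (Drinkup does not expose such a helper), and the bare `:error` clause mirrors the matching clause in `Drinkup.Tap.Event.dispatch/4` from this change.

```elixir
defmodule AckPolicySketch do
  # Hypothetical helper for illustration only; not part of Drinkup.
  @type decision :: :ack | :no_ack | {:ack_with_warning, term()}

  @spec decide(term()) :: decision()
  # Success values: the event is acknowledged.
  def decide(:ok), do: :ack
  def decide({:ok, _value}), do: :ack
  def decide(nil), do: :ack

  # Failure values: no ack is sent, so Tap retries after its timeout.
  def decide(:error), do: :no_ack
  def decide({:error, _reason}), do: :no_ack

  # Anything else is acknowledged, but the caller should log a warning.
  def decide(other), do: {:ack_with_warning, other}
end
```

A consumer test could assert on this table directly, for example that `AckPolicySketch.decide({:error, :db_down})` yields `:no_ack`.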
+39
lib/tap/event/identity.ex
···11+defmodule Drinkup.Tap.Event.Identity do
22+ @moduledoc """
33+ Struct for identity events from Tap.
44+55+ Represents handle or status changes for a DID.
66+ """
77+88+ use TypedStruct
99+1010+ typedstruct enforce: true do
1111+ field :id, integer()
1212+ field :did, String.t()
1313+ field :handle, String.t() | nil
1414+ field :is_active, boolean()
1515+ field :status, String.t()
1616+ end
1717+1818+ @spec from(map()) :: t()
1919+ def from(%{
2020+ "id" => id,
2121+ "type" => "identity",
2222+ "identity" =>
2323+ %{
2424+ "did" => did,
2525+ "is_active" => is_active,
2626+ "status" => status
2727+ } = identity_data
2828+ }) do
2929+ handle = Map.get(identity_data, "handle")
3030+3131+ %__MODULE__{
3232+ id: id,
3333+ did: did,
3434+ handle: handle,
3535+ is_active: is_active,
3636+ status: status
3737+ }
3838+ end
3939+end
+58
lib/tap/event/record.ex
···11+defmodule Drinkup.Tap.Event.Record do
22+ @moduledoc """
33+ Struct for record events from Tap.
44+55+ Represents create, update, or delete operations on records in the repository.
66+ """
77+88+ use TypedStruct
99+1010+ typedstruct enforce: true do
1111+ @type action() :: :create | :update | :delete
1212+1313+ field :id, integer()
1414+ field :live, boolean()
1515+ field :rev, String.t()
1616+ field :did, String.t()
1717+ field :collection, String.t()
1818+ field :rkey, String.t()
1919+ field :action, action()
2020+ field :cid, String.t() | nil
2121+ field :record, map() | nil
2222+ end
2323+2424+ @spec from(map()) :: t()
2525+ def from(%{
2626+ "id" => id,
2727+ "type" => "record",
2828+ "record" =>
2929+ %{
3030+ "live" => live,
3131+ "rev" => rev,
3232+ "did" => did,
3333+ "collection" => collection,
3434+ "rkey" => rkey,
3535+ "action" => action
3636+ } = record_data
3737+ }) do
3838+ cid = Map.get(record_data, "cid")
3939+ record = Map.get(record_data, "record")
4040+4141+ %__MODULE__{
4242+ id: id,
4343+ live: live,
4444+ rev: rev,
4545+ did: did,
4646+ collection: collection,
4747+ rkey: rkey,
4848+ action: parse_action(action),
4949+ cid: cid,
5050+ record: record
5151+ }
5252+ end
5353+5454+ @spec parse_action(String.t()) :: action()
5555+ defp parse_action("create"), do: :create
5656+ defp parse_action("update"), do: :update
5757+ defp parse_action("delete"), do: :delete
5858+end
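`parse_action/1` above has clauses only for the three known actions, so any other string raises a `FunctionClauseError` inside `from/1`. If a caller prefers a tagged result instead, one option is sketched below. `ActionSketch` and the `{:error, {:unknown_action, _}}` shape are assumptions made for illustration, not part of Drinkup.

```elixir
defmodule ActionSketch do
  # Hypothetical helper for illustration only; not part of Drinkup.
  @type action :: :create | :update | :delete

  @spec parse(term()) :: {:ok, action()} | {:error, {:unknown_action, term()}}
  def parse("create"), do: {:ok, :create}
  def parse("update"), do: {:ok, :update}
  def parse("delete"), do: {:ok, :delete}

  # Unknown input is returned as a tagged error instead of raising.
  def parse(other), do: {:error, {:unknown_action, other}}
end
```

Whether raising or returning an error tuple is preferable depends on how strictly the Tap payload format should be enforced.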
+105
lib/tap/event.ex
···11+defmodule Drinkup.Tap.Event do
22+ @moduledoc """
33+ Event handling and dispatch for Tap events.
44+55+ Parses incoming JSON events from Tap and dispatches them to the configured
66+ consumer via Task.Supervisor. After successful processing, sends an ack
77+ message back to the socket.
88+ """
99+1010+ require Logger
1111+ alias Drinkup.Tap.{Event, Options}
1212+1313+ @type t() :: Event.Record.t() | Event.Identity.t()
1414+1515+ @doc """
1616+ Parse a JSON map into an event struct.
1717+1818+ Returns the appropriate event struct based on the "type" field.
1919+ """
2020+ @spec from(map()) :: t() | nil
2121+ def from(%{"type" => "record"} = payload), do: Event.Record.from(payload)
2222+ def from(%{"type" => "identity"} = payload), do: Event.Identity.from(payload)
2323+ def from(_payload), do: nil
2424+2525+ @doc """
2626+ Dispatch an event to the consumer via Task.Supervisor.
2727+2828+ Spawns a task that:
2929+ 1. Processes the event via the consumer's handle_event/1 callback
3030+ 2. Sends an ack to Tap if acks are enabled and the consumer returns :ok, {:ok, _}, or nil
3131+ 3. Does not ack if the consumer returns `:error` or `{:error, _}`, or raises an exception
3232+3333+ Consumer return value semantics (when acks are enabled):
3434+ - `:ok` or `{:ok, any()}` or `nil` -> Success, send ack
3535+ - `:error` or `{:error, _}` -> Failure, don't ack (Tap will retry)
3636+ - Exception raised -> Failure, don't ack (Tap will retry)
3737+3838+ If `disable_acks: true` is set in options, no acks are sent regardless of
3939+ consumer return value.
4040+ """
4141+ @spec dispatch(t(), Options.t(), pid(), :gun.stream_ref()) :: :ok
4242+ def dispatch(
4343+ event,
4444+ %Options{consumer: consumer, name: name, disable_acks: disable_acks},
4545+ conn,
4646+ stream
4747+ ) do
4848+ supervisor_name = {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}
4949+ event_id = get_event_id(event)
5050+5151+ {:ok, _pid} =
5252+ Task.Supervisor.start_child(supervisor_name, fn ->
5353+ try do
5454+ result = consumer.handle_event(event)
5555+5656+ unless disable_acks do
5757+ case result do
5858+ :ok ->
5959+ send_ack(conn, stream, event_id)
6060+6161+ {:ok, _} ->
6262+ send_ack(conn, stream, event_id)
6363+6464+ nil ->
6565+ send_ack(conn, stream, event_id)
6666+6767+ :error ->
6868+ Logger.error("Consumer returned error for event #{event_id}, not acking.")
6969+7070+ {:error, reason} ->
7171+ Logger.error(
7272+ "Consumer returned error for event #{event_id}, not acking: #{inspect(reason)}"
7373+ )
7474+7575+ _ ->
7676+ Logger.warning(
7777+ "Consumer returned unexpected value for event #{event_id}, acking anyway: #{inspect(result)}"
7878+ )
7979+8080+ send_ack(conn, stream, event_id)
8181+ end
8282+ end
8383+ rescue
8484+ e ->
8585+ Logger.error(
8686+ "Error in Tap event handler (event #{event_id}), not acking: #{Exception.format(:error, e, __STACKTRACE__)}"
8787+ )
8888+ end
8989+ end)
9090+9191+ :ok
9292+ end
9393+9494+ @spec send_ack(pid(), :gun.stream_ref(), integer()) :: :ok
9595+ defp send_ack(conn, stream, event_id) do
9696+ ack_message = Jason.encode!(%{type: "ack", id: event_id})
9797+9898+ :ok = :gun.ws_send(conn, stream, {:text, ack_message})
9999+ Logger.debug("[Drinkup.Tap] Acked event #{event_id}")
100100+ end
101101+102102+ @spec get_event_id(t()) :: integer()
103103+ defp get_event_id(%Event.Record{id: id}), do: id
104104+ defp get_event_id(%Event.Identity{id: id}), do: id
105105+end
+90
lib/tap/options.ex
···11+defmodule Drinkup.Tap.Options do
22+ @moduledoc """
33+ Configuration options for Tap indexer/backfill service connection.
44+55+ This module defines the configuration structure for connecting to and
66+ interacting with a Tap service. Tap simplifies AT Protocol sync by handling
77+ firehose connections, verification, backfill, and filtering server-side.
88+99+ ## Options
1010+1111+ - `:consumer` (required) - Module implementing `Drinkup.Tap.Consumer` behaviour
1212+ - `:name` - Unique name for this Tap instance in the supervision tree (default: `Drinkup.Tap`)
1313+ - `:host` - Tap service URL (default: `"http://localhost:2480"`)
1414+ - `:admin_password` - Optional password for authenticated Tap instances
1515+ - `:disable_acks` - Disable event acknowledgments (default: `false`)
1616+1717+ ## Example
1818+1919+ %{
2020+ consumer: MyTapConsumer,
2121+ name: MyTap,
2222+ host: "http://localhost:2480",
2323+ admin_password: "secret",
2424+ disable_acks: false
2525+ }
2626+ """
2727+2828+ use TypedStruct
2929+3030+ @default_host "http://localhost:2480"
3131+3232+ @typedoc """
3333+ Map of configuration options accepted by `Drinkup.Tap.child_spec/1`.
3434+ """
3535+ @type options() :: %{
3636+ required(:consumer) => consumer(),
3737+ optional(:name) => name(),
3838+ optional(:host) => host(),
3939+ optional(:admin_password) => admin_password(),
4040+ optional(:disable_acks) => disable_acks()
4141+ }
4242+4343+ @typedoc """
4444+ Module implementing the `Drinkup.Tap.Consumer` behaviour.
4545+ """
4646+ @type consumer() :: module()
4747+4848+ @typedoc """
4949+ Unique identifier for this Tap instance in the supervision tree.
5050+5151+ Used for Registry lookups and naming child processes.
5252+ """
5353+ @type name() :: atom()
5454+5555+ @typedoc """
5656+ HTTP/HTTPS URL of the Tap service.
5757+5858+ Defaults to `"http://localhost:2480"` which is Tap's default bind address.
5959+ """
6060+ @type host() :: String.t()
6161+6262+ @typedoc """
6363+ Optional password for HTTP Basic authentication.
6464+6565+ Required when connecting to a Tap service configured with `TAP_ADMIN_PASSWORD`.
6666+ The credentials `admin:<password>` are Base64-encoded and sent as `Basic <encoded>` in the Authorization header.
6767+ """
6868+ @type admin_password() :: String.t() | nil
6969+7070+ @typedoc """
7171+ Whether to disable event acknowledgments.
7272+7373+ When `true`, events are not acknowledged to Tap regardless of consumer
7474+ return values. This matches Tap's `TAP_DISABLE_ACKS` environment variable.
7575+7676+ Defaults to `false` (acknowledgments enabled).
7777+ """
7878+ @type disable_acks() :: boolean()
7979+8080+ typedstruct do
8181+ field :consumer, consumer(), enforce: true
8282+ field :name, name(), default: Drinkup.Tap
8383+ field :host, host(), default: @default_host
8484+ field :admin_password, admin_password()
8585+ field :disable_acks, disable_acks(), default: false
8686+ end
8787+8888+ @spec from(options()) :: t()
8989+ def from(%{consumer: _} = options), do: struct(__MODULE__, options)
9090+end
+100
lib/tap/socket.ex
···11+defmodule Drinkup.Tap.Socket do
22+ @moduledoc """
33+ WebSocket connection handler for Tap indexer/backfill service.
44+55+ Implements the Drinkup.Socket behaviour to manage connections to a Tap service,
66+ handling JSON-encoded events and dispatching them to the configured consumer.
77+88+ Events are acknowledged after successful processing based on the consumer's
99+ return value:
1010+ - `:ok`, `{:ok, any()}`, or `nil` → Success, ack sent to Tap
1111+ - `{:error, reason}` → Failure, no ack (Tap will retry after timeout)
1212+ - Exception raised → Failure, no ack (Tap will retry after timeout)
1313+ """
1414+1515+ use Drinkup.Socket
1616+1717+ require Logger
1818+ alias Drinkup.Tap.{Event, Options}
1919+2020+ @impl true
2121+ def init(opts) do
2222+ options = Keyword.fetch!(opts, :options)
2323+ {:ok, %{options: options, host: options.host}}
2424+ end
2525+2626+ def start_link(%Options{} = options, statem_opts) do
2727+ socket_opts = build_socket_opts(options)
2828+ Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
2929+ end
3030+3131+ @impl true
3232+ def build_path(_data) do
3333+ "/channel"
3434+ end
3535+3636+ @impl true
3737+ def handle_frame({:text, json}, {%{options: options} = data, conn, stream}) do
3838+ case Jason.decode(json) do
3939+ {:ok, payload} ->
4040+ case Event.from(payload) do
4141+ nil ->
4242+ Logger.warning("Received unrecognized event from Tap: #{inspect(payload)}")
4343+ :noop
4444+4545+ event ->
4646+ Event.dispatch(event, options, conn, stream)
4747+ {:ok, data}
4848+ end
4949+5050+ {:error, reason} ->
5151+ Logger.error("Failed to decode JSON from Tap: #{inspect(reason)}")
5252+ :noop
5353+ end
5454+ end
5555+5656+ @impl true
5757+ def handle_frame({:binary, _binary}, _data) do
5858+ Logger.warning("Received unexpected binary frame from Tap")
5959+ :noop
6060+ end
6161+6262+ @impl true
6363+ def handle_frame(:close, _data) do
6464+ Logger.info("Websocket closed, reason unknown")
6565+ nil
6666+ end
6767+6868+ @impl true
6969+ def handle_frame({:close, errno, reason}, _data) do
7070+ Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
7171+ nil
7272+ end
7373+7474+ defp build_socket_opts(%Options{host: host, admin_password: admin_password} = options) do
7575+ base_opts = [
7676+ host: host,
7777+ options: options
7878+ ]
7979+8080+ if admin_password do
8181+ auth_header = build_auth_header(admin_password)
8282+8383+ gun_opts = %{
8484+ ws_opts: %{
8585+ headers: [{"authorization", auth_header}]
8686+ }
8787+ }
8888+8989+ Keyword.put(base_opts, :gun_opts, gun_opts)
9090+ else
9191+ base_opts
9292+ end
9393+ end
9494+9595+ @spec build_auth_header(String.t()) :: String.t()
9696+ defp build_auth_header(password) do
9797+ credentials = "admin:#{password}"
9898+ "Basic #{Base.encode64(credentials)}"
9999+ end
100100+end
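The header built by `build_auth_header/1` above follows the standard HTTP Basic scheme: the string `user:password` is Base64-encoded and prefixed with `Basic `. Below is a minimal standalone sketch using only the standard library. `BasicAuthSketch` is a hypothetical module name, and the password `"secret"` is a placeholder.

```elixir
defmodule BasicAuthSketch do
  # Hypothetical helper for illustration only; not part of Drinkup.
  @spec header(String.t(), String.t()) :: {String.t(), String.t()}
  def header(user, password) when is_binary(user) and is_binary(password) do
    # Basic auth encodes "user:password" as Base64.
    {"authorization", "Basic " <> Base.encode64(user <> ":" <> password)}
  end
end

# Round-trip check: decoding the header value recovers the credentials.
{"authorization", "Basic " <> encoded} = BasicAuthSketch.header("admin", "secret")
{:ok, "admin:secret"} = Base.decode64(encoded)
```

Base64 is an encoding, not encryption, so this header should only travel over TLS when the Tap service is not on localhost.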
+249
lib/tap.ex
···11+defmodule Drinkup.Tap do
22+ @moduledoc """
33+ Supervisor and HTTP API for Tap indexer/backfill service.
44+55+ Tap simplifies AT sync by handling the firehose connection, verification,
66+ backfill, and filtering. Your application connects to a Tap service and
77+ receives simple JSON events for only the repos and collections you care about.
88+99+ ## Usage
1010+1111+ Add Tap to your supervision tree:
1212+1313+ children = [
1414+ {Drinkup.Tap, %{
1515+ consumer: MyTapConsumer,
1616+ name: MyTap,
1717+ host: "http://localhost:2480",
1818+ admin_password: "secret" # optional
1919+ }}
2020+ ]
2121+2222+ Then interact with the Tap HTTP API:
2323+2424+ # Add repos to track (triggers backfill)
2525+ Drinkup.Tap.add_repos(MyTap, ["did:plc:abc123"])
2626+2727+ # Get stats
2828+ {:ok, count} = Drinkup.Tap.get_repo_count(MyTap)
2929+3030+ ## Configuration
3131+3232+ Tap itself is configured via environment variables. See the Tap documentation
3333+ for details on configuring collection filters, signal collections, and other
3434+ operational settings:
3535+ https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md
3636+ """
3737+3838+ use Supervisor
3939+ alias Drinkup.Tap.Options
4040+4141+ @dialyzer nowarn_function: {:init, 1}
4242+ @impl true
4343+ def init({%Options{name: name} = drinkup_options, supervisor_options}) do
4444+ # Register options in Registry for HTTP API access
4545+ Registry.register(Drinkup.Registry, {name, TapOptions}, drinkup_options)
4646+4747+ children = [
4848+ {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, TapTasks}}}},
4949+ {Drinkup.Tap.Socket, drinkup_options}
5050+ ]
5151+5252+ Supervisor.start_link(
5353+ children,
5454+ supervisor_options ++ [name: {:via, Registry, {Drinkup.Registry, {name, TapSupervisor}}}]
5555+ )
5656+ end
5757+5858+ @spec child_spec(Options.options()) :: Supervisor.child_spec()
5959+ def child_spec(%{} = options), do: child_spec({options, [strategy: :one_for_one]})
6060+6161+ @spec child_spec({Options.options(), Keyword.t()}) :: Supervisor.child_spec()
6262+ def child_spec({drinkup_options, supervisor_options}) do
6363+ %{
6464+ id: Map.get(drinkup_options, :name, __MODULE__),
6565+ start: {__MODULE__, :init, [{Options.from(drinkup_options), supervisor_options}]},
6666+ type: :supervisor,
6767+ restart: :permanent,
6868+ shutdown: 500
6969+ }
7070+ end
7171+7272+ # HTTP API Functions
7373+7474+ @doc """
7575+ Add DIDs to track.
7676+7777+ Triggers backfill for the specified DIDs. Historical events will be fetched
7878+ from each repo's PDS, followed by live events from the firehose.
7979+ """
8080+ @spec add_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
8181+ def add_repos(name \\ Drinkup.Tap, dids) when is_list(dids) do
8282+ with {:ok, options} <- get_options(name),
8383+ {:ok, response} <- make_request(options, :post, "/repos/add", %{dids: dids}) do
8484+ {:ok, response}
8585+ end
8686+ end
8787+8888+ @doc """
8989+ Remove DIDs from tracking.
9090+9191+ Stops syncing the specified repos and deletes tracked repo metadata. Does not
9292+ delete buffered events in the outbox.
9393+ """
9494+ @spec remove_repos(atom(), [String.t()]) :: {:ok, term()} | {:error, term()}
9595+ def remove_repos(name \\ Drinkup.Tap, dids) when is_list(dids) do
9696+ with {:ok, options} <- get_options(name),
9797+ {:ok, response} <- make_request(options, :post, "/repos/remove", %{dids: dids}) do
9898+ {:ok, response}
9999+ end
100100+ end
101101+102102+ @doc """
103103+ Resolve a DID to its DID document.
104104+ """
105105+ @spec resolve_did(atom(), String.t()) :: {:ok, term()} | {:error, term()}
106106+ def resolve_did(name \\ Drinkup.Tap, did) when is_binary(did) do
107107+ with {:ok, options} <- get_options(name),
108108+ {:ok, response} <- make_request(options, :get, "/resolve/#{did}") do
109109+ {:ok, response}
110110+ end
111111+ end
112112+113113+ @doc """
114114+ Get info about a tracked repo.
115115+116116+ Returns repo state, repo rev, record count, error info, and retry count.
117117+ """
118118+ @spec get_repo_info(atom(), String.t()) :: {:ok, term()} | {:error, term()}
119119+ def get_repo_info(name \\ Drinkup.Tap, did) when is_binary(did) do
120120+ with {:ok, options} <- get_options(name),
121121+ {:ok, response} <- make_request(options, :get, "/info/#{did}") do
122122+ {:ok, response}
123123+ end
124124+ end
125125+126126+ @doc """
127127+ Get the total number of tracked repos.
128128+ """
129129+ @spec get_repo_count(atom()) :: {:ok, integer()} | {:error, term()}
130130+ def get_repo_count(name \\ Drinkup.Tap) do
131131+ with {:ok, options} <- get_options(name),
132132+ {:ok, response} <- make_request(options, :get, "/stats/repo-count") do
133133+ {:ok, response}
134134+ end
135135+ end
136136+137137+ @doc """
138138+ Get the total number of tracked records.
139139+ """
140140+ @spec get_record_count(atom()) :: {:ok, integer()} | {:error, term()}
141141+ def get_record_count(name \\ Drinkup.Tap) do
142142+ with {:ok, options} <- get_options(name),
143143+ {:ok, response} <- make_request(options, :get, "/stats/record-count") do
144144+ {:ok, response}
145145+ end
146146+ end
147147+148148+ @doc """
149149+ Get the number of events in the outbox buffer.
150150+ """
151151+ @spec get_outbox_buffer(atom()) :: {:ok, integer()} | {:error, term()}
152152+ def get_outbox_buffer(name \\ Drinkup.Tap) do
153153+ with {:ok, options} <- get_options(name),
154154+ {:ok, response} <- make_request(options, :get, "/stats/outbox-buffer") do
155155+ {:ok, response}
156156+ end
157157+ end
158158+159159+ @doc """
160160+ Get the number of events in the resync buffer.
161161+ """
162162+ @spec get_resync_buffer(atom()) :: {:ok, integer()} | {:error, term()}
163163+ def get_resync_buffer(name \\ Drinkup.Tap) do
164164+ with {:ok, options} <- get_options(name),
165165+ {:ok, response} <- make_request(options, :get, "/stats/resync-buffer") do
166166+ {:ok, response}
167167+ end
168168+ end
169169+170170+ @doc """
171171+ Get current firehose and list repos cursors.
172172+ """
173173+ @spec get_cursors(atom()) :: {:ok, map()} | {:error, term()}
174174+ def get_cursors(name \\ Drinkup.Tap) do
175175+ with {:ok, options} <- get_options(name),
176176+ {:ok, response} <- make_request(options, :get, "/stats/cursors") do
177177+ {:ok, response}
178178+ end
179179+ end
180180+181181+ @doc """
182182+ Check Tap health status.
183183+184184+ Returns `{:ok, %{"status" => "ok"}}` if healthy.
185185+ """
186186+ @spec health(atom()) :: {:ok, map()} | {:error, term()}
187187+ def health(name \\ Drinkup.Tap) do
188188+ with {:ok, options} <- get_options(name),
189189+ {:ok, response} <- make_request(options, :get, "/health") do
190190+ {:ok, response}
191191+ end
192192+ end
193193+194194+ # Private Functions
195195+196196+ @spec get_options(atom()) :: {:ok, Options.t()} | {:error, :not_found}
197197+ defp get_options(name) do
198198+ case Registry.lookup(Drinkup.Registry, {name, TapOptions}) do
199199+ [{_pid, options}] -> {:ok, options}
200200+ [] -> {:error, :not_found}
201201+ end
202202+ end
203203+204204+ @spec make_request(Options.t(), atom(), String.t(), map() | nil) ::
205205+ {:ok, term()} | {:error, term()}
206206+ defp make_request(options, method, path, body \\ nil) do
207207+ url = build_url(options.host, path)
208208+ headers = build_headers(options.admin_password)
209209+210210+ request_opts = [
211211+ method: method,
212212+ url: url,
213213+ headers: headers
214214+ ]
215215+216216+ request_opts =
217217+ if body do
218218+ Keyword.merge(request_opts, json: body)
219219+ else
220220+ request_opts
221221+ end
222222+223223+ case Req.request(request_opts) do
224224+ {:ok, %{status: status, body: body}} when status in 200..299 ->
225225+ {:ok, body}
226226+227227+ {:ok, %{status: status, body: body}} ->
228228+ {:error, {:http_error, status, body}}
229229+230230+ {:error, reason} ->
231231+ {:error, reason}
232232+ end
233233+ end
234234+235235+ @spec build_url(String.t(), String.t()) :: String.t()
236236+ defp build_url(host, path) do
237237+ host = String.trim_trailing(host, "/")
238238+ "#{host}#{path}"
239239+ end
240240+241241+ @spec build_headers(String.t() | nil) :: list()
242242+ defp build_headers(nil), do: []
243243+244244+ defp build_headers(admin_password) do
245245+ credentials = "admin:#{admin_password}"
246246+ auth_header = "Basic #{Base.encode64(credentials)}"
247247+ [{"authorization", auth_header}]
248248+ end
249249+end
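The URL joining and status handling in `make_request/4` above can be exercised without a running Tap service. Below is a minimal sketch that works on plain tuples and maps rather than real `Req` responses. `TapHttpSketch` and its function names are hypothetical and exist only for illustration.

```elixir
defmodule TapHttpSketch do
  # Hypothetical helper for illustration only; not part of Drinkup.

  # Join a host and an absolute path, tolerating a trailing slash on the host.
  @spec build_url(String.t(), String.t()) :: String.t()
  def build_url(host, "/" <> _rest = path) when is_binary(host) do
    String.trim_trailing(host, "/") <> path
  end

  # Map a response-shaped tuple onto the {:ok, body} | {:error, reason} contract.
  @spec classify({:ok, map()} | {:error, term()}) :: {:ok, term()} | {:error, term()}
  def classify({:ok, %{status: status, body: body}}) when status in 200..299, do: {:ok, body}
  def classify({:ok, %{status: status, body: body}}), do: {:error, {:http_error, status, body}}
  def classify({:error, reason}), do: {:error, reason}
end
```

For example, `TapHttpSketch.build_url("http://localhost:2480/", "/health")` gives `"http://localhost:2480/health"`, and a 401 response is classified as `{:error, {:http_error, 401, body}}`.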