Elixir ATProtocol firehose & subscription listener

feat: initial proof of concept

ovyerus.com 7cb3b53a 3df61b08

verified
+2 -1
.formatter.exs
··· 1 1 # Used by "mix format" 2 2 [ 3 - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] 3 + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], 4 + import_deps: [:typedstruct] 4 5 ]
+68 -12
lib/drinkup.ex
··· 1 - defmodule Drinkup do 2 - @moduledoc """ 3 - Documentation for `Drinkup`. 4 - """ 1 + defmodule Drinkup.Firehose do 2 + alias Drinkup.Firehose 3 + require Logger 5 4 6 - @doc """ 7 - Hello world. 5 + use WebSockex 8 6 9 - ## Examples 7 + @default_host "https://bsky.network" 10 8 11 - iex> Drinkup.hello() 12 - :world 9 + @op_regular 1 10 + @op_error -1 13 11 14 - """ 15 - def hello do 16 - :world 12 + # TODO: switch to Gun and GenServer? 13 + 14 + def start_link(opts \\ []) do 15 + opts = Keyword.validate!(opts, host: @default_host) 16 + host = Keyword.get(opts, :host) 17 + cursor = Keyword.get(opts, :cursor) 18 + 19 + url = 20 + "#{host}/xrpc/com.atproto.sync.subscribeRepos" 21 + |> URI.new!() 22 + |> URI.append_query(URI.encode_query(%{cursor: cursor})) 23 + |> URI.to_string() 24 + 25 + WebSockex.start_link(url, __MODULE__, %{cursor: cursor}) 17 26 end 27 + 28 + def handle_connect(conn, state) do 29 + Logger.info("Connected to Firehose at #{conn.host}#{conn.path}") 30 + {:ok, state} 31 + end 32 + 33 + def handle_frame({:binary, msg}, state) do 34 + with {:ok, header, next} <- CAR.DagCbor.decode(msg), 35 + {:ok, payload, _} <- CAR.DagCbor.decode(next), 36 + {%{"op" => @op_regular, "t" => type}, _} <- {header, payload}, 37 + message <- from_payload(type, payload) do 38 + case message do 39 + %Firehose.Commit{} = commit -> 40 + IO.inspect(commit.ops, label: commit.repo) 41 + 42 + msg -> 43 + IO.inspect(msg) 44 + end 45 + else 46 + {%{"op" => @op_error, "t" => type}, payload} -> 47 + Logger.error("Got error from Firehose: #{inspect({type, payload})}") 48 + 49 + {:error, reason} -> 50 + Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}") 51 + end 52 + 53 + {:ok, state} 54 + end 55 + 56 + def handle_frame({:text, msg}, state) do 57 + Logger.warning("Got unexpected text frame from Firehose #{inspect(msg)}") 58 + {:ok, state} 59 + end 60 + 61 + @spec from_payload(String.t(), map()) :: 62 + Firehose.Commit.t() 63 + | Firehose.Sync.t() 64 + | Firehose.Identity.t() 65 + | Firehose.Account.t() 66 + | Firehose.Info.t() 67 + | nil 68 + def from_payload("#commit", payload), do: Firehose.Commit.from(payload) 69 + def from_payload("#sync", payload), do: Firehose.Sync.from(payload) 70 + def from_payload("#identity", payload), do: Firehose.Identity.from(payload) 71 + def from_payload("#account", payload), do: Firehose.Account.from(payload) 72 + def from_payload("#info", payload), do: Firehose.Info.from(payload) 73 + def from_payload(_type, _payload), do: nil 18 74 end
+53
lib/firehose/account.ex
··· 1 + defmodule Drinkup.Firehose.Account do 2 + @moduledoc """ 3 + Struct for account events from the ATProto Firehose. 4 + """ 5 + 6 + use TypedStruct 7 + 8 + @type status() :: 9 + :takendown 10 + | :suspended 11 + | :deleted 12 + | :deactivated 13 + | :desynchronized 14 + | :throttled 15 + | String.t() 16 + 17 + typedstruct enforce: true do 18 + field :seq, integer() 19 + field :did, String.t() 20 + field :time, NaiveDateTime.t() 21 + field :active, bool() 22 + field :status, status(), enforce: false 23 + end 24 + 25 + @spec from(map()) :: t() 26 + def from(%{"seq" => seq, "did" => did, "time" => time, "active" => active} = msg) do 27 + status = recognise_status(Map.get(msg, "status")) 28 + time = NaiveDateTime.from_iso8601!(time) 29 + 30 + %__MODULE__{ 31 + seq: seq, 32 + did: did, 33 + time: time, 34 + active: active, 35 + status: status 36 + } 37 + end 38 + 39 + @spec recognise_status(String.t()) :: status() 40 + defp recognise_status(status) 41 + when status in [ 42 + "takendown", 43 + "suspended", 44 + "deleted", 45 + "deactivated", 46 + "desynchronized", 47 + "throttled" 48 + ], 49 + do: String.to_atom(status) 50 + 51 + defp recognise_status(status) when is_binary(status), do: status 52 + defp recognise_status(nil), do: nil 53 + end
+100
lib/firehose/commit.ex
··· 1 + defmodule Drinkup.Firehose.Commit do 2 + @moduledoc """ 3 + Struct for commit events from the ATProto Firehose. 4 + """ 5 + 6 + # TODO: see atp specs 7 + @type tid() :: String.t() 8 + 9 + alias __MODULE__.RepoOp 10 + use TypedStruct 11 + 12 + typedstruct enforce: true do 13 + field :seq, integer() 14 + # DEPCREATED 15 + field :rebase, bool() 16 + # DEPRECATED 17 + field :too_big, bool() 18 + field :repo, String.t() 19 + field :commit, binary() 20 + field :rev, tid() 21 + field :since, tid() | nil 22 + field :blocks, CAR.Archive.t() 23 + field :ops, list(RepoOp.t()) 24 + # DEPRECATED 25 + field :blobs, list(binary()) 26 + field :prev_data, binary(), enforce: nil 27 + field :time, NaiveDateTime.t() 28 + end 29 + 30 + @spec from(map()) :: t() 31 + def from( 32 + %{ 33 + "seq" => seq, 34 + "rebase" => rebase, 35 + "tooBig" => too_big, 36 + "repo" => repo, 37 + "commit" => commit, 38 + "rev" => rev, 39 + "since" => since, 40 + "blocks" => %CBOR.Tag{value: blocks}, 41 + "ops" => ops, 42 + "blobs" => blobs, 43 + "time" => time 44 + } = msg 45 + ) do 46 + prev_data = 47 + Map.get(msg, "prevData") 48 + 49 + time = NaiveDateTime.from_iso8601!(time) 50 + {:ok, blocks} = CAR.decode(blocks) 51 + 52 + %__MODULE__{ 53 + seq: seq, 54 + rebase: rebase, 55 + too_big: too_big, 56 + repo: repo, 57 + commit: commit, 58 + rev: rev, 59 + since: since, 60 + blocks: blocks, 61 + ops: Enum.map(ops, &RepoOp.from(&1, blocks)), 62 + blobs: blobs, 63 + prev_data: prev_data, 64 + time: time 65 + } 66 + end 67 + 68 + defmodule RepoOp do 69 + typedstruct enforce: true do 70 + @type action() :: :create | :update | :delete | String.t() 71 + 72 + field :action, action() 73 + field :path, String.t() 74 + field :cid, binary() 75 + field :prev, binary(), enforce: false 76 + field :record, map() | nil 77 + end 78 + 79 + @spec from(map(), CAR.Archive.t()) :: t() 80 + def from(%{"action" => action, "path" => path, "cid" => cid} = op, %CAR.Archive{} = blocks) do 81 + prev = Map.get(op, "prev") 82 + record = CAR.Archive.get_block(blocks, cid) 83 + 84 + %__MODULE__{ 85 + action: recognise_action(action), 86 + path: path, 87 + cid: cid, 88 + prev: prev, 89 + record: record 90 + } 91 + end 92 + 93 + @spec recognise_action(String.t()) :: action() 94 + defp recognise_action(action) when action in ["create", "update", "delete"], 95 + do: String.to_atom(action) 96 + 97 + defp recognise_action(action) when is_binary(action), do: action 98 + defp recognise_action(nil), do: nil 99 + end 100 + end
+27
lib/firehose/identity.ex
··· 1 + defmodule Drinkup.Firehose.Identity do 2 + @moduledoc """ 3 + Struct for identity events from the ATProto Firehose. 4 + """ 5 + 6 + use TypedStruct 7 + 8 + typedstruct enforce: true do 9 + field :seq, integer() 10 + field :did, String.t() 11 + field :time, NaiveDateTime.t() 12 + field :handle, String.t() | nil 13 + end 14 + 15 + @spec from(map()) :: t() 16 + def from(%{"seq" => seq, "did" => did, "time" => time} = msg) do 17 + handle = Map.get(msg, "handle") 18 + time = NaiveDateTime.from_iso8601!(time) 19 + 20 + %__MODULE__{ 21 + seq: seq, 22 + did: did, 23 + time: time, 24 + handle: handle 25 + } 26 + end 27 + end
+22
lib/firehose/info.ex
··· 1 + defmodule Drinkup.Firehose.Info do 2 + @moduledoc """ 3 + Struct for info events from the ATProto Firehose. 4 + """ 5 + 6 + use TypedStruct 7 + 8 + typedstruct enforce: true do 9 + field :name, String.t() 10 + field :message, String.t() | nil 11 + end 12 + 13 + @spec from(map()) :: t() 14 + def from(%{"name" => name} = msg) do 15 + message = Map.get(msg, "message") 16 + 17 + %__MODULE__{ 18 + name: name, 19 + message: message 20 + } 21 + end 22 + end
+28
lib/firehose/sync.ex
··· 1 + defmodule Drinkup.Firehose.Sync do 2 + @moduledoc """ 3 + Struct for sync events from the ATProto Firehose. 4 + """ 5 + 6 + use TypedStruct 7 + 8 + typedstruct enforce: true do 9 + field :seq, integer() 10 + field :did, String.t() 11 + field :blocks, binary() 12 + field :rev, String.t() 13 + field :time, NaiveDateTime.t() 14 + end 15 + 16 + @spec from(map()) :: t() 17 + def from(%{"seq" => seq, "did" => did, "blocks" => blocks, "rev" => rev, "time" => time}) do 18 + time = NaiveDateTime.from_iso8601!(time) 19 + 20 + %__MODULE__{ 21 + seq: seq, 22 + did: did, 23 + blocks: blocks, 24 + rev: rev, 25 + time: time 26 + } 27 + end 28 + end
+5 -2
mix.exs
··· 21 21 # Run "mix help deps" to learn about dependencies. 22 22 defp deps do 23 23 [ 24 - # {:dep_from_hexpm, "~> 0.3.0"}, 25 - # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} 24 + {:cbor, "~> 1.0.0"}, 25 + {:car, "~> 0.1.0"}, 26 + {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, 27 + {:typedstruct, "~> 0.5"}, 28 + {:websockex, "~> 0.5.0", hex: :websockex_wt} 26 29 ] 27 30 end 28 31 end
+12
mix.lock
··· 1 + %{ 2 + "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, 3 + "car": {:hex, :car, "0.1.1", "a5bc4c5c1be96eab437634b3c0ccad1fe17b5e3d68c22a4031241ae1345aebd4", [:mix], [{:cbor, "~> 1.0.0", [hex: :cbor, repo: "hexpm", optional: false]}, {:typedstruct, "~> 0.5", [hex: :typedstruct, repo: "hexpm", optional: false]}, {:varint, "~> 1.4", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "f895dda8123d04dd336db5a2bf0d0b47f4559cd5383f83fcca0700c1b45bfb6a"}, 4 + "cbor": {:hex, :cbor, "1.0.1", "39511158e8ea5a57c1fcb9639aaa7efde67129678fee49ebbda780f6f24959b0", [:mix], [], "hexpm", "5431acbe7a7908f17f6a9cd43311002836a34a8ab01876918d8cfb709cd8b6a2"}, 5 + "credo": {:hex, :credo, "1.7.12", "9e3c20463de4b5f3f23721527fcaf16722ec815e70ff6c60b86412c695d426c1", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8493d45c656c5427d9c729235b99d498bd133421f3e0a683e5c1b561471291e5"}, 6 + "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, 7 + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, 8 + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, 9 + "typedstruct": {:hex, :typedstruct, "0.5.3", "d68ae424251a41b81a8d0c485328ab48edbd3858f3565bbdac21b43c056fc9b4", [:make, :mix], [], "hexpm", "b53b8186701417c0b2782bf02a2db5524f879b8488f91d1d83b97d84c2943432"}, 10 + "varint": {:hex, :varint, "1.5.1", "17160c70d0428c3f8a7585e182468cac10bbf165c2360cf2328aaa39d3fb1795", [:mix], [], "hexpm", "24f3deb61e91cb988056de79d06f01161dd01be5e0acae61d8d936a552f1be73"}, 11 + "websockex": {:hex, :websockex_wt, "0.5.0", "8725b3bc741e7a682c21310610d033f0aaeedfb3238d9c8d5522c345c04f3f93", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f854a5e7dbd61e852ee74565d862d606e884425c8867ddaa49c8a6c6f43d832a"}, 12 + }