Elixir ATProtocol firehose & subscription listener

refactor: rewrite socket using gen_statem and gun

Inspired by Nostrum's structure.

ovyerus.com 7c948f8f 4ec6c6d7

verified
+1 -85
lib/drinkup.ex
··· 1 - defmodule Drinkup.Firehose do 2 - alias Drinkup.Firehose 3 - require Logger 4 - 5 - use WebSockex 6 - 7 - @default_host "https://bsky.network" 8 - 9 - @op_regular 1 10 - @op_error -1 11 - 12 - # TODO: switch to Gun and GenServer? 13 - 14 - def start_link(opts \\ []) do 15 - opts = Keyword.validate!(opts, host: @default_host) 16 - host = Keyword.get(opts, :host) 17 - cursor = Keyword.get(opts, :cursor) 18 - 19 - url = 20 - "#{host}/xrpc/com.atproto.sync.subscribeRepos" 21 - |> URI.new!() 22 - |> URI.append_query(URI.encode_query(%{cursor: cursor})) 23 - |> URI.to_string() 24 - 25 - WebSockex.start_link(url, __MODULE__, 0) 26 - end 27 - 28 - def handle_connect(conn, state) do 29 - Logger.info("Connected to Firehose at #{conn.host}#{conn.path}") 30 - {:ok, state} 31 - end 32 - 33 - def handle_frame({:binary, msg}, state) do 34 - with {:ok, header, next} <- CAR.DagCbor.decode(msg), 35 - {:ok, payload, _} <- CAR.DagCbor.decode(next), 36 - {%{"op" => @op_regular, "t" => type}, _} <- {header, payload}, 37 - true <- type == "#info" || valid_seq?(state, payload["seq"]), 38 - message <- 39 - from_payload(type, payload) do 40 - case message do 41 - %Firehose.Commit{} = commit -> 42 - IO.inspect(commit.ops, label: commit.repo) 43 - 44 - msg -> 45 - IO.inspect(msg) 46 - end 47 - 48 - {:ok, payload["seq"] || state} 49 - else 50 - false -> 51 - Logger.error("Got out of sequence or invalid `seq` from Firehose") 52 - {:ok, state} 53 - 54 - {%{"op" => @op_error, "t" => type}, payload} -> 55 - Logger.error("Got error from Firehose: #{inspect({type, payload})}") 56 - {:ok, state} 57 - 58 - {:error, reason} -> 59 - Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}") 60 - {:ok, state} 61 - end 62 - end 63 - 64 - def handle_frame({:text, msg}, state) do 65 - Logger.warning("Got unexpected text frame from Firehose: #{inspect(msg)}") 66 - {:ok, state} 67 - end 68 - 69 - @spec valid_seq?(integer(), any()) :: boolean() 70 - defp valid_seq?(last_seq, seq) when 
is_integer(seq), do: seq > last_seq 71 - defp valid_seq?(_last_seq, _seq), do: false 72 - 73 - @spec from_payload(String.t(), map()) :: 74 - Firehose.Commit.t() 75 - | Firehose.Sync.t() 76 - | Firehose.Identity.t() 77 - | Firehose.Account.t() 78 - | Firehose.Info.t() 79 - | nil 80 - defp from_payload("#commit", payload), do: Firehose.Commit.from(payload) 81 - defp from_payload("#sync", payload), do: Firehose.Sync.from(payload) 82 - defp from_payload("#identity", payload), do: Firehose.Identity.from(payload) 83 - defp from_payload("#account", payload), do: Firehose.Account.from(payload) 84 - defp from_payload("#info", payload), do: Firehose.Info.from(payload) 85 - defp from_payload(_type, _payload), do: nil 1 + defmodule Drinkup do 86 2 end
+22
lib/event.ex
··· 1 + defmodule Drinkup.Event do 2 + alias Drinkup.Event 3 + 4 + @spec from(String.t(), map()) :: 5 + Event.Commit.t() 6 + | Event.Sync.t() 7 + | Event.Identity.t() 8 + | Event.Account.t() 9 + | Event.Info.t() 10 + | nil 11 + def from("#commit", payload), do: Event.Commit.from(payload) 12 + def from("#sync", payload), do: Event.Sync.from(payload) 13 + def from("#identity", payload), do: Event.Identity.from(payload) 14 + def from("#account", payload), do: Event.Account.from(payload) 15 + def from("#info", payload), do: Event.Info.from(payload) 16 + def from(_type, _payload), do: nil 17 + 18 + @spec valid_seq?(integer() | nil, any()) :: boolean() 19 + def valid_seq?(nil, seq) when is_integer(seq), do: true 20 + def valid_seq?(last_seq, seq) when is_integer(last_seq) and is_integer(seq), do: seq > last_seq 21 + def valid_seq?(_last_seq, _seq), do: false 22 + end
+1 -1
lib/firehose/account.ex lib/event/account.ex
··· 1 - defmodule Drinkup.Firehose.Account do 1 + defmodule Drinkup.Event.Account do 2 2 @moduledoc """ 3 3 Struct for account events from the ATProto Firehose. 4 4 """
+1 -1
lib/firehose/commit.ex lib/event/commit.ex
··· 1 - defmodule Drinkup.Firehose.Commit do 1 + defmodule Drinkup.Event.Commit do 2 2 @moduledoc """ 3 3 Struct for commit events from the ATProto Firehose. 4 4 """
+1 -1
lib/firehose/identity.ex lib/event/identity.ex
··· 1 - defmodule Drinkup.Firehose.Identity do 1 + defmodule Drinkup.Event.Identity do 2 2 @moduledoc """ 3 3 Struct for identity events from the ATProto Firehose. 4 4 """
+1 -1
lib/firehose/info.ex lib/event/info.ex
··· 1 - defmodule Drinkup.Firehose.Info do 1 + defmodule Drinkup.Event.Info do 2 2 @moduledoc """ 3 3 Struct for info events from the ATProto Firehose. 4 4 """
+1 -1
lib/firehose/sync.ex lib/event/sync.ex
··· 1 - defmodule Drinkup.Firehose.Sync do 1 + defmodule Drinkup.Event.Sync do 2 2 @moduledoc """ 3 3 Struct for sync events from the ATProto Firehose. 4 4 """
+165
lib/socket.ex
··· 1 + defmodule Drinkup.Socket do 2 + @moduledoc """ 3 + gen_statem process for managing the websocket connection to an ATProto relay. 4 + """ 5 + 6 + require Logger 7 + alias Drinkup.Event 8 + 9 + @behaviour :gen_statem 10 + @default_host "https://bsky.network" 11 + @timeout :timer.seconds(5) 12 + # TODO: `flow` determines messages in buffer. Determine ideal value? 13 + @flow 10 14 + 15 + @op_regular 1 16 + @op_error -1 17 + 18 + defstruct [:host, :seq, :conn, :stream] 19 + 20 + @impl true 21 + def callback_mode, do: [:state_functions, :state_enter] 22 + 23 + def start_link(opts \\ []) do 24 + opts = Keyword.validate!(opts, host: @default_host) 25 + host = Keyword.get(opts, :host) 26 + cursor = Keyword.get(opts, :cursor) 27 + 28 + :gen_statem.start_link(__MODULE__, {host, cursor}, []) 29 + end 30 + 31 + @impl true 32 + def init({host, cursor}) do 33 + data = %__MODULE__{host: host, seq: cursor} 34 + {:ok, :disconnected, data, [{:next_event, :internal, :connect}]} 35 + end 36 + 37 + def disconnected(:enter, _from, data) do 38 + Logger.debug("Initial connection") 39 + # TODO: differentiate between initial & reconnects, probably stuff to do with seq 40 + {:next_state, :disconnected, data} 41 + end 42 + 43 + def disconnected(:internal, :connect, data) do 44 + {:next_state, :connecting_http, data} 45 + end 46 + 47 + def connecting_http(:enter, _from, data) do 48 + Logger.debug("Connecting to http") 49 + 50 + %{host: host, port: port} = URI.new!(data.host) 51 + 52 + {:ok, conn} = 53 + :gun.open(:binary.bin_to_list(host), port, %{ 54 + retry: 0, 55 + protocols: [:http], 56 + connect_timeout: @timeout, 57 + domain_lookup_timeout: @timeout, 58 + tls_handshake_timeout: @timeout, 59 + tls_opts: [ 60 + verify: :verify_peer, 61 + cacerts: :certifi.cacerts(), 62 + depth: 3, 63 + customize_hostname_check: [ 64 + match_fun: :public_key.pkix_verify_hostname_match_fun(:https) 65 + ] 66 + ] 67 + }) 68 + 69 + {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, 
:connect_timeout}]} 70 + end 71 + 72 + def connecting_http(:info, {:gun_up, _conn, :http}, data) do 73 + {:next_state, :connecting_ws, data} 74 + end 75 + 76 + def connecting_http(:state_timeout, :connect_timeout, _data) do 77 + {:stop, :connect_http_timeout} 78 + end 79 + 80 + def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do 81 + Logger.debug("Upgrading connection to websocket") 82 + path = "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(%{cursor: seq}) 83 + stream = :gun.ws_upgrade(conn, path, [], %{flow: @flow}) 84 + {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]} 85 + end 86 + 87 + def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do 88 + {:next_state, :connected, data} 89 + end 90 + 91 + def connecting_ws(:state_timeout, :upgrade_timeout, _data) do 92 + {:stop, :connect_ws_timeout} 93 + end 94 + 95 + def connected(:enter, _from, _data) do 96 + Logger.debug("Connected to websocket") 97 + :keep_state_and_data 98 + end 99 + 100 + def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, data) do 101 + # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription 102 + # Will also need support for JSON frames 103 + with {:ok, header, next} <- CAR.DagCbor.decode(frame), 104 + {:ok, payload, _} <- CAR.DagCbor.decode(next), 105 + {%{"op" => @op_regular, "t" => type}, _} <- {header, payload}, 106 + true <- type == "#info" || Event.valid_seq?(data.seq, payload["seq"]), 107 + data <- %{data | seq: payload["seq"] || data.seq}, 108 + message <- 109 + Event.from(type, payload) do 110 + :ok = :gun.update_flow(conn, stream, @flow) 111 + 112 + case message do 113 + %Event.Commit{} = commit -> 114 + IO.inspect(commit.ops, label: commit.repo) 115 + 116 + msg -> 117 + IO.inspect(msg) 118 + end 119 + 120 + {:keep_state, data} 121 + else 122 + false -> 123 + Logger.error("Got out of sequence or invalid `seq` from Firehose") 124 
+ {:keep_state, data} 125 + 126 + {%{"op" => @op_error, "t" => type}, payload} -> 127 + Logger.error("Got error from Firehose: #{inspect({type, payload})}") 128 + {:keep_state, data} 129 + 130 + {:error, reason} -> 131 + Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}") 132 + {:keep_state, data} 133 + end 134 + end 135 + 136 + def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do 137 + Logger.info("Websocket closed, reason unknown") 138 + {:keep_state_and_data, [{:next_event, :internal, :reconnect}]} 139 + end 140 + 141 + def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do 142 + Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}") 143 + {:keep_state_and_data, [{:next_event, :internal, :reconnect}]} 144 + end 145 + 146 + def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn}) 147 + when old_conn != new_conn do 148 + Logger.debug("Ignoring received :gun_down for a previous connection.") 149 + :keep_state_and_data 150 + end 151 + 152 + def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do 153 + Logger.info("Websocket connection killed. Attempting to reconnect") 154 + {:keep_state_and_data, [{:next_event, :internal, :reconnect}]} 155 + end 156 + 157 + def connected(:internal, :reconnect, %{conn: conn} = data) do 158 + :ok = :gun.close(conn) 159 + :ok = :gun.flush(conn) 160 + 161 + # TODO: reconnect backoff 162 + {:next_state, :disconnected, %{data | conn: nil, stream: nil}, 163 + [{:next_event, :internal, :connect}]} 164 + end 165 + end
+4 -3
mix.exs
··· 21 21 # Run "mix help deps" to learn about dependencies. 22 22 defp deps do 23 23 [ 24 - {:cbor, "~> 1.0.0"}, 25 24 {:car, "~> 0.1.0"}, 25 + {:cbor, "~> 1.0.0"}, 26 + {:certifi, "~> 2.15"}, 26 27 {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, 27 - {:typedstruct, "~> 0.5"}, 28 - {:websockex, "~> 0.5.0", hex: :websockex_wt} 28 + {:gun, "~> 2.2"}, 29 + {:typedstruct, "~> 0.5"} 29 30 ] 30 31 end 31 32 end
+3 -2
mix.lock
··· 2 2 "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, 3 3 "car": {:hex, :car, "0.1.1", "a5bc4c5c1be96eab437634b3c0ccad1fe17b5e3d68c22a4031241ae1345aebd4", [:mix], [{:cbor, "~> 1.0.0", [hex: :cbor, repo: "hexpm", optional: false]}, {:typedstruct, "~> 0.5", [hex: :typedstruct, repo: "hexpm", optional: false]}, {:varint, "~> 1.4", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "f895dda8123d04dd336db5a2bf0d0b47f4559cd5383f83fcca0700c1b45bfb6a"}, 4 4 "cbor": {:hex, :cbor, "1.0.1", "39511158e8ea5a57c1fcb9639aaa7efde67129678fee49ebbda780f6f24959b0", [:mix], [], "hexpm", "5431acbe7a7908f17f6a9cd43311002836a34a8ab01876918d8cfb709cd8b6a2"}, 5 + "certifi": {:hex, :certifi, "2.15.0", "0e6e882fcdaaa0a5a9f2b3db55b1394dba07e8d6d9bcad08318fb604c6839712", [:rebar3], [], "hexpm", "b147ed22ce71d72eafdad94f055165c1c182f61a2ff49df28bcc71d1d5b94a60"}, 6 + "cowlib": {:hex, :cowlib, "2.15.0", "3c97a318a933962d1c12b96ab7c1d728267d2c523c25a5b57b0f93392b6e9e25", [:make, :rebar3], [], "hexpm", "4f00c879a64b4fe7c8fcb42a4281925e9ffdb928820b03c3ad325a617e857532"}, 5 7 "credo": {:hex, :credo, "1.7.12", "9e3c20463de4b5f3f23721527fcaf16722ec815e70ff6c60b86412c695d426c1", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8493d45c656c5427d9c729235b99d498bd133421f3e0a683e5c1b561471291e5"}, 6 8 "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, 9 + "gun": {:hex, :gun, "2.2.0", "b8f6b7d417e277d4c2b0dc3c07dfdf892447b087f1cc1caff9c0f556b884e33d", [:make, :rebar3], [{:cowlib, ">= 2.15.0 and < 3.0.0", [hex: 
:cowlib, repo: "hexpm", optional: false]}], "hexpm", "76022700c64287feb4df93a1795cff6741b83fb37415c40c34c38d2a4645261a"}, 7 10 "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, 8 - "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, 9 11 "typedstruct": {:hex, :typedstruct, "0.5.3", "d68ae424251a41b81a8d0c485328ab48edbd3858f3565bbdac21b43c056fc9b4", [:make, :mix], [], "hexpm", "b53b8186701417c0b2782bf02a2db5524f879b8488f91d1d83b97d84c2943432"}, 10 12 "varint": {:hex, :varint, "1.5.1", "17160c70d0428c3f8a7585e182468cac10bbf165c2360cf2328aaa39d3fb1795", [:mix], [], "hexpm", "24f3deb61e91cb988056de79d06f01161dd01be5e0acae61d8d936a552f1be73"}, 11 - "websockex": {:hex, :websockex_wt, "0.5.0", "8725b3bc741e7a682c21310610d033f0aaeedfb3238d9c8d5522c345c04f3f93", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f854a5e7dbd61e852ee74565d862d606e884425c8867ddaa49c8a6c6f43d832a"}, 12 13 }