Elixir ATProtocol ingestion and sync library.
at main 90 lines 2.5 kB view raw
1defmodule Drinkup.Firehose.Socket do 2 @moduledoc """ 3 WebSocket connection handler for ATProto relay subscriptions. 4 5 Implements the Drinkup.Socket behaviour to manage connections to an ATProto 6 Firehose relay, handling CAR/CBOR-encoded frames and dispatching events to 7 the configured consumer. 8 """ 9 10 use Drinkup.Socket 11 12 require Logger 13 alias Drinkup.Firehose.{Event, Options} 14 15 @op_regular 1 16 @op_error -1 17 18 @impl true 19 def init(opts) do 20 options = Keyword.fetch!(opts, :options) 21 {:ok, %{seq: options.cursor, options: options, host: options.host}} 22 end 23 24 def start_link(%Options{} = options, statem_opts) do 25 # Build opts for Drinkup.Socket from Options struct 26 socket_opts = [ 27 host: options.host, 28 cursor: options.cursor, 29 options: options 30 ] 31 32 Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts) 33 end 34 35 @impl true 36 def build_path(%{seq: seq}) do 37 cursor_param = if seq, do: %{cursor: seq}, else: %{} 38 "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(cursor_param) 39 end 40 41 @impl true 42 def handle_frame({:binary, frame}, {%{seq: seq, options: options} = data, _conn, _stream}) do 43 with {:ok, header, next} <- CAR.DagCbor.decode(frame), 44 {:ok, payload, _} <- CAR.DagCbor.decode(next), 45 {%{"op" => @op_regular, "t" => type}, _} <- {header, payload}, 46 true <- Event.valid_seq?(seq, payload["seq"]) do 47 new_seq = payload["seq"] || seq 48 49 case Event.from(type, payload) do 50 nil -> 51 Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}") 52 53 message -> 54 Event.dispatch(message, options) 55 end 56 57 {:ok, %{data | seq: new_seq}} 58 else 59 false -> 60 Logger.error("Got out of sequence or invalid `seq` from Firehose") 61 :noop 62 63 {%{"op" => @op_error, "t" => type}, payload} -> 64 Logger.error("Got error from Firehose: #{inspect({type, payload})}") 65 :noop 66 67 {:error, reason} -> 68 Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}") 69 :noop 70 end 71 end 72 73 @impl true 74 def handle_frame(:close, _data) do 75 Logger.info("Websocket closed, reason unknown") 76 nil 77 end 78 79 @impl true 80 def handle_frame({:close, errno, reason}, _data) do 81 Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}") 82 nil 83 end 84 85 @impl true 86 def handle_frame({:text, _text}, _data) do 87 Logger.warning("Received unexpected text frame from Firehose") 88 :noop 89 end 90end