Elixir ATProtocol ingestion and sync library.
at main 100 lines 2.6 kB view raw
1defmodule Drinkup.Tap.Socket do 2 @moduledoc """ 3 WebSocket connection handler for Tap indexer/backfill service. 4 5 Implements the Drinkup.Socket behaviour to manage connections to a Tap service, 6 handling JSON-encoded events and dispatching them to the configured consumer. 7 8 Events are acknowledged after successful processing based on the consumer's 9 return value: 10 - `:ok`, `{:ok, any()}`, or `nil` → Success, ack sent to Tap 11 - `{:error, reason}` → Failure, no ack (Tap will retry after timeout) 12 - Exception raised → Failure, no ack (Tap will retry after timeout) 13 """ 14 15 use Drinkup.Socket 16 17 require Logger 18 alias Drinkup.Tap.{Event, Options} 19 20 @impl true 21 def init(opts) do 22 options = Keyword.fetch!(opts, :options) 23 {:ok, %{options: options, host: options.host}} 24 end 25 26 def start_link(%Options{} = options, statem_opts) do 27 socket_opts = build_socket_opts(options) 28 Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts) 29 end 30 31 @impl true 32 def build_path(_data) do 33 "/channel" 34 end 35 36 @impl true 37 def handle_frame({:text, json}, {%{options: options} = data, conn, stream}) do 38 case Jason.decode(json) do 39 {:ok, payload} -> 40 case Event.from(payload) do 41 nil -> 42 Logger.warning("Received unrecognized event from Tap: #{inspect(payload)}") 43 :noop 44 45 event -> 46 Event.dispatch(event, options, conn, stream) 47 {:ok, data} 48 end 49 50 {:error, reason} -> 51 Logger.error("Failed to decode JSON from Tap: #{inspect(reason)}") 52 :noop 53 end 54 end 55 56 @impl true 57 def handle_frame({:binary, _binary}, _data) do 58 Logger.warning("Received unexpected binary frame from Tap") 59 :noop 60 end 61 62 @impl true 63 def handle_frame(:close, _data) do 64 Logger.info("Websocket closed, reason unknown") 65 nil 66 end 67 68 @impl true 69 def handle_frame({:close, errno, reason}, _data) do 70 Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}") 71 nil 72 end 73 74 defp build_socket_opts(%Options{host: host, admin_password: admin_password} = options) do 75 base_opts = [ 76 host: host, 77 options: options 78 ] 79 80 if admin_password do 81 auth_header = build_auth_header(admin_password) 82 83 gun_opts = %{ 84 ws_opts: %{ 85 headers: [{"authorization", auth_header}] 86 } 87 } 88 89 Keyword.put(base_opts, :gun_opts, gun_opts) 90 else 91 base_opts 92 end 93 end 94 95 @spec build_auth_header(String.t()) :: String.t() 96 defp build_auth_header(password) do 97 credentials = "admin:#{password}" 98 "Basic #{Base.encode64(credentials)}" 99 end 100end