Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Tap.Socket do
  @moduledoc """
  WebSocket connection handler for the Tap indexer/backfill service.

  This module implements the `Drinkup.Socket` behaviour. It connects to a Tap
  service, decodes the JSON-encoded events it receives, and hands each
  recognized event to the configured consumer.

  An event is acknowledged only after it has been processed successfully. The
  consumer's return value decides the outcome:

  - `:ok`, `{:ok, any()}`, or `nil` → Success, ack sent to Tap
  - `{:error, reason}` → Failure, no ack (Tap will retry after timeout)
  - Exception raised → Failure, no ack (Tap will retry after timeout)
  """

  use Drinkup.Socket

  require Logger
  alias Drinkup.Tap.{Event, Options}

  # Builds the initial socket state from the `:options` entry of `opts`.
  # Raises (via `Keyword.fetch!/2`) when `:options` is missing.
  @impl true
  def init(opts) do
    tap_options = Keyword.fetch!(opts, :options)
    state = %{options: tap_options, host: tap_options.host}
    {:ok, state}
  end

  @doc """
  Starts the socket process for the given Tap `options`.

  The options struct is turned into socket options (host, the options
  themselves and, when an admin password is set, an authorization header) and
  passed to `Drinkup.Socket.start_link/3` together with `statem_opts`.
  """
  def start_link(%Options{} = options, statem_opts) do
    Drinkup.Socket.start_link(__MODULE__, build_socket_opts(options), statem_opts)
  end

  # The Tap channel lives at a fixed path, whatever the socket state is.
  @impl true
  def build_path(_data), do: "/channel"

  # Text frames carry JSON-encoded events. Frames that fail to decode are
  # logged and ignored; decoded payloads go on to event dispatch.
  @impl true
  def handle_frame({:text, json}, {%{options: tap_options} = state, conn, stream}) do
    case Jason.decode(json) do
      {:error, reason} ->
        Logger.error("Failed to decode JSON from Tap: #{inspect(reason)}")
        :noop

      {:ok, payload} ->
        dispatch_payload(payload, tap_options, state, conn, stream)
    end
  end

  # Binary frames are not expected on this channel: log and ignore.
  @impl true
  def handle_frame({:binary, _payload}, _data) do
    Logger.warning("Received unexpected binary frame from Tap")
    :noop
  end

  # Close frame without a code or reason.
  @impl true
  def handle_frame(:close, _data) do
    Logger.info("Websocket closed, reason unknown")
    nil
  end

  # Close frame carrying a code and a reason.
  @impl true
  def handle_frame({:close, code, why}, _data) do
    Logger.info("Websocket closed, errno: #{code}, reason: #{inspect(why)}")
    nil
  end

  # Converts a decoded payload into an event and dispatches it. A payload that
  # `Event.from/1` does not recognize (it returns `nil`) is logged and skipped.
  defp dispatch_payload(payload, tap_options, state, conn, stream) do
    event = Event.from(payload)

    if is_nil(event) do
      Logger.warning("Received unrecognized event from Tap: #{inspect(payload)}")
      :noop
    else
      Event.dispatch(event, tap_options, conn, stream)
      {:ok, state}
    end
  end

  # Assembles the keyword list handed to `Drinkup.Socket.start_link/3`. The
  # `:gun_opts` entry (with the authorization header) is only added when an
  # admin password is configured.
  defp build_socket_opts(%Options{} = options) do
    socket_opts = [host: options.host, options: options]

    case options.admin_password do
      unset when unset in [nil, false] ->
        socket_opts

      password ->
        headers = [{"authorization", build_auth_header(password)}]
        Keyword.put(socket_opts, :gun_opts, %{ws_opts: %{headers: headers}})
    end
  end

  # Produces an HTTP Basic authorization header value for the fixed `admin`
  # user and the given password.
  @spec build_auth_header(String.t()) :: String.t()
  defp build_auth_header(password) do
    encoded = Base.encode64("admin:#{password}")
    "Basic " <> encoded
  end
end