Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Jetstream.Socket do
  @moduledoc """
  WebSocket connection handler for Jetstream event streams.

  Implements the Drinkup.Socket behaviour to manage connections to a Jetstream
  service, handling zstd-compressed JSON events and dispatching them to the
  configured consumer.

  The socket state is a map with the keys `:options`, `:host` and `:cursor`.
  `:cursor` starts as `options.cursor` and is advanced to the `"time_us"` of
  every dispatched event, so that `build_path/1` can ask the server to resume
  from the last processed event instead of the initially configured position.
  """

  use Drinkup.Socket

  require Logger
  alias Drinkup.Jetstream.{Event, Options}

  @dict_path "priv/jetstream/zstd_dictionary"
  @external_resource @dict_path
  @zstd_dict File.read!(@dict_path)

  # Key under which the runtime decompression dictionary is cached.
  @ddict_key {__MODULE__, :zstd_ddict}

  @doc """
  Starts a Jetstream socket for the given `options`.

  The process is registered in `Drinkup.Registry` under
  `{options.name, JetstreamSocket}`; any `:name` already present in
  `statem_opts` is overwritten. The remaining `statem_opts` are passed through
  to `Drinkup.Socket.start_link/3`, whose result is returned unchanged.
  """
  def start_link(%Options{} = options, statem_opts) do
    socket_opts = [host: options.host, options: options]
    name = {:via, Registry, {Drinkup.Registry, {options.name, JetstreamSocket}}}

    Drinkup.Socket.start_link(__MODULE__, socket_opts, Keyword.put(statem_opts, :name, name))
  end

  @impl true
  def init(opts) do
    options = Keyword.fetch!(opts, :options)

    {:ok, %{options: options, host: options.host, cursor: options.cursor}}
  end

  @impl true
  def build_path(%{options: options} = data) do
    # Prefer the cursor tracked while streaming so a reconnect resumes from the
    # last processed event; fall back to the configured one when nothing has
    # been tracked yet (or the state carries no :cursor key).
    cursor = Map.get(data, :cursor) || options.cursor

    query_params =
      [compress: "true"]
      |> put_collections(options.wanted_collections)
      |> put_dids(options.wanted_dids)
      |> put_cursor(cursor)
      |> put_max_size(options.max_message_size_bytes)
      |> put_require_hello(options.require_hello)

    "/subscribe?" <> URI.encode_query(query_params)
  end

  @impl true
  def handle_frame({:binary, compressed_data}, {data, _conn, _stream}) do
    case decompress_and_parse(compressed_data) do
      {:ok, payload} ->
        handle_payload(payload, data)

      # TODO: sometimes getting ZSTD_CONTENTSIZE_UNKNOWN
      {:error, reason} ->
        Logger.error(
          "[Drinkup.Jetstream.Socket] Failed to decompress/parse frame: #{inspect(reason)}"
        )

        :noop
    end
  end

  def handle_frame({:text, json}, {data, _conn, _stream}) do
    # Text frames shouldn't happen since we force compression, but handle them anyway
    case JSON.decode(json) do
      {:ok, payload} ->
        handle_payload(payload, data)

      {:error, reason} ->
        Logger.error("[Drinkup.Jetstream.Socket] Failed to decode JSON: #{inspect(reason)}")
        :noop
    end
  end

  # NOTE(review): the close clauses return `nil` rather than `:noop`; this is
  # presumably what Drinkup.Socket expects for close frames - confirm against
  # the behaviour's callback spec.
  def handle_frame(:close, _data) do
    Logger.info("[Drinkup.Jetstream.Socket] WebSocket closed, reason unknown")
    nil
  end

  def handle_frame({:close, errno, reason}, _data) do
    Logger.info(
      "[Drinkup.Jetstream.Socket] WebSocket closed, errno: #{errno}, reason: #{inspect(reason)}"
    )

    nil
  end

  @impl true
  def handle_connected({user_data, conn, stream}) do
    # Register connection for options updates
    Registry.register(
      Drinkup.Registry,
      {user_data.options.name, JetstreamConnection},
      {conn, stream}
    )

    {:ok, user_data}
  end

  @impl true
  def handle_disconnected(_reason, {user_data, _conn, _stream}) do
    # Unregister connection when disconnected
    Registry.unregister(Drinkup.Registry, {user_data.options.name, JetstreamConnection})
    {:ok, user_data}
  end

  # Turns a decoded payload into an event, dispatches it and advances the
  # cursor. Returns `:noop` when `Event.from/1` yields `nil`
  # (Event.from already logs warnings for unrecognized events).
  defp handle_payload(payload, %{options: options} = data) do
    case Event.from(payload) do
      nil ->
        :noop

      event ->
        Event.dispatch(event, options)
        {:ok, advance_cursor(data, payload)}
    end
  end

  # Moves the cursor to the payload's "time_us". A payload without an integer
  # "time_us" keeps the previous cursor, so a resume position is never lost and
  # `put_cursor/2` never receives a non-integer.
  defp advance_cursor(data, %{"time_us" => time_us}) when is_integer(time_us),
    do: %{data | cursor: time_us}

  defp advance_cursor(data, _payload), do: data

  # Can't use `create_ddict` as the value of `@zstd_dict` because it returns a
  # reference :( - so the dictionary is built on first use and cached in
  # `:persistent_term` instead of being rebuilt for every incoming frame.
  @spec get_dictionary() :: reference()
  defp get_dictionary do
    case :persistent_term.get(@ddict_key, nil) do
      nil ->
        dict = create_dictionary!()
        :persistent_term.put(@ddict_key, dict)
        dict

      dict ->
        dict
    end
  end

  # Builds the decompression dictionary from the bytes embedded at compile
  # time; raises ArgumentError if ezstd rejects them.
  @spec create_dictionary!() :: reference()
  defp create_dictionary! do
    case :ezstd.create_ddict(@zstd_dict) do
      {:error, reason} ->
        raise ArgumentError,
              "somehow failed to create Jetstream's ZSTD dictionary: #{inspect(reason)}"

      dict ->
        dict
    end
  end

  # Decompresses one frame with the Jetstream dictionary and decodes the JSON.
  # Every failure, including an unexpected return value from ezstd, comes back
  # as `{:error, reason}` so the caller can log it and skip the frame.
  @spec decompress_and_parse(binary()) :: {:ok, term()} | {:error, term()}
  defp decompress_and_parse(compressed_data) do
    with ctx when is_reference(ctx) <-
           :ezstd.create_decompression_context(byte_size(compressed_data)),
         :ok <- :ezstd.select_ddict(ctx, get_dictionary()),
         iodata when is_list(iodata) or is_binary(iodata) <-
           :ezstd.decompress_streaming(ctx, compressed_data),
         {:ok, payload} <- JSON.decode(IO.iodata_to_binary(iodata)) do
      {:ok, payload}
    else
      {:error, reason} -> {:error, reason}
      other -> {:error, {:unexpected_result, other}}
    end
  end

  # Each put_* helper prepends its query parameter(s) to `params`, or returns
  # `params` untouched when the corresponding option is unset.

  @spec put_collections(keyword(), [String.t()]) :: keyword()
  defp put_collections(params, []), do: params

  defp put_collections(params, collections) when is_list(collections) do
    Enum.reduce(collections, params, fn collection, acc ->
      [{:wantedCollections, collection} | acc]
    end)
  end

  @spec put_dids(keyword(), [String.t()]) :: keyword()
  defp put_dids(params, []), do: params

  defp put_dids(params, dids) when is_list(dids) do
    Enum.reduce(dids, params, fn did, acc ->
      [{:wantedDids, did} | acc]
    end)
  end

  @spec put_cursor(keyword(), integer() | nil) :: keyword()
  defp put_cursor(params, nil), do: params

  defp put_cursor(params, cursor) when is_integer(cursor), do: [{:cursor, cursor} | params]

  @spec put_max_size(keyword(), integer() | nil) :: keyword()
  defp put_max_size(params, nil), do: params

  defp put_max_size(params, max_size) when is_integer(max_size),
    do: [{:maxMessageSizeBytes, max_size} | params]

  @spec put_require_hello(keyword(), boolean()) :: keyword()
  defp put_require_hello(params, false), do: params

  defp put_require_hello(params, true), do: [{:requireHello, "true"} | params]
end