Elixir ATProtocol ingestion and sync library.
at main 201 lines 5.9 kB view raw
defmodule Drinkup.Jetstream.Socket do
  @moduledoc """
  WebSocket connection handler for Jetstream event streams.

  Implements the Drinkup.Socket behaviour to manage connections to a Jetstream
  service, handling zstd-compressed JSON events and dispatching them to the
  configured consumer.

  ## Socket state

  `init/1` builds a map with the following keys:

    * `:options` - the `Drinkup.Jetstream.Options` struct the socket was started with
    * `:host` - copied from `options.host`
    * `:cursor` - starts as `options.cursor` and is advanced to the `"time_us"`
      of every successfully dispatched event
    * `:ddict` - the zstd decompression dictionary, built once per socket

  `build_path/1` uses the tracked `:cursor`, so a path rebuilt from the current
  state resumes from the last dispatched event rather than from the cursor the
  socket was originally started with.
  """

  use Drinkup.Socket

  require Logger
  alias Drinkup.Jetstream.{Event, Options}

  @dict_path "priv/jetstream/zstd_dictionary"
  @external_resource @dict_path
  @zstd_dict File.read!(@dict_path)

  @impl true
  def init(opts) do
    options = Keyword.fetch!(opts, :options)

    state = %{
      options: options,
      host: options.host,
      cursor: options.cursor,
      # Built once here rather than on every frame; the dictionary bytes are a
      # compile-time constant, so the result never changes for this socket.
      ddict: get_dictionary()
    }

    {:ok, state}
  end

  @doc """
  Starts a Jetstream socket for the given options.

  The process is registered in `Drinkup.Registry` under
  `{options.name, JetstreamSocket}`; any `:name` already present in
  `statem_opts` is overwritten.
  """
  def start_link(%Options{} = options, statem_opts) do
    socket_opts = [
      host: options.host,
      options: options
    ]

    statem_opts =
      Keyword.put(
        statem_opts,
        :name,
        {:via, Registry, {Drinkup.Registry, {options.name, JetstreamSocket}}}
      )

    Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
  end

  # Builds the `/subscribe` path and query string from the socket state.
  #
  # The cursor tracked in the state (advanced by `handle_frame/2`) takes
  # precedence over the one in the options, so the position reached so far is
  # not thrown away. Falls back to `options.cursor` when the state carries no
  # cursor. NOTE(review): resuming from the last seen `time_us` may redeliver
  # that event; confirm consumers tolerate at-least-once delivery.
  @impl true
  def build_path(%{options: options} = data) do
    cursor = Map.get(data, :cursor) || options.cursor

    query_params =
      [compress: "true"]
      |> put_collections(options.wanted_collections)
      |> put_dids(options.wanted_dids)
      |> put_cursor(cursor)
      |> put_max_size(options.max_message_size_bytes)
      |> put_require_hello(options.require_hello)

    "/subscribe?" <> URI.encode_query(query_params)
  end

  # Frame handling.
  #
  # Returns `{:ok, new_state}` when an event was dispatched, `:noop` when the
  # frame was dropped (unrecognized event or decode failure) and `nil` for
  # close frames.
  @impl true
  def handle_frame(
        {:binary, compressed_data},
        {%{options: _} = data, _conn, _stream}
      ) do
    case decompress_and_parse(compressed_data, dictionary_for(data)) do
      {:ok, payload} ->
        # Event.from already logs warnings for unrecognized events
        process_payload(payload, data)

      # TODO: sometimes getting ZSTD_CONTENTSIZE_UNKNOWN
      {:error, reason} ->
        Logger.error(
          "[Drinkup.Jetstream.Socket] Failed to decompress/parse frame: #{inspect(reason)}"
        )

        :noop
    end
  end

  def handle_frame({:text, json}, {%{options: _} = data, _conn, _stream}) do
    # Text frames shouldn't happen since we force compression, but handle them anyway
    case Jason.decode(json) do
      {:ok, payload} ->
        process_payload(payload, data)

      {:error, reason} ->
        Logger.error("[Drinkup.Jetstream.Socket] Failed to decode JSON: #{inspect(reason)}")
        :noop
    end
  end

  def handle_frame(:close, _data) do
    Logger.info("[Drinkup.Jetstream.Socket] WebSocket closed, reason unknown")
    nil
  end

  def handle_frame({:close, errno, reason}, _data) do
    Logger.info(
      "[Drinkup.Jetstream.Socket] WebSocket closed, errno: #{errno}, reason: #{inspect(reason)}"
    )

    nil
  end

  @impl true
  def handle_connected({user_data, conn, stream}) do
    # Register connection for options updates
    Registry.register(
      Drinkup.Registry,
      {user_data.options.name, JetstreamConnection},
      {conn, stream}
    )

    {:ok, user_data}
  end

  @impl true
  def handle_disconnected(_reason, {user_data, _conn, _stream}) do
    # Unregister connection when disconnected
    Registry.unregister(Drinkup.Registry, {user_data.options.name, JetstreamConnection})
    {:ok, user_data}
  end

  # Shared tail of both data-frame clauses: turn a decoded payload into an
  # event, dispatch it, and advance the cursor.
  defp process_payload(payload, %{options: options} = data) do
    case Event.from(payload) do
      nil ->
        :noop

      event ->
        Event.dispatch(event, options)
        {:ok, advance_cursor(data, payload)}
    end
  end

  # Moves the cursor to the payload's `time_us`. A payload without an integer
  # `time_us` leaves the cursor where it was instead of resetting it to nil,
  # which would lose the resume position (and `put_cursor/2` only accepts
  # integers or nil).
  defp advance_cursor(data, %{"time_us" => time_us}) when is_integer(time_us),
    do: %{data | cursor: time_us}

  defp advance_cursor(data, _payload), do: data

  # Uses the dictionary cached by `init/1`; builds a fresh one if the state
  # does not carry it.
  defp dictionary_for(%{ddict: ddict}) when is_reference(ddict), do: ddict
  defp dictionary_for(_data), do: get_dictionary()

  # Can't use `create_ddict` as the value of `@zstd_dict` because it returns a reference :(
  @spec get_dictionary() :: reference()
  defp get_dictionary() do
    case :ezstd.create_ddict(@zstd_dict) do
      {:error, reason} ->
        raise ArgumentError,
              "somehow failed to created Jetstream's ZSTD dictionary: #{inspect(reason)}"

      dict ->
        dict
    end
  end

  # Decompresses one zstd frame with the given dictionary and decodes the
  # resulting JSON. Any step returning `{:error, reason}` short-circuits and is
  # returned as-is.
  @spec decompress_and_parse(binary(), reference()) :: {:ok, map()} | {:error, term()}
  defp decompress_and_parse(compressed_data, ddict) do
    with ctx when is_reference(ctx) <-
           :ezstd.create_decompression_context(byte_size(compressed_data)),
         :ok <- :ezstd.select_ddict(ctx, ddict),
         iolist when is_list(iolist) <- :ezstd.decompress_streaming(ctx, compressed_data),
         decompressed = IO.iodata_to_binary(iolist),
         {:ok, payload} <- JSON.decode(decompressed) do
      {:ok, payload}
    else
      {:error, reason} -> {:error, reason}
    end
  end

  # Query-parameter builders. Each one prepends its parameter(s) to the keyword
  # list when the option is set and returns the list untouched otherwise.

  @spec put_collections(keyword(), [String.t()]) :: keyword()
  defp put_collections(params, []), do: params

  defp put_collections(params, collections) when is_list(collections) do
    Enum.reduce(collections, params, fn collection, acc ->
      [{:wantedCollections, collection} | acc]
    end)
  end

  @spec put_dids(keyword(), [String.t()]) :: keyword()
  defp put_dids(params, []), do: params

  defp put_dids(params, dids) when is_list(dids) do
    Enum.reduce(dids, params, fn did, acc ->
      [{:wantedDids, did} | acc]
    end)
  end

  @spec put_cursor(keyword(), integer() | nil) :: keyword()
  defp put_cursor(params, nil), do: params

  defp put_cursor(params, cursor) when is_integer(cursor), do: [{:cursor, cursor} | params]

  @spec put_max_size(keyword(), integer() | nil) :: keyword()
  defp put_max_size(params, nil), do: params

  defp put_max_size(params, max_size) when is_integer(max_size),
    do: [{:maxMessageSizeBytes, max_size} | params]

  @spec put_require_hello(keyword(), boolean()) :: keyword()
  defp put_require_hello(params, false), do: params

  defp put_require_hello(params, true), do: [{:requireHello, "true"} | params]
end