Elixir ATProtocol ingestion and sync library.
defmodule Drinkup.Jetstream.Options do
  @moduledoc """
  Configuration options for Jetstream event stream connection.

  Jetstream is a simplified JSON event stream that converts the CBOR-encoded
  ATProto Firehose into lightweight, friendly JSON. It provides zstd compression
  and filtering capabilities for collections and DIDs.

  ## Options

  - `:consumer` (required) - Module implementing `Drinkup.Jetstream.Consumer` behaviour
  - `:name` - Unique name for this Jetstream instance in the supervision tree (default: `Drinkup.Jetstream`)
  - `:host` - Jetstream service URL (default: `"wss://jetstream2.us-east.bsky.network"`)
  - `:wanted_collections` - List of collection NSIDs or prefixes to filter (default: `[]` = all collections)
  - `:wanted_dids` - List of DIDs to filter (default: `[]` = all repos)
  - `:cursor` - Unix microseconds timestamp to resume from (default: `nil` = live-tail)
  - `:require_hello` - Pause replay until first options update is sent (default: `false`)
  - `:max_message_size_bytes` - Maximum message size to receive (default: `nil` = no limit)

  ## Example

      %{
        consumer: MyJetstreamConsumer,
        name: MyJetstream,
        host: "wss://jetstream2.us-east.bsky.network",
        wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
        wanted_dids: ["did:plc:abc123"],
        cursor: 1725519626134432
      }

  ## Collection Filters

  The `wanted_collections` option supports:
  - Full NSIDs: `"app.bsky.feed.post"`
  - NSID prefixes: `"app.bsky.graph.*"`, `"app.bsky.*"`

  You can specify up to 100 collection filters.

  ## DID Filters

  The `wanted_dids` option accepts a list of DID strings.
  You can specify up to 10,000 DIDs.

  ## Compression

  Jetstream always uses zstd compression with a custom dictionary.
  This is handled automatically by the socket implementation.
  """

  use TypedStruct

  @default_host "wss://jetstream2.us-east.bsky.network"

  @typedoc """
  Map of configuration options accepted by `Drinkup.Jetstream.child_spec/1`.
  """
  @type options() :: %{
          required(:consumer) => consumer(),
          optional(:name) => name(),
          optional(:host) => host(),
          optional(:wanted_collections) => wanted_collections(),
          optional(:wanted_dids) => wanted_dids(),
          optional(:cursor) => cursor(),
          optional(:require_hello) => require_hello(),
          optional(:max_message_size_bytes) => max_message_size_bytes()
        }

  @typedoc """
  Module implementing the `Drinkup.Jetstream.Consumer` behaviour.
  """
  @type consumer() :: module()

  @typedoc """
  Unique identifier for this Jetstream instance in the supervision tree.

  Used for Registry lookups and naming child processes.
  """
  @type name() :: atom()

  @typedoc """
  WebSocket URL of the Jetstream service.

  Defaults to `"wss://jetstream2.us-east.bsky.network"` which is a public Bluesky instance.
  """
  @type host() :: String.t()

  @typedoc """
  List of collection NSIDs or NSID prefixes to filter.

  Examples:
  - `["app.bsky.feed.post"]` - Only posts
  - `["app.bsky.graph.*"]` - All graph collections
  - `["app.bsky.*"]` - All Bluesky app collections

  You can specify up to 100 collection filters.
  Defaults to `[]` (all collections).
  """
  @type wanted_collections() :: [String.t()]

  @typedoc """
  List of DIDs to filter events by.

  You can specify up to 10,000 DIDs.
  Defaults to `[]` (all repos).
  """
  @type wanted_dids() :: [String.t()]

  @typedoc """
  Unix microseconds timestamp to resume streaming from.

  When provided, Jetstream will replay events starting from this timestamp.
  Useful for resuming after a restart without missing events. The cursor is
  automatically tracked and updated as events are received.

  Defaults to `nil` (live-tail from current time).
  """
  @type cursor() :: pos_integer() | nil

  @typedoc """
  Whether to pause replay/live-tail until the first options update is sent.

  When `true`, the connection will wait for a `Drinkup.Jetstream.update_options/2`
  call before starting to receive events.

  Defaults to `false`.
  """
  @type require_hello() :: boolean()

  @typedoc """
  Maximum message size in bytes that the client would like to receive.

  Zero or `nil` means no limit. Negative values are treated as zero.
  Defaults to `nil` (no maximum size).
  """
  @type max_message_size_bytes() :: integer() | nil

  typedstruct do
    field :consumer, consumer(), enforce: true
    field :name, name(), default: Drinkup.Jetstream
    field :host, host(), default: @default_host
    # TODO: Add NSID prefix validation once available in atex
    field :wanted_collections, wanted_collections(), default: []
    field :wanted_dids, wanted_dids(), default: []
    field :cursor, cursor()
    field :require_hello, require_hello(), default: false
    field :max_message_size_bytes, max_message_size_bytes()
  end

  @doc """
  Builds an options struct from a map of user-supplied options.

  The map must contain a `:consumer` key. Every option that is left out takes
  the default declared on the struct (see the "Options" section of the module
  documentation).

  Raises `KeyError` if the map contains a key that is not a known option, so a
  misspelled option name (for example `:wanted_collection`) is reported instead
  of being silently dropped. Raises `FunctionClauseError` if the argument is
  not a map containing a `:consumer` key.

  ## Examples

      iex> options = Drinkup.Jetstream.Options.from(%{consumer: MyJetstreamConsumer})
      iex> options.host
      "wss://jetstream2.us-east.bsky.network"
      iex> options.wanted_collections
      []

  """
  @spec from(options()) :: t()
  # `struct!/2` (rather than `struct/2`) is deliberate: `struct/2` discards
  # unknown keys, which would hide typos in filter options and leave the
  # stream unfiltered without any indication of why.
  def from(%{consumer: _} = options), do: struct!(__MODULE__, options)
end