offline-first, p2p synced, atproto enabled, feed reader
1import {MessageEvent, WebSocket} from 'isomorphic-ws'
2import {z} from 'zod/mini'
3
4import {combineSignals} from '#lib/async/aborts'
5import {BlockingAtom} from '#lib/async/blocking-atom'
6import {BlockingQueue} from '#lib/async/blocking-queue'
7import {Breaker} from '#lib/breaker'
8import {ProtocolError, normalizeError} from '#lib/errors'
9
10/**
11 * put well-typed JSON data on the socket
12 */
13export function putSocket<T>(ws: WebSocket, data: T): void {
14 const json = JSON.stringify(data)
15 ws.send(json)
16}
17
18/**
19 * given a websocket, wait and take a single message off and return it
20 *
21 * @example
22 * ```
23 * const ws = new WebSocket("wss://example.com/stream")
24 * const timeout = timeoutSignal(5000)
25 *
26 * try {
27 * const msg = await takeSocket(ws, timeout.signal)
28 * doWhatever(msg)
29 * } finally {
30 * timeout.cleanup()
31 * if (ws.readyState !== ws.CLOSED)
32 * ws.close();
33 * }
34 * ```
35 */
36export async function takeSocket(ws: WebSocket, signal?: AbortSignal): Promise<unknown> {
37 signal?.throwIfAborted()
38
39 const atom = new BlockingAtom()
40 const breaker = new Breaker()
41
42 const error = new AbortController()
43 const multisignal = combineSignals(error.signal, signal)
44
45 const onMessage = breaker.tripThen((m: MessageEvent) => {
46 atom.set(m.data)
47 })
48
49 const onError = breaker.tripThen(() => {
50 // websocket errors are indistinguishable intentionally (WhatWG)
51 error.abort(new Error('socket error'))
52 })
53
54 const onClose = breaker.tripThen(() => {
55 error.abort(new Error('socket closed'))
56 })
57
58 try {
59 ws.addEventListener('message', onMessage)
60 ws.addEventListener('error', onError)
61 ws.addEventListener('close', onClose)
62
63 const data = await atom.get(multisignal)
64 if (!data) {
65 const cause = normalizeError(multisignal.reason)
66 throw new ProtocolError('socket read aborted', 408, {cause})
67 }
68
69 return data
70 } finally {
71 ws.removeEventListener('message', onMessage)
72 ws.removeEventListener('error', onError)
73 ws.removeEventListener('close', onClose)
74 }
75}
76
77/** exactly take socket, but will additionally apply a json decoding */
78export async function takeSocketSchema<T>(
79 ws: WebSocket,
80 schema: z.ZodMiniType<T>,
81 signal?: AbortSignal,
82): Promise<T> {
83 const data = await takeSocket(ws, signal)
84 return schema.parseAsync(data)
85}
86
87/** stream configuration options */
88export interface ConfigProps {
89 /** maximum size of the stream buffer */
90 maxDepth: number
91
92 /** skip errors, or raise? */
93 skipErrors?: boolean
94
95 /** signal for abort */
96 signal?: AbortSignal
97}
98
99export const STREAM_CONFIG_DEFAULT: ConfigProps = {
100 maxDepth: 1000,
101}
102
103// symbols for iteration protocol
104
105const yield$ = Symbol('yield$')
106const error$ = Symbol('error$')
107const end$ = Symbol('end$')
108
109type StreamYield = [typeof yield$, unknown] | [typeof error$, Error] | [typeof end$]
110
111/**
112 * given a websocket, stream messages off the socket as an async generator
113 *
114 * @example
115 * ```
116 * const ws = new WebSocket("wss://example.com/stream")
117 * const timeout = timeoutSignal(5000)
118 *
119 * try {
120 * // signal will fire in 5s, so we'll take as many messages as we get until then
121 * for await (const msg of streamSocket(ws, { signal: timeout.signal })) {
122 * doWhatever(msg)
123 * }
124 * } finally {
125 * timeout.cleanup()
126 * if (ws.readyState !== ws.CLOSED)
127 * ws.close();
128 * }
129 * ```
130 */
131export async function* streamSocket(ws: WebSocket, config_?: Partial<ConfigProps>) {
132 const {signal, ...config} = {...STREAM_CONFIG_DEFAULT, ...(config_ || {})}
133 signal?.throwIfAborted()
134
135 // await incoming messages without blocking
136 const queue = new BlockingQueue<StreamYield>(config.maxDepth)
137
138 // if true, we're ignoring incoming messages until we drop the queue
139 let inBackoffMode: boolean = false
140 const backoffThresh = Math.floor(config.maxDepth * 0.9)
141
142 // we don't want to keep processing after we've been closed
143 const breaker = new Breaker()
144
145 // define callback functions (need to be able to reference for removing them)
146
147 const onMessage = breaker.untilTripped((m: MessageEvent) => {
148 if (inBackoffMode) {
149 console.warn('ignoring incoming message due to backpressure protection!')
150 return
151 }
152
153 queue.enqueue([yield$, m.data])
154 inBackoffMode = queue.depth > backoffThresh
155 if (inBackoffMode) {
156 console.warn('message stream will start dropping messages due to backpressure!')
157 }
158 })
159
160 const onError = breaker.tripThen(() => {
161 // websocket errors are indistinguishable intentionally (WhatWG)
162 queue.enqueue([error$, new Error('error from the socket')])
163 })
164
165 const onClose = breaker.tripThen(() => {
166 queue.enqueue([end$])
167 })
168
169 // finally get into our loop
170 try {
171 ws.addEventListener('message', onMessage)
172 ws.addEventListener('error', onError)
173 ws.addEventListener('close', onClose)
174
175 while (true) {
176 signal?.throwIfAborted()
177
178 const [event, value] = await queue.dequeue(signal)
179
180 // TODO: eslint thinks inBackoffMode is always falsey...
181 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
182 if (inBackoffMode && queue.depth < backoffThresh) {
183 console.log('message stream will stop dropping messages due to eased backpressure')
184 inBackoffMode = false
185 }
186
187 switch (event) {
188 case yield$:
189 yield value
190 continue
191 case error$:
192 throw value
193 case end$:
194 return
195 }
196 }
197 } finally {
198 ws.removeEventListener('message', onMessage)
199 ws.removeEventListener('error', onError)
200 ws.removeEventListener('close', onClose)
201 }
202}
203
204export async function* streamSocketSchema<T>(
205 ws: WebSocket,
206 schema: z.ZodMiniType<T>,
207 config?: Partial<ConfigProps>,
208): AsyncGenerator<[z.core.util.SafeParseResult<T>, unknown]> {
209 for await (const message of streamSocket(ws, config)) {
210 yield [await schema.safeParseAsync(message), message]
211 }
212}