offline-first, p2p synced, atproto enabled, feed reader
at main 5.9 kB view raw
import {MessageEvent, WebSocket} from 'isomorphic-ws'
import {z} from 'zod/mini'

import {combineSignals} from '#lib/async/aborts'
import {BlockingAtom} from '#lib/async/blocking-atom'
import {BlockingQueue} from '#lib/async/blocking-queue'
import {Breaker} from '#lib/breaker'
import {ProtocolError, normalizeError} from '#lib/errors'

/**
 * put well-typed JSON data on the socket
 *
 * The value is serialized with `JSON.stringify` and sent as a single message.
 *
 * @param ws - socket to send on
 * @param data - value to serialize and send
 */
export function putSocket<T>(ws: WebSocket, data: T): void {
  const json = JSON.stringify(data)
  ws.send(json)
}

/**
 * given a websocket, wait and take a single message off and return it
 *
 * Listeners for `message`, `error` and `close` are attached for the duration of
 * the call and always removed before it returns or throws.
 *
 * @param ws - socket to read from
 * @param signal - optional abort signal; checked before waiting and combined with
 *   an internal signal that fires on socket error/close
 * @returns the raw `data` of the first message event received
 * @throws ProtocolError (408, 'socket read aborted') when the wait ends without a message
 *
 * @example
 * ```
 * const ws = new WebSocket("wss://example.com/stream")
 * const timeout = timeoutSignal(5000)
 *
 * try {
 *   const msg = await takeSocket(ws, timeout.signal)
 *   doWhatever(msg)
 * } finally {
 *   timeout.cleanup()
 *   if (ws.readyState !== ws.CLOSED)
 *     ws.close();
 * }
 * ```
 */
export async function takeSocket(ws: WebSocket, signal?: AbortSignal): Promise<unknown> {
  signal?.throwIfAborted()

  const atom = new BlockingAtom()
  // NOTE(review): presumably the breaker lets only the first of these three
  // handlers do its work — confirm against Breaker.tripThen
  const breaker = new Breaker()

  // internal controller used to turn socket error/close into an abort of the wait
  const error = new AbortController()
  const multisignal = combineSignals(error.signal, signal)

  const onMessage = breaker.tripThen((m: MessageEvent) => {
    atom.set(m.data)
  })

  const onError = breaker.tripThen(() => {
    // websocket errors are indistinguishable intentionally (WhatWG)
    error.abort(new Error('socket error'))
  })

  const onClose = breaker.tripThen(() => {
    error.abort(new Error('socket closed'))
  })

  try {
    ws.addEventListener('message', onMessage)
    ws.addEventListener('error', onError)
    ws.addEventListener('close', onClose)

    const data = await atom.get(multisignal)
    // Only a nullish result means "no message arrived". A plain falsy check would
    // misreport a valid empty text frame ("") as an aborted read.
    // NOTE(review): assumes atom.get resolves to null/undefined when aborted — confirm
    if (data == null) {
      const cause = normalizeError(multisignal.reason)
      throw new ProtocolError('socket read aborted', 408, {cause})
    }

    return data
  } finally {
    ws.removeEventListener('message', onMessage)
    ws.removeEventListener('error', onError)
    ws.removeEventListener('close', onClose)
  }
}

/**
 * exactly take socket, but additionally validates the message against a schema
 *
 * The raw message data is handed straight to `schema.parseAsync`; no `JSON.parse`
 * is performed here, so any decoding of a JSON string must be part of the schema.
 *
 * @param ws - socket to read from
 * @param schema - schema the message must satisfy
 * @param signal - optional abort signal, forwarded to `takeSocket`
 * @returns the parsed value; rejects if `takeSocket` or the schema parse rejects
 */
export async function takeSocketSchema<T>(
  ws: WebSocket,
  schema: z.ZodMiniType<T>,
  signal?: AbortSignal,
): Promise<T> {
  const data = await takeSocket(ws, signal)
  return schema.parseAsync(data)
}

/** stream configuration options */
export interface ConfigProps {
  /** maximum size of the stream buffer */
  maxDepth: number

  /**
   * skip errors, or raise?
   *
   * NOTE(review): not read by any function in this file — confirm intended use
   */
  skipErrors?: boolean

  /** signal for abort */
  signal?: AbortSignal
}

/** defaults merged underneath the caller's config in `streamSocket` */
export const STREAM_CONFIG_DEFAULT: ConfigProps = {
  maxDepth: 1000,
}

// symbols for iteration protocol

const yield$ = Symbol('yield$')
const error$ = Symbol('error$')
const end$ = Symbol('end$')

/** queue entries: a message payload, a socket error, or end-of-stream */
type StreamYield = [typeof yield$, unknown] | [typeof error$, Error] | [typeof end$]

/**
 * given a websocket, stream messages off the socket as an async generator
 *
 * Incoming messages are buffered in a queue of at most `maxDepth` entries. Once
 * the queue depth exceeds 90% of `maxDepth`, new messages are dropped (with a
 * console warning) until the consumer has drained the queue back below that
 * threshold. A socket `error` makes the generator throw; a socket `close` ends it.
 * Listeners are removed when the generator finishes, throws, or is returned early.
 *
 * @param ws - socket to read from
 * @param config_ - optional overrides for `STREAM_CONFIG_DEFAULT`
 *
 * @example
 * ```
 * const ws = new WebSocket("wss://example.com/stream")
 * const timeout = timeoutSignal(5000)
 *
 * try {
 *   // signal will fire in 5s, so we'll take as many messages as we get until then
 *   for await (const msg of streamSocket(ws, { signal: timeout.signal })) {
 *     doWhatever(msg)
 *   }
 * } finally {
 *   timeout.cleanup()
 *   if (ws.readyState !== ws.CLOSED)
 *     ws.close();
 * }
 * ```
 */
export async function* streamSocket(ws: WebSocket, config_?: Partial<ConfigProps>) {
  const {signal, ...config} = {...STREAM_CONFIG_DEFAULT, ...(config_ ?? {})}
  signal?.throwIfAborted()

  // await incoming messages without blocking
  const queue = new BlockingQueue<StreamYield>(config.maxDepth)

  // if true, we're ignoring incoming messages until we drop the queue
  let inBackoffMode: boolean = false
  // NOTE(review): for maxDepth <= 1 this is 0, so (assuming depth is never
  // negative) the `depth < backoffThresh` release check below can never pass
  const backoffThresh = Math.floor(config.maxDepth * 0.9)

  // we don't want to keep processing after we've been closed
  const breaker = new Breaker()

  // define callback functions (need to be able to reference for removing them)

  const onMessage = breaker.untilTripped((m: MessageEvent) => {
    if (inBackoffMode) {
      console.warn('ignoring incoming message due to backpressure protection!')
      return
    }

    queue.enqueue([yield$, m.data])
    inBackoffMode = queue.depth > backoffThresh
    if (inBackoffMode) {
      console.warn('message stream will start dropping messages due to backpressure!')
    }
  })

  const onError = breaker.tripThen(() => {
    // websocket errors are indistinguishable intentionally (WhatWG)
    queue.enqueue([error$, new Error('error from the socket')])
  })

  const onClose = breaker.tripThen(() => {
    queue.enqueue([end$])
  })

  // finally get into our loop
  try {
    ws.addEventListener('message', onMessage)
    ws.addEventListener('error', onError)
    ws.addEventListener('close', onClose)

    while (true) {
      signal?.throwIfAborted()

      const [event, value] = await queue.dequeue(signal)

      // TODO: eslint thinks inBackoffMode is always falsey...
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      if (inBackoffMode && queue.depth < backoffThresh) {
        console.log('message stream will stop dropping messages due to eased backpressure')
        inBackoffMode = false
      }

      switch (event) {
        case yield$:
          yield value
          continue
        case error$:
          throw value
        case end$:
          return
      }
    }
  } finally {
    ws.removeEventListener('message', onMessage)
    ws.removeEventListener('error', onError)
    ws.removeEventListener('close', onClose)
  }
}

/**
 * stream messages off the socket, pairing each with its schema parse result
 *
 * Parse failures do not throw: each item is `[safeParseResult, rawMessage]`, so
 * the consumer decides what to do with messages that fail validation.
 *
 * @param ws - socket to read from
 * @param schema - schema each message is checked against
 * @param config - optional stream configuration, forwarded to `streamSocket`
 */
export async function* streamSocketSchema<T>(
  ws: WebSocket,
  schema: z.ZodMiniType<T>,
  config?: Partial<ConfigProps>,
): AsyncGenerator<[z.core.util.SafeParseResult<T>, unknown]> {
  for await (const message of streamSocket(ws, config)) {
    yield [await schema.safeParseAsync(message), message]
  }
}