import {MessageEvent, WebSocket} from 'isomorphic-ws' import {z} from 'zod/mini' import {combineSignals} from '#lib/async/aborts' import {BlockingAtom} from '#lib/async/blocking-atom' import {BlockingQueue} from '#lib/async/blocking-queue' import {Breaker} from '#lib/breaker' import {ProtocolError, normalizeError} from '#lib/errors' /** * put well-typed JSON data on the socket */ export function putSocket(ws: WebSocket, data: T): void { const json = JSON.stringify(data) ws.send(json) } /** * given a websocket, wait and take a single message off and return it * * @example * ``` * const ws = new WebSocket("wss://example.com/stream") * const timeout = timeoutSignal(5000) * * try { * const msg = await takeSocket(ws, timeout.signal) * doWhatever(msg) * } finally { * timeout.cleanup() * if (ws.readyState !== ws.CLOSED) * ws.close(); * } * ``` */ export async function takeSocket(ws: WebSocket, signal?: AbortSignal): Promise { signal?.throwIfAborted() const atom = new BlockingAtom() const breaker = new Breaker() const error = new AbortController() const multisignal = combineSignals(error.signal, signal) const onMessage = breaker.tripThen((m: MessageEvent) => { atom.set(m.data) }) const onError = breaker.tripThen(() => { // websocket errors are indistinguishable intentionally (WhatWG) error.abort(new Error('socket error')) }) const onClose = breaker.tripThen(() => { error.abort(new Error('socket closed')) }) try { ws.addEventListener('message', onMessage) ws.addEventListener('error', onError) ws.addEventListener('close', onClose) const data = await atom.get(multisignal) if (!data) { const cause = normalizeError(multisignal.reason) throw new ProtocolError('socket read aborted', 408, {cause}) } return data } finally { ws.removeEventListener('message', onMessage) ws.removeEventListener('error', onError) ws.removeEventListener('close', onClose) } } /** exactly take socket, but will additionally apply a json decoding */ export async function takeSocketSchema( ws: WebSocket, schema: z.ZodMiniType, signal?: AbortSignal, ): Promise { const data = await takeSocket(ws, signal) return schema.parseAsync(data) } /** stream configuration options */ export interface ConfigProps { /** maximum size of the stream buffer */ maxDepth: number /** skip errors, or raise? */ skipErrors?: boolean /** signal for abort */ signal?: AbortSignal } export const STREAM_CONFIG_DEFAULT: ConfigProps = { maxDepth: 1000, } // symbols for iteration protocol const yield$ = Symbol('yield$') const error$ = Symbol('error$') const end$ = Symbol('end$') type StreamYield = [typeof yield$, unknown] | [typeof error$, Error] | [typeof end$] /** * given a websocket, stream messages off the socket as an async generator * * @example * ``` * const ws = new WebSocket("wss://example.com/stream") * const timeout = timeoutSignal(5000) * * try { * // signal will fire in 5s, so we'll take as many messages as we get until then * for await (const msg of streamSocket(ws, { signal: timeout.signal })) { * doWhatever(msg) * } * } finally { * timeout.cleanup() * if (ws.readyState !== ws.CLOSED) * ws.close(); * } * ``` */ export async function* streamSocket(ws: WebSocket, config_?: Partial) { const {signal, ...config} = {...STREAM_CONFIG_DEFAULT, ...(config_ || {})} signal?.throwIfAborted() // await incoming messages without blocking const queue = new BlockingQueue(config.maxDepth) // if true, we're ignoring incoming messages until we drop the queue let inBackoffMode: boolean = false const backoffThresh = Math.floor(config.maxDepth * 0.9) // we don't want to keep processing after we've been closed const breaker = new Breaker() // define callback functions (need to be able to reference for removing them) const onMessage = breaker.untilTripped((m: MessageEvent) => { if (inBackoffMode) { console.warn('ignoring incoming message due to backpressure protection!') return } queue.enqueue([yield$, m.data]) inBackoffMode = queue.depth > backoffThresh if (inBackoffMode) { console.warn('message stream will start dropping messages due to backpressure!') } }) const onError = breaker.tripThen(() => { // websocket errors are indistinguishable intentionally (WhatWG) queue.enqueue([error$, new Error('error from the socket')]) }) const onClose = breaker.tripThen(() => { queue.enqueue([end$]) }) // finally get into our loop try { ws.addEventListener('message', onMessage) ws.addEventListener('error', onError) ws.addEventListener('close', onClose) while (true) { signal?.throwIfAborted() const [event, value] = await queue.dequeue(signal) // TODO: eslint thinks inBackoffMode is always falsey... // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (inBackoffMode && queue.depth < backoffThresh) { console.log('message stream will stop dropping messages due to eased backpressure') inBackoffMode = false } switch (event) { case yield$: yield value continue case error$: throw value case end$: return } } } finally { ws.removeEventListener('message', onMessage) ws.removeEventListener('error', onError) ws.removeEventListener('close', onClose) } } export async function* streamSocketSchema( ws: WebSocket, schema: z.ZodMiniType, config?: Partial, ): AsyncGenerator<[z.core.util.SafeParseResult, unknown]> { for await (const message of streamSocket(ws, config)) { yield [await schema.safeParseAsync(message), message] } }