podcast manager
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

we have a socket

+350 -27
+3 -1
deno.json
··· 20 20 }, 21 21 22 22 "imports": { 23 + "@repo/": "./src/", 23 24 "@deno/vite-plugin": "npm:@deno/vite-plugin@^1.0.4", 24 25 "@oak/oak": "jsr:@oak/oak@^17.1.4", 25 26 "@std/assert": "jsr:@std/assert@^1.0.12", ··· 41 42 "lib": [ 42 43 "dom", 43 44 "dom.iterable", 44 - "deno.ns" 45 + "deno.ns", 46 + "esnext" 45 47 ], 46 48 "jsx": "react-jsx", 47 49 "jsxImportSource": "react"
+49
src/common/aborts.ts
··· 1 + export type CancellableAbortController = AbortController & { cancel: () => void } 2 + 3 + export function makeAbort(): CancellableAbortController { 4 + let cancelled = false 5 + const controller = new AbortController(); 6 + 7 + return { 8 + cancel: () => { cancelled = true }, 9 + abort: (r) => { !cancelled && controller.abort(r) }, 10 + signal: controller.signal 11 + }; 12 + } 13 + 14 + export function makeTimeoutAbort(ms: number): CancellableAbortController { 15 + const controller = new AbortController(); 16 + const { promise, triggered, cancel } = makeTimeoutPromise(ms); 17 + 18 + controller.signal.addEventListener("abort", () => { 19 + if (!triggered()) 20 + cancel(); 21 + }); 22 + 23 + promise.then(() => { 24 + if (!controller.signal.aborted) 25 + controller.abort(); 26 + }); 27 + 28 + return { 29 + cancel, 30 + abort: controller.abort.bind(controller), 31 + signal: controller.signal, 32 + }; 33 + } 34 + 35 + export function makeTimeoutPromise(ms: number) { 36 + const { promise, resolve } = Promise.withResolvers<void>(); 37 + 38 + let triggered = false 39 + const timeout = setTimeout(() => { 40 + triggered = true; 41 + resolve(); 42 + }, ms); 43 + 44 + return { 45 + promise, 46 + cancel: () => clearTimeout(timeout), 47 + triggered: () => triggered, 48 + }; 49 + }
+21
src/common/blocking-atom.ts
··· 1 + // simple blocking atom, for waiting for a value 2 + // cribbed mostly from https://github.com/ComFreek/async-playground 3 + 4 + import { Semaphore } from "./semaphore.ts"; 5 + 6 + export class BlockingAtom<T> { 7 + #sema = new Semaphore(); 8 + #item: T | undefined; 9 + 10 + set(item: T) { 11 + this.#item = item; 12 + this.#sema.free(); 13 + } 14 + 15 + async get(signal?: AbortSignal) { 16 + if (await this.#sema.take(signal)) 17 + return this.#item; 18 + 19 + return undefined; 20 + } 21 + }
+42
src/common/blocking-queue.ts
··· 1 + // simple blocking queue, for turning streams into async pulls 2 + // cribbed mostly from https://github.com/ComFreek/async-playground 3 + 4 + import { Semaphore } from "./semaphore.ts"; 5 + 6 + export class BlockingQueue<T> { 7 + #sema = new Semaphore(); 8 + #items: T[] = []; 9 + #maxsize: number | undefined; 10 + 11 + constructor(maxsize: number | undefined = 1000) { 12 + this.#maxsize = maxsize ? maxsize : undefined; 13 + } 14 + 15 + get size() { 16 + return this.#items.length; 17 + } 18 + 19 + enqueue(...elements: T[]) { 20 + for (const el of elements) { 21 + if (this.#maxsize && this.size >= this.#maxsize) 22 + throw Error("out of room"); 23 + 24 + this.#items.push(el); 25 + this.#sema.free(); 26 + } 27 + } 28 + 29 + async dequeue(signal?: AbortSignal) { 30 + if (await this.#sema.take(signal)) return this.poll(); 31 + 32 + throw Error("canceled dequeue"); 33 + } 34 + 35 + poll() { 36 + if (this.size > 0) { 37 + return this.#items.shift() as T; 38 + } else { 39 + throw Error("no elements"); 40 + } 41 + } 42 + }
+21
src/common/once.ts
··· 1 + export function onceMaker<F extends (...args: any[]) => void>(outercb?: () => void) { 2 + let done = false; 3 + 4 + return { 5 + fired: () => done, 6 + 7 + once: (innercb: F) => (...args: Parameters<F>): void => { 8 + if (!done) { 9 + done = true; 10 + 11 + outercb?.(); 12 + innercb(...args); 13 + } 14 + }, 15 + 16 + many: (innercb: F) => (...args: Parameters<F>): void => { 17 + if (done) return; 18 + return innercb(...args); 19 + } 20 + }; 21 + }
+52
src/common/semaphore.ts
··· 1 + // simple counting semaphore, for blocking async ops 2 + // cribbed mostly from https://github.com/ComFreek/async-playground 3 + 4 + export class Semaphore { 5 + #counter = 0; 6 + #resolvers: ((taken: boolean) => void)[] = []; 7 + 8 + constructor(count = 0) { 9 + this.#counter = count; 10 + } 11 + 12 + take(signal?: AbortSignal) { 13 + return new Promise<boolean>((resolve) => { 14 + if (signal?.aborted) return resolve(false); 15 + 16 + // if there's resources available, use them 17 + 18 + this.#counter--; 19 + if (this.#counter >= 0) return resolve(true); 20 + 21 + // otherwise add to pending 22 + // and explicitly remove the resolver from the list on abort 23 + 24 + this.#resolvers.push(resolve); 25 + signal?.addEventListener("abort", () => { 26 + const index = this.#resolvers.indexOf(resolve); 27 + if (index >= 0) { 28 + this.#resolvers.splice(index, 1); 29 + this.#counter++; 30 + } 31 + 32 + resolve(false); 33 + }); 34 + }); 35 + } 36 + 37 + poll() { 38 + if (this.#counter <= 0) return false; 39 + 40 + this.#counter--; 41 + return true; 42 + } 43 + 44 + free() { 45 + this.#counter++; 46 + 47 + if (this.#resolvers.length > 0) { 48 + const resolver = this.#resolvers.shift(); 49 + resolver && queueMicrotask(() => resolver(true)); 50 + } 51 + } 52 + }
+106
src/common/socket.ts
··· 1 + import { makeAbort, makeTimeoutAbort } from "./aborts.ts"; 2 + import { BlockingAtom } from "./blocking-atom.ts"; 3 + import { BlockingQueue } from "./blocking-queue.ts"; 4 + import { onceMaker } from "./once.ts"; 5 + 6 + export type StreamConfig = typeof STREAM_CONFIG_DEFAULT; 7 + const STREAM_CONFIG_DEFAULT = { 8 + signal: undefined as AbortSignal | undefined, 9 + queueSize: 1000, 10 + threshPause: 800, 11 + threshResume: 200, 12 + onPause: () => {}, 13 + onResume: () => {}, 14 + }; 15 + 16 + type StreamEvent = 17 + | ['yield', MessageEvent['data']] 18 + | ['end'] 19 + | ['error', Error] 20 + ; 21 + 22 + export async function* streamSocket(ws: WebSocket, config_: Partial<StreamConfig> = {}): AsyncGenerator<unknown> { 23 + const config = { ...STREAM_CONFIG_DEFAULT, ...config_ } 24 + const queue = new BlockingQueue<StreamEvent>(config.queueSize); 25 + const gate = onceMaker(); 26 + 27 + const onMessage = gate.many((m: MessageEvent) => { 28 + queue.enqueue(['yield', m.data]) 29 + 30 + if (queue.size > config.threshPause) 31 + config.onPause?.(); 32 + }); 33 + 34 + const onClose = gate.once(() => queue.enqueue(['end'])) 35 + 36 + const onError = gate.once((e: Error) => { 37 + // todo: why are we getting this on client shutdown? 38 + if (e.message === "Unexpected EOF") 39 + queue.enqueue(['end']) 40 + else 41 + queue.enqueue(['error', e]); 42 + }); 43 + 44 + try { 45 + ws.addEventListener("message", onMessage) 46 + ws.addEventListener("error", onError) 47 + ws.addEventListener("close", onClose) 48 + 49 + while (true) { 50 + const [event, value] = await queue.dequeue(config.signal) 51 + if (queue.size < config.threshResume) 52 + config.onResume?.(); 53 + 54 + switch (event) { 55 + case "yield": 56 + yield value; 57 + continue; 58 + 59 + case "end": 60 + return; 61 + 62 + case "error": 63 + throw value; 64 + } 65 + } 66 + } catch (e) { 67 + if (!config.signal?.aborted) throw e; 68 + } finally { 69 + ws.removeEventListener("message", onMessage) 70 + ws.removeEventListener("error", onError) 71 + ws.removeEventListener("close", onClose) 72 + } 73 + } 74 + 75 + export async function takeSocket(ws: WebSocket, ms?: number): Promise<MessageEvent['data']> { 76 + const atom = new BlockingAtom<MessageEvent['data']>(); 77 + const abort = ms ? makeTimeoutAbort(ms) : makeAbort(); 78 + const gate = onceMaker(); 79 + 80 + // callback functions - consts so we can `off` them on clean up 81 + 82 + const onMessage = gate.once((m: MessageEvent) => atom.set(m.data)); 83 + const onClose = gate.once(() => abort.abort('closed')); 84 + const onError = gate.once((e: Error) => { 85 + if (e.message == "Unexpected EOF") 86 + // todo: why are we getting this on client shutdown? 87 + abort.abort('closed') 88 + else 89 + abort.abort(e) 90 + }); 91 + 92 + try { 93 + ws.addEventListener("message", onMessage) 94 + ws.addEventListener("close", onClose) 95 + ws.addEventListener("error", onError) 96 + 97 + return await atom.get(abort.signal); 98 + } finally { 99 + if (!abort.signal.aborted) 100 + abort.cancel() 101 + 102 + ws.removeEventListener("message", onMessage) 103 + ws.removeEventListener("close", onClose) 104 + ws.removeEventListener("error", onError) 105 + } 106 + }
+20
src/common/strict-map.ts
··· 1 + export class StrictMap<K, V> extends Map<K, V> { 2 + require(key: K): V { 3 + if (!this.has(key)) throw Error(`key is required but not in the map`); 4 + 5 + return this.get(key) as V; 6 + } 7 + 8 + ensure(key: K, maker: () => V): V { 9 + if (!this.has(key)) { 10 + this.set(key, maker()); 11 + } 12 + 13 + return this.get(key) as V; 14 + } 15 + 16 + update(key: K, update: (v?: V) => V) { 17 + const current = this.get(key); 18 + this.set(key, update(current)); 19 + } 20 + }
+2 -2
src/server/main.ts
··· 1 1 import { Application } from "@oak/oak/application"; 2 2 import { parseArgs } from "jsr:@std/cli/parse-args"; 3 3 4 - import { apiMiddleware } from "./routes.api.ts"; 5 - import { socketMiddleware } from "./routes.socket.ts"; 4 + import { apiMiddleware } from "./api/middleware.ts"; 5 + import { socketMiddleware } from "./socket/middleware.ts"; 6 6 import { makeSpaRoute, makeStaticRoute } from "./routes.static.ts"; 7 7 import { notFound } from "./routes.error.ts"; 8 8
src/server/routes.api.ts src/server/api/middleware.ts
-24
src/server/routes.socket.ts
··· 1 - import { Middleware } from "@oak/oak/middleware"; 2 - 3 - export const socketMiddleware: (path: string) => Middleware = 4 - (pathname) => async (ctx, next) => { 5 - // must be an upgradable websocket connection to api root 6 - if (ctx.request.url.pathname !== pathname || !ctx.isUpgradable) { 7 - return await next(); 8 - } 9 - 10 - // the we'll upgrade and connect a handler 11 - const ws = ctx.upgrade(); 12 - ws.onopen = () => { 13 - console.log("Connected to client"); 14 - ws.send("Hello from server!"); 15 - }; 16 - 17 - ws.onmessage = (m) => { 18 - console.log("Got message from client: ", m.data); 19 - ws.send(m.data as string); 20 - ws.close(); 21 - }; 22 - 23 - ws.onclose = () => console.log("Disconncted from client"); 24 - };
+22
src/server/socket/handler.ts
··· 1 + import { streamSocket, takeSocket } from "@repo/common/socket.ts"; 2 + 3 + export async function socketHandler(this: WebSocket) { 4 + try { 5 + this.send('your name?') 6 + 7 + const name = await takeSocket(this, 5000) 8 + if (!name) return 9 + 10 + this.send(`welcome, ${name}`) 11 + for await (const message of streamSocket(this)) { 12 + this.send(`${name} said: ${message}`) 13 + } 14 + } catch (e) { 15 + console.error(e) 16 + } finally { 17 + if (this.readyState !== this.CLOSED) 18 + this.close() 19 + 20 + console.log("kthxbye"); 21 + } 22 + }
+12
src/server/socket/middleware.ts
··· 1 + import { Middleware } from "@oak/oak/middleware"; 2 + import { socketHandler } from "./handler.ts"; 3 + 4 + export const socketMiddleware: (path: string) => Middleware = (pathname) => async (ctx, next) => { 5 + // must be an upgradable websocket connection to api root 6 + if (ctx.request.url.pathname !== pathname || !ctx.isUpgradable) 7 + return await next() 8 + 9 + // then we'll upgrade and connect a handler 10 + const ws = ctx.upgrade(); 11 + ws.onopen = socketHandler; 12 + };