// simple blocking queue, for turning streams into async pulls
// cribbed mostly from https://github.com/ComFreek/async-playground

import { Semaphore } from "./semaphore.ts";

/**
 * FIFO queue whose `dequeue()` waits asynchronously until an item is available.
 *
 * Producers call `enqueue()` synchronously; consumers `await dequeue()`.
 * Each enqueued item is paired with one `Semaphore.free()` call, and each
 * `dequeue()` first awaits `Semaphore.take()`.
 * NOTE(review): presumably `free()` releases one permit and `take()` resolves
 * truthy once a permit is acquired — confirm against ./semaphore.ts.
 *
 * @typeParam T - type of the queued items.
 */
export class BlockingQueue<T> {
  readonly #sema = new Semaphore();
  readonly #items: T[] = [];
  readonly #maxsize: number | undefined;

  /**
   * @param maxsize - maximum number of buffered items. Defaults to 1000
   *   (also when `undefined` is passed explicitly). Any falsy value such as
   *   `0` disables the limit.
   */
  constructor(maxsize: number | undefined = 1000) {
    // Falsy values are normalized to `undefined`, meaning "unbounded".
    this.#maxsize = maxsize ? maxsize : undefined;
  }

  /** Number of items currently buffered. */
  get size(): number {
    return this.#items.length;
  }

  /**
   * Appends the given elements in order, signalling the semaphore once per
   * element.
   *
   * @throws Error("out of room") as soon as an element would exceed `maxsize`.
   *   Elements that preceded the overflowing one in the same call remain
   *   queued.
   */
  enqueue(...elements: T[]): void {
    for (const el of elements) {
      // Capacity is checked per element, so a multi-element call can be
      // partially applied before throwing.
      if (this.#maxsize && this.size >= this.#maxsize) {
        throw new Error("out of room");
      }
      this.#items.push(el);
      this.#sema.free();
    }
  }

  /**
   * Waits on the semaphore, then removes and returns the oldest item.
   *
   * @param signal - optional abort signal, forwarded to `Semaphore.take()`.
   * @returns the oldest queued item.
   * @throws the signal's abort reason if `take()` resolved falsy and the
   *   signal is aborted; otherwise Error("canceled dequeue") when `take()`
   *   resolved falsy.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    if (await this.#sema.take(signal)) {
      return this.poll();
    }
    // take() did not succeed: prefer surfacing the abort reason if there is one.
    signal?.throwIfAborted();
    throw new Error("canceled dequeue");
  }

  /**
   * Removes and returns the oldest item without waiting and without touching
   * the semaphore.
   *
   * @throws Error("no elements") if the queue is empty.
   */
  poll(): T {
    if (this.size > 0) {
      // Safe assertion: a non-empty array's shift() yields an element.
      return this.#items.shift() as T;
    } else {
      throw new Error("no elements");
    }
  }
}