import {Semaphore} from './semaphore'

/**
 * simple blocking queue, for turning streams into async pulls.
 * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
 *
 * Producers add items with {@link BlockingQueue.enqueue} (tail) or
 * {@link BlockingQueue.prequeue} (head); consumers await
 * {@link BlockingQueue.dequeue}. Each stored item is paired with one
 * `free()` call on the internal semaphore, and each dequeue performs one
 * `take()` before removing an item.
 *
 * @typeParam T - the type of the items held by the queue.
 */
export class BlockingQueue<T> {
  readonly #sema: Semaphore
  readonly #maxsize?: number
  readonly #items: T[]

  /**
   * @param maxsize - the most items the queue may hold at once. A falsy
   *   value (e.g. `0`) disables the limit.
   */
  constructor(maxsize = 1000) {
    this.#sema = new Semaphore()
    // a falsy maxsize is stored as undefined, which switches the capacity check off.
    this.#maxsize = maxsize ? maxsize : undefined
    this.#items = []
  }

  /** @returns the depth of the queue */
  get depth(): number {
    return this.#items.length
  }

  /**
   * place one or more items at the FRONT of the queue, ahead of anything
   * already waiting. The relative order of the arguments is preserved, so
   * `prequeue(a, b)` is dequeued as `a`, then `b`.
   *
   * @param elements - the items to insert at the head of the queue.
   * @throws Error('out of room') once the queue is full. Capacity is checked
   *   per item, so items inserted before the failure stay on the queue.
   */
  prequeue(...elements: T[]): void {
    // walk the arguments back to front: each unshift lands ahead of the
    // previous one, which leaves the arguments in their original order.
    for (let i = elements.length - 1; i >= 0; i--) {
      this.#assertRoom()
      this.#items.unshift(elements[i] as T)
      this.#sema.free()
    }
  }

  /**
   * place one or more items at the BACK of the queue, to be picked up by
   * awaiters.
   *
   * @param elements - the items to append to the tail of the queue.
   * @throws Error('out of room') once the queue is full. Capacity is checked
   *   per item, so items appended before the failure stay on the queue.
   */
  enqueue(...elements: T[]): void {
    for (const el of elements) {
      this.#assertRoom()
      this.#items.push(el)
      this.#sema.free()
    }
  }

  /**
   * block while waiting for an item off the queue.
   *
   * @param signal - a signal to use for aborting the block.
   * @returns the item off the queue; rejects if aborted.
   * @throws the signal's abort reason when the semaphore was not acquired and
   *   the signal is aborted; Error('canceled dequeue') when it was not
   *   acquired for any other reason.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    if (await this.#sema.take(signal)) {
      return this.#poll()
    }
    // prefer surfacing the caller's own abort reason over the generic error.
    signal?.throwIfAborted()
    throw Error('canceled dequeue')
  }

  /** throws when a size limit is configured and the queue has reached it. */
  #assertRoom(): void {
    if (this.#maxsize && this.#items.length >= this.#maxsize) {
      throw Error('out of room')
    }
  }

  /**
   * removes and returns the item at the head of the queue.
   *
   * Emptiness is decided by the array length, never by the item's
   * truthiness, so falsy items (`0`, `''`, `false`, `null`, `undefined`)
   * are returned like any other value.
   *
   * @throws Error('no elements') when the queue is empty.
   */
  #poll(): T {
    if (this.#items.length === 0) {
      throw Error('no elements')
    }
    // the length check above guarantees shift() yields a stored item.
    return this.#items.shift() as T
  }
}