import {Semaphore} from './semaphore'

/**
 * simple blocking queue, for turning streams into async pulls.
 * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
 */
export class BlockingQueue<T> {
  #sema: Semaphore
  #maxsize?: number
  #items: T[]

  /**
   * @param maxsize - maximum number of queued items; a falsy value (e.g. `0`)
   *   disables the limit.
   */
  constructor(maxsize = 1000) {
    this.#sema = new Semaphore()
    this.#maxsize = maxsize ? maxsize : undefined
    this.#items = []
  }

  /** @returns the depth of the queue */
  get depth(): number {
    return this.#items.length
  }

  /**
   * overridable method for checking if an element should be allowed to be
   * inserted into the queue. the base implementation accepts everything.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  allowed(_element: T): boolean {
    return true
  }

  /**
   * place one or more items at the FRONT of the queue, so they are picked up
   * by awaiters before anything already queued. the relative order of the
   * given elements is preserved.
   *
   * @throws Error('out of room') when the queue is full; elements processed
   *   before the failing one stay queued.
   */
  prequeue(...elements: T[]): void {
    // walk the arguments back to front so repeated unshift() keeps their order
    for (const el of elements.reverse()) {
      if (this.#maxsize && this.#items.length >= this.#maxsize) {
        throw Error('out of room')
      }
      if (this.allowed(el)) {
        this.#items.unshift(el)
        this.#sema.free()
      }
    }
  }

  /**
   * place one or more items on the back of the queue, to be picked up by
   * awaiters.
   *
   * @throws Error('out of room') when the queue is full; elements processed
   *   before the failing one stay queued.
   */
  enqueue(...elements: T[]): void {
    for (const el of elements) {
      // the capacity check runs before allowed(), so a full queue throws even
      // for an element that allowed() would have rejected
      if (this.#maxsize && this.#items.length >= this.#maxsize) {
        throw Error('out of room')
      }
      if (this.allowed(el)) {
        this.#items.push(el)
        this.#sema.free()
      }
    }
  }

  /**
   * block while waiting for an item off the queue.
   *
   * @param signal - a signal to use for aborting the block.
   * @returns the item off the queue; rejects if aborted.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    signal?.throwIfAborted()
    // NOTE(review): take() is assumed to resolve truthy once a permit is
    // acquired and falsy when the wait was cancelled; confirm in ./semaphore
    if (await this.#sema.take(signal)) {
      return this.#poll()
    }
    signal?.throwIfAborted()
    throw Error('canceled dequeue')
  }

  /**
   * remove and return the head of the queue.
   *
   * emptiness is decided by the array length, never by the truthiness of the
   * element, so falsy items (`0`, `''`, `false`, `null`, `undefined`) are
   * returned like any other value.
   *
   * @throws Error('no elements') when the queue is empty.
   */
  #poll(): T {
    if (this.#items.length === 0) {
      throw Error('no elements')
    }
    // length was checked above, so shift() yields a real element here
    return this.#items.shift() as T
  }
}

/**
 * a blocking queue with a "visited" set, which may be cleared when the queue goes empty
 */
export class BlockingSet<T> extends BlockingQueue<T> {
  #visited: Set<T>

  constructor(maxsize = 1000) {
    super(maxsize)
    this.#visited = new Set()
  }

  /** forget every element seen so far, so they may be queued again. */
  reset() {
    this.#visited = new Set()
  }

  /** enqueue as the base class does, then log the visited and queue sizes. */
  override enqueue(...elements: T[]): void {
    super.enqueue(...elements)
    console.log('enqueued:', this.#visited.size, this.depth)
  }

  /**
   * accept an element only the first time it is seen (since the last reset),
   * recording it as visited as a side effect.
   */
  override allowed(element: T): boolean {
    if (this.#visited.has(element)) {
      return false
    } else {
      this.#visited.add(element)
      return true
    }
  }
}