at main 1.7 kB view raw
import {Semaphore} from './semaphore'

/**
 * simple blocking queue, for turning streams into async pulls.
 * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
 *
 * every item placed on the queue is paired with one `free()` on the internal
 * semaphore, and every `dequeue()` waits on one `take()` before removing an item.
 */
export class BlockingQueue<T> {
  readonly #sema: Semaphore
  readonly #maxsize?: number
  readonly #items: T[]

  /**
   * @param maxsize - the maximum number of items held at once; a falsy value
   *   (e.g. `0`) disables the limit.
   */
  constructor(maxsize = 1000) {
    this.#sema = new Semaphore()
    this.#maxsize = maxsize ? maxsize : undefined
    this.#items = []
  }

  /** @returns the depth of the queue */
  get depth(): number {
    return this.#items.length
  }

  /**
   * place one or more items at the *front* of the queue, to be picked up by
   * awaiters ahead of anything already queued. the items keep their argument
   * order: the first argument is the next one dequeued.
   *
   * @throws Error('out of room') when the queue is full; items inserted before
   *   the failing one stay on the queue.
   */
  prequeue(...elements: T[]) {
    // `elements` is a fresh rest array, so reversing in place never touches caller data.
    // inserting back-to-front with unshift leaves the arguments in their original order.
    for (const el of elements.reverse()) {
      this.#assertRoom()

      this.#items.unshift(el)
      this.#sema.free()
    }
  }

  /**
   * place one or more items at the *back* of the queue, to be picked up by awaiters.
   *
   * @throws Error('out of room') when the queue is full; items inserted before
   *   the failing one stay on the queue.
   */
  enqueue(...elements: T[]) {
    for (const el of elements) {
      this.#assertRoom()

      this.#items.push(el)
      this.#sema.free()
    }
  }

  /**
   * block while waiting for an item off the queue.
   *
   * @param signal - a signal to use for aborting the block.
   * @returns the item off the queue; rejects if aborted.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    if (await this.#sema.take(signal)) {
      return this.#poll()
    }

    // the take did not succeed: surface the abort reason when there is one,
    // otherwise fall back to a generic cancellation error.
    signal?.throwIfAborted()
    throw Error('canceled dequeue')
  }

  /** throws when a size limit is set and the queue has already reached it. */
  #assertRoom(): void {
    if (this.#maxsize && this.#items.length >= this.#maxsize) {
      throw Error('out of room')
    }
  }

  /**
   * remove and return the item at the front of the queue.
   *
   * emptiness is decided by the array length, never by the item's truthiness,
   * so falsy values (`0`, `''`, `false`, `null`, `undefined`) round-trip intact.
   *
   * @throws Error('no elements') when the queue is empty.
   */
  #poll(): T {
    if (this.#items.length === 0) {
      throw Error('no elements')
    }

    // the length check above guarantees an element; the cast is needed because
    // `T` may itself include `undefined`, so the value cannot be narrowed.
    return this.#items.shift() as T
  }
}