1// simple blocking queue, for turning streams into async pulls 2// cribbed mostly from https://github.com/ComFreek/async-playground 3 4import { Semaphore } from "./semaphore.ts"; 5 6export class BlockingQueue<T> { 7 #sema = new Semaphore(); 8 #items: T[] = []; 9 #maxsize: number | undefined; 10 11 constructor(maxsize: number | undefined = 1000) { 12 this.#maxsize = maxsize ? maxsize : undefined; 13 } 14 15 get size() { 16 return this.#items.length; 17 } 18 19 enqueue(...elements: T[]) { 20 for (const el of elements) { 21 if (this.#maxsize && this.size >= this.#maxsize) { 22 throw Error("out of room"); 23 } 24 25 this.#items.push(el); 26 this.#sema.free(); 27 } 28 } 29 30 async dequeue(signal?: AbortSignal) { 31 if (await this.#sema.take(signal)) { 32 return this.poll(); 33 } 34 35 signal?.throwIfAborted(); 36 throw Error("canceled dequeue"); 37 } 38 39 poll() { 40 if (this.size > 0) { 41 return this.#items.shift() as T; 42 } else { 43 throw Error("no elements"); 44 } 45 } 46}