podcast manager
1// simple blocking queue, for turning streams into async pulls
2// cribbed mostly from https://github.com/ComFreek/async-playground
3
4import { Semaphore } from "./semaphore.ts";
5
/**
 * Bounded FIFO queue whose consumers can await the next element.
 *
 * Each enqueued element is paired with one `free()` call on an internal
 * semaphore, and `dequeue()` awaits `take()` on that semaphore before removing
 * an element, so a consumer waits until a producer has supplied something.
 *
 * NOTE(review): `poll()` removes an element without calling into the
 * semaphore. Whether mixing `poll()` and `dequeue()` can leave the semaphore
 * count out of step with the buffered items depends on `Semaphore` — verify.
 */
export class BlockingQueue<T> {
  readonly #sema = new Semaphore();
  readonly #items: T[] = [];
  readonly #maxsize: number | undefined;

  /**
   * @param maxsize Maximum number of buffered elements; defaults to 1000 when
   *   omitted. A falsy value such as `0` disables the limit.
   */
  constructor(maxsize: number | undefined = 1000) {
    // Falsy (0, NaN) is normalised to `undefined`, meaning "unbounded".
    this.#maxsize = maxsize || undefined;
  }

  /** Number of elements currently buffered. */
  get size(): number {
    return this.#items.length;
  }

  /**
   * Appends the elements in order and frees the semaphore once per element.
   *
   * The capacity check happens once, before anything is added, so the call is
   * all-or-nothing: when the batch does not fit, the queue is left untouched
   * instead of holding an unknown prefix of the batch.
   *
   * @param elements Elements to append.
   * @throws Error "out of room" when the batch would exceed `maxsize`.
   */
  enqueue(...elements: T[]): void {
    const limit = this.#maxsize;
    if (limit !== undefined && this.size + elements.length > limit) {
      throw new Error("out of room");
    }

    for (const element of elements) {
      this.#items.push(element);
      this.#sema.free();
    }
  }

  /**
   * Awaits the semaphore, then removes and returns the oldest element.
   *
   * @param signal Optional abort signal, forwarded to the semaphore's `take`.
   * @returns The element at the head of the queue.
   * @throws The signal's abort reason when `take` resolves falsy and the
   *   signal is aborted; otherwise Error "canceled dequeue". Also propagates
   *   "no elements" from `poll()` if nothing is buffered after `take` succeeds.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    const acquired = await this.#sema.take(signal);
    if (acquired) {
      return this.poll();
    }

    // Prefer surfacing the caller's own abort reason over the generic error.
    signal?.throwIfAborted();
    throw new Error("canceled dequeue");
  }

  /**
   * Removes and returns the oldest element without waiting.
   *
   * @returns The element at the head of the queue.
   * @throws Error "no elements" when the queue is empty.
   */
  poll(): T {
    if (this.size === 0) {
      throw new Error("no elements");
    }

    // The queue is non-empty here; the assertion is kept (rather than an
    // undefined check) because `undefined` may itself be a valid T.
    return this.#items.shift() as T;
  }
}