podcast manager
1import {Semaphore} from './semaphore'
2
/**
 * Simple blocking queue, for turning streams into async pulls.
 * Cribbed mostly from {@link https://github.com/ComFreek/async-playground}
 *
 * Producers add items with {@link BlockingQueue.enqueue} (tail) or
 * {@link BlockingQueue.prequeue} (head); consumers await
 * {@link BlockingQueue.dequeue}. Every stored item is paired with one
 * `free()` on the internal semaphore, and every dequeue performs one `take()`.
 */
export class BlockingQueue<T> {
  readonly #sema: Semaphore
  /** Capacity limit; `undefined` means the queue is unbounded. */
  readonly #maxsize?: number
  readonly #items: T[]

  /**
   * @param maxsize - maximum number of items held at once. Any falsy value
   *   (e.g. `0`) disables the limit.
   */
  constructor(maxsize = 1000) {
    this.#sema = new Semaphore()
    this.#maxsize = maxsize || undefined
    this.#items = []
  }

  /** @returns the depth of the queue */
  get depth(): number {
    return this.#items.length
  }

  /**
   * Place one or more items at the FRONT of the queue, ahead of anything
   * already waiting. The arguments keep their relative order, so
   * `prequeue(a, b)` is dequeued as `a`, then `b`.
   *
   * @throws Error `'out of room'` once the queue is full. Items are inserted
   *   last-argument-first, so the trailing arguments placed before the failure
   *   remain on the queue.
   */
  prequeue(...elements: T[]): void {
    // Unshifting in reverse leaves the arguments in their original order at the
    // head. `elements` is the rest array, so reversing it in place is local.
    for (const el of elements.reverse()) {
      this.#assertRoom()
      this.#items.unshift(el)
      this.#sema.free()
    }
  }

  /**
   * Place one or more items at the BACK of the queue, to be picked up by
   * awaiters.
   *
   * @throws Error `'out of room'` once the queue is full. Items placed before
   *   the failure remain on the queue.
   */
  enqueue(...elements: T[]): void {
    for (const el of elements) {
      this.#assertRoom()
      this.#items.push(el)
      this.#sema.free()
    }
  }

  /**
   * Block while waiting for an item off the queue.
   *
   * @param signal - a signal to use for aborting the block.
   * @returns the item off the queue; rejects if aborted.
   * @throws the signal's abort reason when the semaphore was not acquired and
   *   `signal` is aborted; otherwise Error `'canceled dequeue'`.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    if (await this.#sema.take(signal)) {
      return this.#poll()
    }

    // The semaphore was not acquired: prefer surfacing the abort reason.
    signal?.throwIfAborted()
    throw new Error('canceled dequeue')
  }

  /** Throw `'out of room'` when a size limit is set and has been reached. */
  #assertRoom(): void {
    if (this.#maxsize && this.#items.length >= this.#maxsize) {
      throw new Error('out of room')
    }
  }

  /**
   * Remove and return the head item.
   *
   * Emptiness is decided by length alone, never by the item's truthiness, so
   * falsy values such as `0`, `''`, `false` or `null` are returned intact.
   *
   * @throws Error `'no elements'` when the queue is empty.
   */
  #poll(): T {
    if (this.#items.length === 0) {
      throw new Error('no elements')
    }

    // The length check guarantees an element; `T` may itself include
    // `undefined`, so the value cannot be narrowed by comparison.
    return this.#items.shift() as T
  }
}