offline-first, p2p synced, atproto enabled, feed reader
1import {Semaphore} from './semaphore'
2
3/**
4 * simple blocking queue, for turning streams into async pulls.
5 * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
6 */
7export class BlockingQueue<T> {
8 #sema: Semaphore
9 #maxsize?: number
10 #items: T[]
11
12 constructor(maxsize = 1000) {
13 this.#sema = new Semaphore()
14 this.#maxsize = maxsize ? maxsize : undefined
15 this.#items = []
16 }
17
18 /** @returns the depth of the queue */
19 get depth(): number {
20 return this.#items.length
21 }
22
23 /** overridable method for checking if an element should be allowed to be inserted into the queue */
24 // eslint-disable-next-line @typescript-eslint/no-unused-vars
25 allowed(_element: T): boolean {
26 return true
27 }
28
29 /** place one or more items on the queue, to be picked up by awaiters. */
30 prequeue(...elements: T[]) {
31 for (const el of elements.reverse()) {
32 if (this.#maxsize && this.#items.length >= this.#maxsize) {
33 throw Error('out of room')
34 }
35
36 if (this.allowed(el)) {
37 this.#items.unshift(el)
38 this.#sema.free()
39 }
40 }
41 }
42
43 /** place one or more items on the queue, to be picked up by awaiters. */
44 enqueue(...elements: T[]) {
45 for (const el of elements) {
46 if (this.#maxsize && this.#items.length >= this.#maxsize) {
47 throw Error('out of room')
48 }
49
50 if (this.allowed(el)) {
51 this.#items.push(el)
52 this.#sema.free()
53 }
54 }
55 }
56
57 /**
58 * block while waiting for an item off the queue.
59 *
60 * @param signal - a signal to use for aborting the block.
61 * @returns the item off the queue; rejects if aborted.
62 */
63 async dequeue(signal?: AbortSignal): Promise<T> {
64 signal?.throwIfAborted()
65 if (await this.#sema.take(signal)) {
66 return this.#poll()
67 }
68
69 signal?.throwIfAborted()
70 throw Error('canceled dequeue')
71 }
72
73 #poll() {
74 const item = this.#items.length > 0 && this.#items.shift()
75 if (item) return item
76
77 throw Error('no elements')
78 }
79}
80
81/**
82 * a blocking queue with a "visited" set, which may be cleared when the queue goes empty
83 */
84export class BlockingSet<T> extends BlockingQueue<T> {
85 #visited: Set<T>
86
87 constructor(maxsize = 1000) {
88 super(maxsize)
89 this.#visited = new Set()
90 }
91
92 reset() {
93 this.#visited = new Set()
94 }
95
96 enqueue(...elements: T[]): void {
97 super.enqueue(...elements)
98 console.log('enqueued:', this.#visited.size, this.depth)
99 }
100
101 allowed(element: T): boolean {
102 if (this.#visited.has(element)) {
103 return false
104 } else {
105 this.#visited.add(element)
106 return true
107 }
108 }
109}