offline-first, p2p synced, atproto enabled, feed reader
at main 2.6 kB view raw
import {Semaphore} from './semaphore'

/**
 * simple blocking queue, for turning streams into async pulls.
 * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
 *
 * producers call {@link BlockingQueue.enqueue} / {@link BlockingQueue.prequeue};
 * consumers await {@link BlockingQueue.dequeue}. every accepted item is paired with
 * one `free()` call on the internal semaphore, and every dequeue with one `take()`.
 */
export class BlockingQueue<T> {
  #sema: Semaphore
  #maxsize?: number
  #items: T[]

  /**
   * @param maxsize - maximum number of items held at once; a falsy value (e.g. `0`)
   *   disables the limit entirely.
   */
  constructor(maxsize = 1000) {
    this.#sema = new Semaphore()
    this.#maxsize = maxsize ? maxsize : undefined
    this.#items = []
  }

  /** @returns the depth of the queue */
  get depth(): number {
    return this.#items.length
  }

  /**
   * overridable method for checking if an element should be allowed to be inserted into the queue.
   * the base implementation accepts everything.
   *
   * @param _element - the candidate element.
   * @returns `true` if the element may be queued.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  allowed(_element: T): boolean {
    return true
  }

  /**
   * place one or more items at the FRONT of the queue, to be picked up by awaiters
   * before anything already queued. the relative order of `elements` is preserved.
   *
   * @param elements - the items to insert; items rejected by {@link BlockingQueue.allowed} are skipped.
   * @throws Error('out of room') when the queue is at capacity; items inserted
   *   earlier in the same call stay queued.
   */
  prequeue(...elements: T[]) {
    // walk back-to-front so that repeated unshift() leaves the batch in its original order
    for (let i = elements.length - 1; i >= 0; i--) {
      const el = elements[i] as T

      if (this.#maxsize && this.#items.length >= this.#maxsize) {
        throw Error('out of room')
      }

      if (this.allowed(el)) {
        this.#items.unshift(el)
        this.#sema.free()
      }
    }
  }

  /**
   * place one or more items at the BACK of the queue, to be picked up by awaiters.
   *
   * @param elements - the items to append; items rejected by {@link BlockingQueue.allowed} are skipped.
   * @throws Error('out of room') when the queue is at capacity; items appended
   *   earlier in the same call stay queued.
   */
  enqueue(...elements: T[]) {
    for (const el of elements) {
      if (this.#maxsize && this.#items.length >= this.#maxsize) {
        throw Error('out of room')
      }

      if (this.allowed(el)) {
        this.#items.push(el)
        this.#sema.free()
      }
    }
  }

  /**
   * block while waiting for an item off the queue.
   *
   * @param signal - a signal to use for aborting the block.
   * @returns the item off the queue; rejects if aborted.
   * @throws the signal's abort reason if `signal` is (or becomes) aborted, or
   *   Error('canceled dequeue') if the semaphore take resolves falsy without an abort.
   */
  async dequeue(signal?: AbortSignal): Promise<T> {
    // fail fast rather than waiting on the semaphore with an already-aborted signal
    signal?.throwIfAborted()

    const acquired = await this.#sema.take(signal)
    if (acquired) {
      return this.#poll()
    }

    // NOTE(review): a falsy take() presumably means the wait was aborted; Semaphore
    // lives in ./semaphore and is not visible here, so surface the abort reason when
    // there is one and fall back to a generic error otherwise.
    signal?.throwIfAborted()
    throw Error('canceled dequeue')
  }

  /**
   * remove and return the item at the head of the queue.
   *
   * emptiness is decided by length, not by the truthiness of the item, so falsy
   * values such as `0`, `''`, `false` or `null` are returned like any other item.
   *
   * @throws Error('no elements') when the queue is empty.
   */
  #poll(): T {
    if (this.#items.length === 0) {
      throw Error('no elements')
    }

    // length was checked above, so shift() cannot yield the "empty array" undefined
    return this.#items.shift() as T
  }
}

/**
 * a blocking queue with a "visited" set, which may be cleared when the queue goes empty.
 *
 * an element is accepted at most once between calls to {@link BlockingSet.reset};
 * membership uses `Set` semantics.
 */
export class BlockingSet<T> extends BlockingQueue<T> {
  #visited: Set<T>

  /**
   * @param maxsize - forwarded unchanged to the {@link BlockingQueue} constructor.
   */
  constructor(maxsize = 1000) {
    super(maxsize)
    this.#visited = new Set()
  }

  /** forget every previously seen element, so they may be queued again. */
  reset() {
    this.#visited = new Set()
  }

  /**
   * append items via the base class, then log the visited-set size and queue depth.
   *
   * @param elements - the items to append; already-visited items are skipped.
   */
  override enqueue(...elements: T[]): void {
    super.enqueue(...elements)
    console.log('enqueued:', this.#visited.size, this.depth)
  }

  /**
   * accept an element only the first time it is seen.
   * side effect: an accepted element is recorded as visited.
   *
   * @param element - the candidate element.
   * @returns `true` if the element had not been visited, `false` otherwise.
   */
  override allowed(element: T): boolean {
    if (this.#visited.has(element)) {
      return false
    }

    this.#visited.add(element)
    return true
  }
}