offline-first, p2p synced, atproto enabled, feed reader
1import {describe, expect, it} from 'vitest'
2
3import {BlockingQueue} from '#lib/async/blocking-queue'
4
5describe('BlockingQueue', () => {
6 it('enqueues and dequeues items in order', async () => {
7 const queue = new BlockingQueue<number>()
8
9 queue.enqueue(1, 2, 3)
10
11 expect(await queue.dequeue()).toBe(1)
12 expect(await queue.dequeue()).toBe(2)
13 expect(await queue.dequeue()).toBe(3)
14 })
15
16 it('blocks dequeue until item is available', async () => {
17 const queue = new BlockingQueue<string>()
18
19 setTimeout(() => {
20 queue.enqueue('delayed')
21 }, 10)
22
23 const result = await queue.dequeue()
24
25 expect(result).toBe('delayed')
26 })
27
28 it('prequeues items at the front', async () => {
29 const queue = new BlockingQueue<number>()
30
31 queue.enqueue(1, 2)
32 queue.prequeue(3, 4)
33
34 expect(await queue.dequeue()).toBe(3)
35 expect(await queue.dequeue()).toBe(4)
36 expect(await queue.dequeue()).toBe(1)
37 expect(await queue.dequeue()).toBe(2)
38 })
39
40 it('tracks queue depth', () => {
41 const queue = new BlockingQueue<number>()
42
43 expect(queue.depth).toBe(0)
44
45 queue.enqueue(1, 2, 3)
46
47 expect(queue.depth).toBe(3)
48 })
49
50 it('respects maxsize limit', () => {
51 const queue = new BlockingQueue<number>(3)
52
53 queue.enqueue(1, 2, 3)
54
55 expect(() => {
56 queue.enqueue(4)
57 }).toThrow('out of room')
58 })
59
60 it('can be aborted with signal', async () => {
61 const queue = new BlockingQueue<number>()
62 const controller = new AbortController()
63
64 const promise = queue.dequeue(controller.signal)
65
66 // Abort after promise is created
67 await new Promise((resolve) => setTimeout(resolve, 5))
68 controller.abort(new DOMException('Aborted', 'AbortError'))
69
70 await expect(promise).rejects.toThrow()
71 })
72
73 it('throws if signal already aborted', async () => {
74 const queue = new BlockingQueue<number>()
75 const controller = new AbortController()
76 controller.abort(new DOMException('Aborted', 'AbortError'))
77
78 await expect(queue.dequeue(controller.signal)).rejects.toThrow()
79 })
80
81 it('handles prequeue with maxsize', () => {
82 const queue = new BlockingQueue<number>(3)
83
84 queue.prequeue(1, 2, 3)
85
86 expect(() => {
87 queue.prequeue(4)
88 }).toThrow('out of room')
89 })
90
91 it('allows unlimited size when maxsize is 0', () => {
92 const queue = new BlockingQueue<number>(0)
93
94 for (let i = 0; i < 2000; i++) {
95 queue.enqueue(i)
96 }
97
98 expect(queue.depth).toBe(2000)
99 })
100
101 it('reduces depth as items are dequeued', async () => {
102 const queue = new BlockingQueue<number>()
103
104 queue.enqueue(1, 2, 3, 4, 5)
105 expect(queue.depth).toBe(5)
106
107 await queue.dequeue()
108 expect(queue.depth).toBe(4)
109
110 await queue.dequeue()
111 expect(queue.depth).toBe(3)
112 })
113})