Serenity Operating System
1/*
2 * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
3 *
4 * SPDX-License-Identifier: BSD-2-Clause
5 */
6
7#include <LibCore/SharedCircularQueue.h>
8#include <LibTest/TestCase.h>
9#include <LibThreading/Thread.h>
10#include <sched.h>
11
12using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
13using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
14
15Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
16
17// These first two cases don't multithread at all.
18
19TEST_CASE(simple_enqueue)
20{
21 auto queue = MUST(TestQueue::create());
22 for (size_t i = 0; i < queue.size() - 1; ++i)
23 EXPECT(!queue.enqueue((int)i).is_error());
24
25 auto result = queue.enqueue(0);
26 EXPECT(result.is_error());
27 EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
28}
29
30TEST_CASE(simple_dequeue)
31{
32 auto queue = MUST(TestQueue::create());
33 auto const test_count = 10;
34 for (int i = 0; i < test_count; ++i)
35 (void)queue.enqueue(i);
36 for (int i = 0; i < test_count; ++i) {
37 auto const element = queue.dequeue();
38 EXPECT(!element.is_error());
39 EXPECT_EQ(element.value(), i);
40 }
41}
42
43// There is one parallel consumer, but nobody is producing at the same time.
44TEST_CASE(simple_multithread)
45{
46 auto queue = MUST(TestQueue::create());
47 auto const test_count = 10;
48
49 for (int i = 0; i < test_count; ++i)
50 (void)queue.enqueue(i);
51
52 auto second_thread = Threading::Thread::construct([&queue]() {
53 auto copied_queue = queue;
54 for (int i = 0; i < test_count; ++i) {
55 QueueError result = TestQueue::QueueStatus::Invalid;
56 do {
57 result = copied_queue.dequeue();
58 if (!result.is_error())
59 EXPECT_EQ(result.value(), i);
60 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
61
62 if (result.is_error())
63 FAIL("Unexpected error while dequeueing.");
64 }
65 return 0;
66 });
67 second_thread->start();
68 (void)second_thread->join();
69
70 EXPECT_EQ(queue.weak_used(), (size_t)0);
71}
72
73// There is one parallel consumer and one parallel producer.
74TEST_CASE(producer_consumer_multithread)
75{
76 auto queue = MUST(TestQueue::create());
77 // Ensure that we have the possibility of filling the queue up.
78 auto const test_count = queue.size() * 4;
79
80 Atomic<bool> other_thread_running { false };
81
82 auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() {
83 auto copied_queue = queue;
84 other_thread_running.store(true);
85 for (size_t i = 0; i < test_count; ++i) {
86 QueueError result = TestQueue::QueueStatus::Invalid;
87 do {
88 result = copied_queue.dequeue();
89 if (!result.is_error())
90 EXPECT_EQ(result.value(), (int)i);
91 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
92
93 if (result.is_error())
94 FAIL("Unexpected error while dequeueing.");
95 }
96 return 0;
97 });
98 second_thread->start();
99
100 while (!other_thread_running.load())
101 ;
102
103 for (size_t i = 0; i < test_count; ++i) {
104 ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
105 do {
106 result = queue.enqueue((int)i);
107 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
108
109 if (result.is_error())
110 FAIL("Unexpected error while enqueueing.");
111 }
112
113 (void)second_thread->join();
114
115 EXPECT_EQ(queue.weak_used(), (size_t)0);
116}
117
118// There are multiple parallel consumers, but nobody is producing at the same time.
119TEST_CASE(multi_consumer)
120{
121 auto queue = MUST(TestQueue::create());
122 // This needs to be divisible by 4!
123 size_t const test_count = queue.size() - 4;
124 Atomic<size_t> dequeue_count = 0;
125
126 auto threads = {
127 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
128 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
129 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
130 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
131 };
132
133 for (size_t i = 0; i < test_count; ++i)
134 (void)queue.enqueue((int)i);
135
136 for (auto thread : threads)
137 thread->start();
138 for (auto thread : threads)
139 (void)thread->join();
140
141 EXPECT_EQ(queue.weak_used(), (size_t)0);
142 EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
143}
144
145// There are multiple parallel consumers and one parallel producer.
146TEST_CASE(single_producer_multi_consumer)
147{
148 auto queue = MUST(TestQueue::create());
149 // Choose a higher number to provoke possible race conditions.
150 size_t const test_count = queue.size() * 8;
151 Atomic<size_t> dequeue_count = 0;
152
153 auto threads = {
154 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
155 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
156 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
157 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
158 };
159 for (auto thread : threads)
160 thread->start();
161
162 for (size_t i = 0; i < test_count; ++i) {
163 ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
164 do {
165 result = queue.enqueue((int)i);
166 // After we put something in the first time, let's wait while nobody has dequeued yet.
167 while (dequeue_count.load() == 0)
168 ;
169 // Give others time to do something.
170 sched_yield();
171 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
172
173 if (result.is_error())
174 FAIL("Unexpected error while enqueueing.");
175 }
176
177 for (auto thread : threads)
178 (void)thread->join();
179
180 EXPECT_EQ(queue.weak_used(), (size_t)0);
181 EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
182}
183
184Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
185{
186 return [&queue, &dequeue_count, test_count]() {
187 auto copied_queue = queue;
188 for (size_t i = 0; i < test_count / 4; ++i) {
189 QueueError result = TestQueue::QueueStatus::Invalid;
190 do {
191 result = copied_queue.dequeue();
192 if (!result.is_error())
193 dequeue_count.fetch_add(1);
194 // Give others time to do something.
195 sched_yield();
196 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
197
198 if (result.is_error())
199 FAIL("Unexpected error while dequeueing.");
200 }
201 return (intptr_t)0;
202 };
203}