Serenity Operating System
at master 203 lines 6.9 kB view raw
1/* 2 * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org> 3 * 4 * SPDX-License-Identifier: BSD-2-Clause 5 */ 6 7#include <LibCore/SharedCircularQueue.h> 8#include <LibTest/TestCase.h> 9#include <LibThreading/Thread.h> 10#include <sched.h> 11 12using TestQueue = Core::SharedSingleProducerCircularQueue<int>; 13using QueueError = ErrorOr<int, TestQueue::QueueStatus>; 14 15Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count); 16 17// These first two cases don't multithread at all. 18 19TEST_CASE(simple_enqueue) 20{ 21 auto queue = MUST(TestQueue::create()); 22 for (size_t i = 0; i < queue.size() - 1; ++i) 23 EXPECT(!queue.enqueue((int)i).is_error()); 24 25 auto result = queue.enqueue(0); 26 EXPECT(result.is_error()); 27 EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full); 28} 29 30TEST_CASE(simple_dequeue) 31{ 32 auto queue = MUST(TestQueue::create()); 33 auto const test_count = 10; 34 for (int i = 0; i < test_count; ++i) 35 (void)queue.enqueue(i); 36 for (int i = 0; i < test_count; ++i) { 37 auto const element = queue.dequeue(); 38 EXPECT(!element.is_error()); 39 EXPECT_EQ(element.value(), i); 40 } 41} 42 43// There is one parallel consumer, but nobody is producing at the same time. 44TEST_CASE(simple_multithread) 45{ 46 auto queue = MUST(TestQueue::create()); 47 auto const test_count = 10; 48 49 for (int i = 0; i < test_count; ++i) 50 (void)queue.enqueue(i); 51 52 auto second_thread = Threading::Thread::construct([&queue]() { 53 auto copied_queue = queue; 54 for (int i = 0; i < test_count; ++i) { 55 QueueError result = TestQueue::QueueStatus::Invalid; 56 do { 57 result = copied_queue.dequeue(); 58 if (!result.is_error()) 59 EXPECT_EQ(result.value(), i); 60 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty); 61 62 if (result.is_error()) 63 FAIL("Unexpected error while dequeueing."); 64 } 65 return 0; 66 }); 67 second_thread->start(); 68 (void)second_thread->join(); 69 70 EXPECT_EQ(queue.weak_used(), (size_t)0); 71} 72 73// There is one parallel consumer and one parallel producer. 74TEST_CASE(producer_consumer_multithread) 75{ 76 auto queue = MUST(TestQueue::create()); 77 // Ensure that we have the possibility of filling the queue up. 78 auto const test_count = queue.size() * 4; 79 80 Atomic<bool> other_thread_running { false }; 81 82 auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() { 83 auto copied_queue = queue; 84 other_thread_running.store(true); 85 for (size_t i = 0; i < test_count; ++i) { 86 QueueError result = TestQueue::QueueStatus::Invalid; 87 do { 88 result = copied_queue.dequeue(); 89 if (!result.is_error()) 90 EXPECT_EQ(result.value(), (int)i); 91 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty); 92 93 if (result.is_error()) 94 FAIL("Unexpected error while dequeueing."); 95 } 96 return 0; 97 }); 98 second_thread->start(); 99 100 while (!other_thread_running.load()) 101 ; 102 103 for (size_t i = 0; i < test_count; ++i) { 104 ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid; 105 do { 106 result = queue.enqueue((int)i); 107 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full); 108 109 if (result.is_error()) 110 FAIL("Unexpected error while enqueueing."); 111 } 112 113 (void)second_thread->join(); 114 115 EXPECT_EQ(queue.weak_used(), (size_t)0); 116} 117 118// There are multiple parallel consumers, but nobody is producing at the same time. 119TEST_CASE(multi_consumer) 120{ 121 auto queue = MUST(TestQueue::create()); 122 // This needs to be divisible by 4! 123 size_t const test_count = queue.size() - 4; 124 Atomic<size_t> dequeue_count = 0; 125 126 auto threads = { 127 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 128 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 129 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 130 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 131 }; 132 133 for (size_t i = 0; i < test_count; ++i) 134 (void)queue.enqueue((int)i); 135 136 for (auto thread : threads) 137 thread->start(); 138 for (auto thread : threads) 139 (void)thread->join(); 140 141 EXPECT_EQ(queue.weak_used(), (size_t)0); 142 EXPECT_EQ(dequeue_count.load(), (size_t)test_count); 143} 144 145// There are multiple parallel consumers and one parallel producer. 146TEST_CASE(single_producer_multi_consumer) 147{ 148 auto queue = MUST(TestQueue::create()); 149 // Choose a higher number to provoke possible race conditions. 150 size_t const test_count = queue.size() * 8; 151 Atomic<size_t> dequeue_count = 0; 152 153 auto threads = { 154 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 155 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 156 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 157 Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), 158 }; 159 for (auto thread : threads) 160 thread->start(); 161 162 for (size_t i = 0; i < test_count; ++i) { 163 ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid; 164 do { 165 result = queue.enqueue((int)i); 166 // After we put something in the first time, let's wait while nobody has dequeued yet. 167 while (dequeue_count.load() == 0) 168 ; 169 // Give others time to do something. 170 sched_yield(); 171 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full); 172 173 if (result.is_error()) 174 FAIL("Unexpected error while enqueueing."); 175 } 176 177 for (auto thread : threads) 178 (void)thread->join(); 179 180 EXPECT_EQ(queue.weak_used(), (size_t)0); 181 EXPECT_EQ(dequeue_count.load(), (size_t)test_count); 182} 183 184Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count) 185{ 186 return [&queue, &dequeue_count, test_count]() { 187 auto copied_queue = queue; 188 for (size_t i = 0; i < test_count / 4; ++i) { 189 QueueError result = TestQueue::QueueStatus::Invalid; 190 do { 191 result = copied_queue.dequeue(); 192 if (!result.is_error()) 193 dequeue_count.fetch_add(1); 194 // Give others time to do something. 195 sched_yield(); 196 } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty); 197 198 if (result.is_error()) 199 FAIL("Unexpected error while dequeueing."); 200 } 201 return (intptr_t)0; 202 }; 203}