Serenity Operating System
1/*
2 * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
3 *
4 * SPDX-License-Identifier: BSD-2-Clause
5 */
6
7#pragma once
8
9#include <AK/Assertions.h>
10#include <AK/Atomic.h>
11#include <AK/BuiltinWrappers.h>
12#include <AK/Debug.h>
13#include <AK/DeprecatedString.h>
14#include <AK/Error.h>
15#include <AK/Format.h>
16#include <AK/Function.h>
17#include <AK/NonnullRefPtr.h>
18#include <AK/NumericLimits.h>
19#include <AK/Platform.h>
20#include <AK/RefCounted.h>
21#include <AK/RefPtr.h>
22#include <AK/Types.h>
23#include <AK/Variant.h>
24#include <AK/Weakable.h>
25#include <LibCore/AnonymousBuffer.h>
26#include <LibCore/System.h>
27#include <errno.h>
28#include <fcntl.h>
29#include <sched.h>
30#include <sys/mman.h>
31
32namespace Core {
33
34// A circular lock-free queue (or a buffer) with a single producer,
35// residing in shared memory and designed to be accessible to multiple processes.
36// This implementation makes use of the fact that any producer-related code can be sure that
37// it's the only producer-related code that is running, which simplifies a bunch of the synchronization code.
// The exclusivity and liveness for critical sections in this class is proven to be correct
// under the assumption of correct synchronization primitives, i.e. atomics.
40// In many circumstances, this is enough for cross-process queues.
41// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory.
42// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user.
43// FIXME: Make this independent of shared memory, so that we can move it to AK.
44template<typename T, size_t Size = 32>
45// Size must be a power of two, which speeds up the modulus operations for indexing.
46requires(popcount(Size) == 1)
47class SharedSingleProducerCircularQueue final {
48
49public:
50 using ValueType = T;
51
52 enum class QueueStatus : u8 {
53 Invalid = 0,
54 Full,
55 Empty,
56 };
57
58 SharedSingleProducerCircularQueue() = default;
59 SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue<ValueType, Size>& queue) = default;
60
61 SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default;
62 SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;
63
64 // Allocates a new circular queue in shared memory.
65 static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create()
66 {
67 auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
68 return create_internal(fd, true);
69 }
70
71 // Uses an existing circular queue from given shared memory.
72 static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd)
73 {
74 return create_internal(fd, false);
75 }
76
77 constexpr size_t size() const { return Size; }
78 // These functions are provably inconsistent and should only be used as hints to the actual capacity and used count.
79 ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); }
80 ALWAYS_INLINE size_t weak_used() const
81 {
82 auto volatile head = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed);
83 auto volatile tail = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed);
84 return head - tail;
85 }
86
87 ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; }
88 ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }
89
90 ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
91 ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }
92
93 ErrorOr<void, QueueStatus> enqueue(ValueType to_insert)
94 {
95 VERIFY(!m_queue.is_null());
96 if (!can_enqueue())
97 return QueueStatus::Full;
98 auto our_tail = m_queue->m_queue->m_tail.load() % Size;
99 m_queue->m_queue->m_data[our_tail] = to_insert;
100 m_queue->m_queue->m_tail.fetch_add(1);
101
102 return {};
103 }
104
105 ALWAYS_INLINE bool can_enqueue() const
106 {
107 return ((head() - 1) % Size) != (m_queue->m_queue->m_tail.load() % Size);
108 }
109
110 // Repeatedly try to enqueue, using the wait_function to wait if it's not possible
111 ErrorOr<void> blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
112 {
113 ErrorOr<void, QueueStatus> result;
114 while (true) {
115 result = enqueue(to_insert);
116 if (!result.is_error())
117 break;
118 if (result.error() != QueueStatus::Full)
119 return Error::from_string_literal("Unexpected error while enqueuing");
120
121 wait_function();
122 }
123 return {};
124 }
125
126 ErrorOr<ValueType, QueueStatus> dequeue()
127 {
128 VERIFY(!m_queue.is_null());
129 while (true) {
130 // This CAS only succeeds if nobody is currently dequeuing.
131 auto size_max = NumericLimits<size_t>::max();
132 if (m_queue->m_queue->m_head_protector.compare_exchange_strong(size_max, m_queue->m_queue->m_head.load())) {
133 auto old_head = m_queue->m_queue->m_head.load();
134 // This check looks like it's in a weird place (especially since we have to roll back the protector), but it's actually protecting against a race between multiple dequeuers.
135 if (old_head >= m_queue->m_queue->m_tail.load()) {
136 m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
137 return QueueStatus::Empty;
138 }
139 auto data = move(m_queue->m_queue->m_data[old_head % Size]);
140 m_queue->m_queue->m_head.fetch_add(1);
141 m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
142 return { move(data) };
143 }
144 }
145 }
146
147 // The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing.
148 size_t head() const
149 {
150 return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load());
151 }
152
153private:
154 struct SharedMemorySPCQ {
155 SharedMemorySPCQ() = default;
156 SharedMemorySPCQ(SharedMemorySPCQ const&) = delete;
157 SharedMemorySPCQ(SharedMemorySPCQ&&) = delete;
158 ~SharedMemorySPCQ() = default;
159
160 // Invariant: tail >= head
161 // Invariant: head and tail are monotonically increasing
162 // Invariant: tail always points to the next free location where an enqueue can happen.
163 // Invariant: head always points to the element to be dequeued next.
164 // Invariant: tail is only modified by enqueue functions.
165 // Invariant: head is only modified by dequeue functions.
166 // An empty queue is signalled with: tail = head
167 // A full queue is signalled with: head - 1 mod size = tail mod size (i.e. head and tail point to the same index in the data array)
168 // FIXME: These invariants aren't proven to be correct after each successful completion of each operation where it is relevant.
169 // The work could be put in but for now I think the algorithmic correctness proofs of the functions are enough.
170 AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_tail { 0 };
171 AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head { 0 };
172 AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head_protector { NumericLimits<size_t>::max() };
173
174 alignas(ValueType) Array<ValueType, Size> m_data;
175 };
176
177 class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> {
178 friend class SharedSingleProducerCircularQueue;
179
180 public:
181 SharedMemorySPCQ* m_queue;
182 void* m_raw;
183 int m_fd;
184
185 ~RefCountedSharedMemorySPCQ()
186 {
187 MUST(System::close(m_fd));
188 MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
189 dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
190 }
191
192 private:
193 RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd)
194 : m_queue(queue)
195 , m_raw(reinterpret_cast<void*>(queue))
196 , m_fd(fd)
197 {
198 }
199 };
200
201 static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(int fd, bool is_new)
202 {
203 auto name = DeprecatedString::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
204 auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
205 dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);
206
207 SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);
208
209 if (!shared_queue)
210 return Error::from_string_literal("Unexpected error when creating shared queue from raw memory");
211
212 return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) };
213 }
214
215 SharedSingleProducerCircularQueue(DeprecatedString name, RefPtr<RefCountedSharedMemorySPCQ> queue)
216 : m_queue(queue)
217 , m_name(move(name))
218 {
219 }
220
221 RefPtr<RefCountedSharedMemorySPCQ> m_queue;
222
223 DeprecatedString m_name {};
224};
225
226}