Serenity Operating System
at master 226 lines 9.5 kB view raw
1/* 2 * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org> 3 * 4 * SPDX-License-Identifier: BSD-2-Clause 5 */ 6 7#pragma once 8 9#include <AK/Assertions.h> 10#include <AK/Atomic.h> 11#include <AK/BuiltinWrappers.h> 12#include <AK/Debug.h> 13#include <AK/DeprecatedString.h> 14#include <AK/Error.h> 15#include <AK/Format.h> 16#include <AK/Function.h> 17#include <AK/NonnullRefPtr.h> 18#include <AK/NumericLimits.h> 19#include <AK/Platform.h> 20#include <AK/RefCounted.h> 21#include <AK/RefPtr.h> 22#include <AK/Types.h> 23#include <AK/Variant.h> 24#include <AK/Weakable.h> 25#include <LibCore/AnonymousBuffer.h> 26#include <LibCore/System.h> 27#include <errno.h> 28#include <fcntl.h> 29#include <sched.h> 30#include <sys/mman.h> 31 32namespace Core { 33 34// A circular lock-free queue (or a buffer) with a single producer, 35// residing in shared memory and designed to be accessible to multiple processes. 36// This implementation makes use of the fact that any producer-related code can be sure that 37// it's the only producer-related code that is running, which simplifies a bunch of the synchronization code. 38// The exclusivity and liveliness for critical sections in this class is proven to be correct 39// under the assumption of correct synchronization primitives, i.e. atomics. 40// In many circumstances, this is enough for cross-process queues. 41// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory. 42// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user. 43// FIXME: Make this independent of shared memory, so that we can move it to AK. 44template<typename T, size_t Size = 32> 45// Size must be a power of two, which speeds up the modulus operations for indexing. 46requires(popcount(Size) == 1) 47class SharedSingleProducerCircularQueue final { 48 49public: 50 using ValueType = T; 51 52 enum class QueueStatus : u8 { 53 Invalid = 0, 54 Full, 55 Empty, 56 }; 57 58 SharedSingleProducerCircularQueue() = default; 59 SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue<ValueType, Size>& queue) = default; 60 61 SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default; 62 SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default; 63 64 // Allocates a new circular queue in shared memory. 65 static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create() 66 { 67 auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC)); 68 return create_internal(fd, true); 69 } 70 71 // Uses an existing circular queue from given shared memory. 72 static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd) 73 { 74 return create_internal(fd, false); 75 } 76 77 constexpr size_t size() const { return Size; } 78 // These functions are provably inconsistent and should only be used as hints to the actual capacity and used count. 79 ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); } 80 ALWAYS_INLINE size_t weak_used() const 81 { 82 auto volatile head = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); 83 auto volatile tail = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); 84 return head - tail; 85 } 86 87 ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; } 88 ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); } 89 90 ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); } 91 ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); } 92 93 ErrorOr<void, QueueStatus> enqueue(ValueType to_insert) 94 { 95 VERIFY(!m_queue.is_null()); 96 if (!can_enqueue()) 97 return QueueStatus::Full; 98 auto our_tail = m_queue->m_queue->m_tail.load() % Size; 99 m_queue->m_queue->m_data[our_tail] = to_insert; 100 m_queue->m_queue->m_tail.fetch_add(1); 101 102 return {}; 103 } 104 105 ALWAYS_INLINE bool can_enqueue() const 106 { 107 return ((head() - 1) % Size) != (m_queue->m_queue->m_tail.load() % Size); 108 } 109 110 // Repeatedly try to enqueue, using the wait_function to wait if it's not possible 111 ErrorOr<void> blocking_enqueue(ValueType to_insert, Function<void()> wait_function) 112 { 113 ErrorOr<void, QueueStatus> result; 114 while (true) { 115 result = enqueue(to_insert); 116 if (!result.is_error()) 117 break; 118 if (result.error() != QueueStatus::Full) 119 return Error::from_string_literal("Unexpected error while enqueuing"); 120 121 wait_function(); 122 } 123 return {}; 124 } 125 126 ErrorOr<ValueType, QueueStatus> dequeue() 127 { 128 VERIFY(!m_queue.is_null()); 129 while (true) { 130 // This CAS only succeeds if nobody is currently dequeuing. 131 auto size_max = NumericLimits<size_t>::max(); 132 if (m_queue->m_queue->m_head_protector.compare_exchange_strong(size_max, m_queue->m_queue->m_head.load())) { 133 auto old_head = m_queue->m_queue->m_head.load(); 134 // This check looks like it's in a weird place (especially since we have to roll back the protector), but it's actually protecting against a race between multiple dequeuers. 135 if (old_head >= m_queue->m_queue->m_tail.load()) { 136 m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release); 137 return QueueStatus::Empty; 138 } 139 auto data = move(m_queue->m_queue->m_data[old_head % Size]); 140 m_queue->m_queue->m_head.fetch_add(1); 141 m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release); 142 return { move(data) }; 143 } 144 } 145 } 146 147 // The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing. 148 size_t head() const 149 { 150 return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load()); 151 } 152 153private: 154 struct SharedMemorySPCQ { 155 SharedMemorySPCQ() = default; 156 SharedMemorySPCQ(SharedMemorySPCQ const&) = delete; 157 SharedMemorySPCQ(SharedMemorySPCQ&&) = delete; 158 ~SharedMemorySPCQ() = default; 159 160 // Invariant: tail >= head 161 // Invariant: head and tail are monotonically increasing 162 // Invariant: tail always points to the next free location where an enqueue can happen. 163 // Invariant: head always points to the element to be dequeued next. 164 // Invariant: tail is only modified by enqueue functions. 165 // Invariant: head is only modified by dequeue functions. 166 // An empty queue is signalled with: tail = head 167 // A full queue is signalled with: head - 1 mod size = tail mod size (i.e. head and tail point to the same index in the data array) 168 // FIXME: These invariants aren't proven to be correct after each successful completion of each operation where it is relevant. 169 // The work could be put in but for now I think the algorithmic correctness proofs of the functions are enough. 170 AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_tail { 0 }; 171 AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head { 0 }; 172 AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head_protector { NumericLimits<size_t>::max() }; 173 174 alignas(ValueType) Array<ValueType, Size> m_data; 175 }; 176 177 class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> { 178 friend class SharedSingleProducerCircularQueue; 179 180 public: 181 SharedMemorySPCQ* m_queue; 182 void* m_raw; 183 int m_fd; 184 185 ~RefCountedSharedMemorySPCQ() 186 { 187 MUST(System::close(m_fd)); 188 MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ))); 189 dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw); 190 } 191 192 private: 193 RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd) 194 : m_queue(queue) 195 , m_raw(reinterpret_cast<void*>(queue)) 196 , m_fd(fd) 197 { 198 } 199 }; 200 201 static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(int fd, bool is_new) 202 { 203 auto name = DeprecatedString::formatted("SharedSingleProducerCircularQueue@{:x}", fd); 204 auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name)); 205 dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping); 206 207 SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping); 208 209 if (!shared_queue) 210 return Error::from_string_literal("Unexpected error when creating shared queue from raw memory"); 211 212 return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) }; 213 } 214 215 SharedSingleProducerCircularQueue(DeprecatedString name, RefPtr<RefCountedSharedMemorySPCQ> queue) 216 : m_queue(queue) 217 , m_name(move(name)) 218 { 219 } 220 221 RefPtr<RefCountedSharedMemorySPCQ> m_queue; 222 223 DeprecatedString m_name {}; 224}; 225 226}