// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
// An overview, including benchmark results, is provided here:
// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
// The full design is also described in excruciating detail at:
// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue

// Simplified BSD license:
// Copyright (c) 2013-2016, Cameron Desrochers.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// - Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
// - Redistributions in binary form must reproduce the above copyright notice, this list of
// conditions and the following disclaimer in the documentation and/or other materials
// provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


#pragma once

#include "../common/TracyAlloc.hpp"
#include "../common/TracyForceInline.hpp"
#include "../common/TracySystem.hpp"

#if defined(__GNUC__)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#include <atomic>         // Requires C++11. Sorry VS2010.
#include <cassert>
#include <cstddef>        // for max_align_t
#include <cstdint>
#include <cstdlib>
#include <type_traits>
#include <algorithm>
#include <utility>
#include <limits>
#include <climits>        // for CHAR_BIT
#include <array>
#include <thread>         // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading

namespace tracy
{

// Compiler-specific likely/unlikely hints
namespace moodycamel { namespace details {
#if defined(__GNUC__)
    inline bool cqLikely(bool x) { return __builtin_expect((x), true); }
    inline bool cqUnlikely(bool x) { return __builtin_expect((x), false); }
#else
    inline bool cqLikely(bool x) { return x; }
    inline bool cqUnlikely(bool x) { return x; }
#endif
} }

namespace
{
    // to avoid MSVC warning 4127: conditional expression is constant
    template <bool>
    struct compile_time_condition
    {
        static const bool value = false;
    };
    template <>
    struct compile_time_condition<true>
    {
        static const bool value = true;
    };
}

namespace moodycamel {
namespace details {
    template<typename T>
    struct const_numeric_max {
        static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
        static const T value = std::numeric_limits<T>::is_signed
            ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
            : static_cast<T>(-1);
    };
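
    // Added note (illustrative, not from the original source): the computed constant matches
    // std::numeric_limits<T>::max(), e.g.
    //   static_assert(const_numeric_max<std::uint8_t>::value == 255, "all-ones for unsigned types");
    //   static_assert(const_numeric_max<std::int8_t>::value == 127, "two's-complement max for signed types");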

#if defined(__GLIBCXX__)
    typedef ::max_align_t std_max_align_t;       // libstdc++ forgot to add it to std:: for a while
#else
    typedef std::max_align_t std_max_align_t;    // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif

    // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
    // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
    typedef union {
        std_max_align_t x;
        long long y;
        void* z;
    } max_align_t;
}

// Default traits for the ConcurrentQueue. To change some of the
// traits without re-implementing all of them, inherit from this
// struct and shadow the declarations you wish to be different;
// since the traits are used as a template type parameter, the
// shadowed declarations will be used where defined, and the defaults
// otherwise.
struct ConcurrentQueueDefaultTraits
{
    // General-purpose size type. std::size_t is strongly recommended.
    typedef std::size_t size_t;

    // The type used for the enqueue and dequeue indices. Must be at least as
    // large as size_t. Should be significantly larger than the number of elements
    // you expect to hold at once, especially if you have a high turnover rate;
    // for example, on 32-bit x86, if you expect to have over a hundred million
    // elements or pump several million elements through your queue in a very
    // short space of time, using a 32-bit type *may* trigger a race condition.
    // A 64-bit int type is recommended in that case, and in practice will
    // prevent a race condition no matter the usage of the queue. Note that
    // whether the queue is lock-free with a 64-bit type depends on whether
    // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
    typedef std::size_t index_t;

    // Internally, all elements are enqueued and dequeued from multi-element
    // blocks; this is the smallest controllable unit. If you expect few elements
    // but many producers, a smaller block size should be favoured. For few producers
    // and/or many elements, a larger block size is preferred. A sane default
    // is provided. Must be a power of 2.
    static const size_t BLOCK_SIZE = 64*1024;

    // For explicit producers (i.e. when using a producer token), the block is
    // checked for being empty by iterating through a list of flags, one per element.
    // For large block sizes, this is too inefficient, and switching to an atomic
    // counter-based approach is faster. The switch is made for block sizes strictly
    // larger than this threshold.
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;

    // How many full blocks can be expected for a single explicit producer? This should
    // reflect that number's maximum for optimal performance. Must be a power of 2.
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;

    // Controls the number of items that an explicit consumer (i.e. one with a token)
    // must consume before it causes all consumers to rotate and move on to the next
    // internal queue.
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;

    // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
    // Enqueue operations that would cause this limit to be surpassed will fail. Note
    // that this limit is enforced at the block level (for performance reasons), i.e.
    // it's rounded up to the nearest block size.
    static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;


    // Memory allocation can be customized if needed.
    // malloc should return nullptr on failure, and handle alignment like std::malloc.
#if defined(malloc) || defined(free)
    // Gah, this is 2015, stop defining macros that break standard code already!
    // Work around malloc/free being special macros:
    static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
    static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
    static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
    static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
    static inline void* malloc(size_t size) { return tracy::tracy_malloc(size); }
    static inline void free(void* ptr) { return tracy::tracy_free(ptr); }
#endif
};
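
// Added example (an illustrative sketch, not part of the original source): to change a single
// trait, inherit from ConcurrentQueueDefaultTraits and shadow just that member; the name
// SmallBlockTraits below is hypothetical.
//
//   struct SmallBlockTraits : ConcurrentQueueDefaultTraits
//   {
//       static const size_t BLOCK_SIZE = 256;    // must remain a power of 2
//   };
//
//   // used as: tracy::moodycamel::ConcurrentQueue<Item, SmallBlockTraits>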


// When producing or consuming many elements, the most efficient way is to:
// 1) Use one of the bulk-operation methods of the queue with a token
// 2) Failing that, use the bulk-operation methods without a token
// 3) Failing that, create a token and use that with the single-item methods
// 4) Failing that, use the single-parameter methods of the queue
// Having said that, don't create tokens willy-nilly -- ideally there should be
// a maximum of one token per thread (of each kind).
struct ProducerToken;
struct ConsumerToken;

template<typename T, typename Traits> class ConcurrentQueue;


namespace details
{
    struct ConcurrentQueueProducerTypelessBase
    {
        ConcurrentQueueProducerTypelessBase* next;
        std::atomic<bool> inactive;
        ProducerToken* token;
        uint32_t threadId;

        ConcurrentQueueProducerTypelessBase()
            : next(nullptr), inactive(false), token(nullptr), threadId(0)
        {
        }
    };

    template<typename T>
    static inline bool circular_less_than(T a, T b)
    {
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
        return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
        // Note: extra parens around rhs of operator<< is MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
        // silencing the bug requires #pragma warning(disable: 4554) around the calling code and has no effect when done here.
    }
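
    // Added example (illustrative, not from the original source), with T = std::uint32_t:
    //   circular_less_than(5u, 10u)         == true    (5 is behind 10)
    //   circular_less_than(10u, 5u)         == false
    //   circular_less_than(0xFFFFFFF0u, 4u) == true    (still "behind" after the index wraps past 0)
    // i.e. a compares less than b whenever b is between 1 and 2^31 - 1 steps ahead of a, modulo 2^32.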

    template<typename U>
    static inline char* align_for(char* ptr)
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
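
    // Added usage sketch (an assumption for illustration, not from the original source); this is
    // essentially what ExplicitProducer::new_block_index() does further below when it places an
    // entry array behind its header inside one raw allocation:
    //   char* raw = static_cast<char*>((Traits::malloc)(bytes));
    //   auto entries = reinterpret_cast<BlockIndexEntry*>(align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));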

    template<typename T>
    static inline T ceil_to_pow_2(T x)
    {
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");

        // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }
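
    // Added example (illustrative, not from the original source):
    //   ceil_to_pow_2(1u) == 1, ceil_to_pow_2(8u) == 8, ceil_to_pow_2(9u) == 16
    // i.e. powers of two are returned unchanged and everything else is rounded up to the next one.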

    template<typename T>
    static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
    {
        T temp = std::move(left.load(std::memory_order_relaxed));
        left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
        right.store(std::move(temp), std::memory_order_relaxed);
    }

    template<typename T>
    static inline T const& nomove(T const& x)
    {
        return x;
    }

    template<bool Enable>
    struct nomove_if
    {
        template<typename T>
        static inline T const& eval(T const& x)
        {
            return x;
        }
    };

    template<>
    struct nomove_if<false>
    {
        template<typename U>
        static inline auto eval(U&& x)
            -> decltype(std::forward<U>(x))
        {
            return std::forward<U>(x);
        }
    };

    template<typename It>
    static inline auto deref_noexcept(It& it) noexcept -> decltype(*it)
    {
        return *it;
    }

#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
    template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
#else
    template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
#endif

    template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
    template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
    template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
    template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
    template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
}


struct ProducerToken
{
    template<typename T, typename Traits>
    explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);

    ProducerToken(ProducerToken&& other) noexcept
        : producer(other.producer)
    {
        other.producer = nullptr;
        if (producer != nullptr) {
            producer->token = this;
        }
    }

    inline ProducerToken& operator=(ProducerToken&& other) noexcept
    {
        swap(other);
        return *this;
    }

    void swap(ProducerToken& other) noexcept
    {
        std::swap(producer, other.producer);
        if (producer != nullptr) {
            producer->token = this;
        }
        if (other.producer != nullptr) {
            other.producer->token = &other;
        }
    }

    // A token is always valid unless:
    // 1) Memory allocation failed during construction
    // 2) It was moved via the move constructor
    //    (Note: assignment does a swap, leaving both potentially valid)
    // 3) The associated queue was destroyed
    // Note that if valid() returns true, that only indicates
    // that the token is valid for use with a specific queue,
    // but not which one; that's up to the user to track.
    inline bool valid() const { return producer != nullptr; }

    ~ProducerToken()
    {
        if (producer != nullptr) {
            producer->token = nullptr;
            producer->inactive.store(true, std::memory_order_release);
        }
    }

    // Disable copying and assignment
    ProducerToken(ProducerToken const&) = delete;
    ProducerToken& operator=(ProducerToken const&) = delete;

private:
    template<typename T, typename Traits> friend class ConcurrentQueue;

protected:
    details::ConcurrentQueueProducerTypelessBase* producer;
};


struct ConsumerToken
{
    template<typename T, typename Traits>
    explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);

    ConsumerToken(ConsumerToken&& other) noexcept
        : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
    {
    }

    inline ConsumerToken& operator=(ConsumerToken&& other) noexcept
    {
        swap(other);
        return *this;
    }

    void swap(ConsumerToken& other) noexcept
    {
        std::swap(initialOffset, other.initialOffset);
        std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
        std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
        std::swap(currentProducer, other.currentProducer);
        std::swap(desiredProducer, other.desiredProducer);
    }

    // Disable copying and assignment
    ConsumerToken(ConsumerToken const&) = delete;
    ConsumerToken& operator=(ConsumerToken const&) = delete;

private:
    template<typename T, typename Traits> friend class ConcurrentQueue;

private: // but shared with ConcurrentQueue
    std::uint32_t initialOffset;
    std::uint32_t lastKnownGlobalOffset;
    std::uint32_t itemsConsumedFromCurrent;
    details::ConcurrentQueueProducerTypelessBase* currentProducer;
    details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};
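
// Added usage sketch (illustrative, not part of the original source): tokens are constructed
// from the queue they will be used with, ideally at most one of each kind per thread. "Item"
// is a hypothetical element type.
//
//   tracy::moodycamel::ConcurrentQueue<Item> queue;
//   tracy::moodycamel::ProducerToken ptok( queue );   // registers (or recycles) an explicit producer
//   tracy::moodycamel::ConsumerToken ctok( queue );   // remembers this consumer's rotation offset
//   assert( ptok.valid() );                           // false only if producer allocation failed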


template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
    struct ExplicitProducer;

    typedef moodycamel::ProducerToken producer_token_t;
    typedef moodycamel::ConsumerToken consumer_token_t;

    typedef typename Traits::index_t index_t;
    typedef typename Traits::size_t size_t;

    static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4307)    // + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable: 4309)    // static_cast: Truncation of constant value
#endif
    static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
#ifdef _MSC_VER
#pragma warning(pop)
#endif

    static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
    static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
    static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
    static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
    static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
    static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");

public:
    // Creates a queue with at least `capacity` element slots; note that the
    // actual number of elements that can be inserted without additional memory
    // allocation depends on the number of producers and the block size (e.g. if
    // the block size is equal to `capacity`, only a single block will be allocated
    // up-front, which means only a single producer will be able to enqueue elements
    // without an extra allocation -- blocks aren't shared between producers).
    // This method is not thread safe -- it is up to the user to ensure that the
    // queue is fully constructed before it starts being used by other threads (this
    // includes making the memory effects of construction visible, possibly with a
    // memory barrier).
    explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
        : producerListTail(nullptr),
        producerCount(0),
        initialBlockPoolIndex(0),
        nextExplicitConsumerId(0),
        globalExplicitConsumerOffset(0)
    {
        populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
    }

    // Computes the correct amount of pre-allocated blocks for you based
    // on the minimum number of elements you want available at any given
    // time, and the maximum concurrent number of each type of producer.
    ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers)
        : producerListTail(nullptr),
        producerCount(0),
        initialBlockPoolIndex(0),
        nextExplicitConsumerId(0),
        globalExplicitConsumerOffset(0)
    {
        size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers);
        populate_initial_block_list(blocks);
    }
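
    // Added worked example (illustrative, not from the original source), using the default
    // 64*1024 BLOCK_SIZE: with minCapacity == 2 * BLOCK_SIZE and maxExplicitProducers == 2,
    //   blocks = ((2 * BLOCK_SIZE + BLOCK_SIZE - 1) / BLOCK_SIZE - 1) * (2 + 1) + 2 * 2
    //          = (2 - 1) * 3 + 4 = 7
    // pre-allocated blocks are reserved up front.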

    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    // This method is not thread safe.
    ~ConcurrentQueue()
    {
        // Destroy producers
        auto ptr = producerListTail.load(std::memory_order_relaxed);
        while (ptr != nullptr) {
            auto next = ptr->next_prod();
            if (ptr->token != nullptr) {
                ptr->token->producer = nullptr;
            }
            destroy(ptr);
            ptr = next;
        }

        // Destroy global free list
        auto block = freeList.head_unsafe();
        while (block != nullptr) {
            auto next = block->freeListNext.load(std::memory_order_relaxed);
            if (block->dynamicallyAllocated) {
                destroy(block);
            }
            block = next;
        }

        // Destroy initial free list
        destroy_array(initialBlockPool, initialBlockPoolSize);
    }

    // Disable copying and copy assignment
    ConcurrentQueue(ConcurrentQueue const&) = delete;
    ConcurrentQueue(ConcurrentQueue&& other) = delete;
    ConcurrentQueue& operator=(ConcurrentQueue const&) = delete;
    ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete;

public:
    tracy_force_inline T* enqueue_begin(producer_token_t const& token, index_t& currentTailIndex)
    {
        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex);
    }
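
    // Added producer-side sketch (an assumption about the intended commit protocol, based on the
    // pieces exposed here; not code from the original source): enqueue_begin() hands back a raw
    // slot, the caller constructs the element in place, then publishes it by advancing the
    // explicit producer's tail index with release ordering.
    //
    //   index_t idx;
    //   T* slot = queue.enqueue_begin( ptok, idx );            // ptok: a valid producer_token_t
    //   new( slot ) T( /* ... */ );                            // fill in the element
    //   queue.get_explicit_producer( ptok )->get_tail_index()
    //       .store( idx + 1, std::memory_order_release );      // make it visible to consumers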

    template<class NotifyThread, class ProcessData>
    size_t try_dequeue_bulk_single(consumer_token_t& token, NotifyThread notifyThread, ProcessData processData )
    {
        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
            if (!update_current_producer_after_rotation(token)) {
                return 0;
            }
        }

        size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(notifyThread, processData);
        token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);

        auto tail = producerListTail.load(std::memory_order_acquire);
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
        if (ptr == nullptr) {
            ptr = tail;
        }
        if( count == 0 )
        {
            while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
                auto dequeued = ptr->dequeue_bulk(notifyThread, processData);
                if (dequeued != 0) {
                    token.currentProducer = ptr;
                    token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
                    return dequeued;
                }
                ptr = ptr->next_prod();
                if (ptr == nullptr) {
                    ptr = tail;
                }
            }
            return 0;
        }
        else
        {
            token.currentProducer = ptr;
            token.itemsConsumedFromCurrent = 0;
            return count;
        }
    }
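
    // Added consumer-side sketch (illustrative, not from the original source): the first callback
    // receives the producing thread's id, the second a pointer to a contiguous run of items plus
    // their count; consume() is a hypothetical handler.
    //
    //   size_t n = queue.try_dequeue_bulk_single( ctok,
    //       [] ( uint32_t threadId ) { /* e.g. switch per-thread output state */ },
    //       [] ( T* items, size_t count ) { for( size_t i = 0; i < count; i++ ) consume( items[i] ); } );
    //   // n == 0 means every producer's sub-queue was empty at the time of the call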


    // Returns an estimate of the total number of elements currently in the queue. This
    // estimate is only accurate if the queue has completely stabilized before it is called
    // (i.e. all enqueue and dequeue operations have completed and their memory effects are
    // visible on the calling thread, and no further operations start while this method is
    // being called).
    // Thread-safe.
    size_t size_approx() const
    {
        size_t size = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            size += ptr->size_approx();
        }
        return size;
    }


    // Returns true if the underlying atomic variables used by
    // the queue are lock-free (they should be on most platforms).
    // Thread-safe.
    static bool is_lock_free()
    {
        return
            details::static_is_lock_free<bool>::value == 2 &&
            details::static_is_lock_free<size_t>::value == 2 &&
            details::static_is_lock_free<std::uint32_t>::value == 2 &&
            details::static_is_lock_free<index_t>::value == 2 &&
            details::static_is_lock_free<void*>::value == 2;
    }


private:
    friend struct ProducerToken;
    friend struct ConsumerToken;
    friend struct ExplicitProducer;


    ///////////////////////////////
    // Queue methods
    ///////////////////////////////

    inline bool update_current_producer_after_rotation(consumer_token_t& token)
    {
        // Ah, there's been a rotation, figure out where we should be!
        auto tail = producerListTail.load(std::memory_order_acquire);
        if (token.desiredProducer == nullptr && tail == nullptr) {
            return false;
        }
        auto prodCount = producerCount.load(std::memory_order_relaxed);
        auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
        if (details::cqUnlikely(token.desiredProducer == nullptr)) {
            // Aha, first time we're dequeueing anything.
            // Figure out our local position
            // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
            std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
            token.desiredProducer = tail;
            for (std::uint32_t i = 0; i != offset; ++i) {
                token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
                if (token.desiredProducer == nullptr) {
                    token.desiredProducer = tail;
                }
            }
        }

        std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
        if (delta >= prodCount) {
            delta = delta % prodCount;
        }
        for (std::uint32_t i = 0; i != delta; ++i) {
            token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
            if (token.desiredProducer == nullptr) {
                token.desiredProducer = tail;
            }
        }

        token.lastKnownGlobalOffset = globalOffset;
        token.currentProducer = token.desiredProducer;
        token.itemsConsumedFromCurrent = 0;
        return true;
    }


    ///////////////////////////
    // Free list
    ///////////////////////////

    template <typename N>
    struct FreeListNode
    {
        FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }

        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<N*> freeListNext;
    };

    // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
    // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
    // speedy under low contention.
    template<typename N>    // N must inherit FreeListNode or have the same fields (and initialization of them)
    struct FreeList
    {
        FreeList() : freeListHead(nullptr) { }
        FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
        void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }

        FreeList(FreeList const&) = delete;
        FreeList& operator=(FreeList const&) = delete;

        inline void add(N* node)
        {
            // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
            // set it using a fetch_add
            if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
                // Oh look! We were the last ones referencing this node, and we know
                // we want to add it to the free list, so let's do it!
                add_knowing_refcount_is_zero(node);
            }
        }

        inline N* try_get()
        {
            auto head = freeListHead.load(std::memory_order_acquire);
            while (head != nullptr) {
                auto prevHead = head;
                auto refs = head->freeListRefs.load(std::memory_order_relaxed);
                if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
                    head = freeListHead.load(std::memory_order_acquire);
                    continue;
                }

                // Good, reference count has been incremented (it wasn't at zero), which means we can read the
                // next and not worry about it changing between now and the time we do the CAS
                auto next = head->freeListNext.load(std::memory_order_relaxed);
                if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
                    // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
                    assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

                    // Decrease refcount twice, once for our ref, and once for the list's ref
                    head->freeListRefs.fetch_sub(2, std::memory_order_release);
                    return head;
                }

                // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
                // Note that we don't need to release any memory effects, but we do need to ensure that the reference
                // count decrement happens-after the CAS on the head.
                refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
                if (refs == SHOULD_BE_ON_FREELIST + 1) {
                    add_knowing_refcount_is_zero(prevHead);
                }
            }

            return nullptr;
        }

        // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
        N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

    private:
        inline void add_knowing_refcount_is_zero(N* node)
        {
            // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
            // only one copy of this method per node at a time, i.e. the single thread case), then we know
            // we can safely change the next pointer of the node; however, once the refcount is back above
            // zero, then other threads could increase it (happens under heavy contention, when the refcount
            // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
            // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
            // to add the node to the actual list fails, decrease the refcount and leave the add operation to
            // the next thread who puts the refcount back at zero (which could be us, hence the loop).
            auto head = freeListHead.load(std::memory_order_relaxed);
            while (true) {
                node->freeListNext.store(head, std::memory_order_relaxed);
                node->freeListRefs.store(1, std::memory_order_release);
                if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
                    // Hmm, the add failed, but we can only try again when the refcount goes back to zero
                    if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
                        continue;
                    }
                }
                return;
            }
        }

    private:
        // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
        std::atomic<N*> freeListHead;

        static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
        static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
    };
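
    // Added usage sketch (illustrative, not from the original source): within this queue the free
    // list is only driven through add_block_to_free_list() / try_get_block_from_free_list(), i.e.
    //   freeList.add( block );            // a producer is done with a spilled block
    //   Block* b = freeList.try_get();    // requisition_block() may hand it out again later
    // freeListRefs packs a 31-bit reference count (REFS_MASK) plus the SHOULD_BE_ON_FREELIST flag
    // in the top bit, which is what makes the deferred add in add_knowing_refcount_is_zero() safe.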


    ///////////////////////////
    // Block
    ///////////////////////////

    struct Block
    {
        Block()
            : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
        {
        }

        inline bool is_empty() const
        {
            if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
                // Check flags
                for (size_t i = 0; i < BLOCK_SIZE; ++i) {
                    if (!emptyFlags[i].load(std::memory_order_relaxed)) {
                        return false;
                    }
                }

                // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
                std::atomic_thread_fence(std::memory_order_acquire);
                return true;
            }
            else {
                // Check counter
                if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
                    std::atomic_thread_fence(std::memory_order_acquire);
                    return true;
                }
                assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
                return false;
            }
        }

        // Returns true if the block is now empty (does not apply in explicit context)
        inline bool set_empty(index_t i)
        {
            if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set flag
                assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
                emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
                return false;
            }
            else {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
                assert(prevVal < BLOCK_SIZE);
                return prevVal == BLOCK_SIZE - 1;
            }
        }

        // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
        // Returns true if the block is now empty (does not apply in explicit context).
        inline bool set_many_empty(index_t i, size_t count)
        {
            if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
                // Set flags
                std::atomic_thread_fence(std::memory_order_release);
                i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
                for (size_t j = 0; j != count; ++j) {
                    assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
                    emptyFlags[i + j].store(true, std::memory_order_relaxed);
                }
                return false;
            }
            else {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
                assert(prevVal + count <= BLOCK_SIZE);
                return prevVal + count == BLOCK_SIZE;
            }
        }

        inline void set_all_empty()
        {
            if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set all flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(true, std::memory_order_relaxed);
                }
            }
            else {
                // Reset counter
                elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
            }
        }

        inline void reset_empty()
        {
            if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
                // Reset flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(false, std::memory_order_relaxed);
                }
            }
            else {
                // Reset counter
                elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
            }
        }

        inline T* operator[](index_t idx) noexcept { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
        inline T const* operator[](index_t idx) const noexcept { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }

    private:
        // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
        // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
        // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
        // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
        // alignment, but this is hard to do in a cross-platform way. Assert for this case:
        static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
        // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
        // otherwise the appropriate padding will not be added at the end of Block in order to make
        // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
        // this.
        union {
            char elements[sizeof(T) * BLOCK_SIZE];
            details::max_align_t dummy;
        };
    public:
        Block* next;
        std::atomic<size_t> elementsCompletelyDequeued;
        std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
    public:
        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<Block*> freeListNext;
        std::atomic<bool> shouldBeOnFreeList;
        bool dynamicallyAllocated;    // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
    };
    static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");


    ///////////////////////////
    // Producer base
    ///////////////////////////

    struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
    {
        ProducerBase(ConcurrentQueue* parent_) :
            tailIndex(0),
            headIndex(0),
            dequeueOptimisticCount(0),
            dequeueOvercommit(0),
            tailBlock(nullptr),
            parent(parent_)
        {
        }

        virtual ~ProducerBase() { }

        template<class NotifyThread, class ProcessData>
        inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
        {
            return static_cast<ExplicitProducer*>(this)->dequeue_bulk(notifyThread, processData);
        }

        inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }

        inline size_t size_approx() const
        {
            auto tail = tailIndex.load(std::memory_order_relaxed);
            auto head = headIndex.load(std::memory_order_relaxed);
            return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
        }

        inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
    protected:
        std::atomic<index_t> tailIndex;    // Where to enqueue to next
        std::atomic<index_t> headIndex;    // Where to dequeue from next

        std::atomic<index_t> dequeueOptimisticCount;
        std::atomic<index_t> dequeueOvercommit;

        Block* tailBlock;

    public:
        ConcurrentQueue* parent;
    };


    public:
    ///////////////////////////
    // Explicit queue
    ///////////////////////////
    struct ExplicitProducer : public ProducerBase
    {
        explicit ExplicitProducer(ConcurrentQueue* _parent) :
            ProducerBase(_parent),
            blockIndex(nullptr),
            pr_blockIndexSlotsUsed(0),
            pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
            pr_blockIndexFront(0),
            pr_blockIndexEntries(nullptr),
            pr_blockIndexRaw(nullptr)
        {
            size_t poolBasedIndexSize = details::ceil_to_pow_2(_parent->initialBlockPoolSize) >> 1;
            if (poolBasedIndexSize > pr_blockIndexSize) {
                pr_blockIndexSize = poolBasedIndexSize;
            }

            new_block_index(0);    // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
        }

        ~ExplicitProducer()
        {
            // Destruct any elements not yet dequeued.
            // Since we're in the destructor, we can assume all elements
            // are either completely dequeued or completely not (no halfways).
            if (this->tailBlock != nullptr) {    // Note this means there must be a block index too
                // First find the block that's partially dequeued, if any
                Block* halfDequeuedBlock = nullptr;
                if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
                    // The head's not on a block boundary, meaning a block somewhere is partially dequeued
                    // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
                    size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
                    while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
                        i = (i + 1) & (pr_blockIndexSize - 1);
                    }
                    assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
                    halfDequeuedBlock = pr_blockIndexEntries[i].block;
                }

                // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
                auto block = this->tailBlock;
                do {
                    block = block->next;
                    if (block->ConcurrentQueue::Block::is_empty()) {
                        continue;
                    }

                    size_t i = 0;    // Offset into block
                    if (block == halfDequeuedBlock) {
                        i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
                    }

                    // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
                    auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
                    while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
                        (*block)[i++]->~T();
                    }
                } while (block != this->tailBlock);
            }

            // Destroy all blocks that we own
            if (this->tailBlock != nullptr) {
                auto block = this->tailBlock;
                do {
                    auto nextBlock = block->next;
                    if (block->dynamicallyAllocated) {
                        destroy(block);
                    }
                    else {
                        this->parent->add_block_to_free_list(block);
                    }
                    block = nextBlock;
                } while (block != this->tailBlock);
            }

            // Destroy the block indices
            auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
            while (header != nullptr) {
                auto prev = static_cast<BlockIndexHeader*>(header->prev);
                header->~BlockIndexHeader();
                (Traits::free)(header);
                header = prev;
            }
        }

        inline void enqueue_begin_alloc(index_t currentTailIndex)
        {
            // We reached the end of a block, start a new one
            if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) {
                // We can re-use the block ahead of us, it's empty!
                this->tailBlock = this->tailBlock->next;
                this->tailBlock->ConcurrentQueue::Block::reset_empty();

                // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
                // last block from it first -- except instead of removing then adding, we can just overwrite).
                // Note that there must be a valid block index here, since even if allocation failed in the ctor,
                // it would have been re-attempted when adding the first block to the queue; since there is such
                // a block, a block index must have been successfully allocated.
            }
            else {
                // We're going to need a new block; check that the block index has room
                if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
                    // Hmm, the circular block index is already full -- we'll need
                    // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
                    // the initial allocation failed in the constructor.
                    new_block_index(pr_blockIndexSlotsUsed);
                }

                // Insert a new block in the circular linked list
                auto newBlock = this->parent->ConcurrentQueue::requisition_block();
                newBlock->ConcurrentQueue::Block::reset_empty();
                if (this->tailBlock == nullptr) {
                    newBlock->next = newBlock;
                }
                else {
                    newBlock->next = this->tailBlock->next;
                    this->tailBlock->next = newBlock;
                }
                this->tailBlock = newBlock;
                ++pr_blockIndexSlotsUsed;
            }

            // Add block to block index
            auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
            entry.base = currentTailIndex;
            entry.block = this->tailBlock;
            blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
            pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
        }

        tracy_force_inline T* enqueue_begin(index_t& currentTailIndex)
        {
            currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            if (details::cqUnlikely((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0)) {
                this->enqueue_begin_alloc(currentTailIndex);
            }
            return (*this->tailBlock)[currentTailIndex];
        }

        tracy_force_inline std::atomic<index_t>& get_tail_index()
        {
            return this->tailIndex;
        }

        template<class NotifyThread, class ProcessData>
        size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
            if (details::circular_less_than<size_t>(0, desiredCount)) {
                desiredCount = desiredCount < 8192 ? desiredCount : 8192;
                std::atomic_thread_fence(std::memory_order_acquire);

                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
                assert(overcommit <= myDequeueCount);

                tail = this->tailIndex.load(std::memory_order_acquire);
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
                if (details::circular_less_than<size_t>(0, actualCount)) {
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                    if (actualCount < desiredCount) {
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                    }

                    // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
                    // will never exceed tail.
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                    // Determine which block the first element is in
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                    auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
                    auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);

                    notifyThread( this->threadId );

                    // Iterate the blocks and dequeue
                    auto index = firstIndex;
                    do {
                        auto firstIndexInBlock = index;
                        auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
                        auto block = localBlockIndex->entries[indexIndex].block;

                        const auto sz = endIndex - index;
                        processData( (*block)[index], sz );
                        index += sz;

                        block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                        indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
                    } while (index != firstIndex + actualCount);

                    return actualCount;
                }
                else {
                    // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
                }
            }

            return 0;
        }

    private:
        struct BlockIndexEntry
        {
            index_t base;
            Block* block;
        };

        struct BlockIndexHeader
        {
            size_t size;
            std::atomic<size_t> front;    // Current slot (not next, like pr_blockIndexFront)
            BlockIndexEntry* entries;
            void* prev;
        };


        bool new_block_index(size_t numberOfFilledSlotsToExpose)
        {
            auto prevBlockSizeMask = pr_blockIndexSize - 1;

            // Create the new block
            pr_blockIndexSize <<= 1;
            auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
            if (newRawPtr == nullptr) {
                pr_blockIndexSize >>= 1;    // Reset to allow graceful retry
                return false;
            }

            auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));

            // Copy in all the old indices, if any
            size_t j = 0;
            if (pr_blockIndexSlotsUsed != 0) {
                auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
                do {
                    newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
                    i = (i + 1) & prevBlockSizeMask;
                } while (i != pr_blockIndexFront);
            }

            // Update everything
            auto header = new (newRawPtr) BlockIndexHeader;
            header->size = pr_blockIndexSize;
            header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
            header->entries = newBlockIndexEntries;
            header->prev = pr_blockIndexRaw;    // we link the new block to the old one so we can free it later

            pr_blockIndexFront = j;
            pr_blockIndexEntries = newBlockIndexEntries;
            pr_blockIndexRaw = newRawPtr;
            blockIndex.store(header, std::memory_order_release);

            return true;
        }

    private:
        std::atomic<BlockIndexHeader*> blockIndex;

        // To be used by producer only -- consumer must use the ones referenced by blockIndex
        size_t pr_blockIndexSlotsUsed;
        size_t pr_blockIndexSize;
        size_t pr_blockIndexFront;    // Next slot (not current)
        BlockIndexEntry* pr_blockIndexEntries;
        void* pr_blockIndexRaw;
    };

    ExplicitProducer* get_explicit_producer(producer_token_t const& token)
    {
        return static_cast<ExplicitProducer*>(token.producer);
    }

    private:

    //////////////////////////////////
    // Block pool manipulation
    //////////////////////////////////

    void populate_initial_block_list(size_t blockCount)
    {
        initialBlockPoolSize = blockCount;
        if (initialBlockPoolSize == 0) {
            initialBlockPool = nullptr;
            return;
        }

        initialBlockPool = create_array<Block>(blockCount);
        if (initialBlockPool == nullptr) {
            initialBlockPoolSize = 0;
        }
        for (size_t i = 0; i < initialBlockPoolSize; ++i) {
            initialBlockPool[i].dynamicallyAllocated = false;
        }
    }

    inline Block* try_get_block_from_initial_pool()
    {
        if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
            return nullptr;
        }

        auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);

        return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
    }

    inline void add_block_to_free_list(Block* block)
    {
        freeList.add(block);
    }

    inline void add_blocks_to_free_list(Block* block)
    {
        while (block != nullptr) {
            auto next = block->next;
            add_block_to_free_list(block);
            block = next;
        }
    }

    inline Block* try_get_block_from_free_list()
    {
        return freeList.try_get();
    }

    // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
    Block* requisition_block()
    {
        auto block = try_get_block_from_initial_pool();
        if (block != nullptr) {
            return block;
        }

        block = try_get_block_from_free_list();
        if (block != nullptr) {
            return block;
        }

        return create<Block>();
    }


    //////////////////////////////////
    // Producer list manipulation
    //////////////////////////////////

    ProducerBase* recycle_or_create_producer()
    {
        bool recycled;
        return recycle_or_create_producer(recycled);
    }

    ProducerBase* recycle_or_create_producer(bool& recycled)
    {
        // Try to re-use one first
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->inactive.load(std::memory_order_relaxed)) {
                if( ptr->size_approx() == 0 )
                {
                    bool expected = true;
                    if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
                        // We caught one! It's been marked as activated, the caller can have it
                        recycled = true;
                        return ptr;
                    }
                }
            }
        }

        recycled = false;
        return add_producer(static_cast<ProducerBase*>(create<ExplicitProducer>(this)));
    }

    ProducerBase* add_producer(ProducerBase* producer)
    {
        // Handle failed memory allocation
        if (producer == nullptr) {
            return nullptr;
        }

        producerCount.fetch_add(1, std::memory_order_relaxed);

        // Add it to the lock-free list
        auto prevTail = producerListTail.load(std::memory_order_relaxed);
        do {
            producer->next = prevTail;
        } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));

        return producer;
    }

    void reown_producers()
    {
        // After another instance is moved-into/swapped-with this one, all the
        // producers we stole still think their parents are the other queue.
        // So fix them up!
        for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
            ptr->parent = this;
        }
    }

    //////////////////////////////////
    // Utility functions
    //////////////////////////////////

    template<typename U>
    static inline U* create_array(size_t count)
    {
        assert(count > 0);
        return static_cast<U*>((Traits::malloc)(sizeof(U) * count));
    }

    template<typename U>
    static inline void destroy_array(U* p, size_t count)
    {
        ((void)count);
        if (p != nullptr) {
            assert(count > 0);
            (Traits::free)(p);
        }
    }

    template<typename U>
    static inline U* create()
    {
        auto p = (Traits::malloc)(sizeof(U));
        return new (p) U;
    }

    template<typename U, typename A1>
    static inline U* create(A1&& a1)
    {
        auto p = (Traits::malloc)(sizeof(U));
        return new (p) U(std::forward<A1>(a1));
    }

    template<typename U>
    static inline void destroy(U* p)
    {
        if (p != nullptr) {
            p->~U();
        }
        (Traits::free)(p);
    }

private:
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

    FreeList<Block> freeList;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;
};


template<typename T, typename Traits>
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
    : producer(queue.recycle_or_create_producer())
{
    if (producer != nullptr) {
        producer->token = this;
        producer->threadId = detail::GetThreadHandleImpl();
    }
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}

template<typename T, typename Traits>
inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) noexcept
{
    a.swap(b);
}

inline void swap(ProducerToken& a, ProducerToken& b) noexcept
{
    a.swap(b);
}

inline void swap(ConsumerToken& a, ConsumerToken& b) noexcept
{
    a.swap(b);
}

}

} /* namespace tracy */

#if defined(__GNUC__)
#pragma GCC diagnostic pop
#endif