// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
// An overview, including benchmark results, is provided here:
//    http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
// The full design is also described in excruciating detail at:
//    http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue

// Simplified BSD license:
// Copyright (c) 2013-2016, Cameron Desrochers.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// - Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
// - Redistributions in binary form must reproduce the above copyright notice, this list of
// conditions and the following disclaimer in the documentation and/or other materials
// provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


#pragma once

#include "../common/TracyAlloc.hpp"
#include "../common/TracyForceInline.hpp"
#include "../common/TracySystem.hpp"

#if defined(__GNUC__)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#include <atomic> // Requires C++11. Sorry VS2010.
#include <cassert>
#include <cstddef> // for max_align_t
#include <cstdint>
#include <cstdlib>
#include <type_traits>
#include <algorithm>
#include <utility>
#include <limits>
#include <climits> // for CHAR_BIT
#include <array>
#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading

namespace tracy
{

// Compiler-specific likely/unlikely hints
namespace moodycamel { namespace details {
#if defined(__GNUC__)
    inline bool cqLikely(bool x) { return __builtin_expect((x), true); }
    inline bool cqUnlikely(bool x) { return __builtin_expect((x), false); }
#else
    inline bool cqLikely(bool x) { return x; }
    inline bool cqUnlikely(bool x) { return x; }
#endif
} }

namespace
{
    // to avoid MSVC warning 4127: conditional expression is constant
    template <bool>
    struct compile_time_condition
    {
        static const bool value = false;
    };
    template <>
    struct compile_time_condition<true>
    {
        static const bool value = true;
    };
}

namespace moodycamel {
namespace details {
    template<typename T>
    struct const_numeric_max {
        static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
        static const T value = std::numeric_limits<T>::is_signed
            ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
            : static_cast<T>(-1);
    };

#if defined(__GLIBCXX__)
    typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
#else
    typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif

    // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
    // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
    typedef union {
        std_max_align_t x;
        long long y;
        void* z;
    } max_align_t;
}

// Default traits for the ConcurrentQueue. To change some of the
// traits without re-implementing all of them, inherit from this
// struct and shadow the declarations you wish to be different;
// since the traits are used as a template type parameter, the
// shadowed declarations will be used where defined, and the defaults
// otherwise.
struct ConcurrentQueueDefaultTraits
{
    // General-purpose size type. std::size_t is strongly recommended.
    typedef std::size_t size_t;

    // The type used for the enqueue and dequeue indices. Must be at least as
    // large as size_t. Should be significantly larger than the number of elements
    // you expect to hold at once, especially if you have a high turnover rate;
    // for example, on 32-bit x86, if you expect to have over a hundred million
    // elements or pump several million elements through your queue in a very
    // short space of time, using a 32-bit type *may* trigger a race condition.
    // A 64-bit int type is recommended in that case, and in practice will
    // prevent a race condition no matter the usage of the queue. Note that
    // whether the queue is lock-free with a 64-bit type depends on whether
    // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
    typedef std::size_t index_t;

    // Internally, all elements are enqueued and dequeued from multi-element
    // blocks; this is the smallest controllable unit. If you expect few elements
    // but many producers, a smaller block size should be favoured. For few producers
    // and/or many elements, a larger block size is preferred. A sane default
    // is provided. Must be a power of 2.
    static const size_t BLOCK_SIZE = 64*1024;

    // For explicit producers (i.e. when using a producer token), the block is
    // checked for being empty by iterating through a list of flags, one per element.
    // For large block sizes, this is too inefficient, and switching to an atomic
    // counter-based approach is faster. The switch is made for block sizes strictly
    // larger than this threshold.
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;

    // How many full blocks can be expected for a single explicit producer? This should
    // reflect that number's maximum for optimal performance. Must be a power of 2.
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;

    // Controls the number of items that an explicit consumer (i.e. one with a token)
    // must consume before it causes all consumers to rotate and move on to the next
    // internal queue.
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;

    // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
    // Enqueue operations that would cause this limit to be surpassed will fail. Note
    // that this limit is enforced at the block level (for performance reasons), i.e.
    // it's rounded up to the nearest block size.
    static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;


    // Memory allocation can be customized if needed.
    // malloc should return nullptr on failure, and handle alignment like std::malloc.
#if defined(malloc) || defined(free)
    // Gah, this is 2015, stop defining macros that break standard code already!
    // Work around malloc/free being special macros:
    static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
    static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
    static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
    static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
    static inline void* malloc(size_t size) { return tracy::tracy_malloc(size); }
    static inline void free(void* ptr) { return tracy::tracy_free(ptr); }
#endif
};


// When producing or consuming many elements, the most efficient way is to:
// 1) Use one of the bulk-operation methods of the queue with a token
// 2) Failing that, use the bulk-operation methods without a token
// 3) Failing that, create a token and use that with the single-item methods
// 4) Failing that, use the single-parameter methods of the queue
// Having said that, don't create tokens willy-nilly -- ideally there should be
// a maximum of one token per thread (of each kind).
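// Illustrative sketch (not upstream moodycamel documentation; the names MyTraits,
// worker_produce and worker_consume below are hypothetical). It shows how individual
// traits can be overridden by shadowing members of ConcurrentQueueDefaultTraits, and
// the one-token-per-thread discipline described above:
//
//     struct MyTraits : public tracy::moodycamel::ConcurrentQueueDefaultTraits
//     {
//         // Shadow only what needs to change; everything else keeps its default.
//         static const size_t BLOCK_SIZE = 256;                // must be a power of 2
//         static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 8; // must be a power of 2, > 1
//     };
//
//     void worker_produce(tracy::moodycamel::ConcurrentQueue<int, MyTraits>& q)
//     {
//         tracy::moodycamel::ProducerToken ptok(q); // one producer token per thread, reused
//         // ... enqueue through ptok for the lifetime of this thread ...
//     }
//
//     void worker_consume(tracy::moodycamel::ConcurrentQueue<int, MyTraits>& q)
//     {
//         tracy::moodycamel::ConsumerToken ctok(q); // one consumer token per thread, reused
//         // ... dequeue through ctok for the lifetime of this thread ...
//     }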
struct ProducerToken;
struct ConsumerToken;

template<typename T, typename Traits> class ConcurrentQueue;


namespace details
{
    struct ConcurrentQueueProducerTypelessBase
    {
        ConcurrentQueueProducerTypelessBase* next;
        std::atomic<bool> inactive;
        ProducerToken* token;
        uint32_t threadId;

        ConcurrentQueueProducerTypelessBase()
            : next(nullptr), inactive(false), token(nullptr), threadId(0)
        {
        }
    };

    template<typename T>
    static inline bool circular_less_than(T a, T b)
    {
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
        return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
        // Note: extra parens around rhs of operator<< is MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
        // silencing the bug requires #pragma warning(disable: 4554) around the calling code and has no effect when done here.
    }

    template<typename U>
    static inline char* align_for(char* ptr)
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }

    template<typename T>
    static inline T ceil_to_pow_2(T x)
    {
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");

        // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }

    template<typename T>
    static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
    {
        T temp = std::move(left.load(std::memory_order_relaxed));
        left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
        right.store(std::move(temp), std::memory_order_relaxed);
    }

    template<typename T>
    static inline T const& nomove(T const& x)
    {
        return x;
    }

    template<bool Enable>
    struct nomove_if
    {
        template<typename T>
        static inline T const& eval(T const& x)
        {
            return x;
        }
    };

    template<>
    struct nomove_if<false>
    {
        template<typename U>
        static inline auto eval(U&& x)
            -> decltype(std::forward<U>(x))
        {
            return std::forward<U>(x);
        }
    };

    template<typename It>
    static inline auto deref_noexcept(It& it) noexcept -> decltype(*it)
    {
        return *it;
    }

#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
    template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
#else
    template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
#endif

    template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
    template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
    template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
    template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
    template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
    template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
}


struct ProducerToken
{
    template<typename T, typename Traits>
    explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);

    ProducerToken(ProducerToken&& other) noexcept
        : producer(other.producer)
    {
        other.producer = nullptr;
        if (producer != nullptr) {
            producer->token = this;
        }
    }

    inline ProducerToken& operator=(ProducerToken&& other) noexcept
    {
        swap(other);
        return *this;
    }

    void swap(ProducerToken& other) noexcept
    {
        std::swap(producer, other.producer);
        if (producer != nullptr) {
            producer->token = this;
        }
        if (other.producer != nullptr) {
            other.producer->token = &other;
        }
    }

    // A token is always valid unless:
    //     1) Memory allocation failed during construction
    //     2) It was moved via the move constructor
    //        (Note: assignment does a swap, leaving both potentially valid)
    //     3) The associated queue was destroyed
    // Note that if valid() returns true, that only indicates
    // that the token is valid for use with a specific queue,
    // but not which one; that's up to the user to track.
    inline bool valid() const { return producer != nullptr; }

    ~ProducerToken()
    {
        if (producer != nullptr) {
            producer->token = nullptr;
            producer->inactive.store(true, std::memory_order_release);
        }
    }

    // Disable copying and assignment
    ProducerToken(ProducerToken const&) = delete;
    ProducerToken& operator=(ProducerToken const&) = delete;

private:
    template<typename T, typename Traits> friend class ConcurrentQueue;

protected:
    details::ConcurrentQueueProducerTypelessBase* producer;
};


struct ConsumerToken
{
    template<typename T, typename Traits>
    explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);

    ConsumerToken(ConsumerToken&& other) noexcept
        : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
    {
    }

    inline ConsumerToken& operator=(ConsumerToken&& other) noexcept
    {
        swap(other);
        return *this;
    }

    void swap(ConsumerToken& other) noexcept
    {
        std::swap(initialOffset, other.initialOffset);
        std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
        std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
        std::swap(currentProducer, other.currentProducer);
        std::swap(desiredProducer, other.desiredProducer);
    }

    // Disable copying and assignment
    ConsumerToken(ConsumerToken const&) = delete;
    ConsumerToken& operator=(ConsumerToken const&) = delete;

private:
    template<typename T, typename Traits> friend class ConcurrentQueue;

private: // but shared with ConcurrentQueue
    std::uint32_t initialOffset;
    std::uint32_t lastKnownGlobalOffset;
    std::uint32_t itemsConsumedFromCurrent;
    details::ConcurrentQueueProducerTypelessBase* currentProducer;
    details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};


template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
    struct ExplicitProducer;

    typedef moodycamel::ProducerToken producer_token_t;
    typedef moodycamel::ConsumerToken consumer_token_t;

    typedef typename Traits::index_t index_t;
    typedef typename Traits::size_t size_t;

    static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable: 4309) // static_cast: Truncation of constant value
#endif
    static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
#ifdef _MSC_VER
#pragma warning(pop)
#endif

    static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
    static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
    static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
    static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
    static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
    static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");

public:
    // Creates a queue with at least `capacity` element slots; note that the
    // actual number of elements that can be inserted without additional memory
    // allocation depends on the number of producers and the block size (e.g. if
    // the block size is equal to `capacity`, only a single block will be allocated
    // up-front, which means only a single producer will be able to enqueue elements
    // without an extra allocation -- blocks aren't shared between producers).
    // This method is not thread safe -- it is up to the user to ensure that the
    // queue is fully constructed before it starts being used by other threads (this
    // includes making the memory effects of construction visible, possibly with a
    // memory barrier).
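    // For illustration (hypothetical example): with the default BLOCK_SIZE of 64*1024,
    // the requested capacity is rounded up to whole blocks, so `ConcurrentQueue<Item> q(100000);`
    // pre-allocates ceil(100000 / 65536) = 2 blocks, i.e. 131072 element slots. The
    // two-argument constructor below instead sizes the pool from minCapacity and the
    // expected number of explicit producers.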
    explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
        : producerListTail(nullptr),
        producerCount(0),
        initialBlockPoolIndex(0),
        nextExplicitConsumerId(0),
        globalExplicitConsumerOffset(0)
    {
        populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
    }

    // Computes the correct amount of pre-allocated blocks for you based
    // on the minimum number of elements you want available at any given
    // time, and the maximum concurrent number of each type of producer.
    ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers)
        : producerListTail(nullptr),
        producerCount(0),
        initialBlockPoolIndex(0),
        nextExplicitConsumerId(0),
        globalExplicitConsumerOffset(0)
    {
        size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers);
        populate_initial_block_list(blocks);
    }

    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    // This method is not thread safe.
    ~ConcurrentQueue()
    {
        // Destroy producers
        auto ptr = producerListTail.load(std::memory_order_relaxed);
        while (ptr != nullptr) {
            auto next = ptr->next_prod();
            if (ptr->token != nullptr) {
                ptr->token->producer = nullptr;
            }
            destroy(ptr);
            ptr = next;
        }

        // Destroy global free list
        auto block = freeList.head_unsafe();
        while (block != nullptr) {
            auto next = block->freeListNext.load(std::memory_order_relaxed);
            if (block->dynamicallyAllocated) {
                destroy(block);
            }
            block = next;
        }

        // Destroy initial free list
        destroy_array(initialBlockPool, initialBlockPoolSize);
    }

    // Disable copying and copy assignment
    ConcurrentQueue(ConcurrentQueue const&) = delete;
    ConcurrentQueue(ConcurrentQueue&& other) = delete;
    ConcurrentQueue& operator=(ConcurrentQueue const&) = delete;
    ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete;

public:
    tracy_force_inline T* enqueue_begin(producer_token_t const& token, index_t& currentTailIndex)
    {
        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex);
    }

    template<class NotifyThread, class ProcessData>
    size_t try_dequeue_bulk_single(consumer_token_t& token, NotifyThread notifyThread, ProcessData processData)
    {
        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
            if (!update_current_producer_after_rotation(token)) {
                return 0;
            }
        }

        size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(notifyThread, processData);
        token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);

        auto tail = producerListTail.load(std::memory_order_acquire);
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
        if (ptr == nullptr) {
            ptr = tail;
        }
        if (count == 0)
        {
            while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
                auto dequeued = ptr->dequeue_bulk(notifyThread, processData);
                if (dequeued != 0) {
                    token.currentProducer = ptr;
                    token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
                    return dequeued;
                }
                ptr = ptr->next_prod();
                if (ptr == nullptr) {
                    ptr = tail;
                }
            }
            return 0;
        }
        else
        {
            token.currentProducer = ptr;
            token.itemsConsumedFromCurrent = 0;
            return count;
        }
    }


    // Returns an estimate of the total number of elements currently in the queue. This
    // estimate is only accurate if the queue has completely stabilized before it is called
    // (i.e. all enqueue and dequeue operations have completed and their memory effects are
    // visible on the calling thread, and no further operations start while this method is
    // being called).
    // Thread-safe.
    size_t size_approx() const
    {
        size_t size = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            size += ptr->size_approx();
        }
        return size;
    }


    // Returns true if the underlying atomic variables used by
    // the queue are lock-free (they should be on most platforms).
    // Thread-safe.
    static bool is_lock_free()
    {
        return
            details::static_is_lock_free<bool>::value == 2 &&
            details::static_is_lock_free<size_t>::value == 2 &&
            details::static_is_lock_free<std::uint32_t>::value == 2 &&
            details::static_is_lock_free<index_t>::value == 2 &&
            details::static_is_lock_free<void*>::value == 2;
    }


private:
    friend struct ProducerToken;
    friend struct ConsumerToken;
    friend struct ExplicitProducer;


    ///////////////////////////////
    // Queue methods
    ///////////////////////////////

    inline bool update_current_producer_after_rotation(consumer_token_t& token)
    {
        // Ah, there's been a rotation, figure out where we should be!
        auto tail = producerListTail.load(std::memory_order_acquire);
        if (token.desiredProducer == nullptr && tail == nullptr) {
            return false;
        }
        auto prodCount = producerCount.load(std::memory_order_relaxed);
        auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
        if (details::cqUnlikely(token.desiredProducer == nullptr)) {
            // Aha, first time we're dequeueing anything.
            // Figure out our local position
            // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
            std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
            token.desiredProducer = tail;
            for (std::uint32_t i = 0; i != offset; ++i) {
                token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
                if (token.desiredProducer == nullptr) {
                    token.desiredProducer = tail;
                }
            }
        }

        std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
        if (delta >= prodCount) {
            delta = delta % prodCount;
        }
        for (std::uint32_t i = 0; i != delta; ++i) {
            token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
            if (token.desiredProducer == nullptr) {
                token.desiredProducer = tail;
            }
        }

        token.lastKnownGlobalOffset = globalOffset;
        token.currentProducer = token.desiredProducer;
        token.itemsConsumedFromCurrent = 0;
        return true;
    }


    ///////////////////////////
    // Free list
    ///////////////////////////

    template <typename N>
    struct FreeListNode
    {
        FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }

        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<N*> freeListNext;
    };

    // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
    // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
    // speedy under low contention.
    template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
    struct FreeList
    {
        FreeList() : freeListHead(nullptr) { }
        FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
        void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }

        FreeList(FreeList const&) = delete;
        FreeList& operator=(FreeList const&) = delete;

        inline void add(N* node)
        {
            // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
            // set it using a fetch_add
            if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
                // Oh look! We were the last ones referencing this node, and we know
                // we want to add it to the free list, so let's do it!
                add_knowing_refcount_is_zero(node);
            }
        }

        inline N* try_get()
        {
            auto head = freeListHead.load(std::memory_order_acquire);
            while (head != nullptr) {
                auto prevHead = head;
                auto refs = head->freeListRefs.load(std::memory_order_relaxed);
                if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
                    head = freeListHead.load(std::memory_order_acquire);
                    continue;
                }

                // Good, reference count has been incremented (it wasn't at zero), which means we can read the
                // next and not worry about it changing between now and the time we do the CAS
                auto next = head->freeListNext.load(std::memory_order_relaxed);
                if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
                    // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
                    assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

                    // Decrease refcount twice, once for our ref, and once for the list's ref
                    head->freeListRefs.fetch_sub(2, std::memory_order_release);
                    return head;
                }

                // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
                // Note that we don't need to release any memory effects, but we do need to ensure that the reference
                // count decrement happens-after the CAS on the head.
                refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
                if (refs == SHOULD_BE_ON_FREELIST + 1) {
                    add_knowing_refcount_is_zero(prevHead);
                }
            }

            return nullptr;
        }

        // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
        N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

    private:
        inline void add_knowing_refcount_is_zero(N* node)
        {
            // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
            // only one copy of this method per node at a time, i.e. the single thread case), then we know
            // we can safely change the next pointer of the node; however, once the refcount is back above
            // zero, then other threads could increase it (happens under heavy contention, when the refcount
            // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
            // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
            // to add the node to the actual list fails, decrease the refcount and leave the add operation to
            // the next thread who puts the refcount back at zero (which could be us, hence the loop).
            auto head = freeListHead.load(std::memory_order_relaxed);
            while (true) {
                node->freeListNext.store(head, std::memory_order_relaxed);
                node->freeListRefs.store(1, std::memory_order_release);
                if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
                    // Hmm, the add failed, but we can only try again when the refcount goes back to zero
                    if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
                        continue;
                    }
                }
                return;
            }
        }

    private:
        // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
        std::atomic<N*> freeListHead;

        static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
        static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
    };


    ///////////////////////////
    // Block
    ///////////////////////////

    struct Block
    {
        Block()
            : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
        {
        }

        inline bool is_empty() const
        {
            if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
                // Check flags
                for (size_t i = 0; i < BLOCK_SIZE; ++i) {
                    if (!emptyFlags[i].load(std::memory_order_relaxed)) {
                        return false;
                    }
                }

                // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
                std::atomic_thread_fence(std::memory_order_acquire);
                return true;
            }
            else {
                // Check counter
                if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
                    std::atomic_thread_fence(std::memory_order_acquire);
                    return true;
                }
                assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
                return false;
            }
        }

        // Returns true if the block is now empty (does not apply in explicit context)
        inline bool set_empty(index_t i)
        {
            if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set flag
                assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
                emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
                return false;
            }
            else {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
                assert(prevVal < BLOCK_SIZE);
                return prevVal == BLOCK_SIZE - 1;
            }
        }

        // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
        // Returns true if the block is now empty (does not apply in explicit context).
        inline bool set_many_empty(index_t i, size_t count)
        {
            if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
                // Set flags
                std::atomic_thread_fence(std::memory_order_release);
                i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
                for (size_t j = 0; j != count; ++j) {
                    assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
                    emptyFlags[i + j].store(true, std::memory_order_relaxed);
                }
                return false;
            }
            else {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
                assert(prevVal + count <= BLOCK_SIZE);
                return prevVal + count == BLOCK_SIZE;
            }
        }

        inline void set_all_empty()
        {
            if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set all flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(true, std::memory_order_relaxed);
                }
            }
            else {
                // Reset counter
                elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
            }
        }

        inline void reset_empty()
        {
            if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
                // Reset flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(false, std::memory_order_relaxed);
                }
            }
            else {
                // Reset counter
                elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
            }
        }

        inline T* operator[](index_t idx) noexcept { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
        inline T const* operator[](index_t idx) const noexcept { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }

    private:
        // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
        // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
        // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
        // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
        // alignment, but this is hard to do in a cross-platform way. Assert for this case:
        static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
        // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
        // otherwise the appropriate padding will not be added at the end of Block in order to make
        // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
        // this.
        union {
            char elements[sizeof(T) * BLOCK_SIZE];
            details::max_align_t dummy;
        };
    public:
        Block* next;
        std::atomic<size_t> elementsCompletelyDequeued;
        std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
    public:
        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<Block*> freeListNext;
        std::atomic<bool> shouldBeOnFreeList;
        bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
    };
    static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");


    ///////////////////////////
    // Producer base
    ///////////////////////////

    struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
    {
        ProducerBase(ConcurrentQueue* parent_) :
            tailIndex(0),
            headIndex(0),
            dequeueOptimisticCount(0),
            dequeueOvercommit(0),
            tailBlock(nullptr),
            parent(parent_)
        {
        }

        virtual ~ProducerBase() { };

        template<class NotifyThread, class ProcessData>
        inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
        {
            return static_cast<ExplicitProducer*>(this)->dequeue_bulk(notifyThread, processData);
        }

        inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }

        inline size_t size_approx() const
        {
            auto tail = tailIndex.load(std::memory_order_relaxed);
            auto head = headIndex.load(std::memory_order_relaxed);
            return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
        }

        inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
    protected:
        std::atomic<index_t> tailIndex; // Where to enqueue to next
        std::atomic<index_t> headIndex; // Where to dequeue from next

        std::atomic<index_t> dequeueOptimisticCount;
        std::atomic<index_t> dequeueOvercommit;

        Block* tailBlock;

    public:
        ConcurrentQueue* parent;
    };


    public:
    ///////////////////////////
    // Explicit queue
    ///////////////////////////
    struct ExplicitProducer : public ProducerBase
    {
        explicit ExplicitProducer(ConcurrentQueue* _parent) :
            ProducerBase(_parent),
            blockIndex(nullptr),
            pr_blockIndexSlotsUsed(0),
            pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
            pr_blockIndexFront(0),
            pr_blockIndexEntries(nullptr),
            pr_blockIndexRaw(nullptr)
        {
            size_t poolBasedIndexSize = details::ceil_to_pow_2(_parent->initialBlockPoolSize) >> 1;
            if (poolBasedIndexSize > pr_blockIndexSize) {
                pr_blockIndexSize = poolBasedIndexSize;
            }

            new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
        }

        ~ExplicitProducer()
        {
            // Destruct any elements not yet dequeued.
            // Since we're in the destructor, we can assume all elements
            // are either completely dequeued or completely not (no halfways).
            if (this->tailBlock != nullptr) { // Note this means there must be a block index too
                // First find the block that's partially dequeued, if any
                Block* halfDequeuedBlock = nullptr;
                if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
                    // The head's not on a block boundary, meaning a block somewhere is partially dequeued
                    // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
                    size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
                    while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
                        i = (i + 1) & (pr_blockIndexSize - 1);
                    }
                    assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
                    halfDequeuedBlock = pr_blockIndexEntries[i].block;
                }

                // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
                auto block = this->tailBlock;
                do {
                    block = block->next;
                    if (block->ConcurrentQueue::Block::is_empty()) {
                        continue;
                    }

                    size_t i = 0; // Offset into block
                    if (block == halfDequeuedBlock) {
                        i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
                    }

                    // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
                    auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
                    while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
                        (*block)[i++]->~T();
                    }
                } while (block != this->tailBlock);
            }

            // Destroy all blocks that we own
            if (this->tailBlock != nullptr) {
                auto block = this->tailBlock;
                do {
                    auto nextBlock = block->next;
                    if (block->dynamicallyAllocated) {
                        destroy(block);
                    }
                    else {
                        this->parent->add_block_to_free_list(block);
                    }
                    block = nextBlock;
                } while (block != this->tailBlock);
            }

            // Destroy the block indices
            auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
            while (header != nullptr) {
                auto prev = static_cast<BlockIndexHeader*>(header->prev);
                header->~BlockIndexHeader();
                (Traits::free)(header);
                header = prev;
            }
        }

        inline void enqueue_begin_alloc(index_t currentTailIndex)
        {
            // We reached the end of a block, start a new one
            if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) {
                // We can re-use the block ahead of us, it's empty!
                this->tailBlock = this->tailBlock->next;
                this->tailBlock->ConcurrentQueue::Block::reset_empty();

                // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
                // last block from it first -- except instead of removing then adding, we can just overwrite).
                // Note that there must be a valid block index here, since even if allocation failed in the ctor,
                // it would have been re-attempted when adding the first block to the queue; since there is such
                // a block, a block index must have been successfully allocated.
            }
            else {
                // We're going to need a new block; check that the block index has room
                if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
                    // Hmm, the circular block index is already full -- we'll need
                    // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
                    // the initial allocation failed in the constructor.
                    new_block_index(pr_blockIndexSlotsUsed);
                }

                // Insert a new block in the circular linked list
                auto newBlock = this->parent->ConcurrentQueue::requisition_block();
                newBlock->ConcurrentQueue::Block::reset_empty();
                if (this->tailBlock == nullptr) {
                    newBlock->next = newBlock;
                }
                else {
                    newBlock->next = this->tailBlock->next;
                    this->tailBlock->next = newBlock;
                }
                this->tailBlock = newBlock;
                ++pr_blockIndexSlotsUsed;
            }

            // Add block to block index
            auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
            entry.base = currentTailIndex;
            entry.block = this->tailBlock;
            blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
            pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
        }

        tracy_force_inline T* enqueue_begin(index_t& currentTailIndex)
        {
            currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            if (details::cqUnlikely((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0)) {
                this->enqueue_begin_alloc(currentTailIndex);
            }
            return (*this->tailBlock)[currentTailIndex];
        }

        tracy_force_inline std::atomic<index_t>& get_tail_index()
        {
            return this->tailIndex;
        }

        template<class NotifyThread, class ProcessData>
        size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
            if (details::circular_less_than<size_t>(0, desiredCount)) {
                desiredCount = desiredCount < 8192 ? desiredCount : 8192;
                std::atomic_thread_fence(std::memory_order_acquire);

                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
                assert(overcommit <= myDequeueCount);

                tail = this->tailIndex.load(std::memory_order_acquire);
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
                if (details::circular_less_than<size_t>(0, actualCount)) {
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                    if (actualCount < desiredCount) {
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                    }

                    // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
                    // will never exceed tail.
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                    // Determine which block the first element is in
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                    auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
                    auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);

                    notifyThread(this->threadId);

                    // Iterate the blocks and dequeue
                    auto index = firstIndex;
                    do {
                        auto firstIndexInBlock = index;
                        auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
                        auto block = localBlockIndex->entries[indexIndex].block;

                        const auto sz = endIndex - index;
                        processData((*block)[index], sz);
                        index += sz;

                        block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                        indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
                    } while (index != firstIndex + actualCount);

                    return actualCount;
                }
                else {
                    // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
                }
            }

            return 0;
        }

    private:
        struct BlockIndexEntry
        {
            index_t base;
            Block* block;
        };

        struct BlockIndexHeader
        {
            size_t size;
            std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
            BlockIndexEntry* entries;
            void* prev;
        };


        bool new_block_index(size_t numberOfFilledSlotsToExpose)
        {
            auto prevBlockSizeMask = pr_blockIndexSize - 1;

            // Create the new block
            pr_blockIndexSize <<= 1;
            auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
            if (newRawPtr == nullptr) {
                pr_blockIndexSize >>= 1; // Reset to allow graceful retry
                return false;
            }

            auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));

            // Copy in all the old indices, if any
            size_t j = 0;
            if (pr_blockIndexSlotsUsed != 0) {
                auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
                do {
                    newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
                    i = (i + 1) & prevBlockSizeMask;
                } while (i != pr_blockIndexFront);
            }

            // Update everything
            auto header = new (newRawPtr) BlockIndexHeader;
            header->size = pr_blockIndexSize;
            header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
            header->entries = newBlockIndexEntries;
            header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later

            pr_blockIndexFront = j;
            pr_blockIndexEntries = newBlockIndexEntries;
            pr_blockIndexRaw = newRawPtr;
            blockIndex.store(header, std::memory_order_release);

            return true;
        }

    private:
        std::atomic<BlockIndexHeader*> blockIndex;

        // To be used by producer only -- consumer must use the ones referenced by blockIndex
        size_t pr_blockIndexSlotsUsed;
        size_t pr_blockIndexSize;
        size_t pr_blockIndexFront; // Next slot (not current)
        BlockIndexEntry* pr_blockIndexEntries;
        void* pr_blockIndexRaw;
    };

    ExplicitProducer* get_explicit_producer(producer_token_t const& token)
    {
        return static_cast<ExplicitProducer*>(token.producer);
    }

    private:

    //////////////////////////////////
    // Block pool manipulation
    //////////////////////////////////

    void populate_initial_block_list(size_t blockCount)
    {
        initialBlockPoolSize = blockCount;
        if (initialBlockPoolSize == 0) {
            initialBlockPool = nullptr;
            return;
        }

        initialBlockPool = create_array<Block>(blockCount);
        if (initialBlockPool == nullptr) {
            initialBlockPoolSize = 0;
        }
        for (size_t i = 0; i < initialBlockPoolSize; ++i) {
            initialBlockPool[i].dynamicallyAllocated = false;
        }
    }

    inline Block* try_get_block_from_initial_pool()
    {
        if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
            return nullptr;
        }

        auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);

        return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
    }

    inline void add_block_to_free_list(Block* block)
    {
        freeList.add(block);
    }

    inline void add_blocks_to_free_list(Block* block)
    {
        while (block != nullptr) {
            auto next = block->next;
            add_block_to_free_list(block);
            block = next;
        }
    }

    inline Block* try_get_block_from_free_list()
    {
        return freeList.try_get();
    }

    // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
    Block* requisition_block()
    {
        auto block = try_get_block_from_initial_pool();
        if (block != nullptr) {
            return block;
        }

        block = try_get_block_from_free_list();
        if (block != nullptr) {
            return block;
        }

        return create<Block>();
    }


    //////////////////////////////////
    // Producer list manipulation
    //////////////////////////////////

    ProducerBase* recycle_or_create_producer()
    {
        bool recycled;
        return recycle_or_create_producer(recycled);
    }

    ProducerBase* recycle_or_create_producer(bool& recycled)
    {
        // Try to re-use one first
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->inactive.load(std::memory_order_relaxed)) {
                if (ptr->size_approx() == 0)
                {
                    bool expected = true;
                    if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
                        // We caught one! It's been marked as activated, the caller can have it
                        recycled = true;
                        return ptr;
                    }
                }
            }
        }

        recycled = false;
        return add_producer(static_cast<ProducerBase*>(create<ExplicitProducer>(this)));
    }

    ProducerBase* add_producer(ProducerBase* producer)
    {
        // Handle failed memory allocation
        if (producer == nullptr) {
            return nullptr;
        }

        producerCount.fetch_add(1, std::memory_order_relaxed);

        // Add it to the lock-free list
        auto prevTail = producerListTail.load(std::memory_order_relaxed);
        do {
            producer->next = prevTail;
        } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));

        return producer;
    }

    void reown_producers()
    {
        // After another instance is moved-into/swapped-with this one, all the
        // producers we stole still think their parents are the other queue.
        // So fix them up!
        for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
            ptr->parent = this;
        }
    }

    //////////////////////////////////
    // Utility functions
    //////////////////////////////////

    template<typename U>
    static inline U* create_array(size_t count)
    {
        assert(count > 0);
        return static_cast<U*>((Traits::malloc)(sizeof(U) * count));
    }

    template<typename U>
    static inline void destroy_array(U* p, size_t count)
    {
        ((void)count);
        if (p != nullptr) {
            assert(count > 0);
            (Traits::free)(p);
        }
    }

    template<typename U>
    static inline U* create()
    {
        auto p = (Traits::malloc)(sizeof(U));
        return new (p) U;
    }

    template<typename U, typename A1>
    static inline U* create(A1&& a1)
    {
        auto p = (Traits::malloc)(sizeof(U));
        return new (p) U(std::forward<A1>(a1));
    }

    template<typename U>
    static inline void destroy(U* p)
    {
        if (p != nullptr) {
            p->~U();
        }
        (Traits::free)(p);
    }

private:
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

    FreeList<Block> freeList;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;
};


template<typename T, typename Traits>
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
    : producer(queue.recycle_or_create_producer())
{
    if (producer != nullptr) {
        producer->token = this;
        producer->threadId = detail::GetThreadHandleImpl();
    }
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}

template<typename T, typename Traits>
inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) noexcept
{
    a.swap(b);
}

inline void swap(ProducerToken& a, ProducerToken& b) noexcept
{
    a.swap(b);
}

inline void swap(ConsumerToken& a, ConsumerToken& b) noexcept
{
    a.swap(b);
}

}

} /* namespace tracy */

#if defined(__GNUC__)
#pragma GCC diagnostic pop
#endif
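
// Usage sketch, for illustration only -- `QueueItem` and the lambdas below are
// hypothetical placeholders. In this Tracy-adapted queue the producer obtains a slot
// with enqueue_begin(), constructs the item in place, and publishes it by storing the
// advanced index into the producer's tail index; the consumer drains contiguous runs
// of items with try_dequeue_bulk_single():
//
//     tracy::moodycamel::ConcurrentQueue<QueueItem> queue;
//     tracy::moodycamel::ProducerToken ptok(queue);
//     tracy::moodycamel::ConsumerToken ctok(queue);
//
//     // Producer thread: acquire a slot, construct in place, then publish.
//     tracy::moodycamel::ConcurrentQueue<QueueItem>::index_t idx;
//     QueueItem* slot = queue.enqueue_begin(ptok, idx);
//     new (slot) QueueItem{ /* ... */ };
//     queue.get_explicit_producer(ptok)->get_tail_index().store(idx + 1, std::memory_order_release);
//
//     // Consumer thread: the first lambda is told which producer thread the data came
//     // from, the second receives a pointer to the first item of a run and the run length.
//     queue.try_dequeue_bulk_single(ctok,
//         [](uint32_t threadId) { /* switch to this thread's context */ },
//         [](QueueItem* first, size_t count) { /* process `count` items starting at `first` */ });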