summaryrefslogtreecommitdiff
path: root/src/common/reader_writer_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/reader_writer_queue.h')
-rw-r--r--src/common/reader_writer_queue.h941
1 files changed, 941 insertions, 0 deletions
diff --git a/src/common/reader_writer_queue.h b/src/common/reader_writer_queue.h
new file mode 100644
index 000000000..8d2c9408c
--- /dev/null
+++ b/src/common/reader_writer_queue.h
@@ -0,0 +1,941 @@
1// ©2013-2020 Cameron Desrochers.
2// Distributed under the simplified BSD license (see the license file that
3// should have come with this header).
4
5#pragma once
6
7#include <cassert>
8#include <cstdint>
9#include <cstdlib> // For malloc/free/abort & size_t
10#include <memory>
11#include <new>
12#include <stdexcept>
13#include <type_traits>
14#include <utility>
15
16#include "common/atomic_helpers.h"
17
18#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
19#include <chrono>
20#endif
21
22// A lock-free queue for a single-consumer, single-producer architecture.
23// The queue is also wait-free in the common path (except if more memory
24// needs to be allocated, in which case malloc is called).
25// Allocates memory sparingly, and only once if the original maximum size
26// estimate is never exceeded.
27// Tested on x86/x64 processors, but semantics should be correct for all
28// architectures (given the right implementations in atomicops.h), provided
29// that aligned integer and pointer accesses are naturally atomic.
30// Note that there should only be one consumer thread and producer thread;
31// Switching roles of the threads, or using multiple consecutive threads for
32// one role, is not safe unless properly synchronized.
33// Using the queue exclusively from one thread is fine, though a bit silly.
34
35#ifndef MOODYCAMEL_CACHE_LINE_SIZE
36#define MOODYCAMEL_CACHE_LINE_SIZE 64
37#endif
38
39#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
40#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
41 (!defined(_MSC_VER) && !defined(__GNUC__))
42#define MOODYCAMEL_EXCEPTIONS_ENABLED
43#endif
44#endif
45
46#ifndef MOODYCAMEL_HAS_EMPLACE
47#if !defined(_MSC_VER) || \
48 _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
49#define MOODYCAMEL_HAS_EMPLACE 1
50#endif
51#endif
52
53#ifndef MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
54#if defined(__APPLE__) && defined(__MACH__) && __cplusplus >= 201703L
55// This is required to find out what deployment target we are using
56#include <CoreFoundation/CoreFoundation.h>
57#if !defined(MAC_OS_X_VERSION_MIN_REQUIRED) || \
58 MAC_OS_X_VERSION_MIN_REQUIRED < MAC_OS_X_VERSION_10_14
59// C++17 new(size_t, align_val_t) is not backwards-compatible with older versions of macOS, so we
60// can't support over-alignment in this case
61#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
62#endif
63#endif
64#endif
65
66#ifndef MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
67#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE AE_ALIGN(MOODYCAMEL_CACHE_LINE_SIZE)
68#endif
69
70#ifdef AE_VCPP
71#pragma warning(push)
72#pragma warning(disable : 4324) // structure was padded due to __declspec(align())
73#pragma warning(disable : 4820) // padding was added
74#pragma warning(disable : 4127) // conditional expression is constant
75#endif
76
77namespace Common {
78
79template <typename T, size_t MAX_BLOCK_SIZE = 512>
80class MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE ReaderWriterQueue {
81 // Design: Based on a queue-of-queues. The low-level queues are just
82 // circular buffers with front and tail indices indicating where the
83 // next element to dequeue is and where the next element can be enqueued,
84 // respectively. Each low-level queue is called a "block". Each block
85 // wastes exactly one element's worth of space to keep the design simple
86 // (if front == tail then the queue is empty, and can't be full).
87 // The high-level queue is a circular linked list of blocks; again there
88 // is a front and tail, but this time they are pointers to the blocks.
89 // The front block is where the next element to be dequeued is, provided
90 // the block is not empty. The back block is where elements are to be
91 // enqueued, provided the block is not full.
92 // The producer thread owns all the tail indices/pointers. The consumer
93 // thread owns all the front indices/pointers. Both threads read each
94 // other's variables, but only the owning thread updates them. E.g. After
95 // the consumer reads the producer's tail, the tail may change before the
96 // consumer is done dequeuing an object, but the consumer knows the tail
97 // will never go backwards, only forwards.
98 // If there is no room to enqueue an object, an additional block (of
99 // equal size to the last block) is added. Blocks are never removed.
100
101public:
102 typedef T value_type;
103
104 // Constructs a queue that can hold at least `size` elements without further
105 // allocations. If more than MAX_BLOCK_SIZE elements are requested,
106 // then several blocks of MAX_BLOCK_SIZE each are reserved (including
107 // at least one extra buffer block).
108 AE_NO_TSAN explicit ReaderWriterQueue(size_t size = 15)
109#ifndef NDEBUG
110 : enqueuing(false), dequeuing(false)
111#endif
112 {
113 assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) &&
114 "MAX_BLOCK_SIZE must be a power of 2");
115 assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
116
117 Block* firstBlock = nullptr;
118
119 largestBlockSize =
120 ceilToPow2(size + 1); // We need a spare slot to fit size elements in the block
121 if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
122 // We need a spare block in case the producer is writing to a different block the
123 // consumer is reading from, and wants to enqueue the maximum number of elements. We
124 // also need a spare element in each block to avoid the ambiguity between front == tail
125 // meaning "empty" and "full". So the effective number of slots that are guaranteed to
126 // be usable at any time is the block size - 1 times the number of blocks - 1. Solving
127 // for size and applying a ceiling to the division gives us (after simplifying):
128 size_t initialBlockCount = (size + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
129 largestBlockSize = MAX_BLOCK_SIZE;
130 Block* lastBlock = nullptr;
131 for (size_t i = 0; i != initialBlockCount; ++i) {
132 auto block = make_block(largestBlockSize);
133 if (block == nullptr) {
134#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
135 throw std::bad_alloc();
136#else
137 abort();
138#endif
139 }
140 if (firstBlock == nullptr) {
141 firstBlock = block;
142 } else {
143 lastBlock->next = block;
144 }
145 lastBlock = block;
146 block->next = firstBlock;
147 }
148 } else {
149 firstBlock = make_block(largestBlockSize);
150 if (firstBlock == nullptr) {
151#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
152 throw std::bad_alloc();
153#else
154 abort();
155#endif
156 }
157 firstBlock->next = firstBlock;
158 }
159 frontBlock = firstBlock;
160 tailBlock = firstBlock;
161
162 // Make sure the reader/writer threads will have the initialized memory setup above:
163 fence(memory_order_sync);
164 }
165
166 // Note: The queue should not be accessed concurrently while it's
167 // being moved. It's up to the user to synchronize this.
168 AE_NO_TSAN ReaderWriterQueue(ReaderWriterQueue&& other)
169 : frontBlock(other.frontBlock.load()), tailBlock(other.tailBlock.load()),
170 largestBlockSize(other.largestBlockSize)
171#ifndef NDEBUG
172 ,
173 enqueuing(false), dequeuing(false)
174#endif
175 {
176 other.largestBlockSize = 32;
177 Block* b = other.make_block(other.largestBlockSize);
178 if (b == nullptr) {
179#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
180 throw std::bad_alloc();
181#else
182 abort();
183#endif
184 }
185 b->next = b;
186 other.frontBlock = b;
187 other.tailBlock = b;
188 }
189
190 // Note: The queue should not be accessed concurrently while it's
191 // being moved. It's up to the user to synchronize this.
192 ReaderWriterQueue& operator=(ReaderWriterQueue&& other) AE_NO_TSAN {
193 Block* b = frontBlock.load();
194 frontBlock = other.frontBlock.load();
195 other.frontBlock = b;
196 b = tailBlock.load();
197 tailBlock = other.tailBlock.load();
198 other.tailBlock = b;
199 std::swap(largestBlockSize, other.largestBlockSize);
200 return *this;
201 }
202
203 // Note: The queue should not be accessed concurrently while it's
204 // being deleted. It's up to the user to synchronize this.
205 AE_NO_TSAN ~ReaderWriterQueue() {
206 // Make sure we get the latest version of all variables from other CPUs:
207 fence(memory_order_sync);
208
209 // Destroy any remaining objects in queue and free memory
210 Block* frontBlock_ = frontBlock;
211 Block* block = frontBlock_;
212 do {
213 Block* nextBlock = block->next;
214 size_t blockFront = block->front;
215 size_t blockTail = block->tail;
216
217 for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
218 auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
219 element->~T();
220 (void)element;
221 }
222
223 auto rawBlock = block->rawThis;
224 block->~Block();
225 std::free(rawBlock);
226 block = nextBlock;
227 } while (block != frontBlock_);
228 }
229
230 // Enqueues a copy of element if there is room in the queue.
231 // Returns true if the element was enqueued, false otherwise.
232 // Does not allocate memory.
233 AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN {
234 return inner_enqueue<CannotAlloc>(element);
235 }
236
237 // Enqueues a moved copy of element if there is room in the queue.
238 // Returns true if the element was enqueued, false otherwise.
239 // Does not allocate memory.
240 AE_FORCEINLINE bool try_enqueue(T&& element) AE_NO_TSAN {
241 return inner_enqueue<CannotAlloc>(std::forward<T>(element));
242 }
243
244#if MOODYCAMEL_HAS_EMPLACE
245 // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
246 template <typename... Args>
247 AE_FORCEINLINE bool try_emplace(Args&&... args) AE_NO_TSAN {
248 return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
249 }
250#endif
251
252 // Enqueues a copy of element on the queue.
253 // Allocates an additional block of memory if needed.
254 // Only fails (returns false) if memory allocation fails.
255 AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN {
256 return inner_enqueue<CanAlloc>(element);
257 }
258
259 // Enqueues a moved copy of element on the queue.
260 // Allocates an additional block of memory if needed.
261 // Only fails (returns false) if memory allocation fails.
262 AE_FORCEINLINE bool enqueue(T&& element) AE_NO_TSAN {
263 return inner_enqueue<CanAlloc>(std::forward<T>(element));
264 }
265
266#if MOODYCAMEL_HAS_EMPLACE
267 // Like enqueue() but with emplace semantics (i.e. construct-in-place).
268 template <typename... Args>
269 AE_FORCEINLINE bool emplace(Args&&... args) AE_NO_TSAN {
270 return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
271 }
272#endif
273
274 // Attempts to dequeue an element; if the queue is empty,
275 // returns false instead. If the queue has at least one element,
276 // moves front to result using operator=, then returns true.
277 template <typename U>
278 bool try_dequeue(U& result) AE_NO_TSAN {
279#ifndef NDEBUG
280 ReentrantGuard guard(this->dequeuing);
281#endif
282
283 // High-level pseudocode:
284 // Remember where the tail block is
285 // If the front block has an element in it, dequeue it
286 // Else
287 // If front block was the tail block when we entered the function, return false
288 // Else advance to next block and dequeue the item there
289
290 // Note that we have to use the value of the tail block from before we check if the front
291 // block is full or not, in case the front block is empty and then, before we check if the
292 // tail block is at the front block or not, the producer fills up the front block *and
293 // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
294 // reproducible in practice.
295 // In order to avoid overhead in the common case, though, we do a double-checked pattern
296 // where we have the fast path if the front block is not empty, then read the tail block,
297 // then re-read the front block and check if it's not empty again, then check if the tail
298 // block has advanced.
299
300 Block* frontBlock_ = frontBlock.load();
301 size_t blockTail = frontBlock_->localTail;
302 size_t blockFront = frontBlock_->front.load();
303
304 if (blockFront != blockTail ||
305 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
306 fence(memory_order_acquire);
307
308 non_empty_front_block:
309 // Front block not empty, dequeue from here
310 auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
311 result = std::move(*element);
312 element->~T();
313
314 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
315
316 fence(memory_order_release);
317 frontBlock_->front = blockFront;
318 } else if (frontBlock_ != tailBlock.load()) {
319 fence(memory_order_acquire);
320
321 frontBlock_ = frontBlock.load();
322 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
323 blockFront = frontBlock_->front.load();
324 fence(memory_order_acquire);
325
326 if (blockFront != blockTail) {
327 // Oh look, the front block isn't empty after all
328 goto non_empty_front_block;
329 }
330
331 // Front block is empty but there's another block ahead, advance to it
332 Block* nextBlock = frontBlock_->next;
333 // Don't need an acquire fence here since next can only ever be set on the tailBlock,
334 // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock
335 // which ensures next is up-to-date on this CPU in case we recently were at tailBlock.
336
337 size_t nextBlockFront = nextBlock->front.load();
338 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
339 fence(memory_order_acquire);
340
341 // Since the tailBlock is only ever advanced after being written to,
342 // we know there's for sure an element to dequeue on it
343 assert(nextBlockFront != nextBlockTail);
344 AE_UNUSED(nextBlockTail);
345
346 // We're done with this block, let the producer use it if it needs
347 fence(memory_order_release); // Expose possibly pending changes to frontBlock->front
348 // from last dequeue
349 frontBlock = frontBlock_ = nextBlock;
350
351 compiler_fence(memory_order_release); // Not strictly needed
352
353 auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
354
355 result = std::move(*element);
356 element->~T();
357
358 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
359
360 fence(memory_order_release);
361 frontBlock_->front = nextBlockFront;
362 } else {
363 // No elements in current block and no other block to advance to
364 return false;
365 }
366
367 return true;
368 }
369
370 // Returns a pointer to the front element in the queue (the one that
371 // would be removed next by a call to `try_dequeue` or `pop`). If the
372 // queue appears empty at the time the method is called, nullptr is
373 // returned instead.
374 // Must be called only from the consumer thread.
375 T* peek() const AE_NO_TSAN {
376#ifndef NDEBUG
377 ReentrantGuard guard(this->dequeuing);
378#endif
379 // See try_dequeue() for reasoning
380
381 Block* frontBlock_ = frontBlock.load();
382 size_t blockTail = frontBlock_->localTail;
383 size_t blockFront = frontBlock_->front.load();
384
385 if (blockFront != blockTail ||
386 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
387 fence(memory_order_acquire);
388 non_empty_front_block:
389 return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
390 } else if (frontBlock_ != tailBlock.load()) {
391 fence(memory_order_acquire);
392 frontBlock_ = frontBlock.load();
393 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
394 blockFront = frontBlock_->front.load();
395 fence(memory_order_acquire);
396
397 if (blockFront != blockTail) {
398 goto non_empty_front_block;
399 }
400
401 Block* nextBlock = frontBlock_->next;
402
403 size_t nextBlockFront = nextBlock->front.load();
404 fence(memory_order_acquire);
405
406 assert(nextBlockFront != nextBlock->tail.load());
407 return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
408 }
409
410 return nullptr;
411 }
412
413 // Removes the front element from the queue, if any, without returning it.
414 // Returns true on success, or false if the queue appeared empty at the time
415 // `pop` was called.
416 bool pop() AE_NO_TSAN {
417#ifndef NDEBUG
418 ReentrantGuard guard(this->dequeuing);
419#endif
420 // See try_dequeue() for reasoning
421
422 Block* frontBlock_ = frontBlock.load();
423 size_t blockTail = frontBlock_->localTail;
424 size_t blockFront = frontBlock_->front.load();
425
426 if (blockFront != blockTail ||
427 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
428 fence(memory_order_acquire);
429
430 non_empty_front_block:
431 auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
432 element->~T();
433
434 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
435
436 fence(memory_order_release);
437 frontBlock_->front = blockFront;
438 } else if (frontBlock_ != tailBlock.load()) {
439 fence(memory_order_acquire);
440 frontBlock_ = frontBlock.load();
441 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
442 blockFront = frontBlock_->front.load();
443 fence(memory_order_acquire);
444
445 if (blockFront != blockTail) {
446 goto non_empty_front_block;
447 }
448
449 // Front block is empty but there's another block ahead, advance to it
450 Block* nextBlock = frontBlock_->next;
451
452 size_t nextBlockFront = nextBlock->front.load();
453 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
454 fence(memory_order_acquire);
455
456 assert(nextBlockFront != nextBlockTail);
457 AE_UNUSED(nextBlockTail);
458
459 fence(memory_order_release);
460 frontBlock = frontBlock_ = nextBlock;
461
462 compiler_fence(memory_order_release);
463
464 auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
465 element->~T();
466
467 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
468
469 fence(memory_order_release);
470 frontBlock_->front = nextBlockFront;
471 } else {
472 // No elements in current block and no other block to advance to
473 return false;
474 }
475
476 return true;
477 }
478
479 // Returns the approximate number of items currently in the queue.
480 // Safe to call from both the producer and consumer threads.
481 inline size_t size_approx() const AE_NO_TSAN {
482 size_t result = 0;
483 Block* frontBlock_ = frontBlock.load();
484 Block* block = frontBlock_;
485 do {
486 fence(memory_order_acquire);
487 size_t blockFront = block->front.load();
488 size_t blockTail = block->tail.load();
489 result += (blockTail - blockFront) & block->sizeMask;
490 block = block->next.load();
491 } while (block != frontBlock_);
492 return result;
493 }
494
495 // Returns the total number of items that could be enqueued without incurring
496 // an allocation when this queue is empty.
497 // Safe to call from both the producer and consumer threads.
498 //
499 // NOTE: The actual capacity during usage may be different depending on the consumer.
500 // If the consumer is removing elements concurrently, the producer cannot add to
501 // the block the consumer is removing from until it's completely empty, except in
502 // the case where the producer was writing to the same block the consumer was
503 // reading from the whole time.
504 inline size_t max_capacity() const {
505 size_t result = 0;
506 Block* frontBlock_ = frontBlock.load();
507 Block* block = frontBlock_;
508 do {
509 fence(memory_order_acquire);
510 result += block->sizeMask;
511 block = block->next.load();
512 } while (block != frontBlock_);
513 return result;
514 }
515
516private:
517 enum AllocationMode { CanAlloc, CannotAlloc };
518
519#if MOODYCAMEL_HAS_EMPLACE
520 template <AllocationMode canAlloc, typename... Args>
521 bool inner_enqueue(Args&&... args) AE_NO_TSAN
522#else
523 template <AllocationMode canAlloc, typename U>
524 bool inner_enqueue(U&& element) AE_NO_TSAN
525#endif
526 {
527#ifndef NDEBUG
528 ReentrantGuard guard(this->enqueuing);
529#endif
530
531 // High-level pseudocode (assuming we're allowed to alloc a new block):
532 // If room in tail block, add to tail
533 // Else check next block
534 // If next block is not the head block, enqueue on next block
535 // Else create a new block and enqueue there
536 // Advance tail to the block we just enqueued to
537
538 Block* tailBlock_ = tailBlock.load();
539 size_t blockFront = tailBlock_->localFront;
540 size_t blockTail = tailBlock_->tail.load();
541
542 size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
543 if (nextBlockTail != blockFront ||
544 nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
545 fence(memory_order_acquire);
546 // This block has room for at least one more element
547 char* location = tailBlock_->data + blockTail * sizeof(T);
548#if MOODYCAMEL_HAS_EMPLACE
549 new (location) T(std::forward<Args>(args)...);
550#else
551 new (location) T(std::forward<U>(element));
552#endif
553
554 fence(memory_order_release);
555 tailBlock_->tail = nextBlockTail;
556 } else {
557 fence(memory_order_acquire);
558 if (tailBlock_->next.load() != frontBlock) {
559 // Note that the reason we can't advance to the frontBlock and start adding new
560 // entries there is because if we did, then dequeue would stay in that block,
561 // eventually reading the new values, instead of advancing to the next full block
562 // (whose values were enqueued first and so should be consumed first).
563
564 fence(memory_order_acquire); // Ensure we get latest writes if we got the latest
565 // frontBlock
566
567 // tailBlock is full, but there's a free block ahead, use it
568 Block* tailBlockNext = tailBlock_->next.load();
569 size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
570 nextBlockTail = tailBlockNext->tail.load();
571 fence(memory_order_acquire);
572
573 // This block must be empty since it's not the head block and we
574 // go through the blocks in a circle
575 assert(nextBlockFront == nextBlockTail);
576 tailBlockNext->localFront = nextBlockFront;
577
578 char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
579#if MOODYCAMEL_HAS_EMPLACE
580 new (location) T(std::forward<Args>(args)...);
581#else
582 new (location) T(std::forward<U>(element));
583#endif
584
585 tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
586
587 fence(memory_order_release);
588 tailBlock = tailBlockNext;
589 } else if (canAlloc == CanAlloc) {
590 // tailBlock is full and there's no free block ahead; create a new block
591 auto newBlockSize =
592 largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
593 auto newBlock = make_block(newBlockSize);
594 if (newBlock == nullptr) {
595 // Could not allocate a block!
596 return false;
597 }
598 largestBlockSize = newBlockSize;
599
600#if MOODYCAMEL_HAS_EMPLACE
601 new (newBlock->data) T(std::forward<Args>(args)...);
602#else
603 new (newBlock->data) T(std::forward<U>(element));
604#endif
605 assert(newBlock->front == 0);
606 newBlock->tail = newBlock->localTail = 1;
607
608 newBlock->next = tailBlock_->next.load();
609 tailBlock_->next = newBlock;
610
611 // Might be possible for the dequeue thread to see the new tailBlock->next
612 // *without* seeing the new tailBlock value, but this is OK since it can't
613 // advance to the next block until tailBlock is set anyway (because the only
614 // case where it could try to read the next is if it's already at the tailBlock,
615 // and it won't advance past tailBlock in any circumstance).
616
617 fence(memory_order_release);
618 tailBlock = newBlock;
619 } else if (canAlloc == CannotAlloc) {
620 // Would have had to allocate a new block to enqueue, but not allowed
621 return false;
622 } else {
623 assert(false && "Should be unreachable code");
624 return false;
625 }
626 }
627
628 return true;
629 }
630
631 // Disable copying
632 ReaderWriterQueue(ReaderWriterQueue const&) {}
633
634 // Disable assignment
635 ReaderWriterQueue& operator=(ReaderWriterQueue const&) {}
636
637 AE_FORCEINLINE static size_t ceilToPow2(size_t x) {
638 // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
639 --x;
640 x |= x >> 1;
641 x |= x >> 2;
642 x |= x >> 4;
643 for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
644 x |= x >> (i << 3);
645 }
646 ++x;
647 return x;
648 }
649
650 template <typename U>
651 static AE_FORCEINLINE char* align_for(char* ptr) AE_NO_TSAN {
652 const std::size_t alignment = std::alignment_of<U>::value;
653 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
654 }
655
656private:
657#ifndef NDEBUG
658 struct ReentrantGuard {
659 AE_NO_TSAN ReentrantGuard(weak_atomic<bool>& _inSection) : inSection(_inSection) {
660 assert(!inSection &&
661 "Concurrent (or re-entrant) enqueue or dequeue operation detected (only one "
662 "thread at a time may hold the producer or consumer role)");
663 inSection = true;
664 }
665
666 AE_NO_TSAN ~ReentrantGuard() {
667 inSection = false;
668 }
669
670 private:
671 ReentrantGuard& operator=(ReentrantGuard const&);
672
673 private:
674 weak_atomic<bool>& inSection;
675 };
676#endif
677
678 struct Block {
679 // Avoid false-sharing by putting highly contended variables on their own cache lines
680 weak_atomic<size_t> front; // (Atomic) Elements are read from here
681 size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
682
683 char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) -
684 sizeof(size_t)];
685 weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
686 size_t localFront;
687
688 char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) -
689 sizeof(size_t)]; // next isn't very contended, but we don't want it on
690 // the same cache line as tail (which is)
691 weak_atomic<Block*> next; // (Atomic)
692
693 char* data; // Contents (on heap) are aligned to T's alignment
694
695 const size_t sizeMask;
696
697 // size must be a power of two (and greater than 0)
698 AE_NO_TSAN Block(size_t const& _size, char* _rawThis, char* _data)
699 : front(0UL), localTail(0), tail(0UL), localFront(0), next(nullptr), data(_data),
700 sizeMask(_size - 1), rawThis(_rawThis) {}
701
702 private:
703 // C4512 - Assignment operator could not be generated
704 Block& operator=(Block const&);
705
706 public:
707 char* rawThis;
708 };
709
710 static Block* make_block(size_t capacity) AE_NO_TSAN {
711 // Allocate enough memory for the block itself, as well as all the elements it will contain
712 auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
713 size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
714 auto newBlockRaw = static_cast<char*>(std::malloc(size));
715 if (newBlockRaw == nullptr) {
716 return nullptr;
717 }
718
719 auto newBlockAligned = align_for<Block>(newBlockRaw);
720 auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
721 return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
722 }
723
724private:
725 weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block
726
727 char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
728 weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block
729
730 size_t largestBlockSize;
731
732#ifndef NDEBUG
733 weak_atomic<bool> enqueuing;
734 mutable weak_atomic<bool> dequeuing;
735#endif
736};
737
738// Like ReaderWriterQueue, but also providees blocking operations
739template <typename T, size_t MAX_BLOCK_SIZE = 512>
740class BlockingReaderWriterQueue {
741private:
742 typedef ::Common::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
743
744public:
745 explicit BlockingReaderWriterQueue(size_t size = 15) AE_NO_TSAN
746 : inner(size),
747 sema(new spsc_sema::LightweightSemaphore()) {}
748
749 BlockingReaderWriterQueue(BlockingReaderWriterQueue&& other) AE_NO_TSAN
750 : inner(std::move(other.inner)),
751 sema(std::move(other.sema)) {}
752
753 BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue&& other) AE_NO_TSAN {
754 std::swap(sema, other.sema);
755 std::swap(inner, other.inner);
756 return *this;
757 }
758
759 // Enqueues a copy of element if there is room in the queue.
760 // Returns true if the element was enqueued, false otherwise.
761 // Does not allocate memory.
762 AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN {
763 if (inner.try_enqueue(element)) {
764 sema->signal();
765 return true;
766 }
767 return false;
768 }
769
770 // Enqueues a moved copy of element if there is room in the queue.
771 // Returns true if the element was enqueued, false otherwise.
772 // Does not allocate memory.
773 AE_FORCEINLINE bool try_enqueue(T&& element) AE_NO_TSAN {
774 if (inner.try_enqueue(std::forward<T>(element))) {
775 sema->signal();
776 return true;
777 }
778 return false;
779 }
780
781#if MOODYCAMEL_HAS_EMPLACE
782 // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
783 template <typename... Args>
784 AE_FORCEINLINE bool try_emplace(Args&&... args) AE_NO_TSAN {
785 if (inner.try_emplace(std::forward<Args>(args)...)) {
786 sema->signal();
787 return true;
788 }
789 return false;
790 }
791#endif
792
793 // Enqueues a copy of element on the queue.
794 // Allocates an additional block of memory if needed.
795 // Only fails (returns false) if memory allocation fails.
796 AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN {
797 if (inner.enqueue(element)) {
798 sema->signal();
799 return true;
800 }
801 return false;
802 }
803
804 // Enqueues a moved copy of element on the queue.
805 // Allocates an additional block of memory if needed.
806 // Only fails (returns false) if memory allocation fails.
807 AE_FORCEINLINE bool enqueue(T&& element) AE_NO_TSAN {
808 if (inner.enqueue(std::forward<T>(element))) {
809 sema->signal();
810 return true;
811 }
812 return false;
813 }
814
815#if MOODYCAMEL_HAS_EMPLACE
816 // Like enqueue() but with emplace semantics (i.e. construct-in-place).
817 template <typename... Args>
818 AE_FORCEINLINE bool emplace(Args&&... args) AE_NO_TSAN {
819 if (inner.emplace(std::forward<Args>(args)...)) {
820 sema->signal();
821 return true;
822 }
823 return false;
824 }
825#endif
826
827 // Attempts to dequeue an element; if the queue is empty,
828 // returns false instead. If the queue has at least one element,
829 // moves front to result using operator=, then returns true.
830 template <typename U>
831 bool try_dequeue(U& result) AE_NO_TSAN {
832 if (sema->tryWait()) {
833 bool success = inner.try_dequeue(result);
834 assert(success);
835 AE_UNUSED(success);
836 return true;
837 }
838 return false;
839 }
840
841 // Attempts to dequeue an element; if the queue is empty,
842 // waits until an element is available, then dequeues it.
843 template <typename U>
844 void wait_dequeue(U& result) AE_NO_TSAN {
845 while (!sema->wait())
846 ;
847 bool success = inner.try_dequeue(result);
848 AE_UNUSED(result);
849 assert(success);
850 AE_UNUSED(success);
851 }
852
853 // Attempts to dequeue an element; if the queue is empty,
854 // waits until an element is available up to the specified timeout,
855 // then dequeues it and returns true, or returns false if the timeout
856 // expires before an element can be dequeued.
857 // Using a negative timeout indicates an indefinite timeout,
858 // and is thus functionally equivalent to calling wait_dequeue.
859 template <typename U>
860 bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) AE_NO_TSAN {
861 if (!sema->wait(timeout_usecs)) {
862 return false;
863 }
864 bool success = inner.try_dequeue(result);
865 AE_UNUSED(result);
866 assert(success);
867 AE_UNUSED(success);
868 return true;
869 }
870
871#if __cplusplus > 199711L || _MSC_VER >= 1700
872 // Attempts to dequeue an element; if the queue is empty,
873 // waits until an element is available up to the specified timeout,
874 // then dequeues it and returns true, or returns false if the timeout
875 // expires before an element can be dequeued.
876 // Using a negative timeout indicates an indefinite timeout,
877 // and is thus functionally equivalent to calling wait_dequeue.
878 template <typename U, typename Rep, typename Period>
879 inline bool wait_dequeue_timed(U& result,
880 std::chrono::duration<Rep, Period> const& timeout) AE_NO_TSAN {
881 return wait_dequeue_timed(
882 result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
883 }
884#endif
885
886 // Returns a pointer to the front element in the queue (the one that
887 // would be removed next by a call to `try_dequeue` or `pop`). If the
888 // queue appears empty at the time the method is called, nullptr is
889 // returned instead.
890 // Must be called only from the consumer thread.
891 AE_FORCEINLINE T* peek() const AE_NO_TSAN {
892 return inner.peek();
893 }
894
895 // Removes the front element from the queue, if any, without returning it.
896 // Returns true on success, or false if the queue appeared empty at the time
897 // `pop` was called.
898 AE_FORCEINLINE bool pop() AE_NO_TSAN {
899 if (sema->tryWait()) {
900 bool result = inner.pop();
901 assert(result);
902 AE_UNUSED(result);
903 return true;
904 }
905 return false;
906 }
907
908 // Returns the approximate number of items currently in the queue.
909 // Safe to call from both the producer and consumer threads.
910 AE_FORCEINLINE size_t size_approx() const AE_NO_TSAN {
911 return sema->availableApprox();
912 }
913
914 // Returns the total number of items that could be enqueued without incurring
915 // an allocation when this queue is empty.
916 // Safe to call from both the producer and consumer threads.
917 //
918 // NOTE: The actual capacity during usage may be different depending on the consumer.
919 // If the consumer is removing elements concurrently, the producer cannot add to
920 // the block the consumer is removing from until it's completely empty, except in
921 // the case where the producer was writing to the same block the consumer was
922 // reading from the whole time.
923 AE_FORCEINLINE size_t max_capacity() const {
924 return inner.max_capacity();
925 }
926
927private:
928 // Disable copying & assignment
929 BlockingReaderWriterQueue(BlockingReaderWriterQueue const&) {}
930 BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue const&) {}
931
932private:
933 ReaderWriterQueue inner;
934 std::unique_ptr<spsc_sema::LightweightSemaphore> sema;
935};
936
937} // namespace Common
938
939#ifdef AE_VCPP
940#pragma warning(pop)
941#endif