path: root/src/common/reader_writer_queue.h
Diffstat (limited to 'src/common/reader_writer_queue.h')
-rw-r--r--  src/common/reader_writer_queue.h  940
1 files changed, 940 insertions, 0 deletions
diff --git a/src/common/reader_writer_queue.h b/src/common/reader_writer_queue.h
new file mode 100644
index 000000000..60c41a8cb
--- /dev/null
+++ b/src/common/reader_writer_queue.h
@@ -0,0 +1,940 @@
1// SPDX-FileCopyrightText: 2013-2020 Cameron Desrochers
2// SPDX-License-Identifier: BSD-2-Clause
3
4#pragma once
5
6#include <cassert>
7#include <cstdint>
8#include <cstdlib> // For malloc/free/abort & size_t
9#include <memory>
10#include <new>
11#include <stdexcept>
12#include <type_traits>
13#include <utility>
14
15#include "common/atomic_helpers.h"
16
17#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
18#include <chrono>
19#endif
20
21// A lock-free queue for a single-consumer, single-producer architecture.
22// The queue is also wait-free in the common path (except if more memory
23// needs to be allocated, in which case malloc is called).
24// Allocates memory sparingly, and only once if the original maximum size
25// estimate is never exceeded.
26// Tested on x86/x64 processors, but semantics should be correct for all
27// architectures (given the right implementations in atomic_helpers.h), provided
28// that aligned integer and pointer accesses are naturally atomic.
29// Note that there should only be one consumer thread and one producer thread;
30// switching roles of the threads, or using multiple consecutive threads for
31// one role, is not safe unless properly synchronized.
32// Using the queue exclusively from one thread is fine, though a bit silly.
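// A minimal usage sketch (illustrative only; assumes one dedicated producer thread and one
// dedicated consumer thread):
//
//     Common::ReaderWriterQueue<int> q(100); // room for at least 100 ints before any malloc
//     q.enqueue(17);                         // producer side; may allocate a new block
//     q.try_enqueue(18);                     // producer side; never allocates
//     int item;
//     if (q.try_dequeue(item)) {             // consumer side; item == 17 on success
//         // use item
//     }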
33
34#ifndef MOODYCAMEL_CACHE_LINE_SIZE
35#define MOODYCAMEL_CACHE_LINE_SIZE 64
36#endif
37
38#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
39#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
40 (!defined(_MSC_VER) && !defined(__GNUC__))
41#define MOODYCAMEL_EXCEPTIONS_ENABLED
42#endif
43#endif
44
45#ifndef MOODYCAMEL_HAS_EMPLACE
46#if !defined(_MSC_VER) || \
47 _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
48#define MOODYCAMEL_HAS_EMPLACE 1
49#endif
50#endif
51
52#ifndef MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
53#if defined(__APPLE__) && defined(__MACH__) && __cplusplus >= 201703L
54// This is required to find out what deployment target we are using
55#include <CoreFoundation/CoreFoundation.h>
56#if !defined(MAC_OS_X_VERSION_MIN_REQUIRED) || \
57 MAC_OS_X_VERSION_MIN_REQUIRED < MAC_OS_X_VERSION_10_14
58// C++17 new(size_t, align_val_t) is not backwards-compatible with older versions of macOS, so we
59// can't support over-alignment in this case
60#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
61#endif
62#endif
63#endif
64
65#ifndef MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE
66#define MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE AE_ALIGN(MOODYCAMEL_CACHE_LINE_SIZE)
67#endif
68
69#ifdef AE_VCPP
70#pragma warning(push)
71#pragma warning(disable : 4324) // structure was padded due to __declspec(align())
72#pragma warning(disable : 4820) // padding was added
73#pragma warning(disable : 4127) // conditional expression is constant
74#endif
75
76namespace Common {
77
78template <typename T, size_t MAX_BLOCK_SIZE = 512>
79class MOODYCAMEL_MAYBE_ALIGN_TO_CACHELINE ReaderWriterQueue {
80 // Design: Based on a queue-of-queues. The low-level queues are just
81 // circular buffers with front and tail indices indicating where the
82 // next element to dequeue is and where the next element can be enqueued,
83 // respectively. Each low-level queue is called a "block". Each block
84 // wastes exactly one element's worth of space to keep the design simple
85 // (if front == tail then the queue is empty, and can't be full).
86 // The high-level queue is a circular linked list of blocks; again there
87 // is a front and tail, but this time they are pointers to the blocks.
88 // The front block is where the next element to be dequeued is, provided
89 // the block is not empty. The back block is where elements are to be
90 // enqueued, provided the block is not full.
91 // The producer thread owns all the tail indices/pointers. The consumer
92 // thread owns all the front indices/pointers. Both threads read each
93 // other's variables, but only the owning thread updates them. E.g. After
94 // the consumer reads the producer's tail, the tail may change before the
95 // consumer is done dequeuing an object, but the consumer knows the tail
96 // will never go backwards, only forwards.
97 // If there is no room to enqueue an object, an additional block (of
98 // equal size to the last block) is added. Blocks are never removed.
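// Worked example of the per-block indexing (illustrative numbers): in a block of size 8
// (sizeMask == 7) with front == 6 and tail == 1, the occupied slots are 6, 7 and 0, i.e.
// (tail - front) & sizeMask == (1 - 6) & 7 == 3 elements; at most 7 of the 8 slots can
// ever be occupied at once.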
99
100public:
101 typedef T value_type;
102
103 // Constructs a queue that can hold at least `size` elements without further
104 // allocations. If more than MAX_BLOCK_SIZE elements are requested,
105 // then several blocks of MAX_BLOCK_SIZE each are reserved (including
106 // at least one extra buffer block).
107 AE_NO_TSAN explicit ReaderWriterQueue(size_t size = 15)
108#ifndef NDEBUG
109 : enqueuing(false), dequeuing(false)
110#endif
111 {
112 assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) &&
113 "MAX_BLOCK_SIZE must be a power of 2");
114 assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
115
116 Block* firstBlock = nullptr;
117
118 largestBlockSize =
119 ceilToPow2(size + 1); // We need a spare slot to fit size elements in the block
120 if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
121 // We need a spare block in case the producer is writing to a different block the
122 // consumer is reading from, and wants to enqueue the maximum number of elements. We
123 // also need a spare element in each block to avoid the ambiguity between front == tail
124 // meaning "empty" and "full". So the effective number of slots that are guaranteed to
125 // be usable at any time is (block size - 1) * (number of blocks - 1). Solving
126 // for size and applying a ceiling to the division gives us (after simplifying):
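// For example (hypothetical numbers), with MAX_BLOCK_SIZE == 512 and size == 1200 this
// gives initialBlockCount == (1200 + 1021) / 511 == 4, and four blocks guarantee
// (512 - 1) * (4 - 1) == 1533 usable slots, which covers the requested 1200.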
127 size_t initialBlockCount = (size + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
128 largestBlockSize = MAX_BLOCK_SIZE;
129 Block* lastBlock = nullptr;
130 for (size_t i = 0; i != initialBlockCount; ++i) {
131 auto block = make_block(largestBlockSize);
132 if (block == nullptr) {
133#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
134 throw std::bad_alloc();
135#else
136 abort();
137#endif
138 }
139 if (firstBlock == nullptr) {
140 firstBlock = block;
141 } else {
142 lastBlock->next = block;
143 }
144 lastBlock = block;
145 block->next = firstBlock;
146 }
147 } else {
148 firstBlock = make_block(largestBlockSize);
149 if (firstBlock == nullptr) {
150#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
151 throw std::bad_alloc();
152#else
153 abort();
154#endif
155 }
156 firstBlock->next = firstBlock;
157 }
158 frontBlock = firstBlock;
159 tailBlock = firstBlock;
160
161 // Make sure the reader/writer threads will have the initialized memory setup above:
162 fence(memory_order_sync);
163 }
164
165 // Note: The queue should not be accessed concurrently while it's
166 // being moved. It's up to the user to synchronize this.
167 AE_NO_TSAN ReaderWriterQueue(ReaderWriterQueue&& other)
168 : frontBlock(other.frontBlock.load()), tailBlock(other.tailBlock.load()),
169 largestBlockSize(other.largestBlockSize)
170#ifndef NDEBUG
171 ,
172 enqueuing(false), dequeuing(false)
173#endif
174 {
175 other.largestBlockSize = 32;
176 Block* b = other.make_block(other.largestBlockSize);
177 if (b == nullptr) {
178#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
179 throw std::bad_alloc();
180#else
181 abort();
182#endif
183 }
184 b->next = b;
185 other.frontBlock = b;
186 other.tailBlock = b;
187 }
188
189 // Note: The queue should not be accessed concurrently while it's
190 // being moved. It's up to the user to synchronize this.
191 ReaderWriterQueue& operator=(ReaderWriterQueue&& other) AE_NO_TSAN {
192 Block* b = frontBlock.load();
193 frontBlock = other.frontBlock.load();
194 other.frontBlock = b;
195 b = tailBlock.load();
196 tailBlock = other.tailBlock.load();
197 other.tailBlock = b;
198 std::swap(largestBlockSize, other.largestBlockSize);
199 return *this;
200 }
201
202 // Note: The queue should not be accessed concurrently while it's
203 // being deleted. It's up to the user to synchronize this.
204 AE_NO_TSAN ~ReaderWriterQueue() {
205 // Make sure we get the latest version of all variables from other CPUs:
206 fence(memory_order_sync);
207
208 // Destroy any remaining objects in queue and free memory
209 Block* frontBlock_ = frontBlock;
210 Block* block = frontBlock_;
211 do {
212 Block* nextBlock = block->next;
213 size_t blockFront = block->front;
214 size_t blockTail = block->tail;
215
216 for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
217 auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
218 element->~T();
219 (void)element;
220 }
221
222 auto rawBlock = block->rawThis;
223 block->~Block();
224 std::free(rawBlock);
225 block = nextBlock;
226 } while (block != frontBlock_);
227 }
228
229 // Enqueues a copy of element if there is room in the queue.
230 // Returns true if the element was enqueued, false otherwise.
231 // Does not allocate memory.
232 AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN {
233 return inner_enqueue<CannotAlloc>(element);
234 }
235
236 // Enqueues a moved copy of element if there is room in the queue.
237 // Returns true if the element was enqueued, false otherwise.
238 // Does not allocate memory.
239 AE_FORCEINLINE bool try_enqueue(T&& element) AE_NO_TSAN {
240 return inner_enqueue<CannotAlloc>(std::forward<T>(element));
241 }
242
243#if MOODYCAMEL_HAS_EMPLACE
244 // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
245 template <typename... Args>
246 AE_FORCEINLINE bool try_emplace(Args&&... args) AE_NO_TSAN {
247 return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
248 }
249#endif
250
251 // Enqueues a copy of element on the queue.
252 // Allocates an additional block of memory if needed.
253 // Only fails (returns false) if memory allocation fails.
254 AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN {
255 return inner_enqueue<CanAlloc>(element);
256 }
257
258 // Enqueues a moved copy of element on the queue.
259 // Allocates an additional block of memory if needed.
260 // Only fails (returns false) if memory allocation fails.
261 AE_FORCEINLINE bool enqueue(T&& element) AE_NO_TSAN {
262 return inner_enqueue<CanAlloc>(std::forward<T>(element));
263 }
264
265#if MOODYCAMEL_HAS_EMPLACE
266 // Like enqueue() but with emplace semantics (i.e. construct-in-place).
267 template <typename... Args>
268 AE_FORCEINLINE bool emplace(Args&&... args) AE_NO_TSAN {
269 return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
270 }
271#endif
272
273 // Attempts to dequeue an element; if the queue is empty,
274 // returns false instead. If the queue has at least one element,
275 // moves front to result using operator=, then returns true.
276 template <typename U>
277 bool try_dequeue(U& result) AE_NO_TSAN {
278#ifndef NDEBUG
279 ReentrantGuard guard(this->dequeuing);
280#endif
281
282 // High-level pseudocode:
283 // Remember where the tail block is
284 // If the front block has an element in it, dequeue it
285 // Else
286 // If front block was the tail block when we entered the function, return false
287 // Else advance to next block and dequeue the item there
288
289 // Note that we have to use the value of the tail block from before we check if the front
290 // block is full or not, in case the front block is empty and then, before we check if the
291 // tail block is at the front block or not, the producer fills up the front block *and
292 // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
293 // reproducible in practice.
294 // In order to avoid overhead in the common case, though, we do a double-checked pattern
295 // where we have the fast path if the front block is not empty, then read the tail block,
296 // then re-read the front block and check if it's not empty again, then check if the tail
297 // block has advanced.
298
299 Block* frontBlock_ = frontBlock.load();
300 size_t blockTail = frontBlock_->localTail;
301 size_t blockFront = frontBlock_->front.load();
302
303 if (blockFront != blockTail ||
304 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
305 fence(memory_order_acquire);
306
307 non_empty_front_block:
308 // Front block not empty, dequeue from here
309 auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
310 result = std::move(*element);
311 element->~T();
312
313 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
314
315 fence(memory_order_release);
316 frontBlock_->front = blockFront;
317 } else if (frontBlock_ != tailBlock.load()) {
318 fence(memory_order_acquire);
319
320 frontBlock_ = frontBlock.load();
321 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
322 blockFront = frontBlock_->front.load();
323 fence(memory_order_acquire);
324
325 if (blockFront != blockTail) {
326 // Oh look, the front block isn't empty after all
327 goto non_empty_front_block;
328 }
329
330 // Front block is empty but there's another block ahead, advance to it
331 Block* nextBlock = frontBlock_->next;
332 // Don't need an acquire fence here since next can only ever be set on the tailBlock,
333 // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock
334 // which ensures next is up-to-date on this CPU in case we recently were at tailBlock.
335
336 size_t nextBlockFront = nextBlock->front.load();
337 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
338 fence(memory_order_acquire);
339
340 // Since the tailBlock is only ever advanced after being written to,
341 // we know there's for sure an element to dequeue on it
342 assert(nextBlockFront != nextBlockTail);
343 AE_UNUSED(nextBlockTail);
344
345 // We're done with this block, let the producer use it if it needs to
346 fence(memory_order_release); // Expose possibly pending changes to frontBlock->front
347 // from last dequeue
348 frontBlock = frontBlock_ = nextBlock;
349
350 compiler_fence(memory_order_release); // Not strictly needed
351
352 auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
353
354 result = std::move(*element);
355 element->~T();
356
357 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
358
359 fence(memory_order_release);
360 frontBlock_->front = nextBlockFront;
361 } else {
362 // No elements in current block and no other block to advance to
363 return false;
364 }
365
366 return true;
367 }
368
369 // Returns a pointer to the front element in the queue (the one that
370 // would be removed next by a call to `try_dequeue` or `pop`). If the
371 // queue appears empty at the time the method is called, nullptr is
372 // returned instead.
373 // Must be called only from the consumer thread.
374 T* peek() const AE_NO_TSAN {
375#ifndef NDEBUG
376 ReentrantGuard guard(this->dequeuing);
377#endif
378 // See try_dequeue() for reasoning
379
380 Block* frontBlock_ = frontBlock.load();
381 size_t blockTail = frontBlock_->localTail;
382 size_t blockFront = frontBlock_->front.load();
383
384 if (blockFront != blockTail ||
385 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
386 fence(memory_order_acquire);
387 non_empty_front_block:
388 return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
389 } else if (frontBlock_ != tailBlock.load()) {
390 fence(memory_order_acquire);
391 frontBlock_ = frontBlock.load();
392 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
393 blockFront = frontBlock_->front.load();
394 fence(memory_order_acquire);
395
396 if (blockFront != blockTail) {
397 goto non_empty_front_block;
398 }
399
400 Block* nextBlock = frontBlock_->next;
401
402 size_t nextBlockFront = nextBlock->front.load();
403 fence(memory_order_acquire);
404
405 assert(nextBlockFront != nextBlock->tail.load());
406 return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
407 }
408
409 return nullptr;
410 }
411
412 // Removes the front element from the queue, if any, without returning it.
413 // Returns true on success, or false if the queue appeared empty at the time
414 // `pop` was called.
415 bool pop() AE_NO_TSAN {
416#ifndef NDEBUG
417 ReentrantGuard guard(this->dequeuing);
418#endif
419 // See try_dequeue() for reasoning
420
421 Block* frontBlock_ = frontBlock.load();
422 size_t blockTail = frontBlock_->localTail;
423 size_t blockFront = frontBlock_->front.load();
424
425 if (blockFront != blockTail ||
426 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
427 fence(memory_order_acquire);
428
429 non_empty_front_block:
430 auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
431 element->~T();
432
433 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
434
435 fence(memory_order_release);
436 frontBlock_->front = blockFront;
437 } else if (frontBlock_ != tailBlock.load()) {
438 fence(memory_order_acquire);
439 frontBlock_ = frontBlock.load();
440 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
441 blockFront = frontBlock_->front.load();
442 fence(memory_order_acquire);
443
444 if (blockFront != blockTail) {
445 goto non_empty_front_block;
446 }
447
448 // Front block is empty but there's another block ahead, advance to it
449 Block* nextBlock = frontBlock_->next;
450
451 size_t nextBlockFront = nextBlock->front.load();
452 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
453 fence(memory_order_acquire);
454
455 assert(nextBlockFront != nextBlockTail);
456 AE_UNUSED(nextBlockTail);
457
458 fence(memory_order_release);
459 frontBlock = frontBlock_ = nextBlock;
460
461 compiler_fence(memory_order_release);
462
463 auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
464 element->~T();
465
466 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
467
468 fence(memory_order_release);
469 frontBlock_->front = nextBlockFront;
470 } else {
471 // No elements in current block and no other block to advance to
472 return false;
473 }
474
475 return true;
476 }
477
478 // Returns the approximate number of items currently in the queue.
479 // Safe to call from both the producer and consumer threads.
480 inline size_t size_approx() const AE_NO_TSAN {
481 size_t result = 0;
482 Block* frontBlock_ = frontBlock.load();
483 Block* block = frontBlock_;
484 do {
485 fence(memory_order_acquire);
486 size_t blockFront = block->front.load();
487 size_t blockTail = block->tail.load();
488 result += (blockTail - blockFront) & block->sizeMask;
489 block = block->next.load();
490 } while (block != frontBlock_);
491 return result;
492 }
493
494 // Returns the total number of items that could be enqueued without incurring
495 // an allocation when this queue is empty.
496 // Safe to call from both the producer and consumer threads.
497 //
498 // NOTE: The actual capacity during usage may be different depending on the consumer.
499 // If the consumer is removing elements concurrently, the producer cannot add to
500 // the block the consumer is removing from until it's completely empty, except in
501 // the case where the producer was writing to the same block the consumer was
502 // reading from the whole time.
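// For example, a queue constructed with the default size of 15 gets a single block of
// 16 slots (sizeMask == 15), so max_capacity() returns 15; one slot per block is always
// kept empty to distinguish "full" from "empty".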
503 inline size_t max_capacity() const {
504 size_t result = 0;
505 Block* frontBlock_ = frontBlock.load();
506 Block* block = frontBlock_;
507 do {
508 fence(memory_order_acquire);
509 result += block->sizeMask;
510 block = block->next.load();
511 } while (block != frontBlock_);
512 return result;
513 }
514
515private:
516 enum AllocationMode { CanAlloc, CannotAlloc };
517
518#if MOODYCAMEL_HAS_EMPLACE
519 template <AllocationMode canAlloc, typename... Args>
520 bool inner_enqueue(Args&&... args) AE_NO_TSAN
521#else
522 template <AllocationMode canAlloc, typename U>
523 bool inner_enqueue(U&& element) AE_NO_TSAN
524#endif
525 {
526#ifndef NDEBUG
527 ReentrantGuard guard(this->enqueuing);
528#endif
529
530 // High-level pseudocode (assuming we're allowed to alloc a new block):
531 // If room in tail block, add to tail
532 // Else check next block
533 // If next block is not the head block, enqueue on next block
534 // Else create a new block and enqueue there
535 // Advance tail to the block we just enqueued to
536
537 Block* tailBlock_ = tailBlock.load();
538 size_t blockFront = tailBlock_->localFront;
539 size_t blockTail = tailBlock_->tail.load();
540
541 size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
542 if (nextBlockTail != blockFront ||
543 nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
544 fence(memory_order_acquire);
545 // This block has room for at least one more element
546 char* location = tailBlock_->data + blockTail * sizeof(T);
547#if MOODYCAMEL_HAS_EMPLACE
548 new (location) T(std::forward<Args>(args)...);
549#else
550 new (location) T(std::forward<U>(element));
551#endif
552
553 fence(memory_order_release);
554 tailBlock_->tail = nextBlockTail;
555 } else {
556 fence(memory_order_acquire);
557 if (tailBlock_->next.load() != frontBlock) {
558 // Note that the reason we can't advance to the frontBlock and start adding new
559 // entries there is because if we did, then dequeue would stay in that block,
560 // eventually reading the new values, instead of advancing to the next full block
561 // (whose values were enqueued first and so should be consumed first).
562
563 fence(memory_order_acquire); // Ensure we get latest writes if we got the latest
564 // frontBlock
565
566 // tailBlock is full, but there's a free block ahead, use it
567 Block* tailBlockNext = tailBlock_->next.load();
568 size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
569 nextBlockTail = tailBlockNext->tail.load();
570 fence(memory_order_acquire);
571
572 // This block must be empty since it's not the head block and we
573 // go through the blocks in a circle
574 assert(nextBlockFront == nextBlockTail);
575 tailBlockNext->localFront = nextBlockFront;
576
577 char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
578#if MOODYCAMEL_HAS_EMPLACE
579 new (location) T(std::forward<Args>(args)...);
580#else
581 new (location) T(std::forward<U>(element));
582#endif
583
584 tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
585
586 fence(memory_order_release);
587 tailBlock = tailBlockNext;
588 } else if (canAlloc == CanAlloc) {
589 // tailBlock is full and there's no free block ahead; create a new block
590 auto newBlockSize =
591 largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
592 auto newBlock = make_block(newBlockSize);
593 if (newBlock == nullptr) {
594 // Could not allocate a block!
595 return false;
596 }
597 largestBlockSize = newBlockSize;
598
599#if MOODYCAMEL_HAS_EMPLACE
600 new (newBlock->data) T(std::forward<Args>(args)...);
601#else
602 new (newBlock->data) T(std::forward<U>(element));
603#endif
604 assert(newBlock->front == 0);
605 newBlock->tail = newBlock->localTail = 1;
606
607 newBlock->next = tailBlock_->next.load();
608 tailBlock_->next = newBlock;
609
610 // Might be possible for the dequeue thread to see the new tailBlock->next
611 // *without* seeing the new tailBlock value, but this is OK since it can't
612 // advance to the next block until tailBlock is set anyway (because the only
613 // case where it could try to read the next is if it's already at the tailBlock,
614 // and it won't advance past tailBlock in any circumstance).
615
616 fence(memory_order_release);
617 tailBlock = newBlock;
618 } else if (canAlloc == CannotAlloc) {
619 // Would have had to allocate a new block to enqueue, but not allowed
620 return false;
621 } else {
622 assert(false && "Should be unreachable code");
623 return false;
624 }
625 }
626
627 return true;
628 }
629
630 // Disable copying
631 ReaderWriterQueue(ReaderWriterQueue const&) {}
632
633 // Disable assignment
634 ReaderWriterQueue& operator=(ReaderWriterQueue const&) {}
635
636 AE_FORCEINLINE static size_t ceilToPow2(size_t x) {
637 // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
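// e.g. ceilToPow2(1) == 1, ceilToPow2(15) == 16, ceilToPow2(16) == 16, ceilToPow2(17) == 32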
638 --x;
639 x |= x >> 1;
640 x |= x >> 2;
641 x |= x >> 4;
642 for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
643 x |= x >> (i << 3);
644 }
645 ++x;
646 return x;
647 }
648
649 template <typename U>
650 static AE_FORCEINLINE char* align_for(char* ptr) AE_NO_TSAN {
651 const std::size_t alignment = std::alignment_of<U>::value;
652 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
653 }
654
655private:
656#ifndef NDEBUG
657 struct ReentrantGuard {
658 AE_NO_TSAN ReentrantGuard(weak_atomic<bool>& _inSection) : inSection(_inSection) {
659 assert(!inSection &&
660 "Concurrent (or re-entrant) enqueue or dequeue operation detected (only one "
661 "thread at a time may hold the producer or consumer role)");
662 inSection = true;
663 }
664
665 AE_NO_TSAN ~ReentrantGuard() {
666 inSection = false;
667 }
668
669 private:
670 ReentrantGuard& operator=(ReentrantGuard const&);
671
672 private:
673 weak_atomic<bool>& inSection;
674 };
675#endif
676
677 struct Block {
678 // Avoid false-sharing by putting highly contended variables on their own cache lines
679 weak_atomic<size_t> front; // (Atomic) Elements are read from here
680 size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
681
682 char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) -
683 sizeof(size_t)];
684 weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
685 size_t localFront;
686
687 char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) -
688 sizeof(size_t)]; // next isn't very contended, but we don't want it on
689 // the same cache line as tail (which is)
690 weak_atomic<Block*> next; // (Atomic)
691
692 char* data; // Contents (on heap) are aligned to T's alignment
693
694 const size_t sizeMask;
695
696 // size must be a power of two (and greater than 0)
697 AE_NO_TSAN Block(size_t const& _size, char* _rawThis, char* _data)
698 : front(0UL), localTail(0), tail(0UL), localFront(0), next(nullptr), data(_data),
699 sizeMask(_size - 1), rawThis(_rawThis) {}
700
701 private:
702 // C4512 - Assignment operator could not be generated
703 Block& operator=(Block const&);
704
705 public:
706 char* rawThis;
707 };
708
709 static Block* make_block(size_t capacity) AE_NO_TSAN {
710 // Allocate enough memory for the block itself, as well as all the elements it will contain
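// Resulting layout inside the single malloc'd region (alignment padding not to scale):
//   [padding][Block header][padding][element 0][element 1]...[element capacity-1]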
711 auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
712 size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
713 auto newBlockRaw = static_cast<char*>(std::malloc(size));
714 if (newBlockRaw == nullptr) {
715 return nullptr;
716 }
717
718 auto newBlockAligned = align_for<Block>(newBlockRaw);
719 auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
720 return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
721 }
722
723private:
724 weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block
725
726 char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
727 weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block
728
729 size_t largestBlockSize;
730
731#ifndef NDEBUG
732 weak_atomic<bool> enqueuing;
733 mutable weak_atomic<bool> dequeuing;
734#endif
735};
736
737// Like ReaderWriterQueue, but also provides blocking operations
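// A minimal consumer-loop sketch (illustrative only; "Message" is a placeholder type):
//
//     Common::BlockingReaderWriterQueue<Message> q;
//     Message msg;
//     q.wait_dequeue(msg); // blocks until the producer enqueues an element
//     if (q.wait_dequeue_timed(msg, std::chrono::milliseconds(5))) {
//         // got another element within 5 ms
//     }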
738template <typename T, size_t MAX_BLOCK_SIZE = 512>
739class BlockingReaderWriterQueue {
740private:
741 typedef ::Common::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
742
743public:
744 explicit BlockingReaderWriterQueue(size_t size = 15) AE_NO_TSAN
745 : inner(size),
746 sema(new spsc_sema::LightweightSemaphore()) {}
747
748 BlockingReaderWriterQueue(BlockingReaderWriterQueue&& other) AE_NO_TSAN
749 : inner(std::move(other.inner)),
750 sema(std::move(other.sema)) {}
751
752 BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue&& other) AE_NO_TSAN {
753 std::swap(sema, other.sema);
754 std::swap(inner, other.inner);
755 return *this;
756 }
757
758 // Enqueues a copy of element if there is room in the queue.
759 // Returns true if the element was enqueued, false otherwise.
760 // Does not allocate memory.
761 AE_FORCEINLINE bool try_enqueue(T const& element) AE_NO_TSAN {
762 if (inner.try_enqueue(element)) {
763 sema->signal();
764 return true;
765 }
766 return false;
767 }
768
769 // Enqueues a moved copy of element if there is room in the queue.
770 // Returns true if the element was enqueued, false otherwise.
771 // Does not allocate memory.
772 AE_FORCEINLINE bool try_enqueue(T&& element) AE_NO_TSAN {
773 if (inner.try_enqueue(std::forward<T>(element))) {
774 sema->signal();
775 return true;
776 }
777 return false;
778 }
779
780#if MOODYCAMEL_HAS_EMPLACE
781 // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
782 template <typename... Args>
783 AE_FORCEINLINE bool try_emplace(Args&&... args) AE_NO_TSAN {
784 if (inner.try_emplace(std::forward<Args>(args)...)) {
785 sema->signal();
786 return true;
787 }
788 return false;
789 }
790#endif
791
792 // Enqueues a copy of element on the queue.
793 // Allocates an additional block of memory if needed.
794 // Only fails (returns false) if memory allocation fails.
795 AE_FORCEINLINE bool enqueue(T const& element) AE_NO_TSAN {
796 if (inner.enqueue(element)) {
797 sema->signal();
798 return true;
799 }
800 return false;
801 }
802
803 // Enqueues a moved copy of element on the queue.
804 // Allocates an additional block of memory if needed.
805 // Only fails (returns false) if memory allocation fails.
806 AE_FORCEINLINE bool enqueue(T&& element) AE_NO_TSAN {
807 if (inner.enqueue(std::forward<T>(element))) {
808 sema->signal();
809 return true;
810 }
811 return false;
812 }
813
814#if MOODYCAMEL_HAS_EMPLACE
815 // Like enqueue() but with emplace semantics (i.e. construct-in-place).
816 template <typename... Args>
817 AE_FORCEINLINE bool emplace(Args&&... args) AE_NO_TSAN {
818 if (inner.emplace(std::forward<Args>(args)...)) {
819 sema->signal();
820 return true;
821 }
822 return false;
823 }
824#endif
825
826 // Attempts to dequeue an element; if the queue is empty,
827 // returns false instead. If the queue has at least one element,
828 // moves front to result using operator=, then returns true.
829 template <typename U>
830 bool try_dequeue(U& result) AE_NO_TSAN {
831 if (sema->tryWait()) {
832 bool success = inner.try_dequeue(result);
833 assert(success);
834 AE_UNUSED(success);
835 return true;
836 }
837 return false;
838 }
839
840 // Attempts to dequeue an element; if the queue is empty,
841 // waits until an element is available, then dequeues it.
842 template <typename U>
843 void wait_dequeue(U& result) AE_NO_TSAN {
844 while (!sema->wait())
845 ;
846 bool success = inner.try_dequeue(result);
847 AE_UNUSED(result);
848 assert(success);
849 AE_UNUSED(success);
850 }
851
852 // Attempts to dequeue an element; if the queue is empty,
853 // waits until an element is available up to the specified timeout,
854 // then dequeues it and returns true, or returns false if the timeout
855 // expires before an element can be dequeued.
856 // Using a negative timeout indicates an indefinite timeout,
857 // and is thus functionally equivalent to calling wait_dequeue.
858 template <typename U>
859 bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) AE_NO_TSAN {
860 if (!sema->wait(timeout_usecs)) {
861 return false;
862 }
863 bool success = inner.try_dequeue(result);
864 AE_UNUSED(result);
865 assert(success);
866 AE_UNUSED(success);
867 return true;
868 }
869
870#if __cplusplus > 199711L || _MSC_VER >= 1700
871 // Attempts to dequeue an element; if the queue is empty,
872 // waits until an element is available up to the specified timeout,
873 // then dequeues it and returns true, or returns false if the timeout
874 // expires before an element can be dequeued.
875 // Using a negative timeout indicates an indefinite timeout,
876 // and is thus functionally equivalent to calling wait_dequeue.
877 template <typename U, typename Rep, typename Period>
878 inline bool wait_dequeue_timed(U& result,
879 std::chrono::duration<Rep, Period> const& timeout) AE_NO_TSAN {
880 return wait_dequeue_timed(
881 result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
882 }
883#endif
884
885 // Returns a pointer to the front element in the queue (the one that
886 // would be removed next by a call to `try_dequeue` or `pop`). If the
887 // queue appears empty at the time the method is called, nullptr is
888 // returned instead.
889 // Must be called only from the consumer thread.
890 AE_FORCEINLINE T* peek() const AE_NO_TSAN {
891 return inner.peek();
892 }
893
894 // Removes the front element from the queue, if any, without returning it.
895 // Returns true on success, or false if the queue appeared empty at the time
896 // `pop` was called.
897 AE_FORCEINLINE bool pop() AE_NO_TSAN {
898 if (sema->tryWait()) {
899 bool result = inner.pop();
900 assert(result);
901 AE_UNUSED(result);
902 return true;
903 }
904 return false;
905 }
906
907 // Returns the approximate number of items currently in the queue.
908 // Safe to call from both the producer and consumer threads.
909 AE_FORCEINLINE size_t size_approx() const AE_NO_TSAN {
910 return sema->availableApprox();
911 }
912
913 // Returns the total number of items that could be enqueued without incurring
914 // an allocation when this queue is empty.
915 // Safe to call from both the producer and consumer threads.
916 //
917 // NOTE: The actual capacity during usage may be different depending on the consumer.
918 // If the consumer is removing elements concurrently, the producer cannot add to
919 // the block the consumer is removing from until it's completely empty, except in
920 // the case where the producer was writing to the same block the consumer was
921 // reading from the whole time.
922 AE_FORCEINLINE size_t max_capacity() const {
923 return inner.max_capacity();
924 }
925
926private:
927 // Disable copying & assignment
928 BlockingReaderWriterQueue(BlockingReaderWriterQueue const&) {}
929 BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue const&) {}
930
931private:
932 ReaderWriterQueue inner;
933 std::unique_ptr<spsc_sema::LightweightSemaphore> sema;
934};
935
936} // namespace Common
937
938#ifdef AE_VCPP
939#pragma warning(pop)
940#endif