diff options
Diffstat (limited to 'src/common/threadsafe_queue.h')
| -rw-r--r-- | src/common/threadsafe_queue.h | 35 |
1 files changed, 16 insertions, 19 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index edf13bc49..f553efdc9 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h | |||
| @@ -7,17 +7,16 @@ | |||
| 7 | // a simple lockless thread-safe, | 7 | // a simple lockless thread-safe, |
| 8 | // single reader, single writer queue | 8 | // single reader, single writer queue |
| 9 | 9 | ||
| 10 | #include <algorithm> | ||
| 11 | #include <atomic> | 10 | #include <atomic> |
| 12 | #include <cstddef> | 11 | #include <cstddef> |
| 13 | #include <mutex> | 12 | #include <mutex> |
| 14 | #include "common/common_types.h" | 13 | #include <utility> |
| 15 | 14 | ||
| 16 | namespace Common { | 15 | namespace Common { |
| 17 | template <typename T, bool NeedSize = true> | 16 | template <typename T> |
| 18 | class SPSCQueue { | 17 | class SPSCQueue { |
| 19 | public: | 18 | public: |
| 20 | SPSCQueue() : size(0) { | 19 | SPSCQueue() { |
| 21 | write_ptr = read_ptr = new ElementPtr(); | 20 | write_ptr = read_ptr = new ElementPtr(); |
| 22 | } | 21 | } |
| 23 | ~SPSCQueue() { | 22 | ~SPSCQueue() { |
| @@ -25,13 +24,12 @@ public: | |||
| 25 | delete read_ptr; | 24 | delete read_ptr; |
| 26 | } | 25 | } |
| 27 | 26 | ||
| 28 | u32 Size() const { | 27 | std::size_t Size() const { |
| 29 | static_assert(NeedSize, "using Size() on FifoQueue without NeedSize"); | ||
| 30 | return size.load(); | 28 | return size.load(); |
| 31 | } | 29 | } |
| 32 | 30 | ||
| 33 | bool Empty() const { | 31 | bool Empty() const { |
| 34 | return !read_ptr->next.load(); | 32 | return Size() == 0; |
| 35 | } | 33 | } |
| 36 | 34 | ||
| 37 | T& Front() const { | 35 | T& Front() const { |
| @@ -47,13 +45,13 @@ public: | |||
| 47 | ElementPtr* new_ptr = new ElementPtr(); | 45 | ElementPtr* new_ptr = new ElementPtr(); |
| 48 | write_ptr->next.store(new_ptr, std::memory_order_release); | 46 | write_ptr->next.store(new_ptr, std::memory_order_release); |
| 49 | write_ptr = new_ptr; | 47 | write_ptr = new_ptr; |
| 50 | if (NeedSize) | 48 | |
| 51 | size++; | 49 | ++size; |
| 52 | } | 50 | } |
| 53 | 51 | ||
| 54 | void Pop() { | 52 | void Pop() { |
| 55 | if (NeedSize) | 53 | --size; |
| 56 | size--; | 54 | |
| 57 | ElementPtr* tmpptr = read_ptr; | 55 | ElementPtr* tmpptr = read_ptr; |
| 58 | // advance the read pointer | 56 | // advance the read pointer |
| 59 | read_ptr = tmpptr->next.load(); | 57 | read_ptr = tmpptr->next.load(); |
| @@ -66,8 +64,7 @@ public: | |||
| 66 | if (Empty()) | 64 | if (Empty()) |
| 67 | return false; | 65 | return false; |
| 68 | 66 | ||
| 69 | if (NeedSize) | 67 | --size; |
| 70 | size--; | ||
| 71 | 68 | ||
| 72 | ElementPtr* tmpptr = read_ptr; | 69 | ElementPtr* tmpptr = read_ptr; |
| 73 | read_ptr = tmpptr->next.load(std::memory_order_acquire); | 70 | read_ptr = tmpptr->next.load(std::memory_order_acquire); |
| @@ -89,7 +86,7 @@ private: | |||
| 89 | // and a pointer to the next ElementPtr | 86 | // and a pointer to the next ElementPtr |
| 90 | class ElementPtr { | 87 | class ElementPtr { |
| 91 | public: | 88 | public: |
| 92 | ElementPtr() : next(nullptr) {} | 89 | ElementPtr() {} |
| 93 | ~ElementPtr() { | 90 | ~ElementPtr() { |
| 94 | ElementPtr* next_ptr = next.load(); | 91 | ElementPtr* next_ptr = next.load(); |
| 95 | 92 | ||
| @@ -98,21 +95,21 @@ private: | |||
| 98 | } | 95 | } |
| 99 | 96 | ||
| 100 | T current; | 97 | T current; |
| 101 | std::atomic<ElementPtr*> next; | 98 | std::atomic<ElementPtr*> next{nullptr}; |
| 102 | }; | 99 | }; |
| 103 | 100 | ||
| 104 | ElementPtr* write_ptr; | 101 | ElementPtr* write_ptr; |
| 105 | ElementPtr* read_ptr; | 102 | ElementPtr* read_ptr; |
| 106 | std::atomic<u32> size; | 103 | std::atomic_size_t size{0}; |
| 107 | }; | 104 | }; |
| 108 | 105 | ||
| 109 | // a simple thread-safe, | 106 | // a simple thread-safe, |
| 110 | // single reader, multiple writer queue | 107 | // single reader, multiple writer queue |
| 111 | 108 | ||
| 112 | template <typename T, bool NeedSize = true> | 109 | template <typename T> |
| 113 | class MPSCQueue { | 110 | class MPSCQueue { |
| 114 | public: | 111 | public: |
| 115 | u32 Size() const { | 112 | std::size_t Size() const { |
| 116 | return spsc_queue.Size(); | 113 | return spsc_queue.Size(); |
| 117 | } | 114 | } |
| 118 | 115 | ||
| @@ -144,7 +141,7 @@ public: | |||
| 144 | } | 141 | } |
| 145 | 142 | ||
| 146 | private: | 143 | private: |
| 147 | SPSCQueue<T, NeedSize> spsc_queue; | 144 | SPSCQueue<T> spsc_queue; |
| 148 | std::mutex write_lock; | 145 | std::mutex write_lock; |
| 149 | }; | 146 | }; |
| 150 | } // namespace Common | 147 | } // namespace Common |