diff options
Diffstat (limited to 'src/common/threadsafe_queue.h')
| -rw-r--r-- | src/common/threadsafe_queue.h | 53 |
1 files changed, 34 insertions, 19 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index edf13bc49..821e8536a 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h | |||
| @@ -7,17 +7,17 @@ | |||
| 7 | // a simple lockless thread-safe, | 7 | // a simple lockless thread-safe, |
| 8 | // single reader, single writer queue | 8 | // single reader, single writer queue |
| 9 | 9 | ||
| 10 | #include <algorithm> | ||
| 11 | #include <atomic> | 10 | #include <atomic> |
| 11 | #include <condition_variable> | ||
| 12 | #include <cstddef> | 12 | #include <cstddef> |
| 13 | #include <mutex> | 13 | #include <mutex> |
| 14 | #include "common/common_types.h" | 14 | #include <utility> |
| 15 | 15 | ||
| 16 | namespace Common { | 16 | namespace Common { |
| 17 | template <typename T, bool NeedSize = true> | 17 | template <typename T> |
| 18 | class SPSCQueue { | 18 | class SPSCQueue { |
| 19 | public: | 19 | public: |
| 20 | SPSCQueue() : size(0) { | 20 | SPSCQueue() { |
| 21 | write_ptr = read_ptr = new ElementPtr(); | 21 | write_ptr = read_ptr = new ElementPtr(); |
| 22 | } | 22 | } |
| 23 | ~SPSCQueue() { | 23 | ~SPSCQueue() { |
| @@ -25,13 +25,12 @@ public: | |||
| 25 | delete read_ptr; | 25 | delete read_ptr; |
| 26 | } | 26 | } |
| 27 | 27 | ||
| 28 | u32 Size() const { | 28 | std::size_t Size() const { |
| 29 | static_assert(NeedSize, "using Size() on FifoQueue without NeedSize"); | ||
| 30 | return size.load(); | 29 | return size.load(); |
| 31 | } | 30 | } |
| 32 | 31 | ||
| 33 | bool Empty() const { | 32 | bool Empty() const { |
| 34 | return !read_ptr->next.load(); | 33 | return Size() == 0; |
| 35 | } | 34 | } |
| 36 | 35 | ||
| 37 | T& Front() const { | 36 | T& Front() const { |
| @@ -47,13 +46,14 @@ public: | |||
| 47 | ElementPtr* new_ptr = new ElementPtr(); | 46 | ElementPtr* new_ptr = new ElementPtr(); |
| 48 | write_ptr->next.store(new_ptr, std::memory_order_release); | 47 | write_ptr->next.store(new_ptr, std::memory_order_release); |
| 49 | write_ptr = new_ptr; | 48 | write_ptr = new_ptr; |
| 50 | if (NeedSize) | 49 | cv.notify_one(); |
| 51 | size++; | 50 | |
| 51 | ++size; | ||
| 52 | } | 52 | } |
| 53 | 53 | ||
| 54 | void Pop() { | 54 | void Pop() { |
| 55 | if (NeedSize) | 55 | --size; |
| 56 | size--; | 56 | |
| 57 | ElementPtr* tmpptr = read_ptr; | 57 | ElementPtr* tmpptr = read_ptr; |
| 58 | // advance the read pointer | 58 | // advance the read pointer |
| 59 | read_ptr = tmpptr->next.load(); | 59 | read_ptr = tmpptr->next.load(); |
| @@ -66,8 +66,7 @@ public: | |||
| 66 | if (Empty()) | 66 | if (Empty()) |
| 67 | return false; | 67 | return false; |
| 68 | 68 | ||
| 69 | if (NeedSize) | 69 | --size; |
| 70 | size--; | ||
| 71 | 70 | ||
| 72 | ElementPtr* tmpptr = read_ptr; | 71 | ElementPtr* tmpptr = read_ptr; |
| 73 | read_ptr = tmpptr->next.load(std::memory_order_acquire); | 72 | read_ptr = tmpptr->next.load(std::memory_order_acquire); |
| @@ -77,6 +76,16 @@ public: | |||
| 77 | return true; | 76 | return true; |
| 78 | } | 77 | } |
| 79 | 78 | ||
| 79 | T PopWait() { | ||
| 80 | if (Empty()) { | ||
| 81 | std::unique_lock<std::mutex> lock(cv_mutex); | ||
| 82 | cv.wait(lock, [this]() { return !Empty(); }); | ||
| 83 | } | ||
| 84 | T t; | ||
| 85 | Pop(t); | ||
| 86 | return t; | ||
| 87 | } | ||
| 88 | |||
| 80 | // not thread-safe | 89 | // not thread-safe |
| 81 | void Clear() { | 90 | void Clear() { |
| 82 | size.store(0); | 91 | size.store(0); |
| @@ -89,7 +98,7 @@ private: | |||
| 89 | // and a pointer to the next ElementPtr | 98 | // and a pointer to the next ElementPtr |
| 90 | class ElementPtr { | 99 | class ElementPtr { |
| 91 | public: | 100 | public: |
| 92 | ElementPtr() : next(nullptr) {} | 101 | ElementPtr() {} |
| 93 | ~ElementPtr() { | 102 | ~ElementPtr() { |
| 94 | ElementPtr* next_ptr = next.load(); | 103 | ElementPtr* next_ptr = next.load(); |
| 95 | 104 | ||
| @@ -98,21 +107,23 @@ private: | |||
| 98 | } | 107 | } |
| 99 | 108 | ||
| 100 | T current; | 109 | T current; |
| 101 | std::atomic<ElementPtr*> next; | 110 | std::atomic<ElementPtr*> next{nullptr}; |
| 102 | }; | 111 | }; |
| 103 | 112 | ||
| 104 | ElementPtr* write_ptr; | 113 | ElementPtr* write_ptr; |
| 105 | ElementPtr* read_ptr; | 114 | ElementPtr* read_ptr; |
| 106 | std::atomic<u32> size; | 115 | std::atomic_size_t size{0}; |
| 116 | std::mutex cv_mutex; | ||
| 117 | std::condition_variable cv; | ||
| 107 | }; | 118 | }; |
| 108 | 119 | ||
| 109 | // a simple thread-safe, | 120 | // a simple thread-safe, |
| 110 | // single reader, multiple writer queue | 121 | // single reader, multiple writer queue |
| 111 | 122 | ||
| 112 | template <typename T, bool NeedSize = true> | 123 | template <typename T> |
| 113 | class MPSCQueue { | 124 | class MPSCQueue { |
| 114 | public: | 125 | public: |
| 115 | u32 Size() const { | 126 | std::size_t Size() const { |
| 116 | return spsc_queue.Size(); | 127 | return spsc_queue.Size(); |
| 117 | } | 128 | } |
| 118 | 129 | ||
| @@ -138,13 +149,17 @@ public: | |||
| 138 | return spsc_queue.Pop(t); | 149 | return spsc_queue.Pop(t); |
| 139 | } | 150 | } |
| 140 | 151 | ||
| 152 | T PopWait() { | ||
| 153 | return spsc_queue.PopWait(); | ||
| 154 | } | ||
| 155 | |||
| 141 | // not thread-safe | 156 | // not thread-safe |
| 142 | void Clear() { | 157 | void Clear() { |
| 143 | spsc_queue.Clear(); | 158 | spsc_queue.Clear(); |
| 144 | } | 159 | } |
| 145 | 160 | ||
| 146 | private: | 161 | private: |
| 147 | SPSCQueue<T, NeedSize> spsc_queue; | 162 | SPSCQueue<T> spsc_queue; |
| 148 | std::mutex write_lock; | 163 | std::mutex write_lock; |
| 149 | }; | 164 | }; |
| 150 | } // namespace Common | 165 | } // namespace Common |