diff options
| -rw-r--r-- | src/common/threadsafe_queue.h | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 8430b9778..2c8c2b90e 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h | |||
| @@ -14,7 +14,7 @@ | |||
| 14 | #include <utility> | 14 | #include <utility> |
| 15 | 15 | ||
| 16 | namespace Common { | 16 | namespace Common { |
| 17 | template <typename T> | 17 | template <typename T, bool with_stop_token = false> |
| 18 | class SPSCQueue { | 18 | class SPSCQueue { |
| 19 | public: | 19 | public: |
| 20 | SPSCQueue() { | 20 | SPSCQueue() { |
| @@ -84,7 +84,7 @@ public: | |||
| 84 | void Wait() { | 84 | void Wait() { |
| 85 | if (Empty()) { | 85 | if (Empty()) { |
| 86 | std::unique_lock lock{cv_mutex}; | 86 | std::unique_lock lock{cv_mutex}; |
| 87 | cv.wait(lock, [this]() { return !Empty(); }); | 87 | cv.wait(lock, [this] { return !Empty(); }); |
| 88 | } | 88 | } |
| 89 | } | 89 | } |
| 90 | 90 | ||
| @@ -95,6 +95,19 @@ public: | |||
| 95 | return t; | 95 | return t; |
| 96 | } | 96 | } |
| 97 | 97 | ||
| 98 | T PopWait(std::stop_token stop_token) { | ||
| 99 | if (Empty()) { | ||
| 100 | std::unique_lock lock{cv_mutex}; | ||
| 101 | cv.wait(lock, stop_token, [this] { return !Empty(); }); | ||
| 102 | } | ||
| 103 | if (stop_token.stop_requested()) { | ||
| 104 | return T{}; | ||
| 105 | } | ||
| 106 | T t; | ||
| 107 | Pop(t); | ||
| 108 | return t; | ||
| 109 | } | ||
| 110 | |||
| 98 | // not thread-safe | 111 | // not thread-safe |
| 99 | void Clear() { | 112 | void Clear() { |
| 100 | size.store(0); | 113 | size.store(0); |
| @@ -123,13 +136,13 @@ private: | |||
| 123 | ElementPtr* read_ptr; | 136 | ElementPtr* read_ptr; |
| 124 | std::atomic_size_t size{0}; | 137 | std::atomic_size_t size{0}; |
| 125 | std::mutex cv_mutex; | 138 | std::mutex cv_mutex; |
| 126 | std::condition_variable cv; | 139 | std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv; |
| 127 | }; | 140 | }; |
| 128 | 141 | ||
| 129 | // a simple thread-safe, | 142 | // a simple thread-safe, |
| 130 | // single reader, multiple writer queue | 143 | // single reader, multiple writer queue |
| 131 | 144 | ||
| 132 | template <typename T> | 145 | template <typename T, bool with_stop_token = false> |
| 133 | class MPSCQueue { | 146 | class MPSCQueue { |
| 134 | public: | 147 | public: |
| 135 | [[nodiscard]] std::size_t Size() const { | 148 | [[nodiscard]] std::size_t Size() const { |
| @@ -166,13 +179,17 @@ public: | |||
| 166 | return spsc_queue.PopWait(); | 179 | return spsc_queue.PopWait(); |
| 167 | } | 180 | } |
| 168 | 181 | ||
| 182 | T PopWait(std::stop_token stop_token) { | ||
| 183 | return spsc_queue.PopWait(stop_token); | ||
| 184 | } | ||
| 185 | |||
| 169 | // not thread-safe | 186 | // not thread-safe |
| 170 | void Clear() { | 187 | void Clear() { |
| 171 | spsc_queue.Clear(); | 188 | spsc_queue.Clear(); |
| 172 | } | 189 | } |
| 173 | 190 | ||
| 174 | private: | 191 | private: |
| 175 | SPSCQueue<T> spsc_queue; | 192 | SPSCQueue<T, with_stop_token> spsc_queue; |
| 176 | std::mutex write_lock; | 193 | std::mutex write_lock; |
| 177 | }; | 194 | }; |
| 178 | } // namespace Common | 195 | } // namespace Common |