summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/common/threadsafe_queue.h27
1 files changed, 22 insertions, 5 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h
index 8430b9778..2c8c2b90e 100644
--- a/src/common/threadsafe_queue.h
+++ b/src/common/threadsafe_queue.h
@@ -14,7 +14,7 @@
14#include <utility> 14#include <utility>
15 15
16namespace Common { 16namespace Common {
17template <typename T> 17template <typename T, bool with_stop_token = false>
18class SPSCQueue { 18class SPSCQueue {
19public: 19public:
20 SPSCQueue() { 20 SPSCQueue() {
@@ -84,7 +84,7 @@ public:
84 void Wait() { 84 void Wait() {
85 if (Empty()) { 85 if (Empty()) {
86 std::unique_lock lock{cv_mutex}; 86 std::unique_lock lock{cv_mutex};
87 cv.wait(lock, [this]() { return !Empty(); }); 87 cv.wait(lock, [this] { return !Empty(); });
88 } 88 }
89 } 89 }
90 90
@@ -95,6 +95,19 @@ public:
95 return t; 95 return t;
96 } 96 }
97 97
98 T PopWait(std::stop_token stop_token) {
99 if (Empty()) {
100 std::unique_lock lock{cv_mutex};
101 cv.wait(lock, stop_token, [this] { return !Empty(); });
102 }
103 if (stop_token.stop_requested()) {
104 return T{};
105 }
106 T t;
107 Pop(t);
108 return t;
109 }
110
98 // not thread-safe 111 // not thread-safe
99 void Clear() { 112 void Clear() {
100 size.store(0); 113 size.store(0);
@@ -123,13 +136,13 @@ private:
123 ElementPtr* read_ptr; 136 ElementPtr* read_ptr;
124 std::atomic_size_t size{0}; 137 std::atomic_size_t size{0};
125 std::mutex cv_mutex; 138 std::mutex cv_mutex;
126 std::condition_variable cv; 139 std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv;
127}; 140};
128 141
129// a simple thread-safe, 142// a simple thread-safe,
130// single reader, multiple writer queue 143// single reader, multiple writer queue
131 144
132template <typename T> 145template <typename T, bool with_stop_token = false>
133class MPSCQueue { 146class MPSCQueue {
134public: 147public:
135 [[nodiscard]] std::size_t Size() const { 148 [[nodiscard]] std::size_t Size() const {
@@ -166,13 +179,17 @@ public:
166 return spsc_queue.PopWait(); 179 return spsc_queue.PopWait();
167 } 180 }
168 181
182 T PopWait(std::stop_token stop_token) {
183 return spsc_queue.PopWait(stop_token);
184 }
185
169 // not thread-safe 186 // not thread-safe
170 void Clear() { 187 void Clear() {
171 spsc_queue.Clear(); 188 spsc_queue.Clear();
172 } 189 }
173 190
174private: 191private:
175 SPSCQueue<T> spsc_queue; 192 SPSCQueue<T, with_stop_token> spsc_queue;
176 std::mutex write_lock; 193 std::mutex write_lock;
177}; 194};
178} // namespace Common 195} // namespace Common