summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/common/bounded_threadsafe_queue.h202
1 files changed, 62 insertions, 140 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 0fb2f42d1..bd87aa09b 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -22,52 +22,38 @@ class SPSCQueue {
22 static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); 22 static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
23 23
24public: 24public:
25 bool TryPush(T&& t) {
26 return Push<PushMode::Try>(std::move(t));
27 }
28
29 template <typename... Args> 25 template <typename... Args>
30 bool TryEmplace(Args&&... args) { 26 bool TryEmplace(Args&&... args) {
31 return Emplace<PushMode::Try>(std::forward<Args>(args)...); 27 return Emplace<PushMode::Try>(std::forward<Args>(args)...);
32 } 28 }
33 29
34 void PushWait(T&& t) {
35 Push<PushMode::Wait>(std::move(t));
36 }
37
38 template <typename... Args> 30 template <typename... Args>
39 void EmplaceWait(Args&&... args) { 31 void EmplaceWait(Args&&... args) {
40 Emplace<PushMode::Wait>(std::forward<Args>(args)...); 32 Emplace<PushMode::Wait>(std::forward<Args>(args)...);
41 } 33 }
42 34
43 bool TryPop(T& t) { 35 bool TryPop(T& t) {
44 return Pop(t); 36 return Pop<PopMode::Try>(t);
37 }
38
39 void PopWait(T& t) {
40 Pop<PopMode::Wait>(t);
45 } 41 }
46 42
47 void PopWait(T& t, std::stop_token stop_token) { 43 void PopWait(T& t, std::stop_token stop_token) {
48 ConsumerWait(stop_token); 44 Pop<PopMode::WaitWithStopToken>(t, stop_token);
49 Pop(t);
50 } 45 }
51 46
52 T PopWait(std::stop_token stop_token) { 47 T PopWait() {
53 ConsumerWait(stop_token);
54 T t; 48 T t;
55 Pop(t); 49 Pop<PopMode::Wait>(t);
56 return t; 50 return t;
57 } 51 }
58 52
59 void Clear() { 53 T PopWait(std::stop_token stop_token) {
60 while (!Empty()) { 54 T t;
61 Pop(); 55 Pop<PopMode::WaitWithStopToken>(t, stop_token);
62 } 56 return t;
63 }
64
65 bool Empty() const {
66 return m_read_index.load() == m_write_index.load();
67 }
68
69 size_t Size() const {
70 return m_write_index.load() - m_read_index.load();
71 } 57 }
72 58
73private: 59private:
@@ -77,55 +63,27 @@ private:
77 Count, 63 Count,
78 }; 64 };
79 65
80 template <PushMode Mode> 66 enum class PopMode {
81 bool Push(T&& t) { 67 Try,
82 const size_t write_index = m_write_index.load(); 68 Wait,
83 69 WaitWithStopToken,
84 if constexpr (Mode == PushMode::Try) { 70 Count,
85 // Check if we have free slots to write to. 71 };
86 if ((write_index - m_read_index.load()) == Capacity) {
87 return false;
88 }
89 } else if constexpr (Mode == PushMode::Wait) {
90 // Wait until we have free slots to write to.
91 std::unique_lock lock{producer_cv_mutex};
92 producer_cv.wait(lock, [this, write_index] {
93 return (write_index - m_read_index.load()) < Capacity;
94 });
95 } else {
96 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
97 }
98
99 // Determine the position to write to.
100 const size_t pos = write_index % Capacity;
101
102 // Push into the queue.
103 m_data[pos] = std::move(t);
104
105 // Increment the write index.
106 ++m_write_index;
107
108 // Notify the consumer that we have pushed into the queue.
109 std::scoped_lock lock{consumer_cv_mutex};
110 consumer_cv.notify_one();
111
112 return true;
113 }
114 72
115 template <PushMode Mode, typename... Args> 73 template <PushMode Mode, typename... Args>
116 bool Emplace(Args&&... args) { 74 bool Emplace(Args&&... args) {
117 const size_t write_index = m_write_index.load(); 75 const size_t write_index = m_write_index.load(std::memory_order::relaxed);
118 76
119 if constexpr (Mode == PushMode::Try) { 77 if constexpr (Mode == PushMode::Try) {
120 // Check if we have free slots to write to. 78 // Check if we have free slots to write to.
121 if ((write_index - m_read_index.load()) == Capacity) { 79 if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) {
122 return false; 80 return false;
123 } 81 }
124 } else if constexpr (Mode == PushMode::Wait) { 82 } else if constexpr (Mode == PushMode::Wait) {
125 // Wait until we have free slots to write to. 83 // Wait until we have free slots to write to.
126 std::unique_lock lock{producer_cv_mutex}; 84 std::unique_lock lock{producer_cv_mutex};
127 producer_cv.wait(lock, [this, write_index] { 85 producer_cv.wait(lock, [this, write_index] {
128 return (write_index - m_read_index.load()) < Capacity; 86 return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity;
129 }); 87 });
130 } else { 88 } else {
131 static_assert(Mode < PushMode::Count, "Invalid PushMode."); 89 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
@@ -147,34 +105,32 @@ private:
147 return true; 105 return true;
148 } 106 }
149 107
150 void Pop() { 108 template <PopMode Mode>
151 const size_t read_index = m_read_index.load(); 109 bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) {
152 110 const size_t read_index = m_read_index.load(std::memory_order::relaxed);
153 // Check if the queue is empty.
154 if (read_index == m_write_index.load()) {
155 return;
156 }
157
158 // Determine the position to read from.
159 const size_t pos = read_index % Capacity;
160
161 // Pop the data off the queue, deleting it.
162 std::destroy_at(std::addressof(m_data[pos]));
163
164 // Increment the read index.
165 ++m_read_index;
166
167 // Notify the producer that we have popped off the queue.
168 std::unique_lock lock{producer_cv_mutex};
169 producer_cv.notify_one();
170 }
171
172 bool Pop(T& t) {
173 const size_t read_index = m_read_index.load();
174 111
175 // Check if the queue is empty. 112 if constexpr (Mode == PopMode::Try) {
176 if (read_index == m_write_index.load()) { 113 // Check if the queue is empty.
177 return false; 114 if (read_index == m_write_index.load(std::memory_order::acquire)) {
115 return false;
116 }
117 } else if constexpr (Mode == PopMode::Wait) {
118 // Wait until the queue is not empty.
119 std::unique_lock lock{consumer_cv_mutex};
120 consumer_cv.wait(lock, [this, read_index] {
121 return read_index != m_write_index.load(std::memory_order::acquire);
122 });
123 } else if constexpr (Mode == PopMode::WaitWithStopToken) {
124 // Wait until the queue is not empty.
125 std::unique_lock lock{consumer_cv_mutex};
126 Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] {
127 return read_index != m_write_index.load(std::memory_order::acquire);
128 });
129 if (stop_token.stop_requested()) {
130 return false;
131 }
132 } else {
133 static_assert(Mode < PopMode::Count, "Invalid PopMode.");
178 } 134 }
179 135
180 // Determine the position to read from. 136 // Determine the position to read from.
@@ -193,11 +149,6 @@ private:
193 return true; 149 return true;
194 } 150 }
195 151
196 void ConsumerWait(std::stop_token stop_token) {
197 std::unique_lock lock{consumer_cv_mutex};
198 Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); });
199 }
200
201 alignas(128) std::atomic_size_t m_read_index{0}; 152 alignas(128) std::atomic_size_t m_read_index{0};
202 alignas(128) std::atomic_size_t m_write_index{0}; 153 alignas(128) std::atomic_size_t m_write_index{0};
203 154
@@ -212,22 +163,12 @@ private:
212template <typename T, size_t Capacity = detail::DefaultCapacity> 163template <typename T, size_t Capacity = detail::DefaultCapacity>
213class MPSCQueue { 164class MPSCQueue {
214public: 165public:
215 bool TryPush(T&& t) {
216 std::scoped_lock lock{write_mutex};
217 return spsc_queue.TryPush(std::move(t));
218 }
219
220 template <typename... Args> 166 template <typename... Args>
221 bool TryEmplace(Args&&... args) { 167 bool TryEmplace(Args&&... args) {
222 std::scoped_lock lock{write_mutex}; 168 std::scoped_lock lock{write_mutex};
223 return spsc_queue.TryEmplace(std::forward<Args>(args)...); 169 return spsc_queue.TryEmplace(std::forward<Args>(args)...);
224 } 170 }
225 171
226 void PushWait(T&& t) {
227 std::scoped_lock lock{write_mutex};
228 spsc_queue.PushWait(std::move(t));
229 }
230
231 template <typename... Args> 172 template <typename... Args>
232 void EmplaceWait(Args&&... args) { 173 void EmplaceWait(Args&&... args) {
233 std::scoped_lock lock{write_mutex}; 174 std::scoped_lock lock{write_mutex};
@@ -238,24 +179,20 @@ public:
238 return spsc_queue.TryPop(t); 179 return spsc_queue.TryPop(t);
239 } 180 }
240 181
241 void PopWait(T& t, std::stop_token stop_token) { 182 void PopWait(T& t) {
242 spsc_queue.PopWait(t, stop_token); 183 spsc_queue.PopWait(t);
243 } 184 }
244 185
245 T PopWait(std::stop_token stop_token) { 186 void PopWait(T& t, std::stop_token stop_token) {
246 return spsc_queue.PopWait(stop_token); 187 spsc_queue.PopWait(t, stop_token);
247 }
248
249 void Clear() {
250 spsc_queue.Clear();
251 } 188 }
252 189
253 bool Empty() { 190 T PopWait() {
254 return spsc_queue.Empty(); 191 return spsc_queue.PopWait();
255 } 192 }
256 193
257 size_t Size() { 194 T PopWait(std::stop_token stop_token) {
258 return spsc_queue.Size(); 195 return spsc_queue.PopWait(stop_token);
259 } 196 }
260 197
261private: 198private:
@@ -266,22 +203,12 @@ private:
266template <typename T, size_t Capacity = detail::DefaultCapacity> 203template <typename T, size_t Capacity = detail::DefaultCapacity>
267class MPMCQueue { 204class MPMCQueue {
268public: 205public:
269 bool TryPush(T&& t) {
270 std::scoped_lock lock{write_mutex};
271 return spsc_queue.TryPush(std::move(t));
272 }
273
274 template <typename... Args> 206 template <typename... Args>
275 bool TryEmplace(Args&&... args) { 207 bool TryEmplace(Args&&... args) {
276 std::scoped_lock lock{write_mutex}; 208 std::scoped_lock lock{write_mutex};
277 return spsc_queue.TryEmplace(std::forward<Args>(args)...); 209 return spsc_queue.TryEmplace(std::forward<Args>(args)...);
278 } 210 }
279 211
280 void PushWait(T&& t) {
281 std::scoped_lock lock{write_mutex};
282 spsc_queue.PushWait(std::move(t));
283 }
284
285 template <typename... Args> 212 template <typename... Args>
286 void EmplaceWait(Args&&... args) { 213 void EmplaceWait(Args&&... args) {
287 std::scoped_lock lock{write_mutex}; 214 std::scoped_lock lock{write_mutex};
@@ -293,29 +220,24 @@ public:
293 return spsc_queue.TryPop(t); 220 return spsc_queue.TryPop(t);
294 } 221 }
295 222
296 void PopWait(T& t, std::stop_token stop_token) { 223 void PopWait(T& t) {
297 std::scoped_lock lock{read_mutex};
298 spsc_queue.PopWait(t, stop_token);
299 }
300
301 T PopWait(std::stop_token stop_token) {
302 std::scoped_lock lock{read_mutex}; 224 std::scoped_lock lock{read_mutex};
303 return spsc_queue.PopWait(stop_token); 225 spsc_queue.PopWait(t);
304 } 226 }
305 227
306 void Clear() { 228 void PopWait(T& t, std::stop_token stop_token) {
307 std::scoped_lock lock{read_mutex}; 229 std::scoped_lock lock{read_mutex};
308 spsc_queue.Clear(); 230 spsc_queue.PopWait(t, stop_token);
309 } 231 }
310 232
311 bool Empty() { 233 T PopWait() {
312 std::scoped_lock lock{read_mutex}; 234 std::scoped_lock lock{read_mutex};
313 return spsc_queue.Empty(); 235 return spsc_queue.PopWait();
314 } 236 }
315 237
316 size_t Size() { 238 T PopWait(std::stop_token stop_token) {
317 std::scoped_lock lock{read_mutex}; 239 std::scoped_lock lock{read_mutex};
318 return spsc_queue.Size(); 240 return spsc_queue.PopWait(stop_token);
319 } 241 }
320 242
321private: 243private: