diff options
Diffstat (limited to 'src/common/bounded_threadsafe_queue.h')
| -rw-r--r-- | src/common/bounded_threadsafe_queue.h | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e03427539..eb88cc1d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h | |||
| @@ -22,6 +22,55 @@ class SPSCQueue { | |||
| 22 | static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); | 22 | static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); |
| 23 | 23 | ||
| 24 | public: | 24 | public: |
| 25 | bool TryPush(T&& t) { | ||
| 26 | const size_t write_index = m_write_index.load(); | ||
| 27 | |||
| 28 | // Check if we have free slots to write to. | ||
| 29 | if ((write_index - m_read_index.load()) == Capacity) { | ||
| 30 | return false; | ||
| 31 | } | ||
| 32 | |||
| 33 | // Determine the position to write to. | ||
| 34 | const size_t pos = write_index % Capacity; | ||
| 35 | |||
| 36 | // Push into the queue. | ||
| 37 | m_data[pos] = std::move(t); | ||
| 38 | |||
| 39 | // Increment the write index. | ||
| 40 | ++m_write_index; | ||
| 41 | |||
| 42 | // Notify the consumer that we have pushed into the queue. | ||
| 43 | std::scoped_lock lock{cv_mutex}; | ||
| 44 | cv.notify_one(); | ||
| 45 | |||
| 46 | return true; | ||
| 47 | } | ||
| 48 | |||
| 49 | template <typename... Args> | ||
| 50 | bool TryPush(Args&&... args) { | ||
| 51 | const size_t write_index = m_write_index.load(); | ||
| 52 | |||
| 53 | // Check if we have free slots to write to. | ||
| 54 | if ((write_index - m_read_index.load()) == Capacity) { | ||
| 55 | return false; | ||
| 56 | } | ||
| 57 | |||
| 58 | // Determine the position to write to. | ||
| 59 | const size_t pos = write_index % Capacity; | ||
| 60 | |||
| 61 | // Emplace into the queue. | ||
| 62 | std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...); | ||
| 63 | |||
| 64 | // Increment the write index. | ||
| 65 | ++m_write_index; | ||
| 66 | |||
| 67 | // Notify the consumer that we have pushed into the queue. | ||
| 68 | std::scoped_lock lock{cv_mutex}; | ||
| 69 | cv.notify_one(); | ||
| 70 | |||
| 71 | return true; | ||
| 72 | } | ||
| 73 | |||
| 25 | void Push(T&& t) { | 74 | void Push(T&& t) { |
| 26 | const size_t write_index = m_write_index.load(); | 75 | const size_t write_index = m_write_index.load(); |
| 27 | 76 | ||
| @@ -153,6 +202,17 @@ private: | |||
| 153 | template <typename T, size_t Capacity = detail::DefaultCapacity> | 202 | template <typename T, size_t Capacity = detail::DefaultCapacity> |
| 154 | class MPSCQueue { | 203 | class MPSCQueue { |
| 155 | public: | 204 | public: |
| 205 | bool TryPush(T&& t) { | ||
| 206 | std::scoped_lock lock{write_mutex}; | ||
| 207 | return spsc_queue.TryPush(std::move(t)); | ||
| 208 | } | ||
| 209 | |||
| 210 | template <typename... Args> | ||
| 211 | bool TryPush(Args&&... args) { | ||
| 212 | std::scoped_lock lock{write_mutex}; | ||
| 213 | return spsc_queue.TryPush(std::forward<Args>(args)...); | ||
| 214 | } | ||
| 215 | |||
| 156 | void Push(T&& t) { | 216 | void Push(T&& t) { |
| 157 | std::scoped_lock lock{write_mutex}; | 217 | std::scoped_lock lock{write_mutex}; |
| 158 | spsc_queue.Push(std::move(t)); | 218 | spsc_queue.Push(std::move(t)); |
| @@ -196,6 +256,17 @@ private: | |||
| 196 | template <typename T, size_t Capacity = detail::DefaultCapacity> | 256 | template <typename T, size_t Capacity = detail::DefaultCapacity> |
| 197 | class MPMCQueue { | 257 | class MPMCQueue { |
| 198 | public: | 258 | public: |
| 259 | bool TryPush(T&& t) { | ||
| 260 | std::scoped_lock lock{write_mutex}; | ||
| 261 | return spsc_queue.TryPush(std::move(t)); | ||
| 262 | } | ||
| 263 | |||
| 264 | template <typename... Args> | ||
| 265 | bool TryPush(Args&&... args) { | ||
| 266 | std::scoped_lock lock{write_mutex}; | ||
| 267 | return spsc_queue.TryPush(std::forward<Args>(args)...); | ||
| 268 | } | ||
| 269 | |||
| 199 | void Push(T&& t) { | 270 | void Push(T&& t) { |
| 200 | std::scoped_lock lock{write_mutex}; | 271 | std::scoped_lock lock{write_mutex}; |
| 201 | spsc_queue.Push(std::move(t)); | 272 | spsc_queue.Push(std::move(t)); |