diff options
| author | 2023-03-19 14:48:01 -0400 | |
|---|---|---|
| committer | 2023-03-21 22:33:57 -0400 | |
| commit | 8c56481249ed1bc0b46bca3aec0c7e86495c5d3a (patch) | |
| tree | 8b78335fec7ed51c821b7af37c4cb738019909f7 /src/common/bounded_threadsafe_queue.h | |
| parent | bounded_threadsafe_queue: Deduplicate and add PushModes (diff) | |
| download | yuzu-8c56481249ed1bc0b46bca3aec0c7e86495c5d3a.tar.gz yuzu-8c56481249ed1bc0b46bca3aec0c7e86495c5d3a.tar.xz yuzu-8c56481249ed1bc0b46bca3aec0c7e86495c5d3a.zip | |
bounded_threadsafe_queue: Add producer cv to avoid busy waiting
Diffstat (limited to '')
| -rw-r--r-- | src/common/bounded_threadsafe_queue.h | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 975215863..0fb2f42d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h | |||
| @@ -45,12 +45,12 @@ public: | |||
| 45 | } | 45 | } |
| 46 | 46 | ||
| 47 | void PopWait(T& t, std::stop_token stop_token) { | 47 | void PopWait(T& t, std::stop_token stop_token) { |
| 48 | Wait(stop_token); | 48 | ConsumerWait(stop_token); |
| 49 | Pop(t); | 49 | Pop(t); |
| 50 | } | 50 | } |
| 51 | 51 | ||
| 52 | T PopWait(std::stop_token stop_token) { | 52 | T PopWait(std::stop_token stop_token) { |
| 53 | Wait(stop_token); | 53 | ConsumerWait(stop_token); |
| 54 | T t; | 54 | T t; |
| 55 | Pop(t); | 55 | Pop(t); |
| 56 | return t; | 56 | return t; |
| @@ -88,9 +88,10 @@ private: | |||
| 88 | } | 88 | } |
| 89 | } else if constexpr (Mode == PushMode::Wait) { | 89 | } else if constexpr (Mode == PushMode::Wait) { |
| 90 | // Wait until we have free slots to write to. | 90 | // Wait until we have free slots to write to. |
| 91 | while ((write_index - m_read_index.load()) == Capacity) { | 91 | std::unique_lock lock{producer_cv_mutex}; |
| 92 | std::this_thread::yield(); | 92 | producer_cv.wait(lock, [this, write_index] { |
| 93 | } | 93 | return (write_index - m_read_index.load()) < Capacity; |
| 94 | }); | ||
| 94 | } else { | 95 | } else { |
| 95 | static_assert(Mode < PushMode::Count, "Invalid PushMode."); | 96 | static_assert(Mode < PushMode::Count, "Invalid PushMode."); |
| 96 | } | 97 | } |
| @@ -105,8 +106,8 @@ private: | |||
| 105 | ++m_write_index; | 106 | ++m_write_index; |
| 106 | 107 | ||
| 107 | // Notify the consumer that we have pushed into the queue. | 108 | // Notify the consumer that we have pushed into the queue. |
| 108 | std::scoped_lock lock{cv_mutex}; | 109 | std::scoped_lock lock{consumer_cv_mutex}; |
| 109 | cv.notify_one(); | 110 | consumer_cv.notify_one(); |
| 110 | 111 | ||
| 111 | return true; | 112 | return true; |
| 112 | } | 113 | } |
| @@ -122,9 +123,10 @@ private: | |||
| 122 | } | 123 | } |
| 123 | } else if constexpr (Mode == PushMode::Wait) { | 124 | } else if constexpr (Mode == PushMode::Wait) { |
| 124 | // Wait until we have free slots to write to. | 125 | // Wait until we have free slots to write to. |
| 125 | while ((write_index - m_read_index.load()) == Capacity) { | 126 | std::unique_lock lock{producer_cv_mutex}; |
| 126 | std::this_thread::yield(); | 127 | producer_cv.wait(lock, [this, write_index] { |
| 127 | } | 128 | return (write_index - m_read_index.load()) < Capacity; |
| 129 | }); | ||
| 128 | } else { | 130 | } else { |
| 129 | static_assert(Mode < PushMode::Count, "Invalid PushMode."); | 131 | static_assert(Mode < PushMode::Count, "Invalid PushMode."); |
| 130 | } | 132 | } |
| @@ -139,8 +141,8 @@ private: | |||
| 139 | ++m_write_index; | 141 | ++m_write_index; |
| 140 | 142 | ||
| 141 | // Notify the consumer that we have pushed into the queue. | 143 | // Notify the consumer that we have pushed into the queue. |
| 142 | std::scoped_lock lock{cv_mutex}; | 144 | std::scoped_lock lock{consumer_cv_mutex}; |
| 143 | cv.notify_one(); | 145 | consumer_cv.notify_one(); |
| 144 | 146 | ||
| 145 | return true; | 147 | return true; |
| 146 | } | 148 | } |
| @@ -161,6 +163,10 @@ private: | |||
| 161 | 163 | ||
| 162 | // Increment the read index. | 164 | // Increment the read index. |
| 163 | ++m_read_index; | 165 | ++m_read_index; |
| 166 | |||
| 167 | // Notify the producer that we have popped off the queue. | ||
| 168 | std::unique_lock lock{producer_cv_mutex}; | ||
| 169 | producer_cv.notify_one(); | ||
| 164 | } | 170 | } |
| 165 | 171 | ||
| 166 | bool Pop(T& t) { | 172 | bool Pop(T& t) { |
| @@ -180,12 +186,16 @@ private: | |||
| 180 | // Increment the read index. | 186 | // Increment the read index. |
| 181 | ++m_read_index; | 187 | ++m_read_index; |
| 182 | 188 | ||
| 189 | // Notify the producer that we have popped off the queue. | ||
| 190 | std::scoped_lock lock{producer_cv_mutex}; | ||
| 191 | producer_cv.notify_one(); | ||
| 192 | |||
| 183 | return true; | 193 | return true; |
| 184 | } | 194 | } |
| 185 | 195 | ||
| 186 | void Wait(std::stop_token stop_token) { | 196 | void ConsumerWait(std::stop_token stop_token) { |
| 187 | std::unique_lock lock{cv_mutex}; | 197 | std::unique_lock lock{consumer_cv_mutex}; |
| 188 | Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); | 198 | Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); |
| 189 | } | 199 | } |
| 190 | 200 | ||
| 191 | alignas(128) std::atomic_size_t m_read_index{0}; | 201 | alignas(128) std::atomic_size_t m_read_index{0}; |
| @@ -193,8 +203,10 @@ private: | |||
| 193 | 203 | ||
| 194 | std::array<T, Capacity> m_data; | 204 | std::array<T, Capacity> m_data; |
| 195 | 205 | ||
| 196 | std::condition_variable_any cv; | 206 | std::condition_variable_any producer_cv; |
| 197 | std::mutex cv_mutex; | 207 | std::mutex producer_cv_mutex; |
| 208 | std::condition_variable_any consumer_cv; | ||
| 209 | std::mutex consumer_cv_mutex; | ||
| 198 | }; | 210 | }; |
| 199 | 211 | ||
| 200 | template <typename T, size_t Capacity = detail::DefaultCapacity> | 212 | template <typename T, size_t Capacity = detail::DefaultCapacity> |