summaryrefslogtreecommitdiff
path: root/src/common/bounded_threadsafe_queue.h
diff options
context:
space:
mode:
authorGravatar Morph2023-03-19 14:48:01 -0400
committerGravatar Morph2023-03-21 22:33:57 -0400
commit8c56481249ed1bc0b46bca3aec0c7e86495c5d3a (patch)
tree8b78335fec7ed51c821b7af37c4cb738019909f7 /src/common/bounded_threadsafe_queue.h
parentbounded_threadsafe_queue: Deduplicate and add PushModes (diff)
downloadyuzu-8c56481249ed1bc0b46bca3aec0c7e86495c5d3a.tar.gz
yuzu-8c56481249ed1bc0b46bca3aec0c7e86495c5d3a.tar.xz
yuzu-8c56481249ed1bc0b46bca3aec0c7e86495c5d3a.zip
bounded_threadsafe_queue: Add producer cv to avoid busy waiting
Diffstat (limited to '')
-rw-r--r--src/common/bounded_threadsafe_queue.h46
1 files changed, 29 insertions, 17 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 975215863..0fb2f42d1 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -45,12 +45,12 @@ public:
45 } 45 }
46 46
47 void PopWait(T& t, std::stop_token stop_token) { 47 void PopWait(T& t, std::stop_token stop_token) {
48 Wait(stop_token); 48 ConsumerWait(stop_token);
49 Pop(t); 49 Pop(t);
50 } 50 }
51 51
52 T PopWait(std::stop_token stop_token) { 52 T PopWait(std::stop_token stop_token) {
53 Wait(stop_token); 53 ConsumerWait(stop_token);
54 T t; 54 T t;
55 Pop(t); 55 Pop(t);
56 return t; 56 return t;
@@ -88,9 +88,10 @@ private:
88 } 88 }
89 } else if constexpr (Mode == PushMode::Wait) { 89 } else if constexpr (Mode == PushMode::Wait) {
90 // Wait until we have free slots to write to. 90 // Wait until we have free slots to write to.
91 while ((write_index - m_read_index.load()) == Capacity) { 91 std::unique_lock lock{producer_cv_mutex};
92 std::this_thread::yield(); 92 producer_cv.wait(lock, [this, write_index] {
93 } 93 return (write_index - m_read_index.load()) < Capacity;
94 });
94 } else { 95 } else {
95 static_assert(Mode < PushMode::Count, "Invalid PushMode."); 96 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
96 } 97 }
@@ -105,8 +106,8 @@ private:
105 ++m_write_index; 106 ++m_write_index;
106 107
107 // Notify the consumer that we have pushed into the queue. 108 // Notify the consumer that we have pushed into the queue.
108 std::scoped_lock lock{cv_mutex}; 109 std::scoped_lock lock{consumer_cv_mutex};
109 cv.notify_one(); 110 consumer_cv.notify_one();
110 111
111 return true; 112 return true;
112 } 113 }
@@ -122,9 +123,10 @@ private:
122 } 123 }
123 } else if constexpr (Mode == PushMode::Wait) { 124 } else if constexpr (Mode == PushMode::Wait) {
124 // Wait until we have free slots to write to. 125 // Wait until we have free slots to write to.
125 while ((write_index - m_read_index.load()) == Capacity) { 126 std::unique_lock lock{producer_cv_mutex};
126 std::this_thread::yield(); 127 producer_cv.wait(lock, [this, write_index] {
127 } 128 return (write_index - m_read_index.load()) < Capacity;
129 });
128 } else { 130 } else {
129 static_assert(Mode < PushMode::Count, "Invalid PushMode."); 131 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
130 } 132 }
@@ -139,8 +141,8 @@ private:
139 ++m_write_index; 141 ++m_write_index;
140 142
141 // Notify the consumer that we have pushed into the queue. 143 // Notify the consumer that we have pushed into the queue.
142 std::scoped_lock lock{cv_mutex}; 144 std::scoped_lock lock{consumer_cv_mutex};
143 cv.notify_one(); 145 consumer_cv.notify_one();
144 146
145 return true; 147 return true;
146 } 148 }
@@ -161,6 +163,10 @@ private:
161 163
162 // Increment the read index. 164 // Increment the read index.
163 ++m_read_index; 165 ++m_read_index;
166
167 // Notify the producer that we have popped off the queue.
168 std::unique_lock lock{producer_cv_mutex};
169 producer_cv.notify_one();
164 } 170 }
165 171
166 bool Pop(T& t) { 172 bool Pop(T& t) {
@@ -180,12 +186,16 @@ private:
180 // Increment the read index. 186 // Increment the read index.
181 ++m_read_index; 187 ++m_read_index;
182 188
189 // Notify the producer that we have popped off the queue.
190 std::scoped_lock lock{producer_cv_mutex};
191 producer_cv.notify_one();
192
183 return true; 193 return true;
184 } 194 }
185 195
186 void Wait(std::stop_token stop_token) { 196 void ConsumerWait(std::stop_token stop_token) {
187 std::unique_lock lock{cv_mutex}; 197 std::unique_lock lock{consumer_cv_mutex};
188 Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); 198 Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); });
189 } 199 }
190 200
191 alignas(128) std::atomic_size_t m_read_index{0}; 201 alignas(128) std::atomic_size_t m_read_index{0};
@@ -193,8 +203,10 @@ private:
193 203
194 std::array<T, Capacity> m_data; 204 std::array<T, Capacity> m_data;
195 205
196 std::condition_variable_any cv; 206 std::condition_variable_any producer_cv;
197 std::mutex cv_mutex; 207 std::mutex producer_cv_mutex;
208 std::condition_variable_any consumer_cv;
209 std::mutex consumer_cv_mutex;
198}; 210};
199 211
200template <typename T, size_t Capacity = detail::DefaultCapacity> 212template <typename T, size_t Capacity = detail::DefaultCapacity>