summaryrefslogtreecommitdiff
path: root/src/common/bounded_threadsafe_queue.h
diff options
context:
space:
mode:
authorGravatar Morph2023-03-19 14:24:18 -0400
committerGravatar Morph2023-03-21 19:20:21 -0400
commit407dc917f170cc9d08f3f1f9bdeb9e44ddebc653 (patch)
tree6275a084c767fd96e4d9ccbedc117d3fb43b7e68 /src/common/bounded_threadsafe_queue.h
parentbounded_threadsafe_queue: Add TryPush (diff)
downloadyuzu-407dc917f170cc9d08f3f1f9bdeb9e44ddebc653.tar.gz
yuzu-407dc917f170cc9d08f3f1f9bdeb9e44ddebc653.tar.xz
yuzu-407dc917f170cc9d08f3f1f9bdeb9e44ddebc653.zip
bounded_threadsafe_queue: Deduplicate and add PushModes
Adds the PushModes Try and Wait to allow producers to specify how they want to push their data to the queue if the queue is full. If the queue is full: - Try will fail to push to the queue, returning false. Try only returns true if it successfully pushes to the queue. This may result in items not being pushed into the queue. - Wait will wait until a slot is available to push to the queue, resulting in potential for deadlock if a consumer is not running.
Diffstat (limited to 'src/common/bounded_threadsafe_queue.h')
-rw-r--r--src/common/bounded_threadsafe_queue.h170
1 files changed, 84 insertions, 86 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index eb88cc1d1..975215863 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -23,60 +23,76 @@ class SPSCQueue {
23 23
24public: 24public:
25 bool TryPush(T&& t) { 25 bool TryPush(T&& t) {
26 const size_t write_index = m_write_index.load(); 26 return Push<PushMode::Try>(std::move(t));
27 27 }
28 // Check if we have free slots to write to.
29 if ((write_index - m_read_index.load()) == Capacity) {
30 return false;
31 }
32
33 // Determine the position to write to.
34 const size_t pos = write_index % Capacity;
35
36 // Push into the queue.
37 m_data[pos] = std::move(t);
38
39 // Increment the write index.
40 ++m_write_index;
41 28
42 // Notify the consumer that we have pushed into the queue. 29 template <typename... Args>
43 std::scoped_lock lock{cv_mutex}; 30 bool TryEmplace(Args&&... args) {
44 cv.notify_one(); 31 return Emplace<PushMode::Try>(std::forward<Args>(args)...);
32 }
45 33
46 return true; 34 void PushWait(T&& t) {
35 Push<PushMode::Wait>(std::move(t));
47 } 36 }
48 37
49 template <typename... Args> 38 template <typename... Args>
50 bool TryPush(Args&&... args) { 39 void EmplaceWait(Args&&... args) {
51 const size_t write_index = m_write_index.load(); 40 Emplace<PushMode::Wait>(std::forward<Args>(args)...);
41 }
52 42
53 // Check if we have free slots to write to. 43 bool TryPop(T& t) {
54 if ((write_index - m_read_index.load()) == Capacity) { 44 return Pop(t);
55 return false; 45 }
56 }
57 46
58 // Determine the position to write to. 47 void PopWait(T& t, std::stop_token stop_token) {
59 const size_t pos = write_index % Capacity; 48 Wait(stop_token);
49 Pop(t);
50 }
60 51
61 // Emplace into the queue. 52 T PopWait(std::stop_token stop_token) {
62 std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...); 53 Wait(stop_token);
54 T t;
55 Pop(t);
56 return t;
57 }
63 58
64 // Increment the write index. 59 void Clear() {
65 ++m_write_index; 60 while (!Empty()) {
61 Pop();
62 }
63 }
66 64
67 // Notify the consumer that we have pushed into the queue. 65 bool Empty() const {
68 std::scoped_lock lock{cv_mutex}; 66 return m_read_index.load() == m_write_index.load();
69 cv.notify_one(); 67 }
70 68
71 return true; 69 size_t Size() const {
70 return m_write_index.load() - m_read_index.load();
72 } 71 }
73 72
74 void Push(T&& t) { 73private:
74 enum class PushMode {
75 Try,
76 Wait,
77 Count,
78 };
79
80 template <PushMode Mode>
81 bool Push(T&& t) {
75 const size_t write_index = m_write_index.load(); 82 const size_t write_index = m_write_index.load();
76 83
77 // Wait until we have free slots to write to. 84 if constexpr (Mode == PushMode::Try) {
78 while ((write_index - m_read_index.load()) == Capacity) { 85 // Check if we have free slots to write to.
79 std::this_thread::yield(); 86 if ((write_index - m_read_index.load()) == Capacity) {
87 return false;
88 }
89 } else if constexpr (Mode == PushMode::Wait) {
90 // Wait until we have free slots to write to.
91 while ((write_index - m_read_index.load()) == Capacity) {
92 std::this_thread::yield();
93 }
94 } else {
95 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
80 } 96 }
81 97
82 // Determine the position to write to. 98 // Determine the position to write to.
@@ -91,15 +107,26 @@ public:
91 // Notify the consumer that we have pushed into the queue. 107 // Notify the consumer that we have pushed into the queue.
92 std::scoped_lock lock{cv_mutex}; 108 std::scoped_lock lock{cv_mutex};
93 cv.notify_one(); 109 cv.notify_one();
110
111 return true;
94 } 112 }
95 113
96 template <typename... Args> 114 template <PushMode Mode, typename... Args>
97 void Push(Args&&... args) { 115 bool Emplace(Args&&... args) {
98 const size_t write_index = m_write_index.load(); 116 const size_t write_index = m_write_index.load();
99 117
100 // Wait until we have free slots to write to. 118 if constexpr (Mode == PushMode::Try) {
101 while ((write_index - m_read_index.load()) == Capacity) { 119 // Check if we have free slots to write to.
102 std::this_thread::yield(); 120 if ((write_index - m_read_index.load()) == Capacity) {
121 return false;
122 }
123 } else if constexpr (Mode == PushMode::Wait) {
124 // Wait until we have free slots to write to.
125 while ((write_index - m_read_index.load()) == Capacity) {
126 std::this_thread::yield();
127 }
128 } else {
129 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
103 } 130 }
104 131
105 // Determine the position to write to. 132 // Determine the position to write to.
@@ -114,39 +141,10 @@ public:
114 // Notify the consumer that we have pushed into the queue. 141 // Notify the consumer that we have pushed into the queue.
115 std::scoped_lock lock{cv_mutex}; 142 std::scoped_lock lock{cv_mutex};
116 cv.notify_one(); 143 cv.notify_one();
117 }
118
119 bool TryPop(T& t) {
120 return Pop(t);
121 }
122
123 void PopWait(T& t, std::stop_token stop_token) {
124 Wait(stop_token);
125 Pop(t);
126 }
127
128 T PopWait(std::stop_token stop_token) {
129 Wait(stop_token);
130 T t;
131 Pop(t);
132 return t;
133 }
134 144
135 void Clear() { 145 return true;
136 while (!Empty()) {
137 Pop();
138 }
139 }
140
141 bool Empty() const {
142 return m_read_index.load() == m_write_index.load();
143 }
144
145 size_t Size() const {
146 return m_write_index.load() - m_read_index.load();
147 } 146 }
148 147
149private:
150 void Pop() { 148 void Pop() {
151 const size_t read_index = m_read_index.load(); 149 const size_t read_index = m_read_index.load();
152 150
@@ -208,20 +206,20 @@ public:
208 } 206 }
209 207
210 template <typename... Args> 208 template <typename... Args>
211 bool TryPush(Args&&... args) { 209 bool TryEmplace(Args&&... args) {
212 std::scoped_lock lock{write_mutex}; 210 std::scoped_lock lock{write_mutex};
213 return spsc_queue.TryPush(std::forward<Args>(args)...); 211 return spsc_queue.TryEmplace(std::forward<Args>(args)...);
214 } 212 }
215 213
216 void Push(T&& t) { 214 void PushWait(T&& t) {
217 std::scoped_lock lock{write_mutex}; 215 std::scoped_lock lock{write_mutex};
218 spsc_queue.Push(std::move(t)); 216 spsc_queue.PushWait(std::move(t));
219 } 217 }
220 218
221 template <typename... Args> 219 template <typename... Args>
222 void Push(Args&&... args) { 220 void EmplaceWait(Args&&... args) {
223 std::scoped_lock lock{write_mutex}; 221 std::scoped_lock lock{write_mutex};
224 spsc_queue.Push(std::forward<Args>(args)...); 222 spsc_queue.EmplaceWait(std::forward<Args>(args)...);
225 } 223 }
226 224
227 bool TryPop(T& t) { 225 bool TryPop(T& t) {
@@ -262,20 +260,20 @@ public:
262 } 260 }
263 261
264 template <typename... Args> 262 template <typename... Args>
265 bool TryPush(Args&&... args) { 263 bool TryEmplace(Args&&... args) {
266 std::scoped_lock lock{write_mutex}; 264 std::scoped_lock lock{write_mutex};
267 return spsc_queue.TryPush(std::forward<Args>(args)...); 265 return spsc_queue.TryEmplace(std::forward<Args>(args)...);
268 } 266 }
269 267
270 void Push(T&& t) { 268 void PushWait(T&& t) {
271 std::scoped_lock lock{write_mutex}; 269 std::scoped_lock lock{write_mutex};
272 spsc_queue.Push(std::move(t)); 270 spsc_queue.PushWait(std::move(t));
273 } 271 }
274 272
275 template <typename... Args> 273 template <typename... Args>
276 void Push(Args&&... args) { 274 void EmplaceWait(Args&&... args) {
277 std::scoped_lock lock{write_mutex}; 275 std::scoped_lock lock{write_mutex};
278 spsc_queue.Push(std::forward<Args>(args)...); 276 spsc_queue.EmplaceWait(std::forward<Args>(args)...);
279 } 277 }
280 278
281 bool TryPop(T& t) { 279 bool TryPop(T& t) {