author    Morph  2023-03-19 03:19:25 -0400
committer Morph  2023-03-21 19:17:32 -0400
commit    306840a5808cae10bf5d91e4b6e8a91cd619386b (patch)
tree      22d42022abbf8ab522ee046919920135d86be42c
parent    Merge pull request #9970 from bunnei/string-util-view (diff)
download  yuzu-306840a5808cae10bf5d91e4b6e8a91cd619386b.tar.gz
          yuzu-306840a5808cae10bf5d91e4b6e8a91cd619386b.tar.xz
          yuzu-306840a5808cae10bf5d91e4b6e8a91cd619386b.zip
bounded_threadsafe_queue: Use simplified impl of bounded queue
Provides a simplified SPSC, MPSC, and MPMC bounded queue implementation using mutexes.
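For orientation, this is roughly how the new interface is meant to be consumed: a minimal sketch, assuming the queue is drained by a std::jthread whose stop_token is forwarded to PopWait. The element type and values are illustrative, not part of the patch.

    // Illustrative sketch (requires <thread> and "common/bounded_threadsafe_queue.h"):
    Common::MPSCQueue<int> queue;
    std::jthread consumer{[&queue](std::stop_token stop_token) {
        while (!stop_token.stop_requested()) {
            // Blocks until an element arrives or a stop is requested.
            const int value = queue.PopWait(stop_token);
            if (stop_token.stop_requested()) {
                break;
            }
            // ... process value ...
        }
    }};
    queue.Push(42); // any number of producers may Push concurrently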
-rw-r--r--  src/common/bounded_threadsafe_queue.h  | 311
-rw-r--r--  src/video_core/gpu_thread.cpp           |   7
2 files changed, 203 insertions, 115 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 14e887c70..e03427539 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -1,159 +1,246 @@
-// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
-// SPDX-License-Identifier: MIT
+// SPDX-FileCopyrightText: Copyright 2023 yuzu Emulator Project
+// SPDX-License-Identifier: GPL-2.0-or-later
 
 #pragma once
 
 #include <atomic>
-#include <bit>
 #include <condition_variable>
-#include <memory>
+#include <cstddef>
 #include <mutex>
 #include <new>
-#include <type_traits>
-#include <utility>
 
 #include "common/polyfill_thread.h"
 
 namespace Common {
 
-#if defined(__cpp_lib_hardware_interference_size)
-constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size;
-#else
-constexpr size_t hardware_interference_size = 64;
-#endif
+namespace detail {
+constexpr size_t DefaultCapacity = 0x1000;
+} // namespace detail
+
+template <typename T, size_t Capacity = detail::DefaultCapacity>
+class SPSCQueue {
+    static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
 
-template <typename T, size_t capacity = 0x400>
-class MPSCQueue {
 public:
-    explicit MPSCQueue() : allocator{std::allocator<Slot<T>>()} {
-        // Allocate one extra slot to prevent false sharing on the last slot
-        slots = allocator.allocate(capacity + 1);
-        // Allocators are not required to honor alignment for over-aligned types
-        // (see http://eel.is/c++draft/allocator.requirements#10) so we verify
-        // alignment here
-        if (reinterpret_cast<uintptr_t>(slots) % alignof(Slot<T>) != 0) {
-            allocator.deallocate(slots, capacity + 1);
-            throw std::bad_alloc();
-        }
-        for (size_t i = 0; i < capacity; ++i) {
-            std::construct_at(&slots[i]);
-        }
-        static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2");
-        static_assert(alignof(Slot<T>) == hardware_interference_size,
-                      "Slot must be aligned to cache line boundary to prevent false sharing");
-        static_assert(sizeof(Slot<T>) % hardware_interference_size == 0,
-                      "Slot size must be a multiple of cache line size to prevent "
-                      "false sharing between adjacent slots");
-        static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0,
-                      "Queue size must be a multiple of cache line size to "
-                      "prevent false sharing between adjacent queues");
-    }
-
-    ~MPSCQueue() noexcept {
-        for (size_t i = 0; i < capacity; ++i) {
-            std::destroy_at(&slots[i]);
+    void Push(T&& t) {
+        const size_t write_index = m_write_index.load();
+
+        // Wait until we have free slots to write to.
+        while ((write_index - m_read_index.load()) == Capacity) {
+            std::this_thread::yield();
         }
-        allocator.deallocate(slots, capacity + 1);
+
+        // Determine the position to write to.
+        const size_t pos = write_index % Capacity;
+
+        // Push into the queue.
+        m_data[pos] = std::move(t);
+
+        // Increment the write index.
+        ++m_write_index;
+
+        // Notify the consumer that we have pushed into the queue.
+        std::scoped_lock lock{cv_mutex};
+        cv.notify_one();
     }
 
-    // The queue must be both non-copyable and non-movable
-    MPSCQueue(const MPSCQueue&) = delete;
-    MPSCQueue& operator=(const MPSCQueue&) = delete;
+    template <typename... Args>
+    void Push(Args&&... args) {
+        const size_t write_index = m_write_index.load();
+
+        // Wait until we have free slots to write to.
+        while ((write_index - m_read_index.load()) == Capacity) {
+            std::this_thread::yield();
+        }
+
+        // Determine the position to write to.
+        const size_t pos = write_index % Capacity;
+
+        // Emplace into the queue.
+        std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
+
+        // Increment the write index.
+        ++m_write_index;
+
+        // Notify the consumer that we have pushed into the queue.
+        std::scoped_lock lock{cv_mutex};
+        cv.notify_one();
+    }
 
-    MPSCQueue(MPSCQueue&&) = delete;
-    MPSCQueue& operator=(MPSCQueue&&) = delete;
+    bool TryPop(T& t) {
+        return Pop(t);
+    }
 
-    void Push(const T& v) noexcept {
-        static_assert(std::is_nothrow_copy_constructible_v<T>,
-                      "T must be nothrow copy constructible");
-        emplace(v);
+    void PopWait(T& t, std::stop_token stop_token) {
+        Wait(stop_token);
+        Pop(t);
     }
 
-    template <typename P, typename = std::enable_if_t<std::is_nothrow_constructible_v<T, P&&>>>
-    void Push(P&& v) noexcept {
-        emplace(std::forward<P>(v));
+    T PopWait(std::stop_token stop_token) {
+        Wait(stop_token);
+        T t;
+        Pop(t);
+        return t;
     }
 
-    void Pop(T& v, std::stop_token stop) noexcept {
-        auto const tail = tail_.fetch_add(1);
-        auto& slot = slots[idx(tail)];
-        if (!slot.turn.test()) {
-            std::unique_lock lock{cv_mutex};
-            Common::CondvarWait(cv, lock, stop, [&slot] { return slot.turn.test(); });
+    void Clear() {
+        while (!Empty()) {
+            Pop();
         }
-        v = slot.move();
-        slot.destroy();
-        slot.turn.clear();
-        slot.turn.notify_one();
+    }
+
+    bool Empty() const {
+        return m_read_index.load() == m_write_index.load();
+    }
+
+    size_t Size() const {
+        return m_write_index.load() - m_read_index.load();
     }
 
 private:
-    template <typename U = T>
-    struct Slot {
-        ~Slot() noexcept {
-            if (turn.test()) {
-                destroy();
-            }
-        }
+    void Pop() {
+        const size_t read_index = m_read_index.load();
 
-        template <typename... Args>
-        void construct(Args&&... args) noexcept {
-            static_assert(std::is_nothrow_constructible_v<U, Args&&...>,
-                          "T must be nothrow constructible with Args&&...");
-            std::construct_at(reinterpret_cast<U*>(&storage), std::forward<Args>(args)...);
+        // Check if the queue is empty.
+        if (read_index == m_write_index.load()) {
+            return;
         }
 
-        void destroy() noexcept {
-            static_assert(std::is_nothrow_destructible_v<U>, "T must be nothrow destructible");
-            std::destroy_at(reinterpret_cast<U*>(&storage));
-        }
+        // Determine the position to read from.
+        const size_t pos = read_index % Capacity;
+
+        // Pop the data off the queue, deleting it.
+        std::destroy_at(std::addressof(m_data[pos]));
+
+        // Increment the read index.
+        ++m_read_index;
+    }
 
-        U&& move() noexcept {
-            return reinterpret_cast<U&&>(storage);
+    bool Pop(T& t) {
+        const size_t read_index = m_read_index.load();
+
+        // Check if the queue is empty.
+        if (read_index == m_write_index.load()) {
+            return false;
         }
 
-        // Align to avoid false sharing between adjacent slots
-        alignas(hardware_interference_size) std::atomic_flag turn{};
-        struct aligned_store {
-            struct type {
-                alignas(U) unsigned char data[sizeof(U)];
-            };
-        };
-        typename aligned_store::type storage;
-    };
+        // Determine the position to read from.
+        const size_t pos = read_index % Capacity;
 
-    template <typename... Args>
-    void emplace(Args&&... args) noexcept {
-        static_assert(std::is_nothrow_constructible_v<T, Args&&...>,
-                      "T must be nothrow constructible with Args&&...");
-        auto const head = head_.fetch_add(1);
-        auto& slot = slots[idx(head)];
-        slot.turn.wait(true);
-        slot.construct(std::forward<Args>(args)...);
-        slot.turn.test_and_set();
-        cv.notify_one();
+        // Pop the data off the queue, moving it.
+        t = std::move(m_data[pos]);
+
+        // Increment the read index.
+        ++m_read_index;
+
+        return true;
     }
 
-    constexpr size_t idx(size_t i) const noexcept {
-        return i & mask;
+    void Wait(std::stop_token stop_token) {
+        std::unique_lock lock{cv_mutex};
+        Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); });
     }
 
-    static constexpr size_t mask = capacity - 1;
+    alignas(128) std::atomic_size_t m_read_index{0};
+    alignas(128) std::atomic_size_t m_write_index{0};
 
-    // Align to avoid false sharing between head_ and tail_
-    alignas(hardware_interference_size) std::atomic<size_t> head_{0};
-    alignas(hardware_interference_size) std::atomic<size_t> tail_{0};
+    std::array<T, Capacity> m_data;
 
-    std::mutex cv_mutex;
     std::condition_variable_any cv;
+    std::mutex cv_mutex;
+};
+
+template <typename T, size_t Capacity = detail::DefaultCapacity>
+class MPSCQueue {
+public:
+    void Push(T&& t) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::move(t));
+    }
+
+    template <typename... Args>
+    void Push(Args&&... args) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::forward<Args>(args)...);
+    }
+
+    bool TryPop(T& t) {
+        return spsc_queue.TryPop(t);
+    }
+
+    void PopWait(T& t, std::stop_token stop_token) {
+        spsc_queue.PopWait(t, stop_token);
+    }
+
+    T PopWait(std::stop_token stop_token) {
+        return spsc_queue.PopWait(stop_token);
+    }
+
+    void Clear() {
+        spsc_queue.Clear();
+    }
+
+    bool Empty() {
+        return spsc_queue.Empty();
+    }
+
+    size_t Size() {
+        return spsc_queue.Size();
+    }
+
+private:
+    SPSCQueue<T, Capacity> spsc_queue;
+    std::mutex write_mutex;
+};
+
+template <typename T, size_t Capacity = detail::DefaultCapacity>
+class MPMCQueue {
+public:
+    void Push(T&& t) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::move(t));
+    }
 
-    Slot<T>* slots;
-    [[no_unique_address]] std::allocator<Slot<T>> allocator;
+    template <typename... Args>
+    void Push(Args&&... args) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::forward<Args>(args)...);
+    }
 
-    static_assert(std::is_nothrow_copy_assignable_v<T> || std::is_nothrow_move_assignable_v<T>,
-                  "T must be nothrow copy or move assignable");
+    bool TryPop(T& t) {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.TryPop(t);
+    }
+
+    void PopWait(T& t, std::stop_token stop_token) {
+        std::scoped_lock lock{read_mutex};
+        spsc_queue.PopWait(t, stop_token);
+    }
 
-    static_assert(std::is_nothrow_destructible_v<T>, "T must be nothrow destructible");
+    T PopWait(std::stop_token stop_token) {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.PopWait(stop_token);
+    }
+
+    void Clear() {
+        std::scoped_lock lock{read_mutex};
+        spsc_queue.Clear();
+    }
+
+    bool Empty() {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.Empty();
+    }
+
+    size_t Size() {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.Size();
+    }
+
+private:
+    SPSCQueue<T, Capacity> spsc_queue;
+    std::mutex write_mutex;
+    std::mutex read_mutex;
 };
 
 } // namespace Common
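A note on the indexing scheme above: the read/write counters are never masked when stored, so empty is read == write and full is write - read == Capacity, which stays correct even across unsigned wraparound of the 64-bit counters. And because the static_assert pins Capacity to a power of two, the compiler can reduce index % Capacity to a bitwise AND. A toy check of that identity (the numbers here are arbitrary):

    // With Capacity = 8, write_index = 10, read_index = 3:
    //   Size() == 10 - 3 == 7, and the next write slot is 10 % 8 == 2.
    static_assert(10 % 8 == (10 & (8 - 1))); // modulo == mask for powers of two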
diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp
index f52f9e28f..469a59cf9 100644
--- a/src/video_core/gpu_thread.cpp
+++ b/src/video_core/gpu_thread.cpp
@@ -31,9 +31,10 @@ static void RunThread(std::stop_token stop_token, Core::System& system,
     auto current_context = context.Acquire();
     VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer();
 
+    CommandDataContainer next;
+
     while (!stop_token.stop_requested()) {
-        CommandDataContainer next;
-        state.queue.Pop(next, stop_token);
+        state.queue.PopWait(next, stop_token);
         if (stop_token.stop_requested()) {
             break;
         }
@@ -117,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) {
 
     std::unique_lock lk(state.write_lock);
     const u64 fence{++state.last_fence};
-    state.queue.Push(CommandDataContainer(std::move(command_data), fence, block));
+    state.queue.Push(std::move(command_data), fence, block);
 
     if (block) {
         Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] {
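The gpu_thread.cpp changes track the new interface: `next` is hoisted out of the loop so the container is reused across iterations, Pop becomes the stop_token-aware PopWait, and PushCommand now passes the container's constructor arguments through, letting the variadic Push construct the CommandDataContainer in place in the queue's storage instead of moving a temporary. Restating the call-site difference from the hunk above:

    // Old: a temporary CommandDataContainer is built, then moved into the slot.
    state.queue.Push(CommandDataContainer(std::move(command_data), fence, block));
    // New: the variadic Push forwards the arguments to std::construct_at,
    // which builds the container directly in the queue's slot.
    state.queue.Push(std::move(command_data), fence, block);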