summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/common/bounded_threadsafe_queue.h 316
-rw-r--r-- src/common/logging/backend.cpp 16
-rw-r--r-- src/video_core/gpu_thread.cpp 7
3 files changed, 215 insertions, 124 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 14e887c70..bd87aa09b 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -1,159 +1,249 @@
1// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se> 1// SPDX-FileCopyrightText: Copyright 2023 yuzu Emulator Project
2// SPDX-License-Identifier: MIT 2// SPDX-License-Identifier: GPL-2.0-or-later
3 3
4#pragma once 4#pragma once
5 5
6#include <atomic> 6#include <atomic>
7#include <bit>
8#include <condition_variable> 7#include <condition_variable>
9#include <memory> 8#include <cstddef>
10#include <mutex> 9#include <mutex>
11#include <new> 10#include <new>
12#include <type_traits>
13#include <utility>
14 11
15#include "common/polyfill_thread.h" 12#include "common/polyfill_thread.h"
16 13
17namespace Common { 14namespace Common {
18 15
19#if defined(__cpp_lib_hardware_interference_size) 16namespace detail {
20constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size; 17constexpr size_t DefaultCapacity = 0x1000;
21#else 18} // namespace detail
22constexpr size_t hardware_interference_size = 64; 19
23#endif 20template <typename T, size_t Capacity = detail::DefaultCapacity>
21class SPSCQueue {
22 static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
24 23
25template <typename T, size_t capacity = 0x400>
26class MPSCQueue {
27public: 24public:
28 explicit MPSCQueue() : allocator{std::allocator<Slot<T>>()} { 25 template <typename... Args>
29 // Allocate one extra slot to prevent false sharing on the last slot 26 bool TryEmplace(Args&&... args) {
30 slots = allocator.allocate(capacity + 1); 27 return Emplace<PushMode::Try>(std::forward<Args>(args)...);
31 // Allocators are not required to honor alignment for over-aligned types
32 // (see http://eel.is/c++draft/allocator.requirements#10) so we verify
33 // alignment here
34 if (reinterpret_cast<uintptr_t>(slots) % alignof(Slot<T>) != 0) {
35 allocator.deallocate(slots, capacity + 1);
36 throw std::bad_alloc();
37 }
38 for (size_t i = 0; i < capacity; ++i) {
39 std::construct_at(&slots[i]);
40 }
41 static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2");
42 static_assert(alignof(Slot<T>) == hardware_interference_size,
43 "Slot must be aligned to cache line boundary to prevent false sharing");
44 static_assert(sizeof(Slot<T>) % hardware_interference_size == 0,
45 "Slot size must be a multiple of cache line size to prevent "
46 "false sharing between adjacent slots");
47 static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0,
48 "Queue size must be a multiple of cache line size to "
49 "prevent false sharing between adjacent queues");
50 }
51
52 ~MPSCQueue() noexcept {
53 for (size_t i = 0; i < capacity; ++i) {
54 std::destroy_at(&slots[i]);
55 }
56 allocator.deallocate(slots, capacity + 1);
57 } 28 }
58 29
59 // The queue must be both non-copyable and non-movable 30 template <typename... Args>
60 MPSCQueue(const MPSCQueue&) = delete; 31 void EmplaceWait(Args&&... args) {
61 MPSCQueue& operator=(const MPSCQueue&) = delete; 32 Emplace<PushMode::Wait>(std::forward<Args>(args)...);
33 }
62 34
63 MPSCQueue(MPSCQueue&&) = delete; 35 bool TryPop(T& t) {
64 MPSCQueue& operator=(MPSCQueue&&) = delete; 36 return Pop<PopMode::Try>(t);
37 }
65 38
66 void Push(const T& v) noexcept { 39 void PopWait(T& t) {
67 static_assert(std::is_nothrow_copy_constructible_v<T>, 40 Pop<PopMode::Wait>(t);
68 "T must be nothrow copy constructible");
69 emplace(v);
70 } 41 }
71 42
72 template <typename P, typename = std::enable_if_t<std::is_nothrow_constructible_v<T, P&&>>> 43 void PopWait(T& t, std::stop_token stop_token) {
73 void Push(P&& v) noexcept { 44 Pop<PopMode::WaitWithStopToken>(t, stop_token);
74 emplace(std::forward<P>(v));
75 } 45 }
76 46
77 void Pop(T& v, std::stop_token stop) noexcept { 47 T PopWait() {
78 auto const tail = tail_.fetch_add(1); 48 T t;
79 auto& slot = slots[idx(tail)]; 49 Pop<PopMode::Wait>(t);
80 if (!slot.turn.test()) { 50 return t;
81 std::unique_lock lock{cv_mutex}; 51 }
82 Common::CondvarWait(cv, lock, stop, [&slot] { return slot.turn.test(); }); 52
83 } 53 T PopWait(std::stop_token stop_token) {
84 v = slot.move(); 54 T t;
85 slot.destroy(); 55 Pop<PopMode::WaitWithStopToken>(t, stop_token);
86 slot.turn.clear(); 56 return t;
87 slot.turn.notify_one();
88 } 57 }
89 58
90private: 59private:
91 template <typename U = T> 60 enum class PushMode {
92 struct Slot { 61 Try,
93 ~Slot() noexcept { 62 Wait,
94 if (turn.test()) { 63 Count,
95 destroy(); 64 };
65
66 enum class PopMode {
67 Try,
68 Wait,
69 WaitWithStopToken,
70 Count,
71 };
72
73 template <PushMode Mode, typename... Args>
74 bool Emplace(Args&&... args) {
75 const size_t write_index = m_write_index.load(std::memory_order::relaxed);
76
77 if constexpr (Mode == PushMode::Try) {
78 // Check if we have free slots to write to.
79 if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) {
80 return false;
96 } 81 }
82 } else if constexpr (Mode == PushMode::Wait) {
83 // Wait until we have free slots to write to.
84 std::unique_lock lock{producer_cv_mutex};
85 producer_cv.wait(lock, [this, write_index] {
86 return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity;
87 });
88 } else {
89 static_assert(Mode < PushMode::Count, "Invalid PushMode.");
97 } 90 }
98 91
99 template <typename... Args> 92 // Determine the position to write to.
100 void construct(Args&&... args) noexcept { 93 const size_t pos = write_index % Capacity;
101 static_assert(std::is_nothrow_constructible_v<U, Args&&...>,
102 "T must be nothrow constructible with Args&&...");
103 std::construct_at(reinterpret_cast<U*>(&storage), std::forward<Args>(args)...);
104 }
105 94
106 void destroy() noexcept { 95 // Emplace into the queue.
107 static_assert(std::is_nothrow_destructible_v<U>, "T must be nothrow destructible"); 96 std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
108 std::destroy_at(reinterpret_cast<U*>(&storage)); 97
109 } 98 // Increment the write index.
99 ++m_write_index;
100
101 // Notify the consumer that we have pushed into the queue.
102 std::scoped_lock lock{consumer_cv_mutex};
103 consumer_cv.notify_one();
110 104
111 U&& move() noexcept { 105 return true;
112 return reinterpret_cast<U&&>(storage); 106 }
107
108 template <PopMode Mode>
109 bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) {
110 const size_t read_index = m_read_index.load(std::memory_order::relaxed);
111
112 if constexpr (Mode == PopMode::Try) {
113 // Check if the queue is empty.
114 if (read_index == m_write_index.load(std::memory_order::acquire)) {
115 return false;
116 }
117 } else if constexpr (Mode == PopMode::Wait) {
118 // Wait until the queue is not empty.
119 std::unique_lock lock{consumer_cv_mutex};
120 consumer_cv.wait(lock, [this, read_index] {
121 return read_index != m_write_index.load(std::memory_order::acquire);
122 });
123 } else if constexpr (Mode == PopMode::WaitWithStopToken) {
124 // Wait until the queue is not empty.
125 std::unique_lock lock{consumer_cv_mutex};
126 Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] {
127 return read_index != m_write_index.load(std::memory_order::acquire);
128 });
129 if (stop_token.stop_requested()) {
130 return false;
131 }
132 } else {
133 static_assert(Mode < PopMode::Count, "Invalid PopMode.");
113 } 134 }
114 135
115 // Align to avoid false sharing between adjacent slots 136 // Determine the position to read from.
116 alignas(hardware_interference_size) std::atomic_flag turn{}; 137 const size_t pos = read_index % Capacity;
117 struct aligned_store { 138
118 struct type { 139 // Pop the data off the queue, moving it.
119 alignas(U) unsigned char data[sizeof(U)]; 140 t = std::move(m_data[pos]);
120 }; 141
121 }; 142 // Increment the read index.
122 typename aligned_store::type storage; 143 ++m_read_index;
123 }; 144
145 // Notify the producer that we have popped off the queue.
146 std::scoped_lock lock{producer_cv_mutex};
147 producer_cv.notify_one();
148
149 return true;
150 }
151
152 alignas(128) std::atomic_size_t m_read_index{0};
153 alignas(128) std::atomic_size_t m_write_index{0};
124 154
155 std::array<T, Capacity> m_data;
156
157 std::condition_variable_any producer_cv;
158 std::mutex producer_cv_mutex;
159 std::condition_variable_any consumer_cv;
160 std::mutex consumer_cv_mutex;
161};
162
163template <typename T, size_t Capacity = detail::DefaultCapacity>
164class MPSCQueue {
165public:
166 template <typename... Args>
167 bool TryEmplace(Args&&... args) {
168 std::scoped_lock lock{write_mutex};
169 return spsc_queue.TryEmplace(std::forward<Args>(args)...);
170 }
171
172 template <typename... Args>
173 void EmplaceWait(Args&&... args) {
174 std::scoped_lock lock{write_mutex};
175 spsc_queue.EmplaceWait(std::forward<Args>(args)...);
176 }
177
178 bool TryPop(T& t) {
179 return spsc_queue.TryPop(t);
180 }
181
182 void PopWait(T& t) {
183 spsc_queue.PopWait(t);
184 }
185
186 void PopWait(T& t, std::stop_token stop_token) {
187 spsc_queue.PopWait(t, stop_token);
188 }
189
190 T PopWait() {
191 return spsc_queue.PopWait();
192 }
193
194 T PopWait(std::stop_token stop_token) {
195 return spsc_queue.PopWait(stop_token);
196 }
197
198private:
199 SPSCQueue<T, Capacity> spsc_queue;
200 std::mutex write_mutex;
201};
202
203template <typename T, size_t Capacity = detail::DefaultCapacity>
204class MPMCQueue {
205public:
125 template <typename... Args> 206 template <typename... Args>
126 void emplace(Args&&... args) noexcept { 207 bool TryEmplace(Args&&... args) {
127 static_assert(std::is_nothrow_constructible_v<T, Args&&...>, 208 std::scoped_lock lock{write_mutex};
128 "T must be nothrow constructible with Args&&..."); 209 return spsc_queue.TryEmplace(std::forward<Args>(args)...);
129 auto const head = head_.fetch_add(1);
130 auto& slot = slots[idx(head)];
131 slot.turn.wait(true);
132 slot.construct(std::forward<Args>(args)...);
133 slot.turn.test_and_set();
134 cv.notify_one();
135 } 210 }
136 211
137 constexpr size_t idx(size_t i) const noexcept { 212 template <typename... Args>
138 return i & mask; 213 void EmplaceWait(Args&&... args) {
214 std::scoped_lock lock{write_mutex};
215 spsc_queue.EmplaceWait(std::forward<Args>(args)...);
139 } 216 }
140 217
141 static constexpr size_t mask = capacity - 1; 218 bool TryPop(T& t) {
219 std::scoped_lock lock{read_mutex};
220 return spsc_queue.TryPop(t);
221 }
142 222
143 // Align to avoid false sharing between head_ and tail_ 223 void PopWait(T& t) {
144 alignas(hardware_interference_size) std::atomic<size_t> head_{0}; 224 std::scoped_lock lock{read_mutex};
145 alignas(hardware_interference_size) std::atomic<size_t> tail_{0}; 225 spsc_queue.PopWait(t);
226 }
146 227
147 std::mutex cv_mutex; 228 void PopWait(T& t, std::stop_token stop_token) {
148 std::condition_variable_any cv; 229 std::scoped_lock lock{read_mutex};
230 spsc_queue.PopWait(t, stop_token);
231 }
149 232
150 Slot<T>* slots; 233 T PopWait() {
151 [[no_unique_address]] std::allocator<Slot<T>> allocator; 234 std::scoped_lock lock{read_mutex};
235 return spsc_queue.PopWait();
236 }
152 237
153 static_assert(std::is_nothrow_copy_assignable_v<T> || std::is_nothrow_move_assignable_v<T>, 238 T PopWait(std::stop_token stop_token) {
154 "T must be nothrow copy or move assignable"); 239 std::scoped_lock lock{read_mutex};
240 return spsc_queue.PopWait(stop_token);
241 }
155 242
156 static_assert(std::is_nothrow_destructible_v<T>, "T must be nothrow destructible"); 243private:
244 SPSCQueue<T, Capacity> spsc_queue;
245 std::mutex write_mutex;
246 std::mutex read_mutex;
157}; 247};
158 248
159} // namespace Common 249} // namespace Common
diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp
index 2a3bded40..f96c7c222 100644
--- a/src/common/logging/backend.cpp
+++ b/src/common/logging/backend.cpp
@@ -28,7 +28,7 @@
28#ifdef _WIN32 28#ifdef _WIN32
29#include "common/string_util.h" 29#include "common/string_util.h"
30#endif 30#endif
31#include "common/threadsafe_queue.h" 31#include "common/bounded_threadsafe_queue.h"
32 32
33namespace Common::Log { 33namespace Common::Log {
34 34
@@ -204,11 +204,11 @@ public:
204 204
205 void PushEntry(Class log_class, Level log_level, const char* filename, unsigned int line_num, 205 void PushEntry(Class log_class, Level log_level, const char* filename, unsigned int line_num,
206 const char* function, std::string&& message) { 206 const char* function, std::string&& message) {
207 if (!filter.CheckMessage(log_class, log_level)) 207 if (!filter.CheckMessage(log_class, log_level)) {
208 return; 208 return;
209 const Entry& entry = 209 }
210 CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)); 210 message_queue.EmplaceWait(
211 message_queue.Push(entry); 211 CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)));
212 } 212 }
213 213
214private: 214private:
@@ -225,7 +225,7 @@ private:
225 ForEachBackend([&entry](Backend& backend) { backend.Write(entry); }); 225 ForEachBackend([&entry](Backend& backend) { backend.Write(entry); });
226 }; 226 };
227 while (!stop_token.stop_requested()) { 227 while (!stop_token.stop_requested()) {
228 entry = message_queue.PopWait(stop_token); 228 message_queue.PopWait(entry, stop_token);
229 if (entry.filename != nullptr) { 229 if (entry.filename != nullptr) {
230 write_logs(); 230 write_logs();
231 } 231 }
@@ -233,7 +233,7 @@ private:
233 // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a 233 // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a
234 // case where a system is repeatedly spamming logs even on close. 234 // case where a system is repeatedly spamming logs even on close.
235 int max_logs_to_write = filter.IsDebug() ? INT_MAX : 100; 235 int max_logs_to_write = filter.IsDebug() ? INT_MAX : 100;
236 while (max_logs_to_write-- && message_queue.Pop(entry)) { 236 while (max_logs_to_write-- && message_queue.TryPop(entry)) {
237 write_logs(); 237 write_logs();
238 } 238 }
239 }); 239 });
@@ -273,7 +273,7 @@ private:
273 ColorConsoleBackend color_console_backend{}; 273 ColorConsoleBackend color_console_backend{};
274 FileBackend file_backend; 274 FileBackend file_backend;
275 275
276 MPSCQueue<Entry, true> message_queue{}; 276 MPSCQueue<Entry> message_queue{};
277 std::chrono::steady_clock::time_point time_origin{std::chrono::steady_clock::now()}; 277 std::chrono::steady_clock::time_point time_origin{std::chrono::steady_clock::now()};
278 std::jthread backend_thread; 278 std::jthread backend_thread;
279}; 279};
diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp
index f52f9e28f..3c5317777 100644
--- a/src/video_core/gpu_thread.cpp
+++ b/src/video_core/gpu_thread.cpp
@@ -31,9 +31,10 @@ static void RunThread(std::stop_token stop_token, Core::System& system,
31 auto current_context = context.Acquire(); 31 auto current_context = context.Acquire();
32 VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer(); 32 VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer();
33 33
34 CommandDataContainer next;
35
34 while (!stop_token.stop_requested()) { 36 while (!stop_token.stop_requested()) {
35 CommandDataContainer next; 37 state.queue.PopWait(next, stop_token);
36 state.queue.Pop(next, stop_token);
37 if (stop_token.stop_requested()) { 38 if (stop_token.stop_requested()) {
38 break; 39 break;
39 } 40 }
@@ -117,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) {
117 118
118 std::unique_lock lk(state.write_lock); 119 std::unique_lock lk(state.write_lock);
119 const u64 fence{++state.last_fence}; 120 const u64 fence{++state.last_fence};
120 state.queue.Push(CommandDataContainer(std::move(command_data), fence, block)); 121 state.queue.EmplaceWait(std::move(command_data), fence, block);
121 122
122 if (block) { 123 if (block) {
123 Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] { 124 Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] {