diff options
| author | 2021-07-08 19:20:57 -0400 | |
|---|---|---|
| committer | 2021-07-08 19:20:57 -0400 | |
| commit | 975a7b3a78af40f05b92a53a96b9dcd7e85969e4 (patch) | |
| tree | 56196b303ab9dd7f138beb45c471169647e1144a /src/common/thread_worker.h | |
| parent | Merge pull request #6539 from lat9nq/default-setting (diff) | |
| parent | common/thread_worker: Stop workers on stop_token when waiting (diff) | |
| download | yuzu-975a7b3a78af40f05b92a53a96b9dcd7e85969e4.tar.gz yuzu-975a7b3a78af40f05b92a53a96b9dcd7e85969e4.tar.xz yuzu-975a7b3a78af40f05b92a53a96b9dcd7e85969e4.zip | |
Merge pull request #6563 from ReinUsesLisp/thread-worker
common: Add stateful thread worker and unique function utilities
Diffstat (limited to 'src/common/thread_worker.h')
| -rw-r--r-- | src/common/thread_worker.h | 102 |
1 files changed, 94 insertions, 8 deletions
diff --git a/src/common/thread_worker.h b/src/common/thread_worker.h index f1859971f..8272985ff 100644 --- a/src/common/thread_worker.h +++ b/src/common/thread_worker.h | |||
| @@ -7,24 +7,110 @@ | |||
| 7 | #include <atomic> | 7 | #include <atomic> |
| 8 | #include <functional> | 8 | #include <functional> |
| 9 | #include <mutex> | 9 | #include <mutex> |
| 10 | #include <stop_token> | ||
| 10 | #include <string> | 11 | #include <string> |
| 12 | #include <thread> | ||
| 13 | #include <type_traits> | ||
| 11 | #include <vector> | 14 | #include <vector> |
| 12 | #include <queue> | 15 | #include <queue> |
| 13 | 16 | ||
| 17 | #include "common/thread.h" | ||
| 18 | #include "common/unique_function.h" | ||
| 19 | |||
| 14 | namespace Common { | 20 | namespace Common { |
| 15 | 21 | ||
| 16 | class ThreadWorker final { | 22 | template <class StateType = void> |
| 23 | class StatefulThreadWorker { | ||
| 24 | static constexpr bool with_state = !std::is_same_v<StateType, void>; | ||
| 25 | |||
| 26 | struct DummyCallable { | ||
| 27 | int operator()() const noexcept { | ||
| 28 | return 0; | ||
| 29 | } | ||
| 30 | }; | ||
| 31 | |||
| 32 | using Task = | ||
| 33 | std::conditional_t<with_state, UniqueFunction<void, StateType*>, UniqueFunction<void>>; | ||
| 34 | using StateMaker = std::conditional_t<with_state, std::function<StateType()>, DummyCallable>; | ||
| 35 | |||
| 17 | public: | 36 | public: |
| 18 | explicit ThreadWorker(std::size_t num_workers, const std::string& name); | 37 | explicit StatefulThreadWorker(size_t num_workers, std::string name, StateMaker func = {}) |
| 19 | ~ThreadWorker(); | 38 | : workers_queued{num_workers}, thread_name{std::move(name)} { |
| 20 | void QueueWork(std::function<void()>&& work); | 39 | const auto lambda = [this, func](std::stop_token stop_token) { |
| 40 | Common::SetCurrentThreadName(thread_name.c_str()); | ||
| 41 | { | ||
| 42 | std::conditional_t<with_state, StateType, int> state{func()}; | ||
| 43 | while (!stop_token.stop_requested()) { | ||
| 44 | Task task; | ||
| 45 | { | ||
| 46 | std::unique_lock lock{queue_mutex}; | ||
| 47 | if (requests.empty()) { | ||
| 48 | wait_condition.notify_all(); | ||
| 49 | } | ||
| 50 | condition.wait(lock, stop_token, [this] { return !requests.empty(); }); | ||
| 51 | if (stop_token.stop_requested()) { | ||
| 52 | break; | ||
| 53 | } | ||
| 54 | task = std::move(requests.front()); | ||
| 55 | requests.pop(); | ||
| 56 | } | ||
| 57 | if constexpr (with_state) { | ||
| 58 | task(&state); | ||
| 59 | } else { | ||
| 60 | task(); | ||
| 61 | } | ||
| 62 | ++work_done; | ||
| 63 | } | ||
| 64 | } | ||
| 65 | ++workers_stopped; | ||
| 66 | wait_condition.notify_all(); | ||
| 67 | }; | ||
| 68 | threads.reserve(num_workers); | ||
| 69 | for (size_t i = 0; i < num_workers; ++i) { | ||
| 70 | threads.emplace_back(lambda); | ||
| 71 | } | ||
| 72 | } | ||
| 73 | |||
| 74 | StatefulThreadWorker& operator=(const StatefulThreadWorker&) = delete; | ||
| 75 | StatefulThreadWorker(const StatefulThreadWorker&) = delete; | ||
| 76 | |||
| 77 | StatefulThreadWorker& operator=(StatefulThreadWorker&&) = delete; | ||
| 78 | StatefulThreadWorker(StatefulThreadWorker&&) = delete; | ||
| 79 | |||
| 80 | void QueueWork(Task work) { | ||
| 81 | { | ||
| 82 | std::unique_lock lock{queue_mutex}; | ||
| 83 | requests.emplace(std::move(work)); | ||
| 84 | ++work_scheduled; | ||
| 85 | } | ||
| 86 | condition.notify_one(); | ||
| 87 | } | ||
| 88 | |||
| 89 | void WaitForRequests(std::stop_token stop_token = {}) { | ||
| 90 | std::stop_callback callback(stop_token, [this] { | ||
| 91 | for (auto& thread : threads) { | ||
| 92 | thread.request_stop(); | ||
| 93 | } | ||
| 94 | }); | ||
| 95 | std::unique_lock lock{queue_mutex}; | ||
| 96 | wait_condition.wait(lock, [this] { | ||
| 97 | return workers_stopped >= workers_queued || work_done >= work_scheduled; | ||
| 98 | }); | ||
| 99 | } | ||
| 21 | 100 | ||
| 22 | private: | 101 | private: |
| 23 | std::vector<std::thread> threads; | 102 | std::queue<Task> requests; |
| 24 | std::queue<std::function<void()>> requests; | ||
| 25 | std::mutex queue_mutex; | 103 | std::mutex queue_mutex; |
| 26 | std::condition_variable condition; | 104 | std::condition_variable_any condition; |
| 27 | std::atomic_bool stop{}; | 105 | std::condition_variable wait_condition; |
| 106 | std::atomic<size_t> work_scheduled{}; | ||
| 107 | std::atomic<size_t> work_done{}; | ||
| 108 | std::atomic<size_t> workers_stopped{}; | ||
| 109 | std::atomic<size_t> workers_queued{}; | ||
| 110 | std::string thread_name; | ||
| 111 | std::vector<std::jthread> threads; | ||
| 28 | }; | 112 | }; |
| 29 | 113 | ||
| 114 | using ThreadWorker = StatefulThreadWorker<>; | ||
| 115 | |||
| 30 | } // namespace Common | 116 | } // namespace Common |