diff options
| author | 2021-06-29 01:47:13 -0300 | |
|---|---|---|
| committer | 2021-07-08 19:03:26 -0300 | |
| commit | 0ddbbb64e514ea9bba6a4f8bd6908d654e7f114c (patch) | |
| tree | 56196b303ab9dd7f138beb45c471169647e1144a /src | |
| parent | common/thread_worker: Add support for stateful threads (diff) | |
| download | yuzu-0ddbbb64e514ea9bba6a4f8bd6908d654e7f114c.tar.gz yuzu-0ddbbb64e514ea9bba6a4f8bd6908d654e7f114c.tar.xz yuzu-0ddbbb64e514ea9bba6a4f8bd6908d654e7f114c.zip | |
common/thread_worker: Stop workers on stop_token when waiting
Diffstat (limited to 'src')
| -rw-r--r-- | src/common/thread_worker.h | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/src/common/thread_worker.h b/src/common/thread_worker.h index 16aa673bd..8272985ff 100644 --- a/src/common/thread_worker.h +++ b/src/common/thread_worker.h | |||
| @@ -7,7 +7,9 @@ | |||
| 7 | #include <atomic> | 7 | #include <atomic> |
| 8 | #include <functional> | 8 | #include <functional> |
| 9 | #include <mutex> | 9 | #include <mutex> |
| 10 | #include <stop_token> | ||
| 10 | #include <string> | 11 | #include <string> |
| 12 | #include <thread> | ||
| 11 | #include <type_traits> | 13 | #include <type_traits> |
| 12 | #include <vector> | 14 | #include <vector> |
| 13 | #include <queue> | 15 | #include <queue> |
| @@ -34,19 +36,19 @@ class StatefulThreadWorker { | |||
| 34 | public: | 36 | public: |
| 35 | explicit StatefulThreadWorker(size_t num_workers, std::string name, StateMaker func = {}) | 37 | explicit StatefulThreadWorker(size_t num_workers, std::string name, StateMaker func = {}) |
| 36 | : workers_queued{num_workers}, thread_name{std::move(name)} { | 38 | : workers_queued{num_workers}, thread_name{std::move(name)} { |
| 37 | const auto lambda = [this, func] { | 39 | const auto lambda = [this, func](std::stop_token stop_token) { |
| 38 | Common::SetCurrentThreadName(thread_name.c_str()); | 40 | Common::SetCurrentThreadName(thread_name.c_str()); |
| 39 | { | 41 | { |
| 40 | std::conditional_t<with_state, StateType, int> state{func()}; | 42 | std::conditional_t<with_state, StateType, int> state{func()}; |
| 41 | while (!stop) { | 43 | while (!stop_token.stop_requested()) { |
| 42 | Task task; | 44 | Task task; |
| 43 | { | 45 | { |
| 44 | std::unique_lock lock{queue_mutex}; | 46 | std::unique_lock lock{queue_mutex}; |
| 45 | if (requests.empty()) { | 47 | if (requests.empty()) { |
| 46 | wait_condition.notify_all(); | 48 | wait_condition.notify_all(); |
| 47 | } | 49 | } |
| 48 | condition.wait(lock, [this] { return stop || !requests.empty(); }); | 50 | condition.wait(lock, stop_token, [this] { return !requests.empty(); }); |
| 49 | if (stop) { | 51 | if (stop_token.stop_requested()) { |
| 50 | break; | 52 | break; |
| 51 | } | 53 | } |
| 52 | task = std::move(requests.front()); | 54 | task = std::move(requests.front()); |
| @@ -63,21 +65,17 @@ public: | |||
| 63 | ++workers_stopped; | 65 | ++workers_stopped; |
| 64 | wait_condition.notify_all(); | 66 | wait_condition.notify_all(); |
| 65 | }; | 67 | }; |
| 68 | threads.reserve(num_workers); | ||
| 66 | for (size_t i = 0; i < num_workers; ++i) { | 69 | for (size_t i = 0; i < num_workers; ++i) { |
| 67 | threads.emplace_back(lambda); | 70 | threads.emplace_back(lambda); |
| 68 | } | 71 | } |
| 69 | } | 72 | } |
| 70 | 73 | ||
| 71 | ~StatefulThreadWorker() { | 74 | StatefulThreadWorker& operator=(const StatefulThreadWorker&) = delete; |
| 72 | { | 75 | StatefulThreadWorker(const StatefulThreadWorker&) = delete; |
| 73 | std::unique_lock lock{queue_mutex}; | 76 | |
| 74 | stop = true; | 77 | StatefulThreadWorker& operator=(StatefulThreadWorker&&) = delete; |
| 75 | } | 78 | StatefulThreadWorker(StatefulThreadWorker&&) = delete; |
| 76 | condition.notify_all(); | ||
| 77 | for (std::thread& thread : threads) { | ||
| 78 | thread.join(); | ||
| 79 | } | ||
| 80 | } | ||
| 81 | 79 | ||
| 82 | void QueueWork(Task work) { | 80 | void QueueWork(Task work) { |
| 83 | { | 81 | { |
| @@ -88,7 +86,12 @@ public: | |||
| 88 | condition.notify_one(); | 86 | condition.notify_one(); |
| 89 | } | 87 | } |
| 90 | 88 | ||
| 91 | void WaitForRequests() { | 89 | void WaitForRequests(std::stop_token stop_token = {}) { |
| 90 | std::stop_callback callback(stop_token, [this] { | ||
| 91 | for (auto& thread : threads) { | ||
| 92 | thread.request_stop(); | ||
| 93 | } | ||
| 94 | }); | ||
| 92 | std::unique_lock lock{queue_mutex}; | 95 | std::unique_lock lock{queue_mutex}; |
| 93 | wait_condition.wait(lock, [this] { | 96 | wait_condition.wait(lock, [this] { |
| 94 | return workers_stopped >= workers_queued || work_done >= work_scheduled; | 97 | return workers_stopped >= workers_queued || work_done >= work_scheduled; |
| @@ -96,17 +99,16 @@ public: | |||
| 96 | } | 99 | } |
| 97 | 100 | ||
| 98 | private: | 101 | private: |
| 99 | std::vector<std::thread> threads; | ||
| 100 | std::queue<Task> requests; | 102 | std::queue<Task> requests; |
| 101 | std::mutex queue_mutex; | 103 | std::mutex queue_mutex; |
| 102 | std::condition_variable condition; | 104 | std::condition_variable_any condition; |
| 103 | std::condition_variable wait_condition; | 105 | std::condition_variable wait_condition; |
| 104 | std::atomic_bool stop{}; | ||
| 105 | std::atomic<size_t> work_scheduled{}; | 106 | std::atomic<size_t> work_scheduled{}; |
| 106 | std::atomic<size_t> work_done{}; | 107 | std::atomic<size_t> work_done{}; |
| 107 | std::atomic<size_t> workers_stopped{}; | 108 | std::atomic<size_t> workers_stopped{}; |
| 108 | std::atomic<size_t> workers_queued{}; | 109 | std::atomic<size_t> workers_queued{}; |
| 109 | std::string thread_name; | 110 | std::string thread_name; |
| 111 | std::vector<std::jthread> threads; | ||
| 110 | }; | 112 | }; |
| 111 | 113 | ||
| 112 | using ThreadWorker = StatefulThreadWorker<>; | 114 | using ThreadWorker = StatefulThreadWorker<>; |