summaryrefslogtreecommitdiff
path: root/src/common/polyfill_thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/polyfill_thread.h')
-rw-r--r--src/common/polyfill_thread.h123
1 files changed, 84 insertions, 39 deletions
diff --git a/src/common/polyfill_thread.h b/src/common/polyfill_thread.h
index 5a8d1ce08..b5ef055db 100644
--- a/src/common/polyfill_thread.h
+++ b/src/common/polyfill_thread.h
@@ -11,6 +11,8 @@
11 11
12#ifdef __cpp_lib_jthread 12#ifdef __cpp_lib_jthread
13 13
14#include <chrono>
15#include <condition_variable>
14#include <stop_token> 16#include <stop_token>
15#include <thread> 17#include <thread>
16 18
@@ -21,23 +23,36 @@ void CondvarWait(Condvar& cv, Lock& lock, std::stop_token token, Pred&& pred) {
21 cv.wait(lock, token, std::move(pred)); 23 cv.wait(lock, token, std::move(pred));
22} 24}
23 25
26template <typename Rep, typename Period>
27bool StoppableTimedWait(std::stop_token token, const std::chrono::duration<Rep, Period>& rel_time) {
28 std::condition_variable_any cv;
29 std::mutex m;
30
31 // Perform the timed wait.
32 std::unique_lock lk{m};
33 return !cv.wait_for(lk, token, rel_time, [&] { return token.stop_requested(); });
34}
35
24} // namespace Common 36} // namespace Common
25 37
26#else 38#else
27 39
28#include <atomic> 40#include <atomic>
41#include <chrono>
42#include <condition_variable>
29#include <functional> 43#include <functional>
30#include <list> 44#include <map>
31#include <memory> 45#include <memory>
32#include <mutex> 46#include <mutex>
33#include <optional> 47#include <optional>
34#include <thread> 48#include <thread>
35#include <type_traits> 49#include <type_traits>
50#include <utility>
36 51
37namespace std { 52namespace std {
38namespace polyfill { 53namespace polyfill {
39 54
40using stop_state_callbacks = list<function<void()>>; 55using stop_state_callback = size_t;
41 56
42class stop_state { 57class stop_state {
43public: 58public:
@@ -45,61 +60,69 @@ public:
45 ~stop_state() = default; 60 ~stop_state() = default;
46 61
47 bool request_stop() { 62 bool request_stop() {
48 stop_state_callbacks callbacks; 63 unique_lock lk{m_lock};
49 64
50 { 65 if (m_stop_requested) {
51 scoped_lock lk{m_lock}; 66 // Already set, nothing to do.
67 return false;
68 }
52 69
53 if (m_stop_requested.load()) { 70 // Mark stop requested.
54 // Already set, nothing to do 71 m_stop_requested = true;
55 return false;
56 }
57 72
58 // Set as requested 73 while (!m_callbacks.empty()) {
59 m_stop_requested = true; 74 // Get an iterator to the first element.
75 const auto it = m_callbacks.begin();
60 76
61 // Copy callback list 77 // Move the callback function out of the map.
62 callbacks = m_callbacks; 78 function<void()> f;
63 } 79 swap(it->second, f);
80
81 // Erase the now-empty map element.
82 m_callbacks.erase(it);
64 83
65 for (auto callback : callbacks) { 84 // Run the callback.
66 callback(); 85 if (f) {
86 f();
87 }
67 } 88 }
68 89
69 return true; 90 return true;
70 } 91 }
71 92
72 bool stop_requested() const { 93 bool stop_requested() const {
73 return m_stop_requested.load(); 94 unique_lock lk{m_lock};
95 return m_stop_requested;
74 } 96 }
75 97
76 stop_state_callbacks::const_iterator insert_callback(function<void()> f) { 98 stop_state_callback insert_callback(function<void()> f) {
77 stop_state_callbacks::const_iterator ret{}; 99 unique_lock lk{m_lock};
78 bool should_run{};
79
80 {
81 scoped_lock lk{m_lock};
82 should_run = m_stop_requested.load();
83 m_callbacks.push_front(f);
84 ret = m_callbacks.begin();
85 }
86 100
87 if (should_run) { 101 if (m_stop_requested) {
88 f(); 102 // Stop already requested. Don't insert anything,
103 // just run the callback synchronously.
104 if (f) {
105 f();
106 }
107 return 0;
89 } 108 }
90 109
110 // Insert the callback.
111 stop_state_callback ret = ++m_next_callback;
112 m_callbacks.emplace(ret, move(f));
91 return ret; 113 return ret;
92 } 114 }
93 115
94 void remove_callback(stop_state_callbacks::const_iterator it) { 116 void remove_callback(stop_state_callback cb) {
95 scoped_lock lk{m_lock}; 117 unique_lock lk{m_lock};
96 m_callbacks.erase(it); 118 m_callbacks.erase(cb);
97 } 119 }
98 120
99private: 121private:
100 mutex m_lock; 122 mutable recursive_mutex m_lock;
101 atomic<bool> m_stop_requested; 123 map<stop_state_callback, function<void()>> m_callbacks;
102 stop_state_callbacks m_callbacks; 124 stop_state_callback m_next_callback{0};
125 bool m_stop_requested{false};
103}; 126};
104 127
105} // namespace polyfill 128} // namespace polyfill
@@ -190,7 +213,7 @@ public:
190 using callback_type = Callback; 213 using callback_type = Callback;
191 214
192 template <typename C> 215 template <typename C>
193 requires constructible_from<Callback, C> 216 requires constructible_from<Callback, C>
194 explicit stop_callback(const stop_token& st, 217 explicit stop_callback(const stop_token& st,
195 C&& cb) noexcept(is_nothrow_constructible_v<Callback, C>) 218 C&& cb) noexcept(is_nothrow_constructible_v<Callback, C>)
196 : m_stop_state(st.m_stop_state) { 219 : m_stop_state(st.m_stop_state) {
@@ -199,7 +222,7 @@ public:
199 } 222 }
200 } 223 }
201 template <typename C> 224 template <typename C>
202 requires constructible_from<Callback, C> 225 requires constructible_from<Callback, C>
203 explicit stop_callback(stop_token&& st, 226 explicit stop_callback(stop_token&& st,
204 C&& cb) noexcept(is_nothrow_constructible_v<Callback, C>) 227 C&& cb) noexcept(is_nothrow_constructible_v<Callback, C>)
205 : m_stop_state(move(st.m_stop_state)) { 228 : m_stop_state(move(st.m_stop_state)) {
@@ -209,7 +232,7 @@ public:
209 } 232 }
210 ~stop_callback() { 233 ~stop_callback() {
211 if (m_stop_state && m_callback) { 234 if (m_stop_state && m_callback) {
212 m_stop_state->remove_callback(*m_callback); 235 m_stop_state->remove_callback(m_callback);
213 } 236 }
214 } 237 }
215 238
@@ -220,7 +243,7 @@ public:
220 243
221private: 244private:
222 shared_ptr<polyfill::stop_state> m_stop_state; 245 shared_ptr<polyfill::stop_state> m_stop_state;
223 optional<polyfill::stop_state_callbacks::const_iterator> m_callback; 246 polyfill::stop_state_callback m_callback;
224}; 247};
225 248
226template <typename Callback> 249template <typename Callback>
@@ -318,6 +341,28 @@ void CondvarWait(Condvar& cv, Lock& lock, std::stop_token token, Pred pred) {
318 cv.wait(lock, [&] { return pred() || token.stop_requested(); }); 341 cv.wait(lock, [&] { return pred() || token.stop_requested(); });
319} 342}
320 343
344template <typename Rep, typename Period>
345bool StoppableTimedWait(std::stop_token token, const std::chrono::duration<Rep, Period>& rel_time) {
346 if (token.stop_requested()) {
347 return false;
348 }
349
350 bool stop_requested = false;
351 std::condition_variable cv;
352 std::mutex m;
353
354 std::stop_callback cb(token, [&] {
355 // Wake up the waiting thread.
356 std::unique_lock lk{m};
357 stop_requested = true;
358 cv.notify_one();
359 });
360
361 // Perform the timed wait.
362 std::unique_lock lk{m};
363 return !cv.wait_for(lk, rel_time, [&] { return stop_requested; });
364}
365
321} // namespace Common 366} // namespace Common
322 367
323#endif 368#endif