summaryrefslogtreecommitdiff
path: root/src/common/threadsafe_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r--src/common/threadsafe_queue.h18
1 files changed, 18 insertions, 0 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h
index f553efdc9..821e8536a 100644
--- a/src/common/threadsafe_queue.h
+++ b/src/common/threadsafe_queue.h
@@ -8,6 +8,7 @@
8// single reader, single writer queue 8// single reader, single writer queue
9 9
10#include <atomic> 10#include <atomic>
11#include <condition_variable>
11#include <cstddef> 12#include <cstddef>
12#include <mutex> 13#include <mutex>
13#include <utility> 14#include <utility>
@@ -45,6 +46,7 @@ public:
45 ElementPtr* new_ptr = new ElementPtr(); 46 ElementPtr* new_ptr = new ElementPtr();
46 write_ptr->next.store(new_ptr, std::memory_order_release); 47 write_ptr->next.store(new_ptr, std::memory_order_release);
47 write_ptr = new_ptr; 48 write_ptr = new_ptr;
49 cv.notify_one();
48 50
49 ++size; 51 ++size;
50 } 52 }
@@ -74,6 +76,16 @@ public:
74 return true; 76 return true;
75 } 77 }
76 78
79 T PopWait() {
80 if (Empty()) {
81 std::unique_lock<std::mutex> lock(cv_mutex);
82 cv.wait(lock, [this]() { return !Empty(); });
83 }
84 T t;
85 Pop(t);
86 return t;
87 }
88
77 // not thread-safe 89 // not thread-safe
78 void Clear() { 90 void Clear() {
79 size.store(0); 91 size.store(0);
@@ -101,6 +113,8 @@ private:
101 ElementPtr* write_ptr; 113 ElementPtr* write_ptr;
102 ElementPtr* read_ptr; 114 ElementPtr* read_ptr;
103 std::atomic_size_t size{0}; 115 std::atomic_size_t size{0};
116 std::mutex cv_mutex;
117 std::condition_variable cv;
104}; 118};
105 119
106// a simple thread-safe, 120// a simple thread-safe,
@@ -135,6 +149,10 @@ public:
135 return spsc_queue.Pop(t); 149 return spsc_queue.Pop(t);
136 } 150 }
137 151
152 T PopWait() {
153 return spsc_queue.PopWait();
154 }
155
138 // not thread-safe 156 // not thread-safe
139 void Clear() { 157 void Clear() {
140 spsc_queue.Clear(); 158 spsc_queue.Clear();