summaryrefslogtreecommitdiff
path: root/src/common/threadsafe_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r--src/common/threadsafe_queue.h53
1 files changed, 34 insertions, 19 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h
index edf13bc49..821e8536a 100644
--- a/src/common/threadsafe_queue.h
+++ b/src/common/threadsafe_queue.h
@@ -7,17 +7,17 @@
7// a simple lockless thread-safe, 7// a simple lockless thread-safe,
8// single reader, single writer queue 8// single reader, single writer queue
9 9
10#include <algorithm>
11#include <atomic> 10#include <atomic>
11#include <condition_variable>
12#include <cstddef> 12#include <cstddef>
13#include <mutex> 13#include <mutex>
14#include "common/common_types.h" 14#include <utility>
15 15
16namespace Common { 16namespace Common {
17template <typename T, bool NeedSize = true> 17template <typename T>
18class SPSCQueue { 18class SPSCQueue {
19public: 19public:
20 SPSCQueue() : size(0) { 20 SPSCQueue() {
21 write_ptr = read_ptr = new ElementPtr(); 21 write_ptr = read_ptr = new ElementPtr();
22 } 22 }
23 ~SPSCQueue() { 23 ~SPSCQueue() {
@@ -25,13 +25,12 @@ public:
25 delete read_ptr; 25 delete read_ptr;
26 } 26 }
27 27
28 u32 Size() const { 28 std::size_t Size() const {
29 static_assert(NeedSize, "using Size() on FifoQueue without NeedSize");
30 return size.load(); 29 return size.load();
31 } 30 }
32 31
33 bool Empty() const { 32 bool Empty() const {
34 return !read_ptr->next.load(); 33 return Size() == 0;
35 } 34 }
36 35
37 T& Front() const { 36 T& Front() const {
@@ -47,13 +46,14 @@ public:
47 ElementPtr* new_ptr = new ElementPtr(); 46 ElementPtr* new_ptr = new ElementPtr();
48 write_ptr->next.store(new_ptr, std::memory_order_release); 47 write_ptr->next.store(new_ptr, std::memory_order_release);
49 write_ptr = new_ptr; 48 write_ptr = new_ptr;
50 if (NeedSize) 49 cv.notify_one();
51 size++; 50
51 ++size;
52 } 52 }
53 53
54 void Pop() { 54 void Pop() {
55 if (NeedSize) 55 --size;
56 size--; 56
57 ElementPtr* tmpptr = read_ptr; 57 ElementPtr* tmpptr = read_ptr;
58 // advance the read pointer 58 // advance the read pointer
59 read_ptr = tmpptr->next.load(); 59 read_ptr = tmpptr->next.load();
@@ -66,8 +66,7 @@ public:
66 if (Empty()) 66 if (Empty())
67 return false; 67 return false;
68 68
69 if (NeedSize) 69 --size;
70 size--;
71 70
72 ElementPtr* tmpptr = read_ptr; 71 ElementPtr* tmpptr = read_ptr;
73 read_ptr = tmpptr->next.load(std::memory_order_acquire); 72 read_ptr = tmpptr->next.load(std::memory_order_acquire);
@@ -77,6 +76,16 @@ public:
77 return true; 76 return true;
78 } 77 }
79 78
79 T PopWait() {
80 if (Empty()) {
81 std::unique_lock<std::mutex> lock(cv_mutex);
82 cv.wait(lock, [this]() { return !Empty(); });
83 }
84 T t;
85 Pop(t);
86 return t;
87 }
88
80 // not thread-safe 89 // not thread-safe
81 void Clear() { 90 void Clear() {
82 size.store(0); 91 size.store(0);
@@ -89,7 +98,7 @@ private:
89 // and a pointer to the next ElementPtr 98 // and a pointer to the next ElementPtr
90 class ElementPtr { 99 class ElementPtr {
91 public: 100 public:
92 ElementPtr() : next(nullptr) {} 101 ElementPtr() {}
93 ~ElementPtr() { 102 ~ElementPtr() {
94 ElementPtr* next_ptr = next.load(); 103 ElementPtr* next_ptr = next.load();
95 104
@@ -98,21 +107,23 @@ private:
98 } 107 }
99 108
100 T current; 109 T current;
101 std::atomic<ElementPtr*> next; 110 std::atomic<ElementPtr*> next{nullptr};
102 }; 111 };
103 112
104 ElementPtr* write_ptr; 113 ElementPtr* write_ptr;
105 ElementPtr* read_ptr; 114 ElementPtr* read_ptr;
106 std::atomic<u32> size; 115 std::atomic_size_t size{0};
116 std::mutex cv_mutex;
117 std::condition_variable cv;
107}; 118};
108 119
109// a simple thread-safe, 120// a simple thread-safe,
110// single reader, multiple writer queue 121// single reader, multiple writer queue
111 122
112template <typename T, bool NeedSize = true> 123template <typename T>
113class MPSCQueue { 124class MPSCQueue {
114public: 125public:
115 u32 Size() const { 126 std::size_t Size() const {
116 return spsc_queue.Size(); 127 return spsc_queue.Size();
117 } 128 }
118 129
@@ -138,13 +149,17 @@ public:
138 return spsc_queue.Pop(t); 149 return spsc_queue.Pop(t);
139 } 150 }
140 151
152 T PopWait() {
153 return spsc_queue.PopWait();
154 }
155
141 // not thread-safe 156 // not thread-safe
142 void Clear() { 157 void Clear() {
143 spsc_queue.Clear(); 158 spsc_queue.Clear();
144 } 159 }
145 160
146private: 161private:
147 SPSCQueue<T, NeedSize> spsc_queue; 162 SPSCQueue<T> spsc_queue;
148 std::mutex write_lock; 163 std::mutex write_lock;
149}; 164};
150} // namespace Common 165} // namespace Common