summaryrefslogtreecommitdiff
path: root/src/common/threadsafe_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r--src/common/threadsafe_queue.h24
1 files changed, 11 insertions, 13 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h
index edf13bc49..2660b118a 100644
--- a/src/common/threadsafe_queue.h
+++ b/src/common/threadsafe_queue.h
@@ -14,10 +14,10 @@
14#include "common/common_types.h" 14#include "common/common_types.h"
15 15
16namespace Common { 16namespace Common {
17template <typename T, bool NeedSize = true> 17template <typename T>
18class SPSCQueue { 18class SPSCQueue {
19public: 19public:
20 SPSCQueue() : size(0) { 20 SPSCQueue() {
21 write_ptr = read_ptr = new ElementPtr(); 21 write_ptr = read_ptr = new ElementPtr();
22 } 22 }
23 ~SPSCQueue() { 23 ~SPSCQueue() {
@@ -26,12 +26,11 @@ public:
26 } 26 }
27 27
28 u32 Size() const { 28 u32 Size() const {
29 static_assert(NeedSize, "using Size() on FifoQueue without NeedSize");
30 return size.load(); 29 return size.load();
31 } 30 }
32 31
33 bool Empty() const { 32 bool Empty() const {
34 return !read_ptr->next.load(); 33 return Size() == 0;
35 } 34 }
36 35
37 T& Front() const { 36 T& Front() const {
@@ -47,13 +46,13 @@ public:
47 ElementPtr* new_ptr = new ElementPtr(); 46 ElementPtr* new_ptr = new ElementPtr();
48 write_ptr->next.store(new_ptr, std::memory_order_release); 47 write_ptr->next.store(new_ptr, std::memory_order_release);
49 write_ptr = new_ptr; 48 write_ptr = new_ptr;
50 if (NeedSize) 49
51 size++; 50 ++size;
52 } 51 }
53 52
54 void Pop() { 53 void Pop() {
55 if (NeedSize) 54 --size;
56 size--; 55
57 ElementPtr* tmpptr = read_ptr; 56 ElementPtr* tmpptr = read_ptr;
58 // advance the read pointer 57 // advance the read pointer
59 read_ptr = tmpptr->next.load(); 58 read_ptr = tmpptr->next.load();
@@ -66,8 +65,7 @@ public:
66 if (Empty()) 65 if (Empty())
67 return false; 66 return false;
68 67
69 if (NeedSize) 68 --size;
70 size--;
71 69
72 ElementPtr* tmpptr = read_ptr; 70 ElementPtr* tmpptr = read_ptr;
73 read_ptr = tmpptr->next.load(std::memory_order_acquire); 71 read_ptr = tmpptr->next.load(std::memory_order_acquire);
@@ -103,13 +101,13 @@ private:
103 101
104 ElementPtr* write_ptr; 102 ElementPtr* write_ptr;
105 ElementPtr* read_ptr; 103 ElementPtr* read_ptr;
106 std::atomic<u32> size; 104 std::atomic<u32> size{0};
107}; 105};
108 106
109// a simple thread-safe, 107// a simple thread-safe,
110// single reader, multiple writer queue 108// single reader, multiple writer queue
111 109
112template <typename T, bool NeedSize = true> 110template <typename T>
113class MPSCQueue { 111class MPSCQueue {
114public: 112public:
115 u32 Size() const { 113 u32 Size() const {
@@ -144,7 +142,7 @@ public:
144 } 142 }
145 143
146private: 144private:
147 SPSCQueue<T, NeedSize> spsc_queue; 145 SPSCQueue<T> spsc_queue;
148 std::mutex write_lock; 146 std::mutex write_lock;
149}; 147};
150} // namespace Common 148} // namespace Common