From 41549365680f0aae3f82e5812f3470305e939f7d Mon Sep 17 00:00:00 2001 From: B3n30 Date: Sun, 9 Sep 2018 13:08:57 +0200 Subject: threadsafe_queue: Add WaitIfEmpty and use it in logging --- src/common/threadsafe_queue.h | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) (limited to 'src/common/threadsafe_queue.h') diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index f553efdc9..4fdcecca0 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -8,6 +8,7 @@ // single reader, single writer queue #include +#include #include #include #include @@ -39,12 +40,13 @@ public: template void Push(Arg&& t) { // create the element, add it to the queue - write_ptr->current = std::forward(t); + write_ptr->current = std::move(t); // set the next pointer to a new element ptr // then advance the write pointer ElementPtr* new_ptr = new ElementPtr(); write_ptr->next.store(new_ptr, std::memory_order_release); write_ptr = new_ptr; + cv.notify_one(); ++size; } @@ -67,6 +69,7 @@ public: --size; ElementPtr* tmpptr = read_ptr; + read_ptr = tmpptr->next.load(std::memory_order_acquire); t = std::move(tmpptr->current); tmpptr->next.store(nullptr); @@ -74,6 +77,14 @@ public: return true; } + bool PopWait(T& t) { + if (Empty()) { + std::unique_lock lock(cv_mutex); + cv.wait(lock, [this]() { return !Empty(); }); + } + return Pop(t); + } + // not thread-safe void Clear() { size.store(0); @@ -101,6 +112,8 @@ private: ElementPtr* write_ptr; ElementPtr* read_ptr; std::atomic_size_t size{0}; + std::mutex cv_mutex; + std::condition_variable cv; }; // a simple thread-safe, @@ -135,6 +148,10 @@ public: return spsc_queue.Pop(t); } + bool PopWait(T& t) { + return spsc_queue.PopWait(t); + } + // not thread-safe void Clear() { spsc_queue.Clear(); -- cgit v1.2.3 From 2195f10d152a52e01ccab0a6528f8758752d66a9 Mon Sep 17 00:00:00 2001 From: B3n30 Date: Mon, 8 Oct 2018 23:28:54 +0200 Subject: Adressed review comments --- src/common/threadsafe_queue.h | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'src/common/threadsafe_queue.h') diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 4fdcecca0..821e8536a 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -40,7 +40,7 @@ public: template void Push(Arg&& t) { // create the element, add it to the queue - write_ptr->current = std::move(t); + write_ptr->current = std::forward(t); // set the next pointer to a new element ptr // then advance the write pointer ElementPtr* new_ptr = new ElementPtr(); @@ -69,7 +69,6 @@ public: --size; ElementPtr* tmpptr = read_ptr; - read_ptr = tmpptr->next.load(std::memory_order_acquire); t = std::move(tmpptr->current); tmpptr->next.store(nullptr); @@ -77,12 +76,14 @@ public: return true; } - bool PopWait(T& t) { + T PopWait() { if (Empty()) { std::unique_lock lock(cv_mutex); cv.wait(lock, [this]() { return !Empty(); }); } - return Pop(t); + T t; + Pop(t); + return t; } // not thread-safe @@ -148,8 +149,8 @@ public: return spsc_queue.Pop(t); } - bool PopWait(T& t) { - return spsc_queue.PopWait(t); + T PopWait() { + return spsc_queue.PopWait(); } // not thread-safe -- cgit v1.2.3