summaryrefslogtreecommitdiff
path: root/src/common/concurrent_ring_buffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/concurrent_ring_buffer.h')
-rw-r--r--src/common/concurrent_ring_buffer.h164
1 files changed, 164 insertions, 0 deletions
diff --git a/src/common/concurrent_ring_buffer.h b/src/common/concurrent_ring_buffer.h
new file mode 100644
index 000000000..2951d93db
--- /dev/null
+++ b/src/common/concurrent_ring_buffer.h
@@ -0,0 +1,164 @@
1// Copyright 2014 Citra Emulator Project
2// Licensed under GPLv2+
3// Refer to the license.txt file included.
4
5#pragma once
6
7#include <array>
8#include <condition_variable>
9#include <cstdint>
10#include <mutex>
11#include <thread>
12
13#include "common/common.h" // for NonCopyable
14#include "common/log.h" // for _dbg_assert_
15
16namespace Common {
17
18/**
19 * A MPMC (Multiple-Producer Multiple-Consumer) concurrent ring buffer. This data structure permits
20 * multiple threads to push and pop from a queue of bounded size.
21 */
22template <typename T, size_t ArraySize>
23class ConcurrentRingBuffer : private NonCopyable {
24public:
25 /// Value returned by the popping functions when the queue has been closed.
26 static const size_t QUEUE_CLOSED = -1;
27
28 ConcurrentRingBuffer() {}
29
30 ~ConcurrentRingBuffer() {
31 // If for whatever reason the queue wasn't completely drained, destroy the left over items.
32 for (size_t i = reader_index, end = writer_index; i != end; i = (i + 1) % ArraySize) {
33 Data()[i].~T();
34 }
35 }
36
37 /**
38 * Pushes a value to the queue. If the queue is full, this method will block. Does nothing if
39 * the queue is closed.
40 */
41 void Push(T val) {
42 std::unique_lock<std::mutex> lock(mutex);
43 if (closed) {
44 return;
45 }
46
47 // If the buffer is full, wait
48 writer.wait(lock, [&]{
49 return (writer_index + 1) % ArraySize != reader_index;
50 });
51
52 T* item = &Data()[writer_index];
53 new (item) T(std::move(val));
54
55 writer_index = (writer_index + 1) % ArraySize;
56
57 // Wake up waiting readers
58 lock.unlock();
59 reader.notify_one();
60 }
61
62 /**
63 * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will not
64 * block, and might return 0 values if there are no elements in the queue when it is called.
65 *
66 * @return The number of elements stored in `dest`. If the queue has been closed, returns
67 * `QUEUE_CLOSED`.
68 */
69 size_t Pop(T* dest, size_t dest_len) {
70 std::unique_lock<std::mutex> lock(mutex);
71 if (closed && !CanRead()) {
72 return QUEUE_CLOSED;
73 }
74 return PopInternal(dest, dest_len);
75 }
76
77 /**
78 * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will block
79 * if there are no elements in the queue when it is called.
80 *
81 * @return The number of elements stored in `dest`. If the queue has been closed, returns
82 * `QUEUE_CLOSED`.
83 */
84 size_t BlockingPop(T* dest, size_t dest_len) {
85 std::unique_lock<std::mutex> lock(mutex);
86 if (closed && !CanRead()) {
87 return QUEUE_CLOSED;
88 }
89
90 while (!CanRead()) {
91 reader.wait(lock);
92 if (closed && !CanRead()) {
93 return QUEUE_CLOSED;
94 }
95 }
96 _dbg_assert_(Common, CanRead());
97 return PopInternal(dest, dest_len);
98 }
99
100 /**
101 * Closes the queue. After calling this method, `Push` operations won't have any effect, and
102 * `PopMany` and `PopManyBlock` will start returning `QUEUE_CLOSED`. This is intended to allow
103 * a graceful shutdown of all consumers.
104 */
105 void Close() {
106 std::unique_lock<std::mutex> lock(mutex);
107 closed = true;
108 // We need to wake up any reader that are waiting for an item that will never come.
109 lock.unlock();
110 reader.notify_all();
111 }
112
113 /// Returns true if `Close()` has been called.
114 bool IsClosed() const {
115 return closed;
116 }
117
118private:
119 size_t PopInternal(T* dest, size_t dest_len) {
120 size_t output_count = 0;
121 while (output_count < dest_len && CanRead()) {
122 _dbg_assert_(Common, CanRead());
123
124 T* item = &Data()[reader_index];
125 T out_val = std::move(*item);
126 item->~T();
127
128 size_t prev_index = (reader_index + ArraySize - 1) % ArraySize;
129 reader_index = (reader_index + 1) % ArraySize;
130 if (writer_index == prev_index) {
131 writer.notify_one();
132 }
133 dest[output_count++] = std::move(out_val);
134 }
135 return output_count;
136 }
137
138 bool CanRead() const {
139 return reader_index != writer_index;
140 }
141
142 T* Data() {
143 return static_cast<T*>(static_cast<void*>(&storage));
144 }
145
146 /// Storage for entries
147 typename std::aligned_storage<ArraySize * sizeof(T),
148 std::alignment_of<T>::value>::type storage;
149
150 /// Data is valid in the half-open interval [reader, writer). If they are `QUEUE_CLOSED` then the
151 /// queue has been closed.
152 size_t writer_index = 0, reader_index = 0;
153 // True if the queue has been closed.
154 bool closed = false;
155
156 /// Mutex that protects the entire data structure.
157 std::mutex mutex;
158 /// Signaling wakes up reader which is waiting for storage to be non-empty.
159 std::condition_variable reader;
160 /// Signaling wakes up writer which is waiting for storage to be non-full.
161 std::condition_variable writer;
162};
163
164} // namespace