summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/common/CMakeLists.txt1
-rw-r--r--src/common/concurrent_ring_buffer.h163
2 files changed, 0 insertions, 164 deletions
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt
index 11659c3c5..66e19d0e4 100644
--- a/src/common/CMakeLists.txt
+++ b/src/common/CMakeLists.txt
@@ -28,7 +28,6 @@ set(HEADERS
28 common_funcs.h 28 common_funcs.h
29 common_paths.h 29 common_paths.h
30 common_types.h 30 common_types.h
31 concurrent_ring_buffer.h
32 cpu_detect.h 31 cpu_detect.h
33 debug_interface.h 32 debug_interface.h
34 emu_window.h 33 emu_window.h
diff --git a/src/common/concurrent_ring_buffer.h b/src/common/concurrent_ring_buffer.h
deleted file mode 100644
index c5889513a..000000000
--- a/src/common/concurrent_ring_buffer.h
+++ /dev/null
@@ -1,163 +0,0 @@
1// Copyright 2014 Citra Emulator Project
2// Licensed under GPLv2 or any later version
3// Refer to the license.txt file included.
4
5#pragma once
6
7#include <array>
8#include <condition_variable>
9#include <cstdint>
10#include <mutex>
11#include <thread>
12
13#include "common/common_types.h" // for NonCopyable
14
15namespace Common {
16
17/**
18 * A MPMC (Multiple-Producer Multiple-Consumer) concurrent ring buffer. This data structure permits
19 * multiple threads to push and pop from a queue of bounded size.
20 */
21template <typename T, size_t ArraySize>
22class ConcurrentRingBuffer : private NonCopyable {
23public:
24 /// Value returned by the popping functions when the queue has been closed.
25 static const size_t QUEUE_CLOSED = -1;
26
27 ConcurrentRingBuffer() {}
28
29 ~ConcurrentRingBuffer() {
30 // If for whatever reason the queue wasn't completely drained, destroy the left over items.
31 for (size_t i = reader_index, end = writer_index; i != end; i = (i + 1) % ArraySize) {
32 Data()[i].~T();
33 }
34 }
35
36 /**
37 * Pushes a value to the queue. If the queue is full, this method will block. Does nothing if
38 * the queue is closed.
39 */
40 void Push(T val) {
41 std::unique_lock<std::mutex> lock(mutex);
42 if (closed) {
43 return;
44 }
45
46 // If the buffer is full, wait
47 writer.wait(lock, [&]{
48 return (writer_index + 1) % ArraySize != reader_index;
49 });
50
51 T* item = &Data()[writer_index];
52 new (item) T(std::move(val));
53
54 writer_index = (writer_index + 1) % ArraySize;
55
56 // Wake up waiting readers
57 lock.unlock();
58 reader.notify_one();
59 }
60
61 /**
62 * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will not
63 * block, and might return 0 values if there are no elements in the queue when it is called.
64 *
65 * @return The number of elements stored in `dest`. If the queue has been closed, returns
66 * `QUEUE_CLOSED`.
67 */
68 size_t Pop(T* dest, size_t dest_len) {
69 std::unique_lock<std::mutex> lock(mutex);
70 if (closed && !CanRead()) {
71 return QUEUE_CLOSED;
72 }
73 return PopInternal(dest, dest_len);
74 }
75
76 /**
77 * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will block
78 * if there are no elements in the queue when it is called.
79 *
80 * @return The number of elements stored in `dest`. If the queue has been closed, returns
81 * `QUEUE_CLOSED`.
82 */
83 size_t BlockingPop(T* dest, size_t dest_len) {
84 std::unique_lock<std::mutex> lock(mutex);
85 if (closed && !CanRead()) {
86 return QUEUE_CLOSED;
87 }
88
89 while (!CanRead()) {
90 reader.wait(lock);
91 if (closed && !CanRead()) {
92 return QUEUE_CLOSED;
93 }
94 }
95 DEBUG_ASSERT(CanRead());
96 return PopInternal(dest, dest_len);
97 }
98
99 /**
100 * Closes the queue. After calling this method, `Push` operations won't have any effect, and
101 * `PopMany` and `PopManyBlock` will start returning `QUEUE_CLOSED`. This is intended to allow
102 * a graceful shutdown of all consumers.
103 */
104 void Close() {
105 std::unique_lock<std::mutex> lock(mutex);
106 closed = true;
107 // We need to wake up any reader that are waiting for an item that will never come.
108 lock.unlock();
109 reader.notify_all();
110 }
111
112 /// Returns true if `Close()` has been called.
113 bool IsClosed() const {
114 return closed;
115 }
116
117private:
118 size_t PopInternal(T* dest, size_t dest_len) {
119 size_t output_count = 0;
120 while (output_count < dest_len && CanRead()) {
121 DEBUG_ASSERT(CanRead());
122
123 T* item = &Data()[reader_index];
124 T out_val = std::move(*item);
125 item->~T();
126
127 size_t prev_index = (reader_index + ArraySize - 1) % ArraySize;
128 reader_index = (reader_index + 1) % ArraySize;
129 if (writer_index == prev_index) {
130 writer.notify_one();
131 }
132 dest[output_count++] = std::move(out_val);
133 }
134 return output_count;
135 }
136
137 bool CanRead() const {
138 return reader_index != writer_index;
139 }
140
141 T* Data() {
142 return static_cast<T*>(static_cast<void*>(&storage));
143 }
144
145 /// Storage for entries
146 typename std::aligned_storage<ArraySize * sizeof(T),
147 std::alignment_of<T>::value>::type storage;
148
149 /// Data is valid in the half-open interval [reader, writer). If they are `QUEUE_CLOSED` then the
150 /// queue has been closed.
151 size_t writer_index = 0, reader_index = 0;
152 // True if the queue has been closed.
153 bool closed = false;
154
155 /// Mutex that protects the entire data structure.
156 std::mutex mutex;
157 /// Signaling wakes up reader which is waiting for storage to be non-empty.
158 std::condition_variable reader;
159 /// Signaling wakes up writer which is waiting for storage to be non-full.
160 std::condition_variable writer;
161};
162
163} // namespace