Thunderbots Project
Loading...
Searching...
No Matches
thread_safe_buffer.hpp
1#pragma once
2
3#include <boost/circular_buffer.hpp>
4#include <condition_variable>
5#include <cstddef>
6#include <deque>
7#include <mutex>
8#include <optional>
9
10#include "software/time/duration.h"
11#include "software/util/typename/typename.h"
12
21template <typename T>
23{
24 public:
25 // Force the user to specify a size
26 explicit ThreadSafeBuffer() = delete;
27
34 explicit ThreadSafeBuffer(std::size_t buffer_size, bool log_buffer_full = true);
35
36 // Copying this class is not permitted
37 ThreadSafeBuffer(const ThreadSafeBuffer&) = delete;
38
56 Duration max_wait_time = Duration::fromSeconds(0));
57
74 std::optional<T> popMostRecentlyAddedValue(
75 Duration max_wait_time = Duration::fromSeconds(0));
76
84 void push(const T& value);
85
90 bool empty() const;
91
93
94 private:
103 std::unique_lock<std::mutex> waitForBufferToHaveAValue(Duration max_wait_time);
104
105 std::mutex buffer_mutex;
106 boost::circular_buffer<T> buffer;
107
108 std::condition_variable received_new_value;
109
110 std::mutex destructor_called_mutex;
111 bool log_buffer_full;
112 bool destructor_called;
113};
114
115template <typename T>
116ThreadSafeBuffer<T>::ThreadSafeBuffer(std::size_t buffer_size, bool log_buffer_full)
117 : buffer(buffer_size), log_buffer_full(log_buffer_full), destructor_called(false)
118{
119}
120
121template <typename T>
123{
124 // We hold the returned lock in a variable here so that we hold the lock on the
125 // buffer mutex until the lock is destructed at the end of this function
126 auto buffer_lock = waitForBufferToHaveAValue(max_wait_time);
127
128 std::optional<T> result = std::nullopt;
129 if (!buffer.empty())
130 {
131 result = buffer.front();
132 buffer.pop_front();
133 }
134 return result;
135}
136
137template <typename T>
139{
140 // We hold the returned lock in a variable here so that we hold the lock on the
141 // buffer mutex until the lock is destructed at the end of this function
142 auto buffer_lock = waitForBufferToHaveAValue(max_wait_time);
143
144 std::optional<T> result = std::nullopt;
145 if (!buffer.empty())
146 {
147 result = buffer.back();
148 buffer.pop_back();
149 }
150 return result;
151}
152
153template <typename T>
154void ThreadSafeBuffer<T>::push(const T& value)
155{
156 std::scoped_lock<std::mutex> buffer_lock(buffer_mutex);
157 if (log_buffer_full && buffer.full())
158 {
159 std::cerr << "Pushing to a full ThreadSafeBuffer of type: " << TYPENAME(T)
160 << std::endl;
161 }
162 buffer.push_back(value);
163 received_new_value.notify_all();
164}
165
166template <typename T>
167std::unique_lock<std::mutex> ThreadSafeBuffer<T>::waitForBufferToHaveAValue(
168 Duration max_wait_time)
169{
170 std::unique_lock<std::mutex> buffer_lock(buffer_mutex);
171 received_new_value.wait_for(
172 buffer_lock, std::chrono::duration<float>(max_wait_time.toSeconds()), [this] {
173 std::scoped_lock destructor_called_lock(destructor_called_mutex);
174 return !buffer.empty() || destructor_called;
175 });
176
177 // NOTE: We need to return this in order to prevent it being destructed so
178 // the lock is maintained until the value is read
179 return buffer_lock;
180}
181
182template <typename T>
184{
185 return buffer.empty();
186}
187
188template <typename T>
190{
191 destructor_called_mutex.lock();
192 destructor_called = true;
193 destructor_called_mutex.unlock();
194
195 received_new_value.notify_all();
196}
Definition duration.h:12
static const Duration fromSeconds(double seconds)
Definition duration.cpp:13
Definition thread_safe_buffer.hpp:23
std::optional< T > popMostRecentlyAddedValue(Duration max_wait_time=Duration::fromSeconds(0))
Definition thread_safe_buffer.hpp:138
std::optional< T > popLeastRecentlyAddedValue(Duration max_wait_time=Duration::fromSeconds(0))
Definition thread_safe_buffer.hpp:122
bool empty() const
Definition thread_safe_buffer.hpp:183
void push(const T &value)
Definition thread_safe_buffer.hpp:154
ThreadSafeBuffer(std::size_t buffer_size, bool log_buffer_full=true)
Definition thread_safe_buffer.hpp:116
double toSeconds() const
Definition time.cpp:12