Thunderbots Project
Loading...
Searching...
No Matches
threaded_observer.hpp
1#pragma once
2
3#include <boost/bind.hpp>
4#include <thread>
5
6#include "software/multithreading/observer.hpp"
7
16template <typename T>
17class ThreadedObserver : public Observer<T>
18{
19 public:
27 bool log_buffer_full = true);
28
29 ~ThreadedObserver() override;
30
31 // Delete the copy and assignment operators because this class really shouldn't need
32 // them and we don't want to risk doing anything nasty with the internal
33 // multithreading this class uses
34 ThreadedObserver& operator=(const ThreadedObserver&) = delete;
35 ThreadedObserver(const ThreadedObserver&) = delete;
36
37 private:
53 virtual void onValueReceived(T val);
54
61 void continuouslyPullValuesFromBuffer();
62
70 virtual std::optional<T> getNextValue(const Duration& max_wait_time);
71
72 // This indicates if the destructor of this class has been called
73 std::mutex in_destructor_mutex;
74 bool in_destructor;
75
76 // The period for checking whether or not the destructor for this class has
77 // been called
78 const Duration IN_DESTRUCTOR_CHECK_PERIOD;
79
80 // This is the thread that will continuously pull values from the buffer
81 // and pass them into the `onValueReceived`
82 std::thread pull_from_buffer_thread;
83};
84
85template <typename T>
86ThreadedObserver<T>::ThreadedObserver(size_t buffer_size, bool log_buffer_full)
87 : Observer<T>(buffer_size, log_buffer_full),
88 in_destructor(false),
89 IN_DESTRUCTOR_CHECK_PERIOD(Duration::fromSeconds(0.1))
90{
91 pull_from_buffer_thread = std::thread(
92 boost::bind(&ThreadedObserver::continuouslyPullValuesFromBuffer, this));
93}
94
95template <typename T>
97{
98 // Do nothing, this function should be overridden to enable custom behavior on
99 // message reception.
100}
101
102template <typename T>
104{
105 do
106 {
107 in_destructor_mutex.unlock();
108 std::optional<T> new_val;
109
110 new_val = this->getNextValue(IN_DESTRUCTOR_CHECK_PERIOD);
111
112 if (new_val)
113 {
114 onValueReceived(*new_val);
115 }
116
117 in_destructor_mutex.lock();
118 } while (!in_destructor);
119}
120
121
122template <typename T>
124{
125 in_destructor_mutex.lock();
126 in_destructor = true;
127 in_destructor_mutex.unlock();
128
129 // We must wait for the thread to stop, as if we destroy it while it's still
130 // running we will segfault
131 pull_from_buffer_thread.join();
132}
133
134template <typename T>
135std::optional<T> ThreadedObserver<T>::getNextValue(const Duration& max_wait_time)
136{
137 // Do nothing, this function should be overridden to enable custom behavior on
138 // message reception.
139 // We *must* provide an implementation here instead of making it pure virtual because
140 // this function may be called from the `pull_from_buffer_thread` *before* the
141 // subclass constructor has completed (ie. before it has "finished overriding" all the
142 // methods). If this functions is pure virtual then this case would cause a crash.
143 return std::nullopt;
144}
Definition duration.h:12
Definition observer.hpp:15
Definition threaded_observer.hpp:18
ThreadedObserver(size_t buffer_size=Observer< T >::DEFAULT_BUFFER_SIZE, bool log_buffer_full=true)
Definition threaded_observer.hpp:86