1 #ifndef OSMIUM_THREAD_QUEUE_HPP 2 #define OSMIUM_THREAD_QUEUE_HPP 38 #include <condition_variable> 74 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 75 size_t m_largest_size;
79 std::atomic<int> m_push_counter;
83 std::atomic<int> m_full_counter;
95 explicit Queue(
size_t max_size = 0,
const std::string& name =
"") :
102 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
113 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 114 std::cerr <<
"queue '" << m_name <<
"' with max_size=" << m_max_size <<
" had largest size " << m_largest_size <<
" and was full " << m_full_counter <<
" times in " << m_push_counter <<
" push() calls\n";
123 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 127 while (
size() >= m_max_size) {
129 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 134 std::lock_guard<std::mutex> lock(m_mutex);
135 m_queue.push(std::move(value));
136 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 137 if (m_largest_size < m_queue.size()) {
138 m_largest_size = m_queue.size();
141 m_data_available.notify_one();
146 m_data_available.notify_all();
150 std::unique_lock<std::mutex> lock(m_mutex);
151 m_data_available.wait(lock, [
this] {
152 return !m_queue.empty() ||
m_done;
154 if (!m_queue.empty()) {
155 value = std::move(m_queue.front());
161 std::unique_lock<std::mutex> lock(m_mutex);
162 if (!m_data_available.wait_for(lock, std::chrono::seconds(1), [
this] {
163 return !m_queue.empty() || m_done;
167 if (!m_queue.empty()) {
168 value = std::move(m_queue.front());
174 std::lock_guard<std::mutex> lock(m_mutex);
175 if (m_queue.empty()) {
178 value = std::move(m_queue.front());
184 std::lock_guard<std::mutex> lock(m_mutex);
185 return m_queue.empty();
189 std::lock_guard<std::mutex> lock(m_mutex);
190 return m_queue.size();
199 #endif // OSMIUM_THREAD_QUEUE_HPP size_t size() const
Definition: queue.hpp:188
bool empty() const
Definition: queue.hpp:183
std::mutex m_mutex
Definition: queue.hpp:65
std::atomic< bool > m_done
Definition: queue.hpp:72
void shutdown()
Definition: queue.hpp:144
~Queue()
Definition: queue.hpp:111
std::queue< T > m_queue
Definition: queue.hpp:67
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:63
const size_t m_max_size
Definition: queue.hpp:60
Namespace for everything in the Osmium library.
Definition: assembler.hpp:59
void wait_and_pop_with_timeout(T &value)
Definition: queue.hpp:160
bool try_pop(T &value)
Definition: queue.hpp:173
static const std::chrono::milliseconds full_queue_sleep_duration
Definition: queue.hpp:50
void wait_and_pop(T &value)
Definition: queue.hpp:149
void push(T value)
Definition: queue.hpp:122
Queue(size_t max_size=0, const std::string &name="")
Definition: queue.hpp:95
std::condition_variable m_data_available
Used to signal readers when data is available in the queue.
Definition: queue.hpp:70