24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <seastar/util/assert.hh>
27#include <seastar/util/modules.hh>
44requires std::is_nothrow_move_constructible_v<T>
46 std::queue<T, circular_buffer<T>> _q;
48 std::optional<promise<>> _not_empty;
49 std::optional<promise<>> _not_full;
50 std::exception_ptr _ex =
nullptr;
52 void notify_not_empty()
noexcept;
53 void notify_not_full()
noexcept;
77 template <typename Func>
84 bool full() const noexcept;
132 void abort(std::exception_ptr ex)
noexcept {
136 while (!_q.empty()) {
141 _not_full->set_exception(ex);
142 _not_full= std::nullopt;
145 _not_empty->set_exception(std::move(ex));
146 _not_empty = std::nullopt;
154 return bool(_not_empty);
159requires std::is_nothrow_move_constructible_v<T>
161queue<T>::queue(
size_t size)
166requires std::is_nothrow_move_constructible_v<T>
168void queue<T>::notify_not_empty() noexcept {
170 _not_empty->set_value();
171 _not_empty = std::optional<promise<>>();
176requires std::is_nothrow_move_constructible_v<T>
178void queue<T>::notify_not_full() noexcept {
180 _not_full->set_value();
181 _not_full = std::optional<promise<>>();
186requires std::is_nothrow_move_constructible_v<T>
189 if (_q.size() < _max) {
190 _q.push(std::move(data));
199requires std::is_nothrow_move_constructible_v<T>
207requires std::is_nothrow_move_constructible_v<T>
210 if (_q.size() == _max) {
217 SEASTAR_ASSERT(!_q.empty());
218 T data = std::move(_q.front());
224requires std::is_nothrow_move_constructible_v<T>
229 static_assert(std::is_nothrow_move_constructible_v<T>,
230 "Queue element type must be no-throw move constructible");
233 return make_exception_future<T>(_ex);
236 return not_empty().then([
this] {
238 return make_exception_future<T>(_ex);
240 return make_ready_future<T>(pop());
244 return make_ready_future<T>(pop());
249requires std::is_nothrow_move_constructible_v<T>
253 return make_exception_future<>(_ex);
256 return not_full().then([
this, data = std::move(data)] ()
mutable {
257 _q.push(std::move(data));
262 _q.push(std::move(data));
264 return make_ready_future<>();
272requires std::is_nothrow_move_constructible_v<T>
273template <
typename Func>
277 std::rethrow_exception(_ex);
280 while (!_q.empty() && running) {
281 running = func(std::move(_q.front()));
291requires std::is_nothrow_move_constructible_v<T>
299requires std::is_nothrow_move_constructible_v<T>
303 return _q.size() >= _max;
307requires std::is_nothrow_move_constructible_v<T>
311 return make_exception_future<>(_ex);
314 return make_ready_future<>();
317 return _not_empty->get_future();
322requires std::is_nothrow_move_constructible_v<T>
326 return make_exception_future<>(_ex);
329 return make_ready_future<>();
332 return _not_full->get_future();
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:110
bool push(T &&a)
Push an item.
Definition: queue.hh:188
future<> not_full() noexcept
Definition: queue.hh:324
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:301
T pop() noexcept
Pop an item.
Definition: queue.hh:209
bool consume(Func &&func)
Definition: queue.hh:275
future<> not_empty() noexcept
Definition: queue.hh:309
future< T > pop_eventually() noexcept
Definition: queue.hh:226
size_t max_size() const noexcept
Definition: queue.hh:118
void set_max_size(size_t max) noexcept
Definition: queue.hh:123
T & front() noexcept
Access the front element in the queue.
Definition: queue.hh:201
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:132
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:293
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:153
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:251
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
Seastar API namespace.
Definition: abort_on_ebadf.hh:26