24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <seastar/util/modules.hh>
43requires std::is_nothrow_move_constructible_v<T>
45 std::queue<T, circular_buffer<T>> _q;
47 std::optional<promise<>> _not_empty;
48 std::optional<promise<>> _not_full;
49 std::exception_ptr _ex =
nullptr;
51 void notify_not_empty()
noexcept;
52 void notify_not_full()
noexcept;
76 template <typename Func>
83 bool full() const noexcept;
131 void abort(std::exception_ptr ex)
noexcept {
135 while (!_q.empty()) {
140 _not_full->set_exception(ex);
141 _not_full= std::nullopt;
144 _not_empty->set_exception(std::move(ex));
145 _not_empty = std::nullopt;
153 return bool(_not_empty);
158requires std::is_nothrow_move_constructible_v<T>
160queue<T>::queue(
size_t size)
165requires std::is_nothrow_move_constructible_v<T>
167void queue<T>::notify_not_empty() noexcept {
169 _not_empty->set_value();
170 _not_empty = std::optional<promise<>>();
175requires std::is_nothrow_move_constructible_v<T>
177void queue<T>::notify_not_full() noexcept {
179 _not_full->set_value();
180 _not_full = std::optional<promise<>>();
185requires std::is_nothrow_move_constructible_v<T>
188 if (_q.size() < _max) {
189 _q.push(std::move(data));
198requires std::is_nothrow_move_constructible_v<T>
206requires std::is_nothrow_move_constructible_v<T>
209 if (_q.size() == _max) {
217 T data = std::move(_q.front());
223requires std::is_nothrow_move_constructible_v<T>
228 static_assert(std::is_nothrow_move_constructible_v<T>,
229 "Queue element type must be no-throw move constructible");
232 return make_exception_future<T>(_ex);
235 return not_empty().then([
this] {
237 return make_exception_future<T>(_ex);
239 return make_ready_future<T>(pop());
243 return make_ready_future<T>(pop());
248requires std::is_nothrow_move_constructible_v<T>
252 return make_exception_future<>(_ex);
255 return not_full().then([
this, data = std::move(data)] ()
mutable {
256 _q.push(std::move(data));
261 _q.push(std::move(data));
263 return make_ready_future<>();
271requires std::is_nothrow_move_constructible_v<T>
272template <
typename Func>
276 std::rethrow_exception(_ex);
279 while (!_q.empty() && running) {
280 running = func(std::move(_q.front()));
290requires std::is_nothrow_move_constructible_v<T>
298requires std::is_nothrow_move_constructible_v<T>
302 return _q.size() >= _max;
306requires std::is_nothrow_move_constructible_v<T>
310 return make_exception_future<>(_ex);
313 return make_ready_future<>();
316 return _not_empty->get_future();
321requires std::is_nothrow_move_constructible_v<T>
325 return make_exception_future<>(_ex);
328 return make_ready_future<>();
331 return _not_full->get_future();
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:109
bool push(T &&a)
Push an item.
Definition: queue.hh:187
future<> not_full() noexcept
Definition: queue.hh:323
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
T pop() noexcept
Pop an item.
Definition: queue.hh:208
bool consume(Func &&func)
Definition: queue.hh:274
future<> not_empty() noexcept
Definition: queue.hh:308
future< T > pop_eventually() noexcept
Definition: queue.hh:225
size_t max_size() const noexcept
Definition: queue.hh:117
void set_max_size(size_t max) noexcept
Definition: queue.hh:122
T & front() noexcept
access the front element in the queue
Definition: queue.hh:200
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:152
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:250
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
Seastar API namespace.
Definition: abort_on_ebadf.hh:26