24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <seastar/util/assert.hh>
27#include <seastar/util/modules.hh>
44requires std::is_nothrow_move_constructible_v<T>
46 std::queue<T, circular_buffer<T>> _q;
48 std::optional<promise<>> _not_empty;
49 std::optional<promise<>> _not_full;
50 std::exception_ptr _ex =
nullptr;
52 void notify_not_empty()
noexcept;
53 void notify_not_full()
noexcept;
77 template <typename Func>
84 bool full() const noexcept;
132 void abort(std::exception_ptr ex)
noexcept {
136 while (!_q.empty()) {
141 _not_full->set_exception(ex);
142 _not_full= std::nullopt;
145 _not_empty->set_exception(std::move(ex));
146 _not_empty = std::nullopt;
154 return bool(_not_empty);
159requires std::is_nothrow_move_constructible_v<T>
161queue<T>::queue(
size_t size)
166requires std::is_nothrow_move_constructible_v<T>
168void queue<T>::notify_not_empty() noexcept {
170 _not_empty->set_value();
171 _not_empty = std::optional<promise<>>();
176requires std::is_nothrow_move_constructible_v<T>
178void queue<T>::notify_not_full() noexcept {
180 _not_full->set_value();
181 _not_full = std::optional<promise<>>();
186requires std::is_nothrow_move_constructible_v<T>
189 if (_q.size() < _max) {
190 _q.push(std::move(data));
199requires std::is_nothrow_move_constructible_v<T>
207requires std::is_nothrow_move_constructible_v<T>
210 if (_q.size() == _max) {
217 SEASTAR_ASSERT(!_q.empty());
218 T data = std::move(_q.front());
226requires std::is_nothrow_move_constructible_v<T>
230 return make_exception_future<T>(_ex);
233 return not_empty().then([
this] {
235 return make_exception_future<T>(_ex);
237 return make_ready_future<T>(pop());
241 return make_ready_future<T>(pop());
246requires std::is_nothrow_move_constructible_v<T>
250 return make_exception_future<>(_ex);
253 return not_full().then([
this, data = std::move(data)] ()
mutable {
254 _q.push(std::move(data));
259 _q.push(std::move(data));
261 return make_ready_future<>();
269requires std::is_nothrow_move_constructible_v<T>
270template <
typename Func>
274 std::rethrow_exception(_ex);
277 while (!_q.empty() && running) {
278 running = func(std::move(_q.front()));
288requires std::is_nothrow_move_constructible_v<T>
296requires std::is_nothrow_move_constructible_v<T>
300 return _q.size() >= _max;
304requires std::is_nothrow_move_constructible_v<T>
308 return make_exception_future<>(_ex);
311 return make_ready_future<>();
314 return _not_empty->get_future();
319requires std::is_nothrow_move_constructible_v<T>
323 return make_exception_future<>(_ex);
326 return make_ready_future<>();
329 return _not_full->get_future();
A representation of a possibly not-yet-computed value.
Definition: future.hh:1199
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:110
bool push(T &&a)
Push an item.
Definition: queue.hh:188
future not_full() noexcept
Definition: queue.hh:321
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:298
T pop() noexcept
Pop an item.
Definition: queue.hh:209
bool consume(Func &&func)
Definition: queue.hh:272
future not_empty() noexcept
Definition: queue.hh:306
future< T > pop_eventually() noexcept
Definition: queue.hh:228
size_t max_size() const noexcept
Definition: queue.hh:118
void set_max_size(size_t max) noexcept
Definition: queue.hh:123
T & front() noexcept
access the front element in the queue
Definition: queue.hh:201
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:132
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:290
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:153
future push_eventually(T &&data) noexcept
Definition: queue.hh:248
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1932
Seastar API namespace.
Definition: abort_on_ebadf.hh:26