24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/future.hh>
26 #include <seastar/util/std-compat.hh>
27 #include <seastar/util/modules.hh>
28 #ifndef SEASTAR_MODULE
43 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
45 std::queue<T, circular_buffer<T>> _q;
47 std::optional<promise<>> _not_empty;
48 std::optional<promise<>> _not_full;
49 std::exception_ptr _ex =
nullptr;
51 void notify_not_empty() noexcept;
52 void notify_not_full() noexcept;
54 explicit queue(
size_t size);
76 template <typename Func>
77 bool consume(Func&& func);
80 bool empty() const noexcept;
83 bool full() const noexcept;
106 future<> push_eventually(T&& data) noexcept;
109 size_t size() const noexcept {
131 void abort(std::exception_ptr ex) noexcept {
135 while (!_q.empty()) {
140 _not_full->set_exception(ex);
141 _not_full= std::nullopt;
144 _not_empty->set_exception(std::move(ex));
145 _not_empty = std::nullopt;
153 return bool(_not_empty);
157 template <
typename T>
158 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
160 queue<T>::queue(
size_t size)
164 template <
typename T>
165 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
167 void queue<T>::notify_not_empty() noexcept {
169 _not_empty->set_value();
170 _not_empty = std::optional<promise<>>();
174 template <
typename T>
175 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
177 void queue<T>::notify_not_full() noexcept {
179 _not_full->set_value();
180 _not_full = std::optional<promise<>>();
184 template <
typename T>
185 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
188 if (_q.size() < _max) {
189 _q.push(std::move(data));
197 template <
typename T>
198 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
205 template <
typename T>
206 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
209 if (_q.size() == _max) {
217 T data = std::move(_q.front());
222 template <
typename T>
223 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
228 static_assert(std::is_nothrow_move_constructible_v<T>,
229 "Queue element type must be no-throw move constructible");
232 return make_exception_future<T>(_ex);
235 return not_empty().then([
this] {
237 return make_exception_future<T>(_ex);
239 return make_ready_future<T>(pop());
243 return make_ready_future<T>(pop());
247 template <
typename T>
248 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
252 return make_exception_future<>(_ex);
255 return not_full().then([
this, data = std::move(data)] ()
mutable {
256 _q.push(std::move(data));
261 _q.push(std::move(data));
263 return make_ready_future<>();
270 template <
typename T>
271 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
272 template <
typename Func>
276 std::rethrow_exception(_ex);
279 while (!_q.empty() && running) {
280 running = func(std::move(_q.front()));
289 template <
typename T>
290 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
297 template <
typename T>
298 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
302 return _q.size() >= _max;
305 template <
typename T>
306 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
310 return make_exception_future<>(_ex);
313 return make_ready_future<>();
316 return _not_empty->get_future();
320 template <
typename T>
321 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
325 return make_exception_future<>(_ex);
328 return make_ready_future<>();
331 return _not_full->get_future();
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
T & front() noexcept
access the front element in the queue
Definition: queue.hh:200
size_t max_size() const noexcept
Definition: queue.hh:117
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:250
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
future< T > pop_eventually() noexcept
Definition: queue.hh:225
void set_max_size(size_t max) noexcept
Definition: queue.hh:122
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool consume(Func &&func)
Definition: queue.hh:274
T pop() noexcept
Pop an item.
Definition: queue.hh:208
future<> not_empty() noexcept
Definition: queue.hh:308
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:152
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
bool push(T &&a)
Push an item.
Definition: queue.hh:187
future<> not_full() noexcept
Definition: queue.hh:323
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
Seastar API namespace.
Definition: abort_on_ebadf.hh:26