24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/future.hh>
27 #include <seastar/util/std-compat.hh>
// Storage for queued items: a std::queue adaptor over seastar's
// circular_buffer. Items are appended with push() and read from front().
36 std::queue<T, circular_buffer<T>> _q;
// Promise for a pending "queue is not empty" notification. Its future is
// handed out via get_future(); it is resolved with set_value() or failed
// with set_exception(), and the optional is reset afterwards.
38 std::optional<promise<>> _not_empty;
// Promise for a pending "queue is not full" notification; handled the same
// way as _not_empty (get_future / set_value / set_exception, then reset).
39 std::optional<promise<>> _not_full;
// Exception reported to callers through make_exception_future(_ex) or
// std::rethrow_exception(_ex). Defaults to null.
40 std::exception_ptr _ex =
nullptr;
// Resolves the _not_empty promise with set_value() and resets the optional.
42 void notify_not_empty();
// Resolves the _not_full promise with set_value() and resets the optional.
43 void notify_not_full();
61 template <
typename Func>
// Returns the number of items currently held in the underlying container _q.
94 size_t size()
const {
return _q.size(); }
114 while (!_q.empty()) {
119 _not_full->set_exception(ex);
120 _not_full= std::nullopt;
123 _not_empty->set_exception(std::move(ex));
124 _not_empty = std::nullopt;
132 return bool(_not_empty);
136 template <
typename T>
138 queue<T>::queue(
size_t size)
142 template <
typename T>
144 void queue<T>::notify_not_empty() {
146 _not_empty->set_value();
147 _not_empty = std::optional<promise<>>();
151 template <
typename T>
153 void queue<T>::notify_not_full() {
155 _not_full->set_value();
156 _not_full = std::optional<promise<>>();
160 template <
typename T>
163 if (_q.size() < _max) {
164 _q.
push(std::move(data));
172 template <
typename T>
175 if (_q.size() == _max) {
178 T data = std::move(_q.front());
183 template <
typename T>
187 return make_exception_future<T>(_ex);
190 return not_empty().then([
this] {
192 return make_exception_future<T>(_ex);
194 return make_ready_future<T>(pop());
198 return make_ready_future<T>(pop());
202 template <
typename T>
206 return make_exception_future<>(_ex);
209 return not_full().then([
this, data = std::move(data)] ()
mutable {
210 _q.push(std::move(data));
214 _q.push(std::move(data));
216 return make_ready_future<>();
220 template <
typename T>
221 template <
typename Func>
225 std::rethrow_exception(_ex);
228 while (!_q.empty() && running) {
229 running = func(std::move(_q.front()));
238 template <
typename T>
244 template <
typename T>
247 return _q.
size() >= _max;
250 template <
typename T>
254 return make_exception_future<>(_ex);
257 return make_ready_future<>();
260 return _not_empty->get_future();
264 template <
typename T>
268 return make_exception_future<>(_ex);
271 return make_ready_future<>();
274 return _not_full->get_future();