28#include <seastar/core/future.hh>
30namespace seastar::coroutine::experimental {
32template<
typename T,
template <
typename>
class Container = std::optional>
45enum class buffer_size_t : size_t;
49using std::coroutine_handle;
50using std::suspend_never;
51using std::suspend_always;
52using std::suspend_never;
53using std::noop_coroutine;
56using next_value_t = std::optional<T>;
58template <
template <
typename>
class Container,
typename T>
59concept Fifo =
requires(Container<T>&& c, T&& value) {
61 { c.front() } -> std::same_as<T&>;
65 { c.size() } -> std::convertible_to<size_t>;
69concept NothrowMoveConstructible = std::is_nothrow_move_constructible_v<T>;
71template<NothrowMoveConstructible T>
74 std::optional<seastar::promise<>> _wait_for_next_value;
82 void return_void()
noexcept;
83 void unhandled_exception()
noexcept;
85 template<std::convertible_to<T> U>
86 suspend_always yield_value(U&& value)
noexcept {
88 _generator->put_next_value(std::forward<U>(value));
89 assert(_wait_for_next_value);
90 _wait_for_next_value->set_value();
91 _wait_for_next_value = {};
101 suspend_always initial_suspend()
const noexcept {
return {}; }
102 suspend_never final_suspend()
const noexcept {
104 _generator->on_finished();
109 assert(!_wait_for_next_value);
110 return _wait_for_next_value.emplace().get_future();
113 void run_and_dispose()
noexcept final {
114 using handle_type = coroutine_handle<generator_unbuffered_promise>;
115 handle_type::from_promise(*this).resume();
119 if (_wait_for_next_value) {
127template <NothrowMoveConstructible T,
template <
typename>
class Container>
128requires Fifo<Container, T>
129class generator_buffered_promise;
131template<
typename T,
template <
typename>
class Container>
138 : _future{std::move(f)} {}
140 bool await_ready()
noexcept {
144 coroutine_handle<> await_suspend(coroutine_handle<promise_type> coro)
noexcept;
145 void await_resume()
noexcept { }
149template<NothrowMoveConstructible T,
template <
typename>
class Container>
150requires Fifo<Container, T>
154 std::optional<seastar::promise<>> _wait_for_next_value;
155 std::optional<seastar::promise<>> _wait_for_free_space;
157 const size_t _buffer_capacity;
160 template<
typename... Args>
162 : _buffer_capacity{
static_cast<size_t>(buffer_capacity)} {}
166 void return_void()
noexcept {
167 if (_wait_for_next_value) {
168 _wait_for_next_value->set_value();
169 _wait_for_next_value = {};
172 void unhandled_exception()
noexcept;
174 template<std::convertible_to<T> U>
176 bool ready = _generator->put_next_value(std::forward<U>(value));
178 if (_wait_for_next_value) {
179 _wait_for_next_value->set_value();
180 _wait_for_next_value = {};
186 assert(!_wait_for_free_space);
187 return {_wait_for_free_space.emplace().get_future()};
197 suspend_always initial_suspend()
const noexcept {
return {}; }
198 suspend_never final_suspend()
const noexcept {
200 _generator->on_finished();
204 bool is_awaiting()
const noexcept {
205 return _wait_for_next_value.has_value();
208 coroutine_handle<> coroutine()
const noexcept {
209 return coroutine_handle<>::from_address(_wait_for_next_value->waiting_task());
213 assert(!_wait_for_next_value);
214 return _wait_for_next_value.emplace().get_future();
217 void on_reclaim_free_space()
noexcept {
218 assert(_wait_for_free_space);
219 _wait_for_free_space->set_value();
220 _wait_for_free_space = {};
224 void run_and_dispose()
noexcept final {
225 using handle_type = coroutine_handle<generator_buffered_promise>;
226 handle_type::from_promise(*this).resume();
230 if (_wait_for_next_value) {
232 }
else if (_wait_for_free_space) {
240template<
typename T,
typename Generator>
242 using next_value_type = next_value_t<T>;
243 Generator*
const _generator;
253 , _next_value_future(std::move(f)) {}
258 constexpr bool await_ready()
const noexcept {
259 return _next_value_future.
available() && !seastar::need_preempt();
262 template<
typename Promise>
263 void await_suspend(coroutine_handle<Promise> coro)
noexcept {
264 auto& current_task = coro.promise();
266 seastar::schedule(¤t_task);
268 _next_value_future.set_coroutine(current_task);
269 seastar::schedule(_task);
273 next_value_type await_resume() {
276 return _generator->take_next_value();
312template <
typename T,
template <
typename>
class Container>
318 using handle_type = internal::coroutine_handle<promise_type>;
320 promise_type* _promise;
321 Container<T> _values;
322 const size_t _buffer_capacity;
323 std::exception_ptr _exception;
328 promise_type*
promise) noexcept
331 , _buffer_capacity{buffer_capacity} {
333 _promise->set_generator(
this);
335 generator(
const generator&) =
delete;
336 generator(generator&& other) noexcept
337 : _coro{std::exchange(other._coro, {})}
338 , _buffer_capacity{other._buffer_capacity} {}
339 generator& operator=(generator&& other)
noexcept {
340 if (std::addressof(other) !=
this) {
341 auto old_coro = std::exchange(_coro, std::exchange(other._coro, {}));
354 void swap(generator& other)
noexcept {
355 std::swap(_coro, other._coro);
358 internal::next_awaiter<T, generator> operator()() noexcept {
359 if (!_values.empty()) {
360 return {
this,
nullptr, make_ready_future<>()};
361 }
else if (_exception) [[unlikely]] {
362 return {
this,
nullptr, make_ready_future<>()};
363 }
else if (_promise) {
364 return {
this, _promise, _promise->wait_for_next_value()};
366 return {
this,
nullptr, make_ready_future<>()};
371 bool put_next_value(U&& value) {
372 _values.push_back(std::forward<U>(value));
373 return _values.size() < _buffer_capacity;
376 internal::next_value_t<T> take_next_value() {
377 if (!_values.empty()) [[likely]] {
378 auto value = std::move(_values.front());
379 bool maybe_reclaim = _values.size() == _buffer_capacity;
382 if (_promise) [[likely]] {
383 _promise->on_reclaim_free_space();
386 return internal::next_value_t<T>(std::move(value));
387 }
else if (_exception) [[unlikely]] {
388 std::rethrow_exception(std::exchange(_exception,
nullptr));
399 void unhandled_exception() noexcept {
402 _exception = std::current_exception();
407class generator<T,
std::optional> {
409 using promise_type = internal::generator_unbuffered_promise<T>;
412 using handle_type = internal::coroutine_handle<promise_type>;
414 promise_type* _promise;
415 std::optional<T> _maybe_value;
416 std::exception_ptr _exception;
419 generator(handle_type coro,
420 promise_type* promise) noexcept
422 , _promise{promise} {
424 _promise->set_generator(
this);
426 generator(
const generator&) =
delete;
427 generator(generator&& other) noexcept
428 : _coro{std::exchange(other._coro, {})} {}
429 generator& operator=(generator&& other)
noexcept {
430 if (std::addressof(other) !=
this) {
431 auto old_coro = std::exchange(_coro, std::exchange(other._coro, {}));
444 void swap(generator& other)
noexcept {
445 std::swap(_coro, other._coro);
448 internal::next_awaiter<T, generator> operator()() noexcept {
449 if (_promise) [[likely]] {
450 return {
this, _promise, _promise->wait_for_next_value()};
452 return {
this,
nullptr, make_ready_future<>()};
457 void put_next_value(U&& value)
noexcept {
458 _maybe_value.emplace(std::forward<U>(value));
461 internal::next_value_t<T> take_next_value() {
462 if (_maybe_value.has_value()) [[likely]] {
463 return std::exchange(_maybe_value, std::nullopt);
464 }
else if (_exception) [[unlikely]] {
465 std::rethrow_exception(std::exchange(_exception,
nullptr));
476 void unhandled_exception() noexcept {
478 _exception = std::current_exception();
484template<NothrowMoveConstructible T>
485void generator_unbuffered_promise<T>::return_void() noexcept {
486 assert(_wait_for_next_value);
487 _wait_for_next_value->set_value();
488 _wait_for_next_value = {};
491template<NothrowMoveConstructible T>
492void generator_unbuffered_promise<T>::unhandled_exception() noexcept {
498 _generator->unhandled_exception();
499 if (_wait_for_next_value.has_value()) {
500 _wait_for_next_value->set_value();
501 _wait_for_next_value = {};
505template<NothrowMoveConstructible T>
506auto generator_unbuffered_promise<T>::get_return_object() noexcept -> generator_type {
507 using handle_type = coroutine_handle<generator_unbuffered_promise<T>>;
508 return generator_type{handle_type::from_promise(*
this),
this};
511template<NothrowMoveConstructible T,
template <
typename>
class Container>
512requires Fifo<Container, T>
513void generator_buffered_promise<T, Container>::unhandled_exception() noexcept {
514 _generator->unhandled_exception();
515 if (_wait_for_next_value.has_value()) {
516 _wait_for_next_value->set_value();
517 _wait_for_next_value = {};
521template<NothrowMoveConstructible T,
template <
typename>
class Container>
522requires Fifo<Container, T>
523auto generator_buffered_promise<T, Container>::get_return_object() noexcept -> generator_type {
524 using handle_type = coroutine_handle<generator_buffered_promise<T, Container>>;
525 return generator_type{_buffer_capacity, handle_type::from_promise(*
this),
this};
528template<
typename T,
template <
typename>
class Container>
529coroutine_handle<> yield_awaiter<T, Container>::await_suspend(
530 coroutine_handle<generator_buffered_promise<T, Container>> coro)
noexcept {
531 if (_future.available()) {
532 auto& current_task = coro.promise();
533 seastar::schedule(¤t_task);
542 return noop_coroutine();
Definition: generator.hh:33
Definition: generator.hh:151
Definition: generator.hh:72
seastar::task * waiting_task() noexcept final
Returns the next task which is waiting for this task to complete execution, or nullptr.
Definition: generator.hh:118
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
bool available() const noexcept
Checks whether the future is available.
Definition: future.hh:1394
promise - allows a future value to be made available at a later time.
Definition: future.hh:934
virtual task * waiting_task() noexcept=0
Returns the next task which is waiting for this task to complete execution, or nullptr.
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
Definition: generator.hh:241
Definition: generator.hh:132