Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
generator.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2022 Kefu Chai ( tchaikov@gmail.com )
20 */
21
22#pragma once
23
24#include <cassert>
25#include <coroutine>
26#include <optional>
27#include <utility>
28#include <seastar/core/future.hh>
29
30namespace seastar::coroutine::experimental {
31
32template<typename T, template <typename> class Container = std::optional>
34
45enum class buffer_size_t : size_t;
46
47namespace internal {
48
49using std::coroutine_handle;
50using std::suspend_never;
51using std::suspend_always;
52using std::suspend_never;
53using std::noop_coroutine;
54
// Result type of one consumer step: an engaged optional carries a value,
// std::nullopt is what take_next_value() returns when nothing is left.
template <typename ValueType>
using next_value_t = std::optional<ValueType>;
57
58template <template <typename> class Container, typename T>
59concept Fifo = requires(Container<T>&& c, T&& value) {
60 // better off returning a reference though, so we can move away from it
61 { c.front() } -> std::same_as<T&>;
62 c.pop_front();
63 c.push_back(value);
64 bool(c.empty());
65 { c.size() } -> std::convertible_to<size_t>;
66};
67
68template<typename T>
69concept NothrowMoveConstructible = std::is_nothrow_move_constructible_v<T>;
70
71template<NothrowMoveConstructible T>
74 std::optional<seastar::promise<>> _wait_for_next_value;
75 generator_type* _generator = nullptr;
76
77public:
81
82 void return_void() noexcept;
83 void unhandled_exception() noexcept;
84
85 template<std::convertible_to<T> U>
86 suspend_always yield_value(U&& value) noexcept {
87 assert(_generator);
88 _generator->put_next_value(std::forward<U>(value));
89 assert(_wait_for_next_value);
90 _wait_for_next_value->set_value();
91 _wait_for_next_value = {};
92 return {};
93 }
94
95 generator_type get_return_object() noexcept;
96 void set_generator(generator_type* g) noexcept {
97 assert(!_generator);
98 _generator = g;
99 }
100
101 suspend_always initial_suspend() const noexcept { return {}; }
102 suspend_never final_suspend() const noexcept {
103 assert(_generator);
104 _generator->on_finished();
105 return {};
106 }
107
108 seastar::future<> wait_for_next_value() noexcept {
109 assert(!_wait_for_next_value);
110 return _wait_for_next_value.emplace().get_future();
111 }
112
113 void run_and_dispose() noexcept final {
114 using handle_type = coroutine_handle<generator_unbuffered_promise>;
115 handle_type::from_promise(*this).resume();
116 }
117
118 seastar::task* waiting_task() noexcept final {
119 if (_wait_for_next_value) {
120 return _wait_for_next_value->waiting_task();
121 } else {
122 return nullptr;
123 }
124 }
125};
126
127template <NothrowMoveConstructible T, template <typename> class Container>
128requires Fifo<Container, T>
129class generator_buffered_promise;
130
131template<typename T, template <typename> class Container>
132struct yield_awaiter final {
134 seastar::future<> _future;
135
136public:
137 yield_awaiter(seastar::future<>&& f) noexcept
138 : _future{std::move(f)} {}
139
140 bool await_ready() noexcept {
141 return _future.available();
142 }
143
144 coroutine_handle<> await_suspend(coroutine_handle<promise_type> coro) noexcept;
// `co_yield` produces no result for the producer.
void await_resume() noexcept {
}
146};
147
148
149template<NothrowMoveConstructible T, template <typename> class Container>
150requires Fifo<Container, T>
153
154 std::optional<seastar::promise<>> _wait_for_next_value;
155 std::optional<seastar::promise<>> _wait_for_free_space;
156 generator_type* _generator = nullptr;
157 const size_t _buffer_capacity;
158
159public:
160 template<typename... Args>
161 generator_buffered_promise(buffer_size_t buffer_capacity, Args&&... args)
162 : _buffer_capacity{static_cast<size_t>(buffer_capacity)} {}
165 ~generator_buffered_promise() = default;
166 void return_void() noexcept {
167 if (_wait_for_next_value) {
168 _wait_for_next_value->set_value();
169 _wait_for_next_value = {};
170 }
171 }
172 void unhandled_exception() noexcept;
173
174 template<std::convertible_to<T> U>
175 yield_awaiter<T, Container> yield_value(U&& value) noexcept {
176 bool ready = _generator->put_next_value(std::forward<U>(value));
177
178 if (_wait_for_next_value) {
179 _wait_for_next_value->set_value();
180 _wait_for_next_value = {};
181 return {make_ready_future()};
182 }
183 if (ready) {
184 return {make_ready_future()};
185 } else {
186 assert(!_wait_for_free_space);
187 return {_wait_for_free_space.emplace().get_future()};
188 }
189 }
190
191 auto get_return_object() noexcept -> generator_type;
192 void set_generator(generator_type* g) noexcept {
193 assert(!_generator);
194 _generator = g;
195 }
196
197 suspend_always initial_suspend() const noexcept { return {}; }
198 suspend_never final_suspend() const noexcept {
199 assert(_generator);
200 _generator->on_finished();
201 return {};
202 }
203
204 bool is_awaiting() const noexcept {
205 return _wait_for_next_value.has_value();
206 }
207
208 coroutine_handle<> coroutine() const noexcept {
209 return coroutine_handle<>::from_address(_wait_for_next_value->waiting_task());
210 }
211
212 seastar::future<> wait_for_next_value() noexcept {
213 assert(!_wait_for_next_value);
214 return _wait_for_next_value.emplace().get_future();
215 }
216
217 void on_reclaim_free_space() noexcept {
218 assert(_wait_for_free_space);
219 _wait_for_free_space->set_value();
220 _wait_for_free_space = {};
221 }
222
223private:
224 void run_and_dispose() noexcept final {
225 using handle_type = coroutine_handle<generator_buffered_promise>;
226 handle_type::from_promise(*this).resume();
227 }
228
229 seastar::task* waiting_task() noexcept final {
230 if (_wait_for_next_value) {
231 return _wait_for_next_value->waiting_task();
232 } else if (_wait_for_free_space) {
233 return _wait_for_free_space->waiting_task();
234 } else {
235 return nullptr;
236 }
237 }
238};
239
240template<typename T, typename Generator>
241struct next_awaiter final {
242 using next_value_type = next_value_t<T>;
243 Generator* const _generator;
244 seastar::task* const _task;
245 seastar::future<> _next_value_future;
246
247public:
248 next_awaiter(Generator* generator,
250 seastar::future<>&& f) noexcept
251 : _generator{generator}
252 , _task(task)
253 , _next_value_future(std::move(f)) {}
254
255 next_awaiter(const next_awaiter&) = delete;
256 next_awaiter(next_awaiter&&) = delete;
257
258 constexpr bool await_ready() const noexcept {
259 return _next_value_future.available() && !seastar::need_preempt();
260 }
261
262 template<typename Promise>
263 void await_suspend(coroutine_handle<Promise> coro) noexcept {
264 auto& current_task = coro.promise();
265 if (_next_value_future.available()) {
266 seastar::schedule(&current_task);
267 } else {
268 _next_value_future.set_coroutine(current_task);
269 seastar::schedule(_task);
270 }
271 }
272
273 next_value_type await_resume() {
274 assert(_next_value_future.available());
275 assert(_generator);
276 return _generator->take_next_value();
277 }
278};
279
280} // namespace internal
281
312template <typename T, template <typename> class Container>
313class generator {
314public:
316
317private:
318 using handle_type = internal::coroutine_handle<promise_type>;
319 handle_type _coro;
320 promise_type* _promise;
321 Container<T> _values;
322 const size_t _buffer_capacity;
323 std::exception_ptr _exception;
324
325public:
326 generator(size_t buffer_capacity,
327 handle_type coro,
328 promise_type* promise) noexcept
329 : _coro{coro}
330 , _promise{promise}
331 , _buffer_capacity{buffer_capacity} {
332 assert(_promise);
333 _promise->set_generator(this);
334 }
335 generator(const generator&) = delete;
336 generator(generator&& other) noexcept
337 : _coro{std::exchange(other._coro, {})}
338 , _buffer_capacity{other._buffer_capacity} {}
339 generator& operator=(generator&& other) noexcept {
340 if (std::addressof(other) != this) {
341 auto old_coro = std::exchange(_coro, std::exchange(other._coro, {}));
342 if (old_coro) {
343 old_coro.destroy();
344 }
345 }
346 return *this;
347 }
348 ~generator() {
349 if (_coro) {
350 _coro.destroy();
351 }
352 }
353
354 void swap(generator& other) noexcept {
355 std::swap(_coro, other._coro);
356 }
357
358 internal::next_awaiter<T, generator> operator()() noexcept {
359 if (!_values.empty()) {
360 return {this, nullptr, make_ready_future<>()};
361 } else if (_exception) [[unlikely]] {
362 return {this, nullptr, make_ready_future<>()};
363 } else if (_promise) {
364 return {this, _promise, _promise->wait_for_next_value()};
365 } else {
366 return {this, nullptr, make_ready_future<>()};
367 }
368 }
369
370 template<typename U>
371 bool put_next_value(U&& value) {
372 _values.push_back(std::forward<U>(value));
373 return _values.size() < _buffer_capacity;
374 }
375
376 internal::next_value_t<T> take_next_value() {
377 if (!_values.empty()) [[likely]] {
378 auto value = std::move(_values.front());
379 bool maybe_reclaim = _values.size() == _buffer_capacity;
380 _values.pop_front();
381 if (maybe_reclaim) {
382 if (_promise) [[likely]] {
383 _promise->on_reclaim_free_space();
384 }
385 }
386 return internal::next_value_t<T>(std::move(value));
387 } else if (_exception) [[unlikely]] {
388 std::rethrow_exception(std::exchange(_exception, nullptr));
389 } else {
390 return std::nullopt;
391 }
392 }
393
394 void on_finished() {
395 _promise = nullptr;
396 _coro = nullptr;
397 }
398
399 void unhandled_exception() noexcept {
400 // called by promise's unhandled_exception()
401 assert(!_exception);
402 _exception = std::current_exception();
403 }
404};
405
406template <typename T>
407class generator<T, std::optional> {
408public:
409 using promise_type = internal::generator_unbuffered_promise<T>;
410
411private:
412 using handle_type = internal::coroutine_handle<promise_type>;
413 handle_type _coro;
414 promise_type* _promise;
415 std::optional<T> _maybe_value;
416 std::exception_ptr _exception;
417
418public:
419 generator(handle_type coro,
420 promise_type* promise) noexcept
421 : _coro{coro}
422 , _promise{promise} {
423 assert(_promise);
424 _promise->set_generator(this);
425 }
426 generator(const generator&) = delete;
427 generator(generator&& other) noexcept
428 : _coro{std::exchange(other._coro, {})} {}
429 generator& operator=(generator&& other) noexcept {
430 if (std::addressof(other) != this) {
431 auto old_coro = std::exchange(_coro, std::exchange(other._coro, {}));
432 if (old_coro) {
433 old_coro.destroy();
434 }
435 }
436 return *this;
437 }
438 ~generator() {
439 if (_coro) {
440 _coro.destroy();
441 }
442 }
443
444 void swap(generator& other) noexcept {
445 std::swap(_coro, other._coro);
446 }
447
448 internal::next_awaiter<T, generator> operator()() noexcept {
449 if (_promise) [[likely]] {
450 return {this, _promise, _promise->wait_for_next_value()};
451 } else {
452 return {this, nullptr, make_ready_future<>()};
453 }
454 }
455
456 template<typename U>
457 void put_next_value(U&& value) noexcept {
458 _maybe_value.emplace(std::forward<U>(value));
459 }
460
461 internal::next_value_t<T> take_next_value() {
462 if (_maybe_value.has_value()) [[likely]] {
463 return std::exchange(_maybe_value, std::nullopt);
464 } else if (_exception) [[unlikely]] {
465 std::rethrow_exception(std::exchange(_exception, nullptr));
466 } else {
467 return std::nullopt;
468 }
469 }
470
471 void on_finished() {
472 _promise = nullptr;
473 _coro = nullptr;
474 }
475
476 void unhandled_exception() noexcept {
477 assert(!_exception);
478 _exception = std::current_exception();
479 }
480};
481
482namespace internal {
483
484template<NothrowMoveConstructible T>
485void generator_unbuffered_promise<T>::return_void() noexcept {
486 assert(_wait_for_next_value);
487 _wait_for_next_value->set_value();
488 _wait_for_next_value = {};
489}
490
491template<NothrowMoveConstructible T>
492void generator_unbuffered_promise<T>::unhandled_exception() noexcept {
493 // instead of storing the current exception into promise, in order to be
494 // more consistent, we let generator preserve all the output of produced
495 // value, including the values and the exception if any. so we just signal
496 // _wait_for_next_value, and delegate generator's unhandled_exception() to
497 // store the exception.
498 _generator->unhandled_exception();
499 if (_wait_for_next_value.has_value()) {
500 _wait_for_next_value->set_value();
501 _wait_for_next_value = {};
502 }
503}
504
505template<NothrowMoveConstructible T>
506auto generator_unbuffered_promise<T>::get_return_object() noexcept -> generator_type {
507 using handle_type = coroutine_handle<generator_unbuffered_promise<T>>;
508 return generator_type{handle_type::from_promise(*this), this};
509}
510
511template<NothrowMoveConstructible T, template <typename> class Container>
512requires Fifo<Container, T>
513void generator_buffered_promise<T, Container>::unhandled_exception() noexcept {
514 _generator->unhandled_exception();
515 if (_wait_for_next_value.has_value()) {
516 _wait_for_next_value->set_value();
517 _wait_for_next_value = {};
518 }
519}
520
521template<NothrowMoveConstructible T, template <typename> class Container>
522requires Fifo<Container, T>
523auto generator_buffered_promise<T, Container>::get_return_object() noexcept -> generator_type {
524 using handle_type = coroutine_handle<generator_buffered_promise<T, Container>>;
525 return generator_type{_buffer_capacity, handle_type::from_promise(*this), this};
526}
527
528template<typename T, template <typename> class Container>
529coroutine_handle<> yield_awaiter<T, Container>::await_suspend(
530 coroutine_handle<generator_buffered_promise<T, Container>> coro) noexcept {
531 if (_future.available()) {
532 auto& current_task = coro.promise();
533 seastar::schedule(&current_task);
534 return coro;
535 } else {
536 // we cannot do something like `task.set_coroutine(consumer_task)`.
537 // because, instead of waiting for a subcoroutine, we are pending on
538 // the caller of current coroutine to consume the produced values to
539 // free up at least a free slot in the buffer, if we set the `_task`
540 // of the of the awaiting task, we would have an infinite loop of
541 // "promise->_task".
542 return noop_coroutine();
543 }
544}
545
546} // namespace internal
547} // namespace seastar::coroutine::experimental
seastar::task * waiting_task() noexcept final
Returns the next task which is waiting for this task to complete execution, or nullptr.
Definition: generator.hh:118
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
bool available() const noexcept
Checks whether the future is available.
Definition: future.hh:1394
promise - allows a future value to be made available at a later time.
Definition: future.hh:934
Definition: task.hh:34
virtual task * waiting_task() noexcept=0
Returns the next task which is waiting for this task to complete execution, or nullptr.
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
STL namespace.