Seastar
High performance C++ framework for concurrent servers
generator.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2022 Kefu Chai ( tchaikov@gmail.com )
20  */
21 
22 #pragma once
23 
24 #include <cassert>
25 #include <coroutine>
26 #include <optional>
27 #include <utility>
28 #include <seastar/core/future.hh>
29 
30 namespace seastar::coroutine::experimental {
31 
/// Forward declaration of the generator class template.
///
/// `Container` selects how produced values are stored. It defaults to
/// `std::optional`, for which a dedicated partial specialization is defined
/// later in this file; the primary template is used for every other container.
32 template<typename T, template <typename> class Container = std::optional>
33 class generator;
34 
/// Strongly typed buffer capacity with `size_t` as its underlying type.
/// `generator_buffered_promise` takes it as its first constructor argument and
/// converts it back to `size_t`. No enumerators are declared here.
45 enum class buffer_size_t : size_t;
46 
47 namespace internal {
48 
49 using std::coroutine_handle;
50 using std::suspend_never;
51 using std::suspend_always;
52 using std::suspend_never;
53 using std::noop_coroutine;
54 
/// Result type of one step of a generator: an alias for `std::optional<T>`.
/// The generators' `take_next_value()` return an empty optional when there is
/// neither a stored value nor a stored exception.
55 template<typename T>
56 using next_value_t = std::optional<T>;
57 
/// Requirements on the buffer type used by the buffered generator.
///
/// `Container<T>` must provide:
///  - `front()` returning exactly `T&`,
///  - `pop_front()`,
///  - `push_back()` callable with a `T` lvalue,
///  - `empty()` whose result can be explicitly converted to `bool`,
///  - `size()` whose result is convertible to `size_t`.
58 template <template <typename> class Container, typename T>
59 concept Fifo = requires(Container<T>&& c, T&& value) {
60  // front() is required to return a reference, so that the stored value can
    // be moved out of the container before it is popped
61  { c.front() } -> std::same_as<T&>;
62  c.pop_front();
63  c.push_back(value);
64  bool(c.empty());
65  { c.size() } -> std::convertible_to<size_t>;
66 };
67 
/// Satisfied when `T` can be move-constructed without throwing
/// (`std::is_nothrow_move_constructible_v<T>`). Used to constrain the element
/// type of both promise types below.
68 template<typename T>
69 concept NothrowMoveConstructible = std::is_nothrow_move_constructible_v<T>;
70 
/// Coroutine promise for the unbuffered generator (`generator<T, std::optional>`).
///
/// The promise does not store produced values or exceptions itself; it forwards
/// them to the generator object registered through set_generator(). It also
/// owns the seastar promise that the consumer waits on for the next value.
71 template<NothrowMoveConstructible T>
    // Engaged while a consumer is waiting for the next value; created by
    // wait_for_next_value() and fulfilled when a value is yielded, the
    // coroutine returns, or an exception escapes.
74  std::optional<seastar::promise<>> _wait_for_next_value;
    // Back-pointer to the generator that receives values; set once by
    // set_generator().
75  generator_type* _generator = nullptr;
76 
77 public:
78  generator_unbuffered_promise() = default;
81 
    // Both are defined out of line, after the generator classes.
82  void return_void() noexcept;
83  void unhandled_exception() noexcept;
84 
    // Handles `co_yield value`: stores the value in the generator, wakes the
    // waiting consumer, and always suspends the producer coroutine.
    // Asserts that a generator is registered and that a consumer is waiting.
85  template<std::convertible_to<T> U>
86  suspend_always yield_value(U&& value) noexcept {
87  assert(_generator);
88  _generator->put_next_value(std::forward<U>(value));
89  assert(_wait_for_next_value);
90  _wait_for_next_value->set_value();
91  _wait_for_next_value = {};
92  return {};
93  }
94 
95  generator_type get_return_object() noexcept;
    // Registers the generator that owns this coroutine. May only be called
    // while no generator is registered (asserted).
96  void set_generator(generator_type* g) noexcept {
97  assert(!_generator);
98  _generator = g;
99  }
100 
    // The coroutine body does not start running until it is resumed.
101  suspend_always initial_suspend() const noexcept { return {}; }
    // Tells the generator that the coroutine has finished (the generator then
    // clears its handle and promise pointer) and does not suspend.
102  suspend_never final_suspend() const noexcept {
103  assert(_generator);
104  _generator->on_finished();
105  return {};
106  }
107 
    // Creates the promise the consumer waits on and returns its future.
    // Asserts that no wait is already pending.
108  seastar::future<> wait_for_next_value() noexcept {
109  assert(!_wait_for_next_value);
110  return _wait_for_next_value.emplace().get_future();
111  }
112 
    // Task entry point: resumes the producer coroutine this promise belongs to.
113  void run_and_dispose() noexcept final {
114  using handle_type = coroutine_handle<generator_unbuffered_promise>;
115  handle_type::from_promise(*this).resume();
116  }
117 
    // Reports the task waiting on the pending next-value promise, or nullptr
    // when no consumer is waiting.
118  seastar::task* waiting_task() noexcept final {
119  if (_wait_for_next_value) {
120  return _wait_for_next_value->waiting_task();
121  } else {
122  return nullptr;
123  }
124  }
125 };
126 
/// Forward declaration of the promise type used by the buffered generator.
/// It is declared ahead of `yield_awaiter`, whose `await_suspend()` takes a
/// coroutine handle of this promise type.
127 template <NothrowMoveConstructible T, template <typename> class Container>
128 requires Fifo<Container, T>
129 class generator_buffered_promise;
130 
/// Awaiter returned by `generator_buffered_promise::yield_value()`.
///
/// It wraps a future: when the future is already available the producer
/// coroutine continues without suspending, otherwise the producer suspends.
131 template<typename T, template <typename> class Container>
132 struct yield_awaiter final {
    // Future handed over by yield_value(): either a ready future or the future
    // of the promise's "wait for free space" promise.
134  seastar::future<> _future;
135 
136 public:
137  yield_awaiter(seastar::future<>&& f) noexcept
138  : _future{std::move(f)} {}
139 
    // No suspension is needed when the wrapped future is already available.
140  bool await_ready() noexcept {
141  return _future.available();
142  }
143 
    // Defined out of line, after generator_buffered_promise.
144  coroutine_handle<> await_suspend(coroutine_handle<promise_type> coro) noexcept;
    // co_yield produces no result for the producer.
145  void await_resume() noexcept { }
146 };
147 
148 
/// Coroutine promise for the buffered generator (primary `generator` template).
///
/// Produced values and exceptions are stored in the generator object
/// registered through set_generator(). The promise keeps two optional seastar
/// promises: one fulfilled when the consumer's awaited next value is ready,
/// and one fulfilled when the consumer frees up buffer space.
149 template<NothrowMoveConstructible T, template <typename> class Container>
150 requires Fifo<Container, T>
153 
    // Engaged while a consumer is waiting for the next value.
154  std::optional<seastar::promise<>> _wait_for_next_value;
    // Engaged while the producer is waiting for a free slot in the buffer.
155  std::optional<seastar::promise<>> _wait_for_free_space;
    // Back-pointer to the generator that owns the buffer; set once by
    // set_generator().
156  generator_type* _generator = nullptr;
    // Capacity passed on to the generator in get_return_object().
157  const size_t _buffer_capacity;
158 
159 public:
    // Takes the buffer capacity as the first argument; any further arguments
    // are accepted and ignored.
160  template<typename... Args>
161  generator_buffered_promise(buffer_size_t buffer_capacity, Args&&... args)
162  : _buffer_capacity{static_cast<size_t>(buffer_capacity)} {}
165  ~generator_buffered_promise() = default;
    // Handles `co_return`: wakes a consumer that is still waiting for a value.
166  void return_void() noexcept {
167  if (_wait_for_next_value) {
168  _wait_for_next_value->set_value();
169  _wait_for_next_value = {};
170  }
171  }
    // Defined out of line, after the generator classes.
172  void unhandled_exception() noexcept;
173 
    // Handles `co_yield value`. The value is appended to the generator's
    // buffer; `ready` is true while the buffer holds fewer values than its
    // capacity. The returned awaiter wraps:
    //  - a ready future if a consumer was waiting (it is woken first),
    //  - a ready future if the buffer still has room,
    //  - otherwise the future of a newly created "free space" promise.
    // NOTE(review): unlike the unbuffered promise, `_generator` is not
    // asserted before use here.
174  template<std::convertible_to<T> U>
175  yield_awaiter<T, Container> yield_value(U&& value) noexcept {
176  bool ready = _generator->put_next_value(std::forward<U>(value));
177 
178  if (_wait_for_next_value) {
179  _wait_for_next_value->set_value();
180  _wait_for_next_value = {};
181  return {make_ready_future()};
182  }
183  if (ready) {
184  return {make_ready_future()};
185  } else {
186  assert(!_wait_for_free_space);
187  return {_wait_for_free_space.emplace().get_future()};
188  }
189  }
190 
191  auto get_return_object() noexcept -> generator_type;
    // Registers the generator that owns this coroutine. May only be called
    // while no generator is registered (asserted).
192  void set_generator(generator_type* g) noexcept {
193  assert(!_generator);
194  _generator = g;
195  }
196 
    // The coroutine body does not start running until it is resumed.
197  suspend_always initial_suspend() const noexcept { return {}; }
    // Tells the generator that the coroutine has finished and does not suspend.
198  suspend_never final_suspend() const noexcept {
199  assert(_generator);
200  _generator->on_finished();
201  return {};
202  }
203 
    // True while a consumer is waiting for the next value.
204  bool is_awaiting() const noexcept {
205  return _wait_for_next_value.has_value();
206  }
207 
    // Builds a coroutine handle from the address of the task waiting on the
    // next-value promise.
    // NOTE(review): this dereferences `_wait_for_next_value` without checking
    // that it is engaged, and treats a task pointer as a coroutine frame
    // address — confirm both are valid for all callers.
208  coroutine_handle<> coroutine() const noexcept {
209  return coroutine_handle<>::from_address(_wait_for_next_value->waiting_task());
210  }
211 
    // Creates the promise the consumer waits on and returns its future.
    // Asserts that no wait is already pending.
212  seastar::future<> wait_for_next_value() noexcept {
213  assert(!_wait_for_next_value);
214  return _wait_for_next_value.emplace().get_future();
215  }
216 
    // Called by the generator after it popped a value from a full buffer:
    // fulfils and clears the "free space" promise. Asserts that the producer
    // is actually waiting for free space.
217  void on_reclaim_free_space() noexcept {
218  assert(_wait_for_free_space);
219  _wait_for_free_space->set_value();
220  _wait_for_free_space = {};
221  }
222 
223 private:
    // Task entry point: resumes the producer coroutine this promise belongs to.
224  void run_and_dispose() noexcept final {
225  using handle_type = coroutine_handle<generator_buffered_promise>;
226  handle_type::from_promise(*this).resume();
227  }
228 
    // Reports the task waiting on whichever promise is pending (next value
    // first, then free space), or nullptr when neither is pending.
229  seastar::task* waiting_task() noexcept final {
230  if (_wait_for_next_value) {
231  return _wait_for_next_value->waiting_task();
232  } else if (_wait_for_free_space) {
233  return _wait_for_free_space->waiting_task();
234  } else {
235  return nullptr;
236  }
237  }
238 };
239 
/// Awaiter returned by `generator::operator()()`; `co_await`-ing it yields the
/// next value as a `next_value_t<T>`.
///
/// It is neither copyable nor movable.
240 template<typename T, typename Generator>
241 struct next_awaiter final {
242  using next_value_type = next_value_t<T>;
    // Generator the value is taken from in await_resume().
243  Generator* const _generator;
    // Producer task to schedule when the value is not available yet. The
    // generators pass their promise here, or nullptr together with a ready
    // future.
244  seastar::task* const _task;
    // Becomes available once the producer has signalled the next value, its
    // completion, or an exception.
245  seastar::future<> _next_value_future;
246 
247 public:
248  next_awaiter(Generator* generator,
250  seastar::future<>&& f) noexcept
251  : _generator{generator}
252  , _task(task)
253  , _next_value_future(std::move(f)) {}
254 
255  next_awaiter(const next_awaiter&) = delete;
256  next_awaiter(next_awaiter&&) = delete;
257 
    // Skips suspension only when the future is already available and the
    // reactor does not ask for preemption.
    // NOTE(review): declared constexpr although both calls look like runtime
    // queries — confirm the specifier is intended.
258  constexpr bool await_ready() const noexcept {
259  return _next_value_future.available() && !seastar::need_preempt();
260  }
261 
    // If the value is already available (suspension was due to preemption),
    // just reschedules the consumer. Otherwise attaches the consumer to the
    // future via set_coroutine() and schedules the producer task.
262  template<typename Promise>
263  void await_suspend(coroutine_handle<Promise> coro) noexcept {
264  auto& current_task = coro.promise();
265  if (_next_value_future.available()) {
266  seastar::schedule(&current_task);
267  } else {
268  _next_value_future.set_coroutine(current_task);
269  seastar::schedule(_task);
270  }
271  }
272 
    // Returns whatever the generator's take_next_value() returns; that call
    // rethrows an exception stored by the producer. Not noexcept for that
    // reason.
273  next_value_type await_resume() {
274  assert(_next_value_future.available());
275  assert(_generator);
276  return _generator->take_next_value();
277  }
278 };
279 
280 } // namespace internal
281 
/// Buffered generator: produced values are queued in a `Container<T>` until
/// the consumer takes them.
///
/// The object registers its own address with the promise on construction; the
/// promise then pushes values, the completion notification and any escaped
/// exception to that address.
///
/// NOTE(review): the move constructor, move assignment and swap() transfer
/// only the coroutine handle. `_promise`, `_values` and `_exception` are not
/// transferred (the move constructor leaves `_promise` uninitialized), and the
/// promise is not told about the new address. Confirm generators are never
/// moved after creation, or rework these together with the promise type.
312 template <typename T, template <typename> class Container>
313 class generator {
314 public:
316 
317 private:
318  using handle_type = internal::coroutine_handle<promise_type>;
    // Handle of the producer coroutine; cleared by on_finished().
319  handle_type _coro;
    // Promise of the producer coroutine; nullptr once it has finished.
320  promise_type* _promise;
    // Values produced but not yet consumed.
321  Container<T> _values;
    // put_next_value() reports "room left" while the buffer holds fewer values
    // than this.
322  const size_t _buffer_capacity;
    // Exception that escaped the producer, rethrown by take_next_value().
323  std::exception_ptr _exception;
324 
325 public:
    // Binds the generator to its coroutine and registers `this` with the
    // promise. `promise` must not be null (asserted).
326  generator(size_t buffer_capacity,
327  handle_type coro,
328  promise_type* promise) noexcept
329  : _coro{coro}
330  , _promise{promise}
331  , _buffer_capacity{buffer_capacity} {
332  assert(_promise);
333  _promise->set_generator(this);
334  }
335  generator(const generator&) = delete;
    // Takes over the coroutine handle and copies the capacity only; see the
    // class-level review note.
336  generator(generator&& other) noexcept
337  : _coro{std::exchange(other._coro, {})}
338  , _buffer_capacity{other._buffer_capacity} {}
    // Takes over other's coroutine handle and destroys the coroutine
    // previously owned by this object, if any. No other member is updated.
339  generator& operator=(generator&& other) noexcept {
340  if (std::addressof(other) != this) {
341  auto old_coro = std::exchange(_coro, std::exchange(other._coro, {}));
342  if (old_coro) {
343  old_coro.destroy();
344  }
345  }
346  return *this;
347  }
    // Destroys the coroutine unless it already finished (on_finished() clears
    // the handle).
348  ~generator() {
349  if (_coro) {
350  _coro.destroy();
351  }
352  }
353 
    // Exchanges the coroutine handles only.
354  void swap(generator& other) noexcept {
355  std::swap(_coro, other._coro);
356  }
357 
    // Returns an awaiter for the next value. A ready future is used when a
    // value is buffered, an exception is stored, or the coroutine has
    // finished; otherwise the awaiter waits on the promise and carries it as
    // the task to schedule.
358  internal::next_awaiter<T, generator> operator()() noexcept {
359  if (!_values.empty()) {
360  return {this, nullptr, make_ready_future<>()};
361  } else if (_exception) [[unlikely]] {
362  return {this, nullptr, make_ready_future<>()};
363  } else if (_promise) {
364  return {this, _promise, _promise->wait_for_next_value()};
365  } else {
366  return {this, nullptr, make_ready_future<>()};
367  }
368  }
369 
    // Called by the promise on co_yield: appends the value and returns true
    // while the buffer still holds fewer values than its capacity.
370  template<typename U>
371  bool put_next_value(U&& value) {
372  _values.push_back(std::forward<U>(value));
373  return _values.size() < _buffer_capacity;
374  }
375 
    // Pops and returns the oldest buffered value. Buffered values are handed
    // out before a stored exception is rethrown; with neither, returns an
    // empty optional.
376  internal::next_value_t<T> take_next_value() {
377  if (!_values.empty()) [[likely]] {
378  auto value = std::move(_values.front());
    // The buffer was exactly full before this pop, so the producer may be
    // waiting for a free slot.
    // NOTE(review): on_reclaim_free_space() asserts that the producer is
    // waiting — confirm that always holds when the buffer was full.
379  bool maybe_reclaim = _values.size() == _buffer_capacity;
380  _values.pop_front();
381  if (maybe_reclaim) {
382  if (_promise) [[likely]] {
383  _promise->on_reclaim_free_space();
384  }
385  }
386  return internal::next_value_t<T>(std::move(value));
387  } else if (_exception) [[unlikely]] {
388  std::rethrow_exception(std::exchange(_exception, nullptr));
389  } else {
390  return std::nullopt;
391  }
392  }
393 
    // Called from the promise's final_suspend(): forgets the promise and the
    // handle so the destructor does not destroy the coroutine.
394  void on_finished() {
395  _promise = nullptr;
396  _coro = nullptr;
397  }
398 
399  void unhandled_exception() noexcept {
400  // called by promise's unhandled_exception(); must run while the exception
    // is being handled, since it captures std::current_exception()
401  assert(!_exception);
402  _exception = std::current_exception();
403  }
404 };
405 
406 template <typename T>
407 class generator<T, std::optional> {
408 public:
409  using promise_type = internal::generator_unbuffered_promise<T>;
410 
411 private:
412  using handle_type = internal::coroutine_handle<promise_type>;
413  handle_type _coro;
414  promise_type* _promise;
415  std::optional<T> _maybe_value;
416  std::exception_ptr _exception;
417 
418 public:
419  generator(handle_type coro,
420  promise_type* promise) noexcept
421  : _coro{coro}
422  , _promise{promise} {
423  assert(_promise);
424  _promise->set_generator(this);
425  }
426  generator(const generator&) = delete;
427  generator(generator&& other) noexcept
428  : _coro{std::exchange(other._coro, {})} {}
429  generator& operator=(generator&& other) noexcept {
430  if (std::addressof(other) != this) {
431  auto old_coro = std::exchange(_coro, std::exchange(other._coro, {}));
432  if (old_coro) {
433  old_coro.destroy();
434  }
435  }
436  return *this;
437  }
438  ~generator() {
439  if (_coro) {
440  _coro.destroy();
441  }
442  }
443 
444  void swap(generator& other) noexcept {
445  std::swap(_coro, other._coro);
446  }
447 
448  internal::next_awaiter<T, generator> operator()() noexcept {
449  if (_promise) [[likely]] {
450  return {this, _promise, _promise->wait_for_next_value()};
451  } else {
452  return {this, nullptr, make_ready_future<>()};
453  }
454  }
455 
456  template<typename U>
457  void put_next_value(U&& value) noexcept {
458  _maybe_value.emplace(std::forward<U>(value));
459  }
460 
461  internal::next_value_t<T> take_next_value() {
462  if (_maybe_value.has_value()) [[likely]] {
463  return std::exchange(_maybe_value, std::nullopt);
464  } else if (_exception) [[unlikely]] {
465  std::rethrow_exception(std::exchange(_exception, nullptr));
466  } else {
467  return std::nullopt;
468  }
469  }
470 
471  void on_finished() {
472  _promise = nullptr;
473  _coro = nullptr;
474  }
475 
476  void unhandled_exception() noexcept {
477  assert(!_exception);
478  _exception = std::current_exception();
479  }
480 };
481 
482 namespace internal {
483 
484 template<NothrowMoveConstructible T>
485 void generator_unbuffered_promise<T>::return_void() noexcept {
486  assert(_wait_for_next_value);
487  _wait_for_next_value->set_value();
488  _wait_for_next_value = {};
489 }
490 
491 template<NothrowMoveConstructible T>
492 void generator_unbuffered_promise<T>::unhandled_exception() noexcept {
493  // instead of storing the current exception into promise, in order to be
494  // more consistent, we let generator preserve all the output of produced
495  // value, including the values and the exception if any. so we just signal
496  // _wait_for_next_value, and delegate generator's unhandled_exception() to
497  // store the exception.
498  _generator->unhandled_exception();
499  if (_wait_for_next_value.has_value()) {
500  _wait_for_next_value->set_value();
501  _wait_for_next_value = {};
502  }
503 }
504 
505 template<NothrowMoveConstructible T>
506 auto generator_unbuffered_promise<T>::get_return_object() noexcept -> generator_type {
507  using handle_type = coroutine_handle<generator_unbuffered_promise<T>>;
508  return generator_type{handle_type::from_promise(*this), this};
509 }
510 
511 template<NothrowMoveConstructible T, template <typename> class Container>
512 requires Fifo<Container, T>
513 void generator_buffered_promise<T, Container>::unhandled_exception() noexcept {
514  _generator->unhandled_exception();
515  if (_wait_for_next_value.has_value()) {
516  _wait_for_next_value->set_value();
517  _wait_for_next_value = {};
518  }
519 }
520 
521 template<NothrowMoveConstructible T, template <typename> class Container>
522 requires Fifo<Container, T>
523 auto generator_buffered_promise<T, Container>::get_return_object() noexcept -> generator_type {
524  using handle_type = coroutine_handle<generator_buffered_promise<T, Container>>;
525  return generator_type{_buffer_capacity, handle_type::from_promise(*this), this};
526 }
527 
528 template<typename T, template <typename> class Container>
529 coroutine_handle<> yield_awaiter<T, Container>::await_suspend(
530  coroutine_handle<generator_buffered_promise<T, Container>> coro) noexcept {
531  if (_future.available()) {
532  auto& current_task = coro.promise();
533  seastar::schedule(&current_task);
534  return coro;
535  } else {
536  // we cannot do something like `task.set_coroutine(consumer_task)`.
537  // because, instead of waiting for a subcoroutine, we are pending on
538  // the caller of current coroutine to consume the produced values to
539  // free up at least a free slot in the buffer, if we set the `_task`
540  // of the of the awaiting task, we would have an infinite loop of
541  // "promise->_task".
542  return noop_coroutine();
543  }
544 }
545 
546 } // namespace internal
547 } // namespace seastar::coroutine::experimental
seastar::task * waiting_task() noexcept final
Returns the next task which is waiting for this task to complete execution, or nullptr.
Definition: generator.hh:118
bool available() const noexcept
Checks whether the future is available.
Definition: future.hh:1379
promise - allows a future value to be made available at a later time.
Definition: future.hh:926
Definition: task.hh:35
virtual task * waiting_task() noexcept=0
Returns the next task which is waiting for this task to complete execution, or nullptr.
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934