26#include <boost/container/small_vector.hpp>
28#include <seastar/core/loop.hh>
29#include <seastar/core/coroutine.hh>
30#include <seastar/core/reactor.hh>
32namespace seastar::coroutine {
// Template header and data members of a type parameterised on Func.
// NOTE(review): the line that opens the enclosing class is not part of this
// listing, so its name and base classes cannot be confirmed from here.
61template <
typename Func>
// Type-erased coroutine handle; the type of _when_ready below.
64 using coroutine_handle_t = std::coroutine_handle<void>;
// Futures kept for later processing; consume_next() inspects them from the back.
67 boost::container::small_vector<future<>, 5> _futures;
// Exception taken from a future or from this continuation's state;
// await_resume() rethrows it when it is set.
68 std::exception_ptr _ex;
// Coroutine handle that resume_or_set_callback() resumes.
69 coroutine_handle_t _when_ready;
// Address of the awaiting coroutine's promise, stored by await_suspend() and
// reported by waiting_task(). Null until await_suspend() runs.
70 task* _waiting_task =
nullptr;
// Examines _futures from the back for as long as the container is non-empty.
// The visible lines test whether the last future is available and store a
// future's exception into _ex.
// NOTE(review): the statements between these lines, the removal of processed
// futures and the returned values are missing from this listing. The caller
// resume_or_set_callback() resumes the awaiting coroutine when this returns
// true.
80 bool consume_next()
noexcept {
81 while (!_futures.empty()) {
82 auto& fut = _futures.back();
83 if (!fut.available()) {
87 _ex = fut.get_exception();
// Passes the last future in _futures (moved from) to
// seastar::internal::set_callback(), together with this object's address cast
// to continuation_base<>*.
// NOTE(review): the cast is a reinterpret_cast rather than a static_cast;
// presumably the base class is not publicly accessible. Confirm against the
// class header, which is not shown here.
94 void set_callback()
noexcept {
99 seastar::internal::set_callback(std::move(_futures.back()),
reinterpret_cast<continuation_base<>*
>(
this));
// When consume_next() returns true, installs _waiting_task as the current
// task on local_engine and then resumes the coroutine held in _when_ready.
// NOTE(review): the branch taken when consume_next() returns false is not
// visible; the function name suggests it calls set_callback(). Confirm.
103 void resume_or_set_callback()
noexcept {
104 if (consume_next()) {
105 local_engine->set_current_task(_waiting_task);
106 _when_ready.resume();
// Template header, constraints, member initialiser and loop body of the
// iterator-pair constructor.
// NOTE(review): the declarator line is missing from this listing. The
// parameter names begin, end and func are taken from their uses below.
//
// Constraints: Sentinel is either the same type as Iterator or models
// std::sentinel_for<Sentinel, Iterator>, and the result of invoking Func on
// the iterator's reference type must futurize to future<>.
116 template <
typename Iterator,
typename Sentinel,
typename Func1>
117 requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
118 && std::same_as<
future<>, futurize_t<std::invoke_result_t<Func,
typename std::iterator_traits<Iterator>::reference>>>
// Keep the callable in _func so it can be invoked once per element below.
120 : _func(std::forward<Func1>(func))
122 for (
auto it = begin; it != end; ++it) {
// futurize_invoke() yields a future for this element's invocation of _func.
123 auto fut = futurize_invoke(_func, *it);
// A future that is already available is examined here; the visible line
// stores its exception in _ex.
// NOTE(review): the condition guarding that assignment and what follows
// it are missing from this listing.
124 if (fut.available()) {
126 _ex = fut.get_exception();
// While _futures is still empty, and only for iterators that have an
// iterator category, estimate a capacity from the range [it, end).
// NOTE(review): the use of n is not visible; presumably it feeds a
// reserve on _futures. Confirm.
130 if (_futures.empty()) {
131 if constexpr (seastar::internal::has_iterator_category<Iterator>::value) {
132 auto n = seastar::internal::iterator_range_estimate_vector_capacity(it, end);
// Store the future in _futures for later processing.
136 _futures.push_back(std::move(fut));
// Range overload of the constructor. Requires Func to be invocable with the
// range's reference type, and delegates to the iterator-pair constructor
// using std::ranges::begin(range) and std::ranges::end(range).
// NOTE(review): the declarator line is missing from this listing.
141 template <std::ranges::range Range,
typename Func1>
142 requires std::invocable<Func, std::ranges::range_reference_t<Range>>
144 :
parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func1>(func))
// Awaiter readiness query. The visible part tests whether _futures is empty.
// NOTE(review): the returned values and the rest of the body are missing
// from this listing.
147 bool await_ready()
const noexcept {
148 if (_futures.empty()) {
// Called with the handle of the suspending coroutine. Stores the address of
// that coroutine's promise in _waiting_task, then calls
// resume_or_set_callback().
// NOTE(review): the template header introducing T and one body line are
// missing from this listing. The assignment to _waiting_task implies that the
// promise type is convertible to task*.
155 void await_suspend(std::coroutine_handle<T> h) {
157 _waiting_task = &h.promise();
158 resume_or_set_callback();
// Result of the co_await: returns nothing, but rethrows the exception held in
// _ex when one is set.
// NOTE(review): this member is const and _ex is not declared mutable, so
// std::move(_ex) produces a const rvalue and the exception_ptr is copied into
// std::rethrow_exception() rather than moved.
161 void await_resume()
const {
162 if (_ex) [[unlikely]] {
163 std::rethrow_exception(std::move(_ex));
// Override of run_and_dispose(). When this object's _state has failed, its
// exception is moved into _ex; the function then calls
// resume_or_set_callback().
// NOTE(review): a line between the two visible statements may be missing
// from this listing.
167 virtual void run_and_dispose()
noexcept override {
168 if (this->_state.failed()) {
169 _ex = std::move(this->_state).get_exception();
171 resume_or_set_callback();
// Override of waiting_task(): returns the pointer stored in _waiting_task,
// which is null until await_suspend() assigns it.
174 virtual task* waiting_task()
noexcept override {
175 return _waiting_task;
// Namespace-scope template header that repeats the constraints of the
// iterator-pair constructor: Sentinel is the same type as Iterator or models
// std::sentinel_for it, and invoking Func on the iterator's reference type
// must futurize to future<>.
// NOTE(review): the declaration this header introduces is missing from this
// listing; presumably it is a deduction guide. Confirm.
179template <
typename Iterator,
typename Sentinel,
typename Func>
180requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
181 && std::same_as<
future<>, futurize_t<std::invoke_result_t<Func,
typename std::iterator_traits<Iterator>::reference>>>
// Namespace-scope template header for the range form: Range models
// std::ranges::range and Func must be invocable with the range's reference
// type.
// NOTE(review): the declaration this header introduces is missing from this
// listing; presumably it is a deduction guide. Confirm.
184template <std::ranges::range Range,
185 std::invocable<std::ranges::range_reference_t<Range>> Func>
Definition: parallel_for_each.hh:63
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:572
Definition: critical_alloc_section.hh:80