26#include <boost/container/small_vector.hpp>
28#include <seastar/core/loop.hh>
29#include <seastar/core/coroutine.hh>
30#include <seastar/core/reactor.hh>
32namespace seastar::coroutine {
61template <
typename Func>
64 using coroutine_handle_t = std::coroutine_handle<void>;
67 boost::container::small_vector<future<>, 5> _futures;
68 std::exception_ptr _ex;
69 coroutine_handle_t _when_ready;
70 task* _waiting_task =
nullptr;
80 bool consume_next()
noexcept {
81 while (!_futures.empty()) {
82 auto& fut = _futures.back();
83 if (!fut.available()) {
87 _ex = fut.get_exception();
94 void set_callback()
noexcept {
99 seastar::internal::set_callback(std::move(_futures.back()),
reinterpret_cast<continuation_base<>*
>(
this));
103 void resume_or_set_callback()
noexcept {
104 if (consume_next()) {
105 local_engine->set_current_task(_waiting_task);
106 _when_ready.resume();
116 template <
typename Iterator,
typename Sentinel,
typename Func1>
117 requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
118 && std::same_as<
future<>, futurize_t<std::invoke_result_t<Func,
typename std::iterator_traits<Iterator>::reference>>>
120 : _func(std::forward<Func1>(func))
122 for (
auto it = begin; it != end; ++it) {
123 auto fut = futurize_invoke(_func, *it);
124 if (fut.available()) {
126 _ex = fut.get_exception();
130 if (_futures.empty()) {
131 using itraits = std::iterator_traits<Iterator>;
132 if constexpr (seastar::internal::has_iterator_category<Iterator>::value) {
133 auto n = seastar::internal::iterator_range_estimate_vector_capacity(it, end,
typename itraits::iterator_category{});
137 _futures.push_back(std::move(fut));
142 template <std::ranges::range Range,
typename Func1>
143 requires std::invocable<Func, std::ranges::range_reference_t<Range>>
145 :
parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func1>(func))
148 bool await_ready()
const noexcept {
149 if (_futures.empty()) {
156 void await_suspend(std::coroutine_handle<T> h) {
158 _waiting_task = &h.promise();
159 resume_or_set_callback();
162 void await_resume()
const {
163 if (_ex) [[unlikely]] {
164 std::rethrow_exception(std::move(_ex));
168 virtual void run_and_dispose()
noexcept override {
169 if (this->_state.failed()) {
170 _ex = std::move(this->_state).get_exception();
172 resume_or_set_callback();
175 virtual task* waiting_task()
noexcept override {
176 return _waiting_task;
180template <
typename Iterator,
typename Sentinel,
typename Func>
181requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
182 && std::same_as<
future<>, futurize_t<std::invoke_result_t<Func,
typename std::iterator_traits<Iterator>::reference>>>
185template <std::ranges::range Range,
186 std::invocable<std::ranges::range_reference_t<Range>> Func>
Definition: parallel_for_each.hh:63
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
future<> parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:568
Definition: critical_alloc_section.hh:80