Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
parallel_for_each.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2022-present ScyllaDB
20 */
21
22#pragma once
23
24#include <ranges>
25
26#include <boost/container/small_vector.hpp>
27
28#include <seastar/core/loop.hh>
29#include <seastar/core/coroutine.hh>
30#include <seastar/core/reactor.hh>
31
32namespace seastar::coroutine {
33
61template <typename Func>
62// constaints for Func are defined at the parallel_for_each constructor
63class [[nodiscard("must co_await an parallel_for_each() object")]] parallel_for_each final : continuation_base<> {
64 using coroutine_handle_t = std::coroutine_handle<void>;
65
66 Func _func;
67 boost::container::small_vector<future<>, 5> _futures;
68 std::exception_ptr _ex;
69 coroutine_handle_t _when_ready;
70 task* _waiting_task = nullptr;
71
72 // Consume futures in reverse order.
73 // Since futures at the front are expected
74 // to become ready before futures at the back,
75 // therefore it is less likely we will have
76 // to wait on them, after the back futures
77 // become available.
78 //
79 // Return true iff all futures were consumed.
80 bool consume_next() noexcept {
81 while (!_futures.empty()) {
82 auto& fut = _futures.back();
83 if (!fut.available()) {
84 return false;
85 }
86 if (fut.failed()) {
87 _ex = fut.get_exception();
88 }
89 _futures.pop_back();
90 }
91 return true;
92 }
93
94 void set_callback() noexcept {
95 // To reuse `this` as continuation_base<>
96 // we must reset _state, to allow setting
97 // it again.
98 this->_state = {};
99 seastar::internal::set_callback(std::move(_futures.back()), reinterpret_cast<continuation_base<>*>(this));
100 _futures.pop_back();
101 }
102
103 void resume_or_set_callback() noexcept {
104 if (consume_next()) {
105 local_engine->set_current_task(_waiting_task);
106 _when_ready.resume();
107 } else {
108 set_callback();
109 }
110 }
111
112public:
113 // clang 13.0.1 doesn't support subrange
114 // so provide also a Iterator/Sentinel based constructor.
115 // See https://github.com/llvm/llvm-project/issues/46091
116 template <typename Iterator, typename Sentinel, typename Func1>
117 requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
118 && std::same_as<future<>, futurize_t<std::invoke_result_t<Func, typename std::iterator_traits<Iterator>::reference>>>
119 explicit parallel_for_each(Iterator begin, Sentinel end, Func1&& func) noexcept
120 : _func(std::forward<Func1>(func))
121 {
122 for (auto it = begin; it != end; ++it) {
123 auto fut = futurize_invoke(_func, *it);
124 if (fut.available()) {
125 if (fut.failed()) {
126 _ex = fut.get_exception();
127 }
128 } else {
130 if (_futures.empty()) {
131 using itraits = std::iterator_traits<Iterator>;
132 if constexpr (seastar::internal::has_iterator_category<Iterator>::value) {
133 auto n = seastar::internal::iterator_range_estimate_vector_capacity(it, end, typename itraits::iterator_category{});
134 _futures.reserve(n);
135 }
136 }
137 _futures.push_back(std::move(fut));
138 }
139 }
140 }
141
142 template <std::ranges::range Range, typename Func1>
143 requires std::invocable<Func, std::ranges::range_reference_t<Range>>
144 explicit parallel_for_each(Range&& range, Func1&& func) noexcept
145 : parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func1>(func))
146 { }
147
148 bool await_ready() const noexcept {
149 if (_futures.empty()) {
150 return !_ex;
151 }
152 return false;
153 }
154
155 template<typename T>
156 void await_suspend(std::coroutine_handle<T> h) {
157 _when_ready = h;
158 _waiting_task = &h.promise();
159 resume_or_set_callback();
160 }
161
162 void await_resume() const {
163 if (_ex) [[unlikely]] {
164 std::rethrow_exception(std::move(_ex));
165 }
166 }
167
168 virtual void run_and_dispose() noexcept override {
169 if (this->_state.failed()) {
170 _ex = std::move(this->_state).get_exception();
171 }
172 resume_or_set_callback();
173 }
174
175 virtual task* waiting_task() noexcept override {
176 return _waiting_task;
177 }
178};
179
180template <typename Iterator, typename Sentinel, typename Func>
181requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
182 && std::same_as<future<>, futurize_t<std::invoke_result_t<Func, typename std::iterator_traits<Iterator>::reference>>>
183parallel_for_each(Iterator begin, Sentinel end, Func&& func) -> parallel_for_each<Func>;
184
185template <std::ranges::range Range,
186 std::invocable<std::ranges::range_reference_t<Range>> Func>
187parallel_for_each(Range&& range, Func&& func) -> parallel_for_each<Func>;
188
189
190
191}
Definition: parallel_for_each.hh:63
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: task.hh:34
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:568
Definition: critical_alloc_section.hh:80