Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
when_all.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <seastar/core/future.hh>
27#include <seastar/core/loop.hh>
28#include <seastar/util/tuple_utils.hh>
29#include <seastar/util/critical_alloc_section.hh>
30#include <seastar/util/modules.hh>
31#include <cstddef>
32#include <exception>
33#include <tuple>
34#include <type_traits>
35#include <utility>
36#include <vector>
37#endif
38
39namespace seastar {
40
43
44namespace internal {
45
46template<typename... Futures>
47struct identity_futures_tuple {
48 using future_type = future<std::tuple<Futures...>>;
49 using promise_type = typename future_type::promise_type;
50
51 static void set_promise(promise_type& p, std::tuple<Futures...> futures) {
52 p.set_value(std::move(futures));
53 }
54
55 static future_type make_ready_future(std::tuple<Futures...> futures) noexcept {
56 return seastar::make_ready_future<std::tuple<Futures...>>(std::move(futures));
57 }
58
59 static future_type current_exception_as_future() noexcept {
60 return seastar::current_exception_as_future<std::tuple<Futures...>>();
61 }
62};
63
64class when_all_state_base;
65
66// If the future is ready, return true
67// if the future is not ready, chain a continuation to it, and return false
68using when_all_process_element_func = bool (*)(void* future, void* continuation, when_all_state_base* wasb) noexcept;
69
70struct when_all_process_element {
71 when_all_process_element_func func;
72 void* future;
73};
74
75class when_all_state_base {
76 size_t _nr_remain;
77 const when_all_process_element* _processors;
78 void* _continuation;
79public:
80 virtual ~when_all_state_base() {}
81 when_all_state_base(size_t nr_remain, const when_all_process_element* processors, void* continuation)
82 : _nr_remain(nr_remain), _processors(processors), _continuation(continuation) {
83 }
84 virtual task* waiting_task() = 0;
85 void complete_one() noexcept {
86 // We complete in reverse order; if the futures happen to complete
87 // in order, then waiting for the last one will find the rest ready
88 --_nr_remain;
89 while (_nr_remain) {
90 bool ready = process_one(_nr_remain - 1);
91 if (!ready) {
92 return;
93 }
94 --_nr_remain;
95 }
96 if (!_nr_remain) {
97 delete this;
98 }
99 }
100 void do_wait_all() noexcept {
101 ++_nr_remain; // fake pending completion for complete_one()
102 complete_one();
103 }
104 bool process_one(size_t idx) noexcept {
105 auto p = _processors[idx];
106 return p.func(p.future, _continuation, this);
107 }
108};
109
110template <typename Future>
111class when_all_state_component final : public continuation_base_from_future_t<Future> {
112 when_all_state_base* _base;
113 Future* _final_resting_place;
114public:
115 static bool process_element_func(void* future, void* continuation, when_all_state_base* wasb) noexcept {
116 auto f = reinterpret_cast<Future*>(future);
117 if (f->available()) {
118 return true;
119 } else {
120 auto c = new (continuation) when_all_state_component(wasb, f);
121 set_callback(std::move(*f), c);
122 return false;
123 }
124 }
125 when_all_state_component(when_all_state_base *base, Future* future) noexcept : _base(base), _final_resting_place(future) {}
126 task* waiting_task() noexcept override { return _base->waiting_task(); }
127 virtual void run_and_dispose() noexcept override {
128 using futurator = futurize<Future>;
129 if (__builtin_expect(this->_state.failed(), false)) {
130 *_final_resting_place = futurator::make_exception_future(std::move(this->_state).get_exception());
131 } else {
132 *_final_resting_place = futurator::from_tuple(std::move(this->_state).get_value());
133 }
134 auto base = _base;
135 this->~when_all_state_component();
136 base->complete_one();
137 }
138};
139
140template<typename ResolvedTupleTransform, typename... Futures>
141class when_all_state : public when_all_state_base {
142 static constexpr size_t nr = sizeof...(Futures);
143 using type = std::tuple<Futures...>;
144 type tuple;
145 // We only schedule one continuation at a time, and store it in _cont.
146 // This way, if while the future we wait for completes, some other futures
147 // also complete, we won't need to schedule continuations for them.
148 alignas(when_all_state_component<Futures>...) std::byte _cont[std::max({sizeof(when_all_state_component<Futures>)...})];
149 when_all_process_element _processors[nr];
150public:
151 typename ResolvedTupleTransform::promise_type p;
152 when_all_state(Futures&&... t) : when_all_state_base(nr, _processors, &_cont), tuple(std::make_tuple(std::move(t)...)) {
153 init_element_processors(std::make_index_sequence<nr>());
154 }
155 virtual ~when_all_state() {
156 ResolvedTupleTransform::set_promise(p, std::move(tuple));
157 }
158 task* waiting_task() noexcept override {
159 return p.waiting_task();
160 }
161private:
162 template <size_t... Idx>
163 void init_element_processors(std::index_sequence<Idx...>) {
164 auto ignore = {
165 0,
166 (_processors[Idx] = when_all_process_element{
167 when_all_state_component<std::tuple_element_t<Idx, type>>::process_element_func,
168 &std::get<Idx>(tuple)
169 }, 0)...
170 };
171 (void)ignore;
172 }
173public:
174 static typename ResolvedTupleTransform::future_type wait_all(Futures&&... futures) noexcept {
175 if ((futures.available() && ...)) {
176 return ResolvedTupleTransform::make_ready_future(std::make_tuple(std::move(futures)...));
177 }
178 auto state = [&] () noexcept {
179 memory::scoped_critical_alloc_section _;
180 return new when_all_state(std::move(futures)...);
181 }();
182 auto ret = state->p.get_future();
183 state->do_wait_all();
184 return ret;
185 }
186};
187
188} // namespace internal
189
191
192namespace impl {
193
194// Want: folds
195
196template <typename T>
197struct is_tuple_of_futures : std::false_type {
198};
199
200template <>
201struct is_tuple_of_futures<std::tuple<>> : std::true_type {
202};
203
204template <typename... T, typename... Rest>
205struct is_tuple_of_futures<std::tuple<future<T...>, Rest...>> : is_tuple_of_futures<std::tuple<Rest...>> {
206};
207
208}
209
210template <typename... Futs>
211concept AllAreFutures = impl::is_tuple_of_futures<std::tuple<Futs...>>::value;
212
213template<typename Fut, std::enable_if_t<is_future<Fut>::value, int> = 0>
214auto futurize_invoke_if_func(Fut&& fut) noexcept {
215 return std::forward<Fut>(fut);
216}
217
218template<typename Func, std::enable_if_t<!is_future<Func>::value, int> = 0>
219auto futurize_invoke_if_func(Func&& func) noexcept {
220 return futurize_invoke(std::forward<Func>(func));
221}
223
224namespace internal {
225
226template <typename... Futs>
227requires seastar::AllAreFutures<Futs...>
228inline
229future<std::tuple<Futs...>>
230when_all_impl(Futs&&... futs) noexcept {
231 using state = when_all_state<identity_futures_tuple<Futs...>, Futs...>;
232 return state::wait_all(std::forward<Futs>(futs)...);
233}
234
235} // namespace internal
236
250SEASTAR_MODULE_EXPORT
251template <typename... FutOrFuncs>
252inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept {
253 return internal::when_all_impl(futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
254}
255
256namespace internal {
257
258template<typename Future>
259struct identity_futures_vector {
260 using future_type = future<std::vector<Future>>;
261 static future_type run(std::vector<Future> futures) noexcept {
262 return make_ready_future<std::vector<Future>>(std::move(futures));
263 }
264 static future_type current_exception_as_future() noexcept {
265 return seastar::current_exception_as_future<std::vector<Future>>();
266 }
267};
268
269// Internal function for when_all().
270template <typename ResolvedVectorTransform, typename Future>
271inline
272typename ResolvedVectorTransform::future_type
273complete_when_all(std::vector<Future>&& futures, typename std::vector<Future>::iterator pos) noexcept {
274 // If any futures are already ready, skip them.
275 while (pos != futures.end() && pos->available()) {
276 ++pos;
277 }
278 // Done?
279 if (pos == futures.end()) {
280 return ResolvedVectorTransform::run(std::move(futures));
281 }
282 // Wait for unready future, store, and continue.
283 return pos->then_wrapped([futures = std::move(futures), pos] (auto fut) mutable {
284 *pos++ = std::move(fut);
285 return complete_when_all<ResolvedVectorTransform>(std::move(futures), pos);
286 });
287}
288
289template<typename ResolvedVectorTransform, typename FutureIterator>
290inline auto
291do_when_all(FutureIterator begin, FutureIterator end) noexcept {
292 using itraits = std::iterator_traits<FutureIterator>;
293 auto make_values_vector = [] (size_t size) noexcept {
294 memory::scoped_critical_alloc_section _;
295 std::vector<typename itraits::value_type> ret;
296 ret.reserve(size);
297 return ret;
298 };
299 std::vector<typename itraits::value_type> ret =
300 make_values_vector(iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()));
301 // Important to invoke the *begin here, in case it's a function iterator,
302 // so we launch all computation in parallel.
303 std::move(begin, end, std::back_inserter(ret));
304 return complete_when_all<ResolvedVectorTransform>(std::move(ret), ret.begin());
305}
306
307} // namespace internal
308
319SEASTAR_MODULE_EXPORT
320template <typename FutureIterator>
321requires requires (FutureIterator i) { { *i++ }; requires is_future<std::remove_reference_t<decltype(*i)>>::value; }
322inline
324when_all(FutureIterator begin, FutureIterator end) noexcept {
325 namespace si = internal;
326 using itraits = std::iterator_traits<FutureIterator>;
327 using result_transform = si::identity_futures_vector<typename itraits::value_type>;
328 try {
329 return si::do_when_all<result_transform>(std::move(begin), std::move(end));
330 } catch (...) {
332 }
333}
334
335namespace internal {
336
337template<typename Future>
338struct future_has_value {
339 enum {
340 value = !std::is_same_v<std::decay_t<Future>, future<>>
341 };
342};
343
344template<typename Tuple>
345struct tuple_to_future;
346
347template<typename... Elements>
348struct tuple_to_future<std::tuple<Elements...>> {
349 using value_type = std::tuple<Elements...>;
350 using type = future<value_type>;
351 using promise_type = promise<value_type>;
352
353 // Elements... all come from futures, so we know they are nothrow move
354 // constructible. `future` also has a static assertion to that effect.
355
356 static auto make_ready(std::tuple<Elements...> t) noexcept {
357 return make_ready_future<value_type>(value_type(std::move(t)));
358 }
359
360 static auto make_failed(std::exception_ptr excp) noexcept {
361 return seastar::make_exception_future<value_type>(std::move(excp));
362 }
363};
364
365template<typename... Futures>
366class extract_values_from_futures_tuple {
367 static auto transform(std::tuple<Futures...> futures) noexcept {
368 auto prepare_result = [] (auto futures) noexcept {
369 auto fs = tuple_filter_by_type<internal::future_has_value>(std::move(futures));
370 return tuple_map(std::move(fs), [] (auto&& e) {
371 return e.get();
372 });
373 };
374
375 using tuple_futurizer = internal::tuple_to_future<decltype(prepare_result(std::move(futures)))>;
376
377 std::exception_ptr excp;
378 tuple_for_each(futures, [&excp] (auto& f) {
379 if (!excp) {
380 if (f.failed()) {
381 excp = f.get_exception();
382 }
383 } else {
384 f.ignore_ready_future();
385 }
386 });
387 if (excp) {
388 return tuple_futurizer::make_failed(std::move(excp));
389 }
390
391 return tuple_futurizer::make_ready(prepare_result(std::move(futures)));
392 }
393public:
394 using future_type = decltype(transform(std::declval<std::tuple<Futures...>>()));
395 using promise_type = typename future_type::promise_type;
396
397 static void set_promise(promise_type& p, std::tuple<Futures...> tuple) {
398 transform(std::move(tuple)).forward_to(std::move(p));
399 }
400
401 static future_type make_ready_future(std::tuple<Futures...> tuple) noexcept {
402 return transform(std::move(tuple));
403 }
404
405 static future_type current_exception_as_future() noexcept {
406 future_type (*type_deduct)() = current_exception_as_future;
407 return type_deduct();
408 }
409};
410
411template<typename Future>
412struct extract_values_from_futures_vector {
413 using value_type = decltype(untuple(std::declval<typename Future::tuple_type>()));
414
415 using future_type = future<std::vector<value_type>>;
416
417 static future_type run(std::vector<Future> futures) noexcept {
418 auto make_values_vector = [] (size_t size) noexcept {
419 memory::scoped_critical_alloc_section _;
420 std::vector<value_type> values;
421 values.reserve(size);
422 return values;
423 };
424 std::vector<value_type> values = make_values_vector(futures.size());
425
426 std::exception_ptr excp;
427 for (auto&& f : futures) {
428 if (!excp) {
429 if (f.failed()) {
430 excp = f.get_exception();
431 } else {
432 values.emplace_back(f.get());
433 }
434 } else {
435 f.ignore_ready_future();
436 }
437 }
438 if (excp) {
439 return seastar::make_exception_future<std::vector<value_type>>(std::move(excp));
440 }
441 return make_ready_future<std::vector<value_type>>(std::move(values));
442 }
443
444 static future_type current_exception_as_future() noexcept {
445 return seastar::current_exception_as_future<std::vector<value_type>>();
446 }
447};
448
449template<>
450struct extract_values_from_futures_vector<future<>> {
451 using future_type = future<>;
452
453 static future_type run(std::vector<future<>> futures) noexcept {
454 std::exception_ptr excp;
455 for (auto&& f : futures) {
456 if (!excp) {
457 if (f.failed()) {
458 excp = f.get_exception();
459 }
460 } else {
461 f.ignore_ready_future();
462 }
463 }
464 if (excp) {
465 return seastar::make_exception_future<>(std::move(excp));
466 }
467 return make_ready_future<>();
468 }
469
470 static future_type current_exception_as_future() noexcept {
471 return seastar::current_exception_as_future<>();
472 }
473};
474
475template<typename... Futures>
476requires seastar::AllAreFutures<Futures...>
477inline auto when_all_succeed_impl(Futures&&... futures) noexcept {
478 using state = when_all_state<extract_values_from_futures_tuple<Futures...>, Futures...>;
479 return state::wait_all(std::forward<Futures>(futures)...);
480}
481
482} // namespace internal
483
494SEASTAR_MODULE_EXPORT
495template <typename... FutOrFuncs>
496inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept {
497 return internal::when_all_succeed_impl(futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
498}
499
510SEASTAR_MODULE_EXPORT
511template <typename FutureIterator, typename = typename std::iterator_traits<FutureIterator>::value_type>
512requires requires (FutureIterator i) {
513 *i++;
514 { i != i } -> std::convertible_to<bool>;
515 requires is_future<std::remove_reference_t<decltype(*i)>>::value;
516}
517inline auto
518when_all_succeed(FutureIterator begin, FutureIterator end) noexcept {
519 using itraits = std::iterator_traits<FutureIterator>;
520 using result_transform = internal::extract_values_from_futures_vector<typename itraits::value_type>;
521 try {
522 return internal::do_when_all<result_transform>(std::move(begin), std::move(end));
523 } catch (...) {
525 }
526}
527
528
540SEASTAR_MODULE_EXPORT
541template <typename T>
542inline auto
543when_all_succeed(std::vector<future<T>>&& futures) noexcept {
544 using result_transform = internal::extract_values_from_futures_vector<future<T>>;
545 try {
546 return internal::complete_when_all<result_transform>(std::move(futures), futures.begin());
547 } catch (...) {
549 }
550}
551
553
554} // namespace seastar
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
internal::future_stored_type_t< T > value_type
The data type carried by the future.
Definition: future.hh:1317
promise< T > promise_type
The data type carried by the future.
Definition: future.hh:1320
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
auto when_all_succeed(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:496
auto when_all(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:252
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
void tuple_for_each(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:157
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.