26#include <seastar/core/future.hh>
27#include <seastar/core/loop.hh>
28#include <seastar/util/tuple_utils.hh>
29#include <seastar/util/critical_alloc_section.hh>
30#include <seastar/util/modules.hh>
// Result transform that performs no conversion: the tuple of futures is
// passed to the promise exactly as received.
46template<
typename... Futures>
47struct identity_futures_tuple {
// Future type produced by this transform: a future holding the tuple of input futures.
48 using future_type = future<std::tuple<Futures...>>;
// Fulfils `p` with the tuple of futures, moved in unchanged.
// NOTE(review): `promise_type` is declared on a line not shown in this listing.
51 static void set_promise(promise_type& p, std::tuple<Futures...> futures) {
52 p.set_value(std::move(futures));
// Forward declaration, needed by the function-pointer alias below.
64class when_all_state_base;
// Pointer to a noexcept function that receives a type-erased future, a
// type-erased continuation buffer and the owning state object, and returns bool.
// The caller in when_all_state_base::complete_one() stores the result in a
// variable named `ready` — presumably true means the element was already available.
68using when_all_process_element_func = bool (*)(
void* future,
void* continuation, when_all_state_base* wasb)
noexcept;
// One entry of the per-future processing table used by when_all_state_base.
// NOTE(review): process_one() also reads a `future` member of this struct;
// its declaration is on a line not shown in this listing.
70struct when_all_process_element {
// Function invoked by process_one() for this element.
71 when_all_process_element_func func;
// Type-independent part of the when_all state: keeps a count of remaining
// elements, a table of per-element processors and a type-erased continuation
// pointer, and drives the processing of individual elements.
75class when_all_state_base {
// Table of processors, indexed by element position (see process_one()).
77 const when_all_process_element* _processors;
80 virtual ~when_all_state_base() {}
// Stores the number of elements still to complete, the processor table and
// the continuation pointer that is handed to every processor.
81 when_all_state_base(
size_t nr_remain,
const when_all_process_element* processors,
void* continuation)
82 : _nr_remain(nr_remain), _processors(processors), _continuation(continuation) {
// Implemented by the derived state; when_all_state_component forwards its own
// waiting_task() to this.
84 virtual task* waiting_task() = 0;
// Called when one element has completed (when_all_state_component calls it
// from run_and_dispose()).
// NOTE(review): most of the body is not shown in this listing.
85 void complete_one() noexcept {
// Processes the element at index _nr_remain - 1.
90 bool ready = process_one(_nr_remain - 1);
// Entry point used by when_all_state::wait_all(); body not shown in this listing.
100 void do_wait_all() noexcept {
// Runs the processor registered for element `idx`, passing it that element's
// future pointer, the shared continuation pointer and this state object.
104 bool process_one(
size_t idx)
noexcept {
105 auto p = _processors[idx];
106 return p.func(p.future, _continuation,
this);
// Continuation attached to a single future of type `Future` that is not yet
// available. It remembers the owning state and the location of the future it
// was created for.
110template <
typename Future>
111class when_all_state_component final :
public continuation_base_from_future_t<Future> {
// Owning state; notified through complete_one() once this element resolves.
112 when_all_state_base* _base;
// The future this component was created for; the result is written back here.
113 Future* _final_resting_place;
// Processor registered in the when_all_process_element table.
// `future` is cast back to Future*; `continuation` is the raw buffer into
// which a component is placement-constructed when the future is not available.
115 static bool process_element_func(
void* future,
void* continuation, when_all_state_base* wasb)
noexcept {
116 auto f =
reinterpret_cast<Future*
>(future);
// Branch taken for an already-available future (body not shown in this listing).
117 if (f->available()) {
// Construct the component in the caller-provided buffer (placement new, no
// allocation here) and register it as the callback of the moved-from future.
120 auto c =
new (continuation) when_all_state_component(wasb, f);
121 set_callback(std::move(*f), c);
125 when_all_state_component(when_all_state_base *base, Future* future) noexcept : _base(base), _final_resting_place(future) {}
// Delegates to the owning state.
126 task* waiting_task() noexcept
override {
return _base->waiting_task(); }
// Runs when the future resolves: writes the result back to
// *_final_resting_place, destroys this component and notifies the state.
127 virtual void run_and_dispose() noexcept
override {
128 using futurator = futurize<Future>;
// Failure is marked as the unlikely case; the failure handling itself is not
// shown in this listing.
129 if (__builtin_expect(this->_state.failed(),
false)) {
// Rebuild a future from the stored value and store it in the original slot.
132 *_final_resting_place = futurator::from_tuple(std::move(this->_state).get_value());
// Explicit destructor call: the object was created with placement new in
// process_element_func, so it is not released with delete.
135 this->~when_all_state_component();
// NOTE(review): `base` is declared on a line not shown here — presumably a
// copy of _base taken before the destructor call; confirm.
136 base->complete_one();
// Typed when_all state for a fixed pack of futures. Holds the futures in a
// tuple, the processor table, a raw buffer for a continuation component and
// the promise whose type is chosen by ResolvedTupleTransform.
140template<
typename ResolvedTupleTransform,
typename... Futures>
141class when_all_state :
public when_all_state_base {
// Number of futures being waited on.
142 static constexpr size_t nr =
sizeof...(Futures);
143 using type = std::tuple<Futures...>;
// Raw storage aligned for every when_all_state_component<Futures> and sized
// for the largest of them. Its address is passed to the base class as the
// continuation pointer, which process_element_func uses for placement new.
148 alignas(when_all_state_component<Futures>...) std::byte _cont[std::max({
sizeof(when_all_state_component<Futures>)...})];
// One processor entry per future, filled by init_element_processors().
149 when_all_process_element _processors[nr];
// Promise fulfilled through ResolvedTupleTransform::set_promise in the destructor.
151 typename ResolvedTupleTransform::promise_type p;
// Moves the futures into the `tuple` member (declared on a line not shown in
// this listing) and builds the processor table.
152 when_all_state(Futures&&... t) : when_all_state_base(nr, _processors, &_cont), tuple(
std::make_tuple(
std::move(t)...)) {
153 init_element_processors(std::make_index_sequence<nr>());
// The result is delivered on destruction: the tuple is handed to the
// transform, which sets the promise.
155 virtual ~when_all_state() {
156 ResolvedTupleTransform::set_promise(p, std::move(tuple));
// The task waiting on this state is whatever waits on the promise.
158 task* waiting_task() noexcept
override {
159 return p.waiting_task();
// For each index Idx, registers the process_element_func of the Idx-th
// future type together with a pointer to the Idx-th tuple element.
162 template <
size_t... Idx>
163 void init_element_processors(std::index_sequence<Idx...>) {
166 (_processors[Idx] = when_all_process_element{
167 when_all_state_component<std::tuple_element_t<Idx, type>>::process_element_func,
168 &std::get<Idx>(tuple)
// Entry point: waits for all `futures` and returns the transform's future type.
174 static typename ResolvedTupleTransform::future_type wait_all(Futures&&... futures)
noexcept {
// Case where every future is already available (body not shown in this listing).
175 if ((futures.available() && ...)) {
// Otherwise allocate the state on the heap, inside a
// memory::scoped_critical_alloc_section.
// NOTE(review): presumably the section keeps this noexcept path from seeing
// allocation failures — confirm against critical_alloc_section.hh.
178 auto state = [&] ()
noexcept {
179 memory::scoped_critical_alloc_section _;
180 return new when_all_state(std::move(futures)...);
// Take the future before starting the wait.
// NOTE(review): presumably because do_wait_all() may complete and destroy the
// state — confirm against complete_one().
182 auto ret = state->p.get_future();
183 state->do_wait_all();
// Type trait: true when its argument is a std::tuple whose elements are all
// future<...> types. This is the default case, which yields false.
197struct is_tuple_of_futures : std::false_type {
// Base case: the empty tuple qualifies.
201struct is_tuple_of_futures<
std::tuple<>> : std::true_type {
// Recursive case: a tuple whose first element is a future<T...> qualifies
// exactly when the tuple of the remaining elements does.
204template <
typename... T,
typename... Rest>
205struct is_tuple_of_futures<
std::tuple<future<T...>, Rest...>> : is_tuple_of_futures<std::tuple<Rest...>> {
// Concept satisfied when every type in Futs is a future, as determined by
// impl::is_tuple_of_futures applied to std::tuple<Futs...>.
210template <
typename... Futs>
211concept AllAreFutures = impl::is_tuple_of_futures<std::tuple<Futs...>>::value;
// Overload selected when the argument is already a future
// (is_future<Fut>::value): the argument is forwarded without being invoked.
213template<typename Fut, std::enable_if_t<is_future<Fut>::value,
int> = 0>
214auto futurize_invoke_if_func(Fut&& fut)
noexcept {
215 return std::forward<Fut>(fut);
// Overload selected when the argument is not a future: the argument is
// passed to futurize_invoke() and that call's result is returned.
218template<typename Func, std::enable_if_t<!is_future<Func>::value,
int> = 0>
219auto futurize_invoke_if_func(Func&& func)
noexcept {
220 return futurize_invoke(std::forward<Func>(func));
// Implementation of variadic when_all for arguments that are all futures.
// Returns a future of the tuple of the input futures.
226template <
typename... Futs>
227requires seastar::AllAreFutures<Futs...>
229future<std::tuple<Futs...>>
230when_all_impl(Futs&&... futs)
noexcept {
// identity_futures_tuple: the futures are delivered as they are, not unwrapped.
231 using state = when_all_state<identity_futures_tuple<Futs...>, Futs...>;
232 return state::wait_all(std::forward<Futs>(futs)...);
// Variadic when_all. Each argument goes through futurize_invoke_if_func
// (futures are forwarded, anything else is invoked via futurize_invoke) and
// the resulting futures are handed to internal::when_all_impl.
251template <
typename... FutOrFuncs>
252inline auto when_all(FutOrFuncs&&... fut_or_funcs)
noexcept {
253 return internal::when_all_impl(futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
// Vector result transform that performs no conversion: the vector of futures
// is returned wrapped in a ready future.
258template<
typename Future>
259struct identity_futures_vector {
260 using future_type = future<std::vector<Future>>;
// Wraps the vector of futures in a ready future.
261 static future_type run(std::vector<Future> futures)
noexcept {
262 return make_ready_future<std::vector<Future>>(std::move(futures));
// Returns the current exception as a failed future.
// NOTE(review): the enclosing try/catch is not shown in this listing —
// presumably this is the catch branch.
265 return seastar::current_exception_as_future<std::vector<Future>>();
// Waits for the futures in `futures` from position `pos` onwards, then hands
// the whole vector to ResolvedVectorTransform::run.
//   futures - vector owning all the futures
//   pos     - iterator into `futures` marking where to resume
270template <
typename ResolvedVectorTransform,
typename Future>
272typename ResolvedVectorTransform::future_type
273complete_when_all(std::vector<Future>&& futures,
typename std::vector<Future>::iterator pos)
noexcept {
// Loop over futures that are already available.
// NOTE(review): the loop body is not shown here — presumably it advances pos.
275 while (pos != futures.end() && pos->available()) {
// Reached the end of the vector: produce the final result.
279 if (pos == futures.end()) {
280 return ResolvedVectorTransform::run(std::move(futures));
// Otherwise chain on the future at pos. The closure takes ownership of the
// vector and keeps the iterator.
// NOTE(review): this relies on `pos` staying valid after the vector is moved
// into the capture — confirm against std::vector move semantics.
283 return pos->then_wrapped([futures = std::move(futures), pos] (
auto fut)
mutable {
// Put the resolved future back into its slot, advance pos and continue.
284 *pos++ = std::move(fut);
285 return complete_when_all<ResolvedVectorTransform>(std::move(futures), pos);
// Moves the futures in [begin, end) into a vector and waits for them with
// complete_when_all, using ResolvedVectorTransform for the final result.
289template<
typename ResolvedVectorTransform,
typename FutureIterator>
291do_when_all(FutureIterator begin, FutureIterator end)
noexcept {
292 using itraits = std::iterator_traits<FutureIterator>;
// Helper that creates the vector inside a memory::scoped_critical_alloc_section.
// NOTE(review): the rest of its body is not shown — presumably it reserves
// `size` elements and returns the vector.
293 auto make_values_vector = [] (
size_t size)
noexcept {
294 memory::scoped_critical_alloc_section _;
295 std::vector<typename itraits::value_type> ret;
// The argument passed as `size` is an estimate computed from the iterator
// range and its iterator category.
299 std::vector<typename itraits::value_type> ret =
300 make_values_vector(iterator_range_estimate_vector_capacity(begin, end,
typename itraits::iterator_category()));
// Move (not copy) the futures out of the input range.
303 std::move(begin, end, std::back_inserter(ret));
// complete_when_all takes the vector by rvalue reference, so `ret` is not
// moved from at the call site and ret.begin() refers to the same storage.
304 return complete_when_all<ResolvedVectorTransform>(std::move(ret), ret.begin());
// Iterator-range when_all. Requires that `*i++` is well-formed and that
// dereferencing the iterator yields a future type.
// The futures are delivered as a vector of futures (identity transform).
// NOTE(review): the return-type line is not shown in this listing.
320template <
typename FutureIterator>
321requires requires (FutureIterator i) { { *i++ };
requires is_future<std::remove_reference_t<
decltype(*i)>>::value; }
324when_all(FutureIterator begin, FutureIterator end)
noexcept {
325 namespace si = internal;
326 using itraits = std::iterator_traits<FutureIterator>;
327 using result_transform = si::identity_futures_vector<typename itraits::value_type>;
329 return si::do_when_all<result_transform>(std::move(begin), std::move(end));
// Trait used as the predicate of tuple_filter_by_type in
// extract_values_from_futures_tuple.
337template<
typename Future>
338struct future_has_value {
// True for every future type except future<> (compared after std::decay_t).
340 value = !std::is_same_v<std::decay_t<Future>, future<>>
// Maps a std::tuple type to the future/promise types that carry it.
// Only the std::tuple specialization below is defined.
344template<
typename Tuple>
345struct tuple_to_future;
347template<
typename... Elements>
348struct tuple_to_future<
std::tuple<Elements...>> {
// The tuple itself is the value carried by the future.
349 using value_type = std::tuple<Elements...>;
350 using type = future<value_type>;
351 using promise_type = promise<value_type>;
// Returns a ready future holding `t`.
356 static auto make_ready(std::tuple<Elements...> t)
noexcept {
357 return make_ready_future<value_type>(value_type(std::move(t)));
// Returns a failed future of the same type carrying `excp`.
360 static auto make_failed(std::exception_ptr excp)
noexcept {
361 return seastar::make_exception_future<value_type>(std::move(excp));
// Tuple result transform used by when_all_succeed_impl: turns a tuple of
// futures into a single future, either ready with a tuple built from the
// futures or failed with an exception.
// NOTE(review): several lines of this class are not shown in this listing;
// the comments below cover only the visible statements.
365template<
typename... Futures>
366class extract_values_from_futures_tuple {
367 static auto transform(std::tuple<Futures...> futures)
noexcept {
// Builds the result tuple: keeps only the futures for which
// internal::future_has_value holds, then maps each remaining element
// (the mapping body is not shown).
368 auto prepare_result = [] (
auto futures)
noexcept {
369 auto fs = tuple_filter_by_type<internal::future_has_value>(std::move(futures));
370 return tuple_map(std::move(fs), [] (
auto&& e) {
// Future/promise helpers for the tuple type that prepare_result returns.
375 using tuple_futurizer = internal::tuple_to_future<
decltype(prepare_result(std::move(futures)))>;
// Exception to report, if any.
// NOTE(review): the loop and conditions around the next two statements are
// not shown — presumably an exception is recorded from a failed future and
// other futures are marked as ignored; confirm.
377 std::exception_ptr excp;
381 excp = f.get_exception();
384 f.ignore_ready_future();
// Failed result carrying the recorded exception.
388 return tuple_futurizer::make_failed(std::move(excp));
// Ready result holding the tuple built by prepare_result.
391 return tuple_futurizer::make_ready(prepare_result(std::move(futures)));
// The future type is whatever transform() returns for this pack.
394 using future_type =
decltype(transform(std::declval<std::tuple<Futures...>>()));
395 using promise_type =
typename future_type::promise_type;
// Runs the transform and forwards its resulting future to `p`.
397 static void set_promise(promise_type& p, std::tuple<Futures...> tuple) {
398 transform(std::move(tuple)).forward_to(std::move(p));
// NOTE(review): the functions enclosing the next two return statements are
// not shown in this listing.
402 return transform(std::move(tuple));
407 return type_deduct();
// Vector result transform used by the when_all_succeed overloads: turns a
// vector of futures into a future of a vector of values, or a failed future.
411template<
typename Future>
412struct extract_values_from_futures_vector {
// Element type of the result: untuple() applied to the future's tuple_type.
413 using value_type =
decltype(untuple(std::declval<typename Future::tuple_type>()));
415 using future_type = future<std::vector<value_type>>;
417 static future_type run(std::vector<Future> futures)
noexcept {
// Helper that creates the output vector and reserves `size` elements inside
// a memory::scoped_critical_alloc_section.
418 auto make_values_vector = [] (
size_t size)
noexcept {
419 memory::scoped_critical_alloc_section _;
420 std::vector<value_type> values;
421 values.reserve(size);
424 std::vector<value_type> values = make_values_vector(futures.size());
426 std::exception_ptr excp;
// NOTE(review): the conditions inside this loop are not shown — presumably
// an exception is recorded from a failed future, values are collected while
// no exception has been seen, and other futures are ignored; confirm.
427 for (
auto&& f : futures) {
430 excp = f.get_exception();
432 values.emplace_back(f.get());
435 f.ignore_ready_future();
// Failed result carrying the recorded exception.
439 return seastar::make_exception_future<std::vector<value_type>>(std::move(excp));
// Ready result holding the collected values.
441 return make_ready_future<std::vector<value_type>>(std::move(values));
// Returns the current exception as a failed future.
// NOTE(review): presumably the catch branch of a try block not shown here.
445 return seastar::current_exception_as_future<std::vector<value_type>>();
// Specialization for future<>: the result is a plain future<>, either ready
// or failed with an exception.
450struct extract_values_from_futures_vector<future<>> {
451 using future_type = future<>;
453 static future_type run(std::vector<future<>> futures)
noexcept {
454 std::exception_ptr excp;
// NOTE(review): the conditions inside this loop are not shown — presumably
// an exception is recorded from a failed future and other futures are
// ignored; confirm.
455 for (
auto&& f : futures) {
458 excp = f.get_exception();
461 f.ignore_ready_future();
// Failed result carrying the recorded exception.
465 return seastar::make_exception_future<>(std::move(excp));
// Ready result.
467 return make_ready_future<>();
// Returns the current exception as a failed future.
// NOTE(review): presumably the catch branch of a try block not shown here.
471 return seastar::current_exception_as_future<>();
// Implementation of variadic when_all_succeed for arguments that are all
// futures. Uses the same when_all_state as when_all_impl, but with
// extract_values_from_futures_tuple as the result transform.
475template<
typename... Futures>
476requires seastar::AllAreFutures<Futures...>
477inline auto when_all_succeed_impl(Futures&&... futures)
noexcept {
478 using state = when_all_state<extract_values_from_futures_tuple<Futures...>, Futures...>;
479 return state::wait_all(std::forward<Futures>(futures)...);
// Variadic overload. Each argument goes through futurize_invoke_if_func and
// the results are handed to internal::when_all_succeed_impl.
// NOTE(review): the signature line is not shown in this listing.
495template <
typename... FutOrFuncs>
497 return internal::when_all_succeed_impl(futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
// Iterator-range overload whose result transform is
// internal::extract_values_from_futures_vector.
// NOTE(review): the signature line is not shown — presumably this is
// when_all_succeed(begin, end); confirm.
// The visible constraints require that `i != i` converts to bool and that
// dereferencing the iterator yields a future type.
511template <typename FutureIterator, typename = typename std::iterator_traits<FutureIterator>::value_type>
512requires requires (FutureIterator i) {
514 { i != i } -> std::convertible_to<bool>;
515 requires is_future<std::remove_reference_t<
decltype(*i)>>::value;
519 using itraits = std::iterator_traits<FutureIterator>;
520 using result_transform = internal::extract_values_from_futures_vector<typename itraits::value_type>;
// Moves the futures into a vector and waits for all of them.
522 return internal::do_when_all<result_transform>(std::move(begin), std::move(end));
// Body of an overload that already holds its futures in a vector named
// `futures` with element type future<T>.
// NOTE(review): the signature line is not shown — presumably
// when_all_succeed(std::vector<future<T>>&&); confirm.
544 using result_transform = internal::extract_values_from_futures_vector<future<T>>;
// Calls complete_when_all directly on the existing vector, starting at its
// first element. complete_when_all takes the vector by rvalue reference, so
// no move happens at the call site.
546 return internal::complete_when_all<result_transform>(std::move(futures), futures.begin());
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
internal::future_stored_type_t< T > value_type
The data type carried by the future.
Definition: future.hh:1317
promise< T > promise_type
The promise type associated with this future.
Definition: future.hh:1320
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
auto when_all_succeed(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:496
auto when_all(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:252
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
void tuple_for_each(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:157
Holds the implementation parts of the metrics layer; do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26