Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
all.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2021-present ScyllaDB
20 */
21
22#pragma once
23
24#include <cstddef>
25#include <concepts>
26#include <tuple>
27#include <seastar/core/coroutine.hh>
28
29namespace seastar::coroutine {
30
31template <typename Future>
32constexpr inline bool is_future_v = is_future<Future>::value;
33
34template <typename Future>
35concept future_type = is_future_v<Future>;
36
37namespace internal {
38
39// Given a bunch of futures, find the indexes of the ones that are not avoid
40// and store them in member type `type` as an std::integer_sequence.
41//
42// `IndexSequence` and `current` are intermediates used for recursion.
43template <typename IndexSequence, size_t current, typename... Futures>
44struct index_sequence_for_non_void_futures_helper;
45
46// Terminate recursion be returning the accumulated `IndexSequence`
47template <typename IndexSequence, size_t current>
48struct index_sequence_for_non_void_futures_helper<IndexSequence, current> {
49 using type = IndexSequence;
50};
51
52// Process a future<T> by adding it to the current IndexSequence and recursing
53template <size_t... Existing, size_t current, typename T, typename... Futures>
54struct index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing...>, current, future<T>, Futures...> {
55 using type = typename index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing..., current>, current + 1, Futures...>::type;
56};
57
58// Process a future<void> by ignoring it and recursing
59template <size_t... Existing, size_t current, typename... Futures>
60struct index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing...>, current, future<>, Futures...> {
61 using type = typename index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing...>, current + 1, Futures...>::type;
62};
63
64// Simple interface for the above.
65template <typename... Futures>
66using index_sequence_for_non_void_futures = typename index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t>, 0, Futures...>::type;
67
// Maps a tuple of futures to a tuple of value types. Only the tuple elements
// whose positions appear in the index sequence contribute; for each of them
// the nested `value_type` is taken. Feeding it the indexes of the non-void
// futures therefore drops every future<void> from the result.
template <typename Indexes, typename Futs>
struct value_tuple_for_non_void_futures_helper;

template <std::size_t... I, typename Futs>
struct value_tuple_for_non_void_futures_helper<std::index_sequence<I...>, Futs> {
    using type = std::tuple<typename std::tuple_element<I, Futs>::type::value_type...>;
};
76
77// Simple interface for the above
78template <typename... Futures>
79using value_tuple_for_non_void_futures = typename value_tuple_for_non_void_futures_helper<index_sequence_for_non_void_futures<Futures...>, std::tuple<Futures...>>::type;
80
81}
82
115template <typename... Futures>
116requires (sizeof ...(Futures) > 0)
117class [[nodiscard("must co_await an all() object")]] all {
118 using tuple = std::tuple<Futures...>;
119 using value_tuple = typename internal::value_tuple_for_non_void_futures<Futures...>;
120 struct awaiter;
121 template <size_t idx>
122 struct intermediate_task final : continuation_base_from_future_t<std::tuple_element_t<idx, tuple>> {
123 awaiter& container;
124 explicit intermediate_task(awaiter& container) : container(container) {}
125 virtual void run_and_dispose() noexcept {
126 using value_type = typename std::tuple_element_t<idx, tuple>::value_type;
127 if (__builtin_expect(this->_state.failed(), false)) {
129 std::get<idx>(container.state._futures) = futurator::make_exception_future(std::move(this->_state).get_exception());
130 } else {
131 if constexpr (std::same_as<std::tuple_element_t<idx, tuple>, future<>>) {
132 std::get<idx>(container.state._futures) = make_ready_future<>();
133 } else {
134 std::get<idx>(container.state._futures) = make_ready_future<value_type>(std::move(this->_state).get());
135 }
136 }
137 awaiter& c = container;
138 this->~intermediate_task();
139 c.template process<idx+1>();
140 }
141 };
142 template <typename IndexSequence>
143 struct generate_aligned_union;
144 template <size_t... idx>
145 struct generate_aligned_union<std::integer_sequence<size_t, idx...>> {
146 static constexpr std::size_t alignment_value = std::max({alignof(intermediate_task<idx>)...});
147 using type = std::byte[std::max({sizeof(intermediate_task<idx>)...})];
148 };
149 using continuation_storage = generate_aligned_union<std::make_index_sequence<std::tuple_size_v<tuple>>>;
150 using coroutine_handle_t = std::coroutine_handle<void>;
151private:
152 tuple _futures;
153private:
154 struct awaiter {
155 all& state;
156 alignas(continuation_storage::alignment_value) typename continuation_storage::type _continuation_storage;
157 coroutine_handle_t when_ready;
158 awaiter(all& state) : state(state) {}
159 bool await_ready() const {
160 return std::apply([] (const Futures&... futures) {
161 return (... && futures.available());
162 }, state._futures);
163 }
164 void await_suspend(coroutine_handle_t h) {
165 when_ready = h;
166 process<0>();
167 }
168 value_tuple await_resume() {
169 std::apply([] (Futures&... futures) {
170 std::exception_ptr e;
171 // Call get_exception for every failed future, to avoid exceptional future
172 // ignored warnings.
173 (void)(..., (futures.failed() ? (e = futures.get_exception(), 0) : 0));
174 if (e) {
175 std::rethrow_exception(std::move(e));
176 }
177 }, state._futures);
178 // This immediately-invoked lambda is used to materialize the indexes
179 // of non-void futures in the tuple.
180 return [&] <size_t... Idx> (std::integer_sequence<size_t, Idx...>) {
181 return value_tuple(std::get<Idx>(state._futures).get()...);
182 } (internal::index_sequence_for_non_void_futures<Futures...>());
183 }
184 template <unsigned idx>
185 void process() {
186 if constexpr (idx == sizeof...(Futures)) {
187 when_ready.resume();
188 } else {
189 if (!std::get<idx>(state._futures).available()) {
190 auto task = new (&_continuation_storage) intermediate_task<idx>(*this);
191 seastar::internal::set_callback(std::move(std::get<idx>(state._futures)), task);
192 } else {
193 process<idx + 1>();
194 }
195 }
196 }
197 };
198public:
199 template <typename... Func>
200 requires (... && std::invocable<Func>) && (... && future_type<std::invoke_result_t<Func>>)
201 explicit all(Func&&... funcs)
202 : _futures(futurize_invoke(funcs)...) {
203 }
204 awaiter operator co_await() { return awaiter{*this}; }
205};
206
207template <typename FirstFunc, typename... MoreFuncs>
208explicit all(FirstFunc&&, MoreFuncs&&...) -> all<std::invoke_result_t<FirstFunc>,
209 std::invoke_result_t<MoreFuncs>...>;
210
211}
Definition: all.hh:117
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: task.hh:34
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
STL namespace.
Converts a type to a future type, if it isn't already.
Definition: future.hh:1853