Seastar
High performance C++ framework for concurrent servers
shared_future.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 ScyllaDB
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <seastar/core/future.hh>
27#include <seastar/core/abortable_fifo.hh>
28#include <seastar/core/abort_on_expiry.hh>
29#include <seastar/core/timed_out_error.hh>
30#include <seastar/util/modules.hh>
31#include <exception>
32#include <optional>
33#endif
34
35namespace seastar {
36
39
41template<typename Clock>
42struct with_clock {};
43
45
46template <typename... T>
47struct future_option_traits;
48
49template <typename Clock, typename T>
50struct future_option_traits<with_clock<Clock>, T> {
51 using clock_type = Clock;
52
53 using future_type = future<T>;
54 using promise_type = promise<T>;
55};
56
57template <typename Clock>
58struct future_option_traits<with_clock<Clock>> {
59 using clock_type = Clock;
60
61 using future_type = future<>;
62 using promise_type = promise<>;
63};
64
65template <typename T>
66struct future_option_traits<T> : public future_option_traits<with_clock<lowres_clock>, T> {
67};
68
69template <>
70struct future_option_traits<> : future_option_traits<void> {
71};
72
74
107template<typename... T>
109 template <typename... U> friend class shared_promise;
110 using options = future_option_traits<T...>;
111public:
112 using clock = typename options::clock_type;
113 using time_point = typename clock::time_point;
114 using future_type = typename future_option_traits<T...>::future_type;
115 using promise_type = typename future_option_traits<T...>::promise_type;
116 using value_tuple_type = typename future_type::tuple_type;
117private:
119 class shared_state final : public enable_lw_shared_from_this<shared_state>, public task {
120 future_type _original_future;
121 // Ensures that shared_state is alive until run_and_dispose() runs (if the task was scheduled)
122 lw_shared_ptr<shared_state> _keepaliver;
123 struct entry {
124 promise_type pr;
125 std::optional<abort_on_expiry<clock>> timer;
126 };
127
128 struct entry_expiry {
129 void operator()(entry& e) noexcept {
130 if (e.timer) {
131 e.pr.set_exception(std::make_exception_ptr(timed_out_error()));
132 } else {
133 e.pr.set_exception(std::make_exception_ptr(abort_requested_exception()));
134 }
135 };
136 };
137
138 internal::abortable_fifo<entry, entry_expiry> _peers;
139
140 public:
141 ~shared_state() {
142 // Don't warn if the shared future is exceptional. Any
143 // warnings will be reported by the futures returned by
144 // get_future.
145 if (_original_future.failed()) {
146 _original_future.ignore_ready_future();
147 }
148 }
149 explicit shared_state(future_type f) noexcept
150 : task(default_scheduling_group()) // SG is set later, when the task is scheduled
151 , _original_future(std::move(f)) {
152 }
153 void run_and_dispose() noexcept override {
154 auto& state = _original_future._state;
155 if (_original_future.failed()) {
156 while (_peers) {
157 _peers.front().pr.set_exception(state.get_exception());
158 _peers.pop_front();
159 }
160 } else {
161 while (_peers) {
162 auto& p = _peers.front().pr;
163 try {
164 p.set_value(state.get_value());
165 } catch (...) {
166 p.set_exception(std::current_exception());
167 }
168 _peers.pop_front();
169 }
170 }
171 // _peer is now empty, but let's also make sure it releases any
172 // memory it might hold in reserve.
173 _peers = {};
174 _keepaliver.release();
175 }
176
177 task* waiting_task() noexcept override {
178 return nullptr;
179 }
180
181 future_type get_future(time_point timeout = time_point::max()) noexcept {
182 // Note that some functions called below may throw,
183 // like pushing to _peers or copying _original_future's ready value.
184 // We'd rather terminate than propagate these errors similar to
185 // .then()'s failure to allocate a continuation as the caller cannot
186 // distinguish between an error returned by the original future to
187 // failing to perform `get_future` itself.
189 if (!_original_future.available()) {
190 entry& e = _peers.emplace_back();
191
192 auto f = e.pr.get_future();
193 if (timeout != time_point::max()) {
194 e.timer.emplace(timeout);
195 abort_source& as = e.timer->abort_source();
196 _peers.make_back_abortable(as);
197 }
198 if (!_keepaliver) {
199 this->set_scheduling_group(current_scheduling_group());
200 _original_future.set_task(*this);
201 _keepaliver = this->shared_from_this();
202 }
203 return f;
204 } else if (_original_future.failed()) {
205 return future_type(exception_future_marker(), std::exception_ptr(_original_future._state.get_exception()));
206 } else {
207 return future_type(ready_future_marker(), _original_future._state.get_value());
208 }
209 }
210
211 future_type get_future(abort_source& as) noexcept {
212 // Note that some functions called below may throw,
213 // like pushing to _peers or copying _original_future's ready value.
214 // We'd rather terminate than propagate these errors similar to
215 // .then()'s failure to allocate a continuation as the caller cannot
216 // distinguish between an error returned by the original future to
217 // failing to perform `get_future` itself.
219 if (!_original_future.available()) {
220 entry& e = _peers.emplace_back();
221
222 auto f = e.pr.get_future();
223 _peers.make_back_abortable(as);
224 if (!_keepaliver) {
225 this->set_scheduling_group(current_scheduling_group());
226 _original_future.set_task(*this);
227 _keepaliver = this->shared_from_this();
228 }
229 return f;
230 } else if (_original_future.failed()) {
231 return future_type(exception_future_marker(), std::exception_ptr(_original_future._state.get_exception()));
232 } else {
233 return future_type(ready_future_marker(), _original_future._state.get_value());
234 }
235 }
236
237 bool available() const noexcept {
238 return _original_future.available();
239 }
240
241 bool failed() const noexcept {
242 return _original_future.failed();
243 }
244
245 // Used only in tests (see shared_future_tester in futures_test.cc)
246 bool has_scheduled_task() const noexcept {
247 return _keepaliver != nullptr;
248 }
249 };
252public:
254 shared_future(future_type f)
255 : _state(make_lw_shared<shared_state>(std::move(f))) { }
256
257 shared_future() = default; // noexcept, based on the respective lw_shared_ptr constructor
258 shared_future(const shared_future&) = default; // noexcept, based on the respective lw_shared_ptr constructor
259 shared_future& operator=(const shared_future&) = default; // noexcept, based on respective constructor
260 shared_future(shared_future&&) = default; // noexcept, based on the respective lw_shared_ptr constructor
261 shared_future& operator=(shared_future&&) = default; // noexcept, based on the respective constructor
262
269 future_type get_future(time_point timeout = time_point::max()) const noexcept {
270 return _state->get_future(timeout);
271 }
272
279 future_type get_future(abort_source& as) const noexcept {
280 return _state->get_future(as);
281 }
282
286 bool available() const noexcept {
287 return _state->available();
288 }
289
293 bool failed() const noexcept {
294 return _state->failed();
295 }
296
298 operator future_type() const noexcept {
299 return get_future();
300 }
301
303 bool valid() const noexcept {
304 return bool(_state);
305 }
306
308 friend class shared_future_tester;
310};
311
317SEASTAR_MODULE_EXPORT
318template <typename... T>
320public:
321 using shared_future_type = shared_future<T...>;
322 using future_type = typename shared_future_type::future_type;
323 using promise_type = typename shared_future_type::promise_type;
324 using clock = typename shared_future_type::clock;
325 using time_point = typename shared_future_type::time_point;
326 using value_tuple_type = typename shared_future_type::value_tuple_type;
327private:
328 promise_type _promise;
329 shared_future_type _shared_future;
330 static constexpr bool copy_noexcept = future_type::copy_noexcept;
331public:
332 shared_promise(const shared_promise&) = delete;
333 shared_promise(shared_promise&&) = default; // noexcept, based on the respective promise and shared_future constructors
334 shared_promise& operator=(shared_promise&&) = default; // noexcept, based on the respective promise and shared_future constructors
335 shared_promise() : _promise(), _shared_future(_promise.get_future()) {
336 }
337
341 future_type get_shared_future(time_point timeout = time_point::max()) const noexcept {
342 return _shared_future.get_future(timeout);
343 }
344
349 future_type get_shared_future(abort_source& as) const noexcept {
350 return _shared_future.get_future(as);
351 }
352
354 void set_value(const value_tuple_type& result) noexcept(copy_noexcept) {
355 _promise.set_value(result);
356 }
357
359 void set_value(value_tuple_type&& result) noexcept {
360 _promise.set_value(std::move(result));
361 }
362
364 template <typename... A>
365 void set_value(A&&... a) noexcept {
366 _promise.set_value(std::forward<A>(a)...);
367 }
368
370 void set_exception(std::exception_ptr ex) noexcept {
371 _promise.set_exception(std::move(ex));
372 }
373
375 template<typename Exception>
376 void set_exception(Exception&& e) noexcept {
377 set_exception(make_exception_ptr(std::forward<Exception>(e)));
378 }
379
381 bool available() const noexcept {
382 return _shared_future.available();
383 }
384
386 bool failed() const noexcept {
387 return _shared_future.failed();
388 }
389};
390
392
393}
Definition: abort_source.hh:48
Definition: abort_source.hh:58
Definition: shared_ptr.hh:148
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
promise - allows a future value to be made available at a later time.
Definition: future.hh:934
Like future except the result can be waited for by many fibers.
Definition: shared_future.hh:108
bool available() const noexcept
Returns true if the future is available (ready or failed)
Definition: shared_future.hh:286
shared_future(future_type f)
Forwards the result of future f into this shared_future.
Definition: shared_future.hh:254
future_type get_future(abort_source &as) const noexcept
Creates a new future which will resolve with the result of this shared_future.
Definition: shared_future.hh:279
bool failed() const noexcept
Returns true if the future is failed.
Definition: shared_future.hh:293
bool valid() const noexcept
Returns true if the instance is in valid state.
Definition: shared_future.hh:303
future_type get_future(time_point timeout=time_point::max()) const noexcept
Creates a new future which will resolve with the result of this shared_future.
Definition: shared_future.hh:269
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:319
void set_exception(Exception &&e) noexcept
Marks the shared_promise as failed, same as normal promise.
Definition: shared_future.hh:376
void set_value(value_tuple_type &&result) noexcept
Sets the shared_promise's value (as tuple; by moving), same as normal promise.
Definition: shared_future.hh:359
bool failed() const noexcept
Returns true if the underlying future is failed.
Definition: shared_future.hh:386
void set_exception(std::exception_ptr ex) noexcept
Marks the shared_promise as failed, same as normal promise.
Definition: shared_future.hh:370
void set_value(const value_tuple_type &result) noexcept(copy_noexcept)
Sets the shared_promise's value (as tuple; by copying), same as normal promise.
Definition: shared_future.hh:354
future_type get_shared_future(time_point timeout=time_point::max()) const noexcept
Gets new future associated with this promise. If the promise is not resolved before timeout the retur...
Definition: shared_future.hh:341
future_type get_shared_future(abort_source &as) const noexcept
Gets new future associated with this promise. If the promise is not resolved before abort source is t...
Definition: shared_future.hh:349
void set_value(A &&... a) noexcept
Sets the shared_promise's value (variadic), same as normal promise.
Definition: shared_future.hh:365
bool available() const noexcept
Returns true if the underlying future is available (ready or failed)
Definition: shared_future.hh:381
Definition: task.hh:34
Definition: timed_out_error.hh:34
Definition: timer.hh:83
Definition: future.hh:566
Definition: future.hh:565
Changes the clock used by shared_future<> and shared_promise<> when passed as the first template para...
Definition: shared_future.hh:42
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
STL namespace.