Seastar
High performance C++ framework for concurrent servers
shared_future.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2015 ScyllaDB
21  */
22 
23 #pragma once
24 
25 #ifndef SEASTAR_MODULE
26 #include <seastar/core/future.hh>
27 #include <seastar/core/abortable_fifo.hh>
28 #include <seastar/core/abort_on_expiry.hh>
29 #include <seastar/core/timed_out_error.hh>
30 #include <seastar/util/modules.hh>
31 #include <exception>
32 #include <optional>
33 #endif
34 
35 namespace seastar {
36 
39 
/// Tag type that changes the clock used by shared_future<> and
/// shared_promise<> when it is passed as their first template parameter.
/// It carries no data; only the \c Clock type argument matters.
template <class Clock>
struct with_clock {
};
43 
45 
46 template <typename... T>
47 struct future_option_traits;
48 
49 template <typename Clock, typename T>
50 struct future_option_traits<with_clock<Clock>, T> {
51  using clock_type = Clock;
52 
53  using future_type = future<T>;
54  using promise_type = promise<T>;
55 };
56 
57 template <typename Clock>
58 struct future_option_traits<with_clock<Clock>> {
59  using clock_type = Clock;
60 
61  using future_type = future<>;
62  using promise_type = promise<>;
63 };
64 
65 template <typename T>
66 struct future_option_traits<T> : public future_option_traits<with_clock<lowres_clock>, T> {
67 };
68 
69 template <>
70 struct future_option_traits<> : future_option_traits<void> {
71 };
72 
74 
107 template<typename... T>
109  template <typename... U> friend class shared_promise;
110  using options = future_option_traits<T...>;
111 public:
112  using clock = typename options::clock_type;
113  using time_point = typename clock::time_point;
114  using future_type = typename future_option_traits<T...>::future_type;
115  using promise_type = typename future_option_traits<T...>::promise_type;
116  using value_tuple_type = typename future_type::tuple_type;
117 private:
119  class shared_state final : public enable_lw_shared_from_this<shared_state>, public task {
120  future_type _original_future;
121  // Ensures that shared_state is alive until run_and_dispose() runs (if the task was scheduled)
122  lw_shared_ptr<shared_state> _keepaliver;
123  struct entry {
124  promise_type pr;
125  std::optional<abort_on_expiry<clock>> timer;
126  };
127 
128  struct entry_expiry {
129  void operator()(entry& e) noexcept {
130  if (e.timer) {
131  e.pr.set_exception(std::make_exception_ptr(timed_out_error()));
132  } else {
133  e.pr.set_exception(std::make_exception_ptr(abort_requested_exception()));
134  }
135  };
136  };
137 
138  internal::abortable_fifo<entry, entry_expiry> _peers;
139 
140  public:
141  ~shared_state() {
142  // Don't warn if the shared future is exceptional. Any
143  // warnings will be reported by the futures returned by
144  // get_future.
145  if (_original_future.failed()) {
146  _original_future.ignore_ready_future();
147  }
148  }
149  explicit shared_state(future_type f) noexcept
150  : task(default_scheduling_group()) // SG is set later, when the task is scheduled
151  , _original_future(std::move(f)) {
152  }
153  void run_and_dispose() noexcept override {
154  auto& state = _original_future._state;
155  if (_original_future.failed()) {
156  while (_peers) {
157  _peers.front().pr.set_exception(state.get_exception());
158  _peers.pop_front();
159  }
160  } else {
161  while (_peers) {
162  auto& p = _peers.front().pr;
163  try {
164  p.set_value(state.get_value());
165  } catch (...) {
166  p.set_exception(std::current_exception());
167  }
168  _peers.pop_front();
169  }
170  }
171  _keepaliver.release();
172  }
173 
174  task* waiting_task() noexcept override {
175  return nullptr;
176  }
177 
178  future_type get_future(time_point timeout = time_point::max()) noexcept {
179  // Note that some functions called below may throw,
180  // like pushing to _peers or copying _original_future's ready value.
181  // We'd rather terminate than propagate these errors similar to
182  // .then()'s failure to allocate a continuation as the caller cannot
183  // distinguish between an error returned by the original future to
184  // failing to perform `get_future` itself.
186  if (!_original_future.available()) {
187  entry& e = _peers.emplace_back();
188 
189  auto f = e.pr.get_future();
190  if (timeout != time_point::max()) {
191  e.timer.emplace(timeout);
192  abort_source& as = e.timer->abort_source();
193  _peers.make_back_abortable(as);
194  }
195  if (!_keepaliver) {
196  this->set_scheduling_group(current_scheduling_group());
197  _original_future.set_task(*this);
198  _keepaliver = this->shared_from_this();
199  }
200  return f;
201  } else if (_original_future.failed()) {
202  return future_type(exception_future_marker(), std::exception_ptr(_original_future._state.get_exception()));
203  } else {
204  return future_type(ready_future_marker(), _original_future._state.get_value());
205  }
206  }
207 
208  future_type get_future(abort_source& as) noexcept {
209  // Note that some functions called below may throw,
210  // like pushing to _peers or copying _original_future's ready value.
211  // We'd rather terminate than propagate these errors similar to
212  // .then()'s failure to allocate a continuation as the caller cannot
213  // distinguish between an error returned by the original future to
214  // failing to perform `get_future` itself.
216  if (!_original_future.available()) {
217  entry& e = _peers.emplace_back();
218 
219  auto f = e.pr.get_future();
220  _peers.make_back_abortable(as);
221  if (!_keepaliver) {
222  this->set_scheduling_group(current_scheduling_group());
223  _original_future.set_task(*this);
224  _keepaliver = this->shared_from_this();
225  }
226  return f;
227  } else if (_original_future.failed()) {
228  return future_type(exception_future_marker(), std::exception_ptr(_original_future._state.get_exception()));
229  } else {
230  return future_type(ready_future_marker(), _original_future._state.get_value());
231  }
232  }
233 
234  bool available() const noexcept {
235  return _original_future.available();
236  }
237 
238  bool failed() const noexcept {
239  return _original_future.failed();
240  }
241 
242  // Used only in tests (see shared_future_tester in futures_test.cc)
243  bool has_scheduled_task() const noexcept {
244  return _keepaliver != nullptr;
245  }
246  };
249 public:
251  shared_future(future_type f)
252  : _state(make_lw_shared<shared_state>(std::move(f))) { }
253 
254  shared_future() = default; // noexcept, based on the respective lw_shared_ptr constructor
255  shared_future(const shared_future&) = default; // noexcept, based on the respective lw_shared_ptr constructor
256  shared_future& operator=(const shared_future&) = default; // noexcept, based on respective constructor
257  shared_future(shared_future&&) = default; // noexcept, based on the respective lw_shared_ptr constructor
258  shared_future& operator=(shared_future&&) = default; // noexcept, based on the respective constructor
259 
266  future_type get_future(time_point timeout = time_point::max()) const noexcept {
267  return _state->get_future(timeout);
268  }
269 
276  future_type get_future(abort_source& as) const noexcept {
277  return _state->get_future(as);
278  }
279 
283  bool available() const noexcept {
284  return _state->available();
285  }
286 
290  bool failed() const noexcept {
291  return _state->failed();
292  }
293 
295  operator future_type() const noexcept {
296  return get_future();
297  }
298 
300  bool valid() const noexcept {
301  return bool(_state);
302  }
303 
305  friend class shared_future_tester;
307 };
308 
314 SEASTAR_MODULE_EXPORT
315 template <typename... T>
317 public:
318  using shared_future_type = shared_future<T...>;
319  using future_type = typename shared_future_type::future_type;
320  using promise_type = typename shared_future_type::promise_type;
321  using clock = typename shared_future_type::clock;
322  using time_point = typename shared_future_type::time_point;
323  using value_tuple_type = typename shared_future_type::value_tuple_type;
324 private:
325  promise_type _promise;
326  shared_future_type _shared_future;
327  static constexpr bool copy_noexcept = future_type::copy_noexcept;
328 public:
329  shared_promise(const shared_promise&) = delete;
330  shared_promise(shared_promise&&) = default; // noexcept, based on the respective promise and shared_future constructors
331  shared_promise& operator=(shared_promise&&) = default; // noexcept, based on the respective promise and shared_future constructors
332  shared_promise() : _promise(), _shared_future(_promise.get_future()) {
333  }
334 
338  future_type get_shared_future(time_point timeout = time_point::max()) const noexcept {
339  return _shared_future.get_future(timeout);
340  }
341 
346  future_type get_shared_future(abort_source& as) const noexcept {
347  return _shared_future.get_future(as);
348  }
349 
351  void set_value(const value_tuple_type& result) noexcept(copy_noexcept) {
352  _promise.set_value(result);
353  }
354 
356  void set_value(value_tuple_type&& result) noexcept {
357  _promise.set_value(std::move(result));
358  }
359 
361  template <typename... A>
362  void set_value(A&&... a) noexcept {
363  _promise.set_value(std::forward<A>(a)...);
364  }
365 
367  void set_exception(std::exception_ptr ex) noexcept {
368  _promise.set_exception(std::move(ex));
369  }
370 
372  template<typename Exception>
373  void set_exception(Exception&& e) noexcept {
374  set_exception(make_exception_ptr(std::forward<Exception>(e)));
375  }
376 
378  bool available() const noexcept {
379  return _shared_future.available();
380  }
381 
383  bool failed() const noexcept {
384  return _shared_future.failed();
385  }
386 };
387 
389 
390 }
Definition: abort_source.hh:49
Definition: abort_source.hh:59
Definition: shared_ptr.hh:150
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
promise - allows a future value to be made available at a later time.
Definition: future.hh:926
Like future except the result can be waited for by many fibers.
Definition: shared_future.hh:108
bool available() const noexcept
Returns true if the future is available (ready or failed)
Definition: shared_future.hh:283
shared_future(future_type f)
Forwards the result of future f into this shared_future.
Definition: shared_future.hh:251
future_type get_future(abort_source &as) const noexcept
Creates a new future which will resolve with the result of this shared_future.
Definition: shared_future.hh:276
bool failed() const noexcept
Returns true if the future is failed.
Definition: shared_future.hh:290
bool valid() const noexcept
Returns true if the instance is in valid state.
Definition: shared_future.hh:300
future_type get_future(time_point timeout=time_point::max()) const noexcept
Creates a new future which will resolve with the result of this shared_future.
Definition: shared_future.hh:266
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:316
void set_exception(Exception &&e) noexcept
Marks the shared_promise as failed, same as normal promise.
Definition: shared_future.hh:373
void set_value(value_tuple_type &&result) noexcept
Sets the shared_promise's value (as tuple; by moving), same as normal promise.
Definition: shared_future.hh:356
bool failed() const noexcept
Returns true if the underlying future is failed.
Definition: shared_future.hh:383
void set_exception(std::exception_ptr ex) noexcept
Marks the shared_promise as failed, same as normal promise.
Definition: shared_future.hh:367
void set_value(const value_tuple_type &result) noexcept(copy_noexcept)
Sets the shared_promise's value (as tuple; by copying), same as normal promise.
Definition: shared_future.hh:351
future_type get_shared_future(time_point timeout=time_point::max()) const noexcept
Gets a new future associated with this promise. If the promise is not resolved before the timeout, the returned future will resolve with timed_out_error.
Definition: shared_future.hh:338
future_type get_shared_future(abort_source &as) const noexcept
Gets a new future associated with this promise. If the promise is not resolved before the abort source is triggered, the returned future will resolve with abort_requested_exception.
Definition: shared_future.hh:346
void set_value(A &&... a) noexcept
Sets the shared_promise's value (variadic), same as normal promise.
Definition: shared_future.hh:362
bool available() const noexcept
Returns true if the underlying future is available (ready or failed)
Definition: shared_future.hh:378
Definition: task.hh:35
Definition: timed_out_error.hh:33
Definition: timer.hh:84
Definition: future.hh:558
Definition: future.hh:557
Changes the clock used by shared_future<> and shared_promise<> when passed as the first template parameter.
Definition: shared_future.hh:42
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:400