Seastar
High performance C++ framework for concurrent servers
future-util.hh
Go to the documentation of this file.
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 
25 #pragma once
26 
27 #include <seastar/core/task.hh>
28 #include <seastar/core/future.hh>
29 #include <seastar/core/shared_ptr.hh>
30 #include <seastar/core/do_with.hh>
31 #include <seastar/core/timer.hh>
32 #include <seastar/util/bool_class.hh>
33 #include <tuple>
34 #include <iterator>
35 #include <vector>
36 #include <seastar/util/std-compat.hh>
37 #include <seastar/util/tuple_utils.hh>
38 #include <seastar/util/noncopyable_function.hh>
39 
40 namespace seastar {
41 
43 extern __thread size_t task_quota;
45 
46 
48 namespace internal {
49 
50 template <typename Func>
51 void
52 schedule_in_group(scheduling_group sg, Func func) {
53  schedule(make_task(sg, std::move(func)));
54 }
55 
56 
57 }
59 
62 
73 template <typename Func, typename... Args>
74 inline
75 auto
76 with_scheduling_group(scheduling_group sg, Func func, Args&&... args) {
77  using return_type = decltype(func(std::forward<Args>(args)...));
78  using futurator = futurize<return_type>;
79  if (sg.active()) {
80  return futurator::apply(func, std::forward<Args>(args)...);
81  } else {
82  typename futurator::promise_type pr;
83  auto f = pr.get_future();
84  internal::schedule_in_group(sg, [pr = std::move(pr), func = std::move(func), args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
85  return futurator::apply(func, std::move(args)).forward_to(std::move(pr));
86  });
87  return f;
88  }
89 }
90 
92 
93 struct parallel_for_each_state {
94  // use optional<> to avoid out-of-line constructor
95  compat::optional<std::exception_ptr> ex;
96  promise<> pr;
97  ~parallel_for_each_state() {
98  if (ex) {
99  pr.set_exception(std::move(*ex));
100  } else {
101  pr.set_value();
102  }
103  }
104 };
105 
107 
122 template <typename Iterator, typename Func>
123 GCC6_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> future<>; } )
124 inline
125 future<>
126 parallel_for_each(Iterator begin, Iterator end, Func&& func) {
128  while (begin != end) {
129  auto f = futurize_apply(std::forward<Func>(func), *begin++);
130  if (__builtin_expect(!f.available() || f.failed(), false)) {
131  if (!state) {
132  if (begin == end) {
133  // Only the last element was not immediately ready (likely if
134  // there is exactly one element)
135  return f;
136  }
137  [&state] () noexcept {
139  state = make_lw_shared<parallel_for_each_state>();
140  }();
141  }
142  f.then_wrapped([state] (future<> f) {
143  if (f.failed()) {
144  // We can only store one exception. For more, use when_all().
145  if (!state->ex) {
146  state->ex = f.get_exception();
147  } else {
149  }
150  }
151  });
152  }
153  }
154  if (__builtin_expect(bool(state), false)) {
155  return state->pr.get_future();
156  }
157  return make_ready_future<>();
158 }
159 
174 template <typename Range, typename Func>
175 GCC6_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> future<>; } )
176 inline
177 future<>
178 parallel_for_each(Range&& range, Func&& func) {
179  return parallel_for_each(std::begin(range), std::end(range),
180  std::forward<Func>(func));
181 }
182 
183 // The AsyncAction concept represents an action which can complete later than
184 // the actual function invocation. It is represented by a function which
185 // returns a future which resolves when the action is done.
186 
189 
191 
192 namespace internal {
193 
194 template <typename AsyncAction>
195 class repeater final : public continuation_base<stop_iteration> {
196  promise<> _promise;
197  AsyncAction _action;
198 public:
199  explicit repeater(AsyncAction action) : _action(std::move(action)) {}
200  repeater(stop_iteration si, AsyncAction action) : repeater(std::move(action)) {
201  _state.set(std::make_tuple(si));
202  }
203  future<> get_future() { return _promise.get_future(); }
204  virtual void run_and_dispose() noexcept override {
205  std::unique_ptr<repeater> zis{this};
206  if (_state.failed()) {
207  _promise.set_exception(std::move(_state).get_exception());
208  return;
209  } else {
210  if (std::get<0>(_state.get()) == stop_iteration::yes) {
211  _promise.set_value();
212  return;
213  }
214  _state = {};
215  }
216  try {
217  do {
218  auto f = _action();
219  if (!f.available()) {
220  internal::set_callback(f, std::move(zis));
221  return;
222  }
223  if (f.get0() == stop_iteration::yes) {
224  _promise.set_value();
225  return;
226  }
227  } while (!need_preempt());
228  } catch (...) {
229  _promise.set_exception(std::current_exception());
230  return;
231  }
232  _state.set(stop_iteration::no);
233  schedule(std::move(zis));
234  }
235 };
236 
/// Primary case (\c ReturnsFuture is true): the action type is used
/// unchanged.
template <typename AsyncAction, bool ReturnsFuture = true>
struct futurized_action_helper {
    using type = AsyncAction;
};
241 
242 template <typename AsyncAction>
243 struct futurized_action_helper<AsyncAction, false> {
244  struct wrapper {
245  AsyncAction action;
246  using orig_ret = std::result_of_t<AsyncAction()>;
247  explicit wrapper(AsyncAction&& action) : action(std::move(action)) {}
248  futurize_t<orig_ret> operator()() {
249  return futurize<orig_ret>::convert(action());
250  };
251  };
252  using type = wrapper;
253 };
254 
255 template <typename AsyncAction>
256 struct futurized_action {
257  using type = typename futurized_action_helper<AsyncAction, is_future<std::result_of_t<AsyncAction()>>::value>::type;
258 };
259 
260 
261 }
262 
264 
274 template<typename AsyncAction>
275 GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, stop_iteration> || seastar::ApplyReturns<AsyncAction, future<stop_iteration>> )
276 inline
277 future<> repeat(AsyncAction action) {
278  using futurator = futurize<std::result_of_t<AsyncAction()>>;
279  static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
280  using futurized_action_type = typename internal::futurized_action<AsyncAction>::type;
281  auto futurized_action = futurized_action_type(std::move(action));
282  try {
283  do {
284  // Do not type-erase here in case this is a short repeat()
285  auto f = futurized_action();
286 
287  if (!f.available()) {
288  return [&] () noexcept {
290  auto repeater = std::make_unique<internal::repeater<futurized_action_type>>(std::move(futurized_action));
291  auto ret = repeater->get_future();
292  internal::set_callback(f, std::move(repeater));
293  return ret;
294  }();
295  }
296 
297  if (f.get0() == stop_iteration::yes) {
298  return make_ready_future<>();
299  }
300  } while (!need_preempt());
301 
302  auto repeater = std::make_unique<internal::repeater<futurized_action_type>>(stop_iteration::no, std::move(futurized_action));
303  auto ret = repeater->get_future();
304  schedule(std::move(repeater));
305  return ret;
306  } catch (...) {
307  return make_exception_future(std::current_exception());
308  }
309 }
310 
312 
313 template <typename T>
314 struct repeat_until_value_type_helper;
315 
317 
319 template <typename T>
320 struct repeat_until_value_type_helper<future<compat::optional<T>>> {
322  using value_type = T;
324  using optional_type = compat::optional<T>;
329 };
330 
332 template <typename AsyncAction>
334  = typename repeat_until_value_type_helper<std::result_of_t<AsyncAction()>>::future_type;
335 
336 namespace internal {
337 
338 template <typename AsyncAction, typename T>
339 class repeat_until_value_state final : public continuation_base<compat::optional<T>> {
340  promise<T> _promise;
341  AsyncAction _action;
342 public:
343  explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
344  repeat_until_value_state(compat::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
345  this->_state.set(std::make_tuple(std::move(st)));
346  }
347  future<T> get_future() { return _promise.get_future(); }
348  virtual void run_and_dispose() noexcept override {
349  std::unique_ptr<repeat_until_value_state> zis{this};
350  if (this->_state.failed()) {
351  _promise.set_exception(std::move(this->_state).get_exception());
352  return;
353  } else {
354  auto v = std::get<0>(std::move(this->_state).get());
355  if (v) {
356  _promise.set_value(std::move(*v));
357  return;
358  }
359  this->_state = {};
360  }
361  try {
362  do {
363  auto f = _action();
364  if (!f.available()) {
365  internal::set_callback(f, std::move(zis));
366  return;
367  }
368  auto ret = f.get0();
369  if (ret) {
370  _promise.set_value(std::make_tuple(std::move(*ret)));
371  return;
372  }
373  } while (!need_preempt());
374  } catch (...) {
375  _promise.set_exception(std::current_exception());
376  return;
377  }
378  this->_state.set(compat::nullopt);
379  schedule(std::move(zis));
380  }
381 };
382 
383 }
384 
395 template<typename AsyncAction>
396 GCC6_CONCEPT( requires requires (AsyncAction aa) {
397  requires is_future<decltype(aa())>::value;
398  bool(aa().get0());
399  aa().get0().value();
400 } )
401 repeat_until_value_return_type<AsyncAction>
402 repeat_until_value(AsyncAction action) {
403  using type_helper = repeat_until_value_type_helper<std::result_of_t<AsyncAction()>>;
404  // the "T" in the documentation
405  using value_type = typename type_helper::value_type;
406  using optional_type = typename type_helper::optional_type;
407  using futurized_action_type = typename internal::futurized_action<AsyncAction>::type;
408  auto futurized_action = futurized_action_type(std::move(action));
409  do {
410  auto f = futurized_action();
411 
412  if (!f.available()) {
413  return [&] () noexcept {
415  auto state = std::make_unique<internal::repeat_until_value_state<futurized_action_type, value_type>>(std::move(futurized_action));
416  auto ret = state->get_future();
417  internal::set_callback(f, std::move(state));
418  return ret;
419  }();
420  }
421 
422  if (f.failed()) {
423  return make_exception_future<value_type>(f.get_exception());
424  }
425 
426  optional_type&& optional = std::move(f).get0();
427  if (optional) {
428  return make_ready_future<value_type>(std::move(optional.value()));
429  }
430  } while (!need_preempt());
431 
432  try {
433  auto state = std::make_unique<internal::repeat_until_value_state<futurized_action_type, value_type>>(compat::nullopt, std::move(futurized_action));
434  auto f = state->get_future();
435  schedule(std::move(state));
436  return f;
437  } catch (...) {
438  return make_exception_future<value_type>(std::current_exception());
439  }
440 }
441 
442 namespace internal {
443 
444 template <typename StopCondition, typename AsyncAction>
445 class do_until_state final : public continuation_base<> {
446  promise<> _promise;
447  StopCondition _stop;
448  AsyncAction _action;
449 public:
450  explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
451  future<> get_future() { return _promise.get_future(); }
452  virtual void run_and_dispose() noexcept override {
453  std::unique_ptr<do_until_state> zis{this};
454  if (_state.available()) {
455  if (_state.failed()) {
456  _promise.set_urgent_state(std::move(_state));
457  return;
458  }
459  _state = {}; // allow next cycle to overrun state
460  }
461  try {
462  do {
463  if (_stop()) {
464  _promise.set_value();
465  return;
466  }
467  auto f = _action();
468  if (!f.available()) {
469  internal::set_callback(f, std::move(zis));
470  return;
471  }
472  if (f.failed()) {
473  f.forward_to(std::move(_promise));
474  return;
475  }
476  } while (!need_preempt());
477  } catch (...) {
478  _promise.set_exception(std::current_exception());
479  return;
480  }
481  schedule(std::move(zis));
482  }
483 };
484 
485 }
486 
497 template<typename AsyncAction, typename StopCondition>
498 GCC6_CONCEPT( requires seastar::ApplyReturns<StopCondition, bool> && seastar::ApplyReturns<AsyncAction, future<>> )
499 inline
500 future<> do_until(StopCondition stop_cond, AsyncAction action) {
501  using namespace internal;
502  using futurator = futurize<void>;
503  do {
504  if (stop_cond()) {
505  return make_ready_future<>();
506  }
507  auto f = futurator::apply(action);
508  if (!f.available()) {
509  return [&] () noexcept {
511  auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
512  auto ret = task->get_future();
513  internal::set_callback(f, std::move(task));
514  return ret;
515  }();
516  }
517  if (f.failed()) {
518  return f;
519  }
520  } while (!need_preempt());
521 
522  auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
523  auto f = task->get_future();
524  schedule(std::move(task));
525  return f;
526 }
527 
535 template<typename AsyncAction>
536 GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, future<>> )
537 inline
538 future<> keep_doing(AsyncAction action) {
539  return repeat([action = std::move(action)] () mutable {
540  return action().then([] {
541  return stop_iteration::no;
542  });
543  });
544 }
545 
558 template<typename Iterator, typename AsyncAction>
559 GCC6_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
560  { futurize_apply(aa, *i) } -> future<>;
561 } )
562 inline
563 future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) {
564  if (begin == end) {
565  return make_ready_future<>();
566  }
567  while (true) {
568  auto f = futurize<void>::apply(action, *begin);
569  ++begin;
570  if (begin == end) {
571  return f;
572  }
573  if (!f.available() || need_preempt()) {
574  return std::move(f).then([action = std::move(action),
575  begin = std::move(begin), end = std::move(end)] () mutable {
576  return do_for_each(std::move(begin), std::move(end), std::move(action));
577  });
578  }
579  if (f.failed()) {
580  return f;
581  }
582  }
583 }
584 
596 template<typename Container, typename AsyncAction>
597 GCC6_CONCEPT( requires requires (Container c, AsyncAction aa) {
598  { futurize_apply(aa, *c.begin()) } -> future<>
599 } )
600 inline
601 future<> do_for_each(Container& c, AsyncAction action) {
602  return do_for_each(std::begin(c), std::end(c), std::move(action));
603 }
604 
606 namespace internal {
607 
608 template<typename... Futures>
609 struct identity_futures_tuple {
610  using future_type = future<std::tuple<Futures...>>;
611  using promise_type = typename future_type::promise_type;
612 
613  static void set_promise(promise_type& p, std::tuple<Futures...> futures) {
614  p.set_value(std::move(futures));
615  }
616 
617  static future_type make_ready_future(std::tuple<Futures...> futures) {
618  return futurize<future_type>::from_tuple(std::move(futures));
619  }
620 };
621 
622 // Given a future type, find the continuation_base corresponding to that future
623 template <typename Future>
624 struct continuation_base_for_future;
625 
626 template <typename... T>
627 struct continuation_base_for_future<future<T...>> {
628  using type = continuation_base<T...>;
629 };
630 
631 template <typename Future>
632 using continuation_base_for_future_t = typename continuation_base_for_future<Future>::type;
633 
634 class when_all_state_base;
635 
636 // If the future is ready, return true
637 // if the future is not ready, chain a continuation to it, and return false
638 using when_all_process_element_func = bool (*)(void* future, void* continuation, when_all_state_base* wasb);
639 
640 struct when_all_process_element {
641  when_all_process_element_func func;
642  void* future;
643 };
644 
645 class when_all_state_base {
646  size_t _nr_remain;
647  const when_all_process_element* _processors;
648  void* _continuation;
649 public:
650  virtual ~when_all_state_base() {}
651  when_all_state_base(size_t nr_remain, const when_all_process_element* processors, void* continuation)
652  : _nr_remain(nr_remain), _processors(processors), _continuation(continuation) {
653  }
654  void complete_one() {
655  // We complete in reverse order; if the futures happen to complete
656  // in order, then waiting for the last one will find the rest ready
657  --_nr_remain;
658  while (_nr_remain) {
659  bool ready = process_one(_nr_remain - 1);
660  if (!ready) {
661  return;
662  }
663  --_nr_remain;
664  }
665  if (!_nr_remain) {
666  delete this;
667  }
668  }
669  void do_wait_all() {
670  ++_nr_remain; // fake pending completion for complete_one()
671  complete_one();
672  }
673  bool process_one(size_t idx) {
674  auto p = _processors[idx];
675  return p.func(p.future, _continuation, this);
676  }
677 };
678 
679 template <typename Future>
680 class when_all_state_component : public continuation_base_for_future_t<Future> {
681  when_all_state_base* _base;
682  Future* _final_resting_place;
683 public:
684  static bool process_element_func(void* future, void* continuation, when_all_state_base* wasb) {
685  auto f = reinterpret_cast<Future*>(future);
686  if (f->available()) {
687  return true;
688  } else {
689  auto c = new (continuation) when_all_state_component(wasb, f);
690  set_callback(*f, std::unique_ptr<when_all_state_component>(c));
691  return false;
692  }
693  }
694  when_all_state_component(when_all_state_base *base, Future* future) : _base(base), _final_resting_place(future) {}
695  virtual void run_and_dispose() noexcept override {
696  using futurator = futurize<Future>;
697  if (__builtin_expect(this->_state.failed(), false)) {
698  *_final_resting_place = futurator::make_exception_future(std::move(this->_state).get_exception());
699  } else {
700  *_final_resting_place = futurator::from_tuple(std::move(this->_state).get_value());
701  }
702  auto base = _base;
703  this->~when_all_state_component();
704  base->complete_one();
705  }
706 };
707 
708 #if __cpp_fold_expressions >= 201603
709 // This optimization requires C++17
710 # define SEASTAR__WAIT_ALL__AVOID_ALLOCATION_WHEN_ALL_READY
711 #endif
712 
713 template<typename ResolvedTupleTransform, typename... Futures>
714 class when_all_state : public when_all_state_base {
715  static constexpr size_t nr = sizeof...(Futures);
716  using type = std::tuple<Futures...>;
717  type tuple;
718  // We only schedule one continuation at a time, and store it in _cont.
719  // This way, if while the future we wait for completes, some other futures
720  // also complete, we won't need to schedule continuations for them.
721  std::aligned_union_t<1, when_all_state_component<Futures>...> _cont;
722  when_all_process_element _processors[nr];
723 public:
724  typename ResolvedTupleTransform::promise_type p;
725  when_all_state(Futures&&... t) : when_all_state_base(nr, _processors, &_cont), tuple(std::make_tuple(std::move(t)...)) {
726  init_element_processors(std::make_index_sequence<nr>());
727  }
728  virtual ~when_all_state() {
729  ResolvedTupleTransform::set_promise(p, std::move(tuple));
730  }
731 private:
732  template <size_t... Idx>
733  void init_element_processors(std::index_sequence<Idx...>) {
734  auto ignore = {
735  0,
736  (_processors[Idx] = when_all_process_element{
737  when_all_state_component<std::tuple_element_t<Idx, type>>::process_element_func,
738  &std::get<Idx>(tuple)
739  }, 0)...
740  };
741  (void)ignore;
742  }
743 public:
744  static typename ResolvedTupleTransform::future_type wait_all(Futures&&... futures) {
745 #ifdef SEASTAR__WAIT_ALL__AVOID_ALLOCATION_WHEN_ALL_READY
746  if ((futures.available() && ...)) {
747  return ResolvedTupleTransform::make_ready_future(std::make_tuple(std::move(futures)...));
748  }
749 #endif
750  auto state = [&] () noexcept {
751  memory::disable_failure_guard dfg;
752  return new when_all_state(std::move(futures)...);
753  }();
754  auto ret = state->p.get_future();
755  state->do_wait_all();
756  return ret;
757  }
758 };
759 
760 }
762 
763 GCC6_CONCEPT(
764 
766 namespace impl {
767 
768 
769 // Want: folds
770 
771 template <typename T>
772 struct is_tuple_of_futures : std::false_type {
773 };
774 
775 template <>
776 struct is_tuple_of_futures<std::tuple<>> : std::true_type {
777 };
778 
779 template <typename... T, typename... Rest>
780 struct is_tuple_of_futures<std::tuple<future<T...>, Rest...>> : is_tuple_of_futures<std::tuple<Rest...>> {
781 };
782 
783 }
785 
786 template <typename... Futs>
787 concept bool AllAreFutures = impl::is_tuple_of_futures<std::tuple<Futs...>>::value;
788 
789 )
790 
791 template<typename Fut, std::enable_if_t<is_future<Fut>::value, int> = 0>
792 auto futurize_apply_if_func(Fut&& fut) {
793  return std::forward<Fut>(fut);
794 }
795 
796 template<typename Func, std::enable_if_t<!is_future<Func>::value, int> = 0>
797 auto futurize_apply_if_func(Func&& func) {
798  return futurize_apply(std::forward<Func>(func));
799 }
800 
801 template <typename... Futs>
802 GCC6_CONCEPT( requires seastar::AllAreFutures<Futs...> )
803 inline
804 future<std::tuple<Futs...>>
805 when_all_impl(Futs&&... futs) {
806  namespace si = internal;
807  using state = si::when_all_state<si::identity_futures_tuple<Futs...>, Futs...>;
808  return state::wait_all(std::forward<Futs>(futs)...);
809 }
810 
/// Waits for several futures and/or callables: each argument goes through
/// futurize_apply_if_func() and the results are passed to when_all_impl().
template <typename... FutOrFuncs>
inline auto when_all(FutOrFuncs&&... fut_or_funcs) {
    return when_all_impl(
            futurize_apply_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
}
828 
830 namespace internal {
831 
// Fallback used when the iterator category is not (derived from)
// std::forward_iterator_tag: the range cannot be measured without
// consuming it, so no capacity is suggested.
template <typename Iterator>
inline
size_t
when_all_estimate_vector_capacity_impl(Iterator, Iterator, std::false_type) {
    return 0;
}

// Used for forward iterators and anything stronger.
template <typename Iterator>
inline
size_t
when_all_estimate_vector_capacity_impl(Iterator begin, Iterator end, std::true_type) {
    // May be linear time below random_access_iterator_tag, but still better than reallocation
    return std::distance(begin, end);
}

// Suggests how many elements to reserve for the range [begin, end).
//
// The decision is made with std::is_base_of rather than by overloading on
// std::forward_iterator_tag: with two overloads, a bidirectional or
// random-access tag matches a generic "IteratorCategory" parameter exactly,
// which outranks the derived-to-base conversion to forward_iterator_tag,
// so the generic overload (returning 0) would be selected for them.
//
// Returns std::distance(begin, end) for forward-or-better categories and
// 0 otherwise.
template <typename Iterator, typename IteratorCategory>
inline
size_t
when_all_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
    (void)category;
    return when_all_estimate_vector_capacity_impl(begin, end,
            std::is_base_of<std::forward_iterator_tag, IteratorCategory>());
}
847 
848 template<typename Future>
849 struct identity_futures_vector {
850  using future_type = future<std::vector<Future>>;
851  static future_type run(std::vector<Future> futures) {
852  return make_ready_future<std::vector<Future>>(std::move(futures));
853  }
854 };
855 
// Internal function for when_all().
//
// Starting at pos, skips over futures that are already available. If none
// is left, hands the whole vector to ResolvedVectorTransform::run().
// Otherwise waits on the first unavailable future, stores its resolved
// form back into the same slot, and resumes from the next slot.
template <typename ResolvedVectorTransform, typename Future>
inline
typename ResolvedVectorTransform::future_type
complete_when_all(std::vector<Future>&& futures, typename std::vector<Future>::iterator pos) {
    // If any futures are already ready, skip them.
    for (; pos != futures.end() && pos->available(); ++pos) {
    }
    // Done?
    if (pos == futures.end()) {
        return ResolvedVectorTransform::run(std::move(futures));
    }
    // Wait for unready future, store, and continue.
    return pos->then_wrapped([futures = std::move(futures), pos] (auto resolved) mutable {
        *pos++ = std::move(resolved);
        return complete_when_all<ResolvedVectorTransform>(std::move(futures), pos);
    });
}
875 
876 template<typename ResolvedVectorTransform, typename FutureIterator>
877 inline auto
878 do_when_all(FutureIterator begin, FutureIterator end) {
879  using itraits = std::iterator_traits<FutureIterator>;
880  std::vector<typename itraits::value_type> ret;
881  ret.reserve(when_all_estimate_vector_capacity(begin, end, typename itraits::iterator_category()));
882  // Important to invoke the *begin here, in case it's a function iterator,
883  // so we launch all computation in parallel.
884  std::move(begin, end, std::back_inserter(ret));
885  return complete_when_all<ResolvedVectorTransform>(std::move(ret), ret.begin());
886 }
887 
888 }
890 
901 template <typename FutureIterator>
902 GCC6_CONCEPT( requires requires (FutureIterator i) { { *i++ }; requires is_future<std::remove_reference_t<decltype(*i)>>::value; } )
903 inline
904 future<std::vector<typename std::iterator_traits<FutureIterator>::value_type>>
905 when_all(FutureIterator begin, FutureIterator end) {
906  namespace si = internal;
907  using itraits = std::iterator_traits<FutureIterator>;
908  using result_transform = si::identity_futures_vector<typename itraits::value_type>;
909  return si::do_when_all<result_transform>(std::move(begin), std::move(end));
910 }
911 
// Traits for reducers that have a get() method. IsFuture tells whether
// get() itself returns a future; only the two specializations are defined.
template <typename T, bool IsFuture>
struct reducer_with_get_traits;

915 template <typename T>
916 struct reducer_with_get_traits<T, false> {
917  using result_type = decltype(std::declval<T>().get());
919  static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
920  return f.then([r = std::move(r)] () mutable {
921  return make_ready_future<result_type>(std::move(*r).get());
922  });
923  }
924 };
925 
926 template <typename T>
927 struct reducer_with_get_traits<T, true> {
928  using future_type = decltype(std::declval<T>().get());
929  static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
930  return f.then([r = std::move(r)] () mutable {
931  return r->get();
932  }).then_wrapped([r] (future_type f) {
933  return f;
934  });
935  }
936 };
937 
938 template <typename T, typename V = void>
940  using future_type = future<>;
941  static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
942  return f.then([r = std::move(r)] {});
943  }
944 };
945 
946 template <typename T>
947 struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, is_future<std::result_of_t<decltype(&T::get)(T)>>::value> {};
948 
949 // @Mapper is a callable which transforms values from the iterator range
950 // into a future<T>. @Reducer is an object which can be called with T as
951 // parameter and yields a future<>. It may have a get() method which returns
952 // a value of type U which holds the result of reduction. This value is wrapped
953 // in a future and returned by this function. If the reducer has no get() method
954 // then this function returns future<>.
955 //
956 // TODO: specialize for non-deferring reducer
957 template <typename Iterator, typename Mapper, typename Reducer>
958 inline
959 auto
960 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
962 {
963  auto r_ptr = make_lw_shared(std::forward<Reducer>(r));
964  future<> ret = make_ready_future<>();
965  using futurator = futurize<decltype(mapper(*begin))>;
966  while (begin != end) {
967  ret = futurator::apply(mapper, *begin++).then_wrapped([ret = std::move(ret), r_ptr] (auto f) mutable {
968  return ret.then_wrapped([f = std::move(f), r_ptr] (auto rf) mutable {
969  if (rf.failed()) {
970  f.ignore_ready_future();
971  return std::move(rf);
972  } else {
973  return futurize<void>::apply(*r_ptr, std::move(f.get()));
974  }
975  });
976  });
977  }
978  return reducer_traits<Reducer>::maybe_call_get(std::move(ret), r_ptr);
979 }
980 
1016 template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
1017 GCC6_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
1018  *i++;
1019  { i != i} -> bool;
1020  mapper(*i);
1021  requires is_future<decltype(mapper(*i))>::value;
1022  { reduce(std::move(initial), mapper(*i).get0()) } -> Initial;
1023 } )
1024 inline
1025 future<Initial>
1026 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
1027  struct state {
1028  Initial result;
1029  Reduce reduce;
1030  };
1031  auto s = make_lw_shared(state{std::move(initial), std::move(reduce)});
1032  future<> ret = make_ready_future<>();
1033  using futurator = futurize<decltype(mapper(*begin))>;
1034  while (begin != end) {
1035  ret = futurator::apply(mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
1036  try {
1037  s->result = s->reduce(std::move(s->result), std::move(f.get0()));
1038  return std::move(ret);
1039  } catch (...) {
1040  return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
1041  f.ignore_ready_future();
1042  return make_exception_future<>(ex);
1043  });
1044  }
1045  });
1046  }
1047  return ret.then([s] {
1048  return make_ready_future<Initial>(std::move(s->result));
1049  });
1050 }
1051 
1087 template <typename Range, typename Mapper, typename Initial, typename Reduce>
1088 GCC6_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
1089  std::begin(range);
1090  std::end(range);
1091  mapper(*std::begin(range));
1092  requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
1093  { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> Initial;
1094 } )
1095 inline
1096 future<Initial>
1097 map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
1098  return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
1099  std::move(initial), std::move(reduce));
1100 }
1101 
1102 // Implements @Reducer concept. Calculates the result by
1103 // adding elements to the accumulator.
1104 template <typename Result, typename Addend = Result>
1105 class adder {
1106 private:
1107  Result _result;
1108 public:
1109  future<> operator()(const Addend& value) {
1110  _result += value;
1111  return make_ready_future<>();
1112  }
1113  Result get() && {
1114  return std::move(_result);
1115  }
1116 };
1117 
1118 inline
1119 future<> now() {
1120  return make_ready_future<>();
1121 }
1122 
1123 // Returns a future which is not ready but is scheduled to resolve soon.
1124 future<> later();
1125 
/// Exception type produced by default_timeout_exception_factory.
class timed_out_error : public std::exception {
public:
    /// \return the fixed message "timedout"
    // `override` lets the compiler verify this really overrides
    // std::exception::what().
    virtual const char* what() const noexcept override {
        return "timedout";
    }
};
1132 
1134  static auto timeout() {
1135  return timed_out_error();
1136  }
1137 };
1138 
1151 template<typename ExceptionFactory = default_timeout_exception_factory, typename Clock, typename Duration, typename... T>
1152 future<T...> with_timeout(std::chrono::time_point<Clock, Duration> timeout, future<T...> f) {
1153  if (f.available()) {
1154  return f;
1155  }
1156  auto pr = std::make_unique<promise<T...>>();
1157  auto result = pr->get_future();
1158  timer<Clock> timer([&pr = *pr] {
1159  pr.set_exception(std::make_exception_ptr(ExceptionFactory::timeout()));
1160  });
1161  timer.arm(timeout);
1162  f.then_wrapped([pr = std::move(pr), timer = std::move(timer)] (auto&& f) mutable {
1163  if (timer.cancel()) {
1164  f.forward_to(std::move(*pr));
1165  } else {
1166  f.ignore_ready_future();
1167  }
1168  });
1169  return result;
1170 }
1171 
1172 namespace internal {
1173 
1174 template<typename Future>
1175 struct future_has_value {
1176  enum {
1177  value = !std::is_same<std::decay_t<Future>, future<>>::value
1178  };
1179 };
1180 
1181 template<typename Tuple>
1182 struct tuple_to_future;
1183 
1184 template<typename... Elements>
1185 struct tuple_to_future<std::tuple<Elements...>> {
1186  using type = future<Elements...>;
1187  using promise_type = promise<Elements...>;
1188 
1189  static auto make_ready(std::tuple<Elements...> t) {
1190  auto create_future = [] (auto&&... args) {
1191  return make_ready_future<Elements...>(std::move(args)...);
1192  };
1193  return apply(create_future, std::move(t));
1194  }
1195 
1196  static auto make_failed(std::exception_ptr excp) {
1197  return make_exception_future<Elements...>(std::move(excp));
1198  }
1199 };
1200 
1201 template<typename... Futures>
1202 class extract_values_from_futures_tuple {
1203  static auto transform(std::tuple<Futures...> futures) {
1204  auto prepare_result = [] (auto futures) {
1205  auto fs = tuple_filter_by_type<internal::future_has_value>(std::move(futures));
1206  return tuple_map(std::move(fs), [] (auto&& e) {
1207  return internal::untuple(e.get());
1208  });
1209  };
1210 
1211  using tuple_futurizer = internal::tuple_to_future<decltype(prepare_result(std::move(futures)))>;
1212 
1213  std::exception_ptr excp;
1214  tuple_for_each(futures, [&excp] (auto& f) {
1215  if (!excp) {
1216  if (f.failed()) {
1217  excp = f.get_exception();
1218  }
1219  } else {
1220  f.ignore_ready_future();
1221  }
1222  });
1223  if (excp) {
1224  return tuple_futurizer::make_failed(std::move(excp));
1225  }
1226 
1227  return tuple_futurizer::make_ready(prepare_result(std::move(futures)));
1228  }
1229 public:
1230  using future_type = decltype(transform(std::declval<std::tuple<Futures...>>()));
1231  using promise_type = typename future_type::promise_type;
1232 
1233  static void set_promise(promise_type& p, std::tuple<Futures...> tuple) {
1234  transform(std::move(tuple)).forward_to(std::move(p));
1235  }
1236 
1237  static future_type make_ready_future(std::tuple<Futures...> tuple) {
1238  return transform(std::move(tuple));
1239  }
1240 };
1241 
1242 template<typename Future>
1243 struct extract_values_from_futures_vector {
1244  using value_type = decltype(untuple(std::declval<typename Future::value_type>()));
1245 
1246  using future_type = future<std::vector<value_type>>;
1247 
1248  static future_type run(std::vector<Future> futures) {
1249  std::vector<value_type> values;
1250  values.reserve(futures.size());
1251 
1252  std::exception_ptr excp;
1253  for (auto&& f : futures) {
1254  if (!excp) {
1255  if (f.failed()) {
1256  excp = f.get_exception();
1257  } else {
1258  values.emplace_back(untuple(f.get()));
1259  }
1260  } else {
1261  f.ignore_ready_future();
1262  }
1263  }
1264  if (excp) {
1265  return make_exception_future<std::vector<value_type>>(std::move(excp));
1266  }
1267  return make_ready_future<std::vector<value_type>>(std::move(values));
1268  }
1269 };
1270 
1271 template<>
1272 struct extract_values_from_futures_vector<future<>> {
1273  using future_type = future<>;
1274 
1275  static future_type run(std::vector<future<>> futures) {
1276  std::exception_ptr excp;
1277  for (auto&& f : futures) {
1278  if (!excp) {
1279  if (f.failed()) {
1280  excp = f.get_exception();
1281  }
1282  } else {
1283  f.ignore_ready_future();
1284  }
1285  }
1286  if (excp) {
1287  return make_exception_future<>(std::move(excp));
1288  }
1289  return make_ready_future<>();
1290  }
1291 };
1292 
1293 }
1294 
1295 template<typename... Futures>
1296 GCC6_CONCEPT( requires seastar::AllAreFutures<Futures...> )
1297 inline auto when_all_succeed_impl(Futures&&... futures) {
1298  using state = internal::when_all_state<internal::extract_values_from_futures_tuple<Futures...>, Futures...>;
1299  return state::wait_all(std::forward<Futures>(futures)...);
1300 }
1301 
/// Variadic form of when_all_succeed().
///
/// Every argument is first passed through futurize_apply_if_func()
/// (presumably invoking callables and leaving futures untouched -- confirm
/// against its definition); the results are handed to when_all_succeed_impl().
///
/// \param fut_or_funcs futures or callables to combine
/// \return whatever when_all_succeed_impl() returns for the converted arguments
template <typename... FutOrFuncs>
inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) {
    return when_all_succeed_impl(
            futurize_apply_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
}
1316 
1327 template <typename FutureIterator, typename = typename std::iterator_traits<FutureIterator>::value_type>
1328 GCC6_CONCEPT( requires requires (FutureIterator i) {
1329  *i++;
1330  { i != i } -> bool;
1331  requires is_future<std::remove_reference_t<decltype(*i)>>::value;
1332 } )
1333 inline auto
1334 when_all_succeed(FutureIterator begin, FutureIterator end) {
1335  using itraits = std::iterator_traits<FutureIterator>;
1336  using result_transform = internal::extract_values_from_futures_vector<typename itraits::value_type>;
1337  return internal::do_when_all<result_transform>(std::move(begin), std::move(end));
1338 }
1339 
1340 }
1341 
future< T... > make_ready_future(A &&... value)
Creates a future in an available, value state.
Definition: future.hh:1273
A representation of a possibly not-yet-computed value.
Definition: future.hh:78
future< T... > with_timeout(std::chrono::time_point< Clock, Duration > timeout, future< T... > f)
Wait for either a future, or a timeout, whichever comes first.
Definition: future-util.hh:1152
auto when_all_succeed(FutOrFuncs &&... fut_or_funcs)
Definition: future-util.hh:1313
Definition: future-util.hh:939
Definition: timer.hh:36
compat::optional< T > optional_type
Type used by AsyncAction while looping.
Definition: future-util.hh:324
typename repeat_until_value_type_helper< std::result_of_t< AsyncAction()> >::future_type repeat_until_value_return_type
Return value of repeat_until_value()
Definition: future-util.hh:334
Definition: future-util.hh:913
STL namespace.
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept
future keep_doing(AsyncAction action)
Definition: future-util.hh:538
Definition: future-util.hh:1105
Definition: alloc_failure_injector.hh:111
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:138
future repeat(AsyncAction action)
Definition: future-util.hh:277
static type from_tuple(value_type &&value)
Convert the tuple representation into a future.
Converts a type to a future type, if it isn't already.
Definition: future.hh:588
T value_type
The type of the value we are computing.
Definition: future-util.hh:322
void forward_to(promise< T... > &&pr) noexcept
Satisfy some promise object with this future as a result.
Definition: future.hh:1043
Result then_wrapped(Func &&func) noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:998
bool available() const noexcept
Checks whether the future is available.
Definition: future.hh:907
Definition: future-util.hh:1133
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:75
Definition: future-util.hh:187
static type convert(T &&value)
Convert a value or a future to a future.
Definition: future.hh:610
future do_for_each(Iterator begin, Iterator end, AsyncAction action)
Definition: future-util.hh:563
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action)
Definition: future-util.hh:402
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:937
future< T... > make_exception_future(std::exception_ptr value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1279
void tuple_for_each(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:156
Definition: shared_ptr.hh:62
bool failed() noexcept
Checks whether the future has failed.
Definition: future.hh:915
Definition: task.hh:29
Type-safe boolean.
Definition: bool_class.hh:57
auto with_scheduling_group(scheduling_group sg, Func func, Args &&... args)
run a callable (with some arbitrary arguments) in a scheduling group
Definition: future-util.hh:76
Definition: future.hh:75
future do_until(StopCondition stop_cond, AsyncAction action)
Definition: future-util.hh:500
future parallel_for_each(Iterator begin, Iterator end, Func &&func)
Definition: future-util.hh:126
Definition: future-util.hh:1126
auto when_all(FutOrFuncs &&... fut_or_funcs)
Definition: future-util.hh:825
Seastar API namespace.
Definition: abort_source.hh:34
void ignore_ready_future() noexcept
Ignore any result held by this future.
Definition: future.hh:1203