Seastar
High performance C++ framework for concurrent servers
future-util.hh
Go to the documentation of this file.
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 
25 #pragma once
26 
27 #include <seastar/core/task.hh>
28 #include <seastar/core/future.hh>
29 #include <seastar/core/shared_ptr.hh>
30 #include <seastar/core/do_with.hh>
31 #include <seastar/core/timer.hh>
32 #include <seastar/util/bool_class.hh>
33 #include <tuple>
34 #include <iterator>
35 #include <vector>
36 #include <seastar/util/std-compat.hh>
37 #include <seastar/util/tuple_utils.hh>
38 #include <seastar/util/noncopyable_function.hh>
39 
40 namespace seastar {
41 
43 extern __thread size_t task_quota;
45 
46 
48 namespace internal {
49 
50 template <typename Func>
51 void
52 schedule_in_group(scheduling_group sg, Func func) {
53  schedule(make_task(sg, std::move(func)));
54 }
55 
56 
57 }
59 
62 
73 template <typename Func, typename... Args>
74 inline
75 auto
76 with_scheduling_group(scheduling_group sg, Func func, Args&&... args) {
77  using return_type = decltype(func(std::forward<Args>(args)...));
78  using futurator = futurize<return_type>;
79  if (sg.active()) {
80  return futurator::apply(func, std::forward<Args>(args)...);
81  } else {
82  typename futurator::promise_type pr;
83  auto f = pr.get_future();
84  internal::schedule_in_group(sg, [pr = std::move(pr), func = std::move(func), args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
85  return futurator::apply(func, std::move(args)).forward_to(std::move(pr));
86  });
87  return f;
88  }
89 }
90 
92 
93 struct parallel_for_each_state {
94  // use optional<> to avoid out-of-line constructor
95  compat::optional<std::exception_ptr> ex;
96  promise<> pr;
97  ~parallel_for_each_state() {
98  if (ex) {
99  pr.set_exception(std::move(*ex));
100  } else {
101  pr.set_value();
102  }
103  }
104 };
105 
107 
122 template <typename Iterator, typename Func>
123 GCC6_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> future<>; } )
124 inline
125 future<>
126 parallel_for_each(Iterator begin, Iterator end, Func&& func) {
128  while (begin != end) {
129  auto f = futurize_apply(std::forward<Func>(func), *begin++);
130  if (__builtin_expect(!f.available() || f.failed(), false)) {
131  if (!state) {
132  if (begin == end) {
133  // Only the last element was not immediately ready (likely if
134  // there is exactly one element)
135  return f;
136  }
137  [&state] () noexcept {
139  state = make_lw_shared<parallel_for_each_state>();
140  }();
141  }
142  // Moving fiber to the background.
143  (void)f.then_wrapped([state] (future<> f) {
144  if (f.failed()) {
145  // We can only store one exception. For more, use when_all().
146  if (!state->ex) {
147  state->ex = f.get_exception();
148  } else {
149  f.ignore_ready_future();
150  }
151  }
152  });
153  }
154  }
155  if (__builtin_expect(bool(state), false)) {
156  return state->pr.get_future();
157  }
158  return make_ready_future<>();
159 }
160 
175 template <typename Range, typename Func>
176 GCC6_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> future<>; } )
177 inline
178 future<>
179 parallel_for_each(Range&& range, Func&& func) {
180  return parallel_for_each(std::begin(range), std::end(range),
181  std::forward<Func>(func));
182 }
183 
184 // The AsyncAction concept represents an action which can complete later than
185 // the actual function invocation. It is represented by a function which
186 // returns a future which resolves when the action is done.
187 
190 
192 
193 namespace internal {
194 
195 template <typename AsyncAction>
196 class repeater final : public continuation_base<stop_iteration> {
197  promise<> _promise;
198  AsyncAction _action;
199 public:
200  explicit repeater(AsyncAction action) : _action(std::move(action)) {}
201  repeater(stop_iteration si, AsyncAction action) : repeater(std::move(action)) {
202  _state.set(std::make_tuple(si));
203  }
204  future<> get_future() { return _promise.get_future(); }
205  virtual void run_and_dispose() noexcept override {
206  std::unique_ptr<repeater> zis{this};
207  if (_state.failed()) {
208  _promise.set_exception(std::move(_state).get_exception());
209  return;
210  } else {
211  if (std::get<0>(_state.get()) == stop_iteration::yes) {
212  _promise.set_value();
213  return;
214  }
215  _state = {};
216  }
217  try {
218  do {
219  auto f = _action();
220  if (!f.available()) {
221  internal::set_callback(f, std::move(zis));
222  return;
223  }
224  if (f.get0() == stop_iteration::yes) {
225  _promise.set_value();
226  return;
227  }
228  } while (!need_preempt());
229  } catch (...) {
230  _promise.set_exception(std::current_exception());
231  return;
232  }
233  _state.set(stop_iteration::no);
234  schedule(std::move(zis));
235  }
236 };
237 
238 template <typename AsyncAction, bool ReturnsFuture = true>
239 struct futurized_action_helper {
240  using type = AsyncAction;
241 };
242 
243 template <typename AsyncAction>
244 struct futurized_action_helper<AsyncAction, false> {
245  struct wrapper {
246  AsyncAction action;
247  using orig_ret = std::result_of_t<AsyncAction()>;
248  explicit wrapper(AsyncAction&& action) : action(std::move(action)) {}
249  futurize_t<orig_ret> operator()() {
250  return futurize<orig_ret>::convert(action());
251  };
252  };
253  using type = wrapper;
254 };
255 
256 template <typename AsyncAction>
257 struct futurized_action {
258  using type = typename futurized_action_helper<AsyncAction, is_future<std::result_of_t<AsyncAction()>>::value>::type;
259 };
260 
261 
262 }
263 
265 
275 template<typename AsyncAction>
276 GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, stop_iteration> || seastar::ApplyReturns<AsyncAction, future<stop_iteration>> )
277 inline
278 future<> repeat(AsyncAction action) {
279  using futurator = futurize<std::result_of_t<AsyncAction()>>;
280  static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
281  using futurized_action_type = typename internal::futurized_action<AsyncAction>::type;
282  auto futurized_action = futurized_action_type(std::move(action));
283  try {
284  do {
285  // Do not type-erase here in case this is a short repeat()
286  auto f = futurized_action();
287 
288  if (!f.available()) {
289  return [&] () noexcept {
291  auto repeater = std::make_unique<internal::repeater<futurized_action_type>>(std::move(futurized_action));
292  auto ret = repeater->get_future();
293  internal::set_callback(f, std::move(repeater));
294  return ret;
295  }();
296  }
297 
298  if (f.get0() == stop_iteration::yes) {
299  return make_ready_future<>();
300  }
301  } while (!need_preempt());
302 
303  auto repeater = std::make_unique<internal::repeater<futurized_action_type>>(stop_iteration::no, std::move(futurized_action));
304  auto ret = repeater->get_future();
305  schedule(std::move(repeater));
306  return ret;
307  } catch (...) {
308  return make_exception_future(std::current_exception());
309  }
310 }
311 
313 
314 template <typename T>
315 struct repeat_until_value_type_helper;
316 
318 
320 template <typename T>
321 struct repeat_until_value_type_helper<future<compat::optional<T>>> {
323  using value_type = T;
325  using optional_type = compat::optional<T>;
330 };
331 
333 template <typename AsyncAction>
335  = typename repeat_until_value_type_helper<std::result_of_t<AsyncAction()>>::future_type;
336 
337 namespace internal {
338 
339 template <typename AsyncAction, typename T>
340 class repeat_until_value_state final : public continuation_base<compat::optional<T>> {
341  promise<T> _promise;
342  AsyncAction _action;
343 public:
344  explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
345  repeat_until_value_state(compat::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
346  this->_state.set(std::make_tuple(std::move(st)));
347  }
348  future<T> get_future() { return _promise.get_future(); }
349  virtual void run_and_dispose() noexcept override {
350  std::unique_ptr<repeat_until_value_state> zis{this};
351  if (this->_state.failed()) {
352  _promise.set_exception(std::move(this->_state).get_exception());
353  return;
354  } else {
355  auto v = std::get<0>(std::move(this->_state).get());
356  if (v) {
357  _promise.set_value(std::move(*v));
358  return;
359  }
360  this->_state = {};
361  }
362  try {
363  do {
364  auto f = _action();
365  if (!f.available()) {
366  internal::set_callback(f, std::move(zis));
367  return;
368  }
369  auto ret = f.get0();
370  if (ret) {
371  _promise.set_value(std::make_tuple(std::move(*ret)));
372  return;
373  }
374  } while (!need_preempt());
375  } catch (...) {
376  _promise.set_exception(std::current_exception());
377  return;
378  }
379  this->_state.set(compat::nullopt);
380  schedule(std::move(zis));
381  }
382 };
383 
384 }
385 
396 template<typename AsyncAction>
397 GCC6_CONCEPT( requires requires (AsyncAction aa) {
398  requires is_future<decltype(aa())>::value;
399  bool(aa().get0());
400  aa().get0().value();
401 } )
402 repeat_until_value_return_type<AsyncAction>
403 repeat_until_value(AsyncAction action) {
404  using type_helper = repeat_until_value_type_helper<std::result_of_t<AsyncAction()>>;
405  // the "T" in the documentation
406  using value_type = typename type_helper::value_type;
407  using optional_type = typename type_helper::optional_type;
408  using futurized_action_type = typename internal::futurized_action<AsyncAction>::type;
409  auto futurized_action = futurized_action_type(std::move(action));
410  do {
411  auto f = futurized_action();
412 
413  if (!f.available()) {
414  return [&] () noexcept {
416  auto state = std::make_unique<internal::repeat_until_value_state<futurized_action_type, value_type>>(std::move(futurized_action));
417  auto ret = state->get_future();
418  internal::set_callback(f, std::move(state));
419  return ret;
420  }();
421  }
422 
423  if (f.failed()) {
424  return make_exception_future<value_type>(f.get_exception());
425  }
426 
427  optional_type&& optional = std::move(f).get0();
428  if (optional) {
429  return make_ready_future<value_type>(std::move(optional.value()));
430  }
431  } while (!need_preempt());
432 
433  try {
434  auto state = std::make_unique<internal::repeat_until_value_state<futurized_action_type, value_type>>(compat::nullopt, std::move(futurized_action));
435  auto f = state->get_future();
436  schedule(std::move(state));
437  return f;
438  } catch (...) {
439  return make_exception_future<value_type>(std::current_exception());
440  }
441 }
442 
443 namespace internal {
444 
445 template <typename StopCondition, typename AsyncAction>
446 class do_until_state final : public continuation_base<> {
447  promise<> _promise;
448  StopCondition _stop;
449  AsyncAction _action;
450 public:
451  explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
452  future<> get_future() { return _promise.get_future(); }
453  virtual void run_and_dispose() noexcept override {
454  std::unique_ptr<do_until_state> zis{this};
455  if (_state.available()) {
456  if (_state.failed()) {
457  _promise.set_urgent_state(std::move(_state));
458  return;
459  }
460  _state = {}; // allow next cycle to overrun state
461  }
462  try {
463  do {
464  if (_stop()) {
465  _promise.set_value();
466  return;
467  }
468  auto f = _action();
469  if (!f.available()) {
470  internal::set_callback(f, std::move(zis));
471  return;
472  }
473  if (f.failed()) {
474  f.forward_to(std::move(_promise));
475  return;
476  }
477  } while (!need_preempt());
478  } catch (...) {
479  _promise.set_exception(std::current_exception());
480  return;
481  }
482  schedule(std::move(zis));
483  }
484 };
485 
486 }
487 
498 template<typename AsyncAction, typename StopCondition>
499 GCC6_CONCEPT( requires seastar::ApplyReturns<StopCondition, bool> && seastar::ApplyReturns<AsyncAction, future<>> )
500 inline
501 future<> do_until(StopCondition stop_cond, AsyncAction action) {
502  using namespace internal;
503  using futurator = futurize<void>;
504  do {
505  if (stop_cond()) {
506  return make_ready_future<>();
507  }
508  auto f = futurator::apply(action);
509  if (!f.available()) {
510  return [&] () noexcept {
512  auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
513  auto ret = task->get_future();
514  internal::set_callback(f, std::move(task));
515  return ret;
516  }();
517  }
518  if (f.failed()) {
519  return f;
520  }
521  } while (!need_preempt());
522 
523  auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
524  auto f = task->get_future();
525  schedule(std::move(task));
526  return f;
527 }
528 
536 template<typename AsyncAction>
537 GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, future<>> )
538 inline
539 future<> keep_doing(AsyncAction action) {
540  return repeat([action = std::move(action)] () mutable {
541  return action().then([] {
542  return stop_iteration::no;
543  });
544  });
545 }
546 
559 template<typename Iterator, typename AsyncAction>
560 GCC6_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
561  { futurize_apply(aa, *i) } -> future<>;
562 } )
563 inline
564 future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) {
565  if (begin == end) {
566  return make_ready_future<>();
567  }
568  while (true) {
569  auto f = futurize<void>::apply(action, *begin);
570  ++begin;
571  if (begin == end) {
572  return f;
573  }
574  if (!f.available() || need_preempt()) {
575  return std::move(f).then([action = std::move(action),
576  begin = std::move(begin), end = std::move(end)] () mutable {
577  return do_for_each(std::move(begin), std::move(end), std::move(action));
578  });
579  }
580  if (f.failed()) {
581  return f;
582  }
583  }
584 }
585 
597 template<typename Container, typename AsyncAction>
598 GCC6_CONCEPT( requires requires (Container c, AsyncAction aa) {
599  { futurize_apply(aa, *c.begin()) } -> future<>
600 } )
601 inline
602 future<> do_for_each(Container& c, AsyncAction action) {
603  return do_for_each(std::begin(c), std::end(c), std::move(action));
604 }
605 
607 namespace internal {
608 
609 template<typename... Futures>
610 struct identity_futures_tuple {
611  using future_type = future<std::tuple<Futures...>>;
612  using promise_type = typename future_type::promise_type;
613 
614  static void set_promise(promise_type& p, std::tuple<Futures...> futures) {
615  p.set_value(std::move(futures));
616  }
617 
618  static future_type make_ready_future(std::tuple<Futures...> futures) {
619  return futurize<future_type>::from_tuple(std::move(futures));
620  }
621 };
622 
623 // Given a future type, find the continuation_base corresponding to that future
624 template <typename Future>
625 struct continuation_base_for_future;
626 
627 template <typename... T>
628 struct continuation_base_for_future<future<T...>> {
629  using type = continuation_base<T...>;
630 };
631 
632 template <typename Future>
633 using continuation_base_for_future_t = typename continuation_base_for_future<Future>::type;
634 
635 class when_all_state_base;
636 
637 // If the future is ready, return true
638 // if the future is not ready, chain a continuation to it, and return false
639 using when_all_process_element_func = bool (*)(void* future, void* continuation, when_all_state_base* wasb);
640 
641 struct when_all_process_element {
642  when_all_process_element_func func;
643  void* future;
644 };
645 
646 class when_all_state_base {
647  size_t _nr_remain;
648  const when_all_process_element* _processors;
649  void* _continuation;
650 public:
651  virtual ~when_all_state_base() {}
652  when_all_state_base(size_t nr_remain, const when_all_process_element* processors, void* continuation)
653  : _nr_remain(nr_remain), _processors(processors), _continuation(continuation) {
654  }
655  void complete_one() {
656  // We complete in reverse order; if the futures happen to complete
657  // in order, then waiting for the last one will find the rest ready
658  --_nr_remain;
659  while (_nr_remain) {
660  bool ready = process_one(_nr_remain - 1);
661  if (!ready) {
662  return;
663  }
664  --_nr_remain;
665  }
666  if (!_nr_remain) {
667  delete this;
668  }
669  }
670  void do_wait_all() {
671  ++_nr_remain; // fake pending completion for complete_one()
672  complete_one();
673  }
674  bool process_one(size_t idx) {
675  auto p = _processors[idx];
676  return p.func(p.future, _continuation, this);
677  }
678 };
679 
680 template <typename Future>
681 class when_all_state_component : public continuation_base_for_future_t<Future> {
682  when_all_state_base* _base;
683  Future* _final_resting_place;
684 public:
685  static bool process_element_func(void* future, void* continuation, when_all_state_base* wasb) {
686  auto f = reinterpret_cast<Future*>(future);
687  if (f->available()) {
688  return true;
689  } else {
690  auto c = new (continuation) when_all_state_component(wasb, f);
691  set_callback(*f, std::unique_ptr<when_all_state_component>(c));
692  return false;
693  }
694  }
695  when_all_state_component(when_all_state_base *base, Future* future) : _base(base), _final_resting_place(future) {}
696  virtual void run_and_dispose() noexcept override {
697  using futurator = futurize<Future>;
698  if (__builtin_expect(this->_state.failed(), false)) {
699  *_final_resting_place = futurator::make_exception_future(std::move(this->_state).get_exception());
700  } else {
701  *_final_resting_place = futurator::from_tuple(std::move(this->_state).get_value());
702  }
703  auto base = _base;
704  this->~when_all_state_component();
705  base->complete_one();
706  }
707 };
708 
709 #if __cpp_fold_expressions >= 201603
710 // This optimization requires C++17
711 # define SEASTAR__WAIT_ALL__AVOID_ALLOCATION_WHEN_ALL_READY
712 #endif
713 
714 template<typename ResolvedTupleTransform, typename... Futures>
715 class when_all_state : public when_all_state_base {
716  static constexpr size_t nr = sizeof...(Futures);
717  using type = std::tuple<Futures...>;
718  type tuple;
719  // We only schedule one continuation at a time, and store it in _cont.
720  // This way, if while the future we wait for completes, some other futures
721  // also complete, we won't need to schedule continuations for them.
722  std::aligned_union_t<1, when_all_state_component<Futures>...> _cont;
723  when_all_process_element _processors[nr];
724 public:
725  typename ResolvedTupleTransform::promise_type p;
726  when_all_state(Futures&&... t) : when_all_state_base(nr, _processors, &_cont), tuple(std::make_tuple(std::move(t)...)) {
727  init_element_processors(std::make_index_sequence<nr>());
728  }
729  virtual ~when_all_state() {
730  ResolvedTupleTransform::set_promise(p, std::move(tuple));
731  }
732 private:
733  template <size_t... Idx>
734  void init_element_processors(std::index_sequence<Idx...>) {
735  auto ignore = {
736  0,
737  (_processors[Idx] = when_all_process_element{
738  when_all_state_component<std::tuple_element_t<Idx, type>>::process_element_func,
739  &std::get<Idx>(tuple)
740  }, 0)...
741  };
742  (void)ignore;
743  }
744 public:
745  static typename ResolvedTupleTransform::future_type wait_all(Futures&&... futures) {
746 #ifdef SEASTAR__WAIT_ALL__AVOID_ALLOCATION_WHEN_ALL_READY
747  if ((futures.available() && ...)) {
748  return ResolvedTupleTransform::make_ready_future(std::make_tuple(std::move(futures)...));
749  }
750 #endif
751  auto state = [&] () noexcept {
752  memory::disable_failure_guard dfg;
753  return new when_all_state(std::move(futures)...);
754  }();
755  auto ret = state->p.get_future();
756  state->do_wait_all();
757  return ret;
758  }
759 };
760 
761 }
763 
764 GCC6_CONCEPT(
765 
767 namespace impl {
768 
769 
770 // Want: folds
771 
772 template <typename T>
773 struct is_tuple_of_futures : std::false_type {
774 };
775 
776 template <>
777 struct is_tuple_of_futures<std::tuple<>> : std::true_type {
778 };
779 
780 template <typename... T, typename... Rest>
781 struct is_tuple_of_futures<std::tuple<future<T...>, Rest...>> : is_tuple_of_futures<std::tuple<Rest...>> {
782 };
783 
784 }
786 
787 template <typename... Futs>
788 concept bool AllAreFutures = impl::is_tuple_of_futures<std::tuple<Futs...>>::value;
789 
790 )
791 
792 template<typename Fut, std::enable_if_t<is_future<Fut>::value, int> = 0>
793 auto futurize_apply_if_func(Fut&& fut) {
794  return std::forward<Fut>(fut);
795 }
796 
797 template<typename Func, std::enable_if_t<!is_future<Func>::value, int> = 0>
798 auto futurize_apply_if_func(Func&& func) {
799  return futurize_apply(std::forward<Func>(func));
800 }
801 
802 template <typename... Futs>
803 GCC6_CONCEPT( requires seastar::AllAreFutures<Futs...> )
804 inline
805 future<std::tuple<Futs...>>
806 when_all_impl(Futs&&... futs) {
807  namespace si = internal;
808  using state = si::when_all_state<si::identity_futures_tuple<Futs...>, Futs...>;
809  return state::wait_all(std::forward<Futs>(futs)...);
810 }
811 
825 template <typename... FutOrFuncs>
826 inline auto when_all(FutOrFuncs&&... fut_or_funcs) {
827  return when_all_impl(futurize_apply_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
828 }
829 
831 namespace internal {
832 
833 template <typename Iterator, typename IteratorCategory>
834 inline
835 size_t
836 when_all_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
837  // For InputIterators we can't estimate needed capacity
838  return 0;
839 }
840 
841 template <typename Iterator>
842 inline
843 size_t
844 when_all_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) {
845  // May be linear time below random_access_iterator_tag, but still better than reallocation
846  return std::distance(begin, end);
847 }
848 
849 template<typename Future>
850 struct identity_futures_vector {
851  using future_type = future<std::vector<Future>>;
852  static future_type run(std::vector<Future> futures) {
853  return make_ready_future<std::vector<Future>>(std::move(futures));
854  }
855 };
856 
857 // Internal function for when_all().
858 template <typename ResolvedVectorTransform, typename Future>
859 inline
860 typename ResolvedVectorTransform::future_type
861 complete_when_all(std::vector<Future>&& futures, typename std::vector<Future>::iterator pos) {
862  // If any futures are already ready, skip them.
863  while (pos != futures.end() && pos->available()) {
864  ++pos;
865  }
866  // Done?
867  if (pos == futures.end()) {
868  return ResolvedVectorTransform::run(std::move(futures));
869  }
870  // Wait for unready future, store, and continue.
871  return pos->then_wrapped([futures = std::move(futures), pos] (auto fut) mutable {
872  *pos++ = std::move(fut);
873  return complete_when_all<ResolvedVectorTransform>(std::move(futures), pos);
874  });
875 }
876 
877 template<typename ResolvedVectorTransform, typename FutureIterator>
878 inline auto
879 do_when_all(FutureIterator begin, FutureIterator end) {
880  using itraits = std::iterator_traits<FutureIterator>;
881  std::vector<typename itraits::value_type> ret;
882  ret.reserve(when_all_estimate_vector_capacity(begin, end, typename itraits::iterator_category()));
883  // Important to invoke the *begin here, in case it's a function iterator,
884  // so we launch all computation in parallel.
885  std::move(begin, end, std::back_inserter(ret));
886  return complete_when_all<ResolvedVectorTransform>(std::move(ret), ret.begin());
887 }
888 
889 }
891 
902 template <typename FutureIterator>
903 GCC6_CONCEPT( requires requires (FutureIterator i) { { *i++ }; requires is_future<std::remove_reference_t<decltype(*i)>>::value; } )
904 inline
905 future<std::vector<typename std::iterator_traits<FutureIterator>::value_type>>
906 when_all(FutureIterator begin, FutureIterator end) {
907  namespace si = internal;
908  using itraits = std::iterator_traits<FutureIterator>;
909  using result_transform = si::identity_futures_vector<typename itraits::value_type>;
910  return si::do_when_all<result_transform>(std::move(begin), std::move(end));
911 }
912 
913 template <typename T, bool IsFuture>
915 
916 template <typename T>
917 struct reducer_with_get_traits<T, false> {
918  using result_type = decltype(std::declval<T>().get());
920  static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
921  return f.then([r = std::move(r)] () mutable {
922  return make_ready_future<result_type>(std::move(*r).get());
923  });
924  }
925 };
926 
927 template <typename T>
928 struct reducer_with_get_traits<T, true> {
929  using future_type = decltype(std::declval<T>().get());
930  static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
931  return f.then([r = std::move(r)] () mutable {
932  return r->get();
933  }).then_wrapped([r] (future_type f) {
934  return f;
935  });
936  }
937 };
938 
939 template <typename T, typename V = void>
941  using future_type = future<>;
942  static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
943  return f.then([r = std::move(r)] {});
944  }
945 };
946 
947 template <typename T>
948 struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, is_future<std::result_of_t<decltype(&T::get)(T)>>::value> {};
949 
950 // @Mapper is a callable which transforms values from the iterator range
951 // into a future<T>. @Reducer is an object which can be called with T as
952 // parameter and yields a future<>. It may have a get() method which returns
953 // a value of type U which holds the result of reduction. This value is wrapped
954 // in a future and returned by this function. If the reducer has no get() method
955 // then this function returns future<>.
956 //
957 // TODO: specialize for non-deferring reducer
958 template <typename Iterator, typename Mapper, typename Reducer>
959 inline
960 auto
961 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
963 {
964  auto r_ptr = make_lw_shared(std::forward<Reducer>(r));
965  future<> ret = make_ready_future<>();
966  using futurator = futurize<decltype(mapper(*begin))>;
967  while (begin != end) {
968  ret = futurator::apply(mapper, *begin++).then_wrapped([ret = std::move(ret), r_ptr] (auto f) mutable {
969  return ret.then_wrapped([f = std::move(f), r_ptr] (auto rf) mutable {
970  if (rf.failed()) {
971  f.ignore_ready_future();
972  return std::move(rf);
973  } else {
974  return futurize<void>::apply(*r_ptr, std::move(f.get()));
975  }
976  });
977  });
978  }
979  return reducer_traits<Reducer>::maybe_call_get(std::move(ret), r_ptr);
980 }
981 
1017 template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
1018 GCC6_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
1019  *i++;
1020  { i != i} -> bool;
1021  mapper(*i);
1022  requires is_future<decltype(mapper(*i))>::value;
1023  { reduce(std::move(initial), mapper(*i).get0()) } -> Initial;
1024 } )
1025 inline
1026 future<Initial>
1027 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
1028  struct state {
1029  Initial result;
1030  Reduce reduce;
1031  };
1032  auto s = make_lw_shared(state{std::move(initial), std::move(reduce)});
1033  future<> ret = make_ready_future<>();
1034  using futurator = futurize<decltype(mapper(*begin))>;
1035  while (begin != end) {
1036  ret = futurator::apply(mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
1037  try {
1038  s->result = s->reduce(std::move(s->result), std::move(f.get0()));
1039  return std::move(ret);
1040  } catch (...) {
1041  return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
1042  f.ignore_ready_future();
1043  return make_exception_future<>(ex);
1044  });
1045  }
1046  });
1047  }
1048  return ret.then([s] {
1049  return make_ready_future<Initial>(std::move(s->result));
1050  });
1051 }
1052 
1088 template <typename Range, typename Mapper, typename Initial, typename Reduce>
1089 GCC6_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
1090  std::begin(range);
1091  std::end(range);
1092  mapper(*std::begin(range));
1093  requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
1094  { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> Initial;
1095 } )
1096 inline
1097 future<Initial>
1098 map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
1099  return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
1100  std::move(initial), std::move(reduce));
1101 }
1102 
1103 // Implements @Reducer concept. Calculates the result by
1104 // adding elements to the accumulator.
1105 template <typename Result, typename Addend = Result>
1106 class adder {
1107 private:
1108  Result _result;
1109 public:
1110  future<> operator()(const Addend& value) {
1111  _result += value;
1112  return make_ready_future<>();
1113  }
1114  Result get() && {
1115  return std::move(_result);
1116  }
1117 };
1118 
1119 inline
1120 future<> now() {
1121  return make_ready_future<>();
1122 }
1123 
1124 // Returns a future which is not ready but is scheduled to resolve soon.
1125 future<> later();
1126 
1127 class timed_out_error : public std::exception {
1128 public:
1129  virtual const char* what() const noexcept {
1130  return "timedout";
1131  }
1132 };
1133 
1135  static auto timeout() {
1136  return timed_out_error();
1137  }
1138 };
1139 
1152 template<typename ExceptionFactory = default_timeout_exception_factory, typename Clock, typename Duration, typename... T>
1153 future<T...> with_timeout(std::chrono::time_point<Clock, Duration> timeout, future<T...> f) {
1154  if (f.available()) {
1155  return f;
1156  }
1157  auto pr = std::make_unique<promise<T...>>();
1158  auto result = pr->get_future();
1159  timer<Clock> timer([&pr = *pr] {
1160  pr.set_exception(std::make_exception_ptr(ExceptionFactory::timeout()));
1161  });
1162  timer.arm(timeout);
1163  // Future is returned indirectly.
1164  (void)f.then_wrapped([pr = std::move(pr), timer = std::move(timer)] (auto&& f) mutable {
1165  if (timer.cancel()) {
1166  f.forward_to(std::move(*pr));
1167  } else {
1168  f.ignore_ready_future();
1169  }
1170  });
1171  return result;
1172 }
1173 
1174 namespace internal {
1175 
1176 template<typename Future>
1177 struct future_has_value {
1178  enum {
1179  value = !std::is_same<std::decay_t<Future>, future<>>::value
1180  };
1181 };
1182 
1183 template<typename Tuple>
1184 struct tuple_to_future;
1185 
1186 template<typename... Elements>
1187 struct tuple_to_future<std::tuple<Elements...>> {
1188  using type = future<Elements...>;
1189  using promise_type = promise<Elements...>;
1190 
1191  static auto make_ready(std::tuple<Elements...> t) {
1192  auto create_future = [] (auto&&... args) {
1193  return make_ready_future<Elements...>(std::move(args)...);
1194  };
1195  return apply(create_future, std::move(t));
1196  }
1197 
1198  static auto make_failed(std::exception_ptr excp) {
1199  return make_exception_future<Elements...>(std::move(excp));
1200  }
1201 };
1202 
1203 template<typename... Futures>
1204 class extract_values_from_futures_tuple {
1205  static auto transform(std::tuple<Futures...> futures) {
1206  auto prepare_result = [] (auto futures) {
1207  auto fs = tuple_filter_by_type<internal::future_has_value>(std::move(futures));
1208  return tuple_map(std::move(fs), [] (auto&& e) {
1209  return internal::untuple(e.get());
1210  });
1211  };
1212 
1213  using tuple_futurizer = internal::tuple_to_future<decltype(prepare_result(std::move(futures)))>;
1214 
1215  std::exception_ptr excp;
1216  tuple_for_each(futures, [&excp] (auto& f) {
1217  if (!excp) {
1218  if (f.failed()) {
1219  excp = f.get_exception();
1220  }
1221  } else {
1222  f.ignore_ready_future();
1223  }
1224  });
1225  if (excp) {
1226  return tuple_futurizer::make_failed(std::move(excp));
1227  }
1228 
1229  return tuple_futurizer::make_ready(prepare_result(std::move(futures)));
1230  }
1231 public:
1232  using future_type = decltype(transform(std::declval<std::tuple<Futures...>>()));
1233  using promise_type = typename future_type::promise_type;
1234 
1235  static void set_promise(promise_type& p, std::tuple<Futures...> tuple) {
1236  transform(std::move(tuple)).forward_to(std::move(p));
1237  }
1238 
1239  static future_type make_ready_future(std::tuple<Futures...> tuple) {
1240  return transform(std::move(tuple));
1241  }
1242 };
1243 
1244 template<typename Future>
1245 struct extract_values_from_futures_vector {
1246  using value_type = decltype(untuple(std::declval<typename Future::value_type>()));
1247 
1248  using future_type = future<std::vector<value_type>>;
1249 
1250  static future_type run(std::vector<Future> futures) {
1251  std::vector<value_type> values;
1252  values.reserve(futures.size());
1253 
1254  std::exception_ptr excp;
1255  for (auto&& f : futures) {
1256  if (!excp) {
1257  if (f.failed()) {
1258  excp = f.get_exception();
1259  } else {
1260  values.emplace_back(untuple(f.get()));
1261  }
1262  } else {
1263  f.ignore_ready_future();
1264  }
1265  }
1266  if (excp) {
1267  return make_exception_future<std::vector<value_type>>(std::move(excp));
1268  }
1269  return make_ready_future<std::vector<value_type>>(std::move(values));
1270  }
1271 };
1272 
1273 template<>
1274 struct extract_values_from_futures_vector<future<>> {
1275  using future_type = future<>;
1276 
1277  static future_type run(std::vector<future<>> futures) {
1278  std::exception_ptr excp;
1279  for (auto&& f : futures) {
1280  if (!excp) {
1281  if (f.failed()) {
1282  excp = f.get_exception();
1283  }
1284  } else {
1285  f.ignore_ready_future();
1286  }
1287  }
1288  if (excp) {
1289  return make_exception_future<>(std::move(excp));
1290  }
1291  return make_ready_future<>();
1292  }
1293 };
1294 
1295 }
1296 
1297 template<typename... Futures>
1298 GCC6_CONCEPT( requires seastar::AllAreFutures<Futures...> )
1299 inline auto when_all_succeed_impl(Futures&&... futures) {
1300  using state = internal::when_all_state<internal::extract_values_from_futures_tuple<Futures...>, Futures...>;
1301  return state::wait_all(std::forward<Futures>(futures)...);
1302 }
1303 
1314 template <typename... FutOrFuncs>
1315 inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) {
1316  return when_all_succeed_impl(futurize_apply_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
1317 }
1318 
1329 template <typename FutureIterator, typename = typename std::iterator_traits<FutureIterator>::value_type>
1330 GCC6_CONCEPT( requires requires (FutureIterator i) {
1331  *i++;
1332  { i != i } -> bool;
1333  requires is_future<std::remove_reference_t<decltype(*i)>>::value;
1334 } )
1335 inline auto
1336 when_all_succeed(FutureIterator begin, FutureIterator end) {
1337  using itraits = std::iterator_traits<FutureIterator>;
1338  using result_transform = internal::extract_values_from_futures_vector<typename itraits::value_type>;
1339  return internal::do_when_all<result_transform>(std::move(begin), std::move(end));
1340 }
1341 
1342 }
1343 
future< T... > make_ready_future(A &&... value)
Creates a future in an available, value state.
Definition: future.hh:1302
A representation of a possibly not-yet-computed value.
Definition: future.hh:79
future< T... > with_timeout(std::chrono::time_point< Clock, Duration > timeout, future< T... > f)
Wait for either a future, or a timeout, whichever comes first.
Definition: future-util.hh:1153
future< Initial > map_reduce(Range &&range, Mapper &&mapper, Initial initial, Reduce reduce)
Definition: future-util.hh:1098
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:513
Definition: future-util.hh:940
Definition: timer.hh:36
compat::optional< T > optional_type
Type used by AsyncAction while looping.
Definition: future-util.hh:325
typename repeat_until_value_type_helper< std::result_of_t< AsyncAction()> >::future_type repeat_until_value_return_type
Return value of repeat_until_value()
Definition: future-util.hh:335
Definition: future-util.hh:914
STL namespace.
future keep_doing(AsyncAction action)
Definition: future-util.hh:539
Definition: future-util.hh:1106
Definition: alloc_failure_injector.hh:111
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:138
future do_for_each(Container &c, AsyncAction action)
Definition: future-util.hh:602
future repeat(AsyncAction action)
Definition: future-util.hh:278
Converts a type to a future type, if it isn&#39;t already.
Definition: future.hh:592
T value_type
The type of the value we are computing.
Definition: future-util.hh:323
Result then_wrapped(Func &&func) noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1027
future parallel_for_each(Range &&range, Func &&func)
Definition: future-util.hh:179
bool available() const noexcept
Checks whether the future is available.
Definition: future.hh:936
auto when_all_succeed(FutureIterator begin, FutureIterator end)
Definition: future-util.hh:1336
Definition: future-util.hh:1134
void set_value(A &&... a)
Sets the promises value.
Definition: future.hh:502
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:87
Definition: future-util.hh:188
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action)
Definition: future-util.hh:403
future< std::vector< typename std::iterator_traits< FutureIterator >::value_type > > when_all(FutureIterator begin, FutureIterator end)
Definition: future-util.hh:906
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:966
future< T... > make_exception_future(std::exception_ptr value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1308
void tuple_for_each(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:156
future< T... > get_future() noexcept
Gets the promise&#39;s associated future.
Definition: future.hh:1272
Definition: shared_ptr.hh:62
Definition: task.hh:29
Type-safe boolean.
Definition: bool_class.hh:57
auto with_scheduling_group(scheduling_group sg, Func func, Args &&... args)
run a callable (with some arbitrary arguments) in a scheduling group
Definition: future-util.hh:76
future do_until(StopCondition stop_cond, AsyncAction action)
Definition: future-util.hh:501
future parallel_for_each(Iterator begin, Iterator end, Func &&func)
Definition: future-util.hh:126
Definition: future-util.hh:1127
Seastar API namespace.
Definition: abort_on_ebadf.hh:24