Seastar
High performance C++ framework for concurrent servers
map_reduce.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2020 ScyllaDB.
21  */
22 
23 #pragma once
24 
25 #ifndef SEASTAR_MODULE
26 #include <iterator>
27 
28 #include <seastar/core/future.hh>
29 #include <seastar/core/shared_ptr.hh>
30 #include <seastar/util/modules.hh>
31 #endif
32 
33 namespace seastar {
34 
35 SEASTAR_MODULE_EXPORT_BEGIN
36 
39 
41 
42 template <typename T, typename Ptr, bool IsFuture>
43 struct reducer_with_get_traits;
44 
45 template <typename T, typename Ptr>
46 struct reducer_with_get_traits<T, Ptr, false> {
47  using result_type = decltype(std::declval<T>().get());
48  using future_type = future<result_type>;
49  static future_type maybe_call_get(future<> f, Ptr r) {
50  return f.then([r = std::move(r)] () mutable {
51  return make_ready_future<result_type>(std::move(r->reducer).get());
52  });
53  }
54 };
55 
56 template <typename T, typename Ptr>
57 struct reducer_with_get_traits<T, Ptr, true> {
58  using future_type = decltype(std::declval<T>().get());
59  static future_type maybe_call_get(future<> f, Ptr r) {
60  return f.then([r = r.get()] {
61  return r->reducer.get();
62  }).then_wrapped([r] (future_type f) {
63  return f;
64  });
65  }
66 };
67 
68 template <typename T, typename Ptr = lw_shared_ptr<T>, typename V = void>
69 struct reducer_traits {
70  using future_type = future<>;
71  static future_type maybe_call_get(future<> f, Ptr r) {
72  return f.then([r = std::move(r)] {});
73  }
74 };
75 
76 template <typename T, typename Ptr>
77 struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
78 
80 
96 
97 // TODO: specialize for non-deferring reducer
98 template <typename Iterator, typename Mapper, typename Reducer>
99 SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Reducer reduce) {
100  *i++;
101  { i != i } -> std::convertible_to<bool>;
102  mapper(*i);
103  reduce(futurize_invoke(mapper, *i).get0());
104 } )
105 inline
106 auto
107 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
108  -> typename reducer_traits<Reducer>::future_type
109 {
110  struct state {
111  Mapper mapper;
112  Reducer reducer;
113  };
114  auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
115  future<> ret = make_ready_future<>();
116  while (begin != end) {
117  ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (auto f) mutable {
118  return ret.then_wrapped([f = std::move(f), s] (auto rf) mutable {
119  if (rf.failed()) {
120  f.ignore_ready_future();
121  return rf;
122  } else {
123  return futurize_invoke(s->reducer, std::move(f.get0()));
124  }
125  });
126  });
127  }
128  return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
129 }
130 
173 template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
174 SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
175  *i++;
176  { i != i} -> std::convertible_to<bool>;
177  mapper(*i);
178  requires is_future<decltype(mapper(*i))>::value;
179  { reduce(std::move(initial), mapper(*i).get0()) } -> std::convertible_to<Initial>;
180 } )
181 inline
182 future<Initial>
183 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
184  struct state {
185  Mapper mapper;
186  Initial result;
187  Reduce reduce;
188  };
189  auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
190  future<> ret = make_ready_future<>();
191  while (begin != end) {
192  ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
193  try {
194  s->result = s->reduce(std::move(s->result), f.get0());
195  return std::move(ret);
196  } catch (...) {
197  return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
199  return make_exception_future<>(ex);
200  });
201  }
202  });
203  }
204  return ret.then([s] {
205  return make_ready_future<Initial>(std::move(s->result));
206  });
207 }
208 
251 template <typename Range, typename Mapper, typename Initial, typename Reduce>
252 SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
253  std::begin(range);
254  std::end(range);
255  mapper(*std::begin(range));
256  requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
257  { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> std::convertible_to<Initial>;
258 } )
259 inline
260 future<Initial>
261 map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
262  return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
263  std::move(initial), std::move(reduce));
264 }
265 
268 template <typename Result, typename Addend = Result>
269 class adder {
270 private:
271  Result _result;
272 public:
273  future<> operator()(const Addend& value) {
274  _result += value;
275  return make_ready_future<>();
276  }
277  Result get() && {
278  return std::move(_result);
279  }
280 };
281 
283 
284 SEASTAR_MODULE_EXPORT_END
285 
286 } // namespace seastar
Definition: map_reduce.hh:269
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
void ignore_ready_future() noexcept
Ignore any result hold by this future.
Definition: future.hh:1755
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future< Initial > map_reduce(Range &&range, Mapper &&mapper, Initial initial, Reduce reduce)
Definition: map_reduce.hh:261
Seastar API namespace.
Definition: abort_on_ebadf.hh:26