25 #ifndef SEASTAR_MODULE
28 #include <seastar/core/future.hh>
29 #include <seastar/core/shared_ptr.hh>
30 #include <seastar/util/modules.hh>
35 SEASTAR_MODULE_EXPORT_BEGIN
42 template <
typename T,
typename Ptr,
bool IsFuture>
43 struct reducer_with_get_traits;
45 template <
typename T,
typename Ptr>
46 struct reducer_with_get_traits<T, Ptr, false> {
47 using result_type = decltype(std::declval<T>().get());
48 using future_type = future<result_type>;
49 static future_type maybe_call_get(
future<> f, Ptr r) {
50 return f.
then([r = std::move(r)] ()
mutable {
51 return make_ready_future<result_type>(std::move(r->reducer).get());
56 template <
typename T,
typename Ptr>
57 struct reducer_with_get_traits<T, Ptr, true> {
58 using future_type = decltype(std::declval<T>().get());
59 static future_type maybe_call_get(
future<> f, Ptr r) {
60 return f.
then([r = r.get()] {
61 return r->reducer.get();
62 }).then_wrapped([r] (future_type f) {
68 template <
typename T,
typename Ptr = lw_shared_ptr<T>,
typename V =
void>
69 struct reducer_traits {
71 static future_type maybe_call_get(
future<> f, Ptr r) {
72 return f.
then([r = std::move(r)] {});
76 template <
typename T,
typename Ptr>
77 struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> :
public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
98 template <
typename Iterator,
typename Mapper,
typename Reducer>
99 SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Reducer reduce) {
101 { i != i } -> std::convertible_to<bool>;
103 reduce(futurize_invoke(mapper, *i).get0());
107 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
108 ->
typename reducer_traits<Reducer>::future_type
114 auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
115 future<> ret = make_ready_future<>();
116 while (begin != end) {
117 ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (
auto f)
mutable {
118 return ret.then_wrapped([f = std::move(f), s] (
auto rf)
mutable {
120 f.ignore_ready_future();
123 return futurize_invoke(s->reducer, std::move(f.get0()));
128 return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
173 template <
typename Iterator,
typename Mapper,
typename Initial,
typename Reduce>
174 SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
176 { i != i} -> std::convertible_to<bool>;
178 requires is_future<decltype(mapper(*i))>::value;
179 { reduce(std::move(initial), mapper(*i).get0()) } -> std::convertible_to<Initial>;
183 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
189 auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
190 future<> ret = make_ready_future<>();
191 while (begin != end) {
192 ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (
auto f)
mutable {
194 s->result = s->reduce(std::move(s->result), f.get0());
195 return std::move(ret);
197 return std::move(ret).then_wrapped([ex = std::current_exception()] (
auto f) {
199 return make_exception_future<>(ex);
204 return ret.then([s] {
205 return make_ready_future<Initial>(std::move(s->result));
251 template <
typename Range,
typename Mapper,
typename Initial,
typename Reduce>
252 SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
255 mapper(*std::begin(range));
256 requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
257 { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> std::convertible_to<Initial>;
261 map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
262 return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
263 std::move(initial), std::move(reduce));
268 template <
typename Result,
typename Addend = Result>
273 future<> operator()(
const Addend& value) {
275 return make_ready_future<>();
278 return std::move(_result);
284 SEASTAR_MODULE_EXPORT_END
Definition: map_reduce.hh:269
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
void ignore_ready_future() noexcept
Ignore any result hold by this future.
Definition: future.hh:1755
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future< Initial > map_reduce(Range &&range, Mapper &&mapper, Initial initial, Reduce reduce)
Definition: map_reduce.hh:261
Seastar API namespace.
Definition: abort_on_ebadf.hh:26