Seastar
High performance C++ framework for concurrent servers
map_reduce.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <iterator>
27
28#include <seastar/core/future.hh>
29#include <seastar/core/shared_ptr.hh>
30#include <seastar/util/modules.hh>
31#endif
32
33namespace seastar {
34
35SEASTAR_MODULE_EXPORT_BEGIN
36
39
41
42template <typename T, typename Ptr, bool IsFuture>
43struct reducer_with_get_traits;
44
45template <typename T, typename Ptr>
46struct reducer_with_get_traits<T, Ptr, false> {
47 using result_type = decltype(std::declval<T>().get());
48 using future_type = future<result_type>;
49 static future_type maybe_call_get(future<> f, Ptr r) {
50 return f.then([r = std::move(r)] () mutable {
51 return make_ready_future<result_type>(std::move(r->reducer).get());
52 });
53 }
54};
55
56template <typename T, typename Ptr>
57struct reducer_with_get_traits<T, Ptr, true> {
58 using future_type = decltype(std::declval<T>().get());
59 static future_type maybe_call_get(future<> f, Ptr r) {
60 return f.then([r = r.get()] {
61 return r->reducer.get();
62 }).then_wrapped([r] (future_type f) {
63 return f;
64 });
65 }
66};
67
68template <typename T, typename Ptr = lw_shared_ptr<T>, typename V = void>
69struct reducer_traits {
70 using future_type = future<>;
71 static future_type maybe_call_get(future<> f, Ptr r) {
72 return f.then([r = std::move(r)] {});
73 }
74};
75
76template <typename T, typename Ptr>
77struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
78
80
96
97// TODO: specialize for non-deferring reducer
98template <typename Iterator, typename Mapper, typename Reducer>
99requires requires (Iterator i, Mapper mapper, Reducer reduce) {
100 *i++;
101 { i != i } -> std::convertible_to<bool>;
102 mapper(*i);
103 reduce(futurize_invoke(mapper, *i).get());
104}
105inline
106auto
107map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
108 -> typename reducer_traits<Reducer>::future_type
109{
110 struct state {
111 Mapper mapper;
112 Reducer reducer;
113 };
114 auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
115 future<> ret = make_ready_future<>();
116 while (begin != end) {
117 ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (auto f) mutable {
118 return ret.then_wrapped([f = std::move(f), s] (auto rf) mutable {
119 if (rf.failed()) {
120 f.ignore_ready_future();
121 return rf;
122 } else {
123 return futurize_invoke(s->reducer, std::move(f.get()));
124 }
125 });
126 });
127 }
128 return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
129}
130
173template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
174requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
175 *i++;
176 { i != i} -> std::convertible_to<bool>;
177 mapper(*i);
178 requires is_future<decltype(mapper(*i))>::value;
179 { reduce(std::move(initial), mapper(*i).get()) } -> std::convertible_to<Initial>;
180}
181inline
182future<Initial>
183map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
184 struct state {
185 Mapper mapper;
186 Initial result;
187 Reduce reduce;
188 };
189 auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
190 future<> ret = make_ready_future<>();
191 while (begin != end) {
192 ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
193 try {
194 s->result = s->reduce(std::move(s->result), f.get());
195 return std::move(ret);
196 } catch (...) {
197 return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
198 f.ignore_ready_future();
199 return make_exception_future<>(ex);
200 });
201 }
202 });
203 }
204 return ret.then([s] {
205 return make_ready_future<Initial>(std::move(s->result));
206 });
207}
208
251template <typename Range, typename Mapper, typename Initial, typename Reduce>
252requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
253 std::begin(range);
254 std::end(range);
255 mapper(*std::begin(range));
256 requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
257 { reduce(std::move(initial), mapper(*std::begin(range)).get()) } -> std::convertible_to<Initial>;
258}
259inline
260future<Initial>
261map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
262 return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
263 std::move(initial), std::move(reduce));
264}
265
268template <typename Result, typename Addend = Result>
269class adder {
270private:
271 Result _result;
272public:
273 future<> operator()(const Addend& value) {
274 _result += value;
275 return make_ready_future<>();
276 }
277 Result get() && {
278 return std::move(_result);
279 }
280};
281
283
284SEASTAR_MODULE_EXPORT_END
285
286} // namespace seastar
Definition: map_reduce.hh:269
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future< Initial > map_reduce(Range &&range, Mapper &&mapper, Initial initial, Reduce reduce)
Definition: map_reduce.hh:261
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.