Seastar
High performance C++ framework for concurrent servers
loop.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <cassert>
27#include <cstddef>
28#include <iterator>
29#include <memory>
30#include <optional>
31#include <type_traits>
32#include <vector>
33#endif
34#include <seastar/core/future.hh>
35#include <seastar/core/task.hh>
36#include <seastar/util/bool_class.hh>
37#include <seastar/util/modules.hh>
38#include <seastar/core/semaphore.hh>
39
40namespace seastar {
41
42SEASTAR_MODULE_EXPORT_BEGIN
43
46
47// The AsyncAction concept represents an action which can complete later than
48// the actual function invocation. It is represented by a function which
49// returns a future which resolves when the action is done.
50
53
54namespace internal {
55
56template <typename AsyncAction>
57class repeater final : public continuation_base<stop_iteration> {
58 promise<> _promise;
59 AsyncAction _action;
60public:
61 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
62 future<> get_future() { return _promise.get_future(); }
63 task* waiting_task() noexcept override { return _promise.waiting_task(); }
64 virtual void run_and_dispose() noexcept override {
65 if (_state.failed()) {
66 _promise.set_exception(std::move(_state).get_exception());
67 delete this;
68 return;
69 } else {
70 if (_state.get() == stop_iteration::yes) {
71 _promise.set_value();
72 delete this;
73 return;
74 }
75 _state = {};
76 }
77 try {
78 do {
79 auto f = futurize_invoke(_action);
80 if (!f.available()) {
81 internal::set_callback(std::move(f), this);
82 return;
83 }
84 if (f.get() == stop_iteration::yes) {
85 _promise.set_value();
86 delete this;
87 return;
88 }
89 } while (!need_preempt());
90 } catch (...) {
91 _promise.set_exception(std::current_exception());
92 delete this;
93 return;
94 }
95 _state.set(stop_iteration::no);
96 schedule(this);
97 }
98};
99
100} // namespace internal
101
102// Delete these overloads so that the actual implementation can use a
103// universal reference but still reject lvalue references.
104template<typename AsyncAction>
105future<> repeat(const AsyncAction& action) noexcept = delete;
106template<typename AsyncAction>
107future<> repeat(AsyncAction& action) noexcept = delete;
108
118template<typename AsyncAction>
119requires std::is_invocable_r_v<stop_iteration, AsyncAction> || std::is_invocable_r_v<future<stop_iteration>, AsyncAction>
120inline
121future<> repeat(AsyncAction&& action) noexcept {
123 static_assert(std::is_same_v<future<stop_iteration>, typename futurator::type>, "bad AsyncAction signature");
124 for (;;) {
125 // Do not type-erase here in case this is a short repeat()
126 auto f = futurator::invoke(action);
127
128 if (!f.available() || f.failed() || need_preempt()) {
129 return [&] () noexcept {
131 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
132 auto ret = repeater->get_future();
133 internal::set_callback(std::move(f), repeater);
134 return ret;
135 }();
136 }
137
138 if (f.get() == stop_iteration::yes) {
139 return make_ready_future<>();
140 }
141 }
142}
143
145
146template <typename T>
147struct repeat_until_value_type_helper;
148
150template <typename T>
151struct repeat_until_value_type_helper<future<std::optional<T>>> {
153 using value_type = T;
155 using optional_type = std::optional<T>;
157 using future_type = future<value_type>;
158};
159
161template <typename AsyncAction>
162using repeat_until_value_return_type
163 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
164
166
167namespace internal {
168
169template <typename AsyncAction, typename T>
170class repeat_until_value_state final : public continuation_base<std::optional<T>> {
171 promise<T> _promise;
172 AsyncAction _action;
173public:
174 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
175 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
176 this->_state.set(std::move(st));
177 }
178 future<T> get_future() { return _promise.get_future(); }
179 task* waiting_task() noexcept override { return _promise.waiting_task(); }
180 virtual void run_and_dispose() noexcept override {
181 if (this->_state.failed()) {
182 _promise.set_exception(std::move(this->_state).get_exception());
183 delete this;
184 return;
185 } else {
186 auto v = std::move(this->_state).get();
187 if (v) {
188 _promise.set_value(std::move(*v));
189 delete this;
190 return;
191 }
192 this->_state = {};
193 }
194 try {
195 do {
196 auto f = futurize_invoke(_action);
197 if (!f.available()) {
198 internal::set_callback(std::move(f), this);
199 return;
200 }
201 auto ret = f.get();
202 if (ret) {
203 _promise.set_value(std::move(*ret));
204 delete this;
205 return;
206 }
207 } while (!need_preempt());
208 } catch (...) {
209 _promise.set_exception(std::current_exception());
210 delete this;
211 return;
212 }
213 this->_state.set(std::nullopt);
214 schedule(this);
215 }
216};
217
218} // namespace internal
219
231template<typename AsyncAction>
232requires requires (AsyncAction aa) {
233 bool(futurize_invoke(aa).get());
234 futurize_invoke(aa).get().value();
235}
236repeat_until_value_return_type<AsyncAction>
237repeat_until_value(AsyncAction action) noexcept {
239 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
240 // the "T" in the documentation
241 using value_type = typename type_helper::value_type;
242 using optional_type = typename type_helper::optional_type;
243 do {
244 auto f = futurator::invoke(action);
245
246 if (!f.available()) {
247 return [&] () noexcept {
249 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
250 auto ret = state->get_future();
251 internal::set_callback(std::move(f), state);
252 return ret;
253 }();
254 }
255
256 if (f.failed()) {
257 return make_exception_future<value_type>(f.get_exception());
258 }
259
260 optional_type&& optional = std::move(f).get();
261 if (optional) {
262 return make_ready_future<value_type>(std::move(optional.value()));
263 }
264 } while (!need_preempt());
265
266 try {
267 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
268 auto f = state->get_future();
269 schedule(state);
270 return f;
271 } catch (...) {
272 return make_exception_future<value_type>(std::current_exception());
273 }
274}
275
276namespace internal {
277
278template <typename StopCondition, typename AsyncAction>
279class do_until_state final : public continuation_base<> {
280 promise<> _promise;
281 StopCondition _stop;
282 AsyncAction _action;
283public:
284 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
285 future<> get_future() { return _promise.get_future(); }
286 task* waiting_task() noexcept override { return _promise.waiting_task(); }
287 virtual void run_and_dispose() noexcept override {
288 if (_state.available()) {
289 if (_state.failed()) {
290 _promise.set_urgent_state(std::move(_state));
291 delete this;
292 return;
293 }
294 _state = {}; // allow next cycle to overrun state
295 }
296 try {
297 do {
298 if (_stop()) {
299 _promise.set_value();
300 delete this;
301 return;
302 }
303 auto f = _action();
304 if (!f.available()) {
305 internal::set_callback(std::move(f), this);
306 return;
307 }
308 if (f.failed()) {
309 f.forward_to(std::move(_promise));
310 delete this;
311 return;
312 }
313 } while (!need_preempt());
314 } catch (...) {
315 _promise.set_exception(std::current_exception());
316 delete this;
317 return;
318 }
319 schedule(this);
320 }
321};
322
323} // namespace internal
324
330// in the returned future.
336template<typename AsyncAction, typename StopCondition>
337requires std::is_invocable_r_v<bool, StopCondition> && std::is_invocable_r_v<future<>, AsyncAction>
338inline
339future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
340 using namespace internal;
341 for (;;) {
342 try {
343 if (stop_cond()) {
344 return make_ready_future<>();
345 }
346 } catch (...) {
348 }
349 auto f = futurize_invoke(action);
350 if (f.failed()) {
351 return f;
352 }
353 if (!f.available() || need_preempt()) {
354 return [&] () noexcept {
356 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
357 auto ret = task->get_future();
358 internal::set_callback(std::move(f), task);
359 return ret;
360 }();
361 }
362 }
363}
364
372template<typename AsyncAction>
373requires std::is_invocable_r_v<future<>, AsyncAction>
374inline
375future<> keep_doing(AsyncAction action) noexcept {
376 return repeat([action = std::move(action)] () mutable {
377 return action().then([] {
378 return stop_iteration::no;
379 });
380 });
381}
382
383namespace internal {
384template <typename Iterator, typename AsyncAction>
385class do_for_each_state final : public continuation_base<> {
386 Iterator _begin;
387 Iterator _end;
388 AsyncAction _action;
389 promise<> _pr;
390
391public:
392 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
393 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
394 internal::set_callback(std::move(first_unavailable), this);
395 }
396 virtual void run_and_dispose() noexcept override {
397 std::unique_ptr<do_for_each_state> zis(this);
398 if (_state.failed()) {
399 _pr.set_urgent_state(std::move(_state));
400 return;
401 }
402 while (_begin != _end) {
403 auto f = futurize_invoke(_action, *_begin++);
404 if (f.failed()) {
405 f.forward_to(std::move(_pr));
406 return;
407 }
408 if (!f.available() || need_preempt()) {
409 _state = {};
410 internal::set_callback(std::move(f), this);
411 zis.release();
412 return;
413 }
414 }
415 _pr.set_value();
416 }
417 task* waiting_task() noexcept override {
418 return _pr.waiting_task();
419 }
420 future<> get_future() {
421 return _pr.get_future();
422 }
423};
424
425template<typename Iterator, typename AsyncAction>
426inline
427future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
428 while (begin != end) {
429 auto f = futurize_invoke(action, *begin++);
430 if (f.failed()) {
431 return f;
432 }
433 if (!f.available() || need_preempt()) {
434 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
435 std::move(begin), std::move(end), std::move(action), std::move(f)};
436 return s->get_future();
437 }
438 }
439 return make_ready_future<>();
440}
441} // namespace internal
442
444
457template<typename Iterator, typename AsyncAction>
458requires requires (Iterator i, AsyncAction aa) {
459 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
460}
461inline
462future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
463 try {
464 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
465 } catch (...) {
467 }
468}
469
481template<typename Container, typename AsyncAction>
482requires requires (Container c, AsyncAction aa) {
483 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
484 std::end(c);
485}
486inline
487future<> do_for_each(Container& c, AsyncAction action) noexcept {
488 try {
489 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
490 } catch (...) {
492 }
493}
494
495namespace internal {
496
497template <typename T, typename = void>
498struct has_iterator_category : std::false_type {};
499
500template <typename T>
501struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
502
503template <typename Iterator, typename Sentinel, typename IteratorCategory>
504inline
505size_t
506iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, IteratorCategory) {
507 // May be linear time below random_access_iterator_tag, but still better than reallocation
508 if constexpr (std::is_base_of_v<std::forward_iterator_tag, IteratorCategory>) {
509 return std::distance(begin, end);
510 }
511
512 // For InputIterators we can't estimate needed capacity
513 return 0;
514}
515
516} // namespace internal
517
519
520class parallel_for_each_state final : private continuation_base<> {
521 std::vector<future<>> _incomplete;
522 promise<> _result;
523 std::exception_ptr _ex;
524private:
525 // Wait for one of the futures in _incomplete to complete, and then
526 // decide what to do: wait for another one, or deliver _result if all
527 // are complete.
528 void wait_for_one() noexcept;
529 virtual void run_and_dispose() noexcept override;
530 task* waiting_task() noexcept override { return _result.waiting_task(); }
531public:
532 parallel_for_each_state(size_t n);
533 void add_future(future<>&& f);
534 future<> get_future();
535};
536
538
558template <typename Iterator, typename Sentinel, typename Func>
559requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>))
560// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
561// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
562// break legacy code, for which it holds that Sentinel equals Iterator.
563inline
564future<>
565parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
566 parallel_for_each_state* s = nullptr;
567 // Process all elements, giving each future the following treatment:
568 // - available, not failed: do nothing
569 // - available, failed: collect exception in ex
570 // - not available: collect in s (allocating it if needed)
571 while (begin != end) {
572 auto f = futurize_invoke(std::forward<Func>(func), *begin);
573 ++begin;
575 if (!f.available() || f.failed()) {
576 if (!s) {
577 using itraits = std::iterator_traits<Iterator>;
578 size_t n{0U};
579 if constexpr (internal::has_iterator_category<Iterator>::value) {
580 // We need if-constexpr here because there exist iterators for which std::iterator_traits
581 // does not have 'iterator_category' as member type
582 n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1);
583 }
584 s = new parallel_for_each_state(n);
585 }
586 s->add_future(std::move(f));
587 }
588 }
589 // If any futures were not available, hand off to parallel_for_each_state::start().
590 // Otherwise we can return a result immediately.
591 if (s) {
592 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
593 // so this isn't a leak
594 return s->get_future();
595 }
596 return make_ready_future<>();
597}
598
618
619namespace internal {
620
621template <typename Range, typename Func>
622inline
623future<>
624parallel_for_each_impl(Range&& range, Func&& func) {
625 return parallel_for_each(std::begin(range), std::end(range),
626 std::forward<Func>(func));
627}
628
629} // namespace internal
630
631template <typename Range, typename Func>
632requires requires (Func f, Range r) {
633 { f(*std::begin(r)) } -> std::same_as<future<>>;
634 std::end(r);
635}
636inline
637future<>
638parallel_for_each(Range&& range, Func&& func) noexcept {
639 auto impl = internal::parallel_for_each_impl<Range, Func>;
640 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
641}
642
664template <typename Iterator, typename Sentinel, typename Func>
665requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) )
666// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
667// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
668// break legacy code, for which it holds that Sentinel equals Iterator.
669inline
670future<>
671max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
672 struct state {
673 Iterator begin;
674 Sentinel end;
675 Func func;
676 size_t max_concurrent;
677 semaphore sem;
678 std::exception_ptr err;
679
680 state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
681 : begin(std::move(begin_))
682 , end(std::move(end_))
683 , func(std::move(func_))
684 , max_concurrent(max_concurrent_)
685 , sem(max_concurrent_)
686 , err()
687 { }
688 };
689
690 assert(max_concurrent > 0);
691
692 try {
693 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
694 return do_until([&s] { return s.begin == s.end; }, [&s] {
695 return s.sem.wait().then([&s] () mutable noexcept {
696 // Possibly run in background and signal _sem when the task is done.
697 // The background tasks are waited on using _sem.
698 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
699 if (fut.failed()) {
700 auto e = fut.get_exception();;
701 if (!s.err) {
702 s.err = std::move(e);
703 }
704 }
705 s.sem.signal();
706 });
707 ++s.begin;
708 });
709 }).then([&s] {
710 // Wait for any background task to finish
711 // and signal and semaphore
712 return s.sem.wait(s.max_concurrent);
713 }).then([&s] {
714 if (!s.err) {
715 return make_ready_future<>();
716 }
717 return seastar::make_exception_future<>(std::move(s.err));
718 });
719 });
720 } catch (...) {
722 }
723}
724
745template <typename Range, typename Func>
746requires requires (Func f, Range r) {
747 { f(*std::begin(r)) } -> std::same_as<future<>>;
748 std::end(r);
749}
750inline
751future<>
752max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
753 try {
754 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
755 } catch (...) {
757 }
758}
759
761
762SEASTAR_MODULE_EXPORT_END
763
764} // namespace seastar
Type-safe boolean.
Definition: bool_class.hh:58
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:990
Definition: task.hh:34
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
future max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:671
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:339
future keep_doing(AsyncAction action) noexcept
Definition: loop.hh:375
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept
Call a function for each item in a range, sequentially (iterator version).
Definition: loop.hh:462
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
future max_concurrent_for_each(Range &&range, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:752
Definition: loop.hh:51
holds the implementation parts of the metrics layer, do not use directly.
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.
Converts a type to a future type, if it isn't already.
Definition: future.hh:1853