Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
loop.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <cassert>
27#include <cstddef>
28#include <iterator>
29#include <memory>
30#include <optional>
31#include <type_traits>
32#include <vector>
33#endif
34#include <seastar/core/future.hh>
35#include <seastar/core/task.hh>
36#include <seastar/util/bool_class.hh>
37#include <seastar/util/modules.hh>
38#include <seastar/core/semaphore.hh>
39
40namespace seastar {
41
42SEASTAR_MODULE_EXPORT_BEGIN
43
46
47// The AsyncAction concept represents an action which can complete later than
48// the actual function invocation. It is represented by a function which
49// returns a future which resolves when the action is done.
50
53
54namespace internal {
55
56template <typename AsyncAction>
57class repeater final : public continuation_base<stop_iteration> {
58 promise<> _promise;
59 AsyncAction _action;
60public:
61 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
62 future<> get_future() { return _promise.get_future(); }
63 task* waiting_task() noexcept override { return _promise.waiting_task(); }
64 virtual void run_and_dispose() noexcept override {
65 if (_state.failed()) {
66 _promise.set_exception(std::move(_state).get_exception());
67 delete this;
68 return;
69 } else {
70 if (_state.get() == stop_iteration::yes) {
71 _promise.set_value();
72 delete this;
73 return;
74 }
75 _state = {};
76 }
77 try {
78 do {
79 auto f = futurize_invoke(_action);
80 if (!f.available()) {
81 internal::set_callback(std::move(f), this);
82 return;
83 }
84 if (f.get() == stop_iteration::yes) {
85 _promise.set_value();
86 delete this;
87 return;
88 }
89 } while (!need_preempt());
90 } catch (...) {
91 _promise.set_exception(std::current_exception());
92 delete this;
93 return;
94 }
95 _state.set(stop_iteration::no);
96 schedule(this);
97 }
98};
99
100} // namespace internal
101
102// Delete these overloads so that the actual implementation can use a
103// universal reference but still reject lvalue references.
104template<typename AsyncAction>
105future<> repeat(const AsyncAction& action) noexcept = delete;
106template<typename AsyncAction>
107future<> repeat(AsyncAction& action) noexcept = delete;
108
118template<typename AsyncAction>
119requires std::is_invocable_r_v<stop_iteration, AsyncAction> || std::is_invocable_r_v<future<stop_iteration>, AsyncAction>
120inline
121future<> repeat(AsyncAction&& action) noexcept {
123 static_assert(std::is_same_v<future<stop_iteration>, typename futurator::type>, "bad AsyncAction signature");
124 for (;;) {
125 // Do not type-erase here in case this is a short repeat()
126 auto f = futurator::invoke(action);
127
128 if (!f.available() || f.failed() || need_preempt()) {
129 return [&] () noexcept {
131 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
132 auto ret = repeater->get_future();
133 internal::set_callback(std::move(f), repeater);
134 return ret;
135 }();
136 }
137
138 if (f.get() == stop_iteration::yes) {
139 return make_ready_future<>();
140 }
141 }
142}
143
145
146template <typename T>
147struct repeat_until_value_type_helper;
148
150template <typename T>
151struct repeat_until_value_type_helper<future<std::optional<T>>> {
153 using value_type = T;
155 using optional_type = std::optional<T>;
157 using future_type = future<value_type>;
158};
159
161template <typename AsyncAction>
162using repeat_until_value_return_type
163 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
164
166
167namespace internal {
168
169template <typename AsyncAction, typename T>
170class repeat_until_value_state final : public continuation_base<std::optional<T>> {
171 promise<T> _promise;
172 AsyncAction _action;
173public:
174 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
175 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
176 this->_state.set(std::move(st));
177 }
178 future<T> get_future() { return _promise.get_future(); }
179 task* waiting_task() noexcept override { return _promise.waiting_task(); }
180 virtual void run_and_dispose() noexcept override {
181 if (this->_state.failed()) {
182 _promise.set_exception(std::move(this->_state).get_exception());
183 delete this;
184 return;
185 } else {
186 auto v = std::move(this->_state).get();
187 if (v) {
188 _promise.set_value(std::move(*v));
189 delete this;
190 return;
191 }
192 this->_state = {};
193 }
194 try {
195 do {
196 auto f = futurize_invoke(_action);
197 if (!f.available()) {
198 internal::set_callback(std::move(f), this);
199 return;
200 }
201 auto ret = f.get();
202 if (ret) {
203 _promise.set_value(std::move(*ret));
204 delete this;
205 return;
206 }
207 } while (!need_preempt());
208 } catch (...) {
209 _promise.set_exception(std::current_exception());
210 delete this;
211 return;
212 }
213 this->_state.set(std::nullopt);
214 schedule(this);
215 }
216};
217
218} // namespace internal
219
231template<typename AsyncAction>
232requires requires (AsyncAction aa) {
233 bool(futurize_invoke(aa).get());
234 futurize_invoke(aa).get().value();
235}
236repeat_until_value_return_type<AsyncAction>
237repeat_until_value(AsyncAction action) noexcept {
239 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
240 // the "T" in the documentation
241 using value_type = typename type_helper::value_type;
242 using optional_type = typename type_helper::optional_type;
243 do {
244 auto f = futurator::invoke(action);
245
246 if (!f.available()) {
247 return [&] () noexcept {
249 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
250 auto ret = state->get_future();
251 internal::set_callback(std::move(f), state);
252 return ret;
253 }();
254 }
255
256 if (f.failed()) {
257 return make_exception_future<value_type>(f.get_exception());
258 }
259
260 optional_type&& optional = std::move(f).get();
261 if (optional) {
262 return make_ready_future<value_type>(std::move(optional.value()));
263 }
264 } while (!need_preempt());
265
266 try {
267 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
268 auto f = state->get_future();
269 schedule(state);
270 return f;
271 } catch (...) {
272 return make_exception_future<value_type>(std::current_exception());
273 }
274}
275
276namespace internal {
277
278template <typename StopCondition, typename AsyncAction>
279class do_until_state final : public continuation_base<> {
280 promise<> _promise;
281 StopCondition _stop;
282 AsyncAction _action;
283public:
284 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
285 future<> get_future() { return _promise.get_future(); }
286 task* waiting_task() noexcept override { return _promise.waiting_task(); }
287 virtual void run_and_dispose() noexcept override {
288 if (_state.available()) {
289 if (_state.failed()) {
290 _promise.set_urgent_state(std::move(_state));
291 delete this;
292 return;
293 }
294 _state = {}; // allow next cycle to overrun state
295 }
296 try {
297 do {
298 if (_stop()) {
299 _promise.set_value();
300 delete this;
301 return;
302 }
303 auto f = _action();
304 if (!f.available()) {
305 internal::set_callback(std::move(f), this);
306 return;
307 }
308 if (f.failed()) {
309 f.forward_to(std::move(_promise));
310 delete this;
311 return;
312 }
313 } while (!need_preempt());
314 } catch (...) {
315 _promise.set_exception(std::current_exception());
316 delete this;
317 return;
318 }
319 schedule(this);
320 }
321};
322
323} // namespace internal
324
330// in the returned future.
336template<typename AsyncAction, typename StopCondition>
337requires std::is_invocable_r_v<bool, StopCondition> && std::is_invocable_r_v<future<>, AsyncAction>
338inline
339future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
340 using namespace internal;
341 for (;;) {
342 try {
343 if (stop_cond()) {
344 return make_ready_future<>();
345 }
346 } catch (...) {
348 }
349 auto f = futurize_invoke(action);
350 if (f.failed()) {
351 return f;
352 }
353 if (!f.available() || need_preempt()) {
354 return [&] () noexcept {
356 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
357 auto ret = task->get_future();
358 internal::set_callback(std::move(f), task);
359 return ret;
360 }();
361 }
362 }
363}
364
372template<typename AsyncAction>
373requires std::is_invocable_r_v<future<>, AsyncAction>
374inline
375future<> keep_doing(AsyncAction action) noexcept {
376 return repeat([action = std::move(action)] () mutable {
377 return action().then([] {
378 return stop_iteration::no;
379 });
380 });
381}
382
383namespace internal {
384template <typename Iterator, class Sentinel, typename AsyncAction>
385class do_for_each_state final : public continuation_base<> {
386 Iterator _begin;
387 Sentinel _end;
388 AsyncAction _action;
389 promise<> _pr;
390
391public:
392 do_for_each_state(Iterator begin, Sentinel end, AsyncAction action, future<>&& first_unavailable)
393 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
394 internal::set_callback(std::move(first_unavailable), this);
395 }
396 virtual void run_and_dispose() noexcept override {
397 std::unique_ptr<do_for_each_state> zis(this);
398 if (_state.failed()) {
399 _pr.set_urgent_state(std::move(_state));
400 return;
401 }
402 while (_begin != _end) {
403 auto f = futurize_invoke(_action, *_begin++);
404 if (f.failed()) {
405 f.forward_to(std::move(_pr));
406 return;
407 }
408 if (!f.available() || need_preempt()) {
409 _state = {};
410 internal::set_callback(std::move(f), this);
411 zis.release();
412 return;
413 }
414 }
415 _pr.set_value();
416 }
417 task* waiting_task() noexcept override {
418 return _pr.waiting_task();
419 }
420 future<> get_future() {
421 return _pr.get_future();
422 }
423};
424
425template<typename Iterator, typename Sentinel, typename AsyncAction>
426inline
427future<> do_for_each_impl(Iterator begin, Sentinel end, AsyncAction action) {
428 while (begin != end) {
429 auto f = futurize_invoke(action, *begin++);
430 if (f.failed()) {
431 return f;
432 }
433 if (!f.available() || need_preempt()) {
434 auto* s = new internal::do_for_each_state<Iterator, Sentinel, AsyncAction>{
435 std::move(begin), std::move(end), std::move(action), std::move(f)};
436 return s->get_future();
437 }
438 }
439 return make_ready_future<>();
440}
441} // namespace internal
442
444
457template<typename Iterator, typename Sentinel, typename AsyncAction>
458requires (
459 requires (Iterator i, AsyncAction aa) {
460 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
461 } &&
462 (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
463)
464inline
465future<> do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept {
466 try {
467 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
468 } catch (...) {
470 }
471}
472
484template<typename Range, typename AsyncAction>
485requires requires (Range c, AsyncAction aa) {
486 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
487 std::end(c);
488}
489inline
490future<> do_for_each(Range& c, AsyncAction action) noexcept {
491 try {
492 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
493 } catch (...) {
495 }
496}
497
498namespace internal {
499
// Detects whether std::iterator_traits<T> exposes an `iterator_category`
// member type. The primary template answers "no".
template <typename T, typename = void>
struct has_iterator_category : std::bool_constant<false> {};

// Selected only when `std::iterator_traits<T>::iterator_category` names a
// type; answers "yes".
template <typename T>
struct has_iterator_category<
        T,
        std::void_t<typename std::iterator_traits<T>::iterator_category>
    > : std::bool_constant<true> {};
505
506template <typename Iterator, typename Sentinel>
507inline
508size_t
509iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end) {
510 if constexpr (std::forward_iterator<Iterator> &&
511 std::forward_iterator<Sentinel>) {
512 return std::ranges::distance(begin, end);
513 } else if constexpr (std::random_access_iterator<Iterator> &&
514 std::random_access_iterator<Sentinel>) {
515 return std::ranges::distance(begin, end);
516 } else {
517 // For InputIterators we can't estimate needed capacity
518 return 0;
519 }
520}
521
522} // namespace internal
523
525
526class parallel_for_each_state final : private continuation_base<> {
527 std::vector<future<>> _incomplete;
528 promise<> _result;
529 std::exception_ptr _ex;
530private:
531 // Wait for one of the futures in _incomplete to complete, and then
532 // decide what to do: wait for another one, or deliver _result if all
533 // are complete.
534 void wait_for_one() noexcept;
535 virtual void run_and_dispose() noexcept override;
536 task* waiting_task() noexcept override { return _result.waiting_task(); }
537public:
538 parallel_for_each_state(size_t n);
539 void add_future(future<>&& f);
540 future<> get_future();
541};
542
544
564template <typename Iterator, typename Sentinel, typename Func>
565requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>))
566// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
567// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
568// break legacy code, for which it holds that Sentinel equals Iterator.
569inline
570future<>
571parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
572 parallel_for_each_state* s = nullptr;
573 // Process all elements, giving each future the following treatment:
574 // - available, not failed: do nothing
575 // - available, failed: collect exception in ex
576 // - not available: collect in s (allocating it if needed)
577 while (begin != end) {
578 auto f = futurize_invoke(std::forward<Func>(func), *begin);
579 ++begin;
581 if (!f.available() || f.failed()) {
582 if (!s) {
583 size_t n{0U};
584 if constexpr (internal::has_iterator_category<Iterator>::value) {
585 // We need if-constexpr here because there exist iterators for which std::iterator_traits
586 // does not have 'iterator_category' as member type
587 n = (internal::iterator_range_estimate_vector_capacity(begin, end) + 1);
588 }
589 s = new parallel_for_each_state(n);
590 }
591 s->add_future(std::move(f));
592 }
593 }
594 // If any futures were not available, hand off to parallel_for_each_state::start().
595 // Otherwise we can return a result immediately.
596 if (s) {
597 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
598 // so this isn't a leak
599 return s->get_future();
600 }
601 return make_ready_future<>();
602}
603
623
624namespace internal {
625
626template <typename Range, typename Func>
627inline
628future<>
629parallel_for_each_impl(Range&& range, Func&& func) {
630 return parallel_for_each(std::begin(range), std::end(range),
631 std::forward<Func>(func));
632}
633
634} // namespace internal
635
636template <typename Range, typename Func>
637requires requires (Func f, Range r) {
638 { f(*std::begin(r)) } -> std::same_as<future<>>;
639 std::end(r);
640}
641inline
642future<>
643parallel_for_each(Range&& range, Func&& func) noexcept {
644 auto impl = internal::parallel_for_each_impl<Range, Func>;
645 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
646}
647
669template <typename Iterator, typename Sentinel, typename Func>
670requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) )
671// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
672// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
673// break legacy code, for which it holds that Sentinel equals Iterator.
674inline
675future<>
676max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
677 struct state {
678 Iterator begin;
679 Sentinel end;
680 Func func;
681 size_t max_concurrent;
682 semaphore sem;
683 std::exception_ptr err;
684
685 state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
686 : begin(std::move(begin_))
687 , end(std::move(end_))
688 , func(std::move(func_))
689 , max_concurrent(max_concurrent_)
690 , sem(max_concurrent_)
691 , err()
692 { }
693 };
694
695 assert(max_concurrent > 0);
696
697 try {
698 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
699 return do_until([&s] { return s.begin == s.end; }, [&s] {
700 return s.sem.wait().then([&s] () mutable noexcept {
701 // Possibly run in background and signal _sem when the task is done.
702 // The background tasks are waited on using _sem.
703 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
704 if (fut.failed()) {
705 auto e = fut.get_exception();;
706 if (!s.err) {
707 s.err = std::move(e);
708 }
709 }
710 s.sem.signal();
711 });
712 ++s.begin;
713 });
714 }).then([&s] {
715 // Wait for any background task to finish
716 // and signal and semaphore
717 return s.sem.wait(s.max_concurrent);
718 }).then([&s] {
719 if (!s.err) {
720 return make_ready_future<>();
721 }
722 return seastar::make_exception_future<>(std::move(s.err));
723 });
724 });
725 } catch (...) {
727 }
728}
729
750template <typename Range, typename Func>
751requires requires (Func f, Range r) {
752 { f(*std::begin(r)) } -> std::same_as<future<>>;
753 std::end(r);
754}
755inline
756future<>
757max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
758 try {
759 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
760 } catch (...) {
762 }
763}
764
766
767SEASTAR_MODULE_EXPORT_END
768
769} // namespace seastar
Type-safe boolean.
Definition: bool_class.hh:58
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:969
Definition: task.hh:34
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1905
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1941
future max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:676
future do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept
Call a function for each item in a range, sequentially (iterator version).
Definition: loop.hh:465
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:339
future keep_doing(AsyncAction action) noexcept
Definition: loop.hh:375
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:571
future max_concurrent_for_each(Range &&range, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:757
Definition: loop.hh:51
holds the implementation parts of the metrics layer, do not use directly.
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.
Converts a type to a future type, if it isn't already.
Definition: future.hh:1832