Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
loop.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <cassert>
27#include <cstddef>
28#include <iterator>
29#include <memory>
30#include <optional>
31#include <type_traits>
32#include <vector>
33#endif
34#include <seastar/core/future.hh>
35#include <seastar/core/task.hh>
36#include <seastar/util/assert.hh>
37#include <seastar/util/bool_class.hh>
38#include <seastar/util/modules.hh>
39#include <seastar/core/semaphore.hh>
40
41namespace seastar {
42
43SEASTAR_MODULE_EXPORT_BEGIN
44
47
48// The AsyncAction concept represents an action which can complete later than
49// the actual function invocation. It is represented by a function which
50// returns a future which resolves when the action is done.
51
54
55namespace internal {
56
57template <typename AsyncAction>
58class repeater final : public continuation_base<stop_iteration> {
59 promise<> _promise;
60 AsyncAction _action;
61public:
62 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
63 future<> get_future() { return _promise.get_future(); }
64 task* waiting_task() noexcept override { return _promise.waiting_task(); }
65 virtual void run_and_dispose() noexcept override {
66 if (_state.failed()) {
67 _promise.set_exception(std::move(_state).get_exception());
68 delete this;
69 return;
70 } else {
71 if (_state.get() == stop_iteration::yes) {
72 _promise.set_value();
73 delete this;
74 return;
75 }
76 _state = {};
77 }
78 try {
79 do {
80 auto f = futurize_invoke(_action);
81 if (!f.available()) {
82 internal::set_callback(std::move(f), this);
83 return;
84 }
85 if (f.get() == stop_iteration::yes) {
86 _promise.set_value();
87 delete this;
88 return;
89 }
90 } while (!need_preempt());
91 } catch (...) {
92 _promise.set_exception(std::current_exception());
93 delete this;
94 return;
95 }
96 _state.set(stop_iteration::no);
97 schedule(this);
98 }
99};
100
101} // namespace internal
102
103// Delete these overloads so that the actual implementation can use a
104// universal reference but still reject lvalue references.
105template<typename AsyncAction>
106future<> repeat(const AsyncAction& action) noexcept = delete;
107template<typename AsyncAction>
108future<> repeat(AsyncAction& action) noexcept = delete;
109
119template<typename AsyncAction>
120requires std::is_invocable_r_v<stop_iteration, AsyncAction> || std::is_invocable_r_v<future<stop_iteration>, AsyncAction>
121inline
122future<> repeat(AsyncAction&& action) noexcept {
124 static_assert(std::is_same_v<future<stop_iteration>, typename futurator::type>, "bad AsyncAction signature");
125 for (;;) {
126 // Do not type-erase here in case this is a short repeat()
127 auto f = futurator::invoke(action);
128
129 if (!f.available() || f.failed() || need_preempt()) {
130 return [&] () noexcept {
132 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
133 auto ret = repeater->get_future();
134 internal::set_callback(std::move(f), repeater);
135 return ret;
136 }();
137 }
138
139 if (f.get() == stop_iteration::yes) {
140 return make_ready_future<>();
141 }
142 }
143}
144
146
147template <typename T>
148struct repeat_until_value_type_helper;
149
151template <typename T>
152struct repeat_until_value_type_helper<future<std::optional<T>>> {
154 using value_type = T;
156 using optional_type = std::optional<T>;
158 using future_type = future<value_type>;
159};
160
162template <typename AsyncAction>
163using repeat_until_value_return_type
164 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
165
167
168namespace internal {
169
170template <typename AsyncAction, typename T>
171class repeat_until_value_state final : public continuation_base<std::optional<T>> {
172 promise<T> _promise;
173 AsyncAction _action;
174public:
175 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
176 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
177 this->_state.set(std::move(st));
178 }
179 future<T> get_future() { return _promise.get_future(); }
180 task* waiting_task() noexcept override { return _promise.waiting_task(); }
181 virtual void run_and_dispose() noexcept override {
182 if (this->_state.failed()) {
183 _promise.set_exception(std::move(this->_state).get_exception());
184 delete this;
185 return;
186 } else {
187 auto v = std::move(this->_state).get();
188 if (v) {
189 _promise.set_value(std::move(*v));
190 delete this;
191 return;
192 }
193 this->_state = {};
194 }
195 try {
196 do {
197 auto f = futurize_invoke(_action);
198 if (!f.available()) {
199 internal::set_callback(std::move(f), this);
200 return;
201 }
202 auto ret = f.get();
203 if (ret) {
204 _promise.set_value(std::move(*ret));
205 delete this;
206 return;
207 }
208 } while (!need_preempt());
209 } catch (...) {
210 _promise.set_exception(std::current_exception());
211 delete this;
212 return;
213 }
214 this->_state.set(std::nullopt);
215 schedule(this);
216 }
217};
218
219} // namespace internal
220
232template<typename AsyncAction>
233requires requires (AsyncAction aa) {
234 bool(futurize_invoke(aa).get());
235 futurize_invoke(aa).get().value();
236}
237repeat_until_value_return_type<AsyncAction>
238repeat_until_value(AsyncAction action) noexcept {
240 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
241 // the "T" in the documentation
242 using value_type = typename type_helper::value_type;
243 using optional_type = typename type_helper::optional_type;
244 do {
245 auto f = futurator::invoke(action);
246
247 if (!f.available()) {
248 return [&] () noexcept {
250 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
251 auto ret = state->get_future();
252 internal::set_callback(std::move(f), state);
253 return ret;
254 }();
255 }
256
257 if (f.failed()) {
258 return make_exception_future<value_type>(f.get_exception());
259 }
260
261 optional_type&& optional = std::move(f).get();
262 if (optional) {
263 return make_ready_future<value_type>(std::move(optional.value()));
264 }
265 } while (!need_preempt());
266
267 try {
268 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
269 auto f = state->get_future();
270 schedule(state);
271 return f;
272 } catch (...) {
273 return make_exception_future<value_type>(std::current_exception());
274 }
275}
276
277namespace internal {
278
279template <typename StopCondition, typename AsyncAction>
280class do_until_state final : public continuation_base<> {
281 promise<> _promise;
282 StopCondition _stop;
283 AsyncAction _action;
284public:
285 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
286 future<> get_future() { return _promise.get_future(); }
287 task* waiting_task() noexcept override { return _promise.waiting_task(); }
288 virtual void run_and_dispose() noexcept override {
289 if (_state.available()) {
290 if (_state.failed()) {
291 _promise.set_urgent_state(std::move(_state));
292 delete this;
293 return;
294 }
295 _state = {}; // allow next cycle to overrun state
296 }
297 try {
298 do {
299 if (_stop()) {
300 _promise.set_value();
301 delete this;
302 return;
303 }
304 auto f = _action();
305 if (!f.available()) {
306 internal::set_callback(std::move(f), this);
307 return;
308 }
309 if (f.failed()) {
310 f.forward_to(std::move(_promise));
311 delete this;
312 return;
313 }
314 } while (!need_preempt());
315 } catch (...) {
316 _promise.set_exception(std::current_exception());
317 delete this;
318 return;
319 }
320 schedule(this);
321 }
322};
323
324} // namespace internal
325
331// in the returned future.
337template<typename AsyncAction, typename StopCondition>
338requires std::is_invocable_r_v<bool, StopCondition> && std::is_invocable_r_v<future<>, AsyncAction>
339inline
340future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
341 using namespace internal;
342 for (;;) {
343 try {
344 if (stop_cond()) {
345 return make_ready_future<>();
346 }
347 } catch (...) {
349 }
350 auto f = futurize_invoke(action);
351 if (f.failed()) {
352 return f;
353 }
354 if (!f.available() || need_preempt()) {
355 return [&] () noexcept {
357 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
358 auto ret = task->get_future();
359 internal::set_callback(std::move(f), task);
360 return ret;
361 }();
362 }
363 }
364}
365
373template<typename AsyncAction>
374requires std::is_invocable_r_v<future<>, AsyncAction>
375inline
376future<> keep_doing(AsyncAction action) noexcept {
377 return repeat([action = std::move(action)] () mutable {
378 return action().then([] {
379 return stop_iteration::no;
380 });
381 });
382}
383
384namespace internal {
385template <typename Iterator, class Sentinel, typename AsyncAction>
386class do_for_each_state final : public continuation_base<> {
387 Iterator _begin;
388 Sentinel _end;
389 AsyncAction _action;
390 promise<> _pr;
391
392public:
393 do_for_each_state(Iterator begin, Sentinel end, AsyncAction action, future<>&& first_unavailable)
394 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
395 internal::set_callback(std::move(first_unavailable), this);
396 }
397 virtual void run_and_dispose() noexcept override {
398 std::unique_ptr<do_for_each_state> zis(this);
399 if (_state.failed()) {
400 _pr.set_urgent_state(std::move(_state));
401 return;
402 }
403 while (_begin != _end) {
404 auto f = futurize_invoke(_action, *_begin++);
405 if (f.failed()) {
406 f.forward_to(std::move(_pr));
407 return;
408 }
409 if (!f.available() || need_preempt()) {
410 _state = {};
411 internal::set_callback(std::move(f), this);
412 zis.release();
413 return;
414 }
415 }
416 _pr.set_value();
417 }
418 task* waiting_task() noexcept override {
419 return _pr.waiting_task();
420 }
421 future<> get_future() {
422 return _pr.get_future();
423 }
424};
425
426template<typename Iterator, typename Sentinel, typename AsyncAction>
427inline
428future<> do_for_each_impl(Iterator begin, Sentinel end, AsyncAction action) {
429 while (begin != end) {
430 auto f = futurize_invoke(action, *begin++);
431 if (f.failed()) {
432 return f;
433 }
434 if (!f.available() || need_preempt()) {
435 auto* s = new internal::do_for_each_state<Iterator, Sentinel, AsyncAction>{
436 std::move(begin), std::move(end), std::move(action), std::move(f)};
437 return s->get_future();
438 }
439 }
440 return make_ready_future<>();
441}
442} // namespace internal
443
445
458template<typename Iterator, typename Sentinel, typename AsyncAction>
459requires (
460 requires (Iterator i, AsyncAction aa) {
461 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
462 } &&
463 (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
464)
465inline
466future<> do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept {
467 try {
468 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
469 } catch (...) {
471 }
472}
473
485template<typename Range, typename AsyncAction>
486requires requires (Range c, AsyncAction aa) {
487 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
488 std::end(c);
489}
490inline
491future<> do_for_each(Range& c, AsyncAction action) noexcept {
492 try {
493 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
494 } catch (...) {
496 }
497}
498
499namespace internal {
500
/// Trait that is true when \c std::iterator_traits<T> exposes an
/// \c iterator_category member type, and false otherwise.
template <typename T, typename = void>
struct has_iterator_category : std::false_type {};

/// Specialization selected when \c std::iterator_traits<T>::iterator_category
/// is a valid type.
template <typename T>
struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
506
507template <typename Iterator, typename Sentinel>
508inline
509size_t
510iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end) {
511 if constexpr (std::forward_iterator<Iterator> &&
512 std::forward_iterator<Sentinel>) {
513 return std::ranges::distance(begin, end);
514 } else if constexpr (std::random_access_iterator<Iterator> &&
515 std::random_access_iterator<Sentinel>) {
516 return std::ranges::distance(begin, end);
517 } else {
518 // For InputIterators we can't estimate needed capacity
519 return 0;
520 }
521}
522
523} // namespace internal
524
526
527class parallel_for_each_state final : private continuation_base<> {
528 std::vector<future<>> _incomplete;
529 promise<> _result;
530 std::exception_ptr _ex;
531private:
532 // Wait for one of the futures in _incomplete to complete, and then
533 // decide what to do: wait for another one, or deliver _result if all
534 // are complete.
535 void wait_for_one() noexcept;
536 virtual void run_and_dispose() noexcept override;
537 task* waiting_task() noexcept override { return _result.waiting_task(); }
538public:
539 parallel_for_each_state(size_t n);
540 void add_future(future<>&& f);
541 future<> get_future();
542};
543
545
565template <typename Iterator, typename Sentinel, typename Func>
566requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>))
567// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
568// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
569// break legacy code, for which it holds that Sentinel equals Iterator.
570inline
571future<>
572parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
573 parallel_for_each_state* s = nullptr;
574 // Process all elements, giving each future the following treatment:
575 // - available, not failed: do nothing
576 // - available, failed: collect exception in ex
577 // - not available: collect in s (allocating it if needed)
578 while (begin != end) {
579 auto f = futurize_invoke(std::forward<Func>(func), *begin);
580 ++begin;
582 if (!f.available() || f.failed()) {
583 if (!s) {
584 size_t n{0U};
585 if constexpr (internal::has_iterator_category<Iterator>::value) {
586 // We need if-constexpr here because there exist iterators for which std::iterator_traits
587 // does not have 'iterator_category' as member type
588 n = (internal::iterator_range_estimate_vector_capacity(begin, end) + 1);
589 }
590 s = new parallel_for_each_state(n);
591 }
592 s->add_future(std::move(f));
593 }
594 }
595 // If any futures were not available, hand off to parallel_for_each_state::start().
596 // Otherwise we can return a result immediately.
597 if (s) {
598 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
599 // so this isn't a leak
600 return s->get_future();
601 }
602 return make_ready_future<>();
603}
604
624
625namespace internal {
626
627template <typename Range, typename Func>
628inline
629future<>
630parallel_for_each_impl(Range&& range, Func&& func) {
631 return parallel_for_each(std::begin(range), std::end(range),
632 std::forward<Func>(func));
633}
634
635} // namespace internal
636
637template <typename Range, typename Func>
638requires requires (Func f, Range r) {
639 { f(*std::begin(r)) } -> std::same_as<future<>>;
640 std::end(r);
641}
642inline
643future<>
644parallel_for_each(Range&& range, Func&& func) noexcept {
645 auto impl = internal::parallel_for_each_impl<Range, Func>;
646 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
647}
648
670template <typename Iterator, typename Sentinel, typename Func>
671requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) )
672// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
673// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
674// break legacy code, for which it holds that Sentinel equals Iterator.
675inline
676future<>
677max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
678 struct state {
679 Iterator begin;
680 Sentinel end;
681 Func func;
682 size_t max_concurrent;
683 semaphore sem;
684 std::exception_ptr err;
685
686 state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
687 : begin(std::move(begin_))
688 , end(std::move(end_))
689 , func(std::move(func_))
690 , max_concurrent(max_concurrent_)
691 , sem(max_concurrent_)
692 , err()
693 { }
694 };
695
696 SEASTAR_ASSERT(max_concurrent > 0);
697
698 try {
699 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
700 return do_until([&s] { return s.begin == s.end; }, [&s] {
701 return s.sem.wait().then([&s] () mutable noexcept {
702 // Possibly run in background and signal _sem when the task is done.
703 // The background tasks are waited on using _sem.
704 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
705 if (fut.failed()) {
706 auto e = fut.get_exception();;
707 if (!s.err) {
708 s.err = std::move(e);
709 }
710 }
711 s.sem.signal();
712 });
713 ++s.begin;
714 });
715 }).then([&s] {
716 // Wait for any background task to finish
717 // and signal and semaphore
718 return s.sem.wait(s.max_concurrent);
719 }).then([&s] {
720 if (!s.err) {
721 return make_ready_future<>();
722 }
723 return seastar::make_exception_future<>(std::move(s.err));
724 });
725 });
726 } catch (...) {
728 }
729}
730
751template <typename Range, typename Func>
752requires requires (Func f, Range r) {
753 { f(*std::begin(r)) } -> std::same_as<future<>>;
754 std::end(r);
755}
756inline
757future<>
758max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
759 try {
760 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
761 } catch (...) {
763 }
764}
765
767
768SEASTAR_MODULE_EXPORT_END
769
770} // namespace seastar
Type-safe boolean.
Definition: bool_class.hh:58
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:968
Definition: task.hh:34
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1852
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
future max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:677
future do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept
Call a function for each item in a range, sequentially (iterator version).
Definition: loop.hh:466
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:340
future keep_doing(AsyncAction action) noexcept
Definition: loop.hh:376
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:238
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:572
future max_concurrent_for_each(Range &&range, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:758
Definition: loop.hh:52
holds the implementation parts of the metrics layer, do not use directly.
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.
Converts a type to a future type, if it isn't already.
Definition: future.hh:1786