34#include <seastar/core/future.hh>
35#include <seastar/core/task.hh>
36#include <seastar/util/bool_class.hh>
37#include <seastar/util/modules.hh>
38#include <seastar/core/semaphore.hh>
42SEASTAR_MODULE_EXPORT_BEGIN
56template <
typename AsyncAction>
57class repeater final :
public continuation_base<stop_iteration> {
61 explicit repeater(AsyncAction&& action) : _action(
std::move(action)) {}
62 future<> get_future() {
return _promise.get_future(); }
63 task* waiting_task() noexcept
override {
return _promise.waiting_task(); }
64 virtual void run_and_dispose() noexcept
override {
65 if (_state.failed()) {
66 _promise.set_exception(std::move(_state).get_exception());
70 if (_state.get() == stop_iteration::yes) {
79 auto f = futurize_invoke(_action);
81 internal::set_callback(std::move(f),
this);
84 if (f.get() == stop_iteration::yes) {
89 }
while (!need_preempt());
91 _promise.set_exception(std::current_exception());
95 _state.set(stop_iteration::no);
104template<
typename AsyncAction>
105future<> repeat(
const AsyncAction& action)
noexcept =
delete;
106template<
typename AsyncAction>
107future<> repeat(AsyncAction& action)
noexcept =
delete;
118template<
typename AsyncAction>
119requires std::is_invocable_r_v<stop_iteration, AsyncAction> || std::is_invocable_r_v<future<stop_iteration>, AsyncAction>
123 static_assert(std::is_same_v<future<stop_iteration>,
typename futurator::type>,
"bad AsyncAction signature");
126 auto f = futurator::invoke(action);
128 if (!f.available() || f.failed() || need_preempt()) {
129 return [&] ()
noexcept {
131 auto repeater =
new internal::repeater<AsyncAction>(std::move(action));
132 auto ret = repeater->get_future();
133 internal::set_callback(std::move(f), repeater);
138 if (f.get() == stop_iteration::yes) {
139 return make_ready_future<>();
147struct repeat_until_value_type_helper;
151struct repeat_until_value_type_helper<future<
std::optional<T>>> {
153 using value_type = T;
155 using optional_type = std::optional<T>;
157 using future_type = future<value_type>;
161template <
typename AsyncAction>
162using repeat_until_value_return_type
163 =
typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
169template <
typename AsyncAction,
typename T>
170class repeat_until_value_state final :
public continuation_base<std::optional<T>> {
174 explicit repeat_until_value_state(AsyncAction action) : _action(
std::move(action)) {}
175 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(
std::move(action)) {
176 this->_state.set(std::move(st));
178 future<T> get_future() {
return _promise.get_future(); }
179 task* waiting_task() noexcept
override {
return _promise.waiting_task(); }
180 virtual void run_and_dispose() noexcept
override {
181 if (this->_state.failed()) {
182 _promise.set_exception(std::move(this->_state).get_exception());
186 auto v = std::move(this->_state).get();
188 _promise.set_value(std::move(*v));
196 auto f = futurize_invoke(_action);
197 if (!f.available()) {
198 internal::set_callback(std::move(f),
this);
203 _promise.set_value(std::move(*ret));
207 }
while (!need_preempt());
209 _promise.set_exception(std::current_exception());
213 this->_state.set(std::nullopt);
231template<
typename AsyncAction>
232requires requires (AsyncAction aa) {
233 bool(futurize_invoke(aa).get());
234 futurize_invoke(aa).get().value();
236repeat_until_value_return_type<AsyncAction>
239 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
241 using value_type =
typename type_helper::value_type;
242 using optional_type =
typename type_helper::optional_type;
244 auto f = futurator::invoke(action);
246 if (!f.available()) {
247 return [&] ()
noexcept {
249 auto state =
new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
250 auto ret = state->get_future();
251 internal::set_callback(std::move(f), state);
257 return make_exception_future<value_type>(f.get_exception());
260 optional_type&& optional = std::move(f).get();
262 return make_ready_future<value_type>(std::move(optional.value()));
264 }
while (!need_preempt());
267 auto state =
new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
268 auto f = state->get_future();
272 return make_exception_future<value_type>(std::current_exception());
278template <
typename StopCondition,
typename AsyncAction>
279class do_until_state final :
public continuation_base<> {
284 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(
std::move(stop)), _action(
std::move(action)) {}
285 future<> get_future() {
return _promise.get_future(); }
286 task* waiting_task() noexcept
override {
return _promise.waiting_task(); }
287 virtual void run_and_dispose() noexcept
override {
288 if (_state.available()) {
289 if (_state.failed()) {
290 _promise.set_urgent_state(std::move(_state));
299 _promise.set_value();
304 if (!f.available()) {
305 internal::set_callback(std::move(f),
this);
309 f.forward_to(std::move(_promise));
313 }
while (!need_preempt());
315 _promise.set_exception(std::current_exception());
336template<
typename AsyncAction,
typename StopCondition>
337requires std::is_invocable_r_v<bool, StopCondition> && std::is_invocable_r_v<future<>, AsyncAction>
340 using namespace internal;
344 return make_ready_future<>();
349 auto f = futurize_invoke(action);
353 if (!f.available() || need_preempt()) {
354 return [&] ()
noexcept {
356 auto task =
new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
357 auto ret =
task->get_future();
358 internal::set_callback(std::move(f),
task);
372template<
typename AsyncAction>
373requires std::is_invocable_r_v<future<>, AsyncAction>
376 return repeat([action = std::move(action)] ()
mutable {
377 return action().then([] {
378 return stop_iteration::no;
384template <
typename Iterator,
typename AsyncAction>
385class do_for_each_state final :
public continuation_base<> {
392 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
393 : _begin(
std::move(begin)), _end(
std::move(end)), _action(
std::move(action)) {
394 internal::set_callback(std::move(first_unavailable),
this);
396 virtual void run_and_dispose() noexcept
override {
397 std::unique_ptr<do_for_each_state> zis(
this);
398 if (_state.failed()) {
399 _pr.set_urgent_state(std::move(_state));
402 while (_begin != _end) {
403 auto f = futurize_invoke(_action, *_begin++);
405 f.forward_to(std::move(_pr));
408 if (!f.available() || need_preempt()) {
410 internal::set_callback(std::move(f),
this);
417 task* waiting_task() noexcept
override {
418 return _pr.waiting_task();
420 future<> get_future() {
425template<
typename Iterator,
typename AsyncAction>
427future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
428 while (begin != end) {
429 auto f = futurize_invoke(action, *begin++);
433 if (!f.available() || need_preempt()) {
434 auto* s =
new internal::do_for_each_state<Iterator, AsyncAction>{
435 std::move(begin), std::move(end), std::move(action), std::move(f)};
436 return s->get_future();
439 return make_ready_future<>();
457template<
typename Iterator,
typename AsyncAction>
458requires requires (Iterator i, AsyncAction aa) {
459 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
464 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
481template<
typename Container,
typename AsyncAction>
482requires requires (Container c, AsyncAction aa) {
483 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
489 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
497template <
typename T,
typename =
void>
498struct has_iterator_category : std::false_type {};
501struct has_iterator_category<T,
std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
503template <
typename Iterator,
typename Sentinel,
typename IteratorCategory>
506iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, IteratorCategory) {
508 if constexpr (std::is_base_of_v<std::forward_iterator_tag, IteratorCategory>) {
509 return std::distance(begin, end);
520class parallel_for_each_state final :
private continuation_base<> {
521 std::vector<future<>> _incomplete;
523 std::exception_ptr _ex;
528 void wait_for_one() noexcept;
529 virtual
void run_and_dispose() noexcept override;
530 task* waiting_task() noexcept
override {
return _result.waiting_task(); }
532 parallel_for_each_state(
size_t n);
533 void add_future(future<>&& f);
534 future<> get_future();
558template <
typename Iterator,
typename Sentinel,
typename Func>
559requires (
requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>))
566 parallel_for_each_state* s =
nullptr;
571 while (begin != end) {
572 auto f = futurize_invoke(std::forward<Func>(func), *begin);
575 if (!f.available() || f.failed()) {
577 using itraits = std::iterator_traits<Iterator>;
579 if constexpr (internal::has_iterator_category<Iterator>::value) {
582 n = (internal::iterator_range_estimate_vector_capacity(begin, end,
typename itraits::iterator_category{}) + 1);
584 s =
new parallel_for_each_state(n);
586 s->add_future(std::move(f));
594 return s->get_future();
596 return make_ready_future<>();
621template <
typename Range,
typename Func>
624parallel_for_each_impl(Range&& range, Func&& func) {
626 std::forward<Func>(func));
631template <
typename Range,
typename Func>
632requires requires (Func f, Range r) {
633 { f(*std::begin(r)) } -> std::same_as<future<>>;
639 auto impl = internal::parallel_for_each_impl<Range, Func>;
640 return futurize_invoke(
impl, std::forward<Range>(range), std::forward<Func>(func));
664template <
typename Iterator,
typename Sentinel,
typename Func>
665requires (
requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) )
676 size_t max_concurrent;
678 std::exception_ptr err;
680 state(Iterator begin_, Sentinel end_,
size_t max_concurrent_, Func func_)
681 : begin(std::move(begin_))
682 , end(std::move(end_))
683 , func(std::move(func_))
684 , max_concurrent(max_concurrent_)
685 , sem(max_concurrent_)
690 assert(max_concurrent > 0);
693 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
694 return do_until([&s] {
return s.begin == s.end; }, [&s] {
695 return s.sem.wait().then([&s] ()
mutable noexcept {
698 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (
future<> fut) {
700 auto e = fut.get_exception();;
702 s.err = std::move(e);
712 return s.sem.wait(s.max_concurrent);
715 return make_ready_future<>();
717 return seastar::make_exception_future<>(std::move(s.err));
745template <
typename Range,
typename Func>
746requires requires (Func f, Range r) {
747 { f(*std::begin(r)) } -> std::same_as<future<>>;
762SEASTAR_MODULE_EXPORT_END
Type-safe boolean.
Definition: bool_class.hh:58
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:990
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
future max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:671
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:339
future keep_doing(AsyncAction action) noexcept
Definition: loop.hh:375
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept
Call a function for each item in a range, sequentially (iterator version).
Definition: loop.hh:462
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
future max_concurrent_for_each(Range &&range, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:752
holds the implementation parts of the metrics layer, do not use directly.
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Converts a type to a future type, if it isn't already.
Definition: future.hh:1853