34#include <seastar/core/future.hh>
35#include <seastar/core/task.hh>
36#include <seastar/util/assert.hh>
37#include <seastar/util/bool_class.hh>
38#include <seastar/util/modules.hh>
39#include <seastar/core/semaphore.hh>
43SEASTAR_MODULE_EXPORT_BEGIN
57template <
typename AsyncAction>
58class repeater final :
public continuation_base<stop_iteration> {
62 explicit repeater(AsyncAction&& action) : _action(
std::move(action)) {}
63 future<> get_future() {
return _promise.get_future(); }
64 task* waiting_task() noexcept
override {
return _promise.waiting_task(); }
65 virtual void run_and_dispose() noexcept
override {
66 if (_state.failed()) {
67 _promise.set_exception(std::move(_state).get_exception());
71 if (_state.get() == stop_iteration::yes) {
80 auto f = futurize_invoke(_action);
82 internal::set_callback(std::move(f),
this);
85 if (f.get() == stop_iteration::yes) {
90 }
while (!need_preempt());
92 _promise.set_exception(std::current_exception());
96 _state.set(stop_iteration::no);
// Deleted overload of repeat() taking the action by const lvalue reference.
// Because it is declared `= delete`, any call for which overload resolution
// selects this signature is rejected at compile time.
105template<
typename AsyncAction>
106future<> repeat(
const AsyncAction& action)
noexcept =
delete;
// Deleted overload of repeat() taking the action by non-const lvalue
// reference. Because it is declared `= delete`, any call for which overload
// resolution selects this signature is rejected at compile time.
107template<
typename AsyncAction>
108future<> repeat(AsyncAction& action)
noexcept =
delete;
119template<
typename AsyncAction>
120requires std::is_invocable_r_v<stop_iteration, AsyncAction> || std::is_invocable_r_v<future<stop_iteration>, AsyncAction>
124 static_assert(std::is_same_v<future<stop_iteration>,
typename futurator::type>,
"bad AsyncAction signature");
127 auto f = futurator::invoke(action);
129 if (!f.available() || f.failed() || need_preempt()) {
130 return [&] ()
noexcept {
132 auto repeater =
new internal::repeater<AsyncAction>(std::move(action));
133 auto ret = repeater->get_future();
134 internal::set_callback(std::move(f), repeater);
139 if (f.get() == stop_iteration::yes) {
140 return make_ready_future<>();
148struct repeat_until_value_type_helper;
152struct repeat_until_value_type_helper<future<
std::optional<T>>> {
154 using value_type = T;
156 using optional_type = std::optional<T>;
158 using future_type = future<value_type>;
// Alias for the future type associated with an AsyncAction:
//   1. take the type AsyncAction returns when invoked with no arguments
//      (std::invoke_result_t<AsyncAction>),
//   2. convert it to its future form via futurize<...>::type,
//   3. look up the `future_type` member of the matching
//      repeat_until_value_type_helper specialization.
162template <
typename AsyncAction>
163using repeat_until_value_return_type
164 =
typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
170template <
typename AsyncAction,
typename T>
171class repeat_until_value_state final :
public continuation_base<std::optional<T>> {
175 explicit repeat_until_value_state(AsyncAction action) : _action(
std::move(action)) {}
176 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(
std::move(action)) {
177 this->_state.set(std::move(st));
179 future<T> get_future() {
return _promise.get_future(); }
180 task* waiting_task() noexcept
override {
return _promise.waiting_task(); }
181 virtual void run_and_dispose() noexcept
override {
182 if (this->_state.failed()) {
183 _promise.set_exception(std::move(this->_state).get_exception());
187 auto v = std::move(this->_state).get();
189 _promise.set_value(std::move(*v));
197 auto f = futurize_invoke(_action);
198 if (!f.available()) {
199 internal::set_callback(std::move(f),
this);
204 _promise.set_value(std::move(*ret));
208 }
while (!need_preempt());
210 _promise.set_exception(std::current_exception());
214 this->_state.set(std::nullopt);
232template<
typename AsyncAction>
233requires requires (AsyncAction aa) {
234 bool(futurize_invoke(aa).get());
235 futurize_invoke(aa).get().value();
237repeat_until_value_return_type<AsyncAction>
240 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
242 using value_type =
typename type_helper::value_type;
243 using optional_type =
typename type_helper::optional_type;
245 auto f = futurator::invoke(action);
247 if (!f.available()) {
248 return [&] ()
noexcept {
250 auto state =
new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
251 auto ret = state->get_future();
252 internal::set_callback(std::move(f), state);
258 return make_exception_future<value_type>(f.get_exception());
261 optional_type&& optional = std::move(f).get();
263 return make_ready_future<value_type>(std::move(optional.value()));
265 }
while (!need_preempt());
268 auto state =
new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
269 auto f = state->get_future();
273 return make_exception_future<value_type>(std::current_exception());
279template <
typename StopCondition,
typename AsyncAction>
280class do_until_state final :
public continuation_base<> {
285 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(
std::move(stop)), _action(
std::move(action)) {}
286 future<> get_future() {
return _promise.get_future(); }
287 task* waiting_task() noexcept
override {
return _promise.waiting_task(); }
288 virtual void run_and_dispose() noexcept
override {
289 if (_state.available()) {
290 if (_state.failed()) {
291 _promise.set_urgent_state(std::move(_state));
300 _promise.set_value();
305 if (!f.available()) {
306 internal::set_callback(std::move(f),
this);
310 f.forward_to(std::move(_promise));
314 }
while (!need_preempt());
316 _promise.set_exception(std::current_exception());
337template<
typename AsyncAction,
typename StopCondition>
338requires std::is_invocable_r_v<bool, StopCondition> && std::is_invocable_r_v<future<>, AsyncAction>
341 using namespace internal;
345 return make_ready_future<>();
350 auto f = futurize_invoke(action);
354 if (!f.available() || need_preempt()) {
355 return [&] ()
noexcept {
357 auto task =
new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
358 auto ret =
task->get_future();
359 internal::set_callback(std::move(f),
task);
373template<
typename AsyncAction>
374requires std::is_invocable_r_v<future<>, AsyncAction>
377 return repeat([action = std::move(action)] ()
mutable {
378 return action().then([] {
379 return stop_iteration::no;
385template <
typename Iterator,
class Sentinel,
typename AsyncAction>
386class do_for_each_state final :
public continuation_base<> {
393 do_for_each_state(Iterator begin, Sentinel end, AsyncAction action, future<>&& first_unavailable)
394 : _begin(
std::move(begin)), _end(
std::move(end)), _action(
std::move(action)) {
395 internal::set_callback(std::move(first_unavailable),
this);
397 virtual void run_and_dispose() noexcept
override {
398 std::unique_ptr<do_for_each_state> zis(
this);
399 if (_state.failed()) {
400 _pr.set_urgent_state(std::move(_state));
403 while (_begin != _end) {
404 auto f = futurize_invoke(_action, *_begin++);
406 f.forward_to(std::move(_pr));
409 if (!f.available() || need_preempt()) {
411 internal::set_callback(std::move(f),
this);
418 task* waiting_task() noexcept
override {
419 return _pr.waiting_task();
421 future<> get_future() {
426template<
typename Iterator,
typename Sentinel,
typename AsyncAction>
428future<> do_for_each_impl(Iterator begin, Sentinel end, AsyncAction action) {
429 while (begin != end) {
430 auto f = futurize_invoke(action, *begin++);
434 if (!f.available() || need_preempt()) {
435 auto* s =
new internal::do_for_each_state<Iterator, Sentinel, AsyncAction>{
436 std::move(begin), std::move(end), std::move(action), std::move(f)};
437 return s->get_future();
440 return make_ready_future<>();
458template<
typename Iterator,
typename Sentinel,
typename AsyncAction>
460 requires (Iterator i, AsyncAction aa) {
461 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
463 (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
468 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
485template<
typename Range,
typename AsyncAction>
486requires requires (Range c, AsyncAction aa) {
487 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
493 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
// Primary template of a detection trait: by default it derives from
// std::false_type, i.e. T is reported as having no iterator category.
// The second, defaulted-to-void parameter exists so that a partial
// specialization built on std::void_t can be selected instead when
// std::iterator_traits<T>::iterator_category is a valid type.
501template <
typename T,
typename =
void>
502struct has_iterator_category : std::false_type {};
505struct has_iterator_category<T,
std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
507template <
typename Iterator,
typename Sentinel>
510iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end) {
511 if constexpr (std::forward_iterator<Iterator> &&
512 std::forward_iterator<Sentinel>) {
513 return std::ranges::distance(begin, end);
514 }
else if constexpr (std::random_access_iterator<Iterator> &&
515 std::random_access_iterator<Sentinel>) {
516 return std::ranges::distance(begin, end);
527class parallel_for_each_state final :
private continuation_base<> {
528 std::vector<future<>> _incomplete;
530 std::exception_ptr _ex;
535 void wait_for_one() noexcept;
536 virtual
void run_and_dispose() noexcept override;
537 task* waiting_task() noexcept
override {
return _result.waiting_task(); }
539 parallel_for_each_state(
size_t n);
540 void add_future(future<>&& f);
541 future<> get_future();
565template <
typename Iterator,
typename Sentinel,
typename Func>
566requires (
requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>))
573 parallel_for_each_state* s =
nullptr;
578 while (begin != end) {
579 auto f = futurize_invoke(std::forward<Func>(func), *begin);
582 if (!f.available() || f.failed()) {
585 if constexpr (internal::has_iterator_category<Iterator>::value) {
588 n = (internal::iterator_range_estimate_vector_capacity(begin, end) + 1);
590 s =
new parallel_for_each_state(n);
592 s->add_future(std::move(f));
600 return s->get_future();
602 return make_ready_future<>();
627template <
typename Range,
typename Func>
630parallel_for_each_impl(Range&& range, Func&& func) {
632 std::forward<Func>(func));
637template <
typename Range,
typename Func>
638requires requires (Func f, Range r) {
639 { f(*std::begin(r)) } -> std::same_as<future<>>;
645 auto impl = internal::parallel_for_each_impl<Range, Func>;
646 return futurize_invoke(
impl, std::forward<Range>(range), std::forward<Func>(func));
670template <
typename Iterator,
typename Sentinel,
typename Func>
671requires (
requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) )
682 size_t max_concurrent;
684 std::exception_ptr err;
686 state(Iterator begin_, Sentinel end_,
size_t max_concurrent_, Func func_)
687 : begin(std::move(begin_))
688 , end(std::move(end_))
689 , func(std::move(func_))
690 , max_concurrent(max_concurrent_)
691 , sem(max_concurrent_)
696 SEASTAR_ASSERT(max_concurrent > 0);
699 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
700 return do_until([&s] {
return s.begin == s.end; }, [&s] {
701 return s.sem.wait().then([&s] ()
mutable noexcept {
704 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (
future<> fut) {
706 auto e = fut.get_exception();;
708 s.err = std::move(e);
718 return s.sem.wait(s.max_concurrent);
721 return make_ready_future<>();
723 return seastar::make_exception_future<>(std::move(s.err));
751template <
typename Range,
typename Func>
752requires requires (Func f, Range r) {
753 { f(*std::begin(r)) } -> std::same_as<future<>>;
768SEASTAR_MODULE_EXPORT_END
Type-safe boolean.
Definition: bool_class.hh:58
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:968
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1852
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
future max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:677
future do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept
Call a function for each item in a range, sequentially (iterator version).
Definition: loop.hh:466
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:340
future keep_doing(AsyncAction action) noexcept
Definition: loop.hh:376
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:238
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:572
future max_concurrent_for_each(Range &&range, size_t max_concurrent, Func &&func) noexcept
Definition: loop.hh:758
Holds the implementation parts of the metrics layer; do not use directly.
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Converts a type to a future type, if it isn't already.
Definition: future.hh:1786