25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/log.hh>
32#include <seastar/util/modules.hh>
53SEASTAR_MODULE_EXPORT_BEGIN
55template <
typename Func,
typename... Param>
56class sharded_parameter;
58template <
typename Service>
61SEASTAR_MODULE_EXPORT_END
65template <
typename Func,
typename... Param>
66auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
68using on_each_shard_func = std::function<future<> (
unsigned shard)>;
70future<> sharded_parallel_for_each(
unsigned nr_shards, on_each_shard_func on_each_shard)
noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
72template <
typename Service>
73class either_sharded_or_local {
74 sharded<Service>& _sharded;
76 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
77 operator sharded<Service>& ();
82struct sharded_unwrap {
83 using evaluated_type = T;
88struct sharded_unwrap<
std::reference_wrapper<sharded<T>>> {
89 using evaluated_type = T&;
90 using type = either_sharded_or_local<T>;
93template <
typename Func,
typename... Param>
94struct sharded_unwrap<sharded_parameter<Func, Param...>> {
95 using type = std::invoke_result_t<Func, Param...>;
99using sharded_unwrap_evaluated_t =
typename sharded_unwrap<T>::evaluated_type;
102using sharded_unwrap_t =
typename sharded_unwrap<T>::type;
105concept unsigned_range = std::ranges::range<R>
106 && std::is_unsigned_v<std::ranges::range_value_t<R>>;
114SEASTAR_MODULE_EXPORT_BEGIN
135 template <
typename Service>
friend class sharded;
144template <
typename Service>
148 template <
typename T>
friend class sharded;
149 void set_container(
sharded<Service>* container)
noexcept { _container = container; }
166 : _msg(
"sharded instance does not exist: " + type_info) {}
167 virtual const char* what()
const noexcept override {
181template <
typename Service>
186 future<> track_deletion()
noexcept {
188 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
190 return service->freed();
193 return make_ready_future<>();
196 std::vector<entry> _instances;
198 using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
200 template <
typename U,
bool async>
203 template <
typename T>
204 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
205 set_container(T& service)
noexcept {
206 service.set_container(
this);
209 template <
typename T>
210 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
211 set_container(T&)
noexcept {
215 sharded_parallel_for_each(internal::on_each_shard_func func)
noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
216 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
237 template <
typename... Args>
246 template <
typename... Args>
286 template <typename Func, typename... Args>
287 requires
std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
288 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
296 template <typename Func, typename... Args>
297 requires
std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
298 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
311 template <typename Func, typename... Args>
312 requires
std::invocable<Func, Service&, Args...>
313 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, Args...>>,
future<>>
327 template <typename Func, typename... Args>
328 requires
std::invocable<Func, Service&, Args...>
329 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, Args...>>,
future<>>
344 template <typename Func, typename... Args, typename Ret = futurize_t<
std::invoke_result_t<Func, Service&, Args...>>>
345 requires
std::invocable<Func, Service&, Args&&...>
357 template <typename Func, typename... Args, typename Ret = futurize_t<
std::invoke_result_t<Func, Service&, Args&&...>>>
358 requires
std::invocable<Func, Service&, Args&&...>
373 template <typename R, typename Func, typename... Args>
374 requires
std::invocable<Func, Service&, Args...>
375 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
376 && internal::unsigned_range<R>
391 template <typename R, typename Func, typename... Args>
392 requires
std::invocable<Func, Service&, Args...>
393 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
394 && internal::unsigned_range<R>
402 template <typename Reducer, typename Func, typename... Args>
403 auto
map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
405 auto rng = std::views::iota(
size_t(0), _instances.size());
407 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
408 return smp::submit_to(c, [this, &func, args] () mutable {
409 return std::apply([this, &func] (Args&&... args) mutable {
410 auto inst = get_local_service();
411 return std::invoke(func, *inst, std::forward<Args>(args)...);
414 }, std::forward<Reducer>(r));
418 template <
typename Reducer,
typename Func,
typename... Args>
419 auto map_reduce(Reducer&& r, Func&& func, Args&&... args)
const ->
typename reducer_traits<Reducer>::future_type
421 auto rng = std::views::iota(
size_t(0), _instances.size());
423 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c) {
424 return smp::submit_to(c, [this, &func, args] () {
425 return std::apply([this, &func] (Args&&... args) {
426 auto inst = get_local_service();
427 return std::invoke(func, *inst, std::forward<Args>(args)...);
430 }, std::forward<Reducer>(r));
449 template <
typename Mapper,
typename Initial,
typename Reduce>
452 auto wrapped_map = [
this, map] (
unsigned c) {
454 auto inst = get_local_service();
455 return std::invoke(map, *inst);
459 std::move(wrapped_map),
465 template <
typename Mapper,
typename Initial,
typename Reduce>
468 auto wrapped_map = [
this, map] (
unsigned c) {
470 auto inst = get_local_service();
471 return std::invoke(map, *inst);
475 std::move(wrapped_map),
489 template <
typename Mapper,
typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>,
typename return_type = decltype(
internal::untuple(std::declval<
typename Future::tuple_type>()))>
491 return do_with(std::vector<return_type>(), std::move(mapper),
492 [
this] (std::vector<return_type>& vec, Mapper& mapper)
mutable {
493 vec.resize(_instances.size());
494 return parallel_for_each(std::views::iota(0u, _instances.size()), [
this, &vec, &mapper] (
unsigned c) {
495 return smp::submit_to(c, [this, &mapper] {
496 auto inst = get_local_service();
497 return mapper(*inst);
498 }).
then([&vec, c] (
auto&& res) {
499 vec[c] = std::move(res);
502 return make_ready_future<std::vector<return_type>>(std::move(vec));
508 const Service&
local() const noexcept;
511 Service& local() noexcept;
517 bool local_is_initialized() const noexcept;
520 template <typename... Args>
521 shared_ptr<Service> create_local_service(Args&&... args) {
522 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
535 shared_ptr<const Service> get_local_service()
const {
538 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
549template <
typename Func,
typename... Params>
552 std::tuple<Params...> _params;
563 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
564 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
567 auto evaluate()
const;
569 template <
typename Func_,
typename... Param_>
576SEASTAR_MODULE_EXPORT_END
579template <
typename Service>
581 assert(_instances.empty());
588unwrap_sharded_arg(T&& arg) {
589 return std::forward<T>(arg);
592template <
typename Service>
593either_sharded_or_local<Service>
594unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
595 return either_sharded_or_local<Service>(arg);
598template <
typename Func,
typename... Param>
600unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
601 return sp.evaluate();
604template <
typename Service>
605either_sharded_or_local<Service>::operator sharded<Service>& () {
return _sharded; }
607template <
typename Service>
608either_sharded_or_local<Service>::operator Service& () {
return _sharded.local(); }
612template <
typename Func,
typename... Param>
614sharded_parameter<Func, Param...>::evaluate()
const {
615 auto unwrap_params_and_invoke = [
this] (
const auto&... params) {
616 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
618 return std::apply(unwrap_params_and_invoke, _params);
621template <
typename Service>
622template <
typename... Args>
626 _instances.resize(smp::count);
627 return sharded_parallel_for_each(
628 [
this, args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
630 _instances[
this_shard_id()].service = std::apply([
this] (Args... args) {
631 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
634 }).then_wrapped([
this] (
future<> f) {
637 return make_ready_future<>();
639 return this->stop().then([e = std::current_exception()] ()
mutable {
640 std::rethrow_exception(e);
649template <
typename Service>
650template <
typename... Args>
654 assert(_instances.empty());
655 _instances.resize(1);
656 return smp::submit_to(0, [
this, args = std::make_tuple(std::forward<Args>(args)...)] ()
mutable {
657 _instances[0].service = std::apply([
this] (Args... args) {
658 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
660 }).then_wrapped([
this] (
future<> f) {
663 return make_ready_future<>();
665 return this->stop().then([e = std::current_exception()] ()
mutable {
666 std::rethrow_exception(e);
679struct sharded_has_stop {
684 template <
typename Service>
685 constexpr static auto check(
int) -> std::enable_if_t<(
sizeof(&Service::stop) >= 0),
bool> {
691 static constexpr auto check(...) ->
bool {
696template <
bool stop_exists>
697struct sharded_call_stop {
698 template <
typename Service>
699 static future<> call(Service& instance);
703template <
typename Service>
704future<> sharded_call_stop<true>::call(Service& instance) {
705 return instance.stop();
709template <
typename Service>
710future<> sharded_call_stop<false>::call(Service&) {
711 return make_ready_future<>();
714template <
typename Service>
716stop_sharded_instance(Service& instance) {
717 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
718 return internal::sharded_call_stop<has_stop>::call(instance);
723template <
typename Service>
727 return sharded_parallel_for_each([
this] (
unsigned c)
mutable {
731 return make_ready_future<>();
733 return internal::stop_sharded_instance(*inst);
735 }).then_wrapped([
this] (
future<> fut) {
736 return sharded_parallel_for_each([
this] (
unsigned c) {
742 }).
finally([
this, fut = std::move(fut)] ()
mutable {
744 _instances = std::vector<sharded<Service>::entry>();
745 return std::move(fut);
753template <
typename Service>
757 return sharded_parallel_for_each([
this, options, func = std::move(func)] (
unsigned c) {
759 return func(*get_local_service());
767template <
typename Service>
777template <
typename Service>
778template <
typename Func,
typename... Args>
779requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
780 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
784 return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
785 return std::apply([&service, &func] (Args&&... args)
mutable {
786 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
794template <
typename Service>
795template <
typename Func,
typename... Args>
796requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
797 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
807template <
typename Service>
808template <
typename Func,
typename... Args>
809requires std::invocable<Func, Service&, Args...>
810 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>
814 return invoke_on_all(options, [orig =
this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s)
mutable ->
future<> {
815 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
822template <
typename Service>
823template <
typename Func,
typename... Args>
824requires std::invocable<Func, Service&, Args...>
825 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>
835template <
typename Service>
836template <
typename Func,
typename... Args,
typename Ret>
837requires std::invocable<Func, Service&, Args&&...>
840 return smp::submit_to(
id, options, [
this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] ()
mutable {
841 auto inst = get_local_service();
842 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
846template <
typename Service>
847template <
typename Func,
typename... Args,
typename Ret>
848requires std::invocable<Func, Service&, Args&&...>
851 return invoke_on(
id,
smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
854template <
typename Service>
855template <
typename R,
typename Func,
typename... Args>
856requires std::invocable<Func, Service&, Args...>
857 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
858 && internal::unsigned_range<R>
862 auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
865 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
867 return parallel_for_each(range, [
this, options, func = std::move(func_futurized)] (
unsigned s) {
868 if (s > smp::count - 1) {
869 throw std::invalid_argument(
format(
"Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
872 return func(*get_local_service());
880template <
typename Service>
881template <
typename R,
typename Func,
typename... Args>
882requires std::invocable<Func, Service&, Args...>
883 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
884 && internal::unsigned_range<R>
888 return invoke_on(std::forward<R>(range),
smp_submit_to_options{}, std::move(func), std::move(args)...);
894template <
typename Service>
896 assert(local_is_initialized());
900template <
typename Service>
902 assert(local_is_initialized());
906template <
typename Service>
908 assert(local_is_initialized());
912template <
typename Service>
918SEASTAR_MODULE_EXPORT_BEGIN
943template <
typename PtrType>
944requires (!std::is_pointer_v<PtrType>)
950 void destroy(PtrType p,
unsigned cpu)
noexcept {
954 auto f = destroy_on(std::move(p), cpu);
955 if (!f.available() || f.failed()) {
956 internal::run_in_background(std::move(f));
960 static future<> destroy_on(PtrType p,
unsigned cpu)
noexcept {
973 return make_ready_future<>();
976 using element_type =
typename std::pointer_traits<PtrType>::element_type;
977 using pointer = element_type*;
987 foreign_ptr(PtrType value)
noexcept(std::is_nothrow_move_constructible_v<PtrType>)
988 : _value(std::move(value))
998 destroy(std::move(_value), _cpu);
1008 element_type&
operator*() const noexcept(noexcept(*_value)) {
return *_value; }
1010 element_type*
operator->() const noexcept(noexcept(&*_value)) {
return &*_value; }
1012 pointer
get() const noexcept(noexcept(&*_value)) {
return &*_value; }
1019 operator bool() const noexcept(noexcept(static_cast<
bool>(_value))) {
return static_cast<bool>(_value); }
1022 destroy(std::move(_value), _cpu);
1023 _value = std::move(other._value);
1032 PtrType
release() noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
1033 return std::exchange(_value, {});
1038 void reset(PtrType new_ptr)
noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1039 auto old_ptr = std::move(_value);
1040 auto old_cpu = _cpu;
1042 _value = std::move(new_ptr);
1045 destroy(std::move(old_ptr), old_cpu);
1050 void reset(std::nullptr_t =
nullptr) noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
1058 return destroy_on(std::move(_value), _cpu);
1065template <
typename T>
1075SEASTAR_MODULE_EXPORT_END
Definition: sharded.hh:125
Definition: shared_ptr.hh:493
Definition: sharded.hh:945
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:1012
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:985
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:1010
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:1038
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:1017
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:997
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:987
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1050
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:980
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:1021
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:1001
future destroy() noexcept
Definition: sharded.hh:1057
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1032
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:1008
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1404
value_type && get()
Gets the value returned by the computation.
Definition: future.hh:1321
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:161
Provide a sharded service with access to its peers.
Definition: sharded.hh:145
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:969
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:550
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:562
Definition: sharded.hh:182
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:580
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:755
future start_single(Args &&... args) noexcept
Definition: sharded.hh:652
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:895
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:490
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:467
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:419
sharded() noexcept
Definition: sharded.hh:221
future stop() noexcept
Definition: sharded.hh:725
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:812
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:403
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:451
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:839
future start(Args &&... args) noexcept
Definition: sharded.hh:624
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1905
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1941
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:571
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:1066
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: format.hh:42
shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:653
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168