25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/log.hh>
32#include <seastar/util/modules.hh>
53SEASTAR_MODULE_EXPORT_BEGIN
55template <
typename Func,
typename... Param>
56class sharded_parameter;
58template <
typename Service>
61SEASTAR_MODULE_EXPORT_END
65template <
typename Func,
typename... Param>
66auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
68using on_each_shard_func = std::function<future<> (
unsigned shard)>;
70future<> sharded_parallel_for_each(
unsigned nr_shards, on_each_shard_func on_each_shard)
noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
72template <
typename Service>
73class either_sharded_or_local {
74 sharded<Service>& _sharded;
76 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
77 operator sharded<Service>& ();
82struct sharded_unwrap {
83 using evaluated_type = T;
88struct sharded_unwrap<
std::reference_wrapper<sharded<T>>> {
89 using evaluated_type = T&;
90 using type = either_sharded_or_local<T>;
93template <
typename Func,
typename... Param>
94struct sharded_unwrap<sharded_parameter<Func, Param...>> {
95 using type = std::invoke_result_t<Func, Param...>;
99using sharded_unwrap_evaluated_t =
typename sharded_unwrap<T>::evaluated_type;
102using sharded_unwrap_t =
typename sharded_unwrap<T>::type;
105concept unsigned_range = std::ranges::range<R>
106 && std::is_unsigned_v<std::ranges::range_value_t<R>>;
114SEASTAR_MODULE_EXPORT_BEGIN
135 template <
typename Service>
friend class sharded;
144template <
typename Service>
148 template <
typename T>
friend class sharded;
149 void set_container(
sharded<Service>* container)
noexcept { _container = container; }
166 : _msg(
"sharded instance does not exist: " + type_info) {}
167 virtual const char* what()
const noexcept override {
181template <
typename Service>
186 future<> track_deletion()
noexcept {
188 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
190 return service->freed();
193 return make_ready_future<>();
196 std::vector<entry> _instances;
198 using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
200 template <
typename U,
bool async>
203 template <
typename T>
204 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
205 set_container(T& service)
noexcept {
206 service.set_container(
this);
209 template <
typename T>
210 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
211 set_container(T&)
noexcept {
215 sharded_parallel_for_each(internal::on_each_shard_func func)
noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
216 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
237 template <
typename... Args>
246 template <
typename... Args>
286 template <typename Func, typename... Args>
287 requires
std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
288 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
296 template <typename Func, typename... Args>
297 requires
std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
298 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
311 template <typename Func, typename... Args>
312 requires
std::invocable<Func, Service&, Args...>
313 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, Args...>>,
future<>>
327 template <typename Func, typename... Args>
328 requires
std::invocable<Func, Service&, Args...>
329 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, Args...>>,
future<>>
344 template <typename Func, typename... Args, typename Ret = futurize_t<
std::invoke_result_t<Func, Service&, Args...>>>
345 requires
std::invocable<Func, Service&, Args&&...>
357 template <typename Func, typename... Args, typename Ret = futurize_t<
std::invoke_result_t<Func, Service&, Args&&...>>>
358 requires
std::invocable<Func, Service&, Args&&...>
373 template <typename R, typename Func, typename... Args>
374 requires
std::invocable<Func, Service&, Args...>
375 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
376 && internal::unsigned_range<R>
391 template <typename R, typename Func, typename... Args>
392 requires
std::invocable<Func, Service&, Args...>
393 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
394 && internal::unsigned_range<R>
402 template <typename Reducer, typename Func, typename... Args>
404 auto
map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
406 auto rng = std::views::iota(
size_t(0), _instances.size());
408 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
409 return smp::submit_to(c, [this, &func, args] () mutable {
410 return std::apply([this, &func] (Args&&... args) mutable {
411 auto inst = get_local_service();
412 return std::invoke(func, *inst, std::forward<Args>(args)...);
415 }, std::forward<Reducer>(r));
419 template <
typename Reducer,
typename Func,
typename... Args>
421 auto map_reduce(Reducer&& r, Func&& func, Args&&... args)
const ->
typename reducer_traits<Reducer>::future_type
423 auto rng = std::views::iota(
size_t(0), _instances.size());
425 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c) {
426 return smp::submit_to(c, [this, &func, args] () {
427 return std::apply([this, &func] (Args&&... args) {
428 auto inst = get_local_service();
429 return std::invoke(func, *inst, std::forward<Args>(args)...);
432 }, std::forward<Reducer>(r));
451 template <
typename Mapper,
typename Initial,
typename Reduce>
455 auto wrapped_map = [
this, map] (
unsigned c) {
457 auto inst = get_local_service();
458 return std::invoke(map, *inst);
462 std::move(wrapped_map),
468 template <
typename Mapper,
typename Initial,
typename Reduce>
472 auto wrapped_map = [
this, map] (
unsigned c) {
474 auto inst = get_local_service();
475 return std::invoke(map, *inst);
479 std::move(wrapped_map),
493 template <
typename Mapper,
typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>,
typename return_type = decltype(
internal::untuple(std::declval<
typename Future::tuple_type>()))>
495 return do_with(std::vector<return_type>(), std::move(mapper),
496 [
this] (std::vector<return_type>& vec, Mapper& mapper)
mutable {
497 vec.resize(_instances.size());
498 return parallel_for_each(std::views::iota(0u, _instances.size()), [
this, &vec, &mapper] (
unsigned c) {
499 return smp::submit_to(c, [this, &mapper] {
500 auto inst = get_local_service();
501 return mapper(*inst);
502 }).
then([&vec, c] (
auto&& res) {
503 vec[c] = std::move(res);
506 return make_ready_future<std::vector<return_type>>(std::move(vec));
512 const Service&
local() const noexcept;
515 Service& local() noexcept;
521 bool local_is_initialized() const noexcept;
524 template <typename... Args>
525 shared_ptr<Service> create_local_service(Args&&... args) {
526 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
539 shared_ptr<const Service> get_local_service()
const {
542 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
553template <
typename Func,
typename... Params>
556 std::tuple<Params...> _params;
567 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
568 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
571 auto evaluate()
const;
573 template <
typename Func_,
typename... Param_>
580SEASTAR_MODULE_EXPORT_END
583template <
typename Service>
585 assert(_instances.empty());
593unwrap_sharded_arg(T&& arg) {
594 return std::forward<T>(arg);
597template <
typename Service>
598either_sharded_or_local<Service>
599unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
600 return either_sharded_or_local<Service>(arg);
603template <
typename Func,
typename... Param>
605unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
606 return sp.evaluate();
609template <
typename Service>
610either_sharded_or_local<Service>::operator sharded<Service>& () {
return _sharded; }
612template <
typename Service>
613either_sharded_or_local<Service>::operator Service& () {
return _sharded.local(); }
617template <
typename Func,
typename... Param>
619sharded_parameter<Func, Param...>::evaluate()
const {
620 auto unwrap_params_and_invoke = [
this] (
const auto&... params) {
621 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
623 return std::apply(unwrap_params_and_invoke, _params);
626template <
typename Service>
627template <
typename... Args>
631 _instances.resize(smp::count);
632 return sharded_parallel_for_each(
633 [
this, args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
635 _instances[
this_shard_id()].service = std::apply([
this] (Args... args) {
636 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
639 }).then_wrapped([
this] (
future<> f) {
642 return make_ready_future<>();
644 return this->stop().then([e = std::current_exception()] ()
mutable {
645 std::rethrow_exception(e);
654template <
typename Service>
655template <
typename... Args>
659 assert(_instances.empty());
660 _instances.resize(1);
661 return smp::submit_to(0, [
this, args = std::make_tuple(std::forward<Args>(args)...)] ()
mutable {
662 _instances[0].service = std::apply([
this] (Args... args) {
663 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
665 }).then_wrapped([
this] (
future<> f) {
668 return make_ready_future<>();
670 return this->stop().then([e = std::current_exception()] ()
mutable {
671 std::rethrow_exception(e);
684struct sharded_has_stop {
689 template <
typename Service>
690 constexpr static auto check(
int) -> std::enable_if_t<(
sizeof(&Service::stop) >= 0),
bool> {
696 static constexpr auto check(...) ->
bool {
701template <
bool stop_exists>
702struct sharded_call_stop {
703 template <
typename Service>
704 static future<> call(Service& instance);
708template <
typename Service>
710future<> sharded_call_stop<true>::call(Service& instance) {
711 return instance.stop();
715template <
typename Service>
717future<> sharded_call_stop<false>::call(Service&) {
718 return make_ready_future<>();
721template <
typename Service>
724stop_sharded_instance(Service& instance) {
725 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
726 return internal::sharded_call_stop<has_stop>::call(instance);
731template <
typename Service>
735 return sharded_parallel_for_each([
this] (
unsigned c)
mutable {
739 return make_ready_future<>();
741 return internal::stop_sharded_instance(*inst);
743 }).then_wrapped([
this] (
future<> fut) {
744 return sharded_parallel_for_each([
this] (
unsigned c) {
750 }).
finally([
this, fut = std::move(fut)] ()
mutable {
752 _instances = std::vector<sharded<Service>::entry>();
753 return std::move(fut);
761template <
typename Service>
765 return sharded_parallel_for_each([
this, options, func = std::move(func)] (
unsigned c) {
767 return func(*get_local_service());
775template <
typename Service>
786template <
typename Service>
787template <
typename Func,
typename... Args>
788requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
789 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
794 return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
795 return std::apply([&service, &func] (Args&&... args)
mutable {
796 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
804template <
typename Service>
805template <
typename Func,
typename... Args>
806requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
807 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
818template <
typename Service>
819template <
typename Func,
typename... Args>
820requires std::invocable<Func, Service&, Args...>
821 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>
826 return invoke_on_all(options, [orig =
this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s)
mutable ->
future<> {
827 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
834template <
typename Service>
835template <
typename Func,
typename... Args>
836requires std::invocable<Func, Service&, Args...>
837 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>
848template <
typename Service>
849template <
typename Func,
typename... Args,
typename Ret>
850requires std::invocable<Func, Service&, Args&&...>
854 return smp::submit_to(
id, options, [
this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] ()
mutable {
855 auto inst = get_local_service();
856 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
860template <
typename Service>
861template <
typename Func,
typename... Args,
typename Ret>
862requires std::invocable<Func, Service&, Args&&...>
866 return invoke_on(
id,
smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
869template <
typename Service>
870template <
typename R,
typename Func,
typename... Args>
871requires std::invocable<Func, Service&, Args...>
872 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
873 && internal::unsigned_range<R>
878 auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
881 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
883 return parallel_for_each(range, [
this, options, func = std::move(func_futurized)] (
unsigned s) {
884 if (s > smp::count - 1) {
885 throw std::invalid_argument(
format(
"Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
888 return func(*get_local_service());
896template <
typename Service>
897template <
typename R,
typename Func,
typename... Args>
898requires std::invocable<Func, Service&, Args...>
899 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
900 && internal::unsigned_range<R>
905 return invoke_on(std::forward<R>(range),
smp_submit_to_options{}, std::move(func), std::move(args)...);
911template <
typename Service>
913 assert(local_is_initialized());
917template <
typename Service>
919 assert(local_is_initialized());
923template <
typename Service>
925 assert(local_is_initialized());
929template <
typename Service>
935SEASTAR_MODULE_EXPORT_BEGIN
960template <
typename PtrType>
961requires (!std::is_pointer_v<PtrType>)
967 void destroy(PtrType p,
unsigned cpu)
noexcept {
971 auto f = destroy_on(std::move(p), cpu);
972 if (!f.available() || f.failed()) {
973 internal::run_in_background(std::move(f));
977 static future<> destroy_on(PtrType p,
unsigned cpu)
noexcept {
990 return make_ready_future<>();
993 using element_type =
typename std::pointer_traits<PtrType>::element_type;
994 using pointer = element_type*;
1004 foreign_ptr(PtrType value)
noexcept(std::is_nothrow_move_constructible_v<PtrType>)
1005 : _value(std::move(value))
1015 destroy(std::move(_value), _cpu);
1025 element_type&
operator*() const noexcept(noexcept(*_value)) {
return *_value; }
1027 element_type*
operator->() const noexcept(noexcept(&*_value)) {
return &*_value; }
1029 pointer
get() const noexcept(noexcept(&*_value)) {
return &*_value; }
1036 operator bool() const noexcept(noexcept(static_cast<
bool>(_value))) {
return static_cast<bool>(_value); }
1039 destroy(std::move(_value), _cpu);
1040 _value = std::move(other._value);
1049 PtrType
release() noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
1050 return std::exchange(_value, {});
1055 void reset(PtrType new_ptr)
noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1056 auto old_ptr = std::move(_value);
1057 auto old_cpu = _cpu;
1059 _value = std::move(new_ptr);
1062 destroy(std::move(old_ptr), old_cpu);
1067 void reset(std::nullptr_t =
nullptr) noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
1075 return destroy_on(std::move(_value), _cpu);
1082template <
typename T>
1092SEASTAR_MODULE_EXPORT_END
Definition: sharded.hh:125
Definition: shared_ptr.hh:493
Definition: sharded.hh:962
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:1029
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:1002
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:1027
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:1055
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:1034
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:1014
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:1004
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1067
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:997
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:1038
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:1018
future destroy() noexcept
Definition: sharded.hh:1074
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1049
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:1025
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1425
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:161
Provide a sharded service with access to its peers.
Definition: sharded.hh:145
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:990
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:554
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:566
Definition: sharded.hh:182
sharded(sharded &&other)=delete
~sharded()
Destroyes a sharded object. Must not be in a started state.
Definition: sharded.hh:584
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:763
future start_single(Args &&... args) noexcept
Definition: sharded.hh:657
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:912
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:494
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:471
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:421
sharded() noexcept
Definition: sharded.hh:221
future stop() noexcept
Definition: sharded.hh:733
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:824
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:404
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:454
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:853
future start(Args &&... args) noexcept
Definition: sharded.hh:629
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:568
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:1083
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: print.hh:132
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:653
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168