25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/assert.hh>
32#include <seastar/util/log.hh>
33#include <seastar/util/modules.hh>
54SEASTAR_MODULE_EXPORT_BEGIN
56template <
typename Func,
typename... Param>
57class sharded_parameter;
59template <
typename Service>
62SEASTAR_MODULE_EXPORT_END
66template <
typename Func,
typename... Param>
67auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
69using on_each_shard_func = std::function<future<> (
unsigned shard)>;
71future<> sharded_parallel_for_each(
unsigned nr_shards, on_each_shard_func on_each_shard)
noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
73template <
typename Service>
74class either_sharded_or_local {
75 sharded<Service>& _sharded;
77 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
78 operator sharded<Service>& ();
83struct sharded_unwrap {
84 using evaluated_type = T;
89struct sharded_unwrap<
std::reference_wrapper<sharded<T>>> {
90 using evaluated_type = T&;
91 using type = either_sharded_or_local<T>;
94template <
typename Func,
typename... Param>
95struct sharded_unwrap<sharded_parameter<Func, Param...>> {
96 using type = std::invoke_result_t<Func, Param...>;
100using sharded_unwrap_evaluated_t =
typename sharded_unwrap<T>::evaluated_type;
103using sharded_unwrap_t =
typename sharded_unwrap<T>::type;
106concept unsigned_range = std::ranges::range<R>
107 && std::is_unsigned_v<std::ranges::range_value_t<R>>;
115SEASTAR_MODULE_EXPORT_BEGIN
136 template <
typename Service>
friend class sharded;
145template <
typename Service>
149 template <
typename T>
friend class sharded;
150 void set_container(
sharded<Service>* container)
noexcept { _container = container; }
167 : _msg(
"sharded instance does not exist: " + type_info) {}
168 virtual const char* what()
const noexcept override {
182template <
typename Service>
187 future<> track_deletion()
noexcept {
189 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
191 return service->freed();
194 return make_ready_future<>();
197 std::vector<entry> _instances;
199 using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
201 template <
typename U,
bool async>
204 template <
typename T>
205 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
206 set_container(T& service)
noexcept {
207 service.set_container(
this);
210 template <
typename T>
211 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
212 set_container(T&)
noexcept {
216 sharded_parallel_for_each(internal::on_each_shard_func func)
noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
217 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
238 template <
typename... Args>
247 template <
typename... Args>
287 template <typename Func, typename... Args>
288 requires
std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
289 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
297 template <typename Func, typename... Args>
298 requires
std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
299 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
312 template <typename Func, typename... Args>
313 requires
std::invocable<Func, Service&, Args...>
314 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, Args...>>,
future<>>
328 template <typename Func, typename... Args>
329 requires
std::invocable<Func, Service&, Args...>
330 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, Args...>>,
future<>>
345 template <typename Func, typename... Args, typename Ret = futurize_t<
std::invoke_result_t<Func, Service&, Args...>>>
346 requires
std::invocable<Func, Service&, Args&&...>
358 template <typename Func, typename... Args, typename Ret = futurize_t<
std::invoke_result_t<Func, Service&, Args&&...>>>
359 requires
std::invocable<Func, Service&, Args&&...>
374 template <typename R, typename Func, typename... Args>
375 requires
std::invocable<Func, Service&, Args...>
376 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
377 && internal::unsigned_range<R>
392 template <typename R, typename Func, typename... Args>
393 requires
std::invocable<Func, Service&, Args...>
394 &&
std::is_same_v<futurize_t<
std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
395 && internal::unsigned_range<R>
403 template <typename Reducer, typename Func, typename... Args>
404 auto
map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
406 auto rng = std::views::iota(
size_t(0), _instances.size());
408 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
409 return smp::submit_to(c, [this, &func, args] () mutable {
410 return std::apply([this, &func] (Args&&... args) mutable {
411 auto inst = get_local_service();
412 return std::invoke(func, *inst, std::forward<Args>(args)...);
415 }, std::forward<Reducer>(r));
419 template <
typename Reducer,
typename Func,
typename... Args>
420 auto map_reduce(Reducer&& r, Func&& func, Args&&... args)
const ->
typename reducer_traits<Reducer>::future_type
422 auto rng = std::views::iota(
size_t(0), _instances.size());
424 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c) {
425 return smp::submit_to(c, [this, &func, args] () {
426 return std::apply([this, &func] (Args&&... args) {
427 auto inst = get_local_service();
428 return std::invoke(func, *inst, std::forward<Args>(args)...);
431 }, std::forward<Reducer>(r));
450 template <
typename Mapper,
typename Initial,
typename Reduce>
453 auto wrapped_map = [
this, map] (
unsigned c) {
455 auto inst = get_local_service();
456 return std::invoke(map, *inst);
460 std::move(wrapped_map),
466 template <
typename Mapper,
typename Initial,
typename Reduce>
469 auto wrapped_map = [
this, map] (
unsigned c) {
471 auto inst = get_local_service();
472 return std::invoke(map, *inst);
476 std::move(wrapped_map),
490 template <
typename Mapper,
typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>,
typename return_type = decltype(
internal::untuple(std::declval<
typename Future::tuple_type>()))>
492 return do_with(std::vector<return_type>(), std::move(mapper),
493 [
this] (std::vector<return_type>& vec, Mapper& mapper)
mutable {
494 vec.resize(_instances.size());
495 return parallel_for_each(std::views::iota(0u, _instances.size()), [
this, &vec, &mapper] (
unsigned c) {
496 return smp::submit_to(c, [this, &mapper] {
497 auto inst = get_local_service();
498 return mapper(*inst);
499 }).
then([&vec, c] (
auto&& res) {
500 vec[c] = std::move(res);
503 return make_ready_future<std::vector<return_type>>(std::move(vec));
509 const Service&
local() const noexcept;
512 Service& local() noexcept;
518 bool local_is_initialized() const noexcept;
521 template <typename... Args>
522 shared_ptr<Service> create_local_service(Args&&... args) {
523 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
536 shared_ptr<const Service> get_local_service()
const {
539 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
550template <
typename Func,
typename... Params>
553 std::tuple<Params...> _params;
564 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
565 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
568 auto evaluate()
const;
570 template <
typename Func_,
typename... Param_>
577SEASTAR_MODULE_EXPORT_END
580template <
typename Service>
582 SEASTAR_ASSERT(_instances.empty());
589unwrap_sharded_arg(T&& arg) {
590 return std::forward<T>(arg);
593template <
typename Service>
594either_sharded_or_local<Service>
595unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
596 return either_sharded_or_local<Service>(arg);
599template <
typename Func,
typename... Param>
601unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
602 return sp.evaluate();
605template <
typename Service>
606either_sharded_or_local<Service>::operator sharded<Service>& () {
return _sharded; }
608template <
typename Service>
609either_sharded_or_local<Service>::operator Service& () {
return _sharded.local(); }
613template <
typename Func,
typename... Param>
615sharded_parameter<Func, Param...>::evaluate()
const {
616 auto unwrap_params_and_invoke = [
this] (
const auto&... params) {
617 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
619 return std::apply(unwrap_params_and_invoke, _params);
622template <
typename Service>
623template <
typename... Args>
627 _instances.resize(smp::count);
628 return sharded_parallel_for_each(
629 [
this, args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
631 _instances[
this_shard_id()].service = std::apply([
this] (Args... args) {
632 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
635 }).then_wrapped([
this] (
future<> f) {
638 return make_ready_future<>();
640 return this->stop().then([e = std::current_exception()] ()
mutable {
641 std::rethrow_exception(e);
650template <
typename Service>
651template <
typename... Args>
655 SEASTAR_ASSERT(_instances.empty());
656 _instances.resize(1);
657 return smp::submit_to(0, [
this, args = std::make_tuple(std::forward<Args>(args)...)] ()
mutable {
658 _instances[0].service = std::apply([
this] (Args... args) {
659 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
661 }).then_wrapped([
this] (
future<> f) {
664 return make_ready_future<>();
666 return this->stop().then([e = std::current_exception()] ()
mutable {
667 std::rethrow_exception(e);
680struct sharded_has_stop {
685 template <
typename Service>
686 constexpr static auto check(
int) -> std::enable_if_t<(
sizeof(&Service::stop) >= 0),
bool> {
692 static constexpr auto check(...) ->
bool {
697template <
bool stop_exists>
698struct sharded_call_stop {
699 template <
typename Service>
700 static future<> call(Service& instance);
704template <
typename Service>
705future<> sharded_call_stop<true>::call(Service& instance) {
706 return instance.stop();
710template <
typename Service>
711future<> sharded_call_stop<false>::call(Service&) {
712 return make_ready_future<>();
715template <
typename Service>
717stop_sharded_instance(Service& instance) {
718 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
719 return internal::sharded_call_stop<has_stop>::call(instance);
724template <
typename Service>
728 return sharded_parallel_for_each([
this] (
unsigned c)
mutable {
732 return make_ready_future<>();
734 return internal::stop_sharded_instance(*inst);
736 }).then_wrapped([
this] (
future<> fut) {
737 return sharded_parallel_for_each([
this] (
unsigned c) {
743 }).
finally([
this, fut = std::move(fut)] ()
mutable {
745 _instances = std::vector<sharded<Service>::entry>();
746 return std::move(fut);
754template <
typename Service>
758 return sharded_parallel_for_each([
this, options, func = std::move(func)] (
unsigned c) {
760 return func(*get_local_service());
768template <
typename Service>
778template <
typename Service>
779template <
typename Func,
typename... Args>
780requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
781 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
785 return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
786 return std::apply([&service, &func] (Args&&... args)
mutable {
787 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
795template <
typename Service>
796template <
typename Func,
typename... Args>
797requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
798 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
808template <
typename Service>
809template <
typename Func,
typename... Args>
810requires std::invocable<Func, Service&, Args...>
811 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>
815 return invoke_on_all(options, [orig =
this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s)
mutable ->
future<> {
816 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
823template <
typename Service>
824template <
typename Func,
typename... Args>
825requires std::invocable<Func, Service&, Args...>
826 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>
836template <
typename Service>
837template <
typename Func,
typename... Args,
typename Ret>
838requires std::invocable<Func, Service&, Args&&...>
841 return smp::submit_to(
id, options, [
this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] ()
mutable {
842 auto inst = get_local_service();
843 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
847template <
typename Service>
848template <
typename Func,
typename... Args,
typename Ret>
849requires std::invocable<Func, Service&, Args&&...>
852 return invoke_on(
id,
smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
855template <
typename Service>
856template <
typename R,
typename Func,
typename... Args>
857requires std::invocable<Func, Service&, Args...>
858 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
859 && internal::unsigned_range<R>
863 auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
866 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
868 return parallel_for_each(range, [
this, options, func = std::move(func_futurized)] (
unsigned s) {
869 if (s > smp::count - 1) {
870 throw std::invalid_argument(
format(
"Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
873 return func(*get_local_service());
881template <
typename Service>
882template <
typename R,
typename Func,
typename... Args>
883requires std::invocable<Func, Service&, Args...>
884 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>
885 && internal::unsigned_range<R>
889 return invoke_on(std::forward<R>(range),
smp_submit_to_options{}, std::move(func), std::move(args)...);
895template <
typename Service>
897 SEASTAR_ASSERT(local_is_initialized());
901template <
typename Service>
903 SEASTAR_ASSERT(local_is_initialized());
907template <
typename Service>
909 SEASTAR_ASSERT(local_is_initialized());
913template <
typename Service>
919SEASTAR_MODULE_EXPORT_BEGIN
944template <
typename PtrType>
945requires (!std::is_pointer_v<PtrType>)
951 void destroy(PtrType p,
unsigned cpu)
noexcept {
955 auto f = destroy_on(std::move(p), cpu);
956 if (!f.available() || f.failed()) {
957 internal::run_in_background(std::move(f));
961 static future<> destroy_on(PtrType p,
unsigned cpu)
noexcept {
974 return make_ready_future<>();
977 using element_type =
typename std::pointer_traits<PtrType>::element_type;
978 using pointer = element_type*;
988 foreign_ptr(PtrType value)
noexcept(std::is_nothrow_move_constructible_v<PtrType>)
989 : _value(std::move(value))
999 destroy(std::move(_value), _cpu);
1009 element_type&
operator*() const noexcept(noexcept(*_value)) {
return *_value; }
1011 element_type*
operator->() const noexcept(noexcept(&*_value)) {
return &*_value; }
1013 pointer
get() const noexcept(noexcept(&*_value)) {
return &*_value; }
1020 operator bool() const noexcept(noexcept(static_cast<
bool>(_value))) {
return static_cast<bool>(_value); }
1023 destroy(std::move(_value), _cpu);
1024 _value = std::move(other._value);
1033 PtrType
release() noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
1034 return std::exchange(_value, {});
1039 void reset(PtrType new_ptr)
noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1040 auto old_ptr = std::move(_value);
1041 auto old_cpu = _cpu;
1043 _value = std::move(new_ptr);
1046 destroy(std::move(old_ptr), old_cpu);
1051 void reset(std::nullptr_t =
nullptr) noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
1059 return destroy_on(std::move(_value), _cpu);
1066template <
typename T>
1076SEASTAR_MODULE_EXPORT_END
Definition: sharded.hh:126
Definition: shared_ptr.hh:493
Definition: sharded.hh:946
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:1013
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:986
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:1011
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:1039
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:1018
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:998
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:988
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1051
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:981
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:1022
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:1002
future destroy() noexcept
Definition: sharded.hh:1058
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1033
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:1009
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1358
value_type && get()
gets the value returned by the computation
Definition: future.hh:1299
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:162
Provide a sharded service with access to its peers.
Definition: sharded.hh:146
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:968
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:551
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:563
Definition: sharded.hh:183
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:581
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:756
future start_single(Args &&... args) noexcept
Definition: sharded.hh:653
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:896
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:491
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:468
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:420
sharded() noexcept
Definition: sharded.hh:222
future stop() noexcept
Definition: sharded.hh:726
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:813
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:404
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:452
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:840
future start(Args &&... args) noexcept
Definition: sharded.hh:625
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1852
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:572
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:1067
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: format.hh:42
shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:653
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168