25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/core/internal/run_in_background.hh>
28 #include <seastar/util/is_smart_ptr.hh>
29 #include <seastar/util/tuple_utils.hh>
30 #include <seastar/core/do_with.hh>
31 #include <seastar/util/concepts.hh>
32 #include <seastar/util/log.hh>
33 #include <seastar/util/modules.hh>
35 #ifndef SEASTAR_MODULE
36 #include <boost/iterator/counting_iterator.hpp>
38 #if __has_include(<concepts>)
55 SEASTAR_MODULE_EXPORT_BEGIN
57 template <
typename Func,
typename... Param>
58 class sharded_parameter;
60 template <
typename Service>
63 SEASTAR_MODULE_EXPORT_END
67 template <
typename Func,
typename... Param>
68 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
70 using on_each_shard_func = std::function<future<> (
unsigned shard)>;
72 future<> sharded_parallel_for_each(
unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
74 template <
typename Service>
75 class either_sharded_or_local {
76 sharded<Service>& _sharded;
78 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
79 operator sharded<Service>& ();
84 struct sharded_unwrap {
85 using evaluated_type = T;
90 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
91 using evaluated_type = T&;
92 using type = either_sharded_or_local<T>;
95 template <
typename Func,
typename... Param>
96 struct sharded_unwrap<sharded_parameter<Func, Param...>> {
97 using type = std::invoke_result_t<Func, Param...>;
100 template <
typename T>
101 using sharded_unwrap_evaluated_t =
typename sharded_unwrap<T>::evaluated_type;
103 template <
typename T>
104 using sharded_unwrap_t =
typename sharded_unwrap<T>::type;
112 SEASTAR_MODULE_EXPORT_BEGIN
114 template <
typename T>
133 template <
typename Service>
friend class sharded;
142 template <
typename Service>
146 template <
typename T>
friend class sharded;
/// Records \c container as this service's owning sharded<Service> by
/// storing the pointer in \c _container. No ownership is taken here; the
/// pointer is simply assigned.
///
/// \param container pointer to the sharded<Service> to remember
147 void set_container(
sharded<Service>* container) noexcept { _container = container; }
164 : _msg(
"sharded instance does not exist: " + type_info) {}
165 virtual const char* what()
const noexcept
override {
179 template <
typename Service>
184 future<> track_deletion() noexcept {
188 return service->freed();
191 return make_ready_future<>();
194 std::vector<entry> _instances;
196 using invoke_on_all_func_type = std::function<future<> (Service&)>;
198 template <
typename U,
bool async>
201 template <
typename T>
202 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
203 set_container(T& service) noexcept {
204 service.set_container(
this);
207 template <
typename T>
208 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
209 set_container(T&) noexcept {
213 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
214 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
235 template <
typename... Args>
244 template <
typename... Args>
290 template <
typename Func,
typename... Args>
291 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
299 template <typename Func, typename... Args>
300 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
319 template <
typename Func,
typename... Args>
320 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
334 template <
typename Func,
typename... Args>
335 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
348 template <
typename Reducer,
typename Func,
typename... Args>
350 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) ->
typename reducer_traits<Reducer>::future_type
353 boost::make_counting_iterator<unsigned>(_instances.size()),
354 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
355 return smp::submit_to(c, [this, &func, args] () mutable {
356 return std::apply([this, &func] (Args&&... args) mutable {
357 auto inst = get_local_service();
358 return std::invoke(func, *inst, std::forward<Args>(args)...);
361 }, std::forward<Reducer>(r));
365 template <
typename Reducer,
typename Func,
typename... Args>
367 auto map_reduce(Reducer&& r, Func&& func, Args&&... args)
const ->
typename reducer_traits<Reducer>::future_type
370 boost::make_counting_iterator<unsigned>(_instances.size()),
371 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c) {
372 return smp::submit_to(c, [this, &func, args] () {
373 return std::apply([this, &func] (Args&&... args) {
374 auto inst = get_local_service();
375 return std::invoke(func, *inst, std::forward<Args>(args)...);
378 }, std::forward<Reducer>(r));
397 template <
typename Mapper,
typename Initial,
typename Reduce>
401 auto wrapped_map = [
this, map] (
unsigned c) {
403 auto inst = get_local_service();
404 return std::invoke(map, *inst);
408 std::move(wrapped_map),
414 template <
typename Mapper,
typename Initial,
typename Reduce>
418 auto wrapped_map = [
this, map] (
unsigned c) {
420 auto inst = get_local_service();
421 return std::invoke(map, *inst);
425 std::move(wrapped_map),
439 template <
typename Mapper,
typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>,
typename return_type = decltype(
internal::untuple(std::declval<
typename Future::tuple_type>()))>
441 return do_with(std::vector<return_type>(), std::move(mapper),
442 [
this] (std::vector<return_type>& vec, Mapper& mapper)
mutable {
443 vec.resize(_instances.size());
444 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [
this, &vec, &mapper] (
unsigned c) {
445 return smp::submit_to(c, [this, &mapper] {
446 auto inst = get_local_service();
447 return mapper(*inst);
448 }).
then([&vec, c] (
auto&& res) {
449 vec[c] = std::move(res);
452 return make_ready_future<std::vector<return_type>>(std::move(vec));
469 template <
typename Func,
typename... Args,
typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
470 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
473 return smp::submit_to(
id, options, [
this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] ()
mutable {
474 auto inst = get_local_service();
475 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
487 template <
typename Func,
typename... Args,
typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
488 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
491 return invoke_on(
id,
smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
495 const Service& local() const noexcept;
498 Service& local() noexcept;
504 bool local_is_initialized() const noexcept;
507 template <typename... Args>
508 shared_ptr<Service> create_local_service(Args&&... args) {
509 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
514 shared_ptr<Service> get_local_service() {
517 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
522 shared_ptr<const Service> get_local_service()
const {
525 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
536 template <
typename Func,
typename... Params>
539 std::tuple<Params...> _params;
550 SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
551 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
554 auto evaluate()
const;
556 template <
typename Func_,
typename... Param_>
563 SEASTAR_MODULE_EXPORT_END
566 template <
typename Service>
568 assert(_instances.empty());
573 template <
typename T>
576 unwrap_sharded_arg(T&& arg) {
577 return std::forward<T>(arg);
580 template <
typename Service>
581 either_sharded_or_local<Service>
582 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
583 return either_sharded_or_local<Service>(arg);
586 template <
typename Func,
typename... Param>
588 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
589 return sp.evaluate();
/// Conversion to the whole sharded container: yields the wrapped
/// \c sharded<Service>& (\c _sharded) unchanged.
592 template <
typename Service>
593 either_sharded_or_local<Service>::operator sharded<Service>& () {
return _sharded; }
/// Conversion to a single service instance: yields the result of
/// \c _sharded.local(), i.e. whatever instance local() resolves to at the
/// point where the conversion is performed.
595 template <
typename Service>
596 either_sharded_or_local<Service>::operator Service& () {
return _sharded.local(); }
600 template <
typename Func,
typename... Param>
602 sharded_parameter<Func, Param...>::evaluate()
const {
603 auto unwrap_params_and_invoke = [
this] (
const auto&... params) {
604 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
606 return std::apply(unwrap_params_and_invoke, _params);
609 template <
typename Service>
610 template <
typename... Args>
614 _instances.resize(smp::count);
615 return sharded_parallel_for_each(
616 [
this, args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
618 _instances[
this_shard_id()].service = std::apply([
this] (Args... args) {
619 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
622 }).then_wrapped([
this] (
future<> f) {
625 return make_ready_future<>();
627 return this->stop().then([e = std::current_exception()] ()
mutable {
628 std::rethrow_exception(e);
637 template <
typename Service>
638 template <
typename... Args>
642 assert(_instances.empty());
643 _instances.resize(1);
644 return smp::submit_to(0, [
this, args = std::make_tuple(std::forward<Args>(args)...)] ()
mutable {
645 _instances[0].service = std::apply([
this] (Args... args) {
646 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
648 }).then_wrapped([
this] (
future<> f) {
651 return make_ready_future<>();
653 return this->stop().then([e = std::current_exception()] ()
mutable {
654 std::rethrow_exception(e);
667 struct sharded_has_stop {
672 template <
typename Service>
673 constexpr
static auto check(
int) -> std::enable_if_t<(
sizeof(&Service::stop) >= 0),
bool> {
679 static constexpr
auto check(...) ->
bool {
684 template <
bool stop_exists>
685 struct sharded_call_stop {
686 template <
typename Service>
687 static future<> call(Service& instance);
691 template <
typename Service>
693 future<> sharded_call_stop<true>::call(Service& instance) {
694 return instance.stop();
698 template <
typename Service>
700 future<> sharded_call_stop<false>::call(Service&) {
701 return make_ready_future<>();
704 template <
typename Service>
707 stop_sharded_instance(Service& instance) {
708 constexpr
bool has_stop = internal::sharded_has_stop::check<Service>(0);
709 return internal::sharded_call_stop<has_stop>::call(instance);
714 template <
typename Service>
718 return sharded_parallel_for_each([
this] (
unsigned c)
mutable {
722 return make_ready_future<>();
724 return internal::stop_sharded_instance(*inst);
726 }).then_wrapped([
this] (
future<> fut) {
727 return sharded_parallel_for_each([
this] (
unsigned c) {
733 }).
finally([
this, fut = std::move(fut)] ()
mutable {
735 _instances = std::vector<sharded<Service>::entry>();
736 return std::move(fut);
744 template <
typename Service>
748 return sharded_parallel_for_each([
this, options, func = std::move(func)] (
unsigned c) {
750 return func(*get_local_service());
758 template <
typename Service>
759 template <
typename Func,
typename... Args>
760 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
764 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>,
765 "invoke_on_all()'s func must return void or future<>");
767 return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
768 return std::apply([&service, &func] (Args&&... args)
mutable {
769 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
777 template <
typename Service>
778 template <
typename Func,
typename... Args>
779 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
783 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>,
784 "invoke_on_others()'s func must return void or future<>");
786 return invoke_on_all(options, [orig =
this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) ->
future<> {
787 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
794 template <
typename Service>
796 assert(local_is_initialized());
800 template <
typename Service>
802 assert(local_is_initialized());
806 template <
typename Service>
808 assert(local_is_initialized());
812 template <
typename Service>
818 SEASTAR_MODULE_EXPORT_BEGIN
843 template <
typename PtrType>
844 SEASTAR_CONCEPT( requires (!std::is_pointer_v<PtrType>) )
850 void destroy(PtrType p,
unsigned cpu) noexcept {
854 auto f = destroy_on(std::move(p), cpu);
855 if (!f.available() || f.failed()) {
856 internal::run_in_background(std::move(f));
860 static future<> destroy_on(PtrType p,
unsigned cpu) noexcept {
873 return make_ready_future<>();
876 using element_type =
typename std::pointer_traits<PtrType>::element_type;
877 using pointer = element_type*;
880 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
887 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
888 : _value(std::move(value))
898 destroy(std::move(_value), _cpu);
/// Accesses the wrapped object.
///
/// \return a reference obtained by dereferencing the stored pointer
///         (\c *_value). The function is noexcept exactly when that
///         dereference expression is noexcept.
908 element_type&
operator*() const noexcept(noexcept(*_value)) {
return *_value; }
/// Accesses the wrapped object through a raw pointer.
///
/// \return the address of the dereferenced stored pointer (\c &*_value),
///         so the result is an \c element_type* regardless of the
///         concrete \c PtrType.
910 element_type*
operator->() const noexcept(noexcept(&*_value)) {
return &*_value; }
/// Access the raw pointer to the wrapped object.
///
/// \return \c &*_value as \c pointer. Note that this dereferences
///         \c _value before taking the address; it does not test
///         \c _value for emptiness first.
912 pointer
get() const noexcept(noexcept(&*_value)) {
return &*_value; }
/// Tests the stored pointer for truthiness.
///
/// \return the result of \c static_cast<bool>(_value); noexcept exactly
///         when that conversion is noexcept.
919 operator bool() const noexcept(noexcept(static_cast<
bool>(_value))) {
return static_cast<bool>(_value); }
922 destroy(std::move(_value), _cpu);
923 _value = std::move(other._value);
932 PtrType
release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
933 return std::exchange(_value, {});
938 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
939 auto old_ptr = std::move(_value);
942 _value = std::move(new_ptr);
945 destroy(std::move(old_ptr), old_cpu);
950 void reset(std::nullptr_t =
nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
958 return destroy_on(std::move(_value), _cpu);
965 template <
typename T>
975 SEASTAR_MODULE_EXPORT_END
Definition: sharded.hh:123
Definition: shared_ptr.hh:501
Definition: sharded.hh:845
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:912
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:885
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:921
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:938
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:917
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:897
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:887
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:901
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:950
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:910
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:880
future<> destroy() noexcept
Definition: sharded.hh:957
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:932
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:908
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
value_type && get()
gets the value returned by the computation
Definition: future.hh:1340
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:159
Provide a sharded service with access to its peers.
Definition: sharded.hh:143
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:982
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:537
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:549
Definition: sharded.hh:180
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:440
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:567
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:490
future<> invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:746
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:472
future<> start_single(Args &&... args) noexcept
Definition: sharded.hh:640
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:367
future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:782
sharded() noexcept
Definition: sharded.hh:219
future<> stop() noexcept
Definition: sharded.hh:716
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:350
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:417
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:400
future<> start(Args &&... args) noexcept
Definition: sharded.hh:612
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:966
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:206
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:50
Definition: shared_ptr.hh:661
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168