25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/util/tuple_utils.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/util/concepts.hh>
31 #include <seastar/util/log.hh>
32 #include <seastar/core/reactor.hh>
33 #include <seastar/util/modules.hh>
35 #ifndef SEASTAR_MODULE
36 #include <boost/iterator/counting_iterator.hpp>
38 #if __has_include(<concepts>)
55 SEASTAR_MODULE_EXPORT_BEGIN
57 template <
typename Func,
typename... Param>
58 class sharded_parameter;
60 template <
typename Service>
63 SEASTAR_MODULE_EXPORT_END
67 template <
typename Func,
typename... Param>
68 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
70 using on_each_shard_func = std::function<future<> (
unsigned shard)>;
72 future<> sharded_parallel_for_each(
unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
74 template <
typename Service>
75 class either_sharded_or_local {
76 sharded<Service>& _sharded;
78 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
79 operator sharded<Service>& ();
84 struct sharded_unwrap {
85 using evaluated_type = T;
90 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
91 using evaluated_type = T&;
92 using type = either_sharded_or_local<T>;
95 template <
typename Func,
typename... Param>
96 struct sharded_unwrap<sharded_parameter<Func, Param...>> {
97 using type = std::invoke_result_t<Func, Param...>;
100 template <
typename T>
101 using sharded_unwrap_evaluated_t =
typename sharded_unwrap<T>::evaluated_type;
103 template <
typename T>
104 using sharded_unwrap_t =
typename sharded_unwrap<T>::type;
112 SEASTAR_MODULE_EXPORT_BEGIN
114 template <
typename T>
125 std::function<void()> _delete_cb;
132 template <
typename Service>
friend class sharded;
141 template <
typename Service>
145 template <
typename T>
friend class sharded;
146 void set_container(
sharded<Service>* container) noexcept { _container = container; }
163 : _msg(
"sharded instance does not exist: " + type_info) {}
164 virtual const char* what()
const noexcept
override {
178 template <
typename Service>
184 std::vector<entry> _instances;
186 using invoke_on_all_func_type = std::function<future<> (Service&)>;
188 void service_deleted() noexcept {
191 template <
typename U,
bool async>
194 template <
typename T>
195 std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
196 set_container(T& service) noexcept {
197 service.set_container(
this);
200 template <
typename T>
201 std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
202 set_container(T&) noexcept {
206 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
207 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
228 template <
typename... Args>
237 template <
typename... Args>
283 template <
typename Func,
typename... Args>
284 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
292 template <typename Func, typename... Args>
293 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
312 template <
typename Func,
typename... Args>
313 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
327 template <
typename Func,
typename... Args>
328 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
341 template <
typename Reducer,
typename Func,
typename... Args>
343 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) ->
typename reducer_traits<Reducer>::future_type
346 boost::make_counting_iterator<unsigned>(_instances.size()),
347 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
348 return smp::submit_to(c, [this, &func, args] () mutable {
349 return std::apply([this, &func] (Args&&... args) mutable {
350 auto inst = get_local_service();
351 return std::invoke(func, *inst, std::forward<Args>(args)...);
354 }, std::forward<Reducer>(r));
358 template <
typename Reducer,
typename Func,
typename... Args>
360 auto map_reduce(Reducer&& r, Func&& func, Args&&... args)
const ->
typename reducer_traits<Reducer>::future_type
363 boost::make_counting_iterator<unsigned>(_instances.size()),
364 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c) {
365 return smp::submit_to(c, [this, &func, args] () {
366 return std::apply([this, &func] (Args&&... args) {
367 auto inst = get_local_service();
368 return std::invoke(func, *inst, std::forward<Args>(args)...);
371 }, std::forward<Reducer>(r));
390 template <
typename Mapper,
typename Initial,
typename Reduce>
394 auto wrapped_map = [
this, map] (
unsigned c) {
396 auto inst = get_local_service();
397 return std::invoke(map, *inst);
401 std::move(wrapped_map),
407 template <
typename Mapper,
typename Initial,
typename Reduce>
411 auto wrapped_map = [
this, map] (
unsigned c) {
413 auto inst = get_local_service();
414 return std::invoke(map, *inst);
418 std::move(wrapped_map),
432 template <
typename Mapper,
typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>,
typename return_type = decltype(
internal::untuple(std::declval<
typename Future::tuple_type>()))>
434 return do_with(std::vector<return_type>(), std::move(mapper),
435 [
this] (std::vector<return_type>& vec, Mapper& mapper)
mutable {
436 vec.resize(_instances.size());
437 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [
this, &vec, &mapper] (
unsigned c) {
438 return smp::submit_to(c, [this, &mapper] {
439 auto inst = get_local_service();
440 return mapper(*inst);
441 }).
then([&vec, c] (
auto&& res) {
442 vec[c] = std::move(res);
445 return make_ready_future<std::vector<return_type>>(std::move(vec));
462 template <
typename Func,
typename... Args,
typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
463 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
466 return smp::submit_to(
id, options, [
this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] ()
mutable {
467 auto inst = get_local_service();
468 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
480 template <
typename Func,
typename... Args,
typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
481 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
484 return invoke_on(
id,
smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
488 const Service& local() const noexcept;
491 Service& local() noexcept;
497 bool local_is_initialized() const noexcept;
500 void track_deletion(
shared_ptr<Service>&, std::false_type) noexcept {
505 void track_deletion(shared_ptr<Service>& s, std::true_type) {
506 s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted),
this);
509 template <
typename... Args>
510 shared_ptr<Service> create_local_service(Args&&... args) {
511 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
513 track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
517 shared_ptr<Service> get_local_service() {
520 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
525 shared_ptr<const Service> get_local_service()
const {
528 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
539 template <
typename Func,
typename... Params>
542 std::tuple<Params...> _params;
553 SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
554 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
557 auto evaluate()
const;
559 template <
typename Func_,
typename... Param_>
566 SEASTAR_MODULE_EXPORT_END
569 template <
typename Service>
571 assert(_instances.empty());
576 template <
typename T>
579 unwrap_sharded_arg(T&& arg) {
580 return std::forward<T>(arg);
583 template <
typename Service>
584 either_sharded_or_local<Service>
585 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
586 return either_sharded_or_local<Service>(arg);
589 template <
typename Func,
typename... Param>
591 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
592 return sp.evaluate();
595 template <
typename Service>
596 either_sharded_or_local<Service>::operator sharded<Service>& () {
return _sharded; }
598 template <
typename Service>
599 either_sharded_or_local<Service>::operator Service& () {
return _sharded.local(); }
603 template <
typename Func,
typename... Param>
605 sharded_parameter<Func, Param...>::evaluate()
const {
606 auto unwrap_params_and_invoke = [
this] (
const auto&... params) {
607 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
609 return std::apply(unwrap_params_and_invoke, _params);
612 template <
typename Service>
613 template <
typename... Args>
617 _instances.resize(smp::count);
618 return sharded_parallel_for_each(
619 [
this, args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
621 _instances[
this_shard_id()].service = std::apply([
this] (Args... args) {
622 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
625 }).then_wrapped([
this] (
future<> f) {
628 return make_ready_future<>();
630 return this->stop().then([e = std::current_exception()] ()
mutable {
631 std::rethrow_exception(e);
640 template <
typename Service>
641 template <
typename... Args>
645 assert(_instances.empty());
646 _instances.resize(1);
647 return smp::submit_to(0, [
this, args = std::make_tuple(std::forward<Args>(args)...)] ()
mutable {
648 _instances[0].service = std::apply([
this] (Args... args) {
649 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
651 }).then_wrapped([
this] (
future<> f) {
654 return make_ready_future<>();
656 return this->stop().then([e = std::current_exception()] ()
mutable {
657 std::rethrow_exception(e);
670 struct sharded_has_stop {
675 template <
typename Service>
676 constexpr
static auto check(
int) -> std::enable_if_t<(
sizeof(&Service::stop) >= 0),
bool> {
682 static constexpr
auto check(...) ->
bool {
687 template <
bool stop_exists>
688 struct sharded_call_stop {
689 template <
typename Service>
690 static future<> call(Service& instance);
694 template <
typename Service>
696 future<> sharded_call_stop<true>::call(Service& instance) {
697 return instance.stop();
701 template <
typename Service>
703 future<> sharded_call_stop<false>::call(Service&) {
704 return make_ready_future<>();
707 template <
typename Service>
710 stop_sharded_instance(Service& instance) {
711 constexpr
bool has_stop = internal::sharded_has_stop::check<Service>(0);
712 return internal::sharded_call_stop<has_stop>::call(instance);
717 template <
typename Service>
721 return sharded_parallel_for_each([
this] (
unsigned c)
mutable {
725 return make_ready_future<>();
727 return internal::stop_sharded_instance(*inst);
729 }).then_wrapped([
this] (
future<> fut) {
730 return sharded_parallel_for_each([
this] (
unsigned c) {
733 return make_ready_future<>();
738 }).
finally([
this, fut = std::move(fut)] ()
mutable {
740 _instances = std::vector<sharded<Service>::entry>();
741 return std::move(fut);
749 template <
typename Service>
753 return sharded_parallel_for_each([
this, options, func = std::move(func)] (
unsigned c) {
755 return func(*get_local_service());
763 template <
typename Service>
764 template <
typename Func,
typename... Args>
765 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
769 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>,
770 "invoke_on_all()'s func must return void or future<>");
772 return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
773 return std::apply([&service, &func] (Args&&... args)
mutable {
774 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
782 template <
typename Service>
783 template <
typename Func,
typename... Args>
784 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
788 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>,
789 "invoke_on_others()'s func must return void or future<>");
791 return invoke_on_all(options, [orig =
this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) ->
future<> {
792 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
799 template <
typename Service>
801 assert(local_is_initialized());
805 template <
typename Service>
807 assert(local_is_initialized());
811 template <
typename Service>
813 assert(local_is_initialized());
817 template <
typename Service>
823 SEASTAR_MODULE_EXPORT_BEGIN
848 template <
typename PtrType>
849 SEASTAR_CONCEPT( requires (!std::is_pointer<PtrType>::value) )
855 void destroy(PtrType p,
unsigned cpu) noexcept {
859 auto f = destroy_on(std::move(p), cpu);
860 if (!f.available() || f.failed()) {
861 engine().run_in_background(std::move(f));
865 static future<> destroy_on(PtrType p,
unsigned cpu) noexcept {
878 return make_ready_future<>();
881 using element_type =
typename std::pointer_traits<PtrType>::element_type;
882 using pointer = element_type*;
885 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
892 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
893 : _value(std::move(value))
903 destroy(std::move(_value), _cpu);
913 element_type&
operator*() const noexcept(noexcept(*_value)) {
return *_value; }
915 element_type*
operator->() const noexcept(noexcept(&*_value)) {
return &*_value; }
917 pointer
get() const noexcept(noexcept(&*_value)) {
return &*_value; }
924 operator bool() const noexcept(noexcept(static_cast<
bool>(_value))) {
return static_cast<bool>(_value); }
927 destroy(std::move(_value), _cpu);
928 _value = std::move(other._value);
937 PtrType
release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
938 return std::exchange(_value, {});
943 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
944 auto old_ptr = std::move(_value);
947 _value = std::move(new_ptr);
950 destroy(std::move(old_ptr), old_cpu);
955 void reset(std::nullptr_t =
nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
963 return destroy_on(std::move(_value), _cpu);
970 template <
typename T>
980 SEASTAR_MODULE_EXPORT_END
Definition: sharded.hh:123
Definition: shared_ptr.hh:499
Definition: sharded.hh:850
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:917
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:890
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:943
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible< PtrType >::value)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:926
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:922
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:902
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:892
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:906
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:955
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:915
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:885
future destroy() noexcept
Definition: sharded.hh:962
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:937
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:913
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1396
value_type && get()
gets the value returned by the computation
Definition: future.hh:1326
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:158
Provide a sharded service with access to its peers.
Definition: sharded.hh:142
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:969
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:540
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:552
Definition: sharded.hh:179
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:433
sharded(sharded &&other)=delete
~sharded()
Destroyes a sharded object. Must not be in a started state.
Definition: sharded.hh:570
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:483
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:751
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:465
future start_single(Args &&... args) noexcept
Definition: sharded.hh:643
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:360
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:787
sharded() noexcept
Definition: sharded.hh:212
future stop() noexcept
Definition: sharded.hh:719
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:343
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:410
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:393
future start(Args &&... args) noexcept
Definition: sharded.hh:615
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1934
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:568
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:971
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:207
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: smp.hh:78
Definition: shared_ptr.hh:659
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:174