25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/log.hh>
32#include <seastar/util/modules.hh>
35#include <boost/iterator/counting_iterator.hpp>
52SEASTAR_MODULE_EXPORT_BEGIN
54template <
typename Func,
typename... Param>
55class sharded_parameter;
57template <
typename Service>
60SEASTAR_MODULE_EXPORT_END
/// Forward declaration of the unwrap_sharded_arg() overload that takes a
/// sharded_parameter by value. The return type is deduced (auto); the
/// definition later in this file returns sp.evaluate().
64template <
typename Func,
typename... Param>
65auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
/// Type-erased callable that receives a shard number and returns a future<>.
/// Used as the callback parameter of sharded_parallel_for_each().
67using on_each_shard_func = std::function<future<> (
unsigned shard)>;
/// Declaration only; the definition is not visible in this listing.
///
/// \param nr_shards      number of shards (sharded<> passes _instances.size())
/// \param on_each_shard  callback taking a shard number and returning future<>
/// \return a future<>
///
/// The function is noexcept exactly when on_each_shard_func is nothrow
/// move constructible.
// NOTE(review): presumably runs on_each_shard once per shard in
// [0, nr_shards) -- confirm against the definition.
69future<> sharded_parallel_for_each(
unsigned nr_shards, on_each_shard_func on_each_shard)
noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
71template <
typename Service>
72class either_sharded_or_local {
73 sharded<Service>& _sharded;
75 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
76 operator sharded<Service>& ();
81struct sharded_unwrap {
82 using evaluated_type = T;
87struct sharded_unwrap<
std::reference_wrapper<sharded<T>>> {
88 using evaluated_type = T&;
89 using type = either_sharded_or_local<T>;
92template <
typename Func,
typename... Param>
93struct sharded_unwrap<sharded_parameter<Func, Param...>> {
94 using type = std::invoke_result_t<Func, Param...>;
98using sharded_unwrap_evaluated_t =
typename sharded_unwrap<T>::evaluated_type;
101using sharded_unwrap_t =
typename sharded_unwrap<T>::type;
109SEASTAR_MODULE_EXPORT_BEGIN
130 template <
typename Service>
friend class sharded;
139template <
typename Service>
143 template <
typename T>
friend class sharded;
/// Stores the given sharded<Service> pointer in _container.
/// The template sharded<T> is declared a friend just above this member,
/// and sharded<>::set_container(T&) invokes it as service.set_container(this).
144 void set_container(
sharded<Service>* container)
noexcept { _container = container; }
161 : _msg(
"sharded instance does not exist: " + type_info) {}
162 virtual const char* what()
const noexcept override {
176template <
typename Service>
181 future<> track_deletion()
noexcept {
183 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
185 return service->freed();
188 return make_ready_future<>();
191 std::vector<entry> _instances;
193 using invoke_on_all_func_type = std::function<future<> (Service&)>;
195 template <
typename U,
bool async>
198 template <
typename T>
199 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
200 set_container(T& service)
noexcept {
201 service.set_container(
this);
204 template <
typename T>
205 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
206 set_container(T&)
noexcept {
210 sharded_parallel_for_each(internal::on_each_shard_func func)
noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
211 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
232 template <
typename... Args>
241 template <
typename... Args>
287 template <
typename Func,
typename... Args>
288 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
296 template <
typename Func,
typename... Args>
297 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
316 template <
typename Func,
typename... Args>
317 requires std::invocable<Func, Service&, Args...>
331 template <
typename Func,
typename... Args>
332 requires std::invocable<Func, Service&, Args...>
345 template <
typename Reducer,
typename Func,
typename... Args>
347 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) ->
typename reducer_traits<Reducer>::future_type
350 boost::make_counting_iterator<unsigned>(_instances.size()),
351 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
352 return smp::submit_to(c, [this, &func, args] () mutable {
353 return std::apply([this, &func] (Args&&... args) mutable {
354 auto inst = get_local_service();
355 return std::invoke(func, *inst, std::forward<Args>(args)...);
358 }, std::forward<Reducer>(r));
362 template <
typename Reducer,
typename Func,
typename... Args>
364 auto map_reduce(Reducer&& r, Func&& func, Args&&... args)
const ->
typename reducer_traits<Reducer>::future_type
367 boost::make_counting_iterator<unsigned>(_instances.size()),
368 [
this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c) {
369 return smp::submit_to(c, [this, &func, args] () {
370 return std::apply([this, &func] (Args&&... args) {
371 auto inst = get_local_service();
372 return std::invoke(func, *inst, std::forward<Args>(args)...);
375 }, std::forward<Reducer>(r));
394 template <
typename Mapper,
typename Initial,
typename Reduce>
398 auto wrapped_map = [
this, map] (
unsigned c) {
400 auto inst = get_local_service();
401 return std::invoke(map, *inst);
405 std::move(wrapped_map),
411 template <
typename Mapper,
typename Initial,
typename Reduce>
415 auto wrapped_map = [
this, map] (
unsigned c) {
417 auto inst = get_local_service();
418 return std::invoke(map, *inst);
422 std::move(wrapped_map),
436 template <
typename Mapper,
typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>,
typename return_type = decltype(
internal::untuple(std::declval<
typename Future::tuple_type>()))>
438 return do_with(std::vector<return_type>(), std::move(mapper),
439 [
this] (std::vector<return_type>& vec, Mapper& mapper)
mutable {
440 vec.resize(_instances.size());
441 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [
this, &vec, &mapper] (
unsigned c) {
442 return smp::submit_to(c, [this, &mapper] {
443 auto inst = get_local_service();
444 return mapper(*inst);
445 }).
then([&vec, c] (
auto&& res) {
446 vec[c] = std::move(res);
449 return make_ready_future<std::vector<return_type>>(std::move(vec));
466 template <
typename Func,
typename... Args,
typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
467 requires std::invocable<Func, Service&, Args&&...>
470 return smp::submit_to(
id, options, [
this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] ()
mutable {
471 auto inst = get_local_service();
472 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
484 template <
typename Func,
typename... Args,
typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
485 requires std::invocable<Func, Service&, Args&&...>
488 return invoke_on(
id,
smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
/// Gets a const reference to the local instance.
/// Declaration only; the out-of-line definition asserts
/// local_is_initialized() before returning.
492 const Service&
local() const noexcept;
/// Gets a mutable reference to the local instance (non-const overload).
495 Service& local() noexcept;
/// Declaration only. Other members of sharded<> assert on this predicate
/// before accessing the local instance.
// NOTE(review): presumably true when the current shard has a constructed
// instance -- the definition is not visible here; confirm.
501 bool local_is_initialized() const noexcept;
504 template <typename... Args>
505 shared_ptr<Service> create_local_service(Args&&... args) {
506 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
519 shared_ptr<const Service> get_local_service()
const {
522 throw no_sharded_instance_exception(pretty_type_name(
typeid(Service)));
533template <
typename Func,
typename... Params>
536 std::tuple<Params...> _params;
547 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
548 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
551 auto evaluate()
const;
553 template <
typename Func_,
typename... Param_>
560SEASTAR_MODULE_EXPORT_END
563template <
typename Service>
565 assert(_instances.empty());
573unwrap_sharded_arg(T&& arg) {
574 return std::forward<T>(arg);
577template <
typename Service>
578either_sharded_or_local<Service>
579unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
580 return either_sharded_or_local<Service>(arg);
583template <
typename Func,
typename... Param>
585unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
586 return sp.evaluate();
/// Conversion to the wrapped sharded<Service>: returns the stored
/// _sharded reference unchanged.
589template <
typename Service>
590either_sharded_or_local<Service>::operator sharded<Service>& () {
return _sharded; }
/// Conversion to a single Service: returns _sharded.local(), i.e. the
/// instance that sharded<Service>::local() yields where the conversion runs.
592template <
typename Service>
593either_sharded_or_local<Service>::operator Service& () {
return _sharded.local(); }
597template <
typename Func,
typename... Param>
599sharded_parameter<Func, Param...>::evaluate()
const {
600 auto unwrap_params_and_invoke = [
this] (
const auto&... params) {
601 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
603 return std::apply(unwrap_params_and_invoke, _params);
606template <
typename Service>
607template <
typename... Args>
611 _instances.resize(smp::count);
612 return sharded_parallel_for_each(
613 [
this, args = std::make_tuple(std::forward<Args>(args)...)] (
unsigned c)
mutable {
615 _instances[
this_shard_id()].service = std::apply([
this] (Args... args) {
616 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
619 }).then_wrapped([
this] (
future<> f) {
622 return make_ready_future<>();
624 return this->stop().then([e = std::current_exception()] ()
mutable {
625 std::rethrow_exception(e);
634template <
typename Service>
635template <
typename... Args>
639 assert(_instances.empty());
640 _instances.resize(1);
641 return smp::submit_to(0, [
this, args = std::make_tuple(std::forward<Args>(args)...)] ()
mutable {
642 _instances[0].service = std::apply([
this] (Args... args) {
643 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
645 }).then_wrapped([
this] (
future<> f) {
648 return make_ready_future<>();
650 return this->stop().then([e = std::current_exception()] ()
mutable {
651 std::rethrow_exception(e);
664struct sharded_has_stop {
669 template <
typename Service>
670 constexpr static auto check(
int) -> std::enable_if_t<(
sizeof(&Service::stop) >= 0),
bool> {
676 static constexpr auto check(...) ->
bool {
681template <
bool stop_exists>
682struct sharded_call_stop {
683 template <
typename Service>
684 static future<> call(Service& instance);
688template <
typename Service>
690future<> sharded_call_stop<true>::call(Service& instance) {
691 return instance.stop();
695template <
typename Service>
697future<> sharded_call_stop<false>::call(Service&) {
698 return make_ready_future<>();
701template <
typename Service>
704stop_sharded_instance(Service& instance) {
705 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
706 return internal::sharded_call_stop<has_stop>::call(instance);
711template <
typename Service>
715 return sharded_parallel_for_each([
this] (
unsigned c)
mutable {
719 return make_ready_future<>();
721 return internal::stop_sharded_instance(*inst);
723 }).then_wrapped([
this] (
future<> fut) {
724 return sharded_parallel_for_each([
this] (
unsigned c) {
730 }).
finally([
this, fut = std::move(fut)] ()
mutable {
732 _instances = std::vector<sharded<Service>::entry>();
733 return std::move(fut);
741template <
typename Service>
745 return sharded_parallel_for_each([
this, options, func = std::move(func)] (
unsigned c) {
747 return func(*get_local_service());
755template <
typename Service>
756template <
typename Func,
typename... Args>
757requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
761 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>,
future<>>,
762 "invoke_on_all()'s func must return void or future<>");
764 return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service)
mutable {
765 return std::apply([&service, &func] (Args&&... args)
mutable {
766 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
774template <
typename Service>
775template <
typename Func,
typename... Args>
776requires std::invocable<Func, Service&, Args...>
780 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>,
future<>>,
781 "invoke_on_others()'s func must return void or future<>");
783 return invoke_on_all(options, [orig =
this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s)
mutable ->
future<> {
784 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
791template <
typename Service>
793 assert(local_is_initialized());
797template <
typename Service>
799 assert(local_is_initialized());
803template <
typename Service>
805 assert(local_is_initialized());
809template <
typename Service>
815SEASTAR_MODULE_EXPORT_BEGIN
840template <
typename PtrType>
841requires (!std::is_pointer_v<PtrType>)
847 void destroy(PtrType p,
unsigned cpu)
noexcept {
851 auto f = destroy_on(std::move(p), cpu);
852 if (!f.available() || f.failed()) {
853 internal::run_in_background(std::move(f));
857 static future<> destroy_on(PtrType p,
unsigned cpu)
noexcept {
870 return make_ready_future<>();
873 using element_type =
typename std::pointer_traits<PtrType>::element_type;
874 using pointer = element_type*;
884 foreign_ptr(PtrType value)
noexcept(std::is_nothrow_move_constructible_v<PtrType>)
885 : _value(std::move(value))
895 destroy(std::move(_value), _cpu);
/// Accesses the wrapped object: returns the result of dereferencing the
/// stored pointer (_value). noexcept exactly when `*_value` is.
905 element_type&
operator*() const noexcept(noexcept(*_value)) {
return *_value; }
/// Accesses the wrapped object through a raw pointer, computed as
/// &*_value so that it works for any pointer-like PtrType.
907 element_type*
operator->() const noexcept(noexcept(&*_value)) {
return &*_value; }
/// Access the raw pointer to the wrapped object.
/// `pointer` is element_type*; the value is computed as &*_value, the
/// same expression operator->() uses.
909 pointer
get() const noexcept(noexcept(&*_value)) {
return &*_value; }
/// Boolean conversion: yields static_cast<bool>(_value), i.e. whatever the
/// wrapped pointer type itself reports when converted to bool.
916 operator bool() const noexcept(noexcept(static_cast<
bool>(_value))) {
return static_cast<bool>(_value); }
919 destroy(std::move(_value), _cpu);
920 _value = std::move(other._value);
929 PtrType
release() noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
930 return std::exchange(_value, {});
935 void reset(PtrType new_ptr)
noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
936 auto old_ptr = std::move(_value);
939 _value = std::move(new_ptr);
942 destroy(std::move(old_ptr), old_cpu);
947 void reset(std::nullptr_t =
nullptr) noexcept(
std::is_nothrow_default_constructible_v<PtrType>) {
955 return destroy_on(std::move(_value), _cpu);
972SEASTAR_MODULE_EXPORT_END
Definition: sharded.hh:120
Definition: shared_ptr.hh:499
Definition: sharded.hh:842
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:909
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:882
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:907
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:935
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:914
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:894
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:884
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:947
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:877
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:918
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:898
future destroy() noexcept
Definition: sharded.hh:954
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:929
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:905
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1425
value_type && get()
Gets the value returned by the computation.
Definition: future.hh:1342
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:156
Provide a sharded service with access to its peers.
Definition: sharded.hh:140
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:990
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:534
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:546
Definition: sharded.hh:177
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:564
future invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:760
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:487
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:743
future start_single(Args &&... args) noexcept
Definition: sharded.hh:637
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:792
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:437
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:414
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:364
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:779
sharded() noexcept
Definition: sharded.hh:216
future stop() noexcept
Definition: sharded.hh:713
future invoke_on_others(Func func, Args... args) noexcept
Definition: sharded.hh:333
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:347
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:397
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:469
future start(Args &&... args) noexcept
Definition: sharded.hh:609
future invoke_on_all(Func func, Args... args) noexcept
Definition: sharded.hh:298
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:963
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:659
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168