Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
sharded.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/smp.hh>
25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/assert.hh>
32#include <seastar/util/log.hh>
33#include <seastar/util/modules.hh>
34
35#ifndef SEASTAR_MODULE
36#include <concepts>
37#include <functional>
38#include <ranges>
39#include <type_traits>
40#endif
41
51
52namespace seastar {
53
54SEASTAR_MODULE_EXPORT_BEGIN
55
56template <typename Func, typename... Param>
57class sharded_parameter;
58
59template <typename Service>
60class sharded;
61
62SEASTAR_MODULE_EXPORT_END
63
64namespace internal {
65
66template <typename Func, typename... Param>
67auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
68
69using on_each_shard_func = std::function<future<> (unsigned shard)>;
70
71future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
72
73template <typename Service>
74class either_sharded_or_local {
75 sharded<Service>& _sharded;
76public:
77 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
78 operator sharded<Service>& ();
79 operator Service& ();
80};
81
82template <typename T>
83struct sharded_unwrap {
84 using evaluated_type = T;
85 using type = T;
86};
87
88template <typename T>
89struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
90 using evaluated_type = T&;
91 using type = either_sharded_or_local<T>;
92};
93
94template <typename Func, typename... Param>
95struct sharded_unwrap<sharded_parameter<Func, Param...>> {
96 using type = std::invoke_result_t<Func, Param...>;
97};
98
99template <typename T>
100using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
101
102template <typename T>
103using sharded_unwrap_t = typename sharded_unwrap<T>::type;
104
105template<typename R>
106concept unsigned_range = std::ranges::range<R>
107 && std::is_unsigned_v<std::ranges::range_value_t<R>>;
108
109} // internal
110
111
114
115SEASTAR_MODULE_EXPORT_BEGIN
116
117template <typename T>
118class sharded;
119
125template<typename T>
127 promise<> _freed;
128protected:
129 async_sharded_service() noexcept = default;
130 virtual ~async_sharded_service() {
131 _freed.set_value();
132 }
133 future<> freed() noexcept {
134 return _freed.get_future();
135 }
136 template <typename Service> friend class sharded;
137};
138
139
145template <typename Service>
147 sharded<Service>* _container = nullptr;
148private:
149 template <typename T> friend class sharded;
150 void set_container(sharded<Service>* container) noexcept { _container = container; }
151public:
152 peering_sharded_service() noexcept = default;
156 sharded<Service>& container() noexcept { return *_container; }
157 const sharded<Service>& container() const noexcept { return *_container; }
158};
159
160
162class no_sharded_instance_exception : public std::exception {
163 sstring _msg;
164public:
165 no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
166 explicit no_sharded_instance_exception(sstring type_info)
167 : _msg("sharded instance does not exist: " + type_info) {}
168 virtual const char* what() const noexcept override {
169 return _msg.c_str();
170 }
171};
172
182template <typename Service>
183class sharded {
184 struct entry {
185 shared_ptr<Service> service;
186
187 future<> track_deletion() noexcept {
188 // do not wait for instance to be deleted if it is not going to notify us
189 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
190 if (service) {
191 return service->freed();
192 }
193 }
194 return make_ready_future<>();
195 }
196 };
197 std::vector<entry> _instances;
198private:
199 using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
200private:
201 template <typename U, bool async>
202 friend struct shared_ptr_make_helper;
203
204 template <typename T>
205 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
206 set_container(T& service) noexcept {
207 service.set_container(this);
208 }
209
210 template <typename T>
211 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
212 set_container(T&) noexcept {
213 }
214
216 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
217 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
218 }
219public:
222 sharded() noexcept {}
223 sharded(const sharded& other) = delete;
224 sharded& operator=(const sharded& other) = delete;
227 sharded(sharded&& other) = delete;
228 sharded& operator=(sharded&& other) = delete;
231
238 template <typename... Args>
239 future<> start(Args&&... args) noexcept;
240
247 template <typename... Args>
248 future<> start_single(Args&&... args) noexcept;
249
254 future<> stop() noexcept;
255
264 future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
265
271 future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept;
272
287 template <typename Func, typename... Args>
288 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
289 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
290 future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
291
297 template <typename Func, typename... Args>
298 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
299 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
300 future<> invoke_on_all(Func func, Args... args) noexcept;
301
312 template <typename Func, typename... Args>
313 requires std::invocable<Func, Service&, Args...>
314 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
315 future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
316
328 template <typename Func, typename... Args>
329 requires std::invocable<Func, Service&, Args...>
330 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
331 future<> invoke_on_others(Func func, Args... args) noexcept;
332
345 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
346 requires std::invocable<Func, Service&, Args&&...>
347 Ret
348 invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args);
349
358 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
359 requires std::invocable<Func, Service&, Args&&...>
360 Ret
361 invoke_on(unsigned id, Func&& func, Args&&... args);
362
374 template <typename R, typename Func, typename... Args>
375 requires std::invocable<Func, Service&, Args...>
376 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
377 && internal::unsigned_range<R>
378 future<>
379 invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept;
380
392 template <typename R, typename Func, typename... Args>
393 requires std::invocable<Func, Service&, Args...>
394 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
395 && internal::unsigned_range<R>
396 future<>
397 invoke_on(R range, Func func, Args... args) noexcept;
398
403 template <typename Reducer, typename Func, typename... Args>
404 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
405 {
406 auto rng = std::views::iota(size_t(0), _instances.size());
407 return ::seastar::map_reduce(rng.begin(), rng.end(),
408 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
409 return smp::submit_to(c, [this, &func, args] () mutable {
410 return std::apply([this, &func] (Args&&... args) mutable {
411 auto inst = get_local_service();
412 return std::invoke(func, *inst, std::forward<Args>(args)...);
413 }, std::move(args));
414 });
415 }, std::forward<Reducer>(r));
416 }
417
419 template <typename Reducer, typename Func, typename... Args>
420 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
421 {
422 auto rng = std::views::iota(size_t(0), _instances.size());
423 return ::seastar::map_reduce(rng.begin(), rng.end(),
424 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
425 return smp::submit_to(c, [this, &func, args] () {
426 return std::apply([this, &func] (Args&&... args) {
427 auto inst = get_local_service();
428 return std::invoke(func, *inst, std::forward<Args>(args)...);
429 }, std::move(args));
430 });
431 }, std::forward<Reducer>(r));
432 }
433
450 template <typename Mapper, typename Initial, typename Reduce>
451 future<Initial>
452 map_reduce0(Mapper map, Initial initial, Reduce reduce) {
453 auto wrapped_map = [this, map] (unsigned c) {
454 return smp::submit_to(c, [this, map] {
455 auto inst = get_local_service();
456 return std::invoke(map, *inst);
457 });
458 };
459 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
460 std::move(wrapped_map),
461 std::move(initial),
462 std::move(reduce));
463 }
464
466 template <typename Mapper, typename Initial, typename Reduce>
468 map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
469 auto wrapped_map = [this, map] (unsigned c) {
470 return smp::submit_to(c, [this, map] {
471 auto inst = get_local_service();
472 return std::invoke(map, *inst);
473 });
474 };
475 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
476 std::move(wrapped_map),
477 std::move(initial),
478 std::move(reduce));
479 }
480
490 template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
492 return do_with(std::vector<return_type>(), std::move(mapper),
493 [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
494 vec.resize(_instances.size());
495 return parallel_for_each(std::views::iota(0u, _instances.size()), [this, &vec, &mapper] (unsigned c) {
496 return smp::submit_to(c, [this, &mapper] {
497 auto inst = get_local_service();
498 return mapper(*inst);
499 }).then([&vec, c] (auto&& res) {
500 vec[c] = std::move(res);
501 });
502 }).then([&vec] {
503 return make_ready_future<std::vector<return_type>>(std::move(vec));
504 });
505 });
506 }
507
509 const Service& local() const noexcept;
510
512 Service& local() noexcept;
513
515 shared_ptr<Service> local_shared() noexcept;
516
518 bool local_is_initialized() const noexcept;
519
520private:
521 template <typename... Args>
522 shared_ptr<Service> create_local_service(Args&&... args) {
523 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
524 set_container(*s);
525 return s;
526 }
527
528 shared_ptr<Service> get_local_service() {
529 auto inst = _instances[this_shard_id()].service;
530 if (!inst) {
531 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
532 }
533 return inst;
534 }
535
536 shared_ptr<const Service> get_local_service() const {
537 auto inst = _instances[this_shard_id()].service;
538 if (!inst) {
539 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
540 }
541 return inst;
542 }
543};
544
545
550template <typename Func, typename... Params>
552 Func _func;
553 std::tuple<Params...> _params;
554public:
563 explicit sharded_parameter(Func func, Params... params)
564 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
565 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
566 }
567private:
568 auto evaluate() const;
569
570 template <typename Func_, typename... Param_>
571 friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
572};
573
577SEASTAR_MODULE_EXPORT_END
579
580template <typename Service>
582 SEASTAR_ASSERT(_instances.empty());
583}
584
585namespace internal {
586
/// Fallback overload: an ordinary argument is forwarded through untouched,
/// keeping its value category.
template <typename T>
T&&
unwrap_sharded_arg(T&& arg) {
    return static_cast<T&&>(arg);
}
592
593template <typename Service>
594either_sharded_or_local<Service>
595unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
596 return either_sharded_or_local<Service>(arg);
597}
598
599template <typename Func, typename... Param>
600auto
601unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
602 return sp.evaluate();
603}
604
605template <typename Service>
606either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
607
608template <typename Service>
609either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
610
611}
612
613template <typename Func, typename... Param>
614auto
615sharded_parameter<Func, Param...>::evaluate() const {
616 auto unwrap_params_and_invoke = [this] (const auto&... params) {
617 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
618 };
619 return std::apply(unwrap_params_and_invoke, _params);
620}
621
622template <typename Service>
623template <typename... Args>
624future<>
625sharded<Service>::start(Args&&... args) noexcept {
626 try {
627 _instances.resize(smp::count);
628 return sharded_parallel_for_each(
629 [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
630 return smp::submit_to(c, [this, args] () mutable {
631 _instances[this_shard_id()].service = std::apply([this] (Args... args) {
632 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
633 }, args);
634 });
635 }).then_wrapped([this] (future<> f) {
636 try {
637 f.get();
638 return make_ready_future<>();
639 } catch (...) {
640 return this->stop().then([e = std::current_exception()] () mutable {
641 std::rethrow_exception(e);
642 });
643 }
644 });
645 } catch (...) {
647 }
648}
649
650template <typename Service>
651template <typename... Args>
653sharded<Service>::start_single(Args&&... args) noexcept {
654 try {
655 SEASTAR_ASSERT(_instances.empty());
656 _instances.resize(1);
657 return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
658 _instances[0].service = std::apply([this] (Args... args) {
659 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
660 }, args);
661 }).then_wrapped([this] (future<> f) {
662 try {
663 f.get();
664 return make_ready_future<>();
665 } catch (...) {
666 return this->stop().then([e = std::current_exception()] () mutable {
667 std::rethrow_exception(e);
668 });
669 }
670 });
671 } catch (...) {
673 }
674}
675
676namespace internal {
677
// Helper to check whether Service::stop exists.

struct sharded_has_stop {
    // If a member named "stop" exists, try to call it, even if it doesn't
    // have the correct signature. This is so that we don't ignore a function
    // named stop() just because the signature is incorrect, and instead
    // force the user to resolve the ambiguity.
    //
    // Detection is done by naming &Service::stop inside decltype: when the
    // expression is ill-formed this overload drops out and the variadic
    // fallback below is chosen.
    template <typename Service>
    constexpr static auto check(int) -> decltype(static_cast<void>(&Service::stop), bool()) {
        return true;
    }

    // Fallback in case Service::stop doesn't exist.
    template <typename>
    static constexpr auto check(...) -> bool {
        return false;
    }
};
696
697template <bool stop_exists>
698struct sharded_call_stop {
699 template <typename Service>
700 static future<> call(Service& instance);
701};
702
703template <>
704template <typename Service>
705future<> sharded_call_stop<true>::call(Service& instance) {
706 return instance.stop();
707}
708
709template <>
710template <typename Service>
711future<> sharded_call_stop<false>::call(Service&) {
712 return make_ready_future<>();
713}
714
715template <typename Service>
716future<>
717stop_sharded_instance(Service& instance) {
718 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
719 return internal::sharded_call_stop<has_stop>::call(instance);
720}
721
722}
723
724template <typename Service>
725future<>
727 try {
728 return sharded_parallel_for_each([this] (unsigned c) mutable {
729 return smp::submit_to(c, [this] () mutable {
730 auto inst = _instances[this_shard_id()].service;
731 if (!inst) {
732 return make_ready_future<>();
733 }
734 return internal::stop_sharded_instance(*inst);
735 });
736 }).then_wrapped([this] (future<> fut) {
737 return sharded_parallel_for_each([this] (unsigned c) {
738 return smp::submit_to(c, [this] {
739 auto fut = _instances[this_shard_id()].track_deletion();
740 _instances[this_shard_id()].service = nullptr;
741 return fut;
742 });
743 }).finally([this, fut = std::move(fut)] () mutable {
744 _instances.clear();
745 _instances = std::vector<sharded<Service>::entry>();
746 return std::move(fut);
747 });
748 });
749 } catch (...) {
751 }
752}
753
754template <typename Service>
756sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
757 try {
758 return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
759 return smp::submit_to(c, options, [this, func] {
760 return func(*get_local_service());
761 });
762 });
763 } catch (...) {
765 }
766}
767
768template <typename Service>
770sharded<Service>::invoke_on_all(std::function<future<> (Service&)> func) noexcept {
771 try {
772 return invoke_on_all(smp_submit_to_options{}, std::move(func));
773 } catch (...) {
775 }
776}
777
778template <typename Service>
779template <typename Func, typename... Args>
780requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
781 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
783sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
784 try {
785 return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
786 return std::apply([&service, &func] (Args&&... args) mutable {
787 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
788 }, std::move(args));
789 }));
790 } catch (...) {
792 }
793}
794
795template <typename Service>
796template <typename Func, typename... Args>
797requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
798 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
800sharded<Service>::invoke_on_all(Func func, Args... args) noexcept {
801 try {
802 return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
803 } catch (...) {
805 }
806}
807
808template <typename Service>
809template <typename Func, typename... Args>
810requires std::invocable<Func, Service&, Args...>
811 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
813sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
814 try {
815 return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) mutable -> future<> {
816 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
817 });
818 } catch (...) {
820 }
821}
822
823template <typename Service>
824template <typename Func, typename... Args>
825requires std::invocable<Func, Service&, Args...>
826 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
828sharded<Service>::invoke_on_others(Func func, Args... args) noexcept {
829 try {
830 return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
831 } catch (...) {
833 }
834}
835
836template <typename Service>
837template <typename Func, typename... Args, typename Ret>
838requires std::invocable<Func, Service&, Args&&...>
839Ret
840sharded<Service>::invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
841 return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
842 auto inst = get_local_service();
843 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
844 });
845}
846
847template <typename Service>
848template <typename Func, typename... Args, typename Ret>
849requires std::invocable<Func, Service&, Args&&...>
850Ret
851sharded<Service>::invoke_on(unsigned id, Func&& func, Args&&... args) {
852 return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
853}
854
855template <typename Service>
856template <typename R, typename Func, typename... Args>
857requires std::invocable<Func, Service&, Args...>
858 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
859 && internal::unsigned_range<R>
861sharded<Service>::invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept {
862 try {
863 auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
864 // Avoid false-positive unused-lambda-capture warning on Clang
865 (void)args;
866 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
867 });
868 return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) {
869 if (s > smp::count - 1) {
870 throw std::invalid_argument(format("Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
871 }
872 return smp::submit_to(s, options, [this, func] {
873 return func(*get_local_service());
874 });
875 });
876 } catch(...) {
878 }
879}
880
881template <typename Service>
882template <typename R, typename Func, typename... Args>
883requires std::invocable<Func, Service&, Args...>
884 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
885 && internal::unsigned_range<R>
887sharded<Service>::invoke_on(R range, Func func, Args... args) noexcept {
888 try {
889 return invoke_on(std::forward<R>(range), smp_submit_to_options{}, std::move(func), std::move(args)...);
890 } catch(...) {
892 }
893}
894
895template <typename Service>
896const Service& sharded<Service>::local() const noexcept {
897 SEASTAR_ASSERT(local_is_initialized());
898 return *_instances[this_shard_id()].service;
899}
900
901template <typename Service>
902Service& sharded<Service>::local() noexcept {
903 SEASTAR_ASSERT(local_is_initialized());
904 return *_instances[this_shard_id()].service;
905}
906
907template <typename Service>
909 SEASTAR_ASSERT(local_is_initialized());
910 return _instances[this_shard_id()].service;
911}
912
913template <typename Service>
915 return _instances.size() > this_shard_id() &&
916 _instances[this_shard_id()].service;
917}
918
919SEASTAR_MODULE_EXPORT_BEGIN
944template <typename PtrType>
945requires (!std::is_pointer_v<PtrType>)
947private:
948 PtrType _value;
949 unsigned _cpu;
950private:
951 void destroy(PtrType p, unsigned cpu) noexcept {
952 // `destroy()` is called from the destructor and other
953 // synchronous methods (like `reset()`), that have no way to
954 // wait for this future.
955 auto f = destroy_on(std::move(p), cpu);
956 if (!f.available() || f.failed()) {
957 internal::run_in_background(std::move(f));
958 }
959 }
960
961 static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
962 if (p) {
963 if (cpu != this_shard_id()) {
964 return smp::submit_to(cpu, [v = std::move(p)] () mutable {
965 // Destroy the contained pointer. We do this explicitly
966 // in the current shard, because the lambda is destroyed
967 // in the shard that submitted the task.
968 v = {};
969 });
970 } else {
971 p = {};
972 }
973 }
974 return make_ready_future<>();
975 }
976public:
977 using element_type = typename std::pointer_traits<PtrType>::element_type;
978 using pointer = element_type*;
979
981 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
982 : _value(PtrType())
983 , _cpu(this_shard_id()) {
984 }
986 foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
988 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
989 : _value(std::move(value))
990 , _cpu(this_shard_id()) {
991 }
992 // The type is intentionally non-copyable because copies
993 // are expensive because each copy requires across-CPU call.
994 foreign_ptr(const foreign_ptr&) = delete;
996 foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
999 destroy(std::move(_value), _cpu);
1000 }
1002 future<foreign_ptr> copy() const noexcept {
1003 return smp::submit_to(_cpu, [this] () mutable {
1004 auto v = _value;
1005 return make_foreign(std::move(v));
1006 });
1007 }
1009 element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
1011 element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
1013 pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
1018 unsigned get_owner_shard() const noexcept { return _cpu; }
1020 operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
1022 foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1023 destroy(std::move(_value), _cpu);
1024 _value = std::move(other._value);
1025 _cpu = other._cpu;
1026 return *this;
1027 }
1033 PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
1034 return std::exchange(_value, {});
1035 }
1039 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1040 auto old_ptr = std::move(_value);
1041 auto old_cpu = _cpu;
1042
1043 _value = std::move(new_ptr);
1044 _cpu = this_shard_id();
1045
1046 destroy(std::move(old_ptr), old_cpu);
1047 }
1051 void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
1052 reset(PtrType());
1053 }
1054
1058 future<> destroy() noexcept {
1059 return destroy_on(std::move(_value), _cpu);
1060 }
1061};
1062
1066template <typename T>
1068 return foreign_ptr<T>(std::move(ptr));
1069}
1070
1072
1073template<typename T>
1074struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
1075
1076SEASTAR_MODULE_EXPORT_END
1077
1078}
Definition: sharded.hh:126
Definition: shared_ptr.hh:493
Definition: sharded.hh:946
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:1013
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:986
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:1011
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:1039
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:1018
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:998
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:988
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1051
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:981
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:1022
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:1002
future destroy() noexcept
Definition: sharded.hh:1058
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1033
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:1009
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1358
value_type && get()
gets the value returned by the computation
Definition: future.hh:1299
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:162
Provide a sharded service with access to its peers.
Definition: sharded.hh:146
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:968
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:551
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:563
Definition: sharded.hh:183
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:581
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:756
future start_single(Args &&... args) noexcept
Definition: sharded.hh:653
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:896
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:491
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:468
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:420
sharded() noexcept
Definition: sharded.hh:222
future stop() noexcept
Definition: sharded.hh:726
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:813
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:404
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:452
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:840
future start(Args &&... args) noexcept
Definition: sharded.hh:625
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1852
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:572
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:1067
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: format.hh:42
shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:653
STL namespace.
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168