Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
sharded.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/smp.hh>
25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/log.hh>
32#include <seastar/util/modules.hh>
33
34#ifndef SEASTAR_MODULE
35#include <concepts>
36#include <functional>
37#include <ranges>
38#include <type_traits>
39#endif
40
50
51namespace seastar {
52
53SEASTAR_MODULE_EXPORT_BEGIN
54
55template <typename Func, typename... Param>
56class sharded_parameter;
57
58template <typename Service>
59class sharded;
60
61SEASTAR_MODULE_EXPORT_END
62
63namespace internal {
64
65template <typename Func, typename... Param>
66auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
67
68using on_each_shard_func = std::function<future<> (unsigned shard)>;
69
70future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
71
72template <typename Service>
73class either_sharded_or_local {
74 sharded<Service>& _sharded;
75public:
76 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
77 operator sharded<Service>& ();
78 operator Service& ();
79};
80
81template <typename T>
82struct sharded_unwrap {
83 using evaluated_type = T;
84 using type = T;
85};
86
87template <typename T>
88struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
89 using evaluated_type = T&;
90 using type = either_sharded_or_local<T>;
91};
92
93template <typename Func, typename... Param>
94struct sharded_unwrap<sharded_parameter<Func, Param...>> {
95 using type = std::invoke_result_t<Func, Param...>;
96};
97
98template <typename T>
99using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
100
101template <typename T>
102using sharded_unwrap_t = typename sharded_unwrap<T>::type;
103
104template<typename R>
105concept unsigned_range = std::ranges::range<R>
106 && std::is_unsigned_v<std::ranges::range_value_t<R>>;
107
108} // internal
109
110
113
114SEASTAR_MODULE_EXPORT_BEGIN
115
116template <typename T>
117class sharded;
118
124template<typename T>
126 promise<> _freed;
127protected:
128 async_sharded_service() noexcept = default;
129 virtual ~async_sharded_service() {
130 _freed.set_value();
131 }
132 future<> freed() noexcept {
133 return _freed.get_future();
134 }
135 template <typename Service> friend class sharded;
136};
137
138
144template <typename Service>
146 sharded<Service>* _container = nullptr;
147private:
148 template <typename T> friend class sharded;
149 void set_container(sharded<Service>* container) noexcept { _container = container; }
150public:
151 peering_sharded_service() noexcept = default;
155 sharded<Service>& container() noexcept { return *_container; }
156 const sharded<Service>& container() const noexcept { return *_container; }
157};
158
159
161class no_sharded_instance_exception : public std::exception {
162 sstring _msg;
163public:
164 no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
165 explicit no_sharded_instance_exception(sstring type_info)
166 : _msg("sharded instance does not exist: " + type_info) {}
167 virtual const char* what() const noexcept override {
168 return _msg.c_str();
169 }
170};
171
181template <typename Service>
182class sharded {
183 struct entry {
184 shared_ptr<Service> service;
185
186 future<> track_deletion() noexcept {
187 // do not wait for instance to be deleted if it is not going to notify us
188 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
189 if (service) {
190 return service->freed();
191 }
192 }
193 return make_ready_future<>();
194 }
195 };
196 std::vector<entry> _instances;
197private:
198 using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
199private:
200 template <typename U, bool async>
201 friend struct shared_ptr_make_helper;
202
203 template <typename T>
204 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
205 set_container(T& service) noexcept {
206 service.set_container(this);
207 }
208
209 template <typename T>
210 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
211 set_container(T&) noexcept {
212 }
213
215 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
216 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
217 }
218public:
221 sharded() noexcept {}
222 sharded(const sharded& other) = delete;
223 sharded& operator=(const sharded& other) = delete;
226 sharded(sharded&& other) = delete;
227 sharded& operator=(sharded&& other) = delete;
230
237 template <typename... Args>
238 future<> start(Args&&... args) noexcept;
239
246 template <typename... Args>
247 future<> start_single(Args&&... args) noexcept;
248
253 future<> stop() noexcept;
254
263 future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
264
270 future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept;
271
286 template <typename Func, typename... Args>
287 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
288 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
289 future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
290
296 template <typename Func, typename... Args>
297 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
298 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
299 future<> invoke_on_all(Func func, Args... args) noexcept;
300
311 template <typename Func, typename... Args>
312 requires std::invocable<Func, Service&, Args...>
313 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
314 future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
315
327 template <typename Func, typename... Args>
328 requires std::invocable<Func, Service&, Args...>
329 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
330 future<> invoke_on_others(Func func, Args... args) noexcept;
331
344 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
345 requires std::invocable<Func, Service&, Args&&...>
346 Ret
347 invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args);
348
357 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
358 requires std::invocable<Func, Service&, Args&&...>
359 Ret
360 invoke_on(unsigned id, Func&& func, Args&&... args);
361
373 template <typename R, typename Func, typename... Args>
374 requires std::invocable<Func, Service&, Args...>
375 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
376 && internal::unsigned_range<R>
377 future<>
378 invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept;
379
391 template <typename R, typename Func, typename... Args>
392 requires std::invocable<Func, Service&, Args...>
393 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
394 && internal::unsigned_range<R>
395 future<>
396 invoke_on(R range, Func func, Args... args) noexcept;
397
402 template <typename Reducer, typename Func, typename... Args>
403 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
404 {
405 auto rng = std::views::iota(size_t(0), _instances.size());
406 return ::seastar::map_reduce(rng.begin(), rng.end(),
407 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
408 return smp::submit_to(c, [this, &func, args] () mutable {
409 return std::apply([this, &func] (Args&&... args) mutable {
410 auto inst = get_local_service();
411 return std::invoke(func, *inst, std::forward<Args>(args)...);
412 }, std::move(args));
413 });
414 }, std::forward<Reducer>(r));
415 }
416
418 template <typename Reducer, typename Func, typename... Args>
419 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
420 {
421 auto rng = std::views::iota(size_t(0), _instances.size());
422 return ::seastar::map_reduce(rng.begin(), rng.end(),
423 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
424 return smp::submit_to(c, [this, &func, args] () {
425 return std::apply([this, &func] (Args&&... args) {
426 auto inst = get_local_service();
427 return std::invoke(func, *inst, std::forward<Args>(args)...);
428 }, std::move(args));
429 });
430 }, std::forward<Reducer>(r));
431 }
432
449 template <typename Mapper, typename Initial, typename Reduce>
450 future<Initial>
451 map_reduce0(Mapper map, Initial initial, Reduce reduce) {
452 auto wrapped_map = [this, map] (unsigned c) {
453 return smp::submit_to(c, [this, map] {
454 auto inst = get_local_service();
455 return std::invoke(map, *inst);
456 });
457 };
458 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
459 std::move(wrapped_map),
460 std::move(initial),
461 std::move(reduce));
462 }
463
465 template <typename Mapper, typename Initial, typename Reduce>
467 map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
468 auto wrapped_map = [this, map] (unsigned c) {
469 return smp::submit_to(c, [this, map] {
470 auto inst = get_local_service();
471 return std::invoke(map, *inst);
472 });
473 };
474 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
475 std::move(wrapped_map),
476 std::move(initial),
477 std::move(reduce));
478 }
479
489 template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
491 return do_with(std::vector<return_type>(), std::move(mapper),
492 [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
493 vec.resize(_instances.size());
494 return parallel_for_each(std::views::iota(0u, _instances.size()), [this, &vec, &mapper] (unsigned c) {
495 return smp::submit_to(c, [this, &mapper] {
496 auto inst = get_local_service();
497 return mapper(*inst);
498 }).then([&vec, c] (auto&& res) {
499 vec[c] = std::move(res);
500 });
501 }).then([&vec] {
502 return make_ready_future<std::vector<return_type>>(std::move(vec));
503 });
504 });
505 }
506
508 const Service& local() const noexcept;
509
511 Service& local() noexcept;
512
514 shared_ptr<Service> local_shared() noexcept;
515
517 bool local_is_initialized() const noexcept;
518
519private:
520 template <typename... Args>
521 shared_ptr<Service> create_local_service(Args&&... args) {
522 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
523 set_container(*s);
524 return s;
525 }
526
527 shared_ptr<Service> get_local_service() {
528 auto inst = _instances[this_shard_id()].service;
529 if (!inst) {
530 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
531 }
532 return inst;
533 }
534
535 shared_ptr<const Service> get_local_service() const {
536 auto inst = _instances[this_shard_id()].service;
537 if (!inst) {
538 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
539 }
540 return inst;
541 }
542};
543
544
549template <typename Func, typename... Params>
551 Func _func;
552 std::tuple<Params...> _params;
553public:
562 explicit sharded_parameter(Func func, Params... params)
563 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
564 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
565 }
566private:
567 auto evaluate() const;
568
569 template <typename Func_, typename... Param_>
570 friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
571};
572
576SEASTAR_MODULE_EXPORT_END
578
579template <typename Service>
581 assert(_instances.empty());
582}
583
584namespace internal {
585
/// Fallback overload: an argument that needs no unwrapping is passed
/// through unchanged, keeping its value category.
template <typename T>
T&&
unwrap_sharded_arg(T&& arg) {
    return static_cast<T&&>(arg);
}
591
592template <typename Service>
593either_sharded_or_local<Service>
594unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
595 return either_sharded_or_local<Service>(arg);
596}
597
598template <typename Func, typename... Param>
599auto
600unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
601 return sp.evaluate();
602}
603
604template <typename Service>
605either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
606
607template <typename Service>
608either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
609
610}
611
612template <typename Func, typename... Param>
613auto
614sharded_parameter<Func, Param...>::evaluate() const {
615 auto unwrap_params_and_invoke = [this] (const auto&... params) {
616 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
617 };
618 return std::apply(unwrap_params_and_invoke, _params);
619}
620
621template <typename Service>
622template <typename... Args>
623future<>
624sharded<Service>::start(Args&&... args) noexcept {
625 try {
626 _instances.resize(smp::count);
627 return sharded_parallel_for_each(
628 [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
629 return smp::submit_to(c, [this, args] () mutable {
630 _instances[this_shard_id()].service = std::apply([this] (Args... args) {
631 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
632 }, args);
633 });
634 }).then_wrapped([this] (future<> f) {
635 try {
636 f.get();
637 return make_ready_future<>();
638 } catch (...) {
639 return this->stop().then([e = std::current_exception()] () mutable {
640 std::rethrow_exception(e);
641 });
642 }
643 });
644 } catch (...) {
646 }
647}
648
649template <typename Service>
650template <typename... Args>
652sharded<Service>::start_single(Args&&... args) noexcept {
653 try {
654 assert(_instances.empty());
655 _instances.resize(1);
656 return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
657 _instances[0].service = std::apply([this] (Args... args) {
658 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
659 }, args);
660 }).then_wrapped([this] (future<> f) {
661 try {
662 f.get();
663 return make_ready_future<>();
664 } catch (...) {
665 return this->stop().then([e = std::current_exception()] () mutable {
666 std::rethrow_exception(e);
667 });
668 }
669 });
670 } catch (...) {
672 }
673}
674
675namespace internal {
676
// Helper: detects whether Service has a member named "stop".

struct sharded_has_stop {
    // If a member named "stop" exists, report true even if it doesn't
    // have the correct signature. This is so that we don't ignore a
    // function named stop() just because the signature is incorrect, and
    // instead force the user to resolve the ambiguity.
    //
    // Selected only when `&Service::stop` is a valid expression; the
    // `int` parameter makes it the better match for check<Service>(0).
    template <typename Service>
    static constexpr auto check(int) -> decltype(static_cast<void>(&Service::stop), bool()) {
        return true;
    }

    // Fallback in case Service::stop doesn't exist.
    template <typename>
    static constexpr bool check(...) {
        return false;
    }
};
695
696template <bool stop_exists>
697struct sharded_call_stop {
698 template <typename Service>
699 static future<> call(Service& instance);
700};
701
702template <>
703template <typename Service>
704future<> sharded_call_stop<true>::call(Service& instance) {
705 return instance.stop();
706}
707
708template <>
709template <typename Service>
710future<> sharded_call_stop<false>::call(Service&) {
711 return make_ready_future<>();
712}
713
714template <typename Service>
715future<>
716stop_sharded_instance(Service& instance) {
717 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
718 return internal::sharded_call_stop<has_stop>::call(instance);
719}
720
721}
722
723template <typename Service>
724future<>
726 try {
727 return sharded_parallel_for_each([this] (unsigned c) mutable {
728 return smp::submit_to(c, [this] () mutable {
729 auto inst = _instances[this_shard_id()].service;
730 if (!inst) {
731 return make_ready_future<>();
732 }
733 return internal::stop_sharded_instance(*inst);
734 });
735 }).then_wrapped([this] (future<> fut) {
736 return sharded_parallel_for_each([this] (unsigned c) {
737 return smp::submit_to(c, [this] {
738 auto fut = _instances[this_shard_id()].track_deletion();
739 _instances[this_shard_id()].service = nullptr;
740 return fut;
741 });
742 }).finally([this, fut = std::move(fut)] () mutable {
743 _instances.clear();
744 _instances = std::vector<sharded<Service>::entry>();
745 return std::move(fut);
746 });
747 });
748 } catch (...) {
750 }
751}
752
753template <typename Service>
755sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
756 try {
757 return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
758 return smp::submit_to(c, options, [this, func] {
759 return func(*get_local_service());
760 });
761 });
762 } catch (...) {
764 }
765}
766
767template <typename Service>
769sharded<Service>::invoke_on_all(std::function<future<> (Service&)> func) noexcept {
770 try {
771 return invoke_on_all(smp_submit_to_options{}, std::move(func));
772 } catch (...) {
774 }
775}
776
777template <typename Service>
778template <typename Func, typename... Args>
779requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
780 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
782sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
783 try {
784 return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
785 return std::apply([&service, &func] (Args&&... args) mutable {
786 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
787 }, std::move(args));
788 }));
789 } catch (...) {
791 }
792}
793
794template <typename Service>
795template <typename Func, typename... Args>
796requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
797 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
799sharded<Service>::invoke_on_all(Func func, Args... args) noexcept {
800 try {
801 return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
802 } catch (...) {
804 }
805}
806
807template <typename Service>
808template <typename Func, typename... Args>
809requires std::invocable<Func, Service&, Args...>
810 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
812sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
813 try {
814 return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) mutable -> future<> {
815 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
816 });
817 } catch (...) {
819 }
820}
821
822template <typename Service>
823template <typename Func, typename... Args>
824requires std::invocable<Func, Service&, Args...>
825 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>
827sharded<Service>::invoke_on_others(Func func, Args... args) noexcept {
828 try {
829 return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
830 } catch (...) {
832 }
833}
834
835template <typename Service>
836template <typename Func, typename... Args, typename Ret>
837requires std::invocable<Func, Service&, Args&&...>
838Ret
839sharded<Service>::invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
840 return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
841 auto inst = get_local_service();
842 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
843 });
844}
845
846template <typename Service>
847template <typename Func, typename... Args, typename Ret>
848requires std::invocable<Func, Service&, Args&&...>
849Ret
850sharded<Service>::invoke_on(unsigned id, Func&& func, Args&&... args) {
851 return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
852}
853
854template <typename Service>
855template <typename R, typename Func, typename... Args>
856requires std::invocable<Func, Service&, Args...>
857 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
858 && internal::unsigned_range<R>
860sharded<Service>::invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept {
861 try {
862 auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
863 // Avoid false-positive unused-lambda-capture warning on Clang
864 (void)args;
865 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
866 });
867 return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) {
868 if (s > smp::count - 1) {
869 throw std::invalid_argument(format("Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
870 }
871 return smp::submit_to(s, options, [this, func] {
872 return func(*get_local_service());
873 });
874 });
875 } catch(...) {
877 }
878}
879
880template <typename Service>
881template <typename R, typename Func, typename... Args>
882requires std::invocable<Func, Service&, Args...>
883 && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
884 && internal::unsigned_range<R>
886sharded<Service>::invoke_on(R range, Func func, Args... args) noexcept {
887 try {
888 return invoke_on(std::forward<R>(range), smp_submit_to_options{}, std::move(func), std::move(args)...);
889 } catch(...) {
891 }
892}
893
894template <typename Service>
895const Service& sharded<Service>::local() const noexcept {
896 assert(local_is_initialized());
897 return *_instances[this_shard_id()].service;
898}
899
900template <typename Service>
901Service& sharded<Service>::local() noexcept {
902 assert(local_is_initialized());
903 return *_instances[this_shard_id()].service;
904}
905
906template <typename Service>
908 assert(local_is_initialized());
909 return _instances[this_shard_id()].service;
910}
911
912template <typename Service>
914 return _instances.size() > this_shard_id() &&
915 _instances[this_shard_id()].service;
916}
917
918SEASTAR_MODULE_EXPORT_BEGIN
943template <typename PtrType>
944requires (!std::is_pointer_v<PtrType>)
946private:
947 PtrType _value;
948 unsigned _cpu;
949private:
950 void destroy(PtrType p, unsigned cpu) noexcept {
951 // `destroy()` is called from the destructor and other
952 // synchronous methods (like `reset()`), that have no way to
953 // wait for this future.
954 auto f = destroy_on(std::move(p), cpu);
955 if (!f.available() || f.failed()) {
956 internal::run_in_background(std::move(f));
957 }
958 }
959
960 static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
961 if (p) {
962 if (cpu != this_shard_id()) {
963 return smp::submit_to(cpu, [v = std::move(p)] () mutable {
964 // Destroy the contained pointer. We do this explicitly
965 // in the current shard, because the lambda is destroyed
966 // in the shard that submitted the task.
967 v = {};
968 });
969 } else {
970 p = {};
971 }
972 }
973 return make_ready_future<>();
974 }
975public:
976 using element_type = typename std::pointer_traits<PtrType>::element_type;
977 using pointer = element_type*;
978
980 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
981 : _value(PtrType())
982 , _cpu(this_shard_id()) {
983 }
985 foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
987 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
988 : _value(std::move(value))
989 , _cpu(this_shard_id()) {
990 }
991 // The type is intentionally non-copyable because copies
992 // are expensive because each copy requires across-CPU call.
993 foreign_ptr(const foreign_ptr&) = delete;
995 foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
998 destroy(std::move(_value), _cpu);
999 }
1001 future<foreign_ptr> copy() const noexcept {
1002 return smp::submit_to(_cpu, [this] () mutable {
1003 auto v = _value;
1004 return make_foreign(std::move(v));
1005 });
1006 }
1008 element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
1010 element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
1012 pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
1017 unsigned get_owner_shard() const noexcept { return _cpu; }
1019 operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
1021 foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1022 destroy(std::move(_value), _cpu);
1023 _value = std::move(other._value);
1024 _cpu = other._cpu;
1025 return *this;
1026 }
1032 PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
1033 return std::exchange(_value, {});
1034 }
1038 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
1039 auto old_ptr = std::move(_value);
1040 auto old_cpu = _cpu;
1041
1042 _value = std::move(new_ptr);
1043 _cpu = this_shard_id();
1044
1045 destroy(std::move(old_ptr), old_cpu);
1046 }
1050 void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
1051 reset(PtrType());
1052 }
1053
1057 future<> destroy() noexcept {
1058 return destroy_on(std::move(_value), _cpu);
1059 }
1060};
1061
1065template <typename T>
1067 return foreign_ptr<T>(std::move(ptr));
1068}
1069
1071
1072template<typename T>
1073struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
1074
1075SEASTAR_MODULE_EXPORT_END
1076
1077}
Definition: sharded.hh:125
Definition: shared_ptr.hh:493
Definition: sharded.hh:945
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:1012
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:985
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:1010
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:1038
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:1017
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:997
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:987
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1050
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:980
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:1021
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:1001
future<> destroy() noexcept
Definition: sharded.hh:1057
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:1032
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:1008
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1404
value_type && get()
gets the value returned by the computation
Definition: future.hh:1321
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:161
Provide a sharded service with access to its peers.
Definition: sharded.hh:145
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:969
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:550
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:562
Definition: sharded.hh:182
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:580
future<> invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:755
future<> start_single(Args &&... args) noexcept
Definition: sharded.hh:652
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:895
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:490
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:467
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:419
sharded() noexcept
Definition: sharded.hh:221
future<> stop() noexcept
Definition: sharded.hh:725
future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:812
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:403
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:451
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:839
future<> start(Args &&... args) noexcept
Definition: sharded.hh:624
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1905
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1941
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:571
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:1066
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: format.hh:42
shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:653
STL namespace.
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168