Seastar
High performance C++ framework for concurrent servers
sharded.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/smp.hh>
25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
27#include <seastar/core/internal/run_in_background.hh>
28#include <seastar/util/is_smart_ptr.hh>
29#include <seastar/util/tuple_utils.hh>
30#include <seastar/core/do_with.hh>
31#include <seastar/util/log.hh>
32#include <seastar/util/modules.hh>
33
34#ifndef SEASTAR_MODULE
35#include <boost/iterator/counting_iterator.hpp>
36#include <concepts>
37#include <functional>
38#endif
39
49
50namespace seastar {
51
52SEASTAR_MODULE_EXPORT_BEGIN
53
54template <typename Func, typename... Param>
55class sharded_parameter;
56
57template <typename Service>
58class sharded;
59
60SEASTAR_MODULE_EXPORT_END
61
62namespace internal {
63
64template <typename Func, typename... Param>
65auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
66
67using on_each_shard_func = std::function<future<> (unsigned shard)>;
68
69future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
70
71template <typename Service>
72class either_sharded_or_local {
73 sharded<Service>& _sharded;
74public:
75 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
76 operator sharded<Service>& ();
77 operator Service& ();
78};
79
80template <typename T>
81struct sharded_unwrap {
82 using evaluated_type = T;
83 using type = T;
84};
85
86template <typename T>
87struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
88 using evaluated_type = T&;
89 using type = either_sharded_or_local<T>;
90};
91
92template <typename Func, typename... Param>
93struct sharded_unwrap<sharded_parameter<Func, Param...>> {
94 using type = std::invoke_result_t<Func, Param...>;
95};
96
97template <typename T>
98using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
99
100template <typename T>
101using sharded_unwrap_t = typename sharded_unwrap<T>::type;
102
103} // internal
104
105
108
109SEASTAR_MODULE_EXPORT_BEGIN
110
111template <typename T>
112class sharded;
113
119template<typename T>
121 promise<> _freed;
122protected:
123 async_sharded_service() noexcept = default;
124 virtual ~async_sharded_service() {
125 _freed.set_value();
126 }
127 future<> freed() noexcept {
128 return _freed.get_future();
129 }
130 template <typename Service> friend class sharded;
131};
132
133
139template <typename Service>
141 sharded<Service>* _container = nullptr;
142private:
143 template <typename T> friend class sharded;
144 void set_container(sharded<Service>* container) noexcept { _container = container; }
145public:
146 peering_sharded_service() noexcept = default;
150 sharded<Service>& container() noexcept { return *_container; }
151 const sharded<Service>& container() const noexcept { return *_container; }
152};
153
154
156class no_sharded_instance_exception : public std::exception {
157 sstring _msg;
158public:
159 no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
160 explicit no_sharded_instance_exception(sstring type_info)
161 : _msg("sharded instance does not exist: " + type_info) {}
162 virtual const char* what() const noexcept override {
163 return _msg.c_str();
164 }
165};
166
176template <typename Service>
177class sharded {
178 struct entry {
179 shared_ptr<Service> service;
180
181 future<> track_deletion() noexcept {
182 // do not wait for instance to be deleted if it is not going to notify us
183 if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
184 if (service) {
185 return service->freed();
186 }
187 }
188 return make_ready_future<>();
189 }
190 };
191 std::vector<entry> _instances;
192private:
193 using invoke_on_all_func_type = std::function<future<> (Service&)>;
194private:
195 template <typename U, bool async>
196 friend struct shared_ptr_make_helper;
197
198 template <typename T>
199 std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
200 set_container(T& service) noexcept {
201 service.set_container(this);
202 }
203
204 template <typename T>
205 std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
206 set_container(T&) noexcept {
207 }
208
210 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
211 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
212 }
213public:
216 sharded() noexcept {}
217 sharded(const sharded& other) = delete;
218 sharded& operator=(const sharded& other) = delete;
221 sharded(sharded&& other) = delete;
222 sharded& operator=(sharded&& other) = delete;
225
232 template <typename... Args>
233 future<> start(Args&&... args) noexcept;
234
241 template <typename... Args>
242 future<> start_single(Args&&... args) noexcept;
243
248 future<> stop() noexcept;
249
258 future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
259
265 future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
266 try {
267 return invoke_on_all(smp_submit_to_options{}, std::move(func));
268 } catch (...) {
270 }
271 }
272
287 template <typename Func, typename... Args>
288 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
289 future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
290
296 template <typename Func, typename... Args>
297 requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
298 future<> invoke_on_all(Func func, Args... args) noexcept {
299 try {
300 return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
301 } catch (...) {
303 }
304 }
305
316 template <typename Func, typename... Args>
317 requires std::invocable<Func, Service&, Args...>
318 future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
319
331 template <typename Func, typename... Args>
332 requires std::invocable<Func, Service&, Args...>
333 future<> invoke_on_others(Func func, Args... args) noexcept {
334 try {
335 return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
336 } catch (...) {
338 }
339 }
340
345 template <typename Reducer, typename Func, typename... Args>
346 inline
347 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
348 {
349 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
350 boost::make_counting_iterator<unsigned>(_instances.size()),
351 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
352 return smp::submit_to(c, [this, &func, args] () mutable {
353 return std::apply([this, &func] (Args&&... args) mutable {
354 auto inst = get_local_service();
355 return std::invoke(func, *inst, std::forward<Args>(args)...);
356 }, std::move(args));
357 });
358 }, std::forward<Reducer>(r));
359 }
360
362 template <typename Reducer, typename Func, typename... Args>
363 inline
364 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
365 {
366 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
367 boost::make_counting_iterator<unsigned>(_instances.size()),
368 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
369 return smp::submit_to(c, [this, &func, args] () {
370 return std::apply([this, &func] (Args&&... args) {
371 auto inst = get_local_service();
372 return std::invoke(func, *inst, std::forward<Args>(args)...);
373 }, std::move(args));
374 });
375 }, std::forward<Reducer>(r));
376 }
377
394 template <typename Mapper, typename Initial, typename Reduce>
395 inline
396 future<Initial>
397 map_reduce0(Mapper map, Initial initial, Reduce reduce) {
398 auto wrapped_map = [this, map] (unsigned c) {
399 return smp::submit_to(c, [this, map] {
400 auto inst = get_local_service();
401 return std::invoke(map, *inst);
402 });
403 };
404 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
405 std::move(wrapped_map),
406 std::move(initial),
407 std::move(reduce));
408 }
409
411 template <typename Mapper, typename Initial, typename Reduce>
412 inline
414 map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
415 auto wrapped_map = [this, map] (unsigned c) {
416 return smp::submit_to(c, [this, map] {
417 auto inst = get_local_service();
418 return std::invoke(map, *inst);
419 });
420 };
421 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
422 std::move(wrapped_map),
423 std::move(initial),
424 std::move(reduce));
425 }
426
436 template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
437 inline future<std::vector<return_type>> map(Mapper mapper) {
438 return do_with(std::vector<return_type>(), std::move(mapper),
439 [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
440 vec.resize(_instances.size());
441 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, &mapper] (unsigned c) {
442 return smp::submit_to(c, [this, &mapper] {
443 auto inst = get_local_service();
444 return mapper(*inst);
445 }).then([&vec, c] (auto&& res) {
446 vec[c] = std::move(res);
447 });
448 }).then([&vec] {
449 return make_ready_future<std::vector<return_type>>(std::move(vec));
450 });
451 });
452 }
453
466 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
467 requires std::invocable<Func, Service&, Args&&...>
468 Ret
469 invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
470 return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
471 auto inst = get_local_service();
472 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
473 });
474 }
475
484 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
485 requires std::invocable<Func, Service&, Args&&...>
486 Ret
487 invoke_on(unsigned id, Func&& func, Args&&... args) {
488 return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
489 }
490
492 const Service& local() const noexcept;
493
495 Service& local() noexcept;
496
498 shared_ptr<Service> local_shared() noexcept;
499
501 bool local_is_initialized() const noexcept;
502
503private:
504 template <typename... Args>
505 shared_ptr<Service> create_local_service(Args&&... args) {
506 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
507 set_container(*s);
508 return s;
509 }
510
511 shared_ptr<Service> get_local_service() {
512 auto inst = _instances[this_shard_id()].service;
513 if (!inst) {
514 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
515 }
516 return inst;
517 }
518
519 shared_ptr<const Service> get_local_service() const {
520 auto inst = _instances[this_shard_id()].service;
521 if (!inst) {
522 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
523 }
524 return inst;
525 }
526};
527
528
533template <typename Func, typename... Params>
535 Func _func;
536 std::tuple<Params...> _params;
537public:
546 explicit sharded_parameter(Func func, Params... params)
547 requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>
548 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
549 }
550private:
551 auto evaluate() const;
552
553 template <typename Func_, typename... Param_>
554 friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
555};
556
560SEASTAR_MODULE_EXPORT_END
562
563template <typename Service>
565 assert(_instances.empty());
566}
567
568namespace internal {
569
/// Generic case: the argument is not a sharded wrapper, so it is passed
/// through untouched with its value category preserved.
template <typename T>
inline
T&&
unwrap_sharded_arg(T&& arg) {
    return static_cast<T&&>(arg);
}
576
577template <typename Service>
578either_sharded_or_local<Service>
579unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
580 return either_sharded_or_local<Service>(arg);
581}
582
583template <typename Func, typename... Param>
584auto
585unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
586 return sp.evaluate();
587}
588
589template <typename Service>
590either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
591
592template <typename Service>
593either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
594
595}
596
597template <typename Func, typename... Param>
598auto
599sharded_parameter<Func, Param...>::evaluate() const {
600 auto unwrap_params_and_invoke = [this] (const auto&... params) {
601 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
602 };
603 return std::apply(unwrap_params_and_invoke, _params);
604}
605
606template <typename Service>
607template <typename... Args>
608future<>
609sharded<Service>::start(Args&&... args) noexcept {
610 try {
611 _instances.resize(smp::count);
612 return sharded_parallel_for_each(
613 [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
614 return smp::submit_to(c, [this, args] () mutable {
615 _instances[this_shard_id()].service = std::apply([this] (Args... args) {
616 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
617 }, args);
618 });
619 }).then_wrapped([this] (future<> f) {
620 try {
621 f.get();
622 return make_ready_future<>();
623 } catch (...) {
624 return this->stop().then([e = std::current_exception()] () mutable {
625 std::rethrow_exception(e);
626 });
627 }
628 });
629 } catch (...) {
631 }
632}
633
634template <typename Service>
635template <typename... Args>
637sharded<Service>::start_single(Args&&... args) noexcept {
638 try {
639 assert(_instances.empty());
640 _instances.resize(1);
641 return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
642 _instances[0].service = std::apply([this] (Args... args) {
643 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
644 }, args);
645 }).then_wrapped([this] (future<> f) {
646 try {
647 f.get();
648 return make_ready_future<>();
649 } catch (...) {
650 return this->stop().then([e = std::current_exception()] () mutable {
651 std::rethrow_exception(e);
652 });
653 }
654 });
655 } catch (...) {
657 }
658}
659
660namespace internal {
661
662// Helper check if Service::stop exists
663
/// Compile-time probe for a member named `stop` in a service type.
struct sharded_has_stop {
    // Chosen whenever `&Service::stop` is well-formed, regardless of the
    // member's signature. A wrongly-typed stop() is therefore still
    // reported as present, so the later call fails to compile and the user
    // has to resolve the mismatch instead of it being silently skipped.
    template <typename Service>
    static constexpr auto check(int)
            -> std::enable_if_t<(sizeof(&Service::stop) >= 0), bool> {
        return true;
    }

    // Lower-priority overload used when Service has no member named stop.
    template <typename>
    static constexpr auto check(...) -> bool {
        return false;
    }
};
680
681template <bool stop_exists>
682struct sharded_call_stop {
683 template <typename Service>
684 static future<> call(Service& instance);
685};
686
687template <>
688template <typename Service>
689inline
690future<> sharded_call_stop<true>::call(Service& instance) {
691 return instance.stop();
692}
693
694template <>
695template <typename Service>
696inline
697future<> sharded_call_stop<false>::call(Service&) {
698 return make_ready_future<>();
699}
700
701template <typename Service>
702inline
703future<>
704stop_sharded_instance(Service& instance) {
705 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
706 return internal::sharded_call_stop<has_stop>::call(instance);
707}
708
709}
710
711template <typename Service>
712future<>
714 try {
715 return sharded_parallel_for_each([this] (unsigned c) mutable {
716 return smp::submit_to(c, [this] () mutable {
717 auto inst = _instances[this_shard_id()].service;
718 if (!inst) {
719 return make_ready_future<>();
720 }
721 return internal::stop_sharded_instance(*inst);
722 });
723 }).then_wrapped([this] (future<> fut) {
724 return sharded_parallel_for_each([this] (unsigned c) {
725 return smp::submit_to(c, [this] {
726 auto fut = _instances[this_shard_id()].track_deletion();
727 _instances[this_shard_id()].service = nullptr;
728 return fut;
729 });
730 }).finally([this, fut = std::move(fut)] () mutable {
731 _instances.clear();
732 _instances = std::vector<sharded<Service>::entry>();
733 return std::move(fut);
734 });
735 });
736 } catch (...) {
738 }
739}
740
741template <typename Service>
743sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
744 try {
745 return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
746 return smp::submit_to(c, options, [this, func] {
747 return func(*get_local_service());
748 });
749 });
750 } catch (...) {
752 }
753}
754
755template <typename Service>
756template <typename Func, typename... Args>
757requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
758inline
760sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
761 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
762 "invoke_on_all()'s func must return void or future<>");
763 try {
764 return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
765 return std::apply([&service, &func] (Args&&... args) mutable {
766 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
767 }, std::move(args));
768 }));
769 } catch (...) {
771 }
772}
773
774template <typename Service>
775template <typename Func, typename... Args>
776requires std::invocable<Func, Service&, Args...>
777inline
779sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
780 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
781 "invoke_on_others()'s func must return void or future<>");
782 try {
783 return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) mutable -> future<> {
784 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
785 });
786 } catch (...) {
788 }
789}
790
791template <typename Service>
792const Service& sharded<Service>::local() const noexcept {
793 assert(local_is_initialized());
794 return *_instances[this_shard_id()].service;
795}
796
797template <typename Service>
798Service& sharded<Service>::local() noexcept {
799 assert(local_is_initialized());
800 return *_instances[this_shard_id()].service;
801}
802
803template <typename Service>
805 assert(local_is_initialized());
806 return _instances[this_shard_id()].service;
807}
808
809template <typename Service>
810inline bool sharded<Service>::local_is_initialized() const noexcept {
811 return _instances.size() > this_shard_id() &&
812 _instances[this_shard_id()].service;
813}
814
815SEASTAR_MODULE_EXPORT_BEGIN
840template <typename PtrType>
841requires (!std::is_pointer_v<PtrType>)
843private:
844 PtrType _value;
845 unsigned _cpu;
846private:
847 void destroy(PtrType p, unsigned cpu) noexcept {
848 // `destroy()` is called from the destructor and other
849 // synchronous methods (like `reset()`), that have no way to
850 // wait for this future.
851 auto f = destroy_on(std::move(p), cpu);
852 if (!f.available() || f.failed()) {
853 internal::run_in_background(std::move(f));
854 }
855 }
856
857 static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
858 if (p) {
859 if (cpu != this_shard_id()) {
860 return smp::submit_to(cpu, [v = std::move(p)] () mutable {
861 // Destroy the contained pointer. We do this explicitly
862 // in the current shard, because the lambda is destroyed
863 // in the shard that submitted the task.
864 v = {};
865 });
866 } else {
867 p = {};
868 }
869 }
870 return make_ready_future<>();
871 }
872public:
873 using element_type = typename std::pointer_traits<PtrType>::element_type;
874 using pointer = element_type*;
875
877 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
878 : _value(PtrType())
879 , _cpu(this_shard_id()) {
880 }
882 foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
884 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
885 : _value(std::move(value))
886 , _cpu(this_shard_id()) {
887 }
888 // The type is intentionally non-copyable because copies
889 // are expensive because each copy requires across-CPU call.
890 foreign_ptr(const foreign_ptr&) = delete;
892 foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
895 destroy(std::move(_value), _cpu);
896 }
898 future<foreign_ptr> copy() const noexcept {
899 return smp::submit_to(_cpu, [this] () mutable {
900 auto v = _value;
901 return make_foreign(std::move(v));
902 });
903 }
905 element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
907 element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
909 pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
914 unsigned get_owner_shard() const noexcept { return _cpu; }
916 operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
918 foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
919 destroy(std::move(_value), _cpu);
920 _value = std::move(other._value);
921 _cpu = other._cpu;
922 return *this;
923 }
929 PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
930 return std::exchange(_value, {});
931 }
935 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
936 auto old_ptr = std::move(_value);
937 auto old_cpu = _cpu;
938
939 _value = std::move(new_ptr);
940 _cpu = this_shard_id();
941
942 destroy(std::move(old_ptr), old_cpu);
943 }
947 void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
948 reset(PtrType());
949 }
950
954 future<> destroy() noexcept {
955 return destroy_on(std::move(_value), _cpu);
956 }
957};
958
962template <typename T>
964 return foreign_ptr<T>(std::move(ptr));
965}
966
968
969template<typename T>
970struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
971
972SEASTAR_MODULE_EXPORT_END
973
974}
Definition: sharded.hh:120
Definition: shared_ptr.hh:499
Definition: sharded.hh:842
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:909
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:882
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:907
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:935
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:914
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:894
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:884
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:947
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:877
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:918
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:898
future destroy() noexcept
Definition: sharded.hh:954
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:929
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:905
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1425
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:156
Provide a sharded service with access to its peers.
Definition: sharded.hh:140
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:990
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:534
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:546
Definition: sharded.hh:177
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:564
future invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:760
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:487
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:743
future start_single(Args &&... args) noexcept
Definition: sharded.hh:637
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:792
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:437
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:414
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:364
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:779
sharded() noexcept
Definition: sharded.hh:216
future stop() noexcept
Definition: sharded.hh:713
future invoke_on_others(Func func, Args... args) noexcept
Definition: sharded.hh:333
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:347
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:397
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:469
future start(Args &&... args) noexcept
Definition: sharded.hh:609
future invoke_on_all(Func func, Args... args) noexcept
Definition: sharded.hh:298
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:963
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:52
Definition: shared_ptr.hh:659
STL namespace.
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168