Seastar
High performance C++ framework for concurrent servers
sharded.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/smp.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/util/tuple_utils.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/util/concepts.hh>
31 #include <seastar/util/log.hh>
32 #include <seastar/core/reactor.hh>
33 #include <seastar/util/modules.hh>
34 
35 #ifndef SEASTAR_MODULE
36 #include <boost/iterator/counting_iterator.hpp>
37 #include <functional>
38 #if __has_include(<concepts>)
39 #include <concepts>
40 #endif
41 #endif
42 
52 
53 namespace seastar {
54 
55 SEASTAR_MODULE_EXPORT_BEGIN
56 
57 template <typename Func, typename... Param>
58 class sharded_parameter;
59 
60 template <typename Service>
61 class sharded;
62 
63 SEASTAR_MODULE_EXPORT_END
64 
65 namespace internal {
66 
67 template <typename Func, typename... Param>
68 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
69 
70 using on_each_shard_func = std::function<future<> (unsigned shard)>;
71 
72 future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
73 
74 template <typename Service>
75 class either_sharded_or_local {
76  sharded<Service>& _sharded;
77 public:
78  either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
79  operator sharded<Service>& ();
80  operator Service& ();
81 };
82 
83 template <typename T>
84 struct sharded_unwrap {
85  using evaluated_type = T;
86  using type = T;
87 };
88 
89 template <typename T>
90 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
91  using evaluated_type = T&;
92  using type = either_sharded_or_local<T>;
93 };
94 
95 template <typename Func, typename... Param>
96 struct sharded_unwrap<sharded_parameter<Func, Param...>> {
97  using type = std::invoke_result_t<Func, Param...>;
98 };
99 
100 template <typename T>
101 using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
102 
103 template <typename T>
104 using sharded_unwrap_t = typename sharded_unwrap<T>::type;
105 
106 } // internal
107 
108 
111 
112 SEASTAR_MODULE_EXPORT_BEGIN
113 
114 template <typename T>
115 class sharded;
116 
122 template<typename T>
124 protected:
125  std::function<void()> _delete_cb;
126  async_sharded_service() noexcept = default;
127  virtual ~async_sharded_service() {
128  if (_delete_cb) {
129  _delete_cb();
130  }
131  }
132  template <typename Service> friend class sharded;
133 };
134 
135 
141 template <typename Service>
143  sharded<Service>* _container = nullptr;
144 private:
145  template <typename T> friend class sharded;
146  void set_container(sharded<Service>* container) noexcept { _container = container; }
147 public:
148  peering_sharded_service() noexcept = default;
151  peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
152  sharded<Service>& container() noexcept { return *_container; }
153  const sharded<Service>& container() const noexcept { return *_container; }
154 };
155 
156 
158 class no_sharded_instance_exception : public std::exception {
159  sstring _msg;
160 public:
161  no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
162  explicit no_sharded_instance_exception(sstring type_info)
163  : _msg("sharded instance does not exist: " + type_info) {}
164  virtual const char* what() const noexcept override {
165  return _msg.c_str();
166  }
167 };
168 
178 template <typename Service>
179 class sharded {
180  struct entry {
181  shared_ptr<Service> service;
182  promise<> freed;
183  };
184  std::vector<entry> _instances;
185 private:
186  using invoke_on_all_func_type = std::function<future<> (Service&)>;
187 private:
188  void service_deleted() noexcept {
189  _instances[this_shard_id()].freed.set_value();
190  }
191  template <typename U, bool async>
192  friend struct shared_ptr_make_helper;
193 
194  template <typename T>
195  std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
196  set_container(T& service) noexcept {
197  service.set_container(this);
198  }
199 
200  template <typename T>
201  std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
202  set_container(T&) noexcept {
203  }
204 
205  future<>
206  sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
207  return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
208  }
209 public:
212  sharded() noexcept {}
213  sharded(const sharded& other) = delete;
214  sharded& operator=(const sharded& other) = delete;
217  sharded(sharded&& other) = delete;
218  sharded& operator=(sharded&& other) = delete;
220  ~sharded();
221 
228  template <typename... Args>
229  future<> start(Args&&... args) noexcept;
230 
237  template <typename... Args>
238  future<> start_single(Args&&... args) noexcept;
239 
244  future<> stop() noexcept;
245 
254  future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
255 
261  future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
262  try {
263  return invoke_on_all(smp_submit_to_options{}, std::move(func));
264  } catch (...) {
266  }
267  }
268 
283  template <typename Func, typename... Args>
284  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
285  future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
286 
292  template <typename Func, typename... Args>
293  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
294  future<> invoke_on_all(Func func, Args... args) noexcept {
295  try {
296  return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
297  } catch (...) {
299  }
300  }
301 
312  template <typename Func, typename... Args>
313  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
314  future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
315 
327  template <typename Func, typename... Args>
328  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
329  future<> invoke_on_others(Func func, Args... args) noexcept {
330  try {
331  return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
332  } catch (...) {
334  }
335  }
336 
341  template <typename Reducer, typename Func, typename... Args>
342  inline
343  auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
344  {
345  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
346  boost::make_counting_iterator<unsigned>(_instances.size()),
347  [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
348  return smp::submit_to(c, [this, &func, args] () mutable {
349  return std::apply([this, &func] (Args&&... args) mutable {
350  auto inst = get_local_service();
351  return std::invoke(func, *inst, std::forward<Args>(args)...);
352  }, std::move(args));
353  });
354  }, std::forward<Reducer>(r));
355  }
356 
358  template <typename Reducer, typename Func, typename... Args>
359  inline
360  auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
361  {
362  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
363  boost::make_counting_iterator<unsigned>(_instances.size()),
364  [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
365  return smp::submit_to(c, [this, &func, args] () {
366  return std::apply([this, &func] (Args&&... args) {
367  auto inst = get_local_service();
368  return std::invoke(func, *inst, std::forward<Args>(args)...);
369  }, std::move(args));
370  });
371  }, std::forward<Reducer>(r));
372  }
373 
390  template <typename Mapper, typename Initial, typename Reduce>
391  inline
392  future<Initial>
393  map_reduce0(Mapper map, Initial initial, Reduce reduce) {
394  auto wrapped_map = [this, map] (unsigned c) {
395  return smp::submit_to(c, [this, map] {
396  auto inst = get_local_service();
397  return std::invoke(map, *inst);
398  });
399  };
400  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
401  std::move(wrapped_map),
402  std::move(initial),
403  std::move(reduce));
404  }
405 
407  template <typename Mapper, typename Initial, typename Reduce>
408  inline
410  map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
411  auto wrapped_map = [this, map] (unsigned c) {
412  return smp::submit_to(c, [this, map] {
413  auto inst = get_local_service();
414  return std::invoke(map, *inst);
415  });
416  };
417  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
418  std::move(wrapped_map),
419  std::move(initial),
420  std::move(reduce));
421  }
422 
432  template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
433  inline future<std::vector<return_type>> map(Mapper mapper) {
434  return do_with(std::vector<return_type>(), std::move(mapper),
435  [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
436  vec.resize(_instances.size());
437  return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, &mapper] (unsigned c) {
438  return smp::submit_to(c, [this, &mapper] {
439  auto inst = get_local_service();
440  return mapper(*inst);
441  }).then([&vec, c] (auto&& res) {
442  vec[c] = std::move(res);
443  });
444  }).then([&vec] {
445  return make_ready_future<std::vector<return_type>>(std::move(vec));
446  });
447  });
448  }
449 
462  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
463  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
464  Ret
465  invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
466  return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
467  auto inst = get_local_service();
468  return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
469  });
470  }
471 
480  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
481  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
482  Ret
483  invoke_on(unsigned id, Func&& func, Args&&... args) {
484  return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
485  }
486 
488  const Service& local() const noexcept;
489 
491  Service& local() noexcept;
492 
494  shared_ptr<Service> local_shared() noexcept;
495 
497  bool local_is_initialized() const noexcept;
498 
499 private:
500  void track_deletion(shared_ptr<Service>&, std::false_type) noexcept {
501  // do not wait for instance to be deleted since it is not going to notify us
502  service_deleted();
503  }
504 
505  void track_deletion(shared_ptr<Service>& s, std::true_type) {
506  s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
507  }
508 
509  template <typename... Args>
510  shared_ptr<Service> create_local_service(Args&&... args) {
511  auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
512  set_container(*s);
513  track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
514  return s;
515  }
516 
517  shared_ptr<Service> get_local_service() {
518  auto inst = _instances[this_shard_id()].service;
519  if (!inst) {
520  throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
521  }
522  return inst;
523  }
524 
525  shared_ptr<const Service> get_local_service() const {
526  auto inst = _instances[this_shard_id()].service;
527  if (!inst) {
528  throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
529  }
530  return inst;
531  }
532 };
533 
534 
539 template <typename Func, typename... Params>
541  Func _func;
542  std::tuple<Params...> _params;
543 public:
552  explicit sharded_parameter(Func func, Params... params)
553  SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
554  : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
555  }
556 private:
557  auto evaluate() const;
558 
559  template <typename Func_, typename... Param_>
560  friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
561 };
562 
566 SEASTAR_MODULE_EXPORT_END
568 
569 template <typename Service>
571  assert(_instances.empty());
572 }
573 
574 namespace internal {
575 
576 template <typename T>
577 inline
578 T&&
579 unwrap_sharded_arg(T&& arg) {
580  return std::forward<T>(arg);
581 }
582 
583 template <typename Service>
584 either_sharded_or_local<Service>
585 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
586  return either_sharded_or_local<Service>(arg);
587 }
588 
589 template <typename Func, typename... Param>
590 auto
591 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
592  return sp.evaluate();
593 }
594 
595 template <typename Service>
596 either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
597 
598 template <typename Service>
599 either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
600 
601 }
602 
603 template <typename Func, typename... Param>
604 auto
605 sharded_parameter<Func, Param...>::evaluate() const {
606  auto unwrap_params_and_invoke = [this] (const auto&... params) {
607  return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
608  };
609  return std::apply(unwrap_params_and_invoke, _params);
610 }
611 
612 template <typename Service>
613 template <typename... Args>
614 future<>
615 sharded<Service>::start(Args&&... args) noexcept {
616  try {
617  _instances.resize(smp::count);
618  return sharded_parallel_for_each(
619  [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
620  return smp::submit_to(c, [this, args] () mutable {
621  _instances[this_shard_id()].service = std::apply([this] (Args... args) {
622  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
623  }, args);
624  });
625  }).then_wrapped([this] (future<> f) {
626  try {
627  f.get();
628  return make_ready_future<>();
629  } catch (...) {
630  return this->stop().then([e = std::current_exception()] () mutable {
631  std::rethrow_exception(e);
632  });
633  }
634  });
635  } catch (...) {
637  }
638 }
639 
640 template <typename Service>
641 template <typename... Args>
642 future<>
643 sharded<Service>::start_single(Args&&... args) noexcept {
644  try {
645  assert(_instances.empty());
646  _instances.resize(1);
647  return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
648  _instances[0].service = std::apply([this] (Args... args) {
649  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
650  }, args);
651  }).then_wrapped([this] (future<> f) {
652  try {
653  f.get();
654  return make_ready_future<>();
655  } catch (...) {
656  return this->stop().then([e = std::current_exception()] () mutable {
657  std::rethrow_exception(e);
658  });
659  }
660  });
661  } catch (...) {
663  }
664 }
665 
666 namespace internal {
667 
668 // Helper check if Service::stop exists
669 
670 struct sharded_has_stop {
671  // If a member names "stop" exists, try to call it, even if it doesn't
672  // have the correct signature. This is so that we don't ignore a function
673  // named stop() just because the signature is incorrect, and instead
674  // force the user to resolve the ambiguity.
675  template <typename Service>
676  constexpr static auto check(int) -> std::enable_if_t<(sizeof(&Service::stop) >= 0), bool> {
677  return true;
678  }
679 
680  // Fallback in case Service::stop doesn't exist.
681  template<typename>
682  static constexpr auto check(...) -> bool {
683  return false;
684  }
685 };
686 
687 template <bool stop_exists>
688 struct sharded_call_stop {
689  template <typename Service>
690  static future<> call(Service& instance);
691 };
692 
693 template <>
694 template <typename Service>
695 inline
696 future<> sharded_call_stop<true>::call(Service& instance) {
697  return instance.stop();
698 }
699 
700 template <>
701 template <typename Service>
702 inline
703 future<> sharded_call_stop<false>::call(Service&) {
704  return make_ready_future<>();
705 }
706 
707 template <typename Service>
708 inline
709 future<>
710 stop_sharded_instance(Service& instance) {
711  constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
712  return internal::sharded_call_stop<has_stop>::call(instance);
713 }
714 
715 }
716 
717 template <typename Service>
718 future<>
720  try {
721  return sharded_parallel_for_each([this] (unsigned c) mutable {
722  return smp::submit_to(c, [this] () mutable {
723  auto inst = _instances[this_shard_id()].service;
724  if (!inst) {
725  return make_ready_future<>();
726  }
727  return internal::stop_sharded_instance(*inst);
728  });
729  }).then_wrapped([this] (future<> fut) {
730  return sharded_parallel_for_each([this] (unsigned c) {
731  return smp::submit_to(c, [this] {
732  if (_instances[this_shard_id()].service == nullptr) {
733  return make_ready_future<>();
734  }
735  _instances[this_shard_id()].service = nullptr;
736  return _instances[this_shard_id()].freed.get_future();
737  });
738  }).finally([this, fut = std::move(fut)] () mutable {
739  _instances.clear();
740  _instances = std::vector<sharded<Service>::entry>();
741  return std::move(fut);
742  });
743  });
744  } catch (...) {
746  }
747 }
748 
749 template <typename Service>
750 future<>
751 sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
752  try {
753  return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
754  return smp::submit_to(c, options, [this, func] {
755  return func(*get_local_service());
756  });
757  });
758  } catch (...) {
760  }
761 }
762 
763 template <typename Service>
764 template <typename Func, typename... Args>
765 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
766 inline
767 future<>
768 sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
769  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
770  "invoke_on_all()'s func must return void or future<>");
771  try {
772  return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
773  return std::apply([&service, &func] (Args&&... args) mutable {
774  return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
775  }, std::move(args));
776  }));
777  } catch (...) {
779  }
780 }
781 
782 template <typename Service>
783 template <typename Func, typename... Args>
784 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
785 inline
786 future<>
787 sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
788  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
789  "invoke_on_others()'s func must return void or future<>");
790  try {
791  return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) -> future<> {
792  return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
793  });
794  } catch (...) {
796  }
797 }
798 
799 template <typename Service>
800 const Service& sharded<Service>::local() const noexcept {
801  assert(local_is_initialized());
802  return *_instances[this_shard_id()].service;
803 }
804 
805 template <typename Service>
806 Service& sharded<Service>::local() noexcept {
807  assert(local_is_initialized());
808  return *_instances[this_shard_id()].service;
809 }
810 
811 template <typename Service>
813  assert(local_is_initialized());
814  return _instances[this_shard_id()].service;
815 }
816 
817 template <typename Service>
818 inline bool sharded<Service>::local_is_initialized() const noexcept {
819  return _instances.size() > this_shard_id() &&
820  _instances[this_shard_id()].service;
821 }
822 
823 SEASTAR_MODULE_EXPORT_BEGIN
848 template <typename PtrType>
849 SEASTAR_CONCEPT( requires (!std::is_pointer<PtrType>::value) )
850 class foreign_ptr {
851 private:
852  PtrType _value;
853  unsigned _cpu;
854 private:
855  void destroy(PtrType p, unsigned cpu) noexcept {
856  // `destroy()` is called from the destructor and other
857  // synchronous methods (like `reset()`), that have no way to
858  // wait for this future.
859  auto f = destroy_on(std::move(p), cpu);
860  if (!f.available() || f.failed()) {
861  engine().run_in_background(std::move(f));
862  }
863  }
864 
865  static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
866  if (p) {
867  if (cpu != this_shard_id()) {
868  return smp::submit_to(cpu, [v = std::move(p)] () mutable {
869  // Destroy the contained pointer. We do this explicitly
870  // in the current shard, because the lambda is destroyed
871  // in the shard that submitted the task.
872  v = {};
873  });
874  } else {
875  p = {};
876  }
877  }
878  return make_ready_future<>();
879  }
880 public:
881  using element_type = typename std::pointer_traits<PtrType>::element_type;
882  using pointer = element_type*;
883 
885  foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
886  : _value(PtrType())
887  , _cpu(this_shard_id()) {
888  }
890  foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
892  foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
893  : _value(std::move(value))
894  , _cpu(this_shard_id()) {
895  }
896  // The type is intentionally non-copyable because copies
897  // are expensive because each copy requires across-CPU call.
898  foreign_ptr(const foreign_ptr&) = delete;
900  foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
903  destroy(std::move(_value), _cpu);
904  }
906  future<foreign_ptr> copy() const noexcept {
907  return smp::submit_to(_cpu, [this] () mutable {
908  auto v = _value;
909  return make_foreign(std::move(v));
910  });
911  }
913  element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
915  element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
917  pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
922  unsigned get_owner_shard() const noexcept { return _cpu; }
924  operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
926  foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible<PtrType>::value) {
927  destroy(std::move(_value), _cpu);
928  _value = std::move(other._value);
929  _cpu = other._cpu;
930  return *this;
931  }
937  PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
938  return std::exchange(_value, {});
939  }
943  void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
944  auto old_ptr = std::move(_value);
945  auto old_cpu = _cpu;
946 
947  _value = std::move(new_ptr);
948  _cpu = this_shard_id();
949 
950  destroy(std::move(old_ptr), old_cpu);
951  }
955  void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
956  reset(PtrType());
957  }
958 
962  future<> destroy() noexcept {
963  return destroy_on(std::move(_value), _cpu);
964  }
965 };
966 
970 template <typename T>
972  return foreign_ptr<T>(std::move(ptr));
973 }
974 
976 
977 template<typename T>
978 struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
979 
980 SEASTAR_MODULE_EXPORT_END
981 
982 }
Definition: sharded.hh:123
Definition: shared_ptr.hh:499
Definition: sharded.hh:850
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:917
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:890
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:943
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible< PtrType >::value)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:926
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:922
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:902
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:892
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:906
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:955
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:915
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:885
future destroy() noexcept
Definition: sharded.hh:962
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:937
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:913
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1396
value_type && get()
gets the value returned by the computation
Definition: future.hh:1326
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:158
Provide a sharded service with access to its peers.
Definition: sharded.hh:142
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:969
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:540
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:552
Definition: sharded.hh:179
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:433
sharded(sharded &&other)=delete
~sharded()
Destroyes a sharded object. Must not be in a started state.
Definition: sharded.hh:570
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:483
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:751
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:465
future start_single(Args &&... args) noexcept
Definition: sharded.hh:643
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:360
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:787
sharded() noexcept
Definition: sharded.hh:212
future stop() noexcept
Definition: sharded.hh:719
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:343
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:410
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:393
future start(Args &&... args) noexcept
Definition: sharded.hh:615
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1934
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:568
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:971
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:207
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: smp.hh:78
Definition: shared_ptr.hh:659
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:174