Seastar
High performance C++ framework for concurrent servers
sharded.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/smp.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/core/internal/run_in_background.hh>
28 #include <seastar/util/is_smart_ptr.hh>
29 #include <seastar/util/tuple_utils.hh>
30 #include <seastar/core/do_with.hh>
31 #include <seastar/util/concepts.hh>
32 #include <seastar/util/log.hh>
33 #include <seastar/util/modules.hh>
34 
35 #ifndef SEASTAR_MODULE
36 #include <boost/iterator/counting_iterator.hpp>
37 #include <functional>
38 #if __has_include(<concepts>)
39 #include <concepts>
40 #endif
41 #endif
42 
52 
53 namespace seastar {
54 
55 SEASTAR_MODULE_EXPORT_BEGIN
56 
57 template <typename Func, typename... Param>
58 class sharded_parameter;
59 
60 template <typename Service>
61 class sharded;
62 
63 SEASTAR_MODULE_EXPORT_END
64 
65 namespace internal {
66 
67 template <typename Func, typename... Param>
68 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
69 
70 using on_each_shard_func = std::function<future<> (unsigned shard)>;
71 
72 future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
73 
74 template <typename Service>
75 class either_sharded_or_local {
76  sharded<Service>& _sharded;
77 public:
78  either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
79  operator sharded<Service>& ();
80  operator Service& ();
81 };
82 
83 template <typename T>
84 struct sharded_unwrap {
85  using evaluated_type = T;
86  using type = T;
87 };
88 
89 template <typename T>
90 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
91  using evaluated_type = T&;
92  using type = either_sharded_or_local<T>;
93 };
94 
95 template <typename Func, typename... Param>
96 struct sharded_unwrap<sharded_parameter<Func, Param...>> {
97  using type = std::invoke_result_t<Func, Param...>;
98 };
99 
100 template <typename T>
101 using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
102 
103 template <typename T>
104 using sharded_unwrap_t = typename sharded_unwrap<T>::type;
105 
106 } // internal
107 
108 
111 
112 SEASTAR_MODULE_EXPORT_BEGIN
113 
114 template <typename T>
115 class sharded;
116 
122 template<typename T>
124  promise<> _freed;
125 protected:
126  async_sharded_service() noexcept = default;
127  virtual ~async_sharded_service() {
128  _freed.set_value();
129  }
130  future<> freed() noexcept {
131  return _freed.get_future();
132  }
133  template <typename Service> friend class sharded;
134 };
135 
136 
142 template <typename Service>
144  sharded<Service>* _container = nullptr;
145 private:
146  template <typename T> friend class sharded;
147  void set_container(sharded<Service>* container) noexcept { _container = container; }
148 public:
149  peering_sharded_service() noexcept = default;
152  peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
153  sharded<Service>& container() noexcept { return *_container; }
154  const sharded<Service>& container() const noexcept { return *_container; }
155 };
156 
157 
159 class no_sharded_instance_exception : public std::exception {
160  sstring _msg;
161 public:
162  no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
163  explicit no_sharded_instance_exception(sstring type_info)
164  : _msg("sharded instance does not exist: " + type_info) {}
165  virtual const char* what() const noexcept override {
166  return _msg.c_str();
167  }
168 };
169 
179 template <typename Service>
180 class sharded {
181  struct entry {
182  shared_ptr<Service> service;
183 
184  future<> track_deletion() noexcept {
185  // do not wait for instance to be deleted if it is not going to notify us
186  if constexpr (std::is_base_of_v<async_sharded_service<Service>, Service>) {
187  if (service) {
188  return service->freed();
189  }
190  }
191  return make_ready_future<>();
192  }
193  };
194  std::vector<entry> _instances;
195 private:
196  using invoke_on_all_func_type = std::function<future<> (Service&)>;
197 private:
198  template <typename U, bool async>
199  friend struct shared_ptr_make_helper;
200 
201  template <typename T>
202  std::enable_if_t<std::is_base_of_v<peering_sharded_service<T>, T>>
203  set_container(T& service) noexcept {
204  service.set_container(this);
205  }
206 
207  template <typename T>
208  std::enable_if_t<!std::is_base_of_v<peering_sharded_service<T>, T>>
209  set_container(T&) noexcept {
210  }
211 
212  future<>
213  sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
214  return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
215  }
216 public:
219  sharded() noexcept {}
220  sharded(const sharded& other) = delete;
221  sharded& operator=(const sharded& other) = delete;
224  sharded(sharded&& other) = delete;
225  sharded& operator=(sharded&& other) = delete;
227  ~sharded();
228 
235  template <typename... Args>
236  future<> start(Args&&... args) noexcept;
237 
244  template <typename... Args>
245  future<> start_single(Args&&... args) noexcept;
246 
251  future<> stop() noexcept;
252 
261  future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
262 
268  future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
269  try {
270  return invoke_on_all(smp_submit_to_options{}, std::move(func));
271  } catch (...) {
273  }
274  }
275 
290  template <typename Func, typename... Args>
291  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
292  future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
293 
299  template <typename Func, typename... Args>
300  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
301  future<> invoke_on_all(Func func, Args... args) noexcept {
302  try {
303  return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
304  } catch (...) {
306  }
307  }
308 
319  template <typename Func, typename... Args>
320  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
321  future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
322 
334  template <typename Func, typename... Args>
335  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
336  future<> invoke_on_others(Func func, Args... args) noexcept {
337  try {
338  return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
339  } catch (...) {
341  }
342  }
343 
348  template <typename Reducer, typename Func, typename... Args>
349  inline
350  auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
351  {
352  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
353  boost::make_counting_iterator<unsigned>(_instances.size()),
354  [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
355  return smp::submit_to(c, [this, &func, args] () mutable {
356  return std::apply([this, &func] (Args&&... args) mutable {
357  auto inst = get_local_service();
358  return std::invoke(func, *inst, std::forward<Args>(args)...);
359  }, std::move(args));
360  });
361  }, std::forward<Reducer>(r));
362  }
363 
365  template <typename Reducer, typename Func, typename... Args>
366  inline
367  auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
368  {
369  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
370  boost::make_counting_iterator<unsigned>(_instances.size()),
371  [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
372  return smp::submit_to(c, [this, &func, args] () {
373  return std::apply([this, &func] (Args&&... args) {
374  auto inst = get_local_service();
375  return std::invoke(func, *inst, std::forward<Args>(args)...);
376  }, std::move(args));
377  });
378  }, std::forward<Reducer>(r));
379  }
380 
397  template <typename Mapper, typename Initial, typename Reduce>
398  inline
399  future<Initial>
400  map_reduce0(Mapper map, Initial initial, Reduce reduce) {
401  auto wrapped_map = [this, map] (unsigned c) {
402  return smp::submit_to(c, [this, map] {
403  auto inst = get_local_service();
404  return std::invoke(map, *inst);
405  });
406  };
407  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
408  std::move(wrapped_map),
409  std::move(initial),
410  std::move(reduce));
411  }
412 
414  template <typename Mapper, typename Initial, typename Reduce>
415  inline
417  map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
418  auto wrapped_map = [this, map] (unsigned c) {
419  return smp::submit_to(c, [this, map] {
420  auto inst = get_local_service();
421  return std::invoke(map, *inst);
422  });
423  };
424  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
425  std::move(wrapped_map),
426  std::move(initial),
427  std::move(reduce));
428  }
429 
439  template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
440  inline future<std::vector<return_type>> map(Mapper mapper) {
441  return do_with(std::vector<return_type>(), std::move(mapper),
442  [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
443  vec.resize(_instances.size());
444  return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, &mapper] (unsigned c) {
445  return smp::submit_to(c, [this, &mapper] {
446  auto inst = get_local_service();
447  return mapper(*inst);
448  }).then([&vec, c] (auto&& res) {
449  vec[c] = std::move(res);
450  });
451  }).then([&vec] {
452  return make_ready_future<std::vector<return_type>>(std::move(vec));
453  });
454  });
455  }
456 
469  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
470  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
471  Ret
472  invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
473  return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
474  auto inst = get_local_service();
475  return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
476  });
477  }
478 
487  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
488  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
489  Ret
490  invoke_on(unsigned id, Func&& func, Args&&... args) {
491  return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
492  }
493 
495  const Service& local() const noexcept;
496 
498  Service& local() noexcept;
499 
501  shared_ptr<Service> local_shared() noexcept;
502 
504  bool local_is_initialized() const noexcept;
505 
506 private:
507  template <typename... Args>
508  shared_ptr<Service> create_local_service(Args&&... args) {
509  auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
510  set_container(*s);
511  return s;
512  }
513 
514  shared_ptr<Service> get_local_service() {
515  auto inst = _instances[this_shard_id()].service;
516  if (!inst) {
517  throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
518  }
519  return inst;
520  }
521 
522  shared_ptr<const Service> get_local_service() const {
523  auto inst = _instances[this_shard_id()].service;
524  if (!inst) {
525  throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
526  }
527  return inst;
528  }
529 };
530 
531 
536 template <typename Func, typename... Params>
538  Func _func;
539  std::tuple<Params...> _params;
540 public:
549  explicit sharded_parameter(Func func, Params... params)
550  SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
551  : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
552  }
553 private:
554  auto evaluate() const;
555 
556  template <typename Func_, typename... Param_>
557  friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
558 };
559 
563 SEASTAR_MODULE_EXPORT_END
565 
566 template <typename Service>
568  assert(_instances.empty());
569 }
570 
571 namespace internal {
572 
573 template <typename T>
574 inline
575 T&&
576 unwrap_sharded_arg(T&& arg) {
577  return std::forward<T>(arg);
578 }
579 
580 template <typename Service>
581 either_sharded_or_local<Service>
582 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
583  return either_sharded_or_local<Service>(arg);
584 }
585 
586 template <typename Func, typename... Param>
587 auto
588 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
589  return sp.evaluate();
590 }
591 
592 template <typename Service>
593 either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
594 
595 template <typename Service>
596 either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
597 
598 }
599 
600 template <typename Func, typename... Param>
601 auto
602 sharded_parameter<Func, Param...>::evaluate() const {
603  auto unwrap_params_and_invoke = [this] (const auto&... params) {
604  return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
605  };
606  return std::apply(unwrap_params_and_invoke, _params);
607 }
608 
609 template <typename Service>
610 template <typename... Args>
611 future<>
612 sharded<Service>::start(Args&&... args) noexcept {
613  try {
614  _instances.resize(smp::count);
615  return sharded_parallel_for_each(
616  [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
617  return smp::submit_to(c, [this, args] () mutable {
618  _instances[this_shard_id()].service = std::apply([this] (Args... args) {
619  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
620  }, args);
621  });
622  }).then_wrapped([this] (future<> f) {
623  try {
624  f.get();
625  return make_ready_future<>();
626  } catch (...) {
627  return this->stop().then([e = std::current_exception()] () mutable {
628  std::rethrow_exception(e);
629  });
630  }
631  });
632  } catch (...) {
634  }
635 }
636 
637 template <typename Service>
638 template <typename... Args>
639 future<>
640 sharded<Service>::start_single(Args&&... args) noexcept {
641  try {
642  assert(_instances.empty());
643  _instances.resize(1);
644  return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
645  _instances[0].service = std::apply([this] (Args... args) {
646  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
647  }, args);
648  }).then_wrapped([this] (future<> f) {
649  try {
650  f.get();
651  return make_ready_future<>();
652  } catch (...) {
653  return this->stop().then([e = std::current_exception()] () mutable {
654  std::rethrow_exception(e);
655  });
656  }
657  });
658  } catch (...) {
660  }
661 }
662 
663 namespace internal {
664 
// Compile-time probe: does Service declare a member called "stop"?

struct sharded_has_stop {
    // Fallback, chosen only when the overload below is not viable, i.e.
    // Service::stop doesn't exist.
    template<typename>
    static constexpr auto check(...) -> bool {
        return false;
    }

    // Preferred for an int argument whenever a member named "stop" exists.
    // The signature of stop is deliberately not checked, so that a stop()
    // with an unexpected signature is not silently ignored; the user is
    // forced to resolve the problem instead.
    template <typename Service>
    constexpr static auto check(int) -> std::enable_if_t<(sizeof(&Service::stop) >= 0), bool> {
        return true;
    }
};
683 
684 template <bool stop_exists>
685 struct sharded_call_stop {
686  template <typename Service>
687  static future<> call(Service& instance);
688 };
689 
690 template <>
691 template <typename Service>
692 inline
693 future<> sharded_call_stop<true>::call(Service& instance) {
694  return instance.stop();
695 }
696 
697 template <>
698 template <typename Service>
699 inline
700 future<> sharded_call_stop<false>::call(Service&) {
701  return make_ready_future<>();
702 }
703 
704 template <typename Service>
705 inline
706 future<>
707 stop_sharded_instance(Service& instance) {
708  constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
709  return internal::sharded_call_stop<has_stop>::call(instance);
710 }
711 
712 }
713 
714 template <typename Service>
715 future<>
717  try {
718  return sharded_parallel_for_each([this] (unsigned c) mutable {
719  return smp::submit_to(c, [this] () mutable {
720  auto inst = _instances[this_shard_id()].service;
721  if (!inst) {
722  return make_ready_future<>();
723  }
724  return internal::stop_sharded_instance(*inst);
725  });
726  }).then_wrapped([this] (future<> fut) {
727  return sharded_parallel_for_each([this] (unsigned c) {
728  return smp::submit_to(c, [this] {
729  auto fut = _instances[this_shard_id()].track_deletion();
730  _instances[this_shard_id()].service = nullptr;
731  return fut;
732  });
733  }).finally([this, fut = std::move(fut)] () mutable {
734  _instances.clear();
735  _instances = std::vector<sharded<Service>::entry>();
736  return std::move(fut);
737  });
738  });
739  } catch (...) {
741  }
742 }
743 
744 template <typename Service>
745 future<>
746 sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
747  try {
748  return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
749  return smp::submit_to(c, options, [this, func] {
750  return func(*get_local_service());
751  });
752  });
753  } catch (...) {
755  }
756 }
757 
758 template <typename Service>
759 template <typename Func, typename... Args>
760 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
761 inline
762 future<>
763 sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
764  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
765  "invoke_on_all()'s func must return void or future<>");
766  try {
767  return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
768  return std::apply([&service, &func] (Args&&... args) mutable {
769  return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
770  }, std::move(args));
771  }));
772  } catch (...) {
774  }
775 }
776 
777 template <typename Service>
778 template <typename Func, typename... Args>
779 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
780 inline
781 future<>
782 sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
783  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
784  "invoke_on_others()'s func must return void or future<>");
785  try {
786  return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) -> future<> {
787  return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
788  });
789  } catch (...) {
791  }
792 }
793 
794 template <typename Service>
795 const Service& sharded<Service>::local() const noexcept {
796  assert(local_is_initialized());
797  return *_instances[this_shard_id()].service;
798 }
799 
800 template <typename Service>
801 Service& sharded<Service>::local() noexcept {
802  assert(local_is_initialized());
803  return *_instances[this_shard_id()].service;
804 }
805 
806 template <typename Service>
808  assert(local_is_initialized());
809  return _instances[this_shard_id()].service;
810 }
811 
812 template <typename Service>
813 inline bool sharded<Service>::local_is_initialized() const noexcept {
814  return _instances.size() > this_shard_id() &&
815  _instances[this_shard_id()].service;
816 }
817 
818 SEASTAR_MODULE_EXPORT_BEGIN
843 template <typename PtrType>
844 SEASTAR_CONCEPT( requires (!std::is_pointer_v<PtrType>) )
845 class foreign_ptr {
846 private:
847  PtrType _value;
848  unsigned _cpu;
849 private:
850  void destroy(PtrType p, unsigned cpu) noexcept {
851  // `destroy()` is called from the destructor and other
852  // synchronous methods (like `reset()`), that have no way to
853  // wait for this future.
854  auto f = destroy_on(std::move(p), cpu);
855  if (!f.available() || f.failed()) {
856  internal::run_in_background(std::move(f));
857  }
858  }
859 
860  static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
861  if (p) {
862  if (cpu != this_shard_id()) {
863  return smp::submit_to(cpu, [v = std::move(p)] () mutable {
864  // Destroy the contained pointer. We do this explicitly
865  // in the current shard, because the lambda is destroyed
866  // in the shard that submitted the task.
867  v = {};
868  });
869  } else {
870  p = {};
871  }
872  }
873  return make_ready_future<>();
874  }
875 public:
876  using element_type = typename std::pointer_traits<PtrType>::element_type;
877  using pointer = element_type*;
878 
880  foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
881  : _value(PtrType())
882  , _cpu(this_shard_id()) {
883  }
885  foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
887  foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
888  : _value(std::move(value))
889  , _cpu(this_shard_id()) {
890  }
891  // The type is intentionally non-copyable because copies
892  // are expensive because each copy requires across-CPU call.
893  foreign_ptr(const foreign_ptr&) = delete;
895  foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
898  destroy(std::move(_value), _cpu);
899  }
901  future<foreign_ptr> copy() const noexcept {
902  return smp::submit_to(_cpu, [this] () mutable {
903  auto v = _value;
904  return make_foreign(std::move(v));
905  });
906  }
908  element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
910  element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
912  pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
917  unsigned get_owner_shard() const noexcept { return _cpu; }
919  operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
921  foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
922  destroy(std::move(_value), _cpu);
923  _value = std::move(other._value);
924  _cpu = other._cpu;
925  return *this;
926  }
932  PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
933  return std::exchange(_value, {});
934  }
938  void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
939  auto old_ptr = std::move(_value);
940  auto old_cpu = _cpu;
941 
942  _value = std::move(new_ptr);
943  _cpu = this_shard_id();
944 
945  destroy(std::move(old_ptr), old_cpu);
946  }
950  void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
951  reset(PtrType());
952  }
953 
957  future<> destroy() noexcept {
958  return destroy_on(std::move(_value), _cpu);
959  }
960 };
961 
965 template <typename T>
967  return foreign_ptr<T>(std::move(ptr));
968 }
969 
971 
972 template<typename T>
973 struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
974 
975 SEASTAR_MODULE_EXPORT_END
976 
977 }
Definition: sharded.hh:123
Definition: shared_ptr.hh:501
Definition: sharded.hh:845
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:912
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:885
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:921
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:938
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:917
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:897
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:887
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:901
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:950
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:910
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:880
future destroy() noexcept
Definition: sharded.hh:957
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:932
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:908
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
value_type && get()
gets the value returned by the computation
Definition: future.hh:1340
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:159
Provide a sharded service with access to its peers.
Definition: sharded.hh:143
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:982
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:537
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:549
Definition: sharded.hh:180
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:440
sharded(sharded &&other)=delete
~sharded()
Destroys a sharded object. Must not be in a started state.
Definition: sharded.hh:567
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:490
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:746
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:472
future start_single(Args &&... args) noexcept
Definition: sharded.hh:640
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:367
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:782
sharded() noexcept
Definition: sharded.hh:219
future stop() noexcept
Definition: sharded.hh:716
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:350
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:417
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:400
future start(Args &&... args) noexcept
Definition: sharded.hh:612
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:107
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:966
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:206
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:50
Definition: shared_ptr.hh:661
Definition: is_smart_ptr.hh:30
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168