Seastar
High performance C++ framework for concurrent servers
sharded.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/smp.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/util/tuple_utils.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/util/concepts.hh>
31 #include <seastar/util/log.hh>
32 #include <boost/iterator/counting_iterator.hpp>
33 #include <functional>
34 #if __has_include(<concepts>)
35 #include <concepts>
36 #endif
37 
47 
48 namespace seastar {
49 
50 template <typename Func, typename... Param>
51 class sharded_parameter;
52 
53 template <typename Service>
54 class sharded;
55 
56 namespace internal {
57 
58 template <typename Func, typename... Param>
59 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
60 
61 using on_each_shard_func = std::function<future<> (unsigned shard)>;
62 
63 future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
64 
65 template <typename Service>
66 class either_sharded_or_local {
67  sharded<Service>& _sharded;
68 public:
69  either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
70  operator sharded<Service>& ();
71  operator Service& ();
72 };
73 
74 template <typename T>
75 struct sharded_unwrap {
76  using evaluated_type = T;
77  using type = T;
78 };
79 
80 template <typename T>
81 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
82  using evaluated_type = T&;
83  using type = either_sharded_or_local<T>;
84 };
85 
86 template <typename Func, typename... Param>
87 struct sharded_unwrap<sharded_parameter<Func, Param...>> {
88  using type = std::invoke_result_t<Func, Param...>;
89 };
90 
91 template <typename T>
92 using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
93 
94 template <typename T>
95 using sharded_unwrap_t = typename sharded_unwrap<T>::type;
96 
97 } // internal
98 
99 
102 
103 template <typename T>
104 class sharded;
105 
111 template<typename T>
113 protected:
114  std::function<void()> _delete_cb;
115  async_sharded_service() noexcept = default;
116  virtual ~async_sharded_service() {
117  if (_delete_cb) {
118  _delete_cb();
119  }
120  }
121  template <typename Service> friend class sharded;
122 };
123 
124 
130 template <typename Service>
132  sharded<Service>* _container = nullptr;
133 private:
134  template <typename T> friend class sharded;
135  void set_container(sharded<Service>* container) noexcept { _container = container; }
136 public:
137  peering_sharded_service() noexcept = default;
140  peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
141  sharded<Service>& container() noexcept { return *_container; }
142  const sharded<Service>& container() const noexcept { return *_container; }
143 };
144 
145 
147 class no_sharded_instance_exception : public std::exception {
148  sstring _msg;
149 public:
150  no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
151  explicit no_sharded_instance_exception(sstring type_info)
152  : _msg("sharded instance does not exist: " + type_info) {}
153  virtual const char* what() const noexcept override {
154  return _msg.c_str();
155  }
156 };
157 
167 template <typename Service>
168 class sharded {
169  struct entry {
170  shared_ptr<Service> service;
171  promise<> freed;
172  };
173  std::vector<entry> _instances;
174 private:
175  using invoke_on_all_func_type = std::function<future<> (Service&)>;
176 private:
177  void service_deleted() noexcept {
178  _instances[this_shard_id()].freed.set_value();
179  }
180  template <typename U, bool async>
181  friend struct shared_ptr_make_helper;
182 
183  template <typename T>
184  std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
185  set_container(T& service) noexcept {
186  service.set_container(this);
187  }
188 
189  template <typename T>
190  std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
191  set_container(T&) noexcept {
192  }
193 
194  future<>
195  sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
196  return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
197  }
198 public:
201  sharded() noexcept {}
202  sharded(const sharded& other) = delete;
203  sharded& operator=(const sharded& other) = delete;
206  sharded(sharded&& other) = delete;
207  sharded& operator=(sharded&& other) = delete;
209  ~sharded();
210 
217  template <typename... Args>
218  future<> start(Args&&... args) noexcept;
219 
226  template <typename... Args>
227  future<> start_single(Args&&... args) noexcept;
228 
233  future<> stop() noexcept;
234 
243  future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
244 
250  future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
251  try {
252  return invoke_on_all(smp_submit_to_options{}, std::move(func));
253  } catch (...) {
255  }
256  }
257 
272  template <typename Func, typename... Args>
273  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
274  future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
275 
281  template <typename Func, typename... Args>
282  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
283  future<> invoke_on_all(Func func, Args... args) noexcept {
284  try {
285  return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
286  } catch (...) {
288  }
289  }
290 
301  template <typename Func, typename... Args>
302  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
303  future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
304 
316  template <typename Func, typename... Args>
317  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
318  future<> invoke_on_others(Func func, Args... args) noexcept {
319  try {
320  return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
321  } catch (...) {
323  }
324  }
325 
330  template <typename Reducer, typename Func, typename... Args>
331  inline
332  auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
333  {
334  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
335  boost::make_counting_iterator<unsigned>(_instances.size()),
336  [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
337  return smp::submit_to(c, [this, &func, args] () mutable {
338  return std::apply([this, &func] (Args&&... args) mutable {
339  auto inst = get_local_service();
340  return std::invoke(func, *inst, std::forward<Args>(args)...);
341  }, std::move(args));
342  });
343  }, std::forward<Reducer>(r));
344  }
345 
347  template <typename Reducer, typename Func, typename... Args>
348  inline
349  auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
350  {
351  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
352  boost::make_counting_iterator<unsigned>(_instances.size()),
353  [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
354  return smp::submit_to(c, [this, &func, args] () {
355  return std::apply([this, &func] (Args&&... args) {
356  auto inst = get_local_service();
357  return std::invoke(func, *inst, std::forward<Args>(args)...);
358  }, std::move(args));
359  });
360  }, std::forward<Reducer>(r));
361  }
362 
379  template <typename Mapper, typename Initial, typename Reduce>
380  inline
381  future<Initial>
382  map_reduce0(Mapper map, Initial initial, Reduce reduce) {
383  auto wrapped_map = [this, map] (unsigned c) {
384  return smp::submit_to(c, [this, map] {
385  auto inst = get_local_service();
386  return std::invoke(map, *inst);
387  });
388  };
389  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
390  std::move(wrapped_map),
391  std::move(initial),
392  std::move(reduce));
393  }
394 
396  template <typename Mapper, typename Initial, typename Reduce>
397  inline
399  map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
400  auto wrapped_map = [this, map] (unsigned c) {
401  return smp::submit_to(c, [this, map] {
402  auto inst = get_local_service();
403  return std::invoke(map, *inst);
404  });
405  };
406  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
407  std::move(wrapped_map),
408  std::move(initial),
409  std::move(reduce));
410  }
411 
421  template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
422  inline future<std::vector<return_type>> map(Mapper mapper) {
423  return do_with(std::vector<return_type>(), std::move(mapper),
424  [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
425  vec.resize(_instances.size());
426  return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, &mapper] (unsigned c) {
427  return smp::submit_to(c, [this, &mapper] {
428  auto inst = get_local_service();
429  return mapper(*inst);
430  }).then([&vec, c] (auto&& res) {
431  vec[c] = std::move(res);
432  });
433  }).then([&vec] {
434  return make_ready_future<std::vector<return_type>>(std::move(vec));
435  });
436  });
437  }
438 
451  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
452  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
453  Ret
454  invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
455  return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
456  auto inst = get_local_service();
457  return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
458  });
459  }
460 
469  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
470  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
471  Ret
472  invoke_on(unsigned id, Func&& func, Args&&... args) {
473  return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
474  }
475 
477  const Service& local() const noexcept;
478 
480  Service& local() noexcept;
481 
483  shared_ptr<Service> local_shared() noexcept;
484 
486  bool local_is_initialized() const noexcept;
487 
488 private:
489  void track_deletion(shared_ptr<Service>&, std::false_type) noexcept {
490  // do not wait for instance to be deleted since it is not going to notify us
491  service_deleted();
492  }
493 
494  void track_deletion(shared_ptr<Service>& s, std::true_type) {
495  s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
496  }
497 
498  template <typename... Args>
499  shared_ptr<Service> create_local_service(Args&&... args) {
500  auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
501  set_container(*s);
502  track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
503  return s;
504  }
505 
506  shared_ptr<Service> get_local_service() {
507  auto inst = _instances[this_shard_id()].service;
508  if (!inst) {
509  throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
510  }
511  return inst;
512  }
513 
514  shared_ptr<const Service> get_local_service() const {
515  auto inst = _instances[this_shard_id()].service;
516  if (!inst) {
517  throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
518  }
519  return inst;
520  }
521 };
522 
523 
528 template <typename Func, typename... Params>
530  Func _func;
531  std::tuple<Params...> _params;
532 public:
541  explicit sharded_parameter(Func func, Params... params)
542  SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
543  : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
544  }
545 private:
546  auto evaluate() const;
547 
548  template <typename Func_, typename... Param_>
549  friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
550 };
551 
555 
557 
558 template <typename Service>
560  assert(_instances.empty());
561 }
562 
563 namespace internal {
564 
565 template <typename T>
566 inline
567 T&&
568 unwrap_sharded_arg(T&& arg) {
569  return std::forward<T>(arg);
570 }
571 
572 template <typename Service>
573 either_sharded_or_local<Service>
574 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
575  return either_sharded_or_local<Service>(arg);
576 }
577 
578 template <typename Func, typename... Param>
579 auto
580 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
581  return sp.evaluate();
582 }
583 
584 template <typename Service>
585 either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
586 
587 template <typename Service>
588 either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
589 
590 }
591 
592 template <typename Func, typename... Param>
593 auto
594 sharded_parameter<Func, Param...>::evaluate() const {
595  auto unwrap_params_and_invoke = [this] (const auto&... params) {
596  return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
597  };
598  return std::apply(unwrap_params_and_invoke, _params);
599 }
600 
601 template <typename Service>
602 template <typename... Args>
603 future<>
604 sharded<Service>::start(Args&&... args) noexcept {
605  try {
606  _instances.resize(smp::count);
607  return sharded_parallel_for_each(
608  [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
609  return smp::submit_to(c, [this, args] () mutable {
610  _instances[this_shard_id()].service = std::apply([this] (Args... args) {
611  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
612  }, args);
613  });
614  }).then_wrapped([this] (future<> f) {
615  try {
616  f.get();
617  return make_ready_future<>();
618  } catch (...) {
619  return this->stop().then([e = std::current_exception()] () mutable {
620  std::rethrow_exception(e);
621  });
622  }
623  });
624  } catch (...) {
626  }
627 }
628 
629 template <typename Service>
630 template <typename... Args>
631 future<>
632 sharded<Service>::start_single(Args&&... args) noexcept {
633  try {
634  assert(_instances.empty());
635  _instances.resize(1);
636  return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
637  _instances[0].service = std::apply([this] (Args... args) {
638  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
639  }, args);
640  }).then_wrapped([this] (future<> f) {
641  try {
642  f.get();
643  return make_ready_future<>();
644  } catch (...) {
645  return this->stop().then([e = std::current_exception()] () mutable {
646  std::rethrow_exception(e);
647  });
648  }
649  });
650  } catch (...) {
652  }
653 }
654 
655 namespace internal {
656 
657 // Helper check if Service::stop exists
658 
659 struct sharded_has_stop {
660  // If a member names "stop" exists, try to call it, even if it doesn't
661  // have the correct signature. This is so that we don't ignore a function
662  // named stop() just because the signature is incorrect, and instead
663  // force the user to resolve the ambiguity.
664  template <typename Service>
665  constexpr static auto check(int) -> std::enable_if_t<(sizeof(&Service::stop) >= 0), bool> {
666  return true;
667  }
668 
669  // Fallback in case Service::stop doesn't exist.
670  template<typename>
671  static constexpr auto check(...) -> bool {
672  return false;
673  }
674 };
675 
676 template <bool stop_exists>
677 struct sharded_call_stop {
678  template <typename Service>
679  static future<> call(Service& instance);
680 };
681 
682 template <>
683 template <typename Service>
684 inline
685 future<> sharded_call_stop<true>::call(Service& instance) {
686  return instance.stop();
687 }
688 
689 template <>
690 template <typename Service>
691 inline
692 future<> sharded_call_stop<false>::call(Service&) {
693  return make_ready_future<>();
694 }
695 
696 template <typename Service>
697 inline
698 future<>
699 stop_sharded_instance(Service& instance) {
700  constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
701  return internal::sharded_call_stop<has_stop>::call(instance);
702 }
703 
704 }
705 
706 template <typename Service>
707 future<>
709  try {
710  return sharded_parallel_for_each([this] (unsigned c) mutable {
711  return smp::submit_to(c, [this] () mutable {
712  auto inst = _instances[this_shard_id()].service;
713  if (!inst) {
714  return make_ready_future<>();
715  }
716  return internal::stop_sharded_instance(*inst);
717  });
718  }).then_wrapped([this] (future<> fut) {
719  return sharded_parallel_for_each([this] (unsigned c) {
720  return smp::submit_to(c, [this] {
721  if (_instances[this_shard_id()].service == nullptr) {
722  return make_ready_future<>();
723  }
724  _instances[this_shard_id()].service = nullptr;
725  return _instances[this_shard_id()].freed.get_future();
726  });
727  }).finally([this, fut = std::move(fut)] () mutable {
728  _instances.clear();
729  _instances = std::vector<sharded<Service>::entry>();
730  return std::move(fut);
731  });
732  });
733  } catch (...) {
735  }
736 }
737 
738 template <typename Service>
739 future<>
740 sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
741  try {
742  return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
743  return smp::submit_to(c, options, [this, func] {
744  return func(*get_local_service());
745  });
746  });
747  } catch (...) {
749  }
750 }
751 
752 template <typename Service>
753 template <typename Func, typename... Args>
754 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
755 inline
756 future<>
757 sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
758  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
759  "invoke_on_all()'s func must return void or future<>");
760  try {
761  return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
762  return std::apply([&service, &func] (Args&&... args) mutable {
763  return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
764  }, std::move(args));
765  }));
766  } catch (...) {
768  }
769 }
770 
771 template <typename Service>
772 template <typename Func, typename... Args>
773 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
774 inline
775 future<>
776 sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
777  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
778  "invoke_on_others()'s func must return void or future<>");
779  try {
780  return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) -> future<> {
781  return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
782  });
783  } catch (...) {
785  }
786 }
787 
788 template <typename Service>
789 const Service& sharded<Service>::local() const noexcept {
790  assert(local_is_initialized());
791  return *_instances[this_shard_id()].service;
792 }
793 
794 template <typename Service>
795 Service& sharded<Service>::local() noexcept {
796  assert(local_is_initialized());
797  return *_instances[this_shard_id()].service;
798 }
799 
800 template <typename Service>
802  assert(local_is_initialized());
803  return _instances[this_shard_id()].service;
804 }
805 
806 template <typename Service>
807 inline bool sharded<Service>::local_is_initialized() const noexcept {
808  return _instances.size() > this_shard_id() &&
809  _instances[this_shard_id()].service;
810 }
811 
814 
837 template <typename PtrType>
838 SEASTAR_CONCEPT( requires (!std::is_pointer<PtrType>::value) )
839 class foreign_ptr {
840 private:
841  PtrType _value;
842  unsigned _cpu;
843 private:
844  void destroy(PtrType p, unsigned cpu) noexcept {
845  // `destroy()` is called from the destructor and other
846  // synchronous methods (like `reset()`), that have no way to
847  // wait for this future.
848  (void)destroy_on(std::move(p), cpu);
849  }
850 
851  static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
852  if (p) {
853  if (cpu != this_shard_id()) {
854  return smp::submit_to(cpu, [v = std::move(p)] () mutable {
855  // Destroy the contained pointer. We do this explicitly
856  // in the current shard, because the lambda is destroyed
857  // in the shard that submitted the task.
858  v = {};
859  });
860  } else {
861  p = {};
862  }
863  }
864  return make_ready_future<>();
865  }
866 public:
867  using element_type = typename std::pointer_traits<PtrType>::element_type;
868  using pointer = element_type*;
869 
871  foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
872  : _value(PtrType())
873  , _cpu(this_shard_id()) {
874  }
876  foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
878  foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
879  : _value(std::move(value))
880  , _cpu(this_shard_id()) {
881  }
882  // The type is intentionally non-copyable because copies
883  // are expensive because each copy requires across-CPU call.
884  foreign_ptr(const foreign_ptr&) = delete;
886  foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
889  destroy(std::move(_value), _cpu);
890  }
892  future<foreign_ptr> copy() const noexcept {
893  return smp::submit_to(_cpu, [this] () mutable {
894  auto v = _value;
895  return make_foreign(std::move(v));
896  });
897  }
899  element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
901  element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
903  pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
908  unsigned get_owner_shard() const noexcept { return _cpu; }
910  operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
912  foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible<PtrType>::value) {
913  destroy(std::move(_value), _cpu);
914  _value = std::move(other._value);
915  _cpu = other._cpu;
916  return *this;
917  }
923  PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
924  return std::exchange(_value, {});
925  }
929  void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
930  auto old_ptr = std::move(_value);
931  auto old_cpu = _cpu;
932 
933  _value = std::move(new_ptr);
934  _cpu = this_shard_id();
935 
936  destroy(std::move(old_ptr), old_cpu);
937  }
941  void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
942  reset(PtrType());
943  }
944 
948  future<> destroy() noexcept {
949  return destroy_on(std::move(_value), _cpu);
950  }
951 };
952 
956 template <typename T>
958  return foreign_ptr<T>(std::move(ptr));
959 }
960 
962 
963 template<typename T>
964 struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
965 
966 }
Definition: sharded.hh:112
Definition: shared_ptr.hh:488
Definition: sharded.hh:839
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:903
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:876
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:929
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible< PtrType >::value)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:912
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:908
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:888
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:878
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:892
foreign_ptr(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible_v< PtrType >)=default
Moves a foreign_ptr<> to another object.
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:941
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:901
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:871
future destroy() noexcept
Definition: sharded.hh:948
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:923
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:899
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1525
value_type && get()
gets the value returned by the computation
Definition: future.hh:1452
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:147
Provide a sharded service with access to its peers.
Definition: sharded.hh:131
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:1027
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:529
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:541
Definition: sharded.hh:168
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:422
sharded(sharded &&other)=delete
~sharded()
Destroyes a sharded object. Must not be in a started state.
Definition: sharded.hh:559
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:472
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:740
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:454
future start_single(Args &&... args) noexcept
Definition: sharded.hh:632
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) const -> typename reducer_traits< Reducer >::future_type
The const version of map_reduce(Reducer&& r, Func&& func)
Definition: sharded.hh:349
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:776
sharded() noexcept
Definition: sharded.hh:201
future stop() noexcept
Definition: sharded.hh:708
auto map_reduce(Reducer &&r, Func &&func, Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:332
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce) const
The const version of map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:399
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:382
future start(Args &&... args) noexcept
Definition: sharded.hh:604
future< T... > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:2077
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:129
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:102
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:562
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:957
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:201
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: smp.hh:64
Definition: shared_ptr.hh:647
Definition: is_smart_ptr.hh:29
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:154