Seastar
High performance C++ framework for concurrent servers
sharded.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/smp.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/util/tuple_utils.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/util/concepts.hh>
31 #include <boost/iterator/counting_iterator.hpp>
32 #include <functional>
33 #if __has_include(<concepts>)
34 #include <concepts>
35 #endif
36 
46 
47 namespace seastar {
48 
49 template <typename Func, typename... Param>
51 
52 namespace internal {
53 
54 template <typename Func, typename... Param>
55 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
56 
57 using on_each_shard_func = std::function<future<> (unsigned shard)>;
58 
59 future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
60 
61 }
62 
65 
66 template <typename T>
67 class sharded;
68 
74 template<typename T>
76 protected:
77  std::function<void()> _delete_cb;
78  async_sharded_service() noexcept = default;
79  virtual ~async_sharded_service() {
80  if (_delete_cb) {
81  _delete_cb();
82  }
83  }
84  template <typename Service> friend class sharded;
85 };
86 
87 
93 template <typename Service>
95  sharded<Service>* _container = nullptr;
96 private:
97  template <typename T> friend class sharded;
98  void set_container(sharded<Service>* container) noexcept { _container = container; }
99 public:
100  peering_sharded_service() noexcept = default;
103  peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
104  sharded<Service>& container() noexcept { return *_container; }
105  const sharded<Service>& container() const noexcept { return *_container; }
106 };
107 
108 
110 class no_sharded_instance_exception : public std::exception {
111 public:
112  virtual const char* what() const noexcept override {
113  return "sharded instance does not exist";
114  }
115 };
116 
126 template <typename Service>
127 class sharded {
128  struct entry {
129  shared_ptr<Service> service;
130  promise<> freed;
131  };
132  std::vector<entry> _instances;
133 private:
134  using invoke_on_all_func_type = std::function<future<> (Service&)>;
135 private:
136  void service_deleted() noexcept {
137  _instances[this_shard_id()].freed.set_value();
138  }
139  template <typename U, bool async>
140  friend struct shared_ptr_make_helper;
141 
142  template <typename T>
143  std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
144  set_container(T& service) noexcept {
145  service.set_container(this);
146  }
147 
148  template <typename T>
149  std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
150  set_container(T& service) noexcept {
151  }
152 
153  future<>
154  sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
155  return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
156  }
157 public:
160  sharded() noexcept {}
161  sharded(const sharded& other) = delete;
162  sharded& operator=(const sharded& other) = delete;
165  sharded(sharded&& other) = delete;
166  sharded& operator=(sharded&& other) = delete;
168  ~sharded();
169 
176  template <typename... Args>
177  future<> start(Args&&... args) noexcept;
178 
185  template <typename... Args>
186  future<> start_single(Args&&... args) noexcept;
187 
192  future<> stop() noexcept;
193 
202  future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
203 
209  future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
210  try {
211  return invoke_on_all(smp_submit_to_options{}, std::move(func));
212  } catch (...) {
214  }
215  }
216 
231  template <typename Func, typename... Args>
232  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
233  future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
234 
240  template <typename Func, typename... Args>
241  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
242  future<> invoke_on_all(Func func, Args... args) noexcept {
243  try {
244  return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
245  } catch (...) {
247  }
248  }
249 
260  template <typename Func, typename... Args>
261  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
262  future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
263 
275  template <typename Func, typename... Args>
276  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
277  future<> invoke_on_others(Func func, Args... args) noexcept {
278  try {
279  return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
280  } catch (...) {
282  }
283  }
284 
289  template <typename Reducer, typename Ret, typename... FuncArgs, typename... Args>
290  inline
291  auto
292  map_reduce(Reducer&& r, Ret (Service::*func)(FuncArgs...), Args&&... args)
293  -> typename reducer_traits<Reducer>::future_type
294  {
295  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
296  boost::make_counting_iterator<unsigned>(_instances.size()),
297  [this, func, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
298  return smp::submit_to(c, [this, func, args] () mutable {
299  return std::apply([this, func] (Args&&... args) mutable {
300  auto inst = _instances[this_shard_id()].service;
301  if (inst) {
302  return ((*inst).*func)(std::forward<Args>(args)...);
303  } else {
304  throw no_sharded_instance_exception();
305  }
306  }, std::move(args));
307  });
308  }, std::forward<Reducer>(r));
309  }
310 
315  template <typename Reducer, typename Func>
316  inline
317  auto map_reduce(Reducer&& r, Func&& func) -> typename reducer_traits<Reducer>::future_type
318  {
319  return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
320  boost::make_counting_iterator<unsigned>(_instances.size()),
321  [this, &func] (unsigned c) mutable {
322  return smp::submit_to(c, [this, func] () mutable {
323  auto inst = get_local_service();
324  return func(*inst);
325  });
326  }, std::forward<Reducer>(r));
327  }
328 
345  template <typename Mapper, typename Initial, typename Reduce>
346  inline
347  future<Initial>
348  map_reduce0(Mapper map, Initial initial, Reduce reduce) {
349  auto wrapped_map = [this, map] (unsigned c) {
350  return smp::submit_to(c, [this, map] {
351  auto inst = get_local_service();
352  return map(*inst);
353  });
354  };
355  return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
356  std::move(wrapped_map),
357  std::move(initial),
358  std::move(reduce));
359  }
360 
370  template <typename Mapper, typename Future = futurize_t<std::result_of_t<Mapper(Service&)>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
371  inline future<std::vector<return_type>> map(Mapper mapper) {
372  return do_with(std::vector<return_type>(),
373  [&mapper, this] (std::vector<return_type>& vec) mutable {
374  vec.resize(smp::count);
375  return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, mapper] (unsigned c) {
376  return smp::submit_to(c, [this, mapper] {
377  auto inst = get_local_service();
378  return mapper(*inst);
379  }).then([&vec, c] (auto res) {
380  vec[c] = res;
381  });
382  }).then([&vec] {
383  return make_ready_future<std::vector<return_type>>(std::move(vec));
384  });
385  });
386  }
387 
400  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
401  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
402  Ret
403  invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
404  return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
405  auto inst = get_local_service();
406  return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
407  });
408  }
409 
418  template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
419  SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
420  Ret
421  invoke_on(unsigned id, Func&& func, Args&&... args) {
422  return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
423  }
424 
426  const Service& local() const noexcept;
427 
429  Service& local() noexcept;
430 
432  shared_ptr<Service> local_shared() noexcept;
433 
435  bool local_is_initialized() const noexcept;
436 
437 private:
438  void track_deletion(shared_ptr<Service>& s, std::false_type) noexcept {
439  // do not wait for instance to be deleted since it is not going to notify us
440  service_deleted();
441  }
442 
443  void track_deletion(shared_ptr<Service>& s, std::true_type) {
444  s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
445  }
446 
447  template <typename... Args>
448  shared_ptr<Service> create_local_service(Args&&... args) {
449  auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
450  set_container(*s);
451  track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
452  return s;
453  }
454 
455  shared_ptr<Service> get_local_service() {
456  auto inst = _instances[this_shard_id()].service;
457  if (!inst) {
458  throw no_sharded_instance_exception();
459  }
460  return inst;
461  }
462 };
463 
464 namespace internal {
465 
466 template <typename T>
467 struct sharded_unwrap {
468  using type = T;
469 };
470 
471 template <typename T>
472 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
473  using type = T&;
474 };
475 
476 template <typename T>
477 using sharded_unwrap_t = typename sharded_unwrap<T>::type;
478 
479 } // internal
480 
481 
486 template <typename Func, typename... Params>
487 class sharded_parameter {
488  Func _func;
489  std::tuple<Params...> _params;
490 public:
499  explicit sharded_parameter(Func func, Params... params)
500  SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_t<Params>...>)
501  : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
502  }
503 private:
504  auto evaluate() const;
505 
506  template <typename Func_, typename... Param_>
507  friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
508 };
509 
513 
515 
516 template <typename Service>
518  assert(_instances.empty());
519 }
520 
521 namespace internal {
522 
523 template <typename Service>
524 class either_sharded_or_local {
525  sharded<Service>& _sharded;
526 public:
527  either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
528  operator sharded<Service>& () { return _sharded; }
529  operator Service& () { return _sharded.local(); }
530 };
531 
532 template <typename T>
533 inline
534 T&&
535 unwrap_sharded_arg(T&& arg) {
536  return std::forward<T>(arg);
537 }
538 
539 template <typename Service>
540 either_sharded_or_local<Service>
541 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
542  return either_sharded_or_local<Service>(arg);
543 }
544 
545 template <typename Func, typename... Param>
546 auto
547 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
548  return sp.evaluate();
549 }
550 
551 }
552 
553 template <typename Func, typename... Param>
554 auto
555 sharded_parameter<Func, Param...>::evaluate() const {
556  auto unwrap_params_and_invoke = [this] (const auto&... params) {
557  return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
558  };
559  return std::apply(unwrap_params_and_invoke, _params);
560 }
561 
562 template <typename Service>
563 template <typename... Args>
564 future<>
565 sharded<Service>::start(Args&&... args) noexcept {
566  try {
567  _instances.resize(smp::count);
568  return sharded_parallel_for_each(
569  [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
570  return smp::submit_to(c, [this, args] () mutable {
571  _instances[this_shard_id()].service = std::apply([this] (Args... args) {
572  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
573  }, args);
574  });
575  }).then_wrapped([this] (future<> f) {
576  try {
577  f.get();
578  return make_ready_future<>();
579  } catch (...) {
580  return this->stop().then([e = std::current_exception()] () mutable {
581  std::rethrow_exception(e);
582  });
583  }
584  });
585  } catch (...) {
587  }
588 }
589 
590 template <typename Service>
591 template <typename... Args>
592 future<>
593 sharded<Service>::start_single(Args&&... args) noexcept {
594  try {
595  assert(_instances.empty());
596  _instances.resize(1);
597  return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
598  _instances[0].service = std::apply([this] (Args... args) {
599  return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
600  }, args);
601  }).then_wrapped([this] (future<> f) {
602  try {
603  f.get();
604  return make_ready_future<>();
605  } catch (...) {
606  return this->stop().then([e = std::current_exception()] () mutable {
607  std::rethrow_exception(e);
608  });
609  }
610  });
611  } catch (...) {
613  }
614 }
615 
616 namespace internal {
617 
618 // Helper check if Service::stop exists
619 
620 struct sharded_has_stop {
621  // If a member names "stop" exists, try to call it, even if it doesn't
622  // have the correct signature. This is so that we don't ignore a function
623  // named stop() just because the signature is incorrect, and instead
624  // force the user to resolve the ambiguity.
625  template <typename Service>
626  constexpr static auto check(int) -> std::enable_if_t<(sizeof(&Service::stop) >= 0), bool> {
627  return true;
628  }
629 
630  // Fallback in case Service::stop doesn't exist.
631  template<typename>
632  static constexpr auto check(...) -> bool {
633  return false;
634  }
635 };
636 
637 template <bool stop_exists>
638 struct sharded_call_stop {
639  template <typename Service>
640  static future<> call(Service& instance);
641 };
642 
643 template <>
644 template <typename Service>
645 inline
646 future<> sharded_call_stop<true>::call(Service& instance) {
647  return instance.stop();
648 }
649 
650 template <>
651 template <typename Service>
652 inline
653 future<> sharded_call_stop<false>::call(Service& instance) {
654  return make_ready_future<>();
655 }
656 
657 template <typename Service>
658 inline
659 future<>
660 stop_sharded_instance(Service& instance) {
661  constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
662  return internal::sharded_call_stop<has_stop>::call(instance);
663 }
664 
665 }
666 
667 template <typename Service>
668 future<>
670  try {
671  return sharded_parallel_for_each([this] (unsigned c) mutable {
672  return smp::submit_to(c, [this] () mutable {
673  auto inst = _instances[this_shard_id()].service;
674  if (!inst) {
675  return make_ready_future<>();
676  }
677  return internal::stop_sharded_instance(*inst);
678  });
679  }).then_wrapped([this] (future<> fut) {
680  return sharded_parallel_for_each([this] (unsigned c) {
681  return smp::submit_to(c, [this] {
682  if (_instances[this_shard_id()].service == nullptr) {
683  return make_ready_future<>();
684  }
685  _instances[this_shard_id()].service = nullptr;
686  return _instances[this_shard_id()].freed.get_future();
687  });
688  }).finally([this, fut = std::move(fut)] () mutable {
689  _instances.clear();
690  _instances = std::vector<sharded<Service>::entry>();
691  return std::move(fut);
692  });
693  });
694  } catch (...) {
696  }
697 }
698 
699 template <typename Service>
700 future<>
701 sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
702  try {
703  return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
704  return smp::submit_to(c, options, [this, func] {
705  return func(*get_local_service());
706  });
707  });
708  } catch (...) {
710  }
711 }
712 
713 template <typename Service>
714 template <typename Func, typename... Args>
715 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
716 inline
717 future<>
718 sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
719  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
720  "invoke_on_all()'s func must return void or future<>");
721  try {
722  return invoke_on_all(options, invoke_on_all_func_type([func, args = std::tuple(std::move(args)...)] (Service& service) mutable {
723  return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), args));
724  }));
725  } catch (...) {
727  }
728 }
729 
730 template <typename Service>
731 template <typename Func, typename... Args>
732 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
733 inline
734 future<>
735 sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
736  static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
737  "invoke_on_others()'s func must return void or future<>");
738  try {
739  return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) -> future<> {
740  return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
741  });
742  } catch (...) {
744  }
745 }
746 
747 template <typename Service>
748 const Service& sharded<Service>::local() const noexcept {
749  assert(local_is_initialized());
750  return *_instances[this_shard_id()].service;
751 }
752 
753 template <typename Service>
754 Service& sharded<Service>::local() noexcept {
755  assert(local_is_initialized());
756  return *_instances[this_shard_id()].service;
757 }
758 
759 template <typename Service>
761  assert(local_is_initialized());
762  return _instances[this_shard_id()].service;
763 }
764 
765 template <typename Service>
766 inline bool sharded<Service>::local_is_initialized() const noexcept {
767  return _instances.size() > this_shard_id() &&
768  _instances[this_shard_id()].service;
769 }
770 
773 
797 template <typename PtrType>
798 class foreign_ptr {
799 private:
800  PtrType _value;
801  unsigned _cpu;
802 private:
803  void destroy(PtrType p, unsigned cpu) noexcept {
804  if (p && this_shard_id() != cpu) {
805  // `destroy()` is called from the destructor and other
806  // synchronous methods (like `reset()`), that have no way to
807  // wait for this future.
808  (void)smp::submit_to(cpu, [v = std::move(p)] () mutable {
809  // Destroy the contained pointer. We do this explicitly
810  // in the current shard, because the lambda is destroyed
811  // in the shard that submitted the task.
812  v = {};
813  });
814  }
815  }
816 public:
817  using element_type = typename std::pointer_traits<PtrType>::element_type;
818  using pointer = element_type*;
819 
821  foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
822  : _value(PtrType())
823  , _cpu(this_shard_id()) {
824  }
826  foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
828  foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
829  : _value(std::move(value))
830  , _cpu(this_shard_id()) {
831  }
832  // The type is intentionally non-copyable because copies
833  // are expensive because each copy requires across-CPU call.
834  foreign_ptr(const foreign_ptr&) = delete;
836  foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
839  destroy(std::move(_value), _cpu);
840  }
842  future<foreign_ptr> copy() const noexcept {
843  return smp::submit_to(_cpu, [this] () mutable {
844  auto v = _value;
845  return make_foreign(std::move(v));
846  });
847  }
849  element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
851  element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
853  pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
858  unsigned get_owner_shard() const noexcept { return _cpu; }
860  operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
862  foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible<PtrType>::value) {
863  destroy(std::move(_value), _cpu);
864  _value = std::move(other._value);
865  _cpu = other._cpu;
866  return *this;
867  }
873  PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
874  return std::exchange(_value, {});
875  }
879  void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
880  auto old_ptr = std::move(_value);
881  auto old_cpu = _cpu;
882 
883  _value = std::move(new_ptr);
884  _cpu = this_shard_id();
885 
886  destroy(std::move(old_ptr), old_cpu);
887  }
891  void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
892  reset(PtrType());
893  }
894 };
895 
899 template <typename T>
901  return foreign_ptr<T>(std::move(ptr));
902 }
903 
905 
906 template<typename T>
907 struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
908 
909 }
seastar::shared_ptr< Service >
smp.hh
seastar::sharded::map_reduce
auto map_reduce(Reducer &&r, Ret(Service::*func)(FuncArgs...), Args &&... args) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:292
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::current_exception_as_future
future< T... > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:2065
seastar::foreign_ptr::get_owner_shard
unsigned get_owner_shard() const noexcept
Definition: sharded.hh:858
seastar::this_shard_id
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: smp.hh:58
seastar::promise
promise - allows a future value to be made available at a later time.
Definition: future.hh:151
seastar::sharded::map_reduce
auto map_reduce(Reducer &&r, Func &&func) -> typename reducer_traits< Reducer >::future_type
Definition: sharded.hh:317
seastar::foreign_ptr::operator=
foreign_ptr & operator=(foreign_ptr &&other) noexcept(std::is_nothrow_move_constructible< PtrType >::value)
Move-assigns a foreign_ptr<>.
Definition: sharded.hh:862
seastar::sharded
Definition: sharded.hh:67
seastar::sharded::start
future start(Args &&... args) noexcept
Definition: sharded.hh:565
seastar::foreign_ptr::~foreign_ptr
~foreign_ptr()
Destroys the wrapped object on its original cpu.
Definition: sharded.hh:838
seastar::foreign_ptr::get
pointer get() const noexcept(noexcept(&*_value))
Access the raw pointer to the wrapped object.
Definition: sharded.hh:853
seastar::peering_sharded_service
Provide a sharded service with access to its peers.
Definition: sharded.hh:94
seastar::foreign_ptr::make_foreign
foreign_ptr< T > make_foreign(T ptr)
Definition: sharded.hh:900
seastar::smp_submit_to_options
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:144
seastar::sharded::invoke_on_all
future invoke_on_all(smp_submit_to_options options, std::function< future<>(Service &)> func) noexcept
Definition: sharded.hh:701
seastar::foreign_ptr::copy
future< foreign_ptr > copy() const noexcept
Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
Definition: sharded.hh:842
seastar::foreign_ptr::operator->
element_type * operator->() const noexcept(noexcept(&*_value))
Accesses the wrapped object.
Definition: sharded.hh:851
seastar::foreign_ptr::reset
void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Definition: sharded.hh:879
seastar::is_smart_ptr
Definition: is_smart_ptr.hh:29
seastar::sharded::local
const Service & local() const noexcept
Gets a reference to the local instance.
Definition: sharded.hh:748
seastar::sharded::invoke_on
Ret invoke_on(unsigned id, smp_submit_to_options options, Func &&func, Args &&... args)
Definition: sharded.hh:403
seastar::foreign_ptr::foreign_ptr
foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v< PtrType >)
Wraps a pointer object and remembers the current core.
Definition: sharded.hh:828
seastar::do_with
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:129
seastar::sharded::stop
future stop() noexcept
Definition: sharded.hh:669
seastar::sharded::invoke_on
Ret invoke_on(unsigned id, Func &&func, Args &&... args)
Definition: sharded.hh:421
seastar::foreign_ptr::foreign_ptr
foreign_ptr() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:821
seastar::sharded::invoke_on_others
future invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept
Definition: sharded.hh:735
seastar::foreign_ptr::foreign_ptr
foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v< foreign_ptr >)
Constructs a null foreign_ptr<>.
Definition: sharded.hh:826
seastar::sharded::~sharded
~sharded()
Destroyes a sharded object. Must not be in a started state.
Definition: sharded.hh:517
seastar::foreign_ptr::reset
void reset(std::nullptr_t=nullptr) noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:891
seastar::map_reduce
auto map_reduce(Iterator begin, Iterator end, Mapper &&mapper, Reducer &&r) -> typename reducer_traits< Reducer >::future_type
Definition: map_reduce.hh:96
seastar::foreign_ptr::operator*
element_type & operator*() const noexcept(noexcept(*_value))
Accesses the wrapped object.
Definition: sharded.hh:849
seastar::sharded::sharded
sharded() noexcept
Definition: sharded.hh:160
seastar::parallel_for_each
future parallel_for_each(Iterator begin, Iterator end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:542
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:154
seastar::foreign_ptr::release
PtrType release() noexcept(std::is_nothrow_default_constructible_v< PtrType >)
Definition: sharded.hh:873
seastar::foreign_ptr
Definition: sharded.hh:798
seastar::future::then
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1513
seastar::enable_shared_from_this
Definition: shared_ptr.hh:71
seastar::sharded_parameter::sharded_parameter
sharded_parameter(Func func, Params... params)
Definition: sharded.hh:499
seastar::sharded_parameter
Helper to pass a parameter to a sharded<> object that depends on the shard. It is evaluated on the sh...
Definition: sharded.hh:50
seastar::sharded::map
future< std::vector< return_type > > map(Mapper mapper)
Definition: sharded.hh:371
seastar::no_sharded_instance_exception
Exception thrown when a sharded object does not exist.
Definition: sharded.hh:110
seastar::async_sharded_service
Definition: sharded.hh:75
seastar::sharded::start_single
future start_single(Args &&... args) noexcept
Definition: sharded.hh:593
seastar::sharded::map_reduce0
future< Initial > map_reduce0(Mapper map, Initial initial, Reduce reduce)
Definition: sharded.hh:348
seastar::future::get
value_type && get()
gets the value returned by the computation
Definition: future.hh:1440
seastar::alien::submit_to
std::future< T > submit_to(unsigned shard, Func func)
Definition: alien.hh:167