Seastar
High performance C++ framework for concurrent servers
smp.hh
Go to the documentation of this file.
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright 2019 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/semaphore.hh>
27 #include <seastar/core/metrics.hh>
28 #include <seastar/core/posix.hh>
29 #include <seastar/core/reactor_config.hh>
30 #include <boost/lockfree/spsc_queue.hpp>
31 #include <boost/thread/barrier.hpp>
32 #include <boost/range/irange.hpp>
33 #include <boost/program_options.hpp>
34 #include <deque>
35 #include <thread>
36 
38 
39 namespace seastar {
40 
41 using shard_id = unsigned;
42 
43 class smp_service_group;
44 class reactor_backend_selector;
45 
46 namespace internal {
47 
48 unsigned smp_service_group_id(smp_service_group ssg) noexcept;
49 
50 inline shard_id* this_shard_id_ptr() noexcept {
51  static thread_local shard_id g_this_shard_id;
52  return &g_this_shard_id;
53 }
54 
55 }
56 
58 inline shard_id this_shard_id() noexcept {
59  return *internal::this_shard_id_ptr();
60 }
61 
69  unsigned max_nonlocal_requests = 0;
74  std::optional<sstring> group_name;
75 };
76 
98  unsigned _id;
99 private:
100  explicit smp_service_group(unsigned id) noexcept : _id(id) {}
101 
102  friend unsigned internal::smp_service_group_id(smp_service_group ssg) noexcept;
105 };
106 
107 inline
108 unsigned
109 internal::smp_service_group_id(smp_service_group ssg) noexcept {
110  return ssg._id;
111 }
112 
118 smp_service_group default_smp_service_group() noexcept;
119 
125 
131 
132 inline
134  return smp_service_group(0);
135 }
136 
137 using smp_timeout_clock = lowres_clock;
138 using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;
139 using smp_service_group_semaphore_units = semaphore_units<named_semaphore_exception_factory, smp_timeout_clock>;
140 
141 static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();
142 
150  smp_timeout_clock::time_point timeout = smp_no_timeout;
151 
152  smp_submit_to_options(smp_service_group service_group = default_smp_service_group(), smp_timeout_clock::time_point timeout = smp_no_timeout) noexcept
154  , timeout(timeout) {
155  }
156 };
157 
158 void init_default_smp_service_group(shard_id cpu);
159 
160 smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept;
161 
163  static constexpr size_t queue_length = 128;
164  static constexpr size_t batch_size = 16;
165  static constexpr size_t prefetch_cnt = 2;
166  struct work_item;
167  struct lf_queue_remote {
168  reactor* remote;
169  };
170  using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
171  boost::lockfree::capacity<queue_length>>;
172  // use inheritence to control placement order
173  struct lf_queue : lf_queue_remote, lf_queue_base {
174  lf_queue(reactor* remote) : lf_queue_remote{remote} {}
175  void maybe_wakeup();
176  ~lf_queue();
177  };
178  lf_queue _pending;
179  lf_queue _completed;
180  struct alignas(seastar::cache_line_size) {
181  size_t _sent = 0;
182  size_t _compl = 0;
183  size_t _last_snt_batch = 0;
184  size_t _last_cmpl_batch = 0;
185  size_t _current_queue_length = 0;
186  };
187  // keep this between two structures with statistics
188  // this makes sure that they have at least one cache line
189  // between them, so hw prefetcher will not accidentally prefetch
190  // cache line used by another cpu.
191  metrics::metric_groups _metrics;
192  struct alignas(seastar::cache_line_size) {
193  size_t _received = 0;
194  size_t _last_rcv_batch = 0;
195  };
196  struct work_item : public task {
197  explicit work_item(smp_service_group ssg) : task(current_scheduling_group()), ssg(ssg) {}
198  smp_service_group ssg;
199  virtual ~work_item() {}
200  virtual void fail_with(std::exception_ptr) = 0;
201  void process();
202  virtual void complete() = 0;
203  };
204  template <typename Func>
205  struct async_work_item : work_item {
206  smp_message_queue& _queue;
207  Func _func;
208  using futurator = futurize<std::result_of_t<Func()>>;
209  using future_type = typename futurator::type;
210  using value_type = typename future_type::value_type;
211  std::optional<value_type> _result;
212  std::exception_ptr _ex; // if !_result
213  typename futurator::promise_type _promise; // used on local side
214  async_work_item(smp_message_queue& queue, smp_service_group ssg, Func&& func) : work_item(ssg), _queue(queue), _func(std::move(func)) {}
215  virtual void fail_with(std::exception_ptr ex) override {
216  _promise.set_exception(std::move(ex));
217  }
218  virtual task* waiting_task() noexcept override {
219  // FIXME: waiting_tasking across shards is not implemented. Unsynchronized task access is unsafe.
220  return nullptr;
221  }
222  virtual void run_and_dispose() noexcept override {
223  // _queue.respond() below forwards the continuation chain back to the
224  // calling shard.
225  (void)futurator::invoke(this->_func).then_wrapped([this] (auto f) {
226  if (f.failed()) {
227  _ex = f.get_exception();
228  } else {
229  _result = f.get();
230  }
231  _queue.respond(this);
232  });
233  // We don't delete the task here as the creator of the work item will
234  // delete it on the origin shard.
235  }
236  virtual void complete() override {
237  if (_result) {
238  _promise.set_value(std::move(*_result));
239  } else {
240  // FIXME: _ex was allocated on another cpu
241  _promise.set_exception(std::move(_ex));
242  }
243  }
244  future_type get_future() { return _promise.get_future(); }
245  };
246  union tx_side {
247  tx_side() {}
248  ~tx_side() {}
249  void init() { new (&a) aa; }
250  struct aa {
251  std::deque<work_item*> pending_fifo;
252  } a;
253  } _tx;
254  std::vector<work_item*> _completed_fifo;
255 public:
256  smp_message_queue(reactor* from, reactor* to);
258  template <typename Func>
259  futurize_t<std::result_of_t<Func()>> submit(shard_id t, smp_submit_to_options options, Func&& func) noexcept {
261  auto wi = std::make_unique<async_work_item<Func>>(*this, options.service_group, std::forward<Func>(func));
262  auto fut = wi->get_future();
263  submit_item(t, options.timeout, std::move(wi));
264  return fut;
265  }
266  void start(unsigned cpuid);
267  template<size_t PrefetchCnt, typename Func>
268  size_t process_queue(lf_queue& q, Func process);
269  size_t process_incoming();
270  size_t process_completions(shard_id t);
271  void stop();
272 private:
273  void work();
274  void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
275  void respond(work_item* wi);
276  void move_pending();
277  void flush_request_batch();
278  void flush_response_batch();
279  bool has_unflushed_responses() const;
280  bool pure_poll_rx() const;
281  bool pure_poll_tx() const;
282 
283  friend class smp;
284 };
285 
286 class smp {
287  static std::vector<posix_thread> _threads;
288  static std::vector<std::function<void ()>> _thread_loops; // for dpdk
289  static std::optional<boost::barrier> _all_event_loops_done;
290  static std::vector<reactor*> _reactors;
291  struct qs_deleter {
292  void operator()(smp_message_queue** qs) const;
293  };
294  static std::unique_ptr<smp_message_queue*[], qs_deleter> _qs;
295  static std::thread::id _tmain;
296  static bool _using_dpdk;
297 
298  template <typename Func>
299  using returns_future = is_future<std::result_of_t<Func()>>;
300  template <typename Func>
301  using returns_void = std::is_same<std::result_of_t<Func()>, void>;
302 public:
303  static boost::program_options::options_description get_options_description();
304  static void register_network_stacks();
305  static void configure(boost::program_options::variables_map vm, reactor_config cfg = {});
306  static void cleanup();
307  static void cleanup_cpu();
308  static void arrive_at_event_loop_end();
309  static void join_all();
310  static bool main_thread() { return std::this_thread::get_id() == _tmain; }
311 
325  template <typename Func>
326  static futurize_t<std::result_of_t<Func()>> submit_to(unsigned t, smp_submit_to_options options, Func&& func) noexcept {
327  using ret_type = std::result_of_t<Func()>;
328  if (t == this_shard_id()) {
329  try {
331  // Non-deferring function, so don't worry about func lifetime
332  return futurize<ret_type>::invoke(std::forward<Func>(func));
333  } else if (std::is_lvalue_reference<Func>::value) {
334  // func is an lvalue, so caller worries about its lifetime
335  return futurize<ret_type>::invoke(func);
336  } else {
337  // Deferring call on rvalue function, make sure to preserve it across call
338  auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
339  auto ret = futurize<ret_type>::invoke(*w);
340  return ret.finally([w = std::move(w)] {});
341  }
342  } catch (...) {
343  // Consistently return a failed future rather than throwing, to simplify callers
344  return futurize<std::result_of_t<Func()>>::make_exception_future(std::current_exception());
345  }
346  } else {
347  return _qs[t][this_shard_id()].submit(t, options, std::forward<Func>(func));
348  }
349  }
364  template <typename Func>
365  static futurize_t<std::result_of_t<Func()>> submit_to(unsigned t, Func&& func) noexcept {
366  return submit_to(t, default_smp_service_group(), std::forward<Func>(func));
367  }
368  static bool poll_queues();
369  static bool pure_poll_queues();
370  static boost::integer_range<unsigned> all_cpus() noexcept {
371  return boost::irange(0u, count);
372  }
381  template<typename Func>
382  SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
383  static future<> invoke_on_all(smp_submit_to_options options, Func&& func) noexcept {
384  static_assert(std::is_same<future<>, typename futurize<std::result_of_t<Func()>>::type>::value, "bad Func signature");
385  static_assert(std::is_nothrow_move_constructible_v<Func>);
386  return parallel_for_each(all_cpus(), [options, &func] (unsigned id) {
387  return smp::submit_to(id, options, Func(func));
388  });
389  }
399  template<typename Func>
400  static future<> invoke_on_all(Func&& func) noexcept {
401  return invoke_on_all(smp_submit_to_options{}, std::forward<Func>(func));
402  }
412  template<typename Func>
413  SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
414  static future<> invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept {
415  static_assert(std::is_same<future<>, typename futurize<std::result_of_t<Func()>>::type>::value, "bad Func signature");
416  static_assert(std::is_nothrow_move_constructible_v<Func>);
417  return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (unsigned id) {
418  return id != cpu_id ? smp::submit_to(id, options, func) : make_ready_future<>();
419  });
420  }
431  template<typename Func>
432  static future<> invoke_on_others(unsigned cpu_id, Func func) noexcept {
433  return invoke_on_others(cpu_id, smp_submit_to_options{}, std::move(func));
434  }
435 private:
436  static void start_all_queues();
437  static void pin(unsigned cpu_id);
438  static void allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
439  static void create_thread(std::function<void ()> thread_loop);
440 public:
441  static unsigned count;
442 };
443 
444 }
seastar::default_smp_service_group
smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:133
seastar::futurize::invoke
static type invoke(Func &&func, FuncArgs &&... args) noexcept
seastar::smp_message_queue::tx_side::aa
Definition: smp.hh:250
seastar::smp_service_group::default_smp_service_group
friend smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:133
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::smp::invoke_on_all
static future invoke_on_all(smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:383
seastar::this_shard_id
shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: smp.hh:58
seastar::smp::submit_to
static futurize_t< std::result_of_t< Func()> > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:326
seastar::smp_submit_to_options::timeout
smp_timeout_clock::time_point timeout
Definition: smp.hh:150
seastar::reactor
Definition: reactor.hh:187
seastar::is_future
Check whether a type is a future.
Definition: future.hh:1065
seastar::smp::invoke_on_all
static future invoke_on_all(Func &&func) noexcept
Definition: smp.hh:400
seastar::smp_service_group_config::group_name
std::optional< sstring > group_name
Definition: smp.hh:74
seastar::smp_service_group::create_smp_service_group
friend future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
seastar::smp_service_group_config::max_nonlocal_requests
unsigned max_nonlocal_requests
Definition: smp.hh:69
seastar::make_exception_future
future< T... > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:2054
metrics.hh
header for metrics creation.
seastar::destroy_smp_service_group
future destroy_smp_service_group(smp_service_group ssg) noexcept
seastar::smp_submit_to_options
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:144
seastar::smp_service_group
Definition: smp.hh:97
seastar::reactor_config
Definition: reactor_config.hh:32
seastar::smp_service_group_config
Definition: smp.hh:65
seastar::smp::submit_to
static futurize_t< std::result_of_t< Func()> > submit_to(unsigned t, Func &&func) noexcept
Definition: smp.hh:365
seastar::smp_submit_to_options::service_group
smp_service_group service_group
Controls resource allocation.
Definition: smp.hh:146
seastar::current_scheduling_group
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:339
seastar::smp::invoke_on_others
static future invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept
Definition: smp.hh:414
seastar::queue
Definition: queue.hh:35
seastar::memory::scoped_critical_alloc_section
Marks scopes that contain critical allocations.
Definition: critical_alloc_section.hh:55
posix.hh
seastar::futurize
Converts a type to a future type, if it isn't already.
Definition: future.hh:1956
seastar::parallel_for_each
future parallel_for_each(Iterator begin, Iterator end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:542
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
seastar::task
Definition: task.hh:30
seastar::smp_message_queue
Definition: smp.hh:162
seastar::smp
Definition: smp.hh:286
seastar::metrics::metric_groups
holds the metric definition.
Definition: metrics_registration.hh:89
seastar::smp::invoke_on_others
static future invoke_on_others(unsigned cpu_id, Func func) noexcept
Definition: smp.hh:432
seastar::create_smp_service_group
future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept