Seastar
High performance C++ framework for concurrent servers
smp.hh
Go to the documentation of this file.
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright 2019 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/semaphore.hh>
28 #include <seastar/core/posix.hh>
29 #include <seastar/core/reactor_config.hh>
30 #include <seastar/core/resource.hh>
31 #include <seastar/core/shard_id.hh>
32 #include <seastar/util/modules.hh>
33 
34 #ifndef SEASTAR_MODULE
35 #include <boost/lockfree/spsc_queue.hpp>
36 #include <boost/thread/barrier.hpp>
37 #include <boost/range/irange.hpp>
38 #include <deque>
39 #include <optional>
40 #include <thread>
41 #endif
42 
44 
45 namespace seastar {
46 
47 class reactor_backend_selector;
48 
49 SEASTAR_MODULE_EXPORT_BEGIN
50 
51 class smp_service_group;
52 
53 namespace alien {
54 
55 class instance;
56 
57 }
58 SEASTAR_MODULE_EXPORT_END
59 
60 namespace internal {
61 
62 unsigned smp_service_group_id(smp_service_group ssg) noexcept;
63 
64 class memory_prefaulter;
65 
66 }
67 
68 namespace memory::internal {
69 
70 struct numa_layout;
71 
72 }
73 
74 SEASTAR_MODULE_EXPORT_BEGIN
75 
83  unsigned max_nonlocal_requests = 0;
88  std::optional<sstring> group_name;
89 };
90 
112  unsigned _id;
113 #ifdef SEASTAR_DEBUG
114  unsigned _version = 0;
115 #endif
116 private:
117  explicit smp_service_group(unsigned id) noexcept : _id(id) {}
118 
119  friend unsigned internal::smp_service_group_id(smp_service_group ssg) noexcept;
123 };
124 
125 SEASTAR_MODULE_EXPORT_END
126 
// Returns the value of the _id member of the given smp_service_group.
// Declared as a friend inside smp_service_group, which is what allows it
// to read that member.
127 inline
128 unsigned
129 internal::smp_service_group_id(smp_service_group ssg) noexcept {
130  return ssg._id;
131 }
132 
133 SEASTAR_MODULE_EXPORT_BEGIN
139 smp_service_group default_smp_service_group() noexcept;
140 
146 
152 
153 inline
155  return smp_service_group(0);
156 }
157 
161 
162 SEASTAR_MODULE_EXPORT_END
163 
164 static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();
165 
166 SEASTAR_MODULE_EXPORT_BEGIN
174  smp_timeout_clock::time_point timeout = smp_no_timeout;
175 
176  smp_submit_to_options(smp_service_group service_group = default_smp_service_group(), smp_timeout_clock::time_point timeout = smp_no_timeout) noexcept
178  , timeout(timeout) {
179  }
180 };
181 
182 void init_default_smp_service_group(shard_id cpu);
183 
184 smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept;
185 
187  static constexpr size_t queue_length = 128;
188  static constexpr size_t batch_size = 16;
189  static constexpr size_t prefetch_cnt = 2;
190  struct work_item;
// Holds the reactor pointer that an lf_queue is constructed with.
191  struct lf_queue_remote {
192  reactor* remote;
193  };
// Boost lock-free single-producer/single-consumer queue of work_item
// pointers, with a compile-time capacity of queue_length entries.
194  using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
195  boost::lockfree::capacity<queue_length>>;
196  // use inheritance to control placement order
// (lf_queue_remote is listed first among the bases, ahead of lf_queue_base)
197  struct lf_queue : lf_queue_remote, lf_queue_base {
// Stores `remote` in the lf_queue_remote base; lf_queue_base is
// default-constructed.
198  lf_queue(reactor* remote) : lf_queue_remote{remote} {}
// maybe_wakeup() and the destructor are only declared here; their
// definitions are not part of this header listing.
199  void maybe_wakeup();
200  ~lf_queue();
201  };
202  lf_queue _pending;
203  lf_queue _completed;
204  struct alignas(seastar::cache_line_size) {
205  size_t _sent = 0;
206  size_t _compl = 0;
207  size_t _last_snt_batch = 0;
208  size_t _last_cmpl_batch = 0;
209  size_t _current_queue_length = 0;
210  };
211  // Keep this member between the two statistics structures.
212  // This guarantees that they have at least one cache line
213  // between them, so the hardware prefetcher will not accidentally
214  // prefetch a cache line used by another CPU.
215  metrics::metric_groups _metrics;
216  struct alignas(seastar::cache_line_size) {
217  size_t _received = 0;
218  size_t _last_rcv_batch = 0;
219  };
// Abstract base class for items passed through the message queue. It is a
// task that additionally records the smp_service_group it was created with.
220  struct work_item : public task {
// The task base is initialised with current_scheduling_group() as
// evaluated at construction time.
221  explicit work_item(smp_service_group ssg) : task(current_scheduling_group()), ssg(ssg) {}
222  smp_service_group ssg;
223  virtual ~work_item() {}
// Pure virtual: derived items decide how the given exception is reported.
224  virtual void fail_with(std::exception_ptr) = 0;
// Declared only; the definition is not part of this header listing.
225  void process();
// Pure virtual: derived items decide how a finished item is completed.
226  virtual void complete() = 0;
227  };
// Work item that invokes a callable of type Func and carries its outcome
// (a value or an exception) to a promise stored in the same object.
//
// Func is invoked with no arguments; its result is futurized, so Func may
// return either a plain value or a future.
228  template <typename Func>
229  struct async_work_item : work_item {
230  smp_message_queue& _queue;
231  Func _func;
232  using futurator = futurize<std::invoke_result_t<Func>>;
233  using future_type = typename futurator::type;
234  using value_type = typename future_type::value_type;
// Engaged only when the invocation of _func produced a value.
235  std::optional<value_type> _result;
236  std::exception_ptr _ex; // if !_result
237  typename futurator::promise_type _promise; // used on local side
// func is moved into _func; ssg is forwarded to the work_item base.
238  async_work_item(smp_message_queue& queue, smp_service_group ssg, Func&& func) : work_item(ssg), _queue(queue), _func(std::move(func)) {}
// Sets the given exception on _promise.
239  virtual void fail_with(std::exception_ptr ex) override {
240  _promise.set_exception(std::move(ex));
241  }
// Always returns nullptr (see the FIXME below).
242  virtual task* waiting_task() noexcept override {
243  // FIXME: waiting_task() across shards is not implemented. Unsynchronized task access is unsafe.
244  return nullptr;
245  }
// Invokes _func through futurator::invoke. When the resulting future
// resolves, stores either its exception in _ex or its value in _result and
// then hands this item to _queue.respond(). The future returned by
// then_wrapped() is deliberately discarded (the (void) cast).
246  virtual void run_and_dispose() noexcept override {
247  // _queue.respond() below forwards the continuation chain back to the
248  // calling shard.
249  (void)futurator::invoke(this->_func).then_wrapped([this] (auto f) {
250  if (f.failed()) {
251  _ex = f.get_exception();
252  } else {
253  _result = f.get();
254  }
255  _queue.respond(this);
256  });
257  // We don't delete the task here as the creator of the work item will
258  // delete it on the origin shard.
259  }
// Transfers the stored outcome to _promise: the value when _result is
// engaged, otherwise the exception held in _ex.
260  virtual void complete() override {
261  if (_result) {
262  _promise.set_value(std::move(*_result));
263  } else {
264  // FIXME: _ex was allocated on another cpu
265  _promise.set_exception(std::move(_ex));
266  }
267  }
// Returns the future associated with _promise.
268  future_type get_future() { return _promise.get_future(); }
269  };
// Union wrapping the FIFO of pending work items. The union's constructor and
// destructor are both empty, so member `a` is not constructed until init()
// is called, and the destructor shown here does not destroy it.
270  union tx_side {
271  tx_side() {}
272  ~tx_side() {}
// Constructs `a` in place with placement new.
273  void init() { new (&a) aa; }
274  struct aa {
275  std::deque<work_item*> pending_fifo;
276  } a;
277  } _tx;
278  std::vector<work_item*> _completed_fifo;
279 public:
280  smp_message_queue(reactor* from, reactor* to);
282  template <typename Func>
283  futurize_t<std::invoke_result_t<Func>> submit(shard_id t, smp_submit_to_options options, Func&& func) noexcept {
285  auto wi = std::make_unique<async_work_item<Func>>(*this, options.service_group, std::forward<Func>(func));
286  auto fut = wi->get_future();
287  submit_item(t, options.timeout, std::move(wi));
288  return fut;
289  }
290  void start(unsigned cpuid);
291  template<size_t PrefetchCnt, typename Func>
292  size_t process_queue(lf_queue& q, Func process);
293  size_t process_incoming();
294  size_t process_completions(shard_id t);
295  void stop();
296 private:
297  void work();
298  void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
299  void respond(work_item* wi);
300  void move_pending();
301  void flush_request_batch();
302  void flush_response_batch();
303  bool has_unflushed_responses() const;
304  bool pure_poll_rx() const;
305  bool pure_poll_tx() const;
306 
307  friend class smp;
308 };
309 
310 class smp_message_queue;
311 struct reactor_options;
312 struct smp_options;
313 
314 class smp : public std::enable_shared_from_this<smp> {
315  alien::instance& _alien;
316  std::vector<posix_thread> _threads;
317  std::vector<std::function<void ()>> _thread_loops; // for dpdk
318  std::optional<boost::barrier> _all_event_loops_done;
319  std::unique_ptr<internal::memory_prefaulter> _prefaulter;
320  struct qs_deleter {
321  void operator()(smp_message_queue** qs) const;
322  };
323  std::unique_ptr<smp_message_queue*[], qs_deleter> _qs_owner;
324  static thread_local smp_message_queue**_qs;
325  static thread_local std::thread::id _tmain;
326  bool _using_dpdk = false;
327 
328 private:
329  void setup_prefaulter(const seastar::resource::resources& res, seastar::memory::internal::numa_layout layout);
330 public:
331  explicit smp(alien::instance& alien);
332  ~smp();
333  void configure(const smp_options& smp_opts, const reactor_options& reactor_opts);
334  void cleanup() noexcept;
335  void cleanup_cpu();
336  void arrive_at_event_loop_end();
337  void join_all();
// Returns true when the calling thread's id equals the thread-local _tmain.
338  static bool main_thread() { return std::this_thread::get_id() == _tmain; }
339 
353  template <typename Func>
354  static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, smp_submit_to_options options, Func&& func) noexcept {
355  using ret_type = std::invoke_result_t<Func>;
356  if (t == this_shard_id()) {
357  try {
359  // Non-deferring function, so don't worry about func lifetime
360  return futurize<ret_type>::invoke(std::forward<Func>(func));
361  } else if (std::is_lvalue_reference_v<Func>) {
362  // func is an lvalue, so caller worries about its lifetime
363  return futurize<ret_type>::invoke(func);
364  } else {
365  // Deferring call on rvalue function, make sure to preserve it across call
366  auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
367  auto ret = futurize<ret_type>::invoke(*w);
368  return ret.finally([w = std::move(w)] {});
369  }
370  } catch (...) {
371  // Consistently return a failed future rather than throwing, to simplify callers
372  return futurize<std::invoke_result_t<Func>>::make_exception_future(std::current_exception());
373  }
374  } else {
375  return _qs[t][this_shard_id()].submit(t, options, std::forward<Func>(func));
376  }
377  }
// Convenience overload of submit_to() that takes no options argument.
// Forwards to the three-argument overload, passing
// default_smp_service_group() in the options position and perfectly
// forwarding func.
392  template <typename Func>
393  static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, Func&& func) noexcept {
394  return submit_to(t, default_smp_service_group(), std::forward<Func>(func));
395  }
396  static bool poll_queues();
397  static bool pure_poll_queues();
// Returns the integer range starting at 0 and ending just before smp::count
// (boost::irange produces a half-open range).
398  static boost::integer_range<unsigned> all_cpus() noexcept {
399  return boost::irange(0u, count);
400  }
// Calls smp::submit_to() once for every id in all_cpus(), passing the given
// options and Func(func), and returns the future produced by
// parallel_for_each() over those submissions.
//
// Compile-time requirements: the futurized result type of Func must be
// future<>, and Func must be nothrow move constructible.
//
// NOTE(review): func is captured by reference here, unlike invoke_on_others()
// which moves it into the lambda. Presumably this is safe because
// parallel_for_each() calls the lambda for every id before returning --
// confirm against the parallel_for_each documentation.
409  template<typename Func>
410  SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
411  static future<> invoke_on_all(smp_submit_to_options options, Func&& func) noexcept {
412  static_assert(std::is_same_v<future<>, typename futurize<std::invoke_result_t<Func>>::type>, "bad Func signature");
413  static_assert(std::is_nothrow_move_constructible_v<Func>);
414  return parallel_for_each(all_cpus(), [options, &func] (unsigned id) {
415  return smp::submit_to(id, options, Func(func));
416  });
417  }
// Convenience overload of invoke_on_all() that takes no options argument.
// Forwards to the two-argument overload with a default-constructed
// smp_submit_to_options.
427  template<typename Func>
428  static future<> invoke_on_all(Func&& func) noexcept {
429  return invoke_on_all(smp_submit_to_options{}, std::forward<Func>(func));
430  }
// Calls smp::submit_to() with the given options and a copy of func for every
// id in all_cpus() other than cpu_id. For cpu_id itself nothing is submitted
// and a ready future is used instead. Returns the future produced by
// parallel_for_each() over all ids.
//
// func is taken by value and moved into the lambda, so the lambda owns the
// callable that each per-id copy is made from.
//
// Compile-time requirements: the futurized result type of Func must be
// future<>, and Func must be nothrow move constructible. The concept also
// asks for nothrow copy constructibility.
440  template<typename Func>
441  SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> &&
442  std::is_nothrow_copy_constructible_v<Func> )
443  static future<> invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept {
444  static_assert(std::is_same_v<future<>, typename futurize<std::invoke_result_t<Func>>::type>, "bad Func signature");
445  static_assert(std::is_nothrow_move_constructible_v<Func>);
446  return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (unsigned id) {
447  return id != cpu_id ? smp::submit_to(id, options, Func(func)) : make_ready_future<>();
448  });
449  }
// Convenience overload of invoke_on_others() that takes no options argument.
// Forwards to the three-argument overload with a default-constructed
// smp_submit_to_options, moving func along.
460  template<typename Func>
461  SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
462  static future<> invoke_on_others(unsigned cpu_id, Func func) noexcept {
463  return invoke_on_others(cpu_id, smp_submit_to_options{}, std::move(func));
464  }
// Convenience overload of invoke_on_others() that excludes the calling shard.
// Forwards to the (cpu_id, func) overload with this_shard_id() as cpu_id.
471  template<typename Func>
472  SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
473  static future<> invoke_on_others(Func func) noexcept {
474  return invoke_on_others(this_shard_id(), std::move(func));
475  }
476 private:
477  void start_all_queues();
478  void pin(unsigned cpu_id);
479  void allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
480  void create_thread(std::function<void ()> thread_loop);
481  unsigned adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs);
482 public:
483  static unsigned count;
484 };
485 
486 SEASTAR_MODULE_EXPORT_END
487 
488 }
Definition: alien.hh:120
Counted resource guard.
Definition: semaphore.hh:154
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1511
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:59
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: queue.hh:44
Definition: reactor.hh:168
Definition: semaphore.hh:511
Definition: smp.hh:186
Definition: smp.hh:111
friend smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:154
friend future<> destroy_smp_service_group(smp_service_group) noexcept
friend future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
Definition: smp.hh:314
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, Func &&func) noexcept
Definition: smp.hh:393
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:354
static future<> invoke_on_all(Func &&func) noexcept
Definition: smp.hh:428
static future<> invoke_on_all(smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:411
static future<> invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept
Definition: smp.hh:443
Definition: task.hh:35
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1940
future<> parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:565
holds the metric_groups definition needed by class that reports metrics
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
std::optional< sstring > group_name
Definition: smp.hh:88
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:400
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: shard_id.hh:50
unsigned max_nonlocal_requests
Definition: smp.hh:83
smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:154
future<> destroy_smp_service_group(smp_service_group ssg) noexcept
Converts a type to a future type, if it isn't already.
Definition: future.hh:1843
static type invoke(Func &&func, FuncArgs &&... args) noexcept
Check whether a type is a future.
Definition: future.hh:1024
Configuration for the reactor.
Definition: reactor_config.hh:45
Definition: resource.hh:129
Configuration for the multicore aspect of seastar.
Definition: smp_options.hh:47
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168
smp_service_group service_group
Controls resource allocation.
Definition: smp.hh:170
smp_timeout_clock::time_point timeout
Definition: smp.hh:174