Seastar
High performance C++ framework for concurrent servers
execution_stage.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2017 ScyllaDB Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
26 #include <seastar/core/function_traits.hh>
27 #include <seastar/core/sstring.hh>
28 #include <seastar/core/metrics.hh>
30 #include <seastar/util/reference_wrapper.hh>
31 #include <seastar/util/concepts.hh>
32 #include <seastar/util/noncopyable_function.hh>
33 #include <seastar/util/tuple_utils.hh>
34 #include <seastar/util/std-compat.hh>
35 #include <fmt/format.h>
36 #include <fmt/ostream.h>
37 #include <vector>
38 #include <boost/range/irange.hpp>
39 #include <boost/range/adaptor/transformed.hpp>
40 #include <boost/container/static_vector.hpp>
41 
42 namespace seastar {
43 
72 
75 
77 namespace internal {
78 
79 // Execution wraps lreferences in reference_wrapper so that the caller is forced
80 // to use seastar::ref(). Then when the function is actually called the
81 // reference is unwrapped. However, we need to distinguish between functions
82 // which argument is lvalue reference and functions that take
83 // reference_wrapper<> as an argument and not unwrap the latter. To solve this
84 // issue reference_wrapper_for_es type is used for wrappings done automatically
85 // by execution stage.
86 template<typename T>
87 struct reference_wrapper_for_es : reference_wrapper<T> {
88  reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
89  : reference_wrapper<T>(std::move(rw)) {}
90 };
91 
92 template<typename T>
93 struct wrap_for_es {
94  using type = T;
95 };
96 
97 template<typename T>
98 struct wrap_for_es<T&> {
99  using type = reference_wrapper_for_es<T>;
100 };
101 
102 template<typename T>
103 struct wrap_for_es<T&&> {
104  using type = T;
105 };
106 
107 template<typename T>
108 decltype(auto) unwrap_for_es(T&& object) {
109  return std::forward<T>(object);
110 }
111 
112 template<typename T>
113 std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
114  return std::reference_wrapper<T>(ref.get());
115 }
116 
117 }
119 
122 public:
123  struct stats {
124  uint64_t tasks_scheduled = 0;
125  uint64_t tasks_preempted = 0;
126  uint64_t function_calls_enqueued = 0;
127  uint64_t function_calls_executed = 0;
128  };
129 protected:
130  bool _empty = true;
131  bool _flush_scheduled = false;
132  scheduling_group _sg;
133  stats _stats;
134  sstring _name;
135  metrics::metric_group _metric_group;
136 protected:
137  virtual void do_flush() noexcept = 0;
138 public:
139  explicit execution_stage(const sstring& name, scheduling_group sg = {});
140  virtual ~execution_stage();
141 
142  execution_stage(const execution_stage&) = delete;
143 
151 
153  const sstring& name() const noexcept { return _name; }
154 
156  const stats& get_stats() const noexcept { return _stats; }
157 
165  bool flush() noexcept;
166 
170  bool poll() const noexcept {
171  return !_empty;
172  }
173 };
174 
176 namespace internal {
177 
178 class execution_stage_manager {
179  std::vector<execution_stage*> _execution_stages;
180  std::unordered_map<sstring, execution_stage*> _stages_by_name;
181 private:
182  execution_stage_manager() = default;
183  execution_stage_manager(const execution_stage_manager&) = delete;
184  execution_stage_manager(execution_stage_manager&&) = delete;
185 public:
186  void register_execution_stage(execution_stage& stage);
187  void unregister_execution_stage(execution_stage& stage) noexcept;
188  void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
189  execution_stage* get_stage(const sstring& name);
190  bool flush() noexcept;
191  bool poll() const noexcept;
192 public:
193  static execution_stage_manager& get() noexcept;
194 };
195 
196 }
198 
207 template<typename ReturnType, typename... Args>
208 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
210  using args_tuple = std::tuple<Args...>;
211  static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
212  "Function arguments need to be nothrow move constructible");
213 
214  static constexpr size_t flush_threshold = 128;
215  static constexpr size_t max_queue_length = 1024;
216 
217  using return_type = futurize_t<ReturnType>;
218  using promise_type = typename return_type::promise_type;
220 
221  struct work_item {
222  input_type _in;
223  promise_type _ready;
224 
225  work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
226 
227  work_item(work_item&& other) = delete;
228  work_item(const work_item&) = delete;
229  work_item(work_item&) = delete;
230  };
232 
233  noncopyable_function<ReturnType (Args...)> _function;
234 private:
235  auto unwrap(input_type&& in) {
236  return tuple_map(std::move(in), [] (auto&& obj) {
237  return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
238  });
239  }
240 
241  virtual void do_flush() noexcept override {
242  while (!_queue.empty()) {
243  auto& wi = _queue.front();
244  auto wi_in = std::move(wi._in);
245  auto wi_ready = std::move(wi._ready);
246  _queue.pop_front();
247  futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
248  _stats.function_calls_executed++;
249 
250  if (need_preempt()) {
251  _stats.tasks_preempted++;
252  break;
253  }
254  }
255  _empty = _queue.empty();
256  }
257 public:
258  explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
259  : execution_stage(name, sg)
260  , _function(std::move(f))
261  {
262  _queue.reserve(flush_threshold);
263  }
264  explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
265  : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
266  }
267 
289  return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
290  if (_queue.size() >= max_queue_length) {
291  do_flush();
292  }
293  _queue.emplace_back(std::move(args)...);
294  _empty = false;
295  _stats.function_calls_enqueued++;
296  auto f = _queue.back()._ready.get_future();
297  flush();
298  return f;
299  }
300 };
301 
304 public:
306  scheduling_group sg;
308  };
309  using stats = boost::container::static_vector<per_scheduling_group_stats, max_scheduling_groups()>;
310 };
311 
320 template<typename ReturnType, typename... Args>
321 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
323  using return_type = futurize_t<ReturnType>;
324  using args_tuple = std::tuple<Args...>;
325  using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
326 
327  static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
328  "Function arguments need to be nothrow move constructible");
329 
330  sstring _name;
331  noncopyable_function<ReturnType (Args...)> _function;
332  std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
333 private:
334  per_group_stage_type make_stage_for_group(scheduling_group sg) {
335  // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
336  // that selects the noncopyable_function copy constructor. Use a lambda instead.
337  auto wrapped_function = [&_function = _function] (Args... args) {
338  return _function(std::forward<Args>(args)...);
339  };
340  auto name = fmt::format("{}.{}", _name, sg.name());
341  return per_group_stage_type(name, sg, wrapped_function);
342  }
343 public:
350  inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
351  : _name(std::move(name)),_function(std::move(f)) {
352  }
353 
374  return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
375  auto sg = current_scheduling_group();
376  auto sg_id = internal::scheduling_group_index(sg);
377  auto& slot = _stage_for_group[sg_id];
378  if (!slot) {
379  slot.emplace(make_stage_for_group(sg));
380  }
381  return (*slot)(std::move(args)...);
382  }
383 
391  inheriting_execution_stage::stats get_stats() const noexcept {
392  inheriting_execution_stage::stats summary;
393  for (unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
394  auto sg = internal::scheduling_group_from_index(sg_id);
395  if (_stage_for_group[sg_id]) {
396  summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
397  }
398  }
399  return summary;
400  }
401 };
402 
403 
405 namespace internal {
406 
407 template <typename Ret, typename ArgsTuple>
408 struct concrete_execution_stage_helper;
409 
410 template <typename Ret, typename... Args>
411 struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
412  using type = concrete_execution_stage<Ret, Args...>;
413 };
414 
415 }
417 
449 template<typename Function>
450 auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
451  using traits = function_traits<Function>;
452  using ret_type = typename traits::return_type;
453  using args_as_tuple = typename traits::args_as_tuple;
454  using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
455  return concrete_execution_stage(name, sg, std::forward<Function>(fn));
456 }
457 
488 template<typename Function>
489 auto make_execution_stage(const sstring& name, Function&& fn) {
490  return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
491 }
492 
516 template<typename Ret, typename Object, typename... Args>
517 concrete_execution_stage<Ret, Object*, Args...>
518 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
519  return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
520 }
521 
522 template<typename Ret, typename Object, typename... Args>
523 concrete_execution_stage<Ret, const Object*, Args...>
524 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
525  return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
526 }
527 
528 template<typename Ret, typename Object, typename... Args>
529 concrete_execution_stage<Ret, Object*, Args...>
530 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
531  return make_execution_stage(name, scheduling_group(), fn);
532 }
533 
534 template<typename Ret, typename Object, typename... Args>
535 concrete_execution_stage<Ret, const Object*, Args...>
536 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
537  return make_execution_stage(name, scheduling_group(), fn);
538 }
539 
541 
542 }
Concrete execution stage class.
Definition: execution_stage.hh:209
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:289
Base execution stage class.
Definition: execution_stage.hh:121
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:153
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:156
execution_stage(execution_stage &&)
bool flush() noexcept
bool poll() const noexcept
Definition: execution_stage.hh:170
Definition: execution_stage.hh:123
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:322
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:350
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:391
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:374
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:303
hold a single metric group Initialization is done in the constructor or with a call to add_group
Definition: metrics_registration.hh:155
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:254
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:450
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:342
Definition: noncopyable_function.hh:33
Definition: function_traits.hh:62
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept