Seastar
High performance C++ framework for concurrent servers
execution_stage.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2017 ScyllaDB Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
26 #include <seastar/core/function_traits.hh>
27 #include <seastar/core/sstring.hh>
28 #include <seastar/core/metrics.hh>
30 #include <seastar/util/reference_wrapper.hh>
31 #include <seastar/util/concepts.hh>
32 #include <seastar/util/noncopyable_function.hh>
33 #include <seastar/util/tuple_utils.hh>
34 #include <seastar/util/std-compat.hh>
35 #include <seastar/util/modules.hh>
36 #ifndef SEASTAR_MODULE
37 #include <fmt/format.h>
38 #include <vector>
39 #include <boost/range/irange.hpp>
40 #include <boost/range/adaptor/transformed.hpp>
41 #include <boost/container/static_vector.hpp>
42 #endif
43 
44 namespace seastar {
45 
74 
77 
79 namespace internal {
80 
81 // Execution wraps lreferences in reference_wrapper so that the caller is forced
82 // to use seastar::ref(). Then when the function is actually called the
83 // reference is unwrapped. However, we need to distinguish between functions
84 // which argument is lvalue reference and functions that take
85 // reference_wrapper<> as an argument and not unwrap the latter. To solve this
86 // issue reference_wrapper_for_es type is used for wrappings done automatically
87 // by execution stage.
88 template<typename T>
89 struct reference_wrapper_for_es : reference_wrapper<T> {
90  reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
91  : reference_wrapper<T>(std::move(rw)) {}
92 };
93 
94 template<typename T>
95 struct wrap_for_es {
96  using type = T;
97 };
98 
99 template<typename T>
100 struct wrap_for_es<T&> {
101  using type = reference_wrapper_for_es<T>;
102 };
103 
104 template<typename T>
105 struct wrap_for_es<T&&> {
106  using type = T;
107 };
108 
109 template<typename T>
110 decltype(auto) unwrap_for_es(T&& object) {
111  return std::forward<T>(object);
112 }
113 
114 template<typename T>
115 std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
116  return std::reference_wrapper<T>(ref.get());
117 }
118 
119 }
121 
123 SEASTAR_MODULE_EXPORT
125 public:
126  struct stats {
127  uint64_t tasks_scheduled = 0;
128  uint64_t tasks_preempted = 0;
129  uint64_t function_calls_enqueued = 0;
130  uint64_t function_calls_executed = 0;
131  };
132 protected:
133  bool _empty = true;
134  bool _flush_scheduled = false;
135  scheduling_group _sg;
136  stats _stats;
137  sstring _name;
138  metrics::metric_group _metric_group;
139 protected:
140  virtual void do_flush() noexcept = 0;
141 public:
142  explicit execution_stage(const sstring& name, scheduling_group sg = {});
143  virtual ~execution_stage();
144 
145  execution_stage(const execution_stage&) = delete;
146 
154 
156  const sstring& name() const noexcept { return _name; }
157 
159  const stats& get_stats() const noexcept { return _stats; }
160 
168  bool flush() noexcept;
169 
173  bool poll() const noexcept {
174  return !_empty;
175  }
176 };
177 
179 namespace internal {
180 
181 class execution_stage_manager {
182  std::vector<execution_stage*> _execution_stages;
183  std::unordered_map<sstring, execution_stage*> _stages_by_name;
184 private:
185  execution_stage_manager() = default;
186  execution_stage_manager(const execution_stage_manager&) = delete;
187  execution_stage_manager(execution_stage_manager&&) = delete;
188 public:
189  void register_execution_stage(execution_stage& stage);
190  void unregister_execution_stage(execution_stage& stage) noexcept;
191  void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
192  execution_stage* get_stage(const sstring& name);
193  bool flush() noexcept;
194  bool poll() const noexcept;
195 public:
196  static execution_stage_manager& get() noexcept;
197 };
198 
199 }
201 
210 template<typename ReturnType, typename... Args>
211 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>)
213  using args_tuple = std::tuple<Args...>;
214  static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
215  "Function arguments need to be nothrow move constructible");
216 
217  static constexpr size_t flush_threshold = 128;
218  static constexpr size_t max_queue_length = 1024;
219 
220  using return_type = futurize_t<ReturnType>;
221  using promise_type = typename return_type::promise_type;
223 
224  struct work_item {
225  input_type _in;
226  promise_type _ready;
227 
228  work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
229 
230  work_item(work_item&& other) = delete;
231  work_item(const work_item&) = delete;
232  work_item(work_item&) = delete;
233  };
235 
236  noncopyable_function<ReturnType (Args...)> _function;
237 private:
238  auto unwrap(input_type&& in) {
239  return tuple_map(std::move(in), [] (auto&& obj) {
240  return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
241  });
242  }
243 
244  virtual void do_flush() noexcept override {
245  while (!_queue.empty()) {
246  auto& wi = _queue.front();
247  auto wi_in = std::move(wi._in);
248  auto wi_ready = std::move(wi._ready);
249  _queue.pop_front();
250  futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
251  _stats.function_calls_executed++;
252 
253  if (need_preempt()) {
254  _stats.tasks_preempted++;
255  break;
256  }
257  }
258  _empty = _queue.empty();
259  }
260 public:
261  explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
262  : execution_stage(name, sg)
263  , _function(std::move(f))
264  {
265  _queue.reserve(flush_threshold);
266  }
267  explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
268  : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
269  }
270 
292  return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
293  if (_queue.size() >= max_queue_length) {
294  do_flush();
295  }
296  _queue.emplace_back(std::move(args)...);
297  _empty = false;
298  _stats.function_calls_enqueued++;
299  auto f = _queue.back()._ready.get_future();
300  flush();
301  return f;
302  }
303 };
304 
307 public:
309  scheduling_group sg;
311  };
312  using stats = boost::container::static_vector<per_scheduling_group_stats, max_scheduling_groups()>;
313 };
314 
323 template<typename ReturnType, typename... Args>
324 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>)
326  using return_type = futurize_t<ReturnType>;
327  using args_tuple = std::tuple<Args...>;
328  using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
329 
330  static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
331  "Function arguments need to be nothrow move constructible");
332 
333  sstring _name;
334  noncopyable_function<ReturnType (Args...)> _function;
335  std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
336 private:
337  per_group_stage_type make_stage_for_group(scheduling_group sg) {
338  // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
339  // that selects the noncopyable_function copy constructor. Use a lambda instead.
340  auto wrapped_function = [&_function = _function] (Args... args) {
341  return _function(std::forward<Args>(args)...);
342  };
343  auto name = fmt::format("{}.{}", _name, sg.name());
344  return per_group_stage_type(name, sg, wrapped_function);
345  }
346 public:
353  inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
354  : _name(std::move(name)),_function(std::move(f)) {
355  }
356 
377  return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
378  auto sg = current_scheduling_group();
379  auto sg_id = internal::scheduling_group_index(sg);
380  auto& slot = _stage_for_group[sg_id];
381  if (!slot) {
382  slot.emplace(make_stage_for_group(sg));
383  }
384  return (*slot)(std::move(args)...);
385  }
386 
394  inheriting_execution_stage::stats get_stats() const noexcept {
395  inheriting_execution_stage::stats summary;
396  for (unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
397  auto sg = internal::scheduling_group_from_index(sg_id);
398  if (_stage_for_group[sg_id]) {
399  summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
400  }
401  }
402  return summary;
403  }
404 };
405 
406 
408 namespace internal {
409 
410 template <typename Ret, typename ArgsTuple>
411 struct concrete_execution_stage_helper;
412 
413 template <typename Ret, typename... Args>
414 struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
415  using type = concrete_execution_stage<Ret, Args...>;
416 };
417 
418 }
420 
452 SEASTAR_MODULE_EXPORT
453 template<typename Function>
454 auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
455  using traits = function_traits<Function>;
456  using ret_type = typename traits::return_type;
457  using args_as_tuple = typename traits::args_as_tuple;
458  using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
459  return concrete_execution_stage(name, sg, std::forward<Function>(fn));
460 }
461 
492 SEASTAR_MODULE_EXPORT
493 template<typename Function>
494 auto make_execution_stage(const sstring& name, Function&& fn) {
495  return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
496 }
497 
521 SEASTAR_MODULE_EXPORT
522 template<typename Ret, typename Object, typename... Args>
523 concrete_execution_stage<Ret, Object*, Args...>
524 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
525  return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
526 }
527 
528 template<typename Ret, typename Object, typename... Args>
529 concrete_execution_stage<Ret, const Object*, Args...>
530 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
531  return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
532 }
533 
534 template<typename Ret, typename Object, typename... Args>
535 concrete_execution_stage<Ret, Object*, Args...>
536 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
537  return make_execution_stage(name, scheduling_group(), fn);
538 }
539 
540 template<typename Ret, typename Object, typename... Args>
541 concrete_execution_stage<Ret, const Object*, Args...>
542 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
543  return make_execution_stage(name, scheduling_group(), fn);
544 }
545 
547 
548 }
Concrete execution stage class.
Definition: execution_stage.hh:212
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:292
Base execution stage class.
Definition: execution_stage.hh:124
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:156
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:159
execution_stage(execution_stage &&)
bool flush() noexcept
bool poll() const noexcept
Definition: execution_stage.hh:173
Definition: execution_stage.hh:126
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:325
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:353
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:394
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:377
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:306
hold a single metric group Initialization is done in the constructor or with a call to add_group
Definition: metrics_registration.hh:160
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:454
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:400
Definition: noncopyable_function.hh:37
Definition: function_traits.hh:62
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept