Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
execution_stage.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2017 ScyllaDB Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/chunked_fifo.hh>
26#include <seastar/core/function_traits.hh>
27#include <seastar/core/sstring.hh>
30#include <seastar/util/reference_wrapper.hh>
31#include <seastar/util/noncopyable_function.hh>
32#include <seastar/util/tuple_utils.hh>
33#include <seastar/util/std-compat.hh>
34#include <seastar/util/modules.hh>
35#ifndef SEASTAR_MODULE
36#include <fmt/format.h>
37#include <vector>
38#include <boost/container/static_vector.hpp>
39#endif
40
41namespace seastar {
42
71
74
76namespace internal {
77
78// Execution wraps lreferences in reference_wrapper so that the caller is forced
79// to use seastar::ref(). Then when the function is actually called the
80// reference is unwrapped. However, we need to distinguish between functions
81// which argument is lvalue reference and functions that take
82// reference_wrapper<> as an argument and not unwrap the latter. To solve this
83// issue reference_wrapper_for_es type is used for wrappings done automatically
84// by execution stage.
85template<typename T>
86struct reference_wrapper_for_es : reference_wrapper<T> {
87 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
88 : reference_wrapper<T>(std::move(rw)) {}
89};
90
91template<typename T>
92struct wrap_for_es {
93 using type = T;
94};
95
96template<typename T>
97struct wrap_for_es<T&> {
98 using type = reference_wrapper_for_es<T>;
99};
100
101template<typename T>
102struct wrap_for_es<T&&> {
103 using type = T;
104};
105
106template<typename T>
107decltype(auto) unwrap_for_es(T&& object) {
108 return std::forward<T>(object);
109}
110
111template<typename T>
112std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
113 return std::reference_wrapper<T>(ref.get());
114}
115
116}
118
120SEASTAR_MODULE_EXPORT
122public:
123 struct stats {
124 uint64_t tasks_scheduled = 0;
125 uint64_t tasks_preempted = 0;
126 uint64_t function_calls_enqueued = 0;
127 uint64_t function_calls_executed = 0;
128 };
129protected:
130 bool _empty = true;
131 bool _flush_scheduled = false;
133 stats _stats;
134 sstring _name;
135 metrics::metric_group _metric_group;
136protected:
137 virtual void do_flush() noexcept = 0;
138public:
139 explicit execution_stage(const sstring& name, scheduling_group sg = {});
140 virtual ~execution_stage();
141
142 execution_stage(const execution_stage&) = delete;
143
151
153 const sstring& name() const noexcept { return _name; }
154
156 const stats& get_stats() const noexcept { return _stats; }
157
165 bool flush() noexcept;
166
170 bool poll() const noexcept {
171 return !_empty;
172 }
173};
174
176namespace internal {
177
178class execution_stage_manager {
179 std::vector<execution_stage*> _execution_stages;
180 std::unordered_map<sstring, execution_stage*> _stages_by_name;
181private:
182 execution_stage_manager() = default;
183 execution_stage_manager(const execution_stage_manager&) = delete;
184 execution_stage_manager(execution_stage_manager&&) = delete;
185public:
186 void register_execution_stage(execution_stage& stage);
187 void unregister_execution_stage(execution_stage& stage) noexcept;
188 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
189 execution_stage* get_stage(const sstring& name);
190 bool flush() noexcept;
191 bool poll() const noexcept;
192public:
193 static execution_stage_manager& get() noexcept;
194};
195
196}
198
207template<typename ReturnType, typename... Args>
208requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>
210 using args_tuple = std::tuple<Args...>;
211 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
212 "Function arguments need to be nothrow move constructible");
213
214 static constexpr size_t flush_threshold = 128;
215 static constexpr size_t max_queue_length = 1024;
216
217 using return_type = futurize_t<ReturnType>;
218 using promise_type = typename return_type::promise_type;
220
221 struct work_item {
222 input_type _in;
223 promise_type _ready;
224
225 work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
226
227 work_item(work_item&& other) = delete;
228 work_item(const work_item&) = delete;
229 work_item(work_item&) = delete;
230 };
232
233 noncopyable_function<ReturnType (Args...)> _function;
234private:
235 auto unwrap(input_type&& in) {
236 return tuple_map(std::move(in), [] (auto&& obj) {
237 return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
238 });
239 }
240
241 virtual void do_flush() noexcept override {
242 while (!_queue.empty()) {
243 auto& wi = _queue.front();
244 auto wi_in = std::move(wi._in);
245 auto wi_ready = std::move(wi._ready);
246 _queue.pop_front();
247 futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
248 _stats.function_calls_executed++;
249
250 if (internal::scheduler_need_preempt()) {
251 _stats.tasks_preempted++;
252 break;
253 }
254 }
255 _empty = _queue.empty();
256 }
257public:
258 explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
259 : execution_stage(name, sg)
260 , _function(std::move(f))
261 {
262 _queue.reserve(flush_threshold);
263 }
264 explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
265 : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
266 }
267
289 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
290 if (_queue.size() >= max_queue_length) {
291 do_flush();
292 }
293 _queue.emplace_back(std::move(args)...);
294 _empty = false;
295 _stats.function_calls_enqueued++;
296 auto f = _queue.back()._ready.get_future();
297 flush();
298 return f;
299 }
300};
301
304public:
308 };
309 using stats = boost::container::static_vector<per_scheduling_group_stats, max_scheduling_groups()>;
310};
311
320template<typename ReturnType, typename... Args>
321requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>
323 using return_type = futurize_t<ReturnType>;
324 using args_tuple = std::tuple<Args...>;
325 using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
326
327 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
328 "Function arguments need to be nothrow move constructible");
329
330 sstring _name;
331 noncopyable_function<ReturnType (Args...)> _function;
332 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
333private:
334 per_group_stage_type make_stage_for_group(scheduling_group sg) {
335 // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
336 // that selects the noncopyable_function copy constructor. Use a lambda instead.
337 auto wrapped_function = [&_function = _function] (Args... args) {
338 return _function(std::forward<Args>(args)...);
339 };
340 auto name = fmt::format("{}.{}", _name, sg.name());
341 return per_group_stage_type(name, sg, wrapped_function);
342 }
343public:
350 inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
351 : _name(std::move(name)),_function(std::move(f)) {
352 }
353
374 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
375 auto sg = current_scheduling_group();
376 auto sg_id = internal::scheduling_group_index(sg);
377 auto& slot = _stage_for_group[sg_id];
378 if (!slot) {
379 slot.emplace(make_stage_for_group(sg));
380 }
381 return (*slot)(std::move(args)...);
382 }
383
391 inheriting_execution_stage::stats get_stats() const noexcept {
392 inheriting_execution_stage::stats summary;
393 for (unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
394 auto sg = internal::scheduling_group_from_index(sg_id);
395 if (_stage_for_group[sg_id]) {
396 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
397 }
398 }
399 return summary;
400 }
401};
402
403
405namespace internal {
406
407template <typename Ret, typename ArgsTuple>
408struct concrete_execution_stage_helper;
409
410template <typename Ret, typename... Args>
411struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
412 using type = concrete_execution_stage<Ret, Args...>;
413};
414
415}
417
449SEASTAR_MODULE_EXPORT
450template<typename Function>
451auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
452 using traits = function_traits<Function>;
453 using ret_type = typename traits::return_type;
454 using args_as_tuple = typename traits::args_as_tuple;
455 using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
456 return concrete_execution_stage(name, sg, std::forward<Function>(fn));
457}
458
489SEASTAR_MODULE_EXPORT
490template<typename Function>
491auto make_execution_stage(const sstring& name, Function&& fn) {
492 return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
493}
494
518SEASTAR_MODULE_EXPORT
519template<typename Ret, typename Object, typename... Args>
520concrete_execution_stage<Ret, Object*, Args...>
521make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
522 return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
523}
524
525template<typename Ret, typename Object, typename... Args>
526concrete_execution_stage<Ret, const Object*, Args...>
527make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
528 return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
529}
530
531template<typename Ret, typename Object, typename... Args>
532concrete_execution_stage<Ret, Object*, Args...>
533make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
534 return make_execution_stage(name, scheduling_group(), fn);
535}
536
537template<typename Ret, typename Object, typename... Args>
538concrete_execution_stage<Ret, const Object*, Args...>
539make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
540 return make_execution_stage(name, scheduling_group(), fn);
541}
542
544
545}
Concrete execution stage class.
Definition: execution_stage.hh:209
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:289
Base execution stage class.
Definition: execution_stage.hh:121
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:153
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:156
execution_stage(execution_stage &&)
bool flush() noexcept
bool poll() const noexcept
Definition: execution_stage.hh:170
Definition: execution_stage.hh:123
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:322
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:350
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:391
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:374
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:303
hold a single metric group Initialization is done in the constructor or with a call to add_group
Definition: metrics_registration.hh:160
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:285
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:451
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
Definition: noncopyable_function.hh:37
Definition: function_traits.hh:62
STL namespace.
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept