Seastar
High performance C++ framework for concurrent servers
execution_stage.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2017 ScyllaDB Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/chunked_fifo.hh>
26#include <seastar/core/function_traits.hh>
27#include <seastar/core/sstring.hh>
30#include <seastar/util/reference_wrapper.hh>
31#include <seastar/util/noncopyable_function.hh>
32#include <seastar/util/tuple_utils.hh>
33#include <seastar/util/std-compat.hh>
34#include <seastar/util/modules.hh>
35#ifndef SEASTAR_MODULE
36#include <fmt/format.h>
37#include <vector>
38#include <boost/range/irange.hpp>
39#include <boost/range/adaptor/transformed.hpp>
40#include <boost/container/static_vector.hpp>
41#endif
42
43namespace seastar {
44
73
76
78namespace internal {
79
80// Execution stage wraps lvalue references in reference_wrapper so that the
81// caller is forced to use seastar::ref(). Then, when the function is actually
82// called, the reference is unwrapped. However, we need to distinguish between
83// functions whose argument is an lvalue reference and functions that take
84// reference_wrapper<> as an argument, and not unwrap the latter. To solve this
85// issue, the reference_wrapper_for_es type is used for wrappings done
86// automatically by the execution stage.
87template<typename T>
88struct reference_wrapper_for_es : reference_wrapper<T> {
89 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
90 : reference_wrapper<T>(std::move(rw)) {}
91};
92
93template<typename T>
94struct wrap_for_es {
95 using type = T;
96};
97
98template<typename T>
99struct wrap_for_es<T&> {
100 using type = reference_wrapper_for_es<T>;
101};
102
103template<typename T>
104struct wrap_for_es<T&&> {
105 using type = T;
106};
107
/// Generic case: nothing to unwrap, the object is forwarded with its value
/// category preserved (lvalues come back as T&, rvalues as T&&).
template <class T>
decltype(auto) unwrap_for_es(T&& object) {
    return static_cast<T&&>(object);
}
112
113template<typename T>
114std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
115 return std::reference_wrapper<T>(ref.get());
116}
117
118}
120
122SEASTAR_MODULE_EXPORT
124public:
/// Usage counters of an execution stage. All counters start at zero.
struct stats {
    // NOTE(review): not updated anywhere in this header; presumably
    // maintained by the out-of-line flush scheduling code.
    uint64_t tasks_scheduled{0};
    // Bumped when a flush stops early because the scheduler asked for preemption.
    uint64_t tasks_preempted{0};
    // Bumped once per call queued through operator().
    uint64_t function_calls_enqueued{0};
    // Bumped once per queued call that has been invoked by a flush.
    uint64_t function_calls_executed{0};
};
131protected:
132 bool _empty = true;
133 bool _flush_scheduled = false;
135 stats _stats;
136 sstring _name;
137 metrics::metric_group _metric_group;
138protected:
139 virtual void do_flush() noexcept = 0;
140public:
141 explicit execution_stage(const sstring& name, scheduling_group sg = {});
142 virtual ~execution_stage();
143
144 execution_stage(const execution_stage&) = delete;
145
153
155 const sstring& name() const noexcept { return _name; }
156
158 const stats& get_stats() const noexcept { return _stats; }
159
167 bool flush() noexcept;
168
172 bool poll() const noexcept {
173 return !_empty;
174 }
175};
176
178namespace internal {
179
180class execution_stage_manager {
181 std::vector<execution_stage*> _execution_stages;
182 std::unordered_map<sstring, execution_stage*> _stages_by_name;
183private:
184 execution_stage_manager() = default;
185 execution_stage_manager(const execution_stage_manager&) = delete;
186 execution_stage_manager(execution_stage_manager&&) = delete;
187public:
188 void register_execution_stage(execution_stage& stage);
189 void unregister_execution_stage(execution_stage& stage) noexcept;
190 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
191 execution_stage* get_stage(const sstring& name);
192 bool flush() noexcept;
193 bool poll() const noexcept;
194public:
195 static execution_stage_manager& get() noexcept;
196};
197
198}
200
209template<typename ReturnType, typename... Args>
210requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>
212 using args_tuple = std::tuple<Args...>;
213 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
214 "Function arguments need to be nothrow move constructible");
215
216 static constexpr size_t flush_threshold = 128;
217 static constexpr size_t max_queue_length = 1024;
218
219 using return_type = futurize_t<ReturnType>;
220 using promise_type = typename return_type::promise_type;
222
223 struct work_item {
224 input_type _in;
225 promise_type _ready;
226
227 work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
228
229 work_item(work_item&& other) = delete;
230 work_item(const work_item&) = delete;
231 work_item(work_item&) = delete;
232 };
234
235 noncopyable_function<ReturnType (Args...)> _function;
236private:
237 auto unwrap(input_type&& in) {
238 return tuple_map(std::move(in), [] (auto&& obj) {
239 return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
240 });
241 }
242
// Runs queued calls in FIFO order until the queue is empty or the scheduler
// asks for preemption. Each call's result is forwarded to the promise stored
// with the queued item.
243 virtual void do_flush() noexcept override {
244 while (!_queue.empty()) {
245 auto& wi = _queue.front();
// Take the arguments and the promise out of the queued item and remove the
// item from the queue *before* the function is invoked, so the queue no
// longer refers to this call while the function runs.
246 auto wi_in = std::move(wi._in);
247 auto wi_ready = std::move(wi._ready);
248 _queue.pop_front();
// Invoke the function with the unwrapped arguments and hand its (futurized)
// result over to the caller's promise.
249 futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
250 _stats.function_calls_executed++;
251
// Stop early when the scheduler wants the CPU back; whatever is left stays
// queued.
252 if (internal::scheduler_need_preempt()) {
253 _stats.tasks_preempted++;
254 break;
255 }
256 }
// Record whether anything is left, for poll().
257 _empty = _queue.empty();
258 }
259public:
260 explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
261 : execution_stage(name, sg)
262 , _function(std::move(f))
263 {
264 _queue.reserve(flush_threshold);
265 }
266 explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
267 : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
268 }
269
291 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
292 if (_queue.size() >= max_queue_length) {
293 do_flush();
294 }
295 _queue.emplace_back(std::move(args)...);
296 _empty = false;
297 _stats.function_calls_enqueued++;
298 auto f = _queue.back()._ready.get_future();
299 flush();
300 return f;
301 }
302};
303
306public:
310 };
311 using stats = boost::container::static_vector<per_scheduling_group_stats, max_scheduling_groups()>;
312};
313
322template<typename ReturnType, typename... Args>
323requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>
325 using return_type = futurize_t<ReturnType>;
326 using args_tuple = std::tuple<Args...>;
327 using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
328
329 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
330 "Function arguments need to be nothrow move constructible");
331
332 sstring _name;
333 noncopyable_function<ReturnType (Args...)> _function;
334 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
335private:
336 per_group_stage_type make_stage_for_group(scheduling_group sg) {
337 // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
338 // that selects the noncopyable_function copy constructor. Use a lambda instead.
339 auto wrapped_function = [&_function = _function] (Args... args) {
340 return _function(std::forward<Args>(args)...);
341 };
342 auto name = fmt::format("{}.{}", _name, sg.name());
343 return per_group_stage_type(name, sg, wrapped_function);
344 }
345public:
352 inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
353 : _name(std::move(name)),_function(std::move(f)) {
354 }
355
376 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
377 auto sg = current_scheduling_group();
378 auto sg_id = internal::scheduling_group_index(sg);
379 auto& slot = _stage_for_group[sg_id];
380 if (!slot) {
381 slot.emplace(make_stage_for_group(sg));
382 }
383 return (*slot)(std::move(args)...);
384 }
385
393 inheriting_execution_stage::stats get_stats() const noexcept {
394 inheriting_execution_stage::stats summary;
395 for (unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
396 auto sg = internal::scheduling_group_from_index(sg_id);
397 if (_stage_for_group[sg_id]) {
398 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
399 }
400 }
401 return summary;
402 }
403};
404
405
407namespace internal {
408
409template <typename Ret, typename ArgsTuple>
410struct concrete_execution_stage_helper;
411
412template <typename Ret, typename... Args>
413struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
414 using type = concrete_execution_stage<Ret, Args...>;
415};
416
417}
419
451SEASTAR_MODULE_EXPORT
452template<typename Function>
453auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
454 using traits = function_traits<Function>;
455 using ret_type = typename traits::return_type;
456 using args_as_tuple = typename traits::args_as_tuple;
457 using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
458 return concrete_execution_stage(name, sg, std::forward<Function>(fn));
459}
460
491SEASTAR_MODULE_EXPORT
492template<typename Function>
493auto make_execution_stage(const sstring& name, Function&& fn) {
494 return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
495}
496
520SEASTAR_MODULE_EXPORT
521template<typename Ret, typename Object, typename... Args>
522concrete_execution_stage<Ret, Object*, Args...>
523make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
524 return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
525}
526
527template<typename Ret, typename Object, typename... Args>
528concrete_execution_stage<Ret, const Object*, Args...>
529make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
530 return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
531}
532
533template<typename Ret, typename Object, typename... Args>
534concrete_execution_stage<Ret, Object*, Args...>
535make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
536 return make_execution_stage(name, scheduling_group(), fn);
537}
538
539template<typename Ret, typename Object, typename... Args>
540concrete_execution_stage<Ret, const Object*, Args...>
541make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
542 return make_execution_stage(name, scheduling_group(), fn);
543}
544
546
547}
Concrete execution stage class.
Definition: execution_stage.hh:211
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:291
Base execution stage class.
Definition: execution_stage.hh:123
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:155
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:158
execution_stage(execution_stage &&)
bool flush() noexcept
bool poll() const noexcept
Definition: execution_stage.hh:172
Definition: execution_stage.hh:125
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:324
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:352
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:393
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:376
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:305
Holds a single metric group. Initialization is done in the constructor or with a call to add_group.
Definition: metrics_registration.hh:160
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:285
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:453
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:399
Definition: noncopyable_function.hh:37
Definition: function_traits.hh:62
STL namespace.
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept