24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
26 #include <seastar/core/function_traits.hh>
27 #include <seastar/core/sstring.hh>
30 #include <seastar/util/reference_wrapper.hh>
31 #include <seastar/util/concepts.hh>
32 #include <seastar/util/noncopyable_function.hh>
33 #include <seastar/util/tuple_utils.hh>
34 #include <seastar/util/std-compat.hh>
35 #include <seastar/util/modules.hh>
36 #ifndef SEASTAR_MODULE
37 #include <fmt/format.h>
39 #include <boost/range/irange.hpp>
40 #include <boost/range/adaptor/transformed.hpp>
41 #include <boost/container/static_vector.hpp>
89 struct reference_wrapper_for_es : reference_wrapper<T> {
90 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
91 : reference_wrapper<T>(std::move(rw)) {}
100 struct wrap_for_es<T&> {
101 using type = reference_wrapper_for_es<T>;
105 struct wrap_for_es<T&&> {
110 decltype(
auto) unwrap_for_es(T&&
object) {
111 return std::forward<T>(
object);
115 std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T>
ref) {
116 return std::reference_wrapper<T>(
ref.get());
123 SEASTAR_MODULE_EXPORT
127 uint64_t tasks_scheduled = 0;
128 uint64_t tasks_preempted = 0;
129 uint64_t function_calls_enqueued = 0;
130 uint64_t function_calls_executed = 0;
134 bool _flush_scheduled =
false;
140 virtual void do_flush() noexcept = 0;
156 const sstring&
name() const noexcept {
return _name; }
181 class execution_stage_manager {
182 std::vector<execution_stage*> _execution_stages;
183 std::unordered_map<sstring, execution_stage*> _stages_by_name;
185 execution_stage_manager() =
default;
186 execution_stage_manager(
const execution_stage_manager&) =
delete;
187 execution_stage_manager(execution_stage_manager&&) =
delete;
189 void register_execution_stage(execution_stage& stage);
190 void unregister_execution_stage(execution_stage& stage) noexcept;
191 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
192 execution_stage* get_stage(
const sstring& name);
193 bool flush() noexcept;
194 bool poll() const noexcept;
196 static execution_stage_manager& get() noexcept;
210 template<typename ReturnType, typename... Args>
211 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>)
213 using args_tuple = std::tuple<Args...>;
214 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
215 "Function arguments need to be nothrow move constructible");
217 static constexpr
size_t flush_threshold = 128;
218 static constexpr
size_t max_queue_length = 1024;
220 using return_type = futurize_t<ReturnType>;
221 using promise_type =
typename return_type::promise_type;
228 work_item(
typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
230 work_item(work_item&& other) =
delete;
231 work_item(
const work_item&) =
delete;
232 work_item(work_item&) =
delete;
238 auto unwrap(input_type&& in) {
239 return tuple_map(std::move(in), [] (
auto&& obj) {
240 return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
244 virtual void do_flush() noexcept
override {
245 while (!_queue.empty()) {
246 auto& wi = _queue.front();
247 auto wi_in = std::move(wi._in);
248 auto wi_ready = std::move(wi._ready);
251 _stats.function_calls_executed++;
253 if (need_preempt()) {
254 _stats.tasks_preempted++;
258 _empty = _queue.empty();
263 , _function(std::move(f))
265 _queue.reserve(flush_threshold);
292 return_type
operator()(
typename internal::wrap_for_es<Args>::type... args) {
293 if (_queue.size() >= max_queue_length) {
296 _queue.emplace_back(std::move(args)...);
298 _stats.function_calls_enqueued++;
299 auto f = _queue.back()._ready.get_future();
323 template<
typename ReturnType,
typename... Args>
324 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>)
326 using return_type = futurize_t<ReturnType>;
327 using args_tuple = std::tuple<Args...>;
330 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
331 "Function arguments need to be nothrow move constructible");
335 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
340 auto wrapped_function = [&_function = _function] (Args... args) {
341 return _function(std::forward<Args>(args)...);
343 auto name = fmt::format(
"{}.{}", _name, sg.name());
354 : _name(std::move(name)),_function(std::move(f)) {
377 return_type
operator()(
typename internal::wrap_for_es<Args>::type... args) {
379 auto sg_id = internal::scheduling_group_index(sg);
380 auto& slot = _stage_for_group[sg_id];
382 slot.emplace(make_stage_for_group(sg));
384 return (*slot)(std::move(args)...);
394 inheriting_execution_stage::stats
get_stats() const noexcept {
395 inheriting_execution_stage::stats summary;
396 for (
unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
397 auto sg = internal::scheduling_group_from_index(sg_id);
398 if (_stage_for_group[sg_id]) {
399 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
410 template <
typename Ret,
typename ArgsTuple>
411 struct concrete_execution_stage_helper;
413 template <
typename Ret,
typename... Args>
414 struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
415 using type = concrete_execution_stage<Ret, Args...>;
452 SEASTAR_MODULE_EXPORT
453 template<
typename Function>
456 using ret_type =
typename traits::return_type;
457 using args_as_tuple =
typename traits::args_as_tuple;
492 SEASTAR_MODULE_EXPORT
493 template<
typename Function>
521 SEASTAR_MODULE_EXPORT
522 template<
typename Ret,
typename Object,
typename... Args>
523 concrete_execution_stage<Ret, Object*, Args...>
528 template<
typename Ret,
typename Object,
typename... Args>
529 concrete_execution_stage<Ret,
const Object*, Args...>
530 make_execution_stage(
const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)
const) {
531 return concrete_execution_stage<Ret,
const Object*, Args...>(name, sg, std::mem_fn(fn));
534 template<
typename Ret,
typename Object,
typename... Args>
535 concrete_execution_stage<Ret, Object*, Args...>
540 template<
typename Ret,
typename Object,
typename... Args>
541 concrete_execution_stage<Ret,
const Object*, Args...>
Concrete execution stage class.
Definition: execution_stage.hh:212
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:292
Base execution stage class.
Definition: execution_stage.hh:124
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:156
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:159
execution_stage(execution_stage &&)
bool poll() const noexcept
Definition: execution_stage.hh:173
Definition: execution_stage.hh:126
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:325
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:353
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:394
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:377
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:306
Definition: execution_stage.hh:308
hold a single metric group Initialization is done in the constructor or with a call to add_group
Definition: metrics_registration.hh:160
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:454
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:400
Definition: noncopyable_function.hh:37
Definition: function_traits.hh:62
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept