24#include <seastar/core/future.hh>
25#include <seastar/core/chunked_fifo.hh>
26#include <seastar/core/function_traits.hh>
27#include <seastar/core/sstring.hh>
30#include <seastar/util/reference_wrapper.hh>
31#include <seastar/util/noncopyable_function.hh>
32#include <seastar/util/tuple_utils.hh>
33#include <seastar/util/std-compat.hh>
34#include <seastar/util/modules.hh>
36#include <fmt/format.h>
38#include <boost/container/static_vector.hpp>
86struct reference_wrapper_for_es : reference_wrapper<T> {
87 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
88 : reference_wrapper<T>(std::move(rw)) {}
97struct wrap_for_es<T&> {
98 using type = reference_wrapper_for_es<T>;
102struct wrap_for_es<T&&> {
107decltype(
auto) unwrap_for_es(T&&
object) {
108 return std::forward<T>(
object);
112std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T>
ref) {
113 return std::reference_wrapper<T>(
ref.get());
124 uint64_t tasks_scheduled = 0;
125 uint64_t tasks_preempted = 0;
126 uint64_t function_calls_enqueued = 0;
127 uint64_t function_calls_executed = 0;
131 bool _flush_scheduled =
false;
137 virtual void do_flush() noexcept = 0;
153 const sstring&
name() const noexcept {
return _name; }
178class execution_stage_manager {
179 std::vector<execution_stage*> _execution_stages;
180 std::unordered_map<sstring, execution_stage*> _stages_by_name;
182 execution_stage_manager() =
default;
183 execution_stage_manager(
const execution_stage_manager&) =
delete;
184 execution_stage_manager(execution_stage_manager&&) =
delete;
186 void register_execution_stage(execution_stage& stage);
187 void unregister_execution_stage(execution_stage& stage)
noexcept;
188 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es)
noexcept;
189 execution_stage* get_stage(
const sstring& name);
190 bool flush() noexcept;
191 bool poll() const noexcept;
193 static execution_stage_manager& get() noexcept;
207template<typename ReturnType, typename... Args>
208requires
std::is_nothrow_move_constructible_v<
std::tuple<Args...>>
210 using args_tuple = std::tuple<Args...>;
211 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
212 "Function arguments need to be nothrow move constructible");
214 static constexpr size_t flush_threshold = 128;
215 static constexpr size_t max_queue_length = 1024;
217 using return_type = futurize_t<ReturnType>;
218 using promise_type =
typename return_type::promise_type;
225 work_item(
typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
227 work_item(work_item&& other) =
delete;
228 work_item(
const work_item&) =
delete;
229 work_item(work_item&) =
delete;
235 auto unwrap(input_type&& in) {
236 return tuple_map(std::move(in), [] (
auto&& obj) {
237 return internal::unwrap_for_es(std::forward<
decltype(obj)>(obj));
241 virtual void do_flush()
noexcept override {
242 while (!_queue.empty()) {
243 auto& wi = _queue.front();
244 auto wi_in = std::move(wi._in);
245 auto wi_ready = std::move(wi._ready);
248 _stats.function_calls_executed++;
250 if (internal::scheduler_need_preempt()) {
251 _stats.tasks_preempted++;
255 _empty = _queue.empty();
260 , _function(std::move(f))
262 _queue.reserve(flush_threshold);
289 return_type
operator()(
typename internal::wrap_for_es<Args>::type... args) {
290 if (_queue.size() >= max_queue_length) {
293 _queue.emplace_back(std::move(args)...);
295 _stats.function_calls_enqueued++;
296 auto f = _queue.back()._ready.get_future();
320template<
typename ReturnType,
typename... Args>
321requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>
323 using return_type = futurize_t<ReturnType>;
324 using args_tuple = std::tuple<Args...>;
327 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
328 "Function arguments need to be nothrow move constructible");
332 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
337 auto wrapped_function = [&_function = _function] (Args... args) {
338 return _function(std::forward<Args>(args)...);
340 auto name = fmt::format(
"{}.{}", _name, sg.name());
351 : _name(
std::move(name)),_function(
std::move(f)) {
374 return_type
operator()(
typename internal::wrap_for_es<Args>::type... args) {
376 auto sg_id = internal::scheduling_group_index(sg);
377 auto& slot = _stage_for_group[sg_id];
379 slot.emplace(make_stage_for_group(sg));
381 return (*slot)(std::move(args)...);
391 inheriting_execution_stage::stats
get_stats() const noexcept {
392 inheriting_execution_stage::stats summary;
393 for (
unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
394 auto sg = internal::scheduling_group_from_index(sg_id);
395 if (_stage_for_group[sg_id]) {
396 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
407template <
typename Ret,
typename ArgsTuple>
408struct concrete_execution_stage_helper;
410template <
typename Ret,
typename... Args>
411struct concrete_execution_stage_helper<Ret,
std::tuple<Args...>> {
412 using type = concrete_execution_stage<Ret, Args...>;
450template<
typename Function>
453 using ret_type =
typename traits::return_type;
454 using args_as_tuple =
typename traits::args_as_tuple;
490template<
typename Function>
519template<
typename Ret,
typename Object,
typename... Args>
520concrete_execution_stage<Ret, Object*, Args...>
525template<
typename Ret,
typename Object,
typename... Args>
526concrete_execution_stage<Ret,
const Object*, Args...>
527make_execution_stage(
const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)
const) {
528 return concrete_execution_stage<Ret,
const Object*, Args...>(name, sg, std::mem_fn(fn));
531template<
typename Ret,
typename Object,
typename... Args>
532concrete_execution_stage<Ret, Object*, Args...>
537template<
typename Ret,
typename Object,
typename... Args>
538concrete_execution_stage<Ret,
const Object*, Args...>
Concrete execution stage class.
Definition: execution_stage.hh:209
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:289
Base execution stage class.
Definition: execution_stage.hh:121
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:153
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:156
execution_stage(execution_stage &&)
bool poll() const noexcept
Definition: execution_stage.hh:170
Definition: execution_stage.hh:123
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:322
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:350
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:391
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:374
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:303
Definition: execution_stage.hh:305
hold a single metric group Initialization is done in the constructor or with a call to add_group
Definition: metrics_registration.hh:160
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:285
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:451
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
Definition: noncopyable_function.hh:37
Definition: function_traits.hh:62
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept