24#include <seastar/core/future.hh>
25#include <seastar/core/chunked_fifo.hh>
26#include <seastar/core/function_traits.hh>
27#include <seastar/core/sstring.hh>
30#include <seastar/util/reference_wrapper.hh>
31#include <seastar/util/noncopyable_function.hh>
32#include <seastar/util/tuple_utils.hh>
33#include <seastar/util/std-compat.hh>
34#include <seastar/util/modules.hh>
36#include <fmt/format.h>
38#include <boost/range/irange.hpp>
39#include <boost/range/adaptor/transformed.hpp>
40#include <boost/container/static_vector.hpp>
88struct reference_wrapper_for_es : reference_wrapper<T> {
89 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
90 : reference_wrapper<T>(std::move(rw)) {}
99struct wrap_for_es<T&> {
100 using type = reference_wrapper_for_es<T>;
104struct wrap_for_es<T&&> {
109decltype(
auto) unwrap_for_es(T&&
object) {
110 return std::forward<T>(
object);
114std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T>
ref) {
115 return std::reference_wrapper<T>(
ref.get());
126 uint64_t tasks_scheduled = 0;
127 uint64_t tasks_preempted = 0;
128 uint64_t function_calls_enqueued = 0;
129 uint64_t function_calls_executed = 0;
133 bool _flush_scheduled =
false;
139 virtual void do_flush() noexcept = 0;
/// Returns execution stage name.
///
/// \return a const reference to the stored `_name` member; no copy is made.
155 const sstring&
name() const noexcept {
return _name; }
180class execution_stage_manager {
181 std::vector<execution_stage*> _execution_stages;
182 std::unordered_map<sstring, execution_stage*> _stages_by_name;
184 execution_stage_manager() =
default;
185 execution_stage_manager(
const execution_stage_manager&) =
delete;
186 execution_stage_manager(execution_stage_manager&&) =
delete;
188 void register_execution_stage(execution_stage& stage);
189 void unregister_execution_stage(execution_stage& stage)
noexcept;
190 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es)
noexcept;
191 execution_stage* get_stage(
const sstring& name);
192 bool flush() noexcept;
193 bool poll() const noexcept;
195 static execution_stage_manager& get() noexcept;
209template<typename ReturnType, typename... Args>
210requires
std::is_nothrow_move_constructible_v<
std::tuple<Args...>>
212 using args_tuple = std::tuple<Args...>;
213 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
214 "Function arguments need to be nothrow move constructible");
216 static constexpr size_t flush_threshold = 128;
217 static constexpr size_t max_queue_length = 1024;
219 using return_type = futurize_t<ReturnType>;
220 using promise_type =
typename return_type::promise_type;
227 work_item(
typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
229 work_item(work_item&& other) =
delete;
230 work_item(
const work_item&) =
delete;
231 work_item(work_item&) =
delete;
237 auto unwrap(input_type&& in) {
238 return tuple_map(std::move(in), [] (
auto&& obj) {
239 return internal::unwrap_for_es(std::forward<
decltype(obj)>(obj));
243 virtual void do_flush()
noexcept override {
244 while (!_queue.empty()) {
245 auto& wi = _queue.front();
246 auto wi_in = std::move(wi._in);
247 auto wi_ready = std::move(wi._ready);
250 _stats.function_calls_executed++;
252 if (internal::scheduler_need_preempt()) {
253 _stats.tasks_preempted++;
257 _empty = _queue.empty();
262 , _function(std::move(f))
264 _queue.reserve(flush_threshold);
291 return_type
operator()(
typename internal::wrap_for_es<Args>::type... args) {
292 if (_queue.size() >= max_queue_length) {
295 _queue.emplace_back(std::move(args)...);
297 _stats.function_calls_enqueued++;
298 auto f = _queue.back()._ready.get_future();
322template<
typename ReturnType,
typename... Args>
323requires std::is_nothrow_move_constructible_v<std::tuple<Args...>>
325 using return_type = futurize_t<ReturnType>;
326 using args_tuple = std::tuple<Args...>;
329 static_assert(std::is_nothrow_move_constructible_v<args_tuple>,
330 "Function arguments need to be nothrow move constructible");
334 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
339 auto wrapped_function = [&_function = _function] (Args... args) {
340 return _function(std::forward<Args>(args)...);
342 auto name = fmt::format(
"{}.{}", _name, sg.name());
353 : _name(
std::move(name)),_function(
std::move(f)) {
376 return_type
operator()(
typename internal::wrap_for_es<Args>::type... args) {
378 auto sg_id = internal::scheduling_group_index(sg);
379 auto& slot = _stage_for_group[sg_id];
381 slot.emplace(make_stage_for_group(sg));
383 return (*slot)(std::move(args)...);
393 inheriting_execution_stage::stats
get_stats() const noexcept {
394 inheriting_execution_stage::stats summary;
395 for (
unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
396 auto sg = internal::scheduling_group_from_index(sg_id);
397 if (_stage_for_group[sg_id]) {
398 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
// Helper trait: primary template, declared here without a body.
// The partial specialization for `std::tuple<Args...>` that follows exposes
// `type` as `concrete_execution_stage<Ret, Args...>`, i.e. it unpacks a tuple
// of argument types into the stage's template parameter list.
409template <
typename Ret,
typename ArgsTuple>
410struct concrete_execution_stage_helper;
412template <
typename Ret,
typename... Args>
413struct concrete_execution_stage_helper<Ret,
std::tuple<Args...>> {
414 using type = concrete_execution_stage<Ret, Args...>;
452template<
typename Function>
455 using ret_type =
typename traits::return_type;
456 using args_as_tuple =
typename traits::args_as_tuple;
492template<
typename Function>
521template<
typename Ret,
typename Object,
typename... Args>
522concrete_execution_stage<Ret, Object*, Args...>
527template<
typename Ret,
typename Object,
typename... Args>
528concrete_execution_stage<Ret,
const Object*, Args...>
529make_execution_stage(
const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)
const) {
530 return concrete_execution_stage<Ret,
const Object*, Args...>(name, sg, std::mem_fn(fn));
533template<
typename Ret,
typename Object,
typename... Args>
534concrete_execution_stage<Ret, Object*, Args...>
539template<
typename Ret,
typename Object,
typename... Args>
540concrete_execution_stage<Ret,
const Object*, Args...>
Concrete execution stage class.
Definition: execution_stage.hh:211
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:291
Base execution stage class.
Definition: execution_stage.hh:123
const sstring & name() const noexcept
Returns execution stage name.
Definition: execution_stage.hh:155
const stats & get_stats() const noexcept
Returns execution stage usage statistics.
Definition: execution_stage.hh:158
execution_stage(execution_stage &&)
bool poll() const noexcept
Definition: execution_stage.hh:172
Definition: execution_stage.hh:125
Concrete execution stage class, with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:324
inheriting_concrete_execution_stage(const sstring &name, noncopyable_function< ReturnType(Args...)> f)
Definition: execution_stage.hh:352
inheriting_execution_stage::stats get_stats() const noexcept
Definition: execution_stage.hh:393
return_type operator()(typename internal::wrap_for_es< Args >::type... args)
Definition: execution_stage.hh:376
Base class for execution stages with support for automatic scheduling_group inheritance.
Definition: execution_stage.hh:305
Definition: execution_stage.hh:307
Holds a single metric group. Initialization is done in the constructor or with a call to add_group.
Definition: metrics_registration.hh:160
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:285
auto make_execution_stage(const sstring &name, scheduling_group sg, Function &&fn)
Definition: execution_stage.hh:453
auto tuple_map(const std::tuple< Elements... > &t, Function &&f)
Definition: tuple_utils.hh:139
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Definition: tuple_utils.hh:96
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:399
Definition: noncopyable_function.hh:37
Definition: function_traits.hh:62
static type apply(Func &&func, std::tuple< FuncArgs... > &&args) noexcept