24#include <boost/intrusive/slist.hpp>
25#include <seastar/core/sstring.hh>
26#include <seastar/core/shared_ptr.hh>
27#include <seastar/core/circular_buffer.hh>
29#include <seastar/util/shared_token_bucket.hh>
36#include <fmt/ostream.h>
38namespace bi = boost::intrusive;
75 explicit operator bool() const noexcept;
76 bool is_non_zero() const noexcept;
110 using capacity_t = uint64_t;
114 capacity_t _capacity;
115 bi::slist_member_hook<> _hook;
121 bi::constant_time_size<false>,
122 bi::cache_last<true>,
123 bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
125 capacity_t capacity()
const noexcept {
return _capacity; }
140 using capacity_t = fair_queue_entry::capacity_t;
141 using clock_type = std::chrono::steady_clock;
192 static constexpr float fixed_point_factor = float(1 << 24);
193 using rate_resolution = std::milli;
194 using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;
217 token_bucket_t _token_bucket;
218 const capacity_t _per_tick_threshold;
223 static double capacity_tokens(capacity_t cap)
noexcept {
224 return (
double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
228 static capacity_t tokens_capacity(
double tokens)
noexcept {
229 return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
232 auto capacity_duration(capacity_t cap)
const noexcept {
233 return _token_bucket.duration_for(cap);
245 double min_tokens = 0.0;
246 double limit_min_tokens = 0.0;
247 std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
253 capacity_t maximum_capacity() const noexcept {
return _token_bucket.limit(); }
254 capacity_t per_tick_grab_threshold() const noexcept {
return _per_tick_threshold; }
255 capacity_t grab_capacity(capacity_t cap)
noexcept;
256 clock_type::time_point replenished_ts() const noexcept {
return _token_bucket.replenished_ts(); }
257 void replenish_capacity(clock_type::time_point
now)
noexcept;
258 void maybe_replenish_capacity(clock_type::time_point& local_ts)
noexcept;
260 capacity_t capacity_deficiency(capacity_t from)
const noexcept;
262 std::chrono::duration<double> rate_limit_duration() const noexcept {
263 std::chrono::duration<double, rate_resolution> dur((
double)_token_bucket.limit() / _token_bucket.rate());
264 return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
267 const token_bucket_t& token_bucket() const noexcept {
return _token_bucket; }
297 std::chrono::microseconds tau = std::chrono::milliseconds(5);
300 using class_id =
unsigned int;
301 class priority_class_data;
302 using capacity_t = fair_group::capacity_t;
303 using signed_capacity_t = std::make_signed_t<capacity_t>;
306 using clock_type = std::chrono::steady_clock;
307 using priority_class_ptr = priority_class_data*;
308 struct class_compare {
309 bool operator() (
const priority_class_ptr& lhs,
const priority_class_ptr & rhs)
const noexcept;
312 class priority_queue :
public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
313 using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
315 void reserve(
size_t len) {
319 void assert_enough_capacity() const noexcept {
320 assert(c.size() < c.capacity());
326 clock_type::time_point _group_replenish;
327 fair_queue_ticket _resources_executing;
328 fair_queue_ticket _resources_queued;
329 priority_queue _handles;
330 std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
331 size_t _nr_classes = 0;
332 capacity_t _last_accumulated = 0;
349 pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
352 std::optional<pending> _pending;
354 void push_priority_class(priority_class_data& pc)
noexcept;
355 void push_priority_class_from_idle(priority_class_data& pc)
noexcept;
356 void pop_priority_class(priority_class_data& pc)
noexcept;
357 void plug_priority_class(priority_class_data& pc)
noexcept;
358 void unplug_priority_class(priority_class_data& pc)
noexcept;
360 enum class grab_result { grabbed, cant_preempt, pending };
361 grab_result grab_capacity(
const fair_queue_entry& ent)
noexcept;
362 grab_result grab_pending_capacity(
const fair_queue_entry& ent)
noexcept;
371 sstring label() const noexcept {
return _config.label; }
383 void update_shares_for_class(class_id c, uint32_t new_shares);
391 capacity_t tokens_capacity(
double tokens)
const noexcept {
392 return _group.tokens_capacity(tokens);
395 capacity_t maximum_capacity() const noexcept {
396 return _group.maximum_capacity();
405 void plug_class(class_id c)
noexcept;
406 void unplug_class(class_id c)
noexcept;
416 clock_type::time_point next_pending_aio() const noexcept;
418 std::vector<
seastar::metrics::
impl::metric_definition_impl> metrics(class_id c);
424#if FMT_VERSION >= 90000
Group of queues class.
Definition: fair_queue.hh:138
Definition: fair_queue.hh:236
Definition: fair_queue.hh:105
describes a request that passes through the fair_queue.
Definition: fair_queue.hh:49
fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
bool operator==(const fair_queue_ticket &desc) const noexcept
fair_queue_ticket & operator-=(fair_queue_ticket desc) noexcept
float normalize(fair_queue_ticket axis) const noexcept
fair_queue_ticket & operator+=(fair_queue_ticket desc) noexcept
Fair queuing class.
Definition: fair_queue.hh:289
void queue(class_id c, fair_queue_entry &ent) noexcept
void unregister_priority_class(class_id c)
fair_queue_ticket resources_currently_executing() const
fair_queue(fair_group &shared, config cfg)
void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept
void dispatch_requests(std::function< void(fair_queue_entry &)> cb)
Try to execute new requests if there is capacity left in the queue.
fair_queue_ticket resources_currently_waiting() const
void register_priority_class(class_id c, uint32_t shares)
Fair Queue configuration structure.
Definition: fair_queue.hh:295
future now()
Returns a ready future.
Definition: later.hh:35
holds the metric_groups definition needed by class that reports metrics
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26