24#include <boost/intrusive/slist.hpp>
25#include <seastar/core/sstring.hh>
26#include <seastar/core/shared_ptr.hh>
27#include <seastar/core/circular_buffer.hh>
29#include <seastar/util/assert.hh>
30#include <seastar/util/shared_token_bucket.hh>
37#include <fmt/ostream.h>
39namespace bi = boost::intrusive;
76 explicit operator bool() const noexcept;
77 bool is_non_zero() const noexcept;
111 using capacity_t = uint64_t;
115 capacity_t _capacity;
116 bi::slist_member_hook<> _hook;
122 bi::constant_time_size<false>,
123 bi::cache_last<true>,
124 bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
126 capacity_t capacity()
const noexcept {
return _capacity; }
141 using capacity_t = fair_queue_entry::capacity_t;
142 using clock_type = std::chrono::steady_clock;
193 static constexpr float fixed_point_factor = float(1 << 24);
194 using rate_resolution = std::milli;
195 using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;
218 token_bucket_t _token_bucket;
219 const capacity_t _per_tick_threshold;
224 static double capacity_tokens(capacity_t cap)
noexcept {
225 return (
double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
229 static capacity_t tokens_capacity(
double tokens)
noexcept {
230 return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
233 auto capacity_duration(capacity_t cap)
const noexcept {
234 return _token_bucket.duration_for(cap);
246 double min_tokens = 0.0;
247 double limit_min_tokens = 0.0;
248 std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
254 capacity_t maximum_capacity() const noexcept {
return _token_bucket.limit(); }
255 capacity_t per_tick_grab_threshold() const noexcept {
return _per_tick_threshold; }
256 capacity_t grab_capacity(capacity_t cap)
noexcept;
257 clock_type::time_point replenished_ts() const noexcept {
return _token_bucket.replenished_ts(); }
258 void refund_tokens(capacity_t)
noexcept;
259 void replenish_capacity(clock_type::time_point
now)
noexcept;
260 void maybe_replenish_capacity(clock_type::time_point& local_ts)
noexcept;
262 capacity_t capacity_deficiency(capacity_t from)
const noexcept;
264 std::chrono::duration<double> rate_limit_duration() const noexcept {
265 std::chrono::duration<double, rate_resolution> dur((
double)_token_bucket.limit() / _token_bucket.rate());
266 return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
269 const token_bucket_t& token_bucket() const noexcept {
return _token_bucket; }
299 std::chrono::microseconds tau = std::chrono::milliseconds(5);
302 using class_id =
unsigned int;
303 class priority_class_data;
304 using capacity_t = fair_group::capacity_t;
305 using signed_capacity_t = std::make_signed_t<capacity_t>;
308 using clock_type = std::chrono::steady_clock;
309 using priority_class_ptr = priority_class_data*;
310 struct class_compare {
311 bool operator() (
const priority_class_ptr& lhs,
const priority_class_ptr & rhs)
const noexcept;
314 class priority_queue :
public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
315 using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
317 void reserve(
size_t len) {
321 void assert_enough_capacity() const noexcept {
322 SEASTAR_ASSERT(c.size() < c.capacity());
328 clock_type::time_point _group_replenish;
329 fair_queue_ticket _resources_executing;
330 fair_queue_ticket _resources_queued;
331 priority_queue _handles;
332 std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
333 size_t _nr_classes = 0;
334 capacity_t _last_accumulated = 0;
356 capacity_t _queued_capacity = 0;
358 void push_priority_class(priority_class_data& pc)
noexcept;
359 void push_priority_class_from_idle(priority_class_data& pc)
noexcept;
360 void pop_priority_class(priority_class_data& pc)
noexcept;
361 void plug_priority_class(priority_class_data& pc)
noexcept;
362 void unplug_priority_class(priority_class_data& pc)
noexcept;
366 void grab_capacity(capacity_t cap)
noexcept;
374 capacity_t ready_tokens;
375 bool our_turn_has_come;
377 reap_result reap_pending_capacity() noexcept;
386 sstring label() const noexcept {
return _config.label; }
398 void update_shares_for_class(class_id c, uint32_t new_shares);
406 capacity_t tokens_capacity(
double tokens)
const noexcept {
407 return _group.tokens_capacity(tokens);
410 capacity_t maximum_capacity() const noexcept {
411 return _group.maximum_capacity();
420 void plug_class(class_id c)
noexcept;
421 void unplug_class(class_id c)
noexcept;
431 clock_type::time_point next_pending_aio() const noexcept;
433 std::vector<
seastar::metrics::
impl::metric_definition_impl> metrics(class_id c);
439#if FMT_VERSION >= 90000
Group of queues class.
Definition: fair_queue.hh:139
Definition: fair_queue.hh:237
Definition: fair_queue.hh:106
describes a request that passes through the fair_queue.
Definition: fair_queue.hh:50
fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
bool operator==(const fair_queue_ticket &desc) const noexcept
fair_queue_ticket & operator-=(fair_queue_ticket desc) noexcept
float normalize(fair_queue_ticket axis) const noexcept
fair_queue_ticket & operator+=(fair_queue_ticket desc) noexcept
Fair queuing class.
Definition: fair_queue.hh:291
void queue(class_id c, fair_queue_entry &ent) noexcept
void unregister_priority_class(class_id c)
fair_queue_ticket resources_currently_executing() const
void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept
void dispatch_requests(std::function< void(fair_queue_entry &)> cb)
Try to execute new requests if there is capacity left in the queue.
fair_queue_ticket resources_currently_waiting() const
void register_priority_class(class_id c, uint32_t shares)
Fair Queue configuration structure.
Definition: fair_queue.hh:297
future now()
Returns a ready future.
Definition: later.hh:35
holds the metric_groups definition needed by class that reports metrics
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26