Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
fair_queue.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2016 ScyllaDB
21 */
22#pragma once
23
24#include <boost/intrusive/slist.hpp>
25#include <seastar/core/sstring.hh>
26#include <seastar/core/shared_ptr.hh>
27#include <seastar/core/circular_buffer.hh>
29#include <seastar/util/assert.hh>
30#include <seastar/util/shared_token_bucket.hh>
31
32#include <chrono>
33#include <cstdint>
34#include <functional>
35#include <optional>
36#include <queue>
37#include <fmt/ostream.h>
38
39namespace bi = boost::intrusive;
40
41namespace seastar {
42
51 uint32_t _weight = 0;
52 uint32_t _size = 0;
53public:
58 fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
59 fair_queue_ticket() noexcept {}
60 fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
61 fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
70 bool operator==(const fair_queue_ticket& desc) const noexcept;
71
76 explicit operator bool() const noexcept;
77 bool is_non_zero() const noexcept;
78
79 friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);
80
94 float normalize(fair_queue_ticket axis) const noexcept;
95
96 /*
97 * For both dimensions checks if the first rover is ahead of the
98 * second and returns the difference. If behind returns zero.
99 */
100 friend fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept;
101};
102
105
107public:
108 // The capacity_t represents tokens each entry needs to get dispatched, in
109 // a 'normalized' form -- converted from floating-point to fixed-point number
110 // and scaled according to fair-group's token-bucket duration
111 using capacity_t = uint64_t;
112 friend class fair_queue;
113
114private:
115 capacity_t _capacity;
116 bi::slist_member_hook<> _hook;
117
118public:
// Stores the entry's dispatch cost in normalized fixed-point tokens (capacity_t).
119 explicit fair_queue_entry(capacity_t c) noexcept
120 : _capacity(c) {}
// Intrusive singly-linked list of entries hooked through _hook;
// cache_last<true> caches the tail so entries can be appended in O(1).
121 using container_list_t = bi::slist<fair_queue_entry,
122 bi::constant_time_size<false>,
123 bi::cache_last<true>,
124 bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
125
// Tokens this entry needs in order to get dispatched.
126 capacity_t capacity() const noexcept { return _capacity; }
127};
128
140public:
141 using capacity_t = fair_queue_entry::capacity_t;
142 using clock_type = std::chrono::steady_clock;
143
144 /*
145 * tldr; The math
146 *
147 * Bw, Br -- write/read bandwidth (bytes per second)
148 * Ow, Or -- write/read iops (ops per second)
149 *
150 * xx_max -- their maximum values (configured)
151 *
152 * Throttling formula:
153 *
154 * Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
155 *
156 * where K is the scalar value <= 1.0 (also configured)
157 *
158 * Bandwidth is bytes time derivative, iops is ops time derivative, i.e.
159 * Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into
160 *
161 * d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K
162 *
163 * Fair queue tickets are {w, s} weight-size pairs that are
164 *
165 * s = read_base_count * br, for reads
166 * Br_max/Bw_max * read_base_count * bw, for writes
167 *
168 * w = read_base_count, for reads
169 * Or_max/Ow_max * read_base_count, for writes
170 *
171 * Thus the formula turns into
172 *
173 * d(sum(w/W + s/S))/dt <= K
174 *
175 * where {w, s} is the ticket value of a request and sum summarizes the
176 * ticket values from all the requests seen so far, {W, S} is the ticket
177 * value that corresponds to a virtual summary of Or_max requests of
178 * Br_max size total.
179 */
180
181 /*
182 * The normalization results in a float of the 2^-30 seconds order of
183 * magnitude. Not to invent float point atomic arithmetics, the result
184 * is converted to an integer by multiplying by a factor that's large
185 * enough to turn these values into a non-zero integer.
186 *
187 * Also, the rates in bytes/sec when adjusted by io-queue according to
188 * multipliers become too large to be stored in 32-bit ticket value.
189 * Thus the rate resolution is applied. The t.bucket is configured with a
190 * time period for which the speeds from F (in above formula) are taken.
191 */
192
193 static constexpr float fixed_point_factor = float(1 << 24);
194 using rate_resolution = std::milli;
195 using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;
196
197private:
198
199 /*
200 * The dF/dt <= K limitation is managed by the modified token bucket
201 * algo where tokens are ticket.normalize(cost_capacity), the refill
202 * rate is K.
203 *
204 * The token bucket algo must have the limit on the number of tokens
205 * accumulated. Here it's configured so that it accumulates for the
206 * latency_goal duration.
207 *
208 * The replenish threshold is the minimal number of tokens to put back.
209 * It's reserved for future use to reduce the load on the replenish
210 * timestamp.
211 *
212 * The timestamp, in turn, is the time when the bucket was replenished
213 * last. Every time a shard tries to get tokens from bucket it first
214 * tries to convert the time that had passed since this timestamp
215 * into more tokens in the bucket.
216 */
217
218 token_bucket_t _token_bucket;
219 const capacity_t _per_tick_threshold;
220
221public:
222
223 // Convert internal capacity value back into the real token
224 static double capacity_tokens(capacity_t cap) noexcept {
225 return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
226 }
227
228 // Convert floating-point tokens into the token bucket capacity
229 static capacity_t tokens_capacity(double tokens) noexcept {
230 return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
231 }
232
// Wall-clock time needed for `cap` tokens; delegates to the shared
// token bucket's duration_for().
233 auto capacity_duration(capacity_t cap) const noexcept {
234 return _token_bucket.duration_for(cap);
235 }
236
237 struct config {
// Name for this group; presumably surfaces in metrics/logs — confirm at call sites.
238 sstring label = "";
239 /*
240 * There are two "min" values that can be configured. The former one
241 * is the minimal weight:size pair that the upper layer is going to
242 * submit. However, it can submit _larger_ values, and the fair queue
243 * must accept those as large as the latter pair (but it can accept
244 * even larger values, of course)
245 */
246 double min_tokens = 0.0;
247 double limit_min_tokens = 0.0;
// Period over which the token-bucket rates are accounted (default 1ms).
// NOTE(review): exact use is in fair_group's constructor — confirm there.
248 std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
249 };
250
251 explicit fair_group(config cfg, unsigned nr_queues);
252 fair_group(fair_group&&) = delete;
253
254 capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
255 capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
256 capacity_t grab_capacity(capacity_t cap) noexcept;
257 clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
258 void refund_tokens(capacity_t) noexcept;
259 void replenish_capacity(clock_type::time_point now) noexcept;
260 void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;
261
262 capacity_t capacity_deficiency(capacity_t from) const noexcept;
263
264 std::chrono::duration<double> rate_limit_duration() const noexcept {
265 std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
266 return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
267 }
268
269 const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }
270};
271
292public:
297 struct config {
// Name for this queue; presumably surfaces in metrics/logs — confirm at call sites.
298 sstring label = "";
// Time constant for the queue's fairness accounting (default 5ms).
// NOTE(review): its exact semantics live in the implementation file — confirm there.
299 std::chrono::microseconds tau = std::chrono::milliseconds(5);
300 };
301
302 using class_id = unsigned int;
303 class priority_class_data;
304 using capacity_t = fair_group::capacity_t;
305 using signed_capacity_t = std::make_signed_t<capacity_t>;
306
307private:
308 using clock_type = std::chrono::steady_clock;
309 using priority_class_ptr = priority_class_data*;
// Strict-weak-ordering functor over priority_class_data pointers, used by
// the priority_queue below; the ordering criterion is defined out-of-line.
310 struct class_compare {
311 bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept;
312 };
313
// std::priority_queue of priority classes, extended so the underlying
// vector's capacity can be pre-reserved and later asserted on — presumably
// so pushes on the dispatch path never reallocate.
314 class priority_queue : public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
315 using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
316 public:
// Pre-allocate storage for `len` entries in the underlying container `c`.
317 void reserve(size_t len) {
318 c.reserve(len);
319 }
320
// Assert that one more push fits without reallocating (size strictly below capacity).
321 void assert_enough_capacity() const noexcept {
322 SEASTAR_ASSERT(c.size() < c.capacity());
323 }
324 };
325
326 config _config;
327 fair_group& _group;
328 clock_type::time_point _group_replenish;
329 fair_queue_ticket _resources_executing;
330 fair_queue_ticket _resources_queued;
331 priority_queue _handles;
332 std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
333 size_t _nr_classes = 0;
334 capacity_t _last_accumulated = 0;
335
336 // _pending represents a reservation of tokens from the bucket.
337 //
338 // In the "dispatch timeline" defined by the growing bucket head of the group,
339 // tokens in the range [_pending.head - cap, _pending.head) belong
340 // to this queue.
341 //
342 // For example, if:
343 // _group._token_bucket.head == 300
344 // _pending.head == 700
345 // _pending.cap == 500
346 // then the reservation is [200, 700), 100 tokens are ready to be dispatched by this queue,
347 * and another 400 tokens are going to appear soon. (And after that, this queue
348 // will be able to make its next reservation).
// A reservation of `cap` tokens ending at absolute bucket position `head`;
// the tokens [head - cap, head) belong to this queue (see the commentary above).
349 struct pending {
350 capacity_t head = 0;
351 capacity_t cap = 0;
352 };
353 pending _pending;
354
355 // Total capacity of all requests waiting in the queue.
356 capacity_t _queued_capacity = 0;
357
358 void push_priority_class(priority_class_data& pc) noexcept;
359 void push_priority_class_from_idle(priority_class_data& pc) noexcept;
360 void pop_priority_class(priority_class_data& pc) noexcept;
361 void plug_priority_class(priority_class_data& pc) noexcept;
362 void unplug_priority_class(priority_class_data& pc) noexcept;
363
364 // Replaces _pending with a new reservation starting at the current
365 // group bucket tail.
366 void grab_capacity(capacity_t cap) noexcept;
367 // Shaves off the fulfilled frontal part from `_pending` (if any),
368 // and returns the fulfilled tokens in `ready_tokens`.
369 // Sets `our_turn_has_come` to the truth value of "`_pending` is empty or
370 // there are no unfulfilled reservations (from other shards) earlier than `_pending`".
371 //
372 // Assumes that `_group.maybe_replenish_capacity()` was called recently.
373 struct reap_result {
// Tokens from `_pending` that the group bucket has already fulfilled.
374 capacity_t ready_tokens;
// True iff `_pending` is empty or no unfulfilled reservations
// (from other shards) precede it.
375 bool our_turn_has_come;
376 };
377 reap_result reap_pending_capacity() noexcept;
378public:
382 explicit fair_queue(fair_group& shared, config cfg);
383 fair_queue(fair_queue&&) = delete;
384 ~fair_queue();
385
386 sstring label() const noexcept { return _config.label; }
387
391 void register_priority_class(class_id c, uint32_t shares);
392
397
398 void update_shares_for_class(class_id c, uint32_t new_shares);
399
402
405
406 capacity_t tokens_capacity(double tokens) const noexcept {
407 return _group.tokens_capacity(tokens);
408 }
409
410 capacity_t maximum_capacity() const noexcept {
411 return _group.maximum_capacity();
412 }
413
418 void queue(class_id c, fair_queue_entry& ent) noexcept;
419
420 void plug_class(class_id c) noexcept;
421 void unplug_class(class_id c) noexcept;
422
425 void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept;
426 void notify_request_cancelled(fair_queue_entry& ent) noexcept;
427
429 void dispatch_requests(std::function<void(fair_queue_entry&)> cb);
430
431 clock_type::time_point next_pending_aio() const noexcept;
432
433 std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c);
434};
436
437}
438
439#if FMT_VERSION >= 90000
440template <> struct fmt::formatter<seastar::fair_queue_ticket> : fmt::ostream_formatter {};
441#endif
Group of queues class.
Definition: fair_queue.hh:139
Definition: fair_queue.hh:237
Definition: fair_queue.hh:106
describes a request that passes through the fair_queue.
Definition: fair_queue.hh:50
fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
bool operator==(const fair_queue_ticket &desc) const noexcept
fair_queue_ticket & operator-=(fair_queue_ticket desc) noexcept
float normalize(fair_queue_ticket axis) const noexcept
fair_queue_ticket & operator+=(fair_queue_ticket desc) noexcept
Fair queuing class.
Definition: fair_queue.hh:291
void queue(class_id c, fair_queue_entry &ent) noexcept
void unregister_priority_class(class_id c)
fair_queue_ticket resources_currently_executing() const
void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept
void dispatch_requests(std::function< void(fair_queue_entry &)> cb)
Try to execute new requests if there is capacity left in the queue.
fair_queue_ticket resources_currently_waiting() const
void register_priority_class(class_id c, uint32_t shares)
Fair Queue configuration structure.
Definition: fair_queue.hh:297
future now()
Returns a ready future.
Definition: later.hh:35
holds the metric_groups definition needed by class that reports metrics
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.