fair_queue.hh
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2016 ScyllaDB
 */
#pragma once

#include <boost/intrusive/slist.hpp>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <queue>
#include <fmt/ostream.h>

namespace bi = boost::intrusive;

namespace seastar {

/// \brief describes a request that passes through the fair_queue.
class fair_queue_ticket {
    uint32_t _weight = 0;
    uint32_t _size = 0;
public:
    fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
    fair_queue_ticket() noexcept {}
    fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
    fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
    fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;
    fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;
    bool operator==(const fair_queue_ticket& desc) const noexcept;

    explicit operator bool() const noexcept;
    bool is_non_zero() const noexcept;

    friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);

    float normalize(fair_queue_ticket axis) const noexcept;

    /*
     * For both dimensions, checks whether the first rover is ahead of the
     * second and returns the difference. If it is behind, returns zero.
     */
    friend fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept;
};

class fair_queue_entry {
public:
    // The capacity_t represents the tokens each entry needs to get dispatched, in
    // a 'normalized' form -- converted from floating-point to a fixed-point number
    // and scaled according to the fair-group's token-bucket duration
    using capacity_t = uint64_t;
    friend class fair_queue;

private:
    capacity_t _capacity;
    bi::slist_member_hook<> _hook;

public:
    explicit fair_queue_entry(capacity_t c) noexcept
        : _capacity(c) {}
    using container_list_t = bi::slist<fair_queue_entry,
            bi::constant_time_size<false>,
            bi::cache_last<true>,
            bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;

    capacity_t capacity() const noexcept { return _capacity; }
};

/// \brief Group of queues class.
class fair_group {
public:
    using capacity_t = fair_queue_entry::capacity_t;
    using clock_type = std::chrono::steady_clock;

    /*
     * tldr; The math
     *
     * Bw, Br -- write/read bandwidth (bytes per second)
     * Ow, Or -- write/read iops (ops per second)
     *
     * xx_max -- their maximum values (configured)
     *
     * Throttling formula:
     *
     *    Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
     *
     * where K is a scalar value <= 1.0 (also configured)
     *
     * Bandwidth is the time derivative of bytes, iops is the time derivative
     * of ops, i.e. Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into
     *
     *    d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K
     *
     * Fair queue tickets are {w, s} weight-size pairs that are
     *
     *    s = read_base_count * br, for reads
     *        Br_max/Bw_max * read_base_count * bw, for writes
     *
     *    w = read_base_count, for reads
     *        Or_max/Ow_max * read_base_count, for writes
     *
     * Thus the formula turns into
     *
     *    d(sum(w/W + s/S))/dt <= K
     *
     * where {w, s} is the ticket value of a request and the sum accumulates the
     * ticket values of all the requests seen so far, and {W, S} is the ticket
     * value that corresponds to a virtual summary of Or_max requests of
     * Br_max size total.
     */
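
    /*
     * A worked illustration of the ticket values above (hypothetical numbers,
     * not taken from any real configuration): assume read_base_count = 128,
     * Br_max = 2 GB/s, Bw_max = 1 GB/s, Or_max = 600k ops/s, Ow_max = 300k ops/s.
     * Then an 8192-byte write request would get roughly
     *
     *    w = Or_max/Ow_max * read_base_count      = 2 * 128        = 256
     *    s = Br_max/Bw_max * read_base_count * bw = 2 * 128 * 8192 = 2097152
     *
     * while an 8192-byte read would get w = 128 and s = 128 * 8192 = 1048576,
     * i.e. a write is accounted as twice as expensive on both axes because
     * the configured write limits are half of the read ones.
     */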

    /*
     * The normalization results in a float of the 2^-30 seconds order of
     * magnitude. To avoid inventing floating-point atomic arithmetic, the
     * result is converted to an integer by multiplying by a factor that's
     * large enough to turn these values into a non-zero integer.
     *
     * Also, the rates in bytes/sec, when adjusted by the io-queue according
     * to the multipliers, become too large to be stored in a 32-bit ticket
     * value. Thus the rate resolution is applied. The token bucket is
     * configured with a time period over which the rates from F (the sum in
     * the formula above) are taken.
     */

    static constexpr float fixed_point_factor = float(1 << 24);
    using rate_resolution = std::milli;
    using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;

private:

    /*
     * The dF/dt <= K limitation is managed by a modified token bucket
     * algorithm where tokens are ticket.normalize(cost_capacity) and the
     * refill rate is K.
     *
     * The token bucket algorithm must have a limit on the number of tokens
     * accumulated. Here it's configured so that it accumulates for the
     * latency_goal duration.
     *
     * The replenish threshold is the minimal number of tokens to put back.
     * It's reserved for future use to reduce the load on the replenish
     * timestamp.
     *
     * The timestamp, in turn, is the time when the bucket was last
     * replenished. Every time a shard tries to get tokens from the bucket it
     * first tries to convert the time that has passed since this timestamp
     * into more tokens in the bucket.
     */
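
    /*
     * A minimal sketch of the replenish step described above (illustration
     * only, not the actual shared_token_bucket implementation; `rate`,
     * `limit`, `tokens` and `replenished_ts` are stand-in names):
     *
     *    void replenish(clock_type::time_point now) {
     *        auto delta = now - replenished_ts;       // time since last refill
     *        auto extra = rate * delta;               // tokens earned meanwhile
     *        tokens = std::min(limit, tokens + extra);
     *        replenished_ts = now;
     *    }
     */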

    token_bucket_t _token_bucket;
    const capacity_t _per_tick_threshold;

public:

    // Convert an internal capacity value back into real tokens
    static double capacity_tokens(capacity_t cap) noexcept {
        return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
    }

    // Convert floating-point tokens into token bucket capacity
    static capacity_t tokens_capacity(double tokens) noexcept {
        return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
    }

    auto capacity_duration(capacity_t cap) const noexcept {
        return _token_bucket.duration_for(cap);
    }
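
    /*
     * The two conversions above are inverses of each other up to fixed-point
     * rounding. For illustration (hypothetical values):
     *
     *    auto cap   = fair_group::tokens_capacity(1.5);  // tokens -> fixed-point capacity
     *    double tok = fair_group::capacity_tokens(cap);  // ~1.5 again, modulo rounding
     */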

    struct config {
        sstring label = "";
        /*
         * There are two "min" values that can be configured. The former is
         * the minimal weight:size pair that the upper layer is going to
         * submit. However, it can submit _larger_ values, and the fair queue
         * must accept those as large as the latter pair (but it can accept
         * even larger values, of course)
         */
        double min_tokens = 0.0;
        double limit_min_tokens = 0.0;
        std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
    };

    explicit fair_group(config cfg, unsigned nr_queues);
    fair_group(fair_group&&) = delete;

    capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
    capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
    capacity_t grab_capacity(capacity_t cap) noexcept;
    clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
    void refund_tokens(capacity_t) noexcept;
    void replenish_capacity(clock_type::time_point now) noexcept;
    void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

    capacity_t capacity_deficiency(capacity_t from) const noexcept;

    std::chrono::duration<double> rate_limit_duration() const noexcept {
        std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
        return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
    }

    const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }
};

/// \brief Fair queuing class.
class fair_queue {
public:
    /// \brief Fair Queue configuration structure.
    struct config {
        sstring label = "";
        std::chrono::microseconds tau = std::chrono::milliseconds(5);
    };

    using class_id = unsigned int;
    class priority_class_data;
    using capacity_t = fair_group::capacity_t;
    using signed_capacity_t = std::make_signed_t<capacity_t>;

private:
    using clock_type = std::chrono::steady_clock;
    using priority_class_ptr = priority_class_data*;
    struct class_compare {
        bool operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const noexcept;
    };

    class priority_queue : public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
        using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
    public:
        void reserve(size_t len) {
            c.reserve(len);
        }

        void assert_enough_capacity() const noexcept {
            assert(c.size() < c.capacity());
        }
    };

    config _config;
    fair_group& _group;
    clock_type::time_point _group_replenish;
    fair_queue_ticket _resources_executing;
    fair_queue_ticket _resources_queued;
    priority_queue _handles;
    std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
    size_t _nr_classes = 0;
    capacity_t _last_accumulated = 0;

    // _pending represents a reservation of tokens from the bucket.
    //
    // In the "dispatch timeline" defined by the growing bucket head of the group,
    // tokens in the range [_pending.head - cap, _pending.head) belong
    // to this queue.
    //
    // For example, if:
    //    _group._token_bucket.head == 300
    //    _pending.head == 700
    //    _pending.cap == 500
    // then the reservation is [200, 700), 100 tokens are ready to be dispatched by this queue,
    // and another 400 tokens are going to appear soon. (And after that, this queue
    // will be able to make its next reservation).
    struct pending {
        capacity_t head = 0;
        capacity_t cap = 0;
    };
    pending _pending;

    // Total capacity of all requests waiting in the queue.
    capacity_t _queued_capacity = 0;

    void push_priority_class(priority_class_data& pc) noexcept;
    void push_priority_class_from_idle(priority_class_data& pc) noexcept;
    void pop_priority_class(priority_class_data& pc) noexcept;
    void plug_priority_class(priority_class_data& pc) noexcept;
    void unplug_priority_class(priority_class_data& pc) noexcept;

    // Replaces _pending with a new reservation starting at the current
    // group bucket tail.
    void grab_capacity(capacity_t cap) noexcept;
    // Shaves off the fulfilled frontal part from `_pending` (if any),
    // and returns the fulfilled tokens in `ready_tokens`.
    // Sets `our_turn_has_come` to the truth value of "`_pending` is empty or
    // there are no unfulfilled reservations (from other shards) earlier than `_pending`".
    //
    // Assumes that `_group.maybe_replenish_capacity()` was called recently.
    struct reap_result {
        capacity_t ready_tokens;
        bool our_turn_has_come;
    };
    reap_result reap_pending_capacity() noexcept;
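
    // A possible reading of the semantics above, as a sketch (illustration
    // only, not the actual implementation): with `head` being the group
    // bucket head,
    //
    //    reap_result r = { 0, false };
    //    auto tail = _pending.head - _pending.cap;     // start of our reservation
    //    r.our_turn_has_come = (_pending.cap == 0) || (head >= tail);
    //    if (_pending.cap != 0 && head > tail) {
    //        r.ready_tokens = std::min(head - tail, _pending.cap);
    //        _pending.cap -= r.ready_tokens;           // shave off the fulfilled part
    //    }
    //
    // With the numbers from the `_pending` example above (head 300,
    // reservation [200, 700)) this yields 100 ready tokens.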
public:
    explicit fair_queue(fair_group& shared, config cfg);
    fair_queue(fair_queue&&) = delete;
    ~fair_queue();

    sstring label() const noexcept { return _config.label; }

    void register_priority_class(class_id c, uint32_t shares);
    void unregister_priority_class(class_id c);

    void update_shares_for_class(class_id c, uint32_t new_shares);

    fair_queue_ticket resources_currently_waiting() const;
    fair_queue_ticket resources_currently_executing() const;

    capacity_t tokens_capacity(double tokens) const noexcept {
        return _group.tokens_capacity(tokens);
    }

    capacity_t maximum_capacity() const noexcept {
        return _group.maximum_capacity();
    }

    void queue(class_id c, fair_queue_entry& ent) noexcept;

    void plug_class(class_id c) noexcept;
    void unplug_class(class_id c) noexcept;

    void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept;
    void notify_request_cancelled(fair_queue_entry& ent) noexcept;

    /// Try to execute new requests if there is capacity left in the queue.
    void dispatch_requests(std::function<void(fair_queue_entry&)> cb);

    clock_type::time_point next_pending_aio() const noexcept;

    std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c);
};
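
/*
 * A rough usage sketch of the API above (illustration only; in Seastar the
 * real caller is the io_queue, and entry lifetime management is more involved):
 *
 *    fair_group::config gcfg;
 *    fair_group group(gcfg, smp::count);
 *    fair_queue fq(group, fair_queue::config{});
 *
 *    fq.register_priority_class(0, 100);              // class 0, 100 shares
 *
 *    fair_queue_entry ent(fq.tokens_capacity(1.0));   // cost of this request in tokens
 *    fq.queue(0, ent);
 *
 *    fq.dispatch_requests([&] (fair_queue_entry& e) {
 *        // start the I/O described by `e`; when it completes, call
 *        // fq.notify_request_finished(e.capacity());
 *    });
 */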

}

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<seastar::fair_queue_ticket> : fmt::ostream_formatter {};
#endif