Seastar
High performance C++ framework for concurrent servers
fair_queue.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2016 ScyllaDB
21 */
22#pragma once
23
24#include <boost/intrusive/slist.hpp>
25#include <seastar/core/sstring.hh>
26#include <seastar/core/shared_ptr.hh>
27#include <seastar/core/circular_buffer.hh>
29#include <seastar/util/shared_token_bucket.hh>
30
31#include <chrono>
32#include <cstdint>
33#include <functional>
34#include <optional>
35#include <queue>
36#include <fmt/ostream.h>
37
38namespace bi = boost::intrusive;
39
40namespace seastar {
41
50 uint32_t _weight = 0;
51 uint32_t _size = 0;
52public:
57 fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
58 fair_queue_ticket() noexcept {}
59 fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
60 fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
69 bool operator==(const fair_queue_ticket& desc) const noexcept;
70
75 explicit operator bool() const noexcept;
76 bool is_non_zero() const noexcept;
77
78 friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);
79
93 float normalize(fair_queue_ticket axis) const noexcept;
94
95 /*
96 * For both dimensions, checks if the first rover is ahead of the
97 * second and returns the difference. If behind returns zero.
98 */
99 friend fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept;
100};
101
104
106public:
107 // The capacity_t represents tokens each entry needs to get dispatched, in
108 // a 'normalized' form -- converted from floating-point to fixed-point number
109 // and scaled according to fair-group's token-bucket duration
110 using capacity_t = uint64_t;
111 friend class fair_queue;
112
113private:
114 capacity_t _capacity;
115 bi::slist_member_hook<> _hook;
116
117public:
118 fair_queue_entry(capacity_t c) noexcept
119 : _capacity(c) {}
120 using container_list_t = bi::slist<fair_queue_entry,
121 bi::constant_time_size<false>,
122 bi::cache_last<true>,
123 bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
124
125 capacity_t capacity() const noexcept { return _capacity; }
126};
127
139public:
140 using capacity_t = fair_queue_entry::capacity_t;
141 using clock_type = std::chrono::steady_clock;
142
143 /*
144 * tldr; The math
145 *
146 * Bw, Br -- write/read bandwidth (bytes per second)
147 * Ow, Or -- write/read iops (ops per second)
148 *
149 * xx_max -- their maximum values (configured)
150 *
151 * Throttling formula:
152 *
153 * Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
154 *
155 * where K is the scalar value <= 1.0 (also configured)
156 *
157 * Bandwidth is bytes time derivative, iops is ops time derivative, i.e.
158 * Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into
159 *
160 * d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K
161 *
162 * Fair queue tickets are {w, s} weight-size pairs that are
163 *
164 * s = read_base_count * br, for reads
165 * Br_max/Bw_max * read_base_count * bw, for writes
166 *
167 * w = read_base_count, for reads
168 * Or_max/Ow_max * read_base_count, for writes
169 *
170 * Thus the formula turns into
171 *
172 * d(sum(w/W + s/S))/dt <= K
173 *
174 * where {w, s} is the ticket value of a request and sum summarizes the
175 * ticket values from all the requests seen so far, {W, S} is the ticket
176 * value that corresponds to a virtual summary of Or_max requests of
177 * Br_max size total.
178 */
179
180 /*
181 * The normalization results in a float of the 2^-30 seconds order of
182 * magnitude. Not to invent floating-point atomic arithmetic, the result
183 * is converted to an integer by multiplying by a factor that's large
184 * enough to turn these values into a non-zero integer.
185 *
186 * Also, the rates in bytes/sec when adjusted by io-queue according to
187 * multipliers become too large to be stored in 32-bit ticket value.
188 * Thus the rate resolution is applied. The t.bucket is configured with a
189 * time period for which the speeds from F (in above formula) are taken.
190 */
191
192 static constexpr float fixed_point_factor = float(1 << 24);
193 using rate_resolution = std::milli;
194 using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;
195
196private:
197
198 /*
199 * The dF/dt <= K limitation is managed by the modified token bucket
200 * algo where tokens are ticket.normalize(cost_capacity), the refill
201 * rate is K.
202 *
203 * The token bucket algo must have the limit on the number of tokens
204 * accumulated. Here it's configured so that it accumulates for the
205 * latency_goal duration.
206 *
207 * The replenish threshold is the minimal number of tokens to put back.
208 * It's reserved for future use to reduce the load on the replenish
209 * timestamp.
210 *
211 * The timestamp, in turn, is the time when the bucket was replenished
212 * last. Every time a shard tries to get tokens from bucket it first
213 * tries to convert the time that had passed since this timestamp
214 * into more tokens in the bucket.
215 */
216
217 token_bucket_t _token_bucket;
218 const capacity_t _per_tick_threshold;
219
220public:
221
222 // Convert internal capacity value back into the real token
223 static double capacity_tokens(capacity_t cap) noexcept {
224 return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
225 }
226
227 // Convert floating-point tokens into the token bucket capacity
228 static capacity_t tokens_capacity(double tokens) noexcept {
229 return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
230 }
231
232 auto capacity_duration(capacity_t cap) const noexcept {
233 return _token_bucket.duration_for(cap);
234 }
235
236 struct config {
237 sstring label = "";
238 /*
239 * There are two "min" values that can be configured. The former one
240 * is the minimal weight:size pair that the upper layer is going to
241 * submit. However, it can submit _larger_ values, and the fair queue
242 * must accept those as large as the latter pair (but it can accept
243 * even larger values, of course)
244 */
245 double min_tokens = 0.0;
246 double limit_min_tokens = 0.0;
247 std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
248 };
249
250 explicit fair_group(config cfg, unsigned nr_queues);
251 fair_group(fair_group&&) = delete;
252
253 capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
254 capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
255 capacity_t grab_capacity(capacity_t cap) noexcept;
256 clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
257 void replenish_capacity(clock_type::time_point now) noexcept;
258 void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;
259
260 capacity_t capacity_deficiency(capacity_t from) const noexcept;
261
262 std::chrono::duration<double> rate_limit_duration() const noexcept {
263 std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
264 return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
265 }
266
267 const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }
268};
269
290public:
295 struct config {
296 sstring label = "";
297 std::chrono::microseconds tau = std::chrono::milliseconds(5);
298 };
299
300 using class_id = unsigned int;
301 class priority_class_data;
302 using capacity_t = fair_group::capacity_t;
303 using signed_capacity_t = std::make_signed_t<capacity_t>;
304
305private:
306 using clock_type = std::chrono::steady_clock;
307 using priority_class_ptr = priority_class_data*;
308 struct class_compare {
309 bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept;
310 };
311
312 class priority_queue : public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
313 using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
314 public:
315 void reserve(size_t len) {
316 c.reserve(len);
317 }
318
319 void assert_enough_capacity() const noexcept {
320 assert(c.size() < c.capacity());
321 }
322 };
323
324 config _config;
325 fair_group& _group;
326 clock_type::time_point _group_replenish;
327 fair_queue_ticket _resources_executing;
328 fair_queue_ticket _resources_queued;
329 priority_queue _handles;
330 std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
331 size_t _nr_classes = 0;
332 capacity_t _last_accumulated = 0;
333
334 /*
335 * When the shared capacity os over the local queue delays
336 * further dispatching untill better times
337 *
338 * \head -- the value group head rover is expected to cross
339 * \cap -- the capacity that's accounted on the group
340 *
341 * The last field is needed to "rearm" the wait in case
342 * queue decides that it wants to dispatch another capacity
343 * in the middle of the waiting
344 */
345 struct pending {
346 capacity_t head;
347 capacity_t cap;
348
349 pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
350 };
351
352 std::optional<pending> _pending;
353
354 void push_priority_class(priority_class_data& pc) noexcept;
355 void push_priority_class_from_idle(priority_class_data& pc) noexcept;
356 void pop_priority_class(priority_class_data& pc) noexcept;
357 void plug_priority_class(priority_class_data& pc) noexcept;
358 void unplug_priority_class(priority_class_data& pc) noexcept;
359
360 enum class grab_result { grabbed, cant_preempt, pending };
361 grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
362 grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
363public:
367 explicit fair_queue(fair_group& shared, config cfg);
368 fair_queue(fair_queue&&) = delete;
369 ~fair_queue();
370
371 sstring label() const noexcept { return _config.label; }
372
376 void register_priority_class(class_id c, uint32_t shares);
377
382
383 void update_shares_for_class(class_id c, uint32_t new_shares);
384
387
390
391 capacity_t tokens_capacity(double tokens) const noexcept {
392 return _group.tokens_capacity(tokens);
393 }
394
395 capacity_t maximum_capacity() const noexcept {
396 return _group.maximum_capacity();
397 }
398
403 void queue(class_id c, fair_queue_entry& ent) noexcept;
404
405 void plug_class(class_id c) noexcept;
406 void unplug_class(class_id c) noexcept;
407
410 void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept;
411 void notify_request_cancelled(fair_queue_entry& ent) noexcept;
412
414 void dispatch_requests(std::function<void(fair_queue_entry&)> cb);
415
416 clock_type::time_point next_pending_aio() const noexcept;
417
418 std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c);
419};
421
422}
423
424#if FMT_VERSION >= 90000
425template <> struct fmt::formatter<seastar::fair_queue_ticket> : fmt::ostream_formatter {};
426#endif
Group of queues class.
Definition: fair_queue.hh:138
Definition: fair_queue.hh:236
Definition: fair_queue.hh:105
describes a request that passes through the fair_queue.
Definition: fair_queue.hh:49
fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
bool operator==(const fair_queue_ticket &desc) const noexcept
fair_queue_ticket & operator-=(fair_queue_ticket desc) noexcept
float normalize(fair_queue_ticket axis) const noexcept
fair_queue_ticket & operator+=(fair_queue_ticket desc) noexcept
Fair queuing class.
Definition: fair_queue.hh:289
void queue(class_id c, fair_queue_entry &ent) noexcept
void unregister_priority_class(class_id c)
fair_queue_ticket resources_currently_executing() const
fair_queue(fair_group &shared, config cfg)
void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept
void dispatch_requests(std::function< void(fair_queue_entry &)> cb)
Try to execute new requests if there is capacity left in the queue.
fair_queue_ticket resources_currently_waiting() const
void register_priority_class(class_id c, uint32_t shares)
Fair Queue configuration structure.
Definition: fair_queue.hh:295
future now()
Returns a ready future.
Definition: later.hh:35
holds the metric_groups definition needed by class that reports metrics
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.