Seastar
High performance C++ framework for concurrent servers
fair_queue.hh
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2016 ScyllaDB
 */
#pragma once

#include <boost/intrusive/slist.hpp>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/util/shared_token_bucket.hh>

#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <queue>
#include <unordered_set>

namespace bi = boost::intrusive;

namespace seastar {

/// \brief describes a request that passes through the fair_queue.
class fair_queue_ticket {
    uint32_t _weight = 0;
    uint32_t _size = 0;
public:
    fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
    fair_queue_ticket() noexcept {}
    fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
    fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
    fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;
    fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;
    bool operator==(const fair_queue_ticket& desc) const noexcept;

    explicit operator bool() const noexcept;
    bool is_non_zero() const noexcept;

    friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);

    float normalize(fair_queue_ticket axis) const noexcept;

    /*
     * For both dimensions checks if the first rover is ahead of the
     * second and returns the difference. If behind, returns zero.
     */
    friend fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept;
};


class fair_queue_entry {
public:
    // The capacity_t represents tokens each entry needs to get dispatched, in
    // a 'normalized' form -- converted from floating-point to fixed-point number
    // and scaled according to fair-group's token-bucket duration
    using capacity_t = uint64_t;
    friend class fair_queue;

private:
    capacity_t _capacity;
    bi::slist_member_hook<> _hook;

public:
    fair_queue_entry(capacity_t c) noexcept
        : _capacity(c) {}
    using container_list_t = bi::slist<fair_queue_entry,
            bi::constant_time_size<false>,
            bi::cache_last<true>,
            bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;

    capacity_t capacity() const noexcept { return _capacity; }
};
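
// Entries are linked intrusively (via _hook) into their priority class's list,
// so the fair queue does not allocate per-request nodes; the caller must keep
// each entry alive until it has been dispatched or cancelled.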

/// \brief Group of queues class.
class fair_group {
public:
    using capacity_t = fair_queue_entry::capacity_t;
    using clock_type = std::chrono::steady_clock;

    /*
     * tldr; The math
     *
     * Bw, Br -- write/read bandwidth (bytes per second)
     * Ow, Or -- write/read iops (ops per second)
     *
     * xx_max -- their maximum values (configured)
     *
     * Throttling formula:
     *
     *    Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
     *
     * where K is a scalar value <= 1.0 (also configured)
     *
     * Bandwidth is the time derivative of bytes, iops is the time derivative
     * of ops, i.e. Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into
     *
     *    d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K
     *
     * Fair queue tickets are {w, s} weight-size pairs that are
     *
     *    s = read_base_count * br, for reads
     *        Br_max/Bw_max * read_base_count * bw, for writes
     *
     *    w = read_base_count, for reads
     *        Or_max/Ow_max * read_base_count, for writes
     *
     * Thus the formula turns into
     *
     *    d(sum(w/W + s/S))/dt <= K
     *
     * where {w, s} is the ticket value of a request and sum accumulates the
     * ticket values of all the requests seen so far, and {W, S} is the ticket
     * value that corresponds to a virtual summary of Or_max requests of
     * Br_max size total.
     */
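
    /*
     * Illustrative example (hypothetical numbers, not Seastar defaults):
     * with Br_max = 2 GB/s, Bw_max = 1 GB/s, Or_max = 200k ops/s,
     * Ow_max = 100k ops/s and read_base_count = 128, a 64 KiB write gets
     *
     *    s = Br_max/Bw_max * 128 * 65536 = 2 * 128 * 65536
     *    w = Or_max/Ow_max * 128 = 2 * 128 = 256
     *
     * i.e. it costs twice the ticket of a 64 KiB read in both dimensions,
     * reflecting that this hypothetical device writes at half its read speed.
     */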

    /*
     * The normalization results in a float of the 2^-30 seconds order of
     * magnitude. To avoid inventing floating-point atomic arithmetic, the
     * result is converted to an integer by multiplying it by a factor that's
     * large enough to turn these values into a non-zero integer.
     *
     * Also, the rates in bytes/sec, when adjusted by the io-queue according
     * to the multipliers, become too large to be stored in a 32-bit ticket
     * value. Thus the rate resolution is applied. The token bucket is
     * configured with the time period over which the speeds from the formula
     * above are taken.
     */

    static constexpr float fixed_point_factor = float(1 << 24);
    using rate_resolution = std::milli;
    using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;

private:

    /*
     * The dF/dt <= K limitation is managed by a modified token bucket
     * algorithm where the tokens are ticket.normalize(cost_capacity) and
     * the refill rate is K.
     *
     * The token bucket algorithm must have a limit on the number of tokens
     * accumulated. Here it's configured so that it accumulates for the
     * latency_goal duration.
     *
     * The replenish threshold is the minimal number of tokens to put back.
     * It's reserved for future use to reduce the load on the replenish
     * timestamp.
     *
     * The timestamp, in turn, is the time when the bucket was last
     * replenished. Every time a shard tries to get tokens from the bucket
     * it first tries to convert the time that has passed since this
     * timestamp into more tokens in the bucket.
     */

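    /*
     * Illustration (assumed numbers): if the refill rate corresponds to 1000
     * tokens per millisecond and 3 ms have passed since replenished_ts, a
     * replenish first adds up to 3000 tokens (never exceeding the bucket's
     * limit), and only then does the shard try to take what it needs.
     */
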
    token_bucket_t _token_bucket;
    const capacity_t _per_tick_threshold;

public:

    // Convert internal capacity value back into the real token
    static double capacity_tokens(capacity_t cap) noexcept {
        return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
    }

    // Convert floating-point tokens into the token bucket capacity
    static capacity_t tokens_capacity(double tokens) noexcept {
        return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
    }
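
    // Note: tokens_capacity() and capacity_tokens() are mutually inverse
    // conversions, up to the integer rounding of capacity_t, so
    // capacity_tokens(tokens_capacity(x)) is approximately x.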

    auto capacity_duration(capacity_t cap) const noexcept {
        return _token_bucket.duration_for(cap);
    }

    struct config {
        sstring label = "";
        /*
         * There are two "min" values that can be configured. The former is
         * the minimal weight:size pair that the upper layer is going to
         * submit. However, it can submit _larger_ values, and the fair queue
         * must accept values at least as large as the latter pair (it can
         * accept even larger values, of course).
         */
        double min_tokens = 0.0;
        double limit_min_tokens = 0.0;
        float rate_factor = 1.0;
        std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
    };
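
    /*
     * For instance (hypothetical values): min_tokens = 1.0 together with
     * limit_min_tokens = 4.0 would mean the upper layer promises not to
     * submit requests cheaper than 1.0 tokens, while the group must still
     * be able to admit a single request costing up to 4.0 tokens.
     */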

    explicit fair_group(config cfg, unsigned nr_queues);
    fair_group(fair_group&&) = delete;

    capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
    capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
    capacity_t grab_capacity(capacity_t cap) noexcept;
    clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
    void replenish_capacity(clock_type::time_point now) noexcept;
    void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

    capacity_t capacity_deficiency(capacity_t from) const noexcept;

    std::chrono::duration<double> rate_limit_duration() const noexcept {
        std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
        return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
    }

    const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }
};

/// \brief Fair queuing class.
class fair_queue {
public:
    /// \brief Fair Queue configuration structure.
    struct config {
        sstring label = "";
        std::chrono::microseconds tau = std::chrono::milliseconds(5);
    };

    using class_id = unsigned int;
    class priority_class_data;
    using capacity_t = fair_group::capacity_t;
    using signed_capacity_t = std::make_signed_t<capacity_t>;

private:
    using clock_type = std::chrono::steady_clock;
    using priority_class_ptr = priority_class_data*;
    struct class_compare {
        bool operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const noexcept;
    };

    class priority_queue : public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
        using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
    public:
        void reserve(size_t len) {
            c.reserve(len);
        }

        void assert_enough_capacity() const noexcept {
            assert(c.size() < c.capacity());
        }
    };

    config _config;
    fair_group& _group;
    clock_type::time_point _group_replenish;
    fair_queue_ticket _resources_executing;
    fair_queue_ticket _resources_queued;
    priority_queue _handles;
    std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
    size_t _nr_classes = 0;
    capacity_t _last_accumulated = 0;

    /*
     * When the shared capacity is over, the local queue delays
     * further dispatching until better times.
     *
     * \head -- the value the group head rover is expected to cross
     * \cap -- the capacity that's accounted on the group
     *
     * The last field is needed to "rearm" the wait in case the
     * queue decides that it wants to dispatch another capacity
     * in the middle of the waiting.
     */
    struct pending {
        capacity_t head;
        capacity_t cap;

        pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
    };

    std::optional<pending> _pending;

    void push_priority_class(priority_class_data& pc) noexcept;
    void push_priority_class_from_idle(priority_class_data& pc) noexcept;
    void pop_priority_class(priority_class_data& pc) noexcept;
    void plug_priority_class(priority_class_data& pc) noexcept;
    void unplug_priority_class(priority_class_data& pc) noexcept;

    enum class grab_result { grabbed, cant_preempt, pending };
    grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
    grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
public:
    explicit fair_queue(fair_group& shared, config cfg);
    ~fair_queue();

    sstring label() const noexcept { return _config.label; }

    void register_priority_class(class_id c, uint32_t shares);

    void unregister_priority_class(class_id c);

    void update_shares_for_class(class_id c, uint32_t new_shares);

    fair_queue_ticket resources_currently_executing() const;
    fair_queue_ticket resources_currently_waiting() const;

    capacity_t tokens_capacity(double tokens) const noexcept {
        return _group.tokens_capacity(tokens);
    }

    capacity_t maximum_capacity() const noexcept {
        return _group.maximum_capacity();
    }

    void queue(class_id c, fair_queue_entry& ent) noexcept;

    void plug_class(class_id c) noexcept;
    void unplug_class(class_id c) noexcept;

    void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept;
    void notify_request_cancelled(fair_queue_entry& ent) noexcept;

    /// Try to execute new requests if there is capacity left in the queue.
    void dispatch_requests(std::function<void(fair_queue_entry&)> cb);

    clock_type::time_point next_pending_aio() const noexcept;

    std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c);
};

}

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<seastar::fair_queue_ticket> : fmt::ostream_formatter {};
#endif
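
For orientation, the following is a minimal sketch of how the declarations above fit together. It is illustrative only: the real user of this interface is Seastar's io_queue, a fair_group is normally shared between shards with one fair_queue per shard, and plugging, cancellation and metrics are left out. The function name and all concrete values are invented for the example.

#include <seastar/core/fair_queue.hh>

void fair_queue_example() {
    // One shared group (token bucket) and one local queue attached to it.
    seastar::fair_group group(seastar::fair_group::config{}, 1 /* nr_queues */);
    seastar::fair_queue fq(group, seastar::fair_queue::config{});

    // Each stream of requests gets a priority class with some share count.
    const seastar::fair_queue::class_id cid = 0;
    fq.register_priority_class(cid, 100 /* shares */);

    // Wrap a request cost into an entry; tokens_capacity() converts a
    // floating-point token count into the queue's fixed-point capacity units.
    seastar::fair_queue_entry entry(fq.tokens_capacity(1.5));
    fq.queue(cid, entry);

    // Typically driven by the reactor: dispatch whatever fits into the
    // group's current capacity and start the corresponding I/O.
    fq.dispatch_requests([] (seastar::fair_queue_entry& ent) {
        // start the I/O described by `ent` here
    });

    // When a request completes, its capacity is returned to the queue.
    fq.notify_request_finished(entry.capacity());

    fq.unregister_priority_class(cid);
}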