Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
shared_token_bucket.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2022 ScyllaDB
21 */
22
23#pragma once
24
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <concepts>
#include <cstdint>
#include <limits>
#include <type_traits>
30
31namespace seastar {
32namespace internal {
33
/*
 * Distance from b forward to a on a wrapping 64-bit counter.
 *
 * The unsigned subtraction wraps modulo 2^64. Reading the result as a
 * signed value tells whether a is ahead of b (positive) or not (zero or
 * negative), as long as the two counters are less than 2^63 apart.
 * A "not ahead" result is reported as 0.
 */
inline uint64_t wrapping_difference(const uint64_t& a, const uint64_t& b) noexcept {
    const int64_t signed_gap = static_cast<int64_t>(a - b);
    if (signed_gap <= 0) {
        return 0;
    }
    return static_cast<uint64_t>(signed_gap);
}
37
// Atomically adds `b` to `a` (wrapping modulo 2^64) and returns the value
// `a` held immediately before the addition. Uses sequentially consistent
// ordering, which is the std::atomic default.
inline uint64_t fetch_add(std::atomic<uint64_t>& a, uint64_t b) noexcept {
    const uint64_t previous = a.fetch_add(b, std::memory_order_seq_cst);
    return previous;
}
41
42template <typename T>
43concept supports_wrapping_arithmetics = requires (T a, std::atomic<T> atomic_a, T b) {
44 { fetch_add(atomic_a, b) } noexcept -> std::same_as<T>;
45 { wrapping_difference(a, b) } noexcept -> std::same_as<T>;
46 { a + b } noexcept -> std::same_as<T>;
47};
48
// Selects the rovers flavor used by shared_token_bucket.
//   yes - a third rover (ceil) caps replenishment; tokens must be handed
//         back through release() before they can be replenished again.
//   no  - only tail and head exist; release() is unavailable.
enum class capped_release {
    yes,
    no,
};
50
51template <typename T, capped_release Capped>
52struct rovers;
53
54template <typename T>
55struct rovers<T, capped_release::yes> {
56 using atomic_rover = std::atomic<T>;
57
58 atomic_rover tail;
59 atomic_rover head;
60 atomic_rover ceil;
61
62 rovers(T limit) noexcept : tail(0), head(0), ceil(limit) {}
63
64 T max_extra(T) const noexcept {
65 return wrapping_difference(ceil.load(std::memory_order_relaxed), head.load(std::memory_order_relaxed));
66 }
67
68 void release(T tokens) {
69 fetch_add(ceil, tokens);
70 }
71};
72
73template <typename T>
74struct rovers<T, capped_release::no> {
75 using atomic_rover = std::atomic<T>;
76
77 atomic_rover tail;
78 atomic_rover head;
79
80 rovers(T) noexcept : tail(0), head(0) {}
81
82 T max_extra(T limit) const noexcept {
83 return wrapping_difference(tail.load(std::memory_order_relaxed) + limit, head.load(std::memory_order_relaxed));
84 }
85
86 void release(T) = delete;
87};
88
89template <typename T, typename Period, capped_release Capped, typename Clock = std::chrono::steady_clock>
90requires std::is_nothrow_copy_constructible_v<T> && supports_wrapping_arithmetics<T>
91class shared_token_bucket {
92 using rate_resolution = std::chrono::duration<double, Period>;
93
94 T _replenish_rate;
95 const T _replenish_limit;
96 const T _replenish_threshold;
97 std::atomic<typename Clock::time_point> _replenished;
98
99 /*
100 * The token bucket is implemented as a pair of wrapping monotonic
101 * counters (called rovers) one chasing the other. Getting a token
102 * from the bucket is increasing the tail, replenishing a token back
103 * is increasing the head. If increased tail overruns the head then
104 * the bucket is empty and we have to wait. The shard that grabs tail
105 * earlier will be "woken up" earlier, so they form a queue.
106 *
107 * The top rover is needed to implement two buckets actually. The
108 * tokens are not just replenished by timer. They are replenished by
109 * timer from the second bucket. And the second bucket only get a
110 * token in it after the request that grabbed it from the first bucket
111 * completes and returns it back.
112 */
113
114 using rovers_t = rovers<T, Capped>;
115 static_assert(rovers_t::atomic_rover::is_always_lock_free);
116 rovers_t _rovers;
117
118 T tail() const noexcept { return _rovers.tail.load(std::memory_order_relaxed); }
119 T head() const noexcept { return _rovers.head.load(std::memory_order_relaxed); }
120
121 /*
122 * Need to make sure that the multiplication in accumulated_in() doesn't
123 * overflow. Not to introduce an extra branch there, define that the
124 * replenish period is not larger than this delta and limit the rate with
125 * the value that can overflow it.
126 *
127 * The additional /=2 in max_rate math is to make extra sure that the
128 * overflow doesn't break wrapping_difference sign tricks.
129 */
130 static constexpr rate_resolution max_delta = std::chrono::duration_cast<rate_resolution>(std::chrono::hours(1));
131public:
132 static constexpr T max_rate = std::numeric_limits<T>::max() / 2 / max_delta.count();
133 static constexpr capped_release is_capped = Capped;
134
135private:
136 static constexpr T accumulated(T rate, rate_resolution delta) noexcept {
137 return std::round(rate * delta.count());
138 }
139#ifndef __clang__
140 // std::round() is constexpr only since C++23 (but g++ doesn't care)
141 static_assert(accumulated(max_rate, max_delta) <= std::numeric_limits<T>::max());
142#endif
143
144public:
145 shared_token_bucket(T rate, T limit, T threshold, bool add_replenish_iffset = true) noexcept
146 : _replenish_rate(std::min(rate, max_rate))
147 , _replenish_limit(limit)
148 , _replenish_threshold(std::clamp(threshold, (T)1, limit))
149 // pretend it was replenished yesterday to spot overflows early
150 , _replenished(Clock::now() - std::chrono::hours(add_replenish_iffset ? 24 : 0))
151 , _rovers(_replenish_limit)
152 {}
153
154 T grab(T tokens) noexcept {
155 return fetch_add(_rovers.tail, tokens) + tokens;
156 }
157
158 void release(T tokens) noexcept {
159 _rovers.release(tokens);
160 }
161
162 void replenish(typename Clock::time_point now) noexcept {
163 auto ts = _replenished.load(std::memory_order_relaxed);
164
165 if (now <= ts) {
166 return;
167 }
168
169 auto delta = now - ts;
170 auto extra = accumulated_in(delta);
171
172 if (extra >= _replenish_threshold) {
173 if (!_replenished.compare_exchange_weak(ts, ts + delta)) {
174 return; // next time or another shard
175 }
176
177 fetch_add(_rovers.head, std::min(extra, _rovers.max_extra(_replenish_limit)));
178 }
179 }
180
181 T deficiency(T from) const noexcept {
182 return wrapping_difference(from, head());
183 }
184
185 template <typename Rep, typename Per>
186 static auto rate_cast(const std::chrono::duration<Rep, Per> delta) noexcept {
187 return std::chrono::duration_cast<rate_resolution>(delta);
188 }
189
190 // the number of tokens accumulated for the given time frame
191 template <typename Rep, typename Per>
192 T accumulated_in(const std::chrono::duration<Rep, Per> delta) const noexcept {
193 auto delta_at_rate = std::min(rate_cast(delta), max_delta);
194 return accumulated(_replenish_rate, delta_at_rate);
195 }
196
197 // Estimated time to process the given amount of tokens
198 // (peer of accumulated_in helper)
199 rate_resolution duration_for(T tokens) const noexcept {
200 return rate_resolution(double(tokens) / _replenish_rate);
201 }
202
203 T rate() const noexcept { return _replenish_rate; }
204 T limit() const noexcept { return _replenish_limit; }
205 T threshold() const noexcept { return _replenish_threshold; }
206 typename Clock::time_point replenished_ts() const noexcept { return _replenished; }
207
208 void update_rate(T rate) noexcept {
209 _replenish_rate = std::min(rate, max_rate);
210 }
211};
212
213} // internal namespace
214} // seastar namespace
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.