/// Distance from `b` forward to `a` on a wrapping 64-bit counter, clamped at zero.
///
/// The subtraction is done modulo 2^64 and the result is then read as a signed
/// 64-bit value: if `a` is ahead of `b` (by less than 2^63) that distance is
/// returned, otherwise the signed value is negative and 0 is returned.
///
/// @param a  position expected to be ahead
/// @param b  position expected to be behind
/// @return   `a - b` when `a` is ahead of `b`, else 0
inline uint64_t wrapping_difference(const uint64_t& a, const uint64_t& b) noexcept {
    // Explicit cast documents the intentional unsigned -> signed reinterpretation.
    return std::max<int64_t>(static_cast<int64_t>(a - b), 0);
}
/// Atomically adds `b` to `a` and returns the value `a` held before the addition.
///
/// Thin free-function wrapper over std::atomic::fetch_add, called with its
/// default memory ordering.
///
/// @param a  atomic counter to advance
/// @param b  amount to add (wraps modulo 2^64 on overflow)
/// @return   the previous value of `a`
inline uint64_t fetch_add(std::atomic<uint64_t>& a, uint64_t b) noexcept {
    return a.fetch_add(b);
}
// Concept satisfied by a type T for which each of the three expressions below
// is noexcept and yields exactly T:
//   - fetch_add() applied to a std::atomic<T> and a T,
//   - wrapping_difference() applied to two T values,
//   - a + b.
// NOTE(review): the `template <...>` header that introduces T is not visible
// in this listing.
43concept supports_wrapping_arithmetics =
requires (T a, std::atomic<T> atomic_a, T b) {
44 { fetch_add(atomic_a, b) }
noexcept -> std::same_as<T>;
45 { wrapping_difference(a, b) }
noexcept -> std::same_as<T>;
46 { a + b }
noexcept -> std::same_as<T>;
/// Selects which `rovers` specialization a bucket is built on:
/// `yes` picks the variant that keeps a `ceil` rover and supports release(),
/// `no` picks the variant whose release() is deleted.
enum class capped_release { yes, no };
// Template parameter list: a value type T and a capped_release mode.
// NOTE(review): the declaration this header introduces is not visible in this
// listing; presumably it is the primary `rovers` template that the two
// specializations below refine. Confirm against the full file.
51template <
typename T, capped_release Capped>
// `rovers` specialization for capped_release::yes. It works with three
// counters: tail, head and ceil.
// NOTE(review): the member declarations for tail/head/ceil and the closing
// braces are not visible in this listing.
55struct rovers<T, capped_release::yes> {
// Atomic type used for the rover counters.
56 using atomic_rover = std::atomic<T>;
// Starts tail and head at 0 and ceil at `limit`.
62 rovers(T limit) noexcept : tail(0), head(0), ceil(limit) {}
// Returns wrapping_difference(ceil, head), i.e. how far ceil is ahead of head
// (0 if it is not). Both counters are read with relaxed ordering; the argument
// is ignored in this specialization.
64 T max_extra(T)
const noexcept {
65 return wrapping_difference(ceil.load(std::memory_order_relaxed), head.load(std::memory_order_relaxed));
// Advances ceil by `tokens` using fetch_add().
68 void release(T tokens) {
69 fetch_add(ceil, tokens);
// `rovers` specialization for capped_release::no. It works with two counters,
// tail and head; there is no ceil.
// NOTE(review): the member declarations for tail/head and the closing braces
// are not visible in this listing.
74struct rovers<T, capped_release::no> {
// Atomic type used for the rover counters.
75 using atomic_rover = std::atomic<T>;
// Starts tail and head at 0; the constructor argument is ignored.
80 rovers(T) noexcept : tail(0), head(0) {}
// Returns wrapping_difference(tail + limit, head), i.e. how far `tail + limit`
// is ahead of head (0 if it is not). Both counters are read with relaxed ordering.
82 T max_extra(T limit)
const noexcept {
83 return wrapping_difference(tail.load(std::memory_order_relaxed) + limit, head.load(std::memory_order_relaxed));
// Deleted: calling release() on this specialization is a compile error.
86 void release(T) =
delete;
// Token bucket built on atomic rover counters and an atomic replenish timestamp.
// grab() advances the tail rover; replenish() advances the head rover by the
// tokens accumulated at the replenish rate over the elapsed time.
//
// Template parameters:
//   T      - token counter type; must be nothrow-copy-constructible and satisfy
//            supports_wrapping_arithmetics
//   Period - ratio defining the time unit of the replenish rate (see rate_resolution)
//   Capped - selects the rovers specialization
//   Clock  - clock providing time_point/now(); defaults to std::chrono::steady_clock
// NOTE(review): several declarations used by the methods (_replenish_rate,
// _rovers) are not visible in this listing.
89template <
typename T,
typename Period, capped_release Capped,
typename Clock = std::chrono::steady_clock>
90requires std::is_nothrow_copy_constructible_v<T> && supports_wrapping_arithmetics<T>
91class shared_token_bucket {
// Floating-point duration counted in `Period` units; all rate arithmetic uses it.
92 using rate_resolution = std::chrono::duration<double, Period>;
// Limit given at construction; passed to the rovers constructor and to
// max_extra() when replenishing.
95 const T _replenish_limit;
// Minimum number of accumulated tokens before replenish() acts.
96 const T _replenish_threshold;
// Timestamp that replenish() measures elapsed time from.
97 std::atomic<typename Clock::time_point> _replenished;
// Rovers specialization selected by Capped.
114 using rovers_t = rovers<T, Capped>;
// Compile-time requirement that the rover atomic type is always lock-free.
115 static_assert(rovers_t::atomic_rover::is_always_lock_free);
// Current value of the tail rover, read with relaxed memory ordering.
118 T tail() const noexcept {
return _rovers.tail.load(std::memory_order_relaxed); }
// Current value of the head rover, read with relaxed memory ordering.
119 T head() const noexcept {
return _rovers.head.load(std::memory_order_relaxed); }
// Longest time span accounted for in a single accumulation: one hour,
// expressed in rate_resolution units. accumulated_in() caps its input to this.
130 static constexpr rate_resolution max_delta = std::chrono::duration_cast<rate_resolution>(std::chrono::hours(1));
// Upper bound applied to the replenish rate. Chosen so that
// max_rate * max_delta.count() is about half of T's maximum value.
132 static constexpr T max_rate = std::numeric_limits<T>::max() / 2 / max_delta.count();
// Exposes the Capped template argument as a class constant.
133 static constexpr capped_release is_capped = Capped;
// Number of tokens produced at `rate` (tokens per Period unit) over `delta`:
// rate * delta.count(), rounded to the nearest integer value by std::round and
// converted to T.
// NOTE(review): this is declared constexpr and used in a static_assert, so it
// relies on std::round being usable in constant expressions (standard only
// from C++23; some compilers accept it earlier) - confirm for the toolchain.
136 static constexpr T accumulated(T rate, rate_resolution delta)
noexcept {
137 return std::round(rate * delta.count());
// Compile-time check that the largest accumulation the class can compute
// (max_rate over max_delta) does not exceed T's maximum value.
141 static_assert(accumulated(max_rate, max_delta) <= std::numeric_limits<T>::max());
// Constructs a bucket.
//   rate                 - replenish rate; values above max_rate are capped
//   limit                - stored as _replenish_limit and handed to the rovers
//   threshold            - minimum accumulation before replenish() acts;
//                          clamped into [1, limit]
//   add_replenish_iffset - when true (the default) the initial replenish
//                          timestamp is set 24 hours before Clock::now();
//                          when false it is Clock::now()
// NOTE(review): std::clamp requires lo <= hi, so `limit` must be at least 1.
// NOTE(review): the constructor body is not visible in this listing.
145 shared_token_bucket(T rate, T limit, T threshold,
bool add_replenish_iffset =
true) noexcept
146 : _replenish_rate(
std::min(rate, max_rate))
147 , _replenish_limit(limit)
148 , _replenish_threshold(
std::clamp(threshold, (T)1, limit))
150 , _replenished(Clock::
now() -
std::chrono::hours(add_replenish_iffset ? 24 : 0))
151 , _rovers(_replenish_limit)
// Advances the tail rover by `tokens` and returns the resulting tail position
// (the value before the addition plus `tokens`).
154 T grab(T tokens)
noexcept {
155 return fetch_add(_rovers.tail, tokens) + tokens;
// Forwards `tokens` to the rovers' release().
// In the capped_release::no rovers specialization release() is deleted, so
// using this member with such a bucket does not compile.
158 void release(T tokens)
noexcept {
159 _rovers.release(tokens);
// Credits the head rover with tokens accumulated between the stored replenish
// timestamp and `now`, provided at least _replenish_threshold tokens accumulated.
// NOTE(review): parts of this function are not visible in this listing (the
// line numbering jumps 163 -> 169 and 173 -> 177); the comments below cover
// only the statements that are shown.
162 void replenish(
typename Clock::time_point
now)
noexcept {
// Snapshot of the last replenish timestamp (relaxed load).
163 auto ts = _replenished.load(std::memory_order_relaxed);
// Elapsed time since the snapshot and the tokens it is worth.
169 auto delta =
now - ts;
170 auto extra = accumulated_in(delta);
// Act only once the accumulated amount reaches the threshold.
172 if (extra >= _replenish_threshold) {
// Attempt to move the stored timestamp from ts to ts + delta.
// NOTE(review): what happens when the exchange fails is not visible here.
173 if (!_replenished.compare_exchange_weak(ts, ts + delta)) {
// Advance head by the accumulated amount, bounded by the rovers' max_extra().
177 fetch_add(_rovers.head, std::min(extra, _rovers.max_extra(_replenish_limit)));
// Returns how far `from` is ahead of the current head rover, or 0 if it is not
// ahead (wrapping_difference(from, head())).
181 T deficiency(T from)
const noexcept {
182 return wrapping_difference(from, head());
// Converts any std::chrono duration to rate_resolution via duration_cast.
185 template <
typename Rep,
typename Per>
186 static auto rate_cast(
const std::chrono::duration<Rep, Per> delta)
noexcept {
187 return std::chrono::duration_cast<rate_resolution>(delta);
// Tokens accumulated over `delta` at the current replenish rate.
// `delta` is converted to rate_resolution and capped at max_delta before the
// computation.
191 template <
typename Rep,
typename Per>
192 T accumulated_in(
const std::chrono::duration<Rep, Per> delta)
const noexcept {
193 auto delta_at_rate = std::min(rate_cast(delta), max_delta);
194 return accumulated(_replenish_rate, delta_at_rate);
// Time needed to accumulate `tokens` at the current replenish rate:
// tokens / _replenish_rate, expressed in rate_resolution units.
// NOTE(review): a zero replenish rate makes this a floating-point division by
// zero - confirm callers never configure a zero rate.
199 rate_resolution duration_for(T tokens)
const noexcept {
200 return rate_resolution(
double(tokens) / _replenish_rate);
// Current replenish rate (after capping at max_rate).
203 T rate() const noexcept {
return _replenish_rate; }
// Limit supplied at construction.
204 T limit() const noexcept {
return _replenish_limit; }
// Replenish threshold as stored by the constructor (clamped into [1, limit]).
205 T threshold() const noexcept {
return _replenish_threshold; }
// Stored replenish timestamp, obtained through the atomic's implicit
// conversion (an atomic load).
206 typename Clock::time_point replenished_ts() const noexcept {
return _replenished; }
// Replaces the replenish rate with `rate`, capped at max_rate.
// NOTE(review): the declaration of _replenish_rate is not visible in this
// listing; the plain assignment here shows no synchronization - confirm how
// concurrent readers are expected to observe the change.
208 void update_rate(T rate)
noexcept {
209 _replenish_rate = std::min(rate, max_rate);
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26