Seastar
High performance C++ framework for concurrent servers
expiring_fifo.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 ScyllaDB
20 */
21
22#pragma once
23
24#ifndef SEASTAR_MODULE
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <exception>
28#include <memory>
29#include <seastar/core/timer.hh>
30#include <seastar/core/lowres_clock.hh>
31#include <seastar/core/timed_out_error.hh>
32#include <seastar/util/modules.hh>
33#endif
34
35namespace seastar {
36
37template<typename T>
39 void operator()(T&) noexcept {};
40};
41
42template<typename... T>
44 void operator()(promise<T...>& pr) noexcept {
45 pr.set_exception(std::make_exception_ptr(timed_out_error()));
46 };
47};
48
57SEASTAR_MODULE_EXPORT
58template <typename T, typename OnExpiry = dummy_expiry<T>, typename Clock = lowres_clock>
60public:
61 using clock = Clock;
62 using time_point = typename Clock::time_point;
63private:
64 struct entry {
65 std::optional<T> payload; // disengaged means that it's expired
66 timer<Clock> tr;
67 entry(T&& payload_) : payload(std::move(payload_)) {}
68 entry(const T& payload_) : payload(payload_) {}
69 entry(T payload_, expiring_fifo& ef, time_point timeout)
70 : payload(std::move(payload_))
71 , tr([this, &ef] {
72 ef._on_expiry(*payload);
73 payload = std::nullopt;
74 --ef._size;
75 ef.drop_expired_front();
76 })
77 {
78 tr.arm(timeout);
79 }
80 entry(entry&& x) = delete;
81 entry(const entry& x) = delete;
82 };
83
84 // If engaged, represents the first element.
85 // This is to avoid large allocations done by chunked_fifo for single-element cases.
86 // expiring_fifo is used to implement wait lists in synchronization primitives
87 // and in some uses it's common to have at most one waiter.
88 std::unique_ptr<entry> _front;
89
90 // There is an invariant that the front element is never expired.
92 OnExpiry _on_expiry;
93 size_t _size = 0;
94
95 // Ensures that front() is not expired by dropping expired elements from the front.
96 void drop_expired_front() noexcept {
97 while (!_list.empty() && !_list.front().payload) {
98 _list.pop_front();
99 }
100 if (_front && !_front->payload) {
101 _front.reset();
102 }
103 }
104public:
105 expiring_fifo() noexcept = default;
106 expiring_fifo(OnExpiry on_expiry) noexcept(std::is_nothrow_move_constructible_v<OnExpiry>) : _on_expiry(std::move(on_expiry)) {}
107
108 expiring_fifo(expiring_fifo&& o) noexcept
109 : expiring_fifo(std::move(o._on_expiry)) {
110 // entry objects hold a reference to this so non-empty containers cannot be moved.
111 assert(o._size == 0);
112 }
113
114 expiring_fifo& operator=(expiring_fifo&& o) noexcept {
115 if (this != &o) {
116 this->~expiring_fifo();
117 new (this) expiring_fifo(std::move(o));
118 }
119 return *this;
120 }
121
127 bool empty() const noexcept {
128 return _size == 0;
129 }
130
132 explicit operator bool() const noexcept {
133 return !empty();
134 }
135
138 T& front() noexcept {
139 if (_front) {
140 return *_front->payload;
141 }
142 return *_list.front().payload;
143 }
144
147 const T& front() const noexcept {
148 if (_front) {
149 return *_front->payload;
150 }
151 return *_list.front().payload;
152 }
153
157 size_t size() const noexcept {
158 return _size;
159 }
160
165 void reserve(size_t size) {
166 return _list.reserve(size);
167 }
168
171 void push_back(const T& payload) {
172 if (_size == 0) {
173 _front = std::make_unique<entry>(payload);
174 } else {
175 _list.emplace_back(payload);
176 }
177 ++_size;
178 }
179
182 void push_back(T&& payload) {
183 if (_size == 0) {
184 _front = std::make_unique<entry>(std::move(payload));
185 } else {
186 _list.emplace_back(std::move(payload));
187 }
188 ++_size;
189 }
190
194 void push_back(T&& payload, time_point timeout) {
195 if (timeout == time_point::max()) {
196 push_back(std::move(payload));
197 return;
198 }
199 if (_size == 0) {
200 _front = std::make_unique<entry>(std::move(payload), *this, timeout);
201 } else {
202 _list.emplace_back(std::move(payload), *this, timeout);
203 }
204 ++_size;
205 }
206
209 void pop_front() noexcept {
210 if (_front) {
211 _front.reset();
212 } else {
213 _list.pop_front();
214 }
215 --_size;
216 drop_expired_front();
217 }
218};
219
220}
Definition: expiring_fifo.hh:59
bool empty() const noexcept
Definition: expiring_fifo.hh:127
size_t size() const noexcept
Definition: expiring_fifo.hh:157
void pop_front() noexcept
Definition: expiring_fifo.hh:209
void push_back(const T &payload)
Definition: expiring_fifo.hh:171
const T & front() const noexcept
Definition: expiring_fifo.hh:147
void reserve(size_t size)
Definition: expiring_fifo.hh:165
T & front() noexcept
Definition: expiring_fifo.hh:138
void push_back(T &&payload)
Definition: expiring_fifo.hh:182
void push_back(T &&payload, time_point timeout)
Definition: expiring_fifo.hh:194
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:998
Definition: timed_out_error.hh:34
void arm(time_point until, std::optional< duration > period={}) noexcept
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: expiring_fifo.hh:38
Definition: expiring_fifo.hh:43