Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
abortable_fifo.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2022 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/abort_source.hh>
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/util/assert.hh>
28#include <seastar/util/modules.hh>
29
30#ifndef SEASTAR_MODULE
31#include <memory>
32#include <optional>
33#include <type_traits>
34#endif
35
36namespace seastar {
37
38namespace internal {
39
40// Test if aborter has call operator accepting optional exception_ptr
41template <typename Aborter, typename T>
42concept aborter_ex = std::is_nothrow_invocable_r_v<void, Aborter, T&, const std::optional<std::exception_ptr>&>;
43
44template <typename Aborter, typename T>
45concept aborter = std::is_nothrow_invocable_r_v<void, Aborter, T&> || aborter_ex<Aborter, T>;
46
// Default abort handler: a callable that ignores its arguments and does nothing.
//
// Arguments are taken by const reference rather than by value so that invoking
// it with `T&` never copies the payload. This keeps the call well-formed for
// move-only payloads and nothrow for payloads whose copy constructor may throw,
// which is what the 'aborter' concept requires of the handler.
template<typename... T>
struct noop_aborter {
    void operator()(const T&...) const noexcept {}
};
52
53
62template <typename T, typename OnAbort = noop_aborter<T>>
63requires aborter<OnAbort, T>
64class abortable_fifo {
65private:
66 struct entry {
67 std::optional<T> payload; // disengaged means that it's expired
68 optimized_optional<abort_source::subscription> sub;
69 entry(T&& payload_) : payload(std::move(payload_)) {}
70 entry(const T& payload_) : payload(payload_) {}
71 entry(T payload_, abortable_fifo& ef, abort_source& as)
72 : payload(std::move(payload_))
73 , sub(as.subscribe([this, &ef] (const std::optional<std::exception_ptr>& ex_opt) noexcept {
74 if constexpr (aborter_ex<OnAbort, T>) {
75 ef._on_abort(*payload, ex_opt);
76 } else {
77 ef._on_abort(*payload);
78 }
79 payload = std::nullopt;
80 --ef._size;
81 ef.drop_expired_front();
82 })) {}
83 entry() = default;
84 entry(entry&& x) = delete;
85 entry(const entry& x) = delete;
86 };
87
88 // If engaged, represents the first element.
89 // This is to avoid large allocations done by chunked_fifo for single-element cases.
90 // abortable_fifo is used to implement wait lists in synchronization primitives
91 // and in some uses it's common to have at most one waiter.
92 std::unique_ptr<entry> _front;
93
94 // There is an invariant that the front element is never expired.
95 chunked_fifo<entry> _list;
96 OnAbort _on_abort;
97 size_t _size = 0;
98
99 // Ensures that front() is not expired by dropping expired elements from the front.
100 void drop_expired_front() noexcept {
101 while (!_list.empty() && !_list.front().payload) {
102 _list.pop_front();
103 }
104 if (_front && !_front->payload) {
105 _front.reset();
106 }
107 }
108public:
109 abortable_fifo() noexcept = default;
110 abortable_fifo(OnAbort on_abort) noexcept(std::is_nothrow_move_constructible_v<OnAbort>) : _on_abort(std::move(on_abort)) {}
111
112 abortable_fifo(abortable_fifo&& o) noexcept
113 : abortable_fifo(std::move(o._on_abort)) {
114 // entry objects hold a reference to this so non-empty containers cannot be moved.
115 SEASTAR_ASSERT(o._size == 0);
116 }
117
118 abortable_fifo& operator=(abortable_fifo&& o) noexcept {
119 if (this != &o) {
120 this->~abortable_fifo();
121 new (this) abortable_fifo(std::move(o));
122 }
123 return *this;
124 }
125
131 bool empty() const noexcept {
132 return _size == 0;
133 }
134
136 explicit operator bool() const noexcept {
137 return !empty();
138 }
139
142 T& front() noexcept {
143 if (_front) {
144 return *_front->payload;
145 }
146 return *_list.front().payload;
147 }
148
151 const T& front() const noexcept {
152 if (_front) {
153 return *_front->payload;
154 }
155 return *_list.front().payload;
156 }
157
161 size_t size() const noexcept {
162 return _size;
163 }
164
169 void reserve(size_t size) {
170 return _list.reserve(size);
171 }
172
175 void push_back(const T& payload) {
176 if (_size == 0) {
177 _front = std::make_unique<entry>(payload);
178 } else {
179 _list.emplace_back(payload);
180 }
181 ++_size;
182 }
183
186 void push_back(T&& payload) {
187 if (_size == 0) {
188 _front = std::make_unique<entry>(std::move(payload));
189 } else {
190 _list.emplace_back(std::move(payload));
191 }
192 ++_size;
193 }
194
197 void push_back(T&& payload, abort_source& as) {
198 if (as.abort_requested()) {
199 if constexpr (aborter_ex<OnAbort, T>) {
200 _on_abort(payload, std::nullopt);
201 } else {
202 _on_abort(payload);
203 }
204 return;
205 }
206 if (_size == 0) {
207 _front = std::make_unique<entry>(std::move(payload), *this, as);
208 } else {
209 _list.emplace_back(std::move(payload), *this, as);
210 }
211 ++_size;
212 }
213
215 template<typename... U>
216 T& emplace_back(U&&... args) {
217 if (_size == 0) {
218 _front = std::make_unique<entry>();
219 _front->payload.emplace(std::forward<U>(args)...);
220 _size = 1;
221 return *_front->payload;
222 } else {
223 _list.emplace_back();
224 _list.back().payload.emplace(std::forward<U>(args)...);
225 ++_size;
226 return *_list.back().payload;
227 }
228 }
229
235 void make_back_abortable(abort_source& as) {
236 entry* e = _front.get();
237 if (!_list.empty()) {
238 e = &_list.back();
239 }
240 SEASTAR_ASSERT(!e->sub);
241 auto aborter = [this, e] (const std::optional<std::exception_ptr>& ex_opt) noexcept {
242 if constexpr (aborter_ex<OnAbort, T>) {
243 _on_abort(*e->payload, ex_opt);
244 } else {
245 _on_abort(*e->payload);
246 }
247 e->payload = std::nullopt;
248 --_size;
249 drop_expired_front();
250 };
251 if (as.abort_requested()) {
252 aborter(as.abort_requested_exception_ptr());
253 return;
254 }
255 e->sub = as.subscribe(std::move(aborter));
256 }
257
260 void pop_front() noexcept {
261 if (_front) {
262 _front.reset();
263 } else {
264 _list.pop_front();
265 }
266 --_size;
267 drop_expired_front();
268 }
269};
270
271}
272}
273
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.