Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
abortable_fifo.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2022 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/abort_source.hh>
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/util/modules.hh>
28
29#ifndef SEASTAR_MODULE
30#include <memory>
31#include <optional>
32#include <type_traits>
33#endif
34
35namespace seastar {
36
37namespace internal {
38
39// Test if aborter has call operator accepting optional exception_ptr
40template <typename Aborter, typename T>
41concept aborter_ex = std::is_nothrow_invocable_r_v<void, Aborter, T&, const std::optional<std::exception_ptr>&>;
42
43template <typename Aborter, typename T>
44concept aborter = std::is_nothrow_invocable_r_v<void, Aborter, T&> || aborter_ex<Aborter, T>;
45
46// This class satisfies 'aborter' concept and is used by default
47template<typename... T>
48struct noop_aborter {
49 void operator()(T...) noexcept {};
50};
51
52
61template <typename T, typename OnAbort = noop_aborter<T>>
62requires aborter<OnAbort, T>
63class abortable_fifo {
64private:
65 struct entry {
66 std::optional<T> payload; // disengaged means that it's expired
67 optimized_optional<abort_source::subscription> sub;
68 entry(T&& payload_) : payload(std::move(payload_)) {}
69 entry(const T& payload_) : payload(payload_) {}
70 entry(T payload_, abortable_fifo& ef, abort_source& as)
71 : payload(std::move(payload_))
72 , sub(as.subscribe([this, &ef] (const std::optional<std::exception_ptr>& ex_opt) noexcept {
73 if constexpr (aborter_ex<OnAbort, T>) {
74 ef._on_abort(*payload, ex_opt);
75 } else {
76 ef._on_abort(*payload);
77 }
78 payload = std::nullopt;
79 --ef._size;
80 ef.drop_expired_front();
81 })) {}
82 entry() = default;
83 entry(entry&& x) = delete;
84 entry(const entry& x) = delete;
85 };
86
87 // If engaged, represents the first element.
88 // This is to avoid large allocations done by chunked_fifo for single-element cases.
89 // abortable_fifo is used to implement wait lists in synchronization primitives
90 // and in some uses it's common to have at most one waiter.
91 std::unique_ptr<entry> _front;
92
93 // There is an invariant that the front element is never expired.
94 chunked_fifo<entry> _list;
95 OnAbort _on_abort;
96 size_t _size = 0;
97
98 // Ensures that front() is not expired by dropping expired elements from the front.
99 void drop_expired_front() noexcept {
100 while (!_list.empty() && !_list.front().payload) {
101 _list.pop_front();
102 }
103 if (_front && !_front->payload) {
104 _front.reset();
105 }
106 }
107public:
108 abortable_fifo() noexcept = default;
109 abortable_fifo(OnAbort on_abort) noexcept(std::is_nothrow_move_constructible_v<OnAbort>) : _on_abort(std::move(on_abort)) {}
110
111 abortable_fifo(abortable_fifo&& o) noexcept
112 : abortable_fifo(std::move(o._on_abort)) {
113 // entry objects hold a reference to this so non-empty containers cannot be moved.
114 assert(o._size == 0);
115 }
116
117 abortable_fifo& operator=(abortable_fifo&& o) noexcept {
118 if (this != &o) {
119 this->~abortable_fifo();
120 new (this) abortable_fifo(std::move(o));
121 }
122 return *this;
123 }
124
130 bool empty() const noexcept {
131 return _size == 0;
132 }
133
135 explicit operator bool() const noexcept {
136 return !empty();
137 }
138
141 T& front() noexcept {
142 if (_front) {
143 return *_front->payload;
144 }
145 return *_list.front().payload;
146 }
147
150 const T& front() const noexcept {
151 if (_front) {
152 return *_front->payload;
153 }
154 return *_list.front().payload;
155 }
156
160 size_t size() const noexcept {
161 return _size;
162 }
163
168 void reserve(size_t size) {
169 return _list.reserve(size);
170 }
171
174 void push_back(const T& payload) {
175 if (_size == 0) {
176 _front = std::make_unique<entry>(payload);
177 } else {
178 _list.emplace_back(payload);
179 }
180 ++_size;
181 }
182
185 void push_back(T&& payload) {
186 if (_size == 0) {
187 _front = std::make_unique<entry>(std::move(payload));
188 } else {
189 _list.emplace_back(std::move(payload));
190 }
191 ++_size;
192 }
193
196 void push_back(T&& payload, abort_source& as) {
197 if (as.abort_requested()) {
198 if constexpr (aborter_ex<OnAbort, T>) {
199 _on_abort(payload, std::nullopt);
200 } else {
201 _on_abort(payload);
202 }
203 return;
204 }
205 if (_size == 0) {
206 _front = std::make_unique<entry>(std::move(payload), *this, as);
207 } else {
208 _list.emplace_back(std::move(payload), *this, as);
209 }
210 ++_size;
211 }
212
214 template<typename... U>
215 T& emplace_back(U&&... args) {
216 if (_size == 0) {
217 _front = std::make_unique<entry>();
218 _front->payload.emplace(std::forward<U>(args)...);
219 _size = 1;
220 return *_front->payload;
221 } else {
222 _list.emplace_back();
223 _list.back().payload.emplace(std::forward<U>(args)...);
224 ++_size;
225 return *_list.back().payload;
226 }
227 }
228
234 void make_back_abortable(abort_source& as) {
235 entry* e = _front.get();
236 if (!_list.empty()) {
237 e = &_list.back();
238 }
239 assert(!e->sub);
240 auto aborter = [this, e] (const std::optional<std::exception_ptr>& ex_opt) noexcept {
241 if constexpr (aborter_ex<OnAbort, T>) {
242 _on_abort(*e->payload, ex_opt);
243 } else {
244 _on_abort(*e->payload);
245 }
246 e->payload = std::nullopt;
247 --_size;
248 drop_expired_front();
249 };
250 if (as.abort_requested()) {
251 aborter(as.abort_requested_exception_ptr());
252 return;
253 }
254 e->sub = as.subscribe(std::move(aborter));
255 }
256
259 void pop_front() noexcept {
260 if (_front) {
261 _front.reset();
262 } else {
263 _list.pop_front();
264 }
265 --_size;
266 drop_expired_front();
267 }
268};
269
270}
271}
272
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.