Seastar
High performance C++ framework for concurrent servers
abortable_fifo.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2022 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/abort_source.hh>
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/util/modules.hh>
28
29#ifndef SEASTAR_MODULE
30#include <memory>
31#include <optional>
32#include <type_traits>
33#endif
34
35namespace seastar {
36
37namespace internal {
38
39template <typename Aborter, typename T>
40concept aborter = std::is_nothrow_invocable_r_v<void, Aborter, T&>;
41
// Default 'aborter': a stateless functor that ignores whatever it is given.
//
// The arguments are taken by const reference rather than by value so that
// invoking it never copies the element. With a by-value parameter the call
// would be ill-formed for move-only T and potentially-throwing for any T whose
// copy constructor is not noexcept; in both cases noop_aborter<T> would not be
// nothrow-invocable with T&, which is what the 'aborter' concept requires.
// The call operator is const so that const instances can be invoked as well.
template<typename... T>
struct noop_aborter {
    void operator()(const T&...) const noexcept {}
};
47
48
57template <typename T, typename OnAbort = noop_aborter<T>>
58requires aborter<OnAbort, T>
59class abortable_fifo {
60private:
61 struct entry {
62 std::optional<T> payload; // disengaged means that it's expired
63 optimized_optional<abort_source::subscription> sub;
64 entry(T&& payload_) : payload(std::move(payload_)) {}
65 entry(const T& payload_) : payload(payload_) {}
66 entry(T payload_, abortable_fifo& ef, abort_source& as)
67 : payload(std::move(payload_))
68 , sub(as.subscribe([this, &ef] () noexcept {
69 ef._on_abort(*payload);
70 payload = std::nullopt;
71 --ef._size;
72 ef.drop_expired_front();
73 })) {}
74 entry() = default;
75 entry(entry&& x) = delete;
76 entry(const entry& x) = delete;
77 };
78
79 // If engaged, represents the first element.
80 // This is to avoid large allocations done by chunked_fifo for single-element cases.
81 // abortable_fifo is used to implement wait lists in synchronization primitives
82 // and in some uses it's common to have at most one waiter.
83 std::unique_ptr<entry> _front;
84
85 // There is an invariant that the front element is never expired.
86 chunked_fifo<entry> _list;
87 OnAbort _on_abort;
88 size_t _size = 0;
89
90 // Ensures that front() is not expired by dropping expired elements from the front.
91 void drop_expired_front() noexcept {
92 while (!_list.empty() && !_list.front().payload) {
93 _list.pop_front();
94 }
95 if (_front && !_front->payload) {
96 _front.reset();
97 }
98 }
99public:
100 abortable_fifo() noexcept = default;
101 abortable_fifo(OnAbort on_abort) noexcept(std::is_nothrow_move_constructible_v<OnAbort>) : _on_abort(std::move(on_abort)) {}
102
103 abortable_fifo(abortable_fifo&& o) noexcept
104 : abortable_fifo(std::move(o._on_abort)) {
105 // entry objects hold a reference to this so non-empty containers cannot be moved.
106 assert(o._size == 0);
107 }
108
109 abortable_fifo& operator=(abortable_fifo&& o) noexcept {
110 if (this != &o) {
111 this->~abortable_fifo();
112 new (this) abortable_fifo(std::move(o));
113 }
114 return *this;
115 }
116
122 bool empty() const noexcept {
123 return _size == 0;
124 }
125
127 explicit operator bool() const noexcept {
128 return !empty();
129 }
130
133 T& front() noexcept {
134 if (_front) {
135 return *_front->payload;
136 }
137 return *_list.front().payload;
138 }
139
142 const T& front() const noexcept {
143 if (_front) {
144 return *_front->payload;
145 }
146 return *_list.front().payload;
147 }
148
152 size_t size() const noexcept {
153 return _size;
154 }
155
160 void reserve(size_t size) {
161 return _list.reserve(size);
162 }
163
166 void push_back(const T& payload) {
167 if (_size == 0) {
168 _front = std::make_unique<entry>(payload);
169 } else {
170 _list.emplace_back(payload);
171 }
172 ++_size;
173 }
174
177 void push_back(T&& payload) {
178 if (_size == 0) {
179 _front = std::make_unique<entry>(std::move(payload));
180 } else {
181 _list.emplace_back(std::move(payload));
182 }
183 ++_size;
184 }
185
188 void push_back(T&& payload, abort_source& as) {
189 if (as.abort_requested()) {
190 _on_abort(payload);
191 return;
192 }
193 if (_size == 0) {
194 _front = std::make_unique<entry>(std::move(payload), *this, as);
195 } else {
196 _list.emplace_back(std::move(payload), *this, as);
197 }
198 ++_size;
199 }
200
202 template<typename... U>
203 T& emplace_back(U&&... args) {
204 if (_size == 0) {
205 _front = std::make_unique<entry>();
206 _front->payload.emplace(std::forward<U>(args)...);
207 _size = 1;
208 return *_front->payload;
209 } else {
210 _list.emplace_back();
211 _list.back().payload.emplace(std::forward<U>(args)...);
212 ++_size;
213 return *_list.back().payload;
214 }
215 }
216
222 void make_back_abortable(abort_source& as) {
223 entry* e = _front.get();
224 if (!_list.empty()) {
225 e = &_list.back();
226 }
227 assert(!e->sub);
228 auto aborter = [this, e] () noexcept {
229 _on_abort(*e->payload);
230 e->payload = std::nullopt;
231 --_size;
232 drop_expired_front();
233 };
234 if (as.abort_requested()) {
235 aborter();
236 return;
237 }
238 e->sub = as.subscribe(std::move(aborter));
239 }
240
243 void pop_front() noexcept {
244 if (_front) {
245 _front.reset();
246 } else {
247 _list.pop_front();
248 }
249 --_size;
250 drop_expired_front();
251 }
252};
253
254}
255}
256
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.