24#include <seastar/core/abort_source.hh>
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/util/assert.hh>
28#include <seastar/util/modules.hh>
// Concept: holds when `Aborter` can be invoked, without throwing, with
// (T&, const std::optional<std::exception_ptr>&) and void as the result type.
// It is used below (via `if constexpr`) to choose between the two-argument
// and the one-argument form of the abort callback.
// NOTE(review): the leading numerals ("41", "42") look like line numbers
// fused in by the listing extraction, not part of the code — confirm.
41template <
typename Aborter,
typename T>
42concept aborter_ex = std::is_nothrow_invocable_r_v<void, Aborter, T&, const std::optional<std::exception_ptr>&>;
// Concept: holds when `Aborter` is nothrow-invocable with just (T&) and void
// as the result type, or when it satisfies aborter_ex<Aborter, T> (the form
// that additionally takes the optional exception pointer).
44template <
typename Aborter,
typename T>
45concept aborter = std::is_nothrow_invocable_r_v<void, Aborter, T&> || aborter_ex<Aborter, T>;
// Variadic call operator that takes its arguments by value, does nothing and
// is noexcept.
// NOTE(review): the declaration line of the enclosing type is missing from
// this listing; presumably it is the `noop_aborter` used as the default
// `OnAbort` argument further down — confirm against the real header.
48template<
typename... T>
50 void operator()(T...) noexcept {};
// Template head for the container declared next: `T` is the element type and
// `OnAbort` is the abort callback type, defaulting to noop_aborter<T>.
// The requires-clause rejects any OnAbort that does not satisfy
// aborter<OnAbort, T>.
// NOTE(review): the `class`/`struct` line that this head applies to is not
// present in this listing.
62template <
typename T,
typename OnAbort = noop_aborter<T>>
63requires aborter<OnAbort, T>
// The stored element. The abort callbacks in this file reset it to
// std::nullopt, and drop_expired_front() tests for an absent payload.
67 std::optional<T> payload;
// Subscription obtained from abort_source::subscribe(); it is only set by
// the abortable constructor and by make_back_abortable().
68 optimized_optional<abort_source::subscription> sub;
// Builds an entry by moving the given value into `payload`; `sub` is not
// initialised here.
69 entry(T&& payload_) : payload(
std::move(payload_)) {}
// Builds an entry by copying the given value into `payload`; `sub` is not
// initialised here.
70 entry(
const T& payload_) : payload(payload_) {}
// Builds an entry that is tied to an abort source: the value is moved into
// `payload`, and `sub` is initialised by subscribing a noexcept callback to
// `as`. The callback captures `this` (the entry) and `ef` (the owning fifo)
// by address/reference. When invoked it:
//   - calls ef._on_abort with the payload, passing the optional exception
//     as well when OnAbort satisfies aborter_ex,
//   - resets `payload` to std::nullopt,
//   - calls ef.drop_expired_front().
// NOTE(review): several lines of this constructor (the `else` line, closing
// braces, and whatever sits on original lines 76/78/80/82) are missing from
// this listing, so the callback may do more than is shown here.
71 entry(T payload_, abortable_fifo& ef, abort_source& as)
72 : payload(
std::move(payload_))
73 , sub(as.subscribe([this, &ef] (const
std::optional<
std::exception_ptr>& ex_opt) noexcept {
74 if constexpr (aborter_ex<OnAbort, T>) {
75 ef._on_abort(*payload, ex_opt);
77 ef._on_abort(*payload);
79 payload = std::nullopt;
81 ef.drop_expired_front();
// Entries can be neither moved nor copied.
// NOTE(review): presumably this is because the abort callbacks capture the
// entry's address (`this` / `e`), which must stay valid — confirm.
84 entry(entry&& x) =
delete;
85 entry(
const entry& x) =
delete;
// Individually heap-allocated entry. front() returns either this entry's
// payload or that of `_list.front()`; the condition that selects between the
// two is not visible in this listing.
92 std::unique_ptr<entry> _front;
// Queue of further entries, filled by the `_list.emplace_back(...)` calls in
// push_back()/emplace_back().
95 chunked_fifo<entry> _list;
// Noexcept helper called after an entry's payload has been cleared.
// Visible logic: loop while `_list` is non-empty and its front entry has no
// payload, then check whether `_front` exists and has no payload.
// NOTE(review): the loop body and the branch body are missing from this
// listing; presumably they pop the list front and release `_front`
// respectively — confirm.
100 void drop_expired_front() noexcept {
101 while (!_list.empty() && !_list.front().payload) {
104 if (_front && !_front->payload) {
// Default constructor; defaulted and noexcept.
109 abortable_fifo() noexcept = default;
// Constructs the fifo with a caller-supplied abort callback, which is moved
// into `_on_abort`. It is noexcept exactly when OnAbort is nothrow
// move-constructible.
110 abortable_fifo(OnAbort on_abort) noexcept(
std::is_nothrow_move_constructible_v<OnAbort>) : _on_abort(
std::move(on_abort)) {}
// Move constructor. It delegates to the OnAbort constructor, moving the
// callback out of `o`, and asserts that `o` holds no elements (o._size == 0).
// NOTE(review): only the callback transfer and the assertion are visible
// here; the original line 114 is missing from this listing.
112 abortable_fifo(abortable_fifo&& o) noexcept
113 : abortable_fifo(std::move(o._on_abort)) {
115 SEASTAR_ASSERT(o._size == 0);
// Move assignment, implemented by destroying *this in place and then
// placement-new constructing it from `o` via the move constructor.
// NOTE(review): the original line 119 is missing from this listing;
// presumably it holds a self-assignment guard (`if (this != &o)`), without
// which destroy-then-reconstruct would be unsafe for `x = std::move(x)` —
// confirm against the real header.
118 abortable_fifo& operator=(abortable_fifo&& o)
noexcept {
120 this->~abortable_fifo();
121 new (
this) abortable_fifo(std::move(o));
// Const, noexcept query returning bool. The body is not visible in this
// listing.
131 bool empty() const noexcept {
// Explicit, const, noexcept conversion to bool. The body is not visible in
// this listing.
136 explicit operator bool() const noexcept {
// Returns a mutable reference to the first element's payload, taken either
// from `_front` or from `_list.front()`. Both paths dereference the optional
// payload without checking it.
// NOTE(review): the condition that selects between the two returns is
// missing from this listing.
142 T& front() noexcept {
144 return *_front->payload;
146 return *_list.front().payload;
// Const overload of front(): returns a const reference to the first
// element's payload, taken either from `_front` or from `_list.front()`.
// NOTE(review): the condition that selects between the two returns is
// missing from this listing.
151 const T& front() const noexcept {
153 return *_front->payload;
155 return *_list.front().payload;
// Const, noexcept query returning size_t. The body is not visible in this
// listing.
161 size_t size() const noexcept {
// Forwards the requested capacity to `_list.reserve(size)`. It is not
// declared noexcept.
169 void reserve(
size_t size) {
170 return _list.reserve(size);
// Appends a copy of `payload` with no abort source attached. The new entry
// is either allocated into `_front` or emplaced at the back of `_list`.
// NOTE(review): the condition that selects the destination, and any size
// bookkeeping, are missing from this listing.
175 void push_back(
const T& payload) {
177 _front = std::make_unique<entry>(payload);
179 _list.emplace_back(payload);
// Appends `payload` by move with no abort source attached. The new entry is
// either allocated into `_front` or emplaced at the back of `_list`.
// NOTE(review): the condition that selects the destination, and any size
// bookkeeping, are missing from this listing.
186 void push_back(T&& payload) {
188 _front = std::make_unique<entry>(std::move(payload));
190 _list.emplace_back(std::move(payload));
// Appends `payload` by move and ties the new entry to abort source `as`,
// using the entry constructor that takes (payload, fifo, abort_source).
// If `as` has already been aborted, the abort callback is invoked on the
// payload directly; when OnAbort satisfies aborter_ex it is given
// std::nullopt as the abort reason.
// NOTE(review): make_back_abortable() passes
// as.abort_requested_exception_ptr() in the equivalent already-aborted case,
// so the reason seen by the callback differs between the two paths — confirm
// whether std::nullopt here is intentional.
// NOTE(review): the `else` branch, the early return and the condition that
// selects `_front` versus `_list` are missing from this listing.
197 void push_back(T&& payload, abort_source& as) {
198 if (as.abort_requested()) {
199 if constexpr (aborter_ex<OnAbort, T>) {
200 _on_abort(payload, std::nullopt);
207 _front = std::make_unique<entry>(std::move(payload), *
this, as);
209 _list.emplace_back(std::move(payload), *
this, as);
// Constructs a new element in place from `args` and returns a reference to
// it. A default-constructed entry is created first, either as `_front` or at
// the back of `_list`, and the payload is then emplaced into it with
// perfectly forwarded arguments. No abort source is attached here.
// NOTE(review): the condition that selects `_front` versus `_list`, and the
// original lines 220/225, are missing from this listing.
215 template<
typename... U>
216 T& emplace_back(U&&... args) {
218 _front = std::make_unique<entry>();
219 _front->payload.emplace(std::forward<U>(args)...);
221 return *_front->payload;
223 _list.emplace_back();
224 _list.back().payload.emplace(std::forward<U>(args)...);
226 return *_list.back().payload;
// Attaches abort source `as` to an entry that was added without one.
// `e` starts as the `_front` entry; if `_list` is non-empty a branch runs
// whose body is not visible here (presumably re-pointing `e` at
// `_list.back()` — confirm). The entry must not already have a subscription
// (asserted).
// The `aborter` lambda captures `this` and the raw entry pointer. When
// invoked it calls `_on_abort` on the payload (with the optional exception
// when OnAbort satisfies aborter_ex), clears the payload and calls
// drop_expired_front().
// If `as` has already been aborted, the lambda is invoked immediately with
// as.abort_requested_exception_ptr(). The lambda is also subscribed to `as`
// and the subscription stored in `e->sub`.
// NOTE(review): closing braces and any `return`/`else` separating the
// immediate call from the subscribe are missing from this listing.
235 void make_back_abortable(abort_source& as) {
236 entry* e = _front.get();
237 if (!_list.empty()) {
240 SEASTAR_ASSERT(!e->sub);
241 auto aborter = [
this, e] (
const std::optional<std::exception_ptr>& ex_opt)
noexcept {
242 if constexpr (aborter_ex<OnAbort, T>) {
243 _on_abort(*e->payload, ex_opt);
245 _on_abort(*e->payload);
247 e->payload = std::nullopt;
249 drop_expired_front();
251 if (as.abort_requested()) {
252 aborter(as.abort_requested_exception_ptr());
255 e->sub = as.subscribe(std::move(aborter));
// Noexcept removal of the first element. The only visible step is the call
// to drop_expired_front().
// NOTE(review): the lines that actually remove the element (original lines
// 261-266) are missing from this listing.
260 void pop_front() noexcept {
267 drop_expired_front();
Seastar API namespace.
Definition: abort_on_ebadf.hh:26