24#include <seastar/core/abort_source.hh>
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/util/modules.hh>
40template <
typename Aborter,
typename T>
41concept aborter_ex = std::is_nothrow_invocable_r_v<void, Aborter, T&, const std::optional<std::exception_ptr>&>;
43template <
typename Aborter,
typename T>
44concept aborter = std::is_nothrow_invocable_r_v<void, Aborter, T&> || aborter_ex<Aborter, T>;
47template<
typename... T>
49 void operator()(T...) noexcept {};
61template <
typename T,
typename OnAbort = noop_aborter<T>>
62requires aborter<OnAbort, T>
66 std::optional<T> payload;
67 optimized_optional<abort_source::subscription> sub;
68 entry(T&& payload_) : payload(
std::move(payload_)) {}
69 entry(
const T& payload_) : payload(payload_) {}
70 entry(T payload_, abortable_fifo& ef, abort_source& as)
71 : payload(
std::move(payload_))
72 , sub(as.subscribe([this, &ef] (const
std::optional<
std::exception_ptr>& ex_opt) noexcept {
73 if constexpr (aborter_ex<OnAbort, T>) {
74 ef._on_abort(*payload, ex_opt);
76 ef._on_abort(*payload);
78 payload = std::nullopt;
80 ef.drop_expired_front();
83 entry(entry&& x) =
delete;
84 entry(
const entry& x) =
delete;
91 std::unique_ptr<entry> _front;
94 chunked_fifo<entry> _list;
99 void drop_expired_front() noexcept {
100 while (!_list.empty() && !_list.front().payload) {
103 if (_front && !_front->payload) {
108 abortable_fifo() noexcept = default;
109 abortable_fifo(OnAbort on_abort) noexcept(
std::is_nothrow_move_constructible_v<OnAbort>) : _on_abort(
std::move(on_abort)) {}
111 abortable_fifo(abortable_fifo&& o) noexcept
112 : abortable_fifo(std::move(o._on_abort)) {
114 assert(o._size == 0);
117 abortable_fifo& operator=(abortable_fifo&& o)
noexcept {
119 this->~abortable_fifo();
120 new (
this) abortable_fifo(std::move(o));
130 bool empty() const noexcept {
135 explicit operator bool() const noexcept {
141 T& front() noexcept {
143 return *_front->payload;
145 return *_list.front().payload;
150 const T& front() const noexcept {
152 return *_front->payload;
154 return *_list.front().payload;
160 size_t size() const noexcept {
168 void reserve(
size_t size) {
169 return _list.reserve(size);
174 void push_back(
const T& payload) {
176 _front = std::make_unique<entry>(payload);
178 _list.emplace_back(payload);
185 void push_back(T&& payload) {
187 _front = std::make_unique<entry>(std::move(payload));
189 _list.emplace_back(std::move(payload));
196 void push_back(T&& payload, abort_source& as) {
197 if (as.abort_requested()) {
198 if constexpr (aborter_ex<OnAbort, T>) {
199 _on_abort(payload, std::nullopt);
206 _front = std::make_unique<entry>(std::move(payload), *
this, as);
208 _list.emplace_back(std::move(payload), *
this, as);
214 template<
typename... U>
215 T& emplace_back(U&&... args) {
217 _front = std::make_unique<entry>();
218 _front->payload.emplace(std::forward<U>(args)...);
220 return *_front->payload;
222 _list.emplace_back();
223 _list.back().payload.emplace(std::forward<U>(args)...);
225 return *_list.back().payload;
234 void make_back_abortable(abort_source& as) {
235 entry* e = _front.get();
236 if (!_list.empty()) {
240 auto aborter = [
this, e] (
const std::optional<std::exception_ptr>& ex_opt)
noexcept {
241 if constexpr (aborter_ex<OnAbort, T>) {
242 _on_abort(*e->payload, ex_opt);
244 _on_abort(*e->payload);
246 e->payload = std::nullopt;
248 drop_expired_front();
250 if (as.abort_requested()) {
251 aborter(as.abort_requested_exception_ptr());
254 e->sub = as.subscribe(std::move(aborter));
259 void pop_front() noexcept {
266 drop_expired_front();
Seastar API namespace.
Definition: abort_on_ebadf.hh:26