25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
28#include <seastar/core/abortable_fifo.hh>
29#include <seastar/core/timed_out_error.hh>
30#include <seastar/core/abort_on_expiry.hh>
31#include <seastar/util/modules.hh>
/// Detection overload that is viable only when `&U::broken` is a well-formed
/// expression; otherwise substitution fails and it drops out of the overload set.
/// The parameter is unnamed and ignored: `value` passes `nullptr` purely to
/// steer overload resolution, and this overload outranks the variadic
/// `check(...)` whenever it is viable.
template <typename U>
constexpr static bool check(decltype(&U::broken)) {
    return true;
}
/// Variadic fallback of the detection pair: accepts any argument and reports
/// `false`. An ellipsis conversion ranks below every other conversion, so this
/// overload is chosen only when the `decltype(&U::broken)` overload is not viable.
template <typename U>
constexpr static bool check(...) {
    return false;
}
50 constexpr static bool value = check<T>(
nullptr);
/// Detection overload that is viable only when `&U::aborted` is a well-formed
/// expression; otherwise substitution fails and it drops out of the overload set.
/// The parameter is unnamed and ignored: `value` passes `nullptr` purely to
/// steer overload resolution, and this overload outranks the variadic
/// `check(...)` whenever it is viable.
template <typename U>
constexpr static bool check(decltype(&U::aborted)) {
    return true;
}
/// Variadic fallback of the detection pair: accepts any argument and reports
/// `false`. An ellipsis conversion ranks below every other conversion, so this
/// overload is chosen only when the `decltype(&U::aborted)` overload is not viable.
template <typename U>
constexpr static bool check(...) {
    return false;
}
59 constexpr static bool value = check<T>(
nullptr);
65SEASTAR_MODULE_EXPORT_BEGIN
71 virtual const char*
what() const noexcept;
81 virtual const char*
what() const noexcept;
91 virtual const char*
what() const noexcept;
108 virtual const char*
what() const noexcept;
115 virtual const char*
what() const noexcept;
122 virtual const char*
what() const noexcept;
153template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
159 using exception_factory = ExceptionFactory;
162 std::exception_ptr _ex;
166 std::optional<abort_on_expiry<clock>>
timer;
167 entry(
promise<>&& pr_,
size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
169 std::exception_ptr get_timeout_exception() {
171 return std::make_exception_ptr(this->timeout());
176 std::exception_ptr get_aborted_exception() {
177 if constexpr (internal::has_aborted<exception_factory>::value) {
179 return std::make_exception_ptr(this->aborted());
187 struct expiry_handler {
189 void operator()(entry& e,
const std::optional<std::exception_ptr>& ex)
noexcept {
191 e.pr.set_exception(sem.get_timeout_exception());
193 e.pr.set_exception(*ex);
194 }
else if (sem._ex) {
195 e.pr.set_exception(sem._ex);
197 e.pr.set_exception(sem.get_aborted_exception());
201 internal::abortable_fifo<entry, expiry_handler> _wait_list;
203#ifdef SEASTAR_SEMAPHORE_DEBUG
209 used_flag() =
default;
210 used_flag(used_flag&& o)
noexcept {
211 assert(!_used &&
"semaphore cannot be moved after it has been used");
213 used_flag& operator=(used_flag&& o)
noexcept {
215 assert(!_used && !o._used &&
"semaphore cannot be moved after it has been used");
219 void use()
noexcept {
/// No-op variant of `use()`: the body is empty, so calling it has no effect.
// NOTE(review): presumably the counterpart compiled when SEASTAR_SEMAPHORE_DEBUG
// is not defined -- the enclosing preprocessor branch is not visible here; confirm.
void use() noexcept {}
229 [[no_unique_address]] used_flag _used;
231 bool has_available_units(
size_t nr)
const noexcept {
232 return _count >= 0 && (
static_cast<size_t>(_count) >= nr);
234 bool may_proceed(
size_t nr)
const noexcept {
235 return has_available_units(nr) && _wait_list.empty();
240 return std::numeric_limits<
decltype(_count)>::max();
248 basic_semaphore(
size_t count)
noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
249 : exception_factory()
251 _wait_list(expiry_handler{*
this})
253 basic_semaphore(
size_t count, exception_factory&& factory)
noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
254 : exception_factory(std::move(factory))
256 , _wait_list(expiry_handler{*
this})
258 static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
267 : exception_factory(other)
268 , _count(other._count)
269 , _ex(std::exchange(other._ex, std::exception_ptr()))
270 , _wait_list(expiry_handler{*
this})
271 , _used(std::move(other._used))
274 assert(other._wait_list.empty());
286 assert(_wait_list.empty());
287 assert(other._wait_list.empty());
288 if (
this != &other) {
289 exception_factory::operator=(other);
290 _count = other._count;
291 _ex = std::exchange(other._ex, std::exception_ptr());
292 _used = std::move(other._used);
308 return wait(time_point::max(), nr);
325 if (may_proceed(nr)) {
327 return make_ready_future<>();
336 entry& e = _wait_list.emplace_back(
promise<>(), nr);
337 auto f = e.pr.get_future();
338 if (timeout != time_point::max()) {
339 e.timer.emplace(timeout);
341 _wait_list.make_back_abortable(as);
364 if (may_proceed(nr)) {
366 return make_ready_future<>();
375 entry& e = _wait_list.emplace_back(
promise<>(), nr);
377 auto f = e.pr.get_future();
378 _wait_list.make_back_abortable(as);
415 while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
416 auto& x = _wait_list.front();
419 _wait_list.pop_front();
450 if (may_proceed(nr)) {
460 size_t current() const noexcept {
return std::max(_count, ssize_t(0)); }
469 size_t waiters() const noexcept {
return _wait_list.size(); }
475 std::exception_ptr ep;
476 if constexpr (internal::has_broken<exception_factory>::value) {
478 ep = std::make_exception_ptr(exception_factory::broken());
485 broken(std::move(ep));
491 template <
typename Exception>
492 void broken(
const Exception& ex)
noexcept {
493 broken(std::make_exception_ptr(ex));
499 void broken(std::exception_ptr ex)
noexcept;
503 _wait_list.reserve(n);
506SEASTAR_MODULE_EXPORT_END
508template<
typename ExceptionFactory,
typename Clock>
512 static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
515 while (!_wait_list.empty()) {
516 auto& x = _wait_list.front();
517 x.pr.set_exception(xp);
518 _wait_list.pop_front();
522SEASTAR_MODULE_EXPORT_BEGIN
524template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
539 _n = std::exchange(o._n, 0);
556 throw std::invalid_argument(
"Cannot take more units than those protected by the semaphore");
573 return std::exchange(_n, 0);
585 throw std::invalid_argument(
"Cannot take more units than those protected by the semaphore");
596 assert(other._sem == _sem);
597 _n += other.release();
606 explicit operator bool() const noexcept {
628template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
629future<semaphore_units<ExceptionFactory, Clock>>
630get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units)
noexcept {
631 return sem.wait(units).then([&sem, units] {
632 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
651template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
652future<semaphore_units<ExceptionFactory, Clock>>
653get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout)
noexcept {
654 return sem.wait(timeout, units).then([&sem, units] {
655 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
675template<
typename ExceptionFactory,
typename Clock>
676future<semaphore_units<ExceptionFactory, Clock>>
677get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::duration timeout)
noexcept {
678 return sem.wait(timeout, units).then([&sem, units] {
679 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
699template<
typename ExceptionFactory,
typename Clock>
700future<semaphore_units<ExceptionFactory, Clock>>
701get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units, abort_source& as)
noexcept {
702 return sem.wait(as, units).then([&sem, units] {
703 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
724template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
725std::optional<semaphore_units<ExceptionFactory, Clock>>
726try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units)
noexcept {
727 if (!sem.try_wait(units)) {
730 return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
745template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
746semaphore_units<ExceptionFactory, Clock>
773template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
775futurize_t<std::invoke_result_t<Func>>
776with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units, Func&& func)
noexcept {
777 return get_units(sem, units).then([func = std::forward<Func>(func)] (
auto units)
mutable {
778 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
807template <
typename ExceptionFactory,
typename Clock,
typename Func>
809futurize_t<std::invoke_result_t<Func>>
810with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func)
noexcept {
811 return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (
auto units)
mutable {
812 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
821SEASTAR_MODULE_EXPORT_END
Definition: abort_source.hh:48
Definition: abort_source.hh:58
bool abort_requested() const noexcept
Returns whether an abort has been requested.
Definition: abort_source.hh:200
Counted resource guard.
Definition: semaphore.hh:154
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:248
future<> wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:362
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:239
size_t current() const noexcept
Definition: semaphore.hh:460
future<> wait(size_t nr=1) noexcept
Definition: semaphore.hh:307
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:502
future<> wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:323
future<> wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:398
basic_semaphore & operator=(basic_semaphore &&other) noexcept(std::is_nothrow_move_assignable_v< exception_factory >)
Definition: semaphore.hh:284
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:430
ssize_t available_units() const noexcept
Definition: semaphore.hh:466
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:469
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:448
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:410
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:492
basic_semaphore(basic_semaphore &&other) noexcept(std::is_nothrow_move_constructible_v< exception_factory >)
Definition: semaphore.hh:266
void broken() noexcept
Definition: semaphore.hh:474
Definition: semaphore.hh:111
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:68
virtual const char * what() const noexcept
Reports the exception reason.
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
Definition: semaphore.hh:118
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:104
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:88
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:78
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:525
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:595
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:601
semaphore_units split(size_t units)
Definition: semaphore.hh:583
size_t release() noexcept
Definition: semaphore.hh:572
size_t return_units(size_t units)
Definition: semaphore.hh:554
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:563
Definition: timed_out_error.hh:34
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:747
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:511
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1928
future<> now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: semaphore.hh:127
Definition: semaphore.hh:98