25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
28#include <seastar/core/abortable_fifo.hh>
29#include <seastar/core/timed_out_error.hh>
30#include <seastar/core/abort_on_expiry.hh>
31#include <seastar/util/modules.hh>
46 template <
typename U>
constexpr static bool check(
decltype(&U::broken)) {
return true; }
47 template <
typename U>
constexpr static bool check(...) {
return false; }
50 constexpr static bool value = check<T>(
nullptr);
55 template <
typename U>
constexpr static bool check(
decltype(&U::aborted)) {
return true; }
56 template <
typename U>
constexpr static bool check(...) {
return false; }
59 constexpr static bool value = check<T>(
nullptr);
65SEASTAR_MODULE_EXPORT_BEGIN
71 virtual const char*
what() const noexcept;
81 virtual const char*
what() const noexcept;
91 virtual const char*
what() const noexcept;
108 virtual const char*
what() const noexcept;
115 virtual const char*
what() const noexcept;
122 virtual const char*
what() const noexcept;
153template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
159 using exception_factory = ExceptionFactory;
162 std::exception_ptr _ex;
166 std::optional<abort_on_expiry<clock>>
timer;
167 entry(
promise<>&& pr_,
size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
169 struct expiry_handler {
171 void operator()(entry& e)
noexcept {
174 e.pr.set_exception(sem.timeout());
178 }
else if (sem._ex) {
179 e.pr.set_exception(sem._ex);
181 if constexpr (internal::has_aborted<exception_factory>::value) {
183 e.pr.set_exception(
static_cast<exception_factory
>(sem).aborted());
193 internal::abortable_fifo<entry, expiry_handler> _wait_list;
195#ifdef SEASTAR_SEMAPHORE_DEBUG
201 used_flag() =
default;
202 used_flag(used_flag&& o)
noexcept {
203 assert(!_used &&
"semaphore cannot be moved after it has been used");
205 used_flag& operator=(used_flag&& o)
noexcept {
207 assert(!_used && !o._used &&
"semaphore cannot be moved after it has been used");
211 void use()
noexcept {
217 void use()
noexcept {}
221 [[no_unique_address]] used_flag _used;
223 bool has_available_units(
size_t nr)
const noexcept {
224 return _count >= 0 && (
static_cast<size_t>(_count) >= nr);
226 bool may_proceed(
size_t nr)
const noexcept {
227 return has_available_units(nr) && _wait_list.empty();
232 return std::numeric_limits<
decltype(_count)>::max();
240 basic_semaphore(
size_t count)
noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
241 : exception_factory()
243 _wait_list(expiry_handler{*
this})
245 basic_semaphore(
size_t count, exception_factory&& factory)
noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
246 : exception_factory(std::move(factory))
248 , _wait_list(expiry_handler{*
this})
250 static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
259 : exception_factory(other)
260 , _count(other._count)
261 , _ex(std::exchange(other._ex, std::exception_ptr()))
262 , _wait_list(expiry_handler{*
this})
263 , _used(std::move(other._used))
266 assert(other._wait_list.empty());
278 assert(_wait_list.empty());
279 assert(other._wait_list.empty());
280 if (
this != &other) {
281 exception_factory::operator=(other);
282 _count = other._count;
283 _ex = std::exchange(other._ex, std::exception_ptr());
284 _used = std::move(other._used);
300 return wait(time_point::max(), nr);
317 if (may_proceed(nr)) {
319 return make_ready_future<>();
325 entry& e = _wait_list.emplace_back(
promise<>(), nr);
326 auto f = e.pr.get_future();
327 if (timeout != time_point::max()) {
328 e.timer.emplace(timeout);
330 _wait_list.make_back_abortable(as);
353 if (may_proceed(nr)) {
355 return make_ready_future<>();
361 entry& e = _wait_list.emplace_back(
promise<>(), nr);
363 auto f = e.pr.get_future();
364 _wait_list.make_back_abortable(as);
401 while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
402 auto& x = _wait_list.front();
405 _wait_list.pop_front();
436 if (may_proceed(nr)) {
446 size_t current() const noexcept {
return std::max(_count, ssize_t(0)); }
455 size_t waiters() const noexcept {
return _wait_list.size(); }
461 std::exception_ptr ep;
462 if constexpr (internal::has_broken<exception_factory>::value) {
464 ep = std::make_exception_ptr(exception_factory::broken());
471 broken(std::move(ep));
477 template <
typename Exception>
478 void broken(
const Exception& ex)
noexcept {
479 broken(std::make_exception_ptr(ex));
485 void broken(std::exception_ptr ex)
noexcept;
489 _wait_list.reserve(n);
492SEASTAR_MODULE_EXPORT_END
494template<
typename ExceptionFactory,
typename Clock>
498 static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
501 while (!_wait_list.empty()) {
502 auto& x = _wait_list.front();
503 x.pr.set_exception(xp);
504 _wait_list.pop_front();
508SEASTAR_MODULE_EXPORT_BEGIN
510template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
525 _n = std::exchange(o._n, 0);
542 throw std::invalid_argument(
"Cannot take more units than those protected by the semaphore");
559 return std::exchange(_n, 0);
571 throw std::invalid_argument(
"Cannot take more units than those protected by the semaphore");
582 assert(other._sem == _sem);
583 _n += other.release();
592 explicit operator bool() const noexcept {
614template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
615future<semaphore_units<ExceptionFactory, Clock>>
616get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units)
noexcept {
617 return sem.wait(units).then([&sem, units] {
618 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
637template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
638future<semaphore_units<ExceptionFactory, Clock>>
639get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout)
noexcept {
640 return sem.wait(timeout, units).then([&sem, units] {
641 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
661template<
typename ExceptionFactory,
typename Clock>
662future<semaphore_units<ExceptionFactory, Clock>>
663get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::duration timeout)
noexcept {
664 return sem.wait(timeout, units).then([&sem, units] {
665 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
685template<
typename ExceptionFactory,
typename Clock>
686future<semaphore_units<ExceptionFactory, Clock>>
687get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units, abort_source& as)
noexcept {
688 return sem.wait(as, units).then([&sem, units] {
689 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
710template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
711std::optional<semaphore_units<ExceptionFactory, Clock>>
712try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units)
noexcept {
713 if (!sem.try_wait(units)) {
716 return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
731template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
732semaphore_units<ExceptionFactory, Clock>
759template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
761futurize_t<std::invoke_result_t<Func>>
762with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units, Func&& func)
noexcept {
763 return get_units(sem, units).then([func = std::forward<Func>(func)] (
auto units)
mutable {
764 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
793template <
typename ExceptionFactory,
typename Clock,
typename Func>
795futurize_t<std::invoke_result_t<Func>>
796with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func)
noexcept {
797 return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (
auto units)
mutable {
798 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
807SEASTAR_MODULE_EXPORT_END
Definition: abort_source.hh:48
Definition: abort_source.hh:58
Counted resource guard.
Definition: semaphore.hh:154
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:240
future wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:351
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
size_t current() const noexcept
Definition: semaphore.hh:446
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:299
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:488
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:315
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:384
basic_semaphore & operator=(basic_semaphore &&other) noexcept(std::is_nothrow_move_assignable_v< exception_factory >)
Definition: semaphore.hh:276
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:416
ssize_t available_units() const noexcept
Definition: semaphore.hh:452
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:455
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:434
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:396
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:478
basic_semaphore(basic_semaphore &&other) noexcept(std::is_nothrow_move_constructible_v< exception_factory >)
Definition: semaphore.hh:258
void broken() noexcept
Definition: semaphore.hh:460
Definition: semaphore.hh:111
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:68
virtual const char * what() const noexcept
Reports the exception reason.
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: semaphore.hh:118
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:104
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:88
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:78
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:511
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:581
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:587
semaphore_units split(size_t units)
Definition: semaphore.hh:569
size_t release() noexcept
Definition: semaphore.hh:558
size_t return_units(size_t units)
Definition: semaphore.hh:540
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:549
Definition: timed_out_error.hh:34
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:733
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:497
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: semaphore.hh:127
Definition: semaphore.hh:98