24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
30 #include <seastar/core/abortable_fifo.hh>
31 #include <seastar/core/timed_out_error.hh>
32 #include <seastar/core/abort_on_expiry.hh>
// Detection overload: its parameter type is `decltype(&U::broken)`, so it
// is only viable when `&U::broken` is a well-formed expression. When that
// expression is ill-formed, substitution fails and this overload is dropped
// from the candidate set. Returns true.
40 template <
typename U> constexpr
static bool check(decltype(&U::broken)) {
return true; }
// Fallback overload: accepts any argument through a C-style variadic
// parameter, which ranks below every other conversion in overload
// resolution, so it is chosen only when the `&U::broken` overload is not
// viable. Returns false.
41 template <
typename U> constexpr
static bool check(...) {
return false; }
// Result of the detection: calls check<T> with a null pointer constant, so
// the value is true when the `&T::broken` overload is viable and false
// otherwise. `T` is declared by an enclosing scope outside this excerpt.
44 constexpr
static bool value = check<T>(
nullptr);
// Detection overload: its parameter type is `decltype(&U::aborted)`, so it
// is only viable when `&U::aborted` is a well-formed expression. When that
// expression is ill-formed, substitution fails and this overload is dropped
// from the candidate set. Returns true.
49 template <
typename U> constexpr
static bool check(decltype(&U::aborted)) {
return true; }
// Fallback overload: accepts any argument through a C-style variadic
// parameter, which ranks below every other conversion in overload
// resolution, so it is chosen only when the `&U::aborted` overload is not
// viable. Returns false.
50 template <
typename U> constexpr
static bool check(...) {
return false; }
// Result of the detection: calls check<T> with a null pointer constant, so
// the value is true when the `&T::aborted` overload is viable and false
// otherwise. `T` is declared by an enclosing scope outside this excerpt.
53 constexpr
static bool value = check<T>(
nullptr);
// NOTE(review): the six declarations below come from six different classes
// whose headers are not part of this excerpt; each is declared here only,
// with no body visible.

// Reports the exception reason.
65 virtual const char*
what() const noexcept;
// Reports the exception reason.
75 virtual const char*
what() const noexcept;
// Reports the exception reason.
85 virtual const char*
what() const noexcept;
// Reports the exception reason.
102 virtual const char*
what() const noexcept;
// Reports the exception reason.
109 virtual const char*
what() const noexcept;
// Reports the exception reason.
116 virtual const char*
what() const noexcept;
147 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
153 using exception_factory = ExceptionFactory;
156 std::exception_ptr _ex;
// Optional expiry helper for this entry. It is left disengaged by the
// entry constructor and is emplaced by the timed wait path only when the
// requested timeout is not time_point::max().
160 std::optional<abort_on_expiry<clock>>
timer;
// Builds a wait-list entry.
//   pr_  promise for this entry; ownership is moved into `pr`.
//   nr_  value stored in `nr`; the wake-up loop compares it against the
//        available units (`has_available_units(_wait_list.front().nr)`).
// The body is empty: only `pr` and `nr` are initialized here, so `timer`
// starts out disengaged.
161 entry(
promise<>&& pr_,
size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
163 struct expiry_handler {
165 void operator()(entry& e) noexcept {
168 e.pr.set_exception(sem.timeout());
172 }
else if (sem._ex) {
173 e.pr.set_exception(sem._ex);
175 if constexpr (internal::has_aborted<exception_factory>::value) {
177 e.pr.set_exception(
static_cast<exception_factory
>(sem).aborted());
187 internal::abortable_fifo<entry, expiry_handler> _wait_list;
188 bool has_available_units(
size_t nr)
const noexcept {
189 return _count >= 0 && (
static_cast<size_t>(_count) >= nr);
191 bool may_proceed(
size_t nr)
const noexcept {
192 return has_available_units(nr) && _wait_list.empty();
197 return std::numeric_limits<decltype(_count)>::max();
205 basic_semaphore(
size_t count) noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
206 : exception_factory()
208 _wait_list(expiry_handler{*this})
210 basic_semaphore(
size_t count, exception_factory&& factory) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
211 : exception_factory(std::move(factory))
213 , _wait_list(expiry_handler{*this})
215 static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
228 return wait(time_point::max(), nr);
244 if (may_proceed(nr)) {
246 return make_ready_future<>();
252 entry& e = _wait_list.emplace_back(
promise<>(), nr);
253 auto f = e.pr.get_future();
254 if (timeout != time_point::max()) {
255 e.timer.emplace(timeout);
257 _wait_list.make_back_abortable(as);
279 if (may_proceed(nr)) {
281 return make_ready_future<>();
287 entry& e = _wait_list.emplace_back(
promise<>(), nr);
289 auto f = e.pr.get_future();
290 _wait_list.make_back_abortable(as);
327 while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
328 auto& x = _wait_list.front();
331 _wait_list.pop_front();
360 if (may_proceed(nr)) {
// Returns the counter value as an unsigned quantity. `_count` is signed and
// may be negative; std::max against ssize_t(0) reports 0 in that case
// instead of letting a negative value wrap around in the size_t return.
370 size_t current() const noexcept {
return std::max(_count, ssize_t(0)); }
// Returns the current number of waiters, i.e. the size of `_wait_list`.
379 size_t waiters() const noexcept {
return _wait_list.size(); }
385 std::exception_ptr ep;
386 if constexpr (internal::has_broken<exception_factory>::value) {
388 ep = std::make_exception_ptr(exception_factory::broken());
395 broken(std::move(ep));
401 template <
typename Exception>
402 void broken(
const Exception& ex) noexcept {
403 broken(std::make_exception_ptr(ex));
409 void broken(std::exception_ptr ex) noexcept;
413 _wait_list.reserve(n);
417 template<
typename ExceptionFactory,
typename Clock>
421 static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
424 while (!_wait_list.empty()) {
425 auto& x = _wait_list.front();
426 x.pr.set_exception(xp);
427 _wait_list.pop_front();
431 template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
444 _n = std::exchange(o._n, 0);
460 throw std::invalid_argument(
"Cannot take more units than those protected by the semaphore");
477 return std::exchange(_n, 0);
489 throw std::invalid_argument(
"Cannot take more units than those protected by the semaphore");
500 assert(other._sem == _sem);
501 _n += other.release();
510 explicit operator bool() const noexcept {
532 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
533 future<semaphore_units<ExceptionFactory, Clock>>
534 get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units) noexcept {
535 return sem.wait(units).then([&sem, units] {
536 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
555 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
556 future<semaphore_units<ExceptionFactory, Clock>>
557 get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) noexcept {
558 return sem.wait(timeout, units).then([&sem, units] {
559 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
579 template<
typename ExceptionFactory,
typename Clock>
580 future<semaphore_units<ExceptionFactory, Clock>>
581 get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
582 return sem.wait(timeout, units).then([&sem, units] {
583 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
603 template<
typename ExceptionFactory,
typename Clock>
604 future<semaphore_units<ExceptionFactory, Clock>>
605 get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units, abort_source& as) noexcept {
606 return sem.wait(as, units).then([&sem, units] {
607 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
628 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
629 std::optional<semaphore_units<ExceptionFactory, Clock>>
630 try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units) noexcept {
631 if (!sem.try_wait(units)) {
634 return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
649 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
650 semaphore_units<ExceptionFactory, Clock>
677 template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
679 futurize_t<std::invoke_result_t<Func>>
680 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units, Func&& func) noexcept {
681 return get_units(sem, units).then([func = std::forward<Func>(func)] (
auto units)
mutable {
682 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
711 template <
typename ExceptionFactory,
typename Clock,
typename Func>
713 futurize_t<std::invoke_result_t<Func>>
714 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem,
size_t units,
typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
715 return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (
auto units)
mutable {
716 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
Definition: abort_source.hh:41
Definition: abort_source.hh:51
Counted resource guard.
Definition: semaphore.hh:148
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:205
future wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:278
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:196
size_t current() const noexcept
Definition: semaphore.hh:370
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:227
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:412
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:243
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:310
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:342
ssize_t available_units() const noexcept
Definition: semaphore.hh:376
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:379
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:359
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:322
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:402
void broken() noexcept
Definition: semaphore.hh:384
Definition: semaphore.hh:105
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:62
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:112
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:98
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:82
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:72
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:432
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:499
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:505
semaphore_units split(size_t units)
Definition: semaphore.hh:487
size_t release() noexcept
Definition: semaphore.hh:476
size_t return_units(size_t units)
Definition: semaphore.hh:458
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:467
Definition: timed_out_error.hh:29
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:651
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:420
future< T... > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:2068
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
Definition: semaphore.hh:121
Definition: semaphore.hh:92