25#include <boost/intrusive/list.hpp>
31#include <seastar/core/future.hh>
32#include <seastar/core/coroutine.hh>
34#include <seastar/core/loop.hh>
35#include <seastar/util/modules.hh>
39SEASTAR_MODULE_EXPORT_BEGIN
49 virtual const char*
what() const noexcept;
57 virtual const char*
what() const noexcept;
77 struct waiter :
public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
79 waiter(waiter&&) =
default;
80 waiter(
const waiter&) =
delete;
81 waiter& operator=(
const waiter&) =
delete;
82 virtual ~waiter() =
default;
83 void timeout()
noexcept;
85 virtual void signal()
noexcept = 0;
86 virtual void set_exception(std::exception_ptr)
noexcept = 0;
89 struct promise_waiter :
public waiter,
public promise<> {
90 void signal()
noexcept override {
97 void set_exception(std::exception_ptr ep)
noexcept override {
104 struct [[nodiscard("must co_await a when() call")]] awaiter :
public waiter {
112 void signal()
noexcept override {
115 void set_exception(std::exception_ptr ep)
noexcept override {
118 auto operator co_await() {
119 if (_cv->check_and_consume_signal()) {
120 return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
122 _cv->add_waiter(*
this);
123 return ::seastar::internal::awaiter<false, void>(_p.
get_future());
127 template<
typename Clock,
typename Duration>
128 struct [[nodiscard("must co_await a when() call")]] timeout_awaiter :
public awaiter,
public timer<Clock> {
129 using my_type = timeout_awaiter<Clock, Duration>;
130 using time_point = std::chrono::time_point<Clock, Duration>;
138 void signal()
noexcept override {
142 void set_exception(std::exception_ptr ep)
noexcept override {
144 awaiter::set_exception(std::move(ep));
146 auto operator co_await() {
147 if (_cv->check_and_consume_signal()) {
148 return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
150 this->set_callback(std::bind(&waiter::timeout,
this));
152 return awaiter::operator
co_await();
156 template<
typename Func,
typename Base>
157 struct [[nodiscard("must co_await a when() call")]] predicate_awaiter :
public Base {
159 template<
typename... Args>
160 predicate_awaiter(Func func, Args&& ...args)
161 : Base(std::forward<Args>(args)...)
162 , _func(std::move(func))
164 void signal()
noexcept override {
173 Base::_cv->add_waiter(*
this);
176 Base::set_exception(std::current_exception());
179 auto operator co_await() {
182 return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
184 Base::_cv->check_and_consume_signal();
185 return Base::operator
co_await();
193 boost::intrusive::list<waiter, boost::intrusive::constant_time_size<false>> _waiters;
194 std::exception_ptr _ex;
195 bool _signalled =
false;
197 void add_waiter(waiter&)
noexcept;
198 void timeout(waiter&)
noexcept;
199 bool wakeup_first()
noexcept;
200 bool check_and_consume_signal()
noexcept;
215 if (check_and_consume_signal()) {
218 auto* w =
new promise_waiter;
219 auto f = w->get_future();
231 template<typename Clock = typename timer<>::clock,
typename Duration =
typename Clock::duration>
232 future<> wait(std::chrono::time_point<Clock, Duration> timeout)
noexcept {
233 if (check_and_consume_signal()) {
236 struct timeout_waiter :
public promise_waiter,
public timer<Clock> {};
238 auto w = std::make_unique<timeout_waiter>();
239 auto f = w->get_future();
241 w->set_callback(std::bind(&waiter::timeout, w.get()));
243 add_waiter(*w.release());
254 template<
typename Rep,
typename Period>
266 template<
typename Pred>
267 requires std::is_invocable_r_v<bool, Pred>
269 return do_until(std::forward<Pred>(pred), [
this] {
283 template<typename Clock = typename timer<>::clock,
typename Duration =
typename Clock::duration,
typename Pred>
284 requires std::is_invocable_r_v<bool, Pred>
285 future<> wait(std::chrono::time_point<Clock, Duration> timeout, Pred&& pred)
noexcept {
286 return do_until(std::forward<Pred>(pred), [
this, timeout] {
287 return wait(timeout);
300 template<
typename Rep,
typename Period,
typename Pred>
301 requires std::is_invocable_r_v<bool, Pred>
302 future<> wait(std::chrono::duration<Rep, Period> timeout, Pred&& pred)
noexcept {
313 return awaiter{
this};
324 template<typename Clock = typename timer<>::clock,
typename Duration =
typename Clock::duration>
325 timeout_awaiter<Clock, Duration>
when(std::chrono::time_point<Clock, Duration> timeout)
noexcept {
326 return timeout_awaiter<Clock, Duration>{
this, timeout};
337 template<
typename Rep,
typename Period>
338 auto when(std::chrono::duration<Rep, Period> timeout)
noexcept {
350 template<
typename Pred>
351 requires std::is_invocable_r_v<bool, Pred>
352 auto when(Pred&& pred)
noexcept {
353 return predicate_awaiter<Pred, awaiter>{std::forward<Pred>(pred), when()};
366 template<typename Clock = typename timer<>::clock,
typename Duration =
typename Clock::duration,
typename Pred>
367 requires std::is_invocable_r_v<bool, Pred>
368 auto when(std::chrono::time_point<Clock, Duration> timeout, Pred&& pred)
noexcept {
369 return predicate_awaiter<Pred, timeout_awaiter<Clock, Duration>>{std::forward<Pred>(pred), when(timeout)};
382 template<
typename Rep,
typename Period,
typename Pred>
383 requires std::is_invocable_r_v<bool, Pred>
384 auto when(std::chrono::duration<Rep, Period> timeout, Pred&& pred)
noexcept {
391 return !_waiters.empty();
398 void broadcast() noexcept;
403 void broken() noexcept;
405 void broken(
std::exception_ptr) noexcept;
410SEASTAR_MODULE_EXPORT_END
Definition: condition-variable.hh:46
virtual const char * what() const noexcept
Reports the exception reason.
Definition: condition-variable.hh:54
virtual const char * what() const noexcept
Reports the exception reason.
Conditional variable.
Definition: condition-variable.hh:73
timeout_awaiter< Clock, Duration > when(std::chrono::time_point< Clock, Duration > timeout) noexcept
Definition: condition-variable.hh:325
auto when(std::chrono::duration< Rep, Period > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:384
future wait(Pred &&pred) noexcept
Definition: condition-variable.hh:268
bool has_waiters() const noexcept
Definition: condition-variable.hh:390
awaiter when() noexcept
Definition: condition-variable.hh:312
future wait(std::chrono::time_point< Clock, Duration > timeout) noexcept
Definition: condition-variable.hh:232
auto when(Pred &&pred) noexcept
Definition: condition-variable.hh:352
void signal() noexcept
Notify variable and wake up a waiter if there is one.
auto when(std::chrono::duration< Rep, Period > timeout) noexcept
Definition: condition-variable.hh:338
future wait(std::chrono::duration< Rep, Period > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:302
auto when(std::chrono::time_point< Clock, Duration > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:368
future wait(std::chrono::time_point< Clock, Duration > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:285
future wait(std::chrono::duration< Rep, Period > timeout) noexcept
Definition: condition-variable.hh:255
condition_variable() noexcept=default
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
promise - allows a future value to be made available at a later time.
Definition: future.hh:913
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:969
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:977
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1922
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1905
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1928
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:339
Seastar API namespace.
Definition: abort_on_ebadf.hh:26