Seastar
High performance C++ framework for concurrent servers
condition-variable.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2016 ScyllaDB, Ltd.
20  */
21 
22 #pragma once
23 
24 #ifndef SEASTAR_MODULE
25 #include <boost/intrusive/list.hpp>
26 #include <chrono>
27 #include <exception>
28 #include <functional>
29 #endif
30 
31 #include <seastar/core/timer.hh>
32 #ifdef SEASTAR_COROUTINES_ENABLED
33 # include <seastar/core/coroutine.hh>
34 #endif
35 #include <seastar/core/loop.hh>
36 #include <seastar/util/modules.hh>
37 
38 namespace seastar {
39 
40 SEASTAR_MODULE_EXPORT_BEGIN
41 
44 
47 class broken_condition_variable : public std::exception {
48 public:
50  virtual const char* what() const noexcept;
51 };
52 
55 class condition_variable_timed_out : public std::exception {
56 public:
58  virtual const char* what() const noexcept;
59 };
60 
73 
75 private:
76  // the base for queue waiters. looks complicated, but this is
77  // to make it transparent once we add non-promise based nodes
78  struct waiter : public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
79  waiter() = default;
80  waiter(waiter&&) = default;
81  waiter(const waiter&) = delete;
82  waiter& operator=(const waiter&) = delete;
83  virtual ~waiter() = default;
84  void timeout() noexcept;
85 
86  virtual void signal() noexcept = 0;
87  virtual void set_exception(std::exception_ptr) noexcept = 0;
88  };
89 
90  struct promise_waiter : public waiter, public promise<> {
91  void signal() noexcept override {
92  set_value();
93  // note: we self-delete in either case we are woken
94  // up. See usage below: only the resulting future
95  // state is required once we've left the wait queue
96  delete this;
97  }
98  void set_exception(std::exception_ptr ep) noexcept override {
99  promise<>::set_exception(std::move(ep));
100  // see comment above
101  delete this;
102  }
103  };
104 
105 #ifdef SEASTAR_COROUTINES_ENABLED
106  struct [[nodiscard("must co_await a when() call")]] awaiter : public waiter {
107  condition_variable* _cv;
108  promise<> _p;
109 
110  awaiter(condition_variable* cv)
111  : _cv(cv)
112  {}
113 
114  void signal() noexcept override {
115  _p.set_value();
116  }
117  void set_exception(std::exception_ptr ep) noexcept override {
118  _p.set_exception(std::move(ep));
119  }
120  auto operator co_await() {
121  if (_cv->check_and_consume_signal()) {
122  return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
123  }
124  _cv->add_waiter(*this);
125  return ::seastar::internal::awaiter<false, void>(_p.get_future());
126  }
127  };
128 
129  template<typename Clock, typename Duration>
130  struct [[nodiscard("must co_await a when() call")]] timeout_awaiter : public awaiter, public timer<Clock> {
131  using my_type = timeout_awaiter<Clock, Duration>;
132  using time_point = std::chrono::time_point<Clock, Duration>;
133 
134  time_point _timeout;
135 
136  timeout_awaiter(condition_variable* cv, time_point timeout)
137  : awaiter(cv)
138  , _timeout(timeout)
139  {}
140  void signal() noexcept override {
141  this->cancel();
142  awaiter::signal();
143  }
144  void set_exception(std::exception_ptr ep) noexcept override {
145  this->cancel();
146  awaiter::set_exception(std::move(ep));
147  }
148  auto operator co_await() {
149  if (_cv->check_and_consume_signal()) {
150  return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
151  }
152  this->set_callback(std::bind(&waiter::timeout, this));
153  this->arm(_timeout);
154  return awaiter::operator co_await();
155  }
156  };
157 
158  template<typename Func, typename Base>
159  struct [[nodiscard("must co_await a when() call")]] predicate_awaiter : public Base {
160  Func _func;
161  template<typename... Args>
162  predicate_awaiter(Func func, Args&& ...args)
163  : Base(std::forward<Args>(args)...)
164  , _func(std::move(func))
165  {}
166  void signal() noexcept override {
167  if (_func()) {
168  Base::signal();
169  } else {
170  // must re-enter waiter queue
171  // this maintains "wait" version
172  // semantics of moving to back of queue
173  // if predicate fails
174  Base::_cv->add_waiter(*this);
175  }
176  }
177  auto operator co_await() {
178  if (_func()) {
179  return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
180  } else {
181  Base::_cv->check_and_consume_signal(); // clear out any signal state
182  return Base::operator co_await();
183  }
184  }
185  };
186 #endif
187 
188  boost::intrusive::list<waiter, boost::intrusive::constant_time_size<false>> _waiters;
189  std::exception_ptr _ex; //"broken" exception
190  bool _signalled = false; // set to true if signalled while no waiters
191 
192  void add_waiter(waiter&) noexcept;
193  void timeout(waiter&) noexcept;
194  bool wakeup_first() noexcept;
195  bool check_and_consume_signal() noexcept;
196 public:
200  condition_variable() noexcept = default;
201  condition_variable(condition_variable&& rhs) noexcept = default;
203 
209  future<> wait() noexcept {
210  if (check_and_consume_signal()) {
211  return make_ready_future();
212  }
213  auto* w = new promise_waiter;
214  auto f = w->get_future();
215  add_waiter(*w);
216  return f;
217  }
218 
226  template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration>
227  future<> wait(std::chrono::time_point<Clock, Duration> timeout) noexcept {
228  if (check_and_consume_signal()) {
229  return make_ready_future();
230  }
231  struct timeout_waiter : public promise_waiter, public timer<Clock> {};
232 
233  auto w = std::make_unique<timeout_waiter>();
234  auto f = w->get_future();
235 
236  w->set_callback(std::bind(&waiter::timeout, w.get()));
237  w->arm(timeout);
238  add_waiter(*w.release());
239  return f;
240  }
241 
249  template<typename Rep, typename Period>
250  future<> wait(std::chrono::duration<Rep, Period> timeout) noexcept {
251  return wait(timer<>::clock::now() + timeout);
252  }
253 
261  template<typename Pred>
262  SEASTAR_CONCEPT( requires std::is_invocable_r_v<bool, Pred> )
263  future<> wait(Pred&& pred) noexcept {
264  return do_until(std::forward<Pred>(pred), [this] {
265  return wait();
266  });
267  }
268 
278  template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration, typename Pred>
279  SEASTAR_CONCEPT( requires std::is_invocable_r_v<bool, Pred> )
280  future<> wait(std::chrono::time_point<Clock, Duration> timeout, Pred&& pred) noexcept {
281  return do_until(std::forward<Pred>(pred), [this, timeout] {
282  return wait(timeout);
283  });
284  }
285 
295  template<typename Rep, typename Period, typename Pred>
296  SEASTAR_CONCEPT( requires std::is_invocable_r_v<bool, Pred> )
297  future<> wait(std::chrono::duration<Rep, Period> timeout, Pred&& pred) noexcept {
298  return wait(timer<>::clock::now() + timeout, std::forward<Pred>(pred));
299  }
300 
301 #ifdef SEASTAR_COROUTINES_ENABLED
308  awaiter when() noexcept {
309  return awaiter{this};
310  }
311 
320  template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration>
321  timeout_awaiter<Clock, Duration> when(std::chrono::time_point<Clock, Duration> timeout) noexcept {
322  return timeout_awaiter<Clock, Duration>{this, timeout};
323  }
324 
333  template<typename Rep, typename Period>
334  auto when(std::chrono::duration<Rep, Period> timeout) noexcept {
335  return when(timer<>::clock::now() + timeout);
336  }
337 
346  template<typename Pred>
347  SEASTAR_CONCEPT( requires std::is_invocable_r_v<bool, Pred> )
348  auto when(Pred&& pred) noexcept {
349  return predicate_awaiter<Pred, awaiter>{std::forward<Pred>(pred), when()};
350  }
351 
362  template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration, typename Pred>
363  SEASTAR_CONCEPT( requires std::is_invocable_r_v<bool, Pred> )
364  auto when(std::chrono::time_point<Clock, Duration> timeout, Pred&& pred) noexcept {
365  return predicate_awaiter<Pred, timeout_awaiter<Clock, Duration>>{std::forward<Pred>(pred), when(timeout)};
366  }
367 
378  template<typename Rep, typename Period, typename Pred>
379  SEASTAR_CONCEPT( requires std::is_invocable_r_v<bool, Pred> )
380  auto when(std::chrono::duration<Rep, Period> timeout, Pred&& pred) noexcept {
381  return when(timer<>::clock::now() + timeout, std::forward<Pred>(pred));
382  }
383 
384 #endif
385 
388  bool has_waiters() const noexcept {
389  return !_waiters.empty();
390  }
391 
393  void signal() noexcept;
394 
396  void broadcast() noexcept;
397 
401  void broken() noexcept;
402 
403  void broken(std::exception_ptr) noexcept;
404 };
405 
407 
408 SEASTAR_MODULE_EXPORT_END
409 
410 }
Definition: condition-variable.hh:47
virtual const char * what() const noexcept
Reports the exception reason.
Definition: condition-variable.hh:55
virtual const char * what() const noexcept
Reports the exception reason.
Conditional variable.
Definition: condition-variable.hh:74
bool has_waiters() const noexcept
Definition: condition-variable.hh:388
future wait(std::chrono::time_point< Clock, Duration > timeout) noexcept
Definition: condition-variable.hh:227
void signal() noexcept
Notify variable and wake up a waiter if there is one.
future wait(std::chrono::duration< Rep, Period > timeout) noexcept
Definition: condition-variable.hh:250
condition_variable() noexcept=default
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
promise - allows a future value to be made available at a later time.
Definition: future.hh:926
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:982
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:990
Definition: timer.hh:84
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:339
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26