Seastar
High performance C++ framework for concurrent servers
condition-variable.hh
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2016 ScyllaDB, Ltd.
 */
21
22#pragma once
23
24#ifndef SEASTAR_MODULE
25#include <boost/intrusive/list.hpp>
26#include <chrono>
27#include <exception>
28#include <functional>
29#endif
30
31#include <seastar/core/future.hh>
32#include <seastar/core/coroutine.hh>
33#include <seastar/core/timer.hh>
34#include <seastar/core/loop.hh>
35#include <seastar/util/modules.hh>
36
37namespace seastar {
38
39SEASTAR_MODULE_EXPORT_BEGIN
40
43
/// Exception type for a condition variable that has been marked broken.
///
/// Derives from std::exception. Only the declaration of what() is visible in
/// this header; its definition (and therefore the message text) lives
/// elsewhere.
/// NOTE(review): presumably this is what condition_variable::broken() delivers
/// to pending waiters -- confirm against the implementation file.
class broken_condition_variable : public std::exception {
public:
    /// Reports the exception reason.
    const char* what() const noexcept override;
};
51
/// Exception type for a timed wait on a condition variable that expired.
///
/// Derives from std::exception. Only the declaration of what() is visible in
/// this header; its definition (and therefore the message text) lives
/// elsewhere.
/// NOTE(review): presumably raised through the waiter timeout callback used by
/// the timed wait()/when() overloads of condition_variable -- confirm against
/// the implementation file.
class condition_variable_timed_out : public std::exception {
public:
    /// Reports the exception reason.
    const char* what() const noexcept override;
};
59
72
74private:
75 // the base for queue waiters. looks complicated, but this is
76 // to make it transparent once we add non-promise based nodes
77 struct waiter : public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
78 waiter() = default;
79 waiter(waiter&&) = default;
80 waiter(const waiter&) = delete;
81 waiter& operator=(const waiter&) = delete;
82 virtual ~waiter() = default;
83 void timeout() noexcept;
84
85 virtual void signal() noexcept = 0;
86 virtual void set_exception(std::exception_ptr) noexcept = 0;
87 };
88
89 struct promise_waiter : public waiter, public promise<> {
90 void signal() noexcept override {
91 set_value();
92 // note: we self-delete in either case we are woken
93 // up. See usage below: only the resulting future
94 // state is required once we've left the wait queue
95 delete this;
96 }
97 void set_exception(std::exception_ptr ep) noexcept override {
98 promise<>::set_exception(std::move(ep));
99 // see comment above
100 delete this;
101 }
102 };
103
104 struct [[nodiscard("must co_await a when() call")]] awaiter : public waiter {
106 promise<> _p;
107
108 awaiter(condition_variable* cv)
109 : _cv(cv)
110 {}
111
112 void signal() noexcept override {
113 _p.set_value();
114 }
115 void set_exception(std::exception_ptr ep) noexcept override {
116 _p.set_exception(std::move(ep));
117 }
118 auto operator co_await() {
119 if (_cv->check_and_consume_signal()) {
120 return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
121 }
122 _cv->add_waiter(*this);
123 return ::seastar::internal::awaiter<false, void>(_p.get_future());
124 }
125 };
126
127 template<typename Clock, typename Duration>
128 struct [[nodiscard("must co_await a when() call")]] timeout_awaiter : public awaiter, public timer<Clock> {
129 using my_type = timeout_awaiter<Clock, Duration>;
130 using time_point = std::chrono::time_point<Clock, Duration>;
131
132 time_point _timeout;
133
134 timeout_awaiter(condition_variable* cv, time_point timeout)
135 : awaiter(cv)
136 , _timeout(timeout)
137 {}
138 void signal() noexcept override {
139 this->cancel();
140 awaiter::signal();
141 }
142 void set_exception(std::exception_ptr ep) noexcept override {
143 this->cancel();
144 awaiter::set_exception(std::move(ep));
145 }
146 auto operator co_await() {
147 if (_cv->check_and_consume_signal()) {
148 return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
149 }
150 this->set_callback(std::bind(&waiter::timeout, this));
151 this->arm(_timeout);
152 return awaiter::operator co_await();
153 }
154 };
155
156 template<typename Func, typename Base>
157 struct [[nodiscard("must co_await a when() call")]] predicate_awaiter : public Base {
158 Func _func;
159 template<typename... Args>
160 predicate_awaiter(Func func, Args&& ...args)
161 : Base(std::forward<Args>(args)...)
162 , _func(std::move(func))
163 {}
164 void signal() noexcept override {
165 try {
166 if (_func()) {
167 Base::signal();
168 } else {
169 // must re-enter waiter queue
170 // this maintains "wait" version
171 // semantics of moving to back of queue
172 // if predicate fails
173 Base::_cv->add_waiter(*this);
174 }
175 } catch(...) {
176 Base::set_exception(std::current_exception());
177 }
178 }
179 auto operator co_await() {
180 try {
181 if (_func()) {
182 return ::seastar::internal::awaiter<false, void>(make_ready_future<>());
183 } else {
184 Base::_cv->check_and_consume_signal(); // clear out any signal state
185 return Base::operator co_await();
186 }
187 } catch (...) {
188 return ::seastar::internal::awaiter<false, void>(make_exception_future(std::current_exception()));
189 }
190 }
191 };
192
193 boost::intrusive::list<waiter, boost::intrusive::constant_time_size<false>> _waiters;
194 std::exception_ptr _ex; //"broken" exception
195 bool _signalled = false; // set to true if signalled while no waiters
196
197 void add_waiter(waiter&) noexcept;
198 void timeout(waiter&) noexcept;
199 bool wakeup_first() noexcept;
200 bool check_and_consume_signal() noexcept;
201public:
205 condition_variable() noexcept = default;
206 condition_variable(condition_variable&& rhs) noexcept = default;
208
214 future<> wait() noexcept {
215 if (check_and_consume_signal()) {
216 return make_ready_future();
217 }
218 auto* w = new promise_waiter;
219 auto f = w->get_future();
220 add_waiter(*w);
221 return f;
222 }
223
231 template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration>
232 future<> wait(std::chrono::time_point<Clock, Duration> timeout) noexcept {
233 if (check_and_consume_signal()) {
234 return make_ready_future();
235 }
236 struct timeout_waiter : public promise_waiter, public timer<Clock> {};
237
238 auto w = std::make_unique<timeout_waiter>();
239 auto f = w->get_future();
240
241 w->set_callback(std::bind(&waiter::timeout, w.get()));
242 w->arm(timeout);
243 add_waiter(*w.release());
244 return f;
245 }
246
254 template<typename Rep, typename Period>
255 future<> wait(std::chrono::duration<Rep, Period> timeout) noexcept {
256 return wait(timer<>::clock::now() + timeout);
257 }
258
266 template<typename Pred>
267 requires std::is_invocable_r_v<bool, Pred>
268 future<> wait(Pred&& pred) noexcept {
269 return do_until(std::forward<Pred>(pred), [this] {
270 return wait();
271 });
272 }
273
283 template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration, typename Pred>
284 requires std::is_invocable_r_v<bool, Pred>
285 future<> wait(std::chrono::time_point<Clock, Duration> timeout, Pred&& pred) noexcept {
286 return do_until(std::forward<Pred>(pred), [this, timeout] {
287 return wait(timeout);
288 });
289 }
290
300 template<typename Rep, typename Period, typename Pred>
301 requires std::is_invocable_r_v<bool, Pred>
302 future<> wait(std::chrono::duration<Rep, Period> timeout, Pred&& pred) noexcept {
303 return wait(timer<>::clock::now() + timeout, std::forward<Pred>(pred));
304 }
305
312 awaiter when() noexcept {
313 return awaiter{this};
314 }
315
324 template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration>
325 timeout_awaiter<Clock, Duration> when(std::chrono::time_point<Clock, Duration> timeout) noexcept {
326 return timeout_awaiter<Clock, Duration>{this, timeout};
327 }
328
337 template<typename Rep, typename Period>
338 auto when(std::chrono::duration<Rep, Period> timeout) noexcept {
339 return when(timer<>::clock::now() + timeout);
340 }
341
350 template<typename Pred>
351 requires std::is_invocable_r_v<bool, Pred>
352 auto when(Pred&& pred) noexcept {
353 return predicate_awaiter<Pred, awaiter>{std::forward<Pred>(pred), when()};
354 }
355
366 template<typename Clock = typename timer<>::clock, typename Duration = typename Clock::duration, typename Pred>
367 requires std::is_invocable_r_v<bool, Pred>
368 auto when(std::chrono::time_point<Clock, Duration> timeout, Pred&& pred) noexcept {
369 return predicate_awaiter<Pred, timeout_awaiter<Clock, Duration>>{std::forward<Pred>(pred), when(timeout)};
370 }
371
382 template<typename Rep, typename Period, typename Pred>
383 requires std::is_invocable_r_v<bool, Pred>
384 auto when(std::chrono::duration<Rep, Period> timeout, Pred&& pred) noexcept {
385 return when(timer<>::clock::now() + timeout, std::forward<Pred>(pred));
386 }
387
390 bool has_waiters() const noexcept {
391 return !_waiters.empty();
392 }
393
395 void signal() noexcept;
396
398 void broadcast() noexcept;
399
403 void broken() noexcept;
404
405 void broken(std::exception_ptr) noexcept;
406};
407
409
410SEASTAR_MODULE_EXPORT_END
411
412}
Definition: condition-variable.hh:46
virtual const char * what() const noexcept
Reports the exception reason.
Definition: condition-variable.hh:54
virtual const char * what() const noexcept
Reports the exception reason.
Condition variable.
Definition: condition-variable.hh:73
timeout_awaiter< Clock, Duration > when(std::chrono::time_point< Clock, Duration > timeout) noexcept
Definition: condition-variable.hh:325
auto when(std::chrono::duration< Rep, Period > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:384
future<> wait(Pred &&pred) noexcept
Definition: condition-variable.hh:268
bool has_waiters() const noexcept
Definition: condition-variable.hh:390
awaiter when() noexcept
Definition: condition-variable.hh:312
future<> wait(std::chrono::time_point< Clock, Duration > timeout) noexcept
Definition: condition-variable.hh:232
auto when(Pred &&pred) noexcept
Definition: condition-variable.hh:352
void signal() noexcept
Notify variable and wake up a waiter if there is one.
auto when(std::chrono::duration< Rep, Period > timeout) noexcept
Definition: condition-variable.hh:338
future<> wait(std::chrono::duration< Rep, Period > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:302
auto when(std::chrono::time_point< Clock, Duration > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:368
future<> wait(std::chrono::time_point< Clock, Duration > timeout, Pred &&pred) noexcept
Definition: condition-variable.hh:285
future<> wait(std::chrono::duration< Rep, Period > timeout) noexcept
Definition: condition-variable.hh:255
condition_variable() noexcept=default
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
promise - allows a future value to be made available at a later time.
Definition: future.hh:934
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:990
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:998
Definition: timer.hh:83
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept
Definition: loop.hh:339
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.