Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
semaphore.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include "future.hh"
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/core/timer.hh>
28#include <seastar/core/abortable_fifo.hh>
29#include <seastar/core/timed_out_error.hh>
30#include <seastar/core/abort_on_expiry.hh>
31#include <seastar/util/modules.hh>
32#ifndef SEASTAR_MODULE
33#include <cassert>
34#include <exception>
35#include <optional>
36#include <stdexcept>
37#include <utility>
38#endif
39
40namespace seastar {
41
42namespace internal {
// Compile-time probe: has_broken<T>::value is true exactly when the
// expression &T::broken is well-formed (T exposes a member named broken).
template <typename T>
class has_broken {
    // Preferred overload; drops out of overload resolution when
    // &U::broken cannot be formed.
    template <typename U, typename = decltype(&U::broken)>
    static constexpr bool probe(int) { return true; }
    // Fallback overload, chosen only when the one above is not viable.
    template <typename U>
    static constexpr bool probe(...) { return false; }

public:
    static constexpr bool value = probe<T>(0);
};
// Compile-time probe: has_aborted<T>::value is true exactly when the
// expression &T::aborted is well-formed (T exposes a member named aborted).
template <typename T>
class has_aborted {
    // Preferred overload; drops out of overload resolution when
    // &U::aborted cannot be formed.
    template <typename U, typename = decltype(&U::aborted)>
    static constexpr bool probe(int) { return true; }
    // Fallback overload, chosen only when the one above is not viable.
    template <typename U>
    static constexpr bool probe(...) { return false; }

public:
    static constexpr bool value = probe<T>(0);
};
61}
62
65SEASTAR_MODULE_EXPORT_BEGIN
/// Exception type derived from std::exception that signals a broken
/// semaphore.
class broken_semaphore : public std::exception {
public:
    /// Reports the exception reason.
    ///
    /// Marked \c override so the compiler verifies that this really
    /// overrides std::exception::what().
    const char* what() const noexcept override;
};
73
79public:
81 virtual const char* what() const noexcept;
82};
83
89public:
91 virtual const char* what() const noexcept;
92};
93
99 static semaphore_timed_out timeout() noexcept;
100 static broken_semaphore broken() noexcept;
101 static semaphore_aborted aborted() noexcept;
102};
103
105 sstring _msg;
106public:
107 named_semaphore_timed_out(std::string_view msg) noexcept;
108 virtual const char* what() const noexcept;
109};
110
112 sstring _msg;
113public:
114 broken_named_semaphore(std::string_view msg) noexcept;
115 virtual const char* what() const noexcept;
116};
117
119 sstring _msg;
120public:
121 named_semaphore_aborted(std::string_view msg) noexcept;
122 virtual const char* what() const noexcept;
123};
124
125// A factory of semaphore exceptions that contain additional context: the semaphore name
126// auto sem = named_semaphore(0, named_semaphore_exception_factory{"file_opening_limit_semaphore"});
128 sstring name;
129 named_semaphore_timed_out timeout() const noexcept;
130 broken_named_semaphore broken() const noexcept;
131 named_semaphore_aborted aborted() const noexcept;
132};
133
153template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
154class basic_semaphore : private ExceptionFactory {
155public:
156 using duration = typename timer<Clock>::duration;
157 using clock = typename timer<Clock>::clock;
158 using time_point = typename timer<Clock>::time_point;
159 using exception_factory = ExceptionFactory;
160private:
161 ssize_t _count;
162 std::exception_ptr _ex;
163 struct entry {
164 promise<> pr;
165 size_t nr;
166 std::optional<abort_on_expiry<clock>> timer;
167 entry(promise<>&& pr_, size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
168 };
169 std::exception_ptr get_timeout_exception() {
170 try {
171 return std::make_exception_ptr(this->timeout());
172 } catch (...) {
173 return std::make_exception_ptr(semaphore_timed_out());
174 }
175 }
176 std::exception_ptr get_aborted_exception() {
177 if constexpr (internal::has_aborted<exception_factory>::value) {
178 try {
179 return std::make_exception_ptr(this->aborted());
180 } catch (...) {
181 return std::make_exception_ptr(semaphore_aborted());
182 }
183 } else {
184 return std::make_exception_ptr(semaphore_aborted());
185 }
186 }
187 struct expiry_handler {
188 basic_semaphore& sem;
189 void operator()(entry& e, const std::optional<std::exception_ptr>& ex) noexcept {
190 if (e.timer) {
191 e.pr.set_exception(sem.get_timeout_exception());
192 } else if (ex) {
193 e.pr.set_exception(*ex);
194 } else if (sem._ex) {
195 e.pr.set_exception(sem._ex);
196 } else {
197 e.pr.set_exception(sem.get_aborted_exception());
198 }
199 }
200 };
201 internal::abortable_fifo<entry, expiry_handler> _wait_list;
202
#ifdef SEASTAR_SEMAPHORE_DEBUG
// Debug-only marker that records whether the semaphore has been used, so
// that moving a used semaphore trips an assertion.
struct used_flag {
    // set to true by use(); a flag in this state must not be moved from
    // or move-assigned to
    bool _used = false;

    used_flag() = default;
    used_flag([[maybe_unused]] used_flag&& o) noexcept {
        // Check the *source*: this object's own _used has just been
        // default-initialised to false, so testing it would never fire.
        assert(!o._used && "semaphore cannot be moved after it has been used");
    }
    used_flag& operator=([[maybe_unused]] used_flag&& o) noexcept {
        if (this != &o) {
            assert(!_used && !o._used && "semaphore cannot be moved after it has been used");
        }
        return *this;
    }
    void use() noexcept {
        _used = true;
    }
};
#else
// Release builds: an empty stand-in whose use() does nothing.
struct used_flag {
    void use() noexcept {}
};
#endif
228
229 [[no_unique_address]] used_flag _used;
230
231 bool has_available_units(size_t nr) const noexcept {
232 return _count >= 0 && (static_cast<size_t>(_count) >= nr);
233 }
234 bool may_proceed(size_t nr) const noexcept {
235 return has_available_units(nr) && _wait_list.empty();
236 }
237public:
239 static constexpr size_t max_counter() noexcept {
240 return std::numeric_limits<decltype(_count)>::max();
241 }
242
248 basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
249 : exception_factory()
250 , _count(count),
251 _wait_list(expiry_handler{*this})
252 {}
253 basic_semaphore(size_t count, exception_factory&& factory) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
254 : exception_factory(std::move(factory))
255 , _count(count)
256 , _wait_list(expiry_handler{*this})
257 {
258 static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
259 }
260
266 basic_semaphore(basic_semaphore&& other) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
267 : exception_factory(other)
268 , _count(other._count)
269 , _ex(std::exchange(other._ex, std::exception_ptr()))
270 , _wait_list(expiry_handler{*this})
271 , _used(std::move(other._used))
272 {
273 // semaphore cannot be moved with non-empty waiting list
274 assert(other._wait_list.empty());
275 }
276
284 basic_semaphore& operator=(basic_semaphore&& other) noexcept(std::is_nothrow_move_assignable_v<exception_factory>) {
285 // semaphore cannot be moved with non-empty waiting list
286 assert(_wait_list.empty());
287 assert(other._wait_list.empty());
288 if (this != &other) {
289 exception_factory::operator=(other);
290 _count = other._count;
291 _ex = std::exchange(other._ex, std::exception_ptr());
292 _used = std::move(other._used);
293 }
294 return *this;
295 }
296
307 future<> wait(size_t nr = 1) noexcept {
308 return wait(time_point::max(), nr);
309 }
323 future<> wait(time_point timeout, size_t nr = 1) noexcept {
324 _used.use();
325 if (may_proceed(nr)) {
326 _count -= nr;
327 return make_ready_future<>();
328 }
329 if (_ex) {
330 return make_exception_future(_ex);
331 }
332 if (Clock::now() >= timeout) [[unlikely]] {
333 return make_exception_future(get_timeout_exception());
334 }
335 try {
336 entry& e = _wait_list.emplace_back(promise<>(), nr);
337 auto f = e.pr.get_future();
338 if (timeout != time_point::max()) {
339 e.timer.emplace(timeout);
340 abort_source& as = e.timer->abort_source();
341 _wait_list.make_back_abortable(as);
342 }
343 return f;
344 } catch (...) {
345 return make_exception_future(std::current_exception());
346 }
347 }
348
362 future<> wait(abort_source& as, size_t nr = 1) noexcept {
363 _used.use();
364 if (may_proceed(nr)) {
365 _count -= nr;
366 return make_ready_future<>();
367 }
368 if (_ex) {
369 return make_exception_future(_ex);
370 }
371 if (as.abort_requested()) [[unlikely]] {
372 return make_exception_future(get_aborted_exception());
373 }
374 try {
375 entry& e = _wait_list.emplace_back(promise<>(), nr);
376 // taking future here since make_back_abortable may expire the entry
377 auto f = e.pr.get_future();
378 _wait_list.make_back_abortable(as);
379 return f;
380 } catch (...) {
381 return make_exception_future(std::current_exception());
382 }
383 }
384
398 future<> wait(duration timeout, size_t nr = 1) noexcept {
399 return wait(clock::now() + timeout, nr);
400 }
410 void signal(size_t nr = 1) noexcept {
411 if (_ex) {
412 return;
413 }
414 _count += nr;
415 while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
416 auto& x = _wait_list.front();
417 _count -= x.nr;
418 x.pr.set_value();
419 _wait_list.pop_front();
420 }
421 }
422
424 //
430 void consume(size_t nr = 1) noexcept {
431 _used.use();
432 if (_ex) {
433 return;
434 }
435 _count -= nr;
436 }
437
448 bool try_wait(size_t nr = 1) noexcept {
449 _used.use();
450 if (may_proceed(nr)) {
451 _count -= nr;
452 return true;
453 } else {
454 return false;
455 }
456 }
460 size_t current() const noexcept { return std::max(_count, ssize_t(0)); }
461
466 ssize_t available_units() const noexcept { return _count; }
467
469 size_t waiters() const noexcept { return _wait_list.size(); }
470
474 void broken() noexcept {
475 std::exception_ptr ep;
476 if constexpr (internal::has_broken<exception_factory>::value) {
477 try {
478 ep = std::make_exception_ptr(exception_factory::broken());
479 } catch (...) {
480 ep = std::make_exception_ptr(broken_semaphore());
481 }
482 } else {
483 ep = std::make_exception_ptr(broken_semaphore());
484 }
485 broken(std::move(ep));
486 }
487
/// Breaks the semaphore with a caller-supplied exception object.
///
/// The object is wrapped in a std::exception_ptr and passed to
/// broken(std::exception_ptr).
///
/// \param ex exception to wrap.
template <typename Exception>
void broken(const Exception& ex) noexcept {
    std::exception_ptr wrapped = std::make_exception_ptr(ex);
    broken(std::move(wrapped));
}
495
499 void broken(std::exception_ptr ex) noexcept;
500
503 _wait_list.reserve(n);
504 }
505};
506SEASTAR_MODULE_EXPORT_END
507
508template<typename ExceptionFactory, typename Clock>
509inline
510void
512 static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
513 _ex = xp;
514 _count = 0;
515 while (!_wait_list.empty()) {
516 auto& x = _wait_list.front();
517 x.pr.set_exception(xp);
518 _wait_list.pop_front();
519 }
520}
521
522SEASTAR_MODULE_EXPORT_BEGIN
523
524template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
527 size_t _n;
528
529 semaphore_units(basic_semaphore<ExceptionFactory, Clock>* sem, size_t n) noexcept : _sem(sem), _n(n) {}
530public:
531 semaphore_units() noexcept : semaphore_units(nullptr, 0) {}
533 semaphore_units(semaphore_units&& o) noexcept : _sem(o._sem), _n(std::exchange(o._n, 0)) {
534 }
535 semaphore_units& operator=(semaphore_units&& o) noexcept {
536 if (this != &o) {
537 return_all();
538 _sem = o._sem;
539 _n = std::exchange(o._n, 0);
540 }
541 return *this;
542 }
543 semaphore_units(const semaphore_units&) = delete;
544 ~semaphore_units() noexcept {
545 return_all();
546 }
554 size_t return_units(size_t units) {
555 if (units > _n) {
556 throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
557 }
558 _n -= units;
559 _sem->signal(units);
560 return _n;
561 }
563 void return_all() noexcept {
564 if (_n) {
565 _sem->signal(_n);
566 _n = 0;
567 }
568 }
572 size_t release() noexcept {
573 return std::exchange(_n, 0);
574 }
583 semaphore_units split(size_t units) {
584 if (units > _n) {
585 throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
586 }
587 _n -= units;
588 return semaphore_units(_sem, units);
589 }
595 void adopt(semaphore_units&& other) noexcept {
596 assert(other._sem == _sem);
597 _n += other.release();
598 }
599
601 size_t count() const noexcept {
602 return _n;
603 }
604
606 explicit operator bool() const noexcept {
607 return _n != 0;
608 }
609};
610
628template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
629future<semaphore_units<ExceptionFactory, Clock>>
630get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
631 return sem.wait(units).then([&sem, units] {
632 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
633 });
634}
635
651template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
652future<semaphore_units<ExceptionFactory, Clock>>
653get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) noexcept {
654 return sem.wait(timeout, units).then([&sem, units] {
655 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
656 });
657}
658
675template<typename ExceptionFactory, typename Clock>
676future<semaphore_units<ExceptionFactory, Clock>>
677get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
678 return sem.wait(timeout, units).then([&sem, units] {
679 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
680 });
681}
682
699template<typename ExceptionFactory, typename Clock>
700future<semaphore_units<ExceptionFactory, Clock>>
701get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, abort_source& as) noexcept {
702 return sem.wait(as, units).then([&sem, units] {
703 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
704 });
705}
706
724template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
725std::optional<semaphore_units<ExceptionFactory, Clock>>
726try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
727 if (!sem.try_wait(units)) {
728 return std::nullopt;
729 }
730 return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
731}
732
745template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
746semaphore_units<ExceptionFactory, Clock>
748 sem.consume(units);
749 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
750}
751
773template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
774inline
775futurize_t<std::invoke_result_t<Func>>
776with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, Func&& func) noexcept {
777 return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
778 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
779 });
780}
781
807template <typename ExceptionFactory, typename Clock, typename Func>
808inline
809futurize_t<std::invoke_result_t<Func>>
810with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
811 return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
812 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
813 });
814}
815
820
821SEASTAR_MODULE_EXPORT_END
822
824
825}
Definition: abort_source.hh:48
Definition: abort_source.hh:58
bool abort_requested() const noexcept
Returns whether an abort has been requested.
Definition: abort_source.hh:200
Counted resource guard.
Definition: semaphore.hh:154
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:248
future wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:362
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:239
size_t current() const noexcept
Definition: semaphore.hh:460
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:307
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:502
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:323
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:398
basic_semaphore & operator=(basic_semaphore &&other) noexcept(std::is_nothrow_move_assignable_v< exception_factory >)
Definition: semaphore.hh:284
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:430
ssize_t available_units() const noexcept
Definition: semaphore.hh:466
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:469
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:448
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:410
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:492
basic_semaphore(basic_semaphore &&other) noexcept(std::is_nothrow_move_constructible_v< exception_factory >)
Definition: semaphore.hh:266
void broken() noexcept
Definition: semaphore.hh:474
Definition: semaphore.hh:111
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:68
virtual const char * what() const noexcept
Reports the exception reason.
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
Definition: semaphore.hh:118
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:104
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:88
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:78
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:525
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:595
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:601
semaphore_units split(size_t units)
Definition: semaphore.hh:583
size_t release() noexcept
Definition: semaphore.hh:572
size_t return_units(size_t units)
Definition: semaphore.hh:554
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:563
Definition: timed_out_error.hh:34
Definition: timer.hh:83
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:747
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:511
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1928
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: semaphore.hh:127