Seastar
High performance C++ framework for concurrent servers
semaphore.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include "future.hh"
25#include <seastar/core/future.hh>
26#include <seastar/core/chunked_fifo.hh>
27#include <seastar/core/timer.hh>
28#include <seastar/core/abortable_fifo.hh>
29#include <seastar/core/timed_out_error.hh>
30#include <seastar/core/abort_on_expiry.hh>
31#include <seastar/util/modules.hh>
32#ifndef SEASTAR_MODULE
33#include <cassert>
34#include <exception>
35#include <optional>
36#include <stdexcept>
37#include <utility>
38#endif
39
40namespace seastar {
41
42namespace internal {
// Compile-time probe: `value` is true when `&T::broken` is a well-formed
// expression (i.e. T exposes a member named broken), false otherwise.
template <typename T>
class has_broken {
    // Preferred overload; only viable when `&U::broken` can be formed.
    template <typename U>
    static constexpr bool probe(decltype(&U::broken)) {
        return true;
    }
    // Fallback selected when the overload above is discarded by SFINAE.
    template <typename U>
    static constexpr bool probe(...) {
        return false;
    }

public:
    static constexpr bool value = probe<T>(nullptr);
};
// Compile-time probe: `value` is true when `&T::aborted` is a well-formed
// expression (i.e. T exposes a member named aborted), false otherwise.
template <typename T>
class has_aborted {
    // Preferred overload; only viable when `&U::aborted` can be formed.
    template <typename U>
    static constexpr bool probe(decltype(&U::aborted)) {
        return true;
    }
    // Fallback selected when the overload above is discarded by SFINAE.
    template <typename U>
    static constexpr bool probe(...) {
        return false;
    }

public:
    static constexpr bool value = probe<T>(nullptr);
};
61}
62
65SEASTAR_MODULE_EXPORT_BEGIN
/// Exception type used to fail waiters when a semaphore is broken
/// without a caller-supplied exception.
class broken_semaphore : public std::exception {
public:
    /// Reports the exception reason.
    // `override` (instead of a bare `virtual`) makes the compiler verify that
    // this really overrides std::exception::what().
    const char* what() const noexcept override;
};
73
79public:
81 virtual const char* what() const noexcept;
82};
83
89public:
91 virtual const char* what() const noexcept;
92};
93
99 static semaphore_timed_out timeout() noexcept;
100 static broken_semaphore broken() noexcept;
101 static semaphore_aborted aborted() noexcept;
102};
103
105 sstring _msg;
106public:
107 named_semaphore_timed_out(std::string_view msg) noexcept;
108 virtual const char* what() const noexcept;
109};
110
112 sstring _msg;
113public:
114 broken_named_semaphore(std::string_view msg) noexcept;
115 virtual const char* what() const noexcept;
116};
117
119 sstring _msg;
120public:
121 named_semaphore_aborted(std::string_view msg) noexcept;
122 virtual const char* what() const noexcept;
123};
124
// A factory of semaphore exceptions that carry additional context: the semaphore name.
// Example:
//   auto sem = named_semaphore(0, named_semaphore_exception_factory{"file_opening_limit_semaphore"});
128 sstring name;
129 named_semaphore_timed_out timeout() const noexcept;
130 broken_named_semaphore broken() const noexcept;
131 named_semaphore_aborted aborted() const noexcept;
132};
133
153template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
154class basic_semaphore : private ExceptionFactory {
155public:
156 using duration = typename timer<Clock>::duration;
157 using clock = typename timer<Clock>::clock;
158 using time_point = typename timer<Clock>::time_point;
159 using exception_factory = ExceptionFactory;
160private:
161 ssize_t _count;
162 std::exception_ptr _ex;
163 struct entry {
164 promise<> pr;
165 size_t nr;
166 std::optional<abort_on_expiry<clock>> timer;
167 entry(promise<>&& pr_, size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
168 };
169 struct expiry_handler {
170 basic_semaphore& sem;
171 void operator()(entry& e) noexcept {
172 if (e.timer) {
173 try {
174 e.pr.set_exception(sem.timeout());
175 } catch (...) {
176 e.pr.set_exception(semaphore_timed_out());
177 }
178 } else if (sem._ex) {
179 e.pr.set_exception(sem._ex);
180 } else {
181 if constexpr (internal::has_aborted<exception_factory>::value) {
182 try {
183 e.pr.set_exception(static_cast<exception_factory>(sem).aborted());
184 } catch (...) {
185 e.pr.set_exception(semaphore_aborted());
186 }
187 } else {
188 e.pr.set_exception(semaphore_aborted());
189 }
190 }
191 }
192 };
193 internal::abortable_fifo<entry, expiry_handler> _wait_list;
194
#ifdef SEASTAR_SEMAPHORE_DEBUG
// Debug-only tracker that asserts a semaphore is not moved or move-assigned
// once it has been marked as used.
struct used_flag {
    // set to true by use(); once set, moving from (or assigning to) this
    // flag trips an assertion
    bool _used = false;

    used_flag() = default;
    used_flag([[maybe_unused]] used_flag&& o) noexcept {
        // Check the *source* object: our own `_used` has just been
        // default-initialized to false, so testing it could never fail.
        assert(!o._used && "semaphore cannot be moved after it has been used");
    }
    used_flag& operator=([[maybe_unused]] used_flag&& o) noexcept {
        if (this != &o) {
            assert(!_used && !o._used && "semaphore cannot be moved after it has been used");
        }
        return *this;
    }
    void use() noexcept {
        _used = true;
    }
};
#else
// Release builds: empty placeholder, use() is a no-op.
struct used_flag {
    void use() noexcept {}
};
#endif
220
221 [[no_unique_address]] used_flag _used;
222
223 bool has_available_units(size_t nr) const noexcept {
224 return _count >= 0 && (static_cast<size_t>(_count) >= nr);
225 }
226 bool may_proceed(size_t nr) const noexcept {
227 return has_available_units(nr) && _wait_list.empty();
228 }
229public:
231 static constexpr size_t max_counter() noexcept {
232 return std::numeric_limits<decltype(_count)>::max();
233 }
234
240 basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
241 : exception_factory()
242 , _count(count),
243 _wait_list(expiry_handler{*this})
244 {}
245 basic_semaphore(size_t count, exception_factory&& factory) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
246 : exception_factory(std::move(factory))
247 , _count(count)
248 , _wait_list(expiry_handler{*this})
249 {
250 static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
251 }
252
258 basic_semaphore(basic_semaphore&& other) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
259 : exception_factory(other)
260 , _count(other._count)
261 , _ex(std::exchange(other._ex, std::exception_ptr()))
262 , _wait_list(expiry_handler{*this})
263 , _used(std::move(other._used))
264 {
265 // semaphore cannot be moved with non-empty waiting list
266 assert(other._wait_list.empty());
267 }
268
276 basic_semaphore& operator=(basic_semaphore&& other) noexcept(std::is_nothrow_move_assignable_v<exception_factory>) {
277 // semaphore cannot be moved with non-empty waiting list
278 assert(_wait_list.empty());
279 assert(other._wait_list.empty());
280 if (this != &other) {
281 exception_factory::operator=(other);
282 _count = other._count;
283 _ex = std::exchange(other._ex, std::exception_ptr());
284 _used = std::move(other._used);
285 }
286 return *this;
287 }
288
299 future<> wait(size_t nr = 1) noexcept {
300 return wait(time_point::max(), nr);
301 }
315 future<> wait(time_point timeout, size_t nr = 1) noexcept {
316 _used.use();
317 if (may_proceed(nr)) {
318 _count -= nr;
319 return make_ready_future<>();
320 }
321 if (_ex) {
322 return make_exception_future(_ex);
323 }
324 try {
325 entry& e = _wait_list.emplace_back(promise<>(), nr);
326 auto f = e.pr.get_future();
327 if (timeout != time_point::max()) {
328 e.timer.emplace(timeout);
329 abort_source& as = e.timer->abort_source();
330 _wait_list.make_back_abortable(as);
331 }
332 return f;
333 } catch (...) {
334 return make_exception_future(std::current_exception());
335 }
336 }
337
351 future<> wait(abort_source& as, size_t nr = 1) noexcept {
352 _used.use();
353 if (may_proceed(nr)) {
354 _count -= nr;
355 return make_ready_future<>();
356 }
357 if (_ex) {
358 return make_exception_future(_ex);
359 }
360 try {
361 entry& e = _wait_list.emplace_back(promise<>(), nr);
362 // taking future here since make_back_abortable may expire the entry
363 auto f = e.pr.get_future();
364 _wait_list.make_back_abortable(as);
365 return f;
366 } catch (...) {
367 return make_exception_future(std::current_exception());
368 }
369 }
370
384 future<> wait(duration timeout, size_t nr = 1) noexcept {
385 return wait(clock::now() + timeout, nr);
386 }
396 void signal(size_t nr = 1) noexcept {
397 if (_ex) {
398 return;
399 }
400 _count += nr;
401 while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
402 auto& x = _wait_list.front();
403 _count -= x.nr;
404 x.pr.set_value();
405 _wait_list.pop_front();
406 }
407 }
408
410 //
416 void consume(size_t nr = 1) noexcept {
417 _used.use();
418 if (_ex) {
419 return;
420 }
421 _count -= nr;
422 }
423
434 bool try_wait(size_t nr = 1) noexcept {
435 _used.use();
436 if (may_proceed(nr)) {
437 _count -= nr;
438 return true;
439 } else {
440 return false;
441 }
442 }
446 size_t current() const noexcept { return std::max(_count, ssize_t(0)); }
447
452 ssize_t available_units() const noexcept { return _count; }
453
455 size_t waiters() const noexcept { return _wait_list.size(); }
456
460 void broken() noexcept {
461 std::exception_ptr ep;
462 if constexpr (internal::has_broken<exception_factory>::value) {
463 try {
464 ep = std::make_exception_ptr(exception_factory::broken());
465 } catch (...) {
466 ep = std::make_exception_ptr(broken_semaphore());
467 }
468 } else {
469 ep = std::make_exception_ptr(broken_semaphore());
470 }
471 broken(std::move(ep));
472 }
473
/// Breaks the semaphore with a copy of \c ex, wrapped into an
/// exception_ptr and forwarded to broken(std::exception_ptr).
template <typename Exception>
void broken(const Exception& ex) noexcept {
    std::exception_ptr wrapped = std::make_exception_ptr(ex);
    broken(std::move(wrapped));
}
481
485 void broken(std::exception_ptr ex) noexcept;
486
489 _wait_list.reserve(n);
490 }
491};
492SEASTAR_MODULE_EXPORT_END
493
494template<typename ExceptionFactory, typename Clock>
495inline
496void
498 static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
499 _ex = xp;
500 _count = 0;
501 while (!_wait_list.empty()) {
502 auto& x = _wait_list.front();
503 x.pr.set_exception(xp);
504 _wait_list.pop_front();
505 }
506}
507
508SEASTAR_MODULE_EXPORT_BEGIN
509
510template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
513 size_t _n;
514
515 semaphore_units(basic_semaphore<ExceptionFactory, Clock>* sem, size_t n) noexcept : _sem(sem), _n(n) {}
516public:
517 semaphore_units() noexcept : semaphore_units(nullptr, 0) {}
519 semaphore_units(semaphore_units&& o) noexcept : _sem(o._sem), _n(std::exchange(o._n, 0)) {
520 }
521 semaphore_units& operator=(semaphore_units&& o) noexcept {
522 if (this != &o) {
523 return_all();
524 _sem = o._sem;
525 _n = std::exchange(o._n, 0);
526 }
527 return *this;
528 }
529 semaphore_units(const semaphore_units&) = delete;
530 ~semaphore_units() noexcept {
531 return_all();
532 }
540 size_t return_units(size_t units) {
541 if (units > _n) {
542 throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
543 }
544 _n -= units;
545 _sem->signal(units);
546 return _n;
547 }
549 void return_all() noexcept {
550 if (_n) {
551 _sem->signal(_n);
552 _n = 0;
553 }
554 }
558 size_t release() noexcept {
559 return std::exchange(_n, 0);
560 }
569 semaphore_units split(size_t units) {
570 if (units > _n) {
571 throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
572 }
573 _n -= units;
574 return semaphore_units(_sem, units);
575 }
581 void adopt(semaphore_units&& other) noexcept {
582 assert(other._sem == _sem);
583 _n += other.release();
584 }
585
587 size_t count() const noexcept {
588 return _n;
589 }
590
592 explicit operator bool() const noexcept {
593 return _n != 0;
594 }
595};
596
614template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
615future<semaphore_units<ExceptionFactory, Clock>>
616get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
617 return sem.wait(units).then([&sem, units] {
618 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
619 });
620}
621
637template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
638future<semaphore_units<ExceptionFactory, Clock>>
639get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) noexcept {
640 return sem.wait(timeout, units).then([&sem, units] {
641 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
642 });
643}
644
661template<typename ExceptionFactory, typename Clock>
662future<semaphore_units<ExceptionFactory, Clock>>
663get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
664 return sem.wait(timeout, units).then([&sem, units] {
665 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
666 });
667}
668
685template<typename ExceptionFactory, typename Clock>
686future<semaphore_units<ExceptionFactory, Clock>>
687get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, abort_source& as) noexcept {
688 return sem.wait(as, units).then([&sem, units] {
689 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
690 });
691}
692
710template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
711std::optional<semaphore_units<ExceptionFactory, Clock>>
712try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
713 if (!sem.try_wait(units)) {
714 return std::nullopt;
715 }
716 return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
717}
718
731template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
732semaphore_units<ExceptionFactory, Clock>
734 sem.consume(units);
735 return semaphore_units<ExceptionFactory, Clock>{ sem, units };
736}
737
759template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
760inline
761futurize_t<std::invoke_result_t<Func>>
762with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, Func&& func) noexcept {
763 return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
764 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
765 });
766}
767
793template <typename ExceptionFactory, typename Clock, typename Func>
794inline
795futurize_t<std::invoke_result_t<Func>>
796with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
797 return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
798 return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
799 });
800}
801
806
807SEASTAR_MODULE_EXPORT_END
808
810
811}
Definition: abort_source.hh:48
Definition: abort_source.hh:58
Counted resource guard.
Definition: semaphore.hh:154
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:240
future wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:351
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
size_t current() const noexcept
Definition: semaphore.hh:446
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:299
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:488
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:315
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:384
basic_semaphore & operator=(basic_semaphore &&other) noexcept(std::is_nothrow_move_assignable_v< exception_factory >)
Definition: semaphore.hh:276
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:416
ssize_t available_units() const noexcept
Definition: semaphore.hh:452
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:455
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:434
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:396
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:478
basic_semaphore(basic_semaphore &&other) noexcept(std::is_nothrow_move_constructible_v< exception_factory >)
Definition: semaphore.hh:258
void broken() noexcept
Definition: semaphore.hh:460
Definition: semaphore.hh:111
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:68
virtual const char * what() const noexcept
Reports the exception reason.
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: semaphore.hh:118
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:104
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:88
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:78
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:511
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:581
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:587
semaphore_units split(size_t units)
Definition: semaphore.hh:569
size_t release() noexcept
Definition: semaphore.hh:558
size_t return_units(size_t units)
Definition: semaphore.hh:540
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:549
Definition: timed_out_error.hh:34
Definition: timer.hh:83
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:733
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:497
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: semaphore.hh:127