Seastar
High performance C++ framework for concurrent servers
semaphore.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include "future.hh"
25 #include <seastar/core/future.hh>
26 #include <seastar/core/chunked_fifo.hh>
27 #include <seastar/core/timer.hh>
28 #include <seastar/core/abortable_fifo.hh>
29 #include <seastar/core/timed_out_error.hh>
30 #include <seastar/core/abort_on_expiry.hh>
31 #include <seastar/util/modules.hh>
32 #ifndef SEASTAR_MODULE
33 #include <cassert>
34 #include <exception>
35 #include <optional>
36 #include <stdexcept>
37 #include <utility>
38 #endif
39 
40 namespace seastar {
41 
42 namespace internal {
43 // Test if a class T has member function broken()
44 template <typename T>
45 class has_broken {
46  template <typename U> constexpr static bool check(decltype(&U::broken)) { return true; }
47  template <typename U> constexpr static bool check(...) { return false; }
48 
49 public:
50  constexpr static bool value = check<T>(nullptr);
51 };
52 // Test if a class T has member function aborted()
53 template <typename T>
54 class has_aborted {
55  template <typename U> constexpr static bool check(decltype(&U::aborted)) { return true; }
56  template <typename U> constexpr static bool check(...) { return false; }
57 
58 public:
59  constexpr static bool value = check<T>(nullptr);
60 };
61 }
62 
65 SEASTAR_MODULE_EXPORT_BEGIN
68 class broken_semaphore : public std::exception {
69 public:
71  virtual const char* what() const noexcept;
72 };
73 
79 public:
81  virtual const char* what() const noexcept;
82 };
83 
89 public:
91  virtual const char* what() const noexcept;
92 };
93 
99  static semaphore_timed_out timeout() noexcept;
100  static broken_semaphore broken() noexcept;
101  static semaphore_aborted aborted() noexcept;
102 };
103 
105  sstring _msg;
106 public:
107  named_semaphore_timed_out(std::string_view msg) noexcept;
108  virtual const char* what() const noexcept;
109 };
110 
112  sstring _msg;
113 public:
114  broken_named_semaphore(std::string_view msg) noexcept;
115  virtual const char* what() const noexcept;
116 };
117 
119  sstring _msg;
120 public:
121  named_semaphore_aborted(std::string_view msg) noexcept;
122  virtual const char* what() const noexcept;
123 };
124 
125 // A factory of semaphore exceptions that contain additional context: the semaphore name
126 // auto sem = named_semaphore(0, named_semaphore_exception_factory{"file_opening_limit_semaphore"});
128  sstring name;
129  named_semaphore_timed_out timeout() const noexcept;
130  broken_named_semaphore broken() const noexcept;
131  named_semaphore_aborted aborted() const noexcept;
132 };
133 
153 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
154 class basic_semaphore : private ExceptionFactory {
155 public:
156  using duration = typename timer<Clock>::duration;
157  using clock = typename timer<Clock>::clock;
158  using time_point = typename timer<Clock>::time_point;
159  using exception_factory = ExceptionFactory;
160 private:
161  ssize_t _count;
162  std::exception_ptr _ex;
163  struct entry {
164  promise<> pr;
165  size_t nr;
166  std::optional<abort_on_expiry<clock>> timer;
167  entry(promise<>&& pr_, size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
168  };
169  struct expiry_handler {
170  basic_semaphore& sem;
171  void operator()(entry& e) noexcept {
172  if (e.timer) {
173  try {
174  e.pr.set_exception(sem.timeout());
175  } catch (...) {
176  e.pr.set_exception(semaphore_timed_out());
177  }
178  } else if (sem._ex) {
179  e.pr.set_exception(sem._ex);
180  } else {
181  if constexpr (internal::has_aborted<exception_factory>::value) {
182  try {
183  e.pr.set_exception(static_cast<exception_factory>(sem).aborted());
184  } catch (...) {
185  e.pr.set_exception(semaphore_aborted());
186  }
187  } else {
188  e.pr.set_exception(semaphore_aborted());
189  }
190  }
191  }
192  };
193  internal::abortable_fifo<entry, expiry_handler> _wait_list;
194 
195 #ifdef SEASTAR_SEMAPHORE_DEBUG
196  struct used_flag {
197  // set to true from the wait path
198  // prevents the semaphore from being moved or move-reassigned when _used
199  bool _used = false;
200 
201  used_flag() = default;
202  used_flag(used_flag&& o) noexcept {
203  assert(!_used && "semaphore cannot be moved after it has been used");
204  }
205  used_flag& operator=(used_flag&& o) noexcept {
206  if (this != &o) {
207  assert(!_used && !o._used && "semaphore cannot be moved after it has been used");
208  }
209  return *this;
210  }
211  void use() noexcept {
212  _used = true;
213  }
214  };
215 #else
216  struct used_flag {
217  void use() noexcept {}
218  };
219 #endif
220 
221  [[no_unique_address]] used_flag _used;
222 
223  bool has_available_units(size_t nr) const noexcept {
224  return _count >= 0 && (static_cast<size_t>(_count) >= nr);
225  }
226  bool may_proceed(size_t nr) const noexcept {
227  return has_available_units(nr) && _wait_list.empty();
228  }
229 public:
231  static constexpr size_t max_counter() noexcept {
232  return std::numeric_limits<decltype(_count)>::max();
233  }
234 
240  basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
241  : exception_factory()
242  , _count(count),
243  _wait_list(expiry_handler{*this})
244  {}
245  basic_semaphore(size_t count, exception_factory&& factory) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
246  : exception_factory(std::move(factory))
247  , _count(count)
248  , _wait_list(expiry_handler{*this})
249  {
250  static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
251  }
252 
258  basic_semaphore(basic_semaphore&& other) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
259  : exception_factory(other)
260  , _count(other._count)
261  , _ex(std::exchange(other._ex, std::exception_ptr()))
262  , _wait_list(expiry_handler{*this})
263  , _used(std::move(other._used))
264  {
265  // semaphore cannot be moved with non-empty waiting list
266  assert(other._wait_list.empty());
267  }
268 
276  basic_semaphore& operator=(basic_semaphore&& other) noexcept(std::is_nothrow_move_assignable_v<exception_factory>) {
277  // semaphore cannot be moved with non-empty waiting list
278  assert(_wait_list.empty());
279  assert(other._wait_list.empty());
280  if (this != &other) {
281  exception_factory::operator=(other);
282  _count = other._count;
283  _ex = std::exchange(other._ex, std::exception_ptr());
284  _used = std::move(other._used);
285  }
286  return *this;
287  }
288 
299  future<> wait(size_t nr = 1) noexcept {
300  return wait(time_point::max(), nr);
301  }
315  future<> wait(time_point timeout, size_t nr = 1) noexcept {
316  _used.use();
317  if (may_proceed(nr)) {
318  _count -= nr;
319  return make_ready_future<>();
320  }
321  if (_ex) {
322  return make_exception_future(_ex);
323  }
324  try {
325  entry& e = _wait_list.emplace_back(promise<>(), nr);
326  auto f = e.pr.get_future();
327  if (timeout != time_point::max()) {
328  e.timer.emplace(timeout);
329  abort_source& as = e.timer->abort_source();
330  _wait_list.make_back_abortable(as);
331  }
332  return f;
333  } catch (...) {
334  return make_exception_future(std::current_exception());
335  }
336  }
337 
351  future<> wait(abort_source& as, size_t nr = 1) noexcept {
352  _used.use();
353  if (may_proceed(nr)) {
354  _count -= nr;
355  return make_ready_future<>();
356  }
357  if (_ex) {
358  return make_exception_future(_ex);
359  }
360  try {
361  entry& e = _wait_list.emplace_back(promise<>(), nr);
362  // taking future here since make_back_abortable may expire the entry
363  auto f = e.pr.get_future();
364  _wait_list.make_back_abortable(as);
365  return f;
366  } catch (...) {
367  return make_exception_future(std::current_exception());
368  }
369  }
370 
384  future<> wait(duration timeout, size_t nr = 1) noexcept {
385  return wait(clock::now() + timeout, nr);
386  }
396  void signal(size_t nr = 1) noexcept {
397  if (_ex) {
398  return;
399  }
400  _count += nr;
401  while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
402  auto& x = _wait_list.front();
403  _count -= x.nr;
404  x.pr.set_value();
405  _wait_list.pop_front();
406  }
407  }
408 
410  //
416  void consume(size_t nr = 1) noexcept {
417  _used.use();
418  if (_ex) {
419  return;
420  }
421  _count -= nr;
422  }
423 
434  bool try_wait(size_t nr = 1) noexcept {
435  _used.use();
436  if (may_proceed(nr)) {
437  _count -= nr;
438  return true;
439  } else {
440  return false;
441  }
442  }
446  size_t current() const noexcept { return std::max(_count, ssize_t(0)); }
447 
452  ssize_t available_units() const noexcept { return _count; }
453 
455  size_t waiters() const noexcept { return _wait_list.size(); }
456 
460  void broken() noexcept {
461  std::exception_ptr ep;
462  if constexpr (internal::has_broken<exception_factory>::value) {
463  try {
464  ep = std::make_exception_ptr(exception_factory::broken());
465  } catch (...) {
466  ep = std::make_exception_ptr(broken_semaphore());
467  }
468  } else {
469  ep = std::make_exception_ptr(broken_semaphore());
470  }
471  broken(std::move(ep));
472  }
473 
477  template <typename Exception>
478  void broken(const Exception& ex) noexcept {
479  broken(std::make_exception_ptr(ex));
480  }
481 
485  void broken(std::exception_ptr ex) noexcept;
486 
488  void ensure_space_for_waiters(size_t n) {
489  _wait_list.reserve(n);
490  }
491 };
492 SEASTAR_MODULE_EXPORT_END
493 
494 template<typename ExceptionFactory, typename Clock>
495 inline
496 void
497 basic_semaphore<ExceptionFactory, Clock>::broken(std::exception_ptr xp) noexcept {
498  static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
499  _ex = xp;
500  _count = 0;
501  while (!_wait_list.empty()) {
502  auto& x = _wait_list.front();
503  x.pr.set_exception(xp);
504  _wait_list.pop_front();
505  }
506 }
507 
508 SEASTAR_MODULE_EXPORT_BEGIN
509 
510 template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
513  size_t _n;
514 
515  semaphore_units(basic_semaphore<ExceptionFactory, Clock>* sem, size_t n) noexcept : _sem(sem), _n(n) {}
516 public:
517  semaphore_units() noexcept : semaphore_units(nullptr, 0) {}
518  semaphore_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t n) noexcept : semaphore_units(&sem, n) {}
519  semaphore_units(semaphore_units&& o) noexcept : _sem(o._sem), _n(std::exchange(o._n, 0)) {
520  }
521  semaphore_units& operator=(semaphore_units&& o) noexcept {
522  if (this != &o) {
523  return_all();
524  _sem = o._sem;
525  _n = std::exchange(o._n, 0);
526  }
527  return *this;
528  }
529  semaphore_units(const semaphore_units&) = delete;
530  ~semaphore_units() noexcept {
531  return_all();
532  }
540  size_t return_units(size_t units) {
541  if (units > _n) {
542  throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
543  }
544  _n -= units;
545  _sem->signal(units);
546  return _n;
547  }
549  void return_all() noexcept {
550  if (_n) {
551  _sem->signal(_n);
552  _n = 0;
553  }
554  }
558  size_t release() noexcept {
559  return std::exchange(_n, 0);
560  }
569  semaphore_units split(size_t units) {
570  if (units > _n) {
571  throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
572  }
573  _n -= units;
574  return semaphore_units(_sem, units);
575  }
581  void adopt(semaphore_units&& other) noexcept {
582  assert(other._sem == _sem);
583  _n += other.release();
584  }
585 
587  size_t count() const noexcept {
588  return _n;
589  }
590 
592  explicit operator bool() const noexcept {
593  return _n != 0;
594  }
595 };
596 
614 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
615 future<semaphore_units<ExceptionFactory, Clock>>
616 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
617  return sem.wait(units).then([&sem, units] {
618  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
619  });
620 }
621 
637 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
638 future<semaphore_units<ExceptionFactory, Clock>>
639 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) noexcept {
640  return sem.wait(timeout, units).then([&sem, units] {
641  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
642  });
643 }
644 
661 template<typename ExceptionFactory, typename Clock>
662 future<semaphore_units<ExceptionFactory, Clock>>
663 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
664  return sem.wait(timeout, units).then([&sem, units] {
665  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
666  });
667 }
668 
685 template<typename ExceptionFactory, typename Clock>
686 future<semaphore_units<ExceptionFactory, Clock>>
687 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, abort_source& as) noexcept {
688  return sem.wait(as, units).then([&sem, units] {
689  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
690  });
691 }
692 
710 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
711 std::optional<semaphore_units<ExceptionFactory, Clock>>
712 try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
713  if (!sem.try_wait(units)) {
714  return std::nullopt;
715  }
716  return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
717 }
718 
731 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
732 semaphore_units<ExceptionFactory, Clock>
734  sem.consume(units);
735  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
736 }
737 
759 template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
760 inline
761 futurize_t<std::invoke_result_t<Func>>
762 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, Func&& func) noexcept {
763  return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
764  return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
765  });
766 }
767 
793 template <typename ExceptionFactory, typename Clock, typename Func>
794 inline
795 futurize_t<std::invoke_result_t<Func>>
796 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
797  return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
798  return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
799  });
800 }
801 
806 
807 SEASTAR_MODULE_EXPORT_END
808 
810 
811 }
Definition: abort_source.hh:49
Definition: abort_source.hh:59
Counted resource guard.
Definition: semaphore.hh:154
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:240
future wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:351
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
size_t current() const noexcept
Definition: semaphore.hh:446
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:299
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:488
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:315
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:384
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:416
ssize_t available_units() const noexcept
Definition: semaphore.hh:452
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:455
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:434
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:396
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:478
basic_semaphore & operator=(basic_semaphore &&other) noexcept(std::is_nothrow_move_assignable_v< exception_factory >)
Definition: semaphore.hh:276
basic_semaphore(basic_semaphore &&other) noexcept(std::is_nothrow_move_constructible_v< exception_factory >)
Definition: semaphore.hh:258
void broken() noexcept
Definition: semaphore.hh:460
Definition: semaphore.hh:111
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:68
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:118
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:104
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:88
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:78
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:511
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:581
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:587
semaphore_units split(size_t units)
Definition: semaphore.hh:569
size_t release() noexcept
Definition: semaphore.hh:558
size_t return_units(size_t units)
Definition: semaphore.hh:540
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:549
Definition: timed_out_error.hh:33
Definition: timer.hh:84
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:733
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:497
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1940
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: semaphore.hh:127