Seastar
High performance C++ framework for concurrent servers
semaphore.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
26 #include <stdexcept>
27 #include <exception>
28 #include <optional>
29 #include <seastar/core/timer.hh>
30 #include <seastar/core/abortable_fifo.hh>
31 #include <seastar/core/timed_out_error.hh>
32 #include <seastar/core/abort_on_expiry.hh>
33 
34 namespace seastar {
35 
36 namespace internal {
37 // Test if a class T has member function broken()
38 template <typename T>
39 class has_broken {
40  template <typename U> constexpr static bool check(decltype(&U::broken)) { return true; }
41  template <typename U> constexpr static bool check(...) { return false; }
42 
43 public:
44  constexpr static bool value = check<T>(nullptr);
45 };
46 // Test if a class T has member function aborted()
47 template <typename T>
48 class has_aborted {
49  template <typename U> constexpr static bool check(decltype(&U::aborted)) { return true; }
50  template <typename U> constexpr static bool check(...) { return false; }
51 
52 public:
53  constexpr static bool value = check<T>(nullptr);
54 };
55 }
56 
59 
62 class broken_semaphore : public std::exception {
63 public:
65  virtual const char* what() const noexcept;
66 };
67 
73 public:
75  virtual const char* what() const noexcept;
76 };
77 
83 public:
85  virtual const char* what() const noexcept;
86 };
87 
93  static semaphore_timed_out timeout() noexcept;
94  static broken_semaphore broken() noexcept;
95  static semaphore_aborted aborted() noexcept;
96 };
97 
99  sstring _msg;
100 public:
101  named_semaphore_timed_out(std::string_view msg) noexcept;
102  virtual const char* what() const noexcept;
103 };
104 
106  sstring _msg;
107 public:
108  broken_named_semaphore(std::string_view msg) noexcept;
109  virtual const char* what() const noexcept;
110 };
111 
113  sstring _msg;
114 public:
115  named_semaphore_aborted(std::string_view msg) noexcept;
116  virtual const char* what() const noexcept;
117 };
118 
119 // A factory of semaphore exceptions that contain additional context: the semaphore name
120 // auto sem = named_semaphore(0, named_semaphore_exception_factory{"file_opening_limit_semaphore"});
122  sstring name;
123  named_semaphore_timed_out timeout() const noexcept;
124  broken_named_semaphore broken() const noexcept;
125  named_semaphore_aborted aborted() const noexcept;
126 };
127 
147 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
148 class basic_semaphore : private ExceptionFactory {
149 public:
150  using duration = typename timer<Clock>::duration;
151  using clock = typename timer<Clock>::clock;
152  using time_point = typename timer<Clock>::time_point;
153  using exception_factory = ExceptionFactory;
154 private:
155  ssize_t _count;
156  std::exception_ptr _ex;
157  struct entry {
158  promise<> pr;
159  size_t nr;
160  std::optional<abort_on_expiry<clock>> timer;
161  entry(promise<>&& pr_, size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
162  };
163  struct expiry_handler {
164  basic_semaphore& sem;
165  void operator()(entry& e) noexcept {
166  if (e.timer) {
167  try {
168  e.pr.set_exception(sem.timeout());
169  } catch (...) {
170  e.pr.set_exception(semaphore_timed_out());
171  }
172  } else if (sem._ex) {
173  e.pr.set_exception(sem._ex);
174  } else {
175  if constexpr (internal::has_aborted<exception_factory>::value) {
176  try {
177  e.pr.set_exception(static_cast<exception_factory>(sem).aborted());
178  } catch (...) {
179  e.pr.set_exception(semaphore_aborted());
180  }
181  } else {
182  e.pr.set_exception(semaphore_aborted());
183  }
184  }
185  }
186  };
187  internal::abortable_fifo<entry, expiry_handler> _wait_list;
188  bool has_available_units(size_t nr) const noexcept {
189  return _count >= 0 && (static_cast<size_t>(_count) >= nr);
190  }
191  bool may_proceed(size_t nr) const noexcept {
192  return has_available_units(nr) && _wait_list.empty();
193  }
194 public:
196  static constexpr size_t max_counter() noexcept {
197  return std::numeric_limits<decltype(_count)>::max();
198  }
199 
205  basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
206  : exception_factory()
207  , _count(count),
208  _wait_list(expiry_handler{*this})
209  {}
210  basic_semaphore(size_t count, exception_factory&& factory) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
211  : exception_factory(std::move(factory))
212  , _count(count)
213  , _wait_list(expiry_handler{*this})
214  {
215  static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
216  }
227  future<> wait(size_t nr = 1) noexcept {
228  return wait(time_point::max(), nr);
229  }
243  future<> wait(time_point timeout, size_t nr = 1) noexcept {
244  if (may_proceed(nr)) {
245  _count -= nr;
246  return make_ready_future<>();
247  }
248  if (_ex) {
249  return make_exception_future(_ex);
250  }
251  try {
252  entry& e = _wait_list.emplace_back(promise<>(), nr);
253  auto f = e.pr.get_future();
254  if (timeout != time_point::max()) {
255  e.timer.emplace(timeout);
256  abort_source& as = e.timer->abort_source();
257  _wait_list.make_back_abortable(as);
258  }
259  return f;
260  } catch (...) {
261  return make_exception_future(std::current_exception());
262  }
263  }
264 
278  future<> wait(abort_source& as, size_t nr = 1) noexcept {
279  if (may_proceed(nr)) {
280  _count -= nr;
281  return make_ready_future<>();
282  }
283  if (_ex) {
284  return make_exception_future(_ex);
285  }
286  try {
287  entry& e = _wait_list.emplace_back(promise<>(), nr);
288  // taking future here since make_back_abortable may expire the entry
289  auto f = e.pr.get_future();
290  _wait_list.make_back_abortable(as);
291  return f;
292  } catch (...) {
293  return make_exception_future(std::current_exception());
294  }
295  }
296 
310  future<> wait(duration timeout, size_t nr = 1) noexcept {
311  return wait(clock::now() + timeout, nr);
312  }
322  void signal(size_t nr = 1) noexcept {
323  if (_ex) {
324  return;
325  }
326  _count += nr;
327  while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
328  auto& x = _wait_list.front();
329  _count -= x.nr;
330  x.pr.set_value();
331  _wait_list.pop_front();
332  }
333  }
334 
336  //
342  void consume(size_t nr = 1) noexcept {
343  if (_ex) {
344  return;
345  }
346  _count -= nr;
347  }
348 
359  bool try_wait(size_t nr = 1) noexcept {
360  if (may_proceed(nr)) {
361  _count -= nr;
362  return true;
363  } else {
364  return false;
365  }
366  }
370  size_t current() const noexcept { return std::max(_count, ssize_t(0)); }
371 
376  ssize_t available_units() const noexcept { return _count; }
377 
379  size_t waiters() const noexcept { return _wait_list.size(); }
380 
384  void broken() noexcept {
385  std::exception_ptr ep;
386  if constexpr (internal::has_broken<exception_factory>::value) {
387  try {
388  ep = std::make_exception_ptr(exception_factory::broken());
389  } catch (...) {
390  ep = std::make_exception_ptr(broken_semaphore());
391  }
392  } else {
393  ep = std::make_exception_ptr(broken_semaphore());
394  }
395  broken(std::move(ep));
396  }
397 
401  template <typename Exception>
402  void broken(const Exception& ex) noexcept {
403  broken(std::make_exception_ptr(ex));
404  }
405 
409  void broken(std::exception_ptr ex) noexcept;
410 
412  void ensure_space_for_waiters(size_t n) {
413  _wait_list.reserve(n);
414  }
415 };
416 
417 template<typename ExceptionFactory, typename Clock>
418 inline
419 void
420 basic_semaphore<ExceptionFactory, Clock>::broken(std::exception_ptr xp) noexcept {
421  static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
422  _ex = xp;
423  _count = 0;
424  while (!_wait_list.empty()) {
425  auto& x = _wait_list.front();
426  x.pr.set_exception(xp);
427  _wait_list.pop_front();
428  }
429 }
430 
431 template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
434  size_t _n;
435 
436  semaphore_units(basic_semaphore<ExceptionFactory, Clock>* sem, size_t n) noexcept : _sem(sem), _n(n) {}
437 public:
438  semaphore_units() noexcept : semaphore_units(nullptr, 0) {}
439  semaphore_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t n) noexcept : semaphore_units(&sem, n) {}
440  semaphore_units(semaphore_units&& o) noexcept : _sem(o._sem), _n(std::exchange(o._n, 0)) {
441  }
442  semaphore_units& operator=(semaphore_units&& o) noexcept {
443  _sem = o._sem;
444  _n = std::exchange(o._n, 0);
445  return *this;
446  }
447  semaphore_units(const semaphore_units&) = delete;
448  ~semaphore_units() noexcept {
449  return_all();
450  }
458  size_t return_units(size_t units) {
459  if (units > _n) {
460  throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
461  }
462  _n -= units;
463  _sem->signal(units);
464  return _n;
465  }
467  void return_all() noexcept {
468  if (_n) {
469  _sem->signal(_n);
470  _n = 0;
471  }
472  }
476  size_t release() noexcept {
477  return std::exchange(_n, 0);
478  }
487  semaphore_units split(size_t units) {
488  if (units > _n) {
489  throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
490  }
491  _n -= units;
492  return semaphore_units(_sem, units);
493  }
499  void adopt(semaphore_units&& other) noexcept {
500  assert(other._sem == _sem);
501  _n += other.release();
502  }
503 
505  size_t count() const noexcept {
506  return _n;
507  }
508 
510  explicit operator bool() const noexcept {
511  return _n != 0;
512  }
513 };
514 
532 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
533 future<semaphore_units<ExceptionFactory, Clock>>
534 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
535  return sem.wait(units).then([&sem, units] {
536  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
537  });
538 }
539 
555 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
556 future<semaphore_units<ExceptionFactory, Clock>>
557 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) noexcept {
558  return sem.wait(timeout, units).then([&sem, units] {
559  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
560  });
561 }
562 
579 template<typename ExceptionFactory, typename Clock>
580 future<semaphore_units<ExceptionFactory, Clock>>
581 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
582  return sem.wait(timeout, units).then([&sem, units] {
583  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
584  });
585 }
586 
603 template<typename ExceptionFactory, typename Clock>
604 future<semaphore_units<ExceptionFactory, Clock>>
605 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, abort_source& as) noexcept {
606  return sem.wait(as, units).then([&sem, units] {
607  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
608  });
609 }
610 
628 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
629 std::optional<semaphore_units<ExceptionFactory, Clock>>
630 try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
631  if (!sem.try_wait(units)) {
632  return std::nullopt;
633  }
634  return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
635 }
636 
649 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
650 semaphore_units<ExceptionFactory, Clock>
652  sem.consume(units);
653  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
654 }
655 
677 template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
678 inline
679 futurize_t<std::invoke_result_t<Func>>
680 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, Func&& func) noexcept {
681  return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
682  return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
683  });
684 }
685 
711 template <typename ExceptionFactory, typename Clock, typename Func>
712 inline
713 futurize_t<std::invoke_result_t<Func>>
714 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
715  return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
716  return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
717  });
718 }
719 
724 
726 
727 }
Definition: abort_source.hh:41
Definition: abort_source.hh:51
Counted resource guard.
Definition: semaphore.hh:148
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:205
future wait(abort_source &as, size_t nr=1) noexcept
Definition: semaphore.hh:278
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:196
size_t current() const noexcept
Definition: semaphore.hh:370
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:227
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:412
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:243
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:310
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:342
ssize_t available_units() const noexcept
Definition: semaphore.hh:376
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:379
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:359
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:322
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:402
void broken() noexcept
Definition: semaphore.hh:384
Definition: semaphore.hh:105
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:62
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:112
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:98
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:82
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:72
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:432
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:499
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:505
semaphore_units split(size_t units)
Definition: semaphore.hh:487
size_t release() noexcept
Definition: semaphore.hh:476
size_t return_units(size_t units)
Definition: semaphore.hh:458
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:467
Definition: timed_out_error.hh:29
Definition: timer.hh:78
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:651
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:420
future< T... > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:2068
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
Definition: semaphore.hh:121