Seastar
High performance C++ framework for concurrent servers
semaphore.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
26 #include <stdexcept>
27 #include <exception>
28 #include <optional>
29 #include <seastar/core/timer.hh>
30 #include <seastar/core/expiring_fifo.hh>
31 #include <seastar/core/timed_out_error.hh>
32 
33 namespace seastar {
34 
37 
/// Exception type used to report that a semaphore was broken.
///
/// basic_semaphore::broken() falls back to this type when its exception
/// factory fails to produce an exception of its own.
class broken_semaphore : public std::exception {
public:
    /// Reports the exception reason.
    /// Marked \c override so that a signature mismatch with
    /// std::exception::what() is rejected at compile time.
    const char* what() const noexcept override;
};
45 
51 public:
53  virtual const char* what() const noexcept;
54 };
55 
61  static semaphore_timed_out timeout() noexcept;
62  static broken_semaphore broken() noexcept;
63 };
64 
66  sstring _msg;
67 public:
68  named_semaphore_timed_out(std::string_view msg) noexcept;
69  virtual const char* what() const noexcept;
70 };
71 
73  sstring _msg;
74 public:
75  broken_named_semaphore(std::string_view msg) noexcept;
76  virtual const char* what() const noexcept;
77 };
78 
79 // A factory of semaphore exceptions that contain additional context: the semaphore name
80 // auto sem = named_semaphore(0, named_semaphore_exception_factory{"file_opening_limit_semaphore"});
82  sstring name;
83  named_semaphore_timed_out timeout() const noexcept;
84  broken_named_semaphore broken() const noexcept;
85 };
86 
105 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
106 class basic_semaphore : private ExceptionFactory {
107 public:
108  using duration = typename timer<Clock>::duration;
109  using clock = typename timer<Clock>::clock;
110  using time_point = typename timer<Clock>::time_point;
111  using exception_factory = ExceptionFactory;
112 private:
113  ssize_t _count;
114  std::exception_ptr _ex;
115  struct entry {
116  promise<> pr;
117  size_t nr;
118  entry(promise<>&& pr_, size_t nr_) noexcept : pr(std::move(pr_)), nr(nr_) {}
119  };
120  using expiry_handler = std::function<void (entry&)>;
122  expiry_handler make_expiry_handler() noexcept {
123  return [this] (entry& e) noexcept {
124  try {
125  e.pr.set_exception(this->timeout());
126  } catch (...) {
127  e.pr.set_exception(semaphore_timed_out());
128  }
129  };
130  }
131  bool has_available_units(size_t nr) const noexcept {
132  return _count >= 0 && (static_cast<size_t>(_count) >= nr);
133  }
134  bool may_proceed(size_t nr) const noexcept {
135  return has_available_units(nr) && _wait_list.empty();
136  }
137 public:
139  static constexpr size_t max_counter() noexcept {
140  return std::numeric_limits<decltype(_count)>::max();
141  }
142 
148  basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v<exception_factory>)
149  : exception_factory()
150  , _count(count),
151  _wait_list(make_expiry_handler())
152  {}
153  basic_semaphore(size_t count, exception_factory&& factory) noexcept(std::is_nothrow_move_constructible_v<exception_factory>)
154  : exception_factory(std::move(factory))
155  , _count(count)
156  , _wait_list(make_expiry_handler())
157  {
158  static_assert(std::is_nothrow_move_constructible_v<expiry_handler>);
159  }
170  future<> wait(size_t nr = 1) noexcept {
171  return wait(time_point::max(), nr);
172  }
186  future<> wait(time_point timeout, size_t nr = 1) noexcept {
187  if (may_proceed(nr)) {
188  _count -= nr;
189  return make_ready_future<>();
190  }
191  if (_ex) {
192  return make_exception_future(_ex);
193  }
194  entry e(promise<>(), nr);
195  auto fut = e.pr.get_future();
196  try {
197  _wait_list.push_back(std::move(e), timeout);
198  } catch (...) {
199  e.pr.set_exception(std::current_exception());
200  }
201  return fut;
202  }
203 
217  future<> wait(duration timeout, size_t nr = 1) noexcept {
218  return wait(clock::now() + timeout, nr);
219  }
229  void signal(size_t nr = 1) noexcept {
230  if (_ex) {
231  return;
232  }
233  _count += nr;
234  while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
235  auto& x = _wait_list.front();
236  _count -= x.nr;
237  x.pr.set_value();
238  _wait_list.pop_front();
239  }
240  }
241 
243  //
249  void consume(size_t nr = 1) noexcept {
250  if (_ex) {
251  return;
252  }
253  _count -= nr;
254  }
255 
266  bool try_wait(size_t nr = 1) noexcept {
267  if (may_proceed(nr)) {
268  _count -= nr;
269  return true;
270  } else {
271  return false;
272  }
273  }
277  size_t current() const noexcept { return std::max(_count, ssize_t(0)); }
278 
283  ssize_t available_units() const noexcept { return _count; }
284 
286  size_t waiters() const noexcept { return _wait_list.size(); }
287 
291  void broken() noexcept {
292  std::exception_ptr ep;
293  try {
294  ep = std::make_exception_ptr(exception_factory::broken());
295  } catch (...) {
296  ep = std::make_exception_ptr(broken_semaphore());
297  }
298  broken(std::move(ep));
299  }
300 
304  template <typename Exception>
305  void broken(const Exception& ex) noexcept {
306  broken(std::make_exception_ptr(ex));
307  }
308 
312  void broken(std::exception_ptr ex) noexcept;
313 
315  void ensure_space_for_waiters(size_t n) {
316  _wait_list.reserve(n);
317  }
318 };
319 
320 template<typename ExceptionFactory, typename Clock>
321 inline
322 void
323 basic_semaphore<ExceptionFactory, Clock>::broken(std::exception_ptr xp) noexcept {
324  static_assert(std::is_nothrow_copy_constructible_v<std::exception_ptr>);
325  _ex = xp;
326  _count = 0;
327  while (!_wait_list.empty()) {
328  auto& x = _wait_list.front();
329  x.pr.set_exception(xp);
330  _wait_list.pop_front();
331  }
332 }
333 
334 template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
337  size_t _n;
338 
339  semaphore_units(basic_semaphore<ExceptionFactory, Clock>* sem, size_t n) noexcept : _sem(sem), _n(n) {}
340 public:
341  semaphore_units() noexcept : semaphore_units(nullptr, 0) {}
342  semaphore_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t n) noexcept : semaphore_units(&sem, n) {}
343  semaphore_units(semaphore_units&& o) noexcept : _sem(o._sem), _n(std::exchange(o._n, 0)) {
344  }
345  semaphore_units& operator=(semaphore_units&& o) noexcept {
346  _sem = o._sem;
347  _n = std::exchange(o._n, 0);
348  return *this;
349  }
350  semaphore_units(const semaphore_units&) = delete;
351  ~semaphore_units() noexcept {
352  return_all();
353  }
361  size_t return_units(size_t units) {
362  if (units > _n) {
363  throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
364  }
365  _n -= units;
366  _sem->signal(units);
367  return _n;
368  }
370  void return_all() noexcept {
371  if (_n) {
372  _sem->signal(_n);
373  _n = 0;
374  }
375  }
379  size_t release() noexcept {
380  return std::exchange(_n, 0);
381  }
390  semaphore_units split(size_t units) {
391  if (units > _n) {
392  throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
393  }
394  _n -= units;
395  return semaphore_units(_sem, units);
396  }
402  void adopt(semaphore_units&& other) noexcept {
403  assert(other._sem == _sem);
404  _n += other.release();
405  }
406 
408  size_t count() const noexcept {
409  return _n;
410  }
411 };
412 
430 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
431 future<semaphore_units<ExceptionFactory, Clock>>
432 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
433  return sem.wait(units).then([&sem, units] {
434  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
435  });
436 }
437 
452 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
453 future<semaphore_units<ExceptionFactory, Clock>>
454 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) noexcept {
455  return sem.wait(timeout, units).then([&sem, units] {
456  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
457  });
458 }
459 
475 template<typename ExceptionFactory, typename Clock>
476 future<semaphore_units<ExceptionFactory, Clock>>
477 get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout) noexcept {
478  return sem.wait(timeout, units).then([&sem, units] {
479  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
480  });
481 }
482 
500 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
501 std::optional<semaphore_units<ExceptionFactory, Clock>>
502 try_get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
503  if (!sem.try_wait(units)) {
504  return std::nullopt;
505  }
506  return std::make_optional<semaphore_units<ExceptionFactory, Clock>>(sem, units);
507 }
508 
521 template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
522 semaphore_units<ExceptionFactory, Clock>
524  sem.consume(units);
525  return semaphore_units<ExceptionFactory, Clock>{ sem, units };
526 }
527 
549 template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
550 inline
551 futurize_t<std::result_of_t<Func()>>
552 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, Func&& func) noexcept {
553  return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
554  return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
555  });
556 }
557 
583 template <typename ExceptionFactory, typename Clock, typename Func>
584 inline
585 futurize_t<std::result_of_t<Func()>>
586 with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::duration timeout, Func&& func) noexcept {
587  return get_units(sem, units, timeout).then([func = std::forward<Func>(func)] (auto units) mutable {
588  return futurize_invoke(std::forward<Func>(func)).finally([units = std::move(units)] {});
589  });
590 }
591 
596 
598 
599 }
Counted resource guard.
Definition: semaphore.hh:106
basic_semaphore(size_t count) noexcept(std::is_nothrow_default_constructible_v< exception_factory >)
Definition: semaphore.hh:148
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:139
size_t current() const noexcept
Definition: semaphore.hh:277
future wait(size_t nr=1) noexcept
Definition: semaphore.hh:170
void ensure_space_for_waiters(size_t n)
Reserve memory for waiters so that wait() will not throw.
Definition: semaphore.hh:315
future wait(time_point timeout, size_t nr=1) noexcept
Definition: semaphore.hh:186
future wait(duration timeout, size_t nr=1) noexcept
Definition: semaphore.hh:217
void consume(size_t nr=1) noexcept
Consume the specific number of units without blocking.
Definition: semaphore.hh:249
ssize_t available_units() const noexcept
Definition: semaphore.hh:283
size_t waiters() const noexcept
Returns the current number of waiters.
Definition: semaphore.hh:286
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:266
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:229
void broken(const Exception &ex) noexcept
Definition: semaphore.hh:305
void broken() noexcept
Definition: semaphore.hh:291
Definition: semaphore.hh:72
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:40
virtual const char * what() const noexcept
Reports the exception reason.
bool empty() const noexcept
Definition: expiring_fifo.hh:124
size_t size() const noexcept
Definition: expiring_fifo.hh:154
void pop_front() noexcept
Definition: expiring_fifo.hh:206
void push_back(const T &payload)
Definition: expiring_fifo.hh:168
T & front() noexcept
Definition: expiring_fifo.hh:135
void reserve(size_t size)
Definition: expiring_fifo.hh:162
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
Definition: semaphore.hh:65
virtual const char * what() const noexcept
Reports the exception reason.
promise - allows a future value to be made available at a later time.
Definition: future.hh:957
Definition: semaphore.hh:50
virtual const char * what() const noexcept
Reports the exception reason.
Definition: semaphore.hh:335
void adopt(semaphore_units &&other) noexcept
Definition: semaphore.hh:402
size_t count() const noexcept
Returns the number of units held.
Definition: semaphore.hh:408
semaphore_units split(size_t units)
Definition: semaphore.hh:390
size_t release() noexcept
Definition: semaphore.hh:379
size_t return_units(size_t units)
Definition: semaphore.hh:361
void return_all() noexcept
Return ownership of all units. The semaphore will be signaled by the number of units returned.
Definition: semaphore.hh:370
Definition: timed_out_error.hh:29
Definition: timer.hh:78
semaphore_units< ExceptionFactory, Clock > consume_units(basic_semaphore< ExceptionFactory, Clock > &sem, size_t units) noexcept
Consume units from semaphore temporarily.
Definition: semaphore.hh:523
void broken(std::exception_ptr ex) noexcept
Definition: semaphore.hh:323
future< T... > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:2054
future now()
Returns a ready future.
Definition: later.hh:35
Seastar API namespace.
Definition: abort_on_ebadf.hh:24