Seastar
High performance C++ framework for concurrent servers
queue.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/future.hh>
26 #include <queue>
27 #include <seastar/util/std-compat.hh>
28 
29 namespace seastar {
30 
38 template <typename T>
39 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
40 class queue {
41  std::queue<T, circular_buffer<T>> _q;
42  size_t _max;
43  std::optional<promise<>> _not_empty;
44  std::optional<promise<>> _not_full;
45  std::exception_ptr _ex = nullptr;
46 private:
47  void notify_not_empty() noexcept;
48  void notify_not_full() noexcept;
49 public:
50  explicit queue(size_t size);
51 
55  bool push(T&& a);
56 
60  T pop() noexcept;
61 
66  T& front() noexcept;
67 
72  template <typename Func>
73  bool consume(Func&& func);
74 
76  bool empty() const noexcept;
77 
79  bool full() const noexcept;
80 
84  future<> not_empty() noexcept;
85 
88  future<> not_full() noexcept;
89 
95  future<T> pop_eventually() noexcept;
96 
102  future<> push_eventually(T&& data) noexcept;
103 
105  size_t size() const noexcept {
106  // std::queue::size() has no reason to throw
107  return _q.size();
108  }
109 
113  size_t max_size() const noexcept { return _max; }
114 
118  void set_max_size(size_t max) noexcept {
119  _max = max;
120  if (!full()) {
121  notify_not_full();
122  }
123  }
124 
127  void abort(std::exception_ptr ex) noexcept {
128  // std::queue::empty() and pop() doesn't throw
129  // since it just calls seastar::circular_buffer::pop_front
130  // that is specified as noexcept.
131  while (!_q.empty()) {
132  _q.pop();
133  }
134  _ex = ex;
135  if (_not_full) {
136  _not_full->set_exception(ex);
137  _not_full= std::nullopt;
138  }
139  if (_not_empty) {
140  _not_empty->set_exception(std::move(ex));
141  _not_empty = std::nullopt;
142  }
143  }
144 
148  bool has_blocked_consumer() const noexcept {
149  return bool(_not_empty);
150  }
151 };
152 
153 template <typename T>
154 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
155 inline
156 queue<T>::queue(size_t size)
157  : _max(size) {
158 }
159 
160 template <typename T>
161 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
162 inline
163 void queue<T>::notify_not_empty() noexcept {
164  if (_not_empty) {
165  _not_empty->set_value();
166  _not_empty = std::optional<promise<>>();
167  }
168 }
169 
170 template <typename T>
171 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
172 inline
173 void queue<T>::notify_not_full() noexcept {
174  if (_not_full) {
175  _not_full->set_value();
176  _not_full = std::optional<promise<>>();
177  }
178 }
179 
180 template <typename T>
181 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
182 inline
183 bool queue<T>::push(T&& data) {
184  if (_q.size() < _max) {
185  _q.push(std::move(data));
186  notify_not_empty();
187  return true;
188  } else {
189  return false;
190  }
191 }
192 
193 template <typename T>
194 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
195 inline
196 T& queue<T>::front() noexcept {
197  // std::queue::front() has no reason to throw
198  return _q.front();
199 }
200 
201 template <typename T>
202 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
203 inline
204 T queue<T>::pop() noexcept {
205  if (_q.size() == _max) {
206  notify_not_full();
207  }
208  // popping the front element must not throw
209  // as T is required to be nothrow_move_constructible
210  // and std::queue::pop won't throw since it uses
211  // seastar::circular_beffer::pop_front.
212  T data = std::move(_q.front());
213  _q.pop();
214  return data;
215 }
216 
217 template <typename T>
218 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
219 inline
221  // seastar allows only nothrow_move_constructible types
222  // to be returned as future<T>
223  static_assert(std::is_nothrow_move_constructible_v<T>,
224  "Queue element type must be no-throw move constructible");
225 
226  if (_ex) {
227  return make_exception_future<T>(_ex);
228  }
229  if (empty()) {
230  return not_empty().then([this] {
231  if (_ex) {
232  return make_exception_future<T>(_ex);
233  } else {
234  return make_ready_future<T>(pop());
235  }
236  });
237  } else {
238  return make_ready_future<T>(pop());
239  }
240 }
241 
242 template <typename T>
243 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
244 inline
246  if (_ex) {
247  return make_exception_future<>(_ex);
248  }
249  if (full()) {
250  return not_full().then([this, data = std::move(data)] () mutable {
251  _q.push(std::move(data));
252  notify_not_empty();
253  });
254  } else {
255  try {
256  _q.push(std::move(data));
257  notify_not_empty();
258  return make_ready_future<>();
259  } catch (...) {
261  }
262  }
263 }
264 
265 template <typename T>
266 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
267 template <typename Func>
268 inline
269 bool queue<T>::consume(Func&& func) {
270  if (_ex) {
271  std::rethrow_exception(_ex);
272  }
273  bool running = true;
274  while (!_q.empty() && running) {
275  running = func(std::move(_q.front()));
276  _q.pop();
277  }
278  if (!full()) {
279  notify_not_full();
280  }
281  return running;
282 }
283 
284 template <typename T>
285 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
286 inline
287 bool queue<T>::empty() const noexcept {
288  // std::queue::empty() has no reason to throw
289  return _q.empty();
290 }
291 
292 template <typename T>
293 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
294 inline
295 bool queue<T>::full() const noexcept {
296  // std::queue::size() has no reason to throw
297  return _q.size() >= _max;
298 }
299 
300 template <typename T>
301 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
302 inline
304  if (_ex) {
305  return make_exception_future<>(_ex);
306  }
307  if (!empty()) {
308  return make_ready_future<>();
309  } else {
310  _not_empty = promise<>();
311  return _not_empty->get_future();
312  }
313 }
314 
315 template <typename T>
316 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
317 inline
319  if (_ex) {
320  return make_exception_future<>(_ex);
321  }
322  if (!full()) {
323  return make_ready_future<>();
324  } else {
325  _not_full = promise<>();
326  return _not_full->get_future();
327  }
328 }
329 
330 }
331 
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
promise - allows a future value to be made available at a later time.
Definition: future.hh:957
seastar::queue
Definition: queue.hh:40
T & front() noexcept
access the front element in the queue
Definition: queue.hh:196
size_t max_size() const noexcept
Definition: queue.hh:113
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:245
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:295
future< T > pop_eventually() noexcept
Definition: queue.hh:220
void set_max_size(size_t max) noexcept
Definition: queue.hh:118
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:127
bool consume(Func &&func)
Definition: queue.hh:269
T pop() noexcept
Pop an item.
Definition: queue.hh:204
future<> not_empty() noexcept
Definition: queue.hh:303
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:148
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:287
bool push(T &&a)
Push an item.
Definition: queue.hh:183
future<> not_full() noexcept
Definition: queue.hh:318
future< T... > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:2065
Seastar API namespace.
Definition: abort_on_ebadf.hh:24