Seastar
High performance C++ framework for concurrent servers
queue.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/future.hh>
26 #include <seastar/util/std-compat.hh>
27 #include <seastar/util/modules.hh>
28 #ifndef SEASTAR_MODULE
29 #include <queue>
30 #endif
31 
32 namespace seastar {
33 
41 SEASTAR_MODULE_EXPORT
42 template <typename T>
43 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
44 class queue {
45  std::queue<T, circular_buffer<T>> _q;
46  size_t _max;
47  std::optional<promise<>> _not_empty;
48  std::optional<promise<>> _not_full;
49  std::exception_ptr _ex = nullptr;
50 private:
51  void notify_not_empty() noexcept;
52  void notify_not_full() noexcept;
53 public:
54  explicit queue(size_t size);
55 
59  bool push(T&& a);
60 
64  T pop() noexcept;
65 
70  T& front() noexcept;
71 
76  template <typename Func>
77  bool consume(Func&& func);
78 
80  bool empty() const noexcept;
81 
83  bool full() const noexcept;
84 
88  future<> not_empty() noexcept;
89 
92  future<> not_full() noexcept;
93 
99  future<T> pop_eventually() noexcept;
100 
106  future<> push_eventually(T&& data) noexcept;
107 
109  size_t size() const noexcept {
110  // std::queue::size() has no reason to throw
111  return _q.size();
112  }
113 
117  size_t max_size() const noexcept { return _max; }
118 
122  void set_max_size(size_t max) noexcept {
123  _max = max;
124  if (!full()) {
125  notify_not_full();
126  }
127  }
128 
131  void abort(std::exception_ptr ex) noexcept {
132  // std::queue::empty() and pop() doesn't throw
133  // since it just calls seastar::circular_buffer::pop_front
134  // that is specified as noexcept.
135  while (!_q.empty()) {
136  _q.pop();
137  }
138  _ex = ex;
139  if (_not_full) {
140  _not_full->set_exception(ex);
141  _not_full= std::nullopt;
142  }
143  if (_not_empty) {
144  _not_empty->set_exception(std::move(ex));
145  _not_empty = std::nullopt;
146  }
147  }
148 
152  bool has_blocked_consumer() const noexcept {
153  return bool(_not_empty);
154  }
155 };
156 
157 template <typename T>
158 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
159 inline
160 queue<T>::queue(size_t size)
161  : _max(size) {
162 }
163 
164 template <typename T>
165 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
166 inline
167 void queue<T>::notify_not_empty() noexcept {
168  if (_not_empty) {
169  _not_empty->set_value();
170  _not_empty = std::optional<promise<>>();
171  }
172 }
173 
174 template <typename T>
175 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
176 inline
177 void queue<T>::notify_not_full() noexcept {
178  if (_not_full) {
179  _not_full->set_value();
180  _not_full = std::optional<promise<>>();
181  }
182 }
183 
184 template <typename T>
185 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
186 inline
187 bool queue<T>::push(T&& data) {
188  if (_q.size() < _max) {
189  _q.push(std::move(data));
190  notify_not_empty();
191  return true;
192  } else {
193  return false;
194  }
195 }
196 
197 template <typename T>
198 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
199 inline
200 T& queue<T>::front() noexcept {
201  // std::queue::front() has no reason to throw
202  return _q.front();
203 }
204 
205 template <typename T>
206 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
207 inline
208 T queue<T>::pop() noexcept {
209  if (_q.size() == _max) {
210  notify_not_full();
211  }
212  // popping the front element must not throw
213  // as T is required to be nothrow_move_constructible
214  // and std::queue::pop won't throw since it uses
215  // seastar::circular_beffer::pop_front.
216  assert(!_q.empty());
217  T data = std::move(_q.front());
218  _q.pop();
219  return data;
220 }
221 
222 template <typename T>
223 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
224 inline
226  // seastar allows only nothrow_move_constructible types
227  // to be returned as future<T>
228  static_assert(std::is_nothrow_move_constructible_v<T>,
229  "Queue element type must be no-throw move constructible");
230 
231  if (_ex) {
232  return make_exception_future<T>(_ex);
233  }
234  if (empty()) {
235  return not_empty().then([this] {
236  if (_ex) {
237  return make_exception_future<T>(_ex);
238  } else {
239  return make_ready_future<T>(pop());
240  }
241  });
242  } else {
243  return make_ready_future<T>(pop());
244  }
245 }
246 
247 template <typename T>
248 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
249 inline
251  if (_ex) {
252  return make_exception_future<>(_ex);
253  }
254  if (full()) {
255  return not_full().then([this, data = std::move(data)] () mutable {
256  _q.push(std::move(data));
257  notify_not_empty();
258  });
259  } else {
260  try {
261  _q.push(std::move(data));
262  notify_not_empty();
263  return make_ready_future<>();
264  } catch (...) {
266  }
267  }
268 }
269 
270 template <typename T>
271 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
272 template <typename Func>
273 inline
274 bool queue<T>::consume(Func&& func) {
275  if (_ex) {
276  std::rethrow_exception(_ex);
277  }
278  bool running = true;
279  while (!_q.empty() && running) {
280  running = func(std::move(_q.front()));
281  _q.pop();
282  }
283  if (!full()) {
284  notify_not_full();
285  }
286  return running;
287 }
288 
289 template <typename T>
290 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
291 inline
292 bool queue<T>::empty() const noexcept {
293  // std::queue::empty() has no reason to throw
294  return _q.empty();
295 }
296 
297 template <typename T>
298 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
299 inline
300 bool queue<T>::full() const noexcept {
301  // std::queue::size() has no reason to throw
302  return _q.size() >= _max;
303 }
304 
305 template <typename T>
306 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
307 inline
309  if (_ex) {
310  return make_exception_future<>(_ex);
311  }
312  if (!empty()) {
313  return make_ready_future<>();
314  } else {
315  _not_empty = promise<>();
316  return _not_empty->get_future();
317  }
318 }
319 
320 template <typename T>
321 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible_v<T>)
322 inline
324  if (_ex) {
325  return make_exception_future<>(_ex);
326  }
327  if (!full()) {
328  return make_ready_future<>();
329  } else {
330  _not_full = promise<>();
331  return _not_full->get_future();
332  }
333 }
334 
335 }
336 
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Definition: queue.hh:44
T & front() noexcept
access the front element in the queue
Definition: queue.hh:200
size_t max_size() const noexcept
Definition: queue.hh:117
future push_eventually(T &&data) noexcept
Definition: queue.hh:250
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
future< T > pop_eventually() noexcept
Definition: queue.hh:225
void set_max_size(size_t max) noexcept
Definition: queue.hh:122
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool consume(Func &&func)
Definition: queue.hh:274
T pop() noexcept
Pop an item.
Definition: queue.hh:208
future not_empty() noexcept
Definition: queue.hh:308
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:152
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
bool push(T &&a)
Push an item.
Definition: queue.hh:187
future not_full() noexcept
Definition: queue.hh:323
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
Seastar API namespace.
Definition: abort_on_ebadf.hh:26