Seastar
High performance C++ framework for concurrent servers
queue.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/future.hh>
26 #include <queue>
27 #include <seastar/util/std-compat.hh>
28 
29 namespace seastar {
30 
34 template <typename T>
35 class queue {
36  std::queue<T, circular_buffer<T>> _q;
37  size_t _max;
38  std::optional<promise<>> _not_empty;
39  std::optional<promise<>> _not_full;
40  std::exception_ptr _ex = nullptr;
41 private:
42  void notify_not_empty();
43  void notify_not_full();
44 public:
45  explicit queue(size_t size);
46 
50  bool push(T&& a);
51 
55  T pop();
56 
61  template <typename Func>
62  bool consume(Func&& func);
63 
65  bool empty() const;
66 
68  bool full() const;
69 
74 
78 
85 
92 
94  size_t size() const { return _q.size(); }
95 
99  size_t max_size() const { return _max; }
100 
104  void set_max_size(size_t max) {
105  _max = max;
106  if (!full()) {
107  notify_not_full();
108  }
109  }
110 
113  void abort(std::exception_ptr ex) {
114  while (!_q.empty()) {
115  _q.pop();
116  }
117  _ex = ex;
118  if (_not_full) {
119  _not_full->set_exception(ex);
120  _not_full= std::nullopt;
121  }
122  if (_not_empty) {
123  _not_empty->set_exception(std::move(ex));
124  _not_empty = std::nullopt;
125  }
126  }
127 
131  bool has_blocked_consumer() const {
132  return bool(_not_empty);
133  }
134 };
135 
136 template <typename T>
137 inline
138 queue<T>::queue(size_t size)
139  : _max(size) {
140 }
141 
142 template <typename T>
143 inline
144 void queue<T>::notify_not_empty() {
145  if (_not_empty) {
146  _not_empty->set_value();
147  _not_empty = std::optional<promise<>>();
148  }
149 }
150 
151 template <typename T>
152 inline
153 void queue<T>::notify_not_full() {
154  if (_not_full) {
155  _not_full->set_value();
156  _not_full = std::optional<promise<>>();
157  }
158 }
159 
160 template <typename T>
161 inline
162 bool queue<T>::push(T&& data) {
163  if (_q.size() < _max) {
164  _q.push(std::move(data));
165  notify_not_empty();
166  return true;
167  } else {
168  return false;
169  }
170 }
171 
172 template <typename T>
173 inline
175  if (_q.size() == _max) {
176  notify_not_full();
177  }
178  T data = std::move(_q.front());
179  _q.pop();
180  return data;
181 }
182 
183 template <typename T>
184 inline
186  if (_ex) {
187  return make_exception_future<T>(_ex);
188  }
189  if (empty()) {
190  return not_empty().then([this] {
191  if (_ex) {
192  return make_exception_future<T>(_ex);
193  } else {
194  return make_ready_future<T>(pop());
195  }
196  });
197  } else {
198  return make_ready_future<T>(pop());
199  }
200 }
201 
202 template <typename T>
203 inline
205  if (_ex) {
206  return make_exception_future<>(_ex);
207  }
208  if (full()) {
209  return not_full().then([this, data = std::move(data)] () mutable {
210  _q.push(std::move(data));
211  notify_not_empty();
212  });
213  } else {
214  _q.push(std::move(data));
215  notify_not_empty();
216  return make_ready_future<>();
217  }
218 }
219 
220 template <typename T>
221 template <typename Func>
222 inline
223 bool queue<T>::consume(Func&& func) {
224  if (_ex) {
225  std::rethrow_exception(_ex);
226  }
227  bool running = true;
228  while (!_q.empty() && running) {
229  running = func(std::move(_q.front()));
230  _q.pop();
231  }
232  if (!full()) {
233  notify_not_full();
234  }
235  return running;
236 }
237 
238 template <typename T>
239 inline
240 bool queue<T>::empty() const {
241  return _q.empty();
242 }
243 
244 template <typename T>
245 inline
246 bool queue<T>::full() const {
247  return _q.size() >= _max;
248 }
249 
250 template <typename T>
251 inline
253  if (_ex) {
254  return make_exception_future<>(_ex);
255  }
256  if (!empty()) {
257  return make_ready_future<>();
258  } else {
259  _not_empty = promise<>();
260  return _not_empty->get_future();
261  }
262 }
263 
264 template <typename T>
265 inline
267  if (_ex) {
268  return make_exception_future<>(_ex);
269  }
270  if (!full()) {
271  return make_ready_future<>();
272  } else {
273  _not_full = promise<>();
274  return _not_full->get_future();
275  }
276 }
277 
278 }
279 
seastar::queue::has_blocked_consumer
bool has_blocked_consumer() const
Check if there is an active consumer.
Definition: queue.hh:131
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::queue::push
bool push(T &&a)
Push an item.
Definition: queue.hh:162
seastar::promise
promise - allows a future value to be made available at a later time.
Definition: future.hh:957
seastar::queue::pop
T pop()
Pop an item.
Definition: queue.hh:174
seastar::queue::abort
void abort(std::exception_ptr ex)
Definition: queue.hh:113
seastar::queue::not_empty
future not_empty()
Definition: queue.hh:252
seastar::queue::pop_eventually
future< T > pop_eventually()
Definition: queue.hh:185
seastar::queue
Definition: queue.hh:35
seastar::queue::size
size_t size() const
Returns the number of items currently in the queue.
Definition: queue.hh:94
seastar::queue::max_size
size_t max_size() const
Definition: queue.hh:99
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
seastar::queue::push_eventually
future push_eventually(T &&data)
Definition: queue.hh:204
seastar::queue::full
bool full() const
Returns true when the queue is full.
Definition: queue.hh:246
seastar::queue::consume
bool consume(Func &&func)
Definition: queue.hh:223
seastar::queue::empty
bool empty() const
Returns true when the queue is empty.
Definition: queue.hh:240
seastar::queue::not_full
future not_full()
Definition: queue.hh:266
seastar::queue::set_max_size
void set_max_size(size_t max)
Definition: queue.hh:104