Seastar
High performance C++ framework for concurrent servers
queue.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <seastar/util/modules.hh>
27#ifndef SEASTAR_MODULE
28#include <optional>
29#include <queue>
30#endif
31
32namespace seastar {
33
41SEASTAR_MODULE_EXPORT
42template <typename T>
43requires std::is_nothrow_move_constructible_v<T>
44class queue {
45 std::queue<T, circular_buffer<T>> _q;
46 size_t _max;
47 std::optional<promise<>> _not_empty;
48 std::optional<promise<>> _not_full;
49 std::exception_ptr _ex = nullptr;
50private:
51 void notify_not_empty() noexcept;
52 void notify_not_full() noexcept;
53public:
54 explicit queue(size_t size);
55
59 bool push(T&& a);
60
64 T pop() noexcept;
65
70 T& front() noexcept;
71
76 template <typename Func>
77 bool consume(Func&& func);
78
80 bool empty() const noexcept;
81
83 bool full() const noexcept;
84
88 future<> not_empty() noexcept;
89
92 future<> not_full() noexcept;
93
99 future<T> pop_eventually() noexcept;
100
106 future<> push_eventually(T&& data) noexcept;
107
109 size_t size() const noexcept {
110 // std::queue::size() has no reason to throw
111 return _q.size();
112 }
113
117 size_t max_size() const noexcept { return _max; }
118
122 void set_max_size(size_t max) noexcept {
123 _max = max;
124 if (!full()) {
125 notify_not_full();
126 }
127 }
128
131 void abort(std::exception_ptr ex) noexcept {
132 // std::queue::empty() and pop() doesn't throw
133 // since it just calls seastar::circular_buffer::pop_front
134 // that is specified as noexcept.
135 while (!_q.empty()) {
136 _q.pop();
137 }
138 _ex = ex;
139 if (_not_full) {
140 _not_full->set_exception(ex);
141 _not_full= std::nullopt;
142 }
143 if (_not_empty) {
144 _not_empty->set_exception(std::move(ex));
145 _not_empty = std::nullopt;
146 }
147 }
148
152 bool has_blocked_consumer() const noexcept {
153 return bool(_not_empty);
154 }
155};
156
157template <typename T>
158requires std::is_nothrow_move_constructible_v<T>
159inline
160queue<T>::queue(size_t size)
161 : _max(size) {
162}
163
164template <typename T>
165requires std::is_nothrow_move_constructible_v<T>
166inline
167void queue<T>::notify_not_empty() noexcept {
168 if (_not_empty) {
169 _not_empty->set_value();
170 _not_empty = std::optional<promise<>>();
171 }
172}
173
174template <typename T>
175requires std::is_nothrow_move_constructible_v<T>
176inline
177void queue<T>::notify_not_full() noexcept {
178 if (_not_full) {
179 _not_full->set_value();
180 _not_full = std::optional<promise<>>();
181 }
182}
183
184template <typename T>
185requires std::is_nothrow_move_constructible_v<T>
186inline
187bool queue<T>::push(T&& data) {
188 if (_q.size() < _max) {
189 _q.push(std::move(data));
190 notify_not_empty();
191 return true;
192 } else {
193 return false;
194 }
195}
196
197template <typename T>
198requires std::is_nothrow_move_constructible_v<T>
199inline
200T& queue<T>::front() noexcept {
201 // std::queue::front() has no reason to throw
202 return _q.front();
203}
204
205template <typename T>
206requires std::is_nothrow_move_constructible_v<T>
207inline
208T queue<T>::pop() noexcept {
209 if (_q.size() == _max) {
210 notify_not_full();
211 }
212 // popping the front element must not throw
213 // as T is required to be nothrow_move_constructible
214 // and std::queue::pop won't throw since it uses
215 // seastar::circular_beffer::pop_front.
216 assert(!_q.empty());
217 T data = std::move(_q.front());
218 _q.pop();
219 return data;
220}
221
222template <typename T>
223requires std::is_nothrow_move_constructible_v<T>
224inline
226 // seastar allows only nothrow_move_constructible types
227 // to be returned as future<T>
228 static_assert(std::is_nothrow_move_constructible_v<T>,
229 "Queue element type must be no-throw move constructible");
230
231 if (_ex) {
232 return make_exception_future<T>(_ex);
233 }
234 if (empty()) {
235 return not_empty().then([this] {
236 if (_ex) {
237 return make_exception_future<T>(_ex);
238 } else {
239 return make_ready_future<T>(pop());
240 }
241 });
242 } else {
243 return make_ready_future<T>(pop());
244 }
245}
246
247template <typename T>
248requires std::is_nothrow_move_constructible_v<T>
249inline
251 if (_ex) {
252 return make_exception_future<>(_ex);
253 }
254 if (full()) {
255 return not_full().then([this, data = std::move(data)] () mutable {
256 _q.push(std::move(data));
257 notify_not_empty();
258 });
259 } else {
260 try {
261 _q.push(std::move(data));
262 notify_not_empty();
263 return make_ready_future<>();
264 } catch (...) {
266 }
267 }
268}
269
270template <typename T>
271requires std::is_nothrow_move_constructible_v<T>
272template <typename Func>
273inline
274bool queue<T>::consume(Func&& func) {
275 if (_ex) {
276 std::rethrow_exception(_ex);
277 }
278 bool running = true;
279 while (!_q.empty() && running) {
280 running = func(std::move(_q.front()));
281 _q.pop();
282 }
283 if (!full()) {
284 notify_not_full();
285 }
286 return running;
287}
288
289template <typename T>
290requires std::is_nothrow_move_constructible_v<T>
291inline
292bool queue<T>::empty() const noexcept {
293 // std::queue::empty() has no reason to throw
294 return _q.empty();
295}
296
297template <typename T>
298requires std::is_nothrow_move_constructible_v<T>
299inline
300bool queue<T>::full() const noexcept {
301 // std::queue::size() has no reason to throw
302 return _q.size() >= _max;
303}
304
305template <typename T>
306requires std::is_nothrow_move_constructible_v<T>
307inline
309 if (_ex) {
310 return make_exception_future<>(_ex);
311 }
312 if (!empty()) {
313 return make_ready_future<>();
314 } else {
315 _not_empty = promise<>();
316 return _not_empty->get_future();
317 }
318}
319
320template <typename T>
321requires std::is_nothrow_move_constructible_v<T>
322inline
324 if (_ex) {
325 return make_exception_future<>(_ex);
326 }
327 if (!full()) {
328 return make_ready_future<>();
329 } else {
330 _not_full = promise<>();
331 return _not_full->get_future();
332 }
333}
334
335}
336
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: queue.hh:44
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:109
bool push(T &&a)
Push an item.
Definition: queue.hh:187
future<> not_full() noexcept
Definition: queue.hh:323
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
T pop() noexcept
Pop an item.
Definition: queue.hh:208
bool consume(Func &&func)
Definition: queue.hh:274
future<> not_empty() noexcept
Definition: queue.hh:308
future< T > pop_eventually() noexcept
Definition: queue.hh:225
size_t max_size() const noexcept
Definition: queue.hh:117
void set_max_size(size_t max) noexcept
Definition: queue.hh:122
T & front() noexcept
access the front element in the queue
Definition: queue.hh:200
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:152
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:250
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
Seastar API namespace.
Definition: abort_on_ebadf.hh:26