Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
queue.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <seastar/util/assert.hh>
27#include <seastar/util/modules.hh>
28#ifndef SEASTAR_MODULE
29#include <optional>
30#include <queue>
31#endif
32
33namespace seastar {
34
42SEASTAR_MODULE_EXPORT
43template <typename T>
44requires std::is_nothrow_move_constructible_v<T>
45class queue {
46 std::queue<T, circular_buffer<T>> _q;
47 size_t _max;
48 std::optional<promise<>> _not_empty;
49 std::optional<promise<>> _not_full;
50 std::exception_ptr _ex = nullptr;
51private:
52 void notify_not_empty() noexcept;
53 void notify_not_full() noexcept;
54public:
55 explicit queue(size_t size);
56
60 bool push(T&& a);
61
65 T pop() noexcept;
66
71 T& front() noexcept;
72
77 template <typename Func>
78 bool consume(Func&& func);
79
81 bool empty() const noexcept;
82
84 bool full() const noexcept;
85
89 future<> not_empty() noexcept;
90
93 future<> not_full() noexcept;
94
100 future<T> pop_eventually() noexcept;
101
107 future<> push_eventually(T&& data) noexcept;
108
110 size_t size() const noexcept {
111 // std::queue::size() has no reason to throw
112 return _q.size();
113 }
114
118 size_t max_size() const noexcept { return _max; }
119
123 void set_max_size(size_t max) noexcept {
124 _max = max;
125 if (!full()) {
126 notify_not_full();
127 }
128 }
129
132 void abort(std::exception_ptr ex) noexcept {
133 // std::queue::empty() and pop() doesn't throw
134 // since it just calls seastar::circular_buffer::pop_front
135 // that is specified as noexcept.
136 while (!_q.empty()) {
137 _q.pop();
138 }
139 _ex = ex;
140 if (_not_full) {
141 _not_full->set_exception(ex);
142 _not_full= std::nullopt;
143 }
144 if (_not_empty) {
145 _not_empty->set_exception(std::move(ex));
146 _not_empty = std::nullopt;
147 }
148 }
149
153 bool has_blocked_consumer() const noexcept {
154 return bool(_not_empty);
155 }
156};
157
158template <typename T>
159requires std::is_nothrow_move_constructible_v<T>
160inline
161queue<T>::queue(size_t size)
162 : _max(size) {
163}
164
165template <typename T>
166requires std::is_nothrow_move_constructible_v<T>
167inline
168void queue<T>::notify_not_empty() noexcept {
169 if (_not_empty) {
170 _not_empty->set_value();
171 _not_empty = std::optional<promise<>>();
172 }
173}
174
175template <typename T>
176requires std::is_nothrow_move_constructible_v<T>
177inline
178void queue<T>::notify_not_full() noexcept {
179 if (_not_full) {
180 _not_full->set_value();
181 _not_full = std::optional<promise<>>();
182 }
183}
184
185template <typename T>
186requires std::is_nothrow_move_constructible_v<T>
187inline
188bool queue<T>::push(T&& data) {
189 if (_q.size() < _max) {
190 _q.push(std::move(data));
191 notify_not_empty();
192 return true;
193 } else {
194 return false;
195 }
196}
197
198template <typename T>
199requires std::is_nothrow_move_constructible_v<T>
200inline
201T& queue<T>::front() noexcept {
202 // std::queue::front() has no reason to throw
203 return _q.front();
204}
205
206template <typename T>
207requires std::is_nothrow_move_constructible_v<T>
208inline
209T queue<T>::pop() noexcept {
210 if (_q.size() == _max) {
211 notify_not_full();
212 }
213 // popping the front element must not throw
214 // as T is required to be nothrow_move_constructible
215 // and std::queue::pop won't throw since it uses
216 // seastar::circular_beffer::pop_front.
217 SEASTAR_ASSERT(!_q.empty());
218 T data = std::move(_q.front());
219 _q.pop();
220 return data;
221}
222
223template <typename T>
224requires std::is_nothrow_move_constructible_v<T>
225inline
227 // seastar allows only nothrow_move_constructible types
228 // to be returned as future<T>
229 static_assert(std::is_nothrow_move_constructible_v<T>,
230 "Queue element type must be no-throw move constructible");
231
232 if (_ex) {
233 return make_exception_future<T>(_ex);
234 }
235 if (empty()) {
236 return not_empty().then([this] {
237 if (_ex) {
238 return make_exception_future<T>(_ex);
239 } else {
240 return make_ready_future<T>(pop());
241 }
242 });
243 } else {
244 return make_ready_future<T>(pop());
245 }
246}
247
248template <typename T>
249requires std::is_nothrow_move_constructible_v<T>
250inline
252 if (_ex) {
253 return make_exception_future<>(_ex);
254 }
255 if (full()) {
256 return not_full().then([this, data = std::move(data)] () mutable {
257 _q.push(std::move(data));
258 notify_not_empty();
259 });
260 } else {
261 try {
262 _q.push(std::move(data));
263 notify_not_empty();
264 return make_ready_future<>();
265 } catch (...) {
267 }
268 }
269}
270
271template <typename T>
272requires std::is_nothrow_move_constructible_v<T>
273template <typename Func>
274inline
275bool queue<T>::consume(Func&& func) {
276 if (_ex) {
277 std::rethrow_exception(_ex);
278 }
279 bool running = true;
280 while (!_q.empty() && running) {
281 running = func(std::move(_q.front()));
282 _q.pop();
283 }
284 if (!full()) {
285 notify_not_full();
286 }
287 return running;
288}
289
290template <typename T>
291requires std::is_nothrow_move_constructible_v<T>
292inline
293bool queue<T>::empty() const noexcept {
294 // std::queue::empty() has no reason to throw
295 return _q.empty();
296}
297
298template <typename T>
299requires std::is_nothrow_move_constructible_v<T>
300inline
301bool queue<T>::full() const noexcept {
302 // std::queue::size() has no reason to throw
303 return _q.size() >= _max;
304}
305
306template <typename T>
307requires std::is_nothrow_move_constructible_v<T>
308inline
310 if (_ex) {
311 return make_exception_future<>(_ex);
312 }
313 if (!empty()) {
314 return make_ready_future<>();
315 } else {
316 _not_empty = promise<>();
317 return _not_empty->get_future();
318 }
319}
320
321template <typename T>
322requires std::is_nothrow_move_constructible_v<T>
323inline
325 if (_ex) {
326 return make_exception_future<>(_ex);
327 }
328 if (!full()) {
329 return make_ready_future<>();
330 } else {
331 _not_full = promise<>();
332 return _not_full->get_future();
333 }
334}
335
336}
337
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
Definition: queue.hh:45
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:110
bool push(T &&a)
Push an item.
Definition: queue.hh:188
future not_full() noexcept
Definition: queue.hh:324
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:301
T pop() noexcept
Pop an item.
Definition: queue.hh:209
bool consume(Func &&func)
Definition: queue.hh:275
future not_empty() noexcept
Definition: queue.hh:309
future< T > pop_eventually() noexcept
Definition: queue.hh:226
size_t max_size() const noexcept
Definition: queue.hh:118
void set_max_size(size_t max) noexcept
Definition: queue.hh:123
T & front() noexcept
access the front element in the queue
Definition: queue.hh:201
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:132
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:293
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:153
future push_eventually(T &&data) noexcept
Definition: queue.hh:251
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
Seastar API namespace.
Definition: abort_on_ebadf.hh:26