Seastar
High performance C++ framework for concurrent servers
queue.hh
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */
21
22#pragma once
23
24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/future.hh>
26#include <seastar/util/assert.hh>
27#include <seastar/util/modules.hh>
28#ifndef SEASTAR_MODULE
29#include <optional>
30#include <queue>
31#endif
32
33namespace seastar {
34
42SEASTAR_MODULE_EXPORT
43template <typename T>
44requires std::is_nothrow_move_constructible_v<T>
45class queue {
46 std::queue<T, circular_buffer<T>> _q;
47 size_t _max;
48 std::optional<promise<>> _not_empty;
49 std::optional<promise<>> _not_full;
50 std::exception_ptr _ex = nullptr;
51private:
52 void notify_not_empty() noexcept;
53 void notify_not_full() noexcept;
54public:
55 explicit queue(size_t size);
56
60 bool push(T&& a);
61
65 T pop() noexcept;
66
71 T& front() noexcept;
72
77 template <typename Func>
78 bool consume(Func&& func);
79
81 bool empty() const noexcept;
82
84 bool full() const noexcept;
85
89 future<> not_empty() noexcept;
90
93 future<> not_full() noexcept;
94
100 future<T> pop_eventually() noexcept;
101
107 future<> push_eventually(T&& data) noexcept;
108
110 size_t size() const noexcept {
111 // std::queue::size() has no reason to throw
112 return _q.size();
113 }
114
118 size_t max_size() const noexcept { return _max; }
119
123 void set_max_size(size_t max) noexcept {
124 _max = max;
125 if (!full()) {
126 notify_not_full();
127 }
128 }
129
132 void abort(std::exception_ptr ex) noexcept {
133 // std::queue::empty() and pop() doesn't throw
134 // since it just calls seastar::circular_buffer::pop_front
135 // that is specified as noexcept.
136 while (!_q.empty()) {
137 _q.pop();
138 }
139 _ex = ex;
140 if (_not_full) {
141 _not_full->set_exception(ex);
142 _not_full= std::nullopt;
143 }
144 if (_not_empty) {
145 _not_empty->set_exception(std::move(ex));
146 _not_empty = std::nullopt;
147 }
148 }
149
153 bool has_blocked_consumer() const noexcept {
154 return bool(_not_empty);
155 }
156};
157
158template <typename T>
159requires std::is_nothrow_move_constructible_v<T>
160inline
161queue<T>::queue(size_t size)
162 : _max(size) {
163}
164
165template <typename T>
166requires std::is_nothrow_move_constructible_v<T>
167inline
168void queue<T>::notify_not_empty() noexcept {
169 if (_not_empty) {
170 _not_empty->set_value();
171 _not_empty = std::optional<promise<>>();
172 }
173}
174
175template <typename T>
176requires std::is_nothrow_move_constructible_v<T>
177inline
178void queue<T>::notify_not_full() noexcept {
179 if (_not_full) {
180 _not_full->set_value();
181 _not_full = std::optional<promise<>>();
182 }
183}
184
185template <typename T>
186requires std::is_nothrow_move_constructible_v<T>
187inline
188bool queue<T>::push(T&& data) {
189 if (_q.size() < _max) {
190 _q.push(std::move(data));
191 notify_not_empty();
192 return true;
193 } else {
194 return false;
195 }
196}
197
198template <typename T>
199requires std::is_nothrow_move_constructible_v<T>
200inline
201T& queue<T>::front() noexcept {
202 // std::queue::front() has no reason to throw
203 return _q.front();
204}
205
206template <typename T>
207requires std::is_nothrow_move_constructible_v<T>
208inline
209T queue<T>::pop() noexcept {
210 if (_q.size() == _max) {
211 notify_not_full();
212 }
213 // popping the front element must not throw
214 // as T is required to be nothrow_move_constructible
215 // and std::queue::pop won't throw since it uses
216 // seastar::circular_beffer::pop_front.
217 SEASTAR_ASSERT(!_q.empty());
218 T data = std::move(_q.front());
219 _q.pop();
220 return data;
221}
222
223template <typename T>
224// seastar allows only nothrow_move_constructible types
225// to be returned as future<T>
226requires std::is_nothrow_move_constructible_v<T>
227inline
229 if (_ex) {
230 return make_exception_future<T>(_ex);
231 }
232 if (empty()) {
233 return not_empty().then([this] {
234 if (_ex) {
235 return make_exception_future<T>(_ex);
236 } else {
237 return make_ready_future<T>(pop());
238 }
239 });
240 } else {
241 return make_ready_future<T>(pop());
242 }
243}
244
245template <typename T>
246requires std::is_nothrow_move_constructible_v<T>
247inline
249 if (_ex) {
250 return make_exception_future<>(_ex);
251 }
252 if (full()) {
253 return not_full().then([this, data = std::move(data)] () mutable {
254 _q.push(std::move(data));
255 notify_not_empty();
256 });
257 } else {
258 try {
259 _q.push(std::move(data));
260 notify_not_empty();
261 return make_ready_future<>();
262 } catch (...) {
264 }
265 }
266}
267
268template <typename T>
269requires std::is_nothrow_move_constructible_v<T>
270template <typename Func>
271inline
272bool queue<T>::consume(Func&& func) {
273 if (_ex) {
274 std::rethrow_exception(_ex);
275 }
276 bool running = true;
277 while (!_q.empty() && running) {
278 running = func(std::move(_q.front()));
279 _q.pop();
280 }
281 if (!full()) {
282 notify_not_full();
283 }
284 return running;
285}
286
287template <typename T>
288requires std::is_nothrow_move_constructible_v<T>
289inline
290bool queue<T>::empty() const noexcept {
291 // std::queue::empty() has no reason to throw
292 return _q.empty();
293}
294
295template <typename T>
296requires std::is_nothrow_move_constructible_v<T>
297inline
298bool queue<T>::full() const noexcept {
299 // std::queue::size() has no reason to throw
300 return _q.size() >= _max;
301}
302
303template <typename T>
304requires std::is_nothrow_move_constructible_v<T>
305inline
307 if (_ex) {
308 return make_exception_future<>(_ex);
309 }
310 if (!empty()) {
311 return make_ready_future<>();
312 } else {
313 _not_empty = promise<>();
314 return _not_empty->get_future();
315 }
316}
317
318template <typename T>
319requires std::is_nothrow_move_constructible_v<T>
320inline
322 if (_ex) {
323 return make_exception_future<>(_ex);
324 }
325 if (!full()) {
326 return make_ready_future<>();
327 } else {
328 _not_full = promise<>();
329 return _not_full->get_future();
330 }
331}
332
333}
334
A representation of a possibly not-yet-computed value.
Definition: future.hh:1199
Definition: queue.hh:45
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:110
bool push(T &&a)
Push an item.
Definition: queue.hh:188
future<> not_full() noexcept
Definition: queue.hh:321
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:298
T pop() noexcept
Pop an item.
Definition: queue.hh:209
bool consume(Func &&func)
Definition: queue.hh:272
future<> not_empty() noexcept
Definition: queue.hh:306
future< T > pop_eventually() noexcept
Definition: queue.hh:228
size_t max_size() const noexcept
Definition: queue.hh:118
void set_max_size(size_t max) noexcept
Definition: queue.hh:123
T & front() noexcept
access the front element in the queue
Definition: queue.hh:201
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:132
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:290
bool has_blocked_consumer() const noexcept
Check if there is an active consumer.
Definition: queue.hh:153
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:248
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1932
Seastar API namespace.
Definition: abort_on_ebadf.hh:26