Seastar
High performance C++ framework for concurrent servers
pipe.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/queue.hh>
26
27#include <seastar/util/std-compat.hh>
28#include <seastar/util/modules.hh>
29
60
62namespace seastar {
63
64
65
68SEASTAR_MODULE_EXPORT
69class broken_pipe_exception : public std::exception {
70public:
71 virtual const char* what() const noexcept {
72 return "Broken pipe";
73 }
74};
75
76SEASTAR_MODULE_EXPORT
77class unread_overflow_exception : public std::exception {
78public:
79 virtual const char* what() const noexcept {
80 return "pipe_reader::unread() overflow";
81 }
82};
83
85namespace internal {
86template <typename T>
87class pipe_buffer {
88private:
90 bool _read_open = true;
91 bool _write_open = true;
92public:
93 pipe_buffer(size_t size) : _buf(size) {}
94 future<std::optional<T>> read() {
95 return _buf.pop_eventually();
96 }
97 future<> write(T&& data) {
98 return _buf.push_eventually(std::move(data));
99 }
100 bool readable() const {
101 return _write_open || !_buf.empty();
102 }
103 bool writeable() const {
104 return _read_open;
105 }
106 bool close_read() {
107 // If a writer blocking (on a full queue), need to stop it.
108 if (_buf.full()) {
109 _buf.abort(std::make_exception_ptr(broken_pipe_exception()));
110 }
111 _read_open = false;
112 return !_write_open;
113 }
114 bool close_write() {
115 // If the queue is empty, write the EOF (disengaged optional) to the
116 // queue to wake a blocked reader. If the queue is not empty, there is
117 // no need to write the EOF to the queue - the reader will return an
118 // EOF when it sees that _write_open == false.
119 if (_buf.empty()) {
120 _buf.push({});
121 }
122 _write_open = false;
123 return !_read_open;
124 }
125};
126} // namespace internal
128
129SEASTAR_MODULE_EXPORT_BEGIN
130template <typename T>
131class pipe;
132
138template <typename T>
140private:
141 internal::pipe_buffer<T> *_bufp;
142 std::optional<T> _unread;
143 pipe_reader(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
144 friend class pipe<T>;
145public:
153 if (_unread) {
154 auto ret = std::move(*_unread);
155 _unread = {};
156 return make_ready_future<std::optional<T>>(std::move(ret));
157 }
158 if (_bufp->readable()) {
159 return _bufp->read();
160 } else {
161 return make_ready_future<std::optional<T>>();
162 }
163 }
173 void unread(T&& item) {
174 if (_unread) {
176 }
177 _unread = std::move(item);
178 }
179 ~pipe_reader() {
180 if (_bufp && _bufp->close_read()) {
181 delete _bufp;
182 }
183 }
184 // Allow move, but not copy, of pipe_reader
185 pipe_reader(pipe_reader&& other) noexcept : _bufp(other._bufp) {
186 other._bufp = nullptr;
187 }
188 pipe_reader& operator=(pipe_reader&& other) noexcept {
189 std::swap(_bufp, other._bufp);
190 return *this;
191 }
192};
193
199template <typename T>
201private:
202 internal::pipe_buffer<T> *_bufp;
203 pipe_writer(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
204 friend class pipe<T>;
205public:
212 future<> write(T&& data) {
213 if (_bufp->writeable()) {
214 return _bufp->write(std::move(data));
215 } else {
216 return make_exception_future<>(broken_pipe_exception());
217 }
218 }
219 ~pipe_writer() {
220 if (_bufp && _bufp->close_write()) {
221 delete _bufp;
222 }
223 }
224 // Allow move, but not copy, of pipe_writer
225 pipe_writer(pipe_writer&& other) noexcept : _bufp(other._bufp) {
226 other._bufp = nullptr;
227 }
228 pipe_writer& operator=(pipe_writer&& other) noexcept {
229 std::swap(_bufp, other._bufp);
230 return *this;
231 }
232};
233
261template <typename T>
262class pipe {
263public:
264 pipe_reader<T> reader;
265 pipe_writer<T> writer;
266 explicit pipe(size_t size) : pipe(new internal::pipe_buffer<T>(size)) { }
267private:
268 pipe(internal::pipe_buffer<T> *bufp) noexcept : reader(bufp), writer(bufp) { }
269};
270
271SEASTAR_MODULE_EXPORT_END
272
274
275} // namespace seastar
Definition: pipe.hh:69
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Read side of a seastar::pipe.
Definition: pipe.hh:139
void unread(T &&item)
Return an item to the front of the pipe.
Definition: pipe.hh:173
future< std::optional< T > > read()
Read next item from the pipe.
Definition: pipe.hh:152
Write side of a seastar::pipe.
Definition: pipe.hh:200
future write(T &&data)
Write an item to the pipe.
Definition: pipe.hh:212
A fixed-size pipe for communicating between two fibers.
Definition: pipe.hh:262
Definition: queue.hh:44
bool push(T &&a)
Push an item.
Definition: queue.hh:187
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
future< T > pop_eventually() noexcept
Definition: queue.hh:225
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
future push_eventually(T &&data) noexcept
Definition: queue.hh:250
Seastar API namespace.
Definition: abort_on_ebadf.hh:26