Seastar
High performance C++ framework for concurrent servers
pipe.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/queue.hh>
26 
27 #include <seastar/util/std-compat.hh>
28 #include <seastar/util/modules.hh>
29 
60 
62 namespace seastar {
63 
64 
65 
68 SEASTAR_MODULE_EXPORT
/// Exception reported to the write side of a pipe once the read side is gone.
///
/// Within this file it is used to abort a writer blocked on a full buffer
/// when the reader closes, and to fail pipe_writer::write() when the pipe
/// is no longer writeable.
class broken_pipe_exception : public std::exception {
public:
    /// \return the fixed, static description string "Broken pipe".
    const char* what() const noexcept override {
        return "Broken pipe";
    }
};
75 
76 SEASTAR_MODULE_EXPORT
/// Exception signalling that pipe_reader::unread() was called while a
/// previously unread item was still pending (only one item can be held).
class unread_overflow_exception : public std::exception {
public:
    /// \return the fixed, static description string
    ///         "pipe_reader::unread() overflow".
    const char* what() const noexcept override {
        return "pipe_reader::unread() overflow";
    }
};
83 
85 namespace internal {
86 template <typename T>
87 class pipe_buffer {
88 private:
90  bool _read_open = true;
91  bool _write_open = true;
92 public:
93  pipe_buffer(size_t size) : _buf(size) {}
94  future<std::optional<T>> read() {
95  return _buf.pop_eventually();
96  }
97  future<> write(T&& data) {
98  return _buf.push_eventually(std::move(data));
99  }
100  bool readable() const {
101  return _write_open || !_buf.empty();
102  }
103  bool writeable() const {
104  return _read_open;
105  }
106  bool close_read() {
107  // If a writer blocking (on a full queue), need to stop it.
108  if (_buf.full()) {
109  _buf.abort(std::make_exception_ptr(broken_pipe_exception()));
110  }
111  _read_open = false;
112  return !_write_open;
113  }
114  bool close_write() {
115  // If the queue is empty, write the EOF (disengaged optional) to the
116  // queue to wake a blocked reader. If the queue is not empty, there is
117  // no need to write the EOF to the queue - the reader will return an
118  // EOF when it sees that _write_open == false.
119  if (_buf.empty()) {
120  _buf.push({});
121  }
122  _write_open = false;
123  return !_read_open;
124  }
125 };
126 } // namespace internal
128 
129 SEASTAR_MODULE_EXPORT_BEGIN
130 template <typename T>
131 class pipe;
132 
138 template <typename T>
139 class pipe_reader {
140 private:
141  internal::pipe_buffer<T> *_bufp;
142  std::optional<T> _unread;
143  pipe_reader(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
144  friend class pipe<T>;
145 public:
153  if (_unread) {
154  auto ret = std::move(*_unread);
155  _unread = {};
156  return make_ready_future<std::optional<T>>(std::move(ret));
157  }
158  if (_bufp->readable()) {
159  return _bufp->read();
160  } else {
161  return make_ready_future<std::optional<T>>();
162  }
163  }
173  void unread(T&& item) {
174  if (_unread) {
176  }
177  _unread = std::move(item);
178  }
179  ~pipe_reader() {
180  if (_bufp && _bufp->close_read()) {
181  delete _bufp;
182  }
183  }
184  // Allow move, but not copy, of pipe_reader
185  pipe_reader(pipe_reader&& other) noexcept : _bufp(other._bufp) {
186  other._bufp = nullptr;
187  }
188  pipe_reader& operator=(pipe_reader&& other) noexcept {
189  std::swap(_bufp, other._bufp);
190  return *this;
191  }
192 };
193 
199 template <typename T>
200 class pipe_writer {
201 private:
202  internal::pipe_buffer<T> *_bufp;
203  pipe_writer(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
204  friend class pipe<T>;
205 public:
212  future<> write(T&& data) {
213  if (_bufp->writeable()) {
214  return _bufp->write(std::move(data));
215  } else {
216  return make_exception_future<>(broken_pipe_exception());
217  }
218  }
219  ~pipe_writer() {
220  if (_bufp && _bufp->close_write()) {
221  delete _bufp;
222  }
223  }
224  // Allow move, but not copy, of pipe_writer
225  pipe_writer(pipe_writer&& other) noexcept : _bufp(other._bufp) {
226  other._bufp = nullptr;
227  }
228  pipe_writer& operator=(pipe_writer&& other) noexcept {
229  std::swap(_bufp, other._bufp);
230  return *this;
231  }
232 };
233 
261 template <typename T>
262 class pipe {
263 public:
264  pipe_reader<T> reader;
265  pipe_writer<T> writer;
266  explicit pipe(size_t size) : pipe(new internal::pipe_buffer<T>(size)) { }
267 private:
268  pipe(internal::pipe_buffer<T> *bufp) noexcept : reader(bufp), writer(bufp) { }
269 };
270 
271 SEASTAR_MODULE_EXPORT_END
272 
274 
275 } // namespace seastar
Definition: pipe.hh:69
Read side of a seastar::pipe.
Definition: pipe.hh:139
void unread(T &&item)
Return an item to the front of the pipe.
Definition: pipe.hh:173
future< std::optional< T > > read()
Read next item from the pipe.
Definition: pipe.hh:152
Write side of a seastar::pipe.
Definition: pipe.hh:200
future<> write(T &&data)
Write an item to the pipe.
Definition: pipe.hh:212
A fixed-size pipe for communicating between two fibers.
Definition: pipe.hh:262
Definition: queue.hh:44
future<> push_eventually(T &&data) noexcept
Definition: queue.hh:250
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
future< T > pop_eventually() noexcept
Definition: queue.hh:225
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
bool push(T &&a)
Push an item.
Definition: queue.hh:187
Seastar API namespace.
Definition: abort_on_ebadf.hh:26