24#include <seastar/core/future.hh>
25#include <seastar/core/queue.hh>
27#include <seastar/util/std-compat.hh>
28#include <seastar/util/modules.hh>
71 virtual const char* what()
const noexcept {
79 virtual const char* what()
const noexcept {
80 return "pipe_reader::unread() overflow";
90 bool _read_open =
true;
91 bool _write_open =
true;
93 pipe_buffer(
size_t size) : _buf(size) {}
94 future<std::optional<T>> read() {
97 future<> write(T&& data) {
100 bool readable()
const {
101 return _write_open || !_buf.
empty();
103 bool writeable()
const {
109 _buf.
abort(std::make_exception_ptr(broken_pipe_exception()));
129SEASTAR_MODULE_EXPORT_BEGIN
141 internal::pipe_buffer<T> *_bufp;
142 std::optional<T> _unread;
143 pipe_reader(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
144 friend class pipe<T>;
154 auto ret = std::move(*_unread);
156 return make_ready_future<std::optional<T>>(std::move(ret));
158 if (_bufp->readable()) {
159 return _bufp->read();
161 return make_ready_future<std::optional<T>>();
177 _unread = std::move(item);
180 if (_bufp && _bufp->close_read()) {
185 pipe_reader(pipe_reader&& other) noexcept : _bufp(other._bufp) {
186 other._bufp =
nullptr;
188 pipe_reader& operator=(pipe_reader&& other)
noexcept {
189 std::swap(_bufp, other._bufp);
202 internal::pipe_buffer<T> *_bufp;
203 pipe_writer(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
204 friend class pipe<T>;
213 if (_bufp->writeable()) {
214 return _bufp->write(std::move(data));
220 if (_bufp && _bufp->close_write()) {
225 pipe_writer(pipe_writer&& other) noexcept : _bufp(other._bufp) {
226 other._bufp =
nullptr;
228 pipe_writer& operator=(pipe_writer&& other)
noexcept {
229 std::swap(_bufp, other._bufp);
266 explicit pipe(
size_t size) :
pipe(
new internal::pipe_buffer<T>(size)) { }
268 pipe(internal::pipe_buffer<T> *bufp) noexcept : reader(bufp), writer(bufp) { }
271SEASTAR_MODULE_EXPORT_END
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Read side of a seastar::pipe.
Definition: pipe.hh:139
void unread(T &&item)
Return an item to the front of the pipe.
Definition: pipe.hh:173
future< std::optional< T > > read()
Read next item from the pipe.
Definition: pipe.hh:152
Write side of a seastar::pipe.
Definition: pipe.hh:200
future write(T &&data)
Write an item to the pipe.
Definition: pipe.hh:212
A fixed-size pipe for communicating between two fibers.
Definition: pipe.hh:262
bool push(T &&a)
Push an item.
Definition: queue.hh:187
bool full() const noexcept
Returns true when the queue is full.
Definition: queue.hh:300
future< T > pop_eventually() noexcept
Definition: queue.hh:225
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
bool empty() const noexcept
Returns true when the queue is empty.
Definition: queue.hh:292
future push_eventually(T &&data) noexcept
Definition: queue.hh:250
Seastar API namespace.
Definition: abort_on_ebadf.hh:26