Seastar
High performance C++ framework for concurrent servers
iostream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2015 Cloudius Systems, Ltd.
21  */
22 
23 //
24 // Buffered input and output streams
25 //
26 // Two abstract classes (data_source and data_sink) provide means
27 // to acquire bulk data from, or push bulk data to, some provider.
28 // These could be tied to a TCP connection, a disk file, or a memory
29 // buffer.
30 //
31 // Two concrete classes (input_stream and output_stream) buffer data
32 // from data_source and data_sink and provide easier means to process
33 // it.
34 //
35 
36 #pragma once
37 
38 #include <seastar/core/future.hh>
39 #include <seastar/core/temporary_buffer.hh>
40 #include <seastar/core/scattered_message.hh>
41 #include <seastar/util/std-compat.hh>
42 
43 namespace seastar {
44 
45 namespace net { class packet; }
46 
// NOTE(review): this listing jumps from line 45 to line 48, so the line that
// opens this class is not shown. The cross-reference index at the end of the
// listing places seastar::data_source_impl at iostream.hh:47 — presumably its
// class header.
//
// Polymorphic interface that data_source forwards get(), skip() and close() to.
48 public:
49  virtual ~data_source_impl() {}
    // Pure virtual: every implementation must provide get().
50  virtual future<temporary_buffer<char>> get() = 0;
    // Declared only; no definition appears in this listing.
51  virtual future<temporary_buffer<char>> skip(uint64_t n);
    // Default close() does no work and returns an already-resolved future.
52  virtual future<> close() { return make_ready_future<>(); }
53 };
54 
55 class data_source {
56  std::unique_ptr<data_source_impl> _dsi;
57 protected:
58  data_source_impl* impl() const { return _dsi.get(); }
59 public:
60  data_source() noexcept = default;
61  explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
62  data_source(data_source&& x) noexcept = default;
63  data_source& operator=(data_source&& x) noexcept = default;
64  future<temporary_buffer<char>> get() { return _dsi->get(); }
65  future<temporary_buffer<char>> skip(uint64_t n) { return _dsi->skip(n); }
66  future<> close() { return _dsi->close(); }
67 };
68 
// NOTE(review): this listing jumps from line 67 to line 70, so the line that
// opens this class is not shown. The cross-reference index at the end of the
// listing places seastar::data_sink_impl at iostream.hh:69 — presumably its
// class header.
//
// Polymorphic interface that data_sink forwards to. Implementations must
// provide put(net::packet) and close(); the remaining members have defaults.
70 public:
71  virtual ~data_sink_impl() {}
    // Default: a temporary_buffer<char> constructed with the requested size.
72  virtual temporary_buffer<char> allocate_buffer(size_t size) {
73  return temporary_buffer<char>(size);
74  }
75  virtual future<> put(net::packet data) = 0;
    // Default: folds all buffers into one net::packet — one fragment per buffer,
    // each paired with that buffer's released deleter — and forwards the result
    // to put(net::packet).
76  virtual future<> put(std::vector<temporary_buffer<char>> data) {
77  net::packet p;
78  p.reserve(data.size());
79  for (auto& buf : data) {
80  p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
81  }
82  return put(std::move(p));
83  }
    // Default: wraps the single buffer in a one-fragment packet and forwards it
    // to put(net::packet).
84  virtual future<> put(temporary_buffer<char> buf) {
85  return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
86  }
    // Default flush() does no work and returns an already-resolved future.
87  virtual future<> flush() {
88  return make_ready_future<>();
89  }
90  virtual future<> close() = 0;
91 };
92 
93 class data_sink {
94  std::unique_ptr<data_sink_impl> _dsi;
95 public:
96  data_sink() noexcept = default;
97  explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
98  data_sink(data_sink&& x) noexcept = default;
99  data_sink& operator=(data_sink&& x) noexcept = default;
100  temporary_buffer<char> allocate_buffer(size_t size) {
101  return _dsi->allocate_buffer(size);
102  }
103  future<> put(std::vector<temporary_buffer<char>> data) {
104  return _dsi->put(std::move(data));
105  }
106  future<> put(temporary_buffer<char> data) {
107  return _dsi->put(std::move(data));
108  }
109  future<> put(net::packet p) {
110  return _dsi->put(std::move(p));
111  }
112  future<> flush() {
113  return _dsi->flush();
114  }
115  future<> close() { return _dsi->close(); }
116 };
117 
119 
// NOTE(review): lines 121 and 123 are missing from this listing. The
// cross-reference index places seastar::stop_consuming at iostream.hh:121 —
// presumably the class header. tmp_buf is used below but declared on no visible
// line — presumably on line 123.
//
// Holds the buffer handed over at construction and exposes it through
// get_buffer().
120 template <typename CharType>
122 public:
    // Takes the buffer by value and moves it into the member.
124  explicit stop_consuming(tmp_buf buf) : _buf(std::move(buf)) {}
125 
    // Mutable and read-only access to the stored buffer.
126  tmp_buf& get_buffer() { return _buf; }
127  const tmp_buf& get_buffer() const { return _buf; }
128 private:
129  tmp_buf _buf;
130 };
131 
/// Consumer verdict meaning "skip this many bytes before the next chunk".
///
/// A plain value wrapper: it stores the byte count given at construction
/// and returns it unchanged from get_value(). Neither operation can throw,
/// so both are marked noexcept.
class skip_bytes {
public:
    /// \param v number of bytes to skip.
    explicit skip_bytes(uint64_t v) noexcept : _value(v) {}

    /// \return the byte count passed to the constructor.
    uint64_t get_value() const noexcept { return _value; }
private:
    uint64_t _value;
};
139 
// NOTE(review): lines 141, 143 and 153 are missing from this listing. The
// cross-reference index places seastar::consumption_result at iostream.hh:141 —
// presumably the class header. stop_consuming_type is used below but declared
// on no visible line — presumably on line 143.
//
// Wraps a variant holding one of three verdicts: continue_consuming,
// stop_consuming_type or skip_bytes.
140 template <typename CharType>
142 public:
144  using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
145  using tmp_buf = typename stop_consuming_type::tmp_buf;
146 
    // Conversion from the optional-based form. An engaged optional becomes
    // stop_consuming_type carrying the buffer. An empty optional leaves _result
    // default-constructed, which for std::variant is its first alternative
    // (continue_consuming).
147  /*[[deprecated]]*/ consumption_result(std::optional<tmp_buf> opt_buf) {
148  if (opt_buf) {
149  _result = stop_consuming_type{std::move(opt_buf.value())};
150  }
151  }
152 
    // Implicit conversions from the individual verdict types (rvalues only).
154  consumption_result(stop_consuming_type&& stop) : _result(std::move(stop)) {}
155  consumption_result(skip_bytes&& skip) : _result(std::move(skip)) {}
156 
    // Mutable and read-only access to the stored variant.
157  consumption_variant& get() { return _result; }
158  const consumption_variant& get() const { return _result; }
159 
160 private:
161  consumption_variant _result;
162 };
163 
164 // Consumer concept, for consume() method
165 SEASTAR_CONCEPT(
166 // The consumer should operate on the data given to it, and
167 // return a future "consumption result", which can be
168 // - continue_consuming, if the consumer has consumed all the input given
169 // to it and is ready for more
170 // - stop_consuming, when the consumer is done (and in that case
171 // the contained buffer is the unconsumed part of the last data buffer - this
172 // can also happen to be empty).
173 // - skip_bytes, when the consumer has consumed all the input given to it
174 // and wants to skip before processing the next chunk
175 //
176 // For backward compatibility reasons, we also support the deprecated return value
177 // of type "unconsumed remainder" which can be
178 // - empty optional, if the consumer consumed all the input given to it
179 // and is ready for more
180 // - non-empty optional, when the consumer is done (and in that case
181 // the value is the unconsumed part of the last data buffer - this
182 // can also happen to be empty).
183 
    // Satisfied when calling the consumer with a temporary_buffer<CharType>
    // yields exactly future<consumption_result<CharType>>.
184 template <typename Consumer, typename CharType>
185 concept InputStreamConsumer = requires (Consumer c) {
186  { c(temporary_buffer<CharType>{}) } -> std::same_as<future<consumption_result<CharType>>>;
187 };
188 
    // Deprecated form: satisfied when the same call yields exactly
    // future<std::optional<temporary_buffer<CharType>>>.
189 template <typename Consumer, typename CharType>
190 concept ObsoleteInputStreamConsumer = requires (Consumer c) {
191  { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
192 };
193 )
194 
// Reader built on a data_source. CharType must be exactly one byte wide, as
// enforced by the static_assert below.
//
// NOTE(review): the line numbers in this listing skip 203, 206, 212, 220-229,
// 238-239, 241-242, 244-253, 256 and 259-269, so some members and comments are
// not shown. _buf and tmp_buf are used below but declared on no visible line.
199 template <typename CharType>
200 class input_stream final {
201  static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
202  data_source _fd;
204  bool _eof = false;
205 private:
    // Size of the current contents of _buf.
207  size_t available() const { return _buf.size(); }
    // NOTE(review): the class is final, so nothing can derive from it to reach
    // these protected members.
208 protected:
    // Replaces _buf with an empty, value-initialized buffer.
209  void reset() { _buf = {}; }
    // Address of the owned data_source.
210  data_source* fd() { return &_fd; }
211 public:
213  // unconsumed_remainder is mapped for compatibility only; new code should use consumption_result_type
214  using unconsumed_remainder = std::optional<tmp_buf>;
215  using char_type = CharType;
216  input_stream() noexcept = default;
    // Takes ownership of the data_source; _buf starts out empty.
217  explicit input_stream(data_source fd) noexcept : _fd(std::move(fd)), _buf() {}
218  input_stream(input_stream&&) = default;
219  input_stream& operator=(input_stream&&) = default;
230  future<temporary_buffer<CharType>> read_exactly(size_t n);
    // Two consume() overloads, for rvalue and lvalue consumers. Both accept
    // either the current or the obsolete consumer concept.
231  template <typename Consumer>
232  SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
233  future<> consume(Consumer&& c);
234  template <typename Consumer>
235  SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
236  future<> consume(Consumer& c);
    // Returns the _eof flag.
237  bool eof() const { return _eof; }
240  future<tmp_buf> read();
243  future<tmp_buf> read_up_to(size_t n);
    // NOTE(review): the next two lines are the body of a member whose signature
    // line is not shown. The cross-reference index places
    // seastar::input_stream::close at iostream.hh:253. The body forwards to
    // data_source::close().
254  return _fd.close();
255  }
257  future<> skip(uint64_t n);
258 
    // Callable only on an rvalue stream (note the && qualifier).
270  data_source detach() &&;
271 private:
272  future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
273 };
274 
// Writer built on a data_sink. CharType must be exactly one byte wide, as
// enforced by the static_assert below.
//
// NOTE(review): the line numbers in this listing skip 290, 305, 323, 327-328,
// 331-333 and 336-346, so some members and comments are not shown.
286 template <typename CharType>
287 class output_stream final {
288  static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
289  data_sink _fd;
291  net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
292  size_t _size = 0;
293  size_t _begin = 0;
294  size_t _end = 0;
295  bool _trim_to_size = false;
296  bool _batch_flushes = false;
297  std::optional<promise<>> _in_batch;
298  bool _flush = false;
299  bool _flushing = false;
300  std::exception_ptr _ex;
301 private:
    // Distance between _begin and _end.
302  size_t available() const { return _end - _begin; }
    // Distance between _begin and the configured _size.
303  size_t possibly_available() const { return _size - _begin; }
304  future<> split_and_put(temporary_buffer<CharType> buf);
306  void poll_flush();
307  future<> zero_copy_put(net::packet p);
308  future<> zero_copy_split_and_put(net::packet p);
    // Kept out of line via the gnu::noinline attribute.
309  [[gnu::noinline]]
310  future<> slow_write(const CharType* buf, size_t n);
311 public:
312  using char_type = CharType;
313  output_stream() noexcept = default;
    // Takes ownership of the sink and records the size and the two flags; all
    // other members keep their in-class defaults.
314  output_stream(data_sink fd, size_t size, bool trim_to_size = false, bool batch_flushes = false) noexcept
315  : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
316  output_stream(output_stream&&) noexcept = default;
317  output_stream& operator=(output_stream&&) noexcept = default;
    // Asserts that no _in_batch promise is still engaged at destruction.
318  ~output_stream() { assert(!_in_batch && "Was this stream properly closed?"); }
319  future<> write(const char_type* buf, size_t n);
320  future<> write(const char_type* buf);
321 
    // NOTE(review): line 323 is missing, so in this listing the template header
    // on line 322 is directly followed by the std::basic_string overload. The
    // declaration it originally introduced is not shown — presumably a
    // basic_sstring overload, since the index lists seastar::basic_sstring.
322  template <typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
324  future<> write(const std::basic_string<char_type>& s);
325 
326  future<> write(net::packet p);
329  future<> flush();
330 
334  future<> close();
335 
    // Callable only on an rvalue stream (note the && qualifier).
347  data_sink detach() &&;
348 private:
349  friend class reactor;
350 };
351 
// NOTE(review): line 356 is missing from this listing, so the declaration this
// template header introduces is not shown. The cross-reference index lists
// seastar::copy(input_stream<CharType>&, output_stream<CharType>&) — presumably
// that is the declaration that belongs here.
355 template <typename CharType>
357 
358 }
359 
360 #include "iostream-impl.hh"
seastar::net::fragment
Definition: packet.hh:38
seastar::scattered_message
Definition: scattered_message.hh:35
seastar::data_source
Definition: iostream.hh:55
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::basic_sstring
Definition: sstring.hh:58
seastar::reactor
Definition: reactor.hh:187
seastar::skip_bytes
Definition: iostream.hh:132
seastar::stop_consuming
Definition: iostream.hh:121
seastar::copy
future<> copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:530
seastar::data_source_impl
Definition: iostream.hh:47
seastar::net::packet
Definition: packet.hh:79
seastar::output_stream::detach
data_sink detach() &&
Definition: iostream-impl.hh:495
seastar::output_stream::close
future<> close()
Definition: iostream-impl.hh:476
seastar::input_stream
Definition: iostream.hh:200
seastar::temporary_buffer< char >
seastar::temporary_buffer::size
size_t size() const
Gets the buffer size.
Definition: temporary_buffer.hh:125
seastar::data_sink_impl
Definition: iostream.hh:69
seastar::temporary_buffer::release
deleter release()
Definition: temporary_buffer.hh:198
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
impl
holds the implementation parts of the metrics layer, do not use directly.
seastar::continue_consuming
Definition: iostream.hh:118
seastar::output_stream
Definition: iostream.hh:287
seastar::input_stream::close
future<> close()
Definition: iostream.hh:253
seastar::data_sink
Definition: iostream.hh:93
seastar::consumption_result
Definition: iostream.hh:141
seastar::temporary_buffer::get_write
CharType * get_write()
Definition: temporary_buffer.hh:123