Seastar
High performance C++ framework for concurrent servers
iostream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2015 Cloudius Systems, Ltd.
21  */
22 
23 //
24 // Buffered input and output streams
25 //
26 // Two abstract classes (data_source and data_sink) provide means
27 // to acquire bulk data from, or push bulk data to, some provider.
28 // These could be tied to a TCP connection, a disk file, or a memory
29 // buffer.
30 //
31 // Two concrete classes (input_stream and output_stream) buffer data
32 // from data_source and data_sink and provide easier means to process
33 // it.
34 //
35 
36 #pragma once
37 
38 #include <seastar/core/future.hh>
39 #include <seastar/core/temporary_buffer.hh>
40 #include <seastar/core/scattered_message.hh>
41 #include <seastar/util/std-compat.hh>
42 
43 namespace seastar {
44 
45 namespace net { class packet; }
46 
48 public:
49  virtual ~data_source_impl() {}
50  virtual future<temporary_buffer<char>> get() = 0;
51  virtual future<temporary_buffer<char>> skip(uint64_t n);
52  virtual future<> close() { return make_ready_future<>(); }
53 };
54 
55 class data_source {
56  std::unique_ptr<data_source_impl> _dsi;
57 protected:
58  data_source_impl* impl() const { return _dsi.get(); }
59 public:
61 
62  data_source() noexcept = default;
63  explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
64  data_source(data_source&& x) noexcept = default;
65  data_source& operator=(data_source&& x) noexcept = default;
66 
67  future<tmp_buf> get() noexcept {
68  try {
69  return _dsi->get();
70  } catch (...) {
71  return current_exception_as_future<tmp_buf>();
72  }
73  }
74  future<tmp_buf> skip(uint64_t n) noexcept {
75  try {
76  return _dsi->skip(n);
77  } catch (...) {
78  return current_exception_as_future<tmp_buf>();
79  }
80  }
81  future<> close() noexcept {
82  try {
83  return _dsi->close();
84  } catch (...) {
85  return current_exception_as_future<>();
86  }
87  }
88 };
89 
91 public:
92  virtual ~data_sink_impl() {}
93  virtual temporary_buffer<char> allocate_buffer(size_t size) {
94  return temporary_buffer<char>(size);
95  }
96  virtual future<> put(net::packet data) = 0;
97  virtual future<> put(std::vector<temporary_buffer<char>> data) {
98  net::packet p;
99  p.reserve(data.size());
100  for (auto& buf : data) {
101  p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
102  }
103  return put(std::move(p));
104  }
105  virtual future<> put(temporary_buffer<char> buf) {
106  return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
107  }
108  virtual future<> flush() {
109  return make_ready_future<>();
110  }
111  virtual future<> close() = 0;
112 
113  // The method should return the maximum buffer size that's acceptable by
114  // the sink. It's used when the output stream is constructed without any
115  // specific buffer size. In this case the stream accepts this value as its
116  // buffer size and doesn't put larger buffers (see trim_to_size).
117  virtual size_t buffer_size() const noexcept {
118  assert(false && "Data sink must have the buffer_size() method overload");
119  return 0;
120  }
121 };
122 
123 class data_sink {
124  std::unique_ptr<data_sink_impl> _dsi;
125 public:
126  data_sink() noexcept = default;
127  explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
128  data_sink(data_sink&& x) noexcept = default;
129  data_sink& operator=(data_sink&& x) noexcept = default;
130  temporary_buffer<char> allocate_buffer(size_t size) {
131  return _dsi->allocate_buffer(size);
132  }
133  future<> put(std::vector<temporary_buffer<char>> data) noexcept {
134  try {
135  return _dsi->put(std::move(data));
136  } catch (...) {
138  }
139  }
140  future<> put(temporary_buffer<char> data) noexcept {
141  try {
142  return _dsi->put(std::move(data));
143  } catch (...) {
145  }
146  }
147  future<> put(net::packet p) noexcept {
148  try {
149  return _dsi->put(std::move(p));
150  } catch (...) {
152  }
153  }
154  future<> flush() noexcept {
155  try {
156  return _dsi->flush();
157  } catch (...) {
159  }
160  }
161  future<> close() noexcept {
162  try {
163  return _dsi->close();
164  } catch (...) {
166  }
167  }
168 
169  size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
170 };
171 
173 
174 template <typename CharType>
176 public:
178  explicit stop_consuming(tmp_buf buf) : _buf(std::move(buf)) {}
179 
180  tmp_buf& get_buffer() { return _buf; }
181  const tmp_buf& get_buffer() const { return _buf; }
182 private:
183  tmp_buf _buf;
184 };
185 
186 class skip_bytes {
187 public:
188  explicit skip_bytes(uint64_t v) : _value(v) {}
189  uint64_t get_value() const { return _value; }
190 private:
191  uint64_t _value;
192 };
193 
194 template <typename CharType>
196 public:
198  using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
199  using tmp_buf = typename stop_consuming_type::tmp_buf;
200 
201  /*[[deprecated]]*/ consumption_result(std::optional<tmp_buf> opt_buf) {
202  if (opt_buf) {
203  _result = stop_consuming_type{std::move(opt_buf.value())};
204  }
205  }
206 
208  consumption_result(stop_consuming_type&& stop) : _result(std::move(stop)) {}
209  consumption_result(skip_bytes&& skip) : _result(std::move(skip)) {}
210 
211  consumption_variant& get() { return _result; }
212  const consumption_variant& get() const { return _result; }
213 
214 private:
215  consumption_variant _result;
216 };
217 
218 // Consumer concept, for consume() method
219 SEASTAR_CONCEPT(
220 // The consumer should operate on the data given to it, and
221 // return a future "consumption result", which can be
222 // - continue_consuming, if the consumer has consumed all the input given
223 // to it and is ready for more
224 // - stop_consuming, when the consumer is done (and in that case
225 // the contained buffer is the unconsumed part of the last data buffer - this
226 // can also happen to be empty).
227 // - skip_bytes, when the consumer has consumed all the input given to it
228 // and wants to skip before processing the next chunk
229 //
230 // For backward compatibility reasons, we also support the deprecated return value
231 // of type "unconsumed remainder" which can be
232 // - empty optional, if the consumer consumed all the input given to it
233 // and is ready for more
234 // - non-empty optional, when the consumer is done (and in that case
235 // the value is the unconsumed part of the last data buffer - this
236 // can also happen to be empty).
237 
238 template <typename Consumer, typename CharType>
239 concept InputStreamConsumer = requires (Consumer c) {
240  { c(temporary_buffer<CharType>{}) } -> std::same_as<future<consumption_result<CharType>>>;
241 };
242 
243 template <typename Consumer, typename CharType>
244 concept ObsoleteInputStreamConsumer = requires (Consumer c) {
245  { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
246 };
247 )
248 
253 template <typename CharType>
254 class input_stream final {
255  static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
256  data_source _fd;
258  bool _eof = false;
259 private:
261  size_t available() const noexcept { return _buf.size(); }
262 protected:
263  void reset() noexcept { _buf = {}; }
264  data_source* fd() noexcept { return &_fd; }
265 public:
267  // unconsumed_remainder is mapped for compatibility only; new code should use consumption_result_type
268  using unconsumed_remainder = std::optional<tmp_buf>;
269  using char_type = CharType;
270  input_stream() noexcept = default;
271  explicit input_stream(data_source fd) noexcept : _fd(std::move(fd)), _buf() {}
272  input_stream(input_stream&&) = default;
273  input_stream& operator=(input_stream&&) = default;
284  future<temporary_buffer<CharType>> read_exactly(size_t n) noexcept;
285  template <typename Consumer>
286  SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
287  future<> consume(Consumer&& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
288  template <typename Consumer>
289  SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
290  future<> consume(Consumer& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
291  bool eof() const noexcept { return _eof; }
294  future<tmp_buf> read() noexcept;
297  future<tmp_buf> read_up_to(size_t n) noexcept;
307  future<> close() noexcept {
308  return _fd.close();
309  }
311  future<> skip(uint64_t n) noexcept;
312 
324  data_source detach() &&;
325 private:
326  future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed) noexcept;
327 };
328 
330  bool trim_to_size = false;
332  bool batch_flushes = false;
333 };
334 
346 template <typename CharType>
347 class output_stream final {
348  static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
349  data_sink _fd;
351  net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
352  size_t _size = 0;
353  size_t _begin = 0;
354  size_t _end = 0;
355  bool _trim_to_size = false;
356  bool _batch_flushes = false;
357  std::optional<promise<>> _in_batch;
358  bool _flush = false;
359  bool _flushing = false;
360  std::exception_ptr _ex;
361 private:
362  size_t available() const noexcept { return _end - _begin; }
363  size_t possibly_available() const noexcept { return _size - _begin; }
364  future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
365  future<> put(temporary_buffer<CharType> buf) noexcept;
366  void poll_flush() noexcept;
367  future<> zero_copy_put(net::packet p) noexcept;
368  future<> zero_copy_split_and_put(net::packet p) noexcept;
369  [[gnu::noinline]]
370  future<> slow_write(const CharType* buf, size_t n) noexcept;
371 public:
372  using char_type = CharType;
373  output_stream() noexcept = default;
374  output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
375  : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes) {}
376  [[deprecated("use output_stream_options instead of booleans")]]
377  output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
378  : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
379  output_stream(data_sink fd) noexcept
380  : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
381  output_stream(output_stream&&) noexcept = default;
382  output_stream& operator=(output_stream&&) noexcept = default;
383  ~output_stream() { assert(!_in_batch && "Was this stream properly closed?"); }
384  future<> write(const char_type* buf, size_t n) noexcept;
385  future<> write(const char_type* buf) noexcept;
386 
387  template <typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
389  future<> write(const std::basic_string<char_type>& s) noexcept;
390 
391  future<> write(net::packet p) noexcept;
392  future<> write(scattered_message<char_type> msg) noexcept;
393  future<> write(temporary_buffer<char_type>) noexcept;
394  future<> flush() noexcept;
395 
399  future<> close() noexcept;
400 
412  data_sink detach() &&;
413 private:
414  friend class reactor;
415 };
416 
420 template <typename CharType>
422 
423 }
424 
425 #include "iostream-impl.hh"
Definition: sstring.hh:58
Definition: iostream.hh:195
Definition: iostream.hh:90
Definition: iostream.hh:123
Definition: iostream.hh:47
Definition: iostream.hh:55
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
Definition: iostream.hh:254
future close() noexcept
Definition: iostream.hh:307
Definition: packet.hh:79
Definition: iostream.hh:347
data_sink detach() &&
Definition: iostream-impl.hh:517
future close() noexcept
Definition: iostream-impl.hh:498
Definition: reactor.hh:192
Definition: scattered_message.hh:35
Definition: iostream.hh:186
Definition: iostream.hh:175
deleter release() noexcept
Definition: temporary_buffer.hh:198
CharType * get_write() noexcept
Definition: temporary_buffer.hh:123
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:125
future< T... > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:2065
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:38
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:332
bool trim_to_size
Definition: iostream.hh:330
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:552
Definition: iostream.hh:172
Definition: iostream.hh:329