Seastar
High performance C++ framework for concurrent servers
iostream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2015 Cloudius Systems, Ltd.
21  */
22 
23 //
24 // Buffered input and output streams
25 //
26 // Two abstract classes (data_source and data_sink) provide means
27 // to acquire bulk data from, or push bulk data to, some provider.
28 // These could be tied to a TCP connection, a disk file, or a memory
29 // buffer.
30 //
31 // Two concrete classes (input_stream and output_stream) buffer data
32 // from data_source and data_sink and provide easier means to process
33 // it.
34 //
35 
36 #pragma once
37 
38 #include <seastar/core/future.hh>
39 #include <seastar/core/temporary_buffer.hh>
40 #include <seastar/core/scattered_message.hh>
41 #include <seastar/util/std-compat.hh>
42 #include <seastar/util/modules.hh>
43 #ifndef SEASTAR_MODULE
44 #include <boost/intrusive/slist.hpp>
45 #include <algorithm>
46 #include <memory>
47 #include <optional>
48 #include <variant>
49 #include <vector>
50 #endif
51 
52 namespace bi = boost::intrusive;
53 
54 namespace seastar {
55 
56 SEASTAR_MODULE_EXPORT_BEGIN
57 
58 namespace net { class packet; }
59 
61 public:
62  virtual ~data_source_impl() {}
63  virtual future<temporary_buffer<char>> get() = 0;
64  virtual future<temporary_buffer<char>> skip(uint64_t n);
65  virtual future<> close() { return make_ready_future<>(); }
66 };
67 
68 class data_source {
69  std::unique_ptr<data_source_impl> _dsi;
70 protected:
71  data_source_impl* impl() const { return _dsi.get(); }
72 public:
74 
75  data_source() noexcept = default;
76  explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
77  data_source(data_source&& x) noexcept = default;
78  data_source& operator=(data_source&& x) noexcept = default;
79 
80  future<tmp_buf> get() noexcept {
81  try {
82  return _dsi->get();
83  } catch (...) {
84  return current_exception_as_future<tmp_buf>();
85  }
86  }
87  future<tmp_buf> skip(uint64_t n) noexcept {
88  try {
89  return _dsi->skip(n);
90  } catch (...) {
91  return current_exception_as_future<tmp_buf>();
92  }
93  }
94  future<> close() noexcept {
95  try {
96  return _dsi->close();
97  } catch (...) {
98  return current_exception_as_future<>();
99  }
100  }
101 };
102 
104 public:
105  virtual ~data_sink_impl() {}
106  virtual temporary_buffer<char> allocate_buffer(size_t size) {
107  return temporary_buffer<char>(size);
108  }
109  virtual future<> put(net::packet data) = 0;
110  virtual future<> put(std::vector<temporary_buffer<char>> data) {
111  net::packet p;
112  p.reserve(data.size());
113  for (auto& buf : data) {
114  p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
115  }
116  return put(std::move(p));
117  }
118  virtual future<> put(temporary_buffer<char> buf) {
119  return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
120  }
121  virtual future<> flush() {
122  return make_ready_future<>();
123  }
124  virtual future<> close() = 0;
125 
126  // The method should return the maximum buffer size that's acceptable by
127  // the sink. It's used when the output stream is constructed without any
128  // specific buffer size. In this case the stream accepts this value as its
129  // buffer size and doesn't put larger buffers (see trim_to_size).
130  virtual size_t buffer_size() const noexcept {
131  assert(false && "Data sink must have the buffer_size() method overload");
132  return 0;
133  }
134 
135  // In order to support flushes batching (output_stream_options.batch_flushes)
136  // the sink mush handle flush errors that may happen in the background by
137  // overriding the on_batch_flush_error() method. If the sink doesn't do it,
138  // turning on batch_flushes would have no effect
139  virtual bool can_batch_flushes() const noexcept {
140  return false;
141  }
142 
143  virtual void on_batch_flush_error() noexcept {
144  assert(false && "Data sink must implement on_batch_flush_error() method");
145  }
146 };
147 
148 class data_sink {
149  std::unique_ptr<data_sink_impl> _dsi;
150 public:
151  data_sink() noexcept = default;
152  explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
153  data_sink(data_sink&& x) noexcept = default;
154  data_sink& operator=(data_sink&& x) noexcept = default;
155  temporary_buffer<char> allocate_buffer(size_t size) {
156  return _dsi->allocate_buffer(size);
157  }
158  future<> put(std::vector<temporary_buffer<char>> data) noexcept {
159  try {
160  return _dsi->put(std::move(data));
161  } catch (...) {
163  }
164  }
165  future<> put(temporary_buffer<char> data) noexcept {
166  try {
167  return _dsi->put(std::move(data));
168  } catch (...) {
170  }
171  }
172  future<> put(net::packet p) noexcept {
173  try {
174  return _dsi->put(std::move(p));
175  } catch (...) {
177  }
178  }
179  future<> flush() noexcept {
180  try {
181  return _dsi->flush();
182  } catch (...) {
184  }
185  }
186  future<> close() noexcept {
187  try {
188  return _dsi->close();
189  } catch (...) {
191  }
192  }
193 
194  size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
195  bool can_batch_flushes() const noexcept { return _dsi->can_batch_flushes(); }
196  void on_batch_flush_error() noexcept { _dsi->on_batch_flush_error(); }
197 };
198 
200 
201 template <typename CharType>
203 public:
205  explicit stop_consuming(tmp_buf buf) : _buf(std::move(buf)) {}
206 
207  tmp_buf& get_buffer() { return _buf; }
208  const tmp_buf& get_buffer() const { return _buf; }
209 private:
210  tmp_buf _buf;
211 };
212 
213 class skip_bytes {
214 public:
215  explicit skip_bytes(uint64_t v) : _value(v) {}
216  uint64_t get_value() const { return _value; }
217 private:
218  uint64_t _value;
219 };
220 
221 template <typename CharType>
223 public:
225  using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
226  using tmp_buf = typename stop_consuming_type::tmp_buf;
227 
228  /*[[deprecated]]*/ consumption_result(std::optional<tmp_buf> opt_buf) {
229  if (opt_buf) {
230  _result = stop_consuming_type{std::move(opt_buf.value())};
231  }
232  }
233 
235  consumption_result(stop_consuming_type&& stop) : _result(std::move(stop)) {}
236  consumption_result(skip_bytes&& skip) : _result(std::move(skip)) {}
237 
238  consumption_variant& get() { return _result; }
239  const consumption_variant& get() const { return _result; }
240 
241 private:
242  consumption_variant _result;
243 };
244 
245 // Consumer concept, for consume() method
246 SEASTAR_CONCEPT(
247 // The consumer should operate on the data given to it, and
248 // return a future "consumption result", which can be
249 // - continue_consuming, if the consumer has consumed all the input given
250 // to it and is ready for more
251 // - stop_consuming, when the consumer is done (and in that case
252 // the contained buffer is the unconsumed part of the last data buffer - this
253 // can also happen to be empty).
254 // - skip_bytes, when the consumer has consumed all the input given to it
255 // and wants to skip before processing the next chunk
256 //
257 // For backward compatibility reasons, we also support the deprecated return value
258 // of type "unconsumed remainder" which can be
259 // - empty optional, if the consumer consumed all the input given to it
260 // and is ready for more
261 // - non-empty optional, when the consumer is done (and in that case
262 // the value is the unconsumed part of the last data buffer - this
263 // can also happen to be empty).
264 
265 template <typename Consumer, typename CharType>
266 concept InputStreamConsumer = requires (Consumer c) {
267  { c(temporary_buffer<CharType>{}) } -> std::same_as<future<consumption_result<CharType>>>;
268 };
269 
270 template <typename Consumer, typename CharType>
271 concept ObsoleteInputStreamConsumer = requires (Consumer c) {
272  { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
273 };
274 )
275 
280 template <typename CharType>
281 class input_stream final {
282  static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
283  data_source _fd;
285  bool _eof = false;
286 private:
288  size_t available() const noexcept { return _buf.size(); }
289 protected:
290  void reset() noexcept { _buf = {}; }
291  data_source* fd() noexcept { return &_fd; }
292 public:
294  // unconsumed_remainder is mapped for compatibility only; new code should use consumption_result_type
295  using unconsumed_remainder = std::optional<tmp_buf>;
296  using char_type = CharType;
297  input_stream() noexcept = default;
298  explicit input_stream(data_source fd) noexcept : _fd(std::move(fd)), _buf() {}
299  input_stream(input_stream&&) = default;
300  input_stream& operator=(input_stream&&) = default;
311  future<temporary_buffer<CharType>> read_exactly(size_t n) noexcept;
312  template <typename Consumer>
313  SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
314  future<> consume(Consumer&& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
315  template <typename Consumer>
316  SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
317  future<> consume(Consumer& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
323  bool eof() const noexcept { return _eof; }
326  future<tmp_buf> read() noexcept;
329  future<tmp_buf> read_up_to(size_t n) noexcept;
339  future<> close() noexcept {
340  return _fd.close();
341  }
343  future<> skip(uint64_t n) noexcept;
344 
356  data_source detach() &&;
357 private:
358  future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed) noexcept;
359 };
360 
362  bool trim_to_size = false;
364  bool batch_flushes = false;
365 };
366 
378 template <typename CharType>
379 class output_stream final {
380  static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
381  data_sink _fd;
383  net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
384  size_t _size = 0;
385  size_t _begin = 0;
386  size_t _end = 0;
387  bool _trim_to_size = false;
388  bool _batch_flushes = false;
389  std::optional<promise<>> _in_batch;
390  bool _flush = false;
391  bool _flushing = false;
392  std::exception_ptr _ex;
393  bi::slist_member_hook<> _in_poller;
394 
395 private:
396  size_t available() const noexcept { return _end - _begin; }
397  future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
398  future<> put(temporary_buffer<CharType> buf) noexcept;
399  void poll_flush() noexcept;
400  future<> do_flush() noexcept;
401  future<> zero_copy_put(net::packet p) noexcept;
402  future<> zero_copy_split_and_put(net::packet p) noexcept;
403  [[gnu::noinline]]
404  future<> slow_write(const CharType* buf, size_t n) noexcept;
405 public:
406  using char_type = CharType;
407  output_stream() noexcept = default;
408  output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
409  : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
410  [[deprecated("use output_stream_options instead of booleans")]]
411  output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
412  : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
413  output_stream(data_sink fd) noexcept
414  : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
415  output_stream(output_stream&&) noexcept = default;
416  output_stream& operator=(output_stream&&) noexcept = default;
417  ~output_stream() {
418  if (_batch_flushes) {
419  assert(!_in_batch && "Was this stream properly closed?");
420  } else {
421  assert(!_end && !_zc_bufs && "Was this stream properly closed?");
422  }
423  }
424  future<> write(const char_type* buf, size_t n) noexcept;
425  future<> write(const char_type* buf) noexcept;
426 
427  template <typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
429  future<> write(const std::basic_string<char_type>& s) noexcept;
430 
431  future<> write(net::packet p) noexcept;
432  future<> write(scattered_message<char_type> msg) noexcept;
433  future<> write(temporary_buffer<char_type>) noexcept;
434  future<> flush() noexcept;
435 
439  future<> close() noexcept;
440 
452  data_sink detach() &&;
453 
454  using batch_flush_list_t = bi::slist<output_stream,
455  bi::constant_time_size<false>, bi::cache_last<true>,
456  bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
457 private:
458  friend class reactor;
459 };
460 
464 template <typename CharType>
466 
467 SEASTAR_MODULE_EXPORT_END
468 }
469 
470 #include "iostream-impl.hh"
Definition: sstring.hh:75
Definition: iostream.hh:222
Definition: iostream.hh:103
Definition: iostream.hh:148
Definition: iostream.hh:60
Definition: iostream.hh:68
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Definition: iostream.hh:281
bool eof() const noexcept
Definition: iostream.hh:323
Definition: packet.hh:87
Definition: iostream.hh:379
data_sink detach() &&
Definition: iostream-impl.hh:515
future close() noexcept
Definition: iostream-impl.hh:496
Definition: reactor.hh:168
Definition: scattered_message.hh:39
Definition: iostream.hh:213
Definition: iostream.hh:202
deleter release() noexcept
Definition: temporary_buffer.hh:203
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:46
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:364
bool trim_to_size
Definition: iostream.hh:362
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550
Definition: iostream.hh:199
Definition: iostream.hh:361