Seastar
High performance C++ framework for concurrent servers
iostream.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23//
24// Buffered input and output streams
25//
26// Two abstract classes (data_source and data_sink) provide means
27// to acquire bulk data from, or push bulk data to, some provider.
28// These could be tied to a TCP connection, a disk file, or a memory
29// buffer.
30//
31// Two concrete classes (input_stream and output_stream) buffer data
32// from data_source and data_sink and provide easier means to process
33// it.
34//
35
36#pragma once
37
38#include <seastar/core/future.hh>
39#include <seastar/core/temporary_buffer.hh>
40#include <seastar/core/scattered_message.hh>
41#include <seastar/util/std-compat.hh>
42#include <seastar/util/modules.hh>
43#ifndef SEASTAR_MODULE
44#include <boost/intrusive/slist.hpp>
45#include <algorithm>
46#include <memory>
47#include <optional>
48#include <variant>
49#include <vector>
50#endif
51
52namespace bi = boost::intrusive;
53
54namespace seastar {
55
56SEASTAR_MODULE_EXPORT_BEGIN
57
58namespace net { class packet; }
59
61public:
62 virtual ~data_source_impl() {}
63 virtual future<temporary_buffer<char>> get() = 0;
64 virtual future<temporary_buffer<char>> skip(uint64_t n);
65 virtual future<> close() { return make_ready_future<>(); }
66};
67
69 std::unique_ptr<data_source_impl> _dsi;
70protected:
71 data_source_impl* impl() const { return _dsi.get(); }
72public:
74
75 data_source() noexcept = default;
76 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
77 data_source(data_source&& x) noexcept = default;
78 data_source& operator=(data_source&& x) noexcept = default;
79
80 future<tmp_buf> get() noexcept {
81 try {
82 return _dsi->get();
83 } catch (...) {
84 return current_exception_as_future<tmp_buf>();
85 }
86 }
87 future<tmp_buf> skip(uint64_t n) noexcept {
88 try {
89 return _dsi->skip(n);
90 } catch (...) {
91 return current_exception_as_future<tmp_buf>();
92 }
93 }
94 future<> close() noexcept {
95 try {
96 return _dsi->close();
97 } catch (...) {
98 return current_exception_as_future<>();
99 }
100 }
101};
102
104public:
105 virtual ~data_sink_impl() {}
106 virtual temporary_buffer<char> allocate_buffer(size_t size) {
107 return temporary_buffer<char>(size);
108 }
109 virtual future<> put(net::packet data) = 0;
110 virtual future<> put(std::vector<temporary_buffer<char>> data) {
111 net::packet p;
112 p.reserve(data.size());
113 for (auto& buf : data) {
114 p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
115 }
116 return put(std::move(p));
117 }
118 virtual future<> put(temporary_buffer<char> buf) {
119 return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
120 }
121 virtual future<> flush() {
122 return make_ready_future<>();
123 }
124 virtual future<> close() = 0;
125
126 // The method should return the maximum buffer size that's acceptable by
127 // the sink. It's used when the output stream is constructed without any
128 // specific buffer size. In this case the stream accepts this value as its
129 // buffer size and doesn't put larger buffers (see trim_to_size).
130 virtual size_t buffer_size() const noexcept {
131 assert(false && "Data sink must have the buffer_size() method overload");
132 return 0;
133 }
134
135 // In order to support flushes batching (output_stream_options.batch_flushes)
136 // the sink mush handle flush errors that may happen in the background by
137 // overriding the on_batch_flush_error() method. If the sink doesn't do it,
138 // turning on batch_flushes would have no effect
139 virtual bool can_batch_flushes() const noexcept {
140 return false;
141 }
142
143 virtual void on_batch_flush_error() noexcept {
144 assert(false && "Data sink must implement on_batch_flush_error() method");
145 }
146};
147
149 std::unique_ptr<data_sink_impl> _dsi;
150public:
151 data_sink() noexcept = default;
152 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
153 data_sink(data_sink&& x) noexcept = default;
154 data_sink& operator=(data_sink&& x) noexcept = default;
155 temporary_buffer<char> allocate_buffer(size_t size) {
156 return _dsi->allocate_buffer(size);
157 }
158 future<> put(std::vector<temporary_buffer<char>> data) noexcept {
159 try {
160 return _dsi->put(std::move(data));
161 } catch (...) {
163 }
164 }
165 future<> put(temporary_buffer<char> data) noexcept {
166 try {
167 return _dsi->put(std::move(data));
168 } catch (...) {
170 }
171 }
172 future<> put(net::packet p) noexcept {
173 try {
174 return _dsi->put(std::move(p));
175 } catch (...) {
177 }
178 }
179 future<> flush() noexcept {
180 try {
181 return _dsi->flush();
182 } catch (...) {
184 }
185 }
186 future<> close() noexcept {
187 try {
188 return _dsi->close();
189 } catch (...) {
191 }
192 }
193
194 size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
195 bool can_batch_flushes() const noexcept { return _dsi->can_batch_flushes(); }
196 void on_batch_flush_error() noexcept { _dsi->on_batch_flush_error(); }
197};
198
200
201template <typename CharType>
203public:
205 explicit stop_consuming(tmp_buf buf) : _buf(std::move(buf)) {}
206
207 tmp_buf& get_buffer() { return _buf; }
208 const tmp_buf& get_buffer() const { return _buf; }
209private:
210 tmp_buf _buf;
211};
212
214public:
215 explicit skip_bytes(uint64_t v) : _value(v) {}
216 uint64_t get_value() const { return _value; }
217private:
218 uint64_t _value;
219};
220
221template <typename CharType>
223public:
225 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
226 using tmp_buf = typename stop_consuming_type::tmp_buf;
227
228 /*[[deprecated]]*/ consumption_result(std::optional<tmp_buf> opt_buf) {
229 if (opt_buf) {
230 _result = stop_consuming_type{std::move(opt_buf.value())};
231 }
232 }
233
235 consumption_result(stop_consuming_type&& stop) : _result(std::move(stop)) {}
236 consumption_result(skip_bytes&& skip) : _result(std::move(skip)) {}
237
238 consumption_variant& get() { return _result; }
239 const consumption_variant& get() const { return _result; }
240
241private:
242 consumption_variant _result;
243};
244
245// Consumer concept, for consume() method
246
247// The consumer should operate on the data given to it, and
248// return a future "consumption result", which can be
249// - continue_consuming, if the consumer has consumed all the input given
250// to it and is ready for more
251// - stop_consuming, when the consumer is done (and in that case
252// the contained buffer is the unconsumed part of the last data buffer - this
253// can also happen to be empty).
254// - skip_bytes, when the consumer has consumed all the input given to it
255// and wants to skip before processing the next chunk
256//
257// For backward compatibility reasons, we also support the deprecated return value
258// of type "unconsumed remainder" which can be
259// - empty optional, if the consumer consumed all the input given to it
260// and is ready for more
261// - non-empty optional, when the consumer is done (and in that case
262// the value is the unconsumed part of the last data buffer - this
263// can also happen to be empty).
264
265template <typename Consumer, typename CharType>
266concept InputStreamConsumer = requires (Consumer c) {
267 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<consumption_result<CharType>>>;
268};
269
270template <typename Consumer, typename CharType>
271concept ObsoleteInputStreamConsumer = requires (Consumer c) {
272 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
273};
274
279template <typename CharType>
280class input_stream final {
281 static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
282 data_source _fd;
284 bool _eof = false;
285private:
287 size_t available() const noexcept { return _buf.size(); }
288protected:
289 void reset() noexcept { _buf = {}; }
290 data_source* fd() noexcept { return &_fd; }
291public:
293 // unconsumed_remainder is mapped for compatibility only; new code should use consumption_result_type
294 using unconsumed_remainder = std::optional<tmp_buf>;
295 using char_type = CharType;
296 input_stream() noexcept = default;
297 explicit input_stream(data_source fd) noexcept : _fd(std::move(fd)), _buf() {}
298 input_stream(input_stream&&) = default;
299 input_stream& operator=(input_stream&&) = default;
311 template <typename Consumer>
312 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
313 future<> consume(Consumer&& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
314 template <typename Consumer>
315 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
316 future<> consume(Consumer& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
322 bool eof() const noexcept { return _eof; }
325 future<tmp_buf> read() noexcept;
328 future<tmp_buf> read_up_to(size_t n) noexcept;
338 future<> close() noexcept {
339 return _fd.close();
340 }
342 future<> skip(uint64_t n) noexcept;
343
355 data_source detach() &&;
356private:
357 future<temporary_buffer<CharType>> read_exactly_part(size_t n) noexcept;
358};
359
361 bool trim_to_size = false;
363 bool batch_flushes = false;
364};
365
377template <typename CharType>
378class output_stream final {
379 static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
380 data_sink _fd;
382 net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
383 size_t _size = 0;
384 size_t _begin = 0;
385 size_t _end = 0;
386 bool _trim_to_size = false;
387 bool _batch_flushes = false;
388 std::optional<promise<>> _in_batch;
389 bool _flush = false;
390 bool _flushing = false;
391 std::exception_ptr _ex;
392 bi::slist_member_hook<> _in_poller;
393
394private:
395 size_t available() const noexcept { return _end - _begin; }
396 future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
397 future<> put(temporary_buffer<CharType> buf) noexcept;
398 void poll_flush() noexcept;
399 future<> do_flush() noexcept;
400 future<> zero_copy_put(net::packet p) noexcept;
401 future<> zero_copy_split_and_put(net::packet p) noexcept;
402 [[gnu::noinline]]
403 future<> slow_write(const CharType* buf, size_t n) noexcept;
404public:
405 using char_type = CharType;
406 output_stream() noexcept = default;
407 output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
408 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
409 [[deprecated("use output_stream_options instead of booleans")]]
410 output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
411 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
412 output_stream(data_sink fd) noexcept
413 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
414 output_stream(output_stream&&) noexcept = default;
415 output_stream& operator=(output_stream&&) noexcept = default;
417 if (_batch_flushes) {
418 assert(!_in_batch && "Was this stream properly closed?");
419 } else {
420 assert(!_end && !_zc_bufs && "Was this stream properly closed?");
421 }
422 }
423 future<> write(const char_type* buf, size_t n) noexcept;
424 future<> write(const char_type* buf) noexcept;
425
426 template <typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
428 future<> write(const std::basic_string<char_type>& s) noexcept;
429
430 future<> write(net::packet p) noexcept;
431 future<> write(scattered_message<char_type> msg) noexcept;
433 future<> flush() noexcept;
434
438 future<> close() noexcept;
439
451 data_sink detach() &&;
452
453 using batch_flush_list_t = bi::slist<output_stream,
454 bi::constant_time_size<false>, bi::cache_last<true>,
455 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
456private:
457 friend class reactor;
458};
459
463template <typename CharType>
465
466SEASTAR_MODULE_EXPORT_END
467}
468
469#include "iostream-impl.hh"
Definition: sstring.hh:76
Definition: iostream.hh:222
Definition: iostream.hh:103
Definition: iostream.hh:148
Definition: iostream.hh:60
Definition: iostream.hh:68
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: iostream.hh:280
future skip(uint64_t n) noexcept
Ignores n next bytes from the stream.
Definition: iostream-impl.hh:301
future< tmp_buf > read_up_to(size_t n) noexcept
Definition: iostream-impl.hh:255
future< temporary_buffer< CharType > > read_exactly(size_t n) noexcept
Definition: iostream-impl.hh:184
future< tmp_buf > read() noexcept
Definition: iostream-impl.hh:284
future close() noexcept
Definition: iostream.hh:338
data_source detach() &&
Definition: iostream-impl.hh:315
bool eof() const noexcept
Definition: iostream.hh:322
Definition: packet.hh:87
Definition: iostream.hh:378
data_sink detach() &&
Definition: iostream-impl.hh:515
future close() noexcept
Definition: iostream-impl.hh:496
Definition: reactor.hh:155
Definition: scattered_message.hh:35
Definition: iostream.hh:213
Definition: iostream.hh:202
deleter release() noexcept
Definition: temporary_buffer.hh:203
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:46
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:363
bool trim_to_size
Definition: iostream.hh:361
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550
Definition: iostream.hh:199
Definition: iostream.hh:360