Seastar
High performance C++ framework for concurrent servers
iostream.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23//
24// Buffered input and output streams
25//
26// Two abstract classes (data_source and data_sink) provide means
27// to acquire bulk data from, or push bulk data to, some provider.
28// These could be tied to a TCP connection, a disk file, or a memory
29// buffer.
30//
31// Two concrete classes (input_stream and output_stream) buffer data
32// from data_source and data_sink and provide easier means to process
33// it.
34//
35
36#pragma once
37
38#include <seastar/core/future.hh>
39#include <seastar/core/coroutine.hh>
40#include <seastar/core/temporary_buffer.hh>
41#include <seastar/core/scattered_message.hh>
42#include <seastar/util/std-compat.hh>
43#include <seastar/util/modules.hh>
44#ifndef SEASTAR_MODULE
45#include <boost/intrusive/slist.hpp>
46#include <algorithm>
47#include <memory>
48#include <optional>
49#include <variant>
50#include <vector>
51#endif
52
53namespace bi = boost::intrusive;
54
55namespace seastar {
56
57SEASTAR_MODULE_EXPORT_BEGIN
58
59namespace net { class packet; }
60
62public:
63 virtual ~data_source_impl() {}
64 virtual future<temporary_buffer<char>> get() = 0;
65 virtual future<temporary_buffer<char>> skip(uint64_t n);
66 virtual future<> close() { return make_ready_future<>(); }
67};
68
70 std::unique_ptr<data_source_impl> _dsi;
71protected:
72 data_source_impl* impl() const { return _dsi.get(); }
73public:
75
76 data_source() noexcept = default;
77 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
78 data_source(data_source&& x) noexcept = default;
79 data_source& operator=(data_source&& x) noexcept = default;
80
81 future<tmp_buf> get() noexcept {
82 try {
83 return _dsi->get();
84 } catch (...) {
85 return current_exception_as_future<tmp_buf>();
86 }
87 }
88 future<tmp_buf> skip(uint64_t n) noexcept {
89 try {
90 return _dsi->skip(n);
91 } catch (...) {
92 return current_exception_as_future<tmp_buf>();
93 }
94 }
95 future<> close() noexcept {
96 try {
97 return _dsi->close();
98 } catch (...) {
99 return current_exception_as_future<>();
100 }
101 }
102};
103
105public:
106 virtual ~data_sink_impl() {}
107 virtual temporary_buffer<char> allocate_buffer(size_t size) {
108 return temporary_buffer<char>(size);
109 }
110 virtual future<> put(net::packet data) = 0;
111 virtual future<> put(std::vector<temporary_buffer<char>> data) {
112 net::packet p;
113 p.reserve(data.size());
114 for (auto& buf : data) {
115 p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
116 }
117 return put(std::move(p));
118 }
119 virtual future<> put(temporary_buffer<char> buf) {
120 return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
121 }
122 virtual future<> flush() {
123 return make_ready_future<>();
124 }
125 virtual future<> close() = 0;
126
127 // The method should return the maximum buffer size that's acceptable by
128 // the sink. It's used when the output stream is constructed without any
129 // specific buffer size. In this case the stream accepts this value as its
130 // buffer size and doesn't put larger buffers (see trim_to_size).
131 virtual size_t buffer_size() const noexcept {
132 assert(false && "Data sink must have the buffer_size() method overload");
133 return 0;
134 }
135
136 // In order to support flushes batching (output_stream_options.batch_flushes)
137 // the sink mush handle flush errors that may happen in the background by
138 // overriding the on_batch_flush_error() method. If the sink doesn't do it,
139 // turning on batch_flushes would have no effect
140 virtual bool can_batch_flushes() const noexcept {
141 return false;
142 }
143
144 virtual void on_batch_flush_error() noexcept {
145 assert(false && "Data sink must implement on_batch_flush_error() method");
146 }
147
148protected:
149 // This is a helper function that classes that inherit from data_sink_impl
150 // can use to implement the put overload for net::packet.
151 // Unfortunately, we currently cannot define this function as
152 // 'virtual future<> put(net::packet)', because we would get infinite
153 // recursion between this function and
154 // 'virtual future<> put(temporary_buffer<char>)'.
155 future<> fallback_put(net::packet data) {
156 auto buffers = data.release();
157 for (temporary_buffer<char>& buf : buffers) {
158 co_await this->put(std::move(buf));
159 }
160 }
161};
162
164 std::unique_ptr<data_sink_impl> _dsi;
165public:
166 data_sink() noexcept = default;
167 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
168 data_sink(data_sink&& x) noexcept = default;
169 data_sink& operator=(data_sink&& x) noexcept = default;
170 temporary_buffer<char> allocate_buffer(size_t size) {
171 return _dsi->allocate_buffer(size);
172 }
173 future<> put(std::vector<temporary_buffer<char>> data) noexcept {
174 try {
175 return _dsi->put(std::move(data));
176 } catch (...) {
178 }
179 }
180 future<> put(temporary_buffer<char> data) noexcept {
181 try {
182 return _dsi->put(std::move(data));
183 } catch (...) {
185 }
186 }
187 future<> put(net::packet p) noexcept {
188 try {
189 return _dsi->put(std::move(p));
190 } catch (...) {
192 }
193 }
194 future<> flush() noexcept {
195 try {
196 return _dsi->flush();
197 } catch (...) {
199 }
200 }
201 future<> close() noexcept {
202 try {
203 return _dsi->close();
204 } catch (...) {
206 }
207 }
208
209 size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
210 bool can_batch_flushes() const noexcept { return _dsi->can_batch_flushes(); }
211 void on_batch_flush_error() noexcept { _dsi->on_batch_flush_error(); }
212};
213
215
216template <typename CharType>
218public:
220 explicit stop_consuming(tmp_buf buf) : _buf(std::move(buf)) {}
221
222 tmp_buf& get_buffer() { return _buf; }
223 const tmp_buf& get_buffer() const { return _buf; }
224private:
225 tmp_buf _buf;
226};
227
229public:
230 explicit skip_bytes(uint64_t v) : _value(v) {}
231 uint64_t get_value() const { return _value; }
232private:
233 uint64_t _value;
234};
235
236template <typename CharType>
238public:
240 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
241 using tmp_buf = typename stop_consuming_type::tmp_buf;
242
243 /*[[deprecated]]*/ consumption_result(std::optional<tmp_buf> opt_buf) {
244 if (opt_buf) {
245 _result = stop_consuming_type{std::move(opt_buf.value())};
246 }
247 }
248
250 consumption_result(stop_consuming_type&& stop) : _result(std::move(stop)) {}
251 consumption_result(skip_bytes&& skip) : _result(std::move(skip)) {}
252
253 consumption_variant& get() { return _result; }
254 const consumption_variant& get() const { return _result; }
255
256private:
257 consumption_variant _result;
258};
259
260// Consumer concept, for consume() method
261
262// The consumer should operate on the data given to it, and
263// return a future "consumption result", which can be
264// - continue_consuming, if the consumer has consumed all the input given
265// to it and is ready for more
266// - stop_consuming, when the consumer is done (and in that case
267// the contained buffer is the unconsumed part of the last data buffer - this
268// can also happen to be empty).
269// - skip_bytes, when the consumer has consumed all the input given to it
270// and wants to skip before processing the next chunk
271//
272// For backward compatibility reasons, we also support the deprecated return value
273// of type "unconsumed remainder" which can be
274// - empty optional, if the consumer consumed all the input given to it
275// and is ready for more
276// - non-empty optional, when the consumer is done (and in that case
277// the value is the unconsumed part of the last data buffer - this
278// can also happen to be empty).
279
280template <typename Consumer, typename CharType>
281concept InputStreamConsumer = requires (Consumer c) {
282 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<consumption_result<CharType>>>;
283};
284
285template <typename Consumer, typename CharType>
286concept ObsoleteInputStreamConsumer = requires (Consumer c) {
287 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
288};
289
294template <typename CharType>
295class input_stream final {
296 static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
297 data_source _fd;
299 bool _eof = false;
300private:
302 size_t available() const noexcept { return _buf.size(); }
303protected:
304 void reset() noexcept { _buf = {}; }
305 data_source* fd() noexcept { return &_fd; }
306public:
308 // unconsumed_remainder is mapped for compatibility only; new code should use consumption_result_type
309 using unconsumed_remainder = std::optional<tmp_buf>;
310 using char_type = CharType;
311 input_stream() noexcept = default;
312 explicit input_stream(data_source fd) noexcept : _fd(std::move(fd)), _buf() {}
313 input_stream(input_stream&&) = default;
314 input_stream& operator=(input_stream&&) = default;
326 template <typename Consumer>
327 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
328 future<> consume(Consumer&& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
329 template <typename Consumer>
330 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
331 future<> consume(Consumer& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
337 bool eof() const noexcept { return _eof; }
340 future<tmp_buf> read() noexcept;
343 future<tmp_buf> read_up_to(size_t n) noexcept;
353 future<> close() noexcept {
354 return _fd.close();
355 }
357 future<> skip(uint64_t n) noexcept;
358
370 data_source detach() &&;
371private:
372 future<temporary_buffer<CharType>> read_exactly_part(size_t n) noexcept;
373};
374
376 bool trim_to_size = false;
378 bool batch_flushes = false;
379};
380
392template <typename CharType>
393class output_stream final {
394 static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
395 data_sink _fd;
397 net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
398 size_t _size = 0;
399 size_t _begin = 0;
400 size_t _end = 0;
401 bool _trim_to_size = false;
402 bool _batch_flushes = false;
403 std::optional<promise<>> _in_batch;
404 bool _flush = false;
405 bool _flushing = false;
406 std::exception_ptr _ex;
407 bi::slist_member_hook<> _in_poller;
408
409private:
410 size_t available() const noexcept { return _end - _begin; }
411 future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
412 future<> put(temporary_buffer<CharType> buf) noexcept;
413 void poll_flush() noexcept;
414 future<> do_flush() noexcept;
415 future<> zero_copy_put(net::packet p) noexcept;
416 future<> zero_copy_split_and_put(net::packet p) noexcept;
417 [[gnu::noinline]]
418 future<> slow_write(const CharType* buf, size_t n) noexcept;
419public:
420 using char_type = CharType;
421 output_stream() noexcept = default;
422 output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
423 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
424 [[deprecated("use output_stream_options instead of booleans")]]
425 output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
426 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
427 output_stream(data_sink fd) noexcept
428 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
429 output_stream(output_stream&&) noexcept = default;
430 output_stream& operator=(output_stream&&) noexcept = default;
432 if (_batch_flushes) {
433 assert(!_in_batch && "Was this stream properly closed?");
434 } else {
435 assert(!_end && !_zc_bufs && "Was this stream properly closed?");
436 }
437 }
438 future<> write(const char_type* buf, size_t n) noexcept;
439 future<> write(const char_type* buf) noexcept;
440
441 template <typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
443 future<> write(const std::basic_string<char_type>& s) noexcept;
444
445 future<> write(net::packet p) noexcept;
446 future<> write(scattered_message<char_type> msg) noexcept;
448 future<> flush() noexcept;
449
453 future<> close() noexcept;
454
466 data_sink detach() &&;
467
468 using batch_flush_list_t = bi::slist<output_stream,
469 bi::constant_time_size<false>, bi::cache_last<true>,
470 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
471private:
472 friend class reactor;
473};
474
478template <typename CharType>
480
481SEASTAR_MODULE_EXPORT_END
482}
483
484#include "iostream-impl.hh"
Definition: sstring.hh:76
Definition: iostream.hh:237
Definition: iostream.hh:104
Definition: iostream.hh:163
Definition: iostream.hh:61
Definition: iostream.hh:69
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: iostream.hh:295
future skip(uint64_t n) noexcept
Ignores n next bytes from the stream.
Definition: iostream-impl.hh:301
future< tmp_buf > read_up_to(size_t n) noexcept
Definition: iostream-impl.hh:255
future< temporary_buffer< CharType > > read_exactly(size_t n) noexcept
Definition: iostream-impl.hh:184
future< tmp_buf > read() noexcept
Definition: iostream-impl.hh:284
future close() noexcept
Definition: iostream.hh:353
data_source detach() &&
Definition: iostream-impl.hh:315
bool eof() const noexcept
Definition: iostream.hh:337
Definition: packet.hh:87
Definition: iostream.hh:393
data_sink detach() &&
Definition: iostream-impl.hh:515
future close() noexcept
Definition: iostream-impl.hh:496
Definition: reactor.hh:146
Definition: scattered_message.hh:35
Definition: iostream.hh:228
Definition: iostream.hh:217
deleter release() noexcept
Definition: temporary_buffer.hh:203
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:46
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:378
bool trim_to_size
Definition: iostream.hh:376
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550
Definition: iostream.hh:214
Definition: iostream.hh:375