Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
iostream.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23//
24// Buffered input and output streams
25//
26// Two abstract classes (data_source and data_sink) provide means
27// to acquire bulk data from, or push bulk data to, some provider.
28// These could be tied to a TCP connection, a disk file, or a memory
29// buffer.
30//
31// Two concrete classes (input_stream and output_stream) buffer data
32// from data_source and data_sink and provide easier means to process
33// it.
34//
35
36#pragma once
37
38#include <seastar/core/future.hh>
39#include <seastar/core/coroutine.hh>
40#include <seastar/core/temporary_buffer.hh>
41#include <seastar/core/scattered_message.hh>
42#include <seastar/util/assert.hh>
43#include <seastar/util/std-compat.hh>
44#include <seastar/util/modules.hh>
45#ifndef SEASTAR_MODULE
46#include <boost/intrusive/slist.hpp>
47#include <algorithm>
48#include <memory>
49#include <optional>
50#include <variant>
51#include <vector>
52#endif
53
54namespace bi = boost::intrusive;
55
56namespace seastar {
57
58SEASTAR_MODULE_EXPORT_BEGIN
59
60namespace net { class packet; }
61
63public:
64 virtual ~data_source_impl() {}
65 virtual future<temporary_buffer<char>> get() = 0;
66 virtual future<temporary_buffer<char>> skip(uint64_t n);
67 virtual future<> close() { return make_ready_future<>(); }
68};
69
71 std::unique_ptr<data_source_impl> _dsi;
72protected:
73 data_source_impl* impl() const { return _dsi.get(); }
74public:
76
77 data_source() noexcept = default;
78 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
79 data_source(data_source&& x) noexcept = default;
80 data_source& operator=(data_source&& x) noexcept = default;
81
82 future<tmp_buf> get() noexcept {
83 try {
84 return _dsi->get();
85 } catch (...) {
86 return current_exception_as_future<tmp_buf>();
87 }
88 }
89 future<tmp_buf> skip(uint64_t n) noexcept {
90 try {
91 return _dsi->skip(n);
92 } catch (...) {
93 return current_exception_as_future<tmp_buf>();
94 }
95 }
96 future<> close() noexcept {
97 try {
98 return _dsi->close();
99 } catch (...) {
100 return current_exception_as_future<>();
101 }
102 }
103};
104
106public:
107 virtual ~data_sink_impl() {}
108 virtual temporary_buffer<char> allocate_buffer(size_t size) {
109 return temporary_buffer<char>(size);
110 }
111 virtual future<> put(net::packet data) = 0;
112 virtual future<> put(std::vector<temporary_buffer<char>> data) {
113 net::packet p;
114 p.reserve(data.size());
115 for (auto& buf : data) {
116 p = net::packet(std::move(p), net::fragment{buf.get_write(), buf.size()}, buf.release());
117 }
118 return put(std::move(p));
119 }
120 virtual future<> put(temporary_buffer<char> buf) {
121 return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
122 }
123 virtual future<> flush() {
124 return make_ready_future<>();
125 }
126 virtual future<> close() = 0;
127
128 // The method should return the maximum buffer size that's acceptable by
129 // the sink. It's used when the output stream is constructed without any
130 // specific buffer size. In this case the stream accepts this value as its
131 // buffer size and doesn't put larger buffers (see trim_to_size).
132 virtual size_t buffer_size() const noexcept {
133 SEASTAR_ASSERT(false && "Data sink must have the buffer_size() method overload");
134 return 0;
135 }
136
137 // In order to support flushes batching (output_stream_options.batch_flushes)
138 // the sink mush handle flush errors that may happen in the background by
139 // overriding the on_batch_flush_error() method. If the sink doesn't do it,
140 // turning on batch_flushes would have no effect
141 virtual bool can_batch_flushes() const noexcept {
142 return false;
143 }
144
145 virtual void on_batch_flush_error() noexcept {
146 SEASTAR_ASSERT(false && "Data sink must implement on_batch_flush_error() method");
147 }
148
149protected:
150 // This is a helper function that classes that inherit from data_sink_impl
151 // can use to implement the put overload for net::packet.
152 // Unfortunately, we currently cannot define this function as
153 // 'virtual future<> put(net::packet)', because we would get infinite
154 // recursion between this function and
155 // 'virtual future<> put(temporary_buffer<char>)'.
156 future<> fallback_put(net::packet data) {
157 auto buffers = data.release();
158 for (temporary_buffer<char>& buf : buffers) {
159 co_await this->put(std::move(buf));
160 }
161 }
162};
163
165 std::unique_ptr<data_sink_impl> _dsi;
166public:
167 data_sink() noexcept = default;
168 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
169 data_sink(data_sink&& x) noexcept = default;
170 data_sink& operator=(data_sink&& x) noexcept = default;
171 temporary_buffer<char> allocate_buffer(size_t size) {
172 return _dsi->allocate_buffer(size);
173 }
174 future<> put(std::vector<temporary_buffer<char>> data) noexcept {
175 try {
176 return _dsi->put(std::move(data));
177 } catch (...) {
179 }
180 }
181 future<> put(temporary_buffer<char> data) noexcept {
182 try {
183 return _dsi->put(std::move(data));
184 } catch (...) {
186 }
187 }
188 future<> put(net::packet p) noexcept {
189 try {
190 return _dsi->put(std::move(p));
191 } catch (...) {
193 }
194 }
195 future<> flush() noexcept {
196 try {
197 return _dsi->flush();
198 } catch (...) {
200 }
201 }
202 future<> close() noexcept {
203 try {
204 return _dsi->close();
205 } catch (...) {
207 }
208 }
209
210 size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
211 bool can_batch_flushes() const noexcept { return _dsi->can_batch_flushes(); }
212 void on_batch_flush_error() noexcept { _dsi->on_batch_flush_error(); }
213};
214
216
217template <typename CharType>
219public:
221 explicit stop_consuming(tmp_buf buf) : _buf(std::move(buf)) {}
222
223 tmp_buf& get_buffer() { return _buf; }
224 const tmp_buf& get_buffer() const { return _buf; }
225private:
226 tmp_buf _buf;
227};
228
230public:
231 explicit skip_bytes(uint64_t v) : _value(v) {}
232 uint64_t get_value() const { return _value; }
233private:
234 uint64_t _value;
235};
236
237template <typename CharType>
239public:
241 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
242 using tmp_buf = typename stop_consuming_type::tmp_buf;
243
244 /*[[deprecated]]*/ consumption_result(std::optional<tmp_buf> opt_buf) {
245 if (opt_buf) {
246 _result = stop_consuming_type{std::move(opt_buf.value())};
247 }
248 }
249
251 consumption_result(stop_consuming_type&& stop) : _result(std::move(stop)) {}
252 consumption_result(skip_bytes&& skip) : _result(std::move(skip)) {}
253
254 consumption_variant& get() { return _result; }
255 const consumption_variant& get() const { return _result; }
256
257private:
258 consumption_variant _result;
259};
260
261// Consumer concept, for consume() method
262
263// The consumer should operate on the data given to it, and
264// return a future "consumption result", which can be
265// - continue_consuming, if the consumer has consumed all the input given
266// to it and is ready for more
267// - stop_consuming, when the consumer is done (and in that case
268// the contained buffer is the unconsumed part of the last data buffer - this
269// can also happen to be empty).
270// - skip_bytes, when the consumer has consumed all the input given to it
271// and wants to skip before processing the next chunk
272//
273// For backward compatibility reasons, we also support the deprecated return value
274// of type "unconsumed remainder" which can be
275// - empty optional, if the consumer consumed all the input given to it
276// and is ready for more
277// - non-empty optional, when the consumer is done (and in that case
278// the value is the unconsumed part of the last data buffer - this
279// can also happen to be empty).
280
281template <typename Consumer, typename CharType>
282concept InputStreamConsumer = requires (Consumer c) {
283 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<consumption_result<CharType>>>;
284};
285
286template <typename Consumer, typename CharType>
287concept ObsoleteInputStreamConsumer = requires (Consumer c) {
288 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
289};
290
295template <typename CharType>
296class input_stream final {
297 static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
298 data_source _fd;
300 bool _eof = false;
301private:
303 size_t available() const noexcept { return _buf.size(); }
304protected:
305 void reset() noexcept { _buf = {}; }
306 data_source* fd() noexcept { return &_fd; }
307public:
309 // unconsumed_remainder is mapped for compatibility only; new code should use consumption_result_type
310 using unconsumed_remainder = std::optional<tmp_buf>;
311 using char_type = CharType;
312 input_stream() noexcept = default;
313 explicit input_stream(data_source fd) noexcept : _fd(std::move(fd)), _buf() {}
314 input_stream(input_stream&&) = default;
315 input_stream& operator=(input_stream&&) = default;
327 template <typename Consumer>
328 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
329 future<> consume(Consumer&& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
330 template <typename Consumer>
331 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
332 future<> consume(Consumer& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
338 bool eof() const noexcept { return _eof; }
341 future<tmp_buf> read() noexcept;
344 future<tmp_buf> read_up_to(size_t n) noexcept;
354 future<> close() noexcept {
355 return _fd.close();
356 }
358 future<> skip(uint64_t n) noexcept;
359
371 data_source detach() &&;
372private:
373 future<temporary_buffer<CharType>> read_exactly_part(size_t n) noexcept;
374};
375
377 bool trim_to_size = false;
379 bool batch_flushes = false;
380};
381
393template <typename CharType>
394class output_stream final {
395 static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
396 data_sink _fd;
398 net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
399 size_t _size = 0;
400 size_t _begin = 0;
401 size_t _end = 0;
402 bool _trim_to_size = false;
403 bool _batch_flushes = false;
404 std::optional<promise<>> _in_batch;
405 bool _flush = false;
406 bool _flushing = false;
407 std::exception_ptr _ex;
408 bi::slist_member_hook<> _in_poller;
409
410private:
411 size_t available() const noexcept { return _end - _begin; }
412 future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
413 future<> put(temporary_buffer<CharType> buf) noexcept;
414 void poll_flush() noexcept;
415 future<> do_flush() noexcept;
416 future<> zero_copy_put(net::packet p) noexcept;
417 future<> zero_copy_split_and_put(net::packet p) noexcept;
418 [[gnu::noinline]]
419 future<> slow_write(const CharType* buf, size_t n) noexcept;
420public:
421 using char_type = CharType;
422 output_stream() noexcept = default;
423 output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
424 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
425 [[deprecated("use output_stream_options instead of booleans")]]
426 output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
427 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
428 output_stream(data_sink fd) noexcept
429 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
430 output_stream(output_stream&&) noexcept = default;
431 output_stream& operator=(output_stream&&) noexcept = default;
433 if (_batch_flushes) {
434 SEASTAR_ASSERT(!_in_batch && "Was this stream properly closed?");
435 } else {
436 SEASTAR_ASSERT(!_end && !_zc_bufs && "Was this stream properly closed?");
437 }
438 }
439 future<> write(const char_type* buf, size_t n) noexcept;
440 future<> write(const char_type* buf) noexcept;
441
442 template <typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
444 future<> write(const std::basic_string<char_type>& s) noexcept;
445
446 future<> write(net::packet p) noexcept;
447 future<> write(scattered_message<char_type> msg) noexcept;
449 future<> flush() noexcept;
450
454 future<> close() noexcept;
455
467 data_sink detach() &&;
468
469 using batch_flush_list_t = bi::slist<output_stream,
470 bi::constant_time_size<false>, bi::cache_last<true>,
471 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
472private:
473 friend class reactor;
474};
475
479template <typename CharType>
481
482SEASTAR_MODULE_EXPORT_END
483}
484
485#include "iostream-impl.hh"
Definition: sstring.hh:76
Definition: iostream.hh:238
Definition: iostream.hh:105
Definition: iostream.hh:164
Definition: iostream.hh:62
Definition: iostream.hh:70
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
value_type && get()
gets the value returned by the computation
Definition: future.hh:1299
Definition: iostream.hh:296
future skip(uint64_t n) noexcept
Ignores n next bytes from the stream.
Definition: iostream-impl.hh:302
future< tmp_buf > read_up_to(size_t n) noexcept
Definition: iostream-impl.hh:256
future< temporary_buffer< CharType > > read_exactly(size_t n) noexcept
Definition: iostream-impl.hh:185
future< tmp_buf > read() noexcept
Definition: iostream-impl.hh:285
future close() noexcept
Definition: iostream.hh:354
data_source detach() &&
Definition: iostream-impl.hh:316
bool eof() const noexcept
Definition: iostream.hh:338
Definition: packet.hh:87
Definition: iostream.hh:394
data_sink detach() &&
Definition: iostream-impl.hh:516
future close() noexcept
Definition: iostream-impl.hh:497
Definition: reactor.hh:152
Definition: scattered_message.hh:35
Definition: iostream.hh:229
Definition: iostream.hh:218
deleter release() noexcept
Definition: temporary_buffer.hh:203
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:46
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:379
bool trim_to_size
Definition: iostream.hh:377
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:551
Definition: iostream.hh:215
Definition: iostream.hh:376