38 #include <seastar/core/future.hh>
39 #include <seastar/core/temporary_buffer.hh>
40 #include <seastar/core/scattered_message.hh>
41 #include <seastar/util/std-compat.hh>
42 #include <seastar/util/modules.hh>
43 #ifndef SEASTAR_MODULE
44 #include <boost/intrusive/slist.hpp>
52 namespace bi = boost::intrusive;
56 SEASTAR_MODULE_EXPORT_BEGIN
58 namespace net {
class packet; }
// Default close(): performs no work and returns an already-resolved future.
// Declared virtual, so a derived implementation may replace it.
65 virtual future<> close() {
return make_ready_future<>(); }
69 std::unique_ptr<data_source_impl> _dsi;
// Wraps an implementation object: ownership of `dsi` is moved into the _dsi member.
// `explicit` rules out implicit conversion from a unique_ptr.
76 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
84 return current_exception_as_future<tmp_buf>();
91 return current_exception_as_future<tmp_buf>();
98 return current_exception_as_future<>();
112 p.reserve(data.size());
113 for (
auto& buf : data) {
116 return put(std::move(p));
122 return make_ready_future<>();
130 virtual size_t buffer_size()
const noexcept {
131 assert(
false &&
"Data sink must have the buffer_size() method overload");
139 virtual bool can_batch_flushes()
const noexcept {
143 virtual void on_batch_flush_error() noexcept {
144 assert(
false &&
"Data sink must implement on_batch_flush_error() method");
149 std::unique_ptr<data_sink_impl> _dsi;
// Wraps an implementation object: ownership of `dsi` is moved into the _dsi member.
// `explicit` rules out implicit conversion from a unique_ptr.
152 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
156 return _dsi->allocate_buffer(size);
160 return _dsi->put(std::move(data));
167 return _dsi->put(std::move(data));
174 return _dsi->put(std::move(p));
181 return _dsi->flush();
188 return _dsi->close();
// Reports the buffer size by forwarding to the underlying implementation
// (_dsi->buffer_size()); the result is returned unchanged.
194 size_t buffer_size()
const noexcept {
return _dsi->buffer_size(); }
// Forwards to the implementation's can_batch_flushes() and returns its answer unchanged.
195 bool can_batch_flushes()
const noexcept {
return _dsi->can_batch_flushes(); }
// Passes the batch-flush error notification on to the implementation; returns nothing.
196 void on_batch_flush_error() noexcept { _dsi->on_batch_flush_error(); }
201 template <
typename CharType>
// Accessors for the _buf member. The non-const overload hands out a mutable
// reference, the const overload a read-only one; neither makes a copy.
207 tmp_buf& get_buffer() {
return _buf; }
208 const tmp_buf& get_buffer()
const {
return _buf; }
// Stores `v` in _value. `explicit` rules out implicit conversion from an integer.
215 explicit skip_bytes(uint64_t v) : _value(v) {}
// Returns the stored _value by value.
// NOTE(review): going by the class name this is presumably a byte count to skip — confirm.
216 uint64_t get_value()
const {
return _value; }
221 template <
typename CharType>
225 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
// Accessors for the wrapped consumption_variant (_result), which holds one of
// continue_consuming, stop_consuming_type or skip_bytes. Both overloads return
// a reference: mutable for non-const objects, read-only for const ones.
238 consumption_variant& get() {
return _result; }
239 const consumption_variant& get()
const {
return _result; }
242 consumption_variant _result;
265 template <typename Consumer, typename CharType>
266 concept InputStreamConsumer = requires (Consumer c) {
270 template <
typename Consumer,
typename CharType>
271 concept ObsoleteInputStreamConsumer = requires (Consumer c) {
272 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
280 template <typename CharType>
282 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
// Size of the internal buffer _buf, obtained from _buf.size().
288 size_t available()
const noexcept {
return _buf.
size(); }
// Overwrites _buf with an empty-brace-initialized value, so its previous
// contents are no longer referenced through this member.
290 void reset() noexcept { _buf = {}; }
295 using unconsumed_remainder = std::optional<tmp_buf>;
296 using char_type = CharType;
// Declaration only (the definition is not part of this listing).
// consume() taking the consumer by forwarding reference. Consumer must satisfy
// InputStreamConsumer or ObsoleteInputStreamConsumer for this stream's CharType.
// The function is noexcept exactly when Consumer is nothrow-move-constructible.
312 template <
typename Consumer>
313 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
314 future<> consume(Consumer&& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
// Declaration only (the definition is not part of this listing).
// consume() overload taking the consumer by lvalue reference. Same constraint:
// Consumer must satisfy InputStreamConsumer or ObsoleteInputStreamConsumer for
// this stream's CharType. noexcept exactly when Consumer is nothrow-move-constructible.
315 template <
typename Consumer>
316 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
317 future<> consume(Consumer& c) noexcept(std::is_nothrow_move_constructible_v<Consumer>);
// Returns the current value of the _eof flag.
323 bool eof() const noexcept {
return _eof; }
// Declaration only: takes a size `n` and yields a future<tmp_buf>; noexcept.
// NOTE(review): presumably resolves to a buffer of at most `n` bytes — confirm
// against the definition, which is not part of this listing.
329 future<tmp_buf> read_up_to(
size_t n) noexcept;
378 template <
typename CharType>
380 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
383 net::packet _zc_bufs = net::packet::make_null_packet();
387 bool _trim_to_size =
false;
388 bool _batch_flushes =
false;
389 std::optional<promise<>> _in_batch;
391 bool _flushing =
false;
392 std::exception_ptr _ex;
393 bi::slist_member_hook<> _in_poller;
// Returns the difference _end - _begin.
// NOTE(review): presumably the span of the internal buffer currently in use — confirm.
396 size_t available()
const noexcept {
return _end - _begin; }
// Declaration only; noexcept, takes no arguments and returns nothing.
// NOTE(review): presumably the flush step used when batch flushing is enabled — confirm in the definition.
399 void poll_flush() noexcept;
// Declaration only: takes a pointer to CharType data plus a count `n` and
// returns future<>; noexcept.
// NOTE(review): presumably the fallback path of write() — confirm in the definition.
404 future<> slow_write(
const CharType* buf,
size_t n) noexcept;
406 using char_type = CharType;
409 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
410 [[deprecated(
"use output_stream_options instead of booleans")]]
412 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
414 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(
true) {}
418 if (_batch_flushes) {
419 assert(!_in_batch &&
"Was this stream properly closed?");
421 assert(!_end && !_zc_bufs &&
"Was this stream properly closed?");
// Declaration only: write overload taking a pointer to char_type data and an
// explicit element count `n`. Returns future<>; noexcept.
424 future<> write(
const char_type* buf,
size_t n) noexcept;
// Declaration only: write overload taking just a pointer, with no length argument.
// NOTE(review): presumably expects a NUL-terminated string — confirm in the definition.
425 future<> write(
const char_type* buf) noexcept;
427 template <
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
// Declaration only: write overload taking a std::basic_string of char_type by
// const reference. Returns future<>; noexcept.
429 future<> write(
const std::basic_string<char_type>& s) noexcept;
455 bi::constant_time_size<false>, bi::cache_last<true>,
456 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
464 template <
typename CharType>
467 SEASTAR_MODULE_EXPORT_END
470 #include "iostream-impl.hh"
Definition: sstring.hh:75
Definition: iostream.hh:222
Definition: iostream.hh:103
Definition: iostream.hh:148
Definition: iostream.hh:60
Definition: iostream.hh:68
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Definition: iostream.hh:379
data_sink detach() &&
Definition: iostream-impl.hh:515
future<> close() noexcept
Definition: iostream-impl.hh:496
Definition: reactor.hh:168
Definition: scattered_message.hh:39
Definition: iostream.hh:213
Definition: iostream.hh:202
deleter release() noexcept
Definition: temporary_buffer.hh:203
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
Holds the implementation parts of the metrics layer; do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:364
bool trim_to_size
Definition: iostream.hh:362
future<> copy(input_stream< CharType > &in, output_stream< CharType > &out)
Copies all the content from the input stream to the output stream.
Definition: iostream-impl.hh:550
Definition: iostream.hh:199
Definition: iostream.hh:361