38#include <seastar/core/future.hh>
39#include <seastar/core/temporary_buffer.hh>
40#include <seastar/core/scattered_message.hh>
41#include <seastar/util/std-compat.hh>
42#include <seastar/util/modules.hh>
44#include <boost/intrusive/slist.hpp>
52namespace bi = boost::intrusive;
56SEASTAR_MODULE_EXPORT_BEGIN
58namespace net {
class packet; }
65 virtual future<> close() {
return make_ready_future<>(); }
69 std::unique_ptr<data_source_impl> _dsi;
76 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
84 return current_exception_as_future<tmp_buf>();
91 return current_exception_as_future<tmp_buf>();
98 return current_exception_as_future<>();
112 p.reserve(data.size());
113 for (
auto& buf : data) {
116 return put(std::move(p));
122 return make_ready_future<>();
130 virtual size_t buffer_size()
const noexcept {
131 assert(
false &&
"Data sink must have the buffer_size() method overload");
139 virtual bool can_batch_flushes()
const noexcept {
143 virtual void on_batch_flush_error()
noexcept {
144 assert(
false &&
"Data sink must implement on_batch_flush_error() method");
149 std::unique_ptr<data_sink_impl> _dsi;
152 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
156 return _dsi->allocate_buffer(size);
160 return _dsi->put(std::move(data));
167 return _dsi->put(std::move(data));
174 return _dsi->put(std::move(p));
181 return _dsi->flush();
188 return _dsi->close();
194 size_t buffer_size()
const noexcept {
return _dsi->buffer_size(); }
195 bool can_batch_flushes()
const noexcept {
return _dsi->can_batch_flushes(); }
196 void on_batch_flush_error()
noexcept { _dsi->on_batch_flush_error(); }
201template <
typename CharType>
207 tmp_buf& get_buffer() {
return _buf; }
208 const tmp_buf& get_buffer()
const {
return _buf; }
215 explicit skip_bytes(uint64_t v) : _value(v) {}
216 uint64_t get_value()
const {
return _value; }
221template <
typename CharType>
225 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
238 consumption_variant& get() {
return _result; }
239 const consumption_variant& get()
const {
return _result; }
242 consumption_variant _result;
265template <
typename Consumer,
typename CharType>
266concept InputStreamConsumer =
requires (Consumer c) {
270template <
typename Consumer,
typename CharType>
271concept ObsoleteInputStreamConsumer =
requires (Consumer c) {
272 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
279template <
typename CharType>
281 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
287 size_t available()
const noexcept {
return _buf.
size(); }
289 void reset()
noexcept { _buf = {}; }
294 using unconsumed_remainder = std::optional<tmp_buf>;
295 using char_type = CharType;
311 template <
typename Consumer>
312 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
313 future<> consume(Consumer&& c)
noexcept(std::is_nothrow_move_constructible_v<Consumer>);
314 template <
typename Consumer>
315 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
316 future<> consume(Consumer& c)
noexcept(std::is_nothrow_move_constructible_v<Consumer>);
322 bool eof() const noexcept {
return _eof; }
377template <
typename CharType>
379 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
382 net::packet _zc_bufs = net::packet::make_null_packet();
386 bool _trim_to_size =
false;
387 bool _batch_flushes =
false;
388 std::optional<promise<>> _in_batch;
390 bool _flushing =
false;
391 std::exception_ptr _ex;
392 bi::slist_member_hook<> _in_poller;
395 size_t available()
const noexcept {
return _end - _begin; }
398 void poll_flush()
noexcept;
403 future<> slow_write(
const CharType* buf,
size_t n)
noexcept;
405 using char_type = CharType;
408 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
409 [[deprecated(
"use output_stream_options instead of booleans")]]
411 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
413 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(
true) {}
417 if (_batch_flushes) {
418 assert(!_in_batch &&
"Was this stream properly closed?");
420 assert(!_end && !_zc_bufs &&
"Was this stream properly closed?");
423 future<> write(
const char_type* buf,
size_t n)
noexcept;
424 future<> write(
const char_type* buf)
noexcept;
426 template <
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
428 future<> write(
const std::basic_string<char_type>& s)
noexcept;
454 bi::constant_time_size<false>, bi::cache_last<true>,
455 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
463template <
typename CharType>
466SEASTAR_MODULE_EXPORT_END
469#include "iostream-impl.hh"
Definition: sstring.hh:76
Definition: iostream.hh:222
Definition: iostream.hh:103
Definition: iostream.hh:148
Definition: iostream.hh:60
Definition: iostream.hh:68
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: iostream.hh:378
data_sink detach() &&
Definition: iostream-impl.hh:515
future close() noexcept
Definition: iostream-impl.hh:496
Definition: reactor.hh:150
Definition: scattered_message.hh:35
Definition: iostream.hh:213
Definition: iostream.hh:202
deleter release() noexcept
Definition: temporary_buffer.hh:203
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:363
bool trim_to_size
Definition: iostream.hh:361
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550
Definition: iostream.hh:199
Definition: iostream.hh:360