38#include <seastar/core/future.hh>
39#include <seastar/core/coroutine.hh>
40#include <seastar/core/temporary_buffer.hh>
41#include <seastar/core/scattered_message.hh>
42#include <seastar/util/std-compat.hh>
43#include <seastar/util/modules.hh>
45#include <boost/intrusive/slist.hpp>
53namespace bi = boost::intrusive;
57SEASTAR_MODULE_EXPORT_BEGIN
// Forward-declares class `packet` inside namespace `net`; no definition is
// provided here.
59namespace net {
class packet; }
// Default close(): declared virtual so it can be overridden. This version
// performs no work and returns an already-ready future<>.
66 virtual future<> close() {
return make_ready_future<>(); }
70 std::unique_ptr<data_source_impl> _dsi;
// Constructs a data_source from an implementation object, taking ownership by
// moving `dsi` into _dsi. Marked explicit, so a unique_ptr is never converted
// to a data_source implicitly, and noexcept.
77 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
85 return current_exception_as_future<tmp_buf>();
92 return current_exception_as_future<tmp_buf>();
99 return current_exception_as_future<>();
113 p.reserve(data.size());
114 for (
auto& buf : data) {
117 return put(std::move(p));
123 return make_ready_future<>();
131 virtual size_t buffer_size()
const noexcept {
132 assert(
false &&
"Data sink must have the buffer_size() method overload");
140 virtual bool can_batch_flushes()
const noexcept {
144 virtual void on_batch_flush_error()
noexcept {
145 assert(
false &&
"Data sink must implement on_batch_flush_error() method");
156 auto buffers = data.release();
158 co_await this->put(std::move(buf));
164 std::unique_ptr<data_sink_impl> _dsi;
// Constructs a data_sink from an implementation object, taking ownership by
// moving `dsi` into _dsi. Marked explicit, so a unique_ptr is never converted
// to a data_sink implicitly, and noexcept.
167 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
171 return _dsi->allocate_buffer(size);
175 return _dsi->put(std::move(data));
182 return _dsi->put(std::move(data));
189 return _dsi->put(std::move(p));
196 return _dsi->flush();
203 return _dsi->close();
// Returns whatever the implementation's buffer_size() reports.
// Dereferences _dsi without a null check.
209 size_t buffer_size()
const noexcept {
return _dsi->buffer_size(); }
// Returns whatever the implementation's can_batch_flushes() reports.
// Dereferences _dsi without a null check.
210 bool can_batch_flushes()
const noexcept {
return _dsi->can_batch_flushes(); }
// Forwards the notification to the implementation's on_batch_flush_error().
// Dereferences _dsi without a null check; returns nothing.
211 void on_batch_flush_error()
noexcept { _dsi->on_batch_flush_error(); }
216template <
typename CharType>
// Mutable accessor: returns a non-const reference to the held _buf, so the
// caller can modify it in place.
222 tmp_buf& get_buffer() {
return _buf; }
// Read-only accessor: const overload returning a const reference to _buf.
223 const tmp_buf& get_buffer()
const {
return _buf; }
// Stores `v` in _value. Marked explicit, so a plain uint64_t is never
// converted to skip_bytes implicitly.
230 explicit skip_bytes(uint64_t v) : _value(v) {}
// Returns the stored _value by value.
231 uint64_t get_value()
const {
return _value; }
236template <
typename CharType>
240 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
// Mutable accessor: returns a non-const reference to the stored
// consumption_variant (_result).
253 consumption_variant& get() {
return _result; }
// Read-only accessor: const overload returning a const reference to _result.
254 const consumption_variant& get()
const {
return _result; }
257 consumption_variant _result;
280template <
typename Consumer,
typename CharType>
281concept InputStreamConsumer =
requires (Consumer c) {
285template <
typename Consumer,
typename CharType>
286concept ObsoleteInputStreamConsumer =
requires (Consumer c) {
287 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
294template <
typename CharType>
296 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
// Returns the size of the currently held buffer (_buf.size()).
302 size_t available()
const noexcept {
return _buf.
size(); }
// Replaces _buf with a value-initialized (`{}`) buffer, discarding whatever
// _buf held before.
304 void reset()
noexcept { _buf = {}; }
309 using unconsumed_remainder = std::optional<tmp_buf>;
310 using char_type = CharType;
326 template <
typename Consumer>
327 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
328 future<> consume(Consumer&& c)
noexcept(std::is_nothrow_move_constructible_v<Consumer>);
329 template <
typename Consumer>
330 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
331 future<> consume(Consumer& c)
noexcept(std::is_nothrow_move_constructible_v<Consumer>);
// Returns the current value of the _eof flag.
337 bool eof() const noexcept {
return _eof; }
392template <
typename CharType>
394 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
397 net::packet _zc_bufs = net::packet::make_null_packet();
401 bool _trim_to_size =
false;
402 bool _batch_flushes =
false;
403 std::optional<promise<>> _in_batch;
405 bool _flushing =
false;
406 std::exception_ptr _ex;
407 bi::slist_member_hook<> _in_poller;
// Returns the difference _end - _begin.
// NOTE(review): presumably the number of bytes currently held in the
// stream's buffer — confirm against the declarations of _begin/_end.
410 size_t available()
const noexcept {
return _end - _begin; }
413 void poll_flush()
noexcept;
418 future<> slow_write(
const CharType* buf,
size_t n)
noexcept;
420 using char_type = CharType;
423 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
424 [[deprecated(
"use output_stream_options instead of booleans")]]
426 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
428 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(
true) {}
432 if (_batch_flushes) {
433 assert(!_in_batch &&
"Was this stream properly closed?");
435 assert(!_end && !_zc_bufs &&
"Was this stream properly closed?");
438 future<> write(
const char_type* buf,
size_t n)
noexcept;
439 future<> write(
const char_type* buf)
noexcept;
441 template <
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
443 future<> write(
const std::basic_string<char_type>& s)
noexcept;
469 bi::constant_time_size<false>, bi::cache_last<true>,
470 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
478template <
typename CharType>
481SEASTAR_MODULE_EXPORT_END
484#include "iostream-impl.hh"
Definition: sstring.hh:76
Definition: iostream.hh:237
Definition: iostream.hh:104
Definition: iostream.hh:163
Definition: iostream.hh:61
Definition: iostream.hh:69
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: iostream.hh:393
data_sink detach() &&
Definition: iostream-impl.hh:515
future<> close() noexcept
Definition: iostream-impl.hh:496
Definition: reactor.hh:146
Definition: scattered_message.hh:35
Definition: iostream.hh:228
Definition: iostream.hh:217
deleter release() noexcept
Definition: temporary_buffer.hh:203
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:378
bool trim_to_size
Definition: iostream.hh:376
future<> copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550
Definition: iostream.hh:214
Definition: iostream.hh:375