38#include <seastar/core/future.hh>
39#include <seastar/core/coroutine.hh>
40#include <seastar/core/temporary_buffer.hh>
41#include <seastar/core/scattered_message.hh>
42#include <seastar/util/assert.hh>
43#include <seastar/util/std-compat.hh>
44#include <seastar/util/modules.hh>
46#include <boost/intrusive/slist.hpp>
54namespace bi = boost::intrusive;
58SEASTAR_MODULE_EXPORT_BEGIN
60namespace net {
class packet; }
67 virtual future<> close() {
return make_ready_future<>(); }
71 std::unique_ptr<data_source_impl> _dsi;
78 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
86 return current_exception_as_future<tmp_buf>();
93 return current_exception_as_future<tmp_buf>();
100 return current_exception_as_future<>();
114 p.reserve(data.size());
115 for (
auto& buf : data) {
118 return put(std::move(p));
124 return make_ready_future<>();
132 virtual size_t buffer_size()
const noexcept {
133 SEASTAR_ASSERT(
false &&
"Data sink must have the buffer_size() method overload");
141 virtual bool can_batch_flushes()
const noexcept {
145 virtual void on_batch_flush_error()
noexcept {
146 SEASTAR_ASSERT(
false &&
"Data sink must implement on_batch_flush_error() method");
157 auto buffers = data.release();
159 co_await this->put(std::move(buf));
165 std::unique_ptr<data_sink_impl> _dsi;
168 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
172 return _dsi->allocate_buffer(size);
176 return _dsi->put(std::move(data));
183 return _dsi->put(std::move(data));
190 return _dsi->put(std::move(p));
197 return _dsi->flush();
204 return _dsi->close();
210 size_t buffer_size()
const noexcept {
return _dsi->buffer_size(); }
211 bool can_batch_flushes()
const noexcept {
return _dsi->can_batch_flushes(); }
212 void on_batch_flush_error()
noexcept { _dsi->on_batch_flush_error(); }
217template <
typename CharType>
223 tmp_buf& get_buffer() {
return _buf; }
224 const tmp_buf& get_buffer()
const {
return _buf; }
231 explicit skip_bytes(uint64_t v) : _value(v) {}
232 uint64_t get_value()
const {
return _value; }
237template <
typename CharType>
241 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
254 consumption_variant& get() {
return _result; }
255 const consumption_variant& get()
const {
return _result; }
258 consumption_variant _result;
281template <
typename Consumer,
typename CharType>
282concept InputStreamConsumer =
requires (Consumer c) {
286template <
typename Consumer,
typename CharType>
287concept ObsoleteInputStreamConsumer =
requires (Consumer c) {
288 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
295template <
typename CharType>
297 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
303 size_t available()
const noexcept {
return _buf.
size(); }
305 void reset()
noexcept { _buf = {}; }
310 using unconsumed_remainder = std::optional<tmp_buf>;
311 using char_type = CharType;
327 template <
typename Consumer>
328 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
329 future<> consume(Consumer&& c)
noexcept(std::is_nothrow_move_constructible_v<Consumer>);
330 template <
typename Consumer>
331 requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
332 future<> consume(Consumer& c)
noexcept(std::is_nothrow_move_constructible_v<Consumer>);
338 bool eof() const noexcept {
return _eof; }
393template <
typename CharType>
395 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
398 net::packet _zc_bufs = net::packet::make_null_packet();
402 bool _trim_to_size =
false;
403 bool _batch_flushes =
false;
404 std::optional<promise<>> _in_batch;
406 bool _flushing =
false;
407 std::exception_ptr _ex;
408 bi::slist_member_hook<> _in_poller;
411 size_t available()
const noexcept {
return _end - _begin; }
414 void poll_flush()
noexcept;
419 future<> slow_write(
const CharType* buf,
size_t n)
noexcept;
421 using char_type = CharType;
424 : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
425 [[deprecated(
"use output_stream_options instead of booleans")]]
427 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes && _fd.can_batch_flushes()) {}
429 : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(
true) {}
433 if (_batch_flushes) {
434 SEASTAR_ASSERT(!_in_batch &&
"Was this stream properly closed?");
436 SEASTAR_ASSERT(!_end && !_zc_bufs &&
"Was this stream properly closed?");
439 future<> write(
const char_type* buf,
size_t n)
noexcept;
440 future<> write(
const char_type* buf)
noexcept;
442 template <
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
444 future<> write(
const std::basic_string<char_type>& s)
noexcept;
470 bi::constant_time_size<false>, bi::cache_last<true>,
471 bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
479template <
typename CharType>
482SEASTAR_MODULE_EXPORT_END
485#include "iostream-impl.hh"
Definition: sstring.hh:76
Definition: iostream.hh:238
Definition: iostream.hh:105
Definition: iostream.hh:164
Definition: iostream.hh:62
Definition: iostream.hh:70
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
value_type && get()
gets the value returned by the computation
Definition: future.hh:1299
Definition: iostream.hh:394
data_sink detach() &&
Definition: iostream-impl.hh:516
future close() noexcept
Definition: iostream-impl.hh:497
Definition: reactor.hh:152
Definition: scattered_message.hh:35
Definition: iostream.hh:229
Definition: iostream.hh:218
deleter release() noexcept
Definition: temporary_buffer.hh:203
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool batch_flushes
Try to merge flushes with each other.
Definition: iostream.hh:379
bool trim_to_size
Definition: iostream.hh:377
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:551
Definition: iostream.hh:215
Definition: iostream.hh:376