38 #include <seastar/core/future.hh>
39 #include <seastar/core/temporary_buffer.hh>
40 #include <seastar/core/scattered_message.hh>
41 #include <seastar/util/std-compat.hh>
// Forward declaration of net::packet: only the name is introduced here,
// the class definition itself is not part of this span.
45 namespace net {
class packet; }
// Overridable close hook. This version performs no work and returns an
// already-resolved future via make_ready_future<>().
52 virtual future<> close() {
return make_ready_future<>(); }
// Owning pointer to the implementation object that calls are forwarded to.
56 std::unique_ptr<data_source_impl> _dsi;
// Takes ownership of `dsi` by moving it into _dsi. Declared explicit, so a
// unique_ptr is never implicitly converted into a data_source.
61 explicit data_source(std::unique_ptr<data_source_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
// Forwards to the implementation's close() and returns the future it yields.
// NOTE(review): _dsi is dereferenced without a null check, so this must only
// be called on an object that actually holds an implementation.
66 future<> close() {
return _dsi->close(); }
78 p.reserve(data.size());
79 for (
auto& buf : data) {
82 return put(std::move(p));
88 return make_ready_future<>();
// Owning pointer to the implementation object that calls are forwarded to.
94 std::unique_ptr<data_sink_impl> _dsi;
// Takes ownership of `dsi` by moving it into _dsi. Declared explicit, so a
// unique_ptr is never implicitly converted into a data_sink.
97 explicit data_sink(std::unique_ptr<data_sink_impl> dsi) noexcept : _dsi(std::move(dsi)) {}
101 return _dsi->allocate_buffer(size);
104 return _dsi->put(std::move(data));
107 return _dsi->put(std::move(data));
110 return _dsi->put(std::move(p));
113 return _dsi->flush();
// Forwards to the implementation's close() and returns the future it yields.
// NOTE(review): _dsi is dereferenced without a null check, so this must only
// be called on an object that actually holds an implementation.
115 future<> close() {
return _dsi->close(); }
120 template <
typename CharType>
// Mutable access to the stored buffer; the returned reference aliases _buf.
126 tmp_buf& get_buffer() {
return _buf; }
// Read-only access to the same stored buffer, for const objects.
127 const tmp_buf& get_buffer()
const {
return _buf; }
// Stores `v` in _value (presumably the number of bytes to skip — confirm
// against the enclosing class). Explicit, so a bare integer is never
// implicitly converted into a skip_bytes.
134 explicit skip_bytes(uint64_t v) : _value(v) {}
// Returns the stored _value unchanged.
135 uint64_t get_value()
const {
return _value; }
140 template <
typename CharType>
// A value that holds exactly one of three alternatives: continue_consuming,
// stop_consuming_type, or skip_bytes.
144 using consumption_variant = std::variant<continue_consuming, stop_consuming_type, skip_bytes>;
// Mutable access to the stored variant; the returned reference aliases _result.
157 consumption_variant& get() {
return _result; }
// Read-only access to the same stored variant, for const objects.
158 const consumption_variant& get()
const {
return _result; }
// The variant handed out by both get() overloads.
161 consumption_variant _result;
184 template <typename Consumer, typename CharType>
185 concept InputStreamConsumer = requires (Consumer c) {
189 template <
typename Consumer,
typename CharType>
190 concept ObsoleteInputStreamConsumer = requires (Consumer c) {
191 { c(temporary_buffer<CharType>{}) } -> std::same_as<future<std::optional<temporary_buffer<CharType>>>>;
199 template <typename CharType>
// Compile-time guard: instantiation fails unless CharType is exactly one
// byte wide.
201 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
// Reports the current size of _buf. Does not modify the object.
207 size_t available()
const {
return _buf.
size(); }
// Replaces _buf with a value-initialized buffer, discarding its prior contents.
209 void reset() { _buf = {}; }
214 using unconsumed_remainder = std::optional<tmp_buf>;
215 using char_type = CharType;
231 template <
typename Consumer>
232 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
234 template <
typename Consumer>
235 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
// Returns the current value of the _eof flag. Does not modify the object.
237 bool eof()
const {
return _eof; }
286 template <
typename CharType>
// Compile-time guard: instantiation fails unless CharType is exactly one
// byte wide.
288 static_assert(
sizeof(CharType) == 1,
"must buffer stream of bytes");
// Starts out as the null packet produced by net::packet::make_null_packet().
// NOTE(review): name suggests zero-copy buffers queued for output — confirm.
291 net::packet _zc_bufs = net::packet::make_null_packet();
// Defaults to false; the constructor's initializer list overwrites it from
// its trim_to_size argument.
295 bool _trim_to_size =
false;
// Defaults to false; the constructor's initializer list overwrites it from
// its batch_flushes argument.
296 bool _batch_flushes =
false;
// Empty by default. The destructor asserts that this is empty, so any
// promise stored here has to be cleared before the stream is destroyed.
297 std::optional<promise<>> _in_batch;
// Defaults to false. NOTE(review): presumably marks a flush in progress —
// the code that sets it is outside this span.
299 bool _flushing =
false;
// Null by default. NOTE(review): presumably holds a deferred error — the
// code that sets and consumes it is outside this span.
300 std::exception_ptr _ex;
// Returns _end - _begin. The subtraction is not guarded against underflow.
// NOTE(review): presumably relies on _begin <= _end — confirm.
302 size_t available()
const {
return _end - _begin; }
// Returns _size - _begin, i.e. measured against _size rather than _end.
// NOTE(review): presumably relies on _begin <= _size — confirm.
303 size_t possibly_available()
const {
return _size - _begin; }
// Declaration only; the definition is not in this span (this file includes
// "iostream-impl.hh" at its end, which presumably provides it).
// Takes a pointer to `n` CharType elements and returns a future<>.
310 future<> slow_write(
const CharType* buf,
size_t n);
312 using char_type = CharType;
315 : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
// Destructor. Asserts that _in_batch is empty; a failure here reports
// "Was this stream properly closed?". Nothing else is done in the body.
// NOTE(review): if `assert` is the standard macro, this check disappears in
// NDEBUG builds.
318 ~
output_stream() { assert(!_in_batch &&
"Was this stream properly closed?"); }
// Declaration only: write overload taking a pointer to `n` char_type
// elements; returns a future<>.
319 future<> write(
const char_type* buf,
size_t n);
// Declaration only: write overload taking just a pointer, with no length.
// NOTE(review): presumably expects a NUL-terminated string — confirm against
// the definition.
320 future<> write(
const char_type* buf);
322 template <
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
// Declaration only: write overload taking a std::basic_string<char_type> by
// const reference; returns a future<>.
324 future<> write(
const std::basic_string<char_type>& s);
355 template <
typename CharType>
360 #include "iostream-impl.hh"