26#include <seastar/core/coroutine.hh>
27#include <seastar/core/do_with.hh>
28#include <seastar/core/loop.hh>
29#include <seastar/net/packet.hh>
30#include <seastar/util/assert.hh>
31#include <seastar/util/variant_utils.hh>
35inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
37 return do_with(uint64_t(n), [
this] (uint64_t& n) {
39 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
43 if (buffer.size() >= n) {
54template<
typename CharType>
57 return write(buf, strlen(buf));
60template<
typename CharType>
61template<
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
64 return write(
reinterpret_cast<const CharType *
>(s.c_str()), s.size());
67template<
typename CharType>
70 return write(s.c_str(), s.size());
73template<
typename CharType>
75 return write(std::move(msg).release());
78template<
typename CharType>
85 return _in_batch.value().get_future().then([
this, p = std::move(p)] ()
mutable {
86 return _fd.put(std::move(p));
89 return _fd.put(std::move(p));
94template <
typename CharType>
96output_stream<CharType>::zero_copy_split_and_put(net::packet p)
noexcept {
97 return repeat([
this, p = std::move(p)] ()
mutable {
98 if (p.len() < _size) {
100 _zc_bufs = std::move(p);
102 _zc_bufs = net::packet::make_null_packet();
104 return make_ready_future<stop_iteration>(stop_iteration::yes);
106 auto chunk = p.share(0, _size);
108 return zero_copy_put(std::move(chunk)).then([] {
109 return stop_iteration::no;
114template<
typename CharType>
116 static_assert(std::is_same_v<CharType, char>,
"packet works on char");
122 SEASTAR_ASSERT(!_zc_bufs);
127 _zc_bufs.append(std::move(p));
129 _zc_bufs = std::move(p);
132 if (_zc_bufs.len() >= _size) {
134 return zero_copy_split_and_put(std::move(_zc_bufs));
136 return zero_copy_put(std::move(_zc_bufs));
140 return make_ready_future<>();
146template<
typename CharType>
155template <
typename CharType>
159 size_t completed{0U};
160 while (completed < n) {
161 size_t avail = available();
163 auto now = std::min(n - completed, avail);
164 std::copy_n(_buf.get(),
now, out.get_write() + completed);
165 _buf.trim_front(
now);
167 if (completed == n) {
173 temporary_buffer<CharType> buf =
co_await _fd.
get();
174 if (buf.size() == 0) {
179 _buf = std::move(buf);
184template <
typename CharType>
185future<temporary_buffer<CharType>>
187 if (_buf.size() == n) {
189 return make_ready_future<tmp_buf>(std::move(_buf));
190 }
else if (_buf.size() > n) {
192 auto front = _buf.share(0, n);
194 return make_ready_future<tmp_buf>(std::move(front));
195 }
else if (_buf.size() == 0) {
197 return _fd.get().then([
this, n] (
auto buf)
mutable {
198 if (buf.size() == 0) {
200 return make_ready_future<tmp_buf>(std::move(buf));
202 _buf = std::move(buf);
203 return this->read_exactly(n);
207 return read_exactly_part(n);
211template <
typename CharType>
212template <
typename Consumer>
213requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
216 return repeat([consumer = std::move(consumer),
this] ()
mutable {
217 if (_buf.empty() && !_eof) {
218 return _fd.get().then([this] (tmp_buf buf) {
219 _buf = std::move(buf);
221 return make_ready_future<stop_iteration>(stop_iteration::no);
224 return consumer(std::move(_buf)).then([
this] (consumption_result_type result) {
225 return seastar::visit(result.get(), [
this] (
const continue_consuming&) {
230 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
231 }, [
this] (stop_consuming<CharType>& stop) {
233 this->_buf = std::move(stop.get_buffer());
234 return make_ready_future<stop_iteration>(stop_iteration::yes);
235 }, [
this] (
const skip_bytes& skip) {
236 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
238 this->_buf = std::move(buf);
240 return make_ready_future<stop_iteration>(stop_iteration::no);
247template <
typename CharType>
248template <
typename Consumer>
249requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
251input_stream<CharType>::consume(Consumer& consumer)
noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
255template <
typename CharType>
256future<temporary_buffer<CharType>>
261 return make_ready_future<tmp_buf>();
263 return _fd.get().then([
this, n] (
tmp_buf buf) {
265 _buf = std::move(buf);
266 return read_up_to(n);
269 }
else if (_buf.size() <= n) {
271 return make_ready_future<tmp_buf>(std::move(_buf));
275 auto front = _buf.share(0, n);
277 return make_ready_future<tmp_buf>(std::move(front));
279 return current_exception_as_future<tmp_buf>();
284template <
typename CharType>
289 return make_ready_future<tmp_buf>();
292 return _fd.get().then([
this] (
tmp_buf buf) {
294 return make_ready_future<tmp_buf>(std::move(buf));
297 return make_ready_future<tmp_buf>(std::move(_buf));
301template <
typename CharType>
304 auto skip_buf = std::min(n, _buf.size());
305 _buf.trim_front(skip_buf);
308 return make_ready_future<>();
311 _buf = std::move(buffer);
315template <
typename CharType>
319 throw std::logic_error(
"detach() called on a used input_stream");
322 return std::move(_fd);
326template <
typename CharType>
329 SEASTAR_ASSERT(_end == 0);
331 return repeat([
this, buf = std::move(buf)] ()
mutable {
332 if (buf.size() < _size) {
334 _buf = _fd.allocate_buffer(_size);
336 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
338 return make_ready_future<stop_iteration>(stop_iteration::yes);
340 auto chunk = buf.share(0, _size);
341 buf.trim_front(_size);
342 return put(std::move(chunk)).then([] {
343 return stop_iteration::no;
348template <
typename CharType>
351 if (__builtin_expect(!_buf || n > _size - _end,
false)) {
352 return slow_write(buf, n);
354 std::copy_n(buf, n, _buf.get_write() + _end);
356 return make_ready_future<>();
359template <
typename CharType>
363 SEASTAR_ASSERT(!_zc_bufs &&
"Mixing buffered writes and zero-copy writes not supported yet");
364 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
365 if (n >= bulk_threshold) {
367 auto now = _size - _end;
368 std::copy(buf, buf +
now, _buf.get_write() + _end);
374 return put(std::move(_buf)).then([
this, tmp = std::move(tmp)]()
mutable {
376 return split_and_put(std::move(tmp));
378 return put(std::move(tmp));
382 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
383 std::copy(buf, buf + n, tmp.get_write());
385 return split_and_put(std::move(tmp));
387 return put(std::move(tmp));
393 _buf = _fd.allocate_buffer(_size);
396 auto now = std::min(n, _size - _end);
397 std::copy(buf, buf +
now, _buf.get_write() + _end);
400 return make_ready_future<>();
402 temporary_buffer<char> next = _fd.allocate_buffer(_size);
403 std::copy(buf +
now, buf + n, next.get_write());
405 std::swap(next, _buf);
406 return put(std::move(next));
414void add_to_flush_poller(output_stream<char>& x)
noexcept;
417template <
typename CharType>
418future<> output_stream<CharType>::do_flush() noexcept {
422 return _fd.put(std::move(_buf)).then([
this] {
425 }
else if (_zc_bufs) {
426 return _fd.put(std::move(_zc_bufs)).then([
this] {
434template <
typename CharType>
436output_stream<CharType>::flush() noexcept {
437 if (!_batch_flushes) {
442 return make_exception_future<>(std::move(_ex));
446 internal::add_to_flush_poller(*
this);
451 return make_ready_future<>();
454template <
typename CharType>
456output_stream<CharType>::put(temporary_buffer<CharType> buf)
noexcept {
461 return _in_batch.value().get_future().then([
this, buf = std::move(buf)] ()
mutable {
462 return _fd.put(std::move(buf));
465 return _fd.put(std::move(buf));
469template <
typename CharType>
475 _in_batch.value().set_value();
476 _in_batch = std::nullopt;
484 (void)do_flush().then_wrapped([
this] (
future<> f) {
488 _ex = std::current_exception();
489 _fd.on_batch_flush_error();
496template <
typename CharType>
499 return flush().finally([
this] {
501 return _in_batch.value().get_future();
508 std::rethrow_exception(_ex);
515template <
typename CharType>
519 throw std::logic_error(
"detach() called on a used output_stream");
522 return std::move(_fd);
528template <
typename CharType>
529struct stream_copy_consumer {
536 future<consumption_result_type> operator()(temporary_buffer<CharType> data) {
538 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
540 return _os.
write(data.get(), data.size()).
then([] {
541 return make_ready_future<consumption_result_type>(continue_consuming());
549extern template struct internal::stream_copy_consumer<char>;
551template <
typename CharType>
553 return in.consume(internal::stream_copy_consumer<CharType>(out));
556extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
Definition: sstring.hh:76
Definition: iostream.hh:242
Definition: iostream.hh:168
Definition: iostream.hh:74
A representation of a possibly not-yet-computed value.
Definition: future.hh:1195
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1356
value_type && get()
gets the value returned by the computation
Definition: future.hh:1297
Definition: iostream.hh:419
future write(const char_type *buf, size_t n) noexcept
Writes n bytes from the memory pointed by buf into the buffer.
Definition: iostream-impl.hh:350
Definition: scattered_message.hh:35
Definition: temporary_buffer.hh:68
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:153
CharType * get_write() noexcept
Definition: temporary_buffer.hh:129
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1867
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1886
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:238
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:552