26#include <seastar/core/coroutine.hh>
27#include <seastar/core/do_with.hh>
28#include <seastar/core/loop.hh>
29#include <seastar/net/packet.hh>
30#include <seastar/util/assert.hh>
31#include <seastar/util/variant_utils.hh>
35inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
37 return do_with(uint64_t(n), [
this] (uint64_t& n) {
39 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
43 if (buffer.size() >= n) {
54template<
typename CharType>
56future<> output_stream<CharType>::write(
const char_type* buf)
noexcept {
57 return write(buf, strlen(buf));
60template<
typename CharType>
61template<
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
63future<> output_stream<CharType>::write(
const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s)
noexcept {
64 return write(
reinterpret_cast<const CharType *
>(s.c_str()), s.size());
67template<
typename CharType>
69future<> output_stream<CharType>::write(
const std::basic_string<CharType>& s)
noexcept {
70 return write(s.c_str(), s.size());
73template<
typename CharType>
74future<> output_stream<CharType>::write(scattered_message<CharType> msg)
noexcept {
75 return write(std::move(msg).release());
78template<
typename CharType>
80output_stream<CharType>::zero_copy_put(net::packet p)
noexcept {
85 return _in_batch.value().get_future().then([
this, p = std::move(p)] ()
mutable {
86 return _fd.put(std::move(p));
89 return _fd.put(std::move(p));
94template <
typename CharType>
96output_stream<CharType>::zero_copy_split_and_put(net::packet p)
noexcept {
97 return repeat([
this, p = std::move(p)] ()
mutable {
98 if (p.len() < _size) {
100 _zc_bufs = std::move(p);
102 _zc_bufs = net::packet::make_null_packet();
104 return make_ready_future<stop_iteration>(stop_iteration::yes);
106 auto chunk = p.share(0, _size);
108 return zero_copy_put(std::move(chunk)).then([] {
109 return stop_iteration::no;
114template<
typename CharType>
115future<> output_stream<CharType>::write(net::packet p)
noexcept {
116 static_assert(std::is_same_v<CharType, char>,
"packet works on char");
119 SEASTAR_ASSERT(!_end &&
"Mixing buffered writes and zero-copy writes not supported yet");
122 _zc_bufs.append(std::move(p));
124 _zc_bufs = std::move(p);
127 if (_zc_bufs.len() >= _size) {
129 return zero_copy_split_and_put(std::move(_zc_bufs));
131 return zero_copy_put(std::move(_zc_bufs));
135 return make_ready_future<>();
141template<
typename CharType>
142future<> output_stream<CharType>::write(temporary_buffer<CharType> p)
noexcept {
145 return make_ready_future<>();
147 SEASTAR_ASSERT(!_end &&
"Mixing buffered writes and zero-copy writes not supported yet");
148 return write(net::packet(std::move(p)));
154template <
typename CharType>
155future<temporary_buffer<CharType>>
156input_stream<CharType>::read_exactly_part(
size_t n)
noexcept {
157 temporary_buffer<CharType> out(n);
158 size_t completed{0U};
159 while (completed < n) {
160 size_t avail = available();
162 auto now = std::min(n - completed, avail);
163 std::copy_n(_buf.get(),
now, out.get_write() + completed);
164 _buf.trim_front(
now);
166 if (completed == n) {
172 temporary_buffer<CharType> buf =
co_await _fd.
get();
173 if (buf.size() == 0) {
178 _buf = std::move(buf);
183template <
typename CharType>
184future<temporary_buffer<CharType>>
186 if (_buf.size() == n) {
188 return make_ready_future<tmp_buf>(std::move(_buf));
189 }
else if (_buf.size() > n) {
191 auto front = _buf.share(0, n);
193 return make_ready_future<tmp_buf>(std::move(front));
194 }
else if (_buf.size() == 0) {
196 return _fd.get().then([
this, n] (
auto buf)
mutable {
197 if (buf.size() == 0) {
199 return make_ready_future<tmp_buf>(std::move(buf));
201 _buf = std::move(buf);
202 return this->read_exactly(n);
206 return read_exactly_part(n);
210template <
typename CharType>
211template <
typename Consumer>
212requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
215 return repeat([consumer = std::move(consumer),
this] ()
mutable {
216 if (_buf.empty() && !_eof) {
217 return _fd.get().then([this] (tmp_buf buf) {
218 _buf = std::move(buf);
220 return make_ready_future<stop_iteration>(stop_iteration::no);
223 return consumer(std::move(_buf)).then([
this] (consumption_result_type result) {
224 return seastar::visit(result.get(), [
this] (
const continue_consuming&) {
229 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
230 }, [
this] (stop_consuming<CharType>& stop) {
232 this->_buf = std::move(stop.get_buffer());
233 return make_ready_future<stop_iteration>(stop_iteration::yes);
234 }, [
this] (
const skip_bytes& skip) {
235 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
237 this->_buf = std::move(buf);
239 return make_ready_future<stop_iteration>(stop_iteration::no);
246template <
typename CharType>
247template <
typename Consumer>
248requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
250input_stream<CharType>::consume(Consumer& consumer)
noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
254template <
typename CharType>
255future<temporary_buffer<CharType>>
260 return make_ready_future<tmp_buf>();
262 return _fd.get().then([
this, n] (
tmp_buf buf) {
264 _buf = std::move(buf);
265 return read_up_to(n);
268 }
else if (_buf.size() <= n) {
270 return make_ready_future<tmp_buf>(std::move(_buf));
274 auto front = _buf.share(0, n);
276 return make_ready_future<tmp_buf>(std::move(front));
278 return current_exception_as_future<tmp_buf>();
283template <
typename CharType>
288 return make_ready_future<tmp_buf>();
291 return _fd.get().then([
this] (
tmp_buf buf) {
293 return make_ready_future<tmp_buf>(std::move(buf));
296 return make_ready_future<tmp_buf>(std::move(_buf));
300template <
typename CharType>
303 auto skip_buf = std::min(n, _buf.size());
304 _buf.trim_front(skip_buf);
307 return make_ready_future<>();
310 _buf = std::move(buffer);
314template <
typename CharType>
318 throw std::logic_error(
"detach() called on a used input_stream");
321 return std::move(_fd);
325template <
typename CharType>
328 SEASTAR_ASSERT(_end == 0);
330 return repeat([
this, buf = std::move(buf)] ()
mutable {
331 if (buf.size() < _size) {
333 _buf = _fd.allocate_buffer(_size);
335 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
337 return make_ready_future<stop_iteration>(stop_iteration::yes);
339 auto chunk = buf.share(0, _size);
340 buf.trim_front(_size);
341 return put(std::move(chunk)).then([] {
342 return stop_iteration::no;
347template <
typename CharType>
349output_stream<CharType>::write(
const char_type* buf,
size_t n)
noexcept {
350 if (__builtin_expect(!_buf || n > _size - _end,
false)) {
351 return slow_write(buf, n);
353 std::copy_n(buf, n, _buf.get_write() + _end);
355 return make_ready_future<>();
358template <
typename CharType>
362 SEASTAR_ASSERT(!_zc_bufs &&
"Mixing buffered writes and zero-copy writes not supported yet");
363 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
364 if (n >= bulk_threshold) {
366 auto now = _size - _end;
367 std::copy(buf, buf +
now, _buf.get_write() + _end);
373 return put(std::move(_buf)).then([
this, tmp = std::move(tmp)]()
mutable {
375 return split_and_put(std::move(tmp));
377 return put(std::move(tmp));
381 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
382 std::copy(buf, buf + n, tmp.get_write());
384 return split_and_put(std::move(tmp));
386 return put(std::move(tmp));
392 _buf = _fd.allocate_buffer(_size);
395 auto now = std::min(n, _size - _end);
396 std::copy(buf, buf +
now, _buf.get_write() + _end);
399 return make_ready_future<>();
401 temporary_buffer<char> next = _fd.allocate_buffer(_size);
402 std::copy(buf +
now, buf + n, next.get_write());
404 std::swap(next, _buf);
405 return put(std::move(next));
413void add_to_flush_poller(output_stream<char>& x)
noexcept;
416template <
typename CharType>
417future<> output_stream<CharType>::do_flush() noexcept {
421 return _fd.put(std::move(_buf)).then([
this] {
424 }
else if (_zc_bufs) {
425 return _fd.put(std::move(_zc_bufs)).then([
this] {
433template <
typename CharType>
435output_stream<CharType>::flush() noexcept {
436 if (!_batch_flushes) {
441 return make_exception_future<>(std::move(_ex));
445 internal::add_to_flush_poller(*
this);
450 return make_ready_future<>();
453template <
typename CharType>
460 return _in_batch.value().get_future().then([
this, buf = std::move(buf)] ()
mutable {
461 return _fd.put(std::move(buf));
464 return _fd.put(std::move(buf));
468template <
typename CharType>
474 _in_batch.value().set_value();
475 _in_batch = std::nullopt;
483 (void)do_flush().then_wrapped([
this] (
future<> f) {
487 _ex = std::current_exception();
488 _fd.on_batch_flush_error();
495template <
typename CharType>
498 return flush().finally([
this] {
500 return _in_batch.value().get_future();
507 std::rethrow_exception(_ex);
514template <
typename CharType>
518 throw std::logic_error(
"detach() called on a used output_stream");
521 return std::move(_fd);
527template <
typename CharType>
528struct stream_copy_consumer {
535 future<consumption_result_type> operator()(temporary_buffer<CharType> data) {
537 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
539 return _os.write(data.get(), data.size()).
then([] {
540 return make_ready_future<consumption_result_type>(continue_consuming());
548extern template struct internal::stream_copy_consumer<char>;
550template <
typename CharType>
552 return in.consume(internal::stream_copy_consumer<CharType>(out));
555extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
Definition: iostream.hh:238
Definition: iostream.hh:164
Definition: iostream.hh:70
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1358
value_type && get()
gets the value returned by the computation
Definition: future.hh:1299
Definition: iostream.hh:394
Definition: temporary_buffer.hh:67
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1869
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:238
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:551