26#include <seastar/core/coroutine.hh>
27#include <seastar/core/do_with.hh>
28#include <seastar/core/loop.hh>
29#include <seastar/net/packet.hh>
30#include <seastar/util/variant_utils.hh>
34inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
36 return do_with(uint64_t(n), [
this] (uint64_t& n) {
38 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
42 if (buffer.size() >= n) {
53template<
typename CharType>
55future<> output_stream<CharType>::write(
const char_type* buf)
noexcept {
56 return write(buf, strlen(buf));
59template<
typename CharType>
60template<
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
62future<> output_stream<CharType>::write(
const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s)
noexcept {
63 return write(
reinterpret_cast<const CharType *
>(s.c_str()), s.size());
66template<
typename CharType>
68future<> output_stream<CharType>::write(
const std::basic_string<CharType>& s)
noexcept {
69 return write(s.c_str(), s.size());
72template<
typename CharType>
73future<> output_stream<CharType>::write(scattered_message<CharType> msg)
noexcept {
74 return write(std::move(msg).release());
77template<
typename CharType>
79output_stream<CharType>::zero_copy_put(net::packet p)
noexcept {
84 return _in_batch.value().get_future().then([
this, p = std::move(p)] ()
mutable {
85 return _fd.put(std::move(p));
88 return _fd.put(std::move(p));
93template <
typename CharType>
95output_stream<CharType>::zero_copy_split_and_put(net::packet p)
noexcept {
96 return repeat([
this, p = std::move(p)] ()
mutable {
97 if (p.len() < _size) {
99 _zc_bufs = std::move(p);
101 _zc_bufs = net::packet::make_null_packet();
103 return make_ready_future<stop_iteration>(stop_iteration::yes);
105 auto chunk = p.share(0, _size);
107 return zero_copy_put(std::move(chunk)).then([] {
108 return stop_iteration::no;
113template<
typename CharType>
114future<> output_stream<CharType>::write(net::packet p)
noexcept {
115 static_assert(std::is_same_v<CharType, char>,
"packet works on char");
118 assert(!_end &&
"Mixing buffered writes and zero-copy writes not supported yet");
121 _zc_bufs.append(std::move(p));
123 _zc_bufs = std::move(p);
126 if (_zc_bufs.len() >= _size) {
128 return zero_copy_split_and_put(std::move(_zc_bufs));
130 return zero_copy_put(std::move(_zc_bufs));
134 return make_ready_future<>();
140template<
typename CharType>
141future<> output_stream<CharType>::write(temporary_buffer<CharType> p)
noexcept {
144 return make_ready_future<>();
146 assert(!_end &&
"Mixing buffered writes and zero-copy writes not supported yet");
147 return write(net::packet(std::move(p)));
153template <
typename CharType>
154future<temporary_buffer<CharType>>
155input_stream<CharType>::read_exactly_part(
size_t n)
noexcept {
156 temporary_buffer<CharType> out(n);
157 size_t completed{0U};
158 while (completed < n) {
159 size_t avail = available();
161 auto now = std::min(n - completed, avail);
162 std::copy_n(_buf.get(),
now, out.get_write() + completed);
163 _buf.trim_front(
now);
165 if (completed == n) {
171 temporary_buffer<CharType> buf =
co_await _fd.
get();
172 if (buf.size() == 0) {
177 _buf = std::move(buf);
182template <
typename CharType>
183future<temporary_buffer<CharType>>
185 if (_buf.size() == n) {
187 return make_ready_future<tmp_buf>(std::move(_buf));
188 }
else if (_buf.size() > n) {
190 auto front = _buf.share(0, n);
192 return make_ready_future<tmp_buf>(std::move(front));
193 }
else if (_buf.size() == 0) {
195 return _fd.get().then([
this, n] (
auto buf)
mutable {
196 if (buf.size() == 0) {
198 return make_ready_future<tmp_buf>(std::move(buf));
200 _buf = std::move(buf);
201 return this->read_exactly(n);
205 return read_exactly_part(n);
209template <
typename CharType>
210template <
typename Consumer>
211requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
214 return repeat([consumer = std::move(consumer),
this] ()
mutable {
215 if (_buf.empty() && !_eof) {
216 return _fd.get().then([this] (tmp_buf buf) {
217 _buf = std::move(buf);
219 return make_ready_future<stop_iteration>(stop_iteration::no);
222 return consumer(std::move(_buf)).then([
this] (consumption_result_type result) {
223 return seastar::visit(result.get(), [
this] (
const continue_consuming&) {
228 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
229 }, [
this] (stop_consuming<CharType>& stop) {
231 this->_buf = std::move(stop.get_buffer());
232 return make_ready_future<stop_iteration>(stop_iteration::yes);
233 }, [
this] (
const skip_bytes& skip) {
234 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
236 this->_buf = std::move(buf);
238 return make_ready_future<stop_iteration>(stop_iteration::no);
245template <
typename CharType>
246template <
typename Consumer>
247requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
249input_stream<CharType>::consume(Consumer& consumer)
noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
253template <
typename CharType>
254future<temporary_buffer<CharType>>
259 return make_ready_future<tmp_buf>();
261 return _fd.get().then([
this, n] (
tmp_buf buf) {
263 _buf = std::move(buf);
264 return read_up_to(n);
267 }
else if (_buf.size() <= n) {
269 return make_ready_future<tmp_buf>(std::move(_buf));
273 auto front = _buf.share(0, n);
275 return make_ready_future<tmp_buf>(std::move(front));
277 return current_exception_as_future<tmp_buf>();
282template <
typename CharType>
287 return make_ready_future<tmp_buf>();
290 return _fd.get().then([
this] (
tmp_buf buf) {
292 return make_ready_future<tmp_buf>(std::move(buf));
295 return make_ready_future<tmp_buf>(std::move(_buf));
299template <
typename CharType>
302 auto skip_buf = std::min(n, _buf.size());
303 _buf.trim_front(skip_buf);
306 return make_ready_future<>();
309 _buf = std::move(buffer);
313template <
typename CharType>
317 throw std::logic_error(
"detach() called on a used input_stream");
320 return std::move(_fd);
324template <
typename CharType>
329 return repeat([
this, buf = std::move(buf)] ()
mutable {
330 if (buf.size() < _size) {
332 _buf = _fd.allocate_buffer(_size);
334 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
336 return make_ready_future<stop_iteration>(stop_iteration::yes);
338 auto chunk = buf.share(0, _size);
339 buf.trim_front(_size);
340 return put(std::move(chunk)).then([] {
341 return stop_iteration::no;
346template <
typename CharType>
348output_stream<CharType>::write(
const char_type* buf,
size_t n)
noexcept {
349 if (__builtin_expect(!_buf || n > _size - _end,
false)) {
350 return slow_write(buf, n);
352 std::copy_n(buf, n, _buf.get_write() + _end);
354 return make_ready_future<>();
357template <
typename CharType>
361 assert(!_zc_bufs &&
"Mixing buffered writes and zero-copy writes not supported yet");
362 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
363 if (n >= bulk_threshold) {
365 auto now = _size - _end;
366 std::copy(buf, buf +
now, _buf.get_write() + _end);
372 return put(std::move(_buf)).then([
this, tmp = std::move(tmp)]()
mutable {
374 return split_and_put(std::move(tmp));
376 return put(std::move(tmp));
380 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
381 std::copy(buf, buf + n, tmp.get_write());
383 return split_and_put(std::move(tmp));
385 return put(std::move(tmp));
391 _buf = _fd.allocate_buffer(_size);
394 auto now = std::min(n, _size - _end);
395 std::copy(buf, buf +
now, _buf.get_write() + _end);
398 return make_ready_future<>();
400 temporary_buffer<char> next = _fd.allocate_buffer(_size);
401 std::copy(buf +
now, buf + n, next.get_write());
403 std::swap(next, _buf);
404 return put(std::move(next));
412void add_to_flush_poller(output_stream<char>& x)
noexcept;
415template <
typename CharType>
416future<> output_stream<CharType>::do_flush() noexcept {
420 return _fd.put(std::move(_buf)).then([
this] {
423 }
else if (_zc_bufs) {
424 return _fd.put(std::move(_zc_bufs)).then([
this] {
432template <
typename CharType>
434output_stream<CharType>::flush() noexcept {
435 if (!_batch_flushes) {
440 return make_exception_future<>(std::move(_ex));
444 internal::add_to_flush_poller(*
this);
449 return make_ready_future<>();
452template <
typename CharType>
459 return _in_batch.value().get_future().then([
this, buf = std::move(buf)] ()
mutable {
460 return _fd.put(std::move(buf));
463 return _fd.put(std::move(buf));
467template <
typename CharType>
473 _in_batch.value().set_value();
474 _in_batch = std::nullopt;
482 (void)do_flush().then_wrapped([
this] (
future<> f) {
486 _ex = std::current_exception();
487 _fd.on_batch_flush_error();
494template <
typename CharType>
497 return flush().finally([
this] {
499 return _in_batch.value().get_future();
506 std::rethrow_exception(_ex);
513template <
typename CharType>
517 throw std::logic_error(
"detach() called on a used output_stream");
520 return std::move(_fd);
526template <
typename CharType>
527struct stream_copy_consumer {
534 future<consumption_result_type> operator()(temporary_buffer<CharType> data) {
536 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
538 return _os.write(data.get(), data.size()).
then([] {
539 return make_ready_future<consumption_result_type>(continue_consuming());
547extern template struct internal::stream_copy_consumer<char>;
549template <
typename CharType>
551 return in.consume(internal::stream_copy_consumer<CharType>(out));
554extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
Definition: iostream.hh:237
Definition: iostream.hh:163
Definition: iostream.hh:69
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1425
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: iostream.hh:393
Definition: temporary_buffer.hh:67
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1962
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550