26 #include <seastar/core/do_with.hh>
27 #include <seastar/core/loop.hh>
28 #include <seastar/net/packet.hh>
29 #include <seastar/util/variant_utils.hh>
// data_source_impl::skip(n): passes a uint64_t copy of n to do_with() together
// with a lambda that receives it by reference. Inside, a continuation chained
// onto get() receives each temporary_buffer<char> and returns a
// std::optional<temporary_buffer<char>>.
// NOTE(review): several original lines are missing from this listing (the
// construct wrapping get() and both branch bodies), so the exact skipping
// logic cannot be confirmed from here — check the full file.
33 inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
35 return do_with(uint64_t(n), [
this] (uint64_t& n) {
37 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
// Branch on whether the buffer just received is at least n bytes long.
41 if (buffer.size() >= n) {
// Writes a NUL-terminated string: the length is measured with strlen() and the
// call is forwarded to the (pointer, length) overload of write().
52 template<
typename CharType>
54 future<> output_stream<CharType>::write(
const char_type* buf) noexcept {
55 return write(buf, strlen(buf));
// Writes the contents of a basic_sstring: forwards s.c_str() and s.size() to
// the (pointer, length) overload of write().
// The string's character type (StringChar) is a separate template parameter
// from the stream's CharType, so the pointer is reinterpret_cast to
// const CharType* before forwarding.
58 template<
typename CharType>
59 template<
typename StringChar,
typename SizeType, SizeType MaxSize,
bool NulTerminate>
61 future<> output_stream<CharType>::write(
const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
62 return write(
reinterpret_cast<const CharType *
>(s.c_str()), s.size());
// Writes the contents of a std::basic_string<CharType>: forwards s.c_str() and
// s.size() to the (pointer, length) overload of write().
65 template<
typename CharType>
67 future<> output_stream<CharType>::write(
const std::basic_string<CharType>& s) noexcept {
68 return write(s.c_str(), s.size());
// Writes a scattered_message: calls release() on the moved-from message and
// forwards whatever it returns to another write() overload.
// NOTE(review): presumably release() yields a net::packet and this lands in
// the packet overload — confirm against scattered_message's declaration.
71 template<
typename CharType>
72 future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
73 return write(std::move(msg).release());
// output_stream::zero_copy_put(p): hands packet p to _fd.put().
// Two paths are visible: one defers the put until the future obtained from
// _in_batch.value().get_future() resolves (p is moved into the continuation),
// the other calls _fd.put() immediately.
// NOTE(review): the return-type line and the condition choosing between the
// two paths are missing from this listing — confirm against the full file.
76 template<
typename CharType>
78 output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
83 return _in_batch.value().get_future().
then([
this, p = std::move(p)] ()
mutable {
84 return _fd.put(std::move(p));
87 return _fd.put(std::move(p));
// output_stream::zero_copy_split_and_put(p): runs a repeat() loop whose lambda
// owns packet p. While p holds at least _size bytes, a chunk covering
// [0, _size) is shared out of p and sent through zero_copy_put(), and the loop
// continues. Once p is shorter than _size, _zc_bufs is assigned (either p
// itself or a null packet) and the loop stops.
// NOTE(review): the return-type line, the condition choosing between the two
// _zc_bufs assignments, and whatever advances p past the sent chunk are
// missing from this listing — confirm against the full file.
92 template <
typename CharType>
94 output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
95 return repeat([
this, p = std::move(p)] ()
mutable {
// Tail case: what is left is smaller than one full chunk.
96 if (p.len() < _size) {
98 _zc_bufs = std::move(p);
100 _zc_bufs = net::packet::make_null_packet();
102 return make_ready_future<stop_iteration>(stop_iteration::yes);
// Full-chunk case: send the first _size bytes and iterate again.
104 auto chunk = p.share(0, _size);
106 return zero_copy_put(std::move(chunk)).then([] {
107 return stop_iteration::no;
// Zero-copy write of a net::packet. Only valid when CharType is char
// (enforced by the static_assert) and when _end is zero (asserted; the
// message says buffered and zero-copy writes cannot be mixed).
// The packet is accumulated into _zc_bufs, either appended or assigned.
// When _zc_bufs reaches _size bytes it is sent via
// zero_copy_split_and_put() or zero_copy_put(); otherwise a ready future is
// returned.
// NOTE(review): the conditions selecting append vs. assign and
// split-and-put vs. put are missing from this listing — confirm against the
// full file.
112 template<
typename CharType>
113 future<> output_stream<CharType>::write(net::packet p) noexcept {
114 static_assert(std::is_same_v<CharType, char>,
"packet works on char");
117 assert(!_end &&
"Mixing buffered writes and zero-copy writes not supported yet");
120 _zc_bufs.append(std::move(p));
122 _zc_bufs = std::move(p);
// Enough data accumulated to fill at least one _size-sized chunk.
125 if (_zc_bufs.len() >= _size) {
127 return zero_copy_split_and_put(std::move(_zc_bufs));
129 return zero_copy_put(std::move(_zc_bufs));
133 return make_ready_future<>();
// Zero-copy write of a temporary_buffer: the buffer is wrapped in a
// net::packet and forwarded to the packet overload of write(). Asserts that
// _end is zero (buffered and zero-copy writes cannot be mixed).
// NOTE(review): the condition guarding the early ready-future return is
// missing from this listing (presumably an empty-buffer check) — confirm
// against the full file.
139 template<
typename CharType>
140 future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
143 return make_ready_future<>();
145 assert(!_end &&
"Mixing buffered writes and zero-copy writes not supported yet");
146 return write(net::packet(std::move(p)));
// input_stream::read_exactly_part(n, out, completed): helper that fills `out`
// up to n bytes, where `completed` is the number of bytes already written.
// It copies min(n - completed, available()) bytes from _buf into out at offset
// `completed` and trims them from the front of _buf. If completed == n, `out`
// is returned in a ready future. Otherwise the next buffer is fetched with
// _fd.get(): an empty buffer ends the read and returns `out`; a non-empty one
// becomes _buf and the function calls itself again.
// NOTE(review): some original lines are missing from this listing (e.g. any
// update of `completed` after the copy, and whatever precedes the return in
// the empty-buffer branch) — confirm against the full file.
152 template <
typename CharType>
153 future<temporary_buffer<CharType>>
154 input_stream<CharType>::read_exactly_part(
size_t n, tmp_buf out,
size_t completed) noexcept {
// Number of bytes that can be satisfied from the current buffer.
156 auto now = std::min(n - completed, available());
157 std::copy(_buf.get(), _buf.get() +
now, out.get_write() + completed);
158 _buf.trim_front(
now);
161 if (completed == n) {
162 return make_ready_future<tmp_buf>(std::move(out));
// Need more data: pull the next buffer from the underlying source.
166 return _fd.get().then([
this, n, out = std::move(out), completed] (
auto buf)
mutable {
167 if (buf.size() == 0) {
170 return make_ready_future<tmp_buf>(std::move(out));
172 _buf = std::move(buf);
173 return this->read_exactly_part(n, std::move(out), completed);
// Reads n bytes, returning a future<temporary_buffer<CharType>>.
// NOTE(review): the signature line is missing from this listing; the name
// read_exactly is inferred from the recursive call below — confirm.
// Visible cases:
//  - _buf holds exactly n bytes: _buf is moved out as the result.
//  - _buf holds more than n bytes: the first n bytes are shared out as the
//    result.
//  - _buf is empty: a buffer is fetched with _fd.get(); an empty buffer is
//    returned as-is, otherwise it becomes _buf and the read is retried.
//  - otherwise: the work is delegated to read_exactly_part(n, b, 0).
// A final return converts the current exception into a failed future.
// NOTE(review): the lines that create `b`, any trimming of _buf after the
// share(), and the try/catch structure are not visible here — confirm against
// the full file.
177 template <
typename CharType>
178 future<temporary_buffer<CharType>>
180 if (_buf.size() == n) {
182 return make_ready_future<tmp_buf>(std::move(_buf));
183 }
else if (_buf.size() > n) {
185 auto front = _buf.share(0, n);
187 return make_ready_future<tmp_buf>(std::move(front));
188 }
else if (_buf.size() == 0) {
190 return _fd.get().then([
this, n] (
auto buf)
mutable {
191 if (buf.size() == 0) {
193 return make_ready_future<tmp_buf>(std::move(buf));
195 _buf = std::move(buf);
196 return this->read_exactly(n);
202 return read_exactly_part(n, std::move(b), 0);
204 return current_exception_as_future<tmp_buf>();
// Drives a consumer over the stream with a repeat() loop; the consumer is
// moved into the loop lambda.
// NOTE(review): the signature lines are missing from this listing; presumably
// this is input_stream<CharType>::consume(Consumer&&) — confirm.
// Each iteration:
//  - If _buf is empty and _eof is not set, the next buffer is fetched from
//    _fd.get() and stored in _buf, and the loop continues.
//  - Otherwise _buf is handed to the consumer, and its result is dispatched
//    with seastar::visit():
//      continue_consuming -> stop only if _eof is set;
//      stop_consuming     -> keep the unconsumed buffer in _buf and stop;
//      skip_bytes         -> call _fd.skip() with the requested count, store
//                            the buffer it yields in _buf, and continue.
// NOTE(review): some original lines are missing (e.g. between storing the
// fetched buffer and continuing, and inside the continue_consuming case) —
// confirm against the full file.
209 template <
typename CharType>
210 template <
typename Consumer>
211 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
214 return repeat([consumer = std::move(consumer),
this] ()
mutable {
// Refill path: nothing buffered and end-of-stream not yet flagged.
215 if (_buf.empty() && !_eof) {
216 return _fd.get().then([this] (tmp_buf buf) {
217 _buf = std::move(buf);
219 return make_ready_future<stop_iteration>(stop_iteration::no);
// Consume path: pass the buffered data to the consumer and act on its verdict.
222 return consumer(std::move(_buf)).then([
this] (consumption_result_type result) {
223 return seastar::visit(result.get(), [
this] (
const continue_consuming&) {
228 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
229 }, [
this] (stop_consuming<CharType>& stop) {
231 this->_buf = std::move(stop.get_buffer());
232 return make_ready_future<stop_iteration>(stop_iteration::yes);
233 }, [
this] (
const skip_bytes& skip) {
234 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
236 this->_buf = std::move(buf);
238 return make_ready_future<stop_iteration>(stop_iteration::no);
// consume() overload taking the consumer by lvalue reference. It is noexcept
// exactly when Consumer is nothrow-move-constructible.
// NOTE(review): the return-type line and the body are missing from this
// listing — confirm against the full file.
245 template <
typename CharType>
246 template <
typename Consumer>
247 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
249 input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
// Reads at most n bytes, returning a future<temporary_buffer<CharType>>.
// NOTE(review): the signature line is missing from this listing; the name
// read_up_to is inferred from the recursive call below — confirm.
// Visible cases:
//  - an early return of a default-constructed (empty) tmp_buf;
//  - a refill: fetch from _fd.get(), store into _buf, and retry;
//  - _buf holds n bytes or fewer: _buf is moved out whole;
//  - otherwise: the first n bytes of _buf are shared out as the result.
// A final return converts the current exception into a failed future.
// NOTE(review): the conditions guarding the first two cases, any trimming of
// _buf after the share(), and the try/catch structure are not visible here —
// confirm against the full file.
253 template <
typename CharType>
254 future<temporary_buffer<CharType>>
259 return make_ready_future<tmp_buf>();
261 return _fd.get().then([
this, n] (
tmp_buf buf) {
263 _buf = std::move(buf);
264 return read_up_to(n);
267 }
else if (_buf.size() <= n) {
269 return make_ready_future<tmp_buf>(std::move(_buf));
273 auto front = _buf.share(0, n);
275 return make_ready_future<tmp_buf>(std::move(front));
277 return current_exception_as_future<tmp_buf>();
// Returns the next available buffer as a future<tmp_buf>.
// NOTE(review): the signature and all branch conditions are missing from this
// listing; presumably this is input_stream<CharType>::read() — confirm.
// Three outcomes are visible: an empty default-constructed tmp_buf, the
// buffer obtained from _fd.get(), or the currently held _buf moved out.
282 template <
typename CharType>
287 return make_ready_future<tmp_buf>();
290 return _fd.get().then([
this] (
tmp_buf buf) {
292 return make_ready_future<tmp_buf>(std::move(buf));
295 return make_ready_future<tmp_buf>(std::move(_buf));
// Skips bytes from the stream, consuming from the local buffer first:
// min(n, _buf.size()) bytes are trimmed from the front of _buf.
// NOTE(review): the signature and several lines are missing from this listing;
// presumably this is input_stream<CharType>::skip(n), which returns early when
// the local buffer covered the request and otherwise stores a buffer obtained
// from the underlying source into _buf — confirm against the full file.
299 template <
typename CharType>
302 auto skip_buf = std::min(n, _buf.size());
303 _buf.trim_front(skip_buf);
306 return make_ready_future<>();
309 _buf = std::move(buffer);
// Gives up the underlying source: returns _fd by move.
// Throws std::logic_error when the stream has already been used, per the
// error message.
// NOTE(review): the signature and the condition guarding the throw are
// missing from this listing; the name detach() is taken from the error
// string — confirm against the full file.
313 template <
typename CharType>
317 throw std::logic_error(
"detach() called on a used input_stream");
320 return std::move(_fd);
// Sends a large buffer in _size-sized chunks using a repeat() loop whose
// lambda owns `buf`.
// NOTE(review): the signature is missing from this listing; presumably this is
// output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf), as
// called elsewhere in this file — confirm.
// While buf holds at least _size bytes, the first _size bytes are shared out
// as a chunk, trimmed from buf, and sent with put(); the loop then continues.
// Once fewer than _size bytes remain, a fresh _size-byte buffer is allocated
// into _buf, the remainder is copied into it, and the loop stops.
// NOTE(review): some original lines are missing (e.g. any update of _end after
// the tail copy) — confirm against the full file.
324 template <
typename CharType>
329 return repeat([
this, buf = std::move(buf)] ()
mutable {
// Tail case: keep the remainder in the stream's own buffer.
330 if (buf.size() < _size) {
332 _buf = _fd.allocate_buffer(_size);
334 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
336 return make_ready_future<stop_iteration>(stop_iteration::yes);
// Full-chunk case: peel off _size bytes and send them.
338 auto chunk = buf.share(0, _size);
339 buf.trim_front(_size);
340 return put(std::move(chunk)).then([] {
341 return stop_iteration::no;
// Buffered write of n characters starting at buf.
// Fast path: when _buf exists and the n characters fit in the space left
// after _end, they are copied into _buf at offset _end and a ready future is
// returned. Otherwise (no buffer yet, or not enough room) the call is handed
// to slow_write(); __builtin_expect marks that case as the unlikely one.
// NOTE(review): the return-type line and at least one line after the copy
// (presumably advancing _end) are missing from this listing — confirm against
// the full file.
346 template <
typename CharType>
348 output_stream<CharType>::write(
const char_type* buf,
size_t n) noexcept {
349 if (__builtin_expect(!_buf || n > _size - _end,
false)) {
350 return slow_write(buf, n);
352 std::copy_n(buf, n, _buf.get_write() + _end);
354 return make_ready_future<>();
// Slow path of the buffered write, used when the data does not simply fit in
// the current buffer.
// NOTE(review): the signature is missing from this listing; the name
// slow_write(buf, n) is inferred from the call in write() — confirm.
// Asserts that no zero-copy data is pending in _zc_bufs.
// Bulk case (n >= bulk_threshold): the threshold is _size when nothing is
// buffered, or 2 * _size - _end when _end bytes are already buffered. With
// buffered data, _buf is first topped up with _size - _end bytes and sent via
// put(), then the rest (in `tmp`) follows via split_and_put() or put().
// Without buffered data, all n bytes are copied into `tmp` and sent the same
// way.
// Small case: a buffer is allocated into _buf, min(n, _size - _end) bytes are
// copied in at offset _end, and either a ready future is returned or a
// buffer `next` is swapped with _buf and the old contents sent via put().
// NOTE(review): many original lines are missing (creation of `tmp` and `next`,
// the conditions choosing split_and_put vs. put and guarding the allocation,
// and updates of _end) — confirm against the full file.
357 template <
typename CharType>
361 assert(!_zc_bufs &&
"Mixing buffered writes and zero-copy writes not supported yet");
362 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
363 if (n >= bulk_threshold) {
// Top up the partially filled buffer so it can be sent as a full one.
365 auto now = _size - _end;
366 std::copy(buf, buf +
now, _buf.get_write() + _end);
372 return put(std::move(_buf)).then([
this, tmp = std::move(tmp)]()
mutable {
374 return split_and_put(std::move(tmp));
376 return put(std::move(tmp));
381 std::copy(buf, buf + n, tmp.
get_write());
383 return split_and_put(std::move(tmp));
385 return put(std::move(tmp));
391 _buf = _fd.allocate_buffer(_size);
// Copy as much as fits into the space remaining after _end.
394 auto now = std::min(n, _size - _end);
395 std::copy(buf, buf +
now, _buf.get_write() + _end);
398 return make_ready_future<>();
403 std::swap(next, _buf);
404 return put(std::move(next));
// output_stream::do_flush(): pushes pending data to the sink. Two cases are
// visible: _buf is sent with _fd.put(), or, when _zc_bufs is non-empty, the
// accumulated zero-copy packet is sent with _fd.put(). Each put has a
// continuation attached.
// NOTE(review): the first condition, both continuation bodies, and the
// fall-through return are missing from this listing — confirm against the
// full file.
415 template <
typename CharType>
416 future<> output_stream<CharType>::do_flush() noexcept {
420 return _fd.put(std::move(_buf)).
then([
this] {
423 }
else if (_zc_bufs) {
424 return _fd.put(std::move(_zc_bufs)).then([
this] {
// output_stream::flush(): behaviour depends on _batch_flushes.
// Visible outcomes: a failed future built from the stored exception _ex,
// registration of this stream with internal::add_to_flush_poller(), and a
// ready future.
// NOTE(review): the return-type line, the body of the !_batch_flushes branch
// and the conditions guarding the other statements are missing from this
// listing — confirm against the full file.
432 template <
typename CharType>
434 output_stream<CharType>::flush() noexcept {
435 if (!_batch_flushes) {
440 return make_exception_future<>(std::move(_ex));
444 internal::add_to_flush_poller(*
this);
449 return make_ready_future<>();
// Hands `buf` to _fd.put(). Two paths are visible: one defers the put until
// the future obtained from _in_batch.value().get_future() resolves (buf is
// moved into the continuation), the other calls _fd.put() immediately.
// NOTE(review): the signature and the condition choosing between the paths
// are missing from this listing; presumably this is
// output_stream<CharType>::put(temporary_buffer<CharType> buf) — confirm.
452 template <
typename CharType>
459 return _in_batch.value().get_future().then([
this, buf = std::move(buf)] ()
mutable {
460 return _fd.put(std::move(buf));
463 return _fd.put(std::move(buf));
// Batch-flush step.
// NOTE(review): the signature is missing from this listing; presumably this is
// output_stream<CharType>::poll_flush(), invoked by the flush poller —
// confirm.
// Visible behaviour: set_value() is called on the object held in _in_batch,
// and _in_batch is reset to std::nullopt. do_flush() is started with its
// result discarded via the (void) cast; a then_wrapped() continuation inspects
// the outcome, storing the current exception in _ex and calling
// _fd.on_batch_flush_error().
// NOTE(review): the surrounding conditions and the try/catch around the
// exception capture are not visible here — confirm against the full file.
467 template <
typename CharType>
473 _in_batch.value().set_value();
474 _in_batch = std::nullopt;
482 (void)do_flush().then_wrapped([
this] (
future<> f) {
486 _ex = std::current_exception();
487 _fd.on_batch_flush_error();
// Flushes the stream and then runs follow-up work in a finally() continuation,
// so the follow-up executes whether or not flush() succeeded.
// NOTE(review): the signature is missing from this listing; presumably this is
// output_stream<CharType>::close() — confirm.
// Visible follow-up steps: returning the future held in _in_batch, and
// rethrowing the stored exception _ex.
// NOTE(review): the conditions guarding these steps and any call that closes
// the sink are missing from this listing — confirm against the full file.
494 template <
typename CharType>
497 return flush().finally([
this] {
499 return _in_batch.value().get_future();
506 std::rethrow_exception(_ex);
// Gives up the underlying sink: returns _fd by move.
// Throws std::logic_error when the stream has already been used, per the
// error message.
// NOTE(review): the signature and the condition guarding the throw are
// missing from this listing; the name detach() is taken from the error
// string — confirm against the full file.
513 template <
typename CharType>
517 throw std::logic_error(
"detach() called on a used output_stream");
520 return std::move(_fd);
// Consumer functor that writes every buffer it is given to an output stream
// (_os).
// operator() returns an optional "unconsumed remainder":
//  - one path returns the incoming data unchanged, as the remainder;
//  - the other writes the data to _os and, once the write completes, returns
//    an empty remainder.
// NOTE(review): the member declarations, constructor and the condition
// choosing between the two paths are missing from this listing (presumably an
// empty buffer marks end of input) — confirm against the full file.
526 template <
typename CharType>
527 struct stream_copy_consumer {
530 using unconsumed_remainder = std::optional<temporary_buffer<CharType>>;
534 future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) {
536 return make_ready_future<unconsumed_remainder>(std::move(data));
538 return _os.write(data.get(), data.size()).
then([] () {
539 return make_ready_future<unconsumed_remainder>();
// Explicit instantiation declaration: the char specialization of
// internal::stream_copy_consumer is not implicitly instantiated here.
// NOTE(review): presumably the matching explicit instantiation lives in a
// .cc file — confirm.
547 extern template struct internal::stream_copy_consumer<char>;
// Copies `in` to `out` by letting `in` consume itself into an
// internal::stream_copy_consumer constructed over `out`.
// NOTE(review): the signature line is missing from this listing; presumably
// this is copy(input_stream<CharType>& in, output_stream<CharType>& out) —
// confirm against the full file.
549 template <
typename CharType>
551 return in.consume(internal::stream_copy_consumer<CharType>(out));
// Explicit instantiation declaration: copy<char> is not implicitly
// instantiated here.
// NOTE(review): presumably the matching explicit instantiation lives in a
// .cc file — confirm.
554 extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
Definition: iostream.hh:148
Definition: iostream.hh:68
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
value_type && get()
Gets the value returned by the computation.
Definition: future.hh:1340
Definition: iostream.hh:379
Definition: temporary_buffer.hh:67
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future now()
Returns a ready future.
Definition: later.hh:35
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
Copies all the content from the input stream to the output stream.
Definition: iostream-impl.hh:550