Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
iostream-impl.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23
24#pragma once
25
26#include <seastar/core/coroutine.hh>
27#include <seastar/core/do_with.hh>
28#include <seastar/core/loop.hh>
29#include <seastar/net/packet.hh>
30#include <seastar/util/assert.hh>
31#include <seastar/util/variant_utils.hh>
32
33namespace seastar {
34
35inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
36{
37 return do_with(uint64_t(n), [this] (uint64_t& n) {
38 return repeat_until_value([&] {
39 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
40 if (buffer.empty()) {
41 return buffer;
42 }
43 if (buffer.size() >= n) {
44 buffer.trim_front(n);
45 return buffer;
46 }
47 n -= buffer.size();
48 return { };
49 });
50 });
51 });
52}
53
54template<typename CharType>
55inline
56future<> output_stream<CharType>::write(const char_type* buf) noexcept {
57 return write(buf, strlen(buf));
58}
59
60template<typename CharType>
61template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
62inline
63future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
64 return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
65}
66
67template<typename CharType>
68inline
69future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
70 return write(s.c_str(), s.size());
71}
72
73template<typename CharType>
74future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
75 return write(std::move(msg).release());
76}
77
78template<typename CharType>
79future<>
80output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
81 // if flush is scheduled, disable it, so it will not try to write in parallel
82 _flush = false;
83 if (_flushing) {
84 // flush in progress, wait for it to end before continuing
85 return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
86 return _fd.put(std::move(p));
87 });
88 } else {
89 return _fd.put(std::move(p));
90 }
91}
92
93// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
94template <typename CharType>
95future<>
96output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
97 return repeat([this, p = std::move(p)] () mutable {
98 if (p.len() < _size) {
99 if (p.len()) {
100 _zc_bufs = std::move(p);
101 } else {
102 _zc_bufs = net::packet::make_null_packet();
103 }
104 return make_ready_future<stop_iteration>(stop_iteration::yes);
105 }
106 auto chunk = p.share(0, _size);
107 p.trim_front(_size);
108 return zero_copy_put(std::move(chunk)).then([] {
109 return stop_iteration::no;
110 });
111 });
112}
113
114template<typename CharType>
115future<> output_stream<CharType>::write(net::packet p) noexcept {
116 static_assert(std::is_same_v<CharType, char>, "packet works on char");
117 try {
118 if (p.len() != 0) {
119 SEASTAR_ASSERT(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
120
121 if (_zc_bufs) {
122 _zc_bufs.append(std::move(p));
123 } else {
124 _zc_bufs = std::move(p);
125 }
126
127 if (_zc_bufs.len() >= _size) {
128 if (_trim_to_size) {
129 return zero_copy_split_and_put(std::move(_zc_bufs));
130 } else {
131 return zero_copy_put(std::move(_zc_bufs));
132 }
133 }
134 }
135 return make_ready_future<>();
136 } catch (...) {
138 }
139}
140
141template<typename CharType>
142future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
143 try {
144 if (p.empty()) {
145 return make_ready_future<>();
146 }
147 SEASTAR_ASSERT(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
148 return write(net::packet(std::move(p)));
149 } catch (...) {
151 }
152}
153
154template <typename CharType>
155future<temporary_buffer<CharType>>
156input_stream<CharType>::read_exactly_part(size_t n) noexcept {
157 temporary_buffer<CharType> out(n);
158 size_t completed{0U};
159 while (completed < n) {
160 size_t avail = available();
161 if (avail) {
162 auto now = std::min(n - completed, avail);
163 std::copy_n(_buf.get(), now, out.get_write() + completed);
164 _buf.trim_front(now);
165 completed += now;
166 if (completed == n) {
167 break;
168 }
169 }
170
171 // _buf is now empty
172 temporary_buffer<CharType> buf = co_await _fd.get();
173 if (buf.size() == 0) {
174 _eof = true;
175 out.trim(completed);
176 break;
177 }
178 _buf = std::move(buf);
179 }
180 co_return out;
181}
182
183template <typename CharType>
184future<temporary_buffer<CharType>>
186 if (_buf.size() == n) {
187 // easy case: steal buffer, return to caller
188 return make_ready_future<tmp_buf>(std::move(_buf));
189 } else if (_buf.size() > n) {
190 // buffer large enough, share it with caller
191 auto front = _buf.share(0, n);
192 _buf.trim_front(n);
193 return make_ready_future<tmp_buf>(std::move(front));
194 } else if (_buf.size() == 0) {
195 // buffer is empty: grab one and retry
196 return _fd.get().then([this, n] (auto buf) mutable {
197 if (buf.size() == 0) {
198 _eof = true;
199 return make_ready_future<tmp_buf>(std::move(buf));
200 }
201 _buf = std::move(buf);
202 return this->read_exactly(n);
203 });
204 } else {
205 // buffer too small: start copy/read loop
206 return read_exactly_part(n);
207 }
208}
209
210template <typename CharType>
211template <typename Consumer>
212requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
214input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
215 return repeat([consumer = std::move(consumer), this] () mutable {
216 if (_buf.empty() && !_eof) {
217 return _fd.get().then([this] (tmp_buf buf) {
218 _buf = std::move(buf);
219 _eof = _buf.empty();
220 return make_ready_future<stop_iteration>(stop_iteration::no);
221 });
222 }
223 return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
224 return seastar::visit(result.get(), [this] (const continue_consuming&) {
225 // If we're here, consumer consumed entire buffer and is ready for
226 // more now. So we do not return, and rather continue the loop.
227 //
228 // If we're at eof, we should stop.
229 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
230 }, [this] (stop_consuming<CharType>& stop) {
231 // consumer is done
232 this->_buf = std::move(stop.get_buffer());
233 return make_ready_future<stop_iteration>(stop_iteration::yes);
234 }, [this] (const skip_bytes& skip) {
235 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
236 if (!buf.empty()) {
237 this->_buf = std::move(buf);
238 }
239 return make_ready_future<stop_iteration>(stop_iteration::no);
240 });
241 });
242 });
243 });
244}
245
246template <typename CharType>
247template <typename Consumer>
248requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
249future<>
250input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
251 return consume(std::ref(consumer));
252}
253
254template <typename CharType>
255future<temporary_buffer<CharType>>
258 if (_buf.empty()) {
259 if (_eof) {
260 return make_ready_future<tmp_buf>();
261 } else {
262 return _fd.get().then([this, n] (tmp_buf buf) {
263 _eof = buf.empty();
264 _buf = std::move(buf);
265 return read_up_to(n);
266 });
267 }
268 } else if (_buf.size() <= n) {
269 // easy case: steal buffer, return to caller
270 return make_ready_future<tmp_buf>(std::move(_buf));
271 } else {
272 try {
273 // buffer is larger than n, so share its head with a caller
274 auto front = _buf.share(0, n);
275 _buf.trim_front(n);
276 return make_ready_future<tmp_buf>(std::move(front));
277 } catch (...) {
278 return current_exception_as_future<tmp_buf>();
279 }
280 }
281}
282
// NOTE(review): listing lines 284-286 are missing from this extract, so the
// return type, the declarator and (presumably) a local `tmp_buf` alias are not
// visible here. Restore them from the original header before compiling.
//
// Visible behaviour: returns whatever data is available without copying --
// an empty buffer once _eof is set, the next buffer from the source when
// _buf is empty, or the whole internal buffer otherwise.
283template <typename CharType>
287 if (_eof) {
288 return make_ready_future<tmp_buf>();
289 }
290 if (_buf.empty()) {
291 return _fd.get().then([this] (tmp_buf buf) {
 // An empty buffer from the source marks end-of-stream.
292 _eof = buf.empty();
293 return make_ready_future<tmp_buf>(std::move(buf));
294 });
295 } else {
296 return make_ready_future<tmp_buf>(std::move(_buf));
297 }
298}
299
300template <typename CharType>
302input_stream<CharType>::skip(uint64_t n) noexcept {
303 auto skip_buf = std::min(n, _buf.size());
304 _buf.trim_front(skip_buf);
305 n -= skip_buf;
306 if (!n) {
307 return make_ready_future<>();
308 }
309 return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
310 _buf = std::move(buffer);
311 });
312}
313
// NOTE(review): listing lines 315-316 (return type and declarator) are missing
// from this extract; the error message below suggests this is
// input_stream::detach() -- restore the signature from the original header.
//
// Visible behaviour: gives up the underlying source by moving _fd out.
// Throws std::logic_error if the stream still holds buffered data.
314template <typename CharType>
317 if (_buf) {
318 throw std::logic_error("detach() called on a used input_stream");
319 }
320
321 return std::move(_fd);
322}
323
324// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
325template <typename CharType>
328 SEASTAR_ASSERT(_end == 0);
329
330 return repeat([this, buf = std::move(buf)] () mutable {
331 if (buf.size() < _size) {
332 if (!_buf) {
333 _buf = _fd.allocate_buffer(_size);
334 }
335 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
336 _end = buf.size();
337 return make_ready_future<stop_iteration>(stop_iteration::yes);
338 }
339 auto chunk = buf.share(0, _size);
340 buf.trim_front(_size);
341 return put(std::move(chunk)).then([] {
342 return stop_iteration::no;
343 });
344 });
345}
346
347template <typename CharType>
348future<>
349output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
350 if (__builtin_expect(!_buf || n > _size - _end, false)) {
351 return slow_write(buf, n);
352 }
353 std::copy_n(buf, n, _buf.get_write() + _end);
354 _end += n;
355 return make_ready_future<>();
356}
357
358template <typename CharType>
360output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
361 try {
362 SEASTAR_ASSERT(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
363 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
364 if (n >= bulk_threshold) {
365 if (_end) {
366 auto now = _size - _end;
367 std::copy(buf, buf + now, _buf.get_write() + _end);
368 _end = _size;
369 temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
370 std::copy(buf + now, buf + n, tmp.get_write());
371 _buf.trim(_end);
372 _end = 0;
373 return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
374 if (_trim_to_size) {
375 return split_and_put(std::move(tmp));
376 } else {
377 return put(std::move(tmp));
378 }
379 });
380 } else {
381 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
382 std::copy(buf, buf + n, tmp.get_write());
383 if (_trim_to_size) {
384 return split_and_put(std::move(tmp));
385 } else {
386 return put(std::move(tmp));
387 }
388 }
389 }
390
391 if (!_buf) {
392 _buf = _fd.allocate_buffer(_size);
393 }
394
395 auto now = std::min(n, _size - _end);
396 std::copy(buf, buf + now, _buf.get_write() + _end);
397 _end += now;
398 if (now == n) {
399 return make_ready_future<>();
400 } else {
401 temporary_buffer<char> next = _fd.allocate_buffer(_size);
402 std::copy(buf + now, buf + n, next.get_write());
403 _end = n - now;
404 std::swap(next, _buf);
405 return put(std::move(next));
406 }
407 } catch (...) {
409 }
410}
411
// Declaration only; the definition is not part of this file. flush() below
// calls it when batching is enabled and no batch is in progress yet.
412namespace internal {
413void add_to_flush_poller(output_stream<char>& x) noexcept;
414}
415
416template <typename CharType>
417future<> output_stream<CharType>::do_flush() noexcept {
418 if (_end) {
419 _buf.trim(_end);
420 _end = 0;
421 return _fd.put(std::move(_buf)).then([this] {
422 return _fd.flush();
423 });
424 } else if (_zc_bufs) {
425 return _fd.put(std::move(_zc_bufs)).then([this] {
426 return _fd.flush();
427 });
428 } else {
429 return _fd.flush();
430 }
431}
432
433template <typename CharType>
434future<>
435output_stream<CharType>::flush() noexcept {
436 if (!_batch_flushes) {
437 return do_flush();
438 } else {
439 if (_ex) {
440 // flush is a good time to deliver outstanding errors
441 return make_exception_future<>(std::move(_ex));
442 } else {
443 _flush = true;
444 if (!_in_batch) {
445 internal::add_to_flush_poller(*this);
446 _in_batch = promise<>();
447 }
448 }
449 }
450 return make_ready_future<>();
451}
452
453template <typename CharType>
456 // if flush is scheduled, disable it, so it will not try to write in parallel
457 _flush = false;
458 if (_flushing) {
459 // flush in progress, wait for it to end before continuing
460 return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
461 return _fd.put(std::move(buf));
462 });
463 } else {
464 return _fd.put(std::move(buf));
465 }
466}
468template <typename CharType>
469void
471 if (!_flush) {
472 // flush was canceled, do nothing
473 _flushing = false;
474 _in_batch.value().set_value();
475 _in_batch = std::nullopt;
476 return;
477 }
478
479 _flush = false;
480 _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
481
482 // FIXME: future is discarded
483 (void)do_flush().then_wrapped([this] (future<> f) {
484 try {
485 f.get();
486 } catch (...) {
487 _ex = std::current_exception();
488 _fd.on_batch_flush_error();
489 }
490 // if flush() was called while flushing flush once more
491 poll_flush();
492 });
493}
494
// NOTE(review): listing lines 496-497 (return type and declarator) are missing
// from this extract; the body suggests this is output_stream::close() --
// restore the signature from the original header before compiling.
//
// Visible behaviour: flushes, waits for a pending batch flush (if any),
// reports a recorded error, and closes the sink in every case.
495template <typename CharType>
498 return flush().finally([this] {
 // With batching, flush() only schedules the work; wait for the batch.
499 if (_in_batch) {
500 return _in_batch.value().get_future();
501 } else {
502 return make_ready_future();
503 }
504 }).then([this] {
505 // report final exception as close error
506 if (_ex) {
507 std::rethrow_exception(_ex);
508 }
509 }).finally([this] {
 // Runs whether or not the steps above failed.
510 return _fd.close();
511 });
512}
513
// NOTE(review): listing lines 515-516 (return type and declarator) are missing
// from this extract; the error message below suggests this is
// output_stream::detach() -- restore the signature from the original header.
//
// Visible behaviour: gives up the underlying sink by moving _fd out.
// Throws std::logic_error if the stream has an internal buffer.
514template <typename CharType>
517 if (_buf) {
518 throw std::logic_error("detach() called on a used output_stream");
519 }
520
521 return std::move(_fd);
522}
523
524namespace internal {
525
527template <typename CharType>
528struct stream_copy_consumer {
529private:
531 using consumption_result_type = consumption_result<CharType>;
532public:
533 stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
534 }
535 future<consumption_result_type> operator()(temporary_buffer<CharType> data) {
536 if (data.empty()) {
537 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
538 }
539 return _os.write(data.get(), data.size()).then([] {
540 return make_ready_future<consumption_result_type>(continue_consuming());
541 });
542 }
543};
545
546}
547
// Explicit instantiation declaration: the char specialisation is not
// implicitly instantiated in translation units that include this file.
548extern template struct internal::stream_copy_consumer<char>;
549
550template <typename CharType>
552 return in.consume(internal::stream_copy_consumer<CharType>(out));
553}
554
// Explicit instantiation declaration: copy<char> is not implicitly
// instantiated in translation units that include this file.
555extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
556}
Definition: iostream.hh:238
Definition: iostream.hh:164
Definition: iostream.hh:70
A representation of a possibly not-yet-computed value.
Definition: future.hh:1197
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1358
value_type && get()
gets the value returned by the computation
Definition: future.hh:1299
Definition: iostream.hh:296
Definition: iostream.hh:394
Definition: temporary_buffer.hh:67
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1869
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1888
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:238
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:551