Seastar
High performance C++ framework for concurrent servers
iostream-impl.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23
24#pragma once
25
26#include <seastar/core/coroutine.hh>
27#include <seastar/core/do_with.hh>
28#include <seastar/core/loop.hh>
29#include <seastar/net/packet.hh>
30#include <seastar/util/assert.hh>
31#include <seastar/util/variant_utils.hh>
32
33namespace seastar {
34
35inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
36{
37 return do_with(uint64_t(n), [this] (uint64_t& n) {
38 return repeat_until_value([&] {
39 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
40 if (buffer.empty()) {
41 return buffer;
42 }
43 if (buffer.size() >= n) {
44 buffer.trim_front(n);
45 return buffer;
46 }
47 n -= buffer.size();
48 return { };
49 });
50 });
51 });
52}
53
54template<typename CharType>
55inline
56future<> output_stream<CharType>::write(const char_type* buf) noexcept {
57 return write(buf, strlen(buf));
58}
59
// Writes the contents of string `s`: s.c_str() is reinterpreted as
// const CharType* and passed, together with s.size(), to the pointer/length
// write() overload.
// NOTE(review): listing line 63, which should carry this overload's signature
// (presumably taking a basic_sstring, judging by the template parameters), is
// missing from this listing — restore it from the upstream header.
60template<typename CharType>
61template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
62inline
64 return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
65}
66
67template<typename CharType>
68inline
69future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
70 return write(s.c_str(), s.size());
71}
72
// Calls release() on the moved-from `msg` and passes the result to another
// write() overload.
// NOTE(review): listing line 74 (the signature that declares `msg`) is
// missing from this listing.
73template<typename CharType>
75 return write(std::move(msg).release());
76}
77
// Passes packet `p` to the sink via _fd.put(). A scheduled batch flush is
// cancelled first (_flush = false); if a flush is already running
// (_flushing), the put is chained after the _in_batch promise's future.
// NOTE(review): listing lines 79-80 (return type and signature) are missing;
// other code in this file calls this as zero_copy_put(...) — confirm.
78template<typename CharType>
81 // if flush is scheduled, disable it, so it will not try to write in parallel
82 _flush = false;
83 if (_flushing) {
84 // flush in progress, wait for it to end before continuing
85 return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
86 return _fd.put(std::move(p));
87 });
88 } else {
89 return _fd.put(std::move(p));
90 }
91}
92
93// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
94template <typename CharType>
95future<>
96output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
97 return repeat([this, p = std::move(p)] () mutable {
98 if (p.len() < _size) {
99 if (p.len()) {
100 _zc_bufs = std::move(p);
101 } else {
102 _zc_bufs = net::packet::make_null_packet();
103 }
104 return make_ready_future<stop_iteration>(stop_iteration::yes);
105 }
106 auto chunk = p.share(0, _size);
107 p.trim_front(_size);
108 return zero_copy_put(std::move(chunk)).then([] {
109 return stop_iteration::no;
110 });
111 });
112}
113
// Zero-copy write of packet `p` (char streams only, see the static_assert).
// An empty packet is a no-op. Otherwise any bytes pending in _buf are first
// turned into the _zc_bufs packet, `p` is appended to it, and once the
// accumulated length reaches _size the whole packet is sent through
// zero_copy_split_and_put() (when _trim_to_size is set) or zero_copy_put().
// NOTE(review): listing line 115 (the signature) and line 142 (the body of
// the catch block) are missing from this listing; as shown here the catch
// block does not return a value.
114template<typename CharType>
116 static_assert(std::is_same_v<CharType, char>, "packet works on char");
117 try {
118 if (p.len() != 0) {
119 if (_end) {
// convert the bytes buffered so far into a packet; the assert documents that
// buffered data and a pending zero-copy packet are not expected together
120 _buf.trim(_end);
121 _end = 0;
122 SEASTAR_ASSERT(!_zc_bufs);
123 _zc_bufs = net::packet(std::move(_buf));
124 }
125
126 if (_zc_bufs) {
127 _zc_bufs.append(std::move(p));
128 } else {
129 _zc_bufs = std::move(p);
130 }
131
132 if (_zc_bufs.len() >= _size) {
133 if (_trim_to_size) {
134 return zero_copy_split_and_put(std::move(_zc_bufs));
135 } else {
136 return zero_copy_put(std::move(_zc_bufs));
137 }
138 }
139 }
140 return make_ready_future<>();
141 } catch (...) {
143 }
144}
145
// Wraps `p` in a net::packet and forwards it to the packet write() overload.
// NOTE(review): listing line 147 (the signature that declares `p`) and
// line 151 (the body of the catch block) are missing from this listing; as
// shown here the catch block does not return a value.
146template<typename CharType>
148 try {
149 return write(net::packet(std::move(p)));
150 } catch (...) {
152 }
153}
154
// Coroutine copy loop: fills `out` with up to n elements, first draining
// whatever is in _buf and then pulling further buffers from _fd.get().
// If the source returns an empty buffer, _eof is set and `out` is trimmed to
// the number of elements copied so far.
// NOTE(review): listing lines 156-158 are missing, which should hold the
// return type, the signature and the declaration of `out`; read_exactly()
// calls this as read_exactly_part(n) — confirm against the upstream header.
155template <typename CharType>
159 size_t completed{0U};
160 while (completed < n) {
161 size_t avail = available();
162 if (avail) {
// copy as much as is both wanted and available
163 auto now = std::min(n - completed, avail);
164 std::copy_n(_buf.get(), now, out.get_write() + completed);
165 _buf.trim_front(now);
166 completed += now;
167 if (completed == n) {
168 break;
169 }
170 }
171
172 // _buf is now empty
173 temporary_buffer<CharType> buf = co_await _fd.get();
174 if (buf.size() == 0) {
175 _eof = true;
176 out.trim(completed);
177 break;
178 }
179 _buf = std::move(buf);
180 }
181 co_return out;
182}
183
// Returns a buffer of n elements taken from the stream, choosing the cheapest
// path based on what _buf currently holds:
//  - exactly n: _buf itself is handed over;
//  - more than n: the first n are shared out and trimmed off _buf;
//  - empty: one buffer is fetched from _fd and the call is retried; an empty
//    fetch sets _eof and is returned as is;
//  - otherwise: falls back to the copying loop in read_exactly_part().
// `tmp_buf` is an alias declared outside this listing.
// NOTE(review): listing line 186 (the signature) is missing from this listing.
184template <typename CharType>
185future<temporary_buffer<CharType>>
187 if (_buf.size() == n) {
188 // easy case: steal buffer, return to caller
189 return make_ready_future<tmp_buf>(std::move(_buf));
190 } else if (_buf.size() > n) {
191 // buffer large enough, share it with caller
192 auto front = _buf.share(0, n);
193 _buf.trim_front(n);
194 return make_ready_future<tmp_buf>(std::move(front));
195 } else if (_buf.size() == 0) {
196 // buffer is empty: grab one and retry
197 return _fd.get().then([this, n] (auto buf) mutable {
198 if (buf.size() == 0) {
199 _eof = true;
200 return make_ready_future<tmp_buf>(std::move(buf));
201 }
202 _buf = std::move(buf);
203 return this->read_exactly(n);
204 });
205 } else {
206 // buffer too small: start copy/read loop
207 return read_exactly_part(n);
208 }
209}
210
// Feeds the stream to `consumer` in a repeat() loop.
// Each iteration either refills _buf from _fd (when it is empty and eof has
// not been seen) or hands _buf to the consumer and acts on its result:
//  - continue_consuming: loop again unless _eof is set;
//  - stop_consuming: store the buffer it carries back into _buf and stop;
//  - skip_bytes: ask _fd to skip that many bytes, keep any non-empty buffer
//    it returns, and loop again.
// The consumer is moved into the loop lambda.
// NOTE(review): listing line 214 (the return type line) is missing from this
// listing.
211template <typename CharType>
212template <typename Consumer>
213requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
215input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
216 return repeat([consumer = std::move(consumer), this] () mutable {
217 if (_buf.empty() && !_eof) {
218 return _fd.get().then([this] (tmp_buf buf) {
219 _buf = std::move(buf);
220 _eof = _buf.empty();
221 return make_ready_future<stop_iteration>(stop_iteration::no);
222 });
223 }
224 return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
225 return seastar::visit(result.get(), [this] (const continue_consuming&) {
226 // If we're here, consumer consumed entire buffer and is ready for
227 // more now. So we do not return, and rather continue the loop.
228 //
229 // If we're at eof, we should stop.
230 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
231 }, [this] (stop_consuming<CharType>& stop) {
232 // consumer is done
233 this->_buf = std::move(stop.get_buffer());
234 return make_ready_future<stop_iteration>(stop_iteration::yes);
235 }, [this] (const skip_bytes& skip) {
236 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
237 if (!buf.empty()) {
238 this->_buf = std::move(buf);
239 }
240 return make_ready_future<stop_iteration>(stop_iteration::no);
241 });
242 });
243 });
244 });
245}
246
247template <typename CharType>
248template <typename Consumer>
249requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
250future<>
251input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
252 return consume(std::ref(consumer));
253}
254
// Returns at most n elements from the stream.
//  - _buf empty and _eof set: returns an empty buffer;
//  - _buf empty otherwise: fetches one buffer from _fd, records whether it
//    was empty in _eof, and retries;
//  - _buf holds n or fewer: the whole _buf is handed over;
//  - _buf holds more: the first n are shared out and trimmed off _buf, with
//    any exception converted into a failed future.
// NOTE(review): listing lines 257-258 (the signature) are missing from this
// listing; the recursive call below names it read_up_to(n).
255template <typename CharType>
256future<temporary_buffer<CharType>>
259 if (_buf.empty()) {
260 if (_eof) {
261 return make_ready_future<tmp_buf>();
262 } else {
263 return _fd.get().then([this, n] (tmp_buf buf) {
264 _eof = buf.empty();
265 _buf = std::move(buf);
266 return read_up_to(n);
267 });
268 }
269 } else if (_buf.size() <= n) {
270 // easy case: steal buffer, return to caller
271 return make_ready_future<tmp_buf>(std::move(_buf));
272 } else {
273 try {
274 // buffer is larger than n, so share its head with a caller
275 auto front = _buf.share(0, n);
276 _buf.trim_front(n);
277 return make_ready_future<tmp_buf>(std::move(front));
278 } catch (...) {
279 return current_exception_as_future<tmp_buf>();
280 }
281 }
282}
283
// Returns the next available buffer: an empty one once _eof is set, the
// current _buf if it holds data, otherwise a fresh buffer from _fd.get()
// (an empty fetch sets _eof).
// NOTE(review): listing lines 285-287 (return type and signature) are
// missing from this listing; presumably this is input_stream::read() —
// confirm.
284template <typename CharType>
288 if (_eof) {
289 return make_ready_future<tmp_buf>();
290 }
291 if (_buf.empty()) {
292 return _fd.get().then([this] (tmp_buf buf) {
293 _eof = buf.empty();
294 return make_ready_future<tmp_buf>(std::move(buf));
295 });
296 } else {
297 return make_ready_future<tmp_buf>(std::move(_buf));
298 }
299}
300
// Discards n elements from the stream. As much as possible is trimmed from
// the front of _buf; if that does not cover n, the remainder is delegated to
// _fd.skip() and the buffer it returns becomes the new _buf.
// NOTE(review): listing line 302 (the return type line) is missing from this
// listing.
301template <typename CharType>
303input_stream<CharType>::skip(uint64_t n) noexcept {
304 auto skip_buf = std::min(n, _buf.size());
305 _buf.trim_front(skip_buf);
306 n -= skip_buf;
307 if (!n) {
// everything requested was already buffered
308 return make_ready_future<>();
309 }
310 return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
311 _buf = std::move(buffer);
312 });
313}
314
// Gives up the underlying _fd by moving it out to the caller.
// Throws std::logic_error if _buf is non-null, i.e. the stream has already
// been used.
// NOTE(review): listing lines 316-317 (return type and signature) are
// missing from this listing; the error message names this detach().
315template <typename CharType>
318 if (_buf) {
319 throw std::logic_error("detach() called on a used input_stream");
320 }
321
322 return std::move(_fd);
323}
324
325// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
// Full-sized chunks are shared out of `buf` and sent with put(); a remainder
// shorter than _size is copied into _buf (allocated on demand) and _end is
// set to its length. Requires that nothing is buffered on entry (_end == 0).
// NOTE(review): listing lines 327-328 (return type and signature) and
// line 330 are missing from this listing; slow_write() calls this as
// split_and_put(...).
326template <typename CharType>
329 SEASTAR_ASSERT(_end == 0);
331 return repeat([this, buf = std::move(buf)] () mutable {
332 if (buf.size() < _size) {
333 if (!_buf) {
334 _buf = _fd.allocate_buffer(_size);
335 }
336 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
337 _end = buf.size();
338 return make_ready_future<stop_iteration>(stop_iteration::yes);
339 }
340 auto chunk = buf.share(0, _size);
341 buf.trim_front(_size);
342 return put(std::move(chunk)).then([] {
343 return stop_iteration::no;
344 });
345 });
346}
347
// Buffered write of n elements starting at buf.
// Fast path: when _buf exists and has room for all n elements, they are
// copied to position _end and a ready future is returned. Anything else
// (no buffer yet, or not enough room) goes to slow_write().
// NOTE(review): listing line 349 (the return type line) is missing from this
// listing.
348template <typename CharType>
350output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
351 if (__builtin_expect(!_buf || n > _size - _end, false)) {
352 return slow_write(buf, n);
353 }
354 std::copy_n(buf, n, _buf.get_write() + _end);
355 _end += n;
356 return make_ready_future<>();
357}
358
// Slow path of write(buf, n): used when _buf is missing or cannot take all
// n elements.
//  - Large writes (n >= bulk_threshold): if data is already buffered, _buf is
//    topped up to _size and sent, and the rest of the input is copied into a
//    fresh buffer sent afterwards; with nothing buffered the whole input is
//    copied into a fresh buffer and sent. The fresh buffer goes through
//    split_and_put() when _trim_to_size is set, put() otherwise.
//  - Smaller writes: as much as fits is appended to _buf (allocated on
//    demand); if something is left over, the full _buf is sent and the
//    leftover starts a new _buf.
// Asserts that no zero-copy packet is pending.
// NOTE(review): listing line 360 (the return type line) and line 409 (the
// body of the catch block) are missing from this listing; as shown here the
// catch block does not return a value.
359template <typename CharType>
361output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
362 try {
363 SEASTAR_ASSERT(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
// with data buffered, "bulk" means: enough to fill _buf and at least one
// more _size-sized buffer
364 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
365 if (n >= bulk_threshold) {
366 if (_end) {
367 auto now = _size - _end;
368 std::copy(buf, buf + now, _buf.get_write() + _end);
369 _end = _size;
370 temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
371 std::copy(buf + now, buf + n, tmp.get_write());
372 _buf.trim(_end);
373 _end = 0;
374 return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
375 if (_trim_to_size) {
376 return split_and_put(std::move(tmp));
377 } else {
378 return put(std::move(tmp));
379 }
380 });
381 } else {
382 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
383 std::copy(buf, buf + n, tmp.get_write());
384 if (_trim_to_size) {
385 return split_and_put(std::move(tmp));
386 } else {
387 return put(std::move(tmp));
388 }
389 }
390 }
391
392 if (!_buf) {
393 _buf = _fd.allocate_buffer(_size);
394 }
395
396 auto now = std::min(n, _size - _end);
397 std::copy(buf, buf + now, _buf.get_write() + _end);
398 _end += now;
399 if (now == n) {
400 return make_ready_future<>();
401 } else {
// _buf is full: the leftover goes into a new buffer that becomes _buf,
// and the full one is sent
402 temporary_buffer<char> next = _fd.allocate_buffer(_size);
403 std::copy(buf + now, buf + n, next.get_write());
404 _end = n - now;
405 std::swap(next, _buf);
406 return put(std::move(next));
407 }
408 } catch (...) {
410 }
411}
412
413namespace internal {
414void add_to_flush_poller(output_stream<char>& x) noexcept;
415}
416
417template <typename CharType>
418future<> output_stream<CharType>::do_flush() noexcept {
419 if (_end) {
420 _buf.trim(_end);
421 _end = 0;
422 return _fd.put(std::move(_buf)).then([this] {
423 return _fd.flush();
424 });
425 } else if (_zc_bufs) {
426 return _fd.put(std::move(_zc_bufs)).then([this] {
427 return _fd.flush();
428 });
429 } else {
430 return _fd.flush();
431 }
432}
433
434template <typename CharType>
435future<>
436output_stream<CharType>::flush() noexcept {
437 if (!_batch_flushes) {
438 return do_flush();
439 } else {
440 if (_ex) {
441 // flush is a good time to deliver outstanding errors
442 return make_exception_future<>(std::move(_ex));
443 } else {
444 _flush = true;
445 if (!_in_batch) {
446 internal::add_to_flush_poller(*this);
447 _in_batch = promise<>();
448 }
449 }
450 }
451 return make_ready_future<>();
452}
453
454template <typename CharType>
455future<>
456output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
457 // if flush is scheduled, disable it, so it will not try to write in parallel
458 _flush = false;
459 if (_flushing) {
460 // flush in progress, wait for it to end before continuing
461 return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
462 return _fd.put(std::move(buf));
463 });
464 } else {
465 return _fd.put(std::move(buf));
466 }
467}
// Performs one round of a batched flush.
// If no flush is requested (_flush is false), the batch is finished:
// _flushing is cleared, the _in_batch promise is resolved and reset.
// Otherwise do_flush() is started with _flushing set; when it completes, any
// exception is stored in _ex and reported via _fd.on_batch_flush_error(),
// and poll_flush() runs again to pick up a flush requested in the meantime
// or to finish the batch.
// NOTE(review): listing line 471 (the signature) is missing from this
// listing; the recursive call below names it poll_flush().
469template <typename CharType>
470void
472 if (!_flush) {
473 // flush was canceled, do nothing
474 _flushing = false;
475 _in_batch.value().set_value();
476 _in_batch = std::nullopt;
477 return;
478 }
479
480 _flush = false;
481 _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
482
483 // FIXME: future is discarded
484 (void)do_flush().then_wrapped([this] (future<> f) {
485 try {
486 f.get();
487 } catch (...) {
488 _ex = std::current_exception();
489 _fd.on_batch_flush_error();
490 }
491 // if flush() was called while flushing flush once more
492 poll_flush();
493 });
494}
495
// Flushes, waits for a pending batch (if any) to finish, rethrows a stored
// error (_ex) as the result, and in every case finishes with _fd.close().
// NOTE(review): listing lines 497-498 (return type and signature) are
// missing from this listing; presumably this is output_stream::close() —
// confirm.
496template <typename CharType>
499 return flush().finally([this] {
500 if (_in_batch) {
501 return _in_batch.value().get_future();
502 } else {
503 return make_ready_future();
504 }
505 }).then([this] {
506 // report final exception as close error
507 if (_ex) {
508 std::rethrow_exception(_ex);
509 }
510 }).finally([this] {
511 return _fd.close();
512 });
513}
514
// Gives up the underlying _fd by moving it out to the caller.
// Throws std::logic_error if _buf is non-null, i.e. the stream has already
// been used.
// NOTE(review): listing lines 516-517 (return type and signature) are
// missing from this listing; the error message names this detach().
515template <typename CharType>
518 if (_buf) {
519 throw std::logic_error("detach() called on a used output_stream");
520 }
521
522 return std::move(_fd);
523}
524
525namespace internal {
526
// Consumer that forwards every buffer it receives to an output stream.
// An empty buffer makes it stop (stop_consuming); any other buffer is written
// to _os and answered with continue_consuming.
// NOTE(review): listing lines 527, 531 and 545 are missing from this listing;
// line 531 presumably declares the `_os` reference member that the
// constructor initialises — confirm against the upstream header.
528template <typename CharType>
529struct stream_copy_consumer {
530private:
532 using consumption_result_type = consumption_result<CharType>;
533public:
// Binds the consumer to the output stream `os`.
534 stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
535 }
// Handles one buffer; see the struct comment for the protocol.
536 future<consumption_result_type> operator()(temporary_buffer<CharType> data) {
537 if (data.empty()) {
538 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
539 }
540 return _os.write(data.get(), data.size()).then([] {
541 return make_ready_future<consumption_result_type>(continue_consuming());
542 });
543 }
544};
546
547}
548
549extern template struct internal::stream_copy_consumer<char>;
550
// Copies the input stream `in` to the output stream `out` by letting `in`
// consume itself into a stream_copy_consumer bound to `out`.
// NOTE(review): listing line 552 (the signature) is missing from this
// listing; the extern template declaration that follows shows it as
// future<> copy(input_stream<CharType>&, output_stream<CharType>&).
551template <typename CharType>
553 return in.consume(internal::stream_copy_consumer<CharType>(out));
554}
555
556extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
557}
Definition: sstring.hh:76
Definition: iostream.hh:242
Definition: iostream.hh:168
Definition: iostream.hh:74
A representation of a possibly not-yet-computed value.
Definition: future.hh:1195
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1356
value_type && get()
gets the value returned by the computation
Definition: future.hh:1297
Definition: iostream.hh:300
Definition: packet.hh:87
Definition: iostream.hh:419
future write(const char_type *buf, size_t n) noexcept
Writes n bytes from the memory pointed by buf into the buffer.
Definition: iostream-impl.hh:350
Definition: scattered_message.hh:35
Definition: temporary_buffer.hh:68
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:153
CharType * get_write() noexcept
Definition: temporary_buffer.hh:129
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1867
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1886
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:238
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:552