Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
iostream-impl.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
21 */
22
23
24#pragma once
25
26#include <seastar/core/coroutine.hh>
27#include <seastar/core/do_with.hh>
28#include <seastar/core/loop.hh>
29#include <seastar/net/packet.hh>
30#include <seastar/util/variant_utils.hh>
31
32namespace seastar {
33
34inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
35{
36 return do_with(uint64_t(n), [this] (uint64_t& n) {
37 return repeat_until_value([&] {
38 return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
39 if (buffer.empty()) {
40 return buffer;
41 }
42 if (buffer.size() >= n) {
43 buffer.trim_front(n);
44 return buffer;
45 }
46 n -= buffer.size();
47 return { };
48 });
49 });
50 });
51}
52
53template<typename CharType>
54inline
55future<> output_stream<CharType>::write(const char_type* buf) noexcept {
56 return write(buf, strlen(buf));
57}
58
59template<typename CharType>
60template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
61inline
62future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
63 return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
64}
65
66template<typename CharType>
67inline
68future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
69 return write(s.c_str(), s.size());
70}
71
72template<typename CharType>
73future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
74 return write(std::move(msg).release());
75}
76
77template<typename CharType>
78future<>
79output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
80 // if flush is scheduled, disable it, so it will not try to write in parallel
81 _flush = false;
82 if (_flushing) {
83 // flush in progress, wait for it to end before continuing
84 return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
85 return _fd.put(std::move(p));
86 });
87 } else {
88 return _fd.put(std::move(p));
89 }
90}
91
92// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
93template <typename CharType>
94future<>
95output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
96 return repeat([this, p = std::move(p)] () mutable {
97 if (p.len() < _size) {
98 if (p.len()) {
99 _zc_bufs = std::move(p);
100 } else {
101 _zc_bufs = net::packet::make_null_packet();
102 }
103 return make_ready_future<stop_iteration>(stop_iteration::yes);
104 }
105 auto chunk = p.share(0, _size);
106 p.trim_front(_size);
107 return zero_copy_put(std::move(chunk)).then([] {
108 return stop_iteration::no;
109 });
110 });
111}
112
113template<typename CharType>
114future<> output_stream<CharType>::write(net::packet p) noexcept {
115 static_assert(std::is_same_v<CharType, char>, "packet works on char");
116 try {
117 if (p.len() != 0) {
118 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
119
120 if (_zc_bufs) {
121 _zc_bufs.append(std::move(p));
122 } else {
123 _zc_bufs = std::move(p);
124 }
125
126 if (_zc_bufs.len() >= _size) {
127 if (_trim_to_size) {
128 return zero_copy_split_and_put(std::move(_zc_bufs));
129 } else {
130 return zero_copy_put(std::move(_zc_bufs));
131 }
132 }
133 }
134 return make_ready_future<>();
135 } catch (...) {
137 }
138}
139
140template<typename CharType>
141future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
142 try {
143 if (p.empty()) {
144 return make_ready_future<>();
145 }
146 assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
147 return write(net::packet(std::move(p)));
148 } catch (...) {
150 }
151}
152
153template <typename CharType>
154future<temporary_buffer<CharType>>
155input_stream<CharType>::read_exactly_part(size_t n) noexcept {
156 temporary_buffer<CharType> out(n);
157 size_t completed{0U};
158 while (completed < n) {
159 size_t avail = available();
160 if (avail) {
161 auto now = std::min(n - completed, avail);
162 std::copy_n(_buf.get(), now, out.get_write() + completed);
163 _buf.trim_front(now);
164 completed += now;
165 if (completed == n) {
166 break;
167 }
168 }
169
170 // _buf is now empty
171 temporary_buffer<CharType> buf = co_await _fd.get();
172 if (buf.size() == 0) {
173 _eof = true;
174 out.trim(completed);
175 break;
176 }
177 _buf = std::move(buf);
178 }
179 co_return out;
180}
181
182template <typename CharType>
183future<temporary_buffer<CharType>>
185 if (_buf.size() == n) {
186 // easy case: steal buffer, return to caller
187 return make_ready_future<tmp_buf>(std::move(_buf));
188 } else if (_buf.size() > n) {
189 // buffer large enough, share it with caller
190 auto front = _buf.share(0, n);
191 _buf.trim_front(n);
192 return make_ready_future<tmp_buf>(std::move(front));
193 } else if (_buf.size() == 0) {
194 // buffer is empty: grab one and retry
195 return _fd.get().then([this, n] (auto buf) mutable {
196 if (buf.size() == 0) {
197 _eof = true;
198 return make_ready_future<tmp_buf>(std::move(buf));
199 }
200 _buf = std::move(buf);
201 return this->read_exactly(n);
202 });
203 } else {
204 // buffer too small: start copy/read loop
205 return read_exactly_part(n);
206 }
207}
208
209template <typename CharType>
210template <typename Consumer>
211requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
213input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
214 return repeat([consumer = std::move(consumer), this] () mutable {
215 if (_buf.empty() && !_eof) {
216 return _fd.get().then([this] (tmp_buf buf) {
217 _buf = std::move(buf);
218 _eof = _buf.empty();
219 return make_ready_future<stop_iteration>(stop_iteration::no);
220 });
221 }
222 return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
223 return seastar::visit(result.get(), [this] (const continue_consuming&) {
224 // If we're here, consumer consumed entire buffer and is ready for
225 // more now. So we do not return, and rather continue the loop.
226 //
227 // If we're at eof, we should stop.
228 return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
229 }, [this] (stop_consuming<CharType>& stop) {
230 // consumer is done
231 this->_buf = std::move(stop.get_buffer());
232 return make_ready_future<stop_iteration>(stop_iteration::yes);
233 }, [this] (const skip_bytes& skip) {
234 return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
235 if (!buf.empty()) {
236 this->_buf = std::move(buf);
237 }
238 return make_ready_future<stop_iteration>(stop_iteration::no);
239 });
240 });
241 });
242 });
243}
244
245template <typename CharType>
246template <typename Consumer>
247requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>
248future<>
249input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
250 return consume(std::ref(consumer));
251}
252
253template <typename CharType>
254future<temporary_buffer<CharType>>
257 if (_buf.empty()) {
258 if (_eof) {
259 return make_ready_future<tmp_buf>();
260 } else {
261 return _fd.get().then([this, n] (tmp_buf buf) {
262 _eof = buf.empty();
263 _buf = std::move(buf);
264 return read_up_to(n);
265 });
266 }
267 } else if (_buf.size() <= n) {
268 // easy case: steal buffer, return to caller
269 return make_ready_future<tmp_buf>(std::move(_buf));
270 } else {
271 try {
272 // buffer is larger than n, so share its head with a caller
273 auto front = _buf.share(0, n);
274 _buf.trim_front(n);
275 return make_ready_future<tmp_buf>(std::move(front));
276 } catch (...) {
277 return current_exception_as_future<tmp_buf>();
278 }
279 }
280}
281
282template <typename CharType>
286 if (_eof) {
287 return make_ready_future<tmp_buf>();
288 }
289 if (_buf.empty()) {
290 return _fd.get().then([this] (tmp_buf buf) {
291 _eof = buf.empty();
292 return make_ready_future<tmp_buf>(std::move(buf));
293 });
294 } else {
295 return make_ready_future<tmp_buf>(std::move(_buf));
296 }
297}
298
299template <typename CharType>
301input_stream<CharType>::skip(uint64_t n) noexcept {
302 auto skip_buf = std::min(n, _buf.size());
303 _buf.trim_front(skip_buf);
304 n -= skip_buf;
305 if (!n) {
306 return make_ready_future<>();
307 }
308 return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
309 _buf = std::move(buffer);
310 });
311}
312
313template <typename CharType>
316 if (_buf) {
317 throw std::logic_error("detach() called on a used input_stream");
318 }
319
320 return std::move(_fd);
321}
322
323// Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
324template <typename CharType>
327 assert(_end == 0);
328
329 return repeat([this, buf = std::move(buf)] () mutable {
330 if (buf.size() < _size) {
331 if (!_buf) {
332 _buf = _fd.allocate_buffer(_size);
333 }
334 std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
335 _end = buf.size();
336 return make_ready_future<stop_iteration>(stop_iteration::yes);
337 }
338 auto chunk = buf.share(0, _size);
339 buf.trim_front(_size);
340 return put(std::move(chunk)).then([] {
341 return stop_iteration::no;
342 });
343 });
344}
345
346template <typename CharType>
347future<>
348output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
349 if (__builtin_expect(!_buf || n > _size - _end, false)) {
350 return slow_write(buf, n);
351 }
352 std::copy_n(buf, n, _buf.get_write() + _end);
353 _end += n;
354 return make_ready_future<>();
355}
356
357template <typename CharType>
359output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
360 try {
361 assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
362 auto bulk_threshold = _end ? (2 * _size - _end) : _size;
363 if (n >= bulk_threshold) {
364 if (_end) {
365 auto now = _size - _end;
366 std::copy(buf, buf + now, _buf.get_write() + _end);
367 _end = _size;
368 temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
369 std::copy(buf + now, buf + n, tmp.get_write());
370 _buf.trim(_end);
371 _end = 0;
372 return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
373 if (_trim_to_size) {
374 return split_and_put(std::move(tmp));
375 } else {
376 return put(std::move(tmp));
377 }
378 });
379 } else {
380 temporary_buffer<char> tmp = _fd.allocate_buffer(n);
381 std::copy(buf, buf + n, tmp.get_write());
382 if (_trim_to_size) {
383 return split_and_put(std::move(tmp));
384 } else {
385 return put(std::move(tmp));
386 }
387 }
388 }
389
390 if (!_buf) {
391 _buf = _fd.allocate_buffer(_size);
392 }
393
394 auto now = std::min(n, _size - _end);
395 std::copy(buf, buf + now, _buf.get_write() + _end);
396 _end += now;
397 if (now == n) {
398 return make_ready_future<>();
399 } else {
400 temporary_buffer<char> next = _fd.allocate_buffer(_size);
401 std::copy(buf + now, buf + n, next.get_write());
402 _end = n - now;
403 std::swap(next, _buf);
404 return put(std::move(next));
405 }
406 } catch (...) {
408 }
409}
410
411namespace internal {
412void add_to_flush_poller(output_stream<char>& x) noexcept;
413}
414
415template <typename CharType>
416future<> output_stream<CharType>::do_flush() noexcept {
417 if (_end) {
418 _buf.trim(_end);
419 _end = 0;
420 return _fd.put(std::move(_buf)).then([this] {
421 return _fd.flush();
422 });
423 } else if (_zc_bufs) {
424 return _fd.put(std::move(_zc_bufs)).then([this] {
425 return _fd.flush();
426 });
427 } else {
428 return _fd.flush();
429 }
430}
431
432template <typename CharType>
433future<>
434output_stream<CharType>::flush() noexcept {
435 if (!_batch_flushes) {
436 return do_flush();
437 } else {
438 if (_ex) {
439 // flush is a good time to deliver outstanding errors
440 return make_exception_future<>(std::move(_ex));
441 } else {
442 _flush = true;
443 if (!_in_batch) {
444 internal::add_to_flush_poller(*this);
445 _in_batch = promise<>();
446 }
447 }
448 }
449 return make_ready_future<>();
450}
451
452template <typename CharType>
455 // if flush is scheduled, disable it, so it will not try to write in parallel
456 _flush = false;
457 if (_flushing) {
458 // flush in progress, wait for it to end before continuing
459 return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
460 return _fd.put(std::move(buf));
461 });
462 } else {
463 return _fd.put(std::move(buf));
464 }
465}
467template <typename CharType>
468void
470 if (!_flush) {
471 // flush was canceled, do nothing
472 _flushing = false;
473 _in_batch.value().set_value();
474 _in_batch = std::nullopt;
475 return;
476 }
477
478 _flush = false;
479 _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
480
481 // FIXME: future is discarded
482 (void)do_flush().then_wrapped([this] (future<> f) {
483 try {
484 f.get();
485 } catch (...) {
486 _ex = std::current_exception();
487 _fd.on_batch_flush_error();
488 }
489 // if flush() was called while flushing flush once more
490 poll_flush();
491 });
492}
493
494template <typename CharType>
497 return flush().finally([this] {
498 if (_in_batch) {
499 return _in_batch.value().get_future();
500 } else {
501 return make_ready_future();
502 }
503 }).then([this] {
504 // report final exception as close error
505 if (_ex) {
506 std::rethrow_exception(_ex);
507 }
508 }).finally([this] {
509 return _fd.close();
510 });
511}
512
513template <typename CharType>
516 if (_buf) {
517 throw std::logic_error("detach() called on a used output_stream");
518 }
519
520 return std::move(_fd);
521}
522
523namespace internal {
524
526template <typename CharType>
527struct stream_copy_consumer {
528private:
530 using consumption_result_type = consumption_result<CharType>;
531public:
532 stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
533 }
534 future<consumption_result_type> operator()(temporary_buffer<CharType> data) {
535 if (data.empty()) {
536 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
537 }
538 return _os.write(data.get(), data.size()).then([] {
539 return make_ready_future<consumption_result_type>(continue_consuming());
540 });
541 }
542};
544
545}
546
547extern template struct internal::stream_copy_consumer<char>;
548
549template <typename CharType>
551 return in.consume(internal::stream_copy_consumer<CharType>(out));
552}
553
554extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
555}
Definition: iostream.hh:237
Definition: iostream.hh:163
Definition: iostream.hh:69
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1404
value_type && get()
gets the value returned by the computation
Definition: future.hh:1321
Definition: iostream.hh:295
Definition: iostream.hh:393
Definition: temporary_buffer.hh:67
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1922
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1941
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550