Seastar
High performance C++ framework for concurrent servers
iostream-impl.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2015 Cloudius Systems, Ltd.
21  */
22 
23 
24 #pragma once
25 
26 #include <seastar/core/do_with.hh>
27 #include <seastar/core/loop.hh>
28 #include <seastar/net/packet.hh>
29 #include <seastar/util/variant_utils.hh>
30 
31 namespace seastar {
32 
33 inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
34 {
35  return do_with(uint64_t(n), [this] (uint64_t& n) {
36  return repeat_until_value([&] {
37  return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
38  if (buffer.empty()) {
39  return buffer;
40  }
41  if (buffer.size() >= n) {
42  buffer.trim_front(n);
43  return buffer;
44  }
45  n -= buffer.size();
46  return { };
47  });
48  });
49  });
50 }
51 
52 template<typename CharType>
53 inline
54 future<> output_stream<CharType>::write(const char_type* buf) noexcept {
55  return write(buf, strlen(buf));
56 }
57 
58 template<typename CharType>
59 template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
60 inline
61 future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) noexcept {
62  return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
63 }
64 
65 template<typename CharType>
66 inline
67 future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) noexcept {
68  return write(s.c_str(), s.size());
69 }
70 
71 template<typename CharType>
72 future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
73  return write(std::move(msg).release());
74 }
75 
76 template<typename CharType>
78 output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
79  // if flush is scheduled, disable it, so it will not try to write in parallel
80  _flush = false;
81  if (_flushing) {
82  // flush in progress, wait for it to end before continuing
83  return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
84  return _fd.put(std::move(p));
85  });
86  } else {
87  return _fd.put(std::move(p));
88  }
89 }
90 
91 // Writes @p in chunks of _size length. The last chunk is buffered if smaller.
92 template <typename CharType>
94 output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
95  return repeat([this, p = std::move(p)] () mutable {
96  if (p.len() < _size) {
97  if (p.len()) {
98  _zc_bufs = std::move(p);
99  } else {
100  _zc_bufs = net::packet::make_null_packet();
101  }
102  return make_ready_future<stop_iteration>(stop_iteration::yes);
103  }
104  auto chunk = p.share(0, _size);
105  p.trim_front(_size);
106  return zero_copy_put(std::move(chunk)).then([] {
107  return stop_iteration::no;
108  });
109  });
110 }
111 
112 template<typename CharType>
113 future<> output_stream<CharType>::write(net::packet p) noexcept {
114  static_assert(std::is_same_v<CharType, char>, "packet works on char");
115  try {
116  if (p.len() != 0) {
117  assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
118 
119  if (_zc_bufs) {
120  _zc_bufs.append(std::move(p));
121  } else {
122  _zc_bufs = std::move(p);
123  }
124 
125  if (_zc_bufs.len() >= _size) {
126  if (_trim_to_size) {
127  return zero_copy_split_and_put(std::move(_zc_bufs));
128  } else {
129  return zero_copy_put(std::move(_zc_bufs));
130  }
131  }
132  }
133  return make_ready_future<>();
134  } catch (...) {
136  }
137 }
138 
139 template<typename CharType>
140 future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
141  try {
142  if (p.empty()) {
143  return make_ready_future<>();
144  }
145  assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
146  return write(net::packet(std::move(p)));
147  } catch (...) {
149  }
150 }
151 
152 template <typename CharType>
153 future<temporary_buffer<CharType>>
154 input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) noexcept {
155  if (available()) {
156  auto now = std::min(n - completed, available());
157  std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
158  _buf.trim_front(now);
159  completed += now;
160  }
161  if (completed == n) {
162  return make_ready_future<tmp_buf>(std::move(out));
163  }
164 
165  // _buf is now empty
166  return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
167  if (buf.size() == 0) {
168  _eof = true;
169  out.trim(completed);
170  return make_ready_future<tmp_buf>(std::move(out));
171  }
172  _buf = std::move(buf);
173  return this->read_exactly_part(n, std::move(out), completed);
174  });
175 }
176 
177 template <typename CharType>
178 future<temporary_buffer<CharType>>
180  if (_buf.size() == n) {
181  // easy case: steal buffer, return to caller
182  return make_ready_future<tmp_buf>(std::move(_buf));
183  } else if (_buf.size() > n) {
184  // buffer large enough, share it with caller
185  auto front = _buf.share(0, n);
186  _buf.trim_front(n);
187  return make_ready_future<tmp_buf>(std::move(front));
188  } else if (_buf.size() == 0) {
189  // buffer is empty: grab one and retry
190  return _fd.get().then([this, n] (auto buf) mutable {
191  if (buf.size() == 0) {
192  _eof = true;
193  return make_ready_future<tmp_buf>(std::move(buf));
194  }
195  _buf = std::move(buf);
196  return this->read_exactly(n);
197  });
198  } else {
199  try {
200  // buffer too small: start copy/read loop
201  tmp_buf b(n);
202  return read_exactly_part(n, std::move(b), 0);
203  } catch (...) {
204  return current_exception_as_future<tmp_buf>();
205  }
206  }
207 }
208 
// Feeds the stream's data to `consumer` until the consumer asks to stop or
// the stream reaches eof.
//
// Each loop iteration either refills _buf from _fd (when _buf is empty and
// eof has not been seen) or hands _buf to the consumer and acts on the
// result it returns:
//   - continue_consuming: loop again, unless _eof is set;
//   - stop_consuming:     store the buffer it carries in _buf and stop;
//   - skip_bytes:         skip that many bytes on _fd, keep any non-empty
//                         buffer returned by the skip in _buf, and loop again.
//
// The consumer is moved into the loop's closure.
209 template <typename CharType>
210 template <typename Consumer>
211 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
212 future<>
213 input_stream<CharType>::consume(Consumer&& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
214  return repeat([consumer = std::move(consumer), this] () mutable {
215  if (_buf.empty() && !_eof) {
// nothing buffered yet: fetch one buffer; an empty one marks eof
216  return _fd.get().then([this] (tmp_buf buf) {
217  _buf = std::move(buf);
218  _eof = _buf.empty();
219  return make_ready_future<stop_iteration>(stop_iteration::no);
220  });
221  }
// _buf may be empty here only when _eof is set
222  return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
223  return seastar::visit(result.get(), [this] (const continue_consuming&) {
224  // If we're here, consumer consumed entire buffer and is ready for
225  // more now. So we do not return, and rather continue the loop.
226  //
227  // If we're at eof, we should stop.
228  return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
229  }, [this] (stop_consuming<CharType>& stop) {
230  // consumer is done
231  this->_buf = std::move(stop.get_buffer());
232  return make_ready_future<stop_iteration>(stop_iteration::yes);
233  }, [this] (const skip_bytes& skip) {
234  return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
235  if (!buf.empty()) {
236  this->_buf = std::move(buf);
237  }
238  return make_ready_future<stop_iteration>(stop_iteration::no);
239  });
240  });
241  });
242  });
243 }
244 
245 template <typename CharType>
246 template <typename Consumer>
247 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
248 future<>
249 input_stream<CharType>::consume(Consumer& consumer) noexcept(std::is_nothrow_move_constructible_v<Consumer>) {
250  return consume(std::ref(consumer));
251 }
252 
253 template <typename CharType>
254 future<temporary_buffer<CharType>>
257  if (_buf.empty()) {
258  if (_eof) {
259  return make_ready_future<tmp_buf>();
260  } else {
261  return _fd.get().then([this, n] (tmp_buf buf) {
262  _eof = buf.empty();
263  _buf = std::move(buf);
264  return read_up_to(n);
265  });
266  }
267  } else if (_buf.size() <= n) {
268  // easy case: steal buffer, return to caller
269  return make_ready_future<tmp_buf>(std::move(_buf));
270  } else {
271  try {
272  // buffer is larger than n, so share its head with a caller
273  auto front = _buf.share(0, n);
274  _buf.trim_front(n);
275  return make_ready_future<tmp_buf>(std::move(front));
276  } catch (...) {
277  return current_exception_as_future<tmp_buf>();
278  }
279  }
280 }
281 
282 template <typename CharType>
286  if (_eof) {
287  return make_ready_future<tmp_buf>();
288  }
289  if (_buf.empty()) {
290  return _fd.get().then([this] (tmp_buf buf) {
291  _eof = buf.empty();
292  return make_ready_future<tmp_buf>(std::move(buf));
293  });
294  } else {
295  return make_ready_future<tmp_buf>(std::move(_buf));
296  }
297 }
298 
299 template <typename CharType>
300 future<>
301 input_stream<CharType>::skip(uint64_t n) noexcept {
302  auto skip_buf = std::min(n, _buf.size());
303  _buf.trim_front(skip_buf);
304  n -= skip_buf;
305  if (!n) {
306  return make_ready_future<>();
307  }
308  return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
309  _buf = std::move(buffer);
310  });
311 }
312 
313 template <typename CharType>
316  if (_buf) {
317  throw std::logic_error("detach() called on a used input_stream");
318  }
319 
320  return std::move(_fd);
321 }
322 
323 // Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
324 template <typename CharType>
325 future<>
327  assert(_end == 0);
328 
329  return repeat([this, buf = std::move(buf)] () mutable {
330  if (buf.size() < _size) {
331  if (!_buf) {
332  _buf = _fd.allocate_buffer(_size);
333  }
334  std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
335  _end = buf.size();
336  return make_ready_future<stop_iteration>(stop_iteration::yes);
337  }
338  auto chunk = buf.share(0, _size);
339  buf.trim_front(_size);
340  return put(std::move(chunk)).then([] {
341  return stop_iteration::no;
342  });
343  });
344 }
345 
346 template <typename CharType>
347 future<>
348 output_stream<CharType>::write(const char_type* buf, size_t n) noexcept {
349  if (__builtin_expect(!_buf || n > _size - _end, false)) {
350  return slow_write(buf, n);
351  }
352  std::copy_n(buf, n, _buf.get_write() + _end);
353  _end += n;
354  return make_ready_future<>();
355 }
356 
357 template <typename CharType>
358 future<>
359 output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
360  try {
361  assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
362  auto bulk_threshold = _end ? (2 * _size - _end) : _size;
363  if (n >= bulk_threshold) {
364  if (_end) {
365  auto now = _size - _end;
366  std::copy(buf, buf + now, _buf.get_write() + _end);
367  _end = _size;
368  temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
369  std::copy(buf + now, buf + n, tmp.get_write());
370  _buf.trim(_end);
371  _end = 0;
372  return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
373  if (_trim_to_size) {
374  return split_and_put(std::move(tmp));
375  } else {
376  return put(std::move(tmp));
377  }
378  });
379  } else {
380  temporary_buffer<char> tmp = _fd.allocate_buffer(n);
381  std::copy(buf, buf + n, tmp.get_write());
382  if (_trim_to_size) {
383  return split_and_put(std::move(tmp));
384  } else {
385  return put(std::move(tmp));
386  }
387  }
388  }
389 
390  if (!_buf) {
391  _buf = _fd.allocate_buffer(_size);
392  }
393 
394  auto now = std::min(n, _size - _end);
395  std::copy(buf, buf + now, _buf.get_write() + _end);
396  _end += now;
397  if (now == n) {
398  return make_ready_future<>();
399  } else {
400  temporary_buffer<char> next = _fd.allocate_buffer(_size);
401  std::copy(buf + now, buf + n, next.get_write());
402  _end = n - now;
403  std::swap(next, _buf);
404  return put(std::move(next));
405  }
406  } catch (...) {
408  }
409 }
410 
// Declaration only; the definition is not in this file. output_stream::flush()
// calls it when batching is enabled and no batch is currently open.
411 namespace internal {
412 void add_to_flush_poller(output_stream<char>& x) noexcept;
413 }
414 
415 template <typename CharType>
416 future<> output_stream<CharType>::do_flush() noexcept {
417  if (_end) {
418  _buf.trim(_end);
419  _end = 0;
420  return _fd.put(std::move(_buf)).then([this] {
421  return _fd.flush();
422  });
423  } else if (_zc_bufs) {
424  return _fd.put(std::move(_zc_bufs)).then([this] {
425  return _fd.flush();
426  });
427  } else {
428  return _fd.flush();
429  }
430 }
431 
432 template <typename CharType>
433 future<>
434 output_stream<CharType>::flush() noexcept {
435  if (!_batch_flushes) {
436  return do_flush();
437  } else {
438  if (_ex) {
439  // flush is a good time to deliver outstanding errors
440  return make_exception_future<>(std::move(_ex));
441  } else {
442  _flush = true;
443  if (!_in_batch) {
444  internal::add_to_flush_poller(*this);
445  _in_batch = promise<>();
446  }
447  }
448  }
449  return make_ready_future<>();
450 }
451 
452 template <typename CharType>
453 future<>
455  // if flush is scheduled, disable it, so it will not try to write in parallel
456  _flush = false;
457  if (_flushing) {
458  // flush in progress, wait for it to end before continuing
459  return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
460  return _fd.put(std::move(buf));
461  });
462  } else {
463  return _fd.put(std::move(buf));
464  }
465 }
466 
467 template <typename CharType>
468 void
470  if (!_flush) {
471  // flush was canceled, do nothing
472  _flushing = false;
473  _in_batch.value().set_value();
474  _in_batch = std::nullopt;
475  return;
476  }
477 
478  _flush = false;
479  _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
480 
481  // FIXME: future is discarded
482  (void)do_flush().then_wrapped([this] (future<> f) {
483  try {
484  f.get();
485  } catch (...) {
486  _ex = std::current_exception();
487  _fd.on_batch_flush_error();
488  }
489  // if flush() was called while flushing flush once more
490  poll_flush();
491  });
492 }
493 
494 template <typename CharType>
495 future<>
497  return flush().finally([this] {
498  if (_in_batch) {
499  return _in_batch.value().get_future();
500  } else {
501  return make_ready_future();
502  }
503  }).then([this] {
504  // report final exception as close error
505  if (_ex) {
506  std::rethrow_exception(_ex);
507  }
508  }).finally([this] {
509  return _fd.close();
510  });
511 }
512 
513 template <typename CharType>
514 data_sink
516  if (_buf) {
517  throw std::logic_error("detach() called on a used output_stream");
518  }
519 
520  return std::move(_fd);
521 }
522 
523 namespace internal {
524 
526 template <typename CharType>
527 struct stream_copy_consumer {
528 private:
530  using unconsumed_remainder = std::optional<temporary_buffer<CharType>>;
531 public:
532  stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
533  }
534  future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) {
535  if (data.empty()) {
536  return make_ready_future<unconsumed_remainder>(std::move(data));
537  }
538  return _os.write(data.get(), data.size()).then([] () {
539  return make_ready_future<unconsumed_remainder>();
540  });
541  }
542 };
544 
545 }
546 
547 extern template struct internal::stream_copy_consumer<char>;
548 
549 template <typename CharType>
551  return in.consume(internal::stream_copy_consumer<CharType>(out));
552 }
553 
554 extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
555 }
Definition: iostream.hh:148
Definition: iostream.hh:68
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
value_type && get()
gets the value returned by the computation
Definition: future.hh:1340
Definition: iostream.hh:281
Definition: iostream.hh:379
Definition: temporary_buffer.hh:67
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934
future< T > current_exception_as_future() noexcept
Returns std::current_exception() wrapped in a future.
Definition: future.hh:1953
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:135
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:237
future now()
Returns a ready future.
Definition: later.hh:35
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550