Seastar
High performance C++ framework for concurrent servers
iostream-impl.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 
19 /*
20  * Copyright (C) 2015 Cloudius Systems, Ltd.
21  */
22 
23 
24 #pragma once
25 
26 #include <seastar/core/do_with.hh>
27 #include <seastar/core/loop.hh>
28 #include <seastar/net/packet.hh>
29 #include <seastar/util/variant_utils.hh>
30 
31 namespace seastar {
32 
33 inline future<temporary_buffer<char>> data_source_impl::skip(uint64_t n)
34 {
35  return do_with(uint64_t(n), [this] (uint64_t& n) {
36  return repeat_until_value([&] {
37  return get().then([&] (temporary_buffer<char> buffer) -> std::optional<temporary_buffer<char>> {
38  if (buffer.size() >= n) {
39  buffer.trim_front(n);
40  return buffer;
41  }
42  n -= buffer.size();
43  return { };
44  });
45  });
46  });
47 }
48 
49 template<typename CharType>
50 inline
51 future<> output_stream<CharType>::write(const char_type* buf) {
52  return write(buf, strlen(buf));
53 }
54 
55 template<typename CharType>
56 template<typename StringChar, typename SizeType, SizeType MaxSize, bool NulTerminate>
57 inline
58 future<> output_stream<CharType>::write(const basic_sstring<StringChar, SizeType, MaxSize, NulTerminate>& s) {
59  return write(reinterpret_cast<const CharType *>(s.c_str()), s.size());
60 }
61 
62 template<typename CharType>
63 inline
64 future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) {
65  return write(s.c_str(), s.size());
66 }
67 
68 template<typename CharType>
69 future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
70  return write(std::move(msg).release());
71 }
72 
73 template<typename CharType>
74 future<>
75 output_stream<CharType>::zero_copy_put(net::packet p) {
76  // if flush is scheduled, disable it, so it will not try to write in parallel
77  _flush = false;
78  if (_flushing) {
79  // flush in progress, wait for it to end before continuing
80  return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
81  return _fd.put(std::move(p));
82  });
83  } else {
84  return _fd.put(std::move(p));
85  }
86 }
87 
88 // Writes @p in chunks of _size length. The last chunk is buffered if smaller.
89 template <typename CharType>
90 future<>
91 output_stream<CharType>::zero_copy_split_and_put(net::packet p) {
92  return repeat([this, p = std::move(p)] () mutable {
93  if (p.len() < _size) {
94  if (p.len()) {
95  _zc_bufs = std::move(p);
96  } else {
97  _zc_bufs = net::packet::make_null_packet();
98  }
99  return make_ready_future<stop_iteration>(stop_iteration::yes);
100  }
101  auto chunk = p.share(0, _size);
102  p.trim_front(_size);
103  return zero_copy_put(std::move(chunk)).then([] {
104  return stop_iteration::no;
105  });
106  });
107 }
108 
109 template<typename CharType>
110 future<> output_stream<CharType>::write(net::packet p) {
111  static_assert(std::is_same<CharType, char>::value, "packet works on char");
112 
113  if (p.len() != 0) {
114  assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
115 
116  if (_zc_bufs) {
117  _zc_bufs.append(std::move(p));
118  } else {
119  _zc_bufs = std::move(p);
120  }
121 
122  if (_zc_bufs.len() >= _size) {
123  if (_trim_to_size) {
124  return zero_copy_split_and_put(std::move(_zc_bufs));
125  } else {
126  return zero_copy_put(std::move(_zc_bufs));
127  }
128  }
129  }
130  return make_ready_future<>();
131 }
132 
133 template<typename CharType>
134 future<> output_stream<CharType>::write(temporary_buffer<CharType> p) {
135  if (p.empty()) {
136  return make_ready_future<>();
137  }
138  assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
139 
140  return write(net::packet(std::move(p)));
141 }
142 
143 template <typename CharType>
144 future<temporary_buffer<CharType>>
145 input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
146  if (available()) {
147  auto now = std::min(n - completed, available());
148  std::copy(_buf.get(), _buf.get() + now, out.get_write() + completed);
149  _buf.trim_front(now);
150  completed += now;
151  }
152  if (completed == n) {
153  return make_ready_future<tmp_buf>(std::move(out));
154  }
155 
156  // _buf is now empty
157  return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
158  if (buf.size() == 0) {
159  _eof = true;
160  return make_ready_future<tmp_buf>(std::move(buf));
161  }
162  _buf = std::move(buf);
163  return this->read_exactly_part(n, std::move(out), completed);
164  });
165 }
166 
167 template <typename CharType>
168 future<temporary_buffer<CharType>>
170  if (_buf.size() == n) {
171  // easy case: steal buffer, return to caller
172  return make_ready_future<tmp_buf>(std::move(_buf));
173  } else if (_buf.size() > n) {
174  // buffer large enough, share it with caller
175  auto front = _buf.share(0, n);
176  _buf.trim_front(n);
177  return make_ready_future<tmp_buf>(std::move(front));
178  } else if (_buf.size() == 0) {
179  // buffer is empty: grab one and retry
180  return _fd.get().then([this, n] (auto buf) mutable {
181  if (buf.size() == 0) {
182  _eof = true;
183  return make_ready_future<tmp_buf>(std::move(buf));
184  }
185  _buf = std::move(buf);
186  return this->read_exactly(n);
187  });
188  } else {
189  // buffer too small: start copy/read loop
190  tmp_buf b(n);
191  return read_exactly_part(n, std::move(b), 0);
192  }
193 }
194 
195 template <typename CharType>
196 template <typename Consumer>
197 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
198 future<>
199 input_stream<CharType>::consume(Consumer&& consumer) {
200  return repeat([consumer = std::move(consumer), this] () mutable {
201  if (_buf.empty() && !_eof) {
202  return _fd.get().then([this] (tmp_buf buf) {
203  _buf = std::move(buf);
204  _eof = _buf.empty();
205  return make_ready_future<stop_iteration>(stop_iteration::no);
206  });
207  }
208  return consumer(std::move(_buf)).then([this] (consumption_result_type result) {
209  return seastar::visit(result.get(), [this] (const continue_consuming&) {
210  // If we're here, consumer consumed entire buffer and is ready for
211  // more now. So we do not return, and rather continue the loop.
212  //
213  // If we're at eof, we should stop.
214  return make_ready_future<stop_iteration>(stop_iteration(this->_eof));
215  }, [this] (stop_consuming<CharType>& stop) {
216  // consumer is done
217  this->_buf = std::move(stop.get_buffer());
218  return make_ready_future<stop_iteration>(stop_iteration::yes);
219  }, [this] (const skip_bytes& skip) {
220  return this->_fd.skip(skip.get_value()).then([this](tmp_buf buf) {
221  if (!buf.empty()) {
222  this->_buf = std::move(buf);
223  }
224  return make_ready_future<stop_iteration>(stop_iteration::no);
225  });
226  });
227  });
228  });
229 }
230 
231 template <typename CharType>
232 template <typename Consumer>
233 SEASTAR_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
234 future<>
235 input_stream<CharType>::consume(Consumer& consumer) {
236  return consume(std::ref(consumer));
237 }
238 
239 template <typename CharType>
243  if (_buf.empty()) {
244  if (_eof) {
245  return make_ready_future<tmp_buf>();
246  } else {
247  return _fd.get().then([this, n] (tmp_buf buf) {
248  _eof = buf.empty();
249  _buf = std::move(buf);
250  return read_up_to(n);
251  });
252  }
253  } else if (_buf.size() <= n) {
254  // easy case: steal buffer, return to caller
255  return make_ready_future<tmp_buf>(std::move(_buf));
256  } else {
257  // buffer is larger than n, so share its head with a caller
258  auto front = _buf.share(0, n);
259  _buf.trim_front(n);
260  return make_ready_future<tmp_buf>(std::move(front));
261  }
262 }
263 
264 template <typename CharType>
265 future<temporary_buffer<CharType>>
268  if (_eof) {
269  return make_ready_future<tmp_buf>();
270  }
271  if (_buf.empty()) {
272  return _fd.get().then([this] (tmp_buf buf) {
273  _eof = buf.empty();
274  return make_ready_future<tmp_buf>(std::move(buf));
275  });
276  } else {
277  return make_ready_future<tmp_buf>(std::move(_buf));
278  }
279 }
280 
281 template <typename CharType>
282 future<>
284  auto skip_buf = std::min(n, _buf.size());
285  _buf.trim_front(skip_buf);
286  n -= skip_buf;
287  if (!n) {
288  return make_ready_future<>();
289  }
290  return _fd.skip(n).then([this] (temporary_buffer<CharType> buffer) {
291  _buf = std::move(buffer);
292  });
293 }
294 
295 template <typename CharType>
298  if (_buf) {
299  throw std::logic_error("detach() called on a used input_stream");
300  }
301 
302  return std::move(_fd);
303 }
304 
305 // Writes @buf in chunks of _size length. The last chunk is buffered if smaller.
306 template <typename CharType>
307 future<>
309  assert(_end == 0);
310 
311  return repeat([this, buf = std::move(buf)] () mutable {
312  if (buf.size() < _size) {
313  if (!_buf) {
314  _buf = _fd.allocate_buffer(_size);
315  }
316  std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
317  _end = buf.size();
318  return make_ready_future<stop_iteration>(stop_iteration::yes);
319  }
320  auto chunk = buf.share(0, _size);
321  buf.trim_front(_size);
322  return put(std::move(chunk)).then([] {
323  return stop_iteration::no;
324  });
325  });
326 }
327 
328 template <typename CharType>
329 future<>
330 output_stream<CharType>::write(const char_type* buf, size_t n) {
331  if (__builtin_expect(!_buf || n > _size - _end, false)) {
332  return slow_write(buf, n);
333  }
334  std::copy_n(buf, n, _buf.get_write() + _end);
335  _end += n;
336  return make_ready_future<>();
337 }
338 
339 template <typename CharType>
340 future<>
341 output_stream<CharType>::slow_write(const char_type* buf, size_t n) {
342  assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
343  auto bulk_threshold = _end ? (2 * _size - _end) : _size;
344  if (n >= bulk_threshold) {
345  if (_end) {
346  auto now = _size - _end;
347  std::copy(buf, buf + now, _buf.get_write() + _end);
348  _end = _size;
349  temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
350  std::copy(buf + now, buf + n, tmp.get_write());
351  _buf.trim(_end);
352  _end = 0;
353  return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
354  if (_trim_to_size) {
355  return split_and_put(std::move(tmp));
356  } else {
357  return put(std::move(tmp));
358  }
359  });
360  } else {
361  temporary_buffer<char> tmp = _fd.allocate_buffer(n);
362  std::copy(buf, buf + n, tmp.get_write());
363  if (_trim_to_size) {
364  return split_and_put(std::move(tmp));
365  } else {
366  return put(std::move(tmp));
367  }
368  }
369  }
370 
371  if (!_buf) {
372  _buf = _fd.allocate_buffer(_size);
373  }
374 
375  auto now = std::min(n, _size - _end);
376  std::copy(buf, buf + now, _buf.get_write() + _end);
377  _end += now;
378  if (now == n) {
379  return make_ready_future<>();
380  } else {
381  temporary_buffer<char> next = _fd.allocate_buffer(_size);
382  std::copy(buf + now, buf + n, next.get_write());
383  _end = n - now;
384  std::swap(next, _buf);
385  return put(std::move(next));
386  }
387 }
388 
389 template <typename CharType>
390 future<>
391 output_stream<CharType>::flush() {
392  if (!_batch_flushes) {
393  if (_end) {
394  _buf.trim(_end);
395  _end = 0;
396  return put(std::move(_buf)).then([this] {
397  return _fd.flush();
398  });
399  } else if (_zc_bufs) {
400  return zero_copy_put(std::move(_zc_bufs)).then([this] {
401  return _fd.flush();
402  });
403  }
404  } else {
405  if (_ex) {
406  // flush is a good time to deliver outstanding errors
407  return make_exception_future<>(std::move(_ex));
408  } else {
409  _flush = true;
410  if (!_in_batch) {
411  add_to_flush_poller(this);
412  _in_batch = promise<>();
413  }
414  }
415  }
416  return make_ready_future<>();
417 }
418 
419 void add_to_flush_poller(output_stream<char>* x);
420 
421 template <typename CharType>
422 future<>
423 output_stream<CharType>::put(temporary_buffer<CharType> buf) {
424  // if flush is scheduled, disable it, so it will not try to write in parallel
425  _flush = false;
426  if (_flushing) {
427  // flush in progress, wait for it to end before continuing
428  return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
429  return _fd.put(std::move(buf));
430  });
431  } else {
432  return _fd.put(std::move(buf));
433  }
434 }
435 
436 template <typename CharType>
437 void
438 output_stream<CharType>::poll_flush() {
439  if (!_flush) {
440  // flush was canceled, do nothing
441  _flushing = false;
442  _in_batch.value().set_value();
443  _in_batch = std::nullopt;
444  return;
445  }
446 
447  auto f = make_ready_future();
448  _flush = false;
449  _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
450 
451  if (_end) {
452  // send whatever is in the buffer right now
453  _buf.trim(_end);
454  _end = 0;
455  f = _fd.put(std::move(_buf));
456  } else if(_zc_bufs) {
457  f = _fd.put(std::move(_zc_bufs));
458  }
459 
460  // FIXME: future is discarded
461  (void)f.then([this] {
462  return _fd.flush();
463  }).then_wrapped([this] (future<> f) {
464  try {
465  f.get();
466  } catch (...) {
467  _ex = std::current_exception();
468  }
469  // if flush() was called while flushing flush once more
470  poll_flush();
471  });
472 }
473 
474 template <typename CharType>
475 future<>
477  return flush().finally([this] {
478  if (_in_batch) {
479  return _in_batch.value().get_future();
480  } else {
481  return make_ready_future();
482  }
483  }).then([this] {
484  // report final exception as close error
485  if (_ex) {
486  std::rethrow_exception(_ex);
487  }
488  }).finally([this] {
489  return _fd.close();
490  });
491 }
492 
493 template <typename CharType>
494 data_sink
496  if (_buf) {
497  throw std::logic_error("detach() called on a used output_stream");
498  }
499 
500  return std::move(_fd);
501 }
502 
503 namespace internal {
504 
506 template <typename CharType>
507 struct stream_copy_consumer {
508 private:
510  using unconsumed_remainder = std::optional<temporary_buffer<CharType>>;
511 public:
512  stream_copy_consumer(output_stream<CharType>& os) : _os(os) {
513  }
514  future<unconsumed_remainder> operator()(temporary_buffer<CharType> data) {
515  if (data.empty()) {
516  return make_ready_future<unconsumed_remainder>(std::move(data));
517  }
518  return _os.write(data.get(), data.size()).then([] () {
519  return make_ready_future<unconsumed_remainder>();
520  });
521  }
522 };
524 
525 }
526 
527 extern template struct internal::stream_copy_consumer<char>;
528 
529 template <typename CharType>
531  return in.consume(internal::stream_copy_consumer<CharType>(out));
532 }
533 
534 extern template future<> copy<char>(input_stream<char>&, output_stream<char>&);
535 }
seastar::data_source
Definition: iostream.hh:55
seastar::temporary_buffer::share
temporary_buffer share()
Definition: temporary_buffer.hh:155
seastar::now
future now()
Returns a ready future.
Definition: later.hh:35
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::temporary_buffer::trim_front
void trim_front(size_t pos)
Definition: temporary_buffer.hh:181
seastar::temporary_buffer::empty
bool empty() const
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:147
seastar::copy
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:530
seastar::make_ready_future
future< T... > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:2048
seastar::input_stream
Definition: iostream.hh:200
seastar::ref
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
seastar::temporary_buffer
Definition: temporary_buffer.hh:62
seastar::temporary_buffer::size
size_t size() const
Gets the buffer size.
Definition: temporary_buffer.hh:125
seastar::do_with
auto do_with(T1 &&rv1, T2 &&rv2, More &&... more) noexcept
Definition: do_with.hh:129
seastar::visit
auto visit(Variant &&variant, Args &&... args)
Definition: variant_utils.hh:68
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
seastar::temporary_buffer::get
const CharType * get() const
Gets a pointer to the beginning of the buffer.
Definition: temporary_buffer.hh:120
seastar::repeat_until_value
repeat_until_value_return_type< AsyncAction > repeat_until_value(AsyncAction action) noexcept
Definition: loop.hh:229
seastar::output_stream
Definition: iostream.hh:287
seastar::data_sink
Definition: iostream.hh:93
seastar::temporary_buffer::get_write
CharType * get_write()
Definition: temporary_buffer.hh:123