Seastar
High performance C++ framework for concurrent servers
simple-stream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2016 Scylladb, Ltd.
20  */
21 
22 #pragma once
23 #include <seastar/core/sstring.hh>
24 #include <seastar/util/variant_utils.hh>
25 
26 namespace seastar {
27 
29  size_t _size = 0;
30 public:
31  void write(const char* data, size_t size) {
32  _size += size;
33  }
34 
35  size_t size() const {
36  return _size;
37  }
38 };
39 
40 template<typename>
42 
44 
45 template<typename Iterator>
47 
48 template<typename Iterator>
50 
52  char* _p = nullptr;
53  size_t _size = 0;
54 public:
55  using has_with_stream = std::false_type;
57  simple_memory_output_stream(char* p, size_t size, size_t start = 0) : _p(p + start), _size(size) {}
58  char* begin() { return _p; }
59 
60  [[gnu::always_inline]]
61  void skip(size_t size) {
62  if (size > _size) {
63  throw std::out_of_range("serialization buffer overflow");
64  }
65  _p += size;
66  _size -= size;
67  }
68 
69  [[gnu::always_inline]]
70  simple_memory_output_stream write_substream(size_t size) {
71  if (size > _size) {
72  throw std::out_of_range("serialization buffer overflow");
73  }
74  simple_memory_output_stream substream(_p, size);
75  skip(size);
76  return substream;
77  }
78 
79  [[gnu::always_inline]]
80  void write(const char* p, size_t size) {
81  if (size > _size) {
82  throw std::out_of_range("serialization buffer overflow");
83  }
84  std::copy_n(p, size, _p);
85  skip(size);
86  }
87 
88  [[gnu::always_inline]]
89  void fill(char c, size_t size) {
90  if (size > _size) {
91  throw std::out_of_range("serialization buffer overflow");
92  }
93  std::fill_n(_p, size, c);
94  skip(size);
95  }
96 
97  [[gnu::always_inline]]
98  const size_t size() const {
99  return _size;
100  }
101 
102  // simple_memory_output_stream is a write cursor that keeps a mutable view of some
103  // underlying buffer and provides write interface. to_input_stream() converts it
104  // to a read cursor that points to the same part of the buffer but provides
105  // read interface.
106  simple_memory_input_stream to_input_stream() const;
107 };
108 
109 template<typename Iterator>
112 
113  Iterator _it;
114  simple _current;
115  size_t _size = 0;
116 
117  friend class memory_input_stream<Iterator>;
118 private:
119  template<typename Func>
120  //requires requires(Func f, view bv) { { f(bv) } -> void; }
121  void for_each_fragment(size_t size, Func&& func) {
122  if (size > _size) {
123  throw std::out_of_range("serialization buffer overflow");
124  }
125  _size -= size;
126  while (size) {
127  if (!_current.size()) {
128  _current = simple(reinterpret_cast<char*>((*_it).get_write()), (*_it).size());
129  _it++;
130  }
131  auto this_size = std::min(_current.size(), size);
132  func(_current.write_substream(this_size));
133  size -= this_size;
134  }
135  }
137  : _it(it), _current(bv), _size(size) { }
138 public:
139  using has_with_stream = std::false_type;
140  using iterator_type = Iterator;
141 
143 
144  fragmented_memory_output_stream(Iterator it, size_t size)
145  : _it(it), _size(size) {
146  }
147 
148  void skip(size_t size) {
149  for_each_fragment(size, [] (auto) { });
150  }
151  memory_output_stream<Iterator> write_substream(size_t size) {
152  if (size > _size) {
153  throw std::out_of_range("serialization buffer overflow");
154  }
155  if (_current.size() >= size) {
156  _size -= size;
157  return _current.write_substream(size);
158  }
159  fragmented_memory_output_stream substream(_it, _current, size);
160  skip(size);
161  return substream;
162  }
163  void write(const char* p, size_t size) {
164  for_each_fragment(size, [&p] (auto bv) {
165  std::copy_n(p, bv.size(), bv.begin());
166  p += bv.size();
167  });
168  }
169  void fill(char c, size_t size) {
170  for_each_fragment(size, [c] (simple fragment) {
171  std::fill_n(fragment.begin(), fragment.size(), c);
172  });
173  }
174  const size_t size() const {
175  return _size;
176  }
177 
178  // fragmented_memory_input_stream is a write cursor that keeps a mutable view of some
179  // underlying fragmented buffer and provides write interface. to_input_stream() converts
180  // it to a read cursor that points to the same part of the buffer but provides read interface.
181  fragmented_memory_input_stream<Iterator> to_input_stream() const;
182 };
183 
184 template<typename Iterator>
186 public:
189 
190 private:
191  const bool _is_simple;
192  using fragmented_type = fragmented;
193  union {
194  simple _simple;
195  fragmented_type _fragmented;
196  };
197 public:
198  template<typename StreamVisitor>
199  [[gnu::always_inline]]
200  decltype(auto) with_stream(StreamVisitor&& visitor) {
201  if (__builtin_expect(_is_simple, true)) {
202  return visitor(_simple);
203  }
204  return visitor(_fragmented);
205  }
206 
207  template<typename StreamVisitor>
208  [[gnu::always_inline]]
209  decltype(auto) with_stream(StreamVisitor&& visitor) const {
210  if (__builtin_expect(_is_simple, true)) {
211  return visitor(_simple);
212  }
213  return visitor(_fragmented);
214  }
215 public:
216  using has_with_stream = std::true_type;
217  using iterator_type = Iterator;
219  : _is_simple(true), _simple() {}
221  : _is_simple(true), _simple(std::move(stream)) {}
223  : _is_simple(false), _fragmented(std::move(stream)) {}
224 
225  [[gnu::always_inline]]
226  memory_output_stream(const memory_output_stream& other) noexcept : _is_simple(other._is_simple) {
227  // Making this copy constructor noexcept makes copy assignment simpler.
228  // Besides, performance of memory_output_stream relies on the fact that both
229  // fragmented and simple input stream are PODs and the branch below
230  // is optimized away, so throwable copy constructors aren't something
231  // we want.
232  static_assert(std::is_nothrow_copy_constructible<fragmented>::value,
233  "seastar::memory_output_stream::fragmented should be copy constructible");
234  static_assert(std::is_nothrow_copy_constructible<simple>::value,
235  "seastar::memory_output_stream::simple should be copy constructible");
236  if (_is_simple) {
237  new (&_simple) simple(other._simple);
238  } else {
239  new (&_fragmented) fragmented_type(other._fragmented);
240  }
241  }
242 
243  [[gnu::always_inline]]
244  memory_output_stream(memory_output_stream&& other) noexcept : _is_simple(other._is_simple) {
245  if (_is_simple) {
246  new (&_simple) simple(std::move(other._simple));
247  } else {
248  new (&_fragmented) fragmented_type(std::move(other._fragmented));
249  }
250  }
251 
252  [[gnu::always_inline]]
253  memory_output_stream& operator=(const memory_output_stream& other) noexcept {
254  // Copy constructor being noexcept makes copy assignment simpler.
255  static_assert(std::is_nothrow_copy_constructible<memory_output_stream>::value,
256  "memory_output_stream copy constructor shouldn't throw");
257  if (this != &other) {
258  this->~memory_output_stream();
259  new (this) memory_output_stream(other);
260  }
261  return *this;
262  }
263 
264  [[gnu::always_inline]]
265  memory_output_stream& operator=(memory_output_stream&& other) noexcept {
266  if (this != &other) {
267  this->~memory_output_stream();
268  new (this) memory_output_stream(std::move(other));
269  }
270  return *this;
271  }
272 
273  [[gnu::always_inline]]
275  if (_is_simple) {
276  _simple.~simple();
277  } else {
278  _fragmented.~fragmented_type();
279  }
280  }
281 
282  [[gnu::always_inline]]
283  void skip(size_t size) {
284  with_stream([size] (auto& stream) {
285  stream.skip(size);
286  });
287  }
288 
289  [[gnu::always_inline]]
290  memory_output_stream write_substream(size_t size) {
291  return with_stream([size] (auto& stream) -> memory_output_stream {
292  return stream.write_substream(size);
293  });
294  }
295 
296  [[gnu::always_inline]]
297  void write(const char* p, size_t size) {
298  with_stream([p, size] (auto& stream) {
299  stream.write(p, size);
300  });
301  }
302 
303  [[gnu::always_inline]]
304  void fill(char c, size_t size) {
305  with_stream([c, size] (auto& stream) {
306  stream.fill(c, size);
307  });
308  }
309 
310  [[gnu::always_inline]]
311  size_t size() const {
312  return with_stream([] (auto& stream) {
313  return stream.size();
314  });
315  }
316 
317  memory_input_stream<Iterator> to_input_stream() const;
318 };
319 
322 
323  const char* _p = nullptr;
324  size_t _size = 0;
325 public:
326  using has_with_stream = std::false_type;
327  simple_memory_input_stream() = default;
328  simple_memory_input_stream(const char* p, size_t size) : _p(p), _size(size) {}
329 
330  const char* begin() const { return _p; }
331 
332  [[gnu::always_inline]]
333  void skip(size_t size) {
334  if (size > _size) {
335  throw std::out_of_range("deserialization buffer underflow");
336  }
337  _p += size;
338  _size -= size;
339  }
340 
341  [[gnu::always_inline]]
342  simple read_substream(size_t size) {
343  if (size > _size) {
344  throw std::out_of_range("deserialization buffer underflow");
345  }
346  simple substream(_p, size);
347  skip(size);
348  return substream;
349  }
350 
351  [[gnu::always_inline]]
352  void read(char* p, size_t size) {
353  if (size > _size) {
354  throw std::out_of_range("deserialization buffer underflow");
355  }
356  std::copy_n(_p, size, p);
357  skip(size);
358  }
359 
360  template<typename Output>
361  [[gnu::always_inline]]
362  void copy_to(Output& out) const {
363  out.write(_p, _size);
364  }
365 
366  [[gnu::always_inline]]
367  const size_t size() const {
368  return _size;
369  }
370 };
371 
372 template<typename Iterator>
376 
377  Iterator _it;
378  simple _current;
379  size_t _size;
380 private:
381  template<typename Func>
382  //requires requires(Func f, view bv) { { f(bv) } -> void; }
383  void for_each_fragment(size_t size, Func&& func) {
384  if (size > _size) {
385  throw std::out_of_range("deserialization buffer underflow");
386  }
387  _size -= size;
388  while (size) {
389  if (!_current.size()) {
390  _current = simple(reinterpret_cast<const char*>((*_it).begin()), (*_it).size());
391  _it++;
392  }
393  auto this_size = std::min(_current.size(), size);
394  func(_current.read_substream(this_size));
395  size -= this_size;
396  }
397  }
398  fragmented_memory_input_stream(Iterator it, simple bv, size_t size)
399  : _it(it), _current(bv), _size(size) { }
400  friend class fragmented_memory_output_stream<Iterator>;
401 public:
402  using has_with_stream = std::false_type;
403  using iterator_type = Iterator;
404  fragmented_memory_input_stream(Iterator it, size_t size)
405  : _it(it), _size(size) {
406  }
407 
408  void skip(size_t size) {
409  for_each_fragment(size, [] (auto) { });
410  }
411  fragmented read_substream(size_t size) {
412  if (size > _size) {
413  throw std::out_of_range("deserialization buffer underflow");
414  }
415  fragmented substream(_it, _current, size);
416  skip(size);
417  return substream;
418  }
419  void read(char* p, size_t size) {
420  for_each_fragment(size, [&p] (auto bv) {
421  p = std::copy_n(bv.begin(), bv.size(), p);
422  });
423  }
424  template<typename Output>
425  void copy_to(Output& out) {
426  for_each_fragment(_size, [&out] (auto bv) {
427  bv.copy_to(out);
428  });
429  }
430  const size_t size() const {
431  return _size;
432  }
433 
434  const char* first_fragment_data() const { return _current.begin(); }
435  size_t first_fragment_size() const { return _current.size(); }
436  Iterator fragment_iterator() const { return _it; }
437 };
438 
439 /*
440 template<typename Visitor>
441 concept bool StreamVisitor() {
442  return requires(Visitor visitor, simple& simple, fragmented& fragmented) {
443  visitor(simple);
444  visitor(fragmented);
445  };
446 }
447 */
448 // memory_input_stream performs type erasure optimized for cases where
449 // simple is used.
450 // By using a lot of [[gnu::always_inline]] attributes this class attempts to
451 // make the compiler generate code with simple functions inlined
452 // directly in the user of the intput_stream.
453 template<typename Iterator>
455 public:
458 private:
459  const bool _is_simple;
460  using fragmented_type = fragmented;
461  union {
462  simple _simple;
463  fragmented_type _fragmented;
464  };
465 public:
466  template<typename StreamVisitor>
467  [[gnu::always_inline]]
468  decltype(auto) with_stream(StreamVisitor&& visitor) {
469  if (__builtin_expect(_is_simple, true)) {
470  return visitor(_simple);
471  }
472  return visitor(_fragmented);
473  }
474 
475  template<typename StreamVisitor>
476  [[gnu::always_inline]]
477  decltype(auto) with_stream(StreamVisitor&& visitor) const {
478  if (__builtin_expect(_is_simple, true)) {
479  return visitor(_simple);
480  }
481  return visitor(_fragmented);
482  }
483 public:
484  using has_with_stream = std::true_type;
485  using iterator_type = Iterator;
487  : _is_simple(true), _simple(std::move(stream)) {}
489  : _is_simple(false), _fragmented(std::move(stream)) {}
490 
491  [[gnu::always_inline]]
492  memory_input_stream(const memory_input_stream& other) noexcept : _is_simple(other._is_simple) {
493  // Making this copy constructor noexcept makes copy assignment simpler.
494  // Besides, performance of memory_input_stream relies on the fact that both
495  // fragmented and simple input stream are PODs and the branch below
496  // is optimized away, so throwable copy constructors aren't something
497  // we want.
498  static_assert(std::is_nothrow_copy_constructible<fragmented>::value,
499  "seastar::memory_input_stream::fragmented should be copy constructible");
500  static_assert(std::is_nothrow_copy_constructible<simple>::value,
501  "seastar::memory_input_stream::simple should be copy constructible");
502  if (_is_simple) {
503  new (&_simple) simple(other._simple);
504  } else {
505  new (&_fragmented) fragmented_type(other._fragmented);
506  }
507  }
508 
509  [[gnu::always_inline]]
510  memory_input_stream(memory_input_stream&& other) noexcept : _is_simple(other._is_simple) {
511  if (_is_simple) {
512  new (&_simple) simple(std::move(other._simple));
513  } else {
514  new (&_fragmented) fragmented_type(std::move(other._fragmented));
515  }
516  }
517 
518  [[gnu::always_inline]]
519  memory_input_stream& operator=(const memory_input_stream& other) noexcept {
520  // Copy constructor being noexcept makes copy assignment simpler.
521  static_assert(std::is_nothrow_copy_constructible<memory_input_stream>::value,
522  "memory_input_stream copy constructor shouldn't throw");
523  if (this != &other) {
524  this->~memory_input_stream();
525  new (this) memory_input_stream(other);
526  }
527  return *this;
528  }
529 
530  [[gnu::always_inline]]
531  memory_input_stream& operator=(memory_input_stream&& other) noexcept {
532  if (this != &other) {
533  this->~memory_input_stream();
534  new (this) memory_input_stream(std::move(other));
535  }
536  return *this;
537  }
538 
539  [[gnu::always_inline]]
541  if (_is_simple) {
542  _simple.~simple_memory_input_stream();
543  } else {
544  _fragmented.~fragmented_type();
545  }
546  }
547 
548  [[gnu::always_inline]]
549  void skip(size_t size) {
550  with_stream([size] (auto& stream) {
551  stream.skip(size);
552  });
553  }
554 
555  [[gnu::always_inline]]
556  memory_input_stream read_substream(size_t size) {
557  return with_stream([size] (auto& stream) -> memory_input_stream {
558  return stream.read_substream(size);
559  });
560  }
561 
562  [[gnu::always_inline]]
563  void read(char* p, size_t size) {
564  with_stream([p, size] (auto& stream) {
565  stream.read(p, size);
566  });
567  }
568 
569  template<typename Output>
570  [[gnu::always_inline]]
571  void copy_to(Output& out) {
572  with_stream([&out] (auto& stream) {
573  stream.copy_to(out);
574  });
575  }
576 
577  [[gnu::always_inline]]
578  size_t size() const {
579  return with_stream([] (auto& stream) {
580  return stream.size();
581  });
582  }
583 
584  template<typename Stream, typename StreamVisitor>
585  friend decltype(auto) with_serialized_stream(Stream& stream, StreamVisitor&& visitor);
586 };
587 
588 inline simple_memory_input_stream simple_memory_output_stream::to_input_stream() const {
589  return simple_memory_input_stream(_p, _size);
590 }
591 
592 template<typename Iterator>
593 inline fragmented_memory_input_stream<Iterator> fragmented_memory_output_stream<Iterator>::to_input_stream() const {
594  return fragmented_memory_input_stream<Iterator>(_it, _current.to_input_stream(), _size);
595 }
596 
597 template<typename Iterator>
598 inline memory_input_stream<Iterator> memory_output_stream<Iterator>::to_input_stream() const {
599  return with_stream(make_visitor(
600  [] (const simple_memory_output_stream& ostream) -> memory_input_stream<Iterator> {
601  return ostream.to_input_stream();
602  },
603  [] (const fragmented_memory_output_stream<Iterator>& ostream) -> memory_input_stream<Iterator> {
604  return ostream.to_input_stream();
605  }
606  ));
607 }
608 
609 // The purpose of the with_serialized_stream() is to minimize number of dynamic
610 // dispatches. For example, a lot of IDL-generated code looks like this:
611 // auto some_value() const {
612 // return seastar::with_serialized_stream(v, [] (auto& v) {
613 // auto in = v;
614 // ser::skip(in, boost::type<type1>());
615 // ser::skip(in, boost::type<type2>());
616 // return deserialize(in, boost::type<type3>());
617 // });
618 // }
619 // Using with_stream() there is at most one dynamic dispatch per such
620 // function, instead of one per each skip() and deserialize() call.
621 
622 template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<Stream::has_with_stream::value>>
623 [[gnu::always_inline]]
624  static inline decltype(auto)
625  with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
626  return stream.with_stream(std::forward<StreamVisitor>(visitor));
627 }
628 
629 template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<!Stream::has_with_stream::value>, typename = void>
630 [[gnu::always_inline]]
631  static inline decltype(auto)
632  with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
633  return visitor(stream);
634 }
635 
636 using simple_input_stream = simple_memory_input_stream;
637 using simple_output_stream = simple_memory_output_stream;
638 
639 }
Definition: simple-stream.hh:373
Definition: simple-stream.hh:110
Definition: simple-stream.hh:28
Definition: simple-stream.hh:454
Definition: simple-stream.hh:185
Definition: simple-stream.hh:320
Definition: simple-stream.hh:51
Definition: stream.hh:56
auto make_visitor(Args &&... args)
Definition: variant_utils.hh:52
Seastar API namespace.
Definition: abort_on_ebadf.hh:24