Seastar
High performance C++ framework for concurrent servers
simple-stream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2016 Scylladb, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/sstring.hh>
25 #include <seastar/util/variant_utils.hh>
26 #include <seastar/util/modules.hh>
27 #ifndef SEASTAR_MODULE
28 #include <algorithm>
29 #include <cstddef>
30 #include <stdexcept>
31 #include <type_traits>
32 #endif
33 
34 namespace seastar {
35 
36 SEASTAR_MODULE_EXPORT_BEGIN
37 
39  size_t _size = 0;
40 public:
41  void write(const char*, size_t size) {
42  _size += size;
43  }
44 
45  size_t size() const {
46  return _size;
47  }
48 };
49 
50 template<typename>
52 
54 
55 template<typename Iterator>
57 
58 template<typename Iterator>
60 
62  char* _p = nullptr;
63  size_t _size = 0;
64 public:
65  using has_with_stream = std::false_type;
67  simple_memory_output_stream(char* p, size_t size, size_t start = 0) : _p(p + start), _size(size) {}
68  char* begin() { return _p; }
69 
70  [[gnu::always_inline]]
71  void skip(size_t size) {
72  if (size > _size) {
73  throw std::out_of_range("serialization buffer overflow");
74  }
75  _p += size;
76  _size -= size;
77  }
78 
79  [[gnu::always_inline]]
80  simple_memory_output_stream write_substream(size_t size) {
81  if (size > _size) {
82  throw std::out_of_range("serialization buffer overflow");
83  }
84  simple_memory_output_stream substream(_p, size);
85  skip(size);
86  return substream;
87  }
88 
89  [[gnu::always_inline]]
90  void write(const char* p, size_t size) {
91  if (size > _size) {
92  throw std::out_of_range("serialization buffer overflow");
93  }
94  std::copy_n(p, size, _p);
95  skip(size);
96  }
97 
98  [[gnu::always_inline]]
99  void fill(char c, size_t size) {
100  if (size > _size) {
101  throw std::out_of_range("serialization buffer overflow");
102  }
103  std::fill_n(_p, size, c);
104  skip(size);
105  }
106 
107  [[gnu::always_inline]]
108  size_t size() const {
109  return _size;
110  }
111 
112  // simple_memory_output_stream is a write cursor that keeps a mutable view of some
113  // underlying buffer and provides write interface. to_input_stream() converts it
114  // to a read cursor that points to the same part of the buffer but provides
115  // read interface.
116  simple_memory_input_stream to_input_stream() const;
117 };
118 
119 template<typename Iterator>
122 
123  Iterator _it;
124  simple _current;
125  size_t _size = 0;
126 
127  friend class memory_input_stream<Iterator>;
128 private:
129  template<typename Func>
130  //requires requires(Func f, view bv) { { f(bv) } -> void; }
131  void for_each_fragment(size_t size, Func&& func) {
132  if (size > _size) {
133  throw std::out_of_range("serialization buffer overflow");
134  }
135  _size -= size;
136  while (size) {
137  if (!_current.size()) {
138  _current = simple(reinterpret_cast<char*>((*_it).get_write()), (*_it).size());
139  _it++;
140  }
141  auto this_size = std::min(_current.size(), size);
142  func(_current.write_substream(this_size));
143  size -= this_size;
144  }
145  }
147  : _it(it), _current(bv), _size(size) { }
148 public:
149  using has_with_stream = std::false_type;
150  using iterator_type = Iterator;
151 
153 
154  fragmented_memory_output_stream(Iterator it, size_t size)
155  : _it(it), _size(size) {
156  }
157 
158  void skip(size_t size) {
159  for_each_fragment(size, [] (auto) { });
160  }
161  memory_output_stream<Iterator> write_substream(size_t size) {
162  if (size > _size) {
163  throw std::out_of_range("serialization buffer overflow");
164  }
165  if (_current.size() >= size) {
166  _size -= size;
167  return _current.write_substream(size);
168  }
169  fragmented_memory_output_stream substream(_it, _current, size);
170  skip(size);
171  return substream;
172  }
173  void write(const char* p, size_t size) {
174  for_each_fragment(size, [&p] (auto bv) {
175  std::copy_n(p, bv.size(), bv.begin());
176  p += bv.size();
177  });
178  }
179  void fill(char c, size_t size) {
180  for_each_fragment(size, [c] (simple fragment) {
181  std::fill_n(fragment.begin(), fragment.size(), c);
182  });
183  }
184  size_t size() const {
185  return _size;
186  }
187 
188  // fragmented_memory_input_stream is a write cursor that keeps a mutable view of some
189  // underlying fragmented buffer and provides write interface. to_input_stream() converts
190  // it to a read cursor that points to the same part of the buffer but provides read interface.
191  fragmented_memory_input_stream<Iterator> to_input_stream() const;
192 };
193 
194 template<typename Iterator>
196 public:
199 
200 private:
201  const bool _is_simple;
202  using fragmented_type = fragmented;
203  union {
204  simple _simple;
205  fragmented_type _fragmented;
206  };
207 public:
208  template<typename StreamVisitor>
209  [[gnu::always_inline]]
210  decltype(auto) with_stream(StreamVisitor&& visitor) {
211  if (__builtin_expect(_is_simple, true)) {
212  return visitor(_simple);
213  }
214  return visitor(_fragmented);
215  }
216 
217  template<typename StreamVisitor>
218  [[gnu::always_inline]]
219  decltype(auto) with_stream(StreamVisitor&& visitor) const {
220  if (__builtin_expect(_is_simple, true)) {
221  return visitor(_simple);
222  }
223  return visitor(_fragmented);
224  }
225 public:
226  using has_with_stream = std::true_type;
227  using iterator_type = Iterator;
229  : _is_simple(true), _simple() {}
231  : _is_simple(true), _simple(std::move(stream)) {}
233  : _is_simple(false), _fragmented(std::move(stream)) {}
234 
235  [[gnu::always_inline]]
236  memory_output_stream(const memory_output_stream& other) noexcept : _is_simple(other._is_simple) {
237  // Making this copy constructor noexcept makes copy assignment simpler.
238  // Besides, performance of memory_output_stream relies on the fact that both
239  // fragmented and simple input stream are PODs and the branch below
240  // is optimized away, so throwable copy constructors aren't something
241  // we want.
242  static_assert(std::is_nothrow_copy_constructible_v<fragmented>,
243  "seastar::memory_output_stream::fragmented should be copy constructible");
244  static_assert(std::is_nothrow_copy_constructible_v<simple>,
245  "seastar::memory_output_stream::simple should be copy constructible");
246  if (_is_simple) {
247  new (&_simple) simple(other._simple);
248  } else {
249  new (&_fragmented) fragmented_type(other._fragmented);
250  }
251  }
252 
253  [[gnu::always_inline]]
254  memory_output_stream(memory_output_stream&& other) noexcept : _is_simple(other._is_simple) {
255  if (_is_simple) {
256  new (&_simple) simple(std::move(other._simple));
257  } else {
258  new (&_fragmented) fragmented_type(std::move(other._fragmented));
259  }
260  }
261 
262  [[gnu::always_inline]]
263  memory_output_stream& operator=(const memory_output_stream& other) noexcept {
264  // Copy constructor being noexcept makes copy assignment simpler.
265  static_assert(std::is_nothrow_copy_constructible_v<memory_output_stream>,
266  "memory_output_stream copy constructor shouldn't throw");
267  if (this != &other) {
268  this->~memory_output_stream();
269  new (this) memory_output_stream(other);
270  }
271  return *this;
272  }
273 
274  [[gnu::always_inline]]
275  memory_output_stream& operator=(memory_output_stream&& other) noexcept {
276  if (this != &other) {
277  this->~memory_output_stream();
278  new (this) memory_output_stream(std::move(other));
279  }
280  return *this;
281  }
282 
283  [[gnu::always_inline]]
285  if (_is_simple) {
286  _simple.~simple();
287  } else {
288  _fragmented.~fragmented_type();
289  }
290  }
291 
292  [[gnu::always_inline]]
293  void skip(size_t size) {
294  with_stream([size] (auto& stream) {
295  stream.skip(size);
296  });
297  }
298 
299  [[gnu::always_inline]]
300  memory_output_stream write_substream(size_t size) {
301  return with_stream([size] (auto& stream) -> memory_output_stream {
302  return stream.write_substream(size);
303  });
304  }
305 
306  [[gnu::always_inline]]
307  void write(const char* p, size_t size) {
308  with_stream([p, size] (auto& stream) {
309  stream.write(p, size);
310  });
311  }
312 
313  [[gnu::always_inline]]
314  void fill(char c, size_t size) {
315  with_stream([c, size] (auto& stream) {
316  stream.fill(c, size);
317  });
318  }
319 
320  [[gnu::always_inline]]
321  size_t size() const {
322  return with_stream([] (auto& stream) {
323  return stream.size();
324  });
325  }
326 
327  memory_input_stream<Iterator> to_input_stream() const;
328 };
329 
332 
333  const char* _p = nullptr;
334  size_t _size = 0;
335 public:
336  using has_with_stream = std::false_type;
337  simple_memory_input_stream() = default;
338  simple_memory_input_stream(const char* p, size_t size) : _p(p), _size(size) {}
339 
340  const char* begin() const { return _p; }
341 
342  [[gnu::always_inline]]
343  void skip(size_t size) {
344  if (size > _size) {
345  throw std::out_of_range("deserialization buffer underflow");
346  }
347  _p += size;
348  _size -= size;
349  }
350 
351  [[gnu::always_inline]]
352  simple read_substream(size_t size) {
353  if (size > _size) {
354  throw std::out_of_range("deserialization buffer underflow");
355  }
356  simple substream(_p, size);
357  skip(size);
358  return substream;
359  }
360 
361  [[gnu::always_inline]]
362  void read(char* p, size_t size) {
363  if (size > _size) {
364  throw std::out_of_range("deserialization buffer underflow");
365  }
366  std::copy_n(_p, size, p);
367  skip(size);
368  }
369 
370  template<typename Output>
371  [[gnu::always_inline]]
372  void copy_to(Output& out) const {
373  out.write(_p, _size);
374  }
375 
376  [[gnu::always_inline]]
377  size_t size() const {
378  return _size;
379  }
380 };
381 
382 template<typename Iterator>
386 
387  Iterator _it;
388  simple _current;
389  size_t _size;
390 private:
391  template<typename Func>
392  //requires requires(Func f, view bv) { { f(bv) } -> void; }
393  void for_each_fragment(size_t size, Func&& func) {
394  if (size > _size) {
395  throw std::out_of_range("deserialization buffer underflow");
396  }
397  _size -= size;
398  while (size) {
399  if (!_current.size()) {
400  _current = simple(reinterpret_cast<const char*>((*_it).begin()), (*_it).size());
401  _it++;
402  }
403  auto this_size = std::min(_current.size(), size);
404  func(_current.read_substream(this_size));
405  size -= this_size;
406  }
407  }
408  fragmented_memory_input_stream(Iterator it, simple bv, size_t size)
409  : _it(it), _current(bv), _size(size) { }
410  friend class fragmented_memory_output_stream<Iterator>;
411 public:
412  using has_with_stream = std::false_type;
413  using iterator_type = Iterator;
414  fragmented_memory_input_stream(Iterator it, size_t size)
415  : _it(it), _size(size) {
416  }
417 
418  void skip(size_t size) {
419  for_each_fragment(size, [] (auto) { });
420  }
421  fragmented read_substream(size_t size) {
422  if (size > _size) {
423  throw std::out_of_range("deserialization buffer underflow");
424  }
425  fragmented substream(_it, _current, size);
426  skip(size);
427  return substream;
428  }
429  void read(char* p, size_t size) {
430  for_each_fragment(size, [&p] (auto bv) {
431  p = std::copy_n(bv.begin(), bv.size(), p);
432  });
433  }
434  template<typename Output>
435  void copy_to(Output& out) {
436  for_each_fragment(_size, [&out] (auto bv) {
437  bv.copy_to(out);
438  });
439  }
440  size_t size() const {
441  return _size;
442  }
443 
444  const char* first_fragment_data() const { return _current.begin(); }
445  size_t first_fragment_size() const { return _current.size(); }
446  Iterator fragment_iterator() const { return _it; }
447 };
448 
449 /*
450 template<typename Visitor>
451 concept bool StreamVisitor() {
452  return requires(Visitor visitor, simple& simple, fragmented& fragmented) {
453  visitor(simple);
454  visitor(fragmented);
455  };
456 }
457 */
458 // memory_input_stream performs type erasure optimized for cases where
459 // simple is used.
460 // By using a lot of [[gnu::always_inline]] attributes this class attempts to
461 // make the compiler generate code with simple functions inlined
462 // directly in the user of the intput_stream.
463 template<typename Iterator>
465 public:
468 private:
469  const bool _is_simple;
470  using fragmented_type = fragmented;
471  union {
472  simple _simple;
473  fragmented_type _fragmented;
474  };
475 public:
476  template<typename StreamVisitor>
477  [[gnu::always_inline]]
478  decltype(auto) with_stream(StreamVisitor&& visitor) {
479  if (__builtin_expect(_is_simple, true)) {
480  return visitor(_simple);
481  }
482  return visitor(_fragmented);
483  }
484 
485  template<typename StreamVisitor>
486  [[gnu::always_inline]]
487  decltype(auto) with_stream(StreamVisitor&& visitor) const {
488  if (__builtin_expect(_is_simple, true)) {
489  return visitor(_simple);
490  }
491  return visitor(_fragmented);
492  }
493 public:
494  using has_with_stream = std::true_type;
495  using iterator_type = Iterator;
497  : _is_simple(true), _simple(std::move(stream)) {}
499  : _is_simple(false), _fragmented(std::move(stream)) {}
500 
501  [[gnu::always_inline]]
502  memory_input_stream(const memory_input_stream& other) noexcept : _is_simple(other._is_simple) {
503  // Making this copy constructor noexcept makes copy assignment simpler.
504  // Besides, performance of memory_input_stream relies on the fact that both
505  // fragmented and simple input stream are PODs and the branch below
506  // is optimized away, so throwable copy constructors aren't something
507  // we want.
508  static_assert(std::is_nothrow_copy_constructible_v<fragmented>,
509  "seastar::memory_input_stream::fragmented should be copy constructible");
510  static_assert(std::is_nothrow_copy_constructible_v<simple>,
511  "seastar::memory_input_stream::simple should be copy constructible");
512  if (_is_simple) {
513  new (&_simple) simple(other._simple);
514  } else {
515  new (&_fragmented) fragmented_type(other._fragmented);
516  }
517  }
518 
519  [[gnu::always_inline]]
520  memory_input_stream(memory_input_stream&& other) noexcept : _is_simple(other._is_simple) {
521  if (_is_simple) {
522  new (&_simple) simple(std::move(other._simple));
523  } else {
524  new (&_fragmented) fragmented_type(std::move(other._fragmented));
525  }
526  }
527 
528  [[gnu::always_inline]]
529  memory_input_stream& operator=(const memory_input_stream& other) noexcept {
530  // Copy constructor being noexcept makes copy assignment simpler.
531  static_assert(std::is_nothrow_copy_constructible_v<memory_input_stream>,
532  "memory_input_stream copy constructor shouldn't throw");
533  if (this != &other) {
534  this->~memory_input_stream();
535  new (this) memory_input_stream(other);
536  }
537  return *this;
538  }
539 
540  [[gnu::always_inline]]
541  memory_input_stream& operator=(memory_input_stream&& other) noexcept {
542  if (this != &other) {
543  this->~memory_input_stream();
544  new (this) memory_input_stream(std::move(other));
545  }
546  return *this;
547  }
548 
549  [[gnu::always_inline]]
551  if (_is_simple) {
552  _simple.~simple_memory_input_stream();
553  } else {
554  _fragmented.~fragmented_type();
555  }
556  }
557 
558  [[gnu::always_inline]]
559  void skip(size_t size) {
560  with_stream([size] (auto& stream) {
561  stream.skip(size);
562  });
563  }
564 
565  [[gnu::always_inline]]
566  memory_input_stream read_substream(size_t size) {
567  return with_stream([size] (auto& stream) -> memory_input_stream {
568  return stream.read_substream(size);
569  });
570  }
571 
572  [[gnu::always_inline]]
573  void read(char* p, size_t size) {
574  with_stream([p, size] (auto& stream) {
575  stream.read(p, size);
576  });
577  }
578 
579  template<typename Output>
580  [[gnu::always_inline]]
581  void copy_to(Output& out) {
582  with_stream([&out] (auto& stream) {
583  stream.copy_to(out);
584  });
585  }
586 
587  [[gnu::always_inline]]
588  size_t size() const {
589  return with_stream([] (auto& stream) {
590  return stream.size();
591  });
592  }
593 
594  template<typename Stream, typename StreamVisitor>
595  friend decltype(auto) with_serialized_stream(Stream& stream, StreamVisitor&& visitor);
596 };
597 
598 SEASTAR_MODULE_EXPORT_END
599 
600 inline simple_memory_input_stream simple_memory_output_stream::to_input_stream() const {
601  return simple_memory_input_stream(_p, _size);
602 }
603 
604 template<typename Iterator>
605 inline fragmented_memory_input_stream<Iterator> fragmented_memory_output_stream<Iterator>::to_input_stream() const {
606  return fragmented_memory_input_stream<Iterator>(_it, _current.to_input_stream(), _size);
607 }
608 
609 template<typename Iterator>
610 inline memory_input_stream<Iterator> memory_output_stream<Iterator>::to_input_stream() const {
611  return with_stream(make_visitor(
612  [] (const simple_memory_output_stream& ostream) -> memory_input_stream<Iterator> {
613  return ostream.to_input_stream();
614  },
615  [] (const fragmented_memory_output_stream<Iterator>& ostream) -> memory_input_stream<Iterator> {
616  return ostream.to_input_stream();
617  }
618  ));
619 }
620 
621 // The purpose of the with_serialized_stream() is to minimize number of dynamic
622 // dispatches. For example, a lot of IDL-generated code looks like this:
623 // auto some_value() const {
624 // return seastar::with_serialized_stream(v, [] (auto& v) {
625 // auto in = v;
626 // ser::skip(in, boost::type<type1>());
627 // ser::skip(in, boost::type<type2>());
628 // return deserialize(in, boost::type<type3>());
629 // });
630 // }
631 // Using with_stream() there is at most one dynamic dispatch per such
632 // function, instead of one per each skip() and deserialize() call.
633 
634 SEASTAR_MODULE_EXPORT_BEGIN
635 template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<Stream::has_with_stream::value>>
636 [[gnu::always_inline]]
637  inline decltype(auto)
638  with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
639  return stream.with_stream(std::forward<StreamVisitor>(visitor));
640 }
641 
642 template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<!Stream::has_with_stream::value>, typename = void>
643 [[gnu::always_inline]]
644  inline decltype(auto)
645  with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
646  return visitor(stream);
647 }
648 
649 using simple_input_stream = simple_memory_input_stream;
650 using simple_output_stream = simple_memory_output_stream;
651 
652 SEASTAR_MODULE_EXPORT_END
653 
654 }
Definition: simple-stream.hh:383
Definition: simple-stream.hh:120
Definition: simple-stream.hh:38
Definition: simple-stream.hh:464
Definition: simple-stream.hh:195
Definition: simple-stream.hh:330
Definition: simple-stream.hh:61
Definition: stream.hh:61
auto make_visitor(Args &&... args)
Definition: variant_utils.hh:52
Seastar API namespace.
Definition: abort_on_ebadf.hh:26