Seastar
High performance C++ framework for concurrent servers
simple-stream.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 Scylladb, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/sstring.hh>
25#include <seastar/util/variant_utils.hh>
26#include <seastar/util/modules.hh>
27#ifndef SEASTAR_MODULE
28#include <algorithm>
29#include <cstddef>
30#include <stdexcept>
31#include <type_traits>
32#endif
33
34namespace seastar {
35
36SEASTAR_MODULE_EXPORT_BEGIN
37
39 size_t _size = 0;
40public:
41 void write(const char*, size_t size) {
42 _size += size;
43 }
44
45 size_t size() const {
46 return _size;
47 }
48};
49
50template<typename>
52
54
55template<typename Iterator>
57
58template<typename Iterator>
60
62 char* _p = nullptr;
63 size_t _size = 0;
64public:
65 using has_with_stream = std::false_type;
67 simple_memory_output_stream(char* p, size_t size, size_t start = 0) : _p(p + start), _size(size) {}
68 char* begin() { return _p; }
69
70 [[gnu::always_inline]]
71 void skip(size_t size) {
72 if (size > _size) {
73 throw std::out_of_range("serialization buffer overflow");
74 }
75 _p += size;
76 _size -= size;
77 }
78
79 [[gnu::always_inline]]
80 simple_memory_output_stream write_substream(size_t size) {
81 if (size > _size) {
82 throw std::out_of_range("serialization buffer overflow");
83 }
84 simple_memory_output_stream substream(_p, size);
85 skip(size);
86 return substream;
87 }
88
89 [[gnu::always_inline]]
90 void write(const char* p, size_t size) {
91 if (size > _size) {
92 throw std::out_of_range("serialization buffer overflow");
93 }
94 std::copy_n(p, size, _p);
95 skip(size);
96 }
97
98 [[gnu::always_inline]]
99 void fill(char c, size_t size) {
100 if (size > _size) {
101 throw std::out_of_range("serialization buffer overflow");
102 }
103 std::fill_n(_p, size, c);
104 skip(size);
105 }
106
107 [[gnu::always_inline]]
108 size_t size() const {
109 return _size;
110 }
111
112 // simple_memory_output_stream is a write cursor that keeps a mutable view of some
113 // underlying buffer and provides write interface. to_input_stream() converts it
114 // to a read cursor that points to the same part of the buffer but provides
115 // read interface.
116 simple_memory_input_stream to_input_stream() const;
117};
118
119template<typename Iterator>
122
123 Iterator _it;
124 simple _current;
125 size_t _size = 0;
126
127 friend class memory_input_stream<Iterator>;
128private:
129 template<typename Func>
130 //requires requires(Func f, view bv) { { f(bv) } -> void; }
131 void for_each_fragment(size_t size, Func&& func) {
132 if (size > _size) {
133 throw std::out_of_range("serialization buffer overflow");
134 }
135 _size -= size;
136 while (size) {
137 if (!_current.size()) {
138 _current = simple(reinterpret_cast<char*>((*_it).get_write()), (*_it).size());
139 _it++;
140 }
141 auto this_size = std::min(_current.size(), size);
142 func(_current.write_substream(this_size));
143 size -= this_size;
144 }
145 }
147 : _it(it), _current(bv), _size(size) { }
148public:
149 using has_with_stream = std::false_type;
150 using iterator_type = Iterator;
151
153
154 fragmented_memory_output_stream(Iterator it, size_t size)
155 : _it(it), _size(size) {
156 }
157
158 void skip(size_t size) {
159 for_each_fragment(size, [] (auto) { });
160 }
161 memory_output_stream<Iterator> write_substream(size_t size) {
162 if (size > _size) {
163 throw std::out_of_range("serialization buffer overflow");
164 }
165 if (_current.size() >= size) {
166 _size -= size;
167 return _current.write_substream(size);
168 }
169 fragmented_memory_output_stream substream(_it, _current, size);
170 skip(size);
171 return substream;
172 }
173 void write(const char* p, size_t size) {
174 for_each_fragment(size, [&p] (auto bv) {
175 std::copy_n(p, bv.size(), bv.begin());
176 p += bv.size();
177 });
178 }
179 void fill(char c, size_t size) {
180 for_each_fragment(size, [c] (simple fragment) {
181 std::fill_n(fragment.begin(), fragment.size(), c);
182 });
183 }
184 size_t size() const {
185 return _size;
186 }
187
188 // fragmented_memory_input_stream is a write cursor that keeps a mutable view of some
189 // underlying fragmented buffer and provides write interface. to_input_stream() converts
190 // it to a read cursor that points to the same part of the buffer but provides read interface.
191 fragmented_memory_input_stream<Iterator> to_input_stream() const;
192};
193
194template<typename Iterator>
196public:
199
200private:
201 const bool _is_simple;
203 union {
204 simple _simple;
205 fragmented_type _fragmented;
206 };
207public:
208 template<typename StreamVisitor>
209 [[gnu::always_inline]]
210 decltype(auto) with_stream(StreamVisitor&& visitor) {
211 if (__builtin_expect(_is_simple, true)) {
212 return visitor(_simple);
213 }
214 return visitor(_fragmented);
215 }
216
217 template<typename StreamVisitor>
218 [[gnu::always_inline]]
219 decltype(auto) with_stream(StreamVisitor&& visitor) const {
220 if (__builtin_expect(_is_simple, true)) {
221 return visitor(_simple);
222 }
223 return visitor(_fragmented);
224 }
225public:
226 using has_with_stream = std::true_type;
227 using iterator_type = Iterator;
229 : _is_simple(true), _simple() {}
231 : _is_simple(true), _simple(std::move(stream)) {}
233 : _is_simple(false), _fragmented(std::move(stream)) {}
234
235 [[gnu::always_inline]]
236 memory_output_stream(const memory_output_stream& other) noexcept : _is_simple(other._is_simple) {
237 // Making this copy constructor noexcept makes copy assignment simpler.
238 // Besides, performance of memory_output_stream relies on the fact that both
239 // fragmented and simple input stream are PODs and the branch below
240 // is optimized away, so throwable copy constructors aren't something
241 // we want.
242 static_assert(std::is_nothrow_copy_constructible_v<fragmented>,
243 "seastar::memory_output_stream::fragmented should be copy constructible");
244 static_assert(std::is_nothrow_copy_constructible_v<simple>,
245 "seastar::memory_output_stream::simple should be copy constructible");
246 if (_is_simple) {
247 new (&_simple) simple(other._simple);
248 } else {
249 new (&_fragmented) fragmented_type(other._fragmented);
250 }
251 }
252
253 [[gnu::always_inline]]
254 memory_output_stream(memory_output_stream&& other) noexcept : _is_simple(other._is_simple) {
255 if (_is_simple) {
256 new (&_simple) simple(std::move(other._simple));
257 } else {
258 new (&_fragmented) fragmented_type(std::move(other._fragmented));
259 }
260 }
261
262 [[gnu::always_inline]]
263 memory_output_stream& operator=(const memory_output_stream& other) noexcept {
264 // Copy constructor being noexcept makes copy assignment simpler.
265 static_assert(std::is_nothrow_copy_constructible_v<memory_output_stream>,
266 "memory_output_stream copy constructor shouldn't throw");
267 if (this != &other) {
268 this->~memory_output_stream();
269 new (this) memory_output_stream(other);
270 }
271 return *this;
272 }
273
274 [[gnu::always_inline]]
275 memory_output_stream& operator=(memory_output_stream&& other) noexcept {
276 if (this != &other) {
277 this->~memory_output_stream();
278 new (this) memory_output_stream(std::move(other));
279 }
280 return *this;
281 }
282
283 [[gnu::always_inline]]
285 if (_is_simple) {
286 _simple.~simple();
287 } else {
288 _fragmented.~fragmented_type();
289 }
290 }
291
292 [[gnu::always_inline]]
293 void skip(size_t size) {
294 with_stream([size] (auto& stream) {
295 stream.skip(size);
296 });
297 }
298
299 [[gnu::always_inline]]
300 memory_output_stream write_substream(size_t size) {
301 return with_stream([size] (auto& stream) -> memory_output_stream {
302 return stream.write_substream(size);
303 });
304 }
305
306 [[gnu::always_inline]]
307 void write(const char* p, size_t size) {
308 with_stream([p, size] (auto& stream) {
309 stream.write(p, size);
310 });
311 }
312
313 [[gnu::always_inline]]
314 void fill(char c, size_t size) {
315 with_stream([c, size] (auto& stream) {
316 stream.fill(c, size);
317 });
318 }
319
320 [[gnu::always_inline]]
321 size_t size() const {
322 return with_stream([] (auto& stream) {
323 return stream.size();
324 });
325 }
326
327 memory_input_stream<Iterator> to_input_stream() const;
328};
329
332
333 const char* _p = nullptr;
334 size_t _size = 0;
335public:
336 using has_with_stream = std::false_type;
337 simple_memory_input_stream() = default;
338 simple_memory_input_stream(const char* p, size_t size) : _p(p), _size(size) {}
339
340 const char* begin() const { return _p; }
341
342 [[gnu::always_inline]]
343 void skip(size_t size) {
344 if (size > _size) {
345 throw std::out_of_range("deserialization buffer underflow");
346 }
347 _p += size;
348 _size -= size;
349 }
350
351 [[gnu::always_inline]]
352 simple read_substream(size_t size) {
353 if (size > _size) {
354 throw std::out_of_range("deserialization buffer underflow");
355 }
356 simple substream(_p, size);
357 skip(size);
358 return substream;
359 }
360
361 [[gnu::always_inline]]
362 void read(char* p, size_t size) {
363 if (size > _size) {
364 throw std::out_of_range("deserialization buffer underflow");
365 }
366 std::copy_n(_p, size, p);
367 skip(size);
368 }
369
370 template<typename Output>
371 [[gnu::always_inline]]
372 void copy_to(Output& out) const {
373 out.write(_p, _size);
374 }
375
376 [[gnu::always_inline]]
377 size_t size() const {
378 return _size;
379 }
380};
381
382template<typename Iterator>
386
387 Iterator _it;
388 simple _current;
389 size_t _size;
390private:
391 template<typename Func>
392 //requires requires(Func f, view bv) { { f(bv) } -> void; }
393 void for_each_fragment(size_t size, Func&& func) {
394 if (size > _size) {
395 throw std::out_of_range("deserialization buffer underflow");
396 }
397 _size -= size;
398 while (size) {
399 if (!_current.size()) {
400 _current = simple(reinterpret_cast<const char*>((*_it).begin()), (*_it).size());
401 _it++;
402 }
403 auto this_size = std::min(_current.size(), size);
404 func(_current.read_substream(this_size));
405 size -= this_size;
406 }
407 }
408 fragmented_memory_input_stream(Iterator it, simple bv, size_t size)
409 : _it(it), _current(bv), _size(size) { }
410 friend class fragmented_memory_output_stream<Iterator>;
411public:
412 using has_with_stream = std::false_type;
413 using iterator_type = Iterator;
414 fragmented_memory_input_stream(Iterator it, size_t size)
415 : _it(it), _size(size) {
416 }
417
418 void skip(size_t size) {
419 for_each_fragment(size, [] (auto) { });
420 }
421 fragmented read_substream(size_t size) {
422 if (size > _size) {
423 throw std::out_of_range("deserialization buffer underflow");
424 }
425 fragmented substream(_it, _current, size);
426 skip(size);
427 return substream;
428 }
429 void read(char* p, size_t size) {
430 for_each_fragment(size, [&p] (auto bv) {
431 p = std::copy_n(bv.begin(), bv.size(), p);
432 });
433 }
434 template<typename Output>
435 void copy_to(Output& out) {
436 for_each_fragment(_size, [&out] (auto bv) {
437 bv.copy_to(out);
438 });
439 }
440 size_t size() const {
441 return _size;
442 }
443
444 const char* first_fragment_data() const { return _current.begin(); }
445 size_t first_fragment_size() const { return _current.size(); }
446 Iterator fragment_iterator() const { return _it; }
447};
448
449/*
450template<typename Visitor>
451concept bool StreamVisitor() {
452 return requires(Visitor visitor, simple& simple, fragmented& fragmented) {
453 visitor(simple);
454 visitor(fragmented);
455 };
456}
457*/
458// memory_input_stream performs type erasure optimized for cases where
459// simple is used.
460// By using a lot of [[gnu::always_inline]] attributes this class attempts to
461// make the compiler generate code with simple functions inlined
462// directly in the user of the intput_stream.
463template<typename Iterator>
465public:
468private:
469 const bool _is_simple;
471 union {
472 simple _simple;
473 fragmented_type _fragmented;
474 };
475public:
476 template<typename StreamVisitor>
477 [[gnu::always_inline]]
478 decltype(auto) with_stream(StreamVisitor&& visitor) {
479 if (__builtin_expect(_is_simple, true)) {
480 return visitor(_simple);
481 }
482 return visitor(_fragmented);
483 }
484
485 template<typename StreamVisitor>
486 [[gnu::always_inline]]
487 decltype(auto) with_stream(StreamVisitor&& visitor) const {
488 if (__builtin_expect(_is_simple, true)) {
489 return visitor(_simple);
490 }
491 return visitor(_fragmented);
492 }
493public:
494 using has_with_stream = std::true_type;
495 using iterator_type = Iterator;
497 : _is_simple(true), _simple(std::move(stream)) {}
499 : _is_simple(false), _fragmented(std::move(stream)) {}
500
501 [[gnu::always_inline]]
502 memory_input_stream(const memory_input_stream& other) noexcept : _is_simple(other._is_simple) {
503 // Making this copy constructor noexcept makes copy assignment simpler.
504 // Besides, performance of memory_input_stream relies on the fact that both
505 // fragmented and simple input stream are PODs and the branch below
506 // is optimized away, so throwable copy constructors aren't something
507 // we want.
508 static_assert(std::is_nothrow_copy_constructible_v<fragmented>,
509 "seastar::memory_input_stream::fragmented should be copy constructible");
510 static_assert(std::is_nothrow_copy_constructible_v<simple>,
511 "seastar::memory_input_stream::simple should be copy constructible");
512 if (_is_simple) {
513 new (&_simple) simple(other._simple);
514 } else {
515 new (&_fragmented) fragmented_type(other._fragmented);
516 }
517 }
518
519 [[gnu::always_inline]]
520 memory_input_stream(memory_input_stream&& other) noexcept : _is_simple(other._is_simple) {
521 if (_is_simple) {
522 new (&_simple) simple(std::move(other._simple));
523 } else {
524 new (&_fragmented) fragmented_type(std::move(other._fragmented));
525 }
526 }
527
528 [[gnu::always_inline]]
529 memory_input_stream& operator=(const memory_input_stream& other) noexcept {
530 // Copy constructor being noexcept makes copy assignment simpler.
531 static_assert(std::is_nothrow_copy_constructible_v<memory_input_stream>,
532 "memory_input_stream copy constructor shouldn't throw");
533 if (this != &other) {
534 this->~memory_input_stream();
535 new (this) memory_input_stream(other);
536 }
537 return *this;
538 }
539
540 [[gnu::always_inline]]
541 memory_input_stream& operator=(memory_input_stream&& other) noexcept {
542 if (this != &other) {
543 this->~memory_input_stream();
544 new (this) memory_input_stream(std::move(other));
545 }
546 return *this;
547 }
548
549 [[gnu::always_inline]]
551 if (_is_simple) {
552 _simple.~simple_memory_input_stream();
553 } else {
554 _fragmented.~fragmented_type();
555 }
556 }
557
558 [[gnu::always_inline]]
559 void skip(size_t size) {
560 with_stream([size] (auto& stream) {
561 stream.skip(size);
562 });
563 }
564
565 [[gnu::always_inline]]
566 memory_input_stream read_substream(size_t size) {
567 return with_stream([size] (auto& stream) -> memory_input_stream {
568 return stream.read_substream(size);
569 });
570 }
571
572 [[gnu::always_inline]]
573 void read(char* p, size_t size) {
574 with_stream([p, size] (auto& stream) {
575 stream.read(p, size);
576 });
577 }
578
579 template<typename Output>
580 [[gnu::always_inline]]
581 void copy_to(Output& out) {
582 with_stream([&out] (auto& stream) {
583 stream.copy_to(out);
584 });
585 }
586
587 [[gnu::always_inline]]
588 size_t size() const {
589 return with_stream([] (auto& stream) {
590 return stream.size();
591 });
592 }
593
594 template<typename Stream, typename StreamVisitor>
595 friend decltype(auto) with_serialized_stream(Stream& stream, StreamVisitor&& visitor);
596};
597
598SEASTAR_MODULE_EXPORT_END
599
600inline simple_memory_input_stream simple_memory_output_stream::to_input_stream() const {
601 return simple_memory_input_stream(_p, _size);
602}
603
604template<typename Iterator>
605inline fragmented_memory_input_stream<Iterator> fragmented_memory_output_stream<Iterator>::to_input_stream() const {
606 return fragmented_memory_input_stream<Iterator>(_it, _current.to_input_stream(), _size);
607}
608
609template<typename Iterator>
610inline memory_input_stream<Iterator> memory_output_stream<Iterator>::to_input_stream() const {
611 return with_stream(make_visitor(
612 [] (const simple_memory_output_stream& ostream) -> memory_input_stream<Iterator> {
613 return ostream.to_input_stream();
614 },
615 [] (const fragmented_memory_output_stream<Iterator>& ostream) -> memory_input_stream<Iterator> {
616 return ostream.to_input_stream();
617 }
618 ));
619}
620
621// The purpose of the with_serialized_stream() is to minimize number of dynamic
622// dispatches. For example, a lot of IDL-generated code looks like this:
623// auto some_value() const {
624// return seastar::with_serialized_stream(v, [] (auto& v) {
625// auto in = v;
626// ser::skip(in, boost::type<type1>());
627// ser::skip(in, boost::type<type2>());
628// return deserialize(in, boost::type<type3>());
629// });
630// }
631// Using with_stream() there is at most one dynamic dispatch per such
632// function, instead of one per each skip() and deserialize() call.
633
634SEASTAR_MODULE_EXPORT_BEGIN
635template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<Stream::has_with_stream::value>>
636[[gnu::always_inline]]
637 inline decltype(auto)
638 with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
639 return stream.with_stream(std::forward<StreamVisitor>(visitor));
640}
641
642template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<!Stream::has_with_stream::value>, typename = void>
643[[gnu::always_inline]]
644 inline decltype(auto)
645 with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
646 return visitor(stream);
647}
648
649using simple_input_stream = simple_memory_input_stream;
650using simple_output_stream = simple_memory_output_stream;
651
652SEASTAR_MODULE_EXPORT_END
653
654}
Definition: simple-stream.hh:383
Definition: simple-stream.hh:120
Definition: simple-stream.hh:38
Definition: simple-stream.hh:464
Definition: simple-stream.hh:195
Definition: simple-stream.hh:330
Definition: simple-stream.hh:61
Definition: stream.hh:60
auto make_visitor(Args &&... args)
Definition: variant_utils.hh:52
Seastar API namespace.
Definition: abort_on_ebadf.hh:26