Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
packet.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/deleter.hh>
25#include <seastar/core/temporary_buffer.hh>
26#include <seastar/net/const.hh>
27#include <seastar/util/std-compat.hh>
28#include <seastar/util/modules.hh>
29#ifndef SEASTAR_MODULE
30#include <algorithm>
31#include <cassert>
32#include <cstdint>
33#include <functional>
34#include <iosfwd>
35#include <memory>
36#include <optional>
37#include <vector>
38#endif
39
40namespace seastar {
41
42namespace net {
43
44SEASTAR_MODULE_EXPORT_BEGIN
45
// A contiguous run of bytes: `base` points at the first byte and `size` is
// the length in bytes. It holds no ownership information of its own.
46struct fragment {
47 char* base;
48 size_t size;
49};
50
52 ip_protocol_num protocol = ip_protocol_num::unused;
53 bool needs_csum = false;
54 uint8_t ip_hdr_len = 20;
55 uint8_t tcp_hdr_len = 20;
56 uint8_t udp_hdr_len = 8;
57 bool needs_ip_csum = false;
58 bool reassembled = false;
59 uint16_t tso_seg_size = 0;
60 // HW stripped VLAN header (CPU order)
61 std::optional<uint16_t> vlan_tci;
62};
63
64// Zero-copy friendly packet class
65//
66// For implementing zero-copy, we need a flexible destructor that can
67// destroy packet data in different ways: decrementing a reference count,
68// or calling a free()-like function.
69//
70// Moreover, we need different destructors for each set of fragments within
71// a single packet. For example, a header and trailer might need delete[]
72// to be called, while the internal data needs a reference count to be
73// released. Matters are complicated in that fragments can be split
74// (due to virtual/physical translation).
75//
76// To implement this, we associate each packet with a single destructor,
77// but allow composing a packet from another packet plus a fragment to
78// be added, with its own destructor, causing the destructors to be chained.
79//
80// The downside is that the data needed for the destructor is duplicated,
81// if it is already available in the fragment itself.
82//
83// As an optimization, when we allocate small fragments, we allocate some
84// extra space, so prepending to the packet does not require extra
85// allocations. This is useful when adding headers.
86//
87class packet final {
88 // enough for lots of headers, not quite two cache lines:
89 static constexpr size_t internal_data_size = 128 - 16;
90 static constexpr size_t default_nr_frags = 4;
91
92 struct pseudo_vector {
93 fragment* _start;
94 fragment* _finish;
95 pseudo_vector(fragment* start, size_t nr) noexcept
96 : _start(start), _finish(_start + nr) {}
97 fragment* begin() noexcept { return _start; }
98 fragment* end() noexcept { return _finish; }
99 fragment& operator[](size_t idx) noexcept { return _start[idx]; }
100 };
101
102 struct impl {
103 // when destroyed, virtual destructor will reclaim resources
104 deleter _deleter;
105 unsigned _len = 0;
106 uint16_t _nr_frags = 0;
107 uint16_t _allocated_frags;
108 offload_info _offload_info;
109 std::optional<uint32_t> _rss_hash;
110 char _data[internal_data_size]; // only _frags[0] may use
111 unsigned _headroom = internal_data_size; // in _data
112 // FIXME: share _data/_frags space
113
114 fragment _frags[];
115
116 impl(size_t nr_frags = default_nr_frags) noexcept;
117 impl(const impl&) = delete;
118 impl(fragment frag, size_t nr_frags = default_nr_frags);
119
120 pseudo_vector fragments() noexcept { return { _frags, _nr_frags }; }
121
122 static std::unique_ptr<impl> allocate(size_t nr_frags) {
123 nr_frags = std::max(nr_frags, default_nr_frags);
124 return std::unique_ptr<impl>(new (nr_frags) impl(nr_frags));
125 }
126
127 static std::unique_ptr<impl> copy(impl* old, size_t nr) {
128 auto n = allocate(nr);
129 n->_deleter = std::move(old->_deleter);
130 n->_len = old->_len;
131 n->_nr_frags = old->_nr_frags;
132 n->_headroom = old->_headroom;
133 n->_offload_info = old->_offload_info;
134 n->_rss_hash = old->_rss_hash;
135 std::copy(old->_frags, old->_frags + old->_nr_frags, n->_frags);
136 old->copy_internal_fragment_to(n.get());
137 return n;
138 }
139
140 static std::unique_ptr<impl> copy(impl* old) {
141 return copy(old, old->_nr_frags);
142 }
143
144 static std::unique_ptr<impl> allocate_if_needed(std::unique_ptr<impl> old, size_t extra_frags) {
145 if (old->_allocated_frags >= old->_nr_frags + extra_frags) {
146 return old;
147 }
148 return copy(old.get(), std::max<size_t>(old->_nr_frags + extra_frags, 2 * old->_nr_frags));
149 }
150 void* operator new(size_t size, size_t nr_frags = default_nr_frags) {
151 assert(nr_frags == uint16_t(nr_frags));
152 return ::operator new(size + nr_frags * sizeof(fragment));
153 }
154 // Matching the operator new above
155 void operator delete(void* ptr, size_t) {
156 return ::operator delete(ptr);
157 }
158 // Since the above "placement delete" hides the global one, expose it
159 void operator delete(void* ptr) {
160 return ::operator delete(ptr);
161 }
162
163 bool using_internal_data() const noexcept {
164 return _nr_frags
165 && _frags[0].base >= _data
166 && _frags[0].base < _data + internal_data_size;
167 }
168
169 void unuse_internal_data() {
170 if (!using_internal_data()) {
171 return;
172 }
173 auto buf = static_cast<char*>(::malloc(_frags[0].size));
174 if (!buf) {
175 throw std::bad_alloc();
176 }
177 deleter d = make_free_deleter(buf);
178 std::copy(_frags[0].base, _frags[0].base + _frags[0].size, buf);
179 _frags[0].base = buf;
180 d.append(std::move(_deleter));
181 _deleter = std::move(d);
182 _headroom = internal_data_size;
183 }
184 void copy_internal_fragment_to(impl* to) noexcept {
185 if (!using_internal_data()) {
186 return;
187 }
188 to->_frags[0].base = to->_data + _headroom;
189 std::copy(_frags[0].base, _frags[0].base + _frags[0].size,
190 to->_frags[0].base);
191 }
192 };
193 packet(std::unique_ptr<impl>&& impl) noexcept : _impl(std::move(impl)) {}
194 std::unique_ptr<impl> _impl;
195public:
196 static packet from_static_data(const char* data, size_t len) noexcept {
197 return {fragment{const_cast<char*>(data), len}, deleter()};
198 }
199
200 // build empty packet
201 packet();
202 // build empty packet with nr_frags allocated
203 packet(size_t nr_frags);
204 // move existing packet
205 packet(packet&& x) noexcept;
206 // copy data into packet
207 packet(const char* data, size_t len);
208 // copy data into packet
209 packet(fragment frag);
210 // zero-copy single fragment
211 packet(fragment frag, deleter del);
212 // zero-copy multiple fragments
213 packet(std::vector<fragment> frag, deleter del);
214 // build packet with iterator
215 template <typename Iterator>
216 packet(Iterator begin, Iterator end, deleter del);
217 // append fragment (copying new fragment)
218 packet(packet&& x, fragment frag);
219 // prepend fragment (copying new fragment, with header optimization)
220 packet(fragment frag, packet&& x);
221 // prepend fragment (zero-copy)
222 packet(fragment frag, deleter del, packet&& x);
223 // append fragment (zero-copy)
224 packet(packet&& x, fragment frag, deleter d);
225 // append temporary_buffer (zero-copy)
227 // create from temporary_buffer (zero-copy)
229 // append deleter
230 packet(packet&& x, deleter d);
231
232 packet& operator=(packet&& x) noexcept {
233 if (this != &x) {
234 this->~packet();
235 new (this) packet(std::move(x));
236 }
237 return *this;
238 }
239
240 unsigned len() const noexcept { return _impl->_len; }
241 unsigned memory() const noexcept { return len() + sizeof(packet::impl); }
242
243 fragment frag(unsigned idx) const noexcept { return _impl->_frags[idx]; }
244 fragment& frag(unsigned idx) noexcept { return _impl->_frags[idx]; }
245
246 unsigned nr_frags() const noexcept { return _impl->_nr_frags; }
247 pseudo_vector fragments() const noexcept { return { _impl->_frags, _impl->_nr_frags }; }
248 fragment* fragment_array() const noexcept { return _impl->_frags; }
249
250 // share packet data (reference counted, non COW)
251 packet share();
252 packet share(size_t offset, size_t len);
253
254 void append(packet&& p);
255
256 void trim_front(size_t how_much) noexcept;
257 void trim_back(size_t how_much) noexcept;
258
259 // get a header pointer, linearizing if necessary
260 template <typename Header>
261 Header* get_header(size_t offset = 0);
262
263 // get a header pointer, linearizing if necessary
264 char* get_header(size_t offset, size_t size);
265
266 // prepend a header (default-initializing it)
267 template <typename Header>
268 Header* prepend_header(size_t extra_size = 0);
269
270 // prepend a header (uninitialized!)
271 char* prepend_uninitialized_header(size_t size);
272
273 packet free_on_cpu(unsigned cpu, std::function<void()> cb = []{});
274
275 void linearize() { return linearize(0, len()); }
276
277 void reset() noexcept { _impl.reset(); }
278
279 void reserve(int n_frags) {
280 if (n_frags > _impl->_nr_frags) {
281 auto extra = n_frags - _impl->_nr_frags;
282 _impl = impl::allocate_if_needed(std::move(_impl), extra);
283 }
284 }
285 std::optional<uint32_t> rss_hash() const noexcept {
286 return _impl->_rss_hash;
287 }
288 std::optional<uint32_t> set_rss_hash(uint32_t hash) noexcept {
289 return _impl->_rss_hash = hash;
290 }
291 // Call `func` for each fragment, avoiding data copies when possible
292 // `func` is called with a temporary_buffer<char> parameter
293 template <typename Func>
294 void release_into(Func&& func) {
295 unsigned idx = 0;
296 if (_impl->using_internal_data()) {
297 auto&& f = frag(idx++);
298 func(temporary_buffer<char>(f.base, f.size));
299 }
300 while (idx < nr_frags()) {
301 auto&& f = frag(idx++);
302 func(temporary_buffer<char>(f.base, f.size, _impl->_deleter.share()));
303 }
304 }
305 std::vector<temporary_buffer<char>> release() {
306 std::vector<temporary_buffer<char>> ret;
307 ret.reserve(_impl->_nr_frags);
308 release_into([&ret] (temporary_buffer<char>&& frag) {
309 ret.push_back(std::move(frag));
310 });
311 return ret;
312 }
313 explicit operator bool() noexcept {
314 return bool(_impl);
315 }
316 static packet make_null_packet() noexcept {
317 return net::packet(nullptr);
318 }
319private:
320 void linearize(size_t at_frag, size_t desired_size);
321 bool allocate_headroom(size_t size);
322public:
323 struct offload_info get_offload_info() const noexcept { return _impl->_offload_info; }
324 struct offload_info& offload_info_ref() noexcept { return _impl->_offload_info; }
325 void set_offload_info(struct offload_info oi) noexcept { _impl->_offload_info = oi; }
326};
327
328std::ostream& operator<<(std::ostream& os, const packet& p);
329
330SEASTAR_MODULE_EXPORT_END
331
332inline
333packet::packet(packet&& x) noexcept
334 : _impl(std::move(x._impl)) {
335}
336
337inline
338packet::impl::impl(size_t nr_frags) noexcept
339 : _len(0), _allocated_frags(nr_frags) {
340}
341
342inline
343packet::impl::impl(fragment frag, size_t nr_frags)
344 : _len(frag.size), _allocated_frags(nr_frags) {
345 assert(_allocated_frags > _nr_frags);
346 if (frag.size <= internal_data_size) {
347 _headroom -= frag.size;
348 _frags[0] = { _data + _headroom, frag.size };
349 } else {
350 auto buf = static_cast<char*>(::malloc(frag.size));
351 if (!buf) {
352 throw std::bad_alloc();
353 }
354 deleter d = make_free_deleter(buf);
355 _frags[0] = { buf, frag.size };
356 _deleter.append(std::move(d));
357 }
358 std::copy(frag.base, frag.base + frag.size, _frags[0].base);
359 ++_nr_frags;
360}
361
362inline
363packet::packet()
364 : _impl(impl::allocate(1)) {
365}
366
367inline
368packet::packet(size_t nr_frags)
369 : _impl(impl::allocate(nr_frags)) {
370}
371
372inline
373packet::packet(fragment frag) : _impl(new impl(frag)) {
374}
375
376inline
377packet::packet(const char* data, size_t size) : packet(fragment{const_cast<char*>(data), size}) {
378}
379
380inline
381packet::packet(fragment frag, deleter d)
382 : _impl(impl::allocate(1)) {
383 _impl->_deleter = std::move(d);
384 _impl->_frags[_impl->_nr_frags++] = frag;
385 _impl->_len = frag.size;
386}
387
388inline
389packet::packet(std::vector<fragment> frag, deleter d)
390 : _impl(impl::allocate(frag.size())) {
391 _impl->_deleter = std::move(d);
392 std::copy(frag.begin(), frag.end(), _impl->_frags);
393 _impl->_nr_frags = frag.size();
394 _impl->_len = 0;
395 for (auto&& f : _impl->fragments()) {
396 _impl->_len += f.size;
397 }
398}
399
400template <typename Iterator>
401inline
402packet::packet(Iterator begin, Iterator end, deleter del) {
403 unsigned nr_frags = 0, len = 0;
404 nr_frags = std::distance(begin, end);
405 std::for_each(begin, end, [&] (const fragment& frag) { len += frag.size; });
406 _impl = impl::allocate(nr_frags);
407 _impl->_deleter = std::move(del);
408 _impl->_len = len;
409 _impl->_nr_frags = nr_frags;
410 std::copy(begin, end, _impl->_frags);
411}
412
413inline
414packet::packet(packet&& x, fragment frag)
415 : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
416 _impl->_len += frag.size;
417 std::unique_ptr<char[]> buf(new char[frag.size]);
418 std::copy(frag.base, frag.base + frag.size, buf.get());
419 _impl->_frags[_impl->_nr_frags++] = {buf.get(), frag.size};
420 _impl->_deleter = make_deleter(std::move(_impl->_deleter), [buf = buf.release()] {
421 delete[] buf;
422 });
423}
424
425inline
426bool
427packet::allocate_headroom(size_t size) {
428 if (_impl->_headroom >= size) {
429 _impl->_len += size;
430 if (!_impl->using_internal_data()) {
431 _impl = impl::allocate_if_needed(std::move(_impl), 1);
432 std::copy_backward(_impl->_frags, _impl->_frags + _impl->_nr_frags,
433 _impl->_frags + _impl->_nr_frags + 1);
434 _impl->_frags[0] = { _impl->_data + internal_data_size, 0 };
435 ++_impl->_nr_frags;
436 }
437 _impl->_headroom -= size;
438 _impl->_frags[0].base -= size;
439 _impl->_frags[0].size += size;
440 return true;
441 } else {
442 return false;
443 }
444}
445
446
447inline
448packet::packet(fragment frag, packet&& x)
449 : _impl(std::move(x._impl)) {
450 // try to prepend into existing internal fragment
451 if (allocate_headroom(frag.size)) {
452 std::copy(frag.base, frag.base + frag.size, _impl->_frags[0].base);
453 return;
454 } else {
455 // didn't work out, allocate and copy
456 _impl->unuse_internal_data();
457 _impl = impl::allocate_if_needed(std::move(_impl), 1);
458 _impl->_len += frag.size;
459 std::unique_ptr<char[]> buf(new char[frag.size]);
460 std::copy(frag.base, frag.base + frag.size, buf.get());
461 std::copy_backward(_impl->_frags, _impl->_frags + _impl->_nr_frags,
462 _impl->_frags + _impl->_nr_frags + 1);
463 ++_impl->_nr_frags;
464 _impl->_frags[0] = {buf.get(), frag.size};
465 _impl->_deleter = make_deleter(std::move(_impl->_deleter),
466 [buf = std::move(buf)] {});
467 }
468}
469
470inline
471packet::packet(packet&& x, fragment frag, deleter d)
472 : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
473 _impl->_len += frag.size;
474 _impl->_frags[_impl->_nr_frags++] = frag;
475 d.append(std::move(_impl->_deleter));
476 _impl->_deleter = std::move(d);
477}
478
479inline
480packet::packet(packet&& x, deleter d)
481 : _impl(std::move(x._impl)) {
482 _impl->_deleter.append(std::move(d));
483}
484
485inline
486packet::packet(packet&& x, temporary_buffer<char> buf)
487 : packet(std::move(x), fragment{buf.get_write(), buf.size()}, buf.release()) {
488}
489
490inline
491packet::packet(temporary_buffer<char> buf)
492 : packet(fragment{buf.get_write(), buf.size()}, buf.release()) {}
493
494inline
495void packet::append(packet&& p) {
496 if (!_impl->_len) {
497 *this = std::move(p);
498 return;
499 }
500 _impl = impl::allocate_if_needed(std::move(_impl), p._impl->_nr_frags);
501 _impl->_len += p._impl->_len;
502 p._impl->unuse_internal_data();
503 std::copy(p._impl->_frags, p._impl->_frags + p._impl->_nr_frags,
504 _impl->_frags + _impl->_nr_frags);
505 _impl->_nr_frags += p._impl->_nr_frags;
506 p._impl->_deleter.append(std::move(_impl->_deleter));
507 _impl->_deleter = std::move(p._impl->_deleter);
508}
509
510inline
511char* packet::get_header(size_t offset, size_t size) {
512 if (offset + size > _impl->_len) {
513 return nullptr;
514 }
515 size_t i = 0;
516 while (i != _impl->_nr_frags && offset >= _impl->_frags[i].size) {
517 offset -= _impl->_frags[i++].size;
518 }
519 if (i == _impl->_nr_frags) {
520 return nullptr;
521 }
522 if (offset + size > _impl->_frags[i].size) {
523 linearize(i, offset + size);
524 }
525 return _impl->_frags[i].base + offset;
526}
527
528template <typename Header>
529inline
530Header* packet::get_header(size_t offset) {
531 return reinterpret_cast<Header*>(get_header(offset, sizeof(Header)));
532}
533
534inline
535void packet::trim_front(size_t how_much) noexcept {
536 assert(how_much <= _impl->_len);
537 _impl->_len -= how_much;
538 size_t i = 0;
539 while (how_much && how_much >= _impl->_frags[i].size) {
540 how_much -= _impl->_frags[i++].size;
541 }
542 std::copy(_impl->_frags + i, _impl->_frags + _impl->_nr_frags, _impl->_frags);
543 _impl->_nr_frags -= i;
544 if (!_impl->using_internal_data()) {
545 _impl->_headroom = internal_data_size;
546 }
547 if (how_much) {
548 if (_impl->using_internal_data()) {
549 _impl->_headroom += how_much;
550 }
551 _impl->_frags[0].base += how_much;
552 _impl->_frags[0].size -= how_much;
553 }
554}
555
556inline
557void packet::trim_back(size_t how_much) noexcept {
558 assert(how_much <= _impl->_len);
559 _impl->_len -= how_much;
560 size_t i = _impl->_nr_frags - 1;
561 while (how_much && how_much >= _impl->_frags[i].size) {
562 how_much -= _impl->_frags[i--].size;
563 }
564 _impl->_nr_frags = i + 1;
565 if (how_much) {
566 _impl->_frags[i].size -= how_much;
567 if (i == 0 && _impl->using_internal_data()) {
568 _impl->_headroom += how_much;
569 }
570 }
571}
572
573template <typename Header>
574Header*
575packet::prepend_header(size_t extra_size) {
576 auto h = prepend_uninitialized_header(sizeof(Header) + extra_size);
577 return new (h) Header{};
578}
579
580// prepend a header (uninitialized!)
581inline
582char* packet::prepend_uninitialized_header(size_t size) {
583 if (!allocate_headroom(size)) {
584 // didn't work out, allocate and copy
585 _impl->unuse_internal_data();
586 // try again, after unuse_internal_data we may have space after all
587 if (!allocate_headroom(size)) {
588 // failed
589 _impl->_len += size;
590 _impl = impl::allocate_if_needed(std::move(_impl), 1);
591 std::unique_ptr<char[]> buf(new char[size]);
592 std::copy_backward(_impl->_frags, _impl->_frags + _impl->_nr_frags,
593 _impl->_frags + _impl->_nr_frags + 1);
594 ++_impl->_nr_frags;
595 _impl->_frags[0] = {buf.get(), size};
596 _impl->_deleter = make_deleter(std::move(_impl->_deleter),
597 [buf = std::move(buf)] {});
598 }
599 }
600 return _impl->_frags[0].base;
601}
602
603inline
604packet packet::share() {
605 return share(0, _impl->_len);
606}
607
608inline
609packet packet::share(size_t offset, size_t len) {
610 _impl->unuse_internal_data(); // FIXME: eliminate?
611 packet n;
612 n._impl = impl::allocate_if_needed(std::move(n._impl), _impl->_nr_frags);
613 size_t idx = 0;
614 while (offset > 0 && offset >= _impl->_frags[idx].size) {
615 offset -= _impl->_frags[idx++].size;
616 }
617 while (n._impl->_len < len) {
618 auto& f = _impl->_frags[idx++];
619 auto fsize = std::min(len - n._impl->_len, f.size - offset);
620 n._impl->_frags[n._impl->_nr_frags++] = { f.base + offset, fsize };
621 n._impl->_len += fsize;
622 offset = 0;
623 }
624 n._impl->_offload_info = _impl->_offload_info;
625 assert(!n._impl->_deleter);
626 n._impl->_deleter = _impl->_deleter.share();
627 return n;
628}
629
630}
631
632}
Definition: deleter.hh:52
Definition: packet.hh:87
void append(deleter d)
Definition: deleter.hh:220
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:46
Definition: packet.hh:51
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550
STL namespace.