Seastar
High performance C++ framework for concurrent servers
packet.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/deleter.hh>
25 #include <seastar/core/temporary_buffer.hh>
26 #include <seastar/net/const.hh>
27 #include <seastar/util/std-compat.hh>
28 #include <seastar/util/modules.hh>
29 #ifndef SEASTAR_MODULE
30 #include <algorithm>
31 #include <cassert>
32 #include <cstdint>
33 #include <functional>
34 #include <iosfwd>
35 #include <memory>
36 #include <optional>
37 #include <vector>
38 #endif
39 
40 namespace seastar {
41 
42 namespace net {
43 
44 SEASTAR_MODULE_EXPORT_BEGIN
45 
// A contiguous run of bytes referenced by a packet: a start pointer plus a
// byte count.  The struct itself carries no ownership information.
struct fragment {
    // first byte of the run
    char* base;
    // number of bytes starting at `base`
    size_t size;
};
50 
51 struct offload_info {
52  ip_protocol_num protocol = ip_protocol_num::unused;
53  bool needs_csum = false;
54  uint8_t ip_hdr_len = 20;
55  uint8_t tcp_hdr_len = 20;
56  uint8_t udp_hdr_len = 8;
57  bool needs_ip_csum = false;
58  bool reassembled = false;
59  uint16_t tso_seg_size = 0;
60  // HW stripped VLAN header (CPU order)
61  std::optional<uint16_t> vlan_tci;
62 };
63 
64 // Zero-copy friendly packet class
65 //
66 // For implementing zero-copy, we need a flexible destructor that can
67 // destroy packet data in different ways: decrementing a reference count,
68 // or calling a free()-like function.
69 //
70 // Moreover, we need different destructors for each set of fragments within
71 // a single fragment. For example, a header and trailer might need delete[]
72 // to be called, while the internal data needs a reference count to be
73 // released. Matters are complicated in that fragments can be split
74 // (due to virtual/physical translation).
75 //
76 // To implement this, we associate each packet with a single destructor,
77 // but allow composing a packet from another packet plus a fragment to
78 // be added, with its own destructor, causing the destructors to be chained.
79 //
80 // The downside is that the data needed for the destructor is duplicated,
81 // if it is already available in the fragment itself.
82 //
83 // As an optimization, when we allocate small fragments, we allocate some
84 // extra space, so prepending to the packet does not require extra
85 // allocations. This is useful when adding headers.
86 //
87 class packet final {
88  // enough for lots of headers, not quite two cache lines:
89  static constexpr size_t internal_data_size = 128 - 16;
90  static constexpr size_t default_nr_frags = 4;
91 
92  struct pseudo_vector {
93  fragment* _start;
94  fragment* _finish;
95  pseudo_vector(fragment* start, size_t nr) noexcept
96  : _start(start), _finish(_start + nr) {}
97  fragment* begin() noexcept { return _start; }
98  fragment* end() noexcept { return _finish; }
99  fragment& operator[](size_t idx) noexcept { return _start[idx]; }
100  };
101 
102  struct impl {
103  // when destroyed, virtual destructor will reclaim resources
104  deleter _deleter;
105  unsigned _len = 0;
106  uint16_t _nr_frags = 0;
107  uint16_t _allocated_frags;
108  offload_info _offload_info;
109  std::optional<uint32_t> _rss_hash;
110  char _data[internal_data_size]; // only _frags[0] may use
111  unsigned _headroom = internal_data_size; // in _data
112  // FIXME: share _data/_frags space
113 
114  fragment _frags[];
115 
116  impl(size_t nr_frags = default_nr_frags) noexcept;
117  impl(const impl&) = delete;
118  impl(fragment frag, size_t nr_frags = default_nr_frags);
119 
120  pseudo_vector fragments() noexcept { return { _frags, _nr_frags }; }
121 
122  static std::unique_ptr<impl> allocate(size_t nr_frags) {
123  nr_frags = std::max(nr_frags, default_nr_frags);
124  return std::unique_ptr<impl>(new (nr_frags) impl(nr_frags));
125  }
126 
127  static std::unique_ptr<impl> copy(impl* old, size_t nr) {
128  auto n = allocate(nr);
129  n->_deleter = std::move(old->_deleter);
130  n->_len = old->_len;
131  n->_nr_frags = old->_nr_frags;
132  n->_headroom = old->_headroom;
133  n->_offload_info = old->_offload_info;
134  n->_rss_hash = old->_rss_hash;
135  std::copy(old->_frags, old->_frags + old->_nr_frags, n->_frags);
136  old->copy_internal_fragment_to(n.get());
137  return n;
138  }
139 
140  static std::unique_ptr<impl> copy(impl* old) {
141  return copy(old, old->_nr_frags);
142  }
143 
144  static std::unique_ptr<impl> allocate_if_needed(std::unique_ptr<impl> old, size_t extra_frags) {
145  if (old->_allocated_frags >= old->_nr_frags + extra_frags) {
146  return old;
147  }
148  return copy(old.get(), std::max<size_t>(old->_nr_frags + extra_frags, 2 * old->_nr_frags));
149  }
150  void* operator new(size_t size, size_t nr_frags = default_nr_frags) {
151  assert(nr_frags == uint16_t(nr_frags));
152  return ::operator new(size + nr_frags * sizeof(fragment));
153  }
154  // Matching the operator new above
155  void operator delete(void* ptr, size_t) {
156  return ::operator delete(ptr);
157  }
158  // Since the above "placement delete" hides the global one, expose it
159  void operator delete(void* ptr) {
160  return ::operator delete(ptr);
161  }
162 
163  bool using_internal_data() const noexcept {
164  return _nr_frags
165  && _frags[0].base >= _data
166  && _frags[0].base < _data + internal_data_size;
167  }
168 
169  void unuse_internal_data() {
170  if (!using_internal_data()) {
171  return;
172  }
173  auto buf = static_cast<char*>(::malloc(_frags[0].size));
174  if (!buf) {
175  throw std::bad_alloc();
176  }
177  deleter d = make_free_deleter(buf);
178  std::copy(_frags[0].base, _frags[0].base + _frags[0].size, buf);
179  _frags[0].base = buf;
180  d.append(std::move(_deleter));
181  _deleter = std::move(d);
182  _headroom = internal_data_size;
183  }
184  void copy_internal_fragment_to(impl* to) noexcept {
185  if (!using_internal_data()) {
186  return;
187  }
188  to->_frags[0].base = to->_data + _headroom;
189  std::copy(_frags[0].base, _frags[0].base + _frags[0].size,
190  to->_frags[0].base);
191  }
192  };
193  packet(std::unique_ptr<impl>&& impl) noexcept : _impl(std::move(impl)) {}
194  std::unique_ptr<impl> _impl;
195 public:
196  static packet from_static_data(const char* data, size_t len) noexcept {
197  return {fragment{const_cast<char*>(data), len}, deleter()};
198  }
199 
200  // build empty packet
201  packet();
202  // build empty packet with nr_frags allocated
203  packet(size_t nr_frags);
204  // move existing packet
205  packet(packet&& x) noexcept;
206  // copy data into packet
207  packet(const char* data, size_t len);
208  // copy data into packet
209  packet(fragment frag);
210  // zero-copy single fragment
211  packet(fragment frag, deleter del);
212  // zero-copy multiple fragments
213  packet(std::vector<fragment> frag, deleter del);
214  // build packet with iterator
215  template <typename Iterator>
216  packet(Iterator begin, Iterator end, deleter del);
217  // append fragment (copying new fragment)
218  packet(packet&& x, fragment frag);
219  // prepend fragment (copying new fragment, with header optimization)
220  packet(fragment frag, packet&& x);
221  // prepend fragment (zero-copy)
222  packet(fragment frag, deleter del, packet&& x);
223  // append fragment (zero-copy)
224  packet(packet&& x, fragment frag, deleter d);
225  // append temporary_buffer (zero-copy)
227  // create from temporary_buffer (zero-copy)
229  // append deleter
230  packet(packet&& x, deleter d);
231 
232  packet& operator=(packet&& x) noexcept {
233  if (this != &x) {
234  this->~packet();
235  new (this) packet(std::move(x));
236  }
237  return *this;
238  }
239 
240  unsigned len() const noexcept { return _impl->_len; }
241  unsigned memory() const noexcept { return len() + sizeof(packet::impl); }
242 
243  fragment frag(unsigned idx) const noexcept { return _impl->_frags[idx]; }
244  fragment& frag(unsigned idx) noexcept { return _impl->_frags[idx]; }
245 
246  unsigned nr_frags() const noexcept { return _impl->_nr_frags; }
247  pseudo_vector fragments() const noexcept { return { _impl->_frags, _impl->_nr_frags }; }
248  fragment* fragment_array() const noexcept { return _impl->_frags; }
249 
250  // share packet data (reference counted, non COW)
251  packet share();
252  packet share(size_t offset, size_t len);
253 
254  void append(packet&& p);
255 
256  void trim_front(size_t how_much) noexcept;
257  void trim_back(size_t how_much) noexcept;
258 
259  // get a header pointer, linearizing if necessary
260  template <typename Header>
261  Header* get_header(size_t offset = 0);
262 
263  // get a header pointer, linearizing if necessary
264  char* get_header(size_t offset, size_t size);
265 
266  // prepend a header (default-initializing it)
267  template <typename Header>
268  Header* prepend_header(size_t extra_size = 0);
269 
270  // prepend a header (uninitialized!)
271  char* prepend_uninitialized_header(size_t size);
272 
273  packet free_on_cpu(unsigned cpu, std::function<void()> cb = []{});
274 
275  void linearize() { return linearize(0, len()); }
276 
277  void reset() noexcept { _impl.reset(); }
278 
279  void reserve(int n_frags) {
280  if (n_frags > _impl->_nr_frags) {
281  auto extra = n_frags - _impl->_nr_frags;
282  _impl = impl::allocate_if_needed(std::move(_impl), extra);
283  }
284  }
285  std::optional<uint32_t> rss_hash() const noexcept {
286  return _impl->_rss_hash;
287  }
288  std::optional<uint32_t> set_rss_hash(uint32_t hash) noexcept {
289  return _impl->_rss_hash = hash;
290  }
291  // Call `func` for each fragment, avoiding data copies when possible
292  // `func` is called with a temporary_buffer<char> parameter
293  template <typename Func>
294  void release_into(Func&& func) {
295  unsigned idx = 0;
296  if (_impl->using_internal_data()) {
297  auto&& f = frag(idx++);
298  func(temporary_buffer<char>(f.base, f.size));
299  }
300  while (idx < nr_frags()) {
301  auto&& f = frag(idx++);
302  func(temporary_buffer<char>(f.base, f.size, _impl->_deleter.share()));
303  }
304  }
305  std::vector<temporary_buffer<char>> release() {
306  std::vector<temporary_buffer<char>> ret;
307  ret.reserve(_impl->_nr_frags);
308  release_into([&ret] (temporary_buffer<char>&& frag) {
309  ret.push_back(std::move(frag));
310  });
311  return ret;
312  }
313  explicit operator bool() noexcept {
314  return bool(_impl);
315  }
316  static packet make_null_packet() noexcept {
317  return net::packet(nullptr);
318  }
319 private:
320  void linearize(size_t at_frag, size_t desired_size);
321  bool allocate_headroom(size_t size);
322 public:
323  struct offload_info get_offload_info() const noexcept { return _impl->_offload_info; }
324  struct offload_info& offload_info_ref() noexcept { return _impl->_offload_info; }
325  void set_offload_info(struct offload_info oi) noexcept { _impl->_offload_info = oi; }
326 };
327 
328 std::ostream& operator<<(std::ostream& os, const packet& p);
329 
330 SEASTAR_MODULE_EXPORT_END
331 
332 inline
333 packet::packet(packet&& x) noexcept
334  : _impl(std::move(x._impl)) {
335 }
336 
337 inline
338 packet::impl::impl(size_t nr_frags) noexcept
339  : _len(0), _allocated_frags(nr_frags) {
340 }
341 
342 inline
343 packet::impl::impl(fragment frag, size_t nr_frags)
344  : _len(frag.size), _allocated_frags(nr_frags) {
345  assert(_allocated_frags > _nr_frags);
346  if (frag.size <= internal_data_size) {
347  _headroom -= frag.size;
348  _frags[0] = { _data + _headroom, frag.size };
349  } else {
350  auto buf = static_cast<char*>(::malloc(frag.size));
351  if (!buf) {
352  throw std::bad_alloc();
353  }
354  deleter d = make_free_deleter(buf);
355  _frags[0] = { buf, frag.size };
356  _deleter.append(std::move(d));
357  }
358  std::copy(frag.base, frag.base + frag.size, _frags[0].base);
359  ++_nr_frags;
360 }
361 
362 inline
363 packet::packet()
364  : _impl(impl::allocate(1)) {
365 }
366 
367 inline
368 packet::packet(size_t nr_frags)
369  : _impl(impl::allocate(nr_frags)) {
370 }
371 
372 inline
373 packet::packet(fragment frag) : _impl(new impl(frag)) {
374 }
375 
376 inline
377 packet::packet(const char* data, size_t size) : packet(fragment{const_cast<char*>(data), size}) {
378 }
379 
380 inline
381 packet::packet(fragment frag, deleter d)
382  : _impl(impl::allocate(1)) {
383  _impl->_deleter = std::move(d);
384  _impl->_frags[_impl->_nr_frags++] = frag;
385  _impl->_len = frag.size;
386 }
387 
388 inline
389 packet::packet(std::vector<fragment> frag, deleter d)
390  : _impl(impl::allocate(frag.size())) {
391  _impl->_deleter = std::move(d);
392  std::copy(frag.begin(), frag.end(), _impl->_frags);
393  _impl->_nr_frags = frag.size();
394  _impl->_len = 0;
395  for (auto&& f : _impl->fragments()) {
396  _impl->_len += f.size;
397  }
398 }
399 
400 template <typename Iterator>
401 inline
402 packet::packet(Iterator begin, Iterator end, deleter del) {
403  unsigned nr_frags = 0, len = 0;
404  nr_frags = std::distance(begin, end);
405  std::for_each(begin, end, [&] (const fragment& frag) { len += frag.size; });
406  _impl = impl::allocate(nr_frags);
407  _impl->_deleter = std::move(del);
408  _impl->_len = len;
409  _impl->_nr_frags = nr_frags;
410  std::copy(begin, end, _impl->_frags);
411 }
412 
413 inline
414 packet::packet(packet&& x, fragment frag)
415  : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
416  _impl->_len += frag.size;
417  std::unique_ptr<char[]> buf(new char[frag.size]);
418  std::copy(frag.base, frag.base + frag.size, buf.get());
419  _impl->_frags[_impl->_nr_frags++] = {buf.get(), frag.size};
420  _impl->_deleter = make_deleter(std::move(_impl->_deleter), [buf = buf.release()] {
421  delete[] buf;
422  });
423 }
424 
425 inline
426 bool
427 packet::allocate_headroom(size_t size) {
428  if (_impl->_headroom >= size) {
429  _impl->_len += size;
430  if (!_impl->using_internal_data()) {
431  _impl = impl::allocate_if_needed(std::move(_impl), 1);
432  std::copy_backward(_impl->_frags, _impl->_frags + _impl->_nr_frags,
433  _impl->_frags + _impl->_nr_frags + 1);
434  _impl->_frags[0] = { _impl->_data + internal_data_size, 0 };
435  ++_impl->_nr_frags;
436  }
437  _impl->_headroom -= size;
438  _impl->_frags[0].base -= size;
439  _impl->_frags[0].size += size;
440  return true;
441  } else {
442  return false;
443  }
444 }
445 
446 
447 inline
448 packet::packet(fragment frag, packet&& x)
449  : _impl(std::move(x._impl)) {
450  // try to prepend into existing internal fragment
451  if (allocate_headroom(frag.size)) {
452  std::copy(frag.base, frag.base + frag.size, _impl->_frags[0].base);
453  return;
454  } else {
455  // didn't work out, allocate and copy
456  _impl->unuse_internal_data();
457  _impl = impl::allocate_if_needed(std::move(_impl), 1);
458  _impl->_len += frag.size;
459  std::unique_ptr<char[]> buf(new char[frag.size]);
460  std::copy(frag.base, frag.base + frag.size, buf.get());
461  std::copy_backward(_impl->_frags, _impl->_frags + _impl->_nr_frags,
462  _impl->_frags + _impl->_nr_frags + 1);
463  ++_impl->_nr_frags;
464  _impl->_frags[0] = {buf.get(), frag.size};
465  _impl->_deleter = make_deleter(std::move(_impl->_deleter),
466  [buf = std::move(buf)] {});
467  }
468 }
469 
470 inline
471 packet::packet(packet&& x, fragment frag, deleter d)
472  : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
473  _impl->_len += frag.size;
474  _impl->_frags[_impl->_nr_frags++] = frag;
475  d.append(std::move(_impl->_deleter));
476  _impl->_deleter = std::move(d);
477 }
478 
479 inline
480 packet::packet(packet&& x, deleter d)
481  : _impl(std::move(x._impl)) {
482  _impl->_deleter.append(std::move(d));
483 }
484 
485 inline
486 packet::packet(packet&& x, temporary_buffer<char> buf)
487  : packet(std::move(x), fragment{buf.get_write(), buf.size()}, buf.release()) {
488 }
489 
490 inline
491 packet::packet(temporary_buffer<char> buf)
492  : packet(fragment{buf.get_write(), buf.size()}, buf.release()) {}
493 
494 inline
495 void packet::append(packet&& p) {
496  if (!_impl->_len) {
497  *this = std::move(p);
498  return;
499  }
500  _impl = impl::allocate_if_needed(std::move(_impl), p._impl->_nr_frags);
501  _impl->_len += p._impl->_len;
502  p._impl->unuse_internal_data();
503  std::copy(p._impl->_frags, p._impl->_frags + p._impl->_nr_frags,
504  _impl->_frags + _impl->_nr_frags);
505  _impl->_nr_frags += p._impl->_nr_frags;
506  p._impl->_deleter.append(std::move(_impl->_deleter));
507  _impl->_deleter = std::move(p._impl->_deleter);
508 }
509 
510 inline
511 char* packet::get_header(size_t offset, size_t size) {
512  if (offset + size > _impl->_len) {
513  return nullptr;
514  }
515  size_t i = 0;
516  while (i != _impl->_nr_frags && offset >= _impl->_frags[i].size) {
517  offset -= _impl->_frags[i++].size;
518  }
519  if (i == _impl->_nr_frags) {
520  return nullptr;
521  }
522  if (offset + size > _impl->_frags[i].size) {
523  linearize(i, offset + size);
524  }
525  return _impl->_frags[i].base + offset;
526 }
527 
528 template <typename Header>
529 inline
530 Header* packet::get_header(size_t offset) {
531  return reinterpret_cast<Header*>(get_header(offset, sizeof(Header)));
532 }
533 
534 inline
535 void packet::trim_front(size_t how_much) noexcept {
536  assert(how_much <= _impl->_len);
537  _impl->_len -= how_much;
538  size_t i = 0;
539  while (how_much && how_much >= _impl->_frags[i].size) {
540  how_much -= _impl->_frags[i++].size;
541  }
542  std::copy(_impl->_frags + i, _impl->_frags + _impl->_nr_frags, _impl->_frags);
543  _impl->_nr_frags -= i;
544  if (!_impl->using_internal_data()) {
545  _impl->_headroom = internal_data_size;
546  }
547  if (how_much) {
548  if (_impl->using_internal_data()) {
549  _impl->_headroom += how_much;
550  }
551  _impl->_frags[0].base += how_much;
552  _impl->_frags[0].size -= how_much;
553  }
554 }
555 
556 inline
557 void packet::trim_back(size_t how_much) noexcept {
558  assert(how_much <= _impl->_len);
559  _impl->_len -= how_much;
560  size_t i = _impl->_nr_frags - 1;
561  while (how_much && how_much >= _impl->_frags[i].size) {
562  how_much -= _impl->_frags[i--].size;
563  }
564  _impl->_nr_frags = i + 1;
565  if (how_much) {
566  _impl->_frags[i].size -= how_much;
567  if (i == 0 && _impl->using_internal_data()) {
568  _impl->_headroom += how_much;
569  }
570  }
571 }
572 
573 template <typename Header>
574 Header*
575 packet::prepend_header(size_t extra_size) {
576  auto h = prepend_uninitialized_header(sizeof(Header) + extra_size);
577  return new (h) Header{};
578 }
579 
580 // prepend a header (uninitialized!)
581 inline
582 char* packet::prepend_uninitialized_header(size_t size) {
583  if (!allocate_headroom(size)) {
584  // didn't work out, allocate and copy
585  _impl->unuse_internal_data();
586  // try again, after unuse_internal_data we may have space after all
587  if (!allocate_headroom(size)) {
588  // failed
589  _impl->_len += size;
590  _impl = impl::allocate_if_needed(std::move(_impl), 1);
591  std::unique_ptr<char[]> buf(new char[size]);
592  std::copy_backward(_impl->_frags, _impl->_frags + _impl->_nr_frags,
593  _impl->_frags + _impl->_nr_frags + 1);
594  ++_impl->_nr_frags;
595  _impl->_frags[0] = {buf.get(), size};
596  _impl->_deleter = make_deleter(std::move(_impl->_deleter),
597  [buf = std::move(buf)] {});
598  }
599  }
600  return _impl->_frags[0].base;
601 }
602 
603 inline
604 packet packet::share() {
605  return share(0, _impl->_len);
606 }
607 
608 inline
609 packet packet::share(size_t offset, size_t len) {
610  _impl->unuse_internal_data(); // FIXME: eliminate?
611  packet n;
612  n._impl = impl::allocate_if_needed(std::move(n._impl), _impl->_nr_frags);
613  size_t idx = 0;
614  while (offset > 0 && offset >= _impl->_frags[idx].size) {
615  offset -= _impl->_frags[idx++].size;
616  }
617  while (n._impl->_len < len) {
618  auto& f = _impl->_frags[idx++];
619  auto fsize = std::min(len - n._impl->_len, f.size - offset);
620  n._impl->_frags[n._impl->_nr_frags++] = { f.base + offset, fsize };
621  n._impl->_len += fsize;
622  offset = 0;
623  }
624  n._impl->_offload_info = _impl->_offload_info;
625  assert(!n._impl->_deleter);
626  n._impl->_deleter = _impl->_deleter.share();
627  return n;
628 }
629 
630 }
631 
632 }
Definition: deleter.hh:51
Definition: packet.hh:87
void append(deleter d)
Definition: deleter.hh:219
holds the implementation parts of the metrics layer, do not use directly.
Definition: packet.hh:46
Definition: packet.hh:51
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
future copy(input_stream< CharType > &in, output_stream< CharType > &out)
copy all the content from the input stream to the output stream
Definition: iostream-impl.hh:550