Seastar
High performance C++ framework for concurrent servers
net.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/smp.hh>
25 #include <seastar/core/deleter.hh>
26 #include <seastar/core/queue.hh>
27 #include <seastar/core/stream.hh>
29 #include <seastar/net/toeplitz.hh>
30 #include <seastar/net/ethernet.hh>
31 #include <seastar/net/packet.hh>
32 #include <seastar/net/const.hh>
33 #include <unordered_map>
34 
35 namespace seastar {
36 
37 namespace internal {
38 
39 class poller;
40 
41 }
42 
43 namespace net {
44 
45 class packet;
46 class interface;
47 class device;
48 class qp;
49 class l3_protocol;
50 
/// Fixed-capacity (64-byte) byte buffer that packet fields are appended to,
/// presumably so the accumulated bytes can be hashed to choose a forwarding
/// destination (it is passed to the `forward` callbacks) — confirm in net.cc.
///
/// Multi-byte values are appended least-significant byte first.
class forward_hash {
    uint8_t _bytes[64];
    size_t _len = 0;
public:
    /// Number of bytes appended so far.
    size_t size() const {
        return _len;
    }

    /// Append a single byte; asserts that the 64-byte capacity is not exceeded.
    void push_back(uint8_t b) {
        assert(_len < sizeof(_bytes));
        _bytes[_len] = b;
        ++_len;
    }

    /// Append a 16-bit value as two bytes, low byte first.
    void push_back(uint16_t b) {
        for (unsigned shift = 0; shift < 16; shift += 8) {
            push_back(static_cast<uint8_t>(b >> shift));
        }
    }

    /// Append a 32-bit value as four bytes, low byte first.
    void push_back(uint32_t b) {
        for (unsigned shift = 0; shift < 32; shift += 8) {
            push_back(static_cast<uint8_t>(b >> shift));
        }
    }

    /// Read the byte at position `idx` (unchecked).
    const uint8_t& operator[](size_t idx) const {
        return _bytes[idx];
    }
};
74 
75 struct hw_features {
76  // Enable tx ip header checksum offload
77  bool tx_csum_ip_offload = false;
78  // Enable tx l4 (TCP or UDP) checksum offload
79  bool tx_csum_l4_offload = false;
80  // Enable rx checksum offload
81  bool rx_csum_offload = false;
82  // LRO is enabled
83  bool rx_lro = false;
84  // Enable tx TCP segment offload
85  bool tx_tso = false;
86  // Enable tx UDP fragmentation offload
87  bool tx_ufo = false;
88  // Maximum Transmission Unit
89  uint16_t mtu = 1500;
90  // Maximun packet len when TCP/UDP offload is enabled
91  uint16_t max_packet_len = ip_packet_len_max - eth_hdr_len;
92 };
93 
94 class l3_protocol {
95 public:
96  struct l3packet {
97  eth_protocol_num proto_num;
99  packet p;
100  };
101  using packet_provider_type = std::function<std::optional<l3packet> ()>;
102 private:
103  interface* _netif;
104  eth_protocol_num _proto_num;
105 public:
106  explicit l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func);
107  future<> receive(
108  std::function<future<> (packet, ethernet_address)> rx_fn,
109  std::function<bool (forward_hash&, packet&, size_t)> forward);
110 private:
111  friend class interface;
112 };
113 
114 class interface {
115  struct l3_rx_stream {
116  stream<packet, ethernet_address> packet_stream;
117  future<> ready;
118  std::function<bool (forward_hash&, packet&, size_t)> forward;
119  l3_rx_stream(std::function<bool (forward_hash&, packet&, size_t)>&& fw) : ready(packet_stream.started()), forward(fw) {}
120  };
121  std::unordered_map<uint16_t, l3_rx_stream> _proto_map;
122  std::shared_ptr<device> _dev;
123  ethernet_address _hw_address;
124  net::hw_features _hw_features;
125  std::vector<l3_protocol::packet_provider_type> _pkt_providers;
126 private:
127  future<> dispatch_packet(packet p);
128 public:
129  explicit interface(std::shared_ptr<device> dev);
130  ethernet_address hw_address() const noexcept { return _hw_address; }
131  const net::hw_features& hw_features() const { return _hw_features; }
132  future<> register_l3(eth_protocol_num proto_num,
133  std::function<future<> (packet p, ethernet_address from)> next,
134  std::function<bool (forward_hash&, packet&, size_t)> forward);
135  void forward(unsigned cpuid, packet p);
136  unsigned hash2cpu(uint32_t hash);
137  void register_packet_provider(l3_protocol::packet_provider_type func) {
138  _pkt_providers.push_back(std::move(func));
139  }
140  uint16_t hw_queues_count();
141  rss_key_type rss_key() const;
142  friend class l3_protocol;
143 };
144 
153  void update_pkts_bunch(uint64_t count) {
154  last_bunch = count;
155  packets += count;
156  }
157 
165  void update_copy_stats(uint64_t nr_frags, uint64_t bytes) {
166  copy_frags += nr_frags;
167  copy_bytes += bytes;
168  }
169 
176  void update_frags_stats(uint64_t nfrags, uint64_t nbytes) {
177  nr_frags += nfrags;
178  bytes += nbytes;
179  }
180 
181  uint64_t bytes; // total number of bytes
182  uint64_t nr_frags; // total number of fragments
183  uint64_t copy_frags; // fragments that were copied on L2 level
184  uint64_t copy_bytes; // bytes that were copied on L2 level
185  uint64_t packets; // total number of packets
186  uint64_t last_bunch; // number of packets in the last sent/received bunch
187 };
188 
189 struct qp_stats {
190  qp_stats() : rx{}, tx{} {}
191 
192  struct {
193  struct qp_stats_good good;
194 
195  struct {
196  void inc_csum_err() {
197  ++csum;
198  ++total;
199  }
200 
201  void inc_no_mem() {
202  ++no_mem;
203  ++total;
204  }
205 
206  uint64_t no_mem; // Packets dropped due to allocation failure
207  uint64_t total; // total number of erroneous packets
208  uint64_t csum; // packets with bad checksum
209  } bad;
210  } rx;
211 
212  struct {
213  struct qp_stats_good good;
214  uint64_t linearized; // number of packets that were linearized
215  } tx;
216 };
217 
218 class qp {
219  using packet_provider_type = std::function<std::optional<packet> ()>;
220  std::vector<packet_provider_type> _pkt_providers;
221  std::optional<std::array<uint8_t, 128>> _sw_reta;
222  circular_buffer<packet> _proxy_packetq;
223  stream<packet> _rx_stream;
224  std::unique_ptr<internal::poller> _tx_poller;
225  circular_buffer<packet> _tx_packetq;
226 
227 protected:
228  const std::string _stats_plugin_name;
229  const std::string _queue_name;
230  metrics::metric_groups _metrics;
231  qp_stats _stats;
232 
233 public:
234  qp(bool register_copy_stats = false,
235  const std::string stats_plugin_name = std::string("network"),
236  uint8_t qid = 0);
237  virtual ~qp();
238  virtual future<> send(packet p) = 0;
239  virtual uint32_t send(circular_buffer<packet>& p) {
240  uint32_t sent = 0;
241  while (!p.empty()) {
242  // FIXME: future is discarded
243  (void)send(std::move(p.front()));
244  p.pop_front();
245  sent++;
246  }
247  return sent;
248  }
249  virtual void rx_start() {};
250  void configure_proxies(const std::map<unsigned, float>& cpu_weights);
251  // build REdirection TAble for cpu_weights map: target cpu -> weight
252  void build_sw_reta(const std::map<unsigned, float>& cpu_weights);
253  void proxy_send(packet p) {
254  _proxy_packetq.push_back(std::move(p));
255  }
256  void register_packet_provider(packet_provider_type func) {
257  _pkt_providers.push_back(std::move(func));
258  }
259  bool poll_tx();
260  friend class device;
261 };
262 
263 class device {
264 protected:
265  std::unique_ptr<qp*[]> _queues;
266  size_t _rss_table_bits = 0;
267 public:
268  device() {
269  _queues = std::make_unique<qp*[]>(smp::count);
270  }
271  virtual ~device() {};
272  qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
273  qp& local_queue() { return queue_for_cpu(this_shard_id()); }
274  void l2receive(packet p) {
275  // FIXME: future is discarded
276  (void)_queues[this_shard_id()]->_rx_stream.produce(std::move(p));
277  }
278  future<> receive(std::function<future<> (packet)> next_packet);
279  virtual ethernet_address hw_address() = 0;
280  virtual net::hw_features hw_features() = 0;
281  virtual rss_key_type rss_key() const { return default_rsskey_40bytes; }
282  virtual uint16_t hw_queues_count() { return 1; }
283  virtual future<> link_ready() { return make_ready_future<>(); }
284  virtual std::unique_ptr<qp> init_local_queue(const program_options::option_group& opts, uint16_t qid) = 0;
285  virtual unsigned hash2qid(uint32_t hash) {
286  return hash % hw_queues_count();
287  }
288  void set_local_queue(std::unique_ptr<qp> dev);
289  template <typename Func>
290  unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) {
291  auto& qp = queue_for_cpu(src_cpuid);
292  if (!qp._sw_reta) {
293  return src_cpuid;
294  }
295  auto hash = hashfn() >> _rss_table_bits;
296  auto& reta = *qp._sw_reta;
297  return reta[hash % reta.size()];
298  }
299  virtual unsigned hash2cpu(uint32_t hash) {
300  // there is an assumption here that qid == cpu_id which will
301  // not necessary be true in the future
302  return forward_dst(hash2qid(hash), [hash] { return hash; });
303  }
304 };
305 
306 }
307 
308 }
Definition: circular_buffer.hh:63
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: net.hh:263
Definition: net.hh:51
Definition: net.hh:114
Definition: net.hh:94
Definition: packet.hh:87
Definition: net.hh:218
Definition: program-options.hh:290
Definition: stream.hh:61
holds the metric_groups definition needed by class that reports metrics
Definition: net.hh:75
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:50
Definition: ethernet.hh:37
Definition: net.hh:145
void update_frags_stats(uint64_t nfrags, uint64_t nbytes)
Definition: net.hh:176
void update_copy_stats(uint64_t nr_frags, uint64_t bytes)
Definition: net.hh:165
void update_pkts_bunch(uint64_t count)
Definition: net.hh:153
Definition: net.hh:189