Seastar
High performance C++ framework for concurrent servers
tcp.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #ifndef SEASTAR_MODULE
25 #include <unordered_map>
26 #include <map>
27 #include <functional>
28 #include <deque>
29 #include <chrono>
30 #include <random>
31 #include <stdexcept>
32 #include <system_error>
33 
34 #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
35 #include <cryptopp/md5.h>
36 #endif
37 
38 #include <seastar/core/shared_ptr.hh>
39 #include <seastar/core/queue.hh>
40 #include <seastar/core/semaphore.hh>
41 #include <seastar/core/byteorder.hh>
42 #include <seastar/core/metrics.hh>
43 #include <seastar/net/net.hh>
44 #include <seastar/net/ip_checksum.hh>
45 #include <seastar/net/ip.hh>
46 #include <seastar/net/const.hh>
47 #include <seastar/net/packet-util.hh>
48 #include <seastar/util/std-compat.hh>
49 
50 namespace seastar {
51 
52 using namespace std::chrono_literals;
53 
54 namespace net {
55 
56 struct tcp_hdr;
57 
/// Builds a std::system_error carrying the errno-style code `err` in the
/// system error category.
inline auto tcp_error(int err) {
    const std::error_code code{err, std::system_category()};
    return std::system_error{code};
}
61 
62 inline auto tcp_reset_error() {
63  return tcp_error(ECONNRESET);
64 };
65 
66 inline auto tcp_connect_error() {
67  return tcp_error(ECONNABORTED);
68 }
69 
70 inline auto tcp_refused_error() {
71  return tcp_error(ECONNREFUSED);
72 };
73 
/// TCP connection states. Every state owns a distinct bit so that several
/// states can be OR-ed together into a mask.
enum class tcp_state : uint16_t {
    CLOSED       = 0x0001,
    LISTEN       = 0x0002,
    SYN_SENT     = 0x0004,
    SYN_RECEIVED = 0x0008,
    ESTABLISHED  = 0x0010,
    FIN_WAIT_1   = 0x0020,
    FIN_WAIT_2   = 0x0040,
    CLOSE_WAIT   = 0x0080,
    CLOSING      = 0x0100,
    LAST_ACK     = 0x0200,
    TIME_WAIT    = 0x0400
};
87 
88 inline tcp_state operator|(tcp_state s1, tcp_state s2) {
89  return tcp_state(uint16_t(s1) | uint16_t(s2));
90 }
91 
/// Debug trace helper: forwards its arguments to print() when the TCP_DEBUG
/// macro evaluates to non-zero and compiles to an empty function otherwise.
template <typename... Ts>
void tcp_debug(const char* fmt, Ts&&... args) {
#if TCP_DEBUG
    print(fmt, std::forward<Ts>(args)...);
#endif
}
98 
99 struct tcp_option {
100  // The kind and len field are fixed and defined in TCP protocol
101  enum class option_kind: uint8_t { mss = 2, win_scale = 3, sack = 4, timestamps = 8, nop = 1, eol = 0 };
102  enum class option_len: uint8_t { mss = 4, win_scale = 3, sack = 2, timestamps = 10, nop = 1, eol = 1 };
103  static void write(char* p, option_kind kind, option_len len) {
104  p[0] = static_cast<uint8_t>(kind);
105  if (static_cast<uint8_t>(len) > 1) {
106  p[1] = static_cast<uint8_t>(len);
107  }
108  }
109  struct mss {
110  static constexpr option_kind kind = option_kind::mss;
111  static constexpr option_len len = option_len::mss;
112  uint16_t mss;
113  static tcp_option::mss read(const char* p) {
114  tcp_option::mss x;
115  x.mss = read_be<uint16_t>(p + 2);
116  return x;
117  }
118  void write(char* p) const {
119  tcp_option::write(p, kind, len);
120  write_be<uint16_t>(p + 2, mss);
121  }
122  };
123  struct win_scale {
124  static constexpr option_kind kind = option_kind::win_scale;
125  static constexpr option_len len = option_len::win_scale;
126  uint8_t shift;
127  static tcp_option::win_scale read(const char* p) {
129  x.shift = p[2];
130  return x;
131  }
132  void write(char* p) const {
133  tcp_option::write(p, kind, len);
134  p[2] = shift;
135  }
136  };
137  struct sack {
138  static constexpr option_kind kind = option_kind::sack;
139  static constexpr option_len len = option_len::sack;
140  static tcp_option::sack read(const char* p) {
141  return {};
142  }
143  void write(char* p) const {
144  tcp_option::write(p, kind, len);
145  }
146  };
147  struct timestamps {
148  static constexpr option_kind kind = option_kind::timestamps;
149  static constexpr option_len len = option_len::timestamps;
150  uint32_t t1;
151  uint32_t t2;
152  static tcp_option::timestamps read(const char* p) {
154  ts.t1 = read_be<uint32_t>(p + 2);
155  ts.t2 = read_be<uint32_t>(p + 6);
156  return ts;
157  }
158  void write(char* p) const {
159  tcp_option::write(p, kind, len);
160  write_be<uint32_t>(p + 2, t1);
161  write_be<uint32_t>(p + 6, t2);
162  }
163  };
164  struct nop {
165  static constexpr option_kind kind = option_kind::nop;
166  static constexpr option_len len = option_len::nop;
167  void write(char* p) const {
168  tcp_option::write(p, kind, len);
169  }
170  };
171  struct eol {
172  static constexpr option_kind kind = option_kind::eol;
173  static constexpr option_len len = option_len::eol;
174  void write(char* p) const {
175  tcp_option::write(p, kind, len);
176  }
177  };
178  static const uint8_t align = 4;
179 
180  void parse(uint8_t* beg, uint8_t* end);
181  uint8_t fill(void* h, const tcp_hdr* th, uint8_t option_size);
182  uint8_t get_size(bool syn_on, bool ack_on);
183 
184  // For option negotiattion
185  bool _mss_received = false;
186  bool _win_scale_received = false;
187  bool _timestamps_received = false;
188  bool _sack_received = false;
189 
190  // Option data
191  uint16_t _remote_mss = 536;
192  uint16_t _local_mss;
193  uint8_t _remote_win_scale = 0;
194  uint8_t _local_win_scale = 0;
195 };
196 inline char*& operator+=(char*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
197 inline const char*& operator+=(const char*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
198 inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
199 
/// Strongly-typed wrapper around a raw 32-bit TCP sequence number.
struct tcp_seq {
    uint32_t raw;
};
203 
204 inline tcp_seq ntoh(tcp_seq s) {
205  return tcp_seq { ntoh(s.raw) };
206 }
207 
208 inline tcp_seq hton(tcp_seq s) {
209  return tcp_seq { hton(s.raw) };
210 }
211 
212 inline
213 std::ostream& operator<<(std::ostream& os, tcp_seq s) {
214  return os << s.raw;
215 }
216 
217 inline tcp_seq make_seq(uint32_t raw) { return tcp_seq{raw}; }
218 inline tcp_seq& operator+=(tcp_seq& s, int32_t n) { s.raw += n; return s; }
219 inline tcp_seq& operator-=(tcp_seq& s, int32_t n) { s.raw -= n; return s; }
220 inline tcp_seq operator+(tcp_seq s, int32_t n) { return s += n; }
221 inline tcp_seq operator-(tcp_seq s, int32_t n) { return s -= n; }
222 inline int32_t operator-(tcp_seq s, tcp_seq q) { return s.raw - q.raw; }
223 inline bool operator==(tcp_seq s, tcp_seq q) { return s.raw == q.raw; }
224 inline bool operator!=(tcp_seq s, tcp_seq q) { return !(s == q); }
225 inline bool operator<(tcp_seq s, tcp_seq q) { return s - q < 0; }
226 inline bool operator>(tcp_seq s, tcp_seq q) { return q < s; }
227 inline bool operator<=(tcp_seq s, tcp_seq q) { return !(s > q); }
228 inline bool operator>=(tcp_seq s, tcp_seq q) { return !(s < q); }
229 
230 struct tcp_hdr {
231  static constexpr size_t len = 20;
232  uint16_t src_port;
233  uint16_t dst_port;
234  tcp_seq seq;
235  tcp_seq ack;
236  uint8_t rsvd1 : 4;
237  uint8_t data_offset : 4;
238  uint8_t f_fin : 1;
239  uint8_t f_syn : 1;
240  uint8_t f_rst : 1;
241  uint8_t f_psh : 1;
242  uint8_t f_ack : 1;
243  uint8_t f_urg : 1;
244  uint8_t rsvd2 : 2;
245  uint16_t window;
246  uint16_t checksum;
247  uint16_t urgent;
248  static tcp_hdr read(const char* p) {
249  tcp_hdr h;
250  h.src_port = read_be<uint16_t>(p + 0);
251  h.dst_port = read_be<uint16_t>(p + 2);
252  h.seq = tcp_seq{read_be<uint32_t>(p + 4)};
253  h.ack = tcp_seq{read_be<uint32_t>(p + 8)};
254  h.rsvd1 = p[12] & 15;
255  h.data_offset = uint8_t(p[12]) >> 4;
256  h.f_fin = (uint8_t(p[13]) >> 0) & 1;
257  h.f_syn = (uint8_t(p[13]) >> 1) & 1;
258  h.f_rst = (uint8_t(p[13]) >> 2) & 1;
259  h.f_psh = (uint8_t(p[13]) >> 3) & 1;
260  h.f_ack = (uint8_t(p[13]) >> 4) & 1;
261  h.f_urg = (uint8_t(p[13]) >> 5) & 1;
262  h.rsvd2 = (uint8_t(p[13]) >> 6) & 3;
263  h.window = read_be<uint16_t>(p + 14);
264  h.checksum = read_be<uint16_t>(p + 16);
265  h.urgent = read_be<uint16_t>(p + 18);
266  return h;
267  }
268  void write(char* p) const {
269  write_be<uint16_t>(p + 0, src_port);
270  write_be<uint16_t>(p + 2, dst_port);
271  write_be<uint32_t>(p + 4, seq.raw);
272  write_be<uint32_t>(p + 8, ack.raw);
273  p[12] = rsvd1 | (data_offset << 4);
274  p[13] = (f_fin << 0)
275  | (f_syn << 1)
276  | (f_rst << 2)
277  | (f_psh << 3)
278  | (f_ack << 4)
279  | (f_urg << 5)
280  | (rsvd2 << 6);
281  write_be<uint16_t>(p + 14, window);
282  write_be<uint16_t>(p + 16, checksum);
283  write_be<uint16_t>(p + 18, urgent);
284  }
285  static void write_nbo_checksum(char* p, uint16_t checksum_in_network_byte_order) {
286  std::copy_n(reinterpret_cast<const char*>(&checksum_in_network_byte_order), 2, p + 16);
287  }
288 };
289 
290 struct tcp_tag {};
292 
293 template <typename InetTraits>
294 class tcp {
295 public:
296  using ipaddr = typename InetTraits::address_type;
297  using inet_type = typename InetTraits::inet_type;
299  using connid_hash = typename connid::connid_hash;
300  class connection;
301  class listener;
302 private:
303  class tcb;
304 
305  class tcb : public enable_lw_shared_from_this<tcb> {
306  using clock_type = lowres_clock;
307  static constexpr tcp_state CLOSED = tcp_state::CLOSED;
308  static constexpr tcp_state LISTEN = tcp_state::LISTEN;
309  static constexpr tcp_state SYN_SENT = tcp_state::SYN_SENT;
310  static constexpr tcp_state SYN_RECEIVED = tcp_state::SYN_RECEIVED;
311  static constexpr tcp_state ESTABLISHED = tcp_state::ESTABLISHED;
312  static constexpr tcp_state FIN_WAIT_1 = tcp_state::FIN_WAIT_1;
313  static constexpr tcp_state FIN_WAIT_2 = tcp_state::FIN_WAIT_2;
314  static constexpr tcp_state CLOSE_WAIT = tcp_state::CLOSE_WAIT;
315  static constexpr tcp_state CLOSING = tcp_state::CLOSING;
316  static constexpr tcp_state LAST_ACK = tcp_state::LAST_ACK;
317  static constexpr tcp_state TIME_WAIT = tcp_state::TIME_WAIT;
318  tcp_state _state = CLOSED;
319  tcp& _tcp;
320  connection* _conn = nullptr;
321  promise<> _connect_done;
322  std::optional<promise<>> _fin_recvd_promise = promise<>();
323  ipaddr _local_ip;
324  ipaddr _foreign_ip;
325  uint16_t _local_port;
326  uint16_t _foreign_port;
327  struct unacked_segment {
328  packet p;
329  uint16_t data_len;
330  unsigned nr_transmits;
331  clock_type::time_point tx_time;
332  };
333  struct send {
334  tcp_seq unacknowledged;
335  tcp_seq next;
336  uint32_t window;
337  uint8_t window_scale;
338  uint16_t mss;
339  tcp_seq urgent;
340  tcp_seq wl1;
341  tcp_seq wl2;
342  tcp_seq initial;
343  std::deque<unacked_segment> data;
344  std::deque<packet> unsent;
345  uint32_t unsent_len = 0;
346  bool closed = false;
347  promise<> _window_opened;
348  // Wait for all data are acked
349  std::optional<promise<>> _all_data_acked_promise;
350  // Limit number of data queued into send queue
351  size_t max_queue_space = 212992;
352  size_t current_queue_space = 0;
353  // wait for there is at least one byte available in the queue
354  std::optional<promise<>> _send_available_promise;
355  // Round-trip time variation
356  std::chrono::milliseconds rttvar;
357  // Smoothed round-trip time
358  std::chrono::milliseconds srtt;
359  bool first_rto_sample = true;
360  clock_type::time_point syn_tx_time;
361  // Congestion window
362  uint32_t cwnd;
363  // Slow start threshold
364  uint32_t ssthresh;
365  // Duplicated ACKs
366  uint16_t dupacks = 0;
367  unsigned syn_retransmit = 0;
368  unsigned fin_retransmit = 0;
369  uint32_t limited_transfer = 0;
370  uint32_t partial_ack = 0;
371  tcp_seq recover;
372  bool window_probe = false;
373  uint8_t zero_window_probing_out = 0;
374  } _snd;
375  struct receive {
376  tcp_seq next;
377  uint32_t window;
378  uint8_t window_scale;
379  uint16_t mss;
380  tcp_seq urgent;
381  tcp_seq initial;
382  std::deque<packet> data;
383  // The total size of data stored in std::deque<packet> data
384  size_t data_size = 0;
385  tcp_packet_merger out_of_order;
386  std::optional<promise<>> _data_received_promise;
387  // The maximun memory buffer size allowed for receiving
388  // Currently, it is the same as default receive window size when window scaling is enabled
389  size_t max_receive_buf_size = 3737600;
390  } _rcv;
391  tcp_option _option;
392  timer<lowres_clock> _delayed_ack;
393  // Retransmission timeout
394  std::chrono::milliseconds _rto{1000};
395  std::chrono::milliseconds _persist_time_out{1000};
396  static constexpr std::chrono::milliseconds _rto_min{1000};
397  static constexpr std::chrono::milliseconds _rto_max{60000};
398  // Clock granularity
399  static constexpr std::chrono::milliseconds _rto_clk_granularity{1};
400  static constexpr uint16_t _max_nr_retransmit{5};
401  timer<lowres_clock> _retransmit;
402  timer<lowres_clock> _persist;
403  uint16_t _nr_full_seg_received = 0;
404  struct isn_secret {
405  // 512 bits secretkey for ISN generating
406  uint32_t key[16];
407  isn_secret () {
408  std::random_device rd;
409  std::default_random_engine e(rd());
410  std::uniform_int_distribution<uint32_t> dist{};
411  for (auto& k : key) {
412  k = dist(e);
413  }
414  }
415  };
416  static isn_secret _isn_secret;
417  tcp_seq get_isn();
419  bool _poll_active = false;
420  uint32_t get_default_receive_window_size() {
421  // Linux's default window size
422  constexpr uint32_t size = 29200;
423  return size << _rcv.window_scale;
424  }
425  // Returns the current receive window according to available receiving buffer size
426  uint32_t get_modified_receive_window_size() {
427  uint32_t left = _rcv.data_size > _rcv.max_receive_buf_size ? 0 : _rcv.max_receive_buf_size - _rcv.data_size;
428  return std::min(left, get_default_receive_window_size());
429  }
430  public:
431  tcb(tcp& t, connid id);
432  void input_handle_listen_state(tcp_hdr* th, packet p);
433  void input_handle_syn_sent_state(tcp_hdr* th, packet p);
434  void input_handle_other_state(tcp_hdr* th, packet p);
435  void output_one(bool data_retransmit = false);
436  future<> wait_for_data();
437  future<> wait_input_shutdown();
438  void abort_reader() noexcept;
439  future<> wait_for_all_data_acked();
440  future<> wait_send_available();
441  future<> send(packet p);
442  void connect();
443  packet read();
444  void close() noexcept;
445  void remove_from_tcbs() {
446  auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
447  _tcp._tcbs.erase(id);
448  }
449  std::optional<typename InetTraits::l4packet> get_packet();
450  void output() {
451  if (!_poll_active) {
452  _poll_active = true;
453  // FIXME: future is discarded
454  (void)_tcp.poll_tcb(_foreign_ip, this->shared_from_this()).then_wrapped([this] (auto&& f) {
455  try {
456  f.get();
457  } catch(arp_queue_full_error& ex) {
458  // retry later
459  _poll_active = false;
460  this->start_retransmit_timer();
461  } catch(arp_timeout_error& ex) {
462  if (this->in_state(SYN_SENT)) {
463  _connect_done.set_exception(ex);
464  this->cleanup();
465  }
466  // in other states connection should time out
467  }
468  });
469  }
470  }
471  future<> connect_done() {
472  return _connect_done.get_future();
473  }
474  tcp_state& state() {
475  return _state;
476  }
477  private:
478  void respond_with_reset(tcp_hdr* th);
479  bool merge_out_of_order();
480  void insert_out_of_order(tcp_seq seq, packet p);
481  void trim_receive_data_after_window();
482  bool should_send_ack(uint16_t seg_len);
483  void clear_delayed_ack() noexcept;
484  packet get_transmit_packet();
485  void retransmit_one() {
486  bool data_retransmit = true;
487  output_one(data_retransmit);
488  }
489  void start_retransmit_timer() {
490  auto now = clock_type::now();
491  start_retransmit_timer(now);
492  };
493  void start_retransmit_timer(clock_type::time_point now) {
494  auto tp = now + _rto;
495  _retransmit.rearm(tp);
496  };
497  void stop_retransmit_timer() noexcept {
498  _retransmit.cancel();
499  };
500  void start_persist_timer() {
501  auto now = clock_type::now();
502  start_persist_timer(now);
503  };
504  void start_persist_timer(clock_type::time_point now) {
505  auto tp = now + _persist_time_out;
506  _persist.rearm(tp);
507  };
508  void stop_persist_timer() {
509  _persist.cancel();
510  };
511  void persist();
512  void retransmit();
513  void fast_retransmit();
514  void update_rto(clock_type::time_point tx_time);
515  void update_cwnd(uint32_t acked_bytes);
516  void cleanup();
517  uint32_t can_send() {
518  if (_snd.window_probe) {
519  return 1;
520  }
521 
522  // Can not send if send window is zero
523  if (_snd.window == 0) {
524  return 0;
525  }
526 
527  // Can not send if send window is less than unacknowledged data size
528  auto window_used = uint32_t(_snd.next - _snd.unacknowledged);
529  if (window_used > _snd.window) {
530  return 0;
531  }
532 
533  // Can not send more than advertised window allows or unsent data size
534  auto x = std::min(_snd.window - window_used, _snd.unsent_len);
535 
536  // Can not send more than congestion window allows
537  x = std::min(_snd.cwnd, x);
538  if (_snd.dupacks == 1 || _snd.dupacks == 2) {
539  // RFC5681 Step 3.1
540  // Send cwnd + 2 * smss per RFC3042
541  auto flight = flight_size();
542  auto max = _snd.cwnd + 2 * _snd.mss;
543  x = flight <= max ? std::min(x, max - flight) : 0;
544  _snd.limited_transfer += x;
545  } else if (_snd.dupacks >= 3) {
546  // RFC5681 Step 3.5
547  // Sent 1 full-sized segment at most
548  x = std::min(uint32_t(_snd.mss), x);
549  }
550  return x;
551  }
552  uint32_t flight_size() {
553  uint32_t size = 0;
554  std::for_each(_snd.data.begin(), _snd.data.end(), [&] (unacked_segment& seg) { size += seg.p.len(); });
555  return size;
556  }
557  uint16_t local_mss() {
558  return _tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
559  }
560  void queue_packet(packet p) {
561  _packetq.emplace_back(typename InetTraits::l4packet{_foreign_ip, std::move(p)});
562  }
563  void signal_data_received() {
564  if (_rcv._data_received_promise) {
565  _rcv._data_received_promise->set_value();
566  _rcv._data_received_promise = {};
567  }
568  }
569  void signal_all_data_acked() {
570  if (_snd._all_data_acked_promise && _snd.unsent_len == 0) {
571  _snd._all_data_acked_promise->set_value();
572  _snd._all_data_acked_promise = {};
573  }
574  }
575  void signal_send_available() {
576  if (_snd._send_available_promise && _snd.max_queue_space > _snd.current_queue_space) {
577  _snd._send_available_promise->set_value();
578  _snd._send_available_promise = {};
579  }
580  }
581  void do_syn_sent() {
582  _state = SYN_SENT;
583  _snd.syn_tx_time = clock_type::now();
584  // Send <SYN> to remote
585  output();
586  }
587  void do_syn_received() {
588  _state = SYN_RECEIVED;
589  _snd.syn_tx_time = clock_type::now();
590  // Send <SYN,ACK> to remote
591  output();
592  }
593  void do_established() {
594  _state = ESTABLISHED;
595  update_rto(_snd.syn_tx_time);
596  _connect_done.set_value();
597  }
598  void do_reset() {
599  _state = CLOSED;
600  cleanup();
601  if (_rcv._data_received_promise) {
602  _rcv._data_received_promise->set_exception(tcp_reset_error());
603  _rcv._data_received_promise = std::nullopt;
604  }
605  if (_snd._all_data_acked_promise) {
606  _snd._all_data_acked_promise->set_exception(tcp_reset_error());
607  _snd._all_data_acked_promise = std::nullopt;
608  }
609  if (_snd._send_available_promise) {
610  _snd._send_available_promise->set_exception(tcp_reset_error());
611  _snd._send_available_promise = std::nullopt;
612  }
613  }
614  void do_time_wait() {
615  // FIXME: Implement TIME_WAIT state timer
616  _state = TIME_WAIT;
617  cleanup();
618  }
619  void do_closed() {
620  _state = CLOSED;
621  cleanup();
622  }
623  void do_setup_isn() {
624  _snd.initial = get_isn();
625  _snd.unacknowledged = _snd.initial;
626  _snd.next = _snd.initial + 1;
627  _snd.recover = _snd.initial;
628  }
629  void do_local_fin_acked() {
630  _snd.unacknowledged += 1;
631  _snd.next += 1;
632  }
633  bool syn_needs_on() const noexcept {
634  return in_state(SYN_SENT | SYN_RECEIVED);
635  }
636  bool fin_needs_on() const noexcept {
637  return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed &&
638  _snd.unsent_len == 0;
639  }
640  bool ack_needs_on() const noexcept {
641  return !in_state(CLOSED | LISTEN | SYN_SENT);
642  }
643  bool foreign_will_not_send() const noexcept {
644  return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED);
645  }
646  bool in_state(tcp_state state) const noexcept {
647  return uint16_t(_state) & uint16_t(state);
648  }
649  void exit_fast_recovery() {
650  _snd.dupacks = 0;
651  _snd.limited_transfer = 0;
652  _snd.partial_ack = 0;
653  }
654  uint32_t data_segment_acked(tcp_seq seg_ack);
655  bool segment_acceptable(tcp_seq seg_seq, unsigned seg_len);
656  void init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end);
657  friend class connection;
658  };
659  inet_type& _inet;
660  std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs;
661  std::unordered_map<uint16_t, listener*> _listening;
662  std::random_device _rd;
663  std::default_random_engine _e;
664  std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
666  // queue for packets that do not belong to any tcb
668  semaphore _queue_space = {212992};
669  metrics::metric_groups _metrics;
670 public:
671  const inet_type& inet() const {
672  return _inet;
673  }
674  class connection {
675  lw_shared_ptr<tcb> _tcb;
676  public:
677  explicit connection(lw_shared_ptr<tcb> tcbp) : _tcb(std::move(tcbp)) { _tcb->_conn = this; }
678  connection(const connection&) = delete;
679  connection(connection&& x) noexcept : _tcb(std::move(x._tcb)) {
680  _tcb->_conn = this;
681  }
682  ~connection();
683  void operator=(const connection&) = delete;
684  connection& operator=(connection&& x) {
685  if (this != &x) {
686  this->~connection();
687  new (this) connection(std::move(x));
688  }
689  return *this;
690  }
691  future<> connected() {
692  return _tcb->connect_done();
693  }
694  future<> send(packet p) {
695  return _tcb->send(std::move(p));
696  }
697  future<> wait_for_data() {
698  return _tcb->wait_for_data();
699  }
700  future<> wait_input_shutdown() {
701  return _tcb->wait_input_shutdown();
702  }
703  packet read() {
704  return _tcb->read();
705  }
706  ipaddr foreign_ip() {
707  return _tcb->_foreign_ip;
708  }
709  uint16_t foreign_port() {
710  return _tcb->_foreign_port;
711  }
712  ipaddr local_ip() {
713  return _tcb->_local_ip;
714  }
715  uint16_t local_port() {
716  return _tcb->_local_port;
717  }
718  void shutdown_connect();
719  void close_read() noexcept;
720  void close_write() noexcept;
721  };
722  class listener {
723  tcp& _tcp;
724  uint16_t _port;
726  size_t _pending = 0;
727  private:
728  listener(tcp& t, uint16_t port, size_t queue_length)
729  : _tcp(t), _port(port), _q(queue_length) {
730  _tcp._listening.emplace(_port, this);
731  }
732  public:
733  listener(listener&& x)
734  : _tcp(x._tcp), _port(x._port), _q(std::move(x._q)) {
735  _tcp._listening[_port] = this;
736  x._port = 0;
737  }
738  ~listener() {
739  if (_port) {
740  _tcp._listening.erase(_port);
741  }
742  }
743  future<connection> accept() {
744  return _q.not_empty().then([this] {
745  return make_ready_future<connection>(_q.pop());
746  });
747  }
748  void abort_accept() {
749  _q.abort(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
750  }
751  bool full() { return _pending + _q.size() >= _q.max_size(); }
752  void inc_pending() { _pending++; }
753  void dec_pending() { _pending--; }
754 
755  const tcp& get_tcp() const {
756  return _tcp;
757  }
758  uint16_t port() const {
759  return _port;
760  }
761  friend class tcp;
762  };
763 public:
764  explicit tcp(inet_type& inet);
765  void received(packet p, ipaddr from, ipaddr to);
766  bool forward(forward_hash& out_hash_data, packet& p, size_t off);
767  listener listen(uint16_t port, size_t queue_length = 100);
769  const net::hw_features& hw_features() const { return _inet._inet.hw_features(); }
770  future<> poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb);
771  void add_connected_tcb(lw_shared_ptr<tcb> tcbp, uint16_t local_port) {
772  auto it = _listening.find(local_port);
773  if (it != _listening.end()) {
774  it->second->_q.push(connection(tcbp));
775  it->second->dec_pending();
776  }
777  }
778 private:
779  void send_packet_without_tcb(ipaddr from, ipaddr to, packet p);
780  void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
781  friend class listener;
782 };
783 
784 template <typename InetTraits>
785 tcp<InetTraits>::tcp(inet_type& inet)
786  : _inet(inet)
787  , _e(_rd()) {
788  namespace sm = metrics;
789 
790  _metrics.add_group("tcp", {
791  sm::make_counter("linearizations", [] { return tcp_packet_merger::linearizations(); },
792  sm::description("Counts a number of times a buffer linearization was invoked during the buffers merge process. "
793  "Divide it by a total TCP receive packet rate to get an everage number of lineraizations per TCP packet."))
794  });
795 
796  _inet.register_packet_provider([this, tcb_polled = 0u] () mutable {
797  std::optional<typename InetTraits::l4packet> l4p;
798  auto c = _poll_tcbs.size();
799  if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
800  l4p = std::move(_packetq.front());
801  _packetq.pop_front();
802  _queue_space.signal(l4p.value().p.len());
803  } else {
804  while (c--) {
805  tcb_polled++;
806  lw_shared_ptr<tcb> tcb;
807  ethernet_address dst;
808  std::tie(tcb, dst) = std::move(_poll_tcbs.front());
809  _poll_tcbs.pop_front();
810  l4p = tcb->get_packet();
811  if (l4p) {
812  l4p.value().e_dst = dst;
813  break;
814  }
815  }
816  }
817  return l4p;
818  });
819 }
820 
821 template <typename InetTraits>
822 future<> tcp<InetTraits>::poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb) {
823  return _inet.get_l2_dst_address(to).then([this, tcb = std::move(tcb)] (ethernet_address dst) {
824  _poll_tcbs.emplace_back(std::move(tcb), dst);
825  });
826 }
827 
828 template <typename InetTraits>
829 auto tcp<InetTraits>::listen(uint16_t port, size_t queue_length) -> listener {
830  return listener(*this, port, queue_length);
831 }
832 
833 template <typename InetTraits>
834 auto tcp<InetTraits>::connect(socket_address sa) -> connection {
835  uint16_t src_port;
836  connid id;
837  auto src_ip = _inet._inet.host_address();
838  auto dst_ip = ipv4_address(sa);
839  auto dst_port = net::ntoh(sa.u.in.sin_port);
840 
841  do {
842  src_port = _port_dist(_e);
843  id = connid{src_ip, dst_ip, src_port, dst_port};
844  } while (_inet._inet.netif()->hw_queues_count() > 1 &&
845  (_inet._inet.netif()->hash2cpu(id.hash(_inet._inet.netif()->rss_key())) != this_shard_id()
846  || _tcbs.find(id) != _tcbs.end()));
847 
848  auto tcbp = make_lw_shared<tcb>(*this, id);
849  _tcbs.insert({id, tcbp});
850  tcbp->connect();
851  return connection(tcbp);
852 }
853 
854 template <typename InetTraits>
855 bool tcp<InetTraits>::forward(forward_hash& out_hash_data, packet& p, size_t off) {
856  auto th = p.get_header(off, tcp_hdr::len);
857  if (th) {
858  // src_port, dst_port in network byte order
859  out_hash_data.push_back(uint8_t(th[0]));
860  out_hash_data.push_back(uint8_t(th[1]));
861  out_hash_data.push_back(uint8_t(th[2]));
862  out_hash_data.push_back(uint8_t(th[3]));
863  }
864  return true;
865 }
866 
867 template <typename InetTraits>
868 void tcp<InetTraits>::received(packet p, ipaddr from, ipaddr to) {
869  auto th = p.get_header(0, tcp_hdr::len);
870  if (!th) {
871  return;
872  }
873  // data_offset is correct even before ntoh()
874  auto data_offset = uint8_t(th[12]) >> 4;
875  if (size_t(data_offset * 4) < tcp_hdr::len) {
876  return;
877  }
878 
879  if (!hw_features().rx_csum_offload) {
880  checksummer csum;
881  InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
882  csum.sum(p);
883  if (csum.get() != 0) {
884  return;
885  }
886  }
887  auto h = tcp_hdr::read(th);
888  auto id = connid{to, from, h.dst_port, h.src_port};
889  auto tcbi = _tcbs.find(id);
890  lw_shared_ptr<tcb> tcbp;
891  if (tcbi == _tcbs.end()) {
892  auto listener = _listening.find(id.local_port);
893  if (listener == _listening.end() || listener->second->full()) {
894  // 1) In CLOSE state
895  // 1.1 all data in the incoming segment is discarded. An incoming
896  // segment containing a RST is discarded. An incoming segment not
897  // containing a RST causes a RST to be sent in response.
898  // FIXME:
899  // if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK>
900  // if ACK on: <SEQ=SEG.ACK><CTL=RST>
901  return respond_with_reset(&h, id.local_ip, id.foreign_ip);
902  } else {
903  // 2) In LISTEN state
904  // 2.1 first check for an RST
905  if (h.f_rst) {
906  // An incoming RST should be ignored
907  return;
908  }
909  // 2.2 second check for an ACK
910  if (h.f_ack) {
911  // Any acknowledgment is bad if it arrives on a connection
912  // still in the LISTEN state.
913  // <SEQ=SEG.ACK><CTL=RST>
914  return respond_with_reset(&h, id.local_ip, id.foreign_ip);
915  }
916  // 2.3 third check for a SYN
917  if (h.f_syn) {
918  // check the security
919  // NOTE: Ignored for now
920  tcbp = make_lw_shared<tcb>(*this, id);
921  _tcbs.insert({id, tcbp});
922  // TODO: we need to remove the tcb and decrease the pending if
923  // it stays SYN_RECEIVED state forever.
924  listener->second->inc_pending();
925 
926  return tcbp->input_handle_listen_state(&h, std::move(p));
927  }
928  // 2.4 fourth other text or control
929  // So you are unlikely to get here, but if you do, drop the
930  // segment, and return.
931  return;
932  }
933  } else {
934  tcbp = tcbi->second;
935  if (tcbp->state() == tcp_state::SYN_SENT) {
936  // 3) In SYN_SENT State
937  return tcbp->input_handle_syn_sent_state(&h, std::move(p));
938  } else {
939  // 4) In other state, can be one of the following:
940  // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
941  // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
942  return tcbp->input_handle_other_state(&h, std::move(p));
943  }
944  }
945 }
946 
947 // Send packet does not belong to any tcb
948 template <typename InetTraits>
949 void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, packet p) {
950  if (_queue_space.try_wait(p.len())) { // drop packets that do not fit the queue
951  // FIXME: future is discarded
952  (void)_inet.get_l2_dst_address(to).then([this, to, p = std::move(p)] (ethernet_address e_dst) mutable {
953  _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
954  });
955  }
956 }
957 
958 template <typename InetTraits>
959 tcp<InetTraits>::connection::~connection() {
960  if (_tcb) {
961  _tcb->_conn = nullptr;
962  close_read();
963  close_write();
964  }
965 }
966 
967 template <typename InetTraits>
968 tcp<InetTraits>::tcb::tcb(tcp& t, connid id)
969  : _tcp(t)
970  , _local_ip(id.local_ip)
971  , _foreign_ip(id.foreign_ip)
972  , _local_port(id.local_port)
973  , _foreign_port(id.foreign_port)
974  , _delayed_ack([this] { _nr_full_seg_received = 0; output(); })
975  , _retransmit([this] { retransmit(); })
976  , _persist([this] { persist(); }) {
977 }
978 
979 template <typename InetTraits>
980 void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth) {
981  _tcp.respond_with_reset(rth, _local_ip, _foreign_ip);
982 }
983 
984 template <typename InetTraits>
985 void tcp<InetTraits>::respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip) {
986  if (rth->f_rst) {
987  return;
988  }
989  packet p;
990  auto th = p.prepend_uninitialized_header(tcp_hdr::len);
991  auto h = tcp_hdr{};
992  h.src_port = rth->dst_port;
993  h.dst_port = rth->src_port;
994  if (rth->f_ack) {
995  h.seq = rth->ack;
996  }
997  // If this RST packet is in response to a SYN packet. We ACK the ISN.
998  if (rth->f_syn) {
999  h.ack = rth->seq + 1;
1000  h.f_ack = true;
1001  }
1002  h.f_rst = true;
1003  h.data_offset = tcp_hdr::len / 4;
1004  h.checksum = 0;
1005  h.write(th);
1006 
1007  checksummer csum;
1008  offload_info oi;
1009  InetTraits::tcp_pseudo_header_checksum(csum, local_ip, foreign_ip, tcp_hdr::len);
1010  uint16_t checksum;
1011  if (hw_features().tx_csum_l4_offload) {
1012  checksum = ~csum.get();
1013  oi.needs_csum = true;
1014  } else {
1015  csum.sum(p);
1016  checksum = csum.get();
1017  oi.needs_csum = false;
1018  }
1019  tcp_hdr::write_nbo_checksum(th, checksum);
1020 
1021  oi.protocol = ip_protocol_num::tcp;
1022  oi.tcp_hdr_len = tcp_hdr::len;
1023  p.set_offload_info(oi);
1024 
1025  send_packet_without_tcb(local_ip, foreign_ip, std::move(p));
1026 }
1027 
1028 template <typename InetTraits>
1029 uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_seq seg_ack) {
1030  uint32_t total_acked_bytes = 0;
1031  // Full ACK of segment
1032  while (!_snd.data.empty()
1033  && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) {
1034  auto acked_bytes = _snd.data.front().p.len();
1035  _snd.unacknowledged += acked_bytes;
1036  // Ignore retransmitted segments when setting the RTO
1037  if (_snd.data.front().nr_transmits == 0) {
1038  update_rto(_snd.data.front().tx_time);
1039  }
1040  update_cwnd(acked_bytes);
1041  total_acked_bytes += acked_bytes;
1042  _snd.current_queue_space -= _snd.data.front().data_len;
1043  signal_send_available();
1044  _snd.data.pop_front();
1045  }
1046  // Partial ACK of segment
1047  if (_snd.unacknowledged < seg_ack) {
1048  auto acked_bytes = seg_ack - _snd.unacknowledged;
1049  if (!_snd.data.empty()) {
1050  auto& unacked_seg = _snd.data.front();
1051  unacked_seg.p.trim_front(acked_bytes);
1052  }
1053  _snd.unacknowledged = seg_ack;
1054  update_cwnd(acked_bytes);
1055  total_acked_bytes += acked_bytes;
1056  }
1057  return total_acked_bytes;
1058 }
1059 
1060 template <typename InetTraits>
1061 bool tcp<InetTraits>::tcb::segment_acceptable(tcp_seq seg_seq, unsigned seg_len) {
1062  if (seg_len == 0 && _rcv.window == 0) {
1063  // SEG.SEQ = RCV.NXT
1064  return seg_seq == _rcv.next;
1065  } else if (seg_len == 0 && _rcv.window > 0) {
1066  // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1067  return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window);
1068  } else if (seg_len > 0 && _rcv.window > 0) {
1069  // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1070  // or
1071  // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
1072  bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window);
1073  bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window);
1074  return x || y;
1075  } else {
1076  // SEG.LEN > 0 RCV.WND = 0, not acceptable
1077  return false;
1078  }
1079 }
1080 
1081 template <typename InetTraits>
1082 void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) {
1083  // Handle tcp options
1084  _option.parse(opt_start, opt_end);
1085 
1086  // Remote receive window scale factor
1087  _snd.window_scale = _option._remote_win_scale;
1088  // Local receive window scale factor
1089  _rcv.window_scale = _option._local_win_scale;
1090 
1091  // Maximum segment size remote can receive
1092  _snd.mss = _option._remote_mss;
1093  // Maximum segment size local can receive
1094  _rcv.mss = _option._local_mss = local_mss();
1095 
1096  _rcv.window = get_default_receive_window_size();
1097  _snd.window = th->window << _snd.window_scale;
1098 
1099  // Segment sequence number used for last window update
1100  _snd.wl1 = th->seq;
1101  // Segment acknowledgment number used for last window update
1102  _snd.wl2 = th->ack;
1103 
1104  // Setup initial congestion window
1105  if (2190 < _snd.mss) {
1106  _snd.cwnd = 2 * _snd.mss;
1107  } else if (1095 < _snd.mss && _snd.mss <= 2190) {
1108  _snd.cwnd = 3 * _snd.mss;
1109  } else {
1110  _snd.cwnd = 4 * _snd.mss;
1111  }
1112 
1113  // Setup initial slow start threshold
1114  _snd.ssthresh = th->window << _snd.window_scale;
1115 }
1116 
1117 template <typename InetTraits>
1118 void tcp<InetTraits>::tcb::input_handle_listen_state(tcp_hdr* th, packet p) {
1119  auto opt_len = th->data_offset * 4 - tcp_hdr::len;
1120  auto opt_start = reinterpret_cast<uint8_t*>(p.get_header(0, th->data_offset * 4)) + tcp_hdr::len;
1121  auto opt_end = opt_start + opt_len;
1122  p.trim_front(th->data_offset * 4);
1123  tcp_seq seg_seq = th->seq;
1124 
1125  // Set RCV.NXT to SEG.SEQ+1, IRS is set to SEG.SEQ
1126  _rcv.next = seg_seq + 1;
1127  _rcv.initial = seg_seq;
1128 
1129  // ISS should be selected and a SYN segment sent of the form:
1130  // <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
1131  // SND.NXT is set to ISS+1 and SND.UNA to ISS
1132  // NOTE: In previous code, _snd.next is set to ISS + 1 only when SYN is
1133  // ACKed. Now, we set _snd.next to ISS + 1 here, so in output_one(): we
1134  // have
1135  // th->seq = syn_on ? _snd.initial : _snd.next
1136  // to make sure retransmitted SYN has correct SEQ number.
1137  do_setup_isn();
1138 
1139  _rcv.urgent = _rcv.next;
1140 
1141  tcp_debug("listen: LISTEN -> SYN_RECEIVED\n");
1142  init_from_options(th, opt_start, opt_end);
1143  do_syn_received();
1144 }
1145 
1146 template <typename InetTraits>
1147 void tcp<InetTraits>::tcb::input_handle_syn_sent_state(tcp_hdr* th, packet p) {
1148  auto opt_len = th->data_offset * 4 - tcp_hdr::len;
1149  auto opt_start = reinterpret_cast<uint8_t*>(p.get_header(0, th->data_offset * 4)) + tcp_hdr::len;
1150  auto opt_end = opt_start + opt_len;
1151  p.trim_front(th->data_offset * 4);
1152  tcp_seq seg_seq = th->seq;
1153  auto seg_ack = th->ack;
1154 
1155  bool acceptable = false;
1156  // 3.1 first check the ACK bit
1157  if (th->f_ack) {
1158  // If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset (unless the
1159  // RST bit is set, if so drop the segment and return)
1160  if (seg_ack <= _snd.initial || seg_ack > _snd.next) {
1161  return respond_with_reset(th);
1162  }
1163 
1164  // If SND.UNA =< SEG.ACK =< SND.NXT then the ACK is acceptable.
1165  acceptable = _snd.unacknowledged <= seg_ack && seg_ack <= _snd.next;
1166  }
1167 
1168  // 3.2 second check the RST bit
1169  if (th->f_rst) {
1170  // If the ACK was acceptable then signal the user "error: connection
1171  // reset", drop the segment, enter CLOSED state, delete TCB, and
1172  // return. Otherwise (no ACK) drop the segment and return.
1173  if (acceptable) {
1174  _connect_done.set_exception(tcp_refused_error());
1175  return do_reset();
1176  } else {
1177  return;
1178  }
1179  }
1180 
1181  // 3.3 third check the security and precedence
1182  // NOTE: Ignored for now
1183 
1184  // 3.4 fourth check the SYN bit
1185  if (th->f_syn) {
1186  // RCV.NXT is set to SEG.SEQ+1, IRS is set to SEG.SEQ. SND.UNA should
1187  // be advanced to equal SEG.ACK (if there is an ACK), and any segments
1188  // on the retransmission queue which are thereby acknowledged should be
1189  // removed.
1190  _rcv.next = seg_seq + 1;
1191  _rcv.initial = seg_seq;
1192  if (th->f_ack) {
1193  // TODO: clean retransmission queue
1194  _snd.unacknowledged = seg_ack;
1195  }
1196  if (_snd.unacknowledged > _snd.initial) {
1197  // If SND.UNA > ISS (our SYN has been ACKed), change the connection
1198  // state to ESTABLISHED, form an ACK segment
1199  // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
1200  tcp_debug("syn: SYN_SENT -> ESTABLISHED\n");
1201  init_from_options(th, opt_start, opt_end);
1202  do_established();
1203  output();
1204  } else {
1205  // Otherwise enter SYN_RECEIVED, form a SYN,ACK segment
1206  // <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
1207  tcp_debug("syn: SYN_SENT -> SYN_RECEIVED\n");
1208  do_syn_received();
1209  }
1210  }
1211 
1212  // 3.5 fifth, if neither of the SYN or RST bits is set then drop the
1213  // segment and return.
1214  return;
1215 }
1216 
1217 template <typename InetTraits>
1218 void tcp<InetTraits>::tcb::input_handle_other_state(tcp_hdr* th, packet p) {
1219  p.trim_front(th->data_offset * 4);
1220  bool do_output = false;
1221  bool do_output_data = false;
1222  tcp_seq seg_seq = th->seq;
1223  auto seg_ack = th->ack;
1224  auto seg_len = p.len();
1225 
1226  // 4.1 first check sequence number
1227  if (!segment_acceptable(seg_seq, seg_len)) {
1228  //<SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
1229  return output();
1230  }
1231 
1232  // In the following it is assumed that the segment is the idealized
1233  // segment that begins at RCV.NXT and does not exceed the window.
1234  if (seg_seq < _rcv.next) {
1235  // ignore already acknowledged data
1236  auto dup = std::min(uint32_t(_rcv.next - seg_seq), seg_len);
1237  p.trim_front(dup);
1238  seg_len -= dup;
1239  seg_seq += dup;
1240  }
1241  // FIXME: We should trim data outside the right edge of the receive window as well
1242 
1243  if (seg_seq != _rcv.next) {
1244  insert_out_of_order(seg_seq, std::move(p));
1245  // A TCP receiver SHOULD send an immediate duplicate ACK
1246  // when an out-of-order segment arrives.
1247  return output();
1248  }
1249 
1250  // 4.2 second check the RST bit
1251  if (th->f_rst) {
1252  if (in_state(SYN_RECEIVED)) {
1253  // If this connection was initiated with a passive OPEN (i.e.,
1254  // came from the LISTEN state), then return this connection to
1255  // LISTEN state and return. The user need not be informed. If
1256  // this connection was initiated with an active OPEN (i.e., came
1257  // from SYN_SENT state) then the connection was refused, signal
1258  // the user "connection refused". In either case, all segments
1259  // on the retransmission queue should be removed. And in the
1260  // active OPEN case, enter the CLOSED state and delete the TCB,
1261  // and return.
1262  _connect_done.set_exception(tcp_refused_error());
1263  return do_reset();
1264  }
1265  if (in_state(ESTABLISHED | FIN_WAIT_1 | FIN_WAIT_2 | CLOSE_WAIT)) {
1266  // If the RST bit is set then, any outstanding RECEIVEs and SEND
1267  // should receive "reset" responses. All segment queues should be
1268  // flushed. Users should also receive an unsolicited general
1269  // "connection reset" signal. Enter the CLOSED state, delete the
1270  // TCB, and return.
1271  return do_reset();
1272  }
1273  if (in_state(CLOSING | LAST_ACK | TIME_WAIT)) {
1274  // If the RST bit is set then, enter the CLOSED state, delete the
1275  // TCB, and return.
1276  return do_closed();
1277  }
1278  }
1279 
1280  // 4.3 third check security and precedence
1281  // NOTE: Ignored for now
1282 
1283  // 4.4 fourth, check the SYN bit
1284  if (th->f_syn) {
1285  // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
1286  // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
1287 
1288  // If the SYN is in the window it is an error, send a reset, any
1289  // outstanding RECEIVEs and SEND should receive "reset" responses,
1290  // all segment queues should be flushed, the user should also
1291  // receive an unsolicited general "connection reset" signal, enter
1292  // the CLOSED state, delete the TCB, and return.
1293  respond_with_reset(th);
1294  return do_reset();
1295 
1296  // If the SYN is not in the window this step would not be reached
1297  // and an ack would have been sent in the first step (sequence
1298  // number check).
1299  }
1300 
1301  // 4.5 fifth check the ACK field
1302  if (!th->f_ack) {
1303  // if the ACK bit is off drop the segment and return
1304  return;
1305  } else {
1306  // SYN_RECEIVED STATE
1307  if (in_state(SYN_RECEIVED)) {
1308  // If SND.UNA =< SEG.ACK =< SND.NXT then enter ESTABLISHED state
1309  // and continue processing.
1310  if (_snd.unacknowledged <= seg_ack && seg_ack <= _snd.next) {
1311  tcp_debug("SYN_RECEIVED -> ESTABLISHED\n");
1312  do_established();
1313  _tcp.add_connected_tcb(this->shared_from_this(), _local_port);
1314  } else {
1315  // <SEQ=SEG.ACK><CTL=RST>
1316  return respond_with_reset(th);
1317  }
1318  }
1319  auto update_window = [this, th, seg_seq, seg_ack] {
1320  tcp_debug("window update seg_seq=%d, seg_ack=%d, old window=%d new window=%d\n",
1321  seg_seq, seg_ack, _snd.window, th->window << _snd.window_scale);
1322  _snd.window = th->window << _snd.window_scale;
1323  _snd.wl1 = seg_seq;
1324  _snd.wl2 = seg_ack;
1325  _snd.zero_window_probing_out = 0;
1326  if (_snd.window == 0) {
1327  _persist_time_out = _rto;
1328  start_persist_timer();
1329  } else {
1330  stop_persist_timer();
1331  }
1332  };
1333  // ESTABLISHED STATE or
1334  // CLOSE_WAIT STATE: Do the same processing as for the ESTABLISHED state.
1335  if (in_state(ESTABLISHED | CLOSE_WAIT)){
1336  // When we are in zero window probing phase and packets_out = 0 we bypass "duplicated ack" check
1337  auto packets_out = _snd.next - _snd.unacknowledged - _snd.zero_window_probing_out;
1338  // If SND.UNA < SEG.ACK =< SND.NXT then, set SND.UNA <- SEG.ACK.
1339  if (_snd.unacknowledged < seg_ack && seg_ack <= _snd.next) {
1340  // Remote ACKed data we sent
1341  auto acked_bytes = data_segment_acked(seg_ack);
1342 
1343  // If SND.UNA < SEG.ACK =< SND.NXT, the send window should be updated.
1344  if (_snd.wl1 < seg_seq || (_snd.wl1 == seg_seq && _snd.wl2 <= seg_ack)) {
1345  update_window();
1346  }
1347 
1348  // some data is acked, try send more data
1349  do_output_data = true;
1350 
1351  auto set_retransmit_timer = [this] {
1352  if (_snd.data.empty()) {
1353  // All outstanding segments are acked, turn off the timer.
1354  stop_retransmit_timer();
1355  // Signal the waiter of this event
1356  signal_all_data_acked();
1357  } else {
1358  // Restart the timer becasue new data is acked.
1359  start_retransmit_timer();
1360  }
1361  };
1362 
1363  if (_snd.dupacks >= 3) {
1364  // We are in fast retransmit / fast recovery phase
1365  uint32_t smss = _snd.mss;
1366  if (seg_ack > _snd.recover) {
1367  tcp_debug("ack: full_ack\n");
1368  // Set cwnd to min (ssthresh, max(FlightSize, SMSS) + SMSS)
1369  _snd.cwnd = std::min(_snd.ssthresh, std::max(flight_size(), smss) + smss);
1370  // Exit the fast recovery procedure
1371  exit_fast_recovery();
1372  set_retransmit_timer();
1373  } else {
1374  tcp_debug("ack: partial_ack\n");
1375  // Retransmit the first unacknowledged segment
1376  fast_retransmit();
1377  // Deflate the congestion window by the amount of new data
1378  // acknowledged by the Cumulative Acknowledgment field
1379  _snd.cwnd -= acked_bytes;
1380  // If the partial ACK acknowledges at least one SMSS of new
1381  // data, then add back SMSS bytes to the congestion window
1382  if (acked_bytes >= smss) {
1383  _snd.cwnd += smss;
1384  }
1385  // Send a new segment if permitted by the new value of
1386  // cwnd. Do not exit the fast recovery procedure For
1387  // the first partial ACK that arrives during fast
1388  // recovery, also reset the retransmit timer.
1389  if (++_snd.partial_ack == 1) {
1390  start_retransmit_timer();
1391  }
1392  }
1393  } else {
1394  // RFC5681: The fast retransmit algorithm uses the arrival
1395  // of 3 duplicate ACKs (as defined in section 2, without
1396  // any intervening ACKs which move SND.UNA) as an
1397  // indication that a segment has been lost.
1398  //
1399  // So, here we reset dupacks to zero becasue this ACK moves
1400  // SND.UNA.
1401  exit_fast_recovery();
1402  set_retransmit_timer();
1403  }
1404  } else if ((packets_out > 0) && !_snd.data.empty() && seg_len == 0 &&
1405  th->f_fin == 0 && th->f_syn == 0 &&
1406  th->ack == _snd.unacknowledged &&
1407  uint32_t(th->window << _snd.window_scale) == _snd.window) {
1408  // Note:
1409  // RFC793 states:
1410  // If the ACK is a duplicate (SEG.ACK < SND.UNA), it can be ignored
1411  // RFC5681 states:
1412  // The TCP sender SHOULD use the "fast retransmit" algorithm to detect
1413  // and repair loss, based on incoming duplicate ACKs.
1414  // Here, We follow RFC5681.
1415  _snd.dupacks++;
1416  uint32_t smss = _snd.mss;
1417  // 3 duplicated ACKs trigger a fast retransmit
1418  if (_snd.dupacks == 1 || _snd.dupacks == 2) {
1419  // RFC5681 Step 3.1
1420  // Send cwnd + 2 * smss per RFC3042
1421  do_output_data = true;
1422  } else if (_snd.dupacks == 3) {
1423  // RFC6582 Step 3.2
1424  if (seg_ack - 1 > _snd.recover) {
1425  _snd.recover = _snd.next - 1;
1426  // RFC5681 Step 3.2
1427  _snd.ssthresh = std::max((flight_size() - _snd.limited_transfer) / 2, 2 * smss);
1428  fast_retransmit();
1429  } else {
1430  // Do not enter fast retransmit and do not reset ssthresh
1431  }
1432  // RFC5681 Step 3.3
1433  _snd.cwnd = _snd.ssthresh + 3 * smss;
1434  } else if (_snd.dupacks > 3) {
1435  // RFC5681 Step 3.4
1436  _snd.cwnd += smss;
1437  // RFC5681 Step 3.5
1438  do_output_data = true;
1439  }
1440  } else if (seg_ack > _snd.next) {
1441  // If the ACK acks something not yet sent (SEG.ACK > SND.NXT)
1442  // then send an ACK, drop the segment, and return
1443  return output();
1444  } else if (_snd.window == 0 && th->window > 0) {
1445  update_window();
1446  do_output_data = true;
1447  }
1448  }
1449  // FIN_WAIT_1 STATE
1450  if (in_state(FIN_WAIT_1)) {
1451  // In addition to the processing for the ESTABLISHED state, if
1452  // our FIN is now acknowledged then enter FIN-WAIT-2 and continue
1453  // processing in that state.
1454  if (seg_ack == _snd.next + 1) {
1455  tcp_debug("ack: FIN_WAIT_1 -> FIN_WAIT_2\n");
1456  _state = FIN_WAIT_2;
1457  do_local_fin_acked();
1458  }
1459  }
1460  // FIN_WAIT_2 STATE
1461  if (in_state(FIN_WAIT_2)) {
1462  // In addition to the processing for the ESTABLISHED state, if
1463  // the retransmission queue is empty, the user’s CLOSE can be
1464  // acknowledged ("ok") but do not delete the TCB.
1465  // TODO
1466  }
1467  // CLOSING STATE
1468  if (in_state(CLOSING)) {
1469  if (seg_ack == _snd.next + 1) {
1470  tcp_debug("ack: CLOSING -> TIME_WAIT\n");
1471  do_local_fin_acked();
1472  return do_time_wait();
1473  } else {
1474  return;
1475  }
1476  }
1477  // LAST_ACK STATE
1478  if (in_state(LAST_ACK)) {
1479  if (seg_ack == _snd.next + 1) {
1480  tcp_debug("ack: LAST_ACK -> CLOSED\n");
1481  do_local_fin_acked();
1482  return do_closed();
1483  }
1484  }
1485  // TIME_WAIT STATE
1486  if (in_state(TIME_WAIT)) {
1487  // The only thing that can arrive in this state is a
1488  // retransmission of the remote FIN. Acknowledge it, and restart
1489  // the 2 MSL timeout.
1490  // TODO
1491  }
1492  }
1493 
1494  // 4.6 sixth, check the URG bit
1495  if (th->f_urg) {
1496  // TODO
1497  }
1498 
1499  // 4.7 seventh, process the segment text
1500  if (in_state(ESTABLISHED | FIN_WAIT_1 | FIN_WAIT_2)) {
1501  if (p.len()) {
1502  // Once the TCP takes responsibility for the data it advances
1503  // RCV.NXT over the data accepted, and adjusts RCV.WND as
1504  // apporopriate to the current buffer availability. The total of
1505  // RCV.NXT and RCV.WND should not be reduced.
1506  _rcv.data_size += p.len();
1507  _rcv.data.push_back(std::move(p));
1508  _rcv.next += seg_len;
1509  auto merged = merge_out_of_order();
1510  _rcv.window = get_modified_receive_window_size();
1511  signal_data_received();
1512  // Send an acknowledgment of the form:
1513  // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
1514  // This acknowledgment should be piggybacked on a segment being
1515  // transmitted if possible without incurring undue delay.
1516  if (merged) {
1517  // TCP receiver SHOULD send an immediate ACK when the
1518  // incoming segment fills in all or part of a gap in the
1519  // sequence space.
1520  do_output = true;
1521  } else {
1522  do_output = should_send_ack(seg_len);
1523  }
1524  }
1525  } else if (in_state(CLOSE_WAIT | CLOSING | LAST_ACK | TIME_WAIT)) {
1526  // This should not occur, since a FIN has been received from the
1527  // remote side. Ignore the segment text.
1528  return;
1529  }
1530 
1531  // 4.8 eighth, check the FIN bit
1532  if (th->f_fin) {
1533  if (_fin_recvd_promise) {
1534  _fin_recvd_promise->set_value();
1535  _fin_recvd_promise.reset();
1536  }
1537  if (in_state(CLOSED | LISTEN | SYN_SENT)) {
1538  // Do not process the FIN if the state is CLOSED, LISTEN or SYN-SENT
1539  // since the SEG.SEQ cannot be validated; drop the segment and return.
1540  return;
1541  }
1542  auto fin_seq = seg_seq + seg_len;
1543  if (fin_seq == _rcv.next) {
1544  _rcv.next = fin_seq + 1;
1545  signal_data_received();
1546 
1547  // If this <FIN> packet contains data as well, we can ACK both data
1548  // and <FIN> in a single packet, so canncel the previous ACK.
1549  clear_delayed_ack();
1550  do_output = false;
1551  // Send ACK for the FIN!
1552  output();
1553 
1554  if (in_state(SYN_RECEIVED | ESTABLISHED)) {
1555  tcp_debug("fin: SYN_RECEIVED or ESTABLISHED -> CLOSE_WAIT\n");
1556  _state = CLOSE_WAIT;
1557  }
1558  if (in_state(FIN_WAIT_1)) {
1559  // If our FIN has been ACKed (perhaps in this segment), then
1560  // enter TIME-WAIT, start the time-wait timer, turn off the other
1561  // timers; otherwise enter the CLOSING state.
1562  // Note: If our FIN has been ACKed, we should be in FIN_WAIT_2
1563  // not FIN_WAIT_1 if we reach here.
1564  tcp_debug("fin: FIN_WAIT_1 -> CLOSING\n");
1565  _state = CLOSING;
1566  }
1567  if (in_state(FIN_WAIT_2)) {
1568  tcp_debug("fin: FIN_WAIT_2 -> TIME_WAIT\n");
1569  return do_time_wait();
1570  }
1571  }
1572  }
1573  if (do_output || (do_output_data && can_send())) {
1574  // Since we will do output, we can canncel scheduled delayed ACK.
1575  clear_delayed_ack();
1576  output();
1577  }
1578 }
1579 
1580 template <typename InetTraits>
1581 packet tcp<InetTraits>::tcb::get_transmit_packet() {
1582  // easy case: empty queue
1583  if (_snd.unsent.empty()) {
1584  return packet();
1585  }
1586  auto can_send = this->can_send();
1587  // Max number of TCP payloads we can pass to NIC
1588  uint32_t len;
1589  if (_tcp.hw_features().tx_tso) {
1590  // FIXME: Info tap device the size of the splitted packet
1591  len = _tcp.hw_features().max_packet_len - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
1592  } else {
1593  len = std::min(uint16_t(_tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
1594  }
1595  can_send = std::min(can_send, len);
1596  // easy case: one small packet
1597  if (_snd.unsent.size() == 1 && _snd.unsent.front().len() <= can_send) {
1598  auto p = std::move(_snd.unsent.front());
1599  _snd.unsent.pop_front();
1600  _snd.unsent_len -= p.len();
1601  return p;
1602  }
1603  // moderate case: need to split one packet
1604  if (_snd.unsent.front().len() > can_send) {
1605  auto p = _snd.unsent.front().share(0, can_send);
1606  _snd.unsent.front().trim_front(can_send);
1607  _snd.unsent_len -= p.len();
1608  return p;
1609  }
1610  // hard case: merge some packets, possibly split last
1611  auto p = std::move(_snd.unsent.front());
1612  _snd.unsent.pop_front();
1613  can_send -= p.len();
1614  while (!_snd.unsent.empty()
1615  && _snd.unsent.front().len() <= can_send) {
1616  can_send -= _snd.unsent.front().len();
1617  p.append(std::move(_snd.unsent.front()));
1618  _snd.unsent.pop_front();
1619  }
1620  if (!_snd.unsent.empty() && can_send) {
1621  auto& q = _snd.unsent.front();
1622  p.append(q.share(0, can_send));
1623  q.trim_front(can_send);
1624  }
1625  _snd.unsent_len -= p.len();
1626  return p;
1627 }
1628 
1629 template <typename InetTraits>
1630 void tcp<InetTraits>::tcb::output_one(bool data_retransmit) {
1631  if (in_state(CLOSED)) {
1632  return;
1633  }
1634 
1635  packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet();
1636  packet clone = p.share(); // early clone to prevent share() from calling packet::unuse_internal_data() on header.
1637  uint16_t len = p.len();
1638  bool syn_on = syn_needs_on();
1639  bool ack_on = ack_needs_on();
1640 
1641  auto options_size = _option.get_size(syn_on, ack_on);
1642  auto th = p.prepend_uninitialized_header(tcp_hdr::len + options_size);
1643  auto h = tcp_hdr{};
1644 
1645  h.src_port = _local_port;
1646  h.dst_port = _foreign_port;
1647 
1648  h.f_syn = syn_on;
1649  h.f_ack = ack_on;
1650  if (ack_on) {
1651  clear_delayed_ack();
1652  }
1653  h.f_urg = false;
1654  h.f_psh = false;
1655 
1656  tcp_seq seq;
1657  if (data_retransmit) {
1658  seq = _snd.unacknowledged;
1659  } else {
1660  seq = syn_on ? _snd.initial : _snd.next;
1661  _snd.next += len;
1662  }
1663  h.seq = seq;
1664  h.ack = _rcv.next;
1665  h.data_offset = (tcp_hdr::len + options_size) / 4;
1666  h.window = _rcv.window >> _rcv.window_scale;
1667  h.checksum = 0;
1668 
1669  // FIXME: does the FIN have to fit in the window?
1670  bool fin_on = fin_needs_on();
1671  h.f_fin = fin_on;
1672 
1673  // Add tcp options
1674  _option.fill(th, &h, options_size);
1675  h.write(th);
1676 
1677  offload_info oi;
1678  checksummer csum;
1679  uint16_t pseudo_hdr_seg_len = 0;
1680 
1681  oi.tcp_hdr_len = tcp_hdr::len + options_size;
1682 
1683  if (_tcp.hw_features().tx_csum_l4_offload) {
1684  oi.needs_csum = true;
1685 
1686  //
1687  // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's
1688  // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones'
1689  // complement sum of the pseudo header.
1690  //
1691  // For TSO the csum should be calculated for a pseudo header with
1692  // segment length set to 0. All the rest is the same as for a TCP Tx
1693  // CSUM offload case.
1694  //
1695  if (_tcp.hw_features().tx_tso && len > _snd.mss) {
1696  oi.tso_seg_size = _snd.mss;
1697  } else {
1698  pseudo_hdr_seg_len = tcp_hdr::len + options_size + len;
1699  }
1700  } else {
1701  pseudo_hdr_seg_len = tcp_hdr::len + options_size + len;
1702  oi.needs_csum = false;
1703  }
1704 
1705  InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip,
1706  pseudo_hdr_seg_len);
1707 
1708  uint16_t checksum;
1709  if (_tcp.hw_features().tx_csum_l4_offload) {
1710  checksum = ~csum.get();
1711  } else {
1712  csum.sum(p);
1713  checksum = csum.get();
1714  }
1715  tcp_hdr::write_nbo_checksum(th, checksum);
1716 
1717  oi.protocol = ip_protocol_num::tcp;
1718 
1719  p.set_offload_info(oi);
1720 
1721  if (!data_retransmit && (len || syn_on || fin_on)) {
1722  auto now = clock_type::now();
1723  if (len) {
1724  unsigned nr_transmits = 0;
1725  _snd.data.emplace_back(unacked_segment{std::move(clone),
1726  len, nr_transmits, now});
1727  }
1728  if (!_retransmit.armed()) {
1729  start_retransmit_timer(now);
1730  }
1731  }
1732 
1733 
1734  // if advertised TCP receive window is 0 we may only transmit zero window probing segment.
1735  // Payload size of this segment is 1. Queueing anything bigger when _snd.window == 0 is bug
1736  // and violation of RFC
1737  assert((_snd.window > 0) || ((_snd.window == 0) && (len <= 1)));
1738  queue_packet(std::move(p));
1739 }
1740 
1741 template <typename InetTraits>
1742 future<> tcp<InetTraits>::tcb::wait_for_data() {
1743  if (!_rcv.data.empty() || foreign_will_not_send()) {
1744  return make_ready_future<>();
1745  }
1746  _rcv._data_received_promise = promise<>();
1747  return _rcv._data_received_promise->get_future();
1748 }
1749 
1750 template <typename InetTraits>
1751 future<> tcp<InetTraits>::tcb::wait_input_shutdown() {
1752  if (!_fin_recvd_promise) {
1753  return make_ready_future<>();
1754  }
1755  return _fin_recvd_promise->get_future();
1756 }
1757 
1758 template <typename InetTraits>
1759 void
1760 tcp<InetTraits>::tcb::abort_reader() noexcept {
1761  if (_rcv._data_received_promise) {
1762  _rcv._data_received_promise->set_exception(
1763  std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
1764  _rcv._data_received_promise = std::nullopt;
1765  }
1766  if (_fin_recvd_promise) {
1767  _fin_recvd_promise->set_value();
1768  _fin_recvd_promise.reset();
1769  }
1770 }
1771 
1772 template <typename InetTraits>
1773 future<> tcp<InetTraits>::tcb::wait_for_all_data_acked() {
1774  if (_snd.data.empty() && _snd.unsent_len == 0) {
1775  return make_ready_future<>();
1776  }
1777  _snd._all_data_acked_promise = promise<>();
1778  return _snd._all_data_acked_promise->get_future();
1779 }
1780 
1781 template <typename InetTraits>
1783  // An initial send sequence number (ISS) is selected. A SYN segment of the
1784  // form <SEQ=ISS><CTL=SYN> is sent. Set SND.UNA to ISS, SND.NXT to ISS+1,
1785  // enter SYN-SENT state, and return.
1786  do_setup_isn();
1787 
1788  // Local receive window scale factor
1789  _rcv.window_scale = _option._local_win_scale = 7;
1790  // Maximum segment size local can receive
1791  _rcv.mss = _option._local_mss = local_mss();
1792  _rcv.window = get_default_receive_window_size();
1793 
1794  do_syn_sent();
1795 }
1796 
1797 template <typename InetTraits>
1798 packet tcp<InetTraits>::tcb::read() {
1799  packet p;
1800  for (auto&& q : _rcv.data) {
1801  p.append(std::move(q));
1802  }
1803  _rcv.data_size = 0;
1804  _rcv.data.clear();
1805  _rcv.window = get_default_receive_window_size();
1806  return p;
1807 }
1808 
1809 template <typename InetTraits>
1810 future<> tcp<InetTraits>::tcb::wait_send_available() {
1811  if (_snd.max_queue_space > _snd.current_queue_space) {
1812  return make_ready_future<>();
1813  }
1814  _snd._send_available_promise = promise<>();
1815  return _snd._send_available_promise->get_future();
1816 }
1817 
1818 template <typename InetTraits>
1819 future<> tcp<InetTraits>::tcb::send(packet p) {
1820  // We can not send after the connection is closed
1821  if (_snd.closed || in_state(CLOSED)) {
1822  return make_exception_future<>(tcp_reset_error());
1823  }
1824 
1825  auto len = p.len();
1826  _snd.current_queue_space += len;
1827  _snd.unsent_len += len;
1828  _snd.unsent.push_back(std::move(p));
1829 
1830  if (can_send() > 0) {
1831  output();
1832  }
1833 
1834  return wait_send_available();
1835 }
1836 
1837 template <typename InetTraits>
1838 void tcp<InetTraits>::tcb::close() noexcept {
1839  if (in_state(CLOSED) || _snd.closed) {
1840  return;
1841  }
1842  // TODO: We should return a future to upper layer
1843  (void)wait_for_all_data_acked().then([this, zis = this->shared_from_this()] () mutable {
1844  _snd.closed = true;
1845  tcp_debug("close: unsent_len=%d\n", _snd.unsent_len);
1846  if (in_state(CLOSE_WAIT)) {
1847  tcp_debug("close: CLOSE_WAIT -> LAST_ACK\n");
1848  _state = LAST_ACK;
1849  } else if (in_state(ESTABLISHED)) {
1850  tcp_debug("close: ESTABLISHED -> FIN_WAIT_1\n");
1851  _state = FIN_WAIT_1;
1852  }
1853  // Send <FIN> to remote
1854  // Note: we call output_one to make sure a packet with FIN actually
1855  // sent out. If we only call output() and _packetq is not empty,
1856  // tcp::tcb::get_packet(), packet with FIN will not be generated.
1857  output_one();
1858  output();
1859  });
1860 }
1861 
1862 template <typename InetTraits>
1863 bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) {
1864  // We've received a TSO packet, do ack immediately
1865  if (seg_len > _rcv.mss) {
1866  _nr_full_seg_received = 0;
1867  _delayed_ack.cancel();
1868  return true;
1869  }
1870 
1871  // We've received a full sized segment, ack for every second full sized segment
1872  if (seg_len == _rcv.mss) {
1873  if (_nr_full_seg_received++ >= 1) {
1874  _nr_full_seg_received = 0;
1875  _delayed_ack.cancel();
1876  return true;
1877  }
1878  }
1879 
1880  // If the timer is armed and its callback hasn't been run.
1881  if (_delayed_ack.armed()) {
1882  return false;
1883  }
1884 
1885  // If the timer is not armed, schedule a delayed ACK.
1886  // The maximum delayed ack timer allowed by RFC1122 is 500ms, most
1887  // implementations use 200ms.
1888  _delayed_ack.arm(200ms);
1889  return false;
1890 }
1891 
1892 template <typename InetTraits>
1893 void tcp<InetTraits>::tcb::clear_delayed_ack() noexcept {
1894  _delayed_ack.cancel();
1895 }
1896 
1897 template <typename InetTraits>
1898 bool tcp<InetTraits>::tcb::merge_out_of_order() {
1899  bool merged = false;
1900  if (_rcv.out_of_order.map.empty()) {
1901  return merged;
1902  }
1903  for (auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) {
1904  auto& p = it->second;
1905  auto seg_beg = it->first;
1906  auto seg_len = p.len();
1907  auto seg_end = seg_beg + seg_len;
1908  if (seg_beg <= _rcv.next && _rcv.next < seg_end) {
1909  // This segment has been received out of order and its previous
1910  // segment has been received now
1911  auto trim = _rcv.next - seg_beg;
1912  if (trim) {
1913  p.trim_front(trim);
1914  seg_len -= trim;
1915  }
1916  _rcv.next += seg_len;
1917  _rcv.data_size += p.len();
1918  _rcv.data.push_back(std::move(p));
1919  // Since c++11, erase() always returns the value of the following element
1920  it = _rcv.out_of_order.map.erase(it);
1921  merged = true;
1922  } else if (_rcv.next >= seg_end) {
1923  // This segment has been receive already, drop it
1924  it = _rcv.out_of_order.map.erase(it);
1925  } else {
1926  // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only,
1927  // so we can stop looking here.
1928  it++;
1929  break;
1930  }
1931  }
1932  return merged;
1933 }
1934 
1935 template <typename InetTraits>
1936 void tcp<InetTraits>::tcb::insert_out_of_order(tcp_seq seg, packet p) {
1937  _rcv.out_of_order.merge(seg, std::move(p));
1938 }
1939 
1940 template <typename InetTraits>
1941 void tcp<InetTraits>::tcb::trim_receive_data_after_window() {
1942  abort();
1943 }
1944 
1945 template <typename InetTraits>
1946 void tcp<InetTraits>::tcb::persist() {
1947  tcp_debug("persist timer fired\n");
1948  // Send 1 byte packet to probe peer's window size
1949  _snd.window_probe = true;
1950  _snd.zero_window_probing_out++;
1951  output_one();
1952  _snd.window_probe = false;
1953 
1954  output();
1955  // Perform binary exponential back-off per RFC1122
1956  _persist_time_out = std::min(_persist_time_out * 2, _rto_max);
1957  start_persist_timer();
1958 }
1959 
1960 template <typename InetTraits>
1961 void tcp<InetTraits>::tcb::retransmit() {
1962  auto output_update_rto = [this] {
1963  output();
1964  // According to RFC6298, Update RTO <- RTO * 2 to perform binary exponential back-off
1965  this->_rto = std::min(this->_rto * 2, this->_rto_max);
1966  start_retransmit_timer();
1967  };
1968 
1969  // Retransmit SYN
1970  if (syn_needs_on()) {
1971  if (_snd.syn_retransmit++ < _max_nr_retransmit) {
1972  output_update_rto();
1973  } else {
1974  _connect_done.set_exception(tcp_connect_error());
1975  cleanup();
1976  return;
1977  }
1978  }
1979 
1980  // Retransmit FIN
1981  if (fin_needs_on()) {
1982  if (_snd.fin_retransmit++ < _max_nr_retransmit) {
1983  output_update_rto();
1984  } else {
1985  cleanup();
1986  return;
1987  }
1988  }
1989 
1990  // Retransmit Data
1991  if (_snd.data.empty()) {
1992  return;
1993  }
1994 
1995  // If there are unacked data, retransmit the earliest segment
1996  auto& unacked_seg = _snd.data.front();
1997 
1998  // According to RFC5681
1999  // Update ssthresh only for the first retransmit
2000  uint32_t smss = _snd.mss;
2001  if (unacked_seg.nr_transmits == 0) {
2002  _snd.ssthresh = std::max(flight_size() / 2, 2 * smss);
2003  }
2004  // RFC6582 Step 4
2005  _snd.recover = _snd.next - 1;
2006  // Start the slow start process
2007  _snd.cwnd = smss;
2008  // End fast recovery
2009  exit_fast_recovery();
2010 
2011  if (unacked_seg.nr_transmits < _max_nr_retransmit) {
2012  unacked_seg.nr_transmits++;
2013  } else {
2014  // Delete connection when max num of retransmission is reached
2015  do_reset();
2016  return;
2017  }
2018  retransmit_one();
2019 
2020  output_update_rto();
2021 }
2022 
2023 template <typename InetTraits>
2024 void tcp<InetTraits>::tcb::fast_retransmit() {
2025  if (!_snd.data.empty()) {
2026  auto& unacked_seg = _snd.data.front();
2027  unacked_seg.nr_transmits++;
2028  retransmit_one();
2029  output();
2030  }
2031 }
2032 
2033 template <typename InetTraits>
2034 void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) {
2035  // Update RTO according to RFC6298
2036  auto R = std::chrono::duration_cast<std::chrono::milliseconds>(clock_type::now() - tx_time);
2037  if (_snd.first_rto_sample) {
2038  _snd.first_rto_sample = false;
2039  // RTTVAR <- R/2
2040  // SRTT <- R
2041  _snd.rttvar = R / 2;
2042  _snd.srtt = R;
2043  } else {
2044  // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
2045  // SRTT <- (1 - alpha) * SRTT + alpha * R'
2046  // where alpha = 1/8 and beta = 1/4
2047  auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt);
2048  _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4;
2049  _snd.srtt = _snd.srtt * 7 / 8 + R / 8;
2050  }
2051  // RTO <- SRTT + max(G, K * RTTVAR)
2052  _rto = _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar);
2053 
2054  // Make sure 1 sec << _rto << 60 sec
2055  _rto = std::max(_rto, _rto_min);
2056  _rto = std::min(_rto, _rto_max);
2057 }
2058 
2059 template <typename InetTraits>
2060 void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) {
2061  uint32_t smss = _snd.mss;
2062  if (_snd.cwnd < _snd.ssthresh) {
2063  // In slow start phase
2064  _snd.cwnd += std::min(acked_bytes, smss);
2065  } else {
2066  // In congestion avoidance phase
2067  uint32_t round_up = 1;
2068  _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd);
2069  }
2070 }
2071 
2072 template <typename InetTraits>
2073 void tcp<InetTraits>::tcb::cleanup() {
2074  _snd.unsent.clear();
2075  _snd.data.clear();
2076  _rcv.out_of_order.map.clear();
2077  _rcv.data_size = 0;
2078  _rcv.data.clear();
2079  stop_retransmit_timer();
2080  clear_delayed_ack();
2081  remove_from_tcbs();
2082 }
2083 
2084 template <typename InetTraits>
2085 tcp_seq tcp<InetTraits>::tcb::get_isn() {
2086  // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers
2087  // with the expression:
2088  // ISN = M + F(localip, localport, remoteip, remoteport, secretkey)
2089  // M is the 4 microsecond timer
2090  using namespace std::chrono;
2091  uint32_t hash[4];
2092  hash[0] = _local_ip.ip;
2093  hash[1] = _foreign_ip.ip;
2094  hash[2] = (_local_port << 16) + _foreign_port;
2095  hash[3] = _isn_secret.key[15];
2096  CryptoPP::Weak::MD5::Transform(hash, _isn_secret.key);
2097  auto seq = hash[0];
2098  auto m = duration_cast<microseconds>(clock_type::now().time_since_epoch());
2099  seq += m.count() / 4;
2100  return make_seq(seq);
2101 }
2102 
2103 template <typename InetTraits>
2104 std::optional<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
2105  _poll_active = false;
2106  if (_packetq.empty()) {
2107  output_one();
2108  }
2109 
2110  if (in_state(CLOSED)) {
2111  return std::optional<typename InetTraits::l4packet>();
2112  }
2113 
2114  assert(!_packetq.empty());
2115 
2116  auto p = std::move(_packetq.front());
2117  _packetq.pop_front();
2118  if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0 && (_snd.window > 0))) {
2119  // If there are packets to send in the queue or tcb is allowed to send
2120  // more add tcp back to polling set to keep sending. In addition, dupacks >= 3
2121  // is an indication that an segment is lost, stop sending more in this case.
2122  // Finally - we can't send more until window is opened again.
2123  output();
2124  }
2125  return p;
2126 }
2127 
2128 template <typename InetTraits>
2129 void tcp<InetTraits>::connection::close_read() noexcept {
2130  _tcb->abort_reader();
2131 }
2132 
2133 template <typename InetTraits>
2134 void tcp<InetTraits>::connection::close_write() noexcept {
2135  _tcb->close();
2136 }
2137 
2138 template <typename InetTraits>
2139 void tcp<InetTraits>::connection::shutdown_connect() {
2140  if (_tcb->syn_needs_on()) {
2141  _tcb->_connect_done.set_exception(tcp_refused_error());
2142  _tcb->cleanup();
2143  } else {
2144  close_read();
2145  close_write();
2146  }
2147 }
2148 
// Out-of-class definition of the static data member tcb::_isn_secret.
// get_isn() reads its `key` array when computing initial sequence numbers.
2149 template <typename InetTraits>
2150 typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret;
2151 
2152 }
2153 
2154 }
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:434
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:396
Definition: shared_ptr.hh:150
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:59
holds the metric definition.
Definition: metrics_registration.hh:94
metric_groups & add_group(const group_name_type &name, const std::initializer_list< metric_definition > &l)
Add metrics belonging to the same group.
Definition: arp.hh:205
Definition: net.hh:51
Definition: packet.hh:87
Definition: tcp.hh:674
Definition: tcp.hh:722
Definition: tcp.hh:294
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:982
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:990
Definition: queue.hh:44
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:109
size_t max_size() const noexcept
Definition: queue.hh:117
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
T pop() noexcept
Pop an item.
Definition: queue.hh:208
future not_empty() noexcept
Definition: queue.hh:308
Definition: socket_defs.hh:47
bool cancel() noexcept
void rearm(time_point until, std::optional< duration > period={}) noexcept
Definition: timer.hh:168
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future now()
Returns a ready future.
Definition: later.hh:35
impl::metric_definition_impl make_counter(metric_name_type name, T &&val, description d=description(), std::vector< label_instance > labels={})
create a counter metric
Definition: metrics.hh:529
server_socket listen(socket_address sa)
future< connected_socket > connect(socket_address sa)
header for metrics creation.
Definition: net.hh:75
Definition: tcp.hh:200
Definition: tcp.hh:290
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: shard_id.hh:50
Definition: ethernet.hh:37
Definition: ip.hh:108
Definition: tcp.hh:230
Definition: tcp.hh:171
Definition: tcp.hh:109
Definition: tcp.hh:164
Definition: tcp.hh:137
Definition: tcp.hh:99