24 #ifndef SEASTAR_MODULE
25 #include <unordered_map>
32 #include <system_error>
34 #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
35 #include <cryptopp/md5.h>
38 #include <seastar/core/shared_ptr.hh>
39 #include <seastar/core/queue.hh>
40 #include <seastar/core/semaphore.hh>
41 #include <seastar/core/byteorder.hh>
43 #include <seastar/net/net.hh>
44 #include <seastar/net/ip_checksum.hh>
45 #include <seastar/net/ip.hh>
46 #include <seastar/net/const.hh>
47 #include <seastar/net/packet-util.hh>
48 #include <seastar/util/std-compat.hh>
52 using namespace std::chrono_literals;
58 inline auto tcp_error(
int err) {
59 return std::system_error(err, std::system_category());
62 inline auto tcp_reset_error() {
63 return tcp_error(ECONNRESET);
66 inline auto tcp_connect_error() {
67 return tcp_error(ECONNABORTED);
70 inline auto tcp_refused_error() {
71 return tcp_error(ECONNREFUSED);
74 enum class tcp_state : uint16_t {
78 SYN_RECEIVED = (1 << 3),
79 ESTABLISHED = (1 << 4),
80 FIN_WAIT_1 = (1 << 5),
81 FIN_WAIT_2 = (1 << 6),
82 CLOSE_WAIT = (1 << 7),
88 inline tcp_state operator|(tcp_state s1, tcp_state s2) {
89 return tcp_state(uint16_t(s1) | uint16_t(s2));
92 template <
typename... Args>
93 void tcp_debug(
const char* fmt, Args&&... args) {
95 print(fmt, std::forward<Args>(args)...);
103 static void write(
char* p, option_kind kind, option_len len) {
104 p[0] =
static_cast<uint8_t
>(kind);
105 if (
static_cast<uint8_t
>(len) > 1) {
106 p[1] =
static_cast<uint8_t
>(len);
110 static constexpr option_kind kind = option_kind::mss;
111 static constexpr option_len len = option_len::mss;
115 x.mss = read_be<uint16_t>(p + 2);
118 void write(
char* p)
const {
119 tcp_option::write(p, kind, len);
120 write_be<uint16_t>(p + 2,
mss);
124 static constexpr option_kind kind = option_kind::win_scale;
125 static constexpr option_len len = option_len::win_scale;
132 void write(
char* p)
const {
133 tcp_option::write(p, kind, len);
138 static constexpr option_kind kind = option_kind::sack;
139 static constexpr option_len len = option_len::sack;
143 void write(
char* p)
const {
144 tcp_option::write(p, kind, len);
148 static constexpr option_kind kind = option_kind::timestamps;
149 static constexpr option_len len = option_len::timestamps;
154 ts.t1 = read_be<uint32_t>(p + 2);
155 ts.t2 = read_be<uint32_t>(p + 6);
158 void write(
char* p)
const {
159 tcp_option::write(p, kind, len);
160 write_be<uint32_t>(p + 2, t1);
161 write_be<uint32_t>(p + 6, t2);
165 static constexpr option_kind kind = option_kind::nop;
166 static constexpr option_len len = option_len::nop;
167 void write(
char* p)
const {
168 tcp_option::write(p, kind, len);
172 static constexpr option_kind kind = option_kind::eol;
173 static constexpr option_len len = option_len::eol;
174 void write(
char* p)
const {
175 tcp_option::write(p, kind, len);
178 static const uint8_t align = 4;
180 void parse(uint8_t* beg, uint8_t* end);
181 uint8_t fill(
void* h,
const tcp_hdr* th, uint8_t option_size);
182 uint8_t get_size(
bool syn_on,
bool ack_on);
185 bool _mss_received =
false;
186 bool _win_scale_received =
false;
187 bool _timestamps_received =
false;
188 bool _sack_received =
false;
191 uint16_t _remote_mss = 536;
193 uint8_t _remote_win_scale = 0;
194 uint8_t _local_win_scale = 0;
196 inline char*& operator+=(
char*& x, tcp_option::option_len len) { x += uint8_t(len);
return x; }
197 inline const char*& operator+=(
const char*& x, tcp_option::option_len len) { x += uint8_t(len);
return x; }
198 inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len);
return x; }
205 return tcp_seq { ntoh(s.raw) };
208 inline tcp_seq hton(tcp_seq s) {
209 return tcp_seq { hton(s.raw) };
213 std::ostream& operator<<(std::ostream& os, tcp_seq s) {
217 inline tcp_seq make_seq(uint32_t raw) {
return tcp_seq{raw}; }
218 inline tcp_seq& operator+=(tcp_seq& s, int32_t n) { s.raw += n;
return s; }
219 inline tcp_seq& operator-=(tcp_seq& s, int32_t n) { s.raw -= n;
return s; }
220 inline tcp_seq operator+(tcp_seq s, int32_t n) {
return s += n; }
221 inline tcp_seq operator-(tcp_seq s, int32_t n) {
return s -= n; }
222 inline int32_t operator-(tcp_seq s, tcp_seq q) {
return s.raw - q.raw; }
223 inline bool operator==(tcp_seq s, tcp_seq q) {
return s.raw == q.raw; }
224 inline bool operator!=(tcp_seq s, tcp_seq q) {
return !(s == q); }
225 inline bool operator<(tcp_seq s, tcp_seq q) {
return s - q < 0; }
226 inline bool operator>(tcp_seq s, tcp_seq q) {
return q < s; }
227 inline bool operator<=(tcp_seq s, tcp_seq q) {
return !(s > q); }
228 inline bool operator>=(tcp_seq s, tcp_seq q) {
return !(s < q); }
231 static constexpr
size_t len = 20;
237 uint8_t data_offset : 4;
248 static tcp_hdr read(
const char* p) {
250 h.src_port = read_be<uint16_t>(p + 0);
251 h.dst_port = read_be<uint16_t>(p + 2);
252 h.seq =
tcp_seq{read_be<uint32_t>(p + 4)};
253 h.ack =
tcp_seq{read_be<uint32_t>(p + 8)};
254 h.rsvd1 = p[12] & 15;
255 h.data_offset = uint8_t(p[12]) >> 4;
256 h.f_fin = (uint8_t(p[13]) >> 0) & 1;
257 h.f_syn = (uint8_t(p[13]) >> 1) & 1;
258 h.f_rst = (uint8_t(p[13]) >> 2) & 1;
259 h.f_psh = (uint8_t(p[13]) >> 3) & 1;
260 h.f_ack = (uint8_t(p[13]) >> 4) & 1;
261 h.f_urg = (uint8_t(p[13]) >> 5) & 1;
262 h.rsvd2 = (uint8_t(p[13]) >> 6) & 3;
263 h.window = read_be<uint16_t>(p + 14);
264 h.checksum = read_be<uint16_t>(p + 16);
265 h.urgent = read_be<uint16_t>(p + 18);
268 void write(
char* p)
const {
269 write_be<uint16_t>(p + 0, src_port);
270 write_be<uint16_t>(p + 2, dst_port);
271 write_be<uint32_t>(p + 4, seq.raw);
272 write_be<uint32_t>(p + 8, ack.raw);
273 p[12] = rsvd1 | (data_offset << 4);
281 write_be<uint16_t>(p + 14, window);
282 write_be<uint16_t>(p + 16, checksum);
283 write_be<uint16_t>(p + 18, urgent);
285 static void write_nbo_checksum(
char* p, uint16_t checksum_in_network_byte_order) {
286 std::copy_n(
reinterpret_cast<const char*
>(&checksum_in_network_byte_order), 2, p + 16);
293 template <
typename InetTraits>
296 using ipaddr =
typename InetTraits::address_type;
297 using inet_type =
typename InetTraits::inet_type;
307 static constexpr tcp_state CLOSED = tcp_state::CLOSED;
308 static constexpr tcp_state LISTEN = tcp_state::LISTEN;
309 static constexpr tcp_state SYN_SENT = tcp_state::SYN_SENT;
310 static constexpr tcp_state SYN_RECEIVED = tcp_state::SYN_RECEIVED;
311 static constexpr tcp_state ESTABLISHED = tcp_state::ESTABLISHED;
312 static constexpr tcp_state FIN_WAIT_1 = tcp_state::FIN_WAIT_1;
313 static constexpr tcp_state FIN_WAIT_2 = tcp_state::FIN_WAIT_2;
314 static constexpr tcp_state CLOSE_WAIT = tcp_state::CLOSE_WAIT;
315 static constexpr tcp_state CLOSING = tcp_state::CLOSING;
316 static constexpr tcp_state LAST_ACK = tcp_state::LAST_ACK;
317 static constexpr tcp_state TIME_WAIT = tcp_state::TIME_WAIT;
318 tcp_state _state = CLOSED;
322 std::optional<promise<>> _fin_recvd_promise =
promise<>();
325 uint16_t _local_port;
326 uint16_t _foreign_port;
327 struct unacked_segment {
330 unsigned nr_transmits;
331 clock_type::time_point tx_time;
337 uint8_t window_scale;
343 std::deque<unacked_segment> data;
344 std::deque<packet> unsent;
345 uint32_t unsent_len = 0;
349 std::optional<promise<>> _all_data_acked_promise;
351 size_t max_queue_space = 212992;
352 size_t current_queue_space = 0;
354 std::optional<promise<>> _send_available_promise;
356 std::chrono::milliseconds rttvar;
358 std::chrono::milliseconds srtt;
359 bool first_rto_sample =
true;
360 clock_type::time_point syn_tx_time;
366 uint16_t dupacks = 0;
367 unsigned syn_retransmit = 0;
368 unsigned fin_retransmit = 0;
369 uint32_t limited_transfer = 0;
370 uint32_t partial_ack = 0;
372 bool window_probe =
false;
373 uint8_t zero_window_probing_out = 0;
378 uint8_t window_scale;
382 std::deque<packet> data;
384 size_t data_size = 0;
386 std::optional<promise<>> _data_received_promise;
389 size_t max_receive_buf_size = 3737600;
394 std::chrono::milliseconds _rto{1000};
395 std::chrono::milliseconds _persist_time_out{1000};
396 static constexpr std::chrono::milliseconds _rto_min{1000};
397 static constexpr std::chrono::milliseconds _rto_max{60000};
399 static constexpr std::chrono::milliseconds _rto_clk_granularity{1};
400 static constexpr uint16_t _max_nr_retransmit{5};
403 uint16_t _nr_full_seg_received = 0;
408 std::random_device rd;
409 std::default_random_engine e(rd());
410 std::uniform_int_distribution<uint32_t> dist{};
411 for (
auto& k : key) {
416 static isn_secret _isn_secret;
419 bool _poll_active =
false;
420 uint32_t get_default_receive_window_size() {
422 constexpr uint32_t size = 29200;
423 return size << _rcv.window_scale;
426 uint32_t get_modified_receive_window_size() {
427 uint32_t left = _rcv.data_size > _rcv.max_receive_buf_size ? 0 : _rcv.max_receive_buf_size - _rcv.data_size;
428 return std::min(left, get_default_receive_window_size());
435 void output_one(
bool data_retransmit =
false);
438 void abort_reader() noexcept;
444 void close() noexcept;
445 void remove_from_tcbs() {
446 auto id =
connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
447 _tcp._tcbs.erase(
id);
449 std::optional<typename InetTraits::l4packet> get_packet();
454 (void)_tcp.poll_tcb(_foreign_ip, this->shared_from_this()).then_wrapped([
this] (
auto&& f) {
459 _poll_active =
false;
460 this->start_retransmit_timer();
462 if (this->in_state(SYN_SENT)) {
478 void respond_with_reset(
tcp_hdr* th);
479 bool merge_out_of_order();
481 void trim_receive_data_after_window();
482 bool should_send_ack(uint16_t seg_len);
483 void clear_delayed_ack() noexcept;
484 packet get_transmit_packet();
485 void retransmit_one() {
486 bool data_retransmit =
true;
487 output_one(data_retransmit);
489 void start_retransmit_timer() {
491 start_retransmit_timer(
now);
493 void start_retransmit_timer(clock_type::time_point
now) {
494 auto tp =
now + _rto;
495 _retransmit.
rearm(tp);
497 void stop_retransmit_timer() noexcept {
500 void start_persist_timer() {
502 start_persist_timer(
now);
504 void start_persist_timer(clock_type::time_point
now) {
505 auto tp =
now + _persist_time_out;
508 void stop_persist_timer() {
513 void fast_retransmit();
514 void update_rto(clock_type::time_point tx_time);
515 void update_cwnd(uint32_t acked_bytes);
517 uint32_t can_send() {
518 if (_snd.window_probe) {
523 if (_snd.window == 0) {
528 auto window_used = uint32_t(_snd.next - _snd.unacknowledged);
529 if (window_used > _snd.window) {
534 auto x = std::min(_snd.window - window_used, _snd.unsent_len);
537 x = std::min(_snd.cwnd, x);
538 if (_snd.dupacks == 1 || _snd.dupacks == 2) {
541 auto flight = flight_size();
542 auto max = _snd.cwnd + 2 * _snd.mss;
543 x = flight <= max ? std::min(x, max - flight) : 0;
544 _snd.limited_transfer += x;
545 }
else if (_snd.dupacks >= 3) {
548 x = std::min(uint32_t(_snd.mss), x);
552 uint32_t flight_size() {
554 std::for_each(_snd.data.begin(), _snd.data.end(), [&] (unacked_segment& seg) { size += seg.p.len(); });
557 uint16_t local_mss() {
558 return _tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
560 void queue_packet(
packet p) {
561 _packetq.emplace_back(
typename InetTraits::l4packet{_foreign_ip, std::move(p)});
563 void signal_data_received() {
564 if (_rcv._data_received_promise) {
565 _rcv._data_received_promise->set_value();
566 _rcv._data_received_promise = {};
569 void signal_all_data_acked() {
570 if (_snd._all_data_acked_promise && _snd.unsent_len == 0) {
571 _snd._all_data_acked_promise->set_value();
572 _snd._all_data_acked_promise = {};
575 void signal_send_available() {
576 if (_snd._send_available_promise && _snd.max_queue_space > _snd.current_queue_space) {
577 _snd._send_available_promise->set_value();
578 _snd._send_available_promise = {};
587 void do_syn_received() {
588 _state = SYN_RECEIVED;
593 void do_established() {
594 _state = ESTABLISHED;
595 update_rto(_snd.syn_tx_time);
601 if (_rcv._data_received_promise) {
602 _rcv._data_received_promise->set_exception(tcp_reset_error());
603 _rcv._data_received_promise = std::nullopt;
605 if (_snd._all_data_acked_promise) {
606 _snd._all_data_acked_promise->set_exception(tcp_reset_error());
607 _snd._all_data_acked_promise = std::nullopt;
609 if (_snd._send_available_promise) {
610 _snd._send_available_promise->set_exception(tcp_reset_error());
611 _snd._send_available_promise = std::nullopt;
614 void do_time_wait() {
623 void do_setup_isn() {
624 _snd.initial = get_isn();
625 _snd.unacknowledged = _snd.initial;
626 _snd.next = _snd.initial + 1;
627 _snd.recover = _snd.initial;
629 void do_local_fin_acked() {
630 _snd.unacknowledged += 1;
633 bool syn_needs_on()
const noexcept {
634 return in_state(SYN_SENT | SYN_RECEIVED);
636 bool fin_needs_on()
const noexcept {
637 return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed &&
638 _snd.unsent_len == 0;
640 bool ack_needs_on()
const noexcept {
641 return !in_state(CLOSED | LISTEN | SYN_SENT);
643 bool foreign_will_not_send()
const noexcept {
644 return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED);
646 bool in_state(tcp_state state)
const noexcept {
647 return uint16_t(_state) & uint16_t(state);
649 void exit_fast_recovery() {
651 _snd.limited_transfer = 0;
652 _snd.partial_ack = 0;
654 uint32_t data_segment_acked(
tcp_seq seg_ack);
655 bool segment_acceptable(
tcp_seq seg_seq,
unsigned seg_len);
656 void init_from_options(
tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end);
660 std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs;
661 std::unordered_map<uint16_t, listener*> _listening;
662 std::random_device _rd;
663 std::default_random_engine _e;
664 std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
671 const inet_type& inet()
const {
692 return _tcb->connect_done();
695 return _tcb->send(std::move(p));
698 return _tcb->wait_for_data();
701 return _tcb->wait_input_shutdown();
706 ipaddr foreign_ip() {
707 return _tcb->_foreign_ip;
709 uint16_t foreign_port() {
710 return _tcb->_foreign_port;
713 return _tcb->_local_ip;
715 uint16_t local_port() {
716 return _tcb->_local_port;
718 void shutdown_connect();
719 void close_read() noexcept;
720 void close_write() noexcept;
728 listener(
tcp& t, uint16_t port,
size_t queue_length)
729 : _tcp(t), _port(port), _q(queue_length) {
730 _tcp._listening.emplace(_port,
this);
734 : _tcp(x._tcp), _port(x._port), _q(std::move(x._q)) {
735 _tcp._listening[_port] =
this;
740 _tcp._listening.erase(_port);
745 return make_ready_future<connection>(_q.
pop());
748 void abort_accept() {
749 _q.
abort(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
751 bool full() {
return _pending + _q.
size() >= _q.
max_size(); }
752 void inc_pending() { _pending++; }
753 void dec_pending() { _pending--; }
755 const tcp& get_tcp()
const {
758 uint16_t port()
const {
764 explicit tcp(inet_type& inet);
765 void received(
packet p, ipaddr from, ipaddr to);
772 auto it = _listening.find(local_port);
773 if (it != _listening.end()) {
774 it->second->_q.push(connection(tcbp));
775 it->second->dec_pending();
779 void send_packet_without_tcb(ipaddr from, ipaddr to, packet p);
780 void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
781 friend class listener;
784 template <
typename InetTraits>
785 tcp<InetTraits>::tcp(inet_type& inet)
788 namespace sm = metrics;
791 sm::make_counter(
"linearizations", [] {
return tcp_packet_merger::linearizations(); },
792 sm::description(
"Counts a number of times a buffer linearization was invoked during the buffers merge process. "
793 "Divide it by a total TCP receive packet rate to get an everage number of lineraizations per TCP packet."))
796 _inet.register_packet_provider([
this, tcb_polled = 0u] ()
mutable {
797 std::optional<typename InetTraits::l4packet> l4p;
798 auto c = _poll_tcbs.size();
799 if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
800 l4p = std::move(_packetq.front());
801 _packetq.pop_front();
802 _queue_space.
signal(l4p.value().p.len());
806 lw_shared_ptr<tcb> tcb;
807 ethernet_address dst;
808 std::tie(tcb, dst) = std::move(_poll_tcbs.front());
809 _poll_tcbs.pop_front();
810 l4p = tcb->get_packet();
812 l4p.value().e_dst = dst;
821 template <
typename InetTraits>
822 future<> tcp<InetTraits>::poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb) {
823 return _inet.get_l2_dst_address(to).
then([
this, tcb = std::move(tcb)] (ethernet_address dst) {
824 _poll_tcbs.emplace_back(std::move(tcb), dst);
828 template <
typename InetTraits>
830 return listener(*
this, port, queue_length);
833 template <
typename InetTraits>
837 auto src_ip = _inet._inet.host_address();
838 auto dst_ip = ipv4_address(sa);
839 auto dst_port = net::ntoh(sa.u.in.sin_port);
842 src_port = _port_dist(_e);
843 id = connid{src_ip, dst_ip, src_port, dst_port};
844 }
while (_inet._inet.netif()->hw_queues_count() > 1 &&
845 (_inet._inet.netif()->hash2cpu(
id.hash(_inet._inet.netif()->rss_key())) !=
this_shard_id()
846 || _tcbs.find(
id) != _tcbs.end()));
848 auto tcbp = make_lw_shared<tcb>(*
this,
id);
849 _tcbs.insert({id, tcbp});
851 return connection(tcbp);
854 template <
typename InetTraits>
855 bool tcp<InetTraits>::forward(forward_hash& out_hash_data, packet& p,
size_t off) {
856 auto th = p.get_header(off, tcp_hdr::len);
859 out_hash_data.push_back(uint8_t(th[0]));
860 out_hash_data.push_back(uint8_t(th[1]));
861 out_hash_data.push_back(uint8_t(th[2]));
862 out_hash_data.push_back(uint8_t(th[3]));
867 template <
typename InetTraits>
868 void tcp<InetTraits>::received(packet p, ipaddr from, ipaddr to) {
869 auto th = p.get_header(0, tcp_hdr::len);
874 auto data_offset = uint8_t(th[12]) >> 4;
875 if (
size_t(data_offset * 4) < tcp_hdr::len) {
879 if (!hw_features().rx_csum_offload) {
881 InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
883 if (csum.get() != 0) {
887 auto h = tcp_hdr::read(th);
888 auto id = connid{to, from, h.dst_port, h.src_port};
889 auto tcbi = _tcbs.find(
id);
890 lw_shared_ptr<tcb> tcbp;
891 if (tcbi == _tcbs.end()) {
892 auto listener = _listening.find(
id.local_port);
893 if (listener == _listening.end() || listener->second->full()) {
901 return respond_with_reset(&h,
id.local_ip,
id.foreign_ip);
914 return respond_with_reset(&h,
id.local_ip,
id.foreign_ip);
920 tcbp = make_lw_shared<tcb>(*
this,
id);
921 _tcbs.insert({id, tcbp});
924 listener->second->inc_pending();
926 return tcbp->input_handle_listen_state(&h, std::move(p));
935 if (tcbp->state() == tcp_state::SYN_SENT) {
937 return tcbp->input_handle_syn_sent_state(&h, std::move(p));
942 return tcbp->input_handle_other_state(&h, std::move(p));
// Sends a TCP segment that has no owning tcb (respond_with_reset() uses this
// for RST replies). The packet is enqueued on _packetq only when
// _queue_space.try_wait(p.len()) succeeds; the reserved space is returned by
// the packet provider via _queue_space.signal(...) when the packet is dequeued.
// NOTE(review): the `from` parameter is not used in the visible lines.
948 template <
typename InetTraits>
949 void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, packet p) {
950 if (_queue_space.
try_wait(p.len())) {
// The continuation's future is discarded via (void); resolving the L2
// destination happens asynchronously and the packet is enqueued afterwards.
952 (void)_inet.get_l2_dst_address(to).then([
this, to, p = std::move(p)] (ethernet_address e_dst)
mutable {
// NOTE(review): this hard-codes ipv4_traits::l4packet inside a template over
// InetTraits, whereas queue_packet() builds typename InetTraits::l4packet.
// It is only correct while InetTraits is ipv4_traits; consider switching to
// typename InetTraits::l4packet for consistency.
953 _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
958 template <
typename InetTraits>
959 tcp<InetTraits>::connection::~connection() {
961 _tcb->_conn =
nullptr;
967 template <
typename InetTraits>
968 tcp<InetTraits>::tcb::tcb(tcp& t, connid
id)
970 , _local_ip(id.local_ip)
971 , _foreign_ip(id.foreign_ip)
972 , _local_port(id.local_port)
973 , _foreign_port(id.foreign_port)
974 , _delayed_ack([this] { _nr_full_seg_received = 0; output(); })
975 , _retransmit([
this] { retransmit(); })
976 , _persist([
this] { persist(); }) {
979 template <
typename InetTraits>
980 void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth) {
981 _tcp.respond_with_reset(rth, _local_ip, _foreign_ip);
984 template <
typename InetTraits>
985 void tcp<InetTraits>::respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip) {
990 auto th = p.prepend_uninitialized_header(tcp_hdr::len);
992 h.src_port = rth->dst_port;
993 h.dst_port = rth->src_port;
999 h.ack = rth->seq + 1;
1003 h.data_offset = tcp_hdr::len / 4;
1009 InetTraits::tcp_pseudo_header_checksum(csum, local_ip, foreign_ip, tcp_hdr::len);
1011 if (hw_features().tx_csum_l4_offload) {
1012 checksum = ~csum.get();
1013 oi.needs_csum =
true;
1016 checksum = csum.get();
1017 oi.needs_csum =
false;
1019 tcp_hdr::write_nbo_checksum(th, checksum);
1021 oi.protocol = ip_protocol_num::tcp;
1022 oi.tcp_hdr_len = tcp_hdr::len;
1023 p.set_offload_info(oi);
1025 send_packet_without_tcb(local_ip, foreign_ip, std::move(p));
1028 template <
typename InetTraits>
1029 uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_seq seg_ack) {
1030 uint32_t total_acked_bytes = 0;
1032 while (!_snd.data.empty()
1033 && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) {
1034 auto acked_bytes = _snd.data.front().p.len();
1035 _snd.unacknowledged += acked_bytes;
1037 if (_snd.data.front().nr_transmits == 0) {
1038 update_rto(_snd.data.front().tx_time);
1040 update_cwnd(acked_bytes);
1041 total_acked_bytes += acked_bytes;
1042 _snd.current_queue_space -= _snd.data.front().data_len;
1043 signal_send_available();
1044 _snd.data.pop_front();
1047 if (_snd.unacknowledged < seg_ack) {
1048 auto acked_bytes = seg_ack - _snd.unacknowledged;
1049 if (!_snd.data.empty()) {
1050 auto& unacked_seg = _snd.data.front();
1051 unacked_seg.p.trim_front(acked_bytes);
1053 _snd.unacknowledged = seg_ack;
1054 update_cwnd(acked_bytes);
1055 total_acked_bytes += acked_bytes;
1057 return total_acked_bytes;
1060 template <
typename InetTraits>
1061 bool tcp<InetTraits>::tcb::segment_acceptable(tcp_seq seg_seq,
unsigned seg_len) {
1062 if (seg_len == 0 && _rcv.window == 0) {
1064 return seg_seq == _rcv.next;
1065 }
else if (seg_len == 0 && _rcv.window > 0) {
1067 return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window);
1068 }
else if (seg_len > 0 && _rcv.window > 0) {
1072 bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window);
1073 bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window);
1081 template <
typename InetTraits>
1082 void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) {
1084 _option.parse(opt_start, opt_end);
1087 _snd.window_scale = _option._remote_win_scale;
1089 _rcv.window_scale = _option._local_win_scale;
1092 _snd.mss = _option._remote_mss;
1094 _rcv.mss = _option._local_mss = local_mss();
1096 _rcv.window = get_default_receive_window_size();
1097 _snd.window = th->window << _snd.window_scale;
1105 if (2190 < _snd.mss) {
1106 _snd.cwnd = 2 * _snd.mss;
1107 }
else if (1095 < _snd.mss && _snd.mss <= 2190) {
1108 _snd.cwnd = 3 * _snd.mss;
1110 _snd.cwnd = 4 * _snd.mss;
1114 _snd.ssthresh = th->window << _snd.window_scale;
1117 template <
typename InetTraits>
1118 void tcp<InetTraits>::tcb::input_handle_listen_state(tcp_hdr* th, packet p) {
1119 auto opt_len = th->data_offset * 4 - tcp_hdr::len;
1120 auto opt_start =
reinterpret_cast<uint8_t*
>(p.get_header(0, th->data_offset * 4)) + tcp_hdr::len;
1121 auto opt_end = opt_start + opt_len;
1122 p.trim_front(th->data_offset * 4);
1123 tcp_seq seg_seq = th->seq;
1126 _rcv.next = seg_seq + 1;
1127 _rcv.initial = seg_seq;
1139 _rcv.urgent = _rcv.next;
1141 tcp_debug(
"listen: LISTEN -> SYN_RECEIVED\n");
1142 init_from_options(th, opt_start, opt_end);
1146 template <
typename InetTraits>
1147 void tcp<InetTraits>::tcb::input_handle_syn_sent_state(tcp_hdr* th, packet p) {
1148 auto opt_len = th->data_offset * 4 - tcp_hdr::len;
1149 auto opt_start =
reinterpret_cast<uint8_t*
>(p.get_header(0, th->data_offset * 4)) + tcp_hdr::len;
1150 auto opt_end = opt_start + opt_len;
1151 p.trim_front(th->data_offset * 4);
1152 tcp_seq seg_seq = th->seq;
1153 auto seg_ack = th->ack;
1155 bool acceptable =
false;
1160 if (seg_ack <= _snd.initial || seg_ack > _snd.next) {
1161 return respond_with_reset(th);
1165 acceptable = _snd.unacknowledged <= seg_ack && seg_ack <= _snd.next;
1174 _connect_done.set_exception(tcp_refused_error());
1190 _rcv.next = seg_seq + 1;
1191 _rcv.initial = seg_seq;
1194 _snd.unacknowledged = seg_ack;
1196 if (_snd.unacknowledged > _snd.initial) {
1200 tcp_debug(
"syn: SYN_SENT -> ESTABLISHED\n");
1201 init_from_options(th, opt_start, opt_end);
1207 tcp_debug(
"syn: SYN_SENT -> SYN_RECEIVED\n");
1217 template <
typename InetTraits>
1218 void tcp<InetTraits>::tcb::input_handle_other_state(tcp_hdr* th, packet p) {
1219 p.trim_front(th->data_offset * 4);
1220 bool do_output =
false;
1221 bool do_output_data =
false;
1222 tcp_seq seg_seq = th->seq;
1223 auto seg_ack = th->ack;
1224 auto seg_len = p.len();
1227 if (!segment_acceptable(seg_seq, seg_len)) {
1234 if (seg_seq < _rcv.next) {
1236 auto dup = std::min(uint32_t(_rcv.next - seg_seq), seg_len);
1243 if (seg_seq != _rcv.next) {
1244 insert_out_of_order(seg_seq, std::move(p));
1252 if (in_state(SYN_RECEIVED)) {
1262 _connect_done.set_exception(tcp_refused_error());
1265 if (in_state(ESTABLISHED | FIN_WAIT_1 | FIN_WAIT_2 | CLOSE_WAIT)) {
1273 if (in_state(CLOSING | LAST_ACK | TIME_WAIT)) {
1293 respond_with_reset(th);
1307 if (in_state(SYN_RECEIVED)) {
1310 if (_snd.unacknowledged <= seg_ack && seg_ack <= _snd.next) {
1311 tcp_debug(
"SYN_RECEIVED -> ESTABLISHED\n");
1313 _tcp.add_connected_tcb(this->shared_from_this(), _local_port);
1316 return respond_with_reset(th);
1319 auto update_window = [
this, th, seg_seq, seg_ack] {
1320 tcp_debug(
"window update seg_seq=%d, seg_ack=%d, old window=%d new window=%d\n",
1321 seg_seq, seg_ack, _snd.window, th->window << _snd.window_scale);
1322 _snd.window = th->window << _snd.window_scale;
1325 _snd.zero_window_probing_out = 0;
1326 if (_snd.window == 0) {
1327 _persist_time_out = _rto;
1328 start_persist_timer();
1330 stop_persist_timer();
1335 if (in_state(ESTABLISHED | CLOSE_WAIT)){
1337 auto packets_out = _snd.next - _snd.unacknowledged - _snd.zero_window_probing_out;
1339 if (_snd.unacknowledged < seg_ack && seg_ack <= _snd.next) {
1341 auto acked_bytes = data_segment_acked(seg_ack);
1344 if (_snd.wl1 < seg_seq || (_snd.wl1 == seg_seq && _snd.wl2 <= seg_ack)) {
1349 do_output_data =
true;
1351 auto set_retransmit_timer = [
this] {
1352 if (_snd.data.empty()) {
1354 stop_retransmit_timer();
1356 signal_all_data_acked();
1359 start_retransmit_timer();
1363 if (_snd.dupacks >= 3) {
1365 uint32_t smss = _snd.mss;
1366 if (seg_ack > _snd.recover) {
1367 tcp_debug(
"ack: full_ack\n");
1369 _snd.cwnd = std::min(_snd.ssthresh, std::max(flight_size(), smss) + smss);
1371 exit_fast_recovery();
1372 set_retransmit_timer();
1374 tcp_debug(
"ack: partial_ack\n");
1379 _snd.cwnd -= acked_bytes;
1382 if (acked_bytes >= smss) {
1389 if (++_snd.partial_ack == 1) {
1390 start_retransmit_timer();
1401 exit_fast_recovery();
1402 set_retransmit_timer();
1404 }
else if ((packets_out > 0) && !_snd.data.empty() && seg_len == 0 &&
1405 th->f_fin == 0 && th->f_syn == 0 &&
1406 th->ack == _snd.unacknowledged &&
1407 uint32_t(th->window << _snd.window_scale) == _snd.window) {
1416 uint32_t smss = _snd.mss;
1418 if (_snd.dupacks == 1 || _snd.dupacks == 2) {
1421 do_output_data =
true;
1422 }
else if (_snd.dupacks == 3) {
1424 if (seg_ack - 1 > _snd.recover) {
1425 _snd.recover = _snd.next - 1;
1427 _snd.ssthresh = std::max((flight_size() - _snd.limited_transfer) / 2, 2 * smss);
1433 _snd.cwnd = _snd.ssthresh + 3 * smss;
1434 }
else if (_snd.dupacks > 3) {
1438 do_output_data =
true;
1440 }
else if (seg_ack > _snd.next) {
1444 }
else if (_snd.window == 0 && th->window > 0) {
1446 do_output_data =
true;
1450 if (in_state(FIN_WAIT_1)) {
1454 if (seg_ack == _snd.next + 1) {
1455 tcp_debug(
"ack: FIN_WAIT_1 -> FIN_WAIT_2\n");
1456 _state = FIN_WAIT_2;
1457 do_local_fin_acked();
1461 if (in_state(FIN_WAIT_2)) {
1468 if (in_state(CLOSING)) {
1469 if (seg_ack == _snd.next + 1) {
1470 tcp_debug(
"ack: CLOSING -> TIME_WAIT\n");
1471 do_local_fin_acked();
1472 return do_time_wait();
1478 if (in_state(LAST_ACK)) {
1479 if (seg_ack == _snd.next + 1) {
1480 tcp_debug(
"ack: LAST_ACK -> CLOSED\n");
1481 do_local_fin_acked();
1486 if (in_state(TIME_WAIT)) {
1500 if (in_state(ESTABLISHED | FIN_WAIT_1 | FIN_WAIT_2)) {
1506 _rcv.data_size += p.len();
1507 _rcv.data.push_back(std::move(p));
1508 _rcv.next += seg_len;
1509 auto merged = merge_out_of_order();
1510 _rcv.window = get_modified_receive_window_size();
1511 signal_data_received();
1522 do_output = should_send_ack(seg_len);
1525 }
else if (in_state(CLOSE_WAIT | CLOSING | LAST_ACK | TIME_WAIT)) {
1533 if (_fin_recvd_promise) {
1534 _fin_recvd_promise->set_value();
1535 _fin_recvd_promise.reset();
1537 if (in_state(CLOSED | LISTEN | SYN_SENT)) {
1542 auto fin_seq = seg_seq + seg_len;
1543 if (fin_seq == _rcv.next) {
1544 _rcv.next = fin_seq + 1;
1545 signal_data_received();
1549 clear_delayed_ack();
1554 if (in_state(SYN_RECEIVED | ESTABLISHED)) {
1555 tcp_debug(
"fin: SYN_RECEIVED or ESTABLISHED -> CLOSE_WAIT\n");
1556 _state = CLOSE_WAIT;
1558 if (in_state(FIN_WAIT_1)) {
1564 tcp_debug(
"fin: FIN_WAIT_1 -> CLOSING\n");
1567 if (in_state(FIN_WAIT_2)) {
1568 tcp_debug(
"fin: FIN_WAIT_2 -> TIME_WAIT\n");
1569 return do_time_wait();
1573 if (do_output || (do_output_data && can_send())) {
1575 clear_delayed_ack();
1580 template <
typename InetTraits>
1581 packet tcp<InetTraits>::tcb::get_transmit_packet() {
1583 if (_snd.unsent.empty()) {
1586 auto can_send = this->can_send();
1589 if (_tcp.hw_features().tx_tso) {
1591 len = _tcp.hw_features().max_packet_len - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
1593 len = std::min(uint16_t(_tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
1595 can_send = std::min(can_send, len);
1597 if (_snd.unsent.size() == 1 && _snd.unsent.front().len() <= can_send) {
1598 auto p = std::move(_snd.unsent.front());
1599 _snd.unsent.pop_front();
1600 _snd.unsent_len -= p.len();
1604 if (_snd.unsent.front().len() > can_send) {
1605 auto p = _snd.unsent.front().share(0, can_send);
1606 _snd.unsent.front().trim_front(can_send);
1607 _snd.unsent_len -= p.len();
1611 auto p = std::move(_snd.unsent.front());
1612 _snd.unsent.pop_front();
1613 can_send -= p.len();
1614 while (!_snd.unsent.empty()
1615 && _snd.unsent.front().len() <= can_send) {
1616 can_send -= _snd.unsent.front().len();
1617 p.append(std::move(_snd.unsent.front()));
1618 _snd.unsent.pop_front();
1620 if (!_snd.unsent.empty() && can_send) {
1621 auto& q = _snd.unsent.front();
1622 p.append(q.share(0, can_send));
1623 q.trim_front(can_send);
1625 _snd.unsent_len -= p.len();
1629 template <
typename InetTraits>
1630 void tcp<InetTraits>::tcb::output_one(
bool data_retransmit) {
1631 if (in_state(CLOSED)) {
1635 packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet();
1636 packet clone = p.share();
1637 uint16_t len = p.len();
1638 bool syn_on = syn_needs_on();
1639 bool ack_on = ack_needs_on();
1641 auto options_size = _option.get_size(syn_on, ack_on);
1642 auto th = p.prepend_uninitialized_header(tcp_hdr::len + options_size);
1645 h.src_port = _local_port;
1646 h.dst_port = _foreign_port;
1651 clear_delayed_ack();
1657 if (data_retransmit) {
1658 seq = _snd.unacknowledged;
1660 seq = syn_on ? _snd.initial : _snd.next;
1665 h.data_offset = (tcp_hdr::len + options_size) / 4;
1666 h.window = _rcv.window >> _rcv.window_scale;
1670 bool fin_on = fin_needs_on();
1674 _option.fill(th, &h, options_size);
1679 uint16_t pseudo_hdr_seg_len = 0;
1681 oi.tcp_hdr_len = tcp_hdr::len + options_size;
1683 if (_tcp.hw_features().tx_csum_l4_offload) {
1684 oi.needs_csum =
true;
1695 if (_tcp.hw_features().tx_tso && len > _snd.mss) {
1696 oi.tso_seg_size = _snd.mss;
1698 pseudo_hdr_seg_len = tcp_hdr::len + options_size + len;
1701 pseudo_hdr_seg_len = tcp_hdr::len + options_size + len;
1702 oi.needs_csum =
false;
1705 InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip,
1706 pseudo_hdr_seg_len);
1709 if (_tcp.hw_features().tx_csum_l4_offload) {
1710 checksum = ~csum.get();
1713 checksum = csum.get();
1715 tcp_hdr::write_nbo_checksum(th, checksum);
1717 oi.protocol = ip_protocol_num::tcp;
1719 p.set_offload_info(oi);
1721 if (!data_retransmit && (len || syn_on || fin_on)) {
1724 unsigned nr_transmits = 0;
1725 _snd.data.emplace_back(unacked_segment{std::move(clone),
1726 len, nr_transmits,
now});
1728 if (!_retransmit.armed()) {
1729 start_retransmit_timer(
now);
1737 assert((_snd.window > 0) || ((_snd.window == 0) && (len <= 1)));
1738 queue_packet(std::move(p));
1741 template <
typename InetTraits>
1742 future<> tcp<InetTraits>::tcb::wait_for_data() {
1743 if (!_rcv.data.empty() || foreign_will_not_send()) {
1744 return make_ready_future<>();
1746 _rcv._data_received_promise =
promise<>();
1747 return _rcv._data_received_promise->get_future();
1750 template <
typename InetTraits>
1751 future<> tcp<InetTraits>::tcb::wait_input_shutdown() {
1752 if (!_fin_recvd_promise) {
1753 return make_ready_future<>();
1755 return _fin_recvd_promise->get_future();
1758 template <
typename InetTraits>
1760 tcp<InetTraits>::tcb::abort_reader() noexcept {
1761 if (_rcv._data_received_promise) {
1762 _rcv._data_received_promise->set_exception(
1763 std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
1764 _rcv._data_received_promise = std::nullopt;
1766 if (_fin_recvd_promise) {
1767 _fin_recvd_promise->set_value();
1768 _fin_recvd_promise.reset();
1772 template <
typename InetTraits>
1773 future<> tcp<InetTraits>::tcb::wait_for_all_data_acked() {
1774 if (_snd.data.empty() && _snd.unsent_len == 0) {
1775 return make_ready_future<>();
1777 _snd._all_data_acked_promise =
promise<>();
1778 return _snd._all_data_acked_promise->get_future();
1781 template <
typename InetTraits>
1789 _rcv.window_scale = _option._local_win_scale = 7;
1791 _rcv.mss = _option._local_mss = local_mss();
1792 _rcv.window = get_default_receive_window_size();
1797 template <
typename InetTraits>
1798 packet tcp<InetTraits>::tcb::read() {
1800 for (
auto&& q : _rcv.data) {
1801 p.append(std::move(q));
1805 _rcv.window = get_default_receive_window_size();
1809 template <
typename InetTraits>
1810 future<> tcp<InetTraits>::tcb::wait_send_available() {
1811 if (_snd.max_queue_space > _snd.current_queue_space) {
1812 return make_ready_future<>();
1814 _snd._send_available_promise =
promise<>();
1815 return _snd._send_available_promise->get_future();
1818 template <
typename InetTraits>
1819 future<> tcp<InetTraits>::tcb::send(packet p) {
1821 if (_snd.closed || in_state(CLOSED)) {
1822 return make_exception_future<>(tcp_reset_error());
1826 _snd.current_queue_space += len;
1827 _snd.unsent_len += len;
1828 _snd.unsent.push_back(std::move(p));
1830 if (can_send() > 0) {
1834 return wait_send_available();
1837 template <
typename InetTraits>
1838 void tcp<InetTraits>::tcb::close() noexcept {
1839 if (in_state(CLOSED) || _snd.closed) {
1843 (void)wait_for_all_data_acked().then([
this, zis = this->shared_from_this()] ()
mutable {
1845 tcp_debug(
"close: unsent_len=%d\n", _snd.unsent_len);
1846 if (in_state(CLOSE_WAIT)) {
1847 tcp_debug(
"close: CLOSE_WAIT -> LAST_ACK\n");
1849 }
else if (in_state(ESTABLISHED)) {
1850 tcp_debug(
"close: ESTABLISHED -> FIN_WAIT_1\n");
1851 _state = FIN_WAIT_1;
1862 template <
typename InetTraits>
1863 bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) {
1865 if (seg_len > _rcv.mss) {
1866 _nr_full_seg_received = 0;
1867 _delayed_ack.cancel();
1872 if (seg_len == _rcv.mss) {
1873 if (_nr_full_seg_received++ >= 1) {
1874 _nr_full_seg_received = 0;
1875 _delayed_ack.cancel();
1881 if (_delayed_ack.armed()) {
1888 _delayed_ack.arm(200ms);
1892 template <
typename InetTraits>
1893 void tcp<InetTraits>::tcb::clear_delayed_ack() noexcept {
1894 _delayed_ack.cancel();
1897 template <
typename InetTraits>
1898 bool tcp<InetTraits>::tcb::merge_out_of_order() {
1899 bool merged =
false;
1900 if (_rcv.out_of_order.map.empty()) {
1903 for (
auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) {
1904 auto& p = it->second;
1905 auto seg_beg = it->first;
1906 auto seg_len = p.len();
1907 auto seg_end = seg_beg + seg_len;
1908 if (seg_beg <= _rcv.next && _rcv.next < seg_end) {
1911 auto trim = _rcv.next - seg_beg;
1916 _rcv.next += seg_len;
1917 _rcv.data_size += p.len();
1918 _rcv.data.push_back(std::move(p));
1920 it = _rcv.out_of_order.map.erase(it);
1922 }
else if (_rcv.next >= seg_end) {
1924 it = _rcv.out_of_order.map.erase(it);
1935 template <
typename InetTraits>
1936 void tcp<InetTraits>::tcb::insert_out_of_order(tcp_seq seg, packet p) {
1937 _rcv.out_of_order.merge(seg, std::move(p));
1940 template <
typename InetTraits>
1941 void tcp<InetTraits>::tcb::trim_receive_data_after_window() {
1945 template <
typename InetTraits>
1946 void tcp<InetTraits>::tcb::persist() {
1947 tcp_debug(
"persist timer fired\n");
1949 _snd.window_probe =
true;
1950 _snd.zero_window_probing_out++;
1952 _snd.window_probe =
false;
1956 _persist_time_out = std::min(_persist_time_out * 2, _rto_max);
1957 start_persist_timer();
1960 template <
typename InetTraits>
1961 void tcp<InetTraits>::tcb::retransmit() {
1962 auto output_update_rto = [
this] {
1965 this->_rto = std::min(this->_rto * 2, this->_rto_max);
1966 start_retransmit_timer();
1970 if (syn_needs_on()) {
1971 if (_snd.syn_retransmit++ < _max_nr_retransmit) {
1972 output_update_rto();
1974 _connect_done.set_exception(tcp_connect_error());
1981 if (fin_needs_on()) {
1982 if (_snd.fin_retransmit++ < _max_nr_retransmit) {
1983 output_update_rto();
1991 if (_snd.data.empty()) {
1996 auto& unacked_seg = _snd.data.front();
2000 uint32_t smss = _snd.mss;
2001 if (unacked_seg.nr_transmits == 0) {
2002 _snd.ssthresh = std::max(flight_size() / 2, 2 * smss);
2005 _snd.recover = _snd.next - 1;
2009 exit_fast_recovery();
2011 if (unacked_seg.nr_transmits < _max_nr_retransmit) {
2012 unacked_seg.nr_transmits++;
2020 output_update_rto();
2023 template <
typename InetTraits>
2024 void tcp<InetTraits>::tcb::fast_retransmit() {
2025 if (!_snd.data.empty()) {
2026 auto& unacked_seg = _snd.data.front();
2027 unacked_seg.nr_transmits++;
2033 template <
typename InetTraits>
2034 void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) {
2036 auto R = std::chrono::duration_cast<std::chrono::milliseconds>(
clock_type::now() - tx_time);
2037 if (_snd.first_rto_sample) {
2038 _snd.first_rto_sample =
false;
2041 _snd.rttvar = R / 2;
2047 auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt);
2048 _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4;
2049 _snd.srtt = _snd.srtt * 7 / 8 + R / 8;
2052 _rto = _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar);
2055 _rto = std::max(_rto, _rto_min);
2056 _rto = std::min(_rto, _rto_max);
2059 template <
typename InetTraits>
2060 void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) {
2061 uint32_t smss = _snd.mss;
2062 if (_snd.cwnd < _snd.ssthresh) {
2064 _snd.cwnd += std::min(acked_bytes, smss);
2067 uint32_t round_up = 1;
2068 _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd);
2072 template <
typename InetTraits>
2073 void tcp<InetTraits>::tcb::cleanup() {
2074 _snd.unsent.clear();
2076 _rcv.out_of_order.map.clear();
2079 stop_retransmit_timer();
2080 clear_delayed_ack();
2084 template <
typename InetTraits>
2085 tcp_seq tcp<InetTraits>::tcb::get_isn() {
2090 using namespace std::chrono;
2092 hash[0] = _local_ip.ip;
2093 hash[1] = _foreign_ip.ip;
2094 hash[2] = (_local_port << 16) + _foreign_port;
2095 hash[3] = _isn_secret.key[15];
2096 CryptoPP::Weak::MD5::Transform(hash, _isn_secret.key);
2098 auto m = duration_cast<microseconds>(
clock_type::now().time_since_epoch());
2099 seq += m.count() / 4;
2100 return make_seq(seq);
2103 template <
typename InetTraits>
2104 std::optional<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
2105 _poll_active =
false;
2106 if (_packetq.empty()) {
2110 if (in_state(CLOSED)) {
2111 return std::optional<typename InetTraits::l4packet>();
2114 assert(!_packetq.empty());
2116 auto p = std::move(_packetq.front());
2117 _packetq.pop_front();
2118 if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0 && (_snd.window > 0))) {
2128 template <
typename InetTraits>
2129 void tcp<InetTraits>::connection::close_read() noexcept {
2130 _tcb->abort_reader();
2133 template <
typename InetTraits>
2134 void tcp<InetTraits>::connection::close_write() noexcept {
2138 template <
typename InetTraits>
2139 void tcp<InetTraits>::connection::shutdown_connect() {
2140 if (_tcb->syn_needs_on()) {
2141 _tcb->_connect_done.set_exception(tcp_refused_error());
2149 template <
typename InetTraits>
2150 typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret;
bool try_wait(size_t nr=1) noexcept
Definition: semaphore.hh:434
void signal(size_t nr=1) noexcept
Definition: semaphore.hh:396
Definition: shared_ptr.hh:150
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1410
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:59
holds the metric definition.
Definition: metrics_registration.hh:94
metric_groups & add_group(const group_name_type &name, const std::initializer_list< metric_definition > &l)
Add metrics belonging to the same group.
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:982
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:990
size_t size() const noexcept
Returns the number of items currently in the queue.
Definition: queue.hh:109
size_t max_size() const noexcept
Definition: queue.hh:117
void abort(std::exception_ptr ex) noexcept
Definition: queue.hh:131
T pop() noexcept
Pop an item.
Definition: queue.hh:208
future not_empty() noexcept
Definition: queue.hh:308
Definition: socket_defs.hh:47
void rearm(time_point until, std::optional< duration > period={}) noexcept
Definition: timer.hh:168
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future now()
Returns a ready future.
Definition: later.hh:35
impl::metric_definition_impl make_counter(metric_name_type name, T &&val, description d=description(), std::vector< label_instance > labels={})
create a counter metric
Definition: metrics.hh:529
server_socket listen(socket_address sa)
future< connected_socket > connect(socket_address sa)
header for metrics creation.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: shard_id.hh:50
Definition: ethernet.hh:37