Seastar
High performance C++ framework for concurrent servers
posix-stack.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/sharded.hh>
25 #include <seastar/core/internal/pollable_fd.hh>
26 #include <seastar/net/stack.hh>
27 #include <seastar/core/polymorphic_temporary_buffer.hh>
28 #include <seastar/core/internal/buffer_allocator.hh>
29 #include <seastar/util/program-options.hh>
30 
31 namespace seastar {
32 
33 namespace net {
34 
35 using namespace seastar;
36 
37 // We can't keep this in any of the socket servers as instance members, because a connection can
38 // outlive the socket server. To avoid having the whole socket_server tracked as a shared pointer,
39 // we will have a conntrack structure.
40 //
41 // Right now this class is used by the posix_server_socket_impl, but it could be used by any other.
42 class conntrack {
43  class load_balancer {
44  std::vector<unsigned> _cpu_load;
45  public:
46  load_balancer() : _cpu_load(size_t(smp::count), 0) {}
47  void closed_cpu(shard_id cpu) {
48  _cpu_load[cpu]--;
49  }
50  shard_id next_cpu() {
51  // FIXME: The naive algorithm will just round robin the connections around the shards.
52  // A more complex version can keep track of the amount of activity in each connection,
53  // and use that information.
54  auto min_el = std::min_element(_cpu_load.begin(), _cpu_load.end());
55  auto cpu = shard_id(std::distance(_cpu_load.begin(), min_el));
56  _cpu_load[cpu]++;
57  return cpu;
58  }
59  shard_id force_cpu(shard_id cpu) {
60  _cpu_load[cpu]++;
61  return cpu;
62  }
63  };
64 
66  void closed_cpu(shard_id cpu) {
67  _lb->closed_cpu(cpu);
68  }
69 public:
70  class handle {
71  shard_id _host_cpu;
72  shard_id _target_cpu;
74  public:
75  handle() : _lb(nullptr) {}
76  handle(shard_id cpu, lw_shared_ptr<load_balancer> lb)
77  : _host_cpu(this_shard_id())
78  , _target_cpu(cpu)
79  , _lb(make_foreign(std::move(lb))) {}
80 
81  handle(const handle&) = delete;
82  handle(handle&&) = default;
83  ~handle() {
84  if (!_lb) {
85  return;
86  }
87  // FIXME: future is discarded
88  (void)smp::submit_to(_host_cpu, [cpu = _target_cpu, lb = std::move(_lb)] {
89  lb->closed_cpu(cpu);
90  });
91  }
92  shard_id cpu() {
93  return _target_cpu;
94  }
95  };
96  friend class handle;
97 
98  conntrack() : _lb(make_lw_shared<load_balancer>()) {}
99  handle get_handle() {
100  return handle(_lb->next_cpu(), _lb);
101  }
102  handle get_handle(shard_id cpu) {
103  return handle(_lb->force_cpu(cpu), _lb);
104  }
105 };
106 
// data_source_impl implementation that holds a pollable_fd and a pointer to a
// std::pmr::polymorphic_allocator<char>; it also privately implements
// internal::buffer_allocator through allocate_buffer().
// get(), close() and allocate_buffer() are only declared here.
//
// NOTE(review): the embedded line numbers jump 109 -> 111 and 113 -> 115, so two
// source lines are missing from this listing. Judging by the initializer list
// below (which uses `fd`, `config` and `_config`), they are presumably the
// `_config` member declaration and the first line of the constructor
// signature. Their types are not visible here; restore them from the upstream
// header rather than guessing.
107 class posix_data_source_impl final : public data_source_impl, private internal::buffer_allocator {
108  std::pmr::polymorphic_allocator<char>* _buffer_allocator;
109  pollable_fd _fd;
111 private:
112  virtual temporary_buffer<char> allocate_buffer() override;
113 public:
// Tail of the constructor's parameter list: the allocator defaults to
// memory::malloc_allocator.
115  std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator)
116  : _buffer_allocator(allocator), _fd(std::move(fd)), _config(config) {
117  }
118  future<temporary_buffer<char>> get() override;
119  future<> close() override;
120 };
121 
123  pollable_fd _fd;
124  packet _p;
125 public:
126  explicit posix_data_sink_impl(pollable_fd fd) : _fd(std::move(fd)) {}
127  using data_sink_impl::put;
128  future<> put(packet p) override;
129  future<> put(temporary_buffer<char> buf) override;
130  future<> close() override;
131  bool can_batch_flushes() const noexcept override { return true; }
132  void on_batch_flush_error() noexcept override;
133 };
134 
135 class posix_ap_server_socket_impl : public server_socket_impl {
136  using protocol_and_socket_address = std::tuple<int, socket_address>;
137  struct connection {
138  pollable_fd fd;
139  socket_address addr;
140  conntrack::handle connection_tracking_handle;
141  connection(pollable_fd xfd, socket_address xaddr, conntrack::handle cth) : fd(std::move(xfd)), addr(xaddr), connection_tracking_handle(std::move(cth)) {}
142  };
143  using sockets_map_t = std::unordered_map<protocol_and_socket_address, promise<accept_result>>;
144  using conn_map_t = std::unordered_multimap<protocol_and_socket_address, connection>;
145  static thread_local sockets_map_t sockets;
146  static thread_local conn_map_t conn_q;
147  int _protocol;
148  socket_address _sa;
149  std::pmr::polymorphic_allocator<char>* _allocator;
150 public:
151  explicit posix_ap_server_socket_impl(int protocol, socket_address sa, std::pmr::polymorphic_allocator<char>* allocator = memory::malloc_allocator) : _protocol(protocol), _sa(sa), _allocator(allocator) {}
152  virtual future<accept_result> accept() override;
153  virtual void abort_accept() override;
154  socket_address local_address() const override {
155  return _sa;
156  }
157  static void move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle handle, std::pmr::polymorphic_allocator<char>* allocator);
158 
159  template <typename T>
160  friend class std::hash;
161 };
162 
163 class posix_server_socket_impl : public server_socket_impl {
164  socket_address _sa;
165  int _protocol;
166  pollable_fd _lfd;
167  conntrack _conntrack;
168  server_socket::load_balancing_algorithm _lba;
169  shard_id _fixed_cpu;
170  std::pmr::polymorphic_allocator<char>* _allocator;
171 public:
172  explicit posix_server_socket_impl(int protocol, socket_address sa, pollable_fd lfd,
173  server_socket::load_balancing_algorithm lba, shard_id fixed_cpu,
174  std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _sa(sa), _protocol(protocol), _lfd(std::move(lfd)), _lba(lba), _fixed_cpu(fixed_cpu), _allocator(allocator) {}
175  virtual future<accept_result> accept() override;
176  virtual void abort_accept() override;
177  virtual socket_address local_address() const override;
178 };
179 
180 class posix_reuseport_server_socket_impl : public server_socket_impl {
181  socket_address _sa;
182  int _protocol;
183  pollable_fd _lfd;
184  std::pmr::polymorphic_allocator<char>* _allocator;
185 public:
186  explicit posix_reuseport_server_socket_impl(int protocol, socket_address sa, pollable_fd lfd,
187  std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _sa(sa), _protocol(protocol), _lfd(std::move(lfd)), _allocator(allocator) {}
188  virtual future<accept_result> accept() override;
189  virtual void abort_accept() override;
190  virtual socket_address local_address() const override;
191 };
192 
194 private:
195  const bool _reuseport;
196 protected:
197  std::pmr::polymorphic_allocator<char>* _allocator;
198 public:
199  explicit posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator);
200  virtual server_socket listen(socket_address sa, listen_options opts) override;
201  virtual ::seastar::socket socket() override;
202  virtual net::udp_channel make_udp_channel(const socket_address&) override;
203  virtual net::datagram_channel make_unbound_datagram_channel(sa_family_t) override;
205  static future<std::unique_ptr<network_stack>> create(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) {
206  return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_network_stack(opts, allocator)));
207  }
208  virtual bool has_per_core_namespace() override { return _reuseport; };
209  bool supports_ipv6() const override;
210  std::vector<network_interface> network_interfaces() override;
211 };
212 
214 private:
215  const bool _reuseport;
216 public:
217  posix_ap_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator);
218  virtual server_socket listen(socket_address sa, listen_options opts) override;
219  static future<std::unique_ptr<network_stack>> create(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) {
220  return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_ap_network_stack(opts, allocator)));
221  }
222 };
223 
// Declaration only; the definition is not part of this header.
// NOTE(review): presumably returns the registry entry for the POSIX network
// stack - confirm at the definition.
224 network_stack_entry register_posix_stack();
225 }
226 
227 }
Definition: iostream.hh:103
Definition: iostream.hh:60
Definition: sharded.hh:845
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Definition: posix-stack.hh:70
Definition: posix-stack.hh:42
Definition: api.hh:115
Definition: packet.hh:87
Definition: posix-stack.hh:213
Definition: posix-stack.hh:135
Definition: posix-stack.hh:122
Definition: posix-stack.hh:107
Definition: posix-stack.hh:193
std::vector< network_interface > network_interfaces() override
Definition: posix-stack.hh:163
Definition: api.hh:427
Definition: pollable_fd.hh:137
Definition: program-options.hh:290
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:325
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:354
Definition: socket_defs.hh:47
Definition: api.hh:282
server_socket listen(socket_address sa)
net::udp_channel make_udp_channel()
net::datagram_channel make_unbound_datagram_channel(sa_family_t family)
net::datagram_channel make_bound_datagram_channel(const socket_address &local)
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
SEASTAR_MODULE_EXPORT_BEGIN shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:50
Definition: api.hh:459
Definition: api.hh:391