Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
posix-stack.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23#ifndef SEASTAR_MODULE
24#include <unordered_set>
25#endif
26#include <seastar/core/sharded.hh>
27#include <seastar/core/internal/pollable_fd.hh>
28#include <seastar/net/stack.hh>
29#include <seastar/core/polymorphic_temporary_buffer.hh>
30#include <seastar/core/internal/buffer_allocator.hh>
31#include <seastar/util/program-options.hh>
32
33#include <unordered_set>
34
35namespace seastar {
36
37namespace net {
38
39using namespace seastar;
40
41// We can't keep this in any of the socket servers as instance members, because a connection can
42// outlive the socket server. To avoid having the whole socket_server tracked as a shared pointer,
43// we will have a conntrack structure.
44//
45// Right now this class is used by the posix_server_socket_impl, but it could be used by any other.
// Keeps a per-shard count of live connections and chooses the shard for each
// new connection. The counters live in a load_balancer reached through `_lb`;
// each handle handed out by get_handle() arranges for its shard's counter to be
// decremented again when the handle is destroyed.
// NOTE(review): the declarations of the two `_lb` members (the one used by
// conntrack itself and the one used by handle) do not appear in this listing;
// confirm their exact types against the original header.
46class conntrack {
 // Per-shard connection counters plus the shard-selection policy.
47 class load_balancer {
 // _cpu_load[i] is the number of connections currently attributed to shard i.
48 std::vector<unsigned> _cpu_load;
49 public:
 // Creates smp::count counters, all starting at zero.
50 load_balancer() : _cpu_load(size_t(smp::count), 0) {}
 // Records that one connection attributed to `cpu` is gone.
 // No bounds check and no guard against decrementing a counter that is already zero.
51 void closed_cpu(shard_id cpu) {
52 _cpu_load[cpu]--;
53 }
 // Selects the shard whose counter is smallest (std::min_element returns the
 // first such element on ties), increments that counter and returns the shard.
 // Note that this is least-loaded selection, not a strict round robin as the
 // FIXME text below words it.
54 shard_id next_cpu() {
55 // FIXME: The naive algorithm will just round robin the connections around the shards.
56 // A more complex version can keep track of the amount of activity in each connection,
57 // and use that information.
58 auto min_el = std::min_element(_cpu_load.begin(), _cpu_load.end());
59 auto cpu = shard_id(std::distance(_cpu_load.begin(), min_el));
60 _cpu_load[cpu]++;
61 return cpu;
62 }
 // Attributes a connection to the caller-chosen shard: increments its counter
 // and returns `cpu` unchanged. No bounds check on `cpu`.
63 shard_id force_cpu(shard_id cpu) {
64 _cpu_load[cpu]++;
65 return cpu;
66 }
67 };
68
 // Forwards to load_balancer::closed_cpu on the shared balancer.
70 void closed_cpu(shard_id cpu) {
71 _lb->closed_cpu(cpu);
72 }
73public:
 // Move-only token representing one connection's slot in the counters.
74 class handle {
 // Shard on which the handle was constructed (this_shard_id() at that time).
75 shard_id _host_cpu;
 // Shard the connection was attributed to.
76 shard_id _target_cpu;
78 public:
 // Empty handle: `_lb` is null, so the destructor returns without doing anything.
 // NOTE(review): _host_cpu and _target_cpu get no initializer here; if shard_id
 // is a plain integer type, cpu() on such a handle reads an indeterminate value.
79 handle() : _lb(nullptr) {}
 // Binds the handle to `cpu`, remembers the constructing shard, and wraps the
 // balancer pointer with make_foreign before storing it.
80 handle(shard_id cpu, lw_shared_ptr<load_balancer> lb)
81 : _host_cpu(this_shard_id())
82 , _target_cpu(cpu)
83 , _lb(make_foreign(std::move(lb))) {}
84
 // Not copyable; move construction is the compiler-generated one.
85 handle(const handle&) = delete;
86 handle(handle&&) = default;
 // When `_lb` is non-null, moves it into a task submitted to `_host_cpu` that
 // calls closed_cpu(_target_cpu). The future returned by smp::submit_to is
 // explicitly discarded, so completion and failures are not observed here.
87 ~handle() {
88 if (!_lb) {
89 return;
90 }
91 // FIXME: future is discarded
92 (void)smp::submit_to(_host_cpu, [cpu = _target_cpu, lb = std::move(_lb)] {
93 lb->closed_cpu(cpu);
94 });
95 }
 // Returns the shard this handle was attributed to.
96 shard_id cpu() {
97 return _target_cpu;
98 }
99 };
100 friend class handle;
101
 // Creates the load balancer with make_lw_shared.
102 conntrack() : _lb(make_lw_shared<load_balancer>()) {}
 // Returns a handle for the shard chosen by load_balancer::next_cpu().
103 handle get_handle() {
104 return handle(_lb->next_cpu(), _lb);
105 }
 // Returns a handle pinned to `cpu`; the counter for `cpu` is still incremented
 // (via load_balancer::force_cpu).
106 handle get_handle(shard_id cpu) {
107 return handle(_lb->force_cpu(cpu), _lb);
108 }
109};
110
// data_source_impl implementation that holds a pollable_fd and privately
// implements internal::buffer_allocator (see allocate_buffer()).
// NOTE(review): this listing is missing the `_config` member declaration and the
// first line of the constructor (the part naming the class and the `fd` and
// `config` parameters); confirm both against the original header.
111class posix_data_source_impl final : public data_source_impl, private internal::buffer_allocator {
 // Non-owning pointer to the allocator passed to the constructor.
112 std::pmr::polymorphic_allocator<char>* _buffer_allocator;
113 pollable_fd _fd;
115private:
 // buffer_allocator hook; defined outside this header.
116 virtual temporary_buffer<char> allocate_buffer() override;
117public:
 // Constructor tail: `allocator` defaults to memory::malloc_allocator; the
 // initializer list stores the allocator pointer, moves `fd` in and copies `config`.
119 std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator)
120 : _buffer_allocator(allocator), _fd(std::move(fd)), _config(config) {
121 }
 // Both overrides are only declared here; their bodies live elsewhere.
122 future<temporary_buffer<char>> get() override;
123 future<> close() override;
124};
125
// NOTE(review): the class-head line is missing from this listing. The members
// below belong to posix_data_sink_impl (named by the constructor); the
// `using data_sink_impl::put;` line suggests data_sink_impl is its base — confirm.
127 pollable_fd _fd;
128 packet _p;
129public:
 // Takes ownership of `fd` by move; `_p` is default-constructed.
130 explicit posix_data_sink_impl(pollable_fd fd) : _fd(std::move(fd)) {}
 // Keeps the data_sink_impl::put overloads visible next to the overrides declared below.
131 using data_sink_impl::put;
 // Declared only; definitions are outside this header.
132 future<> put(packet p) override;
133 future<> put(temporary_buffer<char> buf) override;
134 future<> close() override;
 // Always answers true.
135 bool can_batch_flushes() const noexcept override { return true; }
136 void on_batch_flush_error() noexcept override;
137};
138
// server_socket_impl that declares no listening fd of its own; it holds only a
// protocol, an address and an allocator pointer, plus per-thread static tables
// keyed by (protocol, address).
// NOTE(review): presumably connections reach it through the static
// move_connected_socket() entry point — confirm against the implementation file.
// NOTE(review): one declaration line between the constructor and accept() is
// missing from this listing.
139class posix_ap_server_socket_impl : public server_socket_impl {
 // Key type shared by the three tables below.
140 using protocol_and_socket_address = std::tuple<int, socket_address>;
 // Bundle of an fd, its socket_address and the conntrack handle for that connection.
141 struct connection {
142 pollable_fd fd;
143 socket_address addr;
144 conntrack::handle connection_tracking_handle;
 // Moves the fd and the handle in; copies the address.
145 connection(pollable_fd xfd, socket_address xaddr, conntrack::handle cth) : fd(std::move(xfd)), addr(xaddr), connection_tracking_handle(std::move(cth)) {}
146 };
147 using port_map_t = std::unordered_set<protocol_and_socket_address>;
148 using sockets_map_t = std::unordered_map<protocol_and_socket_address, promise<accept_result>>;
149 using conn_map_t = std::unordered_multimap<protocol_and_socket_address, connection>;
 // static thread_local: each thread has its own instance of every table.
 // `sockets` maps a key to at most one promise<accept_result>; `conn_q` is a
 // multimap, so it may hold several connections per key.
150 static thread_local port_map_t ports;
151 static thread_local sockets_map_t sockets;
152 static thread_local conn_map_t conn_q;
153 int _protocol;
154 socket_address _sa;
 // Non-owning allocator pointer.
155 std::pmr::polymorphic_allocator<char>* _allocator;
156public:
 // `allocator` defaults to memory::malloc_allocator; defined outside this header.
157 explicit posix_ap_server_socket_impl(int protocol, socket_address sa, std::pmr::polymorphic_allocator<char>* allocator = memory::malloc_allocator);
159 virtual future<accept_result> accept() override;
160 virtual void abort_accept() override;
 // Returns the address given at construction (a copy of _sa).
161 socket_address local_address() const override {
162 return _sa;
163 }
 // Static entry point taking an fd together with its protocol, addresses and
 // conntrack handle; behaviour is defined outside this header.
164 static void move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle handle, std::pmr::polymorphic_allocator<char>* allocator);
165
 // Befriends every std::hash specialization.
 // NOTE(review): presumably so a hash for the private key alias above can be defined — confirm.
166 template <typename T>
167 friend class std::hash;
168};
169
// server_socket_impl that owns a pollable_fd (`_lfd`) and a conntrack instance,
// and records a load-balancing algorithm together with a fixed shard id.
// accept(), abort_accept() and local_address() are only declared here.
170class posix_server_socket_impl : public server_socket_impl {
171 socket_address _sa;
172 int _protocol;
173 pollable_fd _lfd;
 // Default-constructed: it does not appear in the constructor's initializer list.
174 conntrack _conntrack;
175 server_socket::load_balancing_algorithm _lba;
 // NOTE(review): presumably only meaningful for a "fixed" load-balancing mode — confirm in accept().
176 shard_id _fixed_cpu;
 // Non-owning allocator pointer.
177 std::pmr::polymorphic_allocator<char>* _allocator;
178public:
 // Copies `sa`, `protocol`, `lba`, `fixed_cpu` and the allocator pointer and
 // moves `lfd` in. `allocator` defaults to memory::malloc_allocator.
179 explicit posix_server_socket_impl(int protocol, socket_address sa, pollable_fd lfd,
180 server_socket::load_balancing_algorithm lba, shard_id fixed_cpu,
181 std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _sa(sa), _protocol(protocol), _lfd(std::move(lfd)), _lba(lba), _fixed_cpu(fixed_cpu), _allocator(allocator) {}
182 virtual future<accept_result> accept() override;
183 virtual void abort_accept() override;
184 virtual socket_address local_address() const override;
185};
186
// server_socket_impl holding an address, a protocol, a pollable_fd and an
// allocator pointer. Unlike posix_server_socket_impl it declares no conntrack
// or load-balancing members.
// NOTE(review): the first line of the constructor (the part naming the class and
// the `protocol`, `sa` and `lfd` parameters) is missing from this listing.
187class posix_reuseport_server_socket_impl : public server_socket_impl {
188 socket_address _sa;
189 int _protocol;
190 pollable_fd _lfd;
 // Non-owning allocator pointer.
191 std::pmr::polymorphic_allocator<char>* _allocator;
192public:
 // Constructor tail: `allocator` defaults to memory::malloc_allocator; the
 // initializer list copies `sa` and `protocol`, moves `lfd` in and stores the allocator pointer.
194 std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _sa(sa), _protocol(protocol), _lfd(std::move(lfd)), _allocator(allocator) {}
 // Declared only; definitions are outside this header.
195 virtual future<accept_result> accept() override;
196 virtual void abort_accept() override;
197 virtual socket_address local_address() const override;
198};
199
// NOTE(review): the class-head line is missing from this listing. The members
// below belong to posix_network_stack (named by its constructor and by create());
// the base class is not visible here.
201private:
202 const bool _reuseport;
203protected:
 // Non-owning allocator pointer; protected, so derived classes can read it.
204 std::pmr::polymorphic_allocator<char>* _allocator;
205public:
 // Defined outside this header; `allocator` defaults to memory::malloc_allocator.
206 explicit posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator);
207 virtual server_socket listen(socket_address sa, listen_options opts) override;
208 virtual ::seastar::socket socket() override;
209 virtual net::udp_channel make_udp_channel(const socket_address&) override;
210 virtual net::datagram_channel make_unbound_datagram_channel(sa_family_t) override;
211 virtual net::datagram_channel make_bound_datagram_channel(const socket_address& local) override;
 // Factory: heap-allocates a posix_network_stack, wraps it in
 // std::unique_ptr<network_stack> and returns it as an already-resolved future.
212 static future<std::unique_ptr<network_stack>> create(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) {
213 return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_network_stack(opts, allocator)));
214 }
 // Reports the value of _reuseport. The `;` after the closing brace is a redundant empty declaration.
215 virtual bool has_per_core_namespace() override { return _reuseport; };
216 bool supports_ipv6() const override;
217 std::vector<network_interface> network_interfaces() override;
218};
219
// NOTE(review): the class-head line is missing from this listing. The members
// below belong to posix_ap_network_stack (named by its constructor and by
// create()); the base class is not visible here.
221private:
 // This class declares its own private _reuseport flag.
222 const bool _reuseport;
223public:
 // Defined outside this header; `allocator` defaults to memory::malloc_allocator.
224 posix_ap_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator);
225 virtual server_socket listen(socket_address sa, listen_options opts) override;
 // Factory: heap-allocates a posix_ap_network_stack, wraps it in
 // std::unique_ptr<network_stack> and returns it as an already-resolved future.
226 static future<std::unique_ptr<network_stack>> create(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) {
227 return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_ap_network_stack(opts, allocator)));
228 }
229};
230
// Declaration only; the definition is not in this header. Takes no arguments
// and returns a network_stack_entry.
// NOTE(review): presumably this registers the POSIX stack implementations with
// the network-stack registry — confirm at the definition.
231network_stack_entry register_posix_stack();
232}
233
234}
Definition: iostream.hh:104
Definition: iostream.hh:61
Definition: sharded.hh:945
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: posix-stack.hh:74
Definition: posix-stack.hh:46
Definition: api.hh:116
Definition: packet.hh:87
Definition: posix-stack.hh:220
Definition: posix-stack.hh:139
Definition: posix-stack.hh:126
Definition: posix-stack.hh:111
Definition: posix-stack.hh:200
std::vector< network_interface > network_interfaces() override
Definition: posix-stack.hh:170
Definition: api.hh:447
Definition: pollable_fd.hh:136
Definition: program-options.hh:293
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:354
Definition: socket_defs.hh:47
Definition: api.hh:283
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
shard_id this_shard_id() noexcept
Returns shard_id of the current shard.
Definition: shard_id.hh:52
Definition: api.hh:479
Definition: api.hh:397