Seastar
High performance C++ framework for concurrent servers
server.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2021 ScyllaDB
20 */
21
22#pragma once
23
24#include <map>
25#include <functional>
26
27#include <seastar/http/request_parser.hh>
28#include <seastar/core/seastar.hh>
29#include <seastar/core/sstring.hh>
30#include <seastar/net/api.hh>
31#include <seastar/core/gate.hh>
32#include <seastar/core/queue.hh>
33#include <seastar/core/when_all.hh>
34
35namespace seastar::experimental::websocket {
36
// Handler callback type: called with an input stream and an output stream of
// char (both by reference) and returns a future<>. Handlers are stored by name
// in server::_handlers, and each connection keeps one in connection::_handler.
37using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;
38
39class server;
40
44
/// An error in handling a WebSocket connection.
///
/// Carries a message that is reported through what().
class exception : public std::exception {
    std::string _msg;
public:
    /// Stores a copy of \p msg as the error description.
    exception(std::string_view msg) : _msg(msg) {}

    /// Returns the stored message as a NUL-terminated string. The pointer
    /// refers to storage owned by this object.
    // `override` makes the compiler verify this really replaces
    // std::exception::what().
    const char* what() const noexcept override {
        return _msg.c_str();
    }
};
56
/// Possible type of a websocket frame.
enum opcodes {
    CONTINUATION = 0x00,
    TEXT         = 0x01,
    BINARY       = 0x02,
    CLOSE        = 0x08,
    PING         = 0x09,
    PONG         = 0x0A,
    // Larger than any value a 4-bit opcode field can hold.
    INVALID      = 0xFF,
};
69
// Bit positions (counted from the least significant bit) that the constructor
// uses as right-shift amounts: FIN and RSV1-3 are applied to the first header
// byte, MASKED to the second.
71 static constexpr uint8_t FIN = 7;
72 static constexpr uint8_t RSV1 = 6;
73 static constexpr uint8_t RSV2 = 5;
74 static constexpr uint8_t RSV3 = 4;
75 static constexpr uint8_t MASKED = 7;
76
// Decoded header fields, stored as bit-fields. `length` is the raw 7-bit
// length field; get_rest_of_header_length() treats the values 126 and 127 as
// meaning that additional length bytes follow.
77 uint8_t fin : 1;
78 uint8_t rsv1 : 1;
79 uint8_t rsv2 : 1;
80 uint8_t rsv3 : 1;
81 uint8_t opcode : 4;
82 uint8_t masked : 1;
83 uint8_t length : 7;
84 frame_header(const char* input) {
85 this->fin = (input[0] >> FIN) & 1;
86 this->rsv1 = (input[0] >> RSV1) & 1;
87 this->rsv2 = (input[0] >> RSV2) & 1;
88 this->rsv3 = (input[0] >> RSV3) & 1;
89 this->opcode = input[0] & 0b1111;
90 this->masked = (input[1] >> MASKED) & 1;
91 this->length = (input[1] & 0b1111111);
92 }
93 // Returns length of the rest of the header.
94 uint64_t get_rest_of_header_length() {
95 size_t next_read_length = sizeof(uint32_t); // Masking key
96 if (length == 126) {
97 next_read_length += sizeof(uint16_t);
98 } else if (length == 127) {
99 next_read_length += sizeof(uint64_t);
100 }
101 return next_read_length;
102 }
103 uint8_t get_fin() {return fin;}
104 uint8_t get_rsv1() {return rsv1;}
105 uint8_t get_rsv2() {return rsv2;}
106 uint8_t get_rsv3() {return rsv3;}
107 uint8_t get_opcode() {return opcode;}
108 uint8_t get_masked() {return masked;}
109 uint8_t get_length() {return length;}
110
111 bool is_opcode_known() {
112 //https://datatracker.ietf.org/doc/html/rfc6455#section-5.1
113 return opcode < 0xA && !(opcode < 0x8 && opcode > 0x2);
114 }
115};
116
// What the parser is currently doing.
enum class parsing_state : uint8_t {
    flags_and_payload_data = 0,
    payload_length_and_mask = 1,
    payload = 2,
};
// State of the connection: valid, closed, or to be closed due to an error.
enum class connection_state : uint8_t {
    valid = 0,
    closed = 1,
    error = 2,
};
130 // What parser is currently doing.
131 parsing_state _state;
132 // State of connection - can be valid, closed or should be closed
133 // due to error.
134 connection_state _cstate;
// String buffer; how it is filled is not visible in this part of the file.
135 sstring _buffer;
// Frame header object; not set by the constructor, so it is empty after
// construction.
136 std::unique_ptr<frame_header> _header;
// Set to 0 by the constructor.
// NOTE(review): presumably the payload size of the frame being parsed - confirm.
137 uint64_t _payload_length;
// 32-bit masking key; remove_mask() XORs payload bytes with its four bytes,
// most significant byte first. Set to 0 by the constructor.
138 uint32_t _masking_key;
// NOTE(review): presumably the buffer handed out by result() - confirm.
139 buff_t _result;
140
141 static future<consumption_result_t> dont_stop() {
142 return make_ready_future<consumption_result_t>(continue_consuming{});
143 }
144 static future<consumption_result_t> stop(buff_t data) {
145 return make_ready_future<consumption_result_t>(stop_consuming(std::move(data)));
146 }
147
148 // Removes mask from payload given in p.
149 void remove_mask(buff_t& p, size_t n) {
150 char *payload = p.get_write();
151 for (uint64_t i = 0, j = 0; i < n; ++i, j = (j + 1) % 4) {
152 payload[i] ^= static_cast<char>(((_masking_key << (j * 8)) >> 24));
153 }
154 }
155public:
156 websocket_parser() : _state(parsing_state::flags_and_payload_data),
157 _cstate(connection_state::valid),
158 _payload_length(0),
159 _masking_key(0) {}
161 bool is_valid() { return _cstate == connection_state::valid; }
162 bool eof() { return _cstate == connection_state::closed; }
163 opcodes opcode() const;
164 buff_t result();
165};
166
170class connection : public boost::intrusive::list_base_hook<> {
172
176 class connection_source_impl final : public data_source_impl {
177 queue<buff_t>* data;
178
179 public:
180 connection_source_impl(queue<buff_t>* data) : data(data) {}
181
182 virtual future<buff_t> get() override {
183 return data->pop_eventually().then_wrapped([](future<buff_t> f){
184 try {
185 return make_ready_future<buff_t>(std::move(f.get()));
186 } catch(...) {
187 return current_exception_as_future<buff_t>();
188 }
189 });
190 }
191
192 virtual future<> close() override {
193 data->push(buff_t(0));
194 return make_ready_future<>();
195 }
196 };
197
201 class connection_sink_impl final : public data_sink_impl {
202 queue<buff_t>* data;
203 public:
204 connection_sink_impl(queue<buff_t>* data) : data(data) {}
205
206 virtual future<> put(net::packet d) override {
207 net::fragment f = d.frag(0);
208 return data->push_eventually(temporary_buffer<char>{std::move(f.base), f.size});
209 }
210
211 size_t buffer_size() const noexcept override {
212 return data->max_size();
213 }
214
215 virtual future<> close() override {
216 data->push(buff_t(0));
217 return make_ready_future<>();
218 }
219 };
220
// NOTE(review): presumably invoked for received PING / PONG frames; the
// definitions are not in this header - confirm.
225 future<> handle_ping();
230 future<> handle_pong();
231
// Capacity given to both _input_buffer and _output_buffer by the constructor.
232 static const size_t PIPE_SIZE = 512;
// The server this connection was created for (set from the constructor's
// `server` argument).
233 server& _server;
// Streams obtained from the socket: _fd.input() and _fd.output().
235 input_stream<char> _read_buf;
236 output_stream<char> _write_buf;
237 http_request_parser _http_parser;
238 bool _done = false;
239
240 websocket_parser _websocket_parser;
// Queue whose address is handed to connection_source_impl in the constructor.
241 queue <temporary_buffer<char>> _input_buffer;
242 input_stream<char> _input;
// Queue whose address is handed to connection_sink_impl in the constructor.
243 queue <temporary_buffer<char>> _output_buffer;
244 output_stream<char> _output;
245
246 sstring _subprotocol;
// Handler of type handler_t.
// NOTE(review): presumably called with _input and _output - confirm.
247 handler_t _handler;
248public:
254 : _server(server)
255 , _fd(std::move(fd))
256 , _read_buf(_fd.input())
257 , _write_buf(_fd.output())
258 , _input_buffer{PIPE_SIZE}
259 , _output_buffer{PIPE_SIZE}
260 {
262 std::make_unique<connection_source_impl>(&_input_buffer)}};
264 std::make_unique<connection_sink_impl>(&_output_buffer)}};
265 on_new_connection();
266 }
267 ~connection();
268
277 future<> close(bool send_close = true);
278
279protected:
280 future<> read_loop();
281 future<> read_one();
282 future<> read_http_upgrade_request();
283 future<> response_loop();
284 void on_new_connection();
289
290};
291
// A WebSocket server. It keeps its listening sockets, an intrusive list of
// connection objects and a name -> handler map.
298class server {
// Listening sockets.
299 std::vector<server_socket> _listeners;
// Intrusive list of connections; connection derives from
// boost::intrusive::list_base_hook<> so it can be linked here.
300 boost::intrusive::list<connection> _connections;
// Handlers keyed by name.
301 std::map<std::string, handler_t> _handlers;
302 gate _task_gate;
303public:
315
320
// NOTE(review): presumably reports whether `name` has an entry in _handlers -
// the definition is not in this header; confirm.
321 bool is_handler_registered(std::string const& name);
322
// NOTE(review): presumably stores `handler` in _handlers under `name` - confirm.
323 void register_handler(std::string&& name, handler_t handler);
324
// connection holds a server& and is granted access to the private members.
325 friend class connection;
326protected:
327 void accept(server_socket &listener);
328 future<stop_iteration> accept_one(server_socket &listener);
329};
330
332
333}
Definition: api.hh:183
Definition: iostream.hh:237
Definition: iostream.hh:104
Definition: iostream.hh:163
Definition: iostream.hh:61
Definition: iostream.hh:69
a WebSocket connection
Definition: server.hh:170
an error in handling a WebSocket connection
Definition: server.hh:48
a WebSocket server
Definition: server.hh:298
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1525
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: gate.hh:61
Definition: packet.hh:87
Definition: queue.hh:44
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Definition: socket_defs.hh:47
Definition: iostream.hh:217
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future<> process()
serve WebSocket protocol on a connection
void listen(socket_address addr)
listen for a WebSocket connection on given address
future<> send_data(opcodes opcode, temporary_buffer< char > &&buff)
Packs buff in websocket frame and sends it to the client.
opcodes
Possible type of a websocket frame.
Definition: server.hh:60
connection(server &server, connected_socket &&fd)
Definition: server.hh:253
void listen(socket_address addr, listen_options lo)
listen for a WebSocket connection on given address with custom options
Definition: packet.hh:46
Definition: iostream.hh:214
STL namespace.
Definition: api.hh:392