Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
server.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2021 ScyllaDB
20 */
21
22#pragma once
23
24#include <map>
25#include <functional>
26
27#include <seastar/http/request_parser.hh>
28#include <seastar/core/seastar.hh>
29#include <seastar/core/sstring.hh>
30#include <seastar/net/api.hh>
31#include <seastar/core/gate.hh>
32#include <seastar/core/queue.hh>
33#include <seastar/core/when_all.hh>
34
35namespace seastar::experimental::websocket {
36
// Signature of a handler callback: it receives an input stream and an output
// stream by reference and returns a future<>. Handlers are stored by name in
// server::_handlers and per connection in connection::_handler.
37using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;
38
// Forward declaration: connection holds a server& before server is defined.
39class server;
40
44
/// An error in handling a WebSocket connection.
///
/// Owns a copy of the message supplied at construction time.
class exception : public std::exception {
    std::string _msg;
public:
    /// \param msg description of the error; copied into the exception object.
    exception(std::string_view msg) : _msg(msg) {}

    /// \return the stored message as a NUL-terminated string. The pointer
    ///         stays valid for as long as this exception object is alive.
    const char* what() const noexcept override {
        return _msg.c_str();
    }
};
56
// Possible type of a websocket frame. The values of the first six
// enumerators match the opcode table of RFC 6455, section 5.2.
60enum opcodes {
// Continuation of a fragmented message.
61 CONTINUATION = 0x0,
// Text data frame.
62 TEXT = 0x1,
// Binary data frame.
63 BINARY = 0x2,
// Connection close control frame.
64 CLOSE = 0x8,
// Ping control frame.
65 PING = 0x9,
// Pong control frame.
66 PONG = 0xA,
// Does not fit the 4-bit opcode field of frame_header.
// NOTE(review): presumably a sentinel meaning "no valid opcode" - confirm
// against the parser implementation.
67 INVALID = 0xFF,
68};
69
// Shift amounts the constructor uses to extract the flag bits from the
// first header byte (input[0]).
71 static constexpr uint8_t FIN = 7;
72 static constexpr uint8_t RSV1 = 6;
73 static constexpr uint8_t RSV2 = 5;
74 static constexpr uint8_t RSV3 = 4;
// Shift amount of the mask flag within the second header byte (input[1]).
75 static constexpr uint8_t MASKED = 7;
76
// Decoded header fields, filled in by the constructor. The first five
// come from input[0], the last two from input[1].
77 uint8_t fin : 1;
78 uint8_t rsv1 : 1;
79 uint8_t rsv2 : 1;
80 uint8_t rsv3 : 1;
// Low four bits of the first byte.
81 uint8_t opcode : 4;
82 uint8_t masked : 1;
// Low seven bits of the second byte. The values 126 and 127 make
// get_rest_of_header_length() account for 2 or 8 additional bytes.
83 uint8_t length : 7;
84 frame_header(const char* input) {
85 this->fin = (input[0] >> FIN) & 1;
86 this->rsv1 = (input[0] >> RSV1) & 1;
87 this->rsv2 = (input[0] >> RSV2) & 1;
88 this->rsv3 = (input[0] >> RSV3) & 1;
89 this->opcode = input[0] & 0b1111;
90 this->masked = (input[1] >> MASKED) & 1;
91 this->length = (input[1] & 0b1111111);
92 }
93 // Returns length of the rest of the header.
94 uint64_t get_rest_of_header_length() {
95 size_t next_read_length = sizeof(uint32_t); // Masking key
96 if (length == 126) {
97 next_read_length += sizeof(uint16_t);
98 } else if (length == 127) {
99 next_read_length += sizeof(uint64_t);
100 }
101 return next_read_length;
102 }
103 uint8_t get_fin() {return fin;}
104 uint8_t get_rsv1() {return rsv1;}
105 uint8_t get_rsv2() {return rsv2;}
106 uint8_t get_rsv3() {return rsv3;}
107 uint8_t get_opcode() {return opcode;}
108 uint8_t get_masked() {return masked;}
109 uint8_t get_length() {return length;}
110
111 bool is_opcode_known() {
112 //https://datatracker.ietf.org/doc/html/rfc6455#section-5.1
113 return opcode < 0xA && !(opcode < 0x8 && opcode > 0x2);
114 }
115};
116
// Stage of frame parsing the parser is currently in.
// NOTE(review): judging by the names, the stages are the two fixed header
// bytes, then the extended length plus masking key, then the payload -
// confirm against the parser implementation.
118 enum class parsing_state : uint8_t {
119 flags_and_payload_data,
120 payload_length_and_mask,
121 payload
122 };
// Connection status as tracked by the parser: is_valid() tests for `valid`,
// eof() tests for `closed`.
123 enum class connection_state : uint8_t {
124 valid,
125 closed,
126 error
127 };
130 // What parser is currently doing.
131 parsing_state _state;
132 // State of connection - can be valid, closed or should be closed
133 // due to error.
134 connection_state _cstate;
// NOTE(review): presumably accumulates header bytes that arrive split
// across input buffers - confirm against the parser implementation.
135 sstring _buffer;
// NOTE(review): presumably the decoded header of the frame being parsed.
136 std::unique_ptr<frame_header> _header;
// Payload length of the current frame and the part of it consumed so far;
// remaining_payload_length() returns their difference.
137 uint64_t _payload_length;
138 uint64_t _consumed_payload_length = 0;
// Masking key. remove_mask() XORs the payload with its four bytes, starting
// from the most significant one.
139 uint32_t _masking_key;
// NOTE(review): presumably the payload handed out by result() - confirm.
140 buff_t _result;
141
141
142 static future<consumption_result_t> dont_stop() {
143 return make_ready_future<consumption_result_t>(continue_consuming{});
144 }
145 static future<consumption_result_t> stop(buff_t data) {
146 return make_ready_future<consumption_result_t>(stop_consuming(std::move(data)));
147 }
148 uint64_t remaining_payload_length() const {
149 return _payload_length - _consumed_payload_length;
150 }
151
152 // Removes mask from payload given in p.
153 void remove_mask(buff_t& p, size_t n) {
154 char *payload = p.get_write();
155 for (uint64_t i = 0, j = 0; i < n; ++i, j = (j + 1) % 4) {
156 payload[i] ^= static_cast<char>(((_masking_key << (j * 8)) >> 24));
157 }
158 }
159public:
160 websocket_parser() : _state(parsing_state::flags_and_payload_data),
161 _cstate(connection_state::valid),
162 _payload_length(0),
163 _masking_key(0) {}
165 bool is_valid() { return _cstate == connection_state::valid; }
166 bool eof() { return _cstate == connection_state::closed; }
167 opcodes opcode() const;
168 buff_t result();
169};
170
174class connection : public boost::intrusive::list_base_hook<> {
176
180 class connection_source_impl final : public data_source_impl {
181 queue<buff_t>* data;
182
183 public:
184 connection_source_impl(queue<buff_t>* data) : data(data) {}
185
186 virtual future<buff_t> get() override {
187 return data->pop_eventually().then_wrapped([](future<buff_t> f){
188 try {
189 return make_ready_future<buff_t>(std::move(f.get()));
190 } catch(...) {
191 return current_exception_as_future<buff_t>();
192 }
193 });
194 }
195
196 virtual future<> close() override {
197 data->push(buff_t(0));
198 return make_ready_future<>();
199 }
200 };
201
205 class connection_sink_impl final : public data_sink_impl {
206 queue<buff_t>* data;
207 public:
208 connection_sink_impl(queue<buff_t>* data) : data(data) {}
209
210 virtual future<> put(net::packet d) override {
211 net::fragment f = d.frag(0);
212 return data->push_eventually(temporary_buffer<char>{std::move(f.base), f.size});
213 }
214
215 size_t buffer_size() const noexcept override {
216 return data->max_size();
217 }
218
219 virtual future<> close() override {
220 data->push(buff_t(0));
221 return make_ready_future<>();
222 }
223 };
224
// Declared here, defined out of line.
// NOTE(review): presumably the reactions to PING and PONG frames - confirm.
229 future<> handle_ping();
234 future<> handle_pong();
235
// Capacity the constructor passes to both _input_buffer and _output_buffer.
236 static const size_t PIPE_SIZE = 512;
// Server passed to the constructor.
237 server& _server;
// Streams the constructor obtains from the connected socket
// (_fd.input() and _fd.output()).
239 input_stream<char> _read_buf;
240 output_stream<char> _write_buf;
241 http_request_parser _http_parser;
242 bool _done = false;
243
244 websocket_parser _websocket_parser;
// Queue whose address the constructor hands to connection_source_impl.
245 queue <temporary_buffer<char>> _input_buffer;
246 input_stream<char> _input;
// Queue whose address the constructor hands to connection_sink_impl.
247 queue <temporary_buffer<char>> _output_buffer;
248 output_stream<char> _output;
249
250 sstring _subprotocol;
// NOTE(review): presumably the handler looked up in server::_handlers for
// _subprotocol - confirm.
251 handler_t _handler;
252public:
258 : _server(server)
259 , _fd(std::move(fd))
260 , _read_buf(_fd.input())
261 , _write_buf(_fd.output())
262 , _input_buffer{PIPE_SIZE}
263 , _output_buffer{PIPE_SIZE}
264 {
266 std::make_unique<connection_source_impl>(&_input_buffer)}};
268 std::make_unique<connection_sink_impl>(&_output_buffer)}};
269 on_new_connection();
270 }
271 ~connection();
272
281 future<> close(bool send_close = true);
282
283protected:
284 future<> read_loop();
285 future<> read_one();
286 future<> read_http_upgrade_request();
287 future<> response_loop();
288 void on_new_connection();
293
294};
295
// A WebSocket server.
302class server {
// Listening sockets; accept() and accept_one() operate on one of them.
303 std::vector<server_socket> _listeners;
// Intrusive list of connection objects (connection derives from
// boost::intrusive::list_base_hook<>).
304 boost::intrusive::list<connection> _connections;
// Handlers keyed by name; see register_handler() and
// is_handler_registered().
305 std::map<std::string, handler_t> _handlers;
306 gate _task_gate;
307public:
319
324
// NOTE(review): presumably tells whether _handlers has an entry for `name`;
// defined out of line.
325 bool is_handler_registered(std::string const& name);
326
// NOTE(review): presumably stores `handler` in _handlers under `name`;
// defined out of line.
327 void register_handler(std::string&& name, handler_t handler);
328
// connection needs access to the private members above.
329 friend class connection;
330protected:
331 void accept(server_socket &listener);
332 future<stop_iteration> accept_one(server_socket &listener);
333};
334
336
337}
Definition: api.hh:183
Definition: iostream.hh:237
Definition: iostream.hh:104
Definition: iostream.hh:163
Definition: iostream.hh:61
Definition: iostream.hh:69
a WebSocket connection
Definition: server.hh:174
an error in handling a WebSocket connection
Definition: server.hh:48
a WebSocket server
Definition: server.hh:302
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1525
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: gate.hh:61
Definition: packet.hh:87
Definition: queue.hh:44
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Definition: socket_defs.hh:47
Definition: iostream.hh:217
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future process()
serve WebSocket protocol on a connection
void listen(socket_address addr)
listen for a WebSocket connection on given address
future send_data(opcodes opcode, temporary_buffer< char > &&buff)
Packs buff in websocket frame and sends it to the client.
opcodes
Possible type of a websocket frame.
Definition: server.hh:60
connection(server &server, connected_socket &&fd)
Definition: server.hh:257
void listen(socket_address addr, listen_options lo)
listen for a WebSocket connection on given address with custom options
Definition: packet.hh:46
Definition: iostream.hh:214
STL namespace.
Definition: api.hh:397