Seastar
High performance C++ framework for concurrent servers
server.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright 2021 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #include <map>
25 #include <functional>
26 
27 #include <seastar/http/request_parser.hh>
28 #include <seastar/core/seastar.hh>
29 #include <seastar/core/sstring.hh>
30 #include <seastar/net/api.hh>
31 #include <seastar/core/gate.hh>
32 #include <seastar/core/queue.hh>
33 #include <seastar/core/when_all.hh>
34 
35 namespace seastar::experimental::websocket {
36 
37 using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;
38 
39 class server;
40 
44 
/// An error in handling a WebSocket connection.
///
/// Owns a copy of the message passed at construction, so the pointer returned
/// by what() stays valid for as long as the exception object is alive.
class exception : public std::exception {
    std::string _msg;
public:
    /// \param msg human-readable description; copied into the exception.
    exception(std::string_view msg) : _msg(msg) {}

    /// \return the NUL-terminated message supplied at construction.
    // `override` makes the compiler verify that this really replaces
    // std::exception::what() instead of silently hiding it.
    const char* what() const noexcept override {
        return _msg.c_str();
    }
};
56 
/// Possible type of a websocket frame.
///
/// The numeric values are the ones compared against the 4-bit opcode field of
/// frame_header.
enum opcodes {
    // Data frames.
    CONTINUATION = 0x0,
    TEXT         = 0x1,
    BINARY       = 0x2,
    // Control frames.
    CLOSE        = 0x8,
    PING         = 0x9,
    PONG         = 0xA,
    // Does not fit in the 4-bit opcode field; presumably a sentinel for
    // "no valid opcode" (confirm against the parser implementation).
    INVALID      = 0xFF,
};
69 
/// Decoded form of the first two bytes of a WebSocket frame.
///
/// Byte 0 carries FIN, RSV1-3 and the 4-bit opcode; byte 1 carries the MASK
/// flag and the 7-bit length field.
struct frame_header {
    // Bit positions inside byte 0.
    static constexpr uint8_t FIN = 7;
    static constexpr uint8_t RSV1 = 6;
    static constexpr uint8_t RSV2 = 5;
    static constexpr uint8_t RSV3 = 4;
    // Bit position inside byte 1.
    static constexpr uint8_t MASKED = 7;

    uint8_t fin : 1;
    uint8_t rsv1 : 1;
    uint8_t rsv2 : 1;
    uint8_t rsv3 : 1;
    uint8_t opcode : 4;
    uint8_t masked : 1;
    uint8_t length : 7;

    /// Decodes the header fields from \p input.
    /// \param input pointer to at least two readable bytes of frame data.
    frame_header(const char* input) {
        // Work on unsigned bytes so the shifts never operate on a negative
        // value (plain char may be signed).
        const auto first = static_cast<uint8_t>(input[0]);
        const auto second = static_cast<uint8_t>(input[1]);
        fin = (first >> FIN) & 1;
        rsv1 = (first >> RSV1) & 1;
        rsv2 = (first >> RSV2) & 1;
        rsv3 = (first >> RSV3) & 1;
        opcode = first & 0b1111;
        masked = (second >> MASKED) & 1;
        length = second & 0b1111111;
    }

    /// Returns length of the rest of the header.
    ///
    /// The 4-byte masking key is always counted, whatever the value of the
    /// masked flag. A length field of 126 adds a 16-bit extended length and
    /// 127 adds a 64-bit one.
    uint64_t get_rest_of_header_length() const {
        size_t next_read_length = sizeof(uint32_t); // Masking key
        if (length == 126) {
            next_read_length += sizeof(uint16_t);
        } else if (length == 127) {
            next_read_length += sizeof(uint64_t);
        }
        return next_read_length;
    }

    uint8_t get_fin() const { return fin; }
    uint8_t get_rsv1() const { return rsv1; }
    uint8_t get_rsv2() const { return rsv2; }
    uint8_t get_rsv3() const { return rsv3; }
    uint8_t get_opcode() const { return opcode; }
    uint8_t get_masked() const { return masked; }
    uint8_t get_length() const { return length; }

    /// True when the opcode is one of the defined ones: 0x0-0x2 (data frames)
    /// or 0x8-0xA (control frames, PONG = 0xA included).
    bool is_opcode_known() const {
        // https://datatracker.ietf.org/doc/html/rfc6455#section-5.2
        // The upper bound is inclusive: a strict `< 0xA` rejects PONG frames.
        return opcode <= 0x2 || (opcode >= 0x8 && opcode <= 0xA);
    }
};
116 
// Which part of a frame the parser is working on.
enum class parsing_state : uint8_t {
    flags_and_payload_data,   // = 0
    payload_length_and_mask,  // = 1
    payload                   // = 2
};
// Connection status tracked by the parser: still usable, closed, or to be
// closed because of an error.
enum class connection_state : uint8_t {
    valid,   // = 0
    closed,  // = 1
    error    // = 2
};
130  // What parser is currently doing.
131  parsing_state _state;
132  // State of connection - can be valid, closed or should be closed
133  // due to error.
134  connection_state _cstate;
135  sstring _buffer;
136  std::unique_ptr<frame_header> _header;
137  uint64_t _payload_length;
138  uint32_t _masking_key;
139  buff_t _result;
140 
141  static future<consumption_result_t> dont_stop() {
142  return make_ready_future<consumption_result_t>(continue_consuming{});
143  }
144  static future<consumption_result_t> stop(buff_t data) {
145  return make_ready_future<consumption_result_t>(stop_consuming(std::move(data)));
146  }
147 
148  // Removes mask from payload given in p.
149  void remove_mask(buff_t& p, size_t n) {
150  char *payload = p.get_write();
151  for (uint64_t i = 0, j = 0; i < n; ++i, j = (j + 1) % 4) {
152  payload[i] ^= static_cast<char>(((_masking_key << (j * 8)) >> 24));
153  }
154  }
155 public:
156  websocket_parser() : _state(parsing_state::flags_and_payload_data),
157  _cstate(connection_state::valid),
158  _payload_length(0),
159  _masking_key(0) {}
161  bool is_valid() { return _cstate == connection_state::valid; }
162  bool eof() { return _cstate == connection_state::closed; }
163  opcodes opcode() const;
164  buff_t result();
165 };
166 
170 class connection : public boost::intrusive::list_base_hook<> {
172 
176  class connection_source_impl final : public data_source_impl {
177  queue<buff_t>* data;
178 
179  public:
180  connection_source_impl(queue<buff_t>* data) : data(data) {}
181 
182  virtual future<buff_t> get() override {
183  return data->pop_eventually().then_wrapped([](future<buff_t> f){
184  try {
185  return make_ready_future<buff_t>(std::move(f.get()));
186  } catch(...) {
187  return current_exception_as_future<buff_t>();
188  }
189  });
190  }
191 
192  virtual future<> close() override {
193  data->push(buff_t(0));
194  return make_ready_future<>();
195  }
196  };
197 
201  class connection_sink_impl final : public data_sink_impl {
202  queue<buff_t>* data;
203  public:
204  connection_sink_impl(queue<buff_t>* data) : data(data) {}
205 
206  virtual future<> put(net::packet d) override {
207  net::fragment f = d.frag(0);
208  return data->push_eventually(temporary_buffer<char>{std::move(f.base), f.size});
209  }
210 
211  size_t buffer_size() const noexcept override {
212  return data->max_size();
213  }
214 
215  virtual future<> close() override {
216  data->push(buff_t(0));
217  return make_ready_future<>();
218  }
219  };
220 
225  future<> handle_ping();
230  future<> handle_pong();
231 
232  static const size_t PIPE_SIZE = 512;
233  server& _server;
234  connected_socket _fd;
235  input_stream<char> _read_buf;
236  output_stream<char> _write_buf;
237  http_request_parser _http_parser;
238  bool _done = false;
239 
240  websocket_parser _websocket_parser;
241  queue <temporary_buffer<char>> _input_buffer;
242  input_stream<char> _input;
243  queue <temporary_buffer<char>> _output_buffer;
244  output_stream<char> _output;
245 
246  sstring _subprotocol;
247  handler_t _handler;
248 public:
254  : _server(server)
255  , _fd(std::move(fd))
256  , _read_buf(_fd.input())
257  , _write_buf(_fd.output())
258  , _input_buffer{PIPE_SIZE}
259  , _output_buffer{PIPE_SIZE}
260  {
262  std::make_unique<connection_source_impl>(&_input_buffer)}};
263  _output = output_stream<char>{data_sink{
264  std::make_unique<connection_sink_impl>(&_output_buffer)}};
265  on_new_connection();
266  }
267  ~connection();
268 
277  future<> close(bool send_close = true);
278 
279 protected:
280  future<> read_loop();
281  future<> read_one();
282  future<> read_http_upgrade_request();
283  future<> response_loop();
284  void on_new_connection();
289 
290 };
291 
298 class server {
299  std::vector<server_socket> _listeners;
300  boost::intrusive::list<connection> _connections;
301  std::map<std::string, handler_t> _handlers;
302  gate _task_gate;
303 public:
308  void listen(socket_address addr);
315 
320 
321  bool is_handler_registered(std::string const& name);
322 
323  void register_handler(std::string&& name, handler_t handler);
324 
325  friend class connection;
326 protected:
327  void accept(server_socket &listener);
328  future<stop_iteration> accept_one(server_socket &listener);
329 };
330 
332 
333 }
Definition: api.hh:182
Definition: iostream.hh:222
Definition: iostream.hh:103
Definition: iostream.hh:148
Definition: iostream.hh:60
Definition: iostream.hh:68
a WebSocket connection
Definition: server.hh:170
an error in handling a WebSocket connection
Definition: server.hh:48
a WebSocket server
Definition: server.hh:298
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1511
value_type && get()
gets the value returned by the computation
Definition: future.hh:1340
Definition: gate.hh:61
Definition: packet.hh:87
Definition: queue.hh:44
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:325
Definition: socket_defs.hh:47
Definition: iostream.hh:202
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future process()
serve WebSocket protocol on a connection
void listen(socket_address addr)
listen for a WebSocket connection on given address
future send_data(opcodes opcode, temporary_buffer< char > &&buff)
Packs buff in websocket frame and sends it to the client.
opcodes
Possible type of a websocket frame.
Definition: server.hh:60
connection(server &server, connected_socket &&fd)
Definition: server.hh:253
void listen(socket_address addr, listen_options lo)
listen for a WebSocket connection on given address with custom options
Definition: packet.hh:46
Definition: iostream.hh:199
Definition: api.hh:391