27#include <seastar/http/request_parser.hh>
28#include <seastar/core/seastar.hh>
29#include <seastar/core/sstring.hh>
30#include <seastar/net/api.hh>
31#include <seastar/core/gate.hh>
32#include <seastar/core/queue.hh>
33#include <seastar/core/when_all.hh>
35namespace seastar::experimental::websocket {
// Callable signature for a WebSocket handler: it receives an input stream and
// an output stream (both by reference) and returns a future<>.
// Values of this type are stored by name in the `_handlers` map and are the
// second argument of register_handler().
37using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;
51 exception(std::string_view msg) : _msg(msg) {}
52 virtual const char* what()
const noexcept {
// Bit positions, used as right-shift amounts when the header flags are read.
// FIN, RSV1, RSV2 and RSV3 are applied to input[0]; MASKED is applied to
// input[1], which is why FIN and MASKED can share the value 7.
71 static constexpr uint8_t FIN = 7;
72 static constexpr uint8_t RSV1 = 6;
73 static constexpr uint8_t RSV2 = 5;
74 static constexpr uint8_t RSV3 = 4;
75 static constexpr uint8_t MASKED = 7;
85 this->fin = (input[0] >> FIN) & 1;
86 this->rsv1 = (input[0] >> RSV1) & 1;
87 this->rsv2 = (input[0] >> RSV2) & 1;
88 this->rsv3 = (input[0] >> RSV3) & 1;
89 this->opcode = input[0] & 0b1111;
90 this->masked = (input[1] >> MASKED) & 1;
91 this->length = (input[1] & 0b1111111);
94 uint64_t get_rest_of_header_length() {
95 size_t next_read_length =
sizeof(uint32_t);
97 next_read_length +=
sizeof(uint16_t);
98 }
else if (length == 127) {
99 next_read_length +=
sizeof(uint64_t);
101 return next_read_length;
103 uint8_t get_fin() {
return fin;}
104 uint8_t get_rsv1() {
return rsv1;}
105 uint8_t get_rsv2() {
return rsv2;}
106 uint8_t get_rsv3() {
return rsv3;}
107 uint8_t get_opcode() {
return opcode;}
108 uint8_t get_masked() {
return masked;}
109 uint8_t get_length() {
return length;}
111 bool is_opcode_known() {
113 return opcode < 0xA && !(opcode < 0x8 && opcode > 0x2);
118 enum class parsing_state : uint8_t {
119 flags_and_payload_data,
120 payload_length_and_mask,
123 enum class connection_state : uint8_t {
131 parsing_state _state;
134 connection_state _cstate;
136 std::unique_ptr<frame_header> _header;
137 uint64_t _payload_length;
138 uint32_t _masking_key;
145 return make_ready_future<consumption_result_t>(
stop_consuming(std::move(data)));
149 void remove_mask(
buff_t& p,
size_t n) {
151 for (uint64_t i = 0, j = 0; i < n; ++i, j = (j + 1) % 4) {
152 payload[i] ^=
static_cast<char>(((_masking_key << (j * 8)) >> 24));
157 _cstate(connection_state::valid),
161 bool is_valid() {
return _cstate == connection_state::valid; }
162 bool eof() {
return _cstate == connection_state::closed; }
170class connection :
public boost::intrusive::list_base_hook<> {
185 return make_ready_future<buff_t>(std::move(f.
get()));
187 return current_exception_as_future<buff_t>();
194 return make_ready_future<>();
211 size_t buffer_size()
const noexcept override {
212 return data->max_size();
217 return make_ready_future<>();
// Size argument passed to the constructors of both `_input_buffer` and
// `_output_buffer`.
// NOTE(review): presumably a capacity counted in queued items rather than
// bytes; confirm against the buffer type.
232 static const size_t PIPE_SIZE = 512;
237 http_request_parser _http_parser;
246 sstring _subprotocol;
256 , _read_buf(_fd.input())
257 , _write_buf(_fd.output())
258 , _input_buffer{PIPE_SIZE}
259 , _output_buffer{PIPE_SIZE}
262 std::make_unique<connection_source_impl>(&_input_buffer)}};
264 std::make_unique<connection_sink_impl>(&_output_buffer)}};
277 future<> close(
bool send_close =
true);
282 future<> read_http_upgrade_request();
284 void on_new_connection();
299 std::vector<server_socket> _listeners;
300 boost::intrusive::list<connection> _connections;
301 std::map<std::string, handler_t> _handlers;
321 bool is_handler_registered(std::string
const& name);
323 void register_handler(std::string&& name, handler_t handler);
Definition: iostream.hh:237
Definition: iostream.hh:104
Definition: iostream.hh:163
Definition: iostream.hh:61
Definition: iostream.hh:69
a WebSocket connection
Definition: server.hh:170
an error in handling a WebSocket connection
Definition: server.hh:48
a WebSocket server
Definition: server.hh:298
Definition: server.hh:117
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
futurize_t<FuncResult> then_wrapped(Func&& func) & noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1525
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Definition: socket_defs.hh:47
Definition: iostream.hh:217
CharType * get_write() noexcept
Definition: temporary_buffer.hh:128
future<> process()
serve WebSocket protocol on a connection
void listen(socket_address addr)
listen for a WebSocket connection on given address
future<> send_data(opcodes opcode, temporary_buffer<char>&& buff)
Packs buff in websocket frame and sends it to the client.
opcodes
Possible type of a websocket frame.
Definition: server.hh:60
connection(server &server, connected_socket &&fd)
Definition: server.hh:253
void listen(socket_address addr, listen_options lo)
listen for a WebSocket connection on given address with custom options
void shutdown_input()
close the socket
Definition: iostream.hh:214