Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
content_source.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2020 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/http/chunk_parsers.hh>
25#include <seastar/core/iostream.hh>
26#include <seastar/core/temporary_buffer.hh>
27#include <seastar/http/common.hh>
28#include <seastar/util/log.hh>
29#include <seastar/http/exception.hh>
30
31namespace seastar {
32
33namespace httpd {
34
35namespace internal {
36
37/*
38 * An input_stream wrapper that allows reading at most "length" bytes
39 * from it; used to handle requests with large bodies.
40 * */
43 size_t _remaining_bytes = 0;
44public:
46 : _inp(inp), _remaining_bytes(length) {
47 }
48
49 virtual future<temporary_buffer<char>> get() override {
50 if (_remaining_bytes == 0) {
51 return make_ready_future<temporary_buffer<char>>();
52 }
53 return _inp.read_up_to(_remaining_bytes).then([this] (temporary_buffer<char> tmp_buf) {
54 _remaining_bytes -= tmp_buf.size();
55 return tmp_buf;
56 });
57 }
58
59 virtual future<temporary_buffer<char>> skip(uint64_t n) override {
60 uint64_t skip_bytes = std::min(n, _remaining_bytes);
61 _remaining_bytes -= skip_bytes;
62 return _inp.skip(skip_bytes).then([] {
64 });
65 }
66
67 virtual future<> close() override {
68 return make_ready_future<>();
69 }
70};
71
72/*
73 * An input_stream wrapper that decodes a request body
74 * with "chunked" encoding.
75 * */
77 class chunk_parser {
78 enum class parsing_state
79 : uint8_t {
80 size_and_ext,
81 body,
82 trailer_part
83 };
84 http_chunk_size_and_ext_parser _size_and_ext_parser;
85 http_chunk_trailer_parser _trailer_parser;
86
88 size_t _current_chunk_bytes_read = 0;
89 size_t _current_chunk_length;
90 parsing_state _ps = parsing_state::size_and_ext;
91 bool _end_of_request = false;
92 // references to fields in the request structure
93 std::unordered_map<sstring, sstring>& _chunk_extensions;
94 std::unordered_map<sstring, sstring>& _trailing_headers;
95 using consumption_result_type = consumption_result<char>;
96 public:
97 chunk_parser(std::unordered_map<sstring, sstring>& chunk_extensions, std::unordered_map<sstring, sstring>& trailing_headers)
98 : _chunk_extensions(chunk_extensions), _trailing_headers(trailing_headers) {
99 _size_and_ext_parser.init();
100 }
102 _current_chunk_bytes_read += _buf.size();
103 return std::move(_buf);
104 }
105
107 if (_buf.size() || _end_of_request || data.empty()) {
108 // return if we have already read some content (_buf.size()), we have already reached the end of the chunked request (_end_of_request),
109 // or the underlying stream reached eof (data.empty())
110 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
111 }
112 switch (_ps) {
113 // "data" buffer is non-empty
114 case parsing_state::size_and_ext:
115 return _size_and_ext_parser(std::move(data)).then([this] (std::optional<temporary_buffer<char>> res) {
116 if (res.has_value()) {
117 if (_size_and_ext_parser.failed()) {
118 return make_exception_future<consumption_result_type>(bad_request_exception("Can't parse chunk size and extensions"));
119 }
120 // save extensions
121 auto parsed_extensions = _size_and_ext_parser.get_parsed_extensions();
122 _chunk_extensions.merge(parsed_extensions);
123 for (auto& key_val : parsed_extensions) {
124 _chunk_extensions[key_val.first] += sstring(",") + key_val.second;
125 }
126
127 // save size
128 auto size_string = _size_and_ext_parser.get_size();
129 if (size_string.size() > 16) {
130 return make_exception_future<consumption_result_type>(bad_chunk_exception("Chunk length too big"));
131 }
132 _current_chunk_bytes_read = 0;
133 _current_chunk_length = strtol(size_string.c_str(), nullptr, 16);
134
135 if (_current_chunk_length == 0) {
136 _ps = parsing_state::trailer_part;
137 _trailer_parser.init();
138 } else {
139 _ps = parsing_state::body;
140 }
141 if (res->empty()) {
142 return make_ready_future<consumption_result_type>(continue_consuming{});
143 }
144 return this->operator()(std::move(res.value()));
145 } else {
146 return make_ready_future<consumption_result_type>(continue_consuming{});
147 }
148 });
149 case parsing_state::body:
150 // read the new data into _buf
151 if (_current_chunk_bytes_read < _current_chunk_length) {
152 size_t to_read = std::min(_current_chunk_length - _current_chunk_bytes_read, data.size());
153 if (_buf.empty()) {
154 _buf = data.share(0, to_read);
155 }
156 data.trim_front(to_read);
157 return make_ready_future<consumption_result_type>(stop_consuming(std::move(data)));
158 }
159
160 // chunk body is finished, we haven't entered the previous if, so "data" is still non-empty
161 if (_current_chunk_bytes_read == _current_chunk_length) {
162 // we haven't read \r yet
163 if (data.get()[0] != '\r') {
164 return make_exception_future<consumption_result_type>(bad_chunk_exception("The actual chunk length exceeds the specified length"));
165 } else {
166 _current_chunk_bytes_read++;
167 data.trim_front(1);
168 if (data.empty()) {
169 return make_ready_future<consumption_result_type>(continue_consuming{});
170 }
171 }
172 }
173 if (_current_chunk_bytes_read == _current_chunk_length + 1) {
174 // we haven't read \n but have \r
175 if (data.get()[0] != '\n') {
176 return make_exception_future<consumption_result_type>(bad_chunk_exception("The actual chunk length exceeds the specified length"));
177 } else {
178 _ps = parsing_state::size_and_ext;
179 _size_and_ext_parser.init();
180 data.trim_front(1);
181 if (data.empty()) {
182 return make_ready_future<consumption_result_type>(continue_consuming{});
183 }
184 }
185 }
186 return this->operator()(std::move(data));
187 case parsing_state::trailer_part:
188 return _trailer_parser(std::move(data)).then([this] (std::optional<temporary_buffer<char>> res) {
189 if (res.has_value()) {
190 if (_trailer_parser.failed()) {
191 return make_exception_future<consumption_result_type>(bad_request_exception("Can't parse chunked request trailer"));
192 }
193 // save trailing headers
194 _trailing_headers = _trailer_parser.get_parsed_headers();
195 _end_of_request = true;
196 return make_ready_future<consumption_result_type>(stop_consuming(std::move(*res)));
197 } else {
198 return make_ready_future<consumption_result_type>(continue_consuming{});
199 }
200 });
201 }
202 __builtin_unreachable();
203 }
204 };
205 input_stream<char>& _inp;
206 chunk_parser _chunk;
207
208public:
209 chunked_source_impl(input_stream<char>& inp, std::unordered_map<sstring, sstring>& chunk_extensions, std::unordered_map<sstring, sstring>& trailing_headers)
210 : _inp(inp), _chunk(chunk_extensions, trailing_headers) {
211 }
212
213 virtual future<temporary_buffer<char>> get() override {
214 return _inp.consume(_chunk).then([this] () mutable {
215 return _chunk.buf();
216 });
217 }
218
219 virtual future<> close() override {
220 return make_ready_future<>();
221 }
222};
223
224} // namespace internal
225
226} // namespace httpd
227
228}
Definition: iostream.hh:237
Definition: iostream.hh:61
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1425
Definition: exception.hh:110
Definition: content_source.hh:76
future skip(uint64_t n) noexcept
Ignores n next bytes from the stream.
Definition: iostream-impl.hh:301
future< tmp_buf > read_up_to(size_t n) noexcept
Definition: iostream-impl.hh:255
Definition: iostream.hh:228
Definition: iostream.hh:217
bool empty() const noexcept
Checks whether the buffer is empty.
Definition: temporary_buffer.hh:152
temporary_buffer share()
Definition: temporary_buffer.hh:160
void trim_front(size_t pos) noexcept
Definition: temporary_buffer.hh:186
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
const CharType * get() const noexcept
Gets a pointer to the beginning of the buffer.
Definition: temporary_buffer.hh:125
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: iostream.hh:214