Seastar
High performance C++ framework for concurrent servers
rpc_types.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#if FMT_VERSION >= 90000
25#include <fmt/ostream.h>
26#endif
27#if FMT_VERSION >= 100000
28#include <fmt/std.h>
29#endif
30
31#include <seastar/net/api.hh>
32#include <stdexcept>
33#include <string>
34#include <boost/any.hpp>
35#include <boost/type.hpp>
36#include <seastar/util/std-compat.hh>
37#include <seastar/util/variant_utils.hh>
38#include <seastar/core/timer.hh>
39#include <seastar/core/circular_buffer.hh>
40#include <seastar/core/simple-stream.hh>
41#include <seastar/core/lowres_clock.hh>
42#include <boost/functional/hash.hpp>
43#include <seastar/core/sharded.hh>
44
45namespace seastar {
46
47namespace rpc {
48
49using rpc_clock_type = lowres_clock;
50
51// used to tag a type for serializers
52template<typename T>
53using type = boost::type<T>;
54
55struct stats {
56 using counter_type = uint64_t;
57 counter_type replied = 0;
58 counter_type pending = 0;
59 counter_type exception_received = 0;
60 counter_type sent_messages = 0;
61 counter_type wait_reply = 0;
62 counter_type timeout = 0;
63 counter_type delay_samples = 0;
64 std::chrono::duration<double> delay_total = std::chrono::duration<double>(0);
65};
66
68 uint64_t _id;
69
70public:
71 uint64_t id() const {
72 return _id;
73 }
74 bool operator==(const connection_id& o) const {
75 return _id == o._id;
76 }
77 explicit operator bool() const {
78 return shard() != 0xffff;
79 }
80 size_t shard() const {
81 return size_t(_id & 0xffff);
82 }
83 constexpr static connection_id make_invalid_id(uint64_t _id = 0) {
84 return make_id(_id, 0xffff);
85 }
86 constexpr static connection_id make_id(uint64_t _id, uint16_t shard) {
87 return {_id << 16 | shard};
88 }
89 constexpr connection_id(uint64_t id) : _id(id) {}
90};
91
92constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();
93
94std::ostream& operator<<(std::ostream&, const connection_id&);
95
96class server;
97
99 socket_address addr;
101 connection_id conn_id;
102 std::unordered_map<sstring, boost::any> user_data;
103 template <typename T>
104 void attach_auxiliary(const sstring& key, T&& object) {
105 user_data.emplace(key, boost::any(std::forward<T>(object)));
106 }
107 template <typename T>
108 T& retrieve_auxiliary(const sstring& key) {
109 auto it = user_data.find(key);
110 assert(it != user_data.end());
111 return boost::any_cast<T&>(it->second);
112 }
113 template <typename T>
114 std::add_const_t<T>& retrieve_auxiliary(const sstring& key) const {
115 return const_cast<client_info*>(this)->retrieve_auxiliary<std::add_const_t<T>>(key);
116 }
117 template <typename T>
118 T* retrieve_auxiliary_opt(const sstring& key) noexcept {
119 auto it = user_data.find(key);
120 if (it == user_data.end()) {
121 return nullptr;
122 }
123 return &boost::any_cast<T&>(it->second);
124 }
125 template <typename T>
126 const T* retrieve_auxiliary_opt(const sstring& key) const noexcept {
127 auto it = user_data.find(key);
128 if (it == user_data.end()) {
129 return nullptr;
130 }
131 return &boost::any_cast<const T&>(it->second);
132 }
133};
134
135class error : public std::runtime_error {
136public:
137 error(const std::string& msg) : std::runtime_error(msg) {}
138};
139
140class closed_error : public error {
141public:
142 closed_error() : error("connection is closed") {}
143};
144
145class timeout_error : public error {
146public:
147 timeout_error() : error("rpc call timed out") {}
148};
149
150class unknown_verb_error : public error {
151public:
152 uint64_t type;
153 unknown_verb_error(uint64_t type_) : error("unknown verb"), type(type_) {}
154};
155
157public:
158 unknown_exception_error() : error("unknown exception") {}
159};
160
161class rpc_protocol_error : public error {
162public:
163 rpc_protocol_error() : error("rpc protocol exception") {}
164};
165
166class canceled_error : public error {
167public:
168 canceled_error() : error("rpc call was canceled") {}
169};
170
171class stream_closed : public error {
172public:
173 stream_closed() : error("rpc stream was closed by peer") {}
174};
175
176class remote_verb_error : public error {
177 using error::error;
178};
179
180struct no_wait_type {};
181
182// return this from a callback if client does not want to waiting for a reply
183extern no_wait_type no_wait;
184
187
188template <typename T>
189class optional : public std::optional<T> {
190public:
191 using std::optional<T>::optional;
192};
193
194class opt_time_point : public std::optional<rpc_clock_type::time_point> {
195public:
196 using std::optional<rpc_clock_type::time_point>::optional;
197 opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
198 static_cast<std::optional<rpc_clock_type::time_point>&>(*this) = time_point;
199 }
200};
201
203
205 std::function<void()> cancel_send;
206 std::function<void()> cancel_wait;
207 cancellable** send_back_pointer = nullptr;
208 cancellable** wait_back_pointer = nullptr;
209 cancellable() = default;
210 cancellable(cancellable&& x) : cancel_send(std::move(x.cancel_send)), cancel_wait(std::move(x.cancel_wait)), send_back_pointer(x.send_back_pointer), wait_back_pointer(x.wait_back_pointer) {
211 if (send_back_pointer) {
212 *send_back_pointer = this;
213 x.send_back_pointer = nullptr;
214 }
215 if (wait_back_pointer) {
216 *wait_back_pointer = this;
217 x.wait_back_pointer = nullptr;
218 }
219 }
220 cancellable& operator=(cancellable&& x) {
221 if (&x != this) {
222 this->~cancellable();
223 new (this) cancellable(std::move(x));
224 }
225 return *this;
226 }
227 void cancel() {
228 if (cancel_send) {
229 cancel_send();
230 }
231 if (cancel_wait) {
232 cancel_wait();
233 }
234 }
235 ~cancellable() {
236 cancel();
237 }
238};
239
240struct rcv_buf {
241 uint32_t size = 0;
242 std::optional<semaphore_units<>> su;
243 std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
244 using iterator = std::vector<temporary_buffer<char>>::iterator;
245 rcv_buf() {}
246 explicit rcv_buf(size_t size_) : size(size_) {}
247 explicit rcv_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
248 explicit rcv_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
249 : size(size), bufs(std::move(bufs)) {};
250};
251
252struct snd_buf {
253 // Preferred, but not required, chunk size.
254 static constexpr size_t chunk_size = 128*1024;
255 uint32_t size = 0;
256 std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
257 using iterator = std::vector<temporary_buffer<char>>::iterator;
258 snd_buf() {}
259 snd_buf(snd_buf&&) noexcept;
260 snd_buf& operator=(snd_buf&&) noexcept;
261 explicit snd_buf(size_t size_);
262 explicit snd_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
263
264 explicit snd_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
265 : size(size), bufs(std::move(bufs)) {};
266
267 temporary_buffer<char>& front();
268};
269
270static inline memory_input_stream<rcv_buf::iterator> make_deserializer_stream(rcv_buf& input) {
271 auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
272 if (b) {
274 } else {
275 auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
277 }
278}
279
281public:
282 virtual ~compressor() {}
283 // compress data and leave head_space bytes at the beginning of returned buffer
284 virtual snd_buf compress(size_t head_space, snd_buf data) = 0;
285 // decompress data
286 virtual rcv_buf decompress(rcv_buf data) = 0;
287 virtual sstring name() const = 0;
288 virtual future<> close() noexcept { return make_ready_future<>(); };
289
290 // factory to create compressor for a connection
291 class factory {
292 public:
293 virtual ~factory() {}
294 // return feature string that will be sent as part of protocol negotiation
295 virtual const sstring& supported() const = 0;
296 // negotiate compress algorithm
297 // send_empty_frame() requests an empty frame to be sent to the peer compressor on the other side of the connection.
298 // By attaching a header to this empty frame, the compressor can communicate somthing to the peer,
299 // send_empty_frame() mustn't be called from inside compress() or decompress().
300 virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server, std::function<future<>()> send_empty_frame) const {
301 return negotiate(feature, is_server);
302 }
303 virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server) const = 0;
304 };
305};
306
307class connection;
308
310constexpr size_t max_queued_stream_buffers = 50;
311constexpr size_t max_stream_buffers_memory = 100 * 1024;
312
315
316// send data Out...
317template<typename... Out>
318class sink {
319public:
320 class impl {
321 protected:
323 semaphore _sem;
324 std::exception_ptr _ex;
325 impl(xshard_connection_ptr con) : _con(std::move(con)), _sem(max_stream_buffers_memory) {}
326 public:
327 virtual ~impl() {};
328 virtual future<> operator()(const Out&... args) = 0;
329 virtual future<> close() = 0;
330 virtual future<> flush() = 0;
331 friend sink;
332 };
333
334private:
335 shared_ptr<impl> _impl;
336
337public:
338 sink(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
339 future<> operator()(const Out&... args) {
340 return _impl->operator()(args...);
341 }
342 future<> close() {
343 return _impl->close();
344 }
345 // Calling this function makes sure that any data buffered
346 // by the stream sink will be flushed to the network.
347 // It does not mean the data was received by the corresponding
348 // source.
349 future<> flush() {
350 return _impl->flush();
351 }
352 connection_id get_id() const;
353};
354
355// receive data In...
356template<typename... In>
357class source {
358public:
359 class impl {
360 protected:
363 impl(xshard_connection_ptr con) : _con(std::move(con)) {
364 _bufs.reserve(max_queued_stream_buffers);
365 }
366 public:
367 virtual ~impl() {}
368 virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
369 friend source;
370 };
371private:
372 shared_ptr<impl> _impl;
373
374public:
375 source(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
376 future<std::optional<std::tuple<In...>>> operator()() {
377 return _impl->operator()();
378 };
379 connection_id get_id() const;
380 template<typename Serializer, typename... Out> sink<Out...> make_sink();
381};
382
397template <typename... T>
398class tuple : public std::tuple<T...> {
399public:
400 using std::tuple<T...>::tuple;
401 tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
402};
403
405
406template <typename... T>
407tuple(T&&...) -> tuple<T...>;
408
409} // namespace rpc
410
411}
412
413namespace std {
414template<>
415struct hash<seastar::rpc::connection_id> {
416 size_t operator()(const seastar::rpc::connection_id& id) const {
417 size_t h = 0;
418 boost::hash_combine(h, std::hash<uint64_t>{}(id.id()));
419 return h;
420 }
421};
422
423template <typename... T>
424struct tuple_size<seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
425};
426
427template <size_t I, typename... T>
428struct tuple_element<I, seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
429};
430
431}
432
433#if FMT_VERSION >= 90000
434template <> struct fmt::formatter<seastar::rpc::connection_id> : fmt::ostream_formatter {};
435#endif
436
437#if FMT_VERSION < 100000
438// fmt v10 introduced formatter for std::exception
439template <std::derived_from<seastar::rpc::error> T>
440struct fmt::formatter<T> : fmt::formatter<string_view> {
441 auto format(const T& e, fmt::format_context& ctx) const {
442 return fmt::format_to(ctx.out(), "{}", e.what());
443 }
444};
445#endif
446
447#if FMT_VERSION < 100000
448template <typename T>
449struct fmt::formatter<seastar::rpc::optional<T>> {
450 constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
451 auto format(const seastar::rpc::optional<T>& opt, fmt::format_context& ctx) const {
452 if (opt) {
453 return fmt::format_to(ctx.out(), "optional({})", *opt);
454 } else {
455 return fmt::format_to(ctx.out(), "none");
456 }
457 }
458};
459#else
460template <typename T>
461struct fmt::formatter<seastar::rpc::optional<T>> : private fmt::formatter<std::optional<T>> {
462 using fmt::formatter<std::optional<T>>::parse;
463 auto format(const seastar::rpc::optional<T>& opt, fmt::format_context& ctx) const {
464 return fmt::formatter<std::optional<T>>::format(opt, ctx);
465 }
466};
467#endif
Definition: circular_buffer.hh:63
Definition: simple-stream.hh:383
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: simple-stream.hh:464
Definition: rpc_types.hh:166
Definition: rpc_types.hh:140
Definition: rpc_types.hh:291
Definition: rpc_types.hh:280
Definition: rpc_types.hh:67
Definition: rpc.hh:249
Definition: rpc_types.hh:135
Definition: rpc_types.hh:194
Definition: rpc_types.hh:189
Definition: rpc_types.hh:176
Definition: rpc_types.hh:161
Definition: rpc.hh:591
Definition: rpc_types.hh:318
Definition: rpc_types.hh:357
Definition: rpc_types.hh:171
Definition: rpc_types.hh:145
Definition: rpc_types.hh:398
Definition: rpc_types.hh:156
Definition: rpc_types.hh:150
Definition: simple-stream.hh:330
Definition: socket_defs.hh:47
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.
Definition: rpc_types.hh:204
Definition: rpc_types.hh:98
Definition: rpc_types.hh:180
Definition: rpc_types.hh:240
Definition: rpc_types.hh:252
Definition: rpc_types.hh:55