Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
rpc_types.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#if FMT_VERSION >= 90000
25#include <fmt/ostream.h>
26#endif
27#if FMT_VERSION >= 100000
28#include <fmt/std.h>
29#endif
30
31#include <seastar/net/api.hh>
32#include <stdexcept>
33#include <string>
34#include <any>
35#include <seastar/util/std-compat.hh>
36#include <seastar/util/variant_utils.hh>
37#include <seastar/core/timer.hh>
38#include <seastar/core/circular_buffer.hh>
39#include <seastar/core/simple-stream.hh>
40#include <seastar/core/lowres_clock.hh>
41#include <boost/functional/hash.hpp>
42#include <seastar/core/sharded.hh>
43
44namespace seastar {
45
46namespace rpc {
47
48using rpc_clock_type = lowres_clock;
49
50// used to tag a type for serializers
51template<typename T>
52using type = std::type_identity<T>;
53
// Plain aggregate of RPC counters. Every field starts at zero.
// NOTE(review): what each counter measures is suggested only by its name here;
// confirm against the code that updates them.
struct stats {
    using counter_type = uint64_t;
    counter_type replied = 0;
    counter_type pending = 0;
    counter_type exception_received = 0;
    counter_type sent_messages = 0;
    counter_type wait_reply = 0;
    counter_type timeout = 0;
    counter_type delay_samples = 0;
    // Accumulated delay, kept as floating-point seconds.
    std::chrono::duration<double> delay_total = std::chrono::duration<double>(0);
};
65
// Identifier of an RPC connection packed into 64 bits: the low 16 bits hold a
// shard number, the remaining bits hold the id passed to make_id().
// A shard field of 0xffff marks the identifier as invalid.
class connection_id {
    uint64_t _id;

public:
    // Raw packed value (id bits and shard bits together).
    constexpr uint64_t id() const noexcept {
        return _id;
    }
    constexpr bool operator==(const connection_id& o) const noexcept {
        return _id == o._id;
    }
    // True unless the shard field carries the invalid marker 0xffff.
    constexpr explicit operator bool() const noexcept {
        return shard() != 0xffff;
    }
    // Low 16 bits of the packed value.
    constexpr size_t shard() const noexcept {
        return size_t(_id & 0xffff);
    }
    // Builds an identifier whose shard field is the invalid marker.
    constexpr static connection_id make_invalid_id(uint64_t _id = 0) {
        return make_id(_id, 0xffff);
    }
    // Packs `_id` (shifted left by 16) together with `shard`.
    constexpr static connection_id make_id(uint64_t _id, uint16_t shard) {
        return {(_id << 16) | shard};
    }
    // Intentionally non-explicit: make_id() relies on copy-list-initialization.
    constexpr connection_id(uint64_t id) : _id(id) {}
};
90
91constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();
92
93std::ostream& operator<<(std::ostream&, const connection_id&);
94
95class server;
96
98 socket_address addr;
100 connection_id conn_id;
101 std::unordered_map<sstring, std::any> user_data;
102 template <typename T>
103 void attach_auxiliary(const sstring& key, T&& object) {
104 user_data.emplace(key, std::any(std::forward<T>(object)));
105 }
106 template <typename T>
107 T& retrieve_auxiliary(const sstring& key) {
108 auto it = user_data.find(key);
109 assert(it != user_data.end());
110 return std::any_cast<T&>(it->second);
111 }
112 template <typename T>
113 std::add_const_t<T>& retrieve_auxiliary(const sstring& key) const {
114 return const_cast<client_info*>(this)->retrieve_auxiliary<std::add_const_t<T>>(key);
115 }
116 template <typename T>
117 T* retrieve_auxiliary_opt(const sstring& key) noexcept {
118 auto it = user_data.find(key);
119 if (it == user_data.end()) {
120 return nullptr;
121 }
122 return &std::any_cast<T&>(it->second);
123 }
124 template <typename T>
125 const T* retrieve_auxiliary_opt(const sstring& key) const noexcept {
126 auto it = user_data.find(key);
127 if (it == user_data.end()) {
128 return nullptr;
129 }
130 return &std::any_cast<const T&>(it->second);
131 }
132};
133
// Base class of the RPC exception types declared in this header.
// Carries the message through std::runtime_error.
class error : public std::runtime_error {
public:
    error(const std::string& msg) : std::runtime_error(msg) {}
};
138
139class closed_error : public error {
140public:
141 closed_error() : error("connection is closed") {}
142};
143
144class timeout_error : public error {
145public:
146 timeout_error() : error("rpc call timed out") {}
147};
148
149class unknown_verb_error : public error {
150public:
151 uint64_t type;
152 unknown_verb_error(uint64_t type_) : error("unknown verb"), type(type_) {}
153};
154
156public:
157 unknown_exception_error() : error("unknown exception") {}
158};
159
160class rpc_protocol_error : public error {
161public:
162 rpc_protocol_error() : error("rpc protocol exception") {}
163};
164
165class canceled_error : public error {
166public:
167 canceled_error() : error("rpc call was canceled") {}
168};
169
170class stream_closed : public error {
171public:
172 stream_closed() : error("rpc stream was closed by peer") {}
173};
174
175class remote_verb_error : public error {
176 using error::error;
177};
178
// Empty tag type; see no_wait.
struct no_wait_type {};

// Return this from a callback if the client does not want to wait for a reply.
extern no_wait_type no_wait;
183
186
// Distinct RPC-level optional type: derives from std::optional<T> and inherits
// all of its constructors, adding no members of its own.
template <typename T>
class optional : public std::optional<T> {
public:
    using std::optional<T>::optional;
};
192
193class opt_time_point : public std::optional<rpc_clock_type::time_point> {
194public:
195 using std::optional<rpc_clock_type::time_point>::optional;
196 opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
197 static_cast<std::optional<rpc_clock_type::time_point>&>(*this) = time_point;
198 }
199};
200
202
// Holds two optional cancellation hooks plus two optional back pointers.
// Moving a cancellable transfers the hooks and re-targets every non-null back
// pointer at the new object. Destroying a cancellable, or overwriting it by
// move-assignment, runs cancel() on the hooks it holds at that moment.
struct cancellable {
    std::function<void()> cancel_send;
    std::function<void()> cancel_wait;
    // When non-null, *pointer is updated to follow this object across moves.
    cancellable** send_back_pointer = nullptr;
    cancellable** wait_back_pointer = nullptr;
    cancellable() = default;
    cancellable(cancellable&& x) noexcept
        : cancel_send(std::move(x.cancel_send))
        , cancel_wait(std::move(x.cancel_wait))
        , send_back_pointer(x.send_back_pointer)
        , wait_back_pointer(x.wait_back_pointer) {
        // A moved-from std::function is only guaranteed to be "valid but
        // unspecified". Clear it explicitly so that x's destructor cannot
        // invoke the hooks a second time.
        x.cancel_send = nullptr;
        x.cancel_wait = nullptr;
        if (send_back_pointer) {
            *send_back_pointer = this;
            x.send_back_pointer = nullptr;
        }
        if (wait_back_pointer) {
            *wait_back_pointer = this;
            x.wait_back_pointer = nullptr;
        }
    }
    cancellable& operator=(cancellable&& x) noexcept {
        if (&x != this) {
            // Destroy-and-reconstruct: the destructor cancels whatever this
            // object currently holds, then the move constructor takes over x.
            this->~cancellable();
            new (this) cancellable(std::move(x));
        }
        return *this;
    }
    // Invokes each hook that is set: cancel_send first, then cancel_wait.
    void cancel() {
        if (cancel_send) {
            cancel_send();
        }
        if (cancel_wait) {
            cancel_wait();
        }
    }
    ~cancellable() {
        cancel();
    }
};
238
239struct rcv_buf {
240 uint32_t size = 0;
241 std::optional<semaphore_units<>> su;
242 std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
243 using iterator = std::vector<temporary_buffer<char>>::iterator;
244 rcv_buf() {}
245 explicit rcv_buf(size_t size_) : size(size_) {}
246 explicit rcv_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
247 explicit rcv_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
248 : size(size), bufs(std::move(bufs)) {};
249};
250
251struct snd_buf {
252 // Preferred, but not required, chunk size.
253 static constexpr size_t chunk_size = 128*1024;
254 uint32_t size = 0;
255 std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
256 using iterator = std::vector<temporary_buffer<char>>::iterator;
257 snd_buf() {}
258 snd_buf(snd_buf&&) noexcept;
259 snd_buf& operator=(snd_buf&&) noexcept;
260 explicit snd_buf(size_t size_);
261 explicit snd_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
262
263 explicit snd_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
264 : size(size), bufs(std::move(bufs)) {};
265
266 temporary_buffer<char>& front();
267};
268
// Builds a memory_input_stream over the contents of `input`, choosing a branch
// by which alternative input.bufs currently holds: a single temporary_buffer
// or a vector of them.
// NOTE(review): both branches are empty in this listing and the function has
// no return statement; the listing numbering skips lines 272 and 275, which
// presumably held the two returns. Restore them from upstream before use.
269static inline memory_input_stream<rcv_buf::iterator> make_deserializer_stream(rcv_buf& input) {
270 auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
271 if (b) {
273 } else {
274 auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
276 }
277}
278
280public:
281 virtual ~compressor() {}
282 // compress data and leave head_space bytes at the beginning of returned buffer
283 virtual snd_buf compress(size_t head_space, snd_buf data) = 0;
284 // decompress data
285 virtual rcv_buf decompress(rcv_buf data) = 0;
286 virtual sstring name() const = 0;
287 virtual future<> close() noexcept { return make_ready_future<>(); };
288
289 // factory to create compressor for a connection
290 class factory {
291 public:
292 virtual ~factory() {}
293 // return feature string that will be sent as part of protocol negotiation
294 virtual const sstring& supported() const = 0;
295 // negotiate compress algorithm
296 // send_empty_frame() requests an empty frame to be sent to the peer compressor on the other side of the connection.
297 // By attaching a header to this empty frame, the compressor can communicate somthing to the peer,
298 // send_empty_frame() mustn't be called from inside compress() or decompress().
299 virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server, std::function<future<>()> send_empty_frame) const {
300 return negotiate(feature, is_server);
301 }
302 virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server) const = 0;
303 };
304};
305
306class connection;
307
// Limits for RPC streams: source::impl reserves max_queued_stream_buffers
// entries for its buffer queue, and sink::impl initialises its semaphore with
// max_stream_buffers_memory.
// `inline` gives each header-defined constant a single definition across
// translation units.
inline constexpr size_t max_queued_stream_buffers = 50;
inline constexpr size_t max_stream_buffers_memory = 100 * 1024;
311
314
315// send data Out...
// Handle for the sending side of a stream. All operations are forwarded to a
// shared implementation object (sink::impl).
316template<typename... Out>
317class sink {
318public:
// Abstract implementation behind a sink.
// NOTE(review): the constructor initialises a member `_con`, but its
// declaration is not visible in this listing (listing line 321 is skipped) —
// confirm against upstream.
319 class impl {
320 protected:
// Semaphore constructed with max_stream_buffers_memory.
322 semaphore _sem;
323 std::exception_ptr _ex;
324 impl(xshard_connection_ptr con) : _con(std::move(con)), _sem(max_stream_buffers_memory) {}
325 public:
326 virtual ~impl() {};
// Sends one set of Out... values.
327 virtual future<> operator()(const Out&... args) = 0;
328 virtual future<> close() = 0;
329 virtual future<> flush() = 0;
330 friend sink;
331 };
332
333private:
334 shared_ptr<impl> _impl;
335
336public:
337 sink(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
// Forwards to impl::operator().
338 future<> operator()(const Out&... args) {
339 return _impl->operator()(args...);
340 }
// Forwards to impl::close().
341 future<> close() {
342 return _impl->close();
343 }
344 // Calling this function makes sure that any data buffered
345 // by the stream sink will be flushed to the network.
346 // It does not mean the data was received by the corresponding
347 // source.
348 future<> flush() {
349 return _impl->flush();
350 }
// Declared here only; the definition is not part of this listing.
351 connection_id get_id() const;
352};
353
354// receive data In...
// Handle for the receiving side of a stream. All operations are forwarded to a
// shared implementation object (source::impl).
355template<typename... In>
356class source {
357public:
// Abstract implementation behind a source.
// NOTE(review): the constructor uses members `_con` and `_bufs`, but their
// declarations are not visible in this listing (listing lines 360-361 are
// skipped) — confirm against upstream.
358 class impl {
359 protected:
// Reserves room for max_queued_stream_buffers entries in _bufs up front.
362 impl(xshard_connection_ptr con) : _con(std::move(con)) {
363 _bufs.reserve(max_queued_stream_buffers);
364 }
365 public:
366 virtual ~impl() {}
// Yields the next tuple of In... values, or an empty optional.
// NOTE(review): an empty optional presumably signals end of stream — confirm.
367 virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
368 friend source;
369 };
370private:
371 shared_ptr<impl> _impl;
372
373public:
374 source(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
// Forwards to impl::operator()().
375 future<std::optional<std::tuple<In...>>> operator()() {
376 return _impl->operator()();
377 };
// Both members below are declared here only; their definitions are not part of
// this listing.
378 connection_id get_id() const;
379 template<typename Serializer, typename... Out> sink<Out...> make_sink();
380};
381
// Distinct RPC-level tuple type: derives from std::tuple<T...>, inherits its
// constructors, and additionally converts from an rvalue std::tuple<T...>.
template <typename... T>
class tuple : public std::tuple<T...> {
public:
    using std::tuple<T...>::tuple;
    tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
};

// Deduction guide so that tuple(a, b, ...) deduces its element types from the
// arguments. It is skipped when SEASTAR_P2581R1 is defined.
#ifndef SEASTAR_P2581R1
template <typename... T>
tuple(T&&...) -> tuple<T...>;
#endif
409
410} // namespace rpc
411
412}
413
414namespace std {
415template<>
416struct hash<seastar::rpc::connection_id> {
417 size_t operator()(const seastar::rpc::connection_id& id) const {
418 size_t h = 0;
419 boost::hash_combine(h, std::hash<uint64_t>{}(id.id()));
420 return h;
421 }
422};
423
424template <typename... T>
425struct tuple_size<seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
426};
427
428template <size_t I, typename... T>
429struct tuple_element<I, seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
430};
431
432}
433
// With fmt 9 and later, format connection_id through its operator<<.
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<seastar::rpc::connection_id> : fmt::ostream_formatter {};
#endif
437
438#if FMT_VERSION < 100000
439// fmt v10 introduced formatter for std::exception
440template <std::derived_from<seastar::rpc::error> T>
441struct fmt::formatter<T> : fmt::formatter<string_view> {
442 auto format(const T& e, fmt::format_context& ctx) const {
443 return fmt::format_to(ctx.out(), "{}", e.what());
444 }
445};
446#endif
447
448#if FMT_VERSION < 100000
449template <typename T>
450struct fmt::formatter<seastar::rpc::optional<T>> {
451 constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
452 auto format(const seastar::rpc::optional<T>& opt, fmt::format_context& ctx) const {
453 if (opt) {
454 return fmt::format_to(ctx.out(), "optional({})", *opt);
455 } else {
456 return fmt::format_to(ctx.out(), "none");
457 }
458 }
459};
460#else
461template <typename T>
462struct fmt::formatter<seastar::rpc::optional<T>> : private fmt::formatter<std::optional<T>> {
463 using fmt::formatter<std::optional<T>>::parse;
464 auto format(const seastar::rpc::optional<T>& opt, fmt::format_context& ctx) const {
465 return fmt::formatter<std::optional<T>>::format(opt, ctx);
466 }
467};
468#endif
Definition: circular_buffer.hh:63
Definition: simple-stream.hh:383
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: simple-stream.hh:464
Definition: rpc_types.hh:165
Definition: rpc_types.hh:139
Definition: rpc_types.hh:290
Definition: rpc_types.hh:279
Definition: rpc_types.hh:66
Definition: rpc.hh:249
Definition: rpc_types.hh:134
Definition: rpc_types.hh:193
Definition: rpc_types.hh:188
Definition: rpc_types.hh:175
Definition: rpc_types.hh:160
Definition: rpc.hh:591
Definition: rpc_types.hh:317
Definition: rpc_types.hh:356
Definition: rpc_types.hh:170
Definition: rpc_types.hh:144
Definition: rpc_types.hh:397
Definition: rpc_types.hh:155
Definition: rpc_types.hh:149
Definition: simple-stream.hh:330
Definition: socket_defs.hh:47
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.
Definition: rpc_types.hh:203
Definition: rpc_types.hh:97
Definition: rpc_types.hh:179
Definition: rpc_types.hh:239
Definition: rpc_types.hh:251
Definition: rpc_types.hh:54