Seastar
High performance C++ framework for concurrent servers
rpc_types.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #if FMT_VERSION >= 9'00'00
25 #include <fmt/ostream.h>
26 #endif
27 #if FMT_VERSION >= 10'00'00
28 #include <fmt/std.h>
29 #endif
30 
31 #include <seastar/net/api.hh>
32 #include <stdexcept>
33 #include <string>
34 #include <boost/any.hpp>
35 #include <boost/type.hpp>
36 #include <seastar/util/std-compat.hh>
37 #include <seastar/util/variant_utils.hh>
38 #include <seastar/core/timer.hh>
39 #include <seastar/core/circular_buffer.hh>
40 #include <seastar/core/simple-stream.hh>
41 #include <seastar/core/lowres_clock.hh>
42 #include <boost/functional/hash.hpp>
43 #include <seastar/core/sharded.hh>
44 
45 namespace seastar {
46 
47 namespace rpc {
48 
49 using rpc_clock_type = lowres_clock;
50 
51 // used to tag a type for serializers
52 template<typename T>
53 using type = boost::type<T>;
54 
55 struct stats {
56  using counter_type = uint64_t;
57  counter_type replied = 0;
58  counter_type pending = 0;
59  counter_type exception_received = 0;
60  counter_type sent_messages = 0;
61  counter_type wait_reply = 0;
62  counter_type timeout = 0;
63 };
64 
66  uint64_t _id;
67 
68 public:
69  uint64_t id() const {
70  return _id;
71  }
72  bool operator==(const connection_id& o) const {
73  return _id == o._id;
74  }
75  explicit operator bool() const {
76  return shard() != 0xffff;
77  }
78  size_t shard() const {
79  return size_t(_id & 0xffff);
80  }
81  constexpr static connection_id make_invalid_id(uint64_t _id = 0) {
82  return make_id(_id, 0xffff);
83  }
84  constexpr static connection_id make_id(uint64_t _id, uint16_t shard) {
85  return {_id << 16 | shard};
86  }
87  constexpr connection_id(uint64_t id) : _id(id) {}
88 };
89 
90 constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();
91 
92 std::ostream& operator<<(std::ostream&, const connection_id&);
93 
94 class server;
95 
96 struct client_info {
97  socket_address addr;
99  connection_id conn_id;
100  std::unordered_map<sstring, boost::any> user_data;
101  template <typename T>
102  void attach_auxiliary(const sstring& key, T&& object) {
103  user_data.emplace(key, boost::any(std::forward<T>(object)));
104  }
105  template <typename T>
106  T& retrieve_auxiliary(const sstring& key) {
107  auto it = user_data.find(key);
108  assert(it != user_data.end());
109  return boost::any_cast<T&>(it->second);
110  }
111  template <typename T>
112  std::add_const_t<T>& retrieve_auxiliary(const sstring& key) const {
113  return const_cast<client_info*>(this)->retrieve_auxiliary<std::add_const_t<T>>(key);
114  }
115  template <typename T>
116  T* retrieve_auxiliary_opt(const sstring& key) noexcept {
117  auto it = user_data.find(key);
118  if (it == user_data.end()) {
119  return nullptr;
120  }
121  return &boost::any_cast<T&>(it->second);
122  }
123  template <typename T>
124  const T* retrieve_auxiliary_opt(const sstring& key) const noexcept {
125  auto it = user_data.find(key);
126  if (it == user_data.end()) {
127  return nullptr;
128  }
129  return &boost::any_cast<const T&>(it->second);
130  }
131 };
132 
133 class error : public std::runtime_error {
134 public:
135  error(const std::string& msg) : std::runtime_error(msg) {}
136 };
137 
138 class closed_error : public error {
139 public:
140  closed_error() : error("connection is closed") {}
141 };
142 
143 class timeout_error : public error {
144 public:
145  timeout_error() : error("rpc call timed out") {}
146 };
147 
148 class unknown_verb_error : public error {
149 public:
150  uint64_t type;
151  unknown_verb_error(uint64_t type_) : error("unknown verb"), type(type_) {}
152 };
153 
155 public:
156  unknown_exception_error() : error("unknown exception") {}
157 };
158 
159 class rpc_protocol_error : public error {
160 public:
161  rpc_protocol_error() : error("rpc protocol exception") {}
162 };
163 
164 class canceled_error : public error {
165 public:
166  canceled_error() : error("rpc call was canceled") {}
167 };
168 
169 class stream_closed : public error {
170 public:
171  stream_closed() : error("rpc stream was closed by peer") {}
172 };
173 
174 class remote_verb_error : public error {
175  using error::error;
176 };
177 
178 struct no_wait_type {};
179 
180 // return this from a callback if client does not want to waiting for a reply
181 extern no_wait_type no_wait;
182 
185 
186 template <typename T>
187 class optional : public std::optional<T> {
188 public:
189  using std::optional<T>::optional;
190 };
191 
192 class opt_time_point : public std::optional<rpc_clock_type::time_point> {
193 public:
194  using std::optional<rpc_clock_type::time_point>::optional;
195  opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
196  static_cast<std::optional<rpc_clock_type::time_point>&>(*this) = time_point;
197  }
198 };
199 
201 
202 struct cancellable {
203  std::function<void()> cancel_send;
204  std::function<void()> cancel_wait;
205  cancellable** send_back_pointer = nullptr;
206  cancellable** wait_back_pointer = nullptr;
207  cancellable() = default;
208  cancellable(cancellable&& x) : cancel_send(std::move(x.cancel_send)), cancel_wait(std::move(x.cancel_wait)), send_back_pointer(x.send_back_pointer), wait_back_pointer(x.wait_back_pointer) {
209  if (send_back_pointer) {
210  *send_back_pointer = this;
211  x.send_back_pointer = nullptr;
212  }
213  if (wait_back_pointer) {
214  *wait_back_pointer = this;
215  x.wait_back_pointer = nullptr;
216  }
217  }
218  cancellable& operator=(cancellable&& x) {
219  if (&x != this) {
220  this->~cancellable();
221  new (this) cancellable(std::move(x));
222  }
223  return *this;
224  }
225  void cancel() {
226  if (cancel_send) {
227  cancel_send();
228  }
229  if (cancel_wait) {
230  cancel_wait();
231  }
232  }
233  ~cancellable() {
234  cancel();
235  }
236 };
237 
238 struct rcv_buf {
239  uint32_t size = 0;
240  std::optional<semaphore_units<>> su;
241  std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
242  using iterator = std::vector<temporary_buffer<char>>::iterator;
243  rcv_buf() {}
244  explicit rcv_buf(size_t size_) : size(size_) {}
245  explicit rcv_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
246  explicit rcv_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
247  : size(size), bufs(std::move(bufs)) {};
248 };
249 
250 struct snd_buf {
251  // Preferred, but not required, chunk size.
252  static constexpr size_t chunk_size = 128*1024;
253  uint32_t size = 0;
254  std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
255  using iterator = std::vector<temporary_buffer<char>>::iterator;
256  snd_buf() {}
257  snd_buf(snd_buf&&) noexcept;
258  snd_buf& operator=(snd_buf&&) noexcept;
259  explicit snd_buf(size_t size_);
260  explicit snd_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
261 
262  explicit snd_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
263  : size(size), bufs(std::move(bufs)) {};
264 
265  temporary_buffer<char>& front();
266 };
267 
268 static inline memory_input_stream<rcv_buf::iterator> make_deserializer_stream(rcv_buf& input) {
269  auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
270  if (b) {
272  } else {
273  auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
275  }
276 }
277 
278 class compressor {
279 public:
280  virtual ~compressor() {}
281  // compress data and leave head_space bytes at the beginning of returned buffer
282  virtual snd_buf compress(size_t head_space, snd_buf data) = 0;
283  // decompress data
284  virtual rcv_buf decompress(rcv_buf data) = 0;
285  virtual sstring name() const = 0;
286 
287  // factory to create compressor for a connection
288  class factory {
289  public:
290  virtual ~factory() {}
291  // return feature string that will be sent as part of protocol negotiation
292  virtual const sstring& supported() const = 0;
293  // negotiate compress algorithm
294  virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server) const = 0;
295  };
296 };
297 
298 class connection;
299 
301 constexpr size_t max_queued_stream_buffers = 50;
302 constexpr size_t max_stream_buffers_memory = 100 * 1024;
303 
306 
307 // send data Out...
308 template<typename... Out>
309 class sink {
310 public:
311  class impl {
312  protected:
314  semaphore _sem;
315  std::exception_ptr _ex;
316  impl(xshard_connection_ptr con) : _con(std::move(con)), _sem(max_stream_buffers_memory) {}
317  public:
318  virtual ~impl() {};
319  virtual future<> operator()(const Out&... args) = 0;
320  virtual future<> close() = 0;
321  virtual future<> flush() = 0;
322  friend sink;
323  };
324 
325 private:
326  shared_ptr<impl> _impl;
327 
328 public:
329  sink(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
330  future<> operator()(const Out&... args) {
331  return _impl->operator()(args...);
332  }
333  future<> close() {
334  return _impl->close();
335  }
336  // Calling this function makes sure that any data buffered
337  // by the stream sink will be flushed to the network.
338  // It does not mean the data was received by the corresponding
339  // source.
340  future<> flush() {
341  return _impl->flush();
342  }
343  connection_id get_id() const;
344 };
345 
346 // receive data In...
347 template<typename... In>
348 class source {
349 public:
350  class impl {
351  protected:
354  impl(xshard_connection_ptr con) : _con(std::move(con)) {
355  _bufs.reserve(max_queued_stream_buffers);
356  }
357  public:
358  virtual ~impl() {}
359  virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
360  friend source;
361  };
362 private:
363  shared_ptr<impl> _impl;
364 
365 public:
366  source(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
367  future<std::optional<std::tuple<In...>>> operator()() {
368  return _impl->operator()();
369  };
370  connection_id get_id() const;
371  template<typename Serializer, typename... Out> sink<Out...> make_sink();
372 };
373 
388 template <typename... T>
389 class tuple : public std::tuple<T...> {
390 public:
391  using std::tuple<T...>::tuple;
392  tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
393 };
394 
396 
397 template <typename... T>
398 tuple(T&&...) -> tuple<T...>;
399 
400 } // namespace rpc
401 
402 }
403 
404 namespace std {
405 template<>
406 struct hash<seastar::rpc::connection_id> {
407  size_t operator()(const seastar::rpc::connection_id& id) const {
408  size_t h = 0;
409  boost::hash_combine(h, std::hash<uint64_t>{}(id.id()));
410  return h;
411  }
412 };
413 
414 template <typename... T>
415 struct tuple_size<seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
416 };
417 
418 template <size_t I, typename... T>
419 struct tuple_element<I, seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
420 };
421 
422 }
423 
424 #if FMT_VERSION >= 9'00'00
425 template <> struct fmt::formatter<seastar::rpc::connection_id> : fmt::ostream_formatter {};
426 #endif
427 
428 #if FMT_VERSION >= 10'00'00
429 template <typename T>
430 struct fmt::formatter<seastar::rpc::optional<T>> : private fmt::formatter<std::optional<T>> {
431  using fmt::formatter<std::optional<T>>::parse;
432  auto format(const seastar::rpc::optional<T>& opt, fmt::format_context& ctx) const {
433  return fmt::formatter<std::optional<T>>::format(opt, ctx);
434  }
435 };
436 #endif
Definition: circular_buffer.hh:63
Definition: simple-stream.hh:383
Definition: shared_ptr.hh:270
Definition: simple-stream.hh:464
Definition: rpc_types.hh:164
Definition: rpc_types.hh:138
Definition: rpc_types.hh:288
Definition: rpc_types.hh:278
Definition: rpc_types.hh:65
Definition: rpc.hh:237
Definition: rpc_types.hh:133
Definition: rpc_types.hh:192
Definition: rpc_types.hh:187
Definition: rpc_types.hh:174
Definition: rpc_types.hh:159
Definition: rpc.hh:573
Definition: rpc_types.hh:309
Definition: rpc_types.hh:348
Definition: rpc_types.hh:169
Definition: rpc_types.hh:143
Definition: rpc_types.hh:389
Definition: rpc_types.hh:154
Definition: rpc_types.hh:148
Definition: simple-stream.hh:330
Definition: socket_defs.hh:47
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(const char *fmt, A &&... a)
Definition: print.hh:142
Definition: rpc_types.hh:202
Definition: rpc_types.hh:96
Definition: rpc_types.hh:178
Definition: rpc_types.hh:238
Definition: rpc_types.hh:250
Definition: rpc_types.hh:55