Seastar
High performance C++ framework for concurrent servers
rpc_types.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/net/api.hh>
25 #include <stdexcept>
26 #include <string>
27 #include <boost/any.hpp>
28 #include <boost/type.hpp>
29 #include <seastar/util/std-compat.hh>
30 #include <seastar/util/variant_utils.hh>
31 #include <seastar/core/timer.hh>
32 #include <seastar/core/circular_buffer.hh>
33 #include <seastar/core/simple-stream.hh>
34 #include <seastar/core/lowres_clock.hh>
35 #include <boost/functional/hash.hpp>
36 #include <seastar/core/sharded.hh>
37 
38 namespace seastar {
39 
40 namespace rpc {
41 
// Clock used for RPC time points in this header (see opt_time_point); an alias for lowres_clock.
42 using rpc_clock_type = lowres_clock;
43 
44 // used to tag a type for serializers
// Alias for boost::type<T>, so a type can be passed around as a value.
45 template<typename T>
46 using type = boost::type<T>;
47 
/// Plain aggregate of RPC statistics counters.
///
/// Every counter is an unsigned 64-bit integer and starts at zero.
struct stats {
    using counter_type = uint64_t;

    counter_type replied{0};
    counter_type pending{0};
    counter_type exception_received{0};
    counter_type sent_messages{0};
    counter_type wait_reply{0};
    counter_type timeout{0};
};
57 
58 
59 struct client_info {
60  socket_address addr;
61  std::unordered_map<sstring, boost::any> user_data;
62  template <typename T>
63  void attach_auxiliary(const sstring& key, T&& object) {
64  user_data.emplace(key, boost::any(std::forward<T>(object)));
65  }
66  template <typename T>
67  T& retrieve_auxiliary(const sstring& key) {
68  auto it = user_data.find(key);
69  assert(it != user_data.end());
70  return boost::any_cast<T&>(it->second);
71  }
72  template <typename T>
73  typename std::add_const<T>::type& retrieve_auxiliary(const sstring& key) const {
74  return const_cast<client_info*>(this)->retrieve_auxiliary<typename std::add_const<T>::type>(key);
75  }
76 };
77 
/// Base class of the RPC exception types declared in this header.
class error : public std::runtime_error {
public:
    error(const std::string& msg)
        : std::runtime_error{msg} {
    }
};

/// Carries the fixed message "connection is closed".
class closed_error : public error {
public:
    closed_error()
        : error("connection is closed") {
    }
};

/// Carries the fixed message "rpc call timed out".
class timeout_error : public error {
public:
    timeout_error()
        : error("rpc call timed out") {
    }
};

/// Carries the fixed message "unknown verb" and records the verb type in \c type.
class unknown_verb_error : public error {
public:
    uint64_t type;

    unknown_verb_error(uint64_t type_)
        : error("unknown verb")
        , type{type_} {
    }
};
98 
// NOTE(review): the opening line of this class (listing line 99) is missing from
// this extract. The constructor below shows the class is named
// unknown_exception_error and initializes an `error` base with the fixed
// message "unknown exception" — restore the header from the upstream file.
100 public:
101  unknown_exception_error() : error("unknown exception") {}
102 };
103 
104 class rpc_protocol_error : public error {
105 public:
106  rpc_protocol_error() : error("rpc protocol exception") {}
107 };
108 
109 class canceled_error : public error {
110 public:
111  canceled_error() : error("rpc call was canceled") {}
112 };
113 
114 class stream_closed : public error {
115 public:
116  stream_closed() : error("rpc stream was closed by peer") {}
117 };
118 
119 class remote_verb_error : public error {
120  using error::error;
121 };
122 
// Empty tag type; `no_wait` below is the instance callbacks return.
123 struct no_wait_type {};
124 
125 // return this from a callback if the client does not want to wait for a reply
126 extern no_wait_type no_wait;
127 
130 
/// Distinct RPC-level optional type.
///
/// Derives publicly from std::optional<T> and inherits all of its
/// constructors, adding no members of its own.
template <typename T>
class optional : public std::optional<T> {
    using base_type = std::optional<T>;

public:
    using base_type::base_type;
};
136 
137 class opt_time_point : public std::optional<rpc_clock_type::time_point> {
138 public:
139  using std::optional<rpc_clock_type::time_point>::optional;
140  opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
141  static_cast<std::optional<rpc_clock_type::time_point>&>(*this) = time_point;
142  }
143 };
144 
146 
/// Move-only handle carrying two optional cancellation callbacks.
///
/// - \c cancel_send and \c cancel_wait are invoked, when set, by cancel()
///   and by the destructor.
/// - \c send_back_pointer and \c wait_back_pointer, when non-null, each point
///   at an external `cancellable*` slot. Moving the object re-aims those
///   slots at the new location.
///
/// Copying is implicitly disabled because a move constructor is declared.
struct cancellable {
    std::function<void()> cancel_send;
    std::function<void()> cancel_wait;
    cancellable** send_back_pointer = nullptr;
    cancellable** wait_back_pointer = nullptr;

    cancellable() = default;

    /// Takes over the callbacks and back pointers of \c x and leaves \c x inert.
    cancellable(cancellable&& x) noexcept
        : cancel_send(std::move(x.cancel_send))
        , cancel_wait(std::move(x.cancel_wait))
        , send_back_pointer(x.send_back_pointer)
        , wait_back_pointer(x.wait_back_pointer) {
        // A moved-from std::function is only "valid but unspecified". Clear it
        // explicitly so that x's destructor can never fire the callbacks this
        // object now owns.
        x.cancel_send = nullptr;
        x.cancel_wait = nullptr;
        if (send_back_pointer) {
            *send_back_pointer = this;
            x.send_back_pointer = nullptr;
        }
        if (wait_back_pointer) {
            *wait_back_pointer = this;
            x.wait_back_pointer = nullptr;
        }
    }

    /// Cancels whatever this object currently holds (via the destructor) and
    /// then takes over the state of \c x. Self-assignment is a no-op.
    cancellable& operator=(cancellable&& x) noexcept {
        if (&x != this) {
            // Destroy-and-reconstruct is only safe because the move
            // constructor is noexcept: *this is never left destroyed.
            this->~cancellable();
            new (this) cancellable(std::move(x));
        }
        return *this;
    }

    /// Invokes cancel_send and then cancel_wait, skipping whichever is empty.
    /// The callbacks are not cleared afterwards.
    void cancel() {
        if (cancel_send) {
            cancel_send();
        }
        if (cancel_wait) {
            cancel_wait();
        }
    }

    ~cancellable() {
        cancel();
    }
};
182 
183 struct rcv_buf {
184  uint32_t size = 0;
185  std::optional<semaphore_units<>> su;
186  std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
187  using iterator = std::vector<temporary_buffer<char>>::iterator;
188  rcv_buf() {}
189  explicit rcv_buf(size_t size_) : size(size_) {}
190  explicit rcv_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
191  explicit rcv_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
192  : size(size), bufs(std::move(bufs)) {};
193 };
194 
195 struct snd_buf {
196  // Preferred, but not required, chunk size.
197  static constexpr size_t chunk_size = 128*1024;
198  uint32_t size = 0;
199  std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
200  using iterator = std::vector<temporary_buffer<char>>::iterator;
201  snd_buf() {}
202  snd_buf(snd_buf&&) noexcept;
203  snd_buf& operator=(snd_buf&&) noexcept;
204  explicit snd_buf(size_t size_);
205  explicit snd_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
206 
207  explicit snd_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
208  : size(size), bufs(std::move(bufs)) {};
209 
210  temporary_buffer<char>& front();
211 };
212 
// Builds a memory_input_stream over the contents of `input`, choosing a branch
// by which alternative `input.bufs` currently holds.
// NOTE(review): both return statements (listing lines 216 and 219) are missing
// from this extract, so as shown the function returns nothing — restore them
// from the upstream file before building.
213 static inline memory_input_stream<rcv_buf::iterator> make_deserializer_stream(rcv_buf& input) {
// get_if yields nullptr unless bufs holds the single-buffer alternative.
214  auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
215  if (b) {
217  } else {
218  auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
220  }
221 }
222 
223 class compressor {
224 public:
225  virtual ~compressor() {}
226  // compress data and leave head_space bytes at the beginning of returned buffer
227  virtual snd_buf compress(size_t head_space, snd_buf data) = 0;
228  // decompress data
229  virtual rcv_buf decompress(rcv_buf data) = 0;
230  virtual sstring name() const = 0;
231 
232  // factory to create compressor for a connection
233  class factory {
234  public:
235  virtual ~factory() {}
236  // return feature string that will be sent as part of protocol negotiation
237  virtual const sstring& supported() const = 0;
238  // negotiate compress algorithm
239  virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server) const = 0;
240  };
241 };
242 
// Forward declaration only; the class is defined outside this header.
243 class connection;
244 
// NOTE(review): the opening line of this definition (listing line 245) is
// missing from this extract. The members below refer to the type as
// `connection_id` — restore the header from the upstream file.
246  uint64_t id; // low 16 bits hold the shard, see shard() and make_id()
247  bool operator==(const connection_id& o) const {
248  return id == o.id;
249  }
// True unless the shard field holds 0xffff, the marker written by make_invalid_id().
250  operator bool() const {
251  return shard() != 0xffff;
252  }
// Returns the low 16 bits of `id`.
253  size_t shard() const {
254  return size_t(id & 0xffff);
255  }
// Builds an id whose shard field is 0xffff, which operator bool() reports as false.
256  constexpr static connection_id make_invalid_id(uint64_t id = 0) {
257  return make_id(id, 0xffff);
258  }
// Packs `shard` into the low 16 bits and `id` into the bits above them;
// the top 16 bits of `id` are shifted out.
259  constexpr static connection_id make_id(uint64_t id, uint16_t shard) {
260  return {id << 16 | shard};
261  }
262 };
263 
// The id produced by make_invalid_id() with its default argument.
264 constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();
265 
// Stream output for connection_id; declared here, defined elsewhere.
266 std::ostream& operator<<(std::ostream&, const connection_id&);
267 
// Capacity reserved for source::impl::_bufs in its constructor.
269 constexpr size_t max_queued_stream_buffers = 50;
// Initial count of the semaphore held by sink::impl.
270 constexpr size_t max_stream_buffers_memory = 100 * 1024;
271 
274 
275 // send data Out...
// Thin handle that forwards operator(), close() and flush() to a shared impl.
276 template<typename... Out>
277 class sink {
278 public:
// Abstract backend; concrete senders implement operator(), close() and flush().
// NOTE(review): the constructor below initializes `_con`, but its declaration
// (listing line 281) is missing from this extract — restore it from upstream.
279  class impl {
280  protected:
282  semaphore _sem; // constructed with max_stream_buffers_memory units
283  std::exception_ptr _ex;
284  impl(xshard_connection_ptr con) : _con(std::move(con)), _sem(max_stream_buffers_memory) {}
285  public:
286  virtual ~impl() {};
287  virtual future<> operator()(const Out&... args) = 0;
288  virtual future<> close() = 0;
289  virtual future<> flush() = 0;
290  friend sink;
291  };
292 
293 private:
294  shared_ptr<impl> _impl;
295 
296 public:
297  sink(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
// Forwards the arguments to the implementation's operator().
298  future<> operator()(const Out&... args) {
299  return _impl->operator()(args...);
300  }
// Forwards to the implementation's close().
301  future<> close() {
302  return _impl->close();
303  }
304  // Calling this function makes sure that any data buffered
305  // by the stream sink will be flushed to the network.
306  // It does not mean the data was received by the corresponding
307  // source.
308  future<> flush() {
309  return _impl->flush();
310  }
// Declared here; defined elsewhere.
311  connection_id get_id() const;
312 };
313 
314 // receive data In...
// Thin handle that forwards operator() to a shared impl.
315 template<typename... In>
316 class source {
317 public:
// Abstract backend; concrete receivers implement operator().
// NOTE(review): the constructor below uses `_con` and `_bufs`, but their
// declarations (listing lines 320-321) are missing from this extract —
// restore them from upstream.
318  class impl {
319  protected:
322  impl(xshard_connection_ptr con) : _con(std::move(con)) {
// Reserve room for max_queued_stream_buffers entries up front.
323  _bufs.reserve(max_queued_stream_buffers);
324  }
325  public:
326  virtual ~impl() {}
// Yields a future holding an optional tuple of the In... values.
327  virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
328  friend source;
329  };
330 private:
331  shared_ptr<impl> _impl;
332 
333 public:
334  source(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
// Forwards to the implementation's operator().
335  future<std::optional<std::tuple<In...>>> operator()() {
336  return _impl->operator()();
337  };
// Both members below are declared here and defined elsewhere.
338  connection_id get_id() const;
339  template<typename Serializer, typename... Out> sink<Out...> make_sink();
340 };
341 
/// Distinct RPC-level tuple type.
///
/// Derives publicly from std::tuple<T...>, inherits its constructors and adds
/// a converting constructor from an rvalue std::tuple<T...>.
template <typename... T>
class tuple : public std::tuple<T...> {
    using base_type = std::tuple<T...>;

public:
    using base_type::base_type;

    tuple(std::tuple<T...>&& x)
        : base_type(std::move(x)) {
    }
};

/// Deduction guide: `tuple(args...)` deduces one element type per argument,
/// through forwarding references.
template <typename... T>
tuple(T&&...) -> tuple<T...>;
367 
368 } // namespace rpc
369 
370 }
371 
372 namespace std {
373 template<>
374 struct hash<seastar::rpc::connection_id> {
375  size_t operator()(const seastar::rpc::connection_id& id) const {
376  size_t h = 0;
377  boost::hash_combine(h, std::hash<uint64_t>{}(id.id));
378  return h;
379  }
380 };
381 
382 template <typename... T>
383 struct tuple_size<seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
384 };
385 
386 template <size_t I, typename... T>
387 struct tuple_element<I, seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
388 };
389 
390 }
Definition: circular_buffer.hh:59
Definition: simple-stream.hh:373
Definition: shared_ptr.hh:255
Definition: simple-stream.hh:454
Definition: rpc_types.hh:109
Definition: rpc_types.hh:83
Definition: rpc_types.hh:233
Definition: rpc_types.hh:223
Definition: rpc.hh:236
Definition: rpc_types.hh:78
Definition: rpc_types.hh:137
Definition: rpc_types.hh:132
Definition: rpc_types.hh:119
Definition: rpc_types.hh:104
Definition: rpc_types.hh:277
Definition: rpc_types.hh:316
Definition: rpc_types.hh:114
Definition: rpc_types.hh:88
Definition: rpc_types.hh:357
Definition: rpc_types.hh:99
Definition: rpc_types.hh:93
Definition: simple-stream.hh:320
Definition: socket_defs.hh:41
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:125
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
Definition: rpc_types.hh:147
Definition: rpc_types.hh:59
Definition: rpc_types.hh:245
Definition: rpc_types.hh:123
Definition: rpc_types.hh:183
Definition: rpc_types.hh:195
Definition: rpc_types.hh:48