Seastar
High performance C++ framework for concurrent servers
rpc_types.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/net/api.hh>
25 #include <stdexcept>
26 #include <string>
27 #include <boost/any.hpp>
28 #include <boost/type.hpp>
29 #include <seastar/util/std-compat.hh>
30 #include <seastar/util/variant_utils.hh>
31 #include <seastar/core/timer.hh>
32 #include <seastar/core/circular_buffer.hh>
33 #include <seastar/core/simple-stream.hh>
34 #include <seastar/core/lowres_clock.hh>
35 #include <boost/functional/hash.hpp>
36 #include <seastar/core/sharded.hh>
37 
38 namespace seastar {
39 
40 namespace rpc {
41 
// Clock whose time_point type is used by the RPC types in this file
// (see opt_time_point).
42 using rpc_clock_type = lowres_clock;
43 
44 // Tag wrapper used to tag a type for serializers; alias of boost::type<T>.
45 template<typename T>
46 using type = boost::type<T>;
47 
// Plain aggregate of RPC counters. Every counter is a counter_type and
// starts at zero.
struct stats {
    using counter_type = uint64_t;

    counter_type replied{0};
    counter_type pending{0};
    counter_type exception_received{0};
    counter_type sent_messages{0};
    counter_type wait_reply{0};
    counter_type timeout{0};
};
57 
58 
59 struct client_info {
60  socket_address addr;
61  std::unordered_map<sstring, boost::any> user_data;
62  template <typename T>
63  void attach_auxiliary(const sstring& key, T&& object) {
64  user_data.emplace(key, boost::any(std::forward<T>(object)));
65  }
66  template <typename T>
67  T& retrieve_auxiliary(const sstring& key) {
68  auto it = user_data.find(key);
69  assert(it != user_data.end());
70  return boost::any_cast<T&>(it->second);
71  }
72  template <typename T>
73  typename std::add_const<T>::type& retrieve_auxiliary(const sstring& key) const {
74  return const_cast<client_info*>(this)->retrieve_auxiliary<typename std::add_const<T>::type>(key);
75  }
76 };
77 
// Base class of the RPC exception types declared in this file; carries
// the message through std::runtime_error.
class error : public std::runtime_error {
public:
    error(const std::string& msg)
        : std::runtime_error(msg)
    {
    }
};
82 
83 class closed_error : public error {
84 public:
85  closed_error() : error("connection is closed") {}
86 };
87 
88 class timeout_error : public error {
89 public:
90  timeout_error() : error("rpc call timed out") {}
91 };
92 
93 class unknown_verb_error : public error {
94 public:
95  uint64_t type;
96  unknown_verb_error(uint64_t type_) : error("unknown verb"), type(type_) {}
97 };
98 
100 public:
101  unknown_exception_error() : error("unknown exception") {}
102 };
103 
104 class rpc_protocol_error : public error {
105 public:
106  rpc_protocol_error() : error("rpc protocol exception") {}
107 };
108 
109 class canceled_error : public error {
110 public:
111  canceled_error() : error("rpc call was canceled") {}
112 };
113 
114 class stream_closed : public error {
115 public:
116  stream_closed() : error("rpc stream was closed by peer") {}
117 };
118 
// Empty tag type; the `no_wait` object declared below is of this type.
119 struct no_wait_type {};
120 
121 // return this from a callback if the client does not want to wait for a reply
122 extern no_wait_type no_wait;
123 
126 
// Distinct type deriving from std::optional<T> that inherits all of its
// constructors and adds nothing else.
template <typename T>
class optional : public std::optional<T> {
    using base = std::optional<T>;

public:
    using base::base;
};
132 
133 class opt_time_point : public std::optional<rpc_clock_type::time_point> {
134 public:
135  using std::optional<rpc_clock_type::time_point>::optional;
136  opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
137  static_cast<std::optional<rpc_clock_type::time_point>&>(*this) = time_point;
138  }
139 };
140 
142 
// Holds up to two cancellation callbacks (send side and wait side) and runs
// whichever are set on cancel() and on destruction.
//
// send_back_pointer / wait_back_pointer, when non-null, point at a
// `cancellable*` stored elsewhere; on move construction that external
// pointer is re-targeted to the new object so it keeps tracking the live
// instance. Move-only: declaring the move operations suppresses copying.
struct cancellable {
    std::function<void()> cancel_send;
    std::function<void()> cancel_wait;
    cancellable** send_back_pointer = nullptr;
    cancellable** wait_back_pointer = nullptr;

    cancellable() = default;

    // Takes over x's callbacks and back pointers and leaves x inert, so that
    // x's destructor does not fire the callbacks a second time.
    cancellable(cancellable&& x)
        : cancel_send(std::move(x.cancel_send))
        , cancel_wait(std::move(x.cancel_wait))
        , send_back_pointer(x.send_back_pointer)
        , wait_back_pointer(x.wait_back_pointer) {
        // A moved-from std::function is only guaranteed to be in a valid but
        // unspecified state; empty it explicitly rather than relying on the
        // library leaving it empty.
        x.cancel_send = nullptr;
        x.cancel_wait = nullptr;
        if (send_back_pointer) {
            *send_back_pointer = this;
            x.send_back_pointer = nullptr;
        }
        if (wait_back_pointer) {
            *wait_back_pointer = this;
            x.wait_back_pointer = nullptr;
        }
    }

    // Destroys the current state (which runs this object's own callbacks via
    // the destructor) and then move-constructs from x in place.
    // Self-assignment is a no-op.
    cancellable& operator=(cancellable&& x) {
        if (&x != this) {
            this->~cancellable();
            new (this) cancellable(std::move(x));
        }
        return *this;
    }

    // Invokes cancel_send and then cancel_wait, skipping any that is empty.
    void cancel() {
        if (cancel_send) {
            cancel_send();
        }
        if (cancel_wait) {
            cancel_wait();
        }
    }

    ~cancellable() {
        cancel();
    }
};
178 
179 struct rcv_buf {
180  uint32_t size = 0;
181  std::optional<semaphore_units<>> su;
182  std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
183  using iterator = std::vector<temporary_buffer<char>>::iterator;
184  rcv_buf() {}
185  explicit rcv_buf(size_t size_) : size(size_) {}
186  explicit rcv_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
187  explicit rcv_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
188  : size(size), bufs(std::move(bufs)) {};
189 };
190 
191 struct snd_buf {
192  // Preferred, but not required, chunk size.
193  static constexpr size_t chunk_size = 128*1024;
194  uint32_t size = 0;
195  std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
196  using iterator = std::vector<temporary_buffer<char>>::iterator;
197  snd_buf() {}
198  snd_buf(snd_buf&&) noexcept;
199  snd_buf& operator=(snd_buf&&) noexcept;
200  explicit snd_buf(size_t size_);
201  explicit snd_buf(temporary_buffer<char> b) : size(b.size()), bufs(std::move(b)) {};
202 
203  explicit snd_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
204  : size(size), bufs(std::move(bufs)) {};
205 
206  temporary_buffer<char>& front();
207 };
208 
// Builds a memory_input_stream over the data held by `input`, choosing the
// branch by which alternative `input.bufs` currently holds: a single
// temporary_buffer<char> or a vector of them.
//
// NOTE(review): the listing's own line numbers jump 211 -> 213 and
// 214 -> 216, so one statement is missing from each branch (presumably the
// `return`). As shown, control reaches the end of a non-void function.
// Restore both lines from the upstream header before relying on this text.
209 static inline memory_input_stream<rcv_buf::iterator> make_deserializer_stream(rcv_buf& input) {
210  auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
211  if (b) {
// Single-buffer case: `b` points at the temporary_buffer alternative.
213  } else {
// Otherwise the variant holds the vector alternative.
214  auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
216  }
217 }
218 
219 class compressor {
220 public:
221  virtual ~compressor() {}
222  // compress data and leave head_space bytes at the beginning of returned buffer
223  virtual snd_buf compress(size_t head_space, snd_buf data) = 0;
224  // decompress data
225  virtual rcv_buf decompress(rcv_buf data) = 0;
226  virtual sstring name() const = 0;
227 
228  // factory to create compressor for a connection
229  class factory {
230  public:
231  virtual ~factory() {}
232  // return feature string that will be sent as part of protocol negotiation
233  virtual const sstring& supported() const = 0;
234  // negotiate compress algorithm
235  virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server) const = 0;
236  };
237 };
238 
239 class connection;
240 
// Identifier of a connection, packed into 64 bits: the low 16 bits hold a
// shard number and the remaining high bits hold the per-connection id
// (see make_id). A shard value of 0xffff marks the identifier as invalid.
//
// The `struct connection_id {` header had been dropped from this listing,
// leaving its members at namespace scope; it is restored here.
struct connection_id {
    uint64_t id;

    bool operator==(const connection_id& o) const {
        return id == o.id;
    }
    // True unless the shard field is the invalid marker 0xffff.
    operator bool() const {
        return shard() != 0xffff;
    }
    // Shard number stored in the low 16 bits.
    size_t shard() const {
        return size_t(id & 0xffff);
    }
    // Builds an identifier whose shard field is the invalid marker.
    constexpr static connection_id make_invalid_id(uint64_t id = 0) {
        return make_id(id, 0xffff);
    }
    // Packs `id` into the high bits and `shard` into the low 16 bits.
    constexpr static connection_id make_id(uint64_t id, uint16_t shard) {
        return {id << 16 | shard};
    }
};

// Invalid identifier built from make_invalid_id() with its default argument.
constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();

std::ostream& operator<<(std::ostream&, const connection_id&);
263 
// Capacity reserved up front for the buffer queue in source<>::impl
// (passed to _bufs.reserve()).
265 constexpr size_t max_queued_stream_buffers = 50;
// Initial value of the semaphore owned by sink<>::impl.
// NOTE(review): presumably a byte budget, judging by the name - confirm.
266 constexpr size_t max_stream_buffers_memory = 100 * 1024;
267 
270 
271 // send data Out...
272 template<typename... Out>
273 class sink {
274 public:
275  class impl {
276  protected:
278  semaphore _sem;
279  std::exception_ptr _ex;
280  impl(xshard_connection_ptr con) : _con(std::move(con)), _sem(max_stream_buffers_memory) {}
281  public:
282  virtual ~impl() {};
283  virtual future<> operator()(const Out&... args) = 0;
284  virtual future<> close() = 0;
285  virtual future<> flush() = 0;
286  friend sink;
287  };
288 
289 private:
290  shared_ptr<impl> _impl;
291 
292 public:
293  sink(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
294  future<> operator()(const Out&... args) {
295  return _impl->operator()(args...);
296  }
297  future<> close() {
298  return _impl->close();
299  }
300  // Calling this function makes sure that any data buffered
301  // by the stream sink will be flushed to the network.
302  // It does not mean the data was received by the corresponding
303  // source.
304  future<> flush() {
305  return _impl->flush();
306  }
307  connection_id get_id() const;
308 };
309 
310 // receive data In...
//
// Thin handle around a shared `impl`; operator() forwards to it.
// get_id() and make_sink() are declared here and defined elsewhere.
311 template<typename... In>
312 class source {
313 public:
// Abstract implementation interface for the receiving side.
314  class impl {
315  protected:
// NOTE(review): the listing's own line numbers jump 315 -> 318, so the
// declarations of `_con` and `_bufs` used by the constructor below are
// missing from this text. Restore them from the upstream header.
// Stores the connection handle and reserves max_queued_stream_buffers
// slots in _bufs.
318  impl(xshard_connection_ptr con) : _con(std::move(con)) {
319  _bufs.reserve(max_queued_stream_buffers);
320  }
321  public:
322  virtual ~impl() {}
// Yields a future of an optional tuple of the In... values.
// NOTE(review): an empty optional presumably signals end of stream - confirm.
323  virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
324  friend source;
325  };
326 private:
327  shared_ptr<impl> _impl;
328 
329 public:
330  source(shared_ptr<impl> impl) : _impl(std::move(impl)) {}
// Forwards to the implementation's operator().
331  future<std::optional<std::tuple<In...>>> operator()() {
332  return _impl->operator()();
333  };
334  connection_id get_id() const;
335  template<typename Serializer, typename... Out> sink<Out...> make_sink();
336 };
337 
// Distinct type deriving from std::tuple<T...>. Inherits all of its
// constructors and is additionally constructible from an rvalue
// std::tuple<T...>.
template <typename... T>
class tuple : public std::tuple<T...> {
    using base = std::tuple<T...>;

public:
    using base::base;

    tuple(base&& x)
        : base(std::move(x))
    {
    }
};

// Deduction guide: tuple(args...) deduces its element types from
// forwarding references, so an lvalue argument yields a reference element.
template <typename... Args>
tuple(Args&&...) -> tuple<Args...>;
363 
364 } // namespace rpc
365 
366 }
367 
368 namespace std {
369 template<>
370 struct hash<seastar::rpc::connection_id> {
371  size_t operator()(const seastar::rpc::connection_id& id) const {
372  size_t h = 0;
373  boost::hash_combine(h, std::hash<uint64_t>{}(id.id));
374  return h;
375  }
376 };
377 
378 template <typename... T>
379 struct tuple_size<seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
380 };
381 
382 template <size_t I, typename... T>
383 struct tuple_element<I, seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
384 };
385 
386 }
seastar::shared_ptr< impl >
seastar::rpc::stats
Definition: rpc_types.hh:48
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::rpc::tuple
Definition: rpc_types.hh:353
seastar::rpc::compressor
Definition: rpc_types.hh:219
seastar::lw_shared_ptr
Definition: shared_ptr.hh:62
seastar::fragmented_memory_input_stream
Definition: simple-stream.hh:46
seastar::rpc::connection_id
Definition: rpc_types.hh:241
seastar::rpc::source
Definition: rpc_types.hh:312
seastar::rpc::canceled_error
Definition: rpc_types.hh:109
seastar::rpc::unknown_verb_error
Definition: rpc_types.hh:93
seastar::rpc::error
Definition: rpc_types.hh:78
seastar::rpc::stream_closed
Definition: rpc_types.hh:114
seastar::temporary_buffer< char >
seastar::temporary_buffer::size
size_t size() const
Gets the buffer size.
Definition: temporary_buffer.hh:125
seastar::rpc::opt_time_point
Definition: rpc_types.hh:133
seastar::rpc::rcv_buf
Definition: rpc_types.hh:179
seastar::rpc::snd_buf
Definition: rpc_types.hh:191
seastar::rpc::timeout_error
Definition: rpc_types.hh:88
seastar::memory_input_stream
Definition: simple-stream.hh:49
seastar::rpc::cancellable
Definition: rpc_types.hh:143
seastar::rpc::sink
Definition: rpc_types.hh:273
seastar::rpc::optional
Definition: rpc_types.hh:128
seastar::rpc::rpc_protocol_error
Definition: rpc_types.hh:104
seastar::rpc::no_wait_type
Definition: rpc_types.hh:119
seastar::simple_memory_input_stream
Definition: simple-stream.hh:320
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:154
seastar::circular_buffer
Definition: circular_buffer.hh:59
seastar::rpc::closed_error
Definition: rpc_types.hh:83
timer.hh
impl
holds the implementation parts of the metrics layer, do not use directly.
seastar::rpc::compressor::factory
Definition: rpc_types.hh:229
seastar::rpc::connection
Definition: rpc.hh:222
seastar::basic_semaphore< semaphore_default_exception_factory >
seastar::socket_address
Definition: socket_defs.hh:41
seastar::rpc::unknown_exception_error
Definition: rpc_types.hh:99
seastar::rpc::client_info
Definition: rpc_types.hh:59