Seastar
High performance C++ framework for concurrent servers
rpc_impl.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 #pragma once
22 
23 #include <seastar/core/function_traits.hh>
24 #include <seastar/core/shared_ptr.hh>
25 #include <seastar/core/sstring.hh>
26 #include <seastar/core/when_all.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/core/simple-stream.hh>
29 #include <boost/range/numeric.hpp>
30 #include <boost/range/adaptor/transformed.hpp>
31 #include <seastar/net/packet-data-source.hh>
32 #include <seastar/core/print.hh>
33 
34 namespace seastar {
35 
36 namespace rpc {
37 
38 enum class exception_type : uint32_t {
39  USER = 0,
40  UNKNOWN_VERB = 1,
41 };
42 
43 template<typename T>
45  using type = T;
46 };
47 
48 template<typename T>
50  using type = T;
51 };
52 
53 struct wait_type {}; // opposite of no_wait_type
54 
55 // tags to tell whether we want a const client_info& parameter
58 
59 // tags to tell whether we want a opt_time_point parameter
62 
63 // General case
64 template <typename Ret, typename... In>
65 struct signature<Ret (In...)> {
66  using ret_type = Ret;
67  using arg_types = std::tuple<In...>;
68  using clean = signature;
71 };
72 
73 // Specialize 'clean' for handlers that receive client_info
74 template <typename Ret, typename... In>
75 struct signature<Ret (const client_info&, In...)> {
76  using ret_type = Ret;
77  using arg_types = std::tuple<In...>;
78  using clean = signature<Ret (In...)>;
81 };
82 
83 template <typename Ret, typename... In>
84 struct signature<Ret (client_info&, In...)> {
85  using ret_type = Ret;
86  using arg_types = std::tuple<In...>;
87  using clean = signature<Ret (In...)>;
90 };
91 
92 // Specialize 'clean' for handlers that receive client_info and opt_time_point
93 template <typename Ret, typename... In>
94 struct signature<Ret (const client_info&, opt_time_point, In...)> {
95  using ret_type = Ret;
96  using arg_types = std::tuple<In...>;
97  using clean = signature<Ret (In...)>;
100 };
101 
102 template <typename Ret, typename... In>
103 struct signature<Ret (client_info&, opt_time_point, In...)> {
104  using ret_type = Ret;
105  using arg_types = std::tuple<In...>;
106  using clean = signature<Ret (In...)>;
109 };
110 
111 // Specialize 'clean' for handlers that receive opt_time_point
112 template <typename Ret, typename... In>
113 struct signature<Ret (opt_time_point, In...)> {
114  using ret_type = Ret;
115  using arg_types = std::tuple<In...>;
116  using clean = signature<Ret (In...)>;
119 };
120 
121 template <typename T>
123  using type = wait_type;
124  using cleaned_type = T;
125 };
126 
127 template <typename... T>
128 struct wait_signature<future<T...>> {
129  using type = wait_type;
130  using cleaned_type = future<T...>;
131 };
132 
133 template <>
135  using type = no_wait_type;
136  using cleaned_type = void;
137 };
138 
139 template <>
141  using type = no_wait_type;
142  using cleaned_type = future<>;
143 };
144 
145 template <typename T>
146 using wait_signature_t = typename wait_signature<T>::type;
147 
148 template <typename... In>
149 inline
150 std::tuple<In...>
151 maybe_add_client_info(dont_want_client_info, client_info& ci, std::tuple<In...>&& args) {
152  return std::move(args);
153 }
154 
155 template <typename... In>
156 inline
157 std::tuple<std::reference_wrapper<client_info>, In...>
158 maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
159  return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args));
160 }
161 
162 template <typename... In>
163 inline
164 std::tuple<In...>
165 maybe_add_time_point(dont_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
166  return std::move(args);
167 }
168 
169 template <typename... In>
170 inline
171 std::tuple<opt_time_point, In...>
172 maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
173  return std::tuple_cat(std::make_tuple(otp), std::move(args));
174 }
175 
176 inline sstring serialize_connection_id(const connection_id& id) {
177  sstring p = uninitialized_string(sizeof(id));
178  auto c = p.data();
179  write_le(c, id.id);
180  return p;
181 }
182 
183 inline connection_id deserialize_connection_id(const sstring& s) {
184  connection_id id;
185  auto p = s.c_str();
186  id.id = read_le<decltype(id.id)>(p);
187  return id;
188 }
189 
190 template <bool IsSmartPtr>
192 
193 template <>
194 struct serialize_helper<false> {
195  template <typename Serializer, typename Output, typename T>
196  static inline void serialize(Serializer& serializer, Output& out, const T& t) {
197  return write(serializer, out, t);
198  }
199 };
200 
201 template <>
202 struct serialize_helper<true> {
203  template <typename Serializer, typename Output, typename T>
204  static inline void serialize(Serializer& serializer, Output& out, const T& t) {
205  return write(serializer, out, *t);
206  }
207 };
208 
209 template <typename Serializer, typename Output, typename... T>
210 inline void do_marshall(Serializer& serializer, Output& out, const T&... args);
211 
212 template <typename Serializer, typename Output>
213 struct marshall_one {
214  template <typename T> struct helper {
215  static void doit(Serializer& serializer, Output& out, const T& arg) {
216  using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference<T>::type>::value>;
217  serialize_helper_type::serialize(serializer, out, arg);
218  }
219  };
220  template<typename T> struct helper<std::reference_wrapper<const T>> {
221  static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) {
222  helper<T>::doit(serializer, out, arg.get());
223  }
224  };
225  static void put_connection_id(const connection_id& cid, Output& out) {
226  sstring id = serialize_connection_id(cid);
227  out.write(id.c_str(), id.size());
228  }
229  template <typename... T> struct helper<sink<T...>> {
230  static void doit(Serializer& serializer, Output& out, const sink<T...>& arg) {
231  put_connection_id(arg.get_id(), out);
232  }
233  };
234  template <typename... T> struct helper<source<T...>> {
235  static void doit(Serializer& serializer, Output& out, const source<T...>& arg) {
236  put_connection_id(arg.get_id(), out);
237  }
238  };
239  template <typename... T> struct helper<tuple<T...>> {
240  static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
241  auto do_do_marshall = [&serializer, &out] (const auto&... args) {
242  do_marshall(serializer, out, args...);
243  };
244  std::apply(do_do_marshall, arg);
245  }
246  };
247 };
248 
249 template <typename Serializer, typename Output, typename... T>
250 inline void do_marshall(Serializer& serializer, Output& out, const T&... args) {
251  // C++ guarantees that brace-initialization expressions are evaluted in order
252  (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...};
253 }
254 
255 static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
256  auto* b = std::get_if<temporary_buffer<char>>(&output.bufs);
257  if (b) {
258  return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
259  } else {
260  auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs);
261  return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
262  }
263 }
264 
265 template <typename Serializer, typename... T>
266 inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) {
267  measuring_output_stream measure;
268  do_marshall(serializer, measure, args...);
269  snd_buf ret(measure.size() + head_space);
270  auto out = make_serializer_stream(ret);
271  out.skip(head_space);
272  do_marshall(serializer, out, args...);
273  return ret;
274 }
275 
276 template <typename Serializer, typename Input>
277 inline std::tuple<> do_unmarshall(connection& c, Input& in) {
278  return std::make_tuple();
279 }
280 
281 template<typename Serializer, typename Input>
283  template<typename T> struct helper {
284  static T doit(connection& c, Input& in) {
285  return read(c.serializer<Serializer>(), in, type<T>());
286  }
287  };
288  template<typename T> struct helper<optional<T>> {
289  static optional<T> doit(connection& c, Input& in) {
290  if (in.size()) {
291  return optional<T>(read(c.serializer<Serializer>(), in, type<typename remove_optional<T>::type>()));
292  } else {
293  return optional<T>();
294  }
295  }
296  };
297  template<typename T> struct helper<std::reference_wrapper<const T>> {
298  static T doit(connection& c, Input& in) {
299  return helper<T>::doit(c, in);
300  }
301  };
302  static connection_id get_connection_id(Input& in) {
303  sstring id = uninitialized_string(sizeof(connection_id));
304  in.read(id.data(), sizeof(connection_id));
305  return deserialize_connection_id(id);
306  }
307  template<typename... T> struct helper<sink<T...>> {
308  static sink<T...> doit(connection& c, Input& in) {
309  return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
310  }
311  };
312  template<typename... T> struct helper<source<T...>> {
313  static source<T...> doit(connection& c, Input& in) {
314  return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
315  }
316  };
317  template <typename... T> struct helper<tuple<T...>> {
318  static tuple<T...> doit(connection& c, Input& in) {
319  return do_unmarshall<Serializer, Input, T...>(c, in);
320  }
321  };
322 };
323 
324 template <typename Serializer, typename Input, typename T0, typename... Trest>
325 inline std::tuple<T0, Trest...> do_unmarshall(connection& c, Input& in) {
326  // FIXME: something less recursive
327  auto first = std::make_tuple(unmarshal_one<Serializer, Input>::template helper<T0>::doit(c, in));
328  auto rest = do_unmarshall<Serializer, Input, Trest...>(c, in);
329  return std::tuple_cat(std::move(first), std::move(rest));
330 }
331 
332 template <typename Serializer, typename... T>
333 inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
334  auto in = make_deserializer_stream(input);
335  return do_unmarshall<Serializer, decltype(in), T...>(c, in);
336 }
337 
338 inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
339  std::exception_ptr ex;
340  auto data = make_deserializer_stream(d);
341 
342  uint32_t v32;
343  data.read(reinterpret_cast<char*>(&v32), 4);
344  exception_type ex_type = exception_type(le_to_cpu(v32));
345  data.read(reinterpret_cast<char*>(&v32), 4);
346  uint32_t ex_len = le_to_cpu(v32);
347 
348  switch (ex_type) {
349  case exception_type::USER: {
350  std::string s(ex_len, '\0');
351  data.read(&*s.begin(), ex_len);
352  ex = std::make_exception_ptr(std::runtime_error(std::move(s)));
353  break;
354  }
355  case exception_type::UNKNOWN_VERB: {
356  uint64_t v64;
357  data.read(reinterpret_cast<char*>(&v64), 8);
358  ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
359  break;
360  }
361  default:
362  ex = std::make_exception_ptr(unknown_exception_error());
363  break;
364  }
365  return ex;
366 }
367 
368 template <typename Payload, typename... T>
370  bool done = false;
371  promise<T...> p;
372  template<typename... V>
373  void set_value(V&&... v) {
374  done = true;
375  p.set_value(internal::untuple(std::forward<V>(v))...);
376  }
377  ~rcv_reply_base() {
378  if (!done) {
379  p.set_exception(closed_error());
380  }
381  }
382 };
383 
384 template<typename Serializer, typename T>
385 struct rcv_reply : rcv_reply_base<T, T> {
386  inline void get_reply(rpc::client& dst, rcv_buf input) {
387  this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
388  }
389 };
390 
391 template<typename Serializer, typename... T>
392 struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
393  inline void get_reply(rpc::client& dst, rcv_buf input) {
394  this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
395  }
396 };
397 
398 template<typename Serializer>
399 struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> {
400  inline void get_reply(rpc::client& dst, rcv_buf input) {
401  this->set_value();
402  }
403 };
404 
405 template<typename Serializer>
406 struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};
407 
408 template <typename Serializer, typename Ret, typename... InArgs>
409 inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
410  signature<Ret (InArgs...)> sig) {
411  using reply_type = rcv_reply<Serializer, Ret>;
412  auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
413  if (msg_id >= 0) {
414  dst.get_stats_internal().replied++;
415  return r.get_reply(dst, std::move(data));
416  } else {
417  dst.get_stats_internal().exception_received++;
418  r.done = true;
419  r.p.set_exception(unmarshal_exception(data));
420  }
421  };
422  using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
423  auto r = std::make_unique<handler_type>(std::move(lambda));
424  auto fut = r->reply.p.get_future();
425  dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
426  return fut;
427 }
428 
429 template<typename Serializer, typename... InArgs>
430 inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
431  signature<no_wait_type (InArgs...)> sig) { // no_wait overload
432  return make_ready_future<>();
433 }
434 
435 template<typename Serializer, typename... InArgs>
436 inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
437  signature<future<no_wait_type> (InArgs...)> sig) { // future<no_wait> overload
438  return make_ready_future<>();
439 }
440 
441 // Convert a relative timeout (a duration) to an absolute one (time_point).
442 // Do the calculation safely so that a very large duration will be capped by
443 // time_point::max, instead of wrapping around to ancient history.
444 inline rpc_clock_type::time_point
445 relative_timeout_to_absolute(rpc_clock_type::duration relative) {
446  rpc_clock_type::time_point now = rpc_clock_type::now();
447  return now + std::min(relative, rpc_clock_type::time_point::max() - now);
448 }
449 
450 // Returns lambda that can be used to send rpc messages.
451 // The lambda gets client connection and rpc parameters as arguments, marshalls them sends
452 // to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
453 // to a caller.
454 template<typename Serializer, typename MsgType, typename Ret, typename... InArgs>
455 auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
456  struct shelper {
457  MsgType t;
458  signature<Ret (InArgs...)> sig;
459  auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) {
460  if (dst.error()) {
461  using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type;
463  }
464 
465  // send message
466  auto msg_id = dst.next_message_id();
467  snd_buf data = marshall(dst.template serializer<Serializer>(), 28, args...);
468  static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small");
469  auto p = data.front().get_write() + 8; // 8 extra bytes for expiration timer
470  write_le<uint64_t>(p, uint64_t(t));
471  write_le<int64_t>(p + 8, msg_id);
472  write_le<uint32_t>(p + 16, data.size - 28);
473 
474  // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
475  using wait = wait_signature_t<Ret>;
476  return when_all(dst.send(std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
477  return std::move(std::get<1>(r)); // return future of wait_for_reply
478  });
479  }
480  auto operator()(rpc::client& dst, const InArgs&... args) {
481  return send(dst, {}, nullptr, args...);
482  }
483  auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) {
484  return send(dst, timeout, nullptr, args...);
485  }
486  auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
487  return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
488  }
489  auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
490  return send(dst, {}, &cancel, args...);
491  }
492 
493  };
494  return shelper{xt, xsig};
495 }
496 
497 template<typename Serializer, typename SEASTAR_ELLIPSIS RetTypes>
498 inline future<> reply(wait_type, future<RetTypes SEASTAR_ELLIPSIS>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
499  std::optional<rpc_clock_type::time_point> timeout) {
500  if (!client->error()) {
501  snd_buf data;
502  try {
503 #if SEASTAR_API_LEVEL < 6
504  if constexpr (sizeof...(RetTypes) == 0) {
505 #else
506  if constexpr (std::is_void_v<RetTypes>) {
507 #endif
508  ret.get();
509  data = std::invoke(marshall<Serializer>, std::ref(client->template serializer<Serializer>()), 12);
510  } else {
511  data = std::invoke(marshall<Serializer, const RetTypes& SEASTAR_ELLIPSIS>, std::ref(client->template serializer<Serializer>()), 12, std::move(ret.get0()));
512  }
513  } catch (std::exception& ex) {
514  uint32_t len = std::strlen(ex.what());
515  data = snd_buf(20 + len);
516  auto os = make_serializer_stream(data);
517  os.skip(12);
518  uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
519  os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
520  v32 = cpu_to_le(len);
521  os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
522  os.write(ex.what(), len);
523  msg_id = -msg_id;
524  }
525 
526  return client->respond(msg_id, std::move(data), timeout);
527  } else {
528  ret.ignore_ready_future();
529  return make_ready_future<>();
530  }
531 }
532 
533 // specialization for no_wait_type which does not send a reply
534 template<typename Serializer>
535 inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point> timeout) {
536  try {
537  r.get();
538  } catch (std::exception& ex) {
539  client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored");
540  }
541  return make_ready_future<>();
542 }
543 
544 template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple>
545 inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)> sig, ArgsTuple&& args) {
546  using futurator = futurize<Ret>;
547  try {
548  return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
549  } catch (std::runtime_error& ex) {
550  return futurator::make_exception_future(std::current_exception());
551  }
552 }
553 
554 // lref_to_cref is a helper that encapsulates lvalue reference in std::ref() or does nothing otherwise
555 template<typename T>
556 auto lref_to_cref(T&& x) {
557  return std::move(x);
558 }
559 
560 template<typename T>
561 auto lref_to_cref(T& x) {
562  return std::ref(x);
563 }
564 
565 // Creates lambda to handle RPC message on a server.
566 // The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client
567 template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint>
568 auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci, WantTimePoint wtp) {
569  using signature = decltype(sig);
570  using wait_style = wait_signature_t<Ret>;
571  return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
572  std::optional<rpc_clock_type::time_point> timeout,
573  int64_t msg_id,
574  rcv_buf data) mutable {
575  auto memory_consumed = client->estimate_request_size(data.size);
576  if (memory_consumed > client->max_request_size()) {
577  auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
578  client->get_logger()(client->peer_address(), err);
579  // FIXME: future is discarded
580  (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
581  return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) {
582  client->get_logger()(client->info(), msg_id, format("got exception while processing an oversized message: {}", eptr));
583  });
584  }).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
585  return make_ready_future();
586  }
587  // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
588  auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable {
589  // FIXME: future is discarded
590  (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
591  try {
592  auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
593  return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
594  return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
595  client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
596  });
597  });
598  } catch (...) {
599  client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", std::current_exception()));
600  return make_ready_future();
601  }
602  }).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
603  });
604 
605  if (timeout) {
606  f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
607  }
608 
609  return f;
610  };
611 }
612 
613 // helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind.
614 template<typename Func>
615 auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
616  auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
617  return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); };
618 }
619 
620 template<typename Func>
621 auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
622  return std::forward<Func>(func);
623 }
624 
625 // This class is used to calculate client side rpc function signature.
626 // Return type is converted from a smart pointer to a type it points to.
627 // rpc::optional are converted to non optional type.
628 //
629 // Examples:
630 // std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long)
631 // double(float) -> double(float)
632 template<typename Ret, typename... In>
634  template<typename T, bool IsSmartPtr>
635  struct drop_smart_ptr_impl;
636  template<typename T>
637  struct drop_smart_ptr_impl<T, true> {
638  using type = typename T::element_type;
639  };
640  template<typename T>
641  struct drop_smart_ptr_impl<T, false> {
642  using type = T;
643  };
644  template<typename T>
645  using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
646 
647  // if return type is smart ptr take a type it points to instead
648  using return_type = typename drop_smart_ptr<Ret>::type;
649 public:
650  using type = return_type(typename remove_optional<In>::type...);
651 };
652 
653 template<typename Serializer, typename MsgType>
654 template<typename Ret, typename... In>
655 auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)> clear_sig, MsgType t) {
656  using sig_type = signature<typename client_function_type<Ret, In...>::type>;
657  return send_helper<Serializer>(t, sig_type());
658 }
659 
660 template<typename Serializer, typename MsgType>
661 template<typename Func>
663  return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t);
664 }
665 
666 template<typename Serializer, typename MsgType>
667 template<typename Func>
670  using clean_sig_type = typename sig_type::clean;
671  using want_client_info = typename sig_type::want_client_info;
672  using want_time_point = typename sig_type::want_time_point;
673  auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
674  want_client_info(), want_time_point());
675  register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv))});
676  return make_client(clean_sig_type(), t);
677 }
678 
679 template<typename Serializer, typename MsgType>
680 template<typename Func>
682  return register_handler(t, scheduling_group(), std::forward<Func>(func));
683 }
684 
685 template<typename Serializer, typename MsgType>
687  auto it = _handlers.find(t);
688  if (it != _handlers.end()) {
689  return it->second.use_gate.close().finally([this, t] {
690  _handlers.erase(t);
691  });
692  }
693  return make_ready_future<>();
694 }
695 
696 template<typename Serializer, typename MsgType>
697 bool protocol<Serializer, MsgType>::has_handler(MsgType msg_id) {
698  auto it = _handlers.find(msg_id);
699  if (it == _handlers.end()) {
700  return false;
701  }
702  return !it->second.use_gate.is_closed();
703 }
704 
705 template<typename Serializer, typename MsgType>
706 rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
707  rpc_handler* h = nullptr;
708  auto it = _handlers.find(MsgType(msg_id));
709  if (it != _handlers.end()) {
710  try {
711  it->second.use_gate.enter();
712  h = &it->second;
713  } catch (gate_closed_exception&) {
714  // unregistered, just ignore
715  }
716  }
717  return h;
718 }
719 
720 template<typename Serializer, typename MsgType>
721 void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
722  h->use_gate.leave();
723 }
724 
725 template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
726 
727 template<typename Serializer, typename... Out>
728 future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
729  // note that we use remote serializer pointer, so if serailizer needs a state
730  // it should have per-cpu one
731  snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
732  static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
733  auto p = data.front().get_write();
734  write_le<uint32_t>(p, data.size - 4);
735  // we do not want to dead lock on huge packets, so let them in
736  // but only one at a time
737  auto size = std::min(size_t(data.size), max_stream_buffers_memory);
738  return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data)))] (semaphore_units<> su) mutable {
739  if (this->_ex) {
740  return make_exception_future(this->_ex);
741  }
742  // It is OK to discard this future. The user is required to
743  // wait for it when closing.
744  (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
745  connection* con = this->_con->get();
746  if (con->error()) {
747  return make_exception_future(closed_error());
748  }
749  if(con->sink_closed()) {
750  return make_exception_future(stream_closed());
751  }
752  return con->send(make_shard_local_buffer_copy(std::move(data)), {}, nullptr);
753  }).then_wrapped([su = std::move(su), this] (future<> f) {
754  if (f.failed() && !this->_ex) { // first error is the interesting one
755  this->_ex = f.get_exception();
756  } else {
757  f.ignore_ready_future();
758  }
759  });
760  return make_ready_future<>();
761  });
762 }
763 
764 template<typename Serializer, typename... Out>
765 future<> sink_impl<Serializer, Out...>::flush() {
766  // wait until everything is sent out before returning.
767  return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
768  if (this->_ex) {
769  return make_exception_future(this->_ex);
770  }
771  return make_ready_future();
772  });
773 }
774 
775 template<typename Serializer, typename... Out>
776 future<> sink_impl<Serializer, Out...>::close() {
777  return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
778  return smp::submit_to(this->_con->get_owner_shard(), [this] {
779  connection* con = this->_con->get();
780  if (con->sink_closed()) { // double close, should not happen!
781  return make_exception_future(stream_closed());
782  }
783  future<> f = make_ready_future<>();
784  if (!con->error() && !this->_ex) {
785  snd_buf data = marshall(con->template serializer<Serializer>(), 4);
786  static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
787  auto p = data.front().get_write();
788  write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream
789  f = con->send(std::move(data), {}, nullptr);
790  } else {
791  f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error());
792  }
793  return f.finally([con] { return con->close_sink(); });
794  });
795  });
796 }
797 
798 template<typename Serializer, typename... Out>
799 sink_impl<Serializer, Out...>::~sink_impl() {
800  // A failure to close might leave some continuations running after
801  // this is destroyed, leading to use-after-free bugs.
802  assert(this->_con->get()->sink_closed());
803 }
804 
805 template<typename Serializer, typename... In>
806 future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
807  auto process_one_buffer = [this] {
808  foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
809  this->_bufs.pop_front();
810  return std::apply([] (In&&... args) {
811  auto ret = std::make_optional(std::make_tuple(std::move(args)...));
812  return make_ready_future<std::optional<std::tuple<In...>>>(std::move(ret));
813  }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
814  };
815 
816  if (!this->_bufs.empty()) {
817  return process_one_buffer();
818  }
819 
820  // refill buffers from remote cpu
821  return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
822  connection* con = this->_con->get();
823  if (con->_source_closed) {
824  return make_exception_future<>(stream_closed());
825  }
826  return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
827  if (f.failed()) {
828  return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
829  f.ignore_ready_future();
830  return make_exception_future<>(ex);
831  });
832  }
833  if (this->_bufs.empty()) { // nothing to read -> eof
834  return con->close_source().then_wrapped([] (future<> f) {
835  f.ignore_ready_future();
836  return make_ready_future<>();
837  });
838  }
839  return make_ready_future<>();
840  });
841  }).then([this, process_one_buffer] () {
842  if (this->_bufs.empty()) {
843  return make_ready_future<std::optional<std::tuple<In...>>>(std::nullopt);
844  } else {
845  return process_one_buffer();
846  }
847  });
848 }
849 
850 template<typename... Out>
851 connection_id sink<Out...>::get_id() const {
852  return _impl->_con->get()->get_connection_id();
853 }
854 
855 template<typename... In>
856 connection_id source<In...>::get_id() const {
857  return _impl->_con->get()->get_connection_id();
858 }
859 
860 template<typename... In>
861 template<typename Serializer, typename... Out>
862 sink<Out...> source<In...>::make_sink() {
863  return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
864 }
865 
866 }
867 
868 }
869 
870 namespace std {
871 template<>
872 struct hash<seastar::rpc::streaming_domain_type> {
873  size_t operator()(const seastar::rpc::streaming_domain_type& domain) const {
874  size_t h = 0;
875  boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
876  return h;
877  }
878 };
879 }
880 
881 
seastar::rpc::remove_optional
Definition: rpc_impl.hh:44
seastar::when_all
auto when_all(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:248
seastar::now
future now()
Returns a ready future.
Definition: later.hh:35
seastar::rpc::unmarshal_one::helper
Definition: rpc_impl.hh:283
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::rpc::serialize_helper
Definition: rpc_impl.hh:191
seastar::rpc::tuple
Definition: rpc_types.hh:353
seastar::rpc::wait_signature
Definition: rpc_impl.hh:122
seastar::promise
promise - allows a future value to be made available at a later time.
Definition: future.hh:151
seastar::smp::submit_to
static futurize_t< std::result_of_t< Func()> > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:326
seastar::rpc::protocol::register_handler
auto register_handler(MsgType t, Func &&func)
Definition: rpc_impl.hh:681
seastar::rpc::dont_want_client_info
Definition: rpc_impl.hh:57
seastar::rpc::connection_id
Definition: rpc_types.hh:241
seastar::rpc::source
Definition: rpc_types.hh:312
seastar::make_ready_future
future< T... > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:2048
seastar::rpc::do_want_time_point
Definition: rpc_impl.hh:60
seastar::rpc::rcv_reply_base
Definition: rpc_impl.hh:369
seastar::rpc::streaming_domain_type
Definition: rpc.hh:127
seastar::reference_wrapper
Definition: reference_wrapper.hh:43
seastar::rpc::client_function_type
Definition: rpc_impl.hh:633
seastar::rpc::protocol
Definition: rpc.hh:707
seastar::rpc::marshall_one
Definition: rpc_impl.hh:213
seastar::lowres_clock::now
static time_point now() noexcept
Definition: lowres_clock.hh:118
seastar::ref
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
seastar::promise::set_value
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:1013
seastar::rpc::rpc_handler
Definition: rpc.hh:597
seastar::make_exception_future
future< T... > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:2054
seastar::format
sstring format(const char *fmt, A &&... a)
Definition: print.hh:134
seastar::rpc::rcv_reply
Definition: rpc_impl.hh:385
seastar::rpc::opt_time_point
Definition: rpc_types.hh:133
seastar::rpc::signature< Ret(In...)>
Definition: rpc_impl.hh:65
seastar::rpc::rcv_buf
Definition: rpc_types.hh:179
seastar::rpc::marshall_one::helper
Definition: rpc_impl.hh:214
seastar::rpc::cancellable
Definition: rpc_types.hh:143
seastar::rpc::sink_impl
Definition: rpc.hh:358
seastar::rpc::signature
Definition: rpc.hh:174
seastar::rpc::sink
Definition: rpc_types.hh:273
seastar::rpc::optional
Definition: rpc_types.hh:128
seastar::rpc::no_wait_type
Definition: rpc_types.hh:119
seastar::rpc::dont_want_time_point
Definition: rpc_impl.hh:61
seastar::function_traits
Definition: function_traits.hh:29
seastar::future::finally
future< T > finally(Func &&func) noexcept
Definition: future.hh:1744
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:154
seastar::rpc::client
Definition: rpc.hh:375
seastar::rpc::closed_error
Definition: rpc_types.hh:83
seastar::rpc::source_impl
Definition: rpc.hh:369
seastar::rpc::protocol::make_client
auto make_client(MsgType t)
Definition: rpc_impl.hh:662
seastar::rpc::connection
Definition: rpc.hh:222
seastar::rpc::do_want_client_info
Definition: rpc_impl.hh:56
seastar::rpc::unmarshal_one
Definition: rpc_impl.hh:282
seastar::rpc::wait_type
Definition: rpc_impl.hh:53
seastar::rpc::client_info
Definition: rpc_types.hh:59
seastar::rpc::protocol::unregister_handler
future unregister_handler(MsgType t)
Definition: rpc_impl.hh:686
seastar::future::get
value_type && get()
gets the value returned by the computation
Definition: future.hh:1440
seastar::scheduling_group
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:251
seastar::alien::submit_to
std::future< T > submit_to(unsigned shard, Func func)
Definition: alien.hh:167