23 #include <seastar/core/function_traits.hh>
24 #include <seastar/core/shared_ptr.hh>
25 #include <seastar/core/sstring.hh>
26 #include <seastar/core/when_all.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/core/simple-stream.hh>
29 #include <boost/range/numeric.hpp>
30 #include <boost/range/adaptor/transformed.hpp>
31 #include <seastar/net/packet-data-source.hh>
32 #include <seastar/core/print.hh>
38 enum class exception_type : uint32_t {
64 template <
typename Ret,
typename... In>
67 using arg_types = std::tuple<In...>;
74 template <
typename Ret,
typename... In>
77 using arg_types = std::tuple<In...>;
83 template <
typename Ret,
typename... In>
86 using arg_types = std::tuple<In...>;
93 template <
typename Ret,
typename... In>
96 using arg_types = std::tuple<In...>;
102 template <
typename Ret,
typename... In>
104 using ret_type = Ret;
105 using arg_types = std::tuple<In...>;
112 template <
typename Ret,
typename... In>
114 using ret_type = Ret;
115 using arg_types = std::tuple<In...>;
121 template <
typename T>
124 using cleaned_type = T;
127 template <
typename... T>
136 using cleaned_type = void;
145 template <
typename T>
148 template <
typename... In>
152 return std::move(args);
// Overload selected by the do_want_client_info tag: returns a new tuple whose
// first element is a reference to `ci`, followed by the elements of `args`.
155 template <
typename... In>
157 std::tuple<std::reference_wrapper<client_info>, In...>
158 maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
// std::ref stores `ci` as a reference_wrapper instead of copying it;
// `args` is moved into the concatenated result.
159 return std::tuple_cat(std::make_tuple(
std::ref(ci)), std::move(args));
// Overload selected by the dont_want_time_point tag: `otp` is not used and
// the argument tuple is handed back by move.
// NOTE(review): the return-type line is not visible in this excerpt.
162 template <
typename... In>
165 maybe_add_time_point(dont_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
166 return std::move(args);
// Overload selected by the do_want_time_point tag: returns a new tuple whose
// first element is a copy of `otp`, followed by the elements of `args`.
169 template <
typename... In>
171 std::tuple<opt_time_point, In...>
172 maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
173 return std::tuple_cat(std::make_tuple(otp), std::move(args));
// Produces the sstring form of a connection_id.
// The string is created uninitialized with exactly sizeof(id) bytes.
// NOTE(review): the lines that fill and return the buffer are not visible
// in this excerpt.
176 inline sstring serialize_connection_id(
const connection_id&
id) {
177 sstring p = uninitialized_string(
sizeof(
id));
// Rebuilds a connection_id from the sstring `s`.
// NOTE(review): the declarations of `id` and `p` are on lines not shown here;
// `p` is presumably a pointer into `s` — confirm.
183 inline connection_id deserialize_connection_id(
const sstring& s) {
// Decode the `id` member with read_le, using the member's own type as the
// width of the read.
186 id.id = read_le<decltype(
id.
id)>(p);
190 template <
bool IsSmartPtr>
// Serializes `t` by forwarding it unchanged to write(serializer, out, t).
// NOTE(review): the enclosing specialization is not visible; this is
// presumably the variant used for non-smart-pointer types — confirm.
195 template <
typename Serializer,
typename Output,
typename T>
196 static inline void serialize(Serializer& serializer, Output& out,
const T& t) {
197 return write(serializer, out, t);
// Serializes the object `t` points to: `t` is dereferenced before being
// passed to write(), so the pointee is written rather than the pointer.
// NOTE(review): the enclosing specialization is not visible; this is
// presumably the smart-pointer variant — confirm.
203 template <
typename Serializer,
typename Output,
typename T>
204 static inline void serialize(Serializer& serializer, Output& out,
const T& t) {
205 return write(serializer, out, *t);
209 template <
typename Serializer,
typename Output,
typename... T>
210 inline void do_marshall(Serializer& serializer, Output& out,
const T&... args);
212 template <
typename Serializer,
typename Output>
215 static void doit(Serializer& serializer, Output& out,
const T& arg) {
217 serialize_helper_type::serialize(serializer, out, arg);
221 static void doit(Serializer& serializer, Output& out,
const std::reference_wrapper<const T>& arg) {
// Writes a connection id to `out`: the id is first converted with
// serialize_connection_id() and the resulting bytes are written verbatim.
225 static void put_connection_id(
const connection_id& cid, Output& out) {
226 sstring
id = serialize_connection_id(cid);
227 out.write(
id.c_str(),
id.size());
230 static void doit(Serializer& serializer, Output& out,
const sink<T...>& arg) {
231 put_connection_id(arg.get_id(), out);
235 static void doit(Serializer& serializer, Output& out,
const source<T...>& arg) {
236 put_connection_id(arg.get_id(), out);
240 static void doit(Serializer& serializer, Output& out,
const tuple<T...>& arg) {
241 auto do_do_marshall = [&serializer, &out] (
const auto&... args) {
242 do_marshall(serializer, out, args...);
244 std::apply(do_do_marshall, arg);
249 template <
typename Serializer,
typename Output,
typename... T>
250 inline void do_marshall(Serializer& serializer, Output& out,
const T&... args) {
// Builds a memory_output_stream that writes into the storage of `output`.
// output.bufs is accessed with std::get_if / std::get, holding either a single
// temporary_buffer<char> or a std::vector of them.
255 static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
256 auto* b = std::get_if<temporary_buffer<char>>(&output.bufs);
// Single-buffer alternative: a "simple" stream over b's writable bytes.
// (The null check on `b` is on a line not shown in this excerpt.)
258 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
// Vector alternative: a "fragmented" stream starting at the first buffer and
// spanning output.size bytes.
260 auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs);
261 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
// Serializes `args` into a freshly allocated snd_buf, leaving `head_space`
// bytes untouched at the front for the caller to fill in.
// Works in two passes over the same arguments: one to measure, one to write.
265 template <
typename Serializer,
typename... T>
266 inline snd_buf marshall(Serializer& serializer,
size_t head_space,
const T&... args) {
// Pass 1: run the serializer against a measuring stream to learn the size.
267 measuring_output_stream measure;
268 do_marshall(serializer, measure, args...);
// Allocate payload size plus the reserved head space.
269 snd_buf ret(measure.size() + head_space);
270 auto out = make_serializer_stream(ret);
// Skip the reserved head so the payload starts right after it.
271 out.skip(head_space);
// Pass 2: serialize for real into the buffer.
272 do_marshall(serializer, out, args...);
// do_unmarshall overload taking no element types: nothing is read from `in`
// and an empty tuple is returned. The T0/Trest overload further down recurses
// with a shrinking type pack.
276 template <
typename Serializer,
typename Input>
277 inline std::tuple<> do_unmarshall(connection& c, Input& in) {
278 return std::make_tuple();
281 template<
typename Serializer,
typename Input>
285 return read(c.serializer<Serializer>(), in, type<T>());
291 return optional<T>(read(c.serializer<Serializer>(), in, type<
typename remove_optional<T>::type>()));
305 return deserialize_connection_id(
id);
309 return sink<T...>(make_shared<
sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
314 return source<T...>(make_shared<
source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
319 return do_unmarshall<Serializer, Input, T...>(c, in);
// Recursive do_unmarshall: yields std::tuple<T0, Trest...> by combining a
// value for T0 with the tuple obtained for the remaining types.
// NOTE(review): the line that produces `first` is not visible in this excerpt.
324 template <
typename Serializer,
typename Input,
typename T0,
typename... Trest>
325 inline std::tuple<T0, Trest...> do_unmarshall(
connection& c, Input& in) {
// Recurse on the remaining types, reading from the same input stream.
328 auto rest = do_unmarshall<Serializer, Input, Trest...>(c, in);
329 return std::tuple_cat(std::move(first), std::move(rest));
// Deserializes a std::tuple<T...> from `input`.
// Wraps the buffer in a deserializer stream and delegates to do_unmarshall,
// passing the stream's concrete type as the Input template argument.
332 template <
typename Serializer,
typename... T>
333 inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
334 auto in = make_deserializer_stream(input);
335 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
// Decodes an exception carried in `d` and returns it as a std::exception_ptr.
// Visible layout: a 4-byte exception type, then a 4-byte length, each passed
// through le_to_cpu, followed by type-specific data.
// NOTE(review): the declarations of v32/v64, the switch statement and the
// return are on lines not shown in this excerpt.
338 inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
339 std::exception_ptr ex;
340 auto data = make_deserializer_stream(d);
// Field 1: exception type.
343 data.read(
reinterpret_cast<char*
>(&v32), 4);
344 exception_type ex_type = exception_type(le_to_cpu(v32));
// Field 2: length of the data that follows.
345 data.read(
reinterpret_cast<char*
>(&v32), 4);
346 uint32_t ex_len = le_to_cpu(v32);
// USER: the next ex_len bytes are the message of a std::runtime_error.
// NOTE(review): ex_len comes straight from the buffer and sizes the string;
// no bounds check is visible here — confirm one exists elsewhere.
349 case exception_type::USER: {
350 std::string s(ex_len,
'\0');
351 data.read(&*s.begin(), ex_len);
352 ex = std::make_exception_ptr(std::runtime_error(std::move(s)));
// UNKNOWN_VERB: an 8-byte value is read and wrapped in unknown_verb_error.
355 case exception_type::UNKNOWN_VERB: {
357 data.read(
reinterpret_cast<char*
>(&v64), 8);
358 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
// Presumably the default branch (its label is not visible): report an
// unknown_exception_error.
362 ex = std::make_exception_ptr(unknown_exception_error());
368 template <
typename Payload,
typename... T>
372 template<
typename... V>
373 void set_value(V&&... v) {
375 p.
set_value(internal::untuple(std::forward<V>(v))...);
384 template<
typename Serializer,
typename T>
387 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
391 template<
typename Serializer,
typename... T>
394 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
398 template<
typename Serializer>
405 template<
typename Serializer>
408 template <
typename Serializer,
typename Ret,
typename... InArgs>
412 auto lambda = [] (reply_type& r,
rpc::client& dst, id_type msg_id,
rcv_buf data)
mutable {
414 dst.get_stats_internal().replied++;
415 return r.get_reply(dst, std::move(data));
417 dst.get_stats_internal().exception_received++;
419 r.p.set_exception(unmarshal_exception(data));
422 using handler_type =
typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
423 auto r = std::make_unique<handler_type>(std::move(lambda));
424 auto fut = r->reply.p.get_future();
425 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
// wait_for_reply overload for signatures returning no_wait_type: no reply is
// awaited, so a ready future is returned immediately. The timeout, cancel,
// dst, msg_id and sig parameters are not used by the visible body.
429 template<
typename Serializer,
typename... InArgs>
430 inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
431 signature<no_wait_type (InArgs...)> sig) {
432 return make_ready_future<>();
// wait_for_reply overload for signatures returning future<no_wait_type>:
// like the plain no_wait_type overload, it returns a ready future at once
// without using its other parameters.
435 template<
typename Serializer,
typename... InArgs>
436 inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
437 signature<future<no_wait_type> (InArgs...)> sig) {
438 return make_ready_future<>();
// Converts a relative timeout into an absolute rpc_clock_type time point.
// NOTE(review): `now` is defined on a line not shown; presumably the current
// rpc_clock_type time — confirm.
444 inline rpc_clock_type::time_point
445 relative_timeout_to_absolute(rpc_clock_type::duration relative) {
// Clamp `relative` to the distance between `now` and time_point::max() so
// the addition cannot go past the largest representable time point.
447 return now + std::min(relative, rpc_clock_type::time_point::max() -
now);
// Builds the client-side callable for one message type / signature pair.
// The returned object (`shelper`, whose declaration line and members `xt`/`t`
// are only partly visible in this excerpt) exposes send() plus operator()
// overloads that differ only in how timeout and cancellation are supplied.
454 template<
typename Serializer,
typename MsgType,
typename Ret,
typename... InArgs>
455 auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
458 signature<Ret (InArgs...)> sig;
// Core send routine: marshals `args`, fills in the frame header, sends the
// buffer and waits for the reply according to the signature's wait style.
459 auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel,
const InArgs&... args) {
461 using cleaned_ret_type =
typename wait_signature<Ret>::cleaned_type;
466 auto msg_id = dst.next_message_id();
// Reserve 28 bytes of head space in front of the payload for the header.
467 snd_buf data = marshall(dst.template serializer<Serializer>(), 28, args...);
// The header is written through the first chunk only, so it must fit there.
468 static_assert(snd_buf::chunk_size >= 28,
"send buffer chunk size is too small");
// Header fields written here, as byte offsets from the buffer start:
//   8..15  : `t` as uint64_t
//   16..23 : msg_id as int64_t
//   24..27 : payload length (total size minus the 28-byte head) as uint32_t
// Bytes 0..7 are not written by the visible lines.
469 auto p = data.front().get_write() + 8;
470 write_le<uint64_t>(p, uint64_t(t));
471 write_le<int64_t>(p + 8, msg_id);
472 write_le<uint32_t>(p + 16, data.size - 28);
475 using wait = wait_signature_t<Ret>;
// Combine the send future and the reply future with when_all; the
// continuation returns only the second element (the reply future).
476 return when_all(dst.send(std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (
auto r) {
477 return std::move(std::get<1>(r));
// No timeout, no cancellation.
480 auto operator()(rpc::client& dst,
const InArgs&... args) {
481 return send(dst, {},
nullptr, args...);
// Absolute timeout.
483 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout,
const InArgs&... args) {
484 return send(dst, timeout,
nullptr, args...);
// Relative timeout, converted to an absolute time point first.
486 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout,
const InArgs&... args) {
487 return send(dst, relative_timeout_to_absolute(timeout),
nullptr, args...);
// Cancellable call without a timeout.
489 auto operator()(rpc::client& dst, cancellable& cancel,
const InArgs&... args) {
490 return send(dst, {}, &cancel, args...);
494 return shelper{xt, xsig};
// Sends the result of a handler (`ret`) back to the client for message
// `msg_id`. If the handler threw a std::exception, an exception frame is
// sent instead.
// NOTE(review): several structural lines (the `try`, `else` branches, the
// declaration of `data`) are not visible in this excerpt; the comments below
// describe only the visible statements.
497 template<
typename Serializer,
typename SEASTAR_ELLIPSIS RetTypes>
498 inline future<> reply(wait_type, future<RetTypes SEASTAR_ELLIPSIS>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
499 std::optional<rpc_clock_type::time_point> timeout) {
// A reply is only built when the connection is not in an error state.
500 if (!client->error()) {
// How "no return value" is detected depends on SEASTAR_API_LEVEL: an empty
// pack below level 6, otherwise presumably a void RetTypes — confirm.
503 #if SEASTAR_API_LEVEL < 6
504 if constexpr (
sizeof...(RetTypes) == 0) {
506 if constexpr (std::is_void_v<RetTypes>) {
// No value to send: marshal an empty payload with 12 bytes of head space.
509 data = std::invoke(marshall<Serializer>,
std::ref(client->template serializer<Serializer>()), 12);
// Value to send: marshal the future's value with 12 bytes of head space.
511 data = std::invoke(marshall<Serializer, const RetTypes& SEASTAR_ELLIPSIS>,
std::ref(client->template serializer<Serializer>()), 12, std::move(ret.get0()));
// Handler failed with a std::exception: encode it as a USER exception frame.
513 }
catch (std::exception& ex) {
514 uint32_t len = std::strlen(ex.what());
// 20 + len presumably covers 12 bytes of head space plus the two 4-byte
// fields written below plus the message; the skip over the head is on a
// line not shown — confirm.
515 data = snd_buf(20 + len);
516 auto os = make_serializer_stream(data);
// Exception type, via cpu_to_le.
518 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
519 os.write(
reinterpret_cast<char*
>(&v32),
sizeof(v32));
// Message length, via cpu_to_le, followed by the message bytes.
520 v32 = cpu_to_le(len);
521 os.write(
reinterpret_cast<char*
>(&v32),
sizeof(v32));
522 os.write(ex.what(), len);
526 return client->respond(msg_id, std::move(data), timeout);
// Presumably the connection-error path: the result is discarded
// and nothing is sent.
528 ret.ignore_ready_future();
529 return make_ready_future<>();
// reply overload for no_wait handlers: nothing is sent back to the client.
// A std::exception raised by the handler is only reported through the
// connection's logger, and a ready future is returned.
// NOTE(review): the `try` body that inspects `r` is not visible here.
534 template<
typename Serializer>
535 inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point> timeout) {
538 }
catch (std::exception& ex) {
539 client->get_logger()(client->info(), msgid, to_sstring(
"exception \"") + ex.what() +
"\" in no_wait handler ignored");
541 return make_ready_future<>();
// Invokes the user handler `func` with the unmarshalled `args`, optionally
// extended with the time point and the client info, and returns the result
// as futurize_t<Ret>.
544 template<
typename Ret,
typename... InArgs,
typename WantClientInfo,
typename WantTimePoint,
typename Func,
typename ArgsTuple>
545 inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)> sig, ArgsTuple&& args) {
546 using futurator = futurize<Ret>;
// The time point is added first (inner call) and the client info second
// (outer call), so when both are wanted the tuple is
// (client_info, time_point, args...).
548 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
// std::runtime_error thrown synchronously is caught here; the handler body is
// on lines not shown in this excerpt.
549 }
catch (std::runtime_error& ex) {
556 auto lref_to_cref(T&& x) {
561 auto lref_to_cref(T& x) {
// Builds the server-side receive callback for one handler signature.
// The returned lambda owns `func` (via lref_to_cref) and, per incoming
// message, either rejects it as oversized or waits for memory resources,
// unmarshals the arguments, runs the handler and sends the reply.
// NOTE(review): some lines (e.g. the msg_id parameter, try/catch framing and
// returns) are not visible in this excerpt.
567 template <
typename Serializer,
typename Func,
typename Ret,
typename... InArgs,
typename WantClientInfo,
typename WantTimePoint>
568 auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci, WantTimePoint wtp) {
569 using signature = decltype(sig);
570 using wait_style = wait_signature_t<Ret>;
571 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
572 std::optional<rpc_clock_type::time_point> timeout,
574 rcv_buf data)
mutable {
// Estimate the memory this request will consume from its wire size.
575 auto memory_consumed = client->estimate_request_size(data.size);
// Oversized request: log it and answer with an exception instead of running
// the handler.
576 if (memory_consumed > client->max_request_size()) {
// NOTE(review): the message reads "large than"; probably meant "larger than".
577 auto err =
format(
"request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
578 client->get_logger()(client->peer_address(), err);
// The reply runs under the server's reply gate; the resulting future is
// deliberately discarded with (void).
580 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
581 return reply<Serializer>(wait_style(),
futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) {
582 client->get_logger()(client->info(), msg_id,
format(
"got exception while processing an oversized message: {}", eptr));
// A closed gate is silently ignored.
584 }).handle_exception_type([] (gate_closed_exception&) {});
// Normal path: wait until the connection grants `memory_consumed` worth of
// resources (or the timeout hits), receiving a permit.
// NOTE(review): `func` is captured by reference in the lambdas below; this
// relies on the enclosing closure outliving these continuations — confirm.
588 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (
auto permit)
mutable {
// Handler execution also runs under the reply gate, fire-and-forget.
590 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] ()
mutable {
592 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
// The permit is moved along the continuation chain so it stays alive until
// the reply has been handled.
593 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
594 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
595 client->get_logger()(client->info(), msg_id, format(
"got exception while processing a message: {}", eptr));
// Same log message for an exception caught synchronously (the catch line is
// not shown).
599 client->get_logger()(client->info(), msg_id,
format(
"got exception while processing a message: {}", std::current_exception()));
602 }).handle_exception_type([] (gate_closed_exception&) {});
// A timeout while waiting for resources is swallowed.
606 f = f.handle_exception_type([] (semaphore_timed_out&) { });
// Overload enabled when std::decay_t<Func> is NOT copy-constructible.
// The callable is moved/forwarded into an lw_shared pointer and a copyable
// lambda holding that pointer is returned; calling the lambda forwards all
// arguments to the stored callable.
614 template<
typename Func>
615 auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible<std::decay_t<Func>>::value,
void*> =
nullptr) {
616 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
617 return [p] (
auto&&... args) {
return (*p)( std::forward<decltype(args)>(args)... ); };
// Overload enabled when std::decay_t<Func> IS copy-constructible: no wrapping
// is needed, so the callable is simply forwarded to the caller.
620 template<
typename Func>
621 auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible<std::decay_t<Func>>::value,
void*> =
nullptr) {
622 return std::forward<Func>(func);
632 template<
typename Ret,
typename... In>
634 template<
typename T,
bool IsSmartPtr>
635 struct drop_smart_ptr_impl;
637 struct drop_smart_ptr_impl<T, true> {
638 using type =
typename T::element_type;
641 struct drop_smart_ptr_impl<T, false> {
645 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
648 using return_type =
typename drop_smart_ptr<Ret>::type;
650 using type = return_type(
typename remove_optional<In>::type...);
653 template<
typename Serializer,
typename MsgType>
654 template<
typename Ret,
typename... In>
657 return send_helper<Serializer>(t, sig_type());
660 template<
typename Serializer,
typename MsgType>
661 template<
typename Func>
666 template<
typename Serializer,
typename MsgType>
667 template<
typename Func>
670 using clean_sig_type =
typename sig_type::clean;
671 using want_client_info =
typename sig_type::want_client_info;
672 using want_time_point =
typename sig_type::want_time_point;
673 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
674 want_client_info(), want_time_point());
675 register_receiver(t,
rpc_handler{sg, make_copyable_function(std::move(recv))});
676 return make_client(clean_sig_type(), t);
679 template<
typename Serializer,
typename MsgType>
680 template<
typename Func>
685 template<
typename Serializer,
typename MsgType>
687 auto it = _handlers.find(t);
688 if (it != _handlers.end()) {
689 return it->second.use_gate.close().
finally([
this, t] {
693 return make_ready_future<>();
696 template<
typename Serializer,
typename MsgType>
698 auto it = _handlers.find(msg_id);
699 if (it == _handlers.end()) {
702 return !it->second.use_gate.is_closed();
// Looks up the handler registered for `msg_id` (converted to MsgType).
// `h` starts as nullptr; when a handler is found, its use_gate is entered.
// NOTE(review): the assignment to `h`, the `try`, the catch body and the
// return are on lines not shown in this excerpt; presumably a closed gate
// leaves `h` null — confirm.
705 template<
typename Serializer,
typename MsgType>
706 rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
707 rpc_handler* h =
nullptr;
708 auto it = _handlers.find(MsgType(msg_id));
709 if (it != _handlers.end()) {
711 it->second.use_gate.enter();
713 }
catch (gate_closed_exception&) {
720 template<
typename Serializer,
typename MsgType>
721 void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
725 template<
typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
// Sends one stream message built from `args`.
// The message is marshalled locally, memory is accounted through _sem, and
// the actual send is submitted to the connection's owner shard without being
// awaited; a send failure is remembered in _ex.
// NOTE(review): some lines (early checks, the sink_closed branch body) are
// not visible in this excerpt.
727 template<
typename Serializer,
typename... Out>
728 future<> sink_impl<Serializer, Out...>::operator()(
const Out&... args) {
// Marshal with 4 bytes of head space, then store the payload length
// (total size minus the 4-byte head) at the start of the buffer.
731 snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
732 static_assert(snd_buf::chunk_size >= 4,
"send buffer chunk size is too small");
733 auto p = data.front().get_write();
734 write_le<uint32_t>(p, data.size - 4);
// Never request more semaphore units than max_stream_buffers_memory.
737 auto size = std::min(
size_t(data.size), max_stream_buffers_memory);
// The buffer is wrapped in a foreign_ptr before crossing shards.
738 return get_units(this->_sem, size).then([
this, data = make_foreign(std::make_unique<snd_buf>(std::move(data)))] (semaphore_units<> su)
mutable {
// The submit_to future is discarded with (void): the send is not awaited.
744 (void)
smp::submit_to(this->_con->get_owner_shard(), [
this, data = std::move(data)] ()
mutable {
745 connection* con = this->_con->get();
749 if(con->sink_closed()) {
// The buffer is copied into shard-local memory before being sent.
752 return con->send(make_shard_local_buffer_copy(std::move(data)), {},
nullptr);
// `su` is moved into this continuation, so the units stay held until it runs.
753 }).then_wrapped([su = std::move(su),
this] (future<> f) {
// Keep only the first failure in _ex; otherwise just consume the future.
754 if (f.failed() && !this->_ex) {
755 this->_ex = f.get_exception();
757 f.ignore_ready_future();
760 return make_ready_future<>();
// Flushes the sink. The visible part takes max_stream_buffers_memory units of
// _sem via with_semaphore before running the flush body.
// NOTE(review): presumably this waits for in-flight sends that hold units of
// the same semaphore; the lambda body is not visible here — confirm.
764 template<
typename Serializer,
typename... Out>
765 future<> sink_impl<Serializer, Out...>::flush() {
767 return with_semaphore(this->_sem, max_stream_buffers_memory, [
this] {
// Closes the sink. Runs while holding max_stream_buffers_memory units of
// _sem. Fails with stream_closed if the sink is already closed; otherwise
// sends a final 4-byte frame (when the connection is healthy and no earlier
// error was recorded) and then calls close_sink() on the connection.
// NOTE(review): some lines (e.g. a possible submit_to wrapper) are not
// visible in this excerpt.
775 template<
typename Serializer,
typename... Out>
776 future<> sink_impl<Serializer, Out...>::close() {
777 return with_semaphore(this->_sem, max_stream_buffers_memory, [
this] {
779 connection* con = this->_con->get();
780 if (con->sink_closed()) {
781 return make_exception_future(stream_closed());
783 future<> f = make_ready_future<>();
784 if (!con->error() && !this->_ex) {
// Empty payload with a 4-byte head; the head is filled with -1U (all bits
// set) where operator() stores the payload length — presumably the
// end-of-stream marker; confirm against the receiver.
785 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
786 static_assert(snd_buf::chunk_size >= 4,
"send buffer chunk size is too small");
787 auto p = data.front().get_write();
788 write_le<uint32_t>(p, -1U);
789 f = con->send(std::move(data), {},
nullptr);
// close_sink() runs whether or not the final send succeeded.
793 return f.finally([con] {
return con->close_sink(); });
// Destructor: asserts that the connection's sink has already been closed.
// NOTE(review): any condition guarding the assert is on lines not shown here.
798 template<
typename Serializer,
typename... Out>
799 sink_impl<Serializer, Out...>::~sink_impl() {
802 assert(this->_con->get()->sink_closed());
// Returns the next stream message as an optional tuple of In....
// Buffered messages in _bufs are served first; when none are buffered, more
// are fetched on the connection's owner shard via stream_receive().
// NOTE(review): some lines (returns, the failure check, the empty-result
// return) are not visible in this excerpt.
805 template<
typename Serializer,
typename... In>
806 future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
// Helper: pop the front buffer, copy it into shard-local memory, unmarshal it
// and wrap the resulting values in std::optional<std::tuple<...>>.
807 auto process_one_buffer = [
this] {
808 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
809 this->_bufs.pop_front();
810 return std::apply([] (In&&... args) {
811 auto ret = std::make_optional(std::make_tuple(std::move(args)...));
813 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
// Fast path: a message is already buffered.
816 if (!this->_bufs.empty()) {
817 return process_one_buffer();
// Slow path: fetch more buffers on the owner shard.
821 return smp::submit_to(this->_con->get_owner_shard(), [
this] () -> future<> {
822 connection* con = this->_con->get();
// A source that is already closed yields stream_closed.
823 if (con->_source_closed) {
824 return make_exception_future<>(stream_closed());
826 return con->stream_receive(this->_bufs).then_wrapped([
this, con] (future<>&& f) {
// Presumably the failure branch (its condition is not visible): close the
// source, ignore the close result and propagate the original exception.
828 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
829 f.ignore_ready_future();
830 return make_exception_future<>(ex);
// Nothing was received: close the source and ignore the close result.
833 if (this->_bufs.empty()) {
834 return con->close_source().then_wrapped([] (future<> f) {
835 f.ignore_ready_future();
836 return make_ready_future<>();
839 return make_ready_future<>();
// Back on the calling shard: serve a buffer if one arrived.
841 }).then([
this, process_one_buffer] () {
842 if (this->_bufs.empty()) {
845 return process_one_buffer();
// Returns the connection id of the connection backing this sink, obtained
// through the implementation object's connection handle.
850 template<
typename... Out>
851 connection_id sink<Out...>::get_id()
const {
852 return _impl->_con->get()->get_connection_id();
// Returns the connection id of the connection backing this source, obtained
// through the implementation object's connection handle.
855 template<
typename... In>
856 connection_id source<In...>::get_id()
const {
857 return _impl->_con->get()->get_connection_id();
// Creates a sink<Out...> that shares this source's connection handle
// (_impl->_con), using `Serializer` for the new sink's implementation.
860 template<
typename... In>
861 template<
typename Serializer,
typename... Out>
862 sink<Out...> source<In...>::make_sink() {
863 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
872 struct hash<
seastar::rpc::streaming_domain_type> {
875 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));