Seastar
High performance C++ framework for concurrent servers
rpc_impl.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21#pragma once
22
23#include <seastar/core/format.hh>
24#include <seastar/core/function_traits.hh>
25#include <seastar/core/shared_ptr.hh>
26#include <seastar/core/sstring.hh>
27#include <seastar/core/when_all.hh>
28#include <seastar/util/assert.hh>
29#include <seastar/util/is_smart_ptr.hh>
30#include <seastar/core/simple-stream.hh>
31#include <seastar/net/packet-data-source.hh>
32
33#include <boost/type.hpp> // for compatibility
34
35namespace seastar {
36
37namespace rpc {
38
39enum class exception_type : uint32_t {
40 USER = 0,
41 UNKNOWN_VERB = 1,
42};
43
44template<typename T>
46 using type = T;
47};
48
49template<typename T>
51 using type = T;
52};
53
54struct wait_type {}; // opposite of no_wait_type
55
56// tags to tell whether we want a const client_info& parameter
59
60// tags to tell whether we want a opt_time_point parameter
63
64// General case
65template <typename Ret, typename... In>
66struct signature<Ret (In...)> {
67 using ret_type = Ret;
68 using arg_types = std::tuple<In...>;
69 using clean = signature;
72};
73
74// Specialize 'clean' for handlers that receive client_info
75template <typename Ret, typename... In>
76struct signature<Ret (const client_info&, In...)> {
77 using ret_type = Ret;
78 using arg_types = std::tuple<In...>;
79 using clean = signature<Ret (In...)>;
82};
83
84template <typename Ret, typename... In>
85struct signature<Ret (client_info&, In...)> {
86 using ret_type = Ret;
87 using arg_types = std::tuple<In...>;
88 using clean = signature<Ret (In...)>;
91};
92
93// Specialize 'clean' for handlers that receive client_info and opt_time_point
94template <typename Ret, typename... In>
95struct signature<Ret (const client_info&, opt_time_point, In...)> {
96 using ret_type = Ret;
97 using arg_types = std::tuple<In...>;
98 using clean = signature<Ret (In...)>;
101};
102
103template <typename Ret, typename... In>
104struct signature<Ret (client_info&, opt_time_point, In...)> {
105 using ret_type = Ret;
106 using arg_types = std::tuple<In...>;
107 using clean = signature<Ret (In...)>;
110};
111
112// Specialize 'clean' for handlers that receive opt_time_point
113template <typename Ret, typename... In>
114struct signature<Ret (opt_time_point, In...)> {
115 using ret_type = Ret;
116 using arg_types = std::tuple<In...>;
117 using clean = signature<Ret (In...)>;
120};
121
122template <typename T>
124 using type = wait_type;
125 using cleaned_type = T;
126};
127
128template <typename... T>
129struct wait_signature<future<T...>> {
130 using type = wait_type;
131 using cleaned_type = future<T...>;
132};
133
134template <>
136 using type = no_wait_type;
137 using cleaned_type = void;
138};
139
140template <>
142 using type = no_wait_type;
143 using cleaned_type = future<>;
144};
145
146template <typename T>
147using wait_signature_t = typename wait_signature<T>::type;
148
149template <typename... In>
150inline
151std::tuple<In...>
152maybe_add_client_info(dont_want_client_info, client_info&, std::tuple<In...>&& args) {
153 return std::move(args);
154}
155
156template <typename... In>
157inline
158std::tuple<std::reference_wrapper<client_info>, In...>
159maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
160 return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args));
161}
162
163template <typename... In>
164inline
165std::tuple<In...>
166maybe_add_time_point(dont_want_time_point, opt_time_point&, std::tuple<In...>&& args) {
167 return std::move(args);
168}
169
170template <typename... In>
171inline
172std::tuple<opt_time_point, In...>
173maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
174 return std::tuple_cat(std::make_tuple(otp), std::move(args));
175}
176
177inline sstring serialize_connection_id(const connection_id& id) {
178 sstring p = uninitialized_string(sizeof(id));
179 auto c = p.data();
180 write_le(c, id.id());
181 return p;
182}
183
184inline connection_id deserialize_connection_id(const sstring& s) {
185 using id_type = decltype(connection_id{0}.id());
186 auto p = s.c_str();
187 auto id = read_le<id_type>(p);
188 return connection_id{id};
189}
190
191template <bool IsSmartPtr>
193
194template <>
195struct serialize_helper<false> {
196 template <typename Serializer, typename Output, typename T>
197 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
198 return write(serializer, out, t);
199 }
200};
201
202template <>
203struct serialize_helper<true> {
204 template <typename Serializer, typename Output, typename T>
205 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
206 return write(serializer, out, *t);
207 }
208};
209
210template <typename Serializer, typename Output, typename... T>
211inline void do_marshall(Serializer& serializer, Output& out, const T&... args);
212
213template <typename Serializer, typename Output>
215 template <typename T> struct helper {
216 static void doit(Serializer& serializer, Output& out, const T& arg) {
217 using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference_t<T>>::value>;
218 serialize_helper_type::serialize(serializer, out, arg);
219 }
220 };
221 template<typename T> struct helper<std::reference_wrapper<const T>> {
222 static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) {
223 helper<T>::doit(serializer, out, arg.get());
224 }
225 };
226 static void put_connection_id(const connection_id& cid, Output& out) {
227 sstring id = serialize_connection_id(cid);
228 out.write(id.c_str(), id.size());
229 }
230 template <typename... T> struct helper<sink<T...>> {
231 static void doit(Serializer&, Output& out, const sink<T...>& arg) {
232 put_connection_id(arg.get_id(), out);
233 }
234 };
235 template <typename... T> struct helper<source<T...>> {
236 static void doit(Serializer&, Output& out, const source<T...>& arg) {
237 put_connection_id(arg.get_id(), out);
238 }
239 };
240 template <typename... T> struct helper<tuple<T...>> {
241 static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
242 auto do_do_marshall = [&serializer, &out] (const auto&... args) {
243 do_marshall(serializer, out, args...);
244 };
245 // since C++23, std::apply() only accepts tuple-like types, while
246 // rpc::tuple is not a tuple-like type from the tuple-like C++
247 // concept's perspective. so we have to cast it to std::tuple to
248 // appease std::apply()
249 std::apply(do_do_marshall, static_cast<const std::tuple<T...>&>(arg));
250 }
251 };
252};
253
254template <typename Serializer, typename Output, typename... T>
255inline void do_marshall(Serializer& serializer, Output& out, const T&... args) {
256 // C++ guarantees that brace-initialization expressions are evaluted in order
257 (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...};
258}
259
260static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
261 auto* b = std::get_if<temporary_buffer<char>>(&output.bufs);
262 if (b) {
263 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
264 } else {
265 auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs);
266 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
267 }
268}
269
270template <typename Serializer, typename... T>
271inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) {
272 measuring_output_stream measure;
273 do_marshall(serializer, measure, args...);
274 snd_buf ret(measure.size() + head_space);
275 auto out = make_serializer_stream(ret);
276 out.skip(head_space);
277 do_marshall(serializer, out, args...);
278 return ret;
279}
280
281template <typename Serializer, typename Input, typename... T>
282std::tuple<T...> do_unmarshall(connection& c, Input& in);
283
284// The protocol to call the serializer is read(serializer, stream, rpc::type<T>).
285// However, some users (ahem) used boost::type instead of rpc::type when the two
286// types were aliased, preventing us from moving to the newer std::type_identity.
287// To preserve compatibility, calls to read() are routed through
288// read_via_type_marker(), of which there are two variants, one for
289// boost::type (marked as deprecated) and one for std::type_identity.
290
291template <typename T, typename... Args>
292requires requires (Args... args, type<T> t) { read(std::forward<Args>(args)..., t); }
293auto
294read_via_type_marker(Args&&... args) {
295 return read(std::forward<Args>(args)..., type<T>());
296}
297
298template <typename T, typename... Args>
299requires requires (Args... args, boost::type<T> t) { read(std::forward<Args>(args)..., t); }
300[[deprecated("Use rpc::type<> instead of boost::type<>")]]
301auto
302read_via_type_marker(Args&&... args) {
303 return read(std::forward<Args>(args)..., boost::type<T>());
304}
305
306template<typename Serializer, typename Input>
308 template<typename T> struct helper {
309 static T doit(connection& c, Input& in) {
310 return read_via_type_marker<T>(c.serializer<Serializer>(), in);
311 }
312 };
313 template<typename T> struct helper<optional<T>> {
314 static optional<T> doit(connection& c, Input& in) {
315 if (in.size()) {
316 return optional<T>(read_via_type_marker<typename remove_optional<T>::type>(c.serializer<Serializer>(), in));
317 } else {
318 return optional<T>();
319 }
320 }
321 };
322 template<typename T> struct helper<std::reference_wrapper<const T>> {
323 static T doit(connection& c, Input& in) {
324 return helper<T>::doit(c, in);
325 }
326 };
327 static connection_id get_connection_id(Input& in) {
328 sstring id = uninitialized_string(sizeof(connection_id));
329 in.read(id.data(), sizeof(connection_id));
330 return deserialize_connection_id(id);
331 }
332 template<typename... T> struct helper<sink<T...>> {
333 static sink<T...> doit(connection& c, Input& in) {
334 return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
335 }
336 };
337 template<typename... T> struct helper<source<T...>> {
338 static source<T...> doit(connection& c, Input& in) {
339 return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
340 }
341 };
342 template <typename... T> struct helper<tuple<T...>> {
343 static tuple<T...> doit(connection& c, Input& in) {
344 return do_unmarshall<Serializer, Input, T...>(c, in);
345 }
346 };
347};
348
349template <typename... T>
351
352template <>
354 using type = std::tuple<>;
355};
356
357template <typename T0, typename... T>
359 using type = std::tuple<
360 T0,
361 std::conditional_t<
362 std::is_default_constructible_v<T>,
363 T,
364 std::optional<T>
365 >...
366 >;
367};
368
369template <typename... T>
370using default_constructible_tuple_except_first_t = typename default_constructible_tuple_except_first<T...>::type;
371
372// Where Tin != Tout, apply std:optional::value()
373template <typename... Tout, typename... Tin>
374auto
375unwrap_optional_if_needed(std::tuple<Tin...>&& tuple_in) {
376 using tuple_in_t = std::tuple<Tin...>;
377 using tuple_out_t = std::tuple<Tout...>;
378 return std::invoke([&] <size_t... Idx> (std::index_sequence<Idx...>) {
379 return tuple_out_t(
380 std::invoke([&] () {
381 if constexpr (std::same_as<std::tuple_element_t<Idx, tuple_in_t>, std::tuple_element_t<Idx, tuple_out_t>>) {
382 return std::move(std::get<Idx>(tuple_in));
383 } else {
384 return std::move(std::get<Idx>(tuple_in).value());
385 }
386 })...);
387 }, std::make_index_sequence<sizeof...(Tout)>());
388}
389
390template <typename Serializer, typename Input, typename... T>
391inline std::tuple<T...> do_unmarshall(connection& c, Input& in) {
392 // Argument order processing is unspecified, but we need to deserialize
393 // left-to-right. So we deserialize into something that can be lazily
394 // constructed (and can conditionally destroy itself if we only constructed some
395 // of the arguments).
396 //
397 // The first element of the tuple has no ordering
398 // problem, and we can deserialize directly into a std::tuple<T...>.
399 //
400 // For the rest of the elements, if they are default-constructible, we leave
401 // them as is, and if not, we deserialize into std::optional<T>, and later
402 // unwrap them. If we're lucky and nothing was wrapped, we can return without
403 // any data movement.
404 using ret_type = std::tuple<T...>;
405 using temporary_type = default_constructible_tuple_except_first_t<T...>;
406 return std::invoke([&] <size_t... Idx> (std::index_sequence<Idx...>) {
407 auto tmp = temporary_type(
408 std::invoke([&] () -> std::tuple_element_t<Idx, temporary_type> {
409 if constexpr (Idx == 0) {
410 // The first T has no ordering problem, so we can deserialize it directly into the tuple
411 return unmarshal_one<Serializer, Input>::template helper<std::tuple_element_t<Idx, ret_type>>::doit(c, in);
412 } else {
413 // Use default constructor for the rest of the Ts
414 return {};
415 }
416 })...
417 );
418 // Deserialize the other Ts, comma-expression preserves left-to-right order.
419 (void)(..., ((Idx == 0
420 ? 0
421 : ((std::get<Idx>(tmp) = unmarshal_one<Serializer, Input>::template helper<std::tuple_element_t<Idx, ret_type>>::doit(c, in), 0)))));
422 if constexpr (std::same_as<ret_type, temporary_type>) {
423 // Use Named Return Vale Optimization (NVRO) if we didn't have to wrap anything
424 return tmp;
425 } else {
426 return unwrap_optional_if_needed<T...>(std::move(tmp));
427 }
428 }, std::index_sequence_for<T...>());
429}
430
431template <typename Serializer, typename... T>
432inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
433 auto in = make_deserializer_stream(input);
434 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
435}
436
437inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
438 std::exception_ptr ex;
439 auto data = make_deserializer_stream(d);
440
441 uint32_t v32;
442 data.read(reinterpret_cast<char*>(&v32), 4);
443 exception_type ex_type = exception_type(le_to_cpu(v32));
444 data.read(reinterpret_cast<char*>(&v32), 4);
445 uint32_t ex_len = le_to_cpu(v32);
446
447 switch (ex_type) {
448 case exception_type::USER: {
449 std::string s(ex_len, '\0');
450 data.read(&*s.begin(), ex_len);
451 ex = std::make_exception_ptr(remote_verb_error(std::move(s)));
452 break;
453 }
454 case exception_type::UNKNOWN_VERB: {
455 uint64_t v64;
456 data.read(reinterpret_cast<char*>(&v64), 8);
457 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
458 break;
459 }
460 default:
461 ex = std::make_exception_ptr(unknown_exception_error());
462 break;
463 }
464 return ex;
465}
466
467template <typename Payload, typename... T>
469 bool done = false;
470 promise<T...> p;
471 template<typename... V>
472 void set_value(V&&... v) {
473 done = true;
474 p.set_value(internal::untuple(std::forward<V>(v))...);
475 }
477 if (!done) {
478 p.set_exception(closed_error());
479 }
480 }
481};
482
483template<typename Serializer, typename T>
484struct rcv_reply : rcv_reply_base<T, T> {
485 inline void get_reply(rpc::client& dst, rcv_buf input) {
486 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
487 }
488};
489
490template<typename Serializer, typename... T>
491struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
492 inline void get_reply(rpc::client& dst, rcv_buf input) {
493 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
494 }
495};
496
497template<typename Serializer>
498struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> {
499 inline void get_reply(rpc::client&, rcv_buf) {
500 this->set_value();
501 }
502};
503
504template<typename Serializer>
505struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};
506
507template <typename Serializer, typename Ret, typename... InArgs>
508inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, rpc_clock_type::time_point start, cancellable* cancel, rpc::client& dst, id_type msg_id,
509 signature<Ret (InArgs...)>) {
510 using reply_type = rcv_reply<Serializer, Ret>;
511 auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
512 if (msg_id >= 0) {
513 dst.get_stats_internal().replied++;
514 return r.get_reply(dst, std::move(data));
515 } else {
516 dst.get_stats_internal().exception_received++;
517 r.done = true;
518 r.p.set_exception(unmarshal_exception(data));
519 }
520 };
521 using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
522 auto r = std::make_unique<handler_type>(std::move(lambda));
523 r->start = start;
524 auto fut = r->reply.p.get_future();
525 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
526 return fut;
527}
528
529template<typename Serializer, typename... InArgs>
530inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point start, cancellable*, rpc::client&, id_type,
531 signature<no_wait_type (InArgs...)>) { // no_wait overload
532 return make_ready_future<>();
533}
534
535template<typename Serializer, typename... InArgs>
536inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point, cancellable*, rpc::client&, id_type,
537 signature<future<no_wait_type> (InArgs...)>) { // future<no_wait> overload
538 return make_ready_future<>();
539}
540
541// Convert a relative timeout (a duration) to an absolute one (time_point).
542// Do the calculation safely so that a very large duration will be capped by
543// time_point::max, instead of wrapping around to ancient history.
544inline rpc_clock_type::time_point
545relative_timeout_to_absolute(rpc_clock_type::duration relative) {
546 rpc_clock_type::time_point now = rpc_clock_type::now();
547 return now + std::min(relative, rpc_clock_type::time_point::max() - now);
548}
549
550// Refer to struct request_frame for more details
551static constexpr size_t request_frame_headroom = 28;
552
553// Returns lambda that can be used to send rpc messages.
554// The lambda gets client connection and rpc parameters as arguments, marshalls them sends
555// to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
556// to a caller.
557template<typename Serializer, typename MsgType, typename Ret, typename... InArgs>
558auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
559 struct shelper {
560 MsgType t;
561 signature<Ret (InArgs...)> sig;
562 auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) {
563 if (dst.error()) {
564 using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type;
565 return futurize<cleaned_ret_type>::make_exception_future(closed_error());
566 }
567
568 auto start = rpc_clock_type::now();
569 // send message
570 auto msg_id = dst.next_message_id();
571 snd_buf data = marshall(dst.template serializer<Serializer>(), request_frame_headroom, args...);
572
573 // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
574 using wait = wait_signature_t<Ret>;
575 return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, start, cancel, dst, msg_id, sig)).then([] (auto r) {
576 std::get<0>(r).ignore_ready_future();
577 return std::move(std::get<1>(r)); // return future of wait_for_reply
578 });
579 }
580 auto operator()(rpc::client& dst, const InArgs&... args) {
581 return send(dst, {}, nullptr, args...);
582 }
583 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) {
584 return send(dst, timeout, nullptr, args...);
585 }
586 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, cancellable& cancel, const InArgs&... args) {
587 return send(dst, timeout, &cancel, args...);
588 }
589 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
590 return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
591 }
592 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, cancellable& cancel, const InArgs&... args) {
593 return send(dst, relative_timeout_to_absolute(timeout), &cancel, args...);
594 }
595 auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
596 return send(dst, {}, &cancel, args...);
597 }
598
599 };
600 return shelper{xt, xsig};
601}
602
603// Refer to struct response_frame for more details
604static constexpr size_t response_frame_headroom = 16;
605
606template<typename Serializer, typename RetTypes>
607inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
608 std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration) {
609 if (!client->error()) {
610 snd_buf data;
611 try {
612 if constexpr (std::is_void_v<RetTypes>) {
613 ret.get();
614 data = std::invoke(marshall<Serializer>, std::ref(client->template serializer<Serializer>()), response_frame_headroom);
615 } else {
616 data = std::invoke(marshall<Serializer, const RetTypes&>, std::ref(client->template serializer<Serializer>()), response_frame_headroom, std::move(ret.get()));
617 }
618 } catch (std::exception& ex) {
619 uint32_t len = std::strlen(ex.what());
620 data = snd_buf(response_frame_headroom + 2 * sizeof(uint32_t) + len);
621 auto os = make_serializer_stream(data);
622 os.skip(response_frame_headroom);
623 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
624 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
625 v32 = cpu_to_le(len);
626 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
627 os.write(ex.what(), len);
628 msg_id = -msg_id;
629 }
630
631 return client->respond(msg_id, std::move(data), timeout, handler_duration);
632 } else {
633 ret.ignore_ready_future();
634 return make_ready_future<>();
635 }
636}
637
638// specialization for no_wait_type which does not send a reply
639template<typename Serializer>
640inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client,
641 std::optional<rpc_clock_type::time_point>, std::optional<rpc_clock_type::duration>) {
642 try {
643 r.get();
644 } catch (std::exception& ex) {
645 client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored");
646 }
647 return make_ready_future<>();
648}
649
650template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple>
651inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)>, ArgsTuple&& args) {
652 using futurator = futurize<Ret>;
653 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
654}
655
656// lref_to_cref is a helper that encapsulates lvalue reference in std::ref() or does nothing otherwise
657template<typename T>
658auto lref_to_cref(T&& x) {
659 return std::move(x);
660}
661
662template<typename T>
663auto lref_to_cref(T& x) {
664 return std::ref(x);
665}
666
667// Creates lambda to handle RPC message on a server.
668// The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client
669template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint>
670auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, WantTimePoint) {
671 using signature = decltype(sig);
672 using wait_style = wait_signature_t<Ret>;
673 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
674 std::optional<rpc_clock_type::time_point> timeout,
675 int64_t msg_id,
676 rcv_buf data,
677 gate::holder guard) mutable {
678 auto memory_consumed = client->estimate_request_size(data.size);
679 if (memory_consumed > client->max_request_size()) {
680 auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
681 client->get_logger()(client->peer_address(), err);
682 // FIXME: future is discarded
683 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
684 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout, std::nullopt).handle_exception([client, msg_id] (std::exception_ptr eptr) {
685 client->get_logger()(client->info(), msg_id, seastar::format("got exception while processing an oversized message: {}", eptr));
686 });
687 }).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
688 return make_ready_future();
689 }
690 // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
691 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func, g = std::move(guard)] (auto permit) mutable {
692 // FIXME: future is discarded
693 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
694 try {
695 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
696 auto start = rpc_clock_type::now();
697 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit), start] (futurize_t<Ret> ret) mutable {
698 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout, rpc_clock_type::now() - start).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
699 client->get_logger()(client->info(), msg_id, seastar::format("got exception while processing a message: {}", eptr));
700 });
701 });
702 } catch (...) {
703 client->get_logger()(client->info(), msg_id, seastar::format("caught exception while processing a message: {}", std::current_exception()));
704 return make_ready_future();
705 }
706 }).handle_exception_type([g = std::move(g)] (gate_closed_exception&) {/* ignore */});
707 });
708
709 if (timeout) {
710 f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
711 }
712
713 return f;
714 };
715}
716
717// helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind.
718template<typename Func>
719auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible_v<std::decay_t<Func>>, void*> = nullptr) {
720 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
721 return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); };
722}
723
724template<typename Func>
725auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible_v<std::decay_t<Func>>, void*> = nullptr) {
726 return std::forward<Func>(func);
727}
728
729// This class is used to calculate client side rpc function signature.
730// Return type is converted from a smart pointer to a type it points to.
731// rpc::optional are converted to non optional type.
732//
733// Examples:
734// std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long)
735// double(float) -> double(float)
736template<typename Ret, typename... In>
738 template<typename T, bool IsSmartPtr>
739 struct drop_smart_ptr_impl;
740 template<typename T>
741 struct drop_smart_ptr_impl<T, true> {
742 using type = typename T::element_type;
743 };
744 template<typename T>
745 struct drop_smart_ptr_impl<T, false> {
746 using type = T;
747 };
748 template<typename T>
749 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
750
751 // if return type is smart ptr take a type it points to instead
752 using return_type = typename drop_smart_ptr<Ret>::type;
753public:
754 using type = return_type(typename remove_optional<In>::type...);
755};
756
757template<typename Serializer, typename MsgType>
758template<typename Ret, typename... In>
759auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)>, MsgType t) {
760 using sig_type = signature<typename client_function_type<Ret, In...>::type>;
761 return send_helper<Serializer>(t, sig_type());
762}
763
764template<typename Serializer, typename MsgType>
765template<typename Func>
767 return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t);
768}
769
770template<typename Serializer, typename MsgType>
771template<typename Func>
774 using clean_sig_type = typename sig_type::clean;
775 using want_client_info = typename sig_type::want_client_info;
776 using want_time_point = typename sig_type::want_time_point;
777 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
778 want_client_info(), want_time_point());
779 register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv)), {}});
780 return make_client(clean_sig_type(), t);
781}
782
783template<typename Serializer, typename MsgType>
784template<typename Func>
786 return register_handler(t, scheduling_group(), std::forward<Func>(func));
787}
788
789template<typename Serializer, typename MsgType>
791 auto it = _handlers.find(t);
792 if (it != _handlers.end()) {
793 return it->second.use_gate.close().finally([this, t] {
794 _handlers.erase(t);
795 });
796 }
797 return make_ready_future<>();
798}
799
800template<typename Serializer, typename MsgType>
802 auto it = _handlers.find(msg_id);
803 if (it == _handlers.end()) {
804 return false;
805 }
806 return !it->second.use_gate.is_closed();
807}
808
809template<typename Serializer, typename MsgType>
810std::optional<protocol_base::handler_with_holder> protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
811 const auto it = _handlers.find(MsgType(msg_id));
812 if (it != _handlers.end()) {
813 try {
814 return handler_with_holder{it->second, it->second.use_gate.hold()};
815 } catch (gate_closed_exception&) {
816 // unregistered, just ignore
817 }
818 }
819 return std::nullopt;
820}
821
822template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
823
824template<typename Serializer, typename... Out>
825future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
826 // note that we use remote serializer pointer, so if serailizer needs a state
827 // it should have per-cpu one
828 snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
829 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
830 auto p = data.front().get_write();
831 write_le<uint32_t>(p, data.size - 4);
832 // we do not want to dead lock on huge packets, so let them in
833 // but only one at a time
834 auto size = std::min(size_t(data.size), max_stream_buffers_memory);
835 const auto seq_num = _next_seq_num++;
836 return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data))), seq_num] (semaphore_units<> su) mutable {
837 if (this->_ex) {
838 return make_exception_future(this->_ex);
839 }
840 // It is OK to discard this future. The user is required to
841 // wait for it when closing.
842 (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable {
843 connection* con = this->_con->get();
844 if (con->error()) {
845 return make_exception_future(closed_error());
846 }
847 if(con->sink_closed()) {
848 return make_exception_future(stream_closed());
849 }
850
851 auto& last_seq_num = _remote_state.last_seq_num;
852 auto& out_of_order_bufs = _remote_state.out_of_order_bufs;
853
854 auto local_data = make_shard_local_buffer_copy(std::move(data));
855 const auto seq_num_diff = seq_num - last_seq_num;
856 if (seq_num_diff > 1) {
857 auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)});
858 return it->second.pr.get_future();
859 }
860
861 last_seq_num = seq_num;
862 auto ret_fut = con->send(std::move(local_data), {}, nullptr);
863 while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) {
864 auto it = out_of_order_bufs.begin();
865 last_seq_num = it->first;
866 auto fut = con->send(std::move(it->second.data), {}, nullptr);
867 fut.forward_to(std::move(it->second.pr));
868 out_of_order_bufs.erase(it);
869 }
870 return ret_fut;
871 }).then_wrapped([su = std::move(su), this] (future<> f) {
872 if (f.failed() && !this->_ex) { // first error is the interesting one
873 this->_ex = f.get_exception();
874 } else {
875 f.ignore_ready_future();
876 }
877 });
878 return make_ready_future<>();
879 });
880}
881
882template<typename Serializer, typename... Out>
883future<> sink_impl<Serializer, Out...>::flush() {
884 // wait until everything is sent out before returning.
885 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
886 if (this->_ex) {
887 return make_exception_future(this->_ex);
888 }
889 return make_ready_future();
890 });
891}
892
893template<typename Serializer, typename... Out>
894future<> sink_impl<Serializer, Out...>::close() {
895 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
896 return smp::submit_to(this->_con->get_owner_shard(), [this] {
897 connection* con = this->_con->get();
898 if (con->sink_closed()) { // double close, should not happen!
899 return make_exception_future(stream_closed());
900 }
901 future<> f = make_ready_future<>();
902 if (!con->error() && !this->_ex) {
903 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
904 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
905 auto p = data.front().get_write();
906 write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream
907 f = con->send(std::move(data), {}, nullptr);
908 } else {
909 f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error());
910 }
911 return f.finally([con] { return con->close_sink(); });
912 });
913 });
914}
915
916template<typename Serializer, typename... Out>
917sink_impl<Serializer, Out...>::~sink_impl() {
918 // A failure to close might leave some continuations running after
919 // this is destroyed, leading to use-after-free bugs.
920 SEASTAR_ASSERT(this->_con->get()->sink_closed());
921}
922
923template<typename Serializer, typename... In>
924future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
925 auto process_one_buffer = [this] {
926 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
927 this->_bufs.pop_front();
928 return std::apply([] (In&&... args) {
929 auto ret = std::make_optional(std::make_tuple(std::move(args)...));
930 return make_ready_future<std::optional<std::tuple<In...>>>(std::move(ret));
931 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
932 };
933
934 if (!this->_bufs.empty()) {
935 return process_one_buffer();
936 }
937
938 // refill buffers from remote cpu
939 return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
940 connection* con = this->_con->get();
941 if (con->_source_closed) {
942 return make_exception_future<>(stream_closed());
943 }
944 return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
945 if (f.failed()) {
946 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
947 f.ignore_ready_future();
948 return make_exception_future<>(ex);
949 });
950 }
951 if (this->_bufs.empty()) { // nothing to read -> eof
952 return con->close_source().then_wrapped([] (future<> f) {
953 f.ignore_ready_future();
954 return make_ready_future<>();
955 });
956 }
957 return make_ready_future<>();
958 });
959 }).then([this, process_one_buffer] () {
960 if (this->_bufs.empty()) {
961 return make_ready_future<std::optional<std::tuple<In...>>>(std::nullopt);
962 } else {
963 return process_one_buffer();
964 }
965 });
966}
967
968template<typename... Out>
969connection_id sink<Out...>::get_id() const {
970 return _impl->_con->get()->get_connection_id();
971}
972
973template<typename... In>
974connection_id source<In...>::get_id() const {
975 return _impl->_con->get()->get_connection_id();
976}
977
978template<typename... In>
979template<typename Serializer, typename... Out>
980sink<Out...> source<In...>::make_sink() {
981 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
982}
983
984}
985
986}
987
988namespace std {
989template<>
990struct hash<seastar::rpc::streaming_domain_type> {
991 size_t operator()(const seastar::rpc::streaming_domain_type& domain) const {
992 size_t h = 0;
993 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
994 return h;
995 }
996};
997}
998
999
A representation of a possibly not-yet-computed value.
Definition: future.hh:1198
static time_point now() noexcept
Definition: lowres_clock.hh:74
promise - allows a future value to be made available at a later time.
Definition: future.hh:913
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:969
Definition: reference_wrapper.hh:43
Definition: rpc_impl.hh:737
Definition: rpc.hh:414
Definition: rpc_types.hh:140
Definition: rpc_types.hh:67
Definition: rpc.hh:230
Definition: rpc_types.hh:194
Definition: rpc_types.hh:189
Definition: rpc.hh:798
Definition: rpc.hh:389
Definition: rpc_types.hh:318
Definition: rpc.hh:408
Definition: rpc_types.hh:357
Definition: rpc_types.hh:398
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:293
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:358
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1870
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1876
auto when_all(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:252
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: format.hh:42
Definition: function_traits.hh:74
STL namespace.
Definition: rpc_types.hh:204
Definition: rpc_types.hh:98
Definition: rpc_impl.hh:57
Definition: rpc_impl.hh:61
Definition: rpc_impl.hh:58
Definition: rpc_impl.hh:62
Definition: rpc_impl.hh:215
Definition: rpc_impl.hh:214
Definition: rpc_types.hh:180
Definition: rpc_types.hh:240
Definition: rpc_impl.hh:468
Definition: rpc_impl.hh:484
Definition: rpc_impl.hh:45
Definition: rpc.hh:685
Definition: rpc_impl.hh:192
Definition: rpc_impl.hh:66
Definition: rpc.hh:191
Definition: rpc_impl.hh:308
Definition: rpc_impl.hh:307
Definition: rpc_impl.hh:123
Definition: rpc_impl.hh:54