Seastar
High performance C++ framework for concurrent servers
rpc_impl.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21#pragma once
22
23#include <seastar/core/format.hh>
24#include <seastar/core/function_traits.hh>
25#include <seastar/core/shared_ptr.hh>
26#include <seastar/core/sstring.hh>
27#include <seastar/core/when_all.hh>
28#include <seastar/util/assert.hh>
29#include <seastar/util/is_smart_ptr.hh>
30#include <seastar/core/simple-stream.hh>
31#include <seastar/net/packet-data-source.hh>
32#include <seastar/core/deleter.hh>
33
34#include <boost/type.hpp> // for compatibility
35
36#include <concepts>
37
38namespace seastar {
39
40namespace rpc {
41
42enum class exception_type : uint32_t {
43 USER = 0,
44 UNKNOWN_VERB = 1,
45};
46
47template<typename T>
49 using type = T;
50};
51
52template<typename T>
54 using type = T;
55};
56
57struct wait_type {}; // opposite of no_wait_type
58
59// tags to tell whether we want a const client_info& parameter
62
63// tags to tell whether we want a opt_time_point parameter
66
67// General case
68template <typename Ret, typename... In>
69struct signature<Ret (In...)> {
70 using ret_type = Ret;
71 using arg_types = std::tuple<In...>;
72 using clean = signature;
75};
76
77// Specialize 'clean' for handlers that receive client_info
78template <typename Ret, typename... In>
79struct signature<Ret (const client_info&, In...)> {
80 using ret_type = Ret;
81 using arg_types = std::tuple<In...>;
82 using clean = signature<Ret (In...)>;
85};
86
87template <typename Ret, typename... In>
88struct signature<Ret (client_info&, In...)> {
89 using ret_type = Ret;
90 using arg_types = std::tuple<In...>;
91 using clean = signature<Ret (In...)>;
94};
95
96// Specialize 'clean' for handlers that receive client_info and opt_time_point
97template <typename Ret, typename... In>
98struct signature<Ret (const client_info&, opt_time_point, In...)> {
99 using ret_type = Ret;
100 using arg_types = std::tuple<In...>;
101 using clean = signature<Ret (In...)>;
104};
105
106template <typename Ret, typename... In>
107struct signature<Ret (client_info&, opt_time_point, In...)> {
108 using ret_type = Ret;
109 using arg_types = std::tuple<In...>;
110 using clean = signature<Ret (In...)>;
113};
114
115// Specialize 'clean' for handlers that receive opt_time_point
116template <typename Ret, typename... In>
117struct signature<Ret (opt_time_point, In...)> {
118 using ret_type = Ret;
119 using arg_types = std::tuple<In...>;
120 using clean = signature<Ret (In...)>;
123};
124
125template <typename T>
127 using type = wait_type;
128 using cleaned_type = T;
129};
130
131template <typename... T>
132struct wait_signature<future<T...>> {
133 using type = wait_type;
134 using cleaned_type = future<T...>;
135};
136
137template <>
139 using type = no_wait_type;
140 using cleaned_type = void;
141};
142
143template <>
145 using type = no_wait_type;
146 using cleaned_type = future<>;
147};
148
149template <typename T>
150using wait_signature_t = typename wait_signature<T>::type;
151
152template <typename... In>
153inline
154std::tuple<In...>
155maybe_add_client_info(dont_want_client_info, client_info&, std::tuple<In...>&& args) {
156 return std::move(args);
157}
158
159template <typename... In>
160inline
161std::tuple<std::reference_wrapper<client_info>, In...>
162maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
163 return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args));
164}
165
166template <typename... In>
167inline
168std::tuple<In...>
169maybe_add_time_point(dont_want_time_point, opt_time_point&, std::tuple<In...>&& args) {
170 return std::move(args);
171}
172
173template <typename... In>
174inline
175std::tuple<opt_time_point, In...>
176maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
177 return std::tuple_cat(std::make_tuple(otp), std::move(args));
178}
179
180inline sstring serialize_connection_id(const connection_id& id) {
181 sstring p = uninitialized_string(sizeof(id));
182 auto c = p.data();
183 write_le(c, id.id());
184 return p;
185}
186
187inline connection_id deserialize_connection_id(const sstring& s) {
188 using id_type = decltype(connection_id{0}.id());
189 auto p = s.c_str();
190 auto id = read_le<id_type>(p);
191 return connection_id{id};
192}
193
194template <bool IsSmartPtr>
196
197template <>
198struct serialize_helper<false> {
199 template <typename Serializer, typename Output, typename T>
200 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
201 return write(serializer, out, t);
202 }
203};
204
205template <>
206struct serialize_helper<true> {
207 template <typename Serializer, typename Output, typename T>
208 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
209 return write(serializer, out, *t);
210 }
211};
212
213template <typename Serializer, typename Output, typename... T>
214inline void do_marshall(Serializer& serializer, Output& out, const T&... args);
215
216template <typename Serializer, typename Output>
218 template <typename T> struct helper {
219 static void doit(Serializer& serializer, Output& out, const T& arg) {
220 using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference_t<T>>::value>;
221 serialize_helper_type::serialize(serializer, out, arg);
222 }
223 };
224 template<typename T> struct helper<std::reference_wrapper<const T>> {
225 static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) {
226 helper<T>::doit(serializer, out, arg.get());
227 }
228 };
229 static void put_connection_id(const connection_id& cid, Output& out) {
230 sstring id = serialize_connection_id(cid);
231 out.write(id.c_str(), id.size());
232 }
233 template <typename... T> struct helper<sink<T...>> {
234 static void doit(Serializer&, Output& out, const sink<T...>& arg) {
235 put_connection_id(arg.get_id(), out);
236 }
237 };
238 template <typename... T> struct helper<source<T...>> {
239 static void doit(Serializer&, Output& out, const source<T...>& arg) {
240 put_connection_id(arg.get_id(), out);
241 }
242 };
243 template <typename... T> struct helper<tuple<T...>> {
244 static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
245 auto do_do_marshall = [&serializer, &out] (const auto&... args) {
246 do_marshall(serializer, out, args...);
247 };
248 // since C++23, std::apply() only accepts tuple-like types, while
249 // rpc::tuple is not a tuple-like type from the tuple-like C++
250 // concept's perspective. so we have to cast it to std::tuple to
251 // appease std::apply()
252 std::apply(do_do_marshall, static_cast<const std::tuple<T...>&>(arg));
253 }
254 };
255};
256
257template <typename Serializer, typename Output, typename... T>
258inline void do_marshall(Serializer& serializer, Output& out, const T&... args) {
259 // C++ guarantees that brace-initialization expressions are evaluted in order
260 (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...};
261}
262
263static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
264 auto* b = std::get_if<temporary_buffer<char>>(&output.bufs);
265 if (b) {
266 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
267 } else {
268 auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs);
269 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
270 }
271}
272
273template <typename Serializer, typename... T>
274inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) {
275 measuring_output_stream measure;
276 do_marshall(serializer, measure, args...);
277 snd_buf ret(measure.size() + head_space);
278 auto out = make_serializer_stream(ret);
279 out.skip(head_space);
280 do_marshall(serializer, out, args...);
281 return ret;
282}
283
284template <typename Serializer, typename Input, typename... T>
285std::tuple<T...> do_unmarshall(connection& c, Input& in);
286
287// The protocol to call the serializer is read(serializer, stream, rpc::type<T>).
288// However, some users (ahem) used boost::type instead of rpc::type when the two
289// types were aliased, preventing us from moving to the newer std::type_identity.
290// To preserve compatibility, calls to read() are routed through
291// read_via_type_marker(), of which there are two variants, one for
292// boost::type (marked as deprecated) and one for std::type_identity.
293
294template <typename T, typename... Args>
295requires requires (Args... args, type<T> t) { read(std::forward<Args>(args)..., t); }
296auto
297read_via_type_marker(Args&&... args) {
298 return read(std::forward<Args>(args)..., type<T>());
299}
300
301template <typename T, typename... Args>
302requires requires (Args... args, boost::type<T> t) { read(std::forward<Args>(args)..., t); }
303[[deprecated("Use rpc::type<> instead of boost::type<>")]]
304auto
305read_via_type_marker(Args&&... args) {
306 return read(std::forward<Args>(args)..., boost::type<T>());
307}
308
309template<typename Serializer, typename Input>
311 template<typename T> struct helper {
312 static T doit(connection& c, Input& in) {
313 return read_via_type_marker<T>(c.serializer<Serializer>(), in);
314 }
315 };
316 template<typename T> struct helper<optional<T>> {
317 static optional<T> doit(connection& c, Input& in) {
318 if (in.size()) {
319 return optional<T>(read_via_type_marker<typename remove_optional<T>::type>(c.serializer<Serializer>(), in));
320 } else {
321 return optional<T>();
322 }
323 }
324 };
325 template<typename T> struct helper<std::reference_wrapper<const T>> {
326 static T doit(connection& c, Input& in) {
327 return helper<T>::doit(c, in);
328 }
329 };
330 static connection_id get_connection_id(Input& in) {
331 sstring id = uninitialized_string(sizeof(connection_id));
332 in.read(id.data(), sizeof(connection_id));
333 return deserialize_connection_id(id);
334 }
335 template<typename... T> struct helper<sink<T...>> {
336 static sink<T...> doit(connection& c, Input& in) {
337 return sink<T...>(make_shared<internal::sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
338 }
339 };
340 template<typename... T> struct helper<source<T...>> {
341 static source<T...> doit(connection& c, Input& in) {
342 return source<T...>(make_shared<internal::source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
343 }
344 };
345 template <typename... T> struct helper<tuple<T...>> {
346 static tuple<T...> doit(connection& c, Input& in) {
347 return do_unmarshall<Serializer, Input, T...>(c, in);
348 }
349 };
350};
351
352template <typename... T>
354
355template <>
357 using type = std::tuple<>;
358};
359
360template <typename T0, typename... T>
362 using type = std::tuple<
363 T0,
364 std::conditional_t<
365 std::is_default_constructible_v<T>,
366 T,
367 std::optional<T>
368 >...
369 >;
370};
371
372template <typename... T>
373using default_constructible_tuple_except_first_t = typename default_constructible_tuple_except_first<T...>::type;
374
375// Where Tin != Tout, apply std:optional::value()
376template <typename... Tout, typename... Tin>
377auto
378unwrap_optional_if_needed(std::tuple<Tin...>&& tuple_in) {
379 using tuple_in_t = std::tuple<Tin...>;
380 using tuple_out_t = std::tuple<Tout...>;
381 return std::invoke([&] <size_t... Idx> (std::index_sequence<Idx...>) {
382 return tuple_out_t(
383 std::invoke([&] () {
384 if constexpr (std::same_as<std::tuple_element_t<Idx, tuple_in_t>, std::tuple_element_t<Idx, tuple_out_t>>) {
385 return std::move(std::get<Idx>(tuple_in));
386 } else {
387 return std::move(std::get<Idx>(tuple_in).value());
388 }
389 })...);
390 }, std::make_index_sequence<sizeof...(Tout)>());
391}
392
393template <typename Serializer, typename Input, typename... T>
394inline std::tuple<T...> do_unmarshall(connection& c, Input& in) {
395 // Argument order processing is unspecified, but we need to deserialize
396 // left-to-right. So we deserialize into something that can be lazily
397 // constructed (and can conditionally destroy itself if we only constructed some
398 // of the arguments).
399 //
400 // The first element of the tuple has no ordering
401 // problem, and we can deserialize directly into a std::tuple<T...>.
402 //
403 // For the rest of the elements, if they are default-constructible, we leave
404 // them as is, and if not, we deserialize into std::optional<T>, and later
405 // unwrap them. If we're lucky and nothing was wrapped, we can return without
406 // any data movement.
407 using ret_type = std::tuple<T...>;
408 using temporary_type = default_constructible_tuple_except_first_t<T...>;
409 return std::invoke([&] <size_t... Idx> (std::index_sequence<Idx...>) {
410 auto tmp = temporary_type(
411 std::invoke([&] () -> std::tuple_element_t<Idx, temporary_type> {
412 if constexpr (Idx == 0) {
413 // The first T has no ordering problem, so we can deserialize it directly into the tuple
414 return unmarshal_one<Serializer, Input>::template helper<std::tuple_element_t<Idx, ret_type>>::doit(c, in);
415 } else {
416 // Use default constructor for the rest of the Ts
417 return {};
418 }
419 })...
420 );
421 // Deserialize the other Ts, comma-expression preserves left-to-right order.
422 (void)(..., ((Idx == 0
423 ? 0
424 : ((std::get<Idx>(tmp) = unmarshal_one<Serializer, Input>::template helper<std::tuple_element_t<Idx, ret_type>>::doit(c, in), 0)))));
425 if constexpr (std::same_as<ret_type, temporary_type>) {
426 // Use Named Return Vale Optimization (NVRO) if we didn't have to wrap anything
427 return tmp;
428 } else {
429 return unwrap_optional_if_needed<T...>(std::move(tmp));
430 }
431 }, std::index_sequence_for<T...>());
432}
433
434template <typename Serializer, typename... T>
435inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
436 auto in = make_deserializer_stream(input);
437 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
438}
439
440inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
441 std::exception_ptr ex;
442 auto data = make_deserializer_stream(d);
443
444 uint32_t v32;
445 data.read(reinterpret_cast<char*>(&v32), 4);
446 exception_type ex_type = exception_type(le_to_cpu(v32));
447 data.read(reinterpret_cast<char*>(&v32), 4);
448 uint32_t ex_len = le_to_cpu(v32);
449
450 switch (ex_type) {
451 case exception_type::USER: {
452 std::string s(ex_len, '\0');
453 data.read(&*s.begin(), ex_len);
454 ex = std::make_exception_ptr(remote_verb_error(std::move(s)));
455 break;
456 }
457 case exception_type::UNKNOWN_VERB: {
458 uint64_t v64;
459 data.read(reinterpret_cast<char*>(&v64), 8);
460 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
461 break;
462 }
463 default:
464 ex = std::make_exception_ptr(unknown_exception_error());
465 break;
466 }
467 return ex;
468}
469
470template <typename Payload, typename... T>
472 bool done = false;
473 promise<T...> p;
474 template<typename... V>
475 void set_value(V&&... v) {
476 done = true;
477 p.set_value(seastar::internal::untuple(std::forward<V>(v))...);
478 }
480 if (!done) {
481 p.set_exception(closed_error());
482 }
483 }
484};
485
486template<typename Serializer, typename T>
487struct rcv_reply : rcv_reply_base<T, T> {
488 inline void get_reply(rpc::client& dst, rcv_buf input) {
489 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
490 }
491};
492
493template<typename Serializer, typename... T>
494struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
495 inline void get_reply(rpc::client& dst, rcv_buf input) {
496 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
497 }
498};
499
500template<typename Serializer>
501struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> {
502 inline void get_reply(rpc::client&, rcv_buf) {
503 this->set_value();
504 }
505};
506
507template<typename Serializer>
508struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};
509
510template <typename Serializer, typename Ret, typename... InArgs>
511inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, rpc_clock_type::time_point start, cancellable* cancel, rpc::client& dst, id_type msg_id,
512 signature<Ret (InArgs...)>) {
513 using reply_type = rcv_reply<Serializer, Ret>;
514 auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
515 if (msg_id >= 0) {
516 dst.get_stats_internal().replied++;
517 return r.get_reply(dst, std::move(data));
518 } else {
519 dst.get_stats_internal().exception_received++;
520 r.done = true;
521 r.p.set_exception(unmarshal_exception(data));
522 }
523 };
524 using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
525 auto r = std::make_unique<handler_type>(std::move(lambda));
526 r->start = start;
527 auto fut = r->reply.p.get_future();
528 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
529 return fut;
530}
531
532template<typename Serializer, typename... InArgs>
533inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point start, cancellable*, rpc::client&, id_type,
534 signature<no_wait_type (InArgs...)>) { // no_wait overload
535 return make_ready_future<>();
536}
537
538template<typename Serializer, typename... InArgs>
539inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, rpc_clock_type::time_point, cancellable*, rpc::client&, id_type,
540 signature<future<no_wait_type> (InArgs...)>) { // future<no_wait> overload
541 return make_ready_future<>();
542}
543
544// Convert a relative timeout (a duration) to an absolute one (time_point).
545// Do the calculation safely so that a very large duration will be capped by
546// time_point::max, instead of wrapping around to ancient history.
547inline rpc_clock_type::time_point
548relative_timeout_to_absolute(rpc_clock_type::duration relative) {
549 rpc_clock_type::time_point now = rpc_clock_type::now();
550 return now + std::min(relative, rpc_clock_type::time_point::max() - now);
551}
552
553// Refer to struct request_frame for more details
554static constexpr size_t request_frame_headroom = 28;
555
556// Returns lambda that can be used to send rpc messages.
557// The lambda gets client connection and rpc parameters as arguments, marshalls them sends
558// to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
559// to a caller.
560template<typename Serializer, typename MsgType, typename Ret, typename... InArgs>
561auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
562 struct shelper {
563 MsgType t;
564 signature<Ret (InArgs...)> sig;
565 auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) {
566 if (dst.error()) {
567 using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type;
568 return futurize<cleaned_ret_type>::make_exception_future(closed_error());
569 }
570
571 auto start = rpc_clock_type::now();
572 // send message
573 auto msg_id = dst.next_message_id();
574 snd_buf data = marshall(dst.template serializer<Serializer>(), request_frame_headroom, args...);
575
576 // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
577 using wait = wait_signature_t<Ret>;
578 return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, start, cancel, dst, msg_id, sig)).then([] (auto r) {
579 std::get<0>(r).ignore_ready_future();
580 return std::move(std::get<1>(r)); // return future of wait_for_reply
581 });
582 }
583 auto operator()(rpc::client& dst, const InArgs&... args) {
584 return send(dst, {}, nullptr, args...);
585 }
586 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) {
587 return send(dst, timeout, nullptr, args...);
588 }
589 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, cancellable& cancel, const InArgs&... args) {
590 return send(dst, timeout, &cancel, args...);
591 }
592 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
593 return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
594 }
595 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, cancellable& cancel, const InArgs&... args) {
596 return send(dst, relative_timeout_to_absolute(timeout), &cancel, args...);
597 }
598 auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
599 return send(dst, {}, &cancel, args...);
600 }
601
602 };
603 return shelper{xt, xsig};
604}
605
606// Refer to struct response_frame for more details
607static constexpr size_t response_frame_headroom = 16;
608
609template<typename Serializer, typename RetTypes>
610inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
611 std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration) {
612 if (!client->error()) {
613 snd_buf data;
614 try {
615 if constexpr (std::is_void_v<RetTypes>) {
616 ret.get();
617 data = std::invoke(marshall<Serializer>, std::ref(client->template serializer<Serializer>()), response_frame_headroom);
618 } else {
619 data = std::invoke(marshall<Serializer, const RetTypes&>, std::ref(client->template serializer<Serializer>()), response_frame_headroom, std::move(ret.get()));
620 }
621 } catch (std::exception& ex) {
622 uint32_t len = std::strlen(ex.what());
623 data = snd_buf(response_frame_headroom + 2 * sizeof(uint32_t) + len);
624 auto os = make_serializer_stream(data);
625 os.skip(response_frame_headroom);
626 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
627 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
628 v32 = cpu_to_le(len);
629 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
630 os.write(ex.what(), len);
631 msg_id = -msg_id;
632 }
633
634 return client->respond(msg_id, std::move(data), timeout, handler_duration);
635 } else {
636 ret.ignore_ready_future();
637 return make_ready_future<>();
638 }
639}
640
641// specialization for no_wait_type which does not send a reply
642template<typename Serializer>
643inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client,
644 std::optional<rpc_clock_type::time_point>, std::optional<rpc_clock_type::duration>) {
645 try {
646 r.get();
647 } catch (std::exception& ex) {
648 client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored");
649 }
650 return make_ready_future<>();
651}
652
653template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple>
654inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)>, ArgsTuple&& args) {
655 using futurator = futurize<Ret>;
656 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
657}
658
659// lref_to_cref is a helper that encapsulates lvalue reference in std::ref() or does nothing otherwise
660template<typename T>
661auto lref_to_cref(T&& x) {
662 return std::move(x);
663}
664
665template<typename T>
666auto lref_to_cref(T& x) {
667 return std::ref(x);
668}
669
670// Creates lambda to handle RPC message on a server.
671// The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client
672template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint>
673auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, WantTimePoint) {
674 using signature = decltype(sig);
675 using wait_style = wait_signature_t<Ret>;
676 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
677 std::optional<rpc_clock_type::time_point> timeout,
678 int64_t msg_id,
679 rcv_buf data,
680 gate::holder guard) mutable {
681 auto memory_consumed = client->estimate_request_size(data.size);
682 if (memory_consumed > client->max_request_size()) {
683 auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
684 client->get_logger()(client->peer_address(), err);
685 // FIXME: future is discarded
686 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
687 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout, std::nullopt).handle_exception([client, msg_id] (std::exception_ptr eptr) {
688 client->get_logger()(client->info(), msg_id, seastar::format("got exception while processing an oversized message: {}", eptr));
689 });
690 }).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
691 return make_ready_future();
692 }
693 // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
694 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func, g = std::move(guard)] (auto permit) mutable {
695 // FIXME: future is discarded
696 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
697 try {
698 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
699 auto start = rpc_clock_type::now();
700 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit), start] (futurize_t<Ret> ret) mutable {
701 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout, rpc_clock_type::now() - start).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
702 client->get_logger()(client->info(), msg_id, seastar::format("got exception while processing a message: {}", eptr));
703 });
704 });
705 } catch (...) {
706 client->get_logger()(client->info(), msg_id, seastar::format("caught exception while processing a message: {}", std::current_exception()));
707 return make_ready_future();
708 }
709 }).handle_exception_type([g = std::move(g)] (gate_closed_exception&) {/* ignore */});
710 });
711
712 if (timeout) {
713 f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
714 }
715
716 return f;
717 };
718}
719
720// helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind.
721template<typename Func>
722auto make_copyable_function(Func&& func) {
723 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
724 return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); };
725}
726
727template<typename Func>
728requires std::copy_constructible<std::decay_t<Func>>
729auto make_copyable_function(Func&& func) {
730 return std::forward<Func>(func);
731}
732
733// This class is used to calculate client side rpc function signature.
734// Return type is converted from a smart pointer to a type it points to.
735// rpc::optional are converted to non optional type.
736//
737// Examples:
738// std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long)
739// double(float) -> double(float)
740template<typename Ret, typename... In>
742 template<typename T, bool IsSmartPtr>
743 struct drop_smart_ptr_impl;
744 template<typename T>
745 struct drop_smart_ptr_impl<T, true> {
746 using type = typename T::element_type;
747 };
748 template<typename T>
749 struct drop_smart_ptr_impl<T, false> {
750 using type = T;
751 };
752 template<typename T>
753 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
754
755 // if return type is smart ptr take a type it points to instead
756 using return_type = typename drop_smart_ptr<Ret>::type;
757public:
758 using type = return_type(typename remove_optional<In>::type...);
759};
760
761template<typename Serializer, typename MsgType>
762template<typename Ret, typename... In>
763auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)>, MsgType t) {
764 using sig_type = signature<typename client_function_type<Ret, In...>::type>;
765 return send_helper<Serializer>(t, sig_type());
766}
767
768template<typename Serializer, typename MsgType>
769template<typename Func>
771 return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t);
772}
773
774template<typename Serializer, typename MsgType>
775template<typename Func>
778 using clean_sig_type = typename sig_type::clean;
779 using want_client_info = typename sig_type::want_client_info;
780 using want_time_point = typename sig_type::want_time_point;
781 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
782 want_client_info(), want_time_point());
783 register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv)), {}});
784 return make_client(clean_sig_type(), t);
785}
786
787template<typename Serializer, typename MsgType>
788template<typename Func>
790 return register_handler(t, scheduling_group(), std::forward<Func>(func));
791}
792
793template<typename Serializer, typename MsgType>
795 auto it = _handlers.find(t);
796 if (it != _handlers.end()) {
797 return it->second.use_gate.close().finally([this, t] {
798 _handlers.erase(t);
799 });
800 }
801 return make_ready_future<>();
802}
803
804template<typename Serializer, typename MsgType>
806 auto it = _handlers.find(msg_id);
807 if (it == _handlers.end()) {
808 return false;
809 }
810 return !it->second.use_gate.is_closed();
811}
812
813template<typename Serializer, typename MsgType>
814std::optional<protocol_base::handler_with_holder> protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
815 const auto it = _handlers.find(MsgType(msg_id));
816 if (it != _handlers.end()) {
817 try {
818 return handler_with_holder{it->second, it->second.use_gate.hold()};
819 } catch (gate_closed_exception&) {
820 // unregistered, just ignore
821 }
822 }
823 return std::nullopt;
824}
825
826namespace internal {
827
828template<typename Serializer, typename... Out>
829sink_impl<Serializer, Out...>::sink_impl(xshard_connection_ptr con)
830 : sink<Out...>::impl(std::move(con))
831 , _send_queue([this] (snd_buf* buf) { return send_buffer(buf); }, this->_con->get_owner_shard())
832 , _delete_queue([] (snd_buf* buf) { delete buf; return make_ready_future<>(); }, this_shard_id())
833{
834 this->_con->get()->_sink_closed = false;
835}
836
837snd_buf make_shard_local_buffer_copy(snd_buf* org, std::function<deleter(snd_buf*)> make_deleter);
838
839// Runs on connection shard
840template<typename Serializer, typename... Out>
841future<> sink_impl<Serializer, Out...>::send_buffer(snd_buf* data) {
842 auto local_data = make_shard_local_buffer_copy(data, [this] (snd_buf* org) {
843 return deleter(new snd_buf_deleter_impl(org, _delete_queue));
844 });
845 // Exceptions are allowed from here since destroying local_data will free the original data buffer
846 if (this->_ex) {
847 return make_ready_future<>();
848 }
849 connection* con = this->_con->get();
850 // Keep first error in _ex, but make sure to drain the whole batch
851 // and destroy all queued buffers
852 if (con->error()) {
853 this->_ex = std::make_exception_ptr(closed_error());
854 return make_ready_future<>();
855 }
856 if (con->sink_closed()) {
857 this->_ex = std::make_exception_ptr(stream_closed());
858 return make_ready_future<>();
859 }
860
861 return con->send(std::move(local_data), {}, nullptr);
862}
863
864template<typename Serializer, typename... Out>
865future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
866 // note that we use remote serializer pointer, so if serailizer needs a state
867 // it should have per-cpu one
868 auto data = std::make_unique<snd_buf>(marshall(this->_con->get()->template serializer<Serializer>(), 4, args...));
869 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
870 auto p = data->front().get_write();
871 write_le<uint32_t>(p, data->size - 4);
872 // we do not want to dead lock on huge packets, so let them in
873 // but only one at a time
874 auto size = std::min(size_t(data->size), max_stream_buffers_memory);
875 return get_units(this->_sem, size).then([this, data = std::move(data)] (semaphore_units<> su) mutable {
876 if (this->_ex) {
877 return make_exception_future(this->_ex);
878 }
879 data->su = std::move(su);
880 _send_queue.enqueue(data.get());
881 data.release();
882 return make_ready_future<>();
883 });
884}
885
886template<typename Serializer, typename... Out>
887future<> sink_impl<Serializer, Out...>::flush() noexcept {
888 // wait until everything is sent out before returning.
889 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
890 if (this->_ex) {
891 return make_exception_future(this->_ex);
892 }
893 return make_ready_future();
894 });
895}
896
897template<typename Serializer, typename... Out>
898future<> sink_impl<Serializer, Out...>::close() noexcept {
899 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
900 // break the semaphore to prevent any new messages to be sent
901 this->_sem.broken(stream_closed());
902 return _send_queue.stop().finally([this] {
903 return smp::submit_to(this->_con->get_owner_shard(), [this] {
904 return _delete_queue.stop().finally([this] {
905 connection* con = this->_con->get();
906 if (con->sink_closed()) { // double close, should not happen!
907 return make_exception_future(stream_closed());
908 }
909 future<> f = make_ready_future<>();
910 if (!con->error() && !this->_ex) {
911 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
912 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
913 auto p = data.front().get_write();
914 write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream
915 f = con->send(std::move(data), {}, nullptr);
916 } else {
917 f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error());
918 }
919 return f.finally([con] { return con->close_sink(); });
920 });
921 });
922 });
923 });
924}
925
926template<typename Serializer, typename... Out>
927sink_impl<Serializer, Out...>::~sink_impl() {
928 // A failure to close might leave some continuations running after
929 // this is destroyed, leading to use-after-free bugs.
930 SEASTAR_ASSERT(this->_con->get()->sink_closed());
931}
932
933rcv_buf make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<rcv_buf>> org);
934
935template<typename Serializer, typename... In>
936future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
937 auto process_one_buffer = [this] {
938 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
939 this->_bufs.pop_front();
940 return std::apply([] (In&&... args) {
941 auto ret = std::make_optional(std::make_tuple(std::move(args)...));
942 return make_ready_future<std::optional<std::tuple<In...>>>(std::move(ret));
943 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
944 };
945
946 if (!this->_bufs.empty()) {
947 return process_one_buffer();
948 }
949
950 // refill buffers from remote cpu
951 return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
952 connection* con = this->_con->get();
953 if (con->_source_closed) {
954 return make_exception_future<>(stream_closed());
955 }
956 return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
957 if (f.failed()) {
958 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
959 f.ignore_ready_future();
960 return make_exception_future<>(ex);
961 });
962 }
963 if (this->_bufs.empty()) { // nothing to read -> eof
964 return con->close_source().then_wrapped([] (future<> f) {
965 f.ignore_ready_future();
966 return make_ready_future<>();
967 });
968 }
969 return make_ready_future<>();
970 });
971 }).then([this, process_one_buffer] () {
972 if (this->_bufs.empty()) {
973 return make_ready_future<std::optional<std::tuple<In...>>>(std::nullopt);
974 } else {
975 return process_one_buffer();
976 }
977 });
978}
979
980} // namespace internal
981
982template<typename... Out>
983connection_id sink<Out...>::get_id() const {
984 return _impl->_con->get()->get_connection_id();
985}
986
987template<typename... In>
988connection_id source<In...>::get_id() const {
989 return _impl->_con->get()->get_connection_id();
990}
991
992template<typename... In>
993template<typename Serializer, typename... Out>
994sink<Out...> source<In...>::make_sink() {
995 return sink<Out...>(make_shared<internal::sink_impl<Serializer, Out...>>(_impl->_con));
996}
997
998}
999
1000}
1001
1002namespace std {
1003template<>
1004struct hash<seastar::rpc::streaming_domain_type> {
1005 size_t operator()(const seastar::rpc::streaming_domain_type& domain) const {
1006 size_t h = 0;
1007 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
1008 return h;
1009 }
1010};
1011}
1012
1013
A representation of a possibly not-yet-computed value.
Definition: future.hh:1199
static time_point now() noexcept
Definition: lowres_clock.hh:74
promise - allows a future value to be made available at a later time.
Definition: future.hh:913
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:969
Definition: reference_wrapper.hh:43
Definition: rpc_impl.hh:741
Definition: rpc.hh:500
Definition: rpc_types.hh:142
Definition: rpc_types.hh:69
Definition: rpc.hh:241
Definition: rpc.hh:474
Definition: rpc_types.hh:197
Definition: rpc_types.hh:192
Definition: rpc.hh:885
Definition: rpc_types.hh:323
Definition: rpc_types.hh:364
Definition: rpc_types.hh:405
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:369
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:359
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1913
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1919
auto when_all(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:253
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(fmt::format_string< A... > fmt, A &&... a)
Definition: format.hh:42
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: shard_id.hh:52
Definition: function_traits.hh:74
STL namespace.
Definition: rpc_types.hh:207
Definition: rpc_types.hh:100
Definition: rpc_impl.hh:60
Definition: rpc_impl.hh:64
Definition: rpc_impl.hh:61
Definition: rpc_impl.hh:65
Definition: rpc_impl.hh:218
Definition: rpc_impl.hh:217
Definition: rpc_types.hh:183
Definition: rpc_types.hh:243
Definition: rpc_impl.hh:471
Definition: rpc_impl.hh:487
Definition: rpc_impl.hh:48
Definition: rpc.hh:772
Definition: rpc_impl.hh:195
Definition: rpc_impl.hh:69
Definition: rpc.hh:194
Definition: rpc_impl.hh:311
Definition: rpc_impl.hh:310
Definition: rpc_impl.hh:126
Definition: rpc_impl.hh:57