23 #include <seastar/core/function_traits.hh>
24 #include <seastar/core/shared_ptr.hh>
25 #include <seastar/core/sstring.hh>
26 #include <seastar/core/when_all.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/core/simple-stream.hh>
29 #include <boost/range/numeric.hpp>
30 #include <boost/range/adaptor/transformed.hpp>
31 #include <seastar/net/packet-data-source.hh>
32 #include <seastar/core/print.hh>
38 enum class exception_type : uint32_t {
64 template <
typename Ret,
typename... In>
67 using arg_types = std::tuple<In...>;
74 template <
typename Ret,
typename... In>
77 using arg_types = std::tuple<In...>;
83 template <
typename Ret,
typename... In>
86 using arg_types = std::tuple<In...>;
93 template <
typename Ret,
typename... In>
96 using arg_types = std::tuple<In...>;
102 template <
typename Ret,
typename... In>
104 using ret_type = Ret;
105 using arg_types = std::tuple<In...>;
112 template <
typename Ret,
typename... In>
114 using ret_type = Ret;
115 using arg_types = std::tuple<In...>;
121 template <
typename T>
124 using cleaned_type = T;
127 template <
typename... T>
136 using cleaned_type = void;
145 template <
typename T>
148 template <
typename... In>
152 return std::move(args);
155 template <
typename... In>
157 std::tuple<std::reference_wrapper<client_info>, In...>
158 maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
159 return std::tuple_cat(std::make_tuple(
std::ref(ci)), std::move(args));
162 template <
typename... In>
165 maybe_add_time_point(dont_want_time_point, opt_time_point&, std::tuple<In...>&& args) {
166 return std::move(args);
169 template <
typename... In>
171 std::tuple<opt_time_point, In...>
172 maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
173 return std::tuple_cat(std::make_tuple(otp), std::move(args));
176 inline sstring serialize_connection_id(
const connection_id&
id) {
177 sstring p = uninitialized_string(
sizeof(
id));
179 write_le(c,
id.
id());
183 inline connection_id deserialize_connection_id(
const sstring& s) {
184 using id_type = decltype(connection_id{0}.id());
186 auto id = read_le<id_type>(p);
187 return connection_id{
id};
190 template <
bool IsSmartPtr>
195 template <
typename Serializer,
typename Output,
typename T>
196 static inline void serialize(Serializer& serializer, Output& out,
const T& t) {
197 return write(serializer, out, t);
203 template <
typename Serializer,
typename Output,
typename T>
204 static inline void serialize(Serializer& serializer, Output& out,
const T& t) {
205 return write(serializer, out, *t);
209 template <
typename Serializer,
typename Output,
typename... T>
210 inline void do_marshall(Serializer& serializer, Output& out,
const T&... args);
212 template <
typename Serializer,
typename Output>
215 static void doit(Serializer& serializer, Output& out,
const T& arg) {
217 serialize_helper_type::serialize(serializer, out, arg);
221 static void doit(Serializer& serializer, Output& out,
const std::reference_wrapper<const T>& arg) {
225 static void put_connection_id(
const connection_id& cid, Output& out) {
226 sstring
id = serialize_connection_id(cid);
227 out.write(
id.c_str(),
id.size());
230 static void doit(Serializer&, Output& out,
const sink<T...>& arg) {
231 put_connection_id(arg.get_id(), out);
235 static void doit(Serializer&, Output& out,
const source<T...>& arg) {
236 put_connection_id(arg.get_id(), out);
240 static void doit(Serializer& serializer, Output& out,
const tuple<T...>& arg) {
241 auto do_do_marshall = [&serializer, &out] (
const auto&... args) {
242 do_marshall(serializer, out, args...);
244 std::apply(do_do_marshall, arg);
249 template <
typename Serializer,
typename Output,
typename... T>
250 inline void do_marshall(Serializer& serializer, Output& out,
const T&... args) {
255 static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
256 auto* b = std::get_if<temporary_buffer<char>>(&output.bufs);
258 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
260 auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs);
261 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
265 template <
typename Serializer,
typename... T>
266 inline snd_buf marshall(Serializer& serializer,
size_t head_space,
const T&... args) {
267 measuring_output_stream measure;
268 do_marshall(serializer, measure, args...);
269 snd_buf ret(measure.size() + head_space);
270 auto out = make_serializer_stream(ret);
271 out.skip(head_space);
272 do_marshall(serializer, out, args...);
276 template <
typename Serializer,
typename Input,
typename... T>
277 std::tuple<T...> do_unmarshall(connection& c, Input& in);
279 template<
typename Serializer,
typename Input>
283 return read(c.serializer<Serializer>(), in, type<T>());
289 return optional<T>(read(c.serializer<Serializer>(), in, type<
typename remove_optional<T>::type>()));
303 return deserialize_connection_id(
id);
307 return sink<T...>(make_shared<
sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
312 return source<T...>(make_shared<
source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
317 return do_unmarshall<Serializer, Input, T...>(c, in);
322 template <
typename Serializer,
typename Input,
typename... T>
323 inline std::tuple<T...> do_unmarshall(
connection& c, Input& in) {
328 std::tuple<std::optional<T>...> temporary;
329 return std::apply([&] (
auto&... args) {
332 return std::tuple(std::move(*args)...);
336 template <
typename Serializer,
typename... T>
337 inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
338 auto in = make_deserializer_stream(input);
339 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
342 inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
343 std::exception_ptr ex;
344 auto data = make_deserializer_stream(d);
347 data.read(
reinterpret_cast<char*
>(&v32), 4);
348 exception_type ex_type = exception_type(le_to_cpu(v32));
349 data.read(
reinterpret_cast<char*
>(&v32), 4);
350 uint32_t ex_len = le_to_cpu(v32);
353 case exception_type::USER: {
354 std::string s(ex_len,
'\0');
355 data.read(&*s.begin(), ex_len);
356 ex = std::make_exception_ptr(remote_verb_error(std::move(s)));
359 case exception_type::UNKNOWN_VERB: {
361 data.read(
reinterpret_cast<char*
>(&v64), 8);
362 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
366 ex = std::make_exception_ptr(unknown_exception_error());
372 template <
typename Payload,
typename... T>
376 template<
typename... V>
377 void set_value(V&&... v) {
379 p.
set_value(internal::untuple(std::forward<V>(v))...);
388 template<
typename Serializer,
typename T>
391 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
395 template<
typename Serializer,
typename... T>
398 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
402 template<
typename Serializer>
409 template<
typename Serializer>
412 template <
typename Serializer,
typename Ret,
typename... InArgs>
416 auto lambda = [] (reply_type& r,
rpc::client& dst, id_type msg_id,
rcv_buf data)
mutable {
418 dst.get_stats_internal().replied++;
419 return r.get_reply(dst, std::move(data));
421 dst.get_stats_internal().exception_received++;
423 r.p.set_exception(unmarshal_exception(data));
426 using handler_type =
typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
427 auto r = std::make_unique<handler_type>(std::move(lambda));
428 auto fut = r->reply.p.get_future();
429 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
433 template<
typename Serializer,
typename... InArgs>
434 inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable*, rpc::client&, id_type,
435 signature<no_wait_type (InArgs...)>) {
436 return make_ready_future<>();
439 template<
typename Serializer,
typename... InArgs>
440 inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable*, rpc::client&, id_type,
441 signature<future<no_wait_type> (InArgs...)>) {
442 return make_ready_future<>();
448 inline rpc_clock_type::time_point
449 relative_timeout_to_absolute(rpc_clock_type::duration relative) {
451 return now + std::min(relative, rpc_clock_type::time_point::max() -
now);
455 static constexpr
size_t request_frame_headroom = 28;
461 template<
typename Serializer,
typename MsgType,
typename Ret,
typename... InArgs>
462 auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
465 signature<Ret (InArgs...)> sig;
466 auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel,
const InArgs&... args) {
468 using cleaned_ret_type =
typename wait_signature<Ret>::cleaned_type;
469 return futurize<cleaned_ret_type>::make_exception_future(closed_error());
473 auto msg_id = dst.next_message_id();
474 snd_buf data = marshall(dst.template serializer<Serializer>(), request_frame_headroom, args...);
477 using wait = wait_signature_t<Ret>;
478 return when_all(dst.request(uint64_t(t), msg_id, std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (
auto r) {
479 std::get<0>(r).ignore_ready_future();
480 return std::move(std::get<1>(r));
483 auto operator()(rpc::client& dst,
const InArgs&... args) {
484 return send(dst, {},
nullptr, args...);
486 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout,
const InArgs&... args) {
487 return send(dst, timeout,
nullptr, args...);
489 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout,
const InArgs&... args) {
490 return send(dst, relative_timeout_to_absolute(timeout),
nullptr, args...);
492 auto operator()(rpc::client& dst, cancellable& cancel,
const InArgs&... args) {
493 return send(dst, {}, &cancel, args...);
497 return shelper{xt, xsig};
501 static constexpr
size_t response_frame_headroom = 12;
503 template<
typename Serializer,
typename RetTypes>
504 inline future<> reply(wait_type, future<RetTypes>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
505 std::optional<rpc_clock_type::time_point> timeout) {
506 if (!client->error()) {
509 if constexpr (std::is_void_v<RetTypes>) {
511 data = std::invoke(marshall<Serializer>,
std::ref(client->template serializer<Serializer>()), response_frame_headroom);
513 data = std::invoke(marshall<Serializer, const RetTypes&>,
std::ref(client->template serializer<Serializer>()), response_frame_headroom, std::move(ret.get0()));
515 }
catch (std::exception& ex) {
516 uint32_t len = std::strlen(ex.what());
517 data = snd_buf(response_frame_headroom + 2 *
sizeof(uint32_t) + len);
518 auto os = make_serializer_stream(data);
519 os.skip(response_frame_headroom);
520 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
521 os.write(
reinterpret_cast<char*
>(&v32),
sizeof(v32));
522 v32 = cpu_to_le(len);
523 os.write(
reinterpret_cast<char*
>(&v32),
sizeof(v32));
524 os.write(ex.what(), len);
528 return client->respond(msg_id, std::move(data), timeout);
530 ret.ignore_ready_future();
531 return make_ready_future<>();
536 template<
typename Serializer>
537 inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point>) {
540 }
catch (std::exception& ex) {
541 client->get_logger()(client->info(), msgid, to_sstring(
"exception \"") + ex.what() +
"\" in no_wait handler ignored");
543 return make_ready_future<>();
546 template<
typename Ret,
typename... InArgs,
typename WantClientInfo,
typename WantTimePoint,
typename Func,
typename ArgsTuple>
547 inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)>, ArgsTuple&& args) {
548 using futurator = futurize<Ret>;
549 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
554 auto lref_to_cref(T&& x) {
559 auto lref_to_cref(T& x) {
565 template <
typename Serializer,
typename Func,
typename Ret,
typename... InArgs,
typename WantClientInfo,
typename WantTimePoint>
566 auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo, WantTimePoint) {
567 using signature = decltype(sig);
568 using wait_style = wait_signature_t<Ret>;
569 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
570 std::optional<rpc_clock_type::time_point> timeout,
572 rcv_buf data)
mutable {
573 auto memory_consumed = client->estimate_request_size(data.size);
574 if (memory_consumed > client->max_request_size()) {
575 auto err =
format(
"request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
576 client->get_logger()(client->peer_address(), err);
578 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
579 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) {
580 client->get_logger()(client->info(), msg_id,
format(
"got exception while processing an oversized message: {}", eptr));
582 }).handle_exception_type([] (gate_closed_exception&) {});
586 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (
auto permit)
mutable {
588 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] ()
mutable {
590 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
591 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
592 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
593 client->get_logger()(client->info(), msg_id, format(
"got exception while processing a message: {}", eptr));
597 client->get_logger()(client->info(), msg_id,
format(
"caught exception while processing a message: {}", std::current_exception()));
600 }).handle_exception_type([] (gate_closed_exception&) {});
604 f = f.handle_exception_type([] (semaphore_timed_out&) { });
612 template<
typename Func>
613 auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible_v<std::decay_t<Func>>,
void*> =
nullptr) {
614 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
615 return [p] (
auto&&... args) {
return (*p)( std::forward<decltype(args)>(args)... ); };
618 template<
typename Func>
619 auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible_v<std::decay_t<Func>>,
void*> =
nullptr) {
620 return std::forward<Func>(func);
630 template<
typename Ret,
typename... In>
632 template<
typename T,
bool IsSmartPtr>
633 struct drop_smart_ptr_impl;
635 struct drop_smart_ptr_impl<T, true> {
636 using type =
typename T::element_type;
639 struct drop_smart_ptr_impl<T, false> {
643 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
646 using return_type =
typename drop_smart_ptr<Ret>::type;
648 using type = return_type(
typename remove_optional<In>::type...);
651 template<
typename Serializer,
typename MsgType>
652 template<
typename Ret,
typename... In>
655 return send_helper<Serializer>(t, sig_type());
658 template<
typename Serializer,
typename MsgType>
659 template<
typename Func>
664 template<
typename Serializer,
typename MsgType>
665 template<
typename Func>
668 using clean_sig_type =
typename sig_type::clean;
669 using want_client_info =
typename sig_type::want_client_info;
670 using want_time_point =
typename sig_type::want_time_point;
671 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
672 want_client_info(), want_time_point());
673 register_receiver(t,
rpc_handler{sg, make_copyable_function(std::move(recv)), {}});
674 return make_client(clean_sig_type(), t);
677 template<
typename Serializer,
typename MsgType>
678 template<
typename Func>
683 template<
typename Serializer,
typename MsgType>
685 auto it = _handlers.find(t);
686 if (it != _handlers.end()) {
687 return it->second.use_gate.close().finally([
this, t] {
691 return make_ready_future<>();
694 template<
typename Serializer,
typename MsgType>
696 auto it = _handlers.find(msg_id);
697 if (it == _handlers.end()) {
700 return !it->second.use_gate.is_closed();
703 template<
typename Serializer,
typename MsgType>
704 rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
705 rpc_handler* h =
nullptr;
706 auto it = _handlers.find(MsgType(msg_id));
707 if (it != _handlers.end()) {
709 it->second.use_gate.enter();
711 }
catch (gate_closed_exception&) {
718 template<
typename Serializer,
typename MsgType>
719 void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
723 template<
typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
725 template<
typename Serializer,
typename... Out>
726 future<> sink_impl<Serializer, Out...>::operator()(
const Out&... args) {
729 snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
730 static_assert(snd_buf::chunk_size >= 4,
"send buffer chunk size is too small");
731 auto p = data.front().get_write();
732 write_le<uint32_t>(p, data.size - 4);
735 auto size = std::min(
size_t(data.size), max_stream_buffers_memory);
736 const auto seq_num = _next_seq_num++;
737 return get_units(this->_sem, size).then([
this, data = make_foreign(std::make_unique<snd_buf>(std::move(data))), seq_num] (semaphore_units<> su)
mutable {
743 (void)
smp::submit_to(this->_con->get_owner_shard(), [
this, data = std::move(data), seq_num] ()
mutable {
744 connection* con = this->_con->get();
748 if(con->sink_closed()) {
752 auto& last_seq_num = _remote_state.last_seq_num;
753 auto& out_of_order_bufs = _remote_state.out_of_order_bufs;
755 auto local_data = make_shard_local_buffer_copy(std::move(data));
756 const auto seq_num_diff = seq_num - last_seq_num;
757 if (seq_num_diff > 1) {
758 auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{
promise<>{}, std::move(local_data)});
759 return it->second.pr.get_future();
762 last_seq_num = seq_num;
763 auto ret_fut = con->send(std::move(local_data), {},
nullptr);
764 while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) {
765 auto it = out_of_order_bufs.begin();
766 last_seq_num = it->first;
767 auto fut = con->send(std::move(it->second.data), {},
nullptr);
768 fut.forward_to(std::move(it->second.pr));
769 out_of_order_bufs.erase(it);
772 }).then_wrapped([su = std::move(su),
this] (
future<> f) {
773 if (f.failed() && !this->_ex) {
774 this->_ex = f.get_exception();
776 f.ignore_ready_future();
779 return make_ready_future<>();
783 template<
typename Serializer,
typename... Out>
784 future<> sink_impl<Serializer, Out...>::flush() {
786 return with_semaphore(this->_sem, max_stream_buffers_memory, [
this] {
794 template<
typename Serializer,
typename... Out>
795 future<> sink_impl<Serializer, Out...>::close() {
796 return with_semaphore(this->_sem, max_stream_buffers_memory, [
this] {
798 connection* con = this->_con->get();
799 if (con->sink_closed()) {
800 return make_exception_future(stream_closed());
803 if (!con->error() && !this->_ex) {
804 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
805 static_assert(snd_buf::chunk_size >= 4,
"send buffer chunk size is too small");
806 auto p = data.front().get_write();
807 write_le<uint32_t>(p, -1U);
808 f = con->send(std::move(data), {},
nullptr);
812 return f.finally([con] {
return con->close_sink(); });
817 template<
typename Serializer,
typename... Out>
818 sink_impl<Serializer, Out...>::~sink_impl() {
821 assert(this->_con->get()->sink_closed());
824 template<
typename Serializer,
typename... In>
825 future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
826 auto process_one_buffer = [
this] {
827 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
828 this->_bufs.pop_front();
829 return std::apply([] (In&&... args) {
830 auto ret = std::make_optional(std::make_tuple(std::move(args)...));
832 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
835 if (!this->_bufs.empty()) {
836 return process_one_buffer();
841 connection* con = this->_con->get();
842 if (con->_source_closed) {
843 return make_exception_future<>(stream_closed());
847 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
848 f.ignore_ready_future();
849 return make_exception_future<>(ex);
852 if (this->_bufs.empty()) {
853 return con->close_source().then_wrapped([] (future<> f) {
854 f.ignore_ready_future();
855 return make_ready_future<>();
858 return make_ready_future<>();
860 }).then([
this, process_one_buffer] () {
861 if (this->_bufs.empty()) {
864 return process_one_buffer();
869 template<
typename... Out>
870 connection_id sink<Out...>::get_id()
const {
871 return _impl->_con->get()->get_connection_id();
874 template<
typename... In>
875 connection_id source<In...>::get_id()
const {
876 return _impl->_con->get()->get_connection_id();
879 template<
typename... In>
880 template<
typename Serializer,
typename... Out>
881 sink<Out...> source<In...>::make_sink() {
882 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
891 struct hash<
seastar::rpc::streaming_domain_type> {
894 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1511
static time_point now() noexcept
Definition: lowres_clock.hh:77
promise - allows a future value to be made available at a later time.
Definition: future.hh:926
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:982
Definition: reference_wrapper.hh:43
Definition: rpc_impl.hh:631
Definition: rpc_types.hh:138
Definition: rpc_types.hh:65
Definition: rpc_types.hh:192
Definition: rpc_types.hh:187
Definition: rpc_types.hh:309
Definition: rpc_types.hh:348
Definition: rpc_types.hh:389
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:354
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1940
auto when_all(FutOrFuncs &&... fut_or_funcs) noexcept
Definition: when_all.hh:255
future now()
Returns a ready future.
Definition: later.hh:35
reference_wrapper< T > ref(T &object) noexcept
Wraps reference in a reference_wrapper.
Definition: reference_wrapper.hh:62
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:206
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(const char *fmt, A &&... a)
Definition: print.hh:142
Definition: function_traits.hh:62
Definition: rpc_types.hh:202
Definition: rpc_types.hh:96
Definition: rpc_impl.hh:56
Definition: rpc_impl.hh:60
Definition: rpc_impl.hh:57
Definition: rpc_impl.hh:61
Definition: rpc_impl.hh:214
Definition: rpc_impl.hh:213
Definition: rpc_types.hh:178
Definition: rpc_types.hh:238
Definition: rpc_impl.hh:373
Definition: rpc_impl.hh:389
Definition: rpc_impl.hh:44
Definition: rpc_impl.hh:191
Definition: rpc_impl.hh:65
Definition: rpc_impl.hh:281
Definition: rpc_impl.hh:280
Definition: rpc_impl.hh:122
Definition: rpc_impl.hh:53