24#if FMT_VERSION >= 90000
25#include <fmt/ostream.h>
27#if FMT_VERSION >= 100000
31#include <seastar/net/api.hh>
35#include <seastar/util/std-compat.hh>
36#include <seastar/util/variant_utils.hh>
38#include <seastar/core/circular_buffer.hh>
39#include <seastar/core/simple-stream.hh>
40#include <seastar/core/lowres_clock.hh>
41#include <boost/functional/hash.hpp>
42#include <seastar/core/sharded.hh>
// Clock alias for this header: rpc_clock_type is lowres_clock. opt_time_point
// wraps a std::optional of this clock's time_point.
48using rpc_clock_type = lowres_clock;
52using type = std::type_identity<T>;
// Counter fields. Each one is a counter_type (uint64_t) initialised to zero.
55 using counter_type = uint64_t;
56 counter_type replied = 0;
57 counter_type pending = 0;
58 counter_type exception_received = 0;
59 counter_type sent_messages = 0;
60 counter_type wait_reply = 0;
61 counter_type timeout = 0;
62 counter_type delay_samples = 0;
// std::chrono::duration<double> uses the default period, so this value is a
// number of seconds held as a double. It starts at zero.
63 std::chrono::duration<double> delay_total = std::chrono::duration<double>(0);
// Converts to true unless shard() is 0xffff, which is the shard value that
// make_invalid_id() encodes. Explicit, so it only applies in boolean contexts.
76 explicit operator bool()
const {
77 return shard() != 0xffff;
// Returns the low 16 bits of _id, widened to size_t. make_id() stores the
// shard number in those bits.
79 size_t shard()
const {
80 return size_t(_id & 0xffff);
// Builds an id whose shard field is 0xffff; such an id converts to false.
// The _id part defaults to 0.
82 constexpr static connection_id make_invalid_id(uint64_t _id = 0) {
83 return make_id(_id, 0xffff);
// Packs (_id, shard) into one 64-bit value: _id is shifted up by 16 bits and
// shard fills the low 16 bits. The shift discards the top 16 bits of _id.
85 constexpr static connection_id make_id(uint64_t _id, uint16_t shard) {
86 return {_id << 16 | shard};
// Compile-time constant holding make_invalid_id() called with its default
// argument (0).
91constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();
// Type-erased values keyed by name. Written by attach_auxiliary() and read by
// the retrieve_auxiliary*() accessors.
101 std::unordered_map<sstring, std::any> user_data;
// Stores `object` under `key`, perfectly forwarded into a std::any.
// unordered_map::emplace never replaces an existing entry, so if `key` is
// already present the old value stays and the new one is discarded.
102 template <
typename T>
103 void attach_auxiliary(
const sstring& key, T&&
object) {
104 user_data.emplace(key, std::any(std::forward<T>(
object)));
// Returns a reference to the value stored under `key`.
// The key must be present; this is checked only by assert, so it is not
// checked at all when NDEBUG is defined. std::any_cast<T&> throws
// std::bad_any_cast when the stored value is not exactly of type T.
106 template <
typename T>
107 T& retrieve_auxiliary(
const sstring& key) {
108 auto it = user_data.find(key);
109 assert(it != user_data.end());
110 return std::any_cast<T&>(it->second);
// Const overload. It casts away constness of `this` and calls the non-const
// retrieve_auxiliary with a const-qualified T, so the caller receives a
// reference to const and the same preconditions apply.
112 template <
typename T>
113 std::add_const_t<T>& retrieve_auxiliary(
const sstring& key)
const {
114 return const_cast<client_info*
>(
this)->retrieve_auxiliary<std::add_const_t<T>>(key);
// Pointer-returning lookup for a key that may be absent. When the key is
// found, returns the address of the stored value viewed as T. The statement
// executed when the key is missing is not visible in this listing
// (presumably it returns nullptr — TODO confirm).
// NOTE(review): the function is noexcept, yet std::any_cast<T&> throws
// std::bad_any_cast on a type mismatch; that exception would call
// std::terminate here.
116 template <
typename T>
117 T* retrieve_auxiliary_opt(
const sstring& key)
noexcept {
118 auto it = user_data.find(key);
119 if (it == user_data.end()) {
122 return &std::any_cast<T&>(it->second);
// Const counterpart of the pointer-returning lookup. When the key is found,
// returns the address of the stored value viewed as const T. The statement
// executed when the key is missing is not visible in this listing
// (presumably it returns nullptr — TODO confirm).
// NOTE(review): noexcept combined with the throwing reference form of
// std::any_cast means a type mismatch terminates the program.
124 template <
typename T>
125 const T* retrieve_auxiliary_opt(
const sstring& key)
const noexcept {
126 auto it = user_data.find(key);
127 if (it == user_data.end()) {
130 return &std::any_cast<const T&>(it->second);
// Exception type that publicly derives from std::runtime_error.
134class error :
public std::runtime_error {
// Passes msg straight to std::runtime_error, so what() returns it.
136 error(
const std::string& msg) : std::runtime_error(msg) {}
190 using std::optional<T>::optional;
// Inherit every constructor of std::optional<rpc_clock_type::time_point>.
195 using std::optional<rpc_clock_type::time_point>::optional;
// Converting constructor from a plain std::optional of the same time_point.
// The base subobject is default-constructed (empty) and then assigned from
// the argument through a reference to the base.
196 opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
197 static_cast<std::optional<rpc_clock_type::time_point>&
>(*this) = time_point;
// Two type-erased callbacks that take no arguments and return nothing.
// Both are empty until something is assigned to them.
204 std::function<void()> cancel_send;
205 std::function<void()> cancel_wait;
// Move constructor. Moves both callbacks out of x and copies both
// back-pointers. For each back-pointer that is non-null, the slot it points
// at is overwritten with the address of this new object and x's copy of the
// pointer is cleared, so x no longer refers to that slot.
// NOTE(review): not declared noexcept.
209 cancellable(
cancellable&& x) : cancel_send(std::move(x.cancel_send)), cancel_wait(std::move(x.cancel_wait)), send_back_pointer(x.send_back_pointer), wait_back_pointer(x.wait_back_pointer) {
210 if (send_back_pointer) {
211 *send_back_pointer =
this;
212 x.send_back_pointer =
nullptr;
// Same hand-over for the wait-side back-pointer.
214 if (wait_back_pointer) {
215 *wait_back_pointer =
this;
216 x.wait_back_pointer =
nullptr;
// Optional semaphore units; default-constructed empty.
241 std::optional<semaphore_units<>> su;
// Iterator over a vector of temporary_buffer<char>.
243 using iterator = std::vector<temporary_buffer<char>>::iterator;
// Sets only the size member from size_. Explicit, so a size_t does not
// convert to this type implicitly.
245 explicit rcv_buf(
size_t size_) : size(size_) {}
248 : size(size), bufs(std::move(bufs)) {};
// 128 * 1024 = 131072.
253 static constexpr size_t chunk_size = 128*1024;
// Iterator over a vector of temporary_buffer<char>.
256 using iterator = std::vector<temporary_buffer<char>>::iterator;
// Declared here only; the definition is not part of this listing.
260 explicit snd_buf(
size_t size_);
264 : size(size), bufs(std::move(bufs)) {};
270 auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
274 auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
// Pure virtual: every implementation returns an sstring name.
286 virtual sstring name()
const = 0;
// Default implementation does nothing and returns an already-resolved future.
// Overridable; declared noexcept.
287 virtual future<> close()
noexcept {
return make_ready_future<>(); };
// Pure virtual: returns a reference to an sstring, so implementations must
// keep the returned string alive for as long as callers use it.
294 virtual const sstring& supported()
const = 0;
// Three-argument overload. The default implementation ignores
// send_empty_frame and forwards to the two-argument negotiate().
299 virtual std::unique_ptr<compressor> negotiate(sstring feature,
bool is_server, std::function<
future<>()> send_empty_frame)
const {
300 return negotiate(feature, is_server);
// Pure virtual two-argument overload; it returns an owning pointer to a
// compressor. The default three-argument overload calls this one.
302 virtual std::unique_ptr<compressor> negotiate(sstring feature,
bool is_server)
const = 0;
// Stream limits. max_queued_stream_buffers (50) is used as the initial
// reserve() size for a buffer vector elsewhere in this file.
309constexpr size_t max_queued_stream_buffers = 50;
// 100 * 1024 = 102400.
310constexpr size_t max_stream_buffers_memory = 100 * 1024;
316template<
typename... Out>
// Stored exception pointer; null when default-constructed.
323 std::exception_ptr _ex;
// Pure virtual: takes the Out... values by const reference and returns a
// future<>.
327 virtual future<> operator()(
const Out&... args) = 0;
// Forwards the arguments unchanged to the implementation object's operator()
// and returns its future.
338 future<> operator()(
const Out&... args) {
339 return _impl->operator()(args...);
342 return _impl->close();
349 return _impl->flush();
// Declared here only; the definition is not part of this listing.
351 connection_id get_id()
const;
355template<
typename... In>
363 _bufs.reserve(max_queued_stream_buffers);
// Pure virtual: resolves to an optional tuple of the In... values.
// NOTE(review): an empty optional presumably means no more data — confirm.
367 virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
// Forwards to the implementation object's operator() and returns its future.
375 future<std::optional<std::tuple<In...>>> operator()() {
376 return _impl->operator()();
// Declared here only; the definition is not part of this listing.
378 connection_id get_id()
const;
// Declares a member template that returns a sink<Out...>. Serializer and
// Out... cannot be deduced (there are no parameters), so callers must name
// them explicitly.
379 template<
typename Serializer,
typename... Out> sink<Out...> make_sink();
// Thin wrapper that publicly derives from std::tuple<T...>.
396template <
typename... T>
397class tuple :
public std::tuple<T...> {
// Inherit all std::tuple<T...> constructors.
399 using std::tuple<T...>::tuple;
// Non-explicit converting constructor: move-constructs the base from an
// rvalue std::tuple<T...>.
400 tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
405#ifndef SEASTAR_P2581R1
406template <
typename... T>
419 boost::hash_combine(h, std::hash<uint64_t>{}(
id.id()));
// tuple_size specialization for seastar::rpc::tuple<T...>. It derives from
// tuple_size<tuple<T...>>, so it reports the same size as the underlying
// tuple (the enclosing namespace is not shown in this listing; presumably
// std — confirm).
424template <
typename... T>
425struct tuple_size<
seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
// tuple_element specialization for seastar::rpc::tuple<T...>. It derives
// from tuple_element<I, tuple<T...>>, so element I has the same type as in
// the underlying tuple.
428template <
size_t I,
typename... T>
429struct tuple_element<I,
seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
434#if FMT_VERSION >= 90000
// Lets fmt print a connection_id by deriving from fmt::ostream_formatter,
// which formats through the type's stream insertion operator (operator<<).
435template <>
struct fmt::formatter<
seastar::rpc::connection_id> : fmt::ostream_formatter {};
438#if FMT_VERSION < 100000
// Formatter for every type derived from seastar::rpc::error. parse() is
// inherited from fmt::formatter<string_view>; format() writes e.what().
440template <std::derived_from<seastar::rpc::error> T>
441struct fmt::formatter<T> : fmt::formatter<string_view> {
442 auto format(
const T& e, fmt::format_context& ctx)
const {
443 return fmt::format_to(ctx.out(),
"{}", e.what());
448#if FMT_VERSION < 100000
// Hand-written formatter for seastar::rpc::optional<T>.
450struct fmt::formatter<
seastar::rpc::optional<T>> {
// Consumes nothing from the format spec: returns the start of the parse range.
451 constexpr auto parse(format_parse_context& ctx) {
return ctx.begin(); }
// The format() signature and the condition between these two returns are not
// visible in this listing. Presumably the first return handles an engaged
// optional, since it dereferences opt, and the second an empty one — confirm.
454 return fmt::format_to(ctx.out(),
"optional({})", *opt);
456 return fmt::format_to(ctx.out(),
"none");
// Formatter for seastar::rpc::optional<T> that reuses fmt's own
// std::optional<T> formatter. The base is inherited privately and only
// parse() is re-exported by the using-declaration.
462struct fmt::formatter<
seastar::rpc::optional<T>> :
private fmt::formatter<std::optional<T>> {
463 using fmt::formatter<std::optional<T>>::parse;
// Delegates to the base formatter's format(); the enclosing signature is not
// visible in this listing.
465 return fmt::formatter<std::optional<T>>::format(opt, ctx);
Definition: circular_buffer.hh:63
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: rpc_types.hh:165
Definition: rpc_types.hh:139
Definition: rpc_types.hh:290
Definition: rpc_types.hh:279
Definition: rpc_types.hh:66
Definition: rpc_types.hh:134
Definition: rpc_types.hh:193
Definition: rpc_types.hh:188
Definition: rpc_types.hh:175
Definition: rpc_types.hh:160
Definition: rpc_types.hh:317
Definition: rpc_types.hh:356
Definition: rpc_types.hh:170
Definition: rpc_types.hh:144
Definition: rpc_types.hh:397
Definition: rpc_types.hh:155
Definition: rpc_types.hh:149
Definition: socket_defs.hh:47
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
Holds the implementation parts of the metrics layer; do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: rpc_types.hh:203
Definition: rpc_types.hh:97
Definition: rpc_types.hh:179
Definition: rpc_types.hh:239
Definition: rpc_types.hh:251
Definition: rpc_types.hh:54