24#if FMT_VERSION >= 90000
25#include <fmt/ostream.h>
27#if FMT_VERSION >= 100000
31#include <seastar/net/api.hh>
34#include <boost/any.hpp>
35#include <boost/type.hpp>
36#include <seastar/util/std-compat.hh>
37#include <seastar/util/variant_utils.hh>
39#include <seastar/core/circular_buffer.hh>
40#include <seastar/core/simple-stream.hh>
41#include <seastar/core/lowres_clock.hh>
42#include <boost/functional/hash.hpp>
43#include <seastar/core/sharded.hh>
49using rpc_clock_type = lowres_clock;
53using type = boost::type<T>;
56 using counter_type = uint64_t;
57 counter_type replied = 0;
58 counter_type pending = 0;
59 counter_type exception_received = 0;
60 counter_type sent_messages = 0;
61 counter_type wait_reply = 0;
62 counter_type timeout = 0;
63 counter_type delay_samples = 0;
64 std::chrono::duration<double> delay_total = std::chrono::duration<double>(0);
77 explicit operator bool()
const {
78 return shard() != 0xffff;
80 size_t shard()
const {
81 return size_t(_id & 0xffff);
83 constexpr static connection_id make_invalid_id(uint64_t _id = 0) {
84 return make_id(_id, 0xffff);
86 constexpr static connection_id make_id(uint64_t _id, uint16_t shard) {
87 return {_id << 16 | shard};
92constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();
102 std::unordered_map<sstring, boost::any> user_data;
103 template <
typename T>
104 void attach_auxiliary(
const sstring& key, T&&
object) {
105 user_data.emplace(key, boost::any(std::forward<T>(
object)));
107 template <
typename T>
108 T& retrieve_auxiliary(
const sstring& key) {
109 auto it = user_data.find(key);
110 assert(it != user_data.end());
111 return boost::any_cast<T&>(it->second);
113 template <
typename T>
114 std::add_const_t<T>& retrieve_auxiliary(
const sstring& key)
const {
115 return const_cast<client_info*
>(
this)->retrieve_auxiliary<std::add_const_t<T>>(key);
117 template <
typename T>
118 T* retrieve_auxiliary_opt(
const sstring& key)
noexcept {
119 auto it = user_data.find(key);
120 if (it == user_data.end()) {
123 return &boost::any_cast<T&>(it->second);
125 template <
typename T>
126 const T* retrieve_auxiliary_opt(
const sstring& key)
const noexcept {
127 auto it = user_data.find(key);
128 if (it == user_data.end()) {
131 return &boost::any_cast<const T&>(it->second);
135class error :
public std::runtime_error {
137 error(
const std::string& msg) : std::runtime_error(msg) {}
191 using std::optional<T>::optional;
196 using std::optional<rpc_clock_type::time_point>::optional;
197 opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
198 static_cast<std::optional<rpc_clock_type::time_point>&
>(*this) = time_point;
205 std::function<void()> cancel_send;
206 std::function<void()> cancel_wait;
210 cancellable(
cancellable&& x) : cancel_send(std::move(x.cancel_send)), cancel_wait(std::move(x.cancel_wait)), send_back_pointer(x.send_back_pointer), wait_back_pointer(x.wait_back_pointer) {
211 if (send_back_pointer) {
212 *send_back_pointer =
this;
213 x.send_back_pointer =
nullptr;
215 if (wait_back_pointer) {
216 *wait_back_pointer =
this;
217 x.wait_back_pointer =
nullptr;
242 std::optional<semaphore_units<>> su;
244 using iterator = std::vector<temporary_buffer<char>>::iterator;
246 explicit rcv_buf(
size_t size_) : size(size_) {}
249 : size(size), bufs(std::move(bufs)) {};
254 static constexpr size_t chunk_size = 128*1024;
257 using iterator = std::vector<temporary_buffer<char>>::iterator;
261 explicit snd_buf(
size_t size_);
265 : size(size), bufs(std::move(bufs)) {};
271 auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
275 auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
287 virtual sstring name()
const = 0;
288 virtual future<> close()
noexcept {
return make_ready_future<>(); };
295 virtual const sstring& supported()
const = 0;
300 virtual std::unique_ptr<compressor> negotiate(sstring feature,
bool is_server, std::function<
future<>()> send_empty_frame)
const {
301 return negotiate(feature, is_server);
303 virtual std::unique_ptr<compressor> negotiate(sstring feature,
bool is_server)
const = 0;
310constexpr size_t max_queued_stream_buffers = 50;
311constexpr size_t max_stream_buffers_memory = 100 * 1024;
317template<
typename... Out>
324 std::exception_ptr _ex;
328 virtual future<> operator()(
const Out&... args) = 0;
339 future<> operator()(
const Out&... args) {
340 return _impl->operator()(args...);
343 return _impl->close();
350 return _impl->flush();
352 connection_id get_id()
const;
356template<
typename... In>
364 _bufs.reserve(max_queued_stream_buffers);
368 virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
376 future<std::optional<std::tuple<In...>>> operator()() {
377 return _impl->operator()();
379 connection_id get_id()
const;
380 template<
typename Serializer,
typename... Out> sink<Out...> make_sink();
397template <
typename... T>
398class tuple :
public std::tuple<T...> {
400 using std::tuple<T...>::tuple;
401 tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
406template <
typename... T>
418 boost::hash_combine(h, std::hash<uint64_t>{}(
id.id()));
423template <
typename... T>
424struct tuple_size<
seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
427template <
size_t I,
typename... T>
428struct tuple_element<I,
seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
433#if FMT_VERSION >= 90000
434template <>
struct fmt::formatter<
seastar::rpc::connection_id> : fmt::ostream_formatter {};
437#if FMT_VERSION < 100000
439template <std::derived_from<seastar::rpc::error> T>
440struct fmt::formatter<T> : fmt::formatter<string_view> {
441 auto format(
const T& e, fmt::format_context& ctx)
const {
442 return fmt::format_to(ctx.out(),
"{}", e.what());
447#if FMT_VERSION < 100000
449struct fmt::formatter<
seastar::rpc::optional<T>> {
450 constexpr auto parse(format_parse_context& ctx) {
return ctx.begin(); }
453 return fmt::format_to(ctx.out(),
"optional({})", *opt);
455 return fmt::format_to(ctx.out(),
"none");
461struct fmt::formatter<
seastar::rpc::optional<T>> :
private fmt::formatter<std::optional<T>> {
462 using fmt::formatter<std::optional<T>>::parse;
464 return fmt::formatter<std::optional<T>>::format(opt, ctx);
Definition: circular_buffer.hh:63
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: rpc_types.hh:166
Definition: rpc_types.hh:140
Definition: rpc_types.hh:291
Definition: rpc_types.hh:280
Definition: rpc_types.hh:67
Definition: rpc_types.hh:135
Definition: rpc_types.hh:194
Definition: rpc_types.hh:189
Definition: rpc_types.hh:176
Definition: rpc_types.hh:161
Definition: rpc_types.hh:318
Definition: rpc_types.hh:357
Definition: rpc_types.hh:171
Definition: rpc_types.hh:145
Definition: rpc_types.hh:398
Definition: rpc_types.hh:156
Definition: rpc_types.hh:150
Definition: socket_defs.hh:47
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: rpc_types.hh:204
Definition: rpc_types.hh:98
Definition: rpc_types.hh:180
Definition: rpc_types.hh:240
Definition: rpc_types.hh:252
Definition: rpc_types.hh:55