24 #if FMT_VERSION >= 9'00'00
25 #include <fmt/ostream.h>
27 #if FMT_VERSION >= 10'00'00
31 #include <seastar/net/api.hh>
34 #include <boost/any.hpp>
35 #include <boost/type.hpp>
36 #include <seastar/util/std-compat.hh>
37 #include <seastar/util/variant_utils.hh>
39 #include <seastar/core/circular_buffer.hh>
40 #include <seastar/core/simple-stream.hh>
41 #include <seastar/core/lowres_clock.hh>
42 #include <boost/functional/hash.hpp>
43 #include <seastar/core/sharded.hh>
// Clock type alias for lowres_clock. Its time_point is the value type that
// opt_time_point wraps (std::optional<rpc_clock_type::time_point>).
49 using rpc_clock_type = lowres_clock;
53 using type = boost::type<T>;
56 using counter_type = uint64_t;
57 counter_type replied = 0;
58 counter_type pending = 0;
59 counter_type exception_received = 0;
60 counter_type sent_messages = 0;
61 counter_type wait_reply = 0;
62 counter_type timeout = 0;
75 explicit operator bool()
const {
76 return shard() != 0xffff;
78 size_t shard()
const {
79 return size_t(_id & 0xffff);
81 constexpr
static connection_id make_invalid_id(uint64_t _id = 0) {
82 return make_id(_id, 0xffff);
84 constexpr
static connection_id make_id(uint64_t _id, uint16_t shard) {
85 return {_id << 16 | shard};
// Constant holding the result of connection_id::make_invalid_id() called with
// its default argument: id part 0 combined with shard value 0xffff, the shard
// value for which connection_id's operator bool returns false.
90 constexpr
connection_id invalid_connection_id = connection_id::make_invalid_id();
// Stream-insertion operator for connection_id. Only the declaration appears
// here; the definition is not part of this listing.
92 std::ostream& operator<<(std::ostream&,
const connection_id&);
100 std::unordered_map<sstring, boost::any> user_data;
101 template <
typename T>
102 void attach_auxiliary(
const sstring& key, T&&
object) {
103 user_data.emplace(key, boost::any(std::forward<T>(
object)));
105 template <
typename T>
106 T& retrieve_auxiliary(
const sstring& key) {
107 auto it = user_data.find(key);
108 assert(it != user_data.end());
109 return boost::any_cast<T&>(it->second);
111 template <
typename T>
112 std::add_const_t<T>& retrieve_auxiliary(
const sstring& key)
const {
113 return const_cast<client_info*
>(
this)->retrieve_auxiliary<std::add_const_t<T>>(key);
115 template <
typename T>
116 T* retrieve_auxiliary_opt(
const sstring& key) noexcept {
117 auto it = user_data.find(key);
118 if (it == user_data.end()) {
121 return &boost::any_cast<T&>(it->second);
123 template <
typename T>
124 const T* retrieve_auxiliary_opt(
const sstring& key)
const noexcept {
125 auto it = user_data.find(key);
126 if (it == user_data.end()) {
129 return &boost::any_cast<const T&>(it->second);
133 class error :
public std::runtime_error {
// Constructs the error from a message; msg is passed unchanged to the
// std::runtime_error base.
135 error(
const std::string& msg) : std::runtime_error(msg) {}
186 template <
typename T>
189 using std::optional<T>::optional;
194 using std::optional<rpc_clock_type::time_point>::optional;
195 opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
196 static_cast<std::optional<rpc_clock_type::time_point>&
>(*this) = time_point;
203 std::function<void()> cancel_send;
204 std::function<void()> cancel_wait;
208 cancellable(
cancellable&& x) : cancel_send(std::move(x.cancel_send)), cancel_wait(std::move(x.cancel_wait)), send_back_pointer(x.send_back_pointer), wait_back_pointer(x.wait_back_pointer) {
209 if (send_back_pointer) {
210 *send_back_pointer =
this;
211 x.send_back_pointer =
nullptr;
213 if (wait_back_pointer) {
214 *wait_back_pointer =
this;
215 x.wait_back_pointer =
nullptr;
240 std::optional<semaphore_units<>> su;
242 using iterator = std::vector<temporary_buffer<char>>::iterator;
// Constructs an rcv_buf whose size member is initialized from size_.
// Marked explicit, so a bare size_t does not convert to rcv_buf implicitly.
244 explicit rcv_buf(
size_t size_) : size(size_) {}
247 : size(size), bufs(std::move(bufs)) {};
// Compile-time chunk size: 128 * 1024 = 131072.
// NOTE(review): presumably a byte count — confirm against the users of chunk_size.
252 static constexpr
size_t chunk_size = 128*1024;
255 using iterator = std::vector<temporary_buffer<char>>::iterator;
259 explicit snd_buf(
size_t size_);
263 : size(size), bufs(std::move(bufs)) {};
269 auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
273 auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
285 virtual sstring name()
const = 0;
292 virtual const sstring& supported()
const = 0;
294 virtual std::unique_ptr<compressor> negotiate(sstring feature,
bool is_server)
const = 0;
// Compile-time limits for stream buffering.
// max_queued_stream_buffers (50) is also the capacity passed to
// _bufs.reserve() further down in this listing.
301 constexpr
size_t max_queued_stream_buffers = 50;
// 100 * 1024 = 102400.
// NOTE(review): presumably a byte budget — confirm where it is enforced.
302 constexpr
size_t max_stream_buffers_memory = 100 * 1024;
308 template<
typename... Out>
315 std::exception_ptr _ex;
319 virtual future<> operator()(
const Out&... args) = 0;
330 future<> operator()(
const Out&... args) {
331 return _impl->operator()(args...);
334 return _impl->close();
341 return _impl->flush();
343 connection_id get_id()
const;
347 template<
typename... In>
355 _bufs.reserve(max_queued_stream_buffers);
359 virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
367 future<std::optional<std::tuple<In...>>> operator()() {
368 return _impl->operator()();
370 connection_id get_id()
const;
371 template<
typename Serializer,
typename... Out> sink<Out...> make_sink();
388 template <
typename... T>
389 class tuple :
public std::tuple<T...> {
// Converting constructor: builds this rpc tuple by moving an rvalue
// std::tuple<T...> into the std::tuple<T...> base class.
392 tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
397 template <
typename... T>
409 boost::hash_combine(h, std::hash<uint64_t>{}(
id.id()));
414 template <
typename... T>
415 struct tuple_size<
seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
418 template <
size_t I,
typename... T>
419 struct tuple_element<I,
seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
424 #if FMT_VERSION >= 9'00'00
// fmt formatter specialization for seastar::rpc::connection_id. It adds no
// members of its own and inherits everything from fmt::ostream_formatter.
// Sits under the FMT_VERSION >= 9'00'00 guard on the preceding line.
425 template <>
struct fmt::formatter<
seastar::rpc::connection_id> : fmt::ostream_formatter {};
428 #if FMT_VERSION >= 10'00'00
429 template <
typename T>
430 struct fmt::formatter<
seastar::rpc::optional<T>> :
private fmt::formatter<std::optional<T>> {
431 using fmt::formatter<std::optional<T>>::parse;
433 return fmt::formatter<std::optional<T>>::format(opt, ctx);
Definition: circular_buffer.hh:63
Definition: shared_ptr.hh:270
Definition: rpc_types.hh:164
Definition: rpc_types.hh:138
Definition: rpc_types.hh:288
Definition: rpc_types.hh:278
Definition: rpc_types.hh:65
Definition: rpc_types.hh:133
Definition: rpc_types.hh:192
Definition: rpc_types.hh:187
Definition: rpc_types.hh:174
Definition: rpc_types.hh:159
Definition: rpc_types.hh:309
Definition: rpc_types.hh:348
Definition: rpc_types.hh:169
Definition: rpc_types.hh:143
Definition: rpc_types.hh:389
Definition: rpc_types.hh:154
Definition: rpc_types.hh:148
Definition: socket_defs.hh:47
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:130
Holds the implementation parts of the metrics layer; do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
sstring format(const char *fmt, A &&... a)
Definition: print.hh:142
Definition: rpc_types.hh:202
Definition: rpc_types.hh:96
Definition: rpc_types.hh:178
Definition: rpc_types.hh:238
Definition: rpc_types.hh:250
Definition: rpc_types.hh:55