24 #include <seastar/net/api.hh>
27 #include <boost/any.hpp>
28 #include <boost/type.hpp>
29 #include <seastar/util/std-compat.hh>
30 #include <seastar/util/variant_utils.hh>
32 #include <seastar/core/circular_buffer.hh>
33 #include <seastar/core/simple-stream.hh>
34 #include <seastar/core/lowres_clock.hh>
35 #include <boost/functional/hash.hpp>
36 #include <seastar/core/sharded.hh>
42 using rpc_clock_type = lowres_clock;
46 using type = boost::type<T>;
// Integer type shared by all counters below; every counter starts at zero.
// NOTE(review): the enclosing declaration is not visible in this excerpt —
// presumably these are fields of a statistics struct; confirm.
49 using counter_type = uint64_t;
50 counter_type replied = 0;
51 counter_type pending = 0;
52 counter_type exception_received = 0;
53 counter_type sent_messages = 0;
54 counter_type wait_reply = 0;
55 counter_type timeout = 0;
// Type-erased storage: each value is held in a boost::any and keyed by an sstring.
61 std::unordered_map<sstring, boost::any> user_data;
// Wraps `object` (forwarded as T) in a boost::any and emplaces it under `key`.
// unordered_map::emplace does not replace an existing entry, so if `key` is
// already present the previously stored value is kept.
// NOTE(review): the `template <typename T>` header line is missing from this excerpt.
63 void attach_auxiliary(
const sstring& key, T&&
object) {
64 user_data.emplace(key, boost::any(std::forward<T>(
object)));
// Looks up `key` and returns a reference to the stored value viewed as T.
// The key must exist; this is enforced only by assert(), so it is unchecked
// when NDEBUG is defined. boost::any_cast to a reference throws
// boost::bad_any_cast when the stored type does not match T.
67 T& retrieve_auxiliary(
const sstring& key) {
68 auto it = user_data.find(key);
69 assert(it != user_data.end());
70 return boost::any_cast<T&>(it->second);
// Const overload: casts away the constness of `this` and calls the non-const
// overload instantiated with `const T`, so the result is a reference to const.
73 typename std::add_const<T>::type& retrieve_auxiliary(
const sstring& key)
const {
74 return const_cast<client_info*
>(
this)->retrieve_auxiliary<
typename std::add_const<T>::type>(key);
// Exception class derived from std::runtime_error.
78 class error :
public std::runtime_error {
// Passes `msg` unchanged to the std::runtime_error constructor.
80 error(
const std::string& msg) : std::runtime_error(msg) {}
// Fragment of a class template that inherits every constructor of std::optional<T>.
// NOTE(review): the class header line is missing from this excerpt.
131 template <
typename T>
134 using std::optional<T>::optional;
// opt_time_point: inherits the constructors of std::optional<rpc_clock_type::time_point>.
139 using std::optional<rpc_clock_type::time_point>::optional;
// Additional constructor from a plain std::optional: the value is assigned
// into the std::optional base subobject of this object.
140 opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
141 static_cast<std::optional<rpc_clock_type::time_point>&
>(*this) = time_point;
// Two callbacks stored as std::function<void()>.
// NOTE(review): judging by the names, presumably invoked to cancel the send
// and to cancel the wait for a reply — confirm against the callers.
148 std::function<void()> cancel_send;
149 std::function<void()> cancel_wait;
// Move constructor: moves both callbacks out of `x` and copies both back pointers.
// For each back pointer that is non-null, the slot it points at is overwritten
// with `this` and x's copy of the pointer is set to nullptr, so the slot now
// holds the address of the newly constructed object instead of the moved-from one.
// NOTE(review): the declarations of send_back_pointer / wait_back_pointer are
// not visible in this excerpt.
153 cancellable(
cancellable&& x) : cancel_send(std::move(x.cancel_send)), cancel_wait(std::move(x.cancel_wait)), send_back_pointer(x.send_back_pointer), wait_back_pointer(x.wait_back_pointer) {
154 if (send_back_pointer) {
155 *send_back_pointer =
this;
156 x.send_back_pointer =
nullptr;
158 if (wait_back_pointer) {
159 *wait_back_pointer =
this;
160 x.wait_back_pointer =
nullptr;
// Optional holder for semaphore units.
// NOTE(review): what the units account for is not visible in this excerpt.
185 std::optional<semaphore_units<>> su;
// Iterator over a vector of temporary_buffer<char>.
187 using iterator = std::vector<temporary_buffer<char>>::iterator;
// Explicit constructor that records only the size; it is marked explicit, so
// a size_t does not convert implicitly to rcv_buf.
189 explicit rcv_buf(
size_t size_) : size(size_) {}
// Member-initialiser tail of another constructor (its signature line is
// missing from this excerpt): stores `size` and moves `bufs` into the member.
192 : size(size), bufs(std::move(bufs)) {};
// Compile-time constant: 128 * 1024 = 131072.
// NOTE(review): presumably a chunk size in bytes — confirm where it is used.
197 static constexpr
size_t chunk_size = 128*1024;
// Iterator over a vector of temporary_buffer<char>.
200 using iterator = std::vector<temporary_buffer<char>>::iterator;
// Explicit constructor taking a size; declared here, defined elsewhere.
204 explicit snd_buf(
size_t size_);
// Member-initialiser tail of another constructor (its signature line is
// missing from this excerpt): stores `size` and moves `bufs` into the member.
208 : size(size), bufs(std::move(bufs)) {};
// Fragment of a function whose signature is not visible in this excerpt.
// input.bufs is accessed as a variant: get_if yields a pointer to the single
// temporary_buffer<char> alternative, or nullptr when that is not the active one.
214 auto* b = std::get_if<temporary_buffer<char>>(&input.bufs);
// Access to the vector alternative; std::get throws std::bad_variant_access
// if the vector is not the active alternative.
218 auto& ar = std::get<std::vector<temporary_buffer<char>>>(input.bufs);
// Pure virtual, const: returns a name as an sstring.
// NOTE(review): the enclosing class headers are missing from this excerpt;
// these declarations may belong to different classes.
230 virtual sstring name()
const = 0;
// Pure virtual, const: returns a reference to an sstring.
// NOTE(review): presumably the list of supported features — confirm.
237 virtual const sstring& supported()
const = 0;
// Pure virtual, const: takes a feature string and an is_server flag and
// returns an owning pointer to a compressor.
// NOTE(review): whether the result may be null is not visible here.
239 virtual std::unique_ptr<compressor> negotiate(sstring feature,
bool is_server)
const = 0;
// True when the shard field differs from 0xffff, the value make_invalid_id() puts there.
250 operator bool()
const {
251 return shard() != 0xffff;
// Shard number: the low 16 bits of `id`.
253 size_t shard()
const {
254 return size_t(
id & 0xffff);
// Builds an id whose shard field is 0xffff, which operator bool() reports as false.
// `id` defaults to 0.
256 constexpr
static connection_id make_invalid_id(uint64_t
id = 0) {
257 return make_id(
id, 0xffff);
// Packs both parts into one value: `id` shifted left by 16 bits, OR-ed with `shard`.
// The top 16 bits of the incoming 64-bit `id` are shifted out by the packing.
259 constexpr
static connection_id make_id(uint64_t
id, uint16_t shard) {
260 return {
id << 16 | shard};
// Constant holding the result of make_invalid_id() with its default argument of 0.
264 constexpr
connection_id invalid_connection_id = connection_id::make_invalid_id();
// Stream-output operator for connection_id; declared here, defined elsewhere.
266 std::ostream& operator<<(std::ostream&,
const connection_id&);
// Limit of 50; also used elsewhere in this file as the reserve() size for _bufs.
269 constexpr
size_t max_queued_stream_buffers = 50;
// Limit of 100 * 1024 = 102400.
// NOTE(review): presumably a byte count — confirm where it is used.
270 constexpr
size_t max_stream_buffers_memory = 100 * 1024;
// Fragments of a template parameterised on Out...; the class header and
// several member lines are missing from this excerpt.
276 template<
typename... Out>
// Stored exception pointer.
// NOTE(review): presumably remembers a failure seen on the stream — confirm.
283 std::exception_ptr _ex;
// Pure virtual call operator: takes every Out value by const reference and returns future<>.
287 virtual future<> operator()(
const Out&... args) = 0;
// Forwards the arguments unchanged to _impl's call operator and returns its future.
298 future<> operator()(
const Out&... args) {
299 return _impl->operator()(args...);
// Body of a function whose signature is not visible here: delegates to _impl->close().
302 return _impl->close();
// Body of a function whose signature is not visible here: delegates to _impl->flush().
309 return _impl->flush();
// Returns a connection_id; declared here, defined elsewhere.
311 connection_id get_id()
const;
// Fragments of a template parameterised on In...; the class header and
// several member lines are missing from this excerpt.
315 template<
typename... In>
// Reserves capacity for max_queued_stream_buffers entries in _bufs.
323 _bufs.reserve(max_queued_stream_buffers);
// Pure virtual call operator: returns a future of an optional std::tuple<In...>.
// NOTE(review): an empty optional presumably signals end of stream — confirm.
327 virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
// Forwards to _impl's call operator and returns its future.
335 future<std::optional<std::tuple<In...>>> operator()() {
336 return _impl->operator()();
// Returns a connection_id; declared here, defined elsewhere.
338 connection_id get_id()
const;
// Member template declaration: returns a sink<Out...>, parameterised by a
// Serializer type; defined elsewhere.
339 template<
typename Serializer,
typename... Out> sink<Out...> make_sink();
// Class template deriving publicly from std::tuple<T...>.
356 template <
typename... T>
357 class tuple :
public std::tuple<T...> {
// Non-explicit constructor from an rvalue std::tuple: move-constructs the
// base, so a std::tuple<T...> temporary converts implicitly to this type.
360 tuple(std::tuple<T...>&& x) : std::tuple<T...>(std::move(x)) {}
// NOTE(review): the declaration this template header introduces is missing
// from this excerpt.
365 template <
typename... T>
// Fragment of a hash function whose signature is not visible here: hashes the
// 64-bit id.id with std::hash<uint64_t> and folds the result into `h`.
377 boost::hash_combine(h, std::hash<uint64_t>{}(
id.id));
// tuple_size specialisation for seastar::rpc::tuple<T...>: derives from
// tuple_size<tuple<T...>>, so it reports the same element count.
// NOTE(review): the enclosing namespace line is missing from this excerpt;
// the unqualified `tuple` presumably resolves to std::tuple — confirm.
382 template <
typename... T>
383 struct tuple_size<
seastar::rpc::tuple<T...>> : tuple_size<tuple<T...>> {
// tuple_element specialisation for seastar::rpc::tuple<T...>: derives from
// tuple_element<I, tuple<T...>>, so it reports the same element type at index I.
386 template <
size_t I,
typename... T>
387 struct tuple_element<I,
seastar::rpc::tuple<T...>> : tuple_element<I, tuple<T...>> {
Definition: circular_buffer.hh:59
Definition: shared_ptr.hh:255
Definition: rpc_types.hh:109
Definition: rpc_types.hh:83
Definition: rpc_types.hh:233
Definition: rpc_types.hh:223
Definition: rpc_types.hh:78
Definition: rpc_types.hh:137
Definition: rpc_types.hh:132
Definition: rpc_types.hh:119
Definition: rpc_types.hh:104
Definition: rpc_types.hh:277
Definition: rpc_types.hh:316
Definition: rpc_types.hh:114
Definition: rpc_types.hh:88
Definition: rpc_types.hh:357
Definition: rpc_types.hh:99
Definition: rpc_types.hh:93
Definition: socket_defs.hh:41
size_t size() const noexcept
Gets the buffer size.
Definition: temporary_buffer.hh:125
holds the implementation parts of the metrics layer, do not use directly.
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
Definition: rpc_types.hh:147
Definition: rpc_types.hh:59
Definition: rpc_types.hh:245
Definition: rpc_types.hh:123
Definition: rpc_types.hh:183
Definition: rpc_types.hh:195
Definition: rpc_types.hh:48