24#include <unordered_map>
25#include <unordered_set>
28#include <boost/intrusive/list.hpp>
29#include <seastar/core/future.hh>
30#include <seastar/core/seastar.hh>
31#include <seastar/net/api.hh>
32#include <seastar/core/iostream.hh>
33#include <seastar/core/shared_ptr.hh>
34#include <seastar/core/condition-variable.hh>
35#include <seastar/core/gate.hh>
36#include <seastar/rpc/rpc_types.hh>
37#include <seastar/core/byteorder.hh>
38#include <seastar/core/shared_future.hh>
39#include <seastar/core/queue.hh>
40#include <seastar/core/weak_ptr.hh>
42#include <seastar/util/backtrace.hh>
43#include <seastar/util/log.hh>
45namespace bi = boost::intrusive;
64using id_type = int64_t;
66using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
67using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
69static constexpr char rpc_magic[] =
"SSTARRPC";
104 using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
105 using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
110 std::optional<net::tcp_keepalive_params> keepalive;
111 bool tcp_nodelay =
true;
112 bool reuseaddr =
false;
114 bool send_timeout_data =
true;
120 sstring metrics_domain =
"default";
121 bool send_handler_duration =
true;
154 bool tcp_nodelay =
true;
155 std::optional<streaming_domain_type> streaming_domain;
156 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
161 std::function<bool(
const socket_address&)> filter_connection = {};
168estimate_request_size(
const resource_limits& lim,
size_t serialized_size) {
173 char magic[
sizeof(rpc_magic) - 1];
177enum class protocol_features : uint32_t {
183 HANDLER_DURATION = 5,
187using feature_map = std::map<protocol_features, sstring>;
190template <
typename Function>
194 std::function<void(
const sstring&)> _logger;
195 ::seastar::logger* _seastar_logger =
nullptr;
198 void log(
const sstring& str)
const {
199 if (_seastar_logger) {
201 _seastar_logger->info(
"{}", str);
202 }
else if (_logger) {
208 template <
typename... Args>
209#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT
210 void log(
log_level level, fmt::format_string<Args...> fmt, Args&&... args)
const {
212 void log(
log_level level,
const char* fmt, Args&&... args)
const {
214 if (_seastar_logger) {
215 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
219 }
else if (_logger && level <= log_level::info) {
220 fmt::memory_buffer out;
221#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT
222 fmt::format_to(fmt::appender(out), fmt, std::forward<Args>(args)...);
224 fmt::format_to(fmt::appender(out), fmt::runtime(fmt), std::forward<Args>(args)...);
226 _logger(sstring{out.data(), out.size()});
231 void set(std::function<
void(
const sstring&)> l) {
232 _logger = std::move(l);
235 void set(::seastar::logger*
logger) {
239 void operator()(
const client_info& info, id_type msg_id,
const sstring& str)
const;
240 void operator()(
const client_info& info, id_type msg_id,
log_level level, std::string_view str)
const;
242 void operator()(
const client_info& info,
const sstring& str)
const;
245 void operator()(
const socket_address& addr,
const sstring& str)
const;
255 bool _connected =
false;
263 struct outgoing_entry :
public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
273 void uncancellable() {
276 pcancel->cancel_send = std::function<void()>();
282 pcancel->cancel_send = std::function<void()>();
283 pcancel->send_back_pointer =
nullptr;
287 using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
289 void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex =
nullptr);
290 future<> _outgoing_queue_ready = _negotiated->get_shared_future();
291 outgoing_entry::container_t _outgoing_queue;
292 size_t _outgoing_queue_size = 0;
293 std::unique_ptr<compressor> _compressor;
294 bool _propagate_timeout =
false;
295 bool _timeout_negotiated =
false;
296 bool _handler_duration_negotiated =
false;
298 bool _is_stream =
false;
301 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
304 bool _sink_closed =
true;
305 bool _source_closed =
true;
308 future<bool> _sink_closed_future = make_ready_future<bool>(
false);
310 void set_negotiated() noexcept;
312 bool is_stream() const noexcept {
318 future<> send(
snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel =
nullptr);
319 future<> send_entry(outgoing_entry& d)
noexcept;
320 future<> stop_send_loop(std::exception_ptr ex);
322 bool stream_check_twoway_closed() const noexcept {
323 return _sink_closed && _source_closed;
325 future<> stream_close();
326 future<> stream_process_incoming(rcv_buf&&);
327 future<> handle_stream_frame();
330 connection(connected_socket&& fd,
const logger& l,
void* s, connection_id
id = invalid_connection_id) : connection(l, s, id) {
331 set_socket(std::move(fd));
333 connection(
const logger& l,
void* s, connection_id
id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
334 virtual ~connection() {}
335 size_t outgoing_queue_length() const noexcept {
336 return _outgoing_queue_size;
339 void set_socket(connected_socket&& fd);
340 future<> send_negotiation_frame(feature_map features);
341 bool error() const noexcept {
return _error; }
343 future<> stop() noexcept;
344 future<> stream_receive(circular_buffer<foreign_ptr<
std::unique_ptr<rcv_buf>>>& bufs);
345 future<> close_sink() {
347 if (stream_check_twoway_closed()) {
348 return stream_close();
352 bool sink_closed() const noexcept {
355 future<> close_source() {
356 _source_closed =
true;
357 if (stream_check_twoway_closed()) {
358 return stream_close();
362 connection_id get_connection_id() const noexcept {
365 stats& get_stats_internal() noexcept {
368 xshard_connection_ptr get_stream(connection_id
id)
const;
369 void register_stream(connection_id
id, xshard_connection_ptr c);
370 virtual socket_address peer_address()
const = 0;
372 const logger& get_logger() const noexcept {
376 template<
typename Serializer>
377 Serializer& serializer() {
378 return *
static_cast<Serializer*
>(_serializer);
381 template <
typename FrameType>
382 future<typename FrameType::return_type> read_frame(socket_address info, input_stream<char>& in);
384 template <
typename FrameType>
385 future<typename FrameType::return_type> read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
387 template<
typename Serializer,
typename... Out>
388 friend class sink_impl;
389 template<
typename Serializer,
typename... In>
390 friend class source_impl;
393 _outgoing_queue_ready.
get();
394 auto dummy = std::make_unique<outgoing_entry>(snd_buf());
395 _outgoing_queue.push_back(*dummy);
396 _outgoing_queue_ready = dummy->done.get_future();
397 (void)p.
get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
407template<
typename Serializer,
typename... Out>
410 alignas (cache_line_size) uint64_t _next_seq_num = 1;
413 struct alignas (cache_line_size) {
414 uint64_t last_seq_num = 0;
415 std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
419 future<> operator()(
const Out&... args)
override;
426template<
typename Serializer,
typename... In>
430 future<std::optional<std::tuple<In...>>> operator()()
override;
435 id_type _message_id = 1;
436 struct reply_handler_base {
439 rpc_clock_type::time_point start;
441 virtual void timeout() {}
442 virtual void cancel() {}
443 virtual ~reply_handler_base() {
445 pcancel->cancel_wait = std::function<void()>();
446 pcancel->wait_back_pointer =
nullptr;
454 using domain_member_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
458 domain_member_hook_t _link;
460 using domain_list_t = boost::intrusive::list<metrics,
461 boost::intrusive::member_hook<metrics, metrics::domain_member_hook_t, &metrics::_link>,
462 boost::intrusive::constant_time_size<false>>;
469 void enqueue_zero_frame();
471 template<
typename Reply,
typename Func>
477 return func(reply,
client, msg_id, std::move(data));
479 virtual void timeout()
override {
483 virtual void cancel()
override {
490 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
498 future<> negotiate_protocol(feature_map map);
499 void negotiate(feature_map server_features);
531 stats get_stats()
const;
532 size_t incoming_queue_length() const noexcept {
533 return _outstanding.size();
536 auto next_message_id() {
return _message_id++; }
537 void wait_for_reply(id_type
id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
538 void wait_timed_out(id_type
id);
539 future<> stop() noexcept;
540 void abort_all_streams();
541 void deregister_this_stream();
542 socket_address peer_address()
const override {
545 future<> await_connection() {
547 return make_ready_future<>();
549 return _negotiated->get_shared_future();
552 template<
typename Serializer,
typename... Out>
553 future<sink<Out...>> make_stream_sink(socket socket) {
554 return await_connection().then([
this, socket = std::move(socket)] ()
mutable {
555 if (!this->get_connection_id()) {
556 return make_exception_future<sink<Out...>>(std::runtime_error(
"Streaming is not supported by the server"));
558 client_options o = _options;
559 o.stream_parent = this->get_connection_id();
560 o.send_timeout_data =
false;
561 o.metrics_domain +=
"_stream";
562 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
563 c->_parent = this->weak_from_this();
564 c->_is_stream =
true;
565 return c->await_connection().then([c,
this] {
567 throw closed_error();
569 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
570 this->register_stream(c->get_connection_id(), s);
571 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
572 }).handle_exception([c] (std::exception_ptr eptr) {
575 return c->stop().then([eptr, c] {
581 template<
typename Serializer,
typename... Out>
582 future<sink<Out...>> make_stream_sink() {
583 return make_stream_sink<Serializer, Out...>(
make_socket());
586 future<> request(uint64_t type, int64_t
id, snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel =
nullptr);
593 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
599 std::optional<isolation_config> _isolation_config;
605 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
609 future<> respond(int64_t msg_id,
snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration);
612 stats get_stats()
const {
614 res.pending = outgoing_queue_length();
621 future<resource_permit> wait_for_resources(
size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
623 return get_units(get_server()._resources_available, memory_consumed, *timeout);
625 return get_units(get_server()._resources_available, memory_consumed);
628 size_t estimate_request_size(
size_t serialized_size) {
629 return rpc::estimate_request_size(get_server()._limits, serialized_size);
631 size_t max_request_size()
const {
632 return get_server()._limits.max_memory;
637 const server& get_server()
const {
648 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
652 bool _shutdown =
false;
653 uint64_t _next_client_id = 1;
680 template<
typename Func>
681 void foreach_connection(Func&& f) {
682 for (
auto c : _conns) {
706 rpc_handler_func func;
721 virtual std::optional<handler_with_holder> get_handler(uint64_t msg_id) = 0;
816template<
typename Serializer,
typename MsgType = u
int32_t>
841 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
843 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
856 rpc::client(p.get_logger(), &p._serializer, options, std::move(
socket), addr, local) {}
861 std::unordered_map<MsgType, rpc_handler> _handlers;
862 Serializer _serializer;
866 protocol(Serializer&& serializer) : _serializer(
std::forward<Serializer>(serializer)) {}
877 template<
typename Func>
878 auto make_client(MsgType t);
892 template<
typename Func>
893 auto register_handler(MsgType t, Func&& func);
912 template <
typename Func>
913 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
925 future<> unregister_handler(MsgType t);
931 [[deprecated(
"Use set_logger(::seastar::logger*) instead")]]
933 _logger.set(std::move(
logger));
941 const logger& get_logger()
const {
946 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer,
id);
949 bool has_handler(MsgType msg_id);
956 return !_handlers.empty();
960 std::optional<handler_with_holder> get_handler(uint64_t msg_id)
override;
962 template<
typename Ret,
typename... In>
963 auto make_client(
signature<Ret(In...)> sig, MsgType t);
965 void register_receiver(MsgType t,
rpc_handler&& handler) {
966 auto r = _handlers.emplace(t, std::move(handler));
968 throw_with_backtrace<std::runtime_error>(
"registered handler already exists");
979#include "rpc_impl.hh"
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
Definition: shared_ptr.hh:493
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: rpc_types.hh:165
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
client(const logger &l, void *s, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc_types.hh:290
Definition: rpc_types.hh:66
Represents a client side connection.
Definition: rpc.hh:832
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:853
Represents the listening port and all accepted connections.
Definition: rpc.hh:820
void set_logger(std::function< void(const sstring &)> logger)
Definition: rpc.hh:932
bool has_handlers() const noexcept
Definition: rpc.hh:955
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:937
void abort_connection(connection_id id)
Definition: rpc_types.hh:319
Definition: rpc_types.hh:317
Definition: rpc_types.hh:358
Definition: rpc_types.hh:356
Definition: rpc_types.hh:144
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:285
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:319
Definition: shared_ptr.hh:507
Definition: socket_defs.hh:47
Definition: weak_ptr.hh:46
Definition: weak_ptr.hh:133
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:804
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
log_level
log level used with
Definition: log.hh:55
scheduling_group sched_group
Definition: rpc.hh:78
size_t max_memory
Definition: rpc.hh:100
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:99
std::function< isolation_config(sstring isolation_cookie)> syncronous_isolation_function
Definition: rpc.hh:103
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:98
sstring isolation_cookie
Definition: rpc.hh:119
isolation_config default_isolate_connection(sstring isolation_cookie)
Specifies resource isolation for a connection.
Definition: rpc.hh:75
Resource limits for an RPC server.
Definition: rpc.hh:97
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
ostream & operator<<(ostream &os, const seastar::lazy_eval< Func > &lf)
Definition: lazy.hh:131
Definition: rpc_types.hh:203
Definition: rpc_types.hh:97
Definition: rpc_types.hh:239
Definition: rpc_types.hh:251
Definition: rpc_types.hh:54