24 #include <unordered_map>
25 #include <unordered_set>
28 #include <boost/intrusive/list.hpp>
29 #include <seastar/core/future.hh>
30 #include <seastar/core/seastar.hh>
31 #include <seastar/net/api.hh>
32 #include <seastar/core/iostream.hh>
33 #include <seastar/core/shared_ptr.hh>
34 #include <seastar/core/condition-variable.hh>
35 #include <seastar/core/gate.hh>
36 #include <seastar/rpc/rpc_types.hh>
37 #include <seastar/core/byteorder.hh>
38 #include <seastar/core/shared_future.hh>
39 #include <seastar/core/queue.hh>
40 #include <seastar/core/weak_ptr.hh>
42 #include <seastar/util/backtrace.hh>
43 #include <seastar/util/log.hh>
45 namespace bi = boost::intrusive;
// Signed 64-bit integer type for message identifiers: the logger call operators
// take their msg_id as id_type, and the client's _outstanding map is keyed by it.
64 using id_type = int64_t;
// basic_semaphore instantiated with the default exception factory and rpc_clock_type.
66 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
// semaphore_units instantiated with the same exception factory and clock;
// wait_for_resources() returns a future holding this type.
67 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
// Protocol magic string. The `magic` array declared later in this file is sized
// sizeof(rpc_magic) - 1, i.e. these characters without the terminating NUL.
69 static constexpr
char rpc_magic[] =
"SSTARRPC";
// Callable that takes an isolation cookie and returns a future<isolation_config>.
104 using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
// Holds either the synchronous or the asynchronous isolation callable.
105 using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
// NOTE(review): the enclosing struct header is not visible in this chunk.
// send_timeout_data and metrics_domain are accessed through a client_options
// object in make_stream_sink(), so these are presumably client_options members — confirm.

// Optional TCP keepalive parameters; empty unless explicitly set.
110 std::optional<net::tcp_keepalive_params> keepalive;
// Defaults to true.
111 bool tcp_nodelay =
true;
// Defaults to false.
112 bool reuseaddr =
false;
// Defaults to true; make_stream_sink() sets it to false on the options copy
// it builds for the stream client.
114 bool send_timeout_data =
true;
// Defaults to "default"; make_stream_sink() appends "_stream" on the options
// copy it builds for the stream client.
120 sstring metrics_domain =
"default";
// NOTE(review): the enclosing struct header is not visible in this chunk;
// presumably these are server-side option fields — confirm.

// Defaults to true.
153 bool tcp_nodelay =
true;
// Optional streaming domain; empty unless explicitly set.
154 std::optional<streaming_domain_type> streaming_domain;
// Value of server_socket::load_balancing_algorithm; defaults to default_.
155 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
// Predicate taking a socket_address; default-initialized to an empty std::function.
// NOTE(review): presumably a false result rejects the connection — confirm against the accept path.
160 std::function<bool(
const socket_address&)> filter_connection = {};
167 estimate_request_size(
const resource_limits& lim,
size_t serialized_size) {
172 char magic[
sizeof(rpc_magic) - 1];
176 enum class protocol_features : uint32_t {
185 using feature_map = std::map<protocol_features, sstring>;
188 template <
typename Function>
192 std::function<void(
const sstring&)> _logger;
196 void log(
const sstring& str)
const {
197 if (_seastar_logger) {
199 _seastar_logger->
info(
"{}", str);
200 }
else if (_logger) {
206 template <
typename... Args>
207 void log(
log_level level,
const char* fmt, Args&&... args)
const {
208 if (_seastar_logger) {
209 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
213 }
else if (_logger && level <= log_level::info) {
214 _logger(
format(fmt, std::forward<Args>(args)...));
219 void set(std::function<
void(
const sstring&)> l) {
220 _logger = std::move(l);
// Logging call operators; only the declarations are visible in this chunk.

// Logs str for the given client and message id.
227 void operator()(
const client_info& info, id_type msg_id,
const sstring& str)
const;
// Same as above, with an explicit log_level and a std::string_view message.
228 void operator()(
const client_info& info, id_type msg_id,
log_level level, std::string_view str)
const;
// Logs str for the given client, without a message id.
230 void operator()(
const client_info& info,
const sstring& str)
const;
// Logs str for the given socket address.
233 void operator()(
const socket_address& addr,
const sstring& str)
const;
243 bool _connected =
false;
251 struct outgoing_entry :
public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
261 void uncancellable() {
264 pcancel->cancel_send = std::function<void()>();
270 pcancel->cancel_send = std::function<void()>();
271 pcancel->send_back_pointer =
nullptr;
275 using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
277 void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex =
nullptr);
278 future<> _outgoing_queue_ready = _negotiated->get_shared_future();
279 outgoing_entry::container_t _outgoing_queue;
280 size_t _outgoing_queue_size = 0;
281 std::unique_ptr<compressor> _compressor;
282 bool _propagate_timeout =
false;
283 bool _timeout_negotiated =
false;
285 bool _is_stream =
false;
288 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
291 bool _sink_closed =
true;
292 bool _source_closed =
true;
295 future<bool> _sink_closed_future = make_ready_future<bool>(
false);
297 void set_negotiated() noexcept;
299 bool is_stream() const noexcept {
305 future<> send(
snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel =
nullptr);
306 future<> send_entry(outgoing_entry& d);
307 future<> stop_send_loop(std::exception_ptr ex);
309 bool stream_check_twoway_closed() const noexcept {
310 return _sink_closed && _source_closed;
313 future<> stream_process_incoming(rcv_buf&&);
317 connection(connected_socket&& fd,
const logger& l,
void* s, connection_id
id = invalid_connection_id) : connection(l, s, id) {
318 set_socket(std::move(fd));
// Constructs a connection without a socket.
//   l  - initializes _logger.
//   s  - type-erased serializer pointer stored in _serializer; serializer<T>()
//        later static_casts it back to T*.
//   id - connection id; defaults to invalid_connection_id.
// The overload taking a connected_socket delegates here and then calls set_socket().
320 connection(
const logger& l,
void* s, connection_id
id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
// Virtual destructor with an empty body; the class also declares the pure
// virtual peer_address(), so it is destroyed through derived types.
321 virtual ~connection() {}
322 size_t outgoing_queue_length() const noexcept {
323 return _outgoing_queue_size;
326 void set_socket(connected_socket&& fd);
327 future<> send_negotiation_frame(feature_map features);
// Returns the current value of the _error flag.
328 bool error() const noexcept {
return _error; }
331 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
332 future<> close_sink() {
334 if (stream_check_twoway_closed()) {
335 return stream_close();
339 bool sink_closed() const noexcept {
343 _source_closed =
true;
344 if (stream_check_twoway_closed()) {
345 return stream_close();
349 connection_id get_connection_id() const noexcept {
352 stats& get_stats_internal() noexcept {
355 xshard_connection_ptr get_stream(connection_id
id)
const;
356 void register_stream(connection_id
id, xshard_connection_ptr c);
357 virtual socket_address peer_address()
const = 0;
359 const logger& get_logger() const noexcept {
363 template<
typename Serializer>
364 Serializer& serializer() {
365 return *
static_cast<Serializer*
>(_serializer);
368 template <
typename FrameType>
369 typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
371 template <
typename FrameType>
372 typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
374 template<
typename Serializer,
typename... Out>
375 friend class sink_impl;
376 template<
typename Serializer,
typename... In>
377 friend class source_impl;
380 _outgoing_queue_ready.
get();
381 auto dummy = std::make_unique<outgoing_entry>(snd_buf());
382 _outgoing_queue.push_back(*dummy);
383 _outgoing_queue_ready = dummy->done.get_future();
384 (void)p.
get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
394 template<
typename Serializer,
typename... Out>
397 alignas (cache_line_size) uint64_t _next_seq_num = 1;
400 struct alignas (cache_line_size) {
401 uint64_t last_seq_num = 0;
402 std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
406 future<> operator()(
const Out&... args)
override;
413 template<
typename Serializer,
typename... In>
417 future<std::optional<std::tuple<In...>>> operator()()
override;
422 id_type _message_id = 1;
423 struct reply_handler_base {
// Hook with an empty default body; the reply handler defined later in this file overrides it.
427 virtual void timeout() {}
// Hook with an empty default body; the reply handler defined later in this file overrides it.
428 virtual void cancel() {}
429 virtual ~reply_handler_base() {
431 pcancel->cancel_wait = std::function<void()>();
432 pcancel->wait_back_pointer =
nullptr;
440 using domain_member_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
444 domain_member_hook_t _link;
446 using domain_list_t = boost::intrusive::list<metrics,
447 boost::intrusive::member_hook<metrics, metrics::domain_member_hook_t, &metrics::_link>,
448 boost::intrusive::constant_time_size<false>>;
455 void enqueue_zero_frame();
457 template<
typename Reply,
typename Func>
463 return func(reply,
client, msg_id, std::move(data));
465 virtual void timeout()
override {
469 virtual void cancel()
override {
476 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
484 future<> negotiate_protocol(feature_map map);
485 void negotiate(feature_map server_features);
513 stats get_stats()
const;
514 size_t incoming_queue_length() const noexcept {
515 return _outstanding.size();
// Returns the current _message_id and then increments it (post-increment).
// _message_id is declared with an initial value of 1.
518 auto next_message_id() {
return _message_id++; }
519 void wait_for_reply(id_type
id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
520 void wait_timed_out(id_type
id);
522 void abort_all_streams();
523 void deregister_this_stream();
524 socket_address peer_address()
const override {
529 return make_ready_future<>();
531 return _negotiated->get_shared_future();
534 template<
typename Serializer,
typename... Out>
535 future<sink<Out...>> make_stream_sink(socket socket) {
536 return await_connection().then([
this, socket = std::move(socket)] ()
mutable {
537 if (!this->get_connection_id()) {
538 return make_exception_future<sink<Out...>>(std::runtime_error(
"Streaming is not supported by the server"));
540 client_options o = _options;
541 o.stream_parent = this->get_connection_id();
542 o.send_timeout_data =
false;
543 o.metrics_domain +=
"_stream";
544 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
545 c->_parent = this->weak_from_this();
546 c->_is_stream =
true;
547 return c->await_connection().then([c,
this] {
549 throw closed_error();
551 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
552 this->register_stream(c->get_connection_id(), s);
553 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
554 }).handle_exception([c] (std::exception_ptr eptr) {
557 return c->stop().then([eptr, c] {
563 template<
typename Serializer,
typename... Out>
564 future<sink<Out...>> make_stream_sink() {
565 return make_stream_sink<Serializer, Out...>(
make_socket());
568 future<> request(uint64_t type, int64_t
id, snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel =
nullptr);
575 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
581 std::optional<isolation_config> _isolation_config;
587 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
591 future<> respond(int64_t msg_id,
snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
594 stats get_stats()
const {
596 res.pending = outgoing_queue_length();
603 future<resource_permit> wait_for_resources(
size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
605 return get_units(get_server()._resources_available, memory_consumed, *timeout);
607 return get_units(get_server()._resources_available, memory_consumed);
610 size_t estimate_request_size(
size_t serialized_size) {
611 return rpc::estimate_request_size(get_server()._limits, serialized_size);
613 size_t max_request_size()
const {
614 return get_server()._limits.max_memory;
619 const server& get_server()
const {
630 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
634 bool _shutdown =
false;
635 uint64_t _next_client_id = 1;
662 template<
typename Func>
663 void foreach_connection(Func&& f) {
664 for (
auto c : _conns) {
683 using rpc_handler_func = std::function<future<> (
shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
688 rpc_handler_func func;
699 virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
795 template<
typename Serializer,
typename MsgType = uint32_t>
820 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
822 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
835 rpc::client(p.get_logger(), &p._serializer, options, std::move(
socket), addr, local) {}
840 std::unordered_map<MsgType, rpc_handler> _handlers;
841 Serializer _serializer;
// Constructs the protocol by forwarding `serializer` into the _serializer member.
// NOTE(review): Serializer appears to be the class template parameter rather than
// a deduced one, so Serializer&& here is an rvalue reference unless Serializer is
// itself a reference type — confirm. The constructor is not explicit, so it also
// permits implicit conversion from Serializer.
845 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
856 template<
typename Func>
857 auto make_client(MsgType t);
871 template<
typename Func>
872 auto register_handler(MsgType t, Func&& func);
891 template <
typename Func>
892 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
904 future<> unregister_handler(MsgType t);
910 [[deprecated(
"Use set_logger(::seastar::logger*) instead")]]
912 _logger.set(std::move(
logger));
920 const logger& get_logger()
const {
925 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer,
id);
928 bool has_handler(MsgType msg_id);
935 return !_handlers.empty();
939 rpc_handler* get_handler(uint64_t msg_id)
override;
942 template<
typename Ret,
typename... In>
943 auto make_client(
signature<Ret(In...)> sig, MsgType t);
945 void register_receiver(MsgType t,
rpc_handler&& handler) {
946 auto r = _handlers.emplace(t, std::move(handler));
948 throw_with_backtrace<std::runtime_error>(
"registered handler already exists");
959 #include "rpc_impl.hh"
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
Definition: shared_ptr.hh:501
value_type && get()
gets the value returned by the computation
Definition: future.hh:1340
Logger class for ostream or syslog.
Definition: log.hh:90
void info(format_info fmt, Args &&... args) noexcept
Definition: log.hh:355
Definition: shared_ptr.hh:270
Definition: rpc_types.hh:164
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
client(const logger &l, void *s, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc_types.hh:288
Definition: rpc_types.hh:65
Represents a client side connection.
Definition: rpc.hh:811
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:832
Represents the listening port and all accepted connections.
Definition: rpc.hh:799
void set_logger(std::function< void(const sstring &)> logger)
Definition: rpc.hh:911
bool has_handlers() const noexcept
Definition: rpc.hh:934
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:916
void abort_connection(connection_id id)
Definition: rpc_types.hh:311
Definition: rpc_types.hh:309
Definition: rpc_types.hh:350
Definition: rpc_types.hh:348
Definition: rpc_types.hh:143
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:325
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:316
Definition: shared_ptr.hh:515
Definition: socket_defs.hh:47
Definition: weak_ptr.hh:46
Definition: weak_ptr.hh:120
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:804
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1940
scheduling_group sched_group
Definition: rpc.hh:78
size_t max_memory
Definition: rpc.hh:100
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:99
std::function< isolation_config(sstring isolation_cookie)> syncronous_isolation_function
Definition: rpc.hh:103
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:98
sstring isolation_cookie
Definition: rpc.hh:119
isolation_config default_isolate_connection(sstring isolation_cookie)
Specifies resource isolation for a connection.
Definition: rpc.hh:75
Resource limits for an RPC server.
Definition: rpc.hh:97
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:400
sstring format(const char *fmt, A &&... a)
Definition: print.hh:142
log_level
log level used with logger
Definition: log.hh:52
Definition: rpc_types.hh:202
Definition: rpc_types.hh:96
Definition: rpc_types.hh:238
Definition: rpc_types.hh:250
Definition: rpc_types.hh:55