24#include <unordered_map>
25#include <unordered_set>
28#include <boost/intrusive/list.hpp>
29#include <seastar/core/future.hh>
30#include <seastar/core/seastar.hh>
31#include <seastar/net/api.hh>
32#include <seastar/core/iostream.hh>
33#include <seastar/core/shared_ptr.hh>
34#include <seastar/core/condition-variable.hh>
35#include <seastar/core/gate.hh>
36#include <seastar/rpc/rpc_types.hh>
37#include <seastar/core/byteorder.hh>
38#include <seastar/core/shared_future.hh>
39#include <seastar/core/queue.hh>
40#include <seastar/core/weak_ptr.hh>
42#include <seastar/util/backtrace.hh>
43#include <seastar/util/log.hh>
45namespace bi = boost::intrusive;
64using id_type = int64_t;
66using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
67using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
69static constexpr char rpc_magic[] =
"SSTARRPC";
104 using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
105 using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
110 std::optional<net::tcp_keepalive_params> keepalive;
111 bool tcp_nodelay =
true;
112 bool reuseaddr =
false;
114 bool send_timeout_data =
true;
120 sstring metrics_domain =
"default";
121 bool send_handler_duration =
true;
154 bool tcp_nodelay =
true;
155 std::optional<streaming_domain_type> streaming_domain;
156 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
161 std::function<bool(
const socket_address&)> filter_connection = {};
168estimate_request_size(
const resource_limits& lim,
size_t serialized_size) {
173 char magic[
sizeof(rpc_magic) - 1];
177enum class protocol_features : uint32_t {
183 HANDLER_DURATION = 5,
187using feature_map = std::map<protocol_features, sstring>;
190template <
typename Function>
194 ::seastar::logger* _seastar_logger =
nullptr;
196 void log(
const sstring& str)
const {
197 if (_seastar_logger) {
199 _seastar_logger->info(
"{}", str);
204 template <
typename... Args>
205#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT
206 void log(
log_level level, fmt::format_string<Args...> fmt, Args&&... args)
const {
208 void log(
log_level level,
const char* fmt, Args&&... args)
const {
210 if (_seastar_logger) {
211 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
216 void set(::seastar::logger*
logger) {
220 void operator()(
const client_info& info, id_type msg_id,
const sstring& str)
const;
221 void operator()(
const client_info& info, id_type msg_id,
log_level level, std::string_view str)
const;
223 void operator()(
const client_info& info,
const sstring& str)
const;
226 void operator()(
const socket_address& addr,
const sstring& str)
const;
236 bool _connected =
false;
244 struct outgoing_entry :
public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
254 void uncancellable() {
257 pcancel->cancel_send = std::function<void()>();
263 pcancel->cancel_send = std::function<void()>();
264 pcancel->send_back_pointer =
nullptr;
268 using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
270 void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex =
nullptr);
271 future<> _outgoing_queue_ready = _negotiated->get_shared_future();
272 outgoing_entry::container_t _outgoing_queue;
273 size_t _outgoing_queue_size = 0;
274 std::unique_ptr<compressor> _compressor;
275 bool _propagate_timeout =
false;
276 bool _timeout_negotiated =
false;
277 bool _handler_duration_negotiated =
false;
279 bool _is_stream =
false;
282 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
285 bool _sink_closed =
true;
286 bool _source_closed =
true;
289 future<bool> _sink_closed_future = make_ready_future<bool>(
false);
291 void set_negotiated() noexcept;
293 bool is_stream() const noexcept {
299 future<> send(
snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel =
nullptr);
300 future<> send_entry(outgoing_entry& d)
noexcept;
301 future<> stop_send_loop(std::exception_ptr ex);
303 bool stream_check_twoway_closed() const noexcept {
304 return _sink_closed && _source_closed;
306 future<> stream_close();
307 future<> stream_process_incoming(rcv_buf&&);
308 future<> handle_stream_frame();
311 connection(connected_socket&& fd,
const logger& l,
void* s, connection_id
id = invalid_connection_id) : connection(l, s, id) {
312 set_socket(std::move(fd));
314 connection(
const logger& l,
void* s, connection_id
id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
315 virtual ~connection() {}
316 size_t outgoing_queue_length() const noexcept {
317 return _outgoing_queue_size;
320 void set_socket(connected_socket&& fd);
321 future<> send_negotiation_frame(feature_map features);
322 bool error() const noexcept {
return _error; }
324 future<> stop() noexcept;
325 future<> stream_receive(circular_buffer<foreign_ptr<
std::unique_ptr<rcv_buf>>>& bufs);
326 future<> close_sink() {
328 if (stream_check_twoway_closed()) {
329 return stream_close();
333 bool sink_closed() const noexcept {
336 future<> close_source() {
337 _source_closed =
true;
338 if (stream_check_twoway_closed()) {
339 return stream_close();
343 connection_id get_connection_id() const noexcept {
346 stats& get_stats_internal() noexcept {
349 xshard_connection_ptr get_stream(connection_id
id)
const;
350 void register_stream(connection_id
id, xshard_connection_ptr c);
351 virtual socket_address peer_address()
const = 0;
353 const logger& get_logger() const noexcept {
357 template<
typename Serializer>
358 Serializer& serializer() {
359 return *
static_cast<Serializer*
>(_serializer);
362 template <
typename FrameType>
363 future<typename FrameType::return_type> read_frame(socket_address info, input_stream<char>& in);
365 template <
typename FrameType>
366 future<typename FrameType::return_type> read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
368 template<
typename Serializer,
typename... Out>
369 friend class sink_impl;
370 template<
typename Serializer,
typename... In>
371 friend class source_impl;
374 _outgoing_queue_ready.
get();
375 auto dummy = std::make_unique<outgoing_entry>(snd_buf());
376 _outgoing_queue.push_back(*dummy);
377 _outgoing_queue_ready = dummy->done.get_future();
378 (void)p.
get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
388template<
typename Serializer,
typename... Out>
391 alignas (cache_line_size) uint64_t _next_seq_num = 1;
394 struct alignas (cache_line_size) {
395 uint64_t last_seq_num = 0;
396 std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
400 future<> operator()(
const Out&... args)
override;
407template<
typename Serializer,
typename... In>
411 future<std::optional<std::tuple<In...>>> operator()()
override;
416 id_type _message_id = 1;
417 struct reply_handler_base {
420 rpc_clock_type::time_point start;
422 virtual void timeout() {}
423 virtual void cancel() {}
424 virtual ~reply_handler_base() {
426 pcancel->cancel_wait = std::function<void()>();
427 pcancel->wait_back_pointer =
nullptr;
435 using domain_member_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
439 domain_member_hook_t _link;
441 using domain_list_t = boost::intrusive::list<metrics,
442 boost::intrusive::member_hook<metrics, metrics::domain_member_hook_t, &metrics::_link>,
443 boost::intrusive::constant_time_size<false>>;
450 void enqueue_zero_frame();
452 template<
typename Reply,
typename Func>
458 return func(reply,
client, msg_id, std::move(data));
460 virtual void timeout()
override {
464 virtual void cancel()
override {
471 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
479 future<> negotiate_protocol(feature_map map);
480 void negotiate(feature_map server_features);
512 stats get_stats()
const;
513 size_t incoming_queue_length() const noexcept {
514 return _outstanding.size();
517 auto next_message_id() {
return _message_id++; }
518 void wait_for_reply(id_type
id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
519 void wait_timed_out(id_type
id);
520 future<> stop() noexcept;
521 void abort_all_streams();
522 void deregister_this_stream();
523 socket_address peer_address()
const override {
526 future<> await_connection() {
528 return make_ready_future<>();
530 return _negotiated->get_shared_future();
533 template<
typename Serializer,
typename... Out>
534 future<sink<Out...>> make_stream_sink(socket socket) {
535 return await_connection().then([
this, socket = std::move(socket)] ()
mutable {
536 if (!this->get_connection_id()) {
537 return make_exception_future<sink<Out...>>(std::runtime_error(
"Streaming is not supported by the server"));
539 client_options o = _options;
540 o.stream_parent = this->get_connection_id();
541 o.send_timeout_data =
false;
542 o.metrics_domain +=
"_stream";
543 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
544 c->_parent = this->weak_from_this();
545 c->_is_stream =
true;
546 return c->await_connection().then([c,
this] {
548 throw closed_error();
550 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
551 this->register_stream(c->get_connection_id(), s);
552 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
553 }).handle_exception([c] (std::exception_ptr eptr) {
556 return c->stop().then([eptr, c] {
562 template<
typename Serializer,
typename... Out>
563 future<sink<Out...>> make_stream_sink() {
564 return make_stream_sink<Serializer, Out...>(
make_socket());
567 future<> request(uint64_t type, int64_t
id, snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel =
nullptr);
574 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
580 std::optional<isolation_config> _isolation_config;
586 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
590 future<> respond(int64_t msg_id,
snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration);
593 stats get_stats()
const {
595 res.pending = outgoing_queue_length();
602 future<resource_permit> wait_for_resources(
size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
604 return get_units(get_server()._resources_available, memory_consumed, *timeout);
606 return get_units(get_server()._resources_available, memory_consumed);
609 size_t estimate_request_size(
size_t serialized_size) {
610 return rpc::estimate_request_size(get_server()._limits, serialized_size);
612 size_t max_request_size()
const {
613 return get_server()._limits.max_memory;
618 const server& get_server()
const {
629 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
633 bool _shutdown =
false;
634 uint64_t _next_client_id = 1;
661 template<
typename Func>
662 void foreach_connection(Func&& f) {
663 for (
auto c : _conns) {
687 rpc_handler_func func;
702 virtual std::optional<handler_with_holder> get_handler(uint64_t msg_id) = 0;
797template<
typename Serializer,
typename MsgType = u
int32_t>
822 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
824 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
837 rpc::client(p.get_logger(), &p._serializer, options, std::move(
socket), addr, local) {}
842 std::unordered_map<MsgType, rpc_handler> _handlers;
843 Serializer _serializer;
847 protocol(Serializer&& serializer) : _serializer(
std::forward<Serializer>(serializer)) {}
858 template<
typename Func>
859 auto make_client(MsgType t);
873 template<
typename Func>
874 auto register_handler(MsgType t, Func&& func);
893 template <
typename Func>
894 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
906 future<> unregister_handler(MsgType t);
914 const logger& get_logger()
const {
919 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer,
id);
922 bool has_handler(MsgType msg_id);
929 return !_handlers.empty();
933 std::optional<handler_with_holder> get_handler(uint64_t msg_id)
override;
935 template<
typename Ret,
typename... In>
936 auto make_client(
signature<Ret(In...)> sig, MsgType t);
938 void register_receiver(MsgType t,
rpc_handler&& handler) {
939 auto r = _handlers.emplace(t, std::move(handler));
941 throw_with_backtrace<std::runtime_error>(
"registered handler already exists");
952#include "rpc_impl.hh"
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:239
Definition: shared_ptr.hh:493
A representation of a possibly not-yet-computed value.
Definition: future.hh:1198
value_type && get()
gets the value returned by the computation
Definition: future.hh:1300
Definition: rpc_types.hh:166
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
client(const logger &l, void *s, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc_types.hh:291
Definition: rpc_types.hh:67
Represents a client side connection.
Definition: rpc.hh:813
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:834
Represents the listening port and all accepted connections.
Definition: rpc.hh:801
bool has_handlers() const noexcept
Definition: rpc.hh:928
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:910
void abort_connection(connection_id id)
Definition: rpc_types.hh:320
Definition: rpc_types.hh:318
Definition: rpc_types.hh:359
Definition: rpc_types.hh:357
Definition: rpc_types.hh:145
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:293
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:319
Definition: shared_ptr.hh:507
Definition: socket_defs.hh:47
Definition: weak_ptr.hh:46
Definition: weak_ptr.hh:133
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:818
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1870
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1853
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1876
log_level
log level used with
Definition: log.hh:54
scheduling_group sched_group
Definition: rpc.hh:78
size_t max_memory
Definition: rpc.hh:100
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:99
std::function< isolation_config(sstring isolation_cookie)> syncronous_isolation_function
Definition: rpc.hh:103
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:98
sstring isolation_cookie
Definition: rpc.hh:119
isolation_config default_isolate_connection(sstring isolation_cookie)
Specifies resource isolation for a connection.
Definition: rpc.hh:75
Resource limits for an RPC server.
Definition: rpc.hh:97
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:405
ostream & operator<<(ostream &os, const seastar::lazy_eval< Func > &lf)
Definition: lazy.hh:131
Definition: rpc_types.hh:204
Definition: rpc_types.hh:98
Definition: rpc_types.hh:240
Definition: rpc_types.hh:252
Definition: rpc_types.hh:55