Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
rpc.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <unordered_map>
25#include <unordered_set>
26#include <list>
27#include <variant>
28#include <boost/intrusive/list.hpp>
29#include <seastar/core/future.hh>
30#include <seastar/core/seastar.hh>
31#include <seastar/net/api.hh>
32#include <seastar/core/iostream.hh>
33#include <seastar/core/shared_ptr.hh>
34#include <seastar/core/condition-variable.hh>
35#include <seastar/core/gate.hh>
36#include <seastar/rpc/rpc_types.hh>
37#include <seastar/core/byteorder.hh>
38#include <seastar/core/shared_future.hh>
39#include <seastar/core/queue.hh>
40#include <seastar/core/weak_ptr.hh>
42#include <seastar/util/backtrace.hh>
43#include <seastar/util/log.hh>
44
45namespace bi = boost::intrusive;
46
47namespace seastar {
48
49namespace rpc {
50
63
64using id_type = int64_t;
65
66using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
67using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
68
69static constexpr char rpc_magic[] = "SSTARRPC";
70
73
79};
80
85
98 size_t basic_request_size = 0;
99 unsigned bloat_factor = 1;
103 using syncronous_isolation_function = std::function<isolation_config (sstring isolation_cookie)>;
104 using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
105 using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
106 isolation_function_alternatives isolate_connection = default_isolate_connection;
107};
108
110 std::optional<net::tcp_keepalive_params> keepalive;
111 bool tcp_nodelay = true;
112 bool reuseaddr = false;
113 compressor::factory* compressor_factory = nullptr;
114 bool send_timeout_data = true;
115 connection_id stream_parent = invalid_connection_id;
120 sstring metrics_domain = "default";
121 bool send_handler_duration = true;
122};
123
125
126// RPC call that passes stream connection id as a parameter
127// may arrive to a different shard from where the stream connection
128// was opened, so the connection id is not known to a server that handles
129// the RPC call. The shard that the stream connection belong to is know
130// since it is a part of connection id, but this is not enough to locate
131// a server instance the connection belongs to if there are more than one
132// server on the shard. Stream domain parameter is here to help with that.
133// Different servers on all shards logically belonging to the same service should
134// belong to the same streaming domain. Only one server on each shard can belong to
135// a particulr streaming domain.
137 uint64_t _id;
138public:
139 explicit streaming_domain_type(uint64_t id) : _id(id) {}
140 bool operator==(const streaming_domain_type& o) const {
141 return _id == o._id;
142 }
143 friend struct std::hash<streaming_domain_type>;
144 friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
145};
146
149
150class server;
151
153 compressor::factory* compressor_factory = nullptr;
154 bool tcp_nodelay = true;
155 std::optional<streaming_domain_type> streaming_domain;
156 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
157 // optional filter function. If set, will be called with remote
158 // (connecting) address.
159 // Returning false will refuse the incoming connection.
160 // Returning true will allow the mechanism to proceed.
161 std::function<bool(const socket_address&)> filter_connection = {};
162};
163
165
166inline
167size_t
168estimate_request_size(const resource_limits& lim, size_t serialized_size) {
169 return lim.basic_request_size + serialized_size * lim.bloat_factor;
170}
171
173 char magic[sizeof(rpc_magic) - 1];
174 uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
175};
176
177enum class protocol_features : uint32_t {
178 COMPRESS = 0,
179 TIMEOUT = 1,
180 CONNECTION_ID = 2,
181 STREAM_PARENT = 3,
182 ISOLATION = 4,
183 HANDLER_DURATION = 5,
184};
185
186// internal representation of feature data
187using feature_map = std::map<protocol_features, sstring>;
188
189// An rpc signature, in the form signature<Ret (In0, In1, In2)>.
190template <typename Function>
192
193class logger {
194 std::function<void(const sstring&)> _logger;
195 ::seastar::logger* _seastar_logger = nullptr;
196
197 // _seastar_logger will always be used first if it's available
198 void log(const sstring& str) const {
199 if (_seastar_logger) {
200 // default level for log messages is `info`
201 _seastar_logger->info("{}", str);
202 } else if (_logger) {
203 _logger(str);
204 }
205 }
206
207 // _seastar_logger will always be used first if it's available
208 template <typename... Args>
209#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT
210 void log(log_level level, fmt::format_string<Args...> fmt, Args&&... args) const {
211#else
212 void log(log_level level, const char* fmt, Args&&... args) const {
213#endif
214 if (_seastar_logger) {
215 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
216 // If the log level is at least `info`, fall back to legacy logging without explicit level.
217 // Ignore less severe levels in order not to spam user's log with messages during transition,
218 // i.e. when the user still only defines a level-less logger.
219 } else if (_logger && level <= log_level::info) {
220 fmt::memory_buffer out;
221#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT
222 fmt::format_to(fmt::appender(out), fmt, std::forward<Args>(args)...);
223#else
224 fmt::format_to(fmt::appender(out), fmt::runtime(fmt), std::forward<Args>(args)...);
225#endif
226 _logger(sstring{out.data(), out.size()});
227 }
228 }
229
230public:
231 void set(std::function<void(const sstring&)> l) {
232 _logger = std::move(l);
233 }
234
235 void set(::seastar::logger* logger) {
236 _seastar_logger = logger;
237 }
238
239 void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
240 void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
241
242 void operator()(const client_info& info, const sstring& str) const;
243 void operator()(const client_info& info, log_level level, std::string_view str) const;
244
245 void operator()(const socket_address& addr, const sstring& str) const;
246 void operator()(const socket_address& addr, log_level level, std::string_view str) const;
247};
248
250protected:
252 input_stream<char> _read_buf;
253 output_stream<char> _write_buf;
254 bool _error = false;
255 bool _connected = false;
256 std::optional<shared_promise<>> _negotiated = shared_promise<>();
257 promise<> _stopped;
258 stats _stats;
259 const logger& _logger;
260 // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
261 // The type of the pointer is erased here, but the original type is Serializer
262 void* _serializer;
263 struct outgoing_entry : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
265 snd_buf buf;
266 promise<> done;
267 cancellable* pcancel = nullptr;
268 outgoing_entry(snd_buf b) : buf(std::move(b)) {}
269
270 outgoing_entry(outgoing_entry&&) = delete;
271 outgoing_entry(const outgoing_entry&) = delete;
272
273 void uncancellable() {
274 t.cancel();
275 if (pcancel) {
276 pcancel->cancel_send = std::function<void()>();
277 }
278 }
279
281 if (pcancel) {
282 pcancel->cancel_send = std::function<void()>();
283 pcancel->send_back_pointer = nullptr;
284 }
285 }
286
287 using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
288 };
289 void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex = nullptr);
290 future<> _outgoing_queue_ready = _negotiated->get_shared_future();
291 outgoing_entry::container_t _outgoing_queue;
292 size_t _outgoing_queue_size = 0;
293 std::unique_ptr<compressor> _compressor;
294 bool _propagate_timeout = false;
295 bool _timeout_negotiated = false;
296 bool _handler_duration_negotiated = false;
297 // stream related fields
298 bool _is_stream = false;
299 connection_id _id = invalid_connection_id;
300
301 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
302 queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
303 semaphore _stream_sem = semaphore(max_stream_buffers_memory);
304 bool _sink_closed = true;
305 bool _source_closed = true;
306 // the future holds if sink is already closed
307 // if it is not ready it means the sink is been closed
308 future<bool> _sink_closed_future = make_ready_future<bool>(false);
309
310 void set_negotiated() noexcept;
311
312 bool is_stream() const noexcept {
313 return _is_stream;
314 }
315
316 snd_buf compress(snd_buf buf);
317 future<> send_buffer(snd_buf buf);
318 future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
319 future<> send_entry(outgoing_entry& d) noexcept;
320 future<> stop_send_loop(std::exception_ptr ex);
321 future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
322 bool stream_check_twoway_closed() const noexcept {
323 return _sink_closed && _source_closed;
324 }
325 future<> stream_close();
326 future<> stream_process_incoming(rcv_buf&&);
327 future<> handle_stream_frame();
328
329public:
330 connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
331 set_socket(std::move(fd));
332 }
333 connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
334 virtual ~connection() {}
335 size_t outgoing_queue_length() const noexcept {
336 return _outgoing_queue_size;
337 }
338
339 void set_socket(connected_socket&& fd);
340 future<> send_negotiation_frame(feature_map features);
341 bool error() const noexcept { return _error; }
342 void abort();
343 future<> stop() noexcept;
344 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
345 future<> close_sink() {
346 _sink_closed = true;
347 if (stream_check_twoway_closed()) {
348 return stream_close();
349 }
350 return make_ready_future();
351 }
352 bool sink_closed() const noexcept {
353 return _sink_closed;
354 }
355 future<> close_source() {
356 _source_closed = true;
357 if (stream_check_twoway_closed()) {
358 return stream_close();
359 }
360 return make_ready_future();
361 }
362 connection_id get_connection_id() const noexcept {
363 return _id;
364 }
365 stats& get_stats_internal() noexcept {
366 return _stats;
367 }
368 xshard_connection_ptr get_stream(connection_id id) const;
369 void register_stream(connection_id id, xshard_connection_ptr c);
370 virtual socket_address peer_address() const = 0;
371
372 const logger& get_logger() const noexcept {
373 return _logger;
374 }
375
376 template<typename Serializer>
377 Serializer& serializer() {
378 return *static_cast<Serializer*>(_serializer);
379 }
380
381 template <typename FrameType>
382 future<typename FrameType::return_type> read_frame(socket_address info, input_stream<char>& in);
383
384 template <typename FrameType>
385 future<typename FrameType::return_type> read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
386 friend class client;
387 template<typename Serializer, typename... Out>
388 friend class sink_impl;
389 template<typename Serializer, typename... In>
390 friend class source_impl;
391
392 void suspend_for_testing(promise<>& p) {
393 _outgoing_queue_ready.get();
394 auto dummy = std::make_unique<outgoing_entry>(snd_buf());
395 _outgoing_queue.push_back(*dummy);
396 _outgoing_queue_ready = dummy->done.get_future();
397 (void)p.get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
398 }
399};
400
402 promise<> pr;
403 snd_buf data;
404};
405
406// send data Out...
407template<typename Serializer, typename... Out>
408class sink_impl : public sink<Out...>::impl {
409 // Used on the shard *this lives on.
410 alignas (cache_line_size) uint64_t _next_seq_num = 1;
411
412 // Used on the shard the _conn lives on.
413 struct alignas (cache_line_size) {
414 uint64_t last_seq_num = 0;
415 std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
416 } _remote_state;
417public:
418 sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
419 future<> operator()(const Out&... args) override;
420 future<> close() override;
421 future<> flush() override;
422 ~sink_impl() override;
423};
424
425// receive data In...
426template<typename Serializer, typename... In>
427class source_impl : public source<In...>::impl {
428public:
429 source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
430 future<std::optional<std::tuple<In...>>> operator()() override;
431};
432
433class client : public rpc::connection, public weakly_referencable<client> {
434 socket _socket;
435 id_type _message_id = 1;
436 struct reply_handler_base {
438 cancellable* pcancel = nullptr;
439 rpc_clock_type::time_point start;
440 virtual void operator()(client&, id_type, rcv_buf data) = 0;
441 virtual void timeout() {}
442 virtual void cancel() {}
443 virtual ~reply_handler_base() {
444 if (pcancel) {
445 pcancel->cancel_wait = std::function<void()>();
446 pcancel->wait_back_pointer = nullptr;
447 }
448 };
449 };
450
451 class metrics {
452 struct domain;
453
454 using domain_member_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
455
456 const client& _c;
457 domain& _domain;
458 domain_member_hook_t _link;
459
460 using domain_list_t = boost::intrusive::list<metrics,
461 boost::intrusive::member_hook<metrics, metrics::domain_member_hook_t, &metrics::_link>,
462 boost::intrusive::constant_time_size<false>>;
463
464 public:
465 metrics(const client&);
466 ~metrics();
467 };
468
469 void enqueue_zero_frame();
470public:
471 template<typename Reply, typename Func>
472 struct reply_handler final : reply_handler_base {
473 Func func;
474 Reply reply;
475 reply_handler(Func&& f) : func(std::move(f)) {}
476 virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
477 return func(reply, client, msg_id, std::move(data));
478 }
479 virtual void timeout() override {
480 reply.done = true;
481 reply.p.set_exception(timeout_error());
482 }
483 virtual void cancel() override {
484 reply.done = true;
485 reply.p.set_exception(canceled_error());
486 }
487 virtual ~reply_handler() {}
488 };
489private:
490 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
491 socket_address _server_addr, _local_addr;
492 client_options _options;
493 weak_ptr<client> _parent; // for stream clients
494
495 metrics _metrics;
496
497private:
498 future<> negotiate_protocol(feature_map map);
499 void negotiate(feature_map server_features);
500 // Returned future is
501 // - message id
502 // - optional server-side handler duration
503 // - message payload
505 read_response_frame_compressed(input_stream<char>& in);
506public:
515 client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
516 client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
517
528 client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
529 client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
530
531 stats get_stats() const;
532 size_t incoming_queue_length() const noexcept {
533 return _outstanding.size();
534 }
535
536 auto next_message_id() { return _message_id++; }
537 void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
538 void wait_timed_out(id_type id);
539 future<> stop() noexcept;
540 void abort_all_streams();
541 void deregister_this_stream();
542 socket_address peer_address() const override {
543 return _server_addr;
544 }
545 future<> await_connection() {
546 if (!_negotiated) {
547 return make_ready_future<>();
548 } else {
549 return _negotiated->get_shared_future();
550 }
551 }
552 template<typename Serializer, typename... Out>
553 future<sink<Out...>> make_stream_sink(socket socket) {
554 return await_connection().then([this, socket = std::move(socket)] () mutable {
555 if (!this->get_connection_id()) {
556 return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
557 }
558 client_options o = _options;
559 o.stream_parent = this->get_connection_id();
560 o.send_timeout_data = false;
561 o.metrics_domain += "_stream";
562 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
563 c->_parent = this->weak_from_this();
564 c->_is_stream = true;
565 return c->await_connection().then([c, this] {
566 if (_error) {
567 throw closed_error();
568 }
569 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
570 this->register_stream(c->get_connection_id(), s);
571 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
572 }).handle_exception([c] (std::exception_ptr eptr) {
573 // If await_connection fails we need to stop the client
574 // before destroying it.
575 return c->stop().then([eptr, c] {
576 return make_exception_future<sink<Out...>>(eptr);
577 });
578 });
579 });
580 }
581 template<typename Serializer, typename... Out>
582 future<sink<Out...>> make_stream_sink() {
583 return make_stream_sink<Serializer, Out...>(make_socket());
584 }
585
586 future<> request(uint64_t type, int64_t id, snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
587};
588
589class protocol_base;
590
591class server {
592private:
593 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
594
595public:
596 class connection : public rpc::connection, public enable_shared_from_this<connection> {
597 client_info _info;
598 connection_id _parent_id = invalid_connection_id;
599 std::optional<isolation_config> _isolation_config;
600 private:
601 future<> negotiate_protocol();
602 future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
603 read_request_frame_compressed(input_stream<char>& in);
604 future<feature_map> negotiate(feature_map requested);
605 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
606 public:
607 connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
608 future<> process();
609 future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration);
610 client_info& info() { return _info; }
611 const client_info& info() const { return _info; }
612 stats get_stats() const {
613 stats res = _stats;
614 res.pending = outgoing_queue_length();
615 return res;
616 }
617 socket_address peer_address() const override {
618 return _info.addr;
619 }
620 // Resources will be released when this goes out of scope
621 future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
622 if (timeout) {
623 return get_units(get_server()._resources_available, memory_consumed, *timeout);
624 } else {
625 return get_units(get_server()._resources_available, memory_consumed);
626 }
627 }
628 size_t estimate_request_size(size_t serialized_size) {
629 return rpc::estimate_request_size(get_server()._limits, serialized_size);
630 }
631 size_t max_request_size() const {
632 return get_server()._limits.max_memory;
633 }
634 server& get_server() {
635 return _info.server;
636 }
637 const server& get_server() const {
638 return _info.server;
639 }
640 future<> deregister_this_stream();
641 future<> abort_all_streams();
642 };
643private:
644 protocol_base& _proto;
645 server_socket _ss;
646 resource_limits _limits;
647 rpc_semaphore _resources_available;
648 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
649 promise<> _ss_stopped;
650 gate _reply_gate;
651 server_options _options;
652 bool _shutdown = false;
653 uint64_t _next_client_id = 1;
654
655public:
656 server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
657 server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
660 void accept();
680 template<typename Func>
681 void foreach_connection(Func&& f) {
682 for (auto c : _conns) {
683 f(*c.second);
684 }
685 }
694 gate& reply_gate() {
695 return _reply_gate;
696 }
697 friend connection;
698 friend client;
699};
700
701using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
702 rcv_buf data, gate::holder guard)>;
703
706 rpc_handler_func func;
707 gate use_gate;
708};
709
711public:
712 virtual ~protocol_base() {};
713 virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
714protected:
715 friend class server;
716
718 rpc_handler& handler;
719 gate::holder holder;
720 };
721 virtual std::optional<handler_with_holder> get_handler(uint64_t msg_id) = 0;
722};
723
726
816template<typename Serializer, typename MsgType = uint32_t>
817class protocol final : public protocol_base {
818public:
820 class server : public rpc::server {
821 public:
822 server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
823 rpc::server(&proto, addr, memory_limit) {}
824 server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
825 rpc::server(&proto, opts, addr, memory_limit) {}
827 rpc::server(&proto, std::move(socket), memory_limit) {}
829 rpc::server(&proto, opts, std::move(socket), memory_limit) {}
830 };
832 class client : public rpc::client {
833 public:
834 /*
835 * Create client object which will attempt to connect to the remote address.
836 *
837 * @param addr the remote address identifying this client
838 * @param local the local address of this client
839 */
840 client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
841 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
842 client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
843 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
844
853 client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
854 rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
855 client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
856 rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
857 };
858
859 friend server;
860private:
861 std::unordered_map<MsgType, rpc_handler> _handlers;
862 Serializer _serializer;
863 logger _logger;
864
865public:
866 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
867
877 template<typename Func>
878 auto make_client(MsgType t);
879
892 template<typename Func>
893 auto register_handler(MsgType t, Func&& func);
894
912 template <typename Func>
913 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
914
925 future<> unregister_handler(MsgType t);
926
931 [[deprecated("Use set_logger(::seastar::logger*) instead")]]
932 void set_logger(std::function<void(const sstring&)> logger) {
933 _logger.set(std::move(logger));
934 }
935
937 void set_logger(::seastar::logger* logger) {
938 _logger.set(logger);
939 }
940
941 const logger& get_logger() const {
942 return _logger;
943 }
944
945 shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
946 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
947 }
948
949 bool has_handler(MsgType msg_id);
950
955 bool has_handlers() const noexcept {
956 return !_handlers.empty();
957 }
958
959private:
960 std::optional<handler_with_holder> get_handler(uint64_t msg_id) override;
961
962 template<typename Ret, typename... In>
963 auto make_client(signature<Ret(In...)> sig, MsgType t);
964
965 void register_receiver(MsgType t, rpc_handler&& handler) {
966 auto r = _handlers.emplace(t, std::move(handler));
967 if (!r.second) {
968 throw_with_backtrace<std::runtime_error>("registered handler already exists");
969 }
970 }
971};
972
974
975}
976
977}
978
979#include "rpc_impl.hh"
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
Definition: api.hh:183
Definition: shared_ptr.hh:493
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
value_type && get()
gets the value returned by the computation
Definition: future.hh:1342
Definition: gate.hh:171
Definition: gate.hh:61
Definition: queue.hh:44
Definition: rpc_types.hh:165
Definition: rpc.hh:433
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
client(const logger &l, void *s, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc_types.hh:290
Definition: rpc_types.hh:66
Definition: rpc.hh:249
Definition: rpc.hh:193
Represents a client side connection.
Definition: rpc.hh:832
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:853
Represents the listening port and all accepted connections.
Definition: rpc.hh:820
Definition: rpc.hh:710
Definition: rpc.hh:817
void set_logger(std::function< void(const sstring &)> logger)
Definition: rpc.hh:932
bool has_handlers() const noexcept
Definition: rpc.hh:955
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:937
Definition: rpc.hh:596
Definition: rpc.hh:591
void abort_connection(connection_id id)
Definition: rpc_types.hh:319
Definition: rpc.hh:408
Definition: rpc_types.hh:317
Definition: rpc_types.hh:358
Definition: rpc.hh:427
Definition: rpc_types.hh:356
Definition: rpc_types.hh:144
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:285
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:319
Definition: shared_ptr.hh:507
Definition: socket_defs.hh:47
Definition: api.hh:283
Definition: timer.hh:83
bool cancel() noexcept
Definition: weak_ptr.hh:46
Definition: weak_ptr.hh:133
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:804
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1943
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
log_level
log level used with
Definition: log.hh:55
socket make_socket()
scheduling_group sched_group
Definition: rpc.hh:78
size_t max_memory
Definition: rpc.hh:100
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:99
std::function< isolation_config(sstring isolation_cookie)> syncronous_isolation_function
Definition: rpc.hh:103
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:98
sstring isolation_cookie
Definition: rpc.hh:119
isolation_config default_isolate_connection(sstring isolation_cookie)
Definition: rpc.hh:109
Specifies resource isolation for a connection.
Definition: rpc.hh:75
Resource limits for an RPC server.
Definition: rpc.hh:97
Definition: rpc.hh:152
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
STL namespace.
ostream & operator<<(ostream &os, const seastar::lazy_eval< Func > &lf)
Definition: lazy.hh:131
Definition: rpc_types.hh:203
Definition: rpc_types.hh:97
Definition: rpc.hh:263
Definition: rpc.hh:401
Definition: rpc.hh:172
Definition: rpc_types.hh:239
Definition: rpc.hh:704
Definition: rpc.hh:191
Definition: rpc_types.hh:251
Definition: rpc_types.hh:54