Seastar
High performance C++ framework for concurrent servers
rpc.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <unordered_map>
25#include <unordered_set>
26#include <list>
27#include <variant>
28#include <boost/intrusive/list.hpp>
29#include <seastar/core/future.hh>
30#include <seastar/core/seastar.hh>
31#include <seastar/net/api.hh>
32#include <seastar/core/iostream.hh>
33#include <seastar/core/shared_ptr.hh>
34#include <seastar/core/condition-variable.hh>
35#include <seastar/core/gate.hh>
36#include <seastar/rpc/rpc_types.hh>
37#include <seastar/core/byteorder.hh>
38#include <seastar/core/shared_future.hh>
39#include <seastar/core/queue.hh>
40#include <seastar/core/weak_ptr.hh>
42#include <seastar/util/backtrace.hh>
43#include <seastar/util/log.hh>
44
45namespace bi = boost::intrusive;
46
47namespace seastar {
48
49namespace rpc {
50
63
64using id_type = int64_t;
65
66using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
67using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
68
69static constexpr char rpc_magic[] = "SSTARRPC";
70
73
79};
80
85
98 size_t basic_request_size = 0;
99 unsigned bloat_factor = 1;
103 using syncronous_isolation_function = std::function<isolation_config (sstring isolation_cookie)>;
104 using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
105 using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
106 isolation_function_alternatives isolate_connection = default_isolate_connection;
107};
108
110 std::optional<net::tcp_keepalive_params> keepalive;
111 bool tcp_nodelay = true;
112 bool reuseaddr = false;
113 compressor::factory* compressor_factory = nullptr;
114 bool send_timeout_data = true;
115 connection_id stream_parent = invalid_connection_id;
120 sstring metrics_domain = "default";
121 bool send_handler_duration = true;
122};
123
125
126// RPC call that passes stream connection id as a parameter
127// may arrive to a different shard from where the stream connection
128// was opened, so the connection id is not known to a server that handles
129// the RPC call. The shard that the stream connection belong to is know
130// since it is a part of connection id, but this is not enough to locate
131// a server instance the connection belongs to if there are more than one
132// server on the shard. Stream domain parameter is here to help with that.
133// Different servers on all shards logically belonging to the same service should
134// belong to the same streaming domain. Only one server on each shard can belong to
135// a particulr streaming domain.
137 uint64_t _id;
138public:
139 explicit streaming_domain_type(uint64_t id) : _id(id) {}
140 bool operator==(const streaming_domain_type& o) const {
141 return _id == o._id;
142 }
143 friend struct std::hash<streaming_domain_type>;
144 friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
145};
146
149
150class server;
151
153 compressor::factory* compressor_factory = nullptr;
154 bool tcp_nodelay = true;
155 std::optional<streaming_domain_type> streaming_domain;
156 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
157 // optional filter function. If set, will be called with remote
158 // (connecting) address.
159 // Returning false will refuse the incoming connection.
160 // Returning true will allow the mechanism to proceed.
161 std::function<bool(const socket_address&)> filter_connection = {};
162};
163
165
166inline
167size_t
168estimate_request_size(const resource_limits& lim, size_t serialized_size) {
169 return lim.basic_request_size + serialized_size * lim.bloat_factor;
170}
171
173 char magic[sizeof(rpc_magic) - 1];
174 uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
175};
176
177enum class protocol_features : uint32_t {
178 COMPRESS = 0,
179 TIMEOUT = 1,
180 CONNECTION_ID = 2,
181 STREAM_PARENT = 3,
182 ISOLATION = 4,
183 HANDLER_DURATION = 5,
184};
185
186// internal representation of feature data
187using feature_map = std::map<protocol_features, sstring>;
188
189// An rpc signature, in the form signature<Ret (In0, In1, In2)>.
190template <typename Function>
192
193class logger {
194 ::seastar::logger* _seastar_logger = nullptr;
195
196 void log(const sstring& str) const {
197 if (_seastar_logger) {
198 // default level for log messages is `info`
199 _seastar_logger->info("{}", str);
200 }
201 }
202
203 // _seastar_logger will always be used first if it's available
204 template <typename... Args>
205#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT
206 void log(log_level level, fmt::format_string<Args...> fmt, Args&&... args) const {
207#else
208 void log(log_level level, const char* fmt, Args&&... args) const {
209#endif
210 if (_seastar_logger) {
211 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
212 }
213 }
214
215public:
216 void set(::seastar::logger* logger) {
217 _seastar_logger = logger;
218 }
219
220 void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
221 void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
222
223 void operator()(const client_info& info, const sstring& str) const;
224 void operator()(const client_info& info, log_level level, std::string_view str) const;
225
226 void operator()(const socket_address& addr, const sstring& str) const;
227 void operator()(const socket_address& addr, log_level level, std::string_view str) const;
228};
229
231protected:
233 input_stream<char> _read_buf;
234 output_stream<char> _write_buf;
235 bool _error = false;
236 bool _connected = false;
237 std::optional<shared_promise<>> _negotiated = shared_promise<>();
238 promise<> _stopped;
239 stats _stats;
240 const logger& _logger;
241 // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
242 // The type of the pointer is erased here, but the original type is Serializer
243 void* _serializer;
244 struct outgoing_entry : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
246 snd_buf buf;
247 promise<> done;
248 cancellable* pcancel = nullptr;
249 outgoing_entry(snd_buf b) : buf(std::move(b)) {}
250
251 outgoing_entry(outgoing_entry&&) = delete;
252 outgoing_entry(const outgoing_entry&) = delete;
253
254 void uncancellable() {
255 t.cancel();
256 if (pcancel) {
257 pcancel->cancel_send = std::function<void()>();
258 }
259 }
260
262 if (pcancel) {
263 pcancel->cancel_send = std::function<void()>();
264 pcancel->send_back_pointer = nullptr;
265 }
266 }
267
268 using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
269 };
270 void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex = nullptr);
271 future<> _outgoing_queue_ready = _negotiated->get_shared_future();
272 outgoing_entry::container_t _outgoing_queue;
273 size_t _outgoing_queue_size = 0;
274 std::unique_ptr<compressor> _compressor;
275 bool _propagate_timeout = false;
276 bool _timeout_negotiated = false;
277 bool _handler_duration_negotiated = false;
278 // stream related fields
279 bool _is_stream = false;
280 connection_id _id = invalid_connection_id;
281
282 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
283 queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
284 semaphore _stream_sem = semaphore(max_stream_buffers_memory);
285 bool _sink_closed = true;
286 bool _source_closed = true;
287 // the future holds if sink is already closed
288 // if it is not ready it means the sink is been closed
289 future<bool> _sink_closed_future = make_ready_future<bool>(false);
290
291 void set_negotiated() noexcept;
292
293 bool is_stream() const noexcept {
294 return _is_stream;
295 }
296
297 snd_buf compress(snd_buf buf);
298 future<> send_buffer(snd_buf buf);
299 future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
300 future<> send_entry(outgoing_entry& d) noexcept;
301 future<> stop_send_loop(std::exception_ptr ex);
302 future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
303 bool stream_check_twoway_closed() const noexcept {
304 return _sink_closed && _source_closed;
305 }
306 future<> stream_close();
307 future<> stream_process_incoming(rcv_buf&&);
308 future<> handle_stream_frame();
309
310public:
311 connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
312 set_socket(std::move(fd));
313 }
314 connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
315 virtual ~connection() {}
316 size_t outgoing_queue_length() const noexcept {
317 return _outgoing_queue_size;
318 }
319
320 void set_socket(connected_socket&& fd);
321 future<> send_negotiation_frame(feature_map features);
322 bool error() const noexcept { return _error; }
323 void abort();
324 future<> stop() noexcept;
325 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
326 future<> close_sink() {
327 _sink_closed = true;
328 if (stream_check_twoway_closed()) {
329 return stream_close();
330 }
331 return make_ready_future();
332 }
333 bool sink_closed() const noexcept {
334 return _sink_closed;
335 }
336 future<> close_source() {
337 _source_closed = true;
338 if (stream_check_twoway_closed()) {
339 return stream_close();
340 }
341 return make_ready_future();
342 }
343 connection_id get_connection_id() const noexcept {
344 return _id;
345 }
346 stats& get_stats_internal() noexcept {
347 return _stats;
348 }
349 xshard_connection_ptr get_stream(connection_id id) const;
350 void register_stream(connection_id id, xshard_connection_ptr c);
351 virtual socket_address peer_address() const = 0;
352
353 const logger& get_logger() const noexcept {
354 return _logger;
355 }
356
357 template<typename Serializer>
358 Serializer& serializer() {
359 return *static_cast<Serializer*>(_serializer);
360 }
361
362 template <typename FrameType>
363 future<typename FrameType::return_type> read_frame(socket_address info, input_stream<char>& in);
364
365 template <typename FrameType>
366 future<typename FrameType::return_type> read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
367 friend class client;
368 template<typename Serializer, typename... Out>
369 friend class sink_impl;
370 template<typename Serializer, typename... In>
371 friend class source_impl;
372
373 void suspend_for_testing(promise<>& p) {
374 _outgoing_queue_ready.get();
375 auto dummy = std::make_unique<outgoing_entry>(snd_buf());
376 _outgoing_queue.push_back(*dummy);
377 _outgoing_queue_ready = dummy->done.get_future();
378 (void)p.get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
379 }
380};
381
383 promise<> pr;
384 snd_buf data;
385};
386
387// send data Out...
388template<typename Serializer, typename... Out>
389class sink_impl : public sink<Out...>::impl {
390 // Used on the shard *this lives on.
391 alignas (cache_line_size) uint64_t _next_seq_num = 1;
392
393 // Used on the shard the _conn lives on.
394 struct alignas (cache_line_size) {
395 uint64_t last_seq_num = 0;
396 std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
397 } _remote_state;
398public:
399 sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
400 future<> operator()(const Out&... args) override;
401 future<> close() override;
402 future<> flush() override;
403 ~sink_impl() override;
404};
405
406// receive data In...
407template<typename Serializer, typename... In>
408class source_impl : public source<In...>::impl {
409public:
410 source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
411 future<std::optional<std::tuple<In...>>> operator()() override;
412};
413
414class client : public rpc::connection, public weakly_referencable<client> {
415 socket _socket;
416 id_type _message_id = 1;
417 struct reply_handler_base {
419 cancellable* pcancel = nullptr;
420 rpc_clock_type::time_point start;
421 virtual void operator()(client&, id_type, rcv_buf data) = 0;
422 virtual void timeout() {}
423 virtual void cancel() {}
424 virtual ~reply_handler_base() {
425 if (pcancel) {
426 pcancel->cancel_wait = std::function<void()>();
427 pcancel->wait_back_pointer = nullptr;
428 }
429 };
430 };
431
432 class metrics {
433 struct domain;
434
435 using domain_member_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
436
437 const client& _c;
438 domain& _domain;
439 domain_member_hook_t _link;
440
441 using domain_list_t = boost::intrusive::list<metrics,
442 boost::intrusive::member_hook<metrics, metrics::domain_member_hook_t, &metrics::_link>,
443 boost::intrusive::constant_time_size<false>>;
444
445 public:
446 metrics(const client&);
447 ~metrics();
448 };
449
450 void enqueue_zero_frame();
451public:
452 template<typename Reply, typename Func>
453 struct reply_handler final : reply_handler_base {
454 Func func;
455 Reply reply;
456 reply_handler(Func&& f) : func(std::move(f)) {}
457 virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
458 return func(reply, client, msg_id, std::move(data));
459 }
460 virtual void timeout() override {
461 reply.done = true;
462 reply.p.set_exception(timeout_error());
463 }
464 virtual void cancel() override {
465 reply.done = true;
466 reply.p.set_exception(canceled_error());
467 }
468 virtual ~reply_handler() {}
469 };
470private:
471 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
472 socket_address _server_addr, _local_addr;
473 client_options _options;
474 weak_ptr<client> _parent; // for stream clients
475
476 metrics _metrics;
477
478private:
479 future<> negotiate_protocol(feature_map map);
480 void negotiate(feature_map server_features);
481 // Returned future is
482 // - message id
483 // - optional server-side handler duration
484 // - message payload
486 read_response_frame_compressed(input_stream<char>& in);
487public:
496 client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
497 client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
498
509 client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
510 client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
511
512 stats get_stats() const;
513 size_t incoming_queue_length() const noexcept {
514 return _outstanding.size();
515 }
516
517 auto next_message_id() { return _message_id++; }
518 void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
519 void wait_timed_out(id_type id);
520 future<> stop() noexcept;
521 void abort_all_streams();
522 void deregister_this_stream();
523 socket_address peer_address() const override {
524 return _server_addr;
525 }
526 future<> await_connection() {
527 if (!_negotiated) {
528 return make_ready_future<>();
529 } else {
530 return _negotiated->get_shared_future();
531 }
532 }
533 template<typename Serializer, typename... Out>
534 future<sink<Out...>> make_stream_sink(socket socket) {
535 return await_connection().then([this, socket = std::move(socket)] () mutable {
536 if (!this->get_connection_id()) {
537 return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
538 }
539 client_options o = _options;
540 o.stream_parent = this->get_connection_id();
541 o.send_timeout_data = false;
542 o.metrics_domain += "_stream";
543 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
544 c->_parent = this->weak_from_this();
545 c->_is_stream = true;
546 return c->await_connection().then([c, this] {
547 if (_error) {
548 throw closed_error();
549 }
550 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
551 this->register_stream(c->get_connection_id(), s);
552 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
553 }).handle_exception([c] (std::exception_ptr eptr) {
554 // If await_connection fails we need to stop the client
555 // before destroying it.
556 return c->stop().then([eptr, c] {
557 return make_exception_future<sink<Out...>>(eptr);
558 });
559 });
560 });
561 }
562 template<typename Serializer, typename... Out>
563 future<sink<Out...>> make_stream_sink() {
564 return make_stream_sink<Serializer, Out...>(make_socket());
565 }
566
567 future<> request(uint64_t type, int64_t id, snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
568};
569
570class protocol_base;
571
572class server {
573private:
574 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
575
576public:
577 class connection : public rpc::connection, public enable_shared_from_this<connection> {
578 client_info _info;
579 connection_id _parent_id = invalid_connection_id;
580 std::optional<isolation_config> _isolation_config;
581 private:
582 future<> negotiate_protocol();
583 future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
584 read_request_frame_compressed(input_stream<char>& in);
585 future<feature_map> negotiate(feature_map requested);
586 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
587 public:
588 connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
589 future<> process();
590 future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout, std::optional<rpc_clock_type::duration> handler_duration);
591 client_info& info() { return _info; }
592 const client_info& info() const { return _info; }
593 stats get_stats() const {
594 stats res = _stats;
595 res.pending = outgoing_queue_length();
596 return res;
597 }
598 socket_address peer_address() const override {
599 return _info.addr;
600 }
601 // Resources will be released when this goes out of scope
602 future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
603 if (timeout) {
604 return get_units(get_server()._resources_available, memory_consumed, *timeout);
605 } else {
606 return get_units(get_server()._resources_available, memory_consumed);
607 }
608 }
609 size_t estimate_request_size(size_t serialized_size) {
610 return rpc::estimate_request_size(get_server()._limits, serialized_size);
611 }
612 size_t max_request_size() const {
613 return get_server()._limits.max_memory;
614 }
615 server& get_server() {
616 return _info.server;
617 }
618 const server& get_server() const {
619 return _info.server;
620 }
621 future<> deregister_this_stream();
622 future<> abort_all_streams();
623 };
624private:
625 protocol_base& _proto;
626 server_socket _ss;
627 resource_limits _limits;
628 rpc_semaphore _resources_available;
629 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
630 promise<> _ss_stopped;
631 gate _reply_gate;
632 server_options _options;
633 bool _shutdown = false;
634 uint64_t _next_client_id = 1;
635
636public:
637 server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
638 server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
641 void accept();
661 template<typename Func>
662 void foreach_connection(Func&& f) {
663 for (auto c : _conns) {
664 f(*c.second);
665 }
666 }
675 gate& reply_gate() {
676 return _reply_gate;
677 }
678 friend connection;
679 friend client;
680};
681
682using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
683 rcv_buf data, gate::holder guard)>;
684
687 rpc_handler_func func;
688 gate use_gate;
689};
690
692public:
693 virtual ~protocol_base() {};
694 virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
695protected:
696 friend class server;
697
699 rpc_handler& handler;
700 gate::holder holder;
701 };
702 virtual std::optional<handler_with_holder> get_handler(uint64_t msg_id) = 0;
703};
704
707
797template<typename Serializer, typename MsgType = uint32_t>
798class protocol final : public protocol_base {
799public:
801 class server : public rpc::server {
802 public:
803 server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
804 rpc::server(&proto, addr, memory_limit) {}
805 server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
806 rpc::server(&proto, opts, addr, memory_limit) {}
808 rpc::server(&proto, std::move(socket), memory_limit) {}
810 rpc::server(&proto, opts, std::move(socket), memory_limit) {}
811 };
813 class client : public rpc::client {
814 public:
815 /*
816 * Create client object which will attempt to connect to the remote address.
817 *
818 * @param addr the remote address identifying this client
819 * @param local the local address of this client
820 */
821 client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
822 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
823 client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
824 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
825
834 client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
835 rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
836 client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
837 rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
838 };
839
840 friend server;
841private:
842 std::unordered_map<MsgType, rpc_handler> _handlers;
843 Serializer _serializer;
844 logger _logger;
845
846public:
847 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
848
858 template<typename Func>
859 auto make_client(MsgType t);
860
873 template<typename Func>
874 auto register_handler(MsgType t, Func&& func);
875
893 template <typename Func>
894 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
895
906 future<> unregister_handler(MsgType t);
907
908
910 void set_logger(::seastar::logger* logger) {
911 _logger.set(logger);
912 }
913
914 const logger& get_logger() const {
915 return _logger;
916 }
917
918 shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
919 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
920 }
921
922 bool has_handler(MsgType msg_id);
923
928 bool has_handlers() const noexcept {
929 return !_handlers.empty();
930 }
931
932private:
933 std::optional<handler_with_holder> get_handler(uint64_t msg_id) override;
934
935 template<typename Ret, typename... In>
936 auto make_client(signature<Ret(In...)> sig, MsgType t);
937
938 void register_receiver(MsgType t, rpc_handler&& handler) {
939 auto r = _handlers.emplace(t, std::move(handler));
940 if (!r.second) {
941 throw_with_backtrace<std::runtime_error>("registered handler already exists");
942 }
943 }
944};
945
947
948}
949
950}
951
952#include "rpc_impl.hh"
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:239
Definition: api.hh:183
Definition: shared_ptr.hh:493
A representation of a possibly not-yet-computed value.
Definition: future.hh:1198
value_type && get()
gets the value returned by the computation
Definition: future.hh:1300
Definition: gate.hh:193
Definition: gate.hh:83
Definition: queue.hh:45
Definition: rpc_types.hh:166
Definition: rpc.hh:414
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
client(const logger &l, void *s, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc_types.hh:291
Definition: rpc_types.hh:67
Definition: rpc.hh:230
Definition: rpc.hh:193
Represents a client side connection.
Definition: rpc.hh:813
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:834
Represents the listening port and all accepted connections.
Definition: rpc.hh:801
Definition: rpc.hh:691
Definition: rpc.hh:798
bool has_handlers() const noexcept
Definition: rpc.hh:928
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:910
Definition: rpc.hh:577
Definition: rpc.hh:572
void abort_connection(connection_id id)
Definition: rpc_types.hh:320
Definition: rpc.hh:389
Definition: rpc_types.hh:318
Definition: rpc_types.hh:359
Definition: rpc.hh:408
Definition: rpc_types.hh:357
Definition: rpc_types.hh:145
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:293
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:326
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:319
Definition: shared_ptr.hh:507
Definition: socket_defs.hh:47
Definition: api.hh:283
Definition: timer.hh:84
bool cancel() noexcept
Definition: weak_ptr.hh:46
Definition: weak_ptr.hh:133
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:818
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1870
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1853
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1876
log_level
log level used with
Definition: log.hh:54
socket make_socket()
scheduling_group sched_group
Definition: rpc.hh:78
size_t max_memory
Definition: rpc.hh:100
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:99
std::function< isolation_config(sstring isolation_cookie)> syncronous_isolation_function
Definition: rpc.hh:103
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:98
sstring isolation_cookie
Definition: rpc.hh:119
isolation_config default_isolate_connection(sstring isolation_cookie)
Definition: rpc.hh:109
Specifies resource isolation for a connection.
Definition: rpc.hh:75
Resource limits for an RPC server.
Definition: rpc.hh:97
Definition: rpc.hh:152
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:405
STL namespace.
ostream & operator<<(ostream &os, const seastar::lazy_eval< Func > &lf)
Definition: lazy.hh:131
Definition: rpc_types.hh:204
Definition: rpc_types.hh:98
Definition: rpc.hh:244
Definition: rpc.hh:382
Definition: rpc.hh:172
Definition: rpc_types.hh:240
Definition: rpc.hh:685
Definition: rpc.hh:191
Definition: rpc_types.hh:252
Definition: rpc_types.hh:55