Seastar
High performance C++ framework for concurrent servers
rpc.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <list>
27 #include <variant>
28 #include <boost/intrusive/list.hpp>
29 #include <seastar/core/future.hh>
30 #include <seastar/core/seastar.hh>
31 #include <seastar/net/api.hh>
32 #include <seastar/core/iostream.hh>
33 #include <seastar/core/shared_ptr.hh>
34 #include <seastar/core/condition-variable.hh>
35 #include <seastar/core/gate.hh>
36 #include <seastar/rpc/rpc_types.hh>
37 #include <seastar/core/byteorder.hh>
38 #include <seastar/core/shared_future.hh>
39 #include <seastar/core/queue.hh>
40 #include <seastar/core/weak_ptr.hh>
42 #include <seastar/util/backtrace.hh>
43 #include <seastar/util/log.hh>
44 
45 namespace bi = boost::intrusive;
46 
47 namespace seastar {
48 
49 namespace rpc {
50 
63 
64 using id_type = int64_t;
65 
66 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
67 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
68 
69 static constexpr char rpc_magic[] = "SSTARRPC";
70 
73 
79 };
80 
84 isolation_config default_isolate_connection(sstring isolation_cookie);
85 
98  size_t basic_request_size = 0;
99  unsigned bloat_factor = 1;
103  using syncronous_isolation_function = std::function<isolation_config (sstring isolation_cookie)>;
104  using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
105  using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
106  isolation_function_alternatives isolate_connection = default_isolate_connection;
107 };
108 
110  std::optional<net::tcp_keepalive_params> keepalive;
111  bool tcp_nodelay = true;
112  bool reuseaddr = false;
113  compressor::factory* compressor_factory = nullptr;
114  bool send_timeout_data = true;
115  connection_id stream_parent = invalid_connection_id;
120  sstring metrics_domain = "default";
121 };
122 
124 
// An RPC call that passes a stream connection id as a parameter
// may arrive at a different shard from the one where the stream connection
// was opened, so the connection id is not known to the server that handles
// the RPC call. The shard that the stream connection belongs to is known,
// since it is a part of the connection id, but this is not enough to locate
// the server instance the connection belongs to if there is more than one
// server on the shard. The streaming domain parameter is here to help with that.
// Different servers on all shards logically belonging to the same service should
// belong to the same streaming domain. Only one server on each shard can belong to
// a particular streaming domain.
136  uint64_t _id;
137 public:
138  explicit streaming_domain_type(uint64_t id) : _id(id) {}
139  bool operator==(const streaming_domain_type& o) const {
140  return _id == o._id;
141  }
142  friend struct std::hash<streaming_domain_type>;
143  friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
144 };
145 
148 
149 class server;
150 
152  compressor::factory* compressor_factory = nullptr;
153  bool tcp_nodelay = true;
154  std::optional<streaming_domain_type> streaming_domain;
155  server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
156  // optional filter function. If set, will be called with remote
157  // (connecting) address.
158  // Returning false will refuse the incoming connection.
159  // Returning true will allow the mechanism to proceed.
160  std::function<bool(const socket_address&)> filter_connection = {};
161 };
162 
164 
165 inline
166 size_t
167 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
168  return lim.basic_request_size + serialized_size * lim.bloat_factor;
169 }
170 
172  char magic[sizeof(rpc_magic) - 1];
173  uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
174 };
175 
/// Identifiers of the optional protocol features; they are the key type of
/// feature_map. Every enumerator has an explicitly assigned value.
// NOTE(review): these values presumably travel in the negotiation frame, so
// they should stay stable across versions -- confirm before renumbering.
enum class protocol_features : uint32_t {
    COMPRESS      = 0,
    TIMEOUT       = 1,
    CONNECTION_ID = 2,
    STREAM_PARENT = 3,
    ISOLATION     = 4,
};
183 
184 // internal representation of feature data
185 using feature_map = std::map<protocol_features, sstring>;
186 
187 // An rpc signature, in the form signature<Ret (In0, In1, In2)>.
188 template <typename Function>
189 struct signature;
190 
191 class logger {
192  std::function<void(const sstring&)> _logger;
193  ::seastar::logger* _seastar_logger = nullptr;
194 
195  // _seastar_logger will always be used first if it's available
196  void log(const sstring& str) const {
197  if (_seastar_logger) {
198  // default level for log messages is `info`
199  _seastar_logger->info("{}", str);
200  } else if (_logger) {
201  _logger(str);
202  }
203  }
204 
205  // _seastar_logger will always be used first if it's available
206  template <typename... Args>
207  void log(log_level level, const char* fmt, Args&&... args) const {
208  if (_seastar_logger) {
209  _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
210  // If the log level is at least `info`, fall back to legacy logging without explicit level.
211  // Ignore less severe levels in order not to spam user's log with messages during transition,
212  // i.e. when the user still only defines a level-less logger.
213  } else if (_logger && level <= log_level::info) {
214  _logger(format(fmt, std::forward<Args>(args)...));
215  }
216  }
217 
218 public:
219  void set(std::function<void(const sstring&)> l) {
220  _logger = std::move(l);
221  }
222 
223  void set(::seastar::logger* logger) {
224  _seastar_logger = logger;
225  }
226 
227  void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
228  void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
229 
230  void operator()(const client_info& info, const sstring& str) const;
231  void operator()(const client_info& info, log_level level, std::string_view str) const;
232 
233  void operator()(const socket_address& addr, const sstring& str) const;
234  void operator()(const socket_address& addr, log_level level, std::string_view str) const;
235 };
236 
237 class connection {
238 protected:
239  connected_socket _fd;
240  input_stream<char> _read_buf;
241  output_stream<char> _write_buf;
242  bool _error = false;
243  bool _connected = false;
244  std::optional<shared_promise<>> _negotiated = shared_promise<>();
245  promise<> _stopped;
246  stats _stats;
247  const logger& _logger;
248  // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
249  // The type of the pointer is erased here, but the original type is Serializer
250  void* _serializer;
251  struct outgoing_entry : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
253  snd_buf buf;
254  promise<> done;
255  cancellable* pcancel = nullptr;
256  outgoing_entry(snd_buf b) : buf(std::move(b)) {}
257 
258  outgoing_entry(outgoing_entry&&) = delete;
259  outgoing_entry(const outgoing_entry&) = delete;
260 
261  void uncancellable() {
262  t.cancel();
263  if (pcancel) {
264  pcancel->cancel_send = std::function<void()>();
265  }
266  }
267 
268  ~outgoing_entry() {
269  if (pcancel) {
270  pcancel->cancel_send = std::function<void()>();
271  pcancel->send_back_pointer = nullptr;
272  }
273  }
274 
275  using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
276  };
277  void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex = nullptr);
278  future<> _outgoing_queue_ready = _negotiated->get_shared_future();
279  outgoing_entry::container_t _outgoing_queue;
280  size_t _outgoing_queue_size = 0;
281  std::unique_ptr<compressor> _compressor;
282  bool _propagate_timeout = false;
283  bool _timeout_negotiated = false;
284  // stream related fields
285  bool _is_stream = false;
286  connection_id _id = invalid_connection_id;
287 
288  std::unordered_map<connection_id, xshard_connection_ptr> _streams;
289  queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
290  semaphore _stream_sem = semaphore(max_stream_buffers_memory);
291  bool _sink_closed = true;
292  bool _source_closed = true;
// the future holds whether the sink is already closed;
// if it is not ready, it means the sink is being closed
295  future<bool> _sink_closed_future = make_ready_future<bool>(false);
296 
297  void set_negotiated() noexcept;
298 
299  bool is_stream() const noexcept {
300  return _is_stream;
301  }
302 
303  snd_buf compress(snd_buf buf);
304  future<> send_buffer(snd_buf buf);
305  future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
306  future<> send_entry(outgoing_entry& d);
307  future<> stop_send_loop(std::exception_ptr ex);
308  future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
309  bool stream_check_twoway_closed() const noexcept {
310  return _sink_closed && _source_closed;
311  }
312  future<> stream_close();
313  future<> stream_process_incoming(rcv_buf&&);
314  future<> handle_stream_frame();
315 
316 public:
317  connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
318  set_socket(std::move(fd));
319  }
320  connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
321  virtual ~connection() {}
322  size_t outgoing_queue_length() const noexcept {
323  return _outgoing_queue_size;
324  }
325 
326  void set_socket(connected_socket&& fd);
327  future<> send_negotiation_frame(feature_map features);
328  bool error() const noexcept { return _error; }
329  void abort();
330  future<> stop() noexcept;
331  future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
332  future<> close_sink() {
333  _sink_closed = true;
334  if (stream_check_twoway_closed()) {
335  return stream_close();
336  }
337  return make_ready_future();
338  }
339  bool sink_closed() const noexcept {
340  return _sink_closed;
341  }
342  future<> close_source() {
343  _source_closed = true;
344  if (stream_check_twoway_closed()) {
345  return stream_close();
346  }
347  return make_ready_future();
348  }
349  connection_id get_connection_id() const noexcept {
350  return _id;
351  }
352  stats& get_stats_internal() noexcept {
353  return _stats;
354  }
355  xshard_connection_ptr get_stream(connection_id id) const;
356  void register_stream(connection_id id, xshard_connection_ptr c);
357  virtual socket_address peer_address() const = 0;
358 
359  const logger& get_logger() const noexcept {
360  return _logger;
361  }
362 
363  template<typename Serializer>
364  Serializer& serializer() {
365  return *static_cast<Serializer*>(_serializer);
366  }
367 
368  template <typename FrameType>
369  typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
370 
371  template <typename FrameType>
372  typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
373  friend class client;
374  template<typename Serializer, typename... Out>
375  friend class sink_impl;
376  template<typename Serializer, typename... In>
377  friend class source_impl;
378 
379  void suspend_for_testing(promise<>& p) {
380  _outgoing_queue_ready.get();
381  auto dummy = std::make_unique<outgoing_entry>(snd_buf());
382  _outgoing_queue.push_back(*dummy);
383  _outgoing_queue_ready = dummy->done.get_future();
384  (void)p.get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
385  }
386 };
387 
389  promise<> pr;
390  snd_buf data;
391 };
392 
393 // send data Out...
394 template<typename Serializer, typename... Out>
395 class sink_impl : public sink<Out...>::impl {
396  // Used on the shard *this lives on.
397  alignas (cache_line_size) uint64_t _next_seq_num = 1;
398 
399  // Used on the shard the _conn lives on.
400  struct alignas (cache_line_size) {
401  uint64_t last_seq_num = 0;
402  std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
403  } _remote_state;
404 public:
405  sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
406  future<> operator()(const Out&... args) override;
407  future<> close() override;
408  future<> flush() override;
409  ~sink_impl() override;
410 };
411 
412 // receive data In...
413 template<typename Serializer, typename... In>
414 class source_impl : public source<In...>::impl {
415 public:
416  source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
417  future<std::optional<std::tuple<In...>>> operator()() override;
418 };
419 
420 class client : public rpc::connection, public weakly_referencable<client> {
421  socket _socket;
422  id_type _message_id = 1;
423  struct reply_handler_base {
425  cancellable* pcancel = nullptr;
426  virtual void operator()(client&, id_type, rcv_buf data) = 0;
427  virtual void timeout() {}
428  virtual void cancel() {}
429  virtual ~reply_handler_base() {
430  if (pcancel) {
431  pcancel->cancel_wait = std::function<void()>();
432  pcancel->wait_back_pointer = nullptr;
433  }
434  };
435  };
436 
437  class metrics {
438  struct domain;
439 
440  using domain_member_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
441 
442  const client& _c;
443  domain& _domain;
444  domain_member_hook_t _link;
445 
446  using domain_list_t = boost::intrusive::list<metrics,
447  boost::intrusive::member_hook<metrics, metrics::domain_member_hook_t, &metrics::_link>,
448  boost::intrusive::constant_time_size<false>>;
449 
450  public:
451  metrics(const client&);
452  ~metrics();
453  };
454 
455  void enqueue_zero_frame();
456 public:
457  template<typename Reply, typename Func>
458  struct reply_handler final : reply_handler_base {
459  Func func;
460  Reply reply;
461  reply_handler(Func&& f) : func(std::move(f)) {}
462  virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
463  return func(reply, client, msg_id, std::move(data));
464  }
465  virtual void timeout() override {
466  reply.done = true;
467  reply.p.set_exception(timeout_error());
468  }
469  virtual void cancel() override {
470  reply.done = true;
471  reply.p.set_exception(canceled_error());
472  }
473  virtual ~reply_handler() {}
474  };
475 private:
476  std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
477  socket_address _server_addr, _local_addr;
478  client_options _options;
479  weak_ptr<client> _parent; // for stream clients
480 
481  metrics _metrics;
482 
483 private:
484  future<> negotiate_protocol(feature_map map);
485  void negotiate(feature_map server_features);
487  read_response_frame_compressed(input_stream<char>& in);
488 public:
497  client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
498  client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
499 
510  client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
511  client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
512 
513  stats get_stats() const;
514  size_t incoming_queue_length() const noexcept {
515  return _outstanding.size();
516  }
517 
518  auto next_message_id() { return _message_id++; }
519  void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
520  void wait_timed_out(id_type id);
521  future<> stop() noexcept;
522  void abort_all_streams();
523  void deregister_this_stream();
524  socket_address peer_address() const override {
525  return _server_addr;
526  }
527  future<> await_connection() {
528  if (!_negotiated) {
529  return make_ready_future<>();
530  } else {
531  return _negotiated->get_shared_future();
532  }
533  }
534  template<typename Serializer, typename... Out>
535  future<sink<Out...>> make_stream_sink(socket socket) {
536  return await_connection().then([this, socket = std::move(socket)] () mutable {
537  if (!this->get_connection_id()) {
538  return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
539  }
540  client_options o = _options;
541  o.stream_parent = this->get_connection_id();
542  o.send_timeout_data = false;
543  o.metrics_domain += "_stream";
544  auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
545  c->_parent = this->weak_from_this();
546  c->_is_stream = true;
547  return c->await_connection().then([c, this] {
548  if (_error) {
549  throw closed_error();
550  }
551  xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
552  this->register_stream(c->get_connection_id(), s);
553  return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
554  }).handle_exception([c] (std::exception_ptr eptr) {
555  // If await_connection fails we need to stop the client
556  // before destroying it.
557  return c->stop().then([eptr, c] {
558  return make_exception_future<sink<Out...>>(eptr);
559  });
560  });
561  });
562  }
563  template<typename Serializer, typename... Out>
564  future<sink<Out...>> make_stream_sink() {
565  return make_stream_sink<Serializer, Out...>(make_socket());
566  }
567 
568  future<> request(uint64_t type, int64_t id, snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
569 };
570 
571 class protocol_base;
572 
573 class server {
574 private:
575  static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
576 
577 public:
578  class connection : public rpc::connection, public enable_shared_from_this<connection> {
579  client_info _info;
580  connection_id _parent_id = invalid_connection_id;
581  std::optional<isolation_config> _isolation_config;
582  private:
583  future<> negotiate_protocol();
584  future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
585  read_request_frame_compressed(input_stream<char>& in);
586  future<feature_map> negotiate(feature_map requested);
587  future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
588  public:
589  connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
590  future<> process();
591  future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
592  client_info& info() { return _info; }
593  const client_info& info() const { return _info; }
594  stats get_stats() const {
595  stats res = _stats;
596  res.pending = outgoing_queue_length();
597  return res;
598  }
599  socket_address peer_address() const override {
600  return _info.addr;
601  }
602  // Resources will be released when this goes out of scope
603  future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
604  if (timeout) {
605  return get_units(get_server()._resources_available, memory_consumed, *timeout);
606  } else {
607  return get_units(get_server()._resources_available, memory_consumed);
608  }
609  }
610  size_t estimate_request_size(size_t serialized_size) {
611  return rpc::estimate_request_size(get_server()._limits, serialized_size);
612  }
613  size_t max_request_size() const {
614  return get_server()._limits.max_memory;
615  }
616  server& get_server() {
617  return _info.server;
618  }
619  const server& get_server() const {
620  return _info.server;
621  }
622  future<> deregister_this_stream();
623  future<> abort_all_streams();
624  };
625 private:
626  protocol_base& _proto;
627  server_socket _ss;
628  resource_limits _limits;
629  rpc_semaphore _resources_available;
630  std::unordered_map<connection_id, shared_ptr<connection>> _conns;
631  promise<> _ss_stopped;
632  gate _reply_gate;
633  server_options _options;
634  bool _shutdown = false;
635  uint64_t _next_client_id = 1;
636 
637 public:
638  server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
639  server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
642  void accept();
662  template<typename Func>
663  void foreach_connection(Func&& f) {
664  for (auto c : _conns) {
665  f(*c.second);
666  }
667  }
676  gate& reply_gate() {
677  return _reply_gate;
678  }
679  friend connection;
680  friend client;
681 };
682 
683 using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
684  rcv_buf data)>;
685 
686 struct rpc_handler {
687  scheduling_group sg;
688  rpc_handler_func func;
689  gate use_gate;
690 };
691 
693 public:
694  virtual ~protocol_base() {};
695  virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
696 protected:
697  friend class server;
698 
699  virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
700  virtual void put_handler(rpc_handler*) = 0;
701 };
702 
705 
795 template<typename Serializer, typename MsgType = uint32_t>
796 class protocol final : public protocol_base {
797 public:
799  class server : public rpc::server {
800  public:
801  server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
802  rpc::server(&proto, addr, memory_limit) {}
803  server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
804  rpc::server(&proto, opts, addr, memory_limit) {}
806  rpc::server(&proto, std::move(socket), memory_limit) {}
808  rpc::server(&proto, opts, std::move(socket), memory_limit) {}
809  };
811  class client : public rpc::client {
812  public:
813  /*
814  * Create client object which will attempt to connect to the remote address.
815  *
816  * @param addr the remote address identifying this client
817  * @param local the local address of this client
818  */
819  client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
820  rpc::client(p.get_logger(), &p._serializer, addr, local) {}
821  client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
822  rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
823 
832  client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
833  rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
834  client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
835  rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
836  };
837 
838  friend server;
839 private:
840  std::unordered_map<MsgType, rpc_handler> _handlers;
841  Serializer _serializer;
842  logger _logger;
843 
844 public:
845  protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
846 
856  template<typename Func>
857  auto make_client(MsgType t);
858 
871  template<typename Func>
872  auto register_handler(MsgType t, Func&& func);
873 
891  template <typename Func>
892  auto register_handler(MsgType t, scheduling_group sg, Func&& func);
893 
904  future<> unregister_handler(MsgType t);
905 
910  [[deprecated("Use set_logger(::seastar::logger*) instead")]]
911  void set_logger(std::function<void(const sstring&)> logger) {
912  _logger.set(std::move(logger));
913  }
914 
917  _logger.set(logger);
918  }
919 
920  const logger& get_logger() const {
921  return _logger;
922  }
923 
924  shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
925  return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
926  }
927 
928  bool has_handler(MsgType msg_id);
929 
934  bool has_handlers() const noexcept {
935  return !_handlers.empty();
936  }
937 
938 private:
939  rpc_handler* get_handler(uint64_t msg_id) override;
940  void put_handler(rpc_handler*) override;
941 
942  template<typename Ret, typename... In>
943  auto make_client(signature<Ret(In...)> sig, MsgType t);
944 
945  void register_receiver(MsgType t, rpc_handler&& handler) {
946  auto r = _handlers.emplace(t, std::move(handler));
947  if (!r.second) {
948  throw_with_backtrace<std::runtime_error>("registered handler already exists");
949  }
950  }
951 };
952 
954 
955 }
956 
957 }
958 
959 #include "rpc_impl.hh"
static constexpr size_t max_counter() noexcept
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:231
Definition: api.hh:182
Definition: shared_ptr.hh:501
value_type && get()
gets the value returned by the computation
Definition: future.hh:1340
Definition: gate.hh:61
Logger class for ostream or syslog.
Definition: log.hh:90
void info(format_info fmt, Args &&... args) noexcept
Definition: log.hh:355
Definition: shared_ptr.hh:270
Definition: queue.hh:44
Definition: rpc_types.hh:164
Definition: rpc.hh:420
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
client(const logger &l, void *s, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc_types.hh:288
Definition: rpc_types.hh:65
Definition: rpc.hh:237
Definition: rpc.hh:191
Represents a client side connection.
Definition: rpc.hh:811
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:832
Represents the listening port and all accepted connections.
Definition: rpc.hh:799
Definition: rpc.hh:692
Definition: rpc.hh:796
void set_logger(std::function< void(const sstring &)> logger)
Definition: rpc.hh:911
bool has_handlers() const noexcept
Definition: rpc.hh:934
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:916
Definition: rpc.hh:578
Definition: rpc.hh:573
void abort_connection(connection_id id)
Definition: rpc_types.hh:311
Definition: rpc.hh:395
Definition: rpc_types.hh:309
Definition: rpc_types.hh:350
Definition: rpc.hh:414
Definition: rpc_types.hh:348
Definition: rpc_types.hh:143
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:325
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:316
Definition: shared_ptr.hh:515
Definition: socket_defs.hh:47
Definition: api.hh:282
Definition: timer.hh:84
bool cancel() noexcept
Definition: weak_ptr.hh:46
Definition: weak_ptr.hh:120
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:804
future< T > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:1934
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1940
socket make_socket()
scheduling_group sched_group
Definition: rpc.hh:78
size_t max_memory
Definition: rpc.hh:100
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:99
std::function< isolation_config(sstring isolation_cookie)> syncronous_isolation_function
Definition: rpc.hh:103
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:98
sstring isolation_cookie
Definition: rpc.hh:119
isolation_config default_isolate_connection(sstring isolation_cookie)
Definition: rpc.hh:109
Specifies resource isolation for a connection.
Definition: rpc.hh:75
Resource limits for an RPC server.
Definition: rpc.hh:97
Definition: rpc.hh:151
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:400
sstring format(const char *fmt, A &&... a)
Definition: print.hh:142
log_level
log level used with
Definition: log.hh:52
Definition: rpc_types.hh:202
Definition: rpc_types.hh:96
Definition: rpc.hh:251
Definition: rpc.hh:388
Definition: rpc.hh:171
Definition: rpc_types.hh:238
Definition: rpc.hh:686
Definition: rpc.hh:189
Definition: rpc_types.hh:250
Definition: rpc_types.hh:55