Seastar
High performance C++ framework for concurrent servers
rpc.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2015 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <list>
27 #include <seastar/core/future.hh>
28 #include <seastar/core/seastar.hh>
29 #include <seastar/net/api.hh>
30 #include <seastar/core/iostream.hh>
31 #include <seastar/core/shared_ptr.hh>
32 #include <seastar/core/condition-variable.hh>
33 #include <seastar/core/gate.hh>
34 #include <seastar/rpc/rpc_types.hh>
35 #include <seastar/core/byteorder.hh>
36 #include <seastar/core/shared_future.hh>
37 #include <seastar/core/queue.hh>
38 #include <seastar/core/weak_ptr.hh>
40 #include <seastar/util/backtrace.hh>
41 #include <seastar/util/log.hh>
42 
43 namespace seastar {
44 
45 namespace rpc {
46 
59 
// Signed 64-bit identifier type; used in this header for RPC message ids.
60 using id_type = int64_t;
61 
// Semaphore specialised on rpc_clock_type, so waits can be bounded by RPC time points.
62 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
// Units type matching rpc_semaphore's template parameters.
63 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
64 
// Magic string; `sizeof(rpc_magic) - 1` (i.e. without the terminating NUL) sizes the
// `magic` array of the negotiation frame declared further down in this header.
65 static constexpr char rpc_magic[] = "SSTARRPC";
66 
69 
75 };
76 
80 isolation_config default_isolate_connection(sstring isolation_cookie);
81 
94  size_t basic_request_size = 0;
95  unsigned bloat_factor = 1;
97  std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
100 };
101 
103  std::optional<net::tcp_keepalive_params> keepalive;
104  bool tcp_nodelay = true;
105  bool reuseaddr = false;
106  compressor::factory* compressor_factory = nullptr;
107  bool send_timeout_data = true;
108  connection_id stream_parent = invalid_connection_id;
113 };
114 
116 
117 // An RPC call that passes a stream connection id as a parameter
118 // may arrive at a different shard from the one where the stream connection
119 // was opened, so the connection id is not known to the server that handles
120 // the RPC call. The shard that the stream connection belongs to is known
121 // since it is part of the connection id, but this is not enough to locate
122 // the server instance the connection belongs to if there is more than one
123 // server on the shard. The streaming domain parameter is here to help with that.
124 // Different servers on all shards logically belonging to the same service should
125 // belong to the same streaming domain. Only one server on each shard can belong to
126 // a particular streaming domain.
128  uint64_t _id;
129 public:
130  explicit streaming_domain_type(uint64_t id) : _id(id) {}
131  bool operator==(const streaming_domain_type& o) const {
132  return _id == o._id;
133  }
134  friend struct std::hash<streaming_domain_type>;
135  friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
136 };
137 
140 
142  compressor::factory* compressor_factory = nullptr;
143  bool tcp_nodelay = true;
144  std::optional<streaming_domain_type> streaming_domain;
145  server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
146 };
147 
149 
150 inline
151 size_t
152 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
153  return lim.basic_request_size + serialized_size * lim.bloat_factor;
154 }
155 
157  char magic[sizeof(rpc_magic) - 1];
158  uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
159 };
160 
// Identifiers of the optional protocol features; these are the keys of feature_map,
// which is what the negotiation functions in this header exchange.
// The values are spelled out explicitly.
// NOTE(review): presumably they are part of the wire format — confirm before renumbering.
161 enum class protocol_features : uint32_t {
162  COMPRESS = 0,
163  TIMEOUT = 1,
164  CONNECTION_ID = 2,
165  STREAM_PARENT = 3,
166  ISOLATION = 4,
167 };
168 
169 // Internal representation of feature data: an ordered map from a feature
// identifier to the string payload associated with that feature.
170 using feature_map = std::map<protocol_features, sstring>;
171 
172 // An rpc signature, in the form signature<Ret (In0, In1, In2)>.
// Only the primary template is declared here; no definition is visible in this header.
173 template <typename Function>
174 struct signature;
175 
176 class logger {
177  std::function<void(const sstring&)> _logger;
178  ::seastar::logger* _seastar_logger = nullptr;
179 
180  // _seastar_logger will always be used first if it's available
181  void log(const sstring& str) const {
182  if (_seastar_logger) {
183  // default level for log messages is `info`
184  _seastar_logger->info("{}", str);
185  } else if (_logger) {
186  _logger(str);
187  }
188  }
189 
190  // _seastar_logger will always be used first if it's available
191  template <typename... Args>
192  void log(log_level level, const char* fmt, Args&&... args) const {
193  if (_seastar_logger) {
194  _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
195  // If the log level is at least `info`, fall back to legacy logging without explicit level.
196  // Ignore less severe levels in order not to spam user's log with messages during transition,
197  // i.e. when the user still only defines a level-less logger.
198  } else if (_logger && level <= log_level::info) {
199  _logger(format(fmt, std::forward<Args>(args)...));
200  }
201  }
202 
203 public:
204  void set(std::function<void(const sstring&)> l) {
205  _logger = std::move(l);
206  }
207 
208  void set(::seastar::logger* logger) {
209  _seastar_logger = logger;
210  }
211 
212  void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
213  void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
214 
215  void operator()(const client_info& info, const sstring& str) const;
216  void operator()(const client_info& info, log_level level, std::string_view str) const;
217 
218  void operator()(const socket_address& addr, const sstring& str) const;
219  void operator()(const socket_address& addr, log_level level, std::string_view str) const;
220 };
221 
222 class connection {
223 protected:
224  connected_socket _fd;
225  input_stream<char> _read_buf;
226  output_stream<char> _write_buf;
227  bool _error = false;
228  bool _connected = false;
229  promise<> _stopped;
230  stats _stats;
231  const logger& _logger;
232  // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
233  // The type of the pointer is erased here, but the original type is Serializer
234  void* _serializer;
235  struct outgoing_entry {
237  snd_buf buf;
238  std::optional<promise<>> p = promise<>();
239  cancellable* pcancel = nullptr;
240  outgoing_entry(snd_buf b) : buf(std::move(b)) {}
241  outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
242  o.p = std::nullopt;
243  }
244  ~outgoing_entry() {
245  if (p) {
246  if (pcancel) {
247  pcancel->cancel_send = std::function<void()>();
248  pcancel->send_back_pointer = nullptr;
249  }
250  p->set_value();
251  }
252  }
253  };
254  friend outgoing_entry;
255  std::list<outgoing_entry> _outgoing_queue;
256  condition_variable _outgoing_queue_cond;
257  future<> _send_loop_stopped = make_ready_future<>();
258  std::unique_ptr<compressor> _compressor;
259  bool _timeout_negotiated = false;
260  // stream related fields
261  bool _is_stream = false;
262  connection_id _id = invalid_connection_id;
263 
264  std::unordered_map<connection_id, xshard_connection_ptr> _streams;
265  queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
266  semaphore _stream_sem = semaphore(max_stream_buffers_memory);
267  bool _sink_closed = true;
268  bool _source_closed = true;
269  // the future resolves to whether the sink is already closed;
270  // while it is not ready, the sink is in the process of being closed
271  future<bool> _sink_closed_future = make_ready_future<bool>(false);
272 
273  bool is_stream() {
274  return _is_stream;
275  }
276 
277  snd_buf compress(snd_buf buf);
278  future<> send_buffer(snd_buf buf);
279 
280  enum class outgoing_queue_type {
281  request,
282  response,
283  stream = response
284  };
285 
286  template<outgoing_queue_type QueueType> void send_loop();
287  future<> stop_send_loop();
288  future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
289  bool stream_check_twoway_closed() {
290  return _sink_closed && _source_closed;
291  }
292  future<> stream_close();
293  future<> stream_process_incoming(rcv_buf&&);
294  future<> handle_stream_frame();
295 
296 public:
297  connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
298  set_socket(std::move(fd));
299  }
300  connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
301  virtual ~connection() {}
302  void set_socket(connected_socket&& fd);
303  future<> send_negotiation_frame(feature_map features);
304  // functions below are public because they are used by external heavily templated functions
305  // and I am not smart enough to know how to define them as friends
306  future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
307  bool error() { return _error; }
308  void abort();
309  future<> stop();
310  future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
311  future<> close_sink() {
312  _sink_closed = true;
313  if (stream_check_twoway_closed()) {
314  return stream_close();
315  }
316  return make_ready_future();
317  }
318  bool sink_closed() {
319  return _sink_closed;
320  }
321  future<> close_source() {
322  _source_closed = true;
323  if (stream_check_twoway_closed()) {
324  return stream_close();
325  }
326  return make_ready_future();
327  }
328  connection_id get_connection_id() const {
329  return _id;
330  }
331  xshard_connection_ptr get_stream(connection_id id) const;
332  void register_stream(connection_id id, xshard_connection_ptr c);
333  virtual socket_address peer_address() const = 0;
334 
335  const logger& get_logger() const {
336  return _logger;
337  }
338 
339  template<typename Serializer>
340  Serializer& serializer() {
341  return *static_cast<Serializer*>(_serializer);
342  }
343 
344  template <typename FrameType>
345  typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
346 
347  template <typename FrameType>
348  typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
349  friend class client;
350  template<typename Serializer, typename... Out>
351  friend class sink_impl;
352  template<typename Serializer, typename... In>
353  friend class source_impl;
354 };
355 
356 // send data Out...
357 template<typename Serializer, typename... Out>
358 class sink_impl : public sink<Out...>::impl {
359 public:
360  sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
361  future<> operator()(const Out&... args) override;
362  future<> close() override;
363  future<> flush() override;
364  ~sink_impl() override;
365 };
366 
367 // receive data In...
368 template<typename Serializer, typename... In>
369 class source_impl : public source<In...>::impl {
370 public:
371  source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
372  future<std::optional<std::tuple<In...>>> operator()() override;
373 };
374 
375 class client : public rpc::connection, public weakly_referencable<client> {
376  socket _socket;
377  id_type _message_id = 1;
378  struct reply_handler_base {
380  cancellable* pcancel = nullptr;
381  virtual void operator()(client&, id_type, rcv_buf data) = 0;
382  virtual void timeout() {}
383  virtual void cancel() {}
384  virtual ~reply_handler_base() {
385  if (pcancel) {
386  pcancel->cancel_wait = std::function<void()>();
387  pcancel->wait_back_pointer = nullptr;
388  }
389  };
390  };
391 public:
392  template<typename Reply, typename Func>
393  struct reply_handler final : reply_handler_base {
394  Func func;
395  Reply reply;
396  reply_handler(Func&& f) : func(std::move(f)) {}
397  virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
398  return func(reply, client, msg_id, std::move(data));
399  }
400  virtual void timeout() override {
401  reply.done = true;
402  reply.p.set_exception(timeout_error());
403  }
404  virtual void cancel() override {
405  reply.done = true;
406  reply.p.set_exception(canceled_error());
407  }
408  virtual ~reply_handler() {}
409  };
410 private:
411  std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
412  socket_address _server_addr;
413  client_options _options;
414  std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
415  weak_ptr<client> _parent; // for stream clients
416 
417 private:
418  future<> negotiate_protocol(input_stream<char>& in);
419  void negotiate(feature_map server_features);
421  read_response_frame(input_stream<char>& in);
423  read_response_frame_compressed(input_stream<char>& in);
424  void send_loop() {
425  if (is_stream()) {
426  rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
427  } else {
428  rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
429  }
430  }
431 public:
440  client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
441  client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
442 
453  client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
454  client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
455 
456  stats get_stats() const;
457  stats& get_stats_internal() {
458  return _stats;
459  }
460  auto next_message_id() { return _message_id++; }
461  void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
462  void wait_timed_out(id_type id);
463  future<> stop();
464  void abort_all_streams();
465  void deregister_this_stream();
466  socket_address peer_address() const override {
467  return _server_addr;
468  }
469  future<> await_connection() {
470  if (!_client_negotiated) {
471  return make_ready_future<>();
472  } else {
473  return _client_negotiated->get_shared_future();
474  }
475  }
476  template<typename Serializer, typename... Out>
477  future<sink<Out...>> make_stream_sink(socket socket) {
478  return await_connection().then([this, socket = std::move(socket)] () mutable {
479  if (!this->get_connection_id()) {
480  return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
481  }
482  client_options o = _options;
483  o.stream_parent = this->get_connection_id();
484  o.send_timeout_data = false;
485  auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
486  c->_parent = this->weak_from_this();
487  c->_is_stream = true;
488  return c->await_connection().then([c, this] {
489  xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
490  this->register_stream(c->get_connection_id(), s);
491  return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
492  });
493  });
494  }
495  template<typename Serializer, typename... Out>
496  future<sink<Out...>> make_stream_sink() {
497  return make_stream_sink<Serializer, Out...>(make_socket());
498  }
499 };
500 
501 class protocol_base;
502 
503 class server {
504 private:
505  static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
506 
507 public:
508  class connection : public rpc::connection, public enable_shared_from_this<connection> {
509  server& _server;
510  client_info _info;
511  connection_id _parent_id = invalid_connection_id;
512  std::optional<isolation_config> _isolation_config;
513  private:
514  future<> negotiate_protocol(input_stream<char>& in);
515  future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
516  read_request_frame_compressed(input_stream<char>& in);
517  future<feature_map> negotiate(feature_map requested);
518  void send_loop() {
519  if (is_stream()) {
520  rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
521  } else {
522  rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
523  }
524  }
525  future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
526  public:
527  connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
528  future<> process();
529  future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
530  client_info& info() { return _info; }
531  const client_info& info() const { return _info; }
532  stats get_stats() const {
533  stats res = _stats;
534  res.pending = _outgoing_queue.size();
535  return res;
536  }
537 
538  stats& get_stats_internal() {
539  return _stats;
540  }
541  socket_address peer_address() const override {
542  return _info.addr;
543  }
544  // Resources will be released when this goes out of scope
545  future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
546  if (timeout) {
547  return get_units(_server._resources_available, memory_consumed, *timeout);
548  } else {
549  return get_units(_server._resources_available, memory_consumed);
550  }
551  }
552  size_t estimate_request_size(size_t serialized_size) {
553  return rpc::estimate_request_size(_server._limits, serialized_size);
554  }
555  size_t max_request_size() const {
556  return _server._limits.max_memory;
557  }
558  server& get_server() {
559  return _server;
560  }
561  future<> deregister_this_stream();
562  };
563 private:
564  protocol_base* _proto;
565  server_socket _ss;
566  resource_limits _limits;
567  rpc_semaphore _resources_available;
568  std::unordered_map<connection_id, shared_ptr<connection>> _conns;
569  promise<> _ss_stopped;
570  gate _reply_gate;
571  server_options _options;
572  uint64_t _next_client_id = 1;
573 
574 public:
575  server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
576  server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
579  void accept();
580  future<> stop();
581  template<typename Func>
582  void foreach_connection(Func&& f) {
583  for (auto c : _conns) {
584  f(*c.second);
585  }
586  }
587  gate& reply_gate() {
588  return _reply_gate;
589  }
590  friend connection;
591  friend client;
592 };
593 
// Type-erased handler callable: takes a server connection, an optional timeout,
// a message id and the received buffer, and returns a future<>.
594 using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
595  rcv_buf data)>;
596 
// A handler callable bundled with a scheduling group and a gate.
// NOTE(review): presumably `sg` is the group the handler runs in and `use_gate`
// tracks in-flight uses of the handler — confirm against rpc_impl.hh.
597 struct rpc_handler {
598  scheduling_group sg;
599  rpc_handler_func func;
600  gate use_gate;
601 };
602 
604 public:
605  virtual ~protocol_base() {};
606  virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
607 protected:
608  friend class server;
609 
610  virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
611  virtual void put_handler(rpc_handler*) = 0;
612 };
613 
616 
706 template<typename Serializer, typename MsgType = uint32_t>
707 class protocol : public protocol_base {
708 public:
710  class server : public rpc::server {
711  public:
712  server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
713  rpc::server(&proto, addr, memory_limit) {}
714  server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
715  rpc::server(&proto, opts, addr, memory_limit) {}
717  rpc::server(&proto, std::move(socket), memory_limit) {}
719  rpc::server(&proto, opts, std::move(socket), memory_limit) {}
720  };
722  class client : public rpc::client {
723  public:
724  /*
725  * Create client object which will attempt to connect to the remote address.
726  *
727  * @param addr the remote address identifying this client
728  * @param local the local address of this client
729  */
730  client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
731  rpc::client(p.get_logger(), &p._serializer, addr, local) {}
732  client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
733  rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
734 
743  client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
744  rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
745  client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
746  rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
747  };
748 
749  friend server;
750 private:
751  std::unordered_map<MsgType, rpc_handler> _handlers;
752  Serializer _serializer;
753  logger _logger;
754 
755 public:
756  protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
757 
767  template<typename Func>
768  auto make_client(MsgType t);
769 
782  template<typename Func>
783  auto register_handler(MsgType t, Func&& func);
784 
802  template <typename Func>
803  auto register_handler(MsgType t, scheduling_group sg, Func&& func);
804 
815  future<> unregister_handler(MsgType t);
816 
821  [[deprecated("Use set_logger(::seastar::logger*) instead")]]
822  void set_logger(std::function<void(const sstring&)> logger) {
823  _logger.set(std::move(logger));
824  }
825 
828  _logger.set(logger);
829  }
830 
831  const logger& get_logger() const {
832  return _logger;
833  }
834 
835  shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
836  return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
837  }
838 
839  bool has_handler(MsgType msg_id);
840 
845  bool has_handlers() const noexcept {
846  return !_handlers.empty();
847  }
848 
849 private:
850  rpc_handler* get_handler(uint64_t msg_id) override;
851  void put_handler(rpc_handler*) override;
852 
853  template<typename Ret, typename... In>
854  auto make_client(signature<Ret(In...)> sig, MsgType t);
855 
856  void register_receiver(MsgType t, rpc_handler&& handler) {
857  auto r = _handlers.emplace(t, std::move(handler));
858  if (!r.second) {
859  throw_with_backtrace<std::runtime_error>("registered handler already exists");
860  }
861  }
862 };
863 
865 
866 }
867 
868 }
869 
870 #include "rpc_impl.hh"
seastar::rpc::resource_limits
Resource limits for an RPC server.
Definition: rpc.hh:93
seastar::shared_ptr
Definition: shared_ptr.hh:65
seastar::log_level
log_level
log level used with
Definition: log.hh:45
seastar::rpc::stats
Definition: rpc_types.hh:48
seastar::rpc::protocol::has_handlers
bool has_handlers() const noexcept
Definition: rpc.hh:845
seastar::make_socket
socket make_socket()
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::rpc::source::impl
Definition: rpc_types.hh:314
seastar::weak_ptr
Definition: weak_ptr.hh:41
seastar::lw_shared_ptr
Definition: shared_ptr.hh:62
seastar::promise
promise - allows a future value to be made available at a later time.
Definition: future.hh:151
seastar::rpc::server
Definition: rpc.hh:503
seastar::rpc::protocol::register_handler
auto register_handler(MsgType t, Func &&func)
Definition: rpc_impl.hh:681
seastar::rpc::server_options
Definition: rpc.hh:141
seastar::rpc::connection_id
Definition: rpc_types.hh:241
seastar::weakly_referencable
Definition: weak_ptr.hh:95
seastar::rpc::source
Definition: rpc_types.hh:312
seastar::make_ready_future
future< T... > make_ready_future(A &&... value) noexcept
Creates a future in an available, value state.
Definition: future.hh:2048
seastar::rpc::resource_limits::max_memory
size_t max_memory
Definition: rpc.hh:96
seastar::rpc::resource_limits::isolate_connection
std::function< isolation_config(sstring isolation_cookie)> isolate_connection
Definition: rpc.hh:99
seastar::basic_semaphore< semaphore_default_exception_factory, rpc_clock_type >::max_counter
static constexpr size_t max_counter()
Returns the maximum number of units the semaphore counter can hold.
Definition: semaphore.hh:150
seastar::rpc::canceled_error
Definition: rpc_types.hh:109
seastar::rpc::protocol::server
Represents the listening port and all accepted connections.
Definition: rpc.hh:710
seastar::rpc::streaming_domain_type
Definition: rpc.hh:127
seastar::socket
Definition: api.hh:237
seastar::rpc::client_options
Definition: rpc.hh:102
seastar::rpc::protocol::client
Represents a client side connection.
Definition: rpc.hh:722
seastar::rpc::protocol
Definition: rpc.hh:707
seastar::gate
Definition: gate.hh:47
seastar::rpc::sink::impl
Definition: rpc_types.hh:275
seastar::logger
Logger class for ostream or syslog.
Definition: log.hh:80
seastar::input_stream< char >
seastar::rpc::client::client
client(const logger &l, void *s, const socket_address &addr, const socket_address &local={})
seastar::rpc::rpc_handler
Definition: rpc.hh:597
seastar::make_exception_future
future< T... > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:2054
seastar::connected_socket
Definition: api.hh:164
seastar::format
sstring format(const char *fmt, A &&... a)
Definition: print.hh:134
seastar::memory::stats
statistics stats()
Capture a snapshot of memory allocation statistics for this lcore.
seastar::rpc::negotiation_frame
Definition: rpc.hh:156
seastar::logger::log
void log(log_level level, const char *fmt, Args &&... args) noexcept
Definition: log.hh:131
seastar::rpc::rcv_buf
Definition: rpc_types.hh:179
seastar::rpc::snd_buf
Definition: rpc_types.hh:191
seastar::semaphore
basic_semaphore< semaphore_default_exception_factory > semaphore
Definition: semaphore.hh:561
seastar::rpc::timeout_error
Definition: rpc_types.hh:88
seastar::rpc::cancellable
Definition: rpc_types.hh:143
seastar::rpc::protocol::set_logger
void set_logger(::seastar::logger *logger)
Set a logger to be used to log messages.
Definition: rpc.hh:827
seastar::current_scheduling_group
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:339
seastar::rpc::sink_impl
Definition: rpc.hh:358
seastar::logger::info
void info(const char *fmt, Args &&... args) noexcept
Definition: log.hh:191
seastar::rpc::signature
Definition: rpc.hh:174
seastar::server_socket
A listening socket, waiting to accept incoming network connections.
Definition: api.hh:280
seastar::rpc::sink
Definition: rpc_types.hh:273
seastar::queue
Definition: queue.hh:35
seastar::rpc::client_options::isolation_cookie
sstring isolation_cookie
Definition: rpc.hh:112
scheduling.hh
seastar::rpc::isolation_config::sched_group
scheduling_group sched_group
Definition: rpc.hh:74
seastar::rpc::isolation_config
Specifies resource isolation for a connection.
Definition: rpc.hh:71
seastar::rpc::logger
Definition: rpc.hh:176
seastar::rpc::server::connection
Definition: rpc.hh:508
seastar::timer
Definition: timer.hh:78
seastar::rpc::client::reply_handler
Definition: rpc.hh:393
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:154
seastar::rpc::client
Definition: rpc.hh:375
seastar::rpc::default_isolate_connection
isolation_config default_isolate_connection(sstring isolation_cookie)
seastar::rpc::source_impl
Definition: rpc.hh:369
seastar::rpc::protocol::make_client
auto make_client(MsgType t)
Definition: rpc_impl.hh:662
seastar::future::then
Result then(Func &&func) noexcept
Schedule a block of code to run when the future is ready.
Definition: future.hh:1513
seastar::rpc::compressor::factory
Definition: rpc_types.hh:229
seastar::enable_shared_from_this
Definition: shared_ptr.hh:71
seastar::rpc::protocol::client::client
client(protocol &p, socket socket, const socket_address &addr, const socket_address &local={})
Definition: rpc.hh:743
seastar::rpc::connection
Definition: rpc.hh:222
seastar::rpc::protocol::set_logger
void set_logger(std::function< void(const sstring &)> logger)
Definition: rpc.hh:822
seastar::shared_promise
Like promise except that its counterpart is shared_future instead of future.
Definition: shared_future.hh:233
seastar::rpc::connection::outgoing_entry
Definition: rpc.hh:235
seastar::rpc::resource_limits::basic_request_size
size_t basic_request_size
Minimum request footprint in memory.
Definition: rpc.hh:94
seastar::stream
Definition: stream.hh:50
seastar::basic_semaphore< semaphore_default_exception_factory >
seastar::rpc::resource_limits::bloat_factor
unsigned bloat_factor
Serialized size multiplied by this to estimate memory used by request.
Definition: rpc.hh:95
seastar::output_stream< char >
seastar::socket_address
Definition: socket_defs.hh:41
seastar::rpc::protocol_base
Definition: rpc.hh:603
seastar::condition_variable
Conditional variable.
Definition: condition-variable.hh:65
seastar::rpc::client_info
Definition: rpc_types.hh:59
seastar::rpc::protocol::unregister_handler
future unregister_handler(MsgType t)
Definition: rpc_impl.hh:686
seastar::scheduling_group
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:251