Seastar
High performance C++ framework for concurrent servers
alien.hh
Go to the documentation of this file.
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership. You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2018 Red Hat
21  */
22 
23 #pragma once
24 
25 #include <atomic>
26 #include <deque>
27 #include <future>
28 #include <memory>
29 
30 #include <boost/lockfree/queue.hpp>
31 
32 #include <seastar/core/future.hh>
33 #include <seastar/core/cacheline.hh>
34 #include <seastar/core/sstring.hh>
36 
38 
39 namespace seastar {
40 
41 class reactor;
42 
44 namespace alien {
45 
47  static constexpr size_t batch_size = 128;
48  static constexpr size_t prefetch_cnt = 2;
49  struct work_item;
50  struct lf_queue_remote {
51  reactor* remote;
52  };
53  using lf_queue_base = boost::lockfree::queue<work_item*>;
54  // use inheritence to control placement order
55  struct lf_queue : lf_queue_remote, lf_queue_base {
56  lf_queue(reactor* remote)
57  : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
58  void maybe_wakeup();
59  } _pending;
60  struct alignas(seastar::cache_line_size) {
61  std::atomic<size_t> value{0};
62  } _sent;
63  // keep this between two structures with statistics
64  // this makes sure that they have at least one cache line
65  // between them, so hw prefetcher will not accidentally prefetch
66  // cache line used by another cpu.
67  metrics::metric_groups _metrics;
68  struct alignas(seastar::cache_line_size) {
69  size_t _received = 0;
70  size_t _last_rcv_batch = 0;
71  };
72  struct work_item {
73  virtual ~work_item() = default;
74  virtual void process() = 0;
75  };
76  template <typename Func>
77  struct async_work_item : work_item {
78  Func _func;
79  async_work_item(Func&& func) : _func(std::move(func)) {}
80  void process() override {
81  _func();
82  }
83  };
84  template<typename Func>
85  size_t process_queue(lf_queue& q, Func process);
86  void submit_item(std::unique_ptr<work_item> wi);
87 public:
89  void start();
90  void stop();
91  template <typename Func>
92  void submit(Func&& func) {
93  auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
94  submit_item(std::move(wi));
95  }
96  size_t process_incoming();
97  bool pure_poll_rx() const;
98 };
99 
100 class smp {
101  struct qs_deleter {
102  unsigned count;
103  qs_deleter(unsigned n = 0) : count(n) {}
104  qs_deleter(const qs_deleter& d) : count(d.count) {}
105  void operator()(message_queue* qs) const;
106  };
107  using qs = std::unique_ptr<message_queue[], qs_deleter>;
108 public:
109  static qs create_qs(const std::vector<reactor*>& reactors);
110  static qs _qs;
111  static bool poll_queues();
112  static bool pure_poll_queues();
113 };
114 
126 template <typename Func>
127 void run_on(unsigned shard, Func func) {
128  smp::_qs[shard].submit(std::move(func));
129 }
130 
131 namespace internal {
132 template<typename Func>
133 using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
134 
135 template<typename Func,
136  bool = std::is_empty_v<return_value_t<Func>>>
138  using type = void;
139  static void set(std::promise<void>& p, return_value_t<Func>&&) {
140  p.set_value();
141  }
142 };
143 template<typename Func>
144 struct return_type_of<Func, false> {
145  using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
146  using type = std::tuple_element_t<0, return_tuple_t>;
147  static void set(std::promise<type>& p, return_value_t<Func>&& t) {
148 #if SEASTAR_API_LEVEL < 5
149  p.set_value(std::get<0>(std::move(t)));
150 #else
151  p.set_value(std::move(t));
152 #endif
153  }
154 };
155 template <typename Func> using return_type_t = typename return_type_of<Func>::type;
156 }
157 
166 template<typename Func, typename T = internal::return_type_t<Func>>
167 std::future<T> submit_to(unsigned shard, Func func) {
168  std::promise<T> pr;
169  auto fut = pr.get_future();
170  run_on(shard, [pr = std::move(pr), func = std::move(func)] () mutable {
171  // std::future returned via std::promise above.
172  (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
173  try {
174  internal::return_type_of<Func>::set(pr, result.get());
175  } catch (...) {
176  pr.set_exception(std::current_exception());
177  }
178  });
179  });
180  return fut;
181 }
182 
183 }
184 }
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::reactor
Definition: reactor.hh:187
seastar::alien::internal::return_type_of
Definition: alien.hh:137
seastar::alien::smp
Definition: alien.hh:100
seastar::alien::run_on
void run_on(unsigned shard, Func func)
Definition: alien.hh:127
seastar::futurize
Converts a type to a future type, if it isn't already.
Definition: future.hh:1956
seastar::alien::message_queue
Definition: alien.hh:46
seastar::metrics::metric_groups
holds the metric definition.
Definition: metrics_registration.hh:89
metrics_registration.hh
holds the metric_groups definition needed by class that reports metrics
seastar::alien::submit_to
std::future< T > submit_to(unsigned shard, Func func)
Definition: alien.hh:167