Seastar
High performance C++ framework for concurrent servers
alien.hh
Go to the documentation of this file.
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership. You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2018 Red Hat
21  */
22 
23 #pragma once
24 
25 #ifndef SEASTAR_MODULE
26 #include <atomic>
27 #include <deque>
28 #include <future>
29 #include <memory>
30 #include <type_traits>
31 #include <boost/lockfree/queue.hpp>
32 #endif
33 
34 #include <seastar/core/future.hh>
35 #include <seastar/core/cacheline.hh>
36 #include <seastar/core/sstring.hh>
38 #include <seastar/util/concepts.hh>
39 #include <seastar/util/modules.hh>
40 
42 
43 namespace seastar {
44 
45 SEASTAR_MODULE_EXPORT
46 class reactor;
47 
49 namespace alien {
50 
52  static constexpr size_t batch_size = 128;
53  static constexpr size_t prefetch_cnt = 2;
54  struct work_item;
55  struct lf_queue_remote {
56  reactor* remote;
57  };
58  using lf_queue_base = boost::lockfree::queue<work_item*>;
59  // use inheritence to control placement order
60  struct lf_queue : lf_queue_remote, lf_queue_base {
61  lf_queue(reactor* remote)
62  : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
63  void maybe_wakeup();
64  } _pending;
65  struct alignas(seastar::cache_line_size) {
66  std::atomic<size_t> value{0};
67  } _sent;
68  // keep this between two structures with statistics
69  // this makes sure that they have at least one cache line
70  // between them, so hw prefetcher will not accidentally prefetch
71  // cache line used by another cpu.
72  metrics::metric_groups _metrics;
73  struct alignas(seastar::cache_line_size) {
74  size_t _received = 0;
75  size_t _last_rcv_batch = 0;
76  };
77  struct work_item {
78  virtual ~work_item() = default;
79  virtual void process() = 0;
80  };
81  template <typename Func>
82  struct async_work_item : work_item {
83  Func _func;
84  async_work_item(Func&& func) : _func(std::move(func)) {}
85  void process() override {
86  _func();
87  }
88  };
89  template<typename Func>
90  size_t process_queue(lf_queue& q, Func process);
91  void submit_item(std::unique_ptr<work_item> wi);
92 public:
94  void start();
95  void stop();
96  template <typename Func>
97  void submit(Func&& func) {
98  auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
99  submit_item(std::move(wi));
100  }
101  size_t process_incoming();
102  bool pure_poll_rx() const;
103 };
104 
105 namespace internal {
106 
107 struct qs_deleter {
108  unsigned count;
109  qs_deleter(unsigned n = 0) : count(n) {}
110  void operator()(message_queue* qs) const;
111 };
112 
113 }
114 
119 SEASTAR_MODULE_EXPORT
120 class instance {
121  using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
122 public:
123  static qs create_qs(const std::vector<reactor*>& reactors);
124  qs _qs;
125  bool poll_queues();
126  bool pure_poll_queues();
127 };
128 
129 namespace internal {
130 
131 extern instance* default_instance;
132 
133 }
134 
147 SEASTAR_MODULE_EXPORT
148 template <typename Func>
149 SEASTAR_CONCEPT(requires std::is_nothrow_invocable_r_v<void, Func>)
150 void run_on(instance& instance, unsigned shard, Func func) {
151  instance._qs[shard].submit(std::move(func));
152 }
153 
165 template <typename Func>
166 [[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
167 void run_on(unsigned shard, Func func) {
168  run_on(*internal::default_instance, shard, std::move(func));
169 }
170 
171 namespace internal {
172 template<typename Func>
173 using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
174 
175 template<typename Func,
176  bool = std::is_empty_v<return_value_t<Func>>>
178  using type = void;
179  static void set(std::promise<void>& p, return_value_t<Func>&&) {
180  p.set_value();
181  }
182 };
183 template<typename Func>
184 struct return_type_of<Func, false> {
185  using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
186  using type = std::tuple_element_t<0, return_tuple_t>;
187  static void set(std::promise<type>& p, return_value_t<Func>&& t) {
188  p.set_value(std::move(t));
189  }
190 };
191 template <typename Func> using return_type_t = typename return_type_of<Func>::type;
192 }
193 
203 SEASTAR_MODULE_EXPORT
204 template<typename Func, typename T = internal::return_type_t<Func>>
205 SEASTAR_CONCEPT(requires std::invocable<Func>)
206 std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
207  std::promise<T> pr;
208  auto fut = pr.get_future();
209  run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable noexcept {
210  // std::future returned via std::promise above.
211  (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
212  try {
213  internal::return_type_of<Func>::set(pr, result.get());
214  } catch (...) {
215  pr.set_exception(std::current_exception());
216  }
217  });
218  });
219  return fut;
220 }
221 
230 template<typename Func, typename T = internal::return_type_t<Func>>
231 [[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
232 std::future<T> submit_to(unsigned shard, Func func) {
233  return submit_to(*internal::default_instance, shard, std::move(func));
234 }
235 
236 }
237 }
Definition: alien.hh:120
Definition: alien.hh:51
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Holds the metric definition.
Definition: metrics_registration.hh:94
Definition: reactor.hh:168
Holds the metric_groups definition needed by a class that reports metrics.
void run_on(instance &instance, unsigned shard, Func func)
Definition: alien.hh:150
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:206
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: alien.hh:107
Converts a type to a future type, if it isn't already.
Definition: future.hh:1843