24 #include <seastar/core/future.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/semaphore.hh>
29 #include <seastar/core/reactor_config.hh>
30 #include <boost/lockfree/spsc_queue.hpp>
31 #include <boost/thread/barrier.hpp>
32 #include <boost/range/irange.hpp>
33 #include <boost/program_options.hpp>
41 using shard_id = unsigned;
43 class smp_service_group;
44 class reactor_backend_selector;
48 unsigned smp_service_group_id(smp_service_group ssg) noexcept;
50 inline shard_id* this_shard_id_ptr() noexcept {
51 static thread_local shard_id g_this_shard_id;
52 return &g_this_shard_id;
59 return *internal::this_shard_id_ptr();
137 using smp_timeout_clock = lowres_clock;
138 using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;
139 using smp_service_group_semaphore_units = semaphore_units<named_semaphore_exception_factory, smp_timeout_clock>;
141 static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();
150 smp_timeout_clock::time_point
timeout = smp_no_timeout;
158 void init_default_smp_service_group(shard_id cpu);
160 smp_service_group_semaphore& get_smp_service_groups_semaphore(
unsigned ssg_id, shard_id t) noexcept;
163 static constexpr
size_t queue_length = 128;
164 static constexpr
size_t batch_size = 16;
165 static constexpr
size_t prefetch_cnt = 2;
167 struct lf_queue_remote {
170 using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
171 boost::lockfree::capacity<queue_length>>;
173 struct lf_queue : lf_queue_remote, lf_queue_base {
174 lf_queue(
reactor* remote) : lf_queue_remote{remote} {}
180 struct alignas(seastar::cache_line_size) {
183 size_t _last_snt_batch = 0;
184 size_t _last_cmpl_batch = 0;
185 size_t _current_queue_length = 0;
192 struct alignas(seastar::cache_line_size) {
193 size_t _received = 0;
194 size_t _last_rcv_batch = 0;
196 struct work_item :
public task {
199 virtual ~work_item() {}
200 virtual void fail_with(std::exception_ptr) = 0;
202 virtual void complete() = 0;
204 template <
typename Func>
205 struct async_work_item : work_item {
208 using futurator =
futurize<std::result_of_t<Func()>>;
209 using future_type =
typename futurator::type;
210 using value_type =
typename future_type::value_type;
211 std::optional<value_type> _result;
212 std::exception_ptr _ex;
213 typename futurator::promise_type _promise;
215 virtual void fail_with(std::exception_ptr ex)
override {
216 _promise.set_exception(std::move(ex));
218 virtual task* waiting_task() noexcept
override {
222 virtual void run_and_dispose() noexcept
override {
227 _ex = f.get_exception();
231 _queue.respond(
this);
236 virtual void complete()
override {
238 _promise.set_value(std::move(*_result));
241 _promise.set_exception(std::move(_ex));
244 future_type get_future() {
return _promise.get_future(); }
249 void init() {
new (&a) aa; }
251 std::deque<work_item*> pending_fifo;
254 std::vector<work_item*> _completed_fifo;
258 template <
typename Func>
259 futurize_t<std::result_of_t<Func()>> submit(shard_id t,
smp_submit_to_options options, Func&& func) noexcept {
261 auto wi = std::make_unique<async_work_item<Func>>(*
this, options.service_group, std::forward<Func>(func));
262 auto fut = wi->get_future();
263 submit_item(t, options.timeout, std::move(wi));
266 void start(
unsigned cpuid);
267 template<
size_t PrefetchCnt,
typename Func>
268 size_t process_queue(lf_queue& q, Func process);
269 size_t process_incoming();
270 size_t process_completions(shard_id t);
274 void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
275 void respond(work_item* wi);
277 void flush_request_batch();
278 void flush_response_batch();
279 bool has_unflushed_responses()
const;
280 bool pure_poll_rx()
const;
281 bool pure_poll_tx()
const;
287 static std::vector<posix_thread> _threads;
288 static std::vector<std::function<void ()>> _thread_loops;
289 static std::optional<boost::barrier> _all_event_loops_done;
290 static std::vector<reactor*> _reactors;
294 static std::unique_ptr<smp_message_queue*[], qs_deleter> _qs;
295 static std::thread::id _tmain;
296 static bool _using_dpdk;
298 template <
typename Func>
300 template <
typename Func>
301 using returns_void = std::is_same<std::result_of_t<Func()>,
void>;
303 static boost::program_options::options_description get_options_description();
304 static void register_network_stacks();
305 static void configure(boost::program_options::variables_map vm,
reactor_config cfg = {});
306 static void cleanup();
307 static void cleanup_cpu();
308 static void arrive_at_event_loop_end();
309 static void join_all();
310 static bool main_thread() {
return std::this_thread::get_id() == _tmain; }
325 template <
typename Func>
327 using ret_type = std::result_of_t<Func()>;
333 }
else if (std::is_lvalue_reference<Func>::value) {
338 auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
340 return ret.finally([w = std::move(w)] {});
347 return _qs[t][
this_shard_id()].submit(t, options, std::forward<Func>(func));
364 template <
typename Func>
365 static futurize_t<std::result_of_t<Func()>>
submit_to(
unsigned t, Func&& func) noexcept {
368 static bool poll_queues();
369 static bool pure_poll_queues();
370 static boost::integer_range<unsigned> all_cpus() noexcept {
371 return boost::irange(0u, count);
381 template<
typename Func>
382 SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
384 static_assert(std::is_same<
future<>,
typename futurize<std::result_of_t<Func()>>::type>::value,
"bad Func signature");
385 static_assert(std::is_nothrow_move_constructible_v<Func>);
399 template<
typename Func>
412 template<
typename Func>
413 SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
415 static_assert(std::is_same<
future<>,
typename futurize<std::result_of_t<Func()>>::type>::value,
"bad Func signature");
416 static_assert(std::is_nothrow_move_constructible_v<Func>);
417 return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (
unsigned id) {
418 return id != cpu_id ?
smp::submit_to(
id, options, func) : make_ready_future<>();
431 template<
typename Func>
436 static void start_all_queues();
437 static void pin(
unsigned cpu_id);
438 static void allocate_reactor(
unsigned id, reactor_backend_selector rbs,
reactor_config cfg);
439 static void create_thread(std::function<
void ()> thread_loop);
441 static unsigned count;