24#include <seastar/core/future.hh>
25#include <seastar/core/loop.hh>
26#include <seastar/core/semaphore.hh>
29#include <seastar/core/reactor_config.hh>
30#include <seastar/core/resource.hh>
32#include <seastar/util/modules.hh>
35#include <boost/lockfree/spsc_queue.hpp>
36#include <boost/thread/barrier.hpp>
47class reactor_backend_selector;
49SEASTAR_MODULE_EXPORT_BEGIN
51class smp_service_group;
58SEASTAR_MODULE_EXPORT_END
62unsigned smp_service_group_id(smp_service_group ssg)
noexcept;
64class memory_prefaulter;
68namespace memory::internal {
74SEASTAR_MODULE_EXPORT_BEGIN
114 unsigned _version = 0;
125SEASTAR_MODULE_EXPORT_END
133SEASTAR_MODULE_EXPORT_BEGIN
162SEASTAR_MODULE_EXPORT_END
164static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();
166SEASTAR_MODULE_EXPORT_BEGIN
174 smp_timeout_clock::time_point
timeout = smp_no_timeout;
182void init_default_smp_service_group(shard_id cpu);
184smp_service_group_semaphore& get_smp_service_groups_semaphore(
unsigned ssg_id, shard_id t)
noexcept;
187 static constexpr size_t queue_length = 128;
188 static constexpr size_t batch_size = 16;
189 static constexpr size_t prefetch_cnt = 2;
191 struct lf_queue_remote {
194 using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
195 boost::lockfree::capacity<queue_length>>;
197 struct lf_queue : lf_queue_remote, lf_queue_base {
198 lf_queue(
reactor* remote) : lf_queue_remote{remote} {}
204 struct alignas(seastar::cache_line_size) {
207 size_t _last_snt_batch = 0;
208 size_t _last_cmpl_batch = 0;
209 size_t _current_queue_length = 0;
216 struct alignas(seastar::cache_line_size) {
217 size_t _received = 0;
218 size_t _last_rcv_batch = 0;
220 struct work_item :
public task {
223 virtual ~work_item() {}
224 virtual void fail_with(std::exception_ptr) = 0;
226 virtual void complete() = 0;
228 template <
typename Func>
229 struct async_work_item : work_item {
233 using future_type =
typename futurator::type;
234 using value_type =
typename future_type::value_type;
235 std::optional<value_type> _result;
236 std::exception_ptr _ex;
237 typename futurator::promise_type _promise;
239 virtual void fail_with(std::exception_ptr ex)
override {
240 _promise.set_exception(std::move(ex));
242 virtual task* waiting_task()
noexcept override {
246 virtual void run_and_dispose()
noexcept override {
251 _ex = f.get_exception();
255 _queue.respond(
this);
260 virtual void complete()
override {
262 _promise.set_value(std::move(*_result));
265 _promise.set_exception(std::move(_ex));
268 future_type get_future() {
return _promise.get_future(); }
273 void init() {
new (&a) aa; }
275 std::deque<work_item*> pending_fifo;
278 std::vector<work_item*> _completed_fifo;
282 template <
typename Func>
283 futurize_t<std::invoke_result_t<Func>> submit(shard_id t,
smp_submit_to_options options, Func&& func)
noexcept {
285 auto wi = std::make_unique<async_work_item<Func>>(*
this, options.service_group, std::forward<Func>(func));
286 auto fut = wi->get_future();
287 submit_item(t, options.timeout, std::move(wi));
290 void start(
unsigned cpuid);
291 template<
size_t PrefetchCnt,
typename Func>
292 size_t process_queue(lf_queue& q, Func process);
293 size_t process_incoming();
294 size_t process_completions(shard_id t);
298 void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
299 void respond(work_item* wi);
301 void flush_request_batch();
302 void flush_response_batch();
303 bool has_unflushed_responses()
const;
304 bool pure_poll_rx()
const;
305 bool pure_poll_tx()
const;
310class smp_message_queue;
311struct reactor_options;
314class smp :
public std::enable_shared_from_this<smp> {
316 std::vector<posix_thread> _threads;
317 std::vector<std::function<void ()>> _thread_loops;
318 std::optional<boost::barrier> _all_event_loops_done;
319 std::unique_ptr<internal::memory_prefaulter> _prefaulter;
323 std::unique_ptr<smp_message_queue*[], qs_deleter> _qs_owner;
325 static thread_local std::thread::id _tmain;
326 bool _using_dpdk =
false;
334 void cleanup()
noexcept;
336 void arrive_at_event_loop_end();
338 static bool main_thread() {
return std::this_thread::get_id() == _tmain; }
353 template <
typename Func>
355 using ret_type = std::invoke_result_t<Func>;
361 }
else if (std::is_lvalue_reference_v<Func>) {
366 auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
368 return ret.finally([w = std::move(w)] {});
375 return _qs[t][
this_shard_id()].submit(t, options, std::forward<Func>(func));
392 template <
typename Func>
393 static futurize_t<std::invoke_result_t<Func>>
submit_to(
unsigned t, Func&& func)
noexcept {
396 static bool poll_queues();
397 static bool pure_poll_queues();
398 static std::ranges::range
auto all_cpus() noexcept {
399 return std::views::iota(0u, count);
409 template<
typename Func>
410 requires std::is_nothrow_move_constructible_v<Func>
413 static_assert(std::is_nothrow_move_constructible_v<Func>);
427 template<
typename Func>
440 template<
typename Func>
441 requires std::is_nothrow_move_constructible_v<Func> &&
442 std::is_nothrow_copy_constructible_v<Func>
445 static_assert(std::is_nothrow_move_constructible_v<Func>);
446 return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (
unsigned id) {
447 return id != cpu_id ?
smp::submit_to(
id, options, Func(func)) : make_ready_future<>();
460 template<
typename Func>
461 requires std::is_nothrow_move_constructible_v<Func>
471 template<
typename Func>
472 requires std::is_nothrow_move_constructible_v<Func>
477 void start_all_queues();
478 void pin(
unsigned cpu_id);
479 void allocate_reactor(
unsigned id, reactor_backend_selector rbs, reactor_config cfg);
480 void create_thread(std::function<
void ()> thread_loop);
481 unsigned adjust_max_networking_aio_io_control_blocks(
unsigned network_iocbs,
unsigned reserve_iocbs);
482 static void log_aiocbs(
log_level level,
unsigned storage,
unsigned preempt,
unsigned network,
unsigned reserve);
484 static unsigned count;
487SEASTAR_MODULE_EXPORT_END
Counted resource guard.
Definition: semaphore.hh:154
A representation of a possibly not-yet-computed value.
Definition: future.hh:1219
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1504
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:56
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: reactor.hh:147
Definition: semaphore.hh:525
friend future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
friend smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:154
friend future destroy_smp_service_group(smp_service_group) noexcept
static future invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept
Definition: smp.hh:443
static future invoke_on_others(Func func) noexcept
Definition: smp.hh:473
static future invoke_on_others(unsigned cpu_id, Func func) noexcept
Definition: smp.hh:462
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:354
static future invoke_on_all(Func &&func) noexcept
Definition: smp.hh:428
static future invoke_on_all(smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:411
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, Func &&func) noexcept
Definition: smp.hh:393
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1928
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:571
log_level
log level used with the logger
Definition: log.hh:54
holds the metric_groups definition needed by a class that reports metrics
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
std::optional< sstring > group_name
Definition: smp.hh:88
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
shard_id this_shard_id() noexcept
Returns the shard_id of the current shard.
Definition: shard_id.hh:52
unsigned max_nonlocal_requests
Definition: smp.hh:83
smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:154
future destroy_smp_service_group(smp_service_group ssg) noexcept
future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
Converts a type to a future type, if it isn't already.
Definition: future.hh:1832
static type invoke(Func &&func, FuncArgs &&... args) noexcept
Check whether a type is a future.
Definition: future.hh:1011
Configuration for the reactor.
Definition: reactor_config.hh:53
Definition: resource.hh:129
Configuration for the multicore aspect of seastar.
Definition: smp_options.hh:47
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168
smp_service_group service_group
Controls resource allocation.
Definition: smp.hh:170
smp_timeout_clock::time_point timeout
Definition: smp.hh:174