24 #include <seastar/core/aligned_buffer.hh>
25 #include <seastar/core/cacheline.hh>
26 #include <seastar/core/circular_buffer.hh>
28 #include <seastar/core/condition-variable.hh>
29 #include <seastar/core/enum.hh>
30 #include <seastar/core/fair_queue.hh>
31 #include <seastar/core/file.hh>
32 #include <seastar/core/future.hh>
34 #include <seastar/core/internal/io_request.hh>
35 #include <seastar/core/internal/io_sink.hh>
36 #include <seastar/core/iostream.hh>
37 #include <seastar/core/linux-aio.hh>
38 #include <seastar/core/lowres_clock.hh>
39 #include <seastar/core/make_task.hh>
40 #include <seastar/core/manual_clock.hh>
41 #include <seastar/core/memory.hh>
43 #include <seastar/core/internal/estimated_histogram.hh>
45 #include <seastar/core/reactor_config.hh>
46 #include <seastar/core/scattered_message.hh>
48 #include <seastar/core/scheduling_specific.hh>
49 #include <seastar/core/seastar.hh>
50 #include <seastar/core/semaphore.hh>
52 #include <seastar/core/sstring.hh>
53 #include <seastar/core/temporary_buffer.hh>
54 #include <seastar/core/thread_cputime_clock.hh>
56 #include <seastar/core/gate.hh>
57 #include <seastar/net/api.hh>
58 #include <seastar/util/eclipse.hh>
59 #include <seastar/util/log.hh>
60 #include <seastar/util/std-compat.hh>
61 #include <seastar/util/modules.hh>
62 #include "internal/pollable_fd.hh"
64 #ifndef SEASTAR_MODULE
65 #include <boost/container/static_vector.hpp>
66 #include <boost/lockfree/spsc_queue.hpp>
67 #include <boost/next_prior.hpp>
68 #include <boost/range/irange.hpp>
69 #include <boost/thread/barrier.hpp>
81 #include <string_view>
82 #include <system_error>
84 #include <type_traits>
85 #include <unordered_map>
88 #include <sys/epoll.h>
89 #include <sys/types.h>
90 #include <sys/socket.h>
91 #include <netinet/ip.h>
94 #include <osv/sched.hh>
95 #include <osv/mutex.h>
96 #include <osv/condvar.h>
97 #include <osv/newpoll.hh>
102 struct _Unwind_Exception;
106 using shard_id = unsigned;
112 SEASTAR_MODULE_EXPORT
// Hash functor specialized for ::sockaddr_in.
120 struct hash<::sockaddr_in> {
// Takes the address by value and returns sin_port XOR sin_addr.s_addr;
// no other field of the structure contributes to the hash.
121 size_t operator()(::sockaddr_in a)
const {
122 return a.sin_port ^ a.sin_addr.s_addr;
128 bool operator==(const ::sockaddr_in a, const ::sockaddr_in b);
135 class reactor_backend_selector;
137 class reactor_backend;
142 class reactor_stall_sampler;
143 class cpu_stall_detector;
144 class buffer_allocator;
145 class priority_class;
148 size_t scheduling_group_count();
150 void increase_thrown_exceptions_counter() noexcept;
154 class kernel_completion;
156 SEASTAR_MODULE_EXPORT
161 virtual void complete_with(ssize_t res)
final override;
163 virtual void complete(
size_t res) noexcept = 0;
164 virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
167 SEASTAR_MODULE_EXPORT
175 class batch_flush_pollfn;
177 class drain_cross_cpu_freelist_pollfn;
178 class lowres_timer_pollfn;
179 class manual_timer_pollfn;
181 class reap_kernel_completions_pollfn;
182 class kernel_submit_work_pollfn;
183 class io_queue_submission_pollfn;
184 class syscall_pollfn;
185 class execution_stage_pollfn;
187 friend class file_data_source_impl;
188 friend class internal::reactor_stall_sampler;
189 friend class preempt_io_context;
190 friend struct hrtimer_aio_completion;
191 friend class reactor_backend_epoll;
192 friend class reactor_backend_aio;
193 friend class reactor_backend_uring;
194 friend class reactor_backend_selector;
197 friend class aio_storage_context;
199 using poller = internal::poller;
205 uint64_t aio_reads = 0;
206 uint64_t aio_read_bytes = 0;
207 uint64_t aio_writes = 0;
208 uint64_t aio_write_bytes = 0;
209 uint64_t aio_outsizes = 0;
210 uint64_t aio_errors = 0;
211 uint64_t fstream_reads = 0;
212 uint64_t fstream_read_bytes = 0;
213 uint64_t fstream_reads_blocked = 0;
214 uint64_t fstream_read_bytes_blocked = 0;
215 uint64_t fstream_read_aheads_discarded = 0;
216 uint64_t fstream_read_ahead_discarded_bytes = 0;
223 uint64_t tasks_processed = 0;
225 friend void io_completion::complete_with(ssize_t);
232 std::shared_ptr<smp> _smp;
238 reactor_backend_osv _backend;
239 sched::thread _timer_thread;
240 sched::thread *_engine_thread;
241 mutable mutex _timer_mutex;
245 std::unique_ptr<reactor_backend> _backend;
247 sigset_t _active_sigmask;
248 std::vector<pollfn*> _pollers;
// Compile-time limits: max_aio is derived as
// max_aio_per_queue * max_queues (128 * 8 = 1024).
250 static constexpr
unsigned max_aio_per_queue = 128;
251 static constexpr
unsigned max_queues = 8;
252 static constexpr
unsigned max_aio = max_aio_per_queue * max_queues;
255 std::unordered_map<dev_t, std::unique_ptr<io_queue>> _io_queues;
257 internal::io_sink _io_sink;
258 unsigned _num_io_groups = 0;
260 std::vector<noncopyable_function<future<> ()>> _exit_funcs;
262 bool _stopping =
false;
263 bool _stopped =
false;
264 bool _finished_running_tasks =
false;
266 bool _handle_sigint =
true;
267 std::optional<future<std::unique_ptr<network_stack>>> _network_stack_ready;
271 internal::preemption_monitor _preemption_monitor{};
272 uint64_t _global_tasks_processed = 0;
274 metrics::internal::time_estimated_histogram _stalls_histogram;
275 std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;
277 unsigned _max_task_backlog = 1000;
278 timer_set<timer<>, &timer<>::_link> _timers;
279 timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
280 timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
281 timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
282 timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
283 timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
285 uint64_t _fsyncs = 0;
286 uint64_t _cxx_exceptions = 0;
287 uint64_t _abandoned_failed_futures = 0;
289 explicit task_queue(
unsigned id, sstring name, sstring shortname,
float shares);
290 int64_t _vruntime = 0;
292 int64_t _reciprocal_shares_times_2_power_32;
293 bool _current =
false;
294 bool _active =
false;
296 sched_clock::time_point _ts;
297 sched_clock::duration _runtime = {};
298 sched_clock::duration _waittime = {};
299 sched_clock::duration _starvetime = {};
300 uint64_t _tasks_processed = 0;
301 circular_buffer<task*> _q;
305 static constexpr
size_t shortname_size = 4;
307 int64_t to_vruntime(sched_clock::duration runtime)
const;
308 void set_shares(
float shares) noexcept;
309 struct indirect_compare;
310 sched_clock::duration _time_spent_on_task_quota_violations = {};
312 void rename(sstring new_name, sstring new_shortname);
314 void register_stats();
317 boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
318 internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data;
319 int64_t _last_vruntime = 0;
320 task_queue_list _active_task_queues;
321 task_queue_list _activating_task_queues;
322 task_queue* _at_destroy_tasks;
323 sched_clock::duration _task_quota;
324 task* _current_task =
nullptr;
334 std::unique_ptr<network_stack> _network_stack;
335 lowres_clock::time_point _lowres_next_timeout = lowres_clock::time_point::max();
336 std::optional<pollable_fd> _aio_eventfd;
337 const bool _reuseport;
338 circular_buffer<double> _loads;
// NOTE(review): _total_idle is brace-initialized to 0 but _total_sleep has no
// initializer here. If sched_clock::duration is a std::chrono::duration with an
// arithmetic rep, _total_sleep starts out indeterminate unless the constructor
// assigns it — confirm against the constructor definition.
340 sched_clock::duration _total_idle{0};
341 sched_clock::duration _total_sleep;
342 sched_clock::time_point _start_time =
now();
343 std::chrono::nanoseconds _max_poll_time = calculate_poll_time();
344 output_stream<char>::batch_flush_list_t _flush_batching;
345 std::atomic<bool> _sleeping
alignas(seastar::cache_line_size){0};
346 pthread_t _thread_id
alignas(seastar::cache_line_size) = pthread_self();
347 bool _strict_o_direct =
true;
348 bool _force_io_getevents_syscall =
false;
349 bool _bypass_fsync =
false;
350 bool _have_aio_fsync =
false;
351 bool _kernel_page_cache =
false;
352 std::atomic<bool> _dying{
false};
353 gate _background_gate;
356 static std::chrono::nanoseconds calculate_poll_time();
357 static void block_notifier(
int);
358 bool flush_pending_aio();
359 steady_clock_type::time_point next_pending_aio() const noexcept;
360 bool reap_kernel_completions();
361 bool flush_tcp_batches();
362 void update_lowres_clocks() noexcept;
363 bool do_expire_lowres_timers() noexcept;
364 bool do_check_lowres_timers() const noexcept;
365 void expire_manual_timers() noexcept;
366 void start_aio_eventfd_loop();
367 void stop_aio_eventfd_loop();
368 template <typename T, typename E, typename EnableFunc>
369 void complete_timers(T&, E&, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn()));
378 bool pure_poll_once();
391 bool pure_poll_signal()
const;
394 static void action(
int signo, siginfo_t* siginfo,
void* ignore);
395 static void failed_to_handle(
int signo);
397 struct signal_handler {
401 std::atomic<uint64_t> _pending_signals;
402 std::unordered_map<int, signal_handler> _signal_handlers;
406 std::unique_ptr<thread_pool> _thread_pool;
407 friend class thread_pool;
408 friend class thread_context;
409 friend class internal::cpu_stall_detector;
411 uint64_t pending_task_count()
const;
412 void run_tasks(task_queue& tq);
413 bool have_more_tasks()
const;
414 bool posix_reuseport_detect();
415 void run_some_tasks();
416 void activate(task_queue& tq);
417 void insert_active_task_queue(task_queue* tq);
418 task_queue* pop_active_task_queue(sched_clock::time_point
now);
419 void insert_activating_task_queues();
420 void account_runtime(task_queue& tq, sched_clock::duration runtime);
421 void account_idle(sched_clock::duration idletime);
427 uint64_t tasks_processed()
const;
428 uint64_t min_vruntime()
const;
429 void request_preemption();
430 void start_handling_signal();
431 void reset_preemption_monitor();
432 void service_highres_timer() noexcept;
455 explicit
reactor(std::
shared_ptr<
smp>
smp, alien::instance& alien,
unsigned id, reactor_backend_selector rbs, reactor_config cfg);
458 void operator=(const
reactor&) = delete;
460 static sched_clock::time_point
now() noexcept {
// Returns the time elapsed since _start_time, computed as now() - _start_time.
463 sched_clock::duration uptime() {
464 return now() - _start_time;
// Returns the io_queue stored in _io_queues under devid (default 0).
// When devid has no entry, falls back to the queue stored under key 0;
// std::unordered_map::at throws std::out_of_range if that entry is absent too.
467 io_queue& get_io_queue(dev_t devid = 0) {
468 auto queue = _io_queues.find(devid);
469 if (queue == _io_queues.end()) {
// Unknown device id: use the entry keyed 0 instead.
470 return *_io_queues.at(0);
472 return *(queue->second);
476 #if SEASTAR_API_LEVEL < 7
477 [[deprecated(
"Use io_priority_class::register_one")]]
478 io_priority_class register_one_priority_class(sstring name, uint32_t shares);
480 [[deprecated(
"Use io_priority_class.update_shares")]]
481 future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
483 [[deprecated(
"Use io_priority_class.rename")]]
484 static future<> rename_priority_class(io_priority_class pc, sstring new_name) noexcept;
488 future<> update_bandwidth_for_queues(internal::priority_class pc, uint64_t bandwidth);
490 void rename_queues(internal::priority_class pc, sstring new_name);
492 void update_shares_for_queues(internal::priority_class pc, uint32_t shares);
494 void configure(
const reactor_options& opts);
496 server_socket
listen(socket_address sa, listen_options opts = {});
498 future<connected_socket>
connect(socket_address sa);
499 future<connected_socket>
connect(socket_address, socket_address, transport proto = transport::TCP);
501 pollable_fd posix_listen(socket_address sa, listen_options opts = {});
// Returns the value of the const member _reuseport.
503 bool posix_reuseport_available()
const {
return _reuseport; }
505 pollable_fd make_pollable_fd(socket_address sa,
int proto);
507 future<> posix_connect(pollable_fd pfd, socket_address sa, socket_address local);
509 future<> send_all(pollable_fd_state& fd,
const void* buffer,
size_t size);
511 future<file> open_file_dma(std::string_view name,
open_flags flags, file_open_options options = {}) noexcept;
512 future<file> open_directory(std::string_view name) noexcept;
513 future<> make_directory(std::string_view name, file_permissions permissions = file_permissions::default_dir_permissions) noexcept;
514 future<> touch_directory(std::string_view name, file_permissions permissions = file_permissions::default_dir_permissions) noexcept;
515 future<std::optional<directory_entry_type>>
file_type(std::string_view name, follow_symlink = follow_symlink::yes) noexcept;
516 future<stat_data>
file_stat(std::string_view pathname, follow_symlink) noexcept;
517 future<uint64_t>
file_size(std::string_view pathname) noexcept;
518 future<bool>
file_accessible(std::string_view pathname, access_flags flags) noexcept;
519 future<bool>
file_exists(std::string_view pathname) noexcept {
522 future<fs_type>
file_system_at(std::string_view pathname) noexcept;
523 future<struct statvfs> statvfs(std::string_view pathname) noexcept;
525 future<> rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept;
527 future<> chmod(std::string_view name, file_permissions permissions) noexcept;
529 future<size_t> read_directory(
int fd,
char* buffer,
size_t buffer_size);
531 future<int> inotify_add_watch(
int fd, std::string_view path, uint32_t flags);
533 future<std::tuple<file_desc, file_desc>>
make_pipe();
534 future<std::tuple<pid_t, file_desc, file_desc, file_desc>>
535 spawn(std::string_view pathname,
536 std::vector<sstring> argv,
537 std::vector<sstring> env = {});
538 future<int> waitpid(pid_t pid);
539 void kill(pid_t pid,
int sig);
// Returns the future obtained from _start_promise.
// NOTE(review): presumably resolves once the reactor has started — confirm
// where _start_promise is fulfilled.
543 future<> when_started() {
return _start_promise.
get_future(); }
// Waits on _stop_requested for at most `timeout`, with a predicate that reads
// _stopping; the future produced by that wait() call is returned unchanged.
// NOTE(review): presumably the future fails if the timeout elapses before
// _stopping becomes true — confirm against the wait(timeout, pred) overload.
546 template <
typename Rep,
typename Period>
547 future<> wait_for_stop(std::chrono::duration<Rep, Period> timeout) {
548 return _stop_requested.
wait(timeout, [
this] {
return _stopping; });
551 void at_exit(noncopyable_function<
future<> ()> func);
// Wraps func in a task created with default_scheduling_group() and appends it
// to the _q queue of _at_destroy_tasks.
// NOTE(review): presumably these tasks run when the reactor is torn down —
// confirm where _at_destroy_tasks is drained.
553 template <
typename Func>
554 void at_destroy(Func&& func) {
555 _at_destroy_tasks->_q.push_back(make_task(default_scheduling_group(), std::forward<Func>(func)));
// Returns the _current_task pointer (initialized to nullptr in its declaration).
558 task* current_task()
const {
return _current_task; }
// Stores t in _current_task; no checks are performed on the pointer.
563 void set_current_task(task* t) { _current_task = t; }
565 void add_task(task* t) noexcept;
566 void add_urgent_task(task* t) noexcept;
// Callable-taking overload: invokes func through futurize_invoke and forwards
// the result to another run_in_background overload (not visible in this excerpt).
570 template <
typename Func>
571 void run_in_background(Func&& func) {
572 run_in_background(futurize_invoke(std::forward<Func>(func)));
587 _idle_cpu_handler = std::move(handler);
591 void add_high_priority_task(
task*) noexcept;
595 [[deprecated(
"Use this_shard_id")]]
596 shard_id cpu_id()
const;
600 steady_clock_type::duration total_idle_time();
601 steady_clock_type::duration total_busy_time();
602 std::chrono::nanoseconds total_steal_time();
// Returns a const reference to the _io_stats member.
604 const io_stats& get_io_stats()
const {
return _io_stats; }
// Returns the current value of the _abandoned_failed_futures counter.
611 uint64_t abandoned_failed_futures()
const {
return _abandoned_failed_futures; }
613 void timer_thread_func();
614 void set_timer(sched::timer &tmr, s64 t);
625 void register_poller(
pollfn* p);
626 void unregister_poller(
pollfn* p);
628 void register_metrics();
631 future<> fdatasync(
int fd) noexcept;
648 friend class posix_file_impl;
649 friend class blockdev_file_impl;
650 friend class timer<>;
655 friend class internal::poller;
658 friend void seastar::internal::increase_thrown_exceptions_counter() noexcept;
659 friend
void report_failed_future(const std::exception_ptr& eptr) noexcept;
660 metrics::metric_groups _metric_groups;
666 future<struct statfs> fstatfs(
int fd) noexcept;
673 void enable_timer(steady_clock_type::time_point when) noexcept;
683 void set_strict_dma(
bool value);
684 void set_bypass_fsync(
bool value);
685 void update_blocked_reactor_notify_ms(std::chrono::milliseconds ms);
686 std::chrono::milliseconds get_blocked_reactor_notify_ms() const;
690 static void with_allow_abandoned_failed_futures(
unsigned count,
noncopyable_function<
void ()> func);
696 static std::function<void ()> get_stall_detector_report_function();
700 extern __thread
reactor* local_engine;
701 extern __thread
size_t task_quota;
703 SEASTAR_MODULE_EXPORT
705 return *local_engine;
// Reports whether the calling thread's local_engine pointer (declared
// __thread) is non-null.
708 SEASTAR_MODULE_EXPORT
709 inline bool engine_is_ready() {
710 return local_engine !=
nullptr;
713 inline int hrtimer_signal() {
720 extern logger seastar_logger;
Definition: circular_buffer_fixed_capacity.hh:52
Condition variable.
Definition: condition-variable.hh:74
future wait() noexcept
Definition: condition-variable.hh:209
Definition: reactor.hh:159
Definition: io_queue.hh:95
Definition: io_desc.hh:29
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:59
Definition: manual_clock.hh:35
Holds the metric definition.
Definition: metrics_registration.hh:94
Definition: pollable_fd.hh:63
Definition: pollable_fd.hh:137
Definition: reactor.hh:688
static void set_stall_detector_report_function(std::function< void()> report)
Definition: reactor.hh:168
sched_stats get_sched_stats() const
void set_idle_cpu_handler(idle_cpu_handler &&handler)
Definition: reactor.hh:586
alien::instance & alien()
Definition: reactor.hh:229
Definition: reactor.hh:204
Scheduling statistics.
Definition: reactor.hh:219
Definition: scheduling.hh:183
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
Definition: shared_ptr.hh:515
Definition: socket_defs.hh:47
Definition: temporary_buffer.hh:67
future< stat_data > file_stat(std::string_view name, follow_symlink fs=follow_symlink::yes) noexcept
future touch_directory(std::string_view name, file_permissions permissions=file_permissions::default_dir_permissions) noexcept
future remove_file(std::string_view name) noexcept
future< std::optional< directory_entry_type > > file_type(std::string_view name, follow_symlink follow=follow_symlink::yes) noexcept
future< fs_type > file_system_at(std::string_view name) noexcept
future rename_file(std::string_view old_name, std::string_view new_name) noexcept
future< bool > file_exists(std::string_view name) noexcept
future chmod(std::string_view name, file_permissions permissions) noexcept
future< uint64_t > file_size(std::string_view name) noexcept
future link_file(std::string_view oldpath, std::string_view newpath) noexcept
future< bool > file_accessible(std::string_view name, access_flags flags) noexcept
open_flags
Definition: file-types.hh:41
future make_directory(std::string_view name, file_permissions permissions=file_permissions::default_dir_permissions) noexcept
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future now()
Returns a ready future.
Definition: later.hh:35
future< std::tuple< file_desc, file_desc > > make_pipe()
server_socket listen(socket_address sa)
future< connected_socket > connect(socket_address sa)
Holds the metric_groups definition needed by a class that reports metrics.
future configure(const options &opts)
Sets the metrics configuration.
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
const noncopyable_function< bool()> & work_waiting_on_reactor
Definition: idle_cpu_handler.hh:46
future< scheduling_group > create_scheduling_group(sstring name, float shares) noexcept
future sleep(std::chrono::duration< Rep, Period > dur)
Definition: sleep.hh:49
future< scheduling_group_key > scheduling_group_key_create(scheduling_group_key_config cfg) noexcept
future destroy_scheduling_group(scheduling_group sg) noexcept
future rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept
idle_cpu_handler_result
Definition: idle_cpu_handler.hh:37
@ no_more_work
The user callback has no more work to perform.
noncopyable_function< idle_cpu_handler_result(work_waiting_on_reactor poll)> idle_cpu_handler
Definition: idle_cpu_handler.hh:52
Definition: noncopyable_function.hh:37
Configuration for the reactor.
Definition: reactor_config.hh:45
Definition: scheduling.hh:143