Seastar
High performance C++ framework for concurrent servers
reactor.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright 2014 Cloudius Systems
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/aligned_buffer.hh>
25 #include <seastar/core/cacheline.hh>
26 #include <seastar/core/circular_buffer.hh>
28 #include <seastar/core/condition-variable.hh>
29 #include <seastar/core/enum.hh>
30 #include <seastar/core/fair_queue.hh>
31 #include <seastar/core/file.hh>
32 #include <seastar/core/future.hh>
34 #include <seastar/core/internal/io_request.hh>
35 #include <seastar/core/internal/io_sink.hh>
36 #include <seastar/core/iostream.hh>
37 #include <seastar/core/linux-aio.hh>
38 #include <seastar/core/lowres_clock.hh>
39 #include <seastar/core/make_task.hh>
40 #include <seastar/core/manual_clock.hh>
41 #include <seastar/core/memory.hh>
43 #include <seastar/core/internal/estimated_histogram.hh>
44 #include <seastar/core/posix.hh>
45 #include <seastar/core/reactor_config.hh>
46 #include <seastar/core/scattered_message.hh>
48 #include <seastar/core/scheduling_specific.hh>
49 #include <seastar/core/seastar.hh>
50 #include <seastar/core/semaphore.hh>
51 #include <seastar/core/smp.hh>
52 #include <seastar/core/sstring.hh>
53 #include <seastar/core/temporary_buffer.hh>
54 #include <seastar/core/thread_cputime_clock.hh>
55 #include <seastar/core/timer.hh>
56 #include <seastar/core/gate.hh>
57 #include <seastar/net/api.hh>
58 #include <seastar/util/eclipse.hh>
59 #include <seastar/util/log.hh>
60 #include <seastar/util/std-compat.hh>
61 #include <seastar/util/modules.hh>
62 #include "internal/pollable_fd.hh"
63 
64 #ifndef SEASTAR_MODULE
65 #include <boost/container/static_vector.hpp>
66 #include <boost/lockfree/spsc_queue.hpp>
67 #include <boost/next_prior.hpp>
68 #include <boost/range/irange.hpp>
69 #include <boost/thread/barrier.hpp>
70 #include <algorithm>
71 #include <atomic>
72 #include <cassert>
73 #include <chrono>
74 #include <cstring>
75 #include <memory>
76 #include <queue>
77 #include <ratio>
78 #include <set>
79 #include <stack>
80 #include <stdexcept>
81 #include <string_view>
82 #include <system_error>
83 #include <thread>
84 #include <type_traits>
85 #include <unordered_map>
86 #include <vector>
87 #include <unistd.h>
88 #include <sys/epoll.h>
89 #include <sys/types.h>
90 #include <sys/socket.h>
91 #include <netinet/ip.h>
92 
93 #ifdef HAVE_OSV
94 #include <osv/sched.hh>
95 #include <osv/mutex.h>
96 #include <osv/condvar.h>
97 #include <osv/newpoll.hh>
98 #endif
99 #endif
100 
101 struct statfs;
102 struct _Unwind_Exception;
103 
104 namespace seastar {
105 
106 using shard_id = unsigned;
107 
108 namespace alien {
109 class message_queue;
110 class instance;
111 }
112 SEASTAR_MODULE_EXPORT
113 class reactor;
114 
115 }
116 
namespace std {

// Hash functor for IPv4 socket addresses: XORs the port field with the
// 32-bit address field. Both fields are used exactly as stored in the
// structure; no byte-order conversion is applied.
template <>
struct hash<::sockaddr_in> {
    size_t operator()(::sockaddr_in a) const {
        const size_t port_bits = a.sin_port;
        const size_t addr_bits = a.sin_addr.s_addr;
        return port_bits ^ addr_bits;
    }
};

}
127 
128 bool operator==(const ::sockaddr_in a, const ::sockaddr_in b);
129 
130 namespace seastar {
131 
132 class thread_pool;
133 class smp;
134 
135 class reactor_backend_selector;
136 
137 class reactor_backend;
138 struct pollfn;
139 
140 namespace internal {
141 
142 class reactor_stall_sampler;
143 class cpu_stall_detector;
144 class buffer_allocator;
145 class priority_class;
146 class poller;
147 
148 size_t scheduling_group_count();
149 
150 void increase_thrown_exceptions_counter() noexcept;
151 
152 }
153 
154 class kernel_completion;
155 class io_queue;
156 SEASTAR_MODULE_EXPORT
157 class io_intent;
158 
160 public:
161  virtual void complete_with(ssize_t res) final override;
162 
163  virtual void complete(size_t res) noexcept = 0;
164  virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
165 };
166 
167 SEASTAR_MODULE_EXPORT
168 class reactor {
169 private:
170  struct task_queue;
171  using task_queue_list = circular_buffer_fixed_capacity<task_queue*, 1 << log2ceil(max_scheduling_groups())>;
172  using pollfn = seastar::pollfn;
173 
174  class signal_pollfn;
175  class batch_flush_pollfn;
176  class smp_pollfn;
177  class drain_cross_cpu_freelist_pollfn;
178  class lowres_timer_pollfn;
179  class manual_timer_pollfn;
180  class epoll_pollfn;
181  class reap_kernel_completions_pollfn;
182  class kernel_submit_work_pollfn;
183  class io_queue_submission_pollfn;
184  class syscall_pollfn;
185  class execution_stage_pollfn;
186  friend class manual_clock;
187  friend class file_data_source_impl; // for fstream statistics
188  friend class internal::reactor_stall_sampler;
189  friend class preempt_io_context;
190  friend struct hrtimer_aio_completion;
191  friend class reactor_backend_epoll;
192  friend class reactor_backend_aio;
193  friend class reactor_backend_uring;
194  friend class reactor_backend_selector;
195  friend class io_queue; // for aio statistics
196  friend struct reactor_options;
197  friend class aio_storage_context;
198 public:
199  using poller = internal::poller;
203 
// I/O counters kept by the reactor. Every counter starts at zero.
struct io_stats {
    // aio_* counters
    uint64_t aio_reads{0};
    uint64_t aio_read_bytes{0};
    uint64_t aio_writes{0};
    uint64_t aio_write_bytes{0};
    uint64_t aio_outsizes{0};
    uint64_t aio_errors{0};

    // fstream_* counters
    uint64_t fstream_reads{0};
    uint64_t fstream_read_bytes{0};
    uint64_t fstream_reads_blocked{0};
    uint64_t fstream_read_bytes_blocked{0};
    uint64_t fstream_read_aheads_discarded{0};
    uint64_t fstream_read_ahead_discarded_bytes{0};
};
// Scheduling counters; tasks_processed starts at zero.
struct sched_stats {
    uint64_t tasks_processed{0};
};
225  friend void io_completion::complete_with(ssize_t);
226 
229  alien::instance& alien() { return _alien; }
230 
231 private:
232  std::shared_ptr<smp> _smp;
233  alien::instance& _alien;
234  reactor_config _cfg;
235  file_desc _notify_eventfd;
236  file_desc _task_quota_timer;
237 #ifdef HAVE_OSV
238  reactor_backend_osv _backend;
239  sched::thread _timer_thread;
240  sched::thread *_engine_thread;
241  mutable mutex _timer_mutex;
242  condvar _timer_cond;
243  s64 _timer_due = 0;
244 #else
245  std::unique_ptr<reactor_backend> _backend;
246 #endif
247  sigset_t _active_sigmask; // holds sigmask while sleeping with sig disabled
248  std::vector<pollfn*> _pollers;
249 
250  static constexpr unsigned max_aio_per_queue = 128;
251  static constexpr unsigned max_queues = 8;
252  static constexpr unsigned max_aio = max_aio_per_queue * max_queues;
253 
254  // Each mountpoint is controlled by its own io_queue, but ...
255  std::unordered_map<dev_t, std::unique_ptr<io_queue>> _io_queues;
256  // ... when dispatched all requests get into this single sink
257  internal::io_sink _io_sink;
258  unsigned _num_io_groups = 0;
259 
260  std::vector<noncopyable_function<future<> ()>> _exit_funcs;
261  unsigned _id = 0;
262  bool _stopping = false;
263  bool _stopped = false;
264  bool _finished_running_tasks = false;
265  condition_variable _stop_requested;
266  bool _handle_sigint = true;
267  std::optional<future<std::unique_ptr<network_stack>>> _network_stack_ready;
268  int _return = 0;
269  promise<> _start_promise;
270  semaphore _cpu_started;
271  internal::preemption_monitor _preemption_monitor{};
272  uint64_t _global_tasks_processed = 0;
273  uint64_t _polls = 0;
274  metrics::internal::time_estimated_histogram _stalls_histogram;
275  std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;
276 
277  unsigned _max_task_backlog = 1000;
278  timer_set<timer<>, &timer<>::_link> _timers;
279  timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
280  timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
281  timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
282  timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
283  timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
284  io_stats _io_stats;
285  uint64_t _fsyncs = 0;
286  uint64_t _cxx_exceptions = 0;
287  uint64_t _abandoned_failed_futures = 0;
288  struct task_queue {
289  explicit task_queue(unsigned id, sstring name, sstring shortname, float shares);
290  int64_t _vruntime = 0;
291  float _shares;
292  int64_t _reciprocal_shares_times_2_power_32;
293  bool _current = false;
294  bool _active = false;
295  uint8_t _id;
296  sched_clock::time_point _ts; // to help calculating wait/starve-times
297  sched_clock::duration _runtime = {};
298  sched_clock::duration _waittime = {};
299  sched_clock::duration _starvetime = {};
300  uint64_t _tasks_processed = 0;
301  circular_buffer<task*> _q;
302  sstring _name;
303  // the shortened version of scheduling_group's name, only the first 4
304  // chars are used.
305  static constexpr size_t shortname_size = 4;
306  sstring _shortname;
307  int64_t to_vruntime(sched_clock::duration runtime) const;
308  void set_shares(float shares) noexcept;
309  struct indirect_compare;
310  sched_clock::duration _time_spent_on_task_quota_violations = {};
312  void rename(sstring new_name, sstring new_shortname);
313  private:
314  void register_stats();
315  };
316 
317  boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
318  internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data;
319  int64_t _last_vruntime = 0;
320  task_queue_list _active_task_queues;
321  task_queue_list _activating_task_queues;
322  task_queue* _at_destroy_tasks;
323  sched_clock::duration _task_quota;
324  task* _current_task = nullptr;
334  std::unique_ptr<network_stack> _network_stack;
335  lowres_clock::time_point _lowres_next_timeout = lowres_clock::time_point::max();
336  std::optional<pollable_fd> _aio_eventfd;
337  const bool _reuseport;
338  circular_buffer<double> _loads;
339  double _load = 0;
340  sched_clock::duration _total_idle{0};
341  sched_clock::duration _total_sleep;
342  sched_clock::time_point _start_time = now();
343  std::chrono::nanoseconds _max_poll_time = calculate_poll_time();
344  output_stream<char>::batch_flush_list_t _flush_batching;
345  std::atomic<bool> _sleeping alignas(seastar::cache_line_size){0};
346  pthread_t _thread_id alignas(seastar::cache_line_size) = pthread_self();
347  bool _strict_o_direct = true;
348  bool _force_io_getevents_syscall = false;
349  bool _bypass_fsync = false;
350  bool _have_aio_fsync = false;
351  bool _kernel_page_cache = false;
352  std::atomic<bool> _dying{false};
353  gate _background_gate;
354 
355 private:
356  static std::chrono::nanoseconds calculate_poll_time();
357  static void block_notifier(int);
358  bool flush_pending_aio();
359  steady_clock_type::time_point next_pending_aio() const noexcept;
360  bool reap_kernel_completions();
361  bool flush_tcp_batches();
362  void update_lowres_clocks() noexcept;
363  bool do_expire_lowres_timers() noexcept;
364  bool do_check_lowres_timers() const noexcept;
365  void expire_manual_timers() noexcept;
366  void start_aio_eventfd_loop();
367  void stop_aio_eventfd_loop();
368  template <typename T, typename E, typename EnableFunc>
369  void complete_timers(T&, E&, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn()));
370 
377  bool poll_once();
378  bool pure_poll_once();
379 public:
381  void handle_signal(int signo, noncopyable_function<void ()>&& handler);
382  void wakeup();
383 
384 private:
385  class signals {
386  public:
387  signals();
388  ~signals();
389 
390  bool poll_signal();
391  bool pure_poll_signal() const;
392  void handle_signal(int signo, noncopyable_function<void ()>&& handler);
393  void handle_signal_once(int signo, noncopyable_function<void ()>&& handler);
394  static void action(int signo, siginfo_t* siginfo, void* ignore);
395  static void failed_to_handle(int signo);
396  private:
397  struct signal_handler {
398  signal_handler(int signo, noncopyable_function<void ()>&& handler);
399  noncopyable_function<void ()> _handler;
400  };
401  std::atomic<uint64_t> _pending_signals;
402  std::unordered_map<int, signal_handler> _signal_handlers;
403  };
404 
405  signals _signals;
406  std::unique_ptr<thread_pool> _thread_pool;
407  friend class thread_pool;
408  friend class thread_context;
409  friend class internal::cpu_stall_detector;
410 
411  uint64_t pending_task_count() const;
412  void run_tasks(task_queue& tq);
413  bool have_more_tasks() const;
414  bool posix_reuseport_detect();
415  void run_some_tasks();
416  void activate(task_queue& tq);
417  void insert_active_task_queue(task_queue* tq);
418  task_queue* pop_active_task_queue(sched_clock::time_point now);
419  void insert_activating_task_queues();
420  void account_runtime(task_queue& tq, sched_clock::duration runtime);
421  void account_idle(sched_clock::duration idletime);
422  void allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key);
423  future<> rename_scheduling_group_specific_data(scheduling_group sg);
424  future<> init_scheduling_group(scheduling_group sg, sstring name, sstring shortname, float shares);
425  future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
427  uint64_t tasks_processed() const;
428  uint64_t min_vruntime() const;
429  void request_preemption();
430  void start_handling_signal();
431  void reset_preemption_monitor();
432  void service_highres_timer() noexcept;
433 
434  future<std::tuple<pollable_fd, socket_address>>
435  do_accept(pollable_fd_state& listen_fd);
436  future<> do_connect(pollable_fd_state& pfd, socket_address& sa);
437 
438  future<size_t>
439  do_read(pollable_fd_state& fd, void* buffer, size_t size);
440  future<size_t>
441  do_recvmsg(pollable_fd_state& fd, const std::vector<iovec>& iov);
442  future<temporary_buffer<char>>
443  do_read_some(pollable_fd_state& fd, internal::buffer_allocator* ba);
444 
445  future<size_t>
446  do_send(pollable_fd_state& fd, const void* buffer, size_t size);
447  future<size_t>
448  do_sendmsg(pollable_fd_state& fd, net::packet& p);
449 
450  future<temporary_buffer<char>>
451  do_recv_some(pollable_fd_state& fd, internal::buffer_allocator* ba);
452 
453  int do_run();
454 public:
455  explicit reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
456  reactor(const reactor&) = delete;
457  ~reactor();
458  void operator=(const reactor&) = delete;
459 
460  static sched_clock::time_point now() noexcept {
461  return sched_clock::now();
462  }
463  sched_clock::duration uptime() {
464  return now() - _start_time;
465  }
466 
467  io_queue& get_io_queue(dev_t devid = 0) {
468  auto queue = _io_queues.find(devid);
469  if (queue == _io_queues.end()) {
470  return *_io_queues.at(0);
471  } else {
472  return *(queue->second);
473  }
474  }
475 
476 #if SEASTAR_API_LEVEL < 7
477  [[deprecated("Use io_priority_class::register_one")]]
478  io_priority_class register_one_priority_class(sstring name, uint32_t shares);
479 
480  [[deprecated("Use io_priority_class.update_shares")]]
481  future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
482 
483  [[deprecated("Use io_priority_class.rename")]]
484  static future<> rename_priority_class(io_priority_class pc, sstring new_name) noexcept;
485 #endif
486 
488  future<> update_bandwidth_for_queues(internal::priority_class pc, uint64_t bandwidth);
490  void rename_queues(internal::priority_class pc, sstring new_name);
492  void update_shares_for_queues(internal::priority_class pc, uint32_t shares);
493 
494  void configure(const reactor_options& opts);
495 
496  server_socket listen(socket_address sa, listen_options opts = {});
497 
498  future<connected_socket> connect(socket_address sa);
499  future<connected_socket> connect(socket_address, socket_address, transport proto = transport::TCP);
500 
501  pollable_fd posix_listen(socket_address sa, listen_options opts = {});
502 
503  bool posix_reuseport_available() const { return _reuseport; }
504 
505  pollable_fd make_pollable_fd(socket_address sa, int proto);
506 
507  future<> posix_connect(pollable_fd pfd, socket_address sa, socket_address local);
508 
509  future<> send_all(pollable_fd_state& fd, const void* buffer, size_t size);
510 
511  future<file> open_file_dma(std::string_view name, open_flags flags, file_open_options options = {}) noexcept;
512  future<file> open_directory(std::string_view name) noexcept;
513  future<> make_directory(std::string_view name, file_permissions permissions = file_permissions::default_dir_permissions) noexcept;
514  future<> touch_directory(std::string_view name, file_permissions permissions = file_permissions::default_dir_permissions) noexcept;
515  future<std::optional<directory_entry_type>> file_type(std::string_view name, follow_symlink = follow_symlink::yes) noexcept;
516  future<stat_data> file_stat(std::string_view pathname, follow_symlink) noexcept;
517  future<uint64_t> file_size(std::string_view pathname) noexcept;
518  future<bool> file_accessible(std::string_view pathname, access_flags flags) noexcept;
519  future<bool> file_exists(std::string_view pathname) noexcept {
520  return file_accessible(pathname, access_flags::exists);
521  }
522  future<fs_type> file_system_at(std::string_view pathname) noexcept;
523  future<struct statvfs> statvfs(std::string_view pathname) noexcept;
524  future<> remove_file(std::string_view pathname) noexcept;
525  future<> rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept;
526  future<> link_file(std::string_view oldpath, std::string_view newpath) noexcept;
527  future<> chmod(std::string_view name, file_permissions permissions) noexcept;
528 
529  future<size_t> read_directory(int fd, char* buffer, size_t buffer_size);
530 
531  future<int> inotify_add_watch(int fd, std::string_view path, uint32_t flags);
532 
533  future<std::tuple<file_desc, file_desc>> make_pipe();
534  future<std::tuple<pid_t, file_desc, file_desc, file_desc>>
535  spawn(std::string_view pathname,
536  std::vector<sstring> argv,
537  std::vector<sstring> env = {});
538  future<int> waitpid(pid_t pid);
539  void kill(pid_t pid, int sig);
540 
541  int run() noexcept;
542  void exit(int ret);
543  future<> when_started() { return _start_promise.get_future(); }
544  // The function waits for timeout period for reactor stop notification
545  // which happens on termination signals or call for exit().
546  template <typename Rep, typename Period>
547  future<> wait_for_stop(std::chrono::duration<Rep, Period> timeout) {
548  return _stop_requested.wait(timeout, [this] { return _stopping; });
549  }
550 
551  void at_exit(noncopyable_function<future<> ()> func);
552 
553  template <typename Func>
554  void at_destroy(Func&& func) {
555  _at_destroy_tasks->_q.push_back(make_task(default_scheduling_group(), std::forward<Func>(func)));
556  }
557 
558  task* current_task() const { return _current_task; }
559  // If a task wants to resume a different task instead of returning control to the reactor,
560  // it should set _current_task to the resumed task.
561  // In particular, this is mandatory if the task is going to die before control is returned
562  // to the reactor -- otherwise _current_task will be left dangling.
563  void set_current_task(task* t) { _current_task = t; }
564 
565  void add_task(task* t) noexcept;
566  void add_urgent_task(task* t) noexcept;
567 
568  void run_in_background(future<> f);
569 
// Callable overload: invokes func through futurize_invoke() and hands the
// resulting future to the run_in_background(future<>) overload.
template <typename Func>
void run_in_background(Func&& func) {
    run_in_background(
        futurize_invoke(std::forward<Func>(func)));
}
574 
575  // Waits for all background tasks on all shards
576  static future<> drain();
577 
587  _idle_cpu_handler = std::move(handler);
588  }
589  void force_poll();
590 
591  void add_high_priority_task(task*) noexcept;
592 
593  network_stack& net() { return *_network_stack; }
594 
595  [[deprecated("Use this_shard_id")]]
596  shard_id cpu_id() const;
597 
598  void sleep();
599 
600  steady_clock_type::duration total_idle_time();
601  steady_clock_type::duration total_busy_time();
602  std::chrono::nanoseconds total_steal_time();
603 
604  const io_stats& get_io_stats() const { return _io_stats; }
611  uint64_t abandoned_failed_futures() const { return _abandoned_failed_futures; }
612 #ifdef HAVE_OSV
613  void timer_thread_func();
614  void set_timer(sched::timer &tmr, s64 t);
615 #endif
616 private:
625  void register_poller(pollfn* p);
626  void unregister_poller(pollfn* p);
627  void replace_poller(pollfn* old, pollfn* neww);
628  void register_metrics();
629  future<> send_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed);
630 
631  future<> fdatasync(int fd) noexcept;
632 
633  void add_timer(timer<steady_clock_type>*) noexcept;
634  bool queue_timer(timer<steady_clock_type>*) noexcept;
635  void del_timer(timer<steady_clock_type>*) noexcept;
636  void add_timer(timer<lowres_clock>*) noexcept;
637  bool queue_timer(timer<lowres_clock>*) noexcept;
638  void del_timer(timer<lowres_clock>*) noexcept;
639  void add_timer(timer<manual_clock>*) noexcept;
640  bool queue_timer(timer<manual_clock>*) noexcept;
641  void del_timer(timer<manual_clock>*) noexcept;
642 
643  future<> run_exit_tasks();
644  void stop();
645  friend class alien::message_queue;
646  friend class pollable_fd;
647  friend class pollable_fd_state;
648  friend class posix_file_impl;
649  friend class blockdev_file_impl;
650  friend class timer<>;
651  friend class timer<lowres_clock>;
652  friend class timer<manual_clock>;
653  friend class smp;
654  friend class smp_message_queue;
655  friend class internal::poller;
656  friend class scheduling_group;
657  friend void internal::add_to_flush_poller(output_stream<char>& os) noexcept;
658  friend void seastar::internal::increase_thrown_exceptions_counter() noexcept;
659  friend void report_failed_future(const std::exception_ptr& eptr) noexcept;
660  metrics::metric_groups _metric_groups;
661  friend future<scheduling_group> create_scheduling_group(sstring name, sstring shortname, float shares) noexcept;
663  friend future<> seastar::rename_scheduling_group(scheduling_group sg, sstring new_name, sstring new_shortname) noexcept;
665 
666  future<struct statfs> fstatfs(int fd) noexcept;
667  friend future<shared_ptr<file_impl>> make_file_impl(int fd, file_open_options options, int flags, struct stat st) noexcept;
668 public:
669  future<> readable(pollable_fd_state& fd);
670  future<> writeable(pollable_fd_state& fd);
671  future<> readable_or_writeable(pollable_fd_state& fd);
672  future<> poll_rdhup(pollable_fd_state& fd);
673  void enable_timer(steady_clock_type::time_point when) noexcept;
683  void set_strict_dma(bool value);
684  void set_bypass_fsync(bool value);
685  void update_blocked_reactor_notify_ms(std::chrono::milliseconds ms);
686  std::chrono::milliseconds get_blocked_reactor_notify_ms() const;
687 
688  class test {
689  public:
690  static void with_allow_abandoned_failed_futures(unsigned count, noncopyable_function<void ()> func);
691 
695  static void set_stall_detector_report_function(std::function<void ()> report);
696  static std::function<void ()> get_stall_detector_report_function();
697  };
698 };
699 
700 extern __thread reactor* local_engine;
701 extern __thread size_t task_quota;
702 
703 SEASTAR_MODULE_EXPORT
704 inline reactor& engine() {
705  return *local_engine;
706 }
707 
708 SEASTAR_MODULE_EXPORT
709 inline bool engine_is_ready() {
710  return local_engine != nullptr;
711 }
712 
// Signal number used for the high-resolution timer.
inline int hrtimer_signal() {
    // SIGALRM is deliberately avoided: the boost unit test library also
    // plays with it, so the first real-time signal is used instead.
    const int signo = SIGRTMIN;
    return signo;
}
718 
719 
720 extern logger seastar_logger;
721 
722 }
Definition: alien.hh:120
Definition: alien.hh:51
Definition: circular_buffer_fixed_capacity.hh:52
Conditional variable.
Definition: condition-variable.hh:74
future wait() noexcept
Definition: condition-variable.hh:209
Definition: posix.hh:85
Definition: file.hh:118
Definition: reactor.hh:159
Definition: io_queue.hh:95
Definition: io_desc.hh:29
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:59
Definition: manual_clock.hh:35
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: api.hh:427
Definition: pollable_fd.hh:63
Definition: pollable_fd.hh:137
Definition: reactor.hh:688
static void set_stall_detector_report_function(std::function< void()> report)
Definition: reactor.hh:168
sched_stats get_sched_stats() const
void set_idle_cpu_handler(idle_cpu_handler &&handler)
Definition: reactor.hh:586
alien::instance & alien()
Definition: reactor.hh:229
Definition: reactor.hh:204
Scheduling statistics.
Definition: reactor.hh:219
Definition: scheduling.hh:183
Identifies function calls that are accounted as a group.
Definition: scheduling.hh:286
Definition: shared_ptr.hh:515
Definition: smp.hh:186
Definition: smp.hh:314
Definition: socket_defs.hh:47
Definition: task.hh:35
Definition: temporary_buffer.hh:67
Definition: timer.hh:84
future< stat_data > file_stat(std::string_view name, follow_symlink fs=follow_symlink::yes) noexcept
future touch_directory(std::string_view name, file_permissions permissions=file_permissions::default_dir_permissions) noexcept
future remove_file(std::string_view name) noexcept
future< std::optional< directory_entry_type > > file_type(std::string_view name, follow_symlink follow=follow_symlink::yes) noexcept
future< fs_type > file_system_at(std::string_view name) noexcept
future rename_file(std::string_view old_name, std::string_view new_name) noexcept
future< bool > file_exists(std::string_view name) noexcept
future chmod(std::string_view name, file_permissions permissions) noexcept
future< uint64_t > file_size(std::string_view name) noexcept
future link_file(std::string_view oldpath, std::string_view newpath) noexcept
future< bool > file_accessible(std::string_view name, access_flags flags) noexcept
open_flags
Definition: file-types.hh:41
future make_directory(std::string_view name, file_permissions permissions=file_permissions::default_dir_permissions) noexcept
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
future now()
Returns a ready future.
Definition: later.hh:35
future< std::tuple< file_desc, file_desc > > make_pipe()
server_socket listen(socket_address sa)
future< connected_socket > connect(socket_address sa)
holds the metric_groups definition needed by class that reports metrics
future configure(const options &opts)
set the metrics configuration
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
const noncopyable_function< bool()> & work_waiting_on_reactor
Definition: idle_cpu_handler.hh:46
future< scheduling_group > create_scheduling_group(sstring name, float shares) noexcept
future sleep(std::chrono::duration< Rep, Period > dur)
Definition: sleep.hh:49
future< scheduling_group_key > scheduling_group_key_create(scheduling_group_key_config cfg) noexcept
future destroy_scheduling_group(scheduling_group sg) noexcept
future rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept
idle_cpu_handler_result
Definition: idle_cpu_handler.hh:37
@ no_more_work
The user callback has no more work to perform.
noncopyable_function< idle_cpu_handler_result(work_waiting_on_reactor poll)> idle_cpu_handler
Definition: idle_cpu_handler.hh:52
Definition: noncopyable_function.hh:37
Definition: file.hh:91
Definition: poll.hh:26
Configuration for the reactor.
Definition: reactor_config.hh:45
Definition: scheduling.hh:143