30 #include <boost/lockfree/queue.hpp>
32 #include <seastar/core/future.hh>
33 #include <seastar/core/cacheline.hh>
34 #include <seastar/core/sstring.hh>
47 static constexpr
size_t batch_size = 128;
48 static constexpr
size_t prefetch_cnt = 2;
50 struct lf_queue_remote {
53 using lf_queue_base = boost::lockfree::queue<work_item*>;
55 struct lf_queue : lf_queue_remote, lf_queue_base {
57 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
60 struct alignas(seastar::cache_line_size) {
61 std::atomic<size_t> value{0};
68 struct alignas(seastar::cache_line_size) {
70 size_t _last_rcv_batch = 0;
73 virtual ~work_item() =
default;
74 virtual void process() = 0;
76 template <
typename Func>
77 struct async_work_item : work_item {
79 async_work_item(Func&& func) : _func(std::move(func)) {}
80 void process()
override {
84 template<
typename Func>
85 size_t process_queue(lf_queue& q, Func process);
86 void submit_item(std::unique_ptr<work_item> wi);
91 template <
typename Func>
92 void submit(Func&& func) {
93 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
94 submit_item(std::move(wi));
96 size_t process_incoming();
97 bool pure_poll_rx()
const;
103 qs_deleter(
unsigned n = 0) : count(n) {}
104 qs_deleter(
const qs_deleter& d) : count(d.count) {}
107 using qs = std::unique_ptr<message_queue[], qs_deleter>;
109 static qs create_qs(
const std::vector<reactor*>& reactors);
111 static bool poll_queues();
112 static bool pure_poll_queues();
126 template <
typename Func>
128 smp::_qs[shard].submit(std::move(func));
132 template<
typename Func>
135 template<
typename Func,
136 bool = std::is_empty_v<return_value_t<Func>>>
139 static void set(std::promise<void>& p, return_value_t<Func>&&) {
143 template<
typename Func>
146 using type = std::tuple_element_t<0, return_tuple_t>;
147 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
148 #if SEASTAR_API_LEVEL < 5
149 p.set_value(std::get<0>(std::move(t)));
151 p.set_value(std::move(t));
155 template <
typename Func>
using return_type_t =
typename return_type_of<Func>::type;
166 template<
typename Func,
typename T =
internal::return_type_t<Func>>
169 auto fut = pr.get_future();
170 run_on(shard, [pr = std::move(pr), func = std::move(func)] ()
mutable {
172 (void)func().then_wrapped([pr = std::move(pr)] (
auto&& result)
mutable {
176 pr.set_exception(std::current_exception());