31#include <boost/lockfree/queue.hpp>
34#include <seastar/core/future.hh>
35#include <seastar/core/cacheline.hh>
36#include <seastar/core/sstring.hh>
38#include <seastar/util/modules.hh>
// Passed by lf_queue to the lf_queue_base (boost::lockfree::queue) constructor.
51 static constexpr size_t batch_size = 128;
// NOTE(review): no use of prefetch_cnt is visible in this excerpt; presumably a
// prefetch depth applied while draining a batch -- confirm against the definition.
52 static constexpr size_t prefetch_cnt = 2;
54 struct lf_queue_remote {
// Underlying container type: a Boost.Lockfree queue whose elements are raw work_item pointers.
57 using lf_queue_base = boost::lockfree::queue<work_item*>;
59 struct lf_queue : lf_queue_remote, lf_queue_base {
61 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
64 struct alignas(seastar::cache_line_size) {
65 std::atomic<size_t> value{0};
72 struct alignas(seastar::cache_line_size) {
74 size_t _last_rcv_batch = 0;
// Virtual destructor, so a derived work item can be destroyed through a work_item pointer.
77 virtual ~work_item() =
default;
// Pure virtual hook: each concrete work item (e.g. async_work_item) supplies the action to run.
78 virtual void process() = 0;
80 template <
typename Func>
81 struct async_work_item : work_item {
83 async_work_item(Func&& func) : _func(std::move(func)) {}
84 void process()
override {
// Declaration only; the definition is not part of this excerpt.
// Takes a queue by reference plus a callable `process`, and returns a size_t.
// NOTE(review): presumably drains `q`, applies `process` to each item and returns
// the number of items handled -- confirm against the definition.
88 template<
typename Func>
89 size_t process_queue(lf_queue& q, Func process);
// Takes ownership of the work item (unique_ptr passed by value).
// submit() calls this with the async_work_item it builds around the user's callable.
90 void submit_item(std::unique_ptr<work_item> wi);
95 template <
typename Func>
96 void submit(Func&& func) {
97 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
98 submit_item(std::move(wi));
// Declaration only; returns a size_t.
// NOTE(review): presumably the number of incoming work items processed -- confirm against the definition.
100 size_t process_incoming();
// Const query returning a bool; does not modify the queue object's observable state.
// NOTE(review): presumably reports whether received work is pending without consuming it -- confirm.
101 bool pure_poll_rx()
const;
// Owning handle to an array of message_queue objects, released through internal::qs_deleter.
120 using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
// Static factory returning a `qs` (owning message_queue array) built from the given reactors.
// NOTE(review): presumably allocates one message_queue per reactor -- confirm against the definition.
122 static qs create_qs(
const std::vector<reactor*>& reactors);
// Declaration only; returns a bool.
// NOTE(review): presumably checks the owned message queues for pending work -- confirm against the definition.
125 bool pure_poll_queues();
147template <
typename Func>
148requires std::is_nothrow_invocable_r_v<void, Func>
150 instance._qs[shard].submit(std::move(func));
164template <
typename Func>
165[[deprecated(
"Use run_on(instance&, unsigned shard, Func) instead")]]
167 run_on(*internal::default_instance, shard, std::move(func));
171template<
typename Func>
174template<
typename Func,
175 bool = std::is_empty_v<return_value_t<Func>>>
178 static void set(std::promise<void>& p, return_value_t<Func>&&) {
182template<
typename Func>
185 using type = std::tuple_element_t<0, return_tuple_t>;
186 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
187 p.set_value(std::move(t));
// Convenience alias for the nested `type` of return_type_of<Func>.
// Used as the default for the `T` template parameter of the submit_to() overloads.
190template <
typename Func>
using return_type_t =
typename return_type_of<Func>::type;
203template<std::invocable Func,
typename T =
internal::return_type_t<Func>>
206 auto fut = pr.get_future();
207 run_on(
instance, shard, [pr = std::move(pr), func = std::move(func)] ()
mutable noexcept {
209 (void)func().then_wrapped([pr = std::move(pr)] (
auto&& result)
mutable {
213 pr.set_exception(std::current_exception());
228template<
typename Func,
typename T =
internal::return_type_t<Func>>
229[[deprecated(
"Use submit_to(instance&, unsigned shard, Func) instead.")]]
231 return submit_to(*internal::default_instance, shard, std::move(func));
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: reactor.hh:147
holds the metric_groups definition needed by a class that reports metrics
void run_on(instance &instance, unsigned shard, Func func)
Definition: alien.hh:149
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Converts a type to a future type, if it isn't already.
Definition: future.hh:1832