25 #ifndef SEASTAR_MODULE
30 #include <type_traits>
31 #include <boost/lockfree/queue.hpp>
34 #include <seastar/core/future.hh>
35 #include <seastar/core/cacheline.hh>
36 #include <seastar/core/sstring.hh>
38 #include <seastar/util/concepts.hh>
39 #include <seastar/util/modules.hh>
// Size constant handed to the lf_queue_base (boost::lockfree::queue)
// constructor in lf_queue's member-initializer list.
// NOTE(review): the name suggests it may also bound how many items are
// handled per batch — confirm against the implementation.
52 static constexpr
size_t batch_size = 128;
// NOTE(review): no use of this constant is visible in this excerpt. Judging
// by the name it presumably controls how many queued items ahead are
// prefetched while a queue is drained — confirm in the implementation.
53 static constexpr
size_t prefetch_cnt = 2;
55 struct lf_queue_remote {
// Lock-free queue type whose elements are raw work_item pointers; lf_queue
// derives from it.
58 using lf_queue_base = boost::lockfree::queue<work_item*>;
60 struct lf_queue : lf_queue_remote, lf_queue_base {
62 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
65 struct alignas(seastar::cache_line_size) {
66 std::atomic<size_t> value{0};
73 struct alignas(seastar::cache_line_size) {
75 size_t _last_rcv_batch = 0;
78 virtual ~work_item() =
default;
79 virtual void process() = 0;
81 template <
typename Func>
82 struct async_work_item : work_item {
84 async_work_item(Func&& func) : _func(std::move(func)) {}
85 void process()
override {
// Declaration only; the definition is not part of this excerpt.
// Takes the queue `q` by reference and a callable `process` by value.
// NOTE(review): presumably drains `q`, applies `process` to each item and
// returns the number of items handled — confirm against the definition.
89 template<
typename Func>
90 size_t process_queue(lf_queue& q, Func process);
// Accepts a work item by std::unique_ptr, i.e. the caller hands over
// ownership. Declaration only; the definition is not part of this excerpt.
91 void submit_item(std::unique_ptr<work_item> wi);
// Wraps `func` in a heap-allocated async_work_item<Func> and passes the
// resulting std::unique_ptr on to submit_item().
// NOTE(review): `Func` is deduced from a forwarding reference, so calling
// this with an lvalue would instantiate async_work_item with an
// lvalue-reference type. The caller visible in this excerpt passes
// std::move(func), which deduces a non-reference `Func`.
96 template <
typename Func>
97 void submit(Func&& func) {
98 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
99 submit_item(std::move(wi));
// Declaration only; the definition is not part of this excerpt.
// NOTE(review): presumably processes the items waiting in this queue and
// returns how many were handled — confirm against the definition.
101 size_t process_incoming();
// Const query; declaration only, the definition is not part of this excerpt.
// NOTE(review): presumably reports whether incoming work is pending without
// consuming it — confirm against the definition.
102 bool pure_poll_rx()
const;
119 SEASTAR_MODULE_EXPORT
// Owning handle to an array of message_queue objects; the array is released
// through internal::qs_deleter rather than the default deleter.
121 using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
// Static factory returning a `qs` built from the given reactor pointers.
// Declaration only; the definition is not part of this excerpt.
// NOTE(review): presumably allocates one message_queue per entry of
// `reactors` — confirm against the definition.
123 static qs create_qs(
const std::vector<reactor*>& reactors);
// Declaration only; the definition is not part of this excerpt.
// NOTE(review): presumably reports whether any of the queues has pending
// work, without processing it — confirm against the definition.
126 bool pure_poll_queues();
147 SEASTAR_MODULE_EXPORT
148 template <
typename Func>
149 SEASTAR_CONCEPT(requires std::is_nothrow_invocable_r_v<void, Func>)
151 instance._qs[shard].submit(std::move(func));
165 template <
typename Func>
166 [[deprecated(
"Use run_on(instance&, unsigned shard, Func) instead")]]
168 run_on(*internal::default_instance, shard, std::move(func));
172 template<
typename Func>
175 template<
typename Func,
176 bool = std::is_empty_v<return_value_t<Func>>>
179 static void set(std::promise<void>& p, return_value_t<Func>&&) {
183 template<
typename Func>
186 using type = std::tuple_element_t<0, return_tuple_t>;
187 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
188 p.set_value(std::move(t));
// Convenience alias for the nested `type` member of return_type_of<Func>.
191 template <
typename Func>
using return_type_t =
typename return_type_of<Func>::type;
203 SEASTAR_MODULE_EXPORT
204 template<
typename Func,
typename T =
internal::return_type_t<Func>>
205 SEASTAR_CONCEPT(requires std::invocable<Func>)
208 auto fut = pr.get_future();
209 run_on(
instance, shard, [pr = std::move(pr), func = std::move(func)] ()
mutable noexcept {
211 (void)func().then_wrapped([pr = std::move(pr)] (
auto&& result)
mutable {
215 pr.set_exception(std::current_exception());
230 template<
typename Func,
typename T =
internal::return_type_t<Func>>
231 [[deprecated(
"Use submit_to(instance&, unsigned shard, Func) instead.")]]
233 return submit_to(*internal::default_instance, shard, std::move(func));
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Holds the metric definition.
Definition: metrics_registration.hh:94
Definition: reactor.hh:168
Holds the metric_groups definition needed by a class that reports metrics.
void run_on(instance &instance, unsigned shard, Func func)
Definition: alien.hh:150
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:206
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Converts a type to a future type, if it isn't already.
Definition: future.hh:1843