Seastar
High performance C++ framework for concurrent servers
alien.hh
Go to the documentation of this file.
1// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2/*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19/*
20 * Copyright (C) 2018 Red Hat
21 */
22
23#pragma once
24
25#ifndef SEASTAR_MODULE
26#include <atomic>
27#include <concepts>
28#include <future>
29#include <memory>
30#include <type_traits>
31#include <boost/lockfree/queue.hpp>
32#endif
33
34#include <seastar/core/future.hh>
35#include <seastar/core/cacheline.hh>
36#include <seastar/core/sstring.hh>
38#include <seastar/util/modules.hh>
39
41
42namespace seastar {
43
44SEASTAR_MODULE_EXPORT
45class reactor;
46
48namespace alien {
49
51 static constexpr size_t batch_size = 128;
52 static constexpr size_t prefetch_cnt = 2;
53 struct work_item;
54 struct lf_queue_remote {
55 reactor* remote;
56 };
57 using lf_queue_base = boost::lockfree::queue<work_item*>;
58 // use inheritence to control placement order
59 struct lf_queue : lf_queue_remote, lf_queue_base {
60 lf_queue(reactor* remote)
61 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
62 void maybe_wakeup();
63 } _pending;
64 struct alignas(seastar::cache_line_size) {
65 std::atomic<size_t> value{0};
66 } _sent;
67 // keep this between two structures with statistics
68 // this makes sure that they have at least one cache line
69 // between them, so hw prefetcher will not accidentally prefetch
70 // cache line used by another cpu.
72 struct alignas(seastar::cache_line_size) {
73 size_t _received = 0;
74 size_t _last_rcv_batch = 0;
75 };
76 struct work_item {
77 virtual ~work_item() = default;
78 virtual void process() = 0;
79 };
80 template <typename Func>
81 struct async_work_item : work_item {
82 Func _func;
83 async_work_item(Func&& func) : _func(std::move(func)) {}
84 void process() override {
85 _func();
86 }
87 };
88 template<typename Func>
89 size_t process_queue(lf_queue& q, Func process);
90 void submit_item(std::unique_ptr<work_item> wi);
91public:
93 void start();
94 void stop();
95 template <typename Func>
96 void submit(Func&& func) {
97 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
98 submit_item(std::move(wi));
99 }
100 size_t process_incoming();
101 bool pure_poll_rx() const;
102};
103
104namespace internal {
105
107 unsigned count;
108 qs_deleter(unsigned n = 0) : count(n) {}
109 void operator()(message_queue* qs) const;
110};
111
112}
113
118SEASTAR_MODULE_EXPORT
119class instance {
120 using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
121public:
122 static qs create_qs(const std::vector<reactor*>& reactors);
123 qs _qs;
124 bool poll_queues();
125 bool pure_poll_queues();
126};
127
128namespace internal {
129
130extern instance* default_instance;
131
132}
133
146SEASTAR_MODULE_EXPORT
147template <typename Func>
148requires std::is_nothrow_invocable_r_v<void, Func>
149void run_on(instance& instance, unsigned shard, Func func) {
150 instance._qs[shard].submit(std::move(func));
151}
152
164template <typename Func>
165[[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
166void run_on(unsigned shard, Func func) {
167 run_on(*internal::default_instance, shard, std::move(func));
168}
169
170namespace internal {
171template<typename Func>
172using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
173
174template<typename Func,
175 bool = std::is_empty_v<return_value_t<Func>>>
177 using type = void;
178 static void set(std::promise<void>& p, return_value_t<Func>&&) {
179 p.set_value();
180 }
181};
182template<typename Func>
183struct return_type_of<Func, false> {
184 using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
185 using type = std::tuple_element_t<0, return_tuple_t>;
186 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
187 p.set_value(std::move(t));
188 }
189};
190template <typename Func> using return_type_t = typename return_type_of<Func>::type;
191}
192
202SEASTAR_MODULE_EXPORT
203template<std::invocable Func, typename T = internal::return_type_t<Func>>
204std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
205 std::promise<T> pr;
206 auto fut = pr.get_future();
207 run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable noexcept {
208 // std::future returned via std::promise above.
209 (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
210 try {
211 internal::return_type_of<Func>::set(pr, result.get());
212 } catch (...) {
213 pr.set_exception(std::current_exception());
214 }
215 });
216 });
217 return fut;
218}
219
228template<typename Func, typename T = internal::return_type_t<Func>>
229[[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
230std::future<T> submit_to(unsigned shard, Func func) {
231 return submit_to(*internal::default_instance, shard, std::move(func));
232}
233
234}
235}
Definition: alien.hh:119
Definition: alien.hh:50
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: reactor.hh:146
holds the metric_groups definition needed by class that reports metrics
void run_on(instance &instance, unsigned shard, Func func)
Definition: alien.hh:149
std::future< T > submit_to(instance &instance, unsigned shard, Func func)
Definition: alien.hh:204
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: alien.hh:106
Converts a type to a future type, if it isn't already.
Definition: future.hh:1853