Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
smp.hh
Go to the documentation of this file.
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/loop.hh>
26#include <seastar/core/semaphore.hh>
28#include <seastar/core/posix.hh>
29#include <seastar/core/reactor_config.hh>
30#include <seastar/core/resource.hh>
32#include <seastar/util/modules.hh>
33
34#ifndef SEASTAR_MODULE
35#include <boost/lockfree/spsc_queue.hpp>
36#include <boost/thread/barrier.hpp>
37#include <deque>
38#include <optional>
39#include <thread>
40#include <ranges>
41#endif
42
44
45namespace seastar {
46
47class reactor_backend_selector;
48
49SEASTAR_MODULE_EXPORT_BEGIN
50
51class smp_service_group;
52
53namespace alien {
54
55class instance;
56
57}
58SEASTAR_MODULE_EXPORT_END
59
60namespace internal {
61
62unsigned smp_service_group_id(smp_service_group ssg) noexcept;
63
64class memory_prefaulter;
65
66}
67
68namespace memory::internal {
69
70struct numa_layout;
71
72}
73
74SEASTAR_MODULE_EXPORT_BEGIN
75
88 std::optional<sstring> group_name;
89};
90
112 unsigned _id;
113#ifdef SEASTAR_DEBUG
114 unsigned _version = 0;
115#endif
116private:
117 explicit smp_service_group(unsigned id) noexcept : _id(id) {}
118
119 friend unsigned internal::smp_service_group_id(smp_service_group ssg) noexcept;
123};
124
125SEASTAR_MODULE_EXPORT_END
126
127inline
128unsigned
129internal::smp_service_group_id(smp_service_group ssg) noexcept {
130 return ssg._id;
131}
132
133SEASTAR_MODULE_EXPORT_BEGIN
139smp_service_group default_smp_service_group() noexcept;
140
146
152
153inline
155 return smp_service_group(0);
156}
157
161
162SEASTAR_MODULE_EXPORT_END
163
164static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();
165
166SEASTAR_MODULE_EXPORT_BEGIN
174 smp_timeout_clock::time_point timeout = smp_no_timeout;
175
176 smp_submit_to_options(smp_service_group service_group = default_smp_service_group(), smp_timeout_clock::time_point timeout = smp_no_timeout) noexcept
178 , timeout(timeout) {
179 }
180};
181
182void init_default_smp_service_group(shard_id cpu);
183
184smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept;
185
187 static constexpr size_t queue_length = 128;
188 static constexpr size_t batch_size = 16;
189 static constexpr size_t prefetch_cnt = 2;
190 struct work_item;
191 struct lf_queue_remote {
192 reactor* remote;
193 };
194 using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
195 boost::lockfree::capacity<queue_length>>;
196 // use inheritence to control placement order
197 struct lf_queue : lf_queue_remote, lf_queue_base {
198 lf_queue(reactor* remote) : lf_queue_remote{remote} {}
199 void maybe_wakeup();
200 ~lf_queue();
201 };
202 lf_queue _pending;
203 lf_queue _completed;
204 struct alignas(seastar::cache_line_size) {
205 size_t _sent = 0;
206 size_t _compl = 0;
207 size_t _last_snt_batch = 0;
208 size_t _last_cmpl_batch = 0;
209 size_t _current_queue_length = 0;
210 };
211 // keep this between two structures with statistics
212 // this makes sure that they have at least one cache line
213 // between them, so hw prefetcher will not accidentally prefetch
214 // cache line used by another cpu.
215 metrics::metric_groups _metrics;
216 struct alignas(seastar::cache_line_size) {
217 size_t _received = 0;
218 size_t _last_rcv_batch = 0;
219 };
220 struct work_item : public task {
221 explicit work_item(smp_service_group ssg) : task(current_scheduling_group()), ssg(ssg) {}
223 virtual ~work_item() {}
224 virtual void fail_with(std::exception_ptr) = 0;
225 void process();
226 virtual void complete() = 0;
227 };
228 template <typename Func>
229 struct async_work_item : work_item {
230 smp_message_queue& _queue;
231 Func _func;
232 using futurator = futurize<std::invoke_result_t<Func>>;
233 using future_type = typename futurator::type;
234 using value_type = typename future_type::value_type;
235 std::optional<value_type> _result;
236 std::exception_ptr _ex; // if !_result
237 typename futurator::promise_type _promise; // used on local side
238 async_work_item(smp_message_queue& queue, smp_service_group ssg, Func&& func) : work_item(ssg), _queue(queue), _func(std::move(func)) {}
239 virtual void fail_with(std::exception_ptr ex) override {
240 _promise.set_exception(std::move(ex));
241 }
242 virtual task* waiting_task() noexcept override {
243 // FIXME: waiting_tasking across shards is not implemented. Unsynchronized task access is unsafe.
244 return nullptr;
245 }
246 virtual void run_and_dispose() noexcept override {
247 // _queue.respond() below forwards the continuation chain back to the
248 // calling shard.
249 (void)futurator::invoke(this->_func).then_wrapped([this] (auto f) {
250 if (f.failed()) {
251 _ex = f.get_exception();
252 } else {
253 _result = f.get();
254 }
255 _queue.respond(this);
256 });
257 // We don't delete the task here as the creator of the work item will
258 // delete it on the origin shard.
259 }
260 virtual void complete() override {
261 if (_result) {
262 _promise.set_value(std::move(*_result));
263 } else {
264 // FIXME: _ex was allocated on another cpu
265 _promise.set_exception(std::move(_ex));
266 }
267 }
268 future_type get_future() { return _promise.get_future(); }
269 };
270 union tx_side {
271 tx_side() {}
272 ~tx_side() {}
273 void init() { new (&a) aa; }
274 struct aa {
275 std::deque<work_item*> pending_fifo;
276 } a;
277 } _tx;
278 std::vector<work_item*> _completed_fifo;
279public:
282 template <typename Func>
283 futurize_t<std::invoke_result_t<Func>> submit(shard_id t, smp_submit_to_options options, Func&& func) noexcept {
285 auto wi = std::make_unique<async_work_item<Func>>(*this, options.service_group, std::forward<Func>(func));
286 auto fut = wi->get_future();
287 submit_item(t, options.timeout, std::move(wi));
288 return fut;
289 }
290 void start(unsigned cpuid);
291 template<size_t PrefetchCnt, typename Func>
292 size_t process_queue(lf_queue& q, Func process);
293 size_t process_incoming();
294 size_t process_completions(shard_id t);
295 void stop();
296private:
297 void work();
298 void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
299 void respond(work_item* wi);
300 void move_pending();
301 void flush_request_batch();
302 void flush_response_batch();
303 bool has_unflushed_responses() const;
304 bool pure_poll_rx() const;
305 bool pure_poll_tx() const;
306
307 friend class smp;
308};
309
310class smp_message_queue;
311struct reactor_options;
312struct smp_options;
313
314class smp : public std::enable_shared_from_this<smp> {
315 alien::instance& _alien;
316 std::vector<posix_thread> _threads;
317 std::vector<std::function<void ()>> _thread_loops; // for dpdk
318 std::optional<boost::barrier> _all_event_loops_done;
319 std::unique_ptr<internal::memory_prefaulter> _prefaulter;
320 struct qs_deleter {
321 void operator()(smp_message_queue** qs) const;
322 };
323 std::unique_ptr<smp_message_queue*[], qs_deleter> _qs_owner;
324 static thread_local smp_message_queue**_qs;
325 static thread_local std::thread::id _tmain;
326 bool _using_dpdk = false;
327
328private:
329 void setup_prefaulter(const seastar::resource::resources& res, seastar::memory::internal::numa_layout layout);
330public:
331 explicit smp(alien::instance& alien);
332 ~smp();
333 void configure(const smp_options& smp_opts, const reactor_options& reactor_opts);
334 void cleanup() noexcept;
335 void cleanup_cpu();
336 void arrive_at_event_loop_end();
337 void join_all();
338 static bool main_thread() { return std::this_thread::get_id() == _tmain; }
339
353 template <typename Func>
354 static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, smp_submit_to_options options, Func&& func) noexcept {
355 using ret_type = std::invoke_result_t<Func>;
356 if (t == this_shard_id()) {
357 try {
359 // Non-deferring function, so don't worry about func lifetime
360 return futurize<ret_type>::invoke(std::forward<Func>(func));
361 } else if (std::is_lvalue_reference_v<Func>) {
362 // func is an lvalue, so caller worries about its lifetime
363 return futurize<ret_type>::invoke(func);
364 } else {
365 // Deferring call on rvalue function, make sure to preserve it across call
366 auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
367 auto ret = futurize<ret_type>::invoke(*w);
368 return ret.finally([w = std::move(w)] {});
369 }
370 } catch (...) {
371 // Consistently return a failed future rather than throwing, to simplify callers
372 return futurize<std::invoke_result_t<Func>>::make_exception_future(std::current_exception());
373 }
374 } else {
375 return _qs[t][this_shard_id()].submit(t, options, std::forward<Func>(func));
376 }
377 }
392 template <typename Func>
393 static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, Func&& func) noexcept {
394 return submit_to(t, default_smp_service_group(), std::forward<Func>(func));
395 }
396 static bool poll_queues();
397 static bool pure_poll_queues();
398 static std::ranges::range auto all_cpus() noexcept {
399 return std::views::iota(0u, count);
400 }
409 template<typename Func>
410 requires std::is_nothrow_move_constructible_v<Func>
411 static future<> invoke_on_all(smp_submit_to_options options, Func&& func) noexcept {
412 static_assert(std::is_same_v<future<>, typename futurize<std::invoke_result_t<Func>>::type>, "bad Func signature");
413 static_assert(std::is_nothrow_move_constructible_v<Func>);
414 return parallel_for_each(all_cpus(), [options, &func] (unsigned id) {
415 return smp::submit_to(id, options, Func(func));
416 });
417 }
427 template<typename Func>
428 static future<> invoke_on_all(Func&& func) noexcept {
429 return invoke_on_all(smp_submit_to_options{}, std::forward<Func>(func));
430 }
440 template<typename Func>
441 requires std::is_nothrow_move_constructible_v<Func> &&
442 std::is_nothrow_copy_constructible_v<Func>
443 static future<> invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept {
444 static_assert(std::is_same_v<future<>, typename futurize<std::invoke_result_t<Func>>::type>, "bad Func signature");
445 static_assert(std::is_nothrow_move_constructible_v<Func>);
446 return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (unsigned id) {
447 return id != cpu_id ? smp::submit_to(id, options, Func(func)) : make_ready_future<>();
448 });
449 }
460 template<typename Func>
461 requires std::is_nothrow_move_constructible_v<Func>
462 static future<> invoke_on_others(unsigned cpu_id, Func func) noexcept {
463 return invoke_on_others(cpu_id, smp_submit_to_options{}, std::move(func));
464 }
471 template<typename Func>
472 requires std::is_nothrow_move_constructible_v<Func>
473 static future<> invoke_on_others(Func func) noexcept {
474 return invoke_on_others(this_shard_id(), std::move(func));
475 }
476private:
477 void start_all_queues();
478 void pin(unsigned cpu_id);
479 void allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
480 void create_thread(std::function<void ()> thread_loop);
481 unsigned adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs, unsigned reserve_iocbs);
482 static void log_aiocbs(log_level level, unsigned storage, unsigned preempt, unsigned network);
483public:
484 static unsigned count;
485};
486
487SEASTAR_MODULE_EXPORT_END
488
489}
Definition: alien.hh:119
Counted resource guard.
Definition: semaphore.hh:154
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
futurize_t< FuncResult > then_wrapped(Func &&func) &noexcept
Schedule a block of code to run when the future is ready, allowing for exception handling.
Definition: future.hh:1525
Low-resolution and efficient steady clock.
Definition: lowres_clock.hh:56
holds the metric definition.
Definition: metrics_registration.hh:94
Definition: queue.hh:44
Definition: reactor.hh:146
Definition: semaphore.hh:511
Definition: smp.hh:186
Definition: smp.hh:111
friend future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
friend smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:154
friend future destroy_smp_service_group(smp_service_group) noexcept
Definition: smp.hh:314
static future invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept
Definition: smp.hh:443
static future invoke_on_others(Func func) noexcept
Definition: smp.hh:473
static future invoke_on_others(unsigned cpu_id, Func func) noexcept
Definition: smp.hh:462
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:354
static future invoke_on_all(Func &&func) noexcept
Definition: smp.hh:428
static future invoke_on_all(smp_submit_to_options options, Func &&func) noexcept
Definition: smp.hh:411
static futurize_t< std::invoke_result_t< Func > > submit_to(unsigned t, Func &&func) noexcept
Definition: smp.hh:393
Definition: task.hh:34
future< T > make_exception_future(std::exception_ptr &&value) noexcept
Creates a future in an available, failed state.
Definition: future.hh:1949
future parallel_for_each(Iterator begin, Sentinel end, Func &&func) noexcept
Run tasks in parallel (iterator version).
Definition: loop.hh:568
log_level
log level used with
Definition: log.hh:55
holds the metric_groups definition needed by class that reports metrics
Definition: critical_alloc_section.hh:80
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
std::optional< sstring > group_name
Definition: smp.hh:88
scheduling_group current_scheduling_group() noexcept
Returns the current scheduling group.
Definition: scheduling.hh:397
shard_id this_shard_id() noexcept
Returns shard_id of the of the current shard.
Definition: shard_id.hh:52
unsigned max_nonlocal_requests
Definition: smp.hh:83
smp_service_group default_smp_service_group() noexcept
Definition: smp.hh:154
future destroy_smp_service_group(smp_service_group ssg) noexcept
future< smp_service_group > create_smp_service_group(smp_service_group_config ssgc) noexcept
Converts a type to a future type, if it isn't already.
Definition: future.hh:1853
static type invoke(Func &&func, FuncArgs &&... args) noexcept
Check whether a type is a future.
Definition: future.hh:1032
Configuration for the reactor.
Definition: reactor_config.hh:53
Definition: resource.hh:129
Configuration for the multicore aspect of seastar.
Definition: smp_options.hh:47
Options controlling the behaviour of smp::submit_to().
Definition: smp.hh:168
smp_service_group service_group
Controls resource allocation.
Definition: smp.hh:170
smp_timeout_clock::time_point timeout
Definition: smp.hh:174