Seastar
High performance C++ framework for concurrent servers
posix.hh
Go to the documentation of this file.
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 ScyllaDB
20 */
21
22#pragma once
23
24#ifndef SEASTAR_MODULE
25#include <sys/epoll.h>
26#include <sys/eventfd.h>
27#include <sys/ioctl.h>
28#include <sys/mman.h>
29#include <sys/socket.h>
30#include <sys/stat.h>
31#include <sys/timerfd.h>
32#include <sys/types.h>
33#include <sys/uio.h>
34#include <assert.h>
35#include <fcntl.h>
36#include <pthread.h>
37#include <signal.h>
38#include <spawn.h>
39#include <unistd.h>
40#include <utility>
41#include <system_error>
42#include <chrono>
43#include <cstring>
44#include <functional>
45#include <memory>
46#include <set>
47#include <optional>
48#endif
49#include "abort_on_ebadf.hh"
50#include <seastar/core/sstring.hh>
51#include <seastar/net/socket_defs.hh>
52#include <seastar/util/std-compat.hh>
53#include <seastar/util/modules.hh>
54
55namespace seastar {
56
57SEASTAR_MODULE_EXPORT_BEGIN
58
67
68inline void throw_system_error_on(bool condition, const char* what_arg = "");
69
70template <typename T>
71inline void throw_kernel_error(T r);
72
73template <typename T>
74inline void throw_pthread_error(T r);
75
77 size_t _size;
78 void operator()(void* ptr) const;
79};
80
81using mmap_area = std::unique_ptr<char[], mmap_deleter>;
82
83mmap_area mmap_anonymous(void* addr, size_t length, int prot, int flags);
84
85class file_desc {
86 int _fd;
87public:
88 file_desc() = delete;
89 file_desc(const file_desc&) = delete;
90 file_desc(file_desc&& x) noexcept : _fd(x._fd) { x._fd = -1; }
91 ~file_desc() { if (_fd != -1) { ::close(_fd); } }
92 void operator=(const file_desc&) = delete;
93 file_desc& operator=(file_desc&& x) {
94 if (this != &x) {
95 std::swap(_fd, x._fd);
96 if (x._fd != -1) {
97 x.close();
98 }
99 }
100 return *this;
101 }
102 void close() {
103 assert(_fd != -1);
104 auto r = ::close(_fd);
105 throw_system_error_on(r == -1, "close");
106 _fd = -1;
107 }
108 int get() const { return _fd; }
109
110 sstring fdinfo() const noexcept;
111
112 static file_desc from_fd(int fd) {
113 return file_desc(fd);
114 }
115
116 static file_desc open(sstring name, int flags, mode_t mode = 0) {
117 int fd = ::open(name.c_str(), flags, mode);
118 throw_system_error_on(fd == -1, "open");
119 return file_desc(fd);
120 }
121 static file_desc socket(int family, int type, int protocol = 0) {
122 int fd = ::socket(family, type, protocol);
123 throw_system_error_on(fd == -1, "socket");
124 return file_desc(fd);
125 }
126 static file_desc eventfd(unsigned initval, int flags) {
127 int fd = ::eventfd(initval, flags);
128 throw_system_error_on(fd == -1, "eventfd");
129 return file_desc(fd);
130 }
131 static file_desc epoll_create(int flags = 0) {
132 int fd = ::epoll_create1(flags);
133 throw_system_error_on(fd == -1, "epoll_create1");
134 return file_desc(fd);
135 }
136 static file_desc timerfd_create(int clockid, int flags) {
137 int fd = ::timerfd_create(clockid, flags);
138 throw_system_error_on(fd == -1, "timerfd_create");
139 return file_desc(fd);
140 }
141 static file_desc temporary(sstring directory);
142 file_desc dup() const {
143 int fd = ::dup(get());
144 throw_system_error_on(fd == -1, "dup");
145 return file_desc(fd);
146 }
147 file_desc accept(socket_address& sa, int flags = 0) {
148 auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags);
149 throw_system_error_on(ret == -1, "accept4");
150 return file_desc(ret);
151 }
152 static file_desc inotify_init(int flags);
153 // return nullopt if no connection is availbale to be accepted
154 std::optional<file_desc> try_accept(socket_address& sa, int flags = 0) {
155 auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags);
156 if (ret == -1 && errno == EAGAIN) {
157 return {};
158 }
159 throw_system_error_on(ret == -1, "accept4");
160 return file_desc(ret);
161 }
162 void shutdown(int how) {
163 auto ret = ::shutdown(_fd, how);
164 if (ret == -1 && errno != ENOTCONN) {
165 throw_system_error_on(ret == -1, "shutdown");
166 }
167 }
168 void truncate(size_t size) {
169 auto ret = ::ftruncate(_fd, size);
170 throw_system_error_on(ret, "ftruncate");
171 }
172 int ioctl(int request) {
173 return ioctl(request, 0);
174 }
175 int ioctl(int request, int value) {
176 int r = ::ioctl(_fd, request, value);
177 throw_system_error_on(r == -1, "ioctl");
178 return r;
179 }
180 int ioctl(int request, unsigned int value) {
181 int r = ::ioctl(_fd, request, value);
182 throw_system_error_on(r == -1, "ioctl");
183 return r;
184 }
185 template <class X>
186 int ioctl(int request, X& data) {
187 int r = ::ioctl(_fd, request, &data);
188 throw_system_error_on(r == -1, "ioctl");
189 return r;
190 }
191 template <class X>
192 int ioctl(int request, X&& data) {
193 int r = ::ioctl(_fd, request, &data);
194 throw_system_error_on(r == -1, "ioctl");
195 return r;
196 }
197 template <class X>
198 int setsockopt(int level, int optname, X&& data) {
199 int r = ::setsockopt(_fd, level, optname, &data, sizeof(data));
200 throw_system_error_on(r == -1, "setsockopt");
201 return r;
202 }
203 int setsockopt(int level, int optname, const char* data) {
204 int r = ::setsockopt(_fd, level, optname, data, strlen(data) + 1);
205 throw_system_error_on(r == -1, "setsockopt");
206 return r;
207 }
208 int setsockopt(int level, int optname, const void* data, socklen_t len) {
209 int r = ::setsockopt(_fd, level, optname, data, len);
210 throw_system_error_on(r == -1, "setsockopt");
211 return r;
212 }
213 template <typename Data>
214 Data getsockopt(int level, int optname) {
215 Data data;
216 socklen_t len = sizeof(data);
217 memset(&data, 0, len);
218 int r = ::getsockopt(_fd, level, optname, &data, &len);
219 throw_system_error_on(r == -1, "getsockopt");
220 return data;
221 }
222 int getsockopt(int level, int optname, char* data, socklen_t len) {
223 int r = ::getsockopt(_fd, level, optname, data, &len);
224 throw_system_error_on(r == -1, "getsockopt");
225 return r;
226 }
227 size_t size() {
228 struct stat buf;
229 auto r = ::fstat(_fd, &buf);
230 throw_system_error_on(r == -1, "fstat");
231 return buf.st_size;
232 }
233 std::optional<size_t> read(void* buffer, size_t len) {
234 auto r = ::read(_fd, buffer, len);
235 if (r == -1 && errno == EAGAIN) {
236 return {};
237 }
238 throw_system_error_on(r == -1, "read");
239 return { size_t(r) };
240 }
241 std::optional<ssize_t> recv(void* buffer, size_t len, int flags) {
242 auto r = ::recv(_fd, buffer, len, flags);
243 if (r == -1 && errno == EAGAIN) {
244 return {};
245 }
246 throw_system_error_on(r == -1, "recv");
247 return { ssize_t(r) };
248 }
249 std::optional<size_t> recvmsg(msghdr* mh, int flags) {
250 auto r = ::recvmsg(_fd, mh, flags);
251 if (r == -1 && errno == EAGAIN) {
252 return {};
253 }
254 throw_system_error_on(r == -1, "recvmsg");
255 return { size_t(r) };
256 }
257 std::optional<size_t> send(const void* buffer, size_t len, int flags) {
258 auto r = ::send(_fd, buffer, len, flags);
259 if (r == -1 && errno == EAGAIN) {
260 return {};
261 }
262 throw_system_error_on(r == -1, "send");
263 return { size_t(r) };
264 }
265 std::optional<size_t> sendto(socket_address& addr, const void* buf, size_t len, int flags) {
266 auto r = ::sendto(_fd, buf, len, flags, &addr.u.sa, addr.length());
267 if (r == -1 && errno == EAGAIN) {
268 return {};
269 }
270 throw_system_error_on(r == -1, "sendto");
271 return { size_t(r) };
272 }
273 std::optional<size_t> sendmsg(const msghdr* msg, int flags) {
274 auto r = ::sendmsg(_fd, msg, flags);
275 if (r == -1 && errno == EAGAIN) {
276 return {};
277 }
278 throw_system_error_on(r == -1, "sendmsg");
279 return { size_t(r) };
280 }
281 void bind(sockaddr& sa, socklen_t sl) {
282 auto r = ::bind(_fd, &sa, sl);
283 throw_system_error_on(r == -1, "bind");
284 }
285 void connect(sockaddr& sa, socklen_t sl) {
286 auto r = ::connect(_fd, &sa, sl);
287 if (r == -1 && errno == EINPROGRESS) {
288 return;
289 }
290 throw_system_error_on(r == -1, "connect");
291 }
292 socket_address get_address() {
293 socket_address addr;
294 auto r = ::getsockname(_fd, &addr.u.sa, &addr.addr_length);
295 throw_system_error_on(r == -1, "getsockname");
296 return addr;
297 }
298 socket_address get_remote_address() {
299 socket_address addr;
300 auto r = ::getpeername(_fd, &addr.u.sa, &addr.addr_length);
301 throw_system_error_on(r == -1, "getpeername");
302 return addr;
303 }
304 void listen(int backlog) {
305 auto fd = ::listen(_fd, backlog);
306 throw_system_error_on(fd == -1, "listen");
307 }
308 std::optional<size_t> write(const void* buf, size_t len) {
309 auto r = ::write(_fd, buf, len);
310 if (r == -1 && errno == EAGAIN) {
311 return {};
312 }
313 throw_system_error_on(r == -1, "write");
314 return { size_t(r) };
315 }
316 std::optional<size_t> writev(const iovec *iov, int iovcnt) {
317 auto r = ::writev(_fd, iov, iovcnt);
318 if (r == -1 && errno == EAGAIN) {
319 return {};
320 }
321 throw_system_error_on(r == -1, "writev");
322 return { size_t(r) };
323 }
324 size_t pread(void* buf, size_t len, off_t off) {
325 auto r = ::pread(_fd, buf, len, off);
326 throw_system_error_on(r == -1, "pread");
327 return size_t(r);
328 }
329 void timerfd_settime(int flags, const itimerspec& its) {
330 auto r = ::timerfd_settime(_fd, flags, &its, NULL);
331 throw_system_error_on(r == -1, "timerfd_settime");
332 }
333
334 mmap_area map(size_t size, unsigned prot, unsigned flags, size_t offset,
335 void* addr = nullptr) {
336 void *x = mmap(addr, size, prot, flags, _fd, offset);
337 throw_system_error_on(x == MAP_FAILED, "mmap");
338 return mmap_area(static_cast<char*>(x), mmap_deleter{size});
339 }
340
341 mmap_area map_shared_rw(size_t size, size_t offset) {
342 return map(size, PROT_READ | PROT_WRITE, MAP_SHARED, offset);
343 }
344
345 mmap_area map_shared_ro(size_t size, size_t offset) {
346 return map(size, PROT_READ, MAP_SHARED, offset);
347 }
348
349 mmap_area map_private_rw(size_t size, size_t offset) {
350 return map(size, PROT_READ | PROT_WRITE, MAP_PRIVATE, offset);
351 }
352
353 mmap_area map_private_ro(size_t size, size_t offset) {
354 return map(size, PROT_READ, MAP_PRIVATE, offset);
355 }
356
357 void spawn_actions_add_close(posix_spawn_file_actions_t* actions) {
358 auto r = ::posix_spawn_file_actions_addclose(actions, _fd);
359 throw_pthread_error(r);
360 }
361
362 void spawn_actions_add_dup2(posix_spawn_file_actions_t* actions, int newfd) {
363 auto r = ::posix_spawn_file_actions_adddup2(actions, _fd, newfd);
364 throw_pthread_error(r);
365 }
366private:
367 file_desc(int fd) : _fd(fd) {}
368 };
369
370
namespace posix {

// Bit flags for the two directions of a socket that can be shut down.
constexpr unsigned rcv_shutdown = 0x1;
constexpr unsigned snd_shutdown = 0x2;
/// Returns \c how + 1 as an unsigned mask.
/// NOTE(review): presumably \c how is one of SHUT_RD / SHUT_WR / SHUT_RDWR, so
/// that the result is a combination of the flags above; confirm against callers.
inline constexpr unsigned shutdown_mask(int how) { return how + 1; }

/// Converts a std::chrono duration to a \c timespec.
///
/// The duration is first cast to whole nanoseconds. The result is normalised
/// so that tv_nsec always lies in [0, 999999999]; for a negative duration with
/// a fractional second, tv_sec is therefore rounded towards negative infinity
/// (e.g. -0.5s becomes {-1, 500000000}).
///
/// \param d the duration to convert
/// \return the equivalent \c timespec
template <typename Rep, typename Period>
struct timespec
to_timespec(std::chrono::duration<Rep, Period> d) {
    using ns_rep = std::chrono::nanoseconds::rep;
    constexpr ns_rep ns_per_sec = 1000000000;
    const ns_rep total_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(d).count();
    ns_rep secs = total_ns / ns_per_sec;
    ns_rep frac = total_ns % ns_per_sec;
    if (frac < 0) {
        // Integer division truncates towards zero, which leaves a negative
        // remainder for negative inputs; borrow one second to fix it up.
        frac += ns_per_sec;
        --secs;
    }
    struct timespec ts {};
    ts.tv_sec = secs;
    ts.tv_nsec = frac;
    return ts;
}

/// Builds an \c itimerspec from two durations.
///
/// \param base converted into it_value
/// \param interval converted into it_interval
/// \return the populated \c itimerspec
template <typename Rep1, typename Period1, typename Rep2, typename Period2>
struct itimerspec
to_relative_itimerspec(std::chrono::duration<Rep1, Period1> base, std::chrono::duration<Rep2, Period2> interval) {
    struct itimerspec its {};
    its.it_interval = to_timespec(interval);
    its.it_value = to_timespec(base);
    return its;
}


/// Builds an \c itimerspec from a time point and an interval.
///
/// \param base time point whose time_since_epoch() is converted into it_value
/// \param interval converted into it_interval
/// \return the populated \c itimerspec
template <typename Clock, class Duration, class Rep, class Period>
struct itimerspec
to_absolute_itimerspec(std::chrono::time_point<Clock, Duration> base, std::chrono::duration<Rep, Period> interval) {
    return to_relative_itimerspec(base.time_since_epoch(), interval);
}

}
418
420public:
421 class attr;
422private:
423 // must allocate, since this class is moveable
424 std::unique_ptr<std::function<void ()>> _func;
425 pthread_t _pthread;
426 bool _valid = true;
427 mmap_area _stack;
428private:
429 static void* start_routine(void* arg) noexcept;
430public:
431 posix_thread(std::function<void ()> func);
432 posix_thread(attr a, std::function<void ()> func);
435 void join();
436public:
class attr {
    public:
        /// Wrapper type used to pass a stack size to set(); defaults to 0.
        struct stack_size { size_t size = 0; };

        attr() = default;

        /// Applies every argument, left to right, through set().
        template <typename... A>
        attr(A... a) {
            (set(std::forward<A>(a)), ...);
        }

        /// No-argument form; does nothing.
        void set() {}

        /// Applies the first argument, then each remaining one in order.
        template <typename A, typename... Rest>
        void set(A a, Rest... rest) {
            set(std::forward<A>(a));
            (set(std::forward<Rest>(rest)), ...);
        }

        /// Records the requested stack size.
        void set(stack_size ss) { _stack_size = ss; }

        /// Records the requested CPU affinity set.
        void set(cpu_set_t affinity) { _affinity = affinity; }

    private:
        stack_size _stack_size;
        // Empty until set(cpu_set_t) has been called.
        std::optional<cpu_set_t> _affinity;
        friend class posix_thread;
    };
458};
459
460
461inline
462void throw_system_error_on(bool condition, const char* what_arg) {
463 if (condition) {
464 if ((errno == EBADF || errno == ENOTSOCK) && is_abort_on_ebadf_enabled()) {
465 abort();
466 }
467 throw std::system_error(errno, std::system_category(), what_arg);
468 }
469}
470
471template <typename T>
472inline
473void throw_kernel_error(T r) {
474 static_assert(std::is_signed_v<T>, "kernel error variables must be signed");
475 if (r < 0) {
476 auto ec = -r;
477 if ((ec == EBADF || ec == ENOTSOCK) && is_abort_on_ebadf_enabled()) {
478 abort();
479 }
480 throw std::system_error(-r, std::system_category());
481 }
482}
483
/// Throws std::system_error with error code \c r when \c r is non-zero; does
/// nothing when \c r is zero.
template <typename T>
inline
void throw_pthread_error(T r) {
    if (r == 0) {
        return;
    }
    throw std::system_error(r, std::system_category());
}
491
/// Returns a signal set that contains only \c signo.
/// The return value of sigaddset() is not checked.
inline
sigset_t make_sigset_mask(int signo) {
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, signo);
    return mask;
}
499
/// Returns a signal set initialised with sigfillset().
inline
sigset_t make_full_sigset_mask() {
    sigset_t mask;
    sigfillset(&mask);
    return mask;
}
506
/// Returns a signal set initialised with sigemptyset().
inline
sigset_t make_empty_sigset_mask() {
    sigset_t mask;
    sigemptyset(&mask);
    return mask;
}
513
/// Restricts the calling thread's CPU affinity to the single CPU \c cpu_id
/// using pthread_setaffinity_np(). Success is only checked with assert().
inline
void pin_this_thread(unsigned cpu_id) {
    cpu_set_t single_cpu;
    CPU_ZERO(&single_cpu);
    CPU_SET(cpu_id, &single_cpu);
    [[maybe_unused]] auto rc = pthread_setaffinity_np(pthread_self(), sizeof(single_cpu), &single_cpu);
    assert(rc == 0);
}
523
524std::set<unsigned> get_current_cpuset();
525
527
528SEASTAR_MODULE_EXPORT_END
529}
Definition: posix.hh:85
Definition: posix.hh:437
Definition: posix.hh:419
Definition: socket_defs.hh:47
union seastar::socket_address::@14 u
!< actual size of the relevant 'u' member
Definition: api.hh:283
server_socket listen(socket_address sa)
future< connected_socket > connect(socket_address sa)
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool is_abort_on_ebadf_enabled()
STL namespace.
struct itimerspec to_relative_itimerspec(std::chrono::duration< Rep1, Period1 > base, std::chrono::duration< Rep2, Period2 > interval)
Definition: posix.hh:398
struct itimerspec to_absolute_itimerspec(std::chrono::time_point< Clock, Duration > base, std::chrono::duration< Rep, Period > interval)
Definition: posix.hh:413
struct timespec to_timespec(std::chrono::duration< Rep, Period > d)
Definition: posix.hh:383
Definition: posix.hh:76