Seastar
High performance C++ framework for concurrent servers
posix.hh
Go to the documentation of this file.
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2016 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #ifndef SEASTAR_MODULE
25 #include <sys/epoll.h>
26 #include <sys/eventfd.h>
27 #include <sys/ioctl.h>
28 #include <sys/mman.h>
29 #include <sys/socket.h>
30 #include <sys/stat.h>
31 #include <sys/timerfd.h>
32 #include <sys/types.h>
33 #include <sys/uio.h>
34 #include <assert.h>
35 #include <fcntl.h>
36 #include <pthread.h>
37 #include <signal.h>
38 #include <spawn.h>
39 #include <unistd.h>
40 #include <utility>
41 #include <system_error>
42 #include <chrono>
43 #include <cstring>
44 #include <functional>
45 #include <memory>
46 #include <set>
47 #include <optional>
48 #endif
49 #include "abort_on_ebadf.hh"
50 #include <seastar/core/sstring.hh>
51 #include <seastar/net/socket_defs.hh>
52 #include <seastar/util/std-compat.hh>
53 #include <seastar/util/modules.hh>
54 
55 namespace seastar {
56 
57 SEASTAR_MODULE_EXPORT_BEGIN
58 
67 
/// Throws std::system_error (built from errno) when `condition` holds.
/// Defined further down in this header.
inline void throw_system_error_on(bool condition, const char* what_arg = "");

/// Throws std::system_error when `r` is negative, using -r as the error code.
template <typename T> inline void throw_kernel_error(T r);

/// Throws std::system_error when `r` is non-zero, using r as the error code.
template <typename T> inline void throw_pthread_error(T r);

/// Deleter used by mmap_area. `_size` records the mapping length, presumably
/// so that operator() (defined out of line) can unmap the whole region.
struct mmap_deleter {
    size_t _size;                        // length of the mapping, in bytes
    void operator()(void* ptr) const;    // defined out of line
};

/// Owning handle to a memory mapping, released through mmap_deleter.
using mmap_area = std::unique_ptr<char[], mmap_deleter>;

/// Presumably creates an anonymous mapping (definition is out of line).
mmap_area mmap_anonymous(void* addr, size_t length, int prot, int flags);
84 
85 class file_desc {
86  int _fd;
87 public:
88  file_desc() = delete;
89  file_desc(const file_desc&) = delete;
90  file_desc(file_desc&& x) noexcept : _fd(x._fd) { x._fd = -1; }
91  ~file_desc() { if (_fd != -1) { ::close(_fd); } }
92  void operator=(const file_desc&) = delete;
93  file_desc& operator=(file_desc&& x) {
94  if (this != &x) {
95  std::swap(_fd, x._fd);
96  if (x._fd != -1) {
97  x.close();
98  }
99  }
100  return *this;
101  }
102  void close() {
103  assert(_fd != -1);
104  auto r = ::close(_fd);
105  throw_system_error_on(r == -1, "close");
106  _fd = -1;
107  }
108  int get() const { return _fd; }
109 
110  sstring fdinfo() const noexcept;
111 
112  static file_desc from_fd(int fd) {
113  return file_desc(fd);
114  }
115 
116  static file_desc open(sstring name, int flags, mode_t mode = 0) {
117  int fd = ::open(name.c_str(), flags, mode);
118  throw_system_error_on(fd == -1, "open");
119  return file_desc(fd);
120  }
121  static file_desc socket(int family, int type, int protocol = 0) {
122  int fd = ::socket(family, type, protocol);
123  throw_system_error_on(fd == -1, "socket");
124  return file_desc(fd);
125  }
126  static file_desc eventfd(unsigned initval, int flags) {
127  int fd = ::eventfd(initval, flags);
128  throw_system_error_on(fd == -1, "eventfd");
129  return file_desc(fd);
130  }
131  static file_desc epoll_create(int flags = 0) {
132  int fd = ::epoll_create1(flags);
133  throw_system_error_on(fd == -1, "epoll_create1");
134  return file_desc(fd);
135  }
136  static file_desc timerfd_create(int clockid, int flags) {
137  int fd = ::timerfd_create(clockid, flags);
138  throw_system_error_on(fd == -1, "timerfd_create");
139  return file_desc(fd);
140  }
141  static file_desc temporary(sstring directory);
142  file_desc dup() const {
143  int fd = ::dup(get());
144  throw_system_error_on(fd == -1, "dup");
145  return file_desc(fd);
146  }
147  file_desc accept(socket_address& sa, int flags = 0) {
148  auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags);
149  throw_system_error_on(ret == -1, "accept4");
150  return file_desc(ret);
151  }
152  static file_desc inotify_init(int flags);
153  // return nullopt if no connection is availbale to be accepted
154  std::optional<file_desc> try_accept(socket_address& sa, int flags = 0) {
155  auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags);
156  if (ret == -1 && errno == EAGAIN) {
157  return {};
158  }
159  throw_system_error_on(ret == -1, "accept4");
160  return file_desc(ret);
161  }
162  void shutdown(int how) {
163  auto ret = ::shutdown(_fd, how);
164  if (ret == -1 && errno != ENOTCONN) {
165  throw_system_error_on(ret == -1, "shutdown");
166  }
167  }
168  void truncate(size_t size) {
169  auto ret = ::ftruncate(_fd, size);
170  throw_system_error_on(ret, "ftruncate");
171  }
172  int ioctl(int request) {
173  return ioctl(request, 0);
174  }
175  int ioctl(int request, int value) {
176  int r = ::ioctl(_fd, request, value);
177  throw_system_error_on(r == -1, "ioctl");
178  return r;
179  }
180  int ioctl(int request, unsigned int value) {
181  int r = ::ioctl(_fd, request, value);
182  throw_system_error_on(r == -1, "ioctl");
183  return r;
184  }
185  template <class X>
186  int ioctl(int request, X& data) {
187  int r = ::ioctl(_fd, request, &data);
188  throw_system_error_on(r == -1, "ioctl");
189  return r;
190  }
191  template <class X>
192  int ioctl(int request, X&& data) {
193  int r = ::ioctl(_fd, request, &data);
194  throw_system_error_on(r == -1, "ioctl");
195  return r;
196  }
197  template <class X>
198  int setsockopt(int level, int optname, X&& data) {
199  int r = ::setsockopt(_fd, level, optname, &data, sizeof(data));
200  throw_system_error_on(r == -1, "setsockopt");
201  return r;
202  }
203  int setsockopt(int level, int optname, const char* data) {
204  int r = ::setsockopt(_fd, level, optname, data, strlen(data) + 1);
205  throw_system_error_on(r == -1, "setsockopt");
206  return r;
207  }
208  int setsockopt(int level, int optname, const void* data, socklen_t len) {
209  int r = ::setsockopt(_fd, level, optname, data, len);
210  throw_system_error_on(r == -1, "setsockopt");
211  return r;
212  }
213  template <typename Data>
214  Data getsockopt(int level, int optname) {
215  Data data;
216  socklen_t len = sizeof(data);
217  memset(&data, 0, len);
218  int r = ::getsockopt(_fd, level, optname, &data, &len);
219  throw_system_error_on(r == -1, "getsockopt");
220  return data;
221  }
222  int getsockopt(int level, int optname, char* data, socklen_t len) {
223  int r = ::getsockopt(_fd, level, optname, data, &len);
224  throw_system_error_on(r == -1, "getsockopt");
225  return r;
226  }
227  size_t size() {
228  struct stat buf;
229  auto r = ::fstat(_fd, &buf);
230  throw_system_error_on(r == -1, "fstat");
231  return buf.st_size;
232  }
233  std::optional<size_t> read(void* buffer, size_t len) {
234  auto r = ::read(_fd, buffer, len);
235  if (r == -1 && errno == EAGAIN) {
236  return {};
237  }
238  throw_system_error_on(r == -1, "read");
239  return { size_t(r) };
240  }
241  std::optional<ssize_t> recv(void* buffer, size_t len, int flags) {
242  auto r = ::recv(_fd, buffer, len, flags);
243  if (r == -1 && errno == EAGAIN) {
244  return {};
245  }
246  throw_system_error_on(r == -1, "recv");
247  return { ssize_t(r) };
248  }
249  std::optional<size_t> recvmsg(msghdr* mh, int flags) {
250  auto r = ::recvmsg(_fd, mh, flags);
251  if (r == -1 && errno == EAGAIN) {
252  return {};
253  }
254  throw_system_error_on(r == -1, "recvmsg");
255  return { size_t(r) };
256  }
257  std::optional<size_t> send(const void* buffer, size_t len, int flags) {
258  auto r = ::send(_fd, buffer, len, flags);
259  if (r == -1 && errno == EAGAIN) {
260  return {};
261  }
262  throw_system_error_on(r == -1, "send");
263  return { size_t(r) };
264  }
265  std::optional<size_t> sendto(socket_address& addr, const void* buf, size_t len, int flags) {
266  auto r = ::sendto(_fd, buf, len, flags, &addr.u.sa, addr.length());
267  if (r == -1 && errno == EAGAIN) {
268  return {};
269  }
270  throw_system_error_on(r == -1, "sendto");
271  return { size_t(r) };
272  }
273  std::optional<size_t> sendmsg(const msghdr* msg, int flags) {
274  auto r = ::sendmsg(_fd, msg, flags);
275  if (r == -1 && errno == EAGAIN) {
276  return {};
277  }
278  throw_system_error_on(r == -1, "sendmsg");
279  return { size_t(r) };
280  }
281  void bind(sockaddr& sa, socklen_t sl) {
282  auto r = ::bind(_fd, &sa, sl);
283  throw_system_error_on(r == -1, "bind");
284  }
285  void connect(sockaddr& sa, socklen_t sl) {
286  auto r = ::connect(_fd, &sa, sl);
287  if (r == -1 && errno == EINPROGRESS) {
288  return;
289  }
290  throw_system_error_on(r == -1, "connect");
291  }
292  socket_address get_address() {
293  socket_address addr;
294  auto r = ::getsockname(_fd, &addr.u.sa, &addr.addr_length);
295  throw_system_error_on(r == -1, "getsockname");
296  return addr;
297  }
298  socket_address get_remote_address() {
299  socket_address addr;
300  auto r = ::getpeername(_fd, &addr.u.sa, &addr.addr_length);
301  throw_system_error_on(r == -1, "getpeername");
302  return addr;
303  }
304  void listen(int backlog) {
305  auto fd = ::listen(_fd, backlog);
306  throw_system_error_on(fd == -1, "listen");
307  }
308  std::optional<size_t> write(const void* buf, size_t len) {
309  auto r = ::write(_fd, buf, len);
310  if (r == -1 && errno == EAGAIN) {
311  return {};
312  }
313  throw_system_error_on(r == -1, "write");
314  return { size_t(r) };
315  }
316  std::optional<size_t> writev(const iovec *iov, int iovcnt) {
317  auto r = ::writev(_fd, iov, iovcnt);
318  if (r == -1 && errno == EAGAIN) {
319  return {};
320  }
321  throw_system_error_on(r == -1, "writev");
322  return { size_t(r) };
323  }
324  size_t pread(void* buf, size_t len, off_t off) {
325  auto r = ::pread(_fd, buf, len, off);
326  throw_system_error_on(r == -1, "pread");
327  return size_t(r);
328  }
329  void timerfd_settime(int flags, const itimerspec& its) {
330  auto r = ::timerfd_settime(_fd, flags, &its, NULL);
331  throw_system_error_on(r == -1, "timerfd_settime");
332  }
333 
334  mmap_area map(size_t size, unsigned prot, unsigned flags, size_t offset,
335  void* addr = nullptr) {
336  void *x = mmap(addr, size, prot, flags, _fd, offset);
337  throw_system_error_on(x == MAP_FAILED, "mmap");
338  return mmap_area(static_cast<char*>(x), mmap_deleter{size});
339  }
340 
341  mmap_area map_shared_rw(size_t size, size_t offset) {
342  return map(size, PROT_READ | PROT_WRITE, MAP_SHARED, offset);
343  }
344 
345  mmap_area map_shared_ro(size_t size, size_t offset) {
346  return map(size, PROT_READ, MAP_SHARED, offset);
347  }
348 
349  mmap_area map_private_rw(size_t size, size_t offset) {
350  return map(size, PROT_READ | PROT_WRITE, MAP_PRIVATE, offset);
351  }
352 
353  mmap_area map_private_ro(size_t size, size_t offset) {
354  return map(size, PROT_READ, MAP_PRIVATE, offset);
355  }
356 
357  void spawn_actions_add_close(posix_spawn_file_actions_t* actions) {
358  auto r = ::posix_spawn_file_actions_addclose(actions, _fd);
359  throw_pthread_error(r);
360  }
361 
362  void spawn_actions_add_dup2(posix_spawn_file_actions_t* actions, int newfd) {
363  auto r = ::posix_spawn_file_actions_adddup2(actions, _fd, newfd);
364  throw_pthread_error(r);
365  }
366 private:
367  file_desc(int fd) : _fd(fd) {}
368  };
369 
370 
namespace posix {

/// Bit set in a shutdown mask once the receive direction is shut down.
constexpr unsigned rcv_shutdown = 0x1;
/// Bit set in a shutdown mask once the send direction is shut down.
constexpr unsigned snd_shutdown = 0x2;
/// Maps a ::shutdown() `how` argument to rcv_shutdown/snd_shutdown bits.
/// Relies on the Linux values SHUT_RD == 0, SHUT_WR == 1, SHUT_RDWR == 2,
/// which map to 0x1, 0x2 and 0x3 respectively.
inline constexpr unsigned shutdown_mask(int how) { return how + 1; }

/// Converts a std::chrono duration to a struct timespec.
///
/// Precision finer than a nanosecond is truncated toward zero by
/// duration_cast. Seconds are then rounded toward negative infinity so that
/// tv_nsec always lies in [0, 999999999], as POSIX requires. For example,
/// -1.5s becomes {-2, 500000000} rather than the malformed {-1, -500000000}.
/// Non-negative durations convert exactly as before.
template <typename Rep, typename Period>
struct timespec
to_timespec(std::chrono::duration<Rep, Period> d) {
    const auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(d);
    const auto secs = std::chrono::floor<std::chrono::seconds>(ns);
    struct timespec ts {};
    ts.tv_sec = secs.count();
    ts.tv_nsec = (ns - secs).count();
    return ts;
}

/// Builds an itimerspec with it_value = `base` and it_interval = `interval`,
/// both converted with to_timespec().
template <typename Rep1, typename Period1, typename Rep2, typename Period2>
struct itimerspec
to_relative_itimerspec(std::chrono::duration<Rep1, Period1> base, std::chrono::duration<Rep2, Period2> interval) {
    struct itimerspec its {};
    its.it_interval = to_timespec(interval);
    its.it_value = to_timespec(base);
    return its;
}

/// Like to_relative_itimerspec(), but it_value is the time point `base`
/// measured from Clock's epoch (presumably for use with an absolute-time
/// timer such as TFD_TIMER_ABSTIME -- confirm against callers).
template <typename Clock, class Duration, class Rep, class Period>
struct itimerspec
to_absolute_itimerspec(std::chrono::time_point<Clock, Duration> base, std::chrono::duration<Rep, Period> interval) {
    return to_relative_itimerspec(base.time_since_epoch(), interval);
}

} // namespace posix
418 
420 public:
421  class attr;
422 private:
423  // must allocate, since this class is moveable
424  std::unique_ptr<std::function<void ()>> _func;
425  pthread_t _pthread;
426  bool _valid = true;
427  mmap_area _stack;
428 private:
429  static void* start_routine(void* arg) noexcept;
430 public:
431  posix_thread(std::function<void ()> func);
432  posix_thread(attr a, std::function<void ()> func);
434  ~posix_thread();
435  void join();
436 public:
class attr {
public:
    /// Requested thread stack size in bytes; 0 by default.
    struct stack_size { size_t size = 0; };
    attr() = default;
    /// Applies each argument in order, as if by set(a...).
    template <typename... A>
    attr(A... a) {
        set(std::move(a)...);
    }
    void set() {}
    /// Applies each attribute in order. Every argument must be a stack_size
    /// or a cpu_set_t. Anything else is now a compile error instead of the
    /// former infinite runtime recursion: the old single-argument template
    /// set(A) called itself whenever A matched no concrete overload.
    template <typename A, typename... Rest>
    void set(A a, Rest... rest) {
        apply(std::move(a));
        set(std::move(rest)...);
    }
    void set(stack_size ss) { apply(ss); }
    void set(cpu_set_t affinity) { apply(affinity); }
private:
    // The only accepted attribute kinds; deliberately non-template.
    void apply(stack_size ss) { _stack_size = ss; }
    void apply(cpu_set_t affinity) { _affinity = affinity; }

    stack_size _stack_size;
    std::optional<cpu_set_t> _affinity;
    friend class posix_thread;
};
458 };
459 
460 
461 inline
462 void throw_system_error_on(bool condition, const char* what_arg) {
463  if (condition) {
464  if ((errno == EBADF || errno == ENOTSOCK) && is_abort_on_ebadf_enabled()) {
465  abort();
466  }
467  throw std::system_error(errno, std::system_category(), what_arg);
468  }
469 }
470 
471 template <typename T>
472 inline
473 void throw_kernel_error(T r) {
474  static_assert(std::is_signed_v<T>, "kernel error variables must be signed");
475  if (r < 0) {
476  auto ec = -r;
477  if ((ec == EBADF || ec == ENOTSOCK) && is_abort_on_ebadf_enabled()) {
478  abort();
479  }
480  throw std::system_error(-r, std::system_category());
481  }
482 }
483 
template <typename T>
inline
void throw_pthread_error(T r) {
    // pthread_* / posix_spawn_* report failure by returning an error number
    // (rather than through errno); zero means success.
    if (r == 0) {
        return;
    }
    throw std::system_error(r, std::system_category());
}
491 
inline
sigset_t make_sigset_mask(int signo) {
    // A signal set containing exactly `signo`.
    sigset_t only_signo;
    sigemptyset(&only_signo);
    sigaddset(&only_signo, signo);
    return only_signo;
}
499 
inline
sigset_t make_full_sigset_mask() {
    // A signal set containing every signal.
    sigset_t all_signals;
    sigfillset(&all_signals);
    return all_signals;
}
506 
inline
sigset_t make_empty_sigset_mask() {
    // A signal set containing no signals.
    sigset_t no_signals;
    sigemptyset(&no_signals);
    return no_signals;
}
513 
inline
void pin_this_thread(unsigned cpu_id) {
    // Restrict the calling thread to the single CPU `cpu_id`.
    cpu_set_t only_cpu;
    CPU_ZERO(&only_cpu);
    CPU_SET(cpu_id, &only_cpu);
    [[maybe_unused]] const int rc = pthread_setaffinity_np(pthread_self(), sizeof(only_cpu), &only_cpu);
    // Checked only by assert(): a failure is ignored when NDEBUG is defined.
    assert(rc == 0);
}
523 
/// Returns a set of CPU ids; defined out of line (presumably the CPUs the
/// caller is currently allowed to run on -- confirm against the definition).
auto get_current_cpuset() -> std::set<unsigned>;
525 
527 
528 SEASTAR_MODULE_EXPORT_END
529 }
Definition: posix.hh:85
Definition: posix.hh:437
Definition: posix.hh:419
Definition: socket_defs.hh:47
union seastar::socket_address::@14 u
!< actual size of the relevant 'u' member
Definition: api.hh:282
server_socket listen(socket_address sa)
future< connected_socket > connect(socket_address sa)
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
bool is_abort_on_ebadf_enabled()
struct itimerspec to_relative_itimerspec(std::chrono::duration< Rep1, Period1 > base, std::chrono::duration< Rep2, Period2 > interval)
Definition: posix.hh:398
struct itimerspec to_absolute_itimerspec(std::chrono::time_point< Clock, Duration > base, std::chrono::duration< Rep, Period > interval)
Definition: posix.hh:413
struct timespec to_timespec(std::chrono::duration< Rep, Period > d)
Definition: posix.hh:383
Definition: posix.hh:76