Seastar
High performance C++ framework for concurrent servers
posix.hh
Go to the documentation of this file.
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2016 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/sstring.hh>
25 #include "abort_on_ebadf.hh"
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #include <assert.h>
30 #include <utility>
31 #include <fcntl.h>
32 #include <sys/ioctl.h>
33 #include <sys/eventfd.h>
34 #include <sys/timerfd.h>
35 #include <sys/socket.h>
36 #include <sys/epoll.h>
37 #include <sys/mman.h>
38 #include <signal.h>
39 #include <system_error>
40 #include <pthread.h>
41 #include <signal.h>
42 #include <memory>
43 #include <chrono>
44 #include <sys/uio.h>
45 
46 #include <seastar/net/socket_defs.hh>
47 #include <seastar/util/std-compat.hh>
48 
49 namespace seastar {
50 
59 
// Forward declarations: file_desc (below) calls throw_system_error_on() before
// the inline definitions that appear later in this header.
/// Throws std::system_error built from errno when `condition` is true
/// (inline definition later in this header).
60 inline void throw_system_error_on(bool condition, const char* what_arg = "");
61 
/// Throws std::system_error when the signed value `r` is negative, using -r as
/// the error code (inline definition later in this header).
62 template <typename T>
63 inline void throw_kernel_error(T r);
64 
/// Deleter used by mmap_area. `_size` records the length of the mapping
/// (file_desc::map() constructs it as mmap_deleter{size}); operator() is defined
/// out of line -- presumably it munmap()s [ptr, ptr + _size), TODO confirm.
65 struct mmap_deleter {
66  size_t _size;
67  void operator()(void* ptr) const;
68 };
69 
/// Owning pointer to a memory mapping; the mmap_deleter carries the mapping length.
70 using mmap_area = std::unique_ptr<char[], mmap_deleter>;
71 
/// Creates an anonymous mapping and returns it as an owning mmap_area.
/// Defined out of line; presumably wraps mmap() with MAP_ANONYMOUS -- TODO confirm.
72 mmap_area mmap_anonymous(void* addr, size_t length, int prot, int flags);
73 
74 class file_desc {
75  int _fd;
76 public:
77  file_desc() = delete;
78  file_desc(const file_desc&) = delete;
79  file_desc(file_desc&& x) noexcept : _fd(x._fd) { x._fd = -1; }
80  ~file_desc() { if (_fd != -1) { ::close(_fd); } }
81  void operator=(const file_desc&) = delete;
82  file_desc& operator=(file_desc&& x) {
83  if (this != &x) {
84  std::swap(_fd, x._fd);
85  if (x._fd != -1) {
86  x.close();
87  }
88  }
89  return *this;
90  }
91  void close() {
92  assert(_fd != -1);
93  auto r = ::close(_fd);
94  throw_system_error_on(r == -1, "close");
95  _fd = -1;
96  }
97  int get() const { return _fd; }
98 
99  static file_desc from_fd(int fd) {
100  return file_desc(fd);
101  }
102 
103  static file_desc open(sstring name, int flags, mode_t mode = 0) {
104  int fd = ::open(name.c_str(), flags, mode);
105  throw_system_error_on(fd == -1, "open");
106  return file_desc(fd);
107  }
108  static file_desc socket(int family, int type, int protocol = 0) {
109  int fd = ::socket(family, type, protocol);
110  throw_system_error_on(fd == -1, "socket");
111  return file_desc(fd);
112  }
113  static file_desc eventfd(unsigned initval, int flags) {
114  int fd = ::eventfd(initval, flags);
115  throw_system_error_on(fd == -1, "eventfd");
116  return file_desc(fd);
117  }
118  static file_desc epoll_create(int flags = 0) {
119  int fd = ::epoll_create1(flags);
120  throw_system_error_on(fd == -1, "epoll_create1");
121  return file_desc(fd);
122  }
123  static file_desc timerfd_create(int clockid, int flags) {
124  int fd = ::timerfd_create(clockid, flags);
125  throw_system_error_on(fd == -1, "timerfd_create");
126  return file_desc(fd);
127  }
128  static file_desc temporary(sstring directory);
129  file_desc dup() const {
130  int fd = ::dup(get());
131  throw_system_error_on(fd == -1, "dup");
132  return file_desc(fd);
133  }
134  file_desc accept(socket_address& sa, int flags = 0) {
135  auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags);
136  throw_system_error_on(ret == -1, "accept4");
137  return file_desc(ret);
138  }
139  static file_desc inotify_init(int flags);
140  // return nullopt if no connection is availbale to be accepted
141  std::optional<file_desc> try_accept(socket_address& sa, int flags = 0) {
142  auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags);
143  if (ret == -1 && errno == EAGAIN) {
144  return {};
145  }
146  throw_system_error_on(ret == -1, "accept4");
147  return file_desc(ret);
148  }
149  void shutdown(int how) {
150  auto ret = ::shutdown(_fd, how);
151  if (ret == -1 && errno != ENOTCONN) {
152  throw_system_error_on(ret == -1, "shutdown");
153  }
154  }
155  void truncate(size_t size) {
156  auto ret = ::ftruncate(_fd, size);
157  throw_system_error_on(ret, "ftruncate");
158  }
159  int ioctl(int request) {
160  return ioctl(request, 0);
161  }
162  int ioctl(int request, int value) {
163  int r = ::ioctl(_fd, request, value);
164  throw_system_error_on(r == -1, "ioctl");
165  return r;
166  }
167  int ioctl(int request, unsigned int value) {
168  int r = ::ioctl(_fd, request, value);
169  throw_system_error_on(r == -1, "ioctl");
170  return r;
171  }
172  template <class X>
173  int ioctl(int request, X& data) {
174  int r = ::ioctl(_fd, request, &data);
175  throw_system_error_on(r == -1, "ioctl");
176  return r;
177  }
178  template <class X>
179  int ioctl(int request, X&& data) {
180  int r = ::ioctl(_fd, request, &data);
181  throw_system_error_on(r == -1, "ioctl");
182  return r;
183  }
184  template <class X>
185  int setsockopt(int level, int optname, X&& data) {
186  int r = ::setsockopt(_fd, level, optname, &data, sizeof(data));
187  throw_system_error_on(r == -1, "setsockopt");
188  return r;
189  }
190  int setsockopt(int level, int optname, const char* data) {
191  int r = ::setsockopt(_fd, level, optname, data, strlen(data) + 1);
192  throw_system_error_on(r == -1, "setsockopt");
193  return r;
194  }
195  int setsockopt(int level, int optname, const void* data, socklen_t len) {
196  int r = ::setsockopt(_fd, level, optname, data, len);
197  throw_system_error_on(r == -1, "setsockopt");
198  return r;
199  }
200  template <typename Data>
201  Data getsockopt(int level, int optname) {
202  Data data;
203  socklen_t len = sizeof(data);
204  memset(&data, 0, len);
205  int r = ::getsockopt(_fd, level, optname, &data, &len);
206  throw_system_error_on(r == -1, "getsockopt");
207  return data;
208  }
209  int getsockopt(int level, int optname, char* data, socklen_t len) {
210  int r = ::getsockopt(_fd, level, optname, data, &len);
211  throw_system_error_on(r == -1, "getsockopt");
212  return r;
213  }
214  size_t size() {
215  struct stat buf;
216  auto r = ::fstat(_fd, &buf);
217  throw_system_error_on(r == -1, "fstat");
218  return buf.st_size;
219  }
220  std::optional<size_t> read(void* buffer, size_t len) {
221  auto r = ::read(_fd, buffer, len);
222  if (r == -1 && errno == EAGAIN) {
223  return {};
224  }
225  throw_system_error_on(r == -1, "read");
226  return { size_t(r) };
227  }
228  std::optional<ssize_t> recv(void* buffer, size_t len, int flags) {
229  auto r = ::recv(_fd, buffer, len, flags);
230  if (r == -1 && errno == EAGAIN) {
231  return {};
232  }
233  throw_system_error_on(r == -1, "recv");
234  return { ssize_t(r) };
235  }
236  std::optional<size_t> recvmsg(msghdr* mh, int flags) {
237  auto r = ::recvmsg(_fd, mh, flags);
238  if (r == -1 && errno == EAGAIN) {
239  return {};
240  }
241  throw_system_error_on(r == -1, "recvmsg");
242  return { size_t(r) };
243  }
244  std::optional<size_t> send(const void* buffer, size_t len, int flags) {
245  auto r = ::send(_fd, buffer, len, flags);
246  if (r == -1 && errno == EAGAIN) {
247  return {};
248  }
249  throw_system_error_on(r == -1, "send");
250  return { size_t(r) };
251  }
252  std::optional<size_t> sendto(socket_address& addr, const void* buf, size_t len, int flags) {
253  auto r = ::sendto(_fd, buf, len, flags, &addr.u.sa, addr.length());
254  if (r == -1 && errno == EAGAIN) {
255  return {};
256  }
257  throw_system_error_on(r == -1, "sendto");
258  return { size_t(r) };
259  }
260  std::optional<size_t> sendmsg(const msghdr* msg, int flags) {
261  auto r = ::sendmsg(_fd, msg, flags);
262  if (r == -1 && errno == EAGAIN) {
263  return {};
264  }
265  throw_system_error_on(r == -1, "sendmsg");
266  return { size_t(r) };
267  }
268  void bind(sockaddr& sa, socklen_t sl) {
269  auto r = ::bind(_fd, &sa, sl);
270  throw_system_error_on(r == -1, "bind");
271  }
272  void connect(sockaddr& sa, socklen_t sl) {
273  auto r = ::connect(_fd, &sa, sl);
274  if (r == -1 && errno == EINPROGRESS) {
275  return;
276  }
277  throw_system_error_on(r == -1, "connect");
278  }
279  socket_address get_address() {
280  socket_address addr;
281  auto r = ::getsockname(_fd, &addr.u.sa, &addr.addr_length);
282  throw_system_error_on(r == -1, "getsockname");
283  return addr;
284  }
285  void listen(int backlog) {
286  auto fd = ::listen(_fd, backlog);
287  throw_system_error_on(fd == -1, "listen");
288  }
289  std::optional<size_t> write(const void* buf, size_t len) {
290  auto r = ::write(_fd, buf, len);
291  if (r == -1 && errno == EAGAIN) {
292  return {};
293  }
294  throw_system_error_on(r == -1, "write");
295  return { size_t(r) };
296  }
297  std::optional<size_t> writev(const iovec *iov, int iovcnt) {
298  auto r = ::writev(_fd, iov, iovcnt);
299  if (r == -1 && errno == EAGAIN) {
300  return {};
301  }
302  throw_system_error_on(r == -1, "writev");
303  return { size_t(r) };
304  }
305  size_t pread(void* buf, size_t len, off_t off) {
306  auto r = ::pread(_fd, buf, len, off);
307  throw_system_error_on(r == -1, "pread");
308  return size_t(r);
309  }
310  void timerfd_settime(int flags, const itimerspec& its) {
311  auto r = ::timerfd_settime(_fd, flags, &its, NULL);
312  throw_system_error_on(r == -1, "timerfd_settime");
313  }
314 
315  mmap_area map(size_t size, unsigned prot, unsigned flags, size_t offset,
316  void* addr = nullptr) {
317  void *x = mmap(addr, size, prot, flags, _fd, offset);
318  throw_system_error_on(x == MAP_FAILED, "mmap");
319  return mmap_area(static_cast<char*>(x), mmap_deleter{size});
320  }
321 
322  mmap_area map_shared_rw(size_t size, size_t offset) {
323  return map(size, PROT_READ | PROT_WRITE, MAP_SHARED, offset);
324  }
325 
326  mmap_area map_shared_ro(size_t size, size_t offset) {
327  return map(size, PROT_READ, MAP_SHARED, offset);
328  }
329 
330  mmap_area map_private_rw(size_t size, size_t offset) {
331  return map(size, PROT_READ | PROT_WRITE, MAP_PRIVATE, offset);
332  }
333 
334  mmap_area map_private_ro(size_t size, size_t offset) {
335  return map(size, PROT_READ, MAP_PRIVATE, offset);
336  }
337 
338 private:
339  file_desc(int fd) : _fd(fd) {}
340  };
341 
342 
343 namespace posix {
344 
/// Converts a std::chrono duration to a POSIX timespec.
///
/// The duration is truncated to whole nanoseconds. The result is normalized so
/// that 0 <= tv_nsec < 1000000000. For negative durations the seconds field is
/// therefore rounded toward negative infinity: -1ns becomes
/// {tv_sec = -1, tv_nsec = 999999999} rather than {0, -1}, because a negative
/// tv_nsec is not a valid timespec.
template <typename Rep, typename Period>
struct timespec
to_timespec(std::chrono::duration<Rep, Period> d) {
    constexpr long long ns_per_sec = 1000000000;
    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(d).count();
    auto sec = ns / ns_per_sec;
    auto nsec = ns % ns_per_sec;
    if (nsec < 0) {
        // Integer division truncates toward zero; borrow one second so the
        // nanosecond field lands in [0, 1e9).
        nsec += ns_per_sec;
        sec -= 1;
    }
    struct timespec ts {};
    ts.tv_sec = sec;
    ts.tv_nsec = nsec;
    return ts;
}
358 
/// Builds an itimerspec from two durations: `base` becomes the initial
/// expiration (it_value) and `interval` the reload period (it_interval).
/// Both are converted with to_timespec().
template <typename Rep1, typename Period1, typename Rep2, typename Period2>
struct itimerspec
to_relative_itimerspec(std::chrono::duration<Rep1, Period1> base, std::chrono::duration<Rep2, Period2> interval) {
    const struct timespec first_expiry = to_timespec(base);
    const struct timespec period = to_timespec(interval);
    struct itimerspec spec {};
    spec.it_value = first_expiry;
    spec.it_interval = period;
    return spec;
}
372 
373 
/// Builds an itimerspec whose initial expiration is the time point `base`,
/// expressed as its offset from Clock's epoch, and whose reload period is
/// `interval`. NOTE(review): only meaningful with an absolute-time flag and a
/// kernel clock sharing Clock's epoch -- confirm at call sites.
template <typename Clock, class Duration, class Rep, class Period>
struct itimerspec
to_absolute_itimerspec(std::chrono::time_point<Clock, Duration> base, std::chrono::duration<Rep, Period> interval) {
    const auto offset_from_epoch = base.time_since_epoch();
    return to_relative_itimerspec(offset_from_epoch, interval);
}
384 
385 }
386 
// NOTE(review): these are the members of seastar::posix_thread. The opening
// `class posix_thread {` line (listing line 387) and one declaration between
// the two constructors (listing line 401 -- presumably a move constructor,
// given the "moveable" comment below; TODO confirm) are missing from this view.
388 public:
389  class attr;
390 private:
// The callable run by the thread.
391  // must allocate, since this class is moveable
392  std::unique_ptr<std::function<void ()>> _func;
// Handle of the underlying POSIX thread.
393  pthread_t _pthread;
// presumably cleared when the object is moved from, so the destructor skips it -- TODO confirm
394  bool _valid = true;
// presumably the stack mapping used when attr::stack_size is set -- TODO confirm
395  mmap_area _stack;
396 private:
// Thread entry trampoline (definition not visible here); must not throw.
397  static void* start_routine(void* arg) noexcept;
398 public:
// Start a thread running `func`, optionally with the options collected in `a`.
399  posix_thread(std::function<void ()> func);
400  posix_thread(attr a, std::function<void ()> func);
402  ~posix_thread();
// Wait for the thread to finish (definition not visible here).
403  void join();
404 public:
// Thread creation options. Any number of options can be passed to the
// constructor; each is applied through the matching set() overload.
405  class attr {
406  public:
// Requested stack size; 0 by default (presumably "use the default" -- TODO confirm).
407  struct stack_size { size_t size = 0; };
408  attr() = default;
409  template <typename... A>
410  attr(A... a) {
411  set(std::forward<A>(a)...);
412  }
// Recursion terminator for the variadic set().
413  void set() {}
// Applies the first option, then the rest, one at a time.
414  template <typename A, typename... Rest>
415  void set(A a, Rest... rest) {
416  set(std::forward<A>(a));
417  set(std::forward<Rest>(rest)...);
418  }
419  void set(stack_size ss) { _stack_size = ss; }
420  private:
421  stack_size _stack_size;
// posix_thread reads the collected options directly.
422  friend class posix_thread;
423  };
424 };
425 
426 
427 inline
428 void throw_system_error_on(bool condition, const char* what_arg) {
429  if (condition) {
430  if ((errno == EBADF || errno == ENOTSOCK) && is_abort_on_ebadf_enabled()) {
431  abort();
432  }
433  throw std::system_error(errno, std::system_category(), what_arg);
434  }
435 }
436 
437 template <typename T>
438 inline
439 void throw_kernel_error(T r) {
440  static_assert(std::is_signed<T>::value, "kernel error variables must be signed");
441  if (r < 0) {
442  auto ec = -r;
443  if ((ec == EBADF || ec == ENOTSOCK) && is_abort_on_ebadf_enabled()) {
444  abort();
445  }
446  throw std::system_error(-r, std::system_category());
447  }
448 }
449 
/// Throws std::system_error when `r` is a non-zero pthread-style error code.
///
/// pthread functions return the error number directly (0 on success) rather
/// than setting errno, so `r` itself becomes the exception's error value.
template <typename T>
inline
void throw_pthread_error(T r) {
    if (r == 0) {
        return;
    }
    throw std::system_error(r, std::system_category());
}
457 
/// Returns a signal set that contains exactly the signal `signo`.
inline
sigset_t make_sigset_mask(int signo) {
    sigset_t only_signo;
    sigemptyset(&only_signo);        // start with no signals...
    sigaddset(&only_signo, signo);   // ...then add the requested one
    return only_signo;
}
465 
/// Returns a signal set containing every signal (as filled by sigfillset()).
inline
sigset_t make_full_sigset_mask() {
    sigset_t every_signal;
    sigfillset(&every_signal);
    return every_signal;
}
472 
/// Returns a signal set containing no signals (as cleared by sigemptyset()).
inline
sigset_t make_empty_sigset_mask() {
    sigset_t no_signals;
    sigemptyset(&no_signals);
    return no_signals;
}
479 
/// Restricts the calling thread to run only on CPU `cpu_id`.
///
/// Failure of pthread_setaffinity_np() is checked only by assert(), so it is
/// not reported at all in NDEBUG builds.
inline
void pin_this_thread(unsigned cpu_id) {
    cpu_set_t single_cpu;
    CPU_ZERO(&single_cpu);
    CPU_SET(cpu_id, &single_cpu);
    const int rc = pthread_setaffinity_np(pthread_self(), sizeof(single_cpu), &single_cpu);
    assert(rc == 0);
    static_cast<void>(rc);   // keeps rc "used" when assert() compiles away
}
489 
491 
492 }
seastar::listen
server_socket listen(socket_address sa)
seastar::mmap_deleter
Definition: posix.hh:65
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::socket_address::u
union seastar::socket_address::@13 u
Actual size of the relevant 'u' member
seastar::is_abort_on_ebadf_enabled
bool is_abort_on_ebadf_enabled()
seastar::posix_thread
Definition: posix.hh:387
seastar::socket
Definition: api.hh:237
seastar::posix_thread::attr::stack_size
Definition: posix.hh:407
seastar::posix::to_absolute_itimerspec
struct itimerspec to_absolute_itimerspec(std::chrono::time_point< Clock, Duration > base, std::chrono::duration< Rep, Period > interval)
Definition: posix.hh:381
seastar::posix::to_timespec
struct timespec to_timespec(std::chrono::duration< Rep, Period > d)
Definition: posix.hh:351
seastar::connect
future< connected_socket > connect(socket_address sa)
seastar::file_desc
Definition: posix.hh:74
seastar::posix::to_relative_itimerspec
struct itimerspec to_relative_itimerspec(std::chrono::duration< Rep1, Period1 > base, std::chrono::duration< Rep2, Period2 > interval)
Definition: posix.hh:366
seastar::posix_thread::attr
Definition: posix.hh:405
seastar::socket_address
Definition: socket_defs.hh:41