Seastar
High performance C++ framework for concurrent servers
pollable_fd.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright 2019 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
26 #include <seastar/core/internal/io_desc.hh>
27 #include <seastar/util/bool_class.hh>
28 #include <seastar/util/modules.hh>
29 #ifndef SEASTAR_MODULE
30 #include <boost/intrusive_ptr.hpp>
31 #include <cstdint>
32 #include <vector>
33 #include <tuple>
34 #include <sys/uio.h>
35 #endif
36 
37 namespace seastar {
38 
// Forward declarations of the types this header only names in declarations,
// pointers or references. The first group is wrapped in the
// SEASTAR_MODULE_EXPORT_BEGIN / SEASTAR_MODULE_EXPORT_END macro pair.
39 SEASTAR_MODULE_EXPORT_BEGIN
40 class reactor;
41 class pollable_fd;
42 class pollable_fd_state;
43 class socket_address;
44 SEASTAR_MODULE_EXPORT_END
45 
46 namespace internal {
47 
// Named below only through internal::buffer_allocator* parameters.
48 class buffer_allocator;
49 
50 }
51 
52 namespace net {
53 
// Named below only through net::packet& parameters.
54 SEASTAR_MODULE_EXPORT
55 class packet;
56 
57 }
58 
// Repeats the forward declaration already made on listing line 42; redundant but harmless.
59 class pollable_fd_state;
60 
// Intrusive smart pointer to the shared fd state. The hooks it relies on,
// intrusive_ptr_add_ref() and intrusive_ptr_release(), are declared as friends
// of pollable_fd_state further down in this header.
61 using pollable_fd_state_ptr = boost::intrusive_ptr<pollable_fd_state>;
62 
// NOTE(review): listing line 63 is absent from this extract. The index at the end
// of the page attributes pollable_fd.hh:63 to a definition, and the constructor and
// destructor names below show that these are the members of class pollable_fd_state.
//
// Reference count for pollable_fd_state_ptr: incremented by intrusive_ptr_add_ref()
// at the bottom of this class; the matching intrusive_ptr_release() is only
// declared here, its definition is not visible in this header.
64  unsigned _refs = 0;
65 public:
// Virtual destructor, so the state can be destroyed through a base pointer.
66  virtual ~pollable_fd_state() {}
// Initial guess of already-known events, passed to the constructor below and
// copied into events_known. Defaults to 0 (nothing guessed).
67  struct speculation {
68  int events = 0;
69  explicit speculation(int epoll_events_guessed = 0) : events(epoll_events_guessed) {}
70  };
// Not copyable: both copy construction and copy assignment are deleted.
71  pollable_fd_state(const pollable_fd_state&) = delete;
72  void operator=(const pollable_fd_state&) = delete;
// Records a speculation by OR-ing the bits of `events` into events_known.
// NOTE(review): listing lines 73-79 and 81-84 are absent from this extract.
80  void speculate_epoll(int events) { events_known |= events; }
// Consumes a speculation. If at least one bit of `events` is currently set in
// events_known, clears every bit of `events` from events_known and returns true;
// otherwise leaves events_known untouched and returns false.
85  bool take_speculation(int events) {
86  // a speculation recorded by speculate_epoll() is single-use: invalidate it here
87  if (events_known & events) {
88  events_known &= ~events;
89  return true;
90  }
91  return false;
92  }
// The file descriptor owned by this state object (moved in by the constructor).
93  file_desc fd;
94  bool events_rw = false; // single consumer for both read and write (accept())
95  unsigned shutdown_mask = 0; // For udp, there is no shutdown indication from the kernel
96  int events_requested = 0; // wanted by pollin/pollout promises
97  int events_epoll = 0; // installed in epoll
98  int events_known = 0; // returned from epoll
99 
// These classes are granted access to the private members below.
100  friend class reactor;
101  friend class pollable_fd;
102  friend class reactor_backend_uring;
103 
// Asynchronous I/O entry points. Only the declarations are part of this header;
// pollable_fd forwards its same-named methods to these.
104  future<size_t> read_some(char* buffer, size_t size);
105  future<size_t> read_some(uint8_t* buffer, size_t size);
106  future<size_t> read_some(const std::vector<iovec>& iov);
107  future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba);
108  future<> write_all(const char* buffer, size_t size);
109  future<> write_all(const uint8_t* buffer, size_t size);
110  future<size_t> write_some(net::packet& p);
111  future<> write_all(net::packet& p);
112  future<> readable();
113  future<> writeable();
114  future<> readable_or_writeable();
// NOTE(review): listing lines 115-116 are absent from this extract. pollable_fd
// calls _s->accept() and _s->connect(sa), so the declarations of accept() and
// connect() presumably sat here - confirm against the real header.
117  future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba);
118  future<size_t> sendmsg(struct msghdr *msg);
119  future<size_t> recvmsg(struct msghdr *msg);
120  future<size_t> sendto(socket_address addr, const void* buf, size_t len);
121  future<> poll_rdhup();
122 
123 protected:
// Takes ownership of `fd` (moved into the member) and seeds events_known with
// the caller's speculation. Protected, so only derived classes and the friends
// declared above can call it.
124  explicit pollable_fd_state(file_desc fd, speculation speculate = speculation())
125  : fd(std::move(fd)), events_known(speculate.events) {}
126 private:
// Declared here, defined elsewhere; pollable_fd exposes same-named protected
// forwarders for the first two.
127  void maybe_no_more_recv();
128  void maybe_no_more_send();
129  void forget(); // called on end-of-life
130 
// boost::intrusive_ptr hook: take one more reference on `fd`.
131  friend void intrusive_ptr_add_ref(pollable_fd_state* fd) {
132  ++fd->_refs;
133  }
// boost::intrusive_ptr hook for dropping a reference; defined outside this header.
134  friend void intrusive_ptr_release(pollable_fd_state* fd);
135 };
136 
// Handle around a reference-counted pollable_fd_state (member _s). Almost every
// member function is a one-line forwarder to the same-named function on the state.
// The forwarders dereference _s without checking it, while the default constructor
// and close() leave _s empty - use operator bool to test before calling them.
137 class pollable_fd {
138 public:
// NOTE(review): listing line 139 is absent from this extract. `speculation` is used
// unqualified below, so a declaration making that name visible (presumably an alias
// for pollable_fd_state::speculation) likely sat here - confirm against the real header.
//
// Default construction leaves _s default-constructed (no state attached).
140  pollable_fd() = default;
// Declared only; the definition is not part of this header.
141  pollable_fd(file_desc fd, speculation speculate = speculation());
142 public:
// --- Forwarders: each returns whatever the state's same-named method returns. ---
143  future<size_t> read_some(char* buffer, size_t size) {
144  return _s->read_some(buffer, size);
145  }
146  future<size_t> read_some(uint8_t* buffer, size_t size) {
147  return _s->read_some(buffer, size);
148  }
149  future<size_t> read_some(const std::vector<iovec>& iov) {
150  return _s->read_some(iov);
151  }
152  future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba) {
153  return _s->read_some(ba);
154  }
155  future<> write_all(const char* buffer, size_t size) {
156  return _s->write_all(buffer, size);
157  }
158  future<> write_all(const uint8_t* buffer, size_t size) {
159  return _s->write_all(buffer, size);
160  }
161  future<size_t> write_some(net::packet& p) {
162  return _s->write_some(p);
163  }
164  future<> write_all(net::packet& p) {
165  return _s->write_all(p);
166  }
167  future<> readable() {
168  return _s->readable();
169  }
170  future<> writeable() {
171  return _s->writeable();
172  }
173  future<> readable_or_writeable() {
174  return _s->readable_or_writeable();
175  }
// NOTE(review): listing line 176 (the signature that opens the body below) is absent
// from this extract; the body forwards to accept() on the shared state.
177  return _s->accept();
178  }
179  future<> connect(socket_address& sa) {
180  return _s->connect(sa);
181  }
182  future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba) {
183  return _s->recv_some(ba);
184  }
185  future<size_t> sendmsg(struct msghdr *msg) {
186  return _s->sendmsg(msg);
187  }
188  future<size_t> recvmsg(struct msghdr *msg) {
189  return _s->recvmsg(msg);
190  }
191  future<size_t> sendto(socket_address addr, const void* buf, size_t len) {
192  return _s->sendto(addr, buf, len);
193  }
// Returns a non-const reference to the state's file_desc even though this
// member function is const (the state is reached through the pointer _s).
194  file_desc& get_file_desc() const { return _s->fd; }
// NOTE(review): listing line 195 is absent from this extract; `shutdown_kernel_only`
// is used unqualified below, so its declaration presumably sat here - confirm.
// shutdown() is declared only; the definition is not part of this header.
196  void shutdown(int how, shutdown_kernel_only kernel_only = shutdown_kernel_only::yes);
// Drops this handle's reference to the state by resetting _s.
197  void close() { _s.reset(); }
// True when a state object is attached.
198  explicit operator bool() const noexcept {
199  return bool(_s);
200  }
201  future<> poll_rdhup() {
202  return _s->poll_rdhup();
203  }
204 protected:
// Raw descriptor number, read through the state's file_desc.
205  int get_fd() const { return _s->fd.get(); }
// Forward to the state's private helpers of the same name (pollable_fd is a
// friend of pollable_fd_state).
206  void maybe_no_more_recv() { return _s->maybe_no_more_recv(); }
207  void maybe_no_more_send() { return _s->maybe_no_more_send(); }
// Classes allowed to use the protected and private members above and below.
208  friend class reactor;
209  friend class readable_eventfd;
210  friend class writeable_eventfd;
211  friend class aio_storage_context;
212 private:
// The shared, reference-counted state every forwarder goes through.
213  pollable_fd_state_ptr _s;
214 };
215 
216 class writeable_eventfd;
217 
// NOTE(review): listing line 218 is absent from this extract. The index at the end
// of the page attributes pollable_fd.hh:218 to a definition, and the constructor
// names below show that these are the members of class readable_eventfd.
//
// The descriptor held by this object, wrapped in a pollable_fd.
219  pollable_fd _fd;
220 public:
// Builds _fd from the descriptor returned by try_create_eventfd(initial).
221  explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
// Movable; the move constructor is the compiler-generated one.
222  readable_eventfd(readable_eventfd&&) = default;
// Declared only; definitions are not part of this header.
223  writeable_eventfd write_side();
224  future<size_t> wait();
// Returns the descriptor number of _fd. pollable_fd::get_fd() is protected and is
// reachable here because pollable_fd names this class as a friend.
225  int get_write_fd() { return _fd.get_fd(); }
226 private:
// Wraps an already-open descriptor; private, so only this class and its friend
// writeable_eventfd can use it.
227  explicit readable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
// Declared only; supplies the descriptor for the public constructor.
228  static file_desc try_create_eventfd(size_t initial);
229 
230  friend class writeable_eventfd;
231 };
232 
// NOTE(review): listing line 233 is absent from this extract. The index at the end
// of the page attributes pollable_fd.hh:233 to a definition, and the constructor
// names below show that these are the members of class writeable_eventfd.
//
// The descriptor held by this object, as a plain file_desc (not a pollable_fd).
234  file_desc _fd;
235 public:
// Builds _fd from the descriptor returned by try_create_eventfd(initial).
236  explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
// NOTE(review): listing line 237 is absent from this extract.
// Declared only; definitions are not part of this header.
238  readable_eventfd read_side();
239  void signal(size_t nr);
// Returns the descriptor number held in _fd.
240  int get_read_fd() { return _fd.get(); }
241 private:
// Wraps an already-open descriptor; private, so only this class and its friend
// readable_eventfd can use it.
242  explicit writeable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
// Declared only; supplies the descriptor for the public constructor.
243  static file_desc try_create_eventfd(size_t initial);
244 
245  friend class readable_eventfd;
246 };
247 
248 }
Type-safe boolean.
Definition: bool_class.hh:57
Definition: posix.hh:85
A representation of a possibly not-yet-computed value.
Definition: future.hh:1238
Definition: packet.hh:87
Definition: pollable_fd.hh:63
bool take_speculation(int events)
Definition: pollable_fd.hh:85
void speculate_epoll(int events)
Definition: pollable_fd.hh:80
Definition: pollable_fd.hh:137
Definition: reactor.hh:168
Definition: pollable_fd.hh:218
Definition: socket_defs.hh:47
Definition: pollable_fd.hh:233
future< connected_socket > connect(socket_address sa)
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: pollable_fd.hh:67