Seastar
High performance C++ framework for concurrent servers
pollable_fd.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/posix.hh>
26#include <seastar/util/bool_class.hh>
27#include <seastar/util/modules.hh>
28#ifndef SEASTAR_MODULE
29#include <boost/intrusive_ptr.hpp>
30#include <cstdint>
31#include <vector>
32#include <tuple>
33#include <sys/uio.h>
34#endif
35
36namespace seastar {
37
38SEASTAR_MODULE_EXPORT_BEGIN
39class reactor;
40class pollable_fd;
41class pollable_fd_state;
42class socket_address;
43SEASTAR_MODULE_EXPORT_END
44
45namespace internal {
46
47class buffer_allocator;
48
49}
50
51namespace net {
52
53SEASTAR_MODULE_EXPORT
54class packet;
55
56}
57
59
60using pollable_fd_state_ptr = boost::intrusive_ptr<pollable_fd_state>;
61
63 unsigned _refs = 0;
64public:
65 struct speculation {
66 int events = 0;
67 explicit speculation(int epoll_events_guessed = 0) : events(epoll_events_guessed) {}
68 };
69 pollable_fd_state(const pollable_fd_state&) = delete;
70 void operator=(const pollable_fd_state&) = delete;
78 void speculate_epoll(int events) { events_known |= events; }
83 bool take_speculation(int events) {
84 // invalidate the speculation set by the last speculate_epoll() call,
85 if (events_known & events) {
86 events_known &= ~events;
87 return true;
88 }
89 return false;
90 }
91 file_desc fd;
92 bool events_rw = false; // single consumer for both read and write (accept())
93 unsigned shutdown_mask = 0; // For udp, there is no shutdown indication from the kernel
94 int events_requested = 0; // wanted by pollin/pollout promises
95 int events_epoll = 0; // installed in epoll
96 int events_known = 0; // returned from epoll
97
98 friend class reactor;
99 friend class pollable_fd;
100 friend class reactor_backend_uring;
101
102 future<size_t> read_some(char* buffer, size_t size);
103 future<size_t> read_some(uint8_t* buffer, size_t size);
104 future<size_t> read_some(const std::vector<iovec>& iov);
105 future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba);
106 future<> write_all(const char* buffer, size_t size);
107 future<> write_all(const uint8_t* buffer, size_t size);
108 future<size_t> write_some(net::packet& p);
109 future<> write_all(net::packet& p);
110 future<> readable();
111 future<> writeable();
112 future<> readable_or_writeable();
115 future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba);
116 future<size_t> sendmsg(struct msghdr *msg);
117 future<size_t> recvmsg(struct msghdr *msg);
118 future<size_t> sendto(socket_address addr, const void* buf, size_t len);
119 future<> poll_rdhup();
120
121protected:
122 explicit pollable_fd_state(file_desc fd, speculation speculate = speculation())
123 : fd(std::move(fd)), events_known(speculate.events) {}
124 ~pollable_fd_state() = default;
125private:
126 void maybe_no_more_recv();
127 void maybe_no_more_send();
128 void forget(); // called on end-of-life
129
130 friend void intrusive_ptr_add_ref(pollable_fd_state* fd) {
131 ++fd->_refs;
132 }
133 friend void intrusive_ptr_release(pollable_fd_state* fd);
134};
135
137public:
139 pollable_fd() = default;
140 pollable_fd(file_desc fd, speculation speculate = speculation());
141public:
142 future<size_t> read_some(char* buffer, size_t size) {
143 return _s->read_some(buffer, size);
144 }
145 future<size_t> read_some(uint8_t* buffer, size_t size) {
146 return _s->read_some(buffer, size);
147 }
148 future<size_t> read_some(const std::vector<iovec>& iov) {
149 return _s->read_some(iov);
150 }
151 future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba) {
152 return _s->read_some(ba);
153 }
154 future<> write_all(const char* buffer, size_t size) {
155 return _s->write_all(buffer, size);
156 }
157 future<> write_all(const uint8_t* buffer, size_t size) {
158 return _s->write_all(buffer, size);
159 }
160 future<size_t> write_some(net::packet& p) {
161 return _s->write_some(p);
162 }
163 future<> write_all(net::packet& p) {
164 return _s->write_all(p);
165 }
166 future<> readable() {
167 return _s->readable();
168 }
169 future<> writeable() {
170 return _s->writeable();
171 }
172 future<> readable_or_writeable() {
173 return _s->readable_or_writeable();
174 }
176 return _s->accept();
177 }
178 future<> connect(socket_address& sa) {
179 return _s->connect(sa);
180 }
181 future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba) {
182 return _s->recv_some(ba);
183 }
184 future<size_t> sendmsg(struct msghdr *msg) {
185 return _s->sendmsg(msg);
186 }
187 future<size_t> recvmsg(struct msghdr *msg) {
188 return _s->recvmsg(msg);
189 }
190 future<size_t> sendto(socket_address addr, const void* buf, size_t len) {
191 return _s->sendto(addr, buf, len);
192 }
193 file_desc& get_file_desc() const { return _s->fd; }
195 void shutdown(int how, shutdown_kernel_only kernel_only = shutdown_kernel_only::yes);
196 void close() { _s.reset(); }
197 explicit operator bool() const noexcept {
198 return bool(_s);
199 }
200 future<> poll_rdhup() {
201 return _s->poll_rdhup();
202 }
203protected:
204 int get_fd() const { return _s->fd.get(); }
205 void maybe_no_more_recv() { return _s->maybe_no_more_recv(); }
206 void maybe_no_more_send() { return _s->maybe_no_more_send(); }
207 friend class reactor;
208 friend class readable_eventfd;
209 friend class writeable_eventfd;
210 friend class aio_storage_context;
211private:
212 pollable_fd_state_ptr _s;
213};
214
216
218 pollable_fd _fd;
219public:
220 explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
222 writeable_eventfd write_side();
223 future<size_t> wait();
224 int get_write_fd() { return _fd.get_fd(); }
225private:
226 explicit readable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
227 static file_desc try_create_eventfd(size_t initial);
228
229 friend class writeable_eventfd;
230};
231
233 file_desc _fd;
234public:
235 explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
237 readable_eventfd read_side();
238 void signal(size_t nr);
239 int get_read_fd() { return _fd.get(); }
240private:
241 explicit writeable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
242 static file_desc try_create_eventfd(size_t initial);
243
244 friend class readable_eventfd;
245};
246
247}
Type-safe boolean.
Definition: bool_class.hh:58
Definition: posix.hh:85
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
Definition: packet.hh:87
Definition: pollable_fd.hh:62
bool take_speculation(int events)
Definition: pollable_fd.hh:83
void speculate_epoll(int events)
Definition: pollable_fd.hh:78
Definition: pollable_fd.hh:136
Definition: reactor.hh:155
Definition: pollable_fd.hh:217
Definition: socket_defs.hh:47
Definition: pollable_fd.hh:232
future< connected_socket > connect(socket_address sa)
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
STL namespace.
Definition: pollable_fd.hh:65