Asynchronous Programming with Seastar

Nadav Har’El - nyh@ScyllaDB.com

Avi Kivity - avi@ScyllaDB.com


17 Introducing Seastar’s network stack

TODO: Mention the two modes of operation: Posix and native (i.e., take a L2 (Ethernet) interface (vhost or dpdk) and on top of it we built (in Seastar itself) an L3 interface (TCP/IP)).

For optimal performance, Seastar’s network stack is sharded just like Seastar applications are: each shard (thread) takes responsibility for a different subset of the connections. Each incoming connection is directed to one of the threads, and after a connection is established, it continues to be handled on the same thread.
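To make the ownership rule concrete, here is a toy model in plain C++ of assigning each connection to exactly one shard. It is only a sketch: the connection_tuple struct and the shard_for() function are hypothetical names invented for this illustration, and the hash-modulo rule is a stand-in, not a description of how Seastar actually distributes connections.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>

// Hypothetical description of a TCP connection (not a Seastar type).
struct connection_tuple {
    std::uint32_t remote_ip;
    std::uint16_t remote_port;
    std::uint16_t local_port;
};

// Toy placement rule (an assumption for illustration only): hash the tuple
// and reduce it modulo the shard count. The same tuple always maps to the
// same shard, so a connection never migrates between shards.
inline unsigned shard_for(const connection_tuple& c, unsigned shard_count) {
    std::uint64_t key = (std::uint64_t(c.remote_ip) << 32)
                      | (std::uint64_t(c.remote_port) << 16)
                      | std::uint64_t(c.local_port);
    return static_cast<unsigned>(std::hash<std::uint64_t>{}(key) % shard_count);
}
```

The property that matters is determinism: whichever rule is used, every packet of a given connection must reach the same shard, so that shard can keep the connection's state without locking.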

In the examples we saw earlier, main() ran our function f() only once, on the first thread. Unless the server is run with the "-c1" option (one thread only), any connection arriving at a different thread will not be handled. So in all the examples below, we need to run the same service loop on all cores. We can easily do this with the smp::submit_to function:

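#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <boost/range/irange.hpp>

// Defined in the examples below; runs the accept loop on the calling shard.
seastar::future<> service_loop();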

seastar::future<> f() {
    return seastar::parallel_for_each(boost::irange<unsigned>(0, seastar::smp::count),
            [] (unsigned c) {
        return seastar::smp::submit_to(c, service_loop);
    });
}

Here we ask each of Seastar’s cores (from 0 to smp::count-1) to run the same function service_loop(). Each of these invocations returns a future, and f() will return when all of them have returned (in the examples below, they never return; we will discuss shutting down services in later sections).

We begin with a simple example of a TCP network server written in Seastar. This server repeatedly accepts connections on TCP port 1234, and returns an empty response:

#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <seastar/net/api.hh>
#include <iostream>

seastar::future<> service_loop() {
    return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234})),
            [] (auto& listener) {
        return seastar::keep_doing([&listener] () {
            return listener.accept().then(
                [] (seastar::accept_result res) {
                    std::cout << "Accepted connection from " << res.remote_address << "\n";
            });
        });
    });
}

This code works as follows:

1. The listen() call creates a server_socket object, listener, which listens on TCP port 1234 (on any network interface).
2. We use do_with() to ensure that the listener socket lives throughout the loop.
3. To handle one connection, we call listener’s accept() method. This method returns a future<accept_result>, i.e., is eventually resolved with an incoming TCP connection from a client (accept_result.connection) and the client’s IP address and port (accept_result.remote_address).
4. To repeatedly accept new connections, we use the keep_doing() loop idiom. keep_doing() runs its lambda parameter over and over, starting the next iteration as soon as the future returned by the previous iteration completes. The iterations only stop if an exception is encountered. The future returned by keep_doing() itself completes only when the iteration stops (i.e., only on exception).
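The keep_doing() contract described above (iterate forever, stop only on an exception) can be pictured with a small synchronous model in standard C++. This is a sketch of the contract only: keep_doing_model() and accept_until_failure() are hypothetical names, nothing here is asynchronous, and it is not how Seastar implements the loop.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>

// Toy, synchronous stand-in for the keep_doing() contract (not Seastar code):
// run `action` again and again; stop only when it throws, and hand the
// exception back to the caller.
inline std::exception_ptr keep_doing_model(const std::function<void()>& action) {
    for (;;) {
        try {
            action();
        } catch (...) {
            return std::current_exception();
        }
    }
}

// Hypothetical demo: "accept" three connections, then fail.
// Returns how many iterations succeeded before the loop stopped.
inline int accept_until_failure() {
    int accepted = 0;
    std::exception_ptr ep = keep_doing_model([&accepted] {
        if (accepted == 3) {
            throw std::runtime_error("listener closed");
        }
        ++accepted;
    });
    return ep ? accepted : -1;
}
```

In the real server the same shape applies: as long as accept() keeps succeeding the loop never ends, and the first failure is what resolves the future returned by keep_doing().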

Output from this server looks like the following example:

$ ./a.out
Accepted connection from 127.0.0.1:47578
Accepted connection from 127.0.0.1:47582
...

If you run the above example server immediately after killing the previous server, it often fails to start again, complaining that:

$ ./a.out
program failed with uncaught exception: bind: Address already in use

This happens because by default, Seastar refuses to reuse the local port if there are any vestiges of old connections using that port. In our silly server, because the server is the side which first closes the connection, each connection lingers for a while in the “TIME_WAIT” state after being closed, and these prevent listen() on the same port from succeeding. Luckily, we can give listen an option to work despite these remaining TIME_WAIT. This option is analogous to socket(7)’s SO_REUSEADDR option:

    seastar::listen_options lo;
    lo.reuse_address = true;
    return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),

Most servers will always turn on this reuse_address listen option. Stevens’ book “Unix Network Programming” even says that “All TCP servers should specify this socket option to allow the server to be restarted”. Therefore in the future Seastar should probably default to this option being on — even if for historic reasons this is not the default in Linux’s socket API.
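For comparison, this is roughly what the analogous option looks like when set by hand through the POSIX socket API. It is a minimal sketch that does not use Seastar at all; the helper name listen_with_reuse() is made up for this example, and error handling is reduced to returning -1.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>

// Open a listening TCP socket on `port` (0 lets the kernel pick a port),
// setting SO_REUSEADDR before bind(). Returns the fd, or -1 on error.
inline int listen_with_reuse(std::uint16_t port) {
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }
    int on = 1;
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // any network interface
    addr.sin_port = htons(port);
    // The option must be set before bind(), otherwise it has no effect
    // on this bind attempt.
    if (::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0 ||
        ::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0 ||
        ::listen(fd, SOMAXCONN) < 0) {
        ::close(fd);
        return -1;
    }
    return fd;
}
```

Calling listen_with_reuse(1234) right after a previous server on that port has exited should succeed even while old connections are still in TIME_WAIT, which is the same effect the reuse_address listen option is meant to provide.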

Let’s advance our example server by outputting some canned response to each connection, instead of closing each connection immediately with an empty reply.

#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <seastar/net/api.hh>

const char* canned_response = "Seastar is the future!\n";

seastar::future<> service_loop() {
    seastar::listen_options lo;
    lo.reuse_address = true;
    return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
            [] (auto& listener) {
        return seastar::keep_doing([&listener] () {
            return listener.accept().then(
                    [] (seastar::accept_result res) {
                auto s = std::move(res.connection);
                auto out = s.output();
                return seastar::do_with(std::move(s), std::move(out),
                        [] (auto& s, auto& out) {
                    return out.write(canned_response).then([&out] {
                        return out.close();
                    });
                });
            });
        });
    });
}

The new part of this code begins by taking the connected_socket’s output(), which returns an output_stream<char> object. On this output stream out we can write our response using the write() method. The simple-looking write() operation is in fact a complex asynchronous operation behind the scenes, possibly causing multiple packets to be sent, retransmitted, etc., as needed. write() returns a future saying when it is ok to write() again to this output stream. This does not necessarily guarantee that the remote peer received all the data we sent it, but it guarantees that the output stream has enough buffer space (or in the TCP case, there is enough room in the TCP congestion window) to allow another write to begin.

After write()ing the response to out, the example code calls out.close() and waits for the future it returns. This is necessary because write() attempts to batch writes, so it might not have written anything to the TCP stack yet at this point. Only when close() concludes can we be sure that all the data we wrote to the output stream has actually reached the TCP stack, and only then may we finally dispose of the out and s objects.
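The batching behavior can be illustrated with a toy stream in standard C++. This is a model only, under the assumption that a std::string can stand in for the TCP stack; the class batching_stream_model is hypothetical and is not Seastar’s output_stream, which is asynchronous and far more involved.

```cpp
#include <string>

// Toy model of a batching output stream (not Seastar's output_stream):
// write() only appends to a private buffer; flush() and close() push the
// buffered bytes to `sink`, which stands in for the TCP stack.
class batching_stream_model {
    std::string buffer_;
    std::string& sink_;
public:
    explicit batching_stream_model(std::string& sink) : sink_(sink) {}

    // Buffer the data; nothing reaches the sink yet.
    void write(const std::string& data) { buffer_ += data; }

    // Hand everything buffered so far to the sink.
    void flush() { sink_ += buffer_; buffer_.clear(); }

    // Closing implies a final flush.
    void close() { flush(); }
};
```

With this model, a write() followed by destroying the stream without close() would silently lose the response, which is the mistake the example server avoids by waiting for close().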

Indeed, this server returns the expected response:

$ telnet localhost 1234
...
Seastar is the future!
Connection closed by foreign host.

In the above example we only saw writing to the socket. Real servers will also want to read from the socket. The connected_socket’s input() method returns an input_stream<char> object which can be used to read from the socket. The simplest way to read from this stream is using the read() method, which returns a future<temporary_buffer<char>> containing some more bytes read from the socket, or an empty buffer when the remote end shut down the connection.

temporary_buffer<char> is a convenient and safe way to pass around byte buffers that are only needed temporarily (e.g., while processing a request). As soon as this object goes out of scope (by normal return, or exception), the memory it holds gets automatically freed. Ownership of the buffer can also be transferred by std::move()ing it. We’ll discuss temporary_buffer in more detail in a later section.
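The ownership rules just described can be sketched with a small move-only class in standard C++. The class owned_buffer is a hypothetical stand-in written for this illustration, not Seastar’s temporary_buffer; it only mirrors the two properties named above (automatic release and transfer by std::move()), plus the convention that an empty buffer tests false.

```cpp
#include <cstddef>
#include <cstring>
#include <memory>
#include <utility>

// Toy owning buffer (not Seastar's temporary_buffer): the memory is released
// when the object is destroyed, and std::move() hands ownership over.
class owned_buffer {
    std::unique_ptr<char[]> data_;
    std::size_t size_ = 0;
public:
    owned_buffer() = default;
    owned_buffer(const char* src, std::size_t n)
        : data_(new char[n]), size_(n) {
        std::memcpy(data_.get(), src, n);
    }
    // Moving leaves the source empty, so only one object ever owns the bytes.
    owned_buffer(owned_buffer&& o) noexcept
        : data_(std::move(o.data_)), size_(std::exchange(o.size_, 0)) {}
    owned_buffer& operator=(owned_buffer&& o) noexcept {
        data_ = std::move(o.data_);
        size_ = std::exchange(o.size_, 0);
        return *this;
    }
    std::size_t size() const { return size_; }
    const char* get() const { return data_.get(); }
    // An empty buffer converts to false.
    explicit operator bool() const { return size_ != 0; }
};
```

After `owned_buffer b = std::move(a);` the bytes belong to b alone and a is empty, so there is exactly one place where the memory will be freed.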

Let’s look at a simple example server involving both reads and writes. This is a simple echo server, as described in RFC 862: The server listens for connections from the client, and once a connection is established, any data received is simply sent back, until the client closes the connection.

#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <seastar/net/api.hh>

seastar::future<> handle_connection(seastar::connected_socket s,
                                    seastar::socket_address a) {
    auto out = s.output();
    auto in = s.input();
    return seastar::do_with(std::move(s), std::move(out), std::move(in),
            [] (auto& s, auto& out, auto& in) {
        return seastar::repeat([&out, &in] {
            return in.read().then([&out] (auto buf) {
                if (buf) {
                    return out.write(std::move(buf)).then([&out] {
                        return out.flush();
                    }).then([] {
                        return seastar::stop_iteration::no;
                    });
                } else {
                    return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::yes);
                }
            });
        }).then([&out] {
            return out.close();
        });
    });
}

seastar::future<> service_loop() {
    seastar::listen_options lo;
    lo.reuse_address = true;
    return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
            [] (auto& listener) {
        return seastar::keep_doing([&listener] () {
            return listener.accept().then(
                    [] (seastar::accept_result res) {
                // Note we ignore, not return, the future returned by
                // handle_connection(), so we do not wait for one
                // connection to be handled before accepting the next one.
                (void)handle_connection(std::move(res.connection), std::move(res.remote_address)).handle_exception(
                        [] (std::exception_ptr ep) {
                    fmt::print(stderr, "Could not handle connection: {}\n", ep);
                });
            });
        });
    });
}

The main function service_loop() loops accepting new connections, and for each connection calls handle_connection() to handle this connection. Our handle_connection() returns a future that resolves when the connection has been handled, but importantly, we do not wait for this future. Remember that keep_doing will only start the next iteration when the future returned by the previous iteration is resolved. Because we want to allow parallel ongoing connections, we don’t want the next accept() to wait until the previously accepted connection was closed. So we call handle_connection() to start the handling of the connection, but return nothing from the continuation, which resolves that future immediately, so keep_doing will continue to the next accept().

This demonstrates how easy it is to run parallel fibers (chains of continuations) in Seastar: when a continuation runs an asynchronous function but ignores the future it returns, the asynchronous operation continues in parallel, but is never waited for.

It is often a mistake to silently ignore an exception, so if the future we’re ignoring might resolve with an exception, it is recommended to handle this case, e.g. using a handle_exception() continuation. In our case, a failed connection is fine (e.g., the client might close its connection while we’re sending it output), so we only log the exception and carry on.

The handle_connection() function itself is straightforward: it repeatedly calls read() on the input stream, to receive a temporary_buffer with some data, and then moves this temporary buffer into a write() call on the output stream. The buffer will eventually be freed, automatically, when the write() is done with it. When read() eventually returns an empty buffer signifying the end of input, we stop repeat’s iteration by returning a stop_iteration::yes.