Asynchronous Programming with Seastar

Nadav Har’El - nyh@ScyllaDB.com

Avi Kivity - avi@ScyllaDB.com

Back to table of contents. Previous: 4 Introducing futures and continuations. Next: 6 Continuations.

5 Coroutines

Note: coroutines require C++20 and a supporting compiler. Clang 10 and above is known to work.

The simplest way to write efficient asynchronous code with Seastar is to use coroutines. Coroutines avoid most of the pitfalls of traditional continuations (described in the next section), and so are the preferred way to write new code.

A coroutine is a function that returns a seastar::future<T> and uses the co_await or co_return keywords. Coroutines are invisible to their callers and callees: a caller sees only a function that returns a future, so coroutines integrate with traditional Seastar code in either role. If you are not familiar with C++ coroutines, you may want to consult a more general introduction to C++ coroutines first; this section focuses on how coroutines integrate with Seastar.

Here’s an example of a simple Seastar coroutine:

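#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>   // seastar::sleep()
#include <chrono>

using namespace std::chrono_literals;  // enables the 1s literal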

seastar::future<int> read();
seastar::future<> write(int n);

seastar::future<int> slow_fetch_and_increment() {
    auto n = co_await read();     // #1
    co_await seastar::sleep(1s);  // #2
    auto new_n = n + 1;           // #3
    co_await write(new_n);        // #4
    co_return n;                  // #5
}

In #1, we call the read() function, which returns a future. The co_await keyword instructs Seastar to inspect the returned future. If the future is ready, then the value (an int) is extracted from the future and assigned to n. If the future is not ready, the coroutine arranges for itself to be called when the future becomes ready, and control is returned to Seastar. Once the future becomes ready, the coroutine is awakened and the value is extracted from the future and assigned to n.

In #2, we call seastar::sleep() and wait for the returned future to become ready, which it will in a second. This demonstrates that n is preserved across co_await calls, and the author of the coroutine need not arrange for storage for coroutine local variables.

Line #3 demonstrates the addition operation, with which the reader is assumed to be familiar.

In #4, we call a function that returns a seastar::future<>. In this case, the future carries no value, and so no value is extracted and assigned.

Line #5 demonstrates returning a value. The integer value is used to satisfy the future<int> that our caller got when calling the coroutine.

5.1 Lambda coroutines

A lambda function can be a coroutine. Due to an interaction between how C++ lambda coroutines are specified and how Seastar coroutines work, using lambda coroutines as continuations can result in use-after-free. To avoid such problems, take one of the following approaches:

  1. Use lambda coroutines as arguments to functions that explicitly claim support for them
  2. Wrap lambda coroutines with seastar::coroutine::lambda(), and ensure the lambda coroutine is fully awaited within the statement it is defined in.

An example of wrapping a lambda coroutine is:

#include <seastar/core/coroutine.hh>
#include <seastar/core/later.hh>            // seastar::yield()
#include <seastar/coroutine/maybe_yield.hh>
#include <cassert>

seastar::future<> foo() {
    int n = 3;
    int m = co_await seastar::yield().then(seastar::coroutine::lambda([n] () -> seastar::future<int> {
        co_await seastar::coroutine::maybe_yield();
        // `n` can be safely used here
        co_return n;
    }));
    assert(n == m);
}

Notes:

  1. seastar::future::then() accepts a continuation.
  2. We wrap the argument to seastar::future::then() with seastar::coroutine::lambda().
  3. We ensure evaluation of the lambda completes within the same expression using the outer co_await.

More information can be found in lambda-coroutine-fiasco.md.

5.2 Generators in coroutines

Sometimes it is convenient to model a view of an input_range with a coroutine that emits its elements one after another asynchronously. From the perspective of the view’s consumer, the elements are retrieved by co_awaiting the return value of the coroutine. From the coroutine’s perspective, it can produce elements multiple times using co_yield without “leaving” the coroutine. A function that produces a sequence of values is called a “generator”. Unlike a regular coroutine, which returns a single seastar::future<T>, a generator should return seastar::coroutine::experimental::generator<T, Container>, where T is the type of the elements and Container is the template used to store them. Because Seastar’s generator implementation uses a bounded buffer to hold the elements not yet retrieved by the consumer, there is a design decision to make: what kind of container to use, and what its maximum size should be. To define the bounded buffer, developers need to:

  1. specify the container type via the second template parameter of the generator
  2. specify the size of the bounded buffer by passing it as the first parameter of the generator coroutine. The size must be of type seastar::coroutine::experimental::buffer_size_t.

There is one exception: if the buffer’s size is one, we assume that the programmer is likely to use std::optional for the bounded buffer, so it is not necessary to pass the maximum size of the buffer as the first parameter in this case. If a coroutine uses std::optional as its buffer and its function signature still lists the size as its first parameter, nothing breaks: the parameter is simply ignored by the underlying implementation.

Following is an example:

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/generator.hh>

seastar::future<Preprocessed> prepare_ingredients(Ingredients&&);
seastar::future<Dish> cook_a_dish(Preprocessed&&);
seastar::future<> consume_a_dish(Dish&&);

seastar::coroutine::experimental::generator<Dish, seastar::circular_buffer>
make_dishes(seastar::coroutine::experimental::buffer_size_t max_dishes_on_table,
            Ingredients&& ingredients) {
    while (ingredients) {
        auto some_ingredients = ingredients.alloc();
        auto preprocessed = co_await prepare_ingredients(std::move(some_ingredients));
        co_yield co_await cook_a_dish(std::move(preprocessed));
    }
}

seastar::future<> have_a_dinner(unsigned max_dishes_on_table) {
    Ingredients ingredients;
    auto dishes = make_dishes(seastar::coroutine::experimental::buffer_size_t{max_dishes_on_table},
                              std::move(ingredients));
    // Each call produces a std::optional<Dish>, empty once the generator is exhausted.
    while (auto dish = co_await dishes()) {
        co_await consume_a_dish(std::move(*dish));
    }
}

In this hypothetical kitchen, a chef and a diner work concurrently. Instead of preparing all the dishes beforehand, the chef cooks dishes while the diner consumes them one after another. Under most circumstances, neither the chef nor the diner is blocked by the other. The dishes are buffered in the specified seastar::circular_buffer<Dish>. But if the diner is so slow that max_dishes_on_table dishes are left on the table, the chef waits until the number of dishes drops below this setting. Note that, as explained above, although this parameter is not referenced by the coroutine’s body, it is actually passed to the generator’s promise constructor, which in turn creates the buffer, since we are not using std::optional here. Conversely, if there are no dishes on the table, the diner waits for the chef to prepare new ones.
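The back-pressure described above can be modelled without any coroutines at all. The following single-threaded, tick-based sketch in standard C++ is a hypothetical model, not Seastar code: simulate, dinner_stats and the eat_every parameter are made up for illustration. The chef adds a dish only while the table holds fewer than max_dishes_on_table dishes, and the diner removes one only when the table is not empty.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Outcome of one simulated dinner (hypothetical model, not Seastar code).
struct dinner_stats {
    std::size_t max_on_table = 0;  // most dishes ever buffered at once
    int chef_waits = 0;            // ticks the chef skipped because the table was full
    int diner_waits = 0;           // ticks the diner skipped because the table was empty
    int eaten = 0;
};

// The chef cooks one dish per tick; the diner tries to eat one dish every
// `eat_every` ticks. "Waiting" here simply means doing nothing for a tick.
dinner_stats simulate(int total_dishes, std::size_t max_dishes_on_table, int eat_every) {
    assert(max_dishes_on_table > 0 && eat_every > 0);  // otherwise nothing progresses
    std::deque<int> table;
    dinner_stats s;
    int cooked = 0;
    for (int tick = 0; s.eaten < total_dishes; ++tick) {
        // Chef: only cooks while the table has room.
        if (cooked < total_dishes) {
            if (table.size() < max_dishes_on_table) {
                table.push_back(cooked++);
            } else {
                ++s.chef_waits;
            }
        }
        if (table.size() > s.max_on_table) {
            s.max_on_table = table.size();
        }
        // Diner: eats on its own schedule, but only if a dish is available.
        if (tick % eat_every == 0) {
            if (!table.empty()) {
                table.pop_front();
                ++s.eaten;
            } else {
                ++s.diner_waits;
            }
        }
    }
    return s;
}
```

With simulate(10, 2, 3), the diner is three times slower than the chef, so the chef repeatedly finds the table full and waits, and the table never holds more than two dishes.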

Please note that generator<T, Container> is still at an early stage of development, and its public interface is subject to change before it stabilizes.

5.3 Exceptions in coroutines

Coroutines automatically translate exceptions to futures and back.

Calling co_await foo(), when foo() returns an exceptional future, will throw the exception carried by the future.

Similarly, throwing within a coroutine will cause the coroutine to return an exceptional future.

Example:

#include <seastar/core/coroutine.hh>

seastar::future<> function_returning_an_exceptional_future();

seastar::future<> exception_handling() {
    try {
        co_await function_returning_an_exceptional_future();
    } catch (...) {
        // exception will be handled here
    }
    throw 3; // will be captured by coroutine and returned as
             // an exceptional future
}

In certain cases, exceptions can also be propagated directly, without throwing or rethrowing them. This can be achieved by returning a seastar::coroutine::exception wrapper, but unfortunately it only works for coroutines that return future<T>, not future<>, due to limitations in compilers. In particular, the example below won’t compile if its return type is changed to future<>.

Example:

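#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/exception.hh>  // seastar::coroutine::exception, make_exception

seastar::future<> function_returning_an_exceptional_future();

seastar::future<int> exception_propagating() {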
    std::exception_ptr eptr;
    try {
        co_await function_returning_an_exceptional_future();
    } catch (...) {
        eptr = std::current_exception();
    }
    if (eptr) {
        co_return seastar::coroutine::exception(eptr); // Saved exception pointer can be propagated without rethrowing
    }
    co_return seastar::coroutine::make_exception(3); // Custom exceptions can be propagated without throwing
}

5.4 Concurrency in coroutines

The co_await operator allows for simple sequential execution. Multiple coroutines can execute concurrently, but each coroutine has only one outstanding computation at a time.

The seastar::coroutine::all class template allows a coroutine to fork into several concurrently executing sub-coroutines (or Seastar fibers, see below) and join again when they complete. Consider this example:

#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>

seastar::future<int> read(int key);

seastar::future<int> parallel_sum(int key1, int key2) {
    auto [a, b] = co_await seastar::coroutine::all(
        [&] {
            return read(key1);
        },
        [&] {
            return read(key2);
        }
    );
    co_return a + b;
}

Here, two read() calls are launched concurrently. The coroutine is paused until both reads complete, and the values returned are assigned to a and b. If read(key) is an operation that involves I/O, then the concurrent execution will typically complete sooner than if we co_awaited each call separately, since the I/O operations can overlap.

Note that seastar::coroutine::all waits for all of its sub-computations, even if some of them throw an exception. If an exception is thrown, it is propagated to the calling coroutine.
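Seastar does not use threads for this: the sub-computations of seastar::coroutine::all are fibers on the current shard. The following standard C++ sketch is therefore only an analogy using std::async threads, and join_all is a hypothetical helper, not Seastar API. It reproduces the rule stated above: wait for every computation, even after one has failed, and only then propagate the exception.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <exception>
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Thread-based analogy (hypothetical helper, not seastar::coroutine::all):
// wait for every computation, even after one has failed, and only then
// propagate the first exception that was seen.
std::vector<int> join_all(std::vector<std::future<int>> pending) {
    std::vector<int> results;
    std::exception_ptr first_error;
    for (auto& f : pending) {
        try {
            results.push_back(f.get());  // blocks until this computation is done
        } catch (...) {
            if (!first_error) {
                first_error = std::current_exception();
            }
        }
    }
    if (first_error) {
        std::rethrow_exception(first_error);
    }
    return results;
}
```

Even when the first computation fails almost immediately, join_all returns (by throwing) only after the slower one has finished.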

The seastar::coroutine::parallel_for_each class template allows a coroutine to fork into several concurrently executing function invocations (or Seastar fibers, see below) over a range of elements and join again when they complete. Consider this example:

#include <seastar/core/coroutine.hh>
#include <seastar/core/seastar.hh>        // seastar::file_exists()
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <vector>

seastar::future<bool> all_exist(std::vector<seastar::sstring> filenames) {
    bool res = true;
    co_await seastar::coroutine::parallel_for_each(filenames, [&res] (const seastar::sstring& name) -> seastar::future<> {
        res &= co_await seastar::file_exists(name);
    });
    co_return res;
}

Here, the lambda function passed to parallel_for_each is launched concurrently for each element in the filenames vector. The coroutine is paused until all calls complete.

5.5 Breaking up long running computations

Seastar is generally used for I/O, and coroutines usually launch I/O operations and consume their results, with little computation in between. But occasionally a long running computation is needed, and this risks preventing the reactor from performing I/O and scheduling other tasks.

A coroutine can yield automatically at a co_await expression, but a pure computation does not co_await anything. We can use the seastar::coroutine::maybe_yield class in such cases:

#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <cmath>

seastar::future<float> long_loop(int n) {
    float acc = 0;
    for (int i = 0; i < n; ++i) {
        acc += std::sin(float(i));
        // Give the Seastar reactor opportunity to perform I/O or schedule
        // other tasks.
        co_await seastar::coroutine::maybe_yield();
    }
    co_return acc;
}
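As its name suggests, maybe_yield does not necessarily suspend on every call; conceptually, the loop only needs to give up the CPU once it has used up its time budget. The following standard C++ sketch shows that shape under stated assumptions: long_loop_with_budget and clock_fn are hypothetical, the clock and the yield action are injected so the logic can be tested without a scheduler, and this is not how Seastar measures its task quota.

```cpp
#include <cassert>
#include <chrono>
#include <cmath>
#include <functional>

// Hypothetical sketch, not Seastar API: `now` reports elapsed time and
// `yield` stands in for handing control back to a scheduler.
using clock_fn = std::function<std::chrono::microseconds()>;

float long_loop_with_budget(int n, std::chrono::microseconds quota,
                            const clock_fn& now, const std::function<void()>& yield) {
    float acc = 0;
    auto slice_start = now();
    for (int i = 0; i < n; ++i) {
        acc += std::sin(float(i));
        // Cheap check on every iteration; only yield once the budget is spent.
        if (now() - slice_start >= quota) {
            yield();
            slice_start = now();  // a new time slice starts after resuming
        }
    }
    return acc;
}
```

With a fake clock that advances by 10 microseconds per reading and a 100-microsecond quota, the loop yields once every ten iterations rather than on every iteration.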

5.6 Bypassing preemption checks in coroutines

By default, co_await-ing a future performs a preemption check, and will suspend if the task quota is already depleted. However, in certain cases it might be useful to be able to assume that awaiting a ready future will not yield. For such cases, it’s possible to explicitly bypass the preemption check:

#include <seastar/core/coroutine.hh>

struct resource;
seastar::future<int> compute_always_ready(int i, resource& r);

seastar::future<float> accumulate(int n, resource& important_resource) {
    float acc = 0;
    for (int i = 0; i < n; ++i) {
        // This await will not yield the control, so we're sure that nobody will
        // be able to touch important_resource while we accumulate all the results.
        acc += co_await seastar::coroutine::without_preemption_check(compute_always_ready(i, important_resource));
    }
    co_return acc;
}