Note: coroutines require C++20 and a supporting compiler. Clang 10 and above is known to work.
The simplest way to write efficient asynchronous code with Seastar is to use coroutines. Coroutines don’t share most of the pitfalls of traditional continuations (below), and so are the preferred way to write new code.
A coroutine is a function that returns a seastar::future<T> and uses the co_await or co_return keywords. Coroutines are invisible to their callers and callees; they integrate with traditional Seastar code in either role. If you are not familiar with C++ coroutines, you may want to consult a more general introduction to C++ coroutines; this section focuses on how coroutines integrate with Seastar.
Here’s an example of a simple Seastar coroutine:
#include <chrono>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>

using namespace std::chrono_literals; // provides the 1s literal

seastar::future<int> read();
seastar::future<> write(int n);

seastar::future<int> slow_fetch_and_increment() {
    auto n = co_await read();     // #1
    co_await seastar::sleep(1s);  // #2
    auto new_n = n + 1;           // #3
    co_await write(new_n);        // #4
    co_return n;                  // #5
}
In #1, we call the read() function, which returns a future. The co_await keyword instructs Seastar to inspect the returned future. If the future is ready, then the value (an int) is extracted from the future and assigned to n. If the future is not ready, the coroutine arranges for itself to be called when the future becomes ready, and control is returned to Seastar. Once the future becomes ready, the coroutine is awakened and the value is extracted from the future and assigned to n.
In #2, we call seastar::sleep() and wait for the returned future to become ready, which it will in a second. This demonstrates that n is preserved across co_await calls, and the author of the coroutine need not arrange for storage for coroutine local variables.
Line #3 demonstrates the addition operation, with which the reader is assumed to be familiar.
In #4, we call a function that returns a seastar::future<>. In this case, the future carries no value, and so no value is extracted and assigned.
Line #5 demonstrates returning a value. The integer value is used to satisfy the future<int> that our caller got when calling the coroutine.
A lambda function can be a coroutine. Due to an interaction between how C++ lambda coroutines are specified and how Seastar coroutines work, using lambda coroutines as continuations can result in use-after-free. To avoid such problems, take one of the following approaches:
Wrap the lambda coroutine with seastar::coroutine::lambda(), and ensure the lambda coroutine is fully awaited within the statement it is defined in.
An example of wrapping a lambda coroutine is:
#include <cassert>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>

seastar::future<> foo() {
    int n = 3;
    int m = co_await seastar::yield().then(seastar::coroutine::lambda([n] () -> seastar::future<int> {
        co_await seastar::coroutine::maybe_yield();
        // `n` can be safely used here
        co_return n;
    }));
    assert(n == m);
}
Notes:
1. seastar::future::then() accepts a continuation.
2. We wrap the argument to seastar::future::then() with seastar::coroutine::lambda().
3. We ensure evaluation of the lambda completes within the same expression using the outer co_await.
More information can be found in lambda-coroutine-fiasco.md.
Sometimes it is convenient to model a view of an input_range with a coroutine that emits its elements one after another, asynchronously. The consumer of the view retrieves the elements by co_awaiting the return value of the coroutine. The coroutine, in turn, can produce any number of elements using co_yield without “leaving” the coroutine. A function that produces a sequence of values is called a “generator”. Unlike a regular coroutine, which returns a single seastar::future<T>, a generator returns seastar::coroutine::experimental::generator<T, Container>.
Here T is the type of the elements, and Container is a template used to store them. Seastar’s generator implementation holds the elements not yet retrieved by the consumer in a bounded buffer, so there is a design decision to make: what kind of container should be used, and what its maximum size should be. To define the bounded buffer, developers need to:
1. Specify the container as the Container template parameter of generator.
2. Pass the maximum size of the buffer as the first parameter of the generator coroutine, with type seastar::coroutine::experimental::buffer_size_t.
There is one exception: if the buffer’s size is one, we assume that the programmer is likely to use std::optional for the bounded buffer, so the maximum size need not be passed as the first parameter. If a coroutine uses std::optional as its buffer and its signature still lists the size as its first parameter, nothing breaks; the parameter is simply ignored by the underlying implementation.
The following is an example:
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/generator.hh>

seastar::future<Preprocessed> prepare_ingredients(Ingredients&&);
seastar::future<Dish> cook_a_dish(Preprocessed&&);
seastar::future<> consume_a_dish(Dish&&);

seastar::coroutine::experimental::generator<Dish, seastar::circular_buffer>
make_dishes(seastar::coroutine::experimental::buffer_size_t max_dishes_on_table,
            Ingredients&& ingredients) {
    while (ingredients) {
        auto some_ingredients = ingredients.alloc();
        auto preprocessed = co_await prepare_ingredients(std::move(some_ingredients));
        co_yield co_await cook_a_dish(std::move(preprocessed));
    }
}

seastar::future<> have_a_dinner(unsigned max_dishes_on_table) {
    Ingredients ingredients;
    auto dishes = make_dishes(seastar::coroutine::experimental::buffer_size_t{max_dishes_on_table},
                              std::move(ingredients));
    // The awaited value is empty once the generator has finished.
    while (auto dish = co_await dishes()) {
        co_await consume_a_dish(std::move(*dish));
    }
}
In this hypothetical kitchen, a chef and a diner work in parallel. Instead of preparing all dishes beforehand, the chef cooks the dishes while the diner consumes them one after another. Under most circumstances, neither the chef nor the diner is blocked by its peer. The dishes are buffered in the specified seastar::circular_buffer<Dish>. But if the diner is so slow that max_dishes_on_table dishes are left on the table, the chef waits until the number of dishes drops below this limit. Note that, as explained above, although this parameter is not referenced by the coroutine’s body, it is passed to the generator’s promise constructor, which in turn creates the buffer, because we are not using std::optional here. Conversely, if there are no dishes on the table, the diner waits for the chef to prepare new ones.
Please note that generator<T, Container> is still at an early stage of development; the public interface of this template is subject to change before it stabilizes.
Coroutines automatically translate exceptions to futures and back.
Calling co_await foo(), when foo() returns an exceptional future, will throw the exception carried by the future.
Similarly, throwing within a coroutine will cause the coroutine to return an exceptional future.
Example:
#include <seastar/core/coroutine.hh>

seastar::future<> function_returning_an_exceptional_future();

seastar::future<> exception_handling() {
    try {
        co_await function_returning_an_exceptional_future();
    } catch (...) {
        // exception will be handled here
    }
    throw 3; // will be captured by coroutine and returned as
             // an exceptional future
}
In certain cases, exceptions can also be propagated directly, without throwing or rethrowing them. This is achieved by returning a coroutine::exception wrapper. Unfortunately, due to limitations in compilers, this only works for coroutines that return future<T>, not future<>. In particular, the example below won’t compile if the return type is changed to future<>.
Example:
seastar::future<int> exception_propagating() {
    std::exception_ptr eptr;
    try {
        co_await function_returning_an_exceptional_future();
    } catch (...) {
        eptr = std::current_exception();
    }
    if (eptr) {
        co_return seastar::coroutine::exception(eptr); // Saved exception pointer can be propagated without rethrowing
    }
    co_return seastar::coroutine::make_exception(3); // Custom exceptions can be propagated without throwing
}
The co_await operator allows for simple sequential execution. Multiple coroutines can execute in parallel, but each coroutine has only one outstanding computation at a time.
The seastar::coroutine::all class template allows a coroutine to fork into several concurrently executing sub-coroutines (or Seastar fibers, see below) and join again when they complete. Consider this example:
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>

seastar::future<int> read(int key);

seastar::future<int> parallel_sum(int key1, int key2) {
    // Structured bindings must be declared with auto.
    auto [a, b] = co_await seastar::coroutine::all(
        [&] {
            return read(key1);
        },
        [&] {
            return read(key2);
        }
    );
    co_return a + b;
}
Here, two read() calls are launched concurrently. The coroutine is paused until both reads complete, and the values returned are assigned to a and b. If read(key) is an operation that involves I/O, then the concurrent execution will complete sooner than if we co_awaited each call separately, since I/O can be overlapped.
Note that all waits for all of its sub-computations, even if some throw an exception. If an exception is thrown, it is propagated to the calling coroutine.
The seastar::coroutine::parallel_for_each class template allows a coroutine to fork into several concurrently executing function invocations (or Seastar fibers, see below) over a range of elements and join again when they complete. Consider this example:
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>

seastar::future<bool> all_exist(std::vector<seastar::sstring> filenames) {
    bool res = true;
    co_await seastar::coroutine::parallel_for_each(filenames, [&res] (const seastar::sstring& name) -> seastar::future<> {
        res &= co_await seastar::file_exists(name);
    });
    co_return res;
}
Here, the lambda function passed to parallel_for_each is launched concurrently for each element in the filenames vector. The coroutine is paused until all calls complete.
Seastar is generally used for I/O, and coroutines usually launch I/O operations and consume their results, with little computation in between. But occasionally a long running computation is needed, and this risks preventing the reactor from performing I/O and scheduling other tasks.
A coroutine will automatically yield in a co_await expression; but in a computation we do not co_await anything. We can use the seastar::coroutine::maybe_yield class in such cases:
#include <cmath>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>

seastar::future<int> long_loop(int n) {
    float acc = 0;
    for (int i = 0; i < n; ++i) {
        acc += std::sin(float(i));
        // Give the Seastar reactor opportunity to perform I/O or schedule
        // other tasks.
        co_await seastar::coroutine::maybe_yield();
    }
    co_return acc;
}
By default, co_await-ing a future performs a preemption check, and will suspend if the task quota is already depleted. However, in certain cases it might be useful to be able to assume that awaiting a ready future will not yield. For such cases, it’s possible to explicitly bypass the preemption check:
#include <seastar/core/coroutine.hh>

struct resource;
seastar::future<int> compute_always_ready(int i, resource& r);

seastar::future<int> accumulate(int n, resource& important_resource) {
    float acc = 0;
    for (int i = 0; i < n; ++i) {
        // This await will not yield the control, so we're sure that nobody will
        // be able to touch important_resource while we accumulate all the results.
        acc += co_await seastar::coroutine::without_preemption_check(compute_always_ready(i, important_resource));
    }
    co_return acc;
}