Above we’ve seen parallel_for_each(), which starts a number of
asynchronous operations and then waits for all of them to complete.
Seastar has another idiom, when_all(), for waiting for several
already-existing futures to complete.
The first variant of when_all() is variadic, i.e., the futures are
given as separate parameters, the exact number of which is known at
compile time. The individual futures may have different types.
For example,
#include <seastar/core/sleep.hh>

future<> f() {
    using namespace std::chrono_literals;
    future<int> slow_two = sleep(2s).then([] { return 2; });
    return when_all(sleep(1s), std::move(slow_two),
                    make_ready_future<double>(3.5)
           ).discard_result();
}
This starts three futures - one which sleeps for one second (and
doesn’t return anything), one which sleeps for two seconds and returns
the integer 2, and one which returns the double 3.5 immediately - and
then waits for them. The when_all() function returns a future which
resolves as soon as all three futures resolve, i.e., after two seconds.
This future also has a value, which we shall explain below, but in this
example, we simply waited for the future to resolve and discarded its
value.
Note that when_all() accepts only rvalues, which can be temporaries
(like the return value of an asynchronous function or
make_ready_future) or a variable holding a future that is passed
through std::move().
The future returned by when_all() resolves to a tuple of futures which
are already resolved, and contain the results of the three input
futures. Continuing the above example,
future<> f() {
    using namespace std::chrono_literals;
    future<int> slow_two = sleep(2s).then([] { return 2; });
    return when_all(sleep(1s), std::move(slow_two),
                    make_ready_future<double>(3.5)
           ).then([] (auto tup) {
        std::cout << std::get<0>(tup).available() << "\n";
        std::cout << std::get<1>(tup).get() << "\n";
        std::cout << std::get<2>(tup).get() << "\n";
    });
}
The output of this program (which comes after two seconds) is
1, 2, 3.5: the first future in the tuple is available (but has no
value), the second has the integer value 2, and the third a double
value 3.5 - as expected.
One or more of the waited futures might resolve to an exception, but
this does not change how when_all() works: it still waits for all the
futures to resolve, each with either a value or an exception, and in
the returned tuple some of the futures may contain an exception instead
of a value. For example,
future<> f() {
    using namespace std::chrono_literals;
    future<> slow_success = sleep(1s);
    future<> slow_exception = sleep(2s).then([] { throw 1; });
    return when_all(std::move(slow_success), std::move(slow_exception)
           ).then([] (auto tup) {
        std::cout << std::get<0>(tup).available() << "\n";
        std::cout << std::get<1>(tup).failed() << "\n";
        std::get<1>(tup).ignore_ready_future();
    });
}
Both futures are available() (resolved), but the second has failed()
(resulted in an exception instead of a value). Note how we called
ignore_ready_future() on this failed future, because silently ignoring
a failed future is considered a bug, and will result in an “Exceptional
future ignored” error message. More typically, an application will log
the failed future instead of ignoring it.
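As a rough illustration of the logging alternative, the sketch below turns a std::exception_ptr into a printable string using only the standard library. The helper name describe_exception() is hypothetical and not part of Seastar; it assumes the exception has already been taken out of the failed future, for instance with the future's get_exception() method.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical helper (not a Seastar API): convert an exception_ptr into
// a string suitable for a log line, using only the standard library.
std::string describe_exception(std::exception_ptr ep) {
    if (!ep) {
        return "no exception";
    }
    try {
        // Rethrow so that the catch clauses below can inspect the type.
        std::rethrow_exception(ep);
    } catch (const std::exception& e) {
        return std::string("std::exception: ") + e.what();
    } catch (int code) {
        // Covers the "throw 1" used in the example above.
        return "int: " + std::to_string(code);
    } catch (const char* msg) {
        return std::string("message: ") + msg;
    } catch (...) {
        return "unknown exception type";
    }
}
```

In the continuation above, the call to ignore_ready_future() could then be replaced by something like std::cout << describe_exception(std::get<1>(tup).get_exception()) << "\n". Taking the exception out of the future this way should also count as handling it, so the “Exceptional future ignored” message is not expected to appear.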
The above example demonstrates that when_all() is inconvenient and
verbose to use properly. The results are wrapped in a tuple, leading to
verbose tuple syntax, and the tuple holds ready futures which must all
be inspected individually for an exception to avoid error messages.
So Seastar also provides an easier-to-use when_all_succeed() function.
This function too returns a future which resolves when all the given
futures have resolved. If all of them succeeded, it passes a tuple of
the resulting values to the continuation, without wrapping each of them
in a future first. Sometimes, it could be tedious to unpack the tuple
for consuming the resulting values. In that case, then_unpack() can be
used in place of then(). then_unpack() unpacks the returned tuple and
passes its elements to the following continuation as its parameters.
If, however, one or more of the futures failed, when_all_succeed()
resolves to a failed future, containing the exception from one of the
failed futures. If more than one of the given futures failed, one of
those exceptions will be passed on (it is unspecified which one is
chosen), and the rest will be silently ignored. For example,
using namespace seastar;
future<> f() {
    using namespace std::chrono_literals;
    return when_all_succeed(sleep(1s), make_ready_future<int>(2),
                    make_ready_future<double>(3.5)
           ).then_unpack([] (int i, double d) {
        std::cout << i << " " << d << "\n";
    });
}
Note how the integer and double values held by the futures are
conveniently passed individually to the continuation. Since the future
returned by sleep() does not contain a value, it is waited for, but no
third value is passed to the continuation. That also means that if we
call when_all_succeed() on several future<> (without a value), the
result is a future<tuple<>>:
using namespace seastar;
future<> f() {
    using namespace std::chrono_literals;
    return when_all_succeed(sleep(1s), sleep(2s), sleep(3s)).then_unpack([] {
        return make_ready_future<>();
    });
}
This example simply waits for 3 seconds (the maximum of 1, 2 and 3 seconds).
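To make the unpacking step more concrete, here is a minimal sketch in plain standard C++ (no Seastar involved) of what passing a tuple's elements as separate parameters amounts to. It relies on std::apply; the names call_unpacked() and format_pair() are hypothetical and chosen only for this illustration, and the sketch is an analogy for the behavior described above, not a statement about how then_unpack() is implemented.

```cpp
#include <string>
#include <tuple>
#include <utility>

// Hypothetical helper (standard C++17 only): call func with the elements
// of tup passed as individual arguments, as then_unpack() is described
// to do for its continuation.
template <typename Func, typename Tuple>
decltype(auto) call_unpacked(Func&& func, Tuple&& tup) {
    return std::apply(std::forward<Func>(func), std::forward<Tuple>(tup));
}

// A continuation-like function that takes the values individually
// rather than as a tuple.
inline std::string format_pair(int i, double d) {
    return std::to_string(i) + " " + std::to_string(d);
}
```

Here call_unpacked(format_pair, std::make_tuple(2, 3.5)) invokes format_pair(2, 3.5), while an empty std::tuple<> leads to a call with no arguments, which mirrors the parameterless continuation in the sleep-only example.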
An example of when_all_succeed() with an exception:
using namespace seastar;
future<> f() {
    using namespace std::chrono_literals;
    return when_all_succeed(make_ready_future<int>(2),
                    make_exception_future<double>("oops")
           ).then_unpack([] (int i, double d) {
        std::cout << i << " " << d << "\n";
    }).handle_exception([] (std::exception_ptr e) {
        std::cout << "exception: " << e << "\n";
    });
}
In this example, one of the futures fails, so the result of
when_all_succeed() is a failed future. The normal continuation is
therefore not run, and the handle_exception() continuation runs
instead.
TODO: also explain when_all and when_all_succeed for vectors.