Seastar’s semaphores are the standard computer-science semaphores, adapted for futures. A semaphore is a counter into which you can deposit units or take them away. Taking units from the counter may wait if not enough units are available.
The most common use for a semaphore in Seastar is for limiting parallelism, i.e., limiting the number of instances of some code which can run in parallel. This can be important when each of the parallel invocations uses a limited resource (e.g., memory) so letting an unlimited number of them run in parallel can exhaust this resource.
Consider a case where an external source of events (e.g., an incoming network request) causes an asynchronous function g() to be called. Imagine that we want to limit the number of concurrent g() operations to 100. I.e., if g() is started when 100 other invocations are still ongoing, we want it to delay its real work until one of the other invocations has completed. We can do this with a semaphore:
seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return limit.wait(1).then([] {
        return slow(); // do the real work of g()
    }).finally([] {
        limit.signal(1);
    });
}
In this example, the semaphore starts with the counter at 100. The asynchronous operation slow() is only started when we can reduce the counter by one (wait(1)), and when slow() is done, either successfully or with an exception, the counter is increased back by one (signal(1)). This way, when 100 operations have already started their work and have not yet finished, the 101st operation will wait, until one of the ongoing operations finishes and returns a unit to the semaphore. This ensures that at any time we have at most 100 concurrent slow() operations running in the above code.
Note how we used a static thread_local semaphore, so that all calls to g() from the same shard count towards the same limit. As usual, a Seastar application is sharded, so this limit is separate per shard (CPU thread). This is usually fine, because sharded applications consider resources to be separate per shard.
Luckily, the above code happens to be exception safe: limit.wait(1) can throw an exception when it runs out of memory (keeping a list of waiters), and in that case the semaphore counter is not decreased, but the continuations below are not run so it is not increased either. limit.wait(1) can also return an exceptional future when the semaphore is broken (we’ll discuss this later), but in that case the extra signal() call is ignored. Finally, slow() may also throw, or return an exceptional future, but the finally() ensures the semaphore is still increased.
However, as the application code becomes more complex, it becomes harder to ensure that we never forget to call signal() after the operation is done, regardless of which code path or exceptions happen. As an example of what might go wrong, consider the following buggy code snippet, which differs subtly from the one above, and also appears, at first sight, to be correct:
seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return limit.wait(1).then([] {
        return slow().finally([] { limit.signal(1); });
    });
}
But this version is not exception safe: Consider what happens if slow() throws an exception before returning a future (this is different from slow() returning an exceptional future - we discussed this difference in the section about exception handling). In this case, we decreased the counter, but the finally() will never be reached, and the counter will never be increased back. There is a way to fix this code, by replacing the call to slow() with seastar::futurize_invoke(slow). But the point we’re trying to make here is not how to fix buggy code, but rather that by using the separate semaphore::wait() and semaphore::signal() functions, you can very easily get things wrong.
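For completeness, here is a sketch of that fix: futurize_invoke() converts an exception thrown synchronously by slow() into an exceptional future, so the finally() continuation is always reached and the unit is always returned:

seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return limit.wait(1).then([] {
        // An immediate exception from slow() now becomes an exceptional
        // future, so finally() still runs:
        return seastar::futurize_invoke(slow).finally([] { limit.signal(1); });
    });
}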
For exception safety, in C++ it is generally not recommended to have separate resource acquisition and release functions. Instead, C++ offers safer mechanisms for acquiring a resource (in this case semaphore units) and later releasing it: lambda functions, and RAII (“resource acquisition is initialization”):
The lambda-based solution is a function seastar::with_semaphore() which is a shortcut for the code in the examples above:
seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return seastar::with_semaphore(limit, 1, [] {
        return slow(); // do the real work of g()
    });
}
with_semaphore(), like the earlier code snippets, waits for the given number of units from the semaphore, then runs the given lambda, and when the future returned by the lambda is resolved, with_semaphore() returns the units to the semaphore. with_semaphore() returns a future which only resolves after all these steps are done.
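Note also that the future returned by with_semaphore() carries the value of the lambda’s future, so it can guard value-returning operations too. A small illustration (h() here is a made-up function, not part of any API):

seastar::future<int> h() {
    static thread_local seastar::semaphore limit(100);
    // with_semaphore() resolves to the same value (42) as the future
    // returned by the lambda, after returning the unit to the semaphore.
    return seastar::with_semaphore(limit, 1, [] {
        return seastar::make_ready_future<int>(42);
    });
}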
The function seastar::get_units() is more general. It provides an exception-safe alternative to seastar::semaphore’s separate wait() and signal() methods, based on C++’s RAII philosophy: The function returns an opaque units object, which, while held, keeps the semaphore’s counter decreased - and as soon as this object is destructed, the counter is increased back. With this interface you cannot forget to increase the counter, or increase it twice, or increase without decreasing: The counter will always be decreased once when the units object is created, and if that succeeded, increased when the object is destructed. When the units object is moved into a continuation, no matter how this continuation ends, when the continuation is destructed, the units object is destructed and the units are returned to the semaphore’s counter. The above example, written with get_units(), looks like this:
seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return seastar::get_units(limit, 1).then([] (auto units) {
        return slow().finally([units = std::move(units)] {});
    });
}
Note the somewhat convoluted way in which get_units() needs to be used: The continuations must be nested because we need the units object to be moved to the last continuation. If slow() returns a future (and does not throw immediately), the finally() continuation captures the units object until everything is done, but does not run any code.
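As a side note, with C++20 coroutine support (a minimal sketch, assuming seastar/core/coroutine.hh is available) the same pattern reads more naturally: the units object simply lives on the coroutine frame until it goes out of scope:

#include <seastar/core/coroutine.hh>

seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    auto units = co_await seastar::get_units(limit, 1);
    co_await slow();
    // 'units' is destructed when it goes out of scope, returning the
    // unit to the semaphore - even if slow() failed.
}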
Seastar programmers should generally avoid using the semaphore::wait() and semaphore::signal() functions directly, and always prefer either with_semaphore() (when applicable) or get_units().
Because semaphores support waiting for any number of units, not just 1, we can use them for more than simple limiting of the number of parallel invocations. For example, consider an asynchronous function using_lots_of_memory(size_t bytes), which uses bytes bytes of memory, and suppose we want to ensure that no more than 1 MB of memory is used by all parallel invocations of this function — and that additional calls are delayed until previous calls have finished. We can do this with a semaphore:
seastar::future<> using_lots_of_memory(size_t bytes) {
    static thread_local seastar::semaphore limit(1000000); // limit to 1MB
    return seastar::with_semaphore(limit, bytes, [bytes] {
        // do something allocating 'bytes' bytes of memory
    });
}
Watch out that in the above example, a call to using_lots_of_memory(2000000) will return a future that never resolves, because the semaphore will never contain enough units to satisfy the semaphore wait. using_lots_of_memory() should probably check whether bytes is above the limit, and throw an exception in that case. Seastar doesn’t do this for you.
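A minimal sketch of such a guard (the exception type and message are our choice for illustration; Seastar does not mandate them):

#include <stdexcept>

seastar::future<> using_lots_of_memory(size_t bytes) {
    static thread_local seastar::semaphore limit(1000000); // limit to 1MB
    if (bytes > 1000000) {
        // A request larger than the semaphore's full capacity can never
        // be satisfied, so fail it instead of waiting forever:
        return seastar::make_exception_future<>(
            std::invalid_argument("request too large"));
    }
    return seastar::with_semaphore(limit, bytes, [bytes] {
        // do something allocating 'bytes' bytes of memory
    });
}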
Above, we looked at a function g() which gets called by some external event, and wanted to control its parallelism. In this section, we look at parallelism of loops, which can also be controlled with semaphores.
Consider the following simple loop:
#include <seastar/core/sleep.hh>

seastar::future<> slow() {
    std::cerr << ".";
    return seastar::sleep(std::chrono::seconds(1));
}

seastar::future<> f() {
    return seastar::repeat([] {
        return slow().then([] { return seastar::stop_iteration::no; });
    });
}
This loop runs the slow() function (taking one second to complete) without any parallelism — the next slow() call starts only when the previous one has completed. But what if we do not need to serialize the calls to slow(), and want to allow multiple instances of it to be ongoing concurrently?
Naively, we could achieve more parallelism by starting the next call to slow() right after the previous call — ignoring the future returned by the previous call to slow() and not waiting for it to resolve:
seastar::future<> f() {
    return seastar::repeat([] {
        slow();
        return seastar::stop_iteration::no;
    });
}
But in this loop, there is no limit to the amount of parallelism — millions of sleep() calls might be active in parallel, before the first one ever returns. Eventually, this loop may consume all available memory and crash.
Using a semaphore allows us to run many instances of slow() in parallel, but limit the number of these parallel instances to, in the following example, 100:
seastar::future<> f() {
    return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
        return seastar::repeat([&limit] {
            return limit.wait(1).then([&limit] {
                seastar::futurize_invoke(slow).finally([&limit] {
                    limit.signal(1);
                });
                return seastar::stop_iteration::no;
            });
        });
    });
}
Note how this code differs from the code we saw above for limiting the number of parallel invocations of a function g():

1. Here we cannot use a single thread_local semaphore. Each call to f() has its loop with parallelism of 100, so needs its own semaphore “limit”, kept alive during the loop with do_with().

2. Here we do not wait for slow() to complete before continuing the loop, i.e., we do not return the future chain starting at futurize_invoke(slow). The loop continues to the next iteration when a semaphore unit becomes available, while (in our example) 99 other operations might be ongoing in the background and we do not wait for them.

In the examples in this section, we cannot use the with_semaphore() shortcut. with_semaphore() returns a future which only resolves after the lambda’s returned future resolves. But in the above example, the loop needs to know when just the semaphore units are available, to start the next iteration — and not wait for the previous iteration to complete. We could not achieve that with with_semaphore(). But the more general exception-safe idiom, seastar::get_units(), can be used in this case, and is recommended:
seastar::future<> f() {
    return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
        return seastar::repeat([&limit] {
            return seastar::get_units(limit, 1).then([] (auto units) {
                slow().finally([units = std::move(units)] {});
                return seastar::stop_iteration::no;
            });
        });
    });
}
The above examples are not realistic, because they have a never-ending loop and the future returned by f() will never resolve. In more realistic cases, the loop has an end, and at the end of the loop we need to wait for all the background operations which the loop started. We can do this by wait()ing on the original count of the semaphore: When the full count is finally available, it means that all the operations have completed. For example, the following loop ends after 456 iterations:
#include <boost/iterator/counting_iterator.hpp>

seastar::future<> f() {
    return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
        return seastar::do_for_each(boost::counting_iterator<int>(0),
                boost::counting_iterator<int>(456), [&limit] (int i) {
            return seastar::get_units(limit, 1).then([] (auto units) {
                slow().finally([units = std::move(units)] {});
            });
        }).finally([&limit] {
            return limit.wait(100);
        });
    });
}
The last finally is what ensures that we wait for the last operations to complete: After the do_for_each loop ends (whether successfully or prematurely because of an exception in one of the iterations), we do a wait(100) to wait for the semaphore to reach its original value 100, meaning that all operations that we started have completed. Without this finally, the future returned by f() will resolve before all the iterations of the loop have actually completed (the last 100 may still be running).
In the idiom we saw in the above example, the same semaphore is used both for limiting the number of background operations, and later to wait for all of them to complete. Sometimes, we want several different loops to use the same semaphore to limit their total parallelism. In that case we must use a separate mechanism for waiting for the completion of the background operations started by the loop. The most convenient way to wait for ongoing operations is using a gate, which we will describe in detail later. A typical example of a loop whose parallelism is limited by an external semaphore:
#include <seastar/core/gate.hh>

thread_local seastar::semaphore limit(100);

seastar::future<> f() {
    return seastar::do_with(seastar::gate(), [] (auto& gate) {
        return seastar::do_for_each(boost::counting_iterator<int>(0),
                boost::counting_iterator<int>(456), [&gate] (int i) {
            return seastar::get_units(limit, 1).then([&gate] (auto units) {
                gate.enter();
                seastar::futurize_invoke(slow).finally([&gate, units = std::move(units)] {
                    gate.leave();
                });
            });
        }).finally([&gate] {
            return gate.close();
        });
    });
}
In this code, we use the external semaphore limit to limit the number of concurrent operations, but additionally have a gate specific to this loop to help us wait for all ongoing operations to complete.
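As a side note, Seastar also offers a lambda-based shortcut, seastar::with_gate(), which pairs enter() and leave() automatically around a callable. A hedged sketch of just the loop body above rewritten with it (assuming with_gate()’s usual semantics of entering the gate, invoking the callable, and leaving when its future resolves):

// Inside the do_for_each lambda, instead of explicit enter()/leave():
return seastar::get_units(limit, 1).then([&gate] (auto units) {
    (void)seastar::with_gate(gate, [units = std::move(units)] {
        return slow();
    });
});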
TODO: also allow get_units() or something similar on a gate, and use that instead of the explicit gate.enter/gate.leave.
TODO: say something about semaphore fairness - if someone is waiting for a lot of units and later someone asks for 1 unit, will both wait or will the request for 1 unit be satisfied?
TODO: say something about broken semaphores? (or in later section especially about breaking/closing/shutting down/etc?)
TODO: Have a few paragraphs, or even a section, on additional uses of semaphores. One is for mutual exclusion using semaphore(1) - we need to explain that although in Seastar we don’t have multiple threads touching the same data, if code is composed of different continuations (i.e., a fiber) it can switch to a different fiber in the middle, so if data needs to be protected between two continuations, it needs a mutex. Another example is something akin to wait_all: we start with a semaphore(0), run a known number N of asynchronous functions with finally sem.signal(), and from all this return the future sem.wait(N). PERHAPS even have a separate section on mutual exclusion, where we begin with semaphore(1) but also mention shared_mutex
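For reference, a minimal sketch of the wait_all-like idiom described in the TODO above (run_all() and its signature are our invention for illustration, not a Seastar API):

#include <functional>
#include <vector>

// Starts all the given asynchronous functions and resolves when every one
// of them has completed. The semaphore starts at 0 and counts completions.
seastar::future<> run_all(std::vector<std::function<seastar::future<>()>> funcs) {
    return seastar::do_with(seastar::semaphore(0), std::move(funcs),
            [] (auto& sem, auto& funcs) {
        for (auto& func : funcs) {
            // Run in the background; signal one unit on completion. Note
            // that in this sketch a failed function counts as completed.
            (void)seastar::futurize_invoke(func).finally([&sem] {
                sem.signal(1);
            });
        }
        // Resolves only after all funcs.size() completions were signaled.
        return sem.wait(funcs.size());
    });
}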