Asynchronous Programming with Seastar

Nadav Har’El - nyh@ScyllaDB.com

Avi Kivity - avi@ScyllaDB.com


12 Semaphores

Seastar’s semaphores are the standard computer-science semaphores, adapted for futures. A semaphore is a counter into which you can deposit units or take them away. Taking units from the counter may wait if not enough units are available.
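The counter described above can be modelled in a few lines of standard C++. This is only a toy sketch to make the idea concrete, not Seastar's implementation: the class name toy_semaphore, the callback-based wait(), and the choice to wake waiters oldest-first are all assumptions made for illustration. Where seastar::semaphore::wait() returns a future, the toy takes a callback to run once the units have been taken.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Toy, single-threaded model of a counting semaphore (NOT seastar::semaphore).
class toy_semaphore {
    struct waiter {
        std::size_t units;
        std::function<void()> resume;
    };
    std::size_t _count;
    std::deque<waiter> _waiters;
public:
    explicit toy_semaphore(std::size_t count) : _count(count) {}

    // Take `units` right away if possible; otherwise queue `resume` for later.
    void wait(std::size_t units, std::function<void()> resume) {
        if (_waiters.empty() && _count >= units) {
            _count -= units;
            resume();
        } else {
            _waiters.push_back({units, std::move(resume)});
        }
    }

    // Deposit `units`, then wake queued waiters (oldest first) while they fit.
    void signal(std::size_t units) {
        _count += units;
        while (!_waiters.empty() && _waiters.front().units <= _count) {
            waiter w = std::move(_waiters.front());
            _waiters.pop_front();
            _count -= w.units;
            w.resume();
        }
    }

    std::size_t available() const { return _count; }
    std::size_t waiters() const { return _waiters.size(); }
};
```

With a toy_semaphore of 2 units, two wait(1, ...) calls run their callbacks immediately, a third is queued, and a later signal(1) runs it.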

12.1 Limiting parallelism with semaphores

The most common use for a semaphore in Seastar is for limiting parallelism, i.e., limiting the number of instances of some code which can run in parallel. This can be important when each of the parallel invocations uses a limited resource (e.g., memory) so letting an unlimited number of them run in parallel can exhaust this resource.

Consider a case where an external source of events (e.g., an incoming network request) causes an asynchronous function g() to be called. Imagine that we want to limit the number of concurrent g() operations to 100. That is, if g() is started when 100 other invocations are still ongoing, we want it to delay its real work until one of the other invocations has completed. We can do this with a semaphore:

seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return limit.wait(1).then([] {
        return slow(); // do the real work of g()
    }).finally([] {
        limit.signal(1);
    });
}

In this example, the semaphore starts with the counter at 100. The asynchronous operation slow() is only started when we can reduce the counter by one (wait(1)), and when slow() is done, either successfully or with an exception, the counter is increased back by one (signal(1)). This way, when 100 operations have already started their work and have not yet finished, the 101st operation will wait until one of the ongoing operations finishes and returns a unit to the semaphore. This ensures that at any time we have at most 100 concurrent slow() operations running in the above code.

Note how we used a static thread_local semaphore, so that all calls to g() from the same shard count towards the same limit. As usual, a Seastar application is sharded, so this limit is separate per shard (CPU thread). This is usually fine, because sharded applications consider resources to be separate per shard.

Luckily, the above code happens to be exception safe: limit.wait(1) can throw an exception when it runs out of memory (keeping a list of waiters), and in that case the semaphore counter is not decreased but the continuations below are not run so it is not increased either. limit.wait(1) can also return an exceptional future when the semaphore is broken (we’ll discuss this later) but in that case the extra signal() call is ignored. Finally, slow() may also throw, or return an exceptional future, but the finally() ensures the semaphore is still increased.

However, as the application code becomes more complex, it becomes harder to ensure that we never forget to call signal() after the operation is done, regardless of which code path is taken or which exceptions happen. As an example of what might go wrong, consider the following buggy code snippet, which differs subtly from the one above and also appears, at first sight, to be correct:

seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return limit.wait(1).then([] {
        return slow().finally([] { limit.signal(1); });
    });
}

But this version is not exception safe: Consider what happens if slow() throws an exception before returning a future (this is different from slow() returning an exceptional future - we discussed this difference in the section about exception handling). In this case, we decreased the counter, but the finally() will never be reached, and the counter will never be increased back. There is a way to fix this code, by replacing the call to slow() with seastar::futurize_invoke(slow). But the point we’re trying to make here is not how to fix buggy code, but rather that by using the separate semaphore::wait() and semaphore::signal() functions, you can very easily get things wrong.
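The difference between throwing and returning a failed future can be illustrated with a small synchronous model in standard C++. Everything here is hypothetical and only loosely mirrors the Seastar snippets: toy_future is an "already resolved" future that just carries an optional exception, to_future() is a rough analogue of what seastar::futurize_invoke() does, and the plain int counter stands in for the semaphore. One simplification to keep in mind: in the toy, buggy() throws to its caller, whereas in Seastar the exception is thrown inside a continuation and surfaces as a failed future. The skipped release is the same in both.

```cpp
#include <exception>
#include <stdexcept>

// Toy "already resolved" future: a null error means success. Not a Seastar type.
struct toy_future {
    std::exception_ptr error;
    bool failed() const { return static_cast<bool>(error); }
};

// Stand-in for a slow() that throws before it has any future to return.
inline toy_future slow_throws() { throw std::runtime_error("early failure"); }

// Rough analogue of futurize_invoke(): a thrown exception becomes a failed future.
template <typename Func>
toy_future to_future(Func&& func) {
    try {
        return func();
    } catch (...) {
        return toy_future{std::current_exception()};
    }
}

// Shape of the buggy snippet: the release step hangs off slow()'s result.
inline toy_future buggy(int& counter) {
    --counter;                     // like wait(1)
    toy_future f = slow_throws();  // throws, so the next line never runs
    ++counter;                     // like finally([] { limit.signal(1); })
    return f;
}

// Same shape with the call wrapped, so the release step always runs.
inline toy_future fixed(int& counter) {
    --counter;
    toy_future f = to_future(slow_throws);
    ++counter;
    return f;
}
```

Starting from a counter of 100, buggy() leaves it at 99 after the exception escapes, while fixed() returns a failed toy_future and leaves the counter at 100.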

For exception safety, in C++ it is generally not recommended to have separate resource acquisition and release functions. Instead, C++ offers safer mechanisms for acquiring a resource (in this case semaphore units) and later releasing it: lambda functions, and RAII (“resource acquisition is initialization”).

The lambda-based solution is a function seastar::with_semaphore() which is a shortcut for the code in the examples above:

seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return seastar::with_semaphore(limit, 1, [] {
        return slow(); // do the real work of g()
    });
}

with_semaphore(), like the earlier code snippets, waits for the given number of units from the semaphore, then runs the given lambda, and when the future returned by the lambda is resolved, with_semaphore() returns back the units to the semaphore. with_semaphore() returns a future which only resolves after all these steps are done.

The function seastar::get_units() is more general. It provides an exception-safe alternative to seastar::semaphore’s separate wait() and signal() methods, based on C++’s RAII philosophy: The function returns an opaque units object, which while held, keeps the semaphore’s counter decreased - and as soon as this object is destructed, the counter is increased back. With this interface you cannot forget to increase the counter, or increase it twice, or increase without decreasing: The counter will always be decreased once when the units object is created, and if that succeeded, increased when the object is destructed. When the units object is moved into a continuation, no matter how this continuation ends, when the continuation is destructed, the units object is destructed and the units are returned to the semaphore’s counter. The above examples, written with get_units(), look like this:

seastar::future<> g() {
    static thread_local seastar::semaphore limit(100);
    return seastar::get_units(limit, 1).then([] (auto units) {
        return slow().finally([units = std::move(units)] {});
    });
}

Note the somewhat convoluted way that get_units() needs to be used: The continuations must be nested because we need the units object to be moved to the last continuation. If slow() returns a future (and does not throw immediately), the finally() continuation captures the units object until everything is done, but does not run any code.

Seastar programmers should generally avoid using the semaphore::wait() and semaphore::signal() functions directly, and always prefer either with_semaphore() (when applicable) or get_units().

12.2 Limiting resource use

Because semaphores support waiting for any number of units, not just 1, we can use them for more than simply limiting the number of parallel invocations. For example, suppose we have an asynchronous function using_lots_of_memory(size_t bytes), which uses the given number of bytes of memory, and we want to ensure that no more than 1 MB of memory is used by all parallel invocations of this function, and that additional calls are delayed until previous calls have finished. We can do this with a semaphore:

seastar::future<> using_lots_of_memory(size_t bytes) {
    static thread_local seastar::semaphore limit(1000000); // limit to 1MB
    return seastar::with_semaphore(limit, bytes, [bytes] {
        // do something allocating 'bytes' bytes of memory
    });
}

Watch out that in the above example, a call to using_lots_of_memory(2000000) will return a future that never resolves, because the semaphore will never contain enough units to satisfy the semaphore wait. using_lots_of_memory() should probably check whether bytes is above the limit, and throw an exception in that case. Seastar doesn’t do this for you.
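A minimal sketch of such a check, in standard C++, might look as follows. The helper name check_fits, its parameters, and the choice of std::invalid_argument are assumptions for illustration. In using_lots_of_memory() a check like this would run before the call to with_semaphore(), with the semaphore's initial count passed as capacity.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical helper: reject a request that the semaphore could never satisfy.
inline void check_fits(std::size_t bytes, std::size_t capacity) {
    if (bytes > capacity) {
        throw std::invalid_argument(
            "request for " + std::to_string(bytes) +
            " bytes exceeds the limit of " + std::to_string(capacity));
    }
}
```

A request exactly equal to the capacity is allowed, since it can be satisfied once all other holders have returned their units.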

12.3 Limiting parallelism of loops

Above, we looked at a function g() which gets called by some external event, and wanted to control its parallelism. In this section, we look at parallelism of loops, which also can be controlled with semaphores.

Consider the following simple loop:

#include <chrono>
#include <iostream>
#include <seastar/core/sleep.hh>
seastar::future<> slow() {
    std::cerr << ".";
    return seastar::sleep(std::chrono::seconds(1));
}
seastar::future<> f() {
    return seastar::repeat([] {
        return slow().then([] { return seastar::stop_iteration::no; });
    });
}

This loop runs the slow() function (taking one second to complete) without any parallelism — the next slow() call starts only when the previous one completed. But what if we do not need to serialize the calls to slow(), and want to allow multiple instances of it to be ongoing concurrently?

Naively, we could achieve more parallelism, by starting the next call to slow() right after the previous call — ignoring the future returned by the previous call to slow() and not waiting for it to resolve:

seastar::future<> f() {
    return seastar::repeat([] {
        slow();
        return seastar::stop_iteration::no;
    });
}

But in this loop, there is no limit to the amount of parallelism — millions of sleep() calls might be active in parallel, before the first one ever returned. Eventually, this loop may consume all available memory and crash.

Using a semaphore allows us to run many instances of slow() in parallel, but limit the number of these parallel instances to, in the following example, 100:

seastar::future<> f() {
    return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
        return seastar::repeat([&limit] {
            return limit.wait(1).then([&limit] {
                seastar::futurize_invoke(slow).finally([&limit] {
                    limit.signal(1);
                });
                return seastar::stop_iteration::no;
            });
        });
    });
}

Note how this code differs from the code we saw above for limiting the number of parallel invocations of a function g():

1. Here we cannot use a single thread_local semaphore. Each call to f() has its loop with parallelism of 100, so needs its own semaphore “limit”, kept alive during the loop with do_with().

2. Here we do not wait for slow() to complete before continuing the loop, i.e., we do not return the future chain starting at futurize_invoke(slow). The loop continues to the next iteration when a semaphore unit becomes available, while (in our example) 99 other operations might be ongoing in the background and we do not wait for them.

In the examples in this section, we cannot use the with_semaphore() shortcut. with_semaphore() returns a future which only resolves after the lambda’s returned future resolves. But in the above example, the loop needs to know when just the semaphore units are available, to start the next iteration — and not wait for the previous iteration to complete. We could not achieve that with with_semaphore(). But the more general exception-safe idiom, seastar::get_units(), can be used in this case, and is recommended:

seastar::future<> f() {
    return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
        return seastar::repeat([&limit] {
            return seastar::get_units(limit, 1).then([] (auto units) {
                slow().finally([units = std::move(units)] {});
                return seastar::stop_iteration::no;
            });
        });
    });
}

The above examples are not realistic, because they have a never-ending loop and the future returned by f() will never resolve. In more realistic cases, the loop has an end, and at the end of the loop we need to wait for all the background operations which the loop started. We can do this by wait()ing on the original count of the semaphore: When the full count is finally available, it means that all the operations have completed. For example, the following loop ends after 456 iterations:

#include <boost/iterator/counting_iterator.hpp>
seastar::future<> f() {
    return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
        return seastar::do_for_each(boost::counting_iterator<int>(0),
                boost::counting_iterator<int>(456), [&limit] (int i) {
            return seastar::get_units(limit, 1).then([] (auto units) {
                slow().finally([units = std::move(units)] {});
            });
        }).finally([&limit] {
            return limit.wait(100);
        });
    });
}

The last finally is what ensures that we wait for the last operations to complete: After the do_for_each loop ends (whether successfully or prematurely because of an exception in one of the iterations), we do a wait(100) to wait for the semaphore to reach its original value 100, meaning that all operations that we started have completed. Without this finally, the future returned by f() would resolve before all the operations started by the loop have actually completed (up to 100 of the most recently started ones may still be running).
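The counting argument can be checked with a small deterministic simulation in standard C++. This is not Seastar code: simulate_loop and loop_stats are hypothetical names, the simulation assumes that background operations finish in the order they were started, and it assumes a limit of at least 1. It tracks the counter as the loop starts operations, and then models the final wait for the full count.

```cpp
#include <algorithm>
#include <cstddef>
#include <queue>

struct loop_stats {
    std::size_t max_in_flight;       // peak number of background operations
    std::size_t finished_at_return;  // operations completed when the final wait is done
};

// Simulates `total` iterations limited to `limit` concurrent operations (limit >= 1).
inline loop_stats simulate_loop(std::size_t total, std::size_t limit) {
    std::size_t available = limit;      // the semaphore's counter
    std::queue<std::size_t> in_flight;  // started but not yet finished
    loop_stats stats{0, 0};

    // One background operation finishes and returns its unit.
    auto complete_oldest = [&] {
        in_flight.pop();
        ++stats.finished_at_return;
        ++available;
    };

    for (std::size_t i = 0; i < total; ++i) {
        if (available == 0) {
            complete_oldest();  // the loop is blocked until a unit comes back
        }
        --available;            // like get_units(limit, 1)
        in_flight.push(i);      // start the operation in the background
        stats.max_in_flight = std::max(stats.max_in_flight, in_flight.size());
    }

    // Like the final wait(limit): done only when the full count is back.
    while (available < limit) {
        complete_oldest();
    }
    return stats;
}
```

For simulate_loop(456, 100), the peak number of operations in flight is 100, and all 456 have finished by the time the full count is available again.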

In the idiom we saw in the above example, the same semaphore is used both for limiting the number of background operations, and later to wait for all of them to complete. Sometimes, we want several different loops to use the same semaphore to limit their total parallelism. In that case we must use a separate mechanism for waiting for the completion of the background operations started by the loop. The most convenient way to wait for ongoing operations is using a gate, which we will describe in detail later. A typical example of a loop whose parallelism is limited by an external semaphore:

thread_local seastar::semaphore limit(100);
seastar::future<> f() {
    return seastar::do_with(seastar::gate(), [] (auto& gate) {
        return seastar::do_for_each(boost::counting_iterator<int>(0),
                boost::counting_iterator<int>(456), [&gate] (int i) {
            return seastar::get_units(limit, 1).then([&gate] (auto units) {
                gate.enter();
                seastar::futurize_invoke(slow).finally([&gate, units = std::move(units)] {
                    gate.leave();
                });
            });
        }).finally([&gate] {
            return gate.close();
        });
    });
}

In this code, we use the external semaphore limit to limit the number of concurrent operations, but additionally have a gate specific to this loop to help us wait for all ongoing operations to complete.

TODO: also allow get_units() or something similar on a gate, and use that instead of the explicit gate.enter/gate.leave.

TODO: say something about semaphore fairness - if someone is waiting for a lot of units and later someone asks for 1 unit, will both wait or will the request for 1 unit be satisfied?

TODO: say something about broken semaphores? (or in later section especially about breaking/closing/shutting down/etc?)

TODO: Have a few paragraphs, or even a section, on additional uses of semaphores. One is for mutual exclusion using semaphore(1) - we need to explain why: although in Seastar we don’t have multiple threads touching the same data, if code is composed of different continuations (i.e., a fiber) it can switch to a different fiber in the middle, so if data needs to be protected between two continuations, it needs a mutex. Another example is something akin to wait_all: we start with a semaphore(0), run a known number N of asynchronous functions with finally sem.signal(), and from all this return the future sem.wait(N). PERHAPS even have a separate section on mutual exclusion, where we begin with semaphore(1) but also mention shared_mutex.