Asynchronous Programming with Seastar

Nadav Har’El - nyh@ScyllaDB.com

Avi Kivity - avi@ScyllaDB.com


10 Loops

Most time-consuming computations involve loops. Seastar provides several primitives for expressing them in a way that composes nicely with the future/promise model. An important property of the Seastar loop primitives is that each iteration is followed by a preemption point, which allows other tasks to run in between iterations.

## repeat

A loop created with repeat runs its body repeatedly. Each run yields a stop_iteration value, which tells the loop whether it should continue (stop_iteration::no) or stop (stop_iteration::yes). The next iteration is launched only after the previous one has finished. The loop body passed to repeat is expected to have a future<stop_iteration> return type.

seastar::future<int> recompute_number(int number);

seastar::future<> push_until_100(seastar::lw_shared_ptr<std::vector<int>> queue, int element) {
    return seastar::repeat([queue, element] {
        if (queue->size() == 100) {
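            return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes);
        }
        return recompute_number(element).then([queue] (int new_element) {
            // Push the recomputed value; "element" is not captured by this lambda.
            queue->push_back(new_element);
            return seastar::stop_iteration::no;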
        });
    });
}
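
The control flow that repeat expresses can be pictured with a synchronous toy model in plain C++. This is only a sketch of the contract described above, not Seastar's implementation: it has no futures and no preemption points, and the names repeat_sync and push_until, as well as the local stop_iteration enum, are hypothetical stand-ins introduced for illustration.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for seastar::stop_iteration (plain C++, no Seastar).
enum class stop_iteration { no, yes };

// Toy, synchronous model of the repeat contract: call the body again and
// again until it answers stop_iteration::yes. Returns how many times the
// body was called.
template <typename Body>
std::size_t repeat_sync(Body body) {
    std::size_t calls = 0;
    for (;;) {
        ++calls;
        if (body() == stop_iteration::yes) {
            return calls;
        }
    }
}

// Mirrors the shape of push_until_100 above, with a configurable limit.
inline std::size_t push_until(std::vector<int>& queue, int element, std::size_t limit) {
    return repeat_sync([&queue, element, limit] {
        if (queue.size() >= limit) {
            return stop_iteration::yes;
        }
        queue.push_back(element);
        return stop_iteration::no;
    });
}
```

Calling push_until(q, 7, 3) on an empty vector pushes three elements and runs the body four times: the last run only observes that the limit has been reached and asks the loop to stop.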

10.1 do_until

do_until is a close relative of repeat, but it uses an explicitly passed condition to decide whether it should stop iterating. The example above could be expressed with do_until as follows:

seastar::future<int> recompute_number(int number);

seastar::future<> push_until_100(seastar::lw_shared_ptr<std::vector<int>> queue, int element) {
    return seastar::do_until([queue] { return queue->size() == 100; }, [queue, element] {
        return recompute_number(element).then([queue] (int new_element) {
            queue->push_back(new_element);
        });
    });
}

Note that the loop body is expected to return a future<>, which allows composing complex continuations inside the loop.
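
A synchronous toy model in plain C++ may help picture how the condition and the body interact. It assumes, as the example above implies, that the stop condition is evaluated before each iteration, so the body may run zero times. The names do_until_sync and fill_to are hypothetical, and the model leaves out futures and preemption entirely.

```cpp
#include <cstddef>
#include <vector>

// Toy, synchronous model of the do_until contract (plain C++, no Seastar):
// test the stop condition first, then run the body, and repeat.
// Returns the number of times the body ran.
template <typename StopCondition, typename Body>
std::size_t do_until_sync(StopCondition stop_cond, Body body) {
    std::size_t iterations = 0;
    while (!stop_cond()) {
        body();
        ++iterations;
    }
    return iterations;
}

// Pushes "element" until the queue holds at least "limit" entries.
inline std::size_t fill_to(std::vector<int>& queue, int element, std::size_t limit) {
    return do_until_sync(
        [&queue, limit] { return queue.size() >= limit; },
        [&queue, element] { queue.push_back(element); });
}
```

With an empty vector, fill_to(q, 1, 3) runs the body three times. A second call with the same limit runs it zero times, because the condition already holds.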

10.2 do_for_each

do_for_each is the Seastar equivalent of a for loop. It accepts a range (or a pair of iterators) and a function body, which it applies to each element, in order, one by one. The next iteration is launched only after the previous one has finished, as was the case with repeat. As usual, do_for_each expects its loop body to return a future<>.

seastar::future<> append(seastar::lw_shared_ptr<std::vector<int>> queue1, seastar::lw_shared_ptr<std::vector<int>> queue2) {
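    // do_for_each takes the container itself, so dereference the pointer;
    // capturing queue2 keeps the container alive until the loop finishes.
    return seastar::do_for_each(*queue2, [queue1, queue2] (int element) {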
        queue1->push_back(element);
    });
}

seastar::future<> append_iota(seastar::lw_shared_ptr<std::vector<int>> queue1, int n) {
    return seastar::do_for_each(boost::make_counting_iterator<size_t>(0), boost::make_counting_iterator<size_t>(n), [queue1] (int element) {
        queue1->push_back(element);
    });
}

do_for_each accepts either an lvalue reference to a container or a pair of iterators. This means that the caller is responsible for keeping the container alive during the whole loop execution. If the container's lifetime needs to be prolonged, this can be easily achieved with do_with:

seastar::future<> do_something(int number);

seastar::future<> do_for_all(std::vector<int> numbers) {
    // Note that the "numbers" vector will be destroyed as soon as this function
    // returns, so we use do_with to guarantee it lives during the whole loop execution:
    return seastar::do_with(std::move(numbers), [] (std::vector<int>& numbers) {
        return seastar::do_for_each(numbers, [] (int number) {
            return do_something(number);
        });
    });
}

## parallel_for_each

parallel_for_each is a high-concurrency variant of do_for_each. When using parallel_for_each, all iterations are queued simultaneously, which means that there is no guarantee about the order in which they finish their operations.

```cpp
seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::file>> files) {
    // parallel_for_each takes the container itself, so dereference the pointer.
    return seastar::parallel_for_each(*files, [] (seastar::file f) {
        // file::flush() returns a future<>
        return f.flush();
    });
}
```

parallel_for_each is a powerful tool, as it allows spawning many tasks in parallel. This can yield a great performance gain, but there are also caveats. First of all, excessive concurrency can be troublesome; the details can be found in the chapter Limiting parallelism of loops.

To cap the concurrency of parallel_for_each at a fixed number, use max_concurrent_for_each, which is described below.

Secondly, note that the order in which iterations are executed within a parallel_for_each loop is arbitrary. If strict ordering is needed, consider using do_for_each instead.

TODO: map_reduce, as a shortcut (?) for parallel_for_each which needs to produce some results (e.g., logical_or of boolean results), so we don’t need to create a lw_shared_ptr explicitly (or do_with).

TODO: See seastar commit “input_stream: Fix possible infinite recursion in consume()” for an example on why recursion is a possible, but bad, replacement for repeat(). See also my comment on https://groups.google.com/d/msg/seastar-dev/CUkLVBwva3Y/3DKGw-9aAQAJ on why Seastar’s iteration primitives should be used over tail call optimization.

10.3 max_concurrent_for_each

max_concurrent_for_each is a variant of parallel_for_each with restricted parallelism. It accepts an additional parameter, max_concurrent, and keeps at most max_concurrent iterations queued at the same time, with no guarantee about the order in which they finish their operations.

seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::file>> files, size_t max_concurrent) {
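    // Dereference the pointer to pass the container itself; capturing files
    // keeps the vector alive until the loop finishes.
    return seastar::max_concurrent_for_each(*files, max_concurrent, [files] (seastar::file f) {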
        return f.flush();
    });
}
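
The effect of the limit can be sketched with a small discrete-event simulation in plain C++. This is a model of the behaviour described above, not Seastar code: the function simulate_max_concurrent, the simulation_result struct, and the integer "durations" are all hypothetical, and real iterations complete whenever their futures resolve rather than after a known time.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

struct simulation_result {
    std::vector<std::size_t> completion_order; // item indices, in finish order
    std::size_t peak_in_flight = 0;            // most items running at once
    int total_time = 0;                        // time at which the last item finished
};

// Simulates running one task per entry of "durations" (durations[i] is how
// long item i takes), starting items in order but never keeping more than
// max_concurrent of them in flight.
inline simulation_result simulate_max_concurrent(const std::vector<int>& durations,
                                                 std::size_t max_concurrent) {
    assert(max_concurrent > 0);
    using entry = std::pair<int, std::size_t>; // (finish time, item index)
    // Min-heap: the item that finishes earliest is on top.
    std::priority_queue<entry, std::vector<entry>, std::greater<entry>> in_flight;
    simulation_result result;
    std::size_t next = 0;
    int now = 0;
    while (next < durations.size() || !in_flight.empty()) {
        // Start items, in order, while a concurrency slot is free.
        while (next < durations.size() && in_flight.size() < max_concurrent) {
            in_flight.emplace(now + durations[next], next);
            ++next;
        }
        result.peak_in_flight = std::max(result.peak_in_flight, in_flight.size());
        // Advance time to the earliest completion and retire that item.
        const entry earliest = in_flight.top();
        in_flight.pop();
        now = earliest.first;
        result.completion_order.push_back(earliest.second);
    }
    result.total_time = now;
    return result;
}
```

For durations {5, 1, 1, 1} and a limit of 2, the slow first item occupies one slot while the three short items pass through the other, so the items finish in the order 1, 2, 3, 0 after 5 time units. With a limit of 1 the same input degenerates to a sequential loop that finishes in order after 8 time units.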

Determining the maximum concurrency limit is beyond the scope of this document. It should typically be derived from the actual capabilities of the system the software is running on, such as the number of parallel execution units or I/O channels, so as to optimize resource utilization without overwhelming the system.