Basic ideas of Python 3 asyncio concurrency

Andy Balaam from Andy Balaam's Blog

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

Python 3’s asyncio module and the async and await keywords combine to allow us to do cooperative concurrent programming, where a code path voluntarily yields control to a scheduler, trusting that it will get control back when some resource has become available (or just when the scheduler feels like it). This way of programming can be very confusing, and has been popularised by Twisted in the Python world, and nodejs (among others) in other worlds.

I have been trying to get my head around the basic ideas as they surface in Python 3’s model. Below are some definitions and explanations that have been useful to me as I tried to grasp how it all works.

Futures and coroutines are both things that you can wait for.

You can make a coroutine by declaring it with async def:

import asyncio
async def mycoro(number):
    print("Starting %d" % number)
    await asyncio.sleep(1)
    print("Finishing %d" % number)
    return str(number)

Almost always, a coroutine will await something such as some blocking IO. (Above we just sleep for a second.) When we await, we actually yield control to the scheduler so it can do other work and wake us up later, when something interesting has happened.

You can make a future out of a coroutine, but often you don’t need to. Bear in mind that if you do want to make a future, you should use ensure_future, but this actually runs what you pass to it – it doesn’t just create a future:

myfuture1 = asyncio.ensure_future(mycoro(1))
# Runs mycoro!

But, to get its result, you must wait for it – it is only scheduled in the background:

# Assume mycoro is defined as above
myfuture1 = asyncio.ensure_future(mycoro(1))
# We end the program without waiting for the future to finish

So the above fails like this:

$ python3 ./python-async.py
Task was destroyed but it is pending!
task: <Task pending coro=<mycoro() running at ./python-async:10>>
sys:1: RuntimeWarning: coroutine 'mycoro' was never awaited

The right way to block waiting for a future outside of a coroutine is to ask the event loop to do it:

# Keep on assuming mycoro is defined as above for all the examples
myfuture1 = asyncio.ensure_future(mycoro(1))
loop = asyncio.get_event_loop()
loop.run_until_complete(myfuture1)
loop.close()

Now this works properly (although we’re not yet getting any benefit from being asynchronous):

$ python3 python-async.py
Starting 1
Finishing 1

To run several things concurrently, we make a future that is the combination of several other futures. asyncio can make a future like that out of coroutines using asyncio.gather:

several_futures = asyncio.gather(
    mycoro(1), mycoro(2), mycoro(3))
loop = asyncio.get_event_loop()
print(loop.run_until_complete(several_futures))
loop.close()

The three coroutines all run at the same time, so this only takes about 1 second to run, even though we are running 3 tasks, each of which takes 1 second:

$ python3 python-async.py
Starting 3
Starting 1
Starting 2
Finishing 3
Finishing 1
Finishing 2
['1', '2', '3']

asyncio.gather won’t necessarily run your coroutines in order, but it will return a list of results in the same order as its input.

Notice also that run_until_complete returns the result of the future created by gather – a list of all the results from the individual coroutines.

To do the next bit we need to know how to call a coroutine from a coroutine. As we’ve already seen, just calling a coroutine in the normal Python way doesn’t run it, but gives you back a “coroutine object”. To actually run the code, we need to wait for it. When we want to block everything until we have a result, we can use something like run_until_complete but in an async context we want to yield control to the scheduler and let it give us back control when the coroutine has finished. We do that by using await:

import asyncio
async def f2():
    print("start f2")
    await asyncio.sleep(1)
    print("stop f2")
async def f1():
    print("start f1")
    await f2()
    print("stop f1")
loop = asyncio.get_event_loop()
loop.run_until_complete(f1())
loop.close()

This prints:

$ python3 python-async.py
start f1
start f2
stop f2
stop f1

Now we know how to call a coroutine from inside a coroutine, we can continue.

We have seen that asyncio.gather takes in some futures/coroutines and returns a future that collects their results (in order).

If, instead, you want to get results as soon as they are available, you need to write a second coroutine that deals with each result by looping through the results of asyncio.as_completed and awaiting each one.

# Keep on assuming mycoro is defined as at the top
async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result %s" % await res)
coros = [mycoro(1), mycoro(2), mycoro(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

This prints:

$ python3 python-async.py
Starting 1
Starting 3
Starting 2
Finishing 3
Result 3
Finishing 2
Result 2
Finishing 1
Result 1

Notice that task 3 finishes first and its result is printed, even though tasks 1 and 2 are still running.

asyncio.as_completed returns an iterable sequence of futures, each of which must be awaited, so it must run inside a coroutine, which must be waited for too.

The argument to asyncio.as_completed has to be a list of coroutines or futures, not an iterable, so you can’t use it with a very large list of items that won’t fit in memory.

Side note: if we want to work with very large lists, asyncio.wait won’t help us here – it also takes a list of futures and waits for all of them to complete (like gather), or, with other arguments, for one of them to complete or one of them to fail. It then returns two sets of futures: done and not-done. Each of these must be awaited to get their results, so:

asyncio.gather

# is roughly equivalent to:

async def mygather(*args):
    ret = []
    for r in (await asyncio.wait(args))[0]:
        ret.append(await r)
    return ret

I am interested in running very large numbers of tasks with limited concurrency – see the next article for how I managed it.

C++ iterator wrapping a stream not 1-1

Andy Balaam from Andy Balaam&#039;s Blog

Series: Iterator, Iterator Wrapper, Non-1-1 Wrapper

Sometimes we want to write an iterator that consumes items from some underlying iterator but produces its own items slower than the items it consumes, like this:

ColonSep items("aa:foo::x");
// Prints "aa, foo, , x"
for(auto s : items)
{
    std::cout << s << ", ";
}

When we pass a 9-character string (i.e. an iterator that yields 9 items) to ColonSep, above, we only repeat 4 times in our loop, because ColonSep provides an iterable range that yields one value for each whole word it finds in the underlying iterator of 9 characters.

To do something like this I'd recommend consuming the items of the underlying iterator early, so it is ready when requested with operator*. We also need our iterator class to hold on to the end of the underlying iterator as well as the current position.

First we need a small container to hold the next item we will provide:

struct maybestring
{
    std::string value_;
    bool at_end_;

    explicit maybestring(const std::string value)
    : value_(value)
    , at_end_(false)
    {}

    explicit maybestring()
    : value_("--error-past-end--")
    , at_end_(true)
    {}
};

A maybestring either holds the next item we will provide, or at_end_ is true, meaning we have reached the end of the underlying iterator and we will report that we are at the end ourself when asked.

Like the simpler iterators we have looked at, we still need a little container to return from the postincrement operator:

class stringholder
{
    const std::string value_;
public:
    stringholder(const std::string value) : value_(value) {}
    std::string operator*() const { return value_; }
};

Now we are ready to write our iterator class, which always has the next value ready in its next_ member, and holds on to the current and end positions of the underlying iterator in wrapped_ and wrapped_end_:

class myit
{
private:
    typedef std::string::const_iterator wrapped_t;
    wrapped_t wrapped_;
    wrapped_t wrapped_end_;
    maybestring next_;

The constructor holds on the underlying iterator pointers, and immediately fills next_ with the next value by calling next_item passing in true to indicate that this is the first item:

public:
    myit(wrapped_t wrapped, wrapped_t wrapped_end)
    : wrapped_(wrapped)
    , wrapped_end_(wrapped_end)
    , next_(next_item(true))
    {
    }

    // Previously provided by std::iterator
    typedef int                     value_type;
    typedef std::ptrdiff_t          difference_type;
    typedef int*                    pointer;
    typedef int&                    reference;
    typedef std::input_iterator_tag iterator_category;

next_item looks like this:

private:
    maybestring next_item(bool first_time)
    {
        if (wrapped_ == wrapped_end_)
        {
            return maybestring();  // We are at the end
        }
        else
        {
            if (!first_time)
            {
                ++wrapped_;
            }
            return read_item();
        }
    }

next_item recognises whether we've reached the end of the underlying iterator and saves the empty maybstring if so. Otherwise, it skips forward once (unless we are on the first element) and then calls read_item:

    maybestring read_item()
    {
        std::string ret = "";
        for (; wrapped_ != wrapped_end_; ++wrapped_)
        {
            char c = *wrapped_;
            if (c == ':')
            {
                break;
            }
            ret += c;
        }
        return maybestring(ret);
    }

read_item implements the real logic of looping through the underlying iterator and combining those values together to create the next item to provide.

The hard part of the iterator class is done, leaving only the more normal functions we must provide:

public:
    value_type operator*() const
    {
        assert(!next_.at_end_);
        return next_.value_;
    }

    bool operator==(const myit& other) const
    {
        // We only care about whether we are at the end
        return next_.at_end_ == other.next_.at_end_;
    }

    bool operator!=(const myit& other) const { return !(*this == other); }

    stringholder operator++(int)
    {
        assert(!next_.at_end_);
        stringholder ret(next_.value_);
        next_ = next_item(false);
        return ret;
    }

    myit& operator++()
    {
        assert(!next_.at_end_);
        next_ = next_item(false);
        return *this;
    }
}

Note that operator== is only concerned with whether or not we are an end iterator or not. Nothing else matters for providing correct iteration.

Our final bit of bookkeeping is the range class that allows our new iterator to be used in a for loop:

class ColonSep
{
private:
    const std::string str_;
public:
    ColonSep(const std::string str) : str_(str) {}
    myit begin() { return myit(std::begin(str_), std::end(str_)); }
    myit end()   { return myit(std::end(str_),   std::end(str_)); }
};

A lot of the code above is needed for all code that does this kind of job. Next time we'll look at how to use templates to make it useable in the general case.