TECH(K)NOW Day workshop on “Writing a programming language”

Andy Balaam from Andy Balaam's Blog

My OpenMarket colleagues and I ran a workshop at TECH(K)NOW Day on how to write your own programming language.

A big thank you to my colleagues from OpenMarket who volunteered to help: Rowan, Jenny, Zach, James and Elliot.

An extra thank you to Zach and Elliott for their impromptu help on the information desk for attendees.

Hopefully the attendees enjoyed it and learned a bit.

You can find the workshop slides, the full code, info about another simple language called Cell, and lots more links at github.com/andybalaam/videos-write-your-own-language. You can also read my blog at artificialworlds.net/blog and follow me on Twitter at @andybalaam.

Thanks to OpenMarket for supporting us in running this workshop!

Running a virtualenv with a custom-built Python

Andy Balaam from Andy Balaam's Blog

For my attempt to improve the asyncio.as_completed Python standard library function I needed to build a local copy of cpython (the Python interpreter).

To test it, I needed the aiohttp module, which is not part of the standard library, so the easiest way to get it was using virtualenv.

Here is the recipe I used to get a virtualenv and install packages using pip with a custom-built Python:

$ ~/code/public/cpython/python -m venv env
$ . env/bin/activate
(env) $ pip install aiohttp
(env) $ python mycode.py
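
To double-check that the virtualenv really is using the custom-built interpreter, a small script along these lines may help. It is only a sketch using the standard sys module, and the function names are my own, not part of the recipe above:

```python
import sys


def in_virtualenv():
    """Return True when running inside an environment created by venv."""
    # Inside a venv, sys.prefix points at the environment, while
    # sys.base_prefix points at the interpreter it was created from.
    return sys.prefix != sys.base_prefix


def describe_interpreter():
    """Collect basic facts about the running interpreter."""
    return {
        "executable": sys.executable,
        "version": sys.version.split()[0],
        "in_virtualenv": in_virtualenv(),
    }


if __name__ == "__main__":
    for key, value in describe_interpreter().items():
        print("%s: %s" % (key, value))
```

Run inside the activated environment, the executable path should point inside env/ and the version should match the cpython checkout you built.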

Adding a concurrency limit to Python’s asyncio.as_completed

Andy Balaam from Andy Balaam's Blog

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

In the previous post I demonstrated how the limited_as_completed function allows us to run a very large number of tasks concurrently while capping the number running at any one time, so that we don’t exhaust resources like memory or operating system file handles.

I think this could be a useful addition to the Python standard library, so I have been working on a modification to the current asyncio.as_completed function. My work so far is here: limited-as_completed.

I ran similar tests to the ones I ran for the last blog post with this code to validate that the modified standard library version achieves the same goals as before.

I used an identical copy of timed from the previous post and updated versions of the other files because I was using a much newer version of aiohttp along with the custom-built python I was running.

server looked like:

#!/usr/bin/env python3

from aiohttp import web
import asyncio
import random

async def handle(request):
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text="Hello, World!")

app = web.Application()
app.router.add_get('/{name}', handle)

web.run_app(app)

client-async-sem needed me to add a custom TCPConnector to avoid a new limit on the number of concurrent connections that was added to aiohttp in version 2.0. I also needed to move the ClientSession usage inside a coroutine to avoid a warning:

#!/usr/bin/env python3

from aiohttp import ClientSession, TCPConnector
import asyncio
import sys

limit = 1000

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(r):
    async with ClientSession(connector=TCPConnector(limit=limit)) as session:
        url = "http://localhost:8080/{}"
        tasks = []
        # create instance of Semaphore
        sem = asyncio.Semaphore(limit)
        for i in range(r):
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(
                bound_fetch(sem, url.format(i), session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(run(int(sys.argv[1]))))

My new code that uses my proposed extension to as_completed looked like:

#!/usr/bin/env python3

from aiohttp import ClientSession, TCPConnector
import asyncio
import sys

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

limit = 1000

async def print_when_done():
    async with ClientSession(connector=TCPConnector(limit=limit)) as session:
        tasks = (fetch(url.format(i), session) for i in range(r))
        for res in asyncio.as_completed(tasks, limit=limit):
            await res

r = int(sys.argv[1])
url = "http://localhost:8080/{}"
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done())
loop.close()

and with these, we get similar behaviour to the previous post:

$ ./timed ./client-async-sem 10000
Memory usage: 73640KB	Time: 19.18 seconds
$ ./timed ./client-async-stdlib 10000
Memory usage: 49332KB	Time: 18.97 seconds

So the implementation I plan to submit to the Python standard library appears to work well. In fact, I think it is better than the one I presented in the previous post, because it uses on_complete callbacks to notice when futures have completed, which reduces the busy-looping we were doing to check for and yield finished tasks.

The Python issue is bpo-30782 and the pull request is #2424.

Note: at first glance, it looks like aiohttp.ClientSession’s limit on the number of connections (introduced in version 1.0 and then updated in version 2.0) gives us what we want without any of this extra code. In fact it only limits the number of connections, not the number of futures we are creating, so it has the same problem of unbounded memory use as the semaphore-based implementation.

Making 100 million requests with Python aiohttp

Andy Balaam from Andy Balaam's Blog

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

I’ve been working on how to make a very large number of HTTP requests using Python’s asyncio and aiohttp.

Paweł Miech’s post Making 1 million requests with python-aiohttp taught me how to think about this, and got us a long way, with 1 million requests running in a reasonable time, but I need to go further.

Paweł’s approach limits the number of requests that are in progress, but it uses an unbounded amount of memory to hold the futures that it wants to execute.

We can avoid using unbounded memory by using the limited_as_completed function I outlined in my previous post.

Setup

Server

We have a server program “server”:

(Note it differs from Paweł’s version because I am using an older version of aiohttp which has fewer convenient features.)

#!/usr/bin/env python3.5

from aiohttp import web
import asyncio
import random

async def handle(request):
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text="Hello, World!")

async def init():
    app = web.Application()
    app.router.add_route('GET', '/{name}', handle)
    return await loop.create_server(
        app.make_handler(), '127.0.0.1', 8080)

loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()

This just responds “Hello, World!” to every request it receives, but after an artificial delay of 0-3 seconds.

Synchronous client

As a baseline, we have a synchronous client “client-sync”:

#!/usr/bin/env python3.5

import requests
import sys

url = "http://localhost:8080/{}"
for i in range(int(sys.argv[1])):
    requests.get(url.format(i)).text

This waits for each request to complete before making the next one. Like the other clients below, it takes the number of requests to make as a command-line argument.

Async client using semaphores

Copied mostly verbatim from Making 1 million requests with python-aiohttp we have an async client “client-async-sem” that uses a semaphore to restrict the number of requests that are in progress at any time to 1000:

#!/usr/bin/env python3.5

from aiohttp import ClientSession
import asyncio
import sys

limit = 1000

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(session, r):
    url = "http://localhost:8080/{}"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(limit)
    for i in range(r):
        # pass Semaphore and session to every GET request
        task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session))
        tasks.append(task)
    responses = asyncio.gather(*tasks)
    await responses

loop = asyncio.get_event_loop()
with ClientSession() as session:
    loop.run_until_complete(asyncio.ensure_future(run(session, int(sys.argv[1]))))

Async client using limited_as_completed

The new client I am presenting here uses limited_as_completed from the previous post. This means it can make a generator that provides the futures to wait for as they are needed, instead of making them all at the beginning.

It is called “client-async-as-completed”:

#!/usr/bin/env python3.5

from aiohttp import ClientSession
import asyncio
from itertools import islice
import sys

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

limit = 1000

async def print_when_done(tasks):
    for res in limited_as_completed(tasks, limit):
        await res

r = int(sys.argv[1])
url = "http://localhost:8080/{}"
loop = asyncio.get_event_loop()
with ClientSession() as session:
    coros = (fetch(url.format(i), session) for i in range(r))
    loop.run_until_complete(print_when_done(coros))
loop.close()

Again, this limits the number of requests to 1000.
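
The difference between building every coroutine up front and producing them on demand can be seen without any network access. The sketch below is only an illustration: fake_fetch and make_request are hypothetical stand-ins for the real fetch function, and the counter simply records how many coroutine objects have been created.

```python
from itertools import islice

created = 0


async def fake_fetch(i):
    """Hypothetical stand-in for fetch(url, session)."""
    return i


def make_request(i):
    global created
    created += 1
    return fake_fetch(i)


# Eager: the list holds all 1000 coroutine objects before any work starts.
eager = [make_request(i) for i in range(1000)]
eager_count = created

# Lazy: the generator builds nothing until something asks for an item.
created = 0
lazy = (make_request(i) for i in range(1000))
lazy_before = created
first_ten = list(islice(lazy, 10))
lazy_after = created

# Close the unused coroutine objects so Python does not warn about them.
for coro in eager + first_ten:
    coro.close()

print(eager_count, lazy_before, lazy_after)
```

This prints 1000 0 10: the list pays for every request immediately, while the generator only creates the ten coroutines that were actually requested.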

Test setup

Finally, we have a test runner script called “timed”:

#!/usr/bin/env bash

./server &
sleep 1 # Wait for server to start

/usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" "$@"

# %e Elapsed real (wall clock) time used by the process, in seconds.
# %M Maximum resident set size of the process in Kilobytes.

kill %1

This runs each process, ensuring the server is restarted each time it runs, and prints out how long it took to run, and how much memory it used.
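
For readers who would rather take the same measurements from Python, something like the following could work. It is a rough sketch rather than a replacement for the script above: it does not start or stop the server, it relies on the Unix-only resource module, and it assumes Linux, where ru_maxrss is reported in kilobytes.

```python
import resource
import subprocess
import sys
import time


def timed(command):
    """Run `command` and return (peak_rss, elapsed_seconds).

    peak_rss is the largest resident set size of any child process
    waited for so far; Linux reports it in kilobytes.
    """
    start = time.monotonic()
    subprocess.run(command, check=False)
    elapsed = time.monotonic() - start
    peak_rss = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
    return peak_rss, elapsed


def report(command):
    """Print a line in the same shape as the timed script's output."""
    peak_rss, elapsed = timed(command)
    print("Memory usage: %dKB\tTime: %.2f seconds" % (peak_rss, elapsed))
```

For example, report(["./client-async-sem", "10000"]) would time one run of the semaphore-based client, assuming the server is already running.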

Results

When making only 10 requests, the async clients worked faster because they launched all the requests simultaneously and only had to wait for the longest one (3 seconds). The memory usage of all three clients was fine:

$ ./timed ./client-sync 10
Memory usage: 20548KB	Time: 15.16 seconds
$ ./timed ./client-async-sem 10
Memory usage: 24996KB	Time: 3.13 seconds
$ ./timed ./client-async-as-completed 10
Memory usage: 23176KB	Time: 3.13 seconds

When making 100 requests, the synchronous client was very slow, but all three clients worked eventually:

$ ./timed ./client-sync 100
Memory usage: 20528KB	Time: 156.63 seconds
$ ./timed ./client-async-sem 100
Memory usage: 24980KB	Time: 3.21 seconds
$ ./timed ./client-async-as-completed 100
Memory usage: 24904KB	Time: 3.21 seconds

At this point let’s agree that life is too short to wait for the synchronous client.

When making 10000 requests, both async clients worked quite quickly, and both had increased memory usage, but the semaphore-based one used almost twice as much memory as the limited_as_completed version:

$ ./timed ./client-async-sem 10000
Memory usage: 77912KB	Time: 18.10 seconds
$ ./timed ./client-async-as-completed 10000
Memory usage: 46780KB	Time: 17.86 seconds

For 1 million requests, the semaphore-based client took 25 minutes on my (32GB RAM) machine. It only used about 10% of my CPU, and it used a lot of memory (over 3GB):

$ ./timed ./client-async-sem 1000000
Memory usage: 3815076KB	Time: 1544.04 seconds

Note: Paweł’s version only took 9 minutes on his laptop and used all his CPU, so I wonder whether I have made a mistake somewhere, or whether my version of Python (3.5.2) is not as good as a later one.

The limited_as_completed version ran in a similar amount of time but used 100% of my CPU, and used a much smaller amount of memory (162MB):

$ ./timed ./client-async-as-completed 1000000
Memory usage: 162168KB	Time: 1505.75 seconds

Now let’s try 100 million requests. The semaphore-based version lasted 10 hours before it was killed by Linux’s OOM Killer, but it didn’t manage to make any requests in this time, because it creates all its futures before it starts making requests:

$ ./timed ./client-async-sem 100000000
Command terminated by signal 9

I left the limited_as_completed version over the weekend and it managed to succeed eventually:

$ ./timed ./client-async-as-completed 100000000
Memory usage: 294304KB	Time: 150213.15 seconds

So its memory usage stayed bounded at under 300MB, and it managed to do about 665 requests/second over an extended period, which is almost identical to the throughput of the previous cases.
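
The throughput comparison is simple division, and can be checked with the timings quoted above. The dictionary labels are just descriptive names I chose for this sketch:

```python
# (requests made, elapsed seconds), taken from the timings quoted above
runs = {
    "client-async-sem, 1 million": (1000000, 1544.04),
    "client-async-as-completed, 1 million": (1000000, 1505.75),
    "client-async-as-completed, 100 million": (100000000, 150213.15),
}


def requests_per_second(requests, seconds):
    return requests / seconds


throughput = {
    name: requests_per_second(*run) for name, run in runs.items()
}

for name, rate in throughput.items():
    print("%s: %.1f requests/second" % (name, rate))
```

The three rates come out within about 3% of each other, so the 100 million request run did not slow down noticeably compared with the shorter runs.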

Conclusion

Making a million requests is usually enough, but when we really need to do a lot of work while keeping our memory usage bounded, it looks like an approach like limited_as_completed is a good way to go. I also think it’s slightly easier to understand.

In the next post I describe my attempt to get something like this added to the Python standard library.

Python – printing UTC dates in ISO8601 format with time zone

Andy Balaam from Andy Balaam's Blog

By default, when you make a UTC date from a Unix timestamp in Python and print it in ISO format, it has no time zone:

$ python3
>>> from datetime import datetime
>>> datetime.utcfromtimestamp(1496998804).isoformat()
'2017-06-09T09:00:04'

Whenever you talk about a datetime, I think you should always include a time zone, so I find this problematic.

The solution is to mention the timezone explicitly when you create the datetime:

$ python3
>>> from datetime import datetime, timezone
>>> datetime.fromtimestamp(1496998804, tz=timezone.utc).isoformat()
'2017-06-09T09:00:04+00:00'

Note, including the timezone explicitly works the same way when creating a datetime in other ways:

$ python3
>>> from datetime import datetime, timezone
>>> datetime(2017, 6, 9).isoformat()
'2017-06-09T00:00:00'
>>> datetime(2017, 6, 9, tzinfo=timezone.utc).isoformat()
'2017-06-09T00:00:00+00:00'
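
If you do this often, it may be worth wrapping it in a helper. The sketch below uses hypothetical names (utc_iso and is_aware) and only standard library calls:

```python
from datetime import datetime, timezone


def utc_iso(timestamp):
    """Format a Unix timestamp as an ISO 8601 string with a UTC offset."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


def is_aware(dt):
    """Return True if `dt` carries usable time zone information."""
    return dt.tzinfo is not None and dt.utcoffset() is not None


naive = datetime(2017, 6, 9)
# replace() labels the existing value as UTC; it does not convert it.
aware = naive.replace(tzinfo=timezone.utc)

print(utc_iso(1496998804))
print(is_aware(naive), is_aware(aware))
```

The is_aware check follows the definition of an aware datetime in the Python documentation, and can be handy in tests to catch naive values before they are printed or stored.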

Python 3 – large numbers of tasks with limited concurrency

Andy Balaam from Andy Balaam's Blog

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

I am interested in running large numbers of tasks in parallel, so I need something like asyncio.as_completed, but taking an iterable instead of a list, and with a limited number of tasks running concurrently. First, let’s try to build something pretty much equivalent to asyncio.as_completed. Here is my attempt, but I’d welcome feedback from readers who know better:

# Note this is not a coroutine - it returns
# an iterator - but it crucially depends on
# work being done inside the coroutines it
# yields - those coroutines empty out the
# list of futures it holds, and it will not
# end until that list is empty.
def my_as_completed(coros):

    # Start all the tasks
    futures = [asyncio.ensure_future(c) for c in coros]

    # A coroutine that waits for one of the
    # futures to finish and then returns
    # its result.
    async def first_to_finish():

        # Wait forever - we could add a
        # timeout here instead.
        while True:

            # Give up control to the scheduler
            # - otherwise we will spin here
            # forever!
            await asyncio.sleep(0)

            # Return anything that has finished
            for f in futures:
                if f.done():
                    futures.remove(f)
                    return f.result()

    # Keep yielding a waiting coroutine
    # until all the futures have finished.
    while len(futures) > 0:
        yield first_to_finish()

The above can be substituted for asyncio.as_completed in the code that uses it in the first article, and it seems to work. It also makes a reasonable amount of sense to me, so it may be correct, but I’d welcome comments and corrections.

my_as_completed above accepts an iterable and returns a generator producing results, but inside it starts all tasks concurrently, and stores all the futures in a list. To handle bigger lists we will need to do better, by limiting the number of running tasks to a sensible number.

Let’s start with a test program:

import asyncio
async def mycoro(number):
    print("Starting %d" % number)
    await asyncio.sleep(1.0 / number)
    print("Finishing %d" % number)
    return str(number)

async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result %s" % await res)

coros = [mycoro(i) for i in range(1, 101)]

loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

This uses asyncio.as_completed to run 100 tasks and, because I adjusted the asyncio.sleep call so that lower-numbered tasks sleep for longer, it prints something like this:

$ time python3 python-async.py
Starting 47
Starting 93
Starting 48
...
Finishing 93
Finishing 94
Finishing 95
...
Result 93
Result 94
Result 95
...
Finishing 46
Finishing 45
Finishing 42
...
Finishing 2
Result 2
Finishing 1
Result 1

real    0m1.590s
user    0m0.600s
sys     0m0.072s

So all 100 tasks were completed in about 1.6 seconds, indicating that they really were run concurrently, but all 100 were allowed to run at the same time, with no limit.
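
To see why this timing shows the tasks overlapping, compare the total of all the sleeps with the longest single sleep. This is plain arithmetic on the delays used by mycoro:

```python
# mycoro(number) sleeps for 1.0 / number seconds, for number = 1..100
delays = [1.0 / number for number in range(1, 101)]

sequential = sum(delays)  # time if the sleeps ran one after another
concurrent = max(delays)  # time if they all overlap perfectly

print("One after another: %.2f seconds" % sequential)
print("All at once: %.2f seconds" % concurrent)
```

Run back to back the sleeps alone would take about 5.19 seconds, whereas fully overlapped they take 1 second, so a measured time of around 1.6 seconds (including interpreter start-up) is only possible if the tasks ran concurrently.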

We can adjust the test program to run using our customised my_as_completed function, and pass in an iterable of coroutines instead of a list by changing the last part of the program to look like this:

async def print_when_done(tasks):
    for res in my_as_completed(tasks):
        print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

But we get similar output to last time, with all tasks running concurrently.

To limit the number of concurrent tasks, we limit the size of the futures list, and add more as needed:

from itertools import islice
def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

We start limit tasks at first, and whenever one ends, we ask for the next coroutine in coros and set it running. This keeps the number of running tasks at or below limit until we run out of input coroutines (at which point next raises StopIteration and we don’t add anything to futures). After that, futures starts emptying until we eventually stop yielding coroutine objects.
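
One way to convince yourself that the limit is respected is to count how many tasks are running at once. The sketch below repeats the function in condensed form so that it is self-contained. The tracked coroutine and the counters are hypothetical additions for the experiment, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio
from itertools import islice


def limited_as_completed(coros, limit):
    # Same approach as the function above; coros must be an iterator.
    futures = [asyncio.ensure_future(c) for c in islice(coros, 0, limit)]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        futures.append(asyncio.ensure_future(next(coros)))
                    except StopIteration:
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()


running = 0
peak = 0


async def tracked(number):
    """Hypothetical task that records how many copies run at once."""
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    return number


async def main(total, limit):
    results = []
    coros = (tracked(i) for i in range(total))
    for res in limited_as_completed(coros, limit):
        results.append(await res)
    return results


results = asyncio.run(main(100, 10))
print("Got %d results, peak concurrency %d" % (len(results), peak))
```

All 100 results should arrive, and the recorded peak should never exceed the limit of 10.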

I thought this function might be useful to others, so I started a little repo over here and added it: asyncioplus/limited_as_completed.py. Please provide merge requests and log issues to improve it – maybe it should be part of standard Python?

When we run the same example program, but call limited_as_completed instead of the other versions:

async def print_when_done(tasks):
    for res in limited_as_completed(tasks, 10):
        print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

We see output like this:

$ time python3 python-async.py
Starting 1
Starting 2
...
Starting 9
Starting 10
Finishing 10
Result 10
Starting 11
...
Finishing 100
Result 100
Finishing 1
Result 1

real	0m1.535s
user	0m1.436s
sys	0m0.084s

So we can see that the tasks are still running concurrently, but this time the number of concurrent tasks is limited to 10.

See also

To achieve a similar result using semaphores, see Python asyncio.semaphore in async-await function and Making 1 million requests with python-aiohttp.

It feels like limited_as_completed is more re-usable as an approach but I’d love to hear others’ thoughts on this. E.g. could/should I use a semaphore to implement limited_as_completed instead of manually holding a queue?

Basic ideas of Python 3 asyncio concurrency

Andy Balaam from Andy Balaam's Blog

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

Python 3’s asyncio module and the async and await keywords combine to allow us to do cooperative concurrent programming, where a code path voluntarily yields control to a scheduler, trusting that it will get control back when some resource has become available (or just when the scheduler feels like it). This way of programming can be very confusing, and has been popularised by Twisted in the Python world, and nodejs (among others) in other worlds.

I have been trying to get my head around the basic ideas as they surface in Python 3’s model. Below are some definitions and explanations that have been useful to me as I tried to grasp how it all works.

Futures and coroutines are both things that you can wait for.

You can make a coroutine by declaring it with async def:

import asyncio
async def mycoro(number):
    print("Starting %d" % number)
    await asyncio.sleep(1)
    print("Finishing %d" % number)
    return str(number)

Almost always, a coroutine will await something such as some blocking IO. (Above we just sleep for a second.) When we await, we actually yield control to the scheduler so it can do other work and wake us up later, when something interesting has happened.

You can make a future out of a coroutine, but often you don’t need to. Bear in mind that if you do want to make a future, you should use ensure_future, but this does more than just create a future – it also schedules what you pass to it to run on the event loop:

myfuture1 = asyncio.ensure_future(mycoro(1))
# Schedules mycoro to run!

But, to get its result, you must wait for it – it is only scheduled in the background:

# Assume mycoro is defined as above
myfuture1 = asyncio.ensure_future(mycoro(1))
# We end the program without waiting for the future to finish

So the above fails like this:

$ python3 ./python-async.py
Task was destroyed but it is pending!
task: <Task pending coro=<mycoro() running at ./python-async:10>>
sys:1: RuntimeWarning: coroutine 'mycoro' was never awaited

The right way to block waiting for a future outside of a coroutine is to ask the event loop to do it:

# Keep on assuming mycoro is defined as above for all the examples
myfuture1 = asyncio.ensure_future(mycoro(1))
loop = asyncio.get_event_loop()
loop.run_until_complete(myfuture1)
loop.close()

Now this works properly (although we’re not yet getting any benefit from being asynchronous):

$ python3 python-async.py
Starting 1
Finishing 1

To run several things concurrently, we make a future that is the combination of several other futures. asyncio can make a future like that out of coroutines using asyncio.gather:

several_futures = asyncio.gather(
    mycoro(1), mycoro(2), mycoro(3))
loop = asyncio.get_event_loop()
print(loop.run_until_complete(several_futures))
loop.close()

The three coroutines all run at the same time, so this only takes about 1 second to run, even though we are running 3 tasks, each of which takes 1 second:

$ python3 python-async.py
Starting 3
Starting 1
Starting 2
Finishing 3
Finishing 1
Finishing 2
['1', '2', '3']

asyncio.gather won’t necessarily run your coroutines in order, but it will return a list of results in the same order as its input.

Notice also that run_until_complete returns the result of the future created by gather – a list of all the results from the individual coroutines.
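
The timing claim can be checked directly. This sketch uses a shorter, configurable delay so it finishes quickly. The delay parameter is my own addition to mycoro, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio
import time


async def mycoro(number, delay):
    await asyncio.sleep(delay)
    return str(number)


async def main(delay):
    start = time.monotonic()
    results = await asyncio.gather(
        mycoro(1, delay), mycoro(2, delay), mycoro(3, delay))
    return results, time.monotonic() - start


results, elapsed = asyncio.run(main(0.2))
print(results, "in %.2f seconds" % elapsed)
```

The elapsed time should be close to one delay (0.2 seconds) rather than three, and the results come back in the order the coroutines were passed in.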

To do the next bit we need to know how to call a coroutine from a coroutine. As we’ve already seen, just calling a coroutine in the normal Python way doesn’t run it, but gives you back a “coroutine object”. To actually run the code, we need to wait for it. When we want to block everything until we have a result, we can use something like run_until_complete but in an async context we want to yield control to the scheduler and let it give us back control when the coroutine has finished. We do that by using await:

import asyncio
async def f2():
    print("start f2")
    await asyncio.sleep(1)
    print("stop f2")
async def f1():
    print("start f1")
    await f2()
    print("stop f1")
loop = asyncio.get_event_loop()
loop.run_until_complete(f1())
loop.close()

This prints:

$ python3 python-async.py
start f1
start f2
stop f2
stop f1

Now we know how to call a coroutine from inside a coroutine, we can continue.
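
To make the distinction between calling a coroutine function and running it concrete, here is a tiny sketch. The answer function is hypothetical, and asyncio.run assumes Python 3.7 or later:

```python
import asyncio


async def answer():
    return 42


coro = answer()                           # nothing has run yet
is_coroutine = asyncio.iscoroutine(coro)  # we only hold a coroutine object
result = asyncio.run(coro)                # now the body actually executes

print(is_coroutine, result)
```

This prints True 42: the call itself only builds the coroutine object, and the value appears once something waits for it.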

We have seen that asyncio.gather takes in some futures/coroutines and returns a future that collects their results (in order).

If, instead, you want to get results as soon as they are available, you need to write a second coroutine that deals with each result by looping through the results of asyncio.as_completed and awaiting each one.

# Keep on assuming mycoro is defined as at the top
async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result %s" % await res)
coros = [mycoro(1), mycoro(2), mycoro(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

This prints:

$ python3 python-async.py
Starting 1
Starting 3
Starting 2
Finishing 3
Result 3
Finishing 2
Result 2
Finishing 1
Result 1

Notice that task 3 finishes first and its result is printed, even though tasks 1 and 2 are still running.

asyncio.as_completed returns an iterable sequence of futures, each of which must be awaited, so it must run inside a coroutine, which must be waited for too.

asyncio.as_completed converts its argument into a set of futures up front, so even if you pass it a lazy iterable, every item is created immediately. That means you can’t use it with a very large number of items that won’t fit in memory.

Side note: if we want to work with very large lists, asyncio.wait won’t help us here – it also takes a list of futures and waits for all of them to complete (like gather), or, with other arguments, for one of them to complete or one of them to fail. It then returns two sets of futures: done and not-done. Each of these must be awaited to get their results, so:

asyncio.gather

# is roughly equivalent to:

async def mygather(*args):
    ret = []
    for r in (await asyncio.wait(args))[0]:
        ret.append(await r)
    return ret
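
The "wait for one of them to complete" mode mentioned above looks something like this. It is a small sketch with a hypothetical job coroutine, and asyncio.run assumes Python 3.7 or later:

```python
import asyncio


async def job(number):
    await asyncio.sleep(0.05 * number)
    return number


async def main():
    tasks = [asyncio.ensure_future(job(n)) for n in (3, 1, 2)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    first = sorted(task.result() for task in done)
    still_running = len(pending)
    # Wait for the rest so no task is left pending at exit.
    if pending:
        await asyncio.wait(pending)
    return first, still_running


first, still_running = asyncio.run(main())
print("Finished first:", first, "still running:", still_running)
```

Here asyncio.wait returns as soon as the quickest job is done, handing back that task in the done set and the others in the pending set.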

I am interested in running very large numbers of tasks with limited concurrency – see the next article for how I managed it.

Event-Sourced Domain Models in Python at PyCon UK

Rob Smallshire from Good With Computers

At PyCon UK 2015 I led a very well attended workshop with the goal of introducing Python developers to the tried-and-tested techniques and patterns of Domain Driven Design (DDD), in particular when used as part of an event-sourced architecture.

The two-and-a-half hour workshop consisted of excerpts from our training course DDD Patterns in Python. Although the workshop material was heavily edited and compressed from the course, I'm confident that the majority of attendees grasped the main principles.

Several attendees have since asked for the introductory slides, which preceded the exercises. Here they are:

Sixty North training materials are for individual use. For training in a commercial setting please contact us to book a training course or obtain a license for the materials.

Event Processing with Transducers

Rob Smallshire from Good With Computers

In the previous article in this series on transducers we looked at lazily evaluating transducers. This time we'll look not at pulling output through a transducer chain from downstream, but at pushing input items into the chain from upstream.

All of the uses of transducers we've demonstrated in Python so far are probably better handled by existing and well established Python programming techniques, such as generator expressions and generator functions. At this point in the series, we move definitely beyond that into new territory where transducers bring completely new capabilities to Python.

One of the key selling points of transducers is that they abstract the essence of a transformation away from the details of the data series that is being transformed. We'll show this in Python by using transducers to transform a series of events modelled using Python coroutines.

Coroutines in Python

Coroutines in Python are little-used, and their workings are not widely known, so their implementation bears repeating here. If you're familiar with the notion of coroutines in general, and the specifics of how they're implemented in Python, you can skim over this section.

Coroutines are like generator functions insofar as they are resumable functions. In fact, coroutines in Python are generator functions which use yield as an expression rather than a statement. What this means in practice is that generator function objects sport a send() method which allows the client of the generator function to transmit information to a running generator and for the generator to receive this data as the value of the yield expression. As usual, an example will serve to make things clearer.

We'll start by defining a generator function which enters an infinite loop, waits at the yield expression for a value to be received, and then prints this value to the console.

>>> def event_receiver():
...     while True:
...         message = (yield)
...         print("Message:", message)
...
>>>

We create a generator object just the same as we would with any other generator:

>>> r = event_receiver()
>>> r

Now we'll try to send it a message, using the send() method of the generator object:

>>> r.send("message")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator
>>>

This actually fails, because the generator code has not yet been executed at all. We need to prime the pump, so to speak, by advancing execution to the first occurrence of yield. We can do this by passing the generator to the next() built-in:

>>> next(r)

We'll fix this pump-priming annoyance of generator based coroutines shortly.

Now we can send messages:

>>> r.send("message")
Message: message
>>> r.send("another message")
Message: another message

When we're done, we terminate the coroutine by calling the close() method. (This actually raises a GeneratorExit exception at the site of the yield expression, which allows control flow to exit the otherwise infinite loop; this special exception is intercepted by the Python runtime system, so it isn't seen by us at the console).

>>> r.close()
>>>

Any further attempts to send() messages into the generator function cause StopIteration to be raised. This, of course, is the normal means of indicating that a generator is exhausted:

>>> r.send("message")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>>
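
The whole lifecycle described above (prime, send, close, then exhaustion) can also be exercised in a script. This variant of event_receiver is a hypothetical one that records messages in a list instead of printing them, so the effect of close() is visible:

```python
def event_receiver(log):
    """Hypothetical variant which records messages instead of printing."""
    try:
        while True:
            message = (yield)
            log.append(message)
    except GeneratorExit:
        # close() raises GeneratorExit at the paused yield expression.
        log.append("closed")


log = []
r = event_receiver(log)
next(r)            # prime: advance to the first yield
r.send("message")
r.close()          # triggers the GeneratorExit handler

try:
    r.send("too late")
    exhausted = False
except StopIteration:
    exhausted = True

print(log, exhausted)
```

This prints ['message', 'closed'] True, showing both the GeneratorExit raised by close() and the StopIteration raised by any later send().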

Priming generator-based coroutines

Now to address the awkwardness of having to prime coroutine generator functions by initially passing them to next(). We can improve this with a function decorator which creates the generator object and calls next on our behalf. We'll call the decorator @coroutine:

def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start
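
As an optional refinement, and not something the rest of this article depends on, the decorator could also preserve the name and docstring of the decorated function by using functools.wraps. The following sketch shows that variant together with a small illustrative collector() coroutine, which is a hypothetical helper used only to show that no explicit priming is needed:

```python
import functools


def coroutine(func):
    """Variant of the priming decorator which also preserves metadata."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)  # advance to the first yield so send() works immediately
        return g
    return start


@coroutine
def collector(items):
    """Append every received value to the given list."""
    while True:
        items.append((yield))


received = []
c = collector(received)    # already primed, no next() call needed
c.send("message")
print(received)            # ['message']
print(collector.__name__)  # collector
```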

We'll use our new decorator to assist in defining a slightly more sophisticated coroutine for printing, called rprint():

import sys

@coroutine
def rprint(sep='\n', end=''):
    """A coroutine sink which prints received items to stdout

    Args:
      sep: Optional separator to be printed between received items.
      end: Optional terminator to be printed after the last item.
    """
    try:
        first_item = (yield)
        sys.stdout.write(str(first_item))
        sys.stdout.flush()
        while True:
            item = (yield)
            sys.stdout.write(sep)
            sys.stdout.write(str(item))
            sys.stdout.flush()
    except GeneratorExit:
        sys.stdout.write(end)
        sys.stdout.flush()

In this implementation, we intercept GeneratorExit explicitly to give us the opportunity to print a terminator. We also regularly flush the stream so we get immediate feedback for our following experiments.

Event sources

The opposite of a sink is a source. Until now, we've been sourcing 'events' ourselves by sending them from the REPL, but to make this a little more interesting, we'll cook up a function – just a plain old function, not a generator – which takes values from an iterable series and intermittently sends them, after a delay, to anything with a send() method, such as our coroutine generators. For fun, the random delays are exponentially distributed, so the events form a so-called Poisson process which mimics a radioactive source; imagine a device with a Geiger counter which sends the next item from an iterable series each time an atom decays:

import random
from time import sleep

def poisson_source(rate, iterable, target):
    """Send events at random times with uniform probability.

    Args:
      rate: The average number of events to send per second.

      iterable: A series of items which will be sent to the target
          one by one.

      target: The target coroutine or sink.

    Returns:
        The completed value, or None if iterable was exhausted
        and the target was closed.
    """
    for item in iterable:
        duration = random.expovariate(rate)
        sleep(duration)
        try:
            target.send(item)
        except StopIteration as e:
            return e.value
    target.close()
    return None

When the iterable series is exhausted we call close() on the target and return None. If instead the target signals that it has terminated (by raising StopIteration), we stop sending immediately and return the value carried by the exception. Note that by supplying an infinite iterable series we could make the source send events forever.
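
The early-termination path is easy to overlook, so here is a sketch which exercises it. Both take() and instant_source() are hypothetical helpers introduced only for this illustration: take(n) is a sink which finishes after receiving n items, and instant_source() has the same control flow as poisson_source() but without the random delay:

```python
def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start


@coroutine
def take(n):
    """Hypothetical sink: accept n items (n >= 1), then return them."""
    items = []
    while len(items) < n:
        items.append((yield))
    return items  # surfaces as StopIteration.value in the sender


def instant_source(iterable, target):
    """Same control flow as poisson_source(), minus the random delay."""
    for item in iterable:
        try:
            target.send(item)
        except StopIteration as e:
            return e.value  # the target finished first
    target.close()          # the iterable finished first
    return None


finished_early = instant_source(range(10), take(3))
ran_dry = instant_source(range(2), take(3))
print(finished_early)  # [0, 1, 2]
print(ran_dry)         # None
```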

Let's hook our source and sink together at the REPL:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=printer)
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
DONE!
>>>

Combined event sources and event sinks

Of course, we can build functions which act as both sinks and sources, transforming the messages they receive in some way and forwarding the processed results onwards to another sink. Here's a combined source and sink function, which simply doubles the values it receives:

@coroutine
def doubler(target):
    while True:
        item = (yield)
        doubled_item = item * 2
        try:
            target.send(doubled_item)
        except StopIteration as e:
            return e.value

To use doubler(), we chain the components of the pipeline together:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5,
...                iterable=count_to_nine,
...                target=doubler(target=printer))
0, 2, 4, 6, 8, 10, 12, 14, 16, 18
DONE!

From doubler() it's but a short hop to a more general mapper() which accepts an arbitrary transforming function:

@coroutine
def mapper(transform, target):
    while True:
        item = (yield)
        transformed_item = transform(item)
        try:
            target.send(transformed_item)
        except StopIteration as e:
            return e.value

It is used like so, where square() is a function which returns the square of its argument:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=mapper(transform=square, target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!

From here, you can see how we could also implement equivalents of filter(), reduce() and so on to operate on the 'push' event stream modelled by Python coroutines.
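
As an illustration, a push-style equivalent of filter() might look like the following sketch. The names filterer() and appender() are assumptions made for this example; appender() is just a list-collecting sink which makes the result easy to inspect:

```python
def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start


@coroutine
def filterer(predicate, target):
    """Forward only those items for which predicate(item) is true."""
    while True:
        item = (yield)
        if predicate(item):
            try:
                target.send(item)
            except StopIteration as e:
                return e.value


@coroutine
def appender(items):
    """Sink which appends each received item to a list."""
    while True:
        items.append((yield))


evens = []
f = filterer(lambda x: x % 2 == 0, appender(evens))
for i in range(10):
    f.send(i)
f.close()
print(evens)  # [0, 2, 4, 6, 8]
```

Notice how much of filterer() is identical to mapper(); only the one line deciding what to forward differs, and the rest is push-protocol plumbing.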

The point here is that we can't just re-use any existing functions which process 'pull' data series – such as the functions in itertools – with 'push' data series. Each and every function needs to be reimplemented to accept values pushed from upstream, and send processed results downstream.

Transducing events

Transducers provide a way out of this quandary. We've demonstrated earlier in this series that 'reduce' is a fundamental operation, and by reimagining reduce() into a more general transduce() we were able to use the same transducers to operate on both eager and lazy data series. We can do the same with coroutine-based push events, by implementing a version of transduce() which allows us to use any transducer to process a stream of such events.

Our reactive_transduce() is a coroutine which accepts two arguments: a transducer and a target sink to which the transduced results will be sent:

@coroutine
def reactive_transduce(transducer, target=None):
    reducer = transducer(Sending())
    accumulator = target if (target is not None) else reducer.initial()
    try:
        while True:
            item = (yield)
            accumulator = reducer.step(accumulator, item)
            if isinstance(accumulator, Reduced):
                accumulator = accumulator.value
                break
    except GeneratorExit:
        pass
    return reducer.complete(accumulator)

The reactive_transduce() function connects to the upstream end of a transducer chain, adapting from the coroutine protocol to the reducer interface. At the downstream end of the transducer chain, we need to adapt the other way, from the reducer interface to the coroutine protocol. To do this we use a reducer called Sending, which we hard-wire as the 'bottom' reducer on the first line of reactive_transduce(). The Sending reducer looks like this:

class Sending:
    def initial(self):
        return null_sink()

    def step(self, result, item):
        try:
            result.send(item)
        except StopIteration:
            return Reduced(result)
        else:
            return result

    def complete(self, result):
        result.close()
        return result

The step() method literally sends the next item to the result – which must therefore be a legitimate event sink. Should the sink indicate that it can't accept a further item, by raising StopIteration, we return the result wrapped in the Reduced sentinel. The initial() method provides a legitimate sink – just a simple do-nothing sink defined as:

@coroutine
def null_sink():
    while True:
        _ = (yield)

Going back to reactive_transduce(), the main loop continues to iterate, receiving new values via a yield expression, until such time as GeneratorExit is signalled by the client or the reducer signals termination by returning Reduced.

When the main loop is exited by whatever means, we give the reducer the opportunity to complete(), and the Sending.complete() method ensures that close() is called on the target.

With these pieces in place, let's look at how to use reactive_transduce(). We'll reproduce our previous example where we squared the output from poisson_source(), but this time using the mapping() transducer to do the work:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> poisson_source(rate=0.5,
...                iterable=range(10),
...                target=reactive_transduce(transducer=mapping(square),
...                                          target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!
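
For readers who want to try this without the random delays, here is a self-contained sketch of the same pipeline. The Reduced, Mapping and mapping() definitions below are minimal stand-ins, written on the assumption that they resemble those from earlier in this series, and appender() is a hypothetical list-collecting sink used in place of rprint():

```python
def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start


class Reduced:
    """Stand-in sentinel wrapping a result to signal early termination."""
    def __init__(self, value):
        self.value = value


@coroutine
def null_sink():
    while True:
        _ = (yield)


class Sending:
    def initial(self):
        return null_sink()

    def step(self, result, item):
        try:
            result.send(item)
        except StopIteration:
            return Reduced(result)
        return result

    def complete(self, result):
        result.close()
        return result


class Mapping:
    """Stand-in mapping reducer: transform each item, then delegate."""
    def __init__(self, reducer, transform):
        self._reducer = reducer
        self._transform = transform

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))

    def complete(self, result):
        return self._reducer.complete(result)


def mapping(transform):
    return lambda reducer: Mapping(reducer, transform)


@coroutine
def reactive_transduce(transducer, target=None):
    reducer = transducer(Sending())
    accumulator = target if (target is not None) else reducer.initial()
    try:
        while True:
            item = (yield)
            accumulator = reducer.step(accumulator, item)
            if isinstance(accumulator, Reduced):
                accumulator = accumulator.value
                break
    except GeneratorExit:
        pass
    return reducer.complete(accumulator)


@coroutine
def appender(items):
    while True:
        items.append((yield))


squares = []
sink = appender(squares)
pipeline = reactive_transduce(mapping(lambda x: x * x), target=sink)
for i in range(5):
    pipeline.send(i)
pipeline.close()   # triggers complete(), which closes the sink too
print(squares)     # [0, 1, 4, 9, 16]
```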

The key point here is that we can now take an arbitrary transducer and reuse it with eager collections, lazy iterables, and push-events! In fact, simply by devising an appropriate transduce function we can re-use our transducers in an arbitrary data-series processing context.

This is the true power of transducers: data-processing components completely abstracted away from how the input data arrives and from where the output results are sent.