All of the uses of transducers we've demonstrated in Python so far are
probably better handled by existing and well-established Python
programming techniques, such as generator expressions and generator
functions. At this point in the series, we move definitively beyond
that, into new territory where transducers bring completely new
capabilities to Python.
One of the key selling points of transducers is that they abstract the
essence of a transformation away from the details of the data series
that is being transformed. We'll show this in Python by using
transducers to transform a series of events modelled using Python
coroutines.
Coroutines in Python
Coroutines in Python are little used, and their workings are not widely
known, so they bear a brief explanation here. If you're familiar
with the notion of coroutines in general, and the specifics of how
they're implemented in Python, you can skim over this section.
Coroutines are like generator functions insofar as they are resumable
functions. In fact, coroutines in Python are generator functions which
use yield as an expression rather than a statement. What this means
in practice is that generator function objects sport a send() method
which allows the client of the generator function to transmit
information to a running generator and for the generator to receive this
data as the value of the yield expression. As usual, an example will
serve to make things clearer.
We'll start by defining a generator function which enters an infinite
loop, waits at the yield expression for a value to be received, and
then prints this value to the console.
>>> def event_receiver():
...     while True:
...         message = (yield)
...         print("Message:", message)
...
>>>
We create a generator object just the same as we would with any other
generator:
>>> r = event_receiver()
>>> r
<generator object event_receiver at 0x...>
Now we'll try to send it a message, using the send() method of the
generator object:
>>> r.send("message")
Traceback (most recent call last):
File "", line 1, in
TypeError: can't send non-None value to a just-started generator
>>>
This actually fails, because the generator code has not yet been
executed at all. We need to prime the pump, so to speak, by advancing
execution to the first occurrence of yield. We can do this by
passing the generator to the next() built-in:
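>>> next(r)
>>>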
We'll fix this pump-priming annoyance of generator-based coroutines
shortly.
Now we can send messages:
>>> r.send("message")
Message: message
>>> r.send("another message")
Message: another message
When we're done, we terminate the coroutine by calling the close()
method. (This actually raises a GeneratorExit exception at the site
of the yield expression, which allows control flow to exit the
otherwise infinite loop; this special exception is intercepted by the
Python runtime system, so it isn't seen by us at the console).
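>>> r.close()
>>>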
Any further attempts to send() messages into the generator function
cause StopIteration to be raised. This, of course, is the normal
means of indicating that a generator is exhausted:
>>> r.send("message")
Traceback (most recent call last):
File "", line 1, in
StopIteration
>>>
Priming generator-based coroutines
Now to address the awkwardness of having to prime coroutine generator
functions by initially passing them to next(). We can improve this
with a function decorator which creates the generator object and calls
next on our behalf. We'll call the decorator @coroutine:
def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start
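To check that the decorator behaves as intended, we can redefine our
event_receiver() from earlier with @coroutine applied, and send to it
immediately, without any explicit priming (the message text here is
just for illustration):

>>> @coroutine
... def event_receiver():
...     while True:
...         message = (yield)
...         print("Message:", message)
...
>>> s = event_receiver()
>>> s.send("primed and ready")
Message: primed and ready
>>> s.close()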
We'll use our new decorator to assist in defining a slightly more
sophisticated coroutine for printing, called rprint():
import sys

@coroutine
def rprint(sep='\n', end=''):
    """A coroutine sink which prints received items to stdout.

    Args:
        sep: Optional separator to be printed between received items.
        end: Optional terminator to be printed after the last item.
    """
    try:
        first_item = (yield)
        sys.stdout.write(str(first_item))
        sys.stdout.flush()
        while True:
            item = (yield)
            sys.stdout.write(sep)
            sys.stdout.write(str(item))
            sys.stdout.flush()
    except GeneratorExit:
        sys.stdout.write(end)
        sys.stdout.flush()
In this implementation, we intercept GeneratorExit explicitly to
give us the opportunity to print a terminator. We also regularly flush
the stream so we get immediate feedback for our following experiments.
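Here's a quick interactive check of rprint() on its own; because no
newline is written between items, the REPL prompt gets interleaved with
the output (the exact interleaving may vary at your console):

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> printer.send(42)
42>>> printer.send(43)
, 43>>> printer.close()
DONE!
>>>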
Event sources
The opposite of a sink is a source. Until now, we've been sourcing
'events' ourselves by sending them from the REPL, but to make this a
little more interesting, we'll cook up a function – just a plain old
function, not a generator – which takes values from an iterable series
and intermittently sends them, after a delay, to anything with a
send() method, such as our coroutine generators. For fun, the delays
between events are exponentially distributed, so the events form a
so-called Poisson process, which mimics a radioactive source; imagine a
device with a Geiger counter which sends the next item from an iterable
series each time an atom decays:
import random
from time import sleep

def poisson_source(rate, iterable, target):
    """Send events at random times from an iterable series.

    The delays between events are exponentially distributed, so the
    events form a Poisson process.

    Args:
        rate: The average number of events to send per second.
        iterable: A series of items which will be sent to the target
            one by one.
        target: The target coroutine or sink.

    Returns:
        The value returned by the target when it terminated early, or
        None if the iterable was exhausted and the target was closed.
    """
    for item in iterable:
        duration = random.expovariate(rate)
        sleep(duration)
        try:
            target.send(item)
        except StopIteration as e:
            return e.value
    target.close()
    return None
When the iterable series is exhausted, we call close() on the target.
If instead the target signals that it has terminated, by raising
StopIteration, we return early with whatever value accompanied the
exception. Note that by supplying an infinite iterable series we could
make the source send events forever.
Let's hook our source and sink together at the REPL:
>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=printer)
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
DONE!
>>>
Combined event sources and event sinks
Of course, we can build functions which act as both sinks and sources,
transforming the messages they receive in some way and forwarding the
processed results onwards to another sink. Here's a combined source and
sink function, which simply doubles the values it receives:
@coroutine
def doubler(target):
    while True:
        item = (yield)
        doubled_item = item * 2
        try:
            target.send(doubled_item)
        except StopIteration as e:
            return e.value
To use doubler(), we chain the components of the pipeline together:
>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5,
...                iterable=count_to_nine,
...                target=doubler(target=printer))
0, 2, 4, 6, 8, 10, 12, 14, 16, 18
DONE!
From doubler() it's but a short hop to a more general mapper()
which accepts an arbitrary transforming function:
@coroutine
def mapper(transform, target):
    while True:
        item = (yield)
        transformed_item = transform(item)
        try:
            target.send(transformed_item)
        except StopIteration as e:
            return e.value
Used like so, with a simple square() function which we define first:
>>> def square(x):
...     return x * x
...
>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5,
...                iterable=count_to_nine,
...                target=mapper(transform=square, target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!
From here, you can see how we could also implement equivalents of
filter(), reduce() and so on to operate on the 'push' event
stream modelled by Python coroutines.
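For instance, a push-based equivalent of filter() might look like this
– a sketch following exactly the same pattern as mapper(), with
filterer and predicate being names of our own choosing:

@coroutine
def filterer(predicate, target):
    while True:
        item = (yield)
        if predicate(item):
            try:
                target.send(item)
            except StopIteration as e:
                return e.value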
The point here is that we can't just re-use any existing functions which
process 'pull' data series – such as the functions in itertools –
with 'push' data series. Each and every function needs to be
reimplemented to accept values pushed from upstream, and send processed
results downstream.
Transducing events
Transducers provide a way out of this quandary. We've demonstrated
earlier in this series that 'reduce' is a fundamental operation, and by
reimagining reduce() into a more general transduce() we were
able to use the same transducers to operate on both eager and lazy data
series. We can do the same with coroutine-based push events, by
implementing a version of transduce() which allows us to use any
transducer to process a stream of such events.
Our reactive_transduce() is a coroutine which accepts two arguments:
a transducer and a target sink to which the transduced results will be
sent:
@coroutine
def reactive_transduce(transducer, target=None):
    reducer = transducer(Sending())
    accumulator = target if (target is not None) else reducer.initial()
    try:
        while True:
            item = (yield)
            accumulator = reducer.step(accumulator, item)
            if isinstance(accumulator, Reduced):
                accumulator = accumulator.value
                break
    except GeneratorExit:
        pass
    return reducer.complete(accumulator)
The reactive_transduce() function connects to the upstream end of a
transducer chain, adapting from the coroutine protocol to the reducer
interface. At the downstream end of the transducer chain, we need to
adapt the other way, from the reducer interface to the coroutine
protocol. To do this we use a reducer called Sending, which we
hard-wire as the 'bottom' reducer on the first line of
reactive_transduce(). The Sending reducer looks like this:
class Sending:

    def initial(self):
        return null_sink()

    def step(self, result, item):
        try:
            result.send(item)
        except StopIteration:
            return Reduced(result)
        else:
            return result

    def complete(self, result):
        result.close()
        return result
The step() method literally sends the next item to the result – which
must therefore be a legitimate event sink. Should the sink indicate
that it can't accept a further item, by raising StopIteration, we
return the result wrapped in the Reduced sentinel. The initial()
method provides a legitimate sink – just a simple do-nothing sink
defined as:
@coroutine
def null_sink():
    while True:
        _ = (yield)
Going back to reactive_transduce(), the main loop continues to
iterate, receiving new values via a yield expression, until such time
as GeneratorExit is signalled by the client or the reducer signals
termination by returning Reduced.
When the main loop is exited by whatever means, we give the reducer the
opportunity to complete(), and the Sending.complete() method
ensures that close() is called on the target. Note that when the
reducer terminates the reduction early, the value returned from
reactive_transduce() is delivered as the value of the StopIteration
raised at the upstream send() call – exactly the value which
poisson_source() retrieves as e.value.
With these pieces in place, let's look at how to use
reactive_transduce(). We'll reproduce our previous example where we
squared the output from poisson_source(), but this time using the
mapping() transducer to do the work:
>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> poisson_source(rate=0.5,
...                iterable=range(10),
...                target=reactive_transduce(transducer=mapping(square),
...                                          target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!
The key point here is that we can now take an arbitrary transducer and
re-use it with eager collections, lazy iterables, and push-based event
streams! In fact, simply by devising an appropriate transduce function,
we can re-use our transducers in an arbitrary data-series processing
context. This is the true power of transducers: data-processing
components completely abstracted away from how the input data arrives
and where the output results are sent.