Event Processing with Transducers

Rob Smallshire from Good With Computers

In the previous article in this series on transducers we looked at lazily evaluating transducers. This time we'll look not at pulling output through a transducer chain from downstream, but at pushing input items into the chain from upstream.

All of the uses of transducers we've demonstrated in Python so far are probably better handled by existing and well-established Python programming techniques, such as generator expressions and generator functions. At this point in the series, we move decisively beyond that, into new territory where transducers bring completely new capabilities to Python.

One of the key selling points of transducers is that they abstract the essence of a transformation away from the details of the data series being transformed. We'll show this in Python by using transducers to transform a series of events modelled using Python coroutines.

Coroutines in Python

Coroutines in Python are little-used, and their workings are not widely known, so their implementation bears repeating here. If you're familiar with the notion of coroutines in general, and the specifics of how they're implemented in Python, you can skim over this section.

Coroutines are like generator functions insofar as they are resumable functions. In fact, coroutines in Python are generator functions which use yield as an expression rather than a statement. What this means in practice is that the generator objects produced by such functions sport a send() method which allows the client of the generator to transmit information to a running generator, and for the generator to receive this data as the value of the yield expression. As usual, an example will serve to make things clearer.

We'll start by defining a generator function which enters an infinite loop, waits at the yield expression for a value to be received, and then prints this value to the console.

>>> def event_receiver():
...     while True:
...         message = (yield)
...         print("Message:", message)
...
>>>

We create a generator object just the same as we would with any other generator:

>>> r = event_receiver()
>>> r
<generator object event_receiver at 0x...>

Now we'll try to send it a message, using the send() method of the generator object:

>>> r.send("message")
Traceback (most recent call last):
  File "", line 1, in
TypeError: can't send non-None value to a just-started generator
>>>

This actually fails, because the generator code has not yet been executed at all. We need to prime the pump, so to speak, by advancing execution to the first occurrence of yield. We can do this by passing the generator to the next() built-in:

>>> next(r)

We'll fix this pump-priming annoyance of generator based coroutines shortly.

Now we can send messages:

>>> r.send("message")
Message: message
>>> r.send("another message")
Message: another message

When we're done, we terminate the coroutine by calling the close() method. (This actually raises a GeneratorExit exception at the site of the yield expression, which allows control flow to exit the otherwise infinite loop; this special exception is intercepted by the Python runtime system, so it isn't seen by us at the console).

>>> r.close()
>>>

Any further attempts to send() messages into the generator cause StopIteration to be raised. This, of course, is the normal means of indicating that a generator is exhausted:

>>> r.send("message")
Traceback (most recent call last):
  File "", line 1, in
StopIteration
>>>

Priming generator-based coroutines

Now to address the awkwardness of having to prime coroutine generator functions by initially passing them to next(). We can improve this with a function decorator which creates the generator object and calls next on our behalf. We'll call the decorator @coroutine:

def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start

We'll use our new decorator to assist in defining a slightly more sophisticated coroutine for printing, called rprint():

import sys

@coroutine
def rprint(sep='\n', end=''):
    """A coroutine sink which prints received items to stdout

    Args:
      sep: Optional separator to be printed between received items.
      end: Optional terminator to be printed after the last item.
    """
    try:
        first_item = (yield)
        sys.stdout.write(str(first_item))
        sys.stdout.flush()
        while True:
            item = (yield)
            sys.stdout.write(sep)
            sys.stdout.write(str(item))
            sys.stdout.flush()
    except GeneratorExit:
        sys.stdout.write(end)
        sys.stdout.flush()

In this implementation, we intercept GeneratorExit explicitly to give us the opportunity to print a terminator. We also regularly flush the stream so we get immediate feedback for our following experiments.
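
Before wiring rprint() into a pipeline, a quick interactive check (a sketch of what a session might look like, given the definitions above) shows the separator and terminator in action; note that because nothing ends with a newline until close() is called, the output runs into the next prompt:

>>> p = rprint(sep=', ', end='\nDONE!\n')
>>> for i in range(3):
...     p.send(i)
...
0, 1, 2>>> p.close()

DONE!
>>>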

Event sources

The opposite of a sink is a source. Until now, we've been sourcing 'events' ourselves by sending them from the REPL, but to make this a little more interesting, we'll cook up a function – just a plain old function, not a generator – which takes values from an iterable series and intermittently sends them, after a delay, to anything with a send() method, such as our coroutine generators. For fun, the random delays are drawn from an exponential distribution, so the events form a Poisson process which mimics a radioactive source; imagine a device with a Geiger counter which sends the next item from an iterable series each time an atom decays:

import random
from time import sleep

def poisson_source(rate, iterable, target):
    """Send events from an iterable at random intervals.

    The delay before each event is drawn from an exponential
    distribution, so the events form a Poisson process with the
    given average rate.

    Args:
      rate: The average number of events to send per second.

      iterable: A series of items which will be sent to the target
          one by one.

      target: The target coroutine or sink.

    Returns:
        The value returned by the target coroutine if it
        terminates early, or None if the iterable was exhausted
        and the target was closed.
    """
    for item in iterable:
        duration = random.expovariate(rate)
        sleep(duration)
        try:
            target.send(item)
        except StopIteration as e:
            return e.value
    target.close()
    return None

When the iterable series is exhausted we call close() on the target; if instead the target signals that it has terminated (by raising StopIteration in response to send()) we return whatever value accompanied that exception. Note that by supplying an infinite iterable series we could make the source send events forever.
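
For instance, pairing the source with itertools.count() would give a stream that never terminates of its own accord (a sketch only; you would interrupt it with Ctrl-C):

>>> from itertools import count
>>> poisson_source(rate=1.0,
...                iterable=count(),
...                target=rprint(sep=', '))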

Let's hook our source and sink together at the REPL:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=printer)
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
DONE!
>>>

Combined event sources and event sinks

Of course, we can build functions which act as both sinks and sources, transforming the messages they receive in some way and forwarding the processed results onwards to another sink. Here's a combined source and sink function, which simply doubles the values it receives:

@coroutine
def doubler(target):
    while True:
        item = (yield)
        doubled_item = item * 2
        try:
            target.send(doubled_item)
        except StopIteration as e:
            return e.value

To use doubler(), we chain the components of the pipeline together:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5,
...                iterable=count_to_nine,
...                target=doubler(target=printer))
0, 2, 4, 6, 8, 10, 12, 14, 16, 18
DONE!

From doubler() it's but a short hop to a more general mapper() which accepts an arbitrary transforming function:

@coroutine
def mapper(transform, target):
    while True:
        item = (yield)
        transformed_item = transform(item)
        try:
            target.send(transformed_item)
        except StopIteration as e:
            return e.value

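The examples below pass a square function to mapper(); it isn't defined in this excerpt, so assume a trivial definition along these lines:

>>> def square(x):
...     return x * x
...
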
Used like so,

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=mapper(transform=square, target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!

From here, you can see how we could also implement equivalents of filter(), reduce() and so on to operate on the 'push' event stream modelled by Python coroutines.
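
To give a flavour of what that involves, here is a sketch (an illustration in the same style as mapper(), not code from the original article) of a push-based equivalent of filter(), which forwards only those items satisfying a predicate:

@coroutine
def filterer(predicate, target):
    while True:
        item = (yield)
        if predicate(item):
            try:
                target.send(item)
            except StopIteration as e:
                return e.value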

The point here is that we can't just re-use any existing functions which process 'pull' data series – such as the functions in itertools – with 'push' data series. Each and every function needs to be reimplemented to accept values pushed from upstream, and send processed results downstream.

Transducing events

Transducers provide a way out of this quandary. We've demonstrated earlier in this series that 'reduce' is a fundamental operation, and by reimagining reduce() into a more general transduce() we were able to use the same transducers to operate on both eager and lazy data series. We can do the same with coroutine-based push events, by implementing a version of transduce() which allows us to use any transducer to process a stream of such events.

Our reactive_transduce() is a coroutine which accepts two arguments: a transducer and a target sink to which the transduced results will be sent:

@coroutine
def reactive_transduce(transducer, target=None):
    reducer = transducer(Sending())
    accumulator = target if (target is not None) else reducer.initial()
    try:
        while True:
            item = (yield)
            accumulator = reducer.step(accumulator, item)
            if isinstance(accumulator, Reduced):
                accumulator = accumulator.value
                break
    except GeneratorExit:
        pass
    return reducer.complete(accumulator)

The reactive_transduce() function connects to the upstream end of a transducer chain, adapting from the coroutine protocol to the reducer interface. At the downstream end of the transducer chain, we need to adapt the other way, from the reducer interface to the coroutine protocol. To do this we use a reducer called Sending, which we hard-wire as the 'bottom' reducer on the first line of reactive_transduce(). The Sending reducer looks like this:

class Sending:
    def initial(self):
        return null_sink()

    def step(self, result, item):
        try:
            result.send(item)
        except StopIteration:
            return Reduced(result)
        else:
            return result

    def complete(self, result):
        result.close()
        return result

The step() method literally sends the next item to the result – which must therefore be a legitimate event sink. Should the sink indicate that it can't accept a further item, by raising StopIteration, we return the result wrapped in the Reduced sentinel. The initial() method provides a legitimate sink – just a simple do-nothing sink defined as:

@coroutine
def null_sink():
    while True:
        _ = (yield)

Going back to reactive_transduce(), the main loop continues to iterate, receiving new values via a yield expression, until either GeneratorExit is signalled by the client or the reducer signals termination by returning a Reduced result.

When the main loop is exited by whatever means, we give the reducer the opportunity to complete(), and the Sending.complete() method ensures that close() is called on the target.

With these pieces in place, let's look at how to use reactive_transduce(). We'll reproduce our previous example where we squared the output from poisson_source(), but this time using the mapping() transducer to do the work:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> poisson_source(rate=0.5,
...                iterable=range(10),
...                target=reactive_transduce(transducer=mapping(square),
...                                          target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!

The key point here is that we can now take an arbitrary transducer and reuse it with eager collections, lazy iterables, and push events! In fact, simply by devising an appropriate transduce function we can re-use our transducers in an arbitrary data-series processing context.
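
To underline that reuse, the same machinery should work just as well with a composed transducer. The sketch below assumes the compose() and filtering() helpers from earlier articles in this series are available, and that composition applies the leftmost transducer to each item first; under those assumptions the even values are passed through and then squared:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> evens_squared = compose(filtering(lambda x: x % 2 == 0),
...                         mapping(square))
>>> poisson_source(rate=0.5,
...                iterable=range(10),
...                target=reactive_transduce(transducer=evens_squared,
...                                          target=printer))
0, 4, 16, 36, 64
DONE!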

This is the true power of transducers: data-processing components completely abstracted away from how the input data arrives and where the output results are sent.

Useful regular expressions for searching C++ code with Visual Studio

The Lone C++ Coder's Blog from The Lone C++ Coder's Blog

I’m generally more of a grep person but sometimes it’s easier to just use the built-in search in Visual Studio, especially if you want to be able to restrict the search to parts of your Visual Studio solution. Visual Studio does have pretty powerful search built in if you do use regular expressions instead of the default text matching. Here are a couple of regexes to get you started: Find all shared_ptr calls that use “new” instead of the recommended make_shared: shared_ptr<.

Merging two sets of Jacoco Integration Test Coverage Reports for Sonar using Gradle

Tim Pizey from Tim Pizey

In a Jenkins build:


./gradlew cukeTest
./gradlew integTest
./gradlew sonarrunner

This leaves three .exec files behind. SonarQube can only use two. So we merge the two integration test results.


task integCukeMerge(type: JacocoMerge) {
    description = 'Merge test code coverage results from feign and cucumber'
    // This assumes cuke tests have already been run in a separate gradle session

    doFirst {
        delete destinationFile
        // Wait until integration tests have actually finished
        println start.process != null ? start.process.waitFor() : "In integCukeMerge tomcat is null"
    }

    executionData fileTree("${buildDir}/jacoco/it/")
}


sonarRunner {
    tasks.sonarRunner.dependsOn integCukeMerge
    sonarProperties {
        property "sonar.projectDescription", "A legacy codebase."
        property "sonar.exclusions", "**/fakes/**/*.java, **/domain/**.java, **/*Response.java"

        properties["sonar.tests"] += sourceSets.integTest.allSource.srcDirs

        property "sonar.jdbc.url", "jdbc:postgresql://localhost/sonar"
        property "sonar.jdbc.driverClassName", "org.postgresql.Driver"
        property "sonar.jdbc.username", "sonar"
        property "sonar.host.url", "http://sonar.we7.local:9000"

        def jenkinsBranchName = System.getenv("GIT_BRANCH")
        if (jenkinsBranchName != null) {
            jenkinsBranchName = jenkinsBranchName.substring(jenkinsBranchName.lastIndexOf('/') + 1)
        }
        def branch = jenkinsBranchName ?: ('git rev-parse --abbrev-ref HEAD'.execute().text ?: 'unknown').trim()

        def buildName = System.getenv("JOB_NAME")
        if (buildName == null) {
            property "sonar.projectKey", "${name}"
            def username = System.getProperty('user.name')
            property "sonar.projectName", "~${name.capitalize()} (${username})"
            property "sonar.branch", "developer"
        } else {
            property "sonar.projectKey", "$buildName"
            property "sonar.projectName", name.capitalize()
            property "sonar.branch", "${branch}"
            property "sonar.links.ci", "http://jenkins/job/${buildName}/"
        }

        property "sonar.projectVersion", "git describe --abbrev=0".execute().text.trim()

        property "sonar.java.coveragePlugin", "jacoco"
        property "sonar.jacoco.reportPath", "$project.buildDir/jacoco/test.exec"
        // feign results
        //property "sonar.jacoco.itReportPath", "$project.buildDir/jacoco/it/integTest.exec"
        // Cucumber results
        //property "sonar.jacoco.itReportPath", "$project.buildDir/jacoco/it/cukeTest.exec"
        // Merged results
        property "sonar.jacoco.itReportPath", "$project.buildDir/jacoco/integCukeMerge.exec"

        property "sonar.links.homepage", "https://github.com/${org}/${name}"
    }
}

Remember to use Overall Coverage in any Sonar Quality Gates!

Writing: Coders Causing Conflict

Pete Goodliffe from Pete Goodliffe

My latest Becoming a Better Programmer column is published in the March issue of C Vu magazine (27.1). It's called Coders Causing Conflict and investigates how "conflict" can be a driving force for good in software development. It's quite an interesting topic, and one worth thinking about.

I realise that I failed to announce my previous few C Vu columns here. So, as a recap:
  • In January (C Vu 26.6) I published Advice for the Young at Heart, considering how to give career advice to new programmers.
  • In November 2014 (C Vu 26.5) I published Playing by the Rules which looks at developing "tribal rules" for your development team to help it work most effectively.

C Vu is a magazine produced by the ACCU - an excellent organisation for programmers. It has a great community, great publications, and an awesome conference. Check it out.

Setting up Emacs spell checking on OS X

The Lone C++ Coder's Blog from The Lone C++ Coder's Blog

As I mentioned in yesterday’s post, I’m trying to improve my blogging workflow by using org2blog to draft my posts before pushing them to my WordPress blog. When I posted yesterday I had the basic workflow going, could edit posts in Emacs, save them, update drafts and push them to WordPress. The last piece that was missing was getting spell checking to work. I’ve actually never spent much time thinking about spell checkers until I discovered that OS X doesn’t come with a spell checker that ispell recognises.

Improving my blogging workflow using Emacs (of course)

The Lone C++ Coder's Blog from The Lone C++ Coder's Blog

I try not to post too many metablogging posts. Other people do it better and I’m trying to focus on journalling what I learn as a software engineer and manager, not what tools I use for blogging. However after losing another post to WordPress’s built-in editor I decided Something Must Be Done. I think this is only the second post I lost, but it’s a fairly regular occurrence for a journalist friend of mine and I really don’t have that much time to retype blog entries that ended up in Bit Nirvana.

An SSL Truster

Tim Pizey from Tim Pizey

Should you wish, when testing, say, to trust all SSL certificates:


import java.net.URL;

import javax.net.ssl.*;

import com.mediagraft.shared.utils.UtilsHandbag;

/**
 * Modify the JVM-wide SSL trusting so that locally signed https urls, and others, are
 * no longer rejected.
 *
 * Use with care as the JVM should not be used in production after activation.
 */
public class JvmSslTruster {

  private static boolean activated_;

  private static X509TrustManager allTrustingManager_ = new X509TrustManager() {

    public java.security.cert.X509Certificate[] getAcceptedIssuers() {
      return null;
    }

    public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
    }

    public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
    }
  };

  public static SSLSocketFactory trustingSSLFactory() {
    SSLContext sc = null;
    try {
      sc = SSLContext.getInstance("SSL");
      sc.init(null, new TrustManager[]{allTrustingManager_}, new java.security.SecureRandom());
      new URL(UtilsHandbag.getSecureApplicationURL()); // Force loading of installed trust manager
    }
    catch (Exception e) {
      throw new RuntimeException("Unhandled exception", e);
    }
    return sc.getSocketFactory();
  }

  public static void startTrusting() {
    if (!activated_) {
      HttpsURLConnection.setDefaultSSLSocketFactory(trustingSSLFactory());
      com.sun.net.ssl.HttpsURLConnection.setDefaultSSLSocketFactory(trustingSSLFactory());
      activated_ = true;
    }
  }

  private JvmSslTruster() {
  }
}

When a Swift immutable collection isn’t or is at least immutable-ish

Pete Barber from C#, C++, Windows & other ramblings

Take the following code:

struct Foo
{
    var value: Int
}

let foo = [Foo(value: 1), Foo(value: 2)]

By using 'let foo' an immutable array has been created. No members of this array can be replaced, nor can an element's contents be modified, so both statements below result in compilation errors.

foo[0] = Foo(value: 777) // error!
foo[0].value = 8 // error!

If foo were declared var then both of the above statements would work.

This changes slightly when using reference types:

class Bar
{
    var value: Int = 0

    init(value: Int)
    {
        self.value = value
    }
}

let bar = [Bar(value: 1), Bar(value: 2)]

bar[0] = Bar(value: 777) // error!
bar[0].value = 8 // allowed

The first case of trying to replace an element fails as before but modifying the instance referenced by that element is permitted. This is because the immutable collection holds a reference to the instance of Bar. Modifying the instance does not change the reference so the collection remains unchanged.

If you're making an effort to make your code as immutable (const) as possible (I'm from a C++ background so I endeavour to make everything as const as I can) then this is a gotcha to look out for.

The only way to make Bar properly const is to provide a private setter for the member, assuming it's defined in its own source file (so this example doesn't work in a Playground), i.e.

class Baz
{
    private(set) var value: Int = 0

    init(value: Int)
    {
        self.value = value
    }

    func f()
    {
        value = 88;
    }
}

Which now prevents value being assigned to.

let baz = [Baz(value: 1), Baz(value: 2)]
baz[0].value = 8 // error!


Even if all the properties have private setters there might be methods on the reference type that modify the instance, such as f() above. Just having an immutable collection of reference types is not sufficient to stop these mutating the instances referred to. Invoking f() on an element of the above collection will change the value to 88, i.e.

 println("\(baz[0].value)") // Gives 1  
baz[0].f()
println("\(baz[0].value)") // Gives 88


This differs from C++ where, if a collection is const or is returned as a const reference (from a method), not only is the container immutable but so are its contents. It would be good if Swift were to gain a mechanism that would mark the collection and its contents completely immutable, other than having to use a value type.

Anyway, beware of making a collection of reference types immutable and assuming that both the collection and its contents cannot be modified!

Visual Studio 2013, PC-lint and C++ 11 Variadic Templates

Products, the Universe and Everything from Products, the Universe and Everything

Although we added support for Visual Studio 2013 some time ago, PC-lint has lagged behind somewhat and even now (well over a year after it was released) has difficulty analysing any projects for it which use the Standard Template Library (STL) to any significant extent.

In large part this is due to the fact that PC-lint has to date lacked support for C++ 11 variadic templates (which are heavily used in parts of the Visual Studio 2013 system headers). With PC-lint 9.00L (the current patch level) even a simple #include <map> will trigger an internal error (9.00k was less vocal, but still raised errors you had to suppress).

Although this was not a huge problem when Visual Studio 2013 first came out (most development teams take their time moving to new versions of Visual Studio), it is now sufficiently mature that many teams are moving to it, and that's potentially a big problem if you also use PC-lint. The arrival of the free Visual Studio Community Edition has of course accelerated this trend.

Although we were expecting this to have been fixed by Gimpel around the middle of last year they apparently found that doing so proved to be far trickier than anticipated, with the end result that this limitation has become an increasingly large problem. The latest information we have is that there will be a beta with full support for variadic templates available sometime in March, so at least there is now some light at the end of this particular tunnel.

However, that does probably mean that there won't be a complete "production" fix for at least a couple of months after that. Hence we have been looking at potential workarounds (with one of our customers who is in the process of moving their codebase to Visual Studio 2013 and has run into this issue) to see what we can do in the meantime.

The most promising approach we have identified so far is actually very simple - just substitute the system headers for an earlier version of Visual Studio while analysing Visual Studio 2013 projects by modifying the -i directives for the system headers, while leaving the rest of the analysis configuration unchanged. The major caveat is of course that you need to have an earlier version of Visual Studio co-installed, but in practice that's pretty common.

The second caveat is that you may run into problems if you are using STL functionality (e.g. std::make_unique) which is implemented in the Visual Studio 2013 system headers but absent from earlier versions. Even then, there are workarounds in some cases - it really depends on what you use in your projects. It also goes without saying that the workaround can't handle any code you write in your own projects which uses variadic templates directly.

Given all that however it does seem to work rather well (it even seems to make it practical to analyse Visual Studio 2013 projects with PC-lint 8.0, which is an unexpected bonus!) and as a result we've decided to build this into Visual Lint so that it can take care of it automatically (but optionally, of course) for you when it determines that you are using PC-lint 9.00L or earlier. For now we've limited it to using the system headers from a Visual Studio 2012 or 2010 installation on the same machine, but we can extend that if needed.

This functionality should be out as part of Visual Lint 4.5.9.239 soon, but we are happy to release preliminary builds on a case by case basis if it will help other teams who are running into the same problem. Likewise if you have any questions about the specifics of this approach or are running into this issue (not necessarily just with Visual Studio) just let us know and we will be happy to help.

Update: Visual Lint 4.5.9.239 was released on 6th March 2015.
