👉 Latest post:
"Git power tools for daily use"
By Vincent Driessen
on Wednesday, March 25, 2015

In my previous post, Use More Iterators, I have outlined how to harvest some low hanging fruit when it comes to converting your functions to generators. In this series of posts I want to take it to the next level and introduce a few powerful constructs that can assist you when working with streams.

Streams

Previously, I've compared Python's generators to value factories (producing values lazily) and talked about their composability. I want to pay some more attention to these concepts in this blog post.

One particular concept that fits generators like a glove is to use them to stream data. Streams help you express solutions to many data manipulation flows and processes elegantly. Of course, this idea is not novel: the concept of streams finds its roots in the early 60's (as all good CS ideas do).

How do streams fit generators? Since a generator is a function that returns a "value factory", it's a natural component to act as a "node" in a network of such generators. Each such component takes input, does something with it, and emits output.

A Little Word Puzzle

Take a look at this example to generate a little word puzzle. It generates a list of the first 5 dictionary words of 20 characters or more that aren't names, and hides their vowels:

import re
from itertools import islice

vowels_re = re.compile('[aeiouy]')

def all_words():
    with open('/usr/share/dict/words') as f:
        for line in f:
            yield line.strip()

def keep_long_words(iterable, min_len):
    return (word for word in iterable if len(word) >= min_len)

def exclude_names(iterable):
    return (word for word in iterable if word.lower() == word)

def hide_vowels(iterable):
    for word in iterable:
        yield vowels_re.sub('.', word)

def limit(iterable, n):
    return islice(iterable, 0, n)

stream = all_words()
stream = keep_long_words(stream, 20)
stream = exclude_names(stream)
stream = hide_vowels(stream)
stream = limit(stream, 5)

for word in stream:
    print(word)

This will print the following list:

.bd.m.n.h.st.r.ct.m.
.c.t.lm.th.lc.rb.n.l
.c.t.lph.n.lh.dr.z.n.
.m.n..c.t.ph.n.t.d.n.
.n.rch..nd.v.d..l.st

Wrap(Wrap(Wrap(…)))

The variable stream is used to incrementally build up an entire stream (network of stream processors). It start with all_words(), the generator that emits all dictionary words from the dictionary file.

Then with each further step, stream is "wrapped" in another generator, which is used to chain the generators together. The emitted output of all_words() will now be consumed by the keep_long_words() generator, emitting only the words from the input stream that match the length criterium.

We keep "wrapping" these with another filter step (exclude_names()) and a manipulation step (hide_vowels()), and finally limit the list to return maximally 5 items.

As a result, the variable stream is re-assigned a few times. There is a nice advantage to this approach: it avoids using any further variables, and allows us to build up the entire stream line by line. The order in which we build it up, resembles the way the data flows.

Lastly, you can comment out a line in the middle and the code still works. If you decide you do want names in the result list, simply comment out this line:

stream = all_words()
stream = keep_long_words(stream, 20)
# stream = exclude_names(stream)  # comment this out to skip this step
stream = hide_vowels(stream)
stream = limit(stream, 5)

And the stream will still be valid. This is especially useful while trying out your streams as you're still developing them streams. Without using this intermediate variable, the equivalent would look like this:

stream = limit(hide_vowels(exclude_names(keep_long_words(all_words(), 20))), 5)
output = list(stream)

This does the same thing, but is rather messy, and gets unreadable quickly.

A DSL to Compose Streams

Putting together the pieces of the stream as we did above is relatively clunky. There is a better way. What if we could express the thing above like this?

stream = (all_words()
          >> keep_long_words(20)
          >> exclude_names()
          >> hide_vowels()
          >> limit(5))
output = list(stream)

This syntax would have the best of both worlds: it allows you to elegantly chain together generators using the >> operator without using an intermediate variable. If A and B are streams, then the result of A >> B is the composition of both streams, applying A first to its input, then applying B:

We can actually build this. Let's define a Task, a base class for each such component that's chainable using the >> operator. It looks like this:

class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))

But how do we get from the generator functions of the example above to these tasks that support the >> operator? We need to convert them to tasks by implementing the process() method. Here's the exclude_names() function converted:

class exclude_names(Task):
    def process(self, inputs):
        return (word for word in iterable if word.lower() == word)

And here is an example of converting a function with arguments. The argument moves to the constructor of the class:

class keep_long_words(Task):
    def __init__(self, min_len):
        self.min_len = min_len

    def process(self, iterable):
        return (word for word in iterable if len(word) >= self.min_len)

Another notable case is the "starting" generator: the one spitting out the dictionary words. As a generator function, this did not take any inputs, but as a Task, it still receives the inputs argument, but should ignore it. This way, we can treat it like any other task, and we'll see an example of how this is useful later on:

class all_words(Task):
    def process(self, inputs):  # inputs arg is ignored
        with open('/usr/share/dict/words') as f:
            for line in f:
                yield line.strip()

Making Compositions

Using this, we can now start to make some abstractions. We can assign a series of chained tasks to a variable and insert where we need it. This can be used to clarify what is going on in those steps, or for reusability of a component.

def puzzlify():
    return (keep_long_words(20)
            >> exclude_names()
            >> hide_vowels())

stream = all_words() >> puzzlify() >> limit(5)
output = list(stream)

Note that calling puzzlify() will return a Task instance (the one that chains together the three sub tasks). Then, this task instance is further chained into the larger example. Also note that puzzlify() itself is a perpetual processor: there's no start or end defined by it. The context it's used in defined the start and the end.

The chaining primitive allows you to craft complex data streams in an elegant fashion.

Complex Operations

The composition operation (>>) isn't the only task common with streams. Another one is the split-and-join operation. In this scenario, you may want to perform multiple operations on a single stream independent of each other. This can be achieved using the & operator:

This will split the inputs and feed copies of the inputs to both processes. After applying each task, the results get merged back, exhausting A first, then B.

Suppose we would want to filter our dictionary for words that are anagrams or contain the substring anana (or both). You could do it as follows:

stream = (all_words()
          >> (is_anagram() & contains_anana()))
for word in stream:
    print(word)

The real power of this split-and-join operator comes when you combine it to perform different actions on each "side" of the split. For example, if you want to uppercase all of the anagrams, but lowercase all of the words containing the "anana" substring:

stream = (all_words()
          >> (is_anagram() >> uppercase()
              &
              contains_anana() >> lowercase()))
for word in stream:
    print(word)

And here are the results:

...
SOOLOOS
TEBBET
TEBET
TENET
TERRET
ULULU
YARAY
anana
ananaplas
ananaples
ananas
banana
bananaland
bananalander
...

This is all done in a single stream.

Why >> and &?

You may ask why I did not follow the more familiar syntax that most shells follow, using A | B. The reason is Python's operator precedence.

Other posts on this blog

If you want to get in touch, I'm @nvie on Twitter.