In my previous post, Use More Iterators, I outlined how to harvest some low-hanging fruit when it comes to converting your functions to generators. In this series of posts, I want to take it to the next level and introduce a few powerful constructs that can assist you when working with streams.
Streams
Previously, I've compared Python's generators to value factories (producing values lazily) and talked about their composability. I want to pay some more attention to these concepts in this blog post.
One particular concept that fits generators like a glove is using them to stream data. Streams help you express solutions to many data manipulation flows and processes elegantly. Of course, this idea is not novel: the concept of streams has its roots in the early 1960s (as all good CS ideas do).
How do streams fit generators? Since a generator is a function that returns a "value factory", it's a natural component to act as a "node" in a network of such generators. Each such component takes input, does something with it, and emits output.
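As a minimal sketch of such a node network (the double() and add_one() generators below are hypothetical examples, not part of this post): each one consumes an input stream and emits an output stream, so they can be wired together. Because generators are lazy, even the infinite source itertools.count(1) is safe here; only as many values are produced as the consumer asks for.

```python
from itertools import count, islice


def double(numbers):
    # Node 1: emits each incoming number multiplied by two
    for n in numbers:
        yield n * 2


def add_one(numbers):
    # Node 2: emits each incoming number plus one
    for n in numbers:
        yield n + 1


# Wire the nodes into a tiny network fed by an infinite source
network = add_one(double(count(1)))

# Ask for just four values; nothing beyond what is requested is computed
first_four = list(islice(network, 4))
print(first_four)  # [3, 5, 7, 9]
```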
A Little Word Puzzle
Take a look at this example, which generates a little word puzzle. It produces a list of the first 5 dictionary words of 20 characters or more that aren't names, and hides their vowels:
```python
import re
from itertools import islice

vowels_re = re.compile('[aeiouy]')


def all_words():
    with open('/usr/share/dict/words') as f:
        for line in f:
            yield line.strip()


def keep_long_words(iterable, min_len):
    return (word for word in iterable if len(word) >= min_len)


def exclude_names(iterable):
    return (word for word in iterable if word.lower() == word)


def hide_vowels(iterable):
    for word in iterable:
        yield vowels_re.sub('.', word)


def limit(iterable, n):
    return islice(iterable, 0, n)


stream = all_words()
stream = keep_long_words(stream, 20)
stream = exclude_names(stream)
stream = hide_vowels(stream)
stream = limit(stream, 5)

for word in stream:
    print(word)
```
This will print the following list:
```
.bd.m.n.h.st.r.ct.m.
.c.t.lm.th.lc.rb.n.l
.c.t.lph.n.lh.dr.z.n.
.m.n..c.t.ph.n.t.d.n.
.n.rch..nd.v.d..l.st
```
Wrap(Wrap(Wrap(…)))
The variable stream is used to incrementally build up an entire stream (a network of stream processors). It starts with all_words(), the generator that emits all words from the dictionary file. Then, with each further step, stream is "wrapped" in another generator, which chains the generators together. The emitted output of all_words() is consumed by the keep_long_words() generator, which emits only the words from the input stream that match the length criterion. We keep "wrapping" these with another filter step (exclude_names()) and a manipulation step (hide_vowels()), and finally limit the output to at most 5 items.

As a result, the variable stream is reassigned a few times. There is a nice advantage to this approach: it avoids any further variables and allows us to build up the entire stream line by line. The order in which we build it up resembles the way the data flows.
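To see that wrapping alone does no work, here is a small sketch with a hypothetical traced_source() standing in for all_words() (it records each word it hands out) and a made-up word list. Building the stream reads nothing; iterating it pulls words one at a time, and limit() stops pulling from upstream once it has enough.

```python
from itertools import islice

pulled = []  # every word the source has handed out so far


def traced_source(words):
    # Hypothetical stand-in for all_words(): records each word it emits
    for word in words:
        pulled.append(word)
        yield word


def keep_long_words(iterable, min_len):
    return (word for word in iterable if len(word) >= min_len)


def limit(iterable, n):
    return islice(iterable, 0, n)


stream = traced_source(['a', 'abcd', 'abcdef', 'ab', 'abcdefg', 'abcde'])
stream = keep_long_words(stream, 4)
stream = limit(stream, 2)
print(pulled)   # [] (wrapping alone reads nothing)

result = list(stream)
print(result)   # ['abcd', 'abcdef']
print(pulled)   # ['a', 'abcd', 'abcdef'] (the source was never asked for more)
```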
Lastly, you can comment out a line in the middle and the code still works. If you decide you do want names in the result list, simply comment out this line:
```python
stream = all_words()
stream = keep_long_words(stream, 20)
# stream = exclude_names(stream)  # comment this out to skip this step
stream = hide_vowels(stream)
stream = limit(stream, 5)
```
And the stream will still be valid. This is especially useful while you're still developing your streams and trying them out. Without this intermediate variable, the equivalent would look like this:
```python
stream = limit(hide_vowels(exclude_names(keep_long_words(all_words(), 20))), 5)
output = list(stream)
```
This does the same thing, but is rather messy, and gets unreadable quickly.
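For comparison, another way to avoid both the reassignments and the nesting (not used in this post, just sketched here) is to keep the steps in a list and fold over them with functools.reduce. The run() helper and the sample words are hypothetical; commenting out an entry in steps skips that step, much like commenting out a stream = ... line.

```python
from functools import reduce
from itertools import islice


def keep_long_words(iterable, min_len):
    return (word for word in iterable if len(word) >= min_len)


def exclude_names(iterable):
    return (word for word in iterable if word.lower() == word)


def limit(iterable, n):
    return islice(iterable, 0, n)


# Each step is a one-argument function of the incoming stream
steps = [
    lambda s: keep_long_words(s, 5),
    exclude_names,  # comment this out to skip the step
    lambda s: limit(s, 2),
]


def run(source, steps):
    # Hypothetical helper: wraps the source in each step, in order
    return reduce(lambda stream, step: step(stream), steps, source)


result = list(run(['Python', 'stream', 'tiny', 'generator', 'lazy'], steps))
print(result)  # ['stream', 'generator']
```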
A DSL to Compose Streams
Putting together the pieces of the stream as we did above is relatively clunky. There is a better way. What if we could express the thing above like this?
```python
stream = (all_words()
          >> keep_long_words(20)
          >> exclude_names()
          >> hide_vowels()
          >> limit(5))
output = list(stream)
```
This syntax would have the best of both worlds: it allows you to elegantly chain together generators using the >> operator without using an intermediate variable. If A and B are streams, then A >> B is the composition of both streams: A is applied to the input first, and B is then applied to A's output.
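Written as functions from an input sequence x to an output sequence (with ≫ standing for the >> operator), composition nests the calls, and the grouping of a longer chain does not change the result. That property is what lets a pre-built chain be dropped into the middle of a larger one:

$$
(A \gg B)(x) = B\big(A(x)\big)
\qquad
\big((A \gg B) \gg C\big)(x) = C\big(B(A(x))\big) = \big(A \gg (B \gg C)\big)(x)
$$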
We can actually build this. Let's define a Task, a base class for each such component that's chainable using the >> operator. It looks like this:
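```python
class Task:
    def pipe(self, other_task):
        # Compose this task with another: our output becomes its input
        return ChainedTask(self, other_task)

    def process(self, inputs):
        # Subclasses turn an iterable of inputs into an iterable of outputs
        raise NotImplementedError

    def __iter__(self):
        # Iterating a task runs it with empty inputs (source tasks ignore them)
        return self.process([])

    def __rshift__(self, other):
        # Makes A >> B a shorthand for A.pipe(B)
        return self.pipe(other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        # Feed task1's output stream into task2
        return self.task2.process(self.task1.process(inputs))
```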
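A quick way to check that these classes behave as described is a sketch with a few toy tasks. Here numbers, double and greater_than are hypothetical tasks invented for the demonstration, and the Task and ChainedTask definitions are repeated so the snippet runs on its own.

```python
class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))


class numbers(Task):
    # Hypothetical source task: ignores its inputs and emits 1..n
    def __init__(self, n):
        self.n = n

    def process(self, inputs):
        return iter(range(1, self.n + 1))


class double(Task):
    # Hypothetical transform: emits each input multiplied by two
    def process(self, inputs):
        return (x * 2 for x in inputs)


class greater_than(Task):
    # Hypothetical filter: keeps only inputs above a threshold
    def __init__(self, threshold):
        self.threshold = threshold

    def process(self, inputs):
        return (x for x in inputs if x > self.threshold)


# A pipeline that starts with a source can be iterated directly
pipeline = numbers(5) >> double() >> greater_than(5)
print(list(pipeline))  # [6, 8, 10]

# A chain without a source can be fed explicit inputs via process()
print(list((double() >> double()).process([1, 2, 3])))  # [4, 8, 12]
```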
But how do we get from the generator functions of the example above to these tasks that support the >> operator? We need to convert them to tasks by implementing the process() method. Here's the exclude_names() function converted:
```python
class exclude_names(Task):
    def process(self, inputs):
        # Names are capitalized, so lowercasing them changes the word
        return (word for word in inputs if word.lower() == word)
```
And here is an example of converting a function with arguments. The argument moves to the constructor of the class:
```python
class keep_long_words(Task):
    def __init__(self, min_len):
        self.min_len = min_len

    def process(self, iterable):
        return (word for word in iterable if len(word) >= self.min_len)
```
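The post doesn't show hide_vowels() and limit() as tasks, but the same two patterns apply: a task without arguments only implements process(), and a task with arguments takes them in its constructor. A minimal sketch of how they might look, with the Task base class repeated so it runs standalone:

```python
import re
from itertools import islice


class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))


vowels_re = re.compile('[aeiouy]')


class hide_vowels(Task):
    # No arguments: process() just transforms each incoming word
    def process(self, inputs):
        return (vowels_re.sub('.', word) for word in inputs)


class limit(Task):
    # The argument moves to the constructor, as with keep_long_words
    def __init__(self, n):
        self.n = n

    def process(self, inputs):
        return islice(inputs, 0, self.n)


pipeline = hide_vowels() >> limit(2)
print(list(pipeline.process(['banana', 'stream', 'generator'])))  # ['b.n.n.', 'str..m']
```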
Another notable case is the "starting" generator: the one spitting out the dictionary words. As a generator function, it did not take any inputs. As a Task, it still receives the inputs argument, but should ignore it. This way, we can treat it like any other task, and we'll see an example of how this is useful later on:
```python
class all_words(Task):
    def process(self, inputs):
        # inputs arg is ignored
        with open('/usr/share/dict/words') as f:
            for line in f:
                yield line.strip()
```
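Because a source task ignores its inputs, it is easy to swap the dictionary file for something else. The from_list task below is a hypothetical in-memory stand-in for all_words(), handy for experimenting without /usr/share/dict/words; it is a sketch, not part of the original code.

```python
class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))


class from_list(Task):
    # Hypothetical source: like all_words(), it ignores its inputs,
    # but emits words from an in-memory list instead of a file
    def __init__(self, words):
        self.words = words

    def process(self, inputs):
        return iter(self.words)


class keep_long_words(Task):
    def __init__(self, min_len):
        self.min_len = min_len

    def process(self, iterable):
        return (word for word in iterable if len(word) >= self.min_len)


source = from_list(['tiny', 'generator', 'stream'])
print(list(source))                        # ['tiny', 'generator', 'stream']
print(list(source.process(['ignored'])))   # same words: inputs are ignored
print(list(source >> keep_long_words(6)))  # ['generator', 'stream']
```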
Making Compositions
Using this, we can now start to make some abstractions. We can assign a series of chained tasks to a variable and insert it wherever we need it. This can be used to clarify what is going on in those steps, or to make a component reusable.
```python
def puzzlify():
    return (keep_long_words(20)
            >> exclude_names()
            >> hide_vowels())


stream = all_words() >> puzzlify() >> limit(5)
output = list(stream)
```
Note that calling puzzlify() returns a Task instance (the one that chains together the three subtasks). This task instance is then chained into the larger example. Also note that puzzlify() itself is a perpetual processor: it defines no start or end of its own. The context it's used in defines the start and the end.
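To illustrate that puzzlify() has no start or end of its own, the sketch below reuses it between two different sources. It assumes a hypothetical in-memory from_list source instead of the dictionary file, and adds a min_len parameter to puzzlify() purely so the short sample words qualify.

```python
import re
from itertools import islice


class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))


class from_list(Task):
    # Hypothetical in-memory source standing in for all_words()
    def __init__(self, words):
        self.words = words

    def process(self, inputs):
        return iter(self.words)


class keep_long_words(Task):
    def __init__(self, min_len):
        self.min_len = min_len

    def process(self, iterable):
        return (word for word in iterable if len(word) >= self.min_len)


class exclude_names(Task):
    def process(self, inputs):
        return (word for word in inputs if word.lower() == word)


vowels_re = re.compile('[aeiouy]')


class hide_vowels(Task):
    def process(self, inputs):
        return (vowels_re.sub('.', word) for word in inputs)


class limit(Task):
    def __init__(self, n):
        self.n = n

    def process(self, inputs):
        return islice(inputs, 0, self.n)


def puzzlify(min_len=20):
    # min_len is a hypothetical parameter so the short sample words qualify
    return keep_long_words(min_len) >> exclude_names() >> hide_vowels()


# On its own, puzzlify() has no source, so nothing flows through it
print(list(puzzlify()))  # []

# The same composition reused between two different sources
first = from_list(['stream', 'Mississippi', 'generator']) >> puzzlify(9) >> limit(5)
second = from_list(['composability', 'Processing', 'iterator']) >> puzzlify(8)
print(list(first))   # ['g.n.r.t.r']
print(list(second))  # ['c.mp.s.b.l.t.', '.t.r.t.r']
```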
The chaining primitive allows you to craft complex data streams in an elegant fashion.
Complex Operations
The composition operation (>>) isn't the only common stream operation. Another one is split-and-join. In this scenario, you may want to perform multiple operations on a single stream, independently of each other. This can be achieved using the & operator, as in A & B.
This splits the inputs and feeds a copy of them to both tasks. After each task has been applied, the results are merged back together, exhausting A first, then B.
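The post doesn't show how & is implemented. One plausible sketch, assuming a hypothetical SplitJoinTask class and a __and__ method on Task, uses itertools.tee to copy the input stream and itertools.chain to merge the two outputs in order (evens and greater_than are toy tasks for the demonstration):

```python
from itertools import chain, tee


class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)

    def __and__(self, other):
        # Hypothetical: enables the A & B split-and-join syntax
        return SplitJoinTask(self, other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))


class SplitJoinTask(Task):
    # Assumed implementation; the post does not show this class
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        # Split: give each side its own independent copy of the inputs
        inputs1, inputs2 = tee(inputs)
        # Join: emit everything from task1 first, then everything from task2
        return chain(self.task1.process(inputs1), self.task2.process(inputs2))


class evens(Task):
    # Hypothetical filter: keeps even numbers
    def process(self, inputs):
        return (x for x in inputs if x % 2 == 0)


class greater_than(Task):
    # Hypothetical filter: keeps numbers above a threshold
    def __init__(self, threshold):
        self.threshold = threshold

    def process(self, inputs):
        return (x for x in inputs if x > self.threshold)


split = evens() & greater_than(3)
print(list(split.process([1, 2, 3, 4, 5])))  # [2, 4, 4, 5]
```

Because A is exhausted before B starts, tee has to buffer every input item for B in the meantime, so memory grows with the size of the input. Also, an item that passes both sides comes out twice, as 4 does here.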
Suppose we want to filter our dictionary for words that are palindromes or contain the substring anana (or both). You could do it as follows:
```python
stream = all_words() >> (is_palindrome() & contains_anana())
for word in stream:
    print(word)
```
The real power of this split-and-join operator comes when you combine it to perform different actions on each "side" of the split. For example, if you want to uppercase all of the palindromes, but lowercase all of the words containing the "anana" substring:
```python
stream = (all_words()
          >> (is_palindrome() >> uppercase()
              & contains_anana() >> lowercase()))
for word in stream:
    print(word)
```
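The post doesn't define the filter and case-changing tasks used here. A self-contained sketch of how they might be written: is_palindrome() keeps words that read the same backwards (which is what the results show), & is implemented via itertools.tee and itertools.chain (an assumption, not the post's code), and a hypothetical in-memory from_list source replaces the dictionary file.

```python
from itertools import chain, tee


class Task:
    def pipe(self, other_task):
        return ChainedTask(self, other_task)

    def process(self, inputs):
        raise NotImplementedError

    def __iter__(self):
        return self.process([])

    def __rshift__(self, other):
        return self.pipe(other)

    def __and__(self, other):
        return SplitJoinTask(self, other)


class ChainedTask(Task):
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        return self.task2.process(self.task1.process(inputs))


class SplitJoinTask(Task):
    # Assumed split-and-join: copy the inputs, then concatenate both outputs
    def __init__(self, task1, task2):
        self.task1 = task1
        self.task2 = task2

    def process(self, inputs):
        inputs1, inputs2 = tee(inputs)
        return chain(self.task1.process(inputs1), self.task2.process(inputs2))


class from_list(Task):
    # Hypothetical in-memory source standing in for all_words()
    def __init__(self, words):
        self.words = words

    def process(self, inputs):
        return iter(self.words)


class is_palindrome(Task):
    def process(self, inputs):
        return (word for word in inputs if word == word[::-1])


class contains_anana(Task):
    def process(self, inputs):
        return (word for word in inputs if 'anana' in word)


class uppercase(Task):
    def process(self, inputs):
        return (word.upper() for word in inputs)


class lowercase(Task):
    def process(self, inputs):
        return (word.lower() for word in inputs)


words = ['tenet', 'banana', 'stream', 'ananas', 'Bananaland', 'level']
stream = (from_list(words)
          >> (is_palindrome() >> uppercase()
              & contains_anana() >> lowercase()))
print(list(stream))  # ['TENET', 'LEVEL', 'banana', 'ananas', 'bananaland']
```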
And here are the results:
```
...
SOOLOOS
TEBBET
TEBET
TENET
TERRET
ULULU
YARAY
anana
ananaplas
ananaples
ananas
banana
bananaland
bananalander
...
```
This is all done in a single stream.
Why >> and &?
You may ask why I did not follow the more familiar syntax that most shells use: A | B. The reason is Python's operator precedence.
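One way to see the precedence issue is with a hypothetical Expr class that just records how Python groups the operators. In Python, >> binds tighter than &, which binds tighter than |. So A >> B & C >> D groups as (A >> B) & (C >> D), exactly the grouping the split-and-join example relies on. With | as the pipe, the same shape A | B & C | D would split B & C first:

```python
class Expr:
    # Hypothetical helper: builds a string showing how Python grouped the operands
    def __init__(self, text):
        self.text = text

    def __rshift__(self, other):
        return Expr(f'({self.text} >> {other.text})')

    def __and__(self, other):
        return Expr(f'({self.text} & {other.text})')

    def __or__(self, other):
        return Expr(f'({self.text} | {other.text})')


A, B, C, D = (Expr(name) for name in 'ABCD')

print((A >> B & C >> D).text)  # ((A >> B) & (C >> D))
print((A | B & C | D).text)    # ((A | (B & C)) | D)
```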