Select and Python Generators

One of the loveliest things about Unix is the select() function (or its replacement, poll()), and the way it lets a single thread handle a host of concurrent tasks efficiently by just using file descriptors as work queues.

Unfortunately, it can be a nuisance to use — you end up having to structure your program as a state machine around the select() invocation, rather than the actual procedure you want to have happen. You can avoid that by not using select() and instead just having a separate thread/process for every task you want to do — but that creates a bunch of tedious overhead for the OS (and admin) to worry about.

But magically making state machines is what Python’s generators are all about; so for my little pet project, which forks a bunch of subprocesses to do the interesting computational work my Python program wants done, I thought I’d see if I could use them to make my code more obvious.

What I want to achieve is to have a bunch of subprocesses that each accept some setup data, then a series of two-byte ids terminated by two bytes of 0xFF, and that output one line of text giving the calculation result for each two-byte input. For the time being at least, I want the IO to be asynchronous: so I’ll give each subprocess as many inputs as I can, rather than waiting for a result before sending the next input.

So basically, I want to write something like:


def send_inputs(f, s, n):
        f.write(s) # write setup data
        for i in xrange(n):
                f.write(struct.pack("!H", i))
        f.write(struct.pack("!H", 0xFFFF))

def read_output(f):
        for line in f:
                if is_interesting(line):
                        print line

Except of course, that doesn’t work directly because writing some data or reading a line can block, and when it does, I want it to be doing something else (reading instead of writing or vice-versa, or paying attention to another process).
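To make the wire format concrete, here is a minimal sketch of the framing on its own, with blocking left out entirely. It assumes Python 3 (so bytes and range() rather than xrange()), and frame_inputs, parse_ids and TERMINATOR are names I've made up for illustration; the helper's real parsing code isn't shown in this post.

```python
import struct

TERMINATOR = 0xFFFF  # two bytes of 0xFF mark the end of the id stream


def frame_inputs(setup, n):
    """Return the whole byte stream: setup data, n ids, then the terminator.

    Hypothetical helper: ids are two-byte unsigned integers in network
    (big-endian) byte order, as produced by struct.pack("!H", i).
    """
    ids = b"".join(struct.pack("!H", i) for i in range(n))
    return setup + ids + struct.pack("!H", TERMINATOR)


def parse_ids(payload):
    """Decode two-byte ids from payload until the terminator is seen."""
    ids = []
    for offset in range(0, len(payload) - 1, 2):
        (value,) = struct.unpack_from("!H", payload, offset)
        if value == TERMINATOR:
            break
        ids.append(value)
    return ids
```

Since 0xFFFF is reserved as the terminator, this only works for fewer than 65535 ids per subprocess.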

Generators are the way to do that in Python, with the “yield” keyword passing control flow and some information back somewhere else, so adopting the theory that: (a) I’ll only resume from a “yield” when it’s okay to write some more data, (b) if I “yield None” there’s probably no point coming back to me unless you’ve got some more data for me to read, and (c) I’ll provide a single parameter which is an iterator that will give me input when it’s available and None when it’s not, I can code the above as:


def send_inputs(_):
        # s, n declared in enclosing scope
        yield s
        for i in xrange(n):
                yield struct.pack("!H", i)
        yield struct.pack("!H", 0xFFFF)

def read_output(f):
        for line in f:
                if line is None: yield None; continue
                if is_interesting(line):
                        print line

There are a few complications there. For one, I could be yielding more data than can actually be written, so I might want to buffer there to avoid blocking. (I haven’t bothered, just as I haven’t worried about “print” possibly blocking.) Likewise, I might only receive part of a line, or I might receive more than one line at once, and as far as I can see a buffer there is unavoidable. If I were doing fixed-size reads (instead of a line at a time), that might be different.

So far, the above seems pretty pleasant to me: those functions describe what I want to have happen in a nice procedural manner (almost as if they had a thread all to themselves), with the only extra bit being the “if line is None: yield None; continue” line, which I’m willing to accept in order not to use threads.

Making that actually function does need a little grunging around, but happily we can hide that away in a module — so my API looks like:


p = subprocess.Popen(["./helper"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
comm = communicate.Communication()
comm.add(send_inputs, p.stdin, None)
comm.add(read_output, None, p.stdout, communicate.ByLine())
comm.communicate()

The comm.add() function takes a generator function, an output file (i.e., the subprocess’s stdin), an input file (the subprocess’s stdout), and an optional iterator. The generator gets created when communication starts, with the iterator passed as its argument. The iterator needs to have an “add” method (which gets given the bytes received), a “waiting” method (which returns True when it has no more input to give the generator for now, and False otherwise), and a “finish” method that gets called once EOF is hit on the input. (Actually, it doesn’t strictly need to be an iterator, though it’s convenient for the generator if it is.)
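The ByLine iterator itself isn't shown in this post, so here is a rough guess at what one could look like, assuming Python 3 (bytes input, and __next__ where Python 2 would use next). Only the method names add, waiting and finish come from the description above; the internals are my own assumption.

```python
class ByLine:
    """Hypothetical line-buffering iterator; not the original implementation."""

    def __init__(self):
        self.buffer = b""   # bytes received that don't yet form a full line
        self.lines = []     # complete lines ready to hand to the generator
        self.eof = False

    def add(self, data):
        """Accept newly read bytes and split off any complete lines."""
        self.buffer += data
        # Everything before the last newline is complete; the rest stays buffered.
        *complete, self.buffer = self.buffer.split(b"\n")
        self.lines.extend(line + b"\n" for line in complete)

    def finish(self):
        """Called at EOF: hand over a trailing partial line, if there is one."""
        if self.buffer:
            self.lines.append(self.buffer)
            self.buffer = b""
        self.eof = True

    def waiting(self):
        """True when nothing is ready now but more input may still arrive."""
        return not self.lines and not self.eof

    def __iter__(self):
        return self

    def __next__(self):
        if self.lines:
            return self.lines.pop(0)
        if self.eof:
            raise StopIteration
        return None  # no complete line yet; the generator should yield None
```

Returning None from __next__ is what lets “for line in f” keep going while the generator hands control back with “yield None”.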

Generator functions, once called, return a generator object with a next() method that runs the function body you defined until the next “yield” (in which case next() returns the value yielded), or until a “return” or the end of the function is hit (in which case the StopIteration exception is raised).
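A tiny illustration of that behaviour, assuming Python 3, where the built-in next(gen) replaces the Python 2 spelling gen.next(). The countdown function is just a made-up example.

```python
def countdown(n):
    """Yield n, n-1, ..., 1 and then finish."""
    while n > 0:
        yield n      # execution pauses here until the next call to next()
        n -= 1
    return           # returning (or falling off the end) raises StopIteration


gen = countdown(2)   # nothing in the body has run yet
first = next(gen)    # runs up to the first yield
second = next(gen)   # resumes after the yield, loops, yields again

try:
    next(gen)        # the loop ends, so the generator is exhausted
    finished = False
except StopIteration:
    finished = True
```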

So to make this all work, what we want to do is: (a) do a select() on all the files we’ve been given; (b) for the ones we can read from, read them and add() the data to the corresponding iterators; (c) for the generators that don’t have an output file, or whose output file we can write to, invoke next() until either they raise StopIteration, they yield a value for us to output, or they yield None and their iterator reports that it’s waiting. Add in some code to ensure that reads from the file descriptors don’t block, and you get:


def communicate(self):
    readable, writable = [], []
    for g,o,i,iter in self.coroutines:
        if i is not None:
            fcntl.fcntl(i, fcntl.F_SETFL, 
                        fcntl.fcntl(i, fcntl.F_GETFL) | os.O_NONBLOCK)
            readable.append(i)
        if o is not None:
            writable.append(o)
    
    while readable != [] or writable != []:
        read, write, exc = select.select(readable, writable, [])
        for g,o,i,iter in self.coroutines:
            if i in read:
                x = i.read()
                if x == "": # eof
                    iter.finish()
                    readable.remove(i)
                else:
                    iter.add(x)

            if o is None or o in write:
                x = None
                try:
                    while x is None and not iter.waiting():
                        x = g.next()
                    if x is not None:
                        o.write(x)
                except StopIteration:
                    if o is not None:
                        writable.remove(o)
    return

You can break it by: (a) yielding more than you can write without blocking (it’ll block rather than buffer, and you might get a deadlock), (b) yielding a value from a generator that doesn’t have a file associated with it (None.write(x) won’t work), (c) having generators that don’t actually yield, and (d) probably some other ways. And it would’ve been nice if I could have somehow moved the “yield None” into the iterator so that it was implicit in the “for line in f”, rather than explicit.
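For breakage (a), one possible fix is to queue whatever a generator yields and only write as much as the kernel will take without blocking. This is a sketch of that idea rather than anything the code above does: WriteBuffer is a hypothetical class, and it assumes Python 3 on Unix with a raw file descriptor (os.write and os.set_blocking) instead of a file object.

```python
import os


class WriteBuffer:
    """Hypothetical non-blocking output buffer for one file descriptor."""

    def __init__(self, fd):
        self.fd = fd
        self.pending = b""
        os.set_blocking(fd, False)  # writes now fail instead of blocking

    def queue(self, data):
        """Remember data yielded by a generator until it can be written."""
        self.pending += data

    def flush_some(self):
        """Write what the kernel accepts right now; return True once empty."""
        if self.pending:
            try:
                written = os.write(self.fd, self.pending)
            except BlockingIOError:
                written = 0  # pipe is full; try again after the next select()
            self.pending = self.pending[written:]
        return not self.pending
```

The select() loop would then keep a descriptor in the writable list for as long as flush_some() returns False, and only ask the generator for more output once its buffer is empty.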

But even so, I quite like it.
