{"id":336,"date":"2008-04-10T02:09:47","date_gmt":"2008-04-09T16:09:47","guid":{"rendered":"http:\/\/www.erisian.com.au\/wordpress\/?p=336"},"modified":"2008-04-10T02:09:47","modified_gmt":"2008-04-09T16:09:47","slug":"select-and-python-generators","status":"publish","type":"post","link":"https:\/\/www.erisian.com.au\/wordpress\/2008\/04\/10\/select-and-python-generators","title":{"rendered":"Select and Python Generators"},"content":{"rendered":"<p>One of the loveliest things about Unix is the select() function (or its replacement, poll()), and the way it lets a single thread handle a host of concurrent tasks efficiently by just using file descriptors as work queues.<\/p>\n<p>Unfortunately, it can be a nuisance to use &#8212; you end up having to structure your program as a state machine around the select() invocation, rather than the actual procedure you want to have happen. You can avoid that by not using select() and instead just having a separate thread\/process for every task you want to do &#8212; but that creates a bunch of tedious overhead for the OS (and admin) to worry about.<\/p>\n<p>But magically making state machines is what Python&#8217;s generators are all about; so for my little pet project that involves forking a bunch of subprocesses to do the interesting computational work my python program wants done, I thought I&#8217;d see if I could use that to make my code more obvious.<\/p>\n<p>What I want to achieve is to have a bunch of subprocesses accepting some setup data, then a bunch of two byte ids, terminated by two bytes of 0xFF, and for each of the two byte inputs to output a line of text giving the calculation result. 
For the time being at least, I want the IO to be asynchronous: so I&#8217;ll give it as many inputs as I can, rather than waiting for the result before sending the next input.<\/p>\n<p>So basically, I want to write something like:<\/p>\n<blockquote>\n<pre>\n<code>\ndef send_inputs(f, s, n):\n        f.write(s) # write setup data\n        for i in xrange(n):\n                f.write(struct.pack(\"!H\", i))\n        f.write(struct.pack(\"!H\", 0xFFFF))\n\ndef read_output(f):\n        for line in f:\n                if is_interesting(line):\n                        print line\n<\/code>\n<\/pre>\n<\/blockquote>\n<p>Except, of course, that doesn&#8217;t work directly because writing some data or reading a line can block, and when it does, I want it to be doing something else (reading instead of writing or vice-versa, or paying attention to another process).<\/p>\n<p>Generators are the way to do that in Python, with the &#8220;yield&#8221; keyword passing control flow and some information back somewhere else. So, adopting the theory that: (a) I&#8217;ll only resume from a &#8220;yield&#8221; when it&#8217;s okay to write some more data, (b) if I &#8220;yield None&#8221; there&#8217;s probably no point coming back to me unless you&#8217;ve got some more data for me to read, and (c) I&#8217;ll provide a single parameter which is an iterator that will give me input when it&#8217;s available and None when it&#8217;s not, I can code the above as:<\/p>\n<blockquote>\n<pre>\n<code>\ndef send_inputs(_):\n        # s, n declared in enclosing scope\n        yield s\n        for i in xrange(n):\n                yield struct.pack(\"!H\", i)\n        yield struct.pack(\"!H\", 0xFFFF)\n\ndef read_output(f):\n        for line in f:\n                if line is None: yield None; continue\n                if is_interesting(line):\n                        print line\n<\/code>\n<\/pre>\n<\/blockquote>\n<p>There are a few complications there. 
For one, I could be yielding more data than can actually be written, so I might want to buffer there to avoid blocking. (I haven&#8217;t bothered; just as I haven&#8217;t worried about &#8220;print&#8221; possibly blocking.) Likewise, I might only receive part of a line, or I might receive more than one line at once, and afaics a buffer there is unavoidable. If I were doing fixed-size reads (instead of a line at a time), that might be different.<\/p>\n<p>So far, the above seems pretty pleasant to me &#8212; those functions describe what I want to have happen in a nice procedural manner (almost as if they had a thread all to themselves), with the only extra bit being the &#8220;yield None; continue&#8221; line, which I&#8217;m willing to accept in order not to use threads.<\/p>\n<p>Making that actually function does need a little grunging around, but happily we can hide that away in a module &#8212; so my API looks like:<\/p>\n<blockquote>\n<pre>\n<code>\np = subprocess.Popen([\".\/helper\"], stdin=subprocess.PIPE,\n                     stdout=subprocess.PIPE, close_fds=True)\ncomm = communicate.Communication()\ncomm.add(send_inputs, p.stdin, None)\ncomm.add(read_output, None, p.stdout, communicate.ByLine())\ncomm.communicate()\n<\/code>\n<\/pre>\n<\/blockquote>\n<p>The comm.add() function takes a generator function, an output fd (i.e., the subprocess&#8217;s stdin), an input fd (the subprocess&#8217;s stdout), and an (optional) iterator. The generator gets created when communication starts, with the iterator passed as the argument. The iterator needs to have an &#8220;add&#8221; function (which gets given the bytes received), a &#8220;waiting&#8221; function (which returns True when it has no more input ready for the generator, and False when it does), and a &#8220;finish&#8221; function that gets called once EOF is hit on the input. 
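A minimal sketch of what such an iterator might look like (this is just my guess at communicate.ByLine&#8217;s internals, not the actual module) is:<\/p>\n<blockquote>\n<pre>\n<code>\nclass ByLine:\n        def __init__(self):\n                self.buf = \"\"\n                self.eof = False\n        def add(self, bytes):\n                self.buf += bytes\n        def finish(self):\n                self.eof = True\n        def waiting(self):\n                # no complete line buffered, and more input may yet arrive\n                return \"\\n\" not in self.buf and not self.eof\n        def __iter__(self):\n                return self\n        def next(self):\n                if self.waiting():\n                        return None\n                if self.buf == \"\" and self.eof:\n                        raise StopIteration\n                line, nl, self.buf = self.buf.partition(\"\\n\")\n                return line + nl\n<\/code>\n<\/pre>\n<\/blockquote>\n<p>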
(Actually, it doesn&#8217;t strictly need to be an iterator, though it&#8217;s convenient for the generator if it is.)<\/p>\n<p>The generator functions, once &#8220;executed&#8221;, return an object with a next() method that&#8217;ll run the function you defined until the next &#8220;yield&#8221; (in which case next() will return the value yielded), or until a &#8220;return&#8221; is hit (in which case the StopIteration exception is raised).<\/p>\n<p>So what we want to do to make this all work is this: (a) do a select() on all the files we&#8217;ve been given; (b) for the ones we can read from, read them and add() to the corresponding iterators; (c) for the generators that don&#8217;t have an output file, or whose output file we can write to, invoke next() until either: they raise StopIteration, they yield a value for us to output, or they yield None and their iterator reports that it&#8217;s waiting. Add in some code to ensure that reads from the file descriptors don&#8217;t block, and you get:<\/p>\n<blockquote>\n<pre>\n<code>\ndef communicate(self):\n    readable, writable = [], []\n    for g,o,i,iter in self.coroutines:\n        if i is not None:\n            fcntl.fcntl(i, fcntl.F_SETFL,\n                        fcntl.fcntl(i, fcntl.F_GETFL) | os.O_NONBLOCK)\n            readable.append(i)\n        if o is not None:\n            writable.append(o)\n\n    while readable != [] or writable != []:\n        read, write, exc = select.select(readable, writable, [])\n        for g,o,i,iter in self.coroutines:\n            if i in read:\n                x = i.read()\n                if x == \"\": # eof\n                    iter.finish()\n                    readable.remove(i)\n                else:\n                    iter.add(x)\n\n            if o is None or o in write:\n                x = None\n                try:\n                    while x is None and not iter.waiting():\n                        x = g.next()\n                    if x is not None:\n                        o.write(x)\n                except StopIteration:\n                    if o is not None:\n                        writable.remove(o)\n    return\n<\/code>\n<\/pre>\n<\/blockquote>\n<p>You can break it by: (a) yielding more than you can write without blocking (it&#8217;ll block rather than buffer, and you might get a deadlock), (b) yielding a value from a generator that doesn&#8217;t have a file associated with it (None.write(x) won&#8217;t work), (c) having generators that don&#8217;t actually yield, and (d) probably some other ways. And it would&#8217;ve been nice if I could have somehow moved the &#8220;yield None&#8221; into the iterator so that it was implicit in the &#8220;for line in f&#8221;, rather than explicit.<\/p>\n<p>But even so, I quite like it.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>One of the loveliest things about Unix is the select() function (or its replacement, poll()), and the way it lets a single thread handle a host of concurrent tasks efficiently by just using file descriptors as work queues. 
Unfortunately, it can be a nuisance to use &#8212; you end up having to structure your program [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[16],"tags":[],"_links":{"self":[{"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/posts\/336"}],"collection":[{"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/comments?post=336"}],"version-history":[{"count":0,"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/posts\/336\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/media?parent=336"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/categories?post=336"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.erisian.com.au\/wordpress\/wp-json\/wp\/v2\/tags?post=336"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}