Wednesday, July 13, 2005

How well do you know python, part 8

Here is a mostly-functioning class that facilitates writing producer/consumer algorithms. It is designed to support a single producing thread and multiple consuming threads, which suits scenarios where consuming involves blocking operations such as communicating over a socket.

"Mostly," in this case, means that there are two bugs in this class that can be fixed in a total of four or five lines. One is arguably more subtle than the other, but they involve the same part of the code. Can you spot them?

Warning: this one is tougher than part 2, which also dealt with threading.

import Queue as queue
import threading

class PCQueue:
    def __init__(self, initialthreads):
        self.q = queue.Queue()
        self.running = True
        self.condition = threading.Condition()
        for i in range(initialthreads):
            self.add_consumer()
    def run_consumer(self):
        while True:
            self.condition.acquire()
            try:
                self.condition.wait()
                if not self.running:
                    return
            finally:
                self.condition.release()
            obj = self.q.get()
            self.consume(obj)
    def put(self, obj):
        self.condition.acquire()
        self.q.put(obj)
        self.condition.notify()
        self.condition.release()
    def consume(self, obj):
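        # Subclasses override this; string exceptions are deprecated, so raise a real exception class.
        raise NotImplementedError('must implement consume method')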
    def add_consumer(self):
        threading.Thread(target=self.run_consumer).start()
    def stop(self):
        self.running = False
        self.condition.acquire()
        self.condition.notifyAll()
        self.condition.release()

Here's a short example using PCQueue.

class PCExample(PCQueue):
    def consume(self, url):
        import urllib
        n = len(urllib.urlopen(url).read())
        print '\t%s contains %d bytes' % (url, n)

q = PCExample(2) # 2 threads
while True:
    print 'url? (empty string to quit)'
    url = raw_input()
    if not url:
        break
    q.put(url)
q.stop()

7 comments:

paul cannon said...

Well, I haven't run this, so I don't know if there's something even more obvious that I'm missing, but..

(a) It looks like it's possible for the put() method to notify the threads at a moment when none of them are actually waiting. The item would get stuck in the queue, probably forever. Trying to call self.q.get_nowait() in run_consumer before the wait would probably fix this.

(b) The notifyAll() call in stop could also happen when some of the threads aren't waiting, so they would wait for another notification before checking self.running. Similarly to the last item, putting another check for self.running before the wait would solve this.

(c) The use of condition.notify() in put() isn't quite future-proof; the docs say future versions of notify might sometimes wake up more than one thread, and the code doesn't allow for that. This is probably not an issue.

I have a feeling I didn't catch the subtle bug.

paul cannon said...

Er, on (a), I meant that some item would always be stuck in the queue, not that that specific item would remain stuck.

Damjan said...

I don't know Python this well, but your challenges are interesting.

I tried the program and it seems to run well, but anyway:

In run_consumer there is:

    self.condition.wait()
    if not self.running:
        return

Shouldn't the condition be released before the return?

Jonathan Ellis said...

Paul, those are the issues I had in mind.
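One way both fixes might look is sketched below. This is a sketch under assumptions rather than the definitive answer: it is written for Python 3 (hence `import queue` and `with` blocks instead of the Python 2 syntax above), and the class name `FixedPCQueue` and the `threads` list are illustrative additions that are not part of the original class. The idea is that each consumer checks the queue and `self.running` before waiting, and re-checks them in a loop after waking, so a notification sent while no thread was waiting is not lost and an extra wakeup is harmless.

```python
import queue
import threading


class FixedPCQueue:
    def __init__(self, initialthreads):
        self.q = queue.Queue()
        self.running = True
        self.condition = threading.Condition()
        self.threads = []  # illustrative addition: lets callers join the consumers
        for i in range(initialthreads):
            self.add_consumer()

    def run_consumer(self):
        while True:
            with self.condition:
                # Wait only while there is nothing to do; re-check after every wakeup.
                while self.running and self.q.empty():
                    self.condition.wait()
                if not self.running:
                    return
                # Safe: the lock is held and the queue was just seen to be non-empty.
                obj = self.q.get()
            # Consume outside the lock so a slow consumer does not block the others.
            self.consume(obj)

    def put(self, obj):
        with self.condition:
            self.q.put(obj)
            self.condition.notify()

    def consume(self, obj):
        raise NotImplementedError('must implement consume method')

    def add_consumer(self):
        t = threading.Thread(target=self.run_consumer)
        self.threads.append(t)
        t.start()

    def stop(self):
        with self.condition:
            # Set the flag under the lock so no consumer can miss it between
            # its check and its wait.
            self.running = False
            self.condition.notify_all()
```

As in the original class, items still sitting in the queue when stop() is called are not consumed.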

Damjan: that is why that code is in a try/finally block. The code in a finally block always gets executed after the try block completes, even if the try block terminated because of a break or return.
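A small sketch can make this concrete. It assumes Python 3, and the function name `return_inside_try` is made up for the illustration. The condition is built on a plain, non-reentrant `threading.Lock` so that the final non-blocking acquire can only succeed if the finally block really did release it.

```python
import threading

# A non-reentrant lock underneath, so a second acquire from the same thread
# would fail if the first had not been released.
condition = threading.Condition(threading.Lock())


def return_inside_try(running):
    condition.acquire()
    try:
        if not running:
            return 'returned early'  # the finally block still runs before the function exits
        return 'kept going'
    finally:
        condition.release()


result = return_inside_try(False)

# Non-blocking acquire: True only if the lock is currently free.
released = condition.acquire(False)
if released:
    condition.release()
```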

Damjan said...

Jonathan: at first glance I read the finally: statement as an except:.

Then I added a print "Condition released" just after the finally: statement to make sure the condition is released.

paul cannon said...

This one was easier to me than the part 2 question, for which I had to decode Ian Bicking's answer. I suck!

Anonymous said...

That is so great!
It's also possible to run python in parallel on SMP: Parallel Python