Here is a mostly-functioning class that facilitates writing producer / consumer algorithms. It is designed to support a single producing thread and multiple consuming threads, for scenarios where consuming involves blocking operations such as communicating over a socket.
"Mostly," in this case, means that there are two bugs in this class that can be fixed in a total of four or five lines. One is arguably more subtle than the other, but they involve the same part of the code. Can you spot them?
Warning: this one is tougher than part 2, which also dealt with threading.
import Queue as queue
import threading

class PCQueue:
    def __init__(self, initialthreads):
        self.q = queue.Queue()
        self.running = True
        self.condition = threading.Condition()
        for i in range(initialthreads):
            self.add_consumer()

    def run_consumer(self):
        while True:
            self.condition.acquire()
            try:
                self.condition.wait()
                if not self.running:
                    return
            finally:
                self.condition.release()
            obj = self.q.get()
            self.consume(obj)

    def put(self, obj):
        self.condition.acquire()
        self.q.put(obj)
        self.condition.notify()
        self.condition.release()

    def consume(self, obj):
        raise 'must implement consume method'

    def add_consumer(self):
        threading.Thread(target=self.run_consumer).start()

    def stop(self):
        self.running = False
        self.condition.acquire()
        self.condition.notifyAll()
        self.condition.release()
Here's a short example using PCQueue.
class PCExample(PCQueue):
    def consume(self, url):
        import urllib
        n = len(urllib.urlopen(url).read())
        print '\t%s contains %d bytes' % (url, n)

q = PCExample(2) # 2 threads
while True:
    print 'url? (empty string to quit)'
    url = raw_input()
    if not url:
        break
    q.put(url)
q.stop()
Comments
(a) It looks like it's possible for the put() method to notify the threads at a moment when none of them are actually waiting. The item would get stuck in the queue, probably forever. Trying to call self.q.get_nowait() in run_consumer before the wait would probably fix this.
(b) The notifyAll() call in stop could also happen when some of the threads aren't waiting, so they would wait for another notification before checking self.running. Similarly to the last item, putting another check for self.running before the wait would solve this.
(c) The use of condition.notify() in put() isn't quite future-proof; the docs say future versions of notify might sometimes wake up more than one thread, and the code doesn't allow for that. This is probably not an issue.
I have a feeling I didn't catch the subtle bug.
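The fixes suggested in the comment above can be sketched roughly as follows. This is only one possible reading of those suggestions, not necessarily the fix the author has in mind. It assumes Python 3 spellings (the queue module, with blocks, notify_all) rather than the Python 2 of the post; the class name FixedPCQueue and the threads list are made up for illustration. It also makes one design choice the original does not: consumers drain any items still queued before exiting on stop().

```python
import queue
import threading


class FixedPCQueue:
    """Hypothetical variant of PCQueue that re-checks state before waiting."""

    def __init__(self, initialthreads):
        self.q = queue.Queue()
        self.running = True
        self.condition = threading.Condition()
        self.threads = []  # assumption: kept so callers can join the consumers
        for _ in range(initialthreads):
            self.add_consumer()

    def run_consumer(self):
        while True:
            with self.condition:
                # Wait only while there is nothing to do. Because the check
                # happens under the lock, a notify sent before this thread
                # started waiting cannot be missed, and an extra wake-up just
                # goes around the loop again.
                while self.running and self.q.empty():
                    self.condition.wait()
                try:
                    obj = self.q.get_nowait()
                except queue.Empty:
                    # Only reachable once stop() was called and the queue is empty.
                    return
            # Consume outside the lock so other consumers are not blocked.
            self.consume(obj)

    def put(self, obj):
        with self.condition:
            self.q.put(obj)
            self.condition.notify()

    def consume(self, obj):
        raise NotImplementedError('must implement consume method')

    def add_consumer(self):
        t = threading.Thread(target=self.run_consumer)
        self.threads.append(t)
        t.start()

    def stop(self):
        with self.condition:
            # Set the flag under the lock so waiters see it when they wake.
            self.running = False
            self.condition.notify_all()
```

Since every get and put happens while holding the condition's lock, the empty() check and the get_nowait() call cannot be separated by another consumer taking the item.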
I tried the program and it seems to run well, but anyway:
In run_consumer there is:
    self.condition.wait()
    if not self.running:
        return
Shouldn't the condition be released before the return?
Damjan: that is why that code is in a try/finally block. The code in a finally block always gets executed after the try block completes, even if the try block terminated because of a break or return.
Then I added a print "Condition released" just after the finally: statement to make sure the condition is released.
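The try/finally behaviour discussed in this thread can also be checked in isolation. The sketch below uses a plain lock and a hypothetical helper, check_running, that mirrors the shape of the try/finally in run_consumer; the log list is only there to record that the finally block ran.

```python
import threading


def check_running(lock, running, log):
    """Hypothetical helper: return from inside try while holding a lock."""
    lock.acquire()
    try:
        if not running:
            return 'stopped'  # leaves the function from inside the try block
        return 'running'
    finally:
        # Runs on every exit path from the try block, including both returns.
        lock.release()
        log.append('released')


lock = threading.Lock()
log = []
state = check_running(lock, False, log)
# If the finally block had been skipped, the lock would still be held here
# and this non-blocking acquire would fail.
reacquired = lock.acquire(False)
if reacquired:
    lock.release()
```

After this runs, state holds the value returned from inside the try block, log records one release, and reacquired is true because the lock was freed on the way out.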