Here is a mostly-functioning class that facilitates writing producer / consumer algorithms. It is designed to support a single producing and multiple consuming threads, i.e., scenarios where consuming involves some blocking operations such as communicating over a socket.
"Mostly," in this case, means that there are two bugs in this class that can be fixed in a total of four or five lines. One is arguably more subtle than the other, but they involve the same part of the code. Can you spot them?
Warning: this one is tougher than part 2, which also dealt with threading.
import Queue as queue import threading class PCQueue: def __init__(self, initialthreads): self.q = queue.Queue() self.running = True self.condition = threading.Condition() for i in range(initialthreads): self.add_consumer() def run_consumer(self): while True: self.condition.acquire() try: self.condition.wait() if not self.running: return finally: self.condition.release() obj = self.q.get() self.consume(obj) def put(self, obj): self.condition.acquire() self.q.put(obj) self.condition.notify() self.condition.release() def consume(self, obj): raise 'must implement consume method' def add_consumer(self): threading.Thread(target=self.run_consumer).start() def stop(self): self.running = False self.condition.acquire() self.condition.notifyAll() self.condition.release()
Here's a short example using PCQueue.
class PCExample(PCQueue): def consume(self, url): import urllib n = len(urllib.urlopen(url).read()) print '\t%s contains %d bytes' % (url, n) q = PCExample(2) # 2 threads while True: print 'url? (empty string to quit)' url = raw_input() if not url: break q.put(url) q.stop()