Here is a mostly-functioning class that facilitates writing producer / consumer algorithms. It is designed to support a single producing and multiple consuming threads, i.e., scenarios where consuming involves some blocking operations such as communicating over a socket.
"Mostly," in this case, means that there are two bugs in this class that can be fixed in a total of four or five lines. One is arguably more subtle than the other, but they involve the same part of the code. Can you spot them?
Warning: this one is tougher than part 2, which also dealt with threading.
import Queue as queue
import threading
class PCQueue:
def __init__(self, initialthreads):
self.q = queue.Queue()
self.running = True
self.condition = threading.Condition()
for i in range(initialthreads):
self.add_consumer()
def run_consumer(self):
while True:
self.condition.acquire()
try:
self.condition.wait()
if not self.running:
return
finally:
self.condition.release()
obj = self.q.get()
self.consume(obj)
def put(self, obj):
self.condition.acquire()
self.q.put(obj)
self.condition.notify()
self.condition.release()
def consume(self, obj):
raise 'must implement consume method'
def add_consumer(self):
threading.Thread(target=self.run_consumer).start()
def stop(self):
self.running = False
self.condition.acquire()
self.condition.notifyAll()
self.condition.release()
Here's a short example using PCQueue.
class PCExample(PCQueue):
def consume(self, url):
import urllib
n = len(urllib.urlopen(url).read())
print '\t%s contains %d bytes' % (url, n)
q = PCExample(2) # 2 threads
while True:
print 'url? (empty string to quit)'
url = raw_input()
if not url:
break
q.put(url)
q.stop()