Here's a piece of code for the one or two other developers writing intensely network-dependent code in Python. The idea is, instead of a protocol that looks like
[command][arg] [command][arg] ...
You save overhead (in your protocol and in the network's, if you're doing one connection per command) by batching things:
[command][arg][arg]...
Pretty obvious stuff, no doubt. But I think the following class makes it rather elegant.
import queue
import threading
import time

class NagleQueue:
    """
    When an item is put, NagleQueue waits for other puts for
    aggregate_time seconds (may be fractional) and then calls
    aggregate() with a list whose maximum length is max_items.

    TODO: currently, NagleQueue always waits the entire aggregate_time,
    even if max_items items are added sooner.

    NagleQueue starts a thread to handle the aggregate() call; this is
    not a daemon thread, so you must call stop() before your application
    exits.  NagleQueue will process any remaining items, then quit.  It
    is an unchecked error to put additional items after calling stop().
    (They may or may not get processed.)
    """

    def __init__(self, aggregate_time, max_items):
        self.q = queue.Queue()
        self.running = True
        self.aggregate_time = aggregate_time
        self.max_items = max_items
        threading.Thread(target=self._process, name='naglequeue').start()

    def put(self, item):
        self.q.put(item)

    def stop(self):
        self.running = False

    def _process(self):
        while True:
            # Block for at most a second so stop() is noticed promptly.
            try:
                item = self.q.get(timeout=1)
            except queue.Empty:
                if not self.running:
                    break
                else:
                    continue
            # An item arrived; wait for more to accumulate before batching.
            time.sleep(self.aggregate_time)
            batch = []
            while True:
                batch.append(item)
                if len(batch) >= self.max_items:
                    break
                try:
                    item = self.q.get_nowait()
                except queue.Empty:
                    break
            self.aggregate(batch)

    def aggregate(self, items):
        """Combines a list of items into a single request."""
        raise NotImplementedError('subclasses must implement aggregate()')
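To show how the class is meant to be used, here is a sketch of a hypothetical subclass that joins batched commands into one wire message. The class body is repeated in condensed form (and stop() additionally join()s the worker thread) purely so the snippet runs standalone; BatchSender, its sent list, and the command strings are made-up names for illustration, not part of the original.

```python
import queue
import threading
import time

class NagleQueue:
    """Condensed version of the class above, with a joining stop()."""
    def __init__(self, aggregate_time, max_items):
        self.q = queue.Queue()
        self.running = True
        self.aggregate_time = aggregate_time
        self.max_items = max_items
        self._thread = threading.Thread(target=self._process, name='naglequeue')
        self._thread.start()

    def put(self, item):
        self.q.put(item)

    def stop(self):
        self.running = False
        self._thread.join()  # wait for remaining items to be processed

    def _process(self):
        while True:
            try:
                item = self.q.get(timeout=0.1)
            except queue.Empty:
                if not self.running:
                    break
                continue
            time.sleep(self.aggregate_time)  # let more puts accumulate
            batch = [item]
            while len(batch) < self.max_items:
                try:
                    batch.append(self.q.get_nowait())
                except queue.Empty:
                    break
            self.aggregate(batch)

    def aggregate(self, items):
        raise NotImplementedError

class BatchSender(NagleQueue):
    """Hypothetical subclass: joins a batch of commands into one message."""
    def __init__(self, aggregate_time, max_items):
        self.sent = []  # set before super().__init__ starts the thread
        super().__init__(aggregate_time, max_items)

    def aggregate(self, items):
        # In real code this would be a single socket send; here we record it.
        self.sent.append(' '.join(items))

sender = BatchSender(aggregate_time=0.2, max_items=10)
for cmd in ('SET a=1', 'SET b=2', 'SET c=3'):
    sender.put(cmd)
sender.stop()
print(sender.sent)  # the three commands arrive as one batched message
```

Because all three puts land well within the 0.2-second aggregation window, aggregate() is called once with the whole batch rather than three times, which is the entire point of the Nagle-style delay.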