Friday, August 05, 2005

NagleQueue

Here's a piece of code for the one or two other developers writing intensely network-dependent code in Python. The idea is, instead of a protocol that looks like

[command][arg]
[command][arg]
...

You save overhead (in your protocol and in the network's, if you're doing one connection per command) by batching things:

[command][arg][arg]...

Pretty obvious stuff, no doubt. But I thought the following class makes it rather elegant.

class NagleQueue:
    """
    When an item is put, NagleQueue waits for other puts for
    aggregate_time seconds (may be fractional) and then calls
    aggregate() with a list whose maximum length is max_items.

    TODO: currently, NagleQueue always waits the entire aggregate_time,
    even if max_items items are added sooner.

    NagleQueue starts a thread to handle the aggregate() call;
    this is not a Daemon thread, so you must call stop() before your
    application exits.  NagleQueue will process any remaining items,
    then quit.

    It is an unchecked error to put additional items after calling stop().
    (They may or may not get processed.)
    """
    def __init__(self, aggregate_time, max_items):
        self.q = queue.Queue()
        self.running = True
        self.aggregate_time = aggregate_time
        self.max_items = max_items
        threading.Thread(target=self._process, name='naglequeue').start()

    def put(self, item):
        self.q.put(item)

    def stop(self):
        self.running = False

    def _process(self):
        while True:
            try:
                item = self.q.get(timeout=1)
            except queue.Empty:
                if not self.running:
                    break
                else:
                    continue
            time.sleep(self.aggregate_time)

            L = []
            while True:
                L.append(item)
                if len(L) >= self.max_items:
                    break
                try:
                    item = self.q.get_nowait()
                except queue.Empty:
                    break
            self.aggregate(L)

    def aggregate(self, items):
        """
        combines list of items into a single request
        """
        raise 'must implement aggregate method'

2 comments:

Florian said...
This comment has been removed by a blog administrator.
Florian said...

Saw that you actually didn't implement the aggegation at all :D, my bad.