Commit aa7c1792 authored by Vinay Sajip's avatar Vinay Sajip

Improved QueueListener implementation - queue sentinel addition made extensible.

parent 8f36af7a
...@@ -1307,6 +1307,16 @@ class QueueListener(object): ...@@ -1307,6 +1307,16 @@ class QueueListener(object):
except queue.Empty: except queue.Empty:
break break
def enqueue_sentinel(self):
"""
This is used to enqueue the sentinel record.
The base implementation uses put_nowait. You may want to override this
method if you want to use timeouts or work with custom queue
implementations.
"""
self.queue.put_nowait(self._sentinel)
def stop(self): def stop(self):
""" """
Stop the listener. Stop the listener.
...@@ -1316,6 +1326,6 @@ class QueueListener(object): ...@@ -1316,6 +1326,6 @@ class QueueListener(object):
may be some records still left on the queue, which won't be processed. may be some records still left on the queue, which won't be processed.
""" """
self._stop.set() self._stop.set()
self.queue.put_nowait(self._sentinel) self.enqueue_sentinel()
self._thread.join() self._thread.join()
self._thread = None self._thread = None
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment