Commit ab745043 authored by Antoine Pitrou's avatar Antoine Pitrou Committed by GitHub

bpo-32576: use queue.SimpleQueue in critical places (#5216)

Where a queue may be invoked from a weakref callback, we need
to use the reentrant SimpleQueue.
parent 6027802c
...@@ -128,7 +128,7 @@ class ThreadPoolExecutor(_base.Executor): ...@@ -128,7 +128,7 @@ class ThreadPoolExecutor(_base.Executor):
raise TypeError("initializer must be a callable") raise TypeError("initializer must be a callable")
self._max_workers = max_workers self._max_workers = max_workers
self._work_queue = queue.Queue() self._work_queue = queue.SimpleQueue()
self._threads = set() self._threads = set()
self._broken = False self._broken = False
self._shutdown = False self._shutdown = False
......
...@@ -156,7 +156,7 @@ class Pool(object): ...@@ -156,7 +156,7 @@ class Pool(object):
maxtasksperchild=None, context=None): maxtasksperchild=None, context=None):
self._ctx = context or get_context() self._ctx = context or get_context()
self._setup_queues() self._setup_queues()
self._taskqueue = queue.Queue() self._taskqueue = queue.SimpleQueue()
self._cache = {} self._cache = {}
self._state = RUN self._state = RUN
self._maxtasksperchild = maxtasksperchild self._maxtasksperchild = maxtasksperchild
...@@ -802,15 +802,18 @@ class ThreadPool(Pool): ...@@ -802,15 +802,18 @@ class ThreadPool(Pool):
Pool.__init__(self, processes, initializer, initargs) Pool.__init__(self, processes, initializer, initargs)
def _setup_queues(self): def _setup_queues(self):
self._inqueue = queue.Queue() self._inqueue = queue.SimpleQueue()
self._outqueue = queue.Queue() self._outqueue = queue.SimpleQueue()
self._quick_put = self._inqueue.put self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get self._quick_get = self._outqueue.get
@staticmethod @staticmethod
def _help_stuff_finish(inqueue, task_handler, size): def _help_stuff_finish(inqueue, task_handler, size):
# put sentinels at head of inqueue to make workers finish # drain inqueue, and put sentinels at its head to make workers finish
with inqueue.not_empty: try:
inqueue.queue.clear() while True:
inqueue.queue.extend([None] * size) inqueue.get(block=False)
inqueue.not_empty.notify_all() except queue.Empty:
pass
for i in range(size):
inqueue.put(None)
Use queue.SimpleQueue() in places where it can be invoked from a weakref
callback.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment