Commit e2f33add authored by Thomas Moreau's avatar Thomas Moreau Committed by Antoine Pitrou

bpo-33078 - Fix queue size on pickling error (GH-6119)

parent 9308dea3
...@@ -161,7 +161,7 @@ class Queue(object): ...@@ -161,7 +161,7 @@ class Queue(object):
target=Queue._feed, target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes, args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe, self._wlock, self._writer.close, self._ignore_epipe,
self._on_queue_feeder_error), self._on_queue_feeder_error, self._sem),
name='QueueFeederThread' name='QueueFeederThread'
) )
self._thread.daemon = True self._thread.daemon = True
...@@ -203,7 +203,7 @@ class Queue(object): ...@@ -203,7 +203,7 @@ class Queue(object):
@staticmethod @staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
onerror): onerror, queue_sem):
debug('starting thread to feed data to pipe') debug('starting thread to feed data to pipe')
nacquire = notempty.acquire nacquire = notempty.acquire
nrelease = notempty.release nrelease = notempty.release
...@@ -255,6 +255,12 @@ class Queue(object): ...@@ -255,6 +255,12 @@ class Queue(object):
info('error in queue thread: %s', e) info('error in queue thread: %s', e)
return return
else: else:
# Since the object has not been sent in the queue, we need
# to decrease the size of the queue. The error acts as
# if the object had been silently removed from the queue
# and this step is necessary to have a properly working
# queue.
queue_sem.release()
onerror(e, obj) onerror(e, obj)
@staticmethod @staticmethod
......
...@@ -1056,6 +1056,19 @@ class _TestQueue(BaseTestCase): ...@@ -1056,6 +1056,19 @@ class _TestQueue(BaseTestCase):
self.assertTrue(q.get(timeout=1.0)) self.assertTrue(q.get(timeout=1.0))
close_queue(q) close_queue(q)
with test.support.captured_stderr():
# bpo-33078: verify that the queue size is correctly handled
# on errors.
q = self.Queue(maxsize=1)
q.put(NotSerializable())
q.put(True)
self.assertEqual(q.qsize(), 1)
# bpo-30595: use a timeout of 1 second for slow buildbots
self.assertTrue(q.get(timeout=1.0))
# Check that the size of the queue is correct
self.assertEqual(q.qsize(), 0)
close_queue(q)
def test_queue_feeder_on_queue_feeder_error(self): def test_queue_feeder_on_queue_feeder_error(self):
# bpo-30006: verify feeder handles exceptions using the # bpo-30006: verify feeder handles exceptions using the
# _on_queue_feeder_error hook. # _on_queue_feeder_error hook.
......
Fix the size handling in multiprocessing.Queue when a pickling error
occurs.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment