Commit bc50f03d authored by grzgrzgrz3's avatar grzgrzgrz3 Committed by Antoine Pitrou

bpo-30414: multiprocessing.Queue._feed do not break from main loop on exc (#1683)

* bpo-30414: multiprocesing.Queue._feed do not break from main loop on exc

Queue background running thread was not handling exceptions correctly.
Any exception occurred inside thread (putting unpickable object) cause
feeder to finish running. After that every message put into queue is
silently ignored.

* bpo-30414: multiprocesing.Queue._feed do not break from main loop on exc

Queue background running thread was not handling exceptions correctly.
Any exception occurred inside thread (putting unpickable object) cause
feeder to finish running. After that every message put into queue is
silently ignored.
parent 7ff1e88a
......@@ -221,8 +221,8 @@ class Queue(object):
else:
wacquire = None
try:
while 1:
while 1:
try:
nacquire()
try:
if not buffer:
......@@ -249,21 +249,19 @@ class Queue(object):
wrelease()
except IndexError:
pass
except Exception as e:
if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
return
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
# started to cleanup.
try:
except Exception as e:
if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
return
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
# started to cleanup.
if is_exiting():
info('error in queue thread: %s', e)
return
else:
import traceback
traceback.print_exc()
except Exception:
pass
_sentinel = object()
......
......@@ -752,6 +752,20 @@ class _TestQueue(BaseTestCase):
# Windows (usually 15.6 ms)
self.assertGreaterEqual(delta, 0.170)
def test_queue_feeder_donot_stop_onexc(self):
# bpo-30414: verify feeder handles exceptions correctly
if self.TYPE != 'processes':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
class NotSerializable(object):
def __reduce__(self):
raise AttributeError
with test.support.captured_stderr():
q = self.Queue()
q.put(NotSerializable())
q.put(True)
self.assertTrue(q.get(timeout=0.1))
#
#
#
......
......@@ -341,6 +341,9 @@ Extension Modules
Library
-------
- bpo-30414: multiprocessing.Queue._feed background running
thread do not break from main loop on exception.
- bpo-30003: Fix handling escape characters in HZ codec. Based on patch
by Ma Lin.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment