Commit 3604b239 authored by Victor Stinner's avatar Victor Stinner Committed by GitHub

bpo-31479: Always reset the signal alarm in tests (GH-3588) (GH-7314)

* bpo-31479: Always reset the signal alarm in tests

Use "try: ... finally: signal.signal(0)" pattern to make sure that
tests don't "leak" a pending fatal signal alarm.

* Move two more alarm() calls into the try block

Fix also typo: replace signal.signal(0) with signal.alarm(0)

* Move another signal.alarm() into the try block

(cherry picked from commit 9abee722)
parent 0f642620
......@@ -3146,7 +3146,6 @@ class SignalsTest(unittest.TestCase):
try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
signal.alarm(1)
# Fill the pipe enough that the write will be blocking.
# It will be interrupted by the timer armed above. Since the
# other thread has read one byte, the low-level write will
......@@ -3154,10 +3153,13 @@ class SignalsTest(unittest.TestCase):
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
try:
signal.alarm(1)
with self.assertRaises(ZeroDivisionError):
wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
finally:
signal.alarm(0)
t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
read_results.append(os.read(r, 1))
......@@ -3206,6 +3208,7 @@ class SignalsTest(unittest.TestCase):
if isinstance(exc, RuntimeError):
self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
finally:
signal.alarm(0)
wio.close()
os.close(r)
......@@ -3234,6 +3237,7 @@ class SignalsTest(unittest.TestCase):
# - third raw read() returns b"bar"
self.assertEqual(decode(rio.read(6)), "foobar")
finally:
signal.alarm(0)
rio.close()
os.close(w)
os.close(r)
......@@ -3295,6 +3299,7 @@ class SignalsTest(unittest.TestCase):
self.assertIsNone(error[0])
self.assertEqual(N, sum(len(x) for x in read_results))
finally:
signal.alarm(0)
write_finished = True
os.close(w)
os.close(r)
......
......@@ -70,14 +70,11 @@ class PtyTest(unittest.TestCase):
def setUp(self):
# isatty() and close() can hang on some platforms. Set an alarm
# before running the test to make sure we don't hang forever.
self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)
self.addCleanup(signal.alarm, 0)
signal.alarm(10)
def tearDown(self):
# remove alarm, restore old alarm handler
signal.alarm(0)
signal.signal(signal.SIGALRM, self.old_alarm)
def handle_sig(self, sig, frame):
self.fail("isatty hung")
......
......@@ -138,6 +138,8 @@ class InterProcessSignalTests(unittest.TestCase):
else:
self.fail("pause returned of its own accord, and the signal"
" didn't arrive after another second.")
finally:
signal.alarm(0)
# Issue 3864. Unknown if this affects earlier versions of freebsd also.
@unittest.skipIf(sys.platform=='freebsd6',
......@@ -246,11 +248,15 @@ class WakeupSignalTests(unittest.TestCase):
import select
signal.alarm(1)
before_time = time.time()
# We attempt to get a signal during the sleep,
# before select is called
time.sleep(self.TIMEOUT_FULL)
mid_time = time.time()
try:
before_time = time.time()
# We attempt to get a signal during the sleep,
# before select is called
time.sleep(self.TIMEOUT_FULL)
mid_time = time.time()
finally:
signal.alarm(0)
self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
select.select([self.read], [], [], self.TIMEOUT_FULL)
after_time = time.time()
......@@ -260,11 +266,15 @@ class WakeupSignalTests(unittest.TestCase):
import select
signal.alarm(1)
before_time = time.time()
# We attempt to get a signal during the select call
self.assertRaises(select.error, select.select,
[self.read], [], [], self.TIMEOUT_FULL)
after_time = time.time()
try:
before_time = time.time()
# We attempt to get a signal during the select call
self.assertRaises(select.error, select.select,
[self.read], [], [], self.TIMEOUT_FULL)
after_time = time.time()
finally:
signal.alarm(0)
self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
def setUp(self):
......
......@@ -1534,8 +1534,8 @@ class TCPTimeoutTest(SocketTCPTest):
raise Alarm
old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
try:
signal.alarm(2) # POSIX allows alarm to be up to 1 second early
try:
signal.alarm(2) # POSIX allows alarm to be up to 1 second early
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of Alarm")
......
......@@ -88,12 +88,13 @@ class SocketServerTest(unittest.TestCase):
"""Test all socket servers."""
def setUp(self):
self.addCleanup(signal_alarm, 0)
signal_alarm(60) # Kill deadlocks after 60 seconds.
self.port_seed = 0
self.test_files = []
def tearDown(self):
signal_alarm(0) # Didn't deadlock.
self.doCleanups()
reap_children()
for fn in self.test_files:
......
......@@ -843,8 +843,11 @@ class _SuppressCoreFiles(object):
kw = {stream: subprocess.PIPE}
with subprocess.Popen(args, **kw) as process:
signal.alarm(1)
# communicate() will be interrupted by SIGALRM
process.communicate()
try:
# communicate() will be interrupted by SIGALRM
process.communicate()
finally:
signal.alarm(0)
@unittest.skipIf(mswindows, "POSIX specific tests")
......
......@@ -52,9 +52,11 @@ class ThreadSignals(unittest.TestCase):
# wait for it return.
if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
signal.alarm(1)
signal.pause()
signal.alarm(0)
try:
signal.alarm(1)
signal.pause()
finally:
signal.alarm(0)
self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment