Commit 8d64248c authored by Victor Stinner's avatar Victor Stinner

Issue #12363: improve siginterrupt() tests

Backport commits 968b9ff9a059 and aff0a7b0cb12 from the default branch to 3.2
branch. Extract of the changelog messages:

"The previous tests used time.sleep() to synchronize two processes. If the host
was too slow, the test could fail.

The new tests only use one process, but they use a subprocess to:

- have only one thread
- have a timeout on the blocking read (select cannot be used in the test,
select always fail with EINTR, the kernel doesn't restart it)
- not touch signal handling of the parent process"

and

"Add a basic synchronization code between the child and the parent processes:
the child writes "ready" to stdout."

I replaced .communicate(timeout=3.0) by an explicit waiting loop using
Popen.poll().
parent 26d31869
import unittest
from test import support
from contextlib import closing
import errno
import gc
import os
import pickle
import select
import signal
import subprocess
import sys
import time
import traceback
import sys, os, time, errno
import unittest
from test import support
from contextlib import closing
from test.script_helper import spawn_python
if sys.platform in ('os2', 'riscos'):
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
......@@ -276,103 +280,96 @@ class WakeupSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
def setUp(self):
"""Install a no-op signal handler that can be set to allow
interrupts or not, and arrange for the original signal handler to be
re-installed when the test is finished.
"""
self.signum = signal.SIGUSR1
oldhandler = signal.signal(self.signum, lambda x,y: None)
self.addCleanup(signal.signal, self.signum, oldhandler)
def readpipe_interrupted(self):
def readpipe_interrupted(self, interrupt):
"""Perform a read during which a signal will arrive. Return True if the
read is interrupted by the signal and raises an exception. Return False
if it returns normally.
"""
# Create a pipe that can be used for the read. Also clean it up
# when the test is over, since nothing else will (but see below for
# the write end).
r, w = os.pipe()
self.addCleanup(os.close, r)
# Create another process which can send a signal to this one to try
# to interrupt the read.
ppid = os.getpid()
pid = os.fork()
if pid == 0:
# Child code: sleep to give the parent enough time to enter the
# read() call (there's a race here, but it's really tricky to
# eliminate it); then signal the parent process. Also, sleep
# again to make it likely that the signal is delivered to the
# parent process before the child exits. If the child exits
# first, the write end of the pipe will be closed and the test
# is invalid.
try:
time.sleep(0.2)
os.kill(ppid, self.signum)
time.sleep(0.2)
finally:
# No matter what, just exit as fast as possible now.
exit_subprocess()
else:
# Parent code.
# Make sure the child is eventually reaped, else it'll be a
# zombie for the rest of the test suite run.
self.addCleanup(os.waitpid, pid, 0)
# Close the write end of the pipe. The child has a copy, so
# it's not really closed until the child exits. We need it to
# close when the child exits so that in the non-interrupt case
# the read eventually completes, otherwise we could just close
# it *after* the test.
os.close(w)
# Try the read and report whether it is interrupted or not to
# the caller.
class Timeout(Exception):
pass
# use a subprocess to have only one thread, to have a timeout on the
# blocking read and to not touch signal handling in this process
code = """if 1:
import errno
import os
import signal
import sys
interrupt = %r
r, w = os.pipe()
def handler(signum, frame):
pass
print("ready")
sys.stdout.flush()
signal.signal(signal.SIGALRM, handler)
if interrupt is not None:
signal.siginterrupt(signal.SIGALRM, interrupt)
# run the test twice
for loop in range(2):
# send a SIGALRM in a second (during the read)
signal.alarm(1)
try:
# blocking call: read from a pipe without data
os.read(r, 1)
except OSError as err:
if err.errno != errno.EINTR:
raise
else:
sys.exit(2)
sys.exit(3)
""" % (interrupt,)
with spawn_python('-c', code) as process:
try:
d = os.read(r, 1)
# wait until the child process is loaded and has started
first_line = process.stdout.readline()
# Wait the process with a timeout of 3 seconds
timeout = time.time() + 3.0
while True:
if timeout < time.time():
raise Timeout()
status = process.poll()
if status is not None:
break
time.sleep(0.1)
stdout, stderr = process.communicate()
except Timeout:
process.kill()
return False
except OSError as err:
if err.errno != errno.EINTR:
raise
return True
else:
stdout = first_line + stdout
exitcode = process.wait()
if exitcode not in (2, 3):
raise Exception("Child error (exit code %s): %s"
% (exitcode, stdout))
return (exitcode == 3)
def test_without_siginterrupt(self):
"""If a signal handler is installed and siginterrupt is not called
at all, when that signal arrives, it interrupts a syscall that's in
progress.
"""
i = self.readpipe_interrupted()
self.assertTrue(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertTrue(i)
# If a signal handler is installed and siginterrupt is not called
# at all, when that signal arrives, it interrupts a syscall that's in
# progress.
interrupted = self.readpipe_interrupted(None)
self.assertTrue(interrupted)
def test_siginterrupt_on(self):
"""If a signal handler is installed and siginterrupt is called with
a true value for the second argument, when that signal arrives, it
interrupts a syscall that's in progress.
"""
signal.siginterrupt(self.signum, 1)
i = self.readpipe_interrupted()
self.assertTrue(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertTrue(i)
# If a signal handler is installed and siginterrupt is called with
# a true value for the second argument, when that signal arrives, it
# interrupts a syscall that's in progress.
interrupted = self.readpipe_interrupted(True)
self.assertTrue(interrupted)
def test_siginterrupt_off(self):
"""If a signal handler is installed and siginterrupt is called with
a false value for the second argument, when that signal arrives, it
does not interrupt a syscall that's in progress.
"""
signal.siginterrupt(self.signum, 0)
i = self.readpipe_interrupted()
self.assertFalse(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertFalse(i)
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
# does not interrupt a syscall that's in progress.
interrupted = self.readpipe_interrupted(False)
self.assertFalse(interrupted)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment