Commit 68e0808a authored by Victor Stinner's avatar Victor Stinner

(Merge 3.4) Close #22175: Improve test_faulthandler readability with dedent.

Patch written by Xavier de Gaye.
parents ed16b2e5 6d201685
...@@ -10,6 +10,7 @@ from test import support, script_helper ...@@ -10,6 +10,7 @@ from test import support, script_helper
from test.script_helper import assert_python_ok from test.script_helper import assert_python_ok
import tempfile import tempfile
import unittest import unittest
from textwrap import dedent
try: try:
import threading import threading
...@@ -51,6 +52,7 @@ class FaultHandlerTests(unittest.TestCase): ...@@ -51,6 +52,7 @@ class FaultHandlerTests(unittest.TestCase):
build, and replace "Current thread 0x00007f8d8fbd9700" by "Current build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
thread XXX". thread XXX".
""" """
code = dedent(code).strip()
with support.SuppressCrashReport(): with support.SuppressCrashReport():
process = script_helper.spawn_python('-c', code) process = script_helper.spawn_python('-c', code)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
...@@ -80,15 +82,15 @@ class FaultHandlerTests(unittest.TestCase): ...@@ -80,15 +82,15 @@ class FaultHandlerTests(unittest.TestCase):
else: else:
header = 'Stack (most recent call first)' header = 'Stack (most recent call first)'
regex = """ regex = """
^Fatal Python error: {name} ^Fatal Python error: {name}
{header}: {header}:
File "<string>", line {lineno} in <module> File "<string>", line {lineno} in <module>
""".strip() """
regex = regex.format( regex = dedent(regex.format(
lineno=line_number, lineno=line_number,
name=name_regex, name=name_regex,
header=re.escape(header)) header=re.escape(header))).strip()
if other_regex: if other_regex:
regex += '|' + other_regex regex += '|' + other_regex
output, exitcode = self.get_output(code, filename) output, exitcode = self.get_output(code, filename)
...@@ -100,29 +102,29 @@ class FaultHandlerTests(unittest.TestCase): ...@@ -100,29 +102,29 @@ class FaultHandlerTests(unittest.TestCase):
"the first page of memory is a mapped read-only on AIX") "the first page of memory is a mapped read-only on AIX")
def test_read_null(self): def test_read_null(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler._read_null() faulthandler._read_null()
""".strip(), """,
3, 3,
# Issue #12700: Read NULL raises SIGILL on Mac OS X Lion # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
'(?:Segmentation fault|Bus error|Illegal instruction)') '(?:Segmentation fault|Bus error|Illegal instruction)')
def test_sigsegv(self): def test_sigsegv(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler._sigsegv() faulthandler._sigsegv()
""".strip(), """,
3, 3,
'Segmentation fault') 'Segmentation fault')
def test_sigabrt(self): def test_sigabrt(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler._sigabrt() faulthandler._sigabrt()
""".strip(), """,
3, 3,
'Aborted') 'Aborted')
...@@ -130,10 +132,10 @@ faulthandler._sigabrt() ...@@ -130,10 +132,10 @@ faulthandler._sigabrt()
"SIGFPE cannot be caught on Windows") "SIGFPE cannot be caught on Windows")
def test_sigfpe(self): def test_sigfpe(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler._sigfpe() faulthandler._sigfpe()
""".strip(), """,
3, 3,
'Floating point exception') 'Floating point exception')
...@@ -141,13 +143,13 @@ faulthandler._sigfpe() ...@@ -141,13 +143,13 @@ faulthandler._sigfpe()
@unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS') @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
def test_sigbus(self): def test_sigbus(self):
self.check_fatal_error(""" self.check_fatal_error("""
import _testcapi import _testcapi
import faulthandler import faulthandler
import signal import signal
faulthandler.enable() faulthandler.enable()
_testcapi.raise_signal(signal.SIGBUS) _testcapi.raise_signal(signal.SIGBUS)
""".strip(), """,
6, 6,
'Bus error') 'Bus error')
...@@ -155,21 +157,21 @@ _testcapi.raise_signal(signal.SIGBUS) ...@@ -155,21 +157,21 @@ _testcapi.raise_signal(signal.SIGBUS)
@unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL') @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
def test_sigill(self): def test_sigill(self):
self.check_fatal_error(""" self.check_fatal_error("""
import _testcapi import _testcapi
import faulthandler import faulthandler
import signal import signal
faulthandler.enable() faulthandler.enable()
_testcapi.raise_signal(signal.SIGILL) _testcapi.raise_signal(signal.SIGILL)
""".strip(), """,
6, 6,
'Illegal instruction') 'Illegal instruction')
def test_fatal_error(self): def test_fatal_error(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler._fatal_error(b'xyz') faulthandler._fatal_error(b'xyz')
""".strip(), """,
2, 2,
'xyz') 'xyz')
...@@ -180,52 +182,52 @@ faulthandler._fatal_error(b'xyz') ...@@ -180,52 +182,52 @@ faulthandler._fatal_error(b'xyz')
'need faulthandler._stack_overflow()') 'need faulthandler._stack_overflow()')
def test_stack_overflow(self): def test_stack_overflow(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler._stack_overflow() faulthandler._stack_overflow()
""".strip(), """,
3, 3,
'(?:Segmentation fault|Bus error)', '(?:Segmentation fault|Bus error)',
other_regex='unable to raise a stack overflow') other_regex='unable to raise a stack overflow')
def test_gil_released(self): def test_gil_released(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler._read_null(True) faulthandler._read_null(True)
""".strip(), """,
3, 3,
'(?:Segmentation fault|Bus error|Illegal instruction)') '(?:Segmentation fault|Bus error|Illegal instruction)')
def test_enable_file(self): def test_enable_file(self):
with temporary_filename() as filename: with temporary_filename() as filename:
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
output = open({filename}, 'wb') output = open({filename}, 'wb')
faulthandler.enable(output) faulthandler.enable(output)
faulthandler._sigsegv() faulthandler._sigsegv()
""".strip().format(filename=repr(filename)), """.format(filename=repr(filename)),
4, 4,
'Segmentation fault', 'Segmentation fault',
filename=filename) filename=filename)
def test_enable_single_thread(self): def test_enable_single_thread(self):
self.check_fatal_error(""" self.check_fatal_error("""
import faulthandler import faulthandler
faulthandler.enable(all_threads=False) faulthandler.enable(all_threads=False)
faulthandler._sigsegv() faulthandler._sigsegv()
""".strip(), """,
3, 3,
'Segmentation fault', 'Segmentation fault',
all_threads=False) all_threads=False)
def test_disable(self): def test_disable(self):
code = """ code = """
import faulthandler import faulthandler
faulthandler.enable() faulthandler.enable()
faulthandler.disable() faulthandler.disable()
faulthandler._sigsegv() faulthandler._sigsegv()
""".strip() """
not_expected = 'Fatal Python error' not_expected = 'Fatal Python error'
stderr, exitcode = self.get_output(code) stderr, exitcode = self.get_output(code)
stder = '\n'.join(stderr) stder = '\n'.join(stderr)
...@@ -293,20 +295,20 @@ faulthandler._sigsegv() ...@@ -293,20 +295,20 @@ faulthandler._sigsegv()
Raise an error if the output doesn't match the expected format. Raise an error if the output doesn't match the expected format.
""" """
code = """ code = """
import faulthandler import faulthandler
def funcB(): def funcB():
if {has_filename}: if {has_filename}:
with open({filename}, "wb") as fp: with open({filename}, "wb") as fp:
faulthandler.dump_traceback(fp, all_threads=False) faulthandler.dump_traceback(fp, all_threads=False)
else: else:
faulthandler.dump_traceback(all_threads=False) faulthandler.dump_traceback(all_threads=False)
def funcA(): def funcA():
funcB() funcB()
funcA() funcA()
""".strip() """
code = code.format( code = code.format(
filename=repr(filename), filename=repr(filename),
has_filename=bool(filename), has_filename=bool(filename),
...@@ -337,13 +339,13 @@ funcA() ...@@ -337,13 +339,13 @@ funcA()
func_name = 'x' * (maxlen + 50) func_name = 'x' * (maxlen + 50)
truncated = 'x' * maxlen + '...' truncated = 'x' * maxlen + '...'
code = """ code = """
import faulthandler import faulthandler
def {func_name}(): def {func_name}():
faulthandler.dump_traceback(all_threads=False) faulthandler.dump_traceback(all_threads=False)
{func_name}() {func_name}()
""".strip() """
code = code.format( code = code.format(
func_name=func_name, func_name=func_name,
) )
...@@ -363,37 +365,37 @@ def {func_name}(): ...@@ -363,37 +365,37 @@ def {func_name}():
Raise an error if the output doesn't match the expected format. Raise an error if the output doesn't match the expected format.
""" """
code = """ code = """
import faulthandler import faulthandler
from threading import Thread, Event from threading import Thread, Event
import time import time
def dump(): def dump():
if {filename}: if {filename}:
with open({filename}, "wb") as fp: with open({filename}, "wb") as fp:
faulthandler.dump_traceback(fp, all_threads=True) faulthandler.dump_traceback(fp, all_threads=True)
else: else:
faulthandler.dump_traceback(all_threads=True) faulthandler.dump_traceback(all_threads=True)
class Waiter(Thread): class Waiter(Thread):
# avoid blocking if the main thread raises an exception. # avoid blocking if the main thread raises an exception.
daemon = True daemon = True
def __init__(self): def __init__(self):
Thread.__init__(self) Thread.__init__(self)
self.running = Event() self.running = Event()
self.stop = Event() self.stop = Event()
def run(self): def run(self):
self.running.set() self.running.set()
self.stop.wait() self.stop.wait()
waiter = Waiter() waiter = Waiter()
waiter.start() waiter.start()
waiter.running.wait() waiter.running.wait()
dump() dump()
waiter.stop.set() waiter.stop.set()
waiter.join() waiter.join()
""".strip() """
code = code.format(filename=repr(filename)) code = code.format(filename=repr(filename))
output, exitcode = self.get_output(code, filename) output, exitcode = self.get_output(code, filename)
output = '\n'.join(output) output = '\n'.join(output)
...@@ -402,17 +404,17 @@ waiter.join() ...@@ -402,17 +404,17 @@ waiter.join()
else: else:
lineno = 10 lineno = 10
regex = """ regex = """
^Thread 0x[0-9a-f]+ \(most recent call first\): ^Thread 0x[0-9a-f]+ \(most recent call first\):
(?: File ".*threading.py", line [0-9]+ in [_a-z]+ (?: File ".*threading.py", line [0-9]+ in [_a-z]+
){{1,3}} File "<string>", line 23 in run ){{1,3}} File "<string>", line 23 in run
File ".*threading.py", line [0-9]+ in _bootstrap_inner File ".*threading.py", line [0-9]+ in _bootstrap_inner
File ".*threading.py", line [0-9]+ in _bootstrap File ".*threading.py", line [0-9]+ in _bootstrap
Current thread XXX \(most recent call first\): Current thread XXX \(most recent call first\):
File "<string>", line {lineno} in dump File "<string>", line {lineno} in dump
File "<string>", line 28 in <module>$ File "<string>", line 28 in <module>$
""".strip() """
regex = regex.format(lineno=lineno) regex = dedent(regex.format(lineno=lineno)).strip()
self.assertRegex(output, regex) self.assertRegex(output, regex)
self.assertEqual(exitcode, 0) self.assertEqual(exitcode, 0)
...@@ -433,29 +435,29 @@ Current thread XXX \(most recent call first\): ...@@ -433,29 +435,29 @@ Current thread XXX \(most recent call first\):
""" """
timeout_str = str(datetime.timedelta(seconds=TIMEOUT)) timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
code = """ code = """
import faulthandler import faulthandler
import time import time
def func(timeout, repeat, cancel, file, loops): def func(timeout, repeat, cancel, file, loops):
for loop in range(loops): for loop in range(loops):
faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
if cancel: if cancel:
faulthandler.cancel_dump_traceback_later() faulthandler.cancel_dump_traceback_later()
time.sleep(timeout * 5) time.sleep(timeout * 5)
faulthandler.cancel_dump_traceback_later() faulthandler.cancel_dump_traceback_later()
timeout = {timeout} timeout = {timeout}
repeat = {repeat} repeat = {repeat}
cancel = {cancel} cancel = {cancel}
loops = {loops} loops = {loops}
if {has_filename}: if {has_filename}:
file = open({filename}, "wb") file = open({filename}, "wb")
else: else:
file = None file = None
func(timeout, repeat, cancel, file, loops) func(timeout, repeat, cancel, file, loops)
if file is not None: if file is not None:
file.close() file.close()
""".strip() """
code = code.format( code = code.format(
timeout=TIMEOUT, timeout=TIMEOUT,
repeat=repeat, repeat=repeat,
...@@ -522,45 +524,45 @@ if file is not None: ...@@ -522,45 +524,45 @@ if file is not None:
""" """
signum = signal.SIGUSR1 signum = signal.SIGUSR1
code = """ code = """
import faulthandler import faulthandler
import os import os
import signal import signal
import sys import sys
def func(signum): def func(signum):
os.kill(os.getpid(), signum) os.kill(os.getpid(), signum)
def handler(signum, frame): def handler(signum, frame):
handler.called = True handler.called = True
handler.called = False handler.called = False
exitcode = 0 exitcode = 0
signum = {signum} signum = {signum}
unregister = {unregister} unregister = {unregister}
chain = {chain} chain = {chain}
if {has_filename}: if {has_filename}:
file = open({filename}, "wb") file = open({filename}, "wb")
else: else:
file = None file = None
if chain: if chain:
signal.signal(signum, handler) signal.signal(signum, handler)
faulthandler.register(signum, file=file, faulthandler.register(signum, file=file,
all_threads={all_threads}, chain={chain}) all_threads={all_threads}, chain={chain})
if unregister: if unregister:
faulthandler.unregister(signum) faulthandler.unregister(signum)
func(signum) func(signum)
if chain and not handler.called: if chain and not handler.called:
if file is not None: if file is not None:
output = file output = file
else: else:
output = sys.stderr output = sys.stderr
print("Error: signal handler not called!", file=output) print("Error: signal handler not called!", file=output)
exitcode = 1 exitcode = 1
if file is not None: if file is not None:
file.close() file.close()
sys.exit(exitcode) sys.exit(exitcode)
""".strip() """
code = code.format( code = code.format(
filename=repr(filename), filename=repr(filename),
has_filename=bool(filename), has_filename=bool(filename),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment