# test_faulthandler.py
from contextlib import contextmanager
2
import datetime
3
import faulthandler
4
import os
5 6 7
import signal
import subprocess
import sys
8
from test import support
9
from test.support import script_helper, is_android, requires_android_level
10
import tempfile
11
import threading
12
import unittest
13
from textwrap import dedent
14

15 16 17 18
try:
    import _testcapi
except ImportError:
    _testcapi = None
19

20
# Delay in seconds passed to faulthandler.dump_traceback_later() by the
# dump_traceback_later tests.
TIMEOUT = 0.5
# True when running on Windows (os.name is 'nt').
MS_WINDOWS = (os.name == 'nt')
22

23
def expected_traceback(lineno1, lineno2, header, min_count=1):
    """Build a regular expression matching a dumped traceback.

    The expected traceback is *header* (already a regex fragment) followed
    by a frame in ``func`` at line *lineno1* and a frame in ``<module>`` at
    line *lineno2* of the ``<string>`` script.

    If *min_count* is greater than 1, the pattern matches *min_count*
    consecutive copies of the traceback and is only anchored at the start,
    so more output may follow.  Otherwise the pattern matches exactly one
    traceback and is anchored at both ends.
    """
    regex = header
    regex += '  File "<string>", line %s in func\n' % lineno1
    regex += '  File "<string>", line %s in <module>' % lineno2
    if min_count > 1:
        # No trailing '$': extra tracebacks are allowed after the minimum.
        return '^' + (regex + '\n') * (min_count - 1) + regex
    return '^' + regex + '$'
31 32 33 34 35 36 37 38 39

@contextmanager
def temporary_filename():
    """Context manager yielding a temporary file name.

    The name comes from tempfile.mktemp(), so no file is created here; the
    code under test is expected to create it.  On exit, the path is removed
    with support.unlink(), whether or not the body raised.
    """
    path = tempfile.mktemp()
    try:
        yield path
    finally:
        support.unlink(path)

40 41
def requires_raise(test):
    """Decorator for tests marked as depending on raise().

    Return *test* unchanged when not running on Android.  On Android, wrap
    it with requires_android_level(24, 'raise() is buggy').
    """
    if not is_android:
        return test
    return requires_android_level(24, 'raise() is buggy')(test)
43

44
class FaultHandlerTests(unittest.TestCase):
    # Most tests run a short script in a child Python process (see
    # get_output()) and compare what faulthandler wrote with an expected
    # text or regular expression.  The line numbers hard-coded in the
    # expectations refer to lines of the dedented and stripped child script,
    # so the embedded scripts must not be reformatted.

    def get_output(self, code, filename=None, fd=None):
        """
        Run the specified code in Python (in a new child process) and read
        the output captured by script_helper.spawn_python(), or the content
        of a file (if filename is set), or the content of a file descriptor
        (if fd is set).

        When filename or fd is given, the captured output of the child must
        be empty.  fd is passed to the child through pass_fds, then read
        back from its beginning without being closed.

        Return a (lines, exitcode) tuple: lines is the list of output lines,
        decoded from ASCII with the backslashreplace error handler, and
        exitcode is the exit code of the child process.
        """
        code = dedent(code).strip()
        pass_fds = []
        if fd is not None:
            pass_fds.append(fd)
        with support.SuppressCrashReport():
            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
            with process:
                stdout, _ = process.communicate()
                exitcode = process.wait()
        output = support.strip_python_stderr(stdout)
        output = output.decode('ascii', 'backslashreplace')
        if filename:
            self.assertEqual(output, '')
            with open(filename, "rb") as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        elif fd is not None:
            self.assertEqual(output, '')
            # Rewind: os.lseek() takes (fd, position, whence).
            os.lseek(fd, 0, os.SEEK_SET)
            with open(fd, "rb", closefd=False) as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        return output.splitlines(), exitcode

    def check_error(self, code, line_number, fatal_error, *,
                    filename=None, all_threads=True, other_regex=None,
                    fd=None, know_current_thread=True):
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        The expected output is the fatal_error regex, an empty line, a
        traceback header and a single "<module>" frame at line_number.  The
        header is "Stack" if all_threads is false, otherwise "Current thread
        0x..." or, if know_current_thread is false, "Thread 0x...".  If
        other_regex is set, it is accepted as an alternative to the whole
        expected output.  filename and fd are forwarded to get_output().

        Raise an error if the output doesn't match the expected format or if
        the child process exits with code 0.
        """
        if all_threads:
            if know_current_thread:
                header = 'Current thread 0x[0-9a-f]+'
            else:
                header = 'Thread 0x[0-9a-f]+'
        else:
            header = 'Stack'
        regex = r"""
            ^{fatal_error}

            {header} \(most recent call first\):
              File "<string>", line {lineno} in <module>
            """
        regex = dedent(regex.format(
            lineno=line_number,
            fatal_error=fatal_error,
            header=header)).strip()
        if other_regex:
            regex += '|' + other_regex
        output, exitcode = self.get_output(code, filename=filename, fd=fd)
        output = '\n'.join(output)
        self.assertRegex(output, regex)
        self.assertNotEqual(exitcode, 0)

    def check_fatal_error(self, code, line_number, name_regex, **kw):
        # check_error() with a "Fatal Python error: <name_regex>" first line.
        fatal_error = 'Fatal Python error: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    def check_windows_exception(self, code, line_number, name_regex, **kw):
        # check_error() with a "Windows fatal exception: <name_regex>" first
        # line.
        fatal_error = 'Windows fatal exception: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    @unittest.skipIf(sys.platform.startswith('aix'),
                     "the first page of memory is a mapped read-only on AIX")
    def test_read_null(self):
        if not MS_WINDOWS:
            self.check_fatal_error("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
                '(?:Segmentation fault'
                    '|Bus error'
                    '|Illegal instruction)')
        else:
            self.check_windows_exception("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                'access violation')

    @requires_raise
    def test_sigsegv(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault')

    def test_fatal_error_c_thread(self):
        # The traceback header is "Thread 0x..." rather than "Current thread
        # 0x..." here, hence know_current_thread=False.
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            """,
            3,
            'in new thread',
            know_current_thread=False)

    def test_sigabrt(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            """,
            3,
            'Aborted')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            """,
            3,
            'Floating point exception')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
    @requires_raise
    def test_sigbus(self):
        self.check_fatal_error("""
            import _testcapi
            import faulthandler
            import signal

            faulthandler.enable()
            _testcapi.raise_signal(signal.SIGBUS)
            """,
            6,
            'Bus error')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
    @requires_raise
    def test_sigill(self):
        self.check_fatal_error("""
            import _testcapi
            import faulthandler
            import signal

            faulthandler.enable()
            _testcapi.raise_signal(signal.SIGILL)
            """,
            6,
            'Illegal instruction')

    def test_fatal_error(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler._fatal_error(b'xyz')
            """,
            2,
            'xyz')

    def test_fatal_error_without_gil(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler._fatal_error(b'xyz', True)
            """,
            2,
            'xyz')

    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        # other_regex accepts the "unable to raise a stack overflow" message
        # as an alternative to the fatal error traceback.
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            """,
            3,
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')

    @requires_raise
    def test_gil_released(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            """,
            3,
            'Segmentation fault')

    @requires_raise
    def test_enable_file(self):
        # The child enables faulthandler on a file; the traceback is read
        # back from that file by get_output().
        with temporary_filename() as filename:
            self.check_fatal_error("""
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                """.format(filename=repr(filename)),
                4,
                'Segmentation fault',
                filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    @requires_raise
    def test_enable_fd(self):
        # The child enables faulthandler on a file descriptor inherited from
        # this process; the traceback is read back from the same descriptor.
        with tempfile.TemporaryFile('wb+') as fp:
            fd = fp.fileno()
            self.check_fatal_error("""
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                """ % fd,
                4,
                'Segmentation fault',
                fd=fd)

    @requires_raise
    def test_enable_single_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault',
            all_threads=False)

    @requires_raise
    def test_disable(self):
        # After disable(), the crash must not produce a faulthandler report,
        # but the child still exits with a non-zero code.
        code = """
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            """
        not_expected = 'Fatal Python error'
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        self.assertNotIn(not_expected, stderr)
        self.assertNotEqual(exitcode, 0)

    def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                # Restore the state found when the test started.
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr

    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = [sys.executable]
        if sys.flags.ignore_environment:
            args.append("-E")
        args.extend(("-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = [sys.executable]
        if sys.flags.ignore_environment:
            args.append("-E")
        args.extend(("-X", "faulthandler", "-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def test_env_var(self):
        # empty env var
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-c", code)
        env = os.environ.copy()
        env['PYTHONFAULTHANDLER'] = ''
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

        # non-empty env var
        env = os.environ.copy()
        env['PYTHONFAULTHANDLER'] = '1'
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def check_dump_traceback(self, *, filename=None, fd=None):
        """
        Explicitly call dump_traceback() function and check its output.

        The child dumps the traceback to filename if it is set, else to fd
        if it is not None, else to the default destination; the line number
        expected for funcB depends on which branch runs.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            """
        code = code.format(
            filename=filename,
            fd=fd,
        )
        if filename:
            lineno = 9
        elif fd is not None:
            lineno = 12
        else:
            lineno = 14
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 17 in funcA',
            '  File "<string>", line 19 in <module>'
        ]
        trace, exitcode = self.get_output(code, filename, fd)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback(self):
        self.check_dump_traceback()

    def test_dump_traceback_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback(fd=fp.fileno())

    def test_truncate(self):
        # A function name of maxlen + 50 characters is expected to be dumped
        # as its first maxlen characters followed by '...'.
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            """
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.

        The child starts a second thread that blocks on an event, then dumps
        the traceback of all threads to filename if it is set, else to the
        default destination.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            """
        code = code.format(filename=repr(filename))
        output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        if filename:
            lineno = 8
        else:
            lineno = 10
        # The braces of the {1,3} quantifier are doubled because the pattern
        # goes through str.format().
        regex = r"""
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            """
        regex = dedent(regex.format(lineno=lineno)).strip()
        self.assertRegex(output, regex)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)

    def test_dump_traceback_threads_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
                     'need faulthandler.dump_traceback_later()')
    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
                                   *, filename=None, fd=None):
        """
        Check the tracebacks written by dump_traceback_later().

        For each of the loops iterations, the child schedules a dump after
        TIMEOUT seconds, cancels it immediately if cancel is true, sleeps
        for TIMEOUT x 5 seconds and then cancels the dump.  If cancel is
        false, at least loops tracebacks are expected (twice as many if
        repeat is true); otherwise the output must be empty.

        NOTE(review): fd is not forwarded to get_output(); when fd is set,
        the child dumps to sys.stderr.fileno() instead.

        Raise an error if the output doesn't match the expected format.
        """
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
        code = """
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            """
        code = code.format(
            timeout=TIMEOUT,
            repeat=repeat,
            cancel=cancel,
            loops=loops,
            filename=filename,
            fd=fd,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)

        if not cancel:
            count = loops
            if repeat:
                count *= 2
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
            regex = expected_traceback(17, 26, header, min_count=count)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()

    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)

    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)

    def test_dump_traceback_later_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_later(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_later_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback_later(fd=fp.fileno())

    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(loops=2)

    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False,
                       unregister=False, chain=False, fd=None):
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        If chain is True, check that the previous signal handler is called.

        If unregister is True, the handler is unregistered before the signal
        is raised: the output must be empty and the exit code non-zero.

        NOTE(review): fd is not forwarded to get_output(); when fd is set,
        the child registers the handler on sys.stderr.fileno() instead.

        Raise an error if the output doesn't match the expected format.
        """
        signum = signal.SIGUSR1
        code = """
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            """
        code = code.format(
            all_threads=all_threads,
            signum=signum,
            unregister=unregister,
            chain=chain,
            filename=filename,
            fd=fd,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)
        if not unregister:
            if all_threads:
                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
            else:
                regex = r'Stack \(most recent call first\):\n'
            regex = expected_traceback(14, 32, regex)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        if unregister:
            self.assertNotEqual(exitcode, 0)
        else:
            self.assertEqual(exitcode, 0)

    def test_register(self):
        self.check_register()

    def test_unregister(self):
        self.check_register(unregister=True)

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_register_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_register(fd=fp.fileno())

    def test_register_threads(self):
        self.check_register(all_threads=True)

    def test_register_chain(self):
        self.check_register(chain=True)

    @contextmanager
    def check_stderr_none(self):
        # Run the body of the with statement while sys.stderr is None and
        # check that it raises RuntimeError("sys.stderr is None").
        # sys.stderr is always restored on exit.
        stderr = sys.stderr
        try:
            sys.stderr = None
            with self.assertRaises(RuntimeError) as cm:
                yield
            self.assertEqual(str(cm.exception), "sys.stderr is None")
        finally:
            sys.stderr = stderr

    def test_stderr_None(self):
        # Issue #21497: provide a helpful error if sys.stderr is None,
        # instead of just an attribute error: "None has no attribute fileno".
        with self.check_stderr_none():
            faulthandler.enable()
        with self.check_stderr_none():
            faulthandler.dump_traceback()
        if hasattr(faulthandler, 'dump_traceback_later'):
            with self.check_stderr_none():
                faulthandler.dump_traceback_later(1e-3)
        if hasattr(faulthandler, "register"):
            with self.check_stderr_none():
                faulthandler.register(signal.SIGUSR1)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_exception(self):
        for exc, name in (
            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
        ):
            self.check_windows_exception(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._{exc})
                """,
                3,
                name)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_ignore_exception(self):
        # For these exception codes, no output is expected and the child
        # must exit with the exception code itself.
        for exc_code in (
            0xE06D7363,   # MSC exception ("Emsc")
            0xE0434352,   # COM Callable Runtime exception ("ECCR")
        ):
            code = f"""
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception({exc_code})
                    """
            code = dedent(code)
            output, exitcode = self.get_output(code)
            self.assertEqual(output, [])
            self.assertEqual(exitcode, exc_code)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_nonfatal_exception(self):
        # These exceptions are not strictly errors. Letting
        # faulthandler display the traceback when they are
        # raised is likely to result in noise. However, they
        # may still terminate the process if there is no
        # handler installed for them (which there typically
        # is, e.g. for debug messages).
        for exc in (
            0x00000000,
            0x34567890,
            0x40000000,
            0x40001000,
            0x70000000,
            0x7FFFFFFF,
        ):
            output, exitcode = self.get_output(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0x{exc:x})
                """
            )
            self.assertEqual(output, [])
            # On Windows older than 7 SP1, the actual exception code has
            # bit 29 cleared.
            self.assertIn(exitcode,
                          (exc, exc & ~0x10000000))

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_disable_windows_exc_handler(self):
        # After disable(), the access violation must not be reported: the
        # output is empty and the exit code is the exception code.
        code = dedent("""
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        """)
        output, exitcode = self.get_output(code)
        self.assertEqual(output, [])
        self.assertEqual(exitcode, 0xC0000005)
807

808 809

# Run the test suite when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()