test_signal.py 17.3 KB
Newer Older
1 2
import unittest
from test import test_support
3
from contextlib import closing
4
import gc
5 6
import pickle
import select
Guido van Rossum's avatar
Guido van Rossum committed
7
import signal
8
import subprocess
9
import traceback
10 11 12
import sys, os, time, errno

if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
13
    raise unittest.SkipTest("Can't test signal on %s" % \
14 15
                                   sys.platform)

16

17 18
class HandlerBCalled(Exception):
    pass
Guido van Rossum's avatar
Guido van Rossum committed
19

20 21 22 23 24 25 26 27 28 29 30

def exit_subprocess():
    """Use os._exit(0) to exit the current subprocess.

    Otherwise, the test catches the SystemExit and continues executing
    in parallel with the original test, so you wind up with an
    exponential number of tests running concurrently.
    """
    os._exit(0)


31 32 33
def ignoring_eintr(__func, *args, **kwargs):
    try:
        return __func(*args, **kwargs)
34
    except EnvironmentError as e:
35
        if e.errno != errno.EINTR:
36 37 38 39
            raise
        return None


40 41 42
class InterProcessSignalTests(unittest.TestCase):
    MAX_DURATION = 20   # Entire test should last at most 20 sec.

43 44 45 46 47 48 49 50
    def setUp(self):
        self.using_gc = gc.isenabled()
        gc.disable()

    def tearDown(self):
        if self.using_gc:
            gc.enable()

51 52 53 54
    def format_frame(self, frame, limit=None):
        return ''.join(traceback.format_stack(frame, limit=limit))

    def handlerA(self, signum, frame):
55 56
        self.a_called = True
        if test_support.verbose:
57 58
            print "handlerA invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1))
59

60
    def handlerB(self, signum, frame):
61 62
        self.b_called = True
        if test_support.verbose:
63 64 65
            print "handlerB invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1))
        raise HandlerBCalled(signum, self.format_frame(frame))
66

67 68
    def wait(self, child):
        """Wait for child to finish, ignoring EINTR."""
69 70
        while True:
            try:
71
                child.wait()
72
                return
73 74 75
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise
76

77 78 79 80 81 82 83 84 85 86 87
    def run_test(self):
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False
88

89 90
        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
91 92 93
        if test_support.verbose:
            print "test runner's pid is", pid

94 95 96 97 98
        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
99 100 101
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False
102

103 104 105 106
        # Make sure the signal isn't delivered while the previous
        # Popen object is being destroyed, because __del__ swallows
        # exceptions.
        del child
107
        try:
108
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
109 110
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
111
            time.sleep(1)  # Give the signal time to be delivered.
112 113 114 115
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
116
            if test_support.verbose:
117
                print "HandlerBCalled exception caught"
118

119 120 121
        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.
122 123 124 125 126 127 128

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardException, which
            # will skip the call.
            signal.pause()
129 130 131
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
132 133 134 135
        except KeyboardInterrupt:
            if test_support.verbose:
                print "KeyboardInterrupt (the alarm() went off)"
        except:
136
            self.fail("Some other exception woke us from pause: %s" %
137 138
                      traceback.format_exc())
        else:
139 140
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")
141

142
    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
143 144 145
    @unittest.skipIf(sys.platform=='freebsd6',
        'inter process signals not reliable (do not mix well with threading) '
        'on freebsd6')
146 147 148 149 150 151 152
    def test_main(self):
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
153 154
        with closing(os.fdopen(os_done_r)) as done_r, \
             closing(os.fdopen(os_done_w, 'w')) as done_w:
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
            child = os.fork()
            if child == 0:
                # In the child process; run the test and report results
                # through the pipe.
                try:
                    done_r.close()
                    # Have to close done_w again here because
                    # exit_subprocess() will skip the enclosing with block.
                    with closing(done_w):
                        try:
                            self.run_test()
                        except:
                            pickle.dump(traceback.format_exc(), done_w)
                        else:
                            pickle.dump(None, done_w)
                except:
                    print 'Uh oh, raised from pickle.'
                    traceback.print_exc()
                finally:
                    exit_subprocess()

            done_w.close()
            # Block for up to MAX_DURATION seconds for the test to finish.
            r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
            if done_r in r:
                tb = pickle.load(done_r)
                if tb:
                    self.fail(tb)
            else:
                os.kill(child, signal.SIGKILL)
                self.fail('Test deadlocked after %d seconds.' %
                          self.MAX_DURATION)
187 188 189


class BasicSignalTests(unittest.TestCase):
190 191 192
    def trivial_signal_handler(self, *args):
        pass

193 194 195 196
    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
197
                          self.trivial_signal_handler)
198 199 200 201 202

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

203 204 205 206 207 208 209 210
    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        self.assertEquals(signal.getsignal(signal.SIGHUP),
                          self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEquals(signal.getsignal(signal.SIGHUP), hup)


Guido van Rossum's avatar
Guido van Rossum committed
211 212 213 214 215 216 217 218 219 220 221 222 223
class WakeupSignalTests(unittest.TestCase):
    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def test_wakeup_fd_early(self):
        import select

        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        mid_time = time.time()
224
        self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
Guido van Rossum's avatar
Guido van Rossum committed
225 226
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
227
        self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
Guido van Rossum's avatar
Guido van Rossum committed
228 229 230 231 232 233 234 235 236 237

    def test_wakeup_fd_during(self):
        import select

        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the select call
        self.assertRaises(select.error, select.select,
            [self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
238
        self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
Guido van Rossum's avatar
Guido van Rossum committed
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255

    def setUp(self):
        import fcntl

        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
        self.read, self.write = os.pipe()
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

256 257
class SiginterruptTest(unittest.TestCase):
    signum = signal.SIGUSR1
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274

    def setUp(self):
        """Install a no-op signal handler that can be set to allow
        interrupts or not, and arrange for the original signal handler to be
        re-installed when the test is finished.
        """
        oldhandler = signal.signal(self.signum, lambda x,y: None)
        self.addCleanup(signal.signal, self.signum, oldhandler)

    def readpipe_interrupted(self):
        """Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        """
        # Create a pipe that can be used for the read.  Also clean it up
        # when the test is over, since nothing else will (but see below for
        # the write end).
275
        r, w = os.pipe()
276 277 278 279
        self.addCleanup(os.close, r)

        # Create another process which can send a signal to this one to try
        # to interrupt the read.
280 281 282
        ppid = os.getpid()
        pid = os.fork()

283 284 285 286 287 288 289 290
        if pid == 0:
            # Child code: sleep to give the parent enough time to enter the
            # read() call (there's a race here, but it's really tricky to
            # eliminate it); then signal the parent process.  Also, sleep
            # again to make it likely that the signal is delivered to the
            # parent process before the child exits.  If the child exits
            # first, the write end of the pipe will be closed and the test
            # is invalid.
291 292 293 294 295
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
296
                # No matter what, just exit as fast as possible now.
297
                exit_subprocess()
298 299 300 301 302 303 304 305 306 307 308
        else:
            # Parent code.
            # Make sure the child is eventually reaped, else it'll be a
            # zombie for the rest of the test suite run.
            self.addCleanup(os.waitpid, pid, 0)

            # Close the write end of the pipe.  The child has a copy, so
            # it's not really closed until the child exits.  We need it to
            # close when the child exits so that in the non-interrupt case
            # the read eventually completes, otherwise we could just close
            # it *after* the test.
309 310
            os.close(w)

311 312
            # Try the read and report whether it is interrupted or not to
            # the caller.
313
            try:
314
                d = os.read(r, 1)
315 316 317 318 319 320 321
                return False
            except OSError, err:
                if err.errno != errno.EINTR:
                    raise
                return True

    def test_without_siginterrupt(self):
322 323 324 325 326 327 328 329 330
        """If a signal handler is installed and siginterrupt is not called
        at all, when that signal arrives, it interrupts a syscall that's in
        progress.
        """
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)
331 332

    def test_siginterrupt_on(self):
333 334 335 336 337 338 339 340 341 342
        """If a signal handler is installed and siginterrupt is called with
        a true value for the second argument, when that signal arrives, it
        interrupts a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 1)
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)
343 344

    def test_siginterrupt_off(self):
345 346 347 348 349 350 351 352 353 354 355 356
        """If a signal handler is installed and siginterrupt is called with
        a false value for the second argument, when that signal arrives, it
        does not interrupt a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 0)
        i = self.readpipe_interrupted()
        self.assertFalse(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertFalse(i)


Guido van Rossum's avatar
Guido van Rossum committed
357

358 359 360 361 362
class ItimerTest(unittest.TestCase):
    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
363
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
364 365

    def tearDown(self):
366
        signal.signal(signal.SIGALRM, self.old_alarm)
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)

    def sig_alrm(self, *args):
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked", args)

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
405 406 407 408
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)
409 410 411 412 413 414 415 416 417 418

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

419
    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
420 421
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
422 423 424 425 426
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

427
        start_time = time.time()
428
        while time.time() - start_time < 60.0:
429 430
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
431 432
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
433 434 435 436
        else: # Issue 8424
            sys.stdout.write("test_itimer_virtual: timeout: likely cause: "
                             "machine too slow or load too high.\n")
            return
437 438 439 440 441 442

        # virtual itimer should be (0.0, 0.0) now
        self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEquals(self.hndl_called, True)

443
    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
444 445
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
446 447 448
    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
449
        signal.setitimer(self.itimer, 0.2, 0.2)
450

451
        start_time = time.time()
452
        while time.time() - start_time < 60.0:
453 454
            # do some work
            _ = pow(12345, 67890, 10000019)
455 456
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
457 458 459 460
        else: # Issue 8424
            sys.stdout.write("test_itimer_prof: timeout: likely cause: "
                             "machine too slow or load too high.\n")
            return
461

462 463 464
        # profiling itimer should be (0.0, 0.0) now
        self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
465 466
        self.assertEqual(self.hndl_called, True)

467
def test_main():
Guido van Rossum's avatar
Guido van Rossum committed
468
    test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
469
        WakeupSignalTests, SiginterruptTest, ItimerTest)
470 471 472 473


if __name__ == "__main__":
    test_main()