threading.py 47.9 KB
Newer Older
1
"""Thread module emulating a subset of Java's threading model."""
2

3
import sys as _sys
4
import _thread
5

6 7 8 9
try:
    from time import monotonic as _time
except ImportError:
    from time import time as _time
10
from traceback import format_exc as _format_exc
11
from _weakrefset import WeakSet
12
from itertools import islice as _islice, count as _count
13 14
try:
    from _collections import deque as _deque
15
except ImportError:
16
    from collections import deque as _deque
17

18 19 20
# Note regarding PEP 8 compliant names
#  This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
21
# language. Those original names are not in any imminent danger of
22 23 24 25 26 27
# being deprecated (even for Py3k),so this module provides them as an
# alias for the PEP 8 compliant names
# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.

28
__all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event',
29
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier',
30
           'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size']
31

32
# Rename some stuff so "from threading import *" is safe
33 34
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
35
_set_sentinel = _thread._set_sentinel
36
get_ident = _thread.get_ident
37
ThreadError = _thread.error
38 39 40 41
try:
    _CRLock = _thread.RLock
except AttributeError:
    _CRLock = None
42
TIMEOUT_MAX = _thread.TIMEOUT_MAX
43
del _thread
44 45


46 47 48 49 50 51
# Support for profile and trace hooks

_profile_hook = None
_trace_hook = None

def setprofile(func):
52 53 54 55 56 57
    """Set a profile function for all threads started from the threading module.

    The func will be passed to sys.setprofile() for each thread, before its
    run() method is called.

    """
58 59
    global _profile_hook
    _profile_hook = func
Tim Peters's avatar
Tim Peters committed
60

61
def settrace(func):
62 63 64 65 66 67
    """Set a trace function for all threads started from the threading module.

    The func will be passed to sys.settrace() for each thread, before its run()
    method is called.

    """
68 69
    global _trace_hook
    _trace_hook = func
70 71 72 73 74

# Synchronization classes

Lock = _allocate_lock

75
def RLock(*args, **kwargs):
76 77 78 79 80 81 82 83
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    """
84 85
    if _CRLock is None:
        return _PyRLock(*args, **kwargs)
86
    return _CRLock(*args, **kwargs)
87

88
class _RLock:
89 90 91 92 93 94 95 96
    """This class implements reentrant lock objects.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it
    again without blocking; the thread must release it once for each time it
    has acquired it.

    """
Tim Peters's avatar
Tim Peters committed
97

98
    def __init__(self):
99 100 101
        self._block = _allocate_lock()
        self._owner = None
        self._count = 0
102 103

    def __repr__(self):
104
        owner = self._owner
105 106 107 108 109 110
        try:
            owner = _active[owner].name
        except KeyError:
            pass
        return "<%s owner=%r count=%d>" % (
                self.__class__.__name__, owner, self._count)
111

112
    def acquire(self, blocking=True, timeout=-1):
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
        """Acquire a lock, blocking or non-blocking.

        When invoked without arguments: if this thread already owns the lock,
        increment the recursion level by one, and return immediately. Otherwise,
        if another thread owns the lock, block until the lock is unlocked. Once
        the lock is unlocked (not owned by any thread), then grab ownership, set
        the recursion level to one, and return. If more than one thread is
        blocked waiting until the lock is unlocked, only one at a time will be
        able to grab ownership of the lock. There is no return value in this
        case.

        When invoked with the blocking argument set to true, do the same thing
        as when called without arguments, and return true.

        When invoked with the blocking argument set to false, do not block. If a
        call without an argument would block, return false immediately;
        otherwise, do the same thing as when called without arguments, and
        return true.

        When invoked with the floating-point timeout argument set to a positive
        value, block for at most the number of seconds specified by timeout
        and as long as the lock cannot be acquired.  Return true if the lock has
        been acquired, false if the timeout has elapsed.

        """
138
        me = get_ident()
139
        if self._owner == me:
140
            self._count += 1
141
            return 1
142
        rc = self._block.acquire(blocking, timeout)
143
        if rc:
144 145
            self._owner = me
            self._count = 1
146 147
        return rc

148 149
    __enter__ = acquire

150
    def release(self):
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
        """Release a lock, decrementing the recursion level.

        If after the decrement it is zero, reset the lock to unlocked (not owned
        by any thread), and if any other threads are blocked waiting for the
        lock to become unlocked, allow exactly one of them to proceed. If after
        the decrement the recursion level is still nonzero, the lock remains
        locked and owned by the calling thread.

        Only call this method when the calling thread owns the lock. A
        RuntimeError is raised if this method is called when the lock is
        unlocked.

        There is no return value.

        """
166
        if self._owner != get_ident():
Georg Brandl's avatar
Georg Brandl committed
167
            raise RuntimeError("cannot release un-acquired lock")
168
        self._count = count = self._count - 1
169
        if not count:
170 171
            self._owner = None
            self._block.release()
172

173 174 175
    def __exit__(self, t, v, tb):
        self.release()

176 177
    # Internal methods used by condition variables

178
    def _acquire_restore(self, state):
179 180
        self._block.acquire()
        self._count, self._owner = state
181 182

    def _release_save(self):
183 184
        if self._count == 0:
            raise RuntimeError("cannot release un-acquired lock")
185 186 187 188 189
        count = self._count
        self._count = 0
        owner = self._owner
        self._owner = None
        self._block.release()
190 191 192
        return (count, owner)

    def _is_owned(self):
193
        return self._owner == get_ident()
194

195 196
_PyRLock = _RLock

197

198
class Condition:
199 200 201 202 203 204 205 206 207 208
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """
209

210
    def __init__(self, lock=None):
211 212
        if lock is None:
            lock = RLock()
213
        self._lock = lock
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
232
        self._waiters = _deque()
233

234
    def __enter__(self):
235
        return self._lock.__enter__()
236

237
    def __exit__(self, *args):
238
        return self._lock.__exit__(*args)
239

240
    def __repr__(self):
241
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
242 243

    def _release_save(self):
244
        self._lock.release()           # No state to save
245 246

    def _acquire_restore(self, x):
247
        self._lock.acquire()           # Ignore saved state
248 249

    def _is_owned(self):
250
        # Return True if lock is owned by current_thread.
251
        # This method is called only if _lock doesn't have _is_owned().
252 253
        if self._lock.acquire(0):
            self._lock.release()
254
            return False
255
        else:
256
            return True
257 258

    def wait(self, timeout=None):
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
281
        if not self._is_owned():
Georg Brandl's avatar
Georg Brandl committed
282
            raise RuntimeError("cannot wait on un-acquired lock")
283 284
        waiter = _allocate_lock()
        waiter.acquire()
285
        self._waiters.append(waiter)
286
        saved_state = self._release_save()
287
        gotit = False
288 289 290
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
291
                gotit = True
292
            else:
293 294 295 296
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
297
            return gotit
298 299
        finally:
            self._acquire_restore(saved_state)
300 301 302 303 304
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass
305

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
306
    def wait_for(self, predicate, timeout=None):
307 308 309 310 311 312 313
        """Wait until a condition evaluates to True.

        predicate should be a callable which result will be interpreted as a
        boolean value.  A timeout may be provided giving the maximum time to
        wait.

        """
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

329
    def notify(self, n=1):
330 331 332 333 334 335 336 337 338
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
339
        if not self._is_owned():
Georg Brandl's avatar
Georg Brandl committed
340
            raise RuntimeError("cannot notify on un-acquired lock")
Raymond Hettinger's avatar
Raymond Hettinger committed
341 342 343
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
344
            return
Raymond Hettinger's avatar
Raymond Hettinger committed
345
        for waiter in waiters_to_notify:
346 347
            waiter.release()
            try:
Raymond Hettinger's avatar
Raymond Hettinger committed
348
                all_waiters.remove(waiter)
349 350 351
            except ValueError:
                pass

352
    def notify_all(self):
353 354 355 356 357 358
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
359
        self.notify(len(self._waiters))
360

361 362
    notifyAll = notify_all

363

364
class Semaphore:
365 366 367 368 369 370 371 372
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    """
373

Andrew M. Kuchling's avatar
Andrew M. Kuchling committed
374
    # After Tim Peters' semaphore class, but not quite the same (no maximum)
375

376
    def __init__(self, value=1):
377 378
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
379 380
        self._cond = Condition(Lock())
        self._value = value
381

382
    def acquire(self, blocking=True, timeout=None):
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return immediately. If it is zero
        on entry, block, waiting until some other thread has called release() to
        make it larger than zero. This is done with proper interlocking so that
        if multiple acquire() calls are blocked, release() will wake exactly one
        of them up. The implementation may pick one at random, so the order in
        which blocked threads are awakened should not be relied on. There is no
        return value in this case.

        When invoked with blocking set to true, do the same thing as when called
        without arguments, and return true.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds.  If acquire does not complete successfully in
        that interval, return false.  Return true otherwise.

        """
406 407
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
408
        rc = False
409
        endtime = None
410 411 412 413 414 415 416 417 418 419 420 421 422
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
423
                self._value -= 1
424
                rc = True
425 426
        return rc

427 428
    __enter__ = acquire

429
    def release(self):
430 431 432 433 434 435
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        """
436
        with self._cond:
437
            self._value += 1
438
            self._cond.notify()
439

440 441 442
    def __exit__(self, t, v, tb):
        self.release()

443

444
class BoundedSemaphore(Semaphore):
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
    """Implements a bounded semaphore.

    A bounded semaphore checks to make sure its current value doesn't exceed its
    initial value. If it does, ValueError is raised. In most situations
    semaphores are used to guard resources with limited capacity.

    If the semaphore is released too many times it's a sign of a bug. If not
    given, value defaults to 1.

    Like regular semaphores, bounded semaphores manage a counter representing
    the number of release() calls minus the number of acquire() calls, plus an
    initial value. The acquire() method blocks if necessary until it can return
    without making the counter negative. If not given, value defaults to 1.

    """

461 462
    def __init__(self, value=1):
        Semaphore.__init__(self, value)
463 464 465
        self._initial_value = value

    def release(self):
466 467 468 469 470 471 472 473 474
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        If the number of releases exceeds the number of acquires,
        raise a ValueError.

        """
475 476 477 478 479
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += 1
            self._cond.notify()
480 481


482
class Event:
483 484 485 486 487 488 489
    """Class implementing event objects.

    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.  The flag is initially false.

    """
490 491 492

    # After Tim Peters' event class (without is_posted())

493
    def __init__(self):
494 495
        self._cond = Condition(Lock())
        self._flag = False
496

497 498
    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
499
        self._cond.__init__(Lock())
500

501
    def is_set(self):
502
        """Return true if and only if the internal flag is true."""
503
        return self._flag
504

505
    isSet = is_set
506

507
    def set(self):
508 509 510 511 512 513
        """Set the internal flag to true.

        All threads waiting for it to become true are awakened. Threads
        that call wait() once the flag is true will not block at all.

        """
514 515
        self._cond.acquire()
        try:
516
            self._flag = True
517
            self._cond.notify_all()
518 519
        finally:
            self._cond.release()
520 521

    def clear(self):
522 523 524 525 526 527
        """Reset the internal flag to false.

        Subsequently, threads calling wait() will block until set() is called to
        set the internal flag to true again.

        """
528 529
        self._cond.acquire()
        try:
530
            self._flag = False
531 532
        finally:
            self._cond.release()
533 534

    def wait(self, timeout=None):
535 536 537 538 539 540 541 542 543 544 545 546 547 548
        """Block until the internal flag is true.

        If the internal flag is true on entry, return immediately. Otherwise,
        block until another thread calls set() to set the flag to true, or until
        the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        This method returns the internal flag on exit, so it will always return
        True except if a timeout is given and the operation times out.

        """
549 550
        self._cond.acquire()
        try:
551 552 553 554
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled
555 556
        finally:
            self._cond.release()
557

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
558 559 560 561 562 563 564 565 566 567 568

# A barrier class.  Inspired in part by the pthread_barrier_* api and
# the CyclicBarrier class from Java.  See
# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
#        CyclicBarrier.html
# for information.
# We maintain two main states, 'filling' and 'draining' enabling the barrier
# to be cyclic.  Threads are not allowed into it until it has fully drained
# since the previous cycle.  In addition, a 'resetting' state exists which is
# similar to 'draining' except that threads leave with a BrokenBarrierError,
Ezio Melotti's avatar
Ezio Melotti committed
569
# and a 'broken' state in which all threads get the exception.
570
class Barrier:
571 572 573 574 575 576
    """Implements a Barrier.

    Useful for synchronizing a fixed number of threads at known synchronization
    points.  Threads block on 'wait()' and are simultaneously once they have all
    made that call.

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
577
    """
578

579
    def __init__(self, parties, action=None, timeout=None):
580 581 582 583 584 585 586
        """Create a barrier, initialised to 'parties' threads.

        'action' is a callable which, when supplied, will be called by one of
        the threads after they have all entered the barrier and just prior to
        releasing them all. If a 'timeout' is provided, it is uses as the
        default for all subsequent 'wait()' calls.

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
587 588 589 590 591 592 593 594 595
        """
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
        self._count = 0

    def wait(self, timeout=None):
596 597 598 599 600
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are all
        simultaneously awoken. If an 'action' was provided for the barrier, one
        of the threads will have executed that callback prior to returning.
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
601
        Returns an individual index number from 0 to 'parties-1'.
602

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650
        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter() # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

    # Block until the barrier is ready for us, or raise an exception
    # if it is broken.
    def _enter(self):
        while self._state in (-1, 1):
            # It is draining or resetting, wait until done
            self._cond.wait()
        #see if the barrier is in a broken state
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    # Optionally run the 'action' and release the threads waiting
    # in the barrier.
    def _release(self):
        try:
            if self._action:
                self._action()
            # enter draining state
            self._state = 1
            self._cond.notify_all()
        except:
            #an exception during the _action handler.  Break and reraise
            self._break()
            raise

    # Wait in the barrier until we are relased.  Raise an exception
    # if the barrier is reset or broken.
    def _wait(self, timeout):
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
651 652 653 654 655 656
        if not self._cond.wait_for(lambda : self._state != 0, timeout):
            #timed out.  Break the barrier
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
657 658 659 660 661 662 663 664 665 666 667 668
        assert self._state == 1

    # If we are the last thread to exit the barrier, signal any threads
    # waiting for the barrier to drain.
    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                #resetting or draining
                self._state = 0
                self._cond.notify_all()

    def reset(self):
669 670
        """Reset the barrier to the initial state.

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
671 672
        Any threads currently waiting will get the BrokenBarrier exception
        raised.
673

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    #reset the barrier, waking up threads
                    self._state = -1
                elif self._state == -2:
                    #was broken, set it to reset state
                    #which clears when the last thread exits
                    self._state = -1
            else:
                self._state = 0
            self._cond.notify_all()

    def abort(self):
689 690 691 692 693
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting threads and threads
        attempting to 'wait()' will have BrokenBarrierError raised.

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
694 695 696 697 698 699 700 701 702 703 704 705
        """
        with self._cond:
            self._break()

    def _break(self):
        # An internal error was detected.  The barrier is set to
        # a broken state all parties awakened.
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
706
        """Return the number of threads required to trip the barrier."""
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
707 708 709 710
        return self._parties

    @property
    def n_waiting(self):
711
        """Return the number of threads currently waiting at the barrier."""
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
712 713 714 715 716 717 718 719
        # We don't need synchronization here since this is an ephemeral result
        # anyway.  It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
720
        """Return True if the barrier is in a broken state."""
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
721 722
        return self._state == -2

723 724 725
# exception raised by the Barrier class
class BrokenBarrierError(RuntimeError):
    pass
Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
726 727


728
# Helper to generate new thread names
729 730
_counter = _count().__next__
_counter() # Consume 0 so first non-main thread has id 1.
731
def _newname(template="Thread-%d"):
732
    return template % _counter()
733 734 735

# Active thread administration
_active_limbo_lock = _allocate_lock()
736
_active = {}    # maps thread id to Thread object
737
_limbo = {}
738
_dangling = WeakSet()
739 740 741

# Main class for threads

742
class Thread:
743 744 745 746 747 748 749
    """A class that represents a thread of control.

    This class can be safely subclassed in a limited fashion. There are two ways
    to specify the activity: by passing a callable object to the constructor, or
    by overriding the run() method in a subclass.

    """
750

751
    _initialized = False
752 753 754 755
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
756
    _exc_info = _sys.exc_info
757 758 759
    # Keep sys.exc_clear too to clear the exception just before
    # allowing .join() to return.
    #XXX __exc_clear = _sys.exc_clear
760 761

    def __init__(self, group=None, target=None, name=None,
762
                 args=(), kwargs=None, *, daemon=None):
763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783
        """This constructor should always be called with keyword arguments. Arguments are:

        *group* should be None; reserved for future extension when a ThreadGroup
        class is implemented.

        *target* is the callable object to be invoked by the run()
        method. Defaults to None, meaning nothing is called.

        *name* is the thread name. By default, a unique name is constructed of
        the form "Thread-N" where N is a small decimal number.

        *args* is the argument tuple for the target invocation. Defaults to ().

        *kwargs* is a dictionary of keyword arguments for the target
        invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke
        the base class constructor (Thread.__init__()) before doing anything
        else to the thread.

        """
784
        assert group is None, "group argument must be None for now"
785 786
        if kwargs is None:
            kwargs = {}
787 788 789 790
        self._target = target
        self._name = str(name or _newname())
        self._args = args
        self._kwargs = kwargs
791 792 793 794
        if daemon is not None:
            self._daemonic = daemon
        else:
            self._daemonic = current_thread().daemon
Georg Brandl's avatar
Georg Brandl committed
795
        self._ident = None
796
        self._tstate_lock = None
Christian Heimes's avatar
Christian Heimes committed
797
        self._started = Event()
798
        self._is_stopped = False
799
        self._initialized = True
800 801
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
802
        self._stderr = _sys.stderr
803
        # For debugging and _after_fork()
804
        _dangling.add(self)
805

806
    def _reset_internal_locks(self, is_alive):
807 808 809
        # private!  Called by _after_fork() to reset our internal locks as
        # they may be in an invalid state leading to a deadlock or crash.
        self._started._reset_internal_locks()
810 811 812 813 814
        if is_alive:
            self._set_tstate_lock()
        else:
            # The thread isn't alive after fork: it doesn't have a tstate
            # anymore.
815
            self._is_stopped = True
816
            self._tstate_lock = None
817

818
    def __repr__(self):
819
        assert self._initialized, "Thread.__init__() was not called"
820
        status = "initial"
821
        if self._started.is_set():
822
            status = "started"
823
        self.is_alive() # easy way to get ._is_stopped set when appropriate
824
        if self._is_stopped:
825
            status = "stopped"
826
        if self._daemonic:
Georg Brandl's avatar
Georg Brandl committed
827 828 829
            status += " daemon"
        if self._ident is not None:
            status += " %s" % self._ident
830
        return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
831 832

    def start(self):
833 834 835 836 837 838 839 840 841
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object.

        """
842
        if not self._initialized:
843
            raise RuntimeError("thread.__init__() not called")
Christian Heimes's avatar
Christian Heimes committed
844

845
        if self._started.is_set():
846
            raise RuntimeError("threads can only be started once")
Benjamin Peterson's avatar
Benjamin Peterson committed
847 848
        with _active_limbo_lock:
            _limbo[self] = self
849 850 851 852 853 854
        try:
            _start_new_thread(self._bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
Christian Heimes's avatar
Christian Heimes committed
855
        self._started.wait()
856 857

    def run(self):
858 859 860 861 862 863 864 865
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        """
866 867 868 869 870 871 872
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs
873

874
    def _bootstrap(self):
875 876 877 878 879
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
Christian Heimes's avatar
Christian Heimes committed
880
        # _bootstrap_inner() below ***.  Those random exceptions
881 882 883
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
Christian Heimes's avatar
Christian Heimes committed
884
        # _bootstrap_inner() during normal business hours are properly
885 886 887
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
888
            self._bootstrap_inner()
889
        except:
890
            if self._daemonic and _sys is None:
891 892 893
                return
            raise

Benjamin Peterson's avatar
Benjamin Peterson committed
894
    def _set_ident(self):
895
        self._ident = get_ident()
Benjamin Peterson's avatar
Benjamin Peterson committed
896

897 898 899 900 901 902 903 904
    def _set_tstate_lock(self):
        """
        Set a lock object which will be released by the interpreter when
        the underlying thread state (see pystate.h) gets deleted.
        """
        self._tstate_lock = _set_sentinel()
        self._tstate_lock.acquire()

905
    def _bootstrap_inner(self):
906
        try:
Benjamin Peterson's avatar
Benjamin Peterson committed
907
            self._set_ident()
908
            self._set_tstate_lock()
Christian Heimes's avatar
Christian Heimes committed
909
            self._started.set()
Benjamin Peterson's avatar
Benjamin Peterson committed
910 911 912
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]
913 914 915 916 917

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)
Tim Peters's avatar
Tim Peters committed
918

919 920 921
            try:
                self.run()
            except SystemExit:
922
                pass
923
            except:
924
                # If sys.stderr is no more (most likely from interpreter
925
                # shutdown) use self._stderr.  Otherwise still use sys (as in
926 927
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
928 929 930 931
                if _sys and _sys.stderr is not None:
                    print("Exception in thread %s:\n%s" %
                          (self.name, _format_exc()), file=self._stderr)
                elif self._stderr is not None:
932 933 934
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
935
                    exc_type, exc_value, exc_tb = self._exc_info()
936
                    try:
937
                        print((
938
                            "Exception in thread " + self.name +
939
                            " (most likely raised during interpreter shutdown):"), file=self._stderr)
940
                        print((
941
                            "Traceback (most recent call last):"), file=self._stderr)
942
                        while exc_tb:
943
                            print((
944 945 946
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
947
                                    exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
948
                            exc_tb = exc_tb.tb_next
949
                        print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
950 951 952 953
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
954 955 956 957 958
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
959
                #XXX self._exc_clear()
960
                pass
961
        finally:
962 963
            with _active_limbo_lock:
                try:
Georg Brandl's avatar
Georg Brandl committed
964
                    # We don't call self._delete() because it also
965
                    # grabs _active_limbo_lock.
966
                    del _active[get_ident()]
967 968
                except:
                    pass
969

970
    def _stop(self):
971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989
        # After calling ._stop(), .is_alive() returns False and .join() returns
        # immediately.  ._tstate_lock must be released before calling ._stop().
        #
        # Normal case:  C code at the end of the thread's life
        # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
        # that's detected by our ._wait_for_tstate_lock(), called by .join()
        # and .is_alive().  Any number of threads _may_ call ._stop()
        # simultaneously (for example, if multiple threads are blocked in
        # .join() calls), and they're not serialized.  That's harmless -
        # they'll just make redundant rebindings of ._is_stopped and
        # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
        # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
        # (the assert is executed only if ._tstate_lock is None).
        #
        # Special case:  _main_thread releases ._tstate_lock via this
        # module's _shutdown() function.
        lock = self._tstate_lock
        if lock is not None:
            assert not lock.locked()
990 991
        self._is_stopped = True
        self._tstate_lock = None
992

993
    def _delete(self):
994 995
        "Remove current thread from the dict of currently running threads."

996
        # Notes about running with _dummy_thread:
997
        #
998
        # Must take care to not raise an exception if _dummy_thread is being
999
        # used (and thus this module is being used as an instance of
1000 1001
        # dummy_threading).  _dummy_thread.get_ident() always returns -1 since
        # there is only one thread if _dummy_thread is being used.  Thus
1002 1003 1004 1005 1006
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
1007
        # threads return -1 from _dummy_thread.get_ident() and thus have the
1008 1009 1010 1011 1012 1013 1014 1015 1016
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

1017
        try:
Neal Norwitz's avatar
Neal Norwitz committed
1018
            with _active_limbo_lock:
1019
                del _active[get_ident()]
Neal Norwitz's avatar
Neal Norwitz committed
1020 1021 1022
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
1023
                # current_thread()), and would block.
Neal Norwitz's avatar
Neal Norwitz committed
1024 1025 1026
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise
1027 1028

    def join(self, timeout=None):
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        isAlive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

        """
1052
        if not self._initialized:
1053
            raise RuntimeError("Thread.__init__() not called")
1054
        if not self._started.is_set():
1055
            raise RuntimeError("cannot join thread before it is started")
1056
        if self is current_thread():
1057
            raise RuntimeError("cannot join current thread")
1058

1059 1060
        if timeout is None:
            self._wait_for_tstate_lock()
1061 1062
        else:
            # the behavior of a negative timeout isn't documented, but
1063
            # historically .join(timeout=x) for x<0 has acted as if timeout=0
1064
            self._wait_for_tstate_lock(timeout=max(timeout, 0))
1065

1066
    def _wait_for_tstate_lock(self, block=True, timeout=-1):
1067
        # Issue #18808: wait for the thread state to be gone.
1068 1069 1070 1071 1072
        # At the end of the thread's life, after all knowledge of the thread
        # is removed from C data structures, C code releases our _tstate_lock.
        # This method passes its arguments to _tstate_lock.aquire().
        # If the lock is acquired, the C code is done, and self._stop() is
        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1073
        lock = self._tstate_lock
1074 1075 1076
        if lock is None:  # already determined that the C code is done
            assert self._is_stopped
        elif lock.acquire(block, timeout):
1077
            lock.release()
1078
            self._stop()
1079

1080 1081
    @property
    def name(self):
1082 1083 1084 1085 1086 1087
        """A string used for identification purposes only.

        It has no semantics. Multiple threads may be given the same name. The
        initial name is set by the constructor.

        """
1088 1089
        assert self._initialized, "Thread.__init__() not called"
        return self._name
1090

1091 1092
    @name.setter
    def name(self, name):
1093 1094
        assert self._initialized, "Thread.__init__() not called"
        self._name = str(name)
1095

1096 1097
    @property
    def ident(self):
1098 1099 1100 1101 1102 1103 1104
        """Thread identifier of this thread or None if it has not been started.

        This is a nonzero integer. See the thread.get_ident() function. Thread
        identifiers may be recycled when a thread exits and another thread is
        created. The identifier is available even after the thread has exited.

        """
Georg Brandl's avatar
Georg Brandl committed
1105 1106 1107
        assert self._initialized, "Thread.__init__() not called"
        return self._ident

1108
    def is_alive(self):
1109 1110 1111 1112 1113 1114 1115
        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just
        after the run() method terminates. The module function enumerate()
        returns a list of all alive threads.

        """
1116
        assert self._initialized, "Thread.__init__() not called"
1117
        if self._is_stopped or not self._started.is_set():
1118 1119
            return False
        self._wait_for_tstate_lock(False)
1120
        return not self._is_stopped
Tim Peters's avatar
Tim Peters committed
1121

1122
    isAlive = is_alive
1123

1124 1125
    @property
    def daemon(self):
1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136
        """A boolean value indicating whether this thread is a daemon thread.

        This must be set before start() is called, otherwise RuntimeError is
        raised. Its initial value is inherited from the creating thread; the
        main thread is not a daemon thread and therefore all threads created in
        the main thread default to daemon = False.

        The entire Python program exits when no alive non-daemon threads are
        left.

        """
1137 1138
        assert self._initialized, "Thread.__init__() not called"
        return self._daemonic
1139

1140 1141
    @daemon.setter
    def daemon(self, daemonic):
1142
        if not self._initialized:
1143
            raise RuntimeError("Thread.__init__() not called")
1144
        if self._started.is_set():
Antoine Pitrou's avatar
Antoine Pitrou committed
1145
            raise RuntimeError("cannot set daemon status of active thread")
1146
        self._daemonic = daemonic
1147

1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
    def isDaemon(self):
        return self.daemon

    def setDaemon(self, daemonic):
        self.daemon = daemonic

    def getName(self):
        return self.name

    def setName(self, name):
        self.name = name

1160 1161
# The timer class was contributed by Itamar Shtull-Trauring

1162
class Timer(Thread):
1163
    """Call a function after a specified number of seconds:
Tim Peters's avatar
Tim Peters committed
1164

1165 1166 1167 1168
            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

1169
    """
Tim Peters's avatar
Tim Peters committed
1170

1171
    def __init__(self, interval, function, args=None, kwargs=None):
1172 1173 1174
        Thread.__init__(self)
        self.interval = interval
        self.function = function
1175 1176
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
1177
        self.finished = Event()
Tim Peters's avatar
Tim Peters committed
1178

1179
    def cancel(self):
1180
        """Stop the timer if it hasn't finished yet."""
1181
        self.finished.set()
Tim Peters's avatar
Tim Peters committed
1182

1183 1184
    def run(self):
        self.finished.wait(self.interval)
1185
        if not self.finished.is_set():
1186 1187
            self.function(*self.args, **self.kwargs)
        self.finished.set()
1188 1189 1190 1191 1192 1193 1194

# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):

    def __init__(self):
1195
        Thread.__init__(self, name="MainThread", daemon=False)
1196
        self._set_tstate_lock()
Christian Heimes's avatar
Christian Heimes committed
1197
        self._started.set()
Benjamin Peterson's avatar
Benjamin Peterson committed
1198 1199 1200
        self._set_ident()
        with _active_limbo_lock:
            _active[self._ident] = self
1201 1202 1203


# Dummy thread class to represent threads not started here.
1204
# These aren't garbage collected when they die, nor can they be waited for.
1205
# If they invoke anything in threading.py that calls current_thread(), they
1206
# leave an entry in the _active dict forever after.
1207
# Their purpose is to return *something* from current_thread().
1208 1209 1210 1211
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):
Tim Peters's avatar
Tim Peters committed
1212

1213
    def __init__(self):
1214
        Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
1215

Christian Heimes's avatar
Christian Heimes committed
1216
        self._started.set()
Benjamin Peterson's avatar
Benjamin Peterson committed
1217 1218 1219
        self._set_ident()
        with _active_limbo_lock:
            _active[self._ident] = self
1220

1221 1222 1223
    def _stop(self):
        pass

1224
    def join(self, timeout=None):
1225
        assert False, "cannot join a dummy thread"
1226 1227 1228 1229


# Global API functions

1230
def current_thread():
1231 1232 1233 1234 1235 1236
    """Return the current Thread object, corresponding to the caller's thread of control.

    If the caller's thread of control was not created through the threading
    module, a dummy thread object with limited functionality is returned.

    """
1237
    try:
1238
        return _active[get_ident()]
1239 1240 1241
    except KeyError:
        return _DummyThread()

1242
currentThread = current_thread
1243

1244
def active_count():
1245 1246 1247 1248 1249 1250
    """Return the number of Thread objects currently alive.

    The returned count is equal to the length of the list returned by
    enumerate().

    """
Benjamin Peterson's avatar
Benjamin Peterson committed
1251 1252
    with _active_limbo_lock:
        return len(_active) + len(_limbo)
1253

1254
activeCount = active_count
1255

1256 1257 1258 1259
def _enumerate():
    # Same as enumerate(), but without the lock. Internal use only.
    return list(_active.values()) + list(_limbo.values())

1260
def enumerate():
1261 1262 1263 1264 1265 1266 1267
    """Return a list of all Thread objects currently alive.

    The list includes daemonic threads, dummy thread objects created by
    current_thread(), and the main thread. It excludes terminated threads and
    threads that have not yet been started.

    """
Benjamin Peterson's avatar
Benjamin Peterson committed
1268 1269
    with _active_limbo_lock:
        return list(_active.values()) + list(_limbo.values())
1270

1271
from _thread import stack_size
1272

1273 1274 1275
# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.
1276

1277 1278 1279
_main_thread = _MainThread()

def _shutdown():
1280 1281 1282 1283 1284
    # Obscure:  other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it.  We can't wait for C code to release
    # the main thread's tstate_lock - that won't happen until the interpreter
    # is nearly dead.  So we release it here.  Note that just calling _stop()
    # isn't enough:  other threads may already be waiting on _tstate_lock.
1285 1286 1287 1288 1289 1290
    tlock = _main_thread._tstate_lock
    # The main thread isn't finished yet, so its thread state lock can't have
    # been released.
    assert tlock is not None
    assert tlock.locked()
    tlock.release()
1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
    _main_thread._stop()
    t = _pickSomeNonDaemonThread()
    while t:
        t.join()
        t = _pickSomeNonDaemonThread()
    _main_thread._delete()

def _pickSomeNonDaemonThread():
    for t in enumerate():
        if not t.daemon and t.is_alive():
            return t
    return None

def main_thread():
1305 1306 1307 1308 1309
    """Return the main thread object.

    In normal conditions, the main thread is the thread from which the
    Python interpreter was started.
    """
1310
    return _main_thread
1311

1312 1313 1314 1315
# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
1316
    from _thread import _local as local
1317 1318 1319
except ImportError:
    from _threading_local import local

1320

Jesse Noller's avatar
Jesse Noller committed
1321 1322 1323 1324 1325 1326 1327
def _after_fork():
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
1328
    global _active_limbo_lock, _main_thread
Jesse Noller's avatar
Jesse Noller committed
1329 1330 1331 1332 1333
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}
    current = current_thread()
1334
    _main_thread = current
Jesse Noller's avatar
Jesse Noller committed
1335
    with _active_limbo_lock:
1336 1337 1338 1339 1340
        # Dangling thread instances must still have their locks reset,
        # because someone may join() them.
        threads = set(_enumerate())
        threads.update(_dangling)
        for thread in threads:
1341 1342
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
Jesse Noller's avatar
Jesse Noller committed
1343
            if thread is current:
1344 1345
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
1346
                thread._reset_internal_locks(True)
1347
                ident = get_ident()
1348
                thread._ident = ident
Jesse Noller's avatar
Jesse Noller committed
1349 1350 1351
                new_active[ident] = thread
            else:
                # All the others are already stopped.
1352
                thread._reset_internal_locks(False)
1353
                thread._stop()
Jesse Noller's avatar
Jesse Noller committed
1354 1355 1356 1357 1358

        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1