threading.py 20.9 KB
Newer Older
1
"""Thread module emulating a subset of Java's threading model."""
2

3 4 5 6 7 8 9 10 11 12 13
import sys as _sys

try:
    import thread
except ImportError:
    del _sys.modules[__name__]
    raise

from StringIO import StringIO as _StringIO
from time import time as _time, sleep as _sleep
from traceback import print_exc as _print_exc
14 15

# Rename some stuff so "from threading import *" is safe
16
__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
17 18
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace']
19 20 21 22

_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
Jeremy Hylton's avatar
Jeremy Hylton committed
23
ThreadError = thread.error
24 25 26 27 28
del thread


# Debug support (adapted from ihooks.py)

29
_VERBOSE = 0 # XXX Bool or int?
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

if __debug__:

    class _Verbose:

        def __init__(self, verbose=None):
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            if self.__verbose:
                format = format % args
                format = "%s: %s\n" % (
                    currentThread().getName(), format)
                _sys.stderr.write(format)

else:
    # Disable this when using "python -O"
    class _Verbose:
        def __init__(self, verbose=None):
            pass
        def _note(self, *args):
            pass

55 56 57 58 59 60 61 62
# Support for profile and trace hooks

_profile_hook = None
_trace_hook = None

def setprofile(func):
    global _profile_hook
    _profile_hook = func
Tim Peters's avatar
Tim Peters committed
63

64 65 66
def settrace(func):
    global _trace_hook
    _trace_hook = func
67 68 69 70 71 72

# Synchronization classes

Lock = _allocate_lock

def RLock(*args, **kwargs):
73
    return _RLock(*args, **kwargs)
74 75

class _RLock(_Verbose):
Tim Peters's avatar
Tim Peters committed
76

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()
        self.__owner = None
        self.__count = 0

    def __repr__(self):
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                self.__owner and self.__owner.getName(),
                self.__count)

    def acquire(self, blocking=1):
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial succes", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    def release(self):
        me = currentThread()
        assert self.__owner is me, "release() of un-acquire()d lock"
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    # Internal methods used by condition variables

    def _acquire_restore(self, (count, owner)):
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        return self.__owner is currentThread()


def Condition(*args, **kwargs):
144
    return _Condition(*args, **kwargs)
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182

class _Condition(_Verbose):

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
Jeremy Hylton's avatar
Jeremy Hylton committed
183
        # Return True if lock is owned by currentThread.
Jeremy Hylton's avatar
Jeremy Hylton committed
184
        # This method is called only if __lock doesn't have _is_owned().
185 186
        if self.__lock.acquire(0):
            self.__lock.release()
187
            return False
188
        else:
189
            return True
190 191

    def wait(self, timeout=None):
192
        currentThread() # for side-effect
193 194 195 196 197
        assert self._is_owned(), "wait() of un-acquire()d lock"
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
198 199 200
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
201
                if __debug__:
202
                    self._note("%s.wait(): got it", self)
203
            else:
204 205 206 207 208
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
209
                endtime = _time() + timeout
210
                delay = 0.0005 # 500 us -> initial delay of 1 ms
211
                while True:
212
                    gotit = waiter.acquire(0)
213
                    if gotit:
214
                        break
215 216 217 218
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
219 220 221 222 223 224 225 226 227 228 229 230 231
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)
232 233

    def notify(self, n=1):
234
        currentThread() # for side-effect
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
        assert self._is_owned(), "notify() of un-acquire()d lock"
        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")
        for waiter in waiters:
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        self.notify(len(self.__waiters))


def Semaphore(*args, **kwargs):
256
    return _Semaphore(*args, **kwargs)
257 258 259

class _Semaphore(_Verbose):

Andrew M. Kuchling's avatar
Andrew M. Kuchling committed
260
    # After Tim Peters' semaphore class, but not quite the same (no maximum)
261 262 263 264 265 266 267 268

    def __init__(self, value=1, verbose=None):
        assert value >= 0, "Semaphore initial value must be >= 0"
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
269
        rc = False
270 271 272 273
        self.__cond.acquire()
        while self.__value == 0:
            if not blocking:
                break
274 275 276
            if __debug__:
                self._note("%s.acquire(%s): blocked waiting, value=%s",
                           self, blocking, self.__value)
277 278 279
            self.__cond.wait()
        else:
            self.__value = self.__value - 1
280
            if __debug__:
281 282
                self._note("%s.acquire: success, value=%s",
                           self, self.__value)
283
            rc = True
284 285 286 287 288 289
        self.__cond.release()
        return rc

    def release(self):
        self.__cond.acquire()
        self.__value = self.__value + 1
290
        if __debug__:
291 292
            self._note("%s.release: success, value=%s",
                       self, self.__value)
293 294 295 296
        self.__cond.notify()
        self.__cond.release()


297
def BoundedSemaphore(*args, **kwargs):
298
    return _BoundedSemaphore(*args, **kwargs)
299 300 301 302 303 304 305 306 307 308 309 310 311

class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        if self._Semaphore__value >= self._initial_value:
            raise ValueError, "Semaphore released too many times"
        return _Semaphore.release(self)


312
def Event(*args, **kwargs):
313
    return _Event(*args, **kwargs)
314 315 316 317 318 319 320 321

class _Event(_Verbose):

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
322
        self.__flag = False
323 324 325 326 327 328

    def isSet(self):
        return self.__flag

    def set(self):
        self.__cond.acquire()
329 330 331 332 333
        try:
            self.__flag = True
            self.__cond.notifyAll()
        finally:
            self.__cond.release()
334 335 336

    def clear(self):
        self.__cond.acquire()
337 338 339 340
        try:
            self.__flag = False
        finally:
            self.__cond.release()
341 342 343

    def wait(self, timeout=None):
        self.__cond.acquire()
344 345 346 347 348
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
        finally:
            self.__cond.release()
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366

# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):
    global _counter
    _counter = _counter + 1
    return template % _counter

# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {}
_limbo = {}


# Main class for threads

class Thread(_Verbose):

367
    __initialized = False
368 369 370

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, verbose=None):
371
        assert group is None, "group argument must be None for now"
372 373 374 375 376 377
        _Verbose.__init__(self, verbose)
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
378 379
        self.__started = False
        self.__stopped = False
380
        self.__block = Condition(Lock())
381
        self.__initialized = True
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started:
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status = status + " daemon"
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        assert self.__initialized, "Thread.__init__() not called"
        assert not self.__started, "thread already started"
        if __debug__:
            self._note("%s.start(): starting thread", self)
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        _start_new_thread(self.__bootstrap, ())
407
        self.__started = True
408 409 410 411
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)

    def run(self):
        if self.__target:
412
            self.__target(*self.__args, **self.__kwargs)
413 414 415

    def __bootstrap(self):
        try:
416
            self.__started = True
417 418 419 420 421 422
            _active_limbo_lock.acquire()
            _active[_get_ident()] = self
            del _limbo[self]
            _active_limbo_lock.release()
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)
423 424 425 426 427 428 429

            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)
Tim Peters's avatar
Tim Peters committed
430

431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                s = _StringIO()
                _print_exc(file=s)
                _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                 (self.getName(), s.getvalue()))
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
        finally:
            self.__stop()
448 449 450 451
            try:
                self.__delete()
            except:
                pass
452 453 454

    def __stop(self):
        self.__block.acquire()
455
        self.__stopped = True
456 457 458 459 460 461 462 463 464 465
        self.__block.notifyAll()
        self.__block.release()

    def __delete(self):
        _active_limbo_lock.acquire()
        del _active[_get_ident()]
        _active_limbo_lock.release()

    def join(self, timeout=None):
        assert self.__initialized, "Thread.__init__() not called"
466
        assert self.__started, "cannot join thread before it is started"
467 468 469 470 471 472
        assert self is not currentThread(), "cannot join current thread"
        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        if timeout is None:
473
            while not self.__stopped:
474 475 476 477
                self.__block.wait()
            if __debug__:
                self._note("%s.join(): thread stopped", self)
        else:
478
            deadline = _time() + timeout
479
            while not self.__stopped:
480
                delay = deadline - _time()
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501
                if delay <= 0:
                    if __debug__:
                        self._note("%s.join(): timed out", self)
                    break
                self.__block.wait(delay)
            else:
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
        self.__block.release()

    def getName(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    def setName(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    def isAlive(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started and not self.__stopped
Tim Peters's avatar
Tim Peters committed
502

503 504 505 506 507 508 509 510 511
    def isDaemon(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    def setDaemon(self, daemonic):
        assert self.__initialized, "Thread.__init__() not called"
        assert not self.__started, "cannot set daemon status of active thread"
        self.__daemonic = daemonic

512 513 514 515 516 517 518
# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:
Tim Peters's avatar
Tim Peters committed
519

520 521 522 523
    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """
Tim Peters's avatar
Tim Peters committed
524

525 526 527 528 529 530 531
    def __init__(self, interval, function, args=[], kwargs={}):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()
Tim Peters's avatar
Tim Peters committed
532

533 534 535
    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()
Tim Peters's avatar
Tim Peters committed
536

537 538 539 540 541
    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
542 543 544 545 546 547 548 549

# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):

    def __init__(self):
        Thread.__init__(self, name="MainThread")
550
        self._Thread__started = True
551 552 553
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()
554 555
        import atexit
        atexit.register(self.__exitfunc)
556 557

    def _set_daemon(self):
558
        return False
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587

    def __exitfunc(self):
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()

def _pickSomeNonDaemonThread():
    for t in enumerate():
        if not t.isDaemon() and t.isAlive():
            return t
    return None


# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die,
# nor can they be waited for.
# Their purpose is to return *something* from currentThread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):
Tim Peters's avatar
Tim Peters committed
588

589 590
    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))
591
        self._Thread__started = True
592 593 594 595 596
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
597
        return True
598

599
    def join(self, timeout=None):
600
        assert False, "cannot join a dummy thread"
601 602 603 604 605 606 607 608


# Global API functions

def currentThread():
    try:
        return _active[_get_ident()]
    except KeyError:
609
        ##print "currentThread(): no current thread for", _get_ident()
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658
        return _DummyThread()

def activeCount():
    _active_limbo_lock.acquire()
    count = len(_active) + len(_limbo)
    _active_limbo_lock.release()
    return count

def enumerate():
    _active_limbo_lock.acquire()
    active = _active.values() + _limbo.values()
    _active_limbo_lock.release()
    return active

# Create the main thread object

_MainThread()


# Self-test code

def _test():

    class BoundedQueue(_Verbose):

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)
            self.wc = Condition(self.mon)
            self.limit = limit
            self.queue = []

        def put(self, item):
            self.mon.acquire()
            while len(self.queue) >= self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
            self.mon.release()

        def get(self):
            self.mon.acquire()
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
659
            item = self.queue.pop(0)
660 661 662 663 664 665 666 667 668 669 670 671 672
            self._note("get(): got %s, %d left", item, len(self.queue))
            self.wc.notify()
            self.mon.release()
            return item

    class ProducerThread(Thread):

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
673
            from random import random
674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.getName(), counter))
                _sleep(random() * 0.00001)


    class ConsumerThread(Thread):

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print item
                self.count = self.count - 1

    NP = 3
    QL = 4
    NI = 5

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.setName("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()

if __name__ == '__main__':
    _test()