Commit 8bf08ee4 authored by Victor Stinner's avatar Victor Stinner Committed by GitHub

bpo-37312: Remove _dummy_thread and dummy_threading modules (GH-14143)

Remove _dummy_thread and dummy_threading modules. These modules
were deprecated since Python 3.7 which requires threading support.
parent 2e9954d3
:mod:`_dummy_thread` --- Drop-in replacement for the :mod:`_thread` module
==========================================================================
.. module:: _dummy_thread
:synopsis: Drop-in replacement for the _thread module.
**Source code:** :source:`Lib/_dummy_thread.py`
.. deprecated:: 3.7
Python now always has threading enabled. Please use :mod:`_thread`
(or, better, :mod:`threading`) instead.
--------------
This module provides a duplicate interface to the :mod:`_thread` module.
It was meant to be imported when the :mod:`_thread` module was not provided
on a platform.
Be careful to not use this module where deadlock might occur from a thread being
created that blocks waiting for another thread to be created. This often occurs
with blocking I/O.
......@@ -28,5 +28,3 @@ The following are support modules for some of the above services:
.. toctree::
_thread.rst
_dummy_thread.rst
dummy_threading.rst
:mod:`dummy_threading` --- Drop-in replacement for the :mod:`threading` module
==============================================================================
.. module:: dummy_threading
:synopsis: Drop-in replacement for the threading module.
**Source code:** :source:`Lib/dummy_threading.py`
.. deprecated:: 3.7
Python now always has threading enabled. Please use :mod:`threading` instead.
--------------
This module provides a duplicate interface to the :mod:`threading` module.
It was meant to be imported when the :mod:`_thread` module was not provided
on a platform.
Be careful to not use this module where deadlock might occur from a thread being
created that blocks waiting for another thread to be created. This often occurs
with blocking I/O.
......@@ -112,6 +112,9 @@ Deprecated
Removed
=======
``_dummy_thread`` and ``dummy_threading`` modules have been removed. These
modules were deprecated since Python 3.7 which requires threading support.
(Contributed by Victor Stinner in :issue:`37312`.)
Porting to Python 3.9
......
"""Drop-in replacement for the thread module.
Meant to be used as a brain-dead substitute so that threaded code does
not need to be rewritten for when the thread module is not present.
Suggested usage is::
try:
import _thread
except ImportError:
import _dummy_thread as _thread
"""
# Exports only things specified by thread documentation;
# skipping obsolete synonyms allocate(), start_new(), exit_thread().
__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
'interrupt_main', 'LockType', 'RLock']
# A dummy value
TIMEOUT_MAX = 2**31
# NOTE: this module can be imported early in the extension building process,
# and so top level imports of other modules should be avoided. Instead, all
# imports are done when needed on a function-by-function basis. Since threads
# are disabled, the import lock should not be an issue anyway (??).
error = RuntimeError
def start_new_thread(function, args, kwargs={}):
"""Dummy implementation of _thread.start_new_thread().
Compatibility is maintained by making sure that ``args`` is a
tuple and ``kwargs`` is a dictionary. If an exception is raised
and it is SystemExit (which can be done by _thread.exit()) it is
caught and nothing is done; all other exceptions are printed out
by using traceback.print_exc().
If the executed function calls interrupt_main the KeyboardInterrupt will be
raised when the function returns.
"""
if type(args) != type(tuple()):
raise TypeError("2nd arg must be a tuple")
if type(kwargs) != type(dict()):
raise TypeError("3rd arg must be a dict")
global _main
_main = False
try:
function(*args, **kwargs)
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
_main = True
global _interrupt
if _interrupt:
_interrupt = False
raise KeyboardInterrupt
def exit():
"""Dummy implementation of _thread.exit()."""
raise SystemExit
def get_ident():
"""Dummy implementation of _thread.get_ident().
Since this module should only be used when _threadmodule is not
available, it is safe to assume that the current process is the
only thread. Thus a constant can be safely returned.
"""
return 1
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def stack_size(size=None):
"""Dummy implementation of _thread.stack_size()."""
if size is not None:
raise error("setting thread stack size not supported")
return 0
def _set_sentinel():
"""Dummy implementation of _thread._set_sentinel()."""
return LockType()
class LockType(object):
"""Class implementing dummy implementation of _thread.LockType.
Compatibility is maintained by maintaining self.locked_status
which is a boolean that stores the state of the lock. Pickling of
the lock, though, should not be done since if the _thread module is
then used with an unpickled ``lock()`` from here problems could
occur from this class not having atomic methods.
"""
def __init__(self):
self.locked_status = False
def acquire(self, waitflag=None, timeout=-1):
"""Dummy implementation of acquire().
For blocking calls, self.locked_status is automatically set to
True and returned appropriately based on value of
``waitflag``. If it is non-blocking, then the value is
actually checked and not set if it is already acquired. This
is all done so that threading.Condition's assert statements
aren't triggered and throw a little fit.
"""
if waitflag is None or waitflag:
self.locked_status = True
return True
else:
if not self.locked_status:
self.locked_status = True
return True
else:
if timeout > 0:
import time
time.sleep(timeout)
return False
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
"""Release the dummy lock."""
# XXX Perhaps shouldn't actually bother to test? Could lead
# to problems for complex, threaded code.
if not self.locked_status:
raise error
self.locked_status = False
return True
def locked(self):
return self.locked_status
def __repr__(self):
return "<%s %s.%s object at %s>" % (
"locked" if self.locked_status else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
hex(id(self))
)
class RLock(LockType):
"""Dummy implementation of threading._RLock.
Re-entrant lock can be aquired multiple times and needs to be released
just as many times. This dummy implemention does not check wheter the
current thread actually owns the lock, but does accounting on the call
counts.
"""
def __init__(self):
super().__init__()
self._levels = 0
def acquire(self, waitflag=None, timeout=-1):
"""Aquire the lock, can be called multiple times in succession.
"""
locked = super().acquire(waitflag, timeout)
if locked:
self._levels += 1
return locked
def release(self):
"""Release needs to be called once for every call to acquire().
"""
if self._levels == 0:
raise error
if self._levels == 1:
super().release()
self._levels -= 1
# Used to signal that interrupt_main was called in a "thread"
_interrupt = False
# True when not executing in a "thread"
_main = True
def interrupt_main():
"""Set _interrupt flag to True to have start_new_thread raise
KeyboardInterrupt upon exiting."""
if _main:
raise KeyboardInterrupt
else:
global _interrupt
_interrupt = True
def _is_main_interpreter():
return True
"""Faux ``threading`` version using ``dummy_thread`` instead of ``thread``.
The module ``_dummy_threading`` is added to ``sys.modules`` in order
to not have ``threading`` considered imported. Had ``threading`` been
directly imported it would have made all subsequent imports succeed
regardless of whether ``_thread`` was available which is not desired.
"""
from sys import modules as sys_modules
import _dummy_thread
# Declaring now so as to not have to nest ``try``s to get proper clean-up.
holding_thread = False
holding_threading = False
holding__threading_local = False
try:
# Could have checked if ``_thread`` was not in sys.modules and gone
# a different route, but decided to mirror technique used with
# ``threading`` below.
if '_thread' in sys_modules:
held_thread = sys_modules['_thread']
holding_thread = True
# Must have some module named ``_thread`` that implements its API
# in order to initially import ``threading``.
sys_modules['_thread'] = sys_modules['_dummy_thread']
if 'threading' in sys_modules:
# If ``threading`` is already imported, might as well prevent
# trying to import it more than needed by saving it if it is
# already imported before deleting it.
held_threading = sys_modules['threading']
holding_threading = True
del sys_modules['threading']
if '_threading_local' in sys_modules:
# If ``_threading_local`` is already imported, might as well prevent
# trying to import it more than needed by saving it if it is
# already imported before deleting it.
held__threading_local = sys_modules['_threading_local']
holding__threading_local = True
del sys_modules['_threading_local']
import threading
# Need a copy of the code kept somewhere...
sys_modules['_dummy_threading'] = sys_modules['threading']
del sys_modules['threading']
sys_modules['_dummy__threading_local'] = sys_modules['_threading_local']
del sys_modules['_threading_local']
from _dummy_threading import *
from _dummy_threading import __all__
finally:
# Put back ``threading`` if we overwrote earlier
if holding_threading:
sys_modules['threading'] = held_threading
del held_threading
del holding_threading
# Put back ``_threading_local`` if we overwrote earlier
if holding__threading_local:
sys_modules['_threading_local'] = held__threading_local
del held__threading_local
del holding__threading_local
# Put back ``thread`` if we overwrote, else del the entry we made
if holding_thread:
sys_modules['_thread'] = held_thread
del held_thread
else:
del sys_modules['_thread']
del holding_thread
del _dummy_thread
del sys_modules
import _dummy_thread as _thread
import time
import queue
import random
import unittest
from test import support
from unittest import mock
DELAY = 0
class LockTests(unittest.TestCase):
"""Test lock objects."""
def setUp(self):
# Create a lock
self.lock = _thread.allocate_lock()
def test_initlock(self):
#Make sure locks start locked
self.assertFalse(self.lock.locked(),
"Lock object is not initialized unlocked.")
def test_release(self):
# Test self.lock.release()
self.lock.acquire()
self.lock.release()
self.assertFalse(self.lock.locked(),
"Lock object did not release properly.")
def test_LockType_context_manager(self):
with _thread.LockType():
pass
self.assertFalse(self.lock.locked(),
"Acquired Lock was not released")
def test_improper_release(self):
#Make sure release of an unlocked thread raises RuntimeError
self.assertRaises(RuntimeError, self.lock.release)
def test_cond_acquire_success(self):
#Make sure the conditional acquiring of the lock works.
self.assertTrue(self.lock.acquire(0),
"Conditional acquiring of the lock failed.")
def test_cond_acquire_fail(self):
#Test acquiring locked lock returns False
self.lock.acquire(0)
self.assertFalse(self.lock.acquire(0),
"Conditional acquiring of a locked lock incorrectly "
"succeeded.")
def test_uncond_acquire_success(self):
#Make sure unconditional acquiring of a lock works.
self.lock.acquire()
self.assertTrue(self.lock.locked(),
"Uncondional locking failed.")
def test_uncond_acquire_return_val(self):
#Make sure that an unconditional locking returns True.
self.assertIs(self.lock.acquire(1), True,
"Unconditional locking did not return True.")
self.assertIs(self.lock.acquire(), True)
def test_uncond_acquire_blocking(self):
#Make sure that unconditional acquiring of a locked lock blocks.
def delay_unlock(to_unlock, delay):
"""Hold on to lock for a set amount of time before unlocking."""
time.sleep(delay)
to_unlock.release()
self.lock.acquire()
start_time = int(time.monotonic())
_thread.start_new_thread(delay_unlock,(self.lock, DELAY))
if support.verbose:
print()
print("*** Waiting for thread to release the lock "\
"(approx. %s sec.) ***" % DELAY)
self.lock.acquire()
end_time = int(time.monotonic())
if support.verbose:
print("done")
self.assertGreaterEqual(end_time - start_time, DELAY,
"Blocking by unconditional acquiring failed.")
@mock.patch('time.sleep')
def test_acquire_timeout(self, mock_sleep):
"""Test invoking acquire() with a positive timeout when the lock is
already acquired. Ensure that time.sleep() is invoked with the given
timeout and that False is returned."""
self.lock.acquire()
retval = self.lock.acquire(waitflag=0, timeout=1)
self.assertTrue(mock_sleep.called)
mock_sleep.assert_called_once_with(1)
self.assertEqual(retval, False)
def test_lock_representation(self):
self.lock.acquire()
self.assertIn("locked", repr(self.lock))
self.lock.release()
self.assertIn("unlocked", repr(self.lock))
class RLockTests(unittest.TestCase):
"""Test dummy RLock objects."""
def setUp(self):
self.rlock = _thread.RLock()
def test_multiple_acquire(self):
self.assertIn("unlocked", repr(self.rlock))
self.rlock.acquire()
self.rlock.acquire()
self.assertIn("locked", repr(self.rlock))
self.rlock.release()
self.assertIn("locked", repr(self.rlock))
self.rlock.release()
self.assertIn("unlocked", repr(self.rlock))
self.assertRaises(RuntimeError, self.rlock.release)
class MiscTests(unittest.TestCase):
"""Miscellaneous tests."""
def test_exit(self):
self.assertRaises(SystemExit, _thread.exit)
def test_ident(self):
self.assertIsInstance(_thread.get_ident(), int,
"_thread.get_ident() returned a non-integer")
self.assertGreater(_thread.get_ident(), 0)
def test_LockType(self):
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
"_thread.LockType is not an instance of what "
"is returned by _thread.allocate_lock()")
def test_set_sentinel(self):
self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
"_thread._set_sentinel() did not return a "
"LockType instance.")
def test_interrupt_main(self):
#Calling start_new_thread with a function that executes interrupt_main
# should raise KeyboardInterrupt upon completion.
def call_interrupt():
_thread.interrupt_main()
self.assertRaises(KeyboardInterrupt,
_thread.start_new_thread,
call_interrupt,
tuple())
def test_interrupt_in_main(self):
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
def test_stack_size_None(self):
retval = _thread.stack_size(None)
self.assertEqual(retval, 0)
def test_stack_size_not_None(self):
with self.assertRaises(_thread.error) as cm:
_thread.stack_size("")
self.assertEqual(cm.exception.args[0],
"setting thread stack size not supported")
class ThreadTests(unittest.TestCase):
"""Test thread creation."""
def test_arg_passing(self):
#Make sure that parameter passing works.
def arg_tester(queue, arg1=False, arg2=False):
"""Use to test _thread.start_new_thread() passes args properly."""
queue.put((arg1, arg2))
testing_queue = queue.Queue(1)
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation "
"using tuple failed")
_thread.start_new_thread(
arg_tester,
tuple(),
{'queue':testing_queue, 'arg1':True, 'arg2':True})
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation "
"using kwargs failed")
_thread.start_new_thread(
arg_tester,
(testing_queue, True),
{'arg2':True})
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation using both tuple"
" and kwargs failed")
def test_multi_thread_creation(self):
def queue_mark(queue, delay):
time.sleep(delay)
queue.put(_thread.get_ident())
thread_count = 5
testing_queue = queue.Queue(thread_count)
if support.verbose:
print()
print("*** Testing multiple thread creation "
"(will take approx. %s to %s sec.) ***" % (
DELAY, thread_count))
for count in range(thread_count):
if DELAY:
local_delay = round(random.random(), 1)
else:
local_delay = 0
_thread.start_new_thread(queue_mark,
(testing_queue, local_delay))
time.sleep(DELAY)
if support.verbose:
print('done')
self.assertEqual(testing_queue.qsize(), thread_count,
"Not all %s threads executed properly "
"after %s sec." % (thread_count, DELAY))
def test_args_not_tuple(self):
"""
Test invoking start_new_thread() with a non-tuple value for "args".
Expect TypeError with a meaningful error message to be raised.
"""
with self.assertRaises(TypeError) as cm:
_thread.start_new_thread(mock.Mock(), [])
self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
def test_kwargs_not_dict(self):
"""
Test invoking start_new_thread() with a non-dict value for "kwargs".
Expect TypeError with a meaningful error message to be raised.
"""
with self.assertRaises(TypeError) as cm:
_thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
def test_SystemExit(self):
"""
Test invoking start_new_thread() with a function that raises
SystemExit.
The exception should be discarded.
"""
func = mock.Mock(side_effect=SystemExit())
try:
_thread.start_new_thread(func, tuple())
except SystemExit:
self.fail("start_new_thread raised SystemExit.")
@mock.patch('traceback.print_exc')
def test_RaiseException(self, mock_print_exc):
"""
Test invoking start_new_thread() with a function that raises exception.
The exception should be discarded and the traceback should be printed
via traceback.print_exc()
"""
func = mock.Mock(side_effect=Exception)
_thread.start_new_thread(func, tuple())
self.assertTrue(mock_print_exc.called)
if __name__ == '__main__':
unittest.main()
from test import support
import unittest
import dummy_threading as _threading
import time
class DummyThreadingTestCase(unittest.TestCase):
class TestThread(_threading.Thread):
def run(self):
global running
global sema
global mutex
# Uncomment if testing another module, such as the real 'threading'
# module.
#delay = random.random() * 2
delay = 0
if support.verbose:
print('task', self.name, 'will run for', delay, 'sec')
sema.acquire()
mutex.acquire()
running += 1
if support.verbose:
print(running, 'tasks are running')
mutex.release()
time.sleep(delay)
if support.verbose:
print('task', self.name, 'done')
mutex.acquire()
running -= 1
if support.verbose:
print(self.name, 'is finished.', running, 'tasks are running')
mutex.release()
sema.release()
def setUp(self):
self.numtasks = 10
global sema
sema = _threading.BoundedSemaphore(value=3)
global mutex
mutex = _threading.RLock()
global running
running = 0
self.threads = []
def test_tasks(self):
for i in range(self.numtasks):
t = self.TestThread(name="<thread %d>"%i)
self.threads.append(t)
t.start()
if support.verbose:
print('waiting for all tasks to complete')
for t in self.threads:
t.join()
if support.verbose:
print('all tasks done')
if __name__ == '__main__':
unittest.main()
``_dummy_thread`` and ``dummy_threading`` modules have been removed. These
modules were deprecated since Python 3.7 which requires threading support.
......@@ -250,7 +250,6 @@
<Compile Include="distutils\_msvccompiler.py" />
<Compile Include="distutils\__init__.py" />
<Compile Include="doctest.py" />
<Compile Include="dummy_threading.py" />
<Compile Include="email\base64mime.py" />
<Compile Include="email\charset.py" />
<Compile Include="email\contentmanager.py" />
......@@ -976,8 +975,6 @@
<Compile Include="test\test_doctest2.py" />
<Compile Include="test\test_docxmlrpc.py" />
<Compile Include="test\test_dtrace.py" />
<Compile Include="test\test_dummy_thread.py" />
<Compile Include="test\test_dummy_threading.py" />
<Compile Include="test\test_dynamic.py" />
<Compile Include="test\test_dynamicclassattribute.py" />
<Compile Include="test\test_eintr.py" />
......@@ -1564,7 +1561,6 @@
<Compile Include="_collections_abc.py" />
<Compile Include="_compat_pickle.py" />
<Compile Include="_compression.py" />
<Compile Include="_dummy_thread.py" />
<Compile Include="_markupbase.py" />
<Compile Include="_osx_support.py" />
<Compile Include="_pydecimal.py" />
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment