Commit 6a57dd86 authored by Brett Cannon's avatar Brett Cannon

Issue #16803: Have test_importlib.test_locks use frozen and source

code.
parent baced566
from importlib import _bootstrap from . import util
frozen_init, source_init = util.import_importlib('importlib')
frozen_bootstrap = frozen_init._bootstrap
source_bootstrap = source_init._bootstrap
import sys import sys
import time import time
import unittest import unittest
...@@ -13,14 +17,9 @@ except ImportError: ...@@ -13,14 +17,9 @@ except ImportError:
else: else:
from test import lock_tests from test import lock_tests
LockType = _bootstrap._ModuleLock
DeadlockError = _bootstrap._DeadlockError
if threading is not None: if threading is not None:
class ModuleLockAsRLockTests(lock_tests.RLockTests): class ModuleLockAsRLockTests(lock_tests.RLockTests):
locktype = staticmethod(lambda: LockType("some_lock")) locktype = classmethod(lambda cls: cls.LockType("some_lock"))
# _is_owned() unsupported # _is_owned() unsupported
test__is_owned = None test__is_owned = None
...@@ -34,13 +33,21 @@ if threading is not None: ...@@ -34,13 +33,21 @@ if threading is not None:
# _release_save() unsupported # _release_save() unsupported
test_release_save_unacquired = None test_release_save_unacquired = None
class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests):
LockType = frozen_bootstrap._ModuleLock
class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests):
LockType = source_bootstrap._ModuleLock
else: else:
class ModuleLockAsRLockTests(unittest.TestCase): class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
pass pass
class Source_ModuleLockAsRLockTests(unittest.TestCase):
pass
@unittest.skipUnless(threading, "threads needed for this test")
class DeadlockAvoidanceTests(unittest.TestCase): class DeadlockAvoidanceTests:
def setUp(self): def setUp(self):
try: try:
...@@ -55,7 +62,7 @@ class DeadlockAvoidanceTests(unittest.TestCase): ...@@ -55,7 +62,7 @@ class DeadlockAvoidanceTests(unittest.TestCase):
def run_deadlock_avoidance_test(self, create_deadlock): def run_deadlock_avoidance_test(self, create_deadlock):
NLOCKS = 10 NLOCKS = 10
locks = [LockType(str(i)) for i in range(NLOCKS)] locks = [self.LockType(str(i)) for i in range(NLOCKS)]
pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
if create_deadlock: if create_deadlock:
NTHREADS = NLOCKS NTHREADS = NLOCKS
...@@ -67,7 +74,7 @@ class DeadlockAvoidanceTests(unittest.TestCase): ...@@ -67,7 +74,7 @@ class DeadlockAvoidanceTests(unittest.TestCase):
"""Try to acquire the lock. Return True on success, False on deadlock.""" """Try to acquire the lock. Return True on success, False on deadlock."""
try: try:
lock.acquire() lock.acquire()
except DeadlockError: except self.DeadlockError:
return False return False
else: else:
return True return True
...@@ -99,30 +106,50 @@ class DeadlockAvoidanceTests(unittest.TestCase): ...@@ -99,30 +106,50 @@ class DeadlockAvoidanceTests(unittest.TestCase):
self.assertEqual(results.count((True, False)), 0) self.assertEqual(results.count((True, False)), 0)
self.assertEqual(results.count((True, True)), len(results)) self.assertEqual(results.count((True, True)), len(results))
@unittest.skipUnless(threading, "threads needed for this test")
class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
LockType = frozen_bootstrap._ModuleLock
DeadlockError = frozen_bootstrap._DeadlockError
@unittest.skipUnless(threading, "threads needed for this test")
class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
LockType = source_bootstrap._ModuleLock
DeadlockError = source_bootstrap._DeadlockError
class LifetimeTests(unittest.TestCase):
class LifetimeTests:
def test_lock_lifetime(self): def test_lock_lifetime(self):
name = "xyzzy" name = "xyzzy"
self.assertNotIn(name, _bootstrap._module_locks) self.assertNotIn(name, self.bootstrap._module_locks)
lock = _bootstrap._get_module_lock(name) lock = self.bootstrap._get_module_lock(name)
self.assertIn(name, _bootstrap._module_locks) self.assertIn(name, self.bootstrap._module_locks)
wr = weakref.ref(lock) wr = weakref.ref(lock)
del lock del lock
support.gc_collect() support.gc_collect()
self.assertNotIn(name, _bootstrap._module_locks) self.assertNotIn(name, self.bootstrap._module_locks)
self.assertIsNone(wr()) self.assertIsNone(wr())
def test_all_locks(self): def test_all_locks(self):
support.gc_collect() support.gc_collect()
self.assertEqual(0, len(_bootstrap._module_locks), _bootstrap._module_locks) self.assertEqual(0, len(self.bootstrap._module_locks),
self.bootstrap._module_locks)
class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase):
bootstrap = frozen_bootstrap
class Source_LifetimeTests(LifetimeTests, unittest.TestCase):
bootstrap = source_bootstrap
@support.reap_threads @support.reap_threads
def test_main(): def test_main():
support.run_unittest(ModuleLockAsRLockTests, support.run_unittest(Frozen_ModuleLockAsRLockTests,
DeadlockAvoidanceTests, Source_ModuleLockAsRLockTests,
LifetimeTests) Frozen_DeadlockAvoidanceTests,
Source_DeadlockAvoidanceTests,
Frozen_LifetimeTests,
Source_LifetimeTests)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment