Commit f95dd1da authored by Jason Madden's avatar Jason Madden

Replace deprecated TestCase methods.

parent e711bce4
......@@ -84,7 +84,7 @@ class TestThread(threading.Thread):
self.nrunning.inc()
if verbose:
print(self.nrunning.get(), 'tasks are running')
self.testcase.assert_(self.nrunning.get() <= 3)
self.testcase.assertLessEqual(self.nrunning.get(), 3)
time.sleep(delay)
if verbose:
......@@ -92,7 +92,7 @@ class TestThread(threading.Thread):
with self.mutex:
self.nrunning.dec()
self.testcase.assert_(self.nrunning.get() >= 0)
self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
if verbose:
print('%s is finished. %d tasks are running' % (
self.name, self.nrunning.get()))
......@@ -119,20 +119,20 @@ class ThreadTests(unittest.TestCase):
threads.append(t)
t.daemon = False # Under PYPY we get daemon by default?
if hasattr(t, 'ident'):
self.failUnlessEqual(t.ident, None)
self.assertIsNone(t.ident)
self.assertFalse(t.daemon)
self.assert_(re.match(r'<TestThread\(.*, initial\)>', repr(t)))
self.assertTrue(re.match(r'<TestThread\(.*, initial\)>', repr(t)))
t.start()
if verbose:
print('waiting for all tasks to complete')
for t in threads:
t.join(NUMTASKS)
self.assert_(not t.is_alive())
self.assertFalse(t.is_alive())
if hasattr(t, 'ident'):
self.failIfEqual(t.ident, 0)
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
self.assert_(re.match(r'<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
self.assertTrue(re.match(r'<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
if verbose:
print('all tasks done')
self.assertEqual(numrunning.get(), 0)
......@@ -200,9 +200,9 @@ class ThreadTests(unittest.TestCase):
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assert_(tid in threading._active)
self.assert_(isinstance(threading._active[tid],
threading._DummyThread))
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid],
threading._DummyThread)
del threading._active[tid]
# in gevent, we actually clean up threading._active, but it's not happended there yet
......@@ -261,7 +261,7 @@ class ThreadTests(unittest.TestCase):
worker_started.wait()
if verbose:
print(" verifying worker hasn't exited")
self.assert_(not t.finished)
self.assertFalse(t.finished)
if verbose:
print(" attempting to raise asynch exception in worker")
result = set_async_exc(ctypes.c_long(t.id), exception)
......@@ -269,7 +269,7 @@ class ThreadTests(unittest.TestCase):
if verbose:
print(" waiting for worker to say it caught the exception")
worker_saw_exception.wait(timeout=10)
self.assert_(t.finished)
self.assertTrue(t.finished)
if verbose:
print(" all OK -- joining worker")
if t.finished:
......@@ -357,8 +357,8 @@ class ThreadTests(unittest.TestCase):
threading.Thread(target=child).start()
raise SystemExit
""" % setup_4],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
stdout = stdout.strip()
stdout = stdout.decode('utf-8')
......@@ -382,7 +382,7 @@ class ThreadTests(unittest.TestCase):
t.join()
l = enum()
self.assertFalse(t in l,
"#1703448 triggered after %d trials: %s" % (i, l))
"#1703448 triggered after %d trials: %s" % (i, l))
finally:
sys.setcheckinterval(old_interval)
......@@ -406,7 +406,7 @@ class ThreadTests(unittest.TestCase):
weak_cyclic_object = weakref.ref(cyclic_object)
cyclic_object.thread.join()
del cyclic_object
self.assertEquals(None, weak_cyclic_object(),
self.assertIsNone(weak_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_cyclic_object())))
......@@ -414,7 +414,7 @@ class ThreadTests(unittest.TestCase):
weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
raising_cyclic_object.thread.join()
del raising_cyclic_object
self.assertEquals(None, weak_raising_cyclic_object(),
self.assertIsNone(weak_raising_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_raising_cyclic_object())))
......@@ -436,8 +436,8 @@ class ThreadJoinOnShutdown(unittest.TestCase):
rc = p.wait()
data = p.stdout.read().replace(b'\r', b'')
self.assertEqual(data, b"end of main\nend of thread\n")
self.failIf(rc == 2, b"interpreter was blocked")
self.failUnless(rc == 0, b"Unexpected error")
self.assertNotEqual(rc, 2, b"interpreter was blocked")
self.assertEqual(rc, 0, b"Unexpected error")
def test_1_join_on_shutdown(self):
# The usual case: on exit, wait for a non-daemon thread
......@@ -566,14 +566,15 @@ class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
def main():
support.run_unittest(LockTests, RLockTests, EventTests,
ConditionAsRLockTests, ConditionTests,
SemaphoreTests, BoundedSemaphoreTests,
ThreadTests,
ThreadJoinOnShutdown,
ThreadingExceptionTests,
NativeRLockTests,
)
support.run_unittest(
LockTests, RLockTests, EventTests,
ConditionAsRLockTests, ConditionTests,
SemaphoreTests, BoundedSemaphoreTests,
ThreadTests,
ThreadJoinOnShutdown,
ThreadingExceptionTests,
NativeRLockTests,
)
if __name__ == "__main__":
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment