Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
32856669
Commit
32856669
authored
Mar 14, 2016
by
Serhiy Storchaka
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Issue #20556: Used specific assert methods in threading tests.
parent
c04c28a7
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
22 additions
and
23 deletions
+22
-23
Lib/test/test_dummy_thread.py
Lib/test/test_dummy_thread.py
+8
-8
Lib/test/test_threading.py
Lib/test/test_threading.py
+13
-14
Lib/test/test_threading_local.py
Lib/test/test_threading_local.py
+1
-1
No files found.
Lib/test/test_dummy_thread.py
View file @
32856669
...
...
@@ -24,14 +24,14 @@ class LockTests(unittest.TestCase):
def
test_initlock
(
self
):
#Make sure locks start locked
self
.
assert
True
(
not
self
.
lock
.
locked
(),
self
.
assert
False
(
self
.
lock
.
locked
(),
"Lock object is not initialized unlocked."
)
def
test_release
(
self
):
# Test self.lock.release()
self
.
lock
.
acquire
()
self
.
lock
.
release
()
self
.
assert
True
(
not
self
.
lock
.
locked
(),
self
.
assert
False
(
self
.
lock
.
locked
(),
"Lock object did not release properly."
)
def
test_improper_release
(
self
):
...
...
@@ -46,7 +46,7 @@ class LockTests(unittest.TestCase):
def
test_cond_acquire_fail
(
self
):
#Test acquiring locked lock returns False
self
.
lock
.
acquire
(
0
)
self
.
assert
True
(
not
self
.
lock
.
acquire
(
0
),
self
.
assert
False
(
self
.
lock
.
acquire
(
0
),
"Conditional acquiring of a locked lock incorrectly "
"succeeded."
)
...
...
@@ -58,9 +58,9 @@ class LockTests(unittest.TestCase):
def
test_uncond_acquire_return_val
(
self
):
#Make sure that an unconditional locking returns True.
self
.
assert
True
(
self
.
lock
.
acquire
(
1
)
is
True
,
self
.
assert
Is
(
self
.
lock
.
acquire
(
1
),
True
,
"Unconditional locking did not return True."
)
self
.
assert
True
(
self
.
lock
.
acquire
()
is
True
)
self
.
assert
Is
(
self
.
lock
.
acquire
(),
True
)
def
test_uncond_acquire_blocking
(
self
):
#Make sure that unconditional acquiring of a locked lock blocks.
...
...
@@ -80,7 +80,7 @@ class LockTests(unittest.TestCase):
end_time
=
int
(
time
.
time
())
if
support
.
verbose
:
print
(
"done"
)
self
.
assert
True
((
end_time
-
start_time
)
>=
DELAY
,
self
.
assert
GreaterEqual
(
end_time
-
start_time
,
DELAY
,
"Blocking by unconditional acquiring failed."
)
class
MiscTests
(
unittest
.
TestCase
):
...
...
@@ -94,7 +94,7 @@ class MiscTests(unittest.TestCase):
#Test sanity of _thread.get_ident()
self
.
assertIsInstance
(
_thread
.
get_ident
(),
int
,
"_thread.get_ident() returned a non-integer"
)
self
.
assert
True
(
_thread
.
get_ident
()
!=
0
,
self
.
assert
NotEqual
(
_thread
.
get_ident
(),
0
,
"_thread.get_ident() returned 0"
)
def
test_LockType
(
self
):
...
...
@@ -164,7 +164,7 @@ class ThreadTests(unittest.TestCase):
time
.
sleep
(
DELAY
)
if
support
.
verbose
:
print
(
'done'
)
self
.
assert
True
(
testing_queue
.
qsize
()
==
thread_count
,
self
.
assert
Equal
(
testing_queue
.
qsize
(),
thread_count
,
"Not all %s threads executed properly after %s sec."
%
(
thread_count
,
DELAY
))
...
...
Lib/test/test_threading.py
View file @
32856669
...
...
@@ -58,7 +58,7 @@ class TestThread(threading.Thread):
self
.
nrunning
.
inc
()
if
verbose
:
print
(
self
.
nrunning
.
get
(),
'tasks are running'
)
self
.
testcase
.
assert
True
(
self
.
nrunning
.
get
()
<=
3
)
self
.
testcase
.
assert
LessEqual
(
self
.
nrunning
.
get
(),
3
)
time
.
sleep
(
delay
)
if
verbose
:
...
...
@@ -66,7 +66,7 @@ class TestThread(threading.Thread):
with
self
.
mutex
:
self
.
nrunning
.
dec
()
self
.
testcase
.
assert
True
(
self
.
nrunning
.
get
()
>=
0
)
self
.
testcase
.
assert
GreaterEqual
(
self
.
nrunning
.
get
(),
0
)
if
verbose
:
print
(
'%s is finished. %d tasks are running'
%
(
self
.
name
,
self
.
nrunning
.
get
()))
...
...
@@ -100,26 +100,25 @@ class ThreadTests(BaseTestCase):
for
i
in
range
(
NUMTASKS
):
t
=
TestThread
(
"<thread %d>"
%
i
,
self
,
sema
,
mutex
,
numrunning
)
threads
.
append
(
t
)
self
.
assert
Equal
(
t
.
ident
,
None
)
self
.
assert
True
(
re
.
match
(
'<TestThread
\
(.*, i
n
itial
\
)>
'
, repr(t))
)
self
.
assert
IsNone
(
t
.
ident
)
self
.
assert
Regex
(
repr
(
t
),
r'^<TestThread\
(.*, i
nitial\
)>$
'
)
t.start()
if verbose:
print('
waiting
for
all
tasks
to
complete
')
for t in threads:
t.join()
self.assert
True(not
t.is_alive())
self.assert
False(
t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
self.assertTrue(re.match('
<
TestThread
\
(.
*
,
stopped
-
?
\
d
+
\
)
>
',
repr(t)))
self.assertIsNotNone(t.ident)
self.assertRegex(repr(t), r'
^<
TestThread
\
(.
*
,
stopped
-
?
\
d
+
\
)
>
$
')
if verbose:
print('
all
tasks
done
')
self.assertEqual(numrunning.get(), 0)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assert
False(threading.currentThread().ident is None
)
self.assert
IsNotNone(threading.currentThread().ident
)
def f():
ident.append(threading.currentThread().ident)
done.set()
...
...
@@ -127,7 +126,7 @@ class ThreadTests(BaseTestCase):
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assert
False(ident[0] is None
)
self.assert
IsNotNone(ident[0]
)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
...
...
@@ -244,7 +243,7 @@ class ThreadTests(BaseTestCase):
self
.
assertTrue
(
ret
)
if
verbose
:
print
(
" verifying worker hasn't exited"
)
self
.
assert
True
(
not
t
.
finished
)
self
.
assert
False
(
t
.
finished
)
if
verbose
:
print
(
" attempting to raise asynch exception in worker"
)
result
=
set_async_exc
(
ctypes
.
c_long
(
t
.
id
),
exception
)
...
...
@@ -415,9 +414,9 @@ class ThreadTests(BaseTestCase):
def
test_repr_daemon
(
self
):
t
=
threading
.
Thread
()
self
.
assert
False
(
'daemon'
in
repr
(
t
))
self
.
assert
NotIn
(
'daemon'
,
repr
(
t
))
t
.
daemon
=
True
self
.
assert
True
(
'daemon'
in
repr
(
t
))
self
.
assert
In
(
'daemon'
,
repr
(
t
))
def
test_deamon_param
(
self
):
t
=
threading
.
Thread
()
...
...
@@ -569,7 +568,7 @@ class ThreadTests(BaseTestCase):
tstate_lock
.
release
()
self
.
assertFalse
(
t
.
is_alive
())
# And verify the thread disposed of _tstate_lock.
self
.
assert
True
(
t
.
_tstate_lock
is
None
)
self
.
assert
IsNone
(
t
.
_tstate_lock
)
def
test_repr_stopped
(
self
):
# Verify that "stopped" shows up in repr(Thread) appropriately.
...
...
Lib/test/test_threading_local.py
View file @
32856669
...
...
@@ -189,7 +189,7 @@ class BaseLocalTest:
wr
=
weakref
.
ref
(
x
)
del
x
gc
.
collect
()
self
.
assertIs
(
wr
(),
None
)
self
.
assertIs
None
(
wr
()
)
class
ThreadLocalTest
(
unittest
.
TestCase
,
BaseLocalTest
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment