Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gevent
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
gevent
Commits
4d5e8021
Commit
4d5e8021
authored
Jun 21, 2010
by
Denis Bilenko
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
greentest: add test_threading_2.py and lock_tests.py
parent
3755d037
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
1095 additions
and
0 deletions
+1095
-0
greentest/lock_tests.py
greentest/lock_tests.py
+551
-0
greentest/test_threading_2.py
greentest/test_threading_2.py
+544
-0
No files found.
greentest/lock_tests.py
0 → 100644
View file @
4d5e8021
"""
Various tests for synchronization primitives.
"""
from
__future__
import
with_statement
import
sys
import
time
from
thread
import
start_new_thread
,
get_ident
import
threading
import
unittest
from
test
import
test_support
as
support
def
_wait
():
# A crude wait/yield function not relying on synchronization primitives.
time
.
sleep
(
0.01
)
class
Bunch
(
object
):
"""
A bunch of threads.
"""
def
__init__
(
self
,
f
,
n
,
wait_before_exit
=
False
):
"""
Construct a bunch of `n` threads running the same function `f`.
If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called.
"""
self
.
f
=
f
self
.
n
=
n
self
.
started
=
[]
self
.
finished
=
[]
self
.
_can_exit
=
not
wait_before_exit
def
task
():
tid
=
get_ident
()
self
.
started
.
append
(
tid
)
try
:
f
()
finally
:
self
.
finished
.
append
(
tid
)
while
not
self
.
_can_exit
:
_wait
()
for
i
in
range
(
n
):
start_new_thread
(
task
,
())
def
wait_for_started
(
self
):
while
len
(
self
.
started
)
<
self
.
n
:
_wait
()
def
wait_for_finished
(
self
):
while
len
(
self
.
finished
)
<
self
.
n
:
_wait
()
def
do_finish
(
self
):
self
.
_can_exit
=
True
class
BaseTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
_threads
=
support
.
threading_setup
()
def
tearDown
(
self
):
support
.
threading_cleanup
(
*
self
.
_threads
)
support
.
reap_children
()
class
BaseLockTests
(
BaseTestCase
):
"""
Tests for both recursive and non-recursive locks.
"""
def
test_constructor
(
self
):
lock
=
self
.
locktype
()
del
lock
def
test_acquire_destroy
(
self
):
lock
=
self
.
locktype
()
lock
.
acquire
()
del
lock
def
test_acquire_release
(
self
):
lock
=
self
.
locktype
()
lock
.
acquire
()
lock
.
release
()
del
lock
def
test_try_acquire
(
self
):
lock
=
self
.
locktype
()
self
.
assertTrue
(
lock
.
acquire
(
False
))
lock
.
release
()
def
test_try_acquire_contended
(
self
):
lock
=
self
.
locktype
()
lock
.
acquire
()
result
=
[]
def
f
():
result
.
append
(
lock
.
acquire
(
False
))
Bunch
(
f
,
1
).
wait_for_finished
()
self
.
assertFalse
(
result
[
0
])
lock
.
release
()
def
test_acquire_contended
(
self
):
lock
=
self
.
locktype
()
lock
.
acquire
()
N
=
5
def
f
():
lock
.
acquire
()
lock
.
release
()
b
=
Bunch
(
f
,
N
)
b
.
wait_for_started
()
_wait
()
self
.
assertEqual
(
len
(
b
.
finished
),
0
)
lock
.
release
()
b
.
wait_for_finished
()
self
.
assertEqual
(
len
(
b
.
finished
),
N
)
def
test_with
(
self
):
lock
=
self
.
locktype
()
def
f
():
lock
.
acquire
()
lock
.
release
()
def
_with
(
err
=
None
):
with
lock
:
if
err
is
not
None
:
raise
err
_with
()
# Check the lock is unacquired
Bunch
(
f
,
1
).
wait_for_finished
()
self
.
assertRaises
(
TypeError
,
_with
,
TypeError
)
# Check the lock is unacquired
Bunch
(
f
,
1
).
wait_for_finished
()
def
test_thread_leak
(
self
):
# The lock shouldn't leak a Thread instance when used from a foreign
# (non-threading) thread.
lock
=
self
.
locktype
()
def
f
():
lock
.
acquire
()
lock
.
release
()
n
=
len
(
threading
.
enumerate
())
# We run many threads in the hope that existing threads ids won't
# be recycled.
Bunch
(
f
,
15
).
wait_for_finished
()
self
.
assertEqual
(
n
,
len
(
threading
.
enumerate
()))
class
LockTests
(
BaseLockTests
):
"""
Tests for non-recursive, weak locks
(which can be acquired and released from different threads).
"""
def
test_reacquire
(
self
):
# Lock needs to be released before re-acquiring.
lock
=
self
.
locktype
()
phase
=
[]
def
f
():
lock
.
acquire
()
phase
.
append
(
None
)
lock
.
acquire
()
phase
.
append
(
None
)
start_new_thread
(
f
,
())
while
len
(
phase
)
==
0
:
_wait
()
_wait
()
self
.
assertEqual
(
len
(
phase
),
1
)
lock
.
release
()
while
len
(
phase
)
==
1
:
_wait
()
self
.
assertEqual
(
len
(
phase
),
2
)
def
test_different_thread
(
self
):
# Lock can be released from a different thread.
lock
=
self
.
locktype
()
lock
.
acquire
()
def
f
():
lock
.
release
()
b
=
Bunch
(
f
,
1
)
b
.
wait_for_finished
()
lock
.
acquire
()
lock
.
release
()
class
RLockTests
(
BaseLockTests
):
"""
Tests for recursive locks.
"""
def
test_reacquire
(
self
):
lock
=
self
.
locktype
()
lock
.
acquire
()
lock
.
acquire
()
lock
.
release
()
lock
.
acquire
()
lock
.
release
()
lock
.
release
()
def
test_release_unacquired
(
self
):
# Cannot release an unacquired lock
lock
=
self
.
locktype
()
self
.
assertRaises
(
RuntimeError
,
lock
.
release
)
lock
.
acquire
()
lock
.
acquire
()
lock
.
release
()
lock
.
acquire
()
lock
.
release
()
lock
.
release
()
self
.
assertRaises
(
RuntimeError
,
lock
.
release
)
def
test_different_thread
(
self
):
# Cannot release from a different thread
lock
=
self
.
locktype
()
def
f
():
lock
.
acquire
()
b
=
Bunch
(
f
,
1
,
True
)
try
:
self
.
assertRaises
(
RuntimeError
,
lock
.
release
)
finally
:
b
.
do_finish
()
def
test__is_owned
(
self
):
lock
=
self
.
locktype
()
self
.
assertFalse
(
lock
.
_is_owned
())
lock
.
acquire
()
self
.
assertTrue
(
lock
.
_is_owned
())
lock
.
acquire
()
self
.
assertTrue
(
lock
.
_is_owned
())
result
=
[]
def
f
():
result
.
append
(
lock
.
_is_owned
())
Bunch
(
f
,
1
).
wait_for_finished
()
self
.
assertFalse
(
result
[
0
])
lock
.
release
()
self
.
assertTrue
(
lock
.
_is_owned
())
lock
.
release
()
self
.
assertFalse
(
lock
.
_is_owned
())
class
EventTests
(
BaseTestCase
):
"""
Tests for Event objects.
"""
def
test_is_set
(
self
):
evt
=
self
.
eventtype
()
self
.
assertFalse
(
evt
.
is_set
())
evt
.
set
()
self
.
assertTrue
(
evt
.
is_set
())
evt
.
set
()
self
.
assertTrue
(
evt
.
is_set
())
evt
.
clear
()
self
.
assertFalse
(
evt
.
is_set
())
evt
.
clear
()
self
.
assertFalse
(
evt
.
is_set
())
def
_check_notify
(
self
,
evt
):
# All threads get notified
N
=
5
results1
=
[]
results2
=
[]
def
f
():
evt
.
wait
()
results1
.
append
(
evt
.
is_set
())
evt
.
wait
()
results2
.
append
(
evt
.
is_set
())
b
=
Bunch
(
f
,
N
)
b
.
wait_for_started
()
_wait
()
self
.
assertEqual
(
len
(
results1
),
0
)
evt
.
set
()
b
.
wait_for_finished
()
self
.
assertEqual
(
results1
,
[
True
]
*
N
)
self
.
assertEqual
(
results2
,
[
True
]
*
N
)
def
test_notify
(
self
):
evt
=
self
.
eventtype
()
self
.
_check_notify
(
evt
)
# Another time, after an explicit clear()
evt
.
set
()
evt
.
clear
()
self
.
_check_notify
(
evt
)
def
test_timeout
(
self
):
evt
=
self
.
eventtype
()
results1
=
[]
results2
=
[]
N
=
5
def
f
():
evt
.
wait
(
0.0
)
results1
.
append
(
evt
.
is_set
())
t1
=
time
.
time
()
evt
.
wait
(
0.2
)
r
=
evt
.
is_set
()
t2
=
time
.
time
()
results2
.
append
((
r
,
t2
-
t1
))
Bunch
(
f
,
N
).
wait_for_finished
()
self
.
assertEqual
(
results1
,
[
False
]
*
N
)
for
r
,
dt
in
results2
:
self
.
assertFalse
(
r
)
self
.
assertTrue
(
dt
>=
0.2
,
dt
)
# The event is set
results1
=
[]
results2
=
[]
evt
.
set
()
Bunch
(
f
,
N
).
wait_for_finished
()
self
.
assertEqual
(
results1
,
[
True
]
*
N
)
for
r
,
dt
in
results2
:
self
.
assertTrue
(
r
)
class
ConditionTests
(
BaseTestCase
):
"""
Tests for condition variables.
"""
def
test_acquire
(
self
):
cond
=
self
.
condtype
()
# Be default we have an RLock: the condition can be acquired multiple
# times.
cond
.
acquire
()
cond
.
acquire
()
cond
.
release
()
cond
.
release
()
lock
=
threading
.
Lock
()
cond
=
self
.
condtype
(
lock
)
cond
.
acquire
()
self
.
assertFalse
(
lock
.
acquire
(
False
))
cond
.
release
()
self
.
assertTrue
(
lock
.
acquire
(
False
))
self
.
assertFalse
(
cond
.
acquire
(
False
))
lock
.
release
()
with
cond
:
self
.
assertFalse
(
lock
.
acquire
(
False
))
def
test_unacquired_wait
(
self
):
cond
=
self
.
condtype
()
self
.
assertRaises
(
RuntimeError
,
cond
.
wait
)
def
test_unacquired_notify
(
self
):
cond
=
self
.
condtype
()
self
.
assertRaises
(
RuntimeError
,
cond
.
notify
)
def
_check_notify
(
self
,
cond
):
N
=
5
results1
=
[]
results2
=
[]
phase_num
=
0
def
f
():
cond
.
acquire
()
cond
.
wait
()
cond
.
release
()
results1
.
append
(
phase_num
)
cond
.
acquire
()
cond
.
wait
()
cond
.
release
()
results2
.
append
(
phase_num
)
b
=
Bunch
(
f
,
N
)
b
.
wait_for_started
()
_wait
()
self
.
assertEqual
(
results1
,
[])
# Notify 3 threads at first
cond
.
acquire
()
cond
.
notify
(
3
)
_wait
()
phase_num
=
1
cond
.
release
()
while
len
(
results1
)
<
3
:
_wait
()
self
.
assertEqual
(
results1
,
[
1
]
*
3
)
self
.
assertEqual
(
results2
,
[])
# Notify 5 threads: they might be in their first or second wait
cond
.
acquire
()
cond
.
notify
(
5
)
_wait
()
phase_num
=
2
cond
.
release
()
while
len
(
results1
)
+
len
(
results2
)
<
8
:
_wait
()
self
.
assertEqual
(
results1
,
[
1
]
*
3
+
[
2
]
*
2
)
self
.
assertEqual
(
results2
,
[
2
]
*
3
)
# Notify all threads: they are all in their second wait
cond
.
acquire
()
cond
.
notify_all
()
_wait
()
phase_num
=
3
cond
.
release
()
while
len
(
results2
)
<
5
:
_wait
()
self
.
assertEqual
(
results1
,
[
1
]
*
3
+
[
2
]
*
2
)
self
.
assertEqual
(
results2
,
[
2
]
*
3
+
[
3
]
*
2
)
b
.
wait_for_finished
()
def
test_notify
(
self
):
cond
=
self
.
condtype
()
self
.
_check_notify
(
cond
)
# A second time, to check internal state is still ok.
self
.
_check_notify
(
cond
)
def
test_timeout
(
self
):
cond
=
self
.
condtype
()
results
=
[]
N
=
5
def
f
():
cond
.
acquire
()
t1
=
time
.
time
()
cond
.
wait
(
0.2
)
t2
=
time
.
time
()
cond
.
release
()
results
.
append
(
t2
-
t1
)
Bunch
(
f
,
N
).
wait_for_finished
()
self
.
assertEqual
(
len
(
results
),
5
)
for
dt
in
results
:
self
.
assertTrue
(
dt
>=
0.2
,
dt
)
class
BaseSemaphoreTests
(
BaseTestCase
):
"""
Common tests for {bounded, unbounded} semaphore objects.
"""
def
test_constructor
(
self
):
self
.
assertRaises
(
ValueError
,
self
.
semtype
,
value
=
-
1
)
self
.
assertRaises
(
ValueError
,
self
.
semtype
,
value
=
-
sys
.
maxint
)
def
test_acquire
(
self
):
sem
=
self
.
semtype
(
1
)
sem
.
acquire
()
sem
.
release
()
sem
=
self
.
semtype
(
2
)
sem
.
acquire
()
sem
.
acquire
()
sem
.
release
()
sem
.
release
()
def
test_acquire_destroy
(
self
):
sem
=
self
.
semtype
()
sem
.
acquire
()
del
sem
def
test_acquire_contended
(
self
):
sem
=
self
.
semtype
(
7
)
sem
.
acquire
()
N
=
10
results1
=
[]
results2
=
[]
phase_num
=
0
def
f
():
sem
.
acquire
()
results1
.
append
(
phase_num
)
sem
.
acquire
()
results2
.
append
(
phase_num
)
b
=
Bunch
(
f
,
10
)
b
.
wait_for_started
()
while
len
(
results1
)
+
len
(
results2
)
<
6
:
_wait
()
self
.
assertEqual
(
results1
+
results2
,
[
0
]
*
6
)
phase_num
=
1
for
i
in
range
(
7
):
sem
.
release
()
while
len
(
results1
)
+
len
(
results2
)
<
13
:
_wait
()
self
.
assertEqual
(
sorted
(
results1
+
results2
),
[
0
]
*
6
+
[
1
]
*
7
)
phase_num
=
2
for
i
in
range
(
6
):
sem
.
release
()
while
len
(
results1
)
+
len
(
results2
)
<
19
:
_wait
()
self
.
assertEqual
(
sorted
(
results1
+
results2
),
[
0
]
*
6
+
[
1
]
*
7
+
[
2
]
*
6
)
# The semaphore is still locked
self
.
assertFalse
(
sem
.
acquire
(
False
))
# Final release, to let the last thread finish
sem
.
release
()
b
.
wait_for_finished
()
def
test_try_acquire
(
self
):
sem
=
self
.
semtype
(
2
)
self
.
assertTrue
(
sem
.
acquire
(
False
))
self
.
assertTrue
(
sem
.
acquire
(
False
))
self
.
assertFalse
(
sem
.
acquire
(
False
))
sem
.
release
()
self
.
assertTrue
(
sem
.
acquire
(
False
))
def
test_try_acquire_contended
(
self
):
sem
=
self
.
semtype
(
4
)
sem
.
acquire
()
results
=
[]
def
f
():
results
.
append
(
sem
.
acquire
(
False
))
results
.
append
(
sem
.
acquire
(
False
))
Bunch
(
f
,
5
).
wait_for_finished
()
# There can be a thread switch between acquiring the semaphore and
# appending the result, therefore results will not necessarily be
# ordered.
self
.
assertEqual
(
sorted
(
results
),
[
False
]
*
7
+
[
True
]
*
3
)
def
test_default_value
(
self
):
# The default initial value is 1.
sem
=
self
.
semtype
()
sem
.
acquire
()
def
f
():
sem
.
acquire
()
sem
.
release
()
b
=
Bunch
(
f
,
1
)
b
.
wait_for_started
()
_wait
()
self
.
assertFalse
(
b
.
finished
)
sem
.
release
()
b
.
wait_for_finished
()
def
test_with
(
self
):
sem
=
self
.
semtype
(
2
)
def
_with
(
err
=
None
):
with
sem
:
self
.
assertTrue
(
sem
.
acquire
(
False
))
sem
.
release
()
with
sem
:
self
.
assertFalse
(
sem
.
acquire
(
False
))
if
err
:
raise
err
_with
()
self
.
assertTrue
(
sem
.
acquire
(
False
))
sem
.
release
()
self
.
assertRaises
(
TypeError
,
_with
,
TypeError
)
self
.
assertTrue
(
sem
.
acquire
(
False
))
sem
.
release
()
class
SemaphoreTests
(
BaseSemaphoreTests
):
"""
Tests for unbounded semaphores.
"""
def
test_release_unacquired
(
self
):
# Unbounded releases are allowed and increment the semaphore's value
sem
=
self
.
semtype
(
1
)
sem
.
release
()
sem
.
acquire
()
sem
.
acquire
()
sem
.
release
()
class
BoundedSemaphoreTests
(
BaseSemaphoreTests
):
"""
Tests for bounded semaphores.
"""
def
test_release_unacquired
(
self
):
# Cannot go past the initial value
sem
=
self
.
semtype
()
self
.
assertRaises
(
ValueError
,
sem
.
release
)
sem
.
acquire
()
sem
.
release
()
self
.
assertRaises
(
ValueError
,
sem
.
release
)
greentest/test_threading_2.py
0 → 100644
View file @
4d5e8021
# testing gevent's Event, Lock, RLock, Semaphore, BoundedSemaphore with standard test_threading
from
__future__
import
with_statement
from
gevent
import
monkey
;
monkey
.
patch_all
()
from
gevent.event
import
Event
from
gevent.coros
import
RLock
,
Semaphore
,
BoundedSemaphore
from
gevent.thread
import
allocate_lock
as
Lock
import
test.test_support
from
test.test_support
import
verbose
import
random
import
re
import
sys
import
threading
import
thread
import
time
import
unittest
import
weakref
threading
.
Event
=
Event
threading
.
Lock
=
Lock
threading
.
RLock
=
RLock
threading
.
Semaphore
=
Semaphore
threading
.
BoundedSemaphore
=
BoundedSemaphore
if
not
hasattr
(
threading
,
'current_thread'
):
threading
.
current_thread
=
threading
.
currentThread
if
not
hasattr
(
threading
.
Thread
,
'name'
):
threading
.
Thread
.
name
=
property
(
lambda
self
:
self
.
getName
())
if
not
hasattr
(
threading
.
Thread
,
'is_alive'
):
threading
.
Thread
.
is_alive
=
threading
.
Thread
.
isAlive
if
not
hasattr
(
threading
.
_Condition
,
'notify_all'
):
threading
.
_Condition
.
notify_all
=
threading
.
_Condition
.
notifyAll
import
lock_tests
# A trivial mutable counter.
class
Counter
(
object
):
def
__init__
(
self
):
self
.
value
=
0
def
inc
(
self
):
self
.
value
+=
1
def
dec
(
self
):
self
.
value
-=
1
def
get
(
self
):
return
self
.
value
class
TestThread
(
threading
.
Thread
):
def
__init__
(
self
,
name
,
testcase
,
sema
,
mutex
,
nrunning
):
threading
.
Thread
.
__init__
(
self
,
name
=
name
)
self
.
testcase
=
testcase
self
.
sema
=
sema
self
.
mutex
=
mutex
self
.
nrunning
=
nrunning
def
run
(
self
):
delay
=
random
.
random
()
/
10000.0
if
verbose
:
print
'task %s will run for %.1f usec'
%
(
self
.
name
,
delay
*
1e6
)
with
self
.
sema
:
with
self
.
mutex
:
self
.
nrunning
.
inc
()
if
verbose
:
print
self
.
nrunning
.
get
(),
'tasks are running'
self
.
testcase
.
assert_
(
self
.
nrunning
.
get
()
<=
3
)
time
.
sleep
(
delay
)
if
verbose
:
print
'task'
,
self
.
name
,
'done'
with
self
.
mutex
:
self
.
nrunning
.
dec
()
self
.
testcase
.
assert_
(
self
.
nrunning
.
get
()
>=
0
)
if
verbose
:
print
'%s is finished. %d tasks are running'
%
(
self
.
name
,
self
.
nrunning
.
get
())
class
ThreadTests
(
unittest
.
TestCase
):
# Create a bunch of threads, let each do some work, wait until all are
# done.
def
test_various_ops
(
self
):
# This takes about n/3 seconds to run (about n/3 clumps of tasks,
# times about 1 second per clump).
NUMTASKS
=
10
# no more than 3 of the 10 can run at once
sema
=
threading
.
BoundedSemaphore
(
value
=
3
)
mutex
=
threading
.
RLock
()
numrunning
=
Counter
()
threads
=
[]
for
i
in
range
(
NUMTASKS
):
t
=
TestThread
(
"<thread %d>"
%
i
,
self
,
sema
,
mutex
,
numrunning
)
threads
.
append
(
t
)
if
hasattr
(
t
,
'ident'
):
self
.
failUnlessEqual
(
t
.
ident
,
None
)
self
.
assert_
(
re
.
match
(
'<TestThread
\
(.*, i
n
itial
\
)>
'
, repr(t)))
t.start()
if verbose:
print '
waiting
for
all
tasks
to
complete
'
for t in threads:
t.join(NUMTASKS)
self.assert_(not t.is_alive())
if hasattr(t, '
ident
'):
self.failIfEqual(t.ident, 0)
self.assertFalse(t.ident is None)
self.assert_(re.match('
<
TestThread
\
(.
*
,
\
w
+
-
?
\
d
+
\
)
>
', repr(t)))
if verbose:
print '
all
tasks
done
'
self.assertEqual(numrunning.get(), 0)
if sys.version_info[:2] > (2, 5):
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_various_ops_small_stack(self):
if verbose:
print '
with
256
kB
thread
stack
size
...
'
try:
threading.stack_size(262144)
except thread.error:
if verbose:
print '
platform
does
not
support
changing
thread
stack
size
'
return
self.test_various_ops()
threading.stack_size(0)
# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
if verbose:
print '
with
1
MB
thread
stack
size
...
'
try:
threading.stack_size(0x100000)
except thread.error:
if verbose:
print '
platform
does
not
support
changing
thread
stack
size
'
return
self.test_various_ops()
threading.stack_size(0)
def BOGUS_test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
# Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
threading.current_thread()
mutex.release()
mutex = threading.Lock()
mutex.acquire()
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assert_(tid in threading._active)
self.assert_(isinstance(threading._active[tid],
threading._DummyThread))
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def SKIP_test_PyThreadState_SetAsyncExc(self):
try:
import ctypes
except ImportError:
if verbose:
print "test_PyThreadState_SetAsyncExc can'
t
import
ctypes
"
return # can't do anything
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
class AsyncExc(Exception):
pass
exception = ctypes.py_object(AsyncExc)
# `worker_started` is set by the thread when it's inside a try/except
# block waiting to catch the asynchronously set AsyncExc exception.
# `worker_saw_exception` is set by the thread upon catching that
# exception.
worker_started = threading.Event()
worker_saw_exception = threading.Event()
class Worker(threading.Thread):
def run(self):
self.id = thread.get_ident()
self.finished = False
try:
while True:
worker_started.set()
time.sleep(0.1)
except AsyncExc:
self.finished = True
worker_saw_exception.set()
t = Worker()
t.daemon = True # so if this fails, we don't hang Python at shutdown
t.start()
if verbose:
print "
started
worker
thread
"
# Try a thread id that doesn't make sense.
if verbose:
print "
trying
nonsensical
thread
id
"
result = set_async_exc(ctypes.c_long(-1), exception)
self.assertEqual(result, 0) # no thread states modified
# Now raise an exception in the worker thread.
if verbose:
print "
waiting
for
worker
thread
to
get
started
"
worker_started.wait()
if verbose:
print "
verifying
worker
hasn
't exited"
self.assert_(not t.finished)
if verbose:
print " attempting to raise asynch exception in worker"
result = set_async_exc(ctypes.c_long(t.id), exception)
self.assertEqual(result, 1) # one thread state modified
if verbose:
print " waiting for worker to say it caught the exception"
worker_saw_exception.wait(timeout=10)
self.assert_(t.finished)
if verbose:
print " all OK -- joining worker"
if t.finished:
t.join()
# else the thread is still running, and we have no way to kill it
if sys.version_info[:2] > (2, 5):
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
raise thread.error()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
self.assertRaises(thread.error, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
finally:
threading._start_new_thread = _start_new_thread
if sys.version_info[:2] > (2, 5):
def test_finalize_runnning_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
try:
import ctypes
except ImportError:
if verbose:
print("test_finalize_with_runnning_thread can'
t
import
ctypes
")
return # can't do anything
import subprocess
rc = subprocess.call([sys.executable, "
-
c
", """if 1:
import ctypes, sys, time, thread
# This lock is used as a simple event variable.
ready = thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
ensure = ctypes.pythonapi.PyGILState_Ensure
release = ctypes.pythonapi.PyGILState_Release
def __del__(self):
state = self.ensure()
self.release(state)
def waitingThread():
x = C()
ready.release()
time.sleep(100)
thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
"""])
self.assertEqual(rc, 42)
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
import subprocess
rc = subprocess.call([sys.executable, "
-
c
", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
# testsuite to hang forever
def killer():
import os, time
time.sleep(2)
print 'program blocked; aborting'
os._exit(2)
t = threading.Thread(target=killer)
t.daemon = True
t.start()
# This is the trace function
def func(frame, event, arg):
threading.current_thread()
return func
sys.settrace(func)
"""])
self.failIf(rc == 2, "
interpreted
was
blocked
")
self.failUnless(rc == 0, "
Unexpected
error
")
if sys.version_info[:2] > (2, 5):
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "
-
c
", """if 1:
import threading
from time import sleep
def child():
sleep(1)
# As a non-daemon thread we SHOULD wake up and nothing
# should be torn down yet
print "
Woke
up
,
sleep
function
is
:
", sleep
threading.Thread(target=child).start()
raise SystemExit
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
self.assertEqual(stdout.strip(),
"
Woke
up
,
sleep
function
is
:
<
built
-
in
function
sleep
>
")
stderr = re.sub(r"
^
\
[
\
d
+
refs
\
]
", "", stderr, re.MULTILINE).strip()
self.assertEqual(stderr, "")
def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
old_interval = sys.getcheckinterval()
try:
for i in xrange(1, 100):
# Try a couple times at each thread-switching interval
# to get more interleavings.
sys.setcheckinterval(i // 5)
t = threading.Thread(target=lambda: None)
t.start()
t.join()
l = enum()
self.assertFalse(t in l,
"
#1703448 triggered after %d trials: %s" % (i, l))
finally
:
sys
.
setcheckinterval
(
old_interval
)
if
sys
.
version_info
[:
2
]
>
(
2
,
5
):
def
test_no_refcycle_through_target
(
self
):
class
RunSelfFunction
(
object
):
def
__init__
(
self
,
should_raise
):
# The links in this refcycle from Thread back to self
# should be cleaned up when the thread completes.
self
.
should_raise
=
should_raise
self
.
thread
=
threading
.
Thread
(
target
=
self
.
_run
,
args
=
(
self
,),
kwargs
=
{
'yet_another'
:
self
})
self
.
thread
.
start
()
def
_run
(
self
,
other_ref
,
yet_another
):
if
self
.
should_raise
:
raise
SystemExit
cyclic_object
=
RunSelfFunction
(
should_raise
=
False
)
weak_cyclic_object
=
weakref
.
ref
(
cyclic_object
)
cyclic_object
.
thread
.
join
()
del
cyclic_object
self
.
assertEquals
(
None
,
weak_cyclic_object
(),
msg
=
(
'%d references still around'
%
sys
.
getrefcount
(
weak_cyclic_object
())))
raising_cyclic_object
=
RunSelfFunction
(
should_raise
=
True
)
weak_raising_cyclic_object
=
weakref
.
ref
(
raising_cyclic_object
)
raising_cyclic_object
.
thread
.
join
()
del
raising_cyclic_object
self
.
assertEquals
(
None
,
weak_raising_cyclic_object
(),
msg
=
(
'%d references still around'
%
sys
.
getrefcount
(
weak_raising_cyclic_object
())))
class
ThreadJoinOnShutdown
(
unittest
.
TestCase
):
def
_run_and_join
(
self
,
script
):
script
=
"""if 1:
import sys, os, time, threading
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
print 'end of thread'
\
n
"""
+
script
import
subprocess
p
=
subprocess
.
Popen
([
sys
.
executable
,
"-c"
,
script
],
stdout
=
subprocess
.
PIPE
)
rc
=
p
.
wait
()
data
=
p
.
stdout
.
read
().
replace
(
'
\
r
'
,
''
)
self
.
assertEqual
(
data
,
"end of main
\
n
end of thread
\
n
"
)
self
.
failIf
(
rc
==
2
,
"interpreter was blocked"
)
self
.
failUnless
(
rc
==
0
,
"Unexpected error"
)
def
test_1_join_on_shutdown
(
self
):
# The usual case: on exit, wait for a non-daemon thread
script
=
"""if 1:
import os
t = threading.Thread(target=joiningfunc,
args=(threading.current_thread(),))
t.start()
time.sleep(0.1)
print 'end of main'
"""
self
.
_run_and_join
(
script
)
def
test_2_join_in_forked_process
(
self
):
# Like the test above, but from a forked interpreter
import
os
if
not
hasattr
(
os
,
'fork'
):
return
script
=
"""if 1:
childpid = os.fork()
if childpid != 0:
os.waitpid(childpid, 0)
sys.exit(0)
t = threading.Thread(target=joiningfunc,
args=(threading.current_thread(),))
t.start()
print 'end of main'
"""
self
.
_run_and_join
(
script
)
def
test_3_join_in_forked_from_thread
(
self
):
# Like the test above, but fork() was called from a worker thread
# In the forked process, the main Thread object must be marked as stopped.
import
os
if
not
hasattr
(
os
,
'fork'
):
return
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
if
sys
.
platform
in
(
'freebsd4'
,
'freebsd5'
,
'freebsd6'
,
'os2emx'
):
print
>>
sys
.
stderr
,
(
'Skipping test_3_join_in_forked_from_thread'
' due to known OS bugs on'
),
sys
.
platform
return
script
=
"""if 1:
main_thread = threading.current_thread()
def worker():
childpid = os.fork()
if childpid != 0:
os.waitpid(childpid, 0)
sys.exit(0)
t = threading.Thread(target=joiningfunc,
args=(main_thread,))
print 'end of main'
t.start()
t.join() # Should not block: main_thread is already stopped
w = threading.Thread(target=worker)
w.start()
"""
self
.
_run_and_join
(
script
)
class
ThreadingExceptionTests
(
unittest
.
TestCase
):
# A RuntimeError should be raised if Thread.start() is called
# multiple times.
def
test_start_thread_again
(
self
):
thread
=
threading
.
Thread
()
thread
.
start
()
self
.
assertRaises
(
RuntimeError
,
thread
.
start
)
def
test_joining_current_thread
(
self
):
current_thread
=
threading
.
current_thread
()
self
.
assertRaises
(
RuntimeError
,
current_thread
.
join
);
def
test_joining_inactive_thread
(
self
):
thread
=
threading
.
Thread
()
self
.
assertRaises
(
RuntimeError
,
thread
.
join
)
def
test_daemonize_active_thread
(
self
):
thread
=
threading
.
Thread
()
thread
.
start
()
self
.
assertRaises
(
RuntimeError
,
setattr
,
thread
,
"daemon"
,
True
)
class
LockTests
(
lock_tests
.
LockTests
):
locktype
=
staticmethod
(
threading
.
Lock
)
class
RLockTests
(
lock_tests
.
RLockTests
):
locktype
=
staticmethod
(
threading
.
RLock
)
class
EventTests
(
lock_tests
.
EventTests
):
eventtype
=
staticmethod
(
threading
.
Event
)
class
ConditionAsRLockTests
(
lock_tests
.
RLockTests
):
# An Condition uses an RLock by default and exports its API.
locktype
=
staticmethod
(
threading
.
Condition
)
class
ConditionTests
(
lock_tests
.
ConditionTests
):
condtype
=
staticmethod
(
threading
.
Condition
)
class
SemaphoreTests
(
lock_tests
.
SemaphoreTests
):
semtype
=
staticmethod
(
threading
.
Semaphore
)
class
BoundedSemaphoreTests
(
lock_tests
.
BoundedSemaphoreTests
):
semtype
=
staticmethod
(
threading
.
BoundedSemaphore
)
def
test_main
():
test
.
test_support
.
run_unittest
(
LockTests
,
RLockTests
,
EventTests
,
ConditionAsRLockTests
,
ConditionTests
,
SemaphoreTests
,
BoundedSemaphoreTests
,
ThreadTests
,
ThreadJoinOnShutdown
,
ThreadingExceptionTests
,
)
if
__name__
==
"__main__"
:
test_main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment