Commit 8cbdb236 authored by Gregory P. Smith's avatar Gregory P. Smith

Merged revisions 87710 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r87710 | gregory.p.smith | 2011-01-03 13:06:12 -0800 (Mon, 03 Jan 2011) | 4 lines

  issue6643 - Two locks held within the threading module on each thread instance
  needed to be reinitialized after fork().  Adds tests to confirm that they are
  and that a potential deadlock and crasher bug are fixed (platform dependant).
........
parent 1d887fb7
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import test.support import test.support
from test.support import verbose from test.support import verbose
import os
import random import random
import re import re
import sys import sys
...@@ -10,6 +11,7 @@ import _thread ...@@ -10,6 +11,7 @@ import _thread
import time import time
import unittest import unittest
import weakref import weakref
import subprocess
from test import lock_tests from test import lock_tests
...@@ -247,7 +249,6 @@ class ThreadTests(unittest.TestCase): ...@@ -247,7 +249,6 @@ class ThreadTests(unittest.TestCase):
print("test_finalize_with_runnning_thread can't import ctypes") print("test_finalize_with_runnning_thread can't import ctypes")
return # can't do anything return # can't do anything
import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1: rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, _thread import ctypes, sys, time, _thread
...@@ -278,7 +279,6 @@ class ThreadTests(unittest.TestCase): ...@@ -278,7 +279,6 @@ class ThreadTests(unittest.TestCase):
def test_finalize_with_trace(self): def test_finalize_with_trace(self):
# Issue1733757 # Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown # Avoid a deadlock when sys.settrace steps into threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1: p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, threading import sys, threading
...@@ -311,7 +311,6 @@ class ThreadTests(unittest.TestCase): ...@@ -311,7 +311,6 @@ class ThreadTests(unittest.TestCase):
def test_join_nondaemon_on_shutdown(self): def test_join_nondaemon_on_shutdown(self):
# Issue 1722344 # Issue 1722344
# Raising SystemExit skipped threading._shutdown # Raising SystemExit skipped threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1: p = subprocess.Popen([sys.executable, "-c", """if 1:
import threading import threading
from time import sleep from time import sleep
...@@ -412,7 +411,6 @@ class ThreadJoinOnShutdown(unittest.TestCase): ...@@ -412,7 +411,6 @@ class ThreadJoinOnShutdown(unittest.TestCase):
sys.stdout.flush() sys.stdout.flush()
\n""" + script \n""" + script
import subprocess
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait() rc = p.wait()
data = p.stdout.read().decode().replace('\r', '') data = p.stdout.read().decode().replace('\r', '')
...@@ -484,6 +482,152 @@ class ThreadJoinOnShutdown(unittest.TestCase): ...@@ -484,6 +482,152 @@ class ThreadJoinOnShutdown(unittest.TestCase):
""" """
self._run_and_join(script) self._run_and_join(script)
def assertScriptHasOutput(self, script, expected_output):
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().decode().replace('\r', '')
self.assertEqual(rc, 0, "Unexpected error")
self.assertEqual(data, expected_output)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_4_joining_across_fork_in_worker_thread(self):
# There used to be a possible deadlock when forking from a child
# thread. See http://bugs.python.org/issue6643.
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
# The script takes the following steps:
# - The main thread in the parent process starts a new thread and then
# tries to join it.
# - The join operation acquires the Lock inside the thread's _block
# Condition. (See threading.py:Thread.join().)
# - We stub out the acquire method on the condition to force it to wait
# until the child thread forks. (See LOCK ACQUIRED HERE)
# - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
# HERE)
# - The main thread of the parent process enters Condition.wait(),
# which releases the lock on the child thread.
# - The child process returns. Without the necessary fix, when the
# main thread of the child process (which used to be the child thread
# in the parent process) attempts to exit, it will try to acquire the
# lock in the Thread._block Condition object and hang, because the
# lock was held across the fork.
script = """if 1:
import os, time, threading
finish_join = False
start_fork = False
def worker():
# Wait until this thread's lock is acquired before forking to
# create the deadlock.
global finish_join
while not start_fork:
time.sleep(0.01)
# LOCK HELD: Main thread holds lock across this call.
childpid = os.fork()
finish_join = True
if childpid != 0:
# Parent process just waits for child.
os.waitpid(childpid, 0)
# Child process should just return.
w = threading.Thread(target=worker)
# Stub out the private condition variable's lock acquire method.
# This acquires the lock and then waits until the child has forked
# before returning, which will release the lock soon after. If
# someone else tries to fix this test case by acquiring this lock
# before forking instead of reseting it, the test case will
# deadlock when it shouldn't.
condition = w._block
orig_acquire = condition.acquire
call_count_lock = threading.Lock()
call_count = 0
def my_acquire():
global call_count
global start_fork
orig_acquire() # LOCK ACQUIRED HERE
start_fork = True
if call_count == 0:
while not finish_join:
time.sleep(0.01) # WORKER THREAD FORKS HERE
with call_count_lock:
call_count += 1
condition.acquire = my_acquire
w.start()
w.join()
print('end of main')
"""
self.assertScriptHasOutput(script, "end of main\n")
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_5_clear_waiter_locks_to_avoid_crash(self):
# Check that a spawned thread that forks doesn't segfault on certain
# platforms, namely OS X. This used to happen if there was a waiter
# lock in the thread's condition variable's waiters list. Even though
# we know the lock will be held across the fork, it is not safe to
# release locks held across forks on all platforms, so releasing the
# waiter lock caused a segfault on OS X. Furthermore, since locks on
# OS X are (as of this writing) implemented with a mutex + condition
# variable instead of a semaphore, while we know that the Python-level
# lock will be acquired, we can't know if the internal mutex will be
# acquired at the time of the fork.
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
script = """if True:
import os, time, threading
start_fork = False
def worker():
# Wait until the main thread has attempted to join this thread
# before continuing.
while not start_fork:
time.sleep(0.01)
childpid = os.fork()
if childpid != 0:
# Parent process just waits for child.
(cpid, rc) = os.waitpid(childpid, 0)
assert cpid == childpid
assert rc == 0
print('end of worker thread')
else:
# Child process should just return.
pass
w = threading.Thread(target=worker)
# Stub out the private condition variable's _release_save method.
# This releases the condition's lock and flips the global that
# causes the worker to fork. At this point, the problematic waiter
# lock has been acquired once by the waiter and has been put onto
# the waiters list.
condition = w._block
orig_release_save = condition._release_save
def my_release_save():
global start_fork
orig_release_save()
# Waiter lock held here, condition lock released.
start_fork = True
condition._release_save = my_release_save
w.start()
w.join()
print('end of main thread')
"""
output = "end of worker thread\nend of main thread\n"
self.assertScriptHasOutput(script, output)
class ThreadingExceptionTests(unittest.TestCase): class ThreadingExceptionTests(unittest.TestCase):
# A RuntimeError should be raised if Thread.start() is called # A RuntimeError should be raised if Thread.start() is called
......
...@@ -856,6 +856,10 @@ def _after_fork(): ...@@ -856,6 +856,10 @@ def _after_fork():
# its new value since it can have changed. # its new value since it can have changed.
ident = _get_ident() ident = _get_ident()
thread._ident = ident thread._ident = ident
# Any condition variables hanging off of the active thread may
# be in an invalid state, so we reinitialize them.
thread._block.__init__()
thread._started._cond.__init__()
new_active[ident] = thread new_active[ident] = thread
else: else:
# All the others are already stopped. # All the others are already stopped.
......
...@@ -27,6 +27,9 @@ Core and Builtins ...@@ -27,6 +27,9 @@ Core and Builtins
Library Library
------- -------
- Issue #6643: Reinitialize locks held within the threading module after fork
to avoid a potential rare deadlock or crash on some platforms.
- Issue #10806, issue #9905: Fix subprocess pipes when some of the standard - Issue #10806, issue #9905: Fix subprocess pipes when some of the standard
file descriptors (0, 1, 2) are closed in the parent process. Initial file descriptors (0, 1, 2) are closed in the parent process. Initial
patch by Ross Lagerwall. patch by Ross Lagerwall.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment