Commit 4f9bb1d9 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1053 from gevent/test-363

Test on 3.6.3 on Travis; appveyor was already there. 
parents 5d2bcb8d 7c9842b3
......@@ -149,6 +149,14 @@
See :issue:`790` for history and more in-depth discussion.
- gevent is now tested on Python 3.6.3. This includes the following
fixes and changes:
- Errors raised from :mod:`gevent.subprocess` will have a
``filename`` attribute set.
- The :class:`threading.Timer` class is now monkey-patched and can
be joined.
1.2.2 (2017-06-05)
==================
......
......@@ -116,7 +116,7 @@ PY278=$(BUILD_RUNTIMES)/snakepit/python2.7.8
PY27=$(BUILD_RUNTIMES)/snakepit/python2.7.14
PY34=$(BUILD_RUNTIMES)/snakepit/python3.4.7
PY35=$(BUILD_RUNTIMES)/snakepit/python3.5.4
PY36=$(BUILD_RUNTIMES)/snakepit/python3.6.2
PY36=$(BUILD_RUNTIMES)/snakepit/python3.6.3
PYPY=$(BUILD_RUNTIMES)/snakepit/pypy590
PYPY3=$(BUILD_RUNTIMES)/snakepit/pypy3.5_590
......@@ -181,7 +181,7 @@ test-py35: $(PY35)
PYTHON=python3.5.4 PATH=$(BUILD_RUNTIMES)/versions/python3.5.4/bin:$(PATH) make develop fulltoxtest
test-py36: $(PY36)
PYTHON=python3.6.2 PATH=$(BUILD_RUNTIMES)/versions/python3.6.2/bin:$(PATH) make develop toxtest
PYTHON=python3.6.3 PATH=$(BUILD_RUNTIMES)/versions/python3.6.3/bin:$(PATH) make develop toxtest
test-py36-libuv: $(PY36)
GEVENT_CORE_CFFI_ONLY=libuv make test-py36
......
......@@ -94,7 +94,7 @@ for var in "$@"; do
install 3.5.4 python3.5.4
;;
3.6)
install 3.6.2 python3.6.2
install 3.6.3 python3.6.3
;;
pypy)
install pypy2.7-5.9.0 pypy590
......
......@@ -464,6 +464,11 @@ class socket(object):
raise
def setblocking(self, flag):
# Beginning in 3.6.0b3 this is supposed to raise
# if the file descriptor is closed, but the test for it
# involves closing the fileno directly. Since we
# don't touch the fileno here, it doesn't make sense for
# us.
if flag:
self.timeout = None
else:
......
......@@ -1236,7 +1236,11 @@ class Popen(object):
closed.add(fd)
if cwd is not None:
os.chdir(cwd)
try:
os.chdir(cwd)
except OSError as e:
e._failed_chdir = True
raise
if preexec_fn:
preexec_fn()
......@@ -1340,6 +1344,10 @@ class Popen(object):
for fd in (p2cwrite, c2pread, errread):
if fd is not None:
os.close(fd)
if isinstance(child_exception, OSError):
child_exception.filename = executable
if hasattr(child_exception, '_failed_chdir'):
child_exception.filename = cwd
raise child_exception
def _handle_exitstatus(self, sts):
......
......@@ -188,11 +188,13 @@ if sys.version_info[:2] >= (3, 4):
__implements__.append('Thread')
# The main thread is patched up with more care in monkey.py
#t = __threading__.current_thread()
#if isinstance(t, __threading__.Thread):
# t.__class__ = Thread
# t._greenlet = getcurrent()
class Timer(Thread, __threading__.Timer): # pylint:disable=abstract-method,inherit-non-class
pass
__implements__.append('Timer')
# The main thread is patched up with more care
# in _gevent_will_monkey_patch
if sys.version_info[:2] >= (3, 3):
__implements__.remove('_get_ident')
......
......@@ -278,8 +278,14 @@ class ThreadableTest:
def clientRun(self, test_func):
self.server_ready.wait()
self.clientSetUp()
self.client_ready.set()
try:
self.clientSetUp()
except BaseException as e:
self.queue.put(e)
self.clientTearDown()
return
finally:
self.client_ready.set()
if self.server_crashed:
self.clientTearDown()
return
......@@ -520,8 +526,11 @@ class ConnectedStreamTestMixin(SocketListeningTestMixin,
self.serv_conn = self.cli
def clientTearDown(self):
self.serv_conn.close()
self.serv_conn = None
try:
self.serv_conn.close()
self.serv_conn = None
except AttributeError:
pass
super().clientTearDown()
......@@ -540,7 +549,7 @@ class UnixSocketTestBase(SocketTestBase):
def bindSock(self, sock):
path = tempfile.mktemp(dir=self.dir_path)
sock.bind(path)
support.bind_unix_socket(sock, path)
self.addCleanup(support.unlink, path)
class UnixStreamBase(UnixSocketTestBase):
......@@ -794,11 +803,6 @@ class GeneralModuleTests(unittest.TestCase):
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def test_host_resolution(self):
for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
'1:1:1:1:1:1:1:1:1']:
self.assertRaises(OSError, socket.gethostbyname, addr)
self.assertRaises(OSError, socket.gethostbyaddr, addr)
for addr in [support.HOST, '10.0.0.1', '255.255.255.255']:
self.assertEqual(socket.gethostbyname(addr), addr)
......@@ -807,6 +811,21 @@ class GeneralModuleTests(unittest.TestCase):
for host in [support.HOST]:
self.assertIn(host, socket.gethostbyaddr(host)[2])
def test_host_resolution_bad_address(self):
# These are all malformed IP addresses and expected not to resolve to
# any result. But some ISPs, e.g. AWS, may successfully resolve these
# IPs.
explanation = (
"resolving an invalid IP address did not raise OSError; "
"can be caused by a broken DNS server"
)
for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
'1:1:1:1:1:1:1:1:1']:
with self.assertRaises(OSError):
socket.gethostbyname(addr)
with self.assertRaises(OSError, msg=explanation):
socket.gethostbyaddr(addr)
@unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
@unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
def test_sethostname(self):
......@@ -887,6 +906,7 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
@support.cpython_only
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
......@@ -1480,7 +1500,8 @@ class GeneralModuleTests(unittest.TestCase):
# type and populates the socket object.
#
# On Windows this trick won't work, so the test is skipped.
fd, _ = tempfile.mkstemp()
fd, path = tempfile.mkstemp()
self.addCleanup(os.unlink, path)
with socket.socket(family=42424, type=13331, fileno=fd) as s:
self.assertEqual(s.family, 42424)
self.assertEqual(s.type, 13331)
......@@ -4552,6 +4573,19 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, OSError))
self.assertTrue(issubclass(socket.timeout, OSError))
def test_setblocking_invalidfd(self):
# Regression test for issue #28471
sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
sock0.close()
self.addCleanup(sock.detach)
with self.assertRaises(OSError):
sock.setblocking(False)
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
......@@ -4617,7 +4651,7 @@ class TestUnixDomain(unittest.TestCase):
def bind(self, sock, path):
# Bind the socket
try:
sock.bind(path)
support.bind_unix_socket(sock, path)
except OSError as e:
if str(e) == "AF_UNIX path too long":
self.skipTest(
......@@ -4626,6 +4660,10 @@ class TestUnixDomain(unittest.TestCase):
else:
raise
def testUnbound(self):
# Issue #30205
self.assertIn(self.sock.getsockname(), ('', None))
def testStrAddr(self):
# Test binding to and retrieving a normal string pathname.
path = os.path.abspath(support.TESTFN)
......@@ -4755,9 +4793,13 @@ def isTipcAvailable():
"""
if not hasattr(socket, "AF_TIPC"):
return False
if not os.path.isfile("/proc/modules"):
try:
f = open("/proc/modules")
except (FileNotFoundError, IsADirectoryError, PermissionError):
# It's ok if the file does not exist, is a directory or if we
# have not the permission to read it.
return False
with open("/proc/modules") as f:
with f:
for line in f:
if line.startswith("tipc "):
return True
......@@ -5442,7 +5484,7 @@ class LinuxKernelCryptoAPI(unittest.TestCase):
self.assertEqual(len(dec), msglen * multiplier)
self.assertEqual(dec, msg * multiplier)
@support.requires_linux_version(4, 3) # see test_aes_cbc
@support.requires_linux_version(4, 9) # see issue29324
def test_aead_aes_gcm(self):
key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
......@@ -5465,8 +5507,7 @@ class LinuxKernelCryptoAPI(unittest.TestCase):
op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
assoclen=assoclen, flags=socket.MSG_MORE)
op.sendall(assoc, socket.MSG_MORE)
op.sendall(plain, socket.MSG_MORE)
op.sendall(b'\x00' * taglen)
op.sendall(plain)
res = op.recv(assoclen + len(plain) + taglen)
self.assertEqual(expected_ct, res[assoclen:-taglen])
self.assertEqual(expected_tag, res[-taglen:])
......@@ -5474,7 +5515,7 @@ class LinuxKernelCryptoAPI(unittest.TestCase):
# now with msg
op, _ = algo.accept()
with op:
msg = assoc + plain + b'\x00' * taglen
msg = assoc + plain
op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
assoclen=assoclen)
res = op.recv(assoclen + len(plain) + taglen)
......@@ -5485,7 +5526,7 @@ class LinuxKernelCryptoAPI(unittest.TestCase):
pack_uint32 = struct.Struct('I').pack
op, _ = algo.accept()
with op:
msg = assoc + plain + b'\x00' * taglen
msg = assoc + plain
op.sendmsg(
[msg],
([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
......@@ -5493,7 +5534,7 @@ class LinuxKernelCryptoAPI(unittest.TestCase):
[socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
)
)
res = op.recv(len(msg))
res = op.recv(len(msg) + taglen)
self.assertEqual(expected_ct, res[assoclen:-taglen])
self.assertEqual(expected_tag, res[-taglen:])
......@@ -5503,8 +5544,8 @@ class LinuxKernelCryptoAPI(unittest.TestCase):
msg = assoc + expected_ct + expected_tag
op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
assoclen=assoclen)
res = op.recv(len(msg))
self.assertEqual(plain, res[assoclen:-taglen])
res = op.recv(len(msg) - taglen)
self.assertEqual(plain, res[assoclen:])
@support.requires_linux_version(4, 3) # see test_aes_cbc
def test_drbg_pr_sha256(self):
......
......@@ -18,6 +18,10 @@ import asyncore
import weakref
import platform
import functools
try:
import ctypes
except ImportError:
ctypes = None
ssl = support.import_module("ssl")
......@@ -172,6 +176,13 @@ class BasicSocketTests(unittest.TestCase):
ssl.OP_NO_COMPRESSION
self.assertIn(ssl.HAS_SNI, {True, False})
self.assertIn(ssl.HAS_ECDH, {True, False})
ssl.OP_NO_SSLv2
ssl.OP_NO_SSLv3
ssl.OP_NO_TLSv1
ssl.OP_NO_TLSv1_3
if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
ssl.OP_NO_TLSv1_1
ssl.OP_NO_TLSv1_2
def test_str_for_enums(self):
# Make sure that the PROTOCOL_* constants have enum-like string
......@@ -1736,6 +1747,7 @@ class SimpleBackgroundTests(unittest.TestCase):
sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
self.assertIs(sslobj._sslobj.owner, sslobj)
self.assertIsNone(sslobj.cipher())
self.assertIsNone(sslobj.version())
self.assertIsNotNone(sslobj.shared_ciphers())
self.assertRaises(ValueError, sslobj.getpeercert)
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
......@@ -1743,6 +1755,7 @@ class SimpleBackgroundTests(unittest.TestCase):
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
self.assertTrue(sslobj.cipher())
self.assertIsNotNone(sslobj.shared_ciphers())
self.assertIsNotNone(sslobj.version())
self.assertTrue(sslobj.getpeercert())
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertTrue(sslobj.get_channel_binding('tls-unique'))
......@@ -1793,34 +1806,6 @@ class NetworkedTests(unittest.TestCase):
_test_get_server_certificate(self, 'ipv6.google.com', 443)
_test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
def test_algorithms(self):
# Issue #8484: all algorithms should be available when verifying a
# certificate.
# SHA256 was added in OpenSSL 0.9.8
if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
# sha256.tbs-internet.com needs SNI to use the correct certificate
if not ssl.HAS_SNI:
self.skipTest("SNI needed for this test")
# https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
remote = ("sha256.tbs-internet.com", 443)
sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
with support.transient_internet("sha256.tbs-internet.com"):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(sha256_cert)
s = ctx.wrap_socket(socket.socket(socket.AF_INET),
server_hostname="sha256.tbs-internet.com")
try:
s.connect(remote)
if support.verbose:
sys.stdout.write("\nCipher with %r is %r\n" %
(remote, s.cipher()))
sys.stdout.write("Certificate is:\n%s\n" %
pprint.pformat(s.getpeercert()))
finally:
s.close()
def _test_get_server_certificate(test, host, port, cert=None):
pem = ssl.get_server_certificate((host, port))
......@@ -1871,15 +1856,22 @@ if _have_threads:
self.sock, server_side=True)
self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
except (ssl.SSLError, ConnectionResetError) as e:
except (ssl.SSLError, ConnectionResetError, OSError) as e:
# We treat ConnectionResetError as though it were an
# SSLError - OpenSSL on Ubuntu abruptly closes the
# connection when asked to use an unsupported protocol.
#
# OSError may occur with wrong protocols, e.g. both
# sides use PROTOCOL_TLS_SERVER.
#
# XXX Various errors can have happened here, for example
# a mismatching protocol version, an invalid certificate,
# or a low-level bug. This should be made more discriminating.
self.server.conn_errors.append(e)
#
# bpo-31323: Store the exception as string to prevent
# a reference leak: server -> conn_errors -> exception
# -> traceback -> self (ConnectionHandler) -> server
self.server.conn_errors.append(str(e))
if self.server.chatty:
handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
self.running = False
......@@ -2065,7 +2057,7 @@ if _have_threads:
class EchoServer (asyncore.dispatcher):
class ConnectionHandler (asyncore.dispatcher_with_send):
class ConnectionHandler(asyncore.dispatcher_with_send):
def __init__(self, conn, certfile):
self.socket = test_wrap_socket(conn, server_side=True,
......@@ -2156,6 +2148,8 @@ if _have_threads:
self.join()
if support.verbose:
sys.stdout.write(" cleanup: successfully joined.\n")
# make sure that ConnectionHandler is removed from socket_map
asyncore.close_all(ignore_all=True)
def start (self, flag=None):
self.flag = flag
......@@ -2903,7 +2897,13 @@ if _have_threads:
s.send(data)
buffer = bytearray(len(data))
self.assertEqual(s.read(-1, buffer), len(data))
self.assertEqual(buffer, data)
self.assertEqual(buffer, data) # sendall accepts bytes-like objects
if ctypes is not None:
ubyte = ctypes.c_ubyte * len(data)
byteslike = ubyte.from_buffer_copy(data)
s.sendall(byteslike)
self.assertEqual(s.read(), data)
# Make sure sendmsg et al are disallowed to avoid
# inadvertent disclosure of data and/or corruption
......@@ -3085,7 +3085,7 @@ if _have_threads:
with context.wrap_socket(socket.socket()) as s:
with self.assertRaises(OSError):
s.connect((HOST, server.port))
self.assertIn("no shared cipher", str(server.conn_errors[0]))
self.assertIn("no shared cipher", server.conn_errors[0])
def test_version_basic(self):
"""
......@@ -3102,12 +3102,33 @@ if _have_threads:
self.assertEqual(s.version(), 'TLSv1')
self.assertIs(s.version(), None)
@unittest.skipUnless(ssl.HAS_TLSv1_3,
"test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.load_cert_chain(CERTFILE)
# disable all but TLS 1.3
context.options |= (
ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
)
with ThreadedEchoServer(context=context) as server:
with context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port))
self.assertIn(s.cipher()[0], [
'TLS13-AES-256-GCM-SHA384',
'TLS13-CHACHA20-POLY1305-SHA256',
'TLS13-AES-128-GCM-SHA256',
])
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
# Issue #21015: elliptic curve-based Diffie Hellman key exchange
# should be enabled by default on SSL contexts.
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.load_cert_chain(CERTFILE)
# TLSv1.3 defaults to PFS key agreement and no longer has KEA in
# cipher name.
context.options |= ssl.OP_NO_TLSv1_3
# Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
# explicitly using the 'ECCdraft' cipher alias. Otherwise,
# our default cipher list should prefer ECDH-based ciphers
......@@ -3256,8 +3277,9 @@ if _have_threads:
except ssl.SSLError as e:
stats = e
if expected is None and IS_OPENSSL_1_1:
# OpenSSL 1.1.0 raises handshake error
if (expected is None and IS_OPENSSL_1_1
and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
# OpenSSL 1.1.0 to 1.1.0e raises handshake error
self.assertIsInstance(stats, ssl.SSLError)
else:
msg = "failed trying %s (s) and %s (c).\n" \
......@@ -3475,7 +3497,7 @@ if _have_threads:
client_context.verify_mode = ssl.CERT_REQUIRED
client_context.load_verify_locations(SIGNING_CA)
# first conncetion without session
# first connection without session
stats = server_params_test(client_context, server_context)
session = stats['session']
self.assertTrue(session.id)
......@@ -3535,6 +3557,10 @@ if _have_threads:
context2.load_verify_locations(CERTFILE)
context2.load_cert_chain(CERTFILE)
# TODO: session reuse does not work with TLS 1.3
context.options |= ssl.OP_NO_TLSv1_3
context2.options |= ssl.OP_NO_TLSv1_3
server = ThreadedEchoServer(context=context, chatty=False)
with server:
with context.wrap_socket(socket.socket()) as s:
......
......@@ -3,6 +3,7 @@ from unittest import mock
from test import support
import subprocess
import sys
import platform
import signal
import io
import os
......@@ -16,11 +17,23 @@ import shutil
import gc
import textwrap
try:
import ctypes
except ImportError:
ctypes = None
else:
import ctypes.util
try:
import threading
except ImportError:
threading = None
try:
import _testcapi
except ImportError:
_testcapi = None
if support.PGO:
raise unittest.SkipTest("test is not helpful for PGO")
......@@ -36,6 +49,8 @@ if mswindows:
else:
SETBINARY = ''
NONEXISTING_CMD = ('nonexisting_i_hope',)
class BaseTestCase(unittest.TestCase):
def setUp(self):
......@@ -48,6 +63,8 @@ class BaseTestCase(unittest.TestCase):
inst.wait()
subprocess._cleanup()
self.assertFalse(subprocess._active, "subprocess._active not empty")
self.doCleanups()
support.reap_children()
def assertStderrEqual(self, stderr, expected, msg=None):
# In a debug build, stuff like "[6580 refs]" is printed to stderr at
......@@ -293,7 +310,8 @@ class ProcessTestCase(BaseTestCase):
# Verify first that the call succeeds without the executable arg.
pre_args = [sys.executable, "-c"]
self._assert_python(pre_args)
self.assertRaises(FileNotFoundError, self._assert_python, pre_args,
self.assertRaises((FileNotFoundError, PermissionError),
self._assert_python, pre_args,
executable="doesnotexist")
@unittest.skipIf(mswindows, "executable argument replaces shell")
......@@ -340,6 +358,16 @@ class ProcessTestCase(BaseTestCase):
temp_dir = self._normalize_cwd(temp_dir)
self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
def test_cwd_with_pathlike(self):
temp_dir = tempfile.gettempdir()
temp_dir = self._normalize_cwd(temp_dir)
class _PathLikeObj:
def __fspath__(self):
return temp_dir
self._assert_cwd(temp_dir, sys.executable, cwd=_PathLikeObj())
@unittest.skipIf(mswindows, "pending resolution of issue #15533")
def test_cwd_with_relative_arg(self):
# Check that Popen looks for args[0] relative to cwd if args[0]
......@@ -627,6 +655,46 @@ class ProcessTestCase(BaseTestCase):
# environment
b"['__CF_USER_TEXT_ENCODING']"))
def test_invalid_cmd(self):
# null character in the command name
cmd = sys.executable + '\0'
with self.assertRaises(ValueError):
subprocess.Popen([cmd, "-c", "pass"])
# null character in the command argument
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass#\0"])
def test_invalid_env(self):
# null character in the enviroment variable name
newenv = os.environ.copy()
newenv["FRUIT\0VEGETABLE"] = "cabbage"
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
# null character in the enviroment variable value
newenv = os.environ.copy()
newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
# equal character in the enviroment variable name
newenv = os.environ.copy()
newenv["FRUIT=ORANGE"] = "lemon"
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
# equal character in the enviroment variable value
newenv = os.environ.copy()
newenv["FRUIT"] = "orange=lemon"
with subprocess.Popen([sys.executable, "-c",
'import sys, os;'
'sys.stdout.write(os.getenv("FRUIT"))'],
stdout=subprocess.PIPE,
env=newenv) as p:
stdout, stderr = p.communicate()
self.assertEqual(stdout, b"orange=lemon")
def test_communicate_stdin(self):
p = subprocess.Popen([sys.executable, "-c",
'import sys;'
......@@ -1014,6 +1082,19 @@ class ProcessTestCase(BaseTestCase):
# time to start.
self.assertEqual(p.wait(timeout=3), 0)
def test_wait_endtime(self):
"""Confirm that the deprecated endtime parameter warns."""
p = subprocess.Popen([sys.executable, "-c", "pass"])
try:
with self.assertWarns(DeprecationWarning) as warn_cm:
p.wait(endtime=time.time()+0.01)
except subprocess.TimeoutExpired:
pass # We're not testing endtime timeout behavior.
finally:
p.kill()
self.assertIn('test_subprocess.py', warn_cm.filename)
self.assertIn('endtime', str(warn_cm.warning))
def test_invalid_bufsize(self):
# an invalid type of the bufsize argument should raise
# TypeError.
......@@ -1041,10 +1122,11 @@ class ProcessTestCase(BaseTestCase):
p.stdin.write(line) # expect that it flushes the line in text mode
os.close(p.stdin.fileno()) # close it without flushing the buffer
read_line = p.stdout.readline()
try:
p.stdin.close()
except OSError:
pass
with support.SuppressCrashReport():
try:
p.stdin.close()
except OSError:
pass
p.stdin = None
self.assertEqual(p.returncode, 0)
self.assertEqual(read_line, expected)
......@@ -1069,13 +1151,54 @@ class ProcessTestCase(BaseTestCase):
# 1024 times (each call leaked two fds).
for i in range(1024):
with self.assertRaises(OSError) as c:
subprocess.Popen(['nonexisting_i_hope'],
subprocess.Popen(NONEXISTING_CMD,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# ignore errors that indicate the command was not found
if c.exception.errno not in (errno.ENOENT, errno.EACCES):
raise c.exception
def test_nonexisting_with_pipes(self):
# bpo-30121: Popen with pipes must close properly pipes on error.
# Previously, os.close() was called with a Windows handle which is not
# a valid file descriptor.
#
# Run the test in a subprocess to control how the CRT reports errors
# and to get stderr content.
try:
import msvcrt
msvcrt.CrtSetReportMode
except (AttributeError, ImportError):
self.skipTest("need msvcrt.CrtSetReportMode")
code = textwrap.dedent(f"""
import msvcrt
import subprocess
cmd = {NONEXISTING_CMD!r}
for report_type in [msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT]:
msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
try:
subprocess.Popen([cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError:
pass
""")
cmd = [sys.executable, "-c", code]
proc = subprocess.Popen(cmd,
stderr=subprocess.PIPE,
universal_newlines=True)
with proc:
stderr = proc.communicate()[1]
self.assertEqual(stderr, "")
self.assertEqual(proc.returncode, 0)
@unittest.skipIf(threading is None, "threading required")
def test_double_close_on_error(self):
# Issue #18851
......@@ -1088,7 +1211,7 @@ class ProcessTestCase(BaseTestCase):
t.start()
try:
with self.assertRaises(EnvironmentError):
subprocess.Popen(['nonexisting_i_hope'],
subprocess.Popen(NONEXISTING_CMD,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
......@@ -1252,6 +1375,18 @@ class ProcessTestCase(BaseTestCase):
fds_after_exception = os.listdir(fd_directory)
self.assertEqual(fds_before_popen, fds_after_exception)
@unittest.skipIf(mswindows, "behavior currently not supported on Windows")
def test_file_not_found_includes_filename(self):
with self.assertRaises(FileNotFoundError) as c:
subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
@unittest.skipIf(mswindows, "behavior currently not supported on Windows")
def test_file_not_found_with_bad_cwd(self):
with self.assertRaises(FileNotFoundError) as c:
subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory')
self.assertEqual(c.exception.filename, '/some/nonexistent/directory')
class RunFuncTestCase(BaseTestCase):
def run_python(self, code, **kwargs):
......@@ -1410,6 +1545,53 @@ class POSIXProcessTestCase(BaseTestCase):
else:
self.fail("Expected OSError: %s" % desired_exception)
# We mock the __del__ method for Popen in the next two tests
# because it does cleanup based on the pid returned by fork_exec
# along with issuing a resource warning if it still exists. Since
# we don't actually spawn a process in these tests we can forego
# the destructor. An alternative would be to set _child_created to
# False before the destructor is called but there is no easy way
# to do that
class PopenNoDestructor(subprocess.Popen):
def __del__(self):
pass
@mock.patch("subprocess._posixsubprocess.fork_exec")
def test_exception_errpipe_normal(self, fork_exec):
"""Test error passing done through errpipe_write in the good case"""
def proper_error(*args):
errpipe_write = args[13]
# Write the hex for the error code EISDIR: 'is a directory'
err_code = '{:x}'.format(errno.EISDIR).encode()
os.write(errpipe_write, b"OSError:" + err_code + b":")
return 0
fork_exec.side_effect = proper_error
with self.assertRaises(IsADirectoryError):
self.PopenNoDestructor(["non_existent_command"])
@mock.patch("subprocess._posixsubprocess.fork_exec")
def test_exception_errpipe_bad_data(self, fork_exec):
"""Test error passing done through errpipe_write where its not
in the expected format"""
error_data = b"\xFF\x00\xDE\xAD"
def bad_error(*args):
errpipe_write = args[13]
# Anything can be in the pipe, no assumptions should
# be made about its encoding, so we'll write some
# arbitrary hex bytes to test it out
os.write(errpipe_write, error_data)
return 0
fork_exec.side_effect = bad_error
with self.assertRaises(subprocess.SubprocessError) as e:
self.PopenNoDestructor(["non_existent_command"])
self.assertIn(repr(error_data), str(e.exception))
def test_restore_signals(self):
# Code coverage for both values of restore_signals to make sure it
# at least does not blow up.
......@@ -2354,7 +2536,7 @@ class POSIXProcessTestCase(BaseTestCase):
# should trigger the wait() of p
time.sleep(0.2)
with self.assertRaises(OSError) as c:
with subprocess.Popen(['nonexisting_i_hope'],
with subprocess.Popen(NONEXISTING_CMD,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
pass
......@@ -2401,7 +2583,7 @@ class POSIXProcessTestCase(BaseTestCase):
with self.assertRaises(TypeError):
_posixsubprocess.fork_exec(
args, exe_list,
True, [], cwd, env_list,
True, (), cwd, env_list,
-1, -1, -1, -1,
1, 2, 3, 4,
True, True, func)
......@@ -2413,6 +2595,16 @@ class POSIXProcessTestCase(BaseTestCase):
def test_fork_exec_sorted_fd_sanity_check(self):
# Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
import _posixsubprocess
class BadInt:
first = True
def __init__(self, value):
self.value = value
def __int__(self):
if self.first:
self.first = False
return self.value
raise ValueError
gc_enabled = gc.isenabled()
try:
gc.enable()
......@@ -2423,6 +2615,7 @@ class POSIXProcessTestCase(BaseTestCase):
(18, 23, 42, 2**63), # Out of range.
(5, 4), # Not sorted.
(6, 7, 7, 8), # Duplicate.
(BadInt(1), BadInt(2)),
):
with self.assertRaises(
ValueError,
......@@ -2484,6 +2677,25 @@ class POSIXProcessTestCase(BaseTestCase):
proc.communicate(timeout=999)
mock_proc_stdin.close.assert_called_once_with()
@unittest.skipUnless(_testcapi is not None
and hasattr(_testcapi, 'W_STOPCODE'),
'need _testcapi.W_STOPCODE')
def test_stopped(self):
"""Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
args = [sys.executable, '-c', 'pass']
proc = subprocess.Popen(args)
# Wait until the real process completes to avoid zombie process
pid = proc.pid
pid, status = os.waitpid(pid, 0)
self.assertEqual(status, 0)
status = _testcapi.W_STOPCODE(3)
with mock.patch('subprocess.os.waitpid', return_value=(pid, status)):
returncode = proc.wait()
self.assertEqual(returncode, -3)
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
......@@ -2523,6 +2735,15 @@ class Win32ProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE,
close_fds=True)
@support.cpython_only
def test_issue31471(self):
# There shouldn't be an assertion failure in Popen() in case the env
# argument has a bad keys() method.
class BadEnv(dict):
keys = None
with self.assertRaises(TypeError):
subprocess.Popen([sys.executable, "-c", "pass"], env=BadEnv())
def test_close_fds(self):
# close file descriptors
rc = subprocess.call([sys.executable, "-c",
......@@ -2753,8 +2974,8 @@ class ContextManagerTests(BaseTestCase):
self.assertEqual(proc.returncode, 1)
def test_invalid_args(self):
with self.assertRaises(FileNotFoundError) as c:
with subprocess.Popen(['nonexisting_i_hope'],
with self.assertRaises((FileNotFoundError, PermissionError)) as c:
with subprocess.Popen(NONEXISTING_CMD,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
pass
......@@ -2776,19 +2997,5 @@ class ContextManagerTests(BaseTestCase):
self.assertTrue(proc.stdin.closed)
def test_main():
unit_tests = (ProcessTestCase,
POSIXProcessTestCase,
Win32ProcessTestCase,
MiscTests,
ProcessTestCaseNoPoll,
CommandsWithSpaces,
ContextManagerTests,
RunFuncTestCase,
)
support.run_unittest(*unit_tests)
support.reap_children()
if __name__ == "__main__":
unittest.main()
......@@ -170,6 +170,9 @@ class ThreadTests(BaseTestCase):
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
#Issue 29376
self.assertTrue(threading._active[tid].is_alive())
self.assertRegex(repr(threading._active[tid]), '_DummyThread')
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
......@@ -462,18 +465,20 @@ class ThreadTests(BaseTestCase):
self.addCleanup(sys.setswitchinterval, old_interval)
# Make the bug more likely to manifest.
sys.setswitchinterval(1e-6)
test.support.setswitchinterval(1e-6)
for i in range(20):
t = threading.Thread(target=lambda: None)
t.start()
self.addCleanup(t.join)
pid = os.fork()
if pid == 0:
os._exit(1 if t.is_alive() else 0)
os._exit(11 if t.is_alive() else 10)
else:
t.join()
pid, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(10, os.WEXITSTATUS(status))
def test_main_thread(self):
main = threading.main_thread()
......@@ -570,6 +575,7 @@ class ThreadTests(BaseTestCase):
self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
self.assertIsNone(t._tstate_lock)
t.join()
def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
......@@ -596,6 +602,7 @@ class ThreadTests(BaseTestCase):
break
time.sleep(0.01)
self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
t.join()
def test_BoundedSemaphore_limit(self):
# BoundedSemaphore should raise ValueError if released too often.
......@@ -910,6 +917,7 @@ class ThreadingExceptionTests(BaseTestCase):
thread = threading.Thread()
thread.start()
self.assertRaises(RuntimeError, thread.start)
thread.join()
def test_joining_current_thread(self):
current_thread = threading.current_thread()
......@@ -923,6 +931,7 @@ class ThreadingExceptionTests(BaseTestCase):
thread = threading.Thread()
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
thread.join()
def test_releasing_unacquired_lock(self):
lock = threading.Lock()
......@@ -1061,6 +1070,8 @@ class ThreadingExceptionTests(BaseTestCase):
thread.join()
self.assertIsNotNone(thread.exc)
self.assertIsInstance(thread.exc, RuntimeError)
# explicitly break the reference cycle to not leak a dangling thread
thread.exc = None
class TimerTests(BaseTestCase):
......@@ -1083,6 +1094,8 @@ class TimerTests(BaseTestCase):
self.callback_event.wait()
self.assertEqual(len(self.callback_args), 2)
self.assertEqual(self.callback_args, [((), {}), ((), {})])
timer1.join()
timer2.join()
def _callback_spy(self, *args, **kwargs):
self.callback_args.append((args[:], kwargs.copy()))
......@@ -1091,7 +1104,6 @@ class TimerTests(BaseTestCase):
class LockTests(lock_tests.LockTests):
locktype = staticmethod(threading.Lock)
@unittest.skip("not on gevent")
def test_locked_repr(self):
pass
......@@ -1112,6 +1124,7 @@ class EventTests(lock_tests.EventTests):
@unittest.skip("not on gevent")
def test_reset_internal_locks(self):
# XXX: gevent: this appears to have gone away by 3.6.3
pass
class ConditionAsRLockTests(lock_tests.RLockTests):
......
......@@ -484,12 +484,20 @@ if sys.version_info[0] == 3:
'test_subprocess.ProcessTestCase.test_io_buffered_by_default',
'test_subprocess.ProcessTestCase.test_io_unbuffered_works',
# 3.3 exposed the `endtime` argument to wait accidentally.
# It is documented as deprecated and not to be used since 3.4
# This test in 3.6.3 wants to use it though, and we don't have it.
'test_subprocess.ProcessTestCase.test_wait_endtime',
# These all want to inspect the string value of an exception raised
# by the exec() call in the child. The _posixsubprocess module arranges
# for better exception handling and printing than we do.
'test_subprocess.POSIXProcessTestCase.test_exception_bad_args_0',
'test_subprocess.POSIXProcessTestCase.test_exception_bad_executable',
'test_subprocess.POSIXProcessTestCase.test_exception_cwd',
# Relies on a 'fork_exec' attribute that we don't provide
'test_subprocess.POSIXProcessTestCase.test_exception_errpipe_bad_data',
'test_subprocess.POSIXProcessTestCase.test_exception_errpipe_normal',
# Python 3 fixed a bug if the stdio file descriptors were closed;
# we still have that bug
......@@ -721,6 +729,18 @@ if sys.version_info[:2] >= (3, 5):
# 'lock_tests.LockTests.lest_locked_repr',
# 'lock_tests.LockTests.lest_repr',
# Added between 3.6.0 and 3.6.3, uses _testcapi and internals
# of the subprocess module.
'test_subprocess.POSIXProcessTestCase.test_stopped',
# This test opens a socket, creates a new socket with the same fileno,
# closes the original socket (and hence fileno) and then
# expects that the calling setblocking() on the duplicate socket
# will raise an error. Our implementation doesn't work that way because
# setblocking() doesn't actually touch the file descriptor.
# That's probably OK because this was a GIL state error in CPython
# see https://github.com/python/cpython/commit/fa22b29960b4e683f4e5d7e308f674df2620473c
'test_socket.TestExceptions.test_setblocking_invalidfd',
]
if os.environ.get('GEVENT_RESOLVER') == 'ares':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment