Commit 46ac1b2b authored by Jason Madden's avatar Jason Madden

Sync the new python 2.7 test_ssl code for pypy.

parent 27dfd220
......@@ -3,6 +3,7 @@
import sys
import unittest
from test import test_support
support = test_support
import asyncore
import socket
import select
......@@ -16,16 +17,21 @@ import traceback
import weakref
import functools
import platform
from contextlib import closing
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
ssl = test_support.import_module("ssl")
try:
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
except AttributeError:
PROTOCOLS = ()
HOST = test_support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
NULLBYTECERT = None
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
......@@ -55,6 +61,17 @@ class BasicTests(unittest.TestCase):
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
if hasattr(ssl, 'PROTOCOL_SSLv2'):
if hasattr(ssl, 'SSLContext'): # >= 2.7.9
def f(*args, **kwargs):
try:
ssl.SSLContext(ssl.PROTOCOL_SSLv2)
except ssl.SSLError:
if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
return func(*args, **kwargs)
return f
# We need to access the lower-level wrapper in order to create an
# implicit SSL context without trying to connect or listen.
try:
......@@ -62,6 +79,7 @@ def skip_if_broken_ubuntu_ssl(func):
except ImportError:
# The returned function won't get executed, just ignore the error
pass
@functools.wraps(func)
def f(*args, **kwargs):
try:
......@@ -96,15 +114,22 @@ class BasicSocketTests(unittest.TestCase):
sys.stdout.write("\n RAND_status is %d (%s)\n"
% (v, (v and "sufficient randomness") or
"insufficient randomness"))
self.assertRaises(TypeError, ssl.RAND_egd, 1)
self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
try:
ssl.RAND_egd(1)
except TypeError:
pass
else:
print "didn't raise TypeError"
ssl.RAND_add("this is a random string", 75.0)
def test_parse_cert(self):
# note that this uses an 'unofficial' function in _ssl.c,
# provided solely for this test, to exercise the certificate
# parsing code
p = ssl._ssl._test_decode_cert(CERTFILE, False)
try:
p = ssl._ssl._test_decode_cert(CERTFILE, False)
except TypeError: # >= 2.7.9
p = ssl._ssl._test_decode_cert(CERTFILE)
if test_support.verbose:
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
self.assertEqual(p['subject'],
......@@ -124,35 +149,6 @@ class BasicSocketTests(unittest.TestCase):
('DNS', 'projects.forum.nokia.com'))
)
def test_parse_cert_CVE_2013_4238(self):
p = ssl._ssl._test_decode_cert(NULLBYTECERT)
if test_support.verbose:
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
subject = ((('countryName', 'US'),),
(('stateOrProvinceName', 'Oregon'),),
(('localityName', 'Beaverton'),),
(('organizationName', 'Python Software Foundation'),),
(('organizationalUnitName', 'Python Core Development'),),
(('commonName', 'null.python.org\x00example.org'),),
(('emailAddress', 'python-dev@python.org'),))
self.assertEqual(p['subject'], subject)
self.assertEqual(p['issuer'], subject)
if ssl.OPENSSL_VERSION_INFO >= (0, 9, 8):
san = (('DNS', 'altnull.python.org\x00example.com'),
('email', 'null@python.org\x00user@example.org'),
('URI', 'http://null.python.org\x00http://example.org'),
('IP Address', '192.0.2.1'),
('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
else:
# OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
san = (('DNS', 'altnull.python.org\x00example.com'),
('email', 'null@python.org\x00user@example.org'),
('URI', 'http://null.python.org\x00http://example.org'),
('IP Address', '192.0.2.1'),
('IP Address', '<invalid>'))
self.assertEqual(p['subjectAltName'], san)
def test_DER_to_PEM(self):
with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
pem = f.read()
......@@ -192,8 +188,9 @@ class BasicSocketTests(unittest.TestCase):
self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
(s, t))
@test_support.requires_resource('network')
def test_ciphers(self):
if not test_support.is_resource_enabled('network'):
return
remote = ("svn.python.org", 443)
with test_support.transient_internet(remote[0]):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
......@@ -202,11 +199,10 @@ class BasicSocketTests(unittest.TestCase):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
s.connect(remote)
# Error checking occurs when connecting, because the SSL context
# isn't created before.
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
# Error checking can happen at instantiation (>=2.7.9) or when connecting
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
s.connect(remote)
@test_support.cpython_only
......@@ -232,13 +228,6 @@ class BasicSocketTests(unittest.TestCase):
self.assertRaises(socket.error, ss.send, b'x')
self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
def test_unsupported_dtls(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
with self.assertRaises(NotImplementedError) as cx:
ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
self.assertEqual(str(cx.exception), "only stream sockets are supported")
class NetworkedTests(unittest.TestCase):
......@@ -315,34 +304,6 @@ class NetworkedTests(unittest.TestCase):
finally:
s.close()
def test_timeout_connect_ex(self):
# Issue #12065: on a timeout, connect_ex() should return the original
# errno (mimicking the behaviour of non-SSL sockets).
with test_support.transient_internet("svn.python.org"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
do_handshake_on_connect=False)
try:
s.settimeout(0.0000001)
rc = s.connect_ex(('svn.python.org', 443))
if rc == 0:
self.skipTest("svn.python.org responded too quickly")
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
finally:
s.close()
def test_connect_ex_error(self):
with test_support.transient_internet("svn.python.org"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
self.assertEqual(errno.ECONNREFUSED,
s.connect_ex(("svn.python.org", 444)))
finally:
s.close()
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
......@@ -390,24 +351,19 @@ class NetworkedTests(unittest.TestCase):
def test_get_server_certificate(self):
with test_support.transient_internet("svn.python.org"):
pem = ssl.get_server_certificate(("svn.python.org", 443),
ssl.PROTOCOL_SSLv23)
pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem:
self.fail("No server certificate on svn.python.org:443!")
try:
pem = ssl.get_server_certificate(("svn.python.org", 443),
ssl.PROTOCOL_SSLv23,
ca_certs=CERTFILE)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
except ssl.SSLError:
#should fail
pass
else:
self.fail("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443),
ssl.PROTOCOL_SSLv23,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
self.fail("No server certificate on svn.python.org:443!")
if test_support.verbose:
......@@ -419,14 +375,19 @@ class NetworkedTests(unittest.TestCase):
# SHA256 was added in OpenSSL 0.9.8
if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
self.skipTest("remote host needs SNI, only available on Python 3.2+")
# NOTE: https://sha2.hboeck.de is another possible test host
if not hasattr(ssl, 'SSLContext') or not getattr(ssl, 'HAS_SNI', True):
# The Python 2.7.8 test lib doesn't run this
self.skipTest('remote host needs SNI, only available on Python 3.2+')
# NOTE: https://sha256.tbs-internet.com is another possible test host
remote = ("sha256.tbs-internet.com", 443)
sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
with test_support.transient_internet("sha256.tbs-internet.com"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=sha256_cert,)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(sha256_cert)
s = ctx.wrap_socket(socket.socket(socket.AF_INET),
server_hostname="sha256.tbs-internet.com")
try:
s.connect(remote)
if test_support.verbose:
......@@ -474,28 +435,66 @@ else:
if test_support.verbose and self.server.chatty:
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
def wrap_conn(self):
try:
self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
certfile=self.server.certificate,
ssl_version=self.server.protocol,
ca_certs=self.server.cacerts,
cert_reqs=self.server.certreqs,
ciphers=self.server.ciphers)
except ssl.SSLError as e:
# XXX Various errors can have happened here, for example
# a mismatching protocol version, an invalid certificate,
# or a low-level bug. This should be made more discriminating.
self.server.conn_errors.append(e)
if self.server.chatty:
handle_error("\n server: bad connection attempt from " +
str(self.sock.getpeername()) + ":\n")
self.close()
self.running = False
self.server.stop()
return False
else:
return True
if hasattr(ssl, 'SSLContext'): # >= 2.7.9
def wrap_conn(self):
try:
self.sslconn = self.server.context.wrap_socket(
self.sock, server_side=True)
self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
except socket.error as e:
# We treat ConnectionResetError as though it were an
# SSLError - OpenSSL on Ubuntu abruptly closes the
# connection when asked to use an unsupported protocol.
#
# XXX Various errors can have happened here, for example
# a mismatching protocol version, an invalid certificate,
# or a low-level bug. This should be made more discriminating.
if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
raise
self.server.conn_errors.append(e)
if self.server.chatty:
handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
self.running = False
self.server.stop()
self.close()
return False
else:
if self.server.context.verify_mode == ssl.CERT_REQUIRED:
cert = self.sslconn.getpeercert()
if support.verbose and self.server.chatty:
sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
cert_binary = self.sslconn.getpeercert(True)
if support.verbose and self.server.chatty:
sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
cipher = self.sslconn.cipher()
if support.verbose and self.server.chatty:
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
sys.stdout.write(" server: selected protocol is now "
+ str(self.sslconn.selected_npn_protocol()) + "\n")
return True
else:
def wrap_conn(self):
try:
self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
certfile=self.server.certificate,
ssl_version=self.server.protocol,
ca_certs=self.server.cacerts,
cert_reqs=self.server.certreqs,
ciphers=self.server.ciphers)
except ssl.SSLError as e:
# XXX Various errors can have happened here, for example
# a mismatching protocol version, an invalid certificate,
# or a low-level bug. This should be made more discriminating.
self.server.conn_errors.append(e)
if self.server.chatty:
handle_error("\n server: bad connection attempt from " +
str(self.sock.getpeername()) + ":\n")
self.close()
self.running = False
self.server.stop()
return False
else:
return True
def read(self):
if self.sslconn:
......@@ -522,7 +521,7 @@ else:
self.sslconn = self.sock
elif not self.wrap_conn():
return
self.show_conn_details()
while self.running:
try:
msg = self.read()
......@@ -565,39 +564,72 @@ else:
# harness, we want to stop the server
self.server.stop()
def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False,
wrap_accepting_socket=False, ciphers=None):
if ssl_version is None:
ssl_version = ssl.PROTOCOL_TLSv1
if certreqs is None:
certreqs = ssl.CERT_NONE
self.certificate = certificate
self.protocol = ssl_version
self.certreqs = certreqs
self.cacerts = cacerts
self.ciphers = ciphers
self.chatty = chatty
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
self.flag = None
if wrap_accepting_socket:
self.sock = ssl.wrap_socket(self.sock, server_side=True,
certfile=self.certificate,
cert_reqs = self.certreqs,
ca_certs = self.cacerts,
ssl_version = self.protocol,
ciphers = self.ciphers)
if test_support.verbose and self.chatty:
sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
self.port = test_support.bind_port(self.sock)
self.active = False
self.conn_errors = []
threading.Thread.__init__(self)
self.daemon = True
if hasattr(ssl, 'SSLContext'): # >= 2.7.9
def __init__(self, certificate=None, ssl_version=None,
certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False,
npn_protocols=None, ciphers=None, context=None):
if context:
self.context = context
else:
self.context = ssl.SSLContext(ssl_version
if ssl_version is not None
else ssl.PROTOCOL_TLSv1)
self.context.verify_mode = (certreqs if certreqs is not None
else ssl.CERT_NONE)
if cacerts:
self.context.load_verify_locations(cacerts)
if certificate:
self.context.load_cert_chain(certificate)
if npn_protocols:
self.context.set_npn_protocols(npn_protocols)
if ciphers:
self.context.set_ciphers(ciphers)
self.chatty = chatty
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
self.port = support.bind_port(self.sock)
self.flag = None
self.active = False
self.selected_protocols = []
self.conn_errors = []
threading.Thread.__init__(self)
self.daemon = True
else:
def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False,
wrap_accepting_socket=False, ciphers=None):
if ssl_version is None:
ssl_version = ssl.PROTOCOL_TLSv1
if certreqs is None:
certreqs = ssl.CERT_NONE
self.certificate = certificate
self.protocol = ssl_version
self.certreqs = certreqs
self.cacerts = cacerts
self.ciphers = ciphers
self.chatty = chatty
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
self.flag = None
if wrap_accepting_socket:
self.sock = ssl.wrap_socket(self.sock, server_side=True,
certfile=self.certificate,
cert_reqs = self.certreqs,
ca_certs = self.cacerts,
ssl_version = self.protocol,
ciphers = self.ciphers)
if test_support.verbose and self.chatty:
sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
self.port = test_support.bind_port(self.sock)
self.active = False
self.conn_errors = []
threading.Thread.__init__(self)
self.daemon = True
def __enter__(self):
self.start(threading.Event())
......@@ -859,92 +891,197 @@ else:
else:
raise AssertionError("Use of invalid cert should have failed!")
def server_params_test(certfile, protocol, certreqs, cacertsfile,
client_certfile, client_protocol=None, indata="FOO\n",
ciphers=None, chatty=True, connectionchatty=False,
wrap_accepting_socket=False):
"""
Launch a server, connect a client to it and try various reads
and writes.
"""
server = ThreadedEchoServer(certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
if hasattr(ssl, 'SSLContext'): # >= 2.7.9
def server_params_test(client_context, server_context, indata=b"FOO\n",
chatty=True, connectionchatty=False, sni_name=None):
"""
Launch a server, connect a client to it and try various reads
and writes.
"""
stats = {}
server = ThreadedEchoServer(context=server_context,
chatty=chatty,
connectionchatty=False)
with server:
with closing(client_context.wrap_socket(socket.socket(),
server_hostname=sni_name)) as s:
s.connect((HOST, server.port))
for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
if support.verbose:
sys.stdout.write(
" client: sending %r...\n" % indata)
s.write(arg)
outdata = s.read()
if connectionchatty:
if support.verbose:
sys.stdout.write(" client: read %r\n" % outdata)
if outdata != indata.lower():
raise AssertionError(
"bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
% (outdata[:20], len(outdata),
indata[:20].lower(), len(indata)))
s.write(b"over\n")
if connectionchatty:
if support.verbose:
sys.stdout.write(" client: closing connection.\n")
stats.update({
'compression': s.compression(),
'cipher': s.cipher(),
'peercert': s.getpeercert(),
'client_npn_protocol': s.selected_npn_protocol(),
'version': s.version(),
})
s.close()
stats['server_npn_protocols'] = server.selected_protocols
return stats
def try_protocol_combo(server_protocol, client_protocol, expect_success,
certsreqs=None, server_options=0, client_options=0):
"""
Try to SSL-connect using *client_protocol* to *server_protocol*.
If *expect_success* is true, assert that the connection succeeds,
if it's false, assert that the connection fails.
Also, if *expect_success* is a string, assert that it is the protocol
version actually used by the connection.
"""
if certsreqs is None:
certsreqs = ssl.CERT_NONE
certtype = {
ssl.CERT_NONE: "CERT_NONE",
ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
ssl.CERT_REQUIRED: "CERT_REQUIRED",
}[certsreqs]
if support.verbose:
formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
sys.stdout.write(formatstr %
(ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol),
certtype))
client_context = ssl.SSLContext(client_protocol)
client_context.options |= client_options
server_context = ssl.SSLContext(server_protocol)
server_context.options |= server_options
# NOTE: we must enable "ALL" ciphers on the client, otherwise an
# SSLv23 client will send an SSLv3 hello (rather than SSLv2)
# starting from OpenSSL 1.0.0 (see issue #8322).
if client_context.protocol == ssl.PROTOCOL_SSLv23:
client_context.set_ciphers("ALL")
for ctx in (client_context, server_context):
ctx.verify_mode = certsreqs
ctx.load_cert_chain(CERTFILE)
ctx.load_verify_locations(CERTFILE)
try:
stats = server_params_test(client_context, server_context,
chatty=False, connectionchatty=False)
# Protocol mismatch can result in either an SSLError, or a
# "Connection reset by peer" error.
except ssl.SSLError:
if expect_success:
raise
except socket.error as e:
if expect_success or e.errno != errno.ECONNRESET:
raise
else:
if not expect_success:
raise AssertionError(
"Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol)))
elif (expect_success is not True
and expect_success != stats['version']):
raise AssertionError("version mismatch: expected %r, got %r"
% (expect_success, stats['version']))
else:
def server_params_test(certfile, protocol, certreqs, cacertsfile,
client_certfile, client_protocol=None, indata="FOO\n",
ciphers=None, chatty=True, connectionchatty=False,
wrap_accepting_socket=False):
"""
Launch a server, connect a client to it and try various reads
and writes.
"""
server = ThreadedEchoServer(certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
ciphers=ciphers,
chatty=chatty,
connectionchatty=connectionchatty,
wrap_accepting_socket=wrap_accepting_socket)
with server:
# try to connect
if client_protocol is None:
client_protocol = protocol
s = ssl.wrap_socket(socket.socket(),
certfile=client_certfile,
ca_certs=cacertsfile,
ciphers=ciphers,
chatty=chatty,
connectionchatty=connectionchatty,
wrap_accepting_socket=wrap_accepting_socket)
with server:
# try to connect
if client_protocol is None:
client_protocol = protocol
s = ssl.wrap_socket(socket.socket(),
certfile=client_certfile,
ca_certs=cacertsfile,
ciphers=ciphers,
cert_reqs=certreqs,
ssl_version=client_protocol)
s.connect((HOST, server.port))
for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
if test_support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(arg)))
s.write(arg)
outdata = s.read()
cert_reqs=certreqs,
ssl_version=client_protocol)
s.connect((HOST, server.port))
for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
if test_support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(arg)))
s.write(arg)
outdata = s.read()
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
if outdata != indata.lower():
raise AssertionError(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
% (outdata[:min(len(outdata),20)], len(outdata),
indata[:min(len(indata),20)].lower(), len(indata)))
s.write("over\n")
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
if outdata != indata.lower():
raise AssertionError(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
% (outdata[:min(len(outdata),20)], len(outdata),
indata[:min(len(indata),20)].lower(), len(indata)))
s.write("over\n")
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.close()
sys.stdout.write(" client: closing connection.\n")
s.close()
def try_protocol_combo(server_protocol,
client_protocol,
expect_success,
certsreqs=None):
if certsreqs is None:
certsreqs = ssl.CERT_NONE
certtype = {
ssl.CERT_NONE: "CERT_NONE",
ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
ssl.CERT_REQUIRED: "CERT_REQUIRED",
}[certsreqs]
if test_support.verbose:
formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
sys.stdout.write(formatstr %
(ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol),
certtype))
try:
# NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
# will send an SSLv3 hello (rather than SSLv2) starting from
# OpenSSL 1.0.0 (see issue #8322).
server_params_test(CERTFILE, server_protocol, certsreqs,
CERTFILE, CERTFILE, client_protocol,
ciphers="ALL", chatty=False)
# Protocol mismatch can result in either an SSLError, or a
# "Connection reset by peer" error.
except ssl.SSLError:
if expect_success:
raise
except socket.error as e:
if expect_success or e.errno != errno.ECONNRESET:
raise
else:
if not expect_success:
raise AssertionError(
"Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol)))
def try_protocol_combo(server_protocol,
client_protocol,
expect_success,
certsreqs=None):
if certsreqs is None:
certsreqs = ssl.CERT_NONE
certtype = {
ssl.CERT_NONE: "CERT_NONE",
ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
ssl.CERT_REQUIRED: "CERT_REQUIRED",
}[certsreqs]
if test_support.verbose:
formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
sys.stdout.write(formatstr %
(ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol),
certtype))
try:
# NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
# will send an SSLv3 hello (rather than SSLv2) starting from
# OpenSSL 1.0.0 (see issue #8322).
server_params_test(CERTFILE, server_protocol, certsreqs,
CERTFILE, CERTFILE, client_protocol,
ciphers="ALL", chatty=False)
# Protocol mismatch can result in either an SSLError, or a
# "Connection reset by peer" error.
except ssl.SSLError:
if expect_success:
raise
except socket.error as e:
if expect_success or e.errno != errno.ECONNRESET:
raise
else:
if not expect_success:
raise AssertionError(
"Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol)))
class ThreadedTests(unittest.TestCase):
......@@ -996,9 +1133,16 @@ else:
"""Basic test of an SSL client connecting to a server"""
if test_support.verbose:
sys.stdout.write("\n")
server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
chatty=True, connectionchatty=True)
if hasattr(ssl, 'SSLContext'): # >= 2.7.9
for protocol in PROTOCOLS:
context = ssl.SSLContext(protocol)
context.load_cert_chain(CERTFILE)
server_params_test(context, context,
chatty=True, connectionchatty=True)
else:
server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
chatty=True, connectionchatty=True)
def test_getpeercert(self):
if test_support.verbose:
......@@ -1178,7 +1322,14 @@ else:
url = 'https://127.0.0.1:%d/%s' % (
server.port, os.path.split(CERTFILE)[1])
with test_support.check_py3k_warnings():
f = urllib.urlopen(url)
import ssl
if hasattr(ssl, '_create_unverified_context'):
# Disable verification for our self-signed cert
# on Python >= 2.7.9 and 3.4
ssl_ctx = ssl._create_unverified_context()
f = urllib.urlopen(url, context=ssl_ctx)
else:
f = urllib.urlopen(url)
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
d2 = f.read(int(dlen))
......@@ -1192,6 +1343,7 @@ else:
server.stop()
server.join()
@unittest.skipIf(hasattr(ssl, 'SSLContext'), "<= 2.7.9 only")
def test_wrapped_accept(self):
"""Check the accept() method on SSL sockets."""
if test_support.verbose:
......@@ -1403,7 +1555,7 @@ else:
def test_main(verbose=False):
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT, NULLBYTECERT
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
......@@ -1411,13 +1563,10 @@ def test_main(verbose=False):
"https_svn_python_org_root.pem")
NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir,
"nokia.pem")
NULLBYTECERT = os.path.join(os.path.dirname(__file__) or os.curdir,
"nullbytecert.pem")
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or
not os.path.exists(NOKIACERT) or
not os.path.exists(NULLBYTECERT)):
not os.path.exists(NOKIACERT)):
raise test_support.TestFailed("Can't read certificate files!")
tests = [BasicTests, BasicSocketTests]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment