Commit 8fefa73f authored by Łukasz Nowak's avatar Łukasz Nowak

kedifa: Be python2 and python3 compatible

 * *: defined encoding on each file
 * app: use b'' or encode before returning with wsgiref
 * cli: switch to binary for files
 * test: decode/b'' or encode when needed
parent 40db2229
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Nexedi SA # Copyright (C) 2018 Nexedi SA
# Lukasz Nowak <luke@nexedi.com> # Lukasz Nowak <luke@nexedi.com>
# #
...@@ -247,7 +248,7 @@ class SQLite3Storage(local): ...@@ -247,7 +248,7 @@ class SQLite3Storage(local):
(reference, now, index), (reference, now, index),
) )
if result: if result:
return result['pem'].encode('ascii') return result['pem']
return None return None
def iterCertificateIndexes(self, reference): def iterCertificateIndexes(self, reference):
...@@ -365,7 +366,7 @@ class Kedifa(object): ...@@ -365,7 +366,7 @@ class Kedifa(object):
parameters = urlparse.parse_qs(qs, strict_parsing=True) parameters = urlparse.parse_qs(qs, strict_parsing=True)
except ValueError: except ValueError:
start_response('400 Bad Request', headers_text_plain) start_response('400 Bad Request', headers_text_plain)
return ('Query string %r was not correct.' % (qs, ),) return (b'Query string %r was not correct.' % (qs, ),)
if len(path_list) == 2: if len(path_list) == 2:
_, reference = path_list _, reference = path_list
...@@ -376,22 +377,22 @@ class Kedifa(object): ...@@ -376,22 +377,22 @@ class Kedifa(object):
index = None index = None
else: else:
start_response('400 Bad Request', headers_text_plain) start_response('400 Bad Request', headers_text_plain)
return ('Wrong path',) return (b'Wrong path',)
if not reference: if not reference:
start_response('400 Bad Request', headers_text_plain) start_response('400 Bad Request', headers_text_plain)
return ('Wrong path',) return (b'Wrong path',)
if environ['REQUEST_METHOD'] == 'PUT': if environ['REQUEST_METHOD'] == 'PUT':
# key auth # key auth
if 'auth' not in parameters: if 'auth' not in parameters:
start_response('400 Bad Request', headers_text_plain) start_response('400 Bad Request', headers_text_plain)
return ('Missing auth',) return (b'Missing auth',)
elif not self.pocket_db.validateUploader( elif not self.pocket_db.validateUploader(
reference, parameters['auth'][0]): reference, parameters['auth'][0]):
headers = headers_text_plain + [('WWW-Authenticate', 'transport')] headers = headers_text_plain + [('WWW-Authenticate', 'transport')]
start_response('401 Unauthorized', headers) start_response('401 Unauthorized', headers)
return ('',) return (b'',)
# play with curl --data-binary # play with curl --data-binary
if index is not None: if index is not None:
raise ValueError raise ValueError
...@@ -401,7 +402,7 @@ class Kedifa(object): ...@@ -401,7 +402,7 @@ class Kedifa(object):
certificate = self.checkKeyCertificate(request_body) certificate = self.checkKeyCertificate(request_body)
except CertificateError as e: except CertificateError as e:
start_response('422 Unprocessable Entity', headers_text_plain) start_response('422 Unprocessable Entity', headers_text_plain)
return e return (str(e).encode(),)
else: else:
try: try:
certificate_id = self.pocket_db.addCertificate( certificate_id = self.pocket_db.addCertificate(
...@@ -413,10 +414,10 @@ class Kedifa(object): ...@@ -413,10 +414,10 @@ class Kedifa(object):
) )
except ReferenceNotFound: except ReferenceNotFound:
start_response('404 Not Found', headers_text_plain) start_response('404 Not Found', headers_text_plain)
return ('Reservation required',) return (b'Reservation required',)
start_response('201 Created', headers_text_plain + [ start_response('201 Created', headers_text_plain + [
('Location', '/'.join(path_list + [str(certificate_id)]))]) ('Location', '/'.join(path_list + [str(certificate_id)]))])
return ('',) return (b'',)
elif environ['REQUEST_METHOD'] == 'POST': elif environ['REQUEST_METHOD'] == 'POST':
# SSL-auth # SSL-auth
try: try:
...@@ -424,7 +425,7 @@ class Kedifa(object): ...@@ -424,7 +425,7 @@ class Kedifa(object):
except Unauthorized: except Unauthorized:
headers = headers_text_plain + [('WWW-Authenticate', 'transport')] headers = headers_text_plain + [('WWW-Authenticate', 'transport')]
start_response('401 Unauthorized', headers) start_response('401 Unauthorized', headers)
return ('',) return (b'',)
if index is not None: if index is not None:
raise ValueError raise ValueError
if reference != 'reserve-id': if reference != 'reserve-id':
...@@ -433,7 +434,7 @@ class Kedifa(object): ...@@ -433,7 +434,7 @@ class Kedifa(object):
reserved_id = self.pocket_db.reserveId() reserved_id = self.pocket_db.reserveId()
start_response('201 Created', headers_text_plain + [ start_response('201 Created', headers_text_plain + [
('Location', '/%s' % reserved_id)]) ('Location', '/%s' % reserved_id)])
return (reserved_id,) return (reserved_id.encode(),)
elif environ['REQUEST_METHOD'] == 'GET': elif environ['REQUEST_METHOD'] == 'GET':
if index == 'list': if index == 'list':
# SSL-auth # SSL-auth
...@@ -442,23 +443,23 @@ class Kedifa(object): ...@@ -442,23 +443,23 @@ class Kedifa(object):
except Unauthorized: except Unauthorized:
headers = headers_text_plain + [('WWW-Authenticate', 'transport')] headers = headers_text_plain + [('WWW-Authenticate', 'transport')]
start_response('401 Unauthorized', headers) start_response('401 Unauthorized', headers)
return ('',) return (b'',)
key_list = [ key_list = [
str(q) for q in self.pocket_db.iterCertificateIndexes(reference)] str(q) for q in self.pocket_db.iterCertificateIndexes(reference)]
start_response('200 OK', headers_application_json) start_response('200 OK', headers_application_json)
return (json.dumps(dict(key_list=key_list), indent=2),) return (json.dumps(dict(key_list=key_list), indent=2).encode(),)
elif index == 'generateauth': elif index == 'generateauth':
try: try:
key = self.pocket_db.addUploader(reference) key = self.pocket_db.addUploader(reference)
except UserExists: except UserExists:
start_response('403 Forbidden', headers_text_plain) start_response('403 Forbidden', headers_text_plain)
return ('Already exists',) return (b'Already exists',)
except ReferenceNotFound: except ReferenceNotFound:
start_response('404 Not Found', headers_text_plain) start_response('404 Not Found', headers_text_plain)
return ('Reservation required',) return (b'Reservation required',)
else: else:
start_response('201 Created', headers_text_plain) start_response('201 Created', headers_text_plain)
return (key,) return (key.encode(),)
else: else:
# SSL-auth # SSL-auth
try: try:
...@@ -466,13 +467,15 @@ class Kedifa(object): ...@@ -466,13 +467,15 @@ class Kedifa(object):
except Unauthorized: except Unauthorized:
headers = headers_text_plain + [('WWW-Authenticate', 'transport')] headers = headers_text_plain + [('WWW-Authenticate', 'transport')]
start_response('401 Unauthorized', headers) start_response('401 Unauthorized', headers)
return ('',) return (b'',)
certificate = self.pocket_db.getCertificate(reference, index) certificate = self.pocket_db.getCertificate(reference, index)
if certificate is None: if certificate is None:
start_response('404 Not Found', headers_text_plain) start_response('404 Not Found', headers_text_plain)
return ('',) return (b'',)
else: else:
start_response('200 OK', headers_text_plain) start_response('200 OK', headers_text_plain)
if getattr(certificate, 'encode', None) is not None:
certificate = certificate.encode()
return (certificate,) return (certificate,)
else: else:
raise NotImplementedError raise NotImplementedError
......
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Nexedi SA # Copyright (C) 2018 Nexedi SA
# Lukasz Nowak <luke@nexedi.com> # Lukasz Nowak <luke@nexedi.com>
# #
...@@ -130,7 +131,7 @@ def getter(*args): ...@@ -130,7 +131,7 @@ def getter(*args):
url, response.status_code)) url, response.status_code))
sys.exit(1) sys.exit(1)
if len(response.text) > 0: if len(response.text) > 0:
with open(parsed.out, 'w') as out: with open(parsed.out, 'wb') as out:
out.write(response.text.encode('utf-8')) out.write(response.text.encode('utf-8'))
......
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Nexedi SA # Copyright (C) 2018 Nexedi SA
# Lukasz Nowak <luke@nexedi.com> # Lukasz Nowak <luke@nexedi.com>
# #
...@@ -17,7 +18,7 @@ ...@@ -17,7 +18,7 @@
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from six import StringIO, BytesIO from six import BytesIO
import contextlib import contextlib
import datetime import datetime
from six.moves import http_client as httplib from six.moves import http_client as httplib
...@@ -149,7 +150,7 @@ class KedifaCaucaseMixin(KedifaMixin): ...@@ -149,7 +150,7 @@ class KedifaCaucaseMixin(KedifaMixin):
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
) )
return key, key_pem return key, key_pem.decode()
def generateCSR(self, ip): def generateCSR(self, ip):
key_pem_file = os.path.join(self.testdir, '%s-key.pem' % (ip,)) key_pem_file = os.path.join(self.testdir, '%s-key.pem' % (ip,))
...@@ -169,7 +170,7 @@ class KedifaCaucaseMixin(KedifaMixin): ...@@ -169,7 +170,7 @@ class KedifaCaucaseMixin(KedifaMixin):
).sign(key, hashes.SHA256(), default_backend()) ).sign(key, hashes.SHA256(), default_backend())
with open(csr_pem_file, 'w') as out: with open(csr_pem_file, 'w') as out:
out.write(csr.public_bytes(serialization.Encoding.PEM)) out.write(csr.public_bytes(serialization.Encoding.PEM).decode())
return key_pem_file, csr_pem_file return key_pem_file, csr_pem_file
...@@ -199,7 +200,7 @@ class KedifaCaucaseMixin(KedifaMixin): ...@@ -199,7 +200,7 @@ class KedifaCaucaseMixin(KedifaMixin):
not_valid_after not_valid_after
).sign(key, hashes.SHA256(), default_backend()) ).sign(key, hashes.SHA256(), default_backend())
certificate_pem = certificate.public_bytes(serialization.Encoding.PEM) certificate_pem = certificate.public_bytes(serialization.Encoding.PEM)
return key, key_pem, certificate, certificate_pem return key, key_pem, certificate, certificate_pem.decode()
def createPem(self): def createPem(self):
_, key_pem, _, certificate_pem = self.generateKeyCertificateData() _, key_pem, _, certificate_pem = self.generateKeyCertificateData()
...@@ -249,9 +250,11 @@ class KedifaCaucaseMixin(KedifaMixin): ...@@ -249,9 +250,11 @@ class KedifaCaucaseMixin(KedifaMixin):
) )
self.cas = cas.split() self.cas = cas.split()
kedifa_key_pem, csr_file = self.generateCSR(unicode(common_name)) if getattr(common_name, 'decode', None) is not None:
out = StringIO() common_name = common_name.decode()
err = StringIO() kedifa_key_pem, csr_file = self.generateCSR(common_name)
out = BytesIO()
err = BytesIO()
caucase.cli.main( caucase.cli.main(
argv=self.cas + [ argv=self.cas + [
'--send-csr', csr_file '--send-csr', csr_file
...@@ -260,14 +263,14 @@ class KedifaCaucaseMixin(KedifaMixin): ...@@ -260,14 +263,14 @@ class KedifaCaucaseMixin(KedifaMixin):
stderr=err, stderr=err,
) )
self.assertEqual('', err.getvalue().strip()) self.assertEqual(b'', err.getvalue().strip())
output = out.getvalue().strip() output = out.getvalue().strip()
try: try:
csr_id = output.split()[0] csr_id = output.split()[0]
except IndexError: except IndexError:
self.fail('csr_id parse failed: %s' % (output,)) self.fail('csr_id parse failed: %s' % (output,))
caucase.cli.main(argv=self.cas + [ caucase.cli.main(argv=self.cas + [
'--get-crt', csr_id, kedifa_key_pem '--get-crt', csr_id.decode(), kedifa_key_pem
]) ])
# inject other root CA # inject other root CA
...@@ -387,7 +390,6 @@ ez+ONyvetfvjD8cxyQ== ...@@ -387,7 +390,6 @@ ez+ONyvetfvjD8cxyQ==
return requests.put(verify=self.ca_crt_pem, *args, **kwargs) return requests.put(verify=self.ca_crt_pem, *args, **kwargs)
@unittest.skipIf(sys.version_info >= (3, ), 'kedifa currently supports python 2 only')
class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
def assertAnyLogEntry(self, entry): def assertAnyLogEntry(self, entry):
with open(self.logfile) as fh: with open(self.logfile) as fh:
...@@ -435,7 +437,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -435,7 +437,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
def _updater_get(self, url, certificate, destination): def _updater_get(self, url, certificate, destination):
mapping = tempfile.NamedTemporaryFile(dir=self.testdir, delete=False) mapping = tempfile.NamedTemporaryFile(dir=self.testdir, delete=False)
mapping.write("%s %s" % (url, destination)) mapping.write(("%s %s" % (url, destination)).encode())
mapping.close() mapping.close()
state = tempfile.NamedTemporaryFile(dir=self.testdir, delete=False) state = tempfile.NamedTemporaryFile(dir=self.testdir, delete=False)
state.close() state.close()
...@@ -530,8 +532,8 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -530,8 +532,8 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
raise raise
def revokeCaucaseServiceCertifice(self): def revokeCaucaseServiceCertifice(self):
out = StringIO() out = BytesIO()
err = StringIO() err = BytesIO()
caucase.cli.main( caucase.cli.main(
argv=self.cas + [ argv=self.cas + [
'--revoke-crt', self.client_key_pem, self.client_key_pem '--revoke-crt', self.client_key_pem, self.client_key_pem
...@@ -540,8 +542,8 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -540,8 +542,8 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
stderr=err, stderr=err,
) )
self.assertEqual('', out.getvalue().strip()) self.assertEqual(b'', out.getvalue().strip())
self.assertEqual('', err.getvalue().strip()) self.assertEqual(b'', err.getvalue().strip())
def test_GET_revoked_identity(self): def test_GET_revoked_identity(self):
self.put() self.put()
...@@ -800,7 +802,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -800,7 +802,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
) )
def test_GET_invalid_yet(self): def test_GET_invalid_yet(self):
from app import SQLite3Storage from .app import SQLite3Storage
pocket_db = SQLite3Storage(self.db) pocket_db = SQLite3Storage(self.db)
_, key_pem, _, certificate_pem = self.generateKeyCertificateData() _, key_pem, _, certificate_pem = self.generateKeyCertificateData()
not_valid_before = datetime.datetime.utcnow() + datetime.timedelta(days=10) not_valid_before = datetime.datetime.utcnow() + datetime.timedelta(days=10)
...@@ -868,7 +870,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -868,7 +870,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
) )
def test_GET_expired(self): def test_GET_expired(self):
from app import SQLite3Storage from .app import SQLite3Storage
pocket_db = SQLite3Storage(self.db) pocket_db = SQLite3Storage(self.db)
_, key_pem, _, certificate_pem = self.generateKeyCertificateData() _, key_pem, _, certificate_pem = self.generateKeyCertificateData()
not_valid_before = datetime.datetime.utcnow() - datetime.timedelta(days=10) not_valid_before = datetime.datetime.utcnow() - datetime.timedelta(days=10)
...@@ -1010,7 +1012,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -1010,7 +1012,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
not_valid_before=datetime.datetime.utcnow() - datetime.timedelta(days=4), not_valid_before=datetime.datetime.utcnow() - datetime.timedelta(days=4),
not_valid_after=datetime.datetime.utcnow() + datetime.timedelta(days=2) not_valid_after=datetime.datetime.utcnow() + datetime.timedelta(days=2)
) )
self.put(data=key_pem + certificate_pem, auth=auth) self.put(data=certificate_pem + key_pem, auth=auth)
def test_PUT_multiple_different_reference(self): def test_PUT_multiple_different_reference(self):
# put first certificate # put first certificate
...@@ -1023,7 +1025,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -1023,7 +1025,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
not_valid_before=datetime.datetime.utcnow() - datetime.timedelta(days=4), not_valid_before=datetime.datetime.utcnow() - datetime.timedelta(days=4),
not_valid_after=datetime.datetime.utcnow() + datetime.timedelta(days=2) not_valid_after=datetime.datetime.utcnow() + datetime.timedelta(days=2)
) )
self.put(key=reference, data=key_pem + certificate_pem, auth=auth) self.put(key=reference, data=certificate_pem + key_pem, auth=auth)
def test_PUT_certificate_expired(self): def test_PUT_certificate_expired(self):
_, key_pem, _, certificate_pem = self.generateKeyCertificateData( _, key_pem, _, certificate_pem = self.generateKeyCertificateData(
...@@ -1032,7 +1034,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -1032,7 +1034,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
) )
auth = self.generateauth() auth = self.generateauth()
url = self.kedifa_url + self.reference + '?auth=%s' % (auth, ) url = self.kedifa_url + self.reference + '?auth=%s' % (auth, )
result = self.requests_put(url, data=key_pem + certificate_pem) result = self.requests_put(url, data=certificate_pem + key_pem)
self.assertEqual( self.assertEqual(
httplib.UNPROCESSABLE_ENTITY, httplib.UNPROCESSABLE_ENTITY,
result.status_code result.status_code
...@@ -1049,7 +1051,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -1049,7 +1051,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
) )
auth = self.generateauth() auth = self.generateauth()
url = self.kedifa_url + self.reference + '?auth=%s' % (auth, ) url = self.kedifa_url + self.reference + '?auth=%s' % (auth, )
result = self.requests_put(url, data=key_pem + certificate_pem) result = self.requests_put(url, data=certificate_pem + key_pem)
self.assertEqual( self.assertEqual(
httplib.UNPROCESSABLE_ENTITY, httplib.UNPROCESSABLE_ENTITY,
result.status_code result.status_code
...@@ -1158,7 +1160,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -1158,7 +1160,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
) )
def addExpiredNonvalidyetCertificate(self, key): def addExpiredNonvalidyetCertificate(self, key):
from app import SQLite3Storage from .app import SQLite3Storage
pocket_db = SQLite3Storage(self.db) pocket_db = SQLite3Storage(self.db)
not_valid_before_valid = datetime.datetime.utcnow() - \ not_valid_before_valid = datetime.datetime.utcnow() - \
...@@ -1188,7 +1190,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase): ...@@ -1188,7 +1190,7 @@ class KedifaIntegrationTest(KedifaCaucaseMixin, unittest.TestCase):
) )
def _getDBCertificateCount(self): def _getDBCertificateCount(self):
from app import SQLite3Storage from .app import SQLite3Storage
pocket_db = SQLite3Storage(self.db) pocket_db = SQLite3Storage(self.db)
return pocket_db._executeSingleRow( return pocket_db._executeSingleRow(
'SELECT COUNT(*) FROM certificate')['COUNT(*)'] 'SELECT COUNT(*) FROM certificate')['COUNT(*)']
...@@ -1264,12 +1266,11 @@ class KedifaUpdaterMixin(KedifaMixin): ...@@ -1264,12 +1266,11 @@ class KedifaUpdaterMixin(KedifaMixin):
def setupMapping(self, mapping_content=''): def setupMapping(self, mapping_content=''):
mapping = tempfile.NamedTemporaryFile(dir=self.testdir, delete=False) mapping = tempfile.NamedTemporaryFile(dir=self.testdir, delete=False)
mapping.write(mapping_content) mapping.write(mapping_content.encode())
mapping.close() mapping.close()
self.mapping = mapping.name self.mapping = mapping.name
@unittest.skipIf(sys.version_info >= (3, ), 'kedifa currently supports python 2 only')
class KedifaUpdaterMappingTest(KedifaUpdaterMixin, unittest.TestCase): class KedifaUpdaterMappingTest(KedifaUpdaterMixin, unittest.TestCase):
def test_updateMapping_empty(self): def test_updateMapping_empty(self):
self.setupMapping() self.setupMapping()
...@@ -1322,7 +1323,6 @@ class KedifaUpdaterMappingTest(KedifaUpdaterMixin, unittest.TestCase): ...@@ -1322,7 +1323,6 @@ class KedifaUpdaterMappingTest(KedifaUpdaterMixin, unittest.TestCase):
self.assertEqual(u.mapping, {'file': ('url', None)}) self.assertEqual(u.mapping, {'file': ('url', None)})
@unittest.skipIf(sys.version_info >= (3, ), 'kedifa currently supports python 2 only')
class KedifaUpdaterUpdateCertificateTest( class KedifaUpdaterUpdateCertificateTest(
KedifaUpdaterMixin, unittest.TestCase): KedifaUpdaterMixin, unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -1339,7 +1339,7 @@ class KedifaUpdaterUpdateCertificateTest( ...@@ -1339,7 +1339,7 @@ class KedifaUpdaterUpdateCertificateTest(
if fallback: if fallback:
fallback_file = tempfile.NamedTemporaryFile( fallback_file = tempfile.NamedTemporaryFile(
dir=self.testdir, delete=False) dir=self.testdir, delete=False)
fallback_file.write(fallback) fallback_file.write(fallback.encode())
fallback_file.close() fallback_file.close()
mapping = 'http://example.com %s' % (self.certificate_file_name,) mapping = 'http://example.com %s' % (self.certificate_file_name,)
if fallback_file: if fallback_file:
...@@ -1504,7 +1504,6 @@ class KedifaUpdaterUpdateCertificateTest( ...@@ -1504,7 +1504,6 @@ class KedifaUpdaterUpdateCertificateTest(
self.assertState({self.certificate_file_name: True}) self.assertState({self.certificate_file_name: True})
@unittest.skipIf(sys.version_info >= (3, ), 'kedifa currently supports python 2 only')
class KedifaUpdaterUpdateCertificatePrepareTest( class KedifaUpdaterUpdateCertificatePrepareTest(
KedifaUpdaterMixin, unittest.TestCase): KedifaUpdaterMixin, unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -1522,13 +1521,13 @@ class KedifaUpdaterUpdateCertificatePrepareTest( ...@@ -1522,13 +1521,13 @@ class KedifaUpdaterUpdateCertificatePrepareTest(
if fallback: if fallback:
fallback_file = tempfile.NamedTemporaryFile( fallback_file = tempfile.NamedTemporaryFile(
dir=self.testdir, delete=False) dir=self.testdir, delete=False)
fallback_file.write(fallback) fallback_file.write(fallback.encode())
fallback_file.close() fallback_file.close()
master_file = '/master/certificate/file' master_file = '/master/certificate/file'
if master_content: if master_content:
master_file = tempfile.NamedTemporaryFile( master_file = tempfile.NamedTemporaryFile(
dir=self.testdir, delete=False) dir=self.testdir, delete=False)
master_file.write(master_content) master_file.write(master_content.encode())
master_file.close() master_file.close()
master_file = master_file.name master_file = master_file.name
...@@ -1592,7 +1591,6 @@ class KedifaUpdaterUpdateCertificatePrepareTest( ...@@ -1592,7 +1591,6 @@ class KedifaUpdaterUpdateCertificatePrepareTest(
self.assertEqual('cert', certificate) self.assertEqual('cert', certificate)
@unittest.skipIf(sys.version_info >= (3, ), 'kedifa currently supports python 2 only')
class KedifaUpdaterLoopTest( class KedifaUpdaterLoopTest(
KedifaUpdaterMixin, unittest.TestCase): KedifaUpdaterMixin, unittest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment