Commit c38b6928 authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki Committed by Jérome Perrin

py2/py3: convert str <=> bytes.

parent ae6f5d18
......@@ -70,6 +70,7 @@ from DateTime import DateTime
from Products.ERP5Type import Permissions
from Products.ERP5Type.Message import translateString
from Products.ERP5Type.UnrestrictedMethod import super_user
from Products.ERP5Type.Utils import bytes2str, str2bytes, unicode2str
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Security.ERP5GroupManager import (
disableCache as ERP5GroupManager_disableCache,
......@@ -535,7 +536,7 @@ class _ERP5RequestValidator(RequestValidator):
def _getClientValue(self, client_id):
try:
result = self._authorisation_server_connector_value[client_id.encode('utf-8')]
result = self._authorisation_server_connector_value[unicode2str(client_id)]
except KeyError:
return
if result.getValidationState() == 'validated':
......@@ -1035,7 +1036,7 @@ class OAuth2AuthorisationServerConnector(XMLObject):
multi_fernet = self.__getLoginRetryURLMultiFernet()
# Retrieve posted field, validate signature and extract the url.
try:
login_retry_url = multi_fernet.decrypt(REQUEST.form['login_retry_url'])
login_retry_url = bytes2str(multi_fernet.decrypt(str2bytes(REQUEST.form['login_retry_url'])))
except (fernet.InvalidToken, TypeError, KeyError):
# No login_retry_url provided or its value is unusable: if this is a GET
# request (trying to display a login form), use the current URL.
......@@ -1049,7 +1050,7 @@ class OAuth2AuthorisationServerConnector(XMLObject):
def getSignedLoginRetryUrl():
if login_retry_url is None:
return None
return multi_fernet.encrypt(login_retry_url)
return bytes2str(multi_fernet.encrypt(str2bytes(login_retry_url)))
return _ERP5AuthorisationEndpoint(
server_connector_path=self.getPath(),
zope_request=REQUEST,
......@@ -1082,7 +1083,7 @@ class OAuth2AuthorisationServerConnector(XMLObject):
method=method,
query_list=query_list + [(
'login_retry_url',
self.__getLoginRetryURLMultiFernet().encrypt(login_retry_url),
bytes2str(self.__getLoginRetryURLMultiFernet().encrypt(str2bytes(login_retry_url))),
)],
) as inner_request:
# pylint: disable=unexpected-keyword-arg, no-value-for-parameter
......
......@@ -41,6 +41,7 @@ import six
from AccessControl.SecurityManagement import getSecurityManager, setSecurityManager
from DateTime import DateTime
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Utils import bytes2str, str2bytes
from Products.ERP5.ERP5Site import (
ERP5_AUTHORISATION_EXTRACTOR_USERNAME_NAME,
ERP5_AUTHORISATION_EXTRACTOR_PASSWORD_NAME,
......@@ -549,7 +550,7 @@ class TestOAuth2(ERP5TypeTestCase):
self.assertContentTypeEqual(result_header_dict, 'text/html')
assert result_body
parser = FormExtractor()
parser.feed(result_body)
parser.feed(bytes2str(result_body))
parser.close()
(action_url, field_list), = parser.form_list # pylint: disable=unbalanced-tuple-unpacking
for field_name, _ in field_list:
......@@ -566,13 +567,13 @@ class TestOAuth2(ERP5TypeTestCase):
content_type='application/x-www-form-urlencoded',
header_dict=header_dict,
cookie_dict=cookie_dict,
body=urlencode(list(value_callback(
body=str2bytes(urlencode(list(value_callback(
field_item_list=tuple(
(key, value)
for key, value in field_list
if not key.endswith(':method')
),
))),
)))),
)
if script_id == 'Base_callDialogMethod' and status == 302:
# Base_callDialogMethod ended in redirection. It may be that the action
......@@ -781,9 +782,9 @@ class TestOAuth2(ERP5TypeTestCase):
"""
Get a token, renew it, terminate session.
"""
basic_auth = 'Basic ' + base64.encodestring(
_TEST_USER_LOGIN + ':' + self.__password,
).rstrip()
basic_auth = 'Basic ' + bytes2str(base64.encodestring(
str2bytes(_TEST_USER_LOGIN + ':' + self.__password),
)).rstrip()
oauth2_server_connector = self.__oauth2_server_connector_value.getPath()
oauth2_client_declaration_value = self.__oauth2_external_client_declaration
authorisation_code_lifespan = oauth2_client_declaration_value.getAuthorisationCodeLifespan()
......@@ -802,7 +803,7 @@ class TestOAuth2(ERP5TypeTestCase):
# Client produces a PKCE secret and sends the Resource Owner to the Authorisation Server
# to authorise them, getting an ahutorisation code.
code_verifier = base64.urlsafe_b64encode(
'this is not a good secret6789012', # 32 bytes
b'this is not a good secret6789012', # 32 bytes
)
reference_state = 'dummy'
client_id = oauth2_client_declaration_value.getId()
......@@ -813,9 +814,9 @@ class TestOAuth2(ERP5TypeTestCase):
'client_id': client_id,
'state': reference_state,
'code_challenge_method': 'S256',
'code_challenge': base64.urlsafe_b64encode(
'code_challenge': bytes2str(base64.urlsafe_b64encode(
hashlib.sha256(code_verifier).digest(),
).rstrip('='),
)).rstrip('='),
'redirect_uri': _EXTERNAL_CLIENT_REDIRECT_URI,
}),
redirect_uri=_EXTERNAL_CLIENT_REDIRECT_URI,
......
......@@ -36,7 +36,7 @@ import json
from os import urandom
import random
from time import time
from six.moves.urllib.parse import urlencode, urljoin, urlparse
from six.moves.urllib.parse import urlencode, urljoin, urlparse, urlsplit
import ssl
from AccessControl import (
ClassSecurityInfo,
......@@ -51,6 +51,7 @@ from OFS.Traversable import NotFound
from Products.ERP5Type import Permissions
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.Timeout import getTimeLeft
from Products.ERP5Type.Utils import bytes2str, str2bytes, str2unicode
from Products.ERP5Security.ERP5OAuth2ResourceServerPlugin import (
OAuth2AuthorisationClientConnectorMixIn,
ERP5OAuth2ResourceServerPlugin,
......@@ -156,9 +157,9 @@ class _SimpleHTTPRequest(object):
def _authUserPW(self):
if self._auth.lower().startswith('basic '):
return base64.decodestring(
return bytes2str(base64.decodestring(
self._auth.split(None, 1)[1],
).split(':', 1)
)).split(':', 1)
def get(self, name):
if name == 'BODY':
......@@ -200,7 +201,7 @@ class _OAuth2AuthorisationServerProxy(object):
self._bind_address = (bind_address, 0) if bind_address else None
if ca_certificate_pem is not None:
# On python2 cadata is expected as an unicode object only.
ca_certificate_pem = ca_certificate_pem.decode('utf-8')
ca_certificate_pem = str2unicode(ca_certificate_pem)
self._ca_certificate_pem = ca_certificate_pem
#
......@@ -580,7 +581,7 @@ class OAuth2AuthorisationClientConnector(
)
RESPONSE.setCookie(
name=name,
value=base64.urlsafe_b64encode(content),
value=bytes2str(base64.urlsafe_b64encode(str2bytes(content))),
# prevent this cookie from being read over the network
# (assuming an uncompromised SSL setup, but if it is compromised
# then the attacker may just as well impersonate the victim using
......@@ -615,10 +616,10 @@ class OAuth2AuthorisationClientConnector(
ttl = self._SESSION_STATE_VALIDITY
for name, value in six.iteritems(self._getRawStateCookieDict(REQUEST)):
try:
result[name] = decrypt(
result[name] = bytes2str(decrypt(
base64.urlsafe_b64decode(value),
ttl=ttl,
)
))
except (fernet.InvalidToken, TypeError):
self._expireStateCookie(RESPONSE, name)
return result
......@@ -752,8 +753,8 @@ class OAuth2AuthorisationClientConnector(
)))
except StopIteration:
name = None
identifier = base64.urlsafe_b64encode(urandom(32))
code_verifier = base64.urlsafe_b64encode(urandom(32))
identifier = bytes2str(base64.urlsafe_b64encode(urandom(32)))
code_verifier = bytes2str(base64.urlsafe_b64encode(urandom(32)))
_, state_key = self.__getStateFernetKeyList()[0]
encrypt = fernet.Fernet(state_key).encrypt
query_list = [
......@@ -765,7 +766,7 @@ class OAuth2AuthorisationClientConnector(
# Note: fernet both signs and encrypts the content.
# It uses on AES128-CBC, PKCS7 padding, and SHA256 HMAC, with
# independent keys for encryption and authentication.
encrypt(json.dumps({
bytes2str(encrypt(str2bytes(json.dumps({
# Identifier is also stored in User-Agent as a cookie.
# This is used to prevent an attacker from tricking a user into
# giving us an Authorisation Code under the control of the attacker.
......@@ -787,7 +788,7 @@ class OAuth2AuthorisationClientConnector(
# done above), this means the key may be attacked using (partially)
# chosen-cleartext (if AES128 is found vulnerable to such attack).
_STATE_CAME_FROM_NAME: (
came_from.decode('utf-8')
str2unicode(came_from)
if came_from else
came_from
),
......@@ -795,15 +796,15 @@ class OAuth2AuthorisationClientConnector(
# Authorisation Code converted into tokens. To be kept secret from
# everyone other than this server.
_STATE_CODE_VERIFIER_NAME: code_verifier,
})),
})))),
),
('code_challenge_method', 'S256'),
(
'code_challenge',
# S256 standard PKCE encoding
base64.urlsafe_b64encode(
hashlib.sha256(code_verifier).digest(),
).rstrip('='),
bytes2str(base64.urlsafe_b64encode(
hashlib.sha256(str2bytes(code_verifier)).digest(),
)).rstrip('='),
),
]
if scope_list:
......@@ -817,7 +818,7 @@ class OAuth2AuthorisationClientConnector(
self._setStateCookie(
RESPONSE=RESPONSE,
name=name,
content=encrypt(identifier),
content=bytes2str(encrypt(str2bytes(identifier))),
)
if (
self.isAuthorisationServerRemote() or
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment