Commit 280370c7 authored by Jérome Perrin's avatar Jérome Perrin

random: fix password recipe when using storage-path and passwd

As discussed on nexedi/slapos@bb841a7b (comment 219278)
when using storage-path and passwd option, the storage file could not
be updated to the new format because of AttributeError _needs_migration.

This changes to no longer try to detect if the storage needs migration,
but just compare the expected content of the storage file during install
and overwrite the file if it is different.

This new approach also fix a behavior that re-running buildout with
storage-path option and a different passwd option did not update the
storage file. Now it is also updated.

( this also fixes a potential encoding problem on py2 )
parent e7e8cfd7
Pipeline #37612 failed with stage
in 0 seconds
......@@ -150,32 +150,31 @@ class Password(object):
except KeyError:
self.storage_path = options['storage-path'] = os.path.join(
buildout['buildout']['parts-directory'], name)
passwd_dict = {
'': options.get('passwd')
}
passwd_option = options.get('passwd')
passwd_dict = {'': passwd_option}
if self.storage_path:
try:
with open(self.storage_path) as f:
content = f.read().strip('\n')
# new format: the file contains password and hashes in json format
try:
passwd_dict = json.loads(content)
if sys.version_info < (3, ):
passwd_dict = {k: v.encode('utf-8') for k, v in passwd_dict.items()}
except ValueError:
# old format: the file only contains the password in plain text
passwd_dict[''] = content
except IOError as e:
if e.errno != errno.ENOENT:
raise
if passwd_option and passwd_dict[''] != passwd_option:
passwd_dict = {'': passwd_option}
if not passwd_dict['']:
if self.storage_path:
self._needs_migration = False
try:
with open(self.storage_path) as f:
content = f.read().strip('\n')
# new format: the file contains password and hashes in json format
try:
passwd_dict = json.loads(content)
if sys.version_info < (3, ):
passwd_dict = {k: v.encode() for k, v in passwd_dict.items()}
except ValueError:
# old format: the file only contains the password in plain text
passwd_dict[''] = content
self._needs_migration = True
except IOError as e:
if e.errno != errno.ENOENT:
raise
if not passwd_dict['']:
passwd_dict[''] = self.generatePassword(int(options.get('bytes', '16')))
self.update = self.install
options['passwd'] = passwd_dict['']
passwd_dict[''] = self.generatePassword(int(options.get('bytes', '16')))
self.update = self.install
options['passwd'] = passwd_dict['']
class HashedPasswordDict(dict):
def __missing__(self, key):
......@@ -202,17 +201,17 @@ class Password(object):
def install(self):
if self.storage_path:
serialized = json.dumps(self.passwd_dict, sort_keys=True)
stored = None
try:
# The following 2 lines are just an optimization to avoid recreating
# the file with the same content.
if self.create_once and os.stat(self.storage_path).st_size and not self._needs_migration:
return
os.unlink(self.storage_path)
except OSError as e:
with open(self.storage_path) as f:
stored = f.read()
except IOError as e:
if e.errno != errno.ENOENT:
raise
with open(self.storage_path, 'w') as f:
json.dump(self.passwd_dict, f)
if stored != serialized:
with open(self.storage_path, 'w') as f:
f.write(serialized)
if not self.create_once:
return self.storage_path
......
# coding: utf-8
import json
import os
import shutil
......@@ -44,8 +45,33 @@ class TestPassword(unittest.TestCase):
self._makeRecipe({'storage-path': tf.name}, "another").install()
self.assertEqual(self.buildout["another"]["passwd"], passwd)
def test_storage_path_passwd_set_in_options(self):
tf = tempfile.NamedTemporaryFile(delete=False)
self.addCleanup(os.unlink, tf.name)
self._makeRecipe({'storage-path': tf.name, 'passwd': 'secret'}).install()
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': 'secret'})
self._makeRecipe({'storage-path': tf.name}, "another").install()
self.assertEqual(self.buildout["another"]["passwd"], 'secret')
self._makeRecipe({'storage-path': tf.name, 'passwd': 'updated'}, "updated").install()
self.assertEqual(self.buildout["updated"]["passwd"], 'updated')
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': 'updated'})
def test_storage_path_passwd_set_in_options_non_ascii(self): # BBB Py2
tf = tempfile.NamedTemporaryFile(delete=False)
self.addCleanup(os.unlink, tf.name)
self._makeRecipe({'storage-path': tf.name, 'passwd': 'sécret'}).install()
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': u'sécret'})
self._makeRecipe({'storage-path': tf.name}, "another").install()
self.assertEqual(self.buildout["another"]["passwd"], 'sécret')
def test_storage_path_legacy_format(self):
with tempfile.NamedTemporaryFile(delete=False) as tf:
with tempfile.NamedTemporaryFile() as tf:
tf.write(b'secret\n')
tf.flush()
......@@ -56,8 +82,22 @@ class TestPassword(unittest.TestCase):
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': 'secret'})
self._makeRecipe({'storage-path': tf.name}, "another").install()
self.assertEqual(self.buildout["another"]["passwd"], passwd)
self._makeRecipe({'storage-path': tf.name}, "another").install()
self.assertEqual(self.buildout["another"]["passwd"], passwd)
def test_storage_path_legacy_format_passwd_set_in_options(self):
with tempfile.NamedTemporaryFile() as tf:
tf.write(b'secret\n')
tf.flush()
self._makeRecipe({'storage-path': tf.name, 'passwd': 'secret'}).install()
passwd = self.buildout["random"]["passwd"]
self.assertEqual(passwd, 'secret')
tf.flush()
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': 'secret'})
self._makeRecipe({'storage-path': tf.name}, "another").install()
self.assertEqual(self.buildout["another"]["passwd"], passwd)
def test_bytes(self):
self._makeRecipe({'bytes': '32'}).install()
......@@ -125,9 +165,42 @@ class TestPassword(unittest.TestCase):
self.assertIsInstance(self.buildout['reread']['passwd'], str)
self.assertIsInstance(self.buildout['reread']['passwd-ldap-salted-sha1'], str)
def test_passlib_no_storage_path(self):
recipe = self._makeRecipe({'storage-path': ''})
passwd = self.buildout['random']['passwd']
self.assertTrue(passwd)
hashed = self.buildout['random']['passwd-sha256-crypt']
self.assertTrue(passlib.hash.sha256_crypt.verify(passwd, hashed))
self.assertFalse(recipe.install())
def test_passlib_input_passwd(self):
self._makeRecipe({'passwd': 'insecure'})
self.assertEqual(self.buildout['random']['passwd'], 'insecure')
hashed = self.buildout['random']['passwd-sha256-crypt']
self.assertTrue(passlib.hash.sha256_crypt.verify('insecure', hashed))
def test_passlib_input_passwd_no_storage_path(self):
recipe = self._makeRecipe({'storage-path': '', 'passwd': 'insecure'})
self.assertEqual(self.buildout['random']['passwd'], 'insecure')
hashed = self.buildout['random']['passwd-sha256-crypt']
self.assertTrue(passlib.hash.sha256_crypt.verify('insecure', hashed))
self.assertFalse(recipe.install())
def test_passlib_input_passwd_update(self):
tf = tempfile.NamedTemporaryFile(delete=False)
self.addCleanup(os.unlink, tf.name)
initial_recipe = self._makeRecipe({'storage-path': tf.name, 'passwd': 'initial'}, 'initial')
initial_hashed = self.buildout['initial']['passwd-sha256-crypt']
initial_recipe.install()
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': 'initial', 'passwd-sha256-crypt': initial_hashed})
updated_recipe = self._makeRecipe({'storage-path': tf.name, 'passwd': 'updated'}, "updated")
updated_hashed = self.buildout['updated']['passwd-sha256-crypt']
updated_recipe.install()
with open(tf.name) as f:
self.assertEqual(json.load(f), {'': 'updated', 'passwd-sha256-crypt': updated_hashed})
self.assertNotEqual(initial_hashed, updated_hashed)
self.assertTrue(passlib.hash.sha256_crypt.verify('updated', updated_hashed))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment