Commit b53ba563 authored by Alain Takoudjou's avatar Alain Takoudjou

plugin recipe: improve recipe to correctly generate promise which has complex parameters


Use json.dumps/.loads to generate promise plugin code. This is safe as it will correctly escape sring and prevent code injection from untrustable parameters.

Add 'import' parameter to simplify parameters used to generate the script instead of passing full import code. When import parameter is set, promise will be loaded from that import path and parameter `content` is ignored.

/reviewed-on nexedi/slapos!515
parent 40eff292
......@@ -24,19 +24,18 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import logging, os, sys
import json
import re
import logging, os
import zc.buildout.easy_install
from slapos.recipe.librecipe import GenericBaseRecipe
script_template = '''# This script is auto generated by slapgrid, do not edit!
import json
import sys
sys.path[0:0] = [
%(path)s
]
sys.path[0:0] = %(path)s
extra_config_dict = {
%(config)s
}
extra_config_dict = json.loads("""%(config)s""")
# We want to cleanup all imported modules from slapos namespace, because
# they will conflict with slapos.core.
......@@ -53,8 +52,6 @@ for module in sys.modules.keys():
if 'slapos' in module or 'pkg_resources' in module:
del sys.modules[module]
import slapos.grid.promise
%(content)s
'''
......@@ -79,11 +76,18 @@ class Recipe(GenericBaseRecipe):
)
if cache_storage is None:
cache_storage = {}
setattr(
self.buildout,
self._WORKING_SET_CACHE_NAME,
cache_storage
)
try:
setattr(
self.buildout,
self._WORKING_SET_CACHE_NAME,
cache_storage
)
except AttributeError:
if type(self.buildout) == type({}):
# failed to set attribute in test mode, cache not used
pass
else:
raise
return cache_storage
def install(self):
......@@ -102,30 +106,43 @@ class Recipe(GenericBaseRecipe):
develop_eggs_dir,
)
if cache_key not in cache_storage:
working_set = zc.buildout.easy_install.working_set(
egg_list,
[develop_eggs_dir, eggs_dir]
)
cache_storage[cache_key] = working_set
if develop_eggs_dir and eggs_dir:
working_set = zc.buildout.easy_install.working_set(
egg_list,
[develop_eggs_dir, eggs_dir]
)
cache_storage[cache_key] = working_set
else:
working_set = set()
else:
working_set = cache_storage[cache_key]
content = self.options['content'].strip()
regex = r"^[\w_\-\.\s]+$"
import_path = self.options.get('import', '').strip()
if import_path:
if not re.search(regex, import_path):
raise ValueError("Import path %r is not a valid" % import_path)
content_string = "from %s import RunPromise" % import_path
else:
# old parameter for compatibility
content_string = self.options['content'].strip()
if not re.search(regex, content_string):
raise ValueError("Promise content %r is not valid" % content_string)
output = self.options['output']
mode = self.options.get('mode', '0600')
path_list_string = ""
mode = self.options.get('mode', '0644')
path_list = []
for dist in working_set:
path_list_string += ' "%s",\n' % dist.location
path_list.append(dist.location)
content_string = '\n'.join([line.lstrip() for line in content.split('\n')])
config_string = ""
config_dict = dict()
for key in self.options:
if key.startswith('config-'):
config_string += " '%s': '%s',\n" % (key[7:], self.options[key])
config_dict[key[7:]] = self.options[key]
option_dict = dict(path=path_list_string.strip(),
option_dict = dict(path=json.dumps(path_list, indent=2),
content=content_string,
config=config_string.strip())
config=json.dumps(config_dict, indent=2, sort_keys=True))
with open(output, 'w') as f:
f.write(script_template % option_dict)
......
import os, shutil, tempfile, unittest
from slapos.recipe import promise_plugin
from slapos.test.utils import makeRecipe
import stat, json
class TestPromisePlugin(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.output = os.path.join(self.tmp, 'output.py')
self.options = options = {
'output': self.output,
'eggs': 'slapos.cookbook'
}
def tearDown(self):
shutil.rmtree(self.tmp)
def test_parameters(self):
self.options['mode'] = '0644'
self.options['import'] = 'slapos.promise.plugin.check_site_available'
self.options['config-param1'] = "YY^@12"
self.options['config-param2'] = "23'91'"
self.options['config-param3'] = None
self.options['config-param4'] = """param
in multi line
123444
"""
recipe = makeRecipe(
promise_plugin.Recipe,
options=self.options,
name='plugin')
recipe.install()
self.assertTrue(os.path.exists(self.output))
with open(self.output) as f:
content = f.read()
self.assertIn("from slapos.promise.plugin.check_site_available import RunPromise", content)
self.assertEqual(stat.S_IMODE(os.stat(self.output).st_mode), int('644', 8))
expected_dict = dict(
param1=self.options['config-param1'],
param2=self.options['config-param2'],
param3=self.options['config-param3'],
param4=self.options['config-param4'],
)
self.assertIn('extra_config_dict = json.loads("""%s""")' % json.dumps(expected_dict, indent=2, sort_keys=True), content)
def test_no_module_set(self):
recipe = makeRecipe(
promise_plugin.Recipe,
options=self.options,
name='plugin')
with self.assertRaises(KeyError):
recipe.install()
def test_default(self):
self.options['import'] = 'slapos.promise.plugin.check_site_available'
recipe = makeRecipe(
promise_plugin.Recipe,
options=self.options,
name='plugin')
recipe.install()
self.assertTrue(os.path.exists(self.output))
self.assertEqual(stat.S_IMODE(os.stat(self.output).st_mode), int('644', 8))
with open(self.output) as f:
content = f.read()
self.assertIn("from slapos.promise.plugin.check_site_available import RunPromise", content)
self.assertIn('extra_config_dict = json.loads("""{}""")', content)
def test_bad_parameters(self):
self.options['import'] = 'slapos.promise.plugin.check_site_available'
self.options['config-param1; print "toto"'] = """#xxxx"\nimport os; os.stat(f)"""
self.options['config-param2\n@domething'] = '"#$$*PPP\n\n p = 2*5; print "result is %s" % p'
recipe = makeRecipe(
promise_plugin.Recipe,
options=self.options,
name='plugin')
recipe.install()
self.assertTrue(os.path.exists(self.output))
with open(self.output) as f:
content = f.read()
expected_param1 = '"param1; print \\"toto\\"": "#xxxx\\"\\nimport os; os.stat(f)",'
expected_param2 = '"param2\\n@domething": "\\"#$$*PPP\\n\\n p = 2*5; print \\"result is %s\\" % p"'
self.assertIn(expected_param1, content)
self.assertIn(expected_param2, content)
def test_bad_module_path(self):
self.options['import'] = 'slapos.promise.plugin.check_site_available; print "toto"'
recipe = makeRecipe(
promise_plugin.Recipe,
options=self.options,
name='plugin')
with self.assertRaises(ValueError) as p:
recipe.install()
self.assertEqual(p.exception.message, "Import path %r is not a valid" % self.options['import'])
def test_bad_content(self):
self.options['content'] = 'from slapos.plugin.check_site_available import toto; print "toto"'
recipe = makeRecipe(
promise_plugin.Recipe,
options=self.options,
name='plugin')
with self.assertRaises(ValueError) as p:
recipe.install()
self.assertEqual(p.exception.message, "Promise content %r is not valid" % self.options['content'])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment