pax_global_header 0000666 0000000 0000000 00000000064 13232036066 0014513 g ustar 00root root 0000000 0000000 52 comment=15dedb62a3565373c77977e5ebd2e56b4ffe341a
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/ 0000775 0000000 0000000 00000000000 13232036066 0023500 5 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/ 0000775 0000000 0000000 00000000000 13232036066 0025001 5 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/ 0000775 0000000 0000000 00000000000 13232036066 0026143 5 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/__init__.py 0000664 0000000 0000000 00000000000 13232036066 0030242 0 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/cli.py 0000664 0000000 0000000 00000013507 13232036066 0027272 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import logging
import pprint
import unittest
from mock import patch, create_autospec
import slapos.cli.list
import slapos.cli.info
import slapos.cli.supervisorctl
from slapos.client import ClientConfig
import slapos.grid.svcbackend
import slapos.proxy
import slapos.slap
import supervisor.supervisorctl
def raiseNotFoundError(*args, **kwargs):
raise slapos.slap.NotFoundError()
class CliMixin(unittest.TestCase):
def setUp(self):
slap = slapos.slap.slap()
self.local = {'slap': slap}
self.logger = create_autospec(logging.Logger)
self.conf = create_autospec(ClientConfig)
class TestCliProxy(CliMixin):
def test_generateSoftwareProductListFromString(self):
"""
Test that generateSoftwareProductListFromString correctly parses a parameter
coming from the configuration file.
"""
software_product_list_string = """
product1 url1
product2 url2"""
software_release_url_list = {
'product1': 'url1',
'product2': 'url2',
}
self.assertEqual(
slapos.proxy._generateSoftwareProductListFromString(
software_product_list_string),
software_release_url_list
)
def test_generateSoftwareProductListFromString_emptyString(self):
self.assertEqual(
slapos.proxy._generateSoftwareProductListFromString(''),
{}
)
class TestCliList(CliMixin):
def test_list(self):
"""
Test "slapos list" command output.
"""
return_value = {
'instance1': slapos.slap.SoftwareInstance(_title='instance1', _software_release_url='SR1'),
'instance2': slapos.slap.SoftwareInstance(_title='instance2', _software_release_url='SR2'),
}
with patch.object(slapos.slap.slap, 'getOpenOrderDict', return_value=return_value) as _:
slapos.cli.list.do_list(self.logger, None, self.local)
self.logger.info.assert_any_call('%s %s', 'instance1', 'SR1')
self.logger.info.assert_any_call('%s %s', 'instance2', 'SR2')
def test_emptyList(self):
with patch.object(slapos.slap.slap, 'getOpenOrderDict', return_value={}) as _:
slapos.cli.list.do_list(self.logger, None, self.local)
self.logger.info.assert_called_once_with('No existing service.')
@patch.object(slapos.slap.slap, 'registerOpenOrder', return_value=slapos.slap.OpenOrder())
class TestCliInfo(CliMixin):
def test_info(self, _):
"""
Test "slapos info" command output.
"""
setattr(self.conf, 'reference', 'instance1')
instance = slapos.slap.SoftwareInstance(
_software_release_url='SR1',
_requested_state = 'mystate',
_connection_dict = {'myconnectionparameter': 'value1'},
_parameter_dict = {'myinstanceparameter': 'value2'}
)
with patch.object(slapos.slap.OpenOrder, 'getInformation', return_value=instance):
slapos.cli.info.do_info(self.logger, self.conf, self.local)
self.logger.info.assert_any_call(pprint.pformat(instance._parameter_dict))
self.logger.info.assert_any_call('Software Release URL: %s', instance._software_release_url)
self.logger.info.assert_any_call('Instance state: %s', instance._requested_state)
self.logger.info.assert_any_call(pprint.pformat(instance._parameter_dict))
self.logger.info.assert_any_call(pprint.pformat(instance._connection_dict))
def test_unknownReference(self, _):
"""
Test "slapos info" command output in case reference
of service is not known.
"""
setattr(self.conf, 'reference', 'instance1')
with patch.object(slapos.slap.OpenOrder, 'getInformation', side_effect=raiseNotFoundError):
slapos.cli.info.do_info(self.logger, self.conf, self.local)
self.logger.warning.assert_called_once_with('Instance %s does not exist.', self.conf.reference)
@patch.object(supervisor.supervisorctl, 'main')
class TestCliSupervisorctl(CliMixin):
def test_allow_supervisord_launch(self, _):
"""
Test that "slapos node supervisorctl" tries to launch supervisord
"""
instance_root = '/foo/bar'
with patch.object(slapos.grid.svcbackend, 'launchSupervisord') as launchSupervisord:
slapos.cli.supervisorctl.do_supervisorctl(self.logger, instance_root, ['status'], False)
launchSupervisord.assert_any_call(instance_root=instance_root, logger=self.logger)
def test_forbid_supervisord_launch(self, _):
"""
Test that "slapos node supervisorctl" does not try to launch supervisord
"""
instance_root = '/foo/bar'
with patch.object(slapos.grid.svcbackend, 'launchSupervisord') as launchSupervisord:
slapos.cli.supervisorctl.do_supervisorctl(self.logger, instance_root, ['status'], True)
self.assertFalse(launchSupervisord.called)
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/client.py 0000664 0000000 0000000 00000007430 13232036066 0027777 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import logging
import unittest
import slapos.slap
import slapos.client
class TestClient(unittest.TestCase):
def setUp(self):
self.called_software_product = None
class FakeSoftwareProductCollection(object):
def __init__(inner_self, *args, **kw_args):
inner_self.__getattr__ = inner_self.get
def get(inner_self, software_product):
self.called_software_product = software_product
return self.software_product_reference
self.slap = slapos.slap.slap()
self.product_collection = FakeSoftwareProductCollection(
logging.getLogger(), self.slap)
def test_getSoftwareReleaseFromSoftwareString_softwareProduct(self):
"""
Test that if given software is a Sofwtare Product (i.e matching
the magic string), it returns the corresponding value of a call to
SoftwareProductCollection.
"""
self.software_product_reference = 'foo'
software_string = '%s%s' % (
slapos.client.SOFTWARE_PRODUCT_NAMESPACE,
self.software_product_reference
)
slapos.client._getSoftwareReleaseFromSoftwareString(
logging.getLogger(), software_string, self.product_collection)
self.assertEqual(
self.called_software_product,
self.software_product_reference
)
def test_getSoftwareReleaseFromSoftwareString_softwareProduct_emptySoftwareProduct(self):
"""
Test that if given software is a Software Product (i.e matching
the magic string), but this software product is empty, it exits.
"""
self.software_product_reference = 'foo'
software_string = '%s%s' % (
slapos.client.SOFTWARE_PRODUCT_NAMESPACE,
self.software_product_reference
)
def fake_get(software_product):
raise AttributeError()
self.product_collection.__getattr__ = fake_get
self.assertRaises(
SystemExit,
slapos.client._getSoftwareReleaseFromSoftwareString,
logging.getLogger(), software_string, self.product_collection
)
def test_getSoftwareReleaseFromSoftwareString_softwareRelease(self):
"""
Test that if given software is a simple Software Release URL (not matching
the magic string), it is just returned without modification.
"""
software_string = 'foo'
returned_value = slapos.client._getSoftwareReleaseFromSoftwareString(
logging.getLogger(), software_string, self.product_collection)
self.assertEqual(
self.called_software_product,
None
)
self.assertEqual(
returned_value,
software_string
)
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/collect.py 0000664 0000000 0000000 00000065507 13232036066 0030157 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2014 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import glob
import unittest
import shutil
import tarfile
import tempfile
import slapos.slap
import psutil
from time import strftime
from slapos.collect import entity, snapshot, db, reporter
from slapos.cli.entry import SlapOSApp
from ConfigParser import ConfigParser
class FakeDatabase(object):
def __init__(self):
self.invoked_method_list = []
def connect(self):
self.invoked_method_list.append(("connect", ""))
def close(self):
self.invoked_method_list.append(("close", ""))
def commit(self):
self.invoked_method_list.append(("commit", ""))
def insertUserSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertUserSnapshot", (args, kw)))
def inserFolderSnapshot(self, *args, **kw):
self.invoked_method_list.append(("inserFolderSnapshot", (args, kw)))
def insertSystemSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertSystemSnapshot", (args, kw)))
def insertComputerSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertComputerSnapshot", (args, kw)))
def insertDiskPartitionSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertDiskPartitionSnapshot", (args, kw)))
def insertTemperatureSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertTemperatureSnapshot", (args, kw)))
def insertHeatingSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertHeatingSnapshot", (args, kw)))
class FakeDatabase2(FakeDatabase):
def select(self, *args, **kw):
self.invoked_method_list.append(("select", (args, kw)))
return []
class TestCollectDatabase(unittest.TestCase):
def setUp(self):
self.instance_root = tempfile.mkdtemp()
def tearDown(self):
if os.path.exists(self.instance_root):
shutil.rmtree(self.instance_root)
def test_database_bootstrap(self):
self.assertFalse(os.path.exists(
"%s/collector.db" % self.instance_root ))
database = db.Database(self.instance_root)
database.connect()
try:
self.assertEquals(
[u'user', u'folder', u'computer', u'system', u'disk', u'temperature', u'heating'],
database.getTableList())
finally:
database.close()
self.assertTrue(os.path.exists(
"%s/collector.db" % self.instance_root ))
def test_database_select(self):
def _fake_execute(sql): return sql
database = db.Database(self.instance_root)
database.connect()
original_execute = database._execute
try:
database._execute = _fake_execute
self.assertEquals("SELECT * FROM user ", database.select('user'))
self.assertEquals("SELECT DATE FROM user WHERE date = '0.1' AND time=\"00:02\" ",
database.select('user', 0.1, 'DATE', 'time="00:02"'))
self.assertEquals("SELECT DATE FROM user WHERE date = '0.1' GROUP BY time ORDER BY date",
database.select('user', 0.1, 'DATE', order='date', group='time' ))
self.assertEquals("SELECT DATE FROM user WHERE date = '0.1' limit 1",
database.select('user', 0.1, 'DATE', limit=1))
finally:
database._execute = original_execute
database.close()
def test_insert_user_snapshot(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.insertUserSnapshot(
'fakeuser0', 10, '10-12345', '0.1', '10.0', '1',
'10.0', '10.0', '0.1', '0.1', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('user')],
[(u'fakeuser0', 10.0, u'10-12345', 0.1, 10.0,
1.0, 10.0, 10.0, 0.1, 0.1, u'DATE', u'TIME', 0)])
finally:
database.close()
def test_insert_folder_snapshot(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.inserFolderSnapshot(
'fakeuser0', '0.1', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('folder')],
[(u'fakeuser0', 0.1, u'DATE', u'TIME', 0)])
finally:
database.close()
def test_insert_computer_snapshot(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.insertComputerSnapshot(
'1', '0', '0', '100', '0', '/dev/sdx1', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('computer')],
[(1.0, 0.0, u'0', 100.0, u'0', u'/dev/sdx1', u'DATE', u'TIME', 0)])
finally:
database.close()
def test_insert_disk_partition_snapshot(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.insertDiskPartitionSnapshot(
'/dev/sdx1', '10', '20', '/mnt', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('disk')],
[(u'/dev/sdx1', u'10', u'20', u'/mnt', u'DATE', u'TIME', 0)])
finally:
database.close()
def test_insert_system_snapshot(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('system')],
[(0.1, 10.0, 100.0, 100.0, 10.0, 1.0,
2.0, 12.0, 1.0, 1.0, u'DATE', u'TIME', 0)])
finally:
database.close()
def test_date_scope(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', 'EXPECTED-DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.getDateScopeList()],
[('EXPECTED-DATE', 1)])
self.assertEquals([i for i in \
database.getDateScopeList(ignore_date='EXPECTED-DATE')],
[])
self.assertEquals([i for i in \
database.getDateScopeList(reported=1)],
[])
finally:
database.close()
def test_garbage_collection_date_list(self):
database = db.Database(self.instance_root)
self.assertEquals(len(database._getGarbageCollectionDateList()), 3)
self.assertEquals(len(database._getGarbageCollectionDateList(1)), 1)
self.assertEquals(len(database._getGarbageCollectionDateList(0)), 0)
self.assertEquals(database._getGarbageCollectionDateList(1),
[strftime("%Y-%m-%d")])
def test_garbage(self):
database = db.Database(self.instance_root)
database.connect()
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', '1983-01-10', 'TIME')
database.insertDiskPartitionSnapshot(
'/dev/sdx1', '10', '20', '/mnt', '1983-01-10', 'TIME')
database.insertComputerSnapshot(
'1', '0', '0', '100', '0', '/dev/sdx1', '1983-01-10', 'TIME')
database.commit()
database.markDayAsReported(date_scope="1983-01-10",
table_list=database.table_list)
database.commit()
self.assertEquals(len([x for x in database.select('system')]), 1)
self.assertEquals(len([x for x in database.select('computer')]), 1)
self.assertEquals(len([x for x in database.select('disk')]), 1)
database.close()
database.garbageCollect()
database.connect()
self.assertEquals(len([x for x in database.select('system')]), 0)
self.assertEquals(len([x for x in database.select('computer')]), 0)
self.assertEquals(len([x for x in database.select('disk')]), 0)
def test_mark_day_as_reported(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', 'EXPECTED-DATE', 'TIME')
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', 'NOT-EXPECTED-DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('system')],
[(0.1, 10.0, 100.0, 100.0, 10.0, 1.0,
2.0, 12.0, 1.0, 1.0, u'EXPECTED-DATE', u'TIME', 0),
(0.1, 10.0, 100.0, 100.0, 10.0, 1.0,
2.0, 12.0, 1.0, 1.0, u'NOT-EXPECTED-DATE', u'TIME', 0)])
database.markDayAsReported(date_scope="EXPECTED-DATE",
table_list=["system"])
database.commit()
self.assertEquals([i for i in database.select('system')],
[(0.1, 10.0, 100.0, 100.0, 10.0, 1.0,
2.0, 12.0, 1.0, 1.0, u'EXPECTED-DATE', u'TIME', 1),
(0.1, 10.0, 100.0, 100.0, 10.0, 1.0,
2.0, 12.0, 1.0, 1.0, u'NOT-EXPECTED-DATE', u'TIME', 0)])
finally:
database.close()
class TestCollectReport(unittest.TestCase):
def setUp(self):
self.instance_root = tempfile.mkdtemp()
def tearDown(self):
if os.path.exists(self.instance_root):
shutil.rmtree(self.instance_root)
def test_raw_csv_report(self):
database = db.Database(self.instance_root)
database.connect()
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', '1983-01-10', 'TIME')
database.insertDiskPartitionSnapshot(
'/dev/sdx1', '10', '20', '/mnt', '1983-01-10', 'TIME')
database.insertComputerSnapshot(
'1', '0', '0', '100', '0', '/dev/sdx1', '1983-01-10', 'TIME')
database.commit()
database.close()
reporter.RawCSVDumper(database).dump(self.instance_root)
self.assertTrue(os.path.exists("%s/1983-01-10" % self.instance_root))
csv_path_list = ['%s/1983-01-10/dump_disk.csv' % self.instance_root,
'%s/1983-01-10/dump_computer.csv' % self.instance_root,
'%s/1983-01-10/dump_user.csv' % self.instance_root,
'%s/1983-01-10/dump_folder.csv' % self.instance_root,
'%s/1983-01-10/dump_heating.csv' % self.instance_root,
'%s/1983-01-10/dump_temperature.csv' % self.instance_root,
'%s/1983-01-10/dump_system.csv' % self.instance_root]
self.assertEquals(set(glob.glob("%s/1983-01-10/*.csv" % self.instance_root)),
set(csv_path_list))
def test_system_csv_report(self):
database = db.Database(self.instance_root)
database.connect()
database.insertSystemSnapshot("0.1", '10.0', '100.0', '100.0',
'10.0', '1', '2', '12.0', '1', '1', strftime("%Y-%m-%d"), 'TIME')
database.insertDiskPartitionSnapshot(
'/dev/sdx1', '10', '20', '/mnt', strftime("%Y-%m-%d"), 'TIME')
database.insertComputerSnapshot(
'1', '0', '0', '100', '0', '/dev/sdx1', strftime("%Y-%m-%d"), 'TIME')
database.commit()
database.close()
reporter.SystemCSVReporterDumper(database).dump(self.instance_root)
csv_path_list = ['%s/system_memory_used.csv' % self.instance_root,
'%s/system_cpu_percent.csv' % self.instance_root,
'%s/system_net_out_bytes.csv' % self.instance_root,
'%s/system_net_in_bytes.csv' % self.instance_root,
'%s/system_disk_memory_free__dev_sdx1.csv' % self.instance_root,
'%s/system_net_out_errors.csv' % self.instance_root,
'%s/system_disk_memory_used__dev_sdx1.csv' % self.instance_root,
'%s/system_net_out_dropped.csv' % self.instance_root,
'%s/system_memory_free.csv' % self.instance_root,
'%s/system_net_in_errors.csv' % self.instance_root,
'%s/system_net_in_dropped.csv' % self.instance_root,
'%s/system_loadavg.csv' % self.instance_root]
self.assertEquals(set(glob.glob("%s/*.csv" % self.instance_root)), set(csv_path_list))
def test_compress_log_directory(self):
log_directory = "%s/test_compress" % self.instance_root
dump_folder = "%s/1990-01-01" % log_directory
if not os.path.exists(log_directory):
os.mkdir(log_directory)
if os.path.exists(dump_folder):
shutil.rmtree(dump_folder)
os.mkdir("%s/1990-01-01" % log_directory)
with open("%s/test.txt" % dump_folder, "w") as dump_file:
dump_file.write("hi")
dump_file.close()
reporter.compressLogFolder(log_directory)
self.assertFalse(os.path.exists(dump_folder))
self.assertTrue(os.path.exists("%s.tar.gz" % dump_folder))
with tarfile.open("%s.tar.gz" % dump_folder) as tf:
self.assertEquals(tf.getmembers()[0].name, "1990-01-01")
self.assertEquals(tf.getmembers()[1].name, "1990-01-01/test.txt")
self.assertEquals(tf.extractfile(tf.getmembers()[1]).read(), 'hi')
class TestCollectSnapshot(unittest.TestCase):
def setUp(self):
self.slap = slapos.slap.slap()
self.app = SlapOSApp()
self.temp_dir = tempfile.mkdtemp()
os.environ["HOME"] = self.temp_dir
self.instance_root = tempfile.mkdtemp()
self.software_root = tempfile.mkdtemp()
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def test_process_snapshot(self):
process = psutil.Process(os.getpid())
process_snapshot = snapshot.ProcessSnapshot(process)
self.assertNotEquals(process_snapshot.username, None)
self.assertEquals(int(process_snapshot.pid), os.getpid())
self.assertEquals(int(process_snapshot.process.split("-")[0]),
os.getpid())
self.assertNotEquals(process_snapshot.cpu_percent , None)
self.assertNotEquals(process_snapshot.cpu_time , None)
self.assertNotEquals(process_snapshot.cpu_num_threads, None)
self.assertNotEquals(process_snapshot.memory_percent , None)
self.assertNotEquals(process_snapshot.memory_rss, None)
self.assertNotEquals(process_snapshot.io_rw_counter, None)
self.assertNotEquals(process_snapshot.io_cycles_counter, None)
def test_folder_size_snapshot(self):
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root)
self.assertEqual(disk_snapshot.disk_usage, 0)
for i in range(0, 10):
folder = 'folder%s' % i
os.mkdir(os.path.join(self.instance_root, folder))
with open(os.path.join(self.instance_root, folder, 'toto'), 'w') as f:
f.write('toto text')
disk_snapshot.update_folder_size()
self.assertNotEquals(disk_snapshot.disk_usage, 0)
pid_file = os.path.join(self.instance_root, 'disk_snap.pid')
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file)
disk_snapshot.update_folder_size()
self.assertNotEquals(disk_snapshot.disk_usage, 0)
pid_file = os.path.join(self.instance_root, 'disk_snap.pid')
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file,
use_quota=True)
disk_snapshot.update_folder_size()
self.assertNotEquals(disk_snapshot.disk_usage, 0)
def test_process_snapshot_broken_process(self):
self.assertRaises(AssertionError,
snapshot.ProcessSnapshot, None)
def test_computer_snapshot(self):
computer_snapshot = snapshot.ComputerSnapshot()
self.assertNotEquals(computer_snapshot.cpu_num_core , None)
self.assertNotEquals(computer_snapshot.cpu_frequency , None)
self.assertNotEquals(computer_snapshot.cpu_type , None)
self.assertNotEquals(computer_snapshot.memory_size , None)
self.assertNotEquals(computer_snapshot.memory_type , None)
self.assertEquals(type(computer_snapshot.system_snapshot),
snapshot.SystemSnapshot)
self.assertNotEquals(computer_snapshot.disk_snapshot_list, [])
self.assertNotEquals(computer_snapshot.partition_list, [])
self.assertEquals(type(computer_snapshot.disk_snapshot_list[0]),
snapshot.DiskPartitionSnapshot)
def test_system_snapshot(self):
system_snapshot = snapshot.SystemSnapshot()
self.assertNotEquals(system_snapshot.memory_used , None)
self.assertNotEquals(system_snapshot.memory_free , None)
self.assertNotEquals(system_snapshot.memory_percent , None)
self.assertNotEquals(system_snapshot.cpu_percent , None)
self.assertNotEquals(system_snapshot.load , None)
self.assertNotEquals(system_snapshot.net_in_bytes , None)
self.assertNotEquals(system_snapshot.net_in_errors, None)
self.assertNotEquals(system_snapshot.net_in_dropped , None)
self.assertNotEquals(system_snapshot.net_out_bytes , None)
self.assertNotEquals(system_snapshot.net_out_errors, None)
self.assertNotEquals(system_snapshot.net_out_dropped , None)
class TestCollectEntity(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
os.environ["HOME"] = self.temp_dir
self.instance_root = tempfile.mkdtemp()
self.software_root = tempfile.mkdtemp()
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def tearDown(self):
pass
def getFakeUser(self, disk_snapshot_params={}):
os.mkdir("%s/fakeuser0" % self.instance_root)
return entity.User("fakeuser0",
"%s/fakeuser0" % self.instance_root, disk_snapshot_params )
def test_get_user_list(self):
config = ConfigParser()
config.add_section('slapformat')
config.set('slapformat', 'partition_amount', '3')
config.set('slapformat', 'user_base_name', 'slapuser')
config.set('slapformat', 'partition_base_name', 'slappart')
config.add_section('slapos')
config.set('slapos', 'instance_root', self.instance_root)
user_dict = entity.get_user_list(config)
username_list = ['slapuser0', 'slapuser1', 'slapuser2']
self.assertEquals(username_list, user_dict.keys())
for name in username_list:
self.assertEquals(user_dict[name].name, name)
self.assertEquals(user_dict[name].snapshot_list, [])
expected_path = "%s/slappart%s" % (self.instance_root, name.strip("slapuser"))
self.assertEquals(user_dict[name].path, expected_path)
def test_user_add_snapshot(self):
user = self.getFakeUser()
self.assertEquals(user.snapshot_list, [])
user.append("SNAPSHOT")
self.assertEquals(user.snapshot_list, ["SNAPSHOT"])
def test_user_save(self):
disk_snapshot_params = {'enable': False}
user = self.getFakeUser(disk_snapshot_params)
process = psutil.Process(os.getpid())
user.append(snapshot.ProcessSnapshot(process))
database = FakeDatabase()
user.save(database, "DATE", "TIME")
self.assertEquals(database.invoked_method_list[0], ("connect", ""))
self.assertEquals(database.invoked_method_list[1][0], "insertUserSnapshot")
self.assertEquals(database.invoked_method_list[1][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[1][1][1].keys(),
['cpu_time', 'cpu_percent', 'process',
'memory_rss', 'pid', 'memory_percent',
'io_rw_counter', 'insertion_date', 'insertion_time',
'io_cycles_counter', 'cpu_num_threads'])
self.assertEquals(database.invoked_method_list[2], ("commit", ""))
self.assertEquals(database.invoked_method_list[3], ("close", ""))
def test_user_save_disk_snapshot(self):
disk_snapshot_params = {'enable': True, 'testing': True}
user = self.getFakeUser(disk_snapshot_params)
process = psutil.Process(os.getpid())
user.append(snapshot.ProcessSnapshot(process))
database = FakeDatabase2()
user.save(database, "DATE", "TIME")
self.assertEquals(database.invoked_method_list[0], ("connect", ""))
self.assertEquals(database.invoked_method_list[1][0], "insertUserSnapshot")
self.assertEquals(database.invoked_method_list[1][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[1][1][1].keys(),
['cpu_time', 'cpu_percent', 'process',
'memory_rss', 'pid', 'memory_percent',
'io_rw_counter', 'insertion_date', 'insertion_time',
'io_cycles_counter', 'cpu_num_threads'])
self.assertEquals(database.invoked_method_list[2], ("commit", ""))
self.assertEquals(database.invoked_method_list[3], ("close", ""))
self.assertEquals(database.invoked_method_list[4], ("connect", ""))
self.assertEquals(database.invoked_method_list[5][0], "inserFolderSnapshot")
self.assertEquals(database.invoked_method_list[5][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[5][1][1].keys(),
['insertion_date', 'disk_usage', 'insertion_time'])
self.assertEquals(database.invoked_method_list[6], ("commit", ""))
self.assertEquals(database.invoked_method_list[7], ("close", ""))
def test_user_save_disk_snapshot_cycle(self):
disk_snapshot_params = {'enable': True, 'time_cycle': 3600, 'testing': True}
user = self.getFakeUser(disk_snapshot_params)
process = psutil.Process(os.getpid())
user.append(snapshot.ProcessSnapshot(process))
database = FakeDatabase2()
user.save(database, "DATE", "TIME")
self.assertEquals(database.invoked_method_list[0], ("connect", ""))
self.assertEquals(database.invoked_method_list[1][0], "insertUserSnapshot")
self.assertEquals(database.invoked_method_list[1][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[1][1][1].keys(),
['cpu_time', 'cpu_percent', 'process',
'memory_rss', 'pid', 'memory_percent',
'io_rw_counter', 'insertion_date', 'insertion_time',
'io_cycles_counter', 'cpu_num_threads'])
self.assertEquals(database.invoked_method_list[2], ("commit", ""))
self.assertEquals(database.invoked_method_list[3], ("close", ""))
self.assertEquals(database.invoked_method_list[4], ("connect", ""))
self.assertEquals(database.invoked_method_list[5][0], "select")
self.assertEquals(database.invoked_method_list[5][1][0], ())
self.assertEquals(database.invoked_method_list[5][1][1].keys(),
['table', 'where', 'limit', 'order', 'columns'])
self.assertEquals(database.invoked_method_list[6][0], "inserFolderSnapshot")
self.assertEquals(database.invoked_method_list[6][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[6][1][1].keys(),
['insertion_date', 'disk_usage', 'insertion_time'])
self.assertEquals(database.invoked_method_list[7], ("commit", ""))
self.assertEquals(database.invoked_method_list[8], ("close", ""))
def test_computer_entity(self):
computer = entity.Computer(snapshot.ComputerSnapshot())
database = FakeDatabase()
computer.save(database, "DATE", "TIME")
self.assertEquals(database.invoked_method_list[0], ("connect", ""))
self.assertEquals(database.invoked_method_list[1][0], "insertComputerSnapshot")
self.assertEquals(database.invoked_method_list[1][1][0], ())
self.assertEquals(database.invoked_method_list[1][1][1].keys(),
['insertion_time', 'insertion_date', 'cpu_num_core',
'partition_list', 'cpu_frequency', 'memory_size',
'cpu_type', 'memory_type'])
self.assertEquals(database.invoked_method_list[2][0], "insertSystemSnapshot")
self.assertEquals(database.invoked_method_list[2][1][0], ())
self.assertEquals(set(database.invoked_method_list[2][1][1].keys()),
set([ 'memory_used', 'cpu_percent', 'insertion_date', 'insertion_time',
'loadavg', 'memory_free', 'net_in_bytes', 'net_in_dropped',
'net_in_errors', 'net_out_bytes', 'net_out_dropped',
'net_out_errors']))
self.assertEquals(database.invoked_method_list[3][0], "insertDiskPartitionSnapshot")
self.assertEquals(database.invoked_method_list[3][1][0], ())
self.assertEquals(set(database.invoked_method_list[3][1][1].keys()),
set([ 'used', 'insertion_date', 'partition', 'free',
'mountpoint', 'insertion_time' ]))
self.assertEquals(database.invoked_method_list[-2], ("commit", ""))
self.assertEquals(database.invoked_method_list[-1], ("close", ""))
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/configure_local.py 0000664 0000000 0000000 00000012763 13232036066 0031661 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2014 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import unittest
import shutil
import tempfile
import slapos.slap
import slapos.cli.configure_local
from slapos.cli.configure_local import ConfigureLocalCommand, _createConfigurationDirectory
from slapos.cli.entry import SlapOSApp
from argparse import Namespace
from ConfigParser import ConfigParser
# Disable any command to launch slapformat and supervisor
slapos.cli.configure_local._runFormat = lambda x: "Do nothing"
slapos.cli.configure_local.launchSupervisord = lambda instance_root, logger: "Do nothing"
class TestConfigureLocal(unittest.TestCase):
def setUp(self):
self.slap = slapos.slap.slap()
self.app = SlapOSApp()
self.temp_dir = tempfile.mkdtemp()
os.environ["HOME"] = self.temp_dir
self.instance_root = tempfile.mkdtemp()
self.software_root = tempfile.mkdtemp()
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def tearDown(self):
for temp_path in (self.temp_dir, \
self.instance_root, self.software_root):
if os.path.exists(temp_path):
shutil.rmtree(temp_path)
def test_configure_local_environment_with_default_value(self):
config = ConfigureLocalCommand(self.app, Namespace())
config.__dict__.update({i.dest: i.default \
for i in config.get_parser(None)._option_string_actions.values()})
config.slapos_configuration_directory = self.temp_dir
config.slapos_buildout_directory = self.temp_dir
config.slapos_instance_root = self.instance_root
slapos.cli.configure_local.do_configure(
config, config.fetch_config, self.app.log)
expected_software_root = "/opt/slapgrid"
self.assertTrue(
os.path.exists("%s/.slapos/slapos-client.cfg" % self.temp_dir))
with open(self.temp_dir + '/slapos-proxy.cfg') as fout:
proxy_config = ConfigParser()
proxy_config.readfp(fout)
self.assertEquals(proxy_config.get('slapos', 'instance_root'),
self.instance_root)
self.assertEquals(proxy_config.get('slapos', 'software_root'),
expected_software_root)
with open(self.temp_dir + '/slapos.cfg') as fout:
proxy_config = ConfigParser()
proxy_config.readfp(fout)
self.assertEquals(proxy_config.get('slapos', 'instance_root'),
self.instance_root)
self.assertEquals(proxy_config.get('slapos', 'software_root'),
expected_software_root)
def test_configure_local_environment(self):
config = ConfigureLocalCommand(self.app, Namespace())
config.__dict__.update({i.dest: i.default \
for i in config.get_parser(None)._option_string_actions.values()})
config.slapos_configuration_directory = self.temp_dir
config.slapos_buildout_directory = self.temp_dir
config.slapos_instance_root = self.instance_root
config.slapos_software_root = self.software_root
slapos.cli.configure_local.do_configure(
config, config.fetch_config, self.app.log)
log_folder = os.path.join(config.slapos_buildout_directory, 'log')
self.assertTrue(os.path.exists(log_folder), "%s not exists" % log_folder)
self.assertTrue(
os.path.exists("%s/.slapos/slapos-client.cfg" % self.temp_dir))
with open(self.temp_dir + '/slapos-proxy.cfg') as fout:
proxy_config = ConfigParser()
proxy_config.readfp(fout)
self.assertEquals(proxy_config.get('slapos', 'instance_root'),
self.instance_root)
self.assertEquals(proxy_config.get('slapos', 'software_root'),
self.software_root)
with open(self.temp_dir + '/slapos.cfg') as fout:
proxy_config = ConfigParser()
proxy_config.readfp(fout)
self.assertEquals(proxy_config.get('slapos', 'instance_root'),
self.instance_root)
self.assertEquals(proxy_config.get('slapos', 'software_root'),
self.software_root)
log_file = proxy_config.get('slapformat', 'log_file')
self.assertTrue(log_file.startswith(log_folder),
"%s don't starts with %s" % (log_file, log_folder))
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/distribution.py 0000664 0000000 0000000 00000006536 13232036066 0031246 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from slapos.grid import distribution
class TestDebianize(unittest.TestCase):
def test_debian_major(self):
"""
On debian, we only care about major release.
All the other tuples are unchanged.
"""
for provided, expected in [
(('CentOS', '6.3', 'Final'), None),
(('Ubuntu', '12.04', 'precise'), None),
(('Ubuntu', '13.04', 'raring'), None),
(('Fedora', '17', 'Beefy Miracle'), None),
(('debian', '6.0.6', ''), ('debian', '6', '')),
(('debian', '7.0', ''), ('debian', '7', '')),
]:
self.assertEqual(distribution._debianize(provided), expected or provided)
class TestOSMatches(unittest.TestCase):
def test_centos(self):
self.assertFalse(distribution.os_matches(('CentOS', '6.3', 'Final'),
('Ubuntu', '13.04', 'raring')))
self.assertFalse(distribution.os_matches(('CentOS', '6.3', 'Final'),
('debian', '6.3', '')))
def test_ubuntu(self):
self.assertFalse(distribution.os_matches(('Ubuntu', '12.04', 'precise'),
('Ubuntu', '13.04', 'raring')))
self.assertTrue(distribution.os_matches(('Ubuntu', '13.04', 'raring'),
('Ubuntu', '13.04', 'raring')))
self.assertTrue(distribution.os_matches(('Ubuntu', '12.04', 'precise'),
('Ubuntu', '12.04', 'precise')))
def test_debian(self):
self.assertFalse(distribution.os_matches(('debian', '6.0.6', ''),
('debian', '7.0', '')))
self.assertTrue(distribution.os_matches(('debian', '6.0.6', ''),
('debian', '6.0.5', '')))
self.assertTrue(distribution.os_matches(('debian', '6.0.6', ''),
('debian', '6.1', '')))
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/grid_utils.py 0000664 0000000 0000000 00000012014 13232036066 0030660 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import slapos.grid.utils
import tempfile
import unittest
class TestGridUtils(unittest.TestCase):
def test_check_promise_success(self):
"""
check that checkPromiseList works well
"""
script = """#!/bin/sh
echo "%(name)s"
exit %(code)s
"""
root_directory = tempfile.mkdtemp()
promise_1 = "first_promise"
promise_2 = "second_promise"
promise_timeout = 10
with open(os.path.join(root_directory, promise_1), 'w') as f:
f.write(script % {'name': promise_1, 'code': 0})
with open(os.path.join(root_directory, promise_2), 'w') as f:
f.write(script % {'name': promise_2, 'code': 0})
for file in os.listdir(root_directory):
if 'promise' in file:
os.chmod(os.path.join(root_directory, file), 0755)
result_list = []
try:
result_list = slapos.grid.utils.checkPromiseList(
root_directory,
promise_timeout,
profile=True,
raise_on_failure=True,
logger=None)
except slapos.grid.utils.PromiseError:
self.fail("Unexpected raise of PromiseError in 'checkPromiseList()'")
for result in result_list:
self.assertEquals(result['returncode'], 0)
self.assertTrue(result['message'].strip() in [promise_1, promise_2])
def test_check_promise_failure(self):
"""
check that checkPromiseList works well
"""
script = """#!/bin/sh
echo "%(name)s"
exit %(code)s
"""
root_directory = tempfile.mkdtemp()
promise_1 = "first_promise"
promise_2 = "second_promise_fail"
promise_timeout = 10
with open(os.path.join(root_directory, promise_1), 'w') as f:
f.write(script % {'name': promise_1, 'code': 0})
with open(os.path.join(root_directory, promise_2), 'w') as f:
f.write(script % {'name': promise_2, 'code': 1})
for file in os.listdir(root_directory):
if 'promise' in file:
os.chmod(os.path.join(root_directory, file), 0755)
with self.assertRaises(slapos.grid.utils.PromiseError):
slapos.grid.utils.checkPromiseList(
root_directory,
promise_timeout,
profile=True,
raise_on_failure=True,
logger=None)
def test_check_promise_no_raise(self):
"""
check that checkPromiseList works well
"""
script = """#!/bin/sh
echo "%(name)s"
exit %(code)s
"""
root_directory = tempfile.mkdtemp()
promise_1 = "first_promise"
promise_2 = "second_promise"
promise_3 = "third_promise"
promise_4 = "fourth_promise_fail"
promise_timeout = 10
with open(os.path.join(root_directory, promise_1), 'w') as f:
f.write(script % {'name': promise_1, 'code': 0})
with open(os.path.join(root_directory, promise_2), 'w') as f:
f.write(script % {'name': promise_2, 'code': 0})
with open(os.path.join(root_directory, promise_3), 'w') as f:
f.write(script % {'name': promise_3, 'code': 0})
with open(os.path.join(root_directory, promise_4), 'w') as f:
f.write(script % {'name': promise_4, 'code': 1})
for file in os.listdir(root_directory):
if 'promise' in file:
os.chmod(os.path.join(root_directory, file), 0755)
result_list = []
try:
result_list = slapos.grid.utils.checkPromiseList(
root_directory,
promise_timeout,
profile=True,
raise_on_failure=False,
logger=None)
except slapos.grid.utils.PromiseError:
self.fail("Unexpected raise of PromiseError in 'checkPromiseList()'")
for result in result_list:
self.assertTrue(result['message'].strip() in [promise_1, promise_2, promise_3, promise_4])
if result['title'] == promise_4:
self.assertEquals(result['returncode'], 1)
else:
self.assertEquals(result['returncode'], 0)
if __name__ == '__main__':
unittest.main()
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/interface.py 0000664 0000000 0000000 00000010060 13232036066 0030452 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from zope.interface.verify import verifyClass
import zope.interface
import types
from slapos import slap
def getOnlyImplementationAssertionMethod(klass, method_list):
"""Returns method which verifies if a klass only implements its interfaces"""
def testMethod(self):
implemented_method_list = [x for x in dir(klass) \
if ((not x.startswith('_')) and callable(getattr(klass, x)))]
for interface_method in method_list:
if interface_method in implemented_method_list:
implemented_method_list.remove(interface_method)
if implemented_method_list:
raise AssertionError("Unexpected methods %s" % implemented_method_list)
return testMethod
def getImplementationAssertionMethod(klass, interface):
"""Returns method which verifies if interface is properly implemented by klass"""
def testMethod(self):
verifyClass(interface, klass)
return testMethod
def getDeclarationAssertionMethod(klass):
"""Returns method which verifies if klass is declaring interface"""
def testMethod(self):
if len(list(zope.interface.implementedBy(klass))) == 0:
self.fail('%s class does not respect its interface(s).' % klass.__name__)
return testMethod
def generateTestMethodListOnClass(klass, module):
"""Generate test method on klass"""
for class_id in dir(module):
implementing_class = getattr(module, class_id)
if type(implementing_class) not in (types.ClassType, types.TypeType):
continue
# add methods to assert that publicly available classes are defining
# interfaces
method_name = 'test_%s_declares_interface' % (class_id,)
setattr(klass, method_name, getDeclarationAssertionMethod(
implementing_class))
implemented_method_list = []
for interface in list(zope.interface.implementedBy(implementing_class)):
# for each interface which class declares add a method which verify
# implementation
method_name = 'test_%s_implements_%s' % (class_id,
interface.__identifier__)
setattr(klass, method_name, getImplementationAssertionMethod(
implementing_class, interface))
for interface_klass in interface.__iro__:
implemented_method_list.extend(interface_klass.names())
# for each interface which class declares, check that no other method are
# available
method_name = 'test_%s_only_implements' % class_id
setattr(klass, method_name, getOnlyImplementationAssertionMethod(
implementing_class,
implemented_method_list))
class TestInterface(unittest.TestCase):
"""Tests all publicly available classes of slap
Classes are checked *if* they implement interface and if the implementation
is correct.
"""
# add methods to test class
generateTestMethodListOnClass(TestInterface, slap)
if __name__ == '__main__':
unittest.main()
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/pyflakes/ 0000775 0000000 0000000 00000000000 13232036066 0027761 5 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/pyflakes/__init__.py 0000664 0000000 0000000 00000004242 13232036066 0032074 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import pkg_resources
import pyflakes.scripts.pyflakes
import sys
import unittest
class CheckCodeConsistency(unittest.TestCase):
"""Lints all SlapOS Node and SLAP library code base."""
def setUp(self):
self._original_argv = sys.argv
sys.argv = [sys.argv[0],
os.path.join(
pkg_resources.get_distribution('slapos.core').location,
'slapos',
)
]
def tearDown(self):
sys.argv = self._original_argv
@unittest.skip('pyflakes test is disabled')
def testCodeConsistency(self):
if pyflakes.scripts.pyflakes.main.func_code.co_argcount:
pyflakes.scripts.pyflakes.main([
os.path.join(
pkg_resources.get_distribution('slapos.core').location,
'slapos',
)])
else:
pyflakes.scripts.pyflakes.main()
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/register.py 0000664 0000000 0000000 00000004145 13232036066 0030345 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
import slapos.cli.register
class TestRegister(unittest.TestCase):
""" Tests for slapos.cli.register
XXX There is a lack of tests for register, so include more.
"""
def test_fetch_configuration(self):
""" Simple test to Fetch the configuration template
"""
template = slapos.cli.register.fetch_configuration_template()
self.assertNotEquals("", template)
for entry in ['computer_id',
'master_url',
'key_file',
'cert_file',
'certificate_repository_path',
'interface_name',
'ipv4_local_network',
'partition_amount',
'create_tap']:
self.assertTrue(entry in template, "%s is not in template (%s)" % (entry, template))
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slap.py 0000664 0000000 0000000 00000132776 13232036066 0027474 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import logging
import os
import unittest
import urlparse
import tempfile
import httmock
import slapos.slap
import xml_marshaller
class UndefinedYetException(Exception):
"""To catch exceptions which are not yet defined"""
class SlapMixin(unittest.TestCase):
"""
Useful methods for slap tests
"""
def setUp(self):
self._server_url = os.environ.get('TEST_SLAP_SERVER_URL', None)
if self._server_url is None:
self.server_url = 'http://localhost/'
else:
self.server_url = self._server_url
print 'Testing against SLAP server %r' % self.server_url
self.slap = slapos.slap.slap()
self.partition_id = 'PARTITION_01'
if os.environ.has_key('SLAPGRID_INSTANCE_ROOT'):
del os.environ['SLAPGRID_INSTANCE_ROOT']
def tearDown(self):
pass
def _getTestComputerId(self):
"""
Returns the computer id used by the test
"""
return self.id()
class TestSlap(SlapMixin):
"""
Test slap against slap server
"""
def test_slap_initialisation(self):
"""
Asserts that slap initialisation works properly in case of
passing correct url
"""
slap_instance = slapos.slap.slap()
slap_instance.initializeConnection(self.server_url)
self.assertEquals(slap_instance._connection_helper.slapgrid_uri, self.server_url)
def test_slap_initialisation_ipv6_and_port(self):
slap_instance = slapos.slap.slap()
slap_instance.initializeConnection("http://1234:1234:1234:1234:1:1:1:1:5000/foo/")
self.assertEqual(
slap_instance._connection_helper.slapgrid_uri,
"http://[1234:1234:1234:1234:1:1:1:1]:5000/foo/"
)
def test_slap_initialisation_ipv6_without_port(self):
slap_instance = slapos.slap.slap()
slap_instance.initializeConnection("http://1234:1234:1234:1234:1:1:1:1/foo/")
self.assertEqual(
slap_instance._connection_helper.slapgrid_uri,
"http://[1234:1234:1234:1234:1:1:1:1]/foo/"
)
def test_slap_initialisation_ipv6_with_bracket(self):
slap_instance = slapos.slap.slap()
slap_instance.initializeConnection("http://[1234:1234:1234:1234:1:1:1:1]:5000/foo/")
self.assertEqual(
slap_instance._connection_helper.slapgrid_uri,
"http://[1234:1234:1234:1234:1:1:1:1]:5000/foo/"
)
def test_slap_initialisation_ipv4(self):
slap_instance = slapos.slap.slap()
slap_instance.initializeConnection("http://127.0.0.1:5000/foo/")
self.assertEqual(
slap_instance._connection_helper.slapgrid_uri,
"http://127.0.0.1:5000/foo/"
)
def test_slap_initialisation_hostname(self):
# XXX this really opens a connection !
slap_instance = slapos.slap.slap()
slap_instance.initializeConnection("http://foo.com:5000/foo/")
self.assertEqual(
slap_instance._connection_helper.slapgrid_uri,
"http://foo.com:5000/foo/"
)
def test_registerComputer_with_new_guid(self):
"""
Asserts that calling slap.registerComputer with new guid returns
Computer object
"""
computer_guid = self._getTestComputerId()
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
computer = self.slap.registerComputer(computer_guid)
self.assertIsInstance(computer, slapos.slap.Computer)
def test_registerComputer_with_existing_guid(self):
"""
Asserts that calling slap.registerComputer with already used guid
returns Computer object
"""
computer_guid = self._getTestComputerId()
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
computer = self.slap.registerComputer(computer_guid)
self.assertIsInstance(computer, slapos.slap.Computer)
computer2 = self.slap.registerComputer(computer_guid)
self.assertIsInstance(computer2, slapos.slap.Computer)
# XXX: There is naming conflict in slap library.
# SoftwareRelease is currently used as suboject of Slap transmission object
def test_registerSoftwareRelease_with_new_uri(self):
"""
Asserts that calling slap.registerSoftwareRelease with new guid
returns SoftwareRelease object
"""
software_release_uri = 'http://server/' + self._getTestComputerId()
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
software_release = self.slap.registerSoftwareRelease(software_release_uri)
self.assertIsInstance(software_release, slapos.slap.SoftwareRelease)
def test_registerSoftwareRelease_with_existing_uri(self):
"""
Asserts that calling slap.registerSoftwareRelease with already
used guid returns SoftwareRelease object
"""
software_release_uri = 'http://server/' + self._getTestComputerId()
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
software_release = self.slap.registerSoftwareRelease(software_release_uri)
self.assertIsInstance(software_release, slapos.slap.SoftwareRelease)
software_release2 = self.slap.registerSoftwareRelease(software_release_uri)
self.assertIsInstance(software_release2, slapos.slap.SoftwareRelease)
def test_registerComputerPartition_new_partition_id_known_computer_guid(self):
"""
Asserts that calling slap.registerComputerPartition on known computer
returns ComputerPartition object
"""
computer_guid = self._getTestComputerId()
partition_id = self.partition_id
self.slap.initializeConnection(self.server_url)
self.slap.registerComputer(computer_guid)
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/registerComputerPartition'
and qs == {
'computer_reference': [computer_guid],
'computer_partition_reference': [partition_id]
}):
partition = slapos.slap.ComputerPartition(computer_guid, partition_id)
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(partition)
}
else:
return {'status_code': 400}
self._handler = handler
with httmock.HTTMock(handler):
partition = self.slap.registerComputerPartition(computer_guid, partition_id)
self.assertIsInstance(partition, slapos.slap.ComputerPartition)
def test_registerComputerPartition_existing_partition_id_known_computer_guid(self):
"""
Asserts that calling slap.registerComputerPartition on known computer
returns ComputerPartition object
"""
self.test_registerComputerPartition_new_partition_id_known_computer_guid()
with httmock.HTTMock(self._handler):
partition = self.slap.registerComputerPartition(self._getTestComputerId(),
self.partition_id)
self.assertIsInstance(partition, slapos.slap.ComputerPartition)
def test_registerComputerPartition_unknown_computer_guid(self):
"""
Asserts that calling slap.registerComputerPartition on unknown
computer raises NotFoundError exception
"""
computer_guid = self._getTestComputerId()
self.slap.initializeConnection(self.server_url)
partition_id = 'PARTITION_01'
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/registerComputerPartition'
and qs == {
'computer_reference': [computer_guid],
'computer_partition_reference': [partition_id]
}):
return {'status_code': 404}
else:
return {'status_code': 0}
with httmock.HTTMock(handler):
self.assertRaises(slapos.slap.NotFoundError,
self.slap.registerComputerPartition,
computer_guid, partition_id)
def test_getFullComputerInformation_empty_computer_guid(self):
"""
Asserts that calling getFullComputerInformation with empty computer_id
raises early, before calling master.
"""
self.slap.initializeConnection(self.server_url)
def handler(url, req):
# Shouldn't even be called
self.assertFalse(True)
with httmock.HTTMock(handler):
self.assertRaises(slapos.slap.NotFoundError,
self.slap._connection_helper.getFullComputerInformation,
None)
def test_registerComputerPartition_empty_computer_guid(self):
"""
Asserts that calling registerComputerPartition with empty computer_id
raises early, before calling master.
"""
self.slap.initializeConnection(self.server_url)
def handler(url, req):
# Shouldn't even be called
self.assertFalse(True)
with httmock.HTTMock(handler):
self.assertRaises(slapos.slap.NotFoundError,
self.slap.registerComputerPartition,
None, 'PARTITION_01')
def test_registerComputerPartition_empty_computer_partition_id(self):
"""
Asserts that calling registerComputerPartition with empty
computer_partition_id raises early, before calling master.
"""
self.slap.initializeConnection(self.server_url)
def handler(url, req):
# Shouldn't even be called
self.assertFalse(True)
with httmock.HTTMock(handler):
self.assertRaises(slapos.slap.NotFoundError,
self.slap.registerComputerPartition,
self._getTestComputerId(), None)
def test_registerComputerPartition_empty_computer_guid_empty_computer_partition_id(self):
"""
Asserts that calling registerComputerPartition with empty
computer_partition_id raises early, before calling master.
"""
self.slap.initializeConnection(self.server_url)
def handler(url, req):
# Shouldn't even be called
self.assertFalse(True)
with httmock.HTTMock(handler):
self.assertRaises(slapos.slap.NotFoundError,
self.slap.registerComputerPartition,
None, None)
def test_getSoftwareReleaseListFromSoftwareProduct_software_product_reference(self):
"""
Check that slap.getSoftwareReleaseListFromSoftwareProduct calls
"/getSoftwareReleaseListFromSoftwareProduct" URL with correct parameters,
with software_product_reference parameter being specified.
"""
self.slap.initializeConnection(self.server_url)
software_product_reference = 'random_reference'
software_release_url_list = ['1', '2']
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/getSoftwareReleaseListFromSoftwareProduct'
and qs == {'software_product_reference': [software_product_reference]}):
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(software_release_url_list)
}
with httmock.HTTMock(handler):
self.assertEqual(
self.slap.getSoftwareReleaseListFromSoftwareProduct(
software_product_reference=software_product_reference),
software_release_url_list
)
def test_getSoftwareReleaseListFromSoftwareProduct_software_release_url(self):
"""
Check that slap.getSoftwareReleaseListFromSoftwareProduct calls
"/getSoftwareReleaseListFromSoftwareProduct" URL with correct parameters,
with software_release_url parameter being specified.
"""
self.slap.initializeConnection(self.server_url)
software_release_url = 'random_url'
software_release_url_list = ['1', '2']
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/getSoftwareReleaseListFromSoftwareProduct'
and qs == {'software_release_url': [software_release_url]}):
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(software_release_url_list)
}
with httmock.HTTMock(handler):
self.assertEqual(
self.slap.getSoftwareReleaseListFromSoftwareProduct(
software_release_url=software_release_url),
software_release_url_list
)
def test_getSoftwareReleaseListFromSoftwareProduct_too_many_parameters(self):
"""
Check that slap.getSoftwareReleaseListFromSoftwareProduct raises if
both parameters are set.
"""
self.assertRaises(
AttributeError,
self.slap.getSoftwareReleaseListFromSoftwareProduct, 'foo', 'bar'
)
def test_getSoftwareReleaseListFromSoftwareProduct_no_parameter(self):
"""
Check that slap.getSoftwareReleaseListFromSoftwareProduct raises if
both parameters are either not set or None.
"""
self.assertRaises(
AttributeError,
self.slap.getSoftwareReleaseListFromSoftwareProduct
)
def test_initializeConnection_getHateoasUrl(self):
"""
Test that by default, slap will try to fetch Hateoas URL from XML/RPC URL.
"""
hateoas_url = 'foo'
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'):
return {
'status_code': 200,
'content': hateoas_url
}
with httmock.HTTMock(handler):
self.slap.initializeConnection('http://bar')
self.assertEqual(
self.slap._hateoas_navigator.slapos_master_hateoas_uri,
hateoas_url
)
def test_initializeConnection_specifiedHateoasUrl(self):
"""
Test that if rest URL is specified, slap will NOT try to fetch
Hateoas URL from XML/RPC URL.
"""
hateoas_url = 'foo'
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'):
self.fail('slap should not have contacted master to get Hateoas URL.')
with httmock.HTTMock(handler):
self.slap.initializeConnection('http://bar', slapgrid_rest_uri=hateoas_url)
self.assertEqual(
self.slap._hateoas_navigator.slapos_master_hateoas_uri,
hateoas_url
)
def test_initializeConnection_noHateoasUrl(self):
"""
Test that if no rest URL is specified and master does not know about rest,
it still work.
"""
hateoas_url = 'foo'
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'):
return {
'status_code': 404,
}
with httmock.HTTMock(handler):
self.slap.initializeConnection('http://bar')
self.assertEqual(None, getattr(self.slap, '_hateoas_navigator', None))
class TestComputer(SlapMixin):
"""
Tests slapos.slap.slap.Computer class functionality
"""
def test_computer_getComputerPartitionList_no_partition(self):
"""
Asserts that calling Computer.getComputerPartitionList without Computer
Partitions returns empty list
"""
computer_guid = self._getTestComputerId()
slap = self.slap
slap.initializeConnection(self.server_url)
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/registerComputerPartition'
and 'computer_reference' in qs
and 'computer_partition_reference' in qs):
slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0],
qs['computer_partition_reference'][0])
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(slap_partition)
}
elif (url.path == '/getFullComputerInformation'
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = []
slap_computer._computer_partition_list = []
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(slap_computer)
}
elif url.path == '/requestComputerPartition':
return {'status_code': 408}
else:
return {'status_code': 404}
with httmock.HTTMock(handler):
computer = self.slap.registerComputer(computer_guid)
self.assertEqual(computer.getComputerPartitionList(), [])
def _test_computer_empty_computer_guid(self, computer_method):
"""
Helper method checking if calling Computer method with empty id raises
early.
"""
self.slap.initializeConnection(self.server_url)
def handler(url, req):
# Shouldn't even be called
self.assertFalse(True)
with httmock.HTTMock(handler):
computer = self.slap.registerComputer(None)
self.assertRaises(slapos.slap.NotFoundError,
getattr(computer, computer_method))
def test_computer_getComputerPartitionList_empty_computer_guid(self):
"""
Asserts that calling getComputerPartitionList with empty
computer_guid raises early, before calling master.
"""
self._test_computer_empty_computer_guid('getComputerPartitionList')
def test_computer_getSoftwareReleaseList_empty_computer_guid(self):
"""
Asserts that calling getSoftwareReleaseList with empty
computer_guid raises early, before calling master.
"""
self._test_computer_empty_computer_guid('getSoftwareReleaseList')
def test_computer_getComputerPartitionList_only_partition(self):
"""
Asserts that calling Computer.getComputerPartitionList with only
Computer Partitions returns empty list
"""
self.computer_guid = self._getTestComputerId()
partition_id = 'PARTITION_01'
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
def handler(url, req):
qs = urlparse.parse_qs(url.query)
if (url.path == '/registerComputerPartition'
and qs == {
'computer_reference': [self.computer_guid],
'computer_partition_reference': [partition_id]
}):
partition = slapos.slap.ComputerPartition(self.computer_guid, partition_id)
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(partition)
}
elif (url.path == '/getFullComputerInformation'
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._computer_partition_list = []
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(slap_computer)
}
else:
return {'status_code': 400}
with httmock.HTTMock(handler):
self.computer = self.slap.registerComputer(self.computer_guid)
self.partition = self.slap.registerComputerPartition(self.computer_guid,
partition_id)
self.assertEqual(self.computer.getComputerPartitionList(), [])
@unittest.skip("Not implemented")
def test_computer_reportUsage_non_valid_xml_raises(self):
"""
Asserts that calling Computer.reportUsage with non DTD
(not defined yet) XML raises (not defined yet) exception
"""
self.computer_guid = self._getTestComputerId()
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
self.computer = self.slap.registerComputer(self.computer_guid)
non_dtd_xml = """
value
"""
self.assertRaises(UndefinedYetException,
self.computer.reportUsage,
non_dtd_xml)
@unittest.skip("Not implemented")
def test_computer_reportUsage_valid_xml_invalid_partition_raises(self):
"""
Asserts that calling Computer.reportUsage with DTD (not defined
yet) XML which refers to invalid partition raises (not defined yet)
exception
"""
self.computer_guid = self._getTestComputerId()
partition_id = 'PARTITION_01'
self.slap = slapos.slap.slap()
self.slap.initializeConnection(self.server_url)
self.computer = self.slap.registerComputer(self.computer_guid)
self.partition = self.slap.registerComputerPartition(self.computer_guid,
partition_id)
# XXX: As DTD is not defined currently proper XML is not known
bad_partition_dtd_xml = """
URL_CONNECTION_PARAMETER
""",
slap_computer_id=computer_guid,
slap_computer_partition_id=requested_partition_id)
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(slap_partition)
}
with httmock.HTTMock(handler):
computer_partition = open_order.request(software_release_uri, 'myrefe')
self.assertIsInstance(computer_partition, slapos.slap.ComputerPartition)
self.assertEqual(requested_partition_id, computer_partition.getId())
self.assertEqual("URL_CONNECTION_PARAMETER",
computer_partition.getConnectionParameter('url'))
class TestSoftwareProductCollection(SlapMixin):
def setUp(self):
SlapMixin.setUp(self)
self.real_getSoftwareReleaseListFromSoftwareProduct =\
slapos.slap.slap.getSoftwareReleaseListFromSoftwareProduct
def fake_getSoftwareReleaseListFromSoftwareProduct(inside_self, software_product_reference):
return self.getSoftwareReleaseListFromSoftwareProduct_response
slapos.slap.slap.getSoftwareReleaseListFromSoftwareProduct =\
fake_getSoftwareReleaseListFromSoftwareProduct
self.product_collection = slapos.slap.SoftwareProductCollection(
logging.getLogger(), slapos.slap.slap())
def tearDown(self):
slapos.slap.slap.getSoftwareReleaseListFromSoftwareProduct =\
self.real_getSoftwareReleaseListFromSoftwareProduct
def test_get_product(self):
"""
Test that the get method (aliased to __getattr__) returns the first element
of the list given by getSoftwareReleaseListFromSoftwareProduct (i.e the
best one).
"""
self.getSoftwareReleaseListFromSoftwareProduct_response = ['0', '1', '2']
self.assertEqual(
self.product_collection.get('random_reference'),
self.getSoftwareReleaseListFromSoftwareProduct_response[0]
)
def test_get_product_empty_product(self):
"""
Test that the get method (aliased to __getattr__) raises if no
Software Release is related to the Software Product, or if the
Software Product does not exist.
"""
self.getSoftwareReleaseListFromSoftwareProduct_response = []
self.assertRaises(
AttributeError,
self.product_collection.get, 'random_reference',
)
def test_get_product_getattr(self):
"""
Test that __getattr__ method is bound to get() method.
"""
self.getSoftwareReleaseListFromSoftwareProduct_response = ['0']
self.product_collection.foo
self.assertEqual(
self.product_collection.__getattr__,
self.product_collection.get
)
self.assertEqual(self.product_collection.foo, '0')
if __name__ == '__main__':
print 'You can point to any SLAP server by setting TEST_SLAP_SERVER_URL '\
'environment variable'
unittest.main()
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapformat.py 0000664 0000000 0000000 00000073673 13232036066 0030705 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import glob
import logging
import slapos.format
import slapos.util
import slapos.manager.cpuset
import unittest
import netaddr
import shutil
import socket
# for mocking
import grp
import netifaces
import os
import pwd
import time
import mock
USER_LIST = []
GROUP_LIST = []
INTERFACE_DICT = {}
def file_content(file_path):
"""Read file(s) content."""
if isinstance(file_path, (list, tuple)):
return [file_content(fx) for fx in file_path]
with open(file_path, "rt") as fi:
return fi.read().strip()
def file_write(stuff, file_path):
"""Write stuff into file_path."""
with open(file_path, "wt") as fo:
fo.write(stuff)
class FakeConfig:
pass
class TestLoggerHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.bucket = []
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.bucket.append(record.msg)
class FakeCallAndRead:
def __init__(self):
self.external_command_list = []
def __call__(self, argument_list, raise_on_error=True):
retval = 0, 'UP'
global INTERFACE_DICT
if 'useradd' in argument_list:
print argument_list
global USER_LIST
username = argument_list[-1]
if username == '-r':
username = argument_list[-2]
USER_LIST.append(username)
elif 'groupadd' in argument_list:
global GROUP_LIST
GROUP_LIST.append(argument_list[-1])
elif argument_list[:3] == ['ip', 'addr', 'add']:
ip, interface = argument_list[3], argument_list[5]
if ':' not in ip:
netmask = netaddr.strategy.ipv4.int_to_str(
netaddr.strategy.ipv4.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][socket.AF_INET].append({'addr': ip, 'netmask': netmask})
else:
netmask = netaddr.strategy.ipv6.int_to_str(
netaddr.strategy.ipv6.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][socket.AF_INET6].append({'addr': ip, 'netmask': netmask})
# stabilise by mangling ip to just ip string
argument_list[3] = 'ip/%s' % netmask
elif argument_list[:3] == ['ip', 'addr', 'list'] or \
argument_list[:4] == ['ip', '-6', 'addr', 'list']:
retval = 0, str(INTERFACE_DICT)
elif argument_list[:3] == ['ip', 'route', 'show']:
retval = 0, 'OK'
elif argument_list[:3] == ['route', 'add', '-host']:
retval = 0, 'OK'
elif argument_list[:2] == ['brctl', 'show']:
retval = 0, "\n".join(("bridge name bridge id STP enabled interfaces",
"bridge bridge bridge b001 000:000 1 fakeinterface",
" fakeinterface2"
""))
self.external_command_list.append(' '.join(argument_list))
return retval
class LoggableWrapper:
def __init__(self, logger, name):
self.__logger = logger
self.__name = name
def __call__(self, *args, **kwargs):
arg_list = [repr(x) for x in args] + [
'%s=%r' % (x, y) for x, y in kwargs.iteritems()]
self.__logger.debug('%s(%s)' % (self.__name, ', '.join(arg_list)))
class TimeMock:
@classmethod
def sleep(self, seconds):
return
class GrpMock:
@classmethod
def getgrnam(self, name):
global GROUP_LIST
if name in GROUP_LIST:
return True
raise KeyError
class PwdMock:
@classmethod
def getpwnam(self, name):
global USER_LIST
if name in USER_LIST:
class PwdResult:
def __init__(self, name):
self.pw_name = name
self.pw_uid = self.pw_gid = USER_LIST.index(name)
def __getitem__(self, index):
if index == 0:
return self.pw_name
if index == 2:
return self.pw_uid
if index == 3:
return self.pw_gid
return PwdResult(name)
raise KeyError("User \"{}\" not in global USER_LIST {!s}".format(name, USER_LIST))
class NetifacesMock:
@classmethod
def ifaddresses(self, name):
global INTERFACE_DICT
if name in INTERFACE_DICT:
return INTERFACE_DICT[name]
raise ValueError("Interface \"{}\" not in INTERFACE_DICT {!s}".format(
name, INTERFACE_DICT))
@classmethod
def interfaces(self):
global INTERFACE_DICT
return INTERFACE_DICT.keys()
class SlaposUtilMock:
@classmethod
def chownDirectory(*args, **kw):
pass
class SlapformatMixin(unittest.TestCase):
# keep big diffs
maxDiff = None
def patchNetifaces(self):
self.netifaces = NetifacesMock()
self.saved_netifaces = {}
for fake in vars(NetifacesMock):
self.saved_netifaces[fake] = getattr(netifaces, fake, None)
setattr(netifaces, fake, getattr(self.netifaces, fake))
def restoreNetifaces(self):
for name, original_value in self.saved_netifaces.items():
setattr(netifaces, name, original_value)
del self.saved_netifaces
def patchPwd(self):
self.saved_pwd = {}
for fake in vars(PwdMock):
self.saved_pwd[fake] = getattr(pwd, fake, None)
setattr(pwd, fake, getattr(PwdMock, fake))
def restorePwd(self):
for name, original_value in self.saved_pwd.items():
setattr(pwd, name, original_value)
del self.saved_pwd
def patchTime(self):
self.saved_time = {}
for fake in vars(TimeMock):
self.saved_time[fake] = getattr(time, fake, None)
setattr(time, fake, getattr(TimeMock, fake))
def restoreTime(self):
for name, original_value in self.saved_time.items():
setattr(time, name, original_value)
del self.saved_time
def patchGrp(self):
self.saved_grp = {}
for fake in vars(GrpMock):
self.saved_grp[fake] = getattr(grp, fake, None)
setattr(grp, fake, getattr(GrpMock, fake))
def restoreGrp(self):
for name, original_value in self.saved_grp.items():
setattr(grp, name, original_value)
del self.saved_grp
def patchOs(self, logger):
self.saved_os = {}
for fake in ['mkdir', 'chown', 'chmod', 'makedirs']:
self.saved_os[fake] = getattr(os, fake, None)
f = LoggableWrapper(logger, fake)
setattr(os, fake, f)
def restoreOs(self):
if not hasattr(self, 'saved_os'):
return # os was never patched or already restored
for name, original_value in self.saved_os.items():
setattr(os, name, original_value)
del self.saved_os
def patchSlaposUtil(self):
self.saved_slapos_util = {}
for fake in ['chownDirectory']:
self.saved_slapos_util[fake] = getattr(slapos.util, fake, None)
setattr(slapos.util, fake, getattr(SlaposUtilMock, fake))
def restoreSlaposUtil(self):
for name, original_value in self.saved_slapos_util.items():
setattr(slapos.util, name, original_value)
del self.saved_slapos_util
def setUp(self):
config = FakeConfig()
config.dry_run = True
config.verbose = True
logger = logging.getLogger('testcatch')
logger.setLevel(logging.DEBUG)
self.test_result = TestLoggerHandler()
logger.addHandler(self.test_result)
config.logger = logger
if hasattr(self, "logger"):
raise ValueError("{} already has logger attached".format(self.__class__.__name__))
self.logger = logger
self.partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
global USER_LIST
USER_LIST = []
global GROUP_LIST
GROUP_LIST = []
global INTERFACE_DICT
INTERFACE_DICT = {}
self.real_callAndRead = slapos.format.callAndRead
self.fakeCallAndRead = FakeCallAndRead()
slapos.format.callAndRead = self.fakeCallAndRead
self.patchOs(logger)
self.patchGrp()
self.patchTime()
self.patchPwd()
self.patchNetifaces()
self.patchSlaposUtil()
def tearDown(self):
self.restoreOs()
self.restoreGrp()
self.restoreTime()
self.restorePwd()
self.restoreNetifaces()
self.restoreSlaposUtil()
slapos.format.callAndRead = self.real_callAndRead
class TestComputer(SlapformatMixin):
def test_getAddress_empty_computer(self):
computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root')
self.assertEqual(computer.getAddress(), {'netmask': None, 'addr': None})
@unittest.skip("Not implemented")
def test_construct_empty(self):
computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root')
computer.format()
@unittest.skip("Not implemented")
def test_construct_empty_prepared(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[])
computer.format()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft slapsoft -r'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[])
computer.format(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'brctl show',
],
self.fakeCallAndRead.external_command_list)
@unittest.skip("Not implemented")
def test_construct_empty_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[])
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft slapsoft -r'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[])
computer.format(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'brctl show',
],
self.fakeCallAndRead.external_command_list)
@unittest.skip("Not implemented")
def test_construct_prepared(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')),
])
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.format()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft slapsoft -r',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -G slapsoft testuser -r',
'brctl show',
'ip tuntap add dev tap mode tap user testuser',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')),
])
global USER_LIST
USER_LIST = ['testuser']
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.format(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'brctl show',
'ip tuntap add dev tap mode tap user testuser',
'ip link set tap up',
'brctl show',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip -6 addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_tap_no_alter_user(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
tap_gateway_interface='eth1',
interface=slapos.format.Interface(
logger=self.logger, name='iface', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/part_path', slapos.format.User('testuser'), [],
tap=slapos.format.Tap('tap')),
])
global USER_LIST
USER_LIST = ['testuser']
global INTERFACE_DICT
INTERFACE_DICT['iface'] = {
socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
INTERFACE_DICT['eth1'] = {
socket.AF_INET: [{'addr': '10.8.0.1', 'broadcast': '10.8.0.254',
'netmask': '255.255.255.0'}]
}
computer.format(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list iface',
'brctl show',
'ip tuntap add dev tap mode tap user testuser',
'ip link set tap up',
'ip route show 10.8.0.2',
'ip route add 10.8.0.2 dev tap',
'ip addr add ip/255.255.255.255 dev iface',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev iface',
'ip -6 addr list iface'
],
self.fakeCallAndRead.external_command_list)
partition = computer.partition_list[0]
self.assertEqual(partition.tap.ipv4_addr, '10.8.0.2')
self.assertEqual(partition.tap.ipv4_netmask, '255.255.255.0')
self.assertEqual(partition.tap.ipv4_gateway, '10.8.0.1')
self.assertEqual(partition.tap.ipv4_network, '10.8.0.0')
@unittest.skip("Not implemented")
def test_construct_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/part_path', slapos.format.User('testuser'), [],
tap=slapos.format.Tap('tap')),
])
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.format(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft slapsoft -r',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -G slapsoft testuser -r',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/part_path', slapos.format.User('testuser'), [],
tap=slapos.format.Tap('tap')),
])
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.format(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'brctl show',
'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip -6 addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_use_unique_local_address_block(self):
"""
Test that slapformat creates a unique local address in the interface.
"""
global USER_LIST
USER_LIST = ['root']
computer = slapos.format.Computer('computer',
instance_root='/instance_root',
software_root='/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/part_path', slapos.format.User('testuser'), [],
tap=slapos.format.Tap('tap')),
])
global INTERFACE_DICT
INTERFACE_DICT['myinterface'] = {
socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.format(use_unique_local_address_block=True, alter_user=False, create_tap=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list myinterface',
'brctl show',
'ip address add dev myinterface fd00::1/64',
'ip addr add ip/255.255.255.255 dev myinterface',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface',
'ip -6 addr list myinterface'
],
self.fakeCallAndRead.external_command_list)
class SlapGridPartitionMock:
def __init__(self, partition):
self.partition = partition
self.instance_path = partition.path
def getUserGroupId(self):
return (0, 0)
class TestComputerWithCPUSet(SlapformatMixin):
cpuset_path = "/tmp/cpuset/"
task_write_mode = "at" # append insted of write tasks PIDs for the tests
def setUp(self):
logging.getLogger("slapos.manager.cpuset").addHandler(
logging.StreamHandler())
super(TestComputerWithCPUSet, self).setUp()
self.restoreOs()
if os.path.isdir("/tmp/slapgrid/"):
shutil.rmtree("/tmp/slapgrid/")
os.mkdir("/tmp/slapgrid/")
if os.path.isdir(self.cpuset_path):
shutil.rmtree(self.cpuset_path)
os.mkdir(self.cpuset_path)
file_write("0,1-3",
os.path.join(self.cpuset_path, "cpuset.cpus"))
file_write("\n".join(("1000", "1001", "1002", "")),
os.path.join(self.cpuset_path, "tasks"))
self.cpu_list = [0, 1, 2, 3]
global USER_LIST, INTERFACE_DICT
USER_LIST = ['testuser']
INTERFACE_DICT['bridge'] = {
socket.AF_INET: [
{'addr': '127.0.0.1', 'broadcast': '127.0.255.255', 'netmask': '255.255.0.0'}],
socket.AF_INET6: [
{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
from slapos.manager.cpuset import Manager
self.orig_cpuset_path = Manager.cpuset_path
self.orig_task_write_mode = Manager.task_write_mode
Manager.cpuset_path = self.cpuset_path
Manager.task_write_mode = self.task_write_mode
self.computer = slapos.format.Computer('computer',
software_user='testuser',
instance_root='/tmp/slapgrid/instance_root',
software_root='/tmp/slapgrid/software_root',
interface=slapos.format.Interface(
logger=self.logger, name='bridge', ipv4_local_network='127.0.0.1/16'),
partition_list=[
slapos.format.Partition(
'partition', '/tmp/slapgrid/instance_root/part1', slapos.format.User('testuser'), [], tap=None),
],
config={
"manager_list": "cpuset",
"power_user_list": "testuser root"
}
)
# self.patchOs(self.logger)
def tearDown(self):
"""Cleanup temporary test folders."""
from slapos.manager.cpuset import Manager
Manager.cpuset_path = self.orig_cpuset_path
Manager.task_write_mode = self.orig_task_write_mode
super(TestComputerWithCPUSet, self).tearDown()
shutil.rmtree("/tmp/slapgrid/")
if self.cpuset_path.startswith("/tmp"):
shutil.rmtree(self.cpuset_path)
logging.getLogger("slapos.manager.cpuset")
def test_positive_cgroups(self):
"""Positive test of cgroups."""
# Test parsing "cpuset.cpus" file
self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list)
# This should created per-cpu groups and move all tasks in CPU pool into cpu0
self.computer.format(alter_network=False, alter_user=False)
# Test files creation for exclusive CPUs
for cpu_id in self.cpu_list:
cpu_n_path = os.path.join(self.cpuset_path, "cpu" + str(cpu_id))
self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus")))
self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive")))
if cpu_id > 0:
self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks")))
# Test moving tasks from generic core to private core
# request PID 1001 to be moved to its private CPU
request_file_path = os.path.join(self.computer.partition_list[0].path,
slapos.manager.cpuset.Manager.cpu_exclusive_file)
file_write("1001\n", request_file_path)
# Simulate slapos instance call to perform the actual movement
self.computer._manager_list[0].instance(
SlapGridPartitionMock(self.computer.partition_list[0]))
# Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(self.cpuset_path, "tasks"))
# Test that format moved all PIDs from CPU pool into CPU0
tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0)
# test if the moving suceeded into any provate CPUS (id>0)
self.assertTrue(any("1001" in file_content(exclusive_task)
for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks"))))
self.assertIn("1002", tasks_at_cpu0)
# slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file
self.assertEqual("", file_content(request_file_path).strip())
class TestPartition(SlapformatMixin):
def test_createPath_no_alter_user(self):
self.partition.createPath(False)
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
class TestUser(SlapformatMixin):
def test_create(self):
user = slapos.format.User('doesnotexistsyet')
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/sh '\
'doesnotexistsyet -r',
'passwd -l doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_additional_groups(self):
user = slapos.format.User('doesnotexistsyet', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/sh -G '\
'additionalgroup1,additionalgroup2 doesnotexistsyet -r',
'passwd -l doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_group_exists(self):
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'useradd -d /testuser -g testuser -s /bin/sh testuser -r',
'passwd -l testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists_additional_groups(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/sh -G '\
'additionalgroup1,additionalgroup2 testuser',
'passwd -l testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/sh testuser',
'passwd -l testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_group_exists(self):
global USER_LIST
USER_LIST = ['testuser']
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'usermod -d /testuser -g testuser -s /bin/sh testuser',
'passwd -l testuser'
],
self.fakeCallAndRead.external_command_list)
def test_isAvailable(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
self.assertTrue(user.isAvailable())
def test_isAvailable_notAvailable(self):
user = slapos.format.User('doesnotexistsyet')
self.assertFalse(user.isAvailable())
if __name__ == '__main__':
unittest.main()
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapgrid.py 0000664 0000000 0000000 00000334124 13232036066 0030331 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from __future__ import absolute_import
import logging
import os
import random
import shutil
import signal
import socket
import sys
import stat
import tempfile
import textwrap
import time
import unittest
import urlparse
import json
import re
import xml_marshaller
from mock import patch
import slapos.slap.slap
import slapos.grid.utils
from slapos.grid import slapgrid
from slapos.grid.utils import md5digest
from slapos.grid.watchdog import Watchdog
from slapos.grid import SlapObject
from slapos.grid.SlapObject import WATCHDOG_MARK
from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME
import slapos.grid.SlapObject
from slapos import manager as slapmanager
import httmock
dummylogger = logging.getLogger()
WATCHDOG_TEMPLATE = """#!{python_path} -S
import sys
sys.path={sys_path}
import slapos.slap
import slapos.grid.watchdog
def bang(self_partition, message):
nl = chr(10)
with open('{watchdog_banged}', 'w') as fout:
for key, value in vars(self_partition).items():
fout.write('%s: %s%s' % (key, value, nl))
if key == '_connection_helper':
for k, v in vars(value).items():
fout.write(' %s: %s%s' % (k, v, nl))
fout.write(message)
slapos.slap.ComputerPartition.bang = bang
slapos.grid.watchdog.main()
"""
WRAPPER_CONTENT = """#!/bin/sh
touch worked &&
mkdir -p etc/run &&
echo "#!/bin/sh" > etc/run/wrapper &&
echo "while true; do echo Working; sleep 0.1; done" >> etc/run/wrapper &&
chmod 755 etc/run/wrapper
"""
DAEMON_CONTENT = """#!/bin/sh
mkdir -p etc/service &&
echo "#!/bin/sh" > etc/service/daemon &&
echo "touch launched
if [ -f ./crashed ]; then
while true; do echo Working; sleep 0.1; done
else
touch ./crashed; echo Failing; sleep 1; exit 111;
fi" >> etc/service/daemon &&
chmod 755 etc/service/daemon &&
touch worked
"""
class BasicMixin(object):
def setUp(self):
self._tempdir = tempfile.mkdtemp()
self.software_root = os.path.join(self._tempdir, 'software')
self.instance_root = os.path.join(self._tempdir, 'instance')
if os.environ.has_key('SLAPGRID_INSTANCE_ROOT'):
del os.environ['SLAPGRID_INSTANCE_ROOT']
logging.basicConfig(level=logging.DEBUG)
self.setSlapgrid()
def setSlapgrid(self, develop=False):
if getattr(self, 'master_url', None) is None:
self.master_url = 'http://127.0.0.1:80/'
self.computer_id = 'computer'
self.supervisord_socket = os.path.join(self._tempdir, 'supervisord.sock')
self.supervisord_configuration_path = os.path.join(self._tempdir,
'supervisord')
self.usage_report_periodicity = 1
self.buildout = None
self.grid = slapgrid.Slapgrid(self.software_root,
self.instance_root,
self.master_url,
self.computer_id,
self.buildout,
develop=develop,
logger=logging.getLogger())
# monkey patch buildout bootstrap
def dummy(*args, **kw):
pass
slapos.grid.utils.bootstrapBuildout = dummy
SlapObject.PROGRAM_PARTITION_TEMPLATE = textwrap.dedent("""\
[program:%(program_id)s]
directory=%(program_directory)s
command=%(program_command)s
process_name=%(program_name)s
autostart=false
autorestart=false
startsecs=0
startretries=0
exitcodes=0
stopsignal=TERM
stopwaitsecs=60
stopasgroup=true
killasgroup=true
user=%(user_id)s
group=%(group_id)s
serverurl=AUTO
redirect_stderr=true
stdout_logfile=%(instance_path)s/.%(program_id)s.log
stderr_logfile=%(instance_path)s/.%(program_id)s.log
environment=USER="%(USER)s",LOGNAME="%(USER)s",HOME="%(HOME)s"
""")
def launchSlapgrid(self, develop=False):
self.setSlapgrid(develop=develop)
return self.grid.processComputerPartitionList()
def launchSlapgridSoftware(self, develop=False):
self.setSlapgrid(develop=develop)
return self.grid.processSoftwareReleaseList()
def assertLogContent(self, log_path, expected, tries=600):
for i in range(tries):
if expected in open(log_path).read():
return
time.sleep(0.1)
self.fail('%r not found in %s' % (expected, log_path))
def assertIsCreated(self, path, tries=600):
for i in range(tries):
if os.path.exists(path):
return
time.sleep(0.1)
self.fail('%s should be created' % path)
def assertIsNotCreated(self, path, tries=50):
for i in range(tries):
if os.path.exists(path):
self.fail('%s should not be created' % path)
time.sleep(0.01)
def assertInstanceDirectoryListEqual(self, instance_list):
instance_list.append('etc')
instance_list.append('var')
instance_list.append('supervisord.socket')
self.assertItemsEqual(os.listdir(self.instance_root), instance_list)
def tearDown(self):
# XXX: Hardcoded pid, as it is not configurable in slapos
svc = os.path.join(self.instance_root, 'var', 'run', 'supervisord.pid')
if os.path.exists(svc):
try:
pid = int(open(svc).read().strip())
except ValueError:
pass
else:
os.kill(pid, signal.SIGTERM)
shutil.rmtree(self._tempdir, True)
class TestRequiredOnlyPartitions(unittest.TestCase):
def test_no_errors(self):
required = ['one', 'three']
existing = ['one', 'two', 'three']
slapgrid.check_required_only_partitions(existing, required)
def test_one_missing(self):
required = ['foobar', 'two', 'one']
existing = ['one', 'two', 'three']
self.assertRaisesRegexp(ValueError,
'Unknown partition: foobar',
slapgrid.check_required_only_partitions,
existing, required)
def test_several_missing(self):
required = ['foobar', 'barbaz']
existing = ['one', 'two', 'three']
self.assertRaisesRegexp(ValueError,
'Unknown partitions: barbaz, foobar',
slapgrid.check_required_only_partitions,
existing, required)
class TestBasicSlapgridCP(BasicMixin, unittest.TestCase):
def test_no_software_root(self):
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_instance_root(self):
os.mkdir(self.software_root)
self.assertRaises(OSError, self.grid.processComputerPartitionList)
@unittest.skip('which request handler here?')
def test_no_master(self):
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertRaises(socket.error, self.grid.processComputerPartitionList)
class MasterMixin(BasicMixin):
def _mock_sleep(self):
self.fake_waiting_time = None
self.real_sleep = time.sleep
def mocked_sleep(secs):
if self.fake_waiting_time is not None:
secs = self.fake_waiting_time
self.real_sleep(secs)
time.sleep = mocked_sleep
def _unmock_sleep(self):
time.sleep = self.real_sleep
def setUp(self):
self._mock_sleep()
BasicMixin.setUp(self)
def tearDown(self):
self._unmock_sleep()
BasicMixin.tearDown(self)
class ComputerForTest(object):
"""
Class to set up environment for tests setting instance, software
and server response
"""
def __init__(self,
software_root,
instance_root,
instance_amount=1,
software_amount=1):
"""
Will set up instances, software and sequence
"""
self.sequence = []
self.instance_amount = instance_amount
self.software_amount = software_amount
self.software_root = software_root
self.instance_root = instance_root
self.ip_address_list = [
('interface1', '10.0.8.3'),
('interface2', '10.0.8.4'),
('route_interface1', '10.10.8.4')
]
if not os.path.isdir(self.instance_root):
os.mkdir(self.instance_root)
if not os.path.isdir(self.software_root):
os.mkdir(self.software_root)
self.setSoftwares()
self.setInstances()
def request_handler(self, url, req):
"""
Define _callback.
Will register global sequence of message, sequence by partition
and error and error message by partition
"""
self.sequence.append(url.path)
if req.method == 'GET':
qs = urlparse.parse_qs(url.query)
else:
qs = urlparse.parse_qs(req.body)
if (url.path == '/getFullComputerInformation'
and 'computer_id' in qs):
slap_computer = self.getComputer(qs['computer_id'][0])
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(slap_computer)
}
elif url.path == '/getHostingSubscriptionIpList':
ip_address_list = self.ip_address_list
return {
'status_code': 200,
'content': xml_marshaller.xml_marshaller.dumps(ip_address_list)
}
if req.method == 'POST' and 'computer_partition_id' in qs:
instance = self.instance_list[int(qs['computer_partition_id'][0])]
instance.sequence.append(url.path)
instance.header_list.append(req.headers)
if url.path == '/availableComputerPartition':
return {'status_code': 200}
if url.path == '/startedComputerPartition':
instance.state = 'started'
return {'status_code': 200}
if url.path == '/stoppedComputerPartition':
instance.state = 'stopped'
return {'status_code': 200}
if url.path == '/destroyedComputerPartition':
instance.state = 'destroyed'
return {'status_code': 200}
if url.path == '/softwareInstanceBang':
return {'status_code': 200}
if url.path == "/updateComputerPartitionRelatedInstanceList":
return {'status_code': 200}
if url.path == '/softwareInstanceError':
instance.error_log = '\n'.join(
[
line
for line in qs['error_log'][0].splitlines()
if 'dropPrivileges' not in line
]
)
instance.error = True
return {'status_code': 200}
elif req.method == 'POST' and 'url' in qs:
# XXX hardcoded to first software release!
software = self.software_list[0]
software.sequence.append(url.path)
if url.path == '/buildingSoftwareRelease':
return {'status_code': 200}
if url.path == '/softwareReleaseError':
software.error_log = '\n'.join(
[
line
for line in qs['error_log'][0].splitlines()
if 'dropPrivileges' not in line
]
)
software.error = True
return {'status_code': 200}
else:
return {'status_code': 500}
def setSoftwares(self):
"""
Will set requested amount of software
"""
self.software_list = [
SoftwareForTest(self.software_root, name=str(i))
for i in range(self.software_amount)
]
def setInstances(self):
"""
Will set requested amount of instance giving them by default first software
"""
if self.software_list:
software = self.software_list[0]
else:
software = None
self.instance_list = [
InstanceForTest(self.instance_root, name=str(i), software=software)
for i in range(self.instance_amount)
]
def getComputer(self, computer_id):
"""
Will return current requested state of computer
"""
slap_computer = slapos.slap.Computer(computer_id)
slap_computer._software_release_list = [
software.getSoftware(computer_id)
for software in self.software_list
]
slap_computer._computer_partition_list = [
instance.getInstance(computer_id)
for instance in self.instance_list
]
return slap_computer
class InstanceForTest(object):
"""
Class containing all needed paramaters and function to simulate instances
"""
def __init__(self, instance_root, name, software):
self.instance_root = instance_root
self.software = software
self.requested_state = 'stopped'
self.state = None
self.error = False
self.error_log = None
self.sequence = []
self.header_list = []
self.name = name
self.partition_path = os.path.join(self.instance_root, self.name)
os.mkdir(self.partition_path, 0o750)
self.timestamp = None
self.ip_list = [('interface0', '10.0.8.2')]
self.full_ip_list = [('route_interface0', '10.10.2.3', '10.10.0.1',
'255.0.0.0', '10.0.0.0')]
def getInstance(self, computer_id, ):
"""
Will return current requested state of instance
"""
partition = slapos.slap.ComputerPartition(computer_id, self.name)
partition._software_release_document = self.getSoftwareRelease()
partition._requested_state = self.requested_state
if getattr(self, 'filter_dict', None):
partition._filter_dict = self.filter_dict
partition._parameter_dict = {'ip_list': self.ip_list,
'full_ip_list': self.full_ip_list
}
if self.software is not None:
if self.timestamp is not None:
partition._parameter_dict['timestamp'] = self.timestamp
self.current_partition = partition
return partition
def getSoftwareRelease(self):
"""
Return software release for Instance
"""
if self.software is not None:
sr = slapos.slap.SoftwareRelease()
sr._software_release = self.software.name
return sr
else:
return None
def setPromise(self, promise_name, promise_content):
"""
This function will set promise and return its path
"""
promise_path = os.path.join(self.partition_path, 'etc', 'promise')
if not os.path.isdir(promise_path):
os.makedirs(promise_path)
promise = os.path.join(promise_path, promise_name)
open(promise, 'w').write(promise_content)
os.chmod(promise, 0o777)
def setCertificate(self, certificate_repository_path):
if not os.path.exists(certificate_repository_path):
os.mkdir(certificate_repository_path)
self.cert_file = os.path.join(certificate_repository_path,
"%s.crt" % self.name)
self.certificate = str(random.random())
open(self.cert_file, 'w').write(self.certificate)
self.key_file = os.path.join(certificate_repository_path,
'%s.key' % self.name)
self.key = str(random.random())
open(self.key_file, 'w').write(self.key)
class SoftwareForTest(object):
"""
Class to prepare and simulate software.
each instance has a sotfware attributed
"""
def __init__(self, software_root, name=''):
"""
Will set file and variable for software
"""
self.software_root = software_root
self.name = 'http://sr%s/' % name
self.sequence = []
self.software_hash = md5digest(self.name)
self.srdir = os.path.join(self.software_root, self.software_hash)
self.requested_state = 'available'
os.mkdir(self.srdir)
self.setTemplateCfg()
self.srbindir = os.path.join(self.srdir, 'bin')
os.mkdir(self.srbindir)
self.setBuildout()
def getSoftware(self, computer_id):
"""
Will return current requested state of software
"""
software = slapos.slap.SoftwareRelease(self.name, computer_id)
software._requested_state = self.requested_state
return software
def setTemplateCfg(self, template="""[buildout]"""):
"""
Set template.cfg
"""
open(os.path.join(self.srdir, 'template.cfg'), 'w').write(template)
def setBuildout(self, buildout="""#!/bin/sh
touch worked"""):
"""
Set a buildout exec in bin
"""
open(os.path.join(self.srbindir, 'buildout'), 'w').write(buildout)
os.chmod(os.path.join(self.srbindir, 'buildout'), 0o755)
def setPeriodicity(self, periodicity):
"""
Set a periodicity file
"""
with open(os.path.join(self.srdir, 'periodicity'), 'w') as fout:
fout.write(str(periodicity))
class TestSlapgridCPWithMaster(MasterMixin, unittest.TestCase):
def test_nothing_to_do(self):
computer = ComputerForTest(self.software_root, self.instance_root, 0, 0)
with httmock.HTTMock(computer.request_handler):
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual([])
self.assertItemsEqual(os.listdir(self.software_root), [])
st = os.stat(os.path.join(self.instance_root, 'var'))
self.assertEquals(stat.S_IMODE(st.st_mode), 0o755)
def test_one_partition(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition), ['.slapgrid', 'buildout.cfg',
'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/stoppedComputerPartition'])
def test_one_partition_instance_cfg(self):
"""
Check that slapgrid processes instance is profile is not named
"template.cfg" but "instance.cfg".
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition), ['.slapgrid', 'buildout.cfg',
'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/stoppedComputerPartition'])
def test_one_free_partition(self):
"""
Test if slapgrid cp does not process "free" partition
"""
computer = ComputerForTest(self.software_root,
self.instance_root,
software_amount=0)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
partition.requested_state = 'destroyed'
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path), [])
self.assertItemsEqual(os.listdir(self.software_root), [])
self.assertEqual(partition.sequence, [])
def test_one_partition_started(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
partition.requested_state = 'started'
partition.software.setBuildout(WRAPPER_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
wrapper_log = os.path.join(partition.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertItemsEqual(os.listdir(self.software_root), [partition.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual(partition.state, 'started')
def test_one_partition_started_fail(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
partition.requested_state = 'started'
partition.software.setBuildout(WRAPPER_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
wrapper_log = os.path.join(partition.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertItemsEqual(os.listdir(self.software_root), [partition.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual(partition.state, 'started')
instance = computer.instance_list[0]
instance.software.setBuildout("""#!/bin/sh
exit 1
""")
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_FAIL)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked',
'.slapos-retention-lock-delay', '.slapgrid-0-error.log'])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition', '/getHateoasUrl',
'/getFullComputerInformation', '/softwareInstanceError'])
self.assertEqual(instance.state, 'started')
def test_one_partition_started_stopped(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
instance.software.setBuildout("""#!/bin/sh
touch worked &&
mkdir -p etc/run &&
(
cat <<'HEREDOC'
#!%(python)s
import signal
def handler(signum, frame):
for i in range(30):
print 'Signal handler called with signal', signum
raise SystemExit
signal.signal(signal.SIGTERM, handler)
while True:
print "Working"
HEREDOC
)> etc/run/wrapper &&
chmod 755 etc/run/wrapper
""" % {'python': sys.executable})
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual(instance.state, 'started')
computer.sequence = []
instance.requested_state = 'stopped'
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertLogContent(wrapper_log, 'Signal handler called with signal 15')
self.assertEqual(computer.sequence,
['/getHateoasUrl', '/getFullComputerInformation', '/availableComputerPartition',
'/stoppedComputerPartition'])
self.assertEqual(instance.state, 'stopped')
def test_one_broken_partition_stopped(self):
"""
Check that, for, an already started instance if stop is requested,
processes will be stopped even if instance is broken (buildout fails
to run) but status is still started.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
instance.software.setBuildout("""#!/bin/sh
touch worked &&
mkdir -p etc/run &&
(
cat <<'HEREDOC'
#!%(python)s
import signal
def handler(signum, frame):
for i in range(30):
print 'Signal handler called with signal', signum
raise SystemExit
signal.signal(signal.SIGTERM, handler)
while True:
print "Working"
HEREDOC
)> etc/run/wrapper &&
chmod 755 etc/run/wrapper
""" % {'python': sys.executable})
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertItemsEqual(os.listdir(self.software_root),
[instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual(instance.state, 'started')
computer.sequence = []
instance.requested_state = 'stopped'
instance.software.setBuildout("""#!/bin/sh
exit 1
""")
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_FAIL)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked',
'.slapos-retention-lock-delay', '.slapgrid-0-error.log'])
self.assertLogContent(wrapper_log, 'Signal handler called with signal 15')
self.assertEqual(computer.sequence,
['/getHateoasUrl', '/getFullComputerInformation',
'/softwareInstanceError'])
self.assertEqual(instance.state, 'started')
def test_one_partition_stopped_started(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'stopped'
instance.software.setBuildout(WRAPPER_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', 'buildout.cfg', 'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root),
[instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/stoppedComputerPartition'])
self.assertEqual('stopped', instance.state)
instance.requested_state = 'started'
computer.sequence = []
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.0_wrapper.log', 'etc',
'buildout.cfg', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root),
[instance.software.software_hash])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertEqual(computer.sequence,
['/getHateoasUrl', '/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual('started', instance.state)
def test_one_partition_destroyed(self):
"""
Test that an existing partition with "destroyed" status will only be
stopped by slapgrid-cp, not processed
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'destroyed'
dummy_file_name = 'dummy_file'
with open(os.path.join(instance.partition_path, dummy_file_name), 'w') as dummy_file:
dummy_file.write('dummy')
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition), ['.slapgrid', dummy_file_name])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation',
'/stoppedComputerPartition'])
self.assertEqual('stopped', instance.state)
class TestSlapgridCPWithMasterWatchdog(MasterMixin, unittest.TestCase):
def setUp(self):
MasterMixin.setUp(self)
# Prepare watchdog
self.watchdog_banged = os.path.join(self._tempdir, 'watchdog_banged')
watchdog_path = os.path.join(self._tempdir, 'watchdog')
open(watchdog_path, 'w').write(WATCHDOG_TEMPLATE.format(
python_path=sys.executable,
sys_path=sys.path,
watchdog_banged=self.watchdog_banged
))
os.chmod(watchdog_path, 0o755)
self.grid.watchdog_path = watchdog_path
slapos.grid.slapgrid.WATCHDOG_PATH = watchdog_path
def test_one_failing_daemon_in_service_will_bang_with_watchdog(self):
"""
Check that a failing service watched by watchdog trigger bang
1.Prepare computer and set a service named daemon in etc/service
(to be watched by watchdog). This daemon will fail.
2.Prepare file for supervisord to call watchdog
-Set sys.path
-Monkeypatch computer partition bang
3.Check damemon is launched
4.Wait for it to fail
5.Wait for file generated by monkeypacthed bang to appear
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
partition.requested_state = 'started'
partition.software.setBuildout(DAEMON_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', '.0_daemon.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
daemon_log = os.path.join(partition.partition_path, '.0_daemon.log')
self.assertLogContent(daemon_log, 'Failing')
self.assertIsCreated(self.watchdog_banged)
self.assertIn('daemon', open(self.watchdog_banged).read())
def test_one_failing_daemon_in_run_will_not_bang_with_watchdog(self):
"""
Check that a failing service watched by watchdog does not trigger bang
1.Prepare computer and set a service named daemon in etc/run
(not watched by watchdog). This daemon will fail.
2.Prepare file for supervisord to call watchdog
-Set sys.path
-Monkeypatch computer partition bang
3.Check damemon is launched
4.Wait for it to fail
5.Check that file generated by monkeypacthed bang do not appear
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
partition.requested_state = 'started'
# Content of run wrapper
WRAPPER_CONTENT = textwrap.dedent("""#!/bin/sh
touch ./launched
touch ./crashed
echo Failing
sleep 1
exit 111
""")
BUILDOUT_RUN_CONTENT = textwrap.dedent("""#!/bin/sh
mkdir -p etc/run &&
echo "%s" >> etc/run/daemon &&
chmod 755 etc/run/daemon &&
touch worked
""" % WRAPPER_CONTENT)
partition.software.setBuildout(BUILDOUT_RUN_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', '.0_daemon.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
daemon_log = os.path.join(partition.partition_path, '.0_daemon.log')
self.assertLogContent(daemon_log, 'Failing')
self.assertIsNotCreated(self.watchdog_banged)
def test_watched_by_watchdog_bang(self):
"""
Test that a process going to fatal or exited mode in supervisord
is banged if watched by watchdog
Certificates used for the bang are also checked
(ie: watchdog id in process name)
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
certificate_repository_path = os.path.join(self._tempdir, 'partition_pki')
instance.setCertificate(certificate_repository_path)
watchdog = Watchdog(
master_url='https://127.0.0.1/',
computer_id=self.computer_id,
certificate_repository_path=certificate_repository_path
)
for event in watchdog.process_state_events:
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, ['/softwareInstanceBang'])
def test_unwanted_events_will_not_bang(self):
"""
Test that a process going to a mode not watched by watchdog
in supervisord is not banged if watched by watchdog
"""
computer = ComputerForTest(self.software_root, self.instance_root)
instance = computer.instance_list[0]
watchdog = Watchdog(
master_url=self.master_url,
computer_id=self.computer_id,
certificate_repository_path=None
)
for event in ['EVENT', 'PROCESS_STATE', 'PROCESS_STATE_RUNNING',
'PROCESS_STATE_BACKOFF', 'PROCESS_STATE_STOPPED']:
computer.sequence = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, [])
def test_not_watched_by_watchdog_do_not_bang(self):
"""
Test that a process going to fatal or exited mode in supervisord
is not banged if not watched by watchdog
(ie: no watchdog id in process name)
"""
computer = ComputerForTest(self.software_root, self.instance_root)
instance = computer.instance_list[0]
watchdog = Watchdog(
master_url=self.master_url,
computer_id=self.computer_id,
certificate_repository_path=None
)
for event in watchdog.process_state_events:
computer.sequence = []
headers = {'eventname': event}
payload = "processname:%s groupname:%s from_state:RUNNING"\
% ('daemon', instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(computer.sequence, [])
def test_watchdog_create_bang_file_after_bang(self):
"""
For a partition that has been successfully deployed (thus .timestamp file
existing), check that bang file is created and contains the timestamp of
.timestamp file.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
certificate_repository_path = os.path.join(self._tempdir, 'partition_pki')
instance.setCertificate(certificate_repository_path)
partition = os.path.join(self.instance_root, '0')
timestamp_content = '1234'
timestamp_file = open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_TIMESTAMP_FILENAME), 'w')
timestamp_file.write(timestamp_content)
timestamp_file.close()
watchdog = Watchdog(
master_url='https://127.0.0.1/',
computer_id=self.computer_id,
certificate_repository_path=certificate_repository_path,
instance_root_path=self.instance_root
)
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, ['/softwareInstanceBang'])
self.assertEqual(open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_LATEST_BANG_TIMESTAMP_FILENAME)).read(), timestamp_content)
def test_watchdog_ignore_bang_if_partition_not_deployed(self):
"""
For a partition that has never been successfully deployed (buildout is
failing, promise is not passing, etc), test that bang is ignored.
Practically speaking, .timestamp file in the partition does not exsit.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
certificate_repository_path = os.path.join(self._tempdir, 'partition_pki')
instance.setCertificate(certificate_repository_path)
partition = os.path.join(self.instance_root, '0')
timestamp_content = '1234'
watchdog = Watchdog(
master_url='https://127.0.0.1/',
computer_id=self.computer_id,
certificate_repository_path=certificate_repository_path,
instance_root_path=self.instance_root
)
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, ['/softwareInstanceBang'])
self.assertNotEqual(open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_LATEST_BANG_TIMESTAMP_FILENAME)).read(), timestamp_content)
def test_watchdog_bang_only_once_if_partition_never_deployed(self):
"""
For a partition that has been never successfully deployed (promises are not passing,
etc), test that:
* First bang is transmitted
* subsequent bangs are ignored until a deployment is successful.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
certificate_repository_path = os.path.join(self._tempdir, 'partition_pki')
instance.setCertificate(certificate_repository_path)
partition = os.path.join(self.instance_root, '0')
watchdog = Watchdog(
master_url='https://127.0.0.1/',
computer_id=self.computer_id,
certificate_repository_path=certificate_repository_path,
instance_root_path=self.instance_root
)
# First bang
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, ['/softwareInstanceBang'])
# Second bang
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, [])
def test_watchdog_bang_only_once_if_timestamp_did_not_change(self):
"""
For a partition that has been successfully deployed (promises are passing,
etc), test that:
* First bang is transmitted
* subsequent bangs are ignored until a new deployment is successful.
Scenario:
* slapgrid successfully deploys a partition
* A process crashes, watchdog calls bang
* Another deployment (run of slapgrid) is done, but not successful (
promise is failing)
* The process crashes again, but watchdog ignores it
* Yet another deployment is done, and it is successful
* The process crashes again, watchdog calls bang
* The process crashes again, watchdog ignroes it
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
certificate_repository_path = os.path.join(self._tempdir, 'partition_pki')
instance.setCertificate(certificate_repository_path)
partition = os.path.join(self.instance_root, '0')
timestamp_content = '1234'
timestamp_file = open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_TIMESTAMP_FILENAME), 'w')
timestamp_file.write(timestamp_content)
timestamp_file.close()
watchdog = Watchdog(
master_url='https://127.0.0.1/',
computer_id=self.computer_id,
certificate_repository_path=certificate_repository_path,
instance_root_path=self.instance_root
)
# First bang
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, ['/softwareInstanceBang'])
self.assertEqual(open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_LATEST_BANG_TIMESTAMP_FILENAME)).read(), timestamp_content)
# Second bang
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, [])
# Second successful deployment
timestamp_content = '12345'
timestamp_file = open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_TIMESTAMP_FILENAME), 'w')
timestamp_file.write(timestamp_content)
timestamp_file.close()
# Third bang
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, ['/softwareInstanceBang'])
self.assertEqual(open(os.path.join(partition, slapos.grid.slapgrid.COMPUTER_PARTITION_LATEST_BANG_TIMESTAMP_FILENAME)).read(), timestamp_content)
# Fourth bang
event = watchdog.process_state_events[0]
instance.sequence = []
instance.header_list = []
headers = {'eventname': event}
payload = 'processname:%s groupname:%s from_state:RUNNING' % (
'daemon' + WATCHDOG_MARK, instance.name)
watchdog.handle_event(headers, payload)
self.assertEqual(instance.sequence, [])
class TestSlapgridCPPartitionProcessing(MasterMixin, unittest.TestCase):
def test_partition_timestamp(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
timestamp = str(int(time.time()))
instance.timestamp = timestamp
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
timestamp_path = os.path.join(instance.partition_path, '.timestamp')
self.setSlapgrid()
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertIn(timestamp, open(timestamp_path).read())
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
def test_partition_timestamp_develop(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
timestamp = str(int(time.time()))
instance.timestamp = timestamp
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg',
'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(self.launchSlapgrid(develop=True),
slapgrid.SLAPGRID_SUCCESS)
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition',
'/availableComputerPartition', '/stoppedComputerPartition'])
def test_partition_old_timestamp(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
timestamp = str(int(time.time()))
instance.timestamp = timestamp
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
instance.timestamp = str(int(timestamp) - 1)
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
def test_partition_timestamp_new_timestamp(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
timestamp = str(int(time.time()))
instance.timestamp = timestamp
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
instance.timestamp = str(int(timestamp) + 1)
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertEqual(self.launchSlapgrid(), slapgrid.SLAPGRID_SUCCESS)
self.assertEqual(computer.sequence,
['/getHateoasUrl',
'/getFullComputerInformation', '/availableComputerPartition',
'/stoppedComputerPartition',
'/getHateoasUrl', '/getFullComputerInformation',
'/availableComputerPartition', '/stoppedComputerPartition',
'/getHateoasUrl',
'/getFullComputerInformation'])
def test_partition_timestamp_no_timestamp(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
timestamp = str(int(time.time()))
instance.timestamp = timestamp
self.launchSlapgrid()
self.assertInstanceDirectoryListEqual(['0'])
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertItemsEqual(os.listdir(self.software_root),
[instance.software.software_hash])
instance.timestamp = None
self.launchSlapgrid()
self.assertEqual(computer.sequence,
['/getHateoasUrl',
'/getFullComputerInformation', '/availableComputerPartition',
'/stoppedComputerPartition',
'/getHateoasUrl', '/getFullComputerInformation',
'/availableComputerPartition', '/stoppedComputerPartition'])
def test_partition_periodicity_remove_timestamp(self):
"""
Check that if periodicity forces run of buildout for a partition, it
removes the .timestamp file.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
timestamp = str(int(time.time()))
instance.timestamp = timestamp
instance.requested_state = 'started'
instance.software.setPeriodicity(1)
self.launchSlapgrid()
partition = os.path.join(self.instance_root, '0')
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg',
'software_release', 'worked', '.slapos-retention-lock-delay'])
time.sleep(2)
# dummify install() so that it doesn't actually do anything so that it
# doesn't recreate .timestamp.
instance.install = lambda: None
self.launchSlapgrid()
self.assertItemsEqual(os.listdir(partition),
['.slapgrid', '.timestamp', 'buildout.cfg',
'software_release', 'worked', '.slapos-retention-lock-delay'])
def test_one_partition_periodicity_from_file_does_not_disturb_others(self):
"""
If time between last processing of instance and now is superior
to periodicity then instance should be proceed
1. We set a wanted maximum_periodicity in periodicity file in
in one software release directory and not the other one
2. We process computer partition and check if wanted_periodicity was
used as maximum_periodicty
3. We wait for a time superior to wanted_periodicty
4. We launch processComputerPartition and check that partition using
software with periodicity was runned and not the other
5. We check that modification time of .timestamp was modified
"""
computer = ComputerForTest(self.software_root, self.instance_root, 20, 20)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
timestamp = str(int(time.time() - 5))
instance0.timestamp = timestamp
instance0.requested_state = 'started'
for instance in computer.instance_list[1:]:
instance.software = \
computer.software_list[computer.instance_list.index(instance)]
instance.timestamp = timestamp
wanted_periodicity = 1
instance0.software.setPeriodicity(wanted_periodicity)
self.launchSlapgrid()
self.assertNotEqual(wanted_periodicity, self.grid.maximum_periodicity)
last_runtime = os.path.getmtime(
os.path.join(instance0.partition_path, '.timestamp'))
time.sleep(wanted_periodicity + 1)
for instance in computer.instance_list[1:]:
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
time.sleep(1)
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/availableComputerPartition', '/startedComputerPartition',
'/availableComputerPartition', '/startedComputerPartition',
])
for instance in computer.instance_list[1:]:
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
self.assertGreater(
os.path.getmtime(os.path.join(instance0.partition_path, '.timestamp')),
last_runtime)
self.assertNotEqual(wanted_periodicity, self.grid.maximum_periodicity)
def test_one_partition_stopped_is_not_processed_after_periodicity(self):
"""
Check that periodicity forces processing a partition even if it is not
started.
"""
computer = ComputerForTest(self.software_root, self.instance_root, 20, 20)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
timestamp = str(int(time.time() - 5))
instance0.timestamp = timestamp
for instance in computer.instance_list[1:]:
instance.software = \
computer.software_list[computer.instance_list.index(instance)]
instance.timestamp = timestamp
wanted_periodicity = 1
instance0.software.setPeriodicity(wanted_periodicity)
self.launchSlapgrid()
self.assertNotEqual(wanted_periodicity, self.grid.maximum_periodicity)
last_runtime = os.path.getmtime(
os.path.join(instance0.partition_path, '.timestamp'))
time.sleep(wanted_periodicity + 1)
for instance in computer.instance_list[1:]:
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
time.sleep(1)
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/availableComputerPartition', '/stoppedComputerPartition',
'/availableComputerPartition', '/stoppedComputerPartition'])
for instance in computer.instance_list[1:]:
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
self.assertNotEqual(os.path.getmtime(os.path.join(instance0.partition_path,
'.timestamp')),
last_runtime)
self.assertNotEqual(wanted_periodicity, self.grid.maximum_periodicity)
def test_one_partition_destroyed_is_not_processed_after_periodicity(self):
"""
Check that periodicity forces processing a partition even if it is not
started.
"""
computer = ComputerForTest(self.software_root, self.instance_root, 20, 20)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
timestamp = str(int(time.time() - 5))
instance0.timestamp = timestamp
instance0.requested_state = 'stopped'
for instance in computer.instance_list[1:]:
instance.software = \
computer.software_list[computer.instance_list.index(instance)]
instance.timestamp = timestamp
wanted_periodicity = 1
instance0.software.setPeriodicity(wanted_periodicity)
self.launchSlapgrid()
self.assertNotEqual(wanted_periodicity, self.grid.maximum_periodicity)
last_runtime = os.path.getmtime(
os.path.join(instance0.partition_path, '.timestamp'))
time.sleep(wanted_periodicity + 1)
for instance in computer.instance_list[1:]:
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
time.sleep(1)
instance0.requested_state = 'destroyed'
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/availableComputerPartition', '/stoppedComputerPartition',
'/stoppedComputerPartition'])
for instance in computer.instance_list[1:]:
self.assertEqual(instance.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
self.assertNotEqual(os.path.getmtime(os.path.join(instance0.partition_path,
'.timestamp')),
last_runtime)
self.assertNotEqual(wanted_periodicity, self.grid.maximum_periodicity)
def test_one_partition_is_never_processed_when_periodicity_is_negative(self):
"""
Checks that a partition is not processed when
its periodicity is negative
1. We setup one instance and set periodicity at -1
2. We mock the install method from slapos.grid.slapgrid.Partition
3. We launch slapgrid once so that .timestamp file is created and check that install method is
indeed called (through mocked_method.called
4. We launch slapgrid anew and check that install as not been called again
"""
computer = ComputerForTest(self.software_root, self.instance_root, 1, 1)
with httmock.HTTMock(computer.request_handler):
timestamp = str(int(time.time()))
instance = computer.instance_list[0]
instance.software.setPeriodicity(-1)
instance.timestamp = timestamp
with patch.object(slapos.grid.slapgrid.Partition, 'install', return_value=None) as mock_method:
self.launchSlapgrid()
self.assertTrue(mock_method.called)
self.launchSlapgrid()
self.assertEqual(mock_method.call_count, 1)
def test_one_partition_is_always_processed_when_periodicity_is_zero(self):
"""
Checks that a partition is always processed when
its periodicity is 0
1. We setup one instance and set periodicity at 0
2. We mock the install method from slapos.grid.slapgrid.Partition
3. We launch slapgrid once so that .timestamp file is created
4. We launch slapgrid anew and check that install has been called twice (one time because of the
new setup and one time because of periodicity = 0)
"""
computer = ComputerForTest(self.software_root, self.instance_root, 1, 1)
with httmock.HTTMock(computer.request_handler):
timestamp = str(int(time.time()))
instance = computer.instance_list[0]
instance.software.setPeriodicity(0)
instance.timestamp = timestamp
with patch.object(slapos.grid.slapgrid.Partition, 'install', return_value=None) as mock_method:
self.launchSlapgrid()
self.launchSlapgrid()
self.assertEqual(mock_method.call_count, 2)
def test_one_partition_buildout_fail_does_not_disturb_others(self):
"""
1. We set up two instance one using a corrupted buildout
2. One will fail but the other one will be processed correctly
"""
computer = ComputerForTest(self.software_root, self.instance_root, 2, 2)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
instance1 = computer.instance_list[1]
instance1.software = computer.software_list[1]
instance0.software.setBuildout("""#!/bin/sh
exit 42""")
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/softwareInstanceError'])
self.assertEqual(instance1.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
def test_one_partition_lacking_software_path_does_not_disturb_others(self):
"""
1. We set up two instance but remove software path of one
2. One will fail but the other one will be processed correctly
"""
computer = ComputerForTest(self.software_root, self.instance_root, 2, 2)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
instance1 = computer.instance_list[1]
instance1.software = computer.software_list[1]
shutil.rmtree(instance0.software.srdir)
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/softwareInstanceError'])
self.assertEqual(instance1.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
def test_one_partition_lacking_software_bin_path_does_not_disturb_others(self):
"""
1. We set up two instance but remove software bin path of one
2. One will fail but the other one will be processed correctly
"""
computer = ComputerForTest(self.software_root, self.instance_root, 2, 2)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
instance1 = computer.instance_list[1]
instance1.software = computer.software_list[1]
shutil.rmtree(instance0.software.srbindir)
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/softwareInstanceError'])
self.assertEqual(instance1.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
def test_one_partition_lacking_path_does_not_disturb_others(self):
"""
1. We set up two instances but remove path of one
2. One will fail but the other one will be processed correctly
"""
computer = ComputerForTest(self.software_root, self.instance_root, 2, 2)
with httmock.HTTMock(computer.request_handler):
instance0 = computer.instance_list[0]
instance1 = computer.instance_list[1]
instance1.software = computer.software_list[1]
shutil.rmtree(instance0.partition_path)
self.launchSlapgrid()
self.assertEqual(instance0.sequence,
['/softwareInstanceError'])
self.assertEqual(instance1.sequence,
['/availableComputerPartition', '/stoppedComputerPartition'])
def test_one_partition_buildout_fail_is_correctly_logged(self):
"""
1. We set up an instance using a corrupted buildout
2. It will fail, make sure that whole log is sent to master
"""
computer = ComputerForTest(self.software_root, self.instance_root, 1, 1)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
line1 = "Nerdy kitten: Can I haz a process crash?"
line2 = "Cedric: Sure, here it is."
instance.software.setBuildout("""#!/bin/sh
echo %s; echo %s; exit 42""" % (line1, line2))
self.launchSlapgrid()
self.assertEqual(instance.sequence, ['/softwareInstanceError'])
# We don't care of actual formatting, we just want to have full log
self.assertIn(line1, instance.error_log)
self.assertIn(line2, instance.error_log)
self.assertIn('Failed to run buildout', instance.error_log)
class TestSlapgridUsageReport(MasterMixin, unittest.TestCase):
"""
Test suite about slapgrid-ur
"""
def test_slapgrid_destroys_instance_to_be_destroyed(self):
"""
Test than an instance in "destroyed" state is correctly destroyed
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
instance.software.setBuildout(WRAPPER_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation',
'/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual(instance.state, 'started')
# Then destroy the instance
computer.sequence = []
instance.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path), [])
self.assertItemsEqual(os.listdir(self.software_root),
[instance.software.software_hash])
# Assert supervisor stopped process
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertIsNotCreated(wrapper_log)
self.assertEqual(computer.sequence,
['/getFullComputerInformation',
'/stoppedComputerPartition',
'/destroyedComputerPartition'])
self.assertEqual(instance.state, 'destroyed')
def test_partition_list_is_complete_if_empty_destroyed_partition(self):
"""
Test that an empty partition with destroyed state but with SR informations
Is correctly destroyed
Axiom: each valid partition has a state and a software_release.
Scenario:
1. Simulate computer containing one "destroyed" partition but with valid SR
2. See if it destroyed
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
computer.sequence = []
instance.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path), [])
self.assertItemsEqual(os.listdir(self.software_root),
[instance.software.software_hash])
# Assert supervisor stopped process
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertIsNotCreated(wrapper_log)
self.assertEqual(
computer.sequence,
['/getFullComputerInformation', '/stoppedComputerPartition', '/destroyedComputerPartition'])
def test_slapgrid_not_destroy_bad_instance(self):
"""
Checks that slapgrid-ur don't destroy instance not to be destroyed.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
instance.software.setBuildout(WRAPPER_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence,
['/getFullComputerInformation',
'/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual('started', instance.state)
# Then run usage report and see if it is still working
computer.sequence = []
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# registerComputerPartition will create one more file:
from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME
request_list_file = COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME % instance.name
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked',
'.slapos-retention-lock-delay', request_list_file])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked',
'.slapos-retention-lock-delay', request_list_file])
wrapper_log = os.path.join(instance.partition_path, '.0_wrapper.log')
self.assertLogContent(wrapper_log, 'Working')
self.assertEqual(computer.sequence,
['/getFullComputerInformation'])
self.assertEqual('started', instance.state)
def test_slapgrid_instance_ignore_free_instance(self):
"""
Test than a free instance (so in "destroyed" state, but empty, without
software_release URI) is ignored by slapgrid-cp.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.software.name = None
computer.sequence = []
instance.requested_state = 'destroyed'
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path), [])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence, ['/getFullComputerInformation'])
def test_slapgrid_report_ignore_free_instance(self):
"""
Test than a free instance (so in "destroyed" state, but empty, without
software_release URI) is ignored by slapgrid-ur.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.software.name = None
computer.sequence = []
instance.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(instance.partition_path), [])
self.assertItemsEqual(os.listdir(self.software_root), [instance.software.software_hash])
self.assertEqual(computer.sequence, ['/getFullComputerInformation'])
class TestSlapgridSoftwareRelease(MasterMixin, unittest.TestCase):
def test_one_software_buildout_fail_is_correctly_logged(self):
"""
1. We set up a software using a corrupted buildout
2. It will fail, make sure that whole log is sent to master
"""
computer = ComputerForTest(self.software_root, self.instance_root, 1, 1)
with httmock.HTTMock(computer.request_handler):
software = computer.software_list[0]
line1 = "Nerdy kitten: Can I haz a process crash?"
line2 = "Cedric: Sure, here it is."
software.setBuildout("""#!/bin/sh
echo %s; echo %s; exit 42""" % (line1, line2))
self.launchSlapgridSoftware()
self.assertEqual(software.sequence,
['/buildingSoftwareRelease', '/softwareReleaseError'])
# We don't care of actual formatting, we just want to have full log
self.assertIn(line1, software.error_log)
self.assertIn(line2, software.error_log)
self.assertIn('Failed to run buildout', software.error_log)
class SlapgridInitialization(unittest.TestCase):
"""
"Abstract" class setting setup and teardown for TestSlapgridArgumentTuple
and TestSlapgridConfigurationFile.
"""
def setUp(self):
"""
Create the minimun default argument and configuration.
"""
self.certificate_repository_path = tempfile.mkdtemp()
self.fake_file_descriptor = tempfile.NamedTemporaryFile()
self.slapos_config_descriptor = tempfile.NamedTemporaryFile()
self.slapos_config_descriptor.write("""
[slapos]
software_root = /opt/slapgrid
instance_root = /srv/slapgrid
master_url = https://slap.vifib.com/
computer_id = your computer id
buildout = /path/to/buildout/binary
""")
self.slapos_config_descriptor.seek(0)
self.default_arg_tuple = (
'--cert_file', self.fake_file_descriptor.name,
'--key_file', self.fake_file_descriptor.name,
'--master_ca_file', self.fake_file_descriptor.name,
'--certificate_repository_path', self.certificate_repository_path,
'-c', self.slapos_config_descriptor.name, '--now')
self.signature_key_file_descriptor = tempfile.NamedTemporaryFile()
self.signature_key_file_descriptor.seek(0)
def tearDown(self):
"""
Removing the temp file.
"""
self.fake_file_descriptor.close()
self.slapos_config_descriptor.close()
self.signature_key_file_descriptor.close()
shutil.rmtree(self.certificate_repository_path, True)
class TestSlapgridCPWithMasterPromise(MasterMixin, unittest.TestCase):
def test_one_failing_promise(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
worked_file = os.path.join(instance.partition_path, 'fail_worked')
fail = textwrap.dedent("""\
#!/usr/bin/env sh
touch "%s"
exit 127""" % worked_file)
instance.setPromise('fail', fail)
self.assertEqual(self.grid.processComputerPartitionList(),
slapos.grid.slapgrid.SLAPGRID_PROMISE_FAIL)
self.assertTrue(os.path.isfile(worked_file))
self.assertTrue(instance.error)
self.assertNotEqual('started', instance.state)
def test_one_succeeding_promise(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
self.fake_waiting_time = 0.1
worked_file = os.path.join(instance.partition_path, 'succeed_worked')
succeed = textwrap.dedent("""\
#!/usr/bin/env sh
touch "%s"
exit 0""" % worked_file)
instance.setPromise('succeed', succeed)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.isfile(worked_file))
self.assertFalse(instance.error)
self.assertEqual(instance.state, 'started')
def test_stderr_has_been_sent(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
self.fake_waiting_time = 0.5
promise_path = os.path.join(instance.partition_path, 'etc', 'promise')
os.makedirs(promise_path)
succeed = os.path.join(promise_path, 'stderr_writer')
worked_file = os.path.join(instance.partition_path, 'stderr_worked')
with open(succeed, 'w') as f:
f.write(textwrap.dedent("""\
#!/usr/bin/env sh
touch "%s"
echo Error 1>&2
exit 127""" % worked_file))
os.chmod(succeed, 0o777)
self.assertEqual(self.grid.processComputerPartitionList(),
slapos.grid.slapgrid.SLAPGRID_PROMISE_FAIL)
self.assertTrue(os.path.isfile(worked_file))
self.assertEqual(instance.error_log[-5:], 'Error')
self.assertTrue(instance.error)
self.assertIsNone(instance.state)
def test_timeout_works(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
self.fake_waiting_time = 0.1
promise_path = os.path.join(instance.partition_path, 'etc', 'promise')
os.makedirs(promise_path)
succeed = os.path.join(promise_path, 'timed_out_promise')
worked_file = os.path.join(instance.partition_path, 'timed_out_worked')
with open(succeed, 'w') as f:
f.write(textwrap.dedent("""\
#!/usr/bin/env sh
touch "%s"
sleep 5
exit 0""" % worked_file))
os.chmod(succeed, 0o777)
self.assertEqual(self.grid.processComputerPartitionList(),
slapos.grid.slapgrid.SLAPGRID_PROMISE_FAIL)
self.assertTrue(os.path.isfile(worked_file))
self.assertTrue(instance.error)
self.assertIsNone(instance.state)
def test_two_succeeding_promises(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
self.fake_waiting_time = 0.1
for i in range(2):
worked_file = os.path.join(instance.partition_path, 'succeed_%s_worked' % i)
succeed = textwrap.dedent("""\
#!/usr/bin/env sh
touch "%s"
exit 0""" % worked_file)
instance.setPromise('succeed_%s' % i, succeed)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
for i in range(2):
worked_file = os.path.join(instance.partition_path, 'succeed_%s_worked' % i)
self.assertTrue(os.path.isfile(worked_file))
self.assertFalse(instance.error)
self.assertEqual(instance.state, 'started')
def test_one_succeeding_one_failing_promises(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
self.fake_waiting_time = 0.1
for i in range(2):
worked_file = os.path.join(instance.partition_path, 'promise_worked_%d' % i)
lockfile = os.path.join(instance.partition_path, 'lock')
promise = textwrap.dedent("""\
#!/usr/bin/env sh
touch "%(worked_file)s"
if [ ! -f %(lockfile)s ]
then
touch "%(lockfile)s"
exit 0
else
exit 127
fi""" % {
'worked_file': worked_file,
'lockfile': lockfile
})
instance.setPromise('promise_%s' % i, promise)
self.assertEqual(self.grid.processComputerPartitionList(),
slapos.grid.slapgrid.SLAPGRID_PROMISE_FAIL)
self.assertEquals(instance.error, 1)
self.assertNotEqual('started', instance.state)
def test_one_succeeding_one_timing_out_promises(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
self.fake_waiting_time = 0.1
for i in range(2):
worked_file = os.path.join(instance.partition_path, 'promise_worked_%d' % i)
lockfile = os.path.join(instance.partition_path, 'lock')
promise = textwrap.dedent("""\
#!/usr/bin/env sh
touch "%(worked_file)s"
if [ ! -f %(lockfile)s ]
then
touch "%(lockfile)s"
else
sleep 5
fi
exit 0""" % {
'worked_file': worked_file,
'lockfile': lockfile}
)
instance.setPromise('promise_%d' % i, promise)
self.assertEqual(self.grid.processComputerPartitionList(),
slapos.grid.slapgrid.SLAPGRID_PROMISE_FAIL)
self.assertEquals(instance.error, 1)
self.assertNotEqual(instance.state, 'started')
class TestSlapgridDestructionLock(MasterMixin, unittest.TestCase):
def test_retention_lock(self):
"""
Higher level test about actual retention (or no-retention) of instance
if specifying a retention lock delay.
"""
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.requested_state = 'started'
instance.filter_dict = {'retention_delay': 1.0 / (3600 * 24)}
self.grid.processComputerPartitionList()
dummy_instance_file_path = os.path.join(instance.partition_path, 'dummy')
with open(dummy_instance_file_path, 'w') as dummy_instance_file:
dummy_instance_file.write('dummy')
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.retention_lock_delay_filename
)))
instance.requested_state = 'destroyed'
self.grid.agregateAndSendUsage()
self.assertTrue(os.path.exists(dummy_instance_file_path))
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.retention_lock_date_filename
)))
self.grid.agregateAndSendUsage()
self.assertTrue(os.path.exists(dummy_instance_file_path))
time.sleep(1)
self.grid.agregateAndSendUsage()
self.assertFalse(os.path.exists(dummy_instance_file_path))
class TestSlapgridCPWithFirewall(MasterMixin, unittest.TestCase):
def setFirewallConfig(self, source_ip=""):
self.firewall_cmd_add = os.path.join(self._tempdir, 'firewall_cmd_add')
with open(self.firewall_cmd_add, 'w') as f:
f.write("""#!/bin/sh
var="$*"
R=$(echo $var | grep "query-rule") > /dev/null
if [ $? -eq 0 ]; then
echo "no"
exit 0
fi
R=$(echo $var | grep "add-rule")
if [ $? -eq 0 ]; then
echo "success"
exit 0
fi
echo "ERROR: $var"
exit 1
""")
self.firewall_cmd_remove = os.path.join(self._tempdir, 'firewall_cmd_remove')
with open(self.firewall_cmd_remove, 'w') as f:
f.write("""#!/bin/sh
var="$*"
R=$(echo $var | grep "query-rule")
if [ $? -eq 0 ]; then
echo "yes"
exit 0
fi
R=$(echo $var | grep "remove-rule")
if [ $? -eq 0 ]; then
echo "success"
exit 0
fi
echo "ERROR: $var"
exit 1
""")
os.chmod(self.firewall_cmd_add, 0755)
os.chmod(self.firewall_cmd_remove, 0755)
firewall_conf= dict(
authorized_sources=source_ip,
firewall_cmd=self.firewall_cmd_add,
firewall_executable='/bin/echo "service firewall started"',
reload_config_cmd='/bin/echo "Config reloaded."',
log_file='fw-log.log',
testing=True,
)
self.grid.firewall_conf = firewall_conf
def checkRuleFromIpSource(self, ip, accept_ip_list, cmd_list):
# XXX - rules for one ip contain 2*len(ip_address_list + accept_ip_list) rules ACCEPT and 4 rules REJECT
num_rules = len(self.ip_address_list) * 2 + len(accept_ip_list) * 2 + 4
self.assertEqual(len(cmd_list), num_rules)
base_cmd = '--permanent --direct --add-rule ipv4 filter'
# Check that there is REJECT rule on INPUT
rule = '%s INPUT 1000 -d %s -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is REJECT rule on FORWARD
rule = '%s FORWARD 1000 -d %s -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is REJECT rule on INPUT, ESTABLISHED,RELATED
rule = '%s INPUT 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is REJECT rule on FORWARD, ESTABLISHED,RELATED
rule = '%s FORWARD 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is INPUT ACCEPT on ip_list
for _, other_ip in self.ip_address_list:
rule = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
# Check that there is FORWARD ACCEPT on ip_list
for other_ip in accept_ip_list:
rule = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
def checkRuleFromIpSourceReject(self, ip, reject_ip_list, cmd_list):
# XXX - rules for one ip contain 2 + 2*len(ip_address_list) rules ACCEPT and 4*len(reject_ip_list) rules REJECT
num_rules = (len(self.ip_address_list) * 2) + (len(reject_ip_list) * 4)
self.assertEqual(len(cmd_list), num_rules)
base_cmd = '--permanent --direct --add-rule ipv4 filter'
# Check that there is ACCEPT rule on INPUT
#rule = '%s INPUT 0 -d %s -j ACCEPT' % (base_cmd, ip)
#self.assertIn(rule, cmd_list)
# Check that there is ACCEPT rule on FORWARD
#rule = '%s FORWARD 0 -d %s -j ACCEPT' % (base_cmd, ip)
#self.assertIn(rule, cmd_list)
# Check that there is INPUT/FORWARD ACCEPT on ip_list
for _, other_ip in self.ip_address_list:
rule = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
# Check that there is INPUT/FORWARD REJECT on ip_list
for other_ip in reject_ip_list:
rule = '%s INPUT 900 -s %s -d %s -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 900 -s %s -d %s -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s INPUT 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
def test_getFirewallRules(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
self.ip_address_list = computer.ip_address_list
ip = computer.instance_list[0].full_ip_list[0][1]
source_ip_list = ['10.32.0.15', '10.32.0.0/8']
cmd_list = self.grid._getFirewallAcceptRules(ip,
[elt[1] for elt in self.ip_address_list],
source_ip_list,
ip_type='ipv4')
self.checkRuleFromIpSource(ip, source_ip_list, cmd_list)
cmd_list = self.grid._getFirewallRejectRules(ip,
[elt[1] for elt in self.ip_address_list],
source_ip_list,
ip_type='ipv4')
self.checkRuleFromIpSourceReject(ip, source_ip_list, cmd_list)
def test_checkAddFirewallRules(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
# For simulate query rule success
self.grid.firewall_conf['firewall_cmd'] = self.firewall_cmd_add
self.ip_address_list = computer.ip_address_list
instance = computer.instance_list[0]
ip = instance.full_ip_list[0][1]
name = computer.instance_list[0].name
cmd_list = self.grid._getFirewallAcceptRules(ip,
[elt[1] for elt in self.ip_address_list],
[],
ip_type='ipv4')
self.grid._checkAddFirewallRules(name, cmd_list, add=True)
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, [], rules_list)
# Remove all rules
self.grid.firewall_conf['firewall_cmd'] = self.firewall_cmd_remove
self.grid._checkAddFirewallRules(name, cmd_list, add=False)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.assertEqual(rules_list, [])
# Add one more ip in the authorized list
self.grid.firewall_conf['firewall_cmd'] = self.firewall_cmd_add
self.ip_address_list.append(('interface1', '10.0.8.7'))
cmd_list = self.grid._getFirewallAcceptRules(ip,
[elt[1] for elt in self.ip_address_list],
[],
ip_type='ipv4')
self.grid._checkAddFirewallRules(name, cmd_list, add=True)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, [], rules_list)
def test_partition_no_firewall(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
self.assertEqual(self.grid.processComputerPartitionList(),
slapgrid.SLAPGRID_SUCCESS)
self.assertFalse(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
def test_partition_firewall_restrict(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
ip = instance.full_ip_list[0][1]
self.checkRuleFromIpSource(ip, [], rules_list)
def test_partition_firewall(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'off'}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
ip = instance.full_ip_list[0][1]
self.checkRuleFromIpSourceReject(ip, [], rules_list)
@unittest.skip('Always fail: instance.filter_dict can\'t change')
def test_partition_firewall_restricted_access_change(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'off',
'fw_rejected_sources': '10.0.8.11'}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
ip = instance.full_ip_list[0][1]
self.checkRuleFromIpSourceReject(ip, ['10.0.8.11'], rules_list)
# For remove rules
self.grid.firewall_conf['firewall_cmd'] = self.firewall_cmd_remove
instance.setFilterParameter({'fw_restricted_access': 'on',
'fw_authorized_sources': ''})
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, [], rules_list)
def test_partition_firewall_ipsource_accept(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
source_ip = ['10.0.8.10', '10.0.8.11']
self.grid.firewall_conf['authorized_sources'] = [source_ip[0]]
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'on',
'fw_authorized_sources': source_ip[1]}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
rules_list= []
self.ip_address_list = computer.ip_address_list
ip = instance.full_ip_list[0][1]
base_cmd = '--permanent --direct --add-rule ipv4 filter'
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
for thier_ip in source_ip:
rule_input = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, thier_ip, ip)
self.assertIn(rule_input, rules_list)
rule_fwd = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, thier_ip, ip)
self.assertIn(rule_fwd, rules_list)
self.checkRuleFromIpSource(ip, source_ip, rules_list)
def test_partition_firewall_ipsource_reject(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
source_ip = '10.0.8.10'
self.grid.firewall_conf['authorized_sources'] = ['10.0.8.15']
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_rejected_sources': source_ip,
'fw_restricted_access': 'off'}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
rules_list= []
self.ip_address_list = computer.ip_address_list
self.ip_address_list.append(('iface', '10.0.8.15'))
ip = instance.full_ip_list[0][1]
base_cmd = '--permanent --direct --add-rule ipv4 filter'
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSourceReject(ip, source_ip.split(' '), rules_list)
def test_partition_firewall_ip_change(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
source_ip = ['10.0.8.10', '10.0.8.11']
self.grid.firewall_conf['authorized_sources'] = [source_ip[0]]
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'on',
'fw_authorized_sources': source_ip[1]}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
rules_list= []
self.ip_address_list = computer.ip_address_list
ip = instance.full_ip_list[0][1]
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, source_ip, rules_list)
instance = computer.instance_list[0]
# XXX -- removed
#instance.filter_dict = {'fw_restricted_access': 'on',
# 'fw_authorized_sources': source_ip[0]}
# For simulate query rule exist
self.grid.firewall_conf['firewall_cmd'] = self.firewall_cmd_remove
self.grid.firewall_conf['authorized_sources'] = []
computer.ip_address_list.append(('route_interface1', '10.10.8.4'))
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, [source_ip[1]], rules_list)
class TestSlapgridCPWithTransaction(MasterMixin, unittest.TestCase):
def test_one_partition(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
partition = os.path.join(self.instance_root, '0')
request_list_file = os.path.join(partition,
COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME % instance.name)
with open(request_list_file, 'w') as f:
f.write('some partition')
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertInstanceDirectoryListEqual(['0'])
self.assertFalse(os.path.exists(request_list_file))
class TestSlapgridReportWithPreDeleteScript(MasterMixin, unittest.TestCase):
prerm_script_content = """#!/bin/sh
echo "Running prerm script for this partition..."
touch etc/prerm.txt
for i in {1..2}
do
echo "sleeping for 1s..."
sleep 1
done
echo "finished prerm script."
rm etc/prerm.txt
exit 0
"""
def _wait_prerm_script_finished(self, base_path):
check_file = os.path.join(base_path, 'etc/prerm.txt')
limit = 10
count = 0
time.sleep(1)
while (count < limit) and os.path.exists(check_file):
time.sleep(1)
count += 1
def test_partition_destroy_with_pre_remove_service(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
pre_delete_dir = os.path.join(partition.partition_path, 'etc/prerm')
pre_delete_script = os.path.join(pre_delete_dir, 'slapos_pre_delete')
partition.requested_state = 'started'
partition.software.setBuildout(WRAPPER_CONTENT)
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
os.makedirs(pre_delete_dir, 0o700)
with open(pre_delete_script, 'w') as f:
f.write(self.prerm_script_content)
os.chmod(pre_delete_script, 0754)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay'])
self.assertEqual(computer.sequence,
['/getFullComputerInformation', '/availableComputerPartition',
'/startedComputerPartition'])
self.assertEqual(partition.state, 'started')
manager_list = slapmanager.from_config({'manager_list': 'prerm'})
self.grid._manager_list = manager_list
partition.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is not destroyed (pre-delete is running)
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', '.0_wrapper.log', 'buildout.cfg',
'etc', 'software_release', 'worked', '.slapos-retention-lock-delay',
'.0-prerm_slapos_pre_delete.log', '.slapos-report-wait-service-list',
'.slapos-request-transaction-0'])
self.assertItemsEqual(os.listdir(self.software_root),
[partition.software.software_hash])
# wait until the pre-delete script is finished
self._wait_prerm_script_finished(partition.partition_path)
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path), [])
def test_partition_destroy_pre_remove_with_retention_lock(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
pre_delete_dir = os.path.join(partition.partition_path, 'etc/prerm')
pre_delete_script = os.path.join(pre_delete_dir, 'slapos_pre_delete')
partition.requested_state = 'started'
partition.filter_dict = {'retention_delay': 1.0 / (3600 * 24)}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
partition.partition_path,
slapos.grid.SlapObject.Partition.retention_lock_delay_filename
)))
os.makedirs(pre_delete_dir, 0o700)
with open(pre_delete_script, 'w') as f:
f.write(self.prerm_script_content)
os.chmod(pre_delete_script, 0754)
self.assertTrue(os.path.exists(pre_delete_script))
manager_list = slapmanager.from_config({'manager_list': 'prerm'})
self.grid._manager_list = manager_list
partition.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is not destroyed (retention-delay-lock)
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', 'buildout.cfg', 'etc', 'software_release',
'worked', '.slapos-retention-lock-delay',
'.slapos-retention-lock-date', '.slapos-request-transaction-0'])
self.assertTrue(os.path.exists(pre_delete_script))
self.assertTrue(os.path.exists(os.path.join(
partition.partition_path,
slapos.grid.SlapObject.Partition.retention_lock_date_filename
)))
time.sleep(1)
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is not destroyed (pre-delete is running)
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', 'buildout.cfg', 'etc', 'software_release',
'worked', '.slapos-retention-lock-delay', '.slapos-retention-lock-date',
'.0-prerm_slapos_pre_delete.log', '.slapos-report-wait-service-list',
'.slapos-request-transaction-0'])
# wait until the pre-delete script is finished
self._wait_prerm_script_finished(partition.partition_path)
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertItemsEqual(os.listdir(partition.partition_path), [])
def test_partition_destroy_pre_remove_script_not_stopped(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
pre_delete_dir = os.path.join(partition.partition_path, 'etc/prerm')
pre_delete_script = os.path.join(pre_delete_dir, 'slapos_pre_delete')
partition.requested_state = 'started'
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
os.makedirs(pre_delete_dir, 0o700)
with open(pre_delete_script, 'w') as f:
f.write(self.prerm_script_content)
os.chmod(pre_delete_script, 0754)
self.assertEqual(partition.state, 'started')
manager_list = slapmanager.from_config({'manager_list': 'prerm'})
self.grid._manager_list = manager_list
partition.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is not destroyed (pre-delete is running)
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', 'buildout.cfg', 'etc', 'software_release',
'worked', '.slapos-retention-lock-delay', '.slapos-request-transaction-0',
'.0-prerm_slapos_pre_delete.log', '.slapos-report-wait-service-list'])
# wait until the pre-delete script is finished
self._wait_prerm_script_finished(partition.partition_path)
with open(os.path.join(partition.partition_path, '.0-prerm_slapos_pre_delete.log')) as f:
# the script is well finished...
self.assertTrue("finished prerm script." in f.read())
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path), [])
def test_partition_destroy_pre_remove_script_run_as_partition_user(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
partition = computer.instance_list[0]
pre_delete_dir = os.path.join(partition.partition_path, 'etc/prerm')
pre_delete_script = os.path.join(pre_delete_dir, 'slapos_pre_delete')
partition.requested_state = 'started'
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
os.makedirs(pre_delete_dir, 0o700)
with open(pre_delete_script, 'w') as f:
f.write(self.prerm_script_content)
os.chmod(pre_delete_script, 0754)
manager_list = slapmanager.from_config({'manager_list': 'prerm'})
self.grid._manager_list = manager_list
partition.requested_state = 'destroyed'
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is not destroyed (pre-delete is running)
self.assertItemsEqual(os.listdir(partition.partition_path),
['.slapgrid', 'buildout.cfg', 'etc', 'software_release',
'worked', '.slapos-retention-lock-delay', '.slapos-request-transaction-0',
'.0-prerm_slapos_pre_delete.log', '.slapos-report-wait-service-list'])
stat_info = os.stat(partition.partition_path)
uid = stat_info.st_uid
gid = stat_info.st_gid
supervisor_conf_file = os.path.join(self.instance_root,
'etc/supervisord.conf.d',
'%s.conf' % partition.name)
self.assertTrue(os.path.exists(supervisor_conf_file))
regex_user = r"user=(\d+)"
regex_group = r"group=(\d+)"
with open(supervisor_conf_file) as f:
config = f.read()
# search user uid in conf file
result = re.search(regex_user, config, re.DOTALL)
self.assertTrue(result is not None)
self.assertEqual(int(result.groups()[0]), uid)
# search user group gid in conf file
result = re.search(regex_group, config, re.DOTALL)
self.assertTrue(result is not None)
self.assertEqual(int(result.groups()[0]), gid)
# wait until the pre-delete script is finished
self._wait_prerm_script_finished(partition.partition_path)
self.assertEqual(self.grid.agregateAndSendUsage(), slapgrid.SLAPGRID_SUCCESS)
# Assert partition directory is empty
self.assertInstanceDirectoryListEqual(['0'])
self.assertItemsEqual(os.listdir(partition.partition_path), [])
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapmock/ 0000775 0000000 0000000 00000000000 13232036066 0027754 5 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapmock/__init__.py 0000664 0000000 0000000 00000000000 13232036066 0032053 0 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapmock/requests.py 0000664 0000000 0000000 00000000216 13232036066 0032200 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
def response_ok(url, request):
return {
'status_code': 200,
'content': ''
}
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapobject.py 0000664 0000000 0000000 00000042674 13232036066 0030660 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import logging
import os
import time
import unittest
from slapos.slap import ComputerPartition as SlapComputerPartition
from slapos.grid.SlapObject import Partition, Software
from slapos.grid import utils
from slapos.grid import networkcache
# XXX: BasicMixin should be in a separated module, not in slapgrid test module.
from slapos.tests.slapgrid import BasicMixin
# Mockup
# XXX: Ambiguous name
# XXX: Factor with common SlapOS tests
class FakeCallAndStore(object):
"""
Used to check if the mocked method has been called.
"""
def __init__(self):
self.called = False
def __call__(self, *args, **kwargs):
self.called = True
class FakeCallAndNoop(object):
"""
Used to no-op a method.
"""
def __call__(self, *args, **kwargs):
pass
# XXX: change name and behavior to be more generic and factor with other tests
class FakeNetworkCacheCallAndRead(object):
"""
Short-circuit normal calls to slapos buildout helpers, get and store
'additional_buildout_parameter_list' for future analysis.
"""
def __init__(self):
self.external_command_list = []
def __call__(self, *args, **kwargs):
additional_buildout_parameter_list = \
kwargs.get('additional_buildout_parameter_list')
self.external_command_list.extend(additional_buildout_parameter_list)
# Backup modules
original_install_from_buildout = Software._install_from_buildout
original_upload_network_cached = networkcache.upload_network_cached
originalBootstrapBuildout = utils.bootstrapBuildout
originalLaunchBuildout = utils.launchBuildout
originalUploadSoftwareRelease = Software.uploadSoftwareRelease
originalPartitionGenerateSupervisorConfigurationFile = Partition.generateSupervisorConfigurationFile
class MasterMixin(BasicMixin, unittest.TestCase):
"""
Master Mixin of slapobject test classes.
"""
def setUp(self):
BasicMixin.setUp(self)
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
def tearDown(self):
BasicMixin.tearDown(self)
# Un-monkey patch possible modules
global originalBootstrapBuildout
global originalLaunchBuildout
utils.bootstrapBuildout = originalBootstrapBuildout
utils.launchBuildout = originalLaunchBuildout
# Helper functions
def createSoftware(self, url=None, empty=False):
"""
Create an empty software, and return a Software object from
dummy parameters.
"""
if url is None:
url = 'mysoftware'
software_path = os.path.join(self.software_root, utils.md5digest(url))
os.mkdir(software_path)
if not empty:
# Populate the Software Release directory so that it is "complete" and
# "working" from a slapos point of view.
open(os.path.join(software_path, 'instance.cfg'), 'w').close()
return Software(
url=url,
software_root=self.software_root,
buildout=self.buildout,
logger=logging.getLogger(),
)
def createPartition(
self,
software_release_url,
partition_id=None,
slap_computer_partition=None,
retention_delay=None,
):
"""
Create a partition, and return a Partition object created
from dummy parameters.
"""
# XXX dirty, should disappear when Partition is cleaned up
software_path = os.path.join(
self.software_root,
utils.md5digest(software_release_url)
)
if partition_id is None:
partition_id = 'mypartition'
if slap_computer_partition is None:
slap_computer_partition = SlapComputerPartition(
computer_id='bidon',
partition_id=partition_id)
instance_path = os.path.join(self.instance_root, partition_id)
os.mkdir(instance_path)
os.chmod(instance_path, 0o750)
supervisor_configuration_path = os.path.join(
self.instance_root, 'supervisor')
os.mkdir(supervisor_configuration_path)
partition = Partition(
software_path=software_path,
instance_path=instance_path,
supervisord_partition_configuration_path=os.path.join(
supervisor_configuration_path, partition_id),
supervisord_socket=os.path.join(
supervisor_configuration_path, 'supervisor.sock'),
computer_partition=slap_computer_partition,
computer_id='bidon',
partition_id=partition_id,
server_url='bidon',
software_release_url=software_release_url,
buildout=self.buildout,
logger=logging.getLogger(),
)
partition.updateSupervisor = FakeCallAndNoop
if retention_delay:
partition.retention_delay = retention_delay
return partition
class TestSoftwareNetworkCacheSlapObject(MasterMixin, unittest.TestCase):
"""
Test for Network Cache related features in Software class.
"""
def setUp(self):
MasterMixin.setUp(self)
self.fakeCallAndRead = FakeNetworkCacheCallAndRead()
utils.bootstrapBuildout = self.fakeCallAndRead
utils.launchBuildout = self.fakeCallAndRead
self.signature_private_key_file = '/signature/private/key_file'
self.upload_cache_url = 'http://example.com/uploadcache'
self.upload_dir_url = 'http://example.com/uploaddir'
self.shacache_ca_file = '/path/to/shacache/ca/file'
self.shacache_cert_file = '/path/to/shacache/cert/file'
self.shacache_key_file = '/path/to/shacache/key/file'
self.shadir_ca_file = '/path/to/shadir/ca/file'
self.shadir_cert_file = '/path/to/shadir/cert/file'
self.shadir_key_file = '/path/to/shadir/key/file'
def tearDown(self):
MasterMixin.tearDown(self)
Software._install_from_buildout = original_install_from_buildout
networkcache.upload_network_cached = original_upload_network_cached
Software.uploadSoftwareRelease = originalUploadSoftwareRelease
# Test methods
def test_software_install_with_networkcache(self):
"""
Check if the networkcache parameters are propagated.
"""
software = Software(
url='http://example.com/software.cfg',
software_root=self.software_root,
buildout=self.buildout,
logger=logging.getLogger(),
signature_private_key_file='/signature/private/key_file',
upload_cache_url='http://example.com/uploadcache',
upload_dir_url='http://example.com/uploaddir',
shacache_ca_file=self.shacache_ca_file,
shacache_cert_file=self.shacache_cert_file,
shacache_key_file=self.shacache_key_file,
shadir_ca_file=self.shadir_ca_file,
shadir_cert_file=self.shadir_cert_file,
shadir_key_file=self.shadir_key_file)
software.install()
command_list = self.fakeCallAndRead.external_command_list
self.assertIn('buildout:networkcache-section=networkcache', command_list)
self.assertIn('networkcache:signature-private-key-file=%s' % self.signature_private_key_file, command_list)
self.assertIn('networkcache:upload-cache-url=%s' % self.upload_cache_url, command_list)
self.assertIn('networkcache:upload-dir-url=%s' % self.upload_dir_url, command_list)
self.assertIn('networkcache:shacache-ca-file=%s' % self.shacache_ca_file, command_list)
self.assertIn('networkcache:shacache-cert-file=%s' % self.shacache_cert_file, command_list)
self.assertIn('networkcache:shacache-key-file=%s' % self.shacache_key_file, command_list)
self.assertIn('networkcache:shadir-ca-file=%s' % self.shadir_ca_file, command_list)
self.assertIn('networkcache:shadir-cert-file=%s' % self.shadir_cert_file, command_list)
self.assertIn('networkcache:shadir-key-file=%s' % self.shadir_key_file, command_list)
def test_software_install_without_networkcache(self):
"""
Check if the networkcache parameters are not propagated if they are not
available.
"""
software = Software(url='http://example.com/software.cfg',
software_root=self.software_root,
buildout=self.buildout,
logger=logging.getLogger())
software.install()
command_list = self.fakeCallAndRead.external_command_list
self.assertNotIn('buildout:networkcache-section=networkcache', command_list)
self.assertNotIn('networkcache:signature-private-key-file=%s' %
self.signature_private_key_file,
command_list)
self.assertNotIn('networkcache:upload-cache-url=%s' % self.upload_cache_url,
command_list)
self.assertNotIn('networkcache:upload-dir-url=%s' % self.upload_dir_url,
command_list)
# XXX-Cedric: do the same with upload
def test_software_install_networkcache_upload_blacklist(self):
"""
Check if the networkcache upload blacklist parameters are propagated.
"""
def fakeBuildout(*args, **kw):
pass
Software._install_from_buildout = fakeBuildout
def fake_upload_network_cached(*args, **kw):
self.assertFalse(True)
networkcache.upload_network_cached = fake_upload_network_cached
upload_to_binary_cache_url_blacklist = ["http://example.com"]
software = Software(
url='http://example.com/software.cfg',
software_root=self.software_root,
buildout=self.buildout,
logger=logging.getLogger(),
signature_private_key_file='/signature/private/key_file',
upload_cache_url='http://example.com/uploadcache',
upload_dir_url='http://example.com/uploaddir',
shacache_ca_file=self.shacache_ca_file,
shacache_cert_file=self.shacache_cert_file,
shacache_key_file=self.shacache_key_file,
shadir_ca_file=self.shadir_ca_file,
shadir_cert_file=self.shadir_cert_file,
shadir_key_file=self.shadir_key_file,
upload_to_binary_cache_url_blacklist=
upload_to_binary_cache_url_blacklist,
)
software.install()
def test_software_install_networkcache_upload_blacklist_side_effect(self):
"""
Check if the networkcache upload blacklist parameters only prevent
blacklisted Software Release to be uploaded.
"""
def fakeBuildout(*args, **kw):
pass
Software._install_from_buildout = fakeBuildout
def fakeUploadSoftwareRelease(*args, **kw):
self.uploaded = True
Software.uploadSoftwareRelease = fakeUploadSoftwareRelease
upload_to_binary_cache_url_blacklist = ["http://anotherexample.com"]
software = Software(
url='http://example.com/software.cfg',
software_root=self.software_root,
buildout=self.buildout,
logger=logging.getLogger(),
signature_private_key_file='/signature/private/key_file',
upload_cache_url='http://example.com/uploadcache',
upload_dir_url='http://example.com/uploaddir',
upload_binary_cache_url='http://example.com/uploadcache',
upload_binary_dir_url='http://example.com/uploaddir',
shacache_ca_file=self.shacache_ca_file,
shacache_cert_file=self.shacache_cert_file,
shacache_key_file=self.shacache_key_file,
shadir_ca_file=self.shadir_ca_file,
shadir_cert_file=self.shadir_cert_file,
shadir_key_file=self.shadir_key_file,
upload_to_binary_cache_url_blacklist=
upload_to_binary_cache_url_blacklist,
)
software.install()
self.assertTrue(getattr(self, 'uploaded', False))
class TestPartitionSlapObject(MasterMixin, unittest.TestCase):
def setUp(self):
MasterMixin.setUp(self)
Partition.generateSupervisorConfigurationFile = FakeCallAndNoop()
utils.bootstrapBuildout = FakeCallAndNoop()
utils.launchBuildout = FakeCallAndStore()
def tearDown(self):
MasterMixin.tearDown(self)
Partition.generateSupervisorConfigurationFile = originalPartitionGenerateSupervisorConfigurationFile
def test_instance_is_deploying_if_software_release_exists(self):
"""
Test that slapgrid deploys an instance if its Software Release exists and
instance.cfg in the Software Release exists.
"""
software = self.createSoftware()
partition = self.createPartition(software.url)
partition.install()
self.assertTrue(utils.launchBuildout.called)
def test_backward_compatibility_instance_is_deploying_if_template_cfg_is_used(self):
"""
Backward compatibility test, for old software releases.
Test that slapgrid deploys an instance if its Software Release exists and
template.cfg in the Software Release exists.
"""
software = self.createSoftware(empty=True)
open(os.path.join(software.software_path, 'template.cfg'), 'w').close()
partition = self.createPartition(software.url)
partition.install()
self.assertTrue(utils.launchBuildout.called)
def test_instance_slapgrid_raise_if_software_release_instance_profile_does_not_exist(self):
"""
Test that slapgrid raises XXX when deploying an instance if the Software Release
related to the instance is not correctly installed (i.e there is no
instance.cfg in it).
"""
software = self.createSoftware(empty=True)
partition = self.createPartition(software.url)
# XXX: What should it raise?
self.assertRaises(IOError, partition.install)
def test_instance_slapgrid_raise_if_software_release_does_not_exist(self):
"""
Test that slapgrid raises XXX when deploying an instance if the Software Release
related to the instance is not present at all (i.e its directory does not
exist at all).
"""
software = self.createSoftware(empty=True)
os.rmdir(software.software_path)
partition = self.createPartition(software.url)
# XXX: What should it raise?
self.assertRaises(IOError, partition.install)
class TestPartitionDestructionLock(MasterMixin, unittest.TestCase):
def setUp(self):
MasterMixin.setUp(self)
Partition.generateSupervisorConfigurationFile = FakeCallAndNoop()
utils.bootstrapBuildout = FakeCallAndNoop()
utils.launchBuildout = FakeCallAndStore()
def test_retention_lock_delay_creation(self):
delay = 42
software = self.createSoftware()
partition = self.createPartition(software.url, retention_delay=delay)
partition.install()
deployed_delay = int(open(partition.retention_lock_delay_file_path).read())
self.assertEqual(delay, deployed_delay)
def test_no_retention_lock_delay(self):
software = self.createSoftware()
partition = self.createPartition(software.url)
partition.install()
delay = open(partition.retention_lock_delay_file_path).read()
self.assertTrue(delay, '0')
self.assertTrue(partition.destroy())
def test_retention_lock_delay_does_not_change(self):
delay = 42
software = self.createSoftware()
partition = self.createPartition(software.url, retention_delay=delay)
partition.install()
partition.retention_delay = 23
# install/destroy many times
partition.install()
partition.destroy()
partition.destroy()
partition.install()
partition.destroy()
deployed_delay = int(open(partition.retention_lock_delay_file_path).read())
self.assertEqual(delay, deployed_delay)
def test_retention_lock_delay_is_respected(self):
delay = 2.0 / (3600 * 24)
software = self.createSoftware()
partition = self.createPartition(software.url, retention_delay=delay)
partition.install()
deployed_delay = float(open(partition.retention_lock_delay_file_path).read())
self.assertEqual(int(delay), int(deployed_delay))
self.assertFalse(partition.destroy())
time.sleep(1)
self.assertFalse(partition.destroy())
time.sleep(1)
self.assertTrue(partition.destroy())
def test_retention_lock_date_creation(self):
delay = 42
software = self.createSoftware()
partition = self.createPartition(software.url, retention_delay=delay)
partition.install()
self.assertFalse(os.path.exists(partition.retention_lock_date_file_path))
partition.destroy()
deployed_date = float(open(partition.retention_lock_date_file_path).read())
self.assertEqual(delay * 3600 * 24 + int(time.time()), int(deployed_date))
def test_retention_lock_date_does_not_change(self):
delay = 42
software = self.createSoftware()
partition = self.createPartition(software.url, retention_delay=delay)
now = time.time()
partition.install()
partition.destroy()
partition.retention_delay = 23
# install/destroy many times
partition.install()
partition.destroy()
partition.destroy()
partition.install()
partition.destroy()
deployed_date = float(open(partition.retention_lock_date_file_path).read())
self.assertEqual(delay * 3600 * 24 + int(now), int(deployed_date))
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapproxy/ 0000775 0000000 0000000 00000000000 13232036066 0030204 5 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapproxy/__init__.py0000664 0000000 0000000 00000165256 13232036066 0032334 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
# vim: set et sts=2:
##############################################################################
#
# Copyright (c) 2012, 2013, 2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import ConfigParser
import os
import logging
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import unittest
import xml_marshaller
from xml_marshaller.xml_marshaller import loads, dumps
import slapos.proxy
import slapos.proxy.views as views
import slapos.slap
import slapos.slap.slap
from slapos.util import sqlite_connect
import sqlite3
import pkg_resources
class WrongFormat(Exception):
pass
class ProxyOption(object):
"""
Will simulate options given to slapproxy
"""
def __init__(self, proxy_db):
self.verbose = True
self.database_uri = proxy_db
self.console = False
self.log_file = None
class BasicMixin(object):
def setUp(self):
"""
Will set files and start slapproxy
"""
self._tempdir = tempfile.mkdtemp()
logging.basicConfig(level=logging.DEBUG)
self.setFiles()
self.startProxy()
def createSlapOSConfigurationFile(self):
open(self.slapos_cfg, 'w').write("""[slapos]
software_root = %(tempdir)s/opt/slapgrid
instance_root = %(tempdir)s/srv/slapgrid
master_url = %(proxyaddr)s
computer_id = computer
[slapproxy]
host = 127.0.0.1
port = 8080
database_uri = %(tempdir)s/lib/proxy.db
""" % {'tempdir': self._tempdir, 'proxyaddr': self.proxyaddr})
def setFiles(self):
"""
Set environment to run slapproxy
"""
self.slapos_cfg = os.path.join(self._tempdir, 'slapos.cfg')
self.proxy_db = os.path.join(self._tempdir, 'lib', 'proxy.db')
self.proxyaddr = 'http://localhost:80/'
self.computer_id = 'computer'
self.createSlapOSConfigurationFile()
for directory in ['opt', 'srv', 'lib']:
path = os.path.join(self._tempdir, directory)
os.mkdir(path)
def startProxy(self):
"""
Set config for slapproxy and start it
"""
conf = slapos.proxy.ProxyConfig(logger=logging.getLogger())
configp = ConfigParser.SafeConfigParser()
configp.read(self.slapos_cfg)
conf.mergeConfig(ProxyOption(self.proxy_db), configp)
conf.setConfig()
views.app.config['TESTING'] = True
slapos.proxy.setupFlaskConfiguration(conf)
self.app_config = views.app.config
self.app = views.app.test_client()
def add_free_partition(self, partition_amount, computer_id=None):
"""
Will simulate a slapformat first run
and create "partition_amount" partitions
"""
if not computer_id:
computer_id = self.computer_id
computer_dict = {
'reference': computer_id,
'address': '123.456.789',
'netmask': 'fffffffff',
'partition_list': [],
}
for i in range(partition_amount):
partition_example = {
'reference': 'slappart%s' % i,
'address_list': [
{'addr': '1.2.3.4', 'netmask': '255.255.255.255'},
{'addr': '4.3.2.1', 'netmask': '255.255.255.255'}
],
'tap': {'name': 'tap0'},
}
computer_dict['partition_list'].append(partition_example)
request_dict = {
'computer_id': self.computer_id,
'xml': xml_marshaller.xml_marshaller.dumps(computer_dict),
}
rv = self.app.post('/loadComputerConfigurationFromXML',
data=request_dict)
self.assertEqual(rv._status_code, 200)
def tearDown(self):
"""
Remove files generated for test
"""
shutil.rmtree(self._tempdir, True)
views.is_schema_already_executed = False
class TestInformation(BasicMixin, unittest.TestCase):
"""
Test Basic response of slapproxy
"""
def test_getComputerInformation(self):
"""
Check that getComputerInformation return a Computer
and database is generated
"""
rv = self.app.get('/getComputerInformation?computer_id=%s' % self.computer_id)
self.assertIsInstance(
xml_marshaller.xml_marshaller.loads(rv.data),
slapos.slap.Computer)
self.assertTrue(os.path.exists(self.proxy_db))
def test_getFullComputerInformation(self):
"""
Check that getFullComputerInformation return a Computer
and database is generated
"""
rv = self.app.get('/getFullComputerInformation?computer_id=%s' % self.computer_id)
self.assertIsInstance(
xml_marshaller.xml_marshaller.loads(rv.data),
slapos.slap.Computer)
self.assertTrue(os.path.exists(self.proxy_db))
def test_getComputerInformation_wrong_computer(self):
"""
Test that computer information won't be given to a requester different
from the one specified
"""
with self.assertRaises(slapos.slap.NotFoundError):
self.app.get('/getComputerInformation?computer_id=%s42' % self.computer_id)
def test_partition_are_empty(self):
"""
Test that empty partition are empty :)
"""
self.add_free_partition(10)
rv = self.app.get('/getFullComputerInformation?computer_id=%s' % self.computer_id)
computer = xml_marshaller.xml_marshaller.loads(rv.data)
for slap_partition in computer._computer_partition_list:
self.assertIsNone(slap_partition._software_release_document)
self.assertEqual(slap_partition._requested_state, 'destroyed')
self.assertEqual(slap_partition._need_modification, 0)
def test_getSoftwareReleaseListFromSoftwareProduct_software_product_reference(self):
"""
Check that calling getSoftwareReleaseListFromSoftwareProduct() in slapproxy
using a software_product_reference as parameter behaves correctly.
"""
software_product_reference = 'my_product'
software_release_url = 'my_url'
self.app_config['software_product_list'] = {
software_product_reference: software_release_url
}
response = self.app.get('/getSoftwareReleaseListFromSoftwareProduct'
'?software_product_reference=%s' %\
software_product_reference)
software_release_url_list = xml_marshaller.xml_marshaller.loads(
response.data)
self.assertEqual(
software_release_url_list,
[software_release_url]
)
def test_getSoftwareReleaseListFromSoftwareProduct_noSoftwareProduct(self):
"""
Check that calling getSoftwareReleaseListFromSoftwareProduct() in slapproxy
using a software_product_reference that doesn't exist as parameter
returns empty list.
"""
self.app_config['software_product_list'] = {'random': 'random'}
response = self.app.get('/getSoftwareReleaseListFromSoftwareProduct'
'?software_product_reference=idonotexist')
software_release_url_list = xml_marshaller.xml_marshaller.loads(
response.data)
self.assertEqual(
software_release_url_list,
[]
)
def test_getSoftwareReleaseListFromSoftwareProduct_bothParameter(self):
"""
Test that a call to getSoftwareReleaseListFromSoftwareProduct with no
parameter raises
"""
self.assertRaises(
AssertionError,
self.app.get,
'/getSoftwareReleaseListFromSoftwareProduct'
'?software_product_reference=foo'
'&software_release_url=bar'
)
def test_getSoftwareReleaseListFromSoftwareProduct_noParameter(self):
"""
Test that a call to getSoftwareReleaseListFromSoftwareProduct with both
software_product_reference and software_release_url parameters raises
"""
self.assertRaises(
AssertionError,
self.app.get, '/getSoftwareReleaseListFromSoftwareProduct'
)
def test_getComputerPartitionCertificate(self):
"""
Tests that getComputerPartitionCertificate method is implemented in slapproxy.
"""
rv = self.app.get(
'/getComputerPartitionCertificate?computer_id=%s&computer_partition_id=%s' % (
self.computer_id, 'slappart0'))
response = xml_marshaller.xml_marshaller.loads(rv.data)
self.assertEquals({'certificate': '', 'key': ''}, response)
def test_computerBang(self):
"""
Tests that computerBang method is implemented in slapproxy.
"""
rv = self.app.post( '/computerBang?computer_id=%s' % ( self.computer_id))
response = xml_marshaller.xml_marshaller.loads(rv.data)
self.assertEquals('', response)
class MasterMixin(BasicMixin, unittest.TestCase):
"""
Define advanced tool for test proxy simulating behavior slap library tools
"""
def _requestComputerPartition(self, software_release, software_type, partition_reference,
partition_id=None,
shared=False, partition_parameter_kw=None, filter_kw=None,
state=None):
"""
Check parameters, call requestComputerPartition server method and return result
"""
if partition_parameter_kw is None:
partition_parameter_kw = {}
if filter_kw is None:
filter_kw = {}
# Let's enforce a default software type
if software_type is None:
software_type = 'default'
request_dict = {
'computer_id': self.computer_id,
'computer_partition_id': partition_id,
'software_release': software_release,
'software_type': software_type,
'partition_reference': partition_reference,
'shared_xml': xml_marshaller.xml_marshaller.dumps(shared),
'partition_parameter_xml': xml_marshaller.xml_marshaller.dumps(
partition_parameter_kw),
'filter_xml': xml_marshaller.xml_marshaller.dumps(filter_kw),
'state': xml_marshaller.xml_marshaller.dumps(state),
}
return self.app.post('/requestComputerPartition', data=request_dict)
def request(self, *args, **kwargs):
"""
Simulate a request with above parameters
Return response by server (a computer partition or an error)
"""
rv = self._requestComputerPartition(*args, **kwargs)
self.assertEqual(rv._status_code, 200)
xml = rv.data
software_instance = xml_marshaller.xml_marshaller.loads(xml)
computer_partition = slapos.slap.ComputerPartition(
software_instance.slap_computer_id,
software_instance.slap_computer_partition_id)
computer_partition.__dict__.update(software_instance.__dict__)
return computer_partition
def supply(self, url, computer_id=None, state=''):
if not computer_id:
computer_id = self.computer_id
request_dict = {'url':url, 'computer_id': computer_id, 'state':state}
rv = self.app.post('/supplySupply',
data=request_dict)
# XXX return a Software Release
def setConnectionDict(self, partition_id,
connection_dict, slave_reference=None):
self.app.post('/setComputerPartitionConnectionXml', data={
'computer_id': self.computer_id,
'computer_partition_id': partition_id,
'connection_xml': xml_marshaller.xml_marshaller.dumps(connection_dict),
'slave_reference': slave_reference})
def getPartitionInformation(self, computer_partition_id):
"""
Return computer information as stored in proxy for corresponding id
"""
rv = self.app.get('/getFullComputerInformation?computer_id=%s' % self.computer_id)
computer = xml_marshaller.xml_marshaller.loads(rv.data)
for instance in computer._computer_partition_list:
if instance._partition_id == computer_partition_id:
return instance
class TestRequest(MasterMixin):
"""
Set of tests for requests
"""
def test_request_consistent_parameters(self):
"""
Check that all different parameters related to requests (like instance_guid, state) are set and consistent
"""
self.add_free_partition(1)
partition = self.request('http://sr//', None, 'MyFirstInstance', 'slappart0')
self.assertEqual(partition.getState(), 'started')
self.assertEqual(partition.getInstanceGuid(), 'computer-slappart0')
def test_two_request_one_partition_free(self):
"""
Since slapproxy does not implement scope, providing two partition_id
values will still succeed, even if only one partition is available.
"""
self.add_free_partition(1)
self.assertIsInstance(self.request('http://sr//', None,
'MyFirstInstance', 'slappart2'),
slapos.slap.ComputerPartition)
self.assertIsInstance(self.request('http://sr//', None,
'MyFirstInstance', 'slappart3'),
slapos.slap.ComputerPartition)
def test_two_request_two_partition_free(self):
"""
If two requests are made with two available partition
both will succeed
"""
self.add_free_partition(2)
self.assertIsInstance(self.request('http://sr//', None,
'MyFirstInstance', 'slappart2'),
slapos.slap.ComputerPartition)
self.assertIsInstance(self.request('http://sr//', None,
'MyFirstInstance', 'slappart3'),
slapos.slap.ComputerPartition)
def test_two_same_request_from_one_partition(self):
"""
Request will return same partition for two equal requests
"""
self.add_free_partition(2)
self.assertEqual(
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2').__dict__,
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2').__dict__)
def test_request_propagate_partition_state(self):
"""
Request will return same partition for two equal requests
"""
self.add_free_partition(2)
partition_parent = self.request('http://sr//', None, 'MyFirstInstance')
parent_dict = partition_parent.__dict__
partition_child = self.request('http://sr//', None, 'MySubInstance', parent_dict['_partition_id'])
self.assertEqual(partition_parent.getState(), 'started')
self.assertEqual(partition_child.getState(), 'started')
partition_parent = self.request('http://sr//', None, 'MyFirstInstance', state='stopped')
partition_child = self.request('http://sr//', None, 'MySubInstance', parent_dict['_partition_id'])
self.assertEqual(partition_parent.getState(), 'stopped')
self.assertEqual(partition_child.getState(), 'stopped')
partition_parent = self.request('http://sr//', None, 'MyFirstInstance', state='started')
partition_child = self.request('http://sr//', None, 'MySubInstance', parent_dict['_partition_id'])
self.assertEqual(partition_parent.getState(), 'started')
self.assertEqual(partition_child.getState(), 'started')
def test_request_parent_started_children_stopped(self):
"""
Request will return same partition for two equal requests
"""
self.add_free_partition(2)
partition_parent = self.request('http://sr//', None, 'MyFirstInstance')
parent_dict = partition_parent.__dict__
partition_child = self.request('http://sr//', None, 'MySubInstance', parent_dict['_partition_id'])
self.assertEqual(partition_parent.getState(), 'started')
self.assertEqual(partition_child.getState(), 'started')
partition_parent = self.request('http://sr//', None, 'MyFirstInstance')
partition_child = self.request('http://sr//', None, 'MySubInstance', parent_dict['_partition_id'], state='stopped')
self.assertEqual(partition_parent.getState(), 'started')
self.assertEqual(partition_child.getState(), 'stopped')
def test_two_requests_with_different_parameters_but_same_reference(self):
"""
Request will return same partition for two different requests but will
only update parameters
"""
self.add_free_partition(2)
wanted_domain1 = 'fou.org'
wanted_domain2 = 'carzy.org'
request1 = self.request('http://sr//', None, 'MyFirstInstance', 'slappart2',
partition_parameter_kw={'domain': wanted_domain1})
request1_dict = request1.__dict__
requested_result1 = self.getPartitionInformation(
request1_dict['_partition_id'])
request2 = self.request('http://sr//', 'Papa', 'MyFirstInstance', 'slappart2',
partition_parameter_kw={'domain': wanted_domain2})
request2_dict = request2.__dict__
requested_result2 = self.getPartitionInformation(
request2_dict['_partition_id'])
# Test we received same partition
for key in ['_partition_id', '_computer_id']:
self.assertEqual(request1_dict[key], request2_dict[key])
# Test that only parameters changed
for key in requested_result2.__dict__:
if key not in ['_parameter_dict',
'_software_release_document']:
self.assertEqual(requested_result2.__dict__[key],
requested_result1.__dict__[key])
elif key in ['_software_release_document']:
self.assertEqual(requested_result2.__dict__[key].__dict__,
requested_result1.__dict__[key].__dict__)
#Test parameters where set correctly
self.assertEqual(wanted_domain1,
requested_result1._parameter_dict['domain'])
self.assertEqual(wanted_domain2,
requested_result2._parameter_dict['domain'])
def test_two_requests_with_different_parameters_and_sr_url_but_same_reference(self):
"""
Request will return same partition for two different requests but will
only update parameters
"""
self.add_free_partition(2)
wanted_domain1 = 'fou.org'
wanted_domain2 = 'carzy.org'
request1 = self.request('http://sr//', None, 'MyFirstInstance', 'slappart2',
partition_parameter_kw={'domain': wanted_domain1})
request1_dict = request1.__dict__
requested_result1 = self.getPartitionInformation(
request1_dict['_partition_id'])
request2 = self.request('http://sr1//', 'Papa', 'MyFirstInstance', 'slappart2',
partition_parameter_kw={'domain': wanted_domain2})
request2_dict = request2.__dict__
requested_result2 = self.getPartitionInformation(
request2_dict['_partition_id'])
# Test we received same partition
for key in ['_partition_id', '_computer_id']:
self.assertEqual(request1_dict[key], request2_dict[key])
# Test that parameters and software_release url changed
for key in requested_result2.__dict__:
if key not in ['_parameter_dict',
'_software_release_document']:
self.assertEqual(requested_result2.__dict__[key],
requested_result1.__dict__[key])
elif key in ['_software_release_document']:
# software_release will be updated
self.assertEqual(requested_result2.__dict__[key].__dict__['_software_release'],
'http://sr1//')
self.assertEqual(requested_result1.__dict__[key].__dict__['_software_release'],
'http://sr//')
#Test parameters where set correctly
self.assertEqual(wanted_domain1,
requested_result1._parameter_dict['domain'])
self.assertEqual(wanted_domain2,
requested_result2._parameter_dict['domain'])
def test_two_different_request_from_two_partition(self):
"""
Since slapproxy does not implement scope, two request with
different partition_id will still return the same partition.
"""
self.add_free_partition(2)
self.assertEqual(
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2').__dict__,
self.request('http://sr//', None, 'MyFirstInstance', 'slappart3').__dict__)
def test_two_different_request_from_one_partition(self):
"""
Two different request from same partition
will return two different partitions
"""
self.add_free_partition(2)
self.assertNotEqual(
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2').__dict__,
self.request('http://sr//', None, 'frontend', 'slappart2').__dict__)
def test_request_with_nonascii_parameters(self):
"""
Verify that request with non-ascii parameters is correctly accepted
"""
self.add_free_partition(1)
request = self.request('http://sr//', None, 'myinstance', 'slappart0',
partition_parameter_kw={'text': u'Привет Мир!'})
self.assertIsInstance(request, slapos.slap.ComputerPartition)
class TestSlaveRequest(MasterMixin):
"""
Test requests related to slave instances.
"""
def test_slave_request_no_corresponding_partition(self):
"""
Slave instance request will fail if no corresponding are found
"""
self.add_free_partition(2)
rv = self._requestComputerPartition('http://sr//', None, 'MyFirstInstance', 'slappart2', shared=True)
self.assertEqual(rv._status_code, 404)
def test_slave_request_set_parameters(self):
"""
Parameters sent in slave request must be put in slave master
slave instance list.
1. We request a slave instance we defined parameters
2. We check parameters are in the dictionnary defining slave in
slave master slave_instance_list
"""
self.add_free_partition(6)
# Provide partition
master_partition_id = self.request('http://sr//', None,
'MyFirstInstance', 'slappart4')._partition_id
# First request of slave instance
wanted_domain = 'fou.org'
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2', shared=True,
partition_parameter_kw={'domain': wanted_domain})
# Get updated information for master partition
master_partition = self.getPartitionInformation(master_partition_id)
our_slave = master_partition._parameter_dict['slave_instance_list'][0]
self.assertEqual(our_slave.get('domain'), wanted_domain)
def test_master_instance_with_no_slave(self):
"""
Test that a master instance with no requested slave
has an empty slave_instance_list parameter.
"""
self.add_free_partition(6)
# Provide partition
master_partition_id = self.request('http://sr//', None, 'MyMasterInstance', 'slappart4')._partition_id
master_partition = self.getPartitionInformation(master_partition_id)
self.assertEqual(len(master_partition._parameter_dict['slave_instance_list']), 0)
def test_slave_request_set_parameters_are_updated(self):
"""
Parameters sent in slave request must be put in slave master
slave instance list and updated when they change.
1. We request a slave instance we defined parameters
2. We check parameters are in the dictionnary defining slave in
slave master slave_instance_list
3. We request same slave instance with changed parameters
4. We check parameters are in the dictionnary defining slave in
slave master slave_instance_list have changed
"""
self.add_free_partition(6)
# Provide partition
master_partition_id = self.request('http://sr//', None,
'MyFirstInstance', 'slappart4')._partition_id
# First request of slave instance
wanted_domain_1 = 'crazy.org'
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2', shared=True,
partition_parameter_kw={'domain': wanted_domain_1})
# Get updated information for master partition
master_partition = self.getPartitionInformation(master_partition_id)
our_slave = master_partition._parameter_dict['slave_instance_list'][0]
self.assertEqual(our_slave.get('domain'), wanted_domain_1)
# Second request of slave instance
wanted_domain_2 = 'maluco.org'
self.request('http://sr//', None, 'MyFirstInstance', 'slappart2', shared=True,
partition_parameter_kw={'domain': wanted_domain_2})
# Get updated information for master partition
master_partition = self.getPartitionInformation(master_partition_id)
our_slave = master_partition._parameter_dict['slave_instance_list'][0]
self.assertNotEqual(our_slave.get('domain'), wanted_domain_1)
self.assertEqual(our_slave.get('domain'), wanted_domain_2)
def test_slave_request_set_connection_parameters(self):
"""
Parameters set in slave instance by master instance must be put in slave instance connection parameters.
1. We request a slave instance
2. We set connection parameters for this slave instance
2. We check parameter is present when we do request() for the slave.
"""
self.add_free_partition(6)
# Provide partition
master_partition_id = self.request('http://sr//', None, 'MyMasterInstance', 'slappart4')._partition_id
# First request of slave instance
self.request('http://sr//', None, 'MySlaveInstance', 'slappart2', shared=True)
# Set connection parameter
master_partition = self.getPartitionInformation(master_partition_id)
# XXX change slave reference to be compatible with multiple nodes
self.setConnectionDict(partition_id=master_partition._partition_id,
connection_dict={'foo': 'bar'},
slave_reference=master_partition._parameter_dict['slave_instance_list'][0]['slave_reference'])
# Get updated information for slave partition
slave_partition = self.request('http://sr//', None, 'MySlaveInstance', 'slappart2', shared=True)
self.assertEqual(slave_partition.getConnectionParameter('foo'), 'bar')
def test_slave_request_one_corresponding_partition(self):
"""
Successfull request slave instance follow these steps:
1. Provide one corresponding partition
2. Ask for Slave instance. But no connection parameters
But slave is added to Master Instance slave list
3. Master Instance get updated information (including slave list)
4. Master instance post information about slave connection parameters
5. Ask for slave instance is successfull and return a computer instance
with connection information
"""
self.add_free_partition(6)
# Provide partition
master_partition_id = self.request('http://sr//', None,
'MyFirstInstance', 'slappart4')._partition_id
# First request of slave instance
name = 'MyFirstInstance'
requester = 'slappart2'
our_slave = self.request('http://sr//', None, name, requester, shared=True)
self.assertIsInstance(our_slave, slapos.slap.ComputerPartition)
self.assertEqual(our_slave._connection_dict, {})
# Get updated information for master partition
master_partition = self.getPartitionInformation(master_partition_id)
slave_for_master = master_partition._parameter_dict['slave_instance_list'][0]
# Send information about slave
slave_address = {'url': '%s.master.com'}
self.setConnectionDict(partition_id=master_partition._partition_id,
connection_dict=slave_address,
slave_reference=slave_for_master['slave_reference'])
# Successfull slave request with connection parameters
our_slave = self.request('http://sr//', None,
name, requester, shared=True)
self.assertIsInstance(our_slave, slapos.slap.ComputerPartition)
self.assertEqual(slave_address, our_slave._connection_dict)
def test_slave_request_instance_guid(self):
"""
Test that instance_guid support behaves correctly.
Warning: proxy doesn't gives unique id of instance, but gives instead unique id
of partition.
"""
self.add_free_partition(1)
partition = self.request('http://sr//', None, 'MyInstance', 'slappart1')
slave = self.request('http://sr//', None, 'MySlaveInstance', 'slappart1',
shared=True, filter_kw=dict(instance_guid=partition._instance_guid))
self.assertEqual(slave._partition_id, partition._partition_id)
class TestMultiNodeSupport(MasterMixin):
def test_multi_node_support_different_software_release_list(self):
"""
Test that two different registered computers have their own
Software Release list.
"""
self.add_free_partition(6, computer_id='COMP-0')
self.add_free_partition(6, computer_id='COMP-1')
software_release_1_url = 'http://sr1'
software_release_2_url = 'http://sr2'
software_release_3_url = 'http://sr3'
self.supply(software_release_1_url, 'COMP-0')
self.supply(software_release_2_url, 'COMP-1')
self.supply(software_release_3_url, 'COMP-0')
self.supply(software_release_3_url, 'COMP-1')
computer_default = loads(self.app.get('/getFullComputerInformation?computer_id=%s' % self.computer_id).data)
computer_0 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-0').data)
computer_1 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-1').data)
self.assertEqual(len(computer_default._software_release_list), 0)
self.assertEqual(len(computer_0._software_release_list), 2)
self.assertEqual(len(computer_1._software_release_list), 2)
self.assertEqual(
computer_0._software_release_list[0]._software_release,
software_release_1_url
)
self.assertEqual(
computer_0._software_release_list[0]._computer_guid,
'COMP-0'
)
self.assertEqual(
computer_0._software_release_list[1]._software_release,
software_release_3_url
)
self.assertEqual(
computer_0._software_release_list[1]._computer_guid,
'COMP-0'
)
self.assertEqual(
computer_1._software_release_list[0]._software_release,
software_release_2_url
)
self.assertEqual(
computer_1._software_release_list[0]._computer_guid,
'COMP-1'
)
self.assertEqual(
computer_1._software_release_list[1]._software_release,
software_release_3_url
)
self.assertEqual(
computer_1._software_release_list[1]._computer_guid,
'COMP-1'
)
def test_multi_node_support_remove_software_release(self):
"""
Test that removing a software from a Computer doesn't
affect other computer
"""
software_release_url = 'http://sr'
self.add_free_partition(6, computer_id='COMP-0')
self.add_free_partition(6, computer_id='COMP-1')
self.supply(software_release_url, 'COMP-0')
self.supply(software_release_url, 'COMP-1')
self.supply(software_release_url, 'COMP-0', state='destroyed')
computer_0 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-0').data)
computer_1 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-1').data)
self.assertEqual(len(computer_0._software_release_list), 0)
self.assertEqual(len(computer_1._software_release_list), 1)
self.assertEqual(
computer_1._software_release_list[0]._software_release,
software_release_url
)
self.assertEqual(
computer_1._software_release_list[0]._computer_guid,
'COMP-1'
)
def test_multi_node_support_instance_default_computer(self):
"""
Test that instance request behaves correctly with default computer
"""
software_release_url = 'http://sr'
computer_0_id = 'COMP-0'
computer_1_id = 'COMP-1'
self.add_free_partition(6, computer_id=computer_0_id)
self.add_free_partition(6, computer_id=computer_1_id)
# Request without SLA -> goes to default computer only.
# It should fail if we didn't registered partitions for default computer
# (default computer is always registered)
rv = self._requestComputerPartition('http://sr//', None, 'MyFirstInstance', 'slappart2')
self.assertEqual(rv._status_code, 404)
rv = self._requestComputerPartition('http://sr//', None, 'MyFirstInstance', 'slappart2',
filter_kw={'computer_guid':self.computer_id})
self.assertEqual(rv._status_code, 404)
# Register default computer: deployment works
self.add_free_partition(1)
self.request('http://sr//', None, 'MyFirstInstance', 'slappart0')
computer_default = loads(self.app.get(
'/getFullComputerInformation?computer_id=%s' % self.computer_id).data)
self.assertEqual(len(computer_default._software_release_list), 0)
# No free space on default computer: request without SLA fails
rv = self._requestComputerPartition('http://sr//', None, 'CanIHasPartition', 'slappart2',
filter_kw={'computer_guid':self.computer_id})
self.assertEqual(rv._status_code, 404)
def test_multi_node_support_instance(self):
"""
Test that instance request behaves correctly with several
registered computers
"""
software_release_url = 'http://sr'
computer_0_id = 'COMP-0'
computer_1_id = 'COMP-1'
software_release_1 = 'http://sr//'
software_release_2 = 'http://othersr//'
self.add_free_partition(2, computer_id=computer_1_id)
# Deploy to first non-default computer using SLA
# It should fail since computer is not registered
rv = self._requestComputerPartition(software_release_1, None, 'MyFirstInstance', 'slappart2', filter_kw={'computer_guid':computer_0_id})
self.assertEqual(rv._status_code, 404)
self.add_free_partition(2, computer_id=computer_0_id)
# Deploy to first non-default computer using SLA
partition = self.request(software_release_1, None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_0_id})
self.assertEqual(partition.getState(), 'started')
self.assertEqual(partition._partition_id, 'slappart0')
self.assertEqual(partition._computer_id, computer_0_id)
# All other instances should be empty
computer_0 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-0').data)
computer_1 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-1').data)
self.assertEqual(computer_0._computer_partition_list[0]._software_release_document._software_release, software_release_1)
self.assertTrue(computer_0._computer_partition_list[1]._software_release_document == None)
self.assertTrue(computer_1._computer_partition_list[0]._software_release_document == None)
self.assertTrue(computer_1._computer_partition_list[1]._software_release_document == None)
# Deploy to second non-default computer using SLA
partition = self.request(software_release_2, None, 'MySecondInstance', 'slappart0', filter_kw={'computer_guid':computer_1_id})
self.assertEqual(partition.getState(), 'started')
self.assertEqual(partition._partition_id, 'slappart0')
self.assertEqual(partition._computer_id, computer_1_id)
# The two remaining instances should be free, and MyfirstInstance should still be there
computer_0 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-0').data)
computer_1 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-1').data)
self.assertEqual(computer_0._computer_partition_list[0]._software_release_document._software_release, software_release_1)
self.assertTrue(computer_0._computer_partition_list[1]._software_release_document == None)
self.assertEqual(computer_1._computer_partition_list[0]._software_release_document._software_release, software_release_2)
self.assertTrue(computer_1._computer_partition_list[1]._software_release_document == None)
def test_multi_node_support_change_instance_state(self):
"""
Test that destroying an instance (i.e change state) from a Computer doesn't
affect other computer
"""
software_release_url = 'http://sr'
computer_0_id = 'COMP-0'
computer_1_id = 'COMP-1'
self.add_free_partition(6, computer_id=computer_0_id)
self.add_free_partition(6, computer_id=computer_1_id)
partition_first = self.request('http://sr//', None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_0_id})
partition_second = self.request('http://sr//', None, 'MySecondInstance', 'slappart0', filter_kw={'computer_guid':computer_1_id})
partition_first = self.request('http://sr//', None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_0_id}, state='stopped')
computer_0 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-0').data)
computer_1 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-1').data)
self.assertEqual(computer_0._computer_partition_list[0].getState(), 'stopped')
self.assertEqual(computer_0._computer_partition_list[1].getState(), 'destroyed')
self.assertEqual(computer_1._computer_partition_list[0].getState(), 'started')
self.assertEqual(computer_1._computer_partition_list[1].getState(), 'destroyed')
def test_multi_node_support_same_reference(self):
"""
Test that requesting an instance with same reference to two
different nodes behaves like master: once an instance is assigned to a node,
changing SLA will not change node.
"""
software_release_url = 'http://sr'
computer_0_id = 'COMP-0'
computer_1_id = 'COMP-1'
self.add_free_partition(2, computer_id=computer_0_id)
self.add_free_partition(2, computer_id=computer_1_id)
partition = self.request('http://sr//', None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_0_id})
partition = self.request('http://sr//', None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_1_id})
self.assertEqual(partition._computer_id, computer_0_id)
computer_1 = loads(self.app.get('/getFullComputerInformation?computer_id=COMP-1').data)
self.assertTrue(computer_1._computer_partition_list[0]._software_release_document == None)
self.assertTrue(computer_1._computer_partition_list[1]._software_release_document == None)
def test_multi_node_support_slave_instance(self):
"""
Test that slave instances are correctly deployed if SLA is specified
but deployed only on default computer if not specified (i.e not deployed
if default computer doesn't have corresponding master instance).
"""
computer_0_id = 'COMP-0'
computer_1_id = 'COMP-1'
self.add_free_partition(2, computer_id=computer_0_id)
self.add_free_partition(2, computer_id=computer_1_id)
self.add_free_partition(2)
self.request('http://sr2//', None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_0_id})
self.request('http://sr//', None, 'MyOtherInstance', 'slappart0', filter_kw={'computer_guid':computer_1_id})
# Request slave without SLA: will fail
rv = self._requestComputerPartition('http://sr//', None, 'MySlaveInstance', 'slappart2', shared=True)
self.assertEqual(rv._status_code, 404)
# Request slave with SLA on incorrect computer: will fail
rv = self._requestComputerPartition('http://sr//', None, 'MySlaveInstance', 'slappart2', shared=True, filter_kw={'computer_guid':computer_0_id})
self.assertEqual(rv._status_code, 404)
# Request computer on correct computer: will succeed
partition = self.request('http://sr//', None, 'MySlaveInstance', 'slappart2', shared=True, filter_kw={'computer_guid':computer_1_id})
self.assertEqual(partition._computer_id, computer_1_id)
def test_multi_node_support_instance_guid(self):
"""
Test that instance_guid support behaves correctly with multiple nodes.
Warning: proxy doesn't gives unique id of instance, but gives instead unique id
of partition.
"""
computer_0_id = 'COMP-0'
computer_1_id = 'COMP-1'
self.add_free_partition(2, computer_id=computer_0_id)
self.add_free_partition(2, computer_id=computer_1_id)
self.add_free_partition(2)
partition_computer_0 = self.request('http://sr2//', None, 'MyFirstInstance', 'slappart0', filter_kw={'computer_guid':computer_0_id})
partition_computer_1 = self.request('http://sr//', None, 'MyOtherInstance', 'slappart0', filter_kw={'computer_guid':computer_1_id})
partition_computer_default = self.request('http://sr//', None, 'MyThirdInstance', 'slappart0')
self.assertEqual(partition_computer_0.getInstanceGuid(), 'COMP-0-slappart0')
self.assertEqual(partition_computer_1.getInstanceGuid(), 'COMP-1-slappart0')
self.assertEqual(partition_computer_default.getInstanceGuid(), 'computer-slappart0')
def test_multi_node_support_getComputerInformation(self):
"""
Test that computer information will not be given if computer is not registered.
Test that it still should work for the 'default' computer specified in slapos config
even if not yet registered.
Test that computer information is given if computer is registered.
"""
new_computer_id = '%s42' % self.computer_id
with self.assertRaises(slapos.slap.NotFoundError):
self.app.get('/getComputerInformation?computer_id=%s42' % new_computer_id)
try:
self.app.get('/getComputerInformation?computer_id=%s' % self.computer_id)
except slapos.slap.NotFoundError:
self.fail('Could not fetch informations for default computer.')
self.add_free_partition(1, computer_id=new_computer_id)
try:
self.app.get('/getComputerInformation?computer_id=%s' % new_computer_id)
except slapos.slap.NotFoundError:
self.fail('Could not fetch informations for registered computer.')
class TestMultiMasterSupport(MasterMixin):
"""
Test multimaster support in slapproxy.
"""
external_software_release = 'http://mywebsite.me/exteral_software_release.cfg'
software_release_not_in_list = 'http://mywebsite.me/exteral_software_release_not_listed.cfg'
def setUp(self):
self.addCleanup(self.stopExternalProxy)
# XXX don't use lo
self.external_proxy_host = os.environ.get('LOCAL_IPV4', '127.0.0.1')
self.external_proxy_port = 8281
self.external_master_url = 'http://%s:%s' % (self.external_proxy_host, self.external_proxy_port)
self.external_computer_id = 'external_computer'
self.external_proxy_slap = slapos.slap.slap()
self.external_proxy_slap.initializeConnection(self.external_master_url)
super(TestMultiMasterSupport, self).setUp()
self.db = sqlite_connect(self.proxy_db)
self.external_slapproxy_configuration_file_location = os.path.join(
self._tempdir, 'external_slapos.cfg')
self.createExternalProxyConfigurationFile()
self.startExternalProxy()
def tearDown(self):
super(TestMultiMasterSupport, self).tearDown()
def createExternalProxyConfigurationFile(self):
open(self.external_slapproxy_configuration_file_location, 'w').write("""[slapos]
computer_id = %(external_computer_id)s
[slapproxy]
host = %(host)s
port = %(port)s
database_uri = %(tempdir)s/lib/external_proxy.db
""" % {
'tempdir': self._tempdir,
'host': self.external_proxy_host,
'port': self.external_proxy_port,
'external_computer_id': self.external_computer_id
})
def startExternalProxy(self):
"""
Start external slapproxy
"""
logging.getLogger().info('Starting external proxy, listening to %s:%s' % (self.external_proxy_host, self.external_proxy_port))
# XXX This uses a hack to run current code of slapos.core
import slapos
self.external_proxy_process = subprocess.Popen(
[
sys.executable, '%s/../cli/entry.py' % os.path.dirname(slapos.tests.__file__),
'proxy', 'start', '--cfg', self.external_slapproxy_configuration_file_location
],
env={"PYTHONPATH": ':'.join(sys.path)}
)
# Wait a bit for proxy to be started
attempts = 0
while (attempts < 20):
try:
self.external_proxy_slap._connection_helper.GET('/')
except slapos.slap.NotFoundError:
break
except slapos.slap.ConnectionError, socket.error:
attempts = attempts + 1
time.sleep(0.1)
else:
self.fail('Could not start external proxy.')
def stopExternalProxy(self):
self.external_proxy_process.kill()
def createSlapOSConfigurationFile(self):
"""
Overwrite default slapos configuration file to enable specific multimaster
behaviours.
"""
configuration = pkg_resources.resource_stream(
'slapos.tests.slapproxy', 'slapos_multimaster.cfg.in'
).read() % {
'tempdir': self._tempdir, 'proxyaddr': self.proxyaddr,
'external_proxy_host': self.external_proxy_host,
'external_proxy_port': self.external_proxy_port
}
open(self.slapos_cfg, 'w').write(configuration)
def external_proxy_add_free_partition(self, partition_amount, computer_id=None):
"""
Will simulate a slapformat first run
and create "partition_amount" partitions
"""
if not computer_id:
computer_id = self.external_computer_id
computer_dict = {
'reference': computer_id,
'address': '123.456.789',
'netmask': 'fffffffff',
'partition_list': [],
}
for i in range(partition_amount):
partition_example = {
'reference': 'slappart%s' % i,
'address_list': [
{'addr': '1.2.3.4', 'netmask': '255.255.255.255'},
{'addr': '4.3.2.1', 'netmask': '255.255.255.255'}
],
'tap': {'name': 'tap0'},
}
computer_dict['partition_list'].append(partition_example)
request_dict = {
'computer_id': self.computer_id,
'xml': xml_marshaller.xml_marshaller.dumps(computer_dict),
}
self.external_proxy_slap._connection_helper.POST('/loadComputerConfigurationFromXML',
data=request_dict)
def _checkInstanceIsFowarded(self, name, partition_parameter_kw, software_release):
"""
Test there is no instance on local proxy.
Test there is instance on external proxy.
Test there is instance reference in external table of databse of local proxy.
"""
# Test it has been correctly added to local database
forwarded_instance_list = slapos.proxy.views.execute_db('forwarded_partition_request', 'SELECT * from %s', db=self.db)
self.assertEqual(len(forwarded_instance_list), 1)
forwarded_instance = forwarded_instance_list[0]
self.assertEqual(forwarded_instance['partition_reference'], name)
self.assertEqual(forwarded_instance['master_url'], self.external_master_url)
# Test there is nothing allocated locally
computer = loads(self.app.get(
'/getFullComputerInformation?computer_id=%s' % self.computer_id
).data)
self.assertEqual(
computer._computer_partition_list[0]._software_release_document,
None
)
# Test there is an instance allocated in external master
external_slap = slapos.slap.slap()
external_slap.initializeConnection(self.external_master_url)
external_computer = external_slap.registerComputer(self.external_computer_id)
external_partition = external_computer.getComputerPartitionList()[0]
for k, v in partition_parameter_kw.iteritems():
self.assertEqual(
external_partition.getInstanceParameter(k),
v
)
self.assertEqual(
external_partition._software_release_document._software_release,
software_release
)
def _checkInstanceIsAllocatedLocally(self, name, partition_parameter_kw, software_release):
"""
Test there is one instance on local proxy.
Test there NO is instance reference in external table of databse of local proxy.
Test there is not instance on external proxy.
"""
# Test it has NOT been added to local database
forwarded_instance_list = slapos.proxy.views.execute_db('forwarded_partition_request', 'SELECT * from %s', db=self.db)
self.assertEqual(len(forwarded_instance_list), 0)
# Test there is an instance allocated locally
computer = loads(self.app.get(
'/getFullComputerInformation?computer_id=%s' % self.computer_id
).data)
partition = computer._computer_partition_list[0]
for k, v in partition_parameter_kw.iteritems():
self.assertEqual(
partition.getInstanceParameter(k),
v
)
self.assertEqual(
partition._software_release_document._software_release,
software_release
)
# Test there is NOT instance allocated in external master
external_slap = slapos.slap.slap()
external_slap.initializeConnection(self.external_master_url)
external_computer = external_slap.registerComputer(self.external_computer_id)
external_partition = external_computer.getComputerPartitionList()[0]
self.assertEqual(
external_partition._software_release_document,
None
)
def testForwardToMasterInList(self):
"""
Test that explicitely asking a master_url in SLA causes
proxy to forward request to this master.
"""
dummy_parameter_dict = {'foo': 'bar'}
instance_reference = 'MyFirstInstance'
self.add_free_partition(1)
self.external_proxy_add_free_partition(1)
filter_kw = {'master_url': self.external_master_url}
partition = self.request(self.software_release_not_in_list, None, instance_reference, 'slappart0',
filter_kw=filter_kw, partition_parameter_kw=dummy_parameter_dict)
self._checkInstanceIsFowarded(instance_reference, dummy_parameter_dict, self.software_release_not_in_list)
self.assertEqual(
partition._master_url,
self.external_master_url
)
def testForwardToMasterNotInList(self):
"""
Test that explicitely asking a master_url in SLA causes
proxy to refuse to forward if this master_url is not whitelisted
"""
self.add_free_partition(1)
self.external_proxy_add_free_partition(1)
filter_kw = {'master_url': self.external_master_url + 'bad'}
rv = self._requestComputerPartition(self.software_release_not_in_list, None, 'MyFirstInstance', 'slappart0', filter_kw=filter_kw)
self.assertEqual(rv._status_code, 404)
def testForwardRequest_SoftwareReleaseList(self):
"""
Test that instance request is automatically forwarded
if its Software Release matches list.
"""
dummy_parameter_dict = {'foo': 'bar'}
instance_reference = 'MyFirstInstance'
self.add_free_partition(1)
self.external_proxy_add_free_partition(1)
partition = self.request(self.external_software_release, None, instance_reference, 'slappart0',
partition_parameter_kw=dummy_parameter_dict)
self._checkInstanceIsFowarded(instance_reference, dummy_parameter_dict, self.external_software_release)
def testRequestToCurrentMaster(self):
"""
Explicitely ask deployment of an instance to current master
"""
self.add_free_partition(1)
self.external_proxy_add_free_partition(1)
instance_reference = 'MyFirstInstance'
dummy_parameter_dict = {'foo': 'bar'}
filter_kw = {'master_url': self.proxyaddr}
self.request(self.software_release_not_in_list, None, instance_reference, 'slappart0',
filter_kw=filter_kw, partition_parameter_kw=dummy_parameter_dict)
self._checkInstanceIsAllocatedLocally(instance_reference, dummy_parameter_dict, self.software_release_not_in_list)
def testRequestExplicitelyOnExternalMasterThenRequestAgain(self):
"""
Request an instance that will get forwarded to another an instance.
Test that subsequent request without SLA doesn't forward
"""
dummy_parameter_dict = {'foo': 'bar'}
self.testForwardToMasterInList()
partition = self.request(self.software_release_not_in_list, None, 'MyFirstInstance', 'slappart0', partition_parameter_kw=dummy_parameter_dict)
self.assertEqual(
getattr(partition, '_master_url', None),
None
)
# Test it has not been removed from local database (we keep track)
forwarded_instance_list = slapos.proxy.views.execute_db('forwarded_partition_request', 'SELECT * from %s', db=self.db)
self.assertEqual(len(forwarded_instance_list), 1)
# Test there is an instance allocated locally
computer = loads(self.app.get(
'/getFullComputerInformation?computer_id=%s' % self.computer_id
).data)
partition = computer._computer_partition_list[0]
for k, v in dummy_parameter_dict.iteritems():
self.assertEqual(
partition.getInstanceParameter(k),
v
)
self.assertEqual(
partition._software_release_document._software_release,
self.software_release_not_in_list
)
# XXX: when testing new schema version,
# rename to "TestMigrateVersion10ToLatest" and test accordingly.
# Of course, also test version 11 to latest (should be 12).
class TestMigrateVersion10To11(TestInformation, TestRequest, TestSlaveRequest, TestMultiNodeSupport):
"""
Test that old database version are automatically migrated without failure
"""
def setUp(self):
super(TestMigrateVersion10To11, self).setUp()
schema = pkg_resources.resource_stream('slapos.tests.slapproxy', 'database_dump_version_10.sql')
schema = schema.read() % dict(version='11')
self.db = sqlite_connect(self.proxy_db)
self.db.cursor().executescript(schema)
self.db.commit()
def test_automatic_migration(self):
table_list = ('software11', 'computer11', 'partition11', 'slave11', 'partition_network11')
for table in table_list:
self.assertRaises(sqlite3.OperationalError, self.db.execute, "SELECT name FROM computer11")
# Run a dummy request to cause migration
self.app.get('/getComputerInformation?computer_id=computer')
# Check some partition parameters
self.assertEqual(
loads(self.app.get('/getComputerInformation?computer_id=computer').data)._computer_partition_list[0]._parameter_dict['slap_software_type'],
'production'
)
# Lower level tests
computer_list = self.db.execute("SELECT * FROM computer11").fetchall()
self.assertEqual(
computer_list,
[(u'computer', u'127.0.0.1', u'255.255.255.255')]
)
software_list = self.db.execute("SELECT * FROM software11").fetchall()
self.assertEqual(
software_list,
[(u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u'computer')]
)
partition_list = self.db.execute("select * from partition11").fetchall()
self.assertEqual(
partition_list,
[(u'slappart0', u'computer', u'busy', u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u'\n\n {\n "site-id": "erp5"\n }\n}\n\n', None, None, u'production', u'slapos', None, u'started'), (u'slappart1', u'computer', u'busy', u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u"\n\n", u'\n\n mysql://127.0.0.1:45678/erp5\n\n', None, u'mariadb', u'MariaDB DataBase', u'slappart0', u'started'), (u'slappart2', u'computer', u'busy', u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u'\n\n \n\n', u'\n\n cloudooo://127.0.0.1:23000/\n\n', None, u'cloudooo', u'Cloudooo', u'slappart0', u'started'), (u'slappart3', u'computer', u'busy', u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u"\n\n", u'\n\n memcached://127.0.0.1:11000/\n\n', None, u'memcached', u'Memcached', u'slappart0', u'started'), (u'slappart4', u'computer', u'busy', u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u"\n\n", u'\n\n memcached://127.0.0.1:13301/\n\n', None, u'kumofs', u'KumoFS', u'slappart0', u'started'), (u'slappart5', u'computer', u'busy', u'/srv/slapgrid//srv//runner/project//slapos/software.cfg', u'\n\n memcached://127.0.0.1:13301/\n memcached://127.0.0.1:11000/\n cloudooo://127.0.0.1:23000/\n\n', u'\n\n https://[fc00::1]:10001\n\n', None, u'tidstorage', u'TidStorage', u'slappart0', u'started'), (u'slappart6', u'computer', u'free', None, None, None, None, None, None, None, u'started'), (u'slappart7', u'computer', u'free', None, None, None, None, None, None, None, u'started'), (u'slappart8', u'computer', u'free', None, None, None, None, None, None, None, u'started'), (u'slappart9', u'computer', u'free', None, None, None, None, None, None, None, u'started')]
)
slave_list = self.db.execute("select * from slave11").fetchall()
self.assertEqual(
slave_list,
[]
)
partition_network_list = self.db.execute("select * from partition_network11").fetchall()
self.assertEqual(
partition_network_list,
[(u'slappart0', u'computer', u'slappart0', u'127.0.0.1', u'255.255.255.255'), (u'slappart0', u'computer', u'slappart0', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart1', u'computer', u'slappart1', u'127.0.0.1', u'255.255.255.255'), (u'slappart1', u'computer', u'slappart1', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart2', u'computer', u'slappart2', u'127.0.0.1', u'255.255.255.255'), (u'slappart2', u'computer', u'slappart2', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart3', u'computer', u'slappart3', u'127.0.0.1', u'255.255.255.255'), (u'slappart3', u'computer', u'slappart3', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart4', u'computer', u'slappart4', u'127.0.0.1', u'255.255.255.255'), (u'slappart4', u'computer', u'slappart4', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart5', u'computer', u'slappart5', u'127.0.0.1', u'255.255.255.255'), (u'slappart5', u'computer', u'slappart5', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart6', u'computer', u'slappart6', u'127.0.0.1', u'255.255.255.255'), (u'slappart6', u'computer', u'slappart6', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart7', u'computer', u'slappart7', u'127.0.0.1', u'255.255.255.255'), (u'slappart7', u'computer', u'slappart7', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart8', u'computer', u'slappart8', u'127.0.0.1', u'255.255.255.255'), (u'slappart8', u'computer', u'slappart8', u'fc00::1', u'ffff:ffff:ffff::'), (u'slappart9', u'computer', u'slappart9', u'127.0.0.1', u'255.255.255.255'), (u'slappart9', u'computer', u'slappart9', u'fc00::1', u'ffff:ffff:ffff::')]
)
# Override several tests that needs an empty database
@unittest.skip("Not implemented")
def test_multi_node_support_different_software_release_list(self):
pass
@unittest.skip("Not implemented")
def test_multi_node_support_instance_default_computer(self):
pass
@unittest.skip("Not implemented")
def test_multi_node_support_instance_guid(self):
pass
@unittest.skip("Not implemented")
def test_partition_are_empty(self):
pass
@unittest.skip("Not implemented")
def test_request_consistent_parameters(self):
pass
database_dump_version_10.sql 0000664 0000000 0000000 00000013471 13232036066 0035512 0 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapproxy -- Real world example of webrunner database running version 10 of proxy db.
PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE software10 (url VARCHAR(255) UNIQUE);
INSERT INTO "software10" VALUES('/srv/slapgrid//srv//runner/project//slapos/software.cfg');
CREATE TABLE computer10 (
address VARCHAR(255),
netmask VARCHAR(255),
CONSTRAINT uniq PRIMARY KEY (address, netmask));
INSERT INTO "computer10" VALUES('127.0.0.1','255.255.255.255');
CREATE TABLE partition10 (
reference VARCHAR(255) UNIQUE,
slap_state VARCHAR(255) DEFAULT 'free',
software_release VARCHAR(255),
xml TEXT,
connection_xml TEXT,
slave_instance_list TEXT,
software_type VARCHAR(255),
partition_reference VARCHAR(255),
requested_by VARCHAR(255), -- only used for debugging,
-- slapproxy does not support proper scope
requested_state VARCHAR(255) NOT NULL DEFAULT 'started'
);
INSERT INTO "partition10" VALUES('slappart0','busy','/srv/slapgrid//srv//runner/project//slapos/software.cfg','
{
"site-id": "erp5"
}
}
',NULL,NULL,'production','slapos',NULL,'started');
INSERT INTO "partition10" VALUES('slappart1','busy','/srv/slapgrid//srv//runner/project//slapos/software.cfg','
','
mysql://127.0.0.1:45678/erp5
',NULL,'mariadb','MariaDB DataBase','slappart0','started');
INSERT INTO "partition10" VALUES('slappart2','busy','/srv/slapgrid//srv//runner/project//slapos/software.cfg','
','
cloudooo://127.0.0.1:23000/
',NULL,'cloudooo','Cloudooo','slappart0','started');
INSERT INTO "partition10" VALUES('slappart3','busy','/srv/slapgrid//srv//runner/project//slapos/software.cfg','
','
memcached://127.0.0.1:11000/
',NULL,'memcached','Memcached','slappart0','started');
INSERT INTO "partition10" VALUES('slappart4','busy','/srv/slapgrid//srv//runner/project//slapos/software.cfg','
','
memcached://127.0.0.1:13301/
',NULL,'kumofs','KumoFS','slappart0','started');
INSERT INTO "partition10" VALUES('slappart5','busy','/srv/slapgrid//srv//runner/project//slapos/software.cfg','
memcached://127.0.0.1:13301/
memcached://127.0.0.1:11000/
cloudooo://127.0.0.1:23000/
','
https://[fc00::1]:10001
',NULL,'tidstorage','TidStorage','slappart0','started');
INSERT INTO "partition10" VALUES('slappart6','free',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'started');
INSERT INTO "partition10" VALUES('slappart7','free',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'started');
INSERT INTO "partition10" VALUES('slappart8','free',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'started');
INSERT INTO "partition10" VALUES('slappart9','free',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'started');
CREATE TABLE slave10 (
reference VARCHAR(255) UNIQUE,
connection_xml TEXT,
hosted_by VARCHAR(255),
asked_by VARCHAR(255) -- only used for debugging,
-- slapproxy does not support proper scope
);
CREATE TABLE partition_network10 (
partition_reference VARCHAR(255),
reference VARCHAR(255),
address VARCHAR(255),
netmask VARCHAR(255)
);
INSERT INTO "partition_network10" VALUES('slappart0','slappart0','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart0','slappart0','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart1','slappart1','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart1','slappart1','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart2','slappart2','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart2','slappart2','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart3','slappart3','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart3','slappart3','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart4','slappart4','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart4','slappart4','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart5','slappart5','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart5','slappart5','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart6','slappart6','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart6','slappart6','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart7','slappart7','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart7','slappart7','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart8','slappart8','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart8','slappart8','fc00::1','ffff:ffff:ffff::');
INSERT INTO "partition_network10" VALUES('slappart9','slappart9','127.0.0.1','255.255.255.255');
INSERT INTO "partition_network10" VALUES('slappart9','slappart9','fc00::1','ffff:ffff:ffff::');
COMMIT;
slapos_multimaster.cfg.in 0000664 0000000 0000000 00000002247 13232036066 0035147 0 ustar 00root root 0000000 0000000 slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/slapproxy [slapos]
software_root = %(tempdir)s/opt/slapgrid
instance_root = %(tempdir)s/srv/slapgrid
master_url = %(proxyaddr)s
computer_id = computer
[slapproxy]
host = 127.0.0.1
port = 8080
database_uri = %(tempdir)s/lib/proxy.db
# Here goes the list of slapos masters that slapproxy can contact
# Each section beginning by multimaster is a different SlapOS Master, represented by arbitrary name.
# For each section, you need to specify the URL of the SlapOS Master.
# For each section, you can specify if needed the location of key/certificate used to authenticate to this slapOS Master.
# For each section, you can specify a list of Software Releases. Any instance request matching this Softwrare Release will be automatically forwarded to this SlapOS Master and will not be allocated locally.
[multimaster/https://slap.vifib.com]
key = /path/to/cert.key
cert = /path/to/cert.cert
# XXX add wildcard support for SR list.
software_release_list =
http://something.io/software.cfg
/some/arbitrary/local/unix/path
[multimaster/http://%(external_proxy_host)s:%(external_proxy_port)s]
# No certificate here: it is http.
software_release_list =
http://mywebsite.me/exteral_software_release.cfg
slapos.core-15dedb62a3565373c77977e5ebd2e56b4ffe341a-slapos-tests/slapos/tests/util.py 0000664 0000000 0000000 00000012204 13232036066 0027471 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import slapos.util
from slapos.util import string_to_boolean
import tempfile
import unittest
import shutil
from pwd import getpwnam
class TestUtil(unittest.TestCase):
"""
Tests methods available in the slapos.util module.
"""
def test_mkdir_p_new_directory(self):
"""
Test that mkdir_p recursively creates a directory.
"""
root_directory = tempfile.mkdtemp()
wanted_directory = os.path.join(root_directory, 'foo', 'bar')
slapos.util.mkdir_p(wanted_directory)
self.assertTrue(os.path.isdir(wanted_directory))
shutil.rmtree(root_directory)
def test_mkdir_already_existing(self):
"""
Check that mkdir_p doesn't raise if directory already exist.
"""
root_directory = tempfile.mkdtemp()
slapos.util.mkdir_p(root_directory)
self.assertTrue(os.path.isdir(root_directory))
shutil.rmtree(root_directory)
def test_chown_directory(self):
"""
Test that slapos.util.chownDirectory correctly changes owner.
Note: requires root privileges.
"""
root_slaptest = tempfile.mkdtemp()
wanted_directory0 = os.path.join(root_slaptest, 'slap-write0')
wanted_directory1 = os.path.join(root_slaptest, 'slap-write0', 'write-slap1')
wanted_directory2 = os.path.join(root_slaptest, 'slap-write0', 'write-slap1', 'write-teste2')
wanted_directory_mkdir0 = os.makedirs(wanted_directory0, mode=0777)
wanted_directory_mkdir1 = os.makedirs(wanted_directory1, mode=0777)
wanted_directory_mkdir2 = os.makedirs(wanted_directory2, mode=0777)
create_file_txt = tempfile.mkstemp(suffix='.txt', prefix='tmp', dir=wanted_directory2, text=True)
user = 'nobody'
try:
uid = getpwnam(user)[2]
gid = getpwnam(user)[3]
except KeyError:
raise unittest.SkipTest("user %s doesn't exist." % user)
if os.getuid() != 0:
raise unittest.SkipTest("No root privileges, impossible to chown.")
slapos.util.chownDirectory(root_slaptest, uid, gid)
uid_check_root_slaptest = os.stat(root_slaptest)[4]
gid_check_root_slaptest = os.stat(root_slaptest)[5]
self.assertEquals(uid, uid_check_root_slaptest)
self.assertEquals(gid, gid_check_root_slaptest)
uid_check_wanted_directory0 = os.stat(wanted_directory0)[4]
gid_check_wanted_directory0 = os.stat(wanted_directory0)[5]
self.assertEquals(uid, uid_check_wanted_directory0)
self.assertEquals(gid, gid_check_wanted_directory0)
uid_check_wanted_directory1 = os.stat(wanted_directory1)[4]
gid_check_wanted_directory1 = os.stat(wanted_directory1)[5]
self.assertEquals(uid, uid_check_wanted_directory1)
self.assertEquals(gid, gid_check_wanted_directory1)
uid_check_wanted_directory2 = os.stat(wanted_directory2)[4]
gid_check_wanted_directory2 = os.stat(wanted_directory2)[5]
self.assertEquals(uid, uid_check_wanted_directory2)
self.assertEquals(gid, gid_check_wanted_directory2)
uid_check_file_txt = os.stat(create_file_txt[1])[4]
gid_check_file_txt = os.stat(create_file_txt[1])[5]
self.assertEquals(uid, uid_check_file_txt)
self.assertEquals(gid, gid_check_file_txt)
shutil.rmtree(root_slaptest)
def test_string_to_boolean_with_true_values(self):
"""
Check that mkdir_p doesn't raise if directory already exist.
"""
for value in ['true', 'True', 'TRUE']:
self.assertTrue(string_to_boolean(value))
def test_string_to_boolean_with_false_values(self):
"""
Check that mkdir_p doesn't raise if directory already exist.
"""
for value in ['false', 'False', 'False']:
self.assertFalse(string_to_boolean(value))
def test_string_to_boolean_with_incorrect_values(self):
"""
Check that mkdir_p doesn't raise if directory already exist.
"""
for value in [True, False, 1, '1', 't', 'tru', 'truelle', 'f', 'fals', 'falsey']:
self.assertRaises(ValueError, string_to_boolean, value)
if __name__ == '__main__':
unittest.main()