# -*- coding: utf-8 -*- ############################################################################## # # Copyright (c) 2012 Vifib SARL and Contributors. # All Rights Reserved. # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsibility of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # guarantees and support are strongly advised to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## import glob import logging import slapos.format import slapos.util import slapos.manager.cpuset import unittest from slapos.cli.format import FormatCommand from slapos.cli.entry import SlapOSApp from argparse import Namespace import netaddr import shutil import socket # for mocking import grp import netifaces import os import pwd import time import mock from .slapgrid import DummyManager USER_LIST = [] GROUP_LIST = [] INTERFACE_DICT = {} def file_content(file_path): """Read file(s) content.""" if isinstance(file_path, (list, tuple)): return [file_content(fx) for fx in file_path] with open(file_path, "rt") as fi: return fi.read().strip() def file_write(stuff, file_path): """Write stuff into file_path.""" with open(file_path, "wt") as fo: fo.write(stuff) class FakeConfig: pass class TestLoggerHandler(logging.Handler): def __init__(self, *args, **kwargs): self.bucket = [] logging.Handler.__init__(self, *args, **kwargs) def emit(self, record): self.bucket.append(record.msg) class FakeCallAndRead: def __init__(self): self.external_command_list = [] def __call__(self, argument_list, raise_on_error=True): retval = 0, 'UP' global INTERFACE_DICT if 'useradd' in argument_list: print argument_list global USER_LIST username = argument_list[-1] if username == '-r': username = argument_list[-2] USER_LIST.append(username) elif 'groupadd' in argument_list: global GROUP_LIST GROUP_LIST.append(argument_list[-1]) elif argument_list[:3] == ['ip', 'addr', 'add']: ip, interface = argument_list[3], argument_list[5] if ':' not in ip: netmask = netaddr.strategy.ipv4.int_to_str( netaddr.strategy.ipv4.prefix_to_netmask[int(ip.split('/')[1])]) ip = ip.split('/')[0] INTERFACE_DICT[interface][socket.AF_INET].append({'addr': ip, 'netmask': netmask}) else: netmask = netaddr.strategy.ipv6.int_to_str( netaddr.strategy.ipv6.prefix_to_netmask[int(ip.split('/')[1])]) ip = ip.split('/')[0] INTERFACE_DICT[interface][socket.AF_INET6].append({'addr': ip, 'netmask': netmask}) # stabilise by mangling ip to just ip string argument_list[3] = 'ip/%s' % netmask elif argument_list[:3] == ['ip', 'addr', 'list'] or \ argument_list[:4] == ['ip', '-6', 'addr', 'list']: retval = 0, str(INTERFACE_DICT) elif argument_list[:3] == ['ip', 'route', 'show']: retval = 0, 'OK' elif argument_list[:3] == ['route', 'add', '-host']: retval = 0, 'OK' self.external_command_list.append(' '.join(argument_list)) return retval class LoggableWrapper: def __init__(self, logger, name): self.__logger = logger self.__name = name def __call__(self, *args, **kwargs): arg_list = [repr(x) for x in args] + [ '%s=%r' % (x, y) for x, y in kwargs.iteritems()] self.__logger.debug('%s(%s)' % (self.__name, ', '.join(arg_list))) class TimeMock: @classmethod def sleep(self, seconds): return class GrpMock: @classmethod def getgrnam(self, name): global GROUP_LIST if name in GROUP_LIST: return True raise KeyError class PwdMock: @classmethod def getpwnam(self, name): global USER_LIST if name in USER_LIST: class PwdResult: def __init__(self, name): self.pw_name = name self.pw_uid = self.pw_gid = USER_LIST.index(name) def __getitem__(self, index): if index == 0: return self.pw_name if index == 2: return self.pw_uid if index == 3: return self.pw_gid return PwdResult(name) raise KeyError("User \"{}\" not in global USER_LIST {!s}".format(name, USER_LIST)) class NetifacesMock: @classmethod def ifaddresses(self, name): global INTERFACE_DICT if name in INTERFACE_DICT: return INTERFACE_DICT[name] raise ValueError("Interface \"{}\" not in INTERFACE_DICT {!s}".format( name, INTERFACE_DICT)) @classmethod def interfaces(self): global INTERFACE_DICT return INTERFACE_DICT.keys() class SlaposUtilMock: @classmethod def chownDirectory(*args, **kw): pass class SlapformatMixin(unittest.TestCase): # keep big diffs maxDiff = None def patchNetifaces(self): self.netifaces = NetifacesMock() self.saved_netifaces = {} for fake in vars(NetifacesMock): self.saved_netifaces[fake] = getattr(netifaces, fake, None) setattr(netifaces, fake, getattr(self.netifaces, fake)) def restoreNetifaces(self): for name, original_value in self.saved_netifaces.items(): setattr(netifaces, name, original_value) del self.saved_netifaces def patchPwd(self): self.saved_pwd = {} for fake in vars(PwdMock): self.saved_pwd[fake] = getattr(pwd, fake, None) setattr(pwd, fake, getattr(PwdMock, fake)) def restorePwd(self): for name, original_value in self.saved_pwd.items(): setattr(pwd, name, original_value) del self.saved_pwd def patchTime(self): self.saved_time = {} for fake in vars(TimeMock): self.saved_time[fake] = getattr(time, fake, None) setattr(time, fake, getattr(TimeMock, fake)) def restoreTime(self): for name, original_value in self.saved_time.items(): setattr(time, name, original_value) del self.saved_time def patchGrp(self): self.saved_grp = {} for fake in vars(GrpMock): self.saved_grp[fake] = getattr(grp, fake, None) setattr(grp, fake, getattr(GrpMock, fake)) def restoreGrp(self): for name, original_value in self.saved_grp.items(): setattr(grp, name, original_value) del self.saved_grp def patchOs(self, logger): self.saved_os = {} for fake in ['mkdir', 'chown', 'chmod', 'makedirs']: self.saved_os[fake] = getattr(os, fake, None) f = LoggableWrapper(logger, fake) setattr(os, fake, f) def restoreOs(self): if not hasattr(self, 'saved_os'): return # os was never patched or already restored for name, original_value in self.saved_os.items(): setattr(os, name, original_value) del self.saved_os def patchSlaposUtil(self): self.saved_slapos_util = {} for fake in ['chownDirectory']: self.saved_slapos_util[fake] = getattr(slapos.util, fake, None) setattr(slapos.util, fake, getattr(SlaposUtilMock, fake)) def restoreSlaposUtil(self): for name, original_value in self.saved_slapos_util.items(): setattr(slapos.util, name, original_value) del self.saved_slapos_util def setUp(self): config = FakeConfig() config.dry_run = True config.verbose = True logger = logging.getLogger('testcatch') logger.setLevel(logging.DEBUG) self.test_result = TestLoggerHandler() logger.addHandler(self.test_result) config.logger = logger if hasattr(self, "logger"): raise ValueError("{} already has logger attached".format(self.__class__.__name__)) self.logger = logger self.partition = slapos.format.Partition('partition', '/part_path', slapos.format.User('testuser'), [], None) global USER_LIST USER_LIST = [] global GROUP_LIST GROUP_LIST = [] global INTERFACE_DICT INTERFACE_DICT = {} self.real_callAndRead = slapos.format.callAndRead self.fakeCallAndRead = FakeCallAndRead() slapos.format.callAndRead = self.fakeCallAndRead self.patchOs(logger) self.patchGrp() self.patchTime() self.patchPwd() self.patchNetifaces() self.patchSlaposUtil() def tearDown(self): self.restoreOs() self.restoreGrp() self.restoreTime() self.restorePwd() self.restoreNetifaces() self.restoreSlaposUtil() slapos.format.callAndRead = self.real_callAndRead class TestComputer(SlapformatMixin): def test_getAddress_empty_computer(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root') self.assertEqual(computer.getAddress(), {'netmask': None, 'addr': None}) @unittest.skip("Not implemented") def test_construct_empty(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root') computer.format() @unittest.skip("Not implemented") def test_construct_empty_prepared(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'), partition_list=[]) computer.format() self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chown('/software_root', 0, 0)", "chmod('/software_root', 493)"], self.test_result.bucket) self.assertEqual([ 'ip addr list lo', 'groupadd slapsoft', 'useradd -d /software_root -g slapsoft slapsoft -r' ], self.fakeCallAndRead.external_command_list) def test_construct_empty_prepared_no_alter_user(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'), partition_list=[]) computer.format(alter_user=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chmod('/software_root', 493)"], self.test_result.bucket) self.assertEqual([], self.fakeCallAndRead.external_command_list) @unittest.skip("Not implemented") def test_construct_empty_prepared_no_alter_network(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'), partition_list=[]) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chown('/software_root', 0, 0)", "chmod('/software_root', 493)"], self.test_result.bucket) self.assertEqual([ 'ip addr list lo', 'groupadd slapsoft', 'useradd -d /software_root -g slapsoft slapsoft -r' ], self.fakeCallAndRead.external_command_list) def test_construct_empty_prepared_no_alter_network_user(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'), partition_list=[]) computer.format(alter_network=False, alter_user=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chmod('/software_root', 493)"], self.test_result.bucket) self.assertEqual([], self.fakeCallAndRead.external_command_list) @unittest.skip("Not implemented") def test_construct_prepared(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')), ]) global INTERFACE_DICT INTERFACE_DICT['myinterface'] = { socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1', 'netmask': '255.255.255.0'}], socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } computer.format() self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chown('/software_root', 0, 0)", "chmod('/software_root', 493)", "mkdir('/instance_root/partition', 488)", "chown('/instance_root/partition', 0, 0)", "chmod('/instance_root/partition', 488)" ], self.test_result.bucket) self.assertEqual([ 'ip addr list myinterface', 'groupadd slapsoft', 'useradd -d /software_root -g slapsoft slapsoft -r', 'groupadd testuser', 'useradd -d /instance_root/partition -g testuser -G slapsoft testuser -r', 'ip tuntap add dev tap mode tap user testuser', 'ip link set tap up', 'ip addr add ip/255.255.255.255 dev myinterface', 'ip addr list myinterface', 'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface', 'ip addr list myinterface', ], self.fakeCallAndRead.external_command_list) def test_construct_prepared_no_alter_user(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')), ]) global USER_LIST USER_LIST = ['testuser'] global INTERFACE_DICT INTERFACE_DICT['myinterface'] = { socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1', 'netmask': '255.255.255.0'}], socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } INTERFACE_DICT['tap'] = { socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } computer.format(alter_user=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chmod('/software_root', 493)", "mkdir('/instance_root/partition', 488)", "chmod('/instance_root/partition', 488)" ], self.test_result.bucket) self.assertEqual([ 'ip tuntap add dev tap mode tap user testuser', 'ip link set tap up', 'ip addr add ip/ffff:ffff:ffff:ffff:ffff:: dev tap', 'ip -6 addr list tap', 'ip route show 10.0.0.2', 'ip route add 10.0.0.2 dev tap', 'ip addr add ip/255.255.255.255 dev myinterface', # 'ip addr list myinterface', 'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface', 'ip -6 addr list myinterface', ], self.fakeCallAndRead.external_command_list) def test_construct_prepared_tap_no_alter_user(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', tap_gateway_interface='eth1', interface=slapos.format.Interface( logger=self.logger, name='iface', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')), ]) global USER_LIST USER_LIST = ['testuser'] global INTERFACE_DICT INTERFACE_DICT['iface'] = { socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1', 'netmask': '255.255.255.0'}], socket.AF_INET6: [{'addr': '2a01:e35:2e27:3456::e59c', 'netmask': 'ffff:ffff:ffff:ffff:ffff::'}] } INTERFACE_DICT['eth1'] = { socket.AF_INET: [{'addr': '10.8.0.1', 'broadcast': '10.8.0.254', 'netmask': '255.255.255.0'}] } INTERFACE_DICT['tap'] = { socket.AF_INET6: [{'addr': '2a01:e35:2e27:3456::e59c', 'netmask': 'ffff:ffff:ffff:ffff:ffff::'}] } computer.format(alter_user=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chmod('/software_root', 493)", "mkdir('/instance_root/partition', 488)", "chmod('/instance_root/partition', 488)" ], self.test_result.bucket) self.assertEqual([ 'ip tuntap add dev tap mode tap user testuser', 'ip link set tap up', 'ip addr add ip/ffff:ffff:ffff:ffff:ffff:ffff:: dev tap', 'ip -6 addr list tap', 'ip route show 10.8.0.2', 'ip route add 10.8.0.2 dev tap', 'ip addr add ip/255.255.255.255 dev iface', 'ip addr add ip/ffff:ffff:ffff:ffff:ffff:: dev iface', 'ip -6 addr list iface' ], self.fakeCallAndRead.external_command_list) partition = computer.partition_list[0] self.assertEqual(partition.tap.ipv4_addr, '10.8.0.2') self.assertEqual(partition.tap.ipv4_netmask, '255.255.255.0') self.assertEqual(partition.tap.ipv4_gateway, '10.8.0.1') self.assertEqual(partition.tap.ipv4_network, '10.8.0.0') @unittest.skip("Not implemented") def test_construct_prepared_no_alter_network(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')), ]) global INTERFACE_DICT INTERFACE_DICT['myinterface'] = { socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1', 'netmask': '255.255.255.0'}], socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } computer.format(alter_network=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chown('/software_root', 0, 0)", "chmod('/software_root', 493)", "mkdir('/instance_root/partition', 488)", "chown('/instance_root/partition', 0, 0)", "chmod('/instance_root/partition', 488)" ], self.test_result.bucket) self.assertEqual([ # 'ip addr list myinterface', 'groupadd slapsoft', 'useradd -d /software_root -g slapsoft slapsoft -r', 'groupadd testuser', 'useradd -d /instance_root/partition -g testuser -G slapsoft testuser -r', # 'ip addr add ip/255.255.255.255 dev myinterface', # 'ip addr list myinterface', # 'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface', # 'ip addr list myinterface', ], self.fakeCallAndRead.external_command_list) def test_construct_prepared_no_alter_network_user(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')), ]) global INTERFACE_DICT INTERFACE_DICT['myinterface'] = { socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1', 'netmask': '255.255.255.0'}], socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } computer.format(alter_network=False, alter_user=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chmod('/software_root', 493)", "mkdir('/instance_root/partition', 488)", "chmod('/instance_root/partition', 488)" ], self.test_result.bucket) self.assertEqual([ 'ip addr add ip/255.255.255.255 dev myinterface', # 'ip addr list myinterface', 'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface', 'ip -6 addr list myinterface', ], self.fakeCallAndRead.external_command_list) def test_construct_use_unique_local_address_block(self): """ Test that slapformat creates a unique local address in the interface. """ global USER_LIST USER_LIST = ['root'] computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='/software_root', interface=slapos.format.Interface( logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')), ]) global INTERFACE_DICT INTERFACE_DICT['myinterface'] = { socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1', 'netmask': '255.255.255.0'}], socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } computer.format(use_unique_local_address_block=True, alter_user=False, create_tap=False) self.assertEqual([ "makedirs('/instance_root', 493)", "makedirs('/software_root', 493)", "chmod('/software_root', 493)", "mkdir('/instance_root/partition', 488)", "chmod('/instance_root/partition', 488)" ], self.test_result.bucket) self.assertEqual([ 'ip address add dev myinterface fd00::1/64', 'ip addr add ip/255.255.255.255 dev myinterface', 'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface', 'ip -6 addr list myinterface' ], self.fakeCallAndRead.external_command_list) class SlapGridPartitionMock: def __init__(self, partition): self.partition = partition self.instance_path = partition.path def getUserGroupId(self): return (0, 0) class TestComputerWithCPUSet(SlapformatMixin): cpuset_path = "/tmp/cpuset/" task_write_mode = "at" # append insted of write tasks PIDs for the tests def setUp(self): logging.getLogger("slapos.manager.cpuset").addHandler( logging.StreamHandler()) super(TestComputerWithCPUSet, self).setUp() self.restoreOs() if os.path.isdir("/tmp/slapgrid/"): shutil.rmtree("/tmp/slapgrid/") os.mkdir("/tmp/slapgrid/") if os.path.isdir(self.cpuset_path): shutil.rmtree(self.cpuset_path) os.mkdir(self.cpuset_path) file_write("0,1-3", os.path.join(self.cpuset_path, "cpuset.cpus")) file_write("\n".join(("1000", "1001", "1002", "")), os.path.join(self.cpuset_path, "tasks")) self.cpu_list = [0, 1, 2, 3] global USER_LIST, INTERFACE_DICT USER_LIST = ['testuser'] INTERFACE_DICT['lo'] = { socket.AF_INET: [ {'addr': '127.0.0.1', 'broadcast': '127.0.255.255', 'netmask': '255.255.0.0'}], socket.AF_INET6: [ {'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] } from slapos.manager.cpuset import Manager self.orig_cpuset_path = Manager.cpuset_path self.orig_task_write_mode = Manager.task_write_mode Manager.cpuset_path = self.cpuset_path Manager.task_write_mode = self.task_write_mode self.computer = slapos.format.Computer('computer', software_user='testuser', instance_root='/tmp/slapgrid/instance_root', software_root='/tmp/slapgrid/software_root', interface=slapos.format.Interface( logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'), partition_list=[ slapos.format.Partition( 'partition', '/tmp/slapgrid/instance_root/part1', slapos.format.User('testuser'), [], tap=None), ], config={ "manager_list": "cpuset", "power_user_list": "testuser root" } ) # self.patchOs(self.logger) def tearDown(self): """Cleanup temporary test folders.""" from slapos.manager.cpuset import Manager Manager.cpuset_path = self.orig_cpuset_path Manager.task_write_mode = self.orig_task_write_mode super(TestComputerWithCPUSet, self).tearDown() shutil.rmtree("/tmp/slapgrid/") if self.cpuset_path.startswith("/tmp"): shutil.rmtree(self.cpuset_path) logging.getLogger("slapos.manager.cpuset") def test_positive_cgroups(self): """Positive test of cgroups.""" # Test parsing "cpuset.cpus" file self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list) # This should created per-cpu groups and move all tasks in CPU pool into cpu0 self.computer.format(alter_network=False, alter_user=False) # Test files creation for exclusive CPUs for cpu_id in self.cpu_list: cpu_n_path = os.path.join(self.cpuset_path, "cpu" + str(cpu_id)) self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus"))) self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive"))) if cpu_id > 0: self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks"))) # Test moving tasks from generic core to private core # request PID 1001 to be moved to its private CPU request_file_path = os.path.join(self.computer.partition_list[0].path, slapos.manager.cpuset.Manager.cpu_exclusive_file) file_write("1001\n", request_file_path) # Simulate slapos instance call to perform the actual movement self.computer._manager_list[0].instance( SlapGridPartitionMock(self.computer.partition_list[0])) # Simulate cgroup behaviour - empty tasks in the pool file_write("", os.path.join(self.cpuset_path, "tasks")) # Test that format moved all PIDs from CPU pool into CPU0 tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split() self.assertIn("1000", tasks_at_cpu0) # test if the moving suceeded into any provate CPUS (id>0) self.assertTrue(any("1001" in file_content(exclusive_task) for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks")))) self.assertIn("1002", tasks_at_cpu0) # slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file self.assertEqual("", file_content(request_file_path).strip()) class TestPartition(SlapformatMixin): def test_createPath_no_alter_user(self): self.partition.createPath(False) self.assertEqual( [ "mkdir('/part_path', 488)", "chmod('/part_path', 488)" ], self.test_result.bucket ) class TestUser(SlapformatMixin): def test_create(self): user = slapos.format.User('doesnotexistsyet') user.setPath('/doesnotexistsyet') user.create() self.assertEqual([ 'groupadd doesnotexistsyet', 'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/sh '\ 'doesnotexistsyet -r', 'passwd -l doesnotexistsyet' ], self.fakeCallAndRead.external_command_list) def test_create_additional_groups(self): user = slapos.format.User('doesnotexistsyet', ['additionalgroup1', 'additionalgroup2']) user.setPath('/doesnotexistsyet') user.create() self.assertEqual([ 'groupadd doesnotexistsyet', 'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/sh -G '\ 'additionalgroup1,additionalgroup2 doesnotexistsyet -r', 'passwd -l doesnotexistsyet' ], self.fakeCallAndRead.external_command_list) def test_create_group_exists(self): global GROUP_LIST GROUP_LIST = ['testuser'] user = slapos.format.User('testuser') user.setPath('/testuser') user.create() self.assertEqual([ 'useradd -d /testuser -g testuser -s /bin/sh testuser -r', 'passwd -l testuser' ], self.fakeCallAndRead.external_command_list) def test_create_user_exists_additional_groups(self): global USER_LIST USER_LIST = ['testuser'] user = slapos.format.User('testuser', ['additionalgroup1', 'additionalgroup2']) user.setPath('/testuser') user.create() self.assertEqual([ 'groupadd testuser', 'usermod -d /testuser -g testuser -s /bin/sh -G '\ 'additionalgroup1,additionalgroup2 testuser', 'passwd -l testuser' ], self.fakeCallAndRead.external_command_list) def test_create_user_exists(self): global USER_LIST USER_LIST = ['testuser'] user = slapos.format.User('testuser') user.setPath('/testuser') user.create() self.assertEqual([ 'groupadd testuser', 'usermod -d /testuser -g testuser -s /bin/sh testuser', 'passwd -l testuser' ], self.fakeCallAndRead.external_command_list) def test_create_user_group_exists(self): global USER_LIST USER_LIST = ['testuser'] global GROUP_LIST GROUP_LIST = ['testuser'] user = slapos.format.User('testuser') user.setPath('/testuser') user.create() self.assertEqual([ 'usermod -d /testuser -g testuser -s /bin/sh testuser', 'passwd -l testuser' ], self.fakeCallAndRead.external_command_list) def test_isAvailable(self): global USER_LIST USER_LIST = ['testuser'] user = slapos.format.User('testuser') self.assertTrue(user.isAvailable()) def test_isAvailable_notAvailable(self): user = slapos.format.User('doesnotexistsyet') self.assertFalse(user.isAvailable()) class TestSlapformatManagerLifecycle(SlapformatMixin): def test_partition_format(self): computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root') manager = DummyManager() computer._manager_list = [manager] computer.format(alter_user=False, alter_network=False) self.assertEqual(manager.sequence, ['format', 'formatTearDown']) class TestFormatConfig(SlapformatMixin): def fake_take_action(self, *args): app = SlapOSApp() format_command = FormatCommand(app, Namespace()) parsed_args = format_command.get_parser("slapos node format fake").parse_args(args) configp = format_command.fetch_config(parsed_args) conf = slapos.format.FormatConfig(logger=self.logger) conf.mergeConfig(parsed_args, configp) conf.setConfig() return conf def test_empty_cmdline_options(self): conf = self.fake_take_action() self.assertTrue(conf.alter_network) self.assertTrue(conf.alter_user) def test_cmdline1_options(self): conf = self.fake_take_action( "--alter_network", "False", "--alter_user", "True") self.assertFalse(conf.alter_network) self.assertTrue(conf.alter_user) # TODO add more tests with config file if __name__ == '__main__': unittest.main()