Commit 710ef80e authored by Łukasz Nowak's avatar Łukasz Nowak

Merge branch 'core-testing'

parents 29f6349e 44e84d66
...@@ -57,4 +57,5 @@ setup(name=name, ...@@ -57,4 +57,5 @@ setup(name=name,
'slapproxy = slapos.proxy:main', 'slapproxy = slapos.proxy:main',
] ]
}, },
test_suite="slapos.tests",
) )
...@@ -27,13 +27,13 @@ ...@@ -27,13 +27,13 @@
############################################################################## ##############################################################################
from optparse import OptionParser, Option from optparse import OptionParser, Option
from xml_marshaller import xml_marshaller from xml_marshaller import xml_marshaller
from pwd import getpwnam
import ConfigParser import ConfigParser
import grp import grp
import logging import logging
import netaddr import netaddr
import netifaces import netifaces
import os import os
import pwd
import random import random
import slapos.slap as slap import slapos.slap as slap
import socket import socket
...@@ -317,7 +317,7 @@ class Computer: ...@@ -317,7 +317,7 @@ class Computer:
slapsoft.path = self.software_root slapsoft.path = self.software_root
if alter_user: if alter_user:
slapsoft.create() slapsoft.create()
slapsoft_pw = getpwnam(slapsoft.name) slapsoft_pw = pwd.getpwnam(slapsoft.name)
os.chown(self.software_root, slapsoft_pw.pw_uid, slapsoft_pw.pw_gid) os.chown(self.software_root, slapsoft_pw.pw_uid, slapsoft_pw.pw_gid)
os.chmod(self.software_root, 0755) os.chmod(self.software_root, 0755)
...@@ -415,7 +415,7 @@ class Partition: ...@@ -415,7 +415,7 @@ class Partition:
if not os.path.exists(self.path): if not os.path.exists(self.path):
os.mkdir(self.path, 0750) os.mkdir(self.path, 0750)
if alter_user: if alter_user:
owner_pw = getpwnam(owner.name) owner_pw = pwd.getpwnam(owner.name)
os.chown(self.path, owner_pw.pw_uid, owner_pw.pw_gid) os.chown(self.path, owner_pw.pw_uid, owner_pw.pw_gid)
os.chmod(self.path, 0750) os.chmod(self.path, 0750)
...@@ -457,7 +457,7 @@ class User: ...@@ -457,7 +457,7 @@ class User:
user_parameter_list.extend(['-G', ','.join(self.additional_group_list)]) user_parameter_list.extend(['-G', ','.join(self.additional_group_list)])
user_parameter_list.append(self.name) user_parameter_list.append(self.name)
try: try:
getpwnam(self.name) pwd.getpwnam(self.name)
except KeyError: except KeyError:
callAndRead(['useradd'] + user_parameter_list) callAndRead(['useradd'] + user_parameter_list)
else: else:
...@@ -475,7 +475,7 @@ class User: ...@@ -475,7 +475,7 @@ class User:
""" """
try: try:
getpwnam(self.name) pwd.getpwnam(self.name)
return True return True
except KeyError: except KeyError:
...@@ -572,7 +572,7 @@ class Tap: ...@@ -572,7 +572,7 @@ class Tap:
owner_id = int(open(check_file).read().strip()) owner_id = int(open(check_file).read().strip())
except Exception: except Exception:
pass pass
if (owner_id is None) or (owner_id != getpwnam(owner.name).pw_uid): if (owner_id is None) or (owner_id != pwd.getpwnam(owner.name).pw_uid):
callAndRead(['tunctl', '-t', self.name, '-u', owner.name]) callAndRead(['tunctl', '-t', self.name, '-u', owner.name])
callAndRead(['ip', 'link', 'set', self.name, 'up']) callAndRead(['ip', 'link', 'set', self.name, 'up'])
...@@ -584,7 +584,7 @@ class Tap: ...@@ -584,7 +584,7 @@ class Tap:
class Bridge: class Bridge:
"Bridge represent a bridge on the system" "Bridge represent a bridge on the system"
def __init__(self, name, ipv4_local_network, ipv6_interface): def __init__(self, name, ipv4_local_network, ipv6_interface=None):
""" """
Attributes: Attributes:
name: String, the name of the bridge name: String, the name of the bridge
...@@ -846,11 +846,14 @@ class Parser(OptionParser): ...@@ -846,11 +846,14 @@ class Parser(OptionParser):
help="Shall slapformat alter network configuration [default: True]"), help="Shall slapformat alter network configuration [default: True]"),
]) ])
def check_args(self): def check_args(self, args):
""" """
Check arguments Check arguments
""" """
(options, args) = self.parse_args() if args:
(options, args) = self.parse_args(list(args))
else:
(options, args) = self.parse_args()
if len(args) != 1: if len(args) != 1:
self.error("Incorrect number of arguments") self.error("Incorrect number of arguments")
return options, args[0] return options, args[0]
...@@ -1093,17 +1096,17 @@ class Config: ...@@ -1093,17 +1096,17 @@ class Config:
self.computer_xml = os.path.abspath(self.computer_xml) self.computer_xml = os.path.abspath(self.computer_xml)
def main(): def main(*args):
"Run default configuration." "Run default configuration."
global os global os
global callAndRead global callAndRead
global getpwnam global pwd
real_callAndRead = callAndRead real_callAndRead = callAndRead
usage = "usage: %s [options] CONFIGURATION_FILE" % sys.argv[0] usage = "usage: %s [options] CONFIGURATION_FILE" % sys.argv[0]
try: try:
# Parse arguments # Parse arguments
options, configuration_file_path = Parser(usage=usage).check_args() options, configuration_file_path = Parser(usage=usage).check_args(args)
config = Config() config = Config()
config.setConfig(options, configuration_file_path) config.setConfig(options, configuration_file_path)
os = OS(config) os = OS(config)
...@@ -1125,7 +1128,7 @@ def main(): ...@@ -1125,7 +1128,7 @@ def main():
pw_uid = 12345 pw_uid = 12345
pw_gid = 54321 pw_gid = 54321
return result return result
getpwnam = fake_getpwnam pwd.getpwnam = fake_getpwnam
else: else:
dry_callAndRead = real_callAndRead dry_callAndRead = real_callAndRead
if config.verbose: if config.verbose:
......
...@@ -337,7 +337,7 @@ class Slapgrid(object): ...@@ -337,7 +337,7 @@ class Slapgrid(object):
computer_partition_list = self.computer.getComputerPartitionList() computer_partition_list = self.computer.getComputerPartitionList()
except socket.error as error: except socket.error as error:
self.logger.fatal(error) self.logger.fatal(error)
sys.exit(1) raise
return computer_partition_list return computer_partition_list
def processSoftwareReleaseList(self): def processSoftwareReleaseList(self):
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Mocked httplib
"""
from urlparse import urlparse
__all__ = []
def log(message):
"""Need to be overridden to get a proper logger
"""
pass
class HTTPConnection(object):
scheme = 'http'
def _callback(self, path, method, body, headers):
"""To get it works properly, you need to override
HTTPConnection._callback. This method received the instance, the path,
method and request body as parameter, and it has to return a tuple with
headers dictionary and body response string.
@param self object instance reference
@param URL the parsed URL
@param method the http method
@param body the request body
@param headers the request headers
@return tuple containing status integer, headers dictionary and body
response"""
return (0, {}, '', )
def __init__(self, host, port=None, strict=None,
timeout=None, source_address=None):
self.host = host
self.port = port
self.strict = strict
self.timeout = timeout
self.source_address = source_address
self.__response = None
def request(self, method, url, body=None, headers=None):
status, headers, body = self._callback(url, method, body, headers)
self.__response = HTTPResponse('HTTP/1.1', 200, 'OK', body, headers)
def getresponse(self):
response = self.__response
self.__response = None
return response
def set_debuglevel(self, level):
pass
def set_tunnel(self, host, port=None, headers=None):
pass
def connect(self):
pass
def close(self):
pass
def putrequest(self, request, selector, skip_host=None,
skip_accept_encoding=None):
pass
def putheader(self, *args):
pass
def endheaders(self):
pass
def send(self, data):
pass
class HTTPSConnection(HTTPConnection):
def __init__(self, host, port=None, key_file=None,
cert_file=None, strict=None, timeout=None,
source_address=None):
super().__init__(self, host, port, strict, timeout,
source_address)
pass
class HTTPResponse(object):
def __init__(self, version, status, reason, content, headers=()):
self.version = version
self.status = status
self.reason = reason
self.__headers = headers
self.__content = content
def read(self, amt=None):
result = None
if amt is None:
result = self.__content
self.__content = ''
else:
end = max(amt, len(self.__content))
result = self.__content[:end]
del self.__content[:end]
return result
def getheader(self, name, default=None):
pass
def getheaders(self):
pass
import logging
import slapos.format
import unittest
import netaddr
# for mocking
import grp
import netifaces
import os
import pwd
import time
USER_LIST = []
GROUP_LIST = []
INTERFACE_DICT = {}
class FakeConfig:
pass
class TestLoggerHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.bucket = []
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.bucket.append(record.msg)
class FakeCallAndRead:
def __init__(self):
self.external_command_list = []
def __call__(self, argument_list, raise_on_error=True):
retval = 0, 'UP'
global INTERFACE_DICT
if 'useradd' in argument_list:
global USER_LIST
USER_LIST.append(argument_list[-1])
elif 'groupadd' in argument_list:
global GROUP_LIST
GROUP_LIST.append(argument_list[-1])
elif argument_list[:3] == ['ip', 'addr', 'add']:
ip, interface = argument_list[3], argument_list[5]
if ':' not in ip:
netmask = netaddr.strategy.ipv4.int_to_str(
netaddr.strategy.ipv4.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][2].append({'addr': ip, 'netmask': netmask})
else:
netmask = netaddr.strategy.ipv6.int_to_str(
netaddr.strategy.ipv6.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][10].append({'addr': ip, 'netmask': netmask})
# stabilise by mangling ip to just ip string
argument_list[3] = 'ip/%s' % netmask
elif argument_list[:3] == ['ip', 'addr', 'list']:
retval = 0, str(INTERFACE_DICT)
self.external_command_list.append(' '.join(argument_list))
return retval
class LoggableWrapper:
def __init__(self, logger, name):
self.__logger = logger
self.__name = name
def __call__(self, *args, **kwargs):
arg_list = [repr(x) for x in args] + [
'%s=%r' % (x, y) for x, y in kwargs.iteritems()]
self.__logger.debug('%s(%s)' % (self.__name, ', '.join(arg_list)))
class TimeMock:
@classmethod
def sleep(self, seconds):
return
class GrpMock:
@classmethod
def getgrnam(self, name):
global GROUP_LIST
if name in GROUP_LIST:
return True
raise KeyError
class PwdMock:
@classmethod
def getpwnam(self, name):
global USER_LIST
if name in USER_LIST:
class result:
pw_uid = 0
pw_gid = 0
return result
raise KeyError
class NetifacesMock:
@classmethod
def ifaddresses(self, name):
global INTERFACE_DICT
if name in INTERFACE_DICT:
return INTERFACE_DICT[name]
raise ValueError
@classmethod
def interfaces(self):
global INTERFACE_DICT
return INTERFACE_DICT.keys()
class SlapformatMixin(unittest.TestCase):
# keep big diffs
maxDiff = None
def patchNetifaces(self):
self.netifaces = NetifacesMock()
self.saved_netifaces = dict()
for fake in vars(NetifacesMock):
self.saved_netifaces[fake] = getattr(netifaces, fake, None)
setattr(netifaces, fake, getattr(self.netifaces, fake))
def restoreNetifaces(self):
for name, original_value in self.saved_netifaces.items():
setattr(netifaces, name, original_value)
del self.saved_netifaces
def patchPwd(self):
self.saved_pwd = dict()
for fake in vars(PwdMock):
self.saved_pwd[fake] = getattr(pwd, fake, None)
setattr(pwd, fake, getattr(PwdMock, fake))
def restorePwd(self):
for name, original_value in self.saved_pwd.items():
setattr(pwd, name, original_value)
del self.saved_pwd
def patchTime(self):
self.saved_time = dict()
for fake in vars(TimeMock):
self.saved_time[fake] = getattr(time, fake, None)
setattr(time, fake, getattr(TimeMock, fake))
def restoreTime(self):
for name, original_value in self.saved_time.items():
setattr(time, name, original_value)
del self.saved_time
def patchGrp(self):
self.saved_grp = dict()
for fake in vars(GrpMock):
self.saved_grp[fake] = getattr(grp, fake, None)
setattr(grp, fake, getattr(GrpMock, fake))
def restoreGrp(self):
for name, original_value in self.saved_grp.items():
setattr(grp, name, original_value)
del self.saved_grp
def patchOs(self, logger):
self.saved_os = dict()
for fake in ['mkdir', 'chown', 'chmod', 'makedirs']:
self.saved_os[fake] = getattr(os, fake, None)
f = LoggableWrapper(logger, fake)
setattr(os, fake, f)
def restoreOs(self):
for name, original_value in self.saved_os.items():
setattr(os, name, original_value)
del self.saved_os
def setUp(self):
config = FakeConfig()
config.dry_run = True
config.verbose = True
logger = logging.getLogger('testcatch')
logger.setLevel(logging.DEBUG)
self.test_result = TestLoggerHandler()
logger.addHandler(self.test_result)
config.logger = logger
self.partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
global USER_LIST
USER_LIST = []
global GROUP_LIST
GROUP_LIST = []
global INTERFACE_DICT
INTERFACE_DICT = {}
self.real_callAndRead = slapos.format.callAndRead
self.fakeCallAndRead = FakeCallAndRead()
slapos.format.callAndRead = self.fakeCallAndRead
self.patchOs(logger)
self.patchGrp()
self.patchTime()
self.patchPwd()
self.patchNetifaces()
def tearDown(self):
self.restoreOs()
self.restoreGrp()
self.restoreTime()
self.restorePwd()
self.restoreNetifaces()
slapos.format.callAndRead = self.real_callAndRead
class TestComputer(SlapformatMixin):
def test_getAddress_empty_computer(self):
computer = slapos.format.Computer('computer')
self.assertEqual(computer.getAddress(), {'netmask': None, 'addr': None})
def test_construct_empty(self):
computer = slapos.format.Computer('computer')
computer.construct()
raise NotImplementedError
def test_construct_empty_prepared(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -s /bin/false -G slapsoft testuser',
'tunctl -t tap -u testuser',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'tunctl -t tap -u root',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -s /bin/false -G slapsoft testuser',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
class TestPartition(SlapformatMixin):
def test_createPath(self):
global USER_LIST
USER_LIST = ['testuser']
self.partition.createPath()
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chown('/part_path', 0, 0)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
def test_createPath_no_alter_user(self):
self.partition.createPath(False)
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
class TestUser(SlapformatMixin):
def test_create(self):
user = slapos.format.User('doesnotexistsyet')
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/false '\
'doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_additional_groups(self):
user = slapos.format.User('doesnotexistsyet', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/false -G '\
'additionalgroup1,additionalgroup2 doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_group_exists(self):
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'useradd -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists_additional_groups(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/false -G '\
'additionalgroup1,additionalgroup2 testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_group_exists(self):
global USER_LIST
USER_LIST = ['testuser']
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'usermod -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_isAvailable(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
self.assertTrue(user.isAvailable())
def test_isAvailable_notAvailable(self):
user = slapos.format.User('doesnotexistsyet')
self.assertFalse(user.isAvailable())
if __name__ == '__main__':
unittest.main()
from slapos.grid import slapgrid
import httplib
import logging
import os
import shutil
import signal
import slapos.slap.slap
import socket
import tempfile
import unittest
import urlparse
import xml_marshaller
class BasicMixin:
def assertSortedListEqual(self, list1, list2, msg=None):
self.assertListEqual(sorted(list1), sorted(list2), msg)
def setUp(self):
self._tempdir = tempfile.mkdtemp()
self.software_root = os.path.join(self._tempdir, 'software')
self.instance_root = os.path.join(self._tempdir, 'instance')
if getattr(self, 'master_url', None) is None:
self.master_url = 'http://127.0.0.1:0/'
self.computer_id = 'computer'
self.supervisord_socket = os.path.join(self._tempdir, 'supervisord.sock')
self.supervisord_configuration_path = os.path.join(self._tempdir,
'supervisord')
self.usage_report_periodicity = 1
self.buildout = None
logging.basicConfig(level=logging.DEBUG)
self.grid = slapgrid.Slapgrid(self.software_root, self.instance_root,
self.master_url, self.computer_id, self.supervisord_socket,
self.supervisord_configuration_path, self.usage_report_periodicity,
self.buildout)
def tearDown(self):
shutil.rmtree(self._tempdir, True)
class TestBasicSlapgridCP(BasicMixin, unittest.TestCase):
def test_no_software_root(self):
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_instance_root(self):
os.mkdir(self.software_root)
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_master(self):
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertRaises(socket.error, self.grid.processComputerPartitionList)
class MasterMixin(BasicMixin):
def _patchHttplib(self):
"""Overrides httplib"""
import mock.httplib
self.saved_httplib = dict()
for fake in vars(mock.httplib):
self.saved_httplib[fake] = getattr(httplib, fake, None)
setattr(httplib, fake, getattr(mock.httplib, fake))
def _unpatchHttplib(self):
"""Restores httplib overriding"""
import httplib
for name, original_value in self.saved_httplib.items():
setattr(httplib, name, original_value)
del self.saved_httplib
def setUp(self):
self._patchHttplib()
BasicMixin.setUp(self)
def tearDown(self):
self._unpatchHttplib()
# XXX: Hardcoded pid, as it is not configurable in slapos
svc = os.path.join(self.instance_root, 'var', 'run', 'supervisord.pid')
if os.path.exists(svc):
try:
pid = int(open(svc).read().strip())
except ValueError:
pass
else:
os.kill(pid, signal.SIGTERM)
BasicMixin.tearDown(self)
class TestSlapgridCPWithMaster(MasterMixin, unittest.TestCase):
def test_nothing_to_do(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse(path.lstrip('/'))
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == 'getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
slap_computer._computer_partition_list = []
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['etc', 'var'])
self.assertSortedListEqual(os.listdir(self.software_root), [])
def test_one_partition(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'stopped'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
partition_path = os.path.join(self.instance_root, '0')
os.mkdir(partition_path, 0750)
software_hash = slapos.grid.utils.getSoftwareUrlHash('http://sr/')
srdir = os.path.join(self.software_root, software_hash)
os.mkdir(srdir)
open(os.path.join(srdir, 'template.cfg'), 'w').write(
"""[buildout]""")
srbindir = os.path.join(srdir, 'bin')
os.mkdir(srbindir)
open(os.path.join(srbindir, 'buildout'), 'w').write("""#!/bin/sh
touch worked""")
os.chmod(os.path.join(srbindir, 'buildout'), 0755)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment