Commit 52432e27 authored by Carlos Ramos Carreño's avatar Carlos Ramos Carreño

Testcase module typed and formatted.

- Type hints for testcase module are completed.
- Type comments (to be compatible with Python 2) have been added to
  other parts of slapos.core only when necessary to achieve full type
  checking in the testcase module, with Pyright in strict mode.
- Docstrings added/completed for the testcase module.
- Source is formatted with ruff.

See merge request nexedi/slapos.core!693
parent 2e08776e
......@@ -3,4 +3,4 @@ include slapos/proxy/schema.sql
include slapos/slapos-client.cfg.example
include slapos/slapos-proxy.cfg.example
include slapos/slapos.cfg.example
recursive-include slapos *.in *.txt *.xsd *.rst
recursive-include slapos *.in *.txt *.xsd *.rst py.typed
[tool.ruff]
line-length = 80
indent-width = 2
\ No newline at end of file
......@@ -211,6 +211,7 @@ class SlapPopen(subprocess.Popen):
def md5digest(url):
# type: (str) -> str
return hashlib.md5(url.encode('utf-8')).hexdigest()
......
......@@ -45,6 +45,11 @@ import warnings
import json
import six
try:
from typing import Mapping, Sequence
except ImportError: # XXX to be removed once we depend on typing
pass
from .exception import ResourceNotReady, ServerError, NotFoundError, \
ConnectionError
from .hateoas import SlapHateoasNavigator, ConnectionHelper
......@@ -178,6 +183,7 @@ class SoftwareRelease(SlapDocument):
return self._computer_guid
def getURI(self):
# type: () -> str
if not self._software_release:
raise NameError('software_release has not been defined.')
else:
......@@ -384,6 +390,7 @@ class Computer(SlapDocument):
@_syncComputerInformation
def getComputerPartitionList(self):
# type: (...) -> Sequence[ComputerPartition]
for computer_partition in self._computer_partition_list:
computer_partition._connection_helper = self._connection_helper
computer_partition._hateoas_navigator = self._hateoas_navigator
......@@ -596,6 +603,7 @@ class ComputerPartition(SlapRequester):
return software_instance
def getId(self):
# type: (...) -> str
if not getattr(self, '_partition_id', None):
raise ResourceNotReady()
return self._partition_id
......@@ -629,9 +637,11 @@ class ComputerPartition(SlapRequester):
return software_type
def getInstanceParameterDict(self):
# type: (...) -> Mapping[str, object]
return getattr(self, '_parameter_dict', None) or {}
def getConnectionParameterDict(self):
# type: (...) -> Mapping[str, str]
connection_dict = getattr(self, '_connection_dict', None)
if connection_dict is None:
# XXX Backward compatibility for older slapproxy (<= 1.0.0)
......@@ -640,6 +650,7 @@ class ComputerPartition(SlapRequester):
return connection_dict or {}
def getSoftwareRelease(self):
# type: (...) -> SoftwareRelease
"""
Returns the software release associate to the computer partition.
"""
......
......@@ -44,7 +44,7 @@ except ImportError:
import subprocess
try:
from typing import TYPE_CHECKING, Optional, Iterable, Dict, Union
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Union
if TYPE_CHECKING:
import subprocess
except ImportError: # XXX to be removed once we depend on typing
......@@ -501,7 +501,13 @@ class StandaloneSlapOS(object):
self._initBaseDirectory(software_root, instance_root, shared_part_root)
def _initBaseDirectory(self, software_root, instance_root, shared_part_root):
def _initBaseDirectory(
self,
software_root, # type: str
instance_root, # type: str
shared_part_root, # type: str
):
# type: (...) -> None
"""Create the directory after checking it's not too deep.
"""
base_directory = self._base_directory
......@@ -619,10 +625,12 @@ class StandaloneSlapOS(object):
def format(
self,
partition_count,
ipv4_address,
ipv6_address,
partition_base_name="slappart"):
partition_count, # type: int
ipv4_address, # type: str
ipv6_address, # type: str
partition_base_name="slappart", # type: str
):
# type: (...) -> None
"""Creates `partition_count` partitions.
All partitions have the same `ipv4_address` and use the current system
......@@ -728,7 +736,13 @@ class StandaloneSlapOS(object):
self._logger.error(e.output)
raise
def supply(self, software_url, computer_guid=None, state="available"):
def supply(
self,
software_url, # type: str
computer_guid=None, # type: str | None
state="available", # type: str
):
# type: (...) -> None
"""Supply a software, see ISupply.supply
Software can only be supplied on this embedded computer.
......@@ -742,14 +756,15 @@ class StandaloneSlapOS(object):
)
def request(
self,
software_release,
partition_reference,
software_type=None,
shared=False,
partition_parameter_kw=None,
filter_kw=None,
state=None):
self,
software_release, # type: str
partition_reference, # type: str
software_type=None, # type: str | None
shared=False, # type: bool
partition_parameter_kw=None, # type: Mapping[str, object] | None
filter_kw=None, # type: Mapping[str, object] | None
state=None, # type: str | None
):
"""Request an instance, see IRequester.request
Instance can only be requested on this embedded computer.
......
......@@ -26,6 +26,8 @@
#
##############################################################################
# pyright: strict
from __future__ import annotations
import contextlib
import fnmatch
......@@ -37,7 +39,7 @@ import sqlite3
import unittest
import warnings
from six.moves.urllib.parse import urlparse
from urllib.parse import urlparse
from netaddr import valid_ipv6
......@@ -59,41 +61,43 @@ from typing import (
ClassVar,
Dict,
Iterable,
List,
Iterator,
Mapping,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
)
ManagedResourceType = TypeVar("ManagedResourceType", bound=ManagedResource)
IPV4_ADDRESS_DEFAULT: str = os.environ['SLAPOS_TEST_IPV4']
IPV6_ADDRESS_DEFAULT: str = os.environ['SLAPOS_TEST_IPV6']
IPV4_ADDRESS_DEFAULT: str = os.environ["SLAPOS_TEST_IPV4"]
IPV6_ADDRESS_DEFAULT: str = os.environ["SLAPOS_TEST_IPV6"]
DEBUG_DEFAULT: bool = bool(
int(os.environ.get('SLAPOS_TEST_DEBUG', 0)),
int(os.environ.get("SLAPOS_TEST_DEBUG", 0)),
)
VERBOSE_DEFAULT: bool = bool(
int(os.environ.get('SLAPOS_TEST_VERBOSE', 0)),
int(os.environ.get("SLAPOS_TEST_VERBOSE", 0)),
)
SKIP_SOFTWARE_CHECK_DEFAULT: bool = bool(
int(os.environ.get('SLAPOS_TEST_SKIP_SOFTWARE_CHECK', 0))
int(os.environ.get("SLAPOS_TEST_SKIP_SOFTWARE_CHECK", 0))
)
SKIP_SOFTWARE_REBUILD_DEFAULT: bool = bool(
int(os.environ.get('SLAPOS_TEST_SKIP_SOFTWARE_REBUILD', 0))
int(os.environ.get("SLAPOS_TEST_SKIP_SOFTWARE_REBUILD", 0))
)
SHARED_PART_LIST_DEFAULT: Sequence[str] = [
os.path.expanduser(p)
for p in os.environ.get(
'SLAPOS_TEST_SHARED_PART_LIST',
'',
).split(os.pathsep) if p
"SLAPOS_TEST_SHARED_PART_LIST",
"",
).split(os.pathsep)
if p
]
SNAPSHOT_DIRECTORY_DEFAULT: str | None = os.environ.get(
'SLAPOS_TEST_LOG_DIRECTORY',
"SLAPOS_TEST_LOG_DIRECTORY",
)
def makeModuleSetUpAndTestCaseClass(
software_url: str | os.PathLike[str],
*,
......@@ -106,7 +110,7 @@ def makeModuleSetUpAndTestCaseClass(
skip_software_rebuild: bool = SKIP_SOFTWARE_REBUILD_DEFAULT,
shared_part_list: Iterable[str] = SHARED_PART_LIST_DEFAULT,
snapshot_directory: str | None = SNAPSHOT_DIRECTORY_DEFAULT,
software_id: str | None = None
software_id: str | None = None,
) -> Tuple[Callable[[], None], Type[SlapOSInstanceTestCase]]:
"""
Create a setup module function and a testcase for testing `software_url`.
......@@ -143,7 +147,7 @@ def makeModuleSetUpAndTestCaseClass(
logs information describing the actions taken (sets logging level to
``DEBUG``).
By default it will be controlled by the value of the environment variable
``SLAPOS_TEST_VERBOSE`` if it is defined. Otherwise it will be disabled.
``SLAPOS_TEST_VERBOSE`` if it is defined. Otherwise it will be disabled.
skip_software_check: Skips costly software checks.
By default it will be controlled by the value of the environment variable
``SLAPOS_TEST_SKIP_SOFTWARE_CHECK`` if it is defined. Otherwise it will
......@@ -175,30 +179,30 @@ def makeModuleSetUpAndTestCaseClass(
- A function to install the software, to be used as `unittest`'s
`setUpModule`.
- A base class for test cases.
"""
software_url = os.fspath(software_url)
if base_directory is None:
base_directory = os.path.realpath(
os.environ.get(
'SLAPOS_TEST_WORKING_DIR',
"SLAPOS_TEST_WORKING_DIR",
os.path.join(
os.getcwd(),
'.slapos',
)
".slapos",
),
)
)
if not software_id:
software_id = urlparse(software_url).path.split('/')[-2]
software_id = urlparse(software_url).path.split("/")[-2]
logging.basicConfig(
level=logging.DEBUG,
format=f'%(asctime)s - {software_id} - %(name)s - %(levelname)s - %(message)s',
format=f"%(asctime)s - {software_id} - %(name)s - %(levelname)s - %(message)s",
filename=os.path.join(
snapshot_directory or base_directory,
'testcase.log',
"testcase.log",
),
)
logger = logging.getLogger()
......@@ -221,29 +225,29 @@ def makeModuleSetUpAndTestCaseClass(
)
except PathTooDeepError:
raise RuntimeError(
f'base directory ( {base_directory} ) is too deep, try setting '
f'SLAPOS_TEST_WORKING_DIR to a shallow enough directory',
f"base directory ( {base_directory} ) is too deep, try setting "
f"SLAPOS_TEST_WORKING_DIR to a shallow enough directory",
)
cls = type(
f'SlapOSInstanceTestCase for {software_url}',
f"SlapOSInstanceTestCase for {software_url}",
(SlapOSInstanceTestCase,),
{
'slap': slap,
'getSoftwareURL': classmethod(lambda _cls: software_url),
'software_id': software_id,
'_debug': debug,
'_skip_software_check': skip_software_check,
'_skip_software_rebuild': skip_software_rebuild,
'_ipv4_address': ipv4_address,
'_ipv6_address': ipv6_address,
'_base_directory': base_directory,
'_test_file_snapshot_directory': snapshot_directory
"slap": slap,
"getSoftwareURL": classmethod(lambda _cls: software_url),
"software_id": software_id,
"_debug": debug,
"_skip_software_check": skip_software_check,
"_skip_software_rebuild": skip_software_rebuild,
"_ipv4_address": ipv4_address,
"_ipv6_address": ipv6_address,
"_base_directory": base_directory,
"_test_file_snapshot_directory": snapshot_directory,
},
)
class SlapOSInstanceTestCase_(
cls, # type: ignore # https://github.com/python/mypy/issues/2813
cls,
SlapOSInstanceTestCase,
):
# useless intermediate class so that editors provide completion anyway.
......@@ -255,33 +259,56 @@ def makeModuleSetUpAndTestCaseClass(
return setUpModule, SlapOSInstanceTestCase_
def installSoftwareUrlList(cls, software_url_list, max_retry=10, debug=False):
# type: (Type[SlapOSInstanceTestCase], Iterable[str], int, bool) -> None
"""Install softwares on the current testing slapos, for use in `setUpModule`.
def installSoftwareUrlList(
cls: Type[SlapOSInstanceTestCase],
software_url_list: Sequence[str],
max_retry: int = 10,
debug: bool = False,
) -> None:
"""
Install softwares on the current testing slapos, for use in `setUpModule`.
This also check softwares with `checkSoftware`.
Args:
cls: The test case class used for the installation.
software_url_list: List of URLs or paths to install.
max_retry: Number of times that the installation will be retried if there
is an error.
debug: If set to ``True`` the software will not be automatically removed
if there is an error during the installation process, in order to
facilitate inspection during debug.
This also check softwares with `checkSoftware`
"""
def _storeSoftwareSnapshot(name):
for path in glob.glob(os.path.join(
cls._base_directory,
'var',
'log',
'*',
)) + glob.glob(os.path.join(
cls.slap.software_directory,
'*',
'*.cfg',
)) + glob.glob(os.path.join(
cls.slap.software_directory,
'*',
'.installed.cfg',
)) + glob.glob(os.path.join(
cls.slap.shared_directory,
'*',
'*',
'.slapos.recipe.cmmi.signature',
)):
cls._copySnapshot(path, name)
def _storeSoftwareSnapshot(name: str) -> None:
for path in (
glob.glob(
os.path.join(
cls._base_directory, # pyright: ignore[reportPrivateUsage]
"var/log/*",
)
)
+ glob.glob(
os.path.join(
cls.slap.software_directory,
"*/*.cfg",
)
)
+ glob.glob(
os.path.join(
cls.slap.software_directory,
"*/.installed.cfg",
)
)
+ glob.glob(
os.path.join(
cls.slap.shared_directory,
"*/*/.slapos.recipe.cmmi.signature",
)
)
):
cls._copySnapshot(path, name) # pyright: ignore[reportPrivateUsage]
try:
cls.logger.debug("Starting SlapOS")
......@@ -290,9 +317,13 @@ def installSoftwareUrlList(cls, software_url_list, max_retry=10, debug=False):
cls.logger.debug("Supplying %s", software_url)
cls.slap.supply(software_url)
cls.logger.debug("Waiting for slapos node software to build")
cls.slap.waitForSoftware(max_retry=max_retry, debug=debug, install_all=not cls._skip_software_rebuild)
_storeSoftwareSnapshot('setupModule')
if not cls._skip_software_check:
cls.slap.waitForSoftware(
max_retry=max_retry,
debug=debug,
install_all=not cls._skip_software_rebuild, # pyright: ignore[reportPrivateUsage]
)
_storeSoftwareSnapshot("setupModule")
if not cls._skip_software_check: # pyright: ignore[reportPrivateUsage]
for software_url in software_url_list:
cls.logger.debug("Checking software %s", software_url)
checkSoftware(cls.slap, software_url)
......@@ -300,8 +331,8 @@ def installSoftwareUrlList(cls, software_url_list, max_retry=10, debug=False):
else:
cls.logger.debug("Software checks skipped")
except BaseException as e:
_storeSoftwareSnapshot('setupModule failed installing software')
except BaseException:
_storeSoftwareSnapshot("setupModule failed installing software")
if not debug:
cls.logger.exception("Error building software, removing")
try:
......@@ -312,8 +343,8 @@ def installSoftwareUrlList(cls, software_url_list, max_retry=10, debug=False):
cls.slap.waitForSoftware(max_retry=max_retry, debug=debug)
except BaseException:
cls.logger.exception("Error removing software")
_storeSoftwareSnapshot('setupModule removing software')
cls._cleanup('setupModule')
_storeSoftwareSnapshot("setupModule removing software")
cls._cleanup("setupModule") # pyright: ignore[reportPrivateUsage]
raise
......@@ -352,7 +383,7 @@ class SlapOSInstanceTestCase(unittest.TestCase):
instance_max_retry: ClassVar[int] = 20
report_max_retry: ClassVar[int] = 20
partition_count: ClassVar[int] = 10
default_partition_reference: ClassVar[str] = 'testing partition 0'
default_partition_reference: ClassVar[str] = "testing partition 0"
request_instance: ClassVar[bool] = True
software_id: ClassVar[str] = ""
......@@ -364,9 +395,11 @@ class SlapOSInstanceTestCase(unittest.TestCase):
computer_partition_root_path: ClassVar[str]
computer_partition_ipv6_address: ClassVar[str]
# Private settings
# Partition reference: use when default length is too long.
__partition_reference__: ClassVar[str]
# True to enable debugging utilities.
_debug: ClassVar[bool] = False
......@@ -389,32 +422,47 @@ class SlapOSInstanceTestCase(unittest.TestCase):
_instance_parameter_dict: ClassVar[Mapping[str, object]]
# Base directory for standalone SlapOS.
_base_directory: ClassVar[str] = ""
_base_directory: ClassVar[str] = ""
# Directory to save snapshot files for inspections.
_test_file_snapshot_directory: ClassVar[str | None] = ""
# Patterns of files to save for inspection, relative to instance directory.
_save_instance_file_pattern_list: ClassVar[Sequence[str]] = (
'*/bin/*',
'*/etc/*',
'*/var/log/*',
'*/srv/monitor/*',
'*/srv/backup/logrotate/*',
'*/.*log',
'*/.*cfg',
'*/*cfg',
'etc/',
"*/bin/*",
"*/etc/*",
"*/var/log/*",
"*/srv/monitor/*",
"*/srv/backup/logrotate/*",
"*/.*log",
"*/.*cfg",
"*/*cfg",
"etc/",
)
@classmethod
def getManagedResource(cls, resource_name, resource_class):
# type: (str, Type[ManagedResourceType]) -> ManagedResourceType
"""Get the managed resource for this name.
def getManagedResource(
cls,
resource_name: str,
resource_class: Type[ManagedResourceType],
) -> ManagedResourceType:
"""
Get the managed resource for this name.
If resource was not created yet, it is created and `open`. The
resource will automatically be `close` at the end of the test
class.
Args:
resource_name: The name of the resource.
resource_class: The desired class of the resource. If the resource
exists, but is not an instance of this class, an exception will be
raised. Otherwise, if the resource does not exist, this class will be
used to construct a new resource with that name.
Returns:
A resource with name ``resource_name`` and class ``resource_class``.
"""
try:
existing_resource = cls._resources[resource_name]
......@@ -426,41 +474,64 @@ class SlapOSInstanceTestCase(unittest.TestCase):
else:
if not isinstance(existing_resource, resource_class):
raise ValueError(
"Resource %s is of unexpected class %s" %
(resource_name, existing_resource), )
f"Resource {resource_name} is of unexpected "
f"class {existing_resource}",
)
return existing_resource
# Methods to be defined by subclasses.
@classmethod
def getSoftwareURL(cls):
"""Return URL of software release to request instance.
def getSoftwareURL(cls) -> str:
"""
Return URL of software release to request instance.
This method will be defined when initialising the class
with makeModuleSetUpAndTestCaseClass.
Returns:
URL of the software release to request.
"""
raise NotImplementedError()
@classmethod
def getInstanceParameterDict(cls):
"""Return instance parameters.
def getInstanceParameterDict(cls) -> Mapping[str, object]:
"""
Return instance parameters.
To be defined by subclasses if they need to request instance
with specific parameters.
Returns:
A mapping with the parameters to be set in the instance.
"""
return {}
@classmethod
def getInstanceSoftwareType(cls):
"""Return software type for instance, default None.
def getInstanceSoftwareType(cls) -> str | None:
"""
Return software type for instance, default None.
To be defined by subclasses if they need to request instance with specific
software type.
Returns:
Name of the software type, or `None` to use the default software type.
"""
return None
# Unittest methods
@classmethod
def waitForInstance(cls):
def waitForInstance(cls) -> None:
"""
Wait for the instance to be ready.
This method does retry several times until either the instance is ready or
`cls.instance_max_retry` unsuccessful retries have been done.
"""
# waitForInstance does not tolerate any error but with instances,
# promises sometimes fail on first run, because services did not
# have time to start.
......@@ -473,18 +544,24 @@ class SlapOSInstanceTestCase(unittest.TestCase):
cls.slap.waitForInstance(debug=True)
else:
cls.slap.waitForInstance(
max_retry=cls.instance_max_retry, debug=cls._debug)
max_retry=cls.instance_max_retry,
debug=cls._debug,
)
@classmethod
def formatPartitions(cls):
def formatPartitions(cls) -> None:
"""Format the instance partitions."""
cls.logger.debug(
"Formatting to remove old partitions XXX should not be needed because we delete ..."
"Formatting to remove old partitions XXX should not be needed because we delete ..."
)
cls.slap.format(0, cls._ipv4_address, cls._ipv6_address)
cls.logger.debug("Formatting with %s partitions", cls.partition_count)
cls.slap.format(
cls.partition_count, cls._ipv4_address, cls._ipv6_address,
getattr(cls, '__partition_reference__', '{}-'.format(cls.__name__)))
cls.partition_count,
cls._ipv4_address,
cls._ipv6_address,
getattr(cls, "__partition_reference__", f"{cls.__name__}-"),
)
@classmethod
def _setUpClass(cls) -> None:
......@@ -507,7 +584,7 @@ class SlapOSInstanceTestCase(unittest.TestCase):
# the path of the instance on the filesystem, for low level inspection
cls.computer_partition_root_path = os.path.join(
cls.slap._instance_root,
cls.slap._instance_root, # pyright: ignore[reportPrivateUsage]
cls.computer_partition.getId(),
)
......@@ -518,7 +595,7 @@ class SlapOSInstanceTestCase(unittest.TestCase):
@classmethod
@contextlib.contextmanager
def _snapshotManager(cls, snapshot_name):
def _snapshotManager(cls, snapshot_name: str) -> Iterator[None]:
try:
yield
except BaseException:
......@@ -530,8 +607,7 @@ class SlapOSInstanceTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Request an instance.
"""
"""Request an instance."""
cls.logger.debug("Starting setUpClass %s", cls)
cls._instance_parameter_dict = cls.getInstanceParameterDict()
snapshot_name = "{}.{}.setUpClass".format(cls.__module__, cls.__name__)
......@@ -541,102 +617,140 @@ class SlapOSInstanceTestCase(unittest.TestCase):
cls._setUpClass()
except BaseException:
cls.logger.exception("Error during setUpClass")
cls.setUp = lambda self: self.fail('Setup Class failed.')
cls.setUp = lambda self: self.fail("Setup Class failed.")
raise
cls.logger.debug("setUpClass done")
@classmethod
def tearDownClass(cls):
"""Tear down class, stop the processes and destroy instance.
"""
cls._cleanup("{}.{}.tearDownClass".format(cls.__module__, cls.__name__))
"""Tear down class, stop the processes and destroy instance."""
cls._cleanup(f"{cls.__module__}.{cls.__name__}.tearDownClass")
if not cls._debug:
cls.logger.debug(
"cleaning up slapos log files in %s", cls.slap._log_directory)
for log_file in glob.glob(os.path.join(cls.slap._log_directory, '*')):
"cleaning up slapos log files in %s",
cls.slap._log_directory, # pyright: ignore[reportPrivateUsage]
)
for log_file in glob.glob(
os.path.join(
cls.slap._log_directory, # pyright: ignore[reportPrivateUsage]
"*",
)
):
os.unlink(log_file)
@classmethod
def _storePartitionSnapshot(cls, name):
"""Store snapshot of partitions.
def _storePartitionSnapshot(cls, name: str) -> None:
"""
Store snapshot of partitions.
This uses the definition from class attribute
`_save_instance_file_pattern_list`.
Args:
name: Name of the snapshot.
This uses the definition from class attribute `_save_instance_file_pattern_list`
"""
# copy config and log files from partitions
for (dirpath, dirnames, filenames) in os.walk(cls.slap.instance_directory):
for dirpath, dirnames, filenames in os.walk(cls.slap.instance_directory):
for dirname in list(dirnames):
dirabspath = os.path.join(dirpath, dirname)
if any(fnmatch.fnmatch(
if any(
fnmatch.fnmatch(
dirabspath,
pattern,
) for pattern in cls._save_instance_file_pattern_list):
)
for pattern in cls._save_instance_file_pattern_list
):
cls._copySnapshot(dirabspath, name)
# don't recurse, since _copySnapshot is already recursive
dirnames.remove(dirname)
for filename in filenames:
fileabspath = os.path.join(dirpath, filename)
if any(fnmatch.fnmatch(
if any(
fnmatch.fnmatch(
fileabspath,
pattern,
) for pattern in cls._save_instance_file_pattern_list):
)
for pattern in cls._save_instance_file_pattern_list
):
cls._copySnapshot(fileabspath, name)
@classmethod
def _storeSystemSnapshot(cls, name):
"""Store a snapshot of standalone slapos and partitions.
def _storeSystemSnapshot(cls, name: str) -> None:
"""
Store a snapshot of standalone slapos and partitions.
Does not include software log, because this is stored at the end of
software installation and software log is large.
Args:
name: Name of the snapshot.
Does not include software log, because this is stored at the end of software
installation and software log is large.
"""
# copy log files from standalone
for standalone_log in glob.glob(os.path.join(
for standalone_log in glob.glob(
os.path.join(
cls._base_directory,
'var',
'log',
'*',
)):
if not standalone_log.startswith('slapos-node-software.log'):
"var/log/*",
)
):
if not standalone_log.startswith("slapos-node-software.log"):
cls._copySnapshot(standalone_log, name)
# store slapproxy database
cls._copySnapshot(cls.slap._proxy_database, name)
cls._copySnapshot(
cls.slap._proxy_database, # pyright: ignore[reportPrivateUsage]
name,
)
cls._storePartitionSnapshot(name)
def tearDown(self):
self._storePartitionSnapshot(self.id())
@classmethod
def _copySnapshot(cls, source_file_name, name):
"""Save a file, symbolic link or directory for later inspection.
def _copySnapshot(cls, source_file_name: str, name: str) -> None:
"""
Save a file, symbolic link or directory for later inspection.
The path are made relative to slapos root directory and
we keep the same directory structure.
Args:
source_file_name: The name of the file or directory to copy.
name: Name of the snapshot.
"""
if not cls._test_file_snapshot_directory:
warnings.warn("No snapshot directory configured, skipping snapshot")
warnings.warn("Snapshot directory can be configured with SLAPOS_TEST_LOG_DIRECTORY environment")
warnings.warn(
"Snapshot directory can be configured with SLAPOS_TEST_LOG_DIRECTORY environment"
)
return
# we cannot use os.path.commonpath on python2, so implement something similar
common_path = os.path.commonprefix((source_file_name, cls._base_directory))
if not os.path.isdir(common_path):
common_path = os.path.dirname(common_path)
relative_path = source_file_name[len(common_path):]
relative_path = source_file_name[len(common_path) :]
if relative_path[0] == os.sep:
relative_path = relative_path[1:]
destination = os.path.join(
cls._test_file_snapshot_directory,
cls.software_id,
name,
relative_path,
cls._test_file_snapshot_directory,
cls.software_id,
name,
relative_path,
)
destination_dirname = os.path.dirname(destination)
mkdir_p(destination_dirname)
if os.path.islink(
source_file_name) and not os.path.exists(source_file_name):
if os.path.islink(source_file_name) and not os.path.exists(
source_file_name
):
cls.logger.debug(
"copy broken symlink %s as %s", source_file_name, destination)
with open(destination, 'w') as f:
f.write('broken symink to {}\n'.format(os.readlink(source_file_name)))
"copy broken symlink %s as %s",
source_file_name,
destination,
)
with open(destination, "w") as f:
f.write(f"broken symink to {os.readlink(source_file_name)}\n")
elif os.path.isfile(source_file_name):
shutil.copy(source_file_name, destination)
elif os.path.isdir(source_file_name):
......@@ -646,10 +760,15 @@ class SlapOSInstanceTestCase(unittest.TestCase):
# implementation methods
@classmethod
def _cleanup(cls, snapshot_name):
# type: (str) -> None
"""Destroy all instances and stop subsystem.
def _cleanup(cls, snapshot_name: str) -> None:
"""
Destroy all instances and stop subsystem.
Catches and log all exceptions and take snapshot named `snapshot_name` + the failing step.
Args:
snapshot_name: Name of the snapshot that will be taken in case of exception.
"""
for resource_name in list(cls._resources):
cls.logger.debug("closing resource %s", resource_name)
......@@ -658,12 +777,11 @@ class SlapOSInstanceTestCase(unittest.TestCase):
except:
cls.logger.exception("Error closing resource %s", resource_name)
try:
if cls.request_instance and hasattr(cls, '_instance_parameter_dict'):
cls.requestDefaultInstance(state='destroyed')
if cls.request_instance and hasattr(cls, "_instance_parameter_dict"):
cls.requestDefaultInstance(state="destroyed")
except:
cls.logger.exception("Error during request destruction")
cls._storeSystemSnapshot(
"{}._cleanup request destroy".format(snapshot_name))
cls._storeSystemSnapshot(f"{snapshot_name}._cleanup request destroy")
try:
# To make debug usable, we tolerate report_max_retry-1 errors and
# only debug the last.
......@@ -674,36 +792,43 @@ class SlapOSInstanceTestCase(unittest.TestCase):
except SlapOSNodeCommandError:
cls.slap.waitForReport(debug=True)
else:
cls.slap.waitForReport(max_retry=cls.report_max_retry, debug=cls._debug)
cls.slap.waitForReport(
max_retry=cls.report_max_retry, debug=cls._debug
)
except:
cls.logger.exception("Error during actual destruction")
cls._storeSystemSnapshot(
"{}._cleanup waitForReport".format(snapshot_name))
cls._storeSystemSnapshot(f"{snapshot_name}._cleanup waitForReport")
leaked_partitions = [
cp for cp in cls.slap.computer.getComputerPartitionList()
if cp.getState() != 'destroyed'
cp
for cp in cls.slap.computer.getComputerPartitionList()
if cp.getState() != "destroyed"
]
if leaked_partitions:
cls.logger.critical(
"The following partitions were not cleaned up: %s",
[cp.getId() for cp in leaked_partitions])
"The following partitions were not cleaned up: %s",
[cp.getId() for cp in leaked_partitions],
)
cls._storeSystemSnapshot(
"{}._cleanup leaked_partitions".format(snapshot_name))
"{}._cleanup leaked_partitions".format(snapshot_name)
)
for cp in leaked_partitions:
try:
# XXX is this really the reference ?
partition_reference = cp.getInstanceParameterDict()['instance_title']
partition_reference = cp.getInstanceParameterDict()["instance_title"]
assert isinstance(partition_reference, str)
cls.slap.request(
software_release=cp.getSoftwareRelease().getURI(),
# software_type=cp.getType(), # TODO
partition_reference=partition_reference,
state="destroyed")
software_release=cp.getSoftwareRelease().getURI(),
# software_type=cp.getType(), # TODO
partition_reference=partition_reference,
state="destroyed",
)
except:
cls.logger.exception(
"Error during request destruction of leaked partition")
"Error during request destruction of leaked partition",
)
cls._storeSystemSnapshot(
"{}._cleanup leaked_partitions request destruction".format(
snapshot_name))
f"{snapshot_name}._cleanup leaked_partitions request destruction",
)
try:
# To make debug usable, we tolerate report_max_retry-1 errors and
# only debug the last.
......@@ -714,61 +839,109 @@ class SlapOSInstanceTestCase(unittest.TestCase):
except SlapOSNodeCommandError:
cls.slap.waitForReport(debug=True)
else:
cls.slap.waitForReport(max_retry=cls.report_max_retry, debug=cls._debug)
cls.slap.waitForReport(
max_retry=cls.report_max_retry,
debug=cls._debug,
)
except:
cls.logger.exception(
"Error during leaked partitions actual destruction")
"Error during leaked partitions actual destruction",
)
cls._storeSystemSnapshot(
"{}._cleanup leaked_partitions waitForReport".format(snapshot_name))
f"{snapshot_name}._cleanup leaked_partitions waitForReport",
)
try:
cls.slap.stop()
except:
cls.logger.exception("Error during stop")
cls._storeSystemSnapshot("{}._cleanup stop".format(snapshot_name))
cls._storeSystemSnapshot(f"{snapshot_name}._cleanup stop")
leaked_supervisor_configs = glob.glob(
os.path.join(
cls.slap.instance_directory, 'etc', 'supervisord.conf.d', '*.conf'))
os.path.join(
cls.slap.instance_directory,
"etc/supervisord.conf.d/*.conf",
)
)
if leaked_supervisor_configs:
for config in leaked_supervisor_configs:
os.unlink(config)
raise AssertionError(
"Test leaked supervisor configurations: %s" %
leaked_supervisor_configs)
f"Test leaked supervisor configurations: {leaked_supervisor_configs}",
)
@classmethod
def requestDefaultInstance(cls, state='started'):
def requestDefaultInstance(
cls,
state: str = "started", # TODO: Change to enum/Literal when all code is Python 3.
) -> ComputerPartition:
software_url = cls.getSoftwareURL()
software_type = cls.getInstanceSoftwareType()
cls.logger.debug(
'requesting "%s" software:%s type:%r state:%s parameters:%s',
cls.default_partition_reference, software_url, software_type, state,
cls._instance_parameter_dict)
'requesting "%s" software:%s type:%r state:%s parameters:%s',
cls.default_partition_reference,
software_url,
software_type,
state,
cls._instance_parameter_dict,
)
return cls.slap.request(
software_release=software_url,
software_type=software_type,
partition_reference=cls.default_partition_reference,
partition_parameter_kw=cls._instance_parameter_dict,
state=state)
software_release=software_url,
software_type=software_type,
partition_reference=cls.default_partition_reference,
partition_parameter_kw=cls._instance_parameter_dict,
state=state,
)
@classmethod
def getPartitionId(cls, instance_name):
query = "SELECT reference FROM partition%s WHERE partition_reference=?" % DB_VERSION
with sqlite3.connect(os.path.join(
cls._base_directory,
'var/proxy.db',
)) as db:
def getPartitionId(cls, instance_name: str) -> str:
"""
Get the id of the partition.
Args:
instance_name: Name of the instance.
Returns:
Id of the partition.
"""
query = (
f"SELECT reference FROM partition{DB_VERSION} "
f"WHERE partition_reference=?"
)
with sqlite3.connect(
os.path.join(
cls._base_directory,
"var/proxy.db",
)
) as db:
return db.execute(query, (instance_name,)).fetchall()[0][0]
@classmethod
def getPartitionIPv6(cls, partition_id):
query = "SELECT address FROM partition_network%s WHERE partition_reference=?" % DB_VERSION
with sqlite3.connect(os.path.join(
cls._base_directory,
'var/proxy.db',
)) as db:
def getPartitionIPv6(cls, partition_id: str) -> str:
"""
Get the IP address of the partition.
Args:
partition_id: Id of the partition.
Returns:
An IPv6 address in presentation (string) format.
"""
query = (
f"SELECT address FROM partition_network{DB_VERSION} "
f"WHERE partition_reference=?"
)
with sqlite3.connect(
os.path.join(
cls._base_directory,
"var/proxy.db",
)
) as db:
rows = db.execute(query, (partition_id,)).fetchall()
# do not assume the partition's IPv6 address is the second one,
# instead find the first address that is IPv6
for (address,) in rows:
if valid_ipv6(address):
return address
raise ValueError("Missing IPv6 address")
......@@ -46,7 +46,7 @@ from ..grid.utils import getPythonExecutableFromSoftwarePath
try:
import typing
if typing.TYPE_CHECKING:
from PIL import Image # pylint:disable=unused-import
from PIL.Image import Image # pylint:disable=unused-import
from .testcase import SlapOSInstanceTestCase
except ImportError:
pass
......
......@@ -94,8 +94,12 @@ class SafeXMLUnmrshaller(Unmarshaller, object):
loads = SafeXMLUnmrshaller().loads
def mkdir_p(path, mode=0o700):
"""\
def mkdir_p(
path, # type: str
mode=0o700, # type: int
):
# type: (...) -> None
"""
Creates a directory and its parents, if needed.
NB: If the directory already exists, it does not change its permission.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment