Commit 3f30659f authored by Tomáš Peterka's avatar Tomáš Peterka

[manager.cpuset] Manager interface and cpuset implementation (with tests)

parent 94b4c294
...@@ -55,6 +55,7 @@ setup(name=name, ...@@ -55,6 +55,7 @@ setup(name=name,
'zc.buildout', 'zc.buildout',
'cliff', 'cliff',
'requests>=2.4.3', 'requests>=2.4.3',
'six',
'uritemplate', # used by hateoas navigator 'uritemplate', # used by hateoas navigator
] + additional_install_requires, ] + additional_install_requires,
extras_require={ extras_require={
......
...@@ -10,3 +10,16 @@ Manager is a plugin-like class that is being run in multiple phases of slapos no ...@@ -10,3 +10,16 @@ Manager is a plugin-like class that is being run in multiple phases of slapos no
Constructor will receive configuration of current stage. Then each method receives Constructor will receive configuration of current stage. Then each method receives
object most related to the current operation. For details see <slapos/manager/interface.py>. object most related to the current operation. For details see <slapos/manager/interface.py>.
In code, a list of manager instances can be easily retreived by
from slapos import manager
manager_list = manager.from_config(config)
Where `from_config` extracts "manager_list" item from dict-like `config` argument
and then dynamically loads modules named according to the configuration inside
`slapos.manager` package. The manager must be a class named Manager and implementing
interface `slapos.manager.interface.IManager`.
Managers might require a list of user for whom they are allowed to perform tasks.
This list of users is given by "power_user_list" in [slapos] section in the
config file.
...@@ -280,7 +280,10 @@ class Computer(object): ...@@ -280,7 +280,10 @@ class Computer(object):
# attributes starting with '_' are saved from serialization # attributes starting with '_' are saved from serialization
# monkey-patch use of class instead of dictionary # monkey-patch use of class instead of dictionary
self._config = config if isinstance(config, dict) else config.__dict__ if config is None:
logger.warning("Computer needs config in constructor to allow managers.")
self._config = config if config is None or isinstance(config, dict) else config.__dict__
self._manager_list = slapmanager.from_config(self._config) self._manager_list = slapmanager.from_config(self._config)
def __getinitargs__(self): def __getinitargs__(self):
......
...@@ -1086,7 +1086,7 @@ stderr_logfile_backups=1 ...@@ -1086,7 +1086,7 @@ stderr_logfile_backups=1
# call manager for every software release # call manager for every software release
for manager in self._manager_list: for manager in self._manager_list:
manager.instance(partition) manager.instance(local_partition)
if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE: if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
local_partition.install() local_partition.install()
......
# coding: utf-8 # coding: utf-8
import re
import importlib import importlib
import re
import six
from zope.interface import declarations from zope.interface import declarations
...@@ -12,20 +13,27 @@ def load_manager(name): ...@@ -12,20 +13,27 @@ def load_manager(name):
if re.match(r'[a-zA-Z_]', name) is None: if re.match(r'[a-zA-Z_]', name) is None:
raise ValueError("Manager name \"{!s}\" is not allowed! Must contain only letters and \"_\"". raise ValueError("Manager name \"{!s}\" is not allowed! Must contain only letters and \"_\"".
format(name)) format(name))
manager_module_name = "slapos.manager.{}".format(name)
from slapos.manager import interface from slapos.manager import interface
manager_module = importlib.import_module("slapos.manager." + name) manager_module = importlib.import_module(manager_module_name)
if not hasattr(manager_module, "Manager"): if not hasattr(manager_module, "Manager"):
raise AttributeError("Manager class in {} has to be called \"Manager\"".format( raise AttributeError("Manager class in {} has to be called \"Manager\"".format(
name)) manager_module_name))
if not interface.IManager.implementedBy(manager_module.Manager): if not interface.IManager.implementedBy(manager_module.Manager):
raise RuntimeError("Manager class in {} must zope.interface.implements \"IManager\"".format( raise RuntimeError("Manager class in {} must zope.interface.implements \"IManager\"".format(
name)) manager_module_name))
return manager_module.Manager return manager_module.Manager
def from_config(config): def from_config(config):
"""Return list of instances of managers allowed from the config.""" """Return list of instances of managers allowed from the config."""
name_list = config.get(config_option, "").split() if config is None:
return []
name_list = config.get(config_option, "")
if isinstance(name_list, six.string_types):
name_list = name_list.replace(",", " ").split()
return [load_manager(name)(config) for name in name_list] return [load_manager(name)(config) for name in name_list]
\ No newline at end of file
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
import logging import logging
import os import os
import os.path import os.path
import pwd
import time
from zope import interface as zope_interface from zope import interface as zope_interface
from slapos.manager import interface from slapos.manager import interface
...@@ -82,13 +84,14 @@ class Manager(object): ...@@ -82,13 +84,14 @@ class Manager(object):
for cpu_folder in self._cpu_folder_list()] for cpu_folder in self._cpu_folder_list()]
# Gather exclusive CPU usage map {username: set[cpu_id]} # Gather exclusive CPU usage map {username: set[cpu_id]}
cpu_usage = defaultdict(set) # We do not need to gather that since we have no limits yet
for cpu_id in self._cpu_id_list()[1:]: # skip the first public CPU #cpu_usage = defaultdict(set)
pids = [int(pid) #for cpu_id in self._cpu_id_list()[1:]: # skip the first public CPU
for pid in read_file(cpu_tasks_file_list[cpu_id]).splitlines()] # pids = [int(pid)
for pid in pids: # for pid in read_file(cpu_tasks_file_list[cpu_id]).splitlines()]
process = psutil.Process(pid) # for pid in pids:
cpu_usage[process.username()].add(cpu_id) # process = psutil.Process(pid)
# cpu_usage[process.username()].add(cpu_id)
# Move all PIDs from the pool of all CPUs onto the first exclusive CPU. # Move all PIDs from the pool of all CPUs onto the first exclusive CPU.
running_list = sorted(list(map(int, read_file(tasks_file).split())), reverse=True) running_list = sorted(list(map(int, read_file(tasks_file).split())), reverse=True)
...@@ -105,7 +108,7 @@ class Manager(object): ...@@ -105,7 +108,7 @@ class Manager(object):
"Suceeded in moving {:d} PIDs {!s}\n".format( "Suceeded in moving {:d} PIDs {!s}\n".format(
len(refused_set), refused_set, len(success_set), success_set)) len(refused_set), refused_set, len(success_set), success_set))
cpu_list = self._cpu_folder_list() cpu_folder_list = self._cpu_folder_list()
generic_cpu_path = cpu_folder_list[0] generic_cpu_path = cpu_folder_list[0]
exclusive_cpu_path_list = cpu_folder_list[1:] exclusive_cpu_path_list = cpu_folder_list[1:]
...@@ -116,7 +119,7 @@ class Manager(object): ...@@ -116,7 +119,7 @@ class Manager(object):
# gather already exclusively running PIDs # gather already exclusively running PIDs
exclusive_pid_set = set() exclusive_pid_set = set()
for cpu_tasks_file in cpu_tasks_file_list[1:]: for cpu_tasks_file in cpu_tasks_file_list[1:]:
exclusive_pid_set.update(map(int, read_content(cpu_tasks_file).split())) exclusive_pid_set.update(map(int, read_file(cpu_tasks_file).split()))
# Move processes to their demanded exclusive CPUs # Move processes to their demanded exclusive CPUs
with open(request_file, "rt") as fi: with open(request_file, "rt") as fi:
...@@ -135,7 +138,7 @@ class Manager(object): ...@@ -135,7 +138,7 @@ class Manager(object):
def _cpu_folder_list(self): def _cpu_folder_list(self):
"""Return list of folders for exclusive cpu cores.""" """Return list of folders for exclusive cpu cores."""
return [os.path.join(self.cpuset_path, "cpu" + str(cpu_id)) return [os.path.join(self.cpuset_path, "cpu" + str(cpu_id))
for cpu_id in self._cpu_id_list] for cpu_id in self._cpu_id_list()]
def _cpu_id_list(self): def _cpu_id_list(self):
"""Extract IDs of available CPUs and return them as a list. """Extract IDs of available CPUs and return them as a list.
......
...@@ -155,14 +155,18 @@ class PwdMock: ...@@ -155,14 +155,18 @@ class PwdMock:
global USER_LIST global USER_LIST
if name in USER_LIST: if name in USER_LIST:
class PwdResult: class PwdResult:
pw_uid = 0 def __init__(self, name):
pw_gid = 0 self.pw_name = name
self.pw_uid = self.pw_gid = USER_LIST.index(name)
def __getitem__(self, index): def __getitem__(self, index):
if index == 0:
return self.pw_name
if index == 2: if index == 2:
return self.pw_uid return self.pw_uid
if index == 3: if index == 3:
return self.pw_gid return self.pw_gid
return PwdResult() return PwdResult(name)
raise KeyError("User \"{}\" not in global USER_LIST {!s}".format(name, USER_LIST)) raise KeyError("User \"{}\" not in global USER_LIST {!s}".format(name, USER_LIST))
...@@ -656,10 +660,14 @@ class TestComputer(SlapformatMixin): ...@@ -656,10 +660,14 @@ class TestComputer(SlapformatMixin):
class SlapGridPartitionMock: class SlapGridPartitionMock:
def __init__(self, partition): def __init__(self, partition):
self.partition = partition self.partition = partition
self.instance_path = partition.path self.instance_path = partition.path
def getUserGroupId(self):
return (0, 0)
class TestComputerWithCPUSet(SlapformatMixin): class TestComputerWithCPUSet(SlapformatMixin):
...@@ -667,6 +675,9 @@ class TestComputerWithCPUSet(SlapformatMixin): ...@@ -667,6 +675,9 @@ class TestComputerWithCPUSet(SlapformatMixin):
task_write_mode = "at" # append insted of write tasks PIDs for the tests task_write_mode = "at" # append insted of write tasks PIDs for the tests
def setUp(self): def setUp(self):
logging.getLogger("slapos.manager.cpuset").addHandler(
logging.StreamHandler())
super(TestComputerWithCPUSet, self).setUp() super(TestComputerWithCPUSet, self).setUp()
self.restoreOs() self.restoreOs()
...@@ -710,14 +721,13 @@ class TestComputerWithCPUSet(SlapformatMixin): ...@@ -710,14 +721,13 @@ class TestComputerWithCPUSet(SlapformatMixin):
], ],
config={ config={
"manager_list": "cpuset", "manager_list": "cpuset",
"power_user_list": "testuser" "power_user_list": "testuser root"
} }
) )
# self.patchOs(self.logger) # self.patchOs(self.logger)
def tearDown(self): def tearDown(self):
"""Cleanup temporary test folders.""" """Cleanup temporary test folders."""
from slapos.manager.cpuset import Manager from slapos.manager.cpuset import Manager
Manager.cpuset_path = self.orig_cpuset_path Manager.cpuset_path = self.orig_cpuset_path
Manager.task_write_mode = self.orig_task_write_mode Manager.task_write_mode = self.orig_task_write_mode
...@@ -726,6 +736,7 @@ class TestComputerWithCPUSet(SlapformatMixin): ...@@ -726,6 +736,7 @@ class TestComputerWithCPUSet(SlapformatMixin):
shutil.rmtree("/tmp/slapgrid/") shutil.rmtree("/tmp/slapgrid/")
if self.cpuset_path.startswith("/tmp"): if self.cpuset_path.startswith("/tmp"):
shutil.rmtree(self.cpuset_path) shutil.rmtree(self.cpuset_path)
logging.getLogger("slapos.manager.cpuset")
def test_positive_cgroups(self): def test_positive_cgroups(self):
"""Positive test of cgroups.""" """Positive test of cgroups."""
...@@ -741,25 +752,23 @@ class TestComputerWithCPUSet(SlapformatMixin): ...@@ -741,25 +752,23 @@ class TestComputerWithCPUSet(SlapformatMixin):
if cpu_id > 0: if cpu_id > 0:
self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks"))) self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks")))
# Simulate slapos instance call # Test moving tasks from generic core to private core
self.computer._manager_list[0].instance(SlapGridPartitionMock(self.computer.partition_list[0]))
# Test that format moved all PIDs from CPU pool into CPU0
tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0)
self.assertIn("1001", tasks_at_cpu0)
self.assertIn("1002", tasks_at_cpu0)
# Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(self.cpuset_path, "tasks"))
# test moving tasks from generic core to private core
# request PID 1001 to be moved to its private CPU # request PID 1001 to be moved to its private CPU
request_file_path = os.path.join(self.computer.partition_list[0].path, request_file_path = os.path.join(self.computer.partition_list[0].path,
self.cpu_exclusive_file) slapos.manager.cpuset.Manager.cpu_exclusive_file)
file_write("1001\n", request_file_path) file_write("1001\n", request_file_path)
# let format do the moving # Simulate slapos instance call to perform the actual movement
self.computer.update() self.computer._manager_list[0].instance(
SlapGridPartitionMock(self.computer.partition_list[0]))
# Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(self.cpuset_path, "tasks"))
# Test that format moved all PIDs from CPU pool into CPU0
tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0)
# test if the moving suceeded into any provate CPUS (id>0) # test if the moving suceeded into any provate CPUS (id>0)
self.assertTrue(any("1001" in file_content(exclusive_task) self.assertTrue(any("1001" in file_content(exclusive_task)
for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks")))) for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks"))))
self.assertIn("1002", tasks_at_cpu0)
# slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file # slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file
self.assertEqual("", file_content(request_file_path).strip()) self.assertEqual("", file_content(request_file_path).strip())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment