diff --git a/slapos/manager/cpuset.py b/slapos/manager/cpuset.py index 0f5b0bf25bc57b2651b49f978b8165af6d0d7ce3..bfec635808545c965508549c1c0b5a4fc9958a1d 100644 --- a/slapos/manager/cpuset.py +++ b/slapos/manager/cpuset.py @@ -1,5 +1,6 @@ # coding: utf-8 import logging +import math import os import os.path import pwd @@ -45,20 +46,39 @@ class Manager(object): logger.warning("CPUSet Manager cannot format computer because cgroups do not exist.") return - for cpu in self._cpu_id_list(): + def create_cpuset_subtree(name, cpu_id_list): + """Create root for cpuset with per-CPU folders.""" + if not cpu_id_list: + logger.warning("Too less cores - cannot reserve any for \"{}\"".format(name)) + return + + root_path = self._prepare_folder( + os.path.join(self.cpuset_path, name)) + with open(root_path + "/cpuset.cpus", "wt") as fx: + # this cgroup manages all `name` cpus + if len(cpu_id_list) == 1: + fx.write(str(cpu_id_list[0])) + else: + fx.write("{:d}-{:d}".format(cpu_id_list[0], cpu_id_list[-1])) + + create_cpuset_subtree("shared", self._shared_cpu_id_list()) + create_cpuset_subtree("exclusive", self._exclusive_cpu_id_list()) + # separate CPUs in exclusive set + exclusive_cpu_path = os.path.join(self.cpuset_path, "exclusive") + write_file("1", exclusive_cpu_path + "/cpuset.cpu_exclusive") # exclusive + write_file("0", exclusive_cpu_path + "/cpuset.mems") # it doesn't work without that + for cpu in self._exclusive_cpu_id_list(): cpu_path = self._prepare_folder( - os.path.join(self.cpuset_path, "cpu" + str(cpu))) - with open(cpu_path + "/cpuset.cpus", "wt") as fx: - fx.write(str(cpu)) # this cgroup manages only this cpu - with open(cpu_path + "/cpuset.cpu_exclusive", "wt") as fx: - fx.write("1") # manages it exclusively - with open(cpu_path + "/cpuset.mems", "wt") as fx: - fx.write("0") # it doesn't work without that + os.path.join(self.cpuset_path, "exclusive", "cpu" + str(cpu))) + write_file(str(cpu), cpu_path + "/cpuset.cpus") # manage only this cpu + write_file("1", 
cpu_path + "/cpuset.cpu_exclusive") # exclusive + write_file("0", cpu_path + "/cpuset.mems") # it doesn't work without that + def instance(self, partition): """Control runtime state of the computer.""" - if not os.path.exists(os.path.join(self.cpuset_path, "cpu0")): + if not os.path.exists(os.path.join(self.cpuset_path, "shared")): # check whether the computer was formatted logger.warning("CGROUP's CPUSET Manager cannot update computer because it is not cpuset-formatted.") return @@ -73,15 +93,13 @@ class Manager(object): uid, gid = partition.getUserGroupId() uname = pwd.getpwuid(uid).pw_name if uname not in power_user_list: - logger.warning("User {} not allowed to modify cpuset! " + logger.warning("{} is not allowed to modify cpuset! " "Allowed users are in {} option in config file.".format( uname, self.config_power_user_option)) return # prepare paths to tasks file for all and per-cpu tasks_file = os.path.join(self.cpuset_path, "tasks") - cpu_tasks_file_list = [os.path.join(cpu_folder, "tasks") - for cpu_folder in self._cpu_folder_list()] # Gather exclusive CPU usage map {username: set[cpu_id]} # We do not need to gather that since we have no limits yet @@ -93,13 +111,14 @@ class Manager(object): # process = psutil.Process(pid) # cpu_usage[process.username()].add(cpu_id) - # Move all PIDs from the pool of all CPUs onto the first exclusive CPU. + # Move all PIDs from the pool of all CPUs onto shared CPUs. 
running_list = sorted(list(map(int, read_file(tasks_file).split())), reverse=True) - first_cpu = self._cpu_id_list()[0] + shared_tasks_path = os.path.join(self.cpuset_path, "shared", "tasks") + exclusive_tasks_path = os.path.join(self.cpuset_path, "exclusive", "tasks") success_set, refused_set = set(), set() for pid in running_list: try: - self._move_task(pid, first_cpu) + write_file("{:d}\n".format(pid), shared_tasks_path, mode=self.task_write_mode) success_set.add(pid) time.sleep(0.01) except IOError as e: @@ -108,37 +127,40 @@ class Manager(object): "Suceeded in moving {:d} PIDs {!s}\n".format( len(refused_set), refused_set, len(success_set), success_set)) - cpu_folder_list = self._cpu_folder_list() - generic_cpu_path = cpu_folder_list[0] - exclusive_cpu_path_list = cpu_folder_list[1:] - # Gather all running PIDs for filtering out stale PIDs running_pid_set = set(running_list) - running_pid_set.update(map(int, read_file(cpu_tasks_file_list[0]).split())) + running_pid_set.update(map(int, read_file(shared_tasks_path).split())) - # gather already exclusively running PIDs + # Gather already exclusively running PIDs exclusive_pid_set = set() - for cpu_tasks_file in cpu_tasks_file_list[1:]: + for cpu_tasks_file in [exclusive_tasks_path, ] + self._exclusive_cpu_tasks_list(): exclusive_pid_set.update(map(int, read_file(cpu_tasks_file).split())) - # Move processes to their demanded exclusive CPUs - with open(request_file, "rt") as fi: - # take such PIDs which are either really running or are not already exclusive - request_pid_list = [int(pid) for pid in fi.read().split() - if int(pid) in running_pid_set or int(pid) not in exclusive_pid_set] - with open(request_file, "wt") as fo: - fo.write("") # empty file (we will write back only PIDs which weren't moved) - for request_pid in request_pid_list: + # move processes to their demanded exclusive CPUs + request_pid_list = map(int, read_file(request_file).split()) + # empty the request file (we will write back only PIDs which 
weren't moved) + write_file("", request_file) + # take only PIDs which are really running and are not already exclusive + for request_pid in filter(lambda pid: pid in running_pid_set and pid not in exclusive_pid_set, request_pid_list): assigned_cpu = self._move_to_exclusive_cpu(request_pid) if assigned_cpu < 0: # if no exclusive CPU was assigned - write the PID back and try other time - with open(request_file, "at") as fo: - fo.write(str(request_pid) + "\n") + write_file("{:d}\n".format(request_pid), request_file, mode="at") - def _cpu_folder_list(self): + def _exclusive_cpu_tasks_list(self): """Return list of folders for exclusive cpu cores.""" - return [os.path.join(self.cpuset_path, "cpu" + str(cpu_id)) - for cpu_id in self._cpu_id_list()] + return [os.path.join(self.cpuset_path, "exclusive", "cpu" + str(cpu_id), "tasks") + for cpu_id in self._exclusive_cpu_id_list()] + + def _shared_cpu_id_list(self): + """Return list of shared CPU core IDs.""" + cpu_id_list = self._cpu_id_list() + return cpu_id_list[:int(math.ceil(len(cpu_id_list) / 2))] + + def _exclusive_cpu_id_list(self): + """Return list of exclusive CPU core IDs.""" + cpu_id_list = self._cpu_id_list() + return cpu_id_list[int(math.ceil(len(cpu_id_list) / 2)):] def _cpu_id_list(self): """Extract IDs of available CPUs and return them as a list. 
@@ -162,12 +184,12 @@ class Manager(object): :return: int, cpu_id of used CPU, -1 if placement was not possible """ - exclusive_cpu_list = self._cpu_id_list()[1:] + exclusive_cpu_list = self._exclusive_cpu_id_list() for exclusive_cpu in exclusive_cpu_list: # gather tasks assigned to current exclusive CPU - task_path = os.path.join(self.cpuset_path, "cpu" + str(exclusive_cpu), "tasks") - with open(task_path, "rt") as fi: - task_list = fi.read().split() + task_path = os.path.join( + self.cpuset_path, "exclusive", "cpu" + str(exclusive_cpu), "tasks") + task_list = read_file(task_path).split() if len(task_list) > 0: continue # skip occupied CPUs return self._move_task(pid, exclusive_cpu)[1] @@ -179,8 +201,11 @@ class Manager(object): cpu_mode can be "performance" or "powersave" """ known_cpu_mode_list = ("performance", "powersave") - with open(os.path.join(self.cpuset_path, "cpu" + str(cpu_id), "tasks"), self.task_write_mode) as fo: - fo.write(str(pid) + "\n") + if cpu_id not in self._exclusive_cpu_id_list(): + raise ValueError("Cannot assign to cpu{:d} because it is shared".format(cpu_id)) + # write the PID into concrete tasks file + cpu_tasks = os.path.join(self.cpuset_path, "exclusive", "cpu" + str(cpu_id), "tasks") + write_file("{:d}\n".format(pid), cpu_tasks, mode=self.task_write_mode) # set the core to `cpu_mode` scaling_governor_file = "/sys/devices/system/cpu/cpu{:d}/cpufreq/scaling_governor".format(cpu_id) if os.path.exists(scaling_governor_file): @@ -216,5 +241,9 @@ def read_file(path, mode="rt"): def write_file(content, path, mode="wt"): - with open(path, mode) as fo: - fo.write(content) \ No newline at end of file + try: + with open(path, mode) as fo: + fo.write(content) + except IOError as e: + logger.error("Cannot write to {}".format(path)) + raise \ No newline at end of file diff --git a/slapos/tests/slapformat.py b/slapos/tests/slapformat.py index d84abfb905fd536de0a84bf9e073e10f9fbf583f..2f5184d3f3b1fc3b335867d013b2bec1e1a45179 100644 --- 
a/slapos/tests/slapformat.py +++ b/slapos/tests/slapformat.py @@ -744,31 +744,49 @@ class TestComputerWithCPUSet(SlapformatMixin): self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list) # This should created per-cpu groups and move all tasks in CPU pool into cpu0 self.computer.format(alter_network=False, alter_user=False) + # Test files creation for exclusive CPUs - for cpu_id in self.cpu_list: - cpu_n_path = os.path.join(self.cpuset_path, "cpu" + str(cpu_id)) + # First half of CPUs is shared + shared_cpuset_path = os.path.join(self.cpuset_path, "shared") + for cpu_id in self.cpu_list[:2]: + self.assertFalse(os.path.exists( + os.path.join(shared_cpuset_path, "cpu" + str(cpu_id))), + "Specific cpu doesn't make sense in shared cpuset.") + + # The second half of CPUs must be ready for exclusive use + exclusive_cpuset_path = os.path.join(self.cpuset_path, "exclusive") + for cpu_id in self.cpu_list[2:]: + cpu_n_path = os.path.join(exclusive_cpuset_path, "cpu" + str(cpu_id)) self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus"))) self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive"))) - if cpu_id > 0: - self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks"))) - + for cpu_id in self.cpu_list[:2]: + self.assertFalse(os.path.exists( + os.path.join(exclusive_cpuset_path, "cpu" + str(cpu_id))), + "Shared CPU cannot appear in EXCLUSIVE set.") + # Test moving tasks from generic core to private core # request PID 1001 to be moved to its private CPU request_file_path = os.path.join(self.computer.partition_list[0].path, slapos.manager.cpuset.Manager.cpu_exclusive_file) - file_write("1001\n", request_file_path) + # ask exclusivity for existing and non-existing PID + file_write("1001\n9999\n", request_file_path) # Simulate slapos instance call to perform the actual movement self.computer._manager_list[0].instance( SlapGridPartitionMock(self.computer.partition_list[0])) # Simulate cgroup 
behaviour - empty tasks in the pool file_write("", os.path.join(self.cpuset_path, "tasks")) - # Test that format moved all PIDs from CPU pool into CPU0 - tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split() - self.assertIn("1000", tasks_at_cpu0) + # Test that format moved all PIDs from CPU pool into shared pool + shared_task_list = file_content(os.path.join(self.cpuset_path, "shared", "tasks")).split() + self.assertIn("1000", shared_task_list) + self.assertIn("1002", shared_task_list) # test if the moving suceeded into any provate CPUS (id>0) - self.assertTrue(any("1001" in file_content(exclusive_task) - for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks")))) - self.assertIn("1002", tasks_at_cpu0) + exclusive_tasks_path_list = [ + os.path.join(self.cpuset_path, "exclusive", "tasks"), + ] + glob.glob(os.path.join(self.cpuset_path, "exclusive", "cpu[1-9]", "tasks")) + self.assertTrue(any("1001" in file_content(exclusive_tasks_path) + for exclusive_tasks_path in exclusive_tasks_path_list)) + self.assertFalse(any("9999" in file_content(exclusive_tasks_path) + for exclusive_tasks_path in exclusive_tasks_path_list)) # slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file self.assertEqual("", file_content(request_file_path).strip())