Commit 8bb366ab by Tomáš Peterka

[manager.cpuset] Split CPU cores half-half between shared and exclusive

1 parent d041e861
# coding: utf-8 # coding: utf-8
import logging import logging
import math
import os import os
import os.path import os.path
import pwd import pwd
...@@ -45,20 +46,39 @@ class Manager(object): ...@@ -45,20 +46,39 @@ class Manager(object):
logger.warning("CPUSet Manager cannot format computer because cgroups do not exist.") logger.warning("CPUSet Manager cannot format computer because cgroups do not exist.")
return return
for cpu in self._cpu_id_list(): def create_cpuset_subtree(name, cpu_id_list):
"""Create root for cpuset with per-CPU folders."""
if not cpu_id_list:
logger.warning("Too less cores - cannot reserve any for \"{}\"".format(name))
return
root_path = self._prepare_folder(
os.path.join(self.cpuset_path, name))
with open(root_path + "/cpuset.cpus", "wt") as fx:
# this cgroup manages all `name` cpus
if len(cpu_id_list) == 1:
fx.write(str(cpu_id_list[0]))
else:
fx.write("{:d}-{:d}".format(cpu_id_list[0], cpu_id_list[-1]))
create_cpuset_subtree("shared", self._shared_cpu_id_list())
create_cpuset_subtree("exclusive", self._exclusive_cpu_id_list())
# separate CPUs in exclusive set
exclusive_cpu_path = os.path.join(self.cpuset_path, "exclusive")
write_file("1", exclusive_cpu_path + "/cpuset.cpu_exclusive") # exclusive
write_file("0", exclusive_cpu_path + "/cpuset.mems") # it doesn't work without that
for cpu in self._exclusive_cpu_id_list():
cpu_path = self._prepare_folder( cpu_path = self._prepare_folder(
os.path.join(self.cpuset_path, "cpu" + str(cpu))) os.path.join(self.cpuset_path, "exclusive", "cpu" + str(cpu)))
with open(cpu_path + "/cpuset.cpus", "wt") as fx: write_file(str(cpu), cpu_path + "/cpuset.cpus") # manage only this cpu
fx.write(str(cpu)) # this cgroup manages only this cpu write_file("1", cpu_path + "/cpuset.cpu_exclusive") # exclusive
with open(cpu_path + "/cpuset.cpu_exclusive", "wt") as fx: write_file("0", cpu_path + "/cpuset.mems") # it doesn't work without that
fx.write("1") # manages it exclusively
with open(cpu_path + "/cpuset.mems", "wt") as fx:
fx.write("0") # it doesn't work without that
def instance(self, partition): def instance(self, partition):
"""Control runtime state of the computer.""" """Control runtime state of the computer."""
if not os.path.exists(os.path.join(self.cpuset_path, "cpu0")): if not os.path.exists(os.path.join(self.cpuset_path, "shared")):
# check whether the computer was formatted # check whether the computer was formatted
logger.warning("CGROUP's CPUSET Manager cannot update computer because it is not cpuset-formatted.") logger.warning("CGROUP's CPUSET Manager cannot update computer because it is not cpuset-formatted.")
return return
...@@ -73,15 +93,13 @@ class Manager(object): ...@@ -73,15 +93,13 @@ class Manager(object):
uid, gid = partition.getUserGroupId() uid, gid = partition.getUserGroupId()
uname = pwd.getpwuid(uid).pw_name uname = pwd.getpwuid(uid).pw_name
if uname not in power_user_list: if uname not in power_user_list:
logger.warning("User {} not allowed to modify cpuset! " logger.warning("{} is not allowed to modify cpuset! "
"Allowed users are in {} option in config file.".format( "Allowed users are in {} option in config file.".format(
uname, self.config_power_user_option)) uname, self.config_power_user_option))
return return
# prepare paths to tasks file for all and per-cpu # prepare paths to tasks file for all and per-cpu
tasks_file = os.path.join(self.cpuset_path, "tasks") tasks_file = os.path.join(self.cpuset_path, "tasks")
cpu_tasks_file_list = [os.path.join(cpu_folder, "tasks")
for cpu_folder in self._cpu_folder_list()]
# Gather exclusive CPU usage map {username: set[cpu_id]} # Gather exclusive CPU usage map {username: set[cpu_id]}
# We do not need to gather that since we have no limits yet # We do not need to gather that since we have no limits yet
...@@ -93,13 +111,14 @@ class Manager(object): ...@@ -93,13 +111,14 @@ class Manager(object):
# process = psutil.Process(pid) # process = psutil.Process(pid)
# cpu_usage[process.username()].add(cpu_id) # cpu_usage[process.username()].add(cpu_id)
# Move all PIDs from the pool of all CPUs onto the first exclusive CPU. # Move all PIDs from the pool of all CPUs onto shared CPUs.
running_list = sorted(list(map(int, read_file(tasks_file).split())), reverse=True) running_list = sorted(list(map(int, read_file(tasks_file).split())), reverse=True)
first_cpu = self._cpu_id_list()[0] shared_tasks_path = os.path.join(self.cpuset_path, "shared", "tasks")
exclusive_tasks_path = os.path.join(self.cpuset_path, "exclusive", "tasks")
success_set, refused_set = set(), set() success_set, refused_set = set(), set()
for pid in running_list: for pid in running_list:
try: try:
self._move_task(pid, first_cpu) write_file("{:d}\n".format(pid), shared_tasks_path, mode=self.task_write_mode)
success_set.add(pid) success_set.add(pid)
time.sleep(0.01) time.sleep(0.01)
except IOError as e: except IOError as e:
...@@ -108,37 +127,40 @@ class Manager(object): ...@@ -108,37 +127,40 @@ class Manager(object):
"Suceeded in moving {:d} PIDs {!s}\n".format( "Suceeded in moving {:d} PIDs {!s}\n".format(
len(refused_set), refused_set, len(success_set), success_set)) len(refused_set), refused_set, len(success_set), success_set))
cpu_folder_list = self._cpu_folder_list()
generic_cpu_path = cpu_folder_list[0]
exclusive_cpu_path_list = cpu_folder_list[1:]
# Gather all running PIDs for filtering out stale PIDs # Gather all running PIDs for filtering out stale PIDs
running_pid_set = set(running_list) running_pid_set = set(running_list)
running_pid_set.update(map(int, read_file(cpu_tasks_file_list[0]).split())) running_pid_set.update(map(int, read_file(shared_tasks_path).split()))
# gather already exclusively running PIDs # Gather already exclusively running PIDs
exclusive_pid_set = set() exclusive_pid_set = set()
for cpu_tasks_file in cpu_tasks_file_list[1:]: for cpu_tasks_file in [exclusive_tasks_path, ] + self._exclusive_cpu_tasks_list():
exclusive_pid_set.update(map(int, read_file(cpu_tasks_file).split())) exclusive_pid_set.update(map(int, read_file(cpu_tasks_file).split()))
# Move processes to their demanded exclusive CPUs # move processes to their demanded exclusive CPUs
with open(request_file, "rt") as fi: request_pid_list = map(int, read_file(request_file).split())
# take such PIDs which are either really running or are not already exclusive # empty the request file (we will write back only PIDs which weren't moved)
request_pid_list = [int(pid) for pid in fi.read().split() write_file("", request_file)
if int(pid) in running_pid_set or int(pid) not in exclusive_pid_set] # take such PIDs which are either really running or are not already exclusive
with open(request_file, "wt") as fo: for request_pid in filter(lambda pid: pid in running_pid_set and pid not in exclusive_pid_set, request_pid_list):
fo.write("") # empty file (we will write back only PIDs which weren't moved)
for request_pid in request_pid_list:
assigned_cpu = self._move_to_exclusive_cpu(request_pid) assigned_cpu = self._move_to_exclusive_cpu(request_pid)
if assigned_cpu < 0: if assigned_cpu < 0:
# if no exclusive CPU was assigned - write the PID back and try other time # if no exclusive CPU was assigned - write the PID back and try other time
with open(request_file, "at") as fo: write_file("{:d}\n".format(request_pid), request_file, mode="at")
fo.write(str(request_pid) + "\n")
def _cpu_folder_list(self): def _exclusive_cpu_tasks_list(self):
"""Return list of folders for exclusive cpu cores.""" """Return list of folders for exclusive cpu cores."""
return [os.path.join(self.cpuset_path, "cpu" + str(cpu_id)) return [os.path.join(self.cpuset_path, "exclusive", "cpu" + str(cpu_id), "tasks")
for cpu_id in self._cpu_id_list()] for cpu_id in self._exclusive_cpu_id_list()]
def _shared_cpu_id_list(self):
"""Return list of shared CPU core IDs."""
cpu_id_list = self._cpu_id_list()
return cpu_id_list[:int(math.ceil(len(cpu_id_list) / 2))]
def _exclusive_cpu_id_list(self):
"""Return list of exclusive CPU core IDs."""
cpu_id_list = self._cpu_id_list()
return cpu_id_list[int(math.ceil(len(cpu_id_list) / 2)):]
def _cpu_id_list(self): def _cpu_id_list(self):
"""Extract IDs of available CPUs and return them as a list. """Extract IDs of available CPUs and return them as a list.
...@@ -162,12 +184,12 @@ class Manager(object): ...@@ -162,12 +184,12 @@ class Manager(object):
:return: int, cpu_id of used CPU, -1 if placement was not possible :return: int, cpu_id of used CPU, -1 if placement was not possible
""" """
exclusive_cpu_list = self._cpu_id_list()[1:] exclusive_cpu_list = self._exclusive_cpu_id_list()
for exclusive_cpu in exclusive_cpu_list: for exclusive_cpu in exclusive_cpu_list:
# gather tasks assigned to current exclusive CPU # gather tasks assigned to current exclusive CPU
task_path = os.path.join(self.cpuset_path, "cpu" + str(exclusive_cpu), "tasks") task_path = os.path.join(
with open(task_path, "rt") as fi: self.cpuset_path, "exclusive", "cpu" + str(exclusive_cpu), "tasks")
task_list = fi.read().split() task_list = read_file(task_path).split()
if len(task_list) > 0: if len(task_list) > 0:
continue # skip occupied CPUs continue # skip occupied CPUs
return self._move_task(pid, exclusive_cpu)[1] return self._move_task(pid, exclusive_cpu)[1]
...@@ -179,8 +201,11 @@ class Manager(object): ...@@ -179,8 +201,11 @@ class Manager(object):
cpu_mode can be "performance" or "powersave" cpu_mode can be "performance" or "powersave"
""" """
known_cpu_mode_list = ("performance", "powersave") known_cpu_mode_list = ("performance", "powersave")
with open(os.path.join(self.cpuset_path, "cpu" + str(cpu_id), "tasks"), self.task_write_mode) as fo: if cpu_id not in self._exclusive_cpu_id_list():
fo.write(str(pid) + "\n") raise ValueError("Cannot assign to cpu{:d} because it is shared".format(cpu_id))
# write the PID into concrete tasks file
cpu_tasks = os.path.join(self.cpuset_path, "exclusive", "cpu" + str(cpu_id), "tasks")
write_file("{:d}\n".format(pid), cpu_tasks, mode=self.task_write_mode)
# set the core to `cpu_mode` # set the core to `cpu_mode`
scaling_governor_file = "/sys/devices/system/cpu/cpu{:d}/cpufreq/scaling_governor".format(cpu_id) scaling_governor_file = "/sys/devices/system/cpu/cpu{:d}/cpufreq/scaling_governor".format(cpu_id)
if os.path.exists(scaling_governor_file): if os.path.exists(scaling_governor_file):
...@@ -216,5 +241,9 @@ def read_file(path, mode="rt"): ...@@ -216,5 +241,9 @@ def read_file(path, mode="rt"):
def write_file(content, path, mode="wt"): def write_file(content, path, mode="wt"):
with open(path, mode) as fo:
fo.write(content)
\ No newline at end of file \ No newline at end of file
try:
with open(path, mode) as fo:
fo.write(content)
except IOError as e:
logger.error("Cannot write to {}".format(path))
raise
\ No newline at end of file \ No newline at end of file
...@@ -744,31 +744,49 @@ class TestComputerWithCPUSet(SlapformatMixin): ...@@ -744,31 +744,49 @@ class TestComputerWithCPUSet(SlapformatMixin):
self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list) self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list)
# This should created per-cpu groups and move all tasks in CPU pool into cpu0 # This should created per-cpu groups and move all tasks in CPU pool into cpu0
self.computer.format(alter_network=False, alter_user=False) self.computer.format(alter_network=False, alter_user=False)
# Test files creation for exclusive CPUs # Test files creation for exclusive CPUs
for cpu_id in self.cpu_list: # First half of CPUs is shared
cpu_n_path = os.path.join(self.cpuset_path, "cpu" + str(cpu_id)) shared_cpuset_path = os.path.join(self.cpuset_path, "shared")
for cpu_id in self.cpu_list[:2]:
self.assertFalse(os.path.exists(
os.path.join(shared_cpuset_path, "cpu" + str(cpu_id))),
"Specific cpu<N> doesn't make sense in shared cpuset.")
# The second half of CPUs most be ready for exclusive
exclusive_cpuset_path = os.path.join(self.cpuset_path, "exclusive")
for cpu_id in self.cpu_list[2:]:
cpu_n_path = os.path.join(exclusive_cpuset_path, "cpu" + str(cpu_id))
self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus"))) self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus")))
self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive"))) self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive")))
if cpu_id > 0: for cpu_id in self.cpu_list[:2]:
self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks"))) self.assertFalse(os.path.exists(
os.path.join(exclusive_cpuset_path, "cpu" + str(cpu_id))),
"Shared CPU cannot appear in EXCLUSIVE set.")
# Test moving tasks from generic core to private core # Test moving tasks from generic core to private core
# request PID 1001 to be moved to its private CPU # request PID 1001 to be moved to its private CPU
request_file_path = os.path.join(self.computer.partition_list[0].path, request_file_path = os.path.join(self.computer.partition_list[0].path,
slapos.manager.cpuset.Manager.cpu_exclusive_file) slapos.manager.cpuset.Manager.cpu_exclusive_file)
file_write("1001\n", request_file_path) # ask exclusivity for existing and non-existing PID
file_write("1001\n9999\n", request_file_path)
# Simulate slapos instance call to perform the actual movement # Simulate slapos instance call to perform the actual movement
self.computer._manager_list[0].instance( self.computer._manager_list[0].instance(
SlapGridPartitionMock(self.computer.partition_list[0])) SlapGridPartitionMock(self.computer.partition_list[0]))
# Simulate cgroup behaviour - empty tasks in the pool # Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(self.cpuset_path, "tasks")) file_write("", os.path.join(self.cpuset_path, "tasks"))
# Test that format moved all PIDs from CPU pool into CPU0 # Test that format moved all PIDs from CPU pool into shared pool
tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split() shared_task_list = file_content(os.path.join(self.cpuset_path, "shared", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0) self.assertIn("1000", shared_task_list)
self.assertIn("1002", shared_task_list)
# test if the moving suceeded into any provate CPUS (id>0) # test if the moving suceeded into any provate CPUS (id>0)
self.assertTrue(any("1001" in file_content(exclusive_task) exclusive_tasks_path_list = [
for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks")))) os.path.join(self.cpuset_path, "exclusive", "tasks"),
self.assertIn("1002", tasks_at_cpu0) ] + glob.glob(os.path.join(self.cpuset_path, "exclusive", "cpu[1-9]", "tasks"))
self.assertTrue(any("1001" in file_content(exclusive_tasks_path)
for exclusive_tasks_path in exclusive_tasks_path_list))
self.assertFalse(any("9999" in file_content(exclusive_tasks_path)
for exclusive_tasks_path in exclusive_tasks_path_list))
# slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file # slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file
self.assertEqual("", file_content(request_file_path).strip()) self.assertEqual("", file_content(request_file_path).strip())
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!