Commit 8efa3740 authored by Alain Takoudjou's avatar Alain Takoudjou

Add partition size snapshot to slapos.collect

Partition folder snapshot use du to calculate folder size and return value in Mb.
This snapshot is enabled by default and will calculate size of each partition every 24hours.
Partition with size < 1Mb are not are not reported (considered as empty).
The behavior can be changed using some configurations in slapos.cfg:

[collect]
report_disk_usage = True

disk_snapshot_process_pid_foder = /srv/slapgrid/var/run

disk_snapshot_time_cycle = 86400
parent 84a3bd53
......@@ -47,6 +47,10 @@ class Database:
" io_cycles_counter real, date text, time text, " \
" reported integer NULL DEFAULT 0)"
CREATE_FOLDER_TABLE = "create table if not exists folder "\
"(partition text, disk_used real, date text, " \
" time text, reported integer NULL DEFAULT 0)"
CREATE_COMPUTER_TABLE = "create table if not exists computer "\
"(cpu_num_core real, cpu_frequency real, cpu_type text," \
" memory_size real, memory_type text, partition_list text," \
......@@ -80,6 +84,10 @@ class Database:
"date, time) values " \
"('%s', %s, '%s', %s, %s, %s, %s, %s, %s, %s, '%s', '%s' )"
INSERT_FOLDER_TEMPLATE = "insert into folder(" \
"partition, disk_used, date, time) values " \
"('%s', %s, '%s', '%s' )"
INSERT_COMPUTER_TEMPLATE = "insert into computer("\
" cpu_num_core, cpu_frequency, cpu_type," \
"memory_size, memory_type, partition_list," \
......@@ -137,6 +145,7 @@ class Database:
assert self.CREATE_USER_TABLE is not None
self.connect()
self._execute(self.CREATE_USER_TABLE)
self._execute(self.CREATE_FOLDER_TABLE)
self._execute(self.CREATE_COMPUTER_TABLE)
self._execute(self.CREATE_SYSTEM_TABLE)
self._execute(self.CREATE_DISK_PARTITION)
......@@ -163,6 +172,13 @@ class Database:
self._execute(insertion_sql)
return insertion_sql
def inserFolderSnapshot(self, partition, disk_usage, insertion_date, insertion_time):
""" Insert folder disk usage snapshots information on a database """
insertion_sql = self.INSERT_FOLDER_TEMPLATE % \
( partition, disk_usage, insertion_date, insertion_time)
self._execute(insertion_sql)
return insertion_sql
def insertComputerSnapshot(self, cpu_num_core, cpu_frequency, cpu_type,
memory_size, memory_type, partition_list, insertion_date, insertion_time):
"""Insert Computer general informations snapshots informations on
......@@ -274,7 +290,7 @@ class Database:
for table in table_list:
self._execute(update_sql % (table, date_scope))
def select(self, table, date=None, columns="*", where=None):
def select(self, table, date=None, columns="*", where=None, order=None, limit=0):
""" Query database for a full table information """
if date is not None:
where_clause = " WHERE date = '%s' " % date
......@@ -284,8 +300,12 @@ class Database:
if where is not None:
if where_clause == "":
where_clause += " WHERE 1 = 1 "
where_clause += " AND %s " % where
where_clause += " AND %s " % where
select_sql = "SELECT %s FROM %s %s " % (columns, table, where_clause)
if order is not None:
select_sql += " ORDER BY %s" % order
if limit:
select_sql += " limit %s" % limit
return self._execute(select_sql)
......
......@@ -27,14 +27,27 @@
#
##############################################################################
import os
from datetime import datetime, timedelta
from slapos.collect.snapshot import FolderSizeSnapshot
def get_user_list(config):
nb_user = int(config.get("slapformat", "partition_amount"))
name_prefix = config.get("slapformat", "user_base_name")
path_prefix = config.get("slapformat", "partition_base_name")
instance_root = config.get("slapos", "instance_root")
user_dict = {name: User(name, path)
# By default, enable disk snapshot,
# and set time_cycle to 24hours after the first disk snapshot run
disk_snapshot_params = {'enable': True, 'time_cycle': 86400}
if config.has_section('collect'):
collect_section = dict(config.items("collect"))
disk_snapshot_params = dict(
enable=eval(collect_section.get("report_disk_usage", "True")),
Please register or sign in to reply
pid_folder=collect_section.get("disk_snapshot_process_pid_foder", None),
time_cycle=int(collect_section.get("disk_snapshot_time_cycle", 86400)),
use_quota=eval(collect_section.get("disk_snapshot_use_quota", "True"))
)
user_dict = {name: User(name, path, disk_snapshot_params)
for name, path in [
(
"%s%s" % (name_prefix, nb),
......@@ -47,17 +60,53 @@ def get_user_list(config):
return user_dict
class User(object):
def __init__(self, name, path):
def __init__(self, name, path, disk_snapshot_params={}):
self.name = str(name)
self.path = str(path)
self.disk_snapshot_params = disk_snapshot_params
self.snapshot_list = []
def append(self, value):
self.snapshot_list.append(value)
def _insertDiskSnapShot(self, database, collected_date, collected_time):
if self.disk_snapshot_params['enable']:
time_cycle = self.disk_snapshot_params.get('time_cycle', 0)
database.connect()
if time_cycle:
order = 'date DESC, time DESC'
limit = 1
query = database.select(table="folder", columns="date, time",
order=order, limit=limit,
where="partition=%s" % self.name)
query_result = zip(*query)
if len(query_result):
date, time = (query_result[0][0], query_result[1][0])
latest_date = datetime.strptime('%s %s' % (date, time),
"%Y-%m-%d %H:%M:%S")
if (datetime.now() - latest_date).seconds < time_cycle:
# wait the time cycle
return
pid_file = self.disk_snapshot_params.get('pid_folder', None)
if pid_file is not None:
pid_file = os.path.join(pid_file, '%s_disk_size.pid' % self.name)
disk_snapshot = FolderSizeSnapshot(self.path, pid_file)
disk_snapshot.update_folder_size()
# Skeep insert empty partition: size <= 1Mb
if disk_snapshot.disk_usage <= 1.0 and \
not self.disk_snapshot_params.get('testing', False):
return
database.inserFolderSnapshot(self.name,
disk_usage=disk_snapshot.get("disk_usage"),
insertion_date=collected_date,
insertion_time=collected_time)
database.commit()
database.close()
def save(self, database, collected_date, collected_time):
""" Insert collected data on user collector """
database.connect()
snapshot_counter = len(self.snapshot_list)
for snapshot_item in self.snapshot_list:
snapshot_item.update_cpu_percent()
database.insertUserSnapshot(self.name,
......@@ -74,6 +123,9 @@ class User(object):
insertion_time=collected_time)
database.commit()
database.close()
# Inser disk snapshot in a new transaction, it can take long
self._insertDiskSnapShot(database, collected_date, collected_time)
class Computer(dict):
......
......@@ -213,6 +213,16 @@ class ConsumptionReport(object):
reference=user,
category="")
for user in self.user_list:
partition_disk_used = self._getPartitionDiskUsedAverage(user, date_scope)
if partition_disk_used is not None:
journal.newMovement(transaction,
resource="service_module/disk_used",
title="Partition Disk Used Average for %s" % (user),
quantity=str(partition_disk_used),
reference=user,
category="")
with open(xml_report_path, 'w') as f:
f.write(journal.getXML())
f.close()
......@@ -288,6 +298,24 @@ class ConsumptionReport(object):
if len(sample_amount) and len(memory_sum):
return memory_sum[0][0]/sample_amount[0][0]
def _getPartitionDiskUsedAverage(self, partition_id, date_scope):
self.db.connect()
query_result_cursor = self.db.select("folder", date_scope,
columns="SUM(disk_used)",
where="partition = '%s'" % partition_id)
disk_used_sum = zip(*query_result_cursor)
if len(disk_used_sum) and disk_used_sum[0][0] == 0:
return
query_result_cursor = self.db.select("user", date_scope,
columns="COUNT(DISTINCT time)",
where="partition = '%s'" % partition_id)
collect_amount = zip(*query_result_cursor)
self.db.close()
if len(collect_amount) and len(disk_used_sum):
return disk_used_sum[0][0]/collect_amount[0][0]
class Journal(object):
......
......@@ -29,6 +29,7 @@
import psutil
import os
import subprocess
from temperature import collectComputerTemperature, \
launchTemperatureTest
......@@ -71,6 +72,53 @@ class ProcessSnapshot(_Snapshot):
# CPU percentage, we will have to get actual absolute value
self.cpu_percent = self.process_object.cpu_percent()
class FolderSizeSnapshot(_Snapshot):
"""Calculate partition folder size.
"""
def __init__(self, folder_path, pid_file=None, use_quota=False):
# slapos computer partition size
self.folder_path = folder_path
self.pid_file = pid_file
self.disk_usage = 0
self.use_quota = use_quota
def update_folder_size(self):
"""Return 0 if the process du is still running
"""
if self.pid_file and os.path.exists(self.pid_file):
with open(self.pid_file, 'r') as fpid:
pid_str = fpid.read()
if pid_str:
pid = int(pid_str)
try:
os.kill(pid, 0)
except OSError:
pass
else:
return
self.disk_usage = self._getSize(self.folder_path)
# If extra disk added to partition
data_dir = os.path.join(self.folder_path, 'DATA')
if os.path.exists(data_dir):
for filename in os.listdir(data_dir):
extra_path = os.path.join(data_dir, filename)
if os.path.islink(extra_path) and os.path.isdir('%s/' % extra_path):
self.disk_usage += self._getSize('%s/' % extra_path)
def _getSize(self, file_path):
size = 0
command = 'du -sm %s' % file_path
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
if self.pid_file:
with open(self.pid_file, 'w') as fpid:
pid = fpid.write(str(process.pid))
result = process.communicate()[0]
if process.returncode == 0:
size, _ = result.strip().split()
return float(size)
class SystemSnapshot(_Snapshot):
""" Take a snapshot from current system usage
"""
......@@ -195,3 +243,4 @@ class ComputerSnapshot(_Snapshot):
return [(k, v) for k, v in partition_dict.iteritems()]
......@@ -54,6 +54,9 @@ class FakeDatabase(object):
def insertUserSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertUserSnapshot", (args, kw)))
def inserFolderSnapshot(self, *args, **kw):
self.invoked_method_list.append(("inserFolderSnapshot", (args, kw)))
def insertSystemSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertSystemSnapshot", (args, kw)))
......@@ -63,6 +66,17 @@ class FakeDatabase(object):
def insertDiskPartitionSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertDiskPartitionSnapshot", (args, kw)))
def insertTemperatureSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertTemperatureSnapshot", (args, kw)))
def insertHeatingSnapshot(self, *args, **kw):
self.invoked_method_list.append(("insertHeatingSnapshot", (args, kw)))
class FakeDatabase2(FakeDatabase):
def select(self, *args, **kw):
self.invoked_method_list.append(("select", (args, kw)))
return []
class TestCollectDatabase(unittest.TestCase):
def setUp(self):
......@@ -79,7 +93,7 @@ class TestCollectDatabase(unittest.TestCase):
database.connect()
try:
self.assertEquals(
[u'user', u'computer', u'system', u'disk', u'temperature', u'heating'],
[u'user', u'folder', u'computer', u'system', u'disk', u'temperature', u'heating'],
database.getTableList())
finally:
database.close()
......@@ -97,8 +111,21 @@ class TestCollectDatabase(unittest.TestCase):
'10.0', '10.0', '0.1', '0.1', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('user')],
[(u'fakeuser0', 10.0, u'10-12345', 0.1, 10.0,
1.0, 10.0, 10.0, 0.1, 0.1, u'DATE', u'TIME', 0)])
[(u'fakeuser0', 10.0, u'10-12345', 0.1, 10.0,
1.0, 10.0, 10.0, 0.1, 0.1, u'DATE', u'TIME', 0)])
finally:
database.close()
def test_insert_folder_snapshot(self):
database = db.Database(self.instance_root)
database.connect()
try:
database.inserFolderSnapshot(
'fakeuser0', '0.1', 'DATE', 'TIME')
database.commit()
self.assertEquals([i for i in database.select('folder')],
[(u'fakeuser0', 0.1, u'DATE', u'TIME', 0)])
finally:
database.close()
......@@ -258,6 +285,7 @@ class TestCollectReport(unittest.TestCase):
csv_path_list = ['%s/1983-01-10/dump_disk.csv' % self.instance_root,
'%s/1983-01-10/dump_computer.csv' % self.instance_root,
'%s/1983-01-10/dump_user.csv' % self.instance_root,
'%s/1983-01-10/dump_folder.csv' % self.instance_root,
'%s/1983-01-10/dump_heating.csv' % self.instance_root,
'%s/1983-01-10/dump_temperature.csv' % self.instance_root,
'%s/1983-01-10/dump_system.csv' % self.instance_root]
......@@ -337,7 +365,7 @@ class TestCollectSnapshot(unittest.TestCase):
self.assertNotEquals(process_snapshot.username, None)
self.assertEquals(int(process_snapshot.pid), os.getpid())
self.assertEquals(int(process_snapshot.name.split("-")[0]),
self.assertEquals(int(process_snapshot.process.split("-")[0]),
os.getpid())
self.assertNotEquals(process_snapshot.cpu_percent , None)
......@@ -348,6 +376,30 @@ class TestCollectSnapshot(unittest.TestCase):
self.assertNotEquals(process_snapshot.io_rw_counter, None)
self.assertNotEquals(process_snapshot.io_cycles_counter, None)
def test_folder_size_snapshot(self):
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root)
self.assertEqual(disk_snapshot.disk_usage, 0)
for i in range(0, 10):
folder = 'folder%s' % i
os.mkdir(os.path.join(self.instance_root, folder))
with open(os.path.join(self.instance_root, folder, 'toto'), 'w') as f:
f.write('toto text')
disk_snapshot.update_folder_size()
self.assertNotEquals(disk_snapshot.disk_usage, 0)
pid_file = os.path.join(self.instance_root, 'disk_snap.pid')
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file)
disk_snapshot.update_folder_size()
self.assertNotEquals(disk_snapshot.disk_usage, 0)
pid_file = os.path.join(self.instance_root, 'disk_snap.pid')
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file,
use_quota=True)
disk_snapshot.update_folder_size()
self.assertNotEquals(disk_snapshot.disk_usage, 0)
def test_process_snapshot_broken_process(self):
self.assertRaises(AssertionError,
snapshot.ProcessSnapshot, None)
......@@ -396,9 +448,10 @@ class TestCollectEntity(unittest.TestCase):
def tearDown(self):
pass
def getFakeUser(self):
def getFakeUser(self, disk_snapshot_params={}):
os.mkdir("%s/fakeuser0" % self.instance_root)
return entity.User("fakeuser0",
"%s/fakeuser0" % self.instance_root )
"%s/fakeuser0" % self.instance_root, disk_snapshot_params )
def test_get_user_list(self):
config = ConfigParser()
......@@ -426,7 +479,8 @@ class TestCollectEntity(unittest.TestCase):
self.assertEquals(user.snapshot_list, ["SNAPSHOT"])
def test_user_save(self):
user = self.getFakeUser()
disk_snapshot_params = {'enable': False}
user = self.getFakeUser(disk_snapshot_params)
process = psutil.Process(os.getpid())
user.append(snapshot.ProcessSnapshot(process))
database = FakeDatabase()
......@@ -443,6 +497,64 @@ class TestCollectEntity(unittest.TestCase):
self.assertEquals(database.invoked_method_list[2], ("commit", ""))
self.assertEquals(database.invoked_method_list[3], ("close", ""))
def test_user_save_disk_snapshot(self):
disk_snapshot_params = {'enable': True, 'testing': True}
user = self.getFakeUser(disk_snapshot_params)
process = psutil.Process(os.getpid())
user.append(snapshot.ProcessSnapshot(process))
database = FakeDatabase2()
user.save(database, "DATE", "TIME")
self.assertEquals(database.invoked_method_list[0], ("connect", ""))
self.assertEquals(database.invoked_method_list[1][0], "insertUserSnapshot")
self.assertEquals(database.invoked_method_list[1][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[1][1][1].keys(),
['cpu_time', 'cpu_percent', 'process',
'memory_rss', 'pid', 'memory_percent',
'io_rw_counter', 'insertion_date', 'insertion_time',
'io_cycles_counter', 'cpu_num_threads'])
self.assertEquals(database.invoked_method_list[2], ("commit", ""))
self.assertEquals(database.invoked_method_list[3], ("close", ""))
self.assertEquals(database.invoked_method_list[4], ("connect", ""))
self.assertEquals(database.invoked_method_list[5][0], "inserFolderSnapshot")
self.assertEquals(database.invoked_method_list[5][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[5][1][1].keys(),
['insertion_date', 'disk_usage', 'insertion_time'])
self.assertEquals(database.invoked_method_list[6], ("commit", ""))
self.assertEquals(database.invoked_method_list[7], ("close", ""))
def test_user_save_disk_snapshot_cycle(self):
disk_snapshot_params = {'enable': True, 'time_cycle': 3600, 'testing': True}
user = self.getFakeUser(disk_snapshot_params)
process = psutil.Process(os.getpid())
user.append(snapshot.ProcessSnapshot(process))
database = FakeDatabase2()
user.save(database, "DATE", "TIME")
self.assertEquals(database.invoked_method_list[0], ("connect", ""))
self.assertEquals(database.invoked_method_list[1][0], "insertUserSnapshot")
self.assertEquals(database.invoked_method_list[1][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[1][1][1].keys(),
['cpu_time', 'cpu_percent', 'process',
'memory_rss', 'pid', 'memory_percent',
'io_rw_counter', 'insertion_date', 'insertion_time',
'io_cycles_counter', 'cpu_num_threads'])
self.assertEquals(database.invoked_method_list[2], ("commit", ""))
self.assertEquals(database.invoked_method_list[3], ("close", ""))
self.assertEquals(database.invoked_method_list[4], ("connect", ""))
self.assertEquals(database.invoked_method_list[5][0], "select")
self.assertEquals(database.invoked_method_list[5][1][0], ())
self.assertEquals(database.invoked_method_list[5][1][1].keys(),
['table', 'where', 'limit', 'order', 'columns'])
self.assertEquals(database.invoked_method_list[6][0], "inserFolderSnapshot")
self.assertEquals(database.invoked_method_list[6][1][0], ("fakeuser0",))
self.assertEquals(database.invoked_method_list[6][1][1].keys(),
['insertion_date', 'disk_usage', 'insertion_time'])
self.assertEquals(database.invoked_method_list[7], ("commit", ""))
self.assertEquals(database.invoked_method_list[8], ("close", ""))
def test_computer_entity(self):
computer = entity.Computer(snapshot.ComputerSnapshot())
database = FakeDatabase()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment