Commit 31ff1401 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

slapos/collect: enable disk usage

See merge request !311
parents 7244f1f0 2da17657
......@@ -66,6 +66,12 @@ download-binary-dir-url = http://shadir.nxdcdn.com
#shadir-cert-file = /etc/opt/slapos/shacache/shacache.cert
#shadir-key-file = /etc/opt/slapos/shacache/shacache.key
[collect]
enable = True
time_cycle = 86400
pid_folder = /srv/slapgrid/var/run
use_quota = False
# List of signatures of uploaders we trust:
# Sebastien Robin
# Kazuhiko Shiozaki
......
......@@ -31,6 +31,8 @@ from slapos.collect import do_collect
from slapos.cli.command import must_be_root
from slapos.cli.config import ConfigCommand
import logging
class CollectCommand(ConfigCommand):
"""
Collect system consumption and data and store.
......@@ -44,4 +46,5 @@ class CollectCommand(ConfigCommand):
@must_be_root
def take_action(self, args):
configp = self.fetch_config(args)
do_collect(configp)
logger = self.app.log
do_collect(logger, configp)
......@@ -71,7 +71,7 @@ def current_state(user_dict):
for i, process in enumerate(process_list):
yield build_snapshot(process)
def do_collect(conf):
def do_collect(logger, conf):
"""
Main function
The idea here is to poll system every so many seconds
......@@ -80,6 +80,7 @@ def do_collect(conf):
Each user object is a dict, indexed on timestamp. We add every snapshot
matching the user so that we get informations for each users
"""
logger.info('Collecting data...')
try:
collected_date, collected_time = _get_time()
user_dict = get_user_list(conf)
......@@ -94,16 +95,20 @@ def do_collect(conf):
if conf.has_option("slapos", "collect_cache"):
days_to_preserve = conf.getint("slapos", "collect_cache")
log_directory = "%s/var/data-log" % conf.get("slapos", "instance_root")
logger.debug("Log directory: %s", log_directory)
mkdir_p(log_directory, 0o755)
consumption_report_directory = "%s/var/consumption-report" % \
conf.get("slapos", "instance_root")
mkdir_p(consumption_report_directory, 0o755)
logger.debug("Consumption report directory: %s", consumption_report_directory)
xml_report_directory = "%s/var/xml_report/%s" % \
(conf.get("slapos", "instance_root"),
conf.get("slapos", "computer_id"))
mkdir_p(xml_report_directory, 0o755)
logger.debug("XML report directory: %s", xml_report_directory)
if stat.S_IMODE(os.stat(log_directory).st_mode) != 0o755:
os.chmod(log_directory, 0o755)
......@@ -113,11 +118,12 @@ def do_collect(conf):
if conf.has_option("slapformat", "computer_model_id"):
computer_model_id = conf.get("slapformat",
"computer_model_id")
else:
computer_model_id = "no_model"
logger.debug("Computer model id: %s", computer_model_id)
uptime = _get_uptime()
if conf.has_option("slapformat", "heating_sensor_id"):
heating_sensor_id = conf.get("slapformat",
"heating_sensor_id")
......@@ -130,18 +136,30 @@ def do_collect(conf):
else:
heating_sensor_id = "no_sensor"
test_heating = False
logger.debug("Heating sensor id: %s", heating_sensor_id)
logger.info("Inserting computer information into database...")
computer = Computer(ComputerSnapshot(model_id=computer_model_id,
sensor_id = heating_sensor_id,
test_heating=test_heating))
# Insert computer's data
computer.save(database, collected_date, collected_time)
logger.info("Done.")
logger.info("Inserting user information into database...")
# Insert TABLE user + TABLE folder
for user in user_dict.values():
user.save(database, collected_date, collected_time)
logger.info("Done.")
logger.info("Writing csv, XML and JSON files...")
# Write a csv with dumped data in the log_directory
SystemCSVReporterDumper(database).dump(log_directory)
RawCSVDumper(database).dump(log_directory)
# Write xml files
consumption_report = ConsumptionReport(
computer_id=conf.get("slapos", "computer_id"),
user_list=user_dict,
......@@ -156,16 +174,23 @@ def do_collect(conf):
if report_file is not None:
shutil.copy(report_file, xml_report_directory)
# write json
partition_report = PartitionReport(
database=database,
user_list=user_dict)
partition_report.buildJSONMonitorReport()
# Put dumped csv in a current_date.tar.gz
compressLogFolder(log_directory)
logger.info("Done.")
# Drop older entries already reported
database.garbageCollect(days_to_preserve)
logger.info("Finished collecting.")
logger.info('=' * 80)
except AccessDenied:
print("You HAVE TO execute this script with root permission.")
logger.error("You HAVE TO execute this script with root permission.")
......@@ -28,9 +28,12 @@
##############################################################################
import os
import logging
from datetime import datetime, timedelta
from slapos.collect.snapshot import FolderSizeSnapshot
logger = logging.getLogger(__name__)
def get_user_list(config):
nb_user = int(config.get("slapformat", "partition_amount"))
name_prefix = config.get("slapformat", "user_base_name")
......@@ -38,14 +41,19 @@ def get_user_list(config):
instance_root = config.get("slapos", "instance_root")
# By default, enable disk snapshot,
# and set time_cycle to 24hours after the first disk snapshot run
disk_snapshot_params = {'enable': False, 'time_cycle': 86400}
pid_folder_tmp = instance_root + "/var/run"
disk_snapshot_params = {'enable': True,
'time_cycle': 86400,
'pid_folder': pid_folder_tmp,
'use_quota': False}
if config.has_section('collect'):
collect_section = dict(config.items("collect"))
disk_snapshot_params = dict(
enable=collect_section.get("report_disk_usage", "False").lower() in ('true', 'on', '1'),
pid_folder=collect_section.get("disk_snapshot_process_pid_foder", None),
time_cycle=int(collect_section.get("disk_snapshot_time_cycle", 86400)),
use_quota=collect_section.get("disk_snapshot_use_quota", "True").lower() in ('true', 'on', '1'),
enable = collect_section.get("report_disk_usage", "True").lower() in ('true', 'on', '1'),
pid_folder = collect_section.get("disk_snapshot_process_pid_foder", pid_folder_tmp),
time_cycle = int(collect_section.get("disk_snapshot_time_cycle", 86400)),
use_quota = collect_section.get("disk_snapshot_use_quota", "False").lower() in ('true', 'on', '1'),
)
user_dict = {name: User(name, path, disk_snapshot_params)
for name, path in [
......@@ -72,6 +80,7 @@ class User(object):
def _insertDiskSnapShot(self, database, collected_date, collected_time):
if self.disk_snapshot_params['enable']:
time_cycle = self.disk_snapshot_params.get('time_cycle', 0)
database.connect()
if time_cycle:
for date_time in database.select(table="folder", columns="date, time",
......@@ -79,18 +88,24 @@ class User(object):
where="partition='%s'" % self.name):
latest_date = datetime.strptime('%s %s' % date_time,
"%Y-%m-%d %H:%M:%S")
if (datetime.utcnow() - latest_date).seconds < time_cycle:
time_spent = (datetime.utcnow() - latest_date).total_seconds()
if time_spent < time_cycle:
# wait the time cycle
logger.info("Time cycle is not over (%s seconds remaining). No computation of "
"disk usage on the partition %s.", time_cycle - time_spent, self.name)
return
break
pid_file = self.disk_snapshot_params.get('pid_folder', None)
if pid_file is not None:
pid_file = os.path.join(pid_file, '%s_disk_size.pid' % self.name)
disk_snapshot = FolderSizeSnapshot(self.path, pid_file)
disk_snapshot.update_folder_size()
# Skeep insert empty partition: size <= 1Mb
if disk_snapshot.disk_usage <= 1024.0 and \
not self.disk_snapshot_params.get('testing', False):
logger.debug("Disk usage of the partition %s: %s. "
"Ignoring insertion in the dataset.", self.name, disk_snapshot.disk_usage)
return
database.inserFolderSnapshot(self.name,
disk_usage=disk_snapshot.get("disk_usage"),
......@@ -102,7 +117,6 @@ class User(object):
def save(self, database, collected_date, collected_time):
""" Insert collected data on user collector """
database.connect()
snapshot_counter = len(self.snapshot_list)
for snapshot_item in self.snapshot_list:
snapshot_item.update_cpu_percent()
database.insertUserSnapshot(self.name,
......
......@@ -31,6 +31,7 @@ from __future__ import print_function
import psutil
import os
import subprocess
import logging
from .temperature import collectComputerTemperature, launchTemperatureTest
from .temperature.heating import get_contribution_ratio
......@@ -39,6 +40,8 @@ import six
MEASURE_INTERVAL = 5
logger = logging.getLogger(__name__)
class _Snapshot(object):
def get(self, property, default=None):
return getattr(self, property, default)
......@@ -97,18 +100,21 @@ class FolderSizeSnapshot(_Snapshot):
except OSError:
pass
else:
logger.warning("Process %s still in progress. Try later.", pid)
return
self.disk_usage = self._getSize(self.folder_path)
# If extra disk added to partition
data_dir = os.path.join(self.folder_path, 'DATA')
if os.path.exists(data_dir):
logger.debug("Extra disk added to the partition")
for filename in os.listdir(data_dir):
extra_path = os.path.join(data_dir, filename)
if os.path.islink(extra_path) and os.path.isdir('%s/' % extra_path):
self.disk_usage += self._getSize('%s/' % extra_path)
def _getSize(self, file_path):
size = 0
command = 'du -s %s' % file_path
process = subprocess.Popen(command, stdout=subprocess.PIPE,
......@@ -119,6 +125,7 @@ class FolderSizeSnapshot(_Snapshot):
result = process.communicate()[0]
if process.returncode == 0:
size, _ = result.strip().split()
return float(size)
class SystemSnapshot(_Snapshot):
......@@ -161,8 +168,7 @@ class HeatingContributionSnapshot(_Snapshot):
result = launchTemperatureTest(sensor_id)
if result is None:
print("Impossible to test sensor: %s " % sensor_id)
logger.warning("Impossible to test sensor: %s", sensor_id)
initial_temperature, final_temperature, duration = result
......
......@@ -26,6 +26,7 @@
##############################################################################
from slapos.util import mkdir_p
from datetime import datetime, timedelta
import csv
import six
import mock
......@@ -40,6 +41,7 @@ import tempfile
import slapos.slap
import psutil
import sqlite3
import subprocess
from time import strftime
from slapos.collect import entity, snapshot, db, reporter
from slapos.cli.entry import SlapOSApp
......@@ -495,6 +497,11 @@ class TestCollectSnapshot(unittest.TestCase):
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def getFakeUser(self, disk_snapshot_params={}):
os.mkdir("%s/fakeuser0" % self.instance_root)
return entity.User("fakeuser0",
"%s/fakeuser0" % self.instance_root, disk_snapshot_params )
def test_process_snapshot(self):
process = psutil.Process(os.getpid())
process_snapshot = snapshot.ProcessSnapshot(process)
......@@ -535,6 +542,61 @@ class TestCollectSnapshot(unittest.TestCase):
disk_snapshot.update_folder_size()
self.assertNotEqual(disk_snapshot.disk_usage, 0)
def test_process_in_progress_disk_usage(self):
pid_file = os.path.join(self.instance_root, 'sleep.pid')
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file)
command = 'sleep 1h'
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
with open(pid_file, 'w') as fpid:
pid = fpid.write(str(process.pid))
self.assertTrue(os.path.isfile(pid_file))
self.assertEqual(disk_snapshot.update_folder_size(), None)
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file,
use_quota=True)
self.assertEqual(disk_snapshot.update_folder_size(), None)
process.terminate()
def test_time_cycle(self):
disk_snapshot_params = {'enable': True, 'time_cycle': 3600, 'testing': True}
user = self.getFakeUser(disk_snapshot_params)
database = db.Database(self.instance_root, create=True)
date = datetime.utcnow().date()
time = datetime.utcnow().time().strftime("%H:%M:%S")
time_earlier = (datetime.utcnow() - \
timedelta(hours=3)).time().strftime("%H:%M:%S")
database.connect()
database.inserFolderSnapshot('fakeuser0', '1.0', date, time_earlier)
database.commit()
database.close()
# check that _insertDiskSnapShot called update_folder_size
with mock.patch('slapos.collect.snapshot.FolderSizeSnapshot.update_folder_size'
) as update_folder_size_call:
user._insertDiskSnapShot(database, date, time)
update_folder_size_call.assert_called_once()
time_earlier = (datetime.utcnow() - \
timedelta(minutes=10)).time().strftime("%H:%M:%S")
database.connect()
database.inserFolderSnapshot('fakeuser0', '1.0', date, time_earlier)
database.commit()
database.close()
# check that _insertDiskSnapShot stop before calling update_folder_size
with mock.patch('slapos.collect.snapshot.FolderSizeSnapshot.update_folder_size'
) as update_folder_size_call:
user._insertDiskSnapShot(database, date, time)
update_folder_size_call.assert_not_called()
def test_process_snapshot_broken_process(self):
self.assertRaises(AssertionError,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment