Commit 55ed4b48 authored by Lutra Conseil's avatar Lutra Conseil Committed by Rafael Monnerat

[slapos.collect] Refactor and Collect information from Computer and System

Major changes
  Rename pymonitor, add dependency and simplify
  Save all information using SQLite
  Record information from all processes individually
  Implement Snapshot for Computer and Systems information
  General Clean up
parent b5b6390f
......@@ -48,6 +48,7 @@ setup(name=name,
'netifaces', # to fetch information about network devices
'setuptools', # namespaces
'supervisor', # slapgrid uses supervisor to manage processes
'psutil',
'xml_marshaller>=0.9.3', # to unmarshall/marshall python objects to/from
# XML
'zope.interface', # slap library implementes interfaces
......@@ -102,6 +103,7 @@ setup(name=name,
'node software = slapos.cli.slapgrid:SoftwareCommand',
'node instance = slapos.cli.slapgrid:InstanceCommand',
'node boot = slapos.cli.boot:BootCommand',
'node collect = slapos.cli.collect:CollectCommand',
# SlapOS client commands
'console = slapos.cli.console:ConsoleCommand',
'configure local = slapos.cli.configure_local:ConfigureLocalCommand',
......
# -*- coding: utf-8 -*-
import subprocess
from time import sleep
import socket
import glob
import os
from slapos.collect import do_collect
from slapos.cli.command import must_be_root
from slapos.cli.entry import SlapOSApp
from slapos.cli.config import ConfigCommand
class CollectCommand(ConfigCommand):
    """
    Collect system consumption and data and store.
    """

    # Registers the command under the "node" group (``slapos node collect``).
    command_group = 'node'

    def get_parser(self, prog_name):
        # No extra command line options: the parent parser is returned as is.
        return super(CollectCommand, self).get_parser(prog_name)

    @must_be_root
    def take_action(self, args):
        # Load the configuration selected by ``args`` and run the collector.
        do_collect(self.fetch_config(args))
from psutil import process_iter, NoSuchProcess, AccessDenied
from time import time, sleep, strftime
from slapos.collect.db import Database
from slapos.util import mkdir_p
# Local import
from snapshot import ProcessSnapshot, SystemSnapshot, ComputerSnapshot
from entity import get_user_list, Computer
def _get_time():
    """Return the current date and time as a ``[date, time]`` list of strings.

    The date is formatted ``YYYY-MM-DD`` and the time ``HH:MM:SS``. Both come
    from one ``strftime`` call, so they always describe the same instant.
    """
    stamp = strftime("%Y-%m-%d -- %H:%M:%S")
    date_part, time_part = stamp.split(" -- ")
    return [date_part, time_part]
def build_snapshot(proc):
    """Wrap ``proc`` in a ``ProcessSnapshot``.

    Returns ``None`` when building the snapshot raises ``NoSuchProcess``.
    """
    try:
        snapshot = ProcessSnapshot(proc)
    except NoSuchProcess:
        snapshot = None
    return snapshot
def current_state(user_dict):
    """Yield a snapshot for every process owned by a user in ``user_dict``.

    A process is relevant when the name returned by ``process.username()`` is
    a key of ``user_dict``. Each yielded value is the result of
    ``build_snapshot`` and may therefore be ``None``.

    A process whose owner lookup raises ``NoSuchProcess`` is skipped, so one
    vanished process does not abort the whole scan. ``AccessDenied`` is not
    caught here and propagates to the caller.
    """
    for process in process_iter():
        try:
            is_relevant = process.username() in user_dict
        except NoSuchProcess:
            continue
        if is_relevant:
            yield build_snapshot(process)
def do_collect(conf):
    """Run one collection pass and store the results.

    Steps, in order:

    1. Take one snapshot per relevant process and attach it to the matching
       entry of the user dictionary built by ``get_user_list(conf)``.
    2. Save a ``ComputerSnapshot`` and every user's snapshots in the database
       located under ``<instance_root>/var/data-log/``.
    3. Write the CSV reports into that same directory.

    ``conf`` must provide ``get(section, option)``; ``instance_root`` is read
    from its ``slapos`` section. If psutil raises ``AccessDenied`` at any
    point, a message asking for root permission is printed instead of
    propagating the error.
    """
    try:
        collected_date, collected_time = _get_time()
        user_dict = get_user_list(conf)
        for snapshot in current_state(user_dict):
            if snapshot:
                user_dict[snapshot.username].append(snapshot)

        # XXX: the "var/data-log" location should come from the config file
        # instead of being hardcoded.
        log_directory = "%s/var/data-log/" % conf.get("slapos", "instance_root")
        mkdir_p(log_directory)
        database = Database(log_directory)

        computer = Computer(ComputerSnapshot())
        computer.save(database, collected_date, collected_time)
        for user in user_dict.values():
            user.save(database, collected_date, collected_time)

        # Kept as a function-level import, as in the original code.
        from slapos.collect.reporter import RawCSVDumper, SystemCSVReporterDumper
        SystemCSVReporterDumper(database).dump(log_directory)
        RawCSVDumper(database).dump(log_directory)
    except AccessDenied:
        print("You HAVE TO execute this script with root permission.")
This diff is collapsed.
def get_user_list(config):
    """Build the ``{user name: User}`` mapping described by ``config``.

    Reads ``partition_amount``, ``user_base_name`` and ``partition_base_name``
    from the ``slapformat`` section and ``instance_root`` from the ``slapos``
    section. For each index ``n`` in ``range(partition_amount)`` it creates a
    ``User`` named ``<user_base_name><n>`` whose path is
    ``<instance_root>/<partition_base_name><n>``.
    """
    partition_amount = int(config.get("slapformat", "partition_amount"))
    user_prefix = config.get("slapformat", "user_base_name")
    partition_prefix = config.get("slapformat", "partition_base_name")
    instance_root = config.get("slapos", "instance_root")

    user_dict = {}
    for index in range(partition_amount):
        user_name = "%s%s" % (user_prefix, index)
        partition_path = "%s/%s%s" % (instance_root, partition_prefix, index)
        user_dict[user_name] = User(user_name, partition_path)
    return user_dict
class User(object):
    """A named user, its path, and the process snapshots collected for it."""

    def __init__(self, name, path):
        """Store ``name`` and ``path`` as strings; start with no snapshots."""
        self.name = str(name)
        self.path = str(path)
        self.snapshot_list = []

    def append(self, value):
        """Add one snapshot to this user's list."""
        self.snapshot_list.append(value)

    def save(self, database, collected_date, collected_time):
        """Insert every collected snapshot into ``database``.

        Each snapshot must provide ``get(key)``. The connection is opened
        here, committed once all rows are inserted, and always closed, even
        when an insert raises. In that case nothing is committed and the
        exception propagates.
        """
        database.connect()
        try:
            for snapshot_item in self.snapshot_list:
                database.insertUserSnapshot(
                    self.name,
                    pid=snapshot_item.get("pid"),
                    process=snapshot_item.get("process"),
                    cpu_percent=snapshot_item.get("cpu_percent"),
                    cpu_time=snapshot_item.get("cpu_time"),
                    cpu_num_threads=snapshot_item.get("cpu_num_threads"),
                    memory_percent=snapshot_item.get("memory_percent"),
                    memory_rss=snapshot_item.get("memory_rss"),
                    io_rw_counter=snapshot_item.get("io_rw_counter"),
                    io_cycles_counter=snapshot_item.get("io_cycles_counter"),
                    insertion_date=collected_date,
                    insertion_time=collected_time)
            database.commit()
        finally:
            database.close()
class Computer(dict):
    """Persists a computer snapshot and its nested system and disk snapshots."""

    def __init__(self, computer_snapshot):
        """Keep ``computer_snapshot``; it must provide ``get(key)``."""
        self.computer_snapshot = computer_snapshot

    def save(self, database, collected_date, collected_time):
        """Store the computer, system and disk partition rows in ``database``.

        The connection is opened here, committed after the three groups of
        rows are inserted, and always closed. If an insert raises, nothing is
        committed and the exception propagates.
        """
        database.connect()
        try:
            self._save_computer_snapshot(database, collected_date, collected_time)
            self._save_system_snapshot(database, collected_date, collected_time)
            self._save_disk_partition_snapshot(database, collected_date,
                                               collected_time)
            database.commit()
        finally:
            database.close()

    def _save_computer_snapshot(self, database, collected_date, collected_time):
        """Insert the hardware description row."""
        # The (key, value) pairs are flattened to "key=value;key=value".
        partition_list = ";".join(
            "%s=%s" % (x, y)
            for x, y in self.computer_snapshot.get("partition_list"))
        database.insertComputerSnapshot(
            cpu_num_core=self.computer_snapshot.get("cpu_num_core"),
            cpu_frequency=self.computer_snapshot.get("cpu_frequency"),
            cpu_type=self.computer_snapshot.get("cpu_type"),
            memory_size=self.computer_snapshot.get("memory_size"),
            memory_type=self.computer_snapshot.get("memory_type"),
            partition_list=partition_list,
            insertion_date=collected_date,
            insertion_time=collected_time)

    def _save_system_snapshot(self, database, collected_date, collected_time):
        """Insert the row built from the nested ``system_snapshot``."""
        snapshot = self.computer_snapshot.get("system_snapshot")
        database.insertSystemSnapshot(
            loadavg=snapshot.get("load"),
            cpu_percent=snapshot.get("cpu_percent"),
            memory_used=snapshot.get("memory_used"),
            memory_free=snapshot.get("memory_free"),
            net_in_bytes=snapshot.get("net_in_bytes"),
            net_in_errors=snapshot.get("net_in_errors"),
            net_in_dropped=snapshot.get("net_in_dropped"),
            net_out_bytes=snapshot.get("net_out_bytes"),
            net_out_errors=snapshot.get("net_out_errors"),
            net_out_dropped=snapshot.get("net_out_dropped"),
            insertion_date=collected_date,
            insertion_time=collected_time)

    def _save_disk_partition_snapshot(self, database, collected_date,
                                      collected_time):
        """Insert one row per entry of ``disk_snapshot_list``."""
        for disk_partition in self.computer_snapshot.get("disk_snapshot_list"):
            database.insertDiskPartitionSnapshot(
                partition=disk_partition.partition,
                used=disk_partition.disk_size_used,
                free=disk_partition.disk_size_free,
                mountpoint=';'.join(disk_partition.mountpoint_list),
                insertion_date=collected_date,
                insertion_time=collected_time)
#!/usr/bin/env python
from slapos.collect.db import Database
from slapos.util import mkdir_p
import os.path
import json
import csv
from time import strftime
class Dumper(object):
    """Base class of the dumpers: keeps the database handle in ``self.db``."""

    def __init__(self, database):
        # Subclasses reach the database through ``self.db``.
        self.db = database
class SystemReporter(Dumper):
    """Export today's system and disk figures, one output file per series.

    Subclasses must implement ``writeFile(name, folder, collected_entry_list)``
    to choose the output format.
    """

    def dump(self, folder):
        """Write one file per system item and per disk series into ``folder``.

        Data is requested from the database for today's date. The connection
        is always closed, even when writing a file fails.
        """
        _date = strftime("%Y-%m-%d")
        self.db.connect()
        try:
            system_data = self.db.exportSystemAsDict(_date)
            for item, collected_item_list in system_data.items():
                self.writeFile(item, folder, collected_item_list)

            disk_data = self.db.exportDiskAsDict(_date)
            for partition, collected_item_list in disk_data.items():
                # The key is split on "-": the last path component of the
                # first field identifies the partition, the second field
                # names the series.
                # NOTE(review): a device name containing "-" would be split
                # in the wrong place; confirm the key format produced by
                # exportDiskAsDict.
                fields = partition.split("-")
                partition_id = fields[0].split("/")[-1]
                item = "memory_%s" % fields[1]
                # Written exactly once per series.
                self.writeFile("disk_%s_%s" % (item, partition_id), folder,
                               collected_item_list)
        finally:
            self.db.close()
class SystemJSONReporterDumper(SystemReporter):
    """System reporter that writes each series as a JSON file."""

    def writeFile(self, name, folder, collected_entry_list=None):
        """Dump ``collected_entry_list`` to ``<folder>/system_<name>.json``.

        When ``collected_entry_list`` is omitted an empty list is written.
        """
        if collected_entry_list is None:
            collected_entry_list = []
        # ``with`` guarantees the file is closed even if serialisation fails.
        with open(os.path.join(folder, "system_%s.json" % name), "w") as file_io:
            json.dump(collected_entry_list, file_io, sort_keys=True, indent=2)
class SystemCSVReporterDumper(SystemReporter):
    """System reporter that writes each series as a CSV file."""

    def writeFile(self, name, folder, collected_entry_list=None):
        """Dump ``collected_entry_list`` to ``<folder>/system_<name>.csv``.

        The file starts with a ``time,entry`` header row, followed by one row
        per entry built from its ``"time"`` and ``"entry"`` keys. When
        ``collected_entry_list`` is omitted only the header is written.
        """
        if collected_entry_list is None:
            collected_entry_list = []
        # ``with`` guarantees the file is closed even if a row cannot be written.
        with open(os.path.join(folder, "system_%s.csv" % name), "w") as file_io:
            csv_output = csv.writer(file_io)
            csv_output.writerow(["time", "entry"])
            for collected_entry in collected_entry_list:
                csv_output.writerow([collected_entry["time"],
                                     collected_entry["entry"]])
class RawDumper(Dumper):
    """ Dump raw data in a certain format

    Subclasses must implement ``writeFile(name, folder, date_scope, rows)``.
    """

    def dump(self, folder):
        """Export the ``system``, ``user`` and ``disk`` rows of each date scope.

        Date scopes come from ``getDataScopeList`` with today's date ignored.
        For each scope the three tables are written through ``writeFile`` and
        the scope is then marked as reported. The marks are committed once at
        the end, and the connection is always closed.
        """
        date = strftime("%Y-%m-%d")
        table_names = ("system", "user", "disk")
        self.db.connect()
        try:
            for date_scope, _amount in self.db.getDataScopeList(ignore_date=date):
                for table in table_names:
                    self.writeFile(table, folder, date_scope,
                                   self.db.select(table, date_scope))
                self.db.markDayAsReported(date_scope,
                                          table_list=list(table_names))
            self.db.commit()
        finally:
            self.db.close()
class RawCSVDumper(RawDumper):
    """Raw dumper that writes each table as a CSV file."""

    def writeFile(self, name, folder, date_scope, rows):
        """Write ``rows`` to ``<folder>/<date_scope>/dump_<name>.csv``.

        The ``<folder>/<date_scope>`` directory is created first.
        """
        mkdir_p(os.path.join(folder, date_scope))
        target = os.path.join(folder, "%s/dump_%s.csv" % (date_scope, name))
        # ``with`` guarantees the file is closed even if a row cannot be written.
        with open(target, "w") as file_io:
            csv.writer(file_io).writerows(rows)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010, 2011, 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import psutil
import os
class _Snapshot(object):
    """Base class giving snapshots a dict-like ``get`` over their attributes."""

    def get(self, property, default=None):
        """Return the attribute named ``property``, or ``default`` if unset."""
        try:
            return getattr(self, property)
        except AttributeError:
            return default
class ProcessSnapshot(_Snapshot):
    """ Take a snapshot from the running process
    """

    def __init__(self, process=None):
        """Read the counters of ``process``, which must be a ``psutil.Process``.

        The ``None`` default exists only in the signature: it fails the type
        assertion below.
        """
        assert type(process) is psutil.Process
        ui_counter_list = process.get_io_counters()
        self.username = process.username()
        self.pid = process.pid
        # Identifier of the process: "<pid>-<creation time>". It is stored
        # under ``process`` because that is the key ``User.save`` reads, and
        # under ``name`` for code that still uses the older attribute.
        self.process = "%s-%s" % (process.pid, process.create_time())
        self.name = self.process
        # CPU percentage, we will have to get actual absolute value
        # NOTE(review): psutil documents the first non-blocking call as
        # returning a meaningless 0.0 -- confirm this value is usable.
        self.cpu_percent = process.get_cpu_percent(None)
        # CPU Time
        self.cpu_time = sum(process.get_cpu_times())
        # Thread number, might not be really relevant
        self.cpu_num_threads = process.get_num_threads()
        # Memory percentage
        self.memory_percent = process.get_memory_percent()
        # Resident Set Size, virtual memory size is not accounted for
        self.memory_rss = process.get_memory_info()[0]
        # Byte count, Read and write. OSX NOT SUPPORTED
        self.io_rw_counter = ui_counter_list[2] + ui_counter_list[3]
        # Read + write IO cycles
        self.io_cycles_counter = ui_counter_list[0] + ui_counter_list[1]
class SystemSnapshot(_Snapshot):
    """ Take a snapshot from current system usage
    """

    def __init__(self):
        """Read memory, network, CPU and load figures from psutil and os."""
        mem_usage = psutil.phymem_usage()
        net_counters = psutil.net_io_counters()

        # Memory figures.
        self.memory_used = mem_usage.used
        self.memory_free = mem_usage.free
        self.memory_percent = mem_usage.percent

        # NOTE(review): psutil documents the first non-blocking cpu_percent()
        # call as returning a meaningless 0.0 -- confirm this value is usable.
        self.cpu_percent = psutil.cpu_percent()
        # First element of the tuple returned by os.getloadavg().
        self.load = os.getloadavg()[0]

        # Incoming network counters.
        self.net_in_bytes = net_counters.bytes_recv
        self.net_in_errors = net_counters.errin
        self.net_in_dropped = net_counters.dropin

        # Outgoing network counters.
        self.net_out_bytes = net_counters.bytes_sent
        self.net_out_errors = net_counters.errout
        self.net_out_dropped = net_counters.dropout
class DiskPartitionSnapshot(_Snapshot):
    """ Take Snapshot from general disk partitions
    usage
    """

    def __init__(self, partition, mountpoint):
        """Record the usage of ``partition`` as seen at ``mountpoint``.

        ``mountpoint`` is stored as the single element of ``mountpoint_list``.
        """
        self.partition = partition
        self.mountpoint_list = [mountpoint]
        # The global I/O counters were fetched here but never used; that call
        # has been removed.
        disk = psutil.disk_usage(mountpoint)
        self.disk_size_used = disk.used
        self.disk_size_free = disk.free
        self.disk_size_percent = disk.percent
class ComputerSnapshot(_Snapshot):
    """ Take a snapshot from computer informations
    """

    def __init__(self):
        """Collect hardware figures plus nested system and disk snapshots."""
        self.cpu_num_core = psutil.NUM_CPUS
        # Frequency and the two "type" fields are stored as the constant 0.
        self.cpu_frequency = 0
        self.cpu_type = 0
        self.memory_size = psutil.TOTAL_PHYMEM
        self.memory_type = 0

        # A computer snapshot embeds one SystemSnapshot and a list of
        # DiskPartitionSnapshot objects.
        self.system_snapshot = SystemSnapshot()
        # Must exist before _get_physical_disk_info() runs: that method
        # appends to it.
        self.disk_snapshot_list = []
        self.partition_list = self._get_physical_disk_info()

    def _get_physical_disk_info(self):
        """Return ``(device, total size)`` pairs, one per distinct device.

        As a side effect, a ``DiskPartitionSnapshot`` is appended to
        ``self.disk_snapshot_list`` for the first mount point seen for each
        device.
        """
        total_by_device = {}
        for entry in psutil.disk_partitions():
            device = entry.device
            if device in total_by_device:
                continue
            total_by_device[device] = psutil.disk_usage(entry.mountpoint).total
            self.disk_snapshot_list.append(
                DiskPartitionSnapshot(device, entry.mountpoint))
        return list(total_by_device.items())
......@@ -893,7 +893,7 @@ class Slapgrid(object):
try:
computer_partition_id = computer_partition.getId()
#We want to execute all the script in the report folder
# We want to execute all the script in the report folder
instance_path = os.path.join(self.instance_root,
computer_partition.getId())
report_path = os.path.join(instance_path, 'etc', 'report')
......@@ -902,7 +902,7 @@ class Slapgrid(object):
else:
script_list_to_run = []
#We now generate the pseudorandom name for the xml file
# We now generate the pseudorandom name for the xml file
# and we add it in the invocation_list
f = tempfile.NamedTemporaryFile()
name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name))
......@@ -914,13 +914,13 @@ class Slapgrid(object):
invocation_list = []
invocation_list.append(os.path.join(instance_path, 'etc', 'report',
script))
#We add the xml_file name to the invocation_list
# We add the xml_file name to the invocation_list
#f = tempfile.NamedTemporaryFile()
#name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name))
#path_to_slapreport = os.path.join(instance_path, 'var', name_xml)
invocation_list.append(path_to_slapreport)
#Dropping privileges
# Dropping privileges
uid, gid = None, None
stat_info = os.stat(instance_path)
#stat sys call to get statistics informations
......@@ -946,47 +946,51 @@ class Slapgrid(object):
self.logger.exception('Cannot run usage script(s) for %r:' %
computer_partition.getId())
#Now we loop through the different computer partitions to report
# Now we loop through the different computer partitions to report
report_usage_issue_cp_list = []
for computer_partition in computer_partition_list:
try:
filename_delete_list = []
computer_partition_id = computer_partition.getId()
instance_path = os.path.join(self.instance_root, computer_partition_id)
dir_reports = os.path.join(instance_path, 'var', 'xml_report')
#The directory xml_report contain a number of files equal
#to the number of software instance running inside the same partition
if os.path.isdir(dir_reports):
filename_list = os.listdir(dir_reports)
else:
filename_list = []
#self.logger.debug('name List %s' % filename_list)
for filename in filename_list:
file_path = os.path.join(dir_reports, filename)
if os.path.exists(file_path):
usage = open(file_path, 'r').read()
#We check the validity of xml content of each reports
if not self.validateXML(usage, partition_consumption_model):
self.logger.info('WARNING: The XML file %s generated by slapreport is '
'not valid - This report is left as is at %s where you can '
'inspect what went wrong ' % (filename, dir_reports))
# Warn the SlapOS Master that a partition generates corrupted xml
# report
else:
computer_partition_usage = self.slap.registerComputerPartition(
self.computer_id, computer_partition_id)
computer_partition_usage.setUsage(usage)
computer_partition_usage_list.append(computer_partition_usage)
filename_delete_list.append(filename)
dir_report_list = [os.path.join(instance_path, 'var', 'xml_report'),
os.path.join(self.instance_root, 'var', 'xml_report',
computer_partition_id)]
for dir_reports in dir_report_list:
# The directory xml_report contain a number of files equal
# to the number of software instance running inside the same partition
if os.path.isdir(dir_reports):
filename_list = os.listdir(dir_reports)
else:
self.logger.debug('Usage report %r not found, ignored' % file_path)
filename_list = []
# self.logger.debug('name List %s' % filename_list)
for filename in filename_list:
file_path = os.path.join(dir_reports, filename)
if os.path.exists(file_path):
usage = open(file_path, 'r').read()
# We check the validity of xml content of each reports
if not self.validateXML(usage, partition_consumption_model):
self.logger.info('WARNING: The XML file %s generated by slapreport is '
'not valid - This report is left as is at %s where you can '
'inspect what went wrong ' % (filename, dir_reports))
# Warn the SlapOS Master that a partition generates corrupted xml
# report
else:
computer_partition_usage = self.slap.registerComputerPartition(
self.computer_id, computer_partition_id)
computer_partition_usage.setUsage(usage)
computer_partition_usage_list.append(computer_partition_usage)
filename_delete_list.append(filename)
else:
self.logger.debug('Usage report %r not found, ignored' % file_path)
#After sending the aggregated file we remove all the valid xml reports
for filename in filename_delete_list:
os.remove(os.path.join(dir_reports, filename))
# After sending the aggregated file we remove all the valid xml reports
for filename in filename_delete_list:
os.remove(os.path.join(dir_reports, filename))
# Whatever happens, don't stop processing other instances
except Exception:
......@@ -997,15 +1001,15 @@ class Slapgrid(object):
self.logger.info('computer_partition_usage_list: %s - %s' %
(computer_partition_usage.usage, computer_partition_usage.getId()))
#If there is, at least, one report
# If there is, at least, one report
if computer_partition_usage_list != []:
try:
#We generate the final XML report with asXML method
# We generate the final XML report with asXML method
computer_consumption = self.asXML(computer_partition_usage_list)
self.logger.info('Final xml report: %s' % computer_consumption)
#We test the XML report before sending it
# We test the XML report before sending it
if self.validateXML(computer_consumption, computer_consumption_model):
self.logger.info('XML file generated by asXML is valid')
slap_computer_usage.reportUsage(computer_consumption)
......
from psutil import *
from psutil._error import NoSuchProcess, AccessDenied
from time import time, sleep
from datetime import datetime
import os
import ConfigParser
# Local import
from snapshot import Snapshot
from user import User
# XXX: flagged by the original author as bad practice -- the configuration
# path is a module-level global that is read once, at import time.
# ***************** Config *****************
# Path of the SlapOS configuration file: taken from the SLAPOS_CONFIGURATION
# environment variable, falling back to /etc/opt/slapos/slapos.cfg.
GLOBAL_SLAPOS_CONFIGURATION = os.environ.get(
'SLAPOS_CONFIGURATION',
'/etc/opt/slapos/slapos.cfg'
)
# ******************************************
# XXX : should rebuild this to make it more explicit
def build_user_list():
    """Build the ``{user name: User}`` mapping from the SlapOS configuration.

    The file at ``GLOBAL_SLAPOS_CONFIGURATION`` is parsed on every call. For
    each index ``n`` in ``range(partition_amount)`` a ``User`` named
    ``<user_base_name><n>`` is created with the path
    ``<instance_root>/<partition_base_name><n>``.
    """
    parser = ConfigParser.SafeConfigParser()
    parser.read(GLOBAL_SLAPOS_CONFIGURATION)
    partition_amount = int(parser.get("slapformat", "partition_amount"))
    user_prefix = parser.get("slapformat", "user_base_name")
    partition_prefix = parser.get("slapformat", "partition_base_name")
    instance_root = parser.get("slapos", "instance_root")

    users = {}
    for index in range(partition_amount):
        user_name = "%s%s" % (user_prefix, index)
        partition_path = "%s/%s%s" % (instance_root, partition_prefix, index)
        users[user_name] = User(user_name, partition_path)
    return users
def build_snapshot(proc):
    """Build a ``Snapshot`` of ``proc``, or return ``None`` if it is gone.

    ``proc`` must be a ``Process``. The I/O counters are read once, so the
    byte and cycle figures come from the same reading. ``None`` is returned
    when psutil raises ``NoSuchProcess`` while the values are being read.
    """
    assert type(proc) is Process
    try:
        io_counters = proc.get_io_counters()
        return Snapshot(
            proc.username,
            # CPU percentage, we will have to get actual absolute value
            cpu=proc.get_cpu_percent(None),
            # Thread number, might not be really relevant
            cpu_io=proc.get_num_threads(),
            # Resident Set Size, virtual memory size is not accounted for
            ram=proc.get_memory_info()[0],
            # Byte count, Read and write. OSX NOT SUPPORTED
            hd=io_counters[2] + io_counters[3],
            # Read + write IO cycles
            hd_io=io_counters[0] + io_counters[1],
        )
    except NoSuchProcess:
        return None
def current_state():
    """Yield ``build_snapshot(process)`` for every relevant process.

    A process is relevant when its ``username`` is a key of the mapping
    returned by ``build_user_list()``. The generator sleeps half a second
    before every process whose index is a multiple of one fifth of the list
    length; there is no pause when that fifth rounds down to zero.
    """
    known_users = build_user_list()
    relevant = [proc for proc in process_iter() if proc.username in known_users]
    pause_every = len(relevant) // 5
    for position, proc in enumerate(relevant):
        if pause_every > 0 and position % pause_every == 0:
            sleep(.5)
        yield build_snapshot(proc)
def main():
    """Poll the processes forever and dump a summary per user after each poll.

    Each pass rebuilds the user mapping, stamps the pass with ``time()`` and
    merges every snapshot into the matching user under that key. Snapshots
    landing on the same key are combined with ``+=``. After a complete pass
    every user writes its summary to ``<path>/var/xml_report/consumption.xml``.

    ``NoSuchProcess`` abandons the current pass and starts a new one.
    ``KeyboardInterrupt`` and ``SystemExit`` stop the loop. ``AccessDenied``
    ends the function with a message asking for root permission.
    """
    try:
        while True:
            users = build_user_list()
            key = time()
            try:
                for snapshot in current_state():
                    if not snapshot:
                        continue
                    user = users[snapshot.username]
                    if key in user:
                        user[key] += snapshot
                    else:
                        user[key] = snapshot
            except NoSuchProcess:
                continue
            except (KeyboardInterrupt, SystemExit):
                break

            # XXX: we should use a value from the config file and not a
            # hardcoded one
            for user in users.values():
                user.dumpSummary(user.path + '/var/xml_report/consumption.xml')
    except AccessDenied:
        print("You HAVE TO execute this script with root permission.")


if __name__ == '__main__':
    main()
#!/usr/bin/env python
import os
from datetime import datetime
from time import sleep
import gzip
import sys
class Reporter(object):
    """Aggregate report files, send them, and archive them once sent."""

    # Seconds to wait before retrying after a failed send.
    # XXX : set a more appropriate timer (like 1h or something)
    RETRY_DELAY = 30

    def run(self, *args):
        """Aggregate, send and archive; retry until the send succeeds.

        ``args`` is forwarded unchanged to ``_aggregate``, ``_archive`` and
        ``_fallback``; callers pass a single list of file paths. Retrying is
        done with a loop, so a long outage cannot exhaust the call stack.
        """
        while True:
            payload = self._aggregate(*args)
            if self._send(payload):
                self._archive(*args)
                return
            self._fallback(*args)

    def _aggregate(self, paths):
        """Return the concatenated content of every file in ``paths``.

        Each path is printed before it is read. An empty or ``None`` value
        for ``paths`` yields an empty string.
        """
        chunks = []
        if paths:
            for path in paths:
                print(path)
                with open(path, 'r') as f:
                    chunks.append(f.read())
        return "".join(chunks)

    # XXX : implement
    def _send(self, json_str):
        """Send the aggregated report; return True on success.

        Not implemented yet: always reports failure.
        """
        return False

    def _archive(self, paths, archive_dir=None):
        """Gzip every file of ``paths`` and remove the original.

        The archive is named ``<basename>.<YYYYmmddHHMMSS>.gz`` and is written
        to ``archive_dir``, or next to the source file when ``archive_dir`` is
        not given.
        """
        for path in paths:
            if archive_dir is None:
                target_dir = os.path.dirname(path)
            else:
                target_dir = archive_dir
            stamp = datetime.now().strftime('%Y%m%d%H%M%S')
            target = os.path.join(
                target_dir, '%s.%s.gz' % (os.path.basename(path), stamp))
            # Binary mode on both sides: the bytes are copied untouched and
            # both handles are closed before the source is removed.
            with open(path, 'rb') as source:
                with gzip.open(target, 'wb') as zipped:
                    zipped.writelines(source)
            os.remove(path)

    def _fallback(self, *args):
        """Wait ``RETRY_DELAY`` seconds before ``run`` makes a new attempt."""
        sleep(self.RETRY_DELAY)
def check(args):
    """Exit with status -1 unless ``args`` is a non-empty list of files.

    A message is printed before exiting, either for a missing argument list
    or for the first entry that is not an existing regular file. Returns
    ``None`` when every entry is valid.
    """
    if not args:
        print('missing argument : filename list')
        sys.exit(-1)
    first_invalid = next(
        (arg for arg in args if not os.path.isfile(arg)), None)
    if first_invalid is not None:
        print(first_invalid + ' is not a valid path')
        sys.exit(-1)
if __name__ == '__main__':
    # The command line is expected to be a list of report file paths.
    cli_paths = sys.argv[1:]
    check(cli_paths)
    Reporter().run(cli_paths)