Commit 1c7cc4f9 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos.package was reimplemented using slapcache and ansible playbooks

parent e731fcd1
Changes
========
0.2.1.1 (2014-07-15)
------------------
* Minor fixes [Rafael Monnerat]
0.2.1 (2014-07-15)
------------------
* Fix test inconsistencies [Rafael Monnerat]
* Implement download of repositories keys [Rafael Monnerat]
0.2.0 (2014-06-15)
------------------
* Included a lot of tests [Rafael Monnerat]
* Minor fixed on general code [Rafael Monnerat]
* Include limits promise [Rafael Monnerat]
0.1.2 (2014-06-05)
------------------
* Fix slapos.libnetworkcache usage [Rafael Monnerat]
0.1.1 (2014-02-28)
------------------
* Included slappkg-conf [Rafael Monnerat]
* Fixed README.txt syntax [Rafael Monnerat]
0.1 (2014-02-28)
----------------
* Initial Stable Release [Rafael Monnerat]
0.0.1.4 (2014-02-24)
---------------------
* Initial Release of Basic Functions for Debian/Ubuntu and OpenSuse. [Rafael Monnerat]
include CHANGES.txt
include slapos/package/template/*
include slapos/package/promise/template/*
slapos.package
***************
SlapOS Package is a simple tool which aims on keep a packages updates on a Linux Distribution. The SlapOS Package can support multi distributions and use a simple signature file for take decision to upgrade or not the computer.
Basic Commands
===============
* slappkg-update: Perform the update, if requested.
* slappkg-discover: Prints the system signature, used to match with signature-list to decide which section to use.
* slappkg-upload-key: Uploads the signature configuration.
* slappkg-conf: Creates initial update.cfg and cron entry.
Basic Usage
============
.. code:: bash
# Generates initial configuration
slappkg-conf --slapos-configuration=update.cfg
# Runs update
slappkg-update --slapos-configuration=update.cfg
Upgrade Signature File
=======================
The signature file is composed by at least 2 sections:
System Section ([system]) where is defines reboot and upgrade expected dates. If
server was upgraded before the dates present there, the upgrade will be trigger
for packages (This only affects core promise).
Example:
.. code:: text
[system]
reboot = 2011-10-10
upgrade = 2014-02-20
Distribution sections can have any other name choses by the user and it should
contains the follow entries (always use new line for multiple values):
* repository-list: define a list of repository entries, defined by (name = value).
Special minor notations explaned futher.
* filter-package-list: list of package names that are going to be keep installed and
updated.
* filter-promise-list: list of promises that are enabled for this distribution. The user
can decide which promises are going to be checked on every run. If this
section is not present, all promises available are going to be checked.
* signature-list: defines which systems the promises are applicable on. The signature for
every system can be found by slappkg-discover command. If None signature
matches, the system will not be upgraded.
Example:
.. code:: text
[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
update = http://ftp.fr.debian.org/debian/ wheezy-updates main
update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi/Debian_7.0 ./
re6stnet = http://git.erp5.org/dist/deb ./
filter-package-list =
ntp
slapos.node
re6stnet
filter-promise-list =
core
hostname
signature-list =
debian+++jessie/sid+++
Configuration Examples
========================
* Example of update.cfg:
.. code:: text
[slapupdate]
# Change this key for customise your upgrade.
upgrade_key = 'slapos-generic-upgrade-key'
[networkcache]
download-binary-cache-url = http://www.shacache.org/shacache
download-cache-url = https://www.shacache.org/shacache
download-binary-dir-url = http://www.shacache.org/shadir
# It is important to use only trustfull keys.
signature-certificate-list =
-----BEGIN CERTIFICATE-----
MIIB8DCCAVmgAwIBAgIJAPFf61p8y809MA0GCSqGSIb3DQEBBQUAMBAxDjAMBgNV
BAMMBUNPTVAtMCAXDTE0MDIxNzE2NDgxN1oYDzIxMTQwMTI0MTY0ODE3WjAQMQ4w
DAYDVQQDDAVDT01QLTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsiqCyuv1
HO9FmtwnMbEa1/u8Dn7T0k7hVKYXVQYof+59Ltbb3cA3nLjFSJDr/wQT6N89MccS
PneRzkWqZKL06Kmj+N+XJfRyVaTz1qQtNzjdbYkO6RgQq+fvq2CO0+PSnL6NttLU
/a9nQMcVm7wZ8kmY+AG5LbVo8lmxDD16Wq0CAwEAAaNQME4wHQYDVR0OBBYEFEVi
YyWHF3W7/O4NaTjn4lElLpp7MB8GA1UdIwQYMBaAFEViYyWHF3W7/O4NaTjn4lEl
Lpp7MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAgIPGoxhUa16AgjZx
Jr1kUrs8Fg3ig8eRFQlBSLYfANIUxcQ2ScFAkmsvwXY3Md7uaSvMJsEl2jcjdmdi
eSreNkx85j9GtMLY/2cv0kF4yAQNRtibtDkbg6fRNkmUopDosJNVf79l1GKX8JFL
zZBOFdOaLYY/6dLRwiTUKHU6su8=
-----END CERTIFICATE-----
* Example of upgrade signature:
.. code:: text
[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
update = http://ftp.fr.debian.org/debian/ wheezy-updates main
update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi/Debian_7.0 ./
re6stnet = http://git.erp5.org/dist/deb ./
filter-package-list =
ntp
slapos.node
re6stnet
filter-promise-list =
core
hostname
signature-list =
debian+++jessie/sid+++
[opensuse-legacy]
repository-list =
suse = http://download.opensuse.org/distribution/12.1/repo/oss/
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi/openSUSE_12.1
re6st = http://git.erp5.org/dist/rpm
filter-package-list =
ntp
slapos.node
re6stnet
signature-list =
opensuse+++12.1+++x86_64
[system]
reboot = 2011-10-10
upgrade = 2014-02-20
from setuptools import setup, find_packages
version = '0.2.1.1'
name = 'slapos.package'
long_description = open("README.txt").read() + "\n" + \
open("CHANGES.txt").read() + "\n"
setup(name=name,
version=version,
description="SlapOS Package Utils",
long_description=long_description,
classifiers=[
"Programming Language :: Python",
],
keywords='slapos package update',
license='GPLv3',
url='http://www.slapos.org',
author='VIFIB',
namespace_packages=['slapos'],
packages=find_packages(),
include_package_data=True,
install_requires=[
'slapos.libnetworkcache>=0.14.1',
'iniparse',
],
zip_safe=False,
entry_points={
'console_scripts': [
# Those entry points are development version and
# self updatable API
'slappkg-update-raw = slapos.package.update:do_update',
'slappkg-discover = slapos.package.distribution:do_discover',
'slappkg-upload-key = slapos.package.upload_key:main',
'slappkg-conf = slapos.package.conf:do_conf',
'slappkg-update = slapos.package.autoupdate:do_update',
],
# Not supported yet
#'slapos.cli': [
# 'package upload-key = slapos.package.upload_key:main'
# ]
},
test_suite="slapos.package.test",
)
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
Self update the egg every run. It keeps the upgrade system
always upgraded.
"""
import os
import subprocess
import sys
def do_update():
_run_command('slappkg-update-raw')
def _run_command(command):
if '--self-update' in sys.argv:
sys.argv.remove('--self-update')
subprocess.call(['easy_install', '-U',
"-f", "http://www.nexedi.org/static/packages/source/",
"--allow-hosts", "http://www.nexedi.org/static/packages/source/",
'slapos.package'])
args = [
os.path.join(os.path.dirname(sys.argv[0]), command)
] + sys.argv[1:]
subprocess.call(args)
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import subprocess
import logging
import ConfigParser
# slapos.package imports
from distribution import PackageManager
from signature import Signature
class SlapError(Exception):
"""
Slap error
"""
def __init__(self, message):
self.msg = message
class UsageError(SlapError):
pass
class ExecError(SlapError):
pass
# Class containing all parameters needed for configuration
class Config:
def __init__(self, option_dict=None):
if option_dict is not None:
# Set options parameters
for option, value in option_dict.__dict__.items():
setattr(self, option, value)
class BasePromise(PackageManager):
systemctl_path_list = ["/bin/systemctl",
"/usr/bin/systemctl"]
def __init__(self, config_dict=None):
self.config = Config(config_dict)
self.logger = logging.getLogger('')
self.logger.setLevel(logging.DEBUG)
# add ch to logger
#self.logger.addHandler(ch)
self.signature = None
def getSignature(self):
""" Return signature loaded from signature file """
# Get configuration
if self.signature is None:
self.signature = Signature(self.config)
self.signature.load()
return self.signature
def getSlapOSConfigurationDict(self, section="slapos"):
""" Return a dictionary with the slapos.cfg configuration """
configuration_info = ConfigParser.RawConfigParser()
configuration_info.read(self.config.slapos_configuration)
return dict(configuration_info.items(section))
def isApplicable(self):
""" Define if the promise is applicable checking the promise list """
upgrade_goal = self.getPromiseSectionDict()
if upgrade_goal is None:
return False
if upgrade_goal.get("filter-promise-list") is None:
# Run all if no filter is provided
return True
module = self.__module__.split(".")[-1]
return module in upgrade_goal.get("filter-promise-list")
def getPromiseSectionDict(self):
""" Get the section which matches with the system """
signature = self.getSignature()
configuration_dict = signature.get_signature_dict()
for entry in configuration_dict:
signature_list = configuration_dict[entry].get("signature-list")
if self.matchSignatureList(signature_list):
return configuration_dict[entry]
def log(self, message):
""" For now only prints, but it is usefull for test purpose """
self.logger.info(message)
def _isSystemd(self):
""" Dectect if Systemd is used """
for systemctl_path in self.systemctl_path_list:
if os.path.exists(systemctl_path):
return True
return False
def _service(self, name, action, stdout=None, stderr=None, dry_run=False):
"""
Wrapper invokation of service or systemctl by identifying what it is available.
"""
if self._isSystemd():
self._call(['systemctl', action, name], stdout=stdout, stderr=stderr, dry_run=dry_run)
else:
self._call(['service', name, action], stdout=stdout, stderr=stderr, dry_run=dry_run)
def _call(self, cmd_args, stdout=None, stderr=None, dry_run=False):
"""
Wrapper for subprocess.call() which'll secure the usage of external program's.
Args:
cmd_args: list of strings representing the command and all it's needed args
stdout/stderr: only precise PIPE (from subprocess) if you don't want the
command to create output on the regular stream
"""
self.log("Calling: %s" % ' '.join(cmd_args))
if not dry_run:
p = subprocess.Popen(cmd_args, stdout=stdout, stderr=stderr)
output, err = p.communicate()
return output, err
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import datetime
import logging
from optparse import OptionParser, Option
import sys
from base_promise import Config
import os
class Parser(OptionParser):
"""
Parse all arguments.
"""
def __init__(self, usage=None, version=None):
"""
Initialize all options possibles.
"""
OptionParser.__init__(self, usage=usage, version=version,
option_list=[
Option("--slapos-configuration",
default='/etc/opt/update.cfg',
help="Path to slapos configuration file"),
Option("--key",
default="slapos-generic-key",
help="Upgrade Key for configuration file"),
Option("--slapos-location",
default="/opt/slapos",
help="SlapOS binaries are location"),
Option("--cron-file",
default="/etc/cron.d/slappkg-update",
help="Cron configuration file path"),
])
def check_args(self):
"""
Check arguments
"""
(options, args) = self.parse_args()
return options
def get_template(name):
fd = open('/'.join(__file__.split('/')[:-1]) + '/template/%s' % name, "r")
try:
content = fd.read()
finally:
fd.close()
return content
import os, errno
def mkdir_p(path):
if not os.path.exists(path):
os.makedirs(path)
def create(path, text=None, mode=0666):
fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, mode)
try:
os.write(fd, text)
finally:
os.close(fd)
def do_conf():
"""Generate Default Configuration file """
usage = "usage: %s [options] " % sys.argv[0]
run_config(Config(Parser(usage=usage).check_args()))
sys.exit()
def run_config(config):
if os.path.exists(config.slapos_configuration):
raise ValueError("%s already exists!" % config.slapos_configuration)
mkdir_p('/'.join(config.slapos_configuration.split('/')[:-1]))
configuration_content = get_template("update.cfg.in")
create(path=config.slapos_configuration,
text=configuration_content % {"upgrade_key": config.key})
configuration_content = get_template("update.cron.in")
create(path=config.cron_file,
text=configuration_content % {
"configuration_path": config.slapos_configuration,
"slapos_location": config.slapos_location
})
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import platform
import urllib
import glob
import re
import os
import subprocess
_distributor_id_file_re = re.compile("(?:DISTRIB_ID\s*=)\s*(.*)", re.I)
_release_file_re = re.compile("(?:DISTRIB_RELEASE\s*=)\s*(.*)", re.I)
_codename_file_re = re.compile("(?:DISTRIB_CODENAME\s*=)\s*(.*)", re.I)
class UnsupportedOSException(Exception):
pass
def patched_linux_distribution(distname='', version='', id='',
supported_dists=platform._supported_dists,
full_distribution_name=1):
# check for the Debian/Ubuntu /etc/lsb-release file first, needed so
# that the distribution doesn't get identified as Debian.
try:
etclsbrel = open("/etc/lsb-release", "rU")
for line in etclsbrel:
m = _distributor_id_file_re.search(line)
if m:
_u_distname = m.group(1).strip()
m = _release_file_re.search(line)
if m:
_u_version = m.group(1).strip()
m = _codename_file_re.search(line)
if m:
_u_id = m.group(1).strip()
if _u_distname and _u_version:
return (_u_distname, _u_version, _u_id)
except (EnvironmentError, UnboundLocalError):
pass
return platform.linux_distribution(distname, version, id, supported_dists, full_distribution_name)
class PackageManager:
def matchSignatureList(self, signature_list):
return self.getOSSignature() in signature_list
def _getLinuxDistribution(self):
return patched_linux_distribution()
def getOSSignature(self):
return "+++".join([i.strip().lower() for i in self._getLinuxDistribution()])
def getDistributionName(self):
return self._getLinuxDistribution()[0]
def getVersion(self):
return self._getLinuxDistribution()[1]
def _call(self, *args, **kw):
""" This is implemented in BasePromise """
raise NotImplemented
def _getDistributionHandler(self):
distribution_name = self.getDistributionName()
if distribution_name.lower().strip() == 'opensuse':
return Zypper()
elif distribution_name.lower().strip() in ['debian', 'ubuntu']:
return AptGet()
raise UnsupportedOSException("Distribution (%s) is not Supported!" % distribution_name)
def _purgeRepository(self):
""" Remove all repositories """
return self._getDistributionHandler().purgeRepository(self._call)
def _addRepository(self, url, alias):
""" Add a repository """
return self._getDistributionHandler().addRepository(self._call, url, alias)
def _addKey(self, url, alias):
""" Add a gpg or a key """
return self._getDistributionHandler().addKey(self._call, url, alias)
def _updateRepository(self):
""" Add a repository """
return self._getDistributionHandler().updateRepository(self._call)
def _installSoftwareList(self, name_list):
""" Upgrade softwares """
return self._getDistributionHandler().installSoftwareList(self._call, name_list)
def _updateSoftware(self):
""" Upgrade softwares """
return self._getDistributionHandler().updateSoftware(self._call)
def _updateSystem(self):
""" Dist-Upgrade of system """
return self._getDistributionHandler().updateSystem(self._call)
def update(self, repository_list=[], package_list=[], key_list=[]):
""" Perform upgrade """
self._purgeRepository()
for alias, url in repository_list:
self._addRepository(url, alias)
self._updateRepository()
for alias, url in key_list:
self._addKey(url, alias)
if len(package_list):
self._installSoftwareList(package_list)
# This helper implements API for package handling
class AptGet:
source_list_path = "/etc/apt/sources.list"
source_list_d_path = "/etc/apt/sources.list.d"
trusted_gpg_d_path = "/etc/apt/trusted.gpg.d"
def purgeRepository(self, caller):
""" Remove all repositories """
# Aggressive removal
if os.path.exists(self.source_list_path):
os.remove(self.source_list_path)
open(self.source_list_path, "w+").write("# Removed all")
for file_path in glob.glob("%s/*" % self.source_list_d_path):
os.remove(file_path)
def addRepository(self, caller, url, alias):
""" Add a repository """
if not os.path.exists(self.source_list_d_path):
os.mkdir(self.source_list_d_path)
repos_file = open("%s/%s.list" % (self.source_list_d_path, alias), "w")
prefix = "deb "
if alias.endswith("-src"):
prefix = "deb-src "
repos_file.write(prefix + url)
repos_file.close()
def addKey(self, caller, url, alias):
""" Download and add a gpg key """
if not os.path.exists(self.trusted_gpg_d_path):
os.mkdir(self.trusted_gpg_d_path)
gpg_path = "%s/%s.gpg" % (self.trusted_gpg_d_path, alias)
urllib.urlretrieve(url, gpg_path)
if os.path.exists(gpg_path):
# File already exists, skip
return
def updateRepository(self, caller):
""" Add a repository """
caller(['apt-get', 'update'], stdout=None)
def installSoftwareList(self, caller, name_list):
""" Instal Software """
self.updateRepository(caller)
command_list = ["apt-get", "install", "-y"]
command_list.extend(name_list)
caller(command_list, stdout=None)
def isUpgradable(self, caller, name):
output, err = caller(["apt-get", "upgrade", "--dry-run"])
for line in output.splitlines():
if line.startswith("Inst %s" % name):
return True
return False
def updateSoftware(self, caller):
""" Upgrade softwares """
self.updateRepository(caller)
caller(["apt-get", "upgrade"], stdout=None)
def updateSystem(self, caller):
""" Dist-Upgrade of system """
self.updateRepository(caller)
caller(['apt-get', 'dist-upgrade', '-y'], stdout=None)
class Zypper:
def purgeRepository(self, caller):
"""Remove all repositories"""
listing, err = caller(['zypper', 'lr'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while listing.count('\n') > 2:
output, err = caller(['zypper', 'rr', '1'], stdout=None)
listing, err = caller(['zypper', 'lr'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def addRepository(self, caller, url, alias):
""" Add a repository """
base_command = ['zypper', 'ar', '-fc']
if alias.endswith("unsafe"):
base_command.append('--no-gpgcheck')
base_command.extend([url, alias])
caller(base_command, stdout=None)
def addKey(self, caller, url, alias):
""" Add gpg or key """
raise NotImplementedError("Not implemented for this distribution")
def updateRepository(self, caller):
""" Add a repository """
caller(['zypper', '--gpg-auto-import-keys', 'in', '-Dly'], stdout=None)
def isUpgradable(self, caller, name):
output, err = caller(['zypper', '--gpg-auto-import-keys', 'up', '-ly'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
for line in output.splitlines():
if line.startswith("'%s' is already installed." % name):
return False
return True
def installSoftwareList(self, caller, name_list):
""" Instal Software """
self.updateRepository(caller)
command_list = ['zypper', '--gpg-auto-import-keys', 'in', '-ly']
command_list.extend(name_list)
caller(command_list, stdout=None)
def updateSoftware(self, caller):
""" Upgrade softwares """
caller(['zypper', '--gpg-auto-import-keys', 'up', '-ly'], stdout=None)
def updateSystem(self, caller):
""" Dist-Upgrade of system """
caller(['zypper', '--gpg-auto-import-keys', 'dup', '-ly'], stdout=None)
def do_discover():
package_manager = PackageManager()
print "The signature for your current system is: %s" % \
package_manager.getOSSignature()
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import core
import hostname
import limitconf
import slappkgcron
promise_list = (
core.Promise,
hostname.Promise,
slappkgcron.Promise,
limitconf.Promise
)
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import datetime
import logging
from optparse import OptionParser, Option
import sys
from slapos.package.base_promise import BasePromise
class Promise(BasePromise):
def fixConsistency(self, upgrade=0, reboot=0, boot=0, **kw):
signature = self.getSignature()
today = datetime.date.today().isoformat()
if upgrade:
upgrade_goal = self.getPromiseSectionDict()
if upgrade_goal is None:
raise ValueError("None of the sections are compatible for upgrade!")
repository_tuple_list = []
for repository in upgrade_goal['repository-list']:
alias, url = repository.split("=")
repository_tuple_list.append((alias.strip(), url.strip()))
key_tuple_list = []
for key in upgrade_goal.get('key-list', []):
alias, url = key.split("=")
key_tuple_list.append((alias.strip(), url.strip()))
self.update(repository_tuple_list,
upgrade_goal['filter-package-list'],
key_tuple_list)
if upgrade and boot:
signature.update(reboot=today, upgrade=today)
if upgrade:
signature.update(upgrade=today)
elif reboot:
signature.update(reboot=today)
else:
raise ValueError(
"You need upgrade and/or reboot when invoke fixConsistency!")
signature.load()
self.log("Retrying after fixConsistency....\n\n")
return self.checkConsistency(fixit=0, **kw)
def checkConsistency(self, fixit=0, **kw):
# Get configuration
signature = self.getSignature()
self.log("Expected Reboot early them %s" % signature.reboot)
self.log("Expected Upgrade early them %s" % signature.upgrade)
self.log("Last reboot : %s" % signature.last_reboot)
self.log("Last upgrade : %s" % signature.last_upgrade)
if signature.upgrade > datetime.date.today():
self.log("Upgrade will happens on %s" % signature.upgrade)
# It is consistent for now
return True
# Check if run for first time
if signature.last_reboot is None:
if fixit:
# Purge repositories list and add new ones
return self.fixConsistency(upgrade=1, boot=1)
return False
else:
is_ok = True
if signature.last_upgrade < signature.upgrade:
# Purge repositories list and add new ones
self.log('Upgrade is required.')
if fixit:
is_ok = self.fixConsistency(upgrade=1)
else:
is_ok = False
else:
self.log("Your system is up to date")
if signature.last_reboot < signature.reboot:
self.log("Rebooting is required.")
if fixit:
return self.fixConsistency(reboot=1)
return False
else:
self.log("No need to reboot.")
return is_ok
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.base_promise import BasePromise
import os
class Promise(BasePromise):
configuration_file_path = '/etc/HOSTNAME'
def _getComputerId(self, **kw):
if kw.get("computer_id"):
return kw["computer_id"]
return self.getSlapOSConfigurationDict("slapos").get("computer_id")
def checkConsistency(self, fixit=0, **kw):
is_ok = False
computer_id = self._getComputerId(**kw)
if computer_id is None:
self.log("Unable to detect computer_id from configuration.")
return is_ok
if os.path.exists(self.configuration_file_path):
is_ok = computer_id in open(self.configuration_file_path, 'r').read()
if not is_ok and fixit:
return self.fixConsistency(**kw)
return is_ok
def fixConsistency(self, **kw):
"""Configures hostname daemon"""
computer_id = self._getComputerId(**kw)
if computer_id is None:
return self.checkConsistency(fixit=0, computer_id=computer_id, **kw)
self.log("Setting hostname in : %s" % self.configuration_file_path)
open(self.configuration_file_path, 'w').write("%s\n" % computer_id)
self._call(['hostname', '-F', self.configuration_file_path])
return self.checkConsistency(fixit=0, **kw)
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.base_promise import BasePromise
import os
import pkg_resources
class Promise(BasePromise):
configuration_file_path = '/etc/security/limits.conf'
def _getLimitConfTemplate(self):
return pkg_resources.resource_stream(__name__,
'template/limits.conf.in').read()
def checkConsistency(self, fixit=0, **kw):
is_ok = False
if os.path.exists(self.configuration_file_path):
expected_limit = self._getLimitConfTemplate()
with open(self.configuration_file_path, 'r') as limit_conf:
is_ok = expected_limit == limit_conf.read()
if not is_ok and fixit:
return self.fixConsistency(**kw)
return is_ok
def fixConsistency(self, **kw):
"""Configures hostname daemon"""
self.log("Update limits : %s" % self.configuration_file_path)
if os.path.exists(self.configuration_file_path):
shutil.copy(self.configuration_file_path,
"%s.%s" % (self.configuration_file_path, time.time()))
with open(self.configuration_file_path, 'w') as limit_conf:
limit_conf.write(self._getLimitConfTemplate())
return self.checkConsistency(fixit=0, **kw)
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.base_promise import BasePromise
from slapos.package.conf import get_template, create
import os
import pkg_resources
class Promise(BasePromise):
configuration_file_path = '/etc/cron.d/slappkg-update'
def _getCronTemplate(self):
configuration_content = get_template("update.cron.in")
return configuration_content % {
"configuration_path": self.config.slapos_configuration,
# XXX Unhardcore me please.
"slapos_location": "/opt/slapos"
}
def checkConsistency(self, fixit=0, **kw):
is_ok = False
if os.path.exists(self.configuration_file_path):
expected_content = self._getCronTemplate()
with open(self.configuration_file_path, 'r') as actual_conf:
is_ok = expected_content == actual_conf.read()
if not is_ok and fixit:
return self.fixConsistency(**kw)
return is_ok
def fixConsistency(self, **kw):
self.log("Update cron : %s" % self.configuration_file_path)
if os.path.exists(self.configuration_file_path):
shutil.rmtree(self.configuration_file_path)
create(path=self.configuration_file_path,
text=self._getCronTemplate())
return self.checkConsistency(fixit=0, **kw)
* hard nproc 8096
* soft nproc 1024
* hard nofile 32768
* soft nofile 32768
root hard nofile 65535
root soft nofile 65535
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import ConfigParser
import os
import time
import traceback
import tempfile
import datetime
import shutil
from slapos.networkcachehelper import NetworkcacheClient, \
helper_download_network_cached
class NetworkCache:
def __init__(self, configuration_path):
if not os.path.exists(configuration_path):
raise ValueError("You need configuration file")
self.configuration = configuration_path
self._load()
def _load(self):
network_cache_info = ConfigParser.RawConfigParser()
network_cache_info.read(self.configuration)
network_cache_info_dict = dict(network_cache_info.items('networkcache'))
def get_(name):
return network_cache_info_dict.get(name)
self.download_binary_cache_url = get_('download-binary-cache-url')
self.download_cache_url = get_('download-cache-url')
self.download_binary_dir_url = get_('download-binary-dir-url')
self.signature_certificate_list = get_('signature-certificate-list')
# Not mandatory
self.dir_url = get_('upload-dir-url')
self.cache_url = get_('upload-cache-url')
self.signature_private_key_file = get_('signature_private_key_file')
self.shacache_cert_file = get_('shacache-cert-file')
self.shacache_key_file = get_('shacache-key-file')
self.shadir_cert_file = get_('shadir-cert-file')
self.shadir_key_file = get_('shadir-key-file')
if network_cache_info.has_section('slapupdate'):
self.directory_key = network_cache_info.get('slapupdate', 'upgrade_key')
else:
self.directory_key = "slapos-upgrade-testing-key"
def upload(self, path, metadata_dict):
"""
Upload an existing file, using a file_descriptor.
"""
file_descriptor = open(path, 'r')
if not (self.dir_url and self.cache_url):
return False
# backward compatibility
metadata_dict.setdefault('file', 'notused')
metadata_dict.setdefault('urlmd5', 'notused')
# convert '' into None in order to call nc nicely
with NetworkcacheClient(self.cache_url, self.dir_url,
signature_private_key_file=self.signature_private_key_file or None,
shacache_cert_file=self.shacache_cert_file or None,
shacache_key_file=self.shacache_key_file or None,
shadir_cert_file=self.shadir_cert_file or None,
shadir_key_file=self.shadir_key_file or None,
) as nc:
return nc.upload(file_descriptor, self.directory_key, **metadata_dict)
def download(self, path, wanted_metadata_dict={},
required_key_list=[], strategy=None):
result = helper_download_network_cached(
self.download_binary_dir_url,
self.download_binary_cache_url,
self.signature_certificate_list,
self.directory_key, wanted_metadata_dict,
required_key_list, strategy)
if result:
# XXX check if nc filters signature_certificate_list!
# Creates a file with content to desired path.
file_descriptor, metadata_dict = result
f = open(path, 'w+b')
try:
shutil.copyfileobj(file_descriptor, f)
# XXX method should check MD5.
return metadata_dict
finally:
f.close()
file_descriptor.close()
return False
def strategy(entry_list):
"""Get the latest entry. """
timestamp = 0
best_entry = None
for entry in entry_list:
if entry['timestamp'] > timestamp:
best_entry = entry
timestamp = entry['timestamp']
return best_entry
def get_yes_no(prompt):
while True:
answer = raw_input(prompt + " [y,n]: ")
if answer.upper() in ['Y', 'YES']:
return True
if answer.upper() in ['N', 'NO']:
return False
class Signature:
def __init__(self, config, logger=None):
self.config = config
self.logger = logger
def log(self, message):
if self.logger is not None:
self.logger.debug(message)
else:
print message
def _download(self, path):
"""
Download a tar of the repository from cache, and untar it.
"""
shacache = NetworkCache(self.config.slapos_configuration)
if shacache.signature_certificate_list is None:
raise ValueError("You need at least one valid signature for download")
return shacache.download(path=path,
required_key_list=['timestamp'],
strategy=strategy)
def download(self):
"""
Get status information and return its path
"""
info, path = tempfile.mkstemp()
if not self._download(path) == False:
return path
else:
raise ValueError("No result from shacache")
def _upload(self, path):
"""
Creates uploads repository to cache.
"""
shacache = NetworkCache(self.config.slapos_configuration)
metadata_dict = {
# XXX: we set date from client side. It can be potentially dangerous
# as it can be badly configured.
'timestamp': time.time(),
}
try:
if shacache.upload(path=path,
metadata_dict=metadata_dict):
self.log('Uploaded update file to cache.')
except Exception:
self.log('Unable to upload to cache:\n%s.' % traceback.format_exc())
def upload(self, dry_run=0, verbose=1):
upgrade_info = ConfigParser.RawConfigParser()
upgrade_info.read(self.config.upgrade_file)
tomorrow = datetime.date.today() + datetime.timedelta(days=1)
if self.config.reboot:
upgrade_info.set('system', 'reboot', tomorrow.isoformat())
if self.config.upgrade:
upgrade_info.set('system', 'upgrade', tomorrow.isoformat())
file = open(self.config.upgrade_file, "w")
upgrade_info.write(file)
file.close()
if verbose:
self.log(" You will update this :")
self.log(open(self.config.upgrade_file).read())
if dry_run:
return
if get_yes_no("Do you want to continue? "):
self._upload(self.config.upgrade_file)
def update(self, reboot=None, upgrade=None):
if reboot is None and upgrade is None:
return
if not self.current_state.has_section('system'):
self.current_state.add_section('system')
if reboot is not None:
self.current_state.set('system', 'reboot', reboot)
if upgrade is not None:
self.current_state.set('system', 'upgrade', upgrade)
current_state_file = open(self.config.srv_file, "w")
self.current_state.write(current_state_file)
current_state_file.close()
def get_signature_dict(self):
""" Convert Next state info into a dict """
map_dict = {}
for key in self.next_state.sections():
if key == "system":
continue
def clean_list(l):
return [x.strip() for x in l.split('\n') if x.strip() != '']
map_dict[key] = {}
for entry in self.next_state.options(key):
map_dict[key][entry] = clean_list(self.next_state.get(key, entry))
return map_dict
def _read_state(self, state, name):
""" Extract information from config file """
if not state.has_section('system'):
return None
return datetime.datetime.strptime(
state.get('system', name), "%Y-%m-%d").date()
def load(self):
"""
Extract information from config file and server file
"""
self.current_state = ConfigParser.RawConfigParser()
self.current_state.read(self.config.srv_file)
self.next_state = ConfigParser.ConfigParser()
self.next_state.read(self.download())
self.reboot = self._read_state(self.next_state, "reboot")
self.upgrade = self._read_state(self.next_state, "upgrade")
self.last_reboot = self._read_state(self.current_state, "reboot")
self.last_upgrade = self._read_state(self.current_state, "upgrade")
[slapupdate]
upgrade_key = '%(upgrade_key)s'
[networkcache]
download-binary-cache-url = http://www.shacache.org/shacache
download-cache-url = https://www.shacache.org/shacache
download-binary-dir-url = http://www.shacache.org/shadir
# Standard Signature handled by SlapOS Administrators
signature-certificate-list =
-----BEGIN CERTIFICATE-----
MIIB8DCCAVmgAwIBAgIJAPFf61p8y809MA0GCSqGSIb3DQEBBQUAMBAxDjAMBgNV
BAMMBUNPTVAtMCAXDTE0MDIxNzE2NDgxN1oYDzIxMTQwMTI0MTY0ODE3WjAQMQ4w
DAYDVQQDDAVDT01QLTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsiqCyuv1
HO9FmtwnMbEa1/u8Dn7T0k7hVKYXVQYof+59Ltbb3cA3nLjFSJDr/wQT6N89MccS
PneRzkWqZKL06Kmj+N+XJfRyVaTz1qQtNzjdbYkO6RgQq+fvq2CO0+PSnL6NttLU
/a9nQMcVm7wZ8kmY+AG5LbVo8lmxDD16Wq0CAwEAAaNQME4wHQYDVR0OBBYEFEVi
YyWHF3W7/O4NaTjn4lElLpp7MB8GA1UdIwQYMBaAFEViYyWHF3W7/O4NaTjn4lEl
Lpp7MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAgIPGoxhUa16AgjZx
Jr1kUrs8Fg3ig8eRFQlBSLYfANIUxcQ2ScFAkmsvwXY3Md7uaSvMJsEl2jcjdmdi
eSreNkx85j9GtMLY/2cv0kF4yAQNRtibtDkbg6fRNkmUopDosJNVf79l1GKX8JFL
zZBOFdOaLYY/6dLRwiTUKHU6su8=
-----END CERTIFICATE-----
# BEWARE: This file will be automatically regenerated on every run of
# slappkg
SHELL=/bin/sh
PATH=/usr/bin:/usr/sbin:/sbin:/bin:/usr/lib/news/bin:/usr/local/bin
MAILTO=root
# This file expects that
0 1 * * * root slappkg-update --self-update --slapos-configuration=%(configuration_path)s -v >> %(slapos_location)s/log/slappkg-update.log 2>&1
10 * * * * root slappkg-update --wait --slapos-configuration=%(configuration_path)s -v >> %(slapos_location)s/log/slappkg-update.log 2>&1
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
def _fake_call(self, *args, **kw):
self.last_call = (args, kw)
CONFIGURATION_FILE = """
[networkcache]
download-binary-cache-url = http://www.shacache.org/shacache
download-cache-url = https://www.shacache.org/shacache
download-binary-dir-url = http://www.shacache.org/shadir
signature-certificate-list =
-----BEGIN CERTIFICATE-----
MIIB8DCCAVmgAwIBAgIJAPFf61p8y809MA0GCSqGSIb3DQEBBQUAMBAxDjAMBgNV
BAMMBUNPTVAtMCAXDTE0MDIxNzE2NDgxN1oYDzIxMTQwMTI0MTY0ODE3WjAQMQ4w
DAYDVQQDDAVDT01QLTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsiqCyuv1
HO9FmtwnMbEa1/u8Dn7T0k7hVKYXVQYof+59Ltbb3cA3nLjFSJDr/wQT6N89MccS
PneRzkWqZKL06Kmj+N+XJfRyVaTz1qQtNzjdbYkO6RgQq+fvq2CO0+PSnL6NttLU
/a9nQMcVm7wZ8kmY+AG5LbVo8lmxDD16Wq0CAwEAAaNQME4wHQYDVR0OBBYEFEVi
YyWHF3W7/O4NaTjn4lElLpp7MB8GA1UdIwQYMBaAFEViYyWHF3W7/O4NaTjn4lEl
Lpp7MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAgIPGoxhUa16AgjZx
Jr1kUrs8Fg3ig8eRFQlBSLYfANIUxcQ2ScFAkmsvwXY3Md7uaSvMJsEl2jcjdmdi
eSreNkx85j9GtMLY/2cv0kF4yAQNRtibtDkbg6fRNkmUopDosJNVf79l1GKX8JFL
zZBOFdOaLYY/6dLRwiTUKHU6su8=
-----END CERTIFICATE-----
[slapupdate]
upgrade_key = slapos-upgrade-testing-key-with-config-file
"""
UPGRADE_KEY = """[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
update = http://ftp.fr.debian.org/debian/ wheezy-updates main
update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./
re6stnet = http://git.erp5.org/dist/deb ./
key-list =
slapos = http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/slapos.openbuildservice.key
re6st = http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/git.erp5.org.key
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
filter-promise-list =
core
signature-list =
debian+++jessie/sid+++
debian+++7.4+++
debian+++7.5+++
debian+++7.3+++
debian+++7+++
[opensuse-legacy]
repository-list =
suse = http://download.opensuse.org/distribution/12.1/repo/oss/
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/openSUSE_12.1/
re6st = http://git.erp5.org/dist/rpm
key-list =
filter-promise-list =
core
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
signature-list =
opensuse+++12.1+++x86_64
[system]
reboot = 2011-10-10
upgrade = 2014-06-04
"""
UPGRADE_KEY_WITHOUT_KEY_LIST = """[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
update = http://ftp.fr.debian.org/debian/ wheezy-updates main
update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./
re6stnet = http://git.erp5.org/dist/deb ./
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
filter-promise-list =
core
signature-list =
debian+++jessie/sid+++
debian+++7.4+++
debian+++7.5+++
debian+++7.3+++
debian+++7+++
[opensuse-legacy]
repository-list =
suse = http://download.opensuse.org/distribution/12.1/repo/oss/
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/openSUSE_12.1/
re6st = http://git.erp5.org/dist/rpm
filter-promise-list =
core
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
signature-list =
opensuse+++12.1+++x86_64
[system]
reboot = 2011-10-10
upgrade = 2014-06-04
"""
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.base_promise import BasePromise
from slapos.package.test.base import CONFIGURATION_FILE, UPGRADE_KEY, _fake_call
import os
from slapos.package.signature import NetworkCache
from optparse import Values
import unittest
FAKE_CALL_COUNTER = 0
def _fake_signature_download(self, path, *args, **kwargs):
global FAKE_CALL_COUNTER
FAKE_CALL_COUNTER += 1
with open(path, 'w') as upgrade_signature:
upgrade_signature.write(UPGRADE_KEY)
return True
class testBasePromiseCase(unittest.TestCase):
def setUp(self):
self.original_basepromise_call = BasePromise._call
BasePromise._call = _fake_call
self.original_network_cache_download = NetworkCache.download
NetworkCache.download = _fake_signature_download
global FAKE_CALL_COUNTER
FAKE_CALL_COUNTER = 0
self.config_dict = {
"slapos_configuration": self._createConfigurationFile(),
"srv_file": "/tmp/test_base_promise_slapupdate",
"dry_run": False,
"verbose": False
}
def tearDown(self):
BasePromise._call = self.original_basepromise_call
NetworkCache.download = self.original_network_cache_download
def _createConfigurationFile(self):
with open("/tmp/test_base_promise_configuration.cfg", "w") as configuration_file:
configuration_file.write(CONFIGURATION_FILE)
return "/tmp/test_base_promise_configuration.cfg"
def testIsSystemd(self):
promise = BasePromise()
systemctl_path = "/tmp/_testBasePromiseCase_systemctl_test"
promise.systemctl_path_list = ["/tmp/_testBasePromiseCase_systemctl_test"]
if os.path.exists(systemctl_path):
os.remove(systemctl_path)
self.assertFalse(promise._isSystemd())
open(systemctl_path, "w").write("echo test")
self.assertTrue(promise._isSystemd())
def testSystemctl(self):
promise = BasePromise()
def _fake_true_isSystemd():
return True
promise._isSystemd = _fake_true_isSystemd
promise._service("service_name", "service_action")
self.assertEqual(promise.last_call,
((['systemctl', 'service_action', 'service_name'],),
{'dry_run': False, 'stderr': None, 'stdout': None}))
def testService(self):
promise = BasePromise()
def _fake_false_isSystemd():
return False
promise._isSystemd = _fake_false_isSystemd
promise._service("service_name", "service_action")
self.assertEqual(promise.last_call,
((['service', 'service_name', 'service_action'],),
{'dry_run': False, 'stderr': None, 'stdout': None}))
def testGetSignature(self):
global FAKE_CALL_COUNTER
promise = BasePromise(Values(self.config_dict))
self.assertEquals(FAKE_CALL_COUNTER, 0)
signature = promise.getSignature()
self.assertNotEquals(signature, None)
self.assertEquals(FAKE_CALL_COUNTER, 1)
# Make sure is already loaded.
self.assertNotEquals(signature.current_state, None)
self.assertEquals(signature.reboot.strftime("%Y-%m-%d"), "2011-10-10" )
# Make sure it do not download things again.
signature = promise.getSignature()
self.assertEquals(FAKE_CALL_COUNTER, 1)
def testGetSlapOSConfiguration(self):
promise = BasePromise(Values(self.config_dict))
self.assertEquals(promise.getSlapOSConfigurationDict("slapupdate"),
{'upgrade_key': 'slapos-upgrade-testing-key-with-config-file'})
def testGetPromiseSectionDict(self):
promise = BasePromise(Values(self.config_dict))
def fake_debian_getOSSignature():
return "debian+++7.4+++"
def fake_opensuse_getOSSignature():
return "opensuse+++12.1+++x86_64"
def fake_unsupported_getOSSignature():
return "readhat+++1+++"
promise.getOSSignature = fake_unsupported_getOSSignature
self.assertEquals(promise.getPromiseSectionDict(), None)
promise.getOSSignature = fake_opensuse_getOSSignature
opensuse_dict = {'filter-promise-list': ['core'],
'filter-package-list': ['ntp', 'openvpn', 'slapos.node', 're6stnet'],
'key-list': [],
'repository-list': [
'suse = http://download.opensuse.org/distribution/12.1/repo/oss/',
'slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/openSUSE_12.1/',
're6st = http://git.erp5.org/dist/rpm'],
'signature-list': ['opensuse+++12.1+++x86_64']}
self.assertEquals(promise.getPromiseSectionDict(), opensuse_dict)
promise.getOSSignature = fake_debian_getOSSignature
debian_dict = {'filter-promise-list': ['core'],
'filter-package-list': ['ntp', 'openvpn', 'slapos.node', 're6stnet'],
'key-list': [
'slapos = http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/slapos.openbuildservice.key',
're6st = http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/git.erp5.org.key'],
'repository-list': [
'main = http://ftp.fr.debian.org/debian/ wheezy main',
'main-src = http://ftp.fr.debian.org/debian/ wheezy main',
'update = http://ftp.fr.debian.org/debian/ wheezy-updates main',
'update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main',
'slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./',
're6stnet = http://git.erp5.org/dist/deb ./'],
'signature-list': ['debian+++jessie/sid+++', 'debian+++7.4+++', 'debian+++7.5+++',
'debian+++7.3+++', 'debian+++7+++']}
self.assertEquals(promise.getPromiseSectionDict(), debian_dict)
def testIsAplicable(self):
from slapos.package.promise import core, limitconf
def fake_debian_getOSSignature():
return "debian+++7.4+++"
promise = core.Promise(Values(self.config_dict))
promise.getOSSignature = fake_debian_getOSSignature
self.assertEquals(promise.isApplicable(), True)
promise = limitconf.Promise(Values(self.config_dict))
promise.getOSSignature = fake_debian_getOSSignature
self.assertEquals(promise.isApplicable(), False)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.distribution import PackageManager, AptGet, Zypper, \
UnsupportedOSException
import tempfile
import shutil
import os
import glob
import unittest
def _fake_call(self, *args, **kw):
self.last_call = args
def _fake_debian_distribution(*args, **kw):
return ('debian', '7.4', '')
def _fake_opensuse_distribution(*args, **kw):
return ('OpenSuse ', '12.1', '')
class DummyDistributionHandler:
called = []
def purgeRepository(self, caller):
self.called.append("purgeRepository")
def addRepository(self, caller, url, alias):
self.called.append("addRepository")
def addKey(self, caller, url, alias):
self.called.append("addKey")
def updateRepository(self, caller):
self.called.append("updateRepository")
def isUpgradable(self, caller, name):
self.called.append("isUpgradeble")
def installSoftwareList(self, caller, name_list):
self.called.append("installSoftwareList")
def updateSoftware(self, caller):
self.called.append("updateSoftware")
def updateSystem(self, caller):
self.called.append("updateSystem")
class testPackageManager(unittest.TestCase):
def setUp(self):
PackageManager._call = _fake_call
AptGet.source_list_path = "/tmp/test_distribution_sources.list"
AptGet.source_list_d_path = "/tmp/test_distribution_sources.list.d"
AptGet.trusted_gpg_d_path = "/tmp/test_distribution_trusted.gpg.d"
def testGetDistributionHandler(self):
package_manager = PackageManager()
def OpenSuseCase():
return "OpenSuse"
package_manager.getDistributionName = OpenSuseCase
self.assertTrue(
isinstance(package_manager._getDistributionHandler(), Zypper))
def DebianCase():
return "Debian"
package_manager.getDistributionName = DebianCase
self.assertTrue(
isinstance(package_manager._getDistributionHandler(), AptGet))
def RedHatCase():
return "Red Hat"
package_manager.getDistributionName = RedHatCase
self.assertRaises(UnsupportedOSException,
package_manager._getDistributionHandler)
def testGetDistributionName(self):
package_manager = PackageManager()
package_manager._getLinuxDistribution = _fake_opensuse_distribution
self.assertEquals(package_manager.getDistributionName(), "OpenSuse ")
package_manager._getLinuxDistribution = _fake_debian_distribution
self.assertEquals(package_manager.getDistributionName(), "debian")
def testGetVersion(self):
package_manager = PackageManager()
package_manager._getLinuxDistribution = _fake_opensuse_distribution
self.assertEquals(package_manager.getVersion(), "12.1")
package_manager._getLinuxDistribution = _fake_debian_distribution
self.assertEquals(package_manager.getVersion(), "7.4")
def testOSSignature(self):
package_manager = PackageManager()
package_manager._getLinuxDistribution = _fake_opensuse_distribution
self.assertEquals(package_manager.getOSSignature(), "opensuse+++12.1+++")
package_manager._getLinuxDistribution = _fake_debian_distribution
self.assertEquals(package_manager.getOSSignature(), "debian+++7.4+++")
def _getPatchedPackageManagerForApiTest(self):
package_manager = PackageManager()
dummy_handler = DummyDistributionHandler()
def DummyCase():
dummy_handler.called = []
return dummy_handler
package_manager._getDistributionHandler = DummyCase
self.assertEquals(package_manager._getDistributionHandler(), dummy_handler)
self.assertEquals(dummy_handler.called, [])
return package_manager, dummy_handler
def testPurgeRepositoryAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._purgeRepository()
self.assertEquals(handler.called, ["purgeRepository"])
def testAddRepositoryAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._addRepository("http://...", "slapos")
self.assertEquals(handler.called, ["addRepository"])
def testAddKeyAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._addKey("http://...", "slapos")
self.assertEquals(handler.called, ["addKey"])
def testUpdateRepositoryAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._updateRepository()
self.assertEquals(handler.called, ["updateRepository"])
def testInstalledSoftwareListAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._installSoftwareList(["slapos", "re6st"])
self.assertEquals(handler.called, ["installSoftwareList"])
def testUpdateSoftwareAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._updateSoftware()
self.assertEquals(handler.called, ["updateSoftware"])
def testUpdateSystemAPI(self):
package_manager, handler = self._getPatchedPackageManagerForApiTest()
package_manager._updateSystem()
self.assertEquals(handler.called, ["updateSystem"])
def testAptGetPurgeRepository(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append("called")
source_list = "/tmp/test_distribution_sources.list"
source_list_d = "/tmp/test_distribution_sources.list.d"
if not os.path.exists(source_list_d):
os.mkdir(source_list_d)
open("%s/couscous.list" % source_list_d, "w+").write("# test")
handler.purgeRepository(dummy_caller)
self.assertTrue(os.path.exists(source_list))
self.assertEquals(open(source_list, "r").read(),
"# Removed all")
self.assertEquals(glob.glob("%s/*" % source_list_d), [])
def testAptGetAddRepository(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append("called")
source_list_d = "/tmp/test_distribution_sources.list.d"
if os.path.exists(source_list_d):
shutil.rmtree(source_list_d)
handler.addRepository(dummy_caller, "http://test main", "slapos")
self.assertTrue(os.path.exists(source_list_d))
self.assertEquals(open("%s/slapos.list" % source_list_d, "r").read(),
"deb http://test main")
def testAptGetAddKey(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append("called")
info, fake_gpg_path = tempfile.mkstemp()
with open(fake_gpg_path, "w") as f:
f.write("KEY")
trusted_list_d = "/tmp/test_distribution_trusted.gpg.d"
if os.path.exists(trusted_list_d):
shutil.rmtree(trusted_list_d)
handler.addKey(dummy_caller, "file://%s" % fake_gpg_path, "slapos")
self.assertTrue(os.path.exists(trusted_list_d))
self.assertEquals(open("%s/slapos.gpg" % trusted_list_d, "r").read(),
"KEY")
def testAptGetUpdateRepository(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateRepository(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
def testAptGetInstallSoftwareList(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.installSoftwareList(dummy_caller, ["slapos", "re6st"])
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(["apt-get", "install", "-y", "slapos", "re6st"], command)
self.assertEquals(extra, {'stdout': None})
# def isUpgradable(self, caller, name):
# output, err = caller(["apt-get", "upgrade", "--dry-run"])
# for line in output.splitlines():
# if line.startswith("Inst %s" % name):
# return True
# return False
#
def testAptGetUpdateSoftware(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSoftware(dummy_caller)
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(["apt-get", "upgrade"], command)
self.assertEquals(extra, {'stdout': None})
def testAptGetUpdateSystem(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSystem(dummy_caller)
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(["apt-get", "dist-upgrade", "-y"], command)
self.assertEquals(extra, {'stdout': None})
def testZypperPurgeRepository_NoRepository(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
return "", ""
handler.purgeRepository(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
def testZypperPurgeRepository_3Repository(self):
handler = Zypper()
called = []
case = []
repository_list_string = [
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
1 | re6st | re6st | Yes | Yes
2 | slapos | slapos | Yes | Yes
3 | suse | suse | Yes | Yes
""",
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
1 | slapos | slapos | Yes | Yes
2 | suse | suse | Yes | Yes
""",
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
1 | suse | suse | Yes | Yes
""",
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
"""]
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
if args[0] == ['zypper', 'rr', '1']:
case.append(1)
return repository_list_string[len(case)], ""
handler.purgeRepository(dummy_caller)
self.assertEquals(len(called), 7)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(['zypper', 'rr', '1'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[2][0][0]
extra = called[2][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
command = called[3][0][0]
extra = called[3][1]
self.assertEquals(['zypper', 'rr', '1'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[4][0][0]
extra = called[4][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
command = called[5][0][0]
extra = called[5][1]
self.assertEquals(['zypper', 'rr', '1'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[6][0][0]
extra = called[6][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
def purgeRepository(self, caller):
listing, err = caller(['zypper', 'lr'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while listing.count('\n') > 2:
output, err = caller(['zypper', 'rr', '1'], stdout=None)
listing, err = caller(['zypper', 'lr'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def testZypperAddRepository(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.addRepository(dummy_caller, "http://...", "slapos")
handler.addRepository(dummy_caller, "http://.../2", "re6st-unsafe")
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', 'ar', '-fc', 'http://...', 'slapos'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(['zypper', 'ar', '-fc', '--no-gpgcheck' ,
'http://.../2', 're6st-unsafe'], command)
self.assertEquals(extra, {'stdout': None})
# def addKey(self, caller, url, alias):
# """ Add gpg or key """
# raise NotImplementedError("Not implemented for this distribution")
#
def testZypperUpdateRepository(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateRepository(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'in', '-Dly'],
command)
self.assertEquals(extra, {'stdout': None})
# def isUpgradable(self, caller, name):
# output, err = caller(['zypper', '--gpg-auto-import-keys', 'up', '-ly'],
# stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# for line in output.splitlines():
# if line.startswith("'%s' is already installed." % name):
# return False
# return True
#
def testZypperInstallSoftwareList(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.installSoftwareList(dummy_caller, ["slapos", "re6st"])
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'in', '-Dly'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(command,
['zypper', '--gpg-auto-import-keys', 'in', '-ly', 'slapos', 're6st'])
self.assertEquals(extra, {'stdout': None})
def testZypperUpdateSoftware(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSoftware(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'up', '-ly'],
command)
self.assertEquals(extra, {'stdout': None})
def testZypperUpdateSystem(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSystem(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'dup', '-ly'],
command)
self.assertEquals(extra, {'stdout': None})
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import datetime
from slapos.package.promise import core
from slapos.package.test.base import CONFIGURATION_FILE, UPGRADE_KEY, \
_fake_call, UPGRADE_KEY_WITHOUT_KEY_LIST
from slapos.package.signature import NetworkCache
from optparse import Values
import os
import unittest
def _fake_update(self, repository_list=[], package_list=[], key_list=[]):
self._fake_update_call = (repository_list, package_list, key_list)
def fake_debian_getOSSignature():
return "debian+++7.4+++"
def _log_message_list(self, message):
if getattr(self, "_message_list", None) is None:
self._message_list = []
self._message_list.append(message)
class TestCoreCase(unittest.TestCase):
def setUp(self):
core.Promise._call = _fake_call
core.Promise.log = _log_message_list
self.original_promise_update = core.Promise.update
core.Promise.update = _fake_update
self.original_network_cache_download = NetworkCache.download
# Patch Download
def _fake_signature_download(self, path, *args, **kwargs):
with open(path, 'w') as upgrade_signature:
upgrade_signature.write(UPGRADE_KEY)
return True
NetworkCache.download = _fake_signature_download
self.config_dict = {
"slapos_configuration": self._createConfigurationFile(),
"srv_file": "/tmp/test_base_promise_slapupdate",
"dry_run": False,
"verbose": False
}
def tearDown(self):
NetworkCache.download = self.original_network_cache_download
core.Promise.update = self.original_promise_update
def _createConfigurationFile(self):
with open("/tmp/test_base_promise_configuration.cfg", "w") as configuration_file:
configuration_file.write(CONFIGURATION_FILE)
return "/tmp/test_base_promise_configuration.cfg"
def testCheckConsistencyFirstTime(self):
modified_config_dict = self.config_dict.copy()
modified_config_dict["srv_file"] = "/tmp/test_promise_core_which_do_not_exist"
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(), False)
self.assertEquals(promise._message_list[-2], "Last reboot : None")
self.assertEquals(promise._message_list[-1], "Last upgrade : None")
def testCheckConsistencyAlreadyUpgraded(self):
modified_config_dict = self.config_dict.copy()
slapupdate_path = "/tmp/test_promise_core_testCheckConsistencyAlreadyUpgraded"
with open(slapupdate_path, 'w') as slapupdate:
slapupdate.write("""[system]
upgrade = 2014-11-11
reboot = 2015-11-11
""")
modified_config_dict["srv_file"] = slapupdate_path
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(), True)
self.assertEquals(promise._message_list[-2], "Your system is up to date")
self.assertEquals(promise._message_list[-1], 'No need to reboot.')
def testCheckConsistencyRebootIsRequired(self):
modified_config_dict = self.config_dict.copy()
slapupdate_path = "/tmp/test_promise_core_testCheckConsistencyRebootIsRequired"
with open(slapupdate_path, 'w') as slapupdate:
slapupdate.write("""[system]
upgrade = 2000-11-11
reboot = 2009-11-11
""")
modified_config_dict["srv_file"] = slapupdate_path
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(), False)
self.assertEquals(promise._message_list[-1], "Rebooting is required.")
def testCheckConsistencyUpgradeIsRequired(self):
modified_config_dict = self.config_dict.copy()
slapupdate_path = "/tmp/testCheckConsistencyUpgradeIsRequired"
with open(slapupdate_path, 'w') as slapupdate:
slapupdate.write("""[system]
upgrade = 2000-11-11
reboot = 2100-11-11
""")
modified_config_dict["srv_file"] = slapupdate_path
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(), False)
self.assertEquals(promise._message_list[-1], 'No need to reboot.')
self.assertEquals(promise._message_list[-2], "Upgrade is required.")
def testCheckConsistencyUpgradeInFuture(self):
# Patch Download
def _fake_signature_download(self, path, *args, **kwargs):
copy_of_upgrade_key = UPGRADE_KEY
with open(path, 'w') as upgrade_signature:
modified_upgrade_key = copy_of_upgrade_key.replace(
"upgrade = 2014-06-04",
"upgrade = 2100-01-01")
upgrade_signature.write(modified_upgrade_key)
return True
NetworkCache.download = _fake_signature_download
modified_config_dict = self.config_dict.copy()
modified_config_dict["srv_file"] = "/tmp/test_promise_core_which_do_not_exist"
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(), True)
self.assertEquals(promise._message_list[-1], 'Upgrade will happens on 2100-01-01')
def testFixConsistency(self):
modified_config_dict = self.config_dict.copy()
def _fake_update(self, repository_list=[], package_list=[], key_list=[]):
self._fake_update_call = (repository_list, package_list, key_list)
slapupdate_path = "/tmp/testFixConsistencyUpgrade"
with open(slapupdate_path, 'w') as slapupdate:
slapupdate.write("""[system]
upgrade = 2000-11-11
reboot = 2100-11-11
""")
modified_config_dict["srv_file"] = slapupdate_path
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(fixit=1), True)
expected_message_list = [
'Expected Reboot early them 2011-10-10',
'Expected Upgrade early them 2014-06-04',
'Last reboot : 2100-11-11',
'Last upgrade : 2000-11-11',
'Upgrade is required.',
'Retrying after fixConsistency....\n\n',
'Expected Reboot early them 2011-10-10',
'Expected Upgrade early them 2014-06-04',
'Last reboot : 2100-11-11',
'Last upgrade : %s' % datetime.datetime.today().strftime("%Y-%m-%d"),
'Your system is up to date',
'No need to reboot.',
'No need to reboot.']
self.assertEquals(promise._message_list, expected_message_list)
repository_list = [
('main', 'http://ftp.fr.debian.org/debian/ wheezy main'),
('main-src', 'http://ftp.fr.debian.org/debian/ wheezy main'),
('update', 'http://ftp.fr.debian.org/debian/ wheezy-updates main'),
('update-src', 'http://ftp.fr.debian.org/debian/ wheezy-updates main'),
('slapos', 'http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./'),
('re6stnet', 'http://git.erp5.org/dist/deb ./')]
filter_package_list = ['ntp', 'openvpn', 'slapos.node', 're6stnet']
key_list = [
('slapos', 'http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/slapos.openbuildservice.key'),
('re6st', 'http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/git.erp5.org.key')]
self.assertEquals(promise._fake_update_call[0], repository_list)
self.assertEquals(promise._fake_update_call[1], filter_package_list)
self.assertEquals(promise._fake_update_call[2], key_list)
def testFixConsistency_without_key_list(self):
modified_config_dict = self.config_dict.copy()
# Patch Download
def _fake_signature_download(self, path, *args, **kwargs):
with open(path, 'w') as upgrade_signature:
modified_upgrade_key = UPGRADE_KEY_WITHOUT_KEY_LIST.replace(
"upgrade = 2014-06-04",
"upgrade = %s" % datetime.datetime.today().strftime("%Y-%m-%d"))
upgrade_signature.write(modified_upgrade_key)
return True
NetworkCache.download = _fake_signature_download
def _fake_update(self, repository_list=[], package_list=[], key_list=[]):
self._fake_update_call = (repository_list, package_list, key_list)
slapupdate_path = "/tmp/testFixConsistencyUpgrade"
with open(slapupdate_path, 'w') as slapupdate:
slapupdate.write("""[system]
upgrade = 1999-11-11
reboot = 2100-11-11
""")
modified_config_dict["srv_file"] = slapupdate_path
promise = core.Promise(Values(modified_config_dict))
self.assertEquals(promise.checkConsistency(fixit=1), True)
self.maxDiff = None
_today = datetime.datetime.today().strftime("%Y-%m-%d")
expected_message_list = [
'Expected Reboot early them 2011-10-10',
'Expected Upgrade early them %s' % _today,
'Last reboot : 2100-11-11',
'Last upgrade : 1999-11-11',
'Upgrade is required.',
'Retrying after fixConsistency....\n\n',
'Expected Reboot early them 2011-10-10',
'Expected Upgrade early them %s' % _today,
'Last reboot : 2100-11-11',
'Last upgrade : %s' % _today,
'Your system is up to date',
'No need to reboot.',
'No need to reboot.']
self.assertEquals(promise._message_list, expected_message_list)
repository_list = [
('main', 'http://ftp.fr.debian.org/debian/ wheezy main'),
('main-src', 'http://ftp.fr.debian.org/debian/ wheezy main'),
('update', 'http://ftp.fr.debian.org/debian/ wheezy-updates main'),
('update-src', 'http://ftp.fr.debian.org/debian/ wheezy-updates main'),
('slapos', 'http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./'),
('re6stnet', 'http://git.erp5.org/dist/deb ./')]
filter_package_list = ['ntp', 'openvpn', 'slapos.node', 're6stnet']
key_list = []
self.assertEquals(promise._fake_update_call[0], repository_list)
self.assertEquals(promise._fake_update_call[1], filter_package_list)
self.assertEquals(promise._fake_update_call[2], key_list)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.promise import hostname
import os
import unittest
def _fake_call(self, *args, **kw):
self.last_call = (args, kw)
class testPromiseHostnameCase(unittest.TestCase):
def setUp(self):
hostname.Promise._call = _fake_call
def testHostnameCheckConsistency(self):
promise = hostname.Promise()
promise.configuration_file_path = "/tmp/hostname_for_test"
self.assertFalse(promise.checkConsistency(computer_id="TESTING"))
def testHostnameFixConsistency(self):
hostname.Promise._call = _fake_call
promise = hostname.Promise()
promise.configuration_file_path = "/tmp/hostname_for_test_fix"
if os.path.exists(promise.configuration_file_path):
os.remove(promise.configuration_file_path)
self.assertFalse(promise.checkConsistency(computer_id="TESTING"))
self.assertTrue(promise.fixConsistency(computer_id="TESTING"))
self.assertEqual(promise.last_call,
((['hostname', '-F', '/tmp/hostname_for_test_fix'],), {})
)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.promise import limitconf
import os
import pkg_resources
import unittest
def _fake_call(self, *args, **kw):
self.last_call = (args, kw)
class testLimitConfTestCase(unittest.TestCase):
def setUp(self):
limitconf.Promise._call = _fake_call
if os.path.exists("/tmp/test_promise_testing_limits.conf"):
os.remove("/tmp/test_promise_testing_limits.conf")
def testLimitConfCheckConsistency(self):
promise = limitconf.Promise()
promise.configuration_file_path = "/tmp/test_promise_testing_limits.conf"
self.assertFalse(promise.checkConsistency())
open(promise.configuration_file_path, "w").write("# Something")
self.assertFalse(promise.checkConsistency())
def testLimitConfFixConsistency(self):
limitconf.Promise._call = _fake_call
promise = limitconf.Promise()
promise.configuration_file_path = "/tmp/test_promise_testing_limits.conf"
self.assertFalse(promise.checkConsistency())
self.assertTrue(promise.fixConsistency())
self.assertTrue(promise.checkConsistency())
self.assertTrue(os.path.exists(promise.configuration_file_path))
limit_content = open(promise.configuration_file_path, "r").read()
self.assertEquals(limit_content, pkg_resources.resource_stream(limitconf.__name__,
'template/limits.conf.in').read())
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package.promise import slappkgcron
import os
import pkg_resources
import unittest
EXPECTED_CRON_CONTENT = """# BEWARE: This file will be automatically regenerated on every run of
# slappkg
SHELL=/bin/sh
PATH=/usr/bin:/usr/sbin:/sbin:/bin:/usr/lib/news/bin:/usr/local/bin
MAILTO=root
# This file expects that
0 1 * * * root slappkg-update --self-update --slapos-configuration=/tmp/SOMEFILENAME -v >> /opt/slapos/log/slappkg-update.log 2>&1
10 * * * * root slappkg-update --wait --slapos-configuration=/tmp/SOMEFILENAME -v >> /opt/slapos/log/slappkg-update.log 2>&1"""
def _fake_call(self, *args, **kw):
self.last_call = (args, kw)
class testSlappkgCronTestCase(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.configuration_file_path = "/tmp/test_promise_testing_slappkg.cron"
slappkgcron.Promise._call = _fake_call
if os.path.exists(self.configuration_file_path):
os.remove(self.configuration_file_path)
def testSlappkgCronCheckConsistency(self):
promise = slappkgcron.Promise()
promise.configuration_file_path = self.configuration_file_path
promise.config.slapos_configuration = "/tmp/SOMEFILENAME"
self.assertFalse(promise.checkConsistency())
open(promise.configuration_file_path, "w").write("# Something")
self.assertFalse(promise.checkConsistency())
def testSlappkgCronFixConsistency(self):
promise = slappkgcron.Promise()
promise.configuration_file_path = self.configuration_file_path
promise.config.slapos_configuration = "/tmp/SOMEFILENAME"
self.assertFalse(promise.checkConsistency())
self.assertTrue(promise.fixConsistency())
self.assertTrue(promise.checkConsistency())
self.assertTrue(os.path.exists(promise.configuration_file_path))
cron_content = open(promise.configuration_file_path, "r").read()
self.assertEquals(cron_content.splitlines(),
EXPECTED_CRON_CONTENT.splitlines())
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.package import update, signature
from slapos.libnetworkcache import NetworkcacheClient
from optparse import Values
import slapos.signature
import time
import difflib
import tempfile
import unittest
import os
SIGNATURE = "\n-----BEGIN CERTIFICATE-----\nMIIB8DCCAVmgAwIBAgIJAPFf61p8y809MA0GCSqGSIb3DQEBBQUAMBAxDjAMBgNV\nBAMMBUNPTVAtMCAXDTE0MDIxNzE2NDgxN1oYDzIxMTQwMTI0MTY0ODE3WjAQMQ4w\nDAYDVQQDDAVDT01QLTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsiqCyuv1\nHO9FmtwnMbEa1/u8Dn7T0k7hVKYXVQYof+59Ltbb3cA3nLjFSJDr/wQT6N89MccS\nPneRzkWqZKL06Kmj+N+XJfRyVaTz1qQtNzjdbYkO6RgQq+fvq2CO0+PSnL6NttLU\n/a9nQMcVm7wZ8kmY+AG5LbVo8lmxDD16Wq0CAwEAAaNQME4wHQYDVR0OBBYEFEVi\nYyWHF3W7/O4NaTjn4lElLpp7MB8GA1UdIwQYMBaAFEViYyWHF3W7/O4NaTjn4lEl\nLpp7MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAgIPGoxhUa16AgjZx\nJr1kUrs8Fg3ig8eRFQlBSLYfANIUxcQ2ScFAkmsvwXY3Md7uaSvMJsEl2jcjdmdi\neSreNkx85j9GtMLY/2cv0kF4yAQNRtibtDkbg6fRNkmUopDosJNVf79l1GKX8JFL\nzZBOFdOaLYY/6dLRwiTUKHU6su8=\n-----END CERTIFICATE-----"
BASE_UPDATE_CFG_DATA = """
[networkcache]
download-binary-cache-url = http://www.shacache.org/shacache
download-cache-url = https://www.shacache.org/shacache
download-binary-dir-url = http://www.shacache.org/shadir
signature-certificate-list =
-----BEGIN CERTIFICATE-----
MIIB8DCCAVmgAwIBAgIJAPFf61p8y809MA0GCSqGSIb3DQEBBQUAMBAxDjAMBgNV
BAMMBUNPTVAtMCAXDTE0MDIxNzE2NDgxN1oYDzIxMTQwMTI0MTY0ODE3WjAQMQ4w
DAYDVQQDDAVDT01QLTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAsiqCyuv1
HO9FmtwnMbEa1/u8Dn7T0k7hVKYXVQYof+59Ltbb3cA3nLjFSJDr/wQT6N89MccS
PneRzkWqZKL06Kmj+N+XJfRyVaTz1qQtNzjdbYkO6RgQq+fvq2CO0+PSnL6NttLU
/a9nQMcVm7wZ8kmY+AG5LbVo8lmxDD16Wq0CAwEAAaNQME4wHQYDVR0OBBYEFEVi
YyWHF3W7/O4NaTjn4lElLpp7MB8GA1UdIwQYMBaAFEViYyWHF3W7/O4NaTjn4lEl
Lpp7MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAgIPGoxhUa16AgjZx
Jr1kUrs8Fg3ig8eRFQlBSLYfANIUxcQ2ScFAkmsvwXY3Md7uaSvMJsEl2jcjdmdi
eSreNkx85j9GtMLY/2cv0kF4yAQNRtibtDkbg6fRNkmUopDosJNVf79l1GKX8JFL
zZBOFdOaLYY/6dLRwiTUKHU6su8=
-----END CERTIFICATE-----
"""
UPDATE_CFG_DATA = """
[slapupdate]
upgrade_key = slapos-upgrade-testing-key-with-config-file-invalid
""" + BASE_UPDATE_CFG_DATA
UPDATE_UPLOAD_CFG_DATA = """
[slapupdate]
upgrade_key = slapos-upgrade-testing-key-with-config-file
""" + BASE_UPDATE_CFG_DATA
VALID_UPDATE_CFG_DATA = """
[slapupdate]
upgrade_key = 'slapos-upgrade-testing-key-with-config-file-valid'
""" + BASE_UPDATE_CFG_DATA
UPDATE_CFG_WITH_UPLOAD_DATA = UPDATE_CFG_DATA + """
signature_private_key_file = /etc/opt/slapos/signature.key
signature_certificate_file = /etc/opt/slapos/signature.cert
upload-cache-url = https://www.shacache.org/shacache
shacache-cert-file = /etc/opt/slapos/shacache.crt
shacache-key-file = /etc/opt/slapos/shacache.key
upload-dir-url = https://www.shacache.org/shadir
shadir-cert-file = /etc/opt/slapos/shacache.crt
shadir-key-file = /etc/opt/slapos/shacache.key
"""
UPGRADE_KEY = """[debian-default]
repository-list =
main = http://ftp.fr.debian.org/debian/ wheezy main
main-src = http://ftp.fr.debian.org/debian/ wheezy main
update = http://ftp.fr.debian.org/debian/ wheezy-updates main
update-src = http://ftp.fr.debian.org/debian/ wheezy-updates main
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/Debian_7.0/ ./
re6stnet = http://git.erp5.org/dist/deb ./
key-list =
slapos = http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/slapos.openbuildservice.key
re6st = http://git.erp5.org/gitweb/slapos.package.git/blob_plain/HEAD:/debian-preseed/git.erp5.org.key
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
filter-promise-list =
core
signature-list =
debian+++jessie/sid+++
debian+++7.4+++
debian+++7.5+++
debian+++7.3+++
debian+++7+++
[opensuse-legacy]
repository-list =
suse = http://download.opensuse.org/distribution/12.1/repo/oss/
slapos = http://download.opensuse.org/repositories/home:/VIFIBnexedi:/branches:/home:/VIFIBnexedi/openSUSE_12.1/
re6st = http://git.erp5.org/dist/rpm
key-list =
filter-promise-list =
core
filter-package-list =
ntp
openvpn
slapos.node
re6stnet
signature-list =
opensuse+++12.1+++x86_64
[system]
reboot = 2011-10-10
upgrade = 2014-06-04
"""
def _fake_upload(self, *args, **kwargs):
return True
class NetworkCacheTestCase(unittest.TestCase):
def setUp(self):
self.original_networkcache_upload = NetworkcacheClient.upload
NetworkcacheClient.upload = _fake_upload
self.config_dict = {
"slapos_configuration": self._createConfigurationFile(),
"srv_file": "/tmp/test_base_promise_slapupdate",
"dry_run": False,
"verbose": False
}
def tearDown(self):
NetworkcacheClient.upload = self.original_networkcache_upload
def _createConfigurationFile(self):
with open("/tmp/test_signature_000000_configuration.cfg", "w") as configuration_file:
configuration_file.write(VALID_UPDATE_CFG_DATA)
return "/tmp/test_signature_000000_configuration.cfg"
def test_basic_configuration(self):
info, self.configuration_file_path = tempfile.mkstemp()
open(self.configuration_file_path, 'w').write(UPDATE_CFG_DATA)
shacache = signature.NetworkCache(self.configuration_file_path)
self.assertEqual(shacache.download_binary_cache_url,
"http://www.shacache.org/shacache")
self.assertEqual(shacache.download_cache_url,
"https://www.shacache.org/shacache")
self.assertEqual(shacache.download_binary_dir_url,
"http://www.shacache.org/shadir")
self.assertEqual(shacache.signature_certificate_list,
SIGNATURE)
self.assertEqual(shacache.directory_key,
'slapos-upgrade-testing-key-with-config-file-invalid')
# Check keys that don't exist
# Not mandatory
self.assertEqual(shacache.dir_url , None)
self.assertEqual(shacache.cache_url , None)
self.assertEqual(shacache.signature_private_key_file , None)
self.assertEqual(shacache.shacache_cert_file , None)
self.assertEqual(shacache.shacache_key_file , None)
self.assertEqual(shacache.shadir_cert_file , None)
self.assertEqual(shacache.shadir_key_file , None)
def test_basic_configuration_with_upload(self):
info, self.configuration_file_path = tempfile.mkstemp()
open(self.configuration_file_path, 'w').write(UPDATE_CFG_WITH_UPLOAD_DATA)
shacache = signature.NetworkCache(self.configuration_file_path)
self.assertEqual(shacache.download_binary_cache_url,
"http://www.shacache.org/shacache")
self.assertEqual(shacache.download_cache_url,
"https://www.shacache.org/shacache")
self.assertEqual(shacache.download_binary_dir_url,
"http://www.shacache.org/shadir")
self.assertEqual(shacache.signature_certificate_list,
SIGNATURE)
self.assertEqual(shacache.directory_key,
'slapos-upgrade-testing-key-with-config-file-invalid')
# Check keys that don't exist
# Not mandatory
self.assertEqual(shacache.dir_url , 'https://www.shacache.org/shadir')
self.assertEqual(shacache.cache_url , 'https://www.shacache.org/shacache')
self.assertEqual(shacache.shacache_cert_file , '/etc/opt/slapos/shacache.crt')
self.assertEqual(shacache.shacache_key_file , '/etc/opt/slapos/shacache.key')
self.assertEqual(shacache.shadir_cert_file , '/etc/opt/slapos/shacache.crt')
self.assertEqual(shacache.shadir_key_file , '/etc/opt/slapos/shacache.key')
self.assertEqual(shacache.signature_private_key_file ,'/etc/opt/slapos/signature.key')
def test_configuration_file_dont_exist(self):
self.assertRaises(ValueError, signature.NetworkCache,
"/abc/123")
def test_download_not_existing_signature_from_cache(self):
info, path = tempfile.mkstemp()
info, self.configuration_file_path = tempfile.mkstemp()
open(self.configuration_file_path, 'w').write(UPDATE_CFG_DATA)
shacache = signature.NetworkCache(self.configuration_file_path)
self.assertEquals(False,
shacache.download(path=path, required_key_list=['timestamp'],
strategy=signature.strategy))
self.assertEquals("", open(path, 'r').read())
def test_download_existing_from_cache(self):
info, path = tempfile.mkstemp()
info, self.configuration_file_path = tempfile.mkstemp()
open(self.configuration_file_path, 'w').write(VALID_UPDATE_CFG_DATA)
shacache = signature.NetworkCache(self.configuration_file_path)
shacache.download(path=path,
required_key_list=['timestamp'],
strategy=signature.strategy)
self.maxDiff = None
self.assertEquals(UPGRADE_KEY.splitlines(),
open(path, 'r').read().splitlines())
def test_upload_to_cache(self):
info, path = tempfile.mkstemp()
info, _fake_signature_path = tempfile.mkstemp()
info, self.configuration_file_path = tempfile.mkstemp()
signature_certificate_file = "/tmp/signature_certificate_file_demo_test"
if os.path.exists(signature_certificate_file):
os.remove(signature_certificate_file)
signature_private_key_file = "/tmp/signature_private_key_file_demo_test"
if os.path.exists(signature_private_key_file):
os.remove(signature_private_key_file)
slapos.signature.generateCertificate(signature_certificate_file,
signature_private_key_file,
"COMP-123A")
configuration_content = UPDATE_UPLOAD_CFG_DATA + """
signature_private_key_file = %(signature_private_key_file)s
signature_certificate_file = %(signature_certificate_file)s
upload-cache-url = https://www.shacache.org/shacache
shacache-cert-file = %(tempfile)s
shacache-key-file = %(tempfile)s
upload-dir-url = https://www.shacache.org/shadir
shadir-cert-file = %(tempfile)s
shadir-key-file = %(tempfile)s
""" % {"tempfile": _fake_signature_path,
"signature_certificate_file": signature_certificate_file,
"signature_private_key_file": signature_private_key_file }
open(self.configuration_file_path, 'w').write(
configuration_content)
open(_fake_signature_path, "w").write("# XXX ...")
shacache = signature.NetworkCache(self.configuration_file_path)
metadata_dict = {'timestamp': time.time(),}
shacache.upload(path=path, metadata_dict=metadata_dict)
def test_signature_strategy(self):
entry_list = [
{'timestamp': 123824.0},
{'timestamp': 12345.0},
{'timestamp': 13345.0},
{'timestamp': 12344.0},
{'timestamp': 12045.0},
]
self.assertEquals(
signature.strategy(entry_list),
{'timestamp': 123824.0})
def testLoadSignature(self):
sig = signature.Signature(Values(self.config_dict))
self.assertNotEquals(getattr(sig, "current_state", "COUSOUS!!!"),
"COUSCOUS!!!")
sig.load()
self.assertNotEquals(sig.current_state, None)
self.assertEquals(sig.reboot.strftime("%Y-%m-%d"), "2011-10-10" )
self.assertEquals(sig.upgrade.strftime("%Y-%m-%d"), "2014-06-04" )
self.assertEquals(sig.last_reboot, None )
self.assertEquals(sig.last_upgrade, None )
def testLoadSignatureWithLast(self):
modified_config_dict = self.config_dict.copy()
info, slapupdate_path = tempfile.mkstemp()
with open(slapupdate_path, "w") as slapupdate_file:
slapupdate_file.write("""
[system]
reboot = 2010-10-10
upgrade = 2010-06-04
""")
modified_config_dict["srv_file"] = slapupdate_path
sig = signature.Signature(Values(modified_config_dict))
self.assertNotEquals(getattr(sig, "current_state", "COUSOUS!!!"),
"COUSCOUS!!!")
sig.load()
self.assertNotEquals(sig.current_state, None)
self.assertEquals(sig.reboot.strftime("%Y-%m-%d"), "2011-10-10" )
self.assertEquals(sig.upgrade.strftime("%Y-%m-%d"), "2014-06-04" )
self.assertEquals(sig.last_reboot.strftime("%Y-%m-%d"), "2010-10-10" )
self.assertEquals(sig.last_upgrade.strftime("%Y-%m-%d"), "2010-06-04" )
def testSignatureUpdate(self):
modified_config_dict = self.config_dict.copy()
info, slapupdate_path = tempfile.mkstemp()
with open(slapupdate_path, "w") as slapupdate_file:
slapupdate_file.write("""
[system]
reboot = 2010-10-10
upgrade = 2010-06-04
""")
modified_config_dict["srv_file"] = slapupdate_path
sig = signature.Signature(Values(modified_config_dict))
sig.load()
self.assertNotEquals(sig.current_state, None)
self.assertEquals(sig.reboot.strftime("%Y-%m-%d"), "2011-10-10" )
self.assertEquals(sig.upgrade.strftime("%Y-%m-%d"), "2014-06-04" )
self.assertEquals(sig.last_reboot.strftime("%Y-%m-%d"), "2010-10-10" )
self.assertEquals(sig.last_upgrade.strftime("%Y-%m-%d"), "2010-06-04" )
sig.update(reboot="2014-07-01")
self.assertEquals(open(slapupdate_path, "r").read().strip(), """[system]
reboot = 2014-07-01
upgrade = 2010-06-04""")
sig.update(upgrade="2014-07-03")
self.assertEquals(open(slapupdate_path, "r").read().strip(), """[system]
reboot = 2014-07-01
upgrade = 2014-07-03""")
sig.update(reboot="2014-08-10", upgrade="2014-08-11")
self.assertEquals(open(slapupdate_path, "r").read().strip(), """[system]
reboot = 2014-08-10
upgrade = 2014-08-11""")
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vim: set et sts=2:
##############################################################################
#
# Copyright (c) 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import ConfigParser
import logging
from optparse import OptionParser, Option
import os
import sys
import tempfile
import urllib2
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING)
# create formatter
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
class Parser(OptionParser):
"""
Parse all arguments.
"""
def __init__(self, usage=None, version=None):
"""
Initialize all options possibles.
"""
OptionParser.__init__(self, usage=usage, version=version,
option_list=[
Option("--slapos-configuration",
help="path to slapos configuration directory",
default='/etc/opt/slapos/',
type=str),
Option("--slapos-cron",
help="path to slapos cron file",
default='/etc/cron.d/slapos-node',
type=str),
Option("--check-upload",
help="Check if upload parameters are ok (do not check certificates)",
default=False,
action="store_true"),
Option("--test-agent",
help="Check if parameters are good for a test agent",
default=False,
action="store_true"),
Option("-v","--verbose",
default=False,
action="store_true",
help="Verbose output."),
Option("-n", "--dry-run",
help="Simulate the execution steps",
default=False,
action="store_true"),
])
def check_args(self):
"""
Check arguments
"""
(options, args) = self.parse_args()
if options.test_agent:
options.check_upload = True
return options
def get_slapos_conf_example():
"""
Get slapos.cfg.example and return its path
"""
register_server_url = "http://git.erp5.org/gitweb/slapos.core.git/blob_plain/HEAD:/slapos.cfg.example"
request = urllib2.Request(register_server_url)
url = urllib2.urlopen(request)
page = url.read()
info, path = tempfile.mkstemp()
slapos_cfg_example = open(path,'w')
slapos_cfg_example.write(page)
slapos_cfg_example.close()
return path
def check_networkcache(config, logger, configuration_parser,
configuration_example_parser):
"""
Check network cache download
"""
section = "networkcache"
if configuration_parser.has_section(section):
configuration_example_dict = dict(configuration_example_parser.items(section))
configuration_dict = dict(configuration_parser.items(section))
for key in configuration_example_dict:
try:
if not configuration_dict[key] == configuration_example_dict[key]:
logger.warn("%s parameter in %s section is out of date" % (key, section))
except KeyError:
logger.warn("No %s parameter in %s section" % (key, section))
pass
if config.test_agent:
configuration_dict = dict(configuration_parser.items('slapformat'))
if int(configuration_dict['partition_amount']) < 60:
logger.warn("Partition amount is to low for a test agent. Is %s but should be at least 60"
% configuration_dict['partition_amount'] )
if config.check_upload == True:
check_networkcache_upload(config, logger, configuration_dict)
class Upload:
"""
Class used as a reference to check network cache upload
"""
def __init__(self):
self.data = {'download-binary-dir-url': 'http://www.shacache.org/shadir',
'signature_certificate_file': '/etc/slapos-cache/signature.cert',
'upload-dir-url': 'https://www.shacache.org/shadir',
'shadir-cert-file': '/etc/slapos-cache/shacache.cert',
'download-cache-url': 'https://www.shacache.org/shacache',
'upload-cache-url': 'https://www.shacache.org/shacache',
'shacache-cert-file': '/etc/slapos-cache/shacache.cert',
'upload-binary-cache-url': 'https://www.shacache.org/shacache',
'shacache-key-file': '/etc/slapos-cache/shacache.key',
'download-binary-cache-url': 'http://www.shacache.org/shacache',
'upload-binary-dir-url':'https://www.shacache.org/shadir',
'signature_private_key_file': '/etc/slapos-cache/signature.key',
'shadir-key-file': '/etc/slapos-cache/shacache.key'}
def check_networkcache_upload(config, logger, configuration_dict):
"""
Check network cache upload
"""
upload_parameters = Upload()
for key in upload_parameters.data:
try:
if not key.find("file") == -1:
file = configuration_dict[key]
if not os.path.exists(file):
logger.critical ("%s file for %s parameters does not exist "
% (file, key))
else:
logger.info ("%s parameter:%s does exists" % (key, file))
else:
if not configuration_dict[key] == upload_parameters.data[key]:
logger.warn("%s is %s sould be %s"
% (
key,
configuration_dict[key],
upload_parameters.data[key]
)
)
except KeyError:
logger.critical("No %s parameter in networkcache section "
% (key))
pass
def get_computer_name(certificate):
"""
Extract computer_id for certificate
"""
certificate = open(certificate,"r")
for line in certificate:
i = 0
if "Subject" in line:
k=line.find("COMP-")
i=line.find("/email")
certificate.close()
return line[k:i]
return -1
def check_computer_id(logger,computer_id,cert_file):
"""
Get computer id from cert_file and compare with computer_id
"""
comp_cert = get_computer_name(cert_file)
if comp_cert == "":
logger.error("Certificate file indicated is corrupted (no computer id)")
elif comp_cert == computer_id:
logger.info("Certificate and slapos.cfg define same computer id: %s"
% computer_id)
else:
logger.critical("Computers id from cerificate (%s) is different from slapos.cfg (%s)"
% (comp_cert,computer_id))
def slapos_conf_check (config):
"""
Check if slapos.cfg look good
"""
# Define logger for slapos.cfg verification
logger = logging.getLogger('Checking slapos.cfg file:')
logger.setLevel(logging.INFO)
logger.addHandler(ch)
# Load configuration file
configuration_file_path = os.path.join (config.slapos_configuration
,'slapos.cfg')
configuration_parser = ConfigParser.SafeConfigParser()
configuration_parser.read(configuration_file_path)
# Get example configuration file
slapos_cfg_example = get_slapos_conf_example()
configuration_example_parser = ConfigParser.RawConfigParser()
configuration_example_parser.read(slapos_cfg_example)
os.remove(slapos_cfg_example)
# Check sections
mandatory_sections = ["slapos","slapformat","networkcache"]
for section in mandatory_sections:
if not configuration_parser.has_section(section):
logger.critical("No %s section in slapos.cfg" % section)
mandatory_sections.remove(section)
if 'networkcache' in mandatory_sections:
mandatory_sections.remove('networkcache')
# Check if parameters for slapos and slapformat exists
for section in mandatory_sections:
configuration_dict = dict(configuration_parser.items(section))
configuration_example_dict = dict(configuration_example_parser.items(section))
for key in configuration_example_dict:
if not key in configuration_dict:
logger.critical("No %s parameter in %s section "
% (key,section))
# check if necessary files exist
elif key in ("key_file","cert_file","certificate_repository_path"):
files = configuration_dict[key]
if not os.path.exists(files):
logger.critical ("%s file for %s parameters does not exist "
% (files,key))
else:
logger.info ("%s parameter:%s does exists" % (key,files))
# check if computer id is the same in slapos.cfg and certificate
if key == "cert_file":
check_computer_id(logger,configuration_dict["computer_id"],
configuration_dict["cert_file"])
# Check networkcache
check_networkcache(config,logger,configuration_parser,
configuration_example_parser)
class CronFile:
"""
Class to analyse each cron line individualy
"""
def __init__(self):
""" Init all values"""
self.slapformat = -1
self.slapgrid_cp = -1
self.slapgrid_ur = -1
self.slapgrid_sr = -1
# cron file from slapos documentation
self.slapgrid_sr_base = "* * * * * root /opt/slapos/bin/slapos node software --verbose --logfile=/opt/slapos/log/slapos-node-software.log > /dev/null 2>&1"
self.slapgrid_cp_base = "* * * * * root /opt/slapos/bin/slapos node instance --verbose --logfile=/opt/slapos/log/slapos-node-instance.log > /dev/null 2>&1"
self.slapgrid_ur_base = "0 * * * * root /opt/slapos/bin/slapos node report --maximal_delay=3600 --verbose --logfile=/opt/slapos/log/slapos-node-report.log > /dev/null 2>&1"
self.slapformat_base = "0 * * * * root /opt/slapos/bin/slapos node format >> /opt/slapos/log/slapos-node-format.log 2>&1"
def parse(self,cron_line):
""" Parse cron line and give value to attributes """
line = cron_line.split()
if "slapos node format" in cron_line:
self.slapformat = self.compare(self.slapformat,
self.slapformat_base.split() , line)
if "slapos node report" in cron_line:
self.slapgrid_ur = self.compare(self.slapgrid_ur,
self.slapgrid_ur_base.split() , line)
if "slapos node instance" in cron_line:
self.slapgrid_cp = self.compare(self.slapgrid_cp,
self.slapgrid_cp_base.split() , line)
if "slapos node software" in cron_line:
self.slapgrid_sr = self.compare(self.slapgrid_sr,
self.slapgrid_sr_base.split() , line)
def compare(self, state, reference, cron_line):
if not state == -1:
return 2
for i in range(0, 6):
if not reference[i] == cron_line[i]:
return 0
ref = len(set(reference[6:]))
if not len(set(reference[6:]) & set(cron_line[6:])) == ref:
return 0
else: return 1
def check(self, logger):
elements = {
'slapos node format': self.slapformat,
'slapos node report': self.slapgrid_ur,
'slapos node software': self.slapgrid_sr,
'slapos node instance': self.slapgrid_cp
}
for key in elements:
if elements[key] == 0:
logger.error("Your line for '%s' command does not seem right" % key)
elif elements[key] == -1:
logger.error("No line found for '%s' command" % key)
elif elements[key] == 1:
logger.info("Line for '%s' command is good" % key)
elif elements[key] == 2:
logger.error("You have a duplicated line for '%s' command" % key)
def cron_check (config):
"""
Check cron file
"""
# Define logger for cron file verification
logger = logging.getLogger('Checking slapos-node cron file:')
logger.setLevel(logging.INFO)
logger.addHandler(ch)
cron = open(config.slapos_cron,"r")
cron_file = CronFile()
for line in cron:
if "/opt/slapos" in line and not line[0]=="#":
cron_file.parse(line)
cron_file.check(logger)
def slapos_global_check (config):
"""
Check for main files
"""
# Define logger for computer chek
logger = logging.getLogger('Checking your computer for SlapOS:')
logger.setLevel(logging.INFO)
logger.addHandler(ch)
# checking slapos.cfg
if not os.path.exists(os.path.join(config.slapos_configuration,'slapos.cfg')):
logger.critical("No slapos.cfg found in slapos configuration directory: %s"
% config.slapos_configuration )
else:
logger.info("SlapOS configuration file found")
slapos_conf_check(config)
# checking cron file
if not os.path.exists(config.slapos_cron):
logger.warn("No %s found for cron" % config.slapos_cron)
else:
logger.info("Cron file found at %s" %config.slapos_cron)
cron_check(config)
# Class containing all parameters needed for configuration
class Config:
def setConfig(self, option_dict):
"""
Set options given by parameters.
"""
# Set options parameters
for option, value in option_dict.__dict__.items():
setattr(self, option, value)
# Define logger for register
self.logger = logging.getLogger('slapos-test configuration')
self.logger.setLevel(logging.DEBUG)
# add ch to logger
self.logger.addHandler(ch)
if self.verbose:
ch.setLevel(logging.DEBUG)
def displayUserConfig(self):
self.logger.debug ("Slapos.cfg : %s" % self.slapos_configuration)
self.logger.debug ("slapos cron file: %s" % self.slapos_cron)
def main():
"""Checking computer state to run slapos"""
usage = "usage: %s [options] " % sys.argv[0]
# Parse arguments
config = Config()
config.setConfig(Parser(usage=usage).check_args())
config.displayUserConfig()
slapos_global_check(config)
sys.exit()
if __name__ == "__main__":
main()
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import datetime
import logging
from optparse import OptionParser, Option
import sys
import random
import time
from signature import Signature
from promise import promise_list
def _wait_before_run():
wait_period = random.randint(1, 50)
print "Sleep few seconds before start (%s) ..." % wait_period
time.sleep(wait_period)
class Parser(OptionParser):
"""
Parse all arguments.
"""
def __init__(self, usage=None, version=None):
"""
Initialize all options possibles.
"""
OptionParser.__init__(self, usage=usage, version=version,
option_list=[
Option("--slapos-configuration",
default='/etc/opt/update.cfg',
help="Path to slapos configuration file"),
Option("--srv-file",
default='/srv/slapupdate',
help="Server status file."),
Option("--wait",
default=False,
action="store_true",
help="Wait random seconds to call upgrade."),
Option("-v", "--verbose",
default=False,
action="store_true",
help="Verbose output."),
Option("-n", "--dry-run",
help="Simulate the execution steps",
default=False,
action="store_true"),
])
def check_args(self):
"""
Check arguments
"""
(options, args) = self.parse_args()
return options
def do_update():
"""Update computer and slapos"""
usage = "usage: %s [options] " % sys.argv[0]
config_dict = Parser(usage=usage).check_args()
if config_dict.wait:
_wait_before_run()
for promise_klass in promise_list:
# Parse arguments
upgrader = promise_klass(config_dict)
if upgrader.isApplicable():
upgrader.checkConsistency(fixit=not upgrader.config.dry_run)
sys.exit()
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012-2014 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import ConfigParser
import datetime
from optparse import OptionParser, Option
import sys
from base_promise import Config
from signature import Signature
def do_upgrade(config):
signature = Signature(config)
signature.upload(dry_run=config.dry_run)
class Parser(OptionParser):
"""
Parse all arguments.
"""
def __init__(self, usage=None, version=None):
"""
Initialize all options possibles.
"""
OptionParser.__init__(self, usage=usage, version=version,
option_list=[
Option("--slapos-configuration",
default='/etc/opt/update.cfg',
help="Configuration File used to upload the key."),
Option("--upgrade-file",
default='/etc/opt/slapos/slapos-upgrade',
help="File used as reference to upgrade."),
Option("-u", "--upgrade",
default=False,
action="store_true",
help="If selected will update tomorrow."),
Option("-r", "--reboot",
default=False,
action="store_true",
help="If selected will reboot tomorrow."),
Option("-n", "--dry-run",
help="Simulate the execution steps",
default=False,
action="store_true"),
])
def check_args(self):
"""
Check arguments
"""
(options, args) = self.parse_args()
return options
# Utility fonction to get yes/no answers
def get_yes_no(prompt):
while True:
answer = raw_input(prompt + " [y,n]: ")
if answer.upper() in ['Y', 'YES']:
return True
if answer.upper() in ['N', 'NO']:
return False
def main():
"""Upload file to update computer and slapos"""
usage = "usage: [options] "
# Parse arguments
config = Config(Parser(usage=usage).check_args())
do_upgrade(config)
sys.exit()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment