Commit a5d326d7 authored by Jérome Perrin's avatar Jérome Perrin

proftpd: instance test suite

To run tests locally by `python setup.py test`

This is a new kind of test suite that compile software, request instance
with specific parameters and check that the instance works as expected.
parent f6004869
Tests for ProFTPd software release
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from setuptools import setup, find_packages
import glob
import os
version = '0.0.1.dev0'
name = 'slapos.test.proftpd'
long_description = open("README.md").read()
setup(name=name,
version=version,
description="Test for SlapOS' ProFTPd",
long_description=long_description,
long_description_content_type='text/markdown',
maintainer="Nexedi",
maintainer_email="info@nexedi.com",
url="https://lab.nexedi.com/nexedi/slapos",
packages=find_packages(),
install_requires=[
'slapos.core',
'erp5.util',
'pysftp',
],
zip_safe=True,
test_suite='test',
)
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import shutil
import urlparse
import tempfile
import StringIO
import subprocess
import pysftp
from paramiko.ssh_exception import SSHException
from paramiko.ssh_exception import AuthenticationException
import utils
# for development: debugging logs and install Ctrl+C handler
if os.environ.get('DEBUG'):
import logging
logging.basicConfig(level=logging.DEBUG)
import unittest
unittest.installHandler()
class ProFTPdTestCase(utils.SlapOSInstanceTestCase):
@classmethod
def getSoftwareURLList(cls):
return (os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'software.cfg')), )
def _getConnection(self, username=None, password=None, hostname=None):
"""Returns a pysftp connection connected to the SFTP
username and password can be specified and default to the ones from
instance connection parameters.
another hostname can also be passed.
"""
# this tells paramiko not to verify host key
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
parameter_dict = self.computer_partition.getConnectionParameterDict()
sftp_url = urlparse.urlparse(parameter_dict['url'])
return pysftp.Connection(
hostname or sftp_url.hostname,
port=sftp_url.port,
cnopts=cnopts,
username=username or parameter_dict['username'],
password=password or parameter_dict['password'])
class TestSFTPListen(ProFTPdTestCase):
def test_listen_on_ipv4(self):
self.assertTrue(self._getConnection(hostname=self.config['ipv4_address']))
def test_does_not_listen_on_all_ip(self):
with self.assertRaises(SSHException):
self._getConnection(hostname='0.0.0.0')
class TestSFTPOperations(ProFTPdTestCase):
"""Tests upload / download features we expect in SFTP server.
"""
def setUp(self):
self.upload_dir = os.path.join(
self.computer_partition_root_path, 'srv', 'proftpd')
def tearDown(self):
for name in os.listdir(self.upload_dir):
path = os.path.join(self.upload_dir, name)
if os.path.isfile(path) or os.path.islink(path):
os.remove(path)
else:
shutil.rmtree(path)
def test_simple_sftp_session(self):
with self._getConnection() as sftp:
# put a file
with tempfile.NamedTemporaryFile() as f:
f.write("Hello FTP !")
f.flush()
sftp.put(f.name, remotepath='testfile')
# it's visible in listdir()
self.assertEqual(['testfile'], sftp.listdir())
# and also in the server filesystem
self.assertEqual(['testfile'], os.listdir(self.upload_dir))
# download the file again, it should have same content
tempdir = tempfile.mkdtemp()
self.addCleanup(lambda : shutil.rmtree(tempdir))
local_file = os.path.join(tempdir, 'testfile')
retrieve_same_file = sftp.get('testfile', local_file)
with open(local_file) as f:
self.assertEqual(f.read(), "Hello FTP !")
def test_uploaded_file_not_visible_until_fully_uploaded(self):
test_self = self
class PartialFile(StringIO.StringIO):
def read(self, *args):
# file is not visible yet
test_self.assertNotIn('destination', os.listdir(test_self.upload_dir))
# it's just a hidden file
test_self.assertEqual(['.in.destination.'], os.listdir(test_self.upload_dir))
return StringIO.StringIO.read(self, *args)
with self._getConnection() as sftp:
sftp.sftp_client.putfo(PartialFile("content"), "destination")
# now file is visible
self.assertEqual(['destination'], os.listdir(self.upload_dir))
def test_partial_upload_are_deleted(self):
test_self = self
with self._getConnection() as sftp:
class ErrorFile(StringIO.StringIO):
def read(self, *args):
# at this point, file is already created on server
test_self.assertEqual(['.in.destination.'], os.listdir(test_self.upload_dir))
# simulate a connection closed
sftp.sftp_client.close()
return "something that will not be sent to server"
with self.assertRaises(IOError):
sftp.sftp_client.putfo(ErrorFile(), "destination")
# no half uploaded file is kept
self.assertEqual([], os.listdir(self.upload_dir))
def test_user_cannot_escape_home(self):
with self._getConnection() as sftp:
with self.assertRaisesRegexp(IOError, 'Permission denied'):
sftp.listdir('..')
with self.assertRaisesRegexp(IOError, 'Permission denied'):
sftp.listdir('/')
with self.assertRaisesRegexp(IOError, 'Permission denied'):
sftp.listdir('/tmp/')
class TestUserManagement(ProFTPdTestCase):
def test_user_can_be_added_from_script(self):
with self.assertRaisesRegexp(AuthenticationException, 'Authentication failed'):
self._getConnection(username='bob', password='secret')
subprocess.check_call(
'echo secret | %s/bin/ftpasswd --name=bob --stdin' % self.computer_partition_root_path,
shell=True)
self.assertTrue(self._getConnection(username='bob', password='secret'))
class TestBan(ProFTPdTestCase):
def test_client_are_banned_after_5_wrong_passwords(self):
# Simulate failed 5 login attempts
for i in range(5):
with self.assertRaisesRegexp(AuthenticationException, 'Authentication failed'):
self._getConnection(password='wrong')
# after that, even with a valid password we cannot connect
with self.assertRaisesRegexp(SSHException, 'Connection reset by peer'):
self._getConnection()
# ban event is logged
with open(os.path.join(
self.computer_partition_root_path, 'var', 'log', 'proftpd-ban.log')) as ban_log_file:
self.assertRegexpMatches(
ban_log_file.readlines()[-1],
'login from host .* denied due to host ban')
class TestInstanceParameterPort(ProFTPdTestCase):
@classmethod
def getInstanceParmeterDict(cls):
cls.free_port = utils.findFreeTCPPort(cls.config['ipv4_address'])
return {'port': cls.free_port}
def test_instance_parameter_port(self):
parameter_dict = self.computer_partition.getConnectionParameterDict()
sftp_url = urlparse.urlparse(parameter_dict['url'])
self.assertEqual(self.free_port, sftp_url.port)
self.assertTrue(self._getConnection())
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
import os
import socket
from contextlib import closing
import logging
from erp5.util.testnode.SlapOSControler import SlapOSControler
from erp5.util.testnode.ProcessManager import ProcessManager
import slapos
def findFreeTCPPort(ip=''):
"""Find a free TCP port to listen to.
"""
family = socket.AF_INET6 if ':' in ip else socket.AF_INET
with closing(socket.socket(family, socket.SOCK_STREAM)) as s:
s.bind((ip, 0))
return s.getsockname()[1]
class SlapOSInstanceTestCase(unittest.TestCase):
@classmethod
def getSoftwareURLList(cls):
"""Return URL of software releases to install.
To be defined by subclasses.
"""
raise NotImplementedError()
@classmethod
def getInstanceParmeterDict(cls):
"""Return instance parameters
To be defined by subclasses if they need to request instance with specific
parameters.
"""
return {}
# TODO: allow subclasses to request a specific software type ?
@classmethod
def setUpClass(cls):
working_directory = os.environ.get(
'SLAPOS_TEST_WORKING_DIR',
os.path.join(os.path.dirname(__file__), '.slapos'))
# To prevent error: Cannot open an HTTP server: socket.error reported
# AF_UNIX path too long This `working_directory` should not be too deep.
# Socket path is 108 char max on linux
# https://github.com/torvalds/linux/blob/3848ec5/net/unix/af_unix.c#L234-L238
if len(working_directory + '/inst/supervisord.socket') > 108:
raise RuntimeError('working directory too deep, try setting SLAPOS_TEST_WORKING_DIR')
if not os.path.exists(working_directory):
os.mkdir(working_directory)
cls.config = config = {
"working_directory": working_directory,
"slapos_directory": working_directory,
"log_directory": working_directory,
"computer_id": 'slapos.test', # XXX
'proxy_database': os.path.join(working_directory, 'proxy.db'),
'partition_reference': cls.__name__,
# "proper" slapos command must be in $PATH
'slapos_binary': 'slapos',
}
# Some tests are expecting that local IP is not set to 127.0.0.1
ipv4_address = os.environ.get('LOCAL_IPV4', '127.0.1.1')
ipv6_address = os.environ['GLOBAL_IPV6']
config['proxy_host'] = config['ipv4_address'] = ipv4_address
config['ipv6_address'] = ipv6_address
config['proxy_port'] = findFreeTCPPort(ipv4_address)
config['master_url'] = 'http://{proxy_host}:{proxy_port}'.format(**config)
cls._process_manager = process_manager = ProcessManager()
# XXX this code is copied from testnode code
slapos_controler = SlapOSControler(
working_directory,
config
)
slapproxy_log = os.path.join(config['log_directory'], 'slapproxy.log')
logger = logging.getLogger(__name__)
logger.debug('Configured slapproxy log to %r', slapproxy_log)
software_url_list = cls.getSoftwareURLList()
slapos_controler.initializeSlapOSControler(
slapproxy_log=slapproxy_log,
process_manager=process_manager,
reset_software=False,
software_path_list=software_url_list)
process_manager.supervisord_pid_file = os.path.join(
slapos_controler.instance_root, 'var', 'run', 'supervisord.pid')
software_status_dict = slapos_controler.runSoftwareRelease(config, environment=os.environ)
# TODO: log more details in this case
assert software_status_dict['status_code'] == 0
instance_parameter_dict = cls.getInstanceParmeterDict()
instance_status_dict = slapos_controler.runComputerPartition(
config,
cluster_configuration=instance_parameter_dict,
environment=os.environ)
# TODO: log more details in this case
assert instance_status_dict['status_code'] == 0
# FIXME: similar to test node, only one (root) partition is really supported for now.
computer_partition_list = []
for i in range(len(software_url_list)):
computer_partition_list.append(
slapos_controler.slap.registerOpenOrder().request(
software_url_list[i],
# This is how testnode's SlapOSControler name created partitions
partition_reference='testing partition {i}'.format(i=i, **config),
partition_parameter_kw=instance_parameter_dict))
# expose some class attributes so that tests can use them:
# the ComputerPartition instances, to getInstanceParmeterDict
cls.computer_partition = computer_partition_list[0]
# the path of the instance on the filesystem, for low level inspection
cls.computer_partition_root_path = os.path.join(
config['working_directory'],
'inst',
cls.computer_partition.getId())
@classmethod
def tearDownClass(cls):
# FIXME: if setUpClass fail, this is not called and leaks zombie processes
cls._process_manager.killPreviousRun()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment