Commit 154f61d5 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos.networkbench: Refactor and include tests

  Use json as configuration format
  Accept remote configuration file (url to a json)
  Introduce API to assert reponse codes and result for http and dns
  Add Unit tests.
parent f907713d
Pipeline #1265 skipped
......@@ -47,6 +47,7 @@ setup(name=name,
'netifaces',
'erp5.util',
'PyRSS2Gen',
'dnspython',
] + additional_install_requires,
extras_require = {
'lampconfigure': ["mysqlclient"], #needed for MySQL Database access
......
networkbench
============
import socket
import logging
import time
import ConfigParser
import logging.handlers
import urllib2
import subprocess
import re
import sys
......@@ -11,8 +9,31 @@ import shutil
import netifaces
import random
import pycurl
import argparse
import json
from StringIO import StringIO
from ping import ping, ping6
from dnsbench import resolve
from http import get_curl, request
import textwrap
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
def _get_help_string(self, action):
return super(HelpFormatter, self)._get_help_string(action) \
if action.default else action.help
def _split_lines(self, text, width):
"""Preserves new lines in option descriptions"""
lines = []
for text in text.splitlines():
lines += textwrap.wrap(text, width)
return lines
def _fill_text(self, text, width, indent):
"""Preserves new lines in other descriptions"""
kw = dict(width=width, initial_indent=indent, subsequent_indent=indent)
return '\n'.join(textwrap.fill(t, **kw) for t in text.splitlines())
botname = socket.gethostname()
......@@ -30,116 +51,23 @@ date_reg_exp = re.compile('\d{4}[-/]\d{2}[-/]\d{2}')
def _get_network_gateway(self):
return netifaces.gateways()["default"][netifaces.AF_INET][0]
def _test_dns(name):
begin = time.time()
try:
socket.gethostbyname(name)
resolution = 200
status = "OK"
except socket.gaierror:
resolution = 600
status = "Cannot resolve the hostname"
resolving_time = time.time() - begin
return ('DNS', name, resolution, resolving_time, status)
def _test_ping(host, timeout=10, protocol="4"):
if protocol == '4':
ping_bin = 'ping'
test_title = 'PING'
elif protocol == '6':
ping_bin = 'ping6'
test_title = 'PING6'
proc = subprocess.Popen((ping_bin, '-c', '10', '-w', str(timeout), host),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if 'Network is unreachable' in err:
return (test_title, host, '600', 'failed', 100, "Network is unreachable")
try:
packet_loss_line, summary_line = (out.splitlines() or [''])[-2:]
except:
return (test_title, host, '600', 'failed', -1, "Fail to parser ping output")
m = ping_re.match(summary_line)
match = re.search('(\d*)% packet loss', packet_loss_line)
packet_lost_ratio = match.group(1)
info_list = (test_title, host, '600', 'failed', packet_lost_ratio, "Cannot ping host")
if packet_lost_ratio != 0:
if m:
info_list = (test_title, host, '200', m.group('avg'), packet_lost_ratio,
'min %(min)s max %(max)s avg %(avg)s' % m.groupdict())
else:
info_list = (test_title, host, '600', 'failed', packet_lost_ratio,
"You have package Lost")
return info_list
def _test_ping6(host, timeout=10):
return _test_ping(host, timeout=10, protocol='6')
def _test_url_request(url):
begin = time.time()
buffer = StringIO()
curl = pycurl.Curl()
curl.setopt(curl.URL, url)
curl.setopt(curl.CONNECTTIMEOUT, 10)
curl.setopt(curl.TIMEOUT, 300)
curl.setopt(curl.WRITEDATA, buffer)
curl.setopt(curl.SSL_VERIFYPEER, False)
curl.setopt(curl.SSL_VERIFYHOST, False)
result = "OK"
try:
curl.perform()
except:
import traceback
traceback.print_exc(file=sys.stderr)
sys.stderr.flush()
result = "FAIL"
body = buffer.getvalue()
def load_configuration(config_path):
if config_path.startswith("http://") or \
config_path.startswith("ftp://") or \
config_path.startswith("https://") or \
config_path.startswith("file://"):
return download_external_configuration(config_path)
rendering_time = "%s;%s;%s;%s;%s" % \
(curl.getinfo(curl.NAMELOOKUP_TIME),
curl.getinfo(curl.CONNECT_TIME),
curl.getinfo(curl.PRETRANSFER_TIME),
curl.getinfo(curl.STARTTRANSFER_TIME),
curl.getinfo(curl.TOTAL_TIME))
response_code = curl.getinfo(pycurl.HTTP_CODE)
curl.close()
info_list = ('GET', url, response_code, rendering_time, result)
return info_list
with open(config_path, "r") as f:
return json.load(f)
def download_external_configuration(url):
buffer = StringIO()
curl = pycurl.Curl()
curl.setopt(curl.URL, url)
curl.setopt(curl.CONNECTTIMEOUT, 10)
curl.setopt(curl.TIMEOUT, 300)
curl.setopt(curl.WRITEDATA, buffer)
curl.setopt(curl.SSL_VERIFYPEER, False)
curl.setopt(curl.SSL_VERIFYHOST, False)
try:
curl.perform()
except:
import traceback
traceback.print_exc(file=sys.stderr)
sys.stderr.flush()
curl, _ = get_curl(buffer, url)
response_code = curl.getinfo(pycurl.HTTP_CODE)
curl.close()
if response_code == 200:
if response_code in (200, 0):
try:
return json.loads(buffer.getvalue())
except ValueError:
......@@ -148,6 +76,8 @@ def download_external_configuration(url):
traceback.print_exc(file=sys.stderr)
sys.stderr.flush()
print "Ignoring external configuration"
finally:
curl.close()
return {}
......@@ -178,15 +108,7 @@ def is_rotate_log(log_file_path):
finally:
log_file.close()
def create_logger(name, log_folder):
new_logger = logging.getLogger(name)
new_logger.setLevel(logging.DEBUG)
log_file = '%s/network_bench.%s.log' % (log_folder, name)
handler = logging.handlers.TimedRotatingFileHandler(
log_file, when="D",
backupCount=1000)
def rotate_logfile(handler, log_file):
last_date = is_rotate_log(log_file)
if last_date:
handler.doRollover()
......@@ -198,73 +120,70 @@ def create_logger(name, log_folder):
stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=True)
sp.communicate()
format = "%%(asctime)-16s;%s;%%(message)s" % botname
handler.setFormatter(logging.Formatter(format))
new_logger.addHandler(handler)
return new_logger
def main():
if len(sys.argv) not in [2, 3]:
print " USAGE: %s configuration_file [log_folder]" % sys.argv[0]
return
config = ConfigParser.ConfigParser()
config.read(sys.argv[1])
if len(sys.argv) == 3:
log_folder = sys.argv[2]
else:
log_folder = "."
delay = random.randint(0, 30)
name_list = []
url_list = []
ping_list = []
ping6_list = []
def create_logger(name, log_folder, verbose):
new_logger = logging.getLogger(name)
if config.has_option("network_bench", "dns"):
name_list = config.get("network_bench", "dns").split()
new_logger.setLevel(logging.DEBUG)
log_file = '%s/network_bench.%s.log' % (log_folder, name)
handler = logging.handlers.TimedRotatingFileHandler(
log_file, when="D",
backupCount=1000)
if config.has_option("network_bench", "url"):
url_list = config.get("network_bench", "url").split()
rotate_logfile(handler, log_file)
if config.has_option("network_bench", "ping"):
ping_list = config.get("network_bench", "ping").split()
format = "%%(asctime)-16s;%s;%%(message)s" % botname
handler.setFormatter(logging.Formatter(format))
new_logger.addHandler(handler)
if config.has_option("network_bench", "ping6"):
ping6_list = config.get("network_bench", "ping6").split()
if verbose:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(format))
new_logger.addHandler(handler)
return new_logger
def run_all(config_dict, log_folder, verbose):
dns_logger = create_logger("dns", log_folder, verbose)
name_dict = config_dict.get("dns", {})
for name in name_dict:
expected = name_dict[name].get("expected")
dns_logger.info(';'.join(str(x) for x in resolve(name, expected)))
if config.has_option("network_bench", "test_distributor_url"):
ping_logger = create_logger("ping", log_folder, verbose)
for host in config_dict.get("ping",[]):
ping_logger.info(';'.join(str(x) for x in ping(host)))
external_configuration_url = config.get("network_bench", "test_distributor_url")
external_config_dict = download_external_configuration(external_configuration_url)
ping6_logger = create_logger("ping6", log_folder, verbose)
for host in config_dict.get("ping6", []):
ping6_logger.info(';'.join(str(x) for x in ping6(host)))
name_list.extend(external_config_dict.get("dns", []))
url_list.extend(external_config_dict.get("url",[]))
ping_list.extend(external_config_dict.get("ping", []))
ping6_list.extend(external_config_dict.get("ping6", []))
http_logger = create_logger("http", log_folder, verbose)
url_dict = config_dict.get("url", {})
for url in url_dict:
http_logger.info(';'.join(str(x) for x in request(url, url_dict[url])))
time.sleep(delay)
def main():
parser = argparse.ArgumentParser(
description="Run network benchmarch.",
)
_ = parser.add_argument
_('-l', '--logdir', default=".",
help="Directory where the logs are going to be placed.")
_('-c', '--conf', help="Path to the configuration json file.")
_('-v', '--verbose', action='store_true',
help="Show the results on stdout.")
_('-d', '--delay', default=random.randint(0, 30),
help="Delay before start to run," \
"as this script can be called on cron.")
dns_logger = create_logger("dns", log_folder)
for name in name_list:
info_list = _test_dns(name)
dns_logger.info(';'.join(str(x) for x in info_list))
config = parser.parse_args()
ping_logger = create_logger("ping", log_folder)
for host in ping_list:
info_list = _test_ping(host)
ping_logger.info(';'.join(str(x) for x in info_list))
print("Downloading %s..." % config.conf.strip())
config_dict = load_configuration(config.conf)
print("Waiting %s before start..." % config.delay)
time.sleep(float(config.delay))
ping6_logger = create_logger("ping6", log_folder)
for host in ping6_list:
info_list = _test_ping6(host)
ping6_logger.info(';'.join(str(x) for x in info_list))
run_all(config_dict,
log_folder=config.logdir,
verbose=config.verbose)
http_logger = create_logger("http", log_folder)
for url in url_list:
info_list = _test_url_request(url)
http_logger.info(';'.join(str(x) for x in info_list))
import socket
import time
import dns.resolver
def resolve(name, expected_list=None):
""" Resolve name using standard system name resolution.
"""
begin = time.time()
try:
ip_list = [i.to_text() for i in dns.resolver.query(name, "A")]
resolution = 200
status = "OK"
except dns.resolver.NXDOMAIN:
resolution = 600
status = "Cannot resolve the hostname"
ip_list = []
resolving_time = time.time() - begin
# Out put is:
# TEST IDENTIFIER, NAME, RESOLUTION (200 or 600), Time for resolve,
# status ("OK" or "Cannot resolve the hostname"), Resolved IP.
if expected_list is not None and set(expected_list) != set(ip_list):
status = "UNEXPECTED"
ip_list = "%s (expected) != %s (found)" % (expected_list, ip_list)
return ('DNS', name, resolution, resolving_time, status, ip_list)
import sys
import pycurl
from StringIO import StringIO
def get_curl(buffer, url):
curl = pycurl.Curl()
curl.setopt(curl.URL, url)
curl.setopt(curl.CONNECTTIMEOUT, 10)
curl.setopt(curl.TIMEOUT, 30)
curl.setopt(curl.WRITEDATA, buffer)
curl.setopt(curl.SSL_VERIFYPEER, False)
curl.setopt(curl.SSL_VERIFYHOST, False)
result = "OK"
try:
curl.perform()
except:
import traceback
traceback.print_exc(file=sys.stderr)
sys.stderr.flush()
result = "FAIL"
return curl, result
def request(url, expected_dict):
buffer = StringIO()
curl, result = get_curl(buffer, url)
body = buffer.getvalue()
rendering_time = "%s;%s;%s;%s;%s" % \
(curl.getinfo(curl.NAMELOOKUP_TIME),
curl.getinfo(curl.CONNECT_TIME),
curl.getinfo(curl.PRETRANSFER_TIME),
curl.getinfo(curl.STARTTRANSFER_TIME),
curl.getinfo(curl.TOTAL_TIME))
response_code = curl.getinfo(pycurl.HTTP_CODE)
expected_response = expected_dict.get("expected_response", None)
if expected_response is not None and \
expected_response != response_code:
result = "UNEXPECTED (%s != %s)" % (expected_response, response_code)
expected_text = expected_dict.get("expected_text", None)
if expected_text is not None and \
str(expected_text) not in str(body):
result = "UNEXPECTED (%s not in page content)" % (expected_text)
curl.close()
info_list = ('GET', url, response_code, rendering_time, result)
return info_list
import subprocess
import re
# rtt min/avg/max/mdev = 1.102/1.493/2.203/0.438 ms
ping_re = re.compile(
".*"
"(?P<min>[\d\.]+)/"
"(?P<avg>[\d\.]+)/"
"(?P<max>[\d\.]+)/"
"(?P<mdev>[\d\.]+) ms"
)
date_reg_exp = re.compile('\d{4}[-/]\d{2}[-/]\d{2}')
def ping(host, timeout=10, protocol="4"):
if protocol == '4':
ping_bin = 'ping'
test_title = 'PING'
elif protocol == '6':
ping_bin = 'ping6'
test_title = 'PING6'
proc = subprocess.Popen((ping_bin, '-c', '10', '-w', str(timeout), host),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if 'Network is unreachable' in err:
return (test_title, host, 600, 'failed', 100, "Network is unreachable")
try:
packet_loss_line, summary_line = (out.splitlines() or [''])[-2:]
except:
return (test_title, host, 600, 'failed', -1, "Fail to parser ping output")
m = ping_re.match(summary_line)
match = re.search('(\d*)% packet loss', packet_loss_line)
packet_lost_ratio = match.group(1)
info_list = (test_title, host, 600, 'failed', packet_lost_ratio, "Cannot ping host")
if packet_lost_ratio != 0:
if m:
info_list = (test_title, host, 200, m.group('avg'), packet_lost_ratio,
'min %(min)s max %(max)s avg %(avg)s' % m.groupdict())
else:
info_list = (test_title, host, 600, 'failed', packet_lost_ratio,
"You have package Lost")
return info_list
def ping6(host, timeout=10):
return ping(host, timeout=10, protocol='6')
##############################################################################
#
# Copyright (c) 2015 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
import os.path
from slapos.networkbench import dnsbench
from slapos.networkbench.ping import ping, ping6
from slapos.networkbench.http import request
DNS_EXPECTED_LIST = ["85.118.38.162", "176.31.129.213"]
class TestDNSBench(unittest.TestCase):
def test_dnsbench_ok(self):
""" Test dns resolution, this test may fail if
not ran from Europe.
# Update domain later.
"""
info = dnsbench.resolve(
"www.erp5.com", DNS_EXPECTED_LIST)
self.assertEquals(info[0], 'DNS')
self.assertEquals(info[1], 'www.erp5.com')
self.assertEquals(info[2], 200)
self.assertTrue(info[3] < 1)
  • @rafael sometimes this test fail

    ======================================================================
    FAIL: test_dnsbench_fail (slapos.test.test_networkbench.TestDNSBench)
    Test dns failure resolution
    ----------------------------------------------------------------------
    Traceback (most recent call last):
    File "/srv/slapgrid/slappart20/srv/testnode/cum/inst/test0-0/parts/slapos.toolbox/slapos/test/test_networkbench.py", line 66, in test_dnsbench_fail
    self.assertLess(info[3], 1)
    AssertionError: 2.074226140975952 not less than 1

    Is there a reason we should check it's less than 1 second ? how about changing to 5 seconds ?

    or is it a problem on testnodes you think ?

    /cc @tomo @seb

    Edited by Jérome Perrin
  • it is weird to be that slow, but ok to change to 4 or 5. I think the idea was to ensure it is fast.

Please register or sign in to reply
self.assertEquals(info[4], 'OK')
self.assertEquals(set(info[5]), set([u'85.118.38.162', u'176.31.129.213']))
def test_dnsbench_fail(self):
""" Test dns failure resolution
"""
info = dnsbench.resolve(
"thisdomaindontexist.erp5.com")
self.assertEquals(info[0], 'DNS')
self.assertEquals(info[1], 'thisdomaindontexist.erp5.com')
self.assertEquals(info[2], 600)
self.assertTrue(info[3] < 1)
self.assertEquals(info[4], 'Cannot resolve the hostname')
self.assertEquals(info[5], [])
def test_dnsbench_unexpected(self):
""" Test dns unexpected resolution
"""
info = dnsbench.resolve(
"www.erp5.com", [DNS_EXPECTED_LIST[0]])
self.assertEquals(info[0], 'DNS')
self.assertEquals(info[1], 'www.erp5.com')
self.assertEquals(info[2], 200)
self.assertTrue(info[3] < 1)
self.assertEquals(info[4], 'UNEXPECTED')
self.assertTrue(info[5].startswith("['85.118.38.162'] (expected) != "))
class TestPing(unittest.TestCase):
def test_ping_ok(self):
info = ping("localhost")
self.assertEquals(info[0], 'PING')
self.assertEquals(info[1], 'localhost')
self.assertEquals(info[2], 200)
self.assertTrue(float(info[3]) < 0.2)
self.assertEquals(info[4], '0')
self.assertTrue(info[5].startswith("min"))
def test_ping_fail(self):
info = ping("couscous")
self.assertEquals(info[0], 'PING')
self.assertEquals(info[1], 'couscous')
self.assertEquals(info[2], 600)
self.assertEquals(info[3], 'failed')
self.assertEquals(info[4], -1)
self.assertEquals(info[5], 'Fail to parser ping output')
def test_ping6_ok(self):
info = ping6("localhost")
self.assertEquals(info[0], 'PING6')
self.assertEquals(info[1], 'localhost')
self.assertEquals(info[2], 200)
self.assertTrue(float(info[3]) < 0.2)
self.assertEquals(info[4], '0')
self.assertTrue(info[5].startswith("min"))
def test_ping6_fail(self):
info = ping6("couscous")
self.assertEquals(info[0], 'PING6')
self.assertEquals(info[1], 'couscous')
self.assertEquals(info[2], 600)
self.assertEquals(info[3], 'failed')
self.assertEquals(info[4], -1)
self.assertEquals(info[5], 'Fail to parser ping output')
class TestHTTPBench(unittest.TestCase):
def test_request_ok(self):
""" This test is way to badly written as it depends on
www.erp5.com for now, please replace it
Please register or sign in to reply
"""
info = request("https://www.erp5.com", {})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'https://www.erp5.com')
self.assertEquals(info[2], 200)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "OK")
def test_request_expected_response(self):
""" This test is way to badly written as it depends on
www.erp5.com for now, please replace it
"""
info = request("https://www.erp5.com", {"expected_response": 200})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'https://www.erp5.com')
self.assertEquals(info[2], 200)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "OK")
def test_request_expected_redirection(self):
""" This test is way to badly written as it depends on
www.erp5.com for now, please replace it
"""
info = request("http://www.erp5.com", {"expected_response": 302})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'http://www.erp5.com')
self.assertEquals(info[2], 302)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "OK")
def test_request_expected_text(self):
""" This test is way to badly written as it depends on
www.erp5.com for now, please replace it
"""
info = request("https://www.erp5.com", {"expected_text": "ERP5"})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'https://www.erp5.com')
self.assertEquals(info[2], 200)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "OK")
def test_request_fail(self):
""" Test unreachable URL
"""
info = request("http://thisurldontexist.erp5.com", {})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'http://thisurldontexist.erp5.com')
self.assertEquals(info[2], 0)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "FAIL")
def test_request_unexpected_response(self):
""" This test is way to badly written as it depends on
www.erp5.com for now, please replace it
"""
info = request("http://www.erp5.com", {"expected_response": 200})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'http://www.erp5.com')
self.assertEquals(info[2], 302)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "UNEXPECTED (200 != 302)")
def test_request_unexpected_text(self):
""" This test is way to badly written as it depends on
www.erp5.com for now, please replace it.
"""
info = request("https://www.erp5.com", {"expected_text": "COUSCOUS"})
self.assertEquals(info[0], 'GET')
self.assertEquals(info[1], 'https://www.erp5.com')
self.assertEquals(info[2], 200)
self.assertEquals(len(info[3].split(';')), 5 )
self.assertEquals(info[4], "UNEXPECTED (COUSCOUS not in page content)")
#def request(url, expected_dict):
#
# rendering_time = "%s;%s;%s;%s;%s" % \
# (curl.getinfo(curl.NAMELOOKUP_TIME),
# curl.getinfo(curl.CONNECT_TIME),
# curl.getinfo(curl.PRETRANSFER_TIME),
# curl.getinfo(curl.STARTTRANSFER_TIME),
# curl.getinfo(curl.TOTAL_TIME))
#
# response_code = curl.getinfo(pycurl.HTTP_CODE)
#
# expected_response = expected_dict.get("expected_response", None)
# if expected_response is not None and \
# expected_response != response_code:
# result = "UNEXPECTED (%s != %s)" % (expected_response, response_code)
#
# expected_text = expected_dict.get("expected_text", None)
# if expected_text is not None and \
# str(expected_text) not in str(body):
# result = "UNEXPECTED (%s not in page content)" % (expected_text)
#
#
# info_list = ('GET', url, response_code, rendering_time, result)
#
# return info_list
#
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment