Commit de75a33d authored by Julien Muchembled's avatar Julien Muchembled

Import erp5.utils.{benchmark,test_browser} from svn.erp5.org:public/erp5/trunk/utils

parents 65d6f5fc 91247aec
...@@ -4,7 +4,12 @@ Changes ...@@ -4,7 +4,12 @@ Changes
0.2 (unreleased) 0.2 (unreleased)
---------------- ----------------
* No changes yet. * Imported from https://svn.erp5.org/repos/public/erp5/trunk/utils/
- erp5.util.test_browser:
Programmable browser for functional and performance tests for ERP5
- erp5.util.benchmark:
Performance benchmarks for ERP5 with erp5.utils.test_browser
0.1 (2011-08-08) 0.1 (2011-08-08)
---------------- ----------------
......
API Documentation
-----------------
You can generate the API documentation using ``epydoc'':
$ epydoc src/erp5
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import argparse
import functools
class ArgumentType(object):
@classmethod
def directoryType(cls, path):
if not (os.path.isdir(path) and os.access(path, os.W_OK)):
raise argparse.ArgumentTypeError("'%s' is not a valid directory or is "\
"not writable" % path)
return path
@classmethod
def objectFromModule(cls, module_name, object_name=None,
callable_object=False):
if module_name.endswith('.py'):
module_name = module_name[:-3]
if not object_name:
object_name = module_name
import sys
sys.path.append(os.getcwd())
try:
module = __import__(module_name, globals(), locals(), [object_name], -1)
except Exception, e:
raise argparse.ArgumentTypeError("Cannot import '%s.%s': %s" % \
(module_name, object_name, str(e)))
try:
obj = getattr(module, object_name)
except AttributeError:
raise argparse.ArgumentTypeError("Could not get '%s' in '%s'" % \
(object_name, module_name))
if callable_object and not callable(obj):
raise argparse.ArgumentTypeError(
"'%s.%s' is not callable" % (module_name, object_name))
return obj
@classmethod
def strictlyPositiveIntType(cls, value):
try:
converted_value = int(value)
except ValueError:
pass
else:
if converted_value > 0:
return converted_value
raise argparse.ArgumentTypeError('expects a strictly positive integer')
@classmethod
def strictlyPositiveIntOrRangeType(cls, value):
try:
return cls.strictlyPositiveIntType(value)
except argparse.ArgumentTypeError:
try:
min_max_list = value.split(',')
except ValueError:
pass
else:
if len(min_max_list) == 2:
minimum, maximum = cls.strictlyPositiveIntType(min_max_list[0]), \
cls.strictlyPositiveIntType(min_max_list[1])
if minimum >= maximum:
raise argparse.ArgumentTypeError('%d >= %d' % (minimum, maximum))
return (minimum, maximum)
raise argparse.ArgumentTypeError(
'expects either a strictly positive integer or a range of strictly '
'positive integer separated by a comma')
@classmethod
def ERP5UrlType(cls, url):
if url[-1] == '/':
url_list = url.rsplit('/', 2)[:-1]
else:
url_list = url.rsplit('/', 1)
url_list[0] = url_list[0] + '/'
if len(url_list) != 2:
raise argparse.ArgumentTypeError("Invalid URL given")
return url_list
# -*- coding: utf-8 -*-
def createPerson(result, browser):
"""
Create a Person and add a telephone number. It can be ran infinitely (e.g.
until it is interrupted by the end user) with 1 concurrent user, through
performance_tester_erp5 with the following command:
performance_tester_erp5 http://foo.bar:4242/erp5/ 1 createPerson
Please note that you must run this command from the same directory of this
script and userInfo.py. Further information about performance_tester_erp5
options and arguments are available by specifying ``--help''.
"""
# Go to Persons module (person_module)
result('Go to person module',
browser.mainForm.submitSelectModule(value='/person_module'))
# Create a new person and record the time elapsed in seconds
result('Add Person', browser.mainForm.submitNew())
# Check whether it has been successfully created
assert browser.getTransitionMessage() == 'Object created.'
# Fill the first and last name of the newly created person
browser.mainForm.getControl(name='field_my_first_name').value = 'Create'
browser.mainForm.getControl(name='field_my_last_name').value = 'Person'
# Submit the changes, record the time elapsed in seconds
result('Save', browser.mainForm.submitSave())
# Check whether the changes have been successfully updated
assert browser.getTransitionMessage() == 'Data updated.'
person_url = browser.url
# Add phone number
result('Add telephone',
browser.mainForm.submitSelectAction(value='add Telephone'))
# Fill telephone title and number
browser.mainForm.getControl(name='field_my_title'). value = 'Personal'
browser.mainForm.getControl(name='field_my_telephone_number').value = '0123456789'
# Submit the changes, record the time elapsed in seconds
result('Save', browser.mainForm.submitSave())
# Check whether the changes have been successfully updated
assert browser.getTransitionMessage() == 'Data updated.'
# Go back to the Person page before validating
browser.open(person_url)
# Validate it (as the workflow action may not be available yet, try 5 times
# and sleep 5s between each attempts before failing)
show_validate_time, waiting_for_validate_action = \
browser.mainForm.submitSelectWorkflow(value='validate_action',
maximum_attempt_number=5,
sleep_between_attempt=5)
result('Waiting for validate_action', waiting_for_validate_action)
result('Show validate', show_validate_time)
result('Validated', browser.mainForm.submitDialogConfirm())
assert browser.getTransitionMessage() == 'Status changed.'
# Specify user login/password used to run the tests
user_tuple = (('zope', 'zope'),)
#!/usr/bin/env python
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import argparse
import os
import sys
import multiprocessing
import xmlrpclib
import signal
import errno
from .argument import ArgumentType
from .process import BenchmarkProcess
from .result import ERP5BenchmarkResult, CSVBenchmarkResult
MAXIMUM_KEYBOARD_INTERRUPT = 5
class PerformanceTester(object):
def __init__(self, namespace=None):
if not namespace:
self._argument_namespace = self._parse_arguments(argparse.ArgumentParser(
description='Run ERP5 benchmarking suites.'))
else:
self._argument_namespace = namespace
@staticmethod
def _add_parser_arguments(parser):
# Optional arguments
parser.add_argument('--filename-prefix',
default='result',
metavar='PREFIX',
help='Filename prefix for results and logs files '
'(default: result)')
parser.add_argument('--report-directory', '-r',
type=ArgumentType.directoryType,
default=os.getcwd(),
metavar='DIRECTORY',
help='Directory where the results and logs will be stored '
'(default: current directory)')
parser.add_argument('--max-global-average',
type=float,
default=0,
metavar='N',
help='Stop when any suite operation is over this value '
'(default: disable)')
parser.add_argument('--users-file',
dest='user_info_filename',
default='userInfo',
metavar='MODULE',
help="Import users from ``user_tuple'' in MODULE")
parser.add_argument('--users-range-increment',
type=ArgumentType.strictlyPositiveIntType,
default=1,
metavar='N',
help='Number of users being added after each repetition '
'(default: 1)')
parser.add_argument('--enable-debug', '-d',
action='store_true',
default=False,
help='Enable debug messages')
parser.add_argument('--enable-legacy-listbox',
dest='is_legacy_listbox',
action='store_true',
default=False,
help='Enable legacy listbox for Browser')
parser.add_argument('--repeat',
type=ArgumentType.strictlyPositiveIntType,
default=-1,
metavar='N',
help='Repeat the benchmark suite N times '
'(default: infinite)')
parser.add_argument('--user-index',
type=int,
default=0,
metavar='INDEX',
help='Index of the first user within userInfo '
'(default: 0)')
parser.add_argument('--erp5-publish-url',
metavar='ERP5_PUBLISH_URL',
help='ERP5 URL to publish the results to '
'(default: disabled, thus writing to CSV files)')
parser.add_argument('--erp5-publish-project',
metavar='ERP5_PUBLISH_PROJECT',
help='ERP5 publish project')
# Mandatory arguments
parser.add_argument('url',
type=ArgumentType.ERP5UrlType,
metavar='URL',
help='ERP5 base URL')
parser.add_argument('users',
type=ArgumentType.strictlyPositiveIntOrRangeType,
metavar='NB_USERS|MIN_NB_USERS,MAX_NB_USERS',
help='Number of users (fixed or a range)')
parser.add_argument('benchmark_suite_list',
nargs='+',
metavar='BENCHMARK_SUITES',
help='Benchmark suite modules')
@staticmethod
def _check_parsed_arguments(namespace):
namespace.user_tuple = ArgumentType.objectFromModule(namespace.user_info_filename,
object_name='user_tuple')
object_benchmark_suite_list = []
for benchmark_suite in namespace.benchmark_suite_list:
object_benchmark_suite_list.append(ArgumentType.objectFromModule(benchmark_suite,
callable_object=True))
namespace.benchmark_suite_name_list = namespace.benchmark_suite_list
namespace.benchmark_suite_list = object_benchmark_suite_list
max_nb_users = isinstance(namespace.users, tuple) and namespace.users[1] or \
namespace.users
namespace.user_tuple = namespace.user_tuple[namespace.user_index:]
if max_nb_users > len(namespace.user_tuple):
raise argparse.ArgumentTypeError("Not enough users in the given file")
if (namespace.erp5_publish_url and not namespace.erp5_publish_project) or \
(not namespace.erp5_publish_url and namespace.erp5_publish_project):
raise argparse.ArgumentTypeError("Publish ERP5 URL and project must "
"be specified")
return namespace
@staticmethod
def _parse_arguments(parser):
PerformanceTester._add_parser_arguments(parser)
namespace = parser.parse_args()
PerformanceTester._check_parsed_arguments(namespace)
return namespace
def getResultClass(self):
if self._argument_namespace.erp5_publish_url:
return ERP5BenchmarkResult
else:
return CSVBenchmarkResult
def preRun(self):
if not self._argument_namespace.erp5_publish_url:
return
self._argument_namespace.erp5_publish_url += \
ERP5BenchmarkResult.createResultDocument(self._argument_namespace.erp5_publish_url,
self._argument_namespace.erp5_publish_project,
self._argument_namespace.repeat,
self._argument_namespace.users)
def postRun(self, error_message_set):
if not self._argument_namespace.erp5_publish_url:
return
ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url,
error_message_set)
def _run_constant(self, nb_users):
process_list = []
exit_msg_queue = multiprocessing.Queue(nb_users)
result_class = self.getResultClass()
for user_index in range(nb_users):
process = BenchmarkProcess(exit_msg_queue, result_class,
self._argument_namespace, nb_users,
user_index)
process_list.append(process)
for process in process_list:
process.start()
error_message_set = set()
process_terminated_counter = 0
# Ensure that SIGTERM signal (sent by terminate()) is not sent twice
do_exit = False
while process_terminated_counter != len(process_list):
try:
error_message = exit_msg_queue.get()
except KeyboardInterrupt, e:
print >>sys.stderr, "\nInterrupted by user, stopping gracefully..."
do_exit = True
# An IOError may be raised when receiving a SIGINT which interrupts the
# blocking system call above and the system call should not be restarted
# (using siginterrupt), otherwise the process will stall forever as its
# child has already exited
except IOError, e:
if e.errno == errno.EINTR:
continue
else:
if error_message is not None:
error_message_set.add(error_message)
do_exit = True
process_terminated_counter += 1
# In case of error or SIGINT, kill the other children because they are
# likely failing as well (especially because a process only exits after
# encountering 10 errors)
if do_exit:
for process in process_list:
if process.is_alive():
process.terminate()
process.join()
if error_message_set:
return (error_message_set, 1)
return ((), 0)
def run(self):
error_message_set, exit_status = (), 0
self.preRun()
if isinstance(self._argument_namespace.users, tuple):
nb_users, max_users = self._argument_namespace.users
while True:
error_message_set, exit_status = self._run_constant(nb_users)
if exit_status != 0 or nb_users == max_users:
break
nb_users = min(nb_users + self._argument_namespace.users_range_increment,
max_users)
else:
error_message_set, exit_status = self._run_constant(
self._argument_namespace.users)
self.postRun(error_message_set)
return error_message_set, exit_status
def main():
error_message_set, exit_status = PerformanceTester().run()
for error_message in error_message_set:
print >>sys.stderr, "ERROR: %s" % error_message
sys.exit(exit_status)
if __name__ == '__main__':
main()
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import multiprocessing
import csv
import traceback
import os
import logging
import signal
import sys
from ..test_browser.browser import Browser
MAXIMUM_ERROR_COUNTER = 10
RESULT_NUMBER_BEFORE_FLUSHING = 100
class BenchmarkProcess(multiprocessing.Process):
def __init__(self, exit_msg_queue, result_klass, argument_namespace,
nb_users, user_index, *args, **kwargs):
self._exit_msg_queue = exit_msg_queue
self._result_klass = result_klass
self._argument_namespace = argument_namespace
self._nb_users = nb_users
self._user_index = user_index
# Initialized when running the test
self._browser = None
self._current_repeat = 1
# TODO: Per target error counter instead of global one?
self._error_counter = 0
super(BenchmarkProcess, self).__init__(*args, **kwargs)
def stopGracefully(self, *args, **kwargs):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
raise StopIteration("Interrupted by user or because of an error from "
"another process, flushing remaining results...")
def getBrowser(self, log_file):
info_list = tuple(self._argument_namespace.url) + \
tuple(self._argument_namespace.user_tuple[self._user_index])
return Browser(*info_list,
is_debug=self._argument_namespace.enable_debug,
log_file=log_file,
is_legacy_listbox=self._argument_namespace.is_legacy_listbox)
def runBenchmarkSuiteList(self, result):
for target_idx, target in enumerate(self._argument_namespace.benchmark_suite_list):
self._logger.debug("EXECUTE: %s" % target)
result.enterSuite(target.__name__)
try:
self._browser.open()
target(result, self._browser)
except StopIteration:
raise
except Exception, e:
msg = "%s: %s" % (target, traceback.format_exc())
try:
msg += "Last response headers:\n%s\nLast response contents:\n%s" % \
(self._browser.headers, self._browser.contents)
except:
pass
if (self._current_repeat == 1 or
self._error_counter == MAXIMUM_ERROR_COUNTER):
raise RuntimeError(msg)
self._error_counter += 1
self._logger.warning(msg)
for stat in result.getCurrentSuiteStatList():
mean = stat.mean
self._logger.info("%s: min=%.3f, mean=%.3f (+/- %.3f), max=%.3f" % \
(stat.full_label,
stat.minimum,
mean,
stat.standard_deviation,
stat.maximum))
if (self._argument_namespace.max_global_average and
mean > self._argument_namespace.max_global_average):
raise RuntimeError("Stopping as mean is greater than maximum "
"global average")
result.exitSuite()
result.iterationFinished()
def run(self):
result_instance = self._result_klass(self._argument_namespace,
self._nb_users,
self._user_index)
self._logger = result_instance.getLogger()
# Ensure the data are flushed before exiting, handled by Result class
# __exit__ block
signal.signal(signal.SIGTERM, self.stopGracefully)
# Ignore KeyboardInterrupt as it is handled by the parent process
signal.signal(signal.SIGINT, signal.SIG_IGN)
exit_status = 0
exit_msg = None
try:
with result_instance as result:
self._browser = self.getBrowser(result_instance.log_file)
while self._current_repeat != (self._argument_namespace.repeat + 1):
self._logger.info("Iteration: %d" % self._current_repeat)
self.runBenchmarkSuiteList(result)
self._current_repeat += 1
if not self._current_repeat % RESULT_NUMBER_BEFORE_FLUSHING:
result.flush()
except StopIteration, e:
self._logger.error(e)
except BaseException, e:
exit_msg = str(e)
exit_status = 1
self._exit_msg_queue.put(exit_msg)
sys.exit(exit_status)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# First version: ERP5Mechanize from Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import argparse
def parseArguments():
parser = argparse.ArgumentParser(
description='Generate reports for ERP5 benchmarking suites.')
parser.add_argument('--enable-debug',
dest='is_debug',
action='store_true',
default=False,
help='Enable debug messages')
parser.add_argument('--filename-prefix',
default='result',
metavar='PREFIX',
help='Filename prefix for results CSV files '
'(default: result)')
parser.add_argument('--output-filename',
default='results.pdf',
metavar='FILENAME',
help='PDF output file (default: results.pdf)')
parser.add_argument('report_directory',
help='Reports directory')
namespace = parser.parse_args()
return namespace
import csv
from .result import BenchmarkResultStatistic
def computeStatisticFromFilenameList(argument_namespace, filename_list):
reader_list = []
stat_list = []
label_list = []
for filename in filename_list:
reader = csv.reader(open(filename, 'rb'), delimiter=',',
quoting=csv.QUOTE_MINIMAL)
reader_list.append(reader)
# Get headers
row_list = reader.next()
if not label_list:
label_list = row_list
for label in label_list:
stat_list.append(BenchmarkResultStatistic(*label.split(': ', 1)))
if row_list != label_list:
raise AssertionError, "ERROR: Result labels: %s != %s" % \
(label_list, row_list)
for row_list in reader:
for idx, row in enumerate(row_list):
stat_list[idx].add(float(row))
return stat_list
def formatFloatList(value_list):
return [ format(value, ".3f") for value in value_list ]
import numpy
import pylab
from matplotlib import pyplot, ticker
def drawBarDiagram(pdf, title, stat_list):
mean_list = []
yerr_list = []
minimum_list = []
maximum_list = []
label_list = []
error_list = []
for stat in stat_list:
mean_list.append(stat.mean)
yerr_list.append(stat.standard_deviation)
minimum_list.append(stat.minimum)
maximum_list.append(stat.maximum)
label_list.append(stat.label)
error_list.append(stat.error_sum)
min_array = numpy.array(minimum_list)
mean_array = numpy.array(mean_list)
max_array = numpy.array(maximum_list)
yerr_lower = numpy.minimum(mean_array - min_array, yerr_list)
yerr_upper = numpy.minimum(max_array - mean_array, yerr_list)
## Draw diagrams
# Create the figure
figure = pyplot.figure(figsize=(11.69, 8.29))
figure.subplots_adjust(bottom=0.13, right=0.98, top=0.95)
pyplot.title(title)
# Create the axes along with their labels
axes = figure.add_subplot(111)
axes.set_ylabel('Seconds')
axes.set_xticks([])
axes.yaxis.set_major_locator(ticker.MultipleLocator(0.5))
axes.yaxis.set_minor_locator(ticker.MultipleLocator(0.25))
axes.yaxis.grid(True, 'major', linewidth=1.5)
axes.yaxis.grid(True, 'minor')
# Create the bars
ind = numpy.arange(len(label_list))
width = 0.33
min_rects = axes.bar(ind, minimum_list, width, color='y', label='Minimum')
avg_rects = axes.bar(ind + width, mean_list, width, color='r', label='Mean')
axes.errorbar(numpy.arange(0.5, len(stat_list)), mean_list,
yerr=[yerr_lower, yerr_upper], fmt=None,
label='Standard deviation')
max_rects = axes.bar(ind + width * 2, maximum_list, width, label='Maximum',
color='g')
# Add the legend of bars
axes.legend(loc=0)
axes.table(rowLabels=['Minimum', 'Average', 'Std. deviation', 'Maximum', 'Errors'],
colLabels=label_list,
cellText=[formatFloatList(minimum_list),
formatFloatList(mean_list),
formatFloatList(yerr_list),
formatFloatList(maximum_list),
error_list],
rowColours=('y', 'r', 'b', 'g', 'w'),
loc='bottom',
colLoc='center',
rowLoc='center',
cellLoc='center')
pdf.savefig()
pylab.close()
def drawConcurrentUsersPlot(pdf, title, nb_users_list, stat_list):
figure = pyplot.figure(figsize=(11.69, 8.29), frameon=False)
figure.subplots_adjust(bottom=0.1, right=0.98, left=0.07, top=0.95)
pyplot.title(title)
pyplot.grid(True, linewidth=1.5)
axes = figure.add_subplot(111)
min_array = numpy.array([stat.minimum for stat in stat_list])
mean_array = numpy.array([stat.mean for stat in stat_list])
max_array = numpy.array([stat.maximum for stat in stat_list])
yerr_list = [stat.standard_deviation for stat in stat_list]
yerr_lower = numpy.minimum(mean_array - min_array, yerr_list)
yerr_upper = numpy.minimum(max_array - mean_array, yerr_list)
axes.plot(nb_users_list, min_array, 'yo-', label='Minimum')
axes.errorbar(nb_users_list,
mean_array,
yerr=[yerr_lower, yerr_upper],
color='r',
ecolor='b',
label='Mean',
elinewidth=2,
fmt='D-',
capsize=10.0)
axes.plot(nb_users_list, max_array, 'gs-', label='Maximum')
axes.yaxis.set_major_locator(ticker.MultipleLocator(0.5))
axes.yaxis.set_minor_locator(ticker.MultipleLocator(0.25))
axes.yaxis.grid(True, 'minor')
axes.xaxis.set_major_locator(ticker.FixedLocator(nb_users_list))
axes.set_xticks(nb_users_list)
axes.legend(loc=0)
axes.set_xlabel('Concurrent users')
axes.set_ylabel('Seconds')
pyplot.xlim(xmin=nb_users_list[0])
pdf.savefig()
pylab.close()
from matplotlib.backends.backend_pdf import PdfPages
import glob
import os
import re
user_re = re.compile('-(\d+)users-')
def generateReport():
argument_namespace = parseArguments()
filename_iter = glob.iglob("%s-*repeat*-*users*-*process*.csv" % os.path.join(
argument_namespace.report_directory,
argument_namespace.filename_prefix))
per_nb_users_report_dict = {}
for filename in filename_iter:
report_dict = per_nb_users_report_dict.setdefault(
int(user_re.search(filename).group(1)), {'filename': []})
report_dict['filename'].append(filename)
pdf = PdfPages(argument_namespace.output_filename)
for nb_users, report_dict in per_nb_users_report_dict.items():
stat_list = computeStatisticFromFilenameList(
argument_namespace, report_dict['filename'])
title = "Ran suites with %d users" % len(report_dict['filename'])
for slice_start_idx in range(0, len(stat_list), 12):
if slice_start_idx != 0:
title += ' (Ctd.)'
drawBarDiagram(pdf, title, stat_list[slice_start_idx:slice_start_idx + 12])
report_dict['stats'] = stat_list
if len(per_nb_users_report_dict) != 1:
for i in range(len(report_dict['stats'])):
stat_list = []
nb_users_list = per_nb_users_report_dict.keys()
for report_dict in per_nb_users_report_dict.values():
stat_list.append(report_dict['stats'][i])
drawConcurrentUsersPlot(
pdf,
"%s from %d to %d users (step: %d)" % (stat_list[0].full_label,
nb_users_list[0],
nb_users_list[-1],
nb_users_list[1] - nb_users_list[0]),
nb_users_list,
stat_list)
pdf.close()
if __name__ == '__main__':
generateReport()
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import sys
import math
import os
import csv
import logging
import signal
class BenchmarkResultStatistic(object):
def __init__(self, suite, label):
self.suite = suite
self.label = label
self.full_label = '%s: %s' % (self.suite, self.label)
self.minimum = sys.maxint
self.maximum = -1
self.n = 0
self.error_sum = 0
# For calculating the mean
self._value_sum = 0
# For calculating the standard deviation
self._variance_sum = 0
self._mean = 0
def add_error(self):
self.error_sum += 1
def add(self, value):
if value < self.minimum:
self.minimum = value
if value > self.maximum:
self.maximum = value
self._value_sum += value
self.n += 1
delta = value - self._mean
self._mean += delta / self.n
self._variance_sum += delta * (value - self._mean)
@property
def mean(self):
return self._value_sum / self.n
@property
def standard_deviation(self):
return math.sqrt(self._variance_sum / self.n)
import abc
class BenchmarkResult(object):
__metaclass__ = abc.ABCMeta
def __init__(self, argument_namespace, nb_users, user_index):
self._argument_namespace = argument_namespace
self._nb_users = nb_users
self._user_index = user_index
self._log_level = self._argument_namespace.enable_debug and \
logging.DEBUG or logging.INFO
self._stat_list = []
self._suite_idx = 0
self._result_idx = 0
self.result_list = []
self._all_result_list = []
self._first_iteration = True
self._current_suite_name = None
self._result_idx_checkpoint_list = []
self.label_list = []
self._logger = None
def getLogger(self):
if not self._logger:
logging.basicConfig(stream=self.log_file, level=self._log_level)
self._logger = logging.getLogger('erp5.util.benchmark')
return self._logger
return self._logger
def __enter__(self):
return self
def enterSuite(self, name):
self._current_suite_name = name
def __call__(self, label, value):
self.result_list.append(value)
if self._first_iteration:
self._stat_list.append(BenchmarkResultStatistic(self._current_suite_name,
label))
self._stat_list[self._result_idx].add(value)
self._result_idx += 1
def getLabelList(self):
return [ stat.full_label for stat in self._stat_list ]
def iterationFinished(self):
self._all_result_list.append(self.result_list)
if self._first_iteration:
self.label_list = self.getLabelList()
self.getLogger().debug("RESULTS: %s" % self.result_list)
self.result_list = []
self._first_iteration = False
self._suite_idx = 0
self._result_idx = 0
def getStatList(self):
return self._stat_list
def getCurrentSuiteStatList(self):
start_index = self._suite_idx and \
self._result_idx_checkpoint_list[self._suite_idx - 1] or 0
return self._stat_list[start_index:self._result_idx]
def exitSuite(self):
if self._first_iteration:
self._result_idx_checkpoint_list.append(self._result_idx)
else:
expected_result_idx = self._result_idx_checkpoint_list[self._suite_idx]
while self._result_idx != expected_result_idx:
self.result_list.append(0)
self._stat_list[self._result_idx].add_error()
self._result_idx += 1
self._suite_idx += 1
@abc.abstractmethod
def flush(self, partial=True):
self._all_result_list = []
@abc.abstractmethod
def __exit__(self, exc_type, exc_value, traceback):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.flush(partial=False)
return True
class CSVBenchmarkResult(BenchmarkResult):
def __init__(self, *args, **kwargs):
super(CSVBenchmarkResult, self).__init__(*args, **kwargs)
filename_prefix = self._getFilenamePrefix()
self._result_filename = "%s.csv" % filename_prefix
self._result_filename_path = os.path.join(
self._argument_namespace.report_directory, self._result_filename)
self._log_filename = "%s.log" % filename_prefix
self._log_filename_path = os.path.join(
self._argument_namespace.report_directory, self._log_filename)
self.log_file = open(self._log_filename_path, 'w')
def _getFilenamePrefix(self):
max_nb_users = isinstance(self._argument_namespace.users, int) and \
self._argument_namespace.users or self._argument_namespace.users[1]
fmt = "%%s-%%drepeat-%%0%ddusers-process%%0%dd" % \
(len(str(max_nb_users)), len(str(self._nb_users)))
return fmt % (self._argument_namespace.filename_prefix,
self._argument_namespace.repeat,
self._nb_users,
self._user_index)
def __enter__(self):
self._result_file = open(self._result_filename_path, 'wb')
self._csv_writer = csv.writer(self._result_file, delimiter=',',
quoting=csv.QUOTE_MINIMAL)
return self
def flush(self, partial=True):
if self._result_file.tell() == 0:
self._csv_writer.writerow(self.label_list)
self._csv_writer.writerows(self._all_result_list)
self._result_file.flush()
os.fsync(self._result_file.fileno())
super(CSVBenchmarkResult, self).flush(partial)
def __exit__(self, exc_type, exc_value, traceback):
super(CSVBenchmarkResult, self).__exit__(exc_type, exc_value, traceback)
self._result_file.close()
if exc_type and not issubclass(exc_type, StopIteration):
msg = "An error occured, see: %s" % self._log_filename_path
self.getLogger().error("%s: %s" % (exc_type, exc_value))
raise RuntimeError(msg)
from cStringIO import StringIO
import xmlrpclib
import datetime
class ERP5BenchmarkResult(BenchmarkResult):
def __init__(self, *args, **kwargs):
self.log_file = StringIO()
self._log_buffer_list = []
super(ERP5BenchmarkResult, self).__init__(*args, **kwargs)
def iterationFinished(self):
super(ERP5BenchmarkResult, self).iterationFinished()
# TODO: garbage?
self._log_buffer_list.append(self.log_file.getvalue())
self.log_file.seek(0)
def flush(self, partial=True):
benchmark_result = xmlrpclib.ServerProxy(
self._argument_namespace.erp5_publish_url,
verbose=True,
allow_none=True)
benchmark_result.BenchmarkResult_addResultLineList(
self._argument_namespace.user_tuple[self._user_index][0],
self._argument_namespace.repeat,
self._nb_users,
self._argument_namespace.benchmark_suite_name_list,
self.getLabelList(),
self._all_result_list,
self._log_buffer_list)
super(ERP5BenchmarkResult, self).flush()
def __exit__(self, exc_type, exc_value, traceback):
super(ERP5BenchmarkResult, self).__exit__(exc_type, exc_value, traceback)
@staticmethod
def createResultDocument(publish_url, publish_project, repeat, nb_users):
test_result_module = xmlrpclib.ServerProxy(publish_url,
verbose=True,
allow_none=True)
if isinstance(nb_users, tuple):
nb_users_str = '%d to %d' % nb_users
else:
nb_users_str = '%d' % nb_users
benchmark_result = test_result_module.TestResultModule_addBenchmarkResult(
'%d repeat with %s concurrent users' % (repeat, nb_users_str),
publish_project, ' '.join(sys.argv), datetime.datetime.now())
return benchmark_result['id']
@staticmethod
def closeResultDocument(publish_document_url, error_message_set):
result = xmlrpclib.ServerProxy(publish_document_url,
verbose=True,
allow_none=True)
result.BenchmarkResult_completed(error_message_set and 'FAIL' or 'PASS',
error_message_set)
#!/usr/bin/env python
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from .result import CSVBenchmarkResult
class CSVScalabilityBenchmarkResult(CSVBenchmarkResult):
def flush(self, partial=True):
super(CSVScalabilityBenchmarkResult, self).flush(partial)
self._argument_namespace.notify_method(self._result_filename,
self._result_file.tell(),
partial=partial)
from .performance_tester import PerformanceTester
class ScalabilityTester(PerformanceTester):
def preRun(self, *args, **kwargs):
pass
def postRun(self, error_message_set):
from logging import Formatter
import sys
import urllib
import urllib2
try:
urllib2.urlopen("http://[%s]:%d/report" % \
(self._argument_namespace.manager_address,
self._argument_namespace.manager_port),
urllib.urlencode({'error_message_set': '|'.join(error_message_set)})).close()
except:
print >>sys.stderr, "ERROR: %s" % Formatter().formatException(sys.exc_info())
def getResultClass(self):
if not self._argument_namespace.erp5_publish_url:
return CSVScalabilityBenchmarkResult
return super(ScalabilityTester, self).getResultClass()
from slapos.tool.nosqltester import NoSQLTester
class RunScalabilityTester(NoSQLTester):
def __init__(self):
super(RunScalabilityTester, self).__init__()
def _add_parser_arguments(self, parser):
super(RunScalabilityTester, self)._add_parser_arguments(parser)
ScalabilityTester._add_parser_arguments(parser)
def _parse_arguments(self, parser):
namespace = super(RunScalabilityTester, self)._parse_arguments(parser)
ScalabilityTester._check_parsed_arguments(namespace)
namespace.notify_method = self.send_result_availability_notification
return namespace
def run_tester(self):
ScalabilityTester(self.argument_namespace).run()
def main():
RunScalabilityTester().run()
if __name__ == '__main__':
main()
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Arnaud Fontaine <arnaud.fontaine@nexedi.com>
#
# First version: ERP5Mechanize from Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import logging
import sys
from urlparse import urljoin
from z3c.etestbrowser.browser import ExtendedTestBrowser
from zope.testbrowser.browser import onlyOne
def measurementMetaClass(prefix):
"""
Prepare a meta class where the C{prefix} is used to select for which
methods measurement methods will be added automatically.
@param prefix:
@type prefix: str
@return: The measurement meta class corresponding to the prefix
@rtype: type
"""
class MeasurementMetaClass(type):
"""
Meta class to automatically wrap methods whose prefix starts with
C{prefix}, and also to define C{lastRequestSeconds} on other classes
besides of Browser.
"""
def __new__(metacls, name, bases, dictionary):
def timeInSecondDecorator(method):
def wrapper(self, *args, **kwargs):
"""
Replaced by the wrapped method docstring. Some methods return the
time spent on waiting (C{submitSelectJump} and for example), thus
return a tuple with the time spent on the request and the time spent
on waiting
"""
ret = method(self, *args, **kwargs)
if ret is None:
return self.lastRequestSeconds
else:
return (self.lastRequestSeconds, ret)
return wrapper
def applyMeasure(method):
"""
Inner function to wrap timed methods to automatically return the time
spent on the request
@param method: Instance method to be called
@type method: function
"""
wrapper_method = timeInSecondDecorator(method)
wrapper_method.func_name = method.func_name
wrapper_method.__doc__ = method.__doc__
# In order to avoid re-wrapping the method when looking at the bases
# for example
wrapper_method.__is_wrapper__ = True
dictionary[method.func_name] = wrapper_method
# Only wrap methods prefixed by the given prefix
for attribute_name, attribute in dictionary.items():
if attribute_name.startswith(prefix) and callable(attribute):
applyMeasure(attribute)
# And also create these methods by looking at the bases
for attribute_name in dir(bases[0]):
if attribute_name not in dictionary and \
attribute_name.startswith(prefix):
attribute = getattr(bases[0], attribute_name)
if callable(attribute) and not getattr(attribute, '__is_wrapper__', False):
applyMeasure(attribute)
# lastRequestSeconds properties are only defined on classes inheriting
# from zope.testbrowser.browser.Browser, so create these properties for
# all other classes too
if 'Browser' not in bases[0].__name__:
time_method = lambda self: self.browser.lastRequestSeconds
time_method.func_name = 'lastRequestSeconds'
time_method.__doc__ = Browser.lastRequestSeconds.__doc__
dictionary['lastRequestSeconds'] = property(time_method)
return super(MeasurementMetaClass,
metacls).__new__(metacls, name, bases, dictionary)
return MeasurementMetaClass
class Browser(ExtendedTestBrowser):
"""
Implements mechanize tests specific to an ERP5 environment through
U{ExtendedTestBrowser<http://pypi.python.org/pypi/z3c.etestbrowser>}
(providing features to parse XML and access elements using XPATH)
using U{zope.testbrowser<http://pypi.python.org/pypi/zope.testbrowser>}
(providing benchmark and testing features on top of
U{mechanize<http://wwwsearch.sourceforge.net/mechanize/>}).
@todo:
- getFormulatorFieldValue
"""
__metaclass__ = measurementMetaClass(prefix='open')
def __init__(self,
base_url,
erp5_site_id,
username,
password,
log_file=None,
is_debug=False,
is_legacy_listbox=False):
"""
Create a browser object, allowing to log in right away with the
given username and password. The base URL must contain an I{/} at
the end.
@param base_url: Base HTTP URL
@type base_url: str
@param erp5_site_id: ERP5 site name
@type erp5_site_id: str
@param username: Username to be used to log into ERP5
@type username: str
@param password: Password to be used to log into ERP5
@param log_file: Log file object (stderr if none given)
@type log_file: file
@param is_debug: Enable or disable debugging (disable by default)
@type is_debug: bool
@param is_legacy_listbox: Use legacy listbox
@type is_legacy_listbox: bool
"""
# Meaningful to re-create the MainForm class every time the page
# has been changed
self._main_form = None
assert base_url[-1] == '/'
self._base_url = base_url
self._erp5_site_id = erp5_site_id
self._erp5_base_url = urljoin(self._base_url, self._erp5_site_id) + '/'
self._username = username
self._password = password
# Only display WARNING message if debugging is not enabled
logging_level = is_debug and logging.DEBUG or logging.WARNING
stream = log_file and log_file or sys.stderr
logging.basicConfig(stream=stream, level=logging_level)
self._logger = logging.getLogger('erp5.util.test_browser')
self._is_legacy_listbox = is_legacy_listbox
self._is_logged_in = False
super(Browser, self).__init__()
self.login()
def open(self, url_or_path=None, data=None):
"""
Open a relative (to the ERP5 base URL) or absolute URL. If the
given URL is not given, then it will open the home ERP5 page.
@param url_or_path: Relative or absolute URL
@type url_or_path: str
"""
# In case url_or_path is an absolute URL, urljoin() will return
# it, otherwise it is a relative path and will be concatenated to
# ERP5 base URL
absolute_url = urljoin(self._erp5_base_url, url_or_path)
self._logger.debug("Opening url: " + absolute_url)
super(Browser, self).open(absolute_url, data)
def login(self, force=False):
"""
Log in only if not already logged in unless explicitely specified
to do so.
@param force: Log in even if already logged in
@type force: bool
"""
if force or not self._is_logged_in:
try:
url_before_login = self.url
except:
url_before_login = None
self.open('login_form')
self.mainForm.submitLogin()
# Go back to the page before trying to log in if any URL, or to
# the homepage otherwise
self.open(url_before_login)
def getCookieValue(self, name, default=None):
"""
Get the cookie value of the given cookie name.
@param name: Name of the cookie
@type name: str
@param default: Fallback value if the cookie was not found
@type default: str
@return: Cookie value
@rtype: str
"""
for cookie_name, cookie_value in self.cookies.iteritems():
if name == cookie_name:
return cookie_value
return default
@property
def mainForm(self):
"""
Get the ERP5 main form of the current page. ERP5 generally use
only one form (whose C{id} is C{main_form}) for all the controls
within a page. A Form instance is returned including helper
methods specific to ERP5.
@return: The main Form class instance
@rtype: Form
@raise LookupError: The main form could not be found.
@todo: Perhaps the page could be parsed to generate a class with
only appropriate methods, but that would certainly be an
huge overhead for little benefit...
@todo: Patch zope.testbrowser to allow the class to be given
rather than duplicating the code
"""
# If the page has not changed, no need to re-create a class, so
# just return the main_form instance
if self._main_form and self._counter == self._main_form._browser_counter:
return self._main_form
main_form = None
for form in self.mech_browser.forms():
if form.attrs.get('id') == 'main_form':
main_form = form
if not main_form:
raise LookupError("Could not get 'main_form'")
self.mech_browser.form = form
self._main_form = ContextMainForm(self, form)
return self._main_form
def getLink(self, text=None, url=None, id=None, index=0,
class_attribute=None):
"""
Override original C{getLink} allowing to not consider the HTTP
query string unless it is explicitly given.
Also, allows to select a link by its class attribute, which
basically look for the first element whose C{attribute} is
C{class_attribute} then call C{getLink} with the element C{href}.
@param class_attribute: Get the link with this class
@type class_attribute: str
@param args: Positional arguments given to original C{getLink}
@type args: list
@param kwargs: Keyword arguments given to original C{getLink}
@type kwargs: dict
"""
if class_attribute:
element_list = self.etree.xpath('//a[contains(@class, "%s")]' % \
class_attribute)
try:
url = element_list[0].get('href')
except (IndexError, AttributeError):
url = None
if not url:
raise LookupError("Could not find any link whose class is '%s'" % \
class_attribute)
elif url and '?' not in url:
url += '?'
if id is not None:
def predicate(link):
return dict(link.attrs).get('id') == id
args = {'predicate': predicate}
else:
import re
from zope.testbrowser.browser import RegexType
if isinstance(text, RegexType):
text_regex = text
elif text is not None:
text_regex = re.compile(re.escape(text), re.DOTALL)
else:
text_regex = None
if isinstance(url, RegexType):
url_regex = url
elif url is not None:
url_regex = re.compile(re.escape(url), re.DOTALL)
else:
url_regex = None
args = {'text_regex': text_regex, 'url_regex': url_regex}
args['nr'] = index
return LinkWithTime(self.mech_browser.find_link(**args), self)
def getImportExportLink(self):
"""
Get Import/Export link. Use the class attribute rather than the
name as the latter is dependent on the context.
@return: The link whose class is C{report}
@rtype: Link
@todo: Should perhaps be a ContextBrowser class?
"""
return self.getLink(class_attribute='import_export')
def getFastInputLink(self):
"""
Get Fast Input link. Use the class attribute rather than the name
as the latter is dependent on the context.
@return: The link whose class is C{fast_input}
@rtype: Link
@todo: Should perhaps be a ContextBrowser class?
"""
return self.getLink(class_attribute='fast_input')
def getTransitionMessage(self):
"""
Parse the current page and returns the value of the portal_status
message.
@return: The transition message
@rtype: str
@raise LookupError: Not found
"""
try:
return self.etree.xpath('//div[@id="transition_message"]')[0].text
except IndexError:
raise LookupError("Cannot find div with ID 'transition_message'")
def getInformationArea(self):
"""
Parse the current page and returns the value of the information_area
message.
@return: The information area message
@rtype: str
@raise LookupError: Not found
"""
try:
return self.etree.xpath('//div[@id="information_area"]')[0].text
except IndexError:
raise LookupError("Cannot find div with ID 'information_area'")
_listbox_table_xpath_str = '//table[contains(@class, "listbox-table")]'
_legacy_listbox_table_xpath_str = '//div[contains(@class, "listbox")]'\
'//table'
def getListboxLink(self, line_number, column_number, cell_element_index=1,
*args, **kwargs):
"""
Follow the link at the given position, excluding any link whose
class is hidden. In case there are several links within a cell,
C{cell_element_index} allows to select which one to get (starting
from 1).
@param line_number: Line number of the link
@type line_number: int
@param column_number: Column number of the link
@type column_number: int
@param cell_element_index: Index of the link to be selected in the cell
@type cell_element_index: int
@param args: positional arguments given to C{getLink}
@type args: list
@param kwargs: keyword arguments given to C{getLink}
@type kwargs: dict
@return: C{Link} at the given line and column number
@rtype: L{zope.testbrowser.interfaces.ILink}
@raise LookupError: No link could be found at the given position
and cell indexes
"""
if self._is_legacy_listbox:
listbox_basic_xpath_str = self._legacy_listbox_table_xpath_str
else:
listbox_basic_xpath_str = self._listbox_table_xpath_str
# With XPATH, the position is context-dependent, therefore, as
# there the cells are either within a <thead> or <tbody>, the line
# number must be shifted by the number of header lines (namely 2)
if line_number <= 2:
relative_line_number = line_number
if self._is_legacy_listbox:
column_type = 'td'
else:
column_type = 'th'
else:
if self._is_legacy_listbox:
relative_line_number = line_number
else:
relative_line_number = line_number - 2
column_type = 'td'
xpath_str = '%s//tr[%d]//%s[%d]//a[not(contains(@class, "hidden"))][%d]' % \
(listbox_basic_xpath_str,
relative_line_number,
column_type,
column_number,
cell_element_index)
# xpath() method always return a list even if there is only one element
element_list = self.etree.xpath(xpath_str)
try:
link_href = element_list[0].get('href')
except (IndexError, AttributeError):
link_href = None
if not link_href:
raise LookupError("Could not find link in listbox cell %dx%d (index=%d)" %\
(line_number, column_number, cell_element_index))
return self.getLink(url=link_href, *args, **kwargs)
def getListboxPosition(self,
text,
column_number=None,
line_number=None,
strict=False):
"""
Returns the position number of the first line containing given
text in given column or line number (starting from 1).
@param text: Text to search
@type text: str
@param column_number: Look into all the cells of this column
@type column_number: int
@param line_number: Look into all the cells of this line
@type line_number: int
@param strict: Should given text matches exactly
@type strict: bool
@return: The cell position
@rtype: int
@raise LookupError: Not found
"""
# Require either column_number or line_number to be given
onlyOne([column_number, line_number], '"column_number" and "line_number"')
if self._is_legacy_listbox:
listbox_basic_xpath_str = self._legacy_listbox_table_xpath_str
else:
listbox_basic_xpath_str = self._listbox_table_xpath_str
# Get all cells in the column (if column_number is given and
# including header columns) or line (if line_number is given)
if column_number:
xpath_str_fmt = listbox_basic_xpath_str + '//tr//%%s[%d]' % \
column_number
if self._is_legacy_listbox:
column_or_line_xpath_str = xpath_str_fmt % 'td'
else:
column_or_line_xpath_str = "%s | %s" % (xpath_str_fmt % 'th',
xpath_str_fmt % 'td')
else:
listbox_basic_xpath_str = self._listbox_table_xpath_str
# With XPATH, the position is context-dependent, therefore, as
# there the cells are either within a <thead> or <tbody>, the
# line number must be shifted by the number of header lines
# (namely 2)
if line_number <= 2:
relative_line_number = line_number
if self._is_legacy_listbox:
column_type = 'td'
else:
column_type = 'th'
else:
if self._is_legacy_listbox:
relative_line_number = line_number
else:
relative_line_number = line_number - 2
column_type = 'td'
column_or_line_xpath_str = listbox_basic_xpath_str + '//tr[%d]//%s' %\
(relative_line_number, column_type)
cell_list = self.etree.xpath(column_or_line_xpath_str)
# Iterate over the cells list until one the children content
# matches the expected text
for position, cell in enumerate(cell_list):
for child in cell.iterchildren():
if not child.text:
continue
if (strict and child.text == text) or \
(not strict and text in child.text):
return position + 1
raise LookupError("No matching cell with value '%s'" % text)
def getRemainingActivityCounter(self):
"""
Return the number of remaining activities, but do not visit the
URL so it does not interfere with next calls.
@return: The number of remaining activities
@rtype: int
"""
self._logger.debug("Checking the number of remaining activities")
activity_counter = self.mech_browser.open_novisit(
self._erp5_base_url + 'portal_activities/countMessage').read()
activity_counter = activity_counter and int(activity_counter) or 0
self._logger.debug("Remaining activities: %d" % activity_counter)
return activity_counter
from zope.testbrowser.browser import Form, ListControl
class LoginError(Exception):
"""
Exception raised when login fails
"""
pass
class MainForm(Form):
"""
Class defining convenient methods for the main form of ERP5. All the
methods specified are those always found in an ERP5 page in contrary
to L{ContextMainForm}.
"""
__metaclass__ = measurementMetaClass(prefix='submit')
def submit(self, label=None, name=None, class_attribute=None, index=None,
*args, **kwargs):
"""
Overriden for logging purpose, and for specifying a default index
to 0 if not set, thus avoiding AmbiguityError being raised (in
ERP5 there may be several submit fields with the same name).
Also, allows to select a submit by its class attribute, which
basically look for the first element whose C{attribute} is
C{class_attribute} then call C{submit} with the element C{name}.
@param class_attribute: Submit according to the class attribute
@type class_attribute: str
@raise LookupError: Could not find any element matching the given
class attribute name, if class_attribute
parameter is given.
"""
self.browser._logger.debug(
"Submitting (name='%s', label='%s', class='%s')" % (name, label,
class_attribute))
if class_attribute:
element_list = self.browser.etree.xpath('//*[contains(@class, "%s")]' % \
class_attribute)
try:
name = element_list[0].get('name')
except (IndexError, AttributeError):
name = None
if not name:
raise LookupError("Could not find any button whose class is '%s'" % \
class_attribute)
if label is None and name is None:
super(MainForm, self).submit(label=label, name=name, *args, **kwargs)
else:
if index is None:
index = 0
super(MainForm, self).submit(label=label, name=name, index=index,
*args, **kwargs)
def submitSelect(self, select_name, submit_name, label=None, value=None,
select_index=None, control_index=None):
"""
Get the select control whose name attribute is C{select_name},
then select the option control specified either by its C{label} or
C{value} within that select control, and finally submit it using
the submit control whose name attribute is C{submit_name}.
The C{value} matches an option value if found at the end of the
latter (excluding the query string), for example a search for
I{/logout} will match I{/erp5/logout} and I{/erp5/logout?foo=bar}
(if and only if C{value} contains no query string) but not
I{/erp5/logout_bar}.
Label value is searched as case-sensitive whole words within the
labels for each item--that is, a search for I{Add} will match
I{Add a contact} but not I{Address}. A word is defined as one or
more alphanumeric characters or the underline.
C{select_index} and C{control_index} have the same meaning as in
zope.testbrowser, namely to select a particular select or control
when the C{label} or C{value} is ambiguous.
@param select_name: Select control name
@type select_name: str
@param submit_name: Submit control name
@type submit_name: str
@param label: Label of the option control
@type label: str
@param value: Value of the option control
@type value: str
@param select_index: Index of the select if multiple matches
@type select_index: int
@param control_index: Index of the control if multiple matches
@type control_index: int
@raise LookupError: The select, option or submit control could not
be found
"""
select_control = self.getControl(name=select_name, index=select_index)
# zope.testbrowser checks for a whole word but it is also useful
# to match the end of the option control value string because in
# ERP5, the value could be URL (such as 'http://foo:81/erp5/logout')
if value:
for item in select_control.options:
if '?' not in value:
item = item.split('?')[0]
if item.endswith(value):
value = item
break
self.browser._logger.debug("select_id='%s', label='%s', value='%s'" % \
(select_name, label, value))
select_control.getControl(label=label, value=value,
index=control_index).selected = True
self.submit(name=submit_name)
def submitLogin(self):
"""
Log into ERP5 using the username and password provided in the
browser. It is assumed that the current page is the login page (by
calling C{open('login_form')} beforehand).
This method should rarely be used by scripts as login is already
performed upon instanciation of Browser class.
@raise LoginError: Login failed
@todo: Use information sent back as headers rather than looking
into the page content?
"""
self.browser._logger.debug("Logging in: username='%s', password='%s'" % \
(self.browser._username, self.browser._password))
self.getControl(name='__ac_name').value = self.browser._username
self.getControl(name='__ac_password').value = self.browser._password
self.submit()
if 'Logged In as' not in self.browser.contents:
raise LoginError("%s: Could not log in as '%s:%s'" % \
(self.browser._erp5_base_url,
self.browser._username,
self.browser._password))
self.browser._is_logged_in = True
def submitSelectFavourite(self, label=None, value=None, **kw):
"""
Select and submit a favourite, given either by its label (such as
I{Log out}) or value (I{/logout}). See L{submitSelect}.
"""
self.submitSelect('select_favorite', 'Base_doFavorite:method', label, value,
**kw)
def submitSelectModule(self, label=None, value=None, **kw):
"""
Select and submit a module, given either by its label (such as
I{Currencies}) or value (such as I{/glossary_module}). See
L{submitSelect}.
"""
self.submitSelect('select_module', 'Base_doModule:method', label, value,
**kw)
def submitSelectLanguage(self, label=None, value=None, **kw):
"""
Select and submit a language, given either by its label (such as
I{English}) or value (such as I{en}). See L{submitSelect}.
"""
self.submitSelect('select_language', 'Base_doLanguage:method', label, value)
def submitSearch(self, search_text):
"""
Fill search field with C{search_text} and submit it.
@param search_text: Text to search
@type search_text: str
"""
self.getControl(name='field_your_search_text').value = search_text
self.submit(name='ERP5Site_viewQuickSearchResultList:method')
def submitLogout(self):
"""
Perform logout.
"""
self.submitSelectFavourite(value='/logout')
import time
class ContextMainForm(MainForm):
"""
Class defining context-dependent convenient methods for the main
form of ERP5.
@todo:
- doListboxAction
- doContextListMode
- doContextSearch
- doContextSort
- doContextConfigure
- doContextButton
- doContextReport
- doContextExchange
"""
def submitSelectJump(self, label=None, value=None,
no_jump_transition_message=None,
maximum_attempt_number=1, sleep_between_attempt=0,
**kw):
"""
Select and submit a jump, given either by its label (such as
I{Queries}) or value (such as
I{/person_module/Base_jumpToRelatedObject?portal_type=Foo}). See
L{submitSelect}.
Usually, a transition message will be displayed if it was not possible to
jump (for example because the object has not been created yet), therefore
the number of attempts before failing can be specified if necessary.
@param no_jump_transition_message: Transition message displayed if the
jump could not be performed
@type no_jump_transition_message: str
@param maximum_attempt_number: Number of attempts before failing
@type maximum_attempt_number: int
@param sleep_between_attempt: Sleep N seconds between attempts
@type sleep_between_attempt: int
"""
if not no_jump_transition_message:
self.submitSelect('select_jump', 'Base_doJump:method',
label, value, **kw)
else:
current_attempt_counter = 0
while current_attempt_counter != maximum_attempt_number:
self.browser.mainForm.submitSelect('select_jump', 'Base_doJump:method',
label, value, **kw)
if no_jump_transition_message != self.browser.getTransitionMessage():
return current_attempt_counter * sleep_between_attempt
time.sleep(sleep_between_attempt)
current_attempt_counter += 1
raise AssertionError("Could not jump to related object")
def submitSelectAction(self, label=None, value=None, **kw):
"""
Select and submit an action, given either by its label (such as
I{Add Person}) or value (such as I{add} and I{add Person}). See
L{submitSelect}.
"""
self.submitSelect('select_action', 'Base_doAction:method', label, value,
**kw)
def submitCut(self):
"""
Cut the previously selected objects.
"""
self.submit(name='Folder_cut:method')
def submitCopy(self):
"""
Copy the previously selected objects.
"""
self.submit(name='Folder_copy:method')
def submitPaste(self):
"""
Paste the previously selected objects.
"""
self.submit(name='Folder_paste:method')
def submitPrint(self):
"""
Print the previously selected objects. Use the class attribute
rather than the name as the latter is dependent on the context.
"""
self.submit(class_attribute='print')
def submitReport(self):
"""
Create a report. Use the class attribute rather than the name as
the latter is dependent on the context.
"""
self.submit(class_attribute='report')
def submitNew(self):
"""
Create a new object.
"""
self.submit(name='Folder_create:method')
def submitDelete(self):
"""
Delete the previously selected objects.
"""
self.submit(name='Folder_deleteObjectList:method')
def submitSave(self):
"""
Save the previously selected objects.
"""
self.submit(name='Base_edit:method')
def submitShow(self):
"""
Show the previously selected objects.
"""
self.submit(name='Folder_show:method')
def submitFilter(self):
"""
Filter the objects.
"""
self.submit(name='Folder_filter:method')
def submitAction(self):
"""
Select/unselect objects.
"""
self.submit(name='Base_doSelect:method')
def submitSelectWorkflow(self, label=None, value=None,
script_id='viewWorkflowActionDialog',
maximum_attempt_number=1, sleep_between_attempt=0,
**kw):
"""
Select and submit a workflow action, given either by its label
(such as I{Create User}) or value (such as I{create_user_action}
in I{/Person_viewCreateUserActionDialog?workflow_action=create_user_action},
with C{script_id=Person_viewCreateUserActionDialog}). See L{submitSelect}.
When validating an object, L{submitDialogConfirm} allows to
perform the validation required on the next page.
As the Workflow action may not be available yet, it is possible to set the
maximum number of attempts and the sleep duration between each attempt.
@param script_id: Script identifier
@type script_id: str
@param maximum_attempt_number: Number of attempts before failing
@type maximum_attempt_number: int
@param sleep_between_attempt: Sleep N seconds between attempts
@type sleep_between_attempt: int
"""
def tryLegacyAndNew():
try:
self.browser.mainForm.submitSelect(
'select_action', 'Base_doAction:method', label,
value and '%s?workflow_action=%s' % (script_id, value), **kw)
except LookupError:
self.browser.mainForm.submitSelect(
'select_action', 'Base_doAction:method', label,
value and '%s?field_my_workflow_action=%s' % (script_id, value), **kw)
if maximum_attempt_number == 1:
tryLegacyAndNew()
else:
current_attempt_number = 1
while True:
try:
tryLegacyAndNew()
except LookupError:
if current_attempt_number == maximum_attempt_number:
raise
current_attempt_number += 1
time.sleep(sleep_between_attempt)
else:
break
return (current_attempt_number - 1) * sleep_between_attempt
def submitDialogCancel(self):
"""
Cancel the dialog action. A dialog is showed when validating a
workflow or deleting an object for example.
"""
self.submit(name='Base_cancel:method')
def submitDialogUpdate(self):
"""
Update the dialog action. A dialog may contain a button to update
the form before confirming it. See L{submitDialogConfirm} as well.
"""
self.submit(name='Base_showUpdateDialog:method')
def submitDialogConfirm(self):
"""
Confirm the dialog action. A dialog is showed when validating a
workflow or deleting an object for example.
@todo: Specifying index is kind of ugly (there is C{dummy} field
with the same name though)
"""
self.submit(name='Base_callDialogMethod:method')
def getListboxControl(self, line_number, column_number, cell_element_index=1,
*args, **kwargs):
"""
Get the control located at line and column numbers (both starting
from 1), excluding hidden control and those whose class is hidden
too. The position of a cell from a column or line number can be
obtained through calling
L{erp5.util.test_browser.browser.Browser.getListboxPosition}.
Also, there may be several elements within a cell, thus
C{cell_element_index} allows to select which one to get (starting
from 1).
@param line_number: Line number of the field
@type line_number: int
@param column_number: Column number of the field
@type column_number: int
@param cell_element_index: Index of the control to be selected in the cell
@type cell_element_index: int
@param args: positional arguments given to the parent C{getControl}
@type args: list
@param kwargs: keyword arguments given to the parent C{getControl}
@type kwargs: dict
@return: The control found at the given line and column numbers
@rtype: L{zope.testbrowser.interfaces.IControl}
@raise LookupError: No control could be found at the given
position and cell indexes
"""
if self.browser._is_legacy_listbox:
listbox_basic_xpath_str = self.browser._legacy_listbox_table_xpath_str
else:
listbox_basic_xpath_str = self.browser._listbox_table_xpath_str
if line_number <= 2:
relative_line_number = line_number
if self.browser._is_legacy_listbox:
column_type = 'td'
else:
column_type = 'th'
else:
if self.browser._is_legacy_listbox:
relative_line_number = line_number
else:
relative_line_number = line_number - 2
column_type = 'td'
xpath_str = '%s//tr[%d]//%s[%d]/*[not(@type="hidden") and ' \
'not(contains(@class, "hidden"))][%d]' % \
(listbox_basic_xpath_str,
relative_line_number,
column_type,
column_number,
cell_element_index)
# xpath() method always return a list even if there is only one element
element_list = self.browser.etree.xpath(xpath_str)
try:
input_element = element_list[0]
input_name = input_element.get('name')
except (IndexError, AttributeError):
input_element = input_name = None
if input_element is None or not input_name:
raise LookupError("Could not find control in listbox cell %dx%d (index=%d)" %\
(line_number, column_number, cell_element_index))
control = self.getControl(name=input_element.get('name'), *args, **kwargs)
# If this is a list control (radio button, checkbox or select
# control), then get the item from its value
if isinstance(control, ListControl):
control = control.getControl(value=input_element.get('value'))
return control
from zope.testbrowser.browser import Link
class LinkWithTime(Link):
"""
Only define to wrap click methods to measure the time spent
"""
__metaclass__ = measurementMetaClass(prefix='click')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from erp5.util.test_browser.browser import Browser
ITERATION = 20
def benchmarkAddPerson(iteration_counter, result_dict):
"""
Benchmark adding a person.
"""
# Create a browser instance
browser = Browser('http://localhost:18080/', 'erp5',
username='zope', password='zope')
# Open ERP5 homepage
browser.open()
# Go to Persons module (person_module)
browser.mainForm.submitSelectModule(value='/person_module')
# Create a new person and record the time elapsed in seconds
result_dict.setdefault('Create', []).append(
browser.mainForm.submitNew())
# Check whether it has been successfully created
assert browser.getTransitionMessage() == 'Object created.'
# Fill the first and last name of the newly created person
browser.mainForm.getControl(name='field_my_first_name').value = 'Foo%d' % \
iteration_counter
browser.mainForm.getControl(name='field_my_last_name').value = 'Bar%d' % \
iteration_counter
# Submit the changes, record the time elapsed in seconds
result_dict.setdefault('Save', []).append(
browser.mainForm.submitSave())
# Check whether the changes have been successfully updated
assert browser.getTransitionMessage() == 'Data updated.'
# Validate the person (as the workflow action may not be available yet, try
# 5 times and sleep 5s between each attempts before failing) and record
# time spent on confirmation
browser.mainForm.submitSelectWorkflow(value='validate_action',
maximum_attempt_number=5,
sleep_between_attempt=5)
result_dict.setdefault('Validate', []).append(
browser.mainForm.submitDialogConfirm())
# Check whether it has been successfully validated
assert browser.getTransitionMessage() == 'Status changed.'
## Go to the new person from the Persons module, showing how to use
## listbox API
# Go to Persons module first (person_module)
browser.mainForm.submitSelectModule(value='/person_module')
# Select all the persons whose Usual Name starts with Foo
browser.mainForm.getListboxControl(2, 2).value = 'Foo%'
result_dict.setdefault('Filter', []).append(
browser.mainForm.submit())
# Get the line number
line_number = browser.getListboxPosition("Foo%(counter)d Bar%(counter)d" % \
{'counter': iteration_counter},
column_number=2)
# From the column and line_number, we can now get the Link instance
link = browser.getListboxLink(line_number=line_number, column_number=2)
# Click on the link
link.click()
assert browser.mainForm.getControl(name='field_my_first_name').value == \
'Foo%d' % iteration_counter
if __name__ == '__main__':
# Run benchmarkAddPerson ITERATION times and compute the average time it
# took for each operation
result_dict = {}
counter = 0
while counter != ITERATION:
benchmarkAddPerson(counter, result_dict)
counter += 1
for title, time_list in result_dict.iteritems():
print "%s: %.4fs" % (title, float(sum(time_list)) / ITERATION)
...@@ -39,6 +39,10 @@ setup(name=name, ...@@ -39,6 +39,10 @@ setup(name=name,
], ],
extras_require={ extras_require={
'testnode': ['slapos.core', 'xml_marshaller'], 'testnode': ['slapos.core', 'xml_marshaller'],
'test_browser': ['zope.testbrowser >= 3.11.1', 'z3c.etestbrowser'],
'benchmark': [name+'[test_browser]'],
'benchmark-report': [name+'[benchmark]', 'matplotlib', 'numpy'],
'scalability_tester': [name+'[benchmark]', 'slapos.tool.nosqltester'],
}, },
zip_safe=True, zip_safe=True,
packages=find_packages(), packages=find_packages(),
...@@ -46,6 +50,9 @@ setup(name=name, ...@@ -46,6 +50,9 @@ setup(name=name,
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'testnode = erp5.util.testnode:main [testnode]', 'testnode = erp5.util.testnode:main [testnode]',
'performance_tester_erp5 = erp5.util.benchmark.performance_tester:main [benchmark]',
'scalability_tester_erp5 = erp5.util.benchmark.scalability_tester:main [scalability_tester]',
'generate_erp5_tester_report = erp5.util.benchmark.report:generateReport [benchmark-report]',
], ],
} }
) )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment