Polish seleniumrunner, use work from ERP5

parent 9603db97
##############################################################################
#
# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
# Kazuhiko <kazuhiko@nexedi.com>
# Rafael Monnerat <rafael@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
# XXX-Cedric : This is an import of
# http://git.erp5.org/gitweb/erp5.git/blob/HEAD:/product/ERP5Type/tests/ERP5TypeFunctionalTestCase.py
# Modification of the present file should be ported back to this original file.
import os
import time
import signal
import re
from subprocess import Popen, PIPE
import shutil
#from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase, \
# _getConversionServerDict
# REGEX FOR ZELENIUM TESTS
TEST_PASS_RE = re.compile('<th[^>]*>Tests passed</th>\n\s*<td[^>]*>([^<]*)')
TEST_FAILURE_RE = re.compile('<th[^>]*>Tests failed</th>\n\s*<td[^>]*>([^<]*)')
IMAGE_RE = re.compile('<img[^>]*?>')
TEST_ERROR_TITLE_RE = re.compile('(?:error.gif.*?>|title status_failed"><td[^>]*>)([^>]*?)</td></tr>', re.S)
TEST_RESULT_RE = re.compile('<div style="padding-top: 10px;">\s*<p>\s*'
'<img.*?</div>\s.*?</div>\s*', re.S)
TEST_ERROR_RESULT_RE = re.compile('.*(?:error.gif|title status_failed).*', re.S)
ZELENIUM_BASE_URL = "%s/portal_tests/%s/core/TestRunner.html?test=../test_suite_html&auto=on&resultsUrl=%s/portal_tests/postResults&__ac_name=%s&__ac_password=%s"
tests_framework_home = os.path.dirname(os.path.abspath(__file__))
# handle 'system global' instance
if tests_framework_home.startswith('/usr/lib'):
real_instance_home = '/var/lib/erp5'
else:
real_instance_home = os.path.sep.join(
tests_framework_home.split(os.path.sep)[:-3])
instance_home = os.path.join(real_instance_home, 'unit_test')
bt5_dir_list = ','.join([
os.path.join(instance_home, 'Products/ERP5/bootstrap'),
os.path.join(instance_home, 'bt5')])
class TimeoutError(Exception):
pass
class Xvfb:
def __init__(self, xvfb_location, fbdir):
self.display_list = [":%s" % i for i in range(123, 144)]
self.display = None
self.fbdir = fbdir
self.pid = None
self.xvfb_location = xvfb_location
def _runCommand(self, display):
command = [self.xvfb_location, '-fbdir' , self.fbdir, display]
self.process = Popen(" ".join(command),
stdout=PIPE,
stderr=PIPE,
shell=True,
close_fds=True)
def run(self):
for display_try in self.display_list:
lock_filepath = '/tmp/.X%s-lock' % display_try.replace(":", "")
if not os.path.exists(lock_filepath):
self._runCommand(display_try)
self.display = display_try
break
#display = os.environ.get('DISPLAY')
#if display:
# auth = Popen(['xauth', 'list', display], stdout=PIPE).communicate()[0]
# if auth:
# (displayname, protocolname, hexkey) = auth.split()
# Popen(['xauth', 'add', 'localhost/unix:%s' % display, protocolname, hexkey])
print 'Xvfb : %d' % self.process.pid
print 'Take screenshots using xwud -in %s/Xvfb_screen0' % self.fbdir
def quit(self):
if hasattr(self, 'process'):
process_pid = self.process.pid
try:
self.process.terminate()
finally:
if process_pid:
print "Stopping Xvfb on pid: %s" % self.pid
os.kill(process_pid, signal.SIGTERM)
class Browser:
use_xvfb = 1
def __init__(self, bin_location, profile_dir, host):
self.bin_location = bin_location
self.profile_dir = profile_dir
self.host = host
self.pid = None
def quit(self):
if self.pid:
os.kill(self.pid, signal.SIGTERM)
def _run(self, url, display):
""" This method should be implemented on a subclass """
raise NotImplementedError
def _setEnviron(self):
pass
def run(self, url, display):
self.clean()
self._setEnviron()
self._setDisplay(display)
self._run(url)
print "Browser %s running on pid: %s" % (self.__class__.__name__, self.pid)
def clean(self):
""" Clean up removing profile dir and recreating it"""
os.system("rm -rf %s" % self.profile_dir)
os.mkdir(self.profile_dir)
def _createFile(self, filename, content):
file_path = os.path.join(self.profile_dir, filename)
with open(file_path, 'w') as f:
f.write(content)
return file_path
def _setDisplay(self, display):
if display is None:
try:
shutil.copy2(os.path.expanduser('~/.Xauthority'), '%s/.Xauthority' % self.profile_dir)
except IOError:
pass
else:
os.environ["DISPLAY"] = display
def _runCommand(self, command_tuple):
print " ".join(list(command_tuple))
self.pid = os.spawnlp(os.P_NOWAIT, *command_tuple)
class Firefox(Browser):
""" Use firefox to open run all the tests"""
def _setEnviron(self):
os.environ['MOZ_NO_REMOTE'] = '1'
os.environ['HOME'] = self.profile_dir
os.environ['LC_ALL'] = 'C'
os.environ["MOZ_CRASHREPORTER_DISABLE"] = "1"
os.environ["NO_EM_RESTART"] = "1"
def _run(self, url):
# Prepare to run
self._createFile('prefs.js', self.getPrefJs())
self._runCommand((self.bin_location, "firefox", "-no-remote",
"-profile", self.profile_dir, url))
os.environ['MOZ_NO_REMOTE'] = '0'
def getPrefJs(self):
return """
// Don't ask if we want to switch default browsers
user_pref("browser.shell.checkDefaultBrowser", false);
// Disable pop-up blocking
user_pref("browser.allowpopups", true);
user_pref("dom.disable_open_during_load", false);
// Configure us as the local proxy
//user_pref("network.proxy.type", 2);
// Disable security warnings
user_pref("security.warn_submit_insecure", false);
user_pref("security.warn_submit_insecure.show_once", false);
user_pref("security.warn_entering_secure", false);
user_pref("security.warn_entering_secure.show_once", false);
user_pref("security.warn_entering_weak", false);
user_pref("security.warn_entering_weak.show_once", false);
user_pref("security.warn_leaving_secure", false);
user_pref("security.warn_leaving_secure.show_once", false);
user_pref("security.warn_viewing_mixed", false);
user_pref("security.warn_viewing_mixed.show_once", false);
// Disable "do you want to remember this password?"
user_pref("signon.rememberSignons", false);
// increase the timeout before warning of unresponsive script
user_pref("dom.max_script_run_time", 120);
// this is required to upload files
user_pref("capability.principal.codebase.p1.granted", "UniversalFileRead");
user_pref("signed.applets.codebase_principal_support", true);
user_pref("capability.principal.codebase.p1.id", "http://%s");
user_pref("capability.principal.codebase.p1.subjectName", "");""" % \
(self.host)
class PhantomJS(Browser):
def _createRunJS(self):
run_js = """
var page = new WebPage(),
address;
address = phantom.args[0];
page.open(address, function (status) {
if (status !== 'success') {
console.log('FAIL to load the address');
} else {
console.log('SUCCESS load the address');
}
phantom.exit();
});
"""
return self._createFile('run.js', run_js)
def _run(self, url):
self._runCommand(("phantomjs", "phantomjs", self._createRunJS(), url))
class FunctionalTestRunner:
# There is no test that can take more them 24 hours
timeout = 2.0 * 60 * 60
def __init__(self, host, port, portal, run_only='', use_phanthom=False):
self.instance_home = os.environ['INSTANCE_HOME']
# Such informations should be automatically loaded
self.user = 'ERP5TypeTestCase'
self.password = ''
self.run_only = run_only
profile_dir = os.path.join(self.instance_home, 'profile')
self.portal = portal
if use_phanthom:
self.browser = PhantomJS(profile_dir, host, int(port))
else:
self.browser = Firefox(profile_dir, host, int(port))
def getStatus(self):
transaction.commit()
return self.portal.portal_tests.TestTool_getResults()
def _getTestURL(self):
return ZELENIUM_BASE_URL % (self.portal.portal_url(), self.run_only,
self.portal.portal_url(), self.user, self.password)
def test(self, debug=0):
xvfb = Xvfb(self.instance_home)
try:
start = time.time()
if not debug and self.browser.use_xvfb:
xvfb.run()
self.browser.run(self._getTestURL() , xvfb.display)
while self.getStatus() is None:
time.sleep(10)
if (time.time() - start) > float(self.timeout):
raise TimeoutError("Test took more them %s seconds" % self.timeout)
finally:
self.browser.quit()
xvfb.quit()
def processResult(self):
file_content = self.getStatus().encode("utf-8", "replace")
sucess_amount = TEST_PASS_RE.search(file_content).group(1)
failure_amount = TEST_FAILURE_RE.search(file_content).group(1)
error_title_list = [re.compile('\s+').sub(' ', x).strip()
for x in TEST_ERROR_TITLE_RE.findall(file_content)]
detail = ''
for test_result in TEST_RESULT_RE.findall(file_content):
if TEST_ERROR_RESULT_RE.match(test_result):
detail += test_result
detail = IMAGE_RE.sub('', detail)
if detail:
detail = IMAGE_RE.sub('', detail)
detail = '''<html>
<head>
<style type="text/css">tr.status_failed { background-color:red };</style>
</head>
<body>%s</body>
</html>''' % detail
# When return fix output for handle unicode issues.
return detail, int(sucess_amount), int(failure_amount), error_title_list
class ERP5TypeFunctionalTestCase:
run_only = ""
foreground = 0
use_phanthom = False
def getTitle(self):
return "Zelenium"
def afterSetUp(self):
# create browser_id_manager
if not "browser_id_manager" in self.portal.objectIds():
self.portal.manage_addProduct['Sessions'].constructBrowserIdManager()
transaction.commit()
self.setSystemPreference()
self.portal.portal_tests.TestTool_cleanUpTestResults()
self.stepTic()
def setSystemPreference(self):
conversion_dict = _getConversionServerDict()
self.portal.Zuite_setPreference(
working_copy_list=bt5_dir_list,
conversion_server_hostname=conversion_dict['hostname'],
conversion_server_port=conversion_dict['port']
)
# XXX Memcached is missing
# XXX Persistent cache setup is missing
def testFunctionalTestRunner(self):
# first of all, abort to get rid of the mysql participation inn this
# transaction
self.portal._p_jar.sync()
self.runner = FunctionalTestRunner(self.serverhost, self.serverport,
self.portal, self.run_only, self.use_phanthom)
self.runner.test(debug=self.foreground)
detail, success, failure, error_title_list = self.runner.processResult()
self.logMessage("-" * 79)
total = success + failure
self.logMessage("%s Functional Tests %s Tests, %s Failures" % \
(self.getTitle(), total, failure))
self.logMessage("-" * 79)
self.logMessage(detail)
self.logMessage("-" * 79)
self.assertEquals([], error_title_list)
......@@ -61,7 +61,8 @@ class Recipe(BaseSlapRecipe):
base_url = self.parameter_dict['url'],
browser_argument_list = [],
user = self.parameter_dict['user'],
password = self.parameter_dict['password'])
password = self.parameter_dict['password'],
etc_directory = self.etc_directory)
# Check wanted browser XXX-Cedric not yet used but can be useful
#if self.parameter_dict.get('browser', None) is None:
......
......@@ -48,6 +48,7 @@ IMAGE_RE = re.compile('<img[^>]*?>')
TEST_ERROR_TITLE_RE = re.compile('(?:error.gif.*?>|title status_failed"><td[^>]*>)([^>]*?)</td></tr>', re.S)
TEST_RESULT_RE = re.compile('<div style="padding-top: 10px;">\s*<p>\s*'
'<img.*?</div>\s.*?</div>\s*', re.S)
DURATION_RE = re.compile('<th[^>]*>Elapsed time \(sec\)</th>\n\s*<td[^>]*>([^<]*)')
TEST_ERROR_RESULT_RE = re.compile('.*(?:error.gif|title status_failed).*', re.S)
......@@ -113,7 +114,7 @@ class ERP5TestReportHandler:
)
self.connection_helper = ConnectionHelper(url)
self.suite_name = suite_name
self.test_id = "20111026-18C84076DE29E8"
self.test_id = "20111025-1F0BF8263511D4"
def reportStart(self):
# report that test is running
......@@ -125,62 +126,41 @@ class ERP5TestReportHandler:
def reportFinished(self, out_file):
# make file parsable by erp5_test_results
out_file, success, failure, duration, error_title_list = self.processResult(
out_file)
#tempout = tempfile.mkstemp()[1]
#templog = tempfile.mkstemp()[1]
#log_lines = open(out_file, 'r').readlines()
#tl = open(templog, 'w')
#tl.write(TB_SEP + '\n')
#for log_line in log_lines:
# starts = log_line.startswith
# if starts('Ran') or starts('FAILED') or starts('OK') or starts(TB_SEP):
# continue
# if starts('ERROR: ') or starts('FAIL: '):
# tl.write('internal-test: ' + log_line)
# continue
# tl.write(log_line)
#
#tl.write("----------------------------------------------------------------------\n")
#tl.write('Ran %s test in %.2fs\n' % (test_count, duration))
#if success:
# tl.write('OK\n')
#else:
# tl.write('FAILED (failures=%s)\n' % failure)
#tl.write(TB_SEP + '\n')
out_file, success, failure, duration = self.processResult(out_file)
# XXX-Cedric : make correct display in test_result_module
tempcmd = tempfile.mkstemp()[1]
tempcmd2 = tempfile.mkstemp()[1]
tempout = tempfile.mkstemp()[1]
templog = tempfile.mkstemp()[1]
tl = open(templog, 'w')
tl.write(TB_SEP + '\n')
tl.write(out_file)
tl.write("----------------------------------------------------------------------\n")
tl.write('Ran 1 test in %.2fs\n' % duration)
if success:
tl.write('OK\n')
else:
tl.write('FAILED (failures=1)\n')
tl.write(TB_SEP + '\n')
tl.close()
open(tempcmd, 'w').write(""" %s""" % self.suite_name)
# create nice zip archive
tempzip = tempfile.mkstemp()[1]
zip = zipfile.ZipFile(tempzip, 'w')
# temperr is dummy
temperr = tempfile.mkstemp()[1]
open(temperr, 'w')
i = 1
test_case_list = [dict(name="first", log="OK\n"), dict(name="second", log="FAILED (failures=1)\n")]
for test_case in test_case_list:
tempcmd = tempfile.mkstemp()[1]
tempout = tempfile.mkstemp()[1]
open(tempcmd, 'w').write(test_case['name'])
open(tempout, 'w').write(test_case['log'])
#XXX-Cedric : support cases for more than 9 test cases
zip.write(tempcmd, '%s/00%s/cmdline' % (self.suite_name, i))
zip.write(tempout, '%s/00%s/stderr' % (self.suite_name, i))
zip.write(temperr, '%s/00%s/stdout' % (self.suite_name, i))
zip.write(tempout, '%s/001/stdout' % self.suite_name)
zip.write(templog, '%s/001/stderr' % self.suite_name)
zip.write(tempcmd, '%s/001/cmdline' % self.suite_name)
zip.close()
os.unlink(templog)
os.unlink(tempcmd)
os.unlink(tempout)
i = i + 1
zip.close()
os.unlink(temperr)
os.unlink(tempcmd2)
# post it to ERP5
self.connection_helper.POST('TestResultModule_reportCompleted', dict(
test_report_id=self.test_id),
file_list=[('filepath', tempzip)]
)
import pdb; pdb.set_trace()
self.connection_helper.POST('TestResultModule_reportCompleted',
dict(test_report_id=self.test_id), file_list=[('filepath', tempzip)])
os.unlink(tempzip)
def processResult(self, out_file):
......@@ -189,9 +169,7 @@ class ERP5TestReportHandler:
failure_amount = TEST_FAILURE_RE.search(file_content).group(1)
error_title_list = [re.compile('\s+').sub(' ', x).strip()
for x in TEST_ERROR_TITLE_RE.findall(file_content)]
# XXX-Cedric : duration
duration = 0
duration = DURATION_RE.search(file_content).group(1)
detail = ''
for test_result in TEST_RESULT_RE.findall(file_content):
if TEST_ERROR_RESULT_RE.match(test_result):
......@@ -206,6 +184,4 @@ class ERP5TestReportHandler:
</head>
<body>%s</body>
</html>''' % detail
return detail, int(sucess_amount), int(failure_amount), duration, \
error_title_list
return detail, int(sucess_amount), int(failure_amount), float(duration)
......@@ -26,9 +26,10 @@
#############################################################################
from erp5functionaltestreporthandler import ERP5TestReportHandler
from ERP5TypeFunctionalTestCase import Xvfb, Firefox, TimeoutError
from time import sleep
import time
import os
from subprocess import Popen, PIPE
import urllib2
def run(args):
......@@ -37,6 +38,9 @@ def run(args):
test_url = assembleTestUrl(config['base_url'], config['suite_name'],
config['user'], config['password'])
# There is no test that can take more them 24 hours
timeout = 2.0 * 60 * 60
while True:
erp5_report = ERP5TestReportHandler(config['test_suite_master_url'],
config['project'] + '@' + config['suite_name'])
......@@ -45,32 +49,46 @@ def run(args):
config['base_url'], config['user'], config['password']))
# TODO assert getresult is None
#XXX-Cedric : clean firefox data (so that it does not load previous session)
#xvfb = Popen([config['xvfb_binary'], config['display']], stdout=PIPE)
#os.environ['DISPLAY'] = config['display']
#sleep(10)
command = []
command.append(config['browser_binary'])
for browser_argument in config['browser_argument_list']:
command.append(browser_argument)
command.append(test_url)
browser = Popen(command, stdout=PIPE)
#
#command = []
#command.append(config['browser_binary'])
#for browser_argument in config['browser_argument_list']:
# command.append(browser_argument)
#command.append(test_url)
#browser = Popen(command, stdout=PIPE)
#
#erp5_report.reportStart()
#
## Wait for test to be finished
#while getStatus(config['base_url']) is '':
# sleep(10)
os.environ['DISPLAY'] = config['display']
xvfb = Xvfb(config['xvfb_binary'], config['etc_directory'])
profile_dir = os.path.join(config['etc_directory'], 'profile')
browser = Firefox(config['browser_binary'], profile_dir, config['base_url'])
try:
start = time.time()
xvfb.run()
profile_dir = os.path.join(config['etc_directory'], 'profile')
browser.run(test_url , xvfb.display)
erp5_report.reportStart()
# Wait for test to be finished
while getStatus(config['base_url']) is '':
sleep(1) #10
import pdb; pdb.set_trace()
time.sleep(10)
if (time.time() - start) > float(timeout):
raise TimeoutError("Test took more them %s seconds" % timeout)
except TimeoutError:
continue
finally:
browser.quit()
xvfb.quit()
erp5_report.reportFinished(getStatus(config['base_url']).encode("utf-8",
"replace"))
browser.kill()
#terminateXvfb(xvfb, config['display'])
print("Test finished and report sent, sleeping.")
sleep(3600)
......@@ -108,8 +126,8 @@ def assembleTestUrl(base_url, suite_name, user, password):
base_url, suite_name, base_url, user, password)
return test_url
def terminateXvfb(process, display):
process.kill()
lock_filepath = '/tmp/.X%s-lock' % display.replace(":", "")
if os.path.exists(lock_filepath):
os.system('rm %s' % lock_filepath)
#def terminateXvfb(process, display):
# process.kill()
# lock_filepath = '/tmp/.X%s-lock' % display.replace(":", "")
# if os.path.exists(lock_filepath):
# os.system('rm %s' % lock_filepath)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment