Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
erp5
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Titouan Soulard
erp5
Commits
c9a92f11
Commit
c9a92f11
authored
May 12, 2016
by
Julien Muchembled
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
In functional tests, try to kill subprocesses gracefully, but make sure they're killed
parent
10792f70
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
69 additions
and
29 deletions
+69
-29
product/ERP5Type/Utils.py
product/ERP5Type/Utils.py
+36
-0
product/ERP5Type/tests/ERP5TypeFunctionalTestCase.py
product/ERP5Type/tests/ERP5TypeFunctionalTestCase.py
+33
-29
No files found.
product/ERP5Type/Utils.py
View file @
c9a92f11
...
@@ -31,6 +31,7 @@
...
@@ -31,6 +31,7 @@
import
os
import
os
import
re
import
re
import
string
import
string
import
threading
import
time
import
time
import
warnings
import
warnings
import
sys
import
sys
...
@@ -1370,6 +1371,41 @@ def sleep(t=5):
...
@@ -1370,6 +1371,41 @@ def sleep(t=5):
"""
"""
time
.
sleep
(
t
)
time
.
sleep
(
t
)
def
stopProcess
(
process
,
graceful
=
5
):
if
process
.
pid
and
process
.
returncode
is
None
:
if
graceful
:
process
.
terminate
()
t
=
threading
.
Timer
(
graceful
,
process
.
kill
)
t
.
start
()
# PY3: use waitid(WNOWAIT) and call process.poll() after t.cancel()
r
=
process
.
wait
()
t
.
cancel
()
return
r
process
.
kill
()
return
process
.
wait
()
from
ctypes
import
CDLL
,
util
as
ctypes_util
,
get_errno
,
c_int
,
c_long
libc
=
CDLL
(
ctypes_util
.
find_library
(
'c'
),
use_errno
=
True
)
class
Prctl
(
object
):
def
__init__
(
self
,
option
,
nargs
=
1
):
self
.
option
=
option
self
.
args0
=
(
0
,)
*
(
4
-
nargs
)
def
__call__
(
self
,
*
args
):
try
:
prctl
=
self
.
_prctl
except
AttributeError
:
prctl
=
self
.
_prctl
=
libc
.
prctl
prctl
.
argtypes
=
c_int
,
c_long
,
c_long
,
c_long
,
c_long
r
=
prctl
(
self
.
option
,
*
(
args
+
self
.
args0
))
if
r
==
-
1
:
e
=
get_errno
()
raise
OSError
(
e
,
os
.
strerror
(
e
))
return
r
PR_SET_PDEATHSIG
=
Prctl
(
1
)
#####################################################
#####################################################
# Timezones
# Timezones
...
...
product/ERP5Type/tests/ERP5TypeFunctionalTestCase.py
View file @
c9a92f11
...
@@ -28,6 +28,7 @@
...
@@ -28,6 +28,7 @@
##############################################################################
##############################################################################
import
os
import
os
import
signal
import
sys
import
sys
import
time
import
time
import
re
import
re
...
@@ -38,6 +39,7 @@ from ZPublisher.HTTPResponse import HTTPResponse
...
@@ -38,6 +39,7 @@ from ZPublisher.HTTPResponse import HTTPResponse
from
zExceptions.ExceptionFormatter
import
format_exception
from
zExceptions.ExceptionFormatter
import
format_exception
from
Products.ERP5Type.tests.ERP5TypeTestCase
import
ERP5TypeTestCase
,
\
from
Products.ERP5Type.tests.ERP5TypeTestCase
import
ERP5TypeTestCase
,
\
_getConversionServerDict
_getConversionServerDict
from
Products.ERP5Type.Utils
import
stopProcess
,
PR_SET_PDEATHSIG
# REGEX FOR ZELENIUM TESTS
# REGEX FOR ZELENIUM TESTS
TEST_PASS_RE
=
re
.
compile
(
'<th[^>]*>Tests passed</th>
\
n
\
s*<
t
d[^>]*>([^<]*)'
)
TEST_PASS_RE
=
re
.
compile
(
'<th[^>]*>Tests passed</th>
\
n
\
s*<
t
d[^>]*>([^<]*)'
)
...
@@ -68,7 +70,20 @@ bt5_dir_list = ','.join([
...
@@ -68,7 +70,20 @@ bt5_dir_list = ','.join([
class TimeoutError(Exception):
class TimeoutError(Exception):
pass
pass
class Xvfb:
class Process(object):
def preexec_fn(self):
PR_SET_PDEATHSIG(signal.SIGTERM)
def _exec(self, *args, **kw):
self.process = subprocess.Popen(preexec_fn=self.preexec_fn, *args, **kw)
def quit(self):
if hasattr(self, '
process
'):
stopProcess(self.process)
del self.process
class Xvfb(Process):
def __init__(self, fbdir):
def __init__(self, fbdir):
self.display_list = [":%s" % i for i in range(123, 144)]
self.display_list = [":%s" % i for i in range(123, 144)]
self.display = None
self.display = None
...
@@ -77,7 +92,7 @@ class Xvfb:
...
@@ -77,7 +92,7 @@ class Xvfb:
def _runCommand(self, display):
def _runCommand(self, display):
xvfb_bin = os.environ.get("xvfb_bin", "Xvfb")
xvfb_bin = os.environ.get("xvfb_bin", "Xvfb")
with open(os.devnull, '
w
') as null:
with open(os.devnull, '
w
') as null:
self.
process = subprocess.Popen
(
self.
_exec
(
(xvfb_bin, '
-
fbdir
' , self.fbdir, display,
(xvfb_bin, '
-
fbdir
' , self.fbdir, display,
'
-
screen
', '
0
', '
1280
x1024x24
'),
'
-
screen
', '
0
', '
1280
x1024x24
'),
stdout=null, stderr=null, close_fds=True)
stdout=null, stderr=null, close_fds=True)
...
@@ -109,21 +124,13 @@ class Xvfb:
...
@@ -109,21 +124,13 @@ class Xvfb:
print '
Xvfb
:
%
d
' % self.process.pid
print '
Xvfb
:
%
d
' % self.process.pid
print '
Take
screenshots
using
xwud
-
in
%
s
/
Xvfb_screen0
' % self.fbdir
print '
Take
screenshots
using
xwud
-
in
%
s
/
Xvfb_screen0
' % self.fbdir
def quit(self):
class Browser(Process):
if hasattr(self, '
process
'):
self.process.terminate()
class Browser:
def __init__(self, profile_dir, host, port):
def __init__(self, profile_dir, host, port):
self.profile_dir = profile_dir
self.profile_dir = profile_dir
self.host = host
self.host = host
self.port = port
self.port = port
def quit(self):
if getattr(self, "process", None) is not None:
self.process.kill()
def _run(self, url, display):
def _run(self, url, display):
""" This method should be implemented on a subclass """
""" This method should be implemented on a subclass """
raise NotImplementedError
raise NotImplementedError
...
@@ -160,8 +167,7 @@ class Browser:
...
@@ -160,8 +167,7 @@ class Browser:
self.environ["XAUTHORITY"] = xauth
self.environ["XAUTHORITY"] = xauth
def _runCommand(self, *args):
def _runCommand(self, *args):
print " ".join(args)
self._exec(args, close_fds=True, env=self.environ)
self.process = subprocess.Popen(args, close_fds=True, env=self.environ)
class Firefox(Browser):
class Firefox(Browser):
""" Use firefox to open run all the tests"""
""" Use firefox to open run all the tests"""
...
@@ -300,32 +306,30 @@ class FunctionalTestRunner:
...
@@ -300,32 +306,30 @@ class FunctionalTestRunner:
self.user, self.password)
self.user, self.password)
def test(self, debug=0):
def test(self, debug=0):
try:
xvfb = Xvfb(self.instance_home)
xvfb = Xvfb(self.instance_home)
start = time.time()
try:
end = time.time() + self.timeout
if not debug:
if not debug:
print("
\
n
Set 'erp5_debug_mode' environment variable to 1"
print("
\
n
Set 'erp5_debug_mode' environment variable to 1"
" to use your existing display instead of Xvfb.")
" to use your existing display instead of Xvfb.")
xvfb.run()
xvfb.run()
try:
self.browser.run(self._getTestURL() , xvfb.display)
self.browser.run(self._getTestURL() , xvfb.display)
while True
:
while time.time() < end
:
status = self.getStatus()
status = self.getStatus()
if status is not None and not '>ONGOING<'
in status:
if status and '>ONGOING<' not
in status:
break
break
time.sleep(10)
time.sleep(10)
if (time.time() - start) > float(self.timeout):
if self.browser.process.poll() is not None:
raise RuntimeError('Test browser is no longer running.')
else:
# TODO: here we could take a screenshot and display it in the report
# TODO: here we could take a screenshot and display it in the report
# (maybe using data: scheme inside a <img>)
# (maybe using data: scheme inside a <img>)
raise TimeoutError("Test took more than %s seconds" % self.timeout)
raise TimeoutError("Test took more than %s seconds" % self.timeout)
if self.browser.process.poll():
raise RuntimeError('Test browser is no longer running.')
except:
print("ERP5TypeFunctionalTestCase.test Exception: %r" % (sys.exc_info(),))
raise
finally:
finally:
xvfb.quit()
if getattr(self, "browser", None) is not None:
self.browser.quit()
self.browser.quit()
finally:
xvfb.quit()
def processResult(self):
def processResult(self):
file_content = self.getStatus().encode("utf-8", "replace")
file_content = self.getStatus().encode("utf-8", "replace")
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment