Commit 922a597b authored by Arnaud Fontaine's avatar Arnaud Fontaine

Fix KeyboardInterrupt on control process where the results were not

properly flushed before exiting and terminate children properly.

Also, terminate children after a given number of errors (based upon a
patch from jm).

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@46015 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent fa44baa3
...@@ -33,11 +33,15 @@ import os ...@@ -33,11 +33,15 @@ import os
import sys import sys
import multiprocessing import multiprocessing
import xmlrpclib import xmlrpclib
import signal
import errno
from erp5.utils.benchmark.argument import ArgumentType from erp5.utils.benchmark.argument import ArgumentType
from erp5.utils.benchmark.process import BenchmarkProcess from erp5.utils.benchmark.process import BenchmarkProcess
from erp5.utils.benchmark.result import ERP5BenchmarkResult, CSVBenchmarkResult from erp5.utils.benchmark.result import ERP5BenchmarkResult, CSVBenchmarkResult
MAXIMUM_KEYBOARD_INTERRUPT = 5
class PerformanceTester(object): class PerformanceTester(object):
def __init__(self, namespace=None): def __init__(self, namespace=None):
if not namespace: if not namespace:
...@@ -46,6 +50,8 @@ class PerformanceTester(object): ...@@ -46,6 +50,8 @@ class PerformanceTester(object):
else: else:
self._argument_namespace = namespace self._argument_namespace = namespace
self._process_terminated_counter = 0
@staticmethod @staticmethod
def _add_parser_arguments(parser): def _add_parser_arguments(parser):
# Optional arguments # Optional arguments
...@@ -189,6 +195,9 @@ class PerformanceTester(object): ...@@ -189,6 +195,9 @@ class PerformanceTester(object):
ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url, ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url,
error_message_set) error_message_set)
def _child_terminated_handler(self, *args, **kwargs):
self._process_terminated_counter += 1
def _run_constant(self, nb_users): def _run_constant(self, nb_users):
process_list = [] process_list = []
exit_msg_queue = multiprocessing.Queue(nb_users) exit_msg_queue = multiprocessing.Queue(nb_users)
...@@ -202,33 +211,41 @@ class PerformanceTester(object): ...@@ -202,33 +211,41 @@ class PerformanceTester(object):
process_list.append(process) process_list.append(process)
signal.signal(signal.SIGCHLD, self._child_terminated_handler)
for process in process_list: for process in process_list:
process.start() process.start()
error_message_set = set() error_message_set = set()
i = 0 interrupted_counter = 0
while i != len(process_list): while self._process_terminated_counter != len(process_list):
try: try:
msg = exit_msg_queue.get() error_message = exit_msg_queue.get()
except KeyboardInterrupt:
if self._argument_namespace.repeat != -1:
print >>sys.stderr, "Stopping gracefully"
for process in process_list:
process.terminate()
i = 0 except KeyboardInterrupt, e:
continue if interrupted_counter == 0:
else: print >>sys.stderr, "\nInterrupted by user, stopping gracefully " \
msg = None "unless interrupted %d times" % MAXIMUM_KEYBOARD_INTERRUPT
interrupted_counter += 1
if msg is not None:
error_message_set.add(msg)
for process in process_list: for process in process_list:
if (not getattr(process, '_stopping', False) or
interrupted_counter == MAXIMUM_KEYBOARD_INTERRUPT):
process._stopping = True
process.terminate() process.terminate()
break # An IOError may be raised when receiving a SIGCHLD which interrupts the
# blocking system call above and the system call should not be restarted
# (using siginterrupt), otherwise the process will stall forever as its
# child has already exited
except IOError, e:
if e.errno == errno.EINTR:
continue
i += 1 else:
if error_message is not None:
error_message_set.add(error_message)
if error_message_set: if error_message_set:
return (error_message_set, 1) return (error_message_set, 1)
......
...@@ -36,6 +36,9 @@ import sys ...@@ -36,6 +36,9 @@ import sys
from erp5.utils.test_browser.browser import Browser from erp5.utils.test_browser.browser import Browser
MAXIMUM_ERROR_COUNTER = 10
RESULT_NUMBER_BEFORE_FLUSHING = 100
class BenchmarkProcess(multiprocessing.Process): class BenchmarkProcess(multiprocessing.Process):
def __init__(self, exit_msg_queue, result_klass, argument_namespace, def __init__(self, exit_msg_queue, result_klass, argument_namespace,
nb_users, user_index, *args, **kwargs): nb_users, user_index, *args, **kwargs):
...@@ -48,6 +51,7 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -48,6 +51,7 @@ class BenchmarkProcess(multiprocessing.Process):
# Initialized when running the test # Initialized when running the test
self._browser = None self._browser = None
self._current_repeat = 1 self._current_repeat = 1
self._error_counter = 0
super(BenchmarkProcess, self).__init__(*args, **kwargs) super(BenchmarkProcess, self).__init__(*args, **kwargs)
...@@ -71,6 +75,9 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -71,6 +75,9 @@ class BenchmarkProcess(multiprocessing.Process):
try: try:
target(result, self._browser) target(result, self._browser)
except BaseException, e: except BaseException, e:
if isinstance(e, StopIteration):
raise
msg = "%s: %s" % (target, traceback.format_exc()) msg = "%s: %s" % (target, traceback.format_exc())
if (self._argument_namespace.enable_debug and isinstance(e, Exception)): if (self._argument_namespace.enable_debug and isinstance(e, Exception)):
...@@ -79,10 +86,11 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -79,10 +86,11 @@ class BenchmarkProcess(multiprocessing.Process):
except: except:
pass pass
if self._current_repeat == 1: if (self._current_repeat == 1 or
self._logger.error(msg) self._error_counter == MAXIMUM_ERROR_COUNTER):
raise raise RuntimeError(msg)
self._error_counter += 1
self._logger.warning(msg) self._logger.warning(msg)
for stat in result.getCurrentSuiteStatList(): for stat in result.getCurrentSuiteStatList():
...@@ -95,13 +103,11 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -95,13 +103,11 @@ class BenchmarkProcess(multiprocessing.Process):
stat.standard_deviation, stat.standard_deviation,
stat.maximum)) stat.maximum))
if self._argument_namespace.max_global_average and \ if (self._argument_namespace.max_global_average and
mean > self._argument_namespace.max_global_average: mean > self._argument_namespace.max_global_average):
self._logger.info("Stopping as mean is greater than maximum " raise RuntimeError("Stopping as mean is greater than maximum "
"global average") "global average")
raise StopIteration
result.exitSuite() result.exitSuite()
result.iterationFinished() result.iterationFinished()
...@@ -113,9 +119,13 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -113,9 +119,13 @@ class BenchmarkProcess(multiprocessing.Process):
self._logger = result_instance.getLogger() self._logger = result_instance.getLogger()
if self._argument_namespace.repeat != -1: # Ensure the data are flushed before exiting, handled by Result class
# __exit__ block
signal.signal(signal.SIGTERM, self.stopGracefully) signal.signal(signal.SIGTERM, self.stopGracefully)
# Ignore KeyboardInterrupt as it is handled by the parent process
signal.signal(signal.SIGINT, signal.SIG_IGN)
exit_status = 0 exit_status = 0
exit_msg = None exit_msg = None
...@@ -128,16 +138,15 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -128,16 +138,15 @@ class BenchmarkProcess(multiprocessing.Process):
self.runBenchmarkSuiteList(result) self.runBenchmarkSuiteList(result)
self._current_repeat += 1 self._current_repeat += 1
if self._current_repeat == 100: if self._current_repeat == RESULT_NUMBER_BEFORE_FLUSHING:
result.flush() result.flush()
except StopIteration, e: except StopIteration, e:
exit_msg = str(e) self._logger.error(e)
exit_status = 1
except BaseException, e: except BaseException, e:
exit_msg = e exit_msg = e
exit_status = 2 exit_status = 1
self._exit_msg_queue.put(exit_msg) self._exit_msg_queue.put(exit_msg)
sys.exit(exit_status) sys.exit(exit_status)
...@@ -214,15 +214,10 @@ class CSVBenchmarkResult(BenchmarkResult): ...@@ -214,15 +214,10 @@ class CSVBenchmarkResult(BenchmarkResult):
super(CSVBenchmarkResult, self).__exit__(exc_type, exc_value, traceback) super(CSVBenchmarkResult, self).__exit__(exc_type, exc_value, traceback)
self._result_file.close() self._result_file.close()
if exc_type: if exc_type and not issubclass(exc_type, StopIteration):
msg = "An error occured, see: %s" % self._log_filename_path msg = "An error occured, see: %s" % self._log_filename_path
from traceback import format_tb self.getLogger().error("%s: %s" % (exc_type, exc_value))
self.getLogger().error("%s: %s\n%s" % (exc_type, exc_value, raise RuntimeError(msg)
''.join(format_tb(traceback))))
if isinstance(exc_type, StopIteration):
raise StopIteration, msg
else:
raise RuntimeError, msg
from cStringIO import StringIO from cStringIO import StringIO
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment