Commit dee188f8 authored by Arnaud Fontaine's avatar Arnaud Fontaine

multiprocessing does not cope well with terminate() when it is not followed
straight away by join(), as it needs to send, for example, the exit status over
a pipe, and the SIGCHLD was also interrupting this pipe.

Moreover, there was a race condition in the previous code which, in some cases,
left some children behind because some SIGCHLD signals got lost.

Also, ignore further SIGTERM once one has been received, to avoid receiving
another one before putting the result into the queue and interrupting
multiprocessing magic.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@46025 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 4b5f59c2
...@@ -50,8 +50,6 @@ class PerformanceTester(object): ...@@ -50,8 +50,6 @@ class PerformanceTester(object):
else: else:
self._argument_namespace = namespace self._argument_namespace = namespace
self._process_terminated_counter = 0
@staticmethod @staticmethod
def _add_parser_arguments(parser): def _add_parser_arguments(parser):
# Optional arguments # Optional arguments
...@@ -195,9 +193,6 @@ class PerformanceTester(object): ...@@ -195,9 +193,6 @@ class PerformanceTester(object):
ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url, ERP5BenchmarkResult.closeResultDocument(self._argument_namespace.erp5_publish_url,
error_message_set) error_message_set)
def _child_terminated_handler(self, *args, **kwargs):
self._process_terminated_counter += 1
def _run_constant(self, nb_users): def _run_constant(self, nb_users):
process_list = [] process_list = []
exit_msg_queue = multiprocessing.Queue(nb_users) exit_msg_queue = multiprocessing.Queue(nb_users)
...@@ -211,32 +206,24 @@ class PerformanceTester(object): ...@@ -211,32 +206,24 @@ class PerformanceTester(object):
process_list.append(process) process_list.append(process)
signal.signal(signal.SIGCHLD, self._child_terminated_handler)
for process in process_list: for process in process_list:
process.start() process.start()
error_message_set = set() error_message_set = set()
interrupted_counter = 0 process_terminated_counter = 0
while self._process_terminated_counter != len(process_list):
# Ensure that SIGTERM signal (sent by terminate()) is not sent twice
do_exit = False
while process_terminated_counter != len(process_list):
try: try:
error_message = exit_msg_queue.get() error_message = exit_msg_queue.get()
except KeyboardInterrupt, e: except KeyboardInterrupt, e:
if interrupted_counter == 0: print >>sys.stderr, "\nInterrupted by user, stopping gracefully..."
print >>sys.stderr, "\nInterrupted by user, stopping gracefully " \ do_exit = True
"unless interrupted %d times" % MAXIMUM_KEYBOARD_INTERRUPT
interrupted_counter += 1
for process in process_list: # An IOError may be raised when receiving a SIGINT which interrupts the
if (process.is_alive() and
(not getattr(process, '_stopping', False) or
interrupted_counter == MAXIMUM_KEYBOARD_INTERRUPT)):
process._stopping = True
process.terminate()
# An IOError may be raised when receiving a SIGCHLD which interrupts the
# blocking system call above and the system call should not be restarted # blocking system call above and the system call should not be restarted
# (using siginterrupt), otherwise the process will stall forever as its # (using siginterrupt), otherwise the process will stall forever as its
# child has already exited # child has already exited
...@@ -247,15 +234,18 @@ class PerformanceTester(object): ...@@ -247,15 +234,18 @@ class PerformanceTester(object):
else: else:
if error_message is not None: if error_message is not None:
error_message_set.add(error_message) error_message_set.add(error_message)
do_exit = True
# In case of error, kill the other children because they are likely process_terminated_counter += 1
# failing as well (especially because a process only exits after
# encountering 10 errors) # In case of error or SIGINT, kill the other children because they are
if interrupted_counter == 0: # likely failing as well (especially because a process only exits after
for process in process_list: # encountering 10 errors)
if process.is_alive() and not getattr(process, '_stopping', False): if do_exit:
process._stopping = True for process in process_list:
process.terminate() if process.is_alive():
process.terminate()
process.join()
if error_message_set: if error_message_set:
return (error_message_set, 1) return (error_message_set, 1)
......
...@@ -56,6 +56,7 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -56,6 +56,7 @@ class BenchmarkProcess(multiprocessing.Process):
super(BenchmarkProcess, self).__init__(*args, **kwargs) super(BenchmarkProcess, self).__init__(*args, **kwargs)
def stopGracefully(self, *args, **kwargs): def stopGracefully(self, *args, **kwargs):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
raise StopIteration("Interrupted by user or because of an error from " raise StopIteration("Interrupted by user or because of an error from "
"another process, flushing remaining results...") "another process, flushing remaining results...")
......
...@@ -31,6 +31,7 @@ import math ...@@ -31,6 +31,7 @@ import math
import os import os
import csv import csv
import logging import logging
import signal
class BenchmarkResultStatistic(object): class BenchmarkResultStatistic(object):
def __init__(self, suite, label): def __init__(self, suite, label):
...@@ -163,6 +164,7 @@ class BenchmarkResult(object): ...@@ -163,6 +164,7 @@ class BenchmarkResult(object):
@abc.abstractmethod @abc.abstractmethod
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.flush(partial=False) self.flush(partial=False)
return True return True
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment