Commit 92ff4e19 authored by Antoine Pitrou's avatar Antoine Pitrou

Issue #14666: stop multiprocessing's resource-sharing thread after the tests are done.

Also, block delivery of signals to that thread. Patch by Richard Oudkerk.

This will hopefully fix sporadic freezes on the FreeBSD 9.0 buildbot.
parent d0880d57
...@@ -40,6 +40,7 @@ import sys ...@@ -40,6 +40,7 @@ import sys
import socket import socket
import threading import threading
import struct import struct
import signal
from multiprocessing import current_process from multiprocessing import current_process
from multiprocessing.util import register_after_fork, debug, sub_debug from multiprocessing.util import register_after_fork, debug, sub_debug
...@@ -209,6 +210,7 @@ class ResourceSharer(object): ...@@ -209,6 +210,7 @@ class ResourceSharer(object):
self._lock = threading.Lock() self._lock = threading.Lock()
self._listener = None self._listener = None
self._address = None self._address = None
self._thread = None
register_after_fork(self, ResourceSharer._afterfork) register_after_fork(self, ResourceSharer._afterfork)
def register(self, send, close): def register(self, send, close):
...@@ -227,6 +229,24 @@ class ResourceSharer(object): ...@@ -227,6 +229,24 @@ class ResourceSharer(object):
c.send((key, os.getpid())) c.send((key, os.getpid()))
return c return c
def stop(self, timeout=None):
from .connection import Client
with self._lock:
if self._address is not None:
c = Client(self._address, authkey=current_process().authkey)
c.send(None)
c.close()
self._thread.join(timeout)
if self._thread.is_alive():
sub_warn('ResourceSharer thread did not stop when asked')
self._listener.close()
self._thread = None
self._address = None
self._listener = None
for key, (send, close) in self._cache.items():
close()
self._cache.clear()
def _afterfork(self): def _afterfork(self):
for key, (send, close) in self._cache.items(): for key, (send, close) in self._cache.items():
close() close()
...@@ -239,6 +259,7 @@ class ResourceSharer(object): ...@@ -239,6 +259,7 @@ class ResourceSharer(object):
self._listener.close() self._listener.close()
self._listener = None self._listener = None
self._address = None self._address = None
self._thread = None
def _start(self): def _start(self):
from .connection import Listener from .connection import Listener
...@@ -249,12 +270,18 @@ class ResourceSharer(object): ...@@ -249,12 +270,18 @@ class ResourceSharer(object):
t = threading.Thread(target=self._serve) t = threading.Thread(target=self._serve)
t.daemon = True t.daemon = True
t.start() t.start()
self._thread = t
def _serve(self): def _serve(self):
if hasattr(signal, 'pthread_sigmask'):
signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
while 1: while 1:
try: try:
conn = self._listener.accept() conn = self._listener.accept()
key, destination_pid = conn.recv() msg = conn.recv()
if msg is None:
break
key, destination_pid = msg
send, close = self._cache.pop(key) send, close = self._cache.pop(key)
send(conn, destination_pid) send(conn, destination_pid)
close() close()
......
...@@ -1965,6 +1965,11 @@ class _TestPicklingConnections(BaseTestCase): ...@@ -1965,6 +1965,11 @@ class _TestPicklingConnections(BaseTestCase):
ALLOWED_TYPES = ('processes',) ALLOWED_TYPES = ('processes',)
@classmethod
def tearDownClass(cls):
from multiprocessing.reduction import resource_sharer
resource_sharer.stop(timeout=5)
@classmethod @classmethod
def _listener(cls, conn, families): def _listener(cls, conn, families):
for fam in families: for fam in families:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment