Commit 41072db7 authored by Richard Oudkerk's avatar Richard Oudkerk

Issue #17097: Make multiprocessing ignore EINTR.

parent 44d8b11c
...@@ -270,7 +270,14 @@ class SocketListener(object): ...@@ -270,7 +270,14 @@ class SocketListener(object):
self._unlink = None self._unlink = None
def accept(self): def accept(self):
s, self._last_accepted = self._socket.accept() while True:
try:
s, self._last_accepted = self._socket.accept()
except socket.error as e:
if e.args[0] != errno.EINTR:
raise
else:
break
s.setblocking(True) s.setblocking(True)
fd = duplicate(s.fileno()) fd = duplicate(s.fileno())
conn = _multiprocessing.Connection(fd) conn = _multiprocessing.Connection(fd)
......
...@@ -2460,13 +2460,81 @@ class TestForkAwareThreadLock(unittest.TestCase): ...@@ -2460,13 +2460,81 @@ class TestForkAwareThreadLock(unittest.TestCase):
p.join() p.join()
self.assertLessEqual(new_size, old_size) self.assertLessEqual(new_size, old_size)
#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
class TestIgnoreEINTR(unittest.TestCase):
@classmethod
def _test_ignore(cls, conn):
def handler(signum, frame):
pass
signal.signal(signal.SIGUSR1, handler)
conn.send('ready')
x = conn.recv()
conn.send(x)
conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore(self):
conn, child_conn = multiprocessing.Pipe()
try:
p = multiprocessing.Process(target=self._test_ignore,
args=(child_conn,))
p.daemon = True
p.start()
child_conn.close()
self.assertEqual(conn.recv(), 'ready')
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
time.sleep(0.1)
conn.send(1234)
self.assertEqual(conn.recv(), 1234)
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
time.sleep(0.1)
p.join()
finally:
conn.close()
@classmethod
def _test_ignore_listener(cls, conn):
def handler(signum, frame):
pass
signal.signal(signal.SIGUSR1, handler)
l = multiprocessing.connection.Listener()
conn.send(l.address)
a = l.accept()
a.send('welcome')
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore_listener(self):
conn, child_conn = multiprocessing.Pipe()
try:
p = multiprocessing.Process(target=self._test_ignore_listener,
args=(child_conn,))
p.daemon = True
p.start()
child_conn.close()
address = conn.recv()
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
time.sleep(0.1)
client = multiprocessing.connection.Client(address)
self.assertEqual(client.recv(), 'welcome')
p.join()
finally:
conn.close()
# #
# #
# #
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb, TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
TestFlags, TestForkAwareThreadLock] TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
# #
# #
......
...@@ -24,6 +24,8 @@ Core and Builtins ...@@ -24,6 +24,8 @@ Core and Builtins
Library Library
------- -------
- Issue #17097: Make multiprocessing ignore EINTR.
- Issue #18155: The csv module now correctly handles csv files that use - Issue #18155: The csv module now correctly handles csv files that use
a delimiter character that has a special meaning in regexes, instead of a delimiter character that has a special meaning in regexes, instead of
throwing an exception. throwing an exception.
......
...@@ -22,6 +22,21 @@ ...@@ -22,6 +22,21 @@
# define CLOSE(h) close(h) # define CLOSE(h) close(h)
#endif #endif
/*
* Wrapper for PyErr_CheckSignals() which can be called without the GIL
*/
static int
check_signals(void)
{
PyGILState_STATE state;
int res;
state = PyGILState_Ensure();
res = PyErr_CheckSignals();
PyGILState_Release(state);
return res;
}
/* /*
* Send string to file descriptor * Send string to file descriptor
*/ */
...@@ -34,8 +49,14 @@ _conn_sendall(HANDLE h, char *string, size_t length) ...@@ -34,8 +49,14 @@ _conn_sendall(HANDLE h, char *string, size_t length)
while (length > 0) { while (length > 0) {
res = WRITE(h, p, length); res = WRITE(h, p, length);
if (res < 0) if (res < 0) {
if (errno == EINTR) {
if (check_signals() < 0)
return MP_EXCEPTION_HAS_BEEN_SET;
continue;
}
return MP_SOCKET_ERROR; return MP_SOCKET_ERROR;
}
length -= res; length -= res;
p += res; p += res;
} }
...@@ -56,12 +77,16 @@ _conn_recvall(HANDLE h, char *buffer, size_t length) ...@@ -56,12 +77,16 @@ _conn_recvall(HANDLE h, char *buffer, size_t length)
while (remaining > 0) { while (remaining > 0) {
temp = READ(h, p, remaining); temp = READ(h, p, remaining);
if (temp <= 0) { if (temp < 0) {
if (temp == 0) if (errno == EINTR) {
return remaining == length ? if (check_signals() < 0)
MP_END_OF_FILE : MP_EARLY_END_OF_FILE; return MP_EXCEPTION_HAS_BEEN_SET;
else continue;
return temp; }
return temp;
}
else if (temp == 0) {
return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
} }
remaining -= temp; remaining -= temp;
p += temp; p += temp;
...@@ -171,9 +196,16 @@ conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save) ...@@ -171,9 +196,16 @@ conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
p.revents = 0; p.revents = 0;
if (timeout < 0) { if (timeout < 0) {
res = poll(&p, 1, -1); do {
res = poll(&p, 1, -1);
} while (res < 0 && errno == EINTR);
} else { } else {
res = poll(&p, 1, (int)(timeout * 1000 + 0.5)); res = poll(&p, 1, (int)(timeout * 1000 + 0.5));
if (res < 0 && errno == EINTR) {
/* We were interrupted by a signal. Just indicate a
timeout even though we are early. */
return FALSE;
}
} }
if (res < 0) { if (res < 0) {
...@@ -209,12 +241,19 @@ conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save) ...@@ -209,12 +241,19 @@ conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
FD_SET((SOCKET)conn->handle, &rfds); FD_SET((SOCKET)conn->handle, &rfds);
if (timeout < 0.0) { if (timeout < 0.0) {
res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); do {
res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
} while (res < 0 && errno == EINTR);
} else { } else {
struct timeval tv; struct timeval tv;
tv.tv_sec = (long)timeout; tv.tv_sec = (long)timeout;
tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5); tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv); res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
if (res < 0 && errno == EINTR) {
/* We were interrupted by a signal. Just indicate a
timeout even though we are early. */
return FALSE;
}
} }
if (res < 0) { if (res < 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment