Commit 5aa7df32 authored by Antoine Pitrou's avatar Antoine Pitrou

Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is

raised when the wrapped raw file is non-blocking and the write would block.
Previous code assumed that the raw write() would raise BlockingIOError, but
RawIOBase.write() is defined to returned None when the call would block.
Patch by sbt.
parent 9d38b0dc
...@@ -8,6 +8,7 @@ import os ...@@ -8,6 +8,7 @@ import os
import abc import abc
import codecs import codecs
import warnings import warnings
import errno
# Import thread instead of threading to reduce startup cost # Import thread instead of threading to reduce startup cost
try: try:
from thread import allocate_lock as Lock from thread import allocate_lock as Lock
...@@ -720,8 +721,11 @@ class _BufferedIOMixin(BufferedIOBase): ...@@ -720,8 +721,11 @@ class _BufferedIOMixin(BufferedIOBase):
def close(self): def close(self):
if self.raw is not None and not self.closed: if self.raw is not None and not self.closed:
self.flush() try:
self.raw.close() # may raise BlockingIOError or BrokenPipeError etc
self.flush()
finally:
self.raw.close()
def detach(self): def detach(self):
if self.raw is None: if self.raw is None:
...@@ -1074,13 +1078,9 @@ class BufferedWriter(_BufferedIOMixin): ...@@ -1074,13 +1078,9 @@ class BufferedWriter(_BufferedIOMixin):
# XXX we can implement some more tricks to try and avoid # XXX we can implement some more tricks to try and avoid
# partial writes # partial writes
if len(self._write_buf) > self.buffer_size: if len(self._write_buf) > self.buffer_size:
# We're full, so let's pre-flush the buffer # We're full, so let's pre-flush the buffer. (This may
try: # raise BlockingIOError with characters_written == 0.)
self._flush_unlocked() self._flush_unlocked()
except BlockingIOError as e:
# We can't accept anything else.
# XXX Why not just let the exception pass through?
raise BlockingIOError(e.errno, e.strerror, 0)
before = len(self._write_buf) before = len(self._write_buf)
self._write_buf.extend(b) self._write_buf.extend(b)
written = len(self._write_buf) - before written = len(self._write_buf) - before
...@@ -1111,24 +1111,23 @@ class BufferedWriter(_BufferedIOMixin): ...@@ -1111,24 +1111,23 @@ class BufferedWriter(_BufferedIOMixin):
def _flush_unlocked(self): def _flush_unlocked(self):
if self.closed: if self.closed:
raise ValueError("flush of closed file") raise ValueError("flush of closed file")
written = 0 while self._write_buf:
try: try:
while self._write_buf: n = self.raw.write(self._write_buf)
try: except BlockingIOError:
n = self.raw.write(self._write_buf) raise RuntimeError("self.raw should implement RawIOBase: it "
except IOError as e: "should not raise BlockingIOError")
if e.errno != EINTR: except IOError as e:
raise if e.errno != EINTR:
continue raise
if n > len(self._write_buf) or n < 0: continue
raise IOError("write() returned incorrect number of bytes") if n is None:
del self._write_buf[:n] raise BlockingIOError(
written += n errno.EAGAIN,
except BlockingIOError as e: "write could not complete without blocking", 0)
n = e.characters_written if n > len(self._write_buf) or n < 0:
raise IOError("write() returned incorrect number of bytes")
del self._write_buf[:n] del self._write_buf[:n]
written += n
raise BlockingIOError(e.errno, e.strerror, written)
def tell(self): def tell(self):
return _BufferedIOMixin.tell(self) + len(self._write_buf) return _BufferedIOMixin.tell(self) + len(self._write_buf)
......
...@@ -43,6 +43,10 @@ try: ...@@ -43,6 +43,10 @@ try:
import threading import threading
except ImportError: except ImportError:
threading = None threading = None
try:
import fcntl
except ImportError:
fcntl = None
__metaclass__ = type __metaclass__ = type
bytes = support.py3k_bytes bytes = support.py3k_bytes
...@@ -228,9 +232,14 @@ class MockNonBlockWriterIO: ...@@ -228,9 +232,14 @@ class MockNonBlockWriterIO:
except ValueError: except ValueError:
pass pass
else: else:
self._blocker_char = None if n > 0:
self._write_stack.append(b[:n]) # write data up to the first blocker
raise self.BlockingIOError(0, "test blocking", n) self._write_stack.append(b[:n])
return n
else:
# cancel blocker and indicate would block
self._blocker_char = None
return None
self._write_stack.append(b) self._write_stack.append(b)
return len(b) return len(b)
...@@ -2610,6 +2619,68 @@ class MiscIOTest(unittest.TestCase): ...@@ -2610,6 +2619,68 @@ class MiscIOTest(unittest.TestCase):
# baseline "io" module. # baseline "io" module.
self._check_abc_inheritance(io) self._check_abc_inheritance(io)
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
self._test_nonblock_pipe_write(16*1024)
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
self._test_nonblock_pipe_write(1024)
def _set_non_blocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
self.assertNotEqual(flags, -1)
res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self.assertEqual(res, 0)
def _test_nonblock_pipe_write(self, bufsize):
sent = []
received = []
r, w = os.pipe()
self._set_non_blocking(r)
self._set_non_blocking(w)
# To exercise all code paths in the C implementation we need
# to play with buffer sizes. For instance, if we choose a
# buffer size less than or equal to _PIPE_BUF (4096 on Linux)
# then we will never get a partial write of the buffer.
rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
with rf, wf:
for N in 9999, 73, 7574:
try:
i = 0
while True:
msg = bytes([i % 26 + 97]) * N
sent.append(msg)
wf.write(msg)
i += 1
except self.BlockingIOError as e:
self.assertEqual(e.args[0], errno.EAGAIN)
sent[-1] = sent[-1][:e.characters_written]
received.append(rf.read())
msg = b'BLOCKED'
wf.write(msg)
sent.append(msg)
while True:
try:
wf.flush()
break
except self.BlockingIOError as e:
self.assertEqual(e.args[0], errno.EAGAIN)
self.assertEqual(e.characters_written, 0)
received.append(rf.read())
received += iter(rf.read, None)
sent, received = b''.join(sent), b''.join(received)
self.assertTrue(sent == received)
self.assertTrue(wf.closed)
self.assertTrue(rf.closed)
class CMiscIOTest(MiscIOTest): class CMiscIOTest(MiscIOTest):
io = io io = io
......
...@@ -735,6 +735,7 @@ Ilya Sandler ...@@ -735,6 +735,7 @@ Ilya Sandler
Mark Sapiro Mark Sapiro
Ty Sarna Ty Sarna
Ben Sayer Ben Sayer
sbt
Michael Scharf Michael Scharf
Neil Schemenauer Neil Schemenauer
David Scherer David Scherer
......
...@@ -79,6 +79,12 @@ Core and Builtins ...@@ -79,6 +79,12 @@ Core and Builtins
Library Library
------- -------
- Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
raised when the wrapped raw file is non-blocking and the write would block.
Previous code assumed that the raw write() would raise BlockingIOError, but
RawIOBase.write() is defined to returned None when the call would block.
Patch by sbt.
- Issue #13358: HTMLParser now calls handle_data only once for each CDATA. - Issue #13358: HTMLParser now calls handle_data only once for each CDATA.
- Issue #4147: minidom's toprettyxml no longer adds whitespace around a text - Issue #4147: minidom's toprettyxml no longer adds whitespace around a text
......
...@@ -550,7 +550,7 @@ buffered_isatty(buffered *self, PyObject *args) ...@@ -550,7 +550,7 @@ buffered_isatty(buffered *self, PyObject *args)
/* Forward decls */ /* Forward decls */
static PyObject * static PyObject *
_bufferedwriter_flush_unlocked(buffered *, int); _bufferedwriter_flush_unlocked(buffered *);
static Py_ssize_t static Py_ssize_t
_bufferedreader_fill_buffer(buffered *self); _bufferedreader_fill_buffer(buffered *self);
static void static void
...@@ -571,6 +571,18 @@ _bufferedreader_read_generic(buffered *self, Py_ssize_t); ...@@ -571,6 +571,18 @@ _bufferedreader_read_generic(buffered *self, Py_ssize_t);
* Helpers * Helpers
*/ */
/* Sets the current error to BlockingIOError */
static void
_set_BlockingIOError(char *msg, Py_ssize_t written)
{
PyObject *err;
err = PyObject_CallFunction(PyExc_BlockingIOError, "isn",
errno, msg, written);
if (err)
PyErr_SetObject(PyExc_BlockingIOError, err);
Py_XDECREF(err);
}
/* Returns the address of the `written` member if a BlockingIOError was /* Returns the address of the `written` member if a BlockingIOError was
raised, NULL otherwise. The error is always re-raised. */ raised, NULL otherwise. The error is always re-raised. */
static Py_ssize_t * static Py_ssize_t *
...@@ -725,7 +737,7 @@ buffered_flush_and_rewind_unlocked(buffered *self) ...@@ -725,7 +737,7 @@ buffered_flush_and_rewind_unlocked(buffered *self)
{ {
PyObject *res; PyObject *res;
res = _bufferedwriter_flush_unlocked(self, 0); res = _bufferedwriter_flush_unlocked(self);
if (res == NULL) if (res == NULL)
return NULL; return NULL;
Py_DECREF(res); Py_DECREF(res);
...@@ -1087,7 +1099,7 @@ buffered_seek(buffered *self, PyObject *args) ...@@ -1087,7 +1099,7 @@ buffered_seek(buffered *self, PyObject *args)
/* Fallback: invoke raw seek() method and clear buffer */ /* Fallback: invoke raw seek() method and clear buffer */
if (self->writable) { if (self->writable) {
res = _bufferedwriter_flush_unlocked(self, 0); res = _bufferedwriter_flush_unlocked(self);
if (res == NULL) if (res == NULL)
goto end; goto end;
Py_CLEAR(res); Py_CLEAR(res);
...@@ -1683,6 +1695,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len) ...@@ -1683,6 +1695,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
Py_buffer buf; Py_buffer buf;
PyObject *memobj, *res; PyObject *memobj, *res;
Py_ssize_t n; Py_ssize_t n;
int errnum;
/* NOTE: the buffer needn't be released as its object is NULL. */ /* NOTE: the buffer needn't be released as its object is NULL. */
if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1) if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1)
return -1; return -1;
...@@ -1695,11 +1708,21 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len) ...@@ -1695,11 +1708,21 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
raised (see issue #10956). raised (see issue #10956).
*/ */
do { do {
errno = 0;
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL); res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
errnum = errno;
} while (res == NULL && _trap_eintr()); } while (res == NULL && _trap_eintr());
Py_DECREF(memobj); Py_DECREF(memobj);
if (res == NULL) if (res == NULL)
return -1; return -1;
if (res == Py_None) {
/* Non-blocking stream would have blocked. Special return code!
Being paranoid we reset errno in case it is changed by code
triggered by a decref. errno is used by _set_BlockingIOError(). */
Py_DECREF(res);
errno = errnum;
return -2;
}
n = PyNumber_AsSsize_t(res, PyExc_ValueError); n = PyNumber_AsSsize_t(res, PyExc_ValueError);
Py_DECREF(res); Py_DECREF(res);
if (n < 0 || n > len) { if (n < 0 || n > len) {
...@@ -1716,7 +1739,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len) ...@@ -1716,7 +1739,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
/* `restore_pos` is 1 if we need to restore the raw stream position at /* `restore_pos` is 1 if we need to restore the raw stream position at
the end, 0 otherwise. */ the end, 0 otherwise. */
static PyObject * static PyObject *
_bufferedwriter_flush_unlocked(buffered *self, int restore_pos) _bufferedwriter_flush_unlocked(buffered *self)
{ {
Py_ssize_t written = 0; Py_ssize_t written = 0;
Py_off_t n, rewind; Py_off_t n, rewind;
...@@ -1738,14 +1761,11 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos) ...@@ -1738,14 +1761,11 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
Py_SAFE_DOWNCAST(self->write_end - self->write_pos, Py_SAFE_DOWNCAST(self->write_end - self->write_pos,
Py_off_t, Py_ssize_t)); Py_off_t, Py_ssize_t));
if (n == -1) { if (n == -1) {
Py_ssize_t *w = _buffered_check_blocking_error(); goto error;
if (w == NULL) }
goto error; else if (n == -2) {
self->write_pos += *w; _set_BlockingIOError("write could not complete without blocking",
self->raw_pos = self->write_pos; 0);
written += *w;
*w = written;
/* Already re-raised */
goto error; goto error;
} }
self->write_pos += n; self->write_pos += n;
...@@ -1758,16 +1778,6 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos) ...@@ -1758,16 +1778,6 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
goto error; goto error;
} }
if (restore_pos) {
Py_off_t forward = rewind - written;
if (forward != 0) {
n = _buffered_raw_seek(self, forward, 1);
if (n < 0) {
goto error;
}
self->raw_pos += forward;
}
}
_bufferedwriter_reset_buf(self); _bufferedwriter_reset_buf(self);
end: end:
...@@ -1820,7 +1830,7 @@ bufferedwriter_write(buffered *self, PyObject *args) ...@@ -1820,7 +1830,7 @@ bufferedwriter_write(buffered *self, PyObject *args)
} }
/* First write the current buffer */ /* First write the current buffer */
res = _bufferedwriter_flush_unlocked(self, 0); res = _bufferedwriter_flush_unlocked(self);
if (res == NULL) { if (res == NULL) {
Py_ssize_t *w = _buffered_check_blocking_error(); Py_ssize_t *w = _buffered_check_blocking_error();
if (w == NULL) if (w == NULL)
...@@ -1843,14 +1853,19 @@ bufferedwriter_write(buffered *self, PyObject *args) ...@@ -1843,14 +1853,19 @@ bufferedwriter_write(buffered *self, PyObject *args)
PyErr_Clear(); PyErr_Clear();
memcpy(self->buffer + self->write_end, buf.buf, buf.len); memcpy(self->buffer + self->write_end, buf.buf, buf.len);
self->write_end += buf.len; self->write_end += buf.len;
self->pos += buf.len;
written = buf.len; written = buf.len;
goto end; goto end;
} }
/* Buffer as much as possible. */ /* Buffer as much as possible. */
memcpy(self->buffer + self->write_end, buf.buf, avail); memcpy(self->buffer + self->write_end, buf.buf, avail);
self->write_end += avail; self->write_end += avail;
/* Already re-raised */ self->pos += avail;
*w = avail; /* XXX Modifying the existing exception e using the pointer w
will change e.characters_written but not e.args[2].
Therefore we just replace with a new error. */
_set_BlockingIOError("write could not complete without blocking",
avail);
goto error; goto error;
} }
Py_CLEAR(res); Py_CLEAR(res);
...@@ -1875,11 +1890,9 @@ bufferedwriter_write(buffered *self, PyObject *args) ...@@ -1875,11 +1890,9 @@ bufferedwriter_write(buffered *self, PyObject *args)
Py_ssize_t n = _bufferedwriter_raw_write( Py_ssize_t n = _bufferedwriter_raw_write(
self, (char *) buf.buf + written, buf.len - written); self, (char *) buf.buf + written, buf.len - written);
if (n == -1) { if (n == -1) {
Py_ssize_t *w = _buffered_check_blocking_error(); goto error;
if (w == NULL) } else if (n == -2) {
goto error; /* Write failed because raw file is non-blocking */
written += *w;
remaining -= *w;
if (remaining > self->buffer_size) { if (remaining > self->buffer_size) {
/* Can't buffer everything, still buffer as much as possible */ /* Can't buffer everything, still buffer as much as possible */
memcpy(self->buffer, memcpy(self->buffer,
...@@ -1887,8 +1900,9 @@ bufferedwriter_write(buffered *self, PyObject *args) ...@@ -1887,8 +1900,9 @@ bufferedwriter_write(buffered *self, PyObject *args)
self->raw_pos = 0; self->raw_pos = 0;
ADJUST_POSITION(self, self->buffer_size); ADJUST_POSITION(self, self->buffer_size);
self->write_end = self->buffer_size; self->write_end = self->buffer_size;
*w = written + self->buffer_size; written += self->buffer_size;
/* Already re-raised */ _set_BlockingIOError("write could not complete without "
"blocking", written);
goto error; goto error;
} }
PyErr_Clear(); PyErr_Clear();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment