Commit 0bce68b3 authored by Jason Madden's avatar Jason Madden

Python 2, subprocess: Let unbuffered binary writes to popen.stdin loop to write all the data.

This more closely matches what the Python 2 standard library does. Beware of relying on that behaviour during an upgrade, though.

Fixes #1711
parent 9bc5e65d
Python 2: Make ``gevent.subprocess.Popen.stdin`` objects have a
``write`` method that guarantees to write the entire argument in
binary, unbuffered mode. This may require multiple trips around the
event loop, but more closely matches the behaviour of the Python 2
standard library (and gevent prior to 1.5). The number of bytes
written is still returned (instead of ``None``).
......@@ -70,6 +70,7 @@ class UniversalNewlineBytesWrapper(io.TextIOWrapper):
next = __next__
class FlushingBufferedWriter(io.BufferedWriter):
def write(self, b):
......@@ -77,6 +78,47 @@ class FlushingBufferedWriter(io.BufferedWriter):
self.flush()
return ret
class WriteallMixin(object):

    def writeall(self, value):
        """
        Similar to :meth:`socket.socket.sendall`, ensures that all the contents of
        *value* have been written (though not necessarily flushed) before returning.

        Returns the length of *value*.

        .. versionadded:: NEXT
        """
        # Do we need to play the same get_memory games we do with sockets?
        # And what about chunking for large values? See _socketcommon.py
        raw_write = super(WriteallMixin, self).write
        expected = len(value)
        remaining = value
        while remaining:
            # The underlying write may be short; keep going until the
            # whole (remaining) value has been consumed.
            written = raw_write(remaining)
            if written == len(remaining):
                break
            remaining = remaining[written:]
        return expected
class FileIO(io.FileIO):
    """
    A subclass that we can dynamically assign ``__class__`` for.

    The C-implemented :class:`io.FileIO` does not permit assigning to
    ``__class__``; a trivial Python-level subclass does, which lets us
    later swizzle an instance into a writeall-capable variant.
    """
    __slots__ = ()
class WriteIsWriteallMixin(WriteallMixin):
    # Makes ``write`` behave like the Python 2 built-in file ``write``:
    # the entire value is written (looping internally as needed) and the
    # total length of the value is returned.

    def write(self, value):
        """Write all of *value* via :meth:`writeall`; return its length."""
        return self.writeall(value)
class WriteallFileIO(WriteIsWriteallMixin, io.FileIO):
    # A raw FileIO whose ``write`` always consumes the entire value,
    # possibly using multiple underlying system calls.
    pass
class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
"""
Interprets the arguments to `open`. Internal use only.
......@@ -87,25 +129,40 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
- Native strings are returned on Python 2 when neither
'b' nor 't' are in the mode string and no encoding is specified.
- Universal newlines work in that mode.
- Allows unbuffered text IO.
- Allows externally unbuffered text IO.
:keyword bool atomic_write: If true, then if the opened, wrapped, stream
is unbuffered (meaning that ``write`` can produce short writes and the return
value needs to be checked), then the implementation will be adjusted so that
``write`` behaves like Python 2 on a built-in file object and writes the
entire value. Only set this on Python 2; the only intended user is
:class:`gevent.subprocess.Popen`.
"""
@staticmethod
def _collapse_arg(preferred_val, old_val, default):
def _collapse_arg(pref_name, preferred_val, old_name, old_val, default):
# We could play tricks with the callers ``locals()`` to avoid having to specify
# the name (which we only use for error handling) but ``locals()`` may be slow and
# inhibit JIT (on PyPy), so we just write it out long hand.
if preferred_val is not None and old_val is not None:
raise TypeError
raise TypeError("Cannot specify both %s=%s and %s=%s" % (
pref_name, preferred_val,
old_name, old_val
))
if preferred_val is None and old_val is None:
return default
return preferred_val if preferred_val is not None else old_val
def __init__(self, fobj, mode='r', bufsize=None, close=None,
encoding=None, errors=None, newline=None,
buffering=None, closefd=None):
buffering=None, closefd=None,
atomic_write=False):
# Based on code in the stdlib's _pyio.py from 3.8.
# pylint:disable=too-many-locals,too-many-branches,too-many-statements
closefd = self._collapse_arg(closefd, close, True)
closefd = self._collapse_arg('closefd', closefd, 'close', close, True)
del close
buffering = self._collapse_arg(buffering, bufsize, -1)
buffering = self._collapse_arg('buffering', buffering, 'bufsize', bufsize, -1)
del bufsize
if not hasattr(fobj, 'fileno'):
......@@ -168,7 +225,7 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
"mode, the default buffer size will be used",
RuntimeWarning, 4)
self.fobj = fobj
self._fobj = fobj
self.fileio_mode = (
(creating and "x" or "")
+ (reading and "r" or "")
......@@ -198,58 +255,69 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
self.errors = errors
self.newline = newline
self.closefd = closefd
self.atomic_write = atomic_write
default_buffer_size = io.DEFAULT_BUFFER_SIZE
def is_fd(self):
return isinstance(self.fobj, integer_types)
_opened = None
_opened_raw = None
def open(self):
return self.open_raw_and_wrapped()[1]
def is_fd(self):
return isinstance(self._fobj, integer_types)
def open_raw_and_wrapped(self):
raw = self.open_raw()
def opened(self):
"""
Return the :meth:`wrapped` file object.
"""
if self._opened is None:
raw = self.opened_raw()
try:
return raw, self.wrapped(raw)
self._opened = self.__wrapped(raw)
except:
# XXX: This might be a bug? Could we wind up closing
# something we shouldn't close?
raw.close()
raise
return self._opened
def open_raw(self):
if hasattr(self.fobj, 'fileno'):
return self.fobj
return io.FileIO(self.fobj, self.fileio_mode, self.closefd)
def _raw_object_is_new(self, raw):
    """
    Return True if *raw* is an object we created (e.g., opened from a
    path or file descriptor) rather than the file-like object the user
    originally passed to the constructor.
    """
    return self._fobj is not raw
def wrapped(self, raw):
"""
Wraps the raw IO object (`RawIOBase` or `io.TextIOBase`) in
buffers, text decoding, and newline handling.
"""
# pylint:disable=too-many-branches
result = raw
buffering = self.buffering
def opened_raw(self):
    """
    Return the raw (unbuffered, unwrapped) stream, opening it via
    :meth:`_do_open_raw` on first use and caching it thereafter.
    """
    if self._opened_raw is None:
        self._opened_raw = self._do_open_raw()
    return self._opened_raw
line_buffering = False
if buffering == 1 or buffering < 0 and raw.isatty():
buffering = -1
line_buffering = True
if buffering < 0:
buffering = self.default_buffer_size
def _do_open_raw(self):
    """
    Open and return the raw file object, with no buffering or text
    wrapping. Anything that already has a ``fileno`` is returned as-is.
    """
    if hasattr(self._fobj, 'fileno'):
        return self._fobj
    # io.FileIO doesn't allow assigning to its __class__,
    # and we can't know for sure here whether we need the atomic write()
    # method or not (it depends on the layers on top of us),
    # so we use a subclass that *does* allow assigning.
    return FileIO(self._fobj, self.fileio_mode, self.closefd)
@staticmethod
def is_buffered(stream):
    """
    Return whether *stream* already performs its own buffering, either
    directly (a buffered or text stream) or through an attached
    ``buffer`` attribute.
    """
    return (
        # buffering happens internally in the text codecs
        isinstance(stream, (io.BufferedIOBase, io.TextIOBase))
        or (hasattr(stream, 'buffer') and stream.buffer is not None)
    )
@classmethod
def buffer_size_for_stream(cls, stream):
result = cls.default_buffer_size
try:
bs = os.fstat(raw.fileno()).st_blksize
bs = os.fstat(stream.fileno()).st_blksize
except (OSError, AttributeError):
pass
else:
if bs > 1:
buffering = bs
if buffering < 0: # pragma: no cover
raise ValueError("invalid buffering size")
result = bs
return result
if not isinstance(raw, io.BufferedIOBase) and \
(not hasattr(raw, 'buffer') or raw.buffer is None):
# Need to wrap our own buffering around it. If it
# is already buffered, don't do so.
if buffering != 0:
def __buffered(self, stream, buffering):
if self.updating:
Buffer = io.BufferedRandom
elif self.creating or self.writing or self.appending:
......@@ -260,20 +328,70 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
raise ValueError("unknown mode: %r" % self.mode)
try:
result = Buffer(raw, buffering)
result = Buffer(stream, buffering)
except AttributeError:
# Python 2 file() objects don't have the readable/writable
# attributes. But they handle their own buffering.
result = raw
result = stream
if self.binary:
if isinstance(raw, io.TextIOBase):
return result
def _make_atomic_write(self, result, raw):
    """
    Given the fully-wrapped stream *result* (built on the raw stream
    *raw*), return a version whose ``write`` writes the entire value.
    Only objects we created ourselves are modified; a user-supplied
    object passed straight through is returned unchanged.
    """
    # The idea was to swizzle the class with one that defines
    # write() to call writeall(). This avoids setting any
    # attribute on the return object, avoids an additional layer
    # of proxying, and avoids any reference cycles (if setting a
    # method on the object).
    #
    # However, this is not possible with the built-in io classes
    # (static types defined in C cannot have __class__ assigned).
    # Fortunately, we need this only for the specific case of
    # opening a file descriptor (subprocess.py) on Python 2, in
    # which we fully control the types involved.
    #
    # So rather than attempt that, we only implement exactly what we need.
    if result is not raw or self._raw_object_is_new(raw):
        if result.__class__ is FileIO:
            result.__class__ = WriteallFileIO
        else: # pragma: no cover
            raise NotImplementedError(
                "Don't know how to make %s have atomic write. "
                "Please open a gevent issue with your use-case." % (
                    result
                )
            )
    return result
def __wrapped(self, raw):
"""
Wraps the raw IO object (`RawIOBase` or `io.TextIOBase`) in
buffers, text decoding, and newline handling.
"""
if self.binary and isinstance(raw, io.TextIOBase):
# Can't do it. The TextIO object will have its own buffer, and
# trying to read from the raw stream or the buffer without going through
# the TextIO object is likely to lead to problems with the codec.
raise ValueError("Unable to perform binary IO on top of text IO stream")
return result
result = raw
buffering = self.buffering
line_buffering = False
if buffering == 1 or buffering < 0 and raw.isatty():
buffering = -1
line_buffering = True
if buffering < 0:
buffering = self.buffer_size_for_stream(result)
if buffering < 0: # pragma: no cover
raise ValueError("invalid buffering size")
if buffering != 0 and not self.is_buffered(result):
# Need to wrap our own buffering around it. If it
# is already buffered, don't do so.
result = self.__buffered(result, buffering)
if not self.binary:
# Either native or text at this point.
if PY2 and self.native:
# Neither text mode nor binary mode specified.
......@@ -288,7 +406,7 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
result = io.TextIOWrapper(result, self.encoding, self.errors, self.newline,
line_buffering)
if result is not raw:
if result is not raw or self._raw_object_is_new(raw):
# Set the mode, if possible, but only if we created a new
# object.
try:
......@@ -298,6 +416,15 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
# TypeError: Readonly attribute (py2)
pass
if (
self.atomic_write
and not self.is_buffered(result)
and not isinstance(result, WriteIsWriteallMixin)
):
# Let subclasses have a say in how they make this atomic, and
# whether or not they do so even if we're actually returning the raw object.
result = self._make_atomic_write(result, raw)
return result
......@@ -326,8 +453,12 @@ class FileObjectBase(object):
'readlines',
'read1',
# Write
# Write.
# Note that we do not extend WriteallMixin,
# so writeall will be copied, if it exists, and
# wrapped.
'write',
'writeall',
'writelines',
'truncate',
)
......@@ -335,11 +466,12 @@ class FileObjectBase(object):
_io = None
def __init__(self, fobj, closefd):
self._io = fobj
def __init__(self, descriptor):
# type: (OpenDescriptor) -> None
self._io = descriptor.opened()
# We don't actually use this property ourself, but we save it (and
# pass it along) for compatibility.
self._close = closefd
self._close = descriptor.closefd
self._do_delegate_methods()
......@@ -379,7 +511,15 @@ class FileObjectBase(object):
fobj = self._io
self._io = None
try:
self._do_close(fobj, self._close)
finally:
fobj = None
# Remove delegate methods to drop remaining references to
# _io.
d = self.__dict__
for meth_name in self._delegate_methods:
d.pop(meth_name, None)
def _do_close(self, fobj, closefd):
    """Subclass responsibility: actually close *fobj* (honoring *closefd*)."""
    raise NotImplementedError()
......@@ -435,7 +575,7 @@ class FileObjectBlock(FileObjectBase):
def __init__(self, fobj, *args, **kwargs):
descriptor = OpenDescriptor(fobj, *args, **kwargs)
FileObjectBase.__init__(self, descriptor.open(), descriptor.closefd)
FileObjectBase.__init__(self, descriptor)
def _do_close(self, fobj, closefd):
fobj.close()
......@@ -457,7 +597,6 @@ class FileObjectThread(FileObjectBase):
not this action is synchronous or asynchronous is not documented.
"""
def __init__(self, *args, **kwargs):
"""
:keyword bool lock: If True (the default) then all operations will
......@@ -483,8 +622,8 @@ class FileObjectThread(FileObjectBase):
if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
self.__io_holder = [descriptor.open()] # signal for _wrap_method
FileObjectBase.__init__(self, self.__io_holder[0], descriptor.closefd)
self.__io_holder = [descriptor.opened()] # signal for _wrap_method
FileObjectBase.__init__(self, descriptor)
def _do_close(self, fobj, closefd):
self.__io_holder[0] = None # for _wrap_method
......@@ -524,16 +663,6 @@ class FileObjectThread(FileObjectBase):
def _extra_repr(self):
return ' threadpool=%r' % (self.threadpool,)
def __iter__(self):
return self
def next(self):
line = self.readline()
if line:
return line
raise StopIteration
__next__ = next
def _wrap_method(self, method):
# NOTE: We are careful to avoid introducing a refcycle
# within self. Our wrapper cannot refer to self.
......
......@@ -14,6 +14,7 @@ from gevent._compat import reraise
from gevent._fileobjectcommon import cancel_wait_ex
from gevent._fileobjectcommon import FileObjectBase
from gevent._fileobjectcommon import OpenDescriptor
from gevent._fileobjectcommon import WriteIsWriteallMixin
from gevent._hub_primitives import wait_on_watcher
from gevent.hub import get_hub
from gevent.os import _read
......@@ -213,27 +214,40 @@ class GreenFileDescriptorIO(RawIOBase):
)
class GreenFileDescriptorIOWriteall(WriteIsWriteallMixin,
GreenFileDescriptorIO):
pass
class GreenOpenDescriptor(OpenDescriptor):
def open_raw(self):
def _do_open_raw(self):
if self.is_fd():
fileio = GreenFileDescriptorIO(self.fobj, self, closefd=self.closefd)
fileio = GreenFileDescriptorIO(self._fobj, self, closefd=self.closefd)
else:
closefd = False
# Either an existing file object or a path string (which
# we open to get a file object). In either case, the other object
# owns the descriptor and we must not close it.
closefd = False
if hasattr(self.fobj, 'fileno'):
raw = self.fobj
else:
raw = OpenDescriptor.open_raw(self)
raw = OpenDescriptor._do_open_raw(self)
fileno = raw.fileno()
fileio = GreenFileDescriptorIO(fileno, self, closefd=closefd)
fileio._keep_alive = raw
return fileio
def _make_atomic_write(self, result, raw):
    """
    As for the base class, but also knows how to swizzle a
    ``GreenFileDescriptorIO`` into its writeall-capable variant.
    """
    # Our return value from _do_open_raw is always a new
    # object that we own, so we're always free to change
    # the class.
    assert result is not raw or self._raw_object_is_new(raw)
    if result.__class__ is GreenFileDescriptorIO:
        result.__class__ = GreenFileDescriptorIOWriteall
    else:
        result = OpenDescriptor._make_atomic_write(self, result, raw)
    return result
class FileObjectPosix(FileObjectBase):
"""
......@@ -309,9 +323,9 @@ class FileObjectPosix(FileObjectBase):
def __init__(self, *args, **kwargs):
descriptor = GreenOpenDescriptor(*args, **kwargs)
FileObjectBase.__init__(self, descriptor)
# This attribute is documented as available for non-blocking reads.
self.fileio, buffered_fobj = descriptor.open_raw_and_wrapped()
FileObjectBase.__init__(self, buffered_fobj, descriptor.closefd)
self.fileio = descriptor.opened_raw()
def _do_close(self, fobj, closefd):
try:
......
......@@ -455,6 +455,14 @@ def FileObject(*args, **kwargs):
# Defer importing FileObject until we need it
# to allow it to be configured more easily.
from gevent.fileobject import FileObject as _FileObject
if not PY3:
# Make write behave like the old Python 2 file
# write and loop to consume output, even when not
# buffered.
__FileObject = _FileObject
def _FileObject(*args, **kwargs):
kwargs['atomic_write'] = True
return __FileObject(*args, **kwargs)
globals()['FileObject'] = _FileObject
return _FileObject(*args)
......@@ -557,6 +565,12 @@ class Popen(object):
.. seealso:: :class:`subprocess.Popen`
This class should have the same interface as the standard library class.
.. caution::
The default values of some arguments, notably ``buffering``, differ
between Python 2 and Python 3. For the most consistent behaviour across
versions, it's best to explicitly pass the desired values.
.. caution::
On Python 2, the ``read`` method of the ``stdout`` and ``stderr`` attributes
......@@ -602,6 +616,15 @@ class Popen(object):
Add the *group*, *extra_groups*, *user*, and *umask* arguments. These
were added to Python 3.9, but are available in any gevent version, provided
the underlying platform support is present.
.. versionchanged:: NEXT
On Python 2 only, if unbuffered binary communication is requested,
the ``stdin`` attribute of this object will have a ``write`` method that
actually performs internal buffering and looping, similar to the standard library.
It guarantees to write all the data given to it in a single call (but internally
it may make many system calls and/or trips around the event loop to accomplish this).
See :issue:`1711`.
"""
if GenericAlias is not None:
......@@ -751,6 +774,7 @@ class Popen(object):
encoding=self.encoding, errors=self.errors)
else:
self.stdin = FileObject(p2cwrite, 'wb', bufsize)
if c2pread != -1:
if universal_newlines or text_mode:
if PY3:
......
......@@ -11,6 +11,10 @@ import unittest
import gevent
from gevent import fileobject
from gevent._fileobjectcommon import OpenDescriptor
try:
from gevent._fileobjectposix import GreenOpenDescriptor
except ImportError:
GreenOpenDescriptor = None
from gevent._compat import PY2
from gevent._compat import PY3
......@@ -387,8 +391,11 @@ class TestTextMode(unittest.TestCase):
class TestOpenDescriptor(greentest.TestCase):
def _getTargetClass(self):
return OpenDescriptor
def _makeOne(self, *args, **kwargs):
return OpenDescriptor(*args, **kwargs)
return self._getTargetClass()(*args, **kwargs)
def _check(self, regex, kind, *args, **kwargs):
with self.assertRaisesRegex(kind, regex):
......@@ -411,14 +418,33 @@ class TestOpenDescriptor(greentest.TestCase):
vase('take a newline', mode='rb', newline='\n'),
)
def test_atomicwrite_fd(self):
from gevent._fileobjectcommon import WriteallMixin
# It basically only does something when buffering is otherwise disabled
desc = self._makeOne(1, 'wb',
buffering=0,
closefd=False,
atomic_write=True)
self.assertTrue(desc.atomic_write)
fobj = desc.opened()
self.assertIsInstance(fobj, WriteallMixin)
def pop():
for regex, kind, kwargs in TestOpenDescriptor.CASES:
setattr(
TestOpenDescriptor, 'test_' + regex,
TestOpenDescriptor, 'test_' + regex.replace(' ', '_'),
lambda self, _re=regex, _kind=kind, _kw=kwargs: self._check(_re, _kind, 1, **_kw)
)
pop()
@unittest.skipIf(GreenOpenDescriptor is None, "No support for non-blocking IO")
class TestGreenOpenDescripton(TestOpenDescriptor):
def _getTargetClass(self):
return GreenOpenDescriptor
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment