Commit ad6b13cc authored by Jason Madden

Unify lots of implementation details between FileObjectPosix/Thread.

Also, support bufsize of 0 in FileObjectPosix.

Fixes #840
parent 12cfd26d
......@@ -83,6 +83,8 @@ Stdlib Compatibility
with a timeout.
- ``FileObjectPosix`` exposes the ``read1`` method when in read mode,
and generally only exposes methods appropriate to the mode it is in.
- ``FileObjectPosix`` supports a *bufsize* of 0 in binary write modes.
Reported in :issue:`840` by Mike Lang.
Other Changes
-------------
......@@ -124,6 +126,11 @@ Other Changes
- If ``sys.stderr`` has been monkey-patched (not recommended),
exceptions that the hub reports aren't lost and can still be caught.
Reported in :issue:`825` by Jelle Smet.
- The various ``FileObject`` implementations are more consistent with
each other.
.. note:: Writing to the *io* property of a FileObject after it has
been constructed should be considered deprecated.
1.1.2 (Jul 21, 2016)
====================
......
......@@ -4,6 +4,115 @@ try:
except ImportError:
EBADF = 9
from io import TextIOWrapper
cancel_wait_ex = IOError(EBADF, 'File descriptor was closed in another greenlet')


class FileObjectClosed(IOError):
    """
    Raised when an attribute of a closed FileObject is accessed.

    This is an exception *class* (not a shared, pre-built instance) so
    that both ``raise FileObjectClosed`` and ``raise FileObjectClosed()``
    work; calling a pre-built instance would fail with :exc:`TypeError`
    and hide the real error.
    """

    def __init__(self):
        super(FileObjectClosed, self).__init__(
            EBADF, 'Bad file descriptor (FileObject was closed)')


class FileObjectBase(object):
    """
    Internal base class to ensure a level of consistency
    between FileObjectPosix and FileObjectThread.

    Subclasses must implement :meth:`_do_close` and may override
    :meth:`_wrap_method` and :meth:`_extra_repr`.
    """

    # List of methods we delegate to the wrapping IO object, if they
    # implement them and we do not.
    _delegate_methods = (
        # General methods
        'flush',
        'fileno',
        'writable',
        'readable',
        'seek',
        'seekable',
        'tell',

        # Read
        'read',
        'readline',
        'readlines',
        'read1',

        # Write
        'write',
        'writelines',
        'truncate',
    )

    # Whether we are translating universal newlines or not.
    _translate = False

    def __init__(self, io, closefd):
        """
        :param io: An io.IOBase-like object.
        :param closefd: Saved and later handed to :meth:`_do_close`.
        """
        self._io = io
        # We don't actually use this property ourselves, but we save it (and
        # pass it along) for compatibility.
        self._close = closefd

        if self._translate:
            # This automatically handles delegation.
            self.translate_newlines(None)
        else:
            self._do_delegate_methods()

    @property
    def io(self):
        """The wrapped IO object (``None`` once closed)."""
        return self._io

    @io.setter
    def io(self, new_io):
        # Historically we either hand-wrote all the delegation methods
        # to use self.io, or we simply used __getattr__ to look them up at
        # runtime. This meant people could change the io attribute on the fly
        # and it would mostly work (subprocess.py used to do that). We don't
        # recommend that, but we still support it.
        self._io = new_io
        self._do_delegate_methods()

    def _do_delegate_methods(self):
        """
        Copy each method named in ``_delegate_methods`` from the wrapped
        IO object into our instance dictionary, and drop previously
        copied methods that the current IO object no longer provides.
        Methods implemented by our own class are never touched.
        """
        for meth_name in self._delegate_methods:
            if hasattr(type(self), meth_name):
                continue
            meth = getattr(self._io, meth_name, None)
            if meth:
                setattr(self, meth_name, self._wrap_method(meth))
            else:
                # Look directly in the instance dictionary instead of
                # using hasattr(self, ...), which would be routed through
                # __getattr__ and on to the wrapped object.
                self.__dict__.pop(meth_name, None)

    def _wrap_method(self, method):
        """
        Wrap a method we're copying into our dictionary from the underlying
        io object to do something special or different, if necessary.

        The base implementation returns *method* unchanged.
        """
        return method

    def translate_newlines(self, mode, *text_args, **text_kwargs):
        """
        Wrap the current IO object in a :class:`io.TextIOWrapper`.

        :param mode: If true, assigned to the wrapper's ``mode`` attribute.
        :param text_args: Extra positional arguments for ``TextIOWrapper``.
        :param text_kwargs: Extra keyword arguments for ``TextIOWrapper``.
        """
        wrapper = TextIOWrapper(self._io, *text_args, **text_kwargs)
        if mode:
            wrapper.mode = mode
        # Going through the property re-runs method delegation.
        self.io = wrapper
        self._translate = True

    @property
    def closed(self):
        """True if the file is closed"""
        return self._io is None

    def close(self):
        """
        Close this object. Only the first call has any effect; the wrapped
        IO object is detached *before* ``_do_close`` runs.
        """
        if self._io is None:
            return

        io = self._io
        self._io = None
        self._do_close(io, self._close)

    def _do_close(self, io, closefd):
        """
        Actually close *io*; must be implemented by subclasses.

        :param io: The IO object that was wrapped until ``close`` was called.
        :param closefd: The *closefd* value given to the constructor.
        """
        raise NotImplementedError()

    def __getattr__(self, name):
        """
        Fall back to the wrapped IO object for unknown attributes.

        :raises FileObjectClosed: If this object has been closed.
        :raises AttributeError: If ``_io`` itself has not been set yet.
        """
        if name == '_io':
            # ``_io`` is missing from the instance dictionary (__init__ has
            # not run); reading self._io below would recurse without bound.
            raise AttributeError(name)
        if self._io is None:
            raise FileObjectClosed()
        return getattr(self._io, name)

    def __repr__(self):
        return '<%s _fobj=%r%s>' % (self.__class__.__name__, self.io, self._extra_repr())

    def _extra_repr(self):
        """Return extra text to include in ``repr``; empty by default."""
        return ''
......@@ -6,10 +6,10 @@ from io import BufferedWriter
from io import BytesIO
from io import DEFAULT_BUFFER_SIZE
from io import RawIOBase
from io import TextIOWrapper
from io import UnsupportedOperation
from gevent._fileobjectcommon import cancel_wait_ex
from gevent._fileobjectcommon import FileObjectBase
from gevent.hub import get_hub
from gevent.os import _read
from gevent.os import _write
......@@ -136,8 +136,14 @@ class GreenFileDescriptorIO(RawIOBase):
def seek(self, offset, whence=0):
return os.lseek(self._fileno, offset, whence)
class FlushingBufferedWriter(BufferedWriter):
class FileObjectPosix(object):
def write(self, b):
ret = BufferedWriter.write(self, b)
self.flush()
return ret
class FileObjectPosix(FileObjectBase):
"""
A file-like object that operates on non-blocking files but
provides a synchronous, cooperative interface.
......@@ -183,30 +189,6 @@ class FileObjectPosix(object):
#: platform specific default for the *bufsize* parameter
default_bufsize = io.DEFAULT_BUFFER_SIZE
# List of methods we delegate to the wrapping IO object, if they
# implement them.
_delegate_methods = (
# General methods
'flush',
'fileno',
'writable',
'readable',
'seek',
'seekable',
'tell',
# Read
'read',
'readline',
'readlines',
'read1',
# Write
'write',
'writelines',
'truncate',
)
def __init__(self, fobj, mode='rb', bufsize=-1, close=True):
"""
:keyword fobj: Either an integer fileno, or an object supporting the
......@@ -216,10 +198,20 @@ class FileObjectPosix(object):
(where the "b" or "U" can be omitted).
If "U" is part of the mode, IO will be done on text, otherwise bytes.
:keyword int bufsize: If given, the size of the buffer to use. The default
value means to use a platform-specific default, and a value of 0 is translated
to a value of 1. Other values are interpreted as for the :mod:`io` package.
value means to use a platform-specific default.
Other values are interpreted as for the :mod:`io` package.
Buffering is ignored in text mode.
.. versionchanged:: 1.2a1
A bufsize of 0 in write mode is no longer forced to be 1.
Instead, the underlying buffer is flushed after every write
operation to simulate a bufsize of 0. In gevent 1.0, a
bufsize of 0 was flushed when a newline was written, while
in gevent 1.1 it was flushed when more than one byte was
written. Note that this may have performance impacts.
"""
if isinstance(fobj, int):
fileno = fobj
fobj = None
......@@ -246,11 +238,10 @@ class FileObjectPosix(object):
raise ValueError('mode can only be [rb, rU, wb], not %r' % (orig_mode,))
self._fobj = fobj
self._closed = False
self._close = close
self.fileio = GreenFileDescriptorIO(fileno, mode, closefd=close)
self._orig_bufsize = bufsize
if bufsize < 0 or bufsize == 1:
bufsize = self.default_bufsize
elif bufsize == 0:
......@@ -261,49 +252,28 @@ class FileObjectPosix(object):
else:
assert mode == 'w'
IOFamily = BufferedWriter
if self._orig_bufsize == 0:
# We could also simply pass self.fileio as *io*, but this way
# we at least consistently expose a BufferedWriter in our *io*
# attribute.
IOFamily = FlushingBufferedWriter
self._io = IOFamily(self.fileio, bufsize)
io = IOFamily(self.fileio, bufsize)
#else: # QQQ: not used, not reachable
#
# self.io = BufferedRandom(self.fileio, bufsize)
if self._translate:
self._io = TextIOWrapper(self._io)
self.__delegate_methods()
def __delegate_methods(self):
for meth_name in self._delegate_methods:
meth = getattr(self._io, meth_name, None)
if meth:
setattr(self, meth_name, meth)
elif hasattr(self, meth_name):
delattr(self, meth_name)
super(FileObjectPosix, self).__init__(io, close)
io = property(lambda s: s._io,
# subprocess.py likes to swizzle our io object for universal_newlines
lambda s, nv: setattr(s, '_io', nv) or s.__delegate_methods())
@property
def closed(self):
"""True if the file is closed"""
return self._closed
def close(self):
if self._closed:
# make sure close() is only run once when called concurrently
return
self._closed = True
def _do_close(self, io, closefd):
try:
self._io.close()
io.close()
# self.fileio already knows whether or not to close the
# file descriptor
self.fileio.close()
finally:
self._fobj = None
self.fileio = None
def __iter__(self):
return self._io
def __getattr__(self, name):
# XXX: Should this really be _fobj, or self.io?
# _fobj can easily be None but io never is
return getattr(self._fobj, name)
......@@ -34,9 +34,13 @@ Classes
=======
"""
from __future__ import absolute_import
import functools
import sys
import os
from gevent._fileobjectcommon import FileObjectClosed
from gevent._fileobjectcommon import FileObjectBase
from gevent.hub import get_hub
from gevent._compat import integer_types
from gevent._compat import reraise
......@@ -66,16 +70,21 @@ else:
from gevent._fileobjectposix import FileObjectPosix
class FileObjectThread(object):
class FileObjectThread(FileObjectBase):
"""
A file-like object wrapping another file-like object, performing all blocking
operations on that object in a background thread.
.. versionchanged:: 1.1b1
The file object is closed using the threadpool. Note that whether or
not this action is synchronous or asynchronous is not documented.
"""
def __init__(self, fobj, *args, **kwargs):
def __init__(self, fobj, mode=None, bufsize=-1, close=True, threadpool=None, lock=True):
"""
:param fobj: The underlying file-like object to wrap, or an integer fileno
that will be pass to :func:`os.fdopen` along with everything in *args*.
that will be passed to :func:`os.fdopen` along with *mode* and *bufsize*.
:keyword bool lock: If True (the default) then all operations will
be performed one-by-one. Note that this does not guarantee that, if using
this file object from multiple threads/greenlets, operations will be performed
......@@ -85,11 +94,9 @@ class FileObjectThread(object):
:keyword bool close: If True (the default) then when this object is closed,
the underlying object is closed as well.
"""
self._close = kwargs.pop('close', True)
self.threadpool = kwargs.pop('threadpool', None)
self.lock = kwargs.pop('lock', True)
if kwargs:
raise TypeError('Unexpected arguments: %r' % kwargs.keys())
closefd = close
self.threadpool = threadpool
self.lock = lock
if self.lock is True:
self.lock = Semaphore()
elif not self.lock:
......@@ -97,32 +104,27 @@ class FileObjectThread(object):
if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
if isinstance(fobj, integer_types):
if not self._close:
if not closefd:
# we cannot do this, since fdopen object will close the descriptor
raise TypeError('FileObjectThread does not support close=False')
fobj = os.fdopen(fobj, *args)
self.io = fobj
raise TypeError('FileObjectThread does not support close=False on an fd.')
if mode is None:
assert bufsize == -1, "If you use the default mode, you can't choose a bufsize"
fobj = os.fdopen(fobj)
else:
fobj = os.fdopen(fobj, mode, bufsize)
if self.threadpool is None:
self.threadpool = get_hub().threadpool
super(FileObjectThread, self).__init__(fobj, closefd)
def _apply(self, func, args=None, kwargs=None):
with self.lock:
return self.threadpool.apply(func, args, kwargs)
def close(self):
"""
.. versionchanged:: 1.1b1
The file object is closed using the threadpool. Note that whether or
not this action is synchronous or asynchronous is not documented.
"""
fobj = self.io
if fobj is None:
return
self.io = None
def _do_close(self, fobj, closefd):
try:
self.flush(_fobj=fobj)
self._apply(fobj.flush)
finally:
if self._close:
if closefd:
# Note that we're not using self._apply; older code
# did fobj.close() without going through the threadpool at all,
# so acquiring the lock could potentially introduce deadlocks
......@@ -139,24 +141,13 @@ class FileObjectThread(object):
if exc_info:
reraise(*exc_info)
def flush(self, _fobj=None):
if _fobj is not None:
fobj = _fobj
else:
fobj = self.io
if fobj is None:
raise FileObjectClosed
return self._apply(fobj.flush)
def __repr__(self):
return '<%s _fobj=%r threadpool=%r>' % (self.__class__.__name__, self.io, self.threadpool)
def __getattr__(self, item):
if self.io is None:
if item == 'closed':
return True
raise FileObjectClosed
return getattr(self.io, item)
def _do_delegate_methods(self):
    """
    Delegate methods as the base class does, then alias ``read1`` to
    ``read`` when the wrapped object is opened for reading but does not
    provide ``read1`` itself.
    """
    super(FileObjectThread, self)._do_delegate_methods()
    # Not every file-like object exposes a ``mode`` attribute; treat a
    # missing one as "not known to be readable" instead of failing with
    # AttributeError.
    if not hasattr(self, 'read1') and 'r' in getattr(self._io, 'mode', ''):
        self.read1 = self.read
def _extra_repr(self):
return ' threadpool=%r' % (self.threadpool,)
def __iter__(self):
return self
......@@ -168,21 +159,16 @@ class FileObjectThread(object):
raise StopIteration
__next__ = next
def _wraps(method):
def x(self, *args, **kwargs):
fobj = self.io
if fobj is None:
raise FileObjectClosed
return self._apply(getattr(fobj, method), args, kwargs)
x.__name__ = method
return x
_method = None
for _method in ('read', 'readinto', 'readline', 'readlines', 'write', 'writelines', 'xreadlines'):
setattr(FileObjectThread, _method, _wraps(_method))
del _method
del _wraps
def _wrap_method(self, method):
    """
    Return a wrapper around *method* that runs it through ``self._apply``
    (i.e. in the threadpool, under the lock).

    :param method: A bound method of the wrapped IO object.
    :raises FileObjectClosed: From the returned wrapper, if this object
        has been closed by the time the wrapper is called.
    """
    @functools.wraps(method)
    def thread_method(*args, **kwargs):
        if self._io is None:
            # This is different than FileObjectPosix, etc,
            # because we want to save the expensive trip through
            # the threadpool.
            #
            # Raise the name directly rather than calling it: that is
            # valid whether FileObjectClosed is bound to an exception
            # class or to a pre-built exception instance, whereas
            # calling an instance raises TypeError.
            raise FileObjectClosed
        return self._apply(method, args, kwargs)
    return thread_method
try:
......@@ -191,28 +177,18 @@ except NameError:
FileObject = FileObjectThread
class FileObjectBlock(object):
class FileObjectBlock(FileObjectBase):
def __init__(self, fobj, *args, **kwargs):
self._close = kwargs.pop('close', True)
closefd = kwargs.pop('close', True)
if kwargs:
raise TypeError('Unexpected arguments: %r' % kwargs.keys())
if isinstance(fobj, integer_types):
if not self._close:
if not closefd:
# we cannot do this, since fdopen object will close the descriptor
raise TypeError('FileObjectBlock does not support close=False')
raise TypeError('FileObjectBlock does not support close=False on an fd.')
fobj = os.fdopen(fobj, *args)
self.io = fobj
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.io, )
def __getattr__(self, item):
assert item != '_fobj'
if self.io is None:
raise FileObjectClosed
return getattr(self.io, item)
super(FileObjectBlock, self).__init__(fobj, closefd)
config = os.environ.get('GEVENT_FILE')
if config:
......
......@@ -468,9 +468,9 @@ class Popen(object):
# Under Python 3, if we left on the 'b' we'd get different results
# depending on whether we used FileObjectPosix or FileObjectThread
self.stdin = FileObject(p2cwrite, 'wb', bufsize)
self.stdin._translate = True
self.stdin.io = io.TextIOWrapper(self.stdin.io, write_through=True,
line_buffering=(bufsize == 1))
self.stdin.translate_newlines(None,
write_through=True,
line_buffering=(bufsize == 1))
else:
self.stdin = FileObject(p2cwrite, 'wb', bufsize)
if c2pread is not None:
......@@ -485,9 +485,7 @@ class Popen(object):
# test__subprocess.py that depend on python_universal_newlines,
# but would be inconsistent with the stdlib:
#msvcrt.setmode(self.stdout.fileno(), os.O_TEXT)
self.stdout.io = io.TextIOWrapper(self.stdout.io)
self.stdout.io.mode = 'r'
self.stdout._translate = True
self.stdout.translate_newlines('r')
else:
self.stdout = FileObject(c2pread, 'rU', bufsize)
else:
......@@ -496,8 +494,7 @@ class Popen(object):
if universal_newlines:
if PY3:
self.stderr = FileObject(errread, 'rb', bufsize)
self.stderr.io = io.TextIOWrapper(self.stderr.io)
self.stderr._translate = True
self.stderr.translate_newlines(None)
else:
self.stderr = FileObject(errread, 'rU', bufsize)
else:
......
......@@ -73,7 +73,7 @@ class Test(greentest.TestCase):
self._test_del(close=False)
self.fail("Shouldn't be able to create a FileObjectThread with close=False")
except TypeError as e:
self.assertEqual(str(e), 'FileObjectThread does not support close=False')
self.assertEqual(str(e), 'FileObjectThread does not support close=False on an fd.')
def test_newlines(self):
r, w = os.pipe()
......@@ -130,6 +130,20 @@ class Test(greentest.TestCase):
x.close()
y.close()
#if FileObject is not FileObjectThread:
def test_bufsize_0(self):
    # Issue #840: with bufsize=0, data written to one end of a pipe must
    # be readable from the other end without an explicit flush.
    r, w = os.pipe()
    x = FileObject(r, 'rb', bufsize=0)
    y = FileObject(w, 'wb', bufsize=0)
    try:
        y.write(b'a')
        b = x.read(1)
        self.assertEqual(b, b'a')

        y.writelines([b'2'])
        b = x.read(1)
        self.assertEqual(b, b'2')
    finally:
        # Close both ends even when an assertion fails so the pipe's
        # file descriptors are not leaked into later tests.
        try:
            x.close()
        finally:
            y.close()
def writer(fobj, line):
for character in line:
fobj.write(character)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment