Commit e5907043 authored by Jason Madden

Fixes for universal newlines on Python 2

parent 5bf0ca88
......@@ -11,6 +11,7 @@ import sys
from gevent.hub import _get_hub_noargs as get_hub
from gevent._compat import PY2
from gevent._compat import integer_types
from gevent._compat import reraise
from gevent._compat import fspath
......@@ -29,6 +30,30 @@ class FileObjectClosed(IOError):
super(FileObjectClosed, self).__init__(
EBADF, 'Bad file descriptor (FileObject was closed)')
class _UniversalNewlineBytesWrapper(io.TextIOWrapper):
"""
Uses TextWrapper to decode universal newlines, but returns the
results as bytes.
This is for Python 2 where the 'rU' mode did that.
"""
def __init__(self, fobj):
io.TextIOWrapper.__init__(self, fobj, encoding='latin-1', newline=None)
def read(self, *args, **kwargs):
result = io.TextIOWrapper.read(self, *args, **kwargs)
return result.encode('latin-1')
def readline(self, *args, **kwargs):
result = io.TextIOWrapper.readline(self, *args, **kwargs)
return result.encode('latin-1')
def readlines(self, *args, **kwargs):
result = io.TextIOWrapper.readlines(self, *args, **kwargs)
return [x.encode('latin-1') for x in result]
class FileObjectBase(object):
"""
Internal base class to ensure a level of consistency
......@@ -64,6 +89,7 @@ class FileObjectBase(object):
# Whether we should apply a TextIOWrapper (the names are historical).
# Subclasses should set these before calling our constructor.
_translate = False
_translate_mode = None
_translate_encoding = None
_translate_errors = None
_translate_newline = None # None means universal
......@@ -80,7 +106,7 @@ class FileObjectBase(object):
if self._translate:
# This automatically handles delegation by assigning to
# self.io
self.translate_newlines(None,
self.translate_newlines(self._translate_mode,
self._translate_encoding,
self._translate_errors)
else:
......@@ -112,9 +138,13 @@ class FileObjectBase(object):
return method
def translate_newlines(self, mode, *text_args, **text_kwargs):
    """
    Wrap ``self._io`` in a text layer and publish it as ``self.io``.

    When *mode* is the special value ``'byte_newlines'`` the wrapper is
    a ``_UniversalNewlineBytesWrapper`` and *text_args*/*text_kwargs*
    are not used. Otherwise they are passed through to
    ``io.TextIOWrapper``, and a truthy *mode* is recorded on the
    wrapper as its ``mode`` attribute.
    """
    bytes_requested = mode == 'byte_newlines'
    if bytes_requested:
        stream = _UniversalNewlineBytesWrapper(self._io)
    else:
        stream = io.TextIOWrapper(self._io, *text_args, **text_kwargs)
    # 'byte_newlines' is an internal marker, never a mode to report.
    if mode and not bytes_requested:
        stream.mode = mode # pylint:disable=attribute-defined-outside-init
    self.io = stream
    self._translate = True
......@@ -151,6 +181,19 @@ class FileObjectBase(object):
def __exit__(self, *args):
    """
    Leave the ``with`` block by closing this object.

    The exception details in *args* are ignored, and nothing is
    returned.
    """
    self.close()
# Modes that work with native strings on Python 2
_NATIVE_PY2_MODES = ('r', 'r+', 'w', 'w+', 'a', 'a+')

@classmethod
def _use_FileIO(cls, mode, encoding, errors):
    """
    Tell whether this is a Python 2 default-style open.

    True only on Python 2, when *mode* is one of ``_NATIVE_PY2_MODES``
    and neither *encoding* nor *errors* was given. Always False on
    other versions.
    """
    if not PY2:
        return False
    if encoding is not None or errors is not None:
        return False
    return mode in cls._NATIVE_PY2_MODES
@classmethod
def _open_raw(cls, fobj, mode='r', buffering=-1,
encoding=None, errors=None, newline=None, closefd=True):
......@@ -179,8 +222,7 @@ class FileObjectBase(object):
fobj = fspath(fobj)
closefd = True
if bytes is str and mode in ('r', 'r+', 'w', 'w+', 'a', 'a+') \
and encoding is None and errors is None:
if cls._use_FileIO(mode, encoding, errors):
# Python 2, default open. Return native str type, not unicode, which
# is what would happen with io.open('r'), but we don't want to open the file
# in binary mode since that skips newline conversion.
......@@ -259,9 +301,15 @@ class FileObjectThread(FileObjectBase):
self.lock = DummySemaphore()
if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
universal_newline = 'U' in mode or newline is None
mode = mode.replace('U', '')
fobj = self._open_raw(fobj, mode, bufsize,
encoding=encoding, errors=errors, newline=newline,
closefd=close)
if self._use_FileIO(mode, encoding, errors) and universal_newline:
self._translate_mode = 'byte_newlines'
self._translate = True
self.__io_holder = [fobj] # signal for _wrap_method
super(FileObjectThread, self).__init__(fobj, closefd)
......
......@@ -321,25 +321,7 @@ class FileObjectPosix(FileObjectBase):
self._translate_newline = None
if PY2 and not text_mode:
# We're going to be producing unicode objects, but
# universal newlines doesn't do that in the stdlib,
# so fix that to return str objects. The fix is two parts:
# first, set an encoding on the stream that can round-trip
# all bytes, and second, decode all bytes once they've been read.
self._translate_encoding = 'latin-1'
import functools
def wrap_method(m):
if m.__name__.startswith("read"):
@functools.wraps(m)
def wrapped(*args, **kwargs):
result = m(*args, **kwargs)
assert isinstance(result, unicode) # pylint:disable=undefined-variable
return result.encode('latin-1')
return wrapped
return m
self._wrap_method = wrap_method
self._translate_mode = 'byte_newlines'
self._orig_bufsize = bufsize
if bufsize < 0 or bufsize == 1:
......
......@@ -13,7 +13,7 @@ import gevent.testing as greentest
from gevent.testing.sysinfo import PY3
from gevent.testing.flaky import reraiseFlakyTestRaceConditionLibuv
from gevent.testing.skipping import skipOnLibuvOnCIOnPyPy
from gevent.testing.skipping import skipOnLibuv
try:
ResourceWarning
......@@ -293,6 +293,13 @@ class TestFileObjectPosix(ConcurrentFileObjectMixin,
self.assertEqual(io_ex.args, os_ex.args)
self.assertEqual(str(io_ex), str(os_ex))
# Overrides exist only to attach the skip decorator: each test body is
# the TestFileObjectBlock implementation, called with this instance.
# NOTE(review): skipOnLibuv presumably skips the test when running on the
# libuv loop -- confirm against gevent.testing.skipping.
@skipOnLibuv("libuv on linux raises EPERM ") # but works fine on macOS
def test_str_default_to_native(self):
TestFileObjectBlock.test_str_default_to_native(self)
# Same delegation as above, with the same EPERM reason given for the skip.
@skipOnLibuv("libuv in linux raises EPERM")
def test_text_encoding(self):
TestFileObjectBlock.test_text_encoding(self)
class TestTextMode(unittest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment