Commit 668765cb authored by Jason Madden's avatar Jason Madden

Update the changes from #409 and merge them. Use this new FileObjectPosix to...

Update the changes from #409 and merge them. Use this new FileObjectPosix to implement the socket._fileobject where possible. Fixes #409.
parent 75873038
from __future__ import absolute_import
import os
import sys
from types import UnboundMethodType
from gevent._fileobjectcommon import cancel_wait_ex
from gevent._fileobjectcommon import FileObjectClosed
from gevent._socket2 import _fileobject
from gevent._socket2 import _get_memory
from gevent.hub import get_hub, PYPY, integer_types
from gevent.os import _read
from gevent.os import _write
from gevent.os import ignored_errors
from gevent.os import make_nonblocking
from gevent.socket import EBADF
try:
from gevent._util import SocketAdapter__del__, noop
except ImportError:
SocketAdapter__del__ = None
noop = None
class NA(object):
def __repr__(self):
return 'N/A'
NA = NA()
class SocketAdapter(object):
"""Socket-like API on top of a file descriptor.
The main purpose of it is to re-use _fileobject to create proper cooperative file objects
from file descriptors on POSIX platforms.
"""
def __init__(self, fileno, mode=None, close=True):
if not isinstance(fileno, integer_types):
raise TypeError('fileno must be int: %r' % fileno)
self._fileno = fileno
self._mode = mode or 'rb'
self._close = close
self._translate = 'U' in self._mode
make_nonblocking(fileno)
self._eat_newline = False
self.hub = get_hub()
io = self.hub.loop.io
self._read_event = io(fileno, 1)
self._write_event = io(fileno, 2)
self._refcount = 1
def __repr__(self):
if self._fileno is None:
return '<%s at 0x%x closed>' % (self.__class__.__name__, id(self))
else:
args = (self.__class__.__name__, id(self), getattr(self, '_fileno', NA), getattr(self, '_mode', NA))
return '<%s at 0x%x (%r, %r)>' % args
def makefile(self, *args, **kwargs):
return _fileobject(self, *args, **kwargs)
def fileno(self):
result = self._fileno
if result is None:
raise IOError(EBADF, 'Bad file descriptor (%s object is closed)' % self.__class__.__name__)
return result
def detach(self):
x = self._fileno
self._fileno = None
return x
def _reuse(self):
self._refcount += 1
def _drop(self):
self._refcount -= 1
if self._refcount <= 0:
self._realclose()
def close(self):
self._drop()
def _realclose(self):
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
fileno = self._fileno
if fileno is not None:
self._fileno = None
if self._close:
os.close(fileno)
def sendall(self, data):
fileno = self.fileno()
bytes_total = len(data)
bytes_written = 0
while True:
try:
bytes_written += _write(fileno, _get_memory(data, bytes_written))
except (IOError, OSError) as ex:
code = ex.args[0]
if code not in ignored_errors:
raise
sys.exc_clear()
if bytes_written >= bytes_total:
return
self.hub.wait(self._write_event)
def recv(self, size):
while True:
try:
data = _read(self.fileno(), size)
except (IOError, OSError) as ex:
code = ex.args[0]
if code not in ignored_errors:
raise
sys.exc_clear()
else:
if not self._translate or not data:
return data
if self._eat_newline:
self._eat_newline = False
if data.startswith(b'\n'):
data = data[1:]
if not data:
return self.recv(size)
if data.endswith(b'\r'):
self._eat_newline = True
return self._translate_newlines(data)
self.hub.wait(self._read_event)
def _translate_newlines(self, data):
data = data.replace(b"\r\n", b"\n")
data = data.replace(b"\r", b"\n")
return data
if not SocketAdapter__del__:
def __del__(self, close=os.close):
fileno = self._fileno
if fileno is not None:
close(fileno)
if SocketAdapter__del__:
SocketAdapter.__del__ = UnboundMethodType(SocketAdapter__del__, None, SocketAdapter)
class FileObjectPosix(_fileobject):
def __init__(self, fobj=None, mode='rb', bufsize=-1, close=True):
if isinstance(fobj, integer_types):
fileno = fobj
fobj = None
else:
fileno = fobj.fileno()
sock = SocketAdapter(fileno, mode, close=close)
self._fobj = fobj
self._closed = False
_fileobject.__init__(self, sock, mode=mode, bufsize=bufsize, close=close)
if PYPY:
sock._drop()
def __repr__(self):
if self._sock is None:
return '<%s closed>' % self.__class__.__name__
elif self._fobj is None:
return '<%s %s>' % (self.__class__.__name__, self._sock)
else:
return '<%s %s _fobj=%r>' % (self.__class__.__name__, self._sock, self._fobj)
def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
# cannot rely on self._sock for this because we need to keep that until flush() is done
return
self._closed = True
sock = self._sock
if sock is None:
return
try:
self.flush()
finally:
if self._fobj is not None or not self._close:
sock.detach()
else:
sock._drop()
self._sock = None
self._fobj = None
def __getattr__(self, item):
assert item != '_fobj'
if self._fobj is None:
raise FileObjectClosed
return getattr(self._fobj, item)
if not noop:
def __del__(self):
# disable _fileobject's __del__
pass
if noop:
FileObjectPosix.__del__ = UnboundMethodType(FileObjectPosix, None, noop)
import os
import io
from io import BufferedRandom
from io import BufferedReader
from io import BufferedWriter
from io import BytesIO
from io import DEFAULT_BUFFER_SIZE
from io import RawIOBase
from io import TextIOWrapper
from io import UnsupportedOperation
from gevent._fileobjectcommon import cancel_wait_ex
from gevent.hub import get_hub
from gevent.os import _read
from gevent.os import _write
from gevent.os import ignored_errors
from gevent.os import make_nonblocking
class GreenFileDescriptorIO(RawIOBase):
def __init__(self, fileno, mode='r', closefd=True):
RawIOBase.__init__(self)
self._closed = False
self._closefd = closefd
self._fileno = fileno
make_nonblocking(fileno)
self._readable = 'r' in mode
self._writable = 'w' in mode
self.hub = get_hub()
io = self.hub.loop.io
if self._readable:
self._read_event = io(fileno, 1)
else:
self._read_event = None
if self._writable:
self._write_event = io(fileno, 2)
else:
self._write_event = None
def readable(self):
return self._readable
def writable(self):
return self._writable
def fileno(self):
return self._fileno
@property
def closed(self):
return self._closed
def close(self):
if self._closed:
return
self.flush()
self._closed = True
if self._readable:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
if self._writable:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
fileno = self._fileno
if self._closefd:
self._fileno = None
os.close(fileno)
def read(self, n=1):
if not self._readable:
raise UnsupportedOperation('readinto')
while True:
try:
return _read(self._fileno, n)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._read_event)
def readall(self):
ret = BytesIO()
while True:
data = self.read(DEFAULT_BUFFER_SIZE)
if not data:
break
ret.write(data)
return ret.getvalue()
def readinto(self, b):
data = self.read(len(b))
n = len(data)
try:
b[:n] = data
except TypeError as err:
import array
if not isinstance(b, array.array):
raise err
b[:n] = array.array(b'b', data)
return n
def write(self, b):
if not self._writable:
raise UnsupportedOperation('write')
while True:
try:
return _write(self._fileno, b)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._write_event)
class FileObjectPosix:
default_bufsize = io.DEFAULT_BUFFER_SIZE
def __init__(self, fobj, mode='rb', bufsize=-1, close=True):
if isinstance(fobj, int):
fileno = fobj
fobj = None
else:
fileno = fobj.fileno()
if not isinstance(fileno, int):
raise TypeError('fileno must be int: %r' % fileno)
mode = (mode or 'rb').replace('b', '')
if 'U' in mode:
self._translate = True
mode = mode.replace('U', '')
else:
self._translate = False
assert len(mode) == 1, 'mode can only be [rb, rU, wb]'
self._fobj = fobj
self._closed = False
self._close = close
self.fileio = GreenFileDescriptorIO(fileno, mode, closefd=close)
if bufsize < 0:
bufsize = self.default_bufsize
if mode == 'r':
if bufsize == 0:
bufsize = 1
elif bufsize == 1:
bufsize = self.default_bufsize
self.io = BufferedReader(self.fileio, bufsize)
elif mode == 'w':
if bufsize == 0:
bufsize = 1
elif bufsize == 1:
bufsize = self.default_bufsize
self.io = BufferedWriter(self.fileio, bufsize)
else:
# QQQ: not used
self.io = BufferedRandom(self.fileio, bufsize)
if self._translate:
self.io = TextIOWrapper(self.io)
@property
def closed(self):
"""True if the file is cloed"""
return self._closed
def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
return
self._closed = True
try:
self.io.close()
self.fileio.close()
finally:
self._fobj = None
def flush(self):
self.io.flush()
def fileno(self):
return self.io.fileno()
def write(self, data):
self.io.write(data)
def writelines(self, list):
self.io.writelines(list)
def read(self, size=-1):
return self.io.read(size)
def readline(self, size=-1):
return self.io.readline(size)
def readlines(self, sizehint=0):
return self.io.readlines(sizehint)
def __iter__(self):
return self.io
from gevent._socketcommon import EBADF
cancel_wait_ex = IOError(EBADF, 'File descriptor was closed in another greenlet')
FileObjectClosed = IOError(EBADF, 'Bad file descriptor (FileObject was closed)')
......@@ -431,7 +431,7 @@ def _do_reuse_or_drop(socket, methname):
from io import BytesIO
class _fileobject(object):
class _basefileobject(object):
"""Faux file object attached to a socket object."""
default_bufsize = 8192
......@@ -728,6 +728,41 @@ class _fileobject(object):
if not line:
raise StopIteration
return line
__next__ = next
try:
from gevent.fileobject import FileObjectPosix
except ImportError:
# Manual implementation
_fileobject = _basefileobject
else:
class _fileobject(FileObjectPosix):
# Add the proper drop/reuse support for pypy, and match
# the close=False default in the constructor
def __init__(self, sock, mode='rb', bufsize=-1, close=False):
_do_reuse_or_drop(sock, '_reuse')
self._sock = sock
FileObjectPosix.__init__(self, sock, mode, bufsize, close)
def close(self):
try:
if self._sock:
self.flush()
finally:
s = self._sock
self._sock = None
if s is not None:
if self._close:
FileObjectPosix.close(self)
else:
_do_reuse_or_drop(s, '_drop')
def __del__(self):
try:
self.close()
except:
# close() may fail if __init__ didn't complete
pass
__all__ = __implements__ + __extensions__ + __imports__
from __future__ import absolute_import
import sys
import os
from gevent._fileobjectcommon import FileObjectClosed
from gevent.hub import get_hub
from gevent.hub import integer_types
from gevent.hub import PY3
from gevent.socket import EBADF
from gevent.os import _read, _write, ignored_errors
from gevent.lock import Semaphore, DummySemaphore
......@@ -35,203 +34,10 @@ if fcntl is None:
else:
from gevent.socket import _fileobject, _get_memory
cancel_wait_ex = IOError(EBADF, 'File descriptor was closed in another greenlet')
from gevent.os import make_nonblocking
try:
from gevent._util import SocketAdapter__del__, noop
except ImportError:
SocketAdapter__del__ = None
noop = None
if PY3:
UnboundMethodType = None
from gevent._fileobject3 import FileObjectPosix
else:
from types import UnboundMethodType
class NA(object):
def __repr__(self):
return 'N/A'
NA = NA()
class SocketAdapter(object):
"""Socket-like API on top of a file descriptor.
The main purpose of it is to re-use _fileobject to create proper cooperative file objects
from file descriptors on POSIX platforms.
"""
def __init__(self, fileno, mode=None, close=True):
if not isinstance(fileno, integer_types):
raise TypeError('fileno must be int: %r' % fileno)
self._fileno = fileno
self._mode = mode or 'rb'
self._close = close
self._translate = 'U' in self._mode
make_nonblocking(fileno)
self._eat_newline = False
self.hub = get_hub()
io = self.hub.loop.io
self._read_event = io(fileno, 1)
self._write_event = io(fileno, 2)
self._refcount = 1
def __repr__(self):
if self._fileno is None:
return '<%s at 0x%x closed>' % (self.__class__.__name__, id(self))
else:
args = (self.__class__.__name__, id(self), getattr(self, '_fileno', NA), getattr(self, '_mode', NA))
return '<%s at 0x%x (%r, %r)>' % args
def makefile(self, *args, **kwargs):
return _fileobject(self, *args, **kwargs)
def fileno(self):
result = self._fileno
if result is None:
raise IOError(EBADF, 'Bad file descriptor (%s object is closed)' % self.__class__.__name)
return result
def detach(self):
x = self._fileno
self._fileno = None
return x
def _reuse(self):
self._refcount += 1
def _drop(self):
self._refcount -= 1
if self._refcount <= 0:
self._realclose()
def close(self):
self._drop()
def _realclose(self):
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
fileno = self._fileno
if fileno is not None:
self._fileno = None
if self._close:
os.close(fileno)
def sendall(self, data):
fileno = self.fileno()
bytes_total = len(data)
bytes_written = 0
while True:
try:
bytes_written += _write(fileno, _get_memory(data, bytes_written))
except (IOError, OSError) as ex:
code = ex.args[0]
if code not in ignored_errors:
raise
_exc_clear()
if bytes_written >= bytes_total:
return
self.hub.wait(self._write_event)
def recv(self, size):
while True:
try:
data = _read(self.fileno(), size)
except (IOError, OSError) as ex:
code = ex.args[0]
if code not in ignored_errors:
raise
_exc_clear()
else:
if not self._translate or not data:
return data
if self._eat_newline:
self._eat_newline = False
if data.startswith(b'\n'):
data = data[1:]
if not data:
return self.recv(size)
if data.endswith(b'\r'):
self._eat_newline = True
return self._translate_newlines(data)
self.hub.wait(self._read_event)
def _translate_newlines(self, data):
data = data.replace(b"\r\n", b"\n")
data = data.replace(b"\r", b"\n")
return data
if not SocketAdapter__del__:
def __del__(self, close=os.close):
fileno = self._fileno
if fileno is not None:
close(fileno)
elif PY3:
def __del__(self, close=os.close):
SocketAdapter__del__(self, close=close)
if SocketAdapter__del__ and not PY3:
SocketAdapter.__del__ = UnboundMethodType(SocketAdapter__del__, None, SocketAdapter)
class FileObjectPosix(_fileobject):
def __init__(self, fobj=None, mode='rb', bufsize=-1, close=True):
if isinstance(fobj, integer_types):
fileno = fobj
fobj = None
else:
fileno = fobj.fileno()
sock = SocketAdapter(fileno, mode, close=close)
self._fobj = fobj
self._closed = False
_fileobject.__init__(self, sock, mode=mode, bufsize=bufsize, close=close)
if PYPY:
sock._drop()
def __repr__(self):
if self._sock is None:
return '<%s closed>' % self.__class__.__name__
elif self._fobj is None:
return '<%s %s>' % (self.__class__.__name__, self._sock)
else:
return '<%s %s _fobj=%r>' % (self.__class__.__name__, self._sock, self._fobj)
def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
# cannot rely on self._sock for this because we need to keep that until flush() is done
return
self._closed = True
sock = self._sock
if sock is None:
return
try:
self.flush()
finally:
if self._fobj is not None or not self._close:
sock.detach()
else:
sock._drop()
self._sock = None
self._fobj = None
def __getattr__(self, item):
assert item != '_fobj'
if self._fobj is None:
raise FileObjectClosed
return getattr(self._fobj, item)
if not noop or PY3:
def __del__(self):
# disable _fileobject's __del__
pass
if noop and not PY3:
FileObjectPosix.__del__ = UnboundMethodType(FileObjectPosix, None, noop)
from gevent._fileobject2 import FileObjectPosix
class FileObjectThread(object):
......@@ -310,9 +116,6 @@ class FileObjectThread(object):
__next__ = next
FileObjectClosed = IOError(EBADF, 'Bad file descriptor (FileObject was closed)')
try:
FileObject = FileObjectPosix
except NameError:
......
......@@ -33,7 +33,7 @@ NOT_IMPLEMENTED = {
COULD_BE_MISSING = {
'socket': ['create_connection', 'RAND_add', 'RAND_egd', 'RAND_status']}
NO_ALL = ['gevent.threading', 'gevent._util', 'gevent._socketcommon']
NO_ALL = ['gevent.threading', 'gevent._util', 'gevent._socketcommon', 'gevent._fileobjectcommon']
class Test(unittest.TestCase):
......
......@@ -53,7 +53,7 @@ class Test(greentest.TestCase):
g = gevent.spawn(writer, FileObject(w, 'wb'), lines)
try:
result = FileObject(r, 'rU').read()
self.assertEqual(b'line1\nline2\nline3\nline4\nline5\nline6', result)
self.assertEqual('line1\nline2\nline3\nline4\nline5\nline6', result)
finally:
g.kill()
......
......@@ -77,7 +77,7 @@ def pyflakes(args):
pyflakes('examples/ greentest/*.py util/ *.py')
if sys.version_info[0] == 3:
ignored_files = ['gevent/_util_py2.py', 'gevent/_socket2.py']
ignored_files = ['gevent/_util_py2.py', 'gevent/_socket2.py', 'gevent/_fileobject2.py']
else:
ignored_files = ['gevent/_socket3.py']
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment