Commit 58ded201 authored by Geert Jansen's avatar Geert Jansen

Adding new source files that i forgot to add in the previous

commit.
parent 472cdf2a
# Wrapper module for stdlib os. Written by Geert Jansen.
"""
This module provides cooperative versions of os.read() and os.write().
On Posix platforms this uses non-blocking IO, on Windows a threadpool
is used.
"""
from __future__ import absolute_import
import os
import sys
from gevent.hub import get_hub, reinit
from gevent.socket import EBADF, EAGAIN
try:
import fcntl
except ImportError:
fcntl = None
__implements__ = ['read', 'write', 'fork']
__all__ = __implements__
os_read = os.read
os_write = os.write
os_fork = os.fork
def _map_errors(func, *args):
"""Map IOError to OSError."""
try:
return func(*args)
except IOError, e:
# IOError is structered like OSError in that it has two args: an error
# number and a error string. So we can just re-raise OSError passing it
# the IOError args. If at some point we want to catch other errors and
# map those to OSError as well, we need to make sure that it follows
# the OSError convention and it gets passed a valid error number and
# error string.
raise OSError(*e.args), None, sys.exc_info()[2]
def posix_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned."""
while True:
flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
try:
return os_read(fd, n)
except OSError, e:
if e.errno != EAGAIN:
raise
sys.exc_clear()
finally:
# Be sure to restore the fcntl flags before we switch into the hub.
# Sometimes multiple file descriptors share the same fcntl flags
# (e.g. when using ttys/ptys). Those other file descriptors are
# impacted by our change of flags, so we should restore them
# before any other code can possibly run.
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
hub = get_hub()
event = hub.loop.io(fd, 1)
_map_errors(hub.wait, event)
def posix_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written."""
while True:
flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
try:
return os_write(fd, buf)
except OSError, e:
if e.errno != EAGAIN:
raise
sys.exc_clear()
finally:
# See note in posix_read().
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
hub = get_hub()
event = hub.loop.io(fd, 2)
_map_errors(hub.wait, event)
def threadpool_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned."""
threadpool = get_hub().threadpool
return _map_errors(threadpool.apply, os_read, (fd, n))
def threadpool_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written."""
threadpool = get_hub().threadpool
return _map_errors(threadpool.apply, os_write, (fd, buf))
if fcntl is None:
read = threadpool_read
write = threadpool_write
else:
read = posix_read
write = posix_write
if hasattr(os, 'fork'):
def fork():
result = os_fork()
if not result:
_map_errors(reinit)
return result
else:
__all__.remove('fork')
from gevent import monkey; monkey.patch_all()
import os
from select import PIPE_BUF
from greentest import TestCase, main
from gevent import Greenlet, joinall
from gevent.socket import EAGAIN
from errno import EINTR
try:
import fcntl
except ImportError:
fcntl = None
class TestOS(TestCase):
__timeout__ = 5
def test_if_pipe_blocks(self):
r, w = os.pipe()
# set nbytes such that for sure it is > maximum pipe buffer
nbytes = 1000000
block = 'x' * 4096
buf = buffer(block)
# Lack of "nonlocal" keyword in Python 2.x:
bytesread = [0]
byteswritten = [0]
def produce():
while byteswritten[0] != nbytes:
bytesleft = nbytes - byteswritten[0]
try:
byteswritten[0] += os.write(w, buf[:min(bytesleft, 4096)])
except OSError:
code = sys.exc_info()[1].args[0]
assert code != EAGAIN
if code == EINTR:
continue
raise
def consume():
while bytesread[0] != nbytes:
bytesleft = nbytes - bytesread[0]
try:
bytesread[0] += len(os.read(r, min(bytesleft, 4096)))
except OSError:
code = sys.exc_info()[1].args[0]
assert code != EAGAIN
if code == EINTR:
continue
raise
producer = Greenlet(produce)
producer.start()
consumer = Greenlet(consume)
consumer.start_later(1)
# If patching was not succesful, the producer will have filled
# the pipe before the consumer starts, and would block the entire
# process. Therefore the next line would never finish.
joinall([producer, consumer])
assert bytesread[0] == nbytes
assert bytesread[0] == byteswritten[0]
def test_fd_flags_restored(self):
if fcntl is None:
return
r, w = os.pipe()
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
os.write(w, 'foo')
buf = os.read(r, 3)
assert buf == 'foo'
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment