Commit 7da80407 authored by Jason Madden's avatar Jason Madden

More cleanups for libuv on windows.

parent 977625b4
...@@ -3,19 +3,23 @@ from __future__ import print_function ...@@ -3,19 +3,23 @@ from __future__ import print_function
import gevent import gevent
from gevent import subprocess from gevent import subprocess
import sys
# run 2 jobs in parallel if sys.platform.startswith("win"):
p1 = subprocess.Popen(['uname'], stdout=subprocess.PIPE) print("Unable to run on windows")
p2 = subprocess.Popen(['ls'], stdout=subprocess.PIPE) else:
# run 2 jobs in parallel
p1 = subprocess.Popen(['uname'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
gevent.wait([p1, p2], timeout=2) gevent.wait([p1, p2], timeout=2)
# print the results (if available) # print the results (if available)
if p1.poll() is not None: if p1.poll() is not None:
print('uname: %r' % p1.stdout.read()) print('uname: %r' % p1.stdout.read())
else: else:
print('uname: job is still running') print('uname: job is still running')
if p2.poll() is not None: if p2.poll() is not None:
print('ls: %r' % p2.stdout.read()) print('ls: %r' % p2.stdout.read())
else: else:
print('ls: job is still running') print('ls: job is still running')
...@@ -11,6 +11,7 @@ import sys ...@@ -11,6 +11,7 @@ import sys
PY2 = sys.version_info[0] == 2 PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] >= 3 PY3 = sys.version_info[0] >= 3
PYPY = hasattr(sys, 'pypy_version_info') PYPY = hasattr(sys, 'pypy_version_info')
WIN = sys.platform.startswith("win")
## Types ## Types
......
...@@ -185,6 +185,13 @@ class loop(AbstractLoop): ...@@ -185,6 +185,13 @@ class loop(AbstractLoop):
""" """
Return all the handles that are open and their ref status. Return all the handles that are open and their ref status.
""" """
# XXX: Disabled because, at least on Windows, the times this
# gets called often produce `SystemError: ffi.from_handle():
# dead or bogus handle object`, and sometimes that crashes the process.
return []
def _really_debug(self):
handle_state = namedtuple("HandleState", handle_state = namedtuple("HandleState",
['handle', ['handle',
'watcher', 'watcher',
......
...@@ -11,7 +11,6 @@ import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,im ...@@ -11,7 +11,6 @@ import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,im
ffi = _corecffi.ffi ffi = _corecffi.ffi
libuv = _corecffi.lib libuv = _corecffi.lib
from gevent._ffi import watcher as _base from gevent._ffi import watcher as _base
_closing_handles = set() _closing_handles = set()
...@@ -32,7 +31,7 @@ def _pid_dbg(*args, **kwargs): ...@@ -32,7 +31,7 @@ def _pid_dbg(*args, **kwargs):
kwargs['file'] = sys.stderr kwargs['file'] = sys.stderr
print(os.getpid(), *args, **kwargs) print(os.getpid(), *args, **kwargs)
# _dbg = _pid_dbg #_dbg = _pid_dbg
_events = [(libuv.UV_READABLE, "READ"), _events = [(libuv.UV_READABLE, "READ"),
(libuv.UV_WRITABLE, "WRITE")] (libuv.UV_WRITABLE, "WRITE")]
...@@ -232,12 +231,13 @@ class io(_base.IoMixin, watcher): ...@@ -232,12 +231,13 @@ class io(_base.IoMixin, watcher):
class _multiplexwatcher(object): class _multiplexwatcher(object):
callback = None
args = ()
pass_events = False
ref = True
def __init__(self, events, watcher): def __init__(self, events, watcher):
self.events = events self.events = events
self.callback = None
self.args = ()
self.pass_events = False
self.ref = True
# References: # References:
# These objects keep the original IO object alive; # These objects keep the original IO object alive;
...@@ -248,6 +248,7 @@ class io(_base.IoMixin, watcher): ...@@ -248,6 +248,7 @@ class io(_base.IoMixin, watcher):
self._watcher_ref = watcher self._watcher_ref = watcher
def start(self, callback, *args, **kwargs): def start(self, callback, *args, **kwargs):
_dbg("Starting IO multiplex watcher for", self.fd, callback)
self.pass_events = kwargs.get("pass_events") self.pass_events = kwargs.get("pass_events")
self.callback = callback self.callback = callback
self.args = args self.args = args
...@@ -257,6 +258,7 @@ class io(_base.IoMixin, watcher): ...@@ -257,6 +258,7 @@ class io(_base.IoMixin, watcher):
watcher._io_start() watcher._io_start()
def stop(self): def stop(self):
_dbg("Stopping IO multiplex watcher for", self.fd, self.callback)
self.callback = None self.callback = None
self.pass_events = None self.pass_events = None
self.args = None self.args = None
...@@ -316,14 +318,22 @@ class io(_base.IoMixin, watcher): ...@@ -316,14 +318,22 @@ class io(_base.IoMixin, watcher):
# the reader, we get a LoopExit. So we can't return here and arguably shouldn't print it # the reader, we get a LoopExit. So we can't return here and arguably shouldn't print it
# either. The negative events mask will match the watcher's mask. # either. The negative events mask will match the watcher's mask.
# See test__fileobject.py:Test.test_newlines for an example. # See test__fileobject.py:Test.test_newlines for an example.
# On Windows (at least with PyPy), we can get ENOTSOCK (socket operation on non-socket)
# if a socket gets closed. If we don't pass the events on, we hang.
# See test__makefile_ref.TestSSL for examples.
# return # return
_dbg("Callback event for watcher", self._fd, "event", events)
for watcher_ref in self._multiplex_watchers: for watcher_ref in self._multiplex_watchers:
watcher = watcher_ref() watcher = watcher_ref()
if not watcher or not watcher.callback: if not watcher or not watcher.callback:
continue continue
if events & watcher.events: _dbg("Event for watcher", self._fd, events, watcher.events, events & watcher.events)
send_event = (events & watcher.events) or events < 0
if send_event:
if not watcher.pass_events: if not watcher.pass_events:
watcher.callback(*watcher.args) watcher.callback(*watcher.args)
else: else:
......
...@@ -45,7 +45,7 @@ VERBOSE = sys.argv.count('-v') > 1 ...@@ -45,7 +45,7 @@ VERBOSE = sys.argv.count('-v') > 1
WIN = sys.platform.startswith("win") WIN = sys.platform.startswith("win")
# XXX: Formalize this better # XXX: Formalize this better
LIBUV = os.getenv('GEVENT_CORE_CFFI_ONLY') == 'libuv' or (PYPY and WIN) LIBUV = os.getenv('GEVENT_CORE_CFFI_ONLY') == 'libuv' or (PYPY and WIN) or hasattr(gevent.core, 'libuv')
if '--debug-greentest' in sys.argv: if '--debug-greentest' in sys.argv:
...@@ -383,7 +383,11 @@ CI_TIMEOUT = 10 ...@@ -383,7 +383,11 @@ CI_TIMEOUT = 10
if PY3 and PYPY: if PY3 and PYPY:
# pypy3 is very slow right now # pypy3 is very slow right now
CI_TIMEOUT = 15 CI_TIMEOUT = 15
LOCAL_TIMEOUT = 1 if PYPY and WIN and LIBUV:
# slow and flaky timeouts
LOCAL_TIMEOUT = CI_TIMEOUT
else:
LOCAL_TIMEOUT = 1
DEFAULT_LOCAL_HOST_ADDR = 'localhost' DEFAULT_LOCAL_HOST_ADDR = 'localhost'
DEFAULT_LOCAL_HOST_ADDR6 = DEFAULT_LOCAL_HOST_ADDR DEFAULT_LOCAL_HOST_ADDR6 = DEFAULT_LOCAL_HOST_ADDR
...@@ -793,12 +797,17 @@ def _run_lsof(): ...@@ -793,12 +797,17 @@ def _run_lsof():
os.close(fd) os.close(fd)
lsof_command = 'lsof -p %s > %s' % (pid, tmpname) lsof_command = 'lsof -p %s > %s' % (pid, tmpname)
if os.system(lsof_command): if os.system(lsof_command):
raise OSError("lsof failed") # XXX: This prints to the console an annoying message: 'lsof is not recognized'
raise unittest.SkipTest("lsof failed")
with open(tmpname) as fobj: with open(tmpname) as fobj:
data = fobj.read().strip() data = fobj.read().strip()
os.remove(tmpname) os.remove(tmpname)
return data return data
if WIN:
def _run_lsof():
raise unittest.SkipTest("lsof not expected on Windows")
def default_get_open_files(pipes=False): def default_get_open_files(pipes=False):
data = _run_lsof() data = _run_lsof()
results = {} results = {}
...@@ -831,7 +840,7 @@ def default_get_number_open_files(): ...@@ -831,7 +840,7 @@ def default_get_number_open_files():
else: else:
try: try:
return len(get_open_files(pipes=True)) - 1 return len(get_open_files(pipes=True)) - 1
except (OSError, AssertionError): except (OSError, AssertionError, unittest.SkipTest):
return 0 return 0
lsof_get_open_files = default_get_open_files lsof_get_open_files = default_get_open_files
......
...@@ -17,7 +17,8 @@ EV_USE_INOTIFY = getattr(gevent.core, 'EV_USE_INOTIFY', None) ...@@ -17,7 +17,8 @@ EV_USE_INOTIFY = getattr(gevent.core, 'EV_USE_INOTIFY', None)
WIN = sys.platform.startswith('win') WIN = sys.platform.startswith('win')
try: def test():
try:
open(filename, 'wb', buffering=0).close() open(filename, 'wb', buffering=0).close()
assert os.path.exists(filename), filename assert os.path.exists(filename), filename
...@@ -62,7 +63,12 @@ try: ...@@ -62,7 +63,12 @@ try:
with gevent.Timeout(5 + DELAY + 0.5): with gevent.Timeout(5 + DELAY + 0.5):
hub.wait(watcher) hub.wait(watcher)
reaction = time.time() - start - DELAY now = time.time()
if now - start <= 0.0:
# Sigh. This is especially true on PyPy.
assert WIN, ("Bad timer resolution expected on Windows, test is useless", start, now)
return
reaction = now - start - DELAY
print('Watcher %s reacted after %.4f seconds (write)' % (watcher, reaction)) print('Watcher %s reacted after %.4f seconds (write)' % (watcher, reaction))
if reaction >= DELAY and EV_USE_INOTIFY: if reaction >= DELAY and EV_USE_INOTIFY:
print('WARNING: inotify failed (write)') print('WARNING: inotify failed (write)')
...@@ -88,6 +94,9 @@ try: ...@@ -88,6 +94,9 @@ try:
check_attr('attr', True) check_attr('attr', True)
check_attr('prev', False) check_attr('prev', False)
finally: finally:
if os.path.exists(filename): if os.path.exists(filename):
os.unlink(filename) os.unlink(filename)
if __name__ == '__main__':
test()
...@@ -105,6 +105,7 @@ class Test(TestCase): ...@@ -105,6 +105,7 @@ class Test(TestCase):
def make_open_socket(self): def make_open_socket(self):
s = socket.socket() s = socket.socket()
s.bind(('127.0.0.1', 0)) s.bind(('127.0.0.1', 0))
self._close_on_teardown(s)
if WIN: if WIN:
# Windows doesn't show as open until this # Windows doesn't show as open until this
s.listen(1) s.listen(1)
...@@ -239,12 +240,34 @@ class TestSocket(Test): ...@@ -239,12 +240,34 @@ class TestSocket(Test):
listener.close() listener.close()
class TestSSL(Test): class TestSSL(Test):
def _ssl_connect_task(self, connector, port):
connector.connect(('127.0.0.1', port))
try:
# Note: We get ResourceWarning about 'x'
# on Python 3 if we don't join the spawned thread
x = ssl.wrap_socket(connector)
except socket.error:
# Observed on Windows with PyPy2 5.9.0 and libuv:
# if we don't switch in a timely enough fashion,
# the server side runs ahead of us and closes
# our socket first, so this fails.
pass
else:
self._close_on_teardown(x)
def _make_ssl_connect_task(self, connector, port):
t = threading.Thread(target=self._ssl_connect_task, args=(connector, port))
t.daemon = True
return t
def test_simple_close(self): def test_simple_close(self):
s = self.make_open_socket() s = self.make_open_socket()
fileno = s.fileno() fileno = s.fileno()
s = ssl.wrap_socket(s) s = ssl.wrap_socket(s)
self._close_on_teardown(s)
fileno = s.fileno() fileno = s.fileno()
self.assert_open(s, fileno) self.assert_open(s, fileno)
s.close() s.close()
...@@ -255,6 +278,7 @@ class TestSSL(Test): ...@@ -255,6 +278,7 @@ class TestSSL(Test):
fileno = s.fileno() fileno = s.fileno()
s = ssl.wrap_socket(s) s = ssl.wrap_socket(s)
self._close_on_teardown(s)
fileno = s.fileno() fileno = s.fileno()
self.assert_open(s, fileno) self.assert_open(s, fileno)
f = s.makefile() f = s.makefile()
...@@ -269,6 +293,7 @@ class TestSSL(Test): ...@@ -269,6 +293,7 @@ class TestSSL(Test):
fileno = s.fileno() fileno = s.fileno()
s = ssl.wrap_socket(s) s = ssl.wrap_socket(s)
self._close_on_teardown(s)
fileno = s.fileno() fileno = s.fileno()
self.assert_open(s, fileno) self.assert_open(s, fileno)
f = s.makefile() f = s.makefile()
...@@ -288,24 +313,22 @@ class TestSSL(Test): ...@@ -288,24 +313,22 @@ class TestSSL(Test):
connector = socket.socket() connector = socket.socket()
self._close_on_teardown(connector) self._close_on_teardown(connector)
def connect(): t = self._make_ssl_connect_task(connector, port)
connector.connect(('127.0.0.1', port))
x = ssl.wrap_socket(connector)
self._close_on_teardown(x)
t = threading.Thread(target=connect)
t.start() t.start()
try: try:
client_socket, _addr = listener.accept() client_socket, _addr = listener.accept()
self._close_on_teardown(client_socket)
client_socket = ssl.wrap_socket(client_socket, keyfile=certfile, certfile=certfile, server_side=True) client_socket = ssl.wrap_socket(client_socket, keyfile=certfile, certfile=certfile, server_side=True)
self._close_on_teardown(client_socket)
fileno = client_socket.fileno() fileno = client_socket.fileno()
self.assert_open(client_socket, fileno) self.assert_open(client_socket, fileno)
client_socket.close() client_socket.close()
self.assert_closed(client_socket, fileno) self.assert_closed(client_socket, fileno)
finally: finally:
t.join()
listener.close() listener.close()
connector.close()
t.join()
def test_server_makefile1(self): def test_server_makefile1(self):
listener = socket.socket() listener = socket.socket()
...@@ -314,20 +337,18 @@ class TestSSL(Test): ...@@ -314,20 +337,18 @@ class TestSSL(Test):
port = listener.getsockname()[1] port = listener.getsockname()[1]
listener.listen(1) listener.listen(1)
connector = socket.socket() connector = socket.socket()
self._close_on_teardown(connector) self._close_on_teardown(connector)
def connect(): t = self._make_ssl_connect_task(connector, port)
connector.connect(('127.0.0.1', port))
x = ssl.wrap_socket(connector)
self._close_on_teardown(x)
t = threading.Thread(target=connect)
t.start() t.start()
try: try:
client_socket, _addr = listener.accept() client_socket, _addr = listener.accept()
self._close_on_teardown(client_socket)
client_socket = ssl.wrap_socket(client_socket, keyfile=certfile, certfile=certfile, server_side=True) client_socket = ssl.wrap_socket(client_socket, keyfile=certfile, certfile=certfile, server_side=True)
self._close_on_teardown(client_socket)
fileno = client_socket.fileno() fileno = client_socket.fileno()
self.assert_open(client_socket, fileno) self.assert_open(client_socket, fileno)
f = client_socket.makefile() f = client_socket.makefile()
...@@ -337,8 +358,9 @@ class TestSSL(Test): ...@@ -337,8 +358,9 @@ class TestSSL(Test):
f.close() f.close()
self.assert_closed(client_socket, fileno) self.assert_closed(client_socket, fileno)
finally: finally:
t.join() listener.close()
connector.close() connector.close()
t.join()
def test_server_makefile2(self): def test_server_makefile2(self):
listener = socket.socket() listener = socket.socket()
...@@ -349,17 +371,14 @@ class TestSSL(Test): ...@@ -349,17 +371,14 @@ class TestSSL(Test):
connector = socket.socket() connector = socket.socket()
self._close_on_teardown(connector) self._close_on_teardown(connector)
def connect(): t = self._make_ssl_connect_task(connector, port)
connector.connect(('127.0.0.1', port))
x = ssl.wrap_socket(connector)
self._close_on_teardown(x)
t = threading.Thread(target=connect)
t.start() t.start()
try: try:
client_socket, _addr = listener.accept() client_socket, _addr = listener.accept()
self._close_on_teardown(client_socket)
client_socket = ssl.wrap_socket(client_socket, keyfile=certfile, certfile=certfile, server_side=True) client_socket = ssl.wrap_socket(client_socket, keyfile=certfile, certfile=certfile, server_side=True)
self._close_on_teardown(client_socket)
fileno = client_socket.fileno() fileno = client_socket.fileno()
self.assert_open(client_socket, fileno) self.assert_open(client_socket, fileno)
f = client_socket.makefile() f = client_socket.makefile()
...@@ -370,9 +389,10 @@ class TestSSL(Test): ...@@ -370,9 +389,10 @@ class TestSSL(Test):
client_socket.close() client_socket.close()
self.assert_closed(client_socket, fileno) self.assert_closed(client_socket, fileno)
finally: finally:
t.join()
listener.close()
connector.close() connector.close()
listener.close()
client_socket.close()
t.join()
def test_serverssl_makefile1(self): def test_serverssl_makefile1(self):
listener = socket.socket() listener = socket.socket()
...@@ -385,12 +405,7 @@ class TestSSL(Test): ...@@ -385,12 +405,7 @@ class TestSSL(Test):
connector = socket.socket() connector = socket.socket()
self._close_on_teardown(connector) self._close_on_teardown(connector)
def connect(): t = self._make_ssl_connect_task(connector, port)
connector.connect(('127.0.0.1', port))
x = ssl.wrap_socket(connector)
self._close_on_teardown(x)
t = threading.Thread(target=connect)
t.start() t.start()
try: try:
...@@ -404,9 +419,9 @@ class TestSSL(Test): ...@@ -404,9 +419,9 @@ class TestSSL(Test):
f.close() f.close()
self.assert_closed(client_socket, fileno) self.assert_closed(client_socket, fileno)
finally: finally:
t.join()
listener.close() listener.close()
connector.close() connector.close()
t.join()
def test_serverssl_makefile2(self): def test_serverssl_makefile2(self):
listener = socket.socket() listener = socket.socket()
...@@ -425,6 +440,7 @@ class TestSSL(Test): ...@@ -425,6 +440,7 @@ class TestSSL(Test):
connector.close() connector.close()
t = threading.Thread(target=connect) t = threading.Thread(target=connect)
t.daemon = True
t.start() t.start()
try: try:
...@@ -443,8 +459,8 @@ class TestSSL(Test): ...@@ -443,8 +459,8 @@ class TestSSL(Test):
client_socket.close() client_socket.close()
self.assert_closed(client_socket, fileno) self.assert_closed(client_socket, fileno)
finally: finally:
t.join()
listener.close() listener.close()
t.join()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -15,7 +15,8 @@ PYPY = hasattr(sys, 'pypy_version_info') ...@@ -15,7 +15,8 @@ PYPY = hasattr(sys, 'pypy_version_info')
class TestCase(greentest.TestCase): class TestCase(greentest.TestCase):
# These generally need more time
__timeout__ = greentest.CI_TIMEOUT
pool = None pool = None
def cleanup(self): def cleanup(self):
...@@ -107,7 +108,7 @@ def sqr_random_sleep(x): ...@@ -107,7 +108,7 @@ def sqr_random_sleep(x):
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
class _AbstractPoolTest(TestCase): class _AbstractPoolTest(TestCase):
__timeout__ = 5
size = 1 size = 1
ClassUnderTest = ThreadPool ClassUnderTest = ThreadPool
...@@ -256,7 +257,7 @@ class TestPool3(TestPool): ...@@ -256,7 +257,7 @@ class TestPool3(TestPool):
class TestPool10(TestPool): class TestPool10(TestPool):
size = 10 size = 10
__timeout__ = 5
# class TestJoinSleep(greentest.GenericGetTestCase): # class TestJoinSleep(greentest.GenericGetTestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment