Commit 357b5258 authored by Victor Stinner's avatar Victor Stinner

asyncio, Tulip issue 157: Improve test_events.py, avoid run_briefly() which is

not reliable
parent a01770fd
...@@ -21,10 +21,11 @@ try: ...@@ -21,10 +21,11 @@ try:
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
ssl = None ssl = None
from . import tasks
from . import base_events from . import base_events
from . import events from . import events
from . import futures
from . import selectors from . import selectors
from . import tasks
if sys.platform == 'win32': # pragma: no cover if sys.platform == 'win32': # pragma: no cover
...@@ -52,18 +53,14 @@ def run_briefly(loop): ...@@ -52,18 +53,14 @@ def run_briefly(loop):
gen.close() gen.close()
def run_until(loop, pred, timeout=None): def run_until(loop, pred, timeout=30):
if timeout is not None: deadline = time.time() + timeout
deadline = time.time() + timeout
while not pred(): while not pred():
if timeout is not None: if timeout is not None:
timeout = deadline - time.time() timeout = deadline - time.time()
if timeout <= 0: if timeout <= 0:
return False raise futures.TimeoutError()
loop.run_until_complete(tasks.sleep(timeout, loop=loop)) loop.run_until_complete(tasks.sleep(0.001, loop=loop))
else:
run_briefly(loop)
return True
def run_once(loop): def run_once(loop):
......
...@@ -56,6 +56,7 @@ SIGNING_CA = data_file('pycacert.pem') ...@@ -56,6 +56,7 @@ SIGNING_CA = data_file('pycacert.pem')
class MyBaseProto(asyncio.Protocol): class MyBaseProto(asyncio.Protocol):
connected = None
done = None done = None
def __init__(self, loop=None): def __init__(self, loop=None):
...@@ -63,12 +64,15 @@ class MyBaseProto(asyncio.Protocol): ...@@ -63,12 +64,15 @@ class MyBaseProto(asyncio.Protocol):
self.state = 'INITIAL' self.state = 'INITIAL'
self.nbytes = 0 self.nbytes = 0
if loop is not None: if loop is not None:
self.connected = asyncio.Future(loop=loop)
self.done = asyncio.Future(loop=loop) self.done = asyncio.Future(loop=loop)
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
assert self.state == 'INITIAL', self.state assert self.state == 'INITIAL', self.state
self.state = 'CONNECTED' self.state = 'CONNECTED'
if self.connected:
self.connected.set_result(None)
def data_received(self, data): def data_received(self, data):
assert self.state == 'CONNECTED', self.state assert self.state == 'CONNECTED', self.state
...@@ -330,7 +334,8 @@ class EventLoopTestsMixin: ...@@ -330,7 +334,8 @@ class EventLoopTestsMixin:
def test_reader_callback(self): def test_reader_callback(self):
r, w = test_utils.socketpair() r, w = test_utils.socketpair()
bytes_read = [] r.setblocking(False)
bytes_read = bytearray()
def reader(): def reader():
try: try:
...@@ -340,37 +345,40 @@ class EventLoopTestsMixin: ...@@ -340,37 +345,40 @@ class EventLoopTestsMixin:
# at least on Linux -- see man select. # at least on Linux -- see man select.
return return
if data: if data:
bytes_read.append(data) bytes_read.extend(data)
else: else:
self.assertTrue(self.loop.remove_reader(r.fileno())) self.assertTrue(self.loop.remove_reader(r.fileno()))
r.close() r.close()
self.loop.add_reader(r.fileno(), reader) self.loop.add_reader(r.fileno(), reader)
self.loop.call_soon(w.send, b'abc') self.loop.call_soon(w.send, b'abc')
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
self.loop.call_soon(w.send, b'def') self.loop.call_soon(w.send, b'def')
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
self.loop.call_soon(w.close) self.loop.call_soon(w.close)
self.loop.call_soon(self.loop.stop) self.loop.call_soon(self.loop.stop)
self.loop.run_forever() self.loop.run_forever()
self.assertEqual(b''.join(bytes_read), b'abcdef') self.assertEqual(bytes_read, b'abcdef')
def test_writer_callback(self): def test_writer_callback(self):
r, w = test_utils.socketpair() r, w = test_utils.socketpair()
w.setblocking(False) w.setblocking(False)
self.loop.add_writer(w.fileno(), w.send, b'x'*(256*1024))
test_utils.run_briefly(self.loop)
def remove_writer(): def writer(data):
self.assertTrue(self.loop.remove_writer(w.fileno())) w.send(data)
self.loop.stop()
self.loop.call_soon(remove_writer) data = b'x' * 1024
self.loop.call_soon(self.loop.stop) self.loop.add_writer(w.fileno(), writer, data)
self.loop.run_forever() self.loop.run_forever()
self.assertTrue(self.loop.remove_writer(w.fileno()))
self.assertFalse(self.loop.remove_writer(w.fileno()))
w.close() w.close()
data = r.recv(256*1024) read = r.recv(len(data) * 2)
r.close() r.close()
self.assertGreaterEqual(len(data), 200) self.assertEqual(read, data)
def _basetest_sock_client_ops(self, httpd, sock): def _basetest_sock_client_ops(self, httpd, sock):
sock.setblocking(False) sock.setblocking(False)
...@@ -464,10 +472,10 @@ class EventLoopTestsMixin: ...@@ -464,10 +472,10 @@ class EventLoopTestsMixin:
self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
# Now set a handler and handle it. # Now set a handler and handle it.
self.loop.add_signal_handler(signal.SIGINT, my_handler) self.loop.add_signal_handler(signal.SIGINT, my_handler)
test_utils.run_briefly(self.loop)
os.kill(os.getpid(), signal.SIGINT) os.kill(os.getpid(), signal.SIGINT)
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: caught)
self.assertEqual(caught, 1)
# Removing it should restore the default handler. # Removing it should restore the default handler.
self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
self.assertEqual(signal.getsignal(signal.SIGINT), self.assertEqual(signal.getsignal(signal.SIGINT),
...@@ -623,7 +631,7 @@ class EventLoopTestsMixin: ...@@ -623,7 +631,7 @@ class EventLoopTestsMixin:
self.assertIn(str(httpd.address), cm.exception.strerror) self.assertIn(str(httpd.address), cm.exception.strerror)
def test_create_server(self): def test_create_server(self):
proto = MyProto() proto = MyProto(self.loop)
f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
server = self.loop.run_until_complete(f) server = self.loop.run_until_complete(f)
self.assertEqual(len(server.sockets), 1) self.assertEqual(len(server.sockets), 1)
...@@ -633,14 +641,11 @@ class EventLoopTestsMixin: ...@@ -633,14 +641,11 @@ class EventLoopTestsMixin:
client = socket.socket() client = socket.socket()
client.connect(('127.0.0.1', port)) client.connect(('127.0.0.1', port))
client.sendall(b'xxx') client.sendall(b'xxx')
test_utils.run_briefly(self.loop)
test_utils.run_until(self.loop, lambda: proto is not None, 10) self.loop.run_until_complete(proto.connected)
self.assertIsInstance(proto, MyProto)
self.assertEqual('INITIAL', proto.state)
test_utils.run_briefly(self.loop)
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
timeout=10) test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes) self.assertEqual(3, proto.nbytes)
# extra info is available # extra info is available
...@@ -650,7 +655,7 @@ class EventLoopTestsMixin: ...@@ -650,7 +655,7 @@ class EventLoopTestsMixin:
# close connection # close connection
proto.transport.close() proto.transport.close()
test_utils.run_briefly(self.loop) # windows iocp self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state) self.assertEqual('CLOSED', proto.state)
...@@ -672,27 +677,22 @@ class EventLoopTestsMixin: ...@@ -672,27 +677,22 @@ class EventLoopTestsMixin:
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server(self): def test_create_unix_server(self):
proto = MyProto() proto = MyProto(loop=self.loop)
server, path = self._make_unix_server(lambda: proto) server, path = self._make_unix_server(lambda: proto)
self.assertEqual(len(server.sockets), 1) self.assertEqual(len(server.sockets), 1)
client = socket.socket(socket.AF_UNIX) client = socket.socket(socket.AF_UNIX)
client.connect(path) client.connect(path)
client.sendall(b'xxx') client.sendall(b'xxx')
test_utils.run_briefly(self.loop)
test_utils.run_until(self.loop, lambda: proto is not None, 10)
self.assertIsInstance(proto, MyProto) self.loop.run_until_complete(proto.connected)
self.assertEqual('INITIAL', proto.state)
test_utils.run_briefly(self.loop)
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
test_utils.run_until(self.loop, lambda: proto.nbytes > 0, test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
timeout=10)
self.assertEqual(3, proto.nbytes) self.assertEqual(3, proto.nbytes)
# close connection # close connection
proto.transport.close() proto.transport.close()
test_utils.run_briefly(self.loop) # windows iocp self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state) self.assertEqual('CLOSED', proto.state)
...@@ -735,12 +735,10 @@ class EventLoopTestsMixin: ...@@ -735,12 +735,10 @@ class EventLoopTestsMixin:
client, pr = self.loop.run_until_complete(f_c) client, pr = self.loop.run_until_complete(f_c)
client.write(b'xxx') client.write(b'xxx')
test_utils.run_briefly(self.loop) self.loop.run_until_complete(proto.connected)
self.assertIsInstance(proto, MyProto)
test_utils.run_briefly(self.loop)
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
timeout=10) test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes) self.assertEqual(3, proto.nbytes)
# extra info is available # extra info is available
...@@ -774,12 +772,9 @@ class EventLoopTestsMixin: ...@@ -774,12 +772,9 @@ class EventLoopTestsMixin:
client, pr = self.loop.run_until_complete(f_c) client, pr = self.loop.run_until_complete(f_c)
client.write(b'xxx') client.write(b'xxx')
test_utils.run_briefly(self.loop) self.loop.run_until_complete(proto.connected)
self.assertIsInstance(proto, MyProto)
test_utils.run_briefly(self.loop)
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
test_utils.run_until(self.loop, lambda: proto.nbytes > 0, test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
timeout=10)
self.assertEqual(3, proto.nbytes) self.assertEqual(3, proto.nbytes)
# close connection # close connection
...@@ -1044,15 +1039,9 @@ class EventLoopTestsMixin: ...@@ -1044,15 +1039,9 @@ class EventLoopTestsMixin:
self.assertEqual('INITIALIZED', client.state) self.assertEqual('INITIALIZED', client.state)
transport.sendto(b'xxx') transport.sendto(b'xxx')
for _ in range(1000): test_utils.run_until(self.loop, lambda: server.nbytes)
if server.nbytes:
break
test_utils.run_briefly(self.loop)
self.assertEqual(3, server.nbytes) self.assertEqual(3, server.nbytes)
for _ in range(1000): test_utils.run_until(self.loop, lambda: client.nbytes)
if client.nbytes:
break
test_utils.run_briefly(self.loop)
# received # received
self.assertEqual(8, client.nbytes) self.assertEqual(8, client.nbytes)
...@@ -1097,11 +1086,11 @@ class EventLoopTestsMixin: ...@@ -1097,11 +1086,11 @@ class EventLoopTestsMixin:
self.loop.run_until_complete(connect()) self.loop.run_until_complete(connect())
os.write(wpipe, b'1') os.write(wpipe, b'1')
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
self.assertEqual(1, proto.nbytes) self.assertEqual(1, proto.nbytes)
os.write(wpipe, b'2345') os.write(wpipe, b'2345')
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
self.assertEqual(5, proto.nbytes) self.assertEqual(5, proto.nbytes)
...@@ -1166,14 +1155,19 @@ class EventLoopTestsMixin: ...@@ -1166,14 +1155,19 @@ class EventLoopTestsMixin:
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
transport.write(b'1') transport.write(b'1')
test_utils.run_briefly(self.loop)
data = os.read(rpipe, 1024) data = bytearray()
def reader(data):
chunk = os.read(rpipe, 1024)
data += chunk
return len(data)
test_utils.run_until(self.loop, lambda: reader(data) >= 1)
self.assertEqual(b'1', data) self.assertEqual(b'1', data)
transport.write(b'2345') transport.write(b'2345')
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: reader(data) >= 5)
data = os.read(rpipe, 1024) self.assertEqual(b'12345', data)
self.assertEqual(b'2345', data)
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
os.close(rpipe) os.close(rpipe)
...@@ -1225,14 +1219,21 @@ class EventLoopTestsMixin: ...@@ -1225,14 +1219,21 @@ class EventLoopTestsMixin:
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
transport.write(b'1') transport.write(b'1')
test_utils.run_briefly(self.loop)
data = os.read(master, 1024) data = bytearray()
def reader(data):
chunk = os.read(master, 1024)
data += chunk
return len(data)
test_utils.run_until(self.loop, lambda: reader(data) >= 1,
timeout=10)
self.assertEqual(b'1', data) self.assertEqual(b'1', data)
transport.write(b'2345') transport.write(b'2345')
test_utils.run_briefly(self.loop) test_utils.run_until(self.loop, lambda: reader(data) >= 5,
data = os.read(master, 1024) timeout=10)
self.assertEqual(b'2345', data) self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state) self.assertEqual('CONNECTED', proto.state)
os.close(master) os.close(master)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment