Commit a7061556 authored by Denis Bilenko's avatar Denis Bilenko

update tests with regard to latest changes

parent 6c7b8ee8
......@@ -207,7 +207,7 @@ class TestCase0(BaseTestCase):
raise AssertionError('%r did not match:\n%r' % (message, self.stderr))
def assert_mainloop_assertion(self, message=None):
self.assert_stderr_traceback('AssertionError', 'Cannot switch to MAINLOOP from MAINLOOP')
self.assert_stderr_traceback('AssertionError', 'Impossible to call blocking function in the event loop callback')
if message is not None:
self.assert_stderr(message)
......
import gevent
from gevent.hub import get_hub
called = []
......@@ -6,16 +7,25 @@ called = []
def f():
called.append(1)
x = gevent.core.active_event(f)
assert x.pending == 1, x.pending
gevent.sleep(0)
assert x.pending == 0, x.pending
assert called == [1], called
x = gevent.core.active_event(f)
assert x.pending == 1, x.pending
x.cancel()
assert x.pending == 0, x.pending
gevent.sleep(0)
assert called == [1], called
assert x.pending == 0, x.pending
def main():
active_event = get_hub().reactor.active_event
x = active_event(f)
assert x.pending == 1, x.pending
gevent.sleep(0)
assert x.pending == 0, x.pending
assert called == [1], called
x = active_event(f)
assert x.pending == 1, x.pending
x.cancel()
assert x.pending == 0, x.pending
gevent.sleep(0)
assert called == [1], called
assert x.pending == 0, x.pending
gevent.sleep(0.1)
if __name__ == '__main__':
called[:] = []
main()
import gevent
import sys
import greentest
from gevent.hub import get_hub
sys.exc_clear()
......@@ -39,7 +40,7 @@ class Test(greentest.TestCase):
assert ex is error, (ex, error)
def test2(self):
gevent.core.timer(0, hello)
get_hub().reactor.timer(0, hello)
self.hook_stderr()
gevent.sleep(0.1)
self.assert_stderr_traceback(expected_error)
......
......@@ -20,15 +20,12 @@
# THE SOFTWARE.
import greentest
import unittest
import time
import re
import sys
import gevent
from gevent import core
from gevent import socket
from gevent.hub import Waiter
import signal
from gevent.hub import Waiter, get_hub
DELAY = 0.1
......@@ -37,7 +34,7 @@ class TestScheduleCall(greentest.TestCase):
def test_global(self):
lst = [1]
gevent.spawn(core.timer, DELAY, lst.pop)
gevent.spawn(get_hub().reactor.timer, DELAY, lst.pop)
gevent.sleep(DELAY * 2)
assert lst == [], lst
......@@ -47,7 +44,7 @@ class TestCloseSocketWhilePolling(greentest.TestCase):
def test(self):
try:
sock = socket.socket()
core.timer(0, sock.close)
get_hub().reactor.timer(0, sock.close)
sock.connect(('python.org', 81))
except Exception:
gevent.sleep(0)
......@@ -68,7 +65,7 @@ class TestExceptionInMainloop(greentest.TestCase):
def fail():
raise greentest.ExpectedException('TestExceptionInMainloop.test_sleep/fail')
core.timer(0, fail)
get_hub().reactor.timer(0, fail)
start = time.time()
gevent.sleep(DELAY)
......@@ -77,46 +74,6 @@ class TestExceptionInMainloop(greentest.TestCase):
assert delay >= DELAY * 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
class TestShutdown(unittest.TestCase):
def _shutdown(self, seconds=0, fuzzy=None):
if fuzzy is None:
fuzzy = max(0.05, seconds / 2.)
start = time.time()
gevent.hub.shutdown()
delta = time.time() - start
assert seconds - fuzzy < delta < seconds + fuzzy, (seconds - fuzzy, delta, seconds + fuzzy)
def assert_hub(self):
assert 'hub' in gevent.hub._threadlocal.__dict__
def assert_no_hub(self):
assert 'hub' not in gevent.hub._threadlocal.__dict__, gevent.hub._threadlocal.__dict__
def test(self):
# make sure Hub is started. For the test case when hub is not started, see test_hub_shutdown.py
gevent.sleep(0)
assert not gevent.hub.get_hub().dead
self._shutdown()
self.assert_no_hub()
# shutting down dead hub is silent
self._shutdown()
self._shutdown()
self.assert_no_hub()
# ressurect
gevent.sleep(0)
self.assert_hub()
gevent.core.timer(0.1, lambda: None)
self.assert_hub()
self._shutdown(seconds=0.1)
self.assert_no_hub()
self._shutdown(seconds=0)
self.assert_no_hub()
class TestSleep(greentest.GenericWaitTestCase):
def wait(self, timeout):
......@@ -141,43 +98,24 @@ class TestSleep(greentest.GenericWaitTestCase):
self.assertEqual(str(gevent_ex), str(real_ex))
class Expected(Exception):
pass
if hasattr(signal, 'SIGALRM'):
class TestSignal(greentest.TestCase):
__timeout__ = 2
def test_exception_goes_to_MAIN(self):
def handler():
raise Expected('TestSignal')
gevent.signal(signal.SIGALRM, handler)
signal.alarm(1)
try:
gevent.spawn(gevent.sleep, 2).join()
raise AssertionError('must raise Expected')
except Expected, ex:
assert str(ex) == 'TestSignal', ex
class TestWaiter(greentest.GenericWaitTestCase):
class TestWaiterGet(greentest.GenericWaitTestCase):
def setUp(self):
super(TestWaiter, self).setUp()
super(TestWaiterGet, self).setUp()
self.waiter = Waiter()
def wait(self, timeout):
evt = core.timer(timeout, self.waiter.switch, None)
evt = get_hub().reactor.timer(timeout, self.waiter.switch, None)
try:
return self.waiter.get()
finally:
evt.cancel()
class TestWaiter(greentest.TestCase):
def test(self):
waiter = self.waiter
waiter = Waiter()
self.assertEqual(str(waiter), '<Waiter greenlet=None>')
waiter.switch(25)
self.assertEqual(str(waiter), '<Waiter greenlet=None value=25>')
......
from greentest import TestCase, main
import gevent
from gevent import util, core
from gevent.hub import get_hub
from gevent import util
from gevent import queue
from gevent.event import AsyncResult
......@@ -252,8 +253,10 @@ class TestNoWait(TestCase):
def store_result(func, *args):
result.append(func(*args))
core.active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 2)
core.active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 3)
active_event = get_hub().reactor.active_event
active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 2)
active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 3)
gevent.sleep(0)
assert len(result) == 2, result
assert result[0] == None, result
......@@ -267,8 +270,10 @@ class TestNoWait(TestCase):
def store_result(func, *args):
result.append(func(*args))
core.active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
core.active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
active_event = get_hub().reactor.active_event
active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
gevent.sleep(0)
assert len(result) == 2, result
assert result[0] == 4, result
......@@ -288,7 +293,7 @@ class TestNoWait(TestCase):
gevent.sleep(0)
assert q.empty(), q
assert q.full(), q
core.active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
get_hub().reactor.active_event(store_result, util.wrap_errors(Exception, q.get_nowait))
gevent.sleep(0)
assert q.empty(), q
assert q.full(), q
......@@ -311,7 +316,7 @@ class TestNoWait(TestCase):
gevent.sleep(0)
assert q.empty(), q
assert q.full(), q
core.active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 10)
get_hub().reactor.active_event(store_result, util.wrap_errors(Exception, q.put_nowait), 10)
assert not p.ready(), p
gevent.sleep(0)
assert result == [None], result
......
......@@ -297,7 +297,7 @@ class TestDefaultSpawn(TestCase):
class TestRawSpawn(TestDefaultSpawn):
invalid_callback_message = 'Failed active_event...'
invalid_callback_message = 'Failed event...'
def get_spawn(self):
return gevent.spawn_raw
......
import unittest
import time
import gevent
from gevent.hub import get_hub
class TestShutdown(unittest.TestCase):
def _shutdown(self, seconds=0, fuzzy=None):
if fuzzy is None:
fuzzy = max(0.05, seconds / 2.)
start = time.time()
gevent.hub.shutdown()
delta = time.time() - start
assert seconds - fuzzy < delta < seconds + fuzzy, (seconds - fuzzy, delta, seconds + fuzzy)
def assert_hub(self):
assert 'hub' in gevent.hub._threadlocal.__dict__
def assert_no_hub(self):
assert 'hub' not in gevent.hub._threadlocal.__dict__, gevent.hub._threadlocal.__dict__
def test(self):
# make sure Hub is started. For the test case when hub is not started, see test_hub_shutdown.py
gevent.sleep(0)
assert not gevent.hub.get_hub().dead
self._shutdown()
self.assert_no_hub()
# shutting down dead hub is silent
self._shutdown()
self._shutdown()
self.assert_no_hub()
# ressurect
gevent.sleep(0)
self.assert_hub()
get_hub().reactor.timer(0.1, lambda: None)
self.assert_hub()
self._shutdown(seconds=0.1)
self.assert_no_hub()
self._shutdown(seconds=0)
self.assert_no_hub()
if __name__ == '__main__':
unittest.main()
import signal
import greentest
import gevent
class Expected(Exception):
pass
def raise_Expected():
raise Expected('TestSignal')
if hasattr(signal, 'SIGALRM'):
class TestSignal(greentest.TestCase):
__timeout__ = 5
def test(self):
sig = gevent.signal(signal.SIGALRM, raise_Expected)
try:
signal.alarm(1)
try:
gevent.sleep(2)
raise AssertionError('must raise Expected')
except Expected, ex:
assert str(ex) == 'TestSignal', ex
# also let's check that alarm is persistent
signal.alarm(1)
try:
gevent.sleep(2)
raise AssertionError('must raise Expected')
except Expected, ex:
assert str(ex) == 'TestSignal', ex
finally:
sig.cancel()
if __name__ == '__main__':
greentest.main()
......@@ -2,39 +2,59 @@
# -*- coding: utf-8 -*-
import sys
import re
import traceback
import greentest
import socket as real_socket
from gevent.socket import *
import gevent
from gevent.socket import gethostbyname, getaddrinfo, AF_INET, AF_INET6, AF_UNSPEC, SOCK_DGRAM, SOCK_STREAM, gaierror, error
ACCEPTED_GAIERROR_MISMATCH = {
"gaierror(-5, 'No address associated with hostname')": "DNSError(3, 'name does not exist')"}
# EAI_NODATA (-5) is deprecated in RFC3493, so libevent's resolves returning -2 is also right here
"gaierror(-5, 'No address associated with hostname')": "gaierror(-2, 'Name or service not known')"}
assert gaierror is real_socket.gaierror
assert error is real_socket.error
VERBOSE = '-v' in sys.argv
VERBOSE = sys.argv.count('-v') >= 2 or '-vv' in sys.argv
PASS = True
if VERBOSE:
def log(s, *args):
try:
s = s % args
except Exception:
traceback.print_exc()
s = '%s %r' % (s, args)
sys.stdout.write(s + '\n')
else:
def log(*args):
pass
class TestCase(greentest.TestCase):
def _test(self, hostname, check_ip=None):
def _test(self, hostname, check_ip=None, fail=False):
self._test_gethostbyname(hostname, check_ip=check_ip)
self._test_getaddrinfo(hostname)
self._test_getaddrinfo(hostname, 80, 0, SOCK_STREAM, fail=fail)
def _test_gethostbyname(self, hostname, check_ip=None):
log('\nreal_socket.gethostbyname(%r)', hostname)
try:
if VERBOSE:
print 'real_socket.gethostbyname(%r)' % (hostname, )
real_ip = real_socket.gethostbyname(hostname)
log(' returned %r', real_ip)
except Exception, ex:
real_ip = ex
log(' raised %r', real_ip)
try:
if VERBOSE:
print 'gevent.socket.gethostbyname(%r)' % (hostname, )
log('gevent.socket.gethostbyname(%r)', hostname)
ip = gethostbyname(hostname)
log(' returned %r', ip)
except Exception, ex:
ip = ex
log(' raised %r', ip)
if self.equal(real_ip, ip):
return ip
self.assertEqual(real_ip, ip)
......@@ -42,45 +62,46 @@ class TestCase(greentest.TestCase):
self.assertEqual(check_ip, ip)
return ip
PORTS = [80, 0, 53, 'http']
getaddrinfo_args = [(),
(AF_UNSPEC, ),
(AF_UNSPEC, SOCK_STREAM, 0, 0),
(AF_INET, SOCK_STREAM, ),
(AF_UNSPEC, SOCK_DGRAM, ),
(AF_INET, SOCK_RAW, ),
(AF_UNSPEC, SOCK_STREAM, 6),
(AF_INET, SOCK_DGRAM, 17)]
def _test_getaddrinfo(self, hostname):
for port in self.PORTS:
for args in self.getaddrinfo_args:
if VERBOSE:
print
print 'real_socket.getaddrinfo(%r, %r, %r)' % (hostname, port, args)
try:
real_ip = real_socket.getaddrinfo(hostname, port, *args)
if VERBOSE:
print ' returned %r' % (real_ip, )
except Exception, ex:
real_ip = ex
if VERBOSE:
print 'gevent.socket.getaddrinfo(%r, %r, %r)' % (hostname, port, args)
try:
ip = getaddrinfo(hostname, port, *args)
if VERBOSE:
print ' returned %r' % (ip, )
except Exception, ex:
ip = ex
if not self.equal(real_ip, ip):
args_str = ', '.join(repr(x) for x in (hostname, port) + args)
print 'WARNING: getaddrinfo(%s):\n %r\n != %r' % (args_str, real_ip, ip)
# QQQ switch_expected becomes useless when a bunch of unrelated tests are merged
# into a single one like above. Generate individual test cases instead?
def _test_getaddrinfo(self, *args, **kwargs):
fail = kwargs.get('fail', False)
has_failed = None
log('\nreal_socket.getaddrinfo%r', args)
try:
real_ip = real_socket.getaddrinfo(*args)
log(' returned %r', real_ip)
except Exception, ex:
real_ip = ex
log(' raised %r', real_ip)
has_failed = True
log('gevent.socket.getaddrinfo%r', args)
try:
ip = getaddrinfo(*args)
log(' returned %r', ip)
except Exception, ex:
ip = ex
log(' raised %r', ex)
has_failed = True
if not self.equal(real_ip, ip):
args_str = ', '.join(repr(x) for x in args)
msg = 'getaddrinfo(%s):\n expected: %r\n got: %r' % (args_str, real_ip, ip)
if PASS:
log(msg)
else:
raise AssertionError(msg)
if fail is None:
pass
elif fail:
if not has_failed:
raise AssertionError('getaddinfo must fail')
else:
if has_failed:
raise AssertionError('getaddrinfo failed')
def equal(self, a, b):
if a == b:
return True
if isinstance(a, list) and isinstance(b, list) and sorted(a) == sorted(b):
return True
if isinstance(a, Exception) and isinstance(b, Exception):
if repr(a) == repr(b):
return True
......@@ -119,16 +140,17 @@ class TestLocal(TestCase):
def test_1_2_3_4(self):
self._test('1.2.3.4')
def SKIP_test_notexistent(self):
def test_notexistent(self):
# not really interesting because the original gethostbyname() is called for everything without dots
# disabled because it takes too much time on windows for some reason
self._test('notexistent')
self.switch_expected = True
self._test('notexistent', fail=True)
def test_None(self):
self._test(None)
def test_25(self):
self._test(25)
self._test(25, fail=True)
try:
etc_hosts = open('/etc/hosts').read()
......@@ -137,29 +159,61 @@ class TestLocal(TestCase):
for ip, host in re.findall(r'^\s*(\d+\.\d+\.\d+\.\d+)\s+([^\s]+)', etc_hosts, re.M)[:10]:
func = get_test(ip, host)
print 'Adding %s' % func.__name__
#print 'Adding %s' % func.__name__
locals()[func.__name__] = func
del func
class TestRemote(TestCase):
class TestGethostbyname(TestCase):
def test(self):
self._test_gethostbyname('gevent.org')
class TestGetaddrinfo(TestCase):
switch_expected = True
def test_www_python_org(self):
self._test('www.python.org')
def test_80(self):
self._test_getaddrinfo('gevent.org', 80)
def test_0(self):
self._test_getaddrinfo('gevent.org', 0)
def test_http(self):
self._test_getaddrinfo('gevent.org', 'http')
def test_notexistent_tld(self):
self._test('myhost.mytld')
self._test_getaddrinfo('myhost.mytld', fail=True)
def test_notexistent_dot_com(self):
self._test('sdfsdfgu5e66098032453245wfdggd.com')
self._test_getaddrinfo('sdfsdfgu5e66098032453245wfdggd.com', fail=True)
def test1(self):
return self._test_getaddrinfo('gevent.org', 52, AF_UNSPEC, SOCK_STREAM, 0, 0)
def test2(self):
return self._test_getaddrinfo('gevent.org', 53, AF_INET, SOCK_DGRAM, 17)
def test3(self):
return self._test_getaddrinfo('google.com', 'http', AF_INET6, fail=None)
class TestInternational(TestCase):
def test(self):
self._test(u'президент.рф')
def test_getaddrinfo(self):
self._test_getaddrinfo(u'президент.рф', 80)
def test_gethostbyname(self):
self.switch_expected = False
self._test_gethostbyname(u'президент.рф')
class TestInterrupted(greentest.GenericWaitTestCase):
def wait(self, timeout):
with gevent.Timeout(timeout, False):
return getaddrinfo('www.gevent.org', 'http')
if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment