Commit 135c33f0 authored by Denis Bilenko's avatar Denis Bilenko

test__greenlet.py: add TestBasic

parent fddad38c
...@@ -516,28 +516,154 @@ class TestJoinAll(greentest.GenericWaitTestCase): ...@@ -516,28 +516,154 @@ class TestJoinAll(greentest.GenericWaitTestCase):
gevent.joinall([gevent.spawn(gevent.sleep, 10)], timeout=timeout) gevent.joinall([gevent.spawn(gevent.sleep, 10)], timeout=timeout)
class TestKill(greentest.TestCase): class TestBasic(greentest.TestCase):
def test_simple_exit(self):
link_test = []
def func(delay, return_value=4):
gevent.sleep(delay)
return return_value
g = gevent.Greenlet(func, 0.01, return_value=5)
g.link(lambda x: link_test.append(x))
assert not g
assert not g.dead
assert not g.started
assert not g.ready()
assert not g.successful()
assert g.value is None
assert g.exception is None
g.start()
assert not g
assert not g.dead
assert g.started # changed
assert not g.ready()
assert not g.successful()
assert g.value is None
assert g.exception is None
def test_kill_running(self):
g = gevent.spawn(gevent.sleep, 10)
gevent.sleep(0.001) gevent.sleep(0.001)
error = ExpectedError('hello world') assert g # changed
g.kill(error) assert not g.dead
gevent.sleep(0.01) assert g.started
assert g.dead, g.dead assert not g.ready()
assert not g.successful()
assert g.value is None
assert g.exception is None
assert not link_test
gevent.sleep(0.02)
assert not g
assert g.dead
assert not g.started
assert g.ready()
assert g.successful()
assert g.value == 5
assert g.exception is None # not changed
assert link_test == [g] # changed
def test_error_exit(self):
link_test = []
def func(delay, return_value=4):
gevent.sleep(delay)
error = ExpectedError('test_error_exit')
error.myattr = return_value
raise error
g = gevent.Greenlet(func, 0.01, return_value=5)
g.link(lambda x: link_test.append(x))
g.start()
gevent.sleep(0.02)
assert not g
assert g.dead
assert not g.started
assert g.ready() assert g.ready()
assert g.exception is error, (g.exception, error) assert not g.successful()
assert g.value is None # not changed
assert g.exception.myattr == 5
assert link_test == [g], link_test
def _assertKilled(self, g):
assert not g
assert g.dead
assert not g.started
assert g.ready()
assert g.successful(), (repr(g), g.value, g.exception)
assert isinstance(g.value, gevent.GreenletExit), (repr(g), g.value, g.exception)
assert g.exception is None
#def test_kill_not_started(self): # currently kill() does nothing if Greenlet is not yet started def assertKilled(self, g):
self._assertKilled(g)
gevent.sleep(0.01)
self._assertKilled(g)
def test_kill_started_later(self): def _test_kill(self, g, block):
g = gevent.spawn_later(0.01, gevent.sleep, 10) g.kill(block=block)
error = AssertionError('this must must not be raised') if not block:
g.kill(error) gevent.sleep(0.01)
gevent.sleep(0.1) self.assertKilled(g)
assert g.dead, g # kill second time must not hurt
assert g.ready(), g g.kill(block=block)
assert g.exception is error, (g.exception, error) self.assertKilled(g)
def _test_kill_not_started(self, block):
link_test = []
result = []
g = gevent.Greenlet(lambda : result.append(1))
g.link(lambda x: link_test.append(x))
self._test_kill(g, block=block)
assert not result
assert link_test == [g]
def test_kill_not_started_block(self):
self._test_kill_not_started(block=True)
def test_kill_not_started_noblock(self):
self._test_kill_not_started(block=False)
def _test_kill_just_started(self, block):
result = []
link_test = []
g = gevent.Greenlet(lambda : result.append(1))
g.link(lambda x: link_test.append(x))
g.start()
self._test_kill(g, block=block)
assert not result, result
assert link_test == [g]
def test_kill_just_started_block(self):
self._test_kill_just_started(block=True)
def test_kill_just_started_noblock(self):
self._test_kill_just_started(block=False)
def _test_kill_just_started_later(self, block):
result = []
link_test = []
g = gevent.Greenlet(lambda : result.append(1))
g.link(lambda x: link_test.append(x))
g.start_later(1)
self._test_kill(g, block=block)
assert not result
def test_kill_just_started_later_block(self):
self._test_kill_just_started_later(block=True)
def test_kill_just_started_later_noblock(self):
self._test_kill_just_started_later(block=False)
def _test_kill_running(self, block):
link_test = []
g = gevent.spawn(gevent.sleep, 10)
g.link(lambda x: link_test.append(x))
gevent.sleep(0.01)
self._test_kill(g, block=block)
assert link_test == [g]
def test_kill_running_block(self):
self._test_kill_running(block=True)
def test_kill_running_noblock(self):
self._test_kill_running(block=False)
def assert_ready(g): def assert_ready(g):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment