Commit edfaf987 authored by Denis Bilenko's avatar Denis Bilenko

add a few tests for new gevent.core

--HG--
rename : greentest/test__core_active_event.py => greentest/test__core_callback.py
parent 5f36dbf0
import sys
from greentest import TestCase, main
from gevent import core
class Test(TestCase):
switch_expected = False
__timeout__ = None
def test_get_version(self):
version = core.get_version()
assert isinstance(version, str), repr(version)
assert version, repr(version)
def test_flags_conversion(self):
if sys.platform != 'win32':
self.assertEqual(core.loop(2, default=False).backend_int, 2)
self.assertEqual(core.loop('select', default=False).backend, 'select')
self.assertEqual(core._flags_to_int(None), 0)
self.assertEqual(core._flags_to_int(['kqueue', 'SELECT']), core.BACKEND_KQUEUE|core.BACKEND_SELECT)
self.assertEqual(core._flags_to_list(core.BACKEND_PORT|core.BACKEND_POLL), ['port', 'poll'])
self.assertRaises(ValueError, core.loop, ['port', 'blabla'])
self.assertRaises(TypeError, core.loop, object())
def test_events_conversion(self):
self.assertEqual(core._events_to_str(core.READ|core.WRITE), 'READ|WRITE')
if __name__ == '__main__':
main()
......@@ -9,16 +9,19 @@ def f():
def main():
active_event = get_hub().reactor.active_event
x = active_event(f)
assert x.pending == 1, x.pending
loop = get_hub().loop
x = loop.callback()
x.start(f)
assert x.active, x.pending
gevent.sleep(0)
assert x.pending == 0, x.pending
assert called == [1], called
x = active_event(f)
x = loop.callback()
x.start(f)
assert x.pending == 1, x.pending
x.cancel()
x.stop()
assert x.pending == 0, x.pending
gevent.sleep(0)
assert called == [1], called
......
import sys
from gevent import core, signal
loop = core.loop()
signal = signal(2, sys.stderr.write, 'INTERRUPT!')
print 'must exit immediatelly...'
loop.run() # must exit immediatelly
print '...and once more...'
loop.run() # repeating does not fail
print '..done'
print 'must exit after 0.5 seconds.'
timer = loop.timer(0.5)
timer.start(lambda: None)
loop.run()
from gevent import core
called = []
def f():
called.append(1)
def main():
loop = core.loop(default=True)
x = loop.timer(0.001)
x.start(f)
assert x.active, x.pending
loop.run()
assert x.pending == 0, x.pending
assert called == [1], called
assert x.callback is None, x.callback
assert x.args is None, x.args
x.stop()
if __name__ == '__main__':
import sys
gettotalrefcount = getattr(sys, 'gettotalrefcount', None)
called[:] = []
if gettotalrefcount is not None:
print gettotalrefcount()
main()
called[:] = []
if gettotalrefcount is not None:
print gettotalrefcount()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment