Commit eedd8eda authored by Jason Madden's avatar Jason Madden

test__core works for libev

parent 0c828704
...@@ -17,6 +17,20 @@ __all__ = [ ...@@ -17,6 +17,20 @@ __all__ = [
] ]
def events_to_str(event_field, all_events):
result = []
for (flag, string) in all_events:
c_flag = flag
if event_field & c_flag:
result.append(string)
event_field = event_field & (~c_flag)
if not event_field:
break
if event_field:
result.append(hex(event_field))
return '|'.join(result)
def not_while_active(func): def not_while_active(func):
@functools.wraps(func) @functools.wraps(func)
def nw(self, *args, **kwargs): def nw(self, *args, **kwargs):
......
...@@ -80,11 +80,8 @@ SIGNALFD = libev.EVFLAG_SIGNALFD ...@@ -80,11 +80,8 @@ SIGNALFD = libev.EVFLAG_SIGNALFD
NOSIGMASK = libev.EVFLAG_NOSIGMASK NOSIGMASK = libev.EVFLAG_NOSIGMASK
class _EVENTSType(object): from gevent._ffi.loop import EVENTS
def __repr__(self): GEVENT_CORE_EVENTS = EVENTS
return 'gevent.core.EVENTS'
EVENTS = GEVENT_CORE_EVENTS = _EVENTSType()
def get_version(): def get_version():
......
...@@ -46,21 +46,11 @@ _events = [(libev.EV_READ, 'READ'), ...@@ -46,21 +46,11 @@ _events = [(libev.EV_READ, 'READ'),
(libev.EV_CUSTOM, 'CUSTOM'), (libev.EV_CUSTOM, 'CUSTOM'),
(libev.EV_ERROR, 'ERROR')] (libev.EV_ERROR, 'ERROR')]
def _events_to_str(events): from gevent._ffi import watcher as _base
result = []
for (flag, string) in _events:
c_flag = flag
if events & c_flag:
result.append(string)
events = events & (~c_flag)
if not events:
break
if events:
result.append(hex(events))
return '|'.join(result)
def _events_to_str(events):
return _base.events_to_str(events, _events)
from gevent._ffi import watcher as _base
class watcher(_base.watcher): class watcher(_base.watcher):
......
...@@ -5,6 +5,9 @@ ...@@ -5,6 +5,9 @@
#define UV_EBUSY ... #define UV_EBUSY ...
#define UV_VERSION_MAJOR ...
#define UV_VERSION_MINOR ...
#define UV_VERSION_PATCH ...
typedef enum { typedef enum {
UV_RUN_DEFAULT = 0, UV_RUN_DEFAULT = 0,
...@@ -52,6 +55,7 @@ enum uv_fs_event_flags { ...@@ -52,6 +55,7 @@ enum uv_fs_event_flags {
const char* uv_strerror(int); const char* uv_strerror(int);
const char* uv_err_name(int); const char* uv_err_name(int);
const char* uv_version_string(void);
// handle structs and types // handle structs and types
struct uv_loop_s { struct uv_loop_s {
......
...@@ -22,8 +22,29 @@ __all__ = [ ...@@ -22,8 +22,29 @@ __all__ = [
_callbacks = assign_standard_callbacks(ffi, libuv) _callbacks = assign_standard_callbacks(ffi, libuv)
from gevent._ffi.loop import EVENTS
GEVENT_CORE_EVENTS = EVENTS # export
from gevent.libuv import watcher as _watchers from gevent.libuv import watcher as _watchers
_events_to_str = _watchers._events_to_str # export
READ = libuv.UV_READABLE
WRITE = libuv.UV_WRITABLE
def get_version():
uv_bytes = ffi.string(libuv.uv_version_string())
if not isinstance(uv_bytes, str):
# Py3
uv_str = uv_bytes.decode("ascii")
else:
uv_str = uv_bytes
return 'libuv-' + uv_str
def get_header_version():
return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH)
class loop(AbstractLoop): class loop(AbstractLoop):
error_handler = None error_handler = None
......
...@@ -21,6 +21,12 @@ def _uv_close_callback(handle): ...@@ -21,6 +21,12 @@ def _uv_close_callback(handle):
def _dbg(*args, **kwargs): def _dbg(*args, **kwargs):
pass pass
_events = [(libuv.UV_READABLE, "READ"),
(libuv.UV_WRITABLE, "WRITE")]
def _events_to_str(events): # export
return _base.events_to_str(events, _events)
class watcher(_base.watcher): class watcher(_base.watcher):
_FFI = ffi _FFI = ffi
_LIB = libuv _LIB = libuv
......
import sys import sys
from greentest import TestCase, main from greentest import TestCase, main
from gevent import core from gevent import core
import unittest
class Test(TestCase): class Test(TestCase):
...@@ -16,6 +17,8 @@ class Test(TestCase): ...@@ -16,6 +17,8 @@ class Test(TestCase):
assert header_version, repr(header_version) assert header_version, repr(header_version)
self.assertEqual(version, header_version) self.assertEqual(version, header_version)
@unittest.skipIf(hasattr(core, 'libuv'),
"flags are libev-only")
def test_flags_conversion(self): def test_flags_conversion(self):
if sys.platform != 'win32': if sys.platform != 'win32':
self.assertEqual(core.loop(2, default=False).backend_int, 2) self.assertEqual(core.loop(2, default=False).backend_int, 2)
...@@ -41,7 +44,9 @@ class Test(TestCase): ...@@ -41,7 +44,9 @@ class Test(TestCase):
Error = ValueError Error = ValueError
win32 = False win32 = False
self.assertRaises(Error, core.loop().io, -1, 1) self.assertRaises(Error, core.loop().io, -1, 1)
self.assertRaises(ValueError, core.loop().io, 1, core.TIMER) if hasattr(core, 'TIMER'):
# libev
self.assertRaises(ValueError, core.loop().io, 1, core.TIMER)
# Test we can set events and io before it's started # Test we can set events and io before it's started
if not win32: if not win32:
# We can't do this with arbitrary FDs on windows; # We can't do this with arbitrary FDs on windows;
...@@ -50,7 +55,11 @@ class Test(TestCase): ...@@ -50,7 +55,11 @@ class Test(TestCase):
io.fd = 2 io.fd = 2
self.assertEqual(io.fd, 2) self.assertEqual(io.fd, 2)
io.events = core.WRITE io.events = core.WRITE
self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET') if not hasattr(core, 'libuv'):
# libev
self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET')
else:
self.assertEqual(core._events_to_str(io.events), 'WRITE')
def test_timer(self): def test_timer(self):
self.assertRaises(ValueError, core.loop().timer, 1, -1) self.assertRaises(ValueError, core.loop().timer, 1, -1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment