Commit 38197f23 authored by Denis Bilenko's avatar Denis Bilenko

add server.DatagramServer; modify baseserver.BaseServer in such a way that...

add server.DatagramServer; modify baseserver.BaseServer in such a way that makes it a good base class for both StreamServer and DatagramServer

baseserver.py:
- BaseServer no longer accepts 'backlog' parameter. It is now done by StreamServer.
- BaseServer implements start_accepting() and stop_accepting() methods
- BaseServer now implements "termporarily stop accepting" strategy
- BaseServer now has _do_read method which does everything except for actually calling accept()/recvfrom()
- pre_start() method is renamed to init_socket()
- renamed _stopped_event to _stop_event
- 'started' is now a read-only property (which actually reports state of _stop_event)
- post_stop() method is removed
- close() now sets _stop_event(), thus setting 'started' to False, thus causing serve_forever() to exit
- _tcp_listener() function is moved from baseserver.py to server.py
- added 'fatal_errors' class attribute which is a tuple of all errnos that should kill the server

server.py:
- StreamServer: 'ssl_enabled' is now a read-only property
parent ab9061cb
......@@ -3,62 +3,88 @@
from gevent.greenlet import Greenlet, getfuncname
from gevent.event import Event
from gevent.six import string_types, integer_types
import gevent
import _socket
import sys
import errno
__all__ = ['BaseServer']
DEFAULT_MAX_ACCEPT = 100
class BaseServer(object):
"""An abstract base class that implements some common functionality for the servers in gevent.
*listener* can either be an address that the server should bind on or a :class:`gevent.socket.socket`
instance that is already bound and put into listening mode. In the former case, *backlog* argument specifies the
length of the backlog queue. If not provided, the default (256) is used.
instance that is already bound (and put into listening mode in case of TCP socket).
*spawn*, if provided, is called to create a new greenlet to run the handler. By default, :func:`gevent.spawn` is used.
Possible values for *spawn*:
* a :class:`gevent.pool.Pool` instance -- *handle* will be executed using :meth:`Pool.spawn` method only if the
pool is not full. While it is full, all the connection are dropped;
* :func:`gevent.spawn_raw` -- *handle* will be executed in a raw greenlet which have a little less overhead then
:class:`gevent.Greenlet` instances spawned by default;
* ``None`` -- *handle* will be executed right away, in the :class:`Hub` greenlet. *handle* cannot use any blocking
functions as it means switching to the :class:`Hub`.
* a :class:`gevent.pool.Pool` instance -- *handle* will be executed
using :meth:`Pool.spawn` method only if the pool is not full.
While it is full, all the connection are dropped;
* :func:`gevent.spawn_raw` -- *handle* will be executed in a raw
greenlet which have a little less overhead then :class:`gevent.Greenlet` instances spawned by default;
* ``None`` -- *handle* will be executed right away, in the :class:`Hub` greenlet.
*handle* cannot use any blocking functions as it means switching to the :class:`Hub`.
* an integer -- a shortcut for ``gevent.pool.Pool(integer)``
"""
# the number of seconds to sleep in case there was an error in accept() call
# for consecutive errors the delay will double until it reaches max_delay
# when accept() finally succeeds the delay will be reset to min_delay again
min_delay = 0.01
max_delay = 1
# Sets the maximum number of consecutive accepts that a process may perform on
# a single wake up. High values give higher priority to high connection rates,
# while lower values give higher priority to already established connections.
# Default is 100. Note, that in case of multiple working processes on the same
# listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it
# to 1 when environ["wsgi.multiprocess"] is true)
max_accept = None
_spawn = Greenlet.spawn
# the default backlog to use if none was provided in __init__
backlog = 256
reuse_addr = 1
# the default timeout that we wait for the client connections to close in stop()
stop_timeout = 1
def __init__(self, listener, handle=None, backlog=None, spawn='default'):
self._stopped_event = Event()
self.set_listener(listener, backlog=backlog)
self.set_spawn(spawn)
self.set_handle(handle)
self.started = None
fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)
def __init__(self, listener, handle=None, spawn='default'):
self._stop_event = Event()
self._stop_event.set()
self._watcher = None
self._timer = None
self.pool = None
try:
self.set_listener(listener)
self.set_spawn(spawn)
self.set_handle(handle)
self.delay = self.min_delay
self.loop = gevent.get_hub().loop
if self.max_accept is None:
self.max_accept = DEFAULT_MAX_ACCEPT
if self.max_accept < 1:
raise ValueError('max_accept must be positive int: %r' % (self.max_accept, ))
except:
self.close()
raise
def set_listener(self, listener, backlog=None):
def set_listener(self, listener):
if hasattr(listener, 'accept'):
if hasattr(listener, 'do_handshake'):
raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, ))
if backlog is not None:
raise TypeError('backlog must be None when a socket instance is passed')
self.family = listener.family
self.address = listener.getsockname()
self.socket = listener
else:
if backlog is not None:
self.backlog = backlog
self.family, self.address = parse_address(listener)
def set_spawn(self, spawn):
......@@ -80,13 +106,73 @@ class BaseServer(object):
if self.pool is not None:
self.pool._semaphore.rawlink(self._start_accepting_if_started)
def set_handle(self, handle):
if handle is not None:
self.handle = handle
if hasattr(self, 'handle'):
self._handle = self.handle
else:
raise TypeError("'handle' must be provided")
def _start_accepting_if_started(self, _event=None):
if self.started:
self.start_accepting()
def set_handle(self, handle):
if handle is not None:
self.handle = handle
def start_accepting(self):
if self._watcher is None:
# just stop watcher without creating a new one?
self._watcher = self.loop.io(self.socket.fileno(), 1)
self._watcher.start(self._do_read)
def stop_accepting(self):
if self._watcher is not None:
self._watcher.stop()
self._watcher = None
if self._timer is not None:
self._timer.stop()
self._timer = None
def do_handle(self, *args):
spawn = self._spawn
if spawn is None:
self._handle(*args)
else:
spawn(self._handle, *args)
def _do_read(self):
for _ in xrange(self.max_accept):
if self.full():
self.stop_accepting()
return
try:
args = self.do_read()
self.delay = self.min_delay
if not args:
return
except:
self.loop.handle_error(self, *sys.exc_info())
ex = sys.exc_info()[1]
if self.is_fatal_error(ex):
self.close()
sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
return
if self.delay >= 0:
self.stop_accepting()
self._timer = self.loop.timer(self.delay)
self._timer.start(self._start_accepting_if_started)
self.delay = min(self.max_delay, self.delay * 2)
break
else:
try:
self.do_handle(*args)
except:
self.loop.handle_error((args[1:], self), *sys.exc_info())
if self.delay >= 0:
self.stop_accepting()
self._timer = self.loop.timer(self.delay)
self._timer.start(self._start_accepting_if_started)
self.delay = min(self.max_delay, self.delay * 2)
break
def full(self):
return False
......@@ -135,25 +221,26 @@ class BaseServer(object):
if isinstance(self.address, tuple):
return self.address[1]
def pre_start(self):
def init_socket(self):
"""If the user initialized the server with an address rather than socket,
then this function will create a socket, bind it and put it into listening mode.
It is not supposed to be called by the user, it is called by :meth:`start` before starting
the accept loop."""
if not hasattr(self, 'socket'):
self.socket = _tcp_listener(self.address, backlog=self.backlog, reuse_addr=self.reuse_addr, family=self.family)
self.address = self.socket.getsockname()
self._stopped_event.clear()
pass
@property
def started(self):
return not self._stop_event.is_set()
def start(self):
"""Start accepting the connections.
If an address was provided in the constructor, then also create a socket, bind it and put it into the listening mode.
If an address was provided in the constructor, then also create a socket,
bind it and put it into the listening mode.
"""
assert not self.started, '%s already started' % self.__class__.__name__
self.pre_start()
self.started = True
self.init_socket()
self._stop_event.clear()
try:
self.start_accepting()
except:
......@@ -162,7 +249,7 @@ class BaseServer(object):
def close(self):
"""Close the listener socket and stop accepting."""
self.started = False
self._stop_event.set()
try:
self.stop_accepting()
finally:
......@@ -170,10 +257,14 @@ class BaseServer(object):
self.socket.close()
except Exception:
pass
self.__dict__.pop('socket', None)
self.__dict__.pop('handle', None)
if self.pool is not None:
self.pool._semaphore.unlink(self._start_accepting_if_started)
finally:
self.__dict__.pop('socket', None)
self.__dict__.pop('handle', None)
self.__dict__.pop('_handle', None)
self.__dict__.pop('_spawn', None)
self.__dict__.pop('full', None)
if self.pool is not None:
self.pool._semaphore.unlink(self._start_accepting_if_started)
def stop(self, timeout=None):
"""Stop accepting the connections and close the listening socket.
......@@ -187,10 +278,6 @@ class BaseServer(object):
if self.pool:
self.pool.join(timeout=timeout)
self.pool.kill(block=True, timeout=1)
self.post_stop()
def post_stop(self):
self._stopped_event.set()
def serve_forever(self, stop_timeout=None):
"""Start the server if it hasn't been already started and wait until it's stopped."""
......@@ -198,32 +285,12 @@ class BaseServer(object):
if not self.started:
self.start()
try:
self._stopped_event.wait()
except:
self.stop(timeout=stop_timeout)
raise
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state.
self._stop_event.wait()
finally:
gevent.spawn(self.stop, timeout=stop_timeout).join()
The difference from :meth:`gevent.socket.tcp_listener` is that this function returns
an unwrapped :class:`_socket.socket` instance.
"""
sock = _socket.socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def is_fatal_error(self, ex):
return isinstance(ex, _socket.error) and ex[0] in self.fatal_errors
def _extract_family(host):
......
......@@ -565,8 +565,8 @@ class WSGIServer(StreamServer):
def get_environ(self):
return self.environ.copy()
def pre_start(self):
StreamServer.pre_start(self)
def init_socket(self):
StreamServer.init_socket(self)
self.update_environ()
def update_environ(self):
......
# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
"""TCP/SSL server"""
import sys
import errno
from gevent import socket
import _socket
from gevent.baseserver import BaseServer
from gevent.hub import get_hub
from gevent.socket import EWOULDBLOCK
from gevent.socket import EWOULDBLOCK, socket
__all__ = ['StreamServer']
DEFAULT_MAX_ACCEPT = 100
__all__ = ['StreamServer', 'DatagramServer']
class StreamServer(BaseServer):
......@@ -37,103 +32,127 @@ class StreamServer(BaseServer):
The delay starts with :attr:`min_delay` and doubles with each successive error until it reaches :attr:`max_delay`.
A successful :func:`accept` resets the delay to :attr:`min_delay` again.
"""
# Sets the maximum number of consecutive accepts that a process may perform on
# a single wake up. High values give higher priority to high connection rates,
# while lower values give higher priority to already established connections.
# Default is 100. Note, that in case of multiple working processes on the same
# listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it
# to 1 when environ["wsgi.multiprocess"] is true)
max_accept = None
# the number of seconds to sleep in case there was an error in accept() call
# for consecutive errors the delay will double until it reaches max_delay
# when accept() finally succeeds the delay will be reset to min_delay again
min_delay = 0.01
max_delay = 1
# the default backlog to use if none was provided in __init__
backlog = 256
def __init__(self, listener, handle=None, backlog=None, spawn='default', **ssl_args):
if ssl_args:
ssl_args.setdefault('server_side', True)
from gevent.ssl import wrap_socket
self.wrap_socket = wrap_socket
self.ssl_args = ssl_args
self.ssl_enabled = True
else:
self.ssl_enabled = False
BaseServer.__init__(self, listener, handle=handle, backlog=backlog, spawn=spawn)
self.delay = self.min_delay
self._accept_event = None
self._start_accepting_timer = None
self.loop = get_hub().loop
def set_listener(self, listener, backlog=None):
BaseServer.set_listener(self, listener, backlog=backlog)
BaseServer.__init__(self, listener, handle=handle, spawn=spawn)
try:
if ssl_args:
ssl_args.setdefault('server_side', True)
from gevent.ssl import wrap_socket
self.wrap_socket = wrap_socket
self.ssl_args = ssl_args
else:
self.ssl_args = None
if backlog is not None:
if hasattr(self, 'socket'):
raise TypeError('backlog must be None when a socket instance is passed')
self.backlog = backlog
except:
self.close()
raise
@property
def ssl_enabled(self):
return self.ssl_args is not None
def set_listener(self, listener):
BaseServer.set_listener(self, listener)
try:
self.socket = self.socket._sock
except AttributeError:
pass
def pre_start(self):
BaseServer.pre_start(self)
# make SSL work:
if self.ssl_enabled:
def init_socket(self):
if not hasattr(self, 'socket'):
self.socket = _tcp_listener(self.address, backlog=self.backlog,
reuse_addr=self.reuse_addr, family=self.family)
self.address = self.socket.getsockname()
if self.ssl_args:
self._handle = self.wrap_socket_and_handle
else:
self._handle = self.handle
def start_accepting(self):
if self._accept_event is None:
if self.max_accept is None:
self.max_accept = DEFAULT_MAX_ACCEPT
self._accept_event = self.loop.io(self.socket.fileno(), 1)
self._accept_event.start(self._do_accept)
def stop_accepting(self):
if self._accept_event is not None:
self._accept_event.stop()
self._accept_event = None
if self._start_accepting_timer is not None:
self._start_accepting_timer.stop()
self._start_accepting_timer = None
def _do_accept(self):
for _ in xrange(self.max_accept):
address = None
try:
if self.full():
self.stop_accepting()
return
try:
client_socket, address = self.socket.accept()
except socket.error, err:
if err[0] == EWOULDBLOCK:
return
raise
self.delay = self.min_delay
client_socket = socket.socket(_sock=client_socket)
spawn = self._spawn
if spawn is None:
self._handle(client_socket, address)
else:
spawn(self._handle, client_socket, address)
except:
self.loop.handle_error((address, self), *sys.exc_info())
ex = sys.exc_info()[1]
if self.is_fatal_error(ex):
self.close()
sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
return
if self.delay >= 0:
self.stop_accepting()
self._start_accepting_timer = self.loop.timer(self.delay)
self._start_accepting_timer.start(self._start_accepting_if_started)
self.delay = min(self.max_delay, self.delay * 2)
break
def is_fatal_error(self, ex):
return isinstance(ex, socket.error) and ex[0] in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)
def do_read(self):
try:
client_socket, address = self.socket.accept()
except _socket.error, err:
if err[0] == EWOULDBLOCK:
return
raise
return socket(_sock=client_socket), address
def wrap_socket_and_handle(self, client_socket, address):
# used in case of ssl sockets
ssl_socket = self.wrap_socket(client_socket, **self.ssl_args)
return self.handle(ssl_socket, address)
class DatagramServer(BaseServer):
"""A UDP server"""
def __init__(self, *args, **kwargs):
BaseServer.__init__(self, *args, **kwargs)
from gevent.coros import Semaphore
self._writelock = Semaphore()
def init_socket(self):
if not hasattr(self, 'socket'):
self.socket = _udp_socket(self.address, reuse_addr=self.reuse_addr, family=self.family)
self.address = self.socket.getsockname()
self._socket = self.socket
try:
self._socket = self._socket._sock
except AttributeError:
pass
def do_read(self):
try:
data, address = self._socket.recvfrom(8192)
except _socket.error, err:
if err[0] == EWOULDBLOCK:
return
raise
return data, address
def sendto(self, *args):
self._writelock.acquire()
try:
self.socket.sendto(*args)
finally:
self._writelock.release()
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
......@@ -191,14 +191,12 @@ class TestDefaultSpawn(TestCase):
self.server.start_accepting()
self.report_netstat('after start_accepting')
self.assertRequestSucceeded()
else:
self.assertRaises(Exception, self.server.start) # XXX which exception exactly?
self.stop_server()
self.report_netstat('after stop')
def test_backlog_is_not_accepted_for_socket(self):
self.switch_expected = False
self.assertRaises(TypeError, self.ServerClass, self.get_listener(), backlog=25)
self.assertRaises(TypeError, self.ServerClass, self.get_listener(), backlog=25, handle=False)
def test_backlog_is_accepted_for_address(self):
self.server = self.ServerSubClass(('127.0.0.1', 0), backlog=25)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment