Commit a19130e1 authored by Vincent Pelletier's avatar Vincent Pelletier

Implement message queue. This change makes poll handle at most one packet per...

Implement message queue. This change makes poll handle at most one packet per call, instead of handling all pending packets at once.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@697 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 295a2fa8
......@@ -106,6 +106,9 @@ class BaseConnection(object):
def isListeningConnection(self):
raise NotImplementedError
def hasPendingMessages(self):
return False
class ListeningConnection(BaseConnection):
"""A listen connection."""
def __init__(self, event_manager, handler, addr = None,
......@@ -141,6 +144,7 @@ class Connection(BaseConnection):
self.event_dict = {}
self.aborted = False
self.uuid = None
self._queue = []
BaseConnection.__init__(self, event_manager, handler,
connector = connector, addr = addr,
connector_handler = connector_handler)
......@@ -232,10 +236,34 @@ class Connection(BaseConnection):
packet.getType(), dump(self.uuid), *self.getAddress())
try:
self.handler.packetReceived(self, packet)
self._queue.append(packet)
finally:
self.read_buf = self.read_buf[len(packet):]
def hasPendingMessages(self):
"""
Returns True if there are messages queued and awaiting processing.
"""
return len(self._queue) != 0
def _enqueue(self, packet):
"""
Enqueue a parsed packet for future processing.
"""
self._queue.append(packet)
def _dequeue(self):
"""
Dequeue a packet for processing.
"""
return self._queue.pop(0)
def process(self):
"""
Process a pending packet.
"""
self.handler.packetReceived(self, self._dequeue())
def pending(self):
return self.connector is not None and self.write_buf
......
......@@ -82,6 +82,7 @@ class SelectEventManager(object):
self.exc_list = []
self.event_list = []
self.prev_time = time()
self._pending_processing = []
def getConnectionList(self, with_admin_nodes=False):
return self.connection_dict.values()
......@@ -92,6 +93,30 @@ class SelectEventManager(object):
def unregister(self, conn):
del self.connection_dict[conn.getConnector()]
def _getPendingConnection(self):
if len(self._pending_processing):
result = self._pending_processing.pop(0)
else:
result = None
return result
def _addPendingConnection(self, conn):
self._pending_processing.append(conn)
def poll(self, timeout = 1):
to_process = self._getPendingConnection()
if to_process is None:
# Fetch messages from polled file descriptors
self._poll(timeout=timeout)
# See if there is anything to process
to_process = self._getPendingConnection()
if to_process is not None:
# Process
to_process.process()
# ...and requeue if there are pending messages
if to_process.hasPendingMessages():
self._addPendingConnection(to_process)
def poll(self, timeout = 1):
rlist, wlist, xlist = select(self.reader_set, self.writer_set, self.exc_list,
timeout)
......@@ -102,6 +127,8 @@ class SelectEventManager(object):
conn.readable()
finally:
conn.unlock()
if conn.hasPendingMessages():
self._addPendingConnection(conn)
for s in wlist:
# This can fail, if a connection is closed in readable().
......@@ -166,6 +193,7 @@ class EpollEventManager(object):
self.event_list = []
self.prev_time = time()
self.epoll = Epoll()
self._pending_processing = []
def getConnectionList(self):
return self.connection_dict.values()
......@@ -191,7 +219,31 @@ class EpollEventManager(object):
self.epoll.unregister(fd)
del self.connection_dict[fd]
def _getPendingConnection(self):
if len(self._pending_processing):
result = self._pending_processing.pop(0)
else:
result = None
return result
def _addPendingConnection(self, conn):
self._pending_processing.append(conn)
def poll(self, timeout = 1):
to_process = self._getPendingConnection()
if to_process is None:
# Fetch messages from polled file descriptors
self._poll(timeout=timeout)
# See if there is anything to process
to_process = self._getPendingConnection()
if to_process is not None:
# Process
to_process.process()
# ...and requeue if there are pending messages
if to_process.hasPendingMessages():
self._addPendingConnection(to_process)
def _poll(self, timeout = 1):
rlist, wlist = self.epoll.poll(timeout)
for fd in rlist:
try:
......@@ -205,6 +257,8 @@ class EpollEventManager(object):
conn.readable()
finally:
conn.unlock()
if conn.hasPendingMessages():
self._addPendingConnection(conn)
for fd in wlist:
# This can fail, if a connection is closed in readable().
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment