Commit 57481c35 authored by Julien Muchembled's avatar Julien Muchembled

Review API betweeen connections and connectors

- Review error handling. Only 2 exceptions remain in connector.py:

  - Drop useless exception handling for EAGAIN since it should not happen
    if the kernel says the socket is ready.
  - Do not distinguish other socket errors. Just close and log in a generic way.
  - No need to raise a specific exception for EOF.
  - Make 'connect' return a boolean instead of raising an exception.
  - Raise appropriate exception when answer/ask/notify is called on a closed
    non-MT connection.

- Add support for more complex connectors, which may need to write for a read
  operation, or to read when there's pending data to send. This will be
  required for SSL support (more exactly, the handshake will be done in
  a transparent way):

  - Move write buffer to connector.
  - Make 'receive' fill the read buffer, instead of returning the read data.
  - Make 'receive' & 'send' return a boolean to switch polling for writing.
  - Tolerate that sockets return 0 as number of bytes sent.

- In testConnection, simply delete all failing tests, as announced
  in commit 71e30fb9.
parent 36a32f23
...@@ -18,9 +18,7 @@ from functools import wraps ...@@ -18,9 +18,7 @@ from functools import wraps
from time import time from time import time
from . import attributeTracker, logging from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorTryAgainException, \ from .connector import ConnectorException, ConnectorDelayedConnection
ConnectorInProgressException, ConnectorConnectionRefusedException, \
ConnectorConnectionClosedException, ConnectorDelayedConnection
from .locking import RLock from .locking import RLock
from .protocol import uuid_str, Errors, \ from .protocol import uuid_str, Errors, \
PacketMalformedError, Packets, ParserState PacketMalformedError, Packets, ParserState
...@@ -31,14 +29,6 @@ CRITICAL_TIMEOUT = 30 ...@@ -31,14 +29,6 @@ CRITICAL_TIMEOUT = 30
class ConnectionClosed(Exception): class ConnectionClosed(Exception):
pass pass
def not_closed(func):
def decorator(self, *args, **kw):
if self.connector is None:
raise ConnectorConnectionClosedException
return func(self, *args, **kw)
return wraps(func)(decorator)
class HandlerSwitcher(object): class HandlerSwitcher(object):
_is_handling = False _is_handling = False
_next_timeout = None _next_timeout = None
...@@ -316,14 +306,11 @@ class ListeningConnection(BaseConnection): ...@@ -316,14 +306,11 @@ class ListeningConnection(BaseConnection):
self.em.register(self) self.em.register(self)
def readable(self): def readable(self):
try:
connector, addr = self.connector.accept() connector, addr = self.connector.accept()
logging.debug('accepted a connection from %s:%d', *addr) logging.debug('accepted a connection from %s:%d', *addr)
handler = self.getHandler() handler = self.getHandler()
new_conn = ServerConnection(self.em, handler, connector, addr) new_conn = ServerConnection(self.em, handler, connector, addr)
handler.connectionAccepted(new_conn) handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
pass
def getAddress(self): def getAddress(self):
return self.connector.getAddress() return self.connector.getAddress()
...@@ -347,7 +334,6 @@ class Connection(BaseConnection): ...@@ -347,7 +334,6 @@ class Connection(BaseConnection):
def __init__(self, event_manager, *args, **kw): def __init__(self, event_manager, *args, **kw):
BaseConnection.__init__(self, event_manager, *args, **kw) BaseConnection.__init__(self, event_manager, *args, **kw)
self.read_buf = ReadBuffer() self.read_buf = ReadBuffer()
self.write_buf = []
self.cur_id = 0 self.cur_id = 0
self.aborted = False self.aborted = False
self.uuid = None self.uuid = None
...@@ -444,39 +430,47 @@ class Connection(BaseConnection): ...@@ -444,39 +430,47 @@ class Connection(BaseConnection):
"""Abort dealing with this connection.""" """Abort dealing with this connection."""
logging.debug('aborting a connector for %r', self) logging.debug('aborting a connector for %r', self)
self.aborted = True self.aborted = True
assert self.write_buf assert self.pending()
if self._on_close is not None: if self._on_close is not None:
self._on_close() self._on_close()
self._on_close = None self._on_close = None
def writable(self): def writable(self):
"""Called when self is writable.""" """Called when self is writable."""
self._send() try:
if not self.write_buf and self.connector is not None: if self.connector.send():
if self.aborted: if self.aborted:
self.close() self.close()
else: else:
self.em.removeWriter(self) self.em.removeWriter(self)
except ConnectorException:
self._closure()
def readable(self): def readable(self):
"""Called when self is readable.""" """Called when self is readable."""
self._recv() # last known remote activity
self._analyse() self._next_timeout = time() + self._timeout
if self.aborted: read_buf = self.read_buf
self.em.removeReader(self)
return not not self._queue
def _analyse(self):
"""Analyse received data."""
try: try:
while True: try:
packet = Packets.parse(self.read_buf, self._parser_state) if self.connector.receive(read_buf):
self.em.addWriter(self)
finally:
# A connector may read some data
# before raising ConnectorException
while 1:
packet = Packets.parse(read_buf, self._parser_state)
if packet is None: if packet is None:
break break
self._queue.append(packet) self._queue.append(packet)
except ConnectorException:
self._closure()
except PacketMalformedError, e: except PacketMalformedError, e:
logging.error('malformed packet from %r: %s', self, e) logging.error('malformed packet from %r: %s', self, e)
self._closure() self._closure()
if self.aborted:
self.em.removeReader(self)
return not not self._queue
def hasPendingMessages(self): def hasPendingMessages(self):
""" """
...@@ -493,7 +487,8 @@ class Connection(BaseConnection): ...@@ -493,7 +487,8 @@ class Connection(BaseConnection):
self.updateTimeout() self.updateTimeout()
def pending(self): def pending(self):
return self.connector is not None and self.write_buf connector = self.connector
return connector is not None and connector.queued
@property @property
def setReconnectionNoDelay(self): def setReconnectionNoDelay(self):
...@@ -503,7 +498,6 @@ class Connection(BaseConnection): ...@@ -503,7 +498,6 @@ class Connection(BaseConnection):
if self.connector is None: if self.connector is None:
assert self._on_close is None assert self._on_close is None
assert not self.read_buf assert not self.read_buf
assert not self.write_buf
assert not self.isPending() assert not self.isPending()
return return
# process the network events with the last registered handler to # process the network events with the last registered handler to
...@@ -514,7 +508,6 @@ class Connection(BaseConnection): ...@@ -514,7 +508,6 @@ class Connection(BaseConnection):
if self._on_close is not None: if self._on_close is not None:
self._on_close() self._on_close()
self._on_close = None self._on_close = None
del self.write_buf[:]
self.read_buf.clear() self.read_buf.clear()
try: try:
if self.connecting: if self.connecting:
...@@ -531,89 +524,28 @@ class Connection(BaseConnection): ...@@ -531,89 +524,28 @@ class Connection(BaseConnection):
self._handlers.handle(self, self._queue.pop(0)) self._handlers.handle(self, self._queue.pop(0))
self.close() self.close()
def _recv(self):
"""Receive data from a connector."""
try:
data = self.connector.receive()
except ConnectorTryAgainException:
pass
except ConnectorConnectionRefusedException:
assert self.connecting
self._closure()
except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false
logging.debug('Connection reset by peer: %r', self.connector)
self._closure()
except:
logging.debug('Unknown connection error: %r', self.connector)
self._closure()
# unhandled connector exception
raise
else:
if not data:
logging.debug('Connection %r closed in recv', self.connector)
self._closure()
return
# last known remote activity
self._next_timeout = time() + self._timeout
self.read_buf.append(data)
def _send(self):
"""Send data to a connector."""
if not self.write_buf:
return
msg = ''.join(self.write_buf)
try:
n = self.connector.send(msg)
except ConnectorTryAgainException:
pass
except ConnectorConnectionClosedException:
# connection resetted by peer
logging.debug('Connection reset by peer: %r', self.connector)
self._closure()
except:
logging.debug('Unknown connection error: %r', self.connector)
# unhandled connector exception
self._closure()
raise
else:
if not n:
logging.debug('Connection %r closed in send', self.connector)
self._closure()
return
if n == len(msg):
del self.write_buf[:]
else:
self.write_buf = [msg[n:]]
def _addPacket(self, packet): def _addPacket(self, packet):
"""Add a packet into the write buffer.""" """Add a packet into the write buffer."""
if self.connector is None: if self.connector.queue(packet.encode()):
return
was_empty = not self.write_buf
self.write_buf.extend(packet.encode())
if was_empty:
# enable polling for writing. # enable polling for writing.
self.em.addWriter(self) self.em.addWriter(self)
logging.packet(self, packet, True) logging.packet(self, packet, True)
@not_closed
def notify(self, packet): def notify(self, packet):
""" Then a packet with a new ID """ """ Then a packet with a new ID """
if self.isClosed():
raise ConnectionClosed
msg_id = self._getNextId() msg_id = self._getNextId()
packet.setId(msg_id) packet.setId(msg_id)
self._addPacket(packet) self._addPacket(packet)
return msg_id return msg_id
@not_closed
def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, **kw): def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, **kw):
""" """
Send a packet with a new ID and register the expectation of an answer Send a packet with a new ID and register the expectation of an answer
""" """
if self.isClosed():
raise ConnectionClosed
msg_id = self._getNextId() msg_id = self._getNextId()
packet.setId(msg_id) packet.setId(msg_id)
self._addPacket(packet) self._addPacket(packet)
...@@ -627,9 +559,10 @@ class Connection(BaseConnection): ...@@ -627,9 +559,10 @@ class Connection(BaseConnection):
self.em.wakeup() self.em.wakeup()
return msg_id return msg_id
@not_closed
def answer(self, packet, msg_id=None): def answer(self, packet, msg_id=None):
""" Answer to a packet by re-using its ID for the packet answer """ """ Answer to a packet by re-using its ID for the packet answer """
if self.isClosed():
raise ConnectionClosed
if msg_id is None: if msg_id is None:
msg_id = self.getPeerId() msg_id = self.getPeerId()
packet.setId(msg_id) packet.setId(msg_id)
...@@ -656,32 +589,25 @@ class ClientConnection(Connection): ...@@ -656,32 +589,25 @@ class ClientConnection(Connection):
def _connect(self): def _connect(self):
try: try:
self.connector.makeClientConnection() connected = self.connector.makeClientConnection()
except ConnectorInProgressException:
self.em.register(self)
self.em.addWriter(self)
except ConnectorDelayedConnection, c: except ConnectorDelayedConnection, c:
connect_limit, = c.args connect_limit, = c.args
self.getTimeout = lambda: connect_limit self.getTimeout = lambda: connect_limit
self.onTimeout = self._delayedConnect self.onTimeout = self._delayedConnect
self.em.register(self, timeout_only=True) self.em.register(self, timeout_only=True)
# Fake _addPacket so that if does not
# try to reenable polling for writing.
self.write_buf.insert(0, '')
except ConnectorConnectionRefusedException:
self._closure()
except ConnectorException: except ConnectorException:
# unhandled connector exception
self._closure() self._closure()
raise
else: else:
self.em.register(self) self.em.register(self)
if self.write_buf: if connected:
self.em.addWriter(self)
self._connectionCompleted() self._connectionCompleted()
# A client connection usually has a pending packet to send
# from the beginning. It would be too smart to detect when
# it's not required to poll for writing.
self.em.addWriter(self)
def _delayedConnect(self): def _delayedConnect(self):
del self.getTimeout, self.onTimeout, self.write_buf[0] del self.getTimeout, self.onTimeout
self._connect() self._connect()
def writable(self): def writable(self):
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import socket import socket
import errno import errno
from time import time from time import time
from . import logging
# Global connector registry. # Global connector registry.
# Fill by calling registerConnectorHandler. # Fill by calling registerConnectorHandler.
...@@ -56,8 +57,19 @@ class SocketConnector(object): ...@@ -56,8 +57,19 @@ class SocketConnector(object):
s.setblocking(0) s.setblocking(0)
# disable Nagle algorithm to reduce latency # disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.queued = []
return self return self
def queue(self, data):
was_empty = not self.queued
self.queued += data
return was_empty
def _error(self, op, exc):
logging.debug("%s failed for %s: %s (%s)",
op, self, errno.errorcode[exc.errno], exc.strerror)
raise ConnectorException
# Threaded tests monkey-patch the following 2 operations. # Threaded tests monkey-patch the following 2 operations.
_connect = lambda self, addr: self.socket.connect(addr) _connect = lambda self, addr: self.socket.connect(addr)
_bind = lambda self, addr: self.socket.bind(addr) _bind = lambda self, addr: self.socket.bind(addr)
...@@ -68,20 +80,23 @@ class SocketConnector(object): ...@@ -68,20 +80,23 @@ class SocketConnector(object):
try: try:
connect_limit = self.connect_limit[addr] connect_limit = self.connect_limit[addr]
if time() < connect_limit: if time() < connect_limit:
# Next call to queue() must return False
# in order not to enable polling for writing.
self.queued or self.queued.append('')
raise ConnectorDelayedConnection(connect_limit) raise ConnectorDelayedConnection(connect_limit)
if self.queued and not self.queued[0]:
del self.queued[0]
except KeyError: except KeyError:
pass pass
self.connect_limit[addr] = time() + self.CONNECT_LIMIT self.connect_limit[addr] = time() + self.CONNECT_LIMIT
self.is_server = self.is_closed = False self.is_server = self.is_closed = False
try: try:
self._connect(addr) self._connect(addr)
except socket.error, (err, errmsg): except socket.error, e:
if err == errno.EINPROGRESS: if e.errno == errno.EINPROGRESS:
raise ConnectorInProgressException return False
if err == errno.ECONNREFUSED: self._error('connect', e)
raise ConnectorConnectionRefusedException return True
raise ConnectorException, 'makeClientConnection to %s failed:' \
' %s:%s' % (addr, err, errmsg)
def makeListeningConnection(self): def makeListeningConnection(self):
assert self.is_closed is None assert self.is_closed is None
...@@ -90,10 +105,9 @@ class SocketConnector(object): ...@@ -90,10 +105,9 @@ class SocketConnector(object):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._bind(self.addr) self._bind(self.addr)
self.socket.listen(5) self.socket.listen(5)
except socket.error, (err, errmsg): except socket.error, e:
self.socket.close() self.socket.close()
raise ConnectorException, 'makeListeningConnection on %s failed:' \ self._error('listen', e)
' %s:%s' % (addr, err, errmsg)
def getError(self): def getError(self):
return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
...@@ -116,33 +130,39 @@ class SocketConnector(object): ...@@ -116,33 +130,39 @@ class SocketConnector(object):
s, addr = self.socket.accept() s, addr = self.socket.accept()
s = self.__class__(addr, s) s = self.__class__(addr, s)
return s, s.addr return s, s.addr
except socket.error, (err, errmsg): except socket.error, e:
if err == errno.EAGAIN: self._error('accept', e)
raise ConnectorTryAgainException
raise ConnectorException, 'accept failed: %s:%s' % \
(err, errmsg)
def receive(self): def receive(self, read_buf):
try: try:
return self.socket.recv(4096) data = self.socket.recv(4096)
except socket.error, (err, errmsg): except socket.error, e:
if err == errno.EAGAIN: self._error('recv', e)
raise ConnectorTryAgainException if data:
if err in (errno.ECONNREFUSED, errno.EHOSTUNREACH): read_buf.append(data)
raise ConnectorConnectionRefusedException return
if err in (errno.ECONNRESET, errno.ETIMEDOUT): logging.debug('%r closed in recv', self)
raise ConnectorConnectionClosedException raise ConnectorException
raise ConnectorException, 'receive failed: %s:%s' % (err, errmsg)
def send(self):
def send(self, msg): msg = ''.join(self.queued)
if msg:
try: try:
return self.socket.send(msg) n = self.socket.send(msg)
except socket.error, (err, errmsg): except socket.error, e:
if err == errno.EAGAIN: self._error('send', e)
raise ConnectorTryAgainException # Do nothing special if n == 0:
if err in (errno.ECONNRESET, errno.ETIMEDOUT, errno.EPIPE): # - it never happens for simple sockets;
raise ConnectorConnectionClosedException # - for SSL sockets, this is always the case unless everything
raise ConnectorException, 'send failed: %s:%s' % (err, errmsg) # could be sent.
if n != len(msg):
self.queued[:] = msg[n:],
return False
del self.queued[:]
else:
assert not self.queued
return True
def close(self): def close(self):
self.is_closed = True self.is_closed = True
...@@ -195,17 +215,5 @@ registerConnectorHandler(SocketConnectorIPv6) ...@@ -195,17 +215,5 @@ registerConnectorHandler(SocketConnectorIPv6)
class ConnectorException(Exception): class ConnectorException(Exception):
pass pass
class ConnectorTryAgainException(ConnectorException):
pass
class ConnectorInProgressException(ConnectorException):
pass
class ConnectorConnectionClosedException(ConnectorException):
pass
class ConnectorConnectionRefusedException(ConnectorException):
pass
class ConnectorDelayedConnection(ConnectorException): class ConnectorDelayedConnection(ConnectorException):
pass pass
...@@ -16,8 +16,7 @@ ...@@ -16,8 +16,7 @@
from collections import deque from collections import deque
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.connector import ConnectorConnectionClosedException
from neo.lib.protocol import NodeTypes, Packets, ZERO_OID from neo.lib.protocol import NodeTypes, Packets, ZERO_OID
from neo.lib.util import add64, dump from neo.lib.util import add64, dump
from .handlers.storage import StorageOperationHandler from .handlers.storage import StorageOperationHandler
...@@ -85,7 +84,7 @@ class Checker(object): ...@@ -85,7 +84,7 @@ class Checker(object):
if self.conn_dict: if self.conn_dict:
break break
msg = "no replica" msg = "no replica"
except ConnectorConnectionClosedException: except ConnectionClosed:
msg = "connection closed" msg = "connection closed"
finally: finally:
conn_set.update(self.conn_dict) conn_set.update(self.conn_dict)
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
import weakref import weakref
from functools import wraps from functools import wraps
from neo.lib.connector import ConnectorConnectionClosedException from neo.lib.connection import ConnectionClosed
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \ from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \
ZERO_HASH ZERO_HASH
...@@ -154,7 +154,7 @@ class StorageOperationHandler(EventHandler): ...@@ -154,7 +154,7 @@ class StorageOperationHandler(EventHandler):
r = app.dm.checkTIDRange(*args) r = app.dm.checkTIDRange(*args)
try: try:
conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id) conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id)
except (weakref.ReferenceError, ConnectorConnectionClosedException): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield yield
app.newTask(check()) app.newTask(check())
...@@ -170,7 +170,7 @@ class StorageOperationHandler(EventHandler): ...@@ -170,7 +170,7 @@ class StorageOperationHandler(EventHandler):
r = app.dm.checkSerialRange(*args) r = app.dm.checkSerialRange(*args)
try: try:
conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id) conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id)
except (weakref.ReferenceError, ConnectorConnectionClosedException): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield yield
app.newTask(check()) app.newTask(check())
...@@ -211,7 +211,7 @@ class StorageOperationHandler(EventHandler): ...@@ -211,7 +211,7 @@ class StorageOperationHandler(EventHandler):
conn.answer(Packets.AnswerFetchTransactions( conn.answer(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id) pack_tid, next_tid, peer_tid_set), msg_id)
yield yield
except (weakref.ReferenceError, ConnectorConnectionClosedException): except (weakref.ReferenceError, ConnectionClosed):
pass pass
app.newTask(push()) app.newTask(push())
...@@ -253,6 +253,6 @@ class StorageOperationHandler(EventHandler): ...@@ -253,6 +253,6 @@ class StorageOperationHandler(EventHandler):
conn.answer(Packets.AnswerFetchObjects( conn.answer(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) pack_tid, next_tid, next_oid, object_dict), msg_id)
yield yield
except (weakref.ReferenceError, ConnectorConnectionClosedException): except (weakref.ReferenceError, ConnectionClosed):
pass pass
app.newTask(push()) app.newTask(push())
This diff is collapsed.
...@@ -31,8 +31,7 @@ import neo.client.app, neo.neoctl.app ...@@ -31,8 +31,7 @@ import neo.client.app, neo.neoctl.app
from neo.client import Storage from neo.client import Storage
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \ from neo.lib.connector import SocketConnector, ConnectorException
ConnectorConnectionRefusedException
from neo.lib.locking import SimpleQueue from neo.lib.locking import SimpleQueue
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import cached_property, parseMasterList, p64 from neo.lib.util import cached_property, parseMasterList, p64
...@@ -322,7 +321,7 @@ class ServerNode(Node): ...@@ -322,7 +321,7 @@ class ServerNode(Node):
try: try:
return self.listening_conn.getAddress() return self.listening_conn.getAddress()
except AttributeError: except AttributeError:
raise ConnectorConnectionRefusedException raise ConnectorException
class AdminApplication(ServerNode, neo.admin.app.Application): class AdminApplication(ServerNode, neo.admin.app.Application):
pass pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment