Commit 72277c38 authored by Julien Muchembled's avatar Julien Muchembled

Extend node/connection API to support crossed client connections

This is required, for example, for simultaneous reciprocal replication
between 2 storages.
parent 0f980f31
...@@ -394,13 +394,15 @@ class Connection(BaseConnection): ...@@ -394,13 +394,15 @@ class Connection(BaseConnection):
"""A connection.""" """A connection."""
connecting = False connecting = False
client = False
server = False
peer_id = None
def __init__(self, event_manager, *args, **kw): def __init__(self, event_manager, *args, **kw):
BaseConnection.__init__(self, event_manager, *args, **kw) BaseConnection.__init__(self, event_manager, *args, **kw)
self.read_buf = ReadBuffer() self.read_buf = ReadBuffer()
self.write_buf = [] self.write_buf = []
self.cur_id = 0 self.cur_id = 0
self.peer_id = 0
self.aborted = False self.aborted = False
self.uuid = None self.uuid = None
self._queue = [] self._queue = []
...@@ -409,9 +411,36 @@ class Connection(BaseConnection): ...@@ -409,9 +411,36 @@ class Connection(BaseConnection):
event_manager.addReader(self) event_manager.addReader(self)
def setOnClose(self, callback): def setOnClose(self, callback):
assert self._on_close is None
self._on_close = callback self._on_close = callback
def isClient(self):
return self.client
def isServer(self):
return self.server
def asClient(self):
try:
del self.idle
assert self.client
except AttributeError:
self.client = True
def asServer(self):
self.server = True
def _closeClient(self):
if self.server:
del self.idle
self.client = False
self.notify(Packets.CloseClient())
else:
self.close()
def closeClient(self):
if self.connector is not None:
self.idle = self._closeClient
def isAborted(self): def isAborted(self):
return self.aborted return self.aborted
...@@ -632,6 +661,7 @@ class ClientConnection(Connection): ...@@ -632,6 +661,7 @@ class ClientConnection(Connection):
"""A connection from this node to a remote node.""" """A connection from this node to a remote node."""
connecting = True connecting = True
client = True
def __init__(self, event_manager, handler, node, connector): def __init__(self, event_manager, handler, node, connector):
addr = node.getAddress() addr = node.getAddress()
...@@ -669,9 +699,6 @@ class ClientConnection(Connection): ...@@ -669,9 +699,6 @@ class ClientConnection(Connection):
else: else:
Connection.writable(self) Connection.writable(self)
def isClient(self):
return True
class ServerConnection(Connection): class ServerConnection(Connection):
"""A connection from a remote node to this node.""" """A connection from a remote node to this node."""
...@@ -683,13 +710,12 @@ class ServerConnection(Connection): ...@@ -683,13 +710,12 @@ class ServerConnection(Connection):
# ping the client. Otherwise, it would do it about half of the time. # ping the client. Otherwise, it would do it about half of the time.
KEEP_ALIVE = Connection.KEEP_ALIVE + 5 KEEP_ALIVE = Connection.KEEP_ALIVE + 5
server = True
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
Connection.__init__(self, *args, **kw) Connection.__init__(self, *args, **kw)
self.updateTimeout(time()) self.updateTimeout(time())
def isServer(self):
return True
class MTClientConnection(ClientConnection): class MTClientConnection(ClientConnection):
"""A Multithread-safe version of ClientConnection.""" """A Multithread-safe version of ClientConnection."""
......
...@@ -132,6 +132,11 @@ class EventHandler(object): ...@@ -132,6 +132,11 @@ class EventHandler(object):
def notify(self, conn, message): def notify(self, conn, message):
neo.lib.logging.info('notification from %r: %s', conn, message) neo.lib.logging.info('notification from %r: %s', conn, message)
def closeClient(self, conn):
conn.server = False
if not conn.client:
conn.close()
# Error packet handlers. # Error packet handlers.
def error(self, conn, code, message): def error(self, conn, code, message):
......
...@@ -21,7 +21,7 @@ import json ...@@ -21,7 +21,7 @@ import json
import neo.lib import neo.lib
from .util import dump from .util import dump
from .protocol import NodeTypes, NodeStates from .protocol import NodeTypes, NodeStates, ProtocolError
from . import attributeTracker from . import attributeTracker
...@@ -100,15 +100,29 @@ class Node(object): ...@@ -100,15 +100,29 @@ class Node(object):
del self._connection del self._connection
self._manager._updateIdentified(self) self._manager._updateIdentified(self)
def setConnection(self, connection): def setConnection(self, connection, force=None):
""" """
Define the connection that is currently available to this node. Define the connection that is currently available to this node.
If there is already a connection set, 'force' must be given:
the new connection replaces the old one if it is true. In any case,
the node must be managed by the same handler for the client and
server parts.
""" """
assert not connection.isClosed(), connection
assert self._connection is None, attributeTracker.whoSet(self, '_connection')
assert connection.getUUID() in (None, self._uuid), connection assert connection.getUUID() in (None, self._uuid), connection
connection.setUUID(self._uuid) connection.setUUID(self._uuid)
self._connection = connection conn = self._connection
if conn is None:
self._connection = connection
else:
assert force is not None, \
attributeTracker.whoSet(self, '_connection')
# The test on peer_id is there to protect against buggy nodes.
if not force or conn.getPeerId() is not None or \
type(conn.getHandler()) is not type(connection.getHandler()):
raise ProtocolError("already connected")
conn.setOnClose(lambda: setattr(self, '_connection', connection))
conn.close()
assert not connection.isClosed(), connection
connection.setOnClose(self.onConnectionClosed) connection.setOnClose(self.onConnectionClosed)
self._manager._updateIdentified(self) self._manager._updateIdentified(self)
......
...@@ -25,7 +25,7 @@ from struct import Struct ...@@ -25,7 +25,7 @@ from struct import Struct
from .util import Enum, getAddressType from .util import Enum, getAddressType
# The protocol version (major, minor). # The protocol version (major, minor).
PROTOCOL_VERSION = (5, 1) PROTOCOL_VERSION = (6, 1)
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -649,6 +649,11 @@ class Ping(Packet): ...@@ -649,6 +649,11 @@ class Ping(Packet):
""" """
_answer = PFEmpty _answer = PFEmpty
class CloseClient(Packet):
"""
Tell peer it can close the connection if it has finished with us. Any -> Any
"""
class RequestIdentification(Packet): class RequestIdentification(Packet):
""" """
Request a node identification. This must be the first packet for any Request a node identification. This must be the first packet for any
...@@ -1433,6 +1438,8 @@ class Packets(dict): ...@@ -1433,6 +1438,8 @@ class Packets(dict):
Error) Error)
Ping, Pong = register( Ping, Pong = register(
Ping) Ping)
CloseClient = register(
CloseClient)
Notify = register( Notify = register(
Notify) Notify)
RequestIdentification, AcceptIdentification = register( RequestIdentification, AcceptIdentification = register(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment