Commit 9077cfcd authored by Vincent Pelletier's avatar Vincent Pelletier

Automatically set connection on node when establishing a connection.

parent 9502dd02
...@@ -306,7 +306,7 @@ class Application(object): ...@@ -306,7 +306,7 @@ class Application(object):
# Connect to master # Connect to master
conn = MTClientConnection(self.em, conn = MTClientConnection(self.em,
self.notifications_handler, self.notifications_handler,
addr=self.trying_master_node.getAddress(), node=self.trying_master_node,
connector=self.connector_handler(), connector=self.connector_handler(),
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
# Query for primary master node # Query for primary master node
......
...@@ -59,7 +59,7 @@ class StorageBootstrapHandler(AnswerBaseHandler): ...@@ -59,7 +59,7 @@ class StorageBootstrapHandler(AnswerBaseHandler):
assert node is not None, conn.getAddress() assert node is not None, conn.getAddress()
conn.setUUID(uuid) conn.setUUID(uuid)
node.setUUID(uuid) node.setUUID(uuid)
node.setConnection(conn) assert node.getConnection() is conn, (node.getConnection(), conn)
class StorageAnswersHandler(AnswerBaseHandler): class StorageAnswersHandler(AnswerBaseHandler):
""" Handle all messages related to ZODB operations """ """ Handle all messages related to ZODB operations """
......
...@@ -56,12 +56,10 @@ class ConnectionPool(object): ...@@ -56,12 +56,10 @@ class ConnectionPool(object):
@profiler_decorator @profiler_decorator
def _initNodeConnection(self, node): def _initNodeConnection(self, node):
"""Init a connection to a given storage node.""" """Init a connection to a given storage node."""
addr = node.getAddress()
assert addr is not None
app = self.app app = self.app
neo.lib.logging.debug('trying to connect to %s - %s', node, neo.lib.logging.debug('trying to connect to %s - %s', node,
node.getState()) node.getState())
conn = MTClientConnection(app.em, app.storage_event_handler, addr, conn = MTClientConnection(app.em, app.storage_event_handler, node,
connector=app.connector_handler(), dispatcher=app.dispatcher) connector=app.connector_handler(), dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name) app.uuid, None, app.name)
......
...@@ -144,8 +144,8 @@ class BootstrapManager(EventHandler): ...@@ -144,8 +144,8 @@ class BootstrapManager(EventHandler):
sleep(1) sleep(1)
if conn is None: if conn is None:
# open the connection # open the connection
addr = self.current.getAddress() conn = ClientConnection(em, self, self.current,
conn = ClientConnection(em, self, addr, connector_handler()) connector_handler())
# still processing # still processing
em.poll(1) em.poll(1)
node = nm.getByUUID(conn.getUUID()) node = nm.getByUUID(conn.getUUID())
......
...@@ -632,8 +632,10 @@ class ClientConnection(Connection): ...@@ -632,8 +632,10 @@ class ClientConnection(Connection):
connecting = True connecting = True
def __init__(self, event_manager, handler, addr, connector): def __init__(self, event_manager, handler, node, connector):
addr = node.getAddress()
Connection.__init__(self, event_manager, handler, connector, addr) Connection.__init__(self, event_manager, handler, connector, addr)
node.setConnection(self)
handler.connectionStarted(self) handler.connectionStarted(self)
try: try:
try: try:
......
...@@ -151,6 +151,7 @@ class Application(object): ...@@ -151,6 +151,7 @@ class Application(object):
self.unconnected_master_node_set.clear() self.unconnected_master_node_set.clear()
self.negotiating_master_node_set.clear() self.negotiating_master_node_set.clear()
self.listening_conn.setHandler(election.ServerElectionHandler(self)) self.listening_conn.setHandler(election.ServerElectionHandler(self))
getByAddress = self.nm.getByAddress
while True: while True:
...@@ -166,7 +167,10 @@ class Application(object): ...@@ -166,7 +167,10 @@ class Application(object):
while (self.unconnected_master_node_set or while (self.unconnected_master_node_set or
self.negotiating_master_node_set): self.negotiating_master_node_set):
for addr in self.unconnected_master_node_set: for addr in self.unconnected_master_node_set:
ClientConnection(self.em, client_handler, addr=addr, ClientConnection(self.em, client_handler,
# XXX: Ugly, but the whole election code will be
# replaced soon
node=getByAddress(addr),
connector=self.connector_handler()) connector=self.connector_handler())
self.negotiating_master_node_set.add(addr) self.negotiating_master_node_set.add(addr)
self.unconnected_master_node_set.clear() self.unconnected_master_node_set.clear()
...@@ -297,7 +301,7 @@ class Application(object): ...@@ -297,7 +301,7 @@ class Application(object):
node = nm.getByUUID(conn_uuid) node = nm.getByUUID(conn_uuid)
assert node is not None assert node is not None
assert node.isMaster() and not conn.isClient() assert node.isMaster() and not conn.isClient()
assert node._connection is None and node.isUnknown() assert node.isUnknown()
# this may trigger 'unexpected answer' warnings on remote side # this may trigger 'unexpected answer' warnings on remote side
conn.close() conn.close()
...@@ -331,13 +335,13 @@ class Application(object): ...@@ -331,13 +335,13 @@ class Application(object):
# Restart completely. Non-optimized # Restart completely. Non-optimized
# but lower level code needs to be stabilized first. # but lower level code needs to be stabilized first.
addr = self.primary_master_node.getAddress()
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
conn.close() conn.close()
# Reconnect to primary master node. # Reconnect to primary master node.
primary_handler = secondary.PrimaryHandler(self) primary_handler = secondary.PrimaryHandler(self)
ClientConnection(self.em, primary_handler, addr=addr, ClientConnection(self.em, primary_handler,
node=self.primary_master_node,
connector=self.connector_handler()) connector=self.connector_handler())
# and another for the future incoming connections # and another for the future incoming connections
......
...@@ -20,6 +20,7 @@ from neo.lib.connection import ClientConnection ...@@ -20,6 +20,7 @@ from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager from neo.lib.event import EventManager
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from neo.lib.util import getConnectorFromAddress from neo.lib.util import getConnectorFromAddress
from neo.lib.node import NodeManager
from .handler import CommandEventHandler from .handler import CommandEventHandler
class NotReadyException(Exception): class NotReadyException(Exception):
...@@ -33,19 +34,21 @@ class NeoCTL(object): ...@@ -33,19 +34,21 @@ class NeoCTL(object):
def __init__(self, address): def __init__(self, address):
connector_name = getConnectorFromAddress(address) connector_name = getConnectorFromAddress(address)
self.connector_handler = getConnectorHandler(connector_name) self.connector_handler = getConnectorHandler(connector_name)
self.server = address self.nm = nm = NodeManager()
self.server = nm.createAdmin(address=address)
self.em = EventManager() self.em = EventManager()
self.handler = CommandEventHandler(self) self.handler = CommandEventHandler(self)
self.response_queue = [] self.response_queue = []
def close(self): def close(self):
self.em.close() self.em.close()
self.nm.close()
del self.__dict__ del self.__dict__
def __getConnection(self): def __getConnection(self):
if not self.connected: if not self.connected:
self.connection = ClientConnection(self.em, self.handler, self.connection = ClientConnection(self.em, self.handler,
addr=self.server, connector=self.connector_handler()) node=self.server, connector=self.connector_handler())
while self.connection is not None: while self.connection is not None:
if self.connected: if self.connected:
break break
......
...@@ -250,7 +250,7 @@ class Replicator(object): ...@@ -250,7 +250,7 @@ class Replicator(object):
if connection is None or connection.getAddress() != addr: if connection is None or connection.getAddress() != addr:
handler = replication.ReplicationHandler(app) handler = replication.ReplicationHandler(app)
self.current_connection = ClientConnection(app.em, handler, self.current_connection = ClientConnection(app.em, handler,
addr=addr, connector=app.connector_handler()) node=node, connector=app.connector_handler())
p = Packets.RequestIdentification(NodeTypes.STORAGE, p = Packets.RequestIdentification(NodeTypes.STORAGE,
app.uuid, app.server, app.name) app.uuid, app.server, app.name)
self.current_connection.ask(p) self.current_connection.ask(p)
......
...@@ -783,7 +783,7 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -783,7 +783,7 @@ class ClientApplicationTests(NeoUnitTestBase):
# fourth iteration : connection to primary master succeeded # fourth iteration : connection to primary master succeeded
def _ask5(_): def _ask5(_):
app.trying_master_node = app.primary_master_node = Mock({ app.trying_master_node = app.primary_master_node = Mock({
'getAddress': ('192.168.1.1', 10000), 'getAddress': ('127.0.0.1', 10011),
'__str__': 'Fake master node', '__str__': 'Fake master node',
}) })
# third iteration : node not ready # third iteration : node not ready
...@@ -792,19 +792,23 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -792,19 +792,23 @@ class ClientApplicationTests(NeoUnitTestBase):
# second iteration : master node changed # second iteration : master node changed
def _ask3(_): def _ask3(_):
app.primary_master_node = Mock({ app.primary_master_node = Mock({
'getAddress': ('192.168.1.1', 10000), 'getAddress': ('127.0.0.1', 10010),
'__str__': 'Fake master node', '__str__': 'Fake master node',
}) })
# first iteration : connection failed # first iteration : connection failed
def _ask2(_): def _ask2(_):
app.trying_master_node = None app.trying_master_node = None
# do nothing for the first call # do nothing for the first call
# Case of an unknown primary_uuid (XXX: handler should probably raise,
# it's not normal for a node to inform of a primary uuid without
# telling us what its address is.)
def _ask1(_): def _ask1(_):
pass pass
ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask5, _ask6, _ask7, ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask5, _ask6, _ask7,
_ask8] _ask8]
def _ask_base(conn, _, handler=None): def _ask_base(conn, _, handler=None):
ask_func_list.pop(0)(conn) ask_func_list.pop(0)(conn)
app.nm.getByAddress(conn.getAddress())._connection = None
app._ask = _ask_base app._ask = _ask_base
# faked environnement # faked environnement
app.connector_handler = DoNothingConnector app.connector_handler = DoNothingConnector
......
...@@ -57,7 +57,7 @@ class StorageBootstrapHandlerTests(NeoUnitTestBase): ...@@ -57,7 +57,7 @@ class StorageBootstrapHandlerTests(NeoUnitTestBase):
def test_acceptIdentification2(self): def test_acceptIdentification2(self):
uuid = self.getNewUUID() uuid = self.getNewUUID()
conn = self.getConnection() conn = self.getConnection()
node = Mock() node = Mock({'getConnection': conn})
self.app.nm = Mock({'getByAddress': node}) self.app.nm = Mock({'getByAddress': node})
self.handler.acceptIdentification(conn, NodeTypes.STORAGE, uuid, self.handler.acceptIdentification(conn, NodeTypes.STORAGE, uuid,
10, 0, None) 10, 0, None)
......
...@@ -39,6 +39,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -39,6 +39,7 @@ class ConnectionTests(NeoUnitTestBase):
self.em = Mock({'__repr__': 'Fake Em'}) self.em = Mock({'__repr__': 'Fake Em'})
self.handler = Mock({'__repr__': 'Fake Handler'}) self.handler = Mock({'__repr__': 'Fake Handler'})
self.address = ("127.0.0.7", 93413) self.address = ("127.0.0.7", 93413)
self.node = Mock({'getAddress': self.address})
def _makeListeningConnection(self, addr): def _makeListeningConnection(self, addr):
# create instance after monkey patches # create instance after monkey patches
...@@ -54,7 +55,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -54,7 +55,7 @@ class ConnectionTests(NeoUnitTestBase):
def _makeClientConnection(self): def _makeClientConnection(self):
self.connector = DoNothingConnector() self.connector = DoNothingConnector()
return ClientConnection(event_manager=self.em, handler=self.handler, return ClientConnection(event_manager=self.em, handler=self.handler,
connector=self.connector, addr=self.address) connector=self.connector, node=self.node)
def _makeServerConnection(self): def _makeServerConnection(self):
self.connector = DoNothingConnector() self.connector = DoNothingConnector()
...@@ -843,7 +844,7 @@ class MTConnectionTests(ConnectionTests): ...@@ -843,7 +844,7 @@ class MTConnectionTests(ConnectionTests):
def _makeClientConnection(self): def _makeClientConnection(self):
self.connector = DoNothingConnector() self.connector = DoNothingConnector()
return MTClientConnection(event_manager=self.em, handler=self.handler, return MTClientConnection(event_manager=self.em, handler=self.handler,
connector=self.connector, addr=self.address, connector=self.connector, node=self.node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
def test_MTClientConnectionQueueParameter(self): def test_MTClientConnectionQueueParameter(self):
......
...@@ -346,9 +346,6 @@ class NeoCTL(neo.neoctl.app.NeoCTL): ...@@ -346,9 +346,6 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
super(NeoCTL, self).__init__(cluster.admin.getVirtualAddress()) super(NeoCTL, self).__init__(cluster.admin.getVirtualAddress())
self.em._timeout = -1 self.em._timeout = -1
server = property(lambda self: ServerNode.resolv(self._server),
lambda self, address: setattr(self, '_server', address))
class LoggerThreadName(str): class LoggerThreadName(str):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment