Commit e7cccf01 authored by Julien Muchembled's avatar Julien Muchembled

lib.node: fix NodeManager accessors returning identified nodes

Listing connected/connecting nodes with a UUID is used:
- in one place by storage nodes: here, it does not matter if we skip nodes that
  aren't really identified
- in many places by the master, only for server connections, in which case we
  have equivalence with real identification

So in practice, NodeManager is only simplified to reuse the 'identified'
property of nodes.
parent 5941b27d
...@@ -26,6 +26,7 @@ class Node(object): ...@@ -26,6 +26,7 @@ class Node(object):
"""This class represents a node.""" """This class represents a node."""
_connection = None _connection = None
_identified = False
def __init__(self, manager, address=None, uuid=None, def __init__(self, manager, address=None, uuid=None,
state=NodeStates.UNKNOWN): state=NodeStates.UNKNOWN):
...@@ -34,7 +35,6 @@ class Node(object): ...@@ -34,7 +35,6 @@ class Node(object):
self._uuid = uuid self._uuid = uuid
self._manager = manager self._manager = manager
self._last_state_change = time() self._last_state_change = time()
self._identified = False
manager.add(self) manager.add(self)
def notify(self, packet): def notify(self, packet):
...@@ -83,7 +83,6 @@ class Node(object): ...@@ -83,7 +83,6 @@ class Node(object):
old_uuid = self._uuid old_uuid = self._uuid
self._uuid = uuid self._uuid = uuid
self._manager._updateUUID(self, old_uuid) self._manager._updateUUID(self, old_uuid)
self._manager._updateIdentified(self)
if self._connection is not None: if self._connection is not None:
self._connection.setUUID(uuid) self._connection.setUUID(uuid)
...@@ -97,7 +96,6 @@ class Node(object): ...@@ -97,7 +96,6 @@ class Node(object):
assert self._connection is not None assert self._connection is not None
del self._connection del self._connection
self._identified = False self._identified = False
self._manager._updateIdentified(self)
def setConnection(self, connection, force=None): def setConnection(self, connection, force=None):
""" """
...@@ -136,7 +134,6 @@ class Node(object): ...@@ -136,7 +134,6 @@ class Node(object):
conn.close() conn.close()
assert not connection.isClosed(), connection assert not connection.isClosed(), connection
connection.setOnClose(self.onConnectionClosed) connection.setOnClose(self.onConnectionClosed)
self._manager._updateIdentified(self)
def getConnection(self): def getConnection(self):
""" """
...@@ -163,12 +160,13 @@ class Node(object): ...@@ -163,12 +160,13 @@ class Node(object):
return self._identified return self._identified
def __repr__(self): def __repr__(self):
return '<%s(uuid=%s, address=%s, state=%s, connection=%r) at %x>' % ( return '<%s(uuid=%s, address=%s, state=%s, connection=%r%s) at %x>' % (
self.__class__.__name__, self.__class__.__name__,
uuid_str(self._uuid), uuid_str(self._uuid),
self._address, self._address,
self._state, self._state,
self._connection, self._connection,
'' if self._identified else ', not identified',
id(self), id(self),
) )
...@@ -248,7 +246,6 @@ class NodeManager(object): ...@@ -248,7 +246,6 @@ class NodeManager(object):
self._uuid_dict = {} self._uuid_dict = {}
self._type_dict = {} self._type_dict = {}
self._state_dict = {} self._state_dict = {}
self._identified_dict = {}
if master_db is not None: if master_db is not None:
self._master_db = db = MasterDB(master_db) self._master_db = db = MasterDB(master_db)
for addr in db: for addr in db:
...@@ -266,7 +263,6 @@ class NodeManager(object): ...@@ -266,7 +263,6 @@ class NodeManager(object):
self._updateUUID(node, None) self._updateUUID(node, None)
self.__updateSet(self._type_dict, None, node.getType(), node) self.__updateSet(self._type_dict, None, node.getType(), node)
self.__updateSet(self._state_dict, None, node.getState(), node) self.__updateSet(self._state_dict, None, node.getState(), node)
self._updateIdentified(node)
if node.isMaster() and self._master_db is not None: if node.isMaster() and self._master_db is not None:
self._master_db.add(node.getAddress()) self._master_db.add(node.getAddress())
...@@ -283,8 +279,6 @@ class NodeManager(object): ...@@ -283,8 +279,6 @@ class NodeManager(object):
self.__dropSet(self._state_dict, node.getState(), node) self.__dropSet(self._state_dict, node.getState(), node)
self.__dropSet(self._type_dict, node.getType(), node) self.__dropSet(self._type_dict, node.getType(), node)
uuid = node.getUUID() uuid = node.getUUID()
if uuid in self._identified_dict:
del self._identified_dict[uuid]
if node.isMaster() and self._master_db is not None: if node.isMaster() and self._master_db is not None:
self._master_db.discard(node.getAddress()) self._master_db.discard(node.getAddress())
...@@ -300,17 +294,6 @@ class NodeManager(object): ...@@ -300,17 +294,6 @@ class NodeManager(object):
'would overwrite %r' % (node, new_key, index_dict[new_key]) 'would overwrite %r' % (node, new_key, index_dict[new_key])
index_dict[new_key] = node index_dict[new_key] = node
def _updateIdentified(self, node):
uuid = node.getUUID()
if uuid:
# XXX: It's probably a bug to include connecting nodes but there's
# no API yet to update manager when connection is established.
if node.isConnected(connecting=True):
assert node in self._node_set, node
self._identified_dict[uuid] = node
else:
self._identified_dict.pop(uuid, None)
def _updateAddress(self, node, old_address): def _updateAddress(self, node, old_address):
self.__update(self._address_dict, old_address, node.getAddress(), node) self.__update(self._address_dict, old_address, node.getAddress(), node)
...@@ -341,10 +324,8 @@ class NodeManager(object): ...@@ -341,10 +324,8 @@ class NodeManager(object):
Returns a generator to iterate over identified nodes Returns a generator to iterate over identified nodes
pool_set is an iterable of UUIDs allowed pool_set is an iterable of UUIDs allowed
""" """
if pool_set is not None: return [x for x in self._node_set if x.isIdentified() and (
identified_nodes = self._identified_dict.items() pool_set is None or x.getUUID() in pool_set)]
return [v for k, v in identified_nodes if k in pool_set]
return self._identified_dict.values()
def getConnectedList(self): def getConnectedList(self):
""" """
...@@ -360,7 +341,7 @@ class NodeManager(object): ...@@ -360,7 +341,7 @@ class NodeManager(object):
def _getTypeList(self, node_type, only_identified=False): def _getTypeList(self, node_type, only_identified=False):
node_set = self._type_dict.get(node_type, ()) node_set = self._type_dict.get(node_type, ())
if only_identified: if only_identified:
return [x for x in node_set if x.getUUID() in self._identified_dict] return [x for x in node_set if x.isIdentified()]
return list(node_set) return list(node_set)
def getByAddress(self, address): def getByAddress(self, address):
......
...@@ -308,6 +308,11 @@ class Application(BaseApplication): ...@@ -308,6 +308,11 @@ class Application(BaseApplication):
# masters will reconnect nevertheless, but it's dirty. # masters will reconnect nevertheless, but it's dirty.
# Currently, it's not trivial to preserve connected nodes, # Currently, it's not trivial to preserve connected nodes,
# because of poor node status tracking during election. # because of poor node status tracking during election.
# XXX: The above comment is partially wrong in that the primary
# master is now responsible of allocating node ids, and all
# other nodes must only create/update/remove nodes when
# processing node notification. We probably want to keep the
# current behaviour: having only server connections.
conn.abort() conn.abort()
# If I know any storage node, make sure that they are not in the # If I know any storage node, make sure that they are not in the
......
...@@ -108,10 +108,12 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -108,10 +108,12 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# do the right job # do the right job
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
storage_uuid = self.storage_uuid storage_uuid = self.storage_uuid
storage_conn = self.getFakeConnection(storage_uuid, self.storage_address) storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address, is_server=True)
storage2_uuid = self.identifyToMasterNode(port=10022) storage2_uuid = self.identifyToMasterNode(port=10022)
storage2_conn = self.getFakeConnection(storage2_uuid, storage2_conn = self.getFakeConnection(storage2_uuid,
(self.storage_address[0], self.storage_address[1] + 1)) (self.storage_address[0], self.storage_address[1] + 1),
is_server=True)
self.app.setStorageReady(storage2_uuid) self.app.setStorageReady(storage2_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address) conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.pt = Mock({ self.app.pt = Mock({
...@@ -176,7 +178,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -176,7 +178,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
conn = self.getFakeConnection(peer_id=peer_id) conn = self.getFakeConnection(peer_id=peer_id)
storage_uuid = self.storage_uuid storage_uuid = self.storage_uuid
storage_conn = self.getFakeConnection(storage_uuid, storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address) self.storage_address, is_server=True)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn) self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid) self.service.askPack(conn, tid)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
...@@ -189,7 +191,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -189,7 +191,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# Asking again to pack will cause an immediate error # Asking again to pack will cause an immediate error
storage_uuid = self.identifyToMasterNode(port=10022) storage_uuid = self.identifyToMasterNode(port=10022)
storage_conn = self.getFakeConnection(storage_uuid, storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address) self.storage_address, is_server=True)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn) self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid) self.service.askPack(conn, tid)
self.checkNoPacketSent(storage_conn) self.checkNoPacketSent(storage_conn)
......
...@@ -41,8 +41,8 @@ class MasterAppTests(NeoUnitTestBase): ...@@ -41,8 +41,8 @@ class MasterAppTests(NeoUnitTestBase):
client = self.app.nm.createClient(uuid=client_uuid) client = self.app.nm.createClient(uuid=client_uuid)
# create conn and patch em # create conn and patch em
master_conn = self.getFakeConnection() master_conn = self.getFakeConnection()
storage_conn = self.getFakeConnection() storage_conn = self.getFakeConnection(is_server=True)
client_conn = self.getFakeConnection() client_conn = self.getFakeConnection(is_server=True)
master.setConnection(master_conn) master.setConnection(master_conn)
storage.setConnection(storage_conn) storage.setConnection(storage_conn)
client.setConnection(client_conn) client.setConnection(client_conn)
......
...@@ -63,7 +63,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -63,7 +63,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
uuid = self.getNewUUID(node_type) uuid = self.getNewUUID(node_type)
node = nm.createFromNodeType(node_type, address=(ip, port), node = nm.createFromNodeType(node_type, address=(ip, port),
uuid=uuid) uuid=uuid)
conn = self.getFakeConnection(node.getUUID(), node.getAddress()) conn = self.getFakeConnection(node.getUUID(), node.getAddress(), True)
node.setConnection(conn) node.setConnection(conn)
return (node, conn) return (node, conn)
......
...@@ -113,10 +113,6 @@ class NodeManagerTests(NeoUnitTestBase): ...@@ -113,10 +113,6 @@ class NodeManagerTests(NeoUnitTestBase):
def checkByUUID(self, node): def checkByUUID(self, node):
self.assertEqual(node, self.nm.getByUUID(node.getUUID())) self.assertEqual(node, self.nm.getByUUID(node.getUUID()))
def checkIdentified(self, node_list, pool_set=None):
identified_node_list = self.nm.getIdentifiedList(pool_set)
self.assertEqual(set(identified_node_list), set(node_list))
def testInit(self): def testInit(self):
""" Check the manager is empty when started """ """ Check the manager is empty when started """
manager = self.nm manager = self.nm
...@@ -213,30 +209,6 @@ class NodeManagerTests(NeoUnitTestBase): ...@@ -213,30 +209,6 @@ class NodeManagerTests(NeoUnitTestBase):
self.checkNodes([self.master, self.admin, new_storage]) self.checkNodes([self.master, self.admin, new_storage])
self.assertEqual(self.admin.getState(), NodeStates.UNKNOWN) self.assertEqual(self.admin.getState(), NodeStates.UNKNOWN)
def testIdentified(self):
# set up four nodes
self._addMaster()
self._addStorage()
self._addClient()
self._addAdmin()
# switch node to connected
self.checkIdentified([])
self.master.setConnection(Mock())
self.checkIdentified([self.master])
self.storage.setConnection(Mock())
self.checkIdentified([self.master, self.storage])
self.client.setConnection(Mock())
self.checkIdentified([self.master, self.storage, self.client])
self.admin.setConnection(Mock())
self.checkIdentified([self.master, self.storage, self.client, self.admin])
# check the pool_set attribute
self.checkIdentified([self.master], pool_set=[self.master.getUUID()])
self.checkIdentified([self.storage], pool_set=[self.storage.getUUID()])
self.checkIdentified([self.client], pool_set=[self.client.getUUID()])
self.checkIdentified([self.admin], pool_set=[self.admin.getUUID()])
self.checkIdentified([self.master, self.storage], pool_set=[
self.master.getUUID(), self.storage.getUUID()])
class MasterDBTests(NeoUnitTestBase): class MasterDBTests(NeoUnitTestBase):
def _checkMasterDB(self, path, expected_master_list): def _checkMasterDB(self, path, expected_master_list):
db = list(MasterDB(path)) db = list(MasterDB(path))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment