Commit 5dad376a authored by Grégory Wisniewski's avatar Grégory Wisniewski

Replacement of broadcastNodeInformation() with broadcastNodesInformation()

- Handle a node list instead of a single node
- Send only one packet per peer, reduce bandwidth and latency
- Remove the related XXX

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1451 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 94efac5f
...@@ -265,32 +265,37 @@ class Application(object): ...@@ -265,32 +265,37 @@ class Application(object):
conn.close() conn.close()
bootstrap = False bootstrap = False
# XXX: should accept a node list and send at most one packet per peer def broadcastNodesInformation(self, node_list):
def broadcastNodeInformation(self, node): """
"""Broadcast a Notify Node Information packet.""" Broadcast changes for a set a nodes
logging.debug('broadcasting node information') Send only one packet per connection to reduce bandwidth
node_type = node.getType() """
state = node.getState() node_dict = {}
uuid = node.getUUID() # group modified nodes by destination node type
for node in node_list:
# The server address may be None. address = node.getAddress()
address = node.getAddress() uuid = node.getUUID()
state = node.getState()
if node.isClient(): node_info = (node.getType(), address, uuid, state)
# Only to master nodes and storage nodes. def assign_for_notification(node_type):
for c in self.em.getConnectionList(): # helper function
if c.getUUID() is not None: node_dict.setdefault(node_type, []).append(node_info)
n = self.nm.getByUUID(c.getUUID()) if node.isMaster() or node.isStorage():
if n.isMaster() or n.isStorage() or n.isAdmin(): # client get notifications for master and storage only
node_list = [(node_type, address, uuid, state)] assign_for_notification(NodeTypes.CLIENT)
c.notify(Packets.NotifyNodeInformation(node_list)) if node.isMaster() or node.isStorage() or node.isClient():
elif node.isMaster() or node.isStorage(): assign_for_notification(NodeTypes.MASTER)
for c in self.em.getConnectionList(): assign_for_notification(NodeTypes.STORAGE)
if c.getUUID() is not None: assign_for_notification(NodeTypes.ADMIN)
node_list = [(node_type, address, uuid, state)]
c.notify(Packets.NotifyNodeInformation(node_list)) # send at most one non-empty notification packet per node
elif not node.isAdmin(): for conn in self.em.getConnectionList():
raise RuntimeError('unknown node type') if conn.getUUID() is None:
continue
node = self.nm.getByUUID(conn.getUUID())
node_list = node_dict.get(node.getType(), [])
if node_list:
conn.notify(Packets.NotifyNodeInformation(node_list))
def broadcastPartitionChanges(self, cell_list): def broadcastPartitionChanges(self, cell_list):
"""Broadcast a Notify Partition Changes packet.""" """Broadcast a Notify Partition Changes packet."""
...@@ -363,7 +368,7 @@ class Application(object): ...@@ -363,7 +368,7 @@ class Application(object):
node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER] node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
for node in node_list: for node in node_list:
node.setRunning() node.setRunning()
self.broadcastNodeInformation(node) self.broadcastNodesInformation(node_list)
# resert IDs generators # resert IDs generators
self.loid = '\0' * 8 self.loid = '\0' * 8
self.ltid = '\0' * 8 self.ltid = '\0' * 8
...@@ -404,7 +409,7 @@ class Application(object): ...@@ -404,7 +409,7 @@ class Application(object):
refused_node_set = set(self.nm.getStorageList()) - allowed_node_set refused_node_set = set(self.nm.getStorageList()) - allowed_node_set
for node in refused_node_set: for node in refused_node_set:
node.setPending() node.setPending()
self.broadcastNodeInformation(node) self.broadcastNodesInformation(refused_node_set)
logging.debug('cluster starts with loid=%s and this partition table :', logging.debug('cluster starts with loid=%s and this partition table :',
dump(self.loid)) dump(self.loid))
......
...@@ -96,7 +96,7 @@ class BaseServiceHandler(MasterHandler): ...@@ -96,7 +96,7 @@ class BaseServiceHandler(MasterHandler):
logging.info('drop a pending node from the node manager') logging.info('drop a pending node from the node manager')
self.app.nm.remove(node) self.app.nm.remove(node)
node.setState(new_state) node.setState(new_state)
self.app.broadcastNodeInformation(node) self.app.broadcastNodesInformation([node])
# clean node related data in specialized handlers # clean node related data in specialized handlers
self.nodeLost(conn, node) self.nodeLost(conn, node)
...@@ -89,7 +89,7 @@ class AdministrationHandler(MasterHandler): ...@@ -89,7 +89,7 @@ class AdministrationHandler(MasterHandler):
node.setState(state) node.setState(state)
p = protocol.noError('state changed') p = protocol.noError('state changed')
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
app.broadcastNodeInformation(node) app.broadcastNodesInformation([node])
def addPendingNodes(self, conn, packet, uuid_list): def addPendingNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list]) uuids = ', '.join([dump(uuid) for uuid in uuid_list])
...@@ -113,12 +113,12 @@ class AdministrationHandler(MasterHandler): ...@@ -113,12 +113,12 @@ class AdministrationHandler(MasterHandler):
uuids = ', '.join([dump(uuid) for uuid in uuid_set]) uuids = ', '.join([dump(uuid) for uuid in uuid_set])
logging.info('Adding nodes %s' % uuids) logging.info('Adding nodes %s' % uuids)
# switch nodes to running state # switch nodes to running state
for uuid in uuid_set: node_list = [nm.getByUUID(uuid) for uuid in uuid_set]
node = nm.getByUUID(uuid) for node in node_list:
new_cells = pt.addNode(node) new_cells = pt.addNode(node)
cell_list.extend(new_cells) cell_list.extend(new_cells)
node.setRunning() node.setRunning()
app.broadcastNodeInformation(node) app.broadcastNodesInformation(node_list)
# start nodes # start nodes
for s_conn in em.getConnectionList(): for s_conn in em.getConnectionList():
if s_conn.getUUID() in uuid_set: if s_conn.getUUID() in uuid_set:
......
...@@ -80,5 +80,5 @@ class IdentificationHandler(MasterHandler): ...@@ -80,5 +80,5 @@ class IdentificationHandler(MasterHandler):
conn.answer(Packets.AcceptIdentification(*args), packet.getId()) conn.answer(Packets.AcceptIdentification(*args), packet.getId())
# trigger the event # trigger the event
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
app.broadcastNodeInformation(node) app.broadcastNodesInformation([node])
...@@ -29,7 +29,7 @@ class SecondaryMasterHandler(MasterHandler): ...@@ -29,7 +29,7 @@ class SecondaryMasterHandler(MasterHandler):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
assert node is not None assert node is not None
node.setDown() node.setDown()
self.app.broadcastNodeInformation(node) self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
pass pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment