Commit 64fd593b authored by Grégory Wisniewski's avatar Grégory Wisniewski

Factorize duplicate code that send the whole partition table.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@495 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 65b73485
...@@ -321,6 +321,18 @@ class Application(object): ...@@ -321,6 +321,18 @@ class Application(object):
size -= amt size -= amt
start += amt start += amt
def sendPartitionTable(self, conn):
""" Send the partition table through the given connection """
row_list = []
for offset in xrange(self.num_partitions):
row_list.append((offset, self.pt.getRow(offset)))
# Split the packet if too huge.
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable( self.lptid, row_list))
del row_list[:]
if row_list:
conn.notify(protocol.sendPartitionTable(self.lptid, row_list))
def recoverStatus(self): def recoverStatus(self):
"""Recover the status about the cluster. Obtain the last OID, the last TID, """Recover the status about the cluster. Obtain the last OID, the last TID,
and the last Partition Table ID from storage nodes, then get back the latest and the last Partition Table ID from storage nodes, then get back the latest
...@@ -514,17 +526,7 @@ class Application(object): ...@@ -514,17 +526,7 @@ class Application(object):
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
# Split the packet if too huge. self.sendPartitionTable(conn)
row_list = []
for offset in xrange(self.num_partitions):
row_list.append((offset, self.pt.getRow(offset)))
if len(row_list) == 1000:
p = protocol.sendPartitionTable( self.lptid, row_list)
conn.notify(p)
del row_list[:]
if len(row_list) != 0:
p = protocol.sendPartitionTable(self.lptid, row_list)
conn.notify(p)
# Gather all unfinished transactions. # Gather all unfinished transactions.
# #
......
...@@ -219,15 +219,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -219,15 +219,7 @@ class RecoveryEventHandler(MasterEventHandler):
# send partition table if exists # send partition table if exists
logging.info('sending partition table %s to %s' % (dump(app.lptid), logging.info('sending partition table %s to %s' % (dump(app.lptid),
conn.getAddress())) conn.getAddress()))
# Split the packet if too huge. app.sendPartitionTable(conn)
row_list = []
for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
del row_list[:]
if len(row_list) != 0:
conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
......
...@@ -330,15 +330,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -330,15 +330,7 @@ class ServiceEventHandler(MasterEventHandler):
if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('sending partition table to %s:%d', logging.info('sending partition table to %s:%d',
*(conn.getAddress())) *(conn.getAddress()))
# Split the packet if too huge. app.sendPartitionTable(conn)
row_list = []
for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
del row_list[:]
if len(row_list) != 0:
conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
# If this is a storage node, ask it to start. # If this is a storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE: if node.getNodeType() == STORAGE_NODE_TYPE:
......
...@@ -237,15 +237,7 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -237,15 +237,7 @@ class VerificationEventHandler(MasterEventHandler):
# If this is a storage node or an admin node, send the partition table. # If this is a storage node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE): if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
# Split the packet if too huge. app.sendPartitionTable(conn)
row_list = []
for offset in xrange(app.num_partitions):
row_list.append((offset, app.pt.getRow(offset)))
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
del row_list[:]
if len(row_list) != 0:
conn.notify(protocol.sendPartitionTable(app.lptid, row_list))
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment