Commit 39465fec by Grégory Wisniewski

Answer the partition table in one packet.

SendPartitionTable packet was sent between Ask and Answer PartitionTable
packets, as notifications. In this case, the only purpose of the 'Answer'
was to check that the partition table was filled. The 'Ask' allowed also
to request a part of the partitions but was not used and redundant with
AskPartitionList for neoctl.

This commit include the following work:
- The partition table is always send in one packet.
- The full partition table is always requested with AskPartitionTable
- The full partition table is notified with SendPartitionTable
- Client node process the answer in the bootstrap handler.
- Admin can receive answer *and* notifications for the partition table.
- Move the log calls to the pt.py module
- Add pt.getRowList() to factorise the code.
- Build partition table packets out of the loop when possible
- Always load inconditionnaly the partition table in generic pt.py
-

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2114 71dcc9de-d417-0410-9af5-da40c76e7ee4
1 parent b05a5569
......@@ -134,7 +134,7 @@ class Application(object):
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable([]))
self.master_conn.ask(Packets.AskPartitionTable())
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
......
......@@ -50,8 +50,7 @@ class AdminEventHandler(EventHandler):
if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary ' \
'master.')
p = Packets.AskPartitionTable([])
msg_id = self.app.master_conn.ask(p)
msg_id = self.app.master_conn.ask(Packets.AskPartitionTable())
app.dispatcher.register(msg_id, conn,
{'min_offset' : min_offset,
'max_offset' : max_offset,
......@@ -146,18 +145,14 @@ class MasterEventHandler(EventHandler):
# implemented for factorize code (as done for bootstrap)
logging.debug("answerNodeInformation")
def answerPartitionTable(self, conn, ptid, row_list):
# XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap)
logging.debug("answerPartitionTable")
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, row_list):
self.app.pt.clear()
self.app.pt.load(ptid, row_list, self.app.nm)
self.app.pt.log()
def notifyClusterInformation(self, conn, cluster_state):
self.app.cluster_state = cluster_state
......@@ -169,7 +164,7 @@ class MasterEventHandler(EventHandler):
# Re-ask partition table, in case node change filled it.
# XXX: we should only ask it if received states indicates it is
# possible (ignore TEMPORARILY_DOWN for example)
conn.ask(Packets.AskPartitionTable([]))
conn.ask(Packets.AskPartitionTable())
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
......
......@@ -368,7 +368,7 @@ class Application(object):
msg_id = conn.ask(Packets.AskNodeInformation())
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
msg_id = conn.ask(Packets.AskPartitionTable([]))
msg_id = conn.ask(Packets.AskPartitionTable())
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
ready = self.uuid is not None and self.pt is not None \
......
......@@ -81,7 +81,8 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
conn.close()
def answerPartitionTable(self, conn, ptid, row_list):
pass
assert row_list
self.app.pt.load(ptid, row_list, self.app.nm)
def answerNodeInformation(self, conn):
pass
......@@ -138,9 +139,6 @@ class PrimaryNotificationsHandler(BaseHandler):
if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def sendPartitionTable(self, conn, ptid, row_list):
self.app.pt.load(ptid, row_list, self.app.nm)
def notifyNodeInformation(self, conn, node_list):
app = self.app
self.app.nm.update(node_list)
......
......@@ -163,15 +163,12 @@ class EventHandler(object):
def answerLastIDs(self, conn, loid, ltid, lptid):
raise UnexpectedPacketError
def askPartitionTable(self, conn, offset_list):
def askPartitionTable(self, conn):
raise UnexpectedPacketError
def answerPartitionTable(self, conn, ptid, row_list):
raise UnexpectedPacketError
def sendPartitionTable(self, conn, ptid, row_list):
raise UnexpectedPacketError
def notifyPartitionChanges(self, conn, ptid, cell_list):
raise UnexpectedPacketError
......@@ -388,7 +385,6 @@ class EventHandler(object):
d[Packets.AnswerLastIDs] = self.answerLastIDs
d[Packets.AskPartitionTable] = self.askPartitionTable
d[Packets.AnswerPartitionTable] = self.answerPartitionTable
d[Packets.SendPartitionTable] = self.sendPartitionTable
d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
d[Packets.StartOperation] = self.startOperation
d[Packets.StopOperation] = self.stopOperation
......
......@@ -278,31 +278,19 @@ class Application(object):
logging.debug('broadcastPartitionChanges')
if not cell_list:
return
ptid = self.pt.setNextID()
self.pt.log()
ptid = self.pt.setNextID()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
if not node.isRunning():
continue
if node.isClient() or node.isStorage() or node.isAdmin():
node.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
node.notify(packet)
def outdateAndBroadcastPartition(self):
" Outdate cell of non-working nodes and broadcast changes """
self.broadcastPartitionChanges(self.pt.outdate())
def sendPartitionTable(self, conn):
""" Send the partition table through the given connection """
row_list = []
for offset in xrange(self.pt.getPartitions()):
row_list.append((offset, self.pt.getRow(offset)))
# Split the packet if too huge.
if len(row_list) == 1000:
conn.notify(Packets.SendPartitionTable(self.pt.getID(),
row_list))
del row_list[:]
if row_list:
conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list))
def sendNodesInformations(self, conn):
""" Send informations on all nodes through the given connection """
node_list = []
......
......@@ -62,11 +62,10 @@ class MasterHandler(EventHandler):
self.app.sendNodesInformations(conn)
conn.answer(Packets.AnswerNodeInformation())
def askPartitionTable(self, conn, offset_list):
assert len(offset_list) == 0
app = self.app
app.sendPartitionTable(conn)
conn.answer(Packets.AnswerPartitionTable(app.pt.getID(), []))
def askPartitionTable(self, conn):
ptid = self.app.pt.getID()
row_list = self.app.pt.getRowList()
conn.answer(Packets.AnswerPartitionTable(ptid, row_list))
DISCONNECTED_STATE_DICT = {
......
......@@ -127,7 +127,7 @@ class RecoveryManager(MasterHandler):
if lptid > self.target_ptid:
# something newer
self.target_ptid = lptid
conn.ask(Packets.AskPartitionTable([]))
conn.ask(Packets.AskPartitionTable())
def answerPartitionTable(self, conn, ptid, row_list):
app = self.app
......@@ -142,8 +142,11 @@ class RecoveryManager(MasterHandler):
raise ProtocolError('Invalid offset')
else:
notification = Packets.NotifyNodeInformation(new_nodes)
ptid = self.app.pt.getID()
row_list = self.app.pt.getRowList()
partition_table = Packets.SendPartitionTable(ptid, row_list)
# notify the admin nodes
for node in self.app.nm.getAdminList(only_identified=True):
node.notify(notification)
self.app.sendPartitionTable(node.getConnection())
node.notify(partition_table)
......@@ -476,33 +476,9 @@ class AnswerLastIDs(Packet):
class AskPartitionTable(Packet):
"""
Ask rows in a partition table that a storage node stores. Used to recover
information. PM -> S.
Ask the full partition table. PM -> S.
"""
_header_format = '!L'
_list_entry_format = '!L'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, offset_list):
body = [pack(self._header_format, len(offset_list))]
list_entry_format = self._list_entry_format
for offset in offset_list:
body.append(pack(list_entry_format, offset))
return ''.join(body)
def _decode(self, body):
packet_offset = self._header_len
(n,) = unpack(self._header_format, body[:packet_offset])
offset_list = []
list_entry_len = self._list_entry_len
list_entry_format = self._list_entry_format
for _ in xrange(n):
next_packet_offset = packet_offset + list_entry_len
offset = unpack(list_entry_format,
body[packet_offset:next_packet_offset])[0]
packet_offset = next_packet_offset
offset_list.append(offset)
return (offset_list,)
pass
class AnswerPartitionTable(Packet):
"""
......
......@@ -195,19 +195,20 @@ class PartitionTable(object):
def load(self, ptid, row_list, nm):
"""
Load the partition table with the specified PTID, discard all previous
content and can be done in multiple calls
content.
"""
if ptid != self.id:
self.clear()
self.id = ptid
self.clear()
self.id = ptid
for offset, row in row_list:
if offset >= self.getPartitions() or self.hasOffset(offset):
if offset >= self.getPartitions():
raise IndexError
for uuid, state in row:
node = nm.getByUUID(uuid)
# the node must be known by the node manager
assert node is not None
self.setCell(offset, node, state)
logging.debug('partition table loaded')
self.log()
def update(self, ptid, cell_list, nm):
"""
......@@ -299,6 +300,10 @@ class PartitionTable(object):
return []
return [(cell.getUUID(), cell.getState()) for cell in row]
def getRowList(self):
getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)]
def thread_safe(method):
def wrapper(self, *args, **kwargs):
......
......@@ -247,7 +247,7 @@ class Application(object):
self.pt.clear()
self.master_conn.ask(Packets.AskLastIDs())
self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable(()))
self.master_conn.ask(Packets.AskPartitionTable())
while not self.has_node_information or not self.has_partition_table \
or not self.has_last_ids:
self.em.poll(1)
......
......@@ -29,15 +29,10 @@ class InitializationHandler(BaseMasterHandler):
# the whole node list is received here
BaseMasterHandler.notifyNodeInformation(self, conn, node_list)
def sendPartitionTable(self, conn, ptid, row_list):
"""A primary master node sends this packet to synchronize a partition
table. Note that the message can be split into multiple packets."""
self.app.pt.load(ptid, row_list, self.app.nm)
def answerPartitionTable(self, conn, ptid, row_list):
app = self.app
pt = app.pt
assert not row_list
pt.load(ptid, row_list, self.app.nm)
if not pt.filled():
raise protocol.ProtocolError('Partial partition table received')
logging.debug('Got the partition table :')
......@@ -68,9 +63,6 @@ class InitializationHandler(BaseMasterHandler):
def notifyPartitionChanges(self, conn, ptid, cell_list):
# XXX: This is safe to ignore those notifications because all of the
# following applies:
# - master is monothreaded (notifyPartitionChanges cannot happen
# between sendPartitionTable/answerPartitionTable packets), so
# receiving the whole partition table is atomic
# - we first ask for node information, and *then* partition
# table content, so it is possible to get notifyPartitionChanges
# packets in between (or even before asking for node information).
......
......@@ -37,20 +37,10 @@ class VerificationHandler(BaseMasterHandler):
tid = None
conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
def askPartitionTable(self, conn, offset_list):
if not offset_list:
# all is requested
offset_list = xrange(0, self.app.pt.getPartitions())
else:
if max(offset_list) >= self.app.pt.getPartitions():
raise ProtocolError('invalid partition table offset')
# build a table with requested partitions
row_list = [(offset, [(cell.getUUID(), cell.getState())
for cell in self.app.pt.getCellList(offset)])
for offset in offset_list]
conn.answer(Packets.AnswerPartitionTable(self.app.pt.getID(), row_list))
def askPartitionTable(self, conn):
ptid = self.app.pt.getID()
row_list = self.app.pt.getRowList()
conn.answer(Packets.AnswerPartitionTable(ptid, row_list))
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
......
......@@ -124,6 +124,16 @@ class MasterBootstrapHandlerTests(MasterHandlerTests):
self.assertTrue(self.app.primary_master_node is node)
self.checkClosed(conn)
def test_answerPartitionTable(self):
conn = self.getConnection()
self.app.pt = Mock()
ptid = 0
row_list = ([], [])
self.handler.answerPartitionTable(conn, ptid, row_list)
load_calls = self.app.pt.mockGetNamedCalls('load')
self.assertEqual(len(load_calls), 1)
# load_calls[0].checkArgs(ptid, row_list, self.app.nm)
class MasterNotificationsHandlerTests(MasterHandlerTests):
......@@ -168,16 +178,6 @@ class MasterNotificationsHandlerTests(MasterHandlerTests):
self.assertEqual(len(update_calls), 1)
update_calls[0].checkArgs(ptid, cell_list, self.app.nm)
def test_sendPartitionTable(self):
conn = self.getConnection()
self.app.pt = Mock()
ptid = 0
row_list = (Mock(), Mock())
self.handler.sendPartitionTable(conn, ptid, row_list)
load_calls = self.app.pt.mockGetNamedCalls('load')
self.assertEqual(len(load_calls), 1)
load_calls[0].checkArgs(ptid, row_list, self.app.nm)
def test_notifyNodeInformation(self):
conn = self.getConnection()
addr = ('127.0.0.1', 1000)
......
......@@ -21,7 +21,7 @@ from neo.tests import NeoTestBase
from neo.pt import PartitionTable
from neo.storage.app import Application
from neo.storage.handlers.initialization import InitializationHandler
from neo.protocol import CellStates
from neo.protocol import CellStates, ProtocolError
from neo.exception import PrimaryFailure
class StorageInitializationHandlerTests(NeoTestBase):
......@@ -71,7 +71,7 @@ class StorageInitializationHandlerTests(NeoTestBase):
# nothing happens
self.checkNoPacketSent(conn)
def test_09_sendPartitionTable(self):
def test_09_answerPartitionTable(self):
# send a table
conn = self.getClientConnection()
self.app.pt = PartitionTable(3, 2)
......@@ -87,20 +87,8 @@ class StorageInitializationHandlerTests(NeoTestBase):
(1, ((node_3, CellStates.UP_TO_DATE), (node_1, CellStates.UP_TO_DATE))),
(2, ((node_2, CellStates.UP_TO_DATE), (node_3, CellStates.UP_TO_DATE)))]
self.assertFalse(self.app.pt.filled())
# send part of the table, won't be filled
self.verification.sendPartitionTable(conn, 1, row_list[:1])
self.assertFalse(self.app.pt.filled())
self.assertEqual(self.app.pt.getID(), 1)
self.assertEqual(self.app.dm.getPartitionTable(), [])
# send remaining of the table (ack with AnswerPartitionTable)
self.verification.sendPartitionTable(conn, 1, row_list[1:])
self.verification.answerPartitionTable(conn, 1, [])
self.assertTrue(self.app.pt.filled())
self.assertEqual(self.app.pt.getID(), 1)
self.assertNotEqual(self.app.dm.getPartitionTable(), [])
# send a complete new table and ack
self.verification.sendPartitionTable(conn, 2, row_list)
self.verification.answerPartitionTable(conn, 2, [])
self.verification.answerPartitionTable(conn, 2, row_list)
self.assertTrue(self.app.pt.filled())
self.assertEqual(self.app.pt.getID(), 2)
self.assertNotEqual(self.app.dm.getPartitionTable(), [])
......
......@@ -121,19 +121,6 @@ class StorageVerificationHandlerTests(NeoTestBase):
self.assertEqual(ptid, self.app.pt.getID())
def test_08_askPartitionTable(self):
# try to get unknown offset
self.assertEqual(len(self.app.pt.getNodeList()), 0)
self.assertFalse(self.app.pt.hasOffset(1))
self.assertEqual(len(self.app.pt.getCellList(1)), 0)
conn = self.getClientConnection()
self.verification.askPartitionTable(conn, [1])
ptid, row_list = self.checkAnswerPartitionTable(conn, decode=True)
self.assertEqual(len(row_list), 1)
offset, rows = row_list[0]
self.assertEqual(offset, 1)
self.assertEqual(len(rows), 0)
# try to get known offset
node = self.app.nm.createStorage(
address=("127.7.9.9", 1),
uuid=self.getNewUUID()
......@@ -141,12 +128,9 @@ class StorageVerificationHandlerTests(NeoTestBase):
self.app.pt.setCell(1, node, CellStates.UP_TO_DATE)
self.assertTrue(self.app.pt.hasOffset(1))
conn = self.getClientConnection()
self.verification.askPartitionTable(conn, [1])
self.verification.askPartitionTable(conn)
ptid, row_list = self.checkAnswerPartitionTable(conn, decode=True)
self.assertEqual(len(row_list), 1)
offset, rows = row_list[0]
self.assertEqual(offset, 1)
self.assertEqual(len(rows), 1)
self.assertEqual(len(row_list), 1009)
def test_10_notifyPartitionChanges(self):
# old partition change
......
......@@ -144,10 +144,7 @@ class ProtocolTests(NeoTestBase):
self.assertEqual(lptid, ptid)
def test_20_askPartitionTable(self):
offset_list = [1, 523, 6, 124]
p = Packets.AskPartitionTable(offset_list)
p_offset_list = p.decode()[0]
self.assertEqual(offset_list, p_offset_list)
self.assertEqual(Packets.AskPartitionTable().decode(), ())
def test_21_answerPartitionTable(self):
ptid = self.getNextTID()
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!