Commit fd4a9b16 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add notifyReplicationDone packet.

This packet replace notifyPartitionChange when used by storage's replicator at the end of a single partition replication.
In those case, only one partition was involved and the node concerned must match with the sender identify, causing many useless on the master side.
This packet contains only the partition number that is replicated and implies that the node's cell is now up to date.
It should be sent only by the storage node from replicator and received by master node.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1467 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 1265b4c9
...@@ -331,6 +331,9 @@ class EventHandler(object): ...@@ -331,6 +331,9 @@ class EventHandler(object):
def notifyLastOID(self, conn, packet, oid): def notifyLastOID(self, conn, packet, oid):
raise UnexpectedPacketError raise UnexpectedPacketError
def notifyReplicationDone(self, conn, packet, offset):
raise UnexpectedPacketError
# Error packet handlers. # Error packet handlers.
...@@ -433,6 +436,7 @@ class EventHandler(object): ...@@ -433,6 +436,7 @@ class EventHandler(object):
d[Packets.AnswerClusterState] = self.answerClusterState d[Packets.AnswerClusterState] = self.answerClusterState
d[Packets.NotifyClusterInformation] = self.notifyClusterInformation d[Packets.NotifyClusterInformation] = self.notifyClusterInformation
d[Packets.NotifyLastOID] = self.notifyLastOID d[Packets.NotifyLastOID] = self.notifyLastOID
d[Packets.NotifyReplicationDone] = self.notifyReplicationDone
return d return d
......
...@@ -249,5 +249,8 @@ class PacketLogger(EventHandler): ...@@ -249,5 +249,8 @@ class PacketLogger(EventHandler):
def notifyLastOID(self, conn, packet, oid): def notifyLastOID(self, conn, packet, oid):
pass pass
def notifyReplicationDone(self, conn, packet, offset):
pass
PACKET_LOGGER = PacketLogger() PACKET_LOGGER = PacketLogger()
...@@ -518,6 +518,20 @@ class NotifyPartitionChanges(Packet): ...@@ -518,6 +518,20 @@ class NotifyPartitionChanges(Packet):
cell_list.append((offset, uuid, state)) cell_list.append((offset, uuid, state))
return (ptid, cell_list) return (ptid, cell_list)
class NotifyReplicationDone(Packet):
"""
Notify the master node that a partition has been successully replicated from
a storage to another.
S -> M
"""
def _encode(self, offset):
return pack('!L', offset)
def _decode(self, body):
(offset, ) = unpack('!L', body)
return (offset, )
class StartOperation(Packet): class StartOperation(Packet):
""" """
Tell a storage nodes to start an operation. Until a storage node receives Tell a storage nodes to start an operation. Until a storage node receives
...@@ -1317,6 +1331,7 @@ class PacketRegistry(dict): ...@@ -1317,6 +1331,7 @@ class PacketRegistry(dict):
AskClusterState = register(0x0028, AskClusterState) AskClusterState = register(0x0028, AskClusterState)
AnswerClusterState = register(0x8028, AnswerClusterState) AnswerClusterState = register(0x8028, AnswerClusterState)
NotifyLastOID = register(0x0030, NotifyLastOID) NotifyLastOID = register(0x0030, NotifyLastOID)
NotifyReplicationDone = register(0x0031, NotifyReplicationDone)
# build a "singleton" # build a "singleton"
Packets = PacketRegistry() Packets = PacketRegistry()
......
...@@ -470,6 +470,12 @@ class ProtocolTests(NeoTestBase): ...@@ -470,6 +470,12 @@ class ProtocolTests(NeoTestBase):
p_oid_list = p.decode()[0] p_oid_list = p.decode()[0]
self.assertEqual(p_oid_list, oid_list) self.assertEqual(p_oid_list, oid_list)
def test_57_notifyReplicationDone(self):
offset = 10
p = Packets.NotifyReplicationDone(offset)
p_offset = p.decode()[0]
self.assertEqual(p_offset, offset)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment