Commit efc8a8ec authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use NotifyReplicatioDone packet.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1468 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fd4a9b16
...@@ -92,50 +92,31 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -92,50 +92,31 @@ class StorageServiceHandler(BaseServiceHandler):
# What is this? # What is this?
pass pass
def notifyPartitionChanges(self, conn, packet, ptid, cell_list): def notifyReplicationDone(self, conn, packet, offset):
# This should be sent when a cell becomes up-to-date because
# a replication has finished.
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app node = self.app.nm.getByUUID(uuid)
node = app.nm.getByUUID(uuid) logging.debug("node %s is up for offset %s" % (dump(uuid), offset))
new_cell_list = [] # check the partition is assigned and known as outdated
for cell in cell_list: for cell in self.app.pt.getCellList(offset):
if cell[2] != CellStates.UP_TO_DATE: if cell.getUUID() == uuid:
logging.warn('only up-to-date state should be sent') if not cell.isOutOfDate():
continue raise ProtocolError("Non-oudated partition")
break
if uuid != cell[1]: else:
logging.warn('only a cell itself should send this packet') raise ProtocolError("Non-assigned partition")
continue
offset = cell[0]
logging.debug("node %s is up for offset %s" %
(dump(node.getUUID()), offset))
# check the storage said it is up to date for a partition it was
# assigne to
for xcell in app.pt.getCellList(offset):
if xcell.getNode().getUUID() == node.getUUID() and \
xcell.getState() not in (CellStates.OUT_OF_DATE,
CellStates.UP_TO_DATE):
msg = "node %s telling that it is UP TO DATE for offset \
%s but where %s for that offset" % (dump(node.getUUID()),
offset, xcell.getState())
raise ProtocolError(msg)
app.pt.setCell(offset, node, CellStates.UP_TO_DATE) # update the partition table
new_cell_list.append(cell) self.app.pt.setCell(offset, node, CellStates.UP_TO_DATE)
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
# If the partition contains a feeding cell, drop it now. # If the partition contains a feeding cell, drop it now.
for feeding_cell in app.pt.getCellList(offset): for feeding_cell in self.app.pt.getCellList(offset):
if feeding_cell.getState() == CellStates.FEEDING: if feeding_cell.isFeeding():
app.pt.removeCell(offset, feeding_cell.getNode()) self.app.pt.removeCell(offset, feeding_cell.getNode())
new_cell_list.append((offset, feeding_cell.getUUID(), cell = (offset, feeding_cell.getUUID(), CellStates.DISCARDED)
CellStates.DISCARDED)) cell_list.append(cell)
break break
self.app.broadcastPartitionChanges(cell_list)
app.broadcastPartitionChanges(new_cell_list)
...@@ -195,10 +195,8 @@ class Replicator(object): ...@@ -195,10 +195,8 @@ class Replicator(object):
self.partition_dict.pop(self.current_partition.getRID()) self.partition_dict.pop(self.current_partition.getRID())
# Notify to a primary master node that my cell is now up-to-date. # Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection conn = self.primary_master_connection
p = Packets.NotifyPartitionChanges(app.pt.getID(), offset = self.current_partition.getRID()
[(self.current_partition.getRID(), app.uuid, conn.notify(Packets.NotifyReplicationDone(offset))
CellStates.UP_TO_DATE)])
conn.notify(p)
except KeyError: except KeyError:
pass pass
self.current_partition = None self.current_partition = None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment