Commit 8eac180d authored by Grégory Wisniewski's avatar Grégory Wisniewski

Handle logic of replication done in the partition table module.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2427 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5dbef120
......@@ -18,11 +18,12 @@
import neo
from neo.protocol import ProtocolError
from neo.protocol import CellStates, Packets
from neo.protocol import Packets
from neo.master.handlers import BaseServiceHandler
from neo.exception import OperationFailure
from neo.util import dump
from neo.connector import ConnectorConnectionClosedException
from neo.pt import PartitionTableException
class StorageServiceHandler(BaseServiceHandler):
......@@ -107,30 +108,12 @@ class StorageServiceHandler(BaseServiceHandler):
tm.remove(tid)
def notifyReplicationDone(self, conn, offset):
uuid = conn.getUUID()
node = self.app.nm.getByUUID(uuid)
neo.logging.debug("node %s is up for offset %s" % (dump(uuid), offset))
# check the partition is assigned and known as outdated
for cell in self.app.pt.getCellList(offset):
if cell.getUUID() == uuid:
if not cell.isOutOfDate():
raise ProtocolError("Non-oudated partition")
break
else:
raise ProtocolError("Non-assigned partition")
# update the partition table
self.app.pt.setCell(offset, node, CellStates.UP_TO_DATE)
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
# If the partition contains a feeding cell, drop it now.
for feeding_cell in self.app.pt.getCellList(offset):
if feeding_cell.isFeeding():
self.app.pt.removeCell(offset, feeding_cell.getNode())
cell = (offset, feeding_cell.getUUID(), CellStates.DISCARDED)
cell_list.append(cell)
break
node = self.app.nm.getByUUID(conn.getUUID())
neo.logging.debug("%s is up for offset %s" % (node, offset))
try:
cell_list = self.app.pt.setUpToDate(node, offset)
except PartitionTableException, e:
raise ProtocolError(str(e))
self.app.broadcastPartitionChanges(cell_list)
def answerPack(self, conn, status):
......
......@@ -18,6 +18,7 @@
import neo.pt
from struct import pack, unpack
from neo.protocol import CellStates
from neo.pt import PartitionTableException
class PartitionTable(neo.pt.PartitionTable):
"""This class manages a partition table for the primary master node"""
......@@ -123,6 +124,32 @@ class PartitionTable(neo.pt.PartitionTable):
self.setCell(offset, node, state)
return new_nodes
def setUpToDate(self, node, offset):
"""Set a cell as up-to-date"""
uuid = node.getUUID()
# check the partition is assigned and known as outdated
for cell in self.getCellList(offset):
if cell.getUUID() == uuid:
if not cell.isOutOfDate():
raise PartitionTableException('Non-oudated partition')
break
else:
raise PartitionTableException('Non-assigned partition')
# update the partition table
self.setCell(offset, node, CellStates.UP_TO_DATE)
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
# If the partition contains a feeding cell, drop it now.
for feeding_cell in self.getCellList(offset):
if feeding_cell.isFeeding():
self.removeCell(offset, feeding_cell.getNode())
cell = (offset, feeding_cell.getUUID(), CellStates.DISCARDED)
cell_list.append(cell)
break
return cell_list
def addNode(self, node):
"""Add a node. Take it into account that it might not be really a new
node. The strategy is, if a row does not contain a good number of
......
......@@ -22,6 +22,10 @@ from neo.protocol import CellStates
from neo.util import dump, u64
from neo.locking import RLock
class PartitionTableException(Exception):
"""
Base class for partition table exceptions
"""
class Cell(object):
"""This class represents a cell in a partition table."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment