Commit 1db68f81 authored by Aurel's avatar Aurel

manage partitions to replicate using dictionnaries instead of lists

thus the remove call can work and prevent from managing partitions which the
storage does not have been assigned to


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@469 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b1497b28
...@@ -238,8 +238,8 @@ class Replicator(object): ...@@ -238,8 +238,8 @@ class Replicator(object):
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
self.new_partition_list = self._getOutdatedPartitionList() self.new_partition_dict = self._getOutdatedPartitionList()
self.partition_list = [] self.partition_dict = {}
self.current_partition = None self.current_partition = None
self.current_connection = None self.current_connection = None
self.critical_tid_dict = {} self.critical_tid_dict = {}
...@@ -264,16 +264,16 @@ class Replicator(object): ...@@ -264,16 +264,16 @@ class Replicator(object):
def _getOutdatedPartitionList(self): def _getOutdatedPartitionList(self):
app = self.app app = self.app
partition_list = [] partition_dict = {}
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
for uuid, state in app.pt.getRow(offset): for uuid, state in app.pt.getRow(offset):
if uuid == app.uuid and state == OUT_OF_DATE_STATE: if uuid == app.uuid and state == OUT_OF_DATE_STATE:
partition_list.append(Partition(offset)) partition_dict[offset] = Partition(offset)
return partition_list return partition_dict
def pending(self): def pending(self):
"""Return whether there is any pending partition.""" """Return whether there is any pending partition."""
return len(self.partition_list) or len(self.new_partition_list) return len(self.partition_dict) or len(self.new_partition_dict)
def setCriticalTID(self, packet, tid): def setCriticalTID(self, packet, tid):
"""This is a callback from OperationEventHandler.""" """This is a callback from OperationEventHandler."""
...@@ -282,11 +282,12 @@ class Replicator(object): ...@@ -282,11 +282,12 @@ class Replicator(object):
partition_list = self.critical_tid_dict[msg_id] partition_list = self.critical_tid_dict[msg_id]
logging.debug('setting critical TID %s to %s', logging.debug('setting critical TID %s to %s',
dump(tid), dump(tid),
', '.join([str(p.getRID()) for p in partition_list])) ', '.join([str(p.getRID()) for p in partition_list]))
for partition in self.critical_tid_dict[msg_id]: for partition in self.critical_tid_dict[msg_id]:
partition.setCriticalTID(tid) partition.setCriticalTID(tid)
del self.critical_tid_dict[msg_id] del self.critical_tid_dict[msg_id]
except KeyError: except KeyError:
logging.infor("setCriticalTID raised KeyError for msg_id %s" %(msg_id,))
pass pass
def _askCriticalTID(self): def _askCriticalTID(self):
...@@ -294,9 +295,9 @@ class Replicator(object): ...@@ -294,9 +295,9 @@ class Replicator(object):
msg_id = conn.getNextId() msg_id = conn.getNextId()
conn.addPacket(Packet().askLastIDs(msg_id)) conn.addPacket(Packet().askLastIDs(msg_id))
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
self.critical_tid_dict[msg_id] = self.new_partition_list self.critical_tid_dict[msg_id] = self.new_partition_dict.values()
self.partition_list.extend(self.new_partition_list) self.partition_dict.update(self.new_partition_dict)
self.new_partition_list = [] self.new_partition_dict = {}
def setUnfinishedTIDList(self, tid_list): def setUnfinishedTIDList(self, tid_list):
"""This is a callback from OperationEventHandler.""" """This is a callback from OperationEventHandler."""
...@@ -321,7 +322,7 @@ class Replicator(object): ...@@ -321,7 +322,7 @@ class Replicator(object):
node_list = [cell.getNode() for cell in cell_list node_list = [cell.getNode() for cell in cell_list
if cell.getNodeState() == RUNNING_STATE] if cell.getNodeState() == RUNNING_STATE]
node = choice(node_list) node = choice(node_list)
except: except IndexError:
# Not operational. # Not operational.
logging.error('not operational', exc_info = 1) logging.error('not operational', exc_info = 1)
self.current_partition = None self.current_partition = None
...@@ -360,7 +361,7 @@ class Replicator(object): ...@@ -360,7 +361,7 @@ class Replicator(object):
def _finishReplication(self): def _finishReplication(self):
app = self.app app = self.app
try: try:
self.partition_list.remove(self.current_partition) self.partition_dict.pop(self.current_partition.getRID())
# Notify to a primary master node that my cell is now up-to-date. # Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection conn = self.primary_master_connection
p = Packet() p = Packet()
...@@ -377,9 +378,9 @@ class Replicator(object): ...@@ -377,9 +378,9 @@ class Replicator(object):
def act(self): def act(self):
# If the new partition list is not empty, I must ask a critical # If the new partition list is not empty, I must ask a critical
# TID to a primary master node. # TID to a primary master node.
if self.new_partition_list: if self.new_partition_dict:
self._askCriticalTID() self._askCriticalTID()
if self.current_partition is None: if self.current_partition is None:
# I need to choose something. # I need to choose something.
if self.waiting_for_unfinished_tids: if self.waiting_for_unfinished_tids:
...@@ -388,7 +389,7 @@ class Replicator(object): ...@@ -388,7 +389,7 @@ class Replicator(object):
return return
elif self.unfinished_tid_list is not None: elif self.unfinished_tid_list is not None:
# Try to select something. # Try to select something.
for partition in self.partition_list: for partition in self.partition_dict.values():
if partition.safe(self.unfinished_tid_list): if partition.safe(self.unfinished_tid_list):
self.current_partition = partition self.current_partition = partition
self.unfinished_tid_list = None self.unfinished_tid_list = None
...@@ -406,23 +407,23 @@ class Replicator(object): ...@@ -406,23 +407,23 @@ class Replicator(object):
self._askUnfinishedTIDs() self._askUnfinishedTIDs()
else: else:
if self.replication_done: if self.replication_done:
logging.info('replication is done') logging.info('replication is done for %s' %(self.current_partition.getRID(),))
self._finishReplication() self._finishReplication()
def removePartition(self, rid): def removePartition(self, rid):
"""This is a callback from OperationEventHandler.""" """This is a callback from OperationEventHandler."""
try: try:
self.partition_list.remove(rid) self.partition_dict.pop(rid)
except ValueError: except KeyError:
pass pass
try: try:
self.new_partition_list.remove(rid) self.new_partition_dict.pop(rid)
except ValueError: except KeyError:
pass pass
def addPartition(self, rid): def addPartition(self, rid):
"""This is a callback from OperationEventHandler.""" """This is a callback from OperationEventHandler."""
if rid not in self.partition_list \ if not self.partition_dict.has_key(rid) \
and rid not in self.new_partition_list: and not self.new_partition_dict.has_key(rid):
self.new_partition_list.append(Partition(rid)) self.new_partition_dict[rid] = Partition(rid)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment