Commit 086b597d authored by Yoshinori Okuji's avatar Yoshinori Okuji

More enhancement in the partition table.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@30 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 275a31fe
...@@ -411,13 +411,33 @@ class Application(object): ...@@ -411,13 +411,33 @@ class Application(object):
"""Verify the data in storage nodes and clean them up, if necessary.""" """Verify the data in storage nodes and clean them up, if necessary."""
logging.info('start to verify data') logging.info('start to verify data')
em = self.em
nm = self.nm
# First, send the current partition table to storage nodes, so that # First, send the current partition table to storage nodes, so that
# all nodes share the same view. # all nodes share the same view.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode) and node.getState() == RUNNING_STATE:
# Split the packet if too huge.
p = Packet()
row_list = []
for offset in xrange(self.num_partitions):
row_list.append((offset, self.pt.getRow(offset)))
if len(row_list) == 1000:
p.sendPartitionTable(self.lptid, row_list)
conn.addPacket(p)
del row_list[:]
if len(row_list) != 0:
p.sendPartitionTable(self.lptid, row_list)
conn.addPacket(p)
# FIXME # Secondly, tweak the partition table, if the distribution of storage nodes
# is not uniform.
# Secondly, gather all unfinished transactions. # Secondly, gather all unfinished transactions.
# FIXME # FIXME
# Thirdly, finish or abort unfinished transactions. # Thirdly, finish or abort unfinished transactions.
......
...@@ -279,7 +279,7 @@ class Packet(object): ...@@ -279,7 +279,7 @@ class Packet(object):
self._body = ''.join(body) self._body = ''.join(body)
return self return self
def sendPartitionTable(self, msg_id, ptid, offset_list, row_list): def sendPartitionTable(self, msg_id, ptid, row_list):
self._id = msg_id self._id = msg_id
self._type = SEND_PARTITION_TABLE self._type = SEND_PARTITION_TABLE
body = [pack('!8sL', ptid, len(row_list))] body = [pack('!8sL', ptid, len(row_list))]
......
...@@ -23,6 +23,9 @@ class Cell(object): ...@@ -23,6 +23,9 @@ class Cell(object):
"""This is a short hand.""" """This is a short hand."""
return self.node.getState() return self.node.getState()
def getUUID(self):
return self.node.getUUID()
class PartitionTable(object): class PartitionTable(object):
"""This class manages a partition table.""" """This class manages a partition table."""
...@@ -31,11 +34,13 @@ class PartitionTable(object): ...@@ -31,11 +34,13 @@ class PartitionTable(object):
self.nr = num_replicas self.nr = num_replicas
self.num_filled_rows = 0 self.num_filled_rows = 0
self.partition_list = [None] * num_partitions self.partition_list = [None] * num_partitions
self.count_dict = {}
def clear(self): def clear(self):
"""Forget an existing partition table.""" """Forget an existing partition table."""
self.num_filled_rows = 0 self.num_filled_rows = 0
self.partition_list = [None] * self.np self.partition_list = [None] * self.np
self.count_dict.clear()
def make(self, node_list): def make(self, node_list):
"""Make a new partition table from scratch.""" """Make a new partition table from scratch."""
...@@ -54,9 +59,11 @@ class PartitionTable(object): ...@@ -54,9 +59,11 @@ class PartitionTable(object):
for offset in xrange(self.np): for offset in xrange(self.np):
row = [] row = []
for i in xrange(repeats): for i in xrange(repeats):
row.append(Cell(node_list[index])) node = node_list[index]
row.append(Cell(node))
self.count_dict.setdefault(node, 0) += 1
index += 1 index += 1
if index == len(uuid_list): if index == len(node_list):
index = 0 index = 0
self.partition_list[offset] = row self.partition_list[offset] = row
...@@ -70,7 +77,9 @@ class PartitionTable(object): ...@@ -70,7 +77,9 @@ class PartitionTable(object):
if row is None: if row is None:
# Create a new row. # Create a new row.
row = [Cell(node, state)] row = [Cell(node, state)]
self.partition_list[offset] if state != FEEDING_STATE:
self.count_dict.setdefault(node, 0) += 1
self.partition_list[offset] = row
self.num_filled_rows += 1 self.num_filled_rows += 1
else: else:
...@@ -79,8 +88,12 @@ class PartitionTable(object): ...@@ -79,8 +88,12 @@ class PartitionTable(object):
for cell in row: for cell in row:
if cell.getNode() == node: if cell.getNode() == node:
row.remove(cell) row.remove(cell)
if state != FEEDING_STATE:
self.count_dict.setdefault(node, 0) -= 1
break break
row.append(Cell(node, state)) row.append(Cell(node, state))
if state != FEEDING_STATE:
self.count_dict.setdefault(node, 0) += 1
def filled(self): def filled(self):
return self.num_filled_rows == self.np return self.num_filled_rows == self.np
...@@ -97,7 +110,7 @@ class PartitionTable(object): ...@@ -97,7 +110,7 @@ class PartitionTable(object):
# a node state, and record which rows are ready. # a node state, and record which rows are ready.
for row in self.partition_list: for row in self.partition_list:
for cell in row: for cell in row:
if cell.getState() == UP_TO_DATE_STATE \ if cell.getState() in (UP_TO_DATE_STATE, FEEDING_STATE) \
and cell.getNodeState() == RUNNING_STATE: and cell.getNodeState() == RUNNING_STATE:
break break
else: else:
...@@ -105,6 +118,17 @@ class PartitionTable(object): ...@@ -105,6 +118,17 @@ class PartitionTable(object):
return True return True
def findLeastUsedNode(self, excluded_node_list = ()):
min_count = self.np + 1
min_node = None
for node, count in self.count_dict.iteritems():
if min_count > count \
and node not in excluded_node_list \
and node.getState() == RUNNING_STATE:
min_node = node
min_count = count
return min_node
def dropNode(self, node): def dropNode(self, node):
cell_list = [] cell_list = []
uuid = node.getUUID() uuid = node.getUUID()
...@@ -114,6 +138,96 @@ class PartitionTable(object): ...@@ -114,6 +138,96 @@ class PartitionTable(object):
if cell.getNode() == node: if cell.getNode() == node:
row.remove(cell) row.remove(cell)
cell_list.append((offset, uuid, DISCARDED_STATE)) cell_list.append((offset, uuid, DISCARDED_STATE))
node = self.findLeastUsedNode()
if node is not None:
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
break break
del self.count_dict[node]
return cell_list return cell_list
def getRow(self, offset):
row = self.partition_list[offset]
if row is None:
return ()
return [(cell.getUUID(), cell.getState()) for cell in row]
def tweak(self):
"""Test if nodes are distributed uniformly. Otherwise, correct the partition
table."""
changed_cell_list = []
for offset, row in enumerate(self.partition_list):
removed_cell_list = []
feeding_cell = None
out_of_date_cell_present = False
out_of_date_cell_list = []
up_to_date_cell_list = []
for cell in row:
if cell.getNodeState() == BROKEN_STATE:
# Remove a broken cell.
removed_cell_list.append(cell)
elif cell.getState() == FEEDING_STATE:
if feeding_cell is None:
feeding_cell = cell
else:
# Remove an excessive feeding cell.
removed_cell_list.append(cell)
elif cell.getState() == OUT_OF_DATE_STATE:
out_of_date_cell_list.append(cell)
else:
up_to_date_cell_list.append(cell)
# If all cells are up-to-date, a feeding cell is not required.
if len(out_of_date_cell_list) == 0 and feeding_cell is not None:
removed_cell_list.append(feeding_cell)
ideal_num = self.nr
while len(out_of_date_cell_list) + len(up_to_date_cell_list) > ideal_num:
# This row contains too many cells.
if len(up_to_date_cell_list) > 1:
# There are multiple up-to-date cells, so choose whatever
# used too much.
cell_list = out_of_date_cell_list + up_to_date_cell_list
else:
# Drop an out-of-date cell.
cell_list = out_of_date_cell_list
max_count = 0
chosen_cell = None
for cell in out_of_date_cell_list + up_to_date_cell_list:
count = self.count_dict[cell.getNode()]
if max_count < count:
max_count = count
chosen_cell = cell
removed_cell_list.append(chosen_cell)
ideal_num -= 1
# Now remove cells really.
for cell in removed_cell_list:
row.remove(cell)
if cell.getState() != FEEDING_STATE:
self.count_dict[cell.getNode()] -= 1
changed_cell_list.append((offset, cell.getUUID(), DISCARDED_STATE))
# Add cells, if a row contains less than the number of replicas.
for offset, row in enumerate(self.partition_list):
num_cells = 0
for cell in row:
if cell.getState() != FEEDING_STATE:
num_cells += 1
while num_cells < self.nr:
node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is None:
break
row.append(Cell(node, OUT_OF_DATE_STATE))
changed_cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
self.count_dict[node] += 1
num_cells += 1
# FIXME still not enough. It is necessary to check if it is possible
# to reduce differences between frequently used nodes and rarely used
# nodes by replacing cells.
return changed_cell_list
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment