Commit 059c6d6c authored by Grégory Wisniewski's avatar Grégory Wisniewski

Master use PartitionTable to store the number of partitions, replicas and the

current partition table ID. The next partition table ID is now computed by the
master's PartitionTable instance.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@655 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 0e03e516
......@@ -46,14 +46,13 @@ class Application(object):
"""The master node application."""
def __init__(self, file, section):
config = ConfigurationManager(file, section)
self.connector_handler = getConnectorHandler(config.getConnector())
self.num_replicas = config.getReplicas()
self.num_partitions = config.getPartitions()
self.name = config.getName()
self.connector_handler = getConnectorHandler(config.getConnector())
logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s',
self.num_replicas, self.num_partitions, self.name)
if len(self.name) == 0:
raise RuntimeError, 'cluster name must be non-empty'
self.server = config.getServer()
logging.debug('IP address is %s, port is %d', *(self.server))
......@@ -65,7 +64,16 @@ class Application(object):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.pt = PartitionTable(self.num_partitions, self.num_replicas)
# Partition table
replicas, partitions = config.getReplicas(), config.getPartitions()
if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas)
logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s',
replicas, partitions, self.name)
self.primary = None
self.primary_master_node = None
......@@ -77,20 +85,11 @@ class Application(object):
self.loid = INVALID_OID
# The last TID.
self.ltid = INVALID_TID
# The last Partition Table ID.
self.lptid = INVALID_PTID
# The target node's uuid to request next.
self.target_uuid = None
def run(self):
"""Make sure that the status is sane and start a loop."""
if self.num_replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
if self.num_partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
if len(self.name) == 0:
raise RuntimeError, 'cluster name must be non-empty'
for server in self.master_node_list:
self.nm.add(MasterNode(server = server))
......@@ -325,14 +324,14 @@ class Application(object):
def sendPartitionTable(self, conn):
""" Send the partition table through the given connection """
row_list = []
for offset in xrange(self.num_partitions):
for offset in xrange(self.pt.getPartitions()):
row_list.append((offset, self.pt.getRow(offset)))
# Split the packet if too huge.
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable( self.lptid, row_list))
conn.notify(protocol.sendPartitionTable( self.pt.getID(), row_list))
del row_list[:]
if row_list:
conn.notify(protocol.sendPartitionTable(self.lptid, row_list))
conn.notify(protocol.sendPartitionTable(self.pt.getID(), row_list))
def sendNodesInformations(self, conn):
""" Send informations on all nodes through the given connection """
......@@ -368,12 +367,12 @@ class Application(object):
self.loid = INVALID_OID
self.ltid = INVALID_TID
self.lptid = INVALID_PTID
self.pt.setID(INVALID_PTID)
while 1:
self.target_uuid = None
self.pt.clear()
if self.lptid != INVALID_PTID:
if self.pt.getID() != INVALID_PTID:
# I need to retrieve last ids again.
logging.info('resending Ask Last IDs')
for conn in em.getConnectionList():
......@@ -395,7 +394,7 @@ class Application(object):
em.poll(1)
# Now I have at least one to ask.
prev_lptid = self.lptid
prev_lptid = self.pt.getID()
node = nm.getNodeByUUID(self.target_uuid)
if node is None or node.getState() != RUNNING_STATE:
# Weird. It's dead.
......@@ -410,17 +409,17 @@ class Application(object):
logging.info('no connection to the target storage node')
continue
if self.lptid == INVALID_PTID:
if self.pt.getID() == INVALID_PTID:
# This looks like the first time. So make a fresh table.
logging.debug('creating a new partition table')
self.lptid = pack('!Q', 1) # ptid != INVALID_PTID
self.pt.setID(pack('!Q', 1)) # ptid != INVALID_PTID
self.pt.make(nm.getStorageNodeList())
else:
# Obtain a partition table. It is necessary to split this
# message, because the packet size can be huge.
logging.debug('asking a partition table to %s', node)
start = 0
size = self.num_partitions
size = self.pt.getPartitions()
while size:
amt = min(1000, size)
conn.ask(protocol.askPartitionTable(range(start, start + amt)))
......@@ -436,10 +435,10 @@ class Application(object):
if self.pt.filled() or t + 30 < time():
break
if self.lptid != prev_lptid or not self.pt.filled():
if self.pt.getID() != prev_lptid or not self.pt.filled():
# I got something newer or the target is dead.
logging.debug('self.lptid = %s, prev_lptid = %s',
dump(self.lptid), dump(prev_lptid))
logging.debug('lptid = %s, prev_lptid = %s',
dump(self.pt.getID()), dump(prev_lptid))
self.pt.log()
continue
......@@ -451,10 +450,10 @@ class Application(object):
em.poll(1)
if self.pt.operational():
break
if self.lptid != prev_lptid:
if self.pt.getID() != prev_lptid:
break
if self.lptid != prev_lptid:
if self.pt.getID() != prev_lptid:
# I got something newer.
continue
break
......@@ -625,8 +624,7 @@ class Application(object):
# If anything changed, send the changes.
if cell_list:
self.broadcastPartitionChanges(self.getNextPartitionTableID(),
cell_list)
self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)
def provideService(self):
"""This is the normal mode for a primary master node. Handle transactions
......@@ -673,8 +671,7 @@ class Application(object):
node.setState(DOWN_STATE)
self.broadcastNodeInformation(node)
cell_list = self.pt.dropNode(node)
ptid = self.getNextPartitionTableID()
self.broadcastPartitionChanges(ptid, cell_list)
self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)
if not self.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
......@@ -734,14 +731,6 @@ class Application(object):
while 1:
em.poll(1)
def getNextPartitionTableID(self):
if self.lptid == INVALID_PTID:
raise RuntimeError, 'I do not know the last Partition Table ID'
ptid = unpack('!Q', self.lptid)[0]
self.lptid = pack('!Q', ptid + 1)
return self.lptid
def getNextOID(self):
if self.loid is None:
raise RuntimeError, 'I do not know the last OID'
......@@ -775,7 +764,7 @@ class Application(object):
return tid
def getPartition(self, oid_or_tid):
return unpack('!Q', oid_or_tid)[0] % self.num_partitions
return unpack('!Q', oid_or_tid)[0] % self.pt.getPartitions()
def getNewOIDList(self, num_oids):
return [self.getNextOID() for i in xrange(num_oids)]
......
......@@ -199,10 +199,9 @@ class ElectionEventHandler(MasterEventHandler):
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas,
uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.pt.getPartitions(),
app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
......
......@@ -18,12 +18,24 @@
import logging
import neo.pt
from neo import protocol
from struct import pack, unpack
from neo.protocol import OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, BROKEN_STATE
class PartitionTable(neo.pt.PartitionTable):
"""This class manages a partition table for the primary master node"""
def setID(self, id):
self.id = id
def setNextID(self):
if self.id == INVALID_PTID:
raise RuntimeError, 'I do not know the last Partition Table ID'
last_id = unpack('!Q', self.id)[0]
self.id = pack('!Q', last_id + 1)
return self.id
def make(self, node_list):
"""Make a new partition table from scratch."""
# First, filter the list of nodes.
......
......@@ -165,7 +165,7 @@ class RecoveryEventHandler(MasterEventHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid)
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
......@@ -187,10 +187,10 @@ class RecoveryEventHandler(MasterEventHandler):
node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE:
conn.ask(protocol.askLastIDs())
elif node.getNodeType() == ADMIN_NODE_TYPE and app.lptid != INVALID_PTID:
elif node.getNodeType() == ADMIN_NODE_TYPE and app.pt.getID() != INVALID_PTID:
# send partition table if exists
logging.info('sending partition table %s to %s' % (dump(app.lptid),
conn.getAddress()))
logging.info('sending partition table %s to %s' %
(dump(app.pt.getID()), conn.getAddress()))
app.sendPartitionTable(conn)
@identification_required
......@@ -268,11 +268,11 @@ class RecoveryEventHandler(MasterEventHandler):
app.loid = loid
if app.ltid < ltid:
app.ltid = ltid
if app.lptid == INVALID_PTID or app.lptid < lptid:
app.lptid = lptid
if app.pt.getID() == INVALID_PTID or app.pt.getID() < lptid:
app.pt.setID(lptid)
# I need to use the node which has the max Partition Table ID.
app.target_uuid = uuid
elif app.lptid == lptid and app.target_uuid is None:
elif app.pt.getID() == lptid and app.target_uuid is None:
app.target_uuid = uuid
@identification_required
......@@ -288,7 +288,7 @@ class RecoveryEventHandler(MasterEventHandler):
return
for offset, cell_list in row_list:
if offset >= app.num_partitions or app.pt.hasOffset(offset):
if offset >= app.pt.getPartitions() or app.pt.hasOffset(offset):
# There must be something wrong.
raise UnexpectedPacketError
......
......@@ -80,7 +80,7 @@ class SecondaryEventHandler(MasterEventHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas,
app.pt.getPartitions(), app.pt.getReplicas(),
uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
......
......@@ -270,7 +270,7 @@ class ServiceEventHandler(MasterEventHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid)
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
......@@ -385,7 +385,7 @@ class ServiceEventHandler(MasterEventHandler):
app = self.app
node = app.nm.getNodeByUUID(uuid)
# If I get a bigger value here, it is dangerous.
if app.loid < loid or app.ltid < ltid or app.lptid < lptid:
if app.loid < loid or app.ltid < ltid or app.pt.getID() < lptid:
logging.critical('got later information in service')
raise OperationFailure
......@@ -556,7 +556,7 @@ class ServiceEventHandler(MasterEventHandler):
break
if new_cell_list:
ptid = app.getNextPartitionTableID()
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, new_cell_list)
......
......@@ -188,7 +188,7 @@ class VerificationEventHandler(MasterEventHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid)
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
......
......@@ -17,6 +17,7 @@
import logging
from neo import protocol
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
BROKEN_STATE, VALID_CELL_STATE_LIST, HIDDEN_STATE, PENDING_STATE
......@@ -56,17 +57,21 @@ class PartitionTable(object):
"""This class manages a partition table."""
def __init__(self, num_partitions, num_replicas):
self.id = protocol.INVALID_PTID
self.np = num_partitions
self.nr = num_replicas
self.num_filled_rows = 0
self.partition_list = [[] for x in xrange(num_partitions)]
self.count_dict = {}
def getID(self):
return self.id
def getPartitions(self):
return self.num_partitions
return self.np
def getReplicas(self):
return self.num_replicas
return self.nr
def clear(self):
"""Forget an existing partition table."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment