Commit 09eb146e authored by Grégory Wisniewski's avatar Grégory Wisniewski

Second step for manual startup implementation. Leave the recovering state when

the setClusterState packet with is received from the admin node. If no partition
table/IDs were gathered before, wait for at least one storage node and use the
first to create the initial partition table.
A timeout is left to act as it was done before, to preserve automatic startup in
tests, will be removed latter.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@800 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 57265e7f
...@@ -54,16 +54,12 @@ class AdministrationEventHandler(MasterEventHandler): ...@@ -54,16 +54,12 @@ class AdministrationEventHandler(MasterEventHandler):
def handleSetClusterState(self, conn, packet, name, state): def handleSetClusterState(self, conn, packet, name, state):
self.checkClusterName(name) self.checkClusterName(name)
if state == protocol.RUNNING: self.app.changeClusterState(state)
self.app.cluster_state = state p = protocol.noError('cluster state changed')
conn.answer(p, packet)
if state == protocol.STOPPING: if state == protocol.STOPPING:
self.app.cluster_state = state self.app.cluster_state = state
p = protocol.noError('cluster state changed')
conn.answer(p, packet)
self.app.shutdown() self.app.shutdown()
p = protocol.noError('cluster state changed')
conn.answer(p, packet)
def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table): def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s" % (dump(uuid), state, modify_partition_table)) logging.info("set node state for %s-%s : %s" % (dump(uuid), state, modify_partition_table))
......
...@@ -356,6 +356,20 @@ class Application(object): ...@@ -356,6 +356,20 @@ class Application(object):
if node_list: if node_list:
conn.notify(protocol.notifyNodeInformation(node_list)) conn.notify(protocol.notifyNodeInformation(node_list))
def buildFromScratch(self):
nm, em, pt = self.nm, self.em, self.pt
logging.debug('creating a new partition table, wait for a storage node')
# wait for some empty storage nodes, their are accepted
while len(nm.getStorageNodeList()) == 0:
em.poll(1)
# take the first node available
node = nm.getStorageNodeList()[0]
node.setState(protocol.RUNNING_STATE)
self.broadcastNodeInformation(node)
# build the partition with this node
pt.setID(pack('!Q', 1))
pt.make([node])
def recoverStatus(self): def recoverStatus(self):
"""Recover the status about the cluster. Obtain the last OID, the last TID, """Recover the status about the cluster. Obtain the last OID, the last TID,
and the last Partition Table ID from storage nodes, then get back the latest and the last Partition Table ID from storage nodes, then get back the latest
...@@ -363,102 +377,26 @@ class Application(object): ...@@ -363,102 +377,26 @@ class Application(object):
logging.info('begin the recovery of the status') logging.info('begin the recovery of the status')
self.changeClusterState(protocol.RECOVERING) self.changeClusterState(protocol.RECOVERING)
em, nm = self.em, self.nm
em = self.em
nm = self.nm
self.loid = INVALID_OID self.loid = INVALID_OID
self.ltid = INVALID_TID self.ltid = INVALID_TID
self.pt.setID(INVALID_PTID) self.pt.setID(INVALID_PTID)
while 1: self.target_uuid = None
self.target_uuid = None
self.pt.clear()
if self.pt.getID() != INVALID_PTID:
# I need to retrieve last ids again.
logging.info('resending Ask Last IDs')
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE \
and node.getState() == RUNNING_STATE:
conn.ask(protocol.askLastIDs())
# Wait for at least one storage node to appear.
while self.target_uuid is None:
em.poll(1)
# Wait a bit, 1 second is too short for the ZODB test running on a
# dedibox
t = time()
while time() < t + 5:
em.poll(1)
# Now I have at least one to ask.
prev_lptid = self.pt.getID()
node = nm.getNodeByUUID(self.target_uuid)
if node is None or node.getState() != RUNNING_STATE:
# Weird. It's dead.
logging.info('the target storage node is dead')
continue
for conn in em.getConnectionList():
if conn.getUUID() == self.target_uuid:
break
else:
# Why?
logging.info('no connection to the target storage node')
continue
if self.pt.getID() == INVALID_PTID:
# This looks like the first time. So make a fresh table.
logging.debug('creating a new partition table')
self.pt.setID(pack('!Q', 1)) # ptid != INVALID_PTID
self.pt.make(nm.getStorageNodeList())
else:
# Obtain a partition table. It is necessary to split this
# message, because the packet size can be huge.
logging.debug('asking a partition table to %s', node)
start = 0
size = self.pt.getPartitions()
while size:
amt = min(1000, size)
conn.ask(protocol.askPartitionTable(range(start, start + amt)))
size -= amt
start += amt
t = time()
while 1:
em.poll(1)
if node.getState() != RUNNING_STATE:
# Dead.
break
if self.pt.filled() or t + 30 < time():
break
if self.pt.getID() != prev_lptid or not self.pt.filled():
# I got something newer or the target is dead.
logging.debug('lptid = %s, prev_lptid = %s',
dump(self.pt.getID()), dump(prev_lptid))
self.pt.log()
continue
# Wait until the cluster gets operational or the Partition
# Table ID turns out to be not the latest.
logging.info('waiting for the cluster to be operational')
self.pt.log()
while 1:
em.poll(1)
if self.pt.operational():
break
if self.pt.getID() != prev_lptid:
break
if self.pt.getID() != prev_lptid: # collect the last partition table available
# I got something newer. start_time = time()
continue while self.cluster_state == protocol.RECOVERING:
break em.poll(1)
# FIXME: remove this timeout to force manual startup
if start_time + 5 <= time():
self.changeClusterState(protocol.VERIFYING)
logging.info('startup allowed')
if self.pt.getID() == INVALID_PTID:
self.buildFromScratch()
logging.info('cluster starts with this partition table :')
self.pt.log()
def verifyTransaction(self, tid): def verifyTransaction(self, tid):
em = self.em em = self.em
...@@ -526,15 +464,17 @@ class Application(object): ...@@ -526,15 +464,17 @@ class Application(object):
def verifyData(self): def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary.""" """Verify the data in storage nodes and clean them up, if necessary."""
logging.info('start to verify data')
em, nm = self.em, self.nm em, nm = self.em, self.nm
self.changeClusterState(protocol.VERIFYING) self.changeClusterState(protocol.VERIFYING)
# wait for any missing node # wait for any missing node
logging.info('waiting for the cluster to be operational')
while not self.pt.operational(): while not self.pt.operational():
em.poll(1) em.poll(1)
logging.info('start to verify data')
# FIXME this part has a potential problem that the write buffers can # FIXME this part has a potential problem that the write buffers can
# be very huge. Thus it would be better to flush the buffers from time # be very huge. Thus it would be better to flush the buffers from time
# to time _without_ reading packets. # to time _without_ reading packets.
...@@ -854,8 +794,7 @@ class Application(object): ...@@ -854,8 +794,7 @@ class Application(object):
state = protocol.RUNNING_STATE state = protocol.RUNNING_STATE
handler = None handler = None
if self.cluster_state == protocol.RECOVERING: if self.cluster_state == protocol.RECOVERING:
# TODO: Enable empty node rejection when manual startup is ok : if uuid == protocol.INVALID_UUID:
if False and uuid == protocol.INVALID_UUID:
logging.info('reject empty storage node') logging.info('reject empty storage node')
raise protocol.NotReadyError raise protocol.NotReadyError
handler = RecoveryEventHandler handler = RecoveryEventHandler
......
...@@ -109,12 +109,7 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -109,12 +109,7 @@ class RecoveryEventHandler(MasterEventHandler):
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid): def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID()
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid)
# If the target is still unknown, set it to this node for now.
if app.target_uuid is None:
app.target_uuid = uuid
# Get max values. # Get max values.
if app.loid < loid: if app.loid < loid:
...@@ -122,11 +117,11 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -122,11 +117,11 @@ class RecoveryEventHandler(MasterEventHandler):
if app.ltid < ltid: if app.ltid < ltid:
app.ltid = ltid app.ltid = ltid
if app.pt.getID() == INVALID_PTID or app.pt.getID() < lptid: if app.pt.getID() == INVALID_PTID or app.pt.getID() < lptid:
# something newer
app.pt.setID(lptid) app.pt.setID(lptid)
# I need to use the node which has the max Partition Table ID. app.target_uuid = conn.getUUID()
app.target_uuid = uuid app.pt.clear()
elif app.pt.getID() == lptid and app.target_uuid is None: conn.ask(protocol.askPartitionTable([]))
app.target_uuid = uuid
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
uuid = conn.getUUID() uuid = conn.getUUID()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment