Commit e6193579 authored by Grégory Wisniewski

Add master's recovery manager.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1592 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8ffcab59
@@ -27,17 +27,16 @@ from neo.node import NodeManager
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure, OperationFailure
-from neo.master.handlers import election, identification, secondary, recovery
+from neo.master.handlers import election, identification, secondary
from neo.master.handlers import storage, client, shutdown
from neo.master.handlers import administration
from neo.master.pt import PartitionTable
from neo.master.transactions import TransactionManager
from neo.master.verification import VerificationManager
+from neo.master.recovery import RecoveryManager
from neo.util import dump
from neo.connector import getConnectorHandler

-REQUIRED_NODE_NUMBER = 1

class Application(object):
    """The master node application."""

@@ -74,6 +73,7 @@ class Application(object):
        self.primary = None
        self.primary_master_node = None
        self.cluster_state = None
+        self._startup_allowed = False

        # Generate an UUID for self
        uuid = config.getUUID()

@@ -83,15 +83,12 @@ class Application(object):
        # The last OID.
        self.loid = None

-        # The target node's uuid to request next.
-        self.target_uuid = None

        # election related data
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()

        self._current_manager = None
-        self._startup_allowed = False

    def run(self):
        """Make sure that the status is sane and start a loop."""

@@ -328,61 +325,6 @@ class Application(object):
            if node is not None and node.isStorage():
                conn.notify(packet)
-    def buildFromScratch(self):
-        nm, em, pt = self.nm, self.em, self.pt
-        logging.debug('creating a new partition table, wait for a storage node')
-        # wait for some empty storage nodes, they are accepted
-        while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
-            em.poll(1)
-        # take the first node available
-        node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
-        for node in node_list:
-            node.setRunning()
-        self.broadcastNodesInformation(node_list)
-        # reset ID generators
-        self.loid = '\0' * 8
-        # build the partition with this node
-        pt.setID(pack('!Q', 1))
-        pt.make(node_list)
-
-    def recoverStatus(self):
-        """
-        Recover the status about the cluster. Obtain the last OID, the last
-        TID, and the last Partition Table ID from storage nodes, then get
-        back the latest partition table or make a new table from scratch,
-        if this is the first time.
-        """
-        logging.info('begin the recovery of the status')
-        self.changeClusterState(ClusterStates.RECOVERING)
-        em = self.em
-        self.loid = None
-        self.pt.setID(None)
-        self.target_uuid = None
-
-        # collect the last partition table available
-        while not self._startup_allowed:
-            em.poll(1)
-        logging.info('startup allowed')
-
-        # build a new partition table
-        if self.pt.getID() is None:
-            self.buildFromScratch()
-
-        # collect nodes that are connected but not in the selected partition
-        # table and set them in pending state
-        allowed_node_set = set(self.pt.getNodeList())
-        refused_node_set = set(self.nm.getStorageList()) - allowed_node_set
-        for node in refused_node_set:
-            node.setPending()
-        self.broadcastNodesInformation(refused_node_set)
-
-        logging.debug('cluster starts with loid=%s and this partition table:',
-            dump(self.loid))
-        self.pt.log()

    def provideService(self):
        """
        This is the normal mode for a primary master node. Handle transactions

@@ -438,7 +380,6 @@ class Application(object):
                assert node.isMaster()
                conn.setHandler(handler)

        # If I know any storage node, make sure that they are not in the
        # running state, because they are not connected at this stage.
        for node in nm.getStorageList():

@@ -446,8 +387,7 @@ class Application(object):
                node.setTemporarilyDown()

        # recover the cluster status at startup
-        self.recoverStatus()
+        self.runManager(RecoveryManager)
        while True:
            self.runManager(VerificationManager)
            self.provideService()

@@ -509,9 +449,7 @@ class Application(object):
        # select the storage handler
        client_handler = client.ClientServiceHandler(self)
-        if state == ClusterStates.RECOVERING:
-            storage_handler = recovery.RecoveryHandler(self)
-        elif state == ClusterStates.RUNNING:
+        if state == ClusterStates.RUNNING:
            storage_handler = storage.StorageServiceHandler(self)
        elif self._current_manager is not None:
            storage_handler = self._current_manager.getHandler()

@@ -606,21 +544,7 @@ class Application(object):
    def identifyStorageNode(self, uuid, node):
        state = NodeStates.RUNNING
        handler = None
-        if self.cluster_state == ClusterStates.RECOVERING:
-            # accept storage nodes when recovery is over
-            if uuid is None and self._startup_allowed:
-                logging.info('reject empty storage node')
-                raise protocol.NotReadyError
-            handler = recovery.RecoveryHandler(self)
-        elif self.cluster_state == ClusterStates.VERIFYING:
-            if uuid is None or node is None:
-                # if node is unknown, it has been forgotten when the current
-                # partition table was validated by the admin
-                # Here the uuid is not cleared to allow looking up pending nodes
-                # by uuid from the test framework. It's safe since nodes with a
-                # conflicting UUID are rejected in the identification handler.
-                state = NodeStates.PENDING
-        elif self.cluster_state == ClusterStates.RUNNING:
+        if self.cluster_state == ClusterStates.RUNNING:
            if uuid is None or node is None:
                # same as for verification
                state = NodeStates.PENDING
...
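In the Application class above, the inline recoverStatus() call is replaced by self.runManager(RecoveryManager), the same mechanism already used for VerificationManager, and changeClusterState() now falls back to self._current_manager.getHandler() when selecting the storage handler. The runManager() helper itself is not shown in this diff; the following is only a rough sketch of the pattern the calls above suggest, and its body is an assumption, not the actual implementation:

    # Hypothetical sketch of the runManager() pattern used above (not part of
    # this diff): the manager provides the storage handler while its phase
    # runs, then is cleared so the regular service handlers take over again.
    def runManager(self, manager_klass):
        self._current_manager = manager_klass(self)  # e.g. RecoveryManager(self)
        self._current_manager.run()                  # polls until the phase ends
        self._current_manager = None

The second half of the diff, below, adds the neo.master.recovery module that provides the RecoveryManager class itself.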
@@ -15,14 +15,91 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

+from struct import pack
from neo import logging
-from neo.protocol import Packets, ProtocolError
-from neo.master.handlers import MasterHandler
from neo.util import dump
+from neo.protocol import Packets, ProtocolError, ClusterStates, NodeStates
+from neo.protocol import NotReadyError
+from neo.master.handlers import MasterHandler
+
+REQUIRED_NODE_NUMBER = 1
+
+class RecoveryManager(MasterHandler):
+    """
+    Manage the cluster recovery
+    """
+
+    def __init__(self, app):
+        super(RecoveryManager, self).__init__(app)
+        # The target node's uuid to request next.
+        self.target_uuid = None
+
+    def getHandler(self):
+        return self
+
+    def identifyStorageNode(self, uuid, node):
+        """
+        Returns the handler for storage nodes
+        """
+        if uuid is None and self.app._startup_allowed:
+            logging.info('reject empty storage node')
+            raise NotReadyError
+        return (uuid, NodeStates.RUNNING, self)
+
+    def run(self):
+        """
+        Recover the status about the cluster. Obtain the last OID, the last
+        TID, and the last Partition Table ID from storage nodes, then get
+        back the latest partition table or make a new table from scratch,
+        if this is the first time.
+        """
+        logging.info('begin the recovery of the status')
+        self.app.changeClusterState(ClusterStates.RECOVERING)
+        em = self.app.em
+
+        self.app.loid = None
+        self.app.pt.setID(None)
+
+        # collect the last partition table available
+        while not self.app._startup_allowed:
+            em.poll(1)
+        logging.info('startup allowed')
+
+        # build a new partition table
+        if self.app.pt.getID() is None:
+            self.buildFromScratch()
+
+        # collect nodes that are connected but not in the selected partition
+        # table and set them in pending state
+        allowed_node_set = set(self.app.pt.getNodeList())
+        refused_node_set = set(self.app.nm.getStorageList()) - allowed_node_set
+        for node in refused_node_set:
+            node.setPending()
+        self.app.broadcastNodesInformation(refused_node_set)
+
+        logging.debug('cluster starts with loid=%s and this partition table:',
+            dump(self.app.loid))
+        self.app.pt.log()
-class RecoveryHandler(MasterHandler):
-    """This class deals with events for a recovery phase."""
+    def buildFromScratch(self):
+        nm, em, pt = self.app.nm, self.app.em, self.app.pt
+        logging.debug('creating a new partition table, wait for a storage node')
+        # wait for some empty storage nodes, they are accepted
+        while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
+            em.poll(1)
+        # take the first node available
+        node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
+        for node in node_list:
+            node.setRunning()
+        self.app.broadcastNodesInformation(node_list)
+        # reset ID generators
+        self.app.loid = '\0' * 8
+        # build the partition with this node
+        pt.setID(pack('!Q', 1))
+        pt.make(node_list)

    def connectionLost(self, conn, new_state):
        node = self.app.nm.getByUUID(conn.getUUID())

@@ -46,17 +123,17 @@ class RecoveryHandler(MasterHandler):
        self.app.tm.setLastTID(ltid)
        if lptid > pt.getID():
            # something newer
-            app.target_uuid = conn.getUUID()
+            self.target_uuid = conn.getUUID()
            app.pt.setID(lptid)
            conn.ask(Packets.AskPartitionTable([]))

    def answerPartitionTable(self, conn, ptid, row_list):
        uuid = conn.getUUID()
        app = self.app
-        if uuid != app.target_uuid:
+        if uuid != self.target_uuid:
            # If this is not from a target node, ignore it.
            logging.warn('got answer partition table from %s while waiting ' \
-                'for %s', dump(uuid), dump(app.target_uuid))
+                'for %s', dump(uuid), dump(self.target_uuid))
            return
        # load unknown storage nodes
        for offset, row in row_list:
...
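Taken together, answerLastIDs() and answerPartitionTable() above implement a simple selection rule: whichever storage node reports the highest partition table ID becomes the target node, and partition table answers from any other node are ignored. The helper below is only a standalone illustration of that rule; pick_recovery_target() is hypothetical and not part of NEO:

def pick_recovery_target(answers):
    """Return the (uuid, ptid) pair of the node reporting the newest partition
    table, mimicking how answerLastIDs() keeps updating target_uuid.

    answers is an iterable of (uuid, last_ptid) pairs, one per storage node."""
    target_uuid, best_ptid = None, None
    for uuid, ptid in answers:
        if best_ptid is None or ptid > best_ptid:
            target_uuid, best_ptid = uuid, ptid
    return target_uuid, best_ptid

# Example: the node reporting ptid 7 wins and will be asked for its table.
assert pick_recovery_target([('a', 3), ('b', 7), ('c', 5)]) == ('b', 7)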