Commit a7630a68 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add 'VerificationManager' for master's application.

- Replace verification handler (statefull handler)
- Include 'verifyData', 'verifyTransation' and verification related attributes.
- Define 'runManager()' method for future purpose (with other managers).
- Include all 'VerificationFailure' exception related handling.
- Remove support of 'VERIFYING' cluster in 'changeClusterState' as the manager takes precedence.
- Move VerificationException in verification module.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1590 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2b77af21
......@@ -24,9 +24,6 @@ class ElectionFailure(NeoException):
class PrimaryFailure(NeoException):
pass
class VerificationFailure(NeoException):
pass
class OperationFailure(NeoException):
pass
......
......@@ -26,13 +26,13 @@ from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.node import NodeManager
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure, \
VerificationFailure, OperationFailure
from neo.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.master.handlers import election, identification, secondary, recovery
from neo.master.handlers import verification, storage, client, shutdown
from neo.master.handlers import storage, client, shutdown
from neo.master.handlers import administration
from neo.master.pt import PartitionTable
from neo.master.transactions import TransactionManager
from neo.master.verification import VerificationManager
from neo.util import dump
from neo.connector import getConnectorHandler
......@@ -90,12 +90,8 @@ class Application(object):
self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set()
# verification related data
self.unfinished_oid_set = set()
self.unfinished_tid_set = set()
self.asking_uuid_dict = {}
self.object_present = False
self._current_manager = None
self._startup_allowed = False
def run(self):
"""Make sure that the status is sane and start a loop."""
......@@ -366,7 +362,7 @@ class Application(object):
self.target_uuid = None
# collect the last partition table available
while self.cluster_state == ClusterStates.RECOVERING:
while not self._startup_allowed:
em.poll(1)
logging.info('startup allowed')
......@@ -387,132 +383,6 @@ class Application(object):
dump(self.loid))
self.pt.log()
def verifyTransaction(self, tid):
em = self.em
uuid_set = set()
# Determine to which nodes I should ask.
partition = self.pt.getPartition(tid)
transaction_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, readable=True)]
if len(transaction_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(transaction_uuid_list)
# Gather OIDs.
self.asking_uuid_dict = {}
self.unfinished_oid_set = set()
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in transaction_uuid_list:
self.asking_uuid_dict[uuid] = False
conn.ask(Packets.AskTransactionInformation(tid))
if len(self.asking_uuid_dict) == 0:
raise VerificationFailure
while True:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
if self.unfinished_oid_set is None or len(self.unfinished_oid_set) == 0:
# Not commitable.
return None
# Verify that all objects are present.
for oid in self.unfinished_oid_set:
self.asking_uuid_dict.clear()
partition = self.pt.getPartition(oid)
object_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, readable=True)]
if len(object_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(object_uuid_list)
self.object_present = True
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in object_uuid_list:
self.asking_uuid_dict[uuid] = False
conn.ask(Packets.AskObjectPresent(oid, tid))
while True:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
if not self.object_present:
# Not commitable.
return None
return uuid_set
def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary."""
em, nm = self.em, self.nm
self.changeClusterState(ClusterStates.VERIFYING)
# wait for any missing node
logging.debug('waiting for the cluster to be operational')
while not self.pt.operational():
em.poll(1)
logging.info('start to verify data')
# Gather all unfinished transactions.
self.asking_uuid_dict = {}
self.unfinished_tid_set = set()
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getByUUID(uuid)
if node.isStorage():
self.asking_uuid_dict[uuid] = False
conn.ask(Packets.AskUnfinishedTransactions())
while True:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
# Gather OIDs for each unfinished TID, and verify whether the transaction
# can be finished or must be aborted. This could be in parallel in theory,
# but not so easy. Thus do it one-by-one at the moment.
for tid in self.unfinished_tid_set:
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
# Make sure that no node has this transaction.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getByUUID(uuid)
if node.isStorage():
conn.notify(Packets.DeleteTransaction(tid))
else:
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in uuid_set:
conn.ask(Packets.CommitTransaction(tid))
# If possible, send the packets now.
em.poll(0)
# At this stage, all non-working nodes are out-of-date.
cell_list = self.pt.outdate()
# Tweak the partition table, if the distribution of storage nodes
# is not uniform.
cell_list.extend(self.pt.tweak())
# If anything changed, send the changes.
self.broadcastPartitionChanges(cell_list)
def provideService(self):
"""
This is the normal mode for a primary master node. Handle transactions
......@@ -579,10 +449,7 @@ class Application(object):
self.recoverStatus()
while True:
try:
self.verifyData()
except VerificationFailure:
continue
self.runManager(VerificationManager)
self.provideService()
def playSecondaryRole(self):
......@@ -627,6 +494,11 @@ class Application(object):
while True:
self.em.poll(1)
def runManager(self, manager_klass):
self._current_manager = manager_klass(self)
self._current_manager.run()
self._current_manager = None
def changeClusterState(self, state):
"""
Change the cluster state and apply right handler on each connections
......@@ -639,10 +511,10 @@ class Application(object):
client_handler = client.ClientServiceHandler(self)
if state == ClusterStates.RECOVERING:
storage_handler = recovery.RecoveryHandler(self)
elif state == ClusterStates.VERIFYING:
storage_handler = verification.VerificationHandler(self)
elif state == ClusterStates.RUNNING:
storage_handler = storage.StorageServiceHandler(self)
elif self._current_manager is not None:
storage_handler = self._current_manager.getHandler()
else:
RuntimeError('Unexpected cluster state')
......@@ -735,7 +607,8 @@ class Application(object):
state = NodeStates.RUNNING
handler = None
if self.cluster_state == ClusterStates.RECOVERING:
if uuid is None:
# accept storage nodes when recovery is over
if uuid is None and self._startup_allowed:
logging.info('reject empty storage node')
raise protocol.NotReadyError
handler = recovery.RecoveryHandler(self)
......@@ -747,7 +620,6 @@ class Application(object):
# uuid from the test framework. It's safe since nodes with a
# conflicting UUID are rejected in the identification handler.
state = NodeStates.PENDING
handler = verification.VerificationHandler
elif self.cluster_state == ClusterStates.RUNNING:
if uuid is None or node is None:
# same as for verification
......@@ -787,7 +659,11 @@ class Application(object):
logging.info('Accept a client %s' % (dump(uuid), ))
elif node_type == NodeTypes.STORAGE:
node_ctor = self.nm.createStorage
(uuid, state, handler) = self.identifyStorageNode(uuid, node)
if self._current_manager is not None:
identify = self._current_manager.identifyStorageNode
(uuid, state, handler) = identify(uuid, node)
else:
(uuid, state, handler) = self.identifyStorageNode(uuid, node)
logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
......@@ -35,7 +35,11 @@ class AdministrationHandler(MasterHandler):
conn.answer(Packets.AnswerPrimary(app.uuid, []))
def setClusterState(self, conn, state):
self.app.changeClusterState(state)
# XXX: dedicate a packet to start the cluster
if state == ClusterStates.VERIFYING:
self.app._startup_allowed = True
else:
self.app.changeClusterState(state)
p = protocol.ack('cluster state changed')
conn.answer(p)
if state == ClusterStates.STOPPING:
......
......@@ -32,8 +32,10 @@ class RecoveryHandler(MasterHandler):
node.setState(new_state)
def connectionCompleted(self, conn):
# ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs())
# XXX: handler split review needed to remove this hack
if not self.app._startup_allowed:
# ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, loid, ltid, lptid):
app = self.app
......
#
# Copyright (C) 2006-2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from neo.master.handlers import BaseServiceHandler
from neo.exception import VerificationFailure
from neo.util import dump
class VerificationHandler(BaseServiceHandler):
"""This class deals with events for a verification phase."""
def connectionCompleted(self, conn):
pass
def nodeLost(self, conn, node):
if not self.app.pt.operational():
raise VerificationFailure, 'cannot continue verification'
def answerLastIDs(self, conn, loid, ltid, lptid):
app = self.app
# If I get a bigger value here, it is dangerous.
if app.loid < loid or ltid > app.tm.getLastTID() \
or app.pt.getID() < lptid:
logging.critical('got later information in verification')
raise VerificationFailure
def answerUnfinishedTransactions(self, conn, tid_list):
uuid = conn.getUUID()
logging.info('got unfinished transactions %s from %s:%d',
tid_list, *(conn.getAddress()))
app = self.app
if app.asking_uuid_dict.get(uuid, True):
# No interest.
return
app.unfinished_tid_set.update(tid_list)
app.asking_uuid_dict[uuid] = True
def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list):
uuid = conn.getUUID()
app = self.app
if app.asking_uuid_dict.get(uuid, True):
# No interest.
return
oid_set = set(oid_list)
if app.unfinished_oid_set is None:
# Someone does not agree.
pass
elif len(app.unfinished_oid_set) == 0:
# This is the first answer.
app.unfinished_oid_set.update(oid_set)
elif app.unfinished_oid_set != oid_set:
app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True
def tidNotFound(self, conn, message):
uuid = conn.getUUID()
logging.info('TID not found: %s', message)
app = self.app
if app.asking_uuid_dict.get(uuid, True):
# No interest.
return
app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True
def answerObjectPresent(self, conn, oid, tid):
uuid = conn.getUUID()
logging.info('object %s:%s found', dump(oid), dump(tid))
app = self.app
if app.asking_uuid_dict.get(uuid, True):
# No interest.
return
app.asking_uuid_dict[uuid] = True
def oidNotFound(self, conn, message):
uuid = conn.getUUID()
logging.info('OID not found: %s', message)
app = self.app
if app.asking_uuid_dict.get(uuid, True):
# No interest.
return
app.object_present = False
app.asking_uuid_dict[uuid] = True
#
# Copyright (C) 2006-2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from neo.util import dump
from neo.protocol import ClusterStates, Packets, NodeTypes, NodeStates
from neo.master.handlers import BaseServiceHandler
class VerificationFailure(Exception):
"""
Exception raised each time the cluster integrity failed.
- An required storage node is missing
- A transaction or an object is missing on a node
"""
pass
class VerificationManager(BaseServiceHandler):
"""
Manager for verification step of a NEO cluster:
- Wait for at least one available storage per partition
- Check if all expected content is present
"""
def __init__(self, app):
BaseServiceHandler.__init__(self, app)
self._oid_set = set()
self._tid_set = set()
self._uuid_dict = {}
self._object_present = False
def getHandler(self):
return self
def identifyStorageNode(self, uuid, node):
"""
Returns the handler to manager the given node
"""
state = NodeStates.RUNNING
if uuid is None or node is None:
# if node is unknown, it has been forget when the current
# partition was validated by the admin
# Here the uuid is not cleared to allow lookup pending nodes by
# uuid from the test framework. It's safe since nodes with a
# conflicting UUID are rejected in the identification handler.
state = NodeStates.PENDING
return (uuid, state, self)
def run(self):
self.app.changeClusterState(ClusterStates.VERIFYING)
while True:
try:
self.verifyData()
except VerificationFailure:
continue
break
# At this stage, all non-working nodes are out-of-date.
cell_list = self.app.pt.outdate()
# Tweak the partition table, if the distribution of storage nodes
# is not uniform.
cell_list.extend(self.app.pt.tweak())
# If anything changed, send the changes.
self.app.broadcastPartitionChanges(cell_list)
def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary."""
em, nm = self.app.em, self.app.nm
# wait for any missing node
logging.debug('waiting for the cluster to be operational')
while not self.app.pt.operational():
em.poll(1)
logging.info('start to verify data')
# Gather all unfinished transactions.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getByUUID(uuid)
if node.isStorage():
self._uuid_dict[uuid] = False
conn.ask(Packets.AskUnfinishedTransactions())
while True:
em.poll(1)
if not self.app.pt.operational():
raise VerificationFailure
if False not in self._uuid_dict.values():
break
# Gather OIDs for each unfinished TID, and verify whether the
# transaction can be finished or must be aborted. This could be
# in parallel in theory, but not so easy. Thus do it one-by-one
# at the moment.
for tid in self._tid_set:
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
# Make sure that no node has this transaction.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getByUUID(uuid)
if node.isStorage():
conn.notify(Packets.DeleteTransaction(tid))
else:
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in uuid_set:
conn.ask(Packets.CommitTransaction(tid))
# If possible, send the packets now.
em.poll(0)
def verifyTransaction(self, tid):
em = self.app.em
uuid_set = set()
# Determine to which nodes I should ask.
partition = self.app.pt.getPartition(tid)
transaction_uuid_list = [cell.getUUID() for cell \
in self.app.pt.getCellList(partition, readable=True)]
if len(transaction_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(transaction_uuid_list)
# Gather OIDs.
self._uuid_dict = {}
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in transaction_uuid_list:
self._uuid_dict[uuid] = False
conn.ask(Packets.AskTransactionInformation(tid))
if len(self._uuid_dict) == 0:
raise VerificationFailure
while True:
em.poll(1)
if not self.app.pt.operational():
raise VerificationFailure
if False not in self._uuid_dict.values():
break
if self._oid_set is None or len(self._oid_set) == 0:
# Not commitable.
return None
# Verify that all objects are present.
for oid in self._oid_set:
self._uuid_dict.clear()
partition = self.app.pt.getPartition(oid)
object_uuid_list = [cell.getUUID() for cell \
in self.app.pt.getCellList(partition, readable=True)]
if len(object_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(object_uuid_list)
self._object_present = True
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in object_uuid_list:
self._uuid_dict[uuid] = False
conn.ask(Packets.AskObjectPresent(oid, tid))
while True:
em.poll(1)
if not self.app.pt.operational():
raise VerificationFailure
if False not in self._uuid_dict.values():
break
if not self._object_present:
# Not commitable.
return None
return uuid_set
def answerLastIDs(self, conn, loid, ltid, lptid):
app = self.app
# If I get a bigger value here, it is dangerous.
if app.loid < loid or ltid > app.tm.getLastTID() \
or app.pt.getID() < lptid:
logging.critical('got later information in verification')
raise VerificationFailure
def answerUnfinishedTransactions(self, conn, tid_list):
uuid = conn.getUUID()
logging.info('got unfinished transactions %s from %s:%d',
tid_list, *(conn.getAddress()))
if self._uuid_dict.get(uuid, True):
# No interest.
return
self._tid_set.update(tid_list)
self._uuid_dict[uuid] = True
def answerTransactionInformation(self, conn, tid,
user, desc, ext, oid_list):
uuid = conn.getUUID()
app = self.app
if self._uuid_dict.get(uuid, True):
# No interest.
return
oid_set = set(oid_list)
if self._oid_set is None:
# Someone does not agree.
pass
elif len(self._oid_set) == 0:
# This is the first answer.
self._oid_set.update(oid_set)
elif self._oid_set != oid_set:
self._oid_set = None
self._uuid_dict[uuid] = True
def tidNotFound(self, conn, message):
uuid = conn.getUUID()
logging.info('TID not found: %s', message)
if self._uuid_dict.get(uuid, True):
# No interest.
return
self._oid_set = None
self._uuid_dict[uuid] = True
def answerObjectPresent(self, conn, oid, tid):
uuid = conn.getUUID()
logging.info('object %s:%s found', dump(oid), dump(tid))
if self._uuid_dict.get(uuid, True):
# No interest.
return
self._uuid_dict[uuid] = True
def oidNotFound(self, conn, message):
uuid = conn.getUUID()
logging.info('OID not found: %s', message)
app = self.app
if self._uuid_dict.get(uuid, True):
# No interest.
return
app.object_present = False
self._uuid_dict[uuid] = True
def connectionCompleted(self, conn):
pass
def nodeLost(self, conn, node):
if not self.app.pt.operational():
raise VerificationFailure, 'cannot continue verification'
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment